mirror of https://github.com/openclaw/openclaw.git
msteams: add typingIndicator config and prevent duplicate DM typing indicator (#60771)
* msteams: add typingIndicator config and avoid duplicate DM typing * fix(msteams): validate typingIndicator config * fix(msteams): stop streaming before Teams timeout * fix(msteams): classify expired streams correctly * fix(msteams): handle link text from html attachments --------- Co-authored-by: Brad Groux <bradgroux@users.noreply.github.com>
This commit is contained in:
parent
af4e9d19cf
commit
fce81fccd8
|
|
@ -18,6 +18,25 @@ describe("msteams errors", () => {
|
|||
expect(classifyMSTeamsSendError({ statusCode: 403 }).kind).toBe("auth");
|
||||
});
|
||||
|
||||
it("classifies ContentStreamNotAllowed as permanent instead of auth", () => {
|
||||
expect(
|
||||
classifyMSTeamsSendError({
|
||||
statusCode: 403,
|
||||
response: {
|
||||
body: {
|
||||
error: {
|
||||
code: "ContentStreamNotAllowed",
|
||||
},
|
||||
},
|
||||
},
|
||||
}),
|
||||
).toMatchObject({
|
||||
kind: "permanent",
|
||||
statusCode: 403,
|
||||
errorCode: "ContentStreamNotAllowed",
|
||||
});
|
||||
});
|
||||
|
||||
it("classifies throttling errors and parses retry-after", () => {
|
||||
expect(classifyMSTeamsSendError({ statusCode: 429, retryAfter: "1.5" })).toMatchObject({
|
||||
kind: "throttled",
|
||||
|
|
@ -43,6 +62,12 @@ describe("msteams errors", () => {
|
|||
it("provides actionable hints for common cases", () => {
|
||||
expect(formatMSTeamsSendErrorHint({ kind: "auth" })).toContain("msteams");
|
||||
expect(formatMSTeamsSendErrorHint({ kind: "throttled" })).toContain("throttled");
|
||||
expect(
|
||||
formatMSTeamsSendErrorHint({
|
||||
kind: "permanent",
|
||||
errorCode: "ContentStreamNotAllowed",
|
||||
}),
|
||||
).toContain("expired the content stream");
|
||||
});
|
||||
|
||||
describe("isRevokedProxyError", () => {
|
||||
|
|
|
|||
|
|
@ -63,6 +63,32 @@ function extractStatusCode(err: unknown): number | null {
|
|||
return null;
|
||||
}
|
||||
|
||||
function extractErrorCode(err: unknown): string | null {
|
||||
if (!isRecord(err)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const direct = err.code;
|
||||
if (typeof direct === "string" && direct.trim()) {
|
||||
return direct;
|
||||
}
|
||||
|
||||
const response = err.response;
|
||||
if (!isRecord(response)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const body = response.body;
|
||||
if (isRecord(body)) {
|
||||
const error = body.error;
|
||||
if (isRecord(error) && typeof error.code === "string" && error.code.trim()) {
|
||||
return error.code;
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
function extractRetryAfterMs(err: unknown): number | null {
|
||||
if (!isRecord(err)) {
|
||||
return null;
|
||||
|
|
@ -129,6 +155,7 @@ export type MSTeamsSendErrorClassification = {
|
|||
kind: MSTeamsSendErrorKind;
|
||||
statusCode?: number;
|
||||
retryAfterMs?: number;
|
||||
errorCode?: string;
|
||||
};
|
||||
|
||||
/**
|
||||
|
|
@ -142,9 +169,17 @@ export type MSTeamsSendErrorClassification = {
|
|||
export function classifyMSTeamsSendError(err: unknown): MSTeamsSendErrorClassification {
|
||||
const statusCode = extractStatusCode(err);
|
||||
const retryAfterMs = extractRetryAfterMs(err);
|
||||
const errorCode = extractErrorCode(err) ?? undefined;
|
||||
|
||||
if (statusCode === 401 || statusCode === 403) {
|
||||
return { kind: "auth", statusCode };
|
||||
if (statusCode === 401) {
|
||||
return { kind: "auth", statusCode, errorCode };
|
||||
}
|
||||
|
||||
if (statusCode === 403) {
|
||||
if (errorCode === "ContentStreamNotAllowed") {
|
||||
return { kind: "permanent", statusCode, errorCode };
|
||||
}
|
||||
return { kind: "auth", statusCode, errorCode };
|
||||
}
|
||||
|
||||
if (statusCode === 429) {
|
||||
|
|
@ -152,6 +187,7 @@ export function classifyMSTeamsSendError(err: unknown): MSTeamsSendErrorClassifi
|
|||
kind: "throttled",
|
||||
statusCode,
|
||||
retryAfterMs: retryAfterMs ?? undefined,
|
||||
errorCode,
|
||||
};
|
||||
}
|
||||
|
||||
|
|
@ -160,17 +196,19 @@ export function classifyMSTeamsSendError(err: unknown): MSTeamsSendErrorClassifi
|
|||
kind: "transient",
|
||||
statusCode,
|
||||
retryAfterMs: retryAfterMs ?? undefined,
|
||||
errorCode,
|
||||
};
|
||||
}
|
||||
|
||||
if (statusCode != null && statusCode >= 400) {
|
||||
return { kind: "permanent", statusCode };
|
||||
return { kind: "permanent", statusCode, errorCode };
|
||||
}
|
||||
|
||||
return {
|
||||
kind: "unknown",
|
||||
statusCode: statusCode ?? undefined,
|
||||
retryAfterMs: retryAfterMs ?? undefined,
|
||||
errorCode,
|
||||
};
|
||||
}
|
||||
|
||||
|
|
@ -195,6 +233,9 @@ export function formatMSTeamsSendErrorHint(
|
|||
if (classification.kind === "auth") {
|
||||
return "check msteams appId/appPassword/tenantId (or env vars MSTEAMS_APP_ID/MSTEAMS_APP_PASSWORD/MSTEAMS_TENANT_ID)";
|
||||
}
|
||||
if (classification.errorCode === "ContentStreamNotAllowed") {
|
||||
return "Teams expired the content stream; stop streaming earlier and fall back to normal message delivery";
|
||||
}
|
||||
if (classification.kind === "throttled") {
|
||||
return "Teams throttled the bot; backing off may help";
|
||||
}
|
||||
|
|
|
|||
|
|
@ -38,6 +38,39 @@ import {
|
|||
translateMSTeamsDmConversationIdForGraph,
|
||||
wasMSTeamsBotMentioned,
|
||||
} from "../inbound.js";
|
||||
|
||||
function extractTextFromHtmlAttachments(attachments: MSTeamsAttachmentLike[]): string {
|
||||
for (const attachment of attachments) {
|
||||
if (attachment.contentType !== "text/html") {
|
||||
continue;
|
||||
}
|
||||
const raw =
|
||||
typeof attachment.content === "string"
|
||||
? attachment.content
|
||||
: typeof attachment.content?.text === "string"
|
||||
? attachment.content.text
|
||||
: typeof attachment.content?.body === "string"
|
||||
? attachment.content.body
|
||||
: "";
|
||||
if (!raw) {
|
||||
continue;
|
||||
}
|
||||
const text = raw
|
||||
.replace(/<at[^>]*>.*?<\/at>/gis, " ")
|
||||
.replace(/<a\b[^>]*href=["']([^"']+)["'][^>]*>(.*?)<\/a>/gis, "$2 $1")
|
||||
.replace(/<br\s*\/?>/gi, "\n")
|
||||
.replace(/<\/p>/gi, "\n")
|
||||
.replace(/<[^>]+>/g, " ")
|
||||
.replace(/ /gi, " ")
|
||||
.replace(/&/gi, "&")
|
||||
.replace(/\s+/g, " ")
|
||||
.trim();
|
||||
if (text) {
|
||||
return text;
|
||||
}
|
||||
}
|
||||
return "";
|
||||
}
|
||||
import type { MSTeamsMessageHandlerDeps } from "../monitor-handler.js";
|
||||
import {
|
||||
isMSTeamsGroupAllowed,
|
||||
|
|
@ -778,11 +811,12 @@ export function createMSTeamsMessageHandler(deps: MSTeamsMessageHandlerDeps) {
|
|||
|
||||
return async function handleTeamsMessage(context: MSTeamsTurnContext) {
|
||||
const activity = context.activity;
|
||||
const rawText = activity.text?.trim() ?? "";
|
||||
const text = stripMSTeamsMentionTags(rawText);
|
||||
const attachments = Array.isArray(activity.attachments)
|
||||
? (activity.attachments as unknown as MSTeamsAttachmentLike[])
|
||||
: [];
|
||||
const rawText = activity.text?.trim() ?? "";
|
||||
const htmlText = extractTextFromHtmlAttachments(attachments);
|
||||
const text = stripMSTeamsMentionTags(rawText || htmlText);
|
||||
const wasMentioned = wasMSTeamsBotMentioned(activity);
|
||||
const conversationId = normalizeMSTeamsConversationId(activity.conversation?.id ?? "");
|
||||
const replyToId = activity.replyToId ?? undefined;
|
||||
|
|
|
|||
|
|
@ -145,9 +145,28 @@ describe("createMSTeamsReplyDispatcher", () => {
|
|||
|
||||
expect(streamInstances).toHaveLength(1);
|
||||
expect(streamInstances[0]?.sendInformativeUpdate).toHaveBeenCalledTimes(1);
|
||||
expect(typingCallbacks.onReplyStart).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("sends native typing indicator for channel conversations by default", async () => {
|
||||
createDispatcher("channel");
|
||||
const options = createReplyDispatcherWithTypingMock.mock.calls[0]?.[0];
|
||||
|
||||
await options.onReplyStart?.();
|
||||
|
||||
expect(streamInstances).toHaveLength(0);
|
||||
expect(typingCallbacks.onReplyStart).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("skips native typing indicator when typingIndicator=false", async () => {
|
||||
createDispatcher("channel", { typingIndicator: false });
|
||||
const options = createReplyDispatcherWithTypingMock.mock.calls[0]?.[0];
|
||||
|
||||
await options.onReplyStart?.();
|
||||
|
||||
expect(typingCallbacks.onReplyStart).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("only sends the informative status update once", async () => {
|
||||
createDispatcher("personal");
|
||||
const options = createReplyDispatcherWithTypingMock.mock.calls[0]?.[0];
|
||||
|
|
|
|||
|
|
@ -110,6 +110,8 @@ export function createMSTeamsReplyDispatcher(params: {
|
|||
|
||||
const blockStreamingEnabled =
|
||||
typeof msteamsCfg?.blockStreaming === "boolean" ? msteamsCfg.blockStreaming : false;
|
||||
const typingIndicatorEnabled =
|
||||
typeof msteamsCfg?.typingIndicator === "boolean" ? msteamsCfg.typingIndicator : true;
|
||||
|
||||
const pendingMessages: MSTeamsRenderedMessage[] = [];
|
||||
|
||||
|
|
@ -210,7 +212,10 @@ export function createMSTeamsReplyDispatcher(params: {
|
|||
humanDelay: core.channel.reply.resolveHumanDelayConfig(params.cfg, params.agentId),
|
||||
onReplyStart: async () => {
|
||||
await streamController.onReplyStart();
|
||||
await typingCallbacks?.onReplyStart?.();
|
||||
// Avoid duplicate typing UX in DMs: stream status already shows progress.
|
||||
if (typingIndicatorEnabled && !streamController.hasStream()) {
|
||||
await typingCallbacks?.onReplyStart?.();
|
||||
}
|
||||
},
|
||||
typingCallbacks,
|
||||
deliver: async (payload) => {
|
||||
|
|
|
|||
|
|
@ -1,7 +1,11 @@
|
|||
import { describe, expect, it, vi } from "vitest";
|
||||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
import { TeamsHttpStream } from "./streaming-message.js";
|
||||
|
||||
describe("TeamsHttpStream", () => {
|
||||
afterEach(() => {
|
||||
vi.useRealTimers();
|
||||
});
|
||||
|
||||
it("sends first chunk as typing activity with streaminfo", async () => {
|
||||
const sent: unknown[] = [];
|
||||
const stream = new TeamsHttpStream({
|
||||
|
|
@ -203,4 +207,40 @@ describe("TeamsHttpStream", () => {
|
|||
await stream.finalize();
|
||||
expect(sendActivity.mock.calls.length).toBe(callCount);
|
||||
});
|
||||
|
||||
it("stops streaming before stream age timeout and finalizes with last good text", async () => {
|
||||
vi.useFakeTimers();
|
||||
|
||||
const sent: unknown[] = [];
|
||||
const sendActivity = vi.fn(async (activity) => {
|
||||
sent.push(activity);
|
||||
return { id: "stream-1" };
|
||||
});
|
||||
const stream = new TeamsHttpStream({ sendActivity, throttleMs: 1 });
|
||||
|
||||
stream.update("Hello, this is a long enough response for streaming to begin.");
|
||||
await vi.advanceTimersByTimeAsync(1);
|
||||
|
||||
stream.update(
|
||||
"Hello, this is a long enough response for streaming to begin. More text before timeout.",
|
||||
);
|
||||
await vi.advanceTimersByTimeAsync(1);
|
||||
|
||||
vi.setSystemTime(new Date(Date.now() + 45_001));
|
||||
stream.update(
|
||||
"Hello, this is a long enough response for streaming to begin. More text before timeout. Even more text after timeout.",
|
||||
);
|
||||
await vi.advanceTimersByTimeAsync(1);
|
||||
|
||||
expect(stream.isFailed).toBe(true);
|
||||
|
||||
const finalActivity = sent.find((a) => (a as Record<string, unknown>).type === "message") as
|
||||
| Record<string, unknown>
|
||||
| undefined;
|
||||
|
||||
expect(finalActivity).toBeDefined();
|
||||
expect(finalActivity!.text).toBe(
|
||||
"Hello, this is a long enough response for streaming to begin. More text before timeout.",
|
||||
);
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -21,6 +21,12 @@ const MIN_INITIAL_CHARS = 20;
|
|||
/** Teams message text limit. */
|
||||
const TEAMS_MAX_CHARS = 4000;
|
||||
|
||||
/**
|
||||
* Stop streaming before Teams expires the content stream server-side.
|
||||
* The exact service limit is opaque, so stay comfortably under it.
|
||||
*/
|
||||
const MAX_STREAM_AGE_MS = 45_000;
|
||||
|
||||
type StreamSendFn = (activity: Record<string, unknown>) => Promise<{ id?: string } | unknown>;
|
||||
|
||||
export type TeamsStreamOptions = {
|
||||
|
|
@ -77,6 +83,7 @@ export class TeamsHttpStream {
|
|||
private finalized = false;
|
||||
private streamFailed = false;
|
||||
private lastStreamedText = "";
|
||||
private streamStartedAt: number | undefined = undefined;
|
||||
private loop: DraftStreamLoop;
|
||||
|
||||
constructor(options: TeamsStreamOptions) {
|
||||
|
|
@ -142,6 +149,15 @@ export class TeamsHttpStream {
|
|||
return;
|
||||
}
|
||||
|
||||
// Stop early before Teams expires the stream server-side. finalize() will
|
||||
// close the stream with the last good content, and reply-stream-controller
|
||||
// will deliver any remaining suffix via normal fallback delivery.
|
||||
if (this.streamStartedAt && Date.now() - this.streamStartedAt >= MAX_STREAM_AGE_MS) {
|
||||
this.streamFailed = true;
|
||||
void this.finalize();
|
||||
return;
|
||||
}
|
||||
|
||||
// Don't append cursor — Teams requires each chunk to be a prefix of subsequent chunks.
|
||||
// The cursor character would cause "content should contain previously streamed content" errors.
|
||||
this.loop.update(this.accumulatedText);
|
||||
|
|
@ -256,6 +272,9 @@ export class TeamsHttpStream {
|
|||
|
||||
try {
|
||||
const response = await this.sendActivity(activity);
|
||||
if (!this.streamStartedAt) {
|
||||
this.streamStartedAt = Date.now();
|
||||
}
|
||||
if (!this.streamId) {
|
||||
this.streamId = extractId(response);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -90,6 +90,8 @@ export type MSTeamsConfig = {
|
|||
textChunkLimit?: number;
|
||||
/** Chunking mode: "length" (default) splits by size; "newline" splits on every newline. */
|
||||
chunkMode?: "length" | "newline";
|
||||
/** Send native Teams typing indicator before replies. Default: true for groups/channels; DMs use informative stream status. */
|
||||
typingIndicator?: boolean;
|
||||
/** Enable progressive block-by-block message delivery instead of a single reply. */
|
||||
blockStreaming?: boolean;
|
||||
/** Merge streamed block replies before sending. */
|
||||
|
|
|
|||
|
|
@ -1534,6 +1534,7 @@ export const MSTeamsConfigSchema = z
|
|||
contextVisibility: ContextVisibilityModeSchema.optional(),
|
||||
textChunkLimit: z.number().int().positive().optional(),
|
||||
chunkMode: z.enum(["length", "newline"]).optional(),
|
||||
typingIndicator: z.boolean().optional(),
|
||||
blockStreaming: z.boolean().optional(),
|
||||
blockStreamingCoalesce: BlockStreamingCoalesceSchema.optional(),
|
||||
mediaAllowHosts: z.array(z.string()).optional(),
|
||||
|
|
|
|||
Loading…
Reference in New Issue