diff --git a/extensions/msteams/src/errors.test.ts b/extensions/msteams/src/errors.test.ts index dd8a4f3fdab..5e78e41157a 100644 --- a/extensions/msteams/src/errors.test.ts +++ b/extensions/msteams/src/errors.test.ts @@ -18,6 +18,25 @@ describe("msteams errors", () => { expect(classifyMSTeamsSendError({ statusCode: 403 }).kind).toBe("auth"); }); + it("classifies ContentStreamNotAllowed as permanent instead of auth", () => { + expect( + classifyMSTeamsSendError({ + statusCode: 403, + response: { + body: { + error: { + code: "ContentStreamNotAllowed", + }, + }, + }, + }), + ).toMatchObject({ + kind: "permanent", + statusCode: 403, + errorCode: "ContentStreamNotAllowed", + }); + }); + it("classifies throttling errors and parses retry-after", () => { expect(classifyMSTeamsSendError({ statusCode: 429, retryAfter: "1.5" })).toMatchObject({ kind: "throttled", @@ -43,6 +62,12 @@ describe("msteams errors", () => { it("provides actionable hints for common cases", () => { expect(formatMSTeamsSendErrorHint({ kind: "auth" })).toContain("msteams"); expect(formatMSTeamsSendErrorHint({ kind: "throttled" })).toContain("throttled"); + expect( + formatMSTeamsSendErrorHint({ + kind: "permanent", + errorCode: "ContentStreamNotAllowed", + }), + ).toContain("expired the content stream"); }); describe("isRevokedProxyError", () => { diff --git a/extensions/msteams/src/errors.ts b/extensions/msteams/src/errors.ts index 985cdb5fff0..62449e126f9 100644 --- a/extensions/msteams/src/errors.ts +++ b/extensions/msteams/src/errors.ts @@ -63,6 +63,32 @@ function extractStatusCode(err: unknown): number | null { return null; } +function extractErrorCode(err: unknown): string | null { + if (!isRecord(err)) { + return null; + } + + const direct = err.code; + if (typeof direct === "string" && direct.trim()) { + return direct; + } + + const response = err.response; + if (!isRecord(response)) { + return null; + } + + const body = response.body; + if (isRecord(body)) { + const error = body.error; + if (isRecord(error) && typeof error.code === "string" && error.code.trim()) { + return error.code; + } + } + + return null; +} + function extractRetryAfterMs(err: unknown): number | null { if (!isRecord(err)) { return null; @@ -129,6 +155,7 @@ export type MSTeamsSendErrorClassification = { kind: MSTeamsSendErrorKind; statusCode?: number; retryAfterMs?: number; + errorCode?: string; }; /** @@ -142,9 +169,17 @@ export type MSTeamsSendErrorClassification = { export function classifyMSTeamsSendError(err: unknown): MSTeamsSendErrorClassification { const statusCode = extractStatusCode(err); const retryAfterMs = extractRetryAfterMs(err); + const errorCode = extractErrorCode(err) ?? undefined; - if (statusCode === 401 || statusCode === 403) { - return { kind: "auth", statusCode }; + if (statusCode === 401) { + return { kind: "auth", statusCode, errorCode }; + } + + if (statusCode === 403) { + if (errorCode === "ContentStreamNotAllowed") { + return { kind: "permanent", statusCode, errorCode }; + } + return { kind: "auth", statusCode, errorCode }; } if (statusCode === 429) { @@ -152,6 +187,7 @@ export function classifyMSTeamsSendError(err: unknown): MSTeamsSendErrorClassifi kind: "throttled", statusCode, retryAfterMs: retryAfterMs ?? undefined, + errorCode, }; } @@ -160,17 +196,19 @@ export function classifyMSTeamsSendError(err: unknown): MSTeamsSendErrorClassifi kind: "transient", statusCode, retryAfterMs: retryAfterMs ?? undefined, + errorCode, }; } if (statusCode != null && statusCode >= 400) { - return { kind: "permanent", statusCode }; + return { kind: "permanent", statusCode, errorCode }; } return { kind: "unknown", statusCode: statusCode ?? undefined, retryAfterMs: retryAfterMs ?? undefined, + errorCode, }; } @@ -195,6 +233,9 @@ export function formatMSTeamsSendErrorHint( if (classification.kind === "auth") { return "check msteams appId/appPassword/tenantId (or env vars MSTEAMS_APP_ID/MSTEAMS_APP_PASSWORD/MSTEAMS_TENANT_ID)"; } + if (classification.errorCode === "ContentStreamNotAllowed") { + return "Teams expired the content stream; stop streaming earlier and fall back to normal message delivery"; + } if (classification.kind === "throttled") { return "Teams throttled the bot; backing off may help"; } diff --git a/extensions/msteams/src/monitor-handler/message-handler.ts b/extensions/msteams/src/monitor-handler/message-handler.ts index 948f7a43357..2cc4ac7d09e 100644 --- a/extensions/msteams/src/monitor-handler/message-handler.ts +++ b/extensions/msteams/src/monitor-handler/message-handler.ts @@ -38,6 +38,39 @@ import { translateMSTeamsDmConversationIdForGraph, wasMSTeamsBotMentioned, } from "../inbound.js"; + +function extractTextFromHtmlAttachments(attachments: MSTeamsAttachmentLike[]): string { + for (const attachment of attachments) { + if (attachment.contentType !== "text/html") { + continue; + } + const raw = + typeof attachment.content === "string" + ? attachment.content + : typeof attachment.content?.text === "string" + ? attachment.content.text + : typeof attachment.content?.body === "string" + ? attachment.content.body + : ""; + if (!raw) { + continue; + } + const text = raw + .replace(/]*>.*?<\/at>/gis, " ") + .replace(/]*href=["']([^"']+)["'][^>]*>(.*?)<\/a>/gis, "$2 $1") + .replace(//gi, "\n") + .replace(/<\/p>/gi, "\n") + .replace(/<[^>]+>/g, " ") + .replace(/ /gi, " ") + .replace(/&/gi, "&") + .replace(/\s+/g, " ") + .trim(); + if (text) { + return text; + } + } + return ""; +} import type { MSTeamsMessageHandlerDeps } from "../monitor-handler.js"; import { isMSTeamsGroupAllowed, @@ -778,11 +811,12 @@ export function createMSTeamsMessageHandler(deps: MSTeamsMessageHandlerDeps) { return async function handleTeamsMessage(context: MSTeamsTurnContext) { const activity = context.activity; - const rawText = activity.text?.trim() ?? ""; - const text = stripMSTeamsMentionTags(rawText); const attachments = Array.isArray(activity.attachments) ? (activity.attachments as unknown as MSTeamsAttachmentLike[]) : []; + const rawText = activity.text?.trim() ?? ""; + const htmlText = extractTextFromHtmlAttachments(attachments); + const text = stripMSTeamsMentionTags(rawText || htmlText); const wasMentioned = wasMSTeamsBotMentioned(activity); const conversationId = normalizeMSTeamsConversationId(activity.conversation?.id ?? ""); const replyToId = activity.replyToId ?? undefined; diff --git a/extensions/msteams/src/reply-dispatcher.test.ts b/extensions/msteams/src/reply-dispatcher.test.ts index 09a5072fbed..deb43da447f 100644 --- a/extensions/msteams/src/reply-dispatcher.test.ts +++ b/extensions/msteams/src/reply-dispatcher.test.ts @@ -145,9 +145,28 @@ describe("createMSTeamsReplyDispatcher", () => { expect(streamInstances).toHaveLength(1); expect(streamInstances[0]?.sendInformativeUpdate).toHaveBeenCalledTimes(1); + expect(typingCallbacks.onReplyStart).not.toHaveBeenCalled(); + }); + + it("sends native typing indicator for channel conversations by default", async () => { + createDispatcher("channel"); + const options = createReplyDispatcherWithTypingMock.mock.calls[0]?.[0]; + + await options.onReplyStart?.(); + + expect(streamInstances).toHaveLength(0); expect(typingCallbacks.onReplyStart).toHaveBeenCalledTimes(1); }); + it("skips native typing indicator when typingIndicator=false", async () => { + createDispatcher("channel", { typingIndicator: false }); + const options = createReplyDispatcherWithTypingMock.mock.calls[0]?.[0]; + + await options.onReplyStart?.(); + + expect(typingCallbacks.onReplyStart).not.toHaveBeenCalled(); + }); + it("only sends the informative status update once", async () => { createDispatcher("personal"); const options = createReplyDispatcherWithTypingMock.mock.calls[0]?.[0]; diff --git a/extensions/msteams/src/reply-dispatcher.ts b/extensions/msteams/src/reply-dispatcher.ts index f235b6c0958..a1b0ca3d0d3 100644 --- a/extensions/msteams/src/reply-dispatcher.ts +++ b/extensions/msteams/src/reply-dispatcher.ts @@ -110,6 +110,8 @@ export function createMSTeamsReplyDispatcher(params: { const blockStreamingEnabled = typeof msteamsCfg?.blockStreaming === "boolean" ? msteamsCfg.blockStreaming : false; + const typingIndicatorEnabled = + typeof msteamsCfg?.typingIndicator === "boolean" ? msteamsCfg.typingIndicator : true; const pendingMessages: MSTeamsRenderedMessage[] = []; @@ -210,7 +212,10 @@ export function createMSTeamsReplyDispatcher(params: { humanDelay: core.channel.reply.resolveHumanDelayConfig(params.cfg, params.agentId), onReplyStart: async () => { await streamController.onReplyStart(); - await typingCallbacks?.onReplyStart?.(); + // Avoid duplicate typing UX in DMs: stream status already shows progress. + if (typingIndicatorEnabled && !streamController.hasStream()) { + await typingCallbacks?.onReplyStart?.(); + } }, typingCallbacks, deliver: async (payload) => { diff --git a/extensions/msteams/src/streaming-message.test.ts b/extensions/msteams/src/streaming-message.test.ts index 0d17d7c764e..3d255888f41 100644 --- a/extensions/msteams/src/streaming-message.test.ts +++ b/extensions/msteams/src/streaming-message.test.ts @@ -1,7 +1,11 @@ -import { describe, expect, it, vi } from "vitest"; +import { afterEach, describe, expect, it, vi } from "vitest"; import { TeamsHttpStream } from "./streaming-message.js"; describe("TeamsHttpStream", () => { + afterEach(() => { + vi.useRealTimers(); + }); + it("sends first chunk as typing activity with streaminfo", async () => { const sent: unknown[] = []; const stream = new TeamsHttpStream({ @@ -203,4 +207,40 @@ describe("TeamsHttpStream", () => { await stream.finalize(); expect(sendActivity.mock.calls.length).toBe(callCount); }); + + it("stops streaming before stream age timeout and finalizes with last good text", async () => { + vi.useFakeTimers(); + + const sent: unknown[] = []; + const sendActivity = vi.fn(async (activity) => { + sent.push(activity); + return { id: "stream-1" }; + }); + const stream = new TeamsHttpStream({ sendActivity, throttleMs: 1 }); + + stream.update("Hello, this is a long enough response for streaming to begin."); + await vi.advanceTimersByTimeAsync(1); + + stream.update( + "Hello, this is a long enough response for streaming to begin. More text before timeout.", + ); + await vi.advanceTimersByTimeAsync(1); + + vi.setSystemTime(new Date(Date.now() + 45_001)); + stream.update( + "Hello, this is a long enough response for streaming to begin. More text before timeout. Even more text after timeout.", + ); + await vi.advanceTimersByTimeAsync(1); + + expect(stream.isFailed).toBe(true); + + const finalActivity = sent.find((a) => (a as Record).type === "message") as + | Record + | undefined; + + expect(finalActivity).toBeDefined(); + expect(finalActivity!.text).toBe( + "Hello, this is a long enough response for streaming to begin. More text before timeout.", + ); + }); }); diff --git a/extensions/msteams/src/streaming-message.ts b/extensions/msteams/src/streaming-message.ts index 25ee75efb68..23a9f2bbf1b 100644 --- a/extensions/msteams/src/streaming-message.ts +++ b/extensions/msteams/src/streaming-message.ts @@ -21,6 +21,12 @@ const MIN_INITIAL_CHARS = 20; /** Teams message text limit. */ const TEAMS_MAX_CHARS = 4000; +/** + * Stop streaming before Teams expires the content stream server-side. + * The exact service limit is opaque, so stay comfortably under it. + */ +const MAX_STREAM_AGE_MS = 45_000; + type StreamSendFn = (activity: Record) => Promise<{ id?: string } | unknown>; export type TeamsStreamOptions = { @@ -77,6 +83,7 @@ export class TeamsHttpStream { private finalized = false; private streamFailed = false; private lastStreamedText = ""; + private streamStartedAt: number | undefined = undefined; private loop: DraftStreamLoop; constructor(options: TeamsStreamOptions) { @@ -142,6 +149,15 @@ export class TeamsHttpStream { return; } + // Stop early before Teams expires the stream server-side. finalize() will + // close the stream with the last good content, and reply-stream-controller + // will deliver any remaining suffix via normal fallback delivery. + if (this.streamStartedAt && Date.now() - this.streamStartedAt >= MAX_STREAM_AGE_MS) { + this.streamFailed = true; + void this.finalize(); + return; + } + // Don't append cursor — Teams requires each chunk to be a prefix of subsequent chunks. // The cursor character would cause "content should contain previously streamed content" errors. this.loop.update(this.accumulatedText); @@ -256,6 +272,9 @@ export class TeamsHttpStream { try { const response = await this.sendActivity(activity); + if (!this.streamStartedAt) { + this.streamStartedAt = Date.now(); + } if (!this.streamId) { this.streamId = extractId(response); } diff --git a/src/config/types.msteams.ts b/src/config/types.msteams.ts index 6530c169beb..5c88f2bbb25 100644 --- a/src/config/types.msteams.ts +++ b/src/config/types.msteams.ts @@ -90,6 +90,8 @@ export type MSTeamsConfig = { textChunkLimit?: number; /** Chunking mode: "length" (default) splits by size; "newline" splits on every newline. */ chunkMode?: "length" | "newline"; + /** Send native Teams typing indicator before replies. Default: true for groups/channels; DMs use informative stream status. */ + typingIndicator?: boolean; /** Enable progressive block-by-block message delivery instead of a single reply. */ blockStreaming?: boolean; /** Merge streamed block replies before sending. */ diff --git a/src/config/zod-schema.providers-core.ts b/src/config/zod-schema.providers-core.ts index 9279cfd4567..4fe886330fe 100644 --- a/src/config/zod-schema.providers-core.ts +++ b/src/config/zod-schema.providers-core.ts @@ -1534,6 +1534,7 @@ export const MSTeamsConfigSchema = z contextVisibility: ContextVisibilityModeSchema.optional(), textChunkLimit: z.number().int().positive().optional(), chunkMode: z.enum(["length", "newline"]).optional(), + typingIndicator: z.boolean().optional(), blockStreaming: z.boolean().optional(), blockStreamingCoalesce: BlockStreamingCoalesceSchema.optional(), mediaAllowHosts: z.array(z.string()).optional(),