diff --git a/extensions/msteams/src/monitor-handler/message-handler.ts b/extensions/msteams/src/monitor-handler/message-handler.ts index e3ad16f5781..48d8289b5a3 100644 --- a/extensions/msteams/src/monitor-handler/message-handler.ts +++ b/extensions/msteams/src/monitor-handler/message-handler.ts @@ -620,6 +620,7 @@ export function createMSTeamsMessageHandler(deps: MSTeamsMessageHandlerDeps) { const { dispatcher, replyOptions, markDispatchIdle } = createMSTeamsReplyDispatcher({ cfg, agentId: route.agentId, + sessionKey: route.sessionKey, accountId: route.accountId, runtime, log, diff --git a/extensions/msteams/src/reply-dispatcher.test.ts b/extensions/msteams/src/reply-dispatcher.test.ts index b2fda18f487..09a5072fbed 100644 --- a/extensions/msteams/src/reply-dispatcher.test.ts +++ b/extensions/msteams/src/reply-dispatcher.test.ts @@ -3,6 +3,7 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; const createChannelReplyPipelineMock = vi.hoisted(() => vi.fn()); const createReplyDispatcherWithTypingMock = vi.hoisted(() => vi.fn()); const getMSTeamsRuntimeMock = vi.hoisted(() => vi.fn()); +const enqueueSystemEventMock = vi.hoisted(() => vi.fn()); const renderReplyPayloadsToMessagesMock = vi.hoisted(() => vi.fn(() => [])); const sendMSTeamsMessagesMock = vi.hoisted(() => vi.fn(async () => [])); const streamInstances = vi.hoisted( @@ -86,6 +87,9 @@ describe("createMSTeamsReplyDispatcher", () => { })); getMSTeamsRuntimeMock.mockReturnValue({ + system: { + enqueueSystemEvent: enqueueSystemEventMock, + }, channel: { text: { resolveChunkMode: vi.fn(() => "length"), @@ -102,10 +106,12 @@ describe("createMSTeamsReplyDispatcher", () => { function createDispatcher( conversationType: string = "personal", msteamsConfig: Record = {}, + extraParams: { onSentMessageIds?: (ids: string[]) => void } = {}, ) { return createMSTeamsReplyDispatcher({ cfg: { channels: { msteams: msteamsConfig } } as never, agentId: "agent", + sessionKey: "agent:main:main", runtime: { error: vi.fn() } as never, log: { debug: vi.fn(), error: vi.fn(), warn: vi.fn() } as never, adapter: { @@ -127,6 +133,7 @@ describe("createMSTeamsReplyDispatcher", () => { } as never, replyStyle: "thread", textLimit: 4000, + ...extraParams, }); } @@ -206,6 +213,54 @@ describe("createMSTeamsReplyDispatcher", () => { expect(sendMSTeamsMessagesMock).not.toHaveBeenCalled(); }); + + it("queues a system event when some queued Teams messages fail to send", async () => { + const onSentMessageIds = vi.fn(); + renderReplyPayloadsToMessagesMock.mockReturnValue([ + { content: "one" }, + { content: "two" }, + ] as never); + sendMSTeamsMessagesMock + .mockRejectedValueOnce(Object.assign(new Error("gateway timeout"), { statusCode: 502 })) + .mockResolvedValueOnce(["id-1"] as never) + .mockRejectedValueOnce(Object.assign(new Error("gateway timeout"), { statusCode: 502 })); + + const dispatcher = createDispatcher( + "personal", + { blockStreaming: false }, + { onSentMessageIds }, + ); + const options = createReplyDispatcherWithTypingMock.mock.calls[0]?.[0]; + + await options.deliver({ text: "block content" }); + await dispatcher.markDispatchIdle(); + + expect(onSentMessageIds).toHaveBeenCalledWith(["id-1"]); + expect(enqueueSystemEventMock).toHaveBeenCalledWith( + expect.stringContaining("Microsoft Teams delivery failed"), + expect.objectContaining({ + sessionKey: "agent:main:main", + contextKey: "msteams:delivery-failure:conv", + }), + ); + expect(enqueueSystemEventMock).toHaveBeenCalledWith( + expect.stringContaining("The user may not have received the full reply"), + expect.any(Object), + ); + }); + + it("does not queue a delivery-failure system event when Teams send succeeds", async () => { + renderReplyPayloadsToMessagesMock.mockReturnValue([{ content: "hello" }] as never); + sendMSTeamsMessagesMock.mockResolvedValue(["id-1"] as never); + + const dispatcher = createDispatcher("personal", { blockStreaming: false }); + const options = createReplyDispatcherWithTypingMock.mock.calls[0]?.[0]; + + await options.deliver({ text: "block content" }); + await dispatcher.markDispatchIdle(); + + expect(enqueueSystemEventMock).not.toHaveBeenCalled(); + }); }); describe("pickInformativeStatusText", () => { diff --git a/extensions/msteams/src/reply-dispatcher.ts b/extensions/msteams/src/reply-dispatcher.ts index bfcd1740d41..f235b6c0958 100644 --- a/extensions/msteams/src/reply-dispatcher.ts +++ b/extensions/msteams/src/reply-dispatcher.ts @@ -31,6 +31,7 @@ export { pickInformativeStatusText } from "./reply-stream-controller.js"; export function createMSTeamsReplyDispatcher(params: { cfg: OpenClawConfig; agentId: string; + sessionKey: string; accountId?: string; runtime: RuntimeEnv; log: MSTeamsMonitorLogger; @@ -134,6 +135,32 @@ export function createMSTeamsReplyDispatcher(params: { }); }; + const queueDeliveryFailureSystemEvent = (failure: { + failed: number; + total: number; + error: unknown; + }) => { + const classification = classifyMSTeamsSendError(failure.error); + const errorText = formatUnknownError(failure.error); + const failedAll = failure.failed >= failure.total; + const summary = failedAll + ? "the previous reply was not delivered" + : `${failure.failed} of ${failure.total} message blocks were not delivered`; + const sentences = [ + `Microsoft Teams delivery failed: ${summary}.`, + `The user may not have received ${failedAll ? "that reply" : "the full reply"}.`, + `Error: ${errorText}.`, + classification.statusCode != null ? `Status: ${classification.statusCode}.` : undefined, + classification.kind === "transient" || classification.kind === "throttled" + ? "Retrying later may succeed." + : undefined, + ].filter(Boolean); + core.system.enqueueSystemEvent(sentences.join(" "), { + sessionKey: params.sessionKey, + contextKey: `msteams:delivery-failure:${params.conversationRef.conversation?.id ?? "unknown"}`, + }); + }; + const flushPendingMessages = async () => { if (pendingMessages.length === 0) { return; @@ -143,15 +170,17 @@ export function createMSTeamsReplyDispatcher(params: { let ids: string[]; try { ids = await sendMessages(toSend); - } catch { + } catch (batchError) { ids = []; let failed = 0; + let lastFailedError: unknown = batchError; for (const msg of toSend) { try { const msgIds = await sendMessages([msg]); ids.push(...msgIds); - } catch { + } catch (msgError) { failed += 1; + lastFailedError = msgError; params.log.debug?.("individual message send failed, continuing with remaining blocks"); } } @@ -160,6 +189,11 @@ export function createMSTeamsReplyDispatcher(params: { failed, total, }); + queueDeliveryFailureSystemEvent({ + failed, + total, + error: lastFailedError, + }); } } if (ids.length > 0) {