From 0c926a2c5e82e5fa01eee151618f2d8a05c160de Mon Sep 17 00:00:00 2001 From: Teconomix Date: Sat, 14 Mar 2026 07:53:23 +0100 Subject: [PATCH] fix(mattermost): carry thread context to non-inbound reply paths (#44283) Merged via squash. Prepared head SHA: 2846a6cfa959019d3ed811ccafae6b757db3bdf3 Co-authored-by: teconomix <6959299+teconomix@users.noreply.github.com> Co-authored-by: mukhtharcm <56378562+mukhtharcm@users.noreply.github.com> Reviewed-by: @mukhtharcm --- CHANGELOG.md | 1 + extensions/mattermost/src/channel.test.ts | 47 ++++++++ extensions/mattermost/src/channel.ts | 17 ++- .../reply/dispatch-from-config.test.ts | 114 +++++++++++++++++- src/auto-reply/reply/dispatch-from-config.ts | 15 ++- src/auto-reply/reply/route-reply.test.ts | 37 ++++++ src/auto-reply/reply/route-reply.ts | 4 +- .../monitor.tool-result.test-harness.ts | 16 ++- src/slack/monitor.test-helpers.ts | 18 +-- 9 files changed, 244 insertions(+), 25 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 85ad205ff0e..25bad54390e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -75,6 +75,7 @@ Docs: https://docs.openclaw.ai - Config/discovery: accept `discovery.wideArea.domain` in strict config validation so unicast DNS-SD gateway configs no longer fail with an unrecognized-key error. (#35615) Thanks @ingyukoh. - Telegram/media errors: redact Telegram file URLs before building media fetch errors so failed inbound downloads do not leak bot tokens into logs. Thanks @space08. - Agents/failover: normalize abort-wrapped `429 RESOURCE_EXHAUSTED` provider failures before abort short-circuiting so wrapped Google/Vertex rate limits continue across configured fallback models, including the embedded runner prompt-error path. (#39820) Thanks @lupuletic. +- Mattermost/thread routing: non-inbound reply paths (TUI/WebUI turns, tool-call callbacks, subagent responses) now correctly route to the originating Mattermost thread when `replyToMode: "all"` is active; also prevents stale `origin.threadId` metadata from resurrecting cleared thread routes. (#44283) thanks @teconomix ## 2026.3.12 diff --git a/extensions/mattermost/src/channel.test.ts b/extensions/mattermost/src/channel.test.ts index c188a8e6719..5ac333b2e6c 100644 --- a/extensions/mattermost/src/channel.test.ts +++ b/extensions/mattermost/src/channel.test.ts @@ -355,6 +355,53 @@ describe("mattermostPlugin", () => { }), ); }); + + it("uses threadId as fallback when replyToId is absent (sendText)", async () => { + const sendText = mattermostPlugin.outbound?.sendText; + if (!sendText) { + return; + } + + await sendText({ + to: "channel:CHAN1", + text: "hello", + accountId: "default", + threadId: "post-root", + } as any); + + expect(sendMessageMattermostMock).toHaveBeenCalledWith( + "channel:CHAN1", + "hello", + expect.objectContaining({ + accountId: "default", + replyToId: "post-root", + }), + ); + }); + + it("uses threadId as fallback when replyToId is absent (sendMedia)", async () => { + const sendMedia = mattermostPlugin.outbound?.sendMedia; + if (!sendMedia) { + return; + } + + await sendMedia({ + to: "channel:CHAN1", + text: "caption", + mediaUrl: "https://example.com/image.png", + accountId: "default", + threadId: "post-root", + } as any); + + expect(sendMessageMattermostMock).toHaveBeenCalledWith( + "channel:CHAN1", + "caption", + expect.objectContaining({ + accountId: "default", + replyToId: "post-root", + }), + ); + }); }); describe("config", () => { diff --git a/extensions/mattermost/src/channel.ts b/extensions/mattermost/src/channel.ts index c872b8d5085..45c4d863c7c 100644 --- a/extensions/mattermost/src/channel.ts +++ b/extensions/mattermost/src/channel.ts @@ -390,21 +390,30 @@ export const mattermostPlugin: ChannelPlugin = { } return { ok: true, to: trimmed }; }, - sendText: async ({ cfg, to, text, accountId, replyToId }) => { + sendText: async ({ cfg, to, text, accountId, replyToId, threadId }) => { const result = await sendMessageMattermost(to, text, { cfg, accountId: accountId ?? undefined, - replyToId: replyToId ?? undefined, + replyToId: replyToId ?? (threadId != null ? String(threadId) : undefined), }); return { channel: "mattermost", ...result }; }, - sendMedia: async ({ cfg, to, text, mediaUrl, mediaLocalRoots, accountId, replyToId }) => { + sendMedia: async ({ + cfg, + to, + text, + mediaUrl, + mediaLocalRoots, + accountId, + replyToId, + threadId, + }) => { const result = await sendMessageMattermost(to, text, { cfg, accountId: accountId ?? undefined, mediaUrl, mediaLocalRoots, - replyToId: replyToId ?? undefined, + replyToId: replyToId ?? (threadId != null ? String(threadId) : undefined), }); return { channel: "mattermost", ...result }; }, diff --git a/src/auto-reply/reply/dispatch-from-config.test.ts b/src/auto-reply/reply/dispatch-from-config.test.ts index 87e77785bbb..666964eb865 100644 --- a/src/auto-reply/reply/dispatch-from-config.test.ts +++ b/src/auto-reply/reply/dispatch-from-config.test.ts @@ -41,6 +41,12 @@ const acpMocks = vi.hoisted(() => ({ const sessionBindingMocks = vi.hoisted(() => ({ listBySession: vi.fn<(targetSessionKey: string) => SessionBindingRecord[]>(() => []), })); +const sessionStoreMocks = vi.hoisted(() => ({ + currentEntry: undefined as Record | undefined, + loadSessionStore: vi.fn(() => ({})), + resolveStorePath: vi.fn(() => "/tmp/mock-sessions.json"), + resolveSessionStoreEntry: vi.fn(() => ({ existing: sessionStoreMocks.currentEntry })), +})); const ttsMocks = vi.hoisted(() => { const state = { synthesizeFinalAudio: false, @@ -77,9 +83,16 @@ vi.mock("./route-reply.js", () => ({ isRoutableChannel: (channel: string | undefined) => Boolean( channel && - ["telegram", "slack", "discord", "signal", "imessage", "whatsapp", "feishu"].includes( - channel, - ), + [ + "telegram", + "slack", + "discord", + "signal", + "imessage", + "whatsapp", + "feishu", + "mattermost", + ].includes(channel), ), routeReply: mocks.routeReply, })); @@ -100,6 +113,15 @@ vi.mock("../../logging/diagnostic.js", () => ({ logMessageProcessed: diagnosticMocks.logMessageProcessed, logSessionStateChange: diagnosticMocks.logSessionStateChange, })); +vi.mock("../../config/sessions.js", async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + loadSessionStore: sessionStoreMocks.loadSessionStore, + resolveStorePath: sessionStoreMocks.resolveStorePath, + resolveSessionStoreEntry: sessionStoreMocks.resolveSessionStoreEntry, + }; +}); vi.mock("../../plugins/hook-runner-global.js", () => ({ getGlobalHookRunner: () => hookMocks.runner, @@ -228,6 +250,10 @@ describe("dispatchReplyFromConfig", () => { acpMocks.requireAcpRuntimeBackend.mockReset(); sessionBindingMocks.listBySession.mockReset(); sessionBindingMocks.listBySession.mockReturnValue([]); + sessionStoreMocks.currentEntry = undefined; + sessionStoreMocks.loadSessionStore.mockClear(); + sessionStoreMocks.resolveStorePath.mockClear(); + sessionStoreMocks.resolveSessionStoreEntry.mockClear(); ttsMocks.state.synthesizeFinalAudio = false; ttsMocks.maybeApplyTtsToPayload.mockClear(); ttsMocks.normalizeTtsAutoMode.mockClear(); @@ -293,6 +319,88 @@ describe("dispatchReplyFromConfig", () => { ); }); + it("falls back to thread-scoped session key when current ctx has no MessageThreadId", async () => { + setNoAbort(); + mocks.routeReply.mockClear(); + sessionStoreMocks.currentEntry = { + deliveryContext: { + channel: "mattermost", + to: "channel:CHAN1", + accountId: "default", + }, + origin: { + threadId: "stale-origin-root", + }, + lastThreadId: "stale-origin-root", + }; + const cfg = emptyConfig; + const dispatcher = createDispatcher(); + const ctx = buildTestCtx({ + Provider: "webchat", + Surface: "webchat", + SessionKey: "agent:main:mattermost:channel:CHAN1:thread:post-root", + AccountId: "default", + MessageThreadId: undefined, + OriginatingChannel: "mattermost", + OriginatingTo: "channel:CHAN1", + ExplicitDeliverRoute: true, + }); + + const replyResolver = async () => ({ text: "hi" }) satisfies ReplyPayload; + await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver }); + + expect(mocks.routeReply).toHaveBeenCalledWith( + expect.objectContaining({ + channel: "mattermost", + to: "channel:CHAN1", + threadId: "post-root", + }), + ); + }); + + it("does not resurrect a cleared route thread from origin metadata", async () => { + setNoAbort(); + mocks.routeReply.mockClear(); + // Simulate the real store: lastThreadId and deliveryContext.threadId may be normalised from + // origin.threadId on read, but a non-thread session key must still route to channel root. + sessionStoreMocks.currentEntry = { + deliveryContext: { + channel: "mattermost", + to: "channel:CHAN1", + accountId: "default", + threadId: "stale-root", + }, + lastThreadId: "stale-root", + origin: { + threadId: "stale-root", + }, + }; + const cfg = emptyConfig; + const dispatcher = createDispatcher(); + const ctx = buildTestCtx({ + Provider: "webchat", + Surface: "webchat", + SessionKey: "agent:main:mattermost:channel:CHAN1", + AccountId: "default", + MessageThreadId: undefined, + OriginatingChannel: "mattermost", + OriginatingTo: "channel:CHAN1", + ExplicitDeliverRoute: true, + }); + + const replyResolver = async () => ({ text: "hi" }) satisfies ReplyPayload; + await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver }); + + const routeCall = mocks.routeReply.mock.calls[0]?.[0] as + | { channel?: string; to?: string; threadId?: string | number } + | undefined; + expect(routeCall).toMatchObject({ + channel: "mattermost", + to: "channel:CHAN1", + }); + expect(routeCall?.threadId).toBeUndefined(); + }); + it("forces suppressTyping when routing to a different originating channel", async () => { setNoAbort(); const cfg = emptyConfig; diff --git a/src/auto-reply/reply/dispatch-from-config.ts b/src/auto-reply/reply/dispatch-from-config.ts index 5b250b03362..b21fcabe80b 100644 --- a/src/auto-reply/reply/dispatch-from-config.ts +++ b/src/auto-reply/reply/dispatch-from-config.ts @@ -2,6 +2,7 @@ import { resolveSessionAgentId } from "../../agents/agent-scope.js"; import type { OpenClawConfig } from "../../config/config.js"; import { loadSessionStore, + parseSessionThreadInfo, resolveSessionStoreEntry, resolveStorePath, type SessionEntry, @@ -172,6 +173,12 @@ export async function dispatchReplyFromConfig(params: { const sessionStoreEntry = resolveSessionStoreLookup(ctx, cfg); const acpDispatchSessionKey = sessionStoreEntry.sessionKey ?? sessionKey; + // Restore route thread context only from the active turn or the thread-scoped session key. + // Do not read thread ids from the normalised session store here: `origin.threadId` can be + // folded back into lastThreadId/deliveryContext during store normalisation and resurrect a + // stale route after thread delivery was intentionally cleared. + const routeThreadId = + ctx.MessageThreadId ?? parseSessionThreadInfo(acpDispatchSessionKey).threadId; const inboundAudio = isInboundAudioContext(ctx); const sessionTtsAuto = normalizeTtsAutoMode(sessionStoreEntry.entry?.ttsAuto); const hookRunner = getGlobalHookRunner(); @@ -260,7 +267,7 @@ export async function dispatchReplyFromConfig(params: { to: originatingTo, sessionKey: ctx.SessionKey, accountId: ctx.AccountId, - threadId: ctx.MessageThreadId, + threadId: routeThreadId, cfg, abortSignal, mirror, @@ -289,7 +296,7 @@ export async function dispatchReplyFromConfig(params: { to: originatingTo, sessionKey: ctx.SessionKey, accountId: ctx.AccountId, - threadId: ctx.MessageThreadId, + threadId: routeThreadId, cfg, isGroup, groupId, @@ -519,7 +526,7 @@ export async function dispatchReplyFromConfig(params: { to: originatingTo, sessionKey: ctx.SessionKey, accountId: ctx.AccountId, - threadId: ctx.MessageThreadId, + threadId: routeThreadId, cfg, isGroup, groupId, @@ -571,7 +578,7 @@ export async function dispatchReplyFromConfig(params: { to: originatingTo, sessionKey: ctx.SessionKey, accountId: ctx.AccountId, - threadId: ctx.MessageThreadId, + threadId: routeThreadId, cfg, isGroup, groupId, diff --git a/src/auto-reply/reply/route-reply.test.ts b/src/auto-reply/reply/route-reply.test.ts index 62f91097223..bfae51e63c2 100644 --- a/src/auto-reply/reply/route-reply.test.ts +++ b/src/auto-reply/reply/route-reply.test.ts @@ -1,4 +1,5 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { mattermostPlugin } from "../../../extensions/mattermost/src/channel.js"; import { discordOutbound } from "../../channels/plugins/outbound/discord.js"; import { imessageOutbound } from "../../channels/plugins/outbound/imessage.js"; import { signalOutbound } from "../../channels/plugins/outbound/signal.js"; @@ -24,6 +25,7 @@ const mocks = vi.hoisted(() => ({ sendMessageSlack: vi.fn(async () => ({ messageId: "m1", channelId: "c1" })), sendMessageTelegram: vi.fn(async () => ({ messageId: "m1", chatId: "c1" })), sendMessageWhatsApp: vi.fn(async () => ({ messageId: "m1", toJid: "jid" })), + sendMessageMattermost: vi.fn(async () => ({ messageId: "m1", channelId: "c1" })), deliverOutboundPayloads: vi.fn(), })); @@ -46,6 +48,9 @@ vi.mock("../../web/outbound.js", () => ({ sendMessageWhatsApp: mocks.sendMessageWhatsApp, sendPollWhatsApp: mocks.sendMessageWhatsApp, })); +vi.mock("../../../extensions/mattermost/src/mattermost/send.js", () => ({ + sendMessageMattermost: mocks.sendMessageMattermost, +})); vi.mock("../../infra/outbound/deliver.js", async () => { const actual = await vi.importActual( "../../infra/outbound/deliver.js", @@ -335,6 +340,33 @@ describe("routeReply", () => { ); }); + it("uses threadId as replyToId for Mattermost when replyToId is missing", async () => { + mocks.deliverOutboundPayloads.mockResolvedValue([]); + await routeReply({ + payload: { text: "hi" }, + channel: "mattermost", + to: "channel:CHAN1", + threadId: "post-root", + cfg: { + channels: { + mattermost: { + enabled: true, + botToken: "test-token", + baseUrl: "https://chat.example.com", + }, + }, + } as unknown as OpenClawConfig, + }); + expect(mocks.deliverOutboundPayloads).toHaveBeenCalledWith( + expect.objectContaining({ + channel: "mattermost", + to: "channel:CHAN1", + replyToId: "post-root", + threadId: "post-root", + }), + ); + }); + it("sends multiple mediaUrls (caption only on first)", async () => { mocks.sendMessageSlack.mockClear(); await routeReply({ @@ -501,4 +533,9 @@ const defaultRegistry = createTestRegistry([ }), source: "test", }, + { + pluginId: "mattermost", + plugin: mattermostPlugin, + source: "test", + }, ]); diff --git a/src/auto-reply/reply/route-reply.ts b/src/auto-reply/reply/route-reply.ts index 8b3319698b2..a6f863d7d18 100644 --- a/src/auto-reply/reply/route-reply.ts +++ b/src/auto-reply/reply/route-reply.ts @@ -149,7 +149,9 @@ export async function routeReply(params: RouteReplyParams): Promise ({ upsertChannelPairingRequest: (...args: unknown[]) => upsertPairingRequestMock(...args), })); -vi.mock("../config/sessions.js", () => ({ - resolveStorePath: vi.fn(() => "/tmp/openclaw-sessions.json"), - updateLastRoute: (...args: unknown[]) => updateLastRouteMock(...args), - readSessionUpdatedAt: vi.fn(() => undefined), - recordSessionMetaFromInbound: vi.fn().mockResolvedValue(undefined), -})); +vi.mock("../config/sessions.js", async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + resolveStorePath: vi.fn(() => "/tmp/openclaw-sessions.json"), + updateLastRoute: (...args: unknown[]) => updateLastRouteMock(...args), + readSessionUpdatedAt: vi.fn(() => undefined), + recordSessionMetaFromInbound: vi.fn().mockResolvedValue(undefined), + }; +}); vi.mock("./client.js", () => ({ streamSignalEvents: (...args: unknown[]) => streamMock(...args), diff --git a/src/slack/monitor.test-helpers.ts b/src/slack/monitor.test-helpers.ts index 17b868fa972..99028f29a11 100644 --- a/src/slack/monitor.test-helpers.ts +++ b/src/slack/monitor.test-helpers.ts @@ -180,13 +180,17 @@ vi.mock("../pairing/pairing-store.js", () => ({ slackTestState.upsertPairingRequestMock(...args), })); -vi.mock("../config/sessions.js", () => ({ - resolveStorePath: vi.fn(() => "/tmp/openclaw-sessions.json"), - updateLastRoute: (...args: unknown[]) => slackTestState.updateLastRouteMock(...args), - resolveSessionKey: vi.fn(), - readSessionUpdatedAt: vi.fn(() => undefined), - recordSessionMetaFromInbound: vi.fn().mockResolvedValue(undefined), -})); +vi.mock("../config/sessions.js", async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + resolveStorePath: vi.fn(() => "/tmp/openclaw-sessions.json"), + updateLastRoute: (...args: unknown[]) => slackTestState.updateLastRouteMock(...args), + resolveSessionKey: vi.fn(), + readSessionUpdatedAt: vi.fn(() => undefined), + recordSessionMetaFromInbound: vi.fn().mockResolvedValue(undefined), + }; +}); vi.mock("@slack/bolt", () => { const handlers = new Map();