From 6f7825a3a947ed0341037d64ef28e3e01947770c Mon Sep 17 00:00:00 2001 From: chain710 Date: Sun, 29 Mar 2026 00:01:54 +0800 Subject: [PATCH] feat(matrix): add group chat history context for agent triggers Implement per-room message queue with per-agent watermarks so each agent in a Matrix room independently tracks which messages it has consumed. - Non-trigger messages accumulate in a shared per-room queue - When an agent is triggered, it receives InboundHistory of pending messages since its last reply (capped at historyLimit) - Watermark only advances to the snapshot taken at dispatch time, so messages arriving during async processing are visible on the next trigger (race safety) - Each agent has an independent watermark, so multiple agents in the same room see independent history windows Configure via channels.matrix.historyLimit (or messages.groupChat.historyLimit). Default is 0 (disabled, preserving existing behavior). --- docs/.generated/config-baseline.json | 10 + docs/.generated/config-baseline.jsonl | 3 +- extensions/matrix/src/config-schema.ts | 1 + .../monitor/handler.group-history.test.ts | 421 ++++++++++++++++++ .../matrix/monitor/handler.test-helpers.ts | 2 + .../matrix/src/matrix/monitor/handler.test.ts | 1 + .../matrix/src/matrix/monitor/handler.ts | 75 +++- extensions/matrix/src/matrix/monitor/index.ts | 5 + .../src/matrix/monitor/room-history.test.ts | 106 +++++ .../matrix/src/matrix/monitor/room-history.ts | 147 ++++++ extensions/matrix/src/types.ts | 6 + ...ndled-channel-config-metadata.generated.ts | 58 +++ 12 files changed, 833 insertions(+), 2 deletions(-) create mode 100644 extensions/matrix/src/matrix/monitor/handler.group-history.test.ts create mode 100644 extensions/matrix/src/matrix/monitor/room-history.test.ts create mode 100644 extensions/matrix/src/matrix/monitor/room-history.ts diff --git a/docs/.generated/config-baseline.json b/docs/.generated/config-baseline.json index d84b918133e..baa775e0465 100644 --- a/docs/.generated/config-baseline.json +++ b/docs/.generated/config-baseline.json @@ -22371,6 +22371,16 @@ "tags": [], "hasChildren": false }, + { + "path": "channels.matrix.historyLimit", + "kind": "channel", + "type": "integer", + "required": false, + "deprecated": false, + "sensitive": false, + "tags": [], + "hasChildren": false + }, { "path": "channels.matrix.homeserver", "kind": "channel", diff --git a/docs/.generated/config-baseline.jsonl b/docs/.generated/config-baseline.jsonl index cf1589942aa..67aa6a01d73 100644 --- a/docs/.generated/config-baseline.jsonl +++ b/docs/.generated/config-baseline.jsonl @@ -1,4 +1,4 @@ -{"generatedBy":"scripts/generate-config-doc-baseline.ts","recordType":"meta","totalPaths":5593} +{"generatedBy":"scripts/generate-config-doc-baseline.ts","recordType":"meta","totalPaths":5594} {"recordType":"path","path":"acp","kind":"core","type":"object","required":false,"deprecated":false,"sensitive":false,"tags":["advanced"],"label":"ACP","help":"ACP runtime controls for enabling dispatch, selecting backends, constraining allowed agent targets, and tuning streamed turn projection behavior.","hasChildren":true} {"recordType":"path","path":"acp.allowedAgents","kind":"core","type":"array","required":false,"deprecated":false,"sensitive":false,"tags":["access"],"label":"ACP Allowed Agents","help":"Allowlist of ACP target agent ids permitted for ACP runtime sessions. Empty means no additional allowlist restriction.","hasChildren":true} {"recordType":"path","path":"acp.allowedAgents.*","kind":"core","type":"string","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false} @@ -1993,6 +1993,7 @@ {"recordType":"path","path":"channels.matrix.groups.*.tools.deny.*","kind":"channel","type":"string","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false} {"recordType":"path","path":"channels.matrix.groups.*.users","kind":"channel","type":"array","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":true} {"recordType":"path","path":"channels.matrix.groups.*.users.*","kind":"channel","type":["number","string"],"required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false} +{"recordType":"path","path":"channels.matrix.historyLimit","kind":"channel","type":"integer","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false} {"recordType":"path","path":"channels.matrix.homeserver","kind":"channel","type":"string","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false} {"recordType":"path","path":"channels.matrix.initialSyncLimit","kind":"channel","type":"number","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false} {"recordType":"path","path":"channels.matrix.markdown","kind":"channel","type":"object","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":true} diff --git a/extensions/matrix/src/config-schema.ts b/extensions/matrix/src/config-schema.ts index 07bf5ae26db..91326daaa34 100644 --- a/extensions/matrix/src/config-schema.ts +++ b/extensions/matrix/src/config-schema.ts @@ -80,6 +80,7 @@ export const MatrixConfigSchema = z.object({ startupVerification: z.enum(["off", "if-unverified"]).optional(), startupVerificationCooldownHours: z.number().optional(), mediaMaxMb: z.number().optional(), + historyLimit: z.number().int().min(0).optional(), autoJoin: z.enum(["always", "allowlist", "off"]).optional(), autoJoinAllowlist: AllowFromListSchema, groupAllowFrom: AllowFromListSchema, diff --git a/extensions/matrix/src/matrix/monitor/handler.group-history.test.ts b/extensions/matrix/src/matrix/monitor/handler.group-history.test.ts new file mode 100644 index 00000000000..8a389ac97fc --- /dev/null +++ b/extensions/matrix/src/matrix/monitor/handler.group-history.test.ts @@ -0,0 +1,421 @@ +/** + * Tests for Matrix group chat history accumulation. + * + * Covers two key scenarios: + * + * Scenario 1 — basic accumulation across agents: + * user: msg A (no mention, accumulates) + * user: @agent_a msg B (triggers agent_a; agent_a sees [A] in history, not B itself) + * user: @agent_b msg C (triggers agent_b; agent_b sees [A, B] — independent watermark) + * user: @agent_b msg D (triggers agent_b; agent_b sees [] — A/B/C were consumed) + * + * Scenario 2 — race condition safety: + * user: @agent_a msg A (triggers agent_a; agent starts processing, not yet replied) + * user: msg B (no mention, arrives during processing — must not be lost) + * agent_a: reply (watermark advances to just after A, not after B) + * user: @agent_a msg C (triggers agent_a; agent_a sees [B] in history) + */ + +import { beforeEach, describe, expect, it, vi } from "vitest"; +import { installMatrixMonitorTestRuntime } from "../../test-runtime.js"; +import { + createMatrixHandlerTestHarness, + createMatrixRoomMessageEvent, + createMatrixTextMessageEvent, +} from "./handler.test-helpers.js"; +import { EventType, type MatrixRawEvent } from "./types.js"; + +const DEFAULT_ROOM = "!room:example.org"; + +function makeRoomTriggerEvent(params: { eventId: string; body: string; ts?: number }) { + // Use @room mention to trigger the bot without requiring agent-specific mention regexes + return createMatrixTextMessageEvent({ + eventId: params.eventId, + body: `@room ${params.body}`, + originServerTs: params.ts ?? Date.now(), + mentions: { room: true }, + }); +} + +function makeRoomPlainEvent(params: { eventId: string; body: string; ts?: number }) { + return createMatrixTextMessageEvent({ + eventId: params.eventId, + body: params.body, + originServerTs: params.ts ?? Date.now(), + }); +} + +function makeDevRoute(agentId: string) { + return { + agentId, + channel: "matrix" as const, + accountId: "ops", + sessionKey: `agent:${agentId}:main`, + mainSessionKey: `agent:${agentId}:main`, + matchedBy: "binding.account" as const, + }; +} + +beforeEach(() => { + installMatrixMonitorTestRuntime(); +}); + +describe("matrix group chat history — scenario 1: basic accumulation", () => { + it("pending messages appear in InboundHistory; trigger itself does not", async () => { + const finalizeInboundContext = vi.fn((ctx: unknown) => ctx); + const { handler } = createMatrixHandlerTestHarness({ + historyLimit: 20, + groupPolicy: "open", + isDirectMessage: false, + finalizeInboundContext, + dispatchReplyFromConfig: async () => ({ + queuedFinal: true, + counts: { final: 1, block: 0, tool: 0 }, + }), + }); + + // Non-trigger message A — should not dispatch + await handler(DEFAULT_ROOM, makeRoomPlainEvent({ eventId: "$a", body: "msg A", ts: 1000 })); + expect(finalizeInboundContext).not.toHaveBeenCalled(); + + // Trigger B — history must contain [msg A] only, not the trigger itself + await handler(DEFAULT_ROOM, makeRoomTriggerEvent({ eventId: "$b", body: "msg B", ts: 2000 })); + expect(finalizeInboundContext).toHaveBeenCalledOnce(); + const ctx = finalizeInboundContext.mock.calls[0]?.[0] as Record; + const history = ctx["InboundHistory"] as Array<{ body: string; sender: string }>; + expect(history).toHaveLength(1); + expect(history[0]?.body).toContain("msg A"); + }); + + it("multi-agent: each agent has an independent watermark", async () => { + let currentAgentId = "agent_a"; + const finalizeInboundContext = vi.fn((ctx: unknown) => ctx); + const { handler } = createMatrixHandlerTestHarness({ + historyLimit: 20, + groupPolicy: "open", + isDirectMessage: false, + finalizeInboundContext, + resolveAgentRoute: vi.fn(() => makeDevRoute(currentAgentId)), + dispatchReplyFromConfig: async () => ({ + queuedFinal: true, + counts: { final: 1, block: 0, tool: 0 }, + }), + }); + + // msg A accumulates for all agents + await handler(DEFAULT_ROOM, makeRoomPlainEvent({ eventId: "$a", body: "msg A", ts: 1000 })); + + // @agent_a trigger B — agent_a sees [msg A] + currentAgentId = "agent_a"; + await handler(DEFAULT_ROOM, makeRoomTriggerEvent({ eventId: "$b", body: "msg B", ts: 2000 })); + { + const ctx = finalizeInboundContext.mock.calls[0]?.[0] as Record; + const history = ctx["InboundHistory"] as Array<{ body: string }>; + expect(history).toHaveLength(1); + expect(history[0]?.body).toContain("msg A"); + } + + // @agent_b trigger C — agent_b watermark is 0, so it sees [msg A, msg B] + currentAgentId = "agent_b"; + await handler(DEFAULT_ROOM, makeRoomTriggerEvent({ eventId: "$c", body: "msg C", ts: 3000 })); + { + const ctx = finalizeInboundContext.mock.calls[1]?.[0] as Record; + const history = ctx["InboundHistory"] as Array<{ body: string }>; + expect(history).toHaveLength(2); + expect(history.map((h) => h.body).some((b) => b.includes("msg A"))).toBe(true); + expect(history.map((h) => h.body).some((b) => b.includes("msg B"))).toBe(true); + } + + // @agent_b trigger D — A/B/C consumed; history is empty + currentAgentId = "agent_b"; + await handler(DEFAULT_ROOM, makeRoomTriggerEvent({ eventId: "$d", body: "msg D", ts: 4000 })); + { + const ctx = finalizeInboundContext.mock.calls[2]?.[0] as Record; + const history = ctx["InboundHistory"] as Array | undefined; + expect(history ?? []).toHaveLength(0); + } + }); + + it("respects historyLimit: caps to the most recent N entries", async () => { + const finalizeInboundContext = vi.fn((ctx: unknown) => ctx); + const { handler } = createMatrixHandlerTestHarness({ + historyLimit: 2, + groupPolicy: "open", + isDirectMessage: false, + finalizeInboundContext, + dispatchReplyFromConfig: async () => ({ + queuedFinal: true, + counts: { final: 1, block: 0, tool: 0 }, + }), + }); + + for (let i = 1; i <= 4; i++) { + await handler( + DEFAULT_ROOM, + makeRoomPlainEvent({ eventId: `$p${i}`, body: `pending ${i}`, ts: i * 1000 }), + ); + } + + await handler(DEFAULT_ROOM, makeRoomTriggerEvent({ eventId: "$t", body: "trigger", ts: 5000 })); + const ctx = finalizeInboundContext.mock.calls[0]?.[0] as Record; + const history = ctx["InboundHistory"] as Array<{ body: string }>; + expect(history).toHaveLength(2); + expect(history[0]?.body).toContain("pending 3"); + expect(history[1]?.body).toContain("pending 4"); + }); + + it("historyLimit=0 disables history accumulation entirely", async () => { + const finalizeInboundContext = vi.fn((ctx: unknown) => ctx); + const { handler } = createMatrixHandlerTestHarness({ + historyLimit: 0, + groupPolicy: "open", + isDirectMessage: false, + finalizeInboundContext, + dispatchReplyFromConfig: async () => ({ + queuedFinal: true, + counts: { final: 1, block: 0, tool: 0 }, + }), + }); + + await handler(DEFAULT_ROOM, makeRoomPlainEvent({ eventId: "$p", body: "pending" })); + await handler(DEFAULT_ROOM, makeRoomTriggerEvent({ eventId: "$t", body: "trigger" })); + + const ctx = finalizeInboundContext.mock.calls[0]?.[0] as Record; + const history = ctx["InboundHistory"] as Array | undefined; + expect(history ?? []).toHaveLength(0); + }); + + it("DMs do not accumulate history (group chat only)", async () => { + const finalizeInboundContext = vi.fn((ctx: unknown) => ctx); + const { handler } = createMatrixHandlerTestHarness({ + historyLimit: 20, + isDirectMessage: true, + finalizeInboundContext, + dispatchReplyFromConfig: async () => ({ + queuedFinal: true, + counts: { final: 1, block: 0, tool: 0 }, + }), + }); + + await handler(DEFAULT_ROOM, makeRoomPlainEvent({ eventId: "$dm1", body: "dm message 1" })); + await handler(DEFAULT_ROOM, makeRoomPlainEvent({ eventId: "$dm2", body: "dm message 2" })); + + expect(finalizeInboundContext).toHaveBeenCalledTimes(2); + for (const call of finalizeInboundContext.mock.calls) { + const ctx = call[0] as Record; + const history = ctx["InboundHistory"] as Array | undefined; + expect(history ?? []).toHaveLength(0); + } + }); + + it("includes skipped media-only room messages in next trigger history", async () => { + const finalizeInboundContext = vi.fn((ctx: unknown) => ctx); + const { handler } = createMatrixHandlerTestHarness({ + historyLimit: 20, + groupPolicy: "open", + isDirectMessage: false, + finalizeInboundContext, + dispatchReplyFromConfig: async () => ({ + queuedFinal: true, + counts: { final: 1, block: 0, tool: 0 }, + }), + }); + + // Unmentioned media-only message should be buffered as pending history context. + await handler( + DEFAULT_ROOM, + createMatrixRoomMessageEvent({ + eventId: "$media-a", + originServerTs: 1000, + content: { + msgtype: "m.image", + body: "", + url: "mxc://example.org/media-a", + }, + }) as MatrixRawEvent, + ); + expect(finalizeInboundContext).not.toHaveBeenCalled(); + + await handler( + DEFAULT_ROOM, + makeRoomTriggerEvent({ eventId: "$trigger-media", body: "trigger", ts: 2000 }), + ); + expect(finalizeInboundContext).toHaveBeenCalledOnce(); + const ctx = finalizeInboundContext.mock.calls[0]?.[0] as Record; + const history = ctx["InboundHistory"] as Array<{ body: string }> | undefined; + expect(history?.some((entry) => entry.body.includes("[matrix image attachment]"))).toBe(true); + }); +}); + +describe("matrix group chat history — scenario 2: race condition safety", () => { + it("messages arriving during agent processing are visible on the next trigger", async () => { + let resolveFirstDispatch: (() => void) | undefined; + let firstDispatchStarted = false; + + const finalizeInboundContext = vi.fn((ctx: unknown) => ctx); + const dispatchReplyFromConfig = vi.fn(async () => { + if (!firstDispatchStarted) { + firstDispatchStarted = true; + await new Promise((resolve) => { + resolveFirstDispatch = resolve; + }); + } + return { queuedFinal: true, counts: { final: 1, block: 0, tool: 0 } }; + }); + + const { handler } = createMatrixHandlerTestHarness({ + historyLimit: 20, + groupPolicy: "open", + isDirectMessage: false, + finalizeInboundContext, + dispatchReplyFromConfig, + }); + + // Step 1: trigger msg A — don't await, let it block in dispatch + const firstHandlerDone = handler( + DEFAULT_ROOM, + makeRoomTriggerEvent({ eventId: "$a", body: "msg A", ts: 1000 }), + ); + + // Step 2: wait until dispatch is in-flight + await vi.waitFor(() => { + expect(firstDispatchStarted).toBe(true); + }); + + // Step 3: msg B arrives while agent is processing — must not be lost + await handler(DEFAULT_ROOM, makeRoomPlainEvent({ eventId: "$b", body: "msg B", ts: 2000 })); + + // Step 4: unblock dispatch and complete + resolveFirstDispatch!(); + await firstHandlerDone; + // watermark advances to snapshot taken at dispatch time (just after msg A), not to queue end + + // Step 5: trigger msg C — should see [msg B] in history (msg A was consumed) + await handler(DEFAULT_ROOM, makeRoomTriggerEvent({ eventId: "$c", body: "msg C", ts: 3000 })); + + expect(finalizeInboundContext).toHaveBeenCalledTimes(2); + const ctxForC = finalizeInboundContext.mock.calls[1]?.[0] as Record; + const history = ctxForC["InboundHistory"] as Array<{ body: string }>; + expect(history.some((h) => h.body.includes("msg B"))).toBe(true); + expect(history.every((h) => !h.body.includes("msg A"))).toBe(true); + }); + + it("watermark does not advance when final reply delivery fails (retry sees same history)", async () => { + // Capture the onError callback so we can fire a simulated final delivery failure + let capturedOnError: + | ((err: unknown, info: { kind: "tool" | "block" | "final" }) => void) + | undefined; + + const finalizeInboundContext = vi.fn((ctx: unknown) => ctx); + const { handler } = createMatrixHandlerTestHarness({ + historyLimit: 20, + groupPolicy: "open", + isDirectMessage: false, + finalizeInboundContext, + dispatchReplyFromConfig: async () => ({ + queuedFinal: true, + counts: { final: 1, block: 0, tool: 0 }, + }), + createReplyDispatcherWithTyping: (params?: { + onError?: (err: unknown, info: { kind: "tool" | "block" | "final" }) => void; + }) => { + capturedOnError = params?.onError; + return { + dispatcher: {}, + replyOptions: {}, + markDispatchIdle: () => {}, + markRunComplete: () => {}, + }; + }, + withReplyDispatcher: async (params: { + dispatcher: { markComplete?: () => void; waitForIdle?: () => Promise }; + run: () => Promise; + onSettled?: () => void | Promise; + }) => { + const result = await params.run(); + capturedOnError?.(new Error("simulated delivery failure"), { kind: "final" }); + params.dispatcher.markComplete?.(); + await params.dispatcher.waitForIdle?.(); + await params.onSettled?.(); + return result; + }, + }); + + await handler( + DEFAULT_ROOM, + makeRoomPlainEvent({ eventId: "$p", body: "pending msg", ts: 1000 }), + ); + + // First trigger — delivery fails; watermark must NOT advance + await handler( + DEFAULT_ROOM, + makeRoomTriggerEvent({ eventId: "$t1", body: "trigger 1", ts: 2000 }), + ); + expect(finalizeInboundContext).toHaveBeenCalledOnce(); + { + const ctx = finalizeInboundContext.mock.calls[0]?.[0] as Record; + const history = ctx["InboundHistory"] as Array<{ body: string }>; + expect(history).toHaveLength(1); + expect(history[0]?.body).toContain("pending msg"); + } + + // Second trigger — pending msg must still be visible (watermark not advanced) + await handler( + DEFAULT_ROOM, + makeRoomTriggerEvent({ eventId: "$t2", body: "trigger 2", ts: 3000 }), + ); + expect(finalizeInboundContext).toHaveBeenCalledTimes(2); + { + const ctx = finalizeInboundContext.mock.calls[1]?.[0] as Record; + const history = ctx["InboundHistory"] as Array<{ body: string }> | undefined; + expect(history?.some((h) => h.body.includes("pending msg"))).toBe(true); + } + }); + + it("records pending history before sender-name lookup resolves", async () => { + let resolveFirstName: (() => void) | undefined; + let firstNameLookupStarted = false; + const getMemberDisplayName = vi.fn(async () => { + firstNameLookupStarted = true; + await new Promise((resolve) => { + resolveFirstName = resolve; + }); + return "sender"; + }); + + const finalizeInboundContext = vi.fn((ctx: unknown) => ctx); + const { handler } = createMatrixHandlerTestHarness({ + historyLimit: 20, + groupPolicy: "open", + isDirectMessage: false, + getMemberDisplayName, + finalizeInboundContext, + dispatchReplyFromConfig: async () => ({ + queuedFinal: true, + counts: { final: 1, block: 0, tool: 0 }, + }), + }); + + // Unmentioned message should be buffered without waiting for async sender-name lookup. + await handler( + DEFAULT_ROOM, + makeRoomPlainEvent({ eventId: "$slow-name", body: "plain before trigger", ts: 1000 }), + ); + expect(firstNameLookupStarted).toBe(false); + + // Trigger reads pending history first, then can await sender-name lookup later. + const triggerDone = handler( + DEFAULT_ROOM, + makeRoomTriggerEvent({ eventId: "$trigger-after-slow-name", body: "trigger", ts: 2000 }), + ); + await vi.waitFor(() => { + expect(firstNameLookupStarted).toBe(true); + }); + resolveFirstName?.(); + await triggerDone; + + const ctx = finalizeInboundContext.mock.calls[0]?.[0] as Record; + const history = ctx["InboundHistory"] as Array<{ body: string }> | undefined; + expect(history?.some((entry) => entry.body.includes("plain before trigger"))).toBe(true); + }); +}); diff --git a/extensions/matrix/src/matrix/monitor/handler.test-helpers.ts b/extensions/matrix/src/matrix/monitor/handler.test-helpers.ts index 88a8055b437..bf06aa24375 100644 --- a/extensions/matrix/src/matrix/monitor/handler.test-helpers.ts +++ b/extensions/matrix/src/matrix/monitor/handler.test-helpers.ts @@ -40,6 +40,7 @@ type MatrixHandlerTestHarnessOptions = { dropPreStartupMessages?: boolean; needsRoomAliasesForConfig?: boolean; isDirectMessage?: boolean; + historyLimit?: number; readAllowFromStore?: MatrixMonitorHandlerParams["core"]["channel"]["pairing"]["readAllowFromStore"]; upsertPairingRequest?: MatrixMonitorHandlerParams["core"]["channel"]["pairing"]["upsertPairingRequest"]; buildPairingReply?: () => string; @@ -225,6 +226,7 @@ export function createMatrixHandlerTestHarness( getRoomInfo: options.getRoomInfo ?? (async () => ({ altAliases: [] })), getMemberDisplayName: options.getMemberDisplayName ?? (async () => "sender"), needsRoomAliasesForConfig: options.needsRoomAliasesForConfig ?? false, + historyLimit: options.historyLimit ?? 0, }); return { diff --git a/extensions/matrix/src/matrix/monitor/handler.test.ts b/extensions/matrix/src/matrix/monitor/handler.test.ts index c30061400b1..7bbf0a49c26 100644 --- a/extensions/matrix/src/matrix/monitor/handler.test.ts +++ b/extensions/matrix/src/matrix/monitor/handler.test.ts @@ -892,6 +892,7 @@ describe("matrix monitor handler pairing account scope", () => { dmPolicy: "open", textLimit: 8_000, mediaMaxBytes: 10_000_000, + historyLimit: 0, startupMs: 0, startupGraceMs: 0, directTracker: { diff --git a/extensions/matrix/src/matrix/monitor/handler.ts b/extensions/matrix/src/matrix/monitor/handler.ts index 5fd8d97beb8..a880546fd94 100644 --- a/extensions/matrix/src/matrix/monitor/handler.ts +++ b/extensions/matrix/src/matrix/monitor/handler.ts @@ -15,7 +15,12 @@ import { } from "../../runtime-api.js"; import type { CoreConfig, MatrixRoomConfig, ReplyToMode } from "../../types.js"; import { createMatrixDraftStream } from "../draft-stream.js"; -import { formatMatrixMediaUnavailableText } from "../media-text.js"; +import { + formatMatrixMediaUnavailableText, + formatMatrixMessageText, + resolveMatrixMessageAttachment, + resolveMatrixMessageBody, +} from "../media-text.js"; import { fetchMatrixPollSnapshot } from "../poll-summary.js"; import { formatPollAsText, @@ -40,6 +45,8 @@ import { resolveMentions } from "./mentions.js"; import { handleInboundMatrixReaction } from "./reaction-events.js"; import { deliverMatrixReplies } from "./replies.js"; import { createMatrixReplyContextResolver } from "./reply-context.js"; +import { createRoomHistoryTracker } from "./room-history.js"; +import type { HistoryEntry } from "./room-history.js"; import { resolveMatrixRoomConfig } from "./rooms.js"; import { resolveMatrixInboundRoute } from "./route.js"; import { createMatrixThreadContextResolver } from "./thread-context.js"; @@ -74,6 +81,7 @@ export type MatrixMonitorHandlerParams = { dmPolicy: "open" | "pairing" | "allowlist" | "disabled"; textLimit: number; mediaMaxBytes: number; + historyLimit: number; startupMs: number; startupGraceMs: number; dropPreStartupMessages: boolean; @@ -134,6 +142,29 @@ function resolveMatrixInboundBodyText(params: { }); } +function resolveMatrixPendingHistoryText(params: { + mentionPrecheckText: string; + content: RoomMessageEventContent; + mediaUrl?: string; +}): string { + if (params.mentionPrecheckText) { + return params.mentionPrecheckText; + } + if (!params.mediaUrl) { + return ""; + } + const body = typeof params.content.body === "string" ? params.content.body.trim() : undefined; + const filename = + typeof params.content.filename === "string" ? params.content.filename.trim() : undefined; + const msgtype = typeof params.content.msgtype === "string" ? params.content.msgtype : undefined; + return ( + formatMatrixMessageText({ + body: resolveMatrixMessageBody({ body, filename, msgtype }), + attachment: resolveMatrixMessageAttachment({ body, filename, msgtype }), + }) ?? "" + ); +} + function resolveMatrixAllowBotsMode(value?: boolean | "mentions"): MatrixAllowBotsMode { if (value === true) { return "all"; @@ -166,6 +197,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam dmPolicy, textLimit, mediaMaxBytes, + historyLimit, startupMs, startupGraceMs, dropPreStartupMessages, @@ -190,6 +222,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam getMemberDisplayName, logVerboseMessage, }); + const roomHistoryTracker = createRoomHistoryTracker(); const readStoreAllowFrom = async (): Promise => { const now = Date.now(); @@ -532,6 +565,11 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam ? content.file : undefined; const mediaUrl = contentUrl ?? contentFile?.url; + const pendingHistoryText = resolveMatrixPendingHistoryText({ + mentionPrecheckText, + content, + mediaUrl, + }); if (!mentionPrecheckText && !mediaUrl && !isPollEvent) { await commitInboundEventIfClaimed(); return; @@ -622,6 +660,16 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam hasControlCommandInMessage; const canDetectMention = agentMentionRegexes.length > 0 || hasExplicitMention; if (isRoom && shouldRequireMention && !wasMentioned && !shouldBypassMention) { + // Record in room history so future triggered replies can see this message as context. + if (historyLimit > 0 && pendingHistoryText) { + const pendingEntry: HistoryEntry = { + // Keep skipped-message buffering non-blocking: sender name lookup can be async. + sender: senderId, + body: pendingHistoryText, + timestamp: eventTs ?? undefined, + }; + roomHistoryTracker.recordPending(roomId, pendingEntry); + } logger.info("skipping room message", { roomId, reason: "no-mention" }); await commitInboundEventIfClaimed(); return; @@ -753,6 +801,22 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam getSessionBindingService().touch(_runtimeBindingId, eventTs ?? undefined); } const envelopeFrom = isDirectMessage ? senderName : (roomName ?? roomId); + + // Group chat history: read pending history before recording this trigger, then + // snapshot the queue position so the watermark can advance to exactly here on reply. + const inboundHistory = + isRoom && historyLimit > 0 + ? roomHistoryTracker.getPendingHistory(_route.agentId, roomId, historyLimit) + : undefined; + const triggerSnapshotIdx = + isRoom && historyLimit > 0 + ? roomHistoryTracker.recordTrigger(roomId, { + sender: senderName, + body: bodyText, + timestamp: eventTs ?? undefined, + }) + : -1; + const textWithId = `${bodyText}\n[matrix event id: ${_messageId} room: ${roomId}]`; const storePath = core.channel.session.resolveStorePath(cfg.session?.store, { agentId: _route.agentId, @@ -776,6 +840,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam Body: body, RawBody: bodyText, CommandBody: bodyText, + InboundHistory: inboundHistory && inboundHistory.length > 0 ? inboundHistory : undefined, From: isDirectMessage ? `matrix:${senderId}` : `matrix:channel:${roomId}`, To: `room:${roomId}`, SessionKey: _route.sessionKey, @@ -1164,14 +1229,22 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam logVerboseMessage( `matrix: final reply delivery failed room=${roomId} id=${_messageId}; leaving event uncommitted`, ); + // Do not advance watermark — the event will be retried and should see the same history. return; } if (!queuedFinal && nonFinalReplyDeliveryFailed) { logVerboseMessage( `matrix: non-final reply delivery failed room=${roomId} id=${_messageId}; leaving event uncommitted`, ); + // Do not advance watermark — the event will be retried. return; } + // Advance the per-agent watermark now that the reply succeeded (or no reply was needed). + // Only advance to the snapshot position — messages added during async processing remain + // visible for the next trigger. + if (isRoom && triggerSnapshotIdx >= 0) { + roomHistoryTracker.consumeHistory(_route.agentId, roomId, triggerSnapshotIdx); + } if (!queuedFinal) { await commitInboundEventIfClaimed(); return; diff --git a/extensions/matrix/src/matrix/monitor/index.ts b/extensions/matrix/src/matrix/monitor/index.ts index 4352107d3f7..51e1eec27e7 100644 --- a/extensions/matrix/src/matrix/monitor/index.ts +++ b/extensions/matrix/src/matrix/monitor/index.ts @@ -196,6 +196,10 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi const dmPolicyRaw = dmConfig?.policy ?? "pairing"; const dmPolicy = allowlistOnly && dmPolicyRaw !== "disabled" ? "allowlist" : dmPolicyRaw; const textLimit = core.channel.text.resolveTextChunkLimit(cfg, "matrix", account.accountId); + const globalGroupChatHistoryLimit = ( + cfg.messages as { groupChat?: { historyLimit?: number } } | undefined + )?.groupChat?.historyLimit; + const historyLimit = Math.max(0, accountConfig.historyLimit ?? globalGroupChatHistoryLimit ?? 0); const mediaMaxMb = opts.mediaMaxMb ?? accountConfig.mediaMaxMb ?? DEFAULT_MEDIA_MAX_MB; const mediaMaxBytes = Math.max(1, mediaMaxMb) * 1024 * 1024; const streaming: "partial" | "off" = @@ -232,6 +236,7 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi dmPolicy, textLimit, mediaMaxBytes, + historyLimit, startupMs, startupGraceMs, dropPreStartupMessages, diff --git a/extensions/matrix/src/matrix/monitor/room-history.test.ts b/extensions/matrix/src/matrix/monitor/room-history.test.ts new file mode 100644 index 00000000000..24f9841df03 --- /dev/null +++ b/extensions/matrix/src/matrix/monitor/room-history.test.ts @@ -0,0 +1,106 @@ +/** + * Unit tests for createRoomHistoryTracker. + * + * Covers correctness properties that are hard to observe through the handler harness: + * - Monotone watermark advancement (out-of-order consumeHistory must not regress) + * - roomQueues FIFO eviction when the room count exceeds the cap + */ + +import { describe, expect, it } from "vitest"; +import { createRoomHistoryTracker } from "./room-history.js"; + +const ROOM = "!room:test"; +const AGENT = "agent_a"; + +function entry(body: string) { + return { sender: "user", body }; +} + +describe("createRoomHistoryTracker — watermark monotonicity", () => { + it("consumeHistory is monotone: out-of-order completion does not regress the watermark", () => { + const tracker = createRoomHistoryTracker(); + + // Queue: [msg1, msg2, trigger1, msg3, trigger2] + tracker.recordPending(ROOM, entry("msg1")); + tracker.recordPending(ROOM, entry("msg2")); + const snap1 = tracker.recordTrigger(ROOM, entry("trigger1")); // snap=3 + tracker.recordPending(ROOM, entry("msg3")); + const snap2 = tracker.recordTrigger(ROOM, entry("trigger2")); // snap=5 + + // trigger2 completes first (higher index) + tracker.consumeHistory(AGENT, ROOM, snap2); // watermark → 5 + expect(tracker.getPendingHistory(AGENT, ROOM, 100)).toHaveLength(0); + + // trigger1 completes later (lower index) — must NOT regress to 3 + tracker.consumeHistory(AGENT, ROOM, snap1); + // If regressed: [msg3, trigger2] would be visible (2 entries); must stay at 0 + expect(tracker.getPendingHistory(AGENT, ROOM, 100)).toHaveLength(0); + + // In-order advancement still works + tracker.recordPending(ROOM, entry("msg4")); + const snap3 = tracker.recordTrigger(ROOM, entry("trigger3")); // snap=7 + tracker.consumeHistory(AGENT, ROOM, snap3); // watermark → 7 + expect(tracker.getPendingHistory(AGENT, ROOM, 100)).toHaveLength(0); + }); +}); + +describe("createRoomHistoryTracker — roomQueues eviction", () => { + it("evicts the oldest room (FIFO) when the room count exceeds the cap", () => { + const tracker = createRoomHistoryTracker(200, 3); + + const room1 = "!room1:test"; + const room2 = "!room2:test"; + const room3 = "!room3:test"; + const room4 = "!room4:test"; + + tracker.recordPending(room1, entry("msg in room1")); + tracker.recordPending(room2, entry("msg in room2")); + tracker.recordPending(room3, entry("msg in room3")); + + // At cap (3 rooms) — no eviction yet + expect(tracker.getPendingHistory(AGENT, room1, 100)).toHaveLength(1); + + // room4 pushes count to 4 > cap=3 → room1 (oldest) evicted + tracker.recordPending(room4, entry("msg in room4")); + expect(tracker.getPendingHistory(AGENT, room1, 100)).toHaveLength(0); + expect(tracker.getPendingHistory(AGENT, room2, 100)).toHaveLength(1); + expect(tracker.getPendingHistory(AGENT, room3, 100)).toHaveLength(1); + expect(tracker.getPendingHistory(AGENT, room4, 100)).toHaveLength(1); + }); + + it("re-accessing an evicted room starts a fresh empty queue", () => { + const tracker = createRoomHistoryTracker(200, 2); + + const room1 = "!room1:test"; + const room2 = "!room2:test"; + const room3 = "!room3:test"; + + tracker.recordPending(room1, entry("old msg in room1")); + tracker.recordPending(room2, entry("msg in room2")); + tracker.recordPending(room3, entry("msg in room3")); // evicts room1 + + tracker.recordPending(room1, entry("new msg in room1")); + const history = tracker.getPendingHistory(AGENT, room1, 100); + expect(history).toHaveLength(1); + expect(history[0]?.body).toBe("new msg in room1"); + }); + + it("clears stale room watermarks when an evicted room is recreated", () => { + const tracker = createRoomHistoryTracker(200, 1); + const room1 = "!room1:test"; + const room2 = "!room2:test"; + + tracker.recordPending(room1, entry("old msg in room1")); + const firstSnapshot = tracker.recordTrigger(room1, entry("trigger in room1")); + tracker.consumeHistory(AGENT, room1, firstSnapshot); + + // room2 creation evicts room1 (maxRoomQueues=1) + tracker.recordPending(room2, entry("msg in room2")); + + // Recreate room1 and add fresh content. + tracker.recordPending(room1, entry("new msg in room1")); + const history = tracker.getPendingHistory(AGENT, room1, 100); + expect(history).toHaveLength(1); + expect(history[0]?.body).toBe("new msg in room1"); + }); +}); diff --git a/extensions/matrix/src/matrix/monitor/room-history.ts b/extensions/matrix/src/matrix/monitor/room-history.ts new file mode 100644 index 00000000000..9a0ad0a5dc2 --- /dev/null +++ b/extensions/matrix/src/matrix/monitor/room-history.ts @@ -0,0 +1,147 @@ +/** + * Per-room group chat history tracking for Matrix. + * + * Maintains a shared per-room message queue and per-(agentId, roomId) watermarks so + * each agent independently tracks which messages it has already consumed. This design + * lets multiple agents in the same room see independent history windows: + * + * - dev replies to @dev msgB (watermark advances to B) → room queue still has [A, B] + * - spark replies to @spark msgC → spark watermark starts at 0 and sees [A, B, C] + * + * Race-condition safety: the watermark only advances to the snapshot index taken at + * dispatch time, NOT to the queue's end at reply time. Messages that land in the queue + * while the agent is processing stay visible to the next trigger for that agent. + */ + +import type { HistoryEntry } from "openclaw/plugin-sdk/reply-history"; + +/** Maximum entries retained per room (hard cap to bound memory). */ +const DEFAULT_MAX_QUEUE_SIZE = 200; +/** Maximum number of rooms to retain queues for (FIFO eviction beyond this). */ +const DEFAULT_MAX_ROOM_QUEUES = 1000; +/** Maximum number of (agentId, roomId) watermark entries to retain. */ +const MAX_WATERMARK_ENTRIES = 5000; + +export type { HistoryEntry }; + +export type RoomHistoryTracker = { + /** + * Record a non-trigger message for future context. + * Call this when a room message arrives but does not mention the bot. + */ + recordPending: (roomId: string, entry: HistoryEntry) => void; + + /** + * Get pending history for an agent: all messages in the room since the + * agent's last watermark, capped at `limit` most-recent entries. + * Call this BEFORE recordTrigger so the trigger itself is not included. + */ + getPendingHistory: (agentId: string, roomId: string, limit: number) => HistoryEntry[]; + + /** + * Append the trigger message to the room queue and return a snapshot index. + * The snapshot index must be passed to consumeHistory after the agent replies. + */ + recordTrigger: (roomId: string, entry: HistoryEntry) => number; + + /** + * Advance the agent's watermark to the snapshot index returned by recordTrigger. + * Only messages appended after that snapshot remain visible on the next trigger. + */ + consumeHistory: (agentId: string, roomId: string, snapshotIdx: number) => void; +}; + +type RoomQueue = { + entries: HistoryEntry[]; + /** Absolute index of entries[0] — increases as old entries are trimmed. */ + baseIndex: number; +}; + +export function createRoomHistoryTracker( + maxQueueSize = DEFAULT_MAX_QUEUE_SIZE, + maxRoomQueues = DEFAULT_MAX_ROOM_QUEUES, +): RoomHistoryTracker { + const roomQueues = new Map(); + /** Maps `${agentId}:${roomId}` → absolute consumed-up-to index */ + const agentWatermarks = new Map(); + + function clearRoomWatermarks(roomId: string): void { + const roomSuffix = `:${roomId}`; + for (const key of agentWatermarks.keys()) { + if (key.endsWith(roomSuffix)) { + agentWatermarks.delete(key); + } + } + } + + function getOrCreateQueue(roomId: string): RoomQueue { + let queue = roomQueues.get(roomId); + if (!queue) { + queue = { entries: [], baseIndex: 0 }; + roomQueues.set(roomId, queue); + // FIFO eviction to prevent unbounded growth across many rooms + if (roomQueues.size > maxRoomQueues) { + const oldest = roomQueues.keys().next().value; + if (oldest !== undefined) { + roomQueues.delete(oldest); + clearRoomWatermarks(oldest); + } + } + } + return queue; + } + + function appendToQueue(queue: RoomQueue, entry: HistoryEntry): number { + queue.entries.push(entry); + if (queue.entries.length > maxQueueSize) { + const overflow = queue.entries.length - maxQueueSize; + queue.entries.splice(0, overflow); + queue.baseIndex += overflow; + } + return queue.baseIndex + queue.entries.length; + } + + function wmKey(agentId: string, roomId: string): string { + return `${agentId}:${roomId}`; + } + + return { + recordPending(roomId, entry) { + const queue = getOrCreateQueue(roomId); + appendToQueue(queue, entry); + }, + + getPendingHistory(agentId, roomId, limit) { + if (limit <= 0) return []; + const queue = roomQueues.get(roomId); + if (!queue || queue.entries.length === 0) return []; + const wm = agentWatermarks.get(wmKey(agentId, roomId)) ?? 0; + // startAbs: the first absolute index the agent hasn't seen yet + const startAbs = Math.max(wm, queue.baseIndex); + const startRel = startAbs - queue.baseIndex; + const available = queue.entries.slice(startRel); + // Cap to the last `limit` entries + return limit > 0 && available.length > limit ? available.slice(-limit) : available; + }, + + recordTrigger(roomId, entry) { + const queue = getOrCreateQueue(roomId); + return appendToQueue(queue, entry); + }, + + consumeHistory(agentId, roomId, snapshotIdx) { + const key = wmKey(agentId, roomId); + // Monotone write: never regress an already-advanced watermark. + // Guards against out-of-order completion when two triggers for the same + // (agentId, roomId) are in-flight concurrently. + agentWatermarks.set(key, Math.max(agentWatermarks.get(key) ?? 0, snapshotIdx)); + // LRU-style eviction to prevent unbounded growth + if (agentWatermarks.size > MAX_WATERMARK_ENTRIES) { + const oldest = agentWatermarks.keys().next().value; + if (oldest !== undefined) { + agentWatermarks.delete(oldest); + } + } + }, + }; +} diff --git a/extensions/matrix/src/types.ts b/extensions/matrix/src/types.ts index 408144ca3f3..d161f5b23ca 100644 --- a/extensions/matrix/src/types.ts +++ b/extensions/matrix/src/types.ts @@ -123,6 +123,12 @@ export type MatrixConfig = { startupVerificationCooldownHours?: number; /** Max outbound media size in MB. */ mediaMaxMb?: number; + /** + * Number of recent room messages shown to the agent as context when it is mentioned + * in a group chat (0 = disabled). Applies to room messages that did not directly + * trigger a reply. Default: 0 (disabled). + */ + historyLimit?: number; /** Auto-join invites (always|allowlist|off). Default: off. */ autoJoin?: "always" | "allowlist" | "off"; /** Allowlist for auto-join invites (room IDs, aliases). */ diff --git a/src/config/bundled-channel-config-metadata.generated.ts b/src/config/bundled-channel-config-metadata.generated.ts index 5e331953b3f..53e45bbd6fa 100644 --- a/src/config/bundled-channel-config-metadata.generated.ts +++ b/src/config/bundled-channel-config-metadata.generated.ts @@ -6301,6 +6301,27 @@ export const GENERATED_BUNDLED_CHANNEL_CONFIG_METADATA = [ webhookPath: { type: "string", }, + threadBindings: { + type: "object", + properties: { + enabled: { + type: "boolean", + }, + idleHours: { + type: "number", + }, + maxAgeHours: { + type: "number", + }, + spawnSubagentSessions: { + type: "boolean", + }, + spawnAcpSessions: { + type: "boolean", + }, + }, + additionalProperties: false, + }, accounts: { type: "object", propertyNames: { @@ -6372,6 +6393,27 @@ export const GENERATED_BUNDLED_CHANNEL_CONFIG_METADATA = [ webhookPath: { type: "string", }, + threadBindings: { + type: "object", + properties: { + enabled: { + type: "boolean", + }, + idleHours: { + type: "number", + }, + maxAgeHours: { + type: "number", + }, + spawnSubagentSessions: { + type: "boolean", + }, + spawnAcpSessions: { + type: "boolean", + }, + }, + additionalProperties: false, + }, groups: { type: "object", propertyNames: { @@ -6674,6 +6716,17 @@ export const GENERATED_BUNDLED_CHANNEL_CONFIG_METADATA = [ type: "string", enum: ["open", "disabled", "allowlist"], }, + streaming: { + anyOf: [ + { + type: "string", + enum: ["partial", "off"], + }, + { + type: "boolean", + }, + ], + }, replyToMode: { type: "string", enum: ["off", "first", "all"], @@ -6736,6 +6789,11 @@ export const GENERATED_BUNDLED_CHANNEL_CONFIG_METADATA = [ mediaMaxMb: { type: "number", }, + historyLimit: { + type: "integer", + minimum: 0, + maximum: 9007199254740991, + }, autoJoin: { type: "string", enum: ["always", "allowlist", "off"],