feat(matrix): add group chat history context for agent triggers

Implement per-room message queue with per-agent watermarks so each agent
in a Matrix room independently tracks which messages it has consumed.

- Non-trigger messages accumulate in a shared per-room queue
- When an agent is triggered, it receives InboundHistory of pending messages
  since its last reply (capped at historyLimit)
- Watermark only advances to the snapshot taken at dispatch time, so messages
  arriving during async processing are visible on the next trigger (race safety)
- Each agent has an independent watermark, so multiple agents in the same room
  see independent history windows

Configure via channels.matrix.historyLimit (or messages.groupChat.historyLimit).
Default is 0 (disabled, preserving existing behavior).
This commit is contained in:
chain710 2026-03-29 00:01:54 +08:00 committed by Gustavo Madeira Santana
parent 8deb9522f3
commit 6f7825a3a9
No known key found for this signature in database
12 changed files with 833 additions and 2 deletions

View File

@ -22371,6 +22371,16 @@
"tags": [],
"hasChildren": false
},
{
"path": "channels.matrix.historyLimit",
"kind": "channel",
"type": "integer",
"required": false,
"deprecated": false,
"sensitive": false,
"tags": [],
"hasChildren": false
},
{
"path": "channels.matrix.homeserver",
"kind": "channel",

View File

@ -1,4 +1,4 @@
{"generatedBy":"scripts/generate-config-doc-baseline.ts","recordType":"meta","totalPaths":5593}
{"generatedBy":"scripts/generate-config-doc-baseline.ts","recordType":"meta","totalPaths":5594}
{"recordType":"path","path":"acp","kind":"core","type":"object","required":false,"deprecated":false,"sensitive":false,"tags":["advanced"],"label":"ACP","help":"ACP runtime controls for enabling dispatch, selecting backends, constraining allowed agent targets, and tuning streamed turn projection behavior.","hasChildren":true}
{"recordType":"path","path":"acp.allowedAgents","kind":"core","type":"array","required":false,"deprecated":false,"sensitive":false,"tags":["access"],"label":"ACP Allowed Agents","help":"Allowlist of ACP target agent ids permitted for ACP runtime sessions. Empty means no additional allowlist restriction.","hasChildren":true}
{"recordType":"path","path":"acp.allowedAgents.*","kind":"core","type":"string","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false}
@ -1993,6 +1993,7 @@
{"recordType":"path","path":"channels.matrix.groups.*.tools.deny.*","kind":"channel","type":"string","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false}
{"recordType":"path","path":"channels.matrix.groups.*.users","kind":"channel","type":"array","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":true}
{"recordType":"path","path":"channels.matrix.groups.*.users.*","kind":"channel","type":["number","string"],"required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false}
{"recordType":"path","path":"channels.matrix.historyLimit","kind":"channel","type":"integer","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false}
{"recordType":"path","path":"channels.matrix.homeserver","kind":"channel","type":"string","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false}
{"recordType":"path","path":"channels.matrix.initialSyncLimit","kind":"channel","type":"number","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false}
{"recordType":"path","path":"channels.matrix.markdown","kind":"channel","type":"object","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":true}

View File

@ -80,6 +80,7 @@ export const MatrixConfigSchema = z.object({
startupVerification: z.enum(["off", "if-unverified"]).optional(),
startupVerificationCooldownHours: z.number().optional(),
mediaMaxMb: z.number().optional(),
historyLimit: z.number().int().min(0).optional(),
autoJoin: z.enum(["always", "allowlist", "off"]).optional(),
autoJoinAllowlist: AllowFromListSchema,
groupAllowFrom: AllowFromListSchema,

View File

@ -0,0 +1,421 @@
/**
* Tests for Matrix group chat history accumulation.
*
* Covers two key scenarios:
*
* Scenario 1 — basic accumulation across agents:
* user: msg A (no mention, accumulates)
* user: @agent_a msg B (triggers agent_a; agent_a sees [A] in history, not B itself)
* user: @agent_b msg C (triggers agent_b; agent_b sees [A, B] — independent watermark)
* user: @agent_b msg D (triggers agent_b; agent_b sees [] — A/B/C were consumed)
*
* Scenario 2 — race condition safety:
* user: @agent_a msg A (triggers agent_a; agent starts processing, not yet replied)
* user: msg B (no mention, arrives during processing — must not be lost)
* agent_a: reply (watermark advances to just after A, not after B)
* user: @agent_a msg C (triggers agent_a; agent_a sees [B] in history)
*/
import { beforeEach, describe, expect, it, vi } from "vitest";
import { installMatrixMonitorTestRuntime } from "../../test-runtime.js";
import {
createMatrixHandlerTestHarness,
createMatrixRoomMessageEvent,
createMatrixTextMessageEvent,
} from "./handler.test-helpers.js";
import { EventType, type MatrixRawEvent } from "./types.js";
const DEFAULT_ROOM = "!room:example.org";
/**
 * Build a room text event that triggers the bot.
 *
 * The body is prefixed with `@room` and the event carries `mentions: { room: true }`,
 * so the trigger works without requiring agent-specific mention regexes.
 * When `ts` is omitted the current time is used as the origin server timestamp.
 */
function makeRoomTriggerEvent(params: { eventId: string; body: string; ts?: number }) {
  const { eventId, body, ts } = params;
  const mentionedBody = `@room ${body}`;
  const originServerTs = ts ?? Date.now();
  return createMatrixTextMessageEvent({
    eventId,
    body: mentionedBody,
    originServerTs,
    mentions: { room: true },
  });
}
/**
 * Build a room text event with no mention metadata and an unmodified body.
 * When `ts` is omitted the current time is used as the origin server timestamp.
 */
function makeRoomPlainEvent(params: { eventId: string; body: string; ts?: number }) {
  const { eventId, body, ts } = params;
  const originServerTs = ts ?? Date.now();
  return createMatrixTextMessageEvent({ eventId, body, originServerTs });
}
/**
 * Build a fixed-shape route object for the given agent id.
 *
 * `sessionKey` and `mainSessionKey` are both `agent:<agentId>:main`; the channel,
 * account id and `matchedBy` values are constants.
 */
function makeDevRoute(agentId: string) {
  // Both session keys are the same string, so derive it once.
  const mainKey = `agent:${agentId}:main`;
  return {
    agentId,
    channel: "matrix" as const,
    accountId: "ops",
    sessionKey: mainKey,
    mainSessionKey: mainKey,
    matchedBy: "binding.account" as const,
  };
}
// Call installMatrixMonitorTestRuntime() before every test in this file.
// NOTE(review): presumably this resets shared runtime state between tests — confirm in test-runtime.js.
beforeEach(() => {
installMatrixMonitorTestRuntime();
});
// Scenario 1: unmentioned room messages are buffered and surface as `InboundHistory`
// on the context passed to finalizeInboundContext when a later message triggers the bot.
// Every test spies on finalizeInboundContext and stubs dispatchReplyFromConfig with a
// result that reports one queued final reply.
describe("matrix group chat history — scenario 1: basic accumulation", () => {
it("pending messages appear in InboundHistory; trigger itself does not", async () => {
const finalizeInboundContext = vi.fn((ctx: unknown) => ctx);
const { handler } = createMatrixHandlerTestHarness({
historyLimit: 20,
groupPolicy: "open",
isDirectMessage: false,
finalizeInboundContext,
dispatchReplyFromConfig: async () => ({
queuedFinal: true,
counts: { final: 1, block: 0, tool: 0 },
}),
});
// Non-trigger message A — should not dispatch
await handler(DEFAULT_ROOM, makeRoomPlainEvent({ eventId: "$a", body: "msg A", ts: 1000 }));
expect(finalizeInboundContext).not.toHaveBeenCalled();
// Trigger B — history must contain [msg A] only, not the trigger itself
await handler(DEFAULT_ROOM, makeRoomTriggerEvent({ eventId: "$b", body: "msg B", ts: 2000 }));
expect(finalizeInboundContext).toHaveBeenCalledOnce();
const ctx = finalizeInboundContext.mock.calls[0]?.[0] as Record<string, unknown>;
const history = ctx["InboundHistory"] as Array<{ body: string; sender: string }>;
expect(history).toHaveLength(1);
expect(history[0]?.body).toContain("msg A");
});
it("multi-agent: each agent has an independent watermark", async () => {
// resolveAgentRoute is stubbed to return the route for whichever agent
// `currentAgentId` names at call time, so the test switches agents by reassigning it.
let currentAgentId = "agent_a";
const finalizeInboundContext = vi.fn((ctx: unknown) => ctx);
const { handler } = createMatrixHandlerTestHarness({
historyLimit: 20,
groupPolicy: "open",
isDirectMessage: false,
finalizeInboundContext,
resolveAgentRoute: vi.fn(() => makeDevRoute(currentAgentId)),
dispatchReplyFromConfig: async () => ({
queuedFinal: true,
counts: { final: 1, block: 0, tool: 0 },
}),
});
// msg A accumulates for all agents
await handler(DEFAULT_ROOM, makeRoomPlainEvent({ eventId: "$a", body: "msg A", ts: 1000 }));
// @agent_a trigger B — agent_a sees [msg A]
currentAgentId = "agent_a";
await handler(DEFAULT_ROOM, makeRoomTriggerEvent({ eventId: "$b", body: "msg B", ts: 2000 }));
{
const ctx = finalizeInboundContext.mock.calls[0]?.[0] as Record<string, unknown>;
const history = ctx["InboundHistory"] as Array<{ body: string }>;
expect(history).toHaveLength(1);
expect(history[0]?.body).toContain("msg A");
}
// @agent_b trigger C — agent_b watermark is 0, so it sees [msg A, msg B]
currentAgentId = "agent_b";
await handler(DEFAULT_ROOM, makeRoomTriggerEvent({ eventId: "$c", body: "msg C", ts: 3000 }));
{
const ctx = finalizeInboundContext.mock.calls[1]?.[0] as Record<string, unknown>;
const history = ctx["InboundHistory"] as Array<{ body: string }>;
expect(history).toHaveLength(2);
expect(history.map((h) => h.body).some((b) => b.includes("msg A"))).toBe(true);
expect(history.map((h) => h.body).some((b) => b.includes("msg B"))).toBe(true);
}
// @agent_b trigger D — A/B/C consumed; history is empty
currentAgentId = "agent_b";
await handler(DEFAULT_ROOM, makeRoomTriggerEvent({ eventId: "$d", body: "msg D", ts: 4000 }));
{
const ctx = finalizeInboundContext.mock.calls[2]?.[0] as Record<string, unknown>;
const history = ctx["InboundHistory"] as Array<unknown> | undefined;
// InboundHistory may be absent rather than an empty array, hence the `?? []`.
expect(history ?? []).toHaveLength(0);
}
});
it("respects historyLimit: caps to the most recent N entries", async () => {
const finalizeInboundContext = vi.fn((ctx: unknown) => ctx);
const { handler } = createMatrixHandlerTestHarness({
historyLimit: 2,
groupPolicy: "open",
isDirectMessage: false,
finalizeInboundContext,
dispatchReplyFromConfig: async () => ({
queuedFinal: true,
counts: { final: 1, block: 0, tool: 0 },
}),
});
// Buffer four plain messages; with historyLimit=2 only the last two should be delivered.
for (let i = 1; i <= 4; i++) {
await handler(
DEFAULT_ROOM,
makeRoomPlainEvent({ eventId: `$p${i}`, body: `pending ${i}`, ts: i * 1000 }),
);
}
await handler(DEFAULT_ROOM, makeRoomTriggerEvent({ eventId: "$t", body: "trigger", ts: 5000 }));
const ctx = finalizeInboundContext.mock.calls[0]?.[0] as Record<string, unknown>;
const history = ctx["InboundHistory"] as Array<{ body: string }>;
expect(history).toHaveLength(2);
// Order is oldest-first within the capped window.
expect(history[0]?.body).toContain("pending 3");
expect(history[1]?.body).toContain("pending 4");
});
it("historyLimit=0 disables history accumulation entirely", async () => {
const finalizeInboundContext = vi.fn((ctx: unknown) => ctx);
const { handler } = createMatrixHandlerTestHarness({
historyLimit: 0,
groupPolicy: "open",
isDirectMessage: false,
finalizeInboundContext,
dispatchReplyFromConfig: async () => ({
queuedFinal: true,
counts: { final: 1, block: 0, tool: 0 },
}),
});
// A plain message followed by a trigger: with the feature off, the trigger's context
// must carry no history (absent or empty).
await handler(DEFAULT_ROOM, makeRoomPlainEvent({ eventId: "$p", body: "pending" }));
await handler(DEFAULT_ROOM, makeRoomTriggerEvent({ eventId: "$t", body: "trigger" }));
const ctx = finalizeInboundContext.mock.calls[0]?.[0] as Record<string, unknown>;
const history = ctx["InboundHistory"] as Array<unknown> | undefined;
expect(history ?? []).toHaveLength(0);
});
it("DMs do not accumulate history (group chat only)", async () => {
const finalizeInboundContext = vi.fn((ctx: unknown) => ctx);
const { handler } = createMatrixHandlerTestHarness({
historyLimit: 20,
isDirectMessage: true,
finalizeInboundContext,
dispatchReplyFromConfig: async () => ({
queuedFinal: true,
counts: { final: 1, block: 0, tool: 0 },
}),
});
// In a DM both plain messages are expected to dispatch (two finalize calls),
// and neither context may carry history.
await handler(DEFAULT_ROOM, makeRoomPlainEvent({ eventId: "$dm1", body: "dm message 1" }));
await handler(DEFAULT_ROOM, makeRoomPlainEvent({ eventId: "$dm2", body: "dm message 2" }));
expect(finalizeInboundContext).toHaveBeenCalledTimes(2);
for (const call of finalizeInboundContext.mock.calls) {
const ctx = call[0] as Record<string, unknown>;
const history = ctx["InboundHistory"] as Array<unknown> | undefined;
expect(history ?? []).toHaveLength(0);
}
});
it("includes skipped media-only room messages in next trigger history", async () => {
const finalizeInboundContext = vi.fn((ctx: unknown) => ctx);
const { handler } = createMatrixHandlerTestHarness({
historyLimit: 20,
groupPolicy: "open",
isDirectMessage: false,
finalizeInboundContext,
dispatchReplyFromConfig: async () => ({
queuedFinal: true,
counts: { final: 1, block: 0, tool: 0 },
}),
});
// Unmentioned media-only message should be buffered as pending history context.
await handler(
DEFAULT_ROOM,
createMatrixRoomMessageEvent({
eventId: "$media-a",
originServerTs: 1000,
content: {
msgtype: "m.image",
body: "",
url: "mxc://example.org/media-a",
},
}) as MatrixRawEvent,
);
expect(finalizeInboundContext).not.toHaveBeenCalled();
await handler(
DEFAULT_ROOM,
makeRoomTriggerEvent({ eventId: "$trigger-media", body: "trigger", ts: 2000 }),
);
expect(finalizeInboundContext).toHaveBeenCalledOnce();
const ctx = finalizeInboundContext.mock.calls[0]?.[0] as Record<string, unknown>;
const history = ctx["InboundHistory"] as Array<{ body: string }> | undefined;
// The image had an empty body, so the buffered entry is expected to carry the
// "[matrix image attachment]" placeholder text.
expect(history?.some((entry) => entry.body.includes("[matrix image attachment]"))).toBe(true);
});
});
// Scenario 2: ordering and failure cases around an in-flight dispatch. These tests use
// manually resolved promises to hold a handler call open while other events arrive.
describe("matrix group chat history — scenario 2: race condition safety", () => {
it("messages arriving during agent processing are visible on the next trigger", async () => {
let resolveFirstDispatch: (() => void) | undefined;
let firstDispatchStarted = false;
const finalizeInboundContext = vi.fn((ctx: unknown) => ctx);
// Only the first dispatch blocks (until resolveFirstDispatch is called);
// later dispatches return immediately.
const dispatchReplyFromConfig = vi.fn(async () => {
if (!firstDispatchStarted) {
firstDispatchStarted = true;
await new Promise<void>((resolve) => {
resolveFirstDispatch = resolve;
});
}
return { queuedFinal: true, counts: { final: 1, block: 0, tool: 0 } };
});
const { handler } = createMatrixHandlerTestHarness({
historyLimit: 20,
groupPolicy: "open",
isDirectMessage: false,
finalizeInboundContext,
dispatchReplyFromConfig,
});
// Step 1: trigger msg A — don't await, let it block in dispatch
const firstHandlerDone = handler(
DEFAULT_ROOM,
makeRoomTriggerEvent({ eventId: "$a", body: "msg A", ts: 1000 }),
);
// Step 2: wait until dispatch is in-flight
await vi.waitFor(() => {
expect(firstDispatchStarted).toBe(true);
});
// Step 3: msg B arrives while agent is processing — must not be lost
await handler(DEFAULT_ROOM, makeRoomPlainEvent({ eventId: "$b", body: "msg B", ts: 2000 }));
// Step 4: unblock dispatch and complete
resolveFirstDispatch!();
await firstHandlerDone;
// watermark advances to snapshot taken at dispatch time (just after msg A), not to queue end
// Step 5: trigger msg C — should see [msg B] in history (msg A was consumed)
await handler(DEFAULT_ROOM, makeRoomTriggerEvent({ eventId: "$c", body: "msg C", ts: 3000 }));
expect(finalizeInboundContext).toHaveBeenCalledTimes(2);
const ctxForC = finalizeInboundContext.mock.calls[1]?.[0] as Record<string, unknown>;
const history = ctxForC["InboundHistory"] as Array<{ body: string }>;
expect(history.some((h) => h.body.includes("msg B"))).toBe(true);
expect(history.every((h) => !h.body.includes("msg A"))).toBe(true);
});
it("watermark does not advance when final reply delivery fails (retry sees same history)", async () => {
// Capture the onError callback so we can fire a simulated final delivery failure
let capturedOnError:
| ((err: unknown, info: { kind: "tool" | "block" | "final" }) => void)
| undefined;
const finalizeInboundContext = vi.fn((ctx: unknown) => ctx);
const { handler } = createMatrixHandlerTestHarness({
historyLimit: 20,
groupPolicy: "open",
isDirectMessage: false,
finalizeInboundContext,
dispatchReplyFromConfig: async () => ({
queuedFinal: true,
counts: { final: 1, block: 0, tool: 0 },
}),
// Stub dispatcher factory: records the handler-supplied onError and returns inert
// dispatcher objects.
createReplyDispatcherWithTyping: (params?: {
onError?: (err: unknown, info: { kind: "tool" | "block" | "final" }) => void;
}) => {
capturedOnError = params?.onError;
return {
dispatcher: {},
replyOptions: {},
markDispatchIdle: () => {},
markRunComplete: () => {},
};
},
// Stub runner: executes the dispatch, then reports a "final" delivery error through
// the captured onError before completing and settling.
withReplyDispatcher: async <T>(params: {
dispatcher: { markComplete?: () => void; waitForIdle?: () => Promise<void> };
run: () => Promise<T>;
onSettled?: () => void | Promise<void>;
}) => {
const result = await params.run();
capturedOnError?.(new Error("simulated delivery failure"), { kind: "final" });
params.dispatcher.markComplete?.();
await params.dispatcher.waitForIdle?.();
await params.onSettled?.();
return result;
},
});
await handler(
DEFAULT_ROOM,
makeRoomPlainEvent({ eventId: "$p", body: "pending msg", ts: 1000 }),
);
// First trigger — delivery fails; watermark must NOT advance
await handler(
DEFAULT_ROOM,
makeRoomTriggerEvent({ eventId: "$t1", body: "trigger 1", ts: 2000 }),
);
expect(finalizeInboundContext).toHaveBeenCalledOnce();
{
const ctx = finalizeInboundContext.mock.calls[0]?.[0] as Record<string, unknown>;
const history = ctx["InboundHistory"] as Array<{ body: string }>;
expect(history).toHaveLength(1);
expect(history[0]?.body).toContain("pending msg");
}
// Second trigger — pending msg must still be visible (watermark not advanced)
await handler(
DEFAULT_ROOM,
makeRoomTriggerEvent({ eventId: "$t2", body: "trigger 2", ts: 3000 }),
);
expect(finalizeInboundContext).toHaveBeenCalledTimes(2);
{
const ctx = finalizeInboundContext.mock.calls[1]?.[0] as Record<string, unknown>;
const history = ctx["InboundHistory"] as Array<{ body: string }> | undefined;
// Only presence of the pending message is asserted here; the history length is not pinned.
expect(history?.some((h) => h.body.includes("pending msg"))).toBe(true);
}
});
it("records pending history before sender-name lookup resolves", async () => {
let resolveFirstName: (() => void) | undefined;
let firstNameLookupStarted = false;
// Display-name lookup that stays pending until the test calls resolveFirstName.
const getMemberDisplayName = vi.fn(async () => {
firstNameLookupStarted = true;
await new Promise<void>((resolve) => {
resolveFirstName = resolve;
});
return "sender";
});
const finalizeInboundContext = vi.fn((ctx: unknown) => ctx);
const { handler } = createMatrixHandlerTestHarness({
historyLimit: 20,
groupPolicy: "open",
isDirectMessage: false,
getMemberDisplayName,
finalizeInboundContext,
dispatchReplyFromConfig: async () => ({
queuedFinal: true,
counts: { final: 1, block: 0, tool: 0 },
}),
});
// Unmentioned message should be buffered without waiting for async sender-name lookup.
await handler(
DEFAULT_ROOM,
makeRoomPlainEvent({ eventId: "$slow-name", body: "plain before trigger", ts: 1000 }),
);
// The handler call above completed without ever invoking the name lookup.
expect(firstNameLookupStarted).toBe(false);
// Trigger reads pending history first, then can await sender-name lookup later.
const triggerDone = handler(
DEFAULT_ROOM,
makeRoomTriggerEvent({ eventId: "$trigger-after-slow-name", body: "trigger", ts: 2000 }),
);
await vi.waitFor(() => {
expect(firstNameLookupStarted).toBe(true);
});
resolveFirstName?.();
await triggerDone;
const ctx = finalizeInboundContext.mock.calls[0]?.[0] as Record<string, unknown>;
const history = ctx["InboundHistory"] as Array<{ body: string }> | undefined;
expect(history?.some((entry) => entry.body.includes("plain before trigger"))).toBe(true);
});
});

View File

@ -40,6 +40,7 @@ type MatrixHandlerTestHarnessOptions = {
dropPreStartupMessages?: boolean;
needsRoomAliasesForConfig?: boolean;
isDirectMessage?: boolean;
historyLimit?: number;
readAllowFromStore?: MatrixMonitorHandlerParams["core"]["channel"]["pairing"]["readAllowFromStore"];
upsertPairingRequest?: MatrixMonitorHandlerParams["core"]["channel"]["pairing"]["upsertPairingRequest"];
buildPairingReply?: () => string;
@ -225,6 +226,7 @@ export function createMatrixHandlerTestHarness(
getRoomInfo: options.getRoomInfo ?? (async () => ({ altAliases: [] })),
getMemberDisplayName: options.getMemberDisplayName ?? (async () => "sender"),
needsRoomAliasesForConfig: options.needsRoomAliasesForConfig ?? false,
historyLimit: options.historyLimit ?? 0,
});
return {

View File

@ -892,6 +892,7 @@ describe("matrix monitor handler pairing account scope", () => {
dmPolicy: "open",
textLimit: 8_000,
mediaMaxBytes: 10_000_000,
historyLimit: 0,
startupMs: 0,
startupGraceMs: 0,
directTracker: {

View File

@ -15,7 +15,12 @@ import {
} from "../../runtime-api.js";
import type { CoreConfig, MatrixRoomConfig, ReplyToMode } from "../../types.js";
import { createMatrixDraftStream } from "../draft-stream.js";
import { formatMatrixMediaUnavailableText } from "../media-text.js";
import {
formatMatrixMediaUnavailableText,
formatMatrixMessageText,
resolveMatrixMessageAttachment,
resolveMatrixMessageBody,
} from "../media-text.js";
import { fetchMatrixPollSnapshot } from "../poll-summary.js";
import {
formatPollAsText,
@ -40,6 +45,8 @@ import { resolveMentions } from "./mentions.js";
import { handleInboundMatrixReaction } from "./reaction-events.js";
import { deliverMatrixReplies } from "./replies.js";
import { createMatrixReplyContextResolver } from "./reply-context.js";
import { createRoomHistoryTracker } from "./room-history.js";
import type { HistoryEntry } from "./room-history.js";
import { resolveMatrixRoomConfig } from "./rooms.js";
import { resolveMatrixInboundRoute } from "./route.js";
import { createMatrixThreadContextResolver } from "./thread-context.js";
@ -74,6 +81,7 @@ export type MatrixMonitorHandlerParams = {
dmPolicy: "open" | "pairing" | "allowlist" | "disabled";
textLimit: number;
mediaMaxBytes: number;
historyLimit: number;
startupMs: number;
startupGraceMs: number;
dropPreStartupMessages: boolean;
@ -134,6 +142,29 @@ function resolveMatrixInboundBodyText(params: {
});
}
/**
 * Choose the text to buffer as pending room history for a message.
 *
 * Resolution order:
 * 1. A non-empty `mentionPrecheckText` is returned unchanged.
 * 2. Without a `mediaUrl` there is nothing to describe, so the result is "".
 * 3. Otherwise the text is formatted from the event content's body, filename and
 *    msgtype; "" is returned when the formatter yields null or undefined.
 *
 * @param params.mentionPrecheckText Text already extracted for mention checking.
 * @param params.content Raw room message content; non-string fields are ignored.
 * @param params.mediaUrl Media URL of the event, if any.
 * @returns The history text, or "" when there is nothing to record.
 */
function resolveMatrixPendingHistoryText(params: {
  mentionPrecheckText: string;
  content: RoomMessageEventContent;
  mediaUrl?: string;
}): string {
  const { mentionPrecheckText, content, mediaUrl } = params;
  if (mentionPrecheckText) {
    return mentionPrecheckText;
  }
  if (!mediaUrl) {
    return "";
  }
  // body and filename are trimmed when they are strings; msgtype is passed through untrimmed.
  const trimmedIfString = (value: unknown): string | undefined =>
    typeof value === "string" ? value.trim() : undefined;
  const body = trimmedIfString(content.body);
  const filename = trimmedIfString(content.filename);
  const msgtype = typeof content.msgtype === "string" ? content.msgtype : undefined;
  const formatted = formatMatrixMessageText({
    body: resolveMatrixMessageBody({ body, filename, msgtype }),
    attachment: resolveMatrixMessageAttachment({ body, filename, msgtype }),
  });
  return formatted ?? "";
}
function resolveMatrixAllowBotsMode(value?: boolean | "mentions"): MatrixAllowBotsMode {
if (value === true) {
return "all";
@ -166,6 +197,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
dmPolicy,
textLimit,
mediaMaxBytes,
historyLimit,
startupMs,
startupGraceMs,
dropPreStartupMessages,
@ -190,6 +222,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
getMemberDisplayName,
logVerboseMessage,
});
const roomHistoryTracker = createRoomHistoryTracker();
const readStoreAllowFrom = async (): Promise<string[]> => {
const now = Date.now();
@ -532,6 +565,11 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
? content.file
: undefined;
const mediaUrl = contentUrl ?? contentFile?.url;
const pendingHistoryText = resolveMatrixPendingHistoryText({
mentionPrecheckText,
content,
mediaUrl,
});
if (!mentionPrecheckText && !mediaUrl && !isPollEvent) {
await commitInboundEventIfClaimed();
return;
@ -622,6 +660,16 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
hasControlCommandInMessage;
const canDetectMention = agentMentionRegexes.length > 0 || hasExplicitMention;
if (isRoom && shouldRequireMention && !wasMentioned && !shouldBypassMention) {
// Record in room history so future triggered replies can see this message as context.
if (historyLimit > 0 && pendingHistoryText) {
const pendingEntry: HistoryEntry = {
// Keep skipped-message buffering non-blocking: sender name lookup can be async.
sender: senderId,
body: pendingHistoryText,
timestamp: eventTs ?? undefined,
};
roomHistoryTracker.recordPending(roomId, pendingEntry);
}
logger.info("skipping room message", { roomId, reason: "no-mention" });
await commitInboundEventIfClaimed();
return;
@ -753,6 +801,22 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
getSessionBindingService().touch(_runtimeBindingId, eventTs ?? undefined);
}
const envelopeFrom = isDirectMessage ? senderName : (roomName ?? roomId);
// Group chat history: read pending history before recording this trigger, then
// snapshot the queue position so the watermark can advance to exactly here on reply.
const inboundHistory =
isRoom && historyLimit > 0
? roomHistoryTracker.getPendingHistory(_route.agentId, roomId, historyLimit)
: undefined;
const triggerSnapshotIdx =
isRoom && historyLimit > 0
? roomHistoryTracker.recordTrigger(roomId, {
sender: senderName,
body: bodyText,
timestamp: eventTs ?? undefined,
})
: -1;
const textWithId = `${bodyText}\n[matrix event id: ${_messageId} room: ${roomId}]`;
const storePath = core.channel.session.resolveStorePath(cfg.session?.store, {
agentId: _route.agentId,
@ -776,6 +840,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
Body: body,
RawBody: bodyText,
CommandBody: bodyText,
InboundHistory: inboundHistory && inboundHistory.length > 0 ? inboundHistory : undefined,
From: isDirectMessage ? `matrix:${senderId}` : `matrix:channel:${roomId}`,
To: `room:${roomId}`,
SessionKey: _route.sessionKey,
@ -1164,14 +1229,22 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
logVerboseMessage(
`matrix: final reply delivery failed room=${roomId} id=${_messageId}; leaving event uncommitted`,
);
// Do not advance watermark — the event will be retried and should see the same history.
return;
}
if (!queuedFinal && nonFinalReplyDeliveryFailed) {
logVerboseMessage(
`matrix: non-final reply delivery failed room=${roomId} id=${_messageId}; leaving event uncommitted`,
);
// Do not advance watermark — the event will be retried.
return;
}
// Advance the per-agent watermark now that the reply succeeded (or no reply was needed).
// Only advance to the snapshot position — messages added during async processing remain
// visible for the next trigger.
if (isRoom && triggerSnapshotIdx >= 0) {
roomHistoryTracker.consumeHistory(_route.agentId, roomId, triggerSnapshotIdx);
}
if (!queuedFinal) {
await commitInboundEventIfClaimed();
return;

View File

@ -196,6 +196,10 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi
const dmPolicyRaw = dmConfig?.policy ?? "pairing";
const dmPolicy = allowlistOnly && dmPolicyRaw !== "disabled" ? "allowlist" : dmPolicyRaw;
const textLimit = core.channel.text.resolveTextChunkLimit(cfg, "matrix", account.accountId);
const globalGroupChatHistoryLimit = (
cfg.messages as { groupChat?: { historyLimit?: number } } | undefined
)?.groupChat?.historyLimit;
const historyLimit = Math.max(0, accountConfig.historyLimit ?? globalGroupChatHistoryLimit ?? 0);
const mediaMaxMb = opts.mediaMaxMb ?? accountConfig.mediaMaxMb ?? DEFAULT_MEDIA_MAX_MB;
const mediaMaxBytes = Math.max(1, mediaMaxMb) * 1024 * 1024;
const streaming: "partial" | "off" =
@ -232,6 +236,7 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi
dmPolicy,
textLimit,
mediaMaxBytes,
historyLimit,
startupMs,
startupGraceMs,
dropPreStartupMessages,

View File

@ -0,0 +1,106 @@
/**
* Unit tests for createRoomHistoryTracker.
*
* Covers correctness properties that are hard to observe through the handler harness:
* - Monotone watermark advancement (out-of-order consumeHistory must not regress)
* - roomQueues FIFO eviction when the room count exceeds the cap
*/
import { describe, expect, it } from "vitest";
import { createRoomHistoryTracker } from "./room-history.js";
const ROOM = "!room:test";
const AGENT = "agent_a";
/** Build a minimal history entry with the fixed sender "user" and the given body. */
function entry(body: string) {
  const sender = "user";
  return { sender, body };
}
// Verifies that consumeHistory never moves an agent's watermark backwards when
// snapshots are consumed out of order.
describe("createRoomHistoryTracker — watermark monotonicity", () => {
it("consumeHistory is monotone: out-of-order completion does not regress the watermark", () => {
const tracker = createRoomHistoryTracker();
// Queue: [msg1, msg2, trigger1, msg3, trigger2]
tracker.recordPending(ROOM, entry("msg1"));
tracker.recordPending(ROOM, entry("msg2"));
const snap1 = tracker.recordTrigger(ROOM, entry("trigger1")); // snap=3
tracker.recordPending(ROOM, entry("msg3"));
const snap2 = tracker.recordTrigger(ROOM, entry("trigger2")); // snap=5
// trigger2 completes first (higher index)
tracker.consumeHistory(AGENT, ROOM, snap2); // watermark → 5
expect(tracker.getPendingHistory(AGENT, ROOM, 100)).toHaveLength(0);
// trigger1 completes later (lower index) — must NOT regress to 3
tracker.consumeHistory(AGENT, ROOM, snap1);
// If regressed: [msg3, trigger2] would be visible (2 entries); must stay at 0
expect(tracker.getPendingHistory(AGENT, ROOM, 100)).toHaveLength(0);
// In-order advancement still works
tracker.recordPending(ROOM, entry("msg4"));
const snap3 = tracker.recordTrigger(ROOM, entry("trigger3")); // snap=7
tracker.consumeHistory(AGENT, ROOM, snap3); // watermark → 7
expect(tracker.getPendingHistory(AGENT, ROOM, 100)).toHaveLength(0);
});
});
// Verifies room-count capping. Each test passes a small second argument (the room cap)
// to createRoomHistoryTracker so eviction can be triggered with a handful of rooms.
describe("createRoomHistoryTracker — roomQueues eviction", () => {
it("evicts the oldest room (FIFO) when the room count exceeds the cap", () => {
const tracker = createRoomHistoryTracker(200, 3);
const room1 = "!room1:test";
const room2 = "!room2:test";
const room3 = "!room3:test";
const room4 = "!room4:test";
tracker.recordPending(room1, entry("msg in room1"));
tracker.recordPending(room2, entry("msg in room2"));
tracker.recordPending(room3, entry("msg in room3"));
// At cap (3 rooms) — no eviction yet
expect(tracker.getPendingHistory(AGENT, room1, 100)).toHaveLength(1);
// room4 pushes count to 4 > cap=3 → room1 (oldest) evicted
tracker.recordPending(room4, entry("msg in room4"));
expect(tracker.getPendingHistory(AGENT, room1, 100)).toHaveLength(0);
expect(tracker.getPendingHistory(AGENT, room2, 100)).toHaveLength(1);
expect(tracker.getPendingHistory(AGENT, room3, 100)).toHaveLength(1);
expect(tracker.getPendingHistory(AGENT, room4, 100)).toHaveLength(1);
});
it("re-accessing an evicted room starts a fresh empty queue", () => {
const tracker = createRoomHistoryTracker(200, 2);
const room1 = "!room1:test";
const room2 = "!room2:test";
const room3 = "!room3:test";
tracker.recordPending(room1, entry("old msg in room1"));
tracker.recordPending(room2, entry("msg in room2"));
tracker.recordPending(room3, entry("msg in room3")); // evicts room1
// Writing to room1 again must not resurrect the evicted "old msg" entry.
tracker.recordPending(room1, entry("new msg in room1"));
const history = tracker.getPendingHistory(AGENT, room1, 100);
expect(history).toHaveLength(1);
expect(history[0]?.body).toBe("new msg in room1");
});
it("clears stale room watermarks when an evicted room is recreated", () => {
const tracker = createRoomHistoryTracker(200, 1);
const room1 = "!room1:test";
const room2 = "!room2:test";
// Give AGENT a non-zero watermark in room1 before the room is evicted.
tracker.recordPending(room1, entry("old msg in room1"));
const firstSnapshot = tracker.recordTrigger(room1, entry("trigger in room1"));
tracker.consumeHistory(AGENT, room1, firstSnapshot);
// room2 creation evicts room1 (maxRoomQueues=1)
tracker.recordPending(room2, entry("msg in room2"));
// Recreate room1 and add fresh content.
tracker.recordPending(room1, entry("new msg in room1"));
// If the old watermark had survived eviction, the new entry would be hidden from AGENT.
const history = tracker.getPendingHistory(AGENT, room1, 100);
expect(history).toHaveLength(1);
expect(history[0]?.body).toBe("new msg in room1");
});
});

View File

@ -0,0 +1,147 @@
/**
* Per-room group chat history tracking for Matrix.
*
* Maintains a shared per-room message queue and per-(agentId, roomId) watermarks so
* each agent independently tracks which messages it has already consumed. This design
* lets multiple agents in the same room see independent history windows:
*
* - dev replies to @dev msgB (dev's watermark advances past B); the room queue still holds [A, B]
* - spark is then triggered by @spark msgC; spark's watermark starts at 0, so it sees [A, B] as history
*
* Race-condition safety: the watermark only advances to the snapshot index taken at
* dispatch time, NOT to the queue's end at reply time. Messages that land in the queue
* while the agent is processing stay visible to the next trigger for that agent.
*/
import type { HistoryEntry } from "openclaw/plugin-sdk/reply-history";
/**
 * Maximum entries retained per room (hard cap to bound memory).
 * Default for the `maxQueueSize` parameter of createRoomHistoryTracker.
 */
const DEFAULT_MAX_QUEUE_SIZE = 200;
/**
 * Maximum number of rooms to retain queues for (FIFO eviction beyond this).
 * Default for the `maxRoomQueues` parameter of createRoomHistoryTracker.
 */
const DEFAULT_MAX_ROOM_QUEUES = 1000;
/** Maximum number of (agentId, roomId) watermark entries to retain. */
const MAX_WATERMARK_ENTRIES = 5000;
export type { HistoryEntry };
/**
* Operations exposed by a room history tracker: a shared message queue per room plus
* an independent "consumed up to" watermark per (agentId, roomId) pair.
*/
export type RoomHistoryTracker = {
/**
* Record a non-trigger message for future context.
* Call this when a room message arrives but does not mention the bot.
* The entry is appended to the room's shared queue; once the queue exceeds its size
* cap, the oldest entries are dropped.
*/
recordPending: (roomId: string, entry: HistoryEntry) => void;
/**
* Get pending history for an agent: all messages in the room since the
* agent's last watermark, capped at `limit` most-recent entries.
* Call this BEFORE recordTrigger so the trigger itself is not included.
* Returns an empty array when `limit` is <= 0 or the room has no queued entries.
* Read-only: neither the queue nor the watermark is modified.
*/
getPendingHistory: (agentId: string, roomId: string, limit: number) => HistoryEntry[];
/**
* Append the trigger message to the room queue and return a snapshot index.
* The snapshot is the absolute queue position just past the trigger entry.
* The snapshot index must be passed to consumeHistory after the agent replies.
*/
recordTrigger: (roomId: string, entry: HistoryEntry) => number;
/**
* Advance the agent's watermark to the snapshot index returned by recordTrigger.
* Only messages appended after that snapshot remain visible on the next trigger.
* The watermark never moves backwards: a snapshot below the current watermark
* leaves it unchanged.
*/
consumeHistory: (agentId: string, roomId: string, snapshotIdx: number) => void;
};
/** In-memory message queue for a single room. */
type RoomQueue = {
/** Retained messages in arrival order (oldest first); trimmed from the front. */
entries: HistoryEntry[];
/** Absolute index of entries[0] — increases as old entries are trimmed. */
baseIndex: number;
};
/**
 * Create an in-memory history tracker for group rooms.
 *
 * The tracker keeps one bounded message queue per room and one "consumed up to"
 * watermark per (agentId, roomId) pair. Positions are absolute indices: a queue's
 * `baseIndex` grows as old entries are trimmed, so a watermark stays meaningful even
 * after the entries it pointed at have been dropped.
 *
 * @param maxQueueSize - Maximum entries retained per room; older entries are trimmed first.
 * @param maxRoomQueues - Maximum number of room queues retained; the earliest-created
 *   queue (and every watermark for that room) is evicted beyond this.
 * @param maxWatermarkEntries - Maximum number of watermark entries retained; the
 *   least recently updated entry is evicted beyond this.
 * @returns The tracker operations bound to this private state.
 */
export function createRoomHistoryTracker(
  maxQueueSize = DEFAULT_MAX_QUEUE_SIZE,
  maxRoomQueues = DEFAULT_MAX_ROOM_QUEUES,
  maxWatermarkEntries = MAX_WATERMARK_ENTRIES,
): RoomHistoryTracker {
  const roomQueues = new Map<string, RoomQueue>();
  /** Maps `${agentId}:${roomId}` → absolute consumed-up-to index */
  const agentWatermarks = new Map<string, number>();

  /** Build the watermark map key for an (agentId, roomId) pair. */
  function wmKey(agentId: string, roomId: string): string {
    return `${agentId}:${roomId}`;
  }

  /** Drop every agent watermark that belongs to `roomId`. */
  function clearRoomWatermarks(roomId: string): void {
    const roomSuffix = `:${roomId}`;
    // Deleting the current key while iterating a Map is well-defined in JavaScript.
    for (const key of agentWatermarks.keys()) {
      if (key.endsWith(roomSuffix)) {
        agentWatermarks.delete(key);
      }
    }
  }

  /** Return the queue for `roomId`, creating it (and evicting the oldest room) if needed. */
  function getOrCreateQueue(roomId: string): RoomQueue {
    let queue = roomQueues.get(roomId);
    if (!queue) {
      queue = { entries: [], baseIndex: 0 };
      roomQueues.set(roomId, queue);
      // FIFO eviction to prevent unbounded growth across many rooms
      if (roomQueues.size > maxRoomQueues) {
        const oldest = roomQueues.keys().next().value;
        if (oldest !== undefined) {
          roomQueues.delete(oldest);
          // A recreated room restarts at baseIndex 0, so old watermarks would hide
          // its first messages; drop them together with the queue.
          clearRoomWatermarks(oldest);
        }
      }
    }
    return queue;
  }

  /**
   * Append `entry`, trim the queue to `maxQueueSize`, and return the absolute index
   * just past the appended entry.
   */
  function appendToQueue(queue: RoomQueue, entry: HistoryEntry): number {
    queue.entries.push(entry);
    if (queue.entries.length > maxQueueSize) {
      const overflow = queue.entries.length - maxQueueSize;
      queue.entries.splice(0, overflow);
      queue.baseIndex += overflow;
    }
    return queue.baseIndex + queue.entries.length;
  }

  return {
    recordPending(roomId, entry) {
      appendToQueue(getOrCreateQueue(roomId), entry);
    },

    getPendingHistory(agentId, roomId, limit) {
      if (limit <= 0) return [];
      const queue = roomQueues.get(roomId);
      if (!queue || queue.entries.length === 0) return [];
      const wm = agentWatermarks.get(wmKey(agentId, roomId)) ?? 0;
      // First absolute index the agent has not seen yet; entries trimmed from the
      // front of the queue are gone, so never start before baseIndex.
      const startAbs = Math.max(wm, queue.baseIndex);
      const available = queue.entries.slice(startAbs - queue.baseIndex);
      // Cap to the last `limit` entries
      return available.length > limit ? available.slice(-limit) : available;
    },

    recordTrigger(roomId, entry) {
      return appendToQueue(getOrCreateQueue(roomId), entry);
    },

    consumeHistory(agentId, roomId, snapshotIdx) {
      // A snapshot is only meaningful for the queue incarnation that issued it. If the
      // room was evicted while the agent was replying, the queue is either gone or was
      // recreated from index 0; a snapshot past its current end can only be stale.
      // Applying it would resurrect a cleared watermark and hide new messages.
      const queue = roomQueues.get(roomId);
      if (!queue || snapshotIdx > queue.baseIndex + queue.entries.length) return;

      const key = wmKey(agentId, roomId);
      // Monotone write: never regress an already-advanced watermark.
      // Guards against out-of-order completion when two triggers for the same
      // (agentId, roomId) are in-flight concurrently.
      const next = Math.max(agentWatermarks.get(key) ?? 0, snapshotIdx);
      // Map.set on an existing key keeps its original insertion position, so delete
      // first to move the key to the most-recent end. Without this the eviction below
      // would be FIFO by first insert and could drop the most active agent.
      agentWatermarks.delete(key);
      agentWatermarks.set(key, next);
      // LRU eviction to prevent unbounded growth
      if (agentWatermarks.size > maxWatermarkEntries) {
        const oldest = agentWatermarks.keys().next().value;
        if (oldest !== undefined) {
          agentWatermarks.delete(oldest);
        }
      }
    },
  };
}

View File

@ -123,6 +123,12 @@ export type MatrixConfig = {
startupVerificationCooldownHours?: number;
/** Max outbound media size in MB. */
mediaMaxMb?: number;
/**
* Number of recent room messages shown to the agent as context when it is mentioned
* in a group chat (0 = disabled). Applies to room messages that did not directly
* trigger a reply. Default: 0 (disabled).
*/
historyLimit?: number;
/** Auto-join invites (always|allowlist|off). Default: off. */
autoJoin?: "always" | "allowlist" | "off";
/** Allowlist for auto-join invites (room IDs, aliases). */

View File

@ -6301,6 +6301,27 @@ export const GENERATED_BUNDLED_CHANNEL_CONFIG_METADATA = [
webhookPath: {
type: "string",
},
threadBindings: {
type: "object",
properties: {
enabled: {
type: "boolean",
},
idleHours: {
type: "number",
},
maxAgeHours: {
type: "number",
},
spawnSubagentSessions: {
type: "boolean",
},
spawnAcpSessions: {
type: "boolean",
},
},
additionalProperties: false,
},
accounts: {
type: "object",
propertyNames: {
@ -6372,6 +6393,27 @@ export const GENERATED_BUNDLED_CHANNEL_CONFIG_METADATA = [
webhookPath: {
type: "string",
},
threadBindings: {
type: "object",
properties: {
enabled: {
type: "boolean",
},
idleHours: {
type: "number",
},
maxAgeHours: {
type: "number",
},
spawnSubagentSessions: {
type: "boolean",
},
spawnAcpSessions: {
type: "boolean",
},
},
additionalProperties: false,
},
groups: {
type: "object",
propertyNames: {
@ -6674,6 +6716,17 @@ export const GENERATED_BUNDLED_CHANNEL_CONFIG_METADATA = [
type: "string",
enum: ["open", "disabled", "allowlist"],
},
streaming: {
anyOf: [
{
type: "string",
enum: ["partial", "off"],
},
{
type: "boolean",
},
],
},
replyToMode: {
type: "string",
enum: ["off", "first", "all"],
@ -6736,6 +6789,11 @@ export const GENERATED_BUNDLED_CHANNEL_CONFIG_METADATA = [
mediaMaxMb: {
type: "number",
},
historyLimit: {
type: "integer",
minimum: 0,
maximum: 9007199254740991,
},
autoJoin: {
type: "string",
enum: ["always", "allowlist", "off"],