diff --git a/extensions/matrix/src/matrix/monitor/handler.test-helpers.ts b/extensions/matrix/src/matrix/monitor/handler.test-helpers.ts index a6ddf28c110..4d17e8d42d0 100644 --- a/extensions/matrix/src/matrix/monitor/handler.test-helpers.ts +++ b/extensions/matrix/src/matrix/monitor/handler.test-helpers.ts @@ -56,12 +56,21 @@ type MatrixHandlerTestHarnessOptions = { dispatcher: Record; replyOptions: Record; markDispatchIdle: () => void; + markRunComplete: () => void; }; resolveHumanDelayConfig?: () => undefined; dispatchReplyFromConfig?: () => Promise<{ queuedFinal: boolean; counts: { final: number; block: number; tool: number }; }>; + withReplyDispatcher?: (params: { + dispatcher: { + markComplete?: () => void; + waitForIdle?: () => Promise; + }; + run: () => Promise; + onSettled?: () => void | Promise; + }) => Promise; inboundDeduper?: MatrixMonitorHandlerParams["inboundDeduper"]; shouldAckReaction?: () => boolean; enqueueSystemEvent?: (...args: unknown[]) => void; @@ -139,9 +148,32 @@ export function createMatrixHandlerTestHarness( dispatcher: {}, replyOptions: {}, markDispatchIdle: () => {}, + markRunComplete: () => {}, })), resolveHumanDelayConfig: options.resolveHumanDelayConfig ?? (() => undefined), dispatchReplyFromConfig, + withReplyDispatcher: + options.withReplyDispatcher ?? + (async (params: { + dispatcher: { + markComplete?: () => void; + waitForIdle?: () => Promise; + }; + run: () => Promise; + onSettled?: () => void | Promise; + }) => { + const { dispatcher, run, onSettled } = params; + try { + return await run(); + } finally { + dispatcher.markComplete?.(); + try { + await dispatcher.waitForIdle?.(); + } finally { + await onSettled?.(); + } + } + }), }, reactions: { shouldAckReaction: options.shouldAckReaction ?? (() => false), diff --git a/extensions/matrix/src/matrix/monitor/handler.test.ts b/extensions/matrix/src/matrix/monitor/handler.test.ts index 1f96e364ecc..19434083cf5 100644 --- a/extensions/matrix/src/matrix/monitor/handler.test.ts +++ b/extensions/matrix/src/matrix/monitor/handler.test.ts @@ -720,12 +720,36 @@ describe("matrix monitor handler pairing account scope", () => { dispatcher: {}, replyOptions: {}, markDispatchIdle: () => {}, + markRunComplete: () => {}, }), resolveHumanDelayConfig: () => undefined, dispatchReplyFromConfig: async () => ({ queuedFinal: true, counts: { final: 1, block: 0, tool: 0 }, }), + withReplyDispatcher: async ({ + dispatcher, + run, + onSettled, + }: { + dispatcher: { + markComplete?: () => void; + waitForIdle?: () => Promise; + }; + run: () => Promise; + onSettled?: () => void | Promise; + }) => { + try { + return await run(); + } finally { + dispatcher.markComplete?.(); + try { + await dispatcher.waitForIdle?.(); + } finally { + await onSettled?.(); + } + } + }, }, reactions: { shouldAckReaction: () => false, @@ -1022,7 +1046,7 @@ describe("matrix monitor handler durable inbound dedupe", () => { expect(inboundDeduper.releaseEvent).not.toHaveBeenCalled(); }); - it("commits inbound events before reply side effects", async () => { + it("commits inbound events only after queued replies finish delivering", async () => { const callOrder: string[] = []; const inboundDeduper = { claimEvent: vi.fn(() => { @@ -1050,6 +1074,23 @@ describe("matrix monitor handler durable inbound dedupe", () => { inboundDeduper, recordInboundSession, dispatchReplyFromConfig, + createReplyDispatcherWithTyping: () => ({ + dispatcher: { + markComplete: () => { + callOrder.push("mark-complete"); + }, + waitForIdle: async () => { + callOrder.push("wait-for-idle"); + }, + }, + replyOptions: {}, + markDispatchIdle: () => { + callOrder.push("dispatch-idle"); + }, + markRunComplete: () => { + callOrder.push("run-complete"); + }, + }), }); await handler( @@ -1060,7 +1101,16 @@ describe("matrix monitor handler durable inbound dedupe", () => { }), ); - expect(callOrder).toEqual(["claim", "record", "dispatch", "commit"]); + expect(callOrder).toEqual([ + "claim", + "record", + "dispatch", + "run-complete", + "mark-complete", + "wait-for-idle", + "dispatch-idle", + "commit", + ]); expect(inboundDeduper.releaseEvent).not.toHaveBeenCalled(); }); diff --git a/extensions/matrix/src/matrix/monitor/handler.ts b/extensions/matrix/src/matrix/monitor/handler.ts index 865c20d5a6a..8e34b04cec2 100644 --- a/extensions/matrix/src/matrix/monitor/handler.ts +++ b/extensions/matrix/src/matrix/monitor/handler.ts @@ -30,6 +30,7 @@ import { } from "../send.js"; import { resolveMatrixMonitorAccessState } from "./access-state.js"; import { resolveMatrixAckReactionConfig } from "./ack-config.js"; +import type { MatrixInboundEventDeduper } from "./inbound-dedupe.js"; import { resolveMatrixLocation, type MatrixLocationPayload } from "./location.js"; import { downloadMatrixMedia } from "./media.js"; import { resolveMentions } from "./mentions.js"; @@ -72,11 +73,7 @@ export type MatrixMonitorHandlerParams = { startupMs: number; startupGraceMs: number; dropPreStartupMessages: boolean; - inboundDeduper?: { - claimEvent: (params: { roomId: string; eventId: string }) => boolean; - commitEvent: (params: { roomId: string; eventId: string; eventTs?: number }) => Promise; - releaseEvent: (params: { roomId: string; eventId: string }) => void; - }; + inboundDeduper?: Pick; directTracker: { isDirectMessage: (params: { roomId: string; @@ -268,11 +265,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam if (!claimedInboundEvent || !inboundDeduper || !eventId) { return; } - await inboundDeduper.commitEvent({ - roomId, - eventId, - eventTs: eventTs ?? undefined, - }); + await inboundDeduper.commitEvent({ roomId, eventId }); claimedInboundEvent = false; }; if (dropPreStartupMessages) { @@ -871,7 +864,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam }); }, }); - const { dispatcher, replyOptions, markDispatchIdle } = + const { dispatcher, replyOptions, markDispatchIdle, markRunComplete } = core.channel.reply.createReplyDispatcherWithTyping({ ...prefixOptions, humanDelay: core.channel.reply.resolveHumanDelayConfig(cfg, route.agentId), @@ -897,17 +890,28 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam onIdle: typingCallbacks.onIdle, }); - const { queuedFinal, counts } = await core.channel.reply.dispatchReplyFromConfig({ - ctx: ctxPayload, - cfg, + const { queuedFinal, counts } = await core.channel.reply.withReplyDispatcher({ dispatcher, - replyOptions: { - ...replyOptions, - skillFilter: roomConfig?.skills, - onModelSelected, + onSettled: () => { + markDispatchIdle(); + }, + run: async () => { + try { + return await core.channel.reply.dispatchReplyFromConfig({ + ctx: ctxPayload, + cfg, + dispatcher, + replyOptions: { + ...replyOptions, + skillFilter: roomConfig?.skills, + onModelSelected, + }, + }); + } finally { + markRunComplete(); + } }, }); - markDispatchIdle(); if (!queuedFinal) { await commitInboundEventIfClaimed(); return; diff --git a/extensions/matrix/src/matrix/monitor/inbound-dedupe.test.ts b/extensions/matrix/src/matrix/monitor/inbound-dedupe.test.ts index da32ef4da66..e0ad423c1f1 100644 --- a/extensions/matrix/src/matrix/monitor/inbound-dedupe.test.ts +++ b/extensions/matrix/src/matrix/monitor/inbound-dedupe.test.ts @@ -97,4 +97,50 @@ describe("Matrix inbound event dedupe", () => { expect(deduper.claimEvent({ roomId: "!room:example.org", eventId: "$keep-2" })).toBe(false); expect(deduper.claimEvent({ roomId: "!room:example.org", eventId: "$keep-3" })).toBe(false); }); + + it("retains replayed backlog events based on processing time", async () => { + const storagePath = createStoragePath(); + let now = 100; + const first = await createMatrixInboundEventDeduper({ + auth: auth as never, + storagePath, + ttlMs: 20, + nowMs: () => now, + }); + + expect(first.claimEvent({ roomId: "!room:example.org", eventId: "$backlog" })).toBe(true); + await first.commitEvent({ + roomId: "!room:example.org", + eventId: "$backlog", + }); + await first.stop(); + + now = 110; + const second = await createMatrixInboundEventDeduper({ + auth: auth as never, + storagePath, + ttlMs: 20, + nowMs: () => now, + }); + expect(second.claimEvent({ roomId: "!room:example.org", eventId: "$backlog" })).toBe(false); + }); + + it("treats stop persistence failures as best-effort cleanup", async () => { + const blockingPath = createStoragePath(); + fs.writeFileSync(blockingPath, "blocking file", "utf8"); + const deduper = await createMatrixInboundEventDeduper({ + auth: auth as never, + storagePath: path.join(blockingPath, "nested", "inbound-dedupe.json"), + }); + + expect(deduper.claimEvent({ roomId: "!room:example.org", eventId: "$persist-fail" })).toBe( + true, + ); + await deduper.commitEvent({ + roomId: "!room:example.org", + eventId: "$persist-fail", + }); + + await expect(deduper.stop()).resolves.toBeUndefined(); + }); }); diff --git a/extensions/matrix/src/matrix/monitor/inbound-dedupe.ts b/extensions/matrix/src/matrix/monitor/inbound-dedupe.ts index e7ffa78af48..5784c1393a3 100644 --- a/extensions/matrix/src/matrix/monitor/inbound-dedupe.ts +++ b/extensions/matrix/src/matrix/monitor/inbound-dedupe.ts @@ -23,7 +23,7 @@ type StoredMatrixInboundDedupeState = { export type MatrixInboundEventDeduper = { claimEvent: (params: { roomId: string; eventId: string }) => boolean; - commitEvent: (params: { roomId: string; eventId: string; eventTs?: number }) => Promise; + commitEvent: (params: { roomId: string; eventId: string }) => Promise; releaseEvent: (params: { roomId: string; eventId: string }) => void; flush: () => Promise; stop: () => Promise; @@ -252,13 +252,13 @@ export async function createMatrixInboundEventDeduper(params: { pending.add(key); return true; }, - commitEvent: async ({ roomId, eventId, eventTs }) => { + commitEvent: async ({ roomId, eventId }) => { const key = buildEventKey({ roomId, eventId }); if (!key) { return; } pending.delete(key); - const ts = normalizeTimestamp(eventTs) ?? nowMs(); + const ts = nowMs(); seen.delete(key); seen.set(key, ts); pruneSeenEvents({ seen, ttlMs, maxEntries, nowMs: nowMs() }); @@ -273,7 +273,15 @@ export async function createMatrixInboundEventDeduper(params: { }, flush, stop: async () => { - await flush(); + try { + await flush(); + } catch (err) { + LogService.warn( + "MatrixInboundDedupe", + "Failed to flush Matrix inbound dedupe store during stop():", + err, + ); + } }, }; }