From b6f88b72e8a30e4e5b7bff53fc661bb9d01ab98b Mon Sep 17 00:00:00 2001 From: Gustavo Madeira Santana Date: Mon, 30 Mar 2026 15:05:51 -0400 Subject: [PATCH] Matrix: limit ingress locking to room history --- .../monitor/handler.group-history.test.ts | 40 +++++++++++++++ .../matrix/src/matrix/monitor/handler.ts | 50 +++++++++++++++---- 2 files changed, 80 insertions(+), 10 deletions(-) diff --git a/extensions/matrix/src/matrix/monitor/handler.group-history.test.ts b/extensions/matrix/src/matrix/monitor/handler.group-history.test.ts index fefd05a2692..5e854c64a8a 100644 --- a/extensions/matrix/src/matrix/monitor/handler.group-history.test.ts +++ b/extensions/matrix/src/matrix/monitor/handler.group-history.test.ts @@ -249,6 +249,46 @@ describe("matrix group chat history — scenario 1: basic accumulation", () => { } }); + it("history-enabled rooms do not serialize DM ingress heavy work", async () => { + let resolveFirstName: (() => void) | undefined; + let nameLookupCalls = 0; + const getMemberDisplayName = vi.fn(async () => { + nameLookupCalls += 1; + if (nameLookupCalls === 1) { + await new Promise((resolve) => { + resolveFirstName = resolve; + }); + } + return "sender"; + }); + + const { handler } = createMatrixHandlerTestHarness({ + historyLimit: 20, + isDirectMessage: true, + getMemberDisplayName, + dispatchReplyFromConfig: async () => ({ + queuedFinal: true, + counts: { final: 1, block: 0, tool: 0 }, + }), + }); + + const first = handler(DEFAULT_ROOM, makeRoomPlainEvent({ eventId: "$dm-a", body: "first dm" })); + await vi.waitFor(() => { + expect(resolveFirstName).toBeTypeOf("function"); + }); + + const second = handler( + DEFAULT_ROOM, + makeRoomPlainEvent({ eventId: "$dm-b", body: "second dm" }), + ); + await vi.waitFor(() => { + expect(nameLookupCalls).toBe(2); + }); + + resolveFirstName?.(); + await Promise.all([first, second]); + }); + it("includes skipped media-only room messages in next trigger history", async () => { const finalizeInboundContext = vi.fn((ctx: unknown) => ctx); const { handler } = createMatrixHandlerTestHarness({ diff --git a/extensions/matrix/src/matrix/monitor/handler.ts b/extensions/matrix/src/matrix/monitor/handler.ts index 842b557ece0..5fba58af5c9 100644 --- a/extensions/matrix/src/matrix/monitor/handler.ts +++ b/extensions/matrix/src/matrix/monitor/handler.ts @@ -287,11 +287,6 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam } }; - const runHistoryAwareRoomIngress = async ( - roomId: string, - task: () => Promise, - ): Promise => (historyLimit > 0 ? runRoomIngress(roomId, task) : task()); - return async (roomId: string, event: MatrixRawEvent) => { const eventId = typeof event.event_id === "string" ? event.event_id.trim() : ""; let claimedInboundEvent = false; @@ -336,7 +331,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam await inboundDeduper.commitEvent({ roomId, eventId }); claimedInboundEvent = false; }; - const ingressResult = await runHistoryAwareRoomIngress(roomId, async () => { + const readIngressPrefix = async () => { const selfUserId = await client.getUserId(); if (senderId === selfUserId) { return; @@ -389,8 +384,18 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam senderId, selfUserId, }); + return { content, isDirectMessage, locationPayload, selfUserId }; + }; + const continueIngress = async (params: { + content: RoomMessageEventContent; + isDirectMessage: boolean; + locationPayload: MatrixLocationPayload | null; + selfUserId: string; + }) => { + let content = params.content; + const isDirectMessage = params.isDirectMessage; const isRoom = !isDirectMessage; - + const { locationPayload, selfUserId } = params; if (isRoom && groupPolicy === "disabled") { await commitInboundEventIfClaimed(); return; @@ -846,8 +851,33 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam triggerSnapshot, threadRootId: _threadRootId, }; - }); - if (!ingressResult) { + }; + const ingressResult = + historyLimit > 0 + ? await runRoomIngress(roomId, async () => { + const prefix = await readIngressPrefix(); + if (!prefix) { + return; + } + if (prefix.isDirectMessage) { + return { deferredPrefix: prefix } as const; + } + return { ingressResult: await continueIngress(prefix) } as const; + }) + : undefined; + const resolvedIngressResult = + historyLimit > 0 + ? ingressResult?.deferredPrefix + ? await continueIngress(ingressResult.deferredPrefix) + : ingressResult?.ingressResult + : await (async () => { + const prefix = await readIngressPrefix(); + if (!prefix) { + return; + } + return await continueIngress(prefix); + })(); + if (!resolvedIngressResult) { return; } @@ -869,7 +899,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam messageId: _messageId, triggerSnapshot, threadRootId: _threadRootId, - } = ingressResult; + } = resolvedIngressResult; // Keep the per-room ingress gate focused on ordering-sensitive state updates. // Prompt/session enrichment below can run concurrently after the history snapshot is fixed.