Matrix: limit ingress locking to room history

This commit is contained in:
Gustavo Madeira Santana 2026-03-30 15:05:51 -04:00
parent be740253de
commit b6f88b72e8
No known key found for this signature in database
2 changed files with 80 additions and 10 deletions

View File

@ -249,6 +249,46 @@ describe("matrix group chat history — scenario 1: basic accumulation", () => {
}
});
it("history-enabled rooms do not serialize DM ingress heavy work", async () => {
let resolveFirstName: (() => void) | undefined;
let nameLookupCalls = 0;
const getMemberDisplayName = vi.fn(async () => {
nameLookupCalls += 1;
if (nameLookupCalls === 1) {
await new Promise<void>((resolve) => {
resolveFirstName = resolve;
});
}
return "sender";
});
const { handler } = createMatrixHandlerTestHarness({
historyLimit: 20,
isDirectMessage: true,
getMemberDisplayName,
dispatchReplyFromConfig: async () => ({
queuedFinal: true,
counts: { final: 1, block: 0, tool: 0 },
}),
});
const first = handler(DEFAULT_ROOM, makeRoomPlainEvent({ eventId: "$dm-a", body: "first dm" }));
await vi.waitFor(() => {
expect(resolveFirstName).toBeTypeOf("function");
});
const second = handler(
DEFAULT_ROOM,
makeRoomPlainEvent({ eventId: "$dm-b", body: "second dm" }),
);
await vi.waitFor(() => {
expect(nameLookupCalls).toBe(2);
});
resolveFirstName?.();
await Promise.all([first, second]);
});
it("includes skipped media-only room messages in next trigger history", async () => {
const finalizeInboundContext = vi.fn((ctx: unknown) => ctx);
const { handler } = createMatrixHandlerTestHarness({

View File

@ -287,11 +287,6 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
}
};
const runHistoryAwareRoomIngress = async <T>(
roomId: string,
task: () => Promise<T>,
): Promise<T> => (historyLimit > 0 ? runRoomIngress(roomId, task) : task());
return async (roomId: string, event: MatrixRawEvent) => {
const eventId = typeof event.event_id === "string" ? event.event_id.trim() : "";
let claimedInboundEvent = false;
@ -336,7 +331,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
await inboundDeduper.commitEvent({ roomId, eventId });
claimedInboundEvent = false;
};
const ingressResult = await runHistoryAwareRoomIngress(roomId, async () => {
const readIngressPrefix = async () => {
const selfUserId = await client.getUserId();
if (senderId === selfUserId) {
return;
@ -389,8 +384,18 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
senderId,
selfUserId,
});
return { content, isDirectMessage, locationPayload, selfUserId };
};
const continueIngress = async (params: {
content: RoomMessageEventContent;
isDirectMessage: boolean;
locationPayload: MatrixLocationPayload | null;
selfUserId: string;
}) => {
let content = params.content;
const isDirectMessage = params.isDirectMessage;
const isRoom = !isDirectMessage;
const { locationPayload, selfUserId } = params;
if (isRoom && groupPolicy === "disabled") {
await commitInboundEventIfClaimed();
return;
@ -846,8 +851,33 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
triggerSnapshot,
threadRootId: _threadRootId,
};
});
if (!ingressResult) {
};
const ingressResult =
historyLimit > 0
? await runRoomIngress(roomId, async () => {
const prefix = await readIngressPrefix();
if (!prefix) {
return;
}
if (prefix.isDirectMessage) {
return { deferredPrefix: prefix } as const;
}
return { ingressResult: await continueIngress(prefix) } as const;
})
: undefined;
const resolvedIngressResult =
historyLimit > 0
? ingressResult?.deferredPrefix
? await continueIngress(ingressResult.deferredPrefix)
: ingressResult?.ingressResult
: await (async () => {
const prefix = await readIngressPrefix();
if (!prefix) {
return;
}
return await continueIngress(prefix);
})();
if (!resolvedIngressResult) {
return;
}
@ -869,7 +899,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
messageId: _messageId,
triggerSnapshot,
threadRootId: _threadRootId,
} = ingressResult;
} = resolvedIngressResult;
// Keep the per-room ingress gate focused on ordering-sensitive state updates.
// Prompt/session enrichment below can run concurrently after the history snapshot is fixed.