diff --git a/extensions/matrix/src/matrix/monitor/room-history.test.ts b/extensions/matrix/src/matrix/monitor/room-history.test.ts index cb0f0243021..edf493ba0f1 100644 --- a/extensions/matrix/src/matrix/monitor/room-history.test.ts +++ b/extensions/matrix/src/matrix/monitor/room-history.test.ts @@ -64,6 +64,33 @@ describe("createRoomHistoryTracker — watermark monotonicity", () => { expect(retried.history.map((entry) => entry.body)).toEqual(["msg1"]); expect(retried.snapshotIdx).toBe(first.snapshotIdx); }); + + it("refreshes watermark recency before capped-map eviction", () => { + const tracker = createRoomHistoryTracker(200, 10, 2); + const room1 = "!room1:test"; + const room2 = "!room2:test"; + const room3 = "!room3:test"; + + tracker.recordPending(room1, entry("old msg in room1")); + const snap1 = tracker.recordTrigger(room1, entry("trigger in room1")); + tracker.consumeHistory(AGENT, room1, snap1); + + tracker.recordPending(room2, entry("old msg in room2")); + const snap2 = tracker.recordTrigger(room2, entry("trigger in room2")); + tracker.consumeHistory(AGENT, room2, snap2); + + // Refresh room1 so room2 becomes the stalest watermark entry. + tracker.consumeHistory(AGENT, room1, snap1); + + tracker.recordPending(room3, entry("old msg in room3")); + const snap3 = tracker.recordTrigger(room3, entry("trigger in room3")); + tracker.consumeHistory(AGENT, room3, snap3); + + tracker.recordPending(room1, entry("new msg in room1")); + const room1History = tracker.getPendingHistory(AGENT, room1, 100); + expect(room1History).toHaveLength(1); + expect(room1History[0]?.body).toBe("new msg in room1"); + }); }); describe("createRoomHistoryTracker — roomQueues eviction", () => { diff --git a/extensions/matrix/src/matrix/monitor/room-history.ts b/extensions/matrix/src/matrix/monitor/room-history.ts index 80c77a3a116..aabc535e422 100644 --- a/extensions/matrix/src/matrix/monitor/room-history.ts +++ b/extensions/matrix/src/matrix/monitor/room-history.ts @@ -85,6 +85,7 @@ type RoomQueue = { export function createRoomHistoryTracker( maxQueueSize = DEFAULT_MAX_QUEUE_SIZE, maxRoomQueues = DEFAULT_MAX_ROOM_QUEUES, + maxWatermarkEntries = MAX_WATERMARK_ENTRIES, ): RoomHistoryTracker { const roomQueues = new Map(); /** Maps `${agentId}:${roomId}` → absolute consumed-up-to index */ @@ -137,6 +138,21 @@ export function createRoomHistoryTracker( return `${agentId}:${messageId.trim()}`; } + function rememberWatermark(key: string, snapshotIdx: number): void { + const nextSnapshotIdx = Math.max(agentWatermarks.get(key) ?? 0, snapshotIdx); + if (agentWatermarks.has(key)) { + // Refresh insertion order so capped-map eviction removes the stalest pair, not an active one. + agentWatermarks.delete(key); + } + agentWatermarks.set(key, nextSnapshotIdx); + if (agentWatermarks.size > maxWatermarkEntries) { + const oldest = agentWatermarks.keys().next().value; + if (oldest !== undefined) { + agentWatermarks.delete(oldest); + } + } + } + function computePendingHistory( queue: RoomQueue, agentId: string, @@ -208,18 +224,11 @@ export function createRoomHistoryTracker( // Monotone write: never regress an already-advanced watermark. // Guards against out-of-order completion when two triggers for the same // (agentId, roomId) are in-flight concurrently. - agentWatermarks.set(key, Math.max(agentWatermarks.get(key) ?? 0, snapshotIdx)); + rememberWatermark(key, snapshotIdx); const retryKey = preparedTriggerKey(agentId, messageId); if (queue && retryKey) { queue.preparedTriggers.delete(retryKey); } - // LRU-style eviction to prevent unbounded growth - if (agentWatermarks.size > MAX_WATERMARK_ENTRIES) { - const oldest = agentWatermarks.keys().next().value; - if (oldest !== undefined) { - agentWatermarks.delete(oldest); - } - } }, }; }