Matrix: refresh watermark recency on consume

This commit is contained in:
Gustavo Madeira Santana 2026-03-30 11:00:12 -04:00
parent 2b3d776403
commit 7eac926f44
No known key found for this signature in database
2 changed files with 44 additions and 8 deletions

View File

@ -64,6 +64,33 @@ describe("createRoomHistoryTracker — watermark monotonicity", () => {
expect(retried.history.map((entry) => entry.body)).toEqual(["msg1"]);
expect(retried.snapshotIdx).toBe(first.snapshotIdx);
});
it("refreshes watermark recency before capped-map eviction", () => {
const tracker = createRoomHistoryTracker(200, 10, 2);
const room1 = "!room1:test";
const room2 = "!room2:test";
const room3 = "!room3:test";
tracker.recordPending(room1, entry("old msg in room1"));
const snap1 = tracker.recordTrigger(room1, entry("trigger in room1"));
tracker.consumeHistory(AGENT, room1, snap1);
tracker.recordPending(room2, entry("old msg in room2"));
const snap2 = tracker.recordTrigger(room2, entry("trigger in room2"));
tracker.consumeHistory(AGENT, room2, snap2);
// Refresh room1 so room2 becomes the stalest watermark entry.
tracker.consumeHistory(AGENT, room1, snap1);
tracker.recordPending(room3, entry("old msg in room3"));
const snap3 = tracker.recordTrigger(room3, entry("trigger in room3"));
tracker.consumeHistory(AGENT, room3, snap3);
tracker.recordPending(room1, entry("new msg in room1"));
const room1History = tracker.getPendingHistory(AGENT, room1, 100);
expect(room1History).toHaveLength(1);
expect(room1History[0]?.body).toBe("new msg in room1");
});
});
describe("createRoomHistoryTracker — roomQueues eviction", () => {

View File

@ -85,6 +85,7 @@ type RoomQueue = {
export function createRoomHistoryTracker(
maxQueueSize = DEFAULT_MAX_QUEUE_SIZE,
maxRoomQueues = DEFAULT_MAX_ROOM_QUEUES,
maxWatermarkEntries = MAX_WATERMARK_ENTRIES,
): RoomHistoryTracker {
const roomQueues = new Map<string, RoomQueue>();
/** Maps `${agentId}:${roomId}` → absolute consumed-up-to index */
@ -137,6 +138,21 @@ export function createRoomHistoryTracker(
return `${agentId}:${messageId.trim()}`;
}
function rememberWatermark(key: string, snapshotIdx: number): void {
const nextSnapshotIdx = Math.max(agentWatermarks.get(key) ?? 0, snapshotIdx);
if (agentWatermarks.has(key)) {
// Refresh insertion order so capped-map eviction removes the stalest pair, not an active one.
agentWatermarks.delete(key);
}
agentWatermarks.set(key, nextSnapshotIdx);
if (agentWatermarks.size > maxWatermarkEntries) {
const oldest = agentWatermarks.keys().next().value;
if (oldest !== undefined) {
agentWatermarks.delete(oldest);
}
}
}
function computePendingHistory(
queue: RoomQueue,
agentId: string,
@ -208,18 +224,11 @@ export function createRoomHistoryTracker(
// Monotone write: never regress an already-advanced watermark.
// Guards against out-of-order completion when two triggers for the same
// (agentId, roomId) are in-flight concurrently.
agentWatermarks.set(key, Math.max(agentWatermarks.get(key) ?? 0, snapshotIdx));
rememberWatermark(key, snapshotIdx);
const retryKey = preparedTriggerKey(agentId, messageId);
if (queue && retryKey) {
queue.preparedTriggers.delete(retryKey);
}
// LRU-style eviction to prevent unbounded growth
if (agentWatermarks.size > MAX_WATERMARK_ENTRIES) {
const oldest = agentWatermarks.keys().next().value;
if (oldest !== undefined) {
agentWatermarks.delete(oldest);
}
}
},
};
}