refactor(queue): share runtime settings and summary helpers

This commit is contained in:
Peter Steinberger 2026-02-18 17:54:56 +00:00
parent 84841aebe5
commit 1aa4d3a6f0
5 changed files with 187 additions and 64 deletions

View File

@ -6,10 +6,12 @@ import {
normalizeDeliveryContext,
} from "../utils/delivery-context.js";
import {
applyQueueRuntimeSettings,
applyQueueDropPolicy,
buildCollectPrompt,
buildQueueSummaryPrompt,
clearQueueSummaryState,
hasCrossChannelItems,
previewQueueSummaryPrompt,
waitForQueueDebounce,
} from "../utils/queue-helpers.js";
@ -47,22 +49,6 @@ type AnnounceQueueState = {
const ANNOUNCE_QUEUES = new Map<string, AnnounceQueueState>();
function previewQueueSummaryPrompt(queue: AnnounceQueueState): string | undefined {
return buildQueueSummaryPrompt({
state: {
dropPolicy: queue.dropPolicy,
droppedCount: queue.droppedCount,
summaryLines: [...queue.summaryLines],
},
noun: "announce",
});
}
function clearQueueSummaryState(queue: AnnounceQueueState) {
queue.droppedCount = 0;
queue.summaryLines = [];
}
export function resetAnnounceQueuesForTests() {
// Test isolation: other suites may leave a draining queue behind in the worker.
// Clearing the map alone isn't enough because drain loops capture `queue` by reference.
@ -82,16 +68,10 @@ function getAnnounceQueue(
) {
const existing = ANNOUNCE_QUEUES.get(key);
if (existing) {
existing.mode = settings.mode;
existing.debounceMs =
typeof settings.debounceMs === "number"
? Math.max(0, settings.debounceMs)
: existing.debounceMs;
existing.cap =
typeof settings.cap === "number" && settings.cap > 0
? Math.floor(settings.cap)
: existing.cap;
existing.dropPolicy = settings.dropPolicy ?? existing.dropPolicy;
applyQueueRuntimeSettings({
target: existing,
settings,
});
existing.send = send;
return existing;
}
@ -107,6 +87,10 @@ function getAnnounceQueue(
summaryLines: [],
send,
};
applyQueueRuntimeSettings({
target: created,
settings,
});
ANNOUNCE_QUEUES.set(key, created);
return created;
}
@ -152,7 +136,7 @@ function scheduleAnnounceDrain(key: string) {
continue;
}
const items = queue.items.slice();
const summary = previewQueueSummaryPrompt(queue);
const summary = previewQueueSummaryPrompt({ state: queue, noun: "announce" });
const prompt = buildCollectPrompt({
title: "[Queued announce messages while agent was busy]",
items,
@ -171,7 +155,7 @@ function scheduleAnnounceDrain(key: string) {
continue;
}
const summaryPrompt = previewQueueSummaryPrompt(queue);
const summaryPrompt = previewQueueSummaryPrompt({ state: queue, noun: "announce" });
if (summaryPrompt) {
const next = queue.items[0];
if (!next) {

View File

@ -1,34 +1,15 @@
import { defaultRuntime } from "../../../runtime.js";
import {
buildCollectPrompt,
buildQueueSummaryPrompt,
clearQueueSummaryState,
hasCrossChannelItems,
previewQueueSummaryPrompt,
waitForQueueDebounce,
} from "../../../utils/queue-helpers.js";
import { isRoutableChannel } from "../route-reply.js";
import { FOLLOWUP_QUEUES } from "./state.js";
import type { FollowupRun } from "./types.js";
function previewQueueSummaryPrompt(queue: {
dropPolicy: "summarize" | "old" | "new";
droppedCount: number;
summaryLines: string[];
}): string | undefined {
return buildQueueSummaryPrompt({
state: {
dropPolicy: queue.dropPolicy,
droppedCount: queue.droppedCount,
summaryLines: [...queue.summaryLines],
},
noun: "message",
});
}
function clearQueueSummaryState(queue: { droppedCount: number; summaryLines: string[] }): void {
queue.droppedCount = 0;
queue.summaryLines = [];
}
export function scheduleFollowupDrain(
key: string,
runFollowup: (run: FollowupRun) => Promise<void>,
@ -89,7 +70,7 @@ export function scheduleFollowupDrain(
}
const items = queue.items.slice();
const summary = previewQueueSummaryPrompt(queue);
const summary = previewQueueSummaryPrompt({ state: queue, noun: "message" });
const run = items.at(-1)?.run ?? queue.lastRun;
if (!run) {
break;
@ -127,7 +108,7 @@ export function scheduleFollowupDrain(
continue;
}
const summaryPrompt = previewQueueSummaryPrompt(queue);
const summaryPrompt = previewQueueSummaryPrompt({ state: queue, noun: "message" });
if (summaryPrompt) {
const run = queue.lastRun;
if (!run) {

View File

@ -1,3 +1,4 @@
import { applyQueueRuntimeSettings } from "../../../utils/queue-helpers.js";
import type { FollowupRun, QueueDropPolicy, QueueMode, QueueSettings } from "./types.js";
export type FollowupQueueState = {
@ -22,16 +23,10 @@ export const FOLLOWUP_QUEUES = new Map<string, FollowupQueueState>();
export function getFollowupQueue(key: string, settings: QueueSettings): FollowupQueueState {
const existing = FOLLOWUP_QUEUES.get(key);
if (existing) {
existing.mode = settings.mode;
existing.debounceMs =
typeof settings.debounceMs === "number"
? Math.max(0, settings.debounceMs)
: existing.debounceMs;
existing.cap =
typeof settings.cap === "number" && settings.cap > 0
? Math.floor(settings.cap)
: existing.cap;
existing.dropPolicy = settings.dropPolicy ?? existing.dropPolicy;
applyQueueRuntimeSettings({
target: existing,
settings,
});
return existing;
}
@ -52,6 +47,10 @@ export function getFollowupQueue(key: string, settings: QueueSettings): Followup
droppedCount: 0,
summaryLines: [],
};
applyQueueRuntimeSettings({
target: created,
settings,
});
FOLLOWUP_QUEUES.set(key, created);
return created;
}

View File

@ -0,0 +1,113 @@
import { describe, expect, it } from "vitest";
import {
applyQueueRuntimeSettings,
buildQueueSummaryPrompt,
clearQueueSummaryState,
previewQueueSummaryPrompt,
} from "./queue-helpers.js";
describe("applyQueueRuntimeSettings", () => {
it("updates runtime queue settings with normalization", () => {
const target = {
mode: "followup" as const,
debounceMs: 1000,
cap: 20,
dropPolicy: "summarize" as const,
};
applyQueueRuntimeSettings({
target,
settings: {
mode: "collect",
debounceMs: -12,
cap: 9.8,
dropPolicy: "new",
},
});
expect(target).toEqual({
mode: "collect",
debounceMs: 0,
cap: 9,
dropPolicy: "new",
});
});
it("keeps existing values when optional settings are missing/invalid", () => {
const target = {
mode: "followup" as const,
debounceMs: 1000,
cap: 20,
dropPolicy: "summarize" as const,
};
applyQueueRuntimeSettings({
target,
settings: {
mode: "queue",
cap: 0,
},
});
expect(target).toEqual({
mode: "queue",
debounceMs: 1000,
cap: 20,
dropPolicy: "summarize",
});
});
});
describe("queue summary helpers", () => {
it("previewQueueSummaryPrompt does not mutate state", () => {
const state = {
dropPolicy: "summarize" as const,
droppedCount: 2,
summaryLines: ["first", "second"],
};
const prompt = previewQueueSummaryPrompt({
state,
noun: "message",
});
expect(prompt).toContain("[Queue overflow] Dropped 2 messages due to cap.");
expect(prompt).toContain("first");
expect(state).toEqual({
dropPolicy: "summarize",
droppedCount: 2,
summaryLines: ["first", "second"],
});
});
it("buildQueueSummaryPrompt clears state after rendering", () => {
const state = {
dropPolicy: "summarize" as const,
droppedCount: 1,
summaryLines: ["line"],
};
const prompt = buildQueueSummaryPrompt({
state,
noun: "announce",
});
expect(prompt).toContain("[Queue overflow] Dropped 1 announce due to cap.");
expect(state).toEqual({
dropPolicy: "summarize",
droppedCount: 0,
summaryLines: [],
});
});
it("clearQueueSummaryState resets summary counters", () => {
const state = {
dropPolicy: "summarize" as const,
droppedCount: 5,
summaryLines: ["a", "b"],
};
clearQueueSummaryState(state);
expect(state.droppedCount).toBe(0);
expect(state.summaryLines).toEqual([]);
});
});

View File

@ -11,6 +11,53 @@ export type QueueState<T> = QueueSummaryState & {
cap: number;
};
export function clearQueueSummaryState(state: QueueSummaryState): void {
state.droppedCount = 0;
state.summaryLines = [];
}
export function previewQueueSummaryPrompt(params: {
state: QueueSummaryState;
noun: string;
title?: string;
}): string | undefined {
return buildQueueSummaryPrompt({
state: {
dropPolicy: params.state.dropPolicy,
droppedCount: params.state.droppedCount,
summaryLines: [...params.state.summaryLines],
},
noun: params.noun,
title: params.title,
});
}
export function applyQueueRuntimeSettings<TMode extends string>(params: {
target: {
mode: TMode;
debounceMs: number;
cap: number;
dropPolicy: QueueDropPolicy;
};
settings: {
mode: TMode;
debounceMs?: number;
cap?: number;
dropPolicy?: QueueDropPolicy;
};
}): void {
params.target.mode = params.settings.mode;
params.target.debounceMs =
typeof params.settings.debounceMs === "number"
? Math.max(0, params.settings.debounceMs)
: params.target.debounceMs;
params.target.cap =
typeof params.settings.cap === "number" && params.settings.cap > 0
? Math.floor(params.settings.cap)
: params.target.cap;
params.target.dropPolicy = params.settings.dropPolicy ?? params.target.dropPolicy;
}
export function elideQueueText(text: string, limit = 140): string {
if (text.length <= limit) {
return text;
@ -101,8 +148,7 @@ export function buildQueueSummaryPrompt(params: {
lines.push(`- ${line}`);
}
}
params.state.droppedCount = 0;
params.state.summaryLines = [];
clearQueueSummaryState(params.state);
return lines.join("\n");
}