fix: report dropped subagent announce queue deliveries

This commit is contained in:
Tak Hoffman 2026-03-24 00:54:46 -05:00
parent 0fc27409c0
commit ab8c834aab
No known key found for this signature in database
4 changed files with 84 additions and 6 deletions

View File

@ -135,6 +135,25 @@ describe("runSubagentAnnounceDispatch", () => {
]);
});
it("does not fall through to direct delivery when non-completion queue drops the new item", async () => {
const queue = vi.fn(async () => "dropped" as const);
const direct = vi.fn(async () => ({ delivered: true, path: "direct" as const }));
const result = await runSubagentAnnounceDispatch({
expectsCompletionMessage: false,
queue,
direct,
});
expect(queue).toHaveBeenCalledTimes(1);
expect(direct).not.toHaveBeenCalled();
expect(result).toEqual({
delivered: false,
path: "none",
phases: [{ phase: "queue-primary", delivered: false, path: "none", error: undefined }],
});
});
it("preserves direct failure when completion dispatch aborts before fallback queue", async () => {
const controller = new AbortController();
const queue = vi.fn(async () => "queued" as const);

View File

@ -1,6 +1,6 @@
export type SubagentDeliveryPath = "queued" | "steered" | "direct" | "none";
export type SubagentAnnounceQueueOutcome = "steered" | "queued" | "none";
export type SubagentAnnounceQueueOutcome = "steered" | "queued" | "none" | "dropped";
export type SubagentAnnounceDeliveryResult = {
delivered: boolean;
@ -70,11 +70,15 @@ export async function runSubagentAnnounceDispatch(params: {
}
if (!params.expectsCompletionMessage) {
const primaryQueue = mapQueueOutcomeToDeliveryResult(await params.queue());
const primaryQueueOutcome = await params.queue();
const primaryQueue = mapQueueOutcomeToDeliveryResult(primaryQueueOutcome);
appendPhase("queue-primary", primaryQueue);
if (primaryQueue.delivered) {
return withPhases(primaryQueue);
}
if (primaryQueueOutcome === "dropped") {
return withPhases(primaryQueue);
}
const primaryDirect = await params.direct();
appendPhase("direct-primary", primaryDirect);
@ -91,7 +95,8 @@ export async function runSubagentAnnounceDispatch(params: {
return withPhases(primaryDirect);
}
const fallbackQueue = mapQueueOutcomeToDeliveryResult(await params.queue());
const fallbackQueueOutcome = await params.queue();
const fallbackQueue = mapQueueOutcomeToDeliveryResult(fallbackQueueOutcome);
appendPhase("queue-fallback", fallbackQueue);
if (fallbackQueue.delivered) {
return withPhases(fallbackQueue);

View File

@ -1469,6 +1469,60 @@ describe("subagent announce formatting", () => {
expect(agentSpy).toHaveBeenCalledTimes(1);
});
it("does not report queued delivery when active announce queue drops a new item", async () => {
embeddedRunMock.isEmbeddedPiRunActive.mockReturnValue(true);
embeddedRunMock.isEmbeddedPiRunStreaming.mockReturnValue(false);
sessionStore = {
"agent:main:main": {
sessionId: "session-drop-new",
lastChannel: "telegram",
lastTo: "123",
queueMode: "followup",
queueDebounceMs: 0,
queueCap: 1,
queueDrop: "new",
},
};
let resolveFirstSend = () => {};
const firstSendPending = new Promise<void>((resolve) => {
resolveFirstSend = resolve;
});
agentSpy.mockImplementation(async (_req: AgentCallRequest) => {
await firstSendPending;
return { runId: "run-main", status: "ok" };
});
const firstDidAnnounce = await runSubagentAnnounceFlow({
childSessionKey: "agent:main:subagent:test",
childRunId: "run-queued-first",
requesterSessionKey: "main",
requesterDisplayKey: "main",
announceType: "subagent task",
...defaultOutcomeAnnounce,
});
await vi.waitFor(() => {
expect(agentSpy).toHaveBeenCalledTimes(1);
});
const secondDidAnnounce = await runSubagentAnnounceFlow({
childSessionKey: "agent:main:subagent:test",
childRunId: "run-queued-dropped",
requesterSessionKey: "main",
requesterDisplayKey: "main",
announceType: "subagent task",
...defaultOutcomeAnnounce,
});
expect(firstDidAnnounce).toBe(true);
expect(secondDidAnnounce).toBe(false);
expect(agentSpy).toHaveBeenCalledTimes(1);
resolveFirstSend();
await Promise.resolve();
});
it("keeps queued idempotency unique for same-ms distinct child runs", async () => {
embeddedRunMock.isEmbeddedPiRunActive.mockReturnValue(true);
embeddedRunMock.isEmbeddedPiRunStreaming.mockReturnValue(false);

View File

@ -830,7 +830,7 @@ async function maybeQueueSubagentAnnounce(params: {
sourceTool?: string;
internalEvents?: AgentInternalEvent[];
signal?: AbortSignal;
}): Promise<"steered" | "queued" | "none"> {
}): Promise<"steered" | "queued" | "none" | "dropped"> {
if (params.signal?.aborted) {
return "none";
}
@ -863,7 +863,7 @@ async function maybeQueueSubagentAnnounce(params: {
queueSettings.mode === "interrupt";
if (isActive && (shouldFollowup || queueSettings.mode === "steer")) {
const origin = resolveAnnounceOrigin(entry, params.requesterOrigin);
enqueueAnnounce({
const didQueue = enqueueAnnounce({
key: buildAnnounceQueueKey(canonicalKey, origin),
item: {
announceId: params.announceId,
@ -880,7 +880,7 @@ async function maybeQueueSubagentAnnounce(params: {
settings: queueSettings,
send: sendAnnounce,
});
return "queued";
return didQueue ? "queued" : "dropped";
}
return "none";