From ab8c834aabff4cd3adc4ffecda9439f44e34fa5e Mon Sep 17 00:00:00 2001 From: Tak Hoffman <781889+Takhoffman@users.noreply.github.com> Date: Tue, 24 Mar 2026 00:54:46 -0500 Subject: [PATCH] fix: report dropped subagent announce queue deliveries --- src/agents/subagent-announce-dispatch.test.ts | 19 +++++++ src/agents/subagent-announce-dispatch.ts | 11 ++-- .../subagent-announce.format.e2e.test.ts | 54 +++++++++++++++++++ src/agents/subagent-announce.ts | 6 +-- 4 files changed, 84 insertions(+), 6 deletions(-) diff --git a/src/agents/subagent-announce-dispatch.test.ts b/src/agents/subagent-announce-dispatch.test.ts index 0cf7f8de87b..9362407575e 100644 --- a/src/agents/subagent-announce-dispatch.test.ts +++ b/src/agents/subagent-announce-dispatch.test.ts @@ -135,6 +135,25 @@ describe("runSubagentAnnounceDispatch", () => { ]); }); + it("does not fall through to direct delivery when non-completion queue drops the new item", async () => { + const queue = vi.fn(async () => "dropped" as const); + const direct = vi.fn(async () => ({ delivered: true, path: "direct" as const })); + + const result = await runSubagentAnnounceDispatch({ + expectsCompletionMessage: false, + queue, + direct, + }); + + expect(queue).toHaveBeenCalledTimes(1); + expect(direct).not.toHaveBeenCalled(); + expect(result).toEqual({ + delivered: false, + path: "none", + phases: [{ phase: "queue-primary", delivered: false, path: "none", error: undefined }], + }); + }); + it("preserves direct failure when completion dispatch aborts before fallback queue", async () => { const controller = new AbortController(); const queue = vi.fn(async () => "queued" as const); diff --git a/src/agents/subagent-announce-dispatch.ts b/src/agents/subagent-announce-dispatch.ts index b94ca7865dc..48ce0b835da 100644 --- a/src/agents/subagent-announce-dispatch.ts +++ b/src/agents/subagent-announce-dispatch.ts @@ -1,6 +1,6 @@ export type SubagentDeliveryPath = "queued" | "steered" | "direct" | "none"; -export type SubagentAnnounceQueueOutcome = "steered" | "queued" | "none"; +export type SubagentAnnounceQueueOutcome = "steered" | "queued" | "none" | "dropped"; export type SubagentAnnounceDeliveryResult = { delivered: boolean; @@ -70,11 +70,15 @@ export async function runSubagentAnnounceDispatch(params: { } if (!params.expectsCompletionMessage) { - const primaryQueue = mapQueueOutcomeToDeliveryResult(await params.queue()); + const primaryQueueOutcome = await params.queue(); + const primaryQueue = mapQueueOutcomeToDeliveryResult(primaryQueueOutcome); appendPhase("queue-primary", primaryQueue); if (primaryQueue.delivered) { return withPhases(primaryQueue); } + if (primaryQueueOutcome === "dropped") { + return withPhases(primaryQueue); + } const primaryDirect = await params.direct(); appendPhase("direct-primary", primaryDirect); @@ -91,7 +95,8 @@ export async function runSubagentAnnounceDispatch(params: { return withPhases(primaryDirect); } - const fallbackQueue = mapQueueOutcomeToDeliveryResult(await params.queue()); + const fallbackQueueOutcome = await params.queue(); + const fallbackQueue = mapQueueOutcomeToDeliveryResult(fallbackQueueOutcome); appendPhase("queue-fallback", fallbackQueue); if (fallbackQueue.delivered) { return withPhases(fallbackQueue); diff --git a/src/agents/subagent-announce.format.e2e.test.ts b/src/agents/subagent-announce.format.e2e.test.ts index a08c7fe4d15..5766fcc0d09 100644 --- a/src/agents/subagent-announce.format.e2e.test.ts +++ b/src/agents/subagent-announce.format.e2e.test.ts @@ -1469,6 +1469,60 @@ describe("subagent announce formatting", () => { expect(agentSpy).toHaveBeenCalledTimes(1); }); + it("does not report queued delivery when active announce queue drops a new item", async () => { + embeddedRunMock.isEmbeddedPiRunActive.mockReturnValue(true); + embeddedRunMock.isEmbeddedPiRunStreaming.mockReturnValue(false); + sessionStore = { + "agent:main:main": { + sessionId: "session-drop-new", + lastChannel: "telegram", + lastTo: "123", + queueMode: "followup", + queueDebounceMs: 0, + queueCap: 1, + queueDrop: "new", + }, + }; + + let resolveFirstSend = () => {}; + const firstSendPending = new Promise((resolve) => { + resolveFirstSend = resolve; + }); + agentSpy.mockImplementation(async (_req: AgentCallRequest) => { + await firstSendPending; + return { runId: "run-main", status: "ok" }; + }); + + const firstDidAnnounce = await runSubagentAnnounceFlow({ + childSessionKey: "agent:main:subagent:test", + childRunId: "run-queued-first", + requesterSessionKey: "main", + requesterDisplayKey: "main", + announceType: "subagent task", + ...defaultOutcomeAnnounce, + }); + + await vi.waitFor(() => { + expect(agentSpy).toHaveBeenCalledTimes(1); + }); + + const secondDidAnnounce = await runSubagentAnnounceFlow({ + childSessionKey: "agent:main:subagent:test", + childRunId: "run-queued-dropped", + requesterSessionKey: "main", + requesterDisplayKey: "main", + announceType: "subagent task", + ...defaultOutcomeAnnounce, + }); + + expect(firstDidAnnounce).toBe(true); + expect(secondDidAnnounce).toBe(false); + expect(agentSpy).toHaveBeenCalledTimes(1); + + resolveFirstSend(); + await Promise.resolve(); + }); + it("keeps queued idempotency unique for same-ms distinct child runs", async () => { embeddedRunMock.isEmbeddedPiRunActive.mockReturnValue(true); embeddedRunMock.isEmbeddedPiRunStreaming.mockReturnValue(false); diff --git a/src/agents/subagent-announce.ts b/src/agents/subagent-announce.ts index 6dfa8cafe0d..7443eccf23a 100644 --- a/src/agents/subagent-announce.ts +++ b/src/agents/subagent-announce.ts @@ -830,7 +830,7 @@ async function maybeQueueSubagentAnnounce(params: { sourceTool?: string; internalEvents?: AgentInternalEvent[]; signal?: AbortSignal; -}): Promise<"steered" | "queued" | "none"> { +}): Promise<"steered" | "queued" | "none" | "dropped"> { if (params.signal?.aborted) { return "none"; } @@ -863,7 +863,7 @@ async function maybeQueueSubagentAnnounce(params: { queueSettings.mode === "interrupt"; if (isActive && (shouldFollowup || queueSettings.mode === "steer")) { const origin = resolveAnnounceOrigin(entry, params.requesterOrigin); - enqueueAnnounce({ + const didQueue = enqueueAnnounce({ key: buildAnnounceQueueKey(canonicalKey, origin), item: { announceId: params.announceId, @@ -880,7 +880,7 @@ async function maybeQueueSubagentAnnounce(params: { settings: queueSettings, send: sendAnnounce, }); - return "queued"; + return didQueue ? "queued" : "dropped"; } return "none";