From d58d90074f75da214a60c3b41a29dd2f704e2bd6 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Tue, 24 Mar 2026 10:35:04 -0700 Subject: [PATCH] refactor: isolate ACP final delivery flow --- .../reply/dispatch-acp-delivery.test.ts | 11 ++ src/auto-reply/reply/dispatch-acp-delivery.ts | 22 ++- src/auto-reply/reply/dispatch-acp.test.ts | 124 ++++++------- src/auto-reply/reply/dispatch-acp.ts | 165 ++++++++++-------- 4 files changed, 168 insertions(+), 154 deletions(-) diff --git a/src/auto-reply/reply/dispatch-acp-delivery.test.ts b/src/auto-reply/reply/dispatch-acp-delivery.test.ts index b7193a13ea5..c584cd3853e 100644 --- a/src/auto-reply/reply/dispatch-acp-delivery.test.ts +++ b/src/auto-reply/reply/dispatch-acp-delivery.test.ts @@ -62,6 +62,17 @@ describe("createAcpDispatchDeliveryCoordinator", () => { expect(dispatcher.sendFinalReply).toHaveBeenCalledWith({ text: "hello" }); }); + it("tracks successful final delivery separately from routed counters", async () => { + const coordinator = createCoordinator(); + + expect(coordinator.hasDeliveredFinalReply()).toBe(false); + + await coordinator.deliver("final", { text: "hello" }, { skipTts: true }); + + expect(coordinator.hasDeliveredFinalReply()).toBe(true); + expect(coordinator.getRoutedCounts().final).toBe(0); + }); + it("starts reply lifecycle only once when called directly and through deliver", async () => { const onReplyStart = vi.fn(async () => {}); const coordinator = createCoordinator(onReplyStart); diff --git a/src/auto-reply/reply/dispatch-acp-delivery.ts b/src/auto-reply/reply/dispatch-acp-delivery.ts index 67d33e3de13..24310e80f67 100644 --- a/src/auto-reply/reply/dispatch-acp-delivery.ts +++ b/src/auto-reply/reply/dispatch-acp-delivery.ts @@ -27,6 +27,7 @@ type AcpDispatchDeliveryState = { startedReplyLifecycle: boolean; accumulatedBlockText: string; blockCount: number; + deliveredFinalReply: boolean; routedCounts: Record; toolMessageByCallId: Map; }; @@ -40,6 +41,7 @@ export type AcpDispatchDeliveryCoordinator = { ) => Promise; getBlockCount: () => number; getAccumulatedBlockText: () => string; + hasDeliveredFinalReply: () => boolean; getRoutedCounts: () => Record; applyRoutedCounts: (counts: Record) => void; }; @@ -60,6 +62,7 @@ export function createAcpDispatchDeliveryCoordinator(params: { startedReplyLifecycle: false, accumulatedBlockText: "", blockCount: 0, + deliveredFinalReply: false, routedCounts: { tool: 0, block: 0, @@ -177,17 +180,23 @@ export function createAcpDispatchDeliveryCoordinator(params: { messageId: result.messageId, }); } + if (kind === "final") { + state.deliveredFinalReply = true; + } state.routedCounts[kind] += 1; return true; } - if (kind === "tool") { - return params.dispatcher.sendToolResult(ttsPayload); + const delivered = + kind === "tool" + ? params.dispatcher.sendToolResult(ttsPayload) + : kind === "block" + ? params.dispatcher.sendBlockReply(ttsPayload) + : params.dispatcher.sendFinalReply(ttsPayload); + if (kind === "final" && delivered) { + state.deliveredFinalReply = true; } - if (kind === "block") { - return params.dispatcher.sendBlockReply(ttsPayload); - } - return params.dispatcher.sendFinalReply(ttsPayload); + return delivered; }; return { @@ -195,6 +204,7 @@ export function createAcpDispatchDeliveryCoordinator(params: { deliver, getBlockCount: () => state.blockCount, getAccumulatedBlockText: () => state.accumulatedBlockText, + hasDeliveredFinalReply: () => state.deliveredFinalReply, getRoutedCounts: () => ({ ...state.routedCounts }), applyRoutedCounts: (counts) => { counts.tool += state.routedCounts.tool; diff --git a/src/auto-reply/reply/dispatch-acp.test.ts b/src/auto-reply/reply/dispatch-acp.test.ts index 5c53d80940b..6a18822edff 100644 --- a/src/auto-reply/reply/dispatch-acp.test.ts +++ b/src/auto-reply/reply/dispatch-acp.test.ts @@ -89,6 +89,7 @@ vi.mock("../../infra/outbound/session-binding-service.js", () => ({ const { tryDispatchAcpReply } = await import("./dispatch-acp.js"); const sessionKey = "agent:codex-acp:session-1"; +type MockTtsReply = Awaited>; function createDispatcher(): { dispatcher: ReplyDispatcher; @@ -200,6 +201,15 @@ function mockVisibleTextTurn(text = "visible") { ); } +function mockRoutedTextTurn(text: string) { + managerMocks.runTurn.mockImplementation( + async ({ onEvent }: { onEvent: (event: unknown) => Promise }) => { + await onEvent({ type: "text_delta", text, tag: "agent_message_chunk" }); + await onEvent({ type: "done" }); + }, + ); +} + async function dispatchVisibleTurn(onReplyStart: () => void) { await runDispatch({ bodyForAgent: "visible", @@ -208,6 +218,32 @@ async function dispatchVisibleTurn(onReplyStart: () => void) { }); } +function queueTtsReplies(...replies: MockTtsReply[]) { + for (const reply of replies) { + ttsMocks.maybeApplyTtsToPayload.mockResolvedValueOnce(reply); + } +} + +async function runRoutedAcpTextTurn(text: string) { + mockRoutedTextTurn(text); + const { dispatcher } = createDispatcher(); + const result = await runDispatch({ + bodyForAgent: "run acp", + dispatcher, + shouldRouteToOriginating: true, + }); + return { result }; +} + +function expectSecondRoutedPayload(payload: Partial) { + expect(routeMocks.routeReply).toHaveBeenNthCalledWith( + 2, + expect.objectContaining({ + payload: expect.objectContaining(payload), + }), + ); +} + describe("tryDispatchAcpReply", () => { beforeEach(() => { managerMocks.resolveSession.mockReset(); @@ -236,12 +272,7 @@ describe("tryDispatchAcpReply", () => { it("routes ACP block output to originating channel", async () => { setReadyAcpResolution(); - managerMocks.runTurn.mockImplementation( - async ({ onEvent }: { onEvent: (event: unknown) => Promise }) => { - await onEvent({ type: "text_delta", text: "hello", tag: "agent_message_chunk" }); - await onEvent({ type: "done" }); - }, - ); + mockRoutedTextTurn("hello"); const { dispatcher } = createDispatcher(); const result = await runDispatch({ @@ -439,92 +470,37 @@ describe("tryDispatchAcpReply", () => { it("delivers final fallback text even when routed block text already existed", async () => { setReadyAcpResolution(); ttsMocks.resolveTtsConfig.mockReturnValue({ mode: "final" }); - ttsMocks.maybeApplyTtsToPayload - .mockResolvedValueOnce({ text: "CODEX_OK" }) - .mockResolvedValueOnce({} as ReturnType); - - managerMocks.runTurn.mockImplementation( - async ({ onEvent }: { onEvent: (event: unknown) => Promise }) => { - await onEvent({ type: "text_delta", text: "CODEX_OK", tag: "agent_message_chunk" }); - await onEvent({ type: "done" }); - }, - ); - - const { dispatcher } = createDispatcher(); - const result = await runDispatch({ - bodyForAgent: "run acp", - dispatcher, - shouldRouteToOriginating: true, - }); + queueTtsReplies({ text: "CODEX_OK" }, {} as ReturnType); + const { result } = await runRoutedAcpTextTurn("CODEX_OK"); expect(result?.counts.block).toBe(1); expect(result?.counts.final).toBe(1); expect(routeMocks.routeReply).toHaveBeenCalledTimes(2); - expect(routeMocks.routeReply).toHaveBeenNthCalledWith( - 2, - expect.objectContaining({ - payload: expect.objectContaining({ - text: "CODEX_OK", - }), - }), - ); + expectSecondRoutedPayload({ text: "CODEX_OK" }); }); it("does not add text fallback when final TTS already delivered audio", async () => { setReadyAcpResolution(); ttsMocks.resolveTtsConfig.mockReturnValue({ mode: "final" }); - ttsMocks.maybeApplyTtsToPayload - .mockResolvedValueOnce({ text: "Task completed" }) - .mockResolvedValueOnce({ - mediaUrl: "https://example.com/final.mp3", - audioAsVoice: true, - } as Awaited>); - managerMocks.runTurn.mockImplementation( - async ({ onEvent }: { onEvent: (event: unknown) => Promise }) => { - await onEvent({ type: "text_delta", text: "Task completed", tag: "agent_message_chunk" }); - await onEvent({ type: "done" }); - }, - ); - - const { dispatcher } = createDispatcher(); - const result = await runDispatch({ - bodyForAgent: "run acp", - dispatcher, - shouldRouteToOriginating: true, - }); + queueTtsReplies({ text: "Task completed" }, { + mediaUrl: "https://example.com/final.mp3", + audioAsVoice: true, + } as MockTtsReply); + const { result } = await runRoutedAcpTextTurn("Task completed"); expect(result?.counts.block).toBe(1); expect(result?.counts.final).toBe(1); expect(routeMocks.routeReply).toHaveBeenCalledTimes(2); - expect(routeMocks.routeReply).toHaveBeenNthCalledWith( - 2, - expect.objectContaining({ - payload: expect.objectContaining({ - mediaUrl: "https://example.com/final.mp3", - audioAsVoice: true, - }), - }), - ); + expectSecondRoutedPayload({ + mediaUrl: "https://example.com/final.mp3", + audioAsVoice: true, + }); }); it("skips fallback when TTS mode is all (blocks already processed with TTS)", async () => { setReadyAcpResolution(); - // Configure TTS mode as "all" - blocks already went through TTS ttsMocks.resolveTtsConfig.mockReturnValue({ mode: "all" }); - - managerMocks.runTurn.mockImplementation( - async ({ onEvent }: { onEvent: (event: unknown) => Promise }) => { - await onEvent({ type: "text_delta", text: "Response", tag: "agent_message_chunk" }); - await onEvent({ type: "done" }); - }, - ); - - const { dispatcher } = createDispatcher(); - const result = await runDispatch({ - bodyForAgent: "run acp", - dispatcher, - shouldRouteToOriginating: true, - }); + const { result } = await runRoutedAcpTextTurn("Response"); expect(result?.counts.block).toBe(1); expect(result?.counts.final).toBe(0); diff --git a/src/auto-reply/reply/dispatch-acp.ts b/src/auto-reply/reply/dispatch-acp.ts index 47e2d966ef7..5f0c168e305 100644 --- a/src/auto-reply/reply/dispatch-acp.ts +++ b/src/auto-reply/reply/dispatch-acp.ts @@ -30,7 +30,10 @@ import { } from "../commands-registry.js"; import type { FinalizedMsgContext } from "../templating.js"; import { createAcpReplyProjector } from "./acp-projector.js"; -import { createAcpDispatchDeliveryCoordinator } from "./dispatch-acp-delivery.js"; +import { + createAcpDispatchDeliveryCoordinator, + type AcpDispatchDeliveryCoordinator, +} from "./dispatch-acp-delivery.js"; import type { ReplyDispatcher, ReplyDispatchKind } from "./reply-dispatcher.js"; type DispatchProcessedRecorder = ( @@ -184,6 +187,85 @@ export type AcpDispatchAttemptResult = { counts: Record; }; +async function finalizeAcpTurnOutput(params: { + cfg: OpenClawConfig; + sessionKey: string; + delivery: AcpDispatchDeliveryCoordinator; + inboundAudio: boolean; + sessionTtsAuto?: TtsAutoMode; + ttsChannel?: string; + shouldEmitResolvedIdentityNotice: boolean; +}): Promise { + let queuedFinal = false; + const ttsMode = resolveTtsConfig(params.cfg).mode ?? "final"; + const accumulatedBlockText = params.delivery.getAccumulatedBlockText(); + const hasAccumulatedBlockText = accumulatedBlockText.trim().length > 0; + + let finalMediaDelivered = false; + if (ttsMode === "final" && hasAccumulatedBlockText) { + try { + const ttsSyntheticReply = await maybeApplyTtsToPayload({ + payload: { text: accumulatedBlockText }, + cfg: params.cfg, + channel: params.ttsChannel, + kind: "final", + inboundAudio: params.inboundAudio, + ttsAuto: params.sessionTtsAuto, + }); + if (ttsSyntheticReply.mediaUrl) { + const delivered = await params.delivery.deliver("final", { + mediaUrl: ttsSyntheticReply.mediaUrl, + audioAsVoice: ttsSyntheticReply.audioAsVoice, + }); + queuedFinal = queuedFinal || delivered; + finalMediaDelivered = delivered; + } + } catch (err) { + logVerbose( + `dispatch-acp: accumulated ACP block TTS failed: ${err instanceof Error ? err.message : String(err)}`, + ); + } + } + + // Some ACP parent surfaces only expose terminal replies, so block routing alone is not enough + // to prove the final result was visible to the user. + const shouldDeliverTextFallback = + ttsMode !== "all" && + hasAccumulatedBlockText && + !finalMediaDelivered && + !params.delivery.hasDeliveredFinalReply(); + if (shouldDeliverTextFallback) { + const delivered = await params.delivery.deliver( + "final", + { text: accumulatedBlockText }, + { skipTts: true }, + ); + queuedFinal = queuedFinal || delivered; + } + + if (params.shouldEmitResolvedIdentityNotice) { + const currentMeta = readAcpSessionEntry({ + cfg: params.cfg, + sessionKey: params.sessionKey, + })?.acp; + const identityAfterTurn = resolveSessionIdentityFromMeta(currentMeta); + if (!isSessionIdentityPending(identityAfterTurn)) { + const resolvedDetails = resolveAcpThreadSessionDetailLines({ + sessionKey: params.sessionKey, + meta: currentMeta, + }); + if (resolvedDetails.length > 0) { + const delivered = await params.delivery.deliver("final", { + text: prefixSystemMessage(["Session ids resolved.", ...resolvedDetails].join("\n")), + }); + queuedFinal = queuedFinal || delivered; + } + } + } + + return queuedFinal; +} + export async function tryDispatchAcpReply(params: { ctx: FinalizedMsgContext; cfg: OpenClawConfig; @@ -314,81 +396,16 @@ export async function tryDispatchAcpReply(params: { }); await projector.flush(true); - const ttsMode = resolveTtsConfig(params.cfg).mode ?? "final"; - const accumulatedBlockText = delivery.getAccumulatedBlockText(); - const hasAccumulatedBlockText = accumulatedBlockText.trim().length > 0; - const routedCounts = delivery.getRoutedCounts(); - // Attempt final TTS synthesis for ttsMode="final" (independent of delivery status). - // This ensures routed ACP flows still get final audio even after block delivery. - let ttsSucceeded = false; - if (ttsMode === "final" && hasAccumulatedBlockText) { - try { - const ttsSyntheticReply = await maybeApplyTtsToPayload({ - payload: { text: accumulatedBlockText }, - cfg: params.cfg, - channel: params.ttsChannel, - kind: "final", - inboundAudio: params.inboundAudio, - ttsAuto: params.sessionTtsAuto, - }); - if (ttsSyntheticReply.mediaUrl) { - // Use delivery.deliver to ensure proper routing in cross-provider ACP turns. - // Pass audioAsVoice to avoid re-entering TTS synthesis. - const delivered = await delivery.deliver("final", { - mediaUrl: ttsSyntheticReply.mediaUrl, - audioAsVoice: ttsSyntheticReply.audioAsVoice, - }); - queuedFinal = queuedFinal || delivered; - if (delivered) { - ttsSucceeded = true; // TTS succeeded AND delivered, skip text fallback - } - } - } catch (err) { - logVerbose( - `dispatch-acp: accumulated ACP block TTS failed: ${err instanceof Error ? err.message : String(err)}`, - ); - // TTS failed, fall through to text fallback - } - } - // Only attempt text fallback if a terminal delivery has not happened yet. - // Block routing alone is not enough here because some ACP parent surfaces only expose - // terminal replies, so routed block text can still leave the user with no visible result. - // For non-routed flows, we still skip the final-only guard because dispatcher counts are not - // tracked here and we want to recover when earlier block sends fail. - // Skip fallback for ttsMode="all" because blocks were already processed with TTS. - const shouldSkipTextFallback = - ttsMode === "all" || - ttsSucceeded || - (params.shouldRouteToOriginating && routedCounts.final > 0); - if (!shouldSkipTextFallback && hasAccumulatedBlockText) { - // Fallback to text-only delivery (no TTS). - // For routed flows, use delivery.deliver with skipTts to bypass TTS re-entry. - // For non-routed flows, use dispatcher directly to bypass TTS. - const delivered = params.shouldRouteToOriginating - ? await delivery.deliver("final", { text: accumulatedBlockText }, { skipTts: true }) - : params.dispatcher.sendFinalReply({ text: accumulatedBlockText }); - queuedFinal = queuedFinal || delivered; - } - - if (shouldEmitResolvedIdentityNotice) { - const currentMeta = readAcpSessionEntry({ + queuedFinal = + (await finalizeAcpTurnOutput({ cfg: params.cfg, sessionKey, - })?.acp; - const identityAfterTurn = resolveSessionIdentityFromMeta(currentMeta); - if (!isSessionIdentityPending(identityAfterTurn)) { - const resolvedDetails = resolveAcpThreadSessionDetailLines({ - sessionKey, - meta: currentMeta, - }); - if (resolvedDetails.length > 0) { - const delivered = await delivery.deliver("final", { - text: prefixSystemMessage(["Session ids resolved.", ...resolvedDetails].join("\n")), - }); - queuedFinal = queuedFinal || delivered; - } - } - } + delivery, + inboundAudio: params.inboundAudio, + sessionTtsAuto: params.sessionTtsAuto, + ttsChannel: params.ttsChannel, + shouldEmitResolvedIdentityNotice, + })) || queuedFinal; const counts = params.dispatcher.getQueuedCounts(); delivery.applyRoutedCounts(counts);