refactor: isolate ACP final delivery flow

This commit is contained in:
Peter Steinberger 2026-03-24 10:35:04 -07:00
parent 822563d1ab
commit d58d90074f
No known key found for this signature in database
4 changed files with 168 additions and 154 deletions

View File

@ -62,6 +62,17 @@ describe("createAcpDispatchDeliveryCoordinator", () => {
expect(dispatcher.sendFinalReply).toHaveBeenCalledWith({ text: "hello" });
});
// Checks that a final deliver flips hasDeliveredFinalReply() from false to true while the
// routed "final" counter reported by getRoutedCounts() stays at 0.
it("tracks successful final delivery separately from routed counters", async () => {
  const coordinator = createCoordinator();

  const deliveredBefore = coordinator.hasDeliveredFinalReply();
  expect(deliveredBefore).toBe(false);

  await coordinator.deliver("final", { text: "hello" }, { skipTts: true });

  const deliveredAfter = coordinator.hasDeliveredFinalReply();
  expect(deliveredAfter).toBe(true);
  const { final: routedFinalCount } = coordinator.getRoutedCounts();
  expect(routedFinalCount).toBe(0);
});
it("starts reply lifecycle only once when called directly and through deliver", async () => {
const onReplyStart = vi.fn(async () => {});
const coordinator = createCoordinator(onReplyStart);

View File

@ -27,6 +27,7 @@ type AcpDispatchDeliveryState = {
startedReplyLifecycle: boolean;
accumulatedBlockText: string;
blockCount: number;
deliveredFinalReply: boolean;
routedCounts: Record<ReplyDispatchKind, number>;
toolMessageByCallId: Map<string, ToolMessageHandle>;
};
@ -40,6 +41,7 @@ export type AcpDispatchDeliveryCoordinator = {
) => Promise<boolean>;
getBlockCount: () => number;
getAccumulatedBlockText: () => string;
hasDeliveredFinalReply: () => boolean;
getRoutedCounts: () => Record<ReplyDispatchKind, number>;
applyRoutedCounts: (counts: Record<ReplyDispatchKind, number>) => void;
};
@ -60,6 +62,7 @@ export function createAcpDispatchDeliveryCoordinator(params: {
startedReplyLifecycle: false,
accumulatedBlockText: "",
blockCount: 0,
deliveredFinalReply: false,
routedCounts: {
tool: 0,
block: 0,
@ -177,17 +180,23 @@ export function createAcpDispatchDeliveryCoordinator(params: {
messageId: result.messageId,
});
}
if (kind === "final") {
state.deliveredFinalReply = true;
}
state.routedCounts[kind] += 1;
return true;
}
if (kind === "tool") {
return params.dispatcher.sendToolResult(ttsPayload);
const delivered =
kind === "tool"
? params.dispatcher.sendToolResult(ttsPayload)
: kind === "block"
? params.dispatcher.sendBlockReply(ttsPayload)
: params.dispatcher.sendFinalReply(ttsPayload);
if (kind === "final" && delivered) {
state.deliveredFinalReply = true;
}
if (kind === "block") {
return params.dispatcher.sendBlockReply(ttsPayload);
}
return params.dispatcher.sendFinalReply(ttsPayload);
return delivered;
};
return {
@ -195,6 +204,7 @@ export function createAcpDispatchDeliveryCoordinator(params: {
deliver,
getBlockCount: () => state.blockCount,
getAccumulatedBlockText: () => state.accumulatedBlockText,
hasDeliveredFinalReply: () => state.deliveredFinalReply,
getRoutedCounts: () => ({ ...state.routedCounts }),
applyRoutedCounts: (counts) => {
counts.tool += state.routedCounts.tool;

View File

@ -89,6 +89,7 @@ vi.mock("../../infra/outbound/session-binding-service.js", () => ({
const { tryDispatchAcpReply } = await import("./dispatch-acp.js");
const sessionKey = "agent:codex-acp:session-1";
type MockTtsReply = Awaited<ReturnType<typeof ttsMocks.maybeApplyTtsToPayload>>;
function createDispatcher(): {
dispatcher: ReplyDispatcher;
@ -200,6 +201,15 @@ function mockVisibleTextTurn(text = "visible") {
);
}
/**
 * Stubs `managerMocks.runTurn` so a turn emits one `agent_message_chunk` text delta carrying
 * `text`, followed by a `done` event. Each event is awaited before the next one is sent.
 */
function mockRoutedTextTurn(text: string) {
  managerMocks.runTurn.mockImplementation(
    async ({ onEvent }: { onEvent: (event: unknown) => Promise<void> }) => {
      const scriptedEvents: unknown[] = [
        { type: "text_delta", text, tag: "agent_message_chunk" },
        { type: "done" },
      ];
      for (const scriptedEvent of scriptedEvents) {
        await onEvent(scriptedEvent);
      }
    },
  );
}
async function dispatchVisibleTurn(onReplyStart: () => void) {
await runDispatch({
bodyForAgent: "visible",
@ -208,6 +218,32 @@ async function dispatchVisibleTurn(onReplyStart: () => void) {
});
}
/**
 * Queues one-shot resolved values on `ttsMocks.maybeApplyTtsToPayload`, one per reply, in the
 * order given.
 */
function queueTtsReplies(...replies: MockTtsReply[]) {
  replies.forEach((queuedReply) => {
    ttsMocks.maybeApplyTtsToPayload.mockResolvedValueOnce(queuedReply);
  });
}
/**
 * Mocks a single-text ACP turn via `mockRoutedTextTurn`, then runs `runDispatch` with a fresh
 * dispatcher and `shouldRouteToOriginating: true`.
 *
 * @returns The dispatch result wrapped as `{ result }`.
 */
async function runRoutedAcpTextTurn(text: string) {
  mockRoutedTextTurn(text);
  const result = await runDispatch({
    bodyForAgent: "run acp",
    dispatcher: createDispatcher().dispatcher,
    shouldRouteToOriginating: true,
  });
  return { result };
}
/**
 * Asserts that the second call to `routeMocks.routeReply` received an argument whose `payload`
 * contains every field of `payload`.
 */
function expectSecondRoutedPayload(payload: Partial<MockTtsReply>) {
  const routedReplyMock = expect(routeMocks.routeReply);
  const expectedCallArg = expect.objectContaining({
    payload: expect.objectContaining(payload),
  });
  routedReplyMock.toHaveBeenNthCalledWith(2, expectedCallArg);
}
describe("tryDispatchAcpReply", () => {
beforeEach(() => {
managerMocks.resolveSession.mockReset();
@ -236,12 +272,7 @@ describe("tryDispatchAcpReply", () => {
it("routes ACP block output to originating channel", async () => {
setReadyAcpResolution();
managerMocks.runTurn.mockImplementation(
async ({ onEvent }: { onEvent: (event: unknown) => Promise<void> }) => {
await onEvent({ type: "text_delta", text: "hello", tag: "agent_message_chunk" });
await onEvent({ type: "done" });
},
);
mockRoutedTextTurn("hello");
const { dispatcher } = createDispatcher();
const result = await runDispatch({
@ -439,92 +470,37 @@ describe("tryDispatchAcpReply", () => {
it("delivers final fallback text even when routed block text already existed", async () => {
setReadyAcpResolution();
ttsMocks.resolveTtsConfig.mockReturnValue({ mode: "final" });
ttsMocks.maybeApplyTtsToPayload
.mockResolvedValueOnce({ text: "CODEX_OK" })
.mockResolvedValueOnce({} as ReturnType<typeof ttsMocks.maybeApplyTtsToPayload>);
managerMocks.runTurn.mockImplementation(
async ({ onEvent }: { onEvent: (event: unknown) => Promise<void> }) => {
await onEvent({ type: "text_delta", text: "CODEX_OK", tag: "agent_message_chunk" });
await onEvent({ type: "done" });
},
);
const { dispatcher } = createDispatcher();
const result = await runDispatch({
bodyForAgent: "run acp",
dispatcher,
shouldRouteToOriginating: true,
});
queueTtsReplies({ text: "CODEX_OK" }, {} as ReturnType<typeof ttsMocks.maybeApplyTtsToPayload>);
const { result } = await runRoutedAcpTextTurn("CODEX_OK");
expect(result?.counts.block).toBe(1);
expect(result?.counts.final).toBe(1);
expect(routeMocks.routeReply).toHaveBeenCalledTimes(2);
expect(routeMocks.routeReply).toHaveBeenNthCalledWith(
2,
expect.objectContaining({
payload: expect.objectContaining({
text: "CODEX_OK",
}),
}),
);
expectSecondRoutedPayload({ text: "CODEX_OK" });
});
it("does not add text fallback when final TTS already delivered audio", async () => {
setReadyAcpResolution();
ttsMocks.resolveTtsConfig.mockReturnValue({ mode: "final" });
ttsMocks.maybeApplyTtsToPayload
.mockResolvedValueOnce({ text: "Task completed" })
.mockResolvedValueOnce({
mediaUrl: "https://example.com/final.mp3",
audioAsVoice: true,
} as Awaited<ReturnType<typeof ttsMocks.maybeApplyTtsToPayload>>);
managerMocks.runTurn.mockImplementation(
async ({ onEvent }: { onEvent: (event: unknown) => Promise<void> }) => {
await onEvent({ type: "text_delta", text: "Task completed", tag: "agent_message_chunk" });
await onEvent({ type: "done" });
},
);
const { dispatcher } = createDispatcher();
const result = await runDispatch({
bodyForAgent: "run acp",
dispatcher,
shouldRouteToOriginating: true,
});
queueTtsReplies({ text: "Task completed" }, {
mediaUrl: "https://example.com/final.mp3",
audioAsVoice: true,
} as MockTtsReply);
const { result } = await runRoutedAcpTextTurn("Task completed");
expect(result?.counts.block).toBe(1);
expect(result?.counts.final).toBe(1);
expect(routeMocks.routeReply).toHaveBeenCalledTimes(2);
expect(routeMocks.routeReply).toHaveBeenNthCalledWith(
2,
expect.objectContaining({
payload: expect.objectContaining({
mediaUrl: "https://example.com/final.mp3",
audioAsVoice: true,
}),
}),
);
expectSecondRoutedPayload({
mediaUrl: "https://example.com/final.mp3",
audioAsVoice: true,
});
});
it("skips fallback when TTS mode is all (blocks already processed with TTS)", async () => {
setReadyAcpResolution();
// Configure TTS mode as "all" - blocks already went through TTS
ttsMocks.resolveTtsConfig.mockReturnValue({ mode: "all" });
managerMocks.runTurn.mockImplementation(
async ({ onEvent }: { onEvent: (event: unknown) => Promise<void> }) => {
await onEvent({ type: "text_delta", text: "Response", tag: "agent_message_chunk" });
await onEvent({ type: "done" });
},
);
const { dispatcher } = createDispatcher();
const result = await runDispatch({
bodyForAgent: "run acp",
dispatcher,
shouldRouteToOriginating: true,
});
const { result } = await runRoutedAcpTextTurn("Response");
expect(result?.counts.block).toBe(1);
expect(result?.counts.final).toBe(0);

View File

@ -30,7 +30,10 @@ import {
} from "../commands-registry.js";
import type { FinalizedMsgContext } from "../templating.js";
import { createAcpReplyProjector } from "./acp-projector.js";
import { createAcpDispatchDeliveryCoordinator } from "./dispatch-acp-delivery.js";
import {
createAcpDispatchDeliveryCoordinator,
type AcpDispatchDeliveryCoordinator,
} from "./dispatch-acp-delivery.js";
import type { ReplyDispatcher, ReplyDispatchKind } from "./reply-dispatcher.js";
type DispatchProcessedRecorder = (
@ -184,6 +187,85 @@ export type AcpDispatchAttemptResult = {
counts: Record<ReplyDispatchKind, number>;
};
/**
 * Runs the end-of-turn delivery steps for an ACP turn and reports whether any of them queued a
 * final reply.
 *
 * Steps, in order:
 * 1. When the TTS mode is "final" (also used when no mode is configured) and block text was
 *    accumulated, request a synthesized reply from `maybeApplyTtsToPayload` and, if it carries a
 *    `mediaUrl`, deliver that media as a final reply. Any error thrown in this step is logged
 *    through `logVerbose` and not rethrown.
 * 2. Deliver the accumulated block text as a final reply with TTS skipped, unless the TTS mode
 *    is "all", no text was accumulated, or a final reply has already been delivered.
 * 3. When `shouldEmitResolvedIdentityNotice` is set, deliver a "Session ids resolved." system
 *    message once the session identity is no longer pending and detail lines exist.
 *
 * @param params.delivery Coordinator that supplies the accumulated text and performs deliveries.
 * @param params.shouldEmitResolvedIdentityNotice Enables step 3.
 * @returns `true` when at least one final delivery above reported success, otherwise `false`.
 */
async function finalizeAcpTurnOutput(params: {
  cfg: OpenClawConfig;
  sessionKey: string;
  delivery: AcpDispatchDeliveryCoordinator;
  inboundAudio: boolean;
  sessionTtsAuto?: TtsAutoMode;
  ttsChannel?: string;
  shouldEmitResolvedIdentityNotice: boolean;
}): Promise<boolean> {
  const { cfg, sessionKey, delivery } = params;
  const ttsMode = resolveTtsConfig(cfg).mode ?? "final";
  const blockText = delivery.getAccumulatedBlockText();
  const hasBlockText = blockText.trim().length > 0;

  let finalQueued = false;
  let mediaDelivered = false;

  if (hasBlockText && ttsMode === "final") {
    try {
      const synthesized = await maybeApplyTtsToPayload({
        payload: { text: blockText },
        cfg,
        channel: params.ttsChannel,
        kind: "final",
        inboundAudio: params.inboundAudio,
        ttsAuto: params.sessionTtsAuto,
      });
      if (synthesized.mediaUrl) {
        mediaDelivered = await delivery.deliver("final", {
          mediaUrl: synthesized.mediaUrl,
          audioAsVoice: synthesized.audioAsVoice,
        });
        if (mediaDelivered) {
          finalQueued = true;
        }
      }
    } catch (err) {
      const reason = err instanceof Error ? err.message : String(err);
      logVerbose(`dispatch-acp: accumulated ACP block TTS failed: ${reason}`);
    }
  }

  // Some ACP parent surfaces only expose terminal replies, so block routing alone is not enough
  // to prove the final result was visible to the user.
  const needsTextFallback =
    ttsMode !== "all" &&
    hasBlockText &&
    !(mediaDelivered || delivery.hasDeliveredFinalReply());
  if (needsTextFallback) {
    const textDelivered = await delivery.deliver("final", { text: blockText }, { skipTts: true });
    if (textDelivered) {
      finalQueued = true;
    }
  }

  // Identity notice: bail out as soon as any precondition is not met.
  if (!params.shouldEmitResolvedIdentityNotice) {
    return finalQueued;
  }
  const sessionMeta = readAcpSessionEntry({ cfg, sessionKey })?.acp;
  if (isSessionIdentityPending(resolveSessionIdentityFromMeta(sessionMeta))) {
    return finalQueued;
  }
  const detailLines = resolveAcpThreadSessionDetailLines({ sessionKey, meta: sessionMeta });
  if (detailLines.length === 0) {
    return finalQueued;
  }
  const noticeDelivered = await delivery.deliver("final", {
    text: prefixSystemMessage(["Session ids resolved.", ...detailLines].join("\n")),
  });
  if (noticeDelivered) {
    finalQueued = true;
  }
  return finalQueued;
}
export async function tryDispatchAcpReply(params: {
ctx: FinalizedMsgContext;
cfg: OpenClawConfig;
@ -314,81 +396,16 @@ export async function tryDispatchAcpReply(params: {
});
await projector.flush(true);
const ttsMode = resolveTtsConfig(params.cfg).mode ?? "final";
const accumulatedBlockText = delivery.getAccumulatedBlockText();
const hasAccumulatedBlockText = accumulatedBlockText.trim().length > 0;
const routedCounts = delivery.getRoutedCounts();
// Attempt final TTS synthesis for ttsMode="final" (independent of delivery status).
// This ensures routed ACP flows still get final audio even after block delivery.
let ttsSucceeded = false;
if (ttsMode === "final" && hasAccumulatedBlockText) {
try {
const ttsSyntheticReply = await maybeApplyTtsToPayload({
payload: { text: accumulatedBlockText },
cfg: params.cfg,
channel: params.ttsChannel,
kind: "final",
inboundAudio: params.inboundAudio,
ttsAuto: params.sessionTtsAuto,
});
if (ttsSyntheticReply.mediaUrl) {
// Use delivery.deliver to ensure proper routing in cross-provider ACP turns.
// Pass audioAsVoice to avoid re-entering TTS synthesis.
const delivered = await delivery.deliver("final", {
mediaUrl: ttsSyntheticReply.mediaUrl,
audioAsVoice: ttsSyntheticReply.audioAsVoice,
});
queuedFinal = queuedFinal || delivered;
if (delivered) {
ttsSucceeded = true; // TTS succeeded AND delivered, skip text fallback
}
}
} catch (err) {
logVerbose(
`dispatch-acp: accumulated ACP block TTS failed: ${err instanceof Error ? err.message : String(err)}`,
);
// TTS failed, fall through to text fallback
}
}
// Only attempt text fallback if a terminal delivery has not happened yet.
// Block routing alone is not enough here because some ACP parent surfaces only expose
// terminal replies, so routed block text can still leave the user with no visible result.
// For non-routed flows, we still skip the final-only guard because dispatcher counts are not
// tracked here and we want to recover when earlier block sends fail.
// Skip fallback for ttsMode="all" because blocks were already processed with TTS.
const shouldSkipTextFallback =
ttsMode === "all" ||
ttsSucceeded ||
(params.shouldRouteToOriginating && routedCounts.final > 0);
if (!shouldSkipTextFallback && hasAccumulatedBlockText) {
// Fallback to text-only delivery (no TTS).
// For routed flows, use delivery.deliver with skipTts to bypass TTS re-entry.
// For non-routed flows, use dispatcher directly to bypass TTS.
const delivered = params.shouldRouteToOriginating
? await delivery.deliver("final", { text: accumulatedBlockText }, { skipTts: true })
: params.dispatcher.sendFinalReply({ text: accumulatedBlockText });
queuedFinal = queuedFinal || delivered;
}
if (shouldEmitResolvedIdentityNotice) {
const currentMeta = readAcpSessionEntry({
queuedFinal =
(await finalizeAcpTurnOutput({
cfg: params.cfg,
sessionKey,
})?.acp;
const identityAfterTurn = resolveSessionIdentityFromMeta(currentMeta);
if (!isSessionIdentityPending(identityAfterTurn)) {
const resolvedDetails = resolveAcpThreadSessionDetailLines({
sessionKey,
meta: currentMeta,
});
if (resolvedDetails.length > 0) {
const delivered = await delivery.deliver("final", {
text: prefixSystemMessage(["Session ids resolved.", ...resolvedDetails].join("\n")),
});
queuedFinal = queuedFinal || delivered;
}
}
}
delivery,
inboundAudio: params.inboundAudio,
sessionTtsAuto: params.sessionTtsAuto,
ttsChannel: params.ttsChannel,
shouldEmitResolvedIdentityNotice,
})) || queuedFinal;
const counts = params.dispatcher.getQueuedCounts();
delivery.applyRoutedCounts(counts);