import fs from "node:fs/promises"; import os from "node:os"; import path from "node:path"; import { beforeEach, describe, expect, it, vi } from "vitest"; import { AcpRuntimeError } from "../../acp/runtime/errors.js"; import type { AcpSessionStoreEntry } from "../../acp/runtime/session-meta.js"; import type { OpenClawConfig } from "../../config/config.js"; import type { SessionBindingRecord } from "../../infra/outbound/session-binding-service.js"; import type { ReplyDispatcher } from "./reply-dispatcher.js"; import { buildTestCtx } from "./test-ctx.js"; import { createAcpSessionMeta, createAcpTestConfig } from "./test-fixtures/acp-runtime.js"; const managerMocks = vi.hoisted(() => ({ resolveSession: vi.fn(), runTurn: vi.fn(), getObservabilitySnapshot: vi.fn(() => ({ turns: { queueDepth: 0 }, runtimeCache: { activeSessions: 0 }, })), })); const policyMocks = vi.hoisted(() => ({ resolveAcpDispatchPolicyError: vi.fn<(cfg: OpenClawConfig) => AcpRuntimeError | null>(() => null), resolveAcpAgentPolicyError: vi.fn<(cfg: OpenClawConfig, agent: string) => AcpRuntimeError | null>( () => null, ), })); const routeMocks = vi.hoisted(() => ({ routeReply: vi.fn(async (_params: unknown) => ({ ok: true, messageId: "mock" })), })); const messageActionMocks = vi.hoisted(() => ({ runMessageAction: vi.fn(async (_params: unknown) => ({ ok: true as const })), })); const ttsMocks = vi.hoisted(() => ({ maybeApplyTtsToPayload: vi.fn(async (paramsUnknown: unknown) => { const params = paramsUnknown as { payload: unknown }; return params.payload; }), resolveTtsConfig: vi.fn((_cfg: OpenClawConfig) => ({ mode: "final" })), })); const sessionMetaMocks = vi.hoisted(() => ({ readAcpSessionEntry: vi.fn< (params: { sessionKey: string; cfg?: OpenClawConfig }) => AcpSessionStoreEntry | null >(() => null), })); const bindingServiceMocks = vi.hoisted(() => ({ listBySession: vi.fn<(sessionKey: string) => SessionBindingRecord[]>(() => []), })); vi.mock("../../acp/control-plane/manager.js", () => ({ getAcpSessionManager: () => managerMocks, })); vi.mock("../../acp/policy.js", () => ({ resolveAcpDispatchPolicyError: (cfg: OpenClawConfig) => policyMocks.resolveAcpDispatchPolicyError(cfg), resolveAcpAgentPolicyError: (cfg: OpenClawConfig, agent: string) => policyMocks.resolveAcpAgentPolicyError(cfg, agent), })); vi.mock("./route-reply.js", () => ({ routeReply: (params: unknown) => routeMocks.routeReply(params), })); vi.mock("../../infra/outbound/message-action-runner.js", () => ({ runMessageAction: (params: unknown) => messageActionMocks.runMessageAction(params), })); vi.mock("../../tts/tts.js", () => ({ maybeApplyTtsToPayload: (params: unknown) => ttsMocks.maybeApplyTtsToPayload(params), resolveTtsConfig: (cfg: OpenClawConfig) => ttsMocks.resolveTtsConfig(cfg), })); vi.mock("../../acp/runtime/session-meta.js", () => ({ readAcpSessionEntry: (params: { sessionKey: string; cfg?: OpenClawConfig }) => sessionMetaMocks.readAcpSessionEntry(params), })); vi.mock("../../infra/outbound/session-binding-service.js", () => ({ getSessionBindingService: () => ({ listBySession: (sessionKey: string) => bindingServiceMocks.listBySession(sessionKey), }), })); const { tryDispatchAcpReply } = await import("./dispatch-acp.js"); const sessionKey = "agent:codex-acp:session-1"; type MockTtsReply = Awaited>; function createDispatcher(): { dispatcher: ReplyDispatcher; counts: Record<"tool" | "block" | "final", number>; } { const counts = { tool: 0, block: 0, final: 0 }; const dispatcher: ReplyDispatcher = { sendToolResult: vi.fn(() => true), sendBlockReply: vi.fn(() => true), sendFinalReply: vi.fn(() => true), waitForIdle: vi.fn(async () => {}), getQueuedCounts: vi.fn(() => counts), markComplete: vi.fn(), }; return { dispatcher, counts }; } function setReadyAcpResolution() { managerMocks.resolveSession.mockReturnValue({ kind: "ready", sessionKey, meta: createAcpSessionMeta(), }); } function createAcpConfigWithVisibleToolTags(): OpenClawConfig { return createAcpTestConfig({ acp: { enabled: true, stream: { tagVisibility: { tool_call: true, tool_call_update: true, }, }, }, }); } async function runDispatch(params: { bodyForAgent: string; cfg?: OpenClawConfig; dispatcher?: ReplyDispatcher; shouldRouteToOriginating?: boolean; onReplyStart?: () => void; ctxOverrides?: Record; }) { return tryDispatchAcpReply({ ctx: buildTestCtx({ Provider: "discord", Surface: "discord", SessionKey: sessionKey, BodyForAgent: params.bodyForAgent, ...params.ctxOverrides, }), cfg: params.cfg ?? createAcpTestConfig(), dispatcher: params.dispatcher ?? createDispatcher().dispatcher, sessionKey, inboundAudio: false, shouldRouteToOriginating: params.shouldRouteToOriginating ?? false, ...(params.shouldRouteToOriginating ? { originatingChannel: "telegram", originatingTo: "telegram:thread-1" } : {}), shouldSendToolSummaries: true, bypassForCommand: false, ...(params.onReplyStart ? { onReplyStart: params.onReplyStart } : {}), recordProcessed: vi.fn(), markIdle: vi.fn(), }); } async function emitToolLifecycleEvents( onEvent: (event: unknown) => Promise, toolCallId: string, ) { await onEvent({ type: "tool_call", tag: "tool_call", toolCallId, status: "in_progress", title: "Run command", text: "Run command (in_progress)", }); await onEvent({ type: "tool_call", tag: "tool_call_update", toolCallId, status: "completed", title: "Run command", text: "Run command (completed)", }); await onEvent({ type: "done" }); } function mockToolLifecycleTurn(toolCallId: string) { managerMocks.runTurn.mockImplementation( async ({ onEvent }: { onEvent: (event: unknown) => Promise }) => { await emitToolLifecycleEvents(onEvent, toolCallId); }, ); } function mockVisibleTextTurn(text = "visible") { managerMocks.runTurn.mockImplementationOnce( async ({ onEvent }: { onEvent: (event: unknown) => Promise }) => { await onEvent({ type: "text_delta", text, tag: "agent_message_chunk" }); await onEvent({ type: "done" }); }, ); } function mockRoutedTextTurn(text: string) { managerMocks.runTurn.mockImplementation( async ({ onEvent }: { onEvent: (event: unknown) => Promise }) => { await onEvent({ type: "text_delta", text, tag: "agent_message_chunk" }); await onEvent({ type: "done" }); }, ); } async function dispatchVisibleTurn(onReplyStart: () => void) { await runDispatch({ bodyForAgent: "visible", dispatcher: createDispatcher().dispatcher, onReplyStart, }); } function queueTtsReplies(...replies: MockTtsReply[]) { for (const reply of replies) { ttsMocks.maybeApplyTtsToPayload.mockResolvedValueOnce(reply); } } async function runRoutedAcpTextTurn(text: string) { mockRoutedTextTurn(text); const { dispatcher } = createDispatcher(); const result = await runDispatch({ bodyForAgent: "run acp", dispatcher, shouldRouteToOriginating: true, }); return { result }; } function expectSecondRoutedPayload(payload: Partial) { expect(routeMocks.routeReply).toHaveBeenNthCalledWith( 2, expect.objectContaining({ payload: expect.objectContaining(payload), }), ); } describe("tryDispatchAcpReply", () => { beforeEach(() => { managerMocks.resolveSession.mockReset(); managerMocks.runTurn.mockReset(); managerMocks.getObservabilitySnapshot.mockReset(); managerMocks.getObservabilitySnapshot.mockReturnValue({ turns: { queueDepth: 0 }, runtimeCache: { activeSessions: 0 }, }); policyMocks.resolveAcpDispatchPolicyError.mockReset(); policyMocks.resolveAcpDispatchPolicyError.mockReturnValue(null); policyMocks.resolveAcpAgentPolicyError.mockReset(); policyMocks.resolveAcpAgentPolicyError.mockReturnValue(null); routeMocks.routeReply.mockReset(); routeMocks.routeReply.mockResolvedValue({ ok: true, messageId: "mock" }); messageActionMocks.runMessageAction.mockReset(); messageActionMocks.runMessageAction.mockResolvedValue({ ok: true as const }); ttsMocks.maybeApplyTtsToPayload.mockClear(); ttsMocks.resolveTtsConfig.mockReset(); ttsMocks.resolveTtsConfig.mockReturnValue({ mode: "final" }); sessionMetaMocks.readAcpSessionEntry.mockReset(); sessionMetaMocks.readAcpSessionEntry.mockReturnValue(null); bindingServiceMocks.listBySession.mockReset(); bindingServiceMocks.listBySession.mockReturnValue([]); }); it("routes ACP block output to originating channel", async () => { setReadyAcpResolution(); mockRoutedTextTurn("hello"); const { dispatcher } = createDispatcher(); const result = await runDispatch({ bodyForAgent: "reply", dispatcher, shouldRouteToOriginating: true, }); expect(result?.counts.block).toBe(1); expect(routeMocks.routeReply).toHaveBeenCalledWith( expect.objectContaining({ channel: "telegram", to: "telegram:thread-1", }), ); expect(dispatcher.sendBlockReply).not.toHaveBeenCalled(); }); it("edits ACP tool lifecycle updates in place when supported", async () => { setReadyAcpResolution(); mockToolLifecycleTurn("call-1"); routeMocks.routeReply.mockResolvedValueOnce({ ok: true, messageId: "tool-msg-1" }); const { dispatcher } = createDispatcher(); await runDispatch({ bodyForAgent: "run tool", cfg: createAcpConfigWithVisibleToolTags(), dispatcher, shouldRouteToOriginating: true, }); expect(routeMocks.routeReply).toHaveBeenCalledTimes(1); expect(messageActionMocks.runMessageAction).toHaveBeenCalledWith( expect.objectContaining({ action: "edit", params: expect.objectContaining({ messageId: "tool-msg-1", }), }), ); }); it("falls back to new tool message when edit fails", async () => { setReadyAcpResolution(); mockToolLifecycleTurn("call-2"); routeMocks.routeReply .mockResolvedValueOnce({ ok: true, messageId: "tool-msg-2" }) .mockResolvedValueOnce({ ok: true, messageId: "tool-msg-2-fallback" }); messageActionMocks.runMessageAction.mockRejectedValueOnce(new Error("edit unsupported")); const { dispatcher } = createDispatcher(); await runDispatch({ bodyForAgent: "run tool", cfg: createAcpConfigWithVisibleToolTags(), dispatcher, shouldRouteToOriginating: true, }); expect(messageActionMocks.runMessageAction).toHaveBeenCalledTimes(1); expect(routeMocks.routeReply).toHaveBeenCalledTimes(2); }); it("starts reply lifecycle when ACP turn starts, including hidden-only turns", async () => { setReadyAcpResolution(); const onReplyStart = vi.fn(); const { dispatcher } = createDispatcher(); managerMocks.runTurn.mockImplementationOnce( async ({ onEvent }: { onEvent: (event: unknown) => Promise }) => { await onEvent({ type: "status", tag: "usage_update", text: "usage updated: 1/100", used: 1, size: 100, }); await onEvent({ type: "done" }); }, ); await runDispatch({ bodyForAgent: "hidden", dispatcher, onReplyStart, }); expect(onReplyStart).toHaveBeenCalledTimes(1); mockVisibleTextTurn(); await dispatchVisibleTurn(onReplyStart); expect(onReplyStart).toHaveBeenCalledTimes(2); }); it("starts reply lifecycle once per turn when output is delivered", async () => { setReadyAcpResolution(); const onReplyStart = vi.fn(); mockVisibleTextTurn(); await dispatchVisibleTurn(onReplyStart); expect(onReplyStart).toHaveBeenCalledTimes(1); }); it("does not start reply lifecycle for empty ACP prompt", async () => { setReadyAcpResolution(); const onReplyStart = vi.fn(); const { dispatcher } = createDispatcher(); await runDispatch({ bodyForAgent: " ", dispatcher, onReplyStart, }); expect(managerMocks.runTurn).not.toHaveBeenCalled(); expect(onReplyStart).not.toHaveBeenCalled(); }); it("forwards normalized image attachments into ACP turns", async () => { setReadyAcpResolution(); const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "dispatch-acp-")); const imagePath = path.join(tempDir, "inbound.png"); try { await fs.writeFile(imagePath, "image-bytes"); managerMocks.runTurn.mockResolvedValue(undefined); await runDispatch({ bodyForAgent: " ", ctxOverrides: { MediaPath: imagePath, MediaType: "image/png", }, }); expect(managerMocks.runTurn).toHaveBeenCalledWith( expect.objectContaining({ text: "", attachments: [ { mediaType: "image/png", data: Buffer.from("image-bytes").toString("base64"), }, ], }), ); } finally { await fs.rm(tempDir, { recursive: true, force: true }); } }); it("skips ACP turns for non-image attachments when there is no text prompt", async () => { setReadyAcpResolution(); const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "dispatch-acp-")); const docPath = path.join(tempDir, "inbound.pdf"); const { dispatcher } = createDispatcher(); const onReplyStart = vi.fn(); try { await fs.writeFile(docPath, "pdf-bytes"); await runDispatch({ bodyForAgent: " ", dispatcher, onReplyStart, ctxOverrides: { MediaPath: docPath, MediaType: "application/pdf", }, }); expect(managerMocks.runTurn).not.toHaveBeenCalled(); expect(onReplyStart).not.toHaveBeenCalled(); } finally { await fs.rm(tempDir, { recursive: true, force: true }); } }); it("surfaces ACP policy errors as final error replies", async () => { setReadyAcpResolution(); policyMocks.resolveAcpDispatchPolicyError.mockReturnValue( new AcpRuntimeError("ACP_DISPATCH_DISABLED", "ACP dispatch is disabled by policy."), ); const { dispatcher } = createDispatcher(); await runDispatch({ bodyForAgent: "test", dispatcher, }); expect(managerMocks.runTurn).not.toHaveBeenCalled(); expect(dispatcher.sendFinalReply).toHaveBeenCalledWith( expect.objectContaining({ text: expect.stringContaining("ACP_DISPATCH_DISABLED"), }), ); }); it("delivers final fallback text even when routed block text already existed", async () => { setReadyAcpResolution(); ttsMocks.resolveTtsConfig.mockReturnValue({ mode: "final" }); queueTtsReplies({ text: "CODEX_OK" }, {} as ReturnType); const { result } = await runRoutedAcpTextTurn("CODEX_OK"); expect(result?.counts.block).toBe(1); expect(result?.counts.final).toBe(1); expect(routeMocks.routeReply).toHaveBeenCalledTimes(2); expectSecondRoutedPayload({ text: "CODEX_OK" }); }); it("does not add text fallback when final TTS already delivered audio", async () => { setReadyAcpResolution(); ttsMocks.resolveTtsConfig.mockReturnValue({ mode: "final" }); queueTtsReplies({ text: "Task completed" }, { mediaUrl: "https://example.com/final.mp3", audioAsVoice: true, } as MockTtsReply); const { result } = await runRoutedAcpTextTurn("Task completed"); expect(result?.counts.block).toBe(1); expect(result?.counts.final).toBe(1); expect(routeMocks.routeReply).toHaveBeenCalledTimes(2); expectSecondRoutedPayload({ mediaUrl: "https://example.com/final.mp3", audioAsVoice: true, }); }); it("skips fallback when TTS mode is all (blocks already processed with TTS)", async () => { setReadyAcpResolution(); ttsMocks.resolveTtsConfig.mockReturnValue({ mode: "all" }); const { result } = await runRoutedAcpTextTurn("Response"); expect(result?.counts.block).toBe(1); expect(result?.counts.final).toBe(0); expect(routeMocks.routeReply).toHaveBeenCalledTimes(1); }); it("skips final TTS and fallback when no block text was accumulated", async () => { setReadyAcpResolution(); ttsMocks.resolveTtsConfig.mockReturnValue({ mode: "final" }); managerMocks.runTurn.mockImplementation( async ({ onEvent }: { onEvent: (event: unknown) => Promise }) => { await onEvent({ type: "done" }); }, ); const { dispatcher } = createDispatcher(); const result = await runDispatch({ bodyForAgent: "run acp", dispatcher, shouldRouteToOriginating: true, }); expect(result?.counts.block).toBe(0); expect(result?.counts.final).toBe(0); expect(routeMocks.routeReply).not.toHaveBeenCalled(); expect(ttsMocks.maybeApplyTtsToPayload).not.toHaveBeenCalled(); }); });