diff --git a/src/auto-reply/reply/dispatch-acp-attachments.ts b/src/auto-reply/reply/dispatch-acp-attachments.ts new file mode 100644 index 00000000000..4863935be3f --- /dev/null +++ b/src/auto-reply/reply/dispatch-acp-attachments.ts @@ -0,0 +1,74 @@ +import type { AcpTurnAttachment } from "../../acp/control-plane/manager.types.js"; +import type { OpenClawConfig } from "../../config/config.js"; +import { logVerbose } from "../../globals.js"; +import type { FinalizedMsgContext } from "../templating.js"; + +let dispatchAcpMediaRuntimePromise: Promise< + typeof import("./dispatch-acp-media.runtime.js") +> | null = null; + +export function loadDispatchAcpMediaRuntime() { + dispatchAcpMediaRuntimePromise ??= import("./dispatch-acp-media.runtime.js"); + return dispatchAcpMediaRuntimePromise; +} + +export type DispatchAcpAttachmentRuntime = Pick< + Awaited>, + | "MediaAttachmentCache" + | "isMediaUnderstandingSkipError" + | "normalizeAttachments" + | "resolveMediaAttachmentLocalRoots" +>; + +const ACP_ATTACHMENT_MAX_BYTES = 10 * 1024 * 1024; +const ACP_ATTACHMENT_TIMEOUT_MS = 1_000; + +export async function resolveAcpAttachments(params: { + ctx: FinalizedMsgContext; + cfg: OpenClawConfig; + runtime?: DispatchAcpAttachmentRuntime; +}): Promise { + const runtime = params.runtime ?? (await loadDispatchAcpMediaRuntime()); + const mediaAttachments = runtime + .normalizeAttachments(params.ctx) + .map((attachment) => + attachment.path?.trim() ? { ...attachment, url: undefined } : attachment, + ); + const cache = new runtime.MediaAttachmentCache(mediaAttachments, { + localPathRoots: runtime.resolveMediaAttachmentLocalRoots({ + cfg: params.cfg, + ctx: params.ctx, + }), + }); + const results: AcpTurnAttachment[] = []; + for (const attachment of mediaAttachments) { + const mediaType = attachment.mime ?? "application/octet-stream"; + if (!mediaType.startsWith("image/")) { + continue; + } + if (!attachment.path?.trim()) { + continue; + } + try { + const { buffer } = await cache.getBuffer({ + attachmentIndex: attachment.index, + maxBytes: ACP_ATTACHMENT_MAX_BYTES, + timeoutMs: ACP_ATTACHMENT_TIMEOUT_MS, + }); + results.push({ + mediaType, + data: buffer.toString("base64"), + }); + } catch (error) { + if (runtime.isMediaUnderstandingSkipError(error)) { + logVerbose(`dispatch-acp: skipping attachment #${attachment.index + 1} (${error.reason})`); + } else { + const errorName = error instanceof Error ? error.name : typeof error; + logVerbose( + `dispatch-acp: failed to read attachment #${attachment.index + 1} (${errorName})`, + ); + } + } + } + return results; +} diff --git a/src/auto-reply/reply/dispatch-acp.test.ts b/src/auto-reply/reply/dispatch-acp.test.ts index 88f03672fac..f8d8312b759 100644 --- a/src/auto-reply/reply/dispatch-acp.test.ts +++ b/src/auto-reply/reply/dispatch-acp.test.ts @@ -6,7 +6,9 @@ import { AcpRuntimeError } from "../../acp/runtime/errors.js"; import type { AcpSessionStoreEntry } from "../../acp/runtime/session-meta.js"; import type { OpenClawConfig } from "../../config/config.js"; import type { SessionBindingRecord } from "../../infra/outbound/session-binding-service.js"; +import type { MediaUnderstandingSkipError } from "../../media-understanding/errors.js"; import { withFetchPreconnect } from "../../test-utils/fetch-mock.js"; +import { resolveAcpAttachments } from "./dispatch-acp-attachments.js"; import type { ReplyDispatcher } from "./reply-dispatcher.js"; import { buildTestCtx } from "./test-ctx.js"; import { createAcpSessionMeta, createAcpTestConfig } from "./test-fixtures/acp-runtime.js"; @@ -277,6 +279,11 @@ describe("tryDispatchAcpReply", () => { ({ tryDispatchAcpReply } = await import("./dispatch-acp.js")); managerMocks.resolveSession.mockReset(); managerMocks.runTurn.mockReset(); + managerMocks.runTurn.mockImplementation( + async ({ onEvent }: { onEvent?: (event: unknown) => Promise }) => { + await onEvent?.({ type: "done" }); + }, + ); managerMocks.getObservabilitySnapshot.mockReset(); managerMocks.getObservabilitySnapshot.mockReturnValue({ turns: { queueDepth: 0 }, @@ -435,15 +442,11 @@ describe("tryDispatchAcpReply", () => { }); it("forwards normalized image attachments into ACP turns", async () => { - setReadyAcpResolution(); const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "dispatch-acp-")); const imagePath = path.join(tempDir, "inbound.png"); try { await fs.writeFile(imagePath, "image-bytes"); - managerMocks.runTurn.mockResolvedValue(undefined); - - await runDispatch({ - bodyForAgent: " ", + const attachments = await resolveAcpAttachments({ cfg: createAcpTestConfig({ channels: { imessage: { @@ -451,23 +454,42 @@ describe("tryDispatchAcpReply", () => { }, }, }), - ctxOverrides: { + ctx: buildTestCtx({ + Provider: "imessage", + Surface: "imessage", MediaPath: imagePath, MediaType: "image/png", + }), + runtime: { + MediaAttachmentCache: class { + async getBuffer() { + return { + buffer: Buffer.from("image-bytes"), + mime: "image/png", + fileName: "inbound.png", + size: "image-bytes".length, + }; + } + } as unknown as typeof import("./dispatch-acp-media.runtime.js").MediaAttachmentCache, + isMediaUnderstandingSkipError: (_error: unknown): _error is MediaUnderstandingSkipError => + false, + normalizeAttachments: (ctx) => [ + { + path: ctx.MediaPath, + mime: ctx.MediaType, + index: 0, + }, + ], + resolveMediaAttachmentLocalRoots: () => [tempDir], }, }); - expect(managerMocks.runTurn).toHaveBeenCalledWith( - expect.objectContaining({ - text: "", - attachments: [ - { - mediaType: "image/png", - data: Buffer.from("image-bytes").toString("base64"), - }, - ], - }), - ); + expect(attachments).toEqual([ + { + mediaType: "image/png", + data: Buffer.from("image-bytes").toString("base64"), + }, + ]); } finally { await fs.rm(tempDir, { recursive: true, force: true }); } diff --git a/src/auto-reply/reply/dispatch-acp.ts b/src/auto-reply/reply/dispatch-acp.ts index e9484e9aa50..926170cab12 100644 --- a/src/auto-reply/reply/dispatch-acp.ts +++ b/src/auto-reply/reply/dispatch-acp.ts @@ -1,4 +1,3 @@ -import type { AcpTurnAttachment } from "../../acp/control-plane/manager.types.js"; import { resolveAcpAgentPolicyError, resolveAcpDispatchPolicyError } from "../../acp/policy.js"; import { formatAcpRuntimeErrorText } from "../../acp/runtime/error-text.js"; import { toAcpRuntimeError } from "../../acp/runtime/errors.js"; @@ -18,15 +17,13 @@ import { resolveStatusTtsSnapshot } from "../../tts/status-config.js"; import { resolveConfiguredTtsMode } from "../../tts/tts-config.js"; import type { FinalizedMsgContext } from "../templating.js"; import { createAcpReplyProjector } from "./acp-projector.js"; +import { loadDispatchAcpMediaRuntime, resolveAcpAttachments } from "./dispatch-acp-attachments.js"; import { createAcpDispatchDeliveryCoordinator, type AcpDispatchDeliveryCoordinator, } from "./dispatch-acp-delivery.js"; import type { ReplyDispatcher, ReplyDispatchKind } from "./reply-dispatcher.js"; -let dispatchAcpMediaRuntimePromise: Promise< - typeof import("./dispatch-acp-media.runtime.js") -> | null = null; let dispatchAcpManagerRuntimePromise: Promise< typeof import("./dispatch-acp-manager.runtime.js") > | null = null; @@ -36,11 +33,6 @@ let dispatchAcpSessionRuntimePromise: Promise< let dispatchAcpTtsRuntimePromise: Promise | null = null; -function loadDispatchAcpMediaRuntime() { - dispatchAcpMediaRuntimePromise ??= import("./dispatch-acp-media.runtime.js"); - return dispatchAcpMediaRuntimePromise; -} - function loadDispatchAcpManagerRuntime() { dispatchAcpManagerRuntimePromise ??= import("./dispatch-acp-manager.runtime.js"); return dispatchAcpManagerRuntimePromise; @@ -99,62 +91,6 @@ function hasInboundMediaForAcp(ctx: FinalizedMsgContext): boolean { ); } -const ACP_ATTACHMENT_MAX_BYTES = 10 * 1024 * 1024; -const ACP_ATTACHMENT_TIMEOUT_MS = 1_000; - -async function resolveAcpAttachments( - ctx: FinalizedMsgContext, - cfg: OpenClawConfig, -): Promise { - if (!hasInboundMediaForAcp(ctx)) { - return []; - } - const { - MediaAttachmentCache, - isMediaUnderstandingSkipError, - normalizeAttachments, - resolveMediaAttachmentLocalRoots, - } = await loadDispatchAcpMediaRuntime(); - const mediaAttachments = normalizeAttachments(ctx).map((attachment) => - attachment.path?.trim() ? { ...attachment, url: undefined } : attachment, - ); - const cache = new MediaAttachmentCache(mediaAttachments, { - localPathRoots: resolveMediaAttachmentLocalRoots({ cfg, ctx }), - }); - const results: AcpTurnAttachment[] = []; - for (const attachment of mediaAttachments) { - const mediaType = attachment.mime ?? "application/octet-stream"; - if (!mediaType.startsWith("image/")) { - continue; - } - if (!attachment.path?.trim()) { - continue; - } - try { - const { buffer } = await cache.getBuffer({ - attachmentIndex: attachment.index, - maxBytes: ACP_ATTACHMENT_MAX_BYTES, - timeoutMs: ACP_ATTACHMENT_TIMEOUT_MS, - }); - results.push({ - mediaType, - data: buffer.toString("base64"), - }); - } catch (error) { - if (isMediaUnderstandingSkipError(error)) { - logVerbose(`dispatch-acp: skipping attachment #${attachment.index + 1} (${error.reason})`); - } else { - const errorName = error instanceof Error ? error.name : typeof error; - logVerbose( - `dispatch-acp: failed to read attachment #${attachment.index + 1} (${errorName})`, - ); - } - // Skip unreadable files. Text content should still be delivered. - } - } - return results; -} - function resolveAcpRequestId(ctx: FinalizedMsgContext): string { const id = ctx.MessageSidFull ?? ctx.MessageSid ?? ctx.MessageSidFirst ?? ctx.MessageSidLast; if (typeof id === "string" && id.trim()) { @@ -492,7 +428,9 @@ export async function tryDispatchAcpReply(params: { } const promptText = resolveAcpPromptText(params.ctx); - const attachments = await resolveAcpAttachments(params.ctx, params.cfg); + const attachments = hasInboundMediaForAcp(params.ctx) + ? await resolveAcpAttachments({ ctx: params.ctx, cfg: params.cfg }) + : []; if (!promptText && attachments.length === 0) { const counts = params.dispatcher.getQueuedCounts(); delivery.applyRoutedCounts(counts);