test: split ACP attachment resolution from dispatch flow

This commit is contained in:
Peter Steinberger 2026-04-05 18:50:57 +01:00
parent b43d73b633
commit dcfc1f16ed
No known key found for this signature in database
3 changed files with 117 additions and 83 deletions

View File

@ -0,0 +1,74 @@
import type { AcpTurnAttachment } from "../../acp/control-plane/manager.types.js";
import type { OpenClawConfig } from "../../config/config.js";
import { logVerbose } from "../../globals.js";
import type { FinalizedMsgContext } from "../templating.js";
let dispatchAcpMediaRuntimePromise: Promise<
typeof import("./dispatch-acp-media.runtime.js")
> | null = null;
export function loadDispatchAcpMediaRuntime() {
dispatchAcpMediaRuntimePromise ??= import("./dispatch-acp-media.runtime.js");
return dispatchAcpMediaRuntimePromise;
}
export type DispatchAcpAttachmentRuntime = Pick<
Awaited<ReturnType<typeof loadDispatchAcpMediaRuntime>>,
| "MediaAttachmentCache"
| "isMediaUnderstandingSkipError"
| "normalizeAttachments"
| "resolveMediaAttachmentLocalRoots"
>;
const ACP_ATTACHMENT_MAX_BYTES = 10 * 1024 * 1024;
const ACP_ATTACHMENT_TIMEOUT_MS = 1_000;
export async function resolveAcpAttachments(params: {
ctx: FinalizedMsgContext;
cfg: OpenClawConfig;
runtime?: DispatchAcpAttachmentRuntime;
}): Promise<AcpTurnAttachment[]> {
const runtime = params.runtime ?? (await loadDispatchAcpMediaRuntime());
const mediaAttachments = runtime
.normalizeAttachments(params.ctx)
.map((attachment) =>
attachment.path?.trim() ? { ...attachment, url: undefined } : attachment,
);
const cache = new runtime.MediaAttachmentCache(mediaAttachments, {
localPathRoots: runtime.resolveMediaAttachmentLocalRoots({
cfg: params.cfg,
ctx: params.ctx,
}),
});
const results: AcpTurnAttachment[] = [];
for (const attachment of mediaAttachments) {
const mediaType = attachment.mime ?? "application/octet-stream";
if (!mediaType.startsWith("image/")) {
continue;
}
if (!attachment.path?.trim()) {
continue;
}
try {
const { buffer } = await cache.getBuffer({
attachmentIndex: attachment.index,
maxBytes: ACP_ATTACHMENT_MAX_BYTES,
timeoutMs: ACP_ATTACHMENT_TIMEOUT_MS,
});
results.push({
mediaType,
data: buffer.toString("base64"),
});
} catch (error) {
if (runtime.isMediaUnderstandingSkipError(error)) {
logVerbose(`dispatch-acp: skipping attachment #${attachment.index + 1} (${error.reason})`);
} else {
const errorName = error instanceof Error ? error.name : typeof error;
logVerbose(
`dispatch-acp: failed to read attachment #${attachment.index + 1} (${errorName})`,
);
}
}
}
return results;
}

View File

@ -6,7 +6,9 @@ import { AcpRuntimeError } from "../../acp/runtime/errors.js";
import type { AcpSessionStoreEntry } from "../../acp/runtime/session-meta.js";
import type { OpenClawConfig } from "../../config/config.js";
import type { SessionBindingRecord } from "../../infra/outbound/session-binding-service.js";
import type { MediaUnderstandingSkipError } from "../../media-understanding/errors.js";
import { withFetchPreconnect } from "../../test-utils/fetch-mock.js";
import { resolveAcpAttachments } from "./dispatch-acp-attachments.js";
import type { ReplyDispatcher } from "./reply-dispatcher.js";
import { buildTestCtx } from "./test-ctx.js";
import { createAcpSessionMeta, createAcpTestConfig } from "./test-fixtures/acp-runtime.js";
@ -277,6 +279,11 @@ describe("tryDispatchAcpReply", () => {
({ tryDispatchAcpReply } = await import("./dispatch-acp.js"));
managerMocks.resolveSession.mockReset();
managerMocks.runTurn.mockReset();
managerMocks.runTurn.mockImplementation(
async ({ onEvent }: { onEvent?: (event: unknown) => Promise<void> }) => {
await onEvent?.({ type: "done" });
},
);
managerMocks.getObservabilitySnapshot.mockReset();
managerMocks.getObservabilitySnapshot.mockReturnValue({
turns: { queueDepth: 0 },
@ -435,15 +442,11 @@ describe("tryDispatchAcpReply", () => {
});
it("forwards normalized image attachments into ACP turns", async () => {
setReadyAcpResolution();
const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "dispatch-acp-"));
const imagePath = path.join(tempDir, "inbound.png");
try {
await fs.writeFile(imagePath, "image-bytes");
managerMocks.runTurn.mockResolvedValue(undefined);
await runDispatch({
bodyForAgent: " ",
const attachments = await resolveAcpAttachments({
cfg: createAcpTestConfig({
channels: {
imessage: {
@ -451,23 +454,42 @@ describe("tryDispatchAcpReply", () => {
},
},
}),
ctxOverrides: {
ctx: buildTestCtx({
Provider: "imessage",
Surface: "imessage",
MediaPath: imagePath,
MediaType: "image/png",
}),
runtime: {
MediaAttachmentCache: class {
async getBuffer() {
return {
buffer: Buffer.from("image-bytes"),
mime: "image/png",
fileName: "inbound.png",
size: "image-bytes".length,
};
}
} as unknown as typeof import("./dispatch-acp-media.runtime.js").MediaAttachmentCache,
isMediaUnderstandingSkipError: (_error: unknown): _error is MediaUnderstandingSkipError =>
false,
normalizeAttachments: (ctx) => [
{
path: ctx.MediaPath,
mime: ctx.MediaType,
index: 0,
},
],
resolveMediaAttachmentLocalRoots: () => [tempDir],
},
});
expect(managerMocks.runTurn).toHaveBeenCalledWith(
expect.objectContaining({
text: "",
attachments: [
{
mediaType: "image/png",
data: Buffer.from("image-bytes").toString("base64"),
},
],
}),
);
expect(attachments).toEqual([
{
mediaType: "image/png",
data: Buffer.from("image-bytes").toString("base64"),
},
]);
} finally {
await fs.rm(tempDir, { recursive: true, force: true });
}

View File

@ -1,4 +1,3 @@
import type { AcpTurnAttachment } from "../../acp/control-plane/manager.types.js";
import { resolveAcpAgentPolicyError, resolveAcpDispatchPolicyError } from "../../acp/policy.js";
import { formatAcpRuntimeErrorText } from "../../acp/runtime/error-text.js";
import { toAcpRuntimeError } from "../../acp/runtime/errors.js";
@ -18,15 +17,13 @@ import { resolveStatusTtsSnapshot } from "../../tts/status-config.js";
import { resolveConfiguredTtsMode } from "../../tts/tts-config.js";
import type { FinalizedMsgContext } from "../templating.js";
import { createAcpReplyProjector } from "./acp-projector.js";
import { loadDispatchAcpMediaRuntime, resolveAcpAttachments } from "./dispatch-acp-attachments.js";
import {
createAcpDispatchDeliveryCoordinator,
type AcpDispatchDeliveryCoordinator,
} from "./dispatch-acp-delivery.js";
import type { ReplyDispatcher, ReplyDispatchKind } from "./reply-dispatcher.js";
let dispatchAcpMediaRuntimePromise: Promise<
typeof import("./dispatch-acp-media.runtime.js")
> | null = null;
let dispatchAcpManagerRuntimePromise: Promise<
typeof import("./dispatch-acp-manager.runtime.js")
> | null = null;
@ -36,11 +33,6 @@ let dispatchAcpSessionRuntimePromise: Promise<
let dispatchAcpTtsRuntimePromise: Promise<typeof import("./dispatch-acp-tts.runtime.js")> | null =
null;
function loadDispatchAcpMediaRuntime() {
dispatchAcpMediaRuntimePromise ??= import("./dispatch-acp-media.runtime.js");
return dispatchAcpMediaRuntimePromise;
}
function loadDispatchAcpManagerRuntime() {
dispatchAcpManagerRuntimePromise ??= import("./dispatch-acp-manager.runtime.js");
return dispatchAcpManagerRuntimePromise;
@ -99,62 +91,6 @@ function hasInboundMediaForAcp(ctx: FinalizedMsgContext): boolean {
);
}
const ACP_ATTACHMENT_MAX_BYTES = 10 * 1024 * 1024;
const ACP_ATTACHMENT_TIMEOUT_MS = 1_000;
async function resolveAcpAttachments(
ctx: FinalizedMsgContext,
cfg: OpenClawConfig,
): Promise<AcpTurnAttachment[]> {
if (!hasInboundMediaForAcp(ctx)) {
return [];
}
const {
MediaAttachmentCache,
isMediaUnderstandingSkipError,
normalizeAttachments,
resolveMediaAttachmentLocalRoots,
} = await loadDispatchAcpMediaRuntime();
const mediaAttachments = normalizeAttachments(ctx).map((attachment) =>
attachment.path?.trim() ? { ...attachment, url: undefined } : attachment,
);
const cache = new MediaAttachmentCache(mediaAttachments, {
localPathRoots: resolveMediaAttachmentLocalRoots({ cfg, ctx }),
});
const results: AcpTurnAttachment[] = [];
for (const attachment of mediaAttachments) {
const mediaType = attachment.mime ?? "application/octet-stream";
if (!mediaType.startsWith("image/")) {
continue;
}
if (!attachment.path?.trim()) {
continue;
}
try {
const { buffer } = await cache.getBuffer({
attachmentIndex: attachment.index,
maxBytes: ACP_ATTACHMENT_MAX_BYTES,
timeoutMs: ACP_ATTACHMENT_TIMEOUT_MS,
});
results.push({
mediaType,
data: buffer.toString("base64"),
});
} catch (error) {
if (isMediaUnderstandingSkipError(error)) {
logVerbose(`dispatch-acp: skipping attachment #${attachment.index + 1} (${error.reason})`);
} else {
const errorName = error instanceof Error ? error.name : typeof error;
logVerbose(
`dispatch-acp: failed to read attachment #${attachment.index + 1} (${errorName})`,
);
}
// Skip unreadable files. Text content should still be delivered.
}
}
return results;
}
function resolveAcpRequestId(ctx: FinalizedMsgContext): string {
const id = ctx.MessageSidFull ?? ctx.MessageSid ?? ctx.MessageSidFirst ?? ctx.MessageSidLast;
if (typeof id === "string" && id.trim()) {
@ -492,7 +428,9 @@ export async function tryDispatchAcpReply(params: {
}
const promptText = resolveAcpPromptText(params.ctx);
const attachments = await resolveAcpAttachments(params.ctx, params.cfg);
const attachments = hasInboundMediaForAcp(params.ctx)
? await resolveAcpAttachments({ ctx: params.ctx, cfg: params.cfg })
: [];
if (!promptText && attachments.length === 0) {
const counts = params.dispatcher.getQueuedCounts();
delivery.applyRoutedCounts(counts);