fix(agents): harden claude cli parsing and queueing

This commit is contained in:
Peter Steinberger 2026-04-04 13:59:53 +09:00
parent 4ed17fd987
commit bc8048250e
No known key found for this signature in database
7 changed files with 303 additions and 56 deletions

View File

@ -158,6 +158,12 @@ The provider id becomes the left side of your model ref:
- `existing`: only send a session id if one was stored before.
- `none`: never send a session id.
Serialization notes:
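- `serialize: true` (the default) runs same-lane requests one at a time, in order; `serialize: false` gives every run its own lane, so it never waits on another run.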
- Most CLIs serialize on one provider lane.
- `claude-cli` is narrower: resumed runs serialize per Claude session id, and fresh runs serialize per workspace path. Independent workspaces can run in parallel.
## Images (pass-through)
If your CLI accepts image paths, set `imageArg`:

View File

@ -117,7 +117,7 @@ describe("runClaudeCliAgent", () => {
expect(spawnInput.argv).toContain("hi");
});
it("serializes concurrent claude-cli runs", async () => {
it("serializes concurrent claude-cli runs in the same workspace", async () => {
const firstDeferred = createDeferred<ReturnType<typeof successExit>>();
const secondDeferred = createDeferred<ReturnType<typeof successExit>>();
@ -155,4 +155,40 @@ describe("runClaudeCliAgent", () => {
await Promise.all([firstRun, secondRun]);
});
it("allows concurrent claude-cli runs across different workspaces", async () => {
  // One unresolved exit per spawned process: neither run can complete until
  // this test resolves its deferred explicitly.
  const exitA = createDeferred<ReturnType<typeof successExit>>();
  const exitB = createDeferred<ReturnType<typeof successExit>>();
  supervisorSpawnMock.mockResolvedValueOnce(createManagedRun(exitA.promise));
  supervisorSpawnMock.mockResolvedValueOnce(createManagedRun(exitB.promise));

  // Two runs that differ in session, session file, workspace and run id.
  const runs = [
    runClaudeCliAgent({
      sessionId: "s1",
      sessionFile: "/tmp/session-1.jsonl",
      workspaceDir: "/tmp/project-a",
      prompt: "first",
      model: "opus",
      timeoutMs: 1_000,
      runId: "run-a",
    }),
    runClaudeCliAgent({
      sessionId: "s2",
      sessionFile: "/tmp/session-2.jsonl",
      workspaceDir: "/tmp/project-b",
      prompt: "second",
      model: "opus",
      timeoutMs: 1_000,
      runId: "run-b",
    }),
  ];

  // Both spawns have to be observed while the first exit is still pending;
  // if the second run were queued behind the first, this wait could not be
  // satisfied before the resolves below.
  await waitForCalls(supervisorSpawnMock, 2);

  exitA.resolve(successExit({ message: "ok", session_id: "sid-a" }));
  exitB.resolve(successExit({ message: "ok", session_id: "sid-b" }));
  await Promise.all(runs);
});
});

View File

@ -1,5 +1,34 @@
import { describe, expect, it } from "vitest";
import { parseCliJsonl } from "./cli-output.js";
import { parseCliJson, parseCliJsonl } from "./cli-output.js";
describe("parseCliJson", () => {
  it("recovers mixed-output Claude session metadata from embedded JSON objects", () => {
    // A plain-text banner followed by two JSON objects, one per line: the
    // session id lives in the first object, the text and usage in the second.
    const stdout = [
      "Claude Code starting...",
      '{"type":"init","session_id":"session-789"}',
      '{"type":"result","result":"Claude says hi","usage":{"input_tokens":9,"output_tokens":4}}',
    ].join("\n");

    const parsed = parseCliJson(stdout, {
      command: "claude",
      output: "json",
      sessionIdFields: ["session_id"],
    });

    const expected = {
      text: "Claude says hi",
      sessionId: "session-789",
      usage: {
        input: 9,
        output: 4,
        cacheRead: undefined,
        cacheWrite: undefined,
        total: undefined,
      },
    };
    expect(parsed).toEqual(expected);
  });
});
describe("parseCliJsonl", () => {
it("parses Claude stream-json result events", () => {
@ -72,4 +101,22 @@ describe("parseCliJsonl", () => {
},
});
});
it("parses multiple JSON objects embedded on the same line", () => {
  // Two complete objects separated only by a space, so the line as a whole is
  // not valid JSON.
  const sameLineEvents =
    '{"type":"init","session_id":"session-999"} {"type":"result","session_id":"session-999","result":"done"}';

  const parsed = parseCliJsonl(
    sameLineEvents,
    {
      command: "claude",
      output: "jsonl",
      sessionIdFields: ["session_id"],
    },
    "claude-cli",
  );

  const expected = {
    text: "done",
    sessionId: "session-999",
    usage: undefined,
  };
  expect(parsed).toEqual(expected);
});
});

View File

@ -16,6 +16,82 @@ export type CliOutput = {
usage?: CliUsage;
};
/**
 * Scans `raw` for balanced top-level `{ ... }` spans and returns each span as
 * a substring, in order of appearance.
 *
 * The spans are only *candidates*: braces are balanced and JSON string
 * literals (including backslash escapes) are respected, but nothing here
 * validates that a span is well-formed JSON. Callers are expected to
 * `JSON.parse` each candidate and discard the ones that fail.
 *
 * String and escape tracking is active only while inside an object. Text
 * between objects (banners, warnings, log lines) is treated as opaque, so a
 * stray `"` or `\` in that text cannot desynchronize the scanner and hide the
 * objects that follow it.
 *
 * @param raw - Arbitrary text that may contain zero or more JSON objects.
 * @returns The balanced `{ ... }` substrings found at nesting depth zero.
 */
function extractJsonObjectCandidates(raw: string): string[] {
  const candidates: string[] = [];
  let depth = 0;
  let start = -1;
  let inString = false;
  let escaped = false;

  for (let index = 0; index < raw.length; index += 1) {
    const char = raw[index] ?? "";

    if (depth === 0) {
      // Outside any object only an opening brace is meaningful; quotes and
      // backslashes in surrounding prose must not affect string state.
      if (char === "{") {
        start = index;
        depth = 1;
      }
      continue;
    }

    if (inString) {
      if (escaped) {
        // The character after a backslash is consumed verbatim.
        escaped = false;
      } else if (char === "\\") {
        escaped = true;
      } else if (char === '"') {
        inString = false;
      }
      // Braces inside a string literal never change the nesting depth.
      continue;
    }

    if (char === '"') {
      inString = true;
      continue;
    }
    if (char === "{") {
      depth += 1;
      continue;
    }
    if (char === "}") {
      depth -= 1;
      if (depth === 0) {
        candidates.push(raw.slice(start, index + 1));
        start = -1;
      }
    }
  }

  return candidates;
}
/**
 * Parses `raw` into zero or more JSON object records.
 *
 * The trimmed input is first parsed as a single JSON document; if that yields
 * a record, it is returned as the only element. Otherwise (the parse throws,
 * or the document is not a record) the text is scanned for embedded top-level
 * objects and every one that parses to a record is returned, in order.
 *
 * @param raw - CLI output that may mix JSON objects with other text.
 * @returns The records recovered from `raw`; empty when there are none.
 */
function parseJsonRecordCandidates(raw: string): Record<string, unknown>[] {
  // Parse one piece of text, treating both a syntax error and a non-record
  // value as "no record here".
  const toRecord = (text: string): Record<string, unknown> | undefined => {
    try {
      const value = JSON.parse(text);
      return isRecord(value) ? value : undefined;
    } catch {
      return undefined;
    }
  };

  const body = raw.trim();
  if (body === "") {
    return [];
  }

  const whole = toRecord(body);
  if (whole !== undefined) {
    return [whole];
  }

  // Mixed output: keep every embedded object that parses, skip the rest.
  const records: Record<string, unknown>[] = [];
  for (const fragment of extractJsonObjectCandidates(body)) {
    const record = toRecord(fragment);
    if (record !== undefined) {
      records.push(record);
    }
  }
  return records;
}
function toCliUsage(raw: Record<string, unknown>): CliUsage | undefined {
const pick = (key: string) =>
typeof raw[key] === "number" && raw[key] > 0 ? raw[key] : undefined;
@ -79,27 +155,40 @@ function pickCliSessionId(
}
export function parseCliJson(raw: string, backend: CliBackendConfig): CliOutput | null {
const trimmed = raw.trim();
if (!trimmed) {
const parsedRecords = parseJsonRecordCandidates(raw);
if (parsedRecords.length === 0) {
return null;
}
let parsed: unknown;
try {
parsed = JSON.parse(trimmed);
} catch {
let sessionId: string | undefined;
let usage: CliUsage | undefined;
let text = "";
let sawStructuredOutput = false;
for (const parsed of parsedRecords) {
sessionId = pickCliSessionId(parsed, backend) ?? sessionId;
if (isRecord(parsed.usage)) {
usage = toCliUsage(parsed.usage) ?? usage;
}
const nextText =
collectCliText(parsed.message) ||
collectCliText(parsed.content) ||
collectCliText(parsed.result) ||
collectCliText(parsed);
const trimmedText = nextText.trim();
if (trimmedText) {
text = trimmedText;
sawStructuredOutput = true;
continue;
}
if (sessionId || usage) {
sawStructuredOutput = true;
}
}
if (!text && !sawStructuredOutput) {
return null;
}
if (!isRecord(parsed)) {
return null;
}
const sessionId = pickCliSessionId(parsed, backend);
const usage = isRecord(parsed.usage) ? toCliUsage(parsed.usage) : undefined;
const text =
collectCliText(parsed.message) ||
collectCliText(parsed.content) ||
collectCliText(parsed.result) ||
collectCliText(parsed);
return { text: text.trim(), sessionId, usage };
return { text, sessionId, usage };
}
function parseClaudeCliJsonlResult(params: {
@ -143,40 +232,33 @@ export function parseCliJsonl(
let usage: CliUsage | undefined;
const texts: string[] = [];
for (const line of lines) {
let parsed: unknown;
try {
parsed = JSON.parse(line);
} catch {
continue;
}
if (!isRecord(parsed)) {
continue;
}
if (!sessionId) {
sessionId = pickCliSessionId(parsed, backend);
}
if (!sessionId && typeof parsed.thread_id === "string") {
sessionId = parsed.thread_id.trim();
}
if (isRecord(parsed.usage)) {
usage = toCliUsage(parsed.usage) ?? usage;
}
for (const parsed of parseJsonRecordCandidates(line)) {
if (!sessionId) {
sessionId = pickCliSessionId(parsed, backend);
}
if (!sessionId && typeof parsed.thread_id === "string") {
sessionId = parsed.thread_id.trim();
}
if (isRecord(parsed.usage)) {
usage = toCliUsage(parsed.usage) ?? usage;
}
const claudeResult = parseClaudeCliJsonlResult({
providerId,
parsed,
sessionId,
usage,
});
if (claudeResult) {
return claudeResult;
}
const claudeResult = parseClaudeCliJsonlResult({
providerId,
parsed,
sessionId,
usage,
});
if (claudeResult) {
return claudeResult;
}
const item = isRecord(parsed.item) ? parsed.item : null;
if (item && typeof item.text === "string") {
const type = typeof item.type === "string" ? item.type.toLowerCase() : "";
if (!type || type.includes("message")) {
texts.push(item.text);
const item = isRecord(parsed.item) ? parsed.item : null;
if (item && typeof item.text === "string") {
const type = typeof item.type === "string" ? item.type.toLowerCase() : "";
if (!type || type.includes("message")) {
texts.push(item.text);
}
}
}
}

View File

@ -1,7 +1,7 @@
import type { ImageContent } from "@mariozechner/pi-ai";
import { beforeEach, describe, expect, it, vi } from "vitest";
import { MAX_IMAGE_BYTES } from "../media/constants.js";
import { buildCliArgs, loadPromptRefImages } from "./cli-runner/helpers.js";
import { buildCliArgs, loadPromptRefImages, resolveCliRunQueueKey } from "./cli-runner/helpers.js";
import * as promptImageUtils from "./pi-embedded-runner/run/images.js";
import type { SandboxFsBridge } from "./sandbox/fs-bridge.js";
import { SYSTEM_PROMPT_CACHE_BOUNDARY } from "./system-prompt-cache-boundary.js";
@ -134,3 +134,51 @@ describe("buildCliArgs", () => {
).toEqual(["-p", "--append-system-prompt", "Stable prefix\nDynamic suffix"]);
});
});
describe("resolveCliRunQueueKey", () => {
  type QueueKeyParams = Parameters<typeof resolveCliRunQueueKey>[0];

  // Each row is one scenario: the inputs and the exact queue key expected.
  const cases: { title: string; params: QueueKeyParams; expected: string }[] = [
    {
      title: "scopes Claude CLI serialization to the workspace for fresh runs",
      params: {
        backendId: "claude-cli",
        serialize: true,
        runId: "run-1",
        workspaceDir: "/tmp/project-a",
      },
      expected: "claude-cli:workspace:/tmp/project-a",
    },
    {
      title: "scopes Claude CLI serialization to the resumed CLI session id",
      params: {
        backendId: "claude-cli",
        serialize: true,
        runId: "run-2",
        workspaceDir: "/tmp/project-a",
        cliSessionId: "claude-session-123",
      },
      expected: "claude-cli:session:claude-session-123",
    },
    {
      title: "keeps non-Claude backends on the provider lane when serialized",
      params: {
        backendId: "codex-cli",
        serialize: true,
        runId: "run-3",
        workspaceDir: "/tmp/project-a",
        cliSessionId: "thread-123",
      },
      expected: "codex-cli",
    },
    {
      title: "disables serialization when serialize=false",
      params: {
        backendId: "claude-cli",
        serialize: false,
        runId: "run-4",
        workspaceDir: "/tmp/project-a",
      },
      expected: "claude-cli:run-4",
    },
  ];

  for (const { title, params, expected } of cases) {
    it(title, () => {
      expect(resolveCliRunQueueKey(params)).toBe(expected);
    });
  }
});

View File

@ -13,6 +13,7 @@ import {
appendImagePathsToPrompt,
buildCliSupervisorScopeKey,
buildCliArgs,
resolveCliRunQueueKey,
enqueueCliRun,
loadPromptRefImages,
resolveCliNoOutputTimeoutMs,
@ -138,10 +139,13 @@ export async function executePreparedCliRun(
useResume,
});
const serialize = backend.serialize ?? true;
const queueKey = serialize
? context.backendResolved.id
: `${context.backendResolved.id}:${params.runId}`;
const queueKey = resolveCliRunQueueKey({
backendId: context.backendResolved.id,
serialize: backend.serialize,
runId: params.runId,
workspaceDir: context.workspaceDir,
cliSessionId: useResume ? resolvedSessionId : undefined,
});
try {
return await enqueueCliRun(queueKey, async () => {

View File

@ -9,6 +9,7 @@ import type { ThinkLevel } from "../../auto-reply/thinking.js";
import type { OpenClawConfig } from "../../config/config.js";
import type { CliBackendConfig } from "../../config/types.js";
import { MAX_IMAGE_BYTES } from "../../media/constants.js";
import { isClaudeCliProvider } from "../../plugin-sdk/anthropic-cli.js";
import { buildTtsSystemPromptHint } from "../../tts/tts.js";
import { buildModelAliasLines } from "../model-alias-lines.js";
import { resolveDefaultModelForAgent } from "../model-selection.js";
@ -28,6 +29,29 @@ export function enqueueCliRun<T>(key: string, task: () => Promise<T>): Promise<T
return CLI_RUN_QUEUE.enqueue(key, task);
}
/**
 * Builds the queue key for a CLI run.
 *
 * - `serialize === false`: `<backendId>:<runId>`, a key unique to this run.
 * - Claude CLI backend with a non-blank `cliSessionId`:
 *   `<backendId>:session:<sessionId>`.
 * - Claude CLI backend with a non-blank `workspaceDir`:
 *   `<backendId>:workspace:<workspaceDir>`.
 * - Anything else: the bare `backendId`.
 *
 * `cliSessionId` and `workspaceDir` are trimmed before use, and a value that
 * is blank after trimming is treated as absent.
 *
 * @param params - Backend id, serialization flag, run id, workspace path and
 *   optional CLI session id of the run.
 * @returns The key under which the run should be enqueued.
 */
export function resolveCliRunQueueKey(params: {
  backendId: string;
  serialize?: boolean;
  runId: string;
  workspaceDir: string;
  cliSessionId?: string;
}): string {
  const { backendId, serialize, runId, workspaceDir, cliSessionId } = params;

  // Only an explicit `false` opts out; `undefined` behaves like `true`.
  if (serialize === false) {
    return `${backendId}:${runId}`;
  }
  if (!isClaudeCliProvider(backendId)) {
    return backendId;
  }

  // A session id takes precedence over the workspace.
  const resumedSession = cliSessionId?.trim();
  if (resumedSession) {
    return `${backendId}:session:${resumedSession}`;
  }

  const workspace = workspaceDir.trim();
  return workspace ? `${backendId}:workspace:${workspace}` : backendId;
}
export function buildSystemPrompt(params: {
workspaceDir: string;
config?: OpenClawConfig;