feat: stream Claude CLI JSONL output

This commit is contained in:
Peter Steinberger 2026-04-04 19:32:56 +09:00
parent 2ab1f1c054
commit 4e099689c0
No known key found for this signature in database
8 changed files with 283 additions and 12 deletions

View File

@ -27,6 +27,7 @@ Docs: https://docs.openclaw.ai
- Providers/request overrides: add shared model and media request transport overrides across OpenAI-, Anthropic-, Google-, and compatible provider paths, including headers, auth, proxy, and TLS controls. (#60200)
- Matrix/exec approvals: add Matrix-native exec approval prompts with account-scoped approvers, channel-or-DM delivery, and room-thread aware resolution handling. (#58635) Thanks @gumadeiras.
- Agents/Claude CLI: expose OpenClaw tools to background Claude CLI runs through a loopback MCP bridge that reuses gateway tool policy, honors session/account/channel scoping, and only advertises the bridge when the local runtime is actually live. (#35676) Thanks @mylukin.
- Agents/Claude CLI: switch bundled Claude CLI runs to stdin + `stream-json` partial-message streaming so prompts stop riding argv, long replies show live progress, and final session/usage metadata still land cleanly.
- Prompt caching: keep prompt prefixes more reusable across transport fallback, deterministic MCP tool ordering, compaction, and embedded image history so follow-up turns hit cache more reliably. (#58036, #58037, #58038, #59054, #60603, #60691) Thanks @bcherny.
- Agents/cache: diagnostics: add prompt-cache break diagnostics, trace live cache scenarios through embedded runner paths, and show cache reuse explicitly in `openclaw status --verbose`. Thanks @vincentkoc.
- Agents/cache: stabilize cache-relevant system prompt fingerprints by normalizing equivalent structured prompt whitespace, line endings, hook-added system context, and runtime capability ordering so semantically unchanged prompts reuse KV/cache more reliably. Thanks @vincentkoc.

View File

@ -13,7 +13,7 @@ OpenClaw can run **local AI CLIs** as a **text-only fallback** when API provider
rate-limited, or temporarily misbehaving. This is intentionally conservative:
- **Tools are disabled** (no tool calls).
- **Text in → text out** (reliable).
- **Text in → text out** (reliable, with Claude CLI partial text streaming when enabled).
- **Sessions are supported** (so follow-up turns stay coherent).
- **Images can be passed through** if the CLI accepts image paths.
@ -185,8 +185,9 @@ load local files from plain paths (Claude Code CLI behavior).
## Inputs / outputs
- `output: "json"` (default) tries to parse JSON and extract text + session id.
- `output: "jsonl"` parses JSONL streams (Codex CLI `--json`) and extracts the
last agent message plus `thread_id` when present.
- `output: "jsonl"` parses JSONL streams (for example Claude CLI `stream-json`
and Codex CLI `--json`) and extracts the final agent message plus session
identifiers when present.
- `output: "text"` treats stdout as the final response.
Input modes:
@ -200,8 +201,10 @@ Input modes:
The bundled Anthropic plugin registers a default for `claude-cli`:
- `command: "claude"`
- `args: ["-p", "--output-format", "json", "--permission-mode", "bypassPermissions"]`
- `resumeArgs: ["-p", "--output-format", "json", "--permission-mode", "bypassPermissions", "--resume", "{sessionId}"]`
- `args: ["-p", "--output-format", "stream-json", "--include-partial-messages", "--verbose", "--permission-mode", "bypassPermissions"]`
- `resumeArgs: ["-p", "--output-format", "stream-json", "--include-partial-messages", "--verbose", "--permission-mode", "bypassPermissions", "--resume", "{sessionId}"]`
- `output: "jsonl"`
- `input: "stdin"`
- `modelArg: "--model"`
- `systemPromptArg: "--append-system-prompt"`
- `sessionArg: "--session-id"`
@ -244,7 +247,8 @@ CLI backend defaults are now part of the plugin surface:
- **No OpenClaw tools** (the CLI backend never receives tool calls). Some CLIs
may still run their own agent tooling.
- **No streaming** (CLI output is collected then returned).
- **Streaming is backend-specific**. Claude CLI forwards partial text from
`stream-json`; other CLI backends may still be buffered until exit.
- **Structured outputs** depend on the CLIs JSON format.
- **Codex CLI sessions** resume via text output (no JSONL), which is less
structured than the initial `--json` run. OpenClaw sessions still work

View File

@ -21,6 +21,7 @@ export function buildAnthropicCliBackend(): CliBackendPlugin {
"-p",
"--output-format",
"stream-json",
"--include-partial-messages",
"--verbose",
"--permission-mode",
"bypassPermissions",
@ -29,6 +30,7 @@ export function buildAnthropicCliBackend(): CliBackendPlugin {
"-p",
"--output-format",
"stream-json",
"--include-partial-messages",
"--verbose",
"--permission-mode",
"bypassPermissions",
@ -36,7 +38,7 @@ export function buildAnthropicCliBackend(): CliBackendPlugin {
"{sessionId}",
],
output: "jsonl",
input: "arg",
input: "stdin",
modelArg: "--model",
modelAliases: CLAUDE_CLI_MODEL_ALIASES,
sessionArg: "--session-id",

View File

@ -32,9 +32,16 @@ beforeEach(() => {
id: "claude-cli",
config: {
command: "claude",
args: ["stream-json", "--verbose", "--permission-mode", "bypassPermissions"],
args: [
"stream-json",
"--include-partial-messages",
"--verbose",
"--permission-mode",
"bypassPermissions",
],
resumeArgs: [
"stream-json",
"--include-partial-messages",
"--verbose",
"--permission-mode",
"bypassPermissions",
@ -42,6 +49,7 @@ beforeEach(() => {
"{sessionId}",
],
output: "jsonl",
input: "stdin",
},
normalizeConfig: (config) => {
const normalizeArgs = (args: string[] | undefined) => {
@ -185,11 +193,14 @@ describe("resolveCliBackendConfig claude-cli defaults", () => {
expect(resolved).not.toBeNull();
expect(resolved?.config.output).toBe("jsonl");
expect(resolved?.config.args).toContain("stream-json");
expect(resolved?.config.args).toContain("--include-partial-messages");
expect(resolved?.config.args).toContain("--verbose");
expect(resolved?.config.args).toContain("--permission-mode");
expect(resolved?.config.args).toContain("bypassPermissions");
expect(resolved?.config.args).not.toContain("--dangerously-skip-permissions");
expect(resolved?.config.input).toBe("stdin");
expect(resolved?.config.resumeArgs).toContain("stream-json");
expect(resolved?.config.resumeArgs).toContain("--include-partial-messages");
expect(resolved?.config.resumeArgs).toContain("--verbose");
expect(resolved?.config.resumeArgs).toContain("--permission-mode");
expect(resolved?.config.resumeArgs).toContain("bypassPermissions");

View File

@ -16,6 +16,13 @@ export type CliOutput = {
usage?: CliUsage;
};
export type CliStreamingDelta = {
text: string;
delta: string;
sessionId?: string;
usage?: CliUsage;
};
function extractJsonObjectCandidates(raw: string): string[] {
const candidates: string[] = [];
let depth = 0;
@ -216,6 +223,113 @@ function parseClaudeCliJsonlResult(params: {
return null;
}
function parseClaudeCliStreamingDelta(params: {
providerId: string;
parsed: Record<string, unknown>;
textSoFar: string;
sessionId?: string;
usage?: CliUsage;
}): CliStreamingDelta | null {
if (!isClaudeCliProvider(params.providerId)) {
return null;
}
if (params.parsed.type !== "stream_event" || !isRecord(params.parsed.event)) {
return null;
}
const event = params.parsed.event;
if (event.type !== "content_block_delta" || !isRecord(event.delta)) {
return null;
}
const delta = event.delta;
if (delta.type !== "text_delta" || typeof delta.text !== "string") {
return null;
}
if (!delta.text) {
return null;
}
return {
text: `${params.textSoFar}${delta.text}`,
delta: delta.text,
sessionId: params.sessionId,
usage: params.usage,
};
}
export function createCliJsonlStreamingParser(params: {
backend: CliBackendConfig;
providerId: string;
onAssistantDelta: (delta: CliStreamingDelta) => void;
}) {
let lineBuffer = "";
let assistantText = "";
let sessionId: string | undefined;
let usage: CliUsage | undefined;
const handleParsedRecord = (parsed: Record<string, unknown>) => {
sessionId = pickCliSessionId(parsed, params.backend) ?? sessionId;
if (!sessionId && typeof parsed.thread_id === "string") {
sessionId = parsed.thread_id.trim();
}
if (isRecord(parsed.usage)) {
usage = toCliUsage(parsed.usage) ?? usage;
}
const delta = parseClaudeCliStreamingDelta({
providerId: params.providerId,
parsed,
textSoFar: assistantText,
sessionId,
usage,
});
if (!delta) {
return;
}
assistantText = delta.text;
params.onAssistantDelta(delta);
};
const flushLines = (flushPartial: boolean) => {
while (true) {
const newlineIndex = lineBuffer.indexOf("\n");
if (newlineIndex < 0) {
break;
}
const line = lineBuffer.slice(0, newlineIndex).trim();
lineBuffer = lineBuffer.slice(newlineIndex + 1);
if (!line) {
continue;
}
for (const parsed of parseJsonRecordCandidates(line)) {
handleParsedRecord(parsed);
}
}
if (!flushPartial) {
return;
}
const tail = lineBuffer.trim();
lineBuffer = "";
if (!tail) {
return;
}
for (const parsed of parseJsonRecordCandidates(tail)) {
handleParsedRecord(parsed);
}
};
return {
push(chunk: string) {
if (!chunk) {
return;
}
lineBuffer += chunk;
flushLines(false);
},
finish() {
flushLines(true);
},
};
}
export function parseCliJsonl(
raw: string,
backend: CliBackendConfig,

View File

@ -1,7 +1,8 @@
import fs from "node:fs/promises";
import path from "node:path";
import { describe, expect, it } from "vitest";
import { beforeEach, describe, expect, it } from "vitest";
import type { OpenClawConfig } from "../config/config.js";
import { onAgentEvent, resetAgentEventsForTest } from "../infra/agent-events.js";
import { resolvePreferredOpenClawTmpDir } from "../infra/tmp-openclaw-dir.js";
import {
createManagedRun,
@ -13,6 +14,10 @@ import {
supervisorSpawnMock,
} from "./cli-runner.test-support.js";
beforeEach(() => {
resetAgentEventsForTest();
});
describe("runCliAgent spawn path", () => {
it("does not inject hardcoded 'Tools are disabled' text into CLI arguments", async () => {
const runCliAgent = await setupCliRunnerTestModule();
@ -47,6 +52,40 @@ describe("runCliAgent spawn path", () => {
expect(allArgs).toContain("You are a helpful assistant.");
});
it("pipes Claude prompts over stdin instead of argv", async () => {
const runCliAgent = await setupCliRunnerTestModule();
supervisorSpawnMock.mockResolvedValueOnce(
createManagedRun({
reason: "exit",
exitCode: 0,
exitSignal: null,
durationMs: 50,
stdout: "ok",
stderr: "",
timedOut: false,
noOutputTimedOut: false,
}),
);
await runCliAgent({
sessionId: "s1",
sessionFile: "/tmp/session.jsonl",
workspaceDir: "/tmp",
prompt: "Explain this diff",
provider: "claude-cli",
model: "sonnet",
timeoutMs: 1_000,
runId: "run-stdin-claude",
});
const input = supervisorSpawnMock.mock.calls[0]?.[0] as {
argv?: string[];
input?: string;
};
expect(input.input).toContain("Explain this diff");
expect(input.argv).not.toContain("Explain this diff");
});
it("injects a strict empty MCP config for bundle-MCP-enabled Claude CLI runs", async () => {
const runCliAgent = await setupCliRunnerTestModule();
supervisorSpawnMock.mockResolvedValueOnce(
@ -142,6 +181,82 @@ describe("runCliAgent spawn path", () => {
expect(input.scopeKey).toContain("thread-123");
});
it("streams Claude text deltas from stream-json stdout", async () => {
const runCliAgent = await setupCliRunnerTestModule();
const agentEvents: Array<{ stream: string; text?: string; delta?: string }> = [];
const stop = onAgentEvent((evt) => {
agentEvents.push({
stream: evt.stream,
text: typeof evt.data.text === "string" ? evt.data.text : undefined,
delta: typeof evt.data.delta === "string" ? evt.data.delta : undefined,
});
});
supervisorSpawnMock.mockImplementationOnce(async (...args: unknown[]) => {
const input = (args[0] ?? {}) as { onStdout?: (chunk: string) => void };
input.onStdout?.(
[
JSON.stringify({ type: "init", session_id: "session-123" }),
JSON.stringify({
type: "stream_event",
event: { type: "content_block_delta", delta: { type: "text_delta", text: "Hello" } },
}),
].join("\n") + "\n",
);
input.onStdout?.(
JSON.stringify({
type: "stream_event",
event: { type: "content_block_delta", delta: { type: "text_delta", text: " world" } },
}) + "\n",
);
return createManagedRun({
reason: "exit",
exitCode: 0,
exitSignal: null,
durationMs: 50,
stdout: [
JSON.stringify({ type: "init", session_id: "session-123" }),
JSON.stringify({
type: "stream_event",
event: { type: "content_block_delta", delta: { type: "text_delta", text: "Hello" } },
}),
JSON.stringify({
type: "stream_event",
event: { type: "content_block_delta", delta: { type: "text_delta", text: " world" } },
}),
JSON.stringify({
type: "result",
session_id: "session-123",
result: "Hello world",
}),
].join("\n"),
stderr: "",
timedOut: false,
noOutputTimedOut: false,
});
});
try {
const result = await runCliAgent({
sessionId: "s1",
sessionFile: "/tmp/session.jsonl",
workspaceDir: "/tmp",
prompt: "hi",
provider: "claude-cli",
model: "sonnet",
timeoutMs: 1_000,
runId: "run-claude-stream-json",
});
expect(result.payloads?.[0]?.text).toBe("Hello world");
expect(agentEvents).toEqual([
{ stream: "assistant", text: "Hello", delta: "Hello" },
{ stream: "assistant", text: "Hello world", delta: " world" },
]);
} finally {
stop();
}
});
it("sanitizes dangerous backend env overrides before spawn", async () => {
const runCliAgent = await setupCliRunnerTestModule();
mockSuccessfulCliRun();
@ -334,7 +449,9 @@ describe("runCliAgent spawn path", () => {
const argv = input.argv ?? [];
expect(argv).not.toContain("--image");
const promptCarrier = [input.input ?? "", ...argv].join("\n");
const appendedPath = argv.find((value) => value.includes("openclaw-cli-images-"));
const appendedPath = promptCarrier
.split("\n")
.find((value) => value.includes("openclaw-cli-images-"));
expect(appendedPath).toBeDefined();
expect(appendedPath).not.toBe(sourceImage);
expect(promptCarrier).toContain(appendedPath ?? "");

View File

@ -1,4 +1,5 @@
import { shouldLogVerbose } from "../../globals.js";
import { emitAgentEvent } from "../../infra/agent-events.js";
import { isTruthyEnvValue } from "../../infra/env.js";
import { requestHeartbeatNow as requestHeartbeatNowImpl } from "../../infra/heartbeat-wake.js";
import { sanitizeHostExecEnv } from "../../infra/host-env-security.js";
@ -6,7 +7,7 @@ import { enqueueSystemEvent as enqueueSystemEventImpl } from "../../infra/system
import { getProcessSupervisor as getProcessSupervisorImpl } from "../../process/supervisor/index.js";
import { scopedHeartbeatWakeOptions } from "../../routing/session-key.js";
import { prependBootstrapPromptWarning } from "../bootstrap-budget.js";
import { parseCliOutput, type CliOutput } from "../cli-output.js";
import { createCliJsonlStreamingParser, parseCliOutput, type CliOutput } from "../cli-output.js";
import { FailoverError, resolveFailoverStatus } from "../failover-error.js";
import { classifyFailoverReason } from "../pi-embedded-helpers.js";
import {
@ -184,6 +185,23 @@ export async function executePreparedCliRun(
timeoutMs: params.timeoutMs,
useResume,
});
const streamingParser =
backend.output === "jsonl"
? createCliJsonlStreamingParser({
backend,
providerId: context.backendResolved.id,
onAssistantDelta: ({ text, delta }) => {
emitAgentEvent({
runId: params.runId,
stream: "assistant",
data: {
text,
delta,
},
});
},
})
: null;
const supervisor = executeDeps.getProcessSupervisor();
const scopeKey = buildCliSupervisorScopeKey({
backend,
@ -203,8 +221,10 @@ export async function executePreparedCliRun(
cwd: context.workspaceDir,
env,
input: stdinPayload,
onStdout: streamingParser ? (chunk: string) => streamingParser.push(chunk) : undefined,
});
const result = await managedRun.wait();
streamingParser?.finish();
const stdout = result.stdout.trim();
const stderr = result.stderr.trim();

View File

@ -24,7 +24,9 @@ const DEFAULT_MODEL = "claude-cli/claude-sonnet-4-6";
const DEFAULT_CLAUDE_ARGS = [
"-p",
"--output-format",
"json",
"stream-json",
"--include-partial-messages",
"--verbose",
"--permission-mode",
"bypassPermissions",
];