From 4e099689c075ef302028bd4ecb9b969e136e8703 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sat, 4 Apr 2026 19:32:56 +0900 Subject: [PATCH] feat: stream Claude CLI JSONL output --- CHANGELOG.md | 1 + docs/gateway/cli-backends.md | 16 ++- extensions/anthropic/cli-backend.ts | 4 +- src/agents/cli-backends.test.ts | 13 +- src/agents/cli-output.ts | 114 +++++++++++++++++ src/agents/cli-runner.spawn.test.ts | 121 ++++++++++++++++++- src/agents/cli-runner/execute.ts | 22 +++- src/gateway/gateway-cli-backend.live.test.ts | 4 +- 8 files changed, 283 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c57cfd40472..f9cf5c7ed99 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,7 @@ Docs: https://docs.openclaw.ai - Providers/request overrides: add shared model and media request transport overrides across OpenAI-, Anthropic-, Google-, and compatible provider paths, including headers, auth, proxy, and TLS controls. (#60200) - Matrix/exec approvals: add Matrix-native exec approval prompts with account-scoped approvers, channel-or-DM delivery, and room-thread aware resolution handling. (#58635) Thanks @gumadeiras. - Agents/Claude CLI: expose OpenClaw tools to background Claude CLI runs through a loopback MCP bridge that reuses gateway tool policy, honors session/account/channel scoping, and only advertises the bridge when the local runtime is actually live. (#35676) Thanks @mylukin. +- Agents/Claude CLI: switch bundled Claude CLI runs to stdin + `stream-json` partial-message streaming so prompts stop riding argv, long replies show live progress, and final session/usage metadata still land cleanly. - Prompt caching: keep prompt prefixes more reusable across transport fallback, deterministic MCP tool ordering, compaction, and embedded image history so follow-up turns hit cache more reliably. (#58036, #58037, #58038, #59054, #60603, #60691) Thanks @bcherny. - Agents/cache: diagnostics: add prompt-cache break diagnostics, trace live cache scenarios through embedded runner paths, and show cache reuse explicitly in `openclaw status --verbose`. Thanks @vincentkoc. - Agents/cache: stabilize cache-relevant system prompt fingerprints by normalizing equivalent structured prompt whitespace, line endings, hook-added system context, and runtime capability ordering so semantically unchanged prompts reuse KV/cache more reliably. Thanks @vincentkoc. diff --git a/docs/gateway/cli-backends.md b/docs/gateway/cli-backends.md index fb3b8c96e6d..bcfe537c487 100644 --- a/docs/gateway/cli-backends.md +++ b/docs/gateway/cli-backends.md @@ -13,7 +13,7 @@ OpenClaw can run **local AI CLIs** as a **text-only fallback** when API provider rate-limited, or temporarily misbehaving. This is intentionally conservative: - **Tools are disabled** (no tool calls). -- **Text in → text out** (reliable). +- **Text in → text out** (reliable, with Claude CLI partial text streaming when enabled). - **Sessions are supported** (so follow-up turns stay coherent). - **Images can be passed through** if the CLI accepts image paths. @@ -185,8 +185,9 @@ load local files from plain paths (Claude Code CLI behavior). ## Inputs / outputs - `output: "json"` (default) tries to parse JSON and extract text + session id. -- `output: "jsonl"` parses JSONL streams (Codex CLI `--json`) and extracts the - last agent message plus `thread_id` when present. +- `output: "jsonl"` parses JSONL streams (for example Claude CLI `stream-json` + and Codex CLI `--json`) and extracts the final agent message plus session + identifiers when present. - `output: "text"` treats stdout as the final response. Input modes: @@ -200,8 +201,10 @@ Input modes: The bundled Anthropic plugin registers a default for `claude-cli`: - `command: "claude"` -- `args: ["-p", "--output-format", "json", "--permission-mode", "bypassPermissions"]` -- `resumeArgs: ["-p", "--output-format", "json", "--permission-mode", "bypassPermissions", "--resume", "{sessionId}"]` +- `args: ["-p", "--output-format", "stream-json", "--include-partial-messages", "--verbose", "--permission-mode", "bypassPermissions"]` +- `resumeArgs: ["-p", "--output-format", "stream-json", "--include-partial-messages", "--verbose", "--permission-mode", "bypassPermissions", "--resume", "{sessionId}"]` +- `output: "jsonl"` +- `input: "stdin"` - `modelArg: "--model"` - `systemPromptArg: "--append-system-prompt"` - `sessionArg: "--session-id"` @@ -244,7 +247,8 @@ CLI backend defaults are now part of the plugin surface: - **No OpenClaw tools** (the CLI backend never receives tool calls). Some CLIs may still run their own agent tooling. -- **No streaming** (CLI output is collected then returned). +- **Streaming is backend-specific**. Claude CLI forwards partial text from + `stream-json`; other CLI backends may still be buffered until exit. - **Structured outputs** depend on the CLI’s JSON format. - **Codex CLI sessions** resume via text output (no JSONL), which is less structured than the initial `--json` run. OpenClaw sessions still work diff --git a/extensions/anthropic/cli-backend.ts b/extensions/anthropic/cli-backend.ts index c2bc0458418..7a1a2d8e899 100644 --- a/extensions/anthropic/cli-backend.ts +++ b/extensions/anthropic/cli-backend.ts @@ -21,6 +21,7 @@ export function buildAnthropicCliBackend(): CliBackendPlugin { "-p", "--output-format", "stream-json", + "--include-partial-messages", "--verbose", "--permission-mode", "bypassPermissions", @@ -29,6 +30,7 @@ export function buildAnthropicCliBackend(): CliBackendPlugin { "-p", "--output-format", "stream-json", + "--include-partial-messages", "--verbose", "--permission-mode", "bypassPermissions", @@ -36,7 +38,7 @@ export function buildAnthropicCliBackend(): CliBackendPlugin { "{sessionId}", ], output: "jsonl", - input: "arg", + input: "stdin", modelArg: "--model", modelAliases: CLAUDE_CLI_MODEL_ALIASES, sessionArg: "--session-id", diff --git a/src/agents/cli-backends.test.ts b/src/agents/cli-backends.test.ts index 1e47ae4b872..9498d280d36 100644 --- a/src/agents/cli-backends.test.ts +++ b/src/agents/cli-backends.test.ts @@ -32,9 +32,16 @@ beforeEach(() => { id: "claude-cli", config: { command: "claude", - args: ["stream-json", "--verbose", "--permission-mode", "bypassPermissions"], + args: [ + "stream-json", + "--include-partial-messages", + "--verbose", + "--permission-mode", + "bypassPermissions", + ], resumeArgs: [ "stream-json", + "--include-partial-messages", "--verbose", "--permission-mode", "bypassPermissions", @@ -42,6 +49,7 @@ beforeEach(() => { "{sessionId}", ], output: "jsonl", + input: "stdin", }, normalizeConfig: (config) => { const normalizeArgs = (args: string[] | undefined) => { @@ -185,11 +193,14 @@ describe("resolveCliBackendConfig claude-cli defaults", () => { expect(resolved).not.toBeNull(); expect(resolved?.config.output).toBe("jsonl"); expect(resolved?.config.args).toContain("stream-json"); + expect(resolved?.config.args).toContain("--include-partial-messages"); expect(resolved?.config.args).toContain("--verbose"); expect(resolved?.config.args).toContain("--permission-mode"); expect(resolved?.config.args).toContain("bypassPermissions"); expect(resolved?.config.args).not.toContain("--dangerously-skip-permissions"); + expect(resolved?.config.input).toBe("stdin"); expect(resolved?.config.resumeArgs).toContain("stream-json"); + expect(resolved?.config.resumeArgs).toContain("--include-partial-messages"); expect(resolved?.config.resumeArgs).toContain("--verbose"); expect(resolved?.config.resumeArgs).toContain("--permission-mode"); expect(resolved?.config.resumeArgs).toContain("bypassPermissions"); diff --git a/src/agents/cli-output.ts b/src/agents/cli-output.ts index 7dfea715d92..0df7e56e72c 100644 --- a/src/agents/cli-output.ts +++ b/src/agents/cli-output.ts @@ -16,6 +16,13 @@ export type CliOutput = { usage?: CliUsage; }; +export type CliStreamingDelta = { + text: string; + delta: string; + sessionId?: string; + usage?: CliUsage; +}; + function extractJsonObjectCandidates(raw: string): string[] { const candidates: string[] = []; let depth = 0; @@ -216,6 +223,113 @@ function parseClaudeCliJsonlResult(params: { return null; } +function parseClaudeCliStreamingDelta(params: { + providerId: string; + parsed: Record; + textSoFar: string; + sessionId?: string; + usage?: CliUsage; +}): CliStreamingDelta | null { + if (!isClaudeCliProvider(params.providerId)) { + return null; + } + if (params.parsed.type !== "stream_event" || !isRecord(params.parsed.event)) { + return null; + } + const event = params.parsed.event; + if (event.type !== "content_block_delta" || !isRecord(event.delta)) { + return null; + } + const delta = event.delta; + if (delta.type !== "text_delta" || typeof delta.text !== "string") { + return null; + } + if (!delta.text) { + return null; + } + return { + text: `${params.textSoFar}${delta.text}`, + delta: delta.text, + sessionId: params.sessionId, + usage: params.usage, + }; +} + +export function createCliJsonlStreamingParser(params: { + backend: CliBackendConfig; + providerId: string; + onAssistantDelta: (delta: CliStreamingDelta) => void; +}) { + let lineBuffer = ""; + let assistantText = ""; + let sessionId: string | undefined; + let usage: CliUsage | undefined; + + const handleParsedRecord = (parsed: Record) => { + sessionId = pickCliSessionId(parsed, params.backend) ?? sessionId; + if (!sessionId && typeof parsed.thread_id === "string") { + sessionId = parsed.thread_id.trim(); + } + if (isRecord(parsed.usage)) { + usage = toCliUsage(parsed.usage) ?? usage; + } + + const delta = parseClaudeCliStreamingDelta({ + providerId: params.providerId, + parsed, + textSoFar: assistantText, + sessionId, + usage, + }); + if (!delta) { + return; + } + assistantText = delta.text; + params.onAssistantDelta(delta); + }; + + const flushLines = (flushPartial: boolean) => { + while (true) { + const newlineIndex = lineBuffer.indexOf("\n"); + if (newlineIndex < 0) { + break; + } + const line = lineBuffer.slice(0, newlineIndex).trim(); + lineBuffer = lineBuffer.slice(newlineIndex + 1); + if (!line) { + continue; + } + for (const parsed of parseJsonRecordCandidates(line)) { + handleParsedRecord(parsed); + } + } + if (!flushPartial) { + return; + } + const tail = lineBuffer.trim(); + lineBuffer = ""; + if (!tail) { + return; + } + for (const parsed of parseJsonRecordCandidates(tail)) { + handleParsedRecord(parsed); + } + }; + + return { + push(chunk: string) { + if (!chunk) { + return; + } + lineBuffer += chunk; + flushLines(false); + }, + finish() { + flushLines(true); + }, + }; +} + export function parseCliJsonl( raw: string, backend: CliBackendConfig, diff --git a/src/agents/cli-runner.spawn.test.ts b/src/agents/cli-runner.spawn.test.ts index 23d8de56f7f..c2c753b60fa 100644 --- a/src/agents/cli-runner.spawn.test.ts +++ b/src/agents/cli-runner.spawn.test.ts @@ -1,7 +1,8 @@ import fs from "node:fs/promises"; import path from "node:path"; -import { describe, expect, it } from "vitest"; +import { beforeEach, describe, expect, it } from "vitest"; import type { OpenClawConfig } from "../config/config.js"; +import { onAgentEvent, resetAgentEventsForTest } from "../infra/agent-events.js"; import { resolvePreferredOpenClawTmpDir } from "../infra/tmp-openclaw-dir.js"; import { createManagedRun, @@ -13,6 +14,10 @@ import { supervisorSpawnMock, } from "./cli-runner.test-support.js"; +beforeEach(() => { + resetAgentEventsForTest(); +}); + describe("runCliAgent spawn path", () => { it("does not inject hardcoded 'Tools are disabled' text into CLI arguments", async () => { const runCliAgent = await setupCliRunnerTestModule(); @@ -47,6 +52,40 @@ describe("runCliAgent spawn path", () => { expect(allArgs).toContain("You are a helpful assistant."); }); + it("pipes Claude prompts over stdin instead of argv", async () => { + const runCliAgent = await setupCliRunnerTestModule(); + supervisorSpawnMock.mockResolvedValueOnce( + createManagedRun({ + reason: "exit", + exitCode: 0, + exitSignal: null, + durationMs: 50, + stdout: "ok", + stderr: "", + timedOut: false, + noOutputTimedOut: false, + }), + ); + + await runCliAgent({ + sessionId: "s1", + sessionFile: "/tmp/session.jsonl", + workspaceDir: "/tmp", + prompt: "Explain this diff", + provider: "claude-cli", + model: "sonnet", + timeoutMs: 1_000, + runId: "run-stdin-claude", + }); + + const input = supervisorSpawnMock.mock.calls[0]?.[0] as { + argv?: string[]; + input?: string; + }; + expect(input.input).toContain("Explain this diff"); + expect(input.argv).not.toContain("Explain this diff"); + }); + it("injects a strict empty MCP config for bundle-MCP-enabled Claude CLI runs", async () => { const runCliAgent = await setupCliRunnerTestModule(); supervisorSpawnMock.mockResolvedValueOnce( @@ -142,6 +181,82 @@ describe("runCliAgent spawn path", () => { expect(input.scopeKey).toContain("thread-123"); }); + it("streams Claude text deltas from stream-json stdout", async () => { + const runCliAgent = await setupCliRunnerTestModule(); + const agentEvents: Array<{ stream: string; text?: string; delta?: string }> = []; + const stop = onAgentEvent((evt) => { + agentEvents.push({ + stream: evt.stream, + text: typeof evt.data.text === "string" ? evt.data.text : undefined, + delta: typeof evt.data.delta === "string" ? evt.data.delta : undefined, + }); + }); + supervisorSpawnMock.mockImplementationOnce(async (...args: unknown[]) => { + const input = (args[0] ?? {}) as { onStdout?: (chunk: string) => void }; + input.onStdout?.( + [ + JSON.stringify({ type: "init", session_id: "session-123" }), + JSON.stringify({ + type: "stream_event", + event: { type: "content_block_delta", delta: { type: "text_delta", text: "Hello" } }, + }), + ].join("\n") + "\n", + ); + input.onStdout?.( + JSON.stringify({ + type: "stream_event", + event: { type: "content_block_delta", delta: { type: "text_delta", text: " world" } }, + }) + "\n", + ); + return createManagedRun({ + reason: "exit", + exitCode: 0, + exitSignal: null, + durationMs: 50, + stdout: [ + JSON.stringify({ type: "init", session_id: "session-123" }), + JSON.stringify({ + type: "stream_event", + event: { type: "content_block_delta", delta: { type: "text_delta", text: "Hello" } }, + }), + JSON.stringify({ + type: "stream_event", + event: { type: "content_block_delta", delta: { type: "text_delta", text: " world" } }, + }), + JSON.stringify({ + type: "result", + session_id: "session-123", + result: "Hello world", + }), + ].join("\n"), + stderr: "", + timedOut: false, + noOutputTimedOut: false, + }); + }); + + try { + const result = await runCliAgent({ + sessionId: "s1", + sessionFile: "/tmp/session.jsonl", + workspaceDir: "/tmp", + prompt: "hi", + provider: "claude-cli", + model: "sonnet", + timeoutMs: 1_000, + runId: "run-claude-stream-json", + }); + + expect(result.payloads?.[0]?.text).toBe("Hello world"); + expect(agentEvents).toEqual([ + { stream: "assistant", text: "Hello", delta: "Hello" }, + { stream: "assistant", text: "Hello world", delta: " world" }, + ]); + } finally { + stop(); + } + }); + it("sanitizes dangerous backend env overrides before spawn", async () => { const runCliAgent = await setupCliRunnerTestModule(); mockSuccessfulCliRun(); @@ -334,7 +449,9 @@ describe("runCliAgent spawn path", () => { const argv = input.argv ?? []; expect(argv).not.toContain("--image"); const promptCarrier = [input.input ?? "", ...argv].join("\n"); - const appendedPath = argv.find((value) => value.includes("openclaw-cli-images-")); + const appendedPath = promptCarrier + .split("\n") + .find((value) => value.includes("openclaw-cli-images-")); expect(appendedPath).toBeDefined(); expect(appendedPath).not.toBe(sourceImage); expect(promptCarrier).toContain(appendedPath ?? ""); diff --git a/src/agents/cli-runner/execute.ts b/src/agents/cli-runner/execute.ts index 34c2618a848..eed71f000c9 100644 --- a/src/agents/cli-runner/execute.ts +++ b/src/agents/cli-runner/execute.ts @@ -1,4 +1,5 @@ import { shouldLogVerbose } from "../../globals.js"; +import { emitAgentEvent } from "../../infra/agent-events.js"; import { isTruthyEnvValue } from "../../infra/env.js"; import { requestHeartbeatNow as requestHeartbeatNowImpl } from "../../infra/heartbeat-wake.js"; import { sanitizeHostExecEnv } from "../../infra/host-env-security.js"; @@ -6,7 +7,7 @@ import { enqueueSystemEvent as enqueueSystemEventImpl } from "../../infra/system import { getProcessSupervisor as getProcessSupervisorImpl } from "../../process/supervisor/index.js"; import { scopedHeartbeatWakeOptions } from "../../routing/session-key.js"; import { prependBootstrapPromptWarning } from "../bootstrap-budget.js"; -import { parseCliOutput, type CliOutput } from "../cli-output.js"; +import { createCliJsonlStreamingParser, parseCliOutput, type CliOutput } from "../cli-output.js"; import { FailoverError, resolveFailoverStatus } from "../failover-error.js"; import { classifyFailoverReason } from "../pi-embedded-helpers.js"; import { @@ -184,6 +185,23 @@ export async function executePreparedCliRun( timeoutMs: params.timeoutMs, useResume, }); + const streamingParser = + backend.output === "jsonl" + ? createCliJsonlStreamingParser({ + backend, + providerId: context.backendResolved.id, + onAssistantDelta: ({ text, delta }) => { + emitAgentEvent({ + runId: params.runId, + stream: "assistant", + data: { + text, + delta, + }, + }); + }, + }) + : null; const supervisor = executeDeps.getProcessSupervisor(); const scopeKey = buildCliSupervisorScopeKey({ backend, @@ -203,8 +221,10 @@ export async function executePreparedCliRun( cwd: context.workspaceDir, env, input: stdinPayload, + onStdout: streamingParser ? (chunk: string) => streamingParser.push(chunk) : undefined, }); const result = await managedRun.wait(); + streamingParser?.finish(); const stdout = result.stdout.trim(); const stderr = result.stderr.trim(); diff --git a/src/gateway/gateway-cli-backend.live.test.ts b/src/gateway/gateway-cli-backend.live.test.ts index ce5fcf0c1f9..0e7ee558423 100644 --- a/src/gateway/gateway-cli-backend.live.test.ts +++ b/src/gateway/gateway-cli-backend.live.test.ts @@ -24,7 +24,9 @@ const DEFAULT_MODEL = "claude-cli/claude-sonnet-4-6"; const DEFAULT_CLAUDE_ARGS = [ "-p", "--output-format", - "json", + "stream-json", + "--include-partial-messages", + "--verbose", "--permission-mode", "bypassPermissions", ];