diff --git a/CHANGELOG.md b/CHANGELOG.md index 4743079ed95..328e99458db 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,7 @@ Docs: https://docs.openclaw.ai - Tests/runtime: trim local unit-test import/runtime fan-out across browser, WhatsApp, cron, task, and reply flows so owner suites start faster with lower shared-worker overhead while preserving the same focused behavior coverage. (#60249) Thanks @shakkernerd. - Tests/secrets runtime: restore split secrets suite cache and env isolation cleanup so broader runs do not leak stale plugin or provider snapshot state. (#60395) Thanks @shakkernerd. - Memory/dreaming (experimental): add opt-in weighted short-term recall promotion to `MEMORY.md`, managed dreaming modes (`off|core|rem|deep`), and a `/dreaming` command plus Dreams UI so durable memory promotion can run on background cadence without manual scheduling. (#60569) Thanks @vignesh07. +- Agents/system prompts: add an internal cache-prefix boundary across Anthropic-family, OpenAI-family, Google, and CLI transport shaping so stable system-prompt prefixes stay reusable without leaking internal cache markers to provider payloads. 
(#59054) ### Fixes diff --git a/src/agents/anthropic-payload-policy.test.ts b/src/agents/anthropic-payload-policy.test.ts index 5e7d3dd1afc..bd3032e8ddd 100644 --- a/src/agents/anthropic-payload-policy.test.ts +++ b/src/agents/anthropic-payload-policy.test.ts @@ -3,6 +3,7 @@ import { applyAnthropicPayloadPolicyToParams, resolveAnthropicPayloadPolicy, } from "./anthropic-payload-policy.js"; +import { SYSTEM_PROMPT_CACHE_BOUNDARY } from "./system-prompt-cache-boundary.js"; type TestPayload = { messages: Array<{ role: string; content: unknown }>; @@ -102,4 +103,65 @@ describe("anthropic payload policy", () => { content: [{ type: "text", text: "Hello", cache_control: { type: "ephemeral" } }], }); }); + + it("splits cached stable system content from uncached dynamic content", () => { + const policy = resolveAnthropicPayloadPolicy({ + provider: "anthropic", + api: "anthropic-messages", + baseUrl: "https://api.anthropic.com/v1", + cacheRetention: "long", + enableCacheControl: true, + }); + const payload: TestPayload = { + system: [ + { + type: "text", + text: `Stable prefix${SYSTEM_PROMPT_CACHE_BOUNDARY}Dynamic lab suffix`, + }, + ], + messages: [{ role: "user", content: "Hello" }], + }; + + applyAnthropicPayloadPolicyToParams(payload, policy); + + expect(payload.system).toEqual([ + { + type: "text", + text: "Stable prefix", + cache_control: { type: "ephemeral", ttl: "1h" }, + }, + { + type: "text", + text: "Dynamic lab suffix", + }, + ]); + }); + + it("strips the boundary even when cache retention is disabled", () => { + const policy = resolveAnthropicPayloadPolicy({ + provider: "anthropic", + api: "anthropic-messages", + baseUrl: "https://api.anthropic.com/v1", + cacheRetention: "none", + enableCacheControl: true, + }); + const payload: TestPayload = { + system: [ + { + type: "text", + text: `Stable prefix${SYSTEM_PROMPT_CACHE_BOUNDARY}Dynamic lab suffix`, + }, + ], + messages: [{ role: "user", content: "Hello" }], + }; + + applyAnthropicPayloadPolicyToParams(payload, 
policy); + + expect(payload.system).toEqual([ + { + type: "text", + text: "Stable prefix\nDynamic lab suffix", + }, + ]); + }); }); diff --git a/src/agents/anthropic-payload-policy.ts b/src/agents/anthropic-payload-policy.ts index 50887824938..838afa96801 100644 --- a/src/agents/anthropic-payload-policy.ts +++ b/src/agents/anthropic-payload-policy.ts @@ -1,4 +1,8 @@ import { resolveProviderRequestCapabilities } from "./provider-attribution.js"; +import { + splitSystemPromptCacheBoundary, + stripSystemPromptCacheBoundary, +} from "./system-prompt-cache-boundary.js"; export type AnthropicServiceTier = "auto" | "standard_only"; @@ -46,13 +50,57 @@ function applyAnthropicCacheControlToSystem( return; } + const normalizedBlocks: Array<unknown> = []; + for (const block of system) { + if (!block || typeof block !== "object") { + normalizedBlocks.push(block); + continue; + } + const record = block as Record<string, unknown>; + if (record.type !== "text" || typeof record.text !== "string") { + normalizedBlocks.push(block); + continue; + } + const split = splitSystemPromptCacheBoundary(record.text); + if (!split) { + if (record.cache_control === undefined) { + record.cache_control = cacheControl; + } + normalizedBlocks.push(record); + continue; + } + + const { cache_control: existingCacheControl, ...rest } = record; + if (split.stablePrefix) { + normalizedBlocks.push({ + ...rest, + text: split.stablePrefix, + cache_control: existingCacheControl ??
cacheControl, + }); + } + if (split.dynamicSuffix) { + normalizedBlocks.push({ + ...rest, + text: split.dynamicSuffix, + }); + } + } + + system.splice(0, system.length, ...normalizedBlocks); +} + +function stripAnthropicSystemPromptBoundary(system: unknown): void { + if (!Array.isArray(system)) { + return; + } + for (const block of system) { if (!block || typeof block !== "object") { continue; } const record = block as Record<string, unknown>; - if (record.type === "text" && record.cache_control === undefined) { - record.cache_control = cacheControl; + if (record.type === "text" && typeof record.text === "string") { + record.text = stripSystemPromptCacheBoundary(record.text); + } } } @@ -136,11 +184,16 @@ export function applyAnthropicPayloadPolicyToParams( payloadObj.service_tier = policy.serviceTier; } + if (policy.cacheControl) { + applyAnthropicCacheControlToSystem(payloadObj.system, policy.cacheControl); + } else { + stripAnthropicSystemPromptBoundary(payloadObj.system); + } + if (!policy.cacheControl) { return; } - applyAnthropicCacheControlToSystem(payloadObj.system, policy.cacheControl); // Preserve Anthropic cache-write scope by only tagging the trailing user turn.
applyAnthropicCacheControlToMessages(payloadObj.messages, policy.cacheControl); } diff --git a/src/agents/cli-runner.helpers.test.ts b/src/agents/cli-runner.helpers.test.ts index 95659c178bf..8cbe04fd246 100644 --- a/src/agents/cli-runner.helpers.test.ts +++ b/src/agents/cli-runner.helpers.test.ts @@ -4,6 +4,7 @@ import { MAX_IMAGE_BYTES } from "../media/constants.js"; import { buildCliArgs, loadPromptRefImages } from "./cli-runner/helpers.js"; import * as promptImageUtils from "./pi-embedded-runner/run/images.js"; import type { SandboxFsBridge } from "./sandbox/fs-bridge.js"; +import { SYSTEM_PROMPT_CACHE_BOUNDARY } from "./system-prompt-cache-boundary.js"; import * as toolImages from "./tool-images.js"; describe("loadPromptRefImages", () => { @@ -117,4 +118,19 @@ describe("buildCliArgs", () => { }), ).toEqual(["exec", "resume", "thread-123", "--model", "gpt-5.4"]); }); + + it("strips the internal cache boundary from CLI system prompt args", () => { + expect( + buildCliArgs({ + backend: { + command: "claude", + systemPromptArg: "--append-system-prompt", + }, + baseArgs: ["-p"], + modelId: "claude-sonnet-4-6", + systemPrompt: `Stable prefix${SYSTEM_PROMPT_CACHE_BOUNDARY}Dynamic suffix`, + useResume: false, + }), + ).toEqual(["-p", "--append-system-prompt", "Stable prefix\nDynamic suffix"]); + }); }); diff --git a/src/agents/cli-runner/helpers.ts b/src/agents/cli-runner/helpers.ts index 296fd9fb1ad..bc709681fb6 100644 --- a/src/agents/cli-runner/helpers.ts +++ b/src/agents/cli-runner/helpers.ts @@ -17,6 +17,7 @@ import type { EmbeddedContextFile } from "../pi-embedded-helpers.js"; import { detectImageReferences, loadImageFromRef } from "../pi-embedded-runner/run/images.js"; import type { SandboxFsBridge } from "../sandbox/fs-bridge.js"; import { detectRuntimeShell } from "../shell-utils.js"; +import { stripSystemPromptCacheBoundary } from "../system-prompt-cache-boundary.js"; import { buildSystemPromptParams } from "../system-prompt-params.js"; import { 
buildAgentSystemPrompt } from "../system-prompt.js"; import { sanitizeImageBlocks } from "../tool-images.js"; @@ -253,7 +254,7 @@ export function buildCliArgs(params: { args.push(params.backend.modelArg, params.modelId); } if (!params.useResume && params.systemPrompt && params.backend.systemPromptArg) { - args.push(params.backend.systemPromptArg, params.systemPrompt); + args.push(params.backend.systemPromptArg, stripSystemPromptCacheBoundary(params.systemPrompt)); } if (!params.useResume && params.sessionId) { if (params.backend.sessionArgs && params.backend.sessionArgs.length > 0) { diff --git a/src/agents/google-transport-stream.ts b/src/agents/google-transport-stream.ts index b319db45e84..20ed9712e77 100644 --- a/src/agents/google-transport-stream.ts +++ b/src/agents/google-transport-stream.ts @@ -10,6 +10,7 @@ import { import { parseGeminiAuth } from "../infra/gemini-auth.js"; import { normalizeGoogleApiBaseUrl } from "../infra/google-api-base-url.js"; import { buildGuardedModelFetch } from "./provider-transport-fetch.js"; +import { stripSystemPromptCacheBoundary } from "./system-prompt-cache-boundary.js"; import { transformTransportMessages } from "./transport-message-transform.js"; import { createEmptyTransportUsage, @@ -445,7 +446,11 @@ export function buildGoogleGenerativeAiParams( } if (context.systemPrompt) { params.systemInstruction = { - parts: [{ text: sanitizeTransportPayloadText(context.systemPrompt) }], + parts: [ + { + text: sanitizeTransportPayloadText(stripSystemPromptCacheBoundary(context.systemPrompt)), + }, + ], }; } if (context.tools?.length) { diff --git a/src/agents/openai-transport-stream.test.ts b/src/agents/openai-transport-stream.test.ts index 39be59ae336..804a27934ed 100644 --- a/src/agents/openai-transport-stream.test.ts +++ b/src/agents/openai-transport-stream.test.ts @@ -10,10 +10,12 @@ import { import { attachModelProviderRequestTransport } from "./provider-request-config.js"; import { buildTransportAwareSimpleStreamFn, + 
createBoundaryAwareStreamFnForModel, isTransportAwareApiSupported, prepareTransportAwareSimpleModel, resolveTransportAwareSimpleApi, } from "./provider-transport-stream.js"; +import { SYSTEM_PROMPT_CACHE_BOUNDARY } from "./system-prompt-cache-boundary.js"; describe("openai transport stream", () => { it("reports the supported transport-aware APIs", () => { @@ -24,6 +26,51 @@ describe("openai transport stream", () => { expect(isTransportAwareApiSupported("google-generative-ai")).toBe(true); }); + it("builds boundary-aware stream shapers for supported default agent transports", () => { + expect( + createBoundaryAwareStreamFnForModel({ + id: "gpt-5.4", + name: "GPT-5.4", + api: "openai-responses", + provider: "openai", + baseUrl: "https://api.openai.com/v1", + reasoning: true, + input: ["text"], + cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 }, + contextWindow: 200000, + maxTokens: 8192, + } satisfies Model<"openai-responses">), + ).toBeTypeOf("function"); + expect( + createBoundaryAwareStreamFnForModel({ + id: "claude-sonnet-4-6", + name: "Claude Sonnet 4.6", + api: "anthropic-messages", + provider: "anthropic", + baseUrl: "https://api.anthropic.com", + reasoning: true, + input: ["text"], + cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 }, + contextWindow: 200000, + maxTokens: 8192, + } satisfies Model<"anthropic-messages">), + ).toBeTypeOf("function"); + expect( + createBoundaryAwareStreamFnForModel({ + id: "gemini-3.1-pro-preview", + name: "Gemini 3.1 Pro Preview", + api: "google-generative-ai", + provider: "google", + baseUrl: "https://generativelanguage.googleapis.com/v1beta", + reasoning: true, + input: ["text"], + cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 }, + contextWindow: 200000, + maxTokens: 8192, + } satisfies Model<"google-generative-ai">), + ).toBeTypeOf("function"); + }); + it("prepares a custom simple-completion api alias when transport overrides are attached", () => { const model = 
attachModelProviderRequestTransport( { @@ -439,6 +486,31 @@ describe("openai transport stream", () => { expect(params.input?.[0]).toMatchObject({ role: "developer" }); }); + it("strips the internal cache boundary from OpenAI system prompts", () => { + const params = buildOpenAIResponsesParams( + { + id: "gpt-5.4", + name: "GPT-5.4", + api: "openai-responses", + provider: "openai", + baseUrl: "https://api.openai.com/v1", + reasoning: true, + input: ["text"], + cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 }, + contextWindow: 200000, + maxTokens: 8192, + } satisfies Model<"openai-responses">, + { + systemPrompt: `Stable prefix${SYSTEM_PROMPT_CACHE_BOUNDARY}Dynamic suffix`, + messages: [], + tools: [], + } as never, + undefined, + ) as { input?: Array<{ content?: string }> }; + + expect(params.input?.[0]?.content).toBe("Stable prefix\nDynamic suffix"); + }); + it("defaults responses tool schemas to strict on native OpenAI routes", () => { const params = buildOpenAIResponsesParams( { @@ -689,6 +761,31 @@ describe("openai transport stream", () => { expect(params.messages?.[0]).toMatchObject({ role: "system" }); }); + it("strips the internal cache boundary from OpenAI completions system prompts", () => { + const params = buildOpenAICompletionsParams( + { + id: "gpt-4.1", + name: "GPT-4.1", + api: "openai-completions", + provider: "openai", + baseUrl: "https://api.openai.com/v1", + reasoning: false, + input: ["text"], + cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 }, + contextWindow: 200000, + maxTokens: 8192, + } satisfies Model<"openai-completions">, + { + systemPrompt: `Stable prefix${SYSTEM_PROMPT_CACHE_BOUNDARY}Dynamic suffix`, + messages: [], + tools: [], + } as never, + undefined, + ) as { messages?: Array<{ content?: string }> }; + + expect(params.messages?.[0]?.content).toBe("Stable prefix\nDynamic suffix"); + }); + it("uses system role and streaming usage compat for native ModelStudio completions providers", () => { const params = 
buildOpenAICompletionsParams( { diff --git a/src/agents/openai-transport-stream.ts b/src/agents/openai-transport-stream.ts index 55eb53ef7f6..cd7f3fda733 100644 --- a/src/agents/openai-transport-stream.ts +++ b/src/agents/openai-transport-stream.ts @@ -29,6 +29,7 @@ import { } from "./openai-responses-payload-policy.js"; import { resolveProviderRequestCapabilities } from "./provider-attribution.js"; import { buildGuardedModelFetch } from "./provider-transport-fetch.js"; +import { stripSystemPromptCacheBoundary } from "./system-prompt-cache-boundary.js"; import { transformTransportMessages } from "./transport-message-transform.js"; import { mergeTransportMetadata, sanitizeTransportPayloadText } from "./transport-stream-shared.js"; @@ -225,7 +226,7 @@ function convertResponsesMessages( if (includeSystemPrompt && context.systemPrompt) { messages.push({ role: model.reasoning && options?.supportsDeveloperRole !== false ? "developer" : "system", - content: sanitizeTransportPayloadText(context.systemPrompt), + content: sanitizeTransportPayloadText(stripSystemPromptCacheBoundary(context.systemPrompt)), }); } let msgIndex = 0; @@ -1294,9 +1295,15 @@ export function buildOpenAICompletionsParams( options: OpenAICompletionsOptions | undefined, ) { const compat = getCompat(model); + const completionsContext = context.systemPrompt + ? 
{ + ...context, + systemPrompt: stripSystemPromptCacheBoundary(context.systemPrompt), + } + : context; const params: Record<string, unknown> = { model: model.id, - messages: convertMessages(model as never, context, compat as never), + messages: convertMessages(model as never, completionsContext, compat as never), stream: true, }; if (compat.supportsUsageInStreaming) { diff --git a/src/agents/openai-ws-request.ts b/src/agents/openai-ws-request.ts index f0772239ed95..d457953357f 100644 --- a/src/agents/openai-ws-request.ts +++ b/src/agents/openai-ws-request.ts @@ -7,6 +7,7 @@ import type { } from "./openai-ws-connection.js"; import { resolveOpenAITextVerbosity } from "./pi-embedded-runner/openai-stream-wrappers.js"; import { resolveProviderRequestPolicyConfig } from "./provider-request-config.js"; +import { stripSystemPromptCacheBoundary } from "./system-prompt-cache-boundary.js"; type WsModel = Parameters[0]; type WsContext = Parameters[1]; @@ -106,7 +107,9 @@ export function buildOpenAIWebSocketResponseCreatePayload(params: { model: params.model.id, ...(supportsResponsesStoreField ? { store: false } : {}), input: params.turnInput.inputItems, - instructions: params.context.systemPrompt ?? undefined, + instructions: params.context.systemPrompt + ? stripSystemPromptCacheBoundary(params.context.systemPrompt) + : undefined, tools: params.tools.length > 0 ? params.tools : undefined, ...(params.turnInput.previousResponseId ?
{ previous_response_id: params.turnInput.previousResponseId } diff --git a/src/agents/openai-ws-stream.test.ts b/src/agents/openai-ws-stream.test.ts index 5740dbd0cbb..00f6b3484df 100644 --- a/src/agents/openai-ws-stream.test.ts +++ b/src/agents/openai-ws-stream.test.ts @@ -22,6 +22,7 @@ import { releaseWsSession, } from "./openai-ws-stream.js"; import { log } from "./pi-embedded-runner/logger.js"; +import { SYSTEM_PROMPT_CACHE_BOUNDARY } from "./system-prompt-cache-boundary.js"; // ───────────────────────────────────────────────────────────────────────────── // Mock OpenAIWebSocketManager @@ -1853,6 +1854,36 @@ describe("createOpenAIWebSocketStreamFn", () => { expect((sent.tools ?? []).length).toBeGreaterThan(0); }); + it("strips the internal cache boundary from websocket instructions", async () => { + const streamFn = createOpenAIWebSocketStreamFn("sk-test", "sess-boundary"); + const ctx = { + systemPrompt: `Stable prefix${SYSTEM_PROMPT_CACHE_BOUNDARY}Dynamic suffix`, + messages: [userMsg("Hello")] as Parameters[0], + tools: [], + }; + + const stream = streamFn( + modelStub as Parameters[0], + ctx as Parameters[1], + ); + + await new Promise((r) => setImmediate(r)); + const manager = MockManager.lastInstance!; + manager.simulateEvent({ + type: "response.completed", + response: makeResponseObject("resp_boundary", "ok"), + }); + + for await (const _ of await resolveStream(stream)) { + // consume + } + + const sent = manager.sentEvents[0] as { + instructions?: string; + }; + expect(sent.instructions).toBe("Stable prefix\nDynamic suffix"); + }); + it("falls back to HTTP after the websocket send retry budget is exhausted", async () => { const sessionId = "sess-send-fail-reset"; const streamFn = createOpenAIWebSocketStreamFn("sk-test", sessionId); diff --git a/src/agents/openai-ws-stream.ts b/src/agents/openai-ws-stream.ts index b247f44151c..e765b0e78b0 100644 --- a/src/agents/openai-ws-stream.ts +++ b/src/agents/openai-ws-stream.ts @@ -54,6 +54,7 @@ import { 
buildStreamErrorAssistantMessage, } from "./stream-message-shared.js"; import { mergeTransportMetadata } from "./transport-stream-shared.js"; +import { stripSystemPromptCacheBoundary } from "./system-prompt-cache-boundary.js"; // ───────────────────────────────────────────────────────────────────────────── // Per-session state @@ -590,7 +591,9 @@ export function createOpenAIWebSocketStreamFn( manager: session.manager, modelId: model.id, tools: convertTools(context.tools), - instructions: context.systemPrompt ?? undefined, + instructions: context.systemPrompt + ? stripSystemPromptCacheBoundary(context.systemPrompt) + : undefined, metadata: resolveProviderTransportTurnState(model, { sessionId, turnId, diff --git a/src/agents/pi-embedded-runner/run/attempt.prompt-helpers.ts b/src/agents/pi-embedded-runner/run/attempt.prompt-helpers.ts index 7748d09f432..d58788d7bb9 100644 --- a/src/agents/pi-embedded-runner/run/attempt.prompt-helpers.ts +++ b/src/agents/pi-embedded-runner/run/attempt.prompt-helpers.ts @@ -6,6 +6,7 @@ import type { } from "../../../plugins/types.js"; import { isCronSessionKey, isSubagentSessionKey } from "../../../routing/session-key.js"; import { joinPresentTextSegments } from "../../../shared/text/join-segments.js"; +import { prependSystemPromptAdditionAfterCacheBoundary } from "../../system-prompt-cache-boundary.js"; import { resolveEffectiveToolFsWorkspaceOnly } from "../../tool-fs-policy.js"; import type { CompactEmbeddedPiSessionParams } from "../compact.js"; import { buildEmbeddedCompactionRuntimeContext } from "../compaction-runtime-context.js"; @@ -109,10 +110,7 @@ export function prependSystemPromptAddition(params: { systemPrompt: string; systemPromptAddition?: string; }): string { - if (!params.systemPromptAddition) { - return params.systemPrompt; - } - return `${params.systemPromptAddition}\n\n${params.systemPrompt}`; + return prependSystemPromptAdditionAfterCacheBoundary(params); } /** Build runtime context passed into context-engine 
afterTurn hooks. */ diff --git a/src/agents/pi-embedded-runner/run/attempt.test.ts b/src/agents/pi-embedded-runner/run/attempt.test.ts index 267ecad3825..d9262b54d8f 100644 --- a/src/agents/pi-embedded-runner/run/attempt.test.ts +++ b/src/agents/pi-embedded-runner/run/attempt.test.ts @@ -1,3 +1,4 @@ +import { streamSimple } from "@mariozechner/pi-ai"; import { describe, expect, it, vi } from "vitest"; import type { OpenClawConfig } from "../../../config/config.js"; import { @@ -7,6 +8,7 @@ import { wrapOllamaCompatNumCtx, } from "../../../plugin-sdk/ollama.js"; import { appendBootstrapPromptWarning } from "../../bootstrap-budget.js"; +import { SYSTEM_PROMPT_CACHE_BOUNDARY } from "../../system-prompt-cache-boundary.js"; import { buildAgentSystemPrompt } from "../../system-prompt.js"; import { buildAfterTurnRuntimeContext, @@ -247,6 +249,65 @@ describe("resolveEmbeddedAgentStreamFn", () => { }); expect(providerStreamFn).toHaveBeenCalledTimes(1); }); + + it("strips the internal cache boundary before provider-owned stream calls", async () => { + const providerStreamFn = vi.fn(async (_model, context) => context); + const streamFn = resolveEmbeddedAgentStreamFn({ + currentStreamFn: undefined, + providerStreamFn, + shouldUseWebSocketTransport: false, + sessionId: "session-1", + model: { + api: "openai-completions", + provider: "demo-provider", + id: "demo-model", + } as never, + }); + + await expect( + streamFn( + { provider: "demo-provider", id: "demo-model" } as never, + { + systemPrompt: `Stable prefix${SYSTEM_PROMPT_CACHE_BOUNDARY}Dynamic suffix`, + } as never, + {}, + ), + ).resolves.toMatchObject({ + systemPrompt: "Stable prefix\nDynamic suffix", + }); + expect(providerStreamFn).toHaveBeenCalledTimes(1); + }); + + it("routes supported default streamSimple fallbacks through boundary-aware transports", () => { + const streamFn = resolveEmbeddedAgentStreamFn({ + currentStreamFn: undefined, + shouldUseWebSocketTransport: false, + sessionId: "session-1", + model: { + 
api: "openai-responses", + provider: "openai", + id: "gpt-5.4", + } as never, + }); + + expect(streamFn).not.toBe(streamSimple); + }); + + it("keeps explicit custom currentStreamFn values unchanged", () => { + const currentStreamFn = vi.fn(); + const streamFn = resolveEmbeddedAgentStreamFn({ + currentStreamFn: currentStreamFn as never, + shouldUseWebSocketTransport: false, + sessionId: "session-1", + model: { + api: "openai-responses", + provider: "openai", + id: "gpt-5.4", + } as never, + }); + + expect(streamFn).toBe(currentStreamFn); + }); }); describe("resolveAttemptFsWorkspaceOnly", () => { diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index d8cdda1c27f..601c6d48cd0 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -75,6 +75,7 @@ import { applyPiAutoCompactionGuard } from "../../pi-settings.js"; import { toClientToolDefinitions } from "../../pi-tool-definition-adapter.js"; import { createOpenClawCodingTools, resolveToolLoopDetectionConfig } from "../../pi-tools.js"; import { registerProviderStreamForModel } from "../../provider-stream.js"; +import { createBoundaryAwareStreamFnForModel } from "../../provider-transport-stream.js"; import { resolveSandboxContext } from "../../sandbox.js"; import { resolveSandboxRuntimeStatus } from "../../sandbox/runtime-status.js"; import { repairSessionFileIfNeeded } from "../../session-file-repair.js"; @@ -90,6 +91,7 @@ import { applySkillEnvOverridesFromSnapshot, resolveSkillsPromptForRun, } from "../../skills.js"; +import { stripSystemPromptCacheBoundary } from "../../system-prompt-cache-boundary.js"; import { buildSystemPromptParams } from "../../system-prompt-params.js"; import { buildSystemPromptReport } from "../../system-prompt-report.js"; import { sanitizeToolCallIdsForCloudCodeAssist } from "../../tool-call-id.js"; @@ -227,6 +229,13 @@ export function resolveEmbeddedAgentStreamFn(params: { }): 
StreamFn { if (params.providerStreamFn) { const inner = params.providerStreamFn; + const normalizeContext = (context: Parameters<StreamFn>[1]) => + context.systemPrompt + ? { + ...context, + systemPrompt: stripSystemPromptCacheBoundary(context.systemPrompt), + } + : context; // Provider-owned transports bypass pi-coding-agent's default auth lookup, // so keep injecting the resolved runtime apiKey for streamSimple-compatible // transports that still read credentials from options.apiKey. @@ -234,10 +243,13 @@ const { authStorage, model } = params; return async (m, context, options) => { const apiKey = await authStorage.getApiKey(model.provider); - return inner(m, context, { ...options, apiKey: apiKey ?? options?.apiKey }); + return inner(m, normalizeContext(context), { + ...options, + apiKey: apiKey ?? options?.apiKey, + }); }; } - return inner; + return (m, context, options) => inner(m, normalizeContext(context), options); } const currentStreamFn = params.currentStreamFn ??
streamSimple; @@ -253,6 +265,13 @@ export function resolveEmbeddedAgentStreamFn(params: { return createAnthropicVertexStreamFnForModel(params.model); } + if (params.currentStreamFn === undefined || params.currentStreamFn === streamSimple) { + const boundaryAwareStreamFn = createBoundaryAwareStreamFnForModel(params.model); + if (boundaryAwareStreamFn) { + return boundaryAwareStreamFn; + } + } + return currentStreamFn; } diff --git a/src/agents/provider-transport-stream.ts b/src/agents/provider-transport-stream.ts index a30d46f560a..007511ac801 100644 --- a/src/agents/provider-transport-stream.ts +++ b/src/agents/provider-transport-stream.ts @@ -25,6 +25,23 @@ const SIMPLE_TRANSPORT_API_ALIAS: Record = { "google-generative-ai": "openclaw-google-generative-ai-transport", }; +function createSupportedTransportStreamFn(api: Api): StreamFn | undefined { + switch (api) { + case "openai-responses": + return createOpenAIResponsesTransportStreamFn(); + case "openai-completions": + return createOpenAICompletionsTransportStreamFn(); + case "azure-openai-responses": + return createAzureOpenAIResponsesTransportStreamFn(); + case "anthropic-messages": + return createAnthropicMessagesTransportStreamFn(); + case "google-generative-ai": + return createGoogleGenerativeAiTransportStreamFn(); + default: + return undefined; + } +} + function hasTransportOverrides(model: Model): boolean { const request = getModelProviderRequestTransport(model); return Boolean(request?.proxy || request?.tls); @@ -47,20 +64,14 @@ export function createTransportAwareStreamFnForModel(model: Model): StreamF `Model-provider request.proxy/request.tls is not yet supported for api "${model.api}"`, ); } - switch (model.api) { - case "openai-responses": - return createOpenAIResponsesTransportStreamFn(); - case "openai-completions": - return createOpenAICompletionsTransportStreamFn(); - case "azure-openai-responses": - return createAzureOpenAIResponsesTransportStreamFn(); - case "anthropic-messages": - return 
createAnthropicMessagesTransportStreamFn(); - case "google-generative-ai": - return createGoogleGenerativeAiTransportStreamFn(); - default: - return undefined; + return createSupportedTransportStreamFn(model.api); +} + +export function createBoundaryAwareStreamFnForModel(model: Model): StreamFn | undefined { + if (!isTransportAwareApiSupported(model.api)) { + return undefined; } + return createSupportedTransportStreamFn(model.api); } export function prepareTransportAwareSimpleModel(model: Model): Model { diff --git a/src/agents/system-prompt-cache-boundary.test.ts b/src/agents/system-prompt-cache-boundary.test.ts new file mode 100644 index 00000000000..2a63fb57250 --- /dev/null +++ b/src/agents/system-prompt-cache-boundary.test.ts @@ -0,0 +1,33 @@ +import { describe, expect, it } from "vitest"; +import { + prependSystemPromptAdditionAfterCacheBoundary, + splitSystemPromptCacheBoundary, + stripSystemPromptCacheBoundary, + SYSTEM_PROMPT_CACHE_BOUNDARY, +} from "./system-prompt-cache-boundary.js"; + +describe("system prompt cache boundary helpers", () => { + it("splits stable and dynamic prompt regions", () => { + expect( + splitSystemPromptCacheBoundary(`Stable prefix${SYSTEM_PROMPT_CACHE_BOUNDARY}Dynamic suffix`), + ).toEqual({ + stablePrefix: "Stable prefix", + dynamicSuffix: "Dynamic suffix", + }); + }); + + it("strips the internal marker from prompt text", () => { + expect( + stripSystemPromptCacheBoundary(`Stable prefix${SYSTEM_PROMPT_CACHE_BOUNDARY}Dynamic suffix`), + ).toBe("Stable prefix\nDynamic suffix"); + }); + + it("inserts prompt additions after the cache boundary", () => { + expect( + prependSystemPromptAdditionAfterCacheBoundary({ + systemPrompt: `Stable prefix${SYSTEM_PROMPT_CACHE_BOUNDARY}Dynamic suffix`, + systemPromptAddition: "Per-turn lab context", + }), + ).toBe(`Stable prefix${SYSTEM_PROMPT_CACHE_BOUNDARY}Per-turn lab context\n\nDynamic suffix`); + }); +}); diff --git a/src/agents/system-prompt-cache-boundary.ts 
b/src/agents/system-prompt-cache-boundary.ts new file mode 100644 index 00000000000..01e3558dff8 --- /dev/null +++ b/src/agents/system-prompt-cache-boundary.ts @@ -0,0 +1,38 @@ +export const SYSTEM_PROMPT_CACHE_BOUNDARY = "\n<system-prompt-cache-boundary>\n"; +export function stripSystemPromptCacheBoundary(text: string): string { + return text.replaceAll(SYSTEM_PROMPT_CACHE_BOUNDARY, "\n"); +} + +export function splitSystemPromptCacheBoundary( + text: string, +): { stablePrefix: string; dynamicSuffix: string } | undefined { + const boundaryIndex = text.indexOf(SYSTEM_PROMPT_CACHE_BOUNDARY); + if (boundaryIndex === -1) { + return undefined; + } + return { + stablePrefix: text.slice(0, boundaryIndex).trimEnd(), + dynamicSuffix: text.slice(boundaryIndex + SYSTEM_PROMPT_CACHE_BOUNDARY.length).trimStart(), + }; +} + +export function prependSystemPromptAdditionAfterCacheBoundary(params: { + systemPrompt: string; + systemPromptAddition?: string; +}): string { + if (!params.systemPromptAddition) { + return params.systemPrompt; + } + + const split = splitSystemPromptCacheBoundary(params.systemPrompt); + if (!split) { + return `${params.systemPromptAddition}\n\n${params.systemPrompt}`; + } + + if (!split.dynamicSuffix) { + return `${split.stablePrefix}${SYSTEM_PROMPT_CACHE_BOUNDARY}${params.systemPromptAddition}`; + } + + return `${split.stablePrefix}${SYSTEM_PROMPT_CACHE_BOUNDARY}${params.systemPromptAddition}\n\n${split.dynamicSuffix}`; +} diff --git a/src/agents/system-prompt.ts b/src/agents/system-prompt.ts index b29ec32dcf7..e60ed34bbb7 100644 --- a/src/agents/system-prompt.ts +++ b/src/agents/system-prompt.ts @@ -10,6 +10,7 @@ import type { ResolvedTimeFormat } from "./date-time.js"; import type { EmbeddedContextFile } from "./pi-embedded-helpers.js"; import type { EmbeddedSandboxInfo } from "./pi-embedded-runner/types.js"; import { sanitizeForPromptLiteral } from "./sanitize-for-prompt.js"; +import { SYSTEM_PROMPT_CACHE_BOUNDARY } from "./system-prompt-cache-boundary.js"; /** * Controls which
hardcoded sections are included in the system prompt. @@ -584,12 +585,6 @@ export function buildAgentSystemPrompt(params: { ...buildVoiceSection({ isMinimal, ttsHint: params.ttsHint }), ]; - if (extraSystemPrompt) { - // Use "Subagent Context" header for minimal mode (subagents), otherwise "Group Chat Context" - const contextHeader = - promptMode === "minimal" ? "## Subagent Context" : "## Group Chat Context"; - lines.push(contextHeader, extraSystemPrompt, ""); - } if (params.reactionGuidance) { const { level, channel } = params.reactionGuidance; const guidanceText = @@ -660,6 +655,18 @@ export function buildAgentSystemPrompt(params: { ); } + // Keep large stable prompt context above this seam so Anthropic-family + // transports can reuse it across labs and turns. Dynamic group/session + // additions below it are the primary cache invalidators. + lines.push(SYSTEM_PROMPT_CACHE_BOUNDARY); + + if (extraSystemPrompt) { + // Use "Subagent Context" header for minimal mode (subagents), otherwise "Group Chat Context" + const contextHeader = + promptMode === "minimal" ? "## Subagent Context" : "## Group Chat Context"; + lines.push(contextHeader, extraSystemPrompt, ""); + } + // Skip heartbeats for subagent/none modes if (!isMinimal && heartbeatPrompt) { lines.push(