diff --git a/extensions/anthropic/stream-wrappers.ts b/extensions/anthropic/stream-wrappers.ts index 6808f18b252..f60b9b77d9c 100644 --- a/extensions/anthropic/stream-wrappers.ts +++ b/extensions/anthropic/stream-wrappers.ts @@ -1,7 +1,10 @@ import type { StreamFn } from "@mariozechner/pi-agent-core"; import { streamSimple } from "@mariozechner/pi-ai"; -import { resolveProviderRequestCapabilities } from "openclaw/plugin-sdk/provider-http"; -import { streamWithPayloadPatch } from "openclaw/plugin-sdk/provider-stream"; +import { + applyAnthropicPayloadPolicyToParams, + resolveAnthropicPayloadPolicy, + streamWithPayloadPatch, +} from "openclaw/plugin-sdk/provider-stream"; import { createSubsystemLogger } from "openclaw/plugin-sdk/runtime-env"; const log = createSubsystemLogger("anthropic-stream"); @@ -52,20 +55,6 @@ function isAnthropicOAuthApiKey(apiKey: unknown): boolean { return typeof apiKey === "string" && apiKey.includes("sk-ant-oat"); } -function allowsAnthropicServiceTier(model: { - api?: unknown; - provider?: unknown; - baseUrl?: unknown; -}): boolean { - return resolveProviderRequestCapabilities({ - provider: typeof model.provider === "string" ? model.provider : undefined, - api: typeof model.api === "string" ? model.api : undefined, - baseUrl: typeof model.baseUrl === "string" ? model.baseUrl : undefined, - capability: "llm", - transport: "stream", - }).allowsAnthropicServiceTier; -} - function resolveAnthropicFastServiceTier(enabled: boolean): AnthropicServiceTier { return enabled ? "auto" : "standard_only"; } @@ -161,15 +150,19 @@ export function createAnthropicFastModeWrapper( const underlying = baseStreamFn ?? streamSimple; const serviceTier = resolveAnthropicFastServiceTier(enabled); return (model, context, options) => { - if (!allowsAnthropicServiceTier(model)) { + const payloadPolicy = resolveAnthropicPayloadPolicy({ + provider: typeof model.provider === "string" ? model.provider : undefined, + api: typeof model.api === "string" ? model.api : undefined, + baseUrl: typeof model.baseUrl === "string" ? model.baseUrl : undefined, + serviceTier, + }); + if (!payloadPolicy.allowsServiceTier) { return underlying(model, context, options); } - return streamWithPayloadPatch(underlying, model, context, options, (payloadObj) => { - if (payloadObj.service_tier === undefined) { - payloadObj.service_tier = serviceTier; - } - }); + return streamWithPayloadPatch(underlying, model, context, options, (payloadObj) => + applyAnthropicPayloadPolicyToParams(payloadObj, payloadPolicy), + ); }; } @@ -179,15 +172,19 @@ export function createAnthropicServiceTierWrapper( ): StreamFn { const underlying = baseStreamFn ?? streamSimple; return (model, context, options) => { - if (!allowsAnthropicServiceTier(model)) { + const payloadPolicy = resolveAnthropicPayloadPolicy({ + provider: typeof model.provider === "string" ? model.provider : undefined, + api: typeof model.api === "string" ? model.api : undefined, + baseUrl: typeof model.baseUrl === "string" ? model.baseUrl : undefined, + serviceTier, + }); + if (!payloadPolicy.allowsServiceTier) { return underlying(model, context, options); } - return streamWithPayloadPatch(underlying, model, context, options, (payloadObj) => { - if (payloadObj.service_tier === undefined) { - payloadObj.service_tier = serviceTier; - } - }); + return streamWithPayloadPatch(underlying, model, context, options, (payloadObj) => + applyAnthropicPayloadPolicyToParams(payloadObj, payloadPolicy), + ); }; } diff --git a/src/agents/anthropic-payload-policy.test.ts b/src/agents/anthropic-payload-policy.test.ts new file mode 100644 index 00000000000..fc47603a05b --- /dev/null +++ b/src/agents/anthropic-payload-policy.test.ts @@ -0,0 +1,99 @@ +import { describe, expect, it } from "vitest"; +import { + applyAnthropicPayloadPolicyToParams, + resolveAnthropicPayloadPolicy, +} from "./anthropic-payload-policy.js"; + +describe("anthropic payload policy", () => { + it("applies native Anthropic service tier and cache markers without widening cache scope", () => { + const policy = resolveAnthropicPayloadPolicy({ + provider: "anthropic", + api: "anthropic-messages", + baseUrl: "https://api.anthropic.com/v1", + cacheRetention: "long", + enableCacheControl: true, + serviceTier: "standard_only", + }); + const payload: Record = { + system: [ + { type: "text", text: "Follow policy." }, + { type: "text", text: "Use tools carefully." }, + ], + messages: [ + { + role: "assistant", + content: [{ type: "text", text: "Working." }], + }, + { + role: "user", + content: [ + { type: "text", text: "Hello" }, + { type: "tool_result", tool_use_id: "tool_1", content: "done" }, + ], + }, + ], + }; + + applyAnthropicPayloadPolicyToParams(payload, policy); + + expect(payload.service_tier).toBe("standard_only"); + expect(payload.system).toEqual([ + { + type: "text", + text: "Follow policy.", + cache_control: { type: "ephemeral", ttl: "1h" }, + }, + { + type: "text", + text: "Use tools carefully.", + cache_control: { type: "ephemeral", ttl: "1h" }, + }, + ]); + expect(payload.messages[0]).toEqual({ + role: "assistant", + content: [{ type: "text", text: "Working." }], + }); + expect(payload.messages[1]).toEqual({ + role: "user", + content: [ + { type: "text", text: "Hello" }, + { + type: "tool_result", + tool_use_id: "tool_1", + content: "done", + cache_control: { type: "ephemeral", ttl: "1h" }, + }, + ], + }); + }); + + it("denies proxied Anthropic service tier and omits long-TTL upgrades for custom hosts", () => { + const policy = resolveAnthropicPayloadPolicy({ + provider: "anthropic", + api: "anthropic-messages", + baseUrl: "https://proxy.example.com/anthropic", + cacheRetention: "long", + enableCacheControl: true, + serviceTier: "auto", + }); + const payload: Record = { + system: [{ type: "text", text: "Follow policy." }], + messages: [{ role: "user", content: "Hello" }], + }; + + applyAnthropicPayloadPolicyToParams(payload, policy); + + expect(payload).not.toHaveProperty("service_tier"); + expect(payload.system).toEqual([ + { + type: "text", + text: "Follow policy.", + cache_control: { type: "ephemeral" }, + }, + ]); + expect(payload.messages[0]).toEqual({ + role: "user", + content: [{ type: "text", text: "Hello", cache_control: { type: "ephemeral" } }], + }); + }); +}); diff --git a/src/agents/anthropic-payload-policy.ts b/src/agents/anthropic-payload-policy.ts new file mode 100644 index 00000000000..50887824938 --- /dev/null +++ b/src/agents/anthropic-payload-policy.ts @@ -0,0 +1,188 @@ +import { resolveProviderRequestCapabilities } from "./provider-attribution.js"; + +export type AnthropicServiceTier = "auto" | "standard_only"; + +export type AnthropicEphemeralCacheControl = { + type: "ephemeral"; + ttl?: "1h"; +}; + +type AnthropicPayloadPolicyInput = { + api?: string; + baseUrl?: string; + cacheRetention?: "short" | "long" | "none"; + enableCacheControl?: boolean; + provider?: string; + serviceTier?: AnthropicServiceTier; +}; + +export type AnthropicPayloadPolicy = { + allowsServiceTier: boolean; + cacheControl: AnthropicEphemeralCacheControl | undefined; + serviceTier: AnthropicServiceTier | undefined; +}; + +function resolveAnthropicEphemeralCacheControl( + baseUrl: string | undefined, + cacheRetention: AnthropicPayloadPolicyInput["cacheRetention"], +): AnthropicEphemeralCacheControl | undefined { + const retention = + cacheRetention ?? (process.env.PI_CACHE_RETENTION === "long" ? "long" : "short"); + if (retention === "none") { + return undefined; + } + const ttl = + retention === "long" && typeof baseUrl === "string" && baseUrl.includes("api.anthropic.com") + ? "1h" + : undefined; + return { type: "ephemeral", ...(ttl ? { ttl } : {}) }; +} + +function applyAnthropicCacheControlToSystem( + system: unknown, + cacheControl: AnthropicEphemeralCacheControl, +): void { + if (!Array.isArray(system)) { + return; + } + + for (const block of system) { + if (!block || typeof block !== "object") { + continue; + } + const record = block as Record; + if (record.type === "text" && record.cache_control === undefined) { + record.cache_control = cacheControl; + } + } +} + +function applyAnthropicCacheControlToMessages( + messages: unknown, + cacheControl: AnthropicEphemeralCacheControl, +): void { + if (!Array.isArray(messages) || messages.length === 0) { + return; + } + + const lastMessage = messages[messages.length - 1]; + if (!lastMessage || typeof lastMessage !== "object") { + return; + } + + const record = lastMessage as Record; + if (record.role !== "user") { + return; + } + + const content = record.content; + if (Array.isArray(content)) { + const lastBlock = content[content.length - 1]; + if (!lastBlock || typeof lastBlock !== "object") { + return; + } + const lastBlockRecord = lastBlock as Record; + if ( + lastBlockRecord.type === "text" || + lastBlockRecord.type === "image" || + lastBlockRecord.type === "tool_result" + ) { + lastBlockRecord.cache_control = cacheControl; + } + return; + } + + if (typeof content === "string") { + record.content = [ + { + type: "text", + text: content, + cache_control: cacheControl, + }, + ]; + } +} + +export function resolveAnthropicPayloadPolicy( + input: AnthropicPayloadPolicyInput, +): AnthropicPayloadPolicy { + const capabilities = resolveProviderRequestCapabilities({ + provider: input.provider, + api: input.api, + baseUrl: input.baseUrl, + capability: "llm", + transport: "stream", + }); + + return { + allowsServiceTier: capabilities.allowsAnthropicServiceTier, + cacheControl: + input.enableCacheControl === true + ? resolveAnthropicEphemeralCacheControl(input.baseUrl, input.cacheRetention) + : undefined, + serviceTier: input.serviceTier, + }; +} + +export function applyAnthropicPayloadPolicyToParams( + payloadObj: Record, + policy: AnthropicPayloadPolicy, +): void { + if ( + policy.allowsServiceTier && + policy.serviceTier !== undefined && + payloadObj.service_tier === undefined + ) { + payloadObj.service_tier = policy.serviceTier; + } + + if (!policy.cacheControl) { + return; + } + + applyAnthropicCacheControlToSystem(payloadObj.system, policy.cacheControl); + // Preserve Anthropic cache-write scope by only tagging the trailing user turn. + applyAnthropicCacheControlToMessages(payloadObj.messages, policy.cacheControl); +} + +export function applyAnthropicEphemeralCacheControlMarkers( + payloadObj: Record, +): void { + const messages = payloadObj.messages; + if (!Array.isArray(messages)) { + return; + } + + for (const message of messages as Array<{ role?: string; content?: unknown }>) { + if (message.role === "system" || message.role === "developer") { + if (typeof message.content === "string") { + message.content = [ + { type: "text", text: message.content, cache_control: { type: "ephemeral" } }, + ]; + continue; + } + if (Array.isArray(message.content) && message.content.length > 0) { + const last = message.content[message.content.length - 1]; + if (last && typeof last === "object") { + const record = last as Record; + if (record.type !== "thinking" && record.type !== "redacted_thinking") { + record.cache_control = { type: "ephemeral" }; + } + } + } + continue; + } + + if (message.role === "assistant" && Array.isArray(message.content)) { + for (const block of message.content) { + if (!block || typeof block !== "object") { + continue; + } + const record = block as Record; + if (record.type === "thinking" || record.type === "redacted_thinking") { + delete record.cache_control; + } + } + } + } +} diff --git a/src/agents/anthropic-transport-stream.ts b/src/agents/anthropic-transport-stream.ts index 6c66dde909d..618365a8869 100644 --- a/src/agents/anthropic-transport-stream.ts +++ b/src/agents/anthropic-transport-stream.ts @@ -10,6 +10,10 @@ import { type SimpleStreamOptions, type ThinkingLevel, } from "@mariozechner/pi-ai"; +import { + applyAnthropicPayloadPolicyToParams, + resolveAnthropicPayloadPolicy, +} from "./anthropic-payload-policy.js"; import { buildCopilotDynamicHeaders, hasCopilotVisionInput } from "./copilot-dynamic-headers.js"; import { buildGuardedModelFetch } from "./provider-transport-fetch.js"; import { transformTransportMessages } from "./transport-message-transform.js"; @@ -164,22 +168,6 @@ function fromClaudeCodeName(name: string, tools: Context["tools"] | undefined): return name; } -function resolveCacheControl( - baseUrl: string | undefined, - cacheRetention: AnthropicOptions["cacheRetention"], -): { type: "ephemeral"; ttl?: "1h" } | undefined { - const retention = - cacheRetention ?? (process.env.PI_CACHE_RETENTION === "long" ? "long" : "short"); - if (retention === "none") { - return undefined; - } - const ttl = - retention === "long" && typeof baseUrl === "string" && baseUrl.includes("api.anthropic.com") - ? "1h" - : undefined; - return { type: "ephemeral", ...(ttl ? { ttl } : {}) }; -} - function convertContentBlocks( content: Array< { type: "text"; text: string } | { type: "image"; data: string; mimeType: string } @@ -224,7 +212,6 @@ function convertAnthropicMessages( messages: Context["messages"], model: AnthropicTransportModel, isOAuthToken: boolean, - cacheControl: { type: "ephemeral"; ttl?: "1h" } | undefined, ) { const params: Array> = []; const transformedMessages = transformTransportMessages(messages, model, normalizeToolCallId); @@ -361,33 +348,6 @@ function convertAnthropicMessages( }); } } - if (cacheControl && params.length > 0) { - const lastMessage = params[params.length - 1]; - if (lastMessage.role === "user") { - const content = lastMessage.content; - if (Array.isArray(content)) { - const lastBlock = content[content.length - 1]; - if ( - lastBlock && - typeof lastBlock === "object" && - "type" in lastBlock && - (lastBlock.type === "text" || - lastBlock.type === "image" || - lastBlock.type === "tool_result") - ) { - (lastBlock as Record).cache_control = cacheControl; - } - } else if (typeof content === "string") { - lastMessage.content = [ - { - type: "text", - text: content, - cache_control: cacheControl, - }, - ]; - } - } - } return params; } @@ -515,11 +475,17 @@ function buildAnthropicParams( isOAuthToken: boolean, options: AnthropicTransportOptions | undefined, ) { - const cacheControl = resolveCacheControl(model.baseUrl, options?.cacheRetention); + const payloadPolicy = resolveAnthropicPayloadPolicy({ + provider: model.provider, + api: model.api, + baseUrl: model.baseUrl, + cacheRetention: options?.cacheRetention, + enableCacheControl: true, + }); const defaultMaxTokens = Math.min(model.maxTokens, 32_000); const params: Record = { model: model.id, - messages: convertAnthropicMessages(context.messages, model, isOAuthToken, cacheControl), + messages: convertAnthropicMessages(context.messages, model, isOAuthToken), max_tokens: options?.maxTokens || defaultMaxTokens, stream: true, }; @@ -528,14 +494,12 @@ function buildAnthropicParams( { type: "text", text: "You are Claude Code, Anthropic's official CLI for Claude.", - ...(cacheControl ? { cache_control: cacheControl } : {}), }, ...(context.systemPrompt ? [ { type: "text", text: sanitizeTransportPayloadText(context.systemPrompt), - ...(cacheControl ? { cache_control: cacheControl } : {}), }, ] : []), @@ -545,7 +509,6 @@ function buildAnthropicParams( { type: "text", text: sanitizeTransportPayloadText(context.systemPrompt), - ...(cacheControl ? { cache_control: cacheControl } : {}), }, ]; } @@ -579,6 +542,7 @@ function buildAnthropicParams( params.tool_choice = typeof options.toolChoice === "string" ? { type: options.toolChoice } : options.toolChoice; } + applyAnthropicPayloadPolicyToParams(params, payloadPolicy); return params; } diff --git a/src/agents/pi-embedded-runner/anthropic-cache-control-payload.ts b/src/agents/pi-embedded-runner/anthropic-cache-control-payload.ts index 5a710a150f8..b06c10d1849 100644 --- a/src/agents/pi-embedded-runner/anthropic-cache-control-payload.ts +++ b/src/agents/pi-embedded-runner/anthropic-cache-control-payload.ts @@ -1,41 +1 @@ -export function applyAnthropicEphemeralCacheControlMarkers( - payloadObj: Record, -): void { - const messages = payloadObj.messages; - if (!Array.isArray(messages)) { - return; - } - - for (const message of messages as Array<{ role?: string; content?: unknown }>) { - if (message.role === "system" || message.role === "developer") { - if (typeof message.content === "string") { - message.content = [ - { type: "text", text: message.content, cache_control: { type: "ephemeral" } }, - ]; - continue; - } - if (Array.isArray(message.content) && message.content.length > 0) { - const last = message.content[message.content.length - 1]; - if (last && typeof last === "object") { - const record = last as Record; - if (record.type !== "thinking" && record.type !== "redacted_thinking") { - record.cache_control = { type: "ephemeral" }; - } - } - } - continue; - } - - if (message.role === "assistant" && Array.isArray(message.content)) { - for (const block of message.content) { - if (!block || typeof block !== "object") { - continue; - } - const record = block as Record; - if (record.type === "thinking" || record.type === "redacted_thinking") { - delete record.cache_control; - } - } - } - } -} +export { applyAnthropicEphemeralCacheControlMarkers } from "../anthropic-payload-policy.js"; diff --git a/src/agents/provider-attribution.test.ts b/src/agents/provider-attribution.test.ts index 96122292b04..09b8b043c95 100644 --- a/src/agents/provider-attribution.test.ts +++ b/src/agents/provider-attribution.test.ts @@ -599,4 +599,210 @@ describe("provider attribution", () => { isKnownNativeEndpoint: true, }); }); + + it("resolves a provider capability matrix for representative native and proxied routes", () => { + const cases = [ + { + name: "native OpenAI responses", + input: { + provider: "openai", + api: "openai-responses", + baseUrl: "https://api.openai.com/v1", + capability: "llm" as const, + transport: "stream" as const, + }, + expected: { + knownProviderFamily: "openai-family", + endpointClass: "openai-public", + isKnownNativeEndpoint: true, + allowsOpenAIServiceTier: true, + supportsOpenAIReasoningCompatPayload: true, + allowsResponsesStore: true, + supportsResponsesStoreField: true, + shouldStripResponsesPromptCache: false, + allowsAnthropicServiceTier: false, + supportsNativeStreamingUsageCompat: false, + }, + }, + { + name: "proxied OpenAI responses", + input: { + provider: "openai", + api: "openai-responses", + baseUrl: "https://proxy.example.com/v1", + capability: "llm" as const, + transport: "stream" as const, + }, + expected: { + knownProviderFamily: "openai-family", + endpointClass: "custom", + isKnownNativeEndpoint: false, + allowsOpenAIServiceTier: false, + supportsOpenAIReasoningCompatPayload: false, + allowsResponsesStore: false, + supportsResponsesStoreField: true, + shouldStripResponsesPromptCache: true, + allowsAnthropicServiceTier: false, + supportsNativeStreamingUsageCompat: false, + }, + }, + { + name: "direct Anthropic messages", + input: { + provider: "anthropic", + api: "anthropic-messages", + baseUrl: "https://api.anthropic.com/v1", + capability: "llm" as const, + transport: "stream" as const, + }, + expected: { + knownProviderFamily: "anthropic", + endpointClass: "anthropic-public", + isKnownNativeEndpoint: true, + allowsOpenAIServiceTier: false, + supportsOpenAIReasoningCompatPayload: false, + allowsResponsesStore: false, + supportsResponsesStoreField: false, + shouldStripResponsesPromptCache: false, + allowsAnthropicServiceTier: true, + supportsNativeStreamingUsageCompat: false, + }, + }, + { + name: "proxied custom anthropic api", + input: { + provider: "custom-anthropic", + api: "anthropic-messages", + baseUrl: "https://proxy.example.com/anthropic", + capability: "llm" as const, + transport: "stream" as const, + }, + expected: { + endpointClass: "custom", + isKnownNativeEndpoint: false, + allowsAnthropicServiceTier: false, + supportsOpenAIReasoningCompatPayload: false, + supportsResponsesStoreField: false, + supportsNativeStreamingUsageCompat: false, + }, + }, + { + name: "native OpenRouter responses", + input: { + provider: "openrouter", + api: "openai-responses", + baseUrl: "https://openrouter.ai/api/v1", + capability: "llm" as const, + transport: "stream" as const, + }, + expected: { + knownProviderFamily: "openrouter", + endpointClass: "openrouter", + isKnownNativeEndpoint: true, + allowsOpenAIServiceTier: false, + supportsOpenAIReasoningCompatPayload: false, + allowsResponsesStore: false, + supportsResponsesStoreField: true, + shouldStripResponsesPromptCache: true, + allowsAnthropicServiceTier: false, + supportsNativeStreamingUsageCompat: false, + }, + }, + { + name: "native Moonshot completions", + input: { + provider: "moonshot", + api: "openai-completions", + baseUrl: "https://api.moonshot.ai/v1", + capability: "llm" as const, + transport: "stream" as const, + }, + expected: { + knownProviderFamily: "moonshot", + endpointClass: "moonshot-native", + isKnownNativeEndpoint: true, + allowsOpenAIServiceTier: false, + supportsOpenAIReasoningCompatPayload: false, + allowsResponsesStore: false, + supportsResponsesStoreField: false, + shouldStripResponsesPromptCache: false, + allowsAnthropicServiceTier: false, + supportsNativeStreamingUsageCompat: true, + compatibilityFamily: "moonshot", + }, + }, + { + name: "native ModelStudio completions", + input: { + provider: "modelstudio", + api: "openai-completions", + baseUrl: "https://dashscope-intl.aliyuncs.com/compatible-mode/v1", + capability: "llm" as const, + transport: "stream" as const, + }, + expected: { + knownProviderFamily: "modelstudio", + endpointClass: "modelstudio-native", + isKnownNativeEndpoint: true, + allowsOpenAIServiceTier: false, + supportsOpenAIReasoningCompatPayload: false, + allowsResponsesStore: false, + supportsResponsesStoreField: false, + shouldStripResponsesPromptCache: false, + allowsAnthropicServiceTier: false, + supportsNativeStreamingUsageCompat: true, + }, + }, + { + name: "native Google Gemini api", + input: { + provider: "google", + api: "google-generative-ai", + baseUrl: "https://generativelanguage.googleapis.com", + capability: "llm" as const, + transport: "stream" as const, + }, + expected: { + knownProviderFamily: "google", + endpointClass: "google-generative-ai", + isKnownNativeEndpoint: true, + allowsOpenAIServiceTier: false, + supportsOpenAIReasoningCompatPayload: false, + allowsResponsesStore: false, + supportsResponsesStoreField: false, + shouldStripResponsesPromptCache: false, + allowsAnthropicServiceTier: false, + supportsNativeStreamingUsageCompat: false, + }, + }, + { + name: "native GitHub Copilot responses", + input: { + provider: "github-copilot", + api: "openai-responses", + baseUrl: "https://api.individual.githubcopilot.com", + capability: "llm" as const, + transport: "stream" as const, + }, + expected: { + knownProviderFamily: "github-copilot", + endpointClass: "github-copilot-native", + isKnownNativeEndpoint: true, + allowsOpenAIServiceTier: false, + supportsOpenAIReasoningCompatPayload: false, + allowsResponsesStore: false, + supportsResponsesStoreField: true, + shouldStripResponsesPromptCache: true, + allowsAnthropicServiceTier: false, + supportsNativeStreamingUsageCompat: false, + }, + }, + ]; + + for (const testCase of cases) { + expect(resolveProviderRequestCapabilities(testCase.input), testCase.name).toMatchObject( + testCase.expected, + ); + } + }); }); diff --git a/src/plugin-sdk/provider-stream.ts b/src/plugin-sdk/provider-stream.ts index b658affdc5c..2c134ff5008 100644 --- a/src/plugin-sdk/provider-stream.ts +++ b/src/plugin-sdk/provider-stream.ts @@ -1,5 +1,9 @@ // Public stream-wrapper helpers for provider plugins. +export { + applyAnthropicPayloadPolicyToParams, + resolveAnthropicPayloadPolicy, +} from "../agents/anthropic-payload-policy.js"; export { buildCopilotDynamicHeaders, hasCopilotVisionInput,