refactor(providers): share anthropic payload policy

This commit is contained in:
Vincent Koc 2026-04-04 04:57:34 +09:00
parent 3e0ddaf5bc
commit 067496b129
7 changed files with 536 additions and 118 deletions

View File

@ -1,7 +1,10 @@
import type { StreamFn } from "@mariozechner/pi-agent-core";
import { streamSimple } from "@mariozechner/pi-ai";
import { resolveProviderRequestCapabilities } from "openclaw/plugin-sdk/provider-http";
import { streamWithPayloadPatch } from "openclaw/plugin-sdk/provider-stream";
import {
applyAnthropicPayloadPolicyToParams,
resolveAnthropicPayloadPolicy,
streamWithPayloadPatch,
} from "openclaw/plugin-sdk/provider-stream";
import { createSubsystemLogger } from "openclaw/plugin-sdk/runtime-env";
const log = createSubsystemLogger("anthropic-stream");
@ -52,20 +55,6 @@ function isAnthropicOAuthApiKey(apiKey: unknown): boolean {
return typeof apiKey === "string" && apiKey.includes("sk-ant-oat");
}
/**
 * Reports whether the resolved provider route may carry Anthropic's
 * `service_tier` payload field. Delegates to the shared capability
 * resolver; non-string model fields are treated as absent.
 */
function allowsAnthropicServiceTier(model: {
  api?: unknown;
  provider?: unknown;
  baseUrl?: unknown;
}): boolean {
  const asString = (value: unknown): string | undefined =>
    typeof value === "string" ? value : undefined;
  const capabilities = resolveProviderRequestCapabilities({
    provider: asString(model.provider),
    api: asString(model.api),
    baseUrl: asString(model.baseUrl),
    capability: "llm",
    transport: "stream",
  });
  return capabilities.allowsAnthropicServiceTier;
}
/**
 * Maps the fast-mode toggle onto an Anthropic service tier: enabled
 * fast mode lets Anthropic choose ("auto"), otherwise requests are
 * pinned to the standard tier.
 */
function resolveAnthropicFastServiceTier(enabled: boolean): AnthropicServiceTier {
  if (enabled) {
    return "auto";
  }
  return "standard_only";
}
@ -161,15 +150,19 @@ export function createAnthropicFastModeWrapper(
const underlying = baseStreamFn ?? streamSimple;
const serviceTier = resolveAnthropicFastServiceTier(enabled);
return (model, context, options) => {
if (!allowsAnthropicServiceTier(model)) {
const payloadPolicy = resolveAnthropicPayloadPolicy({
provider: typeof model.provider === "string" ? model.provider : undefined,
api: typeof model.api === "string" ? model.api : undefined,
baseUrl: typeof model.baseUrl === "string" ? model.baseUrl : undefined,
serviceTier,
});
if (!payloadPolicy.allowsServiceTier) {
return underlying(model, context, options);
}
return streamWithPayloadPatch(underlying, model, context, options, (payloadObj) => {
if (payloadObj.service_tier === undefined) {
payloadObj.service_tier = serviceTier;
}
});
return streamWithPayloadPatch(underlying, model, context, options, (payloadObj) =>
applyAnthropicPayloadPolicyToParams(payloadObj, payloadPolicy),
);
};
}
@ -179,15 +172,19 @@ export function createAnthropicServiceTierWrapper(
): StreamFn {
const underlying = baseStreamFn ?? streamSimple;
return (model, context, options) => {
if (!allowsAnthropicServiceTier(model)) {
const payloadPolicy = resolveAnthropicPayloadPolicy({
provider: typeof model.provider === "string" ? model.provider : undefined,
api: typeof model.api === "string" ? model.api : undefined,
baseUrl: typeof model.baseUrl === "string" ? model.baseUrl : undefined,
serviceTier,
});
if (!payloadPolicy.allowsServiceTier) {
return underlying(model, context, options);
}
return streamWithPayloadPatch(underlying, model, context, options, (payloadObj) => {
if (payloadObj.service_tier === undefined) {
payloadObj.service_tier = serviceTier;
}
});
return streamWithPayloadPatch(underlying, model, context, options, (payloadObj) =>
applyAnthropicPayloadPolicyToParams(payloadObj, payloadPolicy),
);
};
}

View File

@ -0,0 +1,99 @@
import { describe, expect, it } from "vitest";
import {
applyAnthropicPayloadPolicyToParams,
resolveAnthropicPayloadPolicy,
} from "./anthropic-payload-policy.js";
// Unit coverage for the shared Anthropic payload policy helpers:
// route-dependent resolution of service-tier/cache decisions, and their
// in-place application to a Messages API payload object.
describe("anthropic payload policy", () => {
  it("applies native Anthropic service tier and cache markers without widening cache scope", () => {
    // Native api.anthropic.com route with long retention: the policy is
    // expected to allow service_tier and carry the 1h TTL cache marker.
    const policy = resolveAnthropicPayloadPolicy({
      provider: "anthropic",
      api: "anthropic-messages",
      baseUrl: "https://api.anthropic.com/v1",
      cacheRetention: "long",
      enableCacheControl: true,
      serviceTier: "standard_only",
    });
    const payload: Record<string, unknown> = {
      system: [
        { type: "text", text: "Follow policy." },
        { type: "text", text: "Use tools carefully." },
      ],
      messages: [
        {
          role: "assistant",
          content: [{ type: "text", text: "Working." }],
        },
        {
          role: "user",
          content: [
            { type: "text", text: "Hello" },
            { type: "tool_result", tool_use_id: "tool_1", content: "done" },
          ],
        },
      ],
    };
    applyAnthropicPayloadPolicyToParams(payload, policy);
    expect(payload.service_tier).toBe("standard_only");
    // Every system text block is tagged with the long-TTL marker.
    expect(payload.system).toEqual([
      {
        type: "text",
        text: "Follow policy.",
        cache_control: { type: "ephemeral", ttl: "1h" },
      },
      {
        type: "text",
        text: "Use tools carefully.",
        cache_control: { type: "ephemeral", ttl: "1h" },
      },
    ]);
    // The assistant turn stays untouched; only the final block of the
    // trailing user turn is tagged, keeping the cache-write scope narrow.
    expect(payload.messages[0]).toEqual({
      role: "assistant",
      content: [{ type: "text", text: "Working." }],
    });
    expect(payload.messages[1]).toEqual({
      role: "user",
      content: [
        { type: "text", text: "Hello" },
        {
          type: "tool_result",
          tool_use_id: "tool_1",
          content: "done",
          cache_control: { type: "ephemeral", ttl: "1h" },
        },
      ],
    });
  });
  it("denies proxied Anthropic service tier and omits long-TTL upgrades for custom hosts", () => {
    // Custom-host route: service_tier must be withheld entirely and the
    // cache marker falls back to the default ephemeral TTL.
    const policy = resolveAnthropicPayloadPolicy({
      provider: "anthropic",
      api: "anthropic-messages",
      baseUrl: "https://proxy.example.com/anthropic",
      cacheRetention: "long",
      enableCacheControl: true,
      serviceTier: "auto",
    });
    const payload: Record<string, unknown> = {
      system: [{ type: "text", text: "Follow policy." }],
      messages: [{ role: "user", content: "Hello" }],
    };
    applyAnthropicPayloadPolicyToParams(payload, policy);
    expect(payload).not.toHaveProperty("service_tier");
    expect(payload.system).toEqual([
      {
        type: "text",
        text: "Follow policy.",
        cache_control: { type: "ephemeral" },
      },
    ]);
    // String user content is wrapped into a single tagged text block.
    expect(payload.messages[0]).toEqual({
      role: "user",
      content: [{ type: "text", text: "Hello", cache_control: { type: "ephemeral" } }],
    });
  });
});

View File

@ -0,0 +1,188 @@
import { resolveProviderRequestCapabilities } from "./provider-attribution.js";
// Service-tier values accepted on an Anthropic Messages API payload.
export type AnthropicServiceTier = "auto" | "standard_only";

// Ephemeral prompt-cache marker attached to content blocks. The
// optional "1h" TTL upgrade is only emitted for Anthropic's own host
// (see resolveAnthropicEphemeralCacheControl).
export type AnthropicEphemeralCacheControl = {
  type: "ephemeral";
  ttl?: "1h";
};

// Inputs used to derive the payload policy for one provider route.
// `enableCacheControl` must be explicitly true for any cache marker to
// be produced; `cacheRetention` falls back to an env-var default.
type AnthropicPayloadPolicyInput = {
  api?: string;
  baseUrl?: string;
  cacheRetention?: "short" | "long" | "none";
  enableCacheControl?: boolean;
  provider?: string;
  serviceTier?: AnthropicServiceTier;
};

// Resolved decisions applied to an outgoing Anthropic payload.
export type AnthropicPayloadPolicy = {
  allowsServiceTier: boolean;
  cacheControl: AnthropicEphemeralCacheControl | undefined;
  serviceTier: AnthropicServiceTier | undefined;
};
/**
 * Resolves the ephemeral cache-control marker for a route, or undefined
 * when caching is disabled ("none"). Retention defaults from the
 * PI_CACHE_RETENTION env var ("long" upgrades, anything else is short).
 * The "1h" TTL is attached only for Anthropic's first-party host;
 * other hosts get the bare ephemeral marker.
 */
function resolveAnthropicEphemeralCacheControl(
  baseUrl: string | undefined,
  cacheRetention: AnthropicPayloadPolicyInput["cacheRetention"],
): AnthropicEphemeralCacheControl | undefined {
  const retention =
    cacheRetention ?? (process.env.PI_CACHE_RETENTION === "long" ? "long" : "short");
  if (retention === "none") {
    return undefined;
  }
  const onAnthropicHost =
    typeof baseUrl === "string" && baseUrl.includes("api.anthropic.com");
  if (retention === "long" && onAnthropicHost) {
    return { type: "ephemeral", ttl: "1h" };
  }
  return { type: "ephemeral" };
}
/**
 * Stamps the cache marker on every text block of an Anthropic `system`
 * array, in place. Blocks that already carry a cache_control are left
 * untouched, as are non-text blocks and non-object entries. A `system`
 * value that is not an array (e.g. a plain string) is ignored.
 */
function applyAnthropicCacheControlToSystem(
  system: unknown,
  cacheControl: AnthropicEphemeralCacheControl,
): void {
  if (!Array.isArray(system)) {
    return;
  }
  for (const entry of system) {
    if (entry === null || typeof entry !== "object") {
      continue;
    }
    const block = entry as Record<string, unknown>;
    const isUntaggedText = block.type === "text" && block.cache_control === undefined;
    if (isUntaggedText) {
      block.cache_control = cacheControl;
    }
  }
}
/**
 * Marks the trailing user turn of a Messages payload for prompt
 * caching, in place. Only the final message is considered, and only
 * when its role is "user": for array content the last block is tagged
 * (overwriting any existing marker) if it is text/image/tool_result;
 * plain-string content is wrapped into a single tagged text block.
 * Anything else — empty arrays, non-user tails, opaque blocks — is
 * left untouched.
 */
function applyAnthropicCacheControlToMessages(
  messages: unknown,
  cacheControl: AnthropicEphemeralCacheControl,
): void {
  if (!Array.isArray(messages)) {
    return;
  }
  const tail = messages[messages.length - 1];
  if (tail == null || typeof tail !== "object") {
    return;
  }
  const message = tail as Record<string, unknown>;
  if (message.role !== "user") {
    return;
  }
  const content = message.content;
  if (typeof content === "string") {
    message.content = [{ type: "text", text: content, cache_control: cacheControl }];
    return;
  }
  if (!Array.isArray(content)) {
    return;
  }
  const lastBlock = content[content.length - 1];
  if (!lastBlock || typeof lastBlock !== "object") {
    return;
  }
  const block = lastBlock as Record<string, unknown>;
  const cacheable =
    block.type === "text" || block.type === "image" || block.type === "tool_result";
  if (cacheable) {
    block.cache_control = cacheControl;
  }
}
/**
 * Computes the payload policy for one Anthropic-style route: whether
 * `service_tier` may be sent (decided by the shared provider-capability
 * resolver) and which cache-control marker, if any, to apply. A cache
 * marker is produced only when the caller explicitly opts in with
 * `enableCacheControl: true`.
 */
export function resolveAnthropicPayloadPolicy(
  input: AnthropicPayloadPolicyInput,
): AnthropicPayloadPolicy {
  const { provider, api, baseUrl, cacheRetention, enableCacheControl, serviceTier } = input;
  const capabilities = resolveProviderRequestCapabilities({
    provider,
    api,
    baseUrl,
    capability: "llm",
    transport: "stream",
  });
  const cacheControl =
    enableCacheControl === true
      ? resolveAnthropicEphemeralCacheControl(baseUrl, cacheRetention)
      : undefined;
  return {
    allowsServiceTier: capabilities.allowsAnthropicServiceTier,
    cacheControl,
    serviceTier,
  };
}
/**
 * Mutates an outgoing Anthropic request payload according to `policy`:
 * sets `service_tier` when the route allows it, a tier was resolved,
 * and the caller has not already set one; then, when cache control is
 * enabled, tags the system blocks and the trailing user turn.
 */
export function applyAnthropicPayloadPolicyToParams(
  payloadObj: Record<string, unknown>,
  policy: AnthropicPayloadPolicy,
): void {
  const shouldSetTier =
    policy.allowsServiceTier &&
    policy.serviceTier !== undefined &&
    payloadObj.service_tier === undefined;
  if (shouldSetTier) {
    payloadObj.service_tier = policy.serviceTier;
  }
  const cacheControl = policy.cacheControl;
  if (cacheControl === undefined) {
    return;
  }
  applyAnthropicCacheControlToSystem(payloadObj.system, cacheControl);
  // Preserve Anthropic cache-write scope by only tagging the trailing user turn.
  applyAnthropicCacheControlToMessages(payloadObj.messages, cacheControl);
}
/**
 * Applies default ephemeral cache markers in place to an OpenAI-style
 * `messages` array bound for an Anthropic route:
 * - system/developer messages: string content is wrapped into a single
 *   tagged text block; for array content only the final block is tagged,
 *   and thinking/redacted_thinking blocks are never tagged;
 * - assistant messages: any cache_control on thinking blocks is removed
 *   (presumably because the upstream API rejects markers there —
 *   NOTE(review): confirm against the Messages API docs).
 * Payloads without a `messages` array are left untouched.
 */
export function applyAnthropicEphemeralCacheControlMarkers(
  payloadObj: Record<string, unknown>,
): void {
  const messages = payloadObj.messages;
  if (!Array.isArray(messages)) {
    return;
  }
  for (const message of messages as Array<{ role?: string; content?: unknown }>) {
    const role = message.role;
    if (role === "system" || role === "developer") {
      const content = message.content;
      if (typeof content === "string") {
        message.content = [
          { type: "text", text: content, cache_control: { type: "ephemeral" } },
        ];
      } else if (Array.isArray(content) && content.length > 0) {
        const tail = content[content.length - 1];
        if (tail && typeof tail === "object") {
          const block = tail as Record<string, unknown>;
          const isThinking =
            block.type === "thinking" || block.type === "redacted_thinking";
          if (!isThinking) {
            block.cache_control = { type: "ephemeral" };
          }
        }
      }
    } else if (role === "assistant" && Array.isArray(message.content)) {
      for (const entry of message.content) {
        if (entry && typeof entry === "object") {
          const block = entry as Record<string, unknown>;
          if (block.type === "thinking" || block.type === "redacted_thinking") {
            delete block.cache_control;
          }
        }
      }
    }
  }
}

View File

@ -10,6 +10,10 @@ import {
type SimpleStreamOptions,
type ThinkingLevel,
} from "@mariozechner/pi-ai";
import {
applyAnthropicPayloadPolicyToParams,
resolveAnthropicPayloadPolicy,
} from "./anthropic-payload-policy.js";
import { buildCopilotDynamicHeaders, hasCopilotVisionInput } from "./copilot-dynamic-headers.js";
import { buildGuardedModelFetch } from "./provider-transport-fetch.js";
import { transformTransportMessages } from "./transport-message-transform.js";
@ -164,22 +168,6 @@ function fromClaudeCodeName(name: string, tools: Context["tools"] | undefined):
return name;
}
function resolveCacheControl(
baseUrl: string | undefined,
cacheRetention: AnthropicOptions["cacheRetention"],
): { type: "ephemeral"; ttl?: "1h" } | undefined {
const retention =
cacheRetention ?? (process.env.PI_CACHE_RETENTION === "long" ? "long" : "short");
if (retention === "none") {
return undefined;
}
const ttl =
retention === "long" && typeof baseUrl === "string" && baseUrl.includes("api.anthropic.com")
? "1h"
: undefined;
return { type: "ephemeral", ...(ttl ? { ttl } : {}) };
}
function convertContentBlocks(
content: Array<
{ type: "text"; text: string } | { type: "image"; data: string; mimeType: string }
@ -224,7 +212,6 @@ function convertAnthropicMessages(
messages: Context["messages"],
model: AnthropicTransportModel,
isOAuthToken: boolean,
cacheControl: { type: "ephemeral"; ttl?: "1h" } | undefined,
) {
const params: Array<Record<string, unknown>> = [];
const transformedMessages = transformTransportMessages(messages, model, normalizeToolCallId);
@ -361,33 +348,6 @@ function convertAnthropicMessages(
});
}
}
if (cacheControl && params.length > 0) {
const lastMessage = params[params.length - 1];
if (lastMessage.role === "user") {
const content = lastMessage.content;
if (Array.isArray(content)) {
const lastBlock = content[content.length - 1];
if (
lastBlock &&
typeof lastBlock === "object" &&
"type" in lastBlock &&
(lastBlock.type === "text" ||
lastBlock.type === "image" ||
lastBlock.type === "tool_result")
) {
(lastBlock as Record<string, unknown>).cache_control = cacheControl;
}
} else if (typeof content === "string") {
lastMessage.content = [
{
type: "text",
text: content,
cache_control: cacheControl,
},
];
}
}
}
return params;
}
@ -515,11 +475,17 @@ function buildAnthropicParams(
isOAuthToken: boolean,
options: AnthropicTransportOptions | undefined,
) {
const cacheControl = resolveCacheControl(model.baseUrl, options?.cacheRetention);
const payloadPolicy = resolveAnthropicPayloadPolicy({
provider: model.provider,
api: model.api,
baseUrl: model.baseUrl,
cacheRetention: options?.cacheRetention,
enableCacheControl: true,
});
const defaultMaxTokens = Math.min(model.maxTokens, 32_000);
const params: Record<string, unknown> = {
model: model.id,
messages: convertAnthropicMessages(context.messages, model, isOAuthToken, cacheControl),
messages: convertAnthropicMessages(context.messages, model, isOAuthToken),
max_tokens: options?.maxTokens || defaultMaxTokens,
stream: true,
};
@ -528,14 +494,12 @@ function buildAnthropicParams(
{
type: "text",
text: "You are Claude Code, Anthropic's official CLI for Claude.",
...(cacheControl ? { cache_control: cacheControl } : {}),
},
...(context.systemPrompt
? [
{
type: "text",
text: sanitizeTransportPayloadText(context.systemPrompt),
...(cacheControl ? { cache_control: cacheControl } : {}),
},
]
: []),
@ -545,7 +509,6 @@ function buildAnthropicParams(
{
type: "text",
text: sanitizeTransportPayloadText(context.systemPrompt),
...(cacheControl ? { cache_control: cacheControl } : {}),
},
];
}
@ -579,6 +542,7 @@ function buildAnthropicParams(
params.tool_choice =
typeof options.toolChoice === "string" ? { type: options.toolChoice } : options.toolChoice;
}
applyAnthropicPayloadPolicyToParams(params, payloadPolicy);
return params;
}

View File

@ -1,41 +1 @@
/**
 * Applies default ephemeral cache-control markers in place to an
 * OpenAI-style `messages` payload bound for an Anthropic route.
 *
 * - system/developer messages: string content is wrapped into a single
 *   tagged text block; for array content only the final block is
 *   tagged, and thinking/redacted_thinking blocks are never tagged.
 * - assistant messages: any cache_control on thinking blocks is
 *   removed (presumably because the upstream API rejects markers on
 *   thinking content — NOTE(review): confirm against the API docs).
 *
 * Payloads without a `messages` array are left untouched.
 */
export function applyAnthropicEphemeralCacheControlMarkers(
  payloadObj: Record<string, unknown>,
): void {
  const messages = payloadObj.messages;
  if (!Array.isArray(messages)) {
    return;
  }
  for (const message of messages as Array<{ role?: string; content?: unknown }>) {
    if (message.role === "system" || message.role === "developer") {
      if (typeof message.content === "string") {
        // Wrap plain-string content so a marker can be attached.
        message.content = [
          { type: "text", text: message.content, cache_control: { type: "ephemeral" } },
        ];
        continue;
      }
      if (Array.isArray(message.content) && message.content.length > 0) {
        const last = message.content[message.content.length - 1];
        if (last && typeof last === "object") {
          const record = last as Record<string, unknown>;
          // Tag only the final block, never thinking content.
          if (record.type !== "thinking" && record.type !== "redacted_thinking") {
            record.cache_control = { type: "ephemeral" };
          }
        }
      }
      continue;
    }
    if (message.role === "assistant" && Array.isArray(message.content)) {
      for (const block of message.content) {
        if (!block || typeof block !== "object") {
          continue;
        }
        const record = block as Record<string, unknown>;
        // Strip markers from assistant thinking blocks.
        if (record.type === "thinking" || record.type === "redacted_thinking") {
          delete record.cache_control;
        }
      }
    }
  }
}
export { applyAnthropicEphemeralCacheControlMarkers } from "../anthropic-payload-policy.js";

View File

@ -599,4 +599,210 @@ describe("provider attribution", () => {
isKnownNativeEndpoint: true,
});
});
// Table-driven check of resolveProviderRequestCapabilities across
// representative native and proxied routes. Each case asserts a subset
// of the capability matrix via toMatchObject; the case name is passed
// as the assertion message so a failure identifies the route.
it("resolves a provider capability matrix for representative native and proxied routes", () => {
  const cases = [
    {
      name: "native OpenAI responses",
      input: {
        provider: "openai",
        api: "openai-responses",
        baseUrl: "https://api.openai.com/v1",
        capability: "llm" as const,
        transport: "stream" as const,
      },
      expected: {
        knownProviderFamily: "openai-family",
        endpointClass: "openai-public",
        isKnownNativeEndpoint: true,
        allowsOpenAIServiceTier: true,
        supportsOpenAIReasoningCompatPayload: true,
        allowsResponsesStore: true,
        supportsResponsesStoreField: true,
        shouldStripResponsesPromptCache: false,
        allowsAnthropicServiceTier: false,
        supportsNativeStreamingUsageCompat: false,
      },
    },
    {
      // Same provider/api as above but a custom host: tier, store, and
      // reasoning-compat privileges are expected to be withdrawn.
      name: "proxied OpenAI responses",
      input: {
        provider: "openai",
        api: "openai-responses",
        baseUrl: "https://proxy.example.com/v1",
        capability: "llm" as const,
        transport: "stream" as const,
      },
      expected: {
        knownProviderFamily: "openai-family",
        endpointClass: "custom",
        isKnownNativeEndpoint: false,
        allowsOpenAIServiceTier: false,
        supportsOpenAIReasoningCompatPayload: false,
        allowsResponsesStore: false,
        supportsResponsesStoreField: true,
        shouldStripResponsesPromptCache: true,
        allowsAnthropicServiceTier: false,
        supportsNativeStreamingUsageCompat: false,
      },
    },
    {
      // Only Anthropic's first-party endpoint allows its service tier.
      name: "direct Anthropic messages",
      input: {
        provider: "anthropic",
        api: "anthropic-messages",
        baseUrl: "https://api.anthropic.com/v1",
        capability: "llm" as const,
        transport: "stream" as const,
      },
      expected: {
        knownProviderFamily: "anthropic",
        endpointClass: "anthropic-public",
        isKnownNativeEndpoint: true,
        allowsOpenAIServiceTier: false,
        supportsOpenAIReasoningCompatPayload: false,
        allowsResponsesStore: false,
        supportsResponsesStoreField: false,
        shouldStripResponsesPromptCache: false,
        allowsAnthropicServiceTier: true,
        supportsNativeStreamingUsageCompat: false,
      },
    },
    {
      name: "proxied custom anthropic api",
      input: {
        provider: "custom-anthropic",
        api: "anthropic-messages",
        baseUrl: "https://proxy.example.com/anthropic",
        capability: "llm" as const,
        transport: "stream" as const,
      },
      expected: {
        endpointClass: "custom",
        isKnownNativeEndpoint: false,
        allowsAnthropicServiceTier: false,
        supportsOpenAIReasoningCompatPayload: false,
        supportsResponsesStoreField: false,
        supportsNativeStreamingUsageCompat: false,
      },
    },
    {
      name: "native OpenRouter responses",
      input: {
        provider: "openrouter",
        api: "openai-responses",
        baseUrl: "https://openrouter.ai/api/v1",
        capability: "llm" as const,
        transport: "stream" as const,
      },
      expected: {
        knownProviderFamily: "openrouter",
        endpointClass: "openrouter",
        isKnownNativeEndpoint: true,
        allowsOpenAIServiceTier: false,
        supportsOpenAIReasoningCompatPayload: false,
        allowsResponsesStore: false,
        supportsResponsesStoreField: true,
        shouldStripResponsesPromptCache: true,
        allowsAnthropicServiceTier: false,
        supportsNativeStreamingUsageCompat: false,
      },
    },
    {
      name: "native Moonshot completions",
      input: {
        provider: "moonshot",
        api: "openai-completions",
        baseUrl: "https://api.moonshot.ai/v1",
        capability: "llm" as const,
        transport: "stream" as const,
      },
      expected: {
        knownProviderFamily: "moonshot",
        endpointClass: "moonshot-native",
        isKnownNativeEndpoint: true,
        allowsOpenAIServiceTier: false,
        supportsOpenAIReasoningCompatPayload: false,
        allowsResponsesStore: false,
        supportsResponsesStoreField: false,
        shouldStripResponsesPromptCache: false,
        allowsAnthropicServiceTier: false,
        supportsNativeStreamingUsageCompat: true,
        compatibilityFamily: "moonshot",
      },
    },
    {
      name: "native ModelStudio completions",
      input: {
        provider: "modelstudio",
        api: "openai-completions",
        baseUrl: "https://dashscope-intl.aliyuncs.com/compatible-mode/v1",
        capability: "llm" as const,
        transport: "stream" as const,
      },
      expected: {
        knownProviderFamily: "modelstudio",
        endpointClass: "modelstudio-native",
        isKnownNativeEndpoint: true,
        allowsOpenAIServiceTier: false,
        supportsOpenAIReasoningCompatPayload: false,
        allowsResponsesStore: false,
        supportsResponsesStoreField: false,
        shouldStripResponsesPromptCache: false,
        allowsAnthropicServiceTier: false,
        supportsNativeStreamingUsageCompat: true,
      },
    },
    {
      name: "native Google Gemini api",
      input: {
        provider: "google",
        api: "google-generative-ai",
        baseUrl: "https://generativelanguage.googleapis.com",
        capability: "llm" as const,
        transport: "stream" as const,
      },
      expected: {
        knownProviderFamily: "google",
        endpointClass: "google-generative-ai",
        isKnownNativeEndpoint: true,
        allowsOpenAIServiceTier: false,
        supportsOpenAIReasoningCompatPayload: false,
        allowsResponsesStore: false,
        supportsResponsesStoreField: false,
        shouldStripResponsesPromptCache: false,
        allowsAnthropicServiceTier: false,
        supportsNativeStreamingUsageCompat: false,
      },
    },
    {
      name: "native GitHub Copilot responses",
      input: {
        provider: "github-copilot",
        api: "openai-responses",
        baseUrl: "https://api.individual.githubcopilot.com",
        capability: "llm" as const,
        transport: "stream" as const,
      },
      expected: {
        knownProviderFamily: "github-copilot",
        endpointClass: "github-copilot-native",
        isKnownNativeEndpoint: true,
        allowsOpenAIServiceTier: false,
        supportsOpenAIReasoningCompatPayload: false,
        allowsResponsesStore: false,
        supportsResponsesStoreField: true,
        shouldStripResponsesPromptCache: true,
        allowsAnthropicServiceTier: false,
        supportsNativeStreamingUsageCompat: false,
      },
    },
  ];
  for (const testCase of cases) {
    expect(resolveProviderRequestCapabilities(testCase.input), testCase.name).toMatchObject(
      testCase.expected,
    );
  }
});
});

View File

@ -1,5 +1,9 @@
// Public stream-wrapper helpers for provider plugins.
export {
applyAnthropicPayloadPolicyToParams,
resolveAnthropicPayloadPolicy,
} from "../agents/anthropic-payload-policy.js";
export {
buildCopilotDynamicHeaders,
hasCopilotVisionInput,