refactor(providers): share openai stream families

Vincent Koc 2026-04-04 20:28:54 +09:00
parent 035a754f0f
commit 8f7b02e567
7 changed files with 114 additions and 80 deletions

View File

@@ -17,6 +17,7 @@ import {
normalizeProviderId,
type ProviderPlugin,
} from "openclaw/plugin-sdk/provider-model-shared";
import { buildProviderStreamFamilyHooks } from "openclaw/plugin-sdk/provider-stream";
import { fetchCodexUsage } from "openclaw/plugin-sdk/provider-usage";
import { OPENAI_CODEX_DEFAULT_MODEL } from "./default-models.js";
import { resolveCodexAuthIdentity } from "./openai-codex-auth-identity.js";
@@ -28,7 +29,6 @@ import {
isOpenAIApiBaseUrl,
matchesExactOrPrefix,
} from "./shared.js";
import { wrapOpenAICodexProviderStream } from "./stream-hooks.js";
import {
resolveOpenAITransportTurnState,
resolveOpenAIWebSocketSessionPolicy,
@@ -81,6 +81,7 @@ const OPENAI_CODEX_MODERN_MODEL_IDS = [
OPENAI_CODEX_GPT_53_MODEL_ID,
OPENAI_CODEX_GPT_53_SPARK_MODEL_ID,
] as const;
const OPENAI_RESPONSES_STREAM_HOOKS = buildProviderStreamFamilyHooks("openai-responses-defaults");
function isOpenAICodexBaseUrl(baseUrl?: string): boolean {
const trimmed = baseUrl?.trim();
@@ -316,7 +317,7 @@ export function buildOpenAICodexProviderPlugin(): ProviderPlugin {
transport: "auto",
};
},
wrapStreamFn: (ctx) => wrapOpenAICodexProviderStream(ctx),
wrapStreamFn: (ctx) => OPENAI_RESPONSES_STREAM_HOOKS.wrapStreamFn?.(ctx),
resolveTransportTurnState: (ctx) => resolveOpenAITransportTurnState(ctx),
resolveWebSocketSessionPolicy: (ctx) => resolveOpenAIWebSocketSessionPolicy(ctx),
resolveReasoningOutputMode: () => "native",

View File

@@ -9,6 +9,7 @@ import {
normalizeProviderId,
type ProviderPlugin,
} from "openclaw/plugin-sdk/provider-model-shared";
import { buildProviderStreamFamilyHooks } from "openclaw/plugin-sdk/provider-stream";
import { applyOpenAIConfig, OPENAI_DEFAULT_MODEL } from "./default-models.js";
import { buildOpenAIReplayPolicy } from "./replay-policy.js";
import {
@@ -17,7 +18,6 @@ import {
isOpenAIApiBaseUrl,
matchesExactOrPrefix,
} from "./shared.js";
import { wrapAzureOpenAIProviderStream, wrapOpenAIProviderStream } from "./stream-hooks.js";
import {
resolveOpenAITransportTurnState,
resolveOpenAIWebSocketSessionPolicy,
@@ -67,6 +67,7 @@ const OPENAI_MODERN_MODEL_IDS = [
] as const;
const OPENAI_DIRECT_SPARK_MODEL_ID = "gpt-5.3-codex-spark";
const SUPPRESSED_SPARK_PROVIDERS = new Set(["openai", "azure-openai-responses"]);
const OPENAI_RESPONSES_STREAM_HOOKS = buildProviderStreamFamilyHooks("openai-responses-defaults");
function shouldUseOpenAIResponsesTransport(params: {
provider: string;
@@ -255,10 +256,7 @@ export function buildOpenAIProvider(): ProviderPlugin {
...(hasExplicitWarmup ? {} : { openaiWsWarmup: true }),
};
},
wrapStreamFn: (ctx) =>
normalizeProviderId(ctx.provider) === PROVIDER_ID
? wrapOpenAIProviderStream(ctx)
: wrapAzureOpenAIProviderStream(ctx),
wrapStreamFn: (ctx) => OPENAI_RESPONSES_STREAM_HOOKS.wrapStreamFn?.(ctx),
matchesContextOverflowError: ({ errorMessage }) =>
/content_filter.*(?:prompt|input).*(?:too long|exceed)/i.test(errorMessage),
resolveTransportTurnState: (ctx) => resolveOpenAITransportTurnState(ctx),

View File

@@ -1,60 +0,0 @@
import type { ProviderWrapStreamFnContext } from "openclaw/plugin-sdk/plugin-entry";
import {
createCodexNativeWebSearchWrapper,
createOpenAIAttributionHeadersWrapper,
createOpenAIFastModeWrapper,
createOpenAIReasoningCompatibilityWrapper,
createOpenAIResponsesContextManagementWrapper,
createOpenAIServiceTierWrapper,
createOpenAITextVerbosityWrapper,
resolveOpenAIFastMode,
resolveOpenAIServiceTier,
resolveOpenAITextVerbosity,
} from "openclaw/plugin-sdk/provider-stream";
function applySharedOpenAIWrappers(
streamFn: ProviderWrapStreamFnContext["streamFn"],
ctx: ProviderWrapStreamFnContext,
) {
// Transport-default ownership lives in prepareExtraParams. These wrappers stay
// intentionally identical across direct OpenAI, Azure OpenAI, and Codex.
let nextStreamFn = createOpenAIAttributionHeadersWrapper(streamFn);
if (resolveOpenAIFastMode(ctx.extraParams)) {
nextStreamFn = createOpenAIFastModeWrapper(nextStreamFn);
}
const serviceTier = resolveOpenAIServiceTier(ctx.extraParams);
if (serviceTier) {
nextStreamFn = createOpenAIServiceTierWrapper(nextStreamFn, serviceTier);
}
const textVerbosity = resolveOpenAITextVerbosity(ctx.extraParams);
if (textVerbosity) {
nextStreamFn = createOpenAITextVerbosityWrapper(nextStreamFn, textVerbosity);
}
nextStreamFn = createCodexNativeWebSearchWrapper(nextStreamFn, {
config: ctx.config,
agentDir: ctx.agentDir,
});
return createOpenAIResponsesContextManagementWrapper(
createOpenAIReasoningCompatibilityWrapper(nextStreamFn),
ctx.extraParams,
);
}
/** Compose the direct OpenAI wrapper chain inside the owning provider plugin. */
export function wrapOpenAIProviderStream(ctx: ProviderWrapStreamFnContext) {
return applySharedOpenAIWrappers(ctx.streamFn, ctx);
}
/** Compose the Azure OpenAI wrapper chain without direct OpenAI transport defaults. */
export function wrapAzureOpenAIProviderStream(ctx: ProviderWrapStreamFnContext) {
return applySharedOpenAIWrappers(ctx.streamFn, ctx);
}
/** Compose the Codex-specific wrapper chain inside the owning provider plugin. */
export function wrapOpenAICodexProviderStream(ctx: ProviderWrapStreamFnContext) {
return applySharedOpenAIWrappers(ctx.streamFn, ctx);
}

View File

@@ -22,12 +22,11 @@ export default definePluginEntry({
async register(api) {
const {
buildProviderReplayFamilyHooks,
buildProviderStreamFamilyHooks,
composeProviderStreamWrappers,
createOpenRouterWrapper,
createProviderApiKeyAuthMethod,
DEFAULT_CONTEXT_TOKENS,
getOpenRouterModelCapabilities,
isProxyReasoningUnsupported,
loadOpenRouterModelCapabilities,
OPENROUTER_DEFAULT_MODEL_REF,
openrouterMediaUnderstandingProvider,
@@ -37,6 +36,7 @@ export default definePluginEntry({
const PASSTHROUGH_GEMINI_REPLAY_HOOKS = buildProviderReplayFamilyHooks({
family: "passthrough-gemini",
});
const OPENROUTER_THINKING_STREAM_HOOKS = buildProviderStreamFamilyHooks("openrouter-thinking");
function buildDynamicOpenRouterModel(
ctx: ProviderResolveDynamicModelContext,
@@ -139,16 +139,13 @@ export default definePluginEntry({
ctx.extraParams?.provider != null && typeof ctx.extraParams.provider === "object"
? (ctx.extraParams.provider as Record<string, unknown>)
: undefined;
const skipReasoningInjection =
ctx.modelId === "auto" || isProxyReasoningUnsupported(ctx.modelId);
const openRouterThinkingLevel = skipReasoningInjection ? undefined : ctx.thinkingLevel;
return composeProviderStreamWrappers(
ctx.streamFn,
providerRouting
? (streamFn) => injectOpenRouterRouting(streamFn, providerRouting)
: undefined,
(streamFn) => createOpenRouterWrapper(streamFn, openRouterThinkingLevel),
);
const routedStreamFn = providerRouting
? injectOpenRouterRouting(ctx.streamFn, providerRouting)
: ctx.streamFn;
return OPENROUTER_THINKING_STREAM_HOOKS.wrapStreamFn?.({
...ctx,
streamFn: routedStreamFn,
});
},
isCacheTtlEligible: (ctx) => isOpenRouterCacheTtlModel(ctx.modelId),
});

View File

@@ -4,7 +4,7 @@ export {
DEFAULT_CONTEXT_TOKENS,
} from "openclaw/plugin-sdk/provider-model-shared";
export {
composeProviderStreamWrappers,
buildProviderStreamFamilyHooks,
createOpenRouterSystemCacheWrapper,
createOpenRouterWrapper,
getOpenRouterModelCapabilities,

View File

@@ -41,6 +41,7 @@ describe("buildProviderStreamFamilyHooks", () => {
it("covers the stream family matrix", () => {
let capturedPayload: Record<string, unknown> | undefined;
let capturedModelId: string | undefined;
let capturedHeaders: Record<string, string> | undefined;
const baseStreamFn: StreamFn = (model, _context, options) => {
capturedModelId = String(model.id);
@@ -50,6 +51,7 @@
>;
options?.onPayload?.(payload as never, model as never);
capturedPayload = payload;
capturedHeaders = options?.headers as Record<string, string> | undefined;
return {} as never;
};
@@ -99,6 +101,49 @@
thinking: { type: "disabled" },
});
const openAiHooks = buildProviderStreamFamilyHooks("openai-responses-defaults");
openAiHooks.wrapStreamFn?.({
streamFn: baseStreamFn,
extraParams: { serviceTier: "flex" },
config: {},
agentDir: "/tmp/provider-stream-test",
} as never)(
{
api: "openai-responses",
provider: "openai",
baseUrl: "https://api.openai.com/v1",
id: "gpt-5.4",
} as never,
{} as never,
{},
);
expect(capturedPayload).toMatchObject({
config: { thinkingConfig: { thinkingBudget: -1 } },
service_tier: "flex",
});
expect(capturedHeaders).toBeDefined();
const openRouterHooks = buildProviderStreamFamilyHooks("openrouter-thinking");
openRouterHooks.wrapStreamFn?.({
streamFn: baseStreamFn,
thinkingLevel: "high",
modelId: "openai/gpt-5.4",
} as never)({ provider: "openrouter", id: "openai/gpt-5.4" } as never, {} as never, {});
expect(capturedPayload).toMatchObject({
config: { thinkingConfig: { thinkingBudget: -1 } },
reasoning: { effort: "high" },
});
openRouterHooks.wrapStreamFn?.({
streamFn: baseStreamFn,
thinkingLevel: "high",
modelId: "x-ai/grok-3",
} as never)({ provider: "openrouter", id: "x-ai/grok-3" } as never, {} as never, {});
expect(capturedPayload).toMatchObject({
config: { thinkingConfig: { thinkingBudget: -1 } },
});
expect(capturedPayload).not.toHaveProperty("reasoning");
const toolStreamHooks = buildProviderStreamFamilyHooks("tool-stream-default-on");
toolStreamHooks.wrapStreamFn?.({
streamFn: baseStreamFn,

View File

@@ -16,6 +16,18 @@ import {
createOpenRouterWrapper,
isProxyReasoningUnsupported,
} from "../agents/pi-embedded-runner/proxy-stream-wrappers.js";
import {
createCodexNativeWebSearchWrapper,
createOpenAIAttributionHeadersWrapper,
createOpenAIFastModeWrapper,
createOpenAIReasoningCompatibilityWrapper,
createOpenAIResponsesContextManagementWrapper,
createOpenAIServiceTierWrapper,
createOpenAITextVerbosityWrapper,
resolveOpenAIFastMode,
resolveOpenAIServiceTier,
resolveOpenAITextVerbosity,
} from "../agents/pi-embedded-runner/openai-stream-wrappers.js";
import { createToolStreamWrapper, createZaiToolStreamWrapper } from "../agents/pi-embedded-runner/zai-stream-wrappers.js";
export type ProviderStreamWrapperFactory =
@@ -38,6 +50,8 @@ export type ProviderStreamFamily =
| "google-thinking"
| "moonshot-thinking"
| "minimax-fast-mode"
| "openai-responses-defaults"
| "openrouter-thinking"
| "tool-stream-default-on";
type ProviderStreamFamilyHooks = Pick<ProviderPlugin, "wrapStreamFn">;
@@ -66,6 +80,45 @@ export function buildProviderStreamFamilyHooks(
wrapStreamFn: (ctx: ProviderWrapStreamFnContext) =>
createMinimaxFastModeWrapper(ctx.streamFn, ctx.extraParams?.fastMode === true),
};
case "openai-responses-defaults":
return {
wrapStreamFn: (ctx: ProviderWrapStreamFnContext) => {
let nextStreamFn = createOpenAIAttributionHeadersWrapper(ctx.streamFn);
if (resolveOpenAIFastMode(ctx.extraParams)) {
nextStreamFn = createOpenAIFastModeWrapper(nextStreamFn);
}
const serviceTier = resolveOpenAIServiceTier(ctx.extraParams);
if (serviceTier) {
nextStreamFn = createOpenAIServiceTierWrapper(nextStreamFn, serviceTier);
}
const textVerbosity = resolveOpenAITextVerbosity(ctx.extraParams);
if (textVerbosity) {
nextStreamFn = createOpenAITextVerbosityWrapper(nextStreamFn, textVerbosity);
}
nextStreamFn = createCodexNativeWebSearchWrapper(nextStreamFn, {
config: ctx.config,
agentDir: ctx.agentDir,
});
return createOpenAIResponsesContextManagementWrapper(
createOpenAIReasoningCompatibilityWrapper(nextStreamFn),
ctx.extraParams,
);
},
};
case "openrouter-thinking":
return {
wrapStreamFn: (ctx: ProviderWrapStreamFnContext) => {
const thinkingLevel =
ctx.modelId === "auto" || isProxyReasoningUnsupported(ctx.modelId)
? undefined
: ctx.thinkingLevel;
return createOpenRouterWrapper(ctx.streamFn, thinkingLevel);
},
};
case "tool-stream-default-on":
return {
wrapStreamFn: (ctx: ProviderWrapStreamFnContext) =>