From 8f7b02e5670d109785fefe6b6f9920364cd9f3ea Mon Sep 17 00:00:00 2001 From: Vincent Koc Date: Sat, 4 Apr 2026 20:28:54 +0900 Subject: [PATCH] refactor(providers): share openai stream families --- extensions/openai/openai-codex-provider.ts | 5 +- extensions/openai/openai-provider.ts | 8 ++- extensions/openai/stream-hooks.ts | 60 ---------------------- extensions/openrouter/index.ts | 21 ++++---- extensions/openrouter/register.runtime.ts | 2 +- src/plugin-sdk/provider-stream.test.ts | 45 ++++++++++++++++ src/plugin-sdk/provider-stream.ts | 53 +++++++++++++++++++ 7 files changed, 114 insertions(+), 80 deletions(-) delete mode 100644 extensions/openai/stream-hooks.ts diff --git a/extensions/openai/openai-codex-provider.ts b/extensions/openai/openai-codex-provider.ts index 10b95ddf14b..8d86e436fd4 100644 --- a/extensions/openai/openai-codex-provider.ts +++ b/extensions/openai/openai-codex-provider.ts @@ -17,6 +17,7 @@ import { normalizeProviderId, type ProviderPlugin, } from "openclaw/plugin-sdk/provider-model-shared"; +import { buildProviderStreamFamilyHooks } from "openclaw/plugin-sdk/provider-stream"; import { fetchCodexUsage } from "openclaw/plugin-sdk/provider-usage"; import { OPENAI_CODEX_DEFAULT_MODEL } from "./default-models.js"; import { resolveCodexAuthIdentity } from "./openai-codex-auth-identity.js"; @@ -28,7 +29,6 @@ import { isOpenAIApiBaseUrl, matchesExactOrPrefix, } from "./shared.js"; -import { wrapOpenAICodexProviderStream } from "./stream-hooks.js"; import { resolveOpenAITransportTurnState, resolveOpenAIWebSocketSessionPolicy, @@ -81,6 +81,7 @@ const OPENAI_CODEX_MODERN_MODEL_IDS = [ OPENAI_CODEX_GPT_53_MODEL_ID, OPENAI_CODEX_GPT_53_SPARK_MODEL_ID, ] as const; +const OPENAI_RESPONSES_STREAM_HOOKS = buildProviderStreamFamilyHooks("openai-responses-defaults"); function isOpenAICodexBaseUrl(baseUrl?: string): boolean { const trimmed = baseUrl?.trim(); @@ -316,7 +317,7 @@ export function buildOpenAICodexProviderPlugin(): ProviderPlugin { transport: "auto", }; }, - wrapStreamFn: (ctx) => wrapOpenAICodexProviderStream(ctx), + wrapStreamFn: (ctx) => OPENAI_RESPONSES_STREAM_HOOKS.wrapStreamFn?.(ctx), resolveTransportTurnState: (ctx) => resolveOpenAITransportTurnState(ctx), resolveWebSocketSessionPolicy: (ctx) => resolveOpenAIWebSocketSessionPolicy(ctx), resolveReasoningOutputMode: () => "native", diff --git a/extensions/openai/openai-provider.ts b/extensions/openai/openai-provider.ts index 7f4af5f87e4..4c7f093449c 100644 --- a/extensions/openai/openai-provider.ts +++ b/extensions/openai/openai-provider.ts @@ -9,6 +9,7 @@ import { normalizeProviderId, type ProviderPlugin, } from "openclaw/plugin-sdk/provider-model-shared"; +import { buildProviderStreamFamilyHooks } from "openclaw/plugin-sdk/provider-stream"; import { applyOpenAIConfig, OPENAI_DEFAULT_MODEL } from "./default-models.js"; import { buildOpenAIReplayPolicy } from "./replay-policy.js"; import { @@ -17,7 +18,6 @@ import { isOpenAIApiBaseUrl, matchesExactOrPrefix, } from "./shared.js"; -import { wrapAzureOpenAIProviderStream, wrapOpenAIProviderStream } from "./stream-hooks.js"; import { resolveOpenAITransportTurnState, resolveOpenAIWebSocketSessionPolicy, @@ -67,6 +67,7 @@ const OPENAI_MODERN_MODEL_IDS = [ ] as const; const OPENAI_DIRECT_SPARK_MODEL_ID = "gpt-5.3-codex-spark"; const SUPPRESSED_SPARK_PROVIDERS = new Set(["openai", "azure-openai-responses"]); +const OPENAI_RESPONSES_STREAM_HOOKS = buildProviderStreamFamilyHooks("openai-responses-defaults"); function shouldUseOpenAIResponsesTransport(params: { provider: string; @@ -255,10 +256,7 @@ export function buildOpenAIProvider(): ProviderPlugin { ...(hasExplicitWarmup ? {} : { openaiWsWarmup: true }), }; }, - wrapStreamFn: (ctx) => - normalizeProviderId(ctx.provider) === PROVIDER_ID - ? wrapOpenAIProviderStream(ctx) - : wrapAzureOpenAIProviderStream(ctx), + wrapStreamFn: (ctx) => OPENAI_RESPONSES_STREAM_HOOKS.wrapStreamFn?.(ctx), matchesContextOverflowError: ({ errorMessage }) => /content_filter.*(?:prompt|input).*(?:too long|exceed)/i.test(errorMessage), resolveTransportTurnState: (ctx) => resolveOpenAITransportTurnState(ctx), diff --git a/extensions/openai/stream-hooks.ts b/extensions/openai/stream-hooks.ts deleted file mode 100644 index 4b542524cf1..00000000000 --- a/extensions/openai/stream-hooks.ts +++ /dev/null @@ -1,60 +0,0 @@ -import type { ProviderWrapStreamFnContext } from "openclaw/plugin-sdk/plugin-entry"; -import { - createCodexNativeWebSearchWrapper, - createOpenAIAttributionHeadersWrapper, - createOpenAIFastModeWrapper, - createOpenAIReasoningCompatibilityWrapper, - createOpenAIResponsesContextManagementWrapper, - createOpenAIServiceTierWrapper, - createOpenAITextVerbosityWrapper, - resolveOpenAIFastMode, - resolveOpenAIServiceTier, - resolveOpenAITextVerbosity, -} from "openclaw/plugin-sdk/provider-stream"; - -function applySharedOpenAIWrappers( - streamFn: ProviderWrapStreamFnContext["streamFn"], - ctx: ProviderWrapStreamFnContext, -) { - // Transport-default ownership lives in prepareExtraParams. These wrappers stay - // intentionally identical across direct OpenAI, Azure OpenAI, and Codex. - let nextStreamFn = createOpenAIAttributionHeadersWrapper(streamFn); - - if (resolveOpenAIFastMode(ctx.extraParams)) { - nextStreamFn = createOpenAIFastModeWrapper(nextStreamFn); - } - - const serviceTier = resolveOpenAIServiceTier(ctx.extraParams); - if (serviceTier) { - nextStreamFn = createOpenAIServiceTierWrapper(nextStreamFn, serviceTier); - } - - const textVerbosity = resolveOpenAITextVerbosity(ctx.extraParams); - if (textVerbosity) { - nextStreamFn = createOpenAITextVerbosityWrapper(nextStreamFn, textVerbosity); - } - - nextStreamFn = createCodexNativeWebSearchWrapper(nextStreamFn, { - config: ctx.config, - agentDir: ctx.agentDir, - }); - return createOpenAIResponsesContextManagementWrapper( - createOpenAIReasoningCompatibilityWrapper(nextStreamFn), - ctx.extraParams, - ); -} - -/** Compose the direct OpenAI wrapper chain inside the owning provider plugin. */ -export function wrapOpenAIProviderStream(ctx: ProviderWrapStreamFnContext) { - return applySharedOpenAIWrappers(ctx.streamFn, ctx); -} - -/** Compose the Azure OpenAI wrapper chain without direct OpenAI transport defaults. */ -export function wrapAzureOpenAIProviderStream(ctx: ProviderWrapStreamFnContext) { - return applySharedOpenAIWrappers(ctx.streamFn, ctx); -} - -/** Compose the Codex-specific wrapper chain inside the owning provider plugin. */ -export function wrapOpenAICodexProviderStream(ctx: ProviderWrapStreamFnContext) { - return applySharedOpenAIWrappers(ctx.streamFn, ctx); -} diff --git a/extensions/openrouter/index.ts b/extensions/openrouter/index.ts index 7da3878fb65..741df6e94bd 100644 --- a/extensions/openrouter/index.ts +++ b/extensions/openrouter/index.ts @@ -22,12 +22,11 @@ export default definePluginEntry({ async register(api) { const { buildProviderReplayFamilyHooks, + buildProviderStreamFamilyHooks, composeProviderStreamWrappers, - createOpenRouterWrapper, createProviderApiKeyAuthMethod, DEFAULT_CONTEXT_TOKENS, getOpenRouterModelCapabilities, - isProxyReasoningUnsupported, loadOpenRouterModelCapabilities, OPENROUTER_DEFAULT_MODEL_REF, openrouterMediaUnderstandingProvider, @@ -37,6 +36,7 @@ export default definePluginEntry({ const PASSTHROUGH_GEMINI_REPLAY_HOOKS = buildProviderReplayFamilyHooks({ family: "passthrough-gemini", }); + const OPENROUTER_THINKING_STREAM_HOOKS = buildProviderStreamFamilyHooks("openrouter-thinking"); function buildDynamicOpenRouterModel( ctx: ProviderResolveDynamicModelContext, @@ -139,16 +139,13 @@ export default definePluginEntry({ ctx.extraParams?.provider != null && typeof ctx.extraParams.provider === "object" ? (ctx.extraParams.provider as Record) : undefined; - const skipReasoningInjection = - ctx.modelId === "auto" || isProxyReasoningUnsupported(ctx.modelId); - const openRouterThinkingLevel = skipReasoningInjection ? undefined : ctx.thinkingLevel; - return composeProviderStreamWrappers( - ctx.streamFn, - providerRouting - ? (streamFn) => injectOpenRouterRouting(streamFn, providerRouting) - : undefined, - (streamFn) => createOpenRouterWrapper(streamFn, openRouterThinkingLevel), - ); + const routedStreamFn = providerRouting + ? injectOpenRouterRouting(ctx.streamFn, providerRouting) + : ctx.streamFn; + return OPENROUTER_THINKING_STREAM_HOOKS.wrapStreamFn?.({ + ...ctx, + streamFn: routedStreamFn, + }); }, isCacheTtlEligible: (ctx) => isOpenRouterCacheTtlModel(ctx.modelId), }); diff --git a/extensions/openrouter/register.runtime.ts b/extensions/openrouter/register.runtime.ts index feb5ed31deb..d5a6298743c 100644 --- a/extensions/openrouter/register.runtime.ts +++ b/extensions/openrouter/register.runtime.ts @@ -4,7 +4,7 @@ export { DEFAULT_CONTEXT_TOKENS, } from "openclaw/plugin-sdk/provider-model-shared"; export { - composeProviderStreamWrappers, + buildProviderStreamFamilyHooks, createOpenRouterSystemCacheWrapper, createOpenRouterWrapper, getOpenRouterModelCapabilities, diff --git a/src/plugin-sdk/provider-stream.test.ts b/src/plugin-sdk/provider-stream.test.ts index 7df6d1257df..47765a1facc 100644 --- a/src/plugin-sdk/provider-stream.test.ts +++ b/src/plugin-sdk/provider-stream.test.ts @@ -41,6 +41,7 @@ describe("buildProviderStreamFamilyHooks", () => { it("covers the stream family matrix", () => { let capturedPayload: Record | undefined; let capturedModelId: string | undefined; + let capturedHeaders: Record | undefined; const baseStreamFn: StreamFn = (model, _context, options) => { capturedModelId = String(model.id); @@ -50,6 +51,7 @@ describe("buildProviderStreamFamilyHooks", () => { >; options?.onPayload?.(payload as never, model as never); capturedPayload = payload; + capturedHeaders = options?.headers as Record | undefined; return {} as never; }; @@ -99,6 +101,49 @@ describe("buildProviderStreamFamilyHooks", () => { thinking: { type: "disabled" }, }); + const openAiHooks = buildProviderStreamFamilyHooks("openai-responses-defaults"); + openAiHooks.wrapStreamFn?.({ + streamFn: baseStreamFn, + extraParams: { serviceTier: "flex" }, + config: {}, + agentDir: "/tmp/provider-stream-test", + } as never)( + { + api: "openai-responses", + provider: "openai", + baseUrl: "https://api.openai.com/v1", + id: "gpt-5.4", + } as never, + {} as never, + {}, + ); + expect(capturedPayload).toMatchObject({ + config: { thinkingConfig: { thinkingBudget: -1 } }, + service_tier: "flex", + }); + expect(capturedHeaders).toBeDefined(); + + const openRouterHooks = buildProviderStreamFamilyHooks("openrouter-thinking"); + openRouterHooks.wrapStreamFn?.({ + streamFn: baseStreamFn, + thinkingLevel: "high", + modelId: "openai/gpt-5.4", + } as never)({ provider: "openrouter", id: "openai/gpt-5.4" } as never, {} as never, {}); + expect(capturedPayload).toMatchObject({ + config: { thinkingConfig: { thinkingBudget: -1 } }, + reasoning: { effort: "high" }, + }); + + openRouterHooks.wrapStreamFn?.({ + streamFn: baseStreamFn, + thinkingLevel: "high", + modelId: "x-ai/grok-3", + } as never)({ provider: "openrouter", id: "x-ai/grok-3" } as never, {} as never, {}); + expect(capturedPayload).toMatchObject({ + config: { thinkingConfig: { thinkingBudget: -1 } }, + }); + expect(capturedPayload).not.toHaveProperty("reasoning"); + const toolStreamHooks = buildProviderStreamFamilyHooks("tool-stream-default-on"); toolStreamHooks.wrapStreamFn?.({ streamFn: baseStreamFn, diff --git a/src/plugin-sdk/provider-stream.ts b/src/plugin-sdk/provider-stream.ts index d53a2067919..5a1a8eadbaa 100644 --- a/src/plugin-sdk/provider-stream.ts +++ b/src/plugin-sdk/provider-stream.ts @@ -16,6 +16,18 @@ import { createOpenRouterWrapper, isProxyReasoningUnsupported, } from "../agents/pi-embedded-runner/proxy-stream-wrappers.js"; +import { + createCodexNativeWebSearchWrapper, + createOpenAIAttributionHeadersWrapper, + createOpenAIFastModeWrapper, + createOpenAIReasoningCompatibilityWrapper, + createOpenAIResponsesContextManagementWrapper, + createOpenAIServiceTierWrapper, + createOpenAITextVerbosityWrapper, + resolveOpenAIFastMode, + resolveOpenAIServiceTier, + resolveOpenAITextVerbosity, +} from "../agents/pi-embedded-runner/openai-stream-wrappers.js"; import { createToolStreamWrapper, createZaiToolStreamWrapper } from "../agents/pi-embedded-runner/zai-stream-wrappers.js"; export type ProviderStreamWrapperFactory = @@ -38,6 +50,8 @@ export type ProviderStreamFamily = | "google-thinking" | "moonshot-thinking" | "minimax-fast-mode" + | "openai-responses-defaults" + | "openrouter-thinking" | "tool-stream-default-on"; type ProviderStreamFamilyHooks = Pick; @@ -66,6 +80,45 @@ export function buildProviderStreamFamilyHooks( wrapStreamFn: (ctx: ProviderWrapStreamFnContext) => createMinimaxFastModeWrapper(ctx.streamFn, ctx.extraParams?.fastMode === true), }; + case "openai-responses-defaults": + return { + wrapStreamFn: (ctx: ProviderWrapStreamFnContext) => { + let nextStreamFn = createOpenAIAttributionHeadersWrapper(ctx.streamFn); + + if (resolveOpenAIFastMode(ctx.extraParams)) { + nextStreamFn = createOpenAIFastModeWrapper(nextStreamFn); + } + + const serviceTier = resolveOpenAIServiceTier(ctx.extraParams); + if (serviceTier) { + nextStreamFn = createOpenAIServiceTierWrapper(nextStreamFn, serviceTier); + } + + const textVerbosity = resolveOpenAITextVerbosity(ctx.extraParams); + if (textVerbosity) { + nextStreamFn = createOpenAITextVerbosityWrapper(nextStreamFn, textVerbosity); + } + + nextStreamFn = createCodexNativeWebSearchWrapper(nextStreamFn, { + config: ctx.config, + agentDir: ctx.agentDir, + }); + return createOpenAIResponsesContextManagementWrapper( + createOpenAIReasoningCompatibilityWrapper(nextStreamFn), + ctx.extraParams, + ); + }, + }; + case "openrouter-thinking": + return { + wrapStreamFn: (ctx: ProviderWrapStreamFnContext) => { + const thinkingLevel = + ctx.modelId === "auto" || isProxyReasoningUnsupported(ctx.modelId) + ? undefined + : ctx.thinkingLevel; + return createOpenRouterWrapper(ctx.streamFn, thinkingLevel); + }, + }; case "tool-stream-default-on": return { wrapStreamFn: (ctx: ProviderWrapStreamFnContext) =>