From b76ed0fadfd28183084f92d239647c5cf4b56c22 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sat, 4 Apr 2026 02:11:22 +0100 Subject: [PATCH] fix: harden OpenAI websocket transport --- CHANGELOG.md | 2 +- docs/providers/openai.md | 4 + src/agents/openai-ws-connection.test.ts | 31 ++ src/agents/openai-ws-connection.ts | 64 ++- src/agents/openai-ws-request.ts | 113 +++++ src/agents/openai-ws-stream.test.ts | 71 ++- src/agents/openai-ws-stream.ts | 643 ++++++++++++------------ 7 files changed, 600 insertions(+), 328 deletions(-) create mode 100644 src/agents/openai-ws-request.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 110e55f47d2..d02ae024349 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,7 +22,7 @@ Docs: https://docs.openclaw.ai ### Fixes -- Providers/OpenAI: preserve native `reasoning.effort: "none"` and strict tool schemas on direct OpenAI-family endpoints, keep OpenAI-compatible proxies on the older compat shim path, and enable OpenAI WebSocket warm-up by default for native Responses routes. +- Providers/OpenAI: preserve native `reasoning.effort: "none"` and strict tool schemas on direct OpenAI-family endpoints, keep OpenAI-compatible proxies on the older compat shim path, fix Responses WebSocket warm-up payloads, and retry one early retryable WebSocket failure before HTTP fallback while keeping forced WebSocket errors explicit. - Providers/OpenAI Codex: split native `contextWindow` from runtime `contextTokens` for `openai-codex/gpt-5.4`, keep the default effective cap at `272000`, and expose a per-model config override via `models.providers.*.models[].contextTokens`. - Skills/uv install: block workspace `.env` from overriding `UV_PYTHON` and strip related interpreter override keys from uv skill-install subprocesses so repository-controlled env files cannot steer the selected Python runtime. (#59178) Thanks @pgondhi987. - Telegram/reactions: preserve `reactionNotifications: "own"` across gateway restarts by persisting sent-message ownership state instead of treating cold cache as a permissive fallback. (#59207) Thanks @samzong. diff --git a/docs/providers/openai.md b/docs/providers/openai.md index 63c6690f2cc..35b1cea7f1c 100644 --- a/docs/providers/openai.md +++ b/docs/providers/openai.md @@ -184,6 +184,10 @@ OpenClaw uses `pi-ai` for model streaming. For both `openai/*` and `openai-codex/*`, default transport is `"auto"` (WebSocket-first, then SSE fallback). +In `"auto"` mode, OpenClaw also retries one early, retryable WebSocket failure +before it falls back to SSE. Forced `"websocket"` mode still surfaces transport +errors directly instead of hiding them behind fallback. 
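A minimal TypeScript sketch of that decision, for reference. The close-code list and the one-retry budget mirror `isRetryableWebSocketClose` and `MAX_AUTO_WS_RUNTIME_RETRIES` elsewhere in this patch; the function and parameter names here are illustrative only, not the shipped implementation:

```ts
// Sketch only: illustrative names, grounded in the retry rules this patch adds.
type Transport = "sse" | "websocket" | "auto";

// Close codes the patch treats as retryable (see isRetryableWebSocketClose).
const RETRYABLE_CLOSE_CODES = new Set([1001, 1005, 1006, 1011, 1012, 1013]);

function nextStepAfterWsFailure(params: {
  transport: Transport;
  closeCode: number;
  retriesUsed: number; // WebSocket retries already spent on this turn
  sawOutput: boolean; // did any streamed output arrive before the failure?
}): "retry-websocket" | "fallback-to-sse" | "surface-error" {
  if (params.transport === "websocket") {
    // Forced WebSocket mode surfaces transport errors instead of hiding them.
    return "surface-error";
  }
  const retryable = RETRYABLE_CLOSE_CODES.has(params.closeCode);
  if (retryable && !params.sawOutput && params.retriesUsed < 1) {
    return "retry-websocket";
  }
  // The shipped path only falls back when no output was streamed before the failure.
  return params.sawOutput ? "surface-error" : "fallback-to-sse";
}

// Example: a 1006 drop before any output retries once, then falls back to SSE.
nextStepAfterWsFailure({ transport: "auto", closeCode: 1006, retriesUsed: 0, sawOutput: false });
```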
+ You can set `agents.defaults.models..params.transport`: - `"sse"`: force SSE diff --git a/src/agents/openai-ws-connection.test.ts b/src/agents/openai-ws-connection.test.ts index 7bc143942db..e698f3173c1 100644 --- a/src/agents/openai-ws-connection.test.ts +++ b/src/agents/openai-ws-connection.test.ts @@ -290,8 +290,10 @@ describe("OpenAIWebSocketManager", () => { it("resolves when the connection opens", async () => { const manager = buildManager(); const connectPromise = manager.connect("sk-test"); + expect(manager.connectionState).toBe("connecting"); lastSocket().simulateOpen(); await expect(connectPromise).resolves.toBeUndefined(); + expect(manager.connectionState).toBe("open"); }); it("rejects when the initial connection fails (maxRetries=0)", async () => { @@ -516,6 +518,7 @@ describe("OpenAIWebSocketManager", () => { it("is safe to call before connect()", () => { const manager = buildManager(); expect(() => manager.close()).not.toThrow(); + expect(manager.connectionState).toBe("closed"); }); }); @@ -533,6 +536,12 @@ describe("OpenAIWebSocketManager", () => { // Simulate a network drop sock1.simulateClose(1006, "Network error"); + expect(manager.connectionState).toBe("reconnecting"); + expect(manager.lastCloseInfo).toEqual({ + code: 1006, + reason: "Network error", + retryable: true, + }); // Advance time to trigger first retry (10ms delay) await vi.advanceTimersByTimeAsync(15); @@ -542,6 +551,27 @@ describe("OpenAIWebSocketManager", () => { expect(lastSocket()).not.toBe(sock1); }); + it("does not reconnect on non-retryable close codes", async () => { + const manager = buildManager({ backoffDelaysMs: [10, 20] }); + const p = manager.connect("sk-test"); + lastSocket().simulateOpen(); + await p; + + const sock = lastSocket(); + const instancesBefore = MockWebSocket.instances.length; + sock.simulateClose(1008, "policy violation"); + + await vi.advanceTimersByTimeAsync(25); + + expect(MockWebSocket.instances.length).toBe(instancesBefore); + expect(manager.connectionState).toBe("closed"); + expect(manager.lastCloseInfo).toEqual({ + code: 1008, + reason: "policy violation", + retryable: false, + }); + }); + it("stops retrying after maxRetries", async () => { const manager = buildManager({ maxRetries: 2, backoffDelaysMs: [5, 5] }); const p = manager.connect("sk-test"); @@ -642,6 +672,7 @@ describe("OpenAIWebSocketManager", () => { expect(sent["type"]).toBe("response.create"); expect(sent["generate"]).toBe(false); expect(sent["model"]).toBe("gpt-5.2"); + expect(sent["input"]).toEqual([]); expect(sent["instructions"]).toBe("You are helpful."); }); diff --git a/src/agents/openai-ws-connection.ts b/src/agents/openai-ws-connection.ts index 52d6f8623ff..6c982da8d42 100644 --- a/src/agents/openai-ws-connection.ts +++ b/src/agents/openai-ws-connection.ts @@ -15,6 +15,7 @@ import { EventEmitter } from "node:events"; import WebSocket, { type ClientOptions } from "ws"; +import { buildOpenAIWebSocketWarmUpPayload } from "./openai-ws-request.js"; import { buildProviderRequestTlsClientOptions, resolveProviderRequestPolicyConfig, @@ -284,6 +285,19 @@ export interface OpenAIWebSocketManagerOptions { request?: ProviderRequestTransportOverrides; } +export type OpenAIWebSocketConnectionState = + | "idle" + | "connecting" + | "open" + | "reconnecting" + | "closed"; + +export interface OpenAIWebSocketCloseInfo { + code: number; + reason: string; + retryable: boolean; +} + type InternalEvents = { message: [event: OpenAIWebSocketEvent]; open: []; @@ -317,6 +331,8 @@ export class OpenAIWebSocketManager extends 
EventEmitter { /** The ID of the most recent completed response on this connection. */ private _previousResponseId: string | null = null; + private _connectionState: OpenAIWebSocketConnectionState = "idle"; + private _lastCloseInfo: OpenAIWebSocketCloseInfo | null = null; private readonly wsUrl: string; private readonly maxRetries: number; @@ -344,6 +360,14 @@ export class OpenAIWebSocketManager extends EventEmitter { return this._previousResponseId; } + get connectionState(): OpenAIWebSocketConnectionState { + return this._connectionState; + } + + get lastCloseInfo(): OpenAIWebSocketCloseInfo | null { + return this._lastCloseInfo; + } + /** * Opens a WebSocket connection to the OpenAI Responses API. * Resolves when the connection is established (open event fires). @@ -353,6 +377,8 @@ export class OpenAIWebSocketManager extends EventEmitter { this.apiKey = apiKey; this.closed = false; this.retryCount = 0; + this._connectionState = "connecting"; + this._lastCloseInfo = null; return this._openConnection(); } @@ -392,6 +418,7 @@ export class OpenAIWebSocketManager extends EventEmitter { */ close(): void { this.closed = true; + this._connectionState = "closed"; this._cancelRetryTimer(); if (this.ws) { this.ws.removeAllListeners(); @@ -440,6 +467,8 @@ export class OpenAIWebSocketManager extends EventEmitter { const onOpen = () => { this.retryCount = 0; + this._connectionState = "open"; + this._lastCloseInfo = null; resolve(); this.emit("open"); }; @@ -454,15 +483,26 @@ export class OpenAIWebSocketManager extends EventEmitter { if (this.listenerCount("error") > 0) { this.emit("error", err); } + if (this._connectionState === "connecting" || this._connectionState === "reconnecting") { + this._connectionState = "closed"; + } reject(err); }; const onClose = (code: number, reason: Buffer) => { const reasonStr = reason.toString(); + const closeInfo = { + code, + reason: reasonStr, + retryable: isRetryableWebSocketClose(code), + } satisfies OpenAIWebSocketCloseInfo; + this._lastCloseInfo = closeInfo; this.emit("close", code, reasonStr); - if (!this.closed) { + if (!this.closed && closeInfo.retryable) { this._scheduleReconnect(); + } else { + this._connectionState = "closed"; } }; @@ -482,6 +522,7 @@ export class OpenAIWebSocketManager extends EventEmitter { return; } if (this.retryCount >= this.maxRetries) { + this._connectionState = "closed"; this._safeEmitError( new Error(`OpenAIWebSocketManager: max reconnect retries (${this.maxRetries}) exceeded.`), ); @@ -491,6 +532,7 @@ export class OpenAIWebSocketManager extends EventEmitter { const delayMs = this.backoffDelaysMs[Math.min(this.retryCount, this.backoffDelaysMs.length - 1)] ?? 1000; this.retryCount++; + this._connectionState = "reconnecting"; this.retryTimer = setTimeout(() => { if (this.closed) { @@ -566,17 +608,10 @@ export class OpenAIWebSocketManager extends EventEmitter { * Pass tools/instructions to prime the connection for the upcoming session. */ warmUp(params: { model: string; tools?: FunctionToolDefinition[]; instructions?: string }): void { - const event: WarmUpEvent = { - type: "response.create", - generate: false, - model: params.model, - ...(params.tools ? { tools: params.tools } : {}), - ...(params.instructions ? 
{ instructions: params.instructions } : {}), - }; + const event = buildOpenAIWebSocketWarmUpPayload(params); this.send(event); } } - export function getOpenAIWebSocketErrorDetails(event: ErrorEvent): { status?: number; type?: string; @@ -592,3 +627,14 @@ export function getOpenAIWebSocketErrorDetails(event: ErrorEvent): { param: event.error?.param ?? event.param, }; } + +function isRetryableWebSocketClose(code: number): boolean { + return ( + code === 1001 || + code === 1005 || + code === 1006 || + code === 1011 || + code === 1012 || + code === 1013 + ); +} diff --git a/src/agents/openai-ws-request.ts b/src/agents/openai-ws-request.ts new file mode 100644 index 00000000000..fb3fb0e2816 --- /dev/null +++ b/src/agents/openai-ws-request.ts @@ -0,0 +1,113 @@ +import type { StreamFn } from "@mariozechner/pi-agent-core"; +import type { + FunctionToolDefinition, + InputItem, + ResponseCreateEvent, + WarmUpEvent, +} from "./openai-ws-connection.js"; +import { resolveOpenAITextVerbosity } from "./pi-embedded-runner/openai-stream-wrappers.js"; +import { resolveProviderRequestPolicyConfig } from "./provider-request-config.js"; + +type WsModel = Parameters[0]; +type WsContext = Parameters[1]; +type WsOptions = Parameters[2] & { + temperature?: number; + maxTokens?: number; + topP?: number; + toolChoice?: unknown; + textVerbosity?: string; + text_verbosity?: string; + reasoningEffort?: string; + reasoningSummary?: string; +}; + +export interface PlannedWsTurnInput { + inputItems: InputItem[]; + previousResponseId?: string; +} + +export function buildOpenAIWebSocketWarmUpPayload(params: { + model: string; + tools?: FunctionToolDefinition[]; + instructions?: string; +}): WarmUpEvent { + return { + type: "response.create", + generate: false, + model: params.model, + input: [], + ...(params.tools?.length ? { tools: params.tools } : {}), + ...(params.instructions ? { instructions: params.instructions } : {}), + }; +} + +export function buildOpenAIWebSocketResponseCreatePayload(params: { + model: WsModel; + context: WsContext; + options?: WsOptions; + turnInput: PlannedWsTurnInput; + tools: FunctionToolDefinition[]; +}): ResponseCreateEvent { + const extraParams: Record = {}; + const streamOpts = params.options; + + if (streamOpts?.temperature !== undefined) { + extraParams.temperature = streamOpts.temperature; + } + if (streamOpts?.maxTokens !== undefined) { + extraParams.max_output_tokens = streamOpts.maxTokens; + } + if (streamOpts?.topP !== undefined) { + extraParams.top_p = streamOpts.topP; + } + if (streamOpts?.toolChoice !== undefined) { + extraParams.tool_choice = streamOpts.toolChoice; + } + + if ( + streamOpts?.reasoningEffort !== "none" && + (streamOpts?.reasoningEffort || streamOpts?.reasoningSummary) + ) { + const reasoning: { effort?: string; summary?: string } = {}; + if (streamOpts.reasoningEffort !== undefined) { + reasoning.effort = streamOpts.reasoningEffort; + } + if (streamOpts.reasoningSummary !== undefined) { + reasoning.summary = streamOpts.reasoningSummary; + } + extraParams.reasoning = reasoning; + } + + const textVerbosity = resolveOpenAITextVerbosity( + streamOpts as Record | undefined, + ); + if (textVerbosity !== undefined) { + const existingText = + extraParams.text && typeof extraParams.text === "object" + ? (extraParams.text as Record) + : {}; + extraParams.text = { ...existingText, verbosity: textVerbosity }; + } + + const supportsResponsesStoreField = resolveProviderRequestPolicyConfig({ + provider: typeof params.model.provider === "string" ? 
params.model.provider : undefined, + api: typeof params.model.api === "string" ? params.model.api : undefined, + baseUrl: typeof params.model.baseUrl === "string" ? params.model.baseUrl : undefined, + compat: (params.model as { compat?: { supportsStore?: boolean } }).compat, + capability: "llm", + transport: "websocket", + }).capabilities.supportsResponsesStoreField; + + return { + type: "response.create", + model: params.model.id, + ...(supportsResponsesStoreField ? { store: false } : {}), + input: params.turnInput.inputItems, + instructions: params.context.systemPrompt ?? undefined, + tools: params.tools.length > 0 ? params.tools : undefined, + ...(params.turnInput.previousResponseId + ? { previous_response_id: params.turnInput.previousResponseId } + : {}), + ...extraParams, + }; +} diff --git a/src/agents/openai-ws-stream.test.ts b/src/agents/openai-ws-stream.test.ts index cfe25254ea8..a55b753a6fd 100644 --- a/src/agents/openai-ws-stream.test.ts +++ b/src/agents/openai-ws-stream.test.ts @@ -35,12 +35,14 @@ const { MockManager } = vi.hoisted(() => { // Shared mutable flag so inner class can see it let _globalConnectShouldFail = false; + let _globalSendFailuresRemaining = 0; class MockManager extends EventEmitter { private _listeners: AnyFn[] = []; private _previousResponseId: string | null = null; private _connected = false; private _broken = false; + private _lastCloseInfo: { code: number; reason: string; retryable: boolean } | null = null; sentEvents: unknown[] = []; connectCallCount = 0; @@ -54,6 +56,10 @@ const { MockManager } = vi.hoisted(() => { return this._previousResponseId; } + get lastCloseInfo(): { code: number; reason: string; retryable: boolean } | null { + return this._lastCloseInfo; + } + async connect(_apiKey: string): Promise { this.connectCallCount++; if (this.connectShouldFail || _globalConnectShouldFail) { @@ -70,7 +76,10 @@ const { MockManager } = vi.hoisted(() => { if (!this._connected) { throw new Error("cannot send — not connected"); } - if (this.sendShouldFail) { + if (this.sendShouldFail || _globalSendFailuresRemaining > 0) { + if (_globalSendFailuresRemaining > 0) { + _globalSendFailuresRemaining--; + } throw new Error("Mock send failure"); } this.sentEvents.push(event); @@ -112,6 +121,17 @@ const { MockManager } = vi.hoisted(() => { // Test helper: simulate WebSocket connection drop mid-request simulateClose(code = 1006, reason = "connection lost"): void { this._connected = false; + this._lastCloseInfo = { + code, + reason, + retryable: + code === 1001 || + code === 1005 || + code === 1006 || + code === 1011 || + code === 1012 || + code === 1013, + }; this.emit("close", code, reason); } @@ -162,10 +182,18 @@ const { MockManager } = vi.hoisted(() => { _globalConnectShouldFail = v; } + static get globalSendFailuresRemaining(): number { + return _globalSendFailuresRemaining; + } + static set globalSendFailuresRemaining(v: number) { + _globalSendFailuresRemaining = v; + } + static reset(): void { TrackedMockManager.lastInstance = null; TrackedMockManager.instances = []; _globalConnectShouldFail = false; + _globalSendFailuresRemaining = 0; } } @@ -1489,6 +1517,39 @@ describe("createOpenAIWebSocketStreamFn", () => { expect(doneEvent?.message?.content?.[0]?.text).toBe("http fallback response"); }); + it("retries one retryable mid-request close before falling back in auto mode", async () => { + const streamFn = createOpenAIWebSocketStreamFn("sk-test", "sess-runtime-retry"); + const stream = streamFn( + modelStub as Parameters[0], + contextStub as Parameters[1], + { 
transport: "auto" } as Parameters[2], + ); + + await new Promise((r) => setImmediate(r)); + const firstManager = MockManager.lastInstance!; + firstManager.simulateClose(1006, "connection lost"); + + await new Promise((r) => setImmediate(r)); + const secondManager = MockManager.lastInstance!; + expect(secondManager).not.toBe(firstManager); + expect(secondManager.connectCallCount).toBe(1); + + secondManager.simulateEvent({ + type: "response.completed", + response: makeResponseObject("resp-retried", "retry succeeded"), + }); + + const events: Array<{ type?: string; message?: { content?: Array<{ text?: string }> } }> = []; + for await (const ev of await resolveStream(stream)) { + events.push(ev as { type?: string; message?: { content?: Array<{ text?: string }> } }); + } + + expect(streamSimpleCalls).toHaveLength(0); + expect(firstManager.closeCallCount).toBeGreaterThanOrEqual(1); + expect(events.filter((event) => event.type === "start")).toHaveLength(1); + const doneEvent = events.find((event) => event.type === "done"); + expect(doneEvent?.message?.content?.[0]?.text).toBe("retry succeeded"); + }); it("tracks previous_response_id across turns (incremental send)", async () => { const sessionId = "sess-incremental"; const streamFn = createOpenAIWebSocketStreamFn("sk-test", sessionId); @@ -1686,7 +1747,7 @@ describe("createOpenAIWebSocketStreamFn", () => { expect((sent.tools ?? []).length).toBeGreaterThan(0); }); - it("resets session state and falls back to HTTP when send() throws", async () => { + it("falls back to HTTP after the websocket send retry budget is exhausted", async () => { const sessionId = "sess-send-fail-reset"; const streamFn = createOpenAIWebSocketStreamFn("sk-test", sessionId); @@ -1714,8 +1775,8 @@ describe("createOpenAIWebSocketStreamFn", () => { }); expect(hasWsSession(sessionId)).toBe(true); - // 2. Arm send failure and record pre-call streamSimpleCalls count - MockManager.lastInstance!.sendShouldFail = true; + // 2. Exhaust both websocket send attempts so auto mode must fall back. + MockManager.globalSendFailuresRemaining = 2; const callsBefore = streamSimpleCalls.length; // 3. 
Second call: send throws → must fall back to HTTP and clear registry @@ -1727,7 +1788,7 @@ describe("createOpenAIWebSocketStreamFn", () => { /* consume */ } - // Registry cleared after send failure + // Registry cleared after retry budget exhaustion + HTTP fallback expect(hasWsSession(sessionId)).toBe(false); // HTTP fallback invoked expect(streamSimpleCalls.length).toBeGreaterThan(callsBefore); diff --git a/src/agents/openai-ws-stream.ts b/src/agents/openai-ws-stream.ts index 5114f41e292..710b559c9ac 100644 --- a/src/agents/openai-ws-stream.ts +++ b/src/agents/openai-ws-stream.ts @@ -41,9 +41,8 @@ import { convertTools, planTurnInput, } from "./openai-ws-message-conversion.js"; +import { buildOpenAIWebSocketResponseCreatePayload } from "./openai-ws-request.js"; import { log } from "./pi-embedded-runner/logger.js"; -import { resolveOpenAITextVerbosity } from "./pi-embedded-runner/openai-stream-wrappers.js"; -import { resolveProviderRequestPolicyConfig } from "./provider-request-config.js"; import { buildAssistantMessageWithZeroUsage, buildStreamErrorAssistantMessage, @@ -208,6 +207,31 @@ export interface OpenAIWebSocketStreamOptions { type WsTransport = "sse" | "websocket" | "auto"; const WARM_UP_TIMEOUT_MS = 8_000; +const MAX_AUTO_WS_RUNTIME_RETRIES = 1; + +class OpenAIWebSocketRuntimeError extends Error { + readonly kind: "disconnect" | "send" | "server"; + readonly retryable: boolean; + readonly closeCode?: number; + readonly closeReason?: string; + + constructor( + message: string, + params: { + kind: "disconnect" | "send" | "server"; + retryable: boolean; + closeCode?: number; + closeReason?: string; + }, + ) { + super(message); + this.name = "OpenAIWebSocketRuntimeError"; + this.kind = params.kind; + this.retryable = params.retryable; + this.closeCode = params.closeCode; + this.closeReason = params.closeReason; + } +} function resolveWsTransport(options: Parameters[2]): WsTransport { const transport = (options as { transport?: unknown } | undefined)?.transport; @@ -263,6 +287,25 @@ function formatOpenAIWebSocketResponseFailure(response: { return "Unknown error (no error details in response)"; } +function normalizeWsRunError(err: unknown): OpenAIWebSocketRuntimeError { + if (err instanceof OpenAIWebSocketRuntimeError) { + return err; + } + return new OpenAIWebSocketRuntimeError(err instanceof Error ? err.message : String(err), { + kind: "server", + retryable: false, + }); +} + +function buildRetryableSendError(err: unknown): OpenAIWebSocketRuntimeError { + return new OpenAIWebSocketRuntimeError( + err instanceof Error ? err.message : `WebSocket send failed: ${String(err)}`, + { + kind: "send", + retryable: true, + }, + ); +} async function runWarmUp(params: { manager: OpenAIWebSocketManager; modelId: string; @@ -346,337 +389,311 @@ export function createOpenAIWebSocketStreamFn( return fallbackToHttp(model, context, options, apiKey, eventStream, opts.signal); } - // ── 1. Get or create session state ────────────────────────────────── - let session = wsRegistry.get(sessionId); - - if (!session) { - const manager = openAIWsStreamDeps.createManager(opts.managerOptions); - session = { - manager, - lastContextLength: 0, - everConnected: false, - warmUpAttempted: false, - broken: false, - }; - wsRegistry.set(sessionId, session); - } - - // ── 2. 
Ensure connection is open ───────────────────────────────────── - if (!session.manager.isConnected() && !session.broken) { - try { - await session.manager.connect(apiKey); - session.everConnected = true; - log.debug(`[ws-stream] connected for session=${sessionId}`); - } catch (connErr) { - // Cancel any background reconnect attempts before marking as broken. - try { - session.manager.close(); - } catch { - /* ignore */ - } - session.broken = true; - wsRegistry.delete(sessionId); - if (transport === "websocket") { - throw connErr instanceof Error ? connErr : new Error(String(connErr)); - } - log.warn( - `[ws-stream] WebSocket connect failed for session=${sessionId}; falling back to HTTP. error=${String(connErr)}`, - ); - // Fall back to HTTP immediately - return fallbackToHttp(model, context, options, apiKey, eventStream, opts.signal); - } - } - - if (session.broken || !session.manager.isConnected()) { - if (transport === "websocket") { - throw new Error("WebSocket session disconnected"); - } - log.warn(`[ws-stream] session=${sessionId} broken/disconnected; falling back to HTTP`); - // Clean up stale session to prevent next turn from using stale - // previousResponseId / lastContextLength after a mid-request drop. - try { - session.manager.close(); - } catch { - /* ignore */ - } - wsRegistry.delete(sessionId); - return fallbackToHttp(model, context, options, apiKey, eventStream, opts.signal); - } - const signal = opts.signal ?? (options as WsOptions | undefined)?.signal; + let emittedStart = false; + let runtimeRetries = 0; - if (resolveWsWarmup(options) && !session.warmUpAttempted) { - session.warmUpAttempted = true; - let warmupFailed = false; - try { - await runWarmUp({ - manager: session.manager, - modelId: model.id, - tools: convertTools(context.tools), - instructions: context.systemPrompt ?? undefined, - signal, - }); - log.debug(`[ws-stream] warm-up completed for session=${sessionId}`); - } catch (warmErr) { - if (signal?.aborted) { - throw warmErr instanceof Error ? warmErr : new Error(String(warmErr)); - } - warmupFailed = true; - log.warn( - `[ws-stream] warm-up failed for session=${sessionId}; continuing without warm-up. error=${String(warmErr)}`, - ); + while (true) { + let session = wsRegistry.get(sessionId); + if (!session) { + const manager = openAIWsStreamDeps.createManager(opts.managerOptions); + session = { + manager, + lastContextLength: 0, + everConnected: false, + warmUpAttempted: false, + broken: false, + }; + wsRegistry.set(sessionId, session); } - if (warmupFailed && !session.manager.isConnected()) { - try { - session.manager.close(); - } catch { - /* ignore */ - } + + if (!session.manager.isConnected() && !session.broken) { try { await session.manager.connect(apiKey); session.everConnected = true; - log.debug(`[ws-stream] reconnected after warm-up failure for session=${sessionId}`); - } catch (reconnectErr) { + log.debug(`[ws-stream] connected for session=${sessionId}`); + } catch (connErr) { + try { + session.manager.close(); + } catch { + /* ignore */ + } session.broken = true; wsRegistry.delete(sessionId); if (transport === "websocket") { - throw reconnectErr instanceof Error ? reconnectErr : new Error(String(reconnectErr)); + throw connErr instanceof Error ? connErr : new Error(String(connErr)); } log.warn( - `[ws-stream] reconnect after warm-up failed for session=${sessionId}; falling back to HTTP. error=${String(reconnectErr)}`, + `[ws-stream] WebSocket connect failed for session=${sessionId}; falling back to HTTP. 
error=${String(connErr)}`, ); - return fallbackToHttp(model, context, options, apiKey, eventStream, opts.signal); + return fallbackToHttp(model, context, options, apiKey, eventStream, opts.signal, { + suppressStart: emittedStart, + }); } } - } - // ── 3. Compute incremental vs full input ───────────────────────────── - const turnInput = planTurnInput({ - context, - model, - previousResponseId: session.manager.previousResponseId, - lastContextLength: session.lastContextLength, - }); - - if (turnInput.mode === "incremental_tool_results") { - log.debug( - `[ws-stream] session=${sessionId}: incremental send (${turnInput.inputItems.length} tool results) previous_response_id=${turnInput.previousResponseId}`, - ); - } else if (turnInput.mode === "full_context_restart") { - // The WebSocket guide requires a fresh full-context turn here: when we - // cannot continue the incremental chain, omit previous_response_id. - log.debug( - `[ws-stream] session=${sessionId}: no new tool results found; sending full context without previous_response_id`, - ); - } else { - log.debug( - `[ws-stream] session=${sessionId}: full context send (${turnInput.inputItems.length} items)`, - ); - } - - // ── 4. Build & send response.create ────────────────────────────────── - const tools = convertTools(context.tools); - - // Forward generation options that the HTTP path (openai-responses provider) also uses. - // Cast to record since SimpleStreamOptions carries openai-specific fields as unknown. - const streamOpts = options as - | (Record & { - temperature?: number; - maxTokens?: number; - topP?: number; - toolChoice?: unknown; - textVerbosity?: string; - text_verbosity?: string; - }) - | undefined; - const extraParams: Record = {}; - if (streamOpts?.temperature !== undefined) { - extraParams.temperature = streamOpts.temperature; - } - if (streamOpts?.maxTokens !== undefined) { - extraParams.max_output_tokens = streamOpts.maxTokens; - } - if (streamOpts?.topP !== undefined) { - extraParams.top_p = streamOpts.topP; - } - if (streamOpts?.toolChoice !== undefined) { - extraParams.tool_choice = streamOpts.toolChoice; - } - if ( - streamOpts?.reasoningEffort !== "none" && - (streamOpts?.reasoningEffort || streamOpts?.reasoningSummary) - ) { - const reasoning: { effort?: string; summary?: string } = {}; - if (streamOpts.reasoningEffort !== undefined) { - reasoning.effort = streamOpts.reasoningEffort as string; - } - if (streamOpts.reasoningSummary !== undefined) { - reasoning.summary = streamOpts.reasoningSummary as string; - } - extraParams.reasoning = reasoning; - } - const textVerbosity = resolveOpenAITextVerbosity( - streamOpts as Record | undefined, - ); - if (textVerbosity !== undefined) { - const existingText = - extraParams.text && typeof extraParams.text === "object" - ? (extraParams.text as Record) - : {}; - extraParams.text = { ...existingText, verbosity: textVerbosity }; - } - - // Respect compat.supportsStore — providers like Gemini reject unknown - // fields such as `store` with a 400 error. Fixes #39086. - const supportsResponsesStoreField = resolveProviderRequestPolicyConfig({ - provider: typeof model.provider === "string" ? model.provider : undefined, - api: typeof model.api === "string" ? model.api : undefined, - baseUrl: typeof model.baseUrl === "string" ? 
model.baseUrl : undefined, - compat: (model as { compat?: { supportsStore?: boolean } }).compat, - capability: "llm", - transport: "websocket", - }).capabilities.supportsResponsesStoreField; - - const payload: Record = { - type: "response.create", - model: model.id, - ...(supportsResponsesStoreField ? { store: false } : {}), - input: turnInput.inputItems, - instructions: context.systemPrompt ?? undefined, - tools: tools.length > 0 ? tools : undefined, - ...(turnInput.previousResponseId - ? { previous_response_id: turnInput.previousResponseId } - : {}), - ...extraParams, - }; - const nextPayload = options?.onPayload?.(payload, model); - const requestPayload = (nextPayload ?? payload) as Parameters< - OpenAIWebSocketManager["send"] - >[0]; - - try { - session.manager.send(requestPayload); - } catch (sendErr) { - if (transport === "websocket") { - throw sendErr instanceof Error ? sendErr : new Error(String(sendErr)); - } - log.warn( - `[ws-stream] send failed for session=${sessionId}; falling back to HTTP. error=${String(sendErr)}`, - ); - // Fully reset session state so the next WS turn doesn't use stale - // previous_response_id or lastContextLength from before the failure. - resetWsSession({ sessionId, session }); - return fallbackToHttp(model, context, options, apiKey, eventStream, opts.signal); - } - - eventStream.push({ - type: "start", - partial: buildAssistantMessageWithZeroUsage({ - model, - content: [], - stopReason: "stop", - }), - }); - - // ── 5. Wait for response.completed ─────────────────────────────────── - const capturedContextLength = context.messages.length; - let sawWsOutput = false; - - try { - await new Promise((resolve, reject) => { - // Honour abort signal - const abortHandler = () => { - cleanup(); - reject(new Error("aborted")); - }; - if (signal?.aborted) { - reject(new Error("aborted")); - return; + if (session.broken || !session.manager.isConnected()) { + if (transport === "websocket") { + throw new Error("WebSocket session disconnected"); } - signal?.addEventListener("abort", abortHandler, { once: true }); - - // If the WebSocket drops mid-request, reject so we don't hang forever. - const closeHandler = (code: number, reason: string) => { - cleanup(); - reject( - new Error( - `WebSocket closed mid-request (code=${code}, reason=${reason || "unknown"})`, - ), - ); - }; - session.manager.on("close", closeHandler); - - const cleanup = () => { - signal?.removeEventListener("abort", abortHandler); - session.manager.off("close", closeHandler); - unsubscribe(); - }; - - const unsubscribe = session.manager.onMessage((event) => { - if ( - event.type === "response.output_item.added" || - event.type === "response.output_item.done" || - event.type === "response.content_part.added" || - event.type === "response.content_part.done" || - event.type === "response.output_text.delta" || - event.type === "response.output_text.done" || - event.type === "response.function_call_arguments.delta" || - event.type === "response.function_call_arguments.done" - ) { - sawWsOutput = true; - } - - if (event.type === "response.completed") { - cleanup(); - // Update session state - session.lastContextLength = capturedContextLength; - // Build and emit the assistant message - const assistantMsg = buildAssistantMessageFromResponse(event.response, { - api: model.api, - provider: model.provider, - id: model.id, - }); - const reason: Extract = - assistantMsg.stopReason === "toolUse" ? 
"toolUse" : "stop"; - eventStream.push({ type: "done", reason, message: assistantMsg }); - resolve(); - } else if (event.type === "response.failed") { - cleanup(); - reject( - new Error( - `OpenAI WebSocket response failed: ${formatOpenAIWebSocketResponseFailure(event.response)}`, - ), - ); - } else if (event.type === "error") { - cleanup(); - reject(new Error(`OpenAI WebSocket error: ${formatOpenAIWebSocketError(event)}`)); - } else if (event.type === "response.output_text.delta") { - // Stream partial text updates for responsive UI - const partialMsg: AssistantMessage = buildAssistantMessageWithZeroUsage({ - model, - content: [{ type: "text", text: event.delta }], - stopReason: "stop", - }); - eventStream.push({ - type: "text_delta", - contentIndex: 0, - delta: event.delta, - partial: partialMsg, - }); - } - }); - }); - } catch (wsRunErr) { - if (transport !== "websocket" && !signal?.aborted && !sawWsOutput) { - log.warn( - `[ws-stream] session=${sessionId} runtime failure before output; falling back to HTTP. error=${String(wsRunErr)}`, - ); + log.warn(`[ws-stream] session=${sessionId} broken/disconnected; falling back to HTTP`); resetWsSession({ sessionId, session }); return fallbackToHttp(model, context, options, apiKey, eventStream, opts.signal, { - suppressStart: true, + suppressStart: emittedStart, }); } - throw wsRunErr; + + if (resolveWsWarmup(options) && !session.warmUpAttempted) { + session.warmUpAttempted = true; + let warmupFailed = false; + try { + await runWarmUp({ + manager: session.manager, + modelId: model.id, + tools: convertTools(context.tools), + instructions: context.systemPrompt ?? undefined, + signal, + }); + log.debug(`[ws-stream] warm-up completed for session=${sessionId}`); + } catch (warmErr) { + if (signal?.aborted) { + throw warmErr instanceof Error ? warmErr : new Error(String(warmErr)); + } + warmupFailed = true; + log.warn( + `[ws-stream] warm-up failed for session=${sessionId}; continuing without warm-up. error=${String(warmErr)}`, + ); + } + if (warmupFailed && !session.manager.isConnected()) { + try { + session.manager.close(); + } catch { + /* ignore */ + } + try { + await session.manager.connect(apiKey); + session.everConnected = true; + log.debug(`[ws-stream] reconnected after warm-up failure for session=${sessionId}`); + } catch (reconnectErr) { + session.broken = true; + wsRegistry.delete(sessionId); + if (transport === "websocket") { + throw reconnectErr instanceof Error + ? reconnectErr + : new Error(String(reconnectErr)); + } + log.warn( + `[ws-stream] reconnect after warm-up failed for session=${sessionId}; falling back to HTTP. 
error=${String(reconnectErr)}`, + ); + return fallbackToHttp(model, context, options, apiKey, eventStream, opts.signal, { + suppressStart: emittedStart, + }); + } + } + } + + const turnInput = planTurnInput({ + context, + model, + previousResponseId: session.manager.previousResponseId, + lastContextLength: session.lastContextLength, + }); + + if (turnInput.mode === "incremental_tool_results") { + log.debug( + `[ws-stream] session=${sessionId}: incremental send (${turnInput.inputItems.length} tool results) previous_response_id=${turnInput.previousResponseId}`, + ); + } else if (turnInput.mode === "full_context_restart") { + log.debug( + `[ws-stream] session=${sessionId}: no new tool results found; sending full context without previous_response_id`, + ); + } else { + log.debug( + `[ws-stream] session=${sessionId}: full context send (${turnInput.inputItems.length} items)`, + ); + } + + const payload = buildOpenAIWebSocketResponseCreatePayload({ + model, + context, + options: options as WsOptions | undefined, + turnInput, + tools: convertTools(context.tools), + }) as Record; + const nextPayload = options?.onPayload?.(payload, model); + const requestPayload = (nextPayload ?? payload) as Parameters< + OpenAIWebSocketManager["send"] + >[0]; + + try { + session.manager.send(requestPayload); + } catch (sendErr) { + const normalizedErr = buildRetryableSendError(sendErr); + if ( + transport !== "websocket" && + !signal?.aborted && + runtimeRetries < MAX_AUTO_WS_RUNTIME_RETRIES + ) { + runtimeRetries++; + log.warn( + `[ws-stream] retrying websocket turn after send failure for session=${sessionId} (${runtimeRetries}/${MAX_AUTO_WS_RUNTIME_RETRIES}). error=${normalizedErr.message}`, + ); + resetWsSession({ sessionId, session }); + continue; + } + if (transport !== "websocket") { + log.warn( + `[ws-stream] send failed for session=${sessionId}; falling back to HTTP. error=${normalizedErr.message}`, + ); + resetWsSession({ sessionId, session }); + return fallbackToHttp(model, context, options, apiKey, eventStream, opts.signal, { + suppressStart: emittedStart, + }); + } + throw normalizedErr; + } + + if (!emittedStart) { + eventStream.push({ + type: "start", + partial: buildAssistantMessageWithZeroUsage({ + model, + content: [], + stopReason: "stop", + }), + }); + emittedStart = true; + } + + const capturedContextLength = context.messages.length; + let sawWsOutput = false; + + try { + await new Promise((resolve, reject) => { + const abortHandler = () => { + cleanup(); + reject(new Error("aborted")); + }; + if (signal?.aborted) { + reject(new Error("aborted")); + return; + } + signal?.addEventListener("abort", abortHandler, { once: true }); + + const closeHandler = (code: number, reason: string) => { + cleanup(); + const closeInfo = session.manager.lastCloseInfo; + reject( + new OpenAIWebSocketRuntimeError( + `WebSocket closed mid-request (code=${code}, reason=${reason || "unknown"})`, + { + kind: "disconnect", + retryable: closeInfo?.retryable ?? true, + closeCode: closeInfo?.code ?? code, + closeReason: closeInfo?.reason ?? 
reason, + }, + ), + ); + }; + session.manager.on("close", closeHandler); + + const cleanup = () => { + signal?.removeEventListener("abort", abortHandler); + session.manager.off("close", closeHandler); + unsubscribe(); + }; + + const unsubscribe = session.manager.onMessage((event) => { + if ( + event.type === "response.output_item.added" || + event.type === "response.output_item.done" || + event.type === "response.content_part.added" || + event.type === "response.content_part.done" || + event.type === "response.output_text.delta" || + event.type === "response.output_text.done" || + event.type === "response.function_call_arguments.delta" || + event.type === "response.function_call_arguments.done" + ) { + sawWsOutput = true; + } + + if (event.type === "response.completed") { + cleanup(); + session.lastContextLength = capturedContextLength; + const assistantMsg = buildAssistantMessageFromResponse(event.response, { + api: model.api, + provider: model.provider, + id: model.id, + }); + const reason: Extract = + assistantMsg.stopReason === "toolUse" ? "toolUse" : "stop"; + eventStream.push({ type: "done", reason, message: assistantMsg }); + resolve(); + } else if (event.type === "response.failed") { + cleanup(); + reject( + new OpenAIWebSocketRuntimeError( + `OpenAI WebSocket response failed: ${formatOpenAIWebSocketResponseFailure(event.response)}`, + { + kind: "server", + retryable: false, + }, + ), + ); + } else if (event.type === "error") { + cleanup(); + reject( + new OpenAIWebSocketRuntimeError( + `OpenAI WebSocket error: ${formatOpenAIWebSocketError(event)}`, + { + kind: "server", + retryable: false, + }, + ), + ); + } else if (event.type === "response.output_text.delta") { + const partialMsg: AssistantMessage = buildAssistantMessageWithZeroUsage({ + model, + content: [{ type: "text", text: event.delta }], + stopReason: "stop", + }); + eventStream.push({ + type: "text_delta", + contentIndex: 0, + delta: event.delta, + partial: partialMsg, + }); + } + }); + }); + return; + } catch (wsRunErr) { + const normalizedErr = normalizeWsRunError(wsRunErr); + if ( + transport !== "websocket" && + !signal?.aborted && + normalizedErr.retryable && + !sawWsOutput && + runtimeRetries < MAX_AUTO_WS_RUNTIME_RETRIES + ) { + runtimeRetries++; + log.warn( + `[ws-stream] retrying websocket turn after retryable runtime failure for session=${sessionId} (${runtimeRetries}/${MAX_AUTO_WS_RUNTIME_RETRIES}). error=${normalizedErr.message}`, + ); + resetWsSession({ sessionId, session }); + continue; + } + if (transport !== "websocket" && !signal?.aborted && !sawWsOutput) { + log.warn( + `[ws-stream] session=${sessionId} runtime failure before output; falling back to HTTP. error=${normalizedErr.message}`, + ); + resetWsSession({ sessionId, session }); + return fallbackToHttp(model, context, options, apiKey, eventStream, opts.signal, { + suppressStart: true, + }); + } + throw normalizedErr; + } } };
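A sketch of a companion unit test for the new warm-up payload builder, assuming the same vitest setup used by the existing `*.test.ts` files in this patch; the test name and expectations are illustrative, derived from `buildOpenAIWebSocketWarmUpPayload` above:

```ts
import { expect, it } from "vitest";
import { buildOpenAIWebSocketWarmUpPayload } from "./openai-ws-request.js";

it("builds a non-generating warm-up payload and omits empty tool lists", () => {
  const payload = buildOpenAIWebSocketWarmUpPayload({
    model: "gpt-5.2",
    tools: [],
    instructions: "You are helpful.",
  });

  // The builder always sends input: [] and generate: false for warm-up turns.
  expect(payload).toEqual({
    type: "response.create",
    generate: false,
    model: "gpt-5.2",
    input: [],
    instructions: "You are helpful.",
  });
  // An empty tools array is dropped rather than sent as `tools: []`.
  expect("tools" in payload).toBe(false);
});
```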