fix: harden OpenAI websocket transport

This commit is contained in:
Peter Steinberger 2026-04-04 02:11:22 +01:00
parent 1e6e685347
commit b76ed0fadf
No known key found for this signature in database
7 changed files with 600 additions and 328 deletions

View File

@ -22,7 +22,7 @@ Docs: https://docs.openclaw.ai
### Fixes
- Providers/OpenAI: preserve native `reasoning.effort: "none"` and strict tool schemas on direct OpenAI-family endpoints, keep OpenAI-compatible proxies on the older compat shim path, and enable OpenAI WebSocket warm-up by default for native Responses routes.
- Providers/OpenAI: preserve native `reasoning.effort: "none"` and strict tool schemas on direct OpenAI-family endpoints, keep OpenAI-compatible proxies on the older compat shim path, fix Responses WebSocket warm-up payloads, and retry one early retryable WebSocket failure before HTTP fallback while keeping forced WebSocket errors explicit.
- Providers/OpenAI Codex: split native `contextWindow` from runtime `contextTokens` for `openai-codex/gpt-5.4`, keep the default effective cap at `272000`, and expose a per-model config override via `models.providers.*.models[].contextTokens`.
- Skills/uv install: block workspace `.env` from overriding `UV_PYTHON` and strip related interpreter override keys from uv skill-install subprocesses so repository-controlled env files cannot steer the selected Python runtime. (#59178) Thanks @pgondhi987.
- Telegram/reactions: preserve `reactionNotifications: "own"` across gateway restarts by persisting sent-message ownership state instead of treating cold cache as a permissive fallback. (#59207) Thanks @samzong.

View File

@ -184,6 +184,10 @@ OpenClaw uses `pi-ai` for model streaming. For both `openai/*` and
`openai-codex/*`, default transport is `"auto"` (WebSocket-first, then SSE
fallback).
In `"auto"` mode, OpenClaw also retries one early, retryable WebSocket failure
before it falls back to SSE. Forced `"websocket"` mode still surfaces transport
errors directly instead of hiding them behind fallback.
You can set `agents.defaults.models.<provider/model>.params.transport`:
- `"sse"`: force SSE

View File

@ -290,8 +290,10 @@ describe("OpenAIWebSocketManager", () => {
it("resolves when the connection opens", async () => {
const manager = buildManager();
// connect() returns a promise that stays pending until the socket opens.
const connectPromise = manager.connect("sk-test");
// While the open event has not fired, the manager reports "connecting".
expect(manager.connectionState).toBe("connecting");
lastSocket().simulateOpen();
// The promise resolves with no value, and the state flips to "open".
await expect(connectPromise).resolves.toBeUndefined();
expect(manager.connectionState).toBe("open");
});
it("rejects when the initial connection fails (maxRetries=0)", async () => {
@ -516,6 +518,7 @@ describe("OpenAIWebSocketManager", () => {
it("is safe to call before connect()", () => {
const manager = buildManager();
// close() on a never-connected manager must be a no-op, not a crash,
// and must still leave the state machine in "closed".
expect(() => manager.close()).not.toThrow();
expect(manager.connectionState).toBe("closed");
});
});
@ -533,6 +536,12 @@ describe("OpenAIWebSocketManager", () => {
// Simulate a network drop
sock1.simulateClose(1006, "Network error");
expect(manager.connectionState).toBe("reconnecting");
expect(manager.lastCloseInfo).toEqual({
code: 1006,
reason: "Network error",
retryable: true,
});
// Advance time to trigger first retry (10ms delay)
await vi.advanceTimersByTimeAsync(15);
@ -542,6 +551,27 @@ describe("OpenAIWebSocketManager", () => {
expect(lastSocket()).not.toBe(sock1);
});
it("does not reconnect on non-retryable close codes", async () => {
const manager = buildManager({ backoffDelaysMs: [10, 20] });
const p = manager.connect("sk-test");
lastSocket().simulateOpen();
await p;
const sock = lastSocket();
// Snapshot the socket count so we can prove no new socket is created.
const instancesBefore = MockWebSocket.instances.length;
// 1008 (policy violation) is not in the retryable close-code list.
sock.simulateClose(1008, "policy violation");
// Advance past both configured backoff delays; no reconnect may fire.
await vi.advanceTimersByTimeAsync(25);
expect(MockWebSocket.instances.length).toBe(instancesBefore);
expect(manager.connectionState).toBe("closed");
// The close details must be surfaced with retryable=false.
expect(manager.lastCloseInfo).toEqual({
code: 1008,
reason: "policy violation",
retryable: false,
});
});
it("stops retrying after maxRetries", async () => {
const manager = buildManager({ maxRetries: 2, backoffDelaysMs: [5, 5] });
const p = manager.connect("sk-test");
@ -642,6 +672,7 @@ describe("OpenAIWebSocketManager", () => {
expect(sent["type"]).toBe("response.create");
expect(sent["generate"]).toBe(false);
expect(sent["model"]).toBe("gpt-5.2");
expect(sent["input"]).toEqual([]);
expect(sent["instructions"]).toBe("You are helpful.");
});

View File

@ -15,6 +15,7 @@
import { EventEmitter } from "node:events";
import WebSocket, { type ClientOptions } from "ws";
import { buildOpenAIWebSocketWarmUpPayload } from "./openai-ws-request.js";
import {
buildProviderRequestTlsClientOptions,
resolveProviderRequestPolicyConfig,
@ -284,6 +285,19 @@ export interface OpenAIWebSocketManagerOptions {
request?: ProviderRequestTransportOverrides;
}
/** Lifecycle phase of the managed WebSocket connection ("idle" until connect()). */
export type OpenAIWebSocketConnectionState =
| "idle"
| "connecting"
| "open"
| "reconnecting"
| "closed";
/** Details captured from the most recent WebSocket close frame. */
export interface OpenAIWebSocketCloseInfo {
// Close code from the close frame (e.g. 1006 = abnormal closure).
code: number;
// Human-readable close reason; empty string when the peer sent none.
reason: string;
// Whether the close code is classified as transient (worth reconnecting).
retryable: boolean;
}
type InternalEvents = {
message: [event: OpenAIWebSocketEvent];
open: [];
@ -317,6 +331,8 @@ export class OpenAIWebSocketManager extends EventEmitter<InternalEvents> {
/** The ID of the most recent completed response on this connection. */
private _previousResponseId: string | null = null;
private _connectionState: OpenAIWebSocketConnectionState = "idle";
private _lastCloseInfo: OpenAIWebSocketCloseInfo | null = null;
private readonly wsUrl: string;
private readonly maxRetries: number;
@ -344,6 +360,14 @@ export class OpenAIWebSocketManager extends EventEmitter<InternalEvents> {
return this._previousResponseId;
}
/** Current lifecycle phase of the underlying socket. */
get connectionState(): OpenAIWebSocketConnectionState {
return this._connectionState;
}
/** Info from the most recent close frame, or null if never closed (cleared on open). */
get lastCloseInfo(): OpenAIWebSocketCloseInfo | null {
return this._lastCloseInfo;
}
/**
* Opens a WebSocket connection to the OpenAI Responses API.
* Resolves when the connection is established (open event fires).
@ -353,6 +377,8 @@ export class OpenAIWebSocketManager extends EventEmitter<InternalEvents> {
this.apiKey = apiKey;
this.closed = false;
this.retryCount = 0;
this._connectionState = "connecting";
this._lastCloseInfo = null;
return this._openConnection();
}
@ -392,6 +418,7 @@ export class OpenAIWebSocketManager extends EventEmitter<InternalEvents> {
*/
close(): void {
this.closed = true;
this._connectionState = "closed";
this._cancelRetryTimer();
if (this.ws) {
this.ws.removeAllListeners();
@ -440,6 +467,8 @@ export class OpenAIWebSocketManager extends EventEmitter<InternalEvents> {
const onOpen = () => {
this.retryCount = 0;
this._connectionState = "open";
this._lastCloseInfo = null;
resolve();
this.emit("open");
};
@ -454,15 +483,26 @@ export class OpenAIWebSocketManager extends EventEmitter<InternalEvents> {
if (this.listenerCount("error") > 0) {
this.emit("error", err);
}
if (this._connectionState === "connecting" || this._connectionState === "reconnecting") {
this._connectionState = "closed";
}
reject(err);
};
const onClose = (code: number, reason: Buffer) => {
const reasonStr = reason.toString();
const closeInfo = {
code,
reason: reasonStr,
retryable: isRetryableWebSocketClose(code),
} satisfies OpenAIWebSocketCloseInfo;
this._lastCloseInfo = closeInfo;
this.emit("close", code, reasonStr);
if (!this.closed) {
if (!this.closed && closeInfo.retryable) {
this._scheduleReconnect();
} else {
this._connectionState = "closed";
}
};
@ -482,6 +522,7 @@ export class OpenAIWebSocketManager extends EventEmitter<InternalEvents> {
return;
}
if (this.retryCount >= this.maxRetries) {
this._connectionState = "closed";
this._safeEmitError(
new Error(`OpenAIWebSocketManager: max reconnect retries (${this.maxRetries}) exceeded.`),
);
@ -491,6 +532,7 @@ export class OpenAIWebSocketManager extends EventEmitter<InternalEvents> {
const delayMs =
this.backoffDelaysMs[Math.min(this.retryCount, this.backoffDelaysMs.length - 1)] ?? 1000;
this.retryCount++;
this._connectionState = "reconnecting";
this.retryTimer = setTimeout(() => {
if (this.closed) {
@ -566,17 +608,10 @@ export class OpenAIWebSocketManager extends EventEmitter<InternalEvents> {
* Pass tools/instructions to prime the connection for the upcoming session.
*/
warmUp(params: { model: string; tools?: FunctionToolDefinition[]; instructions?: string }): void {
  // The rendered block declared `event` twice (an inline object literal plus
  // the builder call); keep only the shared builder so warm-up and real-turn
  // payloads stay consistent (the builder includes the required empty `input`).
  const event = buildOpenAIWebSocketWarmUpPayload(params);
  this.send(event);
}
}
export function getOpenAIWebSocketErrorDetails(event: ErrorEvent): {
status?: number;
type?: string;
@ -592,3 +627,14 @@ export function getOpenAIWebSocketErrorDetails(event: ErrorEvent): {
param: event.error?.param ?? event.param,
};
}
/**
 * Classifies a WebSocket close code as transient.
 *
 * Retryable codes: 1001 (going away), 1005 (no status), 1006 (abnormal
 * closure), 1011 (server error), 1012 (service restart), 1013 (try again
 * later). Everything else — e.g. 1000 normal close or 1008 policy
 * violation — is treated as final and must not trigger a reconnect.
 */
function isRetryableWebSocketClose(code: number): boolean {
  const retryableCloseCodes = [1001, 1005, 1006, 1011, 1012, 1013];
  return retryableCloseCodes.includes(code);
}

View File

@ -0,0 +1,113 @@
import type { StreamFn } from "@mariozechner/pi-agent-core";
import type {
FunctionToolDefinition,
InputItem,
ResponseCreateEvent,
WarmUpEvent,
} from "./openai-ws-connection.js";
import { resolveOpenAITextVerbosity } from "./pi-embedded-runner/openai-stream-wrappers.js";
import { resolveProviderRequestPolicyConfig } from "./provider-request-config.js";
// Model / context / options shapes are derived from pi-agent-core's StreamFn
// signature so this module stays in lockstep with the HTTP streaming path.
type WsModel = Parameters<StreamFn>[0];
type WsContext = Parameters<StreamFn>[1];
// StreamFn's option bag widened with the OpenAI-specific generation knobs
// this builder forwards into response.create.
type WsOptions = Parameters<StreamFn>[2] & {
temperature?: number;
maxTokens?: number;
topP?: number;
toolChoice?: unknown;
textVerbosity?: string;
text_verbosity?: string;
reasoningEffort?: string;
reasoningSummary?: string;
};
/** Planned input for one WebSocket turn: the items to send plus the optional incremental-chain anchor. */
export interface PlannedWsTurnInput {
inputItems: InputItem[];
// When set, the turn continues the server-side response chain incrementally.
previousResponseId?: string;
}
/**
 * Builds the non-generating `response.create` event used to warm up a
 * Responses WebSocket connection.
 *
 * `generate: false` tells the server not to produce output; `input` is
 * always present as an empty array. `tools` is attached only when a
 * non-empty list is supplied, and `instructions` only when truthy.
 */
export function buildOpenAIWebSocketWarmUpPayload(params: {
  model: string;
  tools?: FunctionToolDefinition[];
  instructions?: string;
}): WarmUpEvent {
  const { model, tools, instructions } = params;
  const hasTools = tools !== undefined && tools.length > 0;
  return {
    type: "response.create",
    generate: false,
    model,
    input: [],
    ...(hasTools ? { tools } : {}),
    ...(instructions ? { instructions } : {}),
  };
}
/**
 * Builds the `response.create` event for one real WebSocket turn.
 *
 * Mirrors the generation options the HTTP (openai-responses) path forwards:
 * temperature, max tokens, top_p, tool choice, reasoning, and text
 * verbosity. Field order matters: `extraParams` is spread last so these
 * options land on the final payload.
 */
export function buildOpenAIWebSocketResponseCreatePayload(params: {
model: WsModel;
context: WsContext;
options?: WsOptions;
turnInput: PlannedWsTurnInput;
tools: FunctionToolDefinition[];
}): ResponseCreateEvent {
// Collect optional generation knobs; only explicitly-set values are copied
// so absent options never appear as `undefined` keys in the payload.
const extraParams: Record<string, unknown> = {};
const streamOpts = params.options;
if (streamOpts?.temperature !== undefined) {
extraParams.temperature = streamOpts.temperature;
}
if (streamOpts?.maxTokens !== undefined) {
extraParams.max_output_tokens = streamOpts.maxTokens;
}
if (streamOpts?.topP !== undefined) {
extraParams.top_p = streamOpts.topP;
}
if (streamOpts?.toolChoice !== undefined) {
extraParams.tool_choice = streamOpts.toolChoice;
}
// Reasoning is attached only when effort is not "none" AND at least one of
// effort/summary is set; effort "none" suppresses the whole reasoning
// object (summary included) on this path.
if (
streamOpts?.reasoningEffort !== "none" &&
(streamOpts?.reasoningEffort || streamOpts?.reasoningSummary)
) {
const reasoning: { effort?: string; summary?: string } = {};
if (streamOpts.reasoningEffort !== undefined) {
reasoning.effort = streamOpts.reasoningEffort;
}
if (streamOpts.reasoningSummary !== undefined) {
reasoning.summary = streamOpts.reasoningSummary;
}
extraParams.reasoning = reasoning;
}
// Resolve verbosity from either textVerbosity or text_verbosity (shared
// helper with the HTTP stream wrappers) and merge it into any existing
// `text` object without clobbering other keys.
const textVerbosity = resolveOpenAITextVerbosity(
streamOpts as Record<string, unknown> | undefined,
);
if (textVerbosity !== undefined) {
const existingText =
extraParams.text && typeof extraParams.text === "object"
? (extraParams.text as Record<string, unknown>)
: {};
extraParams.text = { ...existingText, verbosity: textVerbosity };
}
// Respect provider compat: some OpenAI-compatible backends reject unknown
// fields such as `store`, so only emit it when the request policy says the
// Responses `store` field is supported.
const supportsResponsesStoreField = resolveProviderRequestPolicyConfig({
provider: typeof params.model.provider === "string" ? params.model.provider : undefined,
api: typeof params.model.api === "string" ? params.model.api : undefined,
baseUrl: typeof params.model.baseUrl === "string" ? params.model.baseUrl : undefined,
compat: (params.model as { compat?: { supportsStore?: boolean } }).compat,
capability: "llm",
transport: "websocket",
}).capabilities.supportsResponsesStoreField;
return {
type: "response.create",
model: params.model.id,
...(supportsResponsesStoreField ? { store: false } : {}),
input: params.turnInput.inputItems,
// Omitted tools / previous_response_id are left out entirely rather than
// sent as undefined/null.
instructions: params.context.systemPrompt ?? undefined,
tools: params.tools.length > 0 ? params.tools : undefined,
...(params.turnInput.previousResponseId
? { previous_response_id: params.turnInput.previousResponseId }
: {}),
...extraParams,
};
}

View File

@ -35,12 +35,14 @@ const { MockManager } = vi.hoisted(() => {
// Shared mutable flag so inner class can see it
let _globalConnectShouldFail = false;
let _globalSendFailuresRemaining = 0;
class MockManager extends EventEmitter {
private _listeners: AnyFn[] = [];
private _previousResponseId: string | null = null;
private _connected = false;
private _broken = false;
private _lastCloseInfo: { code: number; reason: string; retryable: boolean } | null = null;
sentEvents: unknown[] = [];
connectCallCount = 0;
@ -54,6 +56,10 @@ const { MockManager } = vi.hoisted(() => {
return this._previousResponseId;
}
get lastCloseInfo(): { code: number; reason: string; retryable: boolean } | null {
return this._lastCloseInfo;
}
async connect(_apiKey: string): Promise<void> {
this.connectCallCount++;
if (this.connectShouldFail || _globalConnectShouldFail) {
@ -70,7 +76,10 @@ const { MockManager } = vi.hoisted(() => {
if (!this._connected) {
throw new Error("cannot send — not connected");
}
if (this.sendShouldFail) {
if (this.sendShouldFail || _globalSendFailuresRemaining > 0) {
if (_globalSendFailuresRemaining > 0) {
_globalSendFailuresRemaining--;
}
throw new Error("Mock send failure");
}
this.sentEvents.push(event);
@ -112,6 +121,17 @@ const { MockManager } = vi.hoisted(() => {
// Test helper: simulate WebSocket connection drop mid-request
simulateClose(code = 1006, reason = "connection lost"): void {
this._connected = false;
// NOTE(review): this retryable close-code list mirrors the classification
// in the real manager (1001/1005/1006/1011/1012/1013) — keep the two in
// sync if the production list ever changes.
this._lastCloseInfo = {
code,
reason,
retryable:
code === 1001 ||
code === 1005 ||
code === 1006 ||
code === 1011 ||
code === 1012 ||
code === 1013,
};
this.emit("close", code, reason);
}
@ -162,10 +182,18 @@ const { MockManager } = vi.hoisted(() => {
_globalConnectShouldFail = v;
}
static get globalSendFailuresRemaining(): number {
return _globalSendFailuresRemaining;
}
static set globalSendFailuresRemaining(v: number) {
_globalSendFailuresRemaining = v;
}
static reset(): void {
TrackedMockManager.lastInstance = null;
TrackedMockManager.instances = [];
_globalConnectShouldFail = false;
_globalSendFailuresRemaining = 0;
}
}
@ -1489,6 +1517,39 @@ describe("createOpenAIWebSocketStreamFn", () => {
expect(doneEvent?.message?.content?.[0]?.text).toBe("http fallback response");
});
it("retries one retryable mid-request close before falling back in auto mode", async () => {
const streamFn = createOpenAIWebSocketStreamFn("sk-test", "sess-runtime-retry");
const stream = streamFn(
modelStub as Parameters<typeof streamFn>[0],
contextStub as Parameters<typeof streamFn>[1],
{ transport: "auto" } as Parameters<typeof streamFn>[2],
);
await new Promise((r) => setImmediate(r));
const firstManager = MockManager.lastInstance!;
firstManager.simulateClose(1006, "connection lost");
await new Promise((r) => setImmediate(r));
const secondManager = MockManager.lastInstance!;
expect(secondManager).not.toBe(firstManager);
expect(secondManager.connectCallCount).toBe(1);
secondManager.simulateEvent({
type: "response.completed",
response: makeResponseObject("resp-retried", "retry succeeded"),
});
const events: Array<{ type?: string; message?: { content?: Array<{ text?: string }> } }> = [];
for await (const ev of await resolveStream(stream)) {
events.push(ev as { type?: string; message?: { content?: Array<{ text?: string }> } });
}
expect(streamSimpleCalls).toHaveLength(0);
expect(firstManager.closeCallCount).toBeGreaterThanOrEqual(1);
expect(events.filter((event) => event.type === "start")).toHaveLength(1);
const doneEvent = events.find((event) => event.type === "done");
expect(doneEvent?.message?.content?.[0]?.text).toBe("retry succeeded");
});
it("tracks previous_response_id across turns (incremental send)", async () => {
const sessionId = "sess-incremental";
const streamFn = createOpenAIWebSocketStreamFn("sk-test", sessionId);
@ -1686,7 +1747,7 @@ describe("createOpenAIWebSocketStreamFn", () => {
expect((sent.tools ?? []).length).toBeGreaterThan(0);
});
it("resets session state and falls back to HTTP when send() throws", async () => {
it("falls back to HTTP after the websocket send retry budget is exhausted", async () => {
const sessionId = "sess-send-fail-reset";
const streamFn = createOpenAIWebSocketStreamFn("sk-test", sessionId);
@ -1714,8 +1775,8 @@ describe("createOpenAIWebSocketStreamFn", () => {
});
expect(hasWsSession(sessionId)).toBe(true);
// 2. Arm send failure and record pre-call streamSimpleCalls count
MockManager.lastInstance!.sendShouldFail = true;
// 2. Exhaust both websocket send attempts so auto mode must fall back.
MockManager.globalSendFailuresRemaining = 2;
const callsBefore = streamSimpleCalls.length;
// 3. Second call: send throws → must fall back to HTTP and clear registry
@ -1727,7 +1788,7 @@ describe("createOpenAIWebSocketStreamFn", () => {
/* consume */
}
// Registry cleared after send failure
// Registry cleared after retry budget exhaustion + HTTP fallback
expect(hasWsSession(sessionId)).toBe(false);
// HTTP fallback invoked
expect(streamSimpleCalls.length).toBeGreaterThan(callsBefore);

View File

@ -41,9 +41,8 @@ import {
convertTools,
planTurnInput,
} from "./openai-ws-message-conversion.js";
import { buildOpenAIWebSocketResponseCreatePayload } from "./openai-ws-request.js";
import { log } from "./pi-embedded-runner/logger.js";
import { resolveOpenAITextVerbosity } from "./pi-embedded-runner/openai-stream-wrappers.js";
import { resolveProviderRequestPolicyConfig } from "./provider-request-config.js";
import {
buildAssistantMessageWithZeroUsage,
buildStreamErrorAssistantMessage,
@ -208,6 +207,31 @@ export interface OpenAIWebSocketStreamOptions {
type WsTransport = "sse" | "websocket" | "auto";
const WARM_UP_TIMEOUT_MS = 8_000;
const MAX_AUTO_WS_RUNTIME_RETRIES = 1;
/**
 * Error raised when a WebSocket turn fails at runtime.
 *
 * Carries the failure category (`kind`), whether the auto transport may
 * retry the turn (`retryable`), and — for disconnects — the close frame's
 * code and reason.
 */
class OpenAIWebSocketRuntimeError extends Error {
  readonly kind: "disconnect" | "send" | "server";
  readonly retryable: boolean;
  readonly closeCode?: number;
  readonly closeReason?: string;

  constructor(
    message: string,
    params: {
      kind: "disconnect" | "send" | "server";
      retryable: boolean;
      closeCode?: number;
      closeReason?: string;
    },
  ) {
    super(message);
    this.name = "OpenAIWebSocketRuntimeError";
    const { kind, retryable, closeCode, closeReason } = params;
    this.kind = kind;
    this.retryable = retryable;
    this.closeCode = closeCode;
    this.closeReason = closeReason;
  }
}
function resolveWsTransport(options: Parameters<StreamFn>[2]): WsTransport {
const transport = (options as { transport?: unknown } | undefined)?.transport;
@ -263,6 +287,25 @@ function formatOpenAIWebSocketResponseFailure(response: {
return "Unknown error (no error details in response)";
}
/**
 * Coerces an arbitrary thrown value into an OpenAIWebSocketRuntimeError.
 *
 * Instances already of that class pass through unchanged; anything else is
 * wrapped as a non-retryable "server" failure.
 */
function normalizeWsRunError(err: unknown): OpenAIWebSocketRuntimeError {
  if (err instanceof OpenAIWebSocketRuntimeError) {
    return err;
  }
  const message = err instanceof Error ? err.message : String(err);
  return new OpenAIWebSocketRuntimeError(message, { kind: "server", retryable: false });
}
/**
 * Wraps a failed send() as a retryable "send" runtime error so auto mode
 * can retry the websocket turn before falling back to HTTP.
 *
 * Error instances keep their own message; non-Error values get a
 * "WebSocket send failed:" prefix around their string form.
 */
function buildRetryableSendError(err: unknown): OpenAIWebSocketRuntimeError {
  let message: string;
  if (err instanceof Error) {
    message = err.message;
  } else {
    message = `WebSocket send failed: ${String(err)}`;
  }
  return new OpenAIWebSocketRuntimeError(message, { kind: "send", retryable: true });
}
async function runWarmUp(params: {
manager: OpenAIWebSocketManager;
modelId: string;
@ -346,337 +389,311 @@ export function createOpenAIWebSocketStreamFn(
return fallbackToHttp(model, context, options, apiKey, eventStream, opts.signal);
}
// ── 1. Get or create session state ──────────────────────────────────
let session = wsRegistry.get(sessionId);
if (!session) {
const manager = openAIWsStreamDeps.createManager(opts.managerOptions);
session = {
manager,
lastContextLength: 0,
everConnected: false,
warmUpAttempted: false,
broken: false,
};
wsRegistry.set(sessionId, session);
}
// ── 2. Ensure connection is open ─────────────────────────────────────
if (!session.manager.isConnected() && !session.broken) {
try {
await session.manager.connect(apiKey);
session.everConnected = true;
log.debug(`[ws-stream] connected for session=${sessionId}`);
} catch (connErr) {
// Cancel any background reconnect attempts before marking as broken.
try {
session.manager.close();
} catch {
/* ignore */
}
session.broken = true;
wsRegistry.delete(sessionId);
if (transport === "websocket") {
throw connErr instanceof Error ? connErr : new Error(String(connErr));
}
log.warn(
`[ws-stream] WebSocket connect failed for session=${sessionId}; falling back to HTTP. error=${String(connErr)}`,
);
// Fall back to HTTP immediately
return fallbackToHttp(model, context, options, apiKey, eventStream, opts.signal);
}
}
if (session.broken || !session.manager.isConnected()) {
if (transport === "websocket") {
throw new Error("WebSocket session disconnected");
}
log.warn(`[ws-stream] session=${sessionId} broken/disconnected; falling back to HTTP`);
// Clean up stale session to prevent next turn from using stale
// previousResponseId / lastContextLength after a mid-request drop.
try {
session.manager.close();
} catch {
/* ignore */
}
wsRegistry.delete(sessionId);
return fallbackToHttp(model, context, options, apiKey, eventStream, opts.signal);
}
const signal = opts.signal ?? (options as WsOptions | undefined)?.signal;
let emittedStart = false;
let runtimeRetries = 0;
if (resolveWsWarmup(options) && !session.warmUpAttempted) {
session.warmUpAttempted = true;
let warmupFailed = false;
try {
await runWarmUp({
manager: session.manager,
modelId: model.id,
tools: convertTools(context.tools),
instructions: context.systemPrompt ?? undefined,
signal,
});
log.debug(`[ws-stream] warm-up completed for session=${sessionId}`);
} catch (warmErr) {
if (signal?.aborted) {
throw warmErr instanceof Error ? warmErr : new Error(String(warmErr));
}
warmupFailed = true;
log.warn(
`[ws-stream] warm-up failed for session=${sessionId}; continuing without warm-up. error=${String(warmErr)}`,
);
while (true) {
let session = wsRegistry.get(sessionId);
if (!session) {
const manager = openAIWsStreamDeps.createManager(opts.managerOptions);
session = {
manager,
lastContextLength: 0,
everConnected: false,
warmUpAttempted: false,
broken: false,
};
wsRegistry.set(sessionId, session);
}
if (warmupFailed && !session.manager.isConnected()) {
try {
session.manager.close();
} catch {
/* ignore */
}
if (!session.manager.isConnected() && !session.broken) {
try {
await session.manager.connect(apiKey);
session.everConnected = true;
log.debug(`[ws-stream] reconnected after warm-up failure for session=${sessionId}`);
} catch (reconnectErr) {
log.debug(`[ws-stream] connected for session=${sessionId}`);
} catch (connErr) {
try {
session.manager.close();
} catch {
/* ignore */
}
session.broken = true;
wsRegistry.delete(sessionId);
if (transport === "websocket") {
throw reconnectErr instanceof Error ? reconnectErr : new Error(String(reconnectErr));
throw connErr instanceof Error ? connErr : new Error(String(connErr));
}
log.warn(
`[ws-stream] reconnect after warm-up failed for session=${sessionId}; falling back to HTTP. error=${String(reconnectErr)}`,
`[ws-stream] WebSocket connect failed for session=${sessionId}; falling back to HTTP. error=${String(connErr)}`,
);
return fallbackToHttp(model, context, options, apiKey, eventStream, opts.signal);
return fallbackToHttp(model, context, options, apiKey, eventStream, opts.signal, {
suppressStart: emittedStart,
});
}
}
}
// ── 3. Compute incremental vs full input ─────────────────────────────
const turnInput = planTurnInput({
context,
model,
previousResponseId: session.manager.previousResponseId,
lastContextLength: session.lastContextLength,
});
if (turnInput.mode === "incremental_tool_results") {
log.debug(
`[ws-stream] session=${sessionId}: incremental send (${turnInput.inputItems.length} tool results) previous_response_id=${turnInput.previousResponseId}`,
);
} else if (turnInput.mode === "full_context_restart") {
// The WebSocket guide requires a fresh full-context turn here: when we
// cannot continue the incremental chain, omit previous_response_id.
log.debug(
`[ws-stream] session=${sessionId}: no new tool results found; sending full context without previous_response_id`,
);
} else {
log.debug(
`[ws-stream] session=${sessionId}: full context send (${turnInput.inputItems.length} items)`,
);
}
// ── 4. Build & send response.create ──────────────────────────────────
const tools = convertTools(context.tools);
// Forward generation options that the HTTP path (openai-responses provider) also uses.
// Cast to record since SimpleStreamOptions carries openai-specific fields as unknown.
const streamOpts = options as
| (Record<string, unknown> & {
temperature?: number;
maxTokens?: number;
topP?: number;
toolChoice?: unknown;
textVerbosity?: string;
text_verbosity?: string;
})
| undefined;
const extraParams: Record<string, unknown> = {};
if (streamOpts?.temperature !== undefined) {
extraParams.temperature = streamOpts.temperature;
}
if (streamOpts?.maxTokens !== undefined) {
extraParams.max_output_tokens = streamOpts.maxTokens;
}
if (streamOpts?.topP !== undefined) {
extraParams.top_p = streamOpts.topP;
}
if (streamOpts?.toolChoice !== undefined) {
extraParams.tool_choice = streamOpts.toolChoice;
}
if (
streamOpts?.reasoningEffort !== "none" &&
(streamOpts?.reasoningEffort || streamOpts?.reasoningSummary)
) {
const reasoning: { effort?: string; summary?: string } = {};
if (streamOpts.reasoningEffort !== undefined) {
reasoning.effort = streamOpts.reasoningEffort as string;
}
if (streamOpts.reasoningSummary !== undefined) {
reasoning.summary = streamOpts.reasoningSummary as string;
}
extraParams.reasoning = reasoning;
}
const textVerbosity = resolveOpenAITextVerbosity(
streamOpts as Record<string, unknown> | undefined,
);
if (textVerbosity !== undefined) {
const existingText =
extraParams.text && typeof extraParams.text === "object"
? (extraParams.text as Record<string, unknown>)
: {};
extraParams.text = { ...existingText, verbosity: textVerbosity };
}
// Respect compat.supportsStore — providers like Gemini reject unknown
// fields such as `store` with a 400 error. Fixes #39086.
const supportsResponsesStoreField = resolveProviderRequestPolicyConfig({
provider: typeof model.provider === "string" ? model.provider : undefined,
api: typeof model.api === "string" ? model.api : undefined,
baseUrl: typeof model.baseUrl === "string" ? model.baseUrl : undefined,
compat: (model as { compat?: { supportsStore?: boolean } }).compat,
capability: "llm",
transport: "websocket",
}).capabilities.supportsResponsesStoreField;
const payload: Record<string, unknown> = {
type: "response.create",
model: model.id,
...(supportsResponsesStoreField ? { store: false } : {}),
input: turnInput.inputItems,
instructions: context.systemPrompt ?? undefined,
tools: tools.length > 0 ? tools : undefined,
...(turnInput.previousResponseId
? { previous_response_id: turnInput.previousResponseId }
: {}),
...extraParams,
};
const nextPayload = options?.onPayload?.(payload, model);
const requestPayload = (nextPayload ?? payload) as Parameters<
OpenAIWebSocketManager["send"]
>[0];
try {
session.manager.send(requestPayload);
} catch (sendErr) {
if (transport === "websocket") {
throw sendErr instanceof Error ? sendErr : new Error(String(sendErr));
}
log.warn(
`[ws-stream] send failed for session=${sessionId}; falling back to HTTP. error=${String(sendErr)}`,
);
// Fully reset session state so the next WS turn doesn't use stale
// previous_response_id or lastContextLength from before the failure.
resetWsSession({ sessionId, session });
return fallbackToHttp(model, context, options, apiKey, eventStream, opts.signal);
}
eventStream.push({
type: "start",
partial: buildAssistantMessageWithZeroUsage({
model,
content: [],
stopReason: "stop",
}),
});
// ── 5. Wait for response.completed ───────────────────────────────────
const capturedContextLength = context.messages.length;
let sawWsOutput = false;
try {
await new Promise<void>((resolve, reject) => {
// Honour abort signal
const abortHandler = () => {
cleanup();
reject(new Error("aborted"));
};
if (signal?.aborted) {
reject(new Error("aborted"));
return;
if (session.broken || !session.manager.isConnected()) {
if (transport === "websocket") {
throw new Error("WebSocket session disconnected");
}
signal?.addEventListener("abort", abortHandler, { once: true });
// If the WebSocket drops mid-request, reject so we don't hang forever.
const closeHandler = (code: number, reason: string) => {
cleanup();
reject(
new Error(
`WebSocket closed mid-request (code=${code}, reason=${reason || "unknown"})`,
),
);
};
session.manager.on("close", closeHandler);
const cleanup = () => {
signal?.removeEventListener("abort", abortHandler);
session.manager.off("close", closeHandler);
unsubscribe();
};
const unsubscribe = session.manager.onMessage((event) => {
if (
event.type === "response.output_item.added" ||
event.type === "response.output_item.done" ||
event.type === "response.content_part.added" ||
event.type === "response.content_part.done" ||
event.type === "response.output_text.delta" ||
event.type === "response.output_text.done" ||
event.type === "response.function_call_arguments.delta" ||
event.type === "response.function_call_arguments.done"
) {
sawWsOutput = true;
}
if (event.type === "response.completed") {
cleanup();
// Update session state
session.lastContextLength = capturedContextLength;
// Build and emit the assistant message
const assistantMsg = buildAssistantMessageFromResponse(event.response, {
api: model.api,
provider: model.provider,
id: model.id,
});
const reason: Extract<StopReason, "stop" | "length" | "toolUse"> =
assistantMsg.stopReason === "toolUse" ? "toolUse" : "stop";
eventStream.push({ type: "done", reason, message: assistantMsg });
resolve();
} else if (event.type === "response.failed") {
cleanup();
reject(
new Error(
`OpenAI WebSocket response failed: ${formatOpenAIWebSocketResponseFailure(event.response)}`,
),
);
} else if (event.type === "error") {
cleanup();
reject(new Error(`OpenAI WebSocket error: ${formatOpenAIWebSocketError(event)}`));
} else if (event.type === "response.output_text.delta") {
// Stream partial text updates for responsive UI
const partialMsg: AssistantMessage = buildAssistantMessageWithZeroUsage({
model,
content: [{ type: "text", text: event.delta }],
stopReason: "stop",
});
eventStream.push({
type: "text_delta",
contentIndex: 0,
delta: event.delta,
partial: partialMsg,
});
}
});
});
} catch (wsRunErr) {
if (transport !== "websocket" && !signal?.aborted && !sawWsOutput) {
log.warn(
`[ws-stream] session=${sessionId} runtime failure before output; falling back to HTTP. error=${String(wsRunErr)}`,
);
log.warn(`[ws-stream] session=${sessionId} broken/disconnected; falling back to HTTP`);
resetWsSession({ sessionId, session });
return fallbackToHttp(model, context, options, apiKey, eventStream, opts.signal, {
suppressStart: true,
suppressStart: emittedStart,
});
}
throw wsRunErr;
if (resolveWsWarmup(options) && !session.warmUpAttempted) {
session.warmUpAttempted = true;
let warmupFailed = false;
try {
await runWarmUp({
manager: session.manager,
modelId: model.id,
tools: convertTools(context.tools),
instructions: context.systemPrompt ?? undefined,
signal,
});
log.debug(`[ws-stream] warm-up completed for session=${sessionId}`);
} catch (warmErr) {
if (signal?.aborted) {
throw warmErr instanceof Error ? warmErr : new Error(String(warmErr));
}
warmupFailed = true;
log.warn(
`[ws-stream] warm-up failed for session=${sessionId}; continuing without warm-up. error=${String(warmErr)}`,
);
}
if (warmupFailed && !session.manager.isConnected()) {
try {
session.manager.close();
} catch {
/* ignore */
}
try {
await session.manager.connect(apiKey);
session.everConnected = true;
log.debug(`[ws-stream] reconnected after warm-up failure for session=${sessionId}`);
} catch (reconnectErr) {
session.broken = true;
wsRegistry.delete(sessionId);
if (transport === "websocket") {
throw reconnectErr instanceof Error
? reconnectErr
: new Error(String(reconnectErr));
}
log.warn(
`[ws-stream] reconnect after warm-up failed for session=${sessionId}; falling back to HTTP. error=${String(reconnectErr)}`,
);
return fallbackToHttp(model, context, options, apiKey, eventStream, opts.signal, {
suppressStart: emittedStart,
});
}
}
}
// Decide how much context to send this turn: an incremental delta chained off
// previous_response_id when possible, otherwise a full-context (re)send.
const turnInput = planTurnInput({
context,
model,
previousResponseId: session.manager.previousResponseId,
lastContextLength: session.lastContextLength,
});
if (turnInput.mode === "incremental_tool_results") {
log.debug(
`[ws-stream] session=${sessionId}: incremental send (${turnInput.inputItems.length} tool results) previous_response_id=${turnInput.previousResponseId}`,
);
} else if (turnInput.mode === "full_context_restart") {
log.debug(
`[ws-stream] session=${sessionId}: no new tool results found; sending full context without previous_response_id`,
);
} else {
log.debug(
`[ws-stream] session=${sessionId}: full context send (${turnInput.inputItems.length} items)`,
);
}
// Build the response.create payload, then give the caller's onPayload hook a
// chance to replace it; fall back to the original payload when the hook is
// absent or returns nothing.
const payload = buildOpenAIWebSocketResponseCreatePayload({
model,
context,
options: options as WsOptions | undefined,
turnInput,
tools: convertTools(context.tools),
}) as Record<string, unknown>;
const nextPayload = options?.onPayload?.(payload, model);
const requestPayload = (nextPayload ?? payload) as Parameters<
OpenAIWebSocketManager["send"]
>[0];
// Send the turn. Send failures in auto mode are retried a bounded number of
// times on a fresh session, then fall back to HTTP; forced websocket mode
// rethrows the normalized error instead.
try {
session.manager.send(requestPayload);
} catch (sendErr) {
const normalizedErr = buildRetryableSendError(sendErr);
if (
transport !== "websocket" &&
!signal?.aborted &&
runtimeRetries < MAX_AUTO_WS_RUNTIME_RETRIES
) {
runtimeRetries++;
log.warn(
`[ws-stream] retrying websocket turn after send failure for session=${sessionId} (${runtimeRetries}/${MAX_AUTO_WS_RUNTIME_RETRIES}). error=${normalizedErr.message}`,
);
resetWsSession({ sessionId, session });
continue;
}
if (transport !== "websocket") {
log.warn(
`[ws-stream] send failed for session=${sessionId}; falling back to HTTP. error=${normalizedErr.message}`,
);
resetWsSession({ sessionId, session });
return fallbackToHttp(model, context, options, apiKey, eventStream, opts.signal, {
suppressStart: emittedStart,
});
}
throw normalizedErr;
}
// Emit the stream's single "start" event only after a successful send, and at
// most once across retries.
if (!emittedStart) {
eventStream.push({
type: "start",
partial: buildAssistantMessageWithZeroUsage({
model,
content: [],
stopReason: "stop",
}),
});
emittedStart = true;
}
// Snapshot the context length now so the completion handler records what was
// actually sent, even if the caller appends messages while we await.
const capturedContextLength = context.messages.length;
// Tracks whether any output events arrived; gates retry/fallback decisions in
// the catch below (never silently replay a turn that already produced output).
let sawWsOutput = false;
try {
await new Promise<void>((resolve, reject) => {
const abortHandler = () => {
cleanup();
reject(new Error("aborted"));
};
// Abort may already be set before we attach anything; bail out early.
if (signal?.aborted) {
reject(new Error("aborted"));
return;
}
signal?.addEventListener("abort", abortHandler, { once: true });
// A mid-request close rejects with a runtime error; retryability comes from
// the manager's close info when available, defaulting to retryable.
const closeHandler = (code: number, reason: string) => {
cleanup();
const closeInfo = session.manager.lastCloseInfo;
reject(
new OpenAIWebSocketRuntimeError(
`WebSocket closed mid-request (code=${code}, reason=${reason || "unknown"})`,
{
kind: "disconnect",
retryable: closeInfo?.retryable ?? true,
closeCode: closeInfo?.code ?? code,
closeReason: closeInfo?.reason ?? reason,
},
),
);
};
session.manager.on("close", closeHandler);
// Detach every listener exactly once, whichever path settles the promise.
const cleanup = () => {
signal?.removeEventListener("abort", abortHandler);
session.manager.off("close", closeHandler);
unsubscribe();
};
const unsubscribe = session.manager.onMessage((event) => {
// Any of these event types counts as observed output for retry gating.
if (
event.type === "response.output_item.added" ||
event.type === "response.output_item.done" ||
event.type === "response.content_part.added" ||
event.type === "response.content_part.done" ||
event.type === "response.output_text.delta" ||
event.type === "response.output_text.done" ||
event.type === "response.function_call_arguments.delta" ||
event.type === "response.function_call_arguments.done"
) {
sawWsOutput = true;
}
if (event.type === "response.completed") {
cleanup();
session.lastContextLength = capturedContextLength;
const assistantMsg = buildAssistantMessageFromResponse(event.response, {
api: model.api,
provider: model.provider,
id: model.id,
});
// Map the assistant stop reason to the narrower done-event union.
const reason: Extract<StopReason, "stop" | "length" | "toolUse"> =
assistantMsg.stopReason === "toolUse" ? "toolUse" : "stop";
eventStream.push({ type: "done", reason, message: assistantMsg });
resolve();
} else if (event.type === "response.failed") {
cleanup();
// Server-reported failures are terminal for this transport: not retryable.
reject(
new OpenAIWebSocketRuntimeError(
`OpenAI WebSocket response failed: ${formatOpenAIWebSocketResponseFailure(event.response)}`,
{
kind: "server",
retryable: false,
},
),
);
} else if (event.type === "error") {
cleanup();
reject(
new OpenAIWebSocketRuntimeError(
`OpenAI WebSocket error: ${formatOpenAIWebSocketError(event)}`,
{
kind: "server",
retryable: false,
},
),
);
} else if (event.type === "response.output_text.delta") {
// Stream text deltas with a zero-usage partial assistant message attached.
const partialMsg: AssistantMessage = buildAssistantMessageWithZeroUsage({
model,
content: [{ type: "text", text: event.delta }],
stopReason: "stop",
});
eventStream.push({
type: "text_delta",
contentIndex: 0,
delta: event.delta,
partial: partialMsg,
});
}
});
});
// Turn finished cleanly; the "done" event has already been pushed.
return;
} catch (wsRunErr) {
const normalizedErr = normalizeWsRunError(wsRunErr);
// Auto mode, retryable error, no output yet, retry budget left: reset the
// session and loop for one more websocket attempt.
if (
transport !== "websocket" &&
!signal?.aborted &&
normalizedErr.retryable &&
!sawWsOutput &&
runtimeRetries < MAX_AUTO_WS_RUNTIME_RETRIES
) {
runtimeRetries++;
log.warn(
`[ws-stream] retrying websocket turn after retryable runtime failure for session=${sessionId} (${runtimeRetries}/${MAX_AUTO_WS_RUNTIME_RETRIES}). error=${normalizedErr.message}`,
);
resetWsSession({ sessionId, session });
continue;
}
// Otherwise, auto mode still falls back to HTTP as long as no output reached
// the consumer; suppressStart is true because "start" was emitted before the
// await above on this code path.
if (transport !== "websocket" && !signal?.aborted && !sawWsOutput) {
log.warn(
`[ws-stream] session=${sessionId} runtime failure before output; falling back to HTTP. error=${normalizedErr.message}`,
);
resetWsSession({ sessionId, session });
return fallbackToHttp(model, context, options, apiKey, eventStream, opts.signal, {
suppressStart: true,
});
}
// Forced websocket transport, aborts, and post-output failures propagate.
throw normalizedErr;
}
}
};