diff --git a/src/agents/openai-ws-connection.test.ts b/src/agents/openai-ws-connection.test.ts index 57f4ff896a4..7bc143942db 100644 --- a/src/agents/openai-ws-connection.test.ts +++ b/src/agents/openai-ws-connection.test.ts @@ -9,11 +9,12 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import type { ClientOptions } from "ws"; import type { ClientEvent, + ErrorEvent, OpenAIWebSocketEvent, ResponseCompletedEvent, ResponseCreateEvent, } from "./openai-ws-connection.js"; -import { OpenAIWebSocketManager } from "./openai-ws-connection.js"; +import { getOpenAIWebSocketErrorDetails, OpenAIWebSocketManager } from "./openai-ws-connection.js"; // ───────────────────────────────────────────────────────────────────────────── // Mock WebSocket (hoisted so vi.mock factory can reference it) @@ -661,6 +662,27 @@ describe("OpenAIWebSocketManager", () => { // ─── Error handling ───────────────────────────────────────────────────────── describe("error handling", () => { + it("normalizes nested websocket error payloads", () => { + const details = getOpenAIWebSocketErrorDetails({ + type: "error", + status: 400, + error: { + type: "invalid_request_error", + code: "previous_response_not_found", + message: "Previous response with id 'resp_abc' not found.", + param: "previous_response_id", + }, + } satisfies ErrorEvent); + + expect(details).toEqual({ + status: 400, + type: "invalid_request_error", + code: "previous_response_not_found", + message: "Previous response with id 'resp_abc' not found.", + param: "previous_response_id", + }); + }); + it("emits error event on malformed JSON message", async () => { const manager = buildManager(); const sock = await connectManagerAndGetSocket(manager); diff --git a/src/agents/openai-ws-connection.ts b/src/agents/openai-ws-connection.ts index c044531f820..52d6f8623ff 100644 --- a/src/agents/openai-ws-connection.ts +++ b/src/agents/openai-ws-connection.ts @@ -34,6 +34,7 @@ export interface ResponseObject { output: OutputItem[]; usage?: UsageInfo; error?: { code: string; message: string }; + incomplete_details?: { reason?: string }; } export interface UsageInfo { @@ -160,9 +161,16 @@ export interface RateLimitUpdatedEvent { export interface ErrorEvent { type: "error"; - code: string; - message: string; + status?: number; + code?: string; + message?: string; param?: string; + error?: { + type?: string; + code?: string; + message?: string; + param?: string; + }; } export type OpenAIWebSocketEvent = @@ -568,3 +576,19 @@ export class OpenAIWebSocketManager extends EventEmitter { this.send(event); } } + +export function getOpenAIWebSocketErrorDetails(event: ErrorEvent): { + status?: number; + type?: string; + code?: string; + message?: string; + param?: string; +} { + return { + status: typeof event.status === "number" ? event.status : undefined, + type: event.error?.type, + code: event.error?.code ?? event.code, + message: event.error?.message ?? event.message, + param: event.error?.param ?? event.param, + }; +} diff --git a/src/agents/openai-ws-stream.test.ts b/src/agents/openai-ws-stream.test.ts index f82e54e6116..cfe25254ea8 100644 --- a/src/agents/openai-ws-stream.test.ts +++ b/src/agents/openai-ws-stream.test.ts @@ -1455,6 +1455,40 @@ describe("createOpenAIWebSocketStreamFn", () => { expect(doneEvent?.message?.content?.[0]?.text).toBe("http fallback response"); }); + it("falls back to HTTP when OpenAI sends a nested websocket error payload", async () => { + const streamFn = createOpenAIWebSocketStreamFn("sk-test", "sess-runtime-fallback-nested"); + const stream = streamFn( + modelStub as Parameters[0], + contextStub as Parameters[1], + { transport: "auto" } as Parameters[2], + ); + + await new Promise((r) => setImmediate(r)); + const manager = MockManager.lastInstance!; + manager.simulateEvent({ + type: "error", + status: 400, + error: { + type: "invalid_request_error", + code: "previous_response_not_found", + message: "Previous response with id 'resp_abc' not found.", + param: "previous_response_id", + }, + }); + + const events: Array<{ type?: string; message?: { content?: Array<{ text?: string }> } }> = []; + for await (const ev of await resolveStream(stream)) { + events.push(ev as { type?: string; message?: { content?: Array<{ text?: string }> } }); + } + + expect(streamSimpleCalls.length).toBeGreaterThanOrEqual(1); + expect(manager.closeCallCount).toBeGreaterThanOrEqual(1); + expect(events.filter((event) => event.type === "start")).toHaveLength(1); + expect(events.some((event) => event.type === "error")).toBe(false); + const doneEvent = events.find((event) => event.type === "done"); + expect(doneEvent?.message?.content?.[0]?.text).toBe("http fallback response"); + }); + it("tracks previous_response_id across turns (incremental send)", async () => { const sessionId = "sess-incremental"; const streamFn = createOpenAIWebSocketStreamFn("sk-test", sessionId); diff --git a/src/agents/openai-ws-stream.ts b/src/agents/openai-ws-stream.ts index 24b16ef94bf..5114f41e292 100644 --- a/src/agents/openai-ws-stream.ts +++ b/src/agents/openai-ws-stream.ts @@ -30,6 +30,7 @@ import type { } from "@mariozechner/pi-ai"; import * as piAi from "@mariozechner/pi-ai"; import { + getOpenAIWebSocketErrorDetails, OpenAIWebSocketManager, type FunctionToolDefinition, type OpenAIWebSocketManagerOptions, @@ -231,6 +232,37 @@ function resetWsSession(params: { sessionId: string; session: WsSession }): void wsRegistry.delete(params.sessionId); } +function formatOpenAIWebSocketError( + event: Parameters[0] extends (arg: infer T) => void + ? Extract + : never, +): string { + const details = getOpenAIWebSocketErrorDetails(event); + const code = details.code ?? "unknown"; + const message = details.message ?? "Unknown error"; + const extras = [ + typeof details.status === "number" ? `status=${details.status}` : null, + details.type ? `type=${details.type}` : null, + details.param ? `param=${details.param}` : null, + ].filter(Boolean); + return extras.length > 0 + ? `${message} (code=${code}; ${extras.join(", ")})` + : `${message} (code=${code})`; +} + +function formatOpenAIWebSocketResponseFailure(response: { + error?: { code?: string; message?: string }; + incomplete_details?: { reason?: string }; +}): string { + if (response.error) { + return `${response.error.code || "unknown"}: ${response.error.message || "no message"}`; + } + if (response.incomplete_details?.reason) { + return `incomplete: ${response.incomplete_details.reason}`; + } + return "Unknown error (no error details in response)"; +} + async function runWarmUp(params: { manager: OpenAIWebSocketManager; modelId: string; @@ -261,11 +293,12 @@ async function runWarmUp(params: { resolve(); } else if (event.type === "response.failed") { cleanup(); - const errMsg = event.response?.error?.message ?? "Response failed"; - reject(new Error(`warm-up failed: ${errMsg}`)); + reject( + new Error(`warm-up failed: ${formatOpenAIWebSocketResponseFailure(event.response)}`), + ); } else if (event.type === "error") { cleanup(); - reject(new Error(`warm-up error: ${event.message} (code=${event.code})`)); + reject(new Error(`warm-up error: ${formatOpenAIWebSocketError(event)}`)); } }); @@ -609,11 +642,14 @@ export function createOpenAIWebSocketStreamFn( resolve(); } else if (event.type === "response.failed") { cleanup(); - const errMsg = event.response?.error?.message ?? "Response failed"; - reject(new Error(`OpenAI WebSocket response failed: ${errMsg}`)); + reject( + new Error( + `OpenAI WebSocket response failed: ${formatOpenAIWebSocketResponseFailure(event.response)}`, + ), + ); } else if (event.type === "error") { cleanup(); - reject(new Error(`OpenAI WebSocket error: ${event.message} (code=${event.code})`)); + reject(new Error(`OpenAI WebSocket error: ${formatOpenAIWebSocketError(event)}`)); } else if (event.type === "response.output_text.delta") { // Stream partial text updates for responsive UI const partialMsg: AssistantMessage = buildAssistantMessageWithZeroUsage({