diff --git a/CHANGELOG.md b/CHANGELOG.md index 34301c6da07..48f8b1af8e9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -40,6 +40,7 @@ Docs: https://docs.openclaw.ai - CLI/message send: write manual `openclaw message send` deliveries into the resolved agent session transcript again by always threading the default CLI agent through outbound mirroring. (#54187) Thanks @KevInTheCloud5617. - CLI/onboarding: show the Kimi Code API key option again in the Moonshot setup menu so the interactive picker includes all Kimi setup paths together. Fixes #54412 Thanks @sparkyrider - Agents/status: use provider-aware context window lookup for fresh Anthropic 4.6 model overrides so `/status` shows the correct 1.0m window instead of an underreported shared-cache minimum. (#54796) Thanks @neeravmakwana. +- OpenAI/WebSocket: preserve reasoning replay metadata and tool-call item ids on WebSocket tool turns, and start a fresh response chain when full-context resend is required. (#53856) Thanks @xujingchen1996. - OpenAI/WS: restore reasoning blocks for Responses WebSocket runs and keep reasoning/tool-call replay metadata intact so resumed sessions do not lose or break follow-up reasoning-capable turns. (#53856) Thanks @xujingchen1996. - Agents/errors: surface provider quota/reset details when available, but keep HTML/Cloudflare rate-limit pages on the generic fallback so raw error pages are not shown to users. (#54512) Thanks @bugkill3r. - Claude CLI: switch the bundled Claude CLI backend to `stream-json` output so watchdogs see progress on long runs, and keep session/usage metadata even when Claude finishes with an empty result line. (#49698) Thanks @felear2022. diff --git a/src/agents/openai-ws-stream.e2e.test.ts b/src/agents/openai-ws-stream.e2e.test.ts index aabdacde289..ea53669c54b 100644 --- a/src/agents/openai-ws-stream.e2e.test.ts +++ b/src/agents/openai-ws-stream.e2e.test.ts @@ -9,7 +9,7 @@ * - Connection lifecycle cleanup via releaseWsSession * * Run manually with a valid OPENAI_API_KEY: - * OPENAI_API_KEY=sk-... npx vitest run src/agents/openai-ws-stream.e2e.test.ts + * OPENCLAW_LIVE_TEST=1 pnpm exec vitest run --config vitest.e2e.config.ts src/agents/openai-ws-stream.e2e.test.ts * * Skipped in CI — no API key available and we avoid billable external calls. */ @@ -21,15 +21,19 @@ import type { Context, } from "@mariozechner/pi-ai"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { isLiveTestEnabled } from "./live-test-helpers.js"; +import type { OutputItem, ResponseObject } from "./openai-ws-connection.js"; const API_KEY = process.env.OPENAI_API_KEY; -const LIVE = !!API_KEY; +const LIVE = isLiveTestEnabled(["OPENAI_LIVE_TEST"]) && !!API_KEY; const testFn = LIVE ? it : it.skip; type OpenAIWsStreamModule = typeof import("./openai-ws-stream.js"); +type OpenAIWsConnectionModule = typeof import("./openai-ws-connection.js"); type StreamFactory = OpenAIWsStreamModule["createOpenAIWebSocketStreamFn"]; type StreamReturn = ReturnType>; let openAIWsStreamModule: OpenAIWsStreamModule; +let openAIWsConnectionModule: OpenAIWsConnectionModule; const model = { api: "openai-responses" as const, @@ -106,6 +110,66 @@ function assistantText(message: AssistantMessage): string { .join(""); } +function extractThinkingBlocks(message: AssistantMessage) { + return message.content.filter((block) => block.type === "thinking") as Array<{ + type: "thinking"; + thinking: string; + thinkingSignature?: string; + }>; +} + +function extractToolCall(message: AssistantMessage) { + return message.content.find((block) => block.type === "toolCall") as + | { type: "toolCall"; id: string; name: string } + | undefined; +} + +function parseReasoningSignature(value: string | undefined) { + if (!value) { + return null; + } + try { + return JSON.parse(value) as { id?: unknown; type?: unknown }; + } catch { + return null; + } +} + +function extractReasoningText(item: { summary?: unknown; content?: unknown }): string { + const summary = item.summary; + if (typeof summary === "string") { + return summary.trim(); + } + if (Array.isArray(summary)) { + const summaryText = summary + .map((part) => { + if (typeof part === "string") { + return part.trim(); + } + if (!part || typeof part !== "object") { + return ""; + } + return typeof (part as { text?: unknown }).text === "string" + ? ((part as { text: string }).text ?? "").trim() + : ""; + }) + .filter(Boolean) + .join("\n") + .trim(); + if (summaryText) { + return summaryText; + } + } + return typeof item.content === "string" ? item.content.trim() : ""; +} + +function toExpectedReasoningSignature(item: { id?: string; type: string }) { + return { + type: item.type, + ...(typeof item.id === "string" && item.id.startsWith("rs_") ? { id: item.id } : {}), + }; +} + /** Each test gets a unique session ID to avoid cross-test interference. */ const sessions: string[] = []; function freshSession(name: string): string { @@ -124,6 +188,7 @@ describe("OpenAI WebSocket e2e", () => { createAssistantMessageEventStream: actual.createAssistantMessageEventStream, }; }); + openAIWsConnectionModule = await import("./openai-ws-connection.js"); openAIWsStreamModule = await import("./openai-ws-stream.js"); }); @@ -131,6 +196,7 @@ describe("OpenAI WebSocket e2e", () => { for (const id of sessions) { openAIWsStreamModule.releaseWsSession(id); } + openAIWsStreamModule.__testing.setDepsForTest(); sessions.length = 0; }); @@ -211,6 +277,90 @@ describe("OpenAI WebSocket e2e", () => { 60_000, ); + testFn( + "surfaces replay-safe reasoning metadata on websocket tool turns", + async () => { + const sid = freshSession("tool-reasoning"); + const completedResponses: ResponseObject[] = []; + openAIWsStreamModule.__testing.setDepsForTest({ + createManager: (options) => { + const manager = new openAIWsConnectionModule.OpenAIWebSocketManager(options); + manager.onMessage((event) => { + if (event.type === "response.completed") { + completedResponses.push(event.response); + } + }); + return manager; + }, + }); + const streamFn = openAIWsStreamModule.createOpenAIWebSocketStreamFn(API_KEY!, sid); + const firstContext = makeToolContext( + "Think carefully, call the tool `noop` with {} first, then after the tool result reply with exactly TOOL_OK.", + ); + const firstDone = expectDone( + await collectEvents( + streamFn(model, firstContext, { + transport: "websocket", + toolChoice: "required", + reasoningEffort: "high", + reasoningSummary: "detailed", + maxTokens: 256, + } as unknown as StreamFnParams[2]), + ), + ); + + const firstResponse = completedResponses[0]; + expect(firstResponse).toBeDefined(); + + const rawReasoningItems = (firstResponse?.output ?? []).filter( + (item): item is Extract => + item.type === "reasoning" || item.type.startsWith("reasoning."), + ); + const replayableReasoningItems = rawReasoningItems.filter( + (item) => extractReasoningText(item).length > 0, + ); + const thinkingBlocks = extractThinkingBlocks(firstDone); + expect(thinkingBlocks).toHaveLength(replayableReasoningItems.length); + expect(thinkingBlocks.map((block) => block.thinking)).toEqual( + replayableReasoningItems.map((item) => extractReasoningText(item)), + ); + expect( + thinkingBlocks.map((block) => parseReasoningSignature(block.thinkingSignature)), + ).toEqual(replayableReasoningItems.map((item) => toExpectedReasoningSignature(item))); + + const rawToolCall = firstResponse?.output.find( + (item): item is Extract => + item.type === "function_call", + ); + expect(rawToolCall).toBeDefined(); + const toolCall = extractToolCall(firstDone); + expect(toolCall?.name).toBe(rawToolCall?.name); + expect(toolCall?.id).toBe( + rawToolCall ? `${rawToolCall.call_id}|${rawToolCall.id}` : undefined, + ); + + const secondContext = { + ...firstContext, + messages: [ + ...firstContext.messages, + firstDone, + makeToolResultMessage(toolCall!.id, "TOOL_OK"), + ], + } as unknown as StreamFnParams[1]; + const secondDone = expectDone( + await collectEvents( + streamFn(model, secondContext, { + transport: "websocket", + maxTokens: 128, + }), + ), + ); + + expect(assistantText(secondDone)).toMatch(/TOOL_OK/); + }, + 60_000, + ); + testFn( "supports websocket warm-up before the first request", async () => { diff --git a/src/agents/openai-ws-stream.test.ts b/src/agents/openai-ws-stream.test.ts index 344a05f4162..09218eaa97f 100644 --- a/src/agents/openai-ws-stream.test.ts +++ b/src/agents/openai-ws-stream.test.ts @@ -618,6 +618,42 @@ describe("convertMessagesToInputItems", () => { expect(items[0]).toMatchObject({ type: "reasoning", id: "rs_summary" }); }); + it("drops reasoning replay ids that do not match OpenAI reasoning ids", () => { + const msg = { + role: "assistant" as const, + content: [ + { + type: "thinking" as const, + thinking: "internal reasoning...", + thinkingSignature: JSON.stringify({ + type: "reasoning", + id: " bad-id ", + }), + }, + { type: "text" as const, text: "Here is my answer." }, + ], + stopReason: "stop", + api: "openai-responses", + provider: "openai", + model: "gpt-5.2", + usage: {}, + timestamp: 0, + }; + const items = convertMessagesToInputItems([msg] as Parameters< + typeof convertMessagesToInputItems + >[0]); + expect(items).toEqual([ + { + type: "reasoning", + }, + { + type: "message", + role: "assistant", + content: "Here is my answer.", + }, + ]); + }); + it("returns empty array for empty messages", () => { expect(convertMessagesToInputItems([])).toEqual([]); }); @@ -764,6 +800,63 @@ describe("buildAssistantMessageFromResponse", () => { ); }); + it("prefers reasoning summary text over fallback content and preserves item order", () => { + const response = { + id: "resp_reasoning_order", + object: "response", + created_at: Date.now(), + status: "completed", + model: "gpt-5.2", + output: [ + { + type: "reasoning.summary", + id: "rs_789", + summary: ["Plan A", { text: "Plan B" }, { nope: true }], + content: "hidden fallback content", + }, + { + type: "function_call", + id: "fc_789", + call_id: "call_789", + name: "exec", + arguments: '{"arg":"value"}', + }, + ], + usage: { input_tokens: 100, output_tokens: 50, total_tokens: 150 }, + } as unknown as ResponseObject; + + const msg = buildAssistantMessageFromResponse(response, modelInfo); + expect(msg.content.map((block) => block.type)).toEqual(["thinking", "toolCall"]); + const thinkingBlock = msg.content[0] as + | { type: "thinking"; thinking: string; thinkingSignature?: string } + | undefined; + expect(thinkingBlock?.thinking).toBe("Plan A\nPlan B"); + expect(thinkingBlock?.thinkingSignature).toBe( + JSON.stringify({ id: "rs_789", type: "reasoning.summary" }), + ); + }); + + it("drops invalid reasoning ids from thinking signatures while preserving the visible block", () => { + const response = { + id: "resp_invalid_reasoning_id", + object: "response", + created_at: Date.now(), + status: "completed", + model: "gpt-5.2", + output: [ + { + type: "reasoning", + id: "invalid_reasoning_id", + content: "Hidden reasoning", + }, + ], + usage: { input_tokens: 100, output_tokens: 50, total_tokens: 150 }, + } as unknown as ResponseObject; + + const msg = buildAssistantMessageFromResponse(response, modelInfo); + expect(msg.content).toEqual([{ type: "thinking", thinking: "Hidden reasoning" }]); + }); + it("preserves function call item ids for replay when reasoning is present", () => { const response = { id: "resp_tool_reasoning", @@ -1148,6 +1241,95 @@ describe("createOpenAIWebSocketStreamFn", () => { expect(inputTypes).toHaveLength(1); }); + it("omits previous_response_id when replaying full context on follow-up turns", async () => { + const sessionId = "sess-full-context-replay"; + const streamFn = createOpenAIWebSocketStreamFn("sk-test", sessionId); + + const ctx1 = { + systemPrompt: "You are helpful.", + messages: [userMsg("Run ls")] as Parameters[0], + tools: [], + }; + + const turn1Response = { + id: "resp_turn1_reasoning", + object: "response", + created_at: Date.now(), + status: "completed", + model: "gpt-5.2", + output: [ + { + type: "reasoning", + id: "rs_turn1", + content: "Thinking before tool call", + }, + { + type: "function_call", + id: "fc_turn1", + call_id: "call_turn1", + name: "exec", + arguments: '{"cmd":"ls"}', + }, + ], + usage: { input_tokens: 12, output_tokens: 8, total_tokens: 20 }, + } as ResponseObject; + + const stream1 = streamFn( + modelStub as Parameters[0], + ctx1 as Parameters[1], + ); + const done1 = (async () => { + for await (const _ of await resolveStream(stream1)) { + /* consume */ + } + })(); + + await new Promise((r) => setImmediate(r)); + const manager = MockManager.lastInstance!; + manager.setPreviousResponseId("resp_turn1_reasoning"); + manager.simulateEvent({ type: "response.completed", response: turn1Response }); + await done1; + + const ctx2 = { + systemPrompt: "You are helpful.", + messages: [ + userMsg("Run ls"), + buildAssistantMessageFromResponse(turn1Response, modelStub), + ] as Parameters[0], + tools: [], + }; + + const stream2 = streamFn( + modelStub as Parameters[0], + ctx2 as Parameters[1], + ); + const done2 = (async () => { + for await (const _ of await resolveStream(stream2)) { + /* consume */ + } + })(); + + await new Promise((r) => setImmediate(r)); + manager.simulateEvent({ + type: "response.completed", + response: makeResponseObject("resp_turn2", "Done"), + }); + await done2; + + const sent2 = manager.sentEvents[1] as { + previous_response_id?: string; + input: Array<{ type: string; id?: string; call_id?: string }>; + }; + expect(sent2.previous_response_id).toBeUndefined(); + expect(sent2.input.map((item) => item.type)).toEqual(["message", "reasoning", "function_call"]); + expect(sent2.input[1]).toMatchObject({ type: "reasoning", id: "rs_turn1" }); + expect(sent2.input[2]).toMatchObject({ + type: "function_call", + call_id: "call_turn1", + id: "fc_turn1", + }); + }); + it("sends instructions (system prompt) in each request", async () => { const streamFn = createOpenAIWebSocketStreamFn("sk-test", "sess-tools"); const ctx = { diff --git a/src/agents/openai-ws-stream.ts b/src/agents/openai-ws-stream.ts index 38b086b45d2..ca5be9d0664 100644 --- a/src/agents/openai-ws-stream.ts +++ b/src/agents/openai-ws-stream.ts @@ -879,6 +879,7 @@ export function createOpenAIWebSocketStreamFn( // ── 3. Compute incremental vs full input ───────────────────────────── const prevResponseId = session.manager.previousResponseId; + let requestPreviousResponseId: string | undefined; let inputItems: InputItem[]; if (prevResponseId && session.lastContextLength > 0) { @@ -887,13 +888,15 @@ export function createOpenAIWebSocketStreamFn( // Filter to only tool results — the assistant message is already in server context const toolResults = newMessages.filter((m) => (m as AnyMessage).role === "toolResult"); if (toolResults.length === 0) { - // Shouldn't happen in a well-formed turn, but fall back to full context + // The WebSocket guide requires a fresh full-context turn here: when we + // cannot continue the incremental chain, omit previous_response_id. log.debug( - `[ws-stream] session=${sessionId}: no new tool results found; sending full context`, + `[ws-stream] session=${sessionId}: no new tool results found; sending full context without previous_response_id`, ); inputItems = buildFullInput(context, model); } else { inputItems = convertMessagesToInputItems(toolResults, model); + requestPreviousResponseId = prevResponseId; } log.debug( `[ws-stream] session=${sessionId}: incremental send (${inputItems.length} tool results) previous_response_id=${prevResponseId}`, @@ -955,7 +958,7 @@ export function createOpenAIWebSocketStreamFn( input: inputItems, instructions: context.systemPrompt ?? undefined, tools: tools.length > 0 ? tools : undefined, - ...(prevResponseId ? { previous_response_id: prevResponseId } : {}), + ...(requestPreviousResponseId ? { previous_response_id: requestPreviousResponseId } : {}), ...extraParams, }; const nextPayload = options?.onPayload?.(payload, model);