fix: tighten openai ws reasoning replay (#53856)

This commit is contained in:
Peter Steinberger 2026-03-27 01:48:48 +00:00
parent ad21d84940
commit d7b61228e2
4 changed files with 341 additions and 5 deletions

View File

@ -40,6 +40,7 @@ Docs: https://docs.openclaw.ai
- CLI/message send: write manual `openclaw message send` deliveries into the resolved agent session transcript again by always threading the default CLI agent through outbound mirroring. (#54187) Thanks @KevInTheCloud5617.
- CLI/onboarding: show the Kimi Code API key option again in the Moonshot setup menu so the interactive picker includes all Kimi setup paths together. Fixes #54412 Thanks @sparkyrider
- Agents/status: use provider-aware context window lookup for fresh Anthropic 4.6 model overrides so `/status` shows the correct 1.0m window instead of an underreported shared-cache minimum. (#54796) Thanks @neeravmakwana.
- OpenAI/WebSocket: preserve reasoning replay metadata and tool-call item ids on WebSocket tool turns, and start a fresh response chain when full-context resend is required. (#53856) Thanks @xujingchen1996.
- OpenAI/WS: restore reasoning blocks for Responses WebSocket runs and keep reasoning/tool-call replay metadata intact so resumed sessions do not lose or break follow-up reasoning-capable turns. (#53856) Thanks @xujingchen1996.
- Agents/errors: surface provider quota/reset details when available, but keep HTML/Cloudflare rate-limit pages on the generic fallback so raw error pages are not shown to users. (#54512) Thanks @bugkill3r.
- Claude CLI: switch the bundled Claude CLI backend to `stream-json` output so watchdogs see progress on long runs, and keep session/usage metadata even when Claude finishes with an empty result line. (#49698) Thanks @felear2022.

View File

@ -9,7 +9,7 @@
* - Connection lifecycle cleanup via releaseWsSession
*
* Run manually with a valid OPENAI_API_KEY:
* OPENAI_API_KEY=sk-... npx vitest run src/agents/openai-ws-stream.e2e.test.ts
* OPENCLAW_LIVE_TEST=1 pnpm exec vitest run --config vitest.e2e.config.ts src/agents/openai-ws-stream.e2e.test.ts
*
* Skipped in CI no API key available and we avoid billable external calls.
*/
@ -21,15 +21,19 @@ import type {
Context,
} from "@mariozechner/pi-ai";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { isLiveTestEnabled } from "./live-test-helpers.js";
import type { OutputItem, ResponseObject } from "./openai-ws-connection.js";
const API_KEY = process.env.OPENAI_API_KEY;
const LIVE = !!API_KEY;
const LIVE = isLiveTestEnabled(["OPENAI_LIVE_TEST"]) && !!API_KEY;
const testFn = LIVE ? it : it.skip;
type OpenAIWsStreamModule = typeof import("./openai-ws-stream.js");
type OpenAIWsConnectionModule = typeof import("./openai-ws-connection.js");
type StreamFactory = OpenAIWsStreamModule["createOpenAIWebSocketStreamFn"];
type StreamReturn = ReturnType<ReturnType<StreamFactory>>;
let openAIWsStreamModule: OpenAIWsStreamModule;
let openAIWsConnectionModule: OpenAIWsConnectionModule;
const model = {
api: "openai-responses" as const,
@ -106,6 +110,66 @@ function assistantText(message: AssistantMessage): string {
.join("");
}
function extractThinkingBlocks(message: AssistantMessage) {
return message.content.filter((block) => block.type === "thinking") as Array<{
type: "thinking";
thinking: string;
thinkingSignature?: string;
}>;
}
function extractToolCall(message: AssistantMessage) {
return message.content.find((block) => block.type === "toolCall") as
| { type: "toolCall"; id: string; name: string }
| undefined;
}
function parseReasoningSignature(value: string | undefined) {
if (!value) {
return null;
}
try {
return JSON.parse(value) as { id?: unknown; type?: unknown };
} catch {
return null;
}
}
function extractReasoningText(item: { summary?: unknown; content?: unknown }): string {
const summary = item.summary;
if (typeof summary === "string") {
return summary.trim();
}
if (Array.isArray(summary)) {
const summaryText = summary
.map((part) => {
if (typeof part === "string") {
return part.trim();
}
if (!part || typeof part !== "object") {
return "";
}
return typeof (part as { text?: unknown }).text === "string"
? ((part as { text: string }).text ?? "").trim()
: "";
})
.filter(Boolean)
.join("\n")
.trim();
if (summaryText) {
return summaryText;
}
}
return typeof item.content === "string" ? item.content.trim() : "";
}
function toExpectedReasoningSignature(item: { id?: string; type: string }) {
return {
type: item.type,
...(typeof item.id === "string" && item.id.startsWith("rs_") ? { id: item.id } : {}),
};
}
/** Each test gets a unique session ID to avoid cross-test interference. */
const sessions: string[] = [];
function freshSession(name: string): string {
@ -124,6 +188,7 @@ describe("OpenAI WebSocket e2e", () => {
createAssistantMessageEventStream: actual.createAssistantMessageEventStream,
};
});
openAIWsConnectionModule = await import("./openai-ws-connection.js");
openAIWsStreamModule = await import("./openai-ws-stream.js");
});
@ -131,6 +196,7 @@ describe("OpenAI WebSocket e2e", () => {
for (const id of sessions) {
openAIWsStreamModule.releaseWsSession(id);
}
openAIWsStreamModule.__testing.setDepsForTest();
sessions.length = 0;
});
@ -211,6 +277,90 @@ describe("OpenAI WebSocket e2e", () => {
60_000,
);
testFn(
"surfaces replay-safe reasoning metadata on websocket tool turns",
async () => {
const sid = freshSession("tool-reasoning");
const completedResponses: ResponseObject[] = [];
openAIWsStreamModule.__testing.setDepsForTest({
createManager: (options) => {
const manager = new openAIWsConnectionModule.OpenAIWebSocketManager(options);
manager.onMessage((event) => {
if (event.type === "response.completed") {
completedResponses.push(event.response);
}
});
return manager;
},
});
const streamFn = openAIWsStreamModule.createOpenAIWebSocketStreamFn(API_KEY!, sid);
const firstContext = makeToolContext(
"Think carefully, call the tool `noop` with {} first, then after the tool result reply with exactly TOOL_OK.",
);
const firstDone = expectDone(
await collectEvents(
streamFn(model, firstContext, {
transport: "websocket",
toolChoice: "required",
reasoningEffort: "high",
reasoningSummary: "detailed",
maxTokens: 256,
} as unknown as StreamFnParams[2]),
),
);
const firstResponse = completedResponses[0];
expect(firstResponse).toBeDefined();
const rawReasoningItems = (firstResponse?.output ?? []).filter(
(item): item is Extract<OutputItem, { type: "reasoning" | `reasoning.${string}` }> =>
item.type === "reasoning" || item.type.startsWith("reasoning."),
);
const replayableReasoningItems = rawReasoningItems.filter(
(item) => extractReasoningText(item).length > 0,
);
const thinkingBlocks = extractThinkingBlocks(firstDone);
expect(thinkingBlocks).toHaveLength(replayableReasoningItems.length);
expect(thinkingBlocks.map((block) => block.thinking)).toEqual(
replayableReasoningItems.map((item) => extractReasoningText(item)),
);
expect(
thinkingBlocks.map((block) => parseReasoningSignature(block.thinkingSignature)),
).toEqual(replayableReasoningItems.map((item) => toExpectedReasoningSignature(item)));
const rawToolCall = firstResponse?.output.find(
(item): item is Extract<OutputItem, { type: "function_call" }> =>
item.type === "function_call",
);
expect(rawToolCall).toBeDefined();
const toolCall = extractToolCall(firstDone);
expect(toolCall?.name).toBe(rawToolCall?.name);
expect(toolCall?.id).toBe(
rawToolCall ? `${rawToolCall.call_id}|${rawToolCall.id}` : undefined,
);
const secondContext = {
...firstContext,
messages: [
...firstContext.messages,
firstDone,
makeToolResultMessage(toolCall!.id, "TOOL_OK"),
],
} as unknown as StreamFnParams[1];
const secondDone = expectDone(
await collectEvents(
streamFn(model, secondContext, {
transport: "websocket",
maxTokens: 128,
}),
),
);
expect(assistantText(secondDone)).toMatch(/TOOL_OK/);
},
60_000,
);
testFn(
"supports websocket warm-up before the first request",
async () => {

View File

@ -618,6 +618,42 @@ describe("convertMessagesToInputItems", () => {
expect(items[0]).toMatchObject({ type: "reasoning", id: "rs_summary" });
});
it("drops reasoning replay ids that do not match OpenAI reasoning ids", () => {
const msg = {
role: "assistant" as const,
content: [
{
type: "thinking" as const,
thinking: "internal reasoning...",
thinkingSignature: JSON.stringify({
type: "reasoning",
id: " bad-id ",
}),
},
{ type: "text" as const, text: "Here is my answer." },
],
stopReason: "stop",
api: "openai-responses",
provider: "openai",
model: "gpt-5.2",
usage: {},
timestamp: 0,
};
const items = convertMessagesToInputItems([msg] as Parameters<
typeof convertMessagesToInputItems
>[0]);
expect(items).toEqual([
{
type: "reasoning",
},
{
type: "message",
role: "assistant",
content: "Here is my answer.",
},
]);
});
it("returns empty array for empty messages", () => {
expect(convertMessagesToInputItems([])).toEqual([]);
});
@ -764,6 +800,63 @@ describe("buildAssistantMessageFromResponse", () => {
);
});
it("prefers reasoning summary text over fallback content and preserves item order", () => {
const response = {
id: "resp_reasoning_order",
object: "response",
created_at: Date.now(),
status: "completed",
model: "gpt-5.2",
output: [
{
type: "reasoning.summary",
id: "rs_789",
summary: ["Plan A", { text: "Plan B" }, { nope: true }],
content: "hidden fallback content",
},
{
type: "function_call",
id: "fc_789",
call_id: "call_789",
name: "exec",
arguments: '{"arg":"value"}',
},
],
usage: { input_tokens: 100, output_tokens: 50, total_tokens: 150 },
} as unknown as ResponseObject;
const msg = buildAssistantMessageFromResponse(response, modelInfo);
expect(msg.content.map((block) => block.type)).toEqual(["thinking", "toolCall"]);
const thinkingBlock = msg.content[0] as
| { type: "thinking"; thinking: string; thinkingSignature?: string }
| undefined;
expect(thinkingBlock?.thinking).toBe("Plan A\nPlan B");
expect(thinkingBlock?.thinkingSignature).toBe(
JSON.stringify({ id: "rs_789", type: "reasoning.summary" }),
);
});
it("drops invalid reasoning ids from thinking signatures while preserving the visible block", () => {
const response = {
id: "resp_invalid_reasoning_id",
object: "response",
created_at: Date.now(),
status: "completed",
model: "gpt-5.2",
output: [
{
type: "reasoning",
id: "invalid_reasoning_id",
content: "Hidden reasoning",
},
],
usage: { input_tokens: 100, output_tokens: 50, total_tokens: 150 },
} as unknown as ResponseObject;
const msg = buildAssistantMessageFromResponse(response, modelInfo);
expect(msg.content).toEqual([{ type: "thinking", thinking: "Hidden reasoning" }]);
});
it("preserves function call item ids for replay when reasoning is present", () => {
const response = {
id: "resp_tool_reasoning",
@ -1148,6 +1241,95 @@ describe("createOpenAIWebSocketStreamFn", () => {
expect(inputTypes).toHaveLength(1);
});
it("omits previous_response_id when replaying full context on follow-up turns", async () => {
const sessionId = "sess-full-context-replay";
const streamFn = createOpenAIWebSocketStreamFn("sk-test", sessionId);
const ctx1 = {
systemPrompt: "You are helpful.",
messages: [userMsg("Run ls")] as Parameters<typeof convertMessagesToInputItems>[0],
tools: [],
};
const turn1Response = {
id: "resp_turn1_reasoning",
object: "response",
created_at: Date.now(),
status: "completed",
model: "gpt-5.2",
output: [
{
type: "reasoning",
id: "rs_turn1",
content: "Thinking before tool call",
},
{
type: "function_call",
id: "fc_turn1",
call_id: "call_turn1",
name: "exec",
arguments: '{"cmd":"ls"}',
},
],
usage: { input_tokens: 12, output_tokens: 8, total_tokens: 20 },
} as ResponseObject;
const stream1 = streamFn(
modelStub as Parameters<typeof streamFn>[0],
ctx1 as Parameters<typeof streamFn>[1],
);
const done1 = (async () => {
for await (const _ of await resolveStream(stream1)) {
/* consume */
}
})();
await new Promise((r) => setImmediate(r));
const manager = MockManager.lastInstance!;
manager.setPreviousResponseId("resp_turn1_reasoning");
manager.simulateEvent({ type: "response.completed", response: turn1Response });
await done1;
const ctx2 = {
systemPrompt: "You are helpful.",
messages: [
userMsg("Run ls"),
buildAssistantMessageFromResponse(turn1Response, modelStub),
] as Parameters<typeof convertMessagesToInputItems>[0],
tools: [],
};
const stream2 = streamFn(
modelStub as Parameters<typeof streamFn>[0],
ctx2 as Parameters<typeof streamFn>[1],
);
const done2 = (async () => {
for await (const _ of await resolveStream(stream2)) {
/* consume */
}
})();
await new Promise((r) => setImmediate(r));
manager.simulateEvent({
type: "response.completed",
response: makeResponseObject("resp_turn2", "Done"),
});
await done2;
const sent2 = manager.sentEvents[1] as {
previous_response_id?: string;
input: Array<{ type: string; id?: string; call_id?: string }>;
};
expect(sent2.previous_response_id).toBeUndefined();
expect(sent2.input.map((item) => item.type)).toEqual(["message", "reasoning", "function_call"]);
expect(sent2.input[1]).toMatchObject({ type: "reasoning", id: "rs_turn1" });
expect(sent2.input[2]).toMatchObject({
type: "function_call",
call_id: "call_turn1",
id: "fc_turn1",
});
});
it("sends instructions (system prompt) in each request", async () => {
const streamFn = createOpenAIWebSocketStreamFn("sk-test", "sess-tools");
const ctx = {

View File

@ -879,6 +879,7 @@ export function createOpenAIWebSocketStreamFn(
// ── 3. Compute incremental vs full input ─────────────────────────────
const prevResponseId = session.manager.previousResponseId;
let requestPreviousResponseId: string | undefined;
let inputItems: InputItem[];
if (prevResponseId && session.lastContextLength > 0) {
@ -887,13 +888,15 @@ export function createOpenAIWebSocketStreamFn(
// Filter to only tool results — the assistant message is already in server context
const toolResults = newMessages.filter((m) => (m as AnyMessage).role === "toolResult");
if (toolResults.length === 0) {
// Shouldn't happen in a well-formed turn, but fall back to full context
// The WebSocket guide requires a fresh full-context turn here: when we
// cannot continue the incremental chain, omit previous_response_id.
log.debug(
`[ws-stream] session=${sessionId}: no new tool results found; sending full context`,
`[ws-stream] session=${sessionId}: no new tool results found; sending full context without previous_response_id`,
);
inputItems = buildFullInput(context, model);
} else {
inputItems = convertMessagesToInputItems(toolResults, model);
requestPreviousResponseId = prevResponseId;
}
log.debug(
`[ws-stream] session=${sessionId}: incremental send (${inputItems.length} tool results) previous_response_id=${prevResponseId}`,
@ -955,7 +958,7 @@ export function createOpenAIWebSocketStreamFn(
input: inputItems,
instructions: context.systemPrompt ?? undefined,
tools: tools.length > 0 ? tools : undefined,
...(prevResponseId ? { previous_response_id: prevResponseId } : {}),
...(requestPreviousResponseId ? { previous_response_id: requestPreviousResponseId } : {}),
...extraParams,
};
const nextPayload = options?.onPayload?.(payload, model);