From 98ce1c29027aa075c3a3cf5cc5186a341ec17e2f Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sun, 5 Apr 2026 18:30:14 +0100 Subject: [PATCH] fix: hide commentary partial leaks until final answer (#59643) (thanks @ringlochid) --- CHANGELOG.md | 1 + src/agents/openai-ws-stream.test.ts | 6 +- src/agents/openai-ws-stream.ts | 66 ++++++++++++++----- ...bedded-subscribe.handlers.messages.test.ts | 14 +--- ...pi-embedded-subscribe.handlers.messages.ts | 54 ++------------- src/agents/pi-embedded-utils.test.ts | 4 +- src/agents/pi-embedded-utils.ts | 5 -- 7 files changed, 64 insertions(+), 86 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6b99bbd6930..ba6d2ab2443 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -71,6 +71,7 @@ Docs: https://docs.openclaw.ai - Telegram/local Bot API: honor `channels.telegram.apiRoot` for buffered media downloads, add `channels.telegram.network.dangerouslyAllowPrivateNetwork` for trusted fake-IP setups, and require `channels.telegram.trustedLocalFileRoots` before reading absolute Bot API `file_path` values. (#59544, #60705) Thanks @SARAMALI15792 and @obviyus. - Outbound/sanitizer: strip leaked ``, ``, and model special tokens from shared user-visible assistant text, including truncated tool-call streams, so internal scaffolding no longer bleeds into replies across surfaces. (#60619) Thanks @oliviareid-svg. - Agents/output delivery: suppress `phase:”commentary”` assistant text at the embedded subscribe boundary so internal planning text cannot leak into user-visible replies or Telegram partials. (#61282) Thanks @mbelinky. +- Agents/streaming: keep commentary-only partials hidden until `final_answer` is available and buffer OpenAI Responses websocket text deltas until phase metadata arrives, so commentary does not leak into visible embedded replies. (#59643) Thanks @ringlochid. - Agents/errors: surface an explicit disk-full message when local session or transcript writes fail with `ENOSPC`/`disk full`, so those runs stop degrading into opaque `NO_REPLY`-style failures. Thanks @vincentkoc. - Exec approvals: remove heuristic command-obfuscation gating from host exec so gateway and node runs rely on explicit policy, allowlist, and strict inline-eval rules only. - Config/All Settings: keep the raw config view intact when sensitive fields are blank instead of corrupting or dropping the rendered snapshot. (#28214) Thanks @solodmd. diff --git a/src/agents/openai-ws-stream.test.ts b/src/agents/openai-ws-stream.test.ts index 0e2d4bdff03..16308215419 100644 --- a/src/agents/openai-ws-stream.test.ts +++ b/src/agents/openai-ws-stream.test.ts @@ -1839,7 +1839,7 @@ describe("createOpenAIWebSocketStreamFn", () => { ]); }); - it("degrades safely when a text delta arrives before its item mapping", async () => { + it("buffers text deltas until item mapping is available", async () => { const streamFn = createOpenAIWebSocketStreamFn("sk-test", "sess-phase-late-map"); const stream = streamFn( modelStub as Parameters[0], @@ -1910,12 +1910,12 @@ describe("createOpenAIWebSocketStreamFn", () => { const deltas = events.filter((event) => event.type === "text_delta"); expect(deltas).toHaveLength(2); expect(deltas[0]).toMatchObject({ delta: "Working" }); - expect(deltas[0]?.partial?.phase).toBeUndefined(); + expect(deltas[0]?.partial?.phase).toBe("commentary"); expect(deltas[0]?.partial?.content).toEqual([ { type: "text", text: "Working", - textSignature: JSON.stringify({ v: 1, id: "item_late" }), + textSignature: JSON.stringify({ v: 1, id: "item_late", phase: "commentary" }), }, ]); expect(deltas[1]).toMatchObject({ delta: "..." }); diff --git a/src/agents/openai-ws-stream.ts b/src/agents/openai-ws-stream.ts index 78904ac06a6..c9e5fd80f3b 100644 --- a/src/agents/openai-ws-stream.ts +++ b/src/agents/openai-ws-stream.ts @@ -907,6 +907,10 @@ export function createOpenAIWebSocketStreamFn( } const outputItemPhaseById = new Map(); + const outputTextByPart = new Map(); + const emittedTextByPart = new Map(); + const getOutputTextKey = (itemId: string, contentIndex: number) => + `${itemId}:${contentIndex}`; const emitTextDelta = (params: { fullText: string; deltaText: string; @@ -946,6 +950,24 @@ export function createOpenAIWebSocketStreamFn( partial: partialMsg, }); }; + const emitBufferedTextDelta = (params: { itemId: string; contentIndex: number }) => { + const key = getOutputTextKey(params.itemId, params.contentIndex); + const fullText = outputTextByPart.get(key) ?? ""; + const emittedText = emittedTextByPart.get(key) ?? ""; + if (!fullText || fullText === emittedText) { + return; + } + const deltaText = fullText.startsWith(emittedText) + ? fullText.slice(emittedText.length) + : fullText; + emittedTextByPart.set(key, fullText); + emitTextDelta({ + fullText, + deltaText, + itemId: params.itemId, + contentIndex: params.contentIndex, + }); + }; const capturedContextLength = context.messages.length; let sawWsOutput = false; @@ -953,6 +975,8 @@ export function createOpenAIWebSocketStreamFn( await new Promise((resolve, reject) => { const abortHandler = () => { outputItemPhaseById.clear(); + outputTextByPart.clear(); + emittedTextByPart.clear(); cleanup(); reject(new Error("aborted")); }; @@ -964,6 +988,8 @@ export function createOpenAIWebSocketStreamFn( const closeHandler = (code: number, reason: string) => { outputItemPhaseById.clear(); + outputTextByPart.clear(); + emittedTextByPart.clear(); cleanup(); const closeInfo = session.manager.lastCloseInfo; reject( @@ -986,10 +1012,6 @@ export function createOpenAIWebSocketStreamFn( unsubscribe(); }; - const outputTextByPart = new Map(); - const getOutputTextKey = (itemId: string, contentIndex: number) => - `${itemId}:${contentIndex}`; - const unsubscribe = session.manager.onMessage((event) => { if ( event.type === "response.output_item.added" || @@ -1014,6 +1036,15 @@ export function createOpenAIWebSocketStreamFn( ? normalizeAssistantPhase((event.item as { phase?: unknown }).phase) : undefined; outputItemPhaseById.set(event.item.id, itemPhase); + for (const key of outputTextByPart.keys()) { + if (key.startsWith(`${event.item.id}:`)) { + const [, contentIndexText] = key.split(":"); + emitBufferedTextDelta({ + itemId: event.item.id, + contentIndex: Number.parseInt(contentIndexText ?? "0", 10) || 0, + }); + } + } } return; } @@ -1022,26 +1053,22 @@ export function createOpenAIWebSocketStreamFn( const key = getOutputTextKey(event.item_id, event.content_index); const nextText = `${outputTextByPart.get(key) ?? ""}${event.delta}`; outputTextByPart.set(key, nextText); - emitTextDelta({ - fullText: nextText, - deltaText: event.delta, - itemId: event.item_id, - contentIndex: event.content_index, - }); + if (outputItemPhaseById.has(event.item_id)) { + emitBufferedTextDelta({ + itemId: event.item_id, + contentIndex: event.content_index, + }); + } return; } if (event.type === "response.output_text.done") { const key = getOutputTextKey(event.item_id, event.content_index); - const previousText = outputTextByPart.get(key) ?? ""; - if (event.text && event.text !== previousText) { + if (event.text && event.text !== outputTextByPart.get(key)) { outputTextByPart.set(key, event.text); - const deltaText = event.text.startsWith(previousText) - ? event.text.slice(previousText.length) - : event.text; - emitTextDelta({ - fullText: event.text, - deltaText, + } + if (outputItemPhaseById.has(event.item_id)) { + emitBufferedTextDelta({ itemId: event.item_id, contentIndex: event.content_index, }); @@ -1052,6 +1079,7 @@ export function createOpenAIWebSocketStreamFn( if (event.type === "response.completed") { outputItemPhaseById.clear(); outputTextByPart.clear(); + emittedTextByPart.clear(); cleanup(); session.lastContextLength = capturedContextLength; const assistantMsg = buildAssistantMessageFromResponse(event.response, { @@ -1066,6 +1094,7 @@ export function createOpenAIWebSocketStreamFn( } else if (event.type === "response.failed") { outputItemPhaseById.clear(); outputTextByPart.clear(); + emittedTextByPart.clear(); cleanup(); reject( new OpenAIWebSocketRuntimeError( @@ -1079,6 +1108,7 @@ export function createOpenAIWebSocketStreamFn( } else if (event.type === "error") { outputItemPhaseById.clear(); outputTextByPart.clear(); + emittedTextByPart.clear(); cleanup(); reject( new OpenAIWebSocketRuntimeError( diff --git a/src/agents/pi-embedded-subscribe.handlers.messages.test.ts b/src/agents/pi-embedded-subscribe.handlers.messages.test.ts index dc44833ddb7..400d23c0cb1 100644 --- a/src/agents/pi-embedded-subscribe.handlers.messages.test.ts +++ b/src/agents/pi-embedded-subscribe.handlers.messages.test.ts @@ -254,7 +254,7 @@ describe("handleMessageUpdate", () => { expect(ctx.state.blockBuffer).toBe(""); }); - it("replaces commentary preview text when a final_answer partial arrives", () => { + it("suppresses commentary partials until a final_answer partial arrives", () => { const onAgentEvent = vi.fn(); const ctx = { params: { @@ -339,20 +339,12 @@ describe("handleMessageUpdate", () => { }, } as never); - expect(onAgentEvent).toHaveBeenCalledTimes(2); + expect(onAgentEvent).toHaveBeenCalledTimes(1); expect(onAgentEvent.mock.calls[0]?.[0]).toMatchObject({ - stream: "assistant", - data: { - text: "Working...", - delta: "Working...", - }, - }); - expect(onAgentEvent.mock.calls[1]?.[0]).toMatchObject({ stream: "assistant", data: { text: "Done.", - delta: "", - replace: true, + delta: "Done.", }, }); }); diff --git a/src/agents/pi-embedded-subscribe.handlers.messages.ts b/src/agents/pi-embedded-subscribe.handlers.messages.ts index 947c7d14c46..41a13217285 100644 --- a/src/agents/pi-embedded-subscribe.handlers.messages.ts +++ b/src/agents/pi-embedded-subscribe.handlers.messages.ts @@ -350,33 +350,11 @@ export function handleMessageUpdate( ? (assistantRecord.partial as AssistantMessage) : msg; const phaseAwareVisibleText = coerceText(extractAssistantVisibleText(partialAssistant)).trim(); - const partialVisiblePhase = normalizeAssistantPhase( - (partialAssistant as { phase?: unknown }).phase, - ); - const explicitStructuredPartialPhases = new Set(); - if (Array.isArray(partialAssistant.content)) { - for (const block of partialAssistant.content) { - if (!block || typeof block !== "object") { - continue; - } - const record = block as { type?: unknown; textSignature?: unknown }; - if (record.type !== "text") { - continue; - } - const signaturePhase = getAssistantTextSignaturePhase(record.textSignature); - if (signaturePhase) { - explicitStructuredPartialPhases.add(signaturePhase); - } - } + const deliveryPhase = resolveAssistantDeliveryPhase(partialAssistant); + if (deliveryPhase === "commentary" && !phaseAwareVisibleText) { + return; } - const structuredPartialPhase = - explicitStructuredPartialPhases.size === 1 - ? [...explicitStructuredPartialPhases][0] - : undefined; - const shouldUsePhaseAwareBlockReply = Boolean(partialVisiblePhase || structuredPartialPhase); - const shouldDeferTextEndBlockReply = - shouldUsePhaseAwareBlockReply && - (partialVisiblePhase ?? structuredPartialPhase) !== "final_answer"; + const shouldUsePhaseAwareBlockReply = Boolean(deliveryPhase); if (chunk) { ctx.state.deltaBuffer += chunk; @@ -438,7 +416,7 @@ export function handleMessageUpdate( ctx.blockChunker?.reset(); } const blockReplyChunk = replace ? cleanedText : deltaText; - if (!shouldDeferTextEndBlockReply && blockReplyChunk) { + if (blockReplyChunk) { if (ctx.blockChunker) { ctx.blockChunker.append(blockReplyChunk); } else { @@ -446,12 +424,7 @@ export function handleMessageUpdate( } } - if ( - !shouldDeferTextEndBlockReply && - evtType === "text_end" && - !ctx.state.lastBlockReplyText && - cleanedText - ) { + if (evtType === "text_end" && !ctx.state.lastBlockReplyText && cleanedText) { if (ctx.blockChunker) { ctx.blockChunker.reset(); ctx.blockChunker.append(cleanedText); @@ -493,7 +466,6 @@ export function handleMessageUpdate( if ( !ctx.params.silentExpected && - !shouldDeferTextEndBlockReply && ctx.params.onBlockReply && ctx.blockChunking && ctx.state.blockReplyBreak === "text_end" @@ -503,7 +475,6 @@ export function handleMessageUpdate( if ( !ctx.params.silentExpected && - !shouldDeferTextEndBlockReply && evtType === "text_end" && ctx.state.blockReplyBreak === "text_end" ) { @@ -546,7 +517,7 @@ export function handleMessageEnd( }); const text = resolveSilentReplyFallbackText({ - text: ctx.stripBlockTags(rawVisibleText || rawText, { thinking: false, final: false }), + text: ctx.stripBlockTags(rawVisibleText, { thinking: false, final: false }), messagingToolSentTexts: ctx.state.messagingToolSentTexts, }); const rawThinking = @@ -577,17 +548,6 @@ export function handleMessageEnd( return; } - if (!cleanedText && !hasMedia && !ctx.params.enforceFinalTag) { - const rawTrimmed = coerceText(rawText).trim(); - const rawStrippedFinal = rawTrimmed.replace(/<\s*\/?\s*final\s*>/gi, "").trim(); - const rawCandidate = rawStrippedFinal || rawTrimmed; - if (rawCandidate) { - const parsedFallback = parseReplyDirectives(stripTrailingDirective(rawCandidate)); - cleanedText = parsedFallback.text ?? rawCandidate; - ({ mediaUrls, hasMedia } = resolveSendableOutboundReplyParts(parsedFallback)); - } - } - const previousStreamedText = ctx.state.lastStreamedAssistantCleaned ?? ""; const shouldReplaceFinalStream = Boolean( previousStreamedText && cleanedText && !cleanedText.startsWith(previousStreamedText), diff --git a/src/agents/pi-embedded-utils.test.ts b/src/agents/pi-embedded-utils.test.ts index 1d3677b8411..a692f8728bb 100644 --- a/src/agents/pi-embedded-utils.test.ts +++ b/src/agents/pi-embedded-utils.test.ts @@ -594,7 +594,7 @@ describe("extractAssistantVisibleText", () => { expect(extractAssistantVisibleText(msg)).toBe("Done."); }); - it("falls back to commentary when final_answer is empty", () => { + it("does not fall back to commentary when final_answer is empty", () => { const msg = makeAssistantMessage({ role: "assistant", content: [ @@ -612,7 +612,7 @@ describe("extractAssistantVisibleText", () => { timestamp: Date.now(), }); - expect(extractAssistantVisibleText(msg)).toBe("Working..."); + expect(extractAssistantVisibleText(msg)).toBe(""); }); it("falls back to legacy unphased text when phased text is absent", () => { diff --git a/src/agents/pi-embedded-utils.ts b/src/agents/pi-embedded-utils.ts index b60d0122e7d..c4d016b5a4f 100644 --- a/src/agents/pi-embedded-utils.ts +++ b/src/agents/pi-embedded-utils.ts @@ -336,11 +336,6 @@ export function extractAssistantVisibleText(msg: AssistantMessage): string { return finalAnswerText; } - const commentaryText = extractAssistantTextForPhase(msg, "commentary"); - if (commentaryText.trim()) { - return commentaryText; - } - return extractAssistantTextForPhase(msg); }