diff --git a/src/agents/openai-ws-message-conversion.ts b/src/agents/openai-ws-message-conversion.ts index d3ee22b3d3c..fa2f7961102 100644 --- a/src/agents/openai-ws-message-conversion.ts +++ b/src/agents/openai-ws-message-conversion.ts @@ -349,10 +349,22 @@ export function convertMessagesToInputItems( if (m.role === "assistant") { const content = m.content; - let assistantPhase = normalizeAssistantPhase(m.phase); + const assistantMessagePhase = normalizeAssistantPhase(m.phase); if (Array.isArray(content)) { const textParts: string[] = []; - const pushAssistantText = () => { + let currentTextPhase: OpenAIResponsesAssistantPhase | undefined; + const hasExplicitBlockPhase = content.some((block) => { + if (!block || typeof block !== "object") { + return false; + } + const record = block as { type?: unknown; textSignature?: unknown }; + return ( + record.type === "text" && + Boolean(parseAssistantTextSignature(record.textSignature)?.phase) + ); + }); + + const pushAssistantText = (phase?: OpenAIResponsesAssistantPhase) => { if (textParts.length === 0) { return; } @@ -360,7 +372,7 @@ export function convertMessagesToInputItems( type: "message", role: "assistant", content: textParts.join(""), - ...(assistantPhase ? { phase: assistantPhase } : {}), + ...(phase ? { phase } : {}), }); textParts.length = 0; }; @@ -376,15 +388,18 @@ export function convertMessagesToInputItems( }>) { if (block.type === "text" && typeof block.text === "string") { const parsedSignature = parseAssistantTextSignature(block.textSignature); - if (!assistantPhase) { - assistantPhase = parsedSignature?.phase; + const blockPhase = + parsedSignature?.phase ?? (hasExplicitBlockPhase ? undefined : assistantMessagePhase); + if (textParts.length > 0 && blockPhase !== currentTextPhase) { + pushAssistantText(currentTextPhase); } textParts.push(block.text); + currentTextPhase = blockPhase; continue; } if (block.type === "thinking") { - pushAssistantText(); + pushAssistantText(currentTextPhase); const reasoningItem = parseThinkingSignature(block.thinkingSignature); if (reasoningItem) { items.push(reasoningItem); @@ -396,7 +411,7 @@ export function convertMessagesToInputItems( continue; } - pushAssistantText(); + pushAssistantText(currentTextPhase); const replayId = decodeToolCallReplayId(block.id); const toolName = toNonEmptyString(block.name); if (!replayId || !toolName) { @@ -414,7 +429,7 @@ export function convertMessagesToInputItems( }); } - pushAssistantText(); + pushAssistantText(currentTextPhase); continue; } @@ -426,7 +441,7 @@ export function convertMessagesToInputItems( type: "message", role: "assistant", content: text, - ...(assistantPhase ? { phase: assistantPhase } : {}), + ...(assistantMessagePhase ? { phase: assistantMessagePhase } : {}), }); continue; } @@ -471,16 +486,20 @@ export function buildAssistantMessageFromResponse( modelInfo: { api: string; provider: string; id: string }, ): AssistantMessage { const content: AssistantMessage["content"] = []; - let assistantPhase: OpenAIResponsesAssistantPhase | undefined; + const assistantPhases = new Set(); + let hasUnphasedAssistantText = false; for (const item of response.output ?? []) { if (item.type === "message") { const itemPhase = normalizeAssistantPhase(item.phase); if (itemPhase) { - assistantPhase = itemPhase; + assistantPhases.add(itemPhase); } for (const part of item.content ?? []) { if (part.type === "output_text" && part.text) { + if (!itemPhase) { + hasUnphasedAssistantText = true; + } content.push({ type: "text", text: part.text, @@ -553,7 +572,10 @@ export function buildAssistantMessageFromResponse( }), }); - return assistantPhase - ? ({ ...message, phase: assistantPhase } as AssistantMessageWithPhase) + const finalAssistantPhase = + assistantPhases.size === 1 && !hasUnphasedAssistantText ? [...assistantPhases][0] : undefined; + + return finalAssistantPhase + ? ({ ...message, phase: finalAssistantPhase } as AssistantMessageWithPhase) : message; } diff --git a/src/agents/openai-ws-stream.test.ts b/src/agents/openai-ws-stream.test.ts index 5b84ab1dbcb..0e2d4bdff03 100644 --- a/src/agents/openai-ws-stream.test.ts +++ b/src/agents/openai-ws-stream.test.ts @@ -599,6 +599,146 @@ describe("convertMessagesToInputItems", () => { }); }); + it("splits replayed assistant text on phase changes from block signatures", () => { + const msg = { + role: "assistant" as const, + phase: "final_answer" as const, + content: [ + { + type: "text" as const, + text: "Working... ", + textSignature: JSON.stringify({ v: 1, id: "item_commentary", phase: "commentary" }), + }, + { + type: "text" as const, + text: "Done.", + textSignature: JSON.stringify({ v: 1, id: "item_final", phase: "final_answer" }), + }, + ], + stopReason: "stop", + api: "openai-responses", + provider: "openai", + model: "gpt-5.2", + usage: {}, + timestamp: 0, + }; + + expect( + convertMessagesToInputItems([msg] as unknown as Parameters< + typeof convertMessagesToInputItems + >[0]), + ).toEqual([ + { + type: "message", + role: "assistant", + content: "Working... ", + phase: "commentary", + }, + { + type: "message", + role: "assistant", + content: "Done.", + phase: "final_answer", + }, + ]); + }); + + it("splits legacy unphased text from later phased text during replay", () => { + const msg = { + role: "assistant" as const, + phase: "final_answer" as const, + content: [ + { + type: "text" as const, + text: "Legacy. ", + }, + { + type: "text" as const, + text: "Done.", + textSignature: JSON.stringify({ v: 1, id: "item_final", phase: "final_answer" }), + }, + ], + stopReason: "stop", + api: "openai-responses", + provider: "openai", + model: "gpt-5.2", + usage: {}, + timestamp: 0, + }; + + expect( + convertMessagesToInputItems([msg] as unknown as Parameters< + typeof convertMessagesToInputItems + >[0]), + ).toEqual([ + { + type: "message", + role: "assistant", + content: "Legacy. ", + }, + { + type: "message", + role: "assistant", + content: "Done.", + phase: "final_answer", + }, + ]); + }); + + it("preserves ordering when commentary text, tool calls, and final answer share one stored assistant message", () => { + const msg = { + role: "assistant" as const, + content: [ + { + type: "text" as const, + text: "Working... ", + textSignature: JSON.stringify({ v: 1, id: "item_commentary", phase: "commentary" }), + }, + { + type: "toolCall" as const, + id: "call_1|fc_1", + name: "exec", + arguments: { cmd: "ls" }, + }, + { + type: "text" as const, + text: "Done.", + textSignature: JSON.stringify({ v: 1, id: "item_final", phase: "final_answer" }), + }, + ], + stopReason: "toolUse", + api: "openai-responses", + provider: "openai", + model: "gpt-5.2", + usage: {}, + timestamp: 0, + }; + + expect( + convertMessagesToInputItems([msg] as Parameters[0]), + ).toEqual([ + { + type: "message", + role: "assistant", + content: "Working... ", + phase: "commentary", + }, + { + type: "function_call", + id: "fc_1", + call_id: "call_1", + name: "exec", + arguments: JSON.stringify({ cmd: "ls" }), + }, + { + type: "message", + role: "assistant", + content: "Done.", + phase: "final_answer", + }, + ]); + }); + it("converts a tool result message", () => { const items = convertMessagesToInputItems([toolResultMsg("call_1", "file.txt")] as Parameters< typeof convertMessagesToInputItems @@ -926,6 +1066,97 @@ describe("buildAssistantMessageFromResponse", () => { expect(msg.content[0]?.text).toBe("Final answer"); }); + it("omits top-level phase when a response contains mixed assistant phases", () => { + const response = { + id: "resp_mixed_phase", + object: "response", + created_at: Date.now(), + status: "completed", + model: "gpt-5.2", + output: [ + { + type: "message", + id: "item_commentary", + role: "assistant", + phase: "commentary", + content: [{ type: "output_text", text: "Working... " }], + }, + { + type: "message", + id: "item_final", + role: "assistant", + phase: "final_answer", + content: [{ type: "output_text", text: "Done." }], + }, + ], + usage: { input_tokens: 100, output_tokens: 50, total_tokens: 150 }, + } as unknown as ResponseObject; + + const msg = buildAssistantMessageFromResponse(response, modelInfo) as { + phase?: string; + content: Array<{ type: string; text?: string; textSignature?: string }>; + }; + + expect(msg.phase).toBeUndefined(); + expect(msg.content).toMatchObject([ + { + type: "text", + text: "Working... ", + textSignature: JSON.stringify({ v: 1, id: "item_commentary", phase: "commentary" }), + }, + { + type: "text", + text: "Done.", + textSignature: JSON.stringify({ v: 1, id: "item_final", phase: "final_answer" }), + }, + ]); + }); + + it("omits top-level phase when unphased legacy text and phased final text coexist", () => { + const response = { + id: "resp_unphased_plus_final", + object: "response", + created_at: Date.now(), + status: "completed", + model: "gpt-5.2", + output: [ + { + type: "message", + id: "item_legacy", + role: "assistant", + content: [{ type: "output_text", text: "Legacy. " }], + }, + { + type: "message", + id: "item_final", + role: "assistant", + phase: "final_answer", + content: [{ type: "output_text", text: "Done." }], + }, + ], + usage: { input_tokens: 100, output_tokens: 50, total_tokens: 150 }, + } as unknown as ResponseObject; + + const msg = buildAssistantMessageFromResponse(response, modelInfo) as { + phase?: string; + content: Array<{ type: string; text?: string; textSignature?: string }>; + }; + + expect(msg.phase).toBeUndefined(); + expect(msg.content).toMatchObject([ + { + type: "text", + text: "Legacy. ", + textSignature: JSON.stringify({ v: 1, id: "item_legacy" }), + }, + { + type: "text", + text: "Done.", + textSignature: JSON.stringify({ v: 1, id: "item_final", phase: "final_answer" }), + }, + ]); + }); + it("maps reasoning output items to thinking blocks with signature", () => { const response = { id: "resp_reasoning", @@ -1244,6 +1475,8 @@ describe("createOpenAIWebSocketStreamFn", () => { releaseWsSession("sess-incremental"); releaseWsSession("sess-full"); releaseWsSession("sess-phase"); + releaseWsSession("sess-phase-stream"); + releaseWsSession("sess-phase-late-map"); releaseWsSession("sess-tools"); releaseWsSession("sess-store-default"); releaseWsSession("sess-store-compat"); @@ -1482,6 +1715,220 @@ describe("createOpenAIWebSocketStreamFn", () => { expect(doneEvent?.message.stopReason).toBe("toolUse"); }); + it("emits accumulated phase-aware partials when output item mapping is available", async () => { + const streamFn = createOpenAIWebSocketStreamFn("sk-test", "sess-phase-stream"); + const stream = streamFn( + modelStub as Parameters[0], + contextStub as Parameters[1], + ); + + const events: Array<{ + type?: string; + delta?: string; + partial?: { phase?: string; content?: unknown[] }; + }> = []; + const done = (async () => { + for await (const ev of await resolveStream(stream)) { + events.push(ev as (typeof events)[number]); + } + })(); + + await new Promise((r) => setImmediate(r)); + const manager = MockManager.lastInstance!; + manager.simulateEvent({ + type: "response.output_item.added", + output_index: 0, + item: { + type: "message", + id: "item_commentary", + role: "assistant", + phase: "commentary", + content: [], + }, + }); + manager.simulateEvent({ + type: "response.output_text.delta", + item_id: "item_commentary", + output_index: 0, + content_index: 0, + delta: "Working", + }); + manager.simulateEvent({ + type: "response.output_text.delta", + item_id: "item_commentary", + output_index: 0, + content_index: 0, + delta: "...", + }); + manager.simulateEvent({ + type: "response.output_item.added", + output_index: 1, + item: { + type: "message", + id: "item_final", + role: "assistant", + phase: "final_answer", + content: [], + }, + }); + manager.simulateEvent({ + type: "response.output_text.delta", + item_id: "item_final", + output_index: 1, + content_index: 0, + delta: "Done.", + }); + manager.simulateEvent({ + type: "response.completed", + response: { + id: "resp_phase_stream", + object: "response", + created_at: Date.now(), + status: "completed", + model: "gpt-5.2", + output: [ + { + type: "message", + id: "item_commentary", + role: "assistant", + phase: "commentary", + content: [{ type: "output_text", text: "Working..." }], + }, + { + type: "message", + id: "item_final", + role: "assistant", + phase: "final_answer", + content: [{ type: "output_text", text: "Done." }], + }, + ], + usage: { input_tokens: 100, output_tokens: 50, total_tokens: 150 }, + }, + }); + + await done; + + const deltas = events.filter((event) => event.type === "text_delta"); + expect(deltas).toHaveLength(3); + expect(deltas[0]).toMatchObject({ delta: "Working" }); + expect(deltas[0]?.partial?.phase).toBe("commentary"); + expect(deltas[0]?.partial?.content).toEqual([ + { + type: "text", + text: "Working", + textSignature: JSON.stringify({ v: 1, id: "item_commentary", phase: "commentary" }), + }, + ]); + expect(deltas[1]).toMatchObject({ delta: "..." }); + expect(deltas[1]?.partial?.phase).toBe("commentary"); + expect(deltas[1]?.partial?.content).toEqual([ + { + type: "text", + text: "Working...", + textSignature: JSON.stringify({ v: 1, id: "item_commentary", phase: "commentary" }), + }, + ]); + expect(deltas[2]).toMatchObject({ delta: "Done." }); + expect(deltas[2]?.partial?.phase).toBe("final_answer"); + expect(deltas[2]?.partial?.content).toEqual([ + { + type: "text", + text: "Done.", + textSignature: JSON.stringify({ v: 1, id: "item_final", phase: "final_answer" }), + }, + ]); + }); + + it("degrades safely when a text delta arrives before its item mapping", async () => { + const streamFn = createOpenAIWebSocketStreamFn("sk-test", "sess-phase-late-map"); + const stream = streamFn( + modelStub as Parameters[0], + contextStub as Parameters[1], + ); + + const events: Array<{ + type?: string; + delta?: string; + partial?: { phase?: string; content?: unknown[] }; + }> = []; + const done = (async () => { + for await (const ev of await resolveStream(stream)) { + events.push(ev as (typeof events)[number]); + } + })(); + + await new Promise((r) => setImmediate(r)); + const manager = MockManager.lastInstance!; + manager.simulateEvent({ + type: "response.output_text.delta", + item_id: "item_late", + output_index: 0, + content_index: 0, + delta: "Working", + }); + manager.simulateEvent({ + type: "response.output_item.added", + output_index: 0, + item: { + type: "message", + id: "item_late", + role: "assistant", + phase: "commentary", + content: [], + }, + }); + manager.simulateEvent({ + type: "response.output_text.delta", + item_id: "item_late", + output_index: 0, + content_index: 0, + delta: "...", + }); + manager.simulateEvent({ + type: "response.completed", + response: { + id: "resp_phase_late_map", + object: "response", + created_at: Date.now(), + status: "completed", + model: "gpt-5.2", + output: [ + { + type: "message", + id: "item_late", + role: "assistant", + phase: "commentary", + content: [{ type: "output_text", text: "Working..." }], + }, + ], + usage: { input_tokens: 100, output_tokens: 50, total_tokens: 150 }, + }, + }); + + await done; + + const deltas = events.filter((event) => event.type === "text_delta"); + expect(deltas).toHaveLength(2); + expect(deltas[0]).toMatchObject({ delta: "Working" }); + expect(deltas[0]?.partial?.phase).toBeUndefined(); + expect(deltas[0]?.partial?.content).toEqual([ + { + type: "text", + text: "Working", + textSignature: JSON.stringify({ v: 1, id: "item_late" }), + }, + ]); + expect(deltas[1]).toMatchObject({ delta: "..." }); + expect(deltas[1]?.partial?.phase).toBe("commentary"); + expect(deltas[1]?.partial?.content).toEqual([ + { + type: "text", + text: "Working...", + textSignature: JSON.stringify({ v: 1, id: "item_late", phase: "commentary" }), + }, + ]); + }); + it("falls back to HTTP when WebSocket connect fails (session pre-broken via flag)", async () => { // Set the class-level flag BEFORE calling streamFn so the new instance // fails on connect(). We patch the static default via MockManager directly. diff --git a/src/agents/openai-ws-stream.ts b/src/agents/openai-ws-stream.ts index 8bceeb2634b..78904ac06a6 100644 --- a/src/agents/openai-ws-stream.ts +++ b/src/agents/openai-ws-stream.ts @@ -39,6 +39,7 @@ import { getOpenAIWebSocketErrorDetails, OpenAIWebSocketManager, type FunctionToolDefinition, + type OpenAIResponsesAssistantPhase, type OpenAIWebSocketManagerOptions, } from "./openai-ws-connection.js"; import { @@ -86,6 +87,23 @@ type OpenAIWsStreamDeps = { streamSimple: typeof piAi.streamSimple; }; +type AssistantMessageWithPhase = AssistantMessage & { phase?: OpenAIResponsesAssistantPhase }; + +function normalizeAssistantPhase(value: unknown): OpenAIResponsesAssistantPhase | undefined { + return value === "commentary" || value === "final_answer" ? value : undefined; +} + +function encodeAssistantTextSignature(params: { + id: string; + phase?: OpenAIResponsesAssistantPhase; +}): string { + return JSON.stringify({ + v: 1, + id: params.id, + ...(params.phase ? { phase: params.phase } : {}), + }); +} + const defaultOpenAIWsStreamDeps: OpenAIWsStreamDeps = { createManager: (options) => new OpenAIWebSocketManager(options), createHttpFallbackStreamFn: (model) => createBoundaryAwareStreamFnForModel(model), @@ -888,12 +906,53 @@ export function createOpenAIWebSocketStreamFn( emittedStart = true; } + const outputItemPhaseById = new Map(); + const emitTextDelta = (params: { + fullText: string; + deltaText: string; + itemId?: string; + contentIndex?: number; + }) => { + const resolvedItemId = params.itemId; + const contentIndex = params.contentIndex ?? 0; + const itemPhase = resolvedItemId + ? normalizeAssistantPhase(outputItemPhaseById.get(resolvedItemId)) + : undefined; + const partialBase = buildAssistantMessageWithZeroUsage({ + model, + content: [ + { + type: "text", + text: params.fullText, + ...(resolvedItemId + ? { + textSignature: encodeAssistantTextSignature({ + id: resolvedItemId, + ...(itemPhase ? { phase: itemPhase } : {}), + }), + } + : {}), + }, + ], + stopReason: "stop", + }); + const partialMsg: AssistantMessageWithPhase = itemPhase + ? ({ ...partialBase, phase: itemPhase } as AssistantMessageWithPhase) + : partialBase; + eventStream.push({ + type: "text_delta", + contentIndex, + delta: params.deltaText, + partial: partialMsg, + }); + }; const capturedContextLength = context.messages.length; let sawWsOutput = false; try { await new Promise((resolve, reject) => { const abortHandler = () => { + outputItemPhaseById.clear(); cleanup(); reject(new Error("aborted")); }; @@ -904,6 +963,7 @@ export function createOpenAIWebSocketStreamFn( signal?.addEventListener("abort", abortHandler, { once: true }); const closeHandler = (code: number, reason: string) => { + outputItemPhaseById.clear(); cleanup(); const closeInfo = session.manager.lastCloseInfo; reject( @@ -926,6 +986,10 @@ export function createOpenAIWebSocketStreamFn( unsubscribe(); }; + const outputTextByPart = new Map(); + const getOutputTextKey = (itemId: string, contentIndex: number) => + `${itemId}:${contentIndex}`; + const unsubscribe = session.manager.onMessage((event) => { if ( event.type === "response.output_item.added" || @@ -940,7 +1004,54 @@ export function createOpenAIWebSocketStreamFn( sawWsOutput = true; } + if ( + event.type === "response.output_item.added" || + event.type === "response.output_item.done" + ) { + if (typeof event.item.id === "string") { + const itemPhase = + event.item.type === "message" + ? normalizeAssistantPhase((event.item as { phase?: unknown }).phase) + : undefined; + outputItemPhaseById.set(event.item.id, itemPhase); + } + return; + } + + if (event.type === "response.output_text.delta") { + const key = getOutputTextKey(event.item_id, event.content_index); + const nextText = `${outputTextByPart.get(key) ?? ""}${event.delta}`; + outputTextByPart.set(key, nextText); + emitTextDelta({ + fullText: nextText, + deltaText: event.delta, + itemId: event.item_id, + contentIndex: event.content_index, + }); + return; + } + + if (event.type === "response.output_text.done") { + const key = getOutputTextKey(event.item_id, event.content_index); + const previousText = outputTextByPart.get(key) ?? ""; + if (event.text && event.text !== previousText) { + outputTextByPart.set(key, event.text); + const deltaText = event.text.startsWith(previousText) + ? event.text.slice(previousText.length) + : event.text; + emitTextDelta({ + fullText: event.text, + deltaText, + itemId: event.item_id, + contentIndex: event.content_index, + }); + } + return; + } + if (event.type === "response.completed") { + outputItemPhaseById.clear(); + outputTextByPart.clear(); cleanup(); session.lastContextLength = capturedContextLength; const assistantMsg = buildAssistantMessageFromResponse(event.response, { @@ -953,6 +1064,8 @@ export function createOpenAIWebSocketStreamFn( eventStream.push({ type: "done", reason, message: assistantMsg }); resolve(); } else if (event.type === "response.failed") { + outputItemPhaseById.clear(); + outputTextByPart.clear(); cleanup(); reject( new OpenAIWebSocketRuntimeError( @@ -964,6 +1077,8 @@ export function createOpenAIWebSocketStreamFn( ), ); } else if (event.type === "error") { + outputItemPhaseById.clear(); + outputTextByPart.clear(); cleanup(); reject( new OpenAIWebSocketRuntimeError( @@ -974,18 +1089,6 @@ export function createOpenAIWebSocketStreamFn( }, ), ); - } else if (event.type === "response.output_text.delta") { - const partialMsg: AssistantMessage = buildAssistantMessageWithZeroUsage({ - model, - content: [{ type: "text", text: event.delta }], - stopReason: "stop", - }); - eventStream.push({ - type: "text_delta", - contentIndex: 0, - delta: event.delta, - partial: partialMsg, - }); } }); }); diff --git a/src/agents/pi-embedded-subscribe.handlers.messages.test.ts b/src/agents/pi-embedded-subscribe.handlers.messages.test.ts index 47481110a83..dc44833ddb7 100644 --- a/src/agents/pi-embedded-subscribe.handlers.messages.test.ts +++ b/src/agents/pi-embedded-subscribe.handlers.messages.test.ts @@ -254,6 +254,109 @@ describe("handleMessageUpdate", () => { expect(ctx.state.blockBuffer).toBe(""); }); + it("replaces commentary preview text when a final_answer partial arrives", () => { + const onAgentEvent = vi.fn(); + const ctx = { + params: { + runId: "run-1", + session: { id: "session-1" }, + onAgentEvent, + }, + state: { + deterministicApprovalPromptSent: false, + reasoningStreamOpen: false, + streamReasoning: false, + deltaBuffer: "", + blockBuffer: "", + partialBlockState: { + thinking: false, + final: false, + inlineCode: createInlineCodeState(), + }, + lastStreamedAssistant: undefined, + lastStreamedAssistantCleaned: undefined, + emittedAssistantUpdate: false, + shouldEmitPartialReplies: false, + blockReplyBreak: "text_end", + }, + log: { debug: vi.fn() }, + noteLastAssistant: vi.fn(), + stripBlockTags: (text: string) => text, + consumePartialReplyDirectives: vi.fn(() => null), + emitReasoningStream: vi.fn(), + flushBlockReplyBuffer: vi.fn(), + } as unknown as EmbeddedPiSubscribeContext; + + handleMessageUpdate(ctx, { + type: "message_update", + message: { role: "assistant", content: [] }, + assistantMessageEvent: { + type: "text_delta", + delta: "Working...", + partial: { + role: "assistant", + content: [ + { + type: "text", + text: "Working...", + textSignature: JSON.stringify({ v: 1, id: "item_commentary", phase: "commentary" }), + }, + ], + phase: "commentary", + stopReason: "stop", + api: "openai-responses", + provider: "openai", + model: "gpt-5.2", + usage: {}, + timestamp: 0, + }, + }, + } as never); + + handleMessageUpdate(ctx, { + type: "message_update", + message: { role: "assistant", content: [] }, + assistantMessageEvent: { + type: "text_delta", + delta: "Done.", + partial: { + role: "assistant", + content: [ + { + type: "text", + text: "Done.", + textSignature: JSON.stringify({ v: 1, id: "item_final", phase: "final_answer" }), + }, + ], + phase: "final_answer", + stopReason: "stop", + api: "openai-responses", + provider: "openai", + model: "gpt-5.2", + usage: {}, + timestamp: 0, + }, + }, + } as never); + + expect(onAgentEvent).toHaveBeenCalledTimes(2); + expect(onAgentEvent.mock.calls[0]?.[0]).toMatchObject({ + stream: "assistant", + data: { + text: "Working...", + delta: "Working...", + }, + }); + expect(onAgentEvent.mock.calls[1]?.[0]).toMatchObject({ + stream: "assistant", + data: { + text: "Done.", + delta: "", + replace: true, + }, + }); + }); + it("contains synchronous text_end flush failures", async () => { const debug = vi.fn(); const ctx = { @@ -417,4 +520,78 @@ describe("handleMessageEnd", () => { expect(emitBlockReply).not.toHaveBeenCalled(); expect(finalizeAssistantTexts).not.toHaveBeenCalled(); }); + + it("emits a replacement final assistant event when final_answer appears only at message_end", () => { + const onAgentEvent = vi.fn(); + const ctx = { + params: { + runId: "run-1", + session: { id: "session-1" }, + onAgentEvent, + }, + state: { + deterministicApprovalPromptSent: false, + messagingToolSentTexts: [], + messagingToolSentTextsNormalized: [], + includeReasoning: false, + streamReasoning: false, + emittedAssistantUpdate: true, + lastStreamedAssistantCleaned: "Working...", + assistantTexts: [], + assistantTextBaseline: 0, + blockReplyBreak: "text_end", + lastReasoningSent: undefined, + reasoningStreamOpen: false, + deltaBuffer: "", + blockBuffer: "", + blockState: { + thinking: false, + final: false, + inlineCode: createInlineCodeState(), + }, + }, + log: { debug: vi.fn() }, + noteLastAssistant: vi.fn(), + recordAssistantUsage: vi.fn(), + stripBlockTags: (text: string) => text, + finalizeAssistantTexts: vi.fn(), + emitReasoningStream: vi.fn(), + blockChunker: null, + } as unknown as EmbeddedPiSubscribeContext; + + void handleMessageEnd(ctx, { + type: "message_end", + message: { + role: "assistant", + content: [ + { + type: "text", + text: "Working...", + textSignature: JSON.stringify({ v: 1, id: "item_commentary", phase: "commentary" }), + }, + { + type: "text", + text: "Done.", + textSignature: JSON.stringify({ v: 1, id: "item_final", phase: "final_answer" }), + }, + ], + stopReason: "stop", + api: "openai-responses", + provider: "openai", + model: "gpt-5.2", + usage: {}, + timestamp: 0, + }, + } as never); + + expect(onAgentEvent).toHaveBeenCalledTimes(1); + expect(onAgentEvent.mock.calls[0]?.[0]).toMatchObject({ + stream: "assistant", + data: { + text: "Done.", + delta: "", + replace: true, + }, + }); + }); }); diff --git a/src/agents/pi-embedded-subscribe.handlers.messages.ts b/src/agents/pi-embedded-subscribe.handlers.messages.ts index 579bec00113..947c7d14c46 100644 --- a/src/agents/pi-embedded-subscribe.handlers.messages.ts +++ b/src/agents/pi-embedded-subscribe.handlers.messages.ts @@ -1,4 +1,5 @@ import type { AgentEvent, AgentMessage } from "@mariozechner/pi-agent-core"; +import type { AssistantMessage } from "@mariozechner/pi-ai"; import { resolveSendableOutboundReplyParts } from "openclaw/plugin-sdk/reply-payload"; import { parseReplyDirectives } from "../auto-reply/reply/reply-directives.js"; import { isSilentReplyText, SILENT_REPLY_TOKEN } from "../auto-reply/tokens.js"; @@ -18,6 +19,7 @@ import { appendRawStream } from "./pi-embedded-subscribe.raw-stream.js"; import { extractAssistantText, extractAssistantThinking, + extractAssistantVisibleText, extractThinkingFromTaggedStream, extractThinkingFromTaggedText, formatReasoningMessage, @@ -41,6 +43,27 @@ const stripTrailingDirective = (text: string): string => { return text.slice(0, openIndex); }; +type AssistantPhase = "commentary" | "final_answer"; + +const normalizeAssistantPhase = (value: unknown): AssistantPhase | undefined => { + return value === "commentary" || value === "final_answer" ? value : undefined; +}; + +const getAssistantTextSignaturePhase = (value: unknown): AssistantPhase | undefined => { + if (typeof value !== "string" || value.trim().length === 0) { + return undefined; + } + if (!value.startsWith("{")) { + return undefined; + } + try { + const parsed = JSON.parse(value) as { phase?: unknown; v?: unknown }; + return parsed.v === 1 ? normalizeAssistantPhase(parsed.phase) : undefined; + } catch { + return undefined; + } +}; + const coerceText = (value: unknown): string => { if (typeof value === "string") { return value; @@ -322,12 +345,47 @@ export function handleMessageUpdate( } } + const partialAssistant = + assistantRecord?.partial && typeof assistantRecord.partial === "object" + ? (assistantRecord.partial as AssistantMessage) + : msg; + const phaseAwareVisibleText = coerceText(extractAssistantVisibleText(partialAssistant)).trim(); + const partialVisiblePhase = normalizeAssistantPhase( + (partialAssistant as { phase?: unknown }).phase, + ); + const explicitStructuredPartialPhases = new Set(); + if (Array.isArray(partialAssistant.content)) { + for (const block of partialAssistant.content) { + if (!block || typeof block !== "object") { + continue; + } + const record = block as { type?: unknown; textSignature?: unknown }; + if (record.type !== "text") { + continue; + } + const signaturePhase = getAssistantTextSignaturePhase(record.textSignature); + if (signaturePhase) { + explicitStructuredPartialPhases.add(signaturePhase); + } + } + } + const structuredPartialPhase = + explicitStructuredPartialPhases.size === 1 + ? [...explicitStructuredPartialPhases][0] + : undefined; + const shouldUsePhaseAwareBlockReply = Boolean(partialVisiblePhase || structuredPartialPhase); + const shouldDeferTextEndBlockReply = + shouldUsePhaseAwareBlockReply && + (partialVisiblePhase ?? structuredPartialPhase) !== "final_answer"; + if (chunk) { ctx.state.deltaBuffer += chunk; - if (ctx.blockChunker) { - ctx.blockChunker.append(chunk); - } else { - ctx.state.blockBuffer += chunk; + if (!shouldUsePhaseAwareBlockReply) { + if (ctx.blockChunker) { + ctx.blockChunker.append(chunk); + } else { + ctx.state.blockBuffer += chunk; + } } } @@ -335,14 +393,15 @@ export function handleMessageUpdate( // Handle partial tags: stream whatever reasoning is visible so far. ctx.emitReasoningStream(extractThinkingFromTaggedStream(ctx.state.deltaBuffer)); } - - const next = ctx - .stripBlockTags(ctx.state.deltaBuffer, { - thinking: false, - final: false, - inlineCode: createInlineCodeState(), - }) - .trim(); + const next = + phaseAwareVisibleText || + ctx + .stripBlockTags(ctx.state.deltaBuffer, { + thinking: false, + final: false, + inlineCode: createInlineCodeState(), + }) + .trim(); if (next) { const wasThinking = ctx.state.partialBlockState.thinking; const visibleDelta = chunk ? ctx.stripBlockTags(chunk, ctx.state.partialBlockState) : ""; @@ -373,6 +432,35 @@ export function handleMessageUpdate( : Boolean(deltaText || hasMedia || hasAudio); } + if (shouldUsePhaseAwareBlockReply) { + if (replace) { + ctx.state.blockBuffer = ""; + ctx.blockChunker?.reset(); + } + const blockReplyChunk = replace ? cleanedText : deltaText; + if (!shouldDeferTextEndBlockReply && blockReplyChunk) { + if (ctx.blockChunker) { + ctx.blockChunker.append(blockReplyChunk); + } else { + ctx.state.blockBuffer += blockReplyChunk; + } + } + + if ( + !shouldDeferTextEndBlockReply && + evtType === "text_end" && + !ctx.state.lastBlockReplyText && + cleanedText + ) { + if (ctx.blockChunker) { + ctx.blockChunker.reset(); + ctx.blockChunker.append(cleanedText); + } else { + ctx.state.blockBuffer = cleanedText; + } + } + } + ctx.state.lastStreamedAssistant = next; ctx.state.lastStreamedAssistantCleaned = cleanedText; @@ -405,6 +493,7 @@ export function handleMessageUpdate( if ( !ctx.params.silentExpected && + !shouldDeferTextEndBlockReply && ctx.params.onBlockReply && ctx.blockChunking && ctx.state.blockReplyBreak === "text_end" @@ -414,6 +503,7 @@ export function handleMessageUpdate( if ( !ctx.params.silentExpected && + !shouldDeferTextEndBlockReply && evtType === "text_end" && ctx.state.blockReplyBreak === "text_end" ) { @@ -445,6 +535,7 @@ export function handleMessageEnd( promoteThinkingTagsToBlocks(assistantMessage); const rawText = coerceText(extractAssistantText(assistantMessage)); + const rawVisibleText = coerceText(extractAssistantVisibleText(assistantMessage)); appendRawStream({ ts: Date.now(), event: "assistant_message_end", @@ -455,7 +546,7 @@ export function handleMessageEnd( }); const text = resolveSilentReplyFallbackText({ - text: ctx.stripBlockTags(rawText, { thinking: false, final: false }), + text: ctx.stripBlockTags(rawVisibleText || rawText, { thinking: false, final: false }), messagingToolSentTexts: ctx.state.messagingToolSentTexts, }); const rawThinking = @@ -497,14 +588,29 @@ export function handleMessageEnd( } } + const previousStreamedText = ctx.state.lastStreamedAssistantCleaned ?? ""; + const shouldReplaceFinalStream = Boolean( + previousStreamedText && cleanedText && !cleanedText.startsWith(previousStreamedText), + ); + const didTextChangeWithinCurrentMessage = Boolean( + previousStreamedText && cleanedText !== previousStreamedText, + ); + const finalStreamDelta = shouldReplaceFinalStream + ? "" + : cleanedText.slice(previousStreamedText.length); + if ( !ctx.params.silentExpected && - !ctx.state.emittedAssistantUpdate && - (cleanedText || hasMedia) + (cleanedText || hasMedia) && + (!ctx.state.emittedAssistantUpdate || + shouldReplaceFinalStream || + didTextChangeWithinCurrentMessage || + hasMedia) ) { const data = buildAssistantStreamData({ text: cleanedText, - delta: cleanedText, + delta: finalStreamDelta, + replace: shouldReplaceFinalStream, mediaUrls, }); emitAgentEvent({ @@ -517,6 +623,7 @@ export function handleMessageEnd( data, }); ctx.state.emittedAssistantUpdate = true; + ctx.state.lastStreamedAssistantCleaned = cleanedText; } const silentExpectedWithoutSentinel = @@ -579,14 +686,19 @@ export function handleMessageEnd( } }; + const hasBufferedBlockReply = ctx.blockChunker + ? ctx.blockChunker.hasBuffered() + : ctx.state.blockBuffer.length > 0; + if ( !ctx.params.silentExpected && - (ctx.state.blockReplyBreak === "message_end" || - (ctx.blockChunker ? ctx.blockChunker.hasBuffered() : ctx.state.blockBuffer.length > 0)) && text && - onBlockReply + onBlockReply && + (ctx.state.blockReplyBreak === "message_end" || + hasBufferedBlockReply || + text !== ctx.state.lastBlockReplyText) ) { - if (ctx.blockChunker?.hasBuffered()) { + if (hasBufferedBlockReply && ctx.blockChunker?.hasBuffered()) { ctx.blockChunker.drain({ force: true, emit: ctx.emitBlockChunk }); ctx.blockChunker.reset(); } else if (text !== ctx.state.lastBlockReplyText) { diff --git a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.does-not-append-text-end-content-is.test.ts b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.does-not-append-text-end-content-is.test.ts index efe15240ded..008e01c1153 100644 --- a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.does-not-append-text-end-content-is.test.ts +++ b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.does-not-append-text-end-content-is.test.ts @@ -45,6 +45,7 @@ describe("subscribeEmbeddedPiSession", () => { emitDelta(delta); emitTextEnd(content); + await Promise.resolve(); await vi.waitFor(() => { expect(onBlockReply).toHaveBeenCalledTimes(1); diff --git a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.does-not-duplicate-text-end-repeats-full.test.ts b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.does-not-duplicate-text-end-repeats-full.test.ts index 182ab95ba08..d7d4add7929 100644 --- a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.does-not-duplicate-text-end-repeats-full.test.ts +++ b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.does-not-duplicate-text-end-repeats-full.test.ts @@ -12,13 +12,14 @@ describe("subscribeEmbeddedPiSession", () => { emitAssistantTextDelta({ emit, delta: "Good morning!" }); emitAssistantTextEnd({ emit, content: "Good morning!" }); + await Promise.resolve(); await vi.waitFor(() => { expect(onBlockReply).toHaveBeenCalledTimes(1); }); expect(subscription.assistantTexts).toEqual(["Good morning!"]); }); - it("does not duplicate block chunks when text_end repeats full content", () => { + it("does not duplicate block chunks when text_end repeats full content", async () => { const onBlockReply = vi.fn(); const { emit } = createTextEndBlockReplyHarness({ onBlockReply, @@ -32,11 +33,13 @@ describe("subscribeEmbeddedPiSession", () => { const fullText = "First line\nSecond line\nThird line\n"; emitAssistantTextDelta({ emit, delta: fullText }); + await Promise.resolve(); const callsAfterDelta = onBlockReply.mock.calls.length; expect(callsAfterDelta).toBeGreaterThan(0); emitAssistantTextEnd({ emit, content: fullText }); + await Promise.resolve(); expect(onBlockReply).toHaveBeenCalledTimes(callsAfterDelta); }); diff --git a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.emits-block-replies-text-end-does-not.test.ts b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.emits-block-replies-text-end-does-not.test.ts index 915685cf175..f2a8fc643b7 100644 --- a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.emits-block-replies-text-end-does-not.test.ts +++ b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.emits-block-replies-text-end-does-not.test.ts @@ -13,6 +13,7 @@ describe("subscribeEmbeddedPiSession", () => { emitAssistantTextDelta({ emit, delta: "Hello block" }); emitAssistantTextEnd({ emit }); + await Promise.resolve(); await vi.waitFor(() => { expect(onBlockReply).toHaveBeenCalledTimes(1); @@ -31,7 +32,8 @@ describe("subscribeEmbeddedPiSession", () => { expect(onBlockReply).toHaveBeenCalledTimes(1); expect(subscription.assistantTexts).toEqual(["Hello block"]); }); - it("does not duplicate when message_end flushes and a late text_end arrives", () => { + + it("does not duplicate when message_end flushes and a late text_end arrives", async () => { const onBlockReply = vi.fn(); const { emit, subscription } = createTextEndBlockReplyHarness({ onBlockReply }); @@ -52,8 +54,473 @@ describe("subscribeEmbeddedPiSession", () => { // Some providers can still emit a late text_end; this must not re-emit. emitAssistantTextEnd({ emit, content: "Hello block" }); + await Promise.resolve(); expect(onBlockReply).toHaveBeenCalledTimes(1); expect(subscription.assistantTexts).toEqual(["Hello block"]); }); + + it("emits legacy structured partials on text_end without waiting for message_end", async () => { + const onBlockReply = vi.fn(); + const { emit, subscription } = createTextEndBlockReplyHarness({ onBlockReply }); + + emit({ type: "message_start", message: { role: "assistant" } }); + emit({ + type: "message_update", + message: { role: "assistant", content: [] }, + assistantMessageEvent: { + type: "text_delta", + delta: "Legacy answer", + partial: { + role: "assistant", + content: [ + { + type: "text", + text: "Legacy answer", + textSignature: JSON.stringify({ v: 1, id: "item_legacy" }), + }, + ], + stopReason: "stop", + api: "openai-responses", + provider: "openai", + model: "gpt-5.2", + usage: {}, + timestamp: 0, + }, + }, + }); + emit({ + type: "message_update", + message: { role: "assistant", content: [] }, + assistantMessageEvent: { + type: "text_end", + content: "Legacy answer", + partial: { + role: "assistant", + content: [ + { + type: "text", + text: "Legacy answer", + textSignature: JSON.stringify({ v: 1, id: "item_legacy" }), + }, + ], + stopReason: "stop", + api: "openai-responses", + provider: "openai", + model: "gpt-5.2", + usage: {}, + timestamp: 0, + }, + }, + }); + await Promise.resolve(); + + expect(onBlockReply).toHaveBeenCalledTimes(1); + expect(onBlockReply.mock.calls[0]?.[0]?.text).toBe("Legacy answer"); + expect(subscription.assistantTexts).toEqual(["Legacy answer"]); + + emit({ + type: "message_end", + message: { + role: "assistant", + content: [{ type: "text", text: "Legacy answer" }], + } as AssistantMessage, + }); + + expect(onBlockReply).toHaveBeenCalledTimes(1); + expect(subscription.assistantTexts).toEqual(["Legacy answer"]); + }); + + it("suppresses commentary block replies until a final answer is available", async () => { + const onBlockReply = vi.fn(); + const { emit, subscription } = createTextEndBlockReplyHarness({ onBlockReply }); + + emit({ type: "message_start", message: { role: "assistant" } }); + emit({ + type: "message_update", + message: { role: "assistant", content: [] }, + assistantMessageEvent: { + type: "text_delta", + delta: "Working...", + partial: { + role: "assistant", + content: [ + { + type: "text", + text: "Working...", + textSignature: JSON.stringify({ v: 1, id: "item_commentary", phase: "commentary" }), + }, + ], + phase: "commentary", + stopReason: "stop", + api: "openai-responses", + provider: "openai", + model: "gpt-5.2", + usage: {}, + timestamp: 0, + }, + }, + }); + emit({ + type: "message_update", + message: { role: "assistant", content: [] }, + assistantMessageEvent: { + type: "text_end", + content: "Working...", + partial: { + role: "assistant", + content: [ + { + type: "text", + text: "Working...", + textSignature: JSON.stringify({ v: 1, id: "item_commentary", phase: "commentary" }), + }, + ], + phase: "commentary", + stopReason: "stop", + api: "openai-responses", + provider: "openai", + model: "gpt-5.2", + usage: {}, + timestamp: 0, + }, + }, + }); + await Promise.resolve(); + + expect(onBlockReply).not.toHaveBeenCalled(); + expect(subscription.assistantTexts).toEqual([]); + + emit({ + type: "message_update", + message: { role: "assistant", content: [] }, + assistantMessageEvent: { + type: "text_delta", + delta: "Done.", + partial: { + role: "assistant", + content: [ + { + type: "text", + text: "Done.", + textSignature: JSON.stringify({ v: 1, id: "item_final", phase: "final_answer" }), + }, + ], + phase: "final_answer", + stopReason: "stop", + api: "openai-responses", + provider: "openai", + model: "gpt-5.2", + usage: {}, + timestamp: 0, + }, + }, + }); + emit({ + type: "message_update", + message: { role: "assistant", content: [] }, + assistantMessageEvent: { + type: "text_end", + content: "Done.", + partial: { + role: "assistant", + content: [ + { + type: "text", + text: "Done.", + textSignature: JSON.stringify({ v: 1, id: "item_final", phase: "final_answer" }), + }, + ], + phase: "final_answer", + stopReason: "stop", + api: "openai-responses", + provider: "openai", + model: "gpt-5.2", + usage: {}, + timestamp: 0, + }, + }, + }); + await Promise.resolve(); + + emit({ + type: "message_end", + message: { + role: "assistant", + content: [ + { + type: "text", + text: "Working...", + textSignature: JSON.stringify({ v: 1, id: "item_commentary", phase: "commentary" }), + }, + { + type: "text", + text: "Done.", + textSignature: JSON.stringify({ v: 1, id: "item_final", phase: "final_answer" }), + }, + ], + } as AssistantMessage, + }); + + expect(onBlockReply).toHaveBeenCalledTimes(1); + expect(onBlockReply.mock.calls[0]?.[0]?.text).toBe("Done."); + expect(subscription.assistantTexts).toEqual(["Done."]); + }); + + it("emits the full final answer on text_end when it extends suppressed commentary", async () => { + const onBlockReply = vi.fn(); + const { emit, subscription } = createTextEndBlockReplyHarness({ onBlockReply }); + + emit({ type: "message_start", message: { role: "assistant" } }); + emit({ + type: "message_update", + message: { role: "assistant", content: [] }, + assistantMessageEvent: { + type: "text_delta", + delta: "Hello", + partial: { + role: "assistant", + content: [ + { + type: "text", + text: "Hello", + textSignature: JSON.stringify({ v: 1, id: "item_commentary", phase: "commentary" }), + }, + ], + phase: "commentary", + stopReason: "stop", + api: "openai-responses", + provider: "openai", + model: "gpt-5.2", + usage: {}, + timestamp: 0, + }, + }, + }); + emit({ + type: "message_update", + message: { role: "assistant", content: [] }, + assistantMessageEvent: { + type: "text_end", + content: "Hello", + partial: { + role: "assistant", + content: [ + { + type: "text", + text: "Hello", + textSignature: JSON.stringify({ v: 1, id: "item_commentary", phase: "commentary" }), + }, + ], + phase: "commentary", + stopReason: "stop", + api: "openai-responses", + provider: "openai", + model: "gpt-5.2", + usage: {}, + timestamp: 0, + }, + }, + }); + await Promise.resolve(); + + expect(onBlockReply).not.toHaveBeenCalled(); + + emit({ + type: "message_update", + message: { role: "assistant", content: [] }, + assistantMessageEvent: { + type: "text_delta", + delta: " world", + partial: { + role: "assistant", + content: [ + { + type: "text", + text: "Hello world", + textSignature: JSON.stringify({ v: 1, id: "item_final", phase: "final_answer" }), + }, + ], + phase: "final_answer", + stopReason: "stop", + api: "openai-responses", + provider: "openai", + model: "gpt-5.2", + usage: {}, + timestamp: 0, + }, + }, + }); + emit({ + type: "message_update", + message: { role: "assistant", content: [] }, + assistantMessageEvent: { + type: "text_end", + content: "Hello world", + partial: { + role: "assistant", + content: [ + { + type: "text", + text: "Hello world", + textSignature: JSON.stringify({ v: 1, id: "item_final", phase: "final_answer" }), + }, + ], + phase: "final_answer", + stopReason: "stop", + api: "openai-responses", + provider: "openai", + model: "gpt-5.2", + usage: {}, + timestamp: 0, + }, + }, + }); + await Promise.resolve(); + + expect(onBlockReply).toHaveBeenCalledTimes(1); + expect(onBlockReply.mock.calls[0]?.[0]?.text).toBe("Hello world"); + expect(subscription.assistantTexts).toEqual(["Hello world"]); + }); + + it("does not defer final_answer text_end when phase exists only in textSignature", async () => { + const onBlockReply = vi.fn(); + const { emit, subscription } = createTextEndBlockReplyHarness({ onBlockReply }); + + emit({ type: "message_start", message: { role: "assistant" } }); + emit({ + type: "message_update", + message: { role: "assistant", content: [] }, + assistantMessageEvent: { + type: "text_delta", + delta: "Done.", + partial: { + role: "assistant", + content: [ + { + type: "text", + text: "Done.", + textSignature: JSON.stringify({ v: 1, id: "item_final", phase: "final_answer" }), + }, + ], + stopReason: "stop", + api: "openai-responses", + provider: "openai", + model: "gpt-5.2", + usage: {}, + timestamp: 0, + }, + }, + }); + emit({ + type: "message_update", + message: { role: "assistant", content: [] }, + assistantMessageEvent: { + type: "text_end", + content: "Done.", + partial: { + role: "assistant", + content: [ + { + type: "text", + text: "Done.", + textSignature: JSON.stringify({ v: 1, id: "item_final", phase: "final_answer" }), + }, + ], + stopReason: "stop", + api: "openai-responses", + provider: "openai", + model: "gpt-5.2", + usage: {}, + timestamp: 0, + }, + }, + }); + await Promise.resolve(); + + expect(onBlockReply).toHaveBeenCalledTimes(1); + expect(onBlockReply.mock.calls[0]?.[0]?.text).toBe("Done."); + expect(subscription.assistantTexts).toEqual(["Done."]); + }); + + it("emits the final answer at message_end when commentary was streamed first", async () => { + const onBlockReply = vi.fn(); + const { emit, subscription } = createTextEndBlockReplyHarness({ onBlockReply }); + + emit({ type: "message_start", message: { role: "assistant" } }); + emit({ + type: "message_update", + message: { role: "assistant", content: [] }, + assistantMessageEvent: { + type: "text_delta", + delta: "Working...", + partial: { + role: "assistant", + content: [ + { + type: "text", + text: "Working...", + textSignature: JSON.stringify({ v: 1, id: "item_commentary", phase: "commentary" }), + }, + ], + phase: "commentary", + stopReason: "stop", + api: "openai-responses", + provider: "openai", + model: "gpt-5.2", + usage: {}, + timestamp: 0, + }, + }, + }); + emit({ + type: "message_update", + message: { role: "assistant", content: [] }, + assistantMessageEvent: { + type: "text_end", + content: "Working...", + partial: { + role: "assistant", + content: [ + { + type: "text", + text: "Working...", + textSignature: JSON.stringify({ v: 1, id: "item_commentary", phase: "commentary" }), + }, + ], + phase: "commentary", + stopReason: "stop", + api: "openai-responses", + provider: "openai", + model: "gpt-5.2", + usage: {}, + timestamp: 0, + }, + }, + }); + await Promise.resolve(); + + emit({ + type: "message_end", + message: { + role: "assistant", + content: [ + { + type: "text", + text: "Working...", + textSignature: JSON.stringify({ v: 1, id: "item_commentary", phase: "commentary" }), + }, + { + type: "text", + text: "Done.", + textSignature: JSON.stringify({ v: 1, id: "item_final", phase: "final_answer" }), + }, + ], + } as AssistantMessage, + }); + + expect(onBlockReply).toHaveBeenCalledTimes(1); + expect(onBlockReply.mock.calls[0]?.[0]?.text).toBe("Done."); + expect(subscription.assistantTexts).toEqual(["Done."]); + }); }); diff --git a/src/agents/pi-embedded-utils.test.ts b/src/agents/pi-embedded-utils.test.ts index a7894452cec..1d3677b8411 100644 --- a/src/agents/pi-embedded-utils.test.ts +++ b/src/agents/pi-embedded-utils.test.ts @@ -2,6 +2,7 @@ import type { AssistantMessage } from "@mariozechner/pi-ai"; import { describe, expect, it } from "vitest"; import { extractAssistantText, + extractAssistantVisibleText, formatReasoningMessage, promoteThinkingTagsToBlocks, stripDowngradedToolCallText, @@ -9,7 +10,9 @@ import { function makeAssistantMessage( message: Omit & - Partial>, + Partial> & { + phase?: "commentary" | "final_answer"; + }, ): AssistantMessage { return { api: "responses", @@ -569,6 +572,78 @@ describe("stripDowngradedToolCallText", () => { }); }); +describe("extractAssistantVisibleText", () => { + it("prefers non-empty final_answer text over commentary", () => { + const msg = makeAssistantMessage({ + role: "assistant", + content: [ + { + type: "text", + text: "Working...", + textSignature: JSON.stringify({ v: 1, id: "item_commentary", phase: "commentary" }), + }, + { + type: "text", + text: "Done.", + textSignature: JSON.stringify({ v: 1, id: "item_final", phase: "final_answer" }), + }, + ], + timestamp: Date.now(), + }); + + expect(extractAssistantVisibleText(msg)).toBe("Done."); + }); + + it("falls back to commentary when final_answer is empty", () => { + const msg = makeAssistantMessage({ + role: "assistant", + content: [ + { + type: "text", + text: "Working...", + textSignature: JSON.stringify({ v: 1, id: "item_commentary", phase: "commentary" }), + }, + { + type: "text", + text: " ", + textSignature: JSON.stringify({ v: 1, id: "item_final", phase: "final_answer" }), + }, + ], + timestamp: Date.now(), + }); + + expect(extractAssistantVisibleText(msg)).toBe("Working..."); + }); + + it("falls back to legacy unphased text when phased text is absent", () => { + const msg = makeAssistantMessage({ + role: "assistant", + content: [{ type: "text", text: "Legacy answer" }], + timestamp: Date.now(), + }); + + expect(extractAssistantVisibleText(msg)).toBe("Legacy answer"); + }); + + it("does not pull unphased legacy text into final_answer extraction when phased blocks are present", () => { + const msg = makeAssistantMessage({ + role: "assistant", + phase: "final_answer", + content: [ + { type: "text", text: "Legacy." }, + { + type: "text", + text: "Done.", + textSignature: JSON.stringify({ v: 1, id: "item_final", phase: "final_answer" }), + }, + ], + timestamp: Date.now(), + }); + + expect(extractAssistantVisibleText(msg)).toBe("Done."); + }); +}); + describe("promoteThinkingTagsToBlocks", () => { it("does not crash on malformed null content entries", () => { const msg = makeAssistantMessage({ diff --git a/src/agents/pi-embedded-utils.ts b/src/agents/pi-embedded-utils.ts index 375df11654d..b60d0122e7d 100644 --- a/src/agents/pi-embedded-utils.ts +++ b/src/agents/pi-embedded-utils.ts @@ -233,13 +233,121 @@ export function stripThinkingTagsFromText(text: string): string { return stripReasoningTagsFromText(text, { mode: "strict", trim: "both" }); } +type AssistantPhase = "commentary" | "final_answer"; + +function normalizeAssistantPhase(value: unknown): AssistantPhase | undefined { + return value === "commentary" || value === "final_answer" ? value : undefined; +} + +function parseAssistantTextSignature( + value: unknown, +): { id?: string; phase?: AssistantPhase } | null { + if (typeof value !== "string" || value.trim().length === 0) { + return null; + } + if (!value.startsWith("{")) { + return { id: value }; + } + try { + const parsed = JSON.parse(value) as { id?: unknown; phase?: unknown; v?: unknown }; + if (parsed.v !== 1) { + return null; + } + return { + ...(typeof parsed.id === "string" ? { id: parsed.id } : {}), + ...(normalizeAssistantPhase(parsed.phase) + ? { phase: normalizeAssistantPhase(parsed.phase) } + : {}), + }; + } catch { + return null; + } +} + +function sanitizeAssistantText(text: string): string { + return stripThinkingTagsFromText( + stripDowngradedToolCallText(stripModelSpecialTokens(stripMinimaxToolCallXml(text))), + ).trim(); +} + +function finalizeAssistantExtraction(msg: AssistantMessage, extracted: string): string { + const errorContext = msg.stopReason === "error"; + return sanitizeUserFacingText(extracted, { errorContext }); +} + +function extractAssistantTextForPhase(msg: AssistantMessage, phase?: AssistantPhase): string { + const messagePhase = normalizeAssistantPhase((msg as { phase?: unknown }).phase); + const shouldIncludeContent = (resolvedPhase?: AssistantPhase) => { + if (phase) { + return resolvedPhase === phase; + } + return resolvedPhase === undefined; + }; + + if (typeof msg.content === "string") { + return shouldIncludeContent(messagePhase) + ? finalizeAssistantExtraction(msg, sanitizeAssistantText(msg.content)) + : ""; + } + + if (!Array.isArray(msg.content)) { + return ""; + } + + const hasExplicitPhasedTextBlocks = msg.content.some((block) => { + if (!block || typeof block !== "object") { + return false; + } + const record = block as { type?: unknown; textSignature?: unknown }; + if (record.type !== "text") { + return false; + } + return Boolean(parseAssistantTextSignature(record.textSignature)?.phase); + }); + + const extracted = + extractTextFromChatContent( + msg.content.filter((block) => { + if (!block || typeof block !== "object") { + return false; + } + const record = block as { type?: unknown; textSignature?: unknown }; + if (record.type !== "text") { + return false; + } + const signature = parseAssistantTextSignature(record.textSignature); + const resolvedPhase = + signature?.phase ?? (hasExplicitPhasedTextBlocks ? undefined : messagePhase); + return shouldIncludeContent(resolvedPhase); + }), + { + sanitizeText: (text) => sanitizeAssistantText(text), + joinWith: "\n", + normalizeText: (text) => text.trim(), + }, + ) ?? ""; + + return finalizeAssistantExtraction(msg, extracted); +} + +export function extractAssistantVisibleText(msg: AssistantMessage): string { + const finalAnswerText = extractAssistantTextForPhase(msg, "final_answer"); + if (finalAnswerText.trim()) { + return finalAnswerText; + } + + const commentaryText = extractAssistantTextForPhase(msg, "commentary"); + if (commentaryText.trim()) { + return commentaryText; + } + + return extractAssistantTextForPhase(msg); +} + export function extractAssistantText(msg: AssistantMessage): string { const extracted = extractTextFromChatContent(msg.content, { - sanitizeText: (text) => - stripThinkingTagsFromText( - stripDowngradedToolCallText(stripModelSpecialTokens(stripMinimaxToolCallXml(text))), - ).trim(), + sanitizeText: (text) => sanitizeAssistantText(text), joinWith: "\n", normalizeText: (text) => text.trim(), }) ?? ""; @@ -247,8 +355,7 @@ export function extractAssistantText(msg: AssistantMessage): string { // Otherwise normal prose that *mentions* errors (e.g. "context overflow") can get clobbered. // Gate on stopReason only — a non-error response with an errorMessage set (e.g. from a // background tool failure) should not have its content rewritten (#13935). - const errorContext = msg.stopReason === "error"; - return sanitizeUserFacingText(extracted, { errorContext }); + return finalizeAssistantExtraction(msg, extracted); } export function extractAssistantThinking(msg: AssistantMessage): string {