Fix commentary/final answer phase separation

This commit is contained in:
Leo Zhang 2026-04-02 10:24:25 +00:00 committed by Peter Steinberger
parent b8e2e5c251
commit 7bef5a7466
10 changed files with 1568 additions and 54 deletions

View File

@ -349,10 +349,22 @@ export function convertMessagesToInputItems(
if (m.role === "assistant") {
const content = m.content;
let assistantPhase = normalizeAssistantPhase(m.phase);
const assistantMessagePhase = normalizeAssistantPhase(m.phase);
if (Array.isArray(content)) {
const textParts: string[] = [];
const pushAssistantText = () => {
let currentTextPhase: OpenAIResponsesAssistantPhase | undefined;
const hasExplicitBlockPhase = content.some((block) => {
if (!block || typeof block !== "object") {
return false;
}
const record = block as { type?: unknown; textSignature?: unknown };
return (
record.type === "text" &&
Boolean(parseAssistantTextSignature(record.textSignature)?.phase)
);
});
const pushAssistantText = (phase?: OpenAIResponsesAssistantPhase) => {
if (textParts.length === 0) {
return;
}
@ -360,7 +372,7 @@ export function convertMessagesToInputItems(
type: "message",
role: "assistant",
content: textParts.join(""),
...(assistantPhase ? { phase: assistantPhase } : {}),
...(phase ? { phase } : {}),
});
textParts.length = 0;
};
@ -376,15 +388,18 @@ export function convertMessagesToInputItems(
}>) {
if (block.type === "text" && typeof block.text === "string") {
const parsedSignature = parseAssistantTextSignature(block.textSignature);
if (!assistantPhase) {
assistantPhase = parsedSignature?.phase;
const blockPhase =
parsedSignature?.phase ?? (hasExplicitBlockPhase ? undefined : assistantMessagePhase);
if (textParts.length > 0 && blockPhase !== currentTextPhase) {
pushAssistantText(currentTextPhase);
}
textParts.push(block.text);
currentTextPhase = blockPhase;
continue;
}
if (block.type === "thinking") {
pushAssistantText();
pushAssistantText(currentTextPhase);
const reasoningItem = parseThinkingSignature(block.thinkingSignature);
if (reasoningItem) {
items.push(reasoningItem);
@ -396,7 +411,7 @@ export function convertMessagesToInputItems(
continue;
}
pushAssistantText();
pushAssistantText(currentTextPhase);
const replayId = decodeToolCallReplayId(block.id);
const toolName = toNonEmptyString(block.name);
if (!replayId || !toolName) {
@ -414,7 +429,7 @@ export function convertMessagesToInputItems(
});
}
pushAssistantText();
pushAssistantText(currentTextPhase);
continue;
}
@ -426,7 +441,7 @@ export function convertMessagesToInputItems(
type: "message",
role: "assistant",
content: text,
...(assistantPhase ? { phase: assistantPhase } : {}),
...(assistantMessagePhase ? { phase: assistantMessagePhase } : {}),
});
continue;
}
@ -471,16 +486,20 @@ export function buildAssistantMessageFromResponse(
modelInfo: { api: string; provider: string; id: string },
): AssistantMessage {
const content: AssistantMessage["content"] = [];
let assistantPhase: OpenAIResponsesAssistantPhase | undefined;
const assistantPhases = new Set<OpenAIResponsesAssistantPhase>();
let hasUnphasedAssistantText = false;
for (const item of response.output ?? []) {
if (item.type === "message") {
const itemPhase = normalizeAssistantPhase(item.phase);
if (itemPhase) {
assistantPhase = itemPhase;
assistantPhases.add(itemPhase);
}
for (const part of item.content ?? []) {
if (part.type === "output_text" && part.text) {
if (!itemPhase) {
hasUnphasedAssistantText = true;
}
content.push({
type: "text",
text: part.text,
@ -553,7 +572,10 @@ export function buildAssistantMessageFromResponse(
}),
});
return assistantPhase
? ({ ...message, phase: assistantPhase } as AssistantMessageWithPhase)
const finalAssistantPhase =
assistantPhases.size === 1 && !hasUnphasedAssistantText ? [...assistantPhases][0] : undefined;
return finalAssistantPhase
? ({ ...message, phase: finalAssistantPhase } as AssistantMessageWithPhase)
: message;
}

View File

@ -599,6 +599,146 @@ describe("convertMessagesToInputItems", () => {
});
});
// Replay path: a single stored assistant message whose text blocks carry
// different per-block signature phases must be split into two separate
// assistant input messages, each with its own phase — and the message-level
// `phase: "final_answer"` must not bleed into the commentary block.
it("splits replayed assistant text on phase changes from block signatures", () => {
const msg = {
role: "assistant" as const,
phase: "final_answer" as const,
content: [
{
type: "text" as const,
text: "Working... ",
textSignature: JSON.stringify({ v: 1, id: "item_commentary", phase: "commentary" }),
},
{
type: "text" as const,
text: "Done.",
textSignature: JSON.stringify({ v: 1, id: "item_final", phase: "final_answer" }),
},
],
stopReason: "stop",
api: "openai-responses",
provider: "openai",
model: "gpt-5.2",
usage: {},
timestamp: 0,
};
expect(
convertMessagesToInputItems([msg] as unknown as Parameters<
typeof convertMessagesToInputItems
>[0]),
).toEqual([
{
type: "message",
role: "assistant",
content: "Working... ",
phase: "commentary",
},
{
type: "message",
role: "assistant",
content: "Done.",
phase: "final_answer",
},
]);
});
// Legacy-compat replay: when one text block has no signature at all and a
// later block carries an explicit phase, the unphased block must be emitted
// as its own message WITHOUT a phase (the message-level phase is suppressed
// because an explicit block phase exists elsewhere in the same message).
it("splits legacy unphased text from later phased text during replay", () => {
const msg = {
role: "assistant" as const,
phase: "final_answer" as const,
content: [
{
type: "text" as const,
text: "Legacy. ",
},
{
type: "text" as const,
text: "Done.",
textSignature: JSON.stringify({ v: 1, id: "item_final", phase: "final_answer" }),
},
],
stopReason: "stop",
api: "openai-responses",
provider: "openai",
model: "gpt-5.2",
usage: {},
timestamp: 0,
};
expect(
convertMessagesToInputItems([msg] as unknown as Parameters<
typeof convertMessagesToInputItems
>[0]),
).toEqual([
{
type: "message",
role: "assistant",
content: "Legacy. ",
},
{
type: "message",
role: "assistant",
content: "Done.",
phase: "final_answer",
},
]);
});
it("preserves ordering when commentary text, tool calls, and final answer share one stored assistant message", () => {
const msg = {
role: "assistant" as const,
content: [
{
type: "text" as const,
text: "Working... ",
textSignature: JSON.stringify({ v: 1, id: "item_commentary", phase: "commentary" }),
},
{
type: "toolCall" as const,
id: "call_1|fc_1",
name: "exec",
arguments: { cmd: "ls" },
},
{
type: "text" as const,
text: "Done.",
textSignature: JSON.stringify({ v: 1, id: "item_final", phase: "final_answer" }),
},
],
stopReason: "toolUse",
api: "openai-responses",
provider: "openai",
model: "gpt-5.2",
usage: {},
timestamp: 0,
};
expect(
convertMessagesToInputItems([msg] as Parameters<typeof convertMessagesToInputItems>[0]),
).toEqual([
{
type: "message",
role: "assistant",
content: "Working... ",
phase: "commentary",
},
{
type: "function_call",
id: "fc_1",
call_id: "call_1",
name: "exec",
arguments: JSON.stringify({ cmd: "ls" }),
},
{
type: "message",
role: "assistant",
content: "Done.",
phase: "final_answer",
},
]);
});
it("converts a tool result message", () => {
const items = convertMessagesToInputItems([toolResultMsg("call_1", "file.txt")] as Parameters<
typeof convertMessagesToInputItems
@ -926,6 +1066,97 @@ describe("buildAssistantMessageFromResponse", () => {
expect(msg.content[0]?.text).toBe("Final answer");
});
it("omits top-level phase when a response contains mixed assistant phases", () => {
const response = {
id: "resp_mixed_phase",
object: "response",
created_at: Date.now(),
status: "completed",
model: "gpt-5.2",
output: [
{
type: "message",
id: "item_commentary",
role: "assistant",
phase: "commentary",
content: [{ type: "output_text", text: "Working... " }],
},
{
type: "message",
id: "item_final",
role: "assistant",
phase: "final_answer",
content: [{ type: "output_text", text: "Done." }],
},
],
usage: { input_tokens: 100, output_tokens: 50, total_tokens: 150 },
} as unknown as ResponseObject;
const msg = buildAssistantMessageFromResponse(response, modelInfo) as {
phase?: string;
content: Array<{ type: string; text?: string; textSignature?: string }>;
};
expect(msg.phase).toBeUndefined();
expect(msg.content).toMatchObject([
{
type: "text",
text: "Working... ",
textSignature: JSON.stringify({ v: 1, id: "item_commentary", phase: "commentary" }),
},
{
type: "text",
text: "Done.",
textSignature: JSON.stringify({ v: 1, id: "item_final", phase: "final_answer" }),
},
]);
});
it("omits top-level phase when unphased legacy text and phased final text coexist", () => {
const response = {
id: "resp_unphased_plus_final",
object: "response",
created_at: Date.now(),
status: "completed",
model: "gpt-5.2",
output: [
{
type: "message",
id: "item_legacy",
role: "assistant",
content: [{ type: "output_text", text: "Legacy. " }],
},
{
type: "message",
id: "item_final",
role: "assistant",
phase: "final_answer",
content: [{ type: "output_text", text: "Done." }],
},
],
usage: { input_tokens: 100, output_tokens: 50, total_tokens: 150 },
} as unknown as ResponseObject;
const msg = buildAssistantMessageFromResponse(response, modelInfo) as {
phase?: string;
content: Array<{ type: string; text?: string; textSignature?: string }>;
};
expect(msg.phase).toBeUndefined();
expect(msg.content).toMatchObject([
{
type: "text",
text: "Legacy. ",
textSignature: JSON.stringify({ v: 1, id: "item_legacy" }),
},
{
type: "text",
text: "Done.",
textSignature: JSON.stringify({ v: 1, id: "item_final", phase: "final_answer" }),
},
]);
});
it("maps reasoning output items to thinking blocks with signature", () => {
const response = {
id: "resp_reasoning",
@ -1244,6 +1475,8 @@ describe("createOpenAIWebSocketStreamFn", () => {
releaseWsSession("sess-incremental");
releaseWsSession("sess-full");
releaseWsSession("sess-phase");
releaseWsSession("sess-phase-stream");
releaseWsSession("sess-phase-late-map");
releaseWsSession("sess-tools");
releaseWsSession("sess-store-default");
releaseWsSession("sess-store-compat");
@ -1482,6 +1715,220 @@ describe("createOpenAIWebSocketStreamFn", () => {
expect(doneEvent?.message.stopReason).toBe("toolUse");
});
it("emits accumulated phase-aware partials when output item mapping is available", async () => {
const streamFn = createOpenAIWebSocketStreamFn("sk-test", "sess-phase-stream");
const stream = streamFn(
modelStub as Parameters<typeof streamFn>[0],
contextStub as Parameters<typeof streamFn>[1],
);
const events: Array<{
type?: string;
delta?: string;
partial?: { phase?: string; content?: unknown[] };
}> = [];
const done = (async () => {
for await (const ev of await resolveStream(stream)) {
events.push(ev as (typeof events)[number]);
}
})();
await new Promise((r) => setImmediate(r));
const manager = MockManager.lastInstance!;
manager.simulateEvent({
type: "response.output_item.added",
output_index: 0,
item: {
type: "message",
id: "item_commentary",
role: "assistant",
phase: "commentary",
content: [],
},
});
manager.simulateEvent({
type: "response.output_text.delta",
item_id: "item_commentary",
output_index: 0,
content_index: 0,
delta: "Working",
});
manager.simulateEvent({
type: "response.output_text.delta",
item_id: "item_commentary",
output_index: 0,
content_index: 0,
delta: "...",
});
manager.simulateEvent({
type: "response.output_item.added",
output_index: 1,
item: {
type: "message",
id: "item_final",
role: "assistant",
phase: "final_answer",
content: [],
},
});
manager.simulateEvent({
type: "response.output_text.delta",
item_id: "item_final",
output_index: 1,
content_index: 0,
delta: "Done.",
});
manager.simulateEvent({
type: "response.completed",
response: {
id: "resp_phase_stream",
object: "response",
created_at: Date.now(),
status: "completed",
model: "gpt-5.2",
output: [
{
type: "message",
id: "item_commentary",
role: "assistant",
phase: "commentary",
content: [{ type: "output_text", text: "Working..." }],
},
{
type: "message",
id: "item_final",
role: "assistant",
phase: "final_answer",
content: [{ type: "output_text", text: "Done." }],
},
],
usage: { input_tokens: 100, output_tokens: 50, total_tokens: 150 },
},
});
await done;
const deltas = events.filter((event) => event.type === "text_delta");
expect(deltas).toHaveLength(3);
expect(deltas[0]).toMatchObject({ delta: "Working" });
expect(deltas[0]?.partial?.phase).toBe("commentary");
expect(deltas[0]?.partial?.content).toEqual([
{
type: "text",
text: "Working",
textSignature: JSON.stringify({ v: 1, id: "item_commentary", phase: "commentary" }),
},
]);
expect(deltas[1]).toMatchObject({ delta: "..." });
expect(deltas[1]?.partial?.phase).toBe("commentary");
expect(deltas[1]?.partial?.content).toEqual([
{
type: "text",
text: "Working...",
textSignature: JSON.stringify({ v: 1, id: "item_commentary", phase: "commentary" }),
},
]);
expect(deltas[2]).toMatchObject({ delta: "Done." });
expect(deltas[2]?.partial?.phase).toBe("final_answer");
expect(deltas[2]?.partial?.content).toEqual([
{
type: "text",
text: "Done.",
textSignature: JSON.stringify({ v: 1, id: "item_final", phase: "final_answer" }),
},
]);
});
it("degrades safely when a text delta arrives before its item mapping", async () => {
const streamFn = createOpenAIWebSocketStreamFn("sk-test", "sess-phase-late-map");
const stream = streamFn(
modelStub as Parameters<typeof streamFn>[0],
contextStub as Parameters<typeof streamFn>[1],
);
const events: Array<{
type?: string;
delta?: string;
partial?: { phase?: string; content?: unknown[] };
}> = [];
const done = (async () => {
for await (const ev of await resolveStream(stream)) {
events.push(ev as (typeof events)[number]);
}
})();
await new Promise((r) => setImmediate(r));
const manager = MockManager.lastInstance!;
manager.simulateEvent({
type: "response.output_text.delta",
item_id: "item_late",
output_index: 0,
content_index: 0,
delta: "Working",
});
manager.simulateEvent({
type: "response.output_item.added",
output_index: 0,
item: {
type: "message",
id: "item_late",
role: "assistant",
phase: "commentary",
content: [],
},
});
manager.simulateEvent({
type: "response.output_text.delta",
item_id: "item_late",
output_index: 0,
content_index: 0,
delta: "...",
});
manager.simulateEvent({
type: "response.completed",
response: {
id: "resp_phase_late_map",
object: "response",
created_at: Date.now(),
status: "completed",
model: "gpt-5.2",
output: [
{
type: "message",
id: "item_late",
role: "assistant",
phase: "commentary",
content: [{ type: "output_text", text: "Working..." }],
},
],
usage: { input_tokens: 100, output_tokens: 50, total_tokens: 150 },
},
});
await done;
const deltas = events.filter((event) => event.type === "text_delta");
expect(deltas).toHaveLength(2);
expect(deltas[0]).toMatchObject({ delta: "Working" });
expect(deltas[0]?.partial?.phase).toBeUndefined();
expect(deltas[0]?.partial?.content).toEqual([
{
type: "text",
text: "Working",
textSignature: JSON.stringify({ v: 1, id: "item_late" }),
},
]);
expect(deltas[1]).toMatchObject({ delta: "..." });
expect(deltas[1]?.partial?.phase).toBe("commentary");
expect(deltas[1]?.partial?.content).toEqual([
{
type: "text",
text: "Working...",
textSignature: JSON.stringify({ v: 1, id: "item_late", phase: "commentary" }),
},
]);
});
it("falls back to HTTP when WebSocket connect fails (session pre-broken via flag)", async () => {
// Set the class-level flag BEFORE calling streamFn so the new instance
// fails on connect(). We patch the static default via MockManager directly.

View File

@ -39,6 +39,7 @@ import {
getOpenAIWebSocketErrorDetails,
OpenAIWebSocketManager,
type FunctionToolDefinition,
type OpenAIResponsesAssistantPhase,
type OpenAIWebSocketManagerOptions,
} from "./openai-ws-connection.js";
import {
@ -86,6 +87,23 @@ type OpenAIWsStreamDeps = {
streamSimple: typeof piAi.streamSimple;
};
type AssistantMessageWithPhase = AssistantMessage & { phase?: OpenAIResponsesAssistantPhase };
/**
 * Narrow an arbitrary value to a supported assistant phase.
 * Returns the value itself when it is exactly "commentary" or "final_answer";
 * every other input (including non-strings) yields undefined.
 */
function normalizeAssistantPhase(value: unknown): OpenAIResponsesAssistantPhase | undefined {
  if (value === "commentary") {
    return value;
  }
  if (value === "final_answer") {
    return value;
  }
  return undefined;
}
/**
 * Serialize a versioned text-signature payload for an assistant text block.
 * The payload always carries `v: 1` and the originating output item id; the
 * `phase` key is emitted only when a phase is present, so unphased (legacy)
 * signatures stay compact. Key order is fixed as v, id, phase.
 */
function encodeAssistantTextSignature(params: {
  id: string;
  phase?: OpenAIResponsesAssistantPhase;
}): string {
  const payload: { v: number; id: string; phase?: OpenAIResponsesAssistantPhase } = {
    v: 1,
    id: params.id,
  };
  if (params.phase) {
    payload.phase = params.phase;
  }
  return JSON.stringify(payload);
}
const defaultOpenAIWsStreamDeps: OpenAIWsStreamDeps = {
createManager: (options) => new OpenAIWebSocketManager(options),
createHttpFallbackStreamFn: (model) => createBoundaryAwareStreamFnForModel(model),
@ -888,12 +906,53 @@ export function createOpenAIWebSocketStreamFn(
emittedStart = true;
}
const outputItemPhaseById = new Map<string, OpenAIResponsesAssistantPhase | undefined>();
// Push a phase-aware `text_delta` event onto the stream.
// Builds a zero-usage partial assistant message carrying the accumulated text
// for one output item / content part, attaching a v1 textSignature (and a
// top-level `phase`) when the item id is known and has a mapped phase.
// NOTE(review): relies on closure state — `outputItemPhaseById`, `model`,
// `eventStream` — populated by the surrounding stream handler.
const emitTextDelta = (params: {
fullText: string;
deltaText: string;
itemId?: string;
contentIndex?: number;
}) => {
const resolvedItemId = params.itemId;
// Default to the first content part when the event carried no index.
const contentIndex = params.contentIndex ?? 0;
// Phase is only resolvable once the item's `output_item.added` mapping exists;
// before that, `outputItemPhaseById.get(...)` is undefined and we degrade to
// an unphased partial.
const itemPhase = resolvedItemId
? normalizeAssistantPhase(outputItemPhaseById.get(resolvedItemId))
: undefined;
const partialBase = buildAssistantMessageWithZeroUsage({
model,
content: [
{
type: "text",
// Accumulated text so far, not just the delta.
text: params.fullText,
// Signature is attached only when the item id is known; the phase key is
// included only when the phase has been mapped.
...(resolvedItemId
? {
textSignature: encodeAssistantTextSignature({
id: resolvedItemId,
...(itemPhase ? { phase: itemPhase } : {}),
}),
}
: {}),
},
],
stopReason: "stop",
});
// Mirror the block-level phase at the top of the partial message when known.
const partialMsg: AssistantMessageWithPhase = itemPhase
? ({ ...partialBase, phase: itemPhase } as AssistantMessageWithPhase)
: partialBase;
eventStream.push({
type: "text_delta",
contentIndex,
delta: params.deltaText,
partial: partialMsg,
});
};
const capturedContextLength = context.messages.length;
let sawWsOutput = false;
try {
await new Promise<void>((resolve, reject) => {
const abortHandler = () => {
outputItemPhaseById.clear();
cleanup();
reject(new Error("aborted"));
};
@ -904,6 +963,7 @@ export function createOpenAIWebSocketStreamFn(
signal?.addEventListener("abort", abortHandler, { once: true });
const closeHandler = (code: number, reason: string) => {
outputItemPhaseById.clear();
cleanup();
const closeInfo = session.manager.lastCloseInfo;
reject(
@ -926,6 +986,10 @@ export function createOpenAIWebSocketStreamFn(
unsubscribe();
};
const outputTextByPart = new Map<string, string>();
const getOutputTextKey = (itemId: string, contentIndex: number) =>
`${itemId}:${contentIndex}`;
const unsubscribe = session.manager.onMessage((event) => {
if (
event.type === "response.output_item.added" ||
@ -940,7 +1004,54 @@ export function createOpenAIWebSocketStreamFn(
sawWsOutput = true;
}
if (
event.type === "response.output_item.added" ||
event.type === "response.output_item.done"
) {
if (typeof event.item.id === "string") {
const itemPhase =
event.item.type === "message"
? normalizeAssistantPhase((event.item as { phase?: unknown }).phase)
: undefined;
outputItemPhaseById.set(event.item.id, itemPhase);
}
return;
}
if (event.type === "response.output_text.delta") {
const key = getOutputTextKey(event.item_id, event.content_index);
const nextText = `${outputTextByPart.get(key) ?? ""}${event.delta}`;
outputTextByPart.set(key, nextText);
emitTextDelta({
fullText: nextText,
deltaText: event.delta,
itemId: event.item_id,
contentIndex: event.content_index,
});
return;
}
if (event.type === "response.output_text.done") {
const key = getOutputTextKey(event.item_id, event.content_index);
const previousText = outputTextByPart.get(key) ?? "";
if (event.text && event.text !== previousText) {
outputTextByPart.set(key, event.text);
const deltaText = event.text.startsWith(previousText)
? event.text.slice(previousText.length)
: event.text;
emitTextDelta({
fullText: event.text,
deltaText,
itemId: event.item_id,
contentIndex: event.content_index,
});
}
return;
}
if (event.type === "response.completed") {
outputItemPhaseById.clear();
outputTextByPart.clear();
cleanup();
session.lastContextLength = capturedContextLength;
const assistantMsg = buildAssistantMessageFromResponse(event.response, {
@ -953,6 +1064,8 @@ export function createOpenAIWebSocketStreamFn(
eventStream.push({ type: "done", reason, message: assistantMsg });
resolve();
} else if (event.type === "response.failed") {
outputItemPhaseById.clear();
outputTextByPart.clear();
cleanup();
reject(
new OpenAIWebSocketRuntimeError(
@ -964,6 +1077,8 @@ export function createOpenAIWebSocketStreamFn(
),
);
} else if (event.type === "error") {
outputItemPhaseById.clear();
outputTextByPart.clear();
cleanup();
reject(
new OpenAIWebSocketRuntimeError(
@ -974,18 +1089,6 @@ export function createOpenAIWebSocketStreamFn(
},
),
);
} else if (event.type === "response.output_text.delta") {
const partialMsg: AssistantMessage = buildAssistantMessageWithZeroUsage({
model,
content: [{ type: "text", text: event.delta }],
stopReason: "stop",
});
eventStream.push({
type: "text_delta",
contentIndex: 0,
delta: event.delta,
partial: partialMsg,
});
}
});
});

View File

@ -254,6 +254,109 @@ describe("handleMessageUpdate", () => {
expect(ctx.state.blockBuffer).toBe("");
});
it("replaces commentary preview text when a final_answer partial arrives", () => {
const onAgentEvent = vi.fn();
const ctx = {
params: {
runId: "run-1",
session: { id: "session-1" },
onAgentEvent,
},
state: {
deterministicApprovalPromptSent: false,
reasoningStreamOpen: false,
streamReasoning: false,
deltaBuffer: "",
blockBuffer: "",
partialBlockState: {
thinking: false,
final: false,
inlineCode: createInlineCodeState(),
},
lastStreamedAssistant: undefined,
lastStreamedAssistantCleaned: undefined,
emittedAssistantUpdate: false,
shouldEmitPartialReplies: false,
blockReplyBreak: "text_end",
},
log: { debug: vi.fn() },
noteLastAssistant: vi.fn(),
stripBlockTags: (text: string) => text,
consumePartialReplyDirectives: vi.fn(() => null),
emitReasoningStream: vi.fn(),
flushBlockReplyBuffer: vi.fn(),
} as unknown as EmbeddedPiSubscribeContext;
handleMessageUpdate(ctx, {
type: "message_update",
message: { role: "assistant", content: [] },
assistantMessageEvent: {
type: "text_delta",
delta: "Working...",
partial: {
role: "assistant",
content: [
{
type: "text",
text: "Working...",
textSignature: JSON.stringify({ v: 1, id: "item_commentary", phase: "commentary" }),
},
],
phase: "commentary",
stopReason: "stop",
api: "openai-responses",
provider: "openai",
model: "gpt-5.2",
usage: {},
timestamp: 0,
},
},
} as never);
handleMessageUpdate(ctx, {
type: "message_update",
message: { role: "assistant", content: [] },
assistantMessageEvent: {
type: "text_delta",
delta: "Done.",
partial: {
role: "assistant",
content: [
{
type: "text",
text: "Done.",
textSignature: JSON.stringify({ v: 1, id: "item_final", phase: "final_answer" }),
},
],
phase: "final_answer",
stopReason: "stop",
api: "openai-responses",
provider: "openai",
model: "gpt-5.2",
usage: {},
timestamp: 0,
},
},
} as never);
expect(onAgentEvent).toHaveBeenCalledTimes(2);
expect(onAgentEvent.mock.calls[0]?.[0]).toMatchObject({
stream: "assistant",
data: {
text: "Working...",
delta: "Working...",
},
});
expect(onAgentEvent.mock.calls[1]?.[0]).toMatchObject({
stream: "assistant",
data: {
text: "Done.",
delta: "",
replace: true,
},
});
});
it("contains synchronous text_end flush failures", async () => {
const debug = vi.fn();
const ctx = {
@ -417,4 +520,78 @@ describe("handleMessageEnd", () => {
expect(emitBlockReply).not.toHaveBeenCalled();
expect(finalizeAssistantTexts).not.toHaveBeenCalled();
});
it("emits a replacement final assistant event when final_answer appears only at message_end", () => {
const onAgentEvent = vi.fn();
const ctx = {
params: {
runId: "run-1",
session: { id: "session-1" },
onAgentEvent,
},
state: {
deterministicApprovalPromptSent: false,
messagingToolSentTexts: [],
messagingToolSentTextsNormalized: [],
includeReasoning: false,
streamReasoning: false,
emittedAssistantUpdate: true,
lastStreamedAssistantCleaned: "Working...",
assistantTexts: [],
assistantTextBaseline: 0,
blockReplyBreak: "text_end",
lastReasoningSent: undefined,
reasoningStreamOpen: false,
deltaBuffer: "",
blockBuffer: "",
blockState: {
thinking: false,
final: false,
inlineCode: createInlineCodeState(),
},
},
log: { debug: vi.fn() },
noteLastAssistant: vi.fn(),
recordAssistantUsage: vi.fn(),
stripBlockTags: (text: string) => text,
finalizeAssistantTexts: vi.fn(),
emitReasoningStream: vi.fn(),
blockChunker: null,
} as unknown as EmbeddedPiSubscribeContext;
void handleMessageEnd(ctx, {
type: "message_end",
message: {
role: "assistant",
content: [
{
type: "text",
text: "Working...",
textSignature: JSON.stringify({ v: 1, id: "item_commentary", phase: "commentary" }),
},
{
type: "text",
text: "Done.",
textSignature: JSON.stringify({ v: 1, id: "item_final", phase: "final_answer" }),
},
],
stopReason: "stop",
api: "openai-responses",
provider: "openai",
model: "gpt-5.2",
usage: {},
timestamp: 0,
},
} as never);
expect(onAgentEvent).toHaveBeenCalledTimes(1);
expect(onAgentEvent.mock.calls[0]?.[0]).toMatchObject({
stream: "assistant",
data: {
text: "Done.",
delta: "",
replace: true,
},
});
});
});

View File

@ -1,4 +1,5 @@
import type { AgentEvent, AgentMessage } from "@mariozechner/pi-agent-core";
import type { AssistantMessage } from "@mariozechner/pi-ai";
import { resolveSendableOutboundReplyParts } from "openclaw/plugin-sdk/reply-payload";
import { parseReplyDirectives } from "../auto-reply/reply/reply-directives.js";
import { isSilentReplyText, SILENT_REPLY_TOKEN } from "../auto-reply/tokens.js";
@ -18,6 +19,7 @@ import { appendRawStream } from "./pi-embedded-subscribe.raw-stream.js";
import {
extractAssistantText,
extractAssistantThinking,
extractAssistantVisibleText,
extractThinkingFromTaggedStream,
extractThinkingFromTaggedText,
formatReasoningMessage,
@ -41,6 +43,27 @@ const stripTrailingDirective = (text: string): string => {
return text.slice(0, openIndex);
};
type AssistantPhase = "commentary" | "final_answer";

// Narrow unknown input to a supported assistant phase; anything else maps to undefined.
const normalizeAssistantPhase = (value: unknown): AssistantPhase | undefined => {
  switch (value) {
    case "commentary":
    case "final_answer":
      return value;
    default:
      return undefined;
  }
};
/**
 * Extract the assistant phase from a serialized text signature.
 * Only non-empty strings that look like a JSON object (leading "{") are
 * parsed, and only v1 payloads are honored; malformed or foreign input
 * yields undefined rather than throwing.
 */
const getAssistantTextSignaturePhase = (value: unknown): AssistantPhase | undefined => {
  if (typeof value !== "string") {
    return undefined;
  }
  // Cheap pre-checks before paying for JSON.parse.
  if (value.trim().length === 0 || !value.startsWith("{")) {
    return undefined;
  }
  let parsed: { phase?: unknown; v?: unknown };
  try {
    parsed = JSON.parse(value) as { phase?: unknown; v?: unknown };
  } catch {
    return undefined;
  }
  if (parsed.v !== 1) {
    return undefined;
  }
  return normalizeAssistantPhase(parsed.phase);
};
const coerceText = (value: unknown): string => {
if (typeof value === "string") {
return value;
@ -322,12 +345,47 @@ export function handleMessageUpdate(
}
}
const partialAssistant =
assistantRecord?.partial && typeof assistantRecord.partial === "object"
? (assistantRecord.partial as AssistantMessage)
: msg;
const phaseAwareVisibleText = coerceText(extractAssistantVisibleText(partialAssistant)).trim();
const partialVisiblePhase = normalizeAssistantPhase(
(partialAssistant as { phase?: unknown }).phase,
);
const explicitStructuredPartialPhases = new Set<AssistantPhase>();
if (Array.isArray(partialAssistant.content)) {
for (const block of partialAssistant.content) {
if (!block || typeof block !== "object") {
continue;
}
const record = block as { type?: unknown; textSignature?: unknown };
if (record.type !== "text") {
continue;
}
const signaturePhase = getAssistantTextSignaturePhase(record.textSignature);
if (signaturePhase) {
explicitStructuredPartialPhases.add(signaturePhase);
}
}
}
const structuredPartialPhase =
explicitStructuredPartialPhases.size === 1
? [...explicitStructuredPartialPhases][0]
: undefined;
const shouldUsePhaseAwareBlockReply = Boolean(partialVisiblePhase || structuredPartialPhase);
const shouldDeferTextEndBlockReply =
shouldUsePhaseAwareBlockReply &&
(partialVisiblePhase ?? structuredPartialPhase) !== "final_answer";
if (chunk) {
ctx.state.deltaBuffer += chunk;
if (ctx.blockChunker) {
ctx.blockChunker.append(chunk);
} else {
ctx.state.blockBuffer += chunk;
if (!shouldUsePhaseAwareBlockReply) {
if (ctx.blockChunker) {
ctx.blockChunker.append(chunk);
} else {
ctx.state.blockBuffer += chunk;
}
}
}
@ -335,14 +393,15 @@ export function handleMessageUpdate(
// Handle partial <think> tags: stream whatever reasoning is visible so far.
ctx.emitReasoningStream(extractThinkingFromTaggedStream(ctx.state.deltaBuffer));
}
const next = ctx
.stripBlockTags(ctx.state.deltaBuffer, {
thinking: false,
final: false,
inlineCode: createInlineCodeState(),
})
.trim();
const next =
phaseAwareVisibleText ||
ctx
.stripBlockTags(ctx.state.deltaBuffer, {
thinking: false,
final: false,
inlineCode: createInlineCodeState(),
})
.trim();
if (next) {
const wasThinking = ctx.state.partialBlockState.thinking;
const visibleDelta = chunk ? ctx.stripBlockTags(chunk, ctx.state.partialBlockState) : "";
@ -373,6 +432,35 @@ export function handleMessageUpdate(
: Boolean(deltaText || hasMedia || hasAudio);
}
if (shouldUsePhaseAwareBlockReply) {
if (replace) {
ctx.state.blockBuffer = "";
ctx.blockChunker?.reset();
}
const blockReplyChunk = replace ? cleanedText : deltaText;
if (!shouldDeferTextEndBlockReply && blockReplyChunk) {
if (ctx.blockChunker) {
ctx.blockChunker.append(blockReplyChunk);
} else {
ctx.state.blockBuffer += blockReplyChunk;
}
}
if (
!shouldDeferTextEndBlockReply &&
evtType === "text_end" &&
!ctx.state.lastBlockReplyText &&
cleanedText
) {
if (ctx.blockChunker) {
ctx.blockChunker.reset();
ctx.blockChunker.append(cleanedText);
} else {
ctx.state.blockBuffer = cleanedText;
}
}
}
ctx.state.lastStreamedAssistant = next;
ctx.state.lastStreamedAssistantCleaned = cleanedText;
@ -405,6 +493,7 @@ export function handleMessageUpdate(
if (
!ctx.params.silentExpected &&
!shouldDeferTextEndBlockReply &&
ctx.params.onBlockReply &&
ctx.blockChunking &&
ctx.state.blockReplyBreak === "text_end"
@ -414,6 +503,7 @@ export function handleMessageUpdate(
if (
!ctx.params.silentExpected &&
!shouldDeferTextEndBlockReply &&
evtType === "text_end" &&
ctx.state.blockReplyBreak === "text_end"
) {
@ -445,6 +535,7 @@ export function handleMessageEnd(
promoteThinkingTagsToBlocks(assistantMessage);
const rawText = coerceText(extractAssistantText(assistantMessage));
const rawVisibleText = coerceText(extractAssistantVisibleText(assistantMessage));
appendRawStream({
ts: Date.now(),
event: "assistant_message_end",
@ -455,7 +546,7 @@ export function handleMessageEnd(
});
const text = resolveSilentReplyFallbackText({
text: ctx.stripBlockTags(rawText, { thinking: false, final: false }),
text: ctx.stripBlockTags(rawVisibleText || rawText, { thinking: false, final: false }),
messagingToolSentTexts: ctx.state.messagingToolSentTexts,
});
const rawThinking =
@ -497,14 +588,29 @@ export function handleMessageEnd(
}
}
const previousStreamedText = ctx.state.lastStreamedAssistantCleaned ?? "";
const shouldReplaceFinalStream = Boolean(
previousStreamedText && cleanedText && !cleanedText.startsWith(previousStreamedText),
);
const didTextChangeWithinCurrentMessage = Boolean(
previousStreamedText && cleanedText !== previousStreamedText,
);
const finalStreamDelta = shouldReplaceFinalStream
? ""
: cleanedText.slice(previousStreamedText.length);
if (
!ctx.params.silentExpected &&
!ctx.state.emittedAssistantUpdate &&
(cleanedText || hasMedia)
(cleanedText || hasMedia) &&
(!ctx.state.emittedAssistantUpdate ||
shouldReplaceFinalStream ||
didTextChangeWithinCurrentMessage ||
hasMedia)
) {
const data = buildAssistantStreamData({
text: cleanedText,
delta: cleanedText,
delta: finalStreamDelta,
replace: shouldReplaceFinalStream,
mediaUrls,
});
emitAgentEvent({
@ -517,6 +623,7 @@ export function handleMessageEnd(
data,
});
ctx.state.emittedAssistantUpdate = true;
ctx.state.lastStreamedAssistantCleaned = cleanedText;
}
const silentExpectedWithoutSentinel =
@ -579,14 +686,19 @@ export function handleMessageEnd(
}
};
const hasBufferedBlockReply = ctx.blockChunker
? ctx.blockChunker.hasBuffered()
: ctx.state.blockBuffer.length > 0;
if (
!ctx.params.silentExpected &&
(ctx.state.blockReplyBreak === "message_end" ||
(ctx.blockChunker ? ctx.blockChunker.hasBuffered() : ctx.state.blockBuffer.length > 0)) &&
text &&
onBlockReply
onBlockReply &&
(ctx.state.blockReplyBreak === "message_end" ||
hasBufferedBlockReply ||
text !== ctx.state.lastBlockReplyText)
) {
if (ctx.blockChunker?.hasBuffered()) {
if (hasBufferedBlockReply && ctx.blockChunker?.hasBuffered()) {
ctx.blockChunker.drain({ force: true, emit: ctx.emitBlockChunk });
ctx.blockChunker.reset();
} else if (text !== ctx.state.lastBlockReplyText) {

View File

@ -45,6 +45,7 @@ describe("subscribeEmbeddedPiSession", () => {
emitDelta(delta);
emitTextEnd(content);
await Promise.resolve();
await vi.waitFor(() => {
expect(onBlockReply).toHaveBeenCalledTimes(1);

View File

@ -12,13 +12,14 @@ describe("subscribeEmbeddedPiSession", () => {
emitAssistantTextDelta({ emit, delta: "Good morning!" });
emitAssistantTextEnd({ emit, content: "Good morning!" });
await Promise.resolve();
await vi.waitFor(() => {
expect(onBlockReply).toHaveBeenCalledTimes(1);
});
expect(subscription.assistantTexts).toEqual(["Good morning!"]);
});
it("does not duplicate block chunks when text_end repeats full content", () => {
it("does not duplicate block chunks when text_end repeats full content", async () => {
const onBlockReply = vi.fn();
const { emit } = createTextEndBlockReplyHarness({
onBlockReply,
@ -32,11 +33,13 @@ describe("subscribeEmbeddedPiSession", () => {
const fullText = "First line\nSecond line\nThird line\n";
emitAssistantTextDelta({ emit, delta: fullText });
await Promise.resolve();
const callsAfterDelta = onBlockReply.mock.calls.length;
expect(callsAfterDelta).toBeGreaterThan(0);
emitAssistantTextEnd({ emit, content: fullText });
await Promise.resolve();
expect(onBlockReply).toHaveBeenCalledTimes(callsAfterDelta);
});

View File

@ -13,6 +13,7 @@ describe("subscribeEmbeddedPiSession", () => {
emitAssistantTextDelta({ emit, delta: "Hello block" });
emitAssistantTextEnd({ emit });
await Promise.resolve();
await vi.waitFor(() => {
expect(onBlockReply).toHaveBeenCalledTimes(1);
@ -31,7 +32,8 @@ describe("subscribeEmbeddedPiSession", () => {
expect(onBlockReply).toHaveBeenCalledTimes(1);
expect(subscription.assistantTexts).toEqual(["Hello block"]);
});
it("does not duplicate when message_end flushes and a late text_end arrives", () => {
it("does not duplicate when message_end flushes and a late text_end arrives", async () => {
const onBlockReply = vi.fn();
const { emit, subscription } = createTextEndBlockReplyHarness({ onBlockReply });
@ -52,8 +54,473 @@ describe("subscribeEmbeddedPiSession", () => {
// Some providers can still emit a late text_end; this must not re-emit.
emitAssistantTextEnd({ emit, content: "Hello block" });
await Promise.resolve();
expect(onBlockReply).toHaveBeenCalledTimes(1);
expect(subscription.assistantTexts).toEqual(["Hello block"]);
});
it("emits legacy structured partials on text_end without waiting for message_end", async () => {
const onBlockReply = vi.fn();
const { emit, subscription } = createTextEndBlockReplyHarness({ onBlockReply });
emit({ type: "message_start", message: { role: "assistant" } });
emit({
type: "message_update",
message: { role: "assistant", content: [] },
assistantMessageEvent: {
type: "text_delta",
delta: "Legacy answer",
partial: {
role: "assistant",
content: [
{
type: "text",
text: "Legacy answer",
textSignature: JSON.stringify({ v: 1, id: "item_legacy" }),
},
],
stopReason: "stop",
api: "openai-responses",
provider: "openai",
model: "gpt-5.2",
usage: {},
timestamp: 0,
},
},
});
emit({
type: "message_update",
message: { role: "assistant", content: [] },
assistantMessageEvent: {
type: "text_end",
content: "Legacy answer",
partial: {
role: "assistant",
content: [
{
type: "text",
text: "Legacy answer",
textSignature: JSON.stringify({ v: 1, id: "item_legacy" }),
},
],
stopReason: "stop",
api: "openai-responses",
provider: "openai",
model: "gpt-5.2",
usage: {},
timestamp: 0,
},
},
});
await Promise.resolve();
expect(onBlockReply).toHaveBeenCalledTimes(1);
expect(onBlockReply.mock.calls[0]?.[0]?.text).toBe("Legacy answer");
expect(subscription.assistantTexts).toEqual(["Legacy answer"]);
emit({
type: "message_end",
message: {
role: "assistant",
content: [{ type: "text", text: "Legacy answer" }],
} as AssistantMessage,
});
expect(onBlockReply).toHaveBeenCalledTimes(1);
expect(subscription.assistantTexts).toEqual(["Legacy answer"]);
});
it("suppresses commentary block replies until a final answer is available", async () => {
const onBlockReply = vi.fn();
const { emit, subscription } = createTextEndBlockReplyHarness({ onBlockReply });
emit({ type: "message_start", message: { role: "assistant" } });
emit({
type: "message_update",
message: { role: "assistant", content: [] },
assistantMessageEvent: {
type: "text_delta",
delta: "Working...",
partial: {
role: "assistant",
content: [
{
type: "text",
text: "Working...",
textSignature: JSON.stringify({ v: 1, id: "item_commentary", phase: "commentary" }),
},
],
phase: "commentary",
stopReason: "stop",
api: "openai-responses",
provider: "openai",
model: "gpt-5.2",
usage: {},
timestamp: 0,
},
},
});
emit({
type: "message_update",
message: { role: "assistant", content: [] },
assistantMessageEvent: {
type: "text_end",
content: "Working...",
partial: {
role: "assistant",
content: [
{
type: "text",
text: "Working...",
textSignature: JSON.stringify({ v: 1, id: "item_commentary", phase: "commentary" }),
},
],
phase: "commentary",
stopReason: "stop",
api: "openai-responses",
provider: "openai",
model: "gpt-5.2",
usage: {},
timestamp: 0,
},
},
});
await Promise.resolve();
expect(onBlockReply).not.toHaveBeenCalled();
expect(subscription.assistantTexts).toEqual([]);
emit({
type: "message_update",
message: { role: "assistant", content: [] },
assistantMessageEvent: {
type: "text_delta",
delta: "Done.",
partial: {
role: "assistant",
content: [
{
type: "text",
text: "Done.",
textSignature: JSON.stringify({ v: 1, id: "item_final", phase: "final_answer" }),
},
],
phase: "final_answer",
stopReason: "stop",
api: "openai-responses",
provider: "openai",
model: "gpt-5.2",
usage: {},
timestamp: 0,
},
},
});
emit({
type: "message_update",
message: { role: "assistant", content: [] },
assistantMessageEvent: {
type: "text_end",
content: "Done.",
partial: {
role: "assistant",
content: [
{
type: "text",
text: "Done.",
textSignature: JSON.stringify({ v: 1, id: "item_final", phase: "final_answer" }),
},
],
phase: "final_answer",
stopReason: "stop",
api: "openai-responses",
provider: "openai",
model: "gpt-5.2",
usage: {},
timestamp: 0,
},
},
});
await Promise.resolve();
emit({
type: "message_end",
message: {
role: "assistant",
content: [
{
type: "text",
text: "Working...",
textSignature: JSON.stringify({ v: 1, id: "item_commentary", phase: "commentary" }),
},
{
type: "text",
text: "Done.",
textSignature: JSON.stringify({ v: 1, id: "item_final", phase: "final_answer" }),
},
],
} as AssistantMessage,
});
expect(onBlockReply).toHaveBeenCalledTimes(1);
expect(onBlockReply.mock.calls[0]?.[0]?.text).toBe("Done.");
expect(subscription.assistantTexts).toEqual(["Done."]);
});
it("emits the full final answer on text_end when it extends suppressed commentary", async () => {
const onBlockReply = vi.fn();
const { emit, subscription } = createTextEndBlockReplyHarness({ onBlockReply });
emit({ type: "message_start", message: { role: "assistant" } });
emit({
type: "message_update",
message: { role: "assistant", content: [] },
assistantMessageEvent: {
type: "text_delta",
delta: "Hello",
partial: {
role: "assistant",
content: [
{
type: "text",
text: "Hello",
textSignature: JSON.stringify({ v: 1, id: "item_commentary", phase: "commentary" }),
},
],
phase: "commentary",
stopReason: "stop",
api: "openai-responses",
provider: "openai",
model: "gpt-5.2",
usage: {},
timestamp: 0,
},
},
});
emit({
type: "message_update",
message: { role: "assistant", content: [] },
assistantMessageEvent: {
type: "text_end",
content: "Hello",
partial: {
role: "assistant",
content: [
{
type: "text",
text: "Hello",
textSignature: JSON.stringify({ v: 1, id: "item_commentary", phase: "commentary" }),
},
],
phase: "commentary",
stopReason: "stop",
api: "openai-responses",
provider: "openai",
model: "gpt-5.2",
usage: {},
timestamp: 0,
},
},
});
await Promise.resolve();
expect(onBlockReply).not.toHaveBeenCalled();
emit({
type: "message_update",
message: { role: "assistant", content: [] },
assistantMessageEvent: {
type: "text_delta",
delta: " world",
partial: {
role: "assistant",
content: [
{
type: "text",
text: "Hello world",
textSignature: JSON.stringify({ v: 1, id: "item_final", phase: "final_answer" }),
},
],
phase: "final_answer",
stopReason: "stop",
api: "openai-responses",
provider: "openai",
model: "gpt-5.2",
usage: {},
timestamp: 0,
},
},
});
emit({
type: "message_update",
message: { role: "assistant", content: [] },
assistantMessageEvent: {
type: "text_end",
content: "Hello world",
partial: {
role: "assistant",
content: [
{
type: "text",
text: "Hello world",
textSignature: JSON.stringify({ v: 1, id: "item_final", phase: "final_answer" }),
},
],
phase: "final_answer",
stopReason: "stop",
api: "openai-responses",
provider: "openai",
model: "gpt-5.2",
usage: {},
timestamp: 0,
},
},
});
await Promise.resolve();
expect(onBlockReply).toHaveBeenCalledTimes(1);
expect(onBlockReply.mock.calls[0]?.[0]?.text).toBe("Hello world");
expect(subscription.assistantTexts).toEqual(["Hello world"]);
});
it("does not defer final_answer text_end when phase exists only in textSignature", async () => {
const onBlockReply = vi.fn();
const { emit, subscription } = createTextEndBlockReplyHarness({ onBlockReply });
emit({ type: "message_start", message: { role: "assistant" } });
emit({
type: "message_update",
message: { role: "assistant", content: [] },
assistantMessageEvent: {
type: "text_delta",
delta: "Done.",
partial: {
role: "assistant",
content: [
{
type: "text",
text: "Done.",
textSignature: JSON.stringify({ v: 1, id: "item_final", phase: "final_answer" }),
},
],
stopReason: "stop",
api: "openai-responses",
provider: "openai",
model: "gpt-5.2",
usage: {},
timestamp: 0,
},
},
});
emit({
type: "message_update",
message: { role: "assistant", content: [] },
assistantMessageEvent: {
type: "text_end",
content: "Done.",
partial: {
role: "assistant",
content: [
{
type: "text",
text: "Done.",
textSignature: JSON.stringify({ v: 1, id: "item_final", phase: "final_answer" }),
},
],
stopReason: "stop",
api: "openai-responses",
provider: "openai",
model: "gpt-5.2",
usage: {},
timestamp: 0,
},
},
});
await Promise.resolve();
expect(onBlockReply).toHaveBeenCalledTimes(1);
expect(onBlockReply.mock.calls[0]?.[0]?.text).toBe("Done.");
expect(subscription.assistantTexts).toEqual(["Done."]);
});
it("emits the final answer at message_end when commentary was streamed first", async () => {
const onBlockReply = vi.fn();
const { emit, subscription } = createTextEndBlockReplyHarness({ onBlockReply });
emit({ type: "message_start", message: { role: "assistant" } });
emit({
type: "message_update",
message: { role: "assistant", content: [] },
assistantMessageEvent: {
type: "text_delta",
delta: "Working...",
partial: {
role: "assistant",
content: [
{
type: "text",
text: "Working...",
textSignature: JSON.stringify({ v: 1, id: "item_commentary", phase: "commentary" }),
},
],
phase: "commentary",
stopReason: "stop",
api: "openai-responses",
provider: "openai",
model: "gpt-5.2",
usage: {},
timestamp: 0,
},
},
});
emit({
type: "message_update",
message: { role: "assistant", content: [] },
assistantMessageEvent: {
type: "text_end",
content: "Working...",
partial: {
role: "assistant",
content: [
{
type: "text",
text: "Working...",
textSignature: JSON.stringify({ v: 1, id: "item_commentary", phase: "commentary" }),
},
],
phase: "commentary",
stopReason: "stop",
api: "openai-responses",
provider: "openai",
model: "gpt-5.2",
usage: {},
timestamp: 0,
},
},
});
await Promise.resolve();
emit({
type: "message_end",
message: {
role: "assistant",
content: [
{
type: "text",
text: "Working...",
textSignature: JSON.stringify({ v: 1, id: "item_commentary", phase: "commentary" }),
},
{
type: "text",
text: "Done.",
textSignature: JSON.stringify({ v: 1, id: "item_final", phase: "final_answer" }),
},
],
} as AssistantMessage,
});
expect(onBlockReply).toHaveBeenCalledTimes(1);
expect(onBlockReply.mock.calls[0]?.[0]?.text).toBe("Done.");
expect(subscription.assistantTexts).toEqual(["Done."]);
});
});

View File

@ -2,6 +2,7 @@ import type { AssistantMessage } from "@mariozechner/pi-ai";
import { describe, expect, it } from "vitest";
import {
extractAssistantText,
extractAssistantVisibleText,
formatReasoningMessage,
promoteThinkingTagsToBlocks,
stripDowngradedToolCallText,
@ -9,7 +10,9 @@ import {
function makeAssistantMessage(
message: Omit<AssistantMessage, "api" | "provider" | "model" | "usage" | "stopReason"> &
Partial<Pick<AssistantMessage, "api" | "provider" | "model" | "usage" | "stopReason">>,
Partial<Pick<AssistantMessage, "api" | "provider" | "model" | "usage" | "stopReason">> & {
phase?: "commentary" | "final_answer";
},
): AssistantMessage {
return {
api: "responses",
@ -569,6 +572,78 @@ describe("stripDowngradedToolCallText", () => {
});
});
describe("extractAssistantVisibleText", () => {
it("prefers non-empty final_answer text over commentary", () => {
const msg = makeAssistantMessage({
role: "assistant",
content: [
{
type: "text",
text: "Working...",
textSignature: JSON.stringify({ v: 1, id: "item_commentary", phase: "commentary" }),
},
{
type: "text",
text: "Done.",
textSignature: JSON.stringify({ v: 1, id: "item_final", phase: "final_answer" }),
},
],
timestamp: Date.now(),
});
expect(extractAssistantVisibleText(msg)).toBe("Done.");
});
it("falls back to commentary when final_answer is empty", () => {
const msg = makeAssistantMessage({
role: "assistant",
content: [
{
type: "text",
text: "Working...",
textSignature: JSON.stringify({ v: 1, id: "item_commentary", phase: "commentary" }),
},
{
type: "text",
text: " ",
textSignature: JSON.stringify({ v: 1, id: "item_final", phase: "final_answer" }),
},
],
timestamp: Date.now(),
});
expect(extractAssistantVisibleText(msg)).toBe("Working...");
});
it("falls back to legacy unphased text when phased text is absent", () => {
const msg = makeAssistantMessage({
role: "assistant",
content: [{ type: "text", text: "Legacy answer" }],
timestamp: Date.now(),
});
expect(extractAssistantVisibleText(msg)).toBe("Legacy answer");
});
it("does not pull unphased legacy text into final_answer extraction when phased blocks are present", () => {
const msg = makeAssistantMessage({
role: "assistant",
phase: "final_answer",
content: [
{ type: "text", text: "Legacy." },
{
type: "text",
text: "Done.",
textSignature: JSON.stringify({ v: 1, id: "item_final", phase: "final_answer" }),
},
],
timestamp: Date.now(),
});
expect(extractAssistantVisibleText(msg)).toBe("Done.");
});
});
describe("promoteThinkingTagsToBlocks", () => {
it("does not crash on malformed null content entries", () => {
const msg = makeAssistantMessage({

View File

@ -233,13 +233,121 @@ export function stripThinkingTagsFromText(text: string): string {
return stripReasoningTagsFromText(text, { mode: "strict", trim: "both" });
}
type AssistantPhase = "commentary" | "final_answer";

/**
 * Narrows an arbitrary runtime value to a known assistant phase.
 * Returns undefined for anything other than exactly "commentary" or
 * "final_answer".
 */
function normalizeAssistantPhase(value: unknown): AssistantPhase | undefined {
  return value === "commentary" || value === "final_answer" ? value : undefined;
}

/**
 * Parses an assistant text-block signature.
 *
 * Two wire formats are accepted:
 * - a bare item id (any non-empty string that does not start with "{"),
 *   returned as `{ id }`;
 * - a JSON envelope `{ v: 1, id?, phase? }` — only version 1 is honored.
 *
 * @returns The parsed `{ id?, phase? }`, or null for empty input, malformed
 *          JSON, or an unsupported envelope version.
 */
function parseAssistantTextSignature(
  value: unknown,
): { id?: string; phase?: AssistantPhase } | null {
  if (typeof value !== "string" || value.trim().length === 0) {
    return null;
  }
  // Legacy format: the signature is the raw item id, not a JSON envelope.
  if (!value.startsWith("{")) {
    return { id: value };
  }
  try {
    const parsed = JSON.parse(value) as { id?: unknown; phase?: unknown; v?: unknown };
    if (parsed.v !== 1) {
      return null;
    }
    // Normalize once; the original called normalizeAssistantPhase twice
    // (once in the guard, once in the spread) for the same value.
    const phase = normalizeAssistantPhase(parsed.phase);
    return {
      ...(typeof parsed.id === "string" ? { id: parsed.id } : {}),
      ...(phase ? { phase } : {}),
    };
  } catch {
    return null;
  }
}
/**
 * Runs the full assistant-text scrubbing pipeline — provider tool-call XML,
 * model special tokens, downgraded tool-call text, then thinking tags — and
 * trims surrounding whitespace from the result.
 */
function sanitizeAssistantText(text: string): string {
  const withoutToolXml = stripMinimaxToolCallXml(text);
  const withoutSpecialTokens = stripModelSpecialTokens(withoutToolXml);
  const withoutDowngradedCalls = stripDowngradedToolCallText(withoutSpecialTokens);
  return stripThinkingTagsFromText(withoutDowngradedCalls).trim();
}
/**
 * Final pass over extracted assistant text: applies user-facing sanitization,
 * enabling error-context handling only when the message stopped on an error.
 */
function finalizeAssistantExtraction(msg: AssistantMessage, extracted: string): string {
  return sanitizeUserFacingText(extracted, { errorContext: msg.stopReason === "error" });
}
/**
 * Extracts the assistant-visible text belonging to one phase of a message.
 *
 * @param msg   Assistant message whose content is a string or a block array.
 * @param phase Phase to extract ("commentary" | "final_answer"); when omitted,
 *              only unphased (legacy) text is returned.
 * @returns Sanitized text for the requested phase, or "" when nothing matches.
 *
 * Phase resolution per text block: an explicit phase in the block's
 * textSignature wins; otherwise the message-level phase applies — but only
 * when NO block in the message carries an explicit phase, so mixed messages
 * never leak the message phase onto unphased legacy blocks.
 */
function extractAssistantTextForPhase(msg: AssistantMessage, phase?: AssistantPhase): string {
  const messagePhase = normalizeAssistantPhase((msg as { phase?: unknown }).phase);
  const shouldIncludeContent = (resolvedPhase?: AssistantPhase) => {
    if (phase) {
      return resolvedPhase === phase;
    }
    return resolvedPhase === undefined;
  };
  if (typeof msg.content === "string") {
    return shouldIncludeContent(messagePhase)
      ? finalizeAssistantExtraction(msg, sanitizeAssistantText(msg.content))
      : "";
  }
  if (!Array.isArray(msg.content)) {
    return "";
  }
  // Parse each text block's signature exactly once (the original re-ran
  // parseAssistantTextSignature — and its JSON.parse — per block in both the
  // explicit-phase probe and the filter below).
  const signaturePhaseByBlock = new Map<unknown, AssistantPhase | undefined>();
  for (const block of msg.content) {
    if (!block || typeof block !== "object") {
      continue;
    }
    const record = block as { type?: unknown; textSignature?: unknown };
    if (record.type === "text") {
      signaturePhaseByBlock.set(block, parseAssistantTextSignature(record.textSignature)?.phase);
    }
  }
  const hasExplicitPhasedTextBlocks = [...signaturePhaseByBlock.values()].some(Boolean);
  const extracted =
    extractTextFromChatContent(
      msg.content.filter((block) => {
        // Non-text blocks (and malformed entries) never contribute text here.
        if (!signaturePhaseByBlock.has(block)) {
          return false;
        }
        const resolvedPhase =
          signaturePhaseByBlock.get(block) ??
          (hasExplicitPhasedTextBlocks ? undefined : messagePhase);
        return shouldIncludeContent(resolvedPhase);
      }),
      {
        sanitizeText: (text) => sanitizeAssistantText(text),
        joinWith: "\n",
        normalizeText: (text) => text.trim(),
      },
    ) ?? "";
  return finalizeAssistantExtraction(msg, extracted);
}
/**
 * Picks the user-visible text of an assistant message, preferring phases in
 * order: final_answer, then commentary, then legacy unphased text.
 */
export function extractAssistantVisibleText(msg: AssistantMessage): string {
  for (const candidatePhase of ["final_answer", "commentary"] as const) {
    const text = extractAssistantTextForPhase(msg, candidatePhase);
    if (text.trim()) {
      return text;
    }
  }
  return extractAssistantTextForPhase(msg);
}
export function extractAssistantText(msg: AssistantMessage): string {
const extracted =
extractTextFromChatContent(msg.content, {
sanitizeText: (text) =>
stripThinkingTagsFromText(
stripDowngradedToolCallText(stripModelSpecialTokens(stripMinimaxToolCallXml(text))),
).trim(),
sanitizeText: (text) => sanitizeAssistantText(text),
joinWith: "\n",
normalizeText: (text) => text.trim(),
}) ?? "";
@ -247,8 +355,7 @@ export function extractAssistantText(msg: AssistantMessage): string {
// Otherwise normal prose that *mentions* errors (e.g. "context overflow") can get clobbered.
// Gate on stopReason only — a non-error response with an errorMessage set (e.g. from a
// background tool failure) should not have its content rewritten (#13935).
const errorContext = msg.stopReason === "error";
return sanitizeUserFacingText(extracted, { errorContext });
return finalizeAssistantExtraction(msg, extracted);
}
export function extractAssistantThinking(msg: AssistantMessage): string {