fix(pi): flush message-boundary block replies on message end

This commit is contained in:
Vincent Koc 2026-04-01 05:42:59 +09:00
parent 0a7024e209
commit f425ea06bf
2 changed files with 45 additions and 4 deletions

View File

@ -524,6 +524,14 @@ export function handleMessageEnd(
emitSplitResultAsBlockReply(ctx.consumeReplyDirectives("", { final: true }));
}
if (
!ctx.params.silentExpected &&
ctx.state.blockReplyBreak === "message_end" &&
ctx.params.onBlockReplyFlush
) {
void ctx.params.onBlockReplyFlush();
}
ctx.state.deltaBuffer = "";
ctx.state.blockBuffer = "";
ctx.blockChunker?.reset();

View File

@ -50,7 +50,7 @@ describe("subscribeEmbeddedPiSession", () => {
expect(onBlockReplyFlush).toHaveBeenCalledTimes(2);
});
it("flushes buffered block chunks before tool execution", () => {
it("flushes buffered block chunks before tool execution", async () => {
const { session, emit } = createStubSessionHarness();
const onBlockReply = vi.fn();
@ -80,12 +80,45 @@ describe("subscribeEmbeddedPiSession", () => {
toolCallId: "tool-flush-buffer-1",
args: { command: "echo flush" },
});
await Promise.resolve();
expect(onBlockReply).toHaveBeenCalledTimes(1);
expect(onBlockReply.mock.calls[0]?.[0]?.text).toBe("Short chunk.");
expect(onBlockReplyFlush).toHaveBeenCalledTimes(1);
expect(onBlockReply.mock.invocationCallOrder[0]).toBeLessThan(
onBlockReplyFlush.mock.invocationCallOrder[0],
);
});
it("calls onBlockReplyFlush at message_end for message-boundary turns", async () => {
const { session, emit } = createStubSessionHarness();
const onBlockReply = vi.fn();
const onBlockReplyFlush = vi.fn();
subscribeEmbeddedPiSession({
session: session as unknown as Parameters<typeof subscribeEmbeddedPiSession>[0]["session"],
runId: "run-message-end-flush",
onBlockReply,
onBlockReplyFlush,
blockReplyBreak: "message_end",
});
emit({
type: "message_start",
message: { role: "assistant" },
});
emitAssistantTextDelta({ emit, delta: "Final reply before lifecycle end." });
expect(onBlockReplyFlush).not.toHaveBeenCalled();
emit({
type: "message_end",
message: {
role: "assistant",
content: [{ type: "text", text: "Final reply before lifecycle end." }],
},
});
await Promise.resolve();
expect(onBlockReply).toHaveBeenCalledTimes(1);
expect(onBlockReply.mock.calls[0]?.[0]?.text).toBe("Final reply before lifecycle end.");
expect(onBlockReplyFlush).toHaveBeenCalledTimes(1);
});
});