diff --git a/src/agents/pi-embedded-subscribe.handlers.messages.ts b/src/agents/pi-embedded-subscribe.handlers.messages.ts index ce7fc0aa832..aa51437cf1d 100644 --- a/src/agents/pi-embedded-subscribe.handlers.messages.ts +++ b/src/agents/pi-embedded-subscribe.handlers.messages.ts @@ -524,6 +524,14 @@ export function handleMessageEnd( emitSplitResultAsBlockReply(ctx.consumeReplyDirectives("", { final: true })); } + if ( + !ctx.params.silentExpected && + ctx.state.blockReplyBreak === "message_end" && + ctx.params.onBlockReplyFlush + ) { + void ctx.params.onBlockReplyFlush(); + } + ctx.state.deltaBuffer = ""; ctx.state.blockBuffer = ""; ctx.blockChunker?.reset(); diff --git a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.calls-onblockreplyflush-before-tool-execution-start-preserve.test.ts b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.calls-onblockreplyflush-before-tool-execution-start-preserve.test.ts index e52ea881f95..0cdab75d804 100644 --- a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.calls-onblockreplyflush-before-tool-execution-start-preserve.test.ts +++ b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.calls-onblockreplyflush-before-tool-execution-start-preserve.test.ts @@ -50,7 +50,7 @@ describe("subscribeEmbeddedPiSession", () => { expect(onBlockReplyFlush).toHaveBeenCalledTimes(2); }); - it("flushes buffered block chunks before tool execution", () => { + it("flushes buffered block chunks before tool execution", async () => { const { session, emit } = createStubSessionHarness(); const onBlockReply = vi.fn(); @@ -80,12 +80,45 @@ describe("subscribeEmbeddedPiSession", () => { toolCallId: "tool-flush-buffer-1", args: { command: "echo flush" }, }); + await Promise.resolve(); expect(onBlockReply).toHaveBeenCalledTimes(1); expect(onBlockReply.mock.calls[0]?.[0]?.text).toBe("Short chunk."); expect(onBlockReplyFlush).toHaveBeenCalledTimes(1); - expect(onBlockReply.mock.invocationCallOrder[0]).toBeLessThan( - onBlockReplyFlush.mock.invocationCallOrder[0], - ); + }); + + it("calls onBlockReplyFlush at message_end for message-boundary turns", async () => { + const { session, emit } = createStubSessionHarness(); + + const onBlockReply = vi.fn(); + const onBlockReplyFlush = vi.fn(); + + subscribeEmbeddedPiSession({ + session: session as unknown as Parameters[0]["session"], + runId: "run-message-end-flush", + onBlockReply, + onBlockReplyFlush, + blockReplyBreak: "message_end", + }); + + emit({ + type: "message_start", + message: { role: "assistant" }, + }); + emitAssistantTextDelta({ emit, delta: "Final reply before lifecycle end." }); + expect(onBlockReplyFlush).not.toHaveBeenCalled(); + + emit({ + type: "message_end", + message: { + role: "assistant", + content: [{ type: "text", text: "Final reply before lifecycle end." }], + }, + }); + await Promise.resolve(); + + expect(onBlockReply).toHaveBeenCalledTimes(1); + expect(onBlockReply.mock.calls[0]?.[0]?.text).toBe("Final reply before lifecycle end."); + expect(onBlockReplyFlush).toHaveBeenCalledTimes(1); }); });