diff --git a/CHANGELOG.md b/CHANGELOG.md index b1d3844d475..70dea246fc5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,7 @@ Docs: https://docs.openclaw.ai - Exec approvals: route Slack, Discord, and Telegram approvals through the shared channel approval-capability path so native approval auth, delivery, and `/approve` handling stay aligned across channels while preserving Telegram session-key agent filtering. (#58634) thanks @gumadeiras - Matrix/runtime: resolve the verification/bootstrap runtime from a distinct packaged Matrix entry so global npm installs stop failing on crypto bootstrap with missing-module or recursive runtime alias errors. (#59249) Thanks @gumadeiras. - Matrix/streaming: preserve ordered block flushes before tool, message, and agent boundaries, add explicit `channels.matrix.blockStreaming` opt-in so Matrix `streaming: "off"` stays final-only by default, and move MiniMax plain-text final handling into the MiniMax provider runtime instead of the shared core heuristic. (#59266) thanks @gumadeiras +- Agents/compaction: resolve compaction wait before final reply/channel flush completion so slow end-of-run delivery drains no longer delay compaction completion. (#59308) thanks @gumadeiras ## 2026.4.2 diff --git a/src/agents/pi-embedded-subscribe.handlers.lifecycle.test.ts b/src/agents/pi-embedded-subscribe.handlers.lifecycle.test.ts index b986c559763..451cec36526 100644 --- a/src/agents/pi-embedded-subscribe.handlers.lifecycle.test.ts +++ b/src/agents/pi-embedded-subscribe.handlers.lifecycle.test.ts @@ -9,7 +9,10 @@ vi.mock("../infra/agent-events.js", () => ({ function createContext( lastAssistant: unknown, - overrides?: { onAgentEvent?: (event: unknown) => void }, + overrides?: { + onAgentEvent?: (event: unknown) => void; + onBlockReplyFlush?: () => void | Promise; + }, ): EmbeddedPiSubscribeContext { const onBlockReply = vi.fn(); return { @@ -19,6 +22,7 @@ function createContext( sessionKey: "agent:main:main", onAgentEvent: overrides?.onAgentEvent, onBlockReply, + onBlockReplyFlush: overrides?.onBlockReplyFlush, }, state: { lastAssistant: lastAssistant as EmbeddedPiSubscribeContext["state"]["lastAssistant"], @@ -179,4 +183,45 @@ describe("handleAgentEnd", () => { expect(ctx.state.pendingToolMediaUrls).toEqual([]); expect(ctx.state.pendingToolAudioAsVoice).toBe(false); }); + + it("resolves compaction wait before awaiting an async block reply flush", async () => { + let resolveFlush: (() => void) | undefined; + const ctx = createContext(undefined); + ctx.flushBlockReplyBuffer = vi + .fn() + .mockImplementationOnce( + () => + new Promise((resolve) => { + resolveFlush = resolve; + }), + ) + .mockImplementation(() => {}); + + const endPromise = handleAgentEnd(ctx); + + expect(ctx.maybeResolveCompactionWait).toHaveBeenCalledTimes(1); + expect(ctx.resolveCompactionRetry).not.toHaveBeenCalled(); + + resolveFlush?.(); + await endPromise; + }); + + it("resolves compaction wait before awaiting an async channel flush", async () => { + let resolveChannelFlush: (() => void) | undefined; + const onBlockReplyFlush = vi.fn( + () => + new Promise((resolve) => { + resolveChannelFlush = resolve; + }), + ); + const ctx = createContext(undefined, { onBlockReplyFlush }); + + const endPromise = handleAgentEnd(ctx); + + expect(ctx.maybeResolveCompactionWait).toHaveBeenCalledTimes(1); + expect(onBlockReplyFlush).toHaveBeenCalledTimes(1); + + resolveChannelFlush?.(); + await endPromise; + }); }); diff --git a/src/agents/pi-embedded-subscribe.handlers.lifecycle.ts b/src/agents/pi-embedded-subscribe.handlers.lifecycle.ts index 2b16c996bff..658b2215165 100644 --- a/src/agents/pi-embedded-subscribe.handlers.lifecycle.ts +++ b/src/agents/pi-embedded-subscribe.handlers.lifecycle.ts @@ -136,20 +136,13 @@ export function handleAgentEnd(ctx: EmbeddedPiSubscribeContext) { }; const flushBlockReplyBufferResult = ctx.flushBlockReplyBuffer(); + finalizeAgentEnd(); if (isPromiseLike(flushBlockReplyBufferResult)) { - return flushBlockReplyBufferResult - .then(() => flushPendingMediaAndChannel()) - .finally(() => { - finalizeAgentEnd(); - }); + return flushBlockReplyBufferResult.then(() => flushPendingMediaAndChannel()); } const flushPendingMediaAndChannelResult = flushPendingMediaAndChannel(); if (isPromiseLike(flushPendingMediaAndChannelResult)) { - return flushPendingMediaAndChannelResult.finally(() => { - finalizeAgentEnd(); - }); + return flushPendingMediaAndChannelResult; } - - finalizeAgentEnd(); }