From 17c954c46e116dc271db70c418b3c963b4b9bfd9 Mon Sep 17 00:00:00 2001 From: scoootscooob <167050519+scoootscooob@users.noreply.github.com> Date: Thu, 12 Mar 2026 20:23:57 -0700 Subject: [PATCH] fix(acp): preserve final assistant message snapshot before end_turn (#44597) Process messageData via handleDeltaEvent for both delta and final states before resolving the turn, so ACP clients no longer drop the last visible assistant text when the gateway sends the final message body on the terminal chat event. Closes #15377 Based on #17615 Co-authored-by: PJ Eby <3527052+pjeby@users.noreply.github.com> --- CHANGELOG.md | 1 + src/acp/translator.session-rate-limit.test.ts | 141 ++++++++++++++++++ src/acp/translator.ts | 10 +- 3 files changed, 150 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 34b250f2f75..a2175fcf9fa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -662,6 +662,7 @@ Docs: https://docs.openclaw.ai - Memory flush/bootstrap file protection: restrict memory-flush runs to append-only `read`/`write` tools and route host-side memory appends through root-enforced safe file handles so flush turns cannot overwrite bootstrap files via `exec` or unsafe raw rewrites. (#38574) Thanks @frankekn. - Mattermost/DM media uploads: resolve bare 26-character Mattermost IDs user-first for direct messages so media sends no longer fail with `403 Forbidden` when targets are configured as unprefixed user IDs. (#29925) Thanks @teconomix. - Voice-call/OpenAI TTS config parity: add missing `speed`, `instructions`, and `baseUrl` fields to the OpenAI TTS config schema and gate `instructions` to supported models so voice-call overrides validate and route cleanly through core TTS. (#39226) Thanks @ademczuk. +- ACP/client final-message delivery: preserve terminal assistant text snapshots before resolving `end_turn`, so ACP clients no longer drop the last visible reply when the gateway sends the final message body on the terminal chat event. (#17615) Thanks @pjeby. ## 2026.3.2 diff --git a/src/acp/translator.session-rate-limit.test.ts b/src/acp/translator.session-rate-limit.test.ts index 554dc87e2b8..3e3f254d0ee 100644 --- a/src/acp/translator.session-rate-limit.test.ts +++ b/src/acp/translator.session-rate-limit.test.ts @@ -1020,3 +1020,144 @@ describe("acp prompt size hardening", () => { }); }); }); + +describe("acp final chat snapshots", () => { + async function createSnapshotHarness() { + const sessionStore = createInMemorySessionStore(); + const connection = createAcpConnection(); + const sessionUpdate = connection.__sessionUpdateMock; + const request = vi.fn(async (method: string) => { + if (method === "chat.send") { + return new Promise(() => {}); + } + return { ok: true }; + }) as GatewayClient["request"]; + const agent = new AcpGatewayAgent(connection, createAcpGateway(request), { + sessionStore, + }); + await agent.loadSession(createLoadSessionRequest("snapshot-session")); + sessionUpdate.mockClear(); + const promptPromise = agent.prompt(createPromptRequest("snapshot-session", "hello")); + const runId = sessionStore.getSession("snapshot-session")?.activeRunId; + if (!runId) { + throw new Error("Expected ACP prompt run to be active"); + } + return { agent, sessionUpdate, promptPromise, runId, sessionStore }; + } + + it("emits final snapshot text before resolving end_turn", async () => { + const { agent, sessionUpdate, promptPromise, runId, sessionStore } = + await createSnapshotHarness(); + + await agent.handleGatewayEvent({ + event: "chat", + payload: { + sessionKey: "snapshot-session", + runId, + state: "final", + stopReason: "end_turn", + message: { + content: [{ type: "text", text: "FINAL TEXT SHOULD BE EMITTED" }], + }, + }, + } as unknown as EventFrame); + + await expect(promptPromise).resolves.toEqual({ stopReason: "end_turn" }); + expect(sessionUpdate).toHaveBeenCalledWith({ + sessionId: "snapshot-session", + update: { + sessionUpdate: "agent_message_chunk", + content: { type: "text", text: "FINAL TEXT SHOULD BE EMITTED" }, + }, + }); + expect(sessionStore.getSession("snapshot-session")?.activeRunId).toBeNull(); + sessionStore.clearAllSessionsForTest(); + }); + + it("does not duplicate text when final repeats the last delta snapshot", async () => { + const { agent, sessionUpdate, promptPromise, runId, sessionStore } = + await createSnapshotHarness(); + + await agent.handleGatewayEvent({ + event: "chat", + payload: { + sessionKey: "snapshot-session", + runId, + state: "delta", + message: { + content: [{ type: "text", text: "Hello world" }], + }, + }, + } as unknown as EventFrame); + + await agent.handleGatewayEvent({ + event: "chat", + payload: { + sessionKey: "snapshot-session", + runId, + state: "final", + stopReason: "end_turn", + message: { + content: [{ type: "text", text: "Hello world" }], + }, + }, + } as unknown as EventFrame); + + await expect(promptPromise).resolves.toEqual({ stopReason: "end_turn" }); + const chunks = sessionUpdate.mock.calls.filter( + (call: unknown[]) => + (call[0] as Record)?.update && + (call[0] as Record>).update?.sessionUpdate === + "agent_message_chunk", + ); + expect(chunks).toHaveLength(1); + sessionStore.clearAllSessionsForTest(); + }); + + it("emits only the missing tail when the final snapshot extends prior deltas", async () => { + const { agent, sessionUpdate, promptPromise, runId, sessionStore } = + await createSnapshotHarness(); + + await agent.handleGatewayEvent({ + event: "chat", + payload: { + sessionKey: "snapshot-session", + runId, + state: "delta", + message: { + content: [{ type: "text", text: "Hello" }], + }, + }, + } as unknown as EventFrame); + + await agent.handleGatewayEvent({ + event: "chat", + payload: { + sessionKey: "snapshot-session", + runId, + state: "final", + stopReason: "max_tokens", + message: { + content: [{ type: "text", text: "Hello world" }], + }, + }, + } as unknown as EventFrame); + + await expect(promptPromise).resolves.toEqual({ stopReason: "max_tokens" }); + expect(sessionUpdate).toHaveBeenCalledWith({ + sessionId: "snapshot-session", + update: { + sessionUpdate: "agent_message_chunk", + content: { type: "text", text: "Hello" }, + }, + }); + expect(sessionUpdate).toHaveBeenCalledWith({ + sessionId: "snapshot-session", + update: { + sessionUpdate: "agent_message_chunk", + content: { type: "text", text: " world" }, + }, + }); + sessionStore.clearAllSessionsForTest(); + }); +}); diff --git a/src/acp/translator.ts b/src/acp/translator.ts index b5a6802d07b..8ab1f821fc8 100644 --- a/src/acp/translator.ts +++ b/src/acp/translator.ts @@ -800,9 +800,15 @@ export class AcpGatewayAgent implements Agent { return; } - if (state === "delta" && messageData) { + const shouldHandleMessageSnapshot = messageData && (state === "delta" || state === "final"); + if (shouldHandleMessageSnapshot) { + // Gateway chat events can carry the latest full assistant snapshot on both + // incremental updates and the terminal final event. Process the snapshot + // first so ACP clients never drop the last visible assistant text. await this.handleDeltaEvent(pending.sessionId, messageData); - return; + if (state === "delta") { + return; + } } if (state === "final") {