From 7c76acafd6c9835226f0b582ba86c50f0a1d20b3 Mon Sep 17 00:00:00 2001 From: Pejman Pour-Moezzi Date: Tue, 10 Mar 2026 14:37:21 -0700 Subject: [PATCH] fix(acp): scope cancellation and event routing by runId (#41331) --- CHANGELOG.md | 1 + src/acp/translator.cancel-scoping.test.ts | 274 ++++++++++++++++++++++ src/acp/translator.ts | 33 ++- 3 files changed, 298 insertions(+), 10 deletions(-) create mode 100644 src/acp/translator.cancel-scoping.test.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 60df48c6357..6b435ce2a80 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -144,6 +144,7 @@ Docs: https://docs.openclaw.ai - Skills/download installs: pin the validated per-skill tools root before writing downloaded archives, so rebinding the lexical tools path cannot redirect download writes outside the intended tools directory. Thanks @tdjackey. - Control UI/Debug: replace the Manual RPC free-text method field with a sorted dropdown sourced from gateway-advertised methods, and stack the form vertically for narrower layouts. (#14967) thanks @rixau. - Auth/profile resolution: log debug details when auto-discovered auth profiles fail during provider API-key resolution, so `--debug` output surfaces the real refresh/keychain/credential-store failure instead of only the generic missing-key message. (#41271) thanks @he-yufeng. +- ACP/cancel scoping: scope `chat.abort` and shared-session ACP event routing by `runId` so one session cannot cancel or consume another session's run when they share the same gateway session key. (#41331) Thanks @pejmanjohn. ## 2026.3.7 diff --git a/src/acp/translator.cancel-scoping.test.ts b/src/acp/translator.cancel-scoping.test.ts new file mode 100644 index 00000000000..c84832369a0 --- /dev/null +++ b/src/acp/translator.cancel-scoping.test.ts @@ -0,0 +1,274 @@ +import type { CancelNotification, PromptRequest, PromptResponse } from "@agentclientprotocol/sdk"; +import { describe, expect, it, vi } from "vitest"; +import type { GatewayClient } from "../gateway/client.js"; +import type { EventFrame } from "../gateway/protocol/index.js"; +import { createInMemorySessionStore } from "./session.js"; +import { AcpGatewayAgent } from "./translator.js"; +import { createAcpConnection, createAcpGateway } from "./translator.test-helpers.js"; + +type Harness = { + agent: AcpGatewayAgent; + requestSpy: ReturnType; + sessionUpdateSpy: ReturnType; + sessionStore: ReturnType; + sentRunIds: string[]; +}; + +function createPromptRequest(sessionId: string): PromptRequest { + return { + sessionId, + prompt: [{ type: "text", text: "hello" }], + _meta: {}, + } as unknown as PromptRequest; +} + +function createChatEvent(payload: Record): EventFrame { + return { + type: "event", + event: "chat", + payload, + } as EventFrame; +} + +function createToolEvent(payload: Record): EventFrame { + return { + type: "event", + event: "agent", + payload, + } as EventFrame; +} + +function createHarness(sessions: Array<{ sessionId: string; sessionKey: string }>): Harness { + const sentRunIds: string[] = []; + const requestSpy = vi.fn(async (method: string, params?: Record) => { + if (method === "chat.send") { + const runId = params?.idempotencyKey; + if (typeof runId === "string") { + sentRunIds.push(runId); + } + return new Promise(() => {}); + } + return {}; + }); + const connection = createAcpConnection(); + const sessionStore = createInMemorySessionStore(); + for (const session of sessions) { + sessionStore.createSession({ + sessionId: session.sessionId, + sessionKey: session.sessionKey, + cwd: "/tmp", + }); + } + + const agent = new AcpGatewayAgent( + connection, + createAcpGateway(requestSpy as unknown as GatewayClient["request"]), + { sessionStore }, + ); + + return { + agent, + requestSpy, + // eslint-disable-next-line @typescript-eslint/unbound-method + sessionUpdateSpy: connection.sessionUpdate as unknown as ReturnType, + sessionStore, + sentRunIds, + }; +} + +async function startPendingPrompt( + harness: Harness, + sessionId: string, +): Promise<{ promptPromise: Promise; runId: string }> { + const before = harness.sentRunIds.length; + const promptPromise = harness.agent.prompt(createPromptRequest(sessionId)); + await vi.waitFor(() => { + expect(harness.sentRunIds.length).toBe(before + 1); + }); + return { + promptPromise, + runId: harness.sentRunIds[before], + }; +} + +describe("acp translator cancel and run scoping", () => { + it("cancel passes active runId to chat.abort", async () => { + const sessionKey = "agent:main:shared"; + const harness = createHarness([{ sessionId: "session-1", sessionKey }]); + const pending = await startPendingPrompt(harness, "session-1"); + + await harness.agent.cancel({ sessionId: "session-1" } as CancelNotification); + + expect(harness.requestSpy).toHaveBeenCalledWith("chat.abort", { + sessionKey, + runId: pending.runId, + }); + await expect(pending.promptPromise).resolves.toEqual({ stopReason: "cancelled" }); + }); + + it("cancel uses pending runId when there is no active run", async () => { + const sessionKey = "agent:main:shared"; + const harness = createHarness([{ sessionId: "session-1", sessionKey }]); + const pending = await startPendingPrompt(harness, "session-1"); + harness.sessionStore.clearActiveRun("session-1"); + + await harness.agent.cancel({ sessionId: "session-1" } as CancelNotification); + + expect(harness.requestSpy).toHaveBeenCalledWith("chat.abort", { + sessionKey, + runId: pending.runId, + }); + await expect(pending.promptPromise).resolves.toEqual({ stopReason: "cancelled" }); + }); + + it("cancel skips chat.abort when there is no active run and no pending prompt", async () => { + const sessionKey = "agent:main:shared"; + const harness = createHarness([{ sessionId: "session-1", sessionKey }]); + + await harness.agent.cancel({ sessionId: "session-1" } as CancelNotification); + + const abortCalls = harness.requestSpy.mock.calls.filter(([method]) => method === "chat.abort"); + expect(abortCalls).toHaveLength(0); + }); + + it("cancel from a session without active run does not abort another session sharing the same key", async () => { + const sessionKey = "agent:main:shared"; + const harness = createHarness([ + { sessionId: "session-1", sessionKey }, + { sessionId: "session-2", sessionKey }, + ]); + const pending2 = await startPendingPrompt(harness, "session-2"); + + await harness.agent.cancel({ sessionId: "session-1" } as CancelNotification); + + const abortCalls = harness.requestSpy.mock.calls.filter(([method]) => method === "chat.abort"); + expect(abortCalls).toHaveLength(0); + expect(harness.sessionStore.getSession("session-2")?.activeRunId).toBe(pending2.runId); + + await harness.agent.handleGatewayEvent( + createChatEvent({ + runId: pending2.runId, + sessionKey, + seq: 1, + state: "final", + }), + ); + await expect(pending2.promptPromise).resolves.toEqual({ stopReason: "end_turn" }); + }); + + it("drops chat events when runId does not match the active prompt", async () => { + const sessionKey = "agent:main:shared"; + const harness = createHarness([{ sessionId: "session-1", sessionKey }]); + const pending = await startPendingPrompt(harness, "session-1"); + + await harness.agent.handleGatewayEvent( + createChatEvent({ + runId: "run-other", + sessionKey, + seq: 1, + state: "final", + }), + ); + expect(harness.sessionStore.getSession("session-1")?.activeRunId).toBe(pending.runId); + + await harness.agent.handleGatewayEvent( + createChatEvent({ + runId: pending.runId, + sessionKey, + seq: 2, + state: "final", + }), + ); + await expect(pending.promptPromise).resolves.toEqual({ stopReason: "end_turn" }); + }); + + it("drops tool events when runId does not match the active prompt", async () => { + const sessionKey = "agent:main:shared"; + const harness = createHarness([{ sessionId: "session-1", sessionKey }]); + const pending = await startPendingPrompt(harness, "session-1"); + harness.sessionUpdateSpy.mockClear(); + + await harness.agent.handleGatewayEvent( + createToolEvent({ + runId: "run-other", + sessionKey, + stream: "tool", + data: { + phase: "start", + name: "read_file", + toolCallId: "tool-1", + args: { path: "README.md" }, + }, + }), + ); + + expect(harness.sessionUpdateSpy).not.toHaveBeenCalled(); + + await harness.agent.handleGatewayEvent( + createChatEvent({ + runId: pending.runId, + sessionKey, + seq: 1, + state: "final", + }), + ); + await expect(pending.promptPromise).resolves.toEqual({ stopReason: "end_turn" }); + }); + + it("routes events to the pending prompt that matches runId when session keys are shared", async () => { + const sessionKey = "agent:main:shared"; + const harness = createHarness([ + { sessionId: "session-1", sessionKey }, + { sessionId: "session-2", sessionKey }, + ]); + const pending1 = await startPendingPrompt(harness, "session-1"); + const pending2 = await startPendingPrompt(harness, "session-2"); + harness.sessionUpdateSpy.mockClear(); + + await harness.agent.handleGatewayEvent( + createToolEvent({ + runId: pending2.runId, + sessionKey, + stream: "tool", + data: { + phase: "start", + name: "read_file", + toolCallId: "tool-2", + args: { path: "notes.txt" }, + }, + }), + ); + expect(harness.sessionUpdateSpy).toHaveBeenCalledWith( + expect.objectContaining({ + sessionId: "session-2", + update: expect.objectContaining({ + sessionUpdate: "tool_call", + toolCallId: "tool-2", + status: "in_progress", + }), + }), + ); + expect(harness.sessionUpdateSpy).toHaveBeenCalledTimes(1); + + await harness.agent.handleGatewayEvent( + createChatEvent({ + runId: pending2.runId, + sessionKey, + seq: 1, + state: "final", + }), + ); + await expect(pending2.promptPromise).resolves.toEqual({ stopReason: "end_turn" }); + expect(harness.sessionStore.getSession("session-1")?.activeRunId).toBe(pending1.runId); + + await harness.agent.handleGatewayEvent( + createChatEvent({ + runId: pending1.runId, + sessionKey, + seq: 2, + state: "final", + }), + ); + await expect(pending1.promptPromise).resolves.toEqual({ stopReason: "end_turn" }); + }); +}); diff --git a/src/acp/translator.ts b/src/acp/translator.ts index 667c075e9c0..585f97c8f43 100644 --- a/src/acp/translator.ts +++ b/src/acp/translator.ts @@ -633,14 +633,25 @@ export class AcpGatewayAgent implements Agent { if (!session) { return; } + // Capture runId before cancelActiveRun clears session.activeRunId. + const activeRunId = session.activeRunId; + this.sessionStore.cancelActiveRun(params.sessionId); + const pending = this.pendingPrompts.get(params.sessionId); + const scopedRunId = activeRunId ?? pending?.idempotencyKey; + if (!scopedRunId) { + return; + } + try { - await this.gateway.request("chat.abort", { sessionKey: session.sessionKey }); + await this.gateway.request("chat.abort", { + sessionKey: session.sessionKey, + runId: scopedRunId, + }); } catch (err) { this.log(`cancel error: ${String(err)}`); } - const pending = this.pendingPrompts.get(params.sessionId); if (pending) { this.pendingPrompts.delete(params.sessionId); pending.resolve({ stopReason: "cancelled" }); @@ -672,6 +683,7 @@ export class AcpGatewayAgent implements Agent { return; } const stream = payload.stream as string | undefined; + const runId = payload.runId as string | undefined; const data = payload.data as Record | undefined; const sessionKey = payload.sessionKey as string | undefined; if (!stream || !data || !sessionKey) { @@ -688,7 +700,7 @@ export class AcpGatewayAgent implements Agent { return; } - const pending = this.findPendingBySessionKey(sessionKey); + const pending = this.findPendingBySessionKey(sessionKey, runId); if (!pending) { return; } @@ -774,13 +786,10 @@ export class AcpGatewayAgent implements Agent { return; } - const pending = this.findPendingBySessionKey(sessionKey); + const pending = this.findPendingBySessionKey(sessionKey, runId); if (!pending) { return; } - if (runId && pending.idempotencyKey !== runId) { - return; - } if (state === "delta" && messageData) { await this.handleDeltaEvent(pending.sessionId, messageData); @@ -853,11 +862,15 @@ export class AcpGatewayAgent implements Agent { pending.resolve({ stopReason }); } - private findPendingBySessionKey(sessionKey: string): PendingPrompt | undefined { + private findPendingBySessionKey(sessionKey: string, runId?: string): PendingPrompt | undefined { for (const pending of this.pendingPrompts.values()) { - if (pending.sessionKey === sessionKey) { - return pending; + if (pending.sessionKey !== sessionKey) { + continue; } + if (runId && pending.idempotencyKey !== runId) { + continue; + } + return pending; } return undefined; }