fix(acp): scope cancellation and event routing by runId (#41331)

This commit is contained in:
Pejman Pour-Moezzi 2026-03-10 14:37:21 -07:00 committed by GitHub
parent c00117aff2
commit 7c76acafd6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 298 additions and 10 deletions

View File

@ -144,6 +144,7 @@ Docs: https://docs.openclaw.ai
- Skills/download installs: pin the validated per-skill tools root before writing downloaded archives, so rebinding the lexical tools path cannot redirect download writes outside the intended tools directory. Thanks @tdjackey.
- Control UI/Debug: replace the Manual RPC free-text method field with a sorted dropdown sourced from gateway-advertised methods, and stack the form vertically for narrower layouts. (#14967) thanks @rixau.
- Auth/profile resolution: log debug details when auto-discovered auth profiles fail during provider API-key resolution, so `--debug` output surfaces the real refresh/keychain/credential-store failure instead of only the generic missing-key message. (#41271) thanks @he-yufeng.
- ACP/cancel scoping: scope `chat.abort` and shared-session ACP event routing by `runId` so one session cannot cancel or consume another session's run when they share the same gateway session key. (#41331) Thanks @pejmanjohn.
## 2026.3.7

View File

@ -0,0 +1,274 @@
import type { CancelNotification, PromptRequest, PromptResponse } from "@agentclientprotocol/sdk";
import { describe, expect, it, vi } from "vitest";
import type { GatewayClient } from "../gateway/client.js";
import type { EventFrame } from "../gateway/protocol/index.js";
import { createInMemorySessionStore } from "./session.js";
import { AcpGatewayAgent } from "./translator.js";
import { createAcpConnection, createAcpGateway } from "./translator.test-helpers.js";
type Harness = {
agent: AcpGatewayAgent;
requestSpy: ReturnType<typeof vi.fn>;
sessionUpdateSpy: ReturnType<typeof vi.fn>;
sessionStore: ReturnType<typeof createInMemorySessionStore>;
sentRunIds: string[];
};
function createPromptRequest(sessionId: string): PromptRequest {
return {
sessionId,
prompt: [{ type: "text", text: "hello" }],
_meta: {},
} as unknown as PromptRequest;
}
function createChatEvent(payload: Record<string, unknown>): EventFrame {
return {
type: "event",
event: "chat",
payload,
} as EventFrame;
}
function createToolEvent(payload: Record<string, unknown>): EventFrame {
return {
type: "event",
event: "agent",
payload,
} as EventFrame;
}
function createHarness(sessions: Array<{ sessionId: string; sessionKey: string }>): Harness {
const sentRunIds: string[] = [];
const requestSpy = vi.fn(async (method: string, params?: Record<string, unknown>) => {
if (method === "chat.send") {
const runId = params?.idempotencyKey;
if (typeof runId === "string") {
sentRunIds.push(runId);
}
return new Promise<never>(() => {});
}
return {};
});
const connection = createAcpConnection();
const sessionStore = createInMemorySessionStore();
for (const session of sessions) {
sessionStore.createSession({
sessionId: session.sessionId,
sessionKey: session.sessionKey,
cwd: "/tmp",
});
}
const agent = new AcpGatewayAgent(
connection,
createAcpGateway(requestSpy as unknown as GatewayClient["request"]),
{ sessionStore },
);
return {
agent,
requestSpy,
// eslint-disable-next-line @typescript-eslint/unbound-method
sessionUpdateSpy: connection.sessionUpdate as unknown as ReturnType<typeof vi.fn>,
sessionStore,
sentRunIds,
};
}
async function startPendingPrompt(
harness: Harness,
sessionId: string,
): Promise<{ promptPromise: Promise<PromptResponse>; runId: string }> {
const before = harness.sentRunIds.length;
const promptPromise = harness.agent.prompt(createPromptRequest(sessionId));
await vi.waitFor(() => {
expect(harness.sentRunIds.length).toBe(before + 1);
});
return {
promptPromise,
runId: harness.sentRunIds[before],
};
}
describe("acp translator cancel and run scoping", () => {
it("cancel passes active runId to chat.abort", async () => {
const sessionKey = "agent:main:shared";
const harness = createHarness([{ sessionId: "session-1", sessionKey }]);
const pending = await startPendingPrompt(harness, "session-1");
await harness.agent.cancel({ sessionId: "session-1" } as CancelNotification);
expect(harness.requestSpy).toHaveBeenCalledWith("chat.abort", {
sessionKey,
runId: pending.runId,
});
await expect(pending.promptPromise).resolves.toEqual({ stopReason: "cancelled" });
});
it("cancel uses pending runId when there is no active run", async () => {
const sessionKey = "agent:main:shared";
const harness = createHarness([{ sessionId: "session-1", sessionKey }]);
const pending = await startPendingPrompt(harness, "session-1");
harness.sessionStore.clearActiveRun("session-1");
await harness.agent.cancel({ sessionId: "session-1" } as CancelNotification);
expect(harness.requestSpy).toHaveBeenCalledWith("chat.abort", {
sessionKey,
runId: pending.runId,
});
await expect(pending.promptPromise).resolves.toEqual({ stopReason: "cancelled" });
});
it("cancel skips chat.abort when there is no active run and no pending prompt", async () => {
const sessionKey = "agent:main:shared";
const harness = createHarness([{ sessionId: "session-1", sessionKey }]);
await harness.agent.cancel({ sessionId: "session-1" } as CancelNotification);
const abortCalls = harness.requestSpy.mock.calls.filter(([method]) => method === "chat.abort");
expect(abortCalls).toHaveLength(0);
});
it("cancel from a session without active run does not abort another session sharing the same key", async () => {
const sessionKey = "agent:main:shared";
const harness = createHarness([
{ sessionId: "session-1", sessionKey },
{ sessionId: "session-2", sessionKey },
]);
const pending2 = await startPendingPrompt(harness, "session-2");
await harness.agent.cancel({ sessionId: "session-1" } as CancelNotification);
const abortCalls = harness.requestSpy.mock.calls.filter(([method]) => method === "chat.abort");
expect(abortCalls).toHaveLength(0);
expect(harness.sessionStore.getSession("session-2")?.activeRunId).toBe(pending2.runId);
await harness.agent.handleGatewayEvent(
createChatEvent({
runId: pending2.runId,
sessionKey,
seq: 1,
state: "final",
}),
);
await expect(pending2.promptPromise).resolves.toEqual({ stopReason: "end_turn" });
});
it("drops chat events when runId does not match the active prompt", async () => {
const sessionKey = "agent:main:shared";
const harness = createHarness([{ sessionId: "session-1", sessionKey }]);
const pending = await startPendingPrompt(harness, "session-1");
await harness.agent.handleGatewayEvent(
createChatEvent({
runId: "run-other",
sessionKey,
seq: 1,
state: "final",
}),
);
expect(harness.sessionStore.getSession("session-1")?.activeRunId).toBe(pending.runId);
await harness.agent.handleGatewayEvent(
createChatEvent({
runId: pending.runId,
sessionKey,
seq: 2,
state: "final",
}),
);
await expect(pending.promptPromise).resolves.toEqual({ stopReason: "end_turn" });
});
it("drops tool events when runId does not match the active prompt", async () => {
const sessionKey = "agent:main:shared";
const harness = createHarness([{ sessionId: "session-1", sessionKey }]);
const pending = await startPendingPrompt(harness, "session-1");
harness.sessionUpdateSpy.mockClear();
await harness.agent.handleGatewayEvent(
createToolEvent({
runId: "run-other",
sessionKey,
stream: "tool",
data: {
phase: "start",
name: "read_file",
toolCallId: "tool-1",
args: { path: "README.md" },
},
}),
);
expect(harness.sessionUpdateSpy).not.toHaveBeenCalled();
await harness.agent.handleGatewayEvent(
createChatEvent({
runId: pending.runId,
sessionKey,
seq: 1,
state: "final",
}),
);
await expect(pending.promptPromise).resolves.toEqual({ stopReason: "end_turn" });
});
it("routes events to the pending prompt that matches runId when session keys are shared", async () => {
const sessionKey = "agent:main:shared";
const harness = createHarness([
{ sessionId: "session-1", sessionKey },
{ sessionId: "session-2", sessionKey },
]);
const pending1 = await startPendingPrompt(harness, "session-1");
const pending2 = await startPendingPrompt(harness, "session-2");
harness.sessionUpdateSpy.mockClear();
await harness.agent.handleGatewayEvent(
createToolEvent({
runId: pending2.runId,
sessionKey,
stream: "tool",
data: {
phase: "start",
name: "read_file",
toolCallId: "tool-2",
args: { path: "notes.txt" },
},
}),
);
expect(harness.sessionUpdateSpy).toHaveBeenCalledWith(
expect.objectContaining({
sessionId: "session-2",
update: expect.objectContaining({
sessionUpdate: "tool_call",
toolCallId: "tool-2",
status: "in_progress",
}),
}),
);
expect(harness.sessionUpdateSpy).toHaveBeenCalledTimes(1);
await harness.agent.handleGatewayEvent(
createChatEvent({
runId: pending2.runId,
sessionKey,
seq: 1,
state: "final",
}),
);
await expect(pending2.promptPromise).resolves.toEqual({ stopReason: "end_turn" });
expect(harness.sessionStore.getSession("session-1")?.activeRunId).toBe(pending1.runId);
await harness.agent.handleGatewayEvent(
createChatEvent({
runId: pending1.runId,
sessionKey,
seq: 2,
state: "final",
}),
);
await expect(pending1.promptPromise).resolves.toEqual({ stopReason: "end_turn" });
});
});

View File

@ -633,14 +633,25 @@ export class AcpGatewayAgent implements Agent {
if (!session) {
return;
}
// Capture runId before cancelActiveRun clears session.activeRunId.
const activeRunId = session.activeRunId;
this.sessionStore.cancelActiveRun(params.sessionId);
const pending = this.pendingPrompts.get(params.sessionId);
const scopedRunId = activeRunId ?? pending?.idempotencyKey;
if (!scopedRunId) {
return;
}
try {
await this.gateway.request("chat.abort", { sessionKey: session.sessionKey });
await this.gateway.request("chat.abort", {
sessionKey: session.sessionKey,
runId: scopedRunId,
});
} catch (err) {
this.log(`cancel error: ${String(err)}`);
}
const pending = this.pendingPrompts.get(params.sessionId);
if (pending) {
this.pendingPrompts.delete(params.sessionId);
pending.resolve({ stopReason: "cancelled" });
@ -672,6 +683,7 @@ export class AcpGatewayAgent implements Agent {
return;
}
const stream = payload.stream as string | undefined;
const runId = payload.runId as string | undefined;
const data = payload.data as Record<string, unknown> | undefined;
const sessionKey = payload.sessionKey as string | undefined;
if (!stream || !data || !sessionKey) {
@ -688,7 +700,7 @@ export class AcpGatewayAgent implements Agent {
return;
}
const pending = this.findPendingBySessionKey(sessionKey);
const pending = this.findPendingBySessionKey(sessionKey, runId);
if (!pending) {
return;
}
@ -774,13 +786,10 @@ export class AcpGatewayAgent implements Agent {
return;
}
const pending = this.findPendingBySessionKey(sessionKey);
const pending = this.findPendingBySessionKey(sessionKey, runId);
if (!pending) {
return;
}
if (runId && pending.idempotencyKey !== runId) {
return;
}
if (state === "delta" && messageData) {
await this.handleDeltaEvent(pending.sessionId, messageData);
@ -853,11 +862,15 @@ export class AcpGatewayAgent implements Agent {
pending.resolve({ stopReason });
}
private findPendingBySessionKey(sessionKey: string): PendingPrompt | undefined {
private findPendingBySessionKey(sessionKey: string, runId?: string): PendingPrompt | undefined {
for (const pending of this.pendingPrompts.values()) {
if (pending.sessionKey === sessionKey) {
return pending;
if (pending.sessionKey !== sessionKey) {
continue;
}
if (runId && pending.idempotencyKey !== runId) {
continue;
}
return pending;
}
return undefined;
}