diff --git a/CHANGELOG.md b/CHANGELOG.md index 35b5bdcad66..2c8098c0578 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -118,6 +118,7 @@ Docs: https://docs.openclaw.ai - Gateway/config validation: stop treating the implicit default memory slot as a required explicit plugin config, so startup no longer fails with `plugins.slots.memory: plugin not found: memory-core` when `memory-core` was only inferred. (#47494) Thanks @ngutman. - Tlon: honor explicit empty allowlists and defer cite expansion. (#46788) Thanks @zpbrent and @vincentkoc. - Tlon/DM auth: defer cited-message expansion until after DM authorization and owner command handling, so unauthorized DMs and owner approval/admin commands no longer trigger cross-channel cite fetches before the deny or command path. +- Gateway/agent events: stop broadcasting false end-of-run `seq gap` errors to clients, and isolate node-driven ingress turns with per-turn run IDs so stale tail events cannot leak into later session runs. (#43751) Thanks @caesargattuso. - Docs/security audit: spell out that `gateway.controlUi.allowedOrigins: ["*"]` is an explicit allow-all browser-origin policy and should be avoided outside tightly controlled local testing. - Gateway/auth: clear self-declared scopes for device-less trusted-proxy Control UI sessions so proxy-authenticated connects cannot claim admin or secrets scopes without a bound device identity. - Nodes/pending actions: re-check queued foreground actions against the current node command policy before returning them to the node. (#46815) Thanks @zpbrent and @vincentkoc. diff --git a/src/gateway/server-chat.agent-events.test.ts b/src/gateway/server-chat.agent-events.test.ts index 72eb09c8643..dd644955afc 100644 --- a/src/gateway/server-chat.agent-events.test.ts +++ b/src/gateway/server-chat.agent-events.test.ts @@ -487,6 +487,46 @@ describe("agent event handler", () => { nowSpy?.mockRestore(); }); + it("drops stale events that arrive after lifecycle completion", () => { + const { broadcast, nodeSendToSession, chatRunState, handler, nowSpy } = createHarness({ + now: 2_500, + }); + chatRunState.registry.add("run-stale-tail", { + sessionKey: "session-stale-tail", + clientRunId: "client-stale-tail", + }); + + handler({ + runId: "run-stale-tail", + seq: 1, + stream: "assistant", + ts: Date.now(), + data: { text: "done" }, + }); + emitLifecycleEnd(handler, "run-stale-tail"); + const errorCallsBeforeStaleEvent = broadcast.mock.calls.filter( + ([event, payload]) => + event === "agent" && (payload as { stream?: string }).stream === "error", + ).length; + const sessionChatCallsBeforeStaleEvent = sessionChatCalls(nodeSendToSession).length; + + handler({ + runId: "run-stale-tail", + seq: 3, + stream: "assistant", + ts: Date.now(), + data: { text: "late tail" }, + }); + + const errorCalls = broadcast.mock.calls.filter( + ([event, payload]) => + event === "agent" && (payload as { stream?: string }).stream === "error", + ); + expect(errorCalls).toHaveLength(errorCallsBeforeStaleEvent); + expect(sessionChatCalls(nodeSendToSession)).toHaveLength(sessionChatCallsBeforeStaleEvent); + nowSpy?.mockRestore(); + }); + it("flushes buffered chat delta before tool start events", () => { let now = 12_000; const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => now); diff --git a/src/gateway/server-chat.ts b/src/gateway/server-chat.ts index 0579f4083c0..7fda61b6c0c 100644 --- a/src/gateway/server-chat.ts +++ b/src/gateway/server-chat.ts @@ -710,7 +710,7 @@ export function createAgentEventHandler({ : { ...eventForClients, data }; })() : agentPayload; - if (evt.seq !== last + 1) { + if (last > 0 && evt.seq !== last + 1) { broadcast("agent", { runId: eventRunId, stream: "error", diff --git a/src/gateway/server-node-events.test.ts b/src/gateway/server-node-events.test.ts index a5a7578ddbc..dbf1bde579f 100644 --- a/src/gateway/server-node-events.test.ts +++ b/src/gateway/server-node-events.test.ts @@ -410,7 +410,9 @@ describe("voice transcript events", () => { }); it("forwards transcript with voice provenance", async () => { + const addChatRun = vi.fn(); const ctx = buildCtx(); + ctx.addChatRun = addChatRun; await handleNodeEvent(ctx, "node-v2", { event: "voice.transcript", @@ -432,6 +434,12 @@ describe("voice transcript events", () => { sourceTool: "gateway.voice.transcript", }, }); + expect(typeof opts.runId).toBe("string"); + expect(opts.runId).not.toBe(opts.sessionId); + expect(addChatRun).toHaveBeenCalledWith( + opts.runId, + expect.objectContaining({ clientRunId: expect.stringMatching(/^voice-/) }), + ); }); it("does not block agent dispatch when session-store touch fails", async () => { @@ -674,5 +682,6 @@ describe("agent request events", () => { channel: "telegram", to: "123", }); + expect(opts.runId).toBe(opts.sessionId); }); }); diff --git a/src/gateway/server-node-events.ts b/src/gateway/server-node-events.ts index c2aa3c454c7..2e9e911725a 100644 --- a/src/gateway/server-node-events.ts +++ b/src/gateway/server-node-events.ts @@ -288,16 +288,18 @@ export const handleNodeEvent = async (ctx: NodeEventContext, nodeId: string, evt sessionId, now, }); + const runId = randomUUID(); // Ensure chat UI clients refresh when this run completes (even though it wasn't started via chat.send). - // This maps agent bus events (keyed by sessionId) to chat events (keyed by clientRunId). - ctx.addChatRun(sessionId, { + // This maps agent bus events (keyed by per-turn runId) to chat events (keyed by clientRunId). + ctx.addChatRun(runId, { sessionKey: canonicalKey, clientRunId: `voice-${randomUUID()}`, }); void agentCommandFromIngress( { + runId, message: text, sessionId, sessionKey: canonicalKey, @@ -404,7 +406,6 @@ export const handleNodeEvent = async (ctx: NodeEventContext, nodeId: string, evt const deliver = deliverRequested && Boolean(channel && to); const deliveryChannel = deliver ? channel : undefined; const deliveryTo = deliver ? to : undefined; - if (deliverRequested && !deliver) { ctx.logGateway.warn( `agent delivery disabled node=${nodeId}: missing session delivery route (channel=${channel ?? "-"} to=${to ?? "-"})`, @@ -430,6 +431,7 @@ export const handleNodeEvent = async (ctx: NodeEventContext, nodeId: string, evt void agentCommandFromIngress( { + runId: sessionId, message, images, sessionId,