diff --git a/CHANGELOG.md b/CHANGELOG.md index bc124c88112..940017f8277 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -99,6 +99,7 @@ Docs: https://docs.openclaw.ai - Agents/compaction: keep late compaction-retry rejections handled after the aggregate timeout path wins without swallowing real pre-timeout wait failures, so timed-out retries no longer surface an unhandled rejection on later unsubscribe. (#57451) Thanks @mpz4life and @vincentkoc. - Matrix/delivery recovery: treat Synapse `User not in room` replay failures as permanent during startup recovery so poisoned queued messages move to `failed/` instead of crash-looping Matrix after restart. (#57426) thanks @dlardo. - Plugins/facades: guard bundled plugin facade loads with a cache-first sentinel so circular re-entry stops crashing `xai`, `sglang`, and `vllm` during gateway plugin startup. (#57508) Thanks @openperf. +- Agents/MCP: dispose bundled MCP runtimes after one-shot `openclaw agent --local` runs finish, while preserving bundled MCP state across in-run retries so local JSON runs exit cleanly without restarting stateful MCP tools mid-run. ## 2026.3.28 diff --git a/src/agents/command/attempt-execution.ts b/src/agents/command/attempt-execution.ts index ea41eb7b8b1..81d47d09a10 100644 --- a/src/agents/command/attempt-execution.ts +++ b/src/agents/command/attempt-execution.ts @@ -471,6 +471,7 @@ export function runAgentAttempt(params: { streamParams: params.opts.streamParams, agentDir: params.agentDir, allowTransientCooldownProbe: params.allowTransientCooldownProbe, + cleanupBundleMcpOnRunEnd: params.opts.cleanupBundleMcpOnRunEnd, onAgentEvent: params.onAgentEvent, bootstrapPromptWarningSignaturesSeen, bootstrapPromptWarningSignature, diff --git a/src/agents/command/types.ts b/src/agents/command/types.ts index a85157bb191..4c677a9c9e3 100644 --- a/src/agents/command/types.ts +++ b/src/agents/command/types.ts @@ -88,6 +88,8 @@ export type AgentCommandOpts = { streamParams?: AgentStreamParams; /** Explicit workspace directory override (for subagents to inherit parent workspace). */ workspaceDir?: SpawnedRunMetadata["workspaceDir"]; + /** Force bundled MCP teardown when a one-shot local run completes. */ + cleanupBundleMcpOnRunEnd?: boolean; }; export type AgentCommandIngressOpts = Omit< diff --git a/src/agents/pi-bundle-mcp-materialize.ts b/src/agents/pi-bundle-mcp-materialize.ts index 221b9b1fe34..65c335c5ef9 100644 --- a/src/agents/pi-bundle-mcp-materialize.ts +++ b/src/agents/pi-bundle-mcp-materialize.ts @@ -62,6 +62,7 @@ function toAgentToolResult(params: { export async function materializeBundleMcpToolsForRun(params: { runtime: SessionMcpRuntime; reservedToolNames?: Iterable; + disposeRuntime?: () => Promise; }): Promise { params.runtime.markUsed(); const catalog = await params.runtime.getCatalog(); @@ -102,7 +103,9 @@ export async function materializeBundleMcpToolsForRun(params: { return { tools, - dispose: async () => {}, + dispose: async () => { + await params.disposeRuntime?.(); + }, }; } @@ -119,11 +122,9 @@ export async function createBundleMcpToolRuntime(params: { const materialized = await materializeBundleMcpToolsForRun({ runtime, reservedToolNames: params.reservedToolNames, - }); - return { - tools: materialized.tools, - dispose: async () => { + disposeRuntime: async () => { await runtime.dispose(); }, - }; + }); + return materialized; } diff --git a/src/agents/pi-bundle-mcp-runtime.test.ts b/src/agents/pi-bundle-mcp-runtime.test.ts index 62e0ce80a19..e37c439ead1 100644 --- a/src/agents/pi-bundle-mcp-runtime.test.ts +++ b/src/agents/pi-bundle-mcp-runtime.test.ts @@ -213,4 +213,63 @@ describe("session MCP runtime", () => { expect(await fs.readFile(startupCounterPath, "utf8")).toBe("1"); expect(__testing.getCachedSessionIds()).not.toContain("session-d"); }); + + it("materialized disposal can retire a manager-owned runtime", async () => { + const workspaceDir = await makeTempDir("openclaw-bundle-mcp-tools-"); + const startupCounterPath = path.join(workspaceDir, "bundle-starts.txt"); + const pidPath = path.join(workspaceDir, "bundle.pid"); + const exitMarkerPath = path.join(workspaceDir, "bundle.exit"); + const pluginRoot = path.join(workspaceDir, ".openclaw", "extensions", "bundle-probe"); + const serverScriptPath = path.join(pluginRoot, "servers", "bundle-probe.mjs"); + await writeBundleProbeMcpServer(serverScriptPath, { + startupCounterPath, + pidPath, + exitMarkerPath, + }); + await writeClaudeBundle({ pluginRoot, serverScriptPath }); + + const runtimeA = await getOrCreateSessionMcpRuntime({ + sessionId: "session-e", + sessionKey: "agent:test:session-e", + workspaceDir, + cfg: { + plugins: { + entries: { + "bundle-probe": { enabled: true }, + }, + }, + }, + }); + const materialized = await materializeBundleMcpToolsForRun({ + runtime: runtimeA, + disposeRuntime: async () => { + await disposeSessionMcpRuntime("session-e"); + }, + }); + + expect(materialized.tools.map((tool) => tool.name)).toEqual(["bundleProbe__bundle_probe"]); + expect(await waitForFileText(pidPath)).toMatch(/^\d+$/); + + await materialized.dispose(); + + expect(await waitForFileText(exitMarkerPath)).toBe("exited"); + expect(__testing.getCachedSessionIds()).not.toContain("session-e"); + + const runtimeB = await getOrCreateSessionMcpRuntime({ + sessionId: "session-e", + sessionKey: "agent:test:session-e", + workspaceDir, + cfg: { + plugins: { + entries: { + "bundle-probe": { enabled: true }, + }, + }, + }, + }); + + expect(runtimeB).not.toBe(runtimeA); + await materializeBundleMcpToolsForRun({ runtime: runtimeB }); + expect(await fs.readFile(startupCounterPath, "utf8")).toBe("2"); + }); }); diff --git a/src/agents/pi-embedded-runner.e2e.test.ts b/src/agents/pi-embedded-runner.e2e.test.ts index 1dfc0b93384..cc94a2e9d49 100644 --- a/src/agents/pi-embedded-runner.e2e.test.ts +++ b/src/agents/pi-embedded-runner.e2e.test.ts @@ -15,6 +15,10 @@ import { } from "./test-helpers/pi-embedded-runner-e2e-fixtures.js"; const runEmbeddedAttemptMock = vi.fn(); +const disposeSessionMcpRuntimeMock = vi.fn<(sessionId: string) => Promise>(async () => { + return undefined; +}); +let refreshRuntimeAuthOnFirstPromptError = false; vi.mock("@mariozechner/pi-ai", async (importOriginal) => { const actual = await importOriginal(); @@ -94,6 +98,9 @@ const installRunEmbeddedMocks = () => { vi.doMock("./pi-embedded-runner/run/attempt.js", () => ({ runEmbeddedAttempt: (params: unknown) => runEmbeddedAttemptMock(params), })); + vi.doMock("./pi-bundle-mcp-tools.js", () => ({ + disposeSessionMcpRuntime: (sessionId: string) => disposeSessionMcpRuntimeMock(sessionId), + })); vi.doMock("./pi-embedded-runner/model.js", async (importOriginal) => { const actual = await importOriginal(); return { @@ -102,6 +109,16 @@ const installRunEmbeddedMocks = () => { createResolvedEmbeddedRunnerModel(provider, modelId), }; }); + vi.doMock("./pi-embedded-runner/run/auth-controller.js", () => ({ + createEmbeddedRunAuthController: () => ({ + advanceAuthProfile: vi.fn(async () => false), + initializeAuthProfile: vi.fn(async () => undefined), + maybeRefreshRuntimeAuthForAuthError: vi.fn(async (_errorText: string, runtimeAuthRetry) => { + return refreshRuntimeAuthOnFirstPromptError && runtimeAuthRetry !== true; + }), + stopRuntimeAuthRefreshTimer: vi.fn(), + }), + })); vi.doMock("../plugins/provider-runtime.js", async (importOriginal) => { const actual = await importOriginal(); return { @@ -144,6 +161,8 @@ afterAll(async () => { beforeEach(() => { vi.useRealTimers(); runEmbeddedAttemptMock.mockReset(); + disposeSessionMcpRuntimeMock.mockReset(); + refreshRuntimeAuthOnFirstPromptError = false; runEmbeddedAttemptMock.mockImplementation(async () => { throw new Error("unexpected extra runEmbeddedAttempt call"); }); @@ -245,6 +264,84 @@ const runDefaultEmbeddedTurn = async (sessionFile: string, prompt: string, sessi }; describe("runEmbeddedPiAgent", () => { + it("disposes bundle MCP once when a one-shot local run completes", async () => { + const sessionFile = nextSessionFile(); + const cfg = createEmbeddedPiRunnerOpenAiConfig(["mock-1"]); + const sessionKey = nextSessionKey(); + runEmbeddedAttemptMock.mockResolvedValueOnce( + makeEmbeddedRunnerAttempt({ + assistantTexts: ["ok"], + lastAssistant: buildEmbeddedRunnerAssistant({ + content: [{ type: "text", text: "ok" }], + }), + }), + ); + + await runEmbeddedPiAgent({ + sessionId: "session:test", + sessionKey, + sessionFile, + workspaceDir, + config: cfg, + prompt: "hello", + provider: "openai", + model: "mock-1", + timeoutMs: 5_000, + agentDir, + runId: nextRunId("bundle-mcp-run-cleanup"), + enqueue: immediateEnqueue, + cleanupBundleMcpOnRunEnd: true, + }); + + expect(runEmbeddedAttemptMock).toHaveBeenCalledTimes(1); + expect(disposeSessionMcpRuntimeMock).toHaveBeenCalledTimes(1); + expect(disposeSessionMcpRuntimeMock).toHaveBeenCalledWith("session:test"); + }); + + it("preserves bundle MCP state across retries within one local run", async () => { + refreshRuntimeAuthOnFirstPromptError = true; + const sessionFile = nextSessionFile(); + const cfg = createEmbeddedPiRunnerOpenAiConfig(["mock-1"]); + const sessionKey = nextSessionKey(); + runEmbeddedAttemptMock + .mockImplementationOnce(async () => { + expect(disposeSessionMcpRuntimeMock).not.toHaveBeenCalled(); + return makeEmbeddedRunnerAttempt({ + promptError: new Error("401 unauthorized"), + }); + }) + .mockImplementationOnce(async () => { + expect(disposeSessionMcpRuntimeMock).not.toHaveBeenCalled(); + return makeEmbeddedRunnerAttempt({ + assistantTexts: ["ok"], + lastAssistant: buildEmbeddedRunnerAssistant({ + content: [{ type: "text", text: "ok" }], + }), + }); + }); + + const result = await runEmbeddedPiAgent({ + sessionId: "session:test", + sessionKey, + sessionFile, + workspaceDir, + config: cfg, + prompt: "hello", + provider: "openai", + model: "mock-1", + timeoutMs: 5_000, + agentDir, + runId: nextRunId("bundle-mcp-retry"), + enqueue: immediateEnqueue, + cleanupBundleMcpOnRunEnd: true, + }); + + expect(runEmbeddedAttemptMock).toHaveBeenCalledTimes(2); + expect(result.payloads?.[0]).toMatchObject({ text: "ok" }); + expect(disposeSessionMcpRuntimeMock).toHaveBeenCalledTimes(1); + expect(disposeSessionMcpRuntimeMock).toHaveBeenCalledWith("session:test"); + }); + it("handles prompt error paths without dropping user state", async () => { const sessionFile = nextSessionFile(); const cfg = createEmbeddedPiRunnerOpenAiConfig(["mock-error"]); diff --git a/src/agents/pi-embedded-runner/run.ts b/src/agents/pi-embedded-runner/run.ts index 0a29a3f8271..3a90978a0f4 100644 --- a/src/agents/pi-embedded-runner/run.ts +++ b/src/agents/pi-embedded-runner/run.ts @@ -39,6 +39,7 @@ import { } from "../model-auth.js"; import { normalizeProviderId } from "../model-selection.js"; import { ensureOpenClawModelsJson } from "../models-config.js"; +import { disposeSessionMcpRuntime } from "../pi-bundle-mcp-tools.js"; import { classifyFailoverReason, extractObservedOverflowTokenCount, @@ -1431,6 +1432,13 @@ export async function runEmbeddedPiAgent( } finally { await contextEngine.dispose?.(); stopRuntimeAuthRefreshTimer(); + if (params.cleanupBundleMcpOnRunEnd === true) { + await disposeSessionMcpRuntime(params.sessionId).catch((error) => { + log.warn( + `bundle-mcp cleanup failed after run for ${params.sessionId}: ${describeUnknownError(error)}`, + ); + }); + } } }), ); diff --git a/src/agents/pi-embedded-runner/run/params.ts b/src/agents/pi-embedded-runner/run/params.ts index 14c28cdcf61..9ed4411cd34 100644 --- a/src/agents/pi-embedded-runner/run/params.ts +++ b/src/agents/pi-embedded-runner/run/params.ts @@ -132,4 +132,10 @@ export type RunEmbeddedPiAgentParams = { * where transient service pressure is often model-scoped. */ allowTransientCooldownProbe?: boolean; + /** + * Dispose bundled MCP runtimes when the overall run ends instead of preserving + * the session-scoped cache. Intended for one-shot local CLI runs that must + * exit promptly after emitting the final JSON result. + */ + cleanupBundleMcpOnRunEnd?: boolean; }; diff --git a/src/commands/agent-via-gateway.test.ts b/src/commands/agent-via-gateway.test.ts index 15ba8f0c677..9b72f8ad2e6 100644 --- a/src/commands/agent-via-gateway.test.ts +++ b/src/commands/agent-via-gateway.test.ts @@ -135,7 +135,24 @@ describe("agentCliCommand", () => { expect(callGateway).not.toHaveBeenCalled(); expect(agentCommand).toHaveBeenCalledTimes(1); + expect(vi.mocked(agentCommand).mock.calls[0]?.[0]).toMatchObject({ + cleanupBundleMcpOnRunEnd: true, + }); expect(runtime.log).toHaveBeenCalledWith("local"); }); }); + + it("does not force bundle MCP cleanup on gateway fallback", async () => { + await withTempStore(async () => { + vi.mocked(callGateway).mockRejectedValue(new Error("gateway not connected")); + mockLocalAgentReply(); + + await agentCliCommand({ message: "hi", to: "+1555" }, runtime); + + expect(agentCommand).toHaveBeenCalledTimes(1); + expect(vi.mocked(agentCommand).mock.calls[0]?.[0]).not.toMatchObject({ + cleanupBundleMcpOnRunEnd: true, + }); + }); + }); }); diff --git a/src/commands/agent-via-gateway.ts b/src/commands/agent-via-gateway.ts index 8421778b798..f15874f70ce 100644 --- a/src/commands/agent-via-gateway.ts +++ b/src/commands/agent-via-gateway.ts @@ -183,6 +183,7 @@ export async function agentCliCommand(opts: AgentCliOpts, runtime: RuntimeEnv, d ...opts, agentId: opts.agent, replyAccountId: opts.replyAccount, + cleanupBundleMcpOnRunEnd: opts.local === true, }; if (opts.local === true) { return await agentCommand(localOpts, runtime, deps);