mirror of https://github.com/openclaw/openclaw.git
fix(agents): dispose bundled MCP runtime after local runs (#57520)
* fix(agents): dispose bundled MCP runtime after local runs * fix(agents): scope bundle MCP cleanup to local one-shots * fix(agents): dispose bundle MCP after local runs * docs(changelog): note local bundle MCP cleanup fix
This commit is contained in:
parent
926693e993
commit
43cd29c4af
|
|
@ -99,6 +99,7 @@ Docs: https://docs.openclaw.ai
|
|||
- Agents/compaction: keep late compaction-retry rejections handled after the aggregate timeout path wins without swallowing real pre-timeout wait failures, so timed-out retries no longer surface an unhandled rejection on later unsubscribe. (#57451) Thanks @mpz4life and @vincentkoc.
|
||||
- Matrix/delivery recovery: treat Synapse `User not in room` replay failures as permanent during startup recovery so poisoned queued messages move to `failed/` instead of crash-looping Matrix after restart. (#57426) thanks @dlardo.
|
||||
- Plugins/facades: guard bundled plugin facade loads with a cache-first sentinel so circular re-entry stops crashing `xai`, `sglang`, and `vllm` during gateway plugin startup. (#57508) Thanks @openperf.
|
||||
- Agents/MCP: dispose bundled MCP runtimes after one-shot `openclaw agent --local` runs finish, while preserving bundled MCP state across in-run retries so local JSON runs exit cleanly without restarting stateful MCP tools mid-run.
|
||||
|
||||
## 2026.3.28
|
||||
|
||||
|
|
|
|||
|
|
@ -471,6 +471,7 @@ export function runAgentAttempt(params: {
|
|||
streamParams: params.opts.streamParams,
|
||||
agentDir: params.agentDir,
|
||||
allowTransientCooldownProbe: params.allowTransientCooldownProbe,
|
||||
cleanupBundleMcpOnRunEnd: params.opts.cleanupBundleMcpOnRunEnd,
|
||||
onAgentEvent: params.onAgentEvent,
|
||||
bootstrapPromptWarningSignaturesSeen,
|
||||
bootstrapPromptWarningSignature,
|
||||
|
|
|
|||
|
|
@ -88,6 +88,8 @@ export type AgentCommandOpts = {
|
|||
streamParams?: AgentStreamParams;
|
||||
/** Explicit workspace directory override (for subagents to inherit parent workspace). */
|
||||
workspaceDir?: SpawnedRunMetadata["workspaceDir"];
|
||||
/** Force bundled MCP teardown when a one-shot local run completes. */
|
||||
cleanupBundleMcpOnRunEnd?: boolean;
|
||||
};
|
||||
|
||||
export type AgentCommandIngressOpts = Omit<
|
||||
|
|
|
|||
|
|
@ -62,6 +62,7 @@ function toAgentToolResult(params: {
|
|||
export async function materializeBundleMcpToolsForRun(params: {
|
||||
runtime: SessionMcpRuntime;
|
||||
reservedToolNames?: Iterable<string>;
|
||||
disposeRuntime?: () => Promise<void>;
|
||||
}): Promise<BundleMcpToolRuntime> {
|
||||
params.runtime.markUsed();
|
||||
const catalog = await params.runtime.getCatalog();
|
||||
|
|
@ -102,7 +103,9 @@ export async function materializeBundleMcpToolsForRun(params: {
|
|||
|
||||
return {
|
||||
tools,
|
||||
dispose: async () => {},
|
||||
dispose: async () => {
|
||||
await params.disposeRuntime?.();
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
|
|
@ -119,11 +122,9 @@ export async function createBundleMcpToolRuntime(params: {
|
|||
const materialized = await materializeBundleMcpToolsForRun({
|
||||
runtime,
|
||||
reservedToolNames: params.reservedToolNames,
|
||||
});
|
||||
return {
|
||||
tools: materialized.tools,
|
||||
dispose: async () => {
|
||||
disposeRuntime: async () => {
|
||||
await runtime.dispose();
|
||||
},
|
||||
};
|
||||
});
|
||||
return materialized;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -213,4 +213,63 @@ describe("session MCP runtime", () => {
|
|||
expect(await fs.readFile(startupCounterPath, "utf8")).toBe("1");
|
||||
expect(__testing.getCachedSessionIds()).not.toContain("session-d");
|
||||
});
|
||||
|
||||
it("materialized disposal can retire a manager-owned runtime", async () => {
|
||||
const workspaceDir = await makeTempDir("openclaw-bundle-mcp-tools-");
|
||||
const startupCounterPath = path.join(workspaceDir, "bundle-starts.txt");
|
||||
const pidPath = path.join(workspaceDir, "bundle.pid");
|
||||
const exitMarkerPath = path.join(workspaceDir, "bundle.exit");
|
||||
const pluginRoot = path.join(workspaceDir, ".openclaw", "extensions", "bundle-probe");
|
||||
const serverScriptPath = path.join(pluginRoot, "servers", "bundle-probe.mjs");
|
||||
await writeBundleProbeMcpServer(serverScriptPath, {
|
||||
startupCounterPath,
|
||||
pidPath,
|
||||
exitMarkerPath,
|
||||
});
|
||||
await writeClaudeBundle({ pluginRoot, serverScriptPath });
|
||||
|
||||
const runtimeA = await getOrCreateSessionMcpRuntime({
|
||||
sessionId: "session-e",
|
||||
sessionKey: "agent:test:session-e",
|
||||
workspaceDir,
|
||||
cfg: {
|
||||
plugins: {
|
||||
entries: {
|
||||
"bundle-probe": { enabled: true },
|
||||
},
|
||||
},
|
||||
},
|
||||
});
|
||||
const materialized = await materializeBundleMcpToolsForRun({
|
||||
runtime: runtimeA,
|
||||
disposeRuntime: async () => {
|
||||
await disposeSessionMcpRuntime("session-e");
|
||||
},
|
||||
});
|
||||
|
||||
expect(materialized.tools.map((tool) => tool.name)).toEqual(["bundleProbe__bundle_probe"]);
|
||||
expect(await waitForFileText(pidPath)).toMatch(/^\d+$/);
|
||||
|
||||
await materialized.dispose();
|
||||
|
||||
expect(await waitForFileText(exitMarkerPath)).toBe("exited");
|
||||
expect(__testing.getCachedSessionIds()).not.toContain("session-e");
|
||||
|
||||
const runtimeB = await getOrCreateSessionMcpRuntime({
|
||||
sessionId: "session-e",
|
||||
sessionKey: "agent:test:session-e",
|
||||
workspaceDir,
|
||||
cfg: {
|
||||
plugins: {
|
||||
entries: {
|
||||
"bundle-probe": { enabled: true },
|
||||
},
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
expect(runtimeB).not.toBe(runtimeA);
|
||||
await materializeBundleMcpToolsForRun({ runtime: runtimeB });
|
||||
expect(await fs.readFile(startupCounterPath, "utf8")).toBe("2");
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -15,6 +15,10 @@ import {
|
|||
} from "./test-helpers/pi-embedded-runner-e2e-fixtures.js";
|
||||
|
||||
const runEmbeddedAttemptMock = vi.fn();
|
||||
const disposeSessionMcpRuntimeMock = vi.fn<(sessionId: string) => Promise<void>>(async () => {
|
||||
return undefined;
|
||||
});
|
||||
let refreshRuntimeAuthOnFirstPromptError = false;
|
||||
|
||||
vi.mock("@mariozechner/pi-ai", async (importOriginal) => {
|
||||
const actual = await importOriginal<typeof import("@mariozechner/pi-ai")>();
|
||||
|
|
@ -94,6 +98,9 @@ const installRunEmbeddedMocks = () => {
|
|||
vi.doMock("./pi-embedded-runner/run/attempt.js", () => ({
|
||||
runEmbeddedAttempt: (params: unknown) => runEmbeddedAttemptMock(params),
|
||||
}));
|
||||
vi.doMock("./pi-bundle-mcp-tools.js", () => ({
|
||||
disposeSessionMcpRuntime: (sessionId: string) => disposeSessionMcpRuntimeMock(sessionId),
|
||||
}));
|
||||
vi.doMock("./pi-embedded-runner/model.js", async (importOriginal) => {
|
||||
const actual = await importOriginal<typeof import("./pi-embedded-runner/model.js")>();
|
||||
return {
|
||||
|
|
@ -102,6 +109,16 @@ const installRunEmbeddedMocks = () => {
|
|||
createResolvedEmbeddedRunnerModel(provider, modelId),
|
||||
};
|
||||
});
|
||||
vi.doMock("./pi-embedded-runner/run/auth-controller.js", () => ({
|
||||
createEmbeddedRunAuthController: () => ({
|
||||
advanceAuthProfile: vi.fn(async () => false),
|
||||
initializeAuthProfile: vi.fn(async () => undefined),
|
||||
maybeRefreshRuntimeAuthForAuthError: vi.fn(async (_errorText: string, runtimeAuthRetry) => {
|
||||
return refreshRuntimeAuthOnFirstPromptError && runtimeAuthRetry !== true;
|
||||
}),
|
||||
stopRuntimeAuthRefreshTimer: vi.fn(),
|
||||
}),
|
||||
}));
|
||||
vi.doMock("../plugins/provider-runtime.js", async (importOriginal) => {
|
||||
const actual = await importOriginal<typeof import("../plugins/provider-runtime.js")>();
|
||||
return {
|
||||
|
|
@ -144,6 +161,8 @@ afterAll(async () => {
|
|||
beforeEach(() => {
|
||||
vi.useRealTimers();
|
||||
runEmbeddedAttemptMock.mockReset();
|
||||
disposeSessionMcpRuntimeMock.mockReset();
|
||||
refreshRuntimeAuthOnFirstPromptError = false;
|
||||
runEmbeddedAttemptMock.mockImplementation(async () => {
|
||||
throw new Error("unexpected extra runEmbeddedAttempt call");
|
||||
});
|
||||
|
|
@ -245,6 +264,84 @@ const runDefaultEmbeddedTurn = async (sessionFile: string, prompt: string, sessi
|
|||
};
|
||||
|
||||
describe("runEmbeddedPiAgent", () => {
|
||||
it("disposes bundle MCP once when a one-shot local run completes", async () => {
|
||||
const sessionFile = nextSessionFile();
|
||||
const cfg = createEmbeddedPiRunnerOpenAiConfig(["mock-1"]);
|
||||
const sessionKey = nextSessionKey();
|
||||
runEmbeddedAttemptMock.mockResolvedValueOnce(
|
||||
makeEmbeddedRunnerAttempt({
|
||||
assistantTexts: ["ok"],
|
||||
lastAssistant: buildEmbeddedRunnerAssistant({
|
||||
content: [{ type: "text", text: "ok" }],
|
||||
}),
|
||||
}),
|
||||
);
|
||||
|
||||
await runEmbeddedPiAgent({
|
||||
sessionId: "session:test",
|
||||
sessionKey,
|
||||
sessionFile,
|
||||
workspaceDir,
|
||||
config: cfg,
|
||||
prompt: "hello",
|
||||
provider: "openai",
|
||||
model: "mock-1",
|
||||
timeoutMs: 5_000,
|
||||
agentDir,
|
||||
runId: nextRunId("bundle-mcp-run-cleanup"),
|
||||
enqueue: immediateEnqueue,
|
||||
cleanupBundleMcpOnRunEnd: true,
|
||||
});
|
||||
|
||||
expect(runEmbeddedAttemptMock).toHaveBeenCalledTimes(1);
|
||||
expect(disposeSessionMcpRuntimeMock).toHaveBeenCalledTimes(1);
|
||||
expect(disposeSessionMcpRuntimeMock).toHaveBeenCalledWith("session:test");
|
||||
});
|
||||
|
||||
it("preserves bundle MCP state across retries within one local run", async () => {
|
||||
refreshRuntimeAuthOnFirstPromptError = true;
|
||||
const sessionFile = nextSessionFile();
|
||||
const cfg = createEmbeddedPiRunnerOpenAiConfig(["mock-1"]);
|
||||
const sessionKey = nextSessionKey();
|
||||
runEmbeddedAttemptMock
|
||||
.mockImplementationOnce(async () => {
|
||||
expect(disposeSessionMcpRuntimeMock).not.toHaveBeenCalled();
|
||||
return makeEmbeddedRunnerAttempt({
|
||||
promptError: new Error("401 unauthorized"),
|
||||
});
|
||||
})
|
||||
.mockImplementationOnce(async () => {
|
||||
expect(disposeSessionMcpRuntimeMock).not.toHaveBeenCalled();
|
||||
return makeEmbeddedRunnerAttempt({
|
||||
assistantTexts: ["ok"],
|
||||
lastAssistant: buildEmbeddedRunnerAssistant({
|
||||
content: [{ type: "text", text: "ok" }],
|
||||
}),
|
||||
});
|
||||
});
|
||||
|
||||
const result = await runEmbeddedPiAgent({
|
||||
sessionId: "session:test",
|
||||
sessionKey,
|
||||
sessionFile,
|
||||
workspaceDir,
|
||||
config: cfg,
|
||||
prompt: "hello",
|
||||
provider: "openai",
|
||||
model: "mock-1",
|
||||
timeoutMs: 5_000,
|
||||
agentDir,
|
||||
runId: nextRunId("bundle-mcp-retry"),
|
||||
enqueue: immediateEnqueue,
|
||||
cleanupBundleMcpOnRunEnd: true,
|
||||
});
|
||||
|
||||
expect(runEmbeddedAttemptMock).toHaveBeenCalledTimes(2);
|
||||
expect(result.payloads?.[0]).toMatchObject({ text: "ok" });
|
||||
expect(disposeSessionMcpRuntimeMock).toHaveBeenCalledTimes(1);
|
||||
expect(disposeSessionMcpRuntimeMock).toHaveBeenCalledWith("session:test");
|
||||
});
|
||||
|
||||
it("handles prompt error paths without dropping user state", async () => {
|
||||
const sessionFile = nextSessionFile();
|
||||
const cfg = createEmbeddedPiRunnerOpenAiConfig(["mock-error"]);
|
||||
|
|
|
|||
|
|
@ -39,6 +39,7 @@ import {
|
|||
} from "../model-auth.js";
|
||||
import { normalizeProviderId } from "../model-selection.js";
|
||||
import { ensureOpenClawModelsJson } from "../models-config.js";
|
||||
import { disposeSessionMcpRuntime } from "../pi-bundle-mcp-tools.js";
|
||||
import {
|
||||
classifyFailoverReason,
|
||||
extractObservedOverflowTokenCount,
|
||||
|
|
@ -1431,6 +1432,13 @@ export async function runEmbeddedPiAgent(
|
|||
} finally {
|
||||
await contextEngine.dispose?.();
|
||||
stopRuntimeAuthRefreshTimer();
|
||||
if (params.cleanupBundleMcpOnRunEnd === true) {
|
||||
await disposeSessionMcpRuntime(params.sessionId).catch((error) => {
|
||||
log.warn(
|
||||
`bundle-mcp cleanup failed after run for ${params.sessionId}: ${describeUnknownError(error)}`,
|
||||
);
|
||||
});
|
||||
}
|
||||
}
|
||||
}),
|
||||
);
|
||||
|
|
|
|||
|
|
@ -132,4 +132,10 @@ export type RunEmbeddedPiAgentParams = {
|
|||
* where transient service pressure is often model-scoped.
|
||||
*/
|
||||
allowTransientCooldownProbe?: boolean;
|
||||
/**
|
||||
* Dispose bundled MCP runtimes when the overall run ends instead of preserving
|
||||
* the session-scoped cache. Intended for one-shot local CLI runs that must
|
||||
* exit promptly after emitting the final JSON result.
|
||||
*/
|
||||
cleanupBundleMcpOnRunEnd?: boolean;
|
||||
};
|
||||
|
|
|
|||
|
|
@ -135,7 +135,24 @@ describe("agentCliCommand", () => {
|
|||
|
||||
expect(callGateway).not.toHaveBeenCalled();
|
||||
expect(agentCommand).toHaveBeenCalledTimes(1);
|
||||
expect(vi.mocked(agentCommand).mock.calls[0]?.[0]).toMatchObject({
|
||||
cleanupBundleMcpOnRunEnd: true,
|
||||
});
|
||||
expect(runtime.log).toHaveBeenCalledWith("local");
|
||||
});
|
||||
});
|
||||
|
||||
it("does not force bundle MCP cleanup on gateway fallback", async () => {
|
||||
await withTempStore(async () => {
|
||||
vi.mocked(callGateway).mockRejectedValue(new Error("gateway not connected"));
|
||||
mockLocalAgentReply();
|
||||
|
||||
await agentCliCommand({ message: "hi", to: "+1555" }, runtime);
|
||||
|
||||
expect(agentCommand).toHaveBeenCalledTimes(1);
|
||||
expect(vi.mocked(agentCommand).mock.calls[0]?.[0]).not.toMatchObject({
|
||||
cleanupBundleMcpOnRunEnd: true,
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -183,6 +183,7 @@ export async function agentCliCommand(opts: AgentCliOpts, runtime: RuntimeEnv, d
|
|||
...opts,
|
||||
agentId: opts.agent,
|
||||
replyAccountId: opts.replyAccount,
|
||||
cleanupBundleMcpOnRunEnd: opts.local === true,
|
||||
};
|
||||
if (opts.local === true) {
|
||||
return await agentCommand(localOpts, runtime, deps);
|
||||
|
|
|
|||
Loading…
Reference in New Issue