From b167df78aa17ad52c0deed80a62dd70906d09fb7 Mon Sep 17 00:00:00 2001 From: Mariano <132747814+mbelinky@users.noreply.github.com> Date: Mon, 6 Apr 2026 03:52:24 +0200 Subject: [PATCH] Lobster: harden embedded runtime integration (#61566) Merged via squash. Prepared head SHA: a6f48309fd8278af993bcac80c212736dab065c7 Co-authored-by: mbelinky <132747814+mbelinky@users.noreply.github.com> Reviewed-by: @mbelinky --- CHANGELOG.md | 2 + extensions/lobster/src/lobster-runner.test.ts | 40 +++++++++ extensions/lobster/src/lobster-runner.ts | 27 +++++- .../lobster/src/lobster-taskflow.test.ts | 57 ++----------- extensions/lobster/src/lobster-tool.test.ts | 83 +++++++++---------- extensions/lobster/src/lobster-tool.ts | 18 ++++ .../lobster/src/taskflow-test-helpers.ts | 48 +++++++++++ 7 files changed, 179 insertions(+), 96 deletions(-) create mode 100644 extensions/lobster/src/taskflow-test-helpers.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 5b99110448f..23922b3aeda 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -46,6 +46,8 @@ Docs: https://docs.openclaw.ai - Docs/IRC: replace public IRC hostname examples with `irc.example.com` and recommend private servers for bot coordination while listing common public networks for intentional use. - Memory/dreaming: group nearby daily-note lines into short coherent chunks before staging them for dreaming, so one-off context from recent notes reaches REM/deep with better evidence and less line-level noise. - Memory/dreaming: drop generic date/day headings from daily-note chunk prefixes while keeping meaningful section labels, so staged snippets stay cleaner and more reusable. (#61597) Thanks @mbelinky. +- Plugins/Lobster: run bundled Lobster workflows in process instead of spawning the external CLI, reducing transport overhead and unblocking native runtime integration. (#61523) Thanks @mbelinky. +- Plugins/Lobster: harden managed resume validation so invalid TaskFlow resume calls fail earlier, and memoize embedded runtime loading per runner while keeping failed loads retryable. (#61566) Thanks @mbelinky. ### Fixes diff --git a/extensions/lobster/src/lobster-runner.test.ts b/extensions/lobster/src/lobster-runner.test.ts index 28c910c9292..2d6d6795904 100644 --- a/extensions/lobster/src/lobster-runner.test.ts +++ b/extensions/lobster/src/lobster-runner.test.ts @@ -205,6 +205,46 @@ describe("createEmbeddedLobsterRunner", () => { }); }); + it("loads the embedded runtime once per runner", async () => { + const runtime = { + runToolRequest: vi.fn().mockResolvedValue({ + ok: true, + protocolVersion: 1, + status: "ok", + output: [], + requiresApproval: null, + }), + resumeToolRequest: vi.fn().mockResolvedValue({ + ok: true, + protocolVersion: 1, + status: "cancelled", + output: [], + requiresApproval: null, + }), + }; + const loadRuntime = vi.fn().mockResolvedValue(runtime); + + const runner = createEmbeddedLobsterRunner({ loadRuntime }); + + await runner.run({ + action: "run", + pipeline: "exec --json=true echo hi", + cwd: process.cwd(), + timeoutMs: 2000, + maxStdoutBytes: 4096, + }); + await runner.run({ + action: "resume", + token: "resume-token", + approve: false, + cwd: process.cwd(), + timeoutMs: 2000, + maxStdoutBytes: 4096, + }); + + expect(loadRuntime).toHaveBeenCalledTimes(1); + }); + it("requires a pipeline for run", async () => { const runner = createEmbeddedLobsterRunner({ loadRuntime: vi.fn().mockResolvedValue({ diff --git a/extensions/lobster/src/lobster-runner.ts b/extensions/lobster/src/lobster-runner.ts index 93598594e12..56127e7ad36 100644 --- a/extensions/lobster/src/lobster-runner.ts +++ b/extensions/lobster/src/lobster-runner.ts @@ -1,4 +1,5 @@ import { randomUUID } from "node:crypto"; +import { existsSync } from "node:fs"; import { createRequire } from "node:module"; import path from "node:path"; import { Readable, Writable } from "node:stream"; @@ -597,7 +598,19 @@ async function importInstalledLobsterModule( function resolveInstalledLobsterRoot() { const require = createRequire(import.meta.url); const sdkEntry = require.resolve("@clawdbot/lobster"); - return path.resolve(path.dirname(sdkEntry), "../../.."); + let currentDir = path.dirname(sdkEntry); + + while (true) { + const packageJsonPath = path.join(currentDir, "package.json"); + if (existsSync(packageJsonPath)) { + return currentDir; + } + const parentDir = path.dirname(currentDir); + if (parentDir === currentDir) { + throw new Error("Unable to resolve the installed @clawdbot/lobster package root"); + } + currentDir = parentDir; + } } async function loadEmbeddedToolRuntimeFromPackage(): Promise { @@ -672,9 +685,19 @@ export function createEmbeddedLobsterRunner(options?: { loadRuntime?: LoadEmbeddedToolRuntime; }): LobsterRunner { const loadRuntime = options?.loadRuntime ?? loadEmbeddedToolRuntimeFromPackage; + let runtimePromise: Promise | undefined; + + const getRuntime = () => { + runtimePromise ??= loadRuntime().catch((error) => { + runtimePromise = undefined; + throw error; + }); + return runtimePromise; + }; + return { async run(params) { - const runtime = await loadRuntime(); + const runtime = await getRuntime(); return await withTimeout(params.timeoutMs, async (signal) => { const ctx = createEmbeddedToolContext(params, signal); diff --git a/extensions/lobster/src/lobster-taskflow.test.ts b/extensions/lobster/src/lobster-taskflow.test.ts index 73d771dbcf2..580868a010f 100644 --- a/extensions/lobster/src/lobster-taskflow.test.ts +++ b/extensions/lobster/src/lobster-taskflow.test.ts @@ -1,56 +1,17 @@ import { describe, expect, it, vi } from "vitest"; -import type { OpenClawPluginApi } from "../runtime-api.js"; import type { LobsterRunner } from "./lobster-runner.js"; import { resumeManagedLobsterFlow, runManagedLobsterFlow } from "./lobster-taskflow.js"; +import { createFakeTaskFlow } from "./taskflow-test-helpers.js"; -type BoundTaskFlow = ReturnType< - NonNullable["taskFlow"]["bindSession"] ->; - -function createFakeTaskFlow(overrides?: Partial) { - const baseFlow = { - flowId: "flow-1", - revision: 1, - syncMode: "managed" as const, - controllerId: "tests/lobster", - ownerKey: "agent:main:main", - status: "running" as const, - goal: "Run Lobster workflow", - }; - - const taskFlow: BoundTaskFlow = { - sessionKey: "agent:main:main", - createManaged: vi.fn().mockReturnValue(baseFlow), - get: vi.fn(), - list: vi.fn().mockReturnValue([]), - findLatest: vi.fn(), - resolve: vi.fn(), - getTaskSummary: vi.fn(), - setWaiting: vi.fn().mockImplementation((input) => ({ - applied: true, - flow: { ...baseFlow, revision: input.expectedRevision + 1, status: "waiting" as const }, - })), - resume: vi.fn().mockImplementation((input) => ({ - applied: true, - flow: { ...baseFlow, revision: input.expectedRevision + 1, status: "running" as const }, - })), - finish: vi.fn().mockImplementation((input) => ({ - applied: true, - flow: { ...baseFlow, revision: input.expectedRevision + 1, status: "completed" as const }, - })), - fail: vi.fn().mockImplementation((input) => ({ - applied: true, - flow: { ...baseFlow, revision: input.expectedRevision + 1, status: "failed" as const }, - })), - requestCancel: vi.fn(), - cancel: vi.fn(), - runTask: vi.fn(), - ...overrides, - }; - - return taskFlow; +function expectManagedFlowFailure( + result: Awaited>, +) { + expect(result.ok).toBe(false); + if (result.ok) { + throw new Error("Expected managed Lobster flow to fail"); + } + return result; } - function createRunner(result: Awaited>): LobsterRunner { return { run: vi.fn().mockResolvedValue(result), diff --git a/extensions/lobster/src/lobster-tool.test.ts b/extensions/lobster/src/lobster-tool.test.ts index 1fe50dcbc9a..64b2e14c5b0 100644 --- a/extensions/lobster/src/lobster-tool.test.ts +++ b/extensions/lobster/src/lobster-tool.test.ts @@ -1,13 +1,10 @@ import { describe, expect, it, vi } from "vitest"; import { createTestPluginApi } from "../../../test/helpers/plugins/plugin-api.js"; import type { OpenClawPluginApi, OpenClawPluginToolContext } from "../runtime-api.js"; +import { createFakeTaskFlow } from "./taskflow-test-helpers.js"; let createLobsterTool: typeof import("./lobster-tool.js").createLobsterTool; -type BoundTaskFlow = ReturnType< - NonNullable["taskFlow"]["bindSession"] ->; - function fakeApi(overrides: Partial = {}): OpenClawPluginApi { return createTestPluginApi({ id: "lobster", @@ -34,48 +31,6 @@ function fakeCtx(overrides: Partial = {}): OpenClawPl }; } -function createFakeTaskFlow(overrides?: Partial): BoundTaskFlow { - const baseFlow = { - flowId: "flow-1", - revision: 1, - syncMode: "managed" as const, - controllerId: "tests/lobster", - ownerKey: "agent:main:main", - status: "running" as const, - goal: "Run Lobster workflow", - }; - - return { - sessionKey: "agent:main:main", - createManaged: vi.fn().mockReturnValue(baseFlow), - get: vi.fn(), - list: vi.fn().mockReturnValue([]), - findLatest: vi.fn(), - resolve: vi.fn(), - getTaskSummary: vi.fn(), - setWaiting: vi.fn().mockImplementation((input) => ({ - applied: true, - flow: { ...baseFlow, revision: input.expectedRevision + 1, status: "waiting" as const }, - })), - resume: vi.fn().mockImplementation((input) => ({ - applied: true, - flow: { ...baseFlow, revision: input.expectedRevision + 1, status: "running" as const }, - })), - finish: vi.fn().mockImplementation((input) => ({ - applied: true, - flow: { ...baseFlow, revision: input.expectedRevision + 1, status: "completed" as const }, - })), - fail: vi.fn().mockImplementation((input) => ({ - applied: true, - flow: { ...baseFlow, revision: input.expectedRevision + 1, status: "failed" as const }, - })), - requestCancel: vi.fn(), - cancel: vi.fn(), - runTask: vi.fn(), - ...overrides, - }; -} - describe("lobster plugin tool", () => { it("returns the Lobster envelope in details", async () => { ({ createLobsterTool } = await import("./lobster-tool.js")); @@ -273,6 +228,42 @@ describe("lobster plugin tool", () => { ).rejects.toThrow(/flowStateJson must be valid JSON/); }); + it("rejects managed TaskFlow resume mode without a token", async () => { + ({ createLobsterTool } = await import("./lobster-tool.js")); + + const tool = createLobsterTool(fakeApi(), { + runner: { run: vi.fn() }, + taskFlow: createFakeTaskFlow(), + }); + + await expect( + tool.execute("call-missing-resume-token", { + action: "resume", + flowId: "flow-1", + flowExpectedRevision: 1, + approve: true, + }), + ).rejects.toThrow(/token required when using managed TaskFlow resume mode/); + }); + + it("rejects managed TaskFlow resume mode without approve", async () => { + ({ createLobsterTool } = await import("./lobster-tool.js")); + + const tool = createLobsterTool(fakeApi(), { + runner: { run: vi.fn() }, + taskFlow: createFakeTaskFlow(), + }); + + await expect( + tool.execute("call-missing-resume-approve", { + action: "resume", + token: "resume-token", + flowId: "flow-1", + flowExpectedRevision: 1, + }), + ).rejects.toThrow(/approve required when using managed TaskFlow resume mode/); + }); + it("requires action", async () => { ({ createLobsterTool } = await import("./lobster-tool.js")); diff --git a/extensions/lobster/src/lobster-tool.ts b/extensions/lobster/src/lobster-tool.ts index 2b366080ff5..ada5535965b 100644 --- a/extensions/lobster/src/lobster-tool.ts +++ b/extensions/lobster/src/lobster-tool.ts @@ -63,6 +63,16 @@ function readOptionalNumber(value: unknown, fieldName: string): number | undefin return value; } +function readOptionalBoolean(value: unknown, fieldName: string): boolean | undefined { + if (value === undefined) { + return undefined; + } + if (typeof value !== "boolean") { + throw new Error(`${fieldName} must be a boolean`); + } + return value; +} + function parseOptionalFlowStateJson(value: unknown): JsonLike | undefined { if (value === undefined) { return undefined; @@ -119,6 +129,8 @@ function parseResumeFlowParams(params: Record): ManagedFlowResu const expectedRevision = readOptionalNumber(params.flowExpectedRevision, "flowExpectedRevision"); const currentStep = readOptionalTrimmedString(params.flowCurrentStep, "flowCurrentStep"); const waitingStep = readOptionalTrimmedString(params.flowWaitingStep, "flowWaitingStep"); + const token = readOptionalTrimmedString(params.token, "token"); + const approve = readOptionalBoolean(params.approve, "approve"); const runControllerId = readOptionalTrimmedString(params.flowControllerId, "flowControllerId"); const runGoal = readOptionalTrimmedString(params.flowGoal, "flowGoal"); const stateJson = params.flowStateJson; @@ -141,6 +153,12 @@ function parseResumeFlowParams(params: Record): ManagedFlowResu if (expectedRevision === undefined) { throw new Error("flowExpectedRevision required when using managed TaskFlow resume mode"); } + if (!token) { + throw new Error("token required when using managed TaskFlow resume mode"); + } + if (approve === undefined) { + throw new Error("approve required when using managed TaskFlow resume mode"); + } return { flowId, expectedRevision, diff --git a/extensions/lobster/src/taskflow-test-helpers.ts b/extensions/lobster/src/taskflow-test-helpers.ts new file mode 100644 index 00000000000..8e2f89abc50 --- /dev/null +++ b/extensions/lobster/src/taskflow-test-helpers.ts @@ -0,0 +1,48 @@ +import { vi } from "vitest"; +import type { OpenClawPluginApi } from "../runtime-api.js"; + +export type BoundTaskFlow = ReturnType< + NonNullable["taskFlow"]["bindSession"] +>; + +export function createFakeTaskFlow(overrides?: Partial): BoundTaskFlow { + const baseFlow = { + flowId: "flow-1", + revision: 1, + syncMode: "managed" as const, + controllerId: "tests/lobster", + ownerKey: "agent:main:main", + status: "running" as const, + goal: "Run Lobster workflow", + }; + + return { + sessionKey: "agent:main:main", + createManaged: vi.fn().mockReturnValue(baseFlow), + get: vi.fn(), + list: vi.fn().mockReturnValue([]), + findLatest: vi.fn(), + resolve: vi.fn(), + getTaskSummary: vi.fn(), + setWaiting: vi.fn().mockImplementation((input) => ({ + applied: true, + flow: { ...baseFlow, revision: input.expectedRevision + 1, status: "waiting" as const }, + })), + resume: vi.fn().mockImplementation((input) => ({ + applied: true, + flow: { ...baseFlow, revision: input.expectedRevision + 1, status: "running" as const }, + })), + finish: vi.fn().mockImplementation((input) => ({ + applied: true, + flow: { ...baseFlow, revision: input.expectedRevision + 1, status: "completed" as const }, + })), + fail: vi.fn().mockImplementation((input) => ({ + applied: true, + flow: { ...baseFlow, revision: input.expectedRevision + 1, status: "failed" as const }, + })), + requestCancel: vi.fn(), + cancel: vi.fn(), + runTask: vi.fn(), + ...overrides, + }; +}