diff --git a/extensions/lobster/index.ts b/extensions/lobster/index.ts index 60db95937fe..5163fad561d 100644 --- a/extensions/lobster/index.ts +++ b/extensions/lobster/index.ts @@ -12,7 +12,11 @@ export default definePluginEntry({ if (ctx.sandboxed) { return null; } - return createLobsterTool(api) as AnyAgentTool; + const taskFlow = + api.runtime?.taskFlow && ctx.sessionKey + ? api.runtime.taskFlow.fromToolContext(ctx) + : undefined; + return createLobsterTool(api, { taskFlow }) as AnyAgentTool; }) as OpenClawPluginToolFactory, { optional: true }, ); diff --git a/extensions/lobster/src/lobster-taskflow.test.ts b/extensions/lobster/src/lobster-taskflow.test.ts new file mode 100644 index 00000000000..b542c3ab9d8 --- /dev/null +++ b/extensions/lobster/src/lobster-taskflow.test.ts @@ -0,0 +1,314 @@ +import { describe, expect, it, vi } from "vitest"; +import type { OpenClawPluginApi } from "../runtime-api.js"; +import type { LobsterRunner } from "./lobster-runner.js"; +import { resumeManagedLobsterFlow, runManagedLobsterFlow } from "./lobster-taskflow.js"; + +type BoundTaskFlow = ReturnType< + NonNullable["taskFlow"]["bindSession"] +>; + +function createFakeTaskFlow(overrides?: Partial) { + const baseFlow = { + flowId: "flow-1", + revision: 1, + syncMode: "managed" as const, + controllerId: "tests/lobster", + ownerKey: "agent:main:main", + status: "running" as const, + goal: "Run Lobster workflow", + }; + + const taskFlow: BoundTaskFlow = { + sessionKey: "agent:main:main", + createManaged: vi.fn().mockReturnValue(baseFlow), + get: vi.fn(), + list: vi.fn().mockReturnValue([]), + findLatest: vi.fn(), + resolve: vi.fn(), + getTaskSummary: vi.fn(), + setWaiting: vi.fn().mockImplementation((input) => ({ + applied: true, + flow: { ...baseFlow, revision: input.expectedRevision + 1, status: "waiting" as const }, + })), + resume: vi.fn().mockImplementation((input) => ({ + applied: true, + flow: { ...baseFlow, revision: input.expectedRevision + 1, status: "running" as const }, + })), + finish: vi.fn().mockImplementation((input) => ({ + applied: true, + flow: { ...baseFlow, revision: input.expectedRevision + 1, status: "completed" as const }, + })), + fail: vi.fn().mockImplementation((input) => ({ + applied: true, + flow: { ...baseFlow, revision: input.expectedRevision + 1, status: "failed" as const }, + })), + requestCancel: vi.fn(), + cancel: vi.fn(), + runTask: vi.fn(), + ...overrides, + }; + + return taskFlow; +} + +function createRunner(result: Awaited>): LobsterRunner { + return { + run: vi.fn().mockResolvedValue(result), + }; +} + +describe("runManagedLobsterFlow", () => { + it("creates a flow and finishes it when Lobster succeeds", async () => { + const taskFlow = createFakeTaskFlow(); + const runner = createRunner({ + ok: true, + status: "ok", + output: [{ id: "result-1" }], + requiresApproval: null, + }); + + const result = await runManagedLobsterFlow({ + taskFlow, + runner, + runnerParams: { + action: "run", + pipeline: "noop", + cwd: process.cwd(), + timeoutMs: 1000, + maxStdoutBytes: 4096, + }, + controllerId: "tests/lobster", + goal: "Run Lobster workflow", + }); + + expect(result.ok).toBe(true); + expect(taskFlow.createManaged).toHaveBeenCalledWith({ + controllerId: "tests/lobster", + goal: "Run Lobster workflow", + currentStep: "run_lobster", + }); + expect(taskFlow.finish).toHaveBeenCalledWith({ + flowId: "flow-1", + expectedRevision: 1, + }); + }); + + it("moves the flow to waiting when Lobster requests approval", async () => { + const taskFlow = createFakeTaskFlow(); + const createdAt = new Date("2026-04-05T21:00:00.000Z"); + const runner = createRunner({ + ok: true, + status: "needs_approval", + output: [], + requiresApproval: { + type: "approval_request", + prompt: "Approve this?", + items: [{ id: "item-1", createdAt, count: 2n, skip: undefined }], + resumeToken: "resume-1", + }, + }); + + const result = await runManagedLobsterFlow({ + taskFlow, + runner, + runnerParams: { + action: "run", + pipeline: "noop", + cwd: process.cwd(), + timeoutMs: 1000, + maxStdoutBytes: 4096, + }, + controllerId: "tests/lobster", + goal: "Run Lobster workflow", + }); + + expect(result.ok).toBe(true); + expect(taskFlow.setWaiting).toHaveBeenCalledWith({ + flowId: "flow-1", + expectedRevision: 1, + currentStep: "await_lobster_approval", + waitJson: { + kind: "lobster_approval", + prompt: "Approve this?", + items: [{ id: "item-1", createdAt: createdAt.toISOString(), count: "2" }], + resumeToken: "resume-1", + }, + }); + }); + + it("fails the flow when Lobster returns an error envelope", async () => { + const taskFlow = createFakeTaskFlow(); + const runner = createRunner({ + ok: false, + error: { + type: "runtime_error", + message: "boom", + }, + }); + + const result = await runManagedLobsterFlow({ + taskFlow, + runner, + runnerParams: { + action: "run", + pipeline: "noop", + cwd: process.cwd(), + timeoutMs: 1000, + maxStdoutBytes: 4096, + }, + controllerId: "tests/lobster", + goal: "Run Lobster workflow", + }); + + expect(result.ok).toBe(false); + expect(result.error.message).toBe("boom"); + expect(taskFlow.fail).toHaveBeenCalledWith({ + flowId: "flow-1", + expectedRevision: 1, + }); + }); + + it("fails the flow when the runner throws", async () => { + const taskFlow = createFakeTaskFlow(); + const runner: LobsterRunner = { + run: vi.fn().mockRejectedValue(new Error("crashed")), + }; + + const result = await runManagedLobsterFlow({ + taskFlow, + runner, + runnerParams: { + action: "run", + pipeline: "noop", + cwd: process.cwd(), + timeoutMs: 1000, + maxStdoutBytes: 4096, + }, + controllerId: "tests/lobster", + goal: "Run Lobster workflow", + }); + + expect(result.ok).toBe(false); + expect(result.error.message).toBe("crashed"); + expect(taskFlow.fail).toHaveBeenCalledWith({ + flowId: "flow-1", + expectedRevision: 1, + }); + }); +}); + +describe("resumeManagedLobsterFlow", () => { + it("resumes the flow and finishes it on success", async () => { + const taskFlow = createFakeTaskFlow(); + const runner = createRunner({ + ok: true, + status: "ok", + output: [], + requiresApproval: null, + }); + + const result = await resumeManagedLobsterFlow({ + taskFlow, + runner, + flowId: "flow-1", + expectedRevision: 4, + runnerParams: { + action: "resume", + token: "resume-1", + approve: true, + cwd: process.cwd(), + timeoutMs: 1000, + maxStdoutBytes: 4096, + }, + }); + + expect(result.ok).toBe(true); + expect(taskFlow.resume).toHaveBeenCalledWith({ + flowId: "flow-1", + expectedRevision: 4, + status: "running", + currentStep: "resume_lobster", + }); + expect(taskFlow.finish).toHaveBeenCalledWith({ + flowId: "flow-1", + expectedRevision: 5, + }); + }); + + it("returns a mutation error when taskFlow resume is rejected", async () => { + const taskFlow = createFakeTaskFlow({ + resume: vi.fn().mockReturnValue({ + applied: false, + code: "revision_conflict", + }), + }); + const runner = createRunner({ + ok: true, + status: "ok", + output: [], + requiresApproval: null, + }); + + const result = await resumeManagedLobsterFlow({ + taskFlow, + runner, + flowId: "flow-1", + expectedRevision: 4, + runnerParams: { + action: "resume", + token: "resume-1", + approve: true, + cwd: process.cwd(), + timeoutMs: 1000, + maxStdoutBytes: 4096, + }, + }); + + expect(result.ok).toBe(false); + expect(result.error.message).toMatch(/revision_conflict/); + expect(runner.run).not.toHaveBeenCalled(); + }); + + it("returns to waiting when the resumed Lobster run needs approval again", async () => { + const taskFlow = createFakeTaskFlow(); + const runner = createRunner({ + ok: true, + status: "needs_approval", + output: [], + requiresApproval: { + type: "approval_request", + prompt: "Approve this too?", + items: [{ id: "item-2" }], + resumeToken: "resume-2", + }, + }); + + const result = await resumeManagedLobsterFlow({ + taskFlow, + runner, + flowId: "flow-1", + expectedRevision: 4, + runnerParams: { + action: "resume", + token: "resume-1", + approve: true, + cwd: process.cwd(), + timeoutMs: 1000, + maxStdoutBytes: 4096, + }, + }); + + expect(result.ok).toBe(true); + expect(taskFlow.setWaiting).toHaveBeenCalledWith({ + flowId: "flow-1", + expectedRevision: 5, + currentStep: "await_lobster_approval", + waitJson: { + kind: "lobster_approval", + prompt: "Approve this too?", + items: [{ id: "item-2" }], + resumeToken: "resume-2", + }, + }); + }); +}); diff --git a/extensions/lobster/src/lobster-taskflow.ts b/extensions/lobster/src/lobster-taskflow.ts new file mode 100644 index 00000000000..11f5df26647 --- /dev/null +++ b/extensions/lobster/src/lobster-taskflow.ts @@ -0,0 +1,275 @@ +import type { OpenClawPluginApi } from "../runtime-api.js"; +import type { LobsterEnvelope, LobsterRunner, LobsterRunnerParams } from "./lobster-runner.js"; + +type JsonLike = + | null + | boolean + | number + | string + | JsonLike[] + | { + [key: string]: JsonLike; + }; + +type BoundTaskFlow = ReturnType< + NonNullable["taskFlow"]["bindSession"] +>; + +type FlowRecord = ReturnType; +type MutationResult = ReturnType; + +export type LobsterApprovalWaitState = { + kind: "lobster_approval"; + prompt: string; + items: JsonLike[]; + resumeToken?: string; +}; + +export type RunManagedLobsterFlowParams = { + taskFlow: BoundTaskFlow; + runner: LobsterRunner; + runnerParams: LobsterRunnerParams; + controllerId: string; + goal: string; + stateJson?: JsonLike; + currentStep?: string; + waitingStep?: string; +}; + +export type ResumeManagedLobsterFlowParams = { + taskFlow: BoundTaskFlow; + runner: LobsterRunner; + runnerParams: LobsterRunnerParams & { + action: "resume"; + token: string; + approve: boolean; + }; + flowId: string; + expectedRevision: number; + currentStep?: string; + waitingStep?: string; +}; + +export type ManagedLobsterFlowResult = + | { + ok: true; + envelope: LobsterEnvelope; + flow: FlowRecord; + mutation: MutationResult; + } + | { + ok: false; + flow?: FlowRecord; + mutation?: MutationResult; + error: Error; + }; + +function toJsonLike(value: unknown, seen = new WeakSet()): JsonLike { + if (value === null) { + return null; + } + switch (typeof value) { + case "boolean": + case "string": + return value; + case "number": + return Number.isFinite(value) ? value : String(value); + case "bigint": + return value.toString(); + case "undefined": + case "function": + case "symbol": + return null; + case "object": { + if (value instanceof Date) { + return value.toISOString(); + } + if (Array.isArray(value)) { + return value.map((item) => toJsonLike(item, seen)); + } + if (seen.has(value)) { + return "[Circular]"; + } + seen.add(value); + const jsonObject: Record = {}; + for (const [key, entry] of Object.entries(value)) { + if (entry === undefined || typeof entry === "function" || typeof entry === "symbol") { + continue; + } + jsonObject[key] = toJsonLike(entry, seen); + } + seen.delete(value); + return jsonObject; + } + } +} + +function buildApprovalWaitState(envelope: Extract): JsonLike { + if (!envelope.requiresApproval) { + return { + kind: "lobster_approval", + prompt: "", + items: [], + } satisfies LobsterApprovalWaitState; + } + return { + kind: "lobster_approval", + prompt: envelope.requiresApproval.prompt, + items: envelope.requiresApproval.items.map((item) => toJsonLike(item)), + ...(envelope.requiresApproval.resumeToken + ? { resumeToken: envelope.requiresApproval.resumeToken } + : {}), + } satisfies LobsterApprovalWaitState; +} + +function applyEnvelopeToFlow(params: { + taskFlow: BoundTaskFlow; + flow: FlowRecord; + envelope: LobsterEnvelope; + waitingStep: string; +}): MutationResult { + const { taskFlow, flow, envelope, waitingStep } = params; + + if (!envelope.ok) { + return taskFlow.fail({ + flowId: flow.flowId, + expectedRevision: flow.revision, + }); + } + + if (envelope.status === "needs_approval") { + return taskFlow.setWaiting({ + flowId: flow.flowId, + expectedRevision: flow.revision, + currentStep: waitingStep, + waitJson: buildApprovalWaitState(envelope), + }); + } + + return taskFlow.finish({ + flowId: flow.flowId, + expectedRevision: flow.revision, + }); +} + +function buildEnvelopeError(envelope: Extract) { + return new Error(envelope.error.message); +} + +export async function runManagedLobsterFlow( + params: RunManagedLobsterFlowParams, +): Promise { + const flow = params.taskFlow.createManaged({ + controllerId: params.controllerId, + goal: params.goal, + currentStep: params.currentStep ?? "run_lobster", + ...(params.stateJson !== undefined ? { stateJson: params.stateJson } : {}), + }); + + try { + const envelope = await params.runner.run(params.runnerParams); + const mutation = applyEnvelopeToFlow({ + taskFlow: params.taskFlow, + flow, + envelope, + waitingStep: params.waitingStep ?? "await_lobster_approval", + }); + if (!envelope.ok) { + return { + ok: false, + flow, + mutation, + error: buildEnvelopeError(envelope), + }; + } + return { + ok: true, + envelope, + flow, + mutation, + }; + } catch (error) { + const err = error instanceof Error ? error : new Error(String(error)); + try { + const mutation = params.taskFlow.fail({ + flowId: flow.flowId, + expectedRevision: flow.revision, + }); + return { + ok: false, + flow, + mutation, + error: err, + }; + } catch { + return { + ok: false, + flow, + error: err, + }; + } + } +} + +export async function resumeManagedLobsterFlow( + params: ResumeManagedLobsterFlowParams, +): Promise { + const resumed = params.taskFlow.resume({ + flowId: params.flowId, + expectedRevision: params.expectedRevision, + status: "running", + currentStep: params.currentStep ?? "resume_lobster", + }); + + if (!resumed.applied) { + return { + ok: false, + mutation: resumed, + error: new Error(`TaskFlow resume failed: ${resumed.code}`), + }; + } + + try { + const envelope = await params.runner.run(params.runnerParams); + const mutation = applyEnvelopeToFlow({ + taskFlow: params.taskFlow, + flow: resumed.flow, + envelope, + waitingStep: params.waitingStep ?? "await_lobster_approval", + }); + if (!envelope.ok) { + return { + ok: false, + flow: resumed.flow, + mutation, + error: buildEnvelopeError(envelope), + }; + } + return { + ok: true, + envelope, + flow: resumed.flow, + mutation, + }; + } catch (error) { + const err = error instanceof Error ? error : new Error(String(error)); + try { + const mutation = params.taskFlow.fail({ + flowId: params.flowId, + expectedRevision: resumed.flow.revision, + }); + return { + ok: false, + flow: resumed.flow, + mutation, + error: err, + }; + } catch { + return { + ok: false, + flow: resumed.flow, + error: err, + }; + } + } +} diff --git a/extensions/lobster/src/lobster-tool.test.ts b/extensions/lobster/src/lobster-tool.test.ts index 7e56b57ec7f..1fe50dcbc9a 100644 --- a/extensions/lobster/src/lobster-tool.test.ts +++ b/extensions/lobster/src/lobster-tool.test.ts @@ -4,6 +4,10 @@ import type { OpenClawPluginApi, OpenClawPluginToolContext } from "../runtime-ap let createLobsterTool: typeof import("./lobster-tool.js").createLobsterTool; +type BoundTaskFlow = ReturnType< + NonNullable["taskFlow"]["bindSession"] +>; + function fakeApi(overrides: Partial = {}): OpenClawPluginApi { return createTestPluginApi({ id: "lobster", @@ -30,6 +34,48 @@ function fakeCtx(overrides: Partial = {}): OpenClawPl }; } +function createFakeTaskFlow(overrides?: Partial): BoundTaskFlow { + const baseFlow = { + flowId: "flow-1", + revision: 1, + syncMode: "managed" as const, + controllerId: "tests/lobster", + ownerKey: "agent:main:main", + status: "running" as const, + goal: "Run Lobster workflow", + }; + + return { + sessionKey: "agent:main:main", + createManaged: vi.fn().mockReturnValue(baseFlow), + get: vi.fn(), + list: vi.fn().mockReturnValue([]), + findLatest: vi.fn(), + resolve: vi.fn(), + getTaskSummary: vi.fn(), + setWaiting: vi.fn().mockImplementation((input) => ({ + applied: true, + flow: { ...baseFlow, revision: input.expectedRevision + 1, status: "waiting" as const }, + })), + resume: vi.fn().mockImplementation((input) => ({ + applied: true, + flow: { ...baseFlow, revision: input.expectedRevision + 1, status: "running" as const }, + })), + finish: vi.fn().mockImplementation((input) => ({ + applied: true, + flow: { ...baseFlow, revision: input.expectedRevision + 1, status: "completed" as const }, + })), + fail: vi.fn().mockImplementation((input) => ({ + applied: true, + flow: { ...baseFlow, revision: input.expectedRevision + 1, status: "failed" as const }, + })), + requestCancel: vi.fn(), + cancel: vi.fn(), + runTask: vi.fn(), + ...overrides, + }; +} + describe("lobster plugin tool", () => { it("returns the Lobster envelope in details", async () => { ({ createLobsterTool } = await import("./lobster-tool.js")); @@ -133,6 +179,100 @@ describe("lobster plugin tool", () => { ).rejects.toThrow("boom"); }); + it("can run through managed TaskFlow mode", async () => { + ({ createLobsterTool } = await import("./lobster-tool.js")); + + const runner = { + run: vi.fn().mockResolvedValue({ + ok: true, + status: "needs_approval", + output: [], + requiresApproval: { + type: "approval_request", + prompt: "Approve this?", + items: [{ id: "item-1" }], + resumeToken: "resume-1", + }, + }), + }; + const taskFlow = createFakeTaskFlow(); + + const tool = createLobsterTool(fakeApi(), { runner, taskFlow }); + const res = await tool.execute("call-managed-run", { + action: "run", + pipeline: "noop", + flowControllerId: "tests/lobster", + flowGoal: "Run Lobster workflow", + flowStateJson: '{"lane":"email"}', + flowCurrentStep: "run_lobster", + flowWaitingStep: "await_review", + }); + + expect(taskFlow.createManaged).toHaveBeenCalledWith({ + controllerId: "tests/lobster", + goal: "Run Lobster workflow", + currentStep: "run_lobster", + stateJson: { lane: "email" }, + }); + expect(taskFlow.setWaiting).toHaveBeenCalledWith({ + flowId: "flow-1", + expectedRevision: 1, + currentStep: "await_review", + waitJson: { + kind: "lobster_approval", + prompt: "Approve this?", + items: [{ id: "item-1" }], + resumeToken: "resume-1", + }, + }); + expect(res.details).toMatchObject({ + ok: true, + status: "needs_approval", + flow: { + flowId: "flow-1", + }, + mutation: { + applied: true, + }, + }); + }); + + it("rejects managed TaskFlow params when no bound taskFlow runtime is available", async () => { + ({ createLobsterTool } = await import("./lobster-tool.js")); + + const tool = createLobsterTool(fakeApi(), { + runner: { run: vi.fn() }, + }); + + await expect( + tool.execute("call-missing-taskflow", { + action: "run", + pipeline: "noop", + flowControllerId: "tests/lobster", + flowGoal: "Run Lobster workflow", + }), + ).rejects.toThrow(/Managed TaskFlow run mode requires a bound taskFlow runtime/); + }); + + it("rejects invalid flowStateJson in managed TaskFlow mode", async () => { + ({ createLobsterTool } = await import("./lobster-tool.js")); + + const tool = createLobsterTool(fakeApi(), { + runner: { run: vi.fn() }, + taskFlow: createFakeTaskFlow(), + }); + + await expect( + tool.execute("call-invalid-flow-json", { + action: "run", + pipeline: "noop", + flowControllerId: "tests/lobster", + flowGoal: "Run Lobster workflow", + flowStateJson: "{bad", + }), + ).rejects.toThrow(/flowStateJson must be valid JSON/); + }); + it("requires action", async () => { ({ createLobsterTool } = await import("./lobster-tool.js")); diff --git a/extensions/lobster/src/lobster-tool.ts b/extensions/lobster/src/lobster-tool.ts index aff2d66aeb1..2b366080ff5 100644 --- a/extensions/lobster/src/lobster-tool.ts +++ b/extensions/lobster/src/lobster-tool.ts @@ -6,8 +6,150 @@ import { type LobsterRunner, type LobsterRunnerParams, } from "./lobster-runner.js"; +import { resumeManagedLobsterFlow, runManagedLobsterFlow } from "./lobster-taskflow.js"; -export function createLobsterTool(api: OpenClawPluginApi, options?: { runner?: LobsterRunner }) { +type BoundTaskFlow = ReturnType< + NonNullable["taskFlow"]["bindSession"] +>; + +type JsonLike = + | null + | boolean + | number + | string + | JsonLike[] + | { + [key: string]: JsonLike; + }; + +type LobsterToolOptions = { + runner?: LobsterRunner; + taskFlow?: BoundTaskFlow; +}; + +type ManagedFlowRunParams = { + controllerId: string; + goal: string; + currentStep?: string; + waitingStep?: string; + stateJson?: JsonLike; +}; + +type ManagedFlowResumeParams = { + flowId: string; + expectedRevision: number; + currentStep?: string; + waitingStep?: string; +}; + +function readOptionalTrimmedString(value: unknown, fieldName: string): string | undefined { + if (value === undefined) { + return undefined; + } + if (typeof value !== "string") { + throw new Error(`${fieldName} must be a string`); + } + const trimmed = value.trim(); + return trimmed ? trimmed : undefined; +} + +function readOptionalNumber(value: unknown, fieldName: string): number | undefined { + if (value === undefined) { + return undefined; + } + if (typeof value !== "number" || !Number.isInteger(value)) { + throw new Error(`${fieldName} must be an integer`); + } + return value; +} + +function parseOptionalFlowStateJson(value: unknown): JsonLike | undefined { + if (value === undefined) { + return undefined; + } + if (typeof value !== "string") { + throw new Error("flowStateJson must be a JSON string"); + } + try { + return JSON.parse(value) as JsonLike; + } catch { + throw new Error("flowStateJson must be valid JSON"); + } +} + +function parseRunFlowParams(params: Record): ManagedFlowRunParams | null { + const controllerId = readOptionalTrimmedString(params.flowControllerId, "flowControllerId"); + const goal = readOptionalTrimmedString(params.flowGoal, "flowGoal"); + const currentStep = readOptionalTrimmedString(params.flowCurrentStep, "flowCurrentStep"); + const waitingStep = readOptionalTrimmedString(params.flowWaitingStep, "flowWaitingStep"); + const stateJson = parseOptionalFlowStateJson(params.flowStateJson); + const resumeFlowId = readOptionalTrimmedString(params.flowId, "flowId"); + const resumeRevision = readOptionalNumber(params.flowExpectedRevision, "flowExpectedRevision"); + + const hasRunFields = + controllerId !== undefined || + goal !== undefined || + currentStep !== undefined || + waitingStep !== undefined || + stateJson !== undefined; + + if (!hasRunFields) { + return null; + } + if (resumeFlowId !== undefined || resumeRevision !== undefined) { + throw new Error("run action does not accept flowId or flowExpectedRevision"); + } + if (!controllerId) { + throw new Error("flowControllerId required when using managed TaskFlow run mode"); + } + if (!goal) { + throw new Error("flowGoal required when using managed TaskFlow run mode"); + } + return { + controllerId, + goal, + ...(currentStep ? { currentStep } : {}), + ...(waitingStep ? { waitingStep } : {}), + ...(stateJson !== undefined ? { stateJson } : {}), + }; +} + +function parseResumeFlowParams(params: Record): ManagedFlowResumeParams | null { + const flowId = readOptionalTrimmedString(params.flowId, "flowId"); + const expectedRevision = readOptionalNumber(params.flowExpectedRevision, "flowExpectedRevision"); + const currentStep = readOptionalTrimmedString(params.flowCurrentStep, "flowCurrentStep"); + const waitingStep = readOptionalTrimmedString(params.flowWaitingStep, "flowWaitingStep"); + const runControllerId = readOptionalTrimmedString(params.flowControllerId, "flowControllerId"); + const runGoal = readOptionalTrimmedString(params.flowGoal, "flowGoal"); + const stateJson = params.flowStateJson; + + const hasResumeFields = + flowId !== undefined || + expectedRevision !== undefined || + currentStep !== undefined || + waitingStep !== undefined; + + if (!hasResumeFields) { + return null; + } + if (runControllerId !== undefined || runGoal !== undefined || stateJson !== undefined) { + throw new Error("resume action does not accept flowControllerId, flowGoal, or flowStateJson"); + } + if (!flowId) { + throw new Error("flowId required when using managed TaskFlow resume mode"); + } + if (expectedRevision === undefined) { + throw new Error("flowExpectedRevision required when using managed TaskFlow resume mode"); + } + return { + flowId, + expectedRevision, + ...(currentStep ? { currentStep } : {}), + ...(waitingStep ? { waitingStep } : {}), + }; +} + +export function createLobsterTool(api: OpenClawPluginApi, options?: LobsterToolOptions) { const runner = options?.runner ?? createEmbeddedLobsterRunner(); return { name: "lobster", @@ -29,6 +171,13 @@ export function createLobsterTool(api: OpenClawPluginApi, options?: { runner?: L ), timeoutMs: Type.Optional(Type.Number()), maxStdoutBytes: Type.Optional(Type.Number()), + flowControllerId: Type.Optional(Type.String()), + flowGoal: Type.Optional(Type.String()), + flowStateJson: Type.Optional(Type.String()), + flowId: Type.Optional(Type.String()), + flowExpectedRevision: Type.Optional(Type.Number()), + flowCurrentStep: Type.Optional(Type.String()), + flowWaitingStep: Type.Optional(Type.String()), }), async execute(_id: string, params: Record) { const action = typeof params.action === "string" ? params.action.trim() : ""; @@ -58,11 +207,75 @@ export function createLobsterTool(api: OpenClawPluginApi, options?: { runner?: L timeoutMs, maxStdoutBytes, }; + + const taskFlow = options?.taskFlow; + if (action === "run") { + const flowParams = parseRunFlowParams(params); + if (flowParams) { + if (!taskFlow) { + throw new Error("Managed TaskFlow run mode requires a bound taskFlow runtime"); + } + const result = await runManagedLobsterFlow({ + taskFlow, + runner, + runnerParams, + controllerId: flowParams.controllerId, + goal: flowParams.goal, + ...(flowParams.stateJson !== undefined ? { stateJson: flowParams.stateJson } : {}), + ...(flowParams.currentStep ? { currentStep: flowParams.currentStep } : {}), + ...(flowParams.waitingStep ? { waitingStep: flowParams.waitingStep } : {}), + }); + if (!result.ok) { + throw result.error; + } + const details = { + ...result.envelope, + flow: result.flow, + mutation: result.mutation, + }; + return { + content: [{ type: "text", text: JSON.stringify(details, null, 2) }], + details, + }; + } + } else { + const flowParams = parseResumeFlowParams(params); + if (flowParams) { + if (!taskFlow) { + throw new Error("Managed TaskFlow resume mode requires a bound taskFlow runtime"); + } + const result = await resumeManagedLobsterFlow({ + taskFlow, + runner, + runnerParams: runnerParams as LobsterRunnerParams & { + action: "resume"; + token: string; + approve: boolean; + }, + flowId: flowParams.flowId, + expectedRevision: flowParams.expectedRevision, + ...(flowParams.currentStep ? { currentStep: flowParams.currentStep } : {}), + ...(flowParams.waitingStep ? { waitingStep: flowParams.waitingStep } : {}), + }); + if (!result.ok) { + throw result.error; + } + const details = { + ...result.envelope, + flow: result.flow, + mutation: result.mutation, + }; + return { + content: [{ type: "text", text: JSON.stringify(details, null, 2) }], + details, + }; + } + } + const envelope = await runner.run(runnerParams); if (!envelope.ok) { throw new Error(envelope.error.message); } - return { content: [{ type: "text", text: JSON.stringify(envelope, null, 2) }], details: envelope,