Lobster: add managed TaskFlow mode (#61555)

This commit is contained in:
Mariano 2026-04-06 01:37:26 +02:00 committed by GitHub
parent 7f97fa6ed5
commit 30dc24fbd8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 949 additions and 3 deletions

View File

@ -12,7 +12,11 @@ export default definePluginEntry({
if (ctx.sandboxed) {
return null;
}
return createLobsterTool(api) as AnyAgentTool;
const taskFlow =
api.runtime?.taskFlow && ctx.sessionKey
? api.runtime.taskFlow.fromToolContext(ctx)
: undefined;
return createLobsterTool(api, { taskFlow }) as AnyAgentTool;
}) as OpenClawPluginToolFactory,
{ optional: true },
);

View File

@ -0,0 +1,314 @@
import { describe, expect, it, vi } from "vitest";
import type { OpenClawPluginApi } from "../runtime-api.js";
import type { LobsterRunner } from "./lobster-runner.js";
import { resumeManagedLobsterFlow, runManagedLobsterFlow } from "./lobster-taskflow.js";
type BoundTaskFlow = ReturnType<
NonNullable<OpenClawPluginApi["runtime"]>["taskFlow"]["bindSession"]
>;
function createFakeTaskFlow(overrides?: Partial<BoundTaskFlow>) {
const baseFlow = {
flowId: "flow-1",
revision: 1,
syncMode: "managed" as const,
controllerId: "tests/lobster",
ownerKey: "agent:main:main",
status: "running" as const,
goal: "Run Lobster workflow",
};
const taskFlow: BoundTaskFlow = {
sessionKey: "agent:main:main",
createManaged: vi.fn().mockReturnValue(baseFlow),
get: vi.fn(),
list: vi.fn().mockReturnValue([]),
findLatest: vi.fn(),
resolve: vi.fn(),
getTaskSummary: vi.fn(),
setWaiting: vi.fn().mockImplementation((input) => ({
applied: true,
flow: { ...baseFlow, revision: input.expectedRevision + 1, status: "waiting" as const },
})),
resume: vi.fn().mockImplementation((input) => ({
applied: true,
flow: { ...baseFlow, revision: input.expectedRevision + 1, status: "running" as const },
})),
finish: vi.fn().mockImplementation((input) => ({
applied: true,
flow: { ...baseFlow, revision: input.expectedRevision + 1, status: "completed" as const },
})),
fail: vi.fn().mockImplementation((input) => ({
applied: true,
flow: { ...baseFlow, revision: input.expectedRevision + 1, status: "failed" as const },
})),
requestCancel: vi.fn(),
cancel: vi.fn(),
runTask: vi.fn(),
...overrides,
};
return taskFlow;
}
function createRunner(result: Awaited<ReturnType<LobsterRunner["run"]>>): LobsterRunner {
return {
run: vi.fn().mockResolvedValue(result),
};
}
describe("runManagedLobsterFlow", () => {
it("creates a flow and finishes it when Lobster succeeds", async () => {
const taskFlow = createFakeTaskFlow();
const runner = createRunner({
ok: true,
status: "ok",
output: [{ id: "result-1" }],
requiresApproval: null,
});
const result = await runManagedLobsterFlow({
taskFlow,
runner,
runnerParams: {
action: "run",
pipeline: "noop",
cwd: process.cwd(),
timeoutMs: 1000,
maxStdoutBytes: 4096,
},
controllerId: "tests/lobster",
goal: "Run Lobster workflow",
});
expect(result.ok).toBe(true);
expect(taskFlow.createManaged).toHaveBeenCalledWith({
controllerId: "tests/lobster",
goal: "Run Lobster workflow",
currentStep: "run_lobster",
});
expect(taskFlow.finish).toHaveBeenCalledWith({
flowId: "flow-1",
expectedRevision: 1,
});
});
it("moves the flow to waiting when Lobster requests approval", async () => {
const taskFlow = createFakeTaskFlow();
const createdAt = new Date("2026-04-05T21:00:00.000Z");
const runner = createRunner({
ok: true,
status: "needs_approval",
output: [],
requiresApproval: {
type: "approval_request",
prompt: "Approve this?",
items: [{ id: "item-1", createdAt, count: 2n, skip: undefined }],
resumeToken: "resume-1",
},
});
const result = await runManagedLobsterFlow({
taskFlow,
runner,
runnerParams: {
action: "run",
pipeline: "noop",
cwd: process.cwd(),
timeoutMs: 1000,
maxStdoutBytes: 4096,
},
controllerId: "tests/lobster",
goal: "Run Lobster workflow",
});
expect(result.ok).toBe(true);
expect(taskFlow.setWaiting).toHaveBeenCalledWith({
flowId: "flow-1",
expectedRevision: 1,
currentStep: "await_lobster_approval",
waitJson: {
kind: "lobster_approval",
prompt: "Approve this?",
items: [{ id: "item-1", createdAt: createdAt.toISOString(), count: "2" }],
resumeToken: "resume-1",
},
});
});
it("fails the flow when Lobster returns an error envelope", async () => {
const taskFlow = createFakeTaskFlow();
const runner = createRunner({
ok: false,
error: {
type: "runtime_error",
message: "boom",
},
});
const result = await runManagedLobsterFlow({
taskFlow,
runner,
runnerParams: {
action: "run",
pipeline: "noop",
cwd: process.cwd(),
timeoutMs: 1000,
maxStdoutBytes: 4096,
},
controllerId: "tests/lobster",
goal: "Run Lobster workflow",
});
expect(result.ok).toBe(false);
expect(result.error.message).toBe("boom");
expect(taskFlow.fail).toHaveBeenCalledWith({
flowId: "flow-1",
expectedRevision: 1,
});
});
it("fails the flow when the runner throws", async () => {
const taskFlow = createFakeTaskFlow();
const runner: LobsterRunner = {
run: vi.fn().mockRejectedValue(new Error("crashed")),
};
const result = await runManagedLobsterFlow({
taskFlow,
runner,
runnerParams: {
action: "run",
pipeline: "noop",
cwd: process.cwd(),
timeoutMs: 1000,
maxStdoutBytes: 4096,
},
controllerId: "tests/lobster",
goal: "Run Lobster workflow",
});
expect(result.ok).toBe(false);
expect(result.error.message).toBe("crashed");
expect(taskFlow.fail).toHaveBeenCalledWith({
flowId: "flow-1",
expectedRevision: 1,
});
});
});
describe("resumeManagedLobsterFlow", () => {
it("resumes the flow and finishes it on success", async () => {
const taskFlow = createFakeTaskFlow();
const runner = createRunner({
ok: true,
status: "ok",
output: [],
requiresApproval: null,
});
const result = await resumeManagedLobsterFlow({
taskFlow,
runner,
flowId: "flow-1",
expectedRevision: 4,
runnerParams: {
action: "resume",
token: "resume-1",
approve: true,
cwd: process.cwd(),
timeoutMs: 1000,
maxStdoutBytes: 4096,
},
});
expect(result.ok).toBe(true);
expect(taskFlow.resume).toHaveBeenCalledWith({
flowId: "flow-1",
expectedRevision: 4,
status: "running",
currentStep: "resume_lobster",
});
expect(taskFlow.finish).toHaveBeenCalledWith({
flowId: "flow-1",
expectedRevision: 5,
});
});
it("returns a mutation error when taskFlow resume is rejected", async () => {
const taskFlow = createFakeTaskFlow({
resume: vi.fn().mockReturnValue({
applied: false,
code: "revision_conflict",
}),
});
const runner = createRunner({
ok: true,
status: "ok",
output: [],
requiresApproval: null,
});
const result = await resumeManagedLobsterFlow({
taskFlow,
runner,
flowId: "flow-1",
expectedRevision: 4,
runnerParams: {
action: "resume",
token: "resume-1",
approve: true,
cwd: process.cwd(),
timeoutMs: 1000,
maxStdoutBytes: 4096,
},
});
expect(result.ok).toBe(false);
expect(result.error.message).toMatch(/revision_conflict/);
expect(runner.run).not.toHaveBeenCalled();
});
it("returns to waiting when the resumed Lobster run needs approval again", async () => {
const taskFlow = createFakeTaskFlow();
const runner = createRunner({
ok: true,
status: "needs_approval",
output: [],
requiresApproval: {
type: "approval_request",
prompt: "Approve this too?",
items: [{ id: "item-2" }],
resumeToken: "resume-2",
},
});
const result = await resumeManagedLobsterFlow({
taskFlow,
runner,
flowId: "flow-1",
expectedRevision: 4,
runnerParams: {
action: "resume",
token: "resume-1",
approve: true,
cwd: process.cwd(),
timeoutMs: 1000,
maxStdoutBytes: 4096,
},
});
expect(result.ok).toBe(true);
expect(taskFlow.setWaiting).toHaveBeenCalledWith({
flowId: "flow-1",
expectedRevision: 5,
currentStep: "await_lobster_approval",
waitJson: {
kind: "lobster_approval",
prompt: "Approve this too?",
items: [{ id: "item-2" }],
resumeToken: "resume-2",
},
});
});
});

View File

@ -0,0 +1,275 @@
import type { OpenClawPluginApi } from "../runtime-api.js";
import type { LobsterEnvelope, LobsterRunner, LobsterRunnerParams } from "./lobster-runner.js";
type JsonLike =
| null
| boolean
| number
| string
| JsonLike[]
| {
[key: string]: JsonLike;
};
type BoundTaskFlow = ReturnType<
NonNullable<OpenClawPluginApi["runtime"]>["taskFlow"]["bindSession"]
>;
type FlowRecord = ReturnType<BoundTaskFlow["createManaged"]>;
type MutationResult = ReturnType<BoundTaskFlow["setWaiting"]>;
export type LobsterApprovalWaitState = {
kind: "lobster_approval";
prompt: string;
items: JsonLike[];
resumeToken?: string;
};
export type RunManagedLobsterFlowParams = {
taskFlow: BoundTaskFlow;
runner: LobsterRunner;
runnerParams: LobsterRunnerParams;
controllerId: string;
goal: string;
stateJson?: JsonLike;
currentStep?: string;
waitingStep?: string;
};
export type ResumeManagedLobsterFlowParams = {
taskFlow: BoundTaskFlow;
runner: LobsterRunner;
runnerParams: LobsterRunnerParams & {
action: "resume";
token: string;
approve: boolean;
};
flowId: string;
expectedRevision: number;
currentStep?: string;
waitingStep?: string;
};
export type ManagedLobsterFlowResult =
| {
ok: true;
envelope: LobsterEnvelope;
flow: FlowRecord;
mutation: MutationResult;
}
| {
ok: false;
flow?: FlowRecord;
mutation?: MutationResult;
error: Error;
};
function toJsonLike(value: unknown, seen = new WeakSet<object>()): JsonLike {
if (value === null) {
return null;
}
switch (typeof value) {
case "boolean":
case "string":
return value;
case "number":
return Number.isFinite(value) ? value : String(value);
case "bigint":
return value.toString();
case "undefined":
case "function":
case "symbol":
return null;
case "object": {
if (value instanceof Date) {
return value.toISOString();
}
if (Array.isArray(value)) {
return value.map((item) => toJsonLike(item, seen));
}
if (seen.has(value)) {
return "[Circular]";
}
seen.add(value);
const jsonObject: Record<string, JsonLike> = {};
for (const [key, entry] of Object.entries(value)) {
if (entry === undefined || typeof entry === "function" || typeof entry === "symbol") {
continue;
}
jsonObject[key] = toJsonLike(entry, seen);
}
seen.delete(value);
return jsonObject;
}
}
}
function buildApprovalWaitState(envelope: Extract<LobsterEnvelope, { ok: true }>): JsonLike {
if (!envelope.requiresApproval) {
return {
kind: "lobster_approval",
prompt: "",
items: [],
} satisfies LobsterApprovalWaitState;
}
return {
kind: "lobster_approval",
prompt: envelope.requiresApproval.prompt,
items: envelope.requiresApproval.items.map((item) => toJsonLike(item)),
...(envelope.requiresApproval.resumeToken
? { resumeToken: envelope.requiresApproval.resumeToken }
: {}),
} satisfies LobsterApprovalWaitState;
}
function applyEnvelopeToFlow(params: {
taskFlow: BoundTaskFlow;
flow: FlowRecord;
envelope: LobsterEnvelope;
waitingStep: string;
}): MutationResult {
const { taskFlow, flow, envelope, waitingStep } = params;
if (!envelope.ok) {
return taskFlow.fail({
flowId: flow.flowId,
expectedRevision: flow.revision,
});
}
if (envelope.status === "needs_approval") {
return taskFlow.setWaiting({
flowId: flow.flowId,
expectedRevision: flow.revision,
currentStep: waitingStep,
waitJson: buildApprovalWaitState(envelope),
});
}
return taskFlow.finish({
flowId: flow.flowId,
expectedRevision: flow.revision,
});
}
function buildEnvelopeError(envelope: Extract<LobsterEnvelope, { ok: false }>) {
return new Error(envelope.error.message);
}
export async function runManagedLobsterFlow(
params: RunManagedLobsterFlowParams,
): Promise<ManagedLobsterFlowResult> {
const flow = params.taskFlow.createManaged({
controllerId: params.controllerId,
goal: params.goal,
currentStep: params.currentStep ?? "run_lobster",
...(params.stateJson !== undefined ? { stateJson: params.stateJson } : {}),
});
try {
const envelope = await params.runner.run(params.runnerParams);
const mutation = applyEnvelopeToFlow({
taskFlow: params.taskFlow,
flow,
envelope,
waitingStep: params.waitingStep ?? "await_lobster_approval",
});
if (!envelope.ok) {
return {
ok: false,
flow,
mutation,
error: buildEnvelopeError(envelope),
};
}
return {
ok: true,
envelope,
flow,
mutation,
};
} catch (error) {
const err = error instanceof Error ? error : new Error(String(error));
try {
const mutation = params.taskFlow.fail({
flowId: flow.flowId,
expectedRevision: flow.revision,
});
return {
ok: false,
flow,
mutation,
error: err,
};
} catch {
return {
ok: false,
flow,
error: err,
};
}
}
}
export async function resumeManagedLobsterFlow(
params: ResumeManagedLobsterFlowParams,
): Promise<ManagedLobsterFlowResult> {
const resumed = params.taskFlow.resume({
flowId: params.flowId,
expectedRevision: params.expectedRevision,
status: "running",
currentStep: params.currentStep ?? "resume_lobster",
});
if (!resumed.applied) {
return {
ok: false,
mutation: resumed,
error: new Error(`TaskFlow resume failed: ${resumed.code}`),
};
}
try {
const envelope = await params.runner.run(params.runnerParams);
const mutation = applyEnvelopeToFlow({
taskFlow: params.taskFlow,
flow: resumed.flow,
envelope,
waitingStep: params.waitingStep ?? "await_lobster_approval",
});
if (!envelope.ok) {
return {
ok: false,
flow: resumed.flow,
mutation,
error: buildEnvelopeError(envelope),
};
}
return {
ok: true,
envelope,
flow: resumed.flow,
mutation,
};
} catch (error) {
const err = error instanceof Error ? error : new Error(String(error));
try {
const mutation = params.taskFlow.fail({
flowId: params.flowId,
expectedRevision: resumed.flow.revision,
});
return {
ok: false,
flow: resumed.flow,
mutation,
error: err,
};
} catch {
return {
ok: false,
flow: resumed.flow,
error: err,
};
}
}
}

View File

@ -4,6 +4,10 @@ import type { OpenClawPluginApi, OpenClawPluginToolContext } from "../runtime-ap
let createLobsterTool: typeof import("./lobster-tool.js").createLobsterTool;
type BoundTaskFlow = ReturnType<
NonNullable<OpenClawPluginApi["runtime"]>["taskFlow"]["bindSession"]
>;
function fakeApi(overrides: Partial<OpenClawPluginApi> = {}): OpenClawPluginApi {
return createTestPluginApi({
id: "lobster",
@ -30,6 +34,48 @@ function fakeCtx(overrides: Partial<OpenClawPluginToolContext> = {}): OpenClawPl
};
}
function createFakeTaskFlow(overrides?: Partial<BoundTaskFlow>): BoundTaskFlow {
const baseFlow = {
flowId: "flow-1",
revision: 1,
syncMode: "managed" as const,
controllerId: "tests/lobster",
ownerKey: "agent:main:main",
status: "running" as const,
goal: "Run Lobster workflow",
};
return {
sessionKey: "agent:main:main",
createManaged: vi.fn().mockReturnValue(baseFlow),
get: vi.fn(),
list: vi.fn().mockReturnValue([]),
findLatest: vi.fn(),
resolve: vi.fn(),
getTaskSummary: vi.fn(),
setWaiting: vi.fn().mockImplementation((input) => ({
applied: true,
flow: { ...baseFlow, revision: input.expectedRevision + 1, status: "waiting" as const },
})),
resume: vi.fn().mockImplementation((input) => ({
applied: true,
flow: { ...baseFlow, revision: input.expectedRevision + 1, status: "running" as const },
})),
finish: vi.fn().mockImplementation((input) => ({
applied: true,
flow: { ...baseFlow, revision: input.expectedRevision + 1, status: "completed" as const },
})),
fail: vi.fn().mockImplementation((input) => ({
applied: true,
flow: { ...baseFlow, revision: input.expectedRevision + 1, status: "failed" as const },
})),
requestCancel: vi.fn(),
cancel: vi.fn(),
runTask: vi.fn(),
...overrides,
};
}
describe("lobster plugin tool", () => {
it("returns the Lobster envelope in details", async () => {
({ createLobsterTool } = await import("./lobster-tool.js"));
@ -133,6 +179,100 @@ describe("lobster plugin tool", () => {
).rejects.toThrow("boom");
});
it("can run through managed TaskFlow mode", async () => {
({ createLobsterTool } = await import("./lobster-tool.js"));
const runner = {
run: vi.fn().mockResolvedValue({
ok: true,
status: "needs_approval",
output: [],
requiresApproval: {
type: "approval_request",
prompt: "Approve this?",
items: [{ id: "item-1" }],
resumeToken: "resume-1",
},
}),
};
const taskFlow = createFakeTaskFlow();
const tool = createLobsterTool(fakeApi(), { runner, taskFlow });
const res = await tool.execute("call-managed-run", {
action: "run",
pipeline: "noop",
flowControllerId: "tests/lobster",
flowGoal: "Run Lobster workflow",
flowStateJson: '{"lane":"email"}',
flowCurrentStep: "run_lobster",
flowWaitingStep: "await_review",
});
expect(taskFlow.createManaged).toHaveBeenCalledWith({
controllerId: "tests/lobster",
goal: "Run Lobster workflow",
currentStep: "run_lobster",
stateJson: { lane: "email" },
});
expect(taskFlow.setWaiting).toHaveBeenCalledWith({
flowId: "flow-1",
expectedRevision: 1,
currentStep: "await_review",
waitJson: {
kind: "lobster_approval",
prompt: "Approve this?",
items: [{ id: "item-1" }],
resumeToken: "resume-1",
},
});
expect(res.details).toMatchObject({
ok: true,
status: "needs_approval",
flow: {
flowId: "flow-1",
},
mutation: {
applied: true,
},
});
});
it("rejects managed TaskFlow params when no bound taskFlow runtime is available", async () => {
({ createLobsterTool } = await import("./lobster-tool.js"));
const tool = createLobsterTool(fakeApi(), {
runner: { run: vi.fn() },
});
await expect(
tool.execute("call-missing-taskflow", {
action: "run",
pipeline: "noop",
flowControllerId: "tests/lobster",
flowGoal: "Run Lobster workflow",
}),
).rejects.toThrow(/Managed TaskFlow run mode requires a bound taskFlow runtime/);
});
it("rejects invalid flowStateJson in managed TaskFlow mode", async () => {
({ createLobsterTool } = await import("./lobster-tool.js"));
const tool = createLobsterTool(fakeApi(), {
runner: { run: vi.fn() },
taskFlow: createFakeTaskFlow(),
});
await expect(
tool.execute("call-invalid-flow-json", {
action: "run",
pipeline: "noop",
flowControllerId: "tests/lobster",
flowGoal: "Run Lobster workflow",
flowStateJson: "{bad",
}),
).rejects.toThrow(/flowStateJson must be valid JSON/);
});
it("requires action", async () => {
({ createLobsterTool } = await import("./lobster-tool.js"));

View File

@ -6,8 +6,150 @@ import {
type LobsterRunner,
type LobsterRunnerParams,
} from "./lobster-runner.js";
import { resumeManagedLobsterFlow, runManagedLobsterFlow } from "./lobster-taskflow.js";
export function createLobsterTool(api: OpenClawPluginApi, options?: { runner?: LobsterRunner }) {
type BoundTaskFlow = ReturnType<
NonNullable<OpenClawPluginApi["runtime"]>["taskFlow"]["bindSession"]
>;
type JsonLike =
| null
| boolean
| number
| string
| JsonLike[]
| {
[key: string]: JsonLike;
};
type LobsterToolOptions = {
runner?: LobsterRunner;
taskFlow?: BoundTaskFlow;
};
type ManagedFlowRunParams = {
controllerId: string;
goal: string;
currentStep?: string;
waitingStep?: string;
stateJson?: JsonLike;
};
type ManagedFlowResumeParams = {
flowId: string;
expectedRevision: number;
currentStep?: string;
waitingStep?: string;
};
function readOptionalTrimmedString(value: unknown, fieldName: string): string | undefined {
if (value === undefined) {
return undefined;
}
if (typeof value !== "string") {
throw new Error(`${fieldName} must be a string`);
}
const trimmed = value.trim();
return trimmed ? trimmed : undefined;
}
function readOptionalNumber(value: unknown, fieldName: string): number | undefined {
if (value === undefined) {
return undefined;
}
if (typeof value !== "number" || !Number.isInteger(value)) {
throw new Error(`${fieldName} must be an integer`);
}
return value;
}
function parseOptionalFlowStateJson(value: unknown): JsonLike | undefined {
if (value === undefined) {
return undefined;
}
if (typeof value !== "string") {
throw new Error("flowStateJson must be a JSON string");
}
try {
return JSON.parse(value) as JsonLike;
} catch {
throw new Error("flowStateJson must be valid JSON");
}
}
function parseRunFlowParams(params: Record<string, unknown>): ManagedFlowRunParams | null {
const controllerId = readOptionalTrimmedString(params.flowControllerId, "flowControllerId");
const goal = readOptionalTrimmedString(params.flowGoal, "flowGoal");
const currentStep = readOptionalTrimmedString(params.flowCurrentStep, "flowCurrentStep");
const waitingStep = readOptionalTrimmedString(params.flowWaitingStep, "flowWaitingStep");
const stateJson = parseOptionalFlowStateJson(params.flowStateJson);
const resumeFlowId = readOptionalTrimmedString(params.flowId, "flowId");
const resumeRevision = readOptionalNumber(params.flowExpectedRevision, "flowExpectedRevision");
const hasRunFields =
controllerId !== undefined ||
goal !== undefined ||
currentStep !== undefined ||
waitingStep !== undefined ||
stateJson !== undefined;
if (!hasRunFields) {
return null;
}
if (resumeFlowId !== undefined || resumeRevision !== undefined) {
throw new Error("run action does not accept flowId or flowExpectedRevision");
}
if (!controllerId) {
throw new Error("flowControllerId required when using managed TaskFlow run mode");
}
if (!goal) {
throw new Error("flowGoal required when using managed TaskFlow run mode");
}
return {
controllerId,
goal,
...(currentStep ? { currentStep } : {}),
...(waitingStep ? { waitingStep } : {}),
...(stateJson !== undefined ? { stateJson } : {}),
};
}
function parseResumeFlowParams(params: Record<string, unknown>): ManagedFlowResumeParams | null {
const flowId = readOptionalTrimmedString(params.flowId, "flowId");
const expectedRevision = readOptionalNumber(params.flowExpectedRevision, "flowExpectedRevision");
const currentStep = readOptionalTrimmedString(params.flowCurrentStep, "flowCurrentStep");
const waitingStep = readOptionalTrimmedString(params.flowWaitingStep, "flowWaitingStep");
const runControllerId = readOptionalTrimmedString(params.flowControllerId, "flowControllerId");
const runGoal = readOptionalTrimmedString(params.flowGoal, "flowGoal");
const stateJson = params.flowStateJson;
const hasResumeFields =
flowId !== undefined ||
expectedRevision !== undefined ||
currentStep !== undefined ||
waitingStep !== undefined;
if (!hasResumeFields) {
return null;
}
if (runControllerId !== undefined || runGoal !== undefined || stateJson !== undefined) {
throw new Error("resume action does not accept flowControllerId, flowGoal, or flowStateJson");
}
if (!flowId) {
throw new Error("flowId required when using managed TaskFlow resume mode");
}
if (expectedRevision === undefined) {
throw new Error("flowExpectedRevision required when using managed TaskFlow resume mode");
}
return {
flowId,
expectedRevision,
...(currentStep ? { currentStep } : {}),
...(waitingStep ? { waitingStep } : {}),
};
}
export function createLobsterTool(api: OpenClawPluginApi, options?: LobsterToolOptions) {
const runner = options?.runner ?? createEmbeddedLobsterRunner();
return {
name: "lobster",
@ -29,6 +171,13 @@ export function createLobsterTool(api: OpenClawPluginApi, options?: { runner?: L
),
timeoutMs: Type.Optional(Type.Number()),
maxStdoutBytes: Type.Optional(Type.Number()),
flowControllerId: Type.Optional(Type.String()),
flowGoal: Type.Optional(Type.String()),
flowStateJson: Type.Optional(Type.String()),
flowId: Type.Optional(Type.String()),
flowExpectedRevision: Type.Optional(Type.Number()),
flowCurrentStep: Type.Optional(Type.String()),
flowWaitingStep: Type.Optional(Type.String()),
}),
async execute(_id: string, params: Record<string, unknown>) {
const action = typeof params.action === "string" ? params.action.trim() : "";
@ -58,11 +207,75 @@ export function createLobsterTool(api: OpenClawPluginApi, options?: { runner?: L
timeoutMs,
maxStdoutBytes,
};
const taskFlow = options?.taskFlow;
if (action === "run") {
const flowParams = parseRunFlowParams(params);
if (flowParams) {
if (!taskFlow) {
throw new Error("Managed TaskFlow run mode requires a bound taskFlow runtime");
}
const result = await runManagedLobsterFlow({
taskFlow,
runner,
runnerParams,
controllerId: flowParams.controllerId,
goal: flowParams.goal,
...(flowParams.stateJson !== undefined ? { stateJson: flowParams.stateJson } : {}),
...(flowParams.currentStep ? { currentStep: flowParams.currentStep } : {}),
...(flowParams.waitingStep ? { waitingStep: flowParams.waitingStep } : {}),
});
if (!result.ok) {
throw result.error;
}
const details = {
...result.envelope,
flow: result.flow,
mutation: result.mutation,
};
return {
content: [{ type: "text", text: JSON.stringify(details, null, 2) }],
details,
};
}
} else {
const flowParams = parseResumeFlowParams(params);
if (flowParams) {
if (!taskFlow) {
throw new Error("Managed TaskFlow resume mode requires a bound taskFlow runtime");
}
const result = await resumeManagedLobsterFlow({
taskFlow,
runner,
runnerParams: runnerParams as LobsterRunnerParams & {
action: "resume";
token: string;
approve: boolean;
},
flowId: flowParams.flowId,
expectedRevision: flowParams.expectedRevision,
...(flowParams.currentStep ? { currentStep: flowParams.currentStep } : {}),
...(flowParams.waitingStep ? { waitingStep: flowParams.waitingStep } : {}),
});
if (!result.ok) {
throw result.error;
}
const details = {
...result.envelope,
flow: result.flow,
mutation: result.mutation,
};
return {
content: [{ type: "text", text: JSON.stringify(details, null, 2) }],
details,
};
}
}
const envelope = await runner.run(runnerParams);
if (!envelope.ok) {
throw new Error(envelope.error.message);
}
return {
content: [{ type: "text", text: JSON.stringify(envelope, null, 2) }],
details: envelope,