mirror of https://github.com/openclaw/openclaw.git
Lobster: harden embedded runtime integration (#61566)
Merged via squash.
Prepared head SHA: a6f48309fd
Co-authored-by: mbelinky <132747814+mbelinky@users.noreply.github.com>
Reviewed-by: @mbelinky
This commit is contained in:
parent
e8f0f91d29
commit
b167df78aa
|
|
@ -46,6 +46,8 @@ Docs: https://docs.openclaw.ai
|
|||
- Docs/IRC: replace public IRC hostname examples with `irc.example.com` and recommend private servers for bot coordination while listing common public networks for intentional use.
|
||||
- Memory/dreaming: group nearby daily-note lines into short coherent chunks before staging them for dreaming, so one-off context from recent notes reaches REM/deep with better evidence and less line-level noise.
|
||||
- Memory/dreaming: drop generic date/day headings from daily-note chunk prefixes while keeping meaningful section labels, so staged snippets stay cleaner and more reusable. (#61597) Thanks @mbelinky.
|
||||
- Plugins/Lobster: run bundled Lobster workflows in process instead of spawning the external CLI, reducing transport overhead and unblocking native runtime integration. (#61523) Thanks @mbelinky.
|
||||
- Plugins/Lobster: harden managed resume validation so invalid TaskFlow resume calls fail earlier, and memoize embedded runtime loading per runner while keeping failed loads retryable. (#61566) Thanks @mbelinky.
|
||||
|
||||
### Fixes
|
||||
|
||||
|
|
|
|||
|
|
@ -205,6 +205,46 @@ describe("createEmbeddedLobsterRunner", () => {
|
|||
});
|
||||
});
|
||||
|
||||
it("loads the embedded runtime once per runner", async () => {
|
||||
const runtime = {
|
||||
runToolRequest: vi.fn().mockResolvedValue({
|
||||
ok: true,
|
||||
protocolVersion: 1,
|
||||
status: "ok",
|
||||
output: [],
|
||||
requiresApproval: null,
|
||||
}),
|
||||
resumeToolRequest: vi.fn().mockResolvedValue({
|
||||
ok: true,
|
||||
protocolVersion: 1,
|
||||
status: "cancelled",
|
||||
output: [],
|
||||
requiresApproval: null,
|
||||
}),
|
||||
};
|
||||
const loadRuntime = vi.fn().mockResolvedValue(runtime);
|
||||
|
||||
const runner = createEmbeddedLobsterRunner({ loadRuntime });
|
||||
|
||||
await runner.run({
|
||||
action: "run",
|
||||
pipeline: "exec --json=true echo hi",
|
||||
cwd: process.cwd(),
|
||||
timeoutMs: 2000,
|
||||
maxStdoutBytes: 4096,
|
||||
});
|
||||
await runner.run({
|
||||
action: "resume",
|
||||
token: "resume-token",
|
||||
approve: false,
|
||||
cwd: process.cwd(),
|
||||
timeoutMs: 2000,
|
||||
maxStdoutBytes: 4096,
|
||||
});
|
||||
|
||||
expect(loadRuntime).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("requires a pipeline for run", async () => {
|
||||
const runner = createEmbeddedLobsterRunner({
|
||||
loadRuntime: vi.fn().mockResolvedValue({
|
||||
|
|
|
|||
|
|
@ -1,4 +1,5 @@
|
|||
import { randomUUID } from "node:crypto";
|
||||
import { existsSync } from "node:fs";
|
||||
import { createRequire } from "node:module";
|
||||
import path from "node:path";
|
||||
import { Readable, Writable } from "node:stream";
|
||||
|
|
@ -597,7 +598,19 @@ async function importInstalledLobsterModule<T>(
|
|||
function resolveInstalledLobsterRoot() {
|
||||
const require = createRequire(import.meta.url);
|
||||
const sdkEntry = require.resolve("@clawdbot/lobster");
|
||||
return path.resolve(path.dirname(sdkEntry), "../../..");
|
||||
let currentDir = path.dirname(sdkEntry);
|
||||
|
||||
while (true) {
|
||||
const packageJsonPath = path.join(currentDir, "package.json");
|
||||
if (existsSync(packageJsonPath)) {
|
||||
return currentDir;
|
||||
}
|
||||
const parentDir = path.dirname(currentDir);
|
||||
if (parentDir === currentDir) {
|
||||
throw new Error("Unable to resolve the installed @clawdbot/lobster package root");
|
||||
}
|
||||
currentDir = parentDir;
|
||||
}
|
||||
}
|
||||
|
||||
async function loadEmbeddedToolRuntimeFromPackage(): Promise<EmbeddedToolRuntime> {
|
||||
|
|
@ -672,9 +685,19 @@ export function createEmbeddedLobsterRunner(options?: {
|
|||
loadRuntime?: LoadEmbeddedToolRuntime;
|
||||
}): LobsterRunner {
|
||||
const loadRuntime = options?.loadRuntime ?? loadEmbeddedToolRuntimeFromPackage;
|
||||
let runtimePromise: Promise<EmbeddedToolRuntime> | undefined;
|
||||
|
||||
const getRuntime = () => {
|
||||
runtimePromise ??= loadRuntime().catch((error) => {
|
||||
runtimePromise = undefined;
|
||||
throw error;
|
||||
});
|
||||
return runtimePromise;
|
||||
};
|
||||
|
||||
return {
|
||||
async run(params) {
|
||||
const runtime = await loadRuntime();
|
||||
const runtime = await getRuntime();
|
||||
return await withTimeout(params.timeoutMs, async (signal) => {
|
||||
const ctx = createEmbeddedToolContext(params, signal);
|
||||
|
||||
|
|
|
|||
|
|
@ -1,56 +1,17 @@
|
|||
import { describe, expect, it, vi } from "vitest";
|
||||
import type { OpenClawPluginApi } from "../runtime-api.js";
|
||||
import type { LobsterRunner } from "./lobster-runner.js";
|
||||
import { resumeManagedLobsterFlow, runManagedLobsterFlow } from "./lobster-taskflow.js";
|
||||
import { createFakeTaskFlow } from "./taskflow-test-helpers.js";
|
||||
|
||||
type BoundTaskFlow = ReturnType<
|
||||
NonNullable<OpenClawPluginApi["runtime"]>["taskFlow"]["bindSession"]
|
||||
>;
|
||||
|
||||
function createFakeTaskFlow(overrides?: Partial<BoundTaskFlow>) {
|
||||
const baseFlow = {
|
||||
flowId: "flow-1",
|
||||
revision: 1,
|
||||
syncMode: "managed" as const,
|
||||
controllerId: "tests/lobster",
|
||||
ownerKey: "agent:main:main",
|
||||
status: "running" as const,
|
||||
goal: "Run Lobster workflow",
|
||||
};
|
||||
|
||||
const taskFlow: BoundTaskFlow = {
|
||||
sessionKey: "agent:main:main",
|
||||
createManaged: vi.fn().mockReturnValue(baseFlow),
|
||||
get: vi.fn(),
|
||||
list: vi.fn().mockReturnValue([]),
|
||||
findLatest: vi.fn(),
|
||||
resolve: vi.fn(),
|
||||
getTaskSummary: vi.fn(),
|
||||
setWaiting: vi.fn().mockImplementation((input) => ({
|
||||
applied: true,
|
||||
flow: { ...baseFlow, revision: input.expectedRevision + 1, status: "waiting" as const },
|
||||
})),
|
||||
resume: vi.fn().mockImplementation((input) => ({
|
||||
applied: true,
|
||||
flow: { ...baseFlow, revision: input.expectedRevision + 1, status: "running" as const },
|
||||
})),
|
||||
finish: vi.fn().mockImplementation((input) => ({
|
||||
applied: true,
|
||||
flow: { ...baseFlow, revision: input.expectedRevision + 1, status: "completed" as const },
|
||||
})),
|
||||
fail: vi.fn().mockImplementation((input) => ({
|
||||
applied: true,
|
||||
flow: { ...baseFlow, revision: input.expectedRevision + 1, status: "failed" as const },
|
||||
})),
|
||||
requestCancel: vi.fn(),
|
||||
cancel: vi.fn(),
|
||||
runTask: vi.fn(),
|
||||
...overrides,
|
||||
};
|
||||
|
||||
return taskFlow;
|
||||
function expectManagedFlowFailure(
|
||||
result: Awaited<ReturnType<typeof runManagedLobsterFlow | typeof resumeManagedLobsterFlow>>,
|
||||
) {
|
||||
expect(result.ok).toBe(false);
|
||||
if (result.ok) {
|
||||
throw new Error("Expected managed Lobster flow to fail");
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
function createRunner(result: Awaited<ReturnType<LobsterRunner["run"]>>): LobsterRunner {
|
||||
return {
|
||||
run: vi.fn().mockResolvedValue(result),
|
||||
|
|
|
|||
|
|
@ -1,13 +1,10 @@
|
|||
import { describe, expect, it, vi } from "vitest";
|
||||
import { createTestPluginApi } from "../../../test/helpers/plugins/plugin-api.js";
|
||||
import type { OpenClawPluginApi, OpenClawPluginToolContext } from "../runtime-api.js";
|
||||
import { createFakeTaskFlow } from "./taskflow-test-helpers.js";
|
||||
|
||||
let createLobsterTool: typeof import("./lobster-tool.js").createLobsterTool;
|
||||
|
||||
type BoundTaskFlow = ReturnType<
|
||||
NonNullable<OpenClawPluginApi["runtime"]>["taskFlow"]["bindSession"]
|
||||
>;
|
||||
|
||||
function fakeApi(overrides: Partial<OpenClawPluginApi> = {}): OpenClawPluginApi {
|
||||
return createTestPluginApi({
|
||||
id: "lobster",
|
||||
|
|
@ -34,48 +31,6 @@ function fakeCtx(overrides: Partial<OpenClawPluginToolContext> = {}): OpenClawPl
|
|||
};
|
||||
}
|
||||
|
||||
function createFakeTaskFlow(overrides?: Partial<BoundTaskFlow>): BoundTaskFlow {
|
||||
const baseFlow = {
|
||||
flowId: "flow-1",
|
||||
revision: 1,
|
||||
syncMode: "managed" as const,
|
||||
controllerId: "tests/lobster",
|
||||
ownerKey: "agent:main:main",
|
||||
status: "running" as const,
|
||||
goal: "Run Lobster workflow",
|
||||
};
|
||||
|
||||
return {
|
||||
sessionKey: "agent:main:main",
|
||||
createManaged: vi.fn().mockReturnValue(baseFlow),
|
||||
get: vi.fn(),
|
||||
list: vi.fn().mockReturnValue([]),
|
||||
findLatest: vi.fn(),
|
||||
resolve: vi.fn(),
|
||||
getTaskSummary: vi.fn(),
|
||||
setWaiting: vi.fn().mockImplementation((input) => ({
|
||||
applied: true,
|
||||
flow: { ...baseFlow, revision: input.expectedRevision + 1, status: "waiting" as const },
|
||||
})),
|
||||
resume: vi.fn().mockImplementation((input) => ({
|
||||
applied: true,
|
||||
flow: { ...baseFlow, revision: input.expectedRevision + 1, status: "running" as const },
|
||||
})),
|
||||
finish: vi.fn().mockImplementation((input) => ({
|
||||
applied: true,
|
||||
flow: { ...baseFlow, revision: input.expectedRevision + 1, status: "completed" as const },
|
||||
})),
|
||||
fail: vi.fn().mockImplementation((input) => ({
|
||||
applied: true,
|
||||
flow: { ...baseFlow, revision: input.expectedRevision + 1, status: "failed" as const },
|
||||
})),
|
||||
requestCancel: vi.fn(),
|
||||
cancel: vi.fn(),
|
||||
runTask: vi.fn(),
|
||||
...overrides,
|
||||
};
|
||||
}
|
||||
|
||||
describe("lobster plugin tool", () => {
|
||||
it("returns the Lobster envelope in details", async () => {
|
||||
({ createLobsterTool } = await import("./lobster-tool.js"));
|
||||
|
|
@ -273,6 +228,42 @@ describe("lobster plugin tool", () => {
|
|||
).rejects.toThrow(/flowStateJson must be valid JSON/);
|
||||
});
|
||||
|
||||
it("rejects managed TaskFlow resume mode without a token", async () => {
|
||||
({ createLobsterTool } = await import("./lobster-tool.js"));
|
||||
|
||||
const tool = createLobsterTool(fakeApi(), {
|
||||
runner: { run: vi.fn() },
|
||||
taskFlow: createFakeTaskFlow(),
|
||||
});
|
||||
|
||||
await expect(
|
||||
tool.execute("call-missing-resume-token", {
|
||||
action: "resume",
|
||||
flowId: "flow-1",
|
||||
flowExpectedRevision: 1,
|
||||
approve: true,
|
||||
}),
|
||||
).rejects.toThrow(/token required when using managed TaskFlow resume mode/);
|
||||
});
|
||||
|
||||
it("rejects managed TaskFlow resume mode without approve", async () => {
|
||||
({ createLobsterTool } = await import("./lobster-tool.js"));
|
||||
|
||||
const tool = createLobsterTool(fakeApi(), {
|
||||
runner: { run: vi.fn() },
|
||||
taskFlow: createFakeTaskFlow(),
|
||||
});
|
||||
|
||||
await expect(
|
||||
tool.execute("call-missing-resume-approve", {
|
||||
action: "resume",
|
||||
token: "resume-token",
|
||||
flowId: "flow-1",
|
||||
flowExpectedRevision: 1,
|
||||
}),
|
||||
).rejects.toThrow(/approve required when using managed TaskFlow resume mode/);
|
||||
});
|
||||
|
||||
it("requires action", async () => {
|
||||
({ createLobsterTool } = await import("./lobster-tool.js"));
|
||||
|
||||
|
|
|
|||
|
|
@ -63,6 +63,16 @@ function readOptionalNumber(value: unknown, fieldName: string): number | undefin
|
|||
return value;
|
||||
}
|
||||
|
||||
function readOptionalBoolean(value: unknown, fieldName: string): boolean | undefined {
|
||||
if (value === undefined) {
|
||||
return undefined;
|
||||
}
|
||||
if (typeof value !== "boolean") {
|
||||
throw new Error(`${fieldName} must be a boolean`);
|
||||
}
|
||||
return value;
|
||||
}
|
||||
|
||||
function parseOptionalFlowStateJson(value: unknown): JsonLike | undefined {
|
||||
if (value === undefined) {
|
||||
return undefined;
|
||||
|
|
@ -119,6 +129,8 @@ function parseResumeFlowParams(params: Record<string, unknown>): ManagedFlowResu
|
|||
const expectedRevision = readOptionalNumber(params.flowExpectedRevision, "flowExpectedRevision");
|
||||
const currentStep = readOptionalTrimmedString(params.flowCurrentStep, "flowCurrentStep");
|
||||
const waitingStep = readOptionalTrimmedString(params.flowWaitingStep, "flowWaitingStep");
|
||||
const token = readOptionalTrimmedString(params.token, "token");
|
||||
const approve = readOptionalBoolean(params.approve, "approve");
|
||||
const runControllerId = readOptionalTrimmedString(params.flowControllerId, "flowControllerId");
|
||||
const runGoal = readOptionalTrimmedString(params.flowGoal, "flowGoal");
|
||||
const stateJson = params.flowStateJson;
|
||||
|
|
@ -141,6 +153,12 @@ function parseResumeFlowParams(params: Record<string, unknown>): ManagedFlowResu
|
|||
if (expectedRevision === undefined) {
|
||||
throw new Error("flowExpectedRevision required when using managed TaskFlow resume mode");
|
||||
}
|
||||
if (!token) {
|
||||
throw new Error("token required when using managed TaskFlow resume mode");
|
||||
}
|
||||
if (approve === undefined) {
|
||||
throw new Error("approve required when using managed TaskFlow resume mode");
|
||||
}
|
||||
return {
|
||||
flowId,
|
||||
expectedRevision,
|
||||
|
|
|
|||
|
|
@ -0,0 +1,48 @@
|
|||
import { vi } from "vitest";
|
||||
import type { OpenClawPluginApi } from "../runtime-api.js";
|
||||
|
||||
export type BoundTaskFlow = ReturnType<
|
||||
NonNullable<OpenClawPluginApi["runtime"]>["taskFlow"]["bindSession"]
|
||||
>;
|
||||
|
||||
export function createFakeTaskFlow(overrides?: Partial<BoundTaskFlow>): BoundTaskFlow {
|
||||
const baseFlow = {
|
||||
flowId: "flow-1",
|
||||
revision: 1,
|
||||
syncMode: "managed" as const,
|
||||
controllerId: "tests/lobster",
|
||||
ownerKey: "agent:main:main",
|
||||
status: "running" as const,
|
||||
goal: "Run Lobster workflow",
|
||||
};
|
||||
|
||||
return {
|
||||
sessionKey: "agent:main:main",
|
||||
createManaged: vi.fn().mockReturnValue(baseFlow),
|
||||
get: vi.fn(),
|
||||
list: vi.fn().mockReturnValue([]),
|
||||
findLatest: vi.fn(),
|
||||
resolve: vi.fn(),
|
||||
getTaskSummary: vi.fn(),
|
||||
setWaiting: vi.fn().mockImplementation((input) => ({
|
||||
applied: true,
|
||||
flow: { ...baseFlow, revision: input.expectedRevision + 1, status: "waiting" as const },
|
||||
})),
|
||||
resume: vi.fn().mockImplementation((input) => ({
|
||||
applied: true,
|
||||
flow: { ...baseFlow, revision: input.expectedRevision + 1, status: "running" as const },
|
||||
})),
|
||||
finish: vi.fn().mockImplementation((input) => ({
|
||||
applied: true,
|
||||
flow: { ...baseFlow, revision: input.expectedRevision + 1, status: "completed" as const },
|
||||
})),
|
||||
fail: vi.fn().mockImplementation((input) => ({
|
||||
applied: true,
|
||||
flow: { ...baseFlow, revision: input.expectedRevision + 1, status: "failed" as const },
|
||||
})),
|
||||
requestCancel: vi.fn(),
|
||||
cancel: vi.fn(),
|
||||
runTask: vi.fn(),
|
||||
...overrides,
|
||||
};
|
||||
}
|
||||
Loading…
Reference in New Issue