From c75f4695b71bbbca557ade25543a2b134ef6ed67 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Tue, 31 Mar 2026 14:48:22 +0100 Subject: [PATCH] refactor: move tasks into bundled plugin --- docs/plugins/sdk-overview.md | 17 +- docs/plugins/sdk-runtime.md | 34 ++ extensions/lobster/src/lobster-tool.test.ts | 1 + extensions/tasks/index.test.ts | 29 ++ extensions/tasks/index.ts | 31 ++ extensions/tasks/openclaw.plugin.json | 9 + extensions/tasks/package.json | 12 + extensions/tasks/src/cli.runtime.test.ts | 371 ++++++++++++++ extensions/tasks/src/cli.runtime.ts | 464 ++++++++++++++++++ extensions/tasks/src/cli.ts | 162 ++++++ .../src/exec-approval-resolver.test.ts | 24 +- package.json | 4 + scripts/lib/plugin-sdk-entrypoints.json | 1 + .../bash-tools.exec-approval-followup.test.ts | 5 +- .../register.status-health-sessions.test.ts | 111 ----- .../register.status-health-sessions.ts | 169 +------ src/commands/tasks.ts | 424 ---------------- src/gateway/server.impl.ts | 6 +- src/infra/exec-approval-channel-runtime.ts | 5 +- src/infra/exec-approval-reply.ts | 3 +- src/plugin-sdk/operations-default.ts | 14 + src/plugin-sdk/plugin-entry.ts | 30 ++ src/plugins/api-builder.ts | 3 + src/plugins/captured-registration.test.ts | 1 + src/plugins/loader.test.ts | 179 +++++++ src/plugins/loader.ts | 26 + src/plugins/operations-state.test.ts | 134 +++++ src/plugins/operations-state.ts | 277 +++++++++++ src/plugins/registry.ts | 15 + src/plugins/runtime/index.test.ts | 14 + src/plugins/runtime/index.ts | 17 + src/plugins/runtime/types-core.ts | 27 + src/plugins/types.ts | 3 + src/tasks/operations-runtime.test.ts | 183 +++++++ src/tasks/operations-runtime.ts | 389 +++++++++++++++ src/tasks/task-executor-boundary.test.ts | 1 + .../task-registry-import-boundary.test.ts | 2 +- test/helpers/plugins/plugin-api.ts | 1 + test/helpers/plugins/plugin-runtime-mock.ts | 30 ++ 39 files changed, 2492 insertions(+), 736 deletions(-) create mode 100644 extensions/tasks/index.test.ts create mode 100644 extensions/tasks/index.ts create mode 100644 extensions/tasks/openclaw.plugin.json create mode 100644 extensions/tasks/package.json create mode 100644 extensions/tasks/src/cli.runtime.test.ts create mode 100644 extensions/tasks/src/cli.runtime.ts create mode 100644 extensions/tasks/src/cli.ts delete mode 100644 src/commands/tasks.ts create mode 100644 src/plugin-sdk/operations-default.ts create mode 100644 src/plugins/operations-state.test.ts create mode 100644 src/plugins/operations-state.ts create mode 100644 src/tasks/operations-runtime.test.ts create mode 100644 src/tasks/operations-runtime.ts diff --git a/docs/plugins/sdk-overview.md b/docs/plugins/sdk-overview.md index 4e05ef942cf..69c9be1455e 100644 --- a/docs/plugins/sdk-overview.md +++ b/docs/plugins/sdk-overview.md @@ -140,14 +140,15 @@ methods: ### Infrastructure -| Method | What it registers | -| ---------------------------------------------- | --------------------- | -| `api.registerHook(events, handler, opts?)` | Event hook | -| `api.registerHttpRoute(params)` | Gateway HTTP endpoint | -| `api.registerGatewayMethod(name, handler)` | Gateway RPC method | -| `api.registerCli(registrar, opts?)` | CLI subcommand | -| `api.registerService(service)` | Background service | -| `api.registerInteractiveHandler(registration)` | Interactive handler | +| Method | What it registers | +| ---------------------------------------------- | -------------------------- | +| `api.registerHook(events, handler, opts?)` | Event hook | +| `api.registerHttpRoute(params)` | Gateway HTTP endpoint | +| `api.registerGatewayMethod(name, handler)` | Gateway RPC method | +| `api.registerCli(registrar, opts?)` | CLI subcommand | +| `api.registerService(service)` | Background service | +| `api.registerInteractiveHandler(registration)` | Interactive handler | +| `api.registerOperationsRuntime(runtime)` | Durable operations runtime | ### CLI registration metadata diff --git a/docs/plugins/sdk-runtime.md b/docs/plugins/sdk-runtime.md index 607b6d74101..ab77fba0750 100644 --- a/docs/plugins/sdk-runtime.md +++ b/docs/plugins/sdk-runtime.md @@ -115,6 +115,40 @@ await api.runtime.subagent.deleteSession({ Untrusted plugins can still run subagents, but override requests are rejected. +### `api.runtime.operations` + +Dispatch and query durable operation records behind a plugin-owned operations +runtime. + +```typescript +const created = await api.runtime.operations.dispatch({ + type: "create", + namespace: "imports", + kind: "csv", + status: "queued", + description: "Import contacts.csv", + runId: "import-1", +}); + +const progressed = await api.runtime.operations.dispatch({ + type: "transition", + runId: "import-1", + status: "running", + progressSummary: "Parsing rows", +}); + +const record = await api.runtime.operations.findByRunId("import-1"); +const list = await api.runtime.operations.list({ namespace: "imports" }); +const summary = await api.runtime.operations.summarize({ namespace: "imports" }); +``` + +Notes: + +- `api.registerOperationsRuntime(...)` installs the active runtime. +- Core exposes the facade; plugins own the operation semantics and storage. +- The built-in default runtime maps the existing background task ledger into the + generic operations shape until a plugin overrides it. + ### `api.runtime.tts` Text-to-speech synthesis. diff --git a/extensions/lobster/src/lobster-tool.test.ts b/extensions/lobster/src/lobster-tool.test.ts index de3f2cc2cae..205402402d0 100644 --- a/extensions/lobster/src/lobster-tool.test.ts +++ b/extensions/lobster/src/lobster-tool.test.ts @@ -60,6 +60,7 @@ function fakeApi(overrides: Partial = {}): OpenClawPluginApi registerMemoryFlushPlan() {}, registerMemoryRuntime() {}, registerMemoryEmbeddingProvider() {}, + registerOperationsRuntime() {}, on() {}, resolvePath: (p) => p, ...overrides, diff --git a/extensions/tasks/index.test.ts b/extensions/tasks/index.test.ts new file mode 100644 index 00000000000..d9da16de07e --- /dev/null +++ b/extensions/tasks/index.test.ts @@ -0,0 +1,29 @@ +import { describe, expect, it, vi } from "vitest"; +import { createTestPluginApi } from "../../test/helpers/plugins/plugin-api.js"; +import { createPluginRuntimeMock } from "../../test/helpers/plugins/plugin-runtime-mock.js"; +import tasksPlugin from "./index.js"; + +describe("tasks plugin", () => { + it("registers the default operations runtime, maintenance service, and CLI", () => { + const registerOperationsRuntime = vi.fn(); + const registerService = vi.fn(); + const registerCli = vi.fn(); + + tasksPlugin.register( + createTestPluginApi({ + id: "tasks", + name: "Tasks", + source: "test", + config: {}, + runtime: createPluginRuntimeMock(), + registerOperationsRuntime, + registerService, + registerCli, + }), + ); + + expect(registerOperationsRuntime).toHaveBeenCalledTimes(1); + expect(registerService).toHaveBeenCalledTimes(1); + expect(registerCli).toHaveBeenCalledTimes(1); + }); +}); diff --git a/extensions/tasks/index.ts b/extensions/tasks/index.ts new file mode 100644 index 00000000000..ea304aeb930 --- /dev/null +++ b/extensions/tasks/index.ts @@ -0,0 +1,31 @@ +import { createDefaultOperationsMaintenanceService } from "openclaw/plugin-sdk/operations-default"; +import { defaultOperationsRuntime } from "openclaw/plugin-sdk/operations-default"; +import { definePluginEntry } from "openclaw/plugin-sdk/plugin-entry"; +import { registerTasksCli } from "./src/cli.js"; + +export default definePluginEntry({ + id: "tasks", + name: "Tasks", + description: "Durable task inspection and maintenance CLI", + register(api) { + api.registerOperationsRuntime(defaultOperationsRuntime); + api.registerService(createDefaultOperationsMaintenanceService()); + api.registerCli( + ({ program }) => { + registerTasksCli(program, { + config: api.config, + operations: api.runtime.operations, + }); + }, + { + descriptors: [ + { + name: "tasks", + description: "Inspect durable background task state", + hasSubcommands: true, + }, + ], + }, + ); + }, +}); diff --git a/extensions/tasks/openclaw.plugin.json b/extensions/tasks/openclaw.plugin.json new file mode 100644 index 00000000000..d277f3e83c0 --- /dev/null +++ b/extensions/tasks/openclaw.plugin.json @@ -0,0 +1,9 @@ +{ + "id": "tasks", + "enabledByDefault": true, + "configSchema": { + "type": "object", + "additionalProperties": false, + "properties": {} + } +} diff --git a/extensions/tasks/package.json b/extensions/tasks/package.json new file mode 100644 index 00000000000..36565bfde38 --- /dev/null +++ b/extensions/tasks/package.json @@ -0,0 +1,12 @@ +{ + "name": "@openclaw/tasks", + "version": "2026.3.30", + "private": true, + "description": "OpenClaw durable tasks plugin", + "type": "module", + "openclaw": { + "extensions": [ + "./index.ts" + ] + } +} diff --git a/extensions/tasks/src/cli.runtime.test.ts b/extensions/tasks/src/cli.runtime.test.ts new file mode 100644 index 00000000000..2901096c43c --- /dev/null +++ b/extensions/tasks/src/cli.runtime.test.ts @@ -0,0 +1,371 @@ +import type { + PluginOperationAuditFinding, + PluginOperationRecord, + PluginOperationsRuntime, +} from "openclaw/plugin-sdk/plugin-entry"; +import { createLoggerBackedRuntime, type OutputRuntimeEnv } from "openclaw/plugin-sdk/runtime"; +import { beforeEach, describe, expect, it, vi } from "vitest"; +import { + runTasksAudit, + runTasksCancel, + runTasksList, + runTasksMaintenance, + runTasksNotify, + runTasksShow, +} from "./cli.runtime.js"; + +function createRuntimeCapture() { + const logs: string[] = []; + const errors: string[] = []; + const runtime = createLoggerBackedRuntime({ + logger: { + info(message) { + logs.push(message); + }, + error(message) { + errors.push(message); + }, + }, + exitError(code) { + return new Error(`exit ${code}`); + }, + }) as OutputRuntimeEnv; + return { runtime, logs, errors }; +} + +function createOperationsMock(): PluginOperationsRuntime { + return { + dispatch: vi.fn().mockResolvedValue({ + matched: false, + record: null, + }), + getById: vi.fn().mockResolvedValue(null), + findByRunId: vi.fn().mockResolvedValue(null), + list: vi.fn().mockResolvedValue([]), + summarize: vi.fn().mockResolvedValue({ + total: 0, + active: 0, + terminal: 0, + failures: 0, + byNamespace: {}, + byKind: {}, + byStatus: {}, + }), + audit: vi.fn().mockResolvedValue([]), + maintenance: vi.fn().mockResolvedValue({ + reconciled: 0, + cleanupStamped: 0, + pruned: 0, + }), + cancel: vi.fn().mockResolvedValue({ + found: false, + cancelled: false, + }), + }; +} + +const taskFixture: PluginOperationRecord = { + operationId: "task-12345678", + namespace: "tasks", + kind: "acp", + status: "running", + sourceId: "run-12345678", + requesterSessionKey: "agent:main:main", + childSessionKey: "agent:codex:acp:child", + runId: "run-12345678", + title: "Task title", + description: "Create a file", + createdAt: Date.parse("2026-03-29T10:00:00.000Z"), + updatedAt: Date.parse("2026-03-29T10:00:10.000Z"), + progressSummary: "No output for 60s. It may be waiting for input.", + metadata: { + deliveryStatus: "pending", + notifyPolicy: "state_changes", + }, +}; + +describe("tasks CLI runtime", () => { + let operations: PluginOperationsRuntime; + let logs: string[]; + let errors: string[]; + let runtime: OutputRuntimeEnv; + let config: import("openclaw/plugin-sdk/plugin-entry").OpenClawConfig; + + beforeEach(() => { + operations = createOperationsMock(); + ({ runtime, logs, errors } = createRuntimeCapture()); + config = {}; + }); + + it("lists task rows with progress summary fallback", async () => { + vi.mocked(operations.list).mockResolvedValue([taskFixture]); + + await runTasksList( + { + runtime: "acp", + status: "running", + }, + { + config, + operations, + }, + runtime, + ); + + expect(logs[0]).toContain("Background tasks: 1"); + expect(logs[1]).toContain("Task pressure: 0 queued · 1 running · 0 issues"); + expect(logs.join("\n")).toContain("No output for 60s. It may be waiting for input."); + }); + + it("shows detailed task fields including notify and recent events", async () => { + vi.mocked(operations.findByRunId).mockResolvedValue(taskFixture); + + await runTasksShow( + { lookup: "run-12345678" }, + { + config: {}, + operations, + }, + runtime, + ); + + expect(logs.join("\n")).toContain("notify: state_changes"); + expect(logs.join("\n")).toContain( + "progressSummary: No output for 60s. It may be waiting for input.", + ); + }); + + it("updates notify policy for an existing task", async () => { + vi.mocked(operations.findByRunId).mockResolvedValue(taskFixture); + vi.mocked(operations.dispatch).mockResolvedValue({ + matched: true, + record: { + ...taskFixture, + metadata: { + ...taskFixture.metadata, + notifyPolicy: "silent", + }, + }, + }); + + await runTasksNotify( + { lookup: "run-12345678", notify: "silent" }, + { + config: {}, + operations, + }, + runtime, + ); + + expect(operations.dispatch).toHaveBeenCalledWith({ + type: "patch", + operationId: "task-12345678", + at: expect.any(Number), + metadataPatch: { + notifyPolicy: "silent", + }, + }); + expect(logs[0]).toContain("Updated task-12345678 notify policy to silent."); + }); + + it("cancels a running task and reports the updated runtime", async () => { + vi.mocked(operations.findByRunId).mockResolvedValue(taskFixture); + vi.mocked(operations.cancel).mockResolvedValue({ + found: true, + cancelled: true, + record: { + ...taskFixture, + status: "cancelled", + }, + }); + vi.mocked(operations.getById).mockResolvedValue({ + ...taskFixture, + status: "cancelled", + }); + + await runTasksCancel( + { lookup: "run-12345678" }, + { + config, + operations, + }, + runtime, + ); + + expect(operations.cancel).toHaveBeenCalledWith({ + cfg: config, + operationId: "task-12345678", + }); + expect(logs[0]).toContain("Cancelled task-12345678 (acp) run run-12345678."); + expect(errors).toEqual([]); + }); + + it("shows task audit findings with filters", async () => { + const findings: PluginOperationAuditFinding[] = [ + { + severity: "error", + code: "stale_running", + operation: taskFixture, + ageMs: 45 * 60_000, + detail: "running task appears stuck", + }, + { + severity: "warn", + code: "delivery_failed", + operation: { + ...taskFixture, + operationId: "task-87654321", + status: "failed", + }, + ageMs: 10 * 60_000, + detail: "terminal update delivery failed", + }, + ]; + vi.mocked(operations.audit) + .mockResolvedValueOnce(findings) + .mockResolvedValueOnce([findings[0]!]); + + await runTasksAudit( + { severity: "error", code: "stale_running", limit: 1 }, + { + config: {}, + operations, + }, + runtime, + ); + + expect(logs[0]).toContain("Task audit: 2 findings · 1 errors · 1 warnings"); + expect(logs[1]).toContain("Showing 1 matching findings."); + expect(logs.join("\n")).toContain("stale_running"); + expect(logs.join("\n")).toContain("running task appears stuck"); + expect(logs.join("\n")).not.toContain("delivery_failed"); + }); + + it("previews task maintenance without applying changes", async () => { + vi.mocked(operations.audit).mockResolvedValue([ + { + severity: "error", + code: "stale_running", + operation: taskFixture, + detail: "running task appears stuck", + }, + { + severity: "warn", + code: "lost", + operation: { + ...taskFixture, + operationId: "task-2", + status: "lost", + }, + detail: "backing session missing", + }, + ]); + vi.mocked(operations.maintenance).mockResolvedValue({ + reconciled: 2, + cleanupStamped: 1, + pruned: 3, + }); + vi.mocked(operations.summarize).mockResolvedValue({ + total: 5, + active: 2, + terminal: 3, + failures: 1, + byNamespace: { tasks: 5 }, + byKind: { acp: 1, cron: 2, subagent: 1, cli: 1 }, + byStatus: { + queued: 1, + running: 1, + succeeded: 1, + lost: 1, + failed: 1, + }, + }); + + await runTasksMaintenance( + {}, + { + config: {}, + operations, + }, + runtime, + ); + + expect(logs[0]).toContain( + "Task maintenance (preview): 2 reconcile · 1 cleanup stamp · 3 prune", + ); + expect(logs[1]).toContain( + "Task health: 1 queued · 1 running · 1 audit errors · 1 audit warnings", + ); + expect(logs[2]).toContain("Dry run only."); + }); + + it("shows before and after audit health when applying maintenance", async () => { + vi.mocked(operations.audit) + .mockResolvedValueOnce([ + { + severity: "error", + code: "stale_running", + operation: taskFixture, + detail: "running task appears stuck", + }, + { + severity: "warn", + code: "missing_cleanup", + operation: { + ...taskFixture, + operationId: "task-2", + status: "succeeded", + }, + detail: "missing cleanupAfter", + }, + ]) + .mockResolvedValueOnce([ + { + severity: "warn", + code: "lost", + operation: { + ...taskFixture, + operationId: "task-2", + status: "lost", + }, + detail: "backing session missing", + }, + ]); + vi.mocked(operations.maintenance).mockResolvedValue({ + reconciled: 2, + cleanupStamped: 1, + pruned: 3, + }); + vi.mocked(operations.summarize).mockResolvedValue({ + total: 4, + active: 2, + terminal: 2, + failures: 1, + byNamespace: { tasks: 4 }, + byKind: { acp: 1, cron: 2, subagent: 1 }, + byStatus: { + queued: 1, + running: 1, + succeeded: 1, + lost: 1, + }, + }); + + await runTasksMaintenance( + { apply: true }, + { + config: {}, + operations, + }, + runtime, + ); + + expect(logs[0]).toContain( + "Task maintenance (applied): 2 reconcile · 1 cleanup stamp · 3 prune", + ); + expect(logs[1]).toContain( + "Task health after apply: 1 queued · 1 running · 0 audit errors · 1 audit warnings", + ); + expect(logs[2]).toContain("Task health before apply: 1 audit errors · 1 audit warnings"); + }); +}); diff --git a/extensions/tasks/src/cli.runtime.ts b/extensions/tasks/src/cli.runtime.ts new file mode 100644 index 00000000000..5aa9a1a62c1 --- /dev/null +++ b/extensions/tasks/src/cli.runtime.ts @@ -0,0 +1,464 @@ +import type { + OpenClawConfig, + PluginOperationAuditFinding, + PluginOperationRecord, + PluginOperationsRuntime, +} from "openclaw/plugin-sdk/plugin-entry"; +import { info, type RuntimeEnv } from "openclaw/plugin-sdk/runtime"; + +type TasksCliDeps = { + config: OpenClawConfig; + operations: PluginOperationsRuntime; +}; + +type TaskNotifyPolicy = "done_only" | "state_changes" | "silent"; + +const KIND_PAD = 8; +const STATUS_PAD = 10; +const DELIVERY_PAD = 14; +const ID_PAD = 10; +const RUN_PAD = 10; + +function truncate(value: string, maxChars: number) { + if (value.length <= maxChars) { + return value; + } + if (maxChars <= 1) { + return value.slice(0, maxChars); + } + return `${value.slice(0, maxChars - 1)}…`; +} + +function shortToken(value: string | undefined, maxChars = ID_PAD): string { + const trimmed = value?.trim(); + if (!trimmed) { + return "n/a"; + } + return truncate(trimmed, maxChars); +} + +function readStringMetadata(record: PluginOperationRecord, key: string): string | undefined { + const value = record.metadata?.[key]; + return typeof value === "string" && value.trim() ? value : undefined; +} + +function readNumberMetadata(record: PluginOperationRecord, key: string): number | undefined { + const value = record.metadata?.[key]; + return typeof value === "number" && Number.isFinite(value) ? value : undefined; +} + +function formatTaskRows(tasks: PluginOperationRecord[]) { + const header = [ + "Task".padEnd(ID_PAD), + "Kind".padEnd(KIND_PAD), + "Status".padEnd(STATUS_PAD), + "Delivery".padEnd(DELIVERY_PAD), + "Run".padEnd(RUN_PAD), + "Child Session".padEnd(36), + "Summary", + ].join(" "); + const lines = [header]; + for (const task of tasks) { + const summary = truncate( + task.terminalSummary?.trim() || + task.progressSummary?.trim() || + task.title?.trim() || + task.description.trim(), + 80, + ); + const line = [ + shortToken(task.operationId).padEnd(ID_PAD), + task.kind.padEnd(KIND_PAD), + task.status.padEnd(STATUS_PAD), + (readStringMetadata(task, "deliveryStatus") ?? "n/a").padEnd(DELIVERY_PAD), + shortToken(task.runId, RUN_PAD).padEnd(RUN_PAD), + truncate(task.childSessionKey?.trim() || "n/a", 36).padEnd(36), + summary, + ].join(" "); + lines.push(line.trimEnd()); + } + return lines; +} + +function formatAgeMs(ageMs: number | undefined): string { + if (typeof ageMs !== "number" || ageMs < 1000) { + return "fresh"; + } + const totalSeconds = Math.floor(ageMs / 1000); + const days = Math.floor(totalSeconds / 86_400); + const hours = Math.floor((totalSeconds % 86_400) / 3600); + const minutes = Math.floor((totalSeconds % 3600) / 60); + if (days > 0) { + return `${days}d${hours}h`; + } + if (hours > 0) { + return `${hours}h${minutes}m`; + } + if (minutes > 0) { + return `${minutes}m`; + } + return `${totalSeconds}s`; +} + +function formatAuditRows(findings: PluginOperationAuditFinding[]) { + const header = [ + "Severity".padEnd(8), + "Code".padEnd(22), + "Task".padEnd(ID_PAD), + "Status".padEnd(STATUS_PAD), + "Age".padEnd(8), + "Detail", + ].join(" "); + const lines = [header]; + for (const finding of findings) { + lines.push( + [ + finding.severity.padEnd(8), + finding.code.padEnd(22), + shortToken(finding.operation.operationId).padEnd(ID_PAD), + finding.operation.status.padEnd(STATUS_PAD), + formatAgeMs(finding.ageMs).padEnd(8), + truncate(finding.detail, 88), + ] + .join(" ") + .trimEnd(), + ); + } + return lines; +} + +function summarizeAuditFindings(findings: Iterable) { + const summary = { + total: 0, + warnings: 0, + errors: 0, + byCode: {} as Record, + }; + for (const finding of findings) { + summary.total += 1; + summary.byCode[finding.code] = (summary.byCode[finding.code] ?? 0) + 1; + if (finding.severity === "error") { + summary.errors += 1; + continue; + } + summary.warnings += 1; + } + return summary; +} + +function formatTaskListSummary(tasks: PluginOperationRecord[]) { + const queued = tasks.filter((task) => task.status === "queued").length; + const running = tasks.filter((task) => task.status === "running").length; + const failures = tasks.filter( + (task) => task.status === "failed" || task.status === "timed_out" || task.status === "lost", + ).length; + return `${queued} queued · ${running} running · ${failures} issues`; +} + +async function resolveTaskLookupToken( + operations: PluginOperationsRuntime, + lookup: string, +): Promise { + const token = lookup.trim(); + if (!token) { + return null; + } + const byId = await operations.getById(token); + if (byId?.namespace === "tasks") { + return byId; + } + const byRunId = await operations.findByRunId(token); + if (byRunId?.namespace === "tasks") { + return byRunId; + } + const bySession = await operations.list({ + namespace: "tasks", + sessionKey: token, + limit: 1, + }); + return bySession[0] ?? null; +} + +export async function runTasksList( + opts: { json?: boolean; runtime?: string; status?: string }, + deps: TasksCliDeps, + runtime: RuntimeEnv, +) { + const tasks = await deps.operations.list({ + namespace: "tasks", + ...(opts.runtime ? { kind: opts.runtime.trim() } : {}), + ...(opts.status ? { status: opts.status.trim() } : {}), + }); + + if (opts.json) { + runtime.log( + JSON.stringify( + { + count: tasks.length, + runtime: opts.runtime ?? null, + status: opts.status ?? null, + tasks, + }, + null, + 2, + ), + ); + return; + } + + runtime.log(info(`Background tasks: ${tasks.length}`)); + runtime.log(info(`Task pressure: ${formatTaskListSummary(tasks)}`)); + if (opts.runtime) { + runtime.log(info(`Runtime filter: ${opts.runtime}`)); + } + if (opts.status) { + runtime.log(info(`Status filter: ${opts.status}`)); + } + if (tasks.length === 0) { + runtime.log("No background tasks found."); + return; + } + for (const line of formatTaskRows(tasks)) { + runtime.log(line); + } +} + +export async function runTasksShow( + opts: { json?: boolean; lookup: string }, + deps: TasksCliDeps, + runtime: RuntimeEnv, +) { + const task = await resolveTaskLookupToken(deps.operations, opts.lookup); + if (!task) { + runtime.error(`Task not found: ${opts.lookup}`); + runtime.exit(1); + return; + } + + if (opts.json) { + runtime.log(JSON.stringify(task, null, 2)); + return; + } + + const lines = [ + "Background task:", + `taskId: ${task.operationId}`, + `kind: ${task.kind}`, + `sourceId: ${task.sourceId ?? "n/a"}`, + `status: ${task.status}`, + `result: ${readStringMetadata(task, "terminalOutcome") ?? "n/a"}`, + `delivery: ${readStringMetadata(task, "deliveryStatus") ?? "n/a"}`, + `notify: ${readStringMetadata(task, "notifyPolicy") ?? "n/a"}`, + `requesterSessionKey: ${task.requesterSessionKey ?? "n/a"}`, + `childSessionKey: ${task.childSessionKey ?? "n/a"}`, + `parentTaskId: ${task.parentOperationId ?? "n/a"}`, + `agentId: ${task.agentId ?? "n/a"}`, + `runId: ${task.runId ?? "n/a"}`, + `label: ${task.title ?? "n/a"}`, + `task: ${task.description}`, + `createdAt: ${new Date(task.createdAt).toISOString()}`, + `startedAt: ${task.startedAt ? new Date(task.startedAt).toISOString() : "n/a"}`, + `endedAt: ${task.endedAt ? new Date(task.endedAt).toISOString() : "n/a"}`, + `lastEventAt: ${new Date(task.updatedAt).toISOString()}`, + `cleanupAfter: ${(() => { + const cleanupAfter = readNumberMetadata(task, "cleanupAfter"); + return cleanupAfter ? new Date(cleanupAfter).toISOString() : "n/a"; + })()}`, + ...(task.error ? [`error: ${task.error}`] : []), + ...(task.progressSummary ? [`progressSummary: ${task.progressSummary}`] : []), + ...(task.terminalSummary ? [`terminalSummary: ${task.terminalSummary}`] : []), + ]; + for (const line of lines) { + runtime.log(line); + } +} + +export async function runTasksNotify( + opts: { lookup: string; notify: TaskNotifyPolicy }, + deps: TasksCliDeps, + runtime: RuntimeEnv, +) { + const task = await resolveTaskLookupToken(deps.operations, opts.lookup); + if (!task) { + runtime.error(`Task not found: ${opts.lookup}`); + runtime.exit(1); + return; + } + const updated = await deps.operations.dispatch({ + type: "patch", + operationId: task.operationId, + at: Date.now(), + metadataPatch: { + notifyPolicy: opts.notify, + }, + }); + if (!updated.matched || !updated.record) { + runtime.error(`Task not found: ${opts.lookup}`); + runtime.exit(1); + return; + } + runtime.log(`Updated ${updated.record.operationId} notify policy to ${opts.notify}.`); +} + +export async function runTasksCancel( + opts: { lookup: string }, + deps: TasksCliDeps, + runtime: RuntimeEnv, +) { + const task = await resolveTaskLookupToken(deps.operations, opts.lookup); + if (!task) { + runtime.error(`Task not found: ${opts.lookup}`); + runtime.exit(1); + return; + } + const result = await deps.operations.cancel({ + cfg: deps.config, + operationId: task.operationId, + }); + if (!result.found) { + runtime.error(result.reason ?? `Task not found: ${opts.lookup}`); + runtime.exit(1); + return; + } + if (!result.cancelled) { + runtime.error(result.reason ?? `Could not cancel task: ${opts.lookup}`); + runtime.exit(1); + return; + } + const updated = await deps.operations.getById(task.operationId); + runtime.log( + `Cancelled ${updated?.operationId ?? task.operationId} (${updated?.kind ?? task.kind})${updated?.runId ? ` run ${updated.runId}` : ""}.`, + ); +} + +export async function runTasksAudit( + opts: { + json?: boolean; + severity?: "warn" | "error"; + code?: string; + limit?: number; + }, + deps: TasksCliDeps, + runtime: RuntimeEnv, +) { + const allFindings = await deps.operations.audit({ + namespace: "tasks", + }); + const findings = await deps.operations.audit({ + namespace: "tasks", + ...(opts.severity ? { severity: opts.severity } : {}), + ...(opts.code ? { code: opts.code.trim() } : {}), + }); + const displayed = + typeof opts.limit === "number" && opts.limit > 0 ? findings.slice(0, opts.limit) : findings; + const summary = summarizeAuditFindings(allFindings); + + if (opts.json) { + runtime.log( + JSON.stringify( + { + count: allFindings.length, + filteredCount: findings.length, + displayed: displayed.length, + filters: { + severity: opts.severity ?? null, + code: opts.code ?? null, + limit: opts.limit ?? null, + }, + summary, + findings: displayed, + }, + null, + 2, + ), + ); + return; + } + + runtime.log( + info( + `Task audit: ${summary.total} findings · ${summary.errors} errors · ${summary.warnings} warnings`, + ), + ); + if (opts.severity || opts.code) { + runtime.log(info(`Showing ${findings.length} matching findings.`)); + } + if (opts.severity) { + runtime.log(info(`Severity filter: ${opts.severity}`)); + } + if (opts.code) { + runtime.log(info(`Code filter: ${opts.code}`)); + } + if (typeof opts.limit === "number" && opts.limit > 0) { + runtime.log(info(`Limit: ${opts.limit}`)); + } + if (displayed.length === 0) { + runtime.log("No task audit findings."); + return; + } + for (const line of formatAuditRows(displayed)) { + runtime.log(line); + } +} + +export async function runTasksMaintenance( + opts: { json?: boolean; apply?: boolean }, + deps: TasksCliDeps, + runtime: RuntimeEnv, +) { + const auditBeforeFindings = await deps.operations.audit({ + namespace: "tasks", + }); + const maintenance = await deps.operations.maintenance({ + namespace: "tasks", + apply: Boolean(opts.apply), + }); + const summary = await deps.operations.summarize({ + namespace: "tasks", + }); + const auditAfterFindings = opts.apply + ? await deps.operations.audit({ + namespace: "tasks", + }) + : auditBeforeFindings; + const auditBefore = summarizeAuditFindings(auditBeforeFindings); + const auditAfter = summarizeAuditFindings(auditAfterFindings); + + if (opts.json) { + runtime.log( + JSON.stringify( + { + mode: opts.apply ? "apply" : "preview", + maintenance, + tasks: summary, + auditBefore, + auditAfter, + }, + null, + 2, + ), + ); + return; + } + + runtime.log( + info( + `Task maintenance (${opts.apply ? "applied" : "preview"}): ${maintenance.reconciled} reconcile · ${maintenance.cleanupStamped} cleanup stamp · ${maintenance.pruned} prune`, + ), + ); + runtime.log( + info( + `${opts.apply ? "Task health after apply" : "Task health"}: ${summary.byStatus.queued ?? 0} queued · ${summary.byStatus.running ?? 0} running · ${auditAfter.errors} audit errors · ${auditAfter.warnings} audit warnings`, + ), + ); + if (opts.apply) { + runtime.log( + info( + `Task health before apply: ${auditBefore.errors} audit errors · ${auditBefore.warnings} audit warnings`, + ), + ); + } + if (!opts.apply) { + runtime.log("Dry run only. Re-run with `openclaw tasks maintenance --apply` to write changes."); + } +} diff --git a/extensions/tasks/src/cli.ts b/extensions/tasks/src/cli.ts new file mode 100644 index 00000000000..c239d679df4 --- /dev/null +++ b/extensions/tasks/src/cli.ts @@ -0,0 +1,162 @@ +import type { Command } from "commander"; +import type { OpenClawConfig, PluginOperationsRuntime } from "openclaw/plugin-sdk/plugin-entry"; +import { defaultRuntime } from "openclaw/plugin-sdk/runtime"; +import { + runTasksAudit, + runTasksCancel, + runTasksList, + runTasksMaintenance, + runTasksNotify, + runTasksShow, +} from "./cli.runtime.js"; + +function parsePositiveIntOrUndefined(value: unknown): number | undefined { + if (typeof value !== "string" || !value.trim()) { + return undefined; + } + const parsed = Number.parseInt(value, 10); + return Number.isFinite(parsed) && parsed > 0 ? parsed : undefined; +} + +export function registerTasksCli( + program: Command, + deps: { + config: OpenClawConfig; + operations: PluginOperationsRuntime; + }, +) { + const tasks = program + .command("tasks") + .description("Inspect durable background task state") + .option("--json", "Output as JSON", false) + .option("--runtime ", "Filter by kind (subagent, acp, cron, cli)") + .option( + "--status ", + "Filter by status (queued, running, succeeded, failed, timed_out, cancelled, lost)", + ) + .action(async (opts) => { + await runTasksList( + { + json: Boolean(opts.json), + runtime: opts.runtime as string | undefined, + status: opts.status as string | undefined, + }, + deps, + defaultRuntime, + ); + }); + tasks.enablePositionalOptions(); + + tasks + .command("list") + .description("List tracked background tasks") + .option("--json", "Output as JSON", false) + .option("--runtime ", "Filter by kind (subagent, acp, cron, cli)") + .option( + "--status ", + "Filter by status (queued, running, succeeded, failed, timed_out, cancelled, lost)", + ) + .action(async (opts, command) => { + const parentOpts = command.parent?.opts() as + | { + json?: boolean; + runtime?: string; + status?: string; + } + | undefined; + await runTasksList( + { + json: Boolean(opts.json || parentOpts?.json), + runtime: (opts.runtime as string | undefined) ?? parentOpts?.runtime, + status: (opts.status as string | undefined) ?? parentOpts?.status, + }, + deps, + defaultRuntime, + ); + }); + + tasks + .command("audit") + .description("Show stale or broken background task runs") + .option("--json", "Output as JSON", false) + .option("--severity ", "Filter by severity (warn, error)") + .option("--code ", "Filter by finding code") + .option("--limit ", "Limit displayed findings") + .action(async (opts, command) => { + const parentOpts = command.parent?.opts() as { json?: boolean } | undefined; + await runTasksAudit( + { + json: Boolean(opts.json || parentOpts?.json), + severity: opts.severity as "warn" | "error" | undefined, + code: opts.code as string | undefined, + limit: parsePositiveIntOrUndefined(opts.limit), + }, + deps, + defaultRuntime, + ); + }); + + tasks + .command("maintenance") + .description("Preview or apply task ledger maintenance") + .option("--json", "Output as JSON", false) + .option("--apply", "Apply reconciliation, cleanup stamping, and pruning", false) + .action(async (opts, command) => { + const parentOpts = command.parent?.opts() as { json?: boolean } | undefined; + await runTasksMaintenance( + { + json: Boolean(opts.json || parentOpts?.json), + apply: Boolean(opts.apply), + }, + deps, + defaultRuntime, + ); + }); + + tasks + .command("show") + .description("Show one background task by task id, run id, or session key") + .argument("", "Task id, run id, or session key") + .option("--json", "Output as JSON", false) + .action(async (lookup, opts, command) => { + const parentOpts = command.parent?.opts() as { json?: boolean } | undefined; + await runTasksShow( + { + lookup, + json: Boolean(opts.json || parentOpts?.json), + }, + deps, + defaultRuntime, + ); + }); + + tasks + .command("notify") + .description("Set task notify policy") + .argument("", "Task id, run id, or session key") + .argument("", "Notify policy (done_only, state_changes, silent)") + .action(async (lookup, notify) => { + await runTasksNotify( + { + lookup, + notify: notify as "done_only" | "state_changes" | "silent", + }, + deps, + defaultRuntime, + ); + }); + + tasks + .command("cancel") + .description("Cancel a running background task") + .argument("", "Task id, run id, or session key") + .action(async (lookup) => { + await runTasksCancel( + { + lookup, + }, + deps, + defaultRuntime, + ); + }); +} diff --git a/extensions/telegram/src/exec-approval-resolver.test.ts b/extensions/telegram/src/exec-approval-resolver.test.ts index 0d430eee59a..d0c6165fa30 100644 --- a/extensions/telegram/src/exec-approval-resolver.test.ts +++ b/extensions/telegram/src/exec-approval-resolver.test.ts @@ -63,14 +63,10 @@ describe("resolveTelegramExecApproval", () => { id: "legacy-plugin-123", decision: "allow-always", }); - expect(gatewayRuntimeHoisted.requestSpy).toHaveBeenNthCalledWith( - 2, - "plugin.approval.resolve", - { - id: "legacy-plugin-123", - decision: "allow-always", - }, - ); + expect(gatewayRuntimeHoisted.requestSpy).toHaveBeenNthCalledWith(2, "plugin.approval.resolve", { + id: "legacy-plugin-123", + decision: "allow-always", + }); }); it("falls back to plugin.approval.resolve for structured approval-not-found errors", async () => { @@ -95,14 +91,10 @@ describe("resolveTelegramExecApproval", () => { id: "legacy-plugin-123", decision: "allow-always", }); - expect(gatewayRuntimeHoisted.requestSpy).toHaveBeenNthCalledWith( - 2, - "plugin.approval.resolve", - { - id: "legacy-plugin-123", - decision: "allow-always", - }, - ); + expect(gatewayRuntimeHoisted.requestSpy).toHaveBeenNthCalledWith(2, "plugin.approval.resolve", { + id: "legacy-plugin-123", + decision: "allow-always", + }); }); it("does not fall back to plugin.approval.resolve without explicit permission", async () => { diff --git a/package.json b/package.json index d5e9746bfcf..eb23ce80fdd 100644 --- a/package.json +++ b/package.json @@ -185,6 +185,10 @@ "types": "./dist/plugin-sdk/plugin-runtime.d.ts", "default": "./dist/plugin-sdk/plugin-runtime.js" }, + "./plugin-sdk/operations-default": { + "types": "./dist/plugin-sdk/operations-default.d.ts", + "default": "./dist/plugin-sdk/operations-default.js" + }, "./plugin-sdk/security-runtime": { "types": "./dist/plugin-sdk/security-runtime.d.ts", "default": "./dist/plugin-sdk/security-runtime.js" diff --git a/scripts/lib/plugin-sdk-entrypoints.json b/scripts/lib/plugin-sdk-entrypoints.json index 1085535796d..b103e7c7d9e 100644 --- a/scripts/lib/plugin-sdk-entrypoints.json +++ b/scripts/lib/plugin-sdk-entrypoints.json @@ -36,6 +36,7 @@ "speech-runtime", "speech-core", "plugin-runtime", + "operations-default", "security-runtime", "gateway-runtime", "github-copilot-login", diff --git a/src/agents/bash-tools.exec-approval-followup.test.ts b/src/agents/bash-tools.exec-approval-followup.test.ts index c5786735f83..f2ded9104e8 100644 --- a/src/agents/bash-tools.exec-approval-followup.test.ts +++ b/src/agents/bash-tools.exec-approval-followup.test.ts @@ -11,9 +11,8 @@ let sendExecApprovalFollowup: typeof import("./bash-tools.exec-approval-followup beforeEach(async () => { vi.resetModules(); ({ callGatewayTool } = await import("./tools/gateway.js")); - ({ buildExecApprovalFollowupPrompt, sendExecApprovalFollowup } = await import( - "./bash-tools.exec-approval-followup.js" - )); + ({ buildExecApprovalFollowupPrompt, sendExecApprovalFollowup } = + await import("./bash-tools.exec-approval-followup.js")); }); afterEach(() => { diff --git a/src/cli/program/register.status-health-sessions.test.ts b/src/cli/program/register.status-health-sessions.test.ts index f7c222a6893..ba5053fc2e6 100644 --- a/src/cli/program/register.status-health-sessions.test.ts +++ b/src/cli/program/register.status-health-sessions.test.ts @@ -10,12 +10,6 @@ const mocks = vi.hoisted(() => ({ flowsCancelCommand: vi.fn(), sessionsCommand: vi.fn(), sessionsCleanupCommand: vi.fn(), - tasksListCommand: vi.fn(), - tasksAuditCommand: vi.fn(), - tasksMaintenanceCommand: vi.fn(), - tasksShowCommand: vi.fn(), - tasksNotifyCommand: vi.fn(), - tasksCancelCommand: vi.fn(), setVerbose: vi.fn(), runtime: { log: vi.fn(), @@ -31,12 +25,6 @@ const flowsShowCommand = mocks.flowsShowCommand; const flowsCancelCommand = mocks.flowsCancelCommand; const sessionsCommand = mocks.sessionsCommand; const sessionsCleanupCommand = mocks.sessionsCleanupCommand; -const tasksListCommand = mocks.tasksListCommand; -const tasksAuditCommand = mocks.tasksAuditCommand; -const tasksMaintenanceCommand = mocks.tasksMaintenanceCommand; -const tasksShowCommand = mocks.tasksShowCommand; -const tasksNotifyCommand = mocks.tasksNotifyCommand; -const tasksCancelCommand = mocks.tasksCancelCommand; const setVerbose = mocks.setVerbose; const runtime = mocks.runtime; @@ -62,15 +50,6 @@ vi.mock("../../commands/sessions-cleanup.js", () => ({ sessionsCleanupCommand: mocks.sessionsCleanupCommand, })); -vi.mock("../../commands/tasks.js", () => ({ - tasksListCommand: mocks.tasksListCommand, - tasksAuditCommand: mocks.tasksAuditCommand, - tasksMaintenanceCommand: mocks.tasksMaintenanceCommand, - tasksShowCommand: mocks.tasksShowCommand, - tasksNotifyCommand: mocks.tasksNotifyCommand, - tasksCancelCommand: mocks.tasksCancelCommand, -})); - vi.mock("../../globals.js", () => ({ setVerbose: mocks.setVerbose, })); @@ -96,12 +75,6 @@ describe("registerStatusHealthSessionsCommands", () => { flowsCancelCommand.mockResolvedValue(undefined); sessionsCommand.mockResolvedValue(undefined); sessionsCleanupCommand.mockResolvedValue(undefined); - tasksListCommand.mockResolvedValue(undefined); - tasksAuditCommand.mockResolvedValue(undefined); - tasksMaintenanceCommand.mockResolvedValue(undefined); - tasksShowCommand.mockResolvedValue(undefined); - tasksNotifyCommand.mockResolvedValue(undefined); - tasksCancelCommand.mockResolvedValue(undefined); }); it("runs status command with timeout and debug-derived verbose", async () => { @@ -249,90 +222,6 @@ describe("registerStatusHealthSessionsCommands", () => { ); }); - it("runs tasks list from the parent command", async () => { - await runCli(["tasks", "--json", "--runtime", "acp", "--status", "running"]); - - expect(tasksListCommand).toHaveBeenCalledWith( - expect.objectContaining({ - json: true, - runtime: "acp", - status: "running", - }), - runtime, - ); - }); - - it("runs tasks show subcommand with lookup forwarding", async () => { - await runCli(["tasks", "show", "run-123", "--json"]); - - expect(tasksShowCommand).toHaveBeenCalledWith( - expect.objectContaining({ - lookup: "run-123", - json: true, - }), - runtime, - ); - }); - - it("runs tasks maintenance subcommand with apply forwarding", async () => { - await runCli(["tasks", "--json", "maintenance", "--apply"]); - - expect(tasksMaintenanceCommand).toHaveBeenCalledWith( - expect.objectContaining({ - json: true, - apply: true, - }), - runtime, - ); - }); - - it("runs tasks audit subcommand with filters", async () => { - await runCli([ - "tasks", - "--json", - "audit", - "--severity", - "error", - "--code", - "stale_running", - "--limit", - "5", - ]); - - expect(tasksAuditCommand).toHaveBeenCalledWith( - expect.objectContaining({ - json: true, - severity: "error", - code: "stale_running", - limit: 5, - }), - runtime, - ); - }); - - it("runs tasks notify subcommand with lookup and policy forwarding", async () => { - await runCli(["tasks", "notify", "run-123", "state_changes"]); - - expect(tasksNotifyCommand).toHaveBeenCalledWith( - expect.objectContaining({ - lookup: "run-123", - notify: "state_changes", - }), - runtime, - ); - }); - - it("runs tasks cancel subcommand with lookup forwarding", async () => { - await runCli(["tasks", "cancel", "run-123"]); - - expect(tasksCancelCommand).toHaveBeenCalledWith( - expect.objectContaining({ - lookup: "run-123", - }), - runtime, - ); - }); - it("runs flows list from the parent command", async () => { await runCli(["flows", "--json", "--status", "blocked"]); diff --git a/src/cli/program/register.status-health-sessions.ts b/src/cli/program/register.status-health-sessions.ts index 4affcdc933c..85ae149528f 100644 --- a/src/cli/program/register.status-health-sessions.ts +++ b/src/cli/program/register.status-health-sessions.ts @@ -4,28 +4,24 @@ import { healthCommand } from "../../commands/health.js"; import { sessionsCleanupCommand } from "../../commands/sessions-cleanup.js"; import { sessionsCommand } from "../../commands/sessions.js"; import { statusCommand } from "../../commands/status.js"; -import { - tasksAuditCommand, - tasksCancelCommand, - tasksListCommand, - tasksMaintenanceCommand, - tasksNotifyCommand, - tasksShowCommand, -} from "../../commands/tasks.js"; import { setVerbose } from "../../globals.js"; import { defaultRuntime } from "../../runtime.js"; import { formatDocsLink } from "../../terminal/links.js"; import { theme } from "../../terminal/theme.js"; import { runCommandWithRuntime } from "../cli-utils.js"; import { formatHelpExamples } from "../help-format.js"; -import { parsePositiveIntOrUndefined } from "./helpers.js"; function resolveVerbose(opts: { verbose?: boolean; debug?: boolean }): boolean { return Boolean(opts.verbose || opts.debug); } function parseTimeoutMs(timeout: unknown): number | null | undefined { - const parsed = parsePositiveIntOrUndefined(timeout); + const parsedRaw = + typeof timeout === "string" && timeout.trim() ? Number.parseInt(timeout, 10) : undefined; + const parsed = + typeof parsedRaw === "number" && Number.isFinite(parsedRaw) && parsedRaw > 0 + ? parsedRaw + : undefined; if (timeout !== undefined && parsed === undefined) { defaultRuntime.error("--timeout must be a positive integer (milliseconds)"); defaultRuntime.exit(1); @@ -222,159 +218,6 @@ export function registerStatusHealthSessionsCommands(program: Command) { ); }); }); - - const tasksCmd = program - .command("tasks") - .description("Inspect durable background task state") - .option("--json", "Output as JSON", false) - .option("--runtime ", "Filter by kind (subagent, acp, cron, cli)") - .option( - "--status ", - "Filter by status (queued, running, succeeded, failed, timed_out, cancelled, lost)", - ) - .action(async (opts) => { - await runCommandWithRuntime(defaultRuntime, async () => { - await tasksListCommand( - { - json: Boolean(opts.json), - runtime: opts.runtime as string | undefined, - status: opts.status as string | undefined, - }, - defaultRuntime, - ); - }); - }); - tasksCmd.enablePositionalOptions(); - - tasksCmd - .command("list") - .description("List tracked background tasks") - .option("--json", "Output as JSON", false) - .option("--runtime ", "Filter by kind (subagent, acp, cron, cli)") - .option( - "--status ", - "Filter by status (queued, running, succeeded, failed, timed_out, cancelled, lost)", - ) - .action(async (opts, command) => { - const parentOpts = command.parent?.opts() as - | { - json?: boolean; - runtime?: string; - status?: string; - } - | undefined; - await runCommandWithRuntime(defaultRuntime, async () => { - await tasksListCommand( - { - json: Boolean(opts.json || parentOpts?.json), - runtime: (opts.runtime as string | undefined) ?? parentOpts?.runtime, - status: (opts.status as string | undefined) ?? parentOpts?.status, - }, - defaultRuntime, - ); - }); - }); - - tasksCmd - .command("audit") - .description("Show stale or broken background task runs") - .option("--json", "Output as JSON", false) - .option("--severity ", "Filter by severity (warn, error)") - .option( - "--code ", - "Filter by finding code (stale_queued, stale_running, lost, delivery_failed, missing_cleanup, inconsistent_timestamps)", - ) - .option("--limit ", "Limit displayed findings") - .action(async (opts, command) => { - const parentOpts = command.parent?.opts() as { json?: boolean } | undefined; - await runCommandWithRuntime(defaultRuntime, async () => { - await tasksAuditCommand( - { - json: Boolean(opts.json || parentOpts?.json), - severity: opts.severity as "warn" | "error" | undefined, - code: opts.code as - | "stale_queued" - | "stale_running" - | "lost" - | "delivery_failed" - | "missing_cleanup" - | "inconsistent_timestamps" - | undefined, - limit: parsePositiveIntOrUndefined(opts.limit), - }, - defaultRuntime, - ); - }); - }); - - tasksCmd - .command("maintenance") - .description("Preview or apply task ledger maintenance") - .option("--json", "Output as JSON", false) - .option("--apply", "Apply reconciliation, cleanup stamping, and pruning", false) - .action(async (opts, command) => { - const parentOpts = command.parent?.opts() as { json?: boolean } | undefined; - await runCommandWithRuntime(defaultRuntime, async () => { - await tasksMaintenanceCommand( - { - json: Boolean(opts.json || parentOpts?.json), - apply: Boolean(opts.apply), - }, - defaultRuntime, - ); - }); - }); - - tasksCmd - .command("show") - .description("Show one background task by task id, run id, or session key") - .argument("", "Task id, run id, or session key") - .option("--json", "Output as JSON", false) - .action(async (lookup, opts, command) => { - const parentOpts = command.parent?.opts() as { json?: boolean } | undefined; - await runCommandWithRuntime(defaultRuntime, async () => { - await tasksShowCommand( - { - lookup, - json: Boolean(opts.json || parentOpts?.json), - }, - defaultRuntime, - ); - }); - }); - - tasksCmd - .command("notify") - .description("Set task notify policy") - .argument("", "Task id, run id, or session key") - .argument("", "Notify policy (done_only, state_changes, silent)") - .action(async (lookup, notify) => { - await runCommandWithRuntime(defaultRuntime, async () => { - await tasksNotifyCommand( - { - lookup, - notify: notify as "done_only" | "state_changes" | "silent", - }, - defaultRuntime, - ); - }); - }); - - tasksCmd - .command("cancel") - .description("Cancel a running background task") - .argument("", "Task id, run id, or session key") - .action(async (lookup) => { - await runCommandWithRuntime(defaultRuntime, async () => { - await tasksCancelCommand( - { - lookup, - }, - defaultRuntime, - ); - }); - }); - const flowsCmd = program .command("flows") .description("Inspect ClawFlow state") diff --git a/src/commands/tasks.ts b/src/commands/tasks.ts deleted file mode 100644 index 8d263b5e037..00000000000 --- a/src/commands/tasks.ts +++ /dev/null @@ -1,424 +0,0 @@ -import { loadConfig } from "../config/config.js"; -import { info } from "../globals.js"; -import type { RuntimeEnv } from "../runtime.js"; -import { - listTaskAuditFindings, - summarizeTaskAuditFindings, - type TaskAuditCode, - type TaskAuditFinding, - type TaskAuditSeverity, -} from "../tasks/task-registry.audit.js"; -import { cancelTaskById, getTaskById, updateTaskNotifyPolicyById } from "../tasks/task-registry.js"; -import { - getInspectableTaskAuditSummary, - getInspectableTaskRegistrySummary, - previewTaskRegistryMaintenance, - runTaskRegistryMaintenance, -} from "../tasks/task-registry.maintenance.js"; -import { - reconcileInspectableTasks, - reconcileTaskLookupToken, -} from "../tasks/task-registry.reconcile.js"; -import { summarizeTaskRecords } from "../tasks/task-registry.summary.js"; -import type { TaskNotifyPolicy, TaskRecord } from "../tasks/task-registry.types.js"; -import { isRich, theme } from "../terminal/theme.js"; - -const RUNTIME_PAD = 8; -const STATUS_PAD = 10; -const DELIVERY_PAD = 14; -const ID_PAD = 10; -const RUN_PAD = 10; - -function truncate(value: string, maxChars: number) { - if (value.length <= maxChars) { - return value; - } - if (maxChars <= 1) { - return value.slice(0, maxChars); - } - return `${value.slice(0, maxChars - 1)}…`; -} - -function shortToken(value: string | undefined, maxChars = ID_PAD): string { - const trimmed = value?.trim(); - if (!trimmed) { - return "n/a"; - } - return truncate(trimmed, maxChars); -} - -function formatTaskStatusCell(status: string, rich: boolean) { - const padded = status.padEnd(STATUS_PAD); - if (!rich) { - return padded; - } - if (status === "succeeded") { - return theme.success(padded); - } - if (status === "failed" || status === "lost" || status === "timed_out") { - return theme.error(padded); - } - if (status === "running") { - return theme.accentBright(padded); - } - return theme.muted(padded); -} - -function formatTaskRows(tasks: TaskRecord[], rich: boolean) { - const header = [ - "Task".padEnd(ID_PAD), - "Kind".padEnd(RUNTIME_PAD), - "Status".padEnd(STATUS_PAD), - "Delivery".padEnd(DELIVERY_PAD), - "Run".padEnd(RUN_PAD), - "Child Session", - "Summary", - ].join(" "); - const lines = [rich ? theme.heading(header) : header]; - for (const task of tasks) { - const summary = truncate( - task.terminalSummary?.trim() || - task.progressSummary?.trim() || - task.label?.trim() || - task.task.trim(), - 80, - ); - const line = [ - shortToken(task.taskId).padEnd(ID_PAD), - task.runtime.padEnd(RUNTIME_PAD), - formatTaskStatusCell(task.status, rich), - task.deliveryStatus.padEnd(DELIVERY_PAD), - shortToken(task.runId, RUN_PAD).padEnd(RUN_PAD), - truncate(task.childSessionKey?.trim() || "n/a", 36).padEnd(36), - summary, - ].join(" "); - lines.push(line.trimEnd()); - } - return lines; -} - -function formatTaskListSummary(tasks: TaskRecord[]) { - const summary = summarizeTaskRecords(tasks); - return `${summary.byStatus.queued} queued · ${summary.byStatus.running} running · ${summary.failures} issues`; -} - -function formatAgeMs(ageMs: number | undefined): string { - if (typeof ageMs !== "number" || ageMs < 1000) { - return "fresh"; - } - const totalSeconds = Math.floor(ageMs / 1000); - const days = Math.floor(totalSeconds / 86_400); - const hours = Math.floor((totalSeconds % 86_400) / 3600); - const minutes = Math.floor((totalSeconds % 3600) / 60); - if (days > 0) { - return `${days}d${hours}h`; - } - if (hours > 0) { - return `${hours}h${minutes}m`; - } - if (minutes > 0) { - return `${minutes}m`; - } - return `${totalSeconds}s`; -} - -function formatAuditRows(findings: TaskAuditFinding[], rich: boolean) { - const header = [ - "Severity".padEnd(8), - "Code".padEnd(22), - "Task".padEnd(ID_PAD), - "Status".padEnd(STATUS_PAD), - "Age".padEnd(8), - "Detail", - ].join(" "); - const lines = [rich ? theme.heading(header) : header]; - for (const finding of findings) { - const severity = finding.severity.padEnd(8); - const status = formatTaskStatusCell(finding.task.status, rich); - const severityCell = !rich - ? severity - : finding.severity === "error" - ? theme.error(severity) - : theme.warn(severity); - lines.push( - [ - severityCell, - finding.code.padEnd(22), - shortToken(finding.task.taskId).padEnd(ID_PAD), - status, - formatAgeMs(finding.ageMs).padEnd(8), - truncate(finding.detail, 88), - ] - .join(" ") - .trimEnd(), - ); - } - return lines; -} - -export async function tasksListCommand( - opts: { json?: boolean; runtime?: string; status?: string }, - runtime: RuntimeEnv, -) { - const runtimeFilter = opts.runtime?.trim(); - const statusFilter = opts.status?.trim(); - const tasks = reconcileInspectableTasks().filter((task) => { - if (runtimeFilter && task.runtime !== runtimeFilter) { - return false; - } - if (statusFilter && task.status !== statusFilter) { - return false; - } - return true; - }); - - if (opts.json) { - runtime.log( - JSON.stringify( - { - count: tasks.length, - runtime: runtimeFilter ?? null, - status: statusFilter ?? null, - tasks, - }, - null, - 2, - ), - ); - return; - } - - runtime.log(info(`Background tasks: ${tasks.length}`)); - runtime.log(info(`Task pressure: ${formatTaskListSummary(tasks)}`)); - if (runtimeFilter) { - runtime.log(info(`Runtime filter: ${runtimeFilter}`)); - } - if (statusFilter) { - runtime.log(info(`Status filter: ${statusFilter}`)); - } - if (tasks.length === 0) { - runtime.log("No background tasks found."); - return; - } - const rich = isRich(); - for (const line of formatTaskRows(tasks, rich)) { - runtime.log(line); - } -} - -export async function tasksShowCommand( - opts: { json?: boolean; lookup: string }, - runtime: RuntimeEnv, -) { - const task = reconcileTaskLookupToken(opts.lookup); - if (!task) { - runtime.error(`Task not found: ${opts.lookup}`); - runtime.exit(1); - return; - } - - if (opts.json) { - runtime.log(JSON.stringify(task, null, 2)); - return; - } - - const lines = [ - "Background task:", - `taskId: ${task.taskId}`, - `kind: ${task.runtime}`, - `sourceId: ${task.sourceId ?? "n/a"}`, - `status: ${task.status}`, - `result: ${task.terminalOutcome ?? "n/a"}`, - `delivery: ${task.deliveryStatus}`, - `notify: ${task.notifyPolicy}`, - `requesterSessionKey: ${task.requesterSessionKey}`, - `childSessionKey: ${task.childSessionKey ?? "n/a"}`, - `parentTaskId: ${task.parentTaskId ?? "n/a"}`, - `agentId: ${task.agentId ?? "n/a"}`, - `runId: ${task.runId ?? "n/a"}`, - `label: ${task.label ?? "n/a"}`, - `task: ${task.task}`, - `createdAt: ${new Date(task.createdAt).toISOString()}`, - `startedAt: ${task.startedAt ? new Date(task.startedAt).toISOString() : "n/a"}`, - `endedAt: ${task.endedAt ? new Date(task.endedAt).toISOString() : "n/a"}`, - `lastEventAt: ${task.lastEventAt ? new Date(task.lastEventAt).toISOString() : "n/a"}`, - `cleanupAfter: ${task.cleanupAfter ? new Date(task.cleanupAfter).toISOString() : "n/a"}`, - ...(task.error ? [`error: ${task.error}`] : []), - ...(task.progressSummary ? [`progressSummary: ${task.progressSummary}`] : []), - ...(task.terminalSummary ? [`terminalSummary: ${task.terminalSummary}`] : []), - ]; - for (const line of lines) { - runtime.log(line); - } -} - -export async function tasksNotifyCommand( - opts: { lookup: string; notify: TaskNotifyPolicy }, - runtime: RuntimeEnv, -) { - const task = reconcileTaskLookupToken(opts.lookup); - if (!task) { - runtime.error(`Task not found: ${opts.lookup}`); - runtime.exit(1); - return; - } - const updated = updateTaskNotifyPolicyById({ - taskId: task.taskId, - notifyPolicy: opts.notify, - }); - if (!updated) { - runtime.error(`Task not found: ${opts.lookup}`); - runtime.exit(1); - return; - } - runtime.log(`Updated ${updated.taskId} notify policy to ${updated.notifyPolicy}.`); -} - -export async function tasksCancelCommand(opts: { lookup: string }, runtime: RuntimeEnv) { - const task = reconcileTaskLookupToken(opts.lookup); - if (!task) { - runtime.error(`Task not found: ${opts.lookup}`); - runtime.exit(1); - return; - } - const result = await cancelTaskById({ - cfg: loadConfig(), - taskId: task.taskId, - }); - if (!result.found) { - runtime.error(result.reason ?? `Task not found: ${opts.lookup}`); - runtime.exit(1); - return; - } - if (!result.cancelled) { - runtime.error(result.reason ?? `Could not cancel task: ${opts.lookup}`); - runtime.exit(1); - return; - } - const updated = getTaskById(task.taskId); - runtime.log( - `Cancelled ${updated?.taskId ?? task.taskId} (${updated?.runtime ?? task.runtime})${updated?.runId ? ` run ${updated.runId}` : ""}.`, - ); -} - -export async function tasksAuditCommand( - opts: { - json?: boolean; - severity?: TaskAuditSeverity; - code?: TaskAuditCode; - limit?: number; - }, - runtime: RuntimeEnv, -) { - const severityFilter = opts.severity?.trim() as TaskAuditSeverity | undefined; - const codeFilter = opts.code?.trim() as TaskAuditCode | undefined; - const allFindings = listTaskAuditFindings(); - const findings = allFindings.filter((finding) => { - if (severityFilter && finding.severity !== severityFilter) { - return false; - } - if (codeFilter && finding.code !== codeFilter) { - return false; - } - return true; - }); - const limit = typeof opts.limit === "number" && opts.limit > 0 ? opts.limit : undefined; - const displayed = limit ? findings.slice(0, limit) : findings; - const summary = summarizeTaskAuditFindings(allFindings); - - if (opts.json) { - runtime.log( - JSON.stringify( - { - count: allFindings.length, - filteredCount: findings.length, - displayed: displayed.length, - filters: { - severity: severityFilter ?? null, - code: codeFilter ?? null, - limit: limit ?? null, - }, - summary, - findings: displayed, - }, - null, - 2, - ), - ); - return; - } - - runtime.log( - info( - `Task audit: ${summary.total} findings · ${summary.errors} errors · ${summary.warnings} warnings`, - ), - ); - if (severityFilter || codeFilter) { - runtime.log(info(`Showing ${findings.length} matching findings.`)); - } - if (severityFilter) { - runtime.log(info(`Severity filter: ${severityFilter}`)); - } - if (codeFilter) { - runtime.log(info(`Code filter: ${codeFilter}`)); - } - if (limit) { - runtime.log(info(`Limit: ${limit}`)); - } - if (displayed.length === 0) { - runtime.log("No task audit findings."); - return; - } - const rich = isRich(); - for (const line of formatAuditRows(displayed, rich)) { - runtime.log(line); - } -} - -export async function tasksMaintenanceCommand( - opts: { json?: boolean; apply?: boolean }, - runtime: RuntimeEnv, -) { - const auditBefore = getInspectableTaskAuditSummary(); - const maintenance = opts.apply ? runTaskRegistryMaintenance() : previewTaskRegistryMaintenance(); - const summary = getInspectableTaskRegistrySummary(); - const auditAfter = opts.apply ? getInspectableTaskAuditSummary() : auditBefore; - - if (opts.json) { - runtime.log( - JSON.stringify( - { - mode: opts.apply ? "apply" : "preview", - maintenance, - tasks: summary, - auditBefore, - auditAfter, - }, - null, - 2, - ), - ); - return; - } - - runtime.log( - info( - `Task maintenance (${opts.apply ? "applied" : "preview"}): ${maintenance.reconciled} reconcile · ${maintenance.cleanupStamped} cleanup stamp · ${maintenance.pruned} prune`, - ), - ); - runtime.log( - info( - `${opts.apply ? "Task health after apply" : "Task health"}: ${summary.byStatus.queued} queued · ${summary.byStatus.running} running · ${auditAfter.errors} audit errors · ${auditAfter.warnings} audit warnings`, - ), - ); - if (opts.apply) { - runtime.log( - info( - `Task health before apply: ${auditBefore.errors} audit errors · ${auditBefore.warnings} audit warnings`, - ), - ); - } - if (!opts.apply) { - runtime.log("Dry run only. Re-run with `openclaw tasks maintenance --apply` to write changes."); - } -} diff --git a/src/gateway/server.impl.ts b/src/gateway/server.impl.ts index a686fa08dd5..a15d595599c 100644 --- a/src/gateway/server.impl.ts +++ b/src/gateway/server.impl.ts @@ -75,10 +75,7 @@ import { } from "../secrets/runtime.js"; import { onSessionLifecycleEvent } from "../sessions/session-lifecycle-events.js"; import { onSessionTranscriptUpdate } from "../sessions/transcript-events.js"; -import { - getInspectableTaskRegistrySummary, - startTaskRegistryMaintenance, -} from "../tasks/task-registry.maintenance.js"; +import { getInspectableTaskRegistrySummary } from "../tasks/task-registry.maintenance.js"; import { runSetupWizard } from "../wizard/setup.js"; import { createAuthRateLimiter, type AuthRateLimiter } from "./auth-rate-limit.js"; import { startChannelHealthMonitor } from "./channel-health-monitor.js"; @@ -897,7 +894,6 @@ export async function startGatewayServer( }); if (!minimalTestGateway) { - startTaskRegistryMaintenance(); ({ tickInterval, healthInterval, dedupeCleanup, mediaCleanup } = startGatewayMaintenanceTimers({ broadcast, diff --git a/src/infra/exec-approval-channel-runtime.ts b/src/infra/exec-approval-channel-runtime.ts index 1a823ceddc6..bc4368cbde0 100644 --- a/src/infra/exec-approval-channel-runtime.ts +++ b/src/infra/exec-approval-channel-runtime.ts @@ -41,10 +41,7 @@ export type ExecApprovalChannelRuntimeAdapter< resolved: TResolved; entries: TPending[]; }) => Promise; - finalizeExpired?: (params: { - request: TRequest; - entries: TPending[]; - }) => Promise; + finalizeExpired?: (params: { request: TRequest; entries: TPending[] }) => Promise; nowMs?: () => number; }; diff --git a/src/infra/exec-approval-reply.ts b/src/infra/exec-approval-reply.ts index 2e8f3da0eba..c502541f4c1 100644 --- a/src/infra/exec-approval-reply.ts +++ b/src/infra/exec-approval-reply.ts @@ -148,7 +148,8 @@ export function parseExecApprovalCommandText( const rawDecision = match[2].toLowerCase(); return { approvalId: match[1], - decision: rawDecision === "always" ? "allow-always" : (rawDecision as ExecApprovalReplyDecision), + decision: + rawDecision === "always" ? "allow-always" : (rawDecision as ExecApprovalReplyDecision), }; } diff --git a/src/plugin-sdk/operations-default.ts b/src/plugin-sdk/operations-default.ts new file mode 100644 index 00000000000..ae928552702 --- /dev/null +++ b/src/plugin-sdk/operations-default.ts @@ -0,0 +1,14 @@ +import type { OpenClawPluginService } from "../plugins/types.js"; +import { defaultTaskOperationsRuntime } from "../tasks/operations-runtime.js"; +import { startTaskRegistryMaintenance } from "../tasks/task-registry.maintenance.js"; + +export const defaultOperationsRuntime = defaultTaskOperationsRuntime; + +export function createDefaultOperationsMaintenanceService(): OpenClawPluginService { + return { + id: "default-operations-maintenance", + start() { + startTaskRegistryMaintenance(); + }, + }; +} diff --git a/src/plugin-sdk/plugin-entry.ts b/src/plugin-sdk/plugin-entry.ts index f88041ae3b3..3ad358d7cbc 100644 --- a/src/plugin-sdk/plugin-entry.ts +++ b/src/plugin-sdk/plugin-entry.ts @@ -1,5 +1,21 @@ import type { OpenClawConfig } from "../config/config.js"; import { emptyPluginConfigSchema } from "../plugins/config-schema.js"; +import type { + PluginOperationAuditFinding, + PluginOperationAuditQuery, + PluginOperationAuditSeverity, + PluginOperationAuditSummary, + PluginOperationDispatchEvent, + PluginOperationDispatchResult, + PluginOperationListQuery, + PluginOperationMaintenanceQuery, + PluginOperationMaintenanceSummary, + PluginOperationPatchEvent, + PluginOperationRecord, + PluginOperationSummary, + PluginOperationsCancelResult, + PluginOperationsRuntime, +} from "../plugins/operations-state.js"; import type { AnyAgentTool, MediaUnderstandingProviderPlugin, @@ -96,6 +112,20 @@ export type { OpenClawPluginDefinition, PluginLogger, PluginInteractiveTelegramHandlerContext, + PluginOperationAuditFinding, + PluginOperationAuditQuery, + PluginOperationAuditSeverity, + PluginOperationAuditSummary, + PluginOperationDispatchEvent, + PluginOperationDispatchResult, + PluginOperationListQuery, + PluginOperationMaintenanceQuery, + PluginOperationMaintenanceSummary, + PluginOperationPatchEvent, + PluginOperationRecord, + PluginOperationSummary, + PluginOperationsCancelResult, + PluginOperationsRuntime, }; export type { OpenClawConfig }; diff --git a/src/plugins/api-builder.ts b/src/plugins/api-builder.ts index e743c1a9c12..ecccf444d75 100644 --- a/src/plugins/api-builder.ts +++ b/src/plugins/api-builder.ts @@ -39,6 +39,7 @@ export type BuildPluginApiParams = { | "registerMemoryFlushPlan" | "registerMemoryRuntime" | "registerMemoryEmbeddingProvider" + | "registerOperationsRuntime" | "on" > >; @@ -69,6 +70,7 @@ const noopRegisterMemoryFlushPlan: OpenClawPluginApi["registerMemoryFlushPlan"] const noopRegisterMemoryRuntime: OpenClawPluginApi["registerMemoryRuntime"] = () => {}; const noopRegisterMemoryEmbeddingProvider: OpenClawPluginApi["registerMemoryEmbeddingProvider"] = () => {}; +const noopRegisterOperationsRuntime: OpenClawPluginApi["registerOperationsRuntime"] = () => {}; const noopOn: OpenClawPluginApi["on"] = () => {}; export function buildPluginApi(params: BuildPluginApiParams): OpenClawPluginApi { @@ -112,6 +114,7 @@ export function buildPluginApi(params: BuildPluginApiParams): OpenClawPluginApi registerMemoryRuntime: handlers.registerMemoryRuntime ?? noopRegisterMemoryRuntime, registerMemoryEmbeddingProvider: handlers.registerMemoryEmbeddingProvider ?? noopRegisterMemoryEmbeddingProvider, + registerOperationsRuntime: handlers.registerOperationsRuntime ?? noopRegisterOperationsRuntime, resolvePath: params.resolvePath, on: handlers.on ?? noopOn, }; diff --git a/src/plugins/captured-registration.test.ts b/src/plugins/captured-registration.test.ts index 844be720a3e..295e09661c6 100644 --- a/src/plugins/captured-registration.test.ts +++ b/src/plugins/captured-registration.test.ts @@ -48,5 +48,6 @@ describe("captured plugin registration", () => { expect(captured.tools.map((tool) => tool.name)).toEqual(["captured-tool"]); expect(captured.providers.map((provider) => provider.id)).toEqual(["captured-provider"]); expect(captured.api.registerMemoryEmbeddingProvider).toBeTypeOf("function"); + expect(captured.api.registerOperationsRuntime).toBeTypeOf("function"); }); }); diff --git a/src/plugins/loader.test.ts b/src/plugins/loader.test.ts index c2e107f1933..bca4620629f 100644 --- a/src/plugins/loader.test.ts +++ b/src/plugins/loader.test.ts @@ -30,6 +30,10 @@ import { registerMemoryRuntime, resolveMemoryFlushPlan, } from "./memory-state.js"; +import { + getRegisteredOperationsRuntime, + registerOperationsRuntimeForOwner, +} from "./operations-state.js"; import { createEmptyPluginRegistry } from "./registry.js"; import { getActivePluginRegistry, @@ -1461,6 +1465,181 @@ module.exports = { id: "skipped-scoped-only", register() { throw new Error("skip expect(listMemoryEmbeddingProviders()).toEqual([]); }); + it("restores the active operations runtime during snapshot loads", () => { + const activeRuntime = { + async dispatch() { + return { matched: true, created: true, record: null }; + }, + async getById() { + return null; + }, + async findByRunId() { + return null; + }, + async list() { + return []; + }, + async summarize() { + return { + total: 0, + active: 0, + terminal: 0, + failures: 0, + byNamespace: { active: 0 }, + byKind: {}, + byStatus: {}, + }; + }, + async audit() { + return []; + }, + async maintenance() { + return { + reconciled: 0, + cleanupStamped: 0, + pruned: 0, + }; + }, + async cancel() { + return { found: false, cancelled: false, reason: "active" }; + }, + }; + registerOperationsRuntimeForOwner(activeRuntime, "active-operations"); + const plugin = writePlugin({ + id: "snapshot-operations", + filename: "snapshot-operations.cjs", + body: `module.exports = { + id: "snapshot-operations", + register(api) { + api.registerOperationsRuntime({ + async dispatch() { + return { matched: true, created: true, record: null }; + }, + async getById() { + return null; + }, + async findByRunId() { + return null; + }, + async list() { + return []; + }, + async summarize() { + return { + total: 1, + active: 1, + terminal: 0, + failures: 0, + byNamespace: { snapshot: 1 }, + byKind: { snapshot: 1 }, + byStatus: { queued: 1 }, + }; + }, + async audit() { + return []; + }, + async maintenance() { + return { + reconciled: 0, + cleanupStamped: 0, + pruned: 0, + }; + }, + async cancel() { + return { found: false, cancelled: false, reason: "snapshot" }; + }, + }); + }, + };`, + }); + + const scoped = loadOpenClawPlugins({ + cache: false, + activate: false, + workspaceDir: plugin.dir, + config: { + plugins: { + load: { paths: [plugin.file] }, + allow: ["snapshot-operations"], + }, + }, + onlyPluginIds: ["snapshot-operations"], + }); + + expect(scoped.plugins.find((entry) => entry.id === "snapshot-operations")?.status).toBe( + "loaded", + ); + expect(getRegisteredOperationsRuntime()).toBe(activeRuntime); + }); + + it("clears newly-registered operations runtime when plugin register fails", () => { + const plugin = writePlugin({ + id: "failing-operations", + filename: "failing-operations.cjs", + body: `module.exports = { + id: "failing-operations", + register(api) { + api.registerOperationsRuntime({ + async dispatch() { + return { matched: true, created: true, record: null }; + }, + async getById() { + return null; + }, + async findByRunId() { + return null; + }, + async list() { + return []; + }, + async summarize() { + return { + total: 1, + active: 1, + terminal: 0, + failures: 0, + byNamespace: { failing: 1 }, + byKind: { failing: 1 }, + byStatus: { queued: 1 }, + }; + }, + async audit() { + return []; + }, + async maintenance() { + return { + reconciled: 0, + cleanupStamped: 0, + pruned: 0, + }; + }, + async cancel() { + return { found: false, cancelled: false, reason: "failing" }; + }, + }); + throw new Error("operations register failed"); + }, + };`, + }); + + const registry = loadOpenClawPlugins({ + cache: false, + workspaceDir: plugin.dir, + config: { + plugins: { + load: { paths: [plugin.file] }, + allow: ["failing-operations"], + }, + }, + onlyPluginIds: ["failing-operations"], + }); + + expect(registry.plugins.find((entry) => entry.id === "failing-operations")?.status).toBe( + "error", + ); + expect(getRegisteredOperationsRuntime()).toBeUndefined(); + }); + it("throws when activate:false is used without cache:false", () => { expect(() => loadOpenClawPlugins({ activate: false })).toThrow( "activate:false requires cache:false", diff --git a/src/plugins/loader.ts b/src/plugins/loader.ts index f608c8b213a..601ae4fd971 100644 --- a/src/plugins/loader.ts +++ b/src/plugins/loader.ts @@ -35,6 +35,12 @@ import { getMemoryRuntime, restoreMemoryPluginState, } from "./memory-state.js"; +import { + clearOperationsRuntimeState, + getRegisteredOperationsRuntime, + getRegisteredOperationsRuntimeOwner, + restoreOperationsRuntimeState, +} from "./operations-state.js"; import { isPathInside, safeStatSync } from "./path-safety.js"; import { createPluginRegistry, type PluginRecord, type PluginRegistry } from "./registry.js"; import { resolvePluginCacheInputs } from "./roots.js"; @@ -116,6 +122,8 @@ type CachedPluginState = { memoryFlushPlanResolver: ReturnType; memoryPromptBuilder: ReturnType; memoryRuntime: ReturnType; + operationsRuntime: ReturnType; + operationsRuntimeOwner: ReturnType; }; const MAX_PLUGIN_REGISTRY_CACHE_ENTRIES = 128; @@ -136,6 +144,7 @@ const LAZY_RUNTIME_REFLECTION_KEYS = [ "logging", "state", "modelAuth", + "operations", ] as const satisfies readonly (keyof PluginRuntime)[]; export function clearPluginLoaderCache(): void { @@ -143,6 +152,7 @@ export function clearPluginLoaderCache(): void { openAllowlistWarningCache.clear(); clearMemoryEmbeddingProviders(); clearMemoryPluginState(); + clearOperationsRuntimeState(); } const defaultLogger = () => createSubsystemLogger("plugins"); @@ -843,6 +853,10 @@ export function loadOpenClawPlugins(options: PluginLoadOptions = {}): PluginRegi flushPlanResolver: cached.memoryFlushPlanResolver, runtime: cached.memoryRuntime, }); + restoreOperationsRuntimeState({ + runtime: cached.operationsRuntime, + ownerPluginId: cached.operationsRuntimeOwner, + }); if (shouldActivate) { activatePluginRegistry(cached.registry, cacheKey, runtimeSubagentMode); } @@ -1336,6 +1350,8 @@ export function loadOpenClawPlugins(options: PluginLoadOptions = {}): PluginRegi const previousMemoryFlushPlanResolver = getMemoryFlushPlanResolver(); const previousMemoryPromptBuilder = getMemoryPromptSectionBuilder(); const previousMemoryRuntime = getMemoryRuntime(); + const previousOperationsRuntime = getRegisteredOperationsRuntime(); + const previousOperationsRuntimeOwner = getRegisteredOperationsRuntimeOwner(); try { const result = register(api); @@ -1355,6 +1371,10 @@ export function loadOpenClawPlugins(options: PluginLoadOptions = {}): PluginRegi flushPlanResolver: previousMemoryFlushPlanResolver, runtime: previousMemoryRuntime, }); + restoreOperationsRuntimeState({ + runtime: previousOperationsRuntime, + ownerPluginId: previousOperationsRuntimeOwner, + }); } registry.plugins.push(record); seenIds.set(pluginId, candidate.origin); @@ -1365,6 +1385,10 @@ export function loadOpenClawPlugins(options: PluginLoadOptions = {}): PluginRegi flushPlanResolver: previousMemoryFlushPlanResolver, runtime: previousMemoryRuntime, }); + restoreOperationsRuntimeState({ + runtime: previousOperationsRuntime, + ownerPluginId: previousOperationsRuntimeOwner, + }); recordPluginError({ logger, registry, @@ -1404,6 +1428,8 @@ export function loadOpenClawPlugins(options: PluginLoadOptions = {}): PluginRegi memoryFlushPlanResolver: getMemoryFlushPlanResolver(), memoryPromptBuilder: getMemoryPromptSectionBuilder(), memoryRuntime: getMemoryRuntime(), + operationsRuntime: getRegisteredOperationsRuntime(), + operationsRuntimeOwner: getRegisteredOperationsRuntimeOwner(), }); } if (shouldActivate) { diff --git a/src/plugins/operations-state.test.ts b/src/plugins/operations-state.test.ts new file mode 100644 index 00000000000..cc1b4bad640 --- /dev/null +++ b/src/plugins/operations-state.test.ts @@ -0,0 +1,134 @@ +import { describe, expect, it } from "vitest"; +import { + clearOperationsRuntimeState, + getRegisteredOperationsRuntime, + getRegisteredOperationsRuntimeOwner, + registerOperationsRuntimeForOwner, + restoreOperationsRuntimeState, + summarizeOperationRecords, + type PluginOperationsRuntime, +} from "./operations-state.js"; + +function createRuntime(label: string): PluginOperationsRuntime { + return { + async dispatch() { + return { matched: true, created: true, record: null }; + }, + async getById() { + return null; + }, + async findByRunId() { + return null; + }, + async list() { + return []; + }, + async summarize() { + return { + total: 0, + active: 0, + terminal: 0, + failures: 0, + byNamespace: { [label]: 0 }, + byKind: {}, + byStatus: {}, + }; + }, + async audit() { + return []; + }, + async maintenance() { + return { + reconciled: 0, + cleanupStamped: 0, + pruned: 0, + }; + }, + async cancel() { + return { found: false, cancelled: false, reason: label }; + }, + }; +} + +describe("operations-state", () => { + it("registers an operations runtime and tracks the owner", () => { + clearOperationsRuntimeState(); + const runtime = createRuntime("one"); + expect(registerOperationsRuntimeForOwner(runtime, "plugin-one")).toEqual({ ok: true }); + expect(getRegisteredOperationsRuntime()).toBe(runtime); + expect(getRegisteredOperationsRuntimeOwner()).toBe("plugin-one"); + }); + + it("rejects a second owner and allows same-owner refresh", () => { + clearOperationsRuntimeState(); + const first = createRuntime("one"); + const second = createRuntime("two"); + const replacement = createRuntime("three"); + expect(registerOperationsRuntimeForOwner(first, "plugin-one")).toEqual({ ok: true }); + expect(registerOperationsRuntimeForOwner(second, "plugin-two")).toEqual({ + ok: false, + existingOwner: "plugin-one", + }); + expect( + registerOperationsRuntimeForOwner(replacement, "plugin-one", { + allowSameOwnerRefresh: true, + }), + ).toEqual({ ok: true }); + expect(getRegisteredOperationsRuntime()).toBe(replacement); + }); + + it("restores and clears runtime state", () => { + clearOperationsRuntimeState(); + const runtime = createRuntime("restore"); + restoreOperationsRuntimeState({ + runtime, + ownerPluginId: "plugin-restore", + }); + expect(getRegisteredOperationsRuntime()).toBe(runtime); + expect(getRegisteredOperationsRuntimeOwner()).toBe("plugin-restore"); + clearOperationsRuntimeState(); + expect(getRegisteredOperationsRuntime()).toBeUndefined(); + expect(getRegisteredOperationsRuntimeOwner()).toBeUndefined(); + }); + + it("summarizes generic operation records", () => { + const summary = summarizeOperationRecords([ + { + operationId: "op-1", + namespace: "tasks", + kind: "cli", + status: "queued", + description: "Queued task", + createdAt: 1, + updatedAt: 1, + }, + { + operationId: "op-2", + namespace: "imports", + kind: "csv", + status: "failed", + description: "Failed import", + createdAt: 2, + updatedAt: 2, + }, + ]); + expect(summary).toEqual({ + total: 2, + active: 1, + terminal: 1, + failures: 1, + byNamespace: { + imports: 1, + tasks: 1, + }, + byKind: { + cli: 1, + csv: 1, + }, + byStatus: { + failed: 1, + queued: 1, + }, + }); + }); +}); diff --git a/src/plugins/operations-state.ts b/src/plugins/operations-state.ts new file mode 100644 index 00000000000..5e9e68d6954 --- /dev/null +++ b/src/plugins/operations-state.ts @@ -0,0 +1,277 @@ +import type { OpenClawConfig } from "../config/config.js"; + +export type PluginOperationRecord = { + operationId: string; + namespace: string; + kind: string; + status: string; + sourceId?: string; + requesterSessionKey?: string; + childSessionKey?: string; + parentOperationId?: string; + agentId?: string; + runId?: string; + title?: string; + description: string; + createdAt: number; + startedAt?: number; + endedAt?: number; + updatedAt: number; + error?: string; + progressSummary?: string; + terminalSummary?: string; + metadata?: Record; +}; + +export type PluginOperationListQuery = { + namespace?: string; + kind?: string; + status?: string; + sessionKey?: string; + runId?: string; + sourceId?: string; + parentOperationId?: string; + limit?: number; +}; + +export type PluginOperationSummary = { + total: number; + active: number; + terminal: number; + failures: number; + byNamespace: Record; + byKind: Record; + byStatus: Record; +}; + +export type PluginOperationCreateEvent = { + type: "create"; + namespace: string; + kind: string; + status?: string; + sourceId?: string; + requesterSessionKey?: string; + childSessionKey?: string; + parentOperationId?: string; + agentId?: string; + runId?: string; + title?: string; + description: string; + createdAt?: number; + startedAt?: number; + endedAt?: number; + updatedAt?: number; + error?: string; + progressSummary?: string | null; + terminalSummary?: string | null; + metadata?: Record; +}; + +export type PluginOperationTransitionEvent = { + type: "transition"; + operationId?: string; + runId?: string; + status: string; + at?: number; + startedAt?: number; + endedAt?: number; + error?: string | null; + progressSummary?: string | null; + terminalSummary?: string | null; + metadataPatch?: Record; +}; + +export type PluginOperationPatchEvent = { + type: "patch"; + operationId?: string; + runId?: string; + at?: number; + title?: string | null; + description?: string | null; + error?: string | null; + progressSummary?: string | null; + terminalSummary?: string | null; + metadataPatch?: Record; +}; + +export type PluginOperationDispatchEvent = + | PluginOperationCreateEvent + | PluginOperationTransitionEvent + | PluginOperationPatchEvent; + +export type PluginOperationDispatchResult = { + matched: boolean; + created?: boolean; + record: PluginOperationRecord | null; +}; + +export type PluginOperationsCancelResult = { + found: boolean; + cancelled: boolean; + reason?: string; + record?: PluginOperationRecord | null; +}; + +export type PluginOperationAuditSeverity = "warn" | "error"; + +export type PluginOperationAuditFinding = { + severity: PluginOperationAuditSeverity; + code: string; + operation: PluginOperationRecord; + detail: string; + ageMs?: number; +}; + +export type PluginOperationAuditSummary = { + total: number; + warnings: number; + errors: number; + byCode: Record; +}; + +export type PluginOperationAuditQuery = { + namespace?: string; + severity?: PluginOperationAuditSeverity; + code?: string; +}; + +export type PluginOperationMaintenanceQuery = { + namespace?: string; + apply?: boolean; +}; + +export type PluginOperationMaintenanceSummary = { + reconciled: number; + cleanupStamped: number; + pruned: number; +}; + +export type PluginOperationsRuntime = { + dispatch(event: PluginOperationDispatchEvent): Promise; + getById(operationId: string): Promise; + findByRunId(runId: string): Promise; + list(query?: PluginOperationListQuery): Promise; + summarize(query?: PluginOperationListQuery): Promise; + audit(query?: PluginOperationAuditQuery): Promise; + maintenance(query?: PluginOperationMaintenanceQuery): Promise; + cancel(params: { + cfg: OpenClawConfig; + operationId: string; + }): Promise; +}; + +type OperationsRuntimeState = { + runtime?: PluginOperationsRuntime; + ownerPluginId?: string; +}; + +type RegisterOperationsRuntimeResult = { ok: true } | { ok: false; existingOwner?: string }; + +const operationsRuntimeState: OperationsRuntimeState = {}; + +function normalizeOwnedPluginId(ownerPluginId: string): string { + return ownerPluginId.trim(); +} + +export function registerOperationsRuntimeForOwner( + runtime: PluginOperationsRuntime, + ownerPluginId: string, + opts?: { allowSameOwnerRefresh?: boolean }, +): RegisterOperationsRuntimeResult { + const nextOwner = normalizeOwnedPluginId(ownerPluginId); + const existingOwner = operationsRuntimeState.ownerPluginId?.trim(); + if ( + operationsRuntimeState.runtime && + existingOwner && + existingOwner !== nextOwner && + !(opts?.allowSameOwnerRefresh === true && existingOwner === nextOwner) + ) { + return { + ok: false, + existingOwner, + }; + } + operationsRuntimeState.runtime = runtime; + operationsRuntimeState.ownerPluginId = nextOwner; + return { ok: true }; +} + +export function getRegisteredOperationsRuntime(): PluginOperationsRuntime | undefined { + return operationsRuntimeState.runtime; +} + +export function getRegisteredOperationsRuntimeOwner(): string | undefined { + return operationsRuntimeState.ownerPluginId; +} + +export function hasRegisteredOperationsRuntime(): boolean { + return operationsRuntimeState.runtime !== undefined; +} + +export function restoreOperationsRuntimeState(state: OperationsRuntimeState): void { + operationsRuntimeState.runtime = state.runtime; + operationsRuntimeState.ownerPluginId = state.ownerPluginId?.trim() || undefined; +} + +export function clearOperationsRuntimeState(): void { + operationsRuntimeState.runtime = undefined; + operationsRuntimeState.ownerPluginId = undefined; +} + +export function isActiveOperationStatus(status: string): boolean { + return status === "queued" || status === "running"; +} + +export function isFailureOperationStatus(status: string): boolean { + return status === "failed" || status === "timed_out" || status === "lost"; +} + +export function summarizeOperationRecords( + records: Iterable, +): PluginOperationSummary { + const summary: PluginOperationSummary = { + total: 0, + active: 0, + terminal: 0, + failures: 0, + byNamespace: {}, + byKind: {}, + byStatus: {}, + }; + for (const record of records) { + summary.total += 1; + summary.byNamespace[record.namespace] = (summary.byNamespace[record.namespace] ?? 0) + 1; + summary.byKind[record.kind] = (summary.byKind[record.kind] ?? 0) + 1; + summary.byStatus[record.status] = (summary.byStatus[record.status] ?? 0) + 1; + if (isActiveOperationStatus(record.status)) { + summary.active += 1; + } else { + summary.terminal += 1; + } + if (isFailureOperationStatus(record.status)) { + summary.failures += 1; + } + } + return summary; +} + +export function summarizeOperationAuditFindings( + findings: Iterable, +): PluginOperationAuditSummary { + const summary: PluginOperationAuditSummary = { + total: 0, + warnings: 0, + errors: 0, + byCode: {}, + }; + for (const finding of findings) { + summary.total += 1; + summary.byCode[finding.code] = (summary.byCode[finding.code] ?? 0) + 1; + if (finding.severity === "error") { + summary.errors += 1; + continue; + } + summary.warnings += 1; + } + return summary; +} diff --git a/src/plugins/registry.ts b/src/plugins/registry.ts index a35aaa8834b..3919b20ff06 100644 --- a/src/plugins/registry.ts +++ b/src/plugins/registry.ts @@ -24,6 +24,7 @@ import { registerMemoryPromptSection, registerMemoryRuntime, } from "./memory-state.js"; +import { registerOperationsRuntimeForOwner } from "./operations-state.js"; import { normalizeRegisteredProvider } from "./provider-validation.js"; import { createEmptyPluginRegistry } from "./registry-empty.js"; import { withPluginRuntimePluginIdScope } from "./runtime/gateway-request-scope.js"; @@ -1153,6 +1154,20 @@ export function createPluginRegistry(registryParams: PluginRegistryParams) { ownerPluginId: record.id, }); }, + registerOperationsRuntime: (runtime) => { + const result = registerOperationsRuntimeForOwner(runtime, record.id, { + allowSameOwnerRefresh: true, + }); + if (!result.ok) { + const ownerDetail = result.existingOwner ? ` (${result.existingOwner})` : ""; + pushDiagnostic({ + level: "error", + pluginId: record.id, + source: record.source, + message: `operations runtime already registered${ownerDetail}`, + }); + } + }, on: (hookName, handler, opts) => registerTypedHook(record, hookName, handler, opts, params.hookPolicy), } diff --git a/src/plugins/runtime/index.test.ts b/src/plugins/runtime/index.test.ts index e3fc27f0632..47b38c48663 100644 --- a/src/plugins/runtime/index.test.ts +++ b/src/plugins/runtime/index.test.ts @@ -215,6 +215,20 @@ describe("plugin runtime command execution", () => { ]); }, }, + { + name: "exposes runtime.operations helpers", + assert: (runtime: ReturnType) => { + expect(runtime.operations).toBeDefined(); + expectFunctionKeys(runtime.operations as Record, [ + "dispatch", + "getById", + "findByRunId", + "list", + "summarize", + "cancel", + ]); + }, + }, ] as const)("$name", ({ assert }) => { expectRuntimeShape(assert); }); diff --git a/src/plugins/runtime/index.ts b/src/plugins/runtime/index.ts index c9ad7f44b4a..f72954cb915 100644 --- a/src/plugins/runtime/index.ts +++ b/src/plugins/runtime/index.ts @@ -6,8 +6,10 @@ import { createLazyRuntimeMethodBinder, createLazyRuntimeModule, } from "../../shared/lazy-runtime.js"; +import { defaultTaskOperationsRuntime } from "../../tasks/operations-runtime.js"; import { VERSION } from "../../version.js"; import { listWebSearchProviders, runWebSearch } from "../../web-search/runtime.js"; +import { getRegisteredOperationsRuntime } from "../operations-state.js"; import { createRuntimeAgent } from "./runtime-agent.js"; import { defineCachedValue } from "./runtime-cache.js"; import { createRuntimeChannel } from "./runtime-channel.js"; @@ -96,6 +98,20 @@ function createRuntimeModelAuth(): PluginRuntime["modelAuth"] { }; } +function createRuntimeOperations(): PluginRuntime["operations"] { + const resolveRuntime = () => getRegisteredOperationsRuntime() ?? defaultTaskOperationsRuntime; + return { + dispatch: (event) => resolveRuntime().dispatch(event), + getById: (operationId) => resolveRuntime().getById(operationId), + findByRunId: (runId) => resolveRuntime().findByRunId(runId), + list: (query) => resolveRuntime().list(query), + summarize: (query) => resolveRuntime().summarize(query), + audit: (query) => resolveRuntime().audit(query), + maintenance: (query) => resolveRuntime().maintenance(query), + cancel: (params) => resolveRuntime().cancel(params), + }; +} + function createUnavailableSubagentRuntime(): PluginRuntime["subagent"] { const unavailable = () => { throw new Error("Plugin runtime subagent methods are only available during a gateway request."); @@ -203,6 +219,7 @@ export function createPluginRuntime(_options: CreatePluginRuntimeOptions = {}): events: createRuntimeEvents(), logging: createRuntimeLogging(), state: { resolveStateDir }, + operations: createRuntimeOperations(), } satisfies Omit< PluginRuntime, "tts" | "mediaUnderstanding" | "stt" | "modelAuth" | "imageGeneration" diff --git a/src/plugins/runtime/types-core.ts b/src/plugins/runtime/types-core.ts index 9109b694d56..55718444494 100644 --- a/src/plugins/runtime/types-core.ts +++ b/src/plugins/runtime/types-core.ts @@ -1,5 +1,17 @@ import type { HeartbeatRunResult } from "../../infra/heartbeat-wake.js"; import type { LogLevel } from "../../logging/levels.js"; +import type { + PluginOperationAuditFinding, + PluginOperationAuditQuery, + PluginOperationDispatchEvent, + PluginOperationDispatchResult, + PluginOperationListQuery, + PluginOperationMaintenanceQuery, + PluginOperationMaintenanceSummary, + PluginOperationRecord, + PluginOperationSummary, + PluginOperationsCancelResult, +} from "../operations-state.js"; export type { HeartbeatRunResult }; @@ -115,4 +127,19 @@ export type PluginRuntimeCore = { cfg?: import("../../config/config.js").OpenClawConfig; }) => Promise; }; + operations: { + dispatch: (event: PluginOperationDispatchEvent) => Promise; + getById: (operationId: string) => Promise; + findByRunId: (runId: string) => Promise; + list: (query?: PluginOperationListQuery) => Promise; + summarize: (query?: PluginOperationListQuery) => Promise; + audit: (query?: PluginOperationAuditQuery) => Promise; + maintenance: ( + query?: PluginOperationMaintenanceQuery, + ) => Promise; + cancel: (params: { + cfg: import("../../config/config.js").OpenClawConfig; + operationId: string; + }) => Promise; + }; }; diff --git a/src/plugins/types.ts b/src/plugins/types.ts index 1bbc1ab0369..032517f488c 100644 --- a/src/plugins/types.ts +++ b/src/plugins/types.ts @@ -54,6 +54,7 @@ import type { } from "../tts/provider-types.js"; import type { DeliveryContext } from "../utils/delivery-context.js"; import type { WizardPrompter } from "../wizard/prompts.js"; +import type { PluginOperationsRuntime } from "./operations-state.js"; import type { SecretInputMode } from "./provider-auth-types.js"; import type { createVpsAwareOAuthHandlers } from "./provider-oauth-flow.js"; import type { PluginRuntime } from "./runtime/types.js"; @@ -1767,6 +1768,8 @@ export type OpenClawPluginApi = { registerMemoryEmbeddingProvider: ( adapter: import("./memory-embedding-providers.js").MemoryEmbeddingProviderAdapter, ) => void; + /** Register the active operations runtime adapter (exclusive slot — only one active at a time). */ + registerOperationsRuntime: (runtime: PluginOperationsRuntime) => void; resolvePath: (input: string) => string; /** Register a lifecycle hook handler */ on: ( diff --git a/src/tasks/operations-runtime.test.ts b/src/tasks/operations-runtime.test.ts new file mode 100644 index 00000000000..88fb8e007ce --- /dev/null +++ b/src/tasks/operations-runtime.test.ts @@ -0,0 +1,183 @@ +import { afterEach, describe, expect, it } from "vitest"; +import { withTempDir } from "../test-helpers/temp-dir.js"; +import { defaultTaskOperationsRuntime } from "./operations-runtime.js"; +import { findTaskByRunId, resetTaskRegistryForTests } from "./task-registry.js"; + +const ORIGINAL_STATE_DIR = process.env.OPENCLAW_STATE_DIR; + +async function withTaskStateDir(run: () => Promise): Promise { + await withTempDir({ prefix: "openclaw-task-operations-" }, async (root) => { + process.env.OPENCLAW_STATE_DIR = root; + resetTaskRegistryForTests(); + try { + await run(); + } finally { + resetTaskRegistryForTests(); + } + }); +} + +describe("task operations runtime", () => { + afterEach(() => { + if (ORIGINAL_STATE_DIR === undefined) { + delete process.env.OPENCLAW_STATE_DIR; + } else { + process.env.OPENCLAW_STATE_DIR = ORIGINAL_STATE_DIR; + } + resetTaskRegistryForTests(); + }); + + it("creates and transitions task records through the generic operations runtime", async () => { + await withTaskStateDir(async () => { + const created = await defaultTaskOperationsRuntime.dispatch({ + type: "create", + namespace: "tasks", + kind: "cli", + status: "queued", + requesterSessionKey: "agent:test:main", + childSessionKey: "agent:test:child", + runId: "run-ops-create", + title: "Task title", + description: "Do the thing", + }); + + expect(created.matched).toBe(true); + expect(created.created).toBe(true); + expect(created.record).toMatchObject({ + namespace: "tasks", + kind: "cli", + status: "queued", + title: "Task title", + description: "Do the thing", + runId: "run-ops-create", + }); + + const progressed = await defaultTaskOperationsRuntime.dispatch({ + type: "transition", + runId: "run-ops-create", + status: "running", + at: 100, + startedAt: 100, + progressSummary: "Started work", + }); + + expect(progressed.record).toMatchObject({ + status: "running", + progressSummary: "Started work", + }); + + const completed = await defaultTaskOperationsRuntime.dispatch({ + type: "transition", + runId: "run-ops-create", + status: "succeeded", + at: 200, + endedAt: 200, + terminalSummary: "All done", + }); + + expect(completed.record).toMatchObject({ + status: "succeeded", + terminalSummary: "All done", + }); + expect(findTaskByRunId("run-ops-create")).toMatchObject({ + status: "succeeded", + terminalSummary: "All done", + }); + }); + }); + + it("lists and summarizes task-backed operations", async () => { + await withTaskStateDir(async () => { + await defaultTaskOperationsRuntime.dispatch({ + type: "create", + namespace: "tasks", + kind: "acp", + status: "running", + requesterSessionKey: "agent:test:main", + runId: "run-ops-list-1", + description: "One", + startedAt: 10, + }); + await defaultTaskOperationsRuntime.dispatch({ + type: "create", + namespace: "tasks", + kind: "cron", + status: "failed", + requesterSessionKey: "agent:test:main", + runId: "run-ops-list-2", + description: "Two", + endedAt: 20, + terminalSummary: "Failed", + }); + + const listed = await defaultTaskOperationsRuntime.list({ + namespace: "tasks", + }); + const summary = await defaultTaskOperationsRuntime.summarize({ + namespace: "tasks", + }); + + expect(listed).toHaveLength(2); + expect(summary).toEqual({ + total: 2, + active: 1, + terminal: 1, + failures: 1, + byNamespace: { tasks: 2 }, + byKind: { acp: 1, cron: 1 }, + byStatus: { failed: 1, running: 1 }, + }); + }); + }); + + it("patches notify policy and exposes audit plus maintenance", async () => { + await withTaskStateDir(async () => { + const created = await defaultTaskOperationsRuntime.dispatch({ + type: "create", + namespace: "tasks", + kind: "cli", + status: "running", + requesterSessionKey: "agent:test:main", + runId: "run-ops-patch", + description: "Patch me", + startedAt: Date.now() - 31 * 60_000, + }); + + expect(created.record?.metadata?.notifyPolicy).toBe("done_only"); + + const findings = await defaultTaskOperationsRuntime.audit({ + namespace: "tasks", + severity: "error", + code: "stale_running", + }); + + const patched = await defaultTaskOperationsRuntime.dispatch({ + type: "patch", + operationId: created.record?.operationId, + metadataPatch: { + notifyPolicy: "silent", + }, + }); + + expect(patched.record?.metadata?.notifyPolicy).toBe("silent"); + + const preview = await defaultTaskOperationsRuntime.maintenance({ + namespace: "tasks", + }); + + expect(findings).toHaveLength(1); + expect(findings[0]).toMatchObject({ + severity: "error", + code: "stale_running", + operation: { + operationId: created.record?.operationId, + }, + }); + expect(preview).toEqual({ + reconciled: 0, + cleanupStamped: 0, + pruned: 0, + }); + }); + }); +}); diff --git a/src/tasks/operations-runtime.ts b/src/tasks/operations-runtime.ts new file mode 100644 index 00000000000..ba207b540eb --- /dev/null +++ b/src/tasks/operations-runtime.ts @@ -0,0 +1,389 @@ +import type { + PluginOperationAuditFinding, + PluginOperationAuditQuery, + PluginOperationDispatchEvent, + PluginOperationDispatchResult, + PluginOperationListQuery, + PluginOperationMaintenanceQuery, + PluginOperationMaintenanceSummary, + PluginOperationRecord, + PluginOperationSummary, + PluginOperationsCancelResult, + PluginOperationsRuntime, +} from "../plugins/operations-state.js"; +import { summarizeOperationRecords } from "../plugins/operations-state.js"; +import { + listTaskAuditFindings, + type TaskAuditFinding, + type TaskAuditSeverity, +} from "./task-registry.audit.js"; +import { + cancelTaskById, + createTaskRecord, + findTaskByRunId, + getTaskById, + listTaskRecords, + listTasksForSessionKey, + markTaskLostById, + markTaskRunningByRunId, + markTaskTerminalByRunId, + recordTaskProgressByRunId, + updateTaskNotifyPolicyById, +} from "./task-registry.js"; +import { + previewTaskRegistryMaintenance, + runTaskRegistryMaintenance, +} from "./task-registry.maintenance.js"; +import type { + TaskRecord, + TaskRuntime, + TaskStatus, + TaskTerminalOutcome, +} from "./task-registry.types.js"; + +const TASK_NAMESPACE = "tasks"; + +function isTaskNamespace(namespace: string | undefined): boolean { + const trimmed = namespace?.trim().toLowerCase(); + return !trimmed || trimmed === "task" || trimmed === TASK_NAMESPACE; +} + +function normalizeTaskRuntime(kind: string): TaskRuntime { + const trimmed = kind.trim(); + if (trimmed === "acp" || trimmed === "subagent" || trimmed === "cli" || trimmed === "cron") { + return trimmed; + } + throw new Error(`Unsupported task operation kind: ${kind}`); +} + +function normalizeTaskStatus(status: string | undefined): TaskStatus { + const trimmed = status?.trim(); + if ( + trimmed === "queued" || + trimmed === "running" || + trimmed === "succeeded" || + trimmed === "failed" || + trimmed === "timed_out" || + trimmed === "cancelled" || + trimmed === "lost" + ) { + return trimmed; + } + return "queued"; +} + +function normalizeTaskTerminalOutcome(status: TaskStatus): TaskTerminalOutcome | undefined { + return status === "succeeded" ? "succeeded" : undefined; +} + +function toOperationRecord(task: TaskRecord): PluginOperationRecord { + const metadata: Record = { + deliveryStatus: task.deliveryStatus, + notifyPolicy: task.notifyPolicy, + }; + if (typeof task.cleanupAfter === "number") { + metadata.cleanupAfter = task.cleanupAfter; + } + if (task.terminalOutcome) { + metadata.terminalOutcome = task.terminalOutcome; + } + return { + operationId: task.taskId, + namespace: TASK_NAMESPACE, + kind: task.runtime, + status: task.status, + sourceId: task.sourceId, + requesterSessionKey: task.requesterSessionKey, + childSessionKey: task.childSessionKey, + parentOperationId: task.parentTaskId, + agentId: task.agentId, + runId: task.runId, + title: task.label, + description: task.task, + createdAt: task.createdAt, + startedAt: task.startedAt, + endedAt: task.endedAt, + updatedAt: task.lastEventAt ?? task.endedAt ?? task.startedAt ?? task.createdAt, + error: task.error, + progressSummary: task.progressSummary, + terminalSummary: task.terminalSummary, + metadata, + }; +} + +function resolveTaskRecordForTransition(event: { + operationId?: string; + runId?: string; +}): TaskRecord | undefined { + const operationId = event.operationId?.trim(); + if (operationId) { + return getTaskById(operationId); + } + const runId = event.runId?.trim(); + if (runId) { + return findTaskByRunId(runId); + } + return undefined; +} + +function filterOperationRecord( + record: PluginOperationRecord, + query: PluginOperationListQuery, +): boolean { + if (query.namespace && !isTaskNamespace(query.namespace)) { + return false; + } + if (query.kind && record.kind !== query.kind) { + return false; + } + if (query.status && record.status !== query.status) { + return false; + } + if (query.runId && record.runId !== query.runId) { + return false; + } + if (query.sourceId && record.sourceId !== query.sourceId) { + return false; + } + if (query.parentOperationId && record.parentOperationId !== query.parentOperationId) { + return false; + } + if ( + query.sessionKey && + record.requesterSessionKey !== query.sessionKey && + record.childSessionKey !== query.sessionKey + ) { + return false; + } + return true; +} + +async function dispatchTaskOperation( + event: PluginOperationDispatchEvent, +): Promise { + if (event.type === "create") { + if (!isTaskNamespace(event.namespace)) { + throw new Error( + `Default operations runtime only supports the "${TASK_NAMESPACE}" namespace.`, + ); + } + const status = normalizeTaskStatus(event.status); + const record = createTaskRecord({ + runtime: normalizeTaskRuntime(event.kind), + sourceId: event.sourceId, + requesterSessionKey: event.requesterSessionKey?.trim() || "", + childSessionKey: event.childSessionKey, + parentTaskId: event.parentOperationId, + agentId: event.agentId, + runId: event.runId, + label: event.title, + task: event.description, + status, + startedAt: event.startedAt, + lastEventAt: event.updatedAt ?? event.startedAt ?? event.createdAt, + progressSummary: event.progressSummary, + terminalSummary: event.terminalSummary, + terminalOutcome: normalizeTaskTerminalOutcome(status), + }); + return { + matched: true, + created: true, + record: toOperationRecord(record), + }; + } + + if (event.type === "patch") { + const current = resolveTaskRecordForTransition(event); + if (!current) { + return { + matched: false, + record: null, + }; + } + const nextNotifyPolicy = event.metadataPatch?.notifyPolicy; + const next = + nextNotifyPolicy === "done_only" || + nextNotifyPolicy === "state_changes" || + nextNotifyPolicy === "silent" + ? (updateTaskNotifyPolicyById({ + taskId: current.taskId, + notifyPolicy: nextNotifyPolicy, + }) ?? current) + : current; + return { + matched: true, + record: toOperationRecord(next), + }; + } + + const current = resolveTaskRecordForTransition(event); + if (!current) { + return { + matched: false, + record: null, + }; + } + + const at = event.at ?? event.endedAt ?? event.startedAt ?? Date.now(); + const runId = event.runId?.trim() || current.runId?.trim(); + const status = normalizeTaskStatus(event.status); + let next: TaskRecord | null | undefined; + + if (status === "running") { + if (!runId) { + throw new Error("Task transition to running requires a runId."); + } + next = markTaskRunningByRunId({ + runId, + startedAt: event.startedAt, + lastEventAt: at, + progressSummary: event.progressSummary, + eventSummary: event.progressSummary, + })[0]; + } else if (status === "queued") { + if (!runId) { + throw new Error("Task transition to queued requires a runId."); + } + next = recordTaskProgressByRunId({ + runId, + lastEventAt: at, + progressSummary: event.progressSummary, + eventSummary: event.progressSummary, + })[0]; + } else if ( + status === "succeeded" || + status === "failed" || + status === "timed_out" || + status === "cancelled" + ) { + if (!runId) { + throw new Error(`Task transition to ${status} requires a runId.`); + } + next = markTaskTerminalByRunId({ + runId, + status, + startedAt: event.startedAt, + endedAt: event.endedAt ?? at, + lastEventAt: at, + error: event.error ?? undefined, + progressSummary: event.progressSummary, + terminalSummary: event.terminalSummary, + terminalOutcome: status === "succeeded" ? "succeeded" : undefined, + })[0]; + } else if (status === "lost") { + next = markTaskLostById({ + taskId: current.taskId, + endedAt: event.endedAt ?? at, + lastEventAt: at, + error: event.error ?? undefined, + }); + } + + return { + matched: true, + record: next ? toOperationRecord(next) : toOperationRecord(current), + }; +} + +async function getTaskOperationList( + query: PluginOperationListQuery = {}, +): Promise { + if (query.namespace && !isTaskNamespace(query.namespace)) { + return []; + } + const records = ( + query.sessionKey ? listTasksForSessionKey(query.sessionKey) : listTaskRecords() + ).map(toOperationRecord); + const filtered = records.filter((record) => filterOperationRecord(record, query)); + const limit = + typeof query.limit === "number" && Number.isFinite(query.limit) && query.limit > 0 + ? Math.floor(query.limit) + : undefined; + return typeof limit === "number" ? filtered.slice(0, limit) : filtered; +} + +function isMatchingTaskAuditSeverity( + actual: TaskAuditSeverity, + requested: PluginOperationAuditQuery["severity"], +): boolean { + return !requested || actual === requested; +} + +function toOperationAuditFinding(finding: TaskAuditFinding): PluginOperationAuditFinding { + return { + severity: finding.severity, + code: finding.code, + operation: toOperationRecord(finding.task), + detail: finding.detail, + ...(typeof finding.ageMs === "number" ? { ageMs: finding.ageMs } : {}), + }; +} + +async function auditTaskOperations( + query: PluginOperationAuditQuery = {}, +): Promise { + if (query.namespace && !isTaskNamespace(query.namespace)) { + return []; + } + return listTaskAuditFindings() + .filter((finding) => { + if (!isMatchingTaskAuditSeverity(finding.severity, query.severity)) { + return false; + } + if (query.code && finding.code !== query.code) { + return false; + } + return true; + }) + .map(toOperationAuditFinding); +} + +async function maintainTaskOperations( + query: PluginOperationMaintenanceQuery = {}, +): Promise { + if (query.namespace && !isTaskNamespace(query.namespace)) { + return { + reconciled: 0, + cleanupStamped: 0, + pruned: 0, + }; + } + return query.apply ? runTaskRegistryMaintenance() : previewTaskRegistryMaintenance(); +} + +export const defaultTaskOperationsRuntime: PluginOperationsRuntime = { + dispatch: dispatchTaskOperation, + async getById(operationId: string) { + const record = getTaskById(operationId.trim()); + return record ? toOperationRecord(record) : null; + }, + async findByRunId(runId: string) { + const record = findTaskByRunId(runId.trim()); + return record ? toOperationRecord(record) : null; + }, + list: getTaskOperationList, + async summarize(query) { + const records = await getTaskOperationList(query); + return summarizeOperationRecords(records); + }, + audit: auditTaskOperations, + maintenance: maintainTaskOperations, + async cancel(params): Promise { + const result = await cancelTaskById({ + cfg: params.cfg, + taskId: params.operationId, + }); + return { + found: result.found, + cancelled: result.cancelled, + reason: result.reason, + record: result.task ? toOperationRecord(result.task) : null, + }; + }, +}; + +export async function summarizeTaskOperations( + query: PluginOperationListQuery = {}, +): Promise { + return defaultTaskOperationsRuntime.summarize(query); +} diff --git a/src/tasks/task-executor-boundary.test.ts b/src/tasks/task-executor-boundary.test.ts index b0c140f0487..b2c5af37892 100644 --- a/src/tasks/task-executor-boundary.test.ts +++ b/src/tasks/task-executor-boundary.test.ts @@ -14,6 +14,7 @@ const RAW_TASK_MUTATORS = [ ] as const; const ALLOWED_CALLERS = new Set([ + "tasks/operations-runtime.ts", "tasks/task-executor.ts", "tasks/task-registry.ts", "tasks/task-registry.maintenance.ts", diff --git a/src/tasks/task-registry-import-boundary.test.ts b/src/tasks/task-registry-import-boundary.test.ts index d6c5091f9de..8be3a7bce1f 100644 --- a/src/tasks/task-registry-import-boundary.test.ts +++ b/src/tasks/task-registry-import-boundary.test.ts @@ -11,8 +11,8 @@ const ALLOWED_IMPORTERS = new Set([ "auto-reply/reply/commands-subagents/action-info.ts", "commands/doctor-workspace-status.ts", "commands/flows.ts", - "commands/tasks.ts", "tasks/flow-runtime.ts", + "tasks/operations-runtime.ts", "tasks/task-executor.ts", "tasks/task-registry.maintenance.ts", ]); diff --git a/test/helpers/plugins/plugin-api.ts b/test/helpers/plugins/plugin-api.ts index d89aca74b01..d015903ce21 100644 --- a/test/helpers/plugins/plugin-api.ts +++ b/test/helpers/plugins/plugin-api.ts @@ -31,6 +31,7 @@ export function createTestPluginApi(api: TestPluginApiInput): OpenClawPluginApi registerMemoryFlushPlan() {}, registerMemoryRuntime() {}, registerMemoryEmbeddingProvider() {}, + registerOperationsRuntime() {}, resolvePath(input: string) { return input; }, diff --git a/test/helpers/plugins/plugin-runtime-mock.ts b/test/helpers/plugins/plugin-runtime-mock.ts index a3e94e871b4..99fa42f831b 100644 --- a/test/helpers/plugins/plugin-runtime-mock.ts +++ b/test/helpers/plugins/plugin-runtime-mock.ts @@ -132,6 +132,36 @@ export function createPluginRuntimeMock(overrides: DeepPartial = stt: { transcribeAudioFile: vi.fn() as unknown as PluginRuntime["stt"]["transcribeAudioFile"], }, + operations: { + dispatch: vi.fn().mockResolvedValue({ + matched: false, + record: null, + }) as unknown as PluginRuntime["operations"]["dispatch"], + getById: vi.fn().mockResolvedValue(null) as unknown as PluginRuntime["operations"]["getById"], + findByRunId: vi + .fn() + .mockResolvedValue(null) as unknown as PluginRuntime["operations"]["findByRunId"], + list: vi.fn().mockResolvedValue([]) as unknown as PluginRuntime["operations"]["list"], + summarize: vi.fn().mockResolvedValue({ + total: 0, + active: 0, + terminal: 0, + failures: 0, + byNamespace: {}, + byKind: {}, + byStatus: {}, + }) as unknown as PluginRuntime["operations"]["summarize"], + audit: vi.fn().mockResolvedValue([]) as unknown as PluginRuntime["operations"]["audit"], + maintenance: vi.fn().mockResolvedValue({ + reconciled: 0, + cleanupStamped: 0, + pruned: 0, + }) as unknown as PluginRuntime["operations"]["maintenance"], + cancel: vi.fn().mockResolvedValue({ + found: false, + cancelled: false, + }) as unknown as PluginRuntime["operations"]["cancel"], + }, channel: { text: { chunkByNewline: vi.fn((text: string) => (text ? [text] : [])),