diff --git a/CHANGELOG.md b/CHANGELOG.md index 3b558adc16e..c256fbbc5fa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ Docs: https://docs.openclaw.ai - Android/notifications: add notification-forwarding controls with package filtering, quiet hours, rate limiting, and safer picker behavior for forwarded notification events. (#40175) Thanks @nimbleenigma. - Matrix/network: add explicit `channels.matrix.proxy` config for routing Matrix traffic through an HTTP(S) proxy, including account-level overrides and matching probe/runtime behavior. (#56931) thanks @patrick-yingxi-pan. - Background tasks: turn tasks into a real shared background-run control plane instead of ACP-only bookkeeping by unifying ACP, subagent, cron, and background CLI execution under one SQLite-backed ledger, routing detached lifecycle updates through the executor seam, adding audit/maintenance/status visibility, tightening auto-cleanup and lost-run recovery, improving task awareness in internal status/tool surfaces, and clarifying the split between heartbeat/main-session automation and detached scheduled runs. Thanks @vincentkoc and @mbelinky. +- Flows/tasks: add a minimal SQLite-backed flow registry plus task-to-flow linkage scaffolding, so orchestrated work can start gaining a first-class parent record without changing current task delivery behavior. ### Fixes diff --git a/src/tasks/flow-registry.paths.ts b/src/tasks/flow-registry.paths.ts new file mode 100644 index 00000000000..8d35c657434 --- /dev/null +++ b/src/tasks/flow-registry.paths.ts @@ -0,0 +1,10 @@ +import path from "node:path"; +import { resolveTaskStateDir } from "./task-registry.paths.js"; + +export function resolveFlowRegistryDir(env: NodeJS.ProcessEnv = process.env): string { + return path.join(resolveTaskStateDir(env), "flows"); +} + +export function resolveFlowRegistrySqlitePath(env: NodeJS.ProcessEnv = process.env): string { + return path.join(resolveFlowRegistryDir(env), "registry.sqlite"); +} diff --git a/src/tasks/flow-registry.store.sqlite.ts b/src/tasks/flow-registry.store.sqlite.ts new file mode 100644 index 00000000000..be4a3d0908f --- /dev/null +++ b/src/tasks/flow-registry.store.sqlite.ts @@ -0,0 +1,259 @@ +import { chmodSync, existsSync, mkdirSync } from "node:fs"; +import type { DatabaseSync, StatementSync } from "node:sqlite"; +import { requireNodeSqlite } from "../infra/node-sqlite.js"; +import type { DeliveryContext } from "../utils/delivery-context.js"; +import { resolveFlowRegistryDir, resolveFlowRegistrySqlitePath } from "./flow-registry.paths.js"; +import type { FlowRegistryStoreSnapshot } from "./flow-registry.store.js"; +import type { FlowRecord } from "./flow-registry.types.js"; + +type FlowRegistryRow = { + flow_id: string; + owner_session_key: string; + requester_origin_json: string | null; + status: FlowRecord["status"]; + notify_policy: FlowRecord["notifyPolicy"]; + goal: string; + current_step: string | null; + created_at: number | bigint; + updated_at: number | bigint; + ended_at: number | bigint | null; +}; + +type FlowRegistryStatements = { + selectAll: StatementSync; + upsertRow: StatementSync; + deleteRow: StatementSync; + clearRows: StatementSync; +}; + +type FlowRegistryDatabase = { + db: DatabaseSync; + path: string; + statements: FlowRegistryStatements; +}; + +let cachedDatabase: FlowRegistryDatabase | null = null; +const FLOW_REGISTRY_DIR_MODE = 0o700; +const FLOW_REGISTRY_FILE_MODE = 0o600; +const FLOW_REGISTRY_SIDECAR_SUFFIXES = ["", "-shm", "-wal"] as const; + +function normalizeNumber(value: number | bigint | null): number | undefined { + if (typeof value === "bigint") { + return Number(value); + } + return typeof value === "number" ? value : undefined; +} + +function serializeJson(value: unknown): string | null { + return value == null ? null : JSON.stringify(value); +} + +function parseJsonValue(raw: string | null): T | undefined { + if (!raw?.trim()) { + return undefined; + } + try { + return JSON.parse(raw) as T; + } catch { + return undefined; + } +} + +function rowToFlowRecord(row: FlowRegistryRow): FlowRecord { + const endedAt = normalizeNumber(row.ended_at); + const requesterOrigin = parseJsonValue(row.requester_origin_json); + return { + flowId: row.flow_id, + ownerSessionKey: row.owner_session_key, + ...(requesterOrigin ? { requesterOrigin } : {}), + status: row.status, + notifyPolicy: row.notify_policy, + goal: row.goal, + ...(row.current_step ? { currentStep: row.current_step } : {}), + createdAt: normalizeNumber(row.created_at) ?? 0, + updatedAt: normalizeNumber(row.updated_at) ?? 0, + ...(endedAt != null ? { endedAt } : {}), + }; +} + +function bindFlowRecord(record: FlowRecord) { + return { + flow_id: record.flowId, + owner_session_key: record.ownerSessionKey, + requester_origin_json: serializeJson(record.requesterOrigin), + status: record.status, + notify_policy: record.notifyPolicy, + goal: record.goal, + current_step: record.currentStep ?? null, + created_at: record.createdAt, + updated_at: record.updatedAt, + ended_at: record.endedAt ?? null, + }; +} + +function createStatements(db: DatabaseSync): FlowRegistryStatements { + return { + selectAll: db.prepare(` + SELECT + flow_id, + owner_session_key, + requester_origin_json, + status, + notify_policy, + goal, + current_step, + created_at, + updated_at, + ended_at + FROM flow_runs + ORDER BY created_at ASC, flow_id ASC + `), + upsertRow: db.prepare(` + INSERT INTO flow_runs ( + flow_id, + owner_session_key, + requester_origin_json, + status, + notify_policy, + goal, + current_step, + created_at, + updated_at, + ended_at + ) VALUES ( + @flow_id, + @owner_session_key, + @requester_origin_json, + @status, + @notify_policy, + @goal, + @current_step, + @created_at, + @updated_at, + @ended_at + ) + ON CONFLICT(flow_id) DO UPDATE SET + owner_session_key = excluded.owner_session_key, + requester_origin_json = excluded.requester_origin_json, + status = excluded.status, + notify_policy = excluded.notify_policy, + goal = excluded.goal, + current_step = excluded.current_step, + created_at = excluded.created_at, + updated_at = excluded.updated_at, + ended_at = excluded.ended_at + `), + deleteRow: db.prepare(`DELETE FROM flow_runs WHERE flow_id = ?`), + clearRows: db.prepare(`DELETE FROM flow_runs`), + }; +} + +function ensureSchema(db: DatabaseSync) { + db.exec(` + CREATE TABLE IF NOT EXISTS flow_runs ( + flow_id TEXT PRIMARY KEY, + owner_session_key TEXT NOT NULL, + requester_origin_json TEXT, + status TEXT NOT NULL, + notify_policy TEXT NOT NULL, + goal TEXT NOT NULL, + current_step TEXT, + created_at INTEGER NOT NULL, + updated_at INTEGER NOT NULL, + ended_at INTEGER + ); + `); + db.exec(`CREATE INDEX IF NOT EXISTS idx_flow_runs_status ON flow_runs(status);`); + db.exec( + `CREATE INDEX IF NOT EXISTS idx_flow_runs_owner_session_key ON flow_runs(owner_session_key);`, + ); + db.exec(`CREATE INDEX IF NOT EXISTS idx_flow_runs_updated_at ON flow_runs(updated_at);`); +} + +function ensureFlowRegistryPermissions(pathname: string) { + const dir = resolveFlowRegistryDir(process.env); + mkdirSync(dir, { recursive: true, mode: FLOW_REGISTRY_DIR_MODE }); + chmodSync(dir, FLOW_REGISTRY_DIR_MODE); + for (const suffix of FLOW_REGISTRY_SIDECAR_SUFFIXES) { + const candidate = `${pathname}${suffix}`; + if (!existsSync(candidate)) { + continue; + } + chmodSync(candidate, FLOW_REGISTRY_FILE_MODE); + } +} + +function openFlowRegistryDatabase(): FlowRegistryDatabase { + const pathname = resolveFlowRegistrySqlitePath(process.env); + if (cachedDatabase && cachedDatabase.path === pathname) { + return cachedDatabase; + } + if (cachedDatabase) { + cachedDatabase.db.close(); + cachedDatabase = null; + } + ensureFlowRegistryPermissions(pathname); + const { DatabaseSync } = requireNodeSqlite(); + const db = new DatabaseSync(pathname); + db.exec(`PRAGMA journal_mode = WAL;`); + db.exec(`PRAGMA synchronous = NORMAL;`); + db.exec(`PRAGMA busy_timeout = 5000;`); + ensureSchema(db); + ensureFlowRegistryPermissions(pathname); + cachedDatabase = { + db, + path: pathname, + statements: createStatements(db), + }; + return cachedDatabase; +} + +function withWriteTransaction(write: (statements: FlowRegistryStatements) => void) { + const { db, path, statements } = openFlowRegistryDatabase(); + db.exec("BEGIN IMMEDIATE"); + try { + write(statements); + db.exec("COMMIT"); + ensureFlowRegistryPermissions(path); + } catch (error) { + db.exec("ROLLBACK"); + throw error; + } +} + +export function loadFlowRegistryStateFromSqlite(): FlowRegistryStoreSnapshot { + const { statements } = openFlowRegistryDatabase(); + const rows = statements.selectAll.all() as FlowRegistryRow[]; + return { + flows: new Map(rows.map((row) => [row.flow_id, rowToFlowRecord(row)])), + }; +} + +export function saveFlowRegistryStateToSqlite(snapshot: FlowRegistryStoreSnapshot) { + withWriteTransaction((statements) => { + statements.clearRows.run(); + for (const flow of snapshot.flows.values()) { + statements.upsertRow.run(bindFlowRecord(flow)); + } + }); +} + +export function upsertFlowRegistryRecordToSqlite(flow: FlowRecord) { + const store = openFlowRegistryDatabase(); + store.statements.upsertRow.run(bindFlowRecord(flow)); + ensureFlowRegistryPermissions(store.path); +} + +export function deleteFlowRegistryRecordFromSqlite(flowId: string) { + const store = openFlowRegistryDatabase(); + store.statements.deleteRow.run(flowId); + ensureFlowRegistryPermissions(store.path); +} + +export function closeFlowRegistrySqliteStore() { + if (!cachedDatabase) { + return; + } + cachedDatabase.db.close(); + cachedDatabase = null; +} diff --git a/src/tasks/flow-registry.store.test.ts b/src/tasks/flow-registry.store.test.ts new file mode 100644 index 00000000000..9a506badb42 --- /dev/null +++ b/src/tasks/flow-registry.store.test.ts @@ -0,0 +1,104 @@ +import { statSync } from "node:fs"; +import { afterEach, describe, expect, it, vi } from "vitest"; +import { withTempDir } from "../test-helpers/temp-dir.js"; +import { createFlowRecord, getFlowById, resetFlowRegistryForTests } from "./flow-registry.js"; +import { resolveFlowRegistryDir, resolveFlowRegistrySqlitePath } from "./flow-registry.paths.js"; +import { configureFlowRegistryRuntime } from "./flow-registry.store.js"; +import type { FlowRecord } from "./flow-registry.types.js"; + +function createStoredFlow(): FlowRecord { + return { + flowId: "flow-restored", + ownerSessionKey: "agent:main:main", + status: "running", + notifyPolicy: "done_only", + goal: "Restored flow", + currentStep: "spawn_task", + createdAt: 100, + updatedAt: 100, + }; +} + +describe("flow-registry store runtime", () => { + afterEach(() => { + delete process.env.OPENCLAW_STATE_DIR; + resetFlowRegistryForTests(); + }); + + it("uses the configured flow store for restore and save", () => { + const storedFlow = createStoredFlow(); + const loadSnapshot = vi.fn(() => ({ + flows: new Map([[storedFlow.flowId, storedFlow]]), + })); + const saveSnapshot = vi.fn(); + configureFlowRegistryRuntime({ + store: { + loadSnapshot, + saveSnapshot, + }, + }); + + expect(getFlowById("flow-restored")).toMatchObject({ + flowId: "flow-restored", + goal: "Restored flow", + }); + expect(loadSnapshot).toHaveBeenCalledTimes(1); + + createFlowRecord({ + ownerSessionKey: "agent:main:main", + goal: "New flow", + status: "running", + currentStep: "wait_for", + }); + + expect(saveSnapshot).toHaveBeenCalled(); + const latestSnapshot = saveSnapshot.mock.calls.at(-1)?.[0] as { + flows: ReadonlyMap; + }; + expect(latestSnapshot.flows.size).toBe(2); + expect(latestSnapshot.flows.get("flow-restored")?.goal).toBe("Restored flow"); + }); + + it("restores persisted flows from the default sqlite store", async () => { + await withTempDir({ prefix: "openclaw-flow-store-" }, async (root) => { + process.env.OPENCLAW_STATE_DIR = root; + resetFlowRegistryForTests(); + + const created = createFlowRecord({ + ownerSessionKey: "agent:main:main", + goal: "Persisted flow", + status: "waiting", + currentStep: "ask_user", + }); + + resetFlowRegistryForTests({ persist: false }); + + expect(getFlowById(created.flowId)).toMatchObject({ + flowId: created.flowId, + status: "waiting", + currentStep: "ask_user", + }); + }); + }); + + it("hardens the sqlite flow store directory and file modes", async () => { + if (process.platform === "win32") { + return; + } + await withTempDir({ prefix: "openclaw-flow-store-" }, async (root) => { + process.env.OPENCLAW_STATE_DIR = root; + resetFlowRegistryForTests(); + + createFlowRecord({ + ownerSessionKey: "agent:main:main", + goal: "Secured flow", + status: "queued", + }); + + const registryDir = resolveFlowRegistryDir(process.env); + const sqlitePath = resolveFlowRegistrySqlitePath(process.env); + expect(statSync(registryDir).mode & 0o777).toBe(0o700); + expect(statSync(sqlitePath).mode & 0o777).toBe(0o600); + }); + }); +}); diff --git a/src/tasks/flow-registry.store.ts b/src/tasks/flow-registry.store.ts new file mode 100644 index 00000000000..1574b0fde3a --- /dev/null +++ b/src/tasks/flow-registry.store.ts @@ -0,0 +1,45 @@ +import { + closeFlowRegistrySqliteStore, + deleteFlowRegistryRecordFromSqlite, + loadFlowRegistryStateFromSqlite, + saveFlowRegistryStateToSqlite, + upsertFlowRegistryRecordToSqlite, +} from "./flow-registry.store.sqlite.js"; +import type { FlowRecord } from "./flow-registry.types.js"; + +export type FlowRegistryStoreSnapshot = { + flows: Map; +}; + +export type FlowRegistryStore = { + loadSnapshot: () => FlowRegistryStoreSnapshot; + saveSnapshot: (snapshot: FlowRegistryStoreSnapshot) => void; + upsertFlow?: (flow: FlowRecord) => void; + deleteFlow?: (flowId: string) => void; + close?: () => void; +}; + +const defaultFlowRegistryStore: FlowRegistryStore = { + loadSnapshot: loadFlowRegistryStateFromSqlite, + saveSnapshot: saveFlowRegistryStateToSqlite, + upsertFlow: upsertFlowRegistryRecordToSqlite, + deleteFlow: deleteFlowRegistryRecordFromSqlite, + close: closeFlowRegistrySqliteStore, +}; + +let configuredFlowRegistryStore: FlowRegistryStore = defaultFlowRegistryStore; + +export function getFlowRegistryStore(): FlowRegistryStore { + return configuredFlowRegistryStore; +} + +export function configureFlowRegistryRuntime(params: { store?: FlowRegistryStore }) { + if (params.store) { + configuredFlowRegistryStore = params.store; + } +} + +export function resetFlowRegistryRuntimeForTests() { + configuredFlowRegistryStore.close?.(); + configuredFlowRegistryStore = defaultFlowRegistryStore; +} diff --git a/src/tasks/flow-registry.test.ts b/src/tasks/flow-registry.test.ts new file mode 100644 index 00000000000..5ccd8c1955c --- /dev/null +++ b/src/tasks/flow-registry.test.ts @@ -0,0 +1,115 @@ +import { afterEach, describe, expect, it } from "vitest"; +import { withTempDir } from "../test-helpers/temp-dir.js"; +import { + createFlowRecord, + deleteFlowRecordById, + getFlowById, + listFlowRecords, + resetFlowRegistryForTests, + updateFlowRecordById, +} from "./flow-registry.js"; + +const ORIGINAL_STATE_DIR = process.env.OPENCLAW_STATE_DIR; + +describe("flow-registry", () => { + afterEach(() => { + if (ORIGINAL_STATE_DIR === undefined) { + delete process.env.OPENCLAW_STATE_DIR; + } else { + process.env.OPENCLAW_STATE_DIR = ORIGINAL_STATE_DIR; + } + resetFlowRegistryForTests(); + }); + + it("creates, updates, lists, and deletes flow records", async () => { + await withTempDir({ prefix: "openclaw-flow-registry-" }, async (root) => { + process.env.OPENCLAW_STATE_DIR = root; + resetFlowRegistryForTests(); + + const created = createFlowRecord({ + ownerSessionKey: "agent:main:main", + goal: "Investigate flaky test", + status: "running", + currentStep: "spawn_task", + }); + + expect(getFlowById(created.flowId)).toMatchObject({ + flowId: created.flowId, + status: "running", + currentStep: "spawn_task", + }); + + const updated = updateFlowRecordById(created.flowId, { + status: "waiting", + currentStep: "ask_user", + }); + expect(updated).toMatchObject({ + flowId: created.flowId, + status: "waiting", + currentStep: "ask_user", + }); + + expect(listFlowRecords()).toEqual([ + expect.objectContaining({ + flowId: created.flowId, + goal: "Investigate flaky test", + status: "waiting", + }), + ]); + + expect(deleteFlowRecordById(created.flowId)).toBe(true); + expect(getFlowById(created.flowId)).toBeUndefined(); + expect(listFlowRecords()).toEqual([]); + }); + }); + + it("applies minimal defaults for new flow records", async () => { + await withTempDir({ prefix: "openclaw-flow-registry-" }, async (root) => { + process.env.OPENCLAW_STATE_DIR = root; + resetFlowRegistryForTests(); + + const created = createFlowRecord({ + ownerSessionKey: "agent:main:main", + goal: "Background job", + }); + + expect(created).toMatchObject({ + flowId: expect.any(String), + ownerSessionKey: "agent:main:main", + goal: "Background job", + status: "queued", + notifyPolicy: "done_only", + }); + expect(created.currentStep).toBeUndefined(); + expect(created.endedAt).toBeUndefined(); + }); + }); + + it("preserves endedAt when later updates change other flow fields", async () => { + await withTempDir({ prefix: "openclaw-flow-registry-" }, async (root) => { + process.env.OPENCLAW_STATE_DIR = root; + resetFlowRegistryForTests(); + + const created = createFlowRecord({ + ownerSessionKey: "agent:main:main", + goal: "Finish a task", + status: "succeeded", + endedAt: 456, + }); + + const updated = updateFlowRecordById(created.flowId, { + currentStep: "finish", + }); + + expect(updated).toMatchObject({ + flowId: created.flowId, + currentStep: "finish", + endedAt: 456, + }); + expect(getFlowById(created.flowId)).toMatchObject({ + flowId: created.flowId, + endedAt: 456, + }); + }); + }); +}); diff --git a/src/tasks/flow-registry.ts b/src/tasks/flow-registry.ts new file mode 100644 index 00000000000..628040b14ec --- /dev/null +++ b/src/tasks/flow-registry.ts @@ -0,0 +1,148 @@ +import crypto from "node:crypto"; +import { getFlowRegistryStore, resetFlowRegistryRuntimeForTests } from "./flow-registry.store.js"; +import type { FlowRecord, FlowStatus } from "./flow-registry.types.js"; +import type { TaskNotifyPolicy } from "./task-registry.types.js"; + +const flows = new Map(); +let restoreAttempted = false; + +function cloneFlowRecord(record: FlowRecord): FlowRecord { + return { + ...record, + ...(record.requesterOrigin ? { requesterOrigin: { ...record.requesterOrigin } } : {}), + }; +} + +function snapshotFlowRecords(source: ReadonlyMap): FlowRecord[] { + return [...source.values()].map((record) => cloneFlowRecord(record)); +} + +function ensureNotifyPolicy(notifyPolicy?: TaskNotifyPolicy): TaskNotifyPolicy { + return notifyPolicy ?? "done_only"; +} + +function ensureFlowRegistryReady() { + if (restoreAttempted) { + return; + } + restoreAttempted = true; + const restored = getFlowRegistryStore().loadSnapshot(); + flows.clear(); + for (const [flowId, flow] of restored.flows) { + flows.set(flowId, cloneFlowRecord(flow)); + } +} + +function persistFlowRegistry() { + getFlowRegistryStore().saveSnapshot({ + flows: new Map(snapshotFlowRecords(flows).map((flow) => [flow.flowId, flow])), + }); +} + +function persistFlowUpsert(flow: FlowRecord) { + const store = getFlowRegistryStore(); + if (store.upsertFlow) { + store.upsertFlow(cloneFlowRecord(flow)); + return; + } + persistFlowRegistry(); +} + +function persistFlowDelete(flowId: string) { + const store = getFlowRegistryStore(); + if (store.deleteFlow) { + store.deleteFlow(flowId); + return; + } + persistFlowRegistry(); +} + +export function createFlowRecord(params: { + ownerSessionKey: string; + requesterOrigin?: FlowRecord["requesterOrigin"]; + status?: FlowStatus; + notifyPolicy?: TaskNotifyPolicy; + goal: string; + currentStep?: string; + createdAt?: number; + updatedAt?: number; + endedAt?: number; +}): FlowRecord { + ensureFlowRegistryReady(); + const now = params.createdAt ?? Date.now(); + const record: FlowRecord = { + flowId: crypto.randomUUID(), + ownerSessionKey: params.ownerSessionKey, + ...(params.requesterOrigin ? { requesterOrigin: { ...params.requesterOrigin } } : {}), + status: params.status ?? "queued", + notifyPolicy: ensureNotifyPolicy(params.notifyPolicy), + goal: params.goal, + currentStep: params.currentStep?.trim() || undefined, + createdAt: now, + updatedAt: params.updatedAt ?? now, + ...(params.endedAt !== undefined ? { endedAt: params.endedAt } : {}), + }; + flows.set(record.flowId, record); + persistFlowUpsert(record); + return cloneFlowRecord(record); +} + +export function updateFlowRecordById( + flowId: string, + patch: Partial< + Pick + >, +): FlowRecord | null { + ensureFlowRegistryReady(); + const current = flows.get(flowId); + if (!current) { + return null; + } + const next: FlowRecord = { + ...current, + ...(patch.status ? { status: patch.status } : {}), + ...(patch.notifyPolicy ? { notifyPolicy: patch.notifyPolicy } : {}), + ...(patch.goal ? { goal: patch.goal } : {}), + currentStep: + patch.currentStep === undefined ? current.currentStep : patch.currentStep.trim() || undefined, + updatedAt: patch.updatedAt ?? Date.now(), + ...(patch.endedAt !== undefined ? { endedAt: patch.endedAt } : {}), + }; + flows.set(flowId, next); + persistFlowUpsert(next); + return cloneFlowRecord(next); +} + +export function getFlowById(flowId: string): FlowRecord | undefined { + ensureFlowRegistryReady(); + const flow = flows.get(flowId); + return flow ? cloneFlowRecord(flow) : undefined; +} + +export function listFlowRecords(): FlowRecord[] { + ensureFlowRegistryReady(); + return [...flows.values()] + .map((flow) => cloneFlowRecord(flow)) + .toSorted((left, right) => left.createdAt - right.createdAt); +} + +export function deleteFlowRecordById(flowId: string): boolean { + ensureFlowRegistryReady(); + const current = flows.get(flowId); + if (!current) { + return false; + } + flows.delete(flowId); + persistFlowDelete(flowId); + return true; +} + +export function resetFlowRegistryForTests(opts?: { persist?: boolean }) { + flows.clear(); + restoreAttempted = false; + resetFlowRegistryRuntimeForTests(); + if (opts?.persist !== false) { + persistFlowRegistry(); + getFlowRegistryStore().close?.(); + } +} diff --git a/src/tasks/flow-registry.types.ts b/src/tasks/flow-registry.types.ts new file mode 100644 index 00000000000..59f85d6ad13 --- /dev/null +++ b/src/tasks/flow-registry.types.ts @@ -0,0 +1,25 @@ +import type { DeliveryContext } from "../utils/delivery-context.js"; +import type { TaskNotifyPolicy } from "./task-registry.types.js"; + +export type FlowStatus = + | "queued" + | "running" + | "waiting" + | "blocked" + | "succeeded" + | "failed" + | "cancelled" + | "lost"; + +export type FlowRecord = { + flowId: string; + ownerSessionKey: string; + requesterOrigin?: DeliveryContext; + status: FlowStatus; + notifyPolicy: TaskNotifyPolicy; + goal: string; + currentStep?: string; + createdAt: number; + updatedAt: number; + endedAt?: number; +}; diff --git a/src/tasks/task-executor.ts b/src/tasks/task-executor.ts index 8bfde5b8c07..4e973a6694e 100644 --- a/src/tasks/task-executor.ts +++ b/src/tasks/task-executor.ts @@ -23,6 +23,7 @@ export function createQueuedTaskRun(params: { sourceId?: string; requesterSessionKey: string; requesterOrigin?: TaskDeliveryState["requesterOrigin"]; + parentFlowId?: string; childSessionKey?: string; parentTaskId?: string; agentId?: string; @@ -44,6 +45,7 @@ export function createRunningTaskRun(params: { sourceId?: string; requesterSessionKey: string; requesterOrigin?: TaskDeliveryState["requesterOrigin"]; + parentFlowId?: string; childSessionKey?: string; parentTaskId?: string; agentId?: string; diff --git a/src/tasks/task-registry.store.sqlite.ts b/src/tasks/task-registry.store.sqlite.ts index 6ab863c9ce3..80ad7de93b1 100644 --- a/src/tasks/task-registry.store.sqlite.ts +++ b/src/tasks/task-registry.store.sqlite.ts @@ -11,6 +11,7 @@ type TaskRegistryRow = { runtime: TaskRecord["runtime"]; source_id: string | null; requester_session_key: string; + parent_flow_id: string | null; child_session_key: string | null; parent_task_id: string | null; agent_id: string | null; @@ -91,6 +92,7 @@ function rowToTaskRecord(row: TaskRegistryRow): TaskRecord { runtime: row.runtime, ...(row.source_id ? { sourceId: row.source_id } : {}), requesterSessionKey: row.requester_session_key, + ...(row.parent_flow_id ? { parentFlowId: row.parent_flow_id } : {}), ...(row.child_session_key ? { childSessionKey: row.child_session_key } : {}), ...(row.parent_task_id ? { parentTaskId: row.parent_task_id } : {}), ...(row.agent_id ? { agentId: row.agent_id } : {}), @@ -128,6 +130,7 @@ function bindTaskRecord(record: TaskRecord) { runtime: record.runtime, source_id: record.sourceId ?? null, requester_session_key: record.requesterSessionKey, + parent_flow_id: record.parentFlowId ?? null, child_session_key: record.childSessionKey ?? null, parent_task_id: record.parentTaskId ?? null, agent_id: record.agentId ?? null, @@ -165,6 +168,7 @@ function createStatements(db: DatabaseSync): TaskRegistryStatements { runtime, source_id, requester_session_key, + parent_flow_id, child_session_key, parent_task_id, agent_id, @@ -200,6 +204,7 @@ function createStatements(db: DatabaseSync): TaskRegistryStatements { runtime, source_id, requester_session_key, + parent_flow_id, child_session_key, parent_task_id, agent_id, @@ -223,6 +228,7 @@ function createStatements(db: DatabaseSync): TaskRegistryStatements { @runtime, @source_id, @requester_session_key, + @parent_flow_id, @child_session_key, @parent_task_id, @agent_id, @@ -246,6 +252,7 @@ function createStatements(db: DatabaseSync): TaskRegistryStatements { runtime = excluded.runtime, source_id = excluded.source_id, requester_session_key = excluded.requester_session_key, + parent_flow_id = excluded.parent_flow_id, child_session_key = excluded.child_session_key, parent_task_id = excluded.parent_task_id, agent_id = excluded.agent_id, @@ -290,6 +297,7 @@ function ensureSchema(db: DatabaseSync) { runtime TEXT NOT NULL, source_id TEXT, requester_session_key TEXT NOT NULL, + parent_flow_id TEXT, child_session_key TEXT, parent_task_id TEXT, agent_id TEXT, @@ -318,15 +326,30 @@ function ensureSchema(db: DatabaseSync) { ); `); db.exec(`CREATE INDEX IF NOT EXISTS idx_task_runs_run_id ON task_runs(run_id);`); + ensureColumn(db, "task_runs", "parent_flow_id", "TEXT"); db.exec(`CREATE INDEX IF NOT EXISTS idx_task_runs_status ON task_runs(status);`); db.exec(`CREATE INDEX IF NOT EXISTS idx_task_runs_runtime_status ON task_runs(runtime, status);`); db.exec(`CREATE INDEX IF NOT EXISTS idx_task_runs_cleanup_after ON task_runs(cleanup_after);`); db.exec(`CREATE INDEX IF NOT EXISTS idx_task_runs_last_event_at ON task_runs(last_event_at);`); + db.exec(`CREATE INDEX IF NOT EXISTS idx_task_runs_parent_flow_id ON task_runs(parent_flow_id);`); db.exec( `CREATE INDEX IF NOT EXISTS idx_task_runs_child_session_key ON task_runs(child_session_key);`, ); } +function ensureColumn( + db: DatabaseSync, + tableName: string, + columnName: string, + columnDefinition: string, +) { + const rows = db.prepare(`PRAGMA table_info(${tableName})`).all() as Array<{ name?: string }>; + if (rows.some((row) => row.name === columnName)) { + return; + } + db.exec(`ALTER TABLE ${tableName} ADD COLUMN ${columnName} ${columnDefinition};`); +} + function ensureTaskRegistryPermissions(pathname: string) { const dir = resolveTaskRegistryDir(process.env); mkdirSync(dir, { recursive: true, mode: TASK_REGISTRY_DIR_MODE }); diff --git a/src/tasks/task-registry.store.test.ts b/src/tasks/task-registry.store.test.ts index b4b3b6aca3c..3de5b4776bb 100644 --- a/src/tasks/task-registry.store.test.ts +++ b/src/tasks/task-registry.store.test.ts @@ -138,6 +138,26 @@ describe("task-registry store runtime", () => { }); }); + it("persists parent flow linkage on task records", () => { + const created = createTaskRecord({ + runtime: "acp", + requesterSessionKey: "agent:main:main", + parentFlowId: "flow-123", + runId: "run-linked", + task: "Linked task", + status: "running", + deliveryStatus: "pending", + }); + + resetTaskRegistryForTests({ persist: false }); + + expect(findTaskByRunId("run-linked")).toMatchObject({ + taskId: created.taskId, + parentFlowId: "flow-123", + task: "Linked task", + }); + }); + it("hardens the sqlite task store directory and file modes", () => { if (process.platform === "win32") { return; diff --git a/src/tasks/task-registry.test.ts b/src/tasks/task-registry.test.ts index 012207269d8..175893287b1 100644 --- a/src/tasks/task-registry.test.ts +++ b/src/tasks/task-registry.test.ts @@ -679,6 +679,48 @@ describe("task-registry", () => { }); }); + it("adopts parent flow linkage when collapsing onto an earlier ACP record", async () => { + await withTempDir({ prefix: "openclaw-task-registry-" }, async (root) => { + process.env.OPENCLAW_STATE_DIR = root; + resetTaskRegistryForTests(); + + const directTask = createTaskRecord({ + runtime: "acp", + requesterSessionKey: "agent:main:main", + requesterOrigin: { + channel: "telegram", + to: "telegram:123", + }, + childSessionKey: "agent:main:acp:child", + runId: "run-collapse-parent-flow", + task: "Direct ACP child", + status: "running", + deliveryStatus: "pending", + }); + + const spawnedTask = createTaskRecord({ + runtime: "acp", + requesterSessionKey: "agent:main:main", + requesterOrigin: { + channel: "telegram", + to: "telegram:123", + }, + parentFlowId: "flow-123", + childSessionKey: "agent:main:acp:child", + runId: "run-collapse-parent-flow", + task: "Spawn ACP child", + status: "running", + deliveryStatus: "pending", + }); + + expect(spawnedTask.taskId).toBe(directTask.taskId); + expect(findTaskByRunId("run-collapse-parent-flow")).toMatchObject({ + taskId: directTask.taskId, + parentFlowId: "flow-123", + }); + }); + }); + it("collapses ACP run-owned task creation onto the existing spawned task", async () => { await withTempDir({ prefix: "openclaw-task-registry-" }, async (root) => { process.env.OPENCLAW_STATE_DIR = root; @@ -772,6 +814,7 @@ describe("task-registry", () => { const task = createTaskRecord({ runtime: "subagent", requesterSessionKey: "agent:main:main", + parentFlowId: "flow-restore", childSessionKey: "agent:main:subagent:child", runId: "run-restore", task: "Restore me", @@ -785,6 +828,7 @@ describe("task-registry", () => { expect(resolveTaskForLookupToken(task.taskId)).toMatchObject({ taskId: task.taskId, + parentFlowId: "flow-restore", runId: "run-restore", task: "Restore me", }); diff --git a/src/tasks/task-registry.ts b/src/tasks/task-registry.ts index 0266e211b71..3e67669612e 100644 --- a/src/tasks/task-registry.ts +++ b/src/tasks/task-registry.ts @@ -367,6 +367,7 @@ function mergeExistingTaskForCreate( params: { requesterOrigin?: TaskDeliveryState["requesterOrigin"]; sourceId?: string; + parentFlowId?: string; parentTaskId?: string; agentId?: string; label?: string; @@ -389,6 +390,9 @@ function mergeExistingTaskForCreate( if (params.sourceId?.trim() && !existing.sourceId?.trim()) { patch.sourceId = params.sourceId.trim(); } + if (params.parentFlowId?.trim() && !existing.parentFlowId?.trim()) { + patch.parentFlowId = params.parentFlowId.trim(); + } if (params.parentTaskId?.trim() && !existing.parentTaskId?.trim()) { patch.parentTaskId = params.parentTaskId.trim(); } @@ -910,6 +914,7 @@ export function createTaskRecord(params: { sourceId?: string; requesterSessionKey: string; requesterOrigin?: TaskDeliveryState["requesterOrigin"]; + parentFlowId?: string; childSessionKey?: string; parentTaskId?: string; agentId?: string; @@ -947,6 +952,7 @@ export function createTaskRecord(params: { runtime: params.runtime, sourceId: params.sourceId?.trim() || undefined, requesterSessionKey: params.requesterSessionKey, + parentFlowId: params.parentFlowId?.trim() || undefined, childSessionKey: params.childSessionKey, parentTaskId: params.parentTaskId?.trim() || undefined, agentId: params.agentId?.trim() || undefined, diff --git a/src/tasks/task-registry.types.ts b/src/tasks/task-registry.types.ts index 2228f16d998..74a2296dba0 100644 --- a/src/tasks/task-registry.types.ts +++ b/src/tasks/task-registry.types.ts @@ -54,6 +54,7 @@ export type TaskRecord = { runtime: TaskRuntime; sourceId?: string; requesterSessionKey: string; + parentFlowId?: string; childSessionKey?: string; parentTaskId?: string; agentId?: string;