Tasks: add minimal flow registry scaffold (#57865)

This commit is contained in:
Mariano 2026-03-30 19:57:26 +02:00 committed by GitHub
parent 8c83128fc3
commit 7590c22db7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 803 additions and 0 deletions

View File

@ -22,6 +22,7 @@ Docs: https://docs.openclaw.ai
- Android/notifications: add notification-forwarding controls with package filtering, quiet hours, rate limiting, and safer picker behavior for forwarded notification events. (#40175) Thanks @nimbleenigma.
- Matrix/network: add explicit `channels.matrix.proxy` config for routing Matrix traffic through an HTTP(S) proxy, including account-level overrides and matching probe/runtime behavior. (#56931) thanks @patrick-yingxi-pan.
- Background tasks: turn tasks into a real shared background-run control plane instead of ACP-only bookkeeping by unifying ACP, subagent, cron, and background CLI execution under one SQLite-backed ledger, routing detached lifecycle updates through the executor seam, adding audit/maintenance/status visibility, tightening auto-cleanup and lost-run recovery, improving task awareness in internal status/tool surfaces, and clarifying the split between heartbeat/main-session automation and detached scheduled runs. Thanks @vincentkoc and @mbelinky.
- Flows/tasks: add a minimal SQLite-backed flow registry plus task-to-flow linkage scaffolding, so orchestrated work can start gaining a first-class parent record without changing current task delivery behavior.
### Fixes

View File

@ -0,0 +1,10 @@
import path from "node:path";
import { resolveTaskStateDir } from "./task-registry.paths.js";
export function resolveFlowRegistryDir(env: NodeJS.ProcessEnv = process.env): string {
return path.join(resolveTaskStateDir(env), "flows");
}
export function resolveFlowRegistrySqlitePath(env: NodeJS.ProcessEnv = process.env): string {
return path.join(resolveFlowRegistryDir(env), "registry.sqlite");
}

View File

@ -0,0 +1,259 @@
import { chmodSync, existsSync, mkdirSync } from "node:fs";
import type { DatabaseSync, StatementSync } from "node:sqlite";
import { requireNodeSqlite } from "../infra/node-sqlite.js";
import type { DeliveryContext } from "../utils/delivery-context.js";
import { resolveFlowRegistryDir, resolveFlowRegistrySqlitePath } from "./flow-registry.paths.js";
import type { FlowRegistryStoreSnapshot } from "./flow-registry.store.js";
import type { FlowRecord } from "./flow-registry.types.js";
type FlowRegistryRow = {
flow_id: string;
owner_session_key: string;
requester_origin_json: string | null;
status: FlowRecord["status"];
notify_policy: FlowRecord["notifyPolicy"];
goal: string;
current_step: string | null;
created_at: number | bigint;
updated_at: number | bigint;
ended_at: number | bigint | null;
};
type FlowRegistryStatements = {
selectAll: StatementSync;
upsertRow: StatementSync;
deleteRow: StatementSync;
clearRows: StatementSync;
};
type FlowRegistryDatabase = {
db: DatabaseSync;
path: string;
statements: FlowRegistryStatements;
};
let cachedDatabase: FlowRegistryDatabase | null = null;
const FLOW_REGISTRY_DIR_MODE = 0o700;
const FLOW_REGISTRY_FILE_MODE = 0o600;
const FLOW_REGISTRY_SIDECAR_SUFFIXES = ["", "-shm", "-wal"] as const;
function normalizeNumber(value: number | bigint | null): number | undefined {
if (typeof value === "bigint") {
return Number(value);
}
return typeof value === "number" ? value : undefined;
}
function serializeJson(value: unknown): string | null {
return value == null ? null : JSON.stringify(value);
}
function parseJsonValue<T>(raw: string | null): T | undefined {
if (!raw?.trim()) {
return undefined;
}
try {
return JSON.parse(raw) as T;
} catch {
return undefined;
}
}
function rowToFlowRecord(row: FlowRegistryRow): FlowRecord {
const endedAt = normalizeNumber(row.ended_at);
const requesterOrigin = parseJsonValue<DeliveryContext>(row.requester_origin_json);
return {
flowId: row.flow_id,
ownerSessionKey: row.owner_session_key,
...(requesterOrigin ? { requesterOrigin } : {}),
status: row.status,
notifyPolicy: row.notify_policy,
goal: row.goal,
...(row.current_step ? { currentStep: row.current_step } : {}),
createdAt: normalizeNumber(row.created_at) ?? 0,
updatedAt: normalizeNumber(row.updated_at) ?? 0,
...(endedAt != null ? { endedAt } : {}),
};
}
function bindFlowRecord(record: FlowRecord) {
return {
flow_id: record.flowId,
owner_session_key: record.ownerSessionKey,
requester_origin_json: serializeJson(record.requesterOrigin),
status: record.status,
notify_policy: record.notifyPolicy,
goal: record.goal,
current_step: record.currentStep ?? null,
created_at: record.createdAt,
updated_at: record.updatedAt,
ended_at: record.endedAt ?? null,
};
}
function createStatements(db: DatabaseSync): FlowRegistryStatements {
return {
selectAll: db.prepare(`
SELECT
flow_id,
owner_session_key,
requester_origin_json,
status,
notify_policy,
goal,
current_step,
created_at,
updated_at,
ended_at
FROM flow_runs
ORDER BY created_at ASC, flow_id ASC
`),
upsertRow: db.prepare(`
INSERT INTO flow_runs (
flow_id,
owner_session_key,
requester_origin_json,
status,
notify_policy,
goal,
current_step,
created_at,
updated_at,
ended_at
) VALUES (
@flow_id,
@owner_session_key,
@requester_origin_json,
@status,
@notify_policy,
@goal,
@current_step,
@created_at,
@updated_at,
@ended_at
)
ON CONFLICT(flow_id) DO UPDATE SET
owner_session_key = excluded.owner_session_key,
requester_origin_json = excluded.requester_origin_json,
status = excluded.status,
notify_policy = excluded.notify_policy,
goal = excluded.goal,
current_step = excluded.current_step,
created_at = excluded.created_at,
updated_at = excluded.updated_at,
ended_at = excluded.ended_at
`),
deleteRow: db.prepare(`DELETE FROM flow_runs WHERE flow_id = ?`),
clearRows: db.prepare(`DELETE FROM flow_runs`),
};
}
function ensureSchema(db: DatabaseSync) {
db.exec(`
CREATE TABLE IF NOT EXISTS flow_runs (
flow_id TEXT PRIMARY KEY,
owner_session_key TEXT NOT NULL,
requester_origin_json TEXT,
status TEXT NOT NULL,
notify_policy TEXT NOT NULL,
goal TEXT NOT NULL,
current_step TEXT,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL,
ended_at INTEGER
);
`);
db.exec(`CREATE INDEX IF NOT EXISTS idx_flow_runs_status ON flow_runs(status);`);
db.exec(
`CREATE INDEX IF NOT EXISTS idx_flow_runs_owner_session_key ON flow_runs(owner_session_key);`,
);
db.exec(`CREATE INDEX IF NOT EXISTS idx_flow_runs_updated_at ON flow_runs(updated_at);`);
}
function ensureFlowRegistryPermissions(pathname: string) {
const dir = resolveFlowRegistryDir(process.env);
mkdirSync(dir, { recursive: true, mode: FLOW_REGISTRY_DIR_MODE });
chmodSync(dir, FLOW_REGISTRY_DIR_MODE);
for (const suffix of FLOW_REGISTRY_SIDECAR_SUFFIXES) {
const candidate = `${pathname}${suffix}`;
if (!existsSync(candidate)) {
continue;
}
chmodSync(candidate, FLOW_REGISTRY_FILE_MODE);
}
}
function openFlowRegistryDatabase(): FlowRegistryDatabase {
const pathname = resolveFlowRegistrySqlitePath(process.env);
if (cachedDatabase && cachedDatabase.path === pathname) {
return cachedDatabase;
}
if (cachedDatabase) {
cachedDatabase.db.close();
cachedDatabase = null;
}
ensureFlowRegistryPermissions(pathname);
const { DatabaseSync } = requireNodeSqlite();
const db = new DatabaseSync(pathname);
db.exec(`PRAGMA journal_mode = WAL;`);
db.exec(`PRAGMA synchronous = NORMAL;`);
db.exec(`PRAGMA busy_timeout = 5000;`);
ensureSchema(db);
ensureFlowRegistryPermissions(pathname);
cachedDatabase = {
db,
path: pathname,
statements: createStatements(db),
};
return cachedDatabase;
}
function withWriteTransaction(write: (statements: FlowRegistryStatements) => void) {
const { db, path, statements } = openFlowRegistryDatabase();
db.exec("BEGIN IMMEDIATE");
try {
write(statements);
db.exec("COMMIT");
ensureFlowRegistryPermissions(path);
} catch (error) {
db.exec("ROLLBACK");
throw error;
}
}
export function loadFlowRegistryStateFromSqlite(): FlowRegistryStoreSnapshot {
const { statements } = openFlowRegistryDatabase();
const rows = statements.selectAll.all() as FlowRegistryRow[];
return {
flows: new Map(rows.map((row) => [row.flow_id, rowToFlowRecord(row)])),
};
}
export function saveFlowRegistryStateToSqlite(snapshot: FlowRegistryStoreSnapshot) {
withWriteTransaction((statements) => {
statements.clearRows.run();
for (const flow of snapshot.flows.values()) {
statements.upsertRow.run(bindFlowRecord(flow));
}
});
}
export function upsertFlowRegistryRecordToSqlite(flow: FlowRecord) {
const store = openFlowRegistryDatabase();
store.statements.upsertRow.run(bindFlowRecord(flow));
ensureFlowRegistryPermissions(store.path);
}
export function deleteFlowRegistryRecordFromSqlite(flowId: string) {
const store = openFlowRegistryDatabase();
store.statements.deleteRow.run(flowId);
ensureFlowRegistryPermissions(store.path);
}
export function closeFlowRegistrySqliteStore() {
if (!cachedDatabase) {
return;
}
cachedDatabase.db.close();
cachedDatabase = null;
}

View File

@ -0,0 +1,104 @@
import { statSync } from "node:fs";
import { afterEach, describe, expect, it, vi } from "vitest";
import { withTempDir } from "../test-helpers/temp-dir.js";
import { createFlowRecord, getFlowById, resetFlowRegistryForTests } from "./flow-registry.js";
import { resolveFlowRegistryDir, resolveFlowRegistrySqlitePath } from "./flow-registry.paths.js";
import { configureFlowRegistryRuntime } from "./flow-registry.store.js";
import type { FlowRecord } from "./flow-registry.types.js";
function createStoredFlow(): FlowRecord {
return {
flowId: "flow-restored",
ownerSessionKey: "agent:main:main",
status: "running",
notifyPolicy: "done_only",
goal: "Restored flow",
currentStep: "spawn_task",
createdAt: 100,
updatedAt: 100,
};
}
describe("flow-registry store runtime", () => {
afterEach(() => {
delete process.env.OPENCLAW_STATE_DIR;
resetFlowRegistryForTests();
});
it("uses the configured flow store for restore and save", () => {
const storedFlow = createStoredFlow();
const loadSnapshot = vi.fn(() => ({
flows: new Map([[storedFlow.flowId, storedFlow]]),
}));
const saveSnapshot = vi.fn();
configureFlowRegistryRuntime({
store: {
loadSnapshot,
saveSnapshot,
},
});
expect(getFlowById("flow-restored")).toMatchObject({
flowId: "flow-restored",
goal: "Restored flow",
});
expect(loadSnapshot).toHaveBeenCalledTimes(1);
createFlowRecord({
ownerSessionKey: "agent:main:main",
goal: "New flow",
status: "running",
currentStep: "wait_for",
});
expect(saveSnapshot).toHaveBeenCalled();
const latestSnapshot = saveSnapshot.mock.calls.at(-1)?.[0] as {
flows: ReadonlyMap<string, FlowRecord>;
};
expect(latestSnapshot.flows.size).toBe(2);
expect(latestSnapshot.flows.get("flow-restored")?.goal).toBe("Restored flow");
});
it("restores persisted flows from the default sqlite store", async () => {
await withTempDir({ prefix: "openclaw-flow-store-" }, async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetFlowRegistryForTests();
const created = createFlowRecord({
ownerSessionKey: "agent:main:main",
goal: "Persisted flow",
status: "waiting",
currentStep: "ask_user",
});
resetFlowRegistryForTests({ persist: false });
expect(getFlowById(created.flowId)).toMatchObject({
flowId: created.flowId,
status: "waiting",
currentStep: "ask_user",
});
});
});
it("hardens the sqlite flow store directory and file modes", async () => {
if (process.platform === "win32") {
return;
}
await withTempDir({ prefix: "openclaw-flow-store-" }, async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetFlowRegistryForTests();
createFlowRecord({
ownerSessionKey: "agent:main:main",
goal: "Secured flow",
status: "queued",
});
const registryDir = resolveFlowRegistryDir(process.env);
const sqlitePath = resolveFlowRegistrySqlitePath(process.env);
expect(statSync(registryDir).mode & 0o777).toBe(0o700);
expect(statSync(sqlitePath).mode & 0o777).toBe(0o600);
});
});
});

View File

@ -0,0 +1,45 @@
import {
closeFlowRegistrySqliteStore,
deleteFlowRegistryRecordFromSqlite,
loadFlowRegistryStateFromSqlite,
saveFlowRegistryStateToSqlite,
upsertFlowRegistryRecordToSqlite,
} from "./flow-registry.store.sqlite.js";
import type { FlowRecord } from "./flow-registry.types.js";
export type FlowRegistryStoreSnapshot = {
flows: Map<string, FlowRecord>;
};
export type FlowRegistryStore = {
loadSnapshot: () => FlowRegistryStoreSnapshot;
saveSnapshot: (snapshot: FlowRegistryStoreSnapshot) => void;
upsertFlow?: (flow: FlowRecord) => void;
deleteFlow?: (flowId: string) => void;
close?: () => void;
};
const defaultFlowRegistryStore: FlowRegistryStore = {
loadSnapshot: loadFlowRegistryStateFromSqlite,
saveSnapshot: saveFlowRegistryStateToSqlite,
upsertFlow: upsertFlowRegistryRecordToSqlite,
deleteFlow: deleteFlowRegistryRecordFromSqlite,
close: closeFlowRegistrySqliteStore,
};
let configuredFlowRegistryStore: FlowRegistryStore = defaultFlowRegistryStore;
export function getFlowRegistryStore(): FlowRegistryStore {
return configuredFlowRegistryStore;
}
export function configureFlowRegistryRuntime(params: { store?: FlowRegistryStore }) {
if (params.store) {
configuredFlowRegistryStore = params.store;
}
}
export function resetFlowRegistryRuntimeForTests() {
configuredFlowRegistryStore.close?.();
configuredFlowRegistryStore = defaultFlowRegistryStore;
}

View File

@ -0,0 +1,115 @@
import { afterEach, describe, expect, it } from "vitest";
import { withTempDir } from "../test-helpers/temp-dir.js";
import {
createFlowRecord,
deleteFlowRecordById,
getFlowById,
listFlowRecords,
resetFlowRegistryForTests,
updateFlowRecordById,
} from "./flow-registry.js";
const ORIGINAL_STATE_DIR = process.env.OPENCLAW_STATE_DIR;
describe("flow-registry", () => {
afterEach(() => {
if (ORIGINAL_STATE_DIR === undefined) {
delete process.env.OPENCLAW_STATE_DIR;
} else {
process.env.OPENCLAW_STATE_DIR = ORIGINAL_STATE_DIR;
}
resetFlowRegistryForTests();
});
it("creates, updates, lists, and deletes flow records", async () => {
await withTempDir({ prefix: "openclaw-flow-registry-" }, async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetFlowRegistryForTests();
const created = createFlowRecord({
ownerSessionKey: "agent:main:main",
goal: "Investigate flaky test",
status: "running",
currentStep: "spawn_task",
});
expect(getFlowById(created.flowId)).toMatchObject({
flowId: created.flowId,
status: "running",
currentStep: "spawn_task",
});
const updated = updateFlowRecordById(created.flowId, {
status: "waiting",
currentStep: "ask_user",
});
expect(updated).toMatchObject({
flowId: created.flowId,
status: "waiting",
currentStep: "ask_user",
});
expect(listFlowRecords()).toEqual([
expect.objectContaining({
flowId: created.flowId,
goal: "Investigate flaky test",
status: "waiting",
}),
]);
expect(deleteFlowRecordById(created.flowId)).toBe(true);
expect(getFlowById(created.flowId)).toBeUndefined();
expect(listFlowRecords()).toEqual([]);
});
});
it("applies minimal defaults for new flow records", async () => {
await withTempDir({ prefix: "openclaw-flow-registry-" }, async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetFlowRegistryForTests();
const created = createFlowRecord({
ownerSessionKey: "agent:main:main",
goal: "Background job",
});
expect(created).toMatchObject({
flowId: expect.any(String),
ownerSessionKey: "agent:main:main",
goal: "Background job",
status: "queued",
notifyPolicy: "done_only",
});
expect(created.currentStep).toBeUndefined();
expect(created.endedAt).toBeUndefined();
});
});
it("preserves endedAt when later updates change other flow fields", async () => {
await withTempDir({ prefix: "openclaw-flow-registry-" }, async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetFlowRegistryForTests();
const created = createFlowRecord({
ownerSessionKey: "agent:main:main",
goal: "Finish a task",
status: "succeeded",
endedAt: 456,
});
const updated = updateFlowRecordById(created.flowId, {
currentStep: "finish",
});
expect(updated).toMatchObject({
flowId: created.flowId,
currentStep: "finish",
endedAt: 456,
});
expect(getFlowById(created.flowId)).toMatchObject({
flowId: created.flowId,
endedAt: 456,
});
});
});
});

148
src/tasks/flow-registry.ts Normal file
View File

@ -0,0 +1,148 @@
import crypto from "node:crypto";
import { getFlowRegistryStore, resetFlowRegistryRuntimeForTests } from "./flow-registry.store.js";
import type { FlowRecord, FlowStatus } from "./flow-registry.types.js";
import type { TaskNotifyPolicy } from "./task-registry.types.js";
const flows = new Map<string, FlowRecord>();
let restoreAttempted = false;
function cloneFlowRecord(record: FlowRecord): FlowRecord {
return {
...record,
...(record.requesterOrigin ? { requesterOrigin: { ...record.requesterOrigin } } : {}),
};
}
function snapshotFlowRecords(source: ReadonlyMap<string, FlowRecord>): FlowRecord[] {
return [...source.values()].map((record) => cloneFlowRecord(record));
}
function ensureNotifyPolicy(notifyPolicy?: TaskNotifyPolicy): TaskNotifyPolicy {
return notifyPolicy ?? "done_only";
}
function ensureFlowRegistryReady() {
if (restoreAttempted) {
return;
}
restoreAttempted = true;
const restored = getFlowRegistryStore().loadSnapshot();
flows.clear();
for (const [flowId, flow] of restored.flows) {
flows.set(flowId, cloneFlowRecord(flow));
}
}
function persistFlowRegistry() {
getFlowRegistryStore().saveSnapshot({
flows: new Map(snapshotFlowRecords(flows).map((flow) => [flow.flowId, flow])),
});
}
function persistFlowUpsert(flow: FlowRecord) {
const store = getFlowRegistryStore();
if (store.upsertFlow) {
store.upsertFlow(cloneFlowRecord(flow));
return;
}
persistFlowRegistry();
}
function persistFlowDelete(flowId: string) {
const store = getFlowRegistryStore();
if (store.deleteFlow) {
store.deleteFlow(flowId);
return;
}
persistFlowRegistry();
}
export function createFlowRecord(params: {
ownerSessionKey: string;
requesterOrigin?: FlowRecord["requesterOrigin"];
status?: FlowStatus;
notifyPolicy?: TaskNotifyPolicy;
goal: string;
currentStep?: string;
createdAt?: number;
updatedAt?: number;
endedAt?: number;
}): FlowRecord {
ensureFlowRegistryReady();
const now = params.createdAt ?? Date.now();
const record: FlowRecord = {
flowId: crypto.randomUUID(),
ownerSessionKey: params.ownerSessionKey,
...(params.requesterOrigin ? { requesterOrigin: { ...params.requesterOrigin } } : {}),
status: params.status ?? "queued",
notifyPolicy: ensureNotifyPolicy(params.notifyPolicy),
goal: params.goal,
currentStep: params.currentStep?.trim() || undefined,
createdAt: now,
updatedAt: params.updatedAt ?? now,
...(params.endedAt !== undefined ? { endedAt: params.endedAt } : {}),
};
flows.set(record.flowId, record);
persistFlowUpsert(record);
return cloneFlowRecord(record);
}
export function updateFlowRecordById(
flowId: string,
patch: Partial<
Pick<FlowRecord, "status" | "notifyPolicy" | "goal" | "currentStep" | "updatedAt" | "endedAt">
>,
): FlowRecord | null {
ensureFlowRegistryReady();
const current = flows.get(flowId);
if (!current) {
return null;
}
const next: FlowRecord = {
...current,
...(patch.status ? { status: patch.status } : {}),
...(patch.notifyPolicy ? { notifyPolicy: patch.notifyPolicy } : {}),
...(patch.goal ? { goal: patch.goal } : {}),
currentStep:
patch.currentStep === undefined ? current.currentStep : patch.currentStep.trim() || undefined,
updatedAt: patch.updatedAt ?? Date.now(),
...(patch.endedAt !== undefined ? { endedAt: patch.endedAt } : {}),
};
flows.set(flowId, next);
persistFlowUpsert(next);
return cloneFlowRecord(next);
}
export function getFlowById(flowId: string): FlowRecord | undefined {
ensureFlowRegistryReady();
const flow = flows.get(flowId);
return flow ? cloneFlowRecord(flow) : undefined;
}
export function listFlowRecords(): FlowRecord[] {
ensureFlowRegistryReady();
return [...flows.values()]
.map((flow) => cloneFlowRecord(flow))
.toSorted((left, right) => left.createdAt - right.createdAt);
}
export function deleteFlowRecordById(flowId: string): boolean {
ensureFlowRegistryReady();
const current = flows.get(flowId);
if (!current) {
return false;
}
flows.delete(flowId);
persistFlowDelete(flowId);
return true;
}
export function resetFlowRegistryForTests(opts?: { persist?: boolean }) {
flows.clear();
restoreAttempted = false;
resetFlowRegistryRuntimeForTests();
if (opts?.persist !== false) {
persistFlowRegistry();
getFlowRegistryStore().close?.();
}
}

View File

@ -0,0 +1,25 @@
import type { DeliveryContext } from "../utils/delivery-context.js";
import type { TaskNotifyPolicy } from "./task-registry.types.js";
export type FlowStatus =
| "queued"
| "running"
| "waiting"
| "blocked"
| "succeeded"
| "failed"
| "cancelled"
| "lost";
export type FlowRecord = {
flowId: string;
ownerSessionKey: string;
requesterOrigin?: DeliveryContext;
status: FlowStatus;
notifyPolicy: TaskNotifyPolicy;
goal: string;
currentStep?: string;
createdAt: number;
updatedAt: number;
endedAt?: number;
};

View File

@ -23,6 +23,7 @@ export function createQueuedTaskRun(params: {
sourceId?: string;
requesterSessionKey: string;
requesterOrigin?: TaskDeliveryState["requesterOrigin"];
parentFlowId?: string;
childSessionKey?: string;
parentTaskId?: string;
agentId?: string;
@ -44,6 +45,7 @@ export function createRunningTaskRun(params: {
sourceId?: string;
requesterSessionKey: string;
requesterOrigin?: TaskDeliveryState["requesterOrigin"];
parentFlowId?: string;
childSessionKey?: string;
parentTaskId?: string;
agentId?: string;

View File

@ -11,6 +11,7 @@ type TaskRegistryRow = {
runtime: TaskRecord["runtime"];
source_id: string | null;
requester_session_key: string;
parent_flow_id: string | null;
child_session_key: string | null;
parent_task_id: string | null;
agent_id: string | null;
@ -91,6 +92,7 @@ function rowToTaskRecord(row: TaskRegistryRow): TaskRecord {
runtime: row.runtime,
...(row.source_id ? { sourceId: row.source_id } : {}),
requesterSessionKey: row.requester_session_key,
...(row.parent_flow_id ? { parentFlowId: row.parent_flow_id } : {}),
...(row.child_session_key ? { childSessionKey: row.child_session_key } : {}),
...(row.parent_task_id ? { parentTaskId: row.parent_task_id } : {}),
...(row.agent_id ? { agentId: row.agent_id } : {}),
@ -128,6 +130,7 @@ function bindTaskRecord(record: TaskRecord) {
runtime: record.runtime,
source_id: record.sourceId ?? null,
requester_session_key: record.requesterSessionKey,
parent_flow_id: record.parentFlowId ?? null,
child_session_key: record.childSessionKey ?? null,
parent_task_id: record.parentTaskId ?? null,
agent_id: record.agentId ?? null,
@ -165,6 +168,7 @@ function createStatements(db: DatabaseSync): TaskRegistryStatements {
runtime,
source_id,
requester_session_key,
parent_flow_id,
child_session_key,
parent_task_id,
agent_id,
@ -200,6 +204,7 @@ function createStatements(db: DatabaseSync): TaskRegistryStatements {
runtime,
source_id,
requester_session_key,
parent_flow_id,
child_session_key,
parent_task_id,
agent_id,
@ -223,6 +228,7 @@ function createStatements(db: DatabaseSync): TaskRegistryStatements {
@runtime,
@source_id,
@requester_session_key,
@parent_flow_id,
@child_session_key,
@parent_task_id,
@agent_id,
@ -246,6 +252,7 @@ function createStatements(db: DatabaseSync): TaskRegistryStatements {
runtime = excluded.runtime,
source_id = excluded.source_id,
requester_session_key = excluded.requester_session_key,
parent_flow_id = excluded.parent_flow_id,
child_session_key = excluded.child_session_key,
parent_task_id = excluded.parent_task_id,
agent_id = excluded.agent_id,
@ -290,6 +297,7 @@ function ensureSchema(db: DatabaseSync) {
runtime TEXT NOT NULL,
source_id TEXT,
requester_session_key TEXT NOT NULL,
parent_flow_id TEXT,
child_session_key TEXT,
parent_task_id TEXT,
agent_id TEXT,
@ -318,15 +326,30 @@ function ensureSchema(db: DatabaseSync) {
);
`);
db.exec(`CREATE INDEX IF NOT EXISTS idx_task_runs_run_id ON task_runs(run_id);`);
ensureColumn(db, "task_runs", "parent_flow_id", "TEXT");
db.exec(`CREATE INDEX IF NOT EXISTS idx_task_runs_status ON task_runs(status);`);
db.exec(`CREATE INDEX IF NOT EXISTS idx_task_runs_runtime_status ON task_runs(runtime, status);`);
db.exec(`CREATE INDEX IF NOT EXISTS idx_task_runs_cleanup_after ON task_runs(cleanup_after);`);
db.exec(`CREATE INDEX IF NOT EXISTS idx_task_runs_last_event_at ON task_runs(last_event_at);`);
db.exec(`CREATE INDEX IF NOT EXISTS idx_task_runs_parent_flow_id ON task_runs(parent_flow_id);`);
db.exec(
`CREATE INDEX IF NOT EXISTS idx_task_runs_child_session_key ON task_runs(child_session_key);`,
);
}
function ensureColumn(
db: DatabaseSync,
tableName: string,
columnName: string,
columnDefinition: string,
) {
const rows = db.prepare(`PRAGMA table_info(${tableName})`).all() as Array<{ name?: string }>;
if (rows.some((row) => row.name === columnName)) {
return;
}
db.exec(`ALTER TABLE ${tableName} ADD COLUMN ${columnName} ${columnDefinition};`);
}
function ensureTaskRegistryPermissions(pathname: string) {
const dir = resolveTaskRegistryDir(process.env);
mkdirSync(dir, { recursive: true, mode: TASK_REGISTRY_DIR_MODE });

View File

@ -138,6 +138,26 @@ describe("task-registry store runtime", () => {
});
});
it("persists parent flow linkage on task records", () => {
const created = createTaskRecord({
runtime: "acp",
requesterSessionKey: "agent:main:main",
parentFlowId: "flow-123",
runId: "run-linked",
task: "Linked task",
status: "running",
deliveryStatus: "pending",
});
resetTaskRegistryForTests({ persist: false });
expect(findTaskByRunId("run-linked")).toMatchObject({
taskId: created.taskId,
parentFlowId: "flow-123",
task: "Linked task",
});
});
it("hardens the sqlite task store directory and file modes", () => {
if (process.platform === "win32") {
return;

View File

@ -679,6 +679,48 @@ describe("task-registry", () => {
});
});
it("adopts parent flow linkage when collapsing onto an earlier ACP record", async () => {
await withTempDir({ prefix: "openclaw-task-registry-" }, async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests();
const directTask = createTaskRecord({
runtime: "acp",
requesterSessionKey: "agent:main:main",
requesterOrigin: {
channel: "telegram",
to: "telegram:123",
},
childSessionKey: "agent:main:acp:child",
runId: "run-collapse-parent-flow",
task: "Direct ACP child",
status: "running",
deliveryStatus: "pending",
});
const spawnedTask = createTaskRecord({
runtime: "acp",
requesterSessionKey: "agent:main:main",
requesterOrigin: {
channel: "telegram",
to: "telegram:123",
},
parentFlowId: "flow-123",
childSessionKey: "agent:main:acp:child",
runId: "run-collapse-parent-flow",
task: "Spawn ACP child",
status: "running",
deliveryStatus: "pending",
});
expect(spawnedTask.taskId).toBe(directTask.taskId);
expect(findTaskByRunId("run-collapse-parent-flow")).toMatchObject({
taskId: directTask.taskId,
parentFlowId: "flow-123",
});
});
});
it("collapses ACP run-owned task creation onto the existing spawned task", async () => {
await withTempDir({ prefix: "openclaw-task-registry-" }, async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
@ -772,6 +814,7 @@ describe("task-registry", () => {
const task = createTaskRecord({
runtime: "subagent",
requesterSessionKey: "agent:main:main",
parentFlowId: "flow-restore",
childSessionKey: "agent:main:subagent:child",
runId: "run-restore",
task: "Restore me",
@ -785,6 +828,7 @@ describe("task-registry", () => {
expect(resolveTaskForLookupToken(task.taskId)).toMatchObject({
taskId: task.taskId,
parentFlowId: "flow-restore",
runId: "run-restore",
task: "Restore me",
});

View File

@ -367,6 +367,7 @@ function mergeExistingTaskForCreate(
params: {
requesterOrigin?: TaskDeliveryState["requesterOrigin"];
sourceId?: string;
parentFlowId?: string;
parentTaskId?: string;
agentId?: string;
label?: string;
@ -389,6 +390,9 @@ function mergeExistingTaskForCreate(
if (params.sourceId?.trim() && !existing.sourceId?.trim()) {
patch.sourceId = params.sourceId.trim();
}
if (params.parentFlowId?.trim() && !existing.parentFlowId?.trim()) {
patch.parentFlowId = params.parentFlowId.trim();
}
if (params.parentTaskId?.trim() && !existing.parentTaskId?.trim()) {
patch.parentTaskId = params.parentTaskId.trim();
}
@ -910,6 +914,7 @@ export function createTaskRecord(params: {
sourceId?: string;
requesterSessionKey: string;
requesterOrigin?: TaskDeliveryState["requesterOrigin"];
parentFlowId?: string;
childSessionKey?: string;
parentTaskId?: string;
agentId?: string;
@ -947,6 +952,7 @@ export function createTaskRecord(params: {
runtime: params.runtime,
sourceId: params.sourceId?.trim() || undefined,
requesterSessionKey: params.requesterSessionKey,
parentFlowId: params.parentFlowId?.trim() || undefined,
childSessionKey: params.childSessionKey,
parentTaskId: params.parentTaskId?.trim() || undefined,
agentId: params.agentId?.trim() || undefined,

View File

@ -54,6 +54,7 @@ export type TaskRecord = {
runtime: TaskRuntime;
sourceId?: string;
requesterSessionKey: string;
parentFlowId?: string;
childSessionKey?: string;
parentTaskId?: string;
agentId?: string;