TaskFlow: restore managed substrate (#58930)

Merged via squash.

Prepared head SHA: c99093838f
Co-authored-by: mbelinky <132747814+mbelinky@users.noreply.github.com>
Co-authored-by: mbelinky <132747814+mbelinky@users.noreply.github.com>
Reviewed-by: @mbelinky
This commit is contained in:
Mariano 2026-04-02 12:17:56 +02:00 committed by GitHub
parent 52d2bd5cc6
commit 2fa4c7cc61
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
25 changed files with 3243 additions and 6 deletions

View File

@ -11,6 +11,11 @@ Docs: https://docs.openclaw.ai
- Providers/runtime: add provider-owned replay hook surfaces for transcript policy, replay cleanup, and reasoning-mode dispatch. (#59143) Thanks @jalehman.
- Diffs: add plugin-owned `viewerBaseUrl` so viewer links can use a stable proxy/public origin without passing `baseUrl` on every tool call. (#59341) Related #59227. Thanks @gumadeiras.
- Matrix/plugin: emit spec-compliant `m.mentions` metadata across text sends, media captions, edits, poll fallback text, and action-driven edits so Matrix mentions notify reliably in clients like Element. (#59323) Thanks @gumadeiras.
- Agents/compaction: resolve `agents.defaults.compaction.model` consistently for manual `/compact` and other context-engine compaction paths, so engine-owned compaction uses the configured override model across runtime entrypoints. (#56710) Thanks @oliviareid-svg
- Channels/session routing: move provider-specific session conversation grammar into plugin-owned session-key surfaces, preserving Telegram topic routing and Feishu scoped inheritance across bootstrap, model override, restart, and tool-policy paths.
- WhatsApp/reactions: add `reactionLevel` guidance for agent reactions. Thanks @mcaxtr.
- Feishu/comments: add a dedicated Drive comment-event flow with comment-thread context resolution, in-thread replies, and `feishu_drive` comment actions for document collaboration workflows. (#58497) thanks @wittam-01.
- Tasks/TaskFlow: restore the core TaskFlow substrate with managed-vs-mirrored sync modes, durable flow state/revision tracking, and `openclaw flows` inspection/recovery primitives so background orchestration can persist and be operated separately from plugin authoring layers. (#58930) Thanks @mbelinky.
### Fixes

View File

@ -13,6 +13,9 @@ const mocks = vi.hoisted(() => ({
tasksShowCommand: vi.fn(),
tasksNotifyCommand: vi.fn(),
tasksCancelCommand: vi.fn(),
flowsListCommand: vi.fn(),
flowsShowCommand: vi.fn(),
flowsCancelCommand: vi.fn(),
setVerbose: vi.fn(),
runtime: {
log: vi.fn(),
@ -31,6 +34,9 @@ const tasksMaintenanceCommand = mocks.tasksMaintenanceCommand;
const tasksShowCommand = mocks.tasksShowCommand;
const tasksNotifyCommand = mocks.tasksNotifyCommand;
const tasksCancelCommand = mocks.tasksCancelCommand;
const flowsListCommand = mocks.flowsListCommand;
const flowsShowCommand = mocks.flowsShowCommand;
const flowsCancelCommand = mocks.flowsCancelCommand;
const setVerbose = mocks.setVerbose;
const runtime = mocks.runtime;
@ -59,6 +65,12 @@ vi.mock("../../commands/tasks.js", () => ({
tasksCancelCommand: mocks.tasksCancelCommand,
}));
vi.mock("../../commands/flows.js", () => ({
flowsListCommand: mocks.flowsListCommand,
flowsShowCommand: mocks.flowsShowCommand,
flowsCancelCommand: mocks.flowsCancelCommand,
}));
vi.mock("../../globals.js", () => ({
setVerbose: mocks.setVerbose,
}));
@ -87,6 +99,9 @@ describe("registerStatusHealthSessionsCommands", () => {
tasksShowCommand.mockResolvedValue(undefined);
tasksNotifyCommand.mockResolvedValue(undefined);
tasksCancelCommand.mockResolvedValue(undefined);
flowsListCommand.mockResolvedValue(undefined);
flowsShowCommand.mockResolvedValue(undefined);
flowsCancelCommand.mockResolvedValue(undefined);
});
it("runs status command with timeout and debug-derived verbose", async () => {
@ -223,6 +238,34 @@ describe("registerStatusHealthSessionsCommands", () => {
);
});
it("runs flows subcommands with forwarded options", async () => {
await runCli(["flows", "list", "--json", "--status", "blocked"]);
expect(flowsListCommand).toHaveBeenCalledWith(
expect.objectContaining({
json: true,
status: "blocked",
}),
runtime,
);
await runCli(["flows", "show", "flow-123", "--json"]);
expect(flowsShowCommand).toHaveBeenCalledWith(
expect.objectContaining({
lookup: "flow-123",
json: true,
}),
runtime,
);
await runCli(["flows", "cancel", "flow-123"]);
expect(flowsCancelCommand).toHaveBeenCalledWith(
expect.objectContaining({
lookup: "flow-123",
}),
runtime,
);
});
it("forwards parent-level all-agents to cleanup subcommand", async () => {
await runCli(["sessions", "--all-agents", "cleanup", "--dry-run"]);

View File

@ -1,4 +1,5 @@
import type { Command } from "commander";
import { flowsCancelCommand, flowsListCommand, flowsShowCommand } from "../../commands/flows.js";
import { healthCommand } from "../../commands/health.js";
import { sessionsCleanupCommand } from "../../commands/sessions-cleanup.js";
import { sessionsCommand } from "../../commands/sessions.js";
@ -373,4 +374,79 @@ export function registerStatusHealthSessionsCommands(program: Command) {
);
});
});
const flowsCmd = program
.command("flows")
.description("Inspect durable background flow state")
.option("--json", "Output as JSON", false)
.option(
"--status <name>",
"Filter by status (queued, running, waiting, blocked, succeeded, failed, cancelled, lost)",
)
.action(async (opts) => {
await runCommandWithRuntime(defaultRuntime, async () => {
await flowsListCommand(
{
json: Boolean(opts.json),
status: opts.status as string | undefined,
},
defaultRuntime,
);
});
});
flowsCmd.enablePositionalOptions();
flowsCmd
.command("list")
.description("List tracked background flows")
.option("--json", "Output as JSON", false)
.option(
"--status <name>",
"Filter by status (queued, running, waiting, blocked, succeeded, failed, cancelled, lost)",
)
.action(async (opts, command) => {
const parentOpts = command.parent?.opts() as { json?: boolean; status?: string } | undefined;
await runCommandWithRuntime(defaultRuntime, async () => {
await flowsListCommand(
{
json: Boolean(opts.json || parentOpts?.json),
status: (opts.status as string | undefined) ?? parentOpts?.status,
},
defaultRuntime,
);
});
});
flowsCmd
.command("show")
.description("Show one background flow by flow id or owner key")
.argument("<lookup>", "Flow id or owner key")
.option("--json", "Output as JSON", false)
.action(async (lookup, opts, command) => {
const parentOpts = command.parent?.opts() as { json?: boolean } | undefined;
await runCommandWithRuntime(defaultRuntime, async () => {
await flowsShowCommand(
{
lookup,
json: Boolean(opts.json || parentOpts?.json),
},
defaultRuntime,
);
});
});
flowsCmd
.command("cancel")
.description("Cancel a running background flow")
.argument("<lookup>", "Flow id or owner key")
.action(async (lookup) => {
await runCommandWithRuntime(defaultRuntime, async () => {
await flowsCancelCommand(
{
lookup,
},
defaultRuntime,
);
});
});
}

View File

@ -13,6 +13,8 @@ const mocks = vi.hoisted(() => ({
buildWorkspaceSkillStatus: vi.fn(),
buildPluginStatusReport: vi.fn(),
buildPluginCompatibilityWarnings: vi.fn(),
listFlowRecords: vi.fn<() => unknown[]>(() => []),
listTasksForFlowId: vi.fn<(flowId: string) => unknown[]>((_flowId: string) => []),
}));
vi.mock("../agents/agent-scope.js", () => ({
@ -30,9 +32,21 @@ vi.mock("../plugins/status.js", () => ({
mocks.buildPluginCompatibilityWarnings(...args),
}));
vi.mock("../tasks/flow-runtime-internal.js", () => ({
listFlowRecords: () => mocks.listFlowRecords(),
}));
vi.mock("../tasks/runtime-internal.js", () => ({
listTasksForFlowId: (flowId: string) => mocks.listTasksForFlowId(flowId),
}));
async function runNoteWorkspaceStatusForTest(
loadResult: ReturnType<typeof createPluginLoadResult>,
compatibilityWarnings: string[] = [],
opts?: {
flows?: unknown[];
tasksByFlowId?: (flowId: string) => unknown[];
},
) {
mocks.resolveDefaultAgentId.mockReturnValue("default");
mocks.resolveAgentWorkspaceDir.mockReturnValue("/workspace");
@ -44,6 +58,10 @@ async function runNoteWorkspaceStatusForTest(
...loadResult,
});
mocks.buildPluginCompatibilityWarnings.mockReturnValue(compatibilityWarnings);
mocks.listFlowRecords.mockReturnValue(opts?.flows ?? []);
mocks.listTasksForFlowId.mockImplementation((flowId: string) =>
opts?.tasksByFlowId ? opts.tasksByFlowId(flowId) : [],
);
const noteSpy = vi.spyOn(noteModule, "note").mockImplementation(() => {});
noteWorkspaceStatus({});
@ -159,4 +177,32 @@ describe("noteWorkspaceStatus", () => {
noteSpy.mockRestore();
}
});
it("adds TaskFlow recovery hints for broken blocked flows", async () => {
const noteSpy = await runNoteWorkspaceStatusForTest(createPluginLoadResult(), [], {
flows: [
{
flowId: "flow-123",
syncMode: "managed",
ownerKey: "agent:main:main",
revision: 0,
status: "blocked",
notifyPolicy: "done_only",
goal: "Investigate PR batch",
blockedTaskId: "task-missing",
createdAt: 100,
updatedAt: 100,
},
],
tasksByFlowId: () => [],
});
try {
const recoveryCalls = noteSpy.mock.calls.filter(([, title]) => title === "TaskFlow recovery");
expect(recoveryCalls).toHaveLength(1);
expect(String(recoveryCalls[0]?.[0])).toContain("flow-123");
expect(String(recoveryCalls[0]?.[0])).toContain("openclaw flows show <flow-id>");
} finally {
noteSpy.mockRestore();
}
});
});

View File

@ -1,10 +1,54 @@
import { resolveAgentWorkspaceDir, resolveDefaultAgentId } from "../agents/agent-scope.js";
import { buildWorkspaceSkillStatus } from "../agents/skills-status.js";
import { formatCliCommand } from "../cli/command-format.js";
import type { OpenClawConfig } from "../config/config.js";
import { buildPluginCompatibilityWarnings, buildPluginStatusReport } from "../plugins/status.js";
import { listFlowRecords } from "../tasks/flow-runtime-internal.js";
import { listTasksForFlowId } from "../tasks/runtime-internal.js";
import { note } from "../terminal/note.js";
import { detectLegacyWorkspaceDirs, formatLegacyWorkspaceWarning } from "./doctor-workspace.js";
function noteFlowRecoveryHints() {
const suspicious = listFlowRecords().flatMap((flow) => {
const tasks = listTasksForFlowId(flow.flowId);
const findings: string[] = [];
if (
flow.syncMode === "managed" &&
flow.status === "running" &&
tasks.length === 0 &&
flow.waitJson === undefined
) {
findings.push(
`${flow.flowId}: running managed flow has no linked tasks or wait state; inspect or cancel it manually.`,
);
}
if (
flow.status === "blocked" &&
flow.blockedTaskId &&
!tasks.some((task) => task.taskId === flow.blockedTaskId)
) {
findings.push(
`${flow.flowId}: blocked flow points at missing task ${flow.blockedTaskId}; inspect before retrying.`,
);
}
return findings;
});
if (suspicious.length === 0) {
return;
}
note(
[
...suspicious.slice(0, 5),
suspicious.length > 5 ? `...and ${suspicious.length - 5} more.` : null,
`Inspect: ${formatCliCommand("openclaw flows show <flow-id>")}`,
`Cancel: ${formatCliCommand("openclaw flows cancel <flow-id>")}`,
]
.filter((line): line is string => Boolean(line))
.join("\n"),
"TaskFlow recovery",
);
}
export function noteWorkspaceStatus(cfg: OpenClawConfig) {
const workspaceDir = resolveAgentWorkspaceDir(cfg, resolveDefaultAgentId(cfg));
const legacyWorkspace = detectLegacyWorkspaceDirs({ workspaceDir });
@ -74,5 +118,7 @@ export function noteWorkspaceStatus(cfg: OpenClawConfig) {
note(lines.join("\n"), "Plugin diagnostics");
}
noteFlowRecoveryHints();
return { workspaceDir };
}

247
src/commands/flows.ts Normal file
View File

@ -0,0 +1,247 @@
import { loadConfig } from "../config/config.js";
import { info } from "../globals.js";
import type { RuntimeEnv } from "../runtime.js";
import type { FlowRecord, FlowStatus } from "../tasks/flow-registry.types.js";
import {
getFlowById,
listFlowRecords,
resolveFlowForLookupToken,
} from "../tasks/flow-runtime-internal.js";
import { listTasksForFlowId } from "../tasks/runtime-internal.js";
import { cancelFlowById, getFlowTaskSummary } from "../tasks/task-executor.js";
import { isRich, theme } from "../terminal/theme.js";
const ID_PAD = 10;
const STATUS_PAD = 10;
const MODE_PAD = 14;
const REV_PAD = 6;
const CTRL_PAD = 20;
function truncate(value: string, maxChars: number) {
if (value.length <= maxChars) {
return value;
}
if (maxChars <= 1) {
return value.slice(0, maxChars);
}
return `${value.slice(0, maxChars - 1)}`;
}
function shortToken(value: string | undefined, maxChars = ID_PAD): string {
const trimmed = value?.trim();
if (!trimmed) {
return "n/a";
}
return truncate(trimmed, maxChars);
}
function formatFlowStatusCell(status: FlowStatus, rich: boolean) {
const padded = status.padEnd(STATUS_PAD);
if (!rich) {
return padded;
}
if (status === "succeeded") {
return theme.success(padded);
}
if (status === "failed" || status === "lost") {
return theme.error(padded);
}
if (status === "running") {
return theme.accentBright(padded);
}
if (status === "blocked") {
return theme.warn(padded);
}
return theme.muted(padded);
}
function formatFlowRows(flows: FlowRecord[], rich: boolean) {
const header = [
"TaskFlow".padEnd(ID_PAD),
"Mode".padEnd(MODE_PAD),
"Status".padEnd(STATUS_PAD),
"Rev".padEnd(REV_PAD),
"Controller".padEnd(CTRL_PAD),
"Tasks".padEnd(14),
"Goal",
].join(" ");
const lines = [rich ? theme.heading(header) : header];
for (const flow of flows) {
const taskSummary = getFlowTaskSummary(flow.flowId);
const counts = `${taskSummary.active} active/${taskSummary.total} total`;
lines.push(
[
shortToken(flow.flowId).padEnd(ID_PAD),
flow.syncMode.padEnd(MODE_PAD),
formatFlowStatusCell(flow.status, rich),
String(flow.revision).padEnd(REV_PAD),
truncate(flow.controllerId ?? "n/a", CTRL_PAD).padEnd(CTRL_PAD),
counts.padEnd(14),
truncate(flow.goal, 80),
].join(" "),
);
}
return lines;
}
function formatFlowListSummary(flows: FlowRecord[]) {
const active = flows.filter(
(flow) => flow.status === "queued" || flow.status === "running",
).length;
const blocked = flows.filter((flow) => flow.status === "blocked").length;
const cancelRequested = flows.filter((flow) => flow.cancelRequestedAt != null).length;
return `${active} active · ${blocked} blocked · ${cancelRequested} cancel-requested · ${flows.length} total`;
}
function summarizeWait(flow: FlowRecord): string {
if (flow.waitJson == null) {
return "n/a";
}
if (
typeof flow.waitJson === "string" ||
typeof flow.waitJson === "number" ||
typeof flow.waitJson === "boolean"
) {
return String(flow.waitJson);
}
if (Array.isArray(flow.waitJson)) {
return `array(${flow.waitJson.length})`;
}
return Object.keys(flow.waitJson).toSorted().join(", ") || "object";
}
export async function flowsListCommand(
opts: { json?: boolean; status?: string },
runtime: RuntimeEnv,
) {
const statusFilter = opts.status?.trim();
const flows = listFlowRecords().filter((flow) => {
if (statusFilter && flow.status !== statusFilter) {
return false;
}
return true;
});
if (opts.json) {
runtime.log(
JSON.stringify(
{
count: flows.length,
status: statusFilter ?? null,
flows: flows.map((flow) => ({
...flow,
tasks: listTasksForFlowId(flow.flowId),
taskSummary: getFlowTaskSummary(flow.flowId),
})),
},
null,
2,
),
);
return;
}
runtime.log(info(`TaskFlows: ${flows.length}`));
runtime.log(info(`TaskFlow pressure: ${formatFlowListSummary(flows)}`));
if (statusFilter) {
runtime.log(info(`Status filter: ${statusFilter}`));
}
if (flows.length === 0) {
runtime.log("No TaskFlows found.");
return;
}
const rich = isRich();
for (const line of formatFlowRows(flows, rich)) {
runtime.log(line);
}
}
export async function flowsShowCommand(
opts: { json?: boolean; lookup: string },
runtime: RuntimeEnv,
) {
const flow = resolveFlowForLookupToken(opts.lookup);
if (!flow) {
runtime.error(`TaskFlow not found: ${opts.lookup}`);
runtime.exit(1);
return;
}
const tasks = listTasksForFlowId(flow.flowId);
const taskSummary = getFlowTaskSummary(flow.flowId);
if (opts.json) {
runtime.log(
JSON.stringify(
{
...flow,
tasks,
taskSummary,
},
null,
2,
),
);
return;
}
const lines = [
"TaskFlow:",
`flowId: ${flow.flowId}`,
`syncMode: ${flow.syncMode}`,
`status: ${flow.status}`,
`notify: ${flow.notifyPolicy}`,
`ownerKey: ${flow.ownerKey}`,
`controllerId: ${flow.controllerId ?? "n/a"}`,
`revision: ${flow.revision}`,
`goal: ${flow.goal}`,
`currentStep: ${flow.currentStep ?? "n/a"}`,
`blockedTaskId: ${flow.blockedTaskId ?? "n/a"}`,
`blockedSummary: ${flow.blockedSummary ?? "n/a"}`,
`wait: ${summarizeWait(flow)}`,
`cancelRequestedAt: ${
flow.cancelRequestedAt ? new Date(flow.cancelRequestedAt).toISOString() : "n/a"
}`,
`createdAt: ${new Date(flow.createdAt).toISOString()}`,
`updatedAt: ${new Date(flow.updatedAt).toISOString()}`,
`endedAt: ${flow.endedAt ? new Date(flow.endedAt).toISOString() : "n/a"}`,
`tasks: ${taskSummary.total} total · ${taskSummary.active} active · ${taskSummary.failures} issues`,
];
for (const line of lines) {
runtime.log(line);
}
if (tasks.length === 0) {
runtime.log("Linked tasks: none");
return;
}
runtime.log("Linked tasks:");
for (const task of tasks) {
runtime.log(
`- ${task.taskId} ${task.status} ${task.runId ?? "n/a"} ${task.label ?? task.task}`,
);
}
}
export async function flowsCancelCommand(opts: { lookup: string }, runtime: RuntimeEnv) {
const flow = resolveFlowForLookupToken(opts.lookup);
if (!flow) {
runtime.error(`Flow not found: ${opts.lookup}`);
runtime.exit(1);
return;
}
const result = await cancelFlowById({
cfg: loadConfig(),
flowId: flow.flowId,
});
if (!result.found) {
runtime.error(result.reason ?? `Flow not found: ${opts.lookup}`);
runtime.exit(1);
return;
}
if (!result.cancelled) {
runtime.error(result.reason ?? `Could not cancel TaskFlow: ${opts.lookup}`);
runtime.exit(1);
return;
}
const updated = getFlowById(flow.flowId) ?? result.flow ?? flow;
runtime.log(`Cancelled ${updated.flowId} (${updated.syncMode}) with status ${updated.status}.`);
}

View File

@ -0,0 +1,86 @@
import { afterEach, describe, expect, it } from "vitest";
import {
findLatestFlowForOwner,
getFlowByIdForOwner,
listFlowsForOwner,
resolveFlowForLookupTokenForOwner,
} from "./flow-owner-access.js";
import { createManagedFlow, resetFlowRegistryForTests } from "./flow-registry.js";
afterEach(() => {
resetFlowRegistryForTests({ persist: false });
});
describe("flow owner access", () => {
it("returns owner-scoped flows for direct and owner-key lookups", () => {
const older = createManagedFlow({
ownerKey: "agent:main:main",
controllerId: "tests/owner-access",
goal: "Older flow",
createdAt: 100,
updatedAt: 100,
});
const latest = createManagedFlow({
ownerKey: "agent:main:main",
controllerId: "tests/owner-access",
goal: "Latest flow",
createdAt: 200,
updatedAt: 200,
});
expect(
getFlowByIdForOwner({
flowId: older.flowId,
callerOwnerKey: "agent:main:main",
})?.flowId,
).toBe(older.flowId);
expect(
findLatestFlowForOwner({
callerOwnerKey: "agent:main:main",
})?.flowId,
).toBe(latest.flowId);
expect(
resolveFlowForLookupTokenForOwner({
token: "agent:main:main",
callerOwnerKey: "agent:main:main",
})?.flowId,
).toBe(latest.flowId);
expect(
listFlowsForOwner({
callerOwnerKey: "agent:main:main",
}).map((flow) => flow.flowId),
).toEqual([latest.flowId, older.flowId]);
});
it("denies cross-owner flow reads", () => {
const flow = createManagedFlow({
ownerKey: "agent:main:main",
controllerId: "tests/owner-access",
goal: "Hidden flow",
});
expect(
getFlowByIdForOwner({
flowId: flow.flowId,
callerOwnerKey: "agent:main:other",
}),
).toBeUndefined();
expect(
resolveFlowForLookupTokenForOwner({
token: flow.flowId,
callerOwnerKey: "agent:main:other",
}),
).toBeUndefined();
expect(
resolveFlowForLookupTokenForOwner({
token: "agent:main:main",
callerOwnerKey: "agent:main:other",
}),
).toBeUndefined();
expect(
listFlowsForOwner({
callerOwnerKey: "agent:main:other",
}),
).toEqual([]);
});
});

View File

@ -0,0 +1,48 @@
import { findLatestFlowForOwnerKey, getFlowById, listFlowsForOwnerKey } from "./flow-registry.js";
import type { FlowRecord } from "./flow-registry.types.js";
function normalizeOwnerKey(ownerKey?: string): string | undefined {
const trimmed = ownerKey?.trim();
return trimmed ? trimmed : undefined;
}
function canOwnerAccessFlow(flow: FlowRecord, callerOwnerKey: string): boolean {
return normalizeOwnerKey(flow.ownerKey) === normalizeOwnerKey(callerOwnerKey);
}
export function getFlowByIdForOwner(params: {
flowId: string;
callerOwnerKey: string;
}): FlowRecord | undefined {
const flow = getFlowById(params.flowId);
return flow && canOwnerAccessFlow(flow, params.callerOwnerKey) ? flow : undefined;
}
export function listFlowsForOwner(params: { callerOwnerKey: string }): FlowRecord[] {
const ownerKey = normalizeOwnerKey(params.callerOwnerKey);
return ownerKey ? listFlowsForOwnerKey(ownerKey) : [];
}
export function findLatestFlowForOwner(params: { callerOwnerKey: string }): FlowRecord | undefined {
const ownerKey = normalizeOwnerKey(params.callerOwnerKey);
return ownerKey ? findLatestFlowForOwnerKey(ownerKey) : undefined;
}
export function resolveFlowForLookupTokenForOwner(params: {
token: string;
callerOwnerKey: string;
}): FlowRecord | undefined {
const direct = getFlowByIdForOwner({
flowId: params.token,
callerOwnerKey: params.callerOwnerKey,
});
if (direct) {
return direct;
}
const normalizedToken = normalizeOwnerKey(params.token);
const normalizedCallerOwnerKey = normalizeOwnerKey(params.callerOwnerKey);
if (!normalizedToken || normalizedToken !== normalizedCallerOwnerKey) {
return undefined;
}
return findLatestFlowForOwner({ callerOwnerKey: normalizedCallerOwnerKey });
}

View File

@ -0,0 +1,39 @@
import fs from "node:fs/promises";
import path from "node:path";
import { describe, expect, it } from "vitest";
const TASK_ROOT = path.resolve(import.meta.dirname);
const SRC_ROOT = path.resolve(TASK_ROOT, "..");
const ALLOWED_IMPORTERS = new Set(["tasks/flow-owner-access.ts", "tasks/flow-runtime-internal.ts"]);
async function listSourceFiles(root: string): Promise<string[]> {
const entries = await fs.readdir(root, { withFileTypes: true });
const files: string[] = [];
for (const entry of entries) {
const fullPath = path.join(root, entry.name);
if (entry.isDirectory()) {
files.push(...(await listSourceFiles(fullPath)));
continue;
}
if (!entry.isFile() || !entry.name.endsWith(".ts") || entry.name.endsWith(".test.ts")) {
continue;
}
files.push(fullPath);
}
return files;
}
describe("flow registry import boundary", () => {
it("keeps direct flow-registry imports behind approved flow access seams", async () => {
const importers: string[] = [];
for (const file of await listSourceFiles(SRC_ROOT)) {
const relative = path.relative(SRC_ROOT, file).replaceAll(path.sep, "/");
const source = await fs.readFile(file, "utf8");
if (source.includes("flow-registry.js")) {
importers.push(relative);
}
}
expect(importers.toSorted()).toEqual([...ALLOWED_IMPORTERS].toSorted());
});
});

View File

@ -0,0 +1,10 @@
import path from "node:path";
import { resolveTaskStateDir } from "./task-registry.paths.js";
export function resolveFlowRegistryDir(env: NodeJS.ProcessEnv = process.env): string {
return path.join(resolveTaskStateDir(env), "flows");
}
export function resolveFlowRegistrySqlitePath(env: NodeJS.ProcessEnv = process.env): string {
return path.join(resolveFlowRegistryDir(env), "registry.sqlite");
}

View File

@ -0,0 +1,401 @@
import { chmodSync, existsSync, mkdirSync } from "node:fs";
import type { DatabaseSync, StatementSync } from "node:sqlite";
import { requireNodeSqlite } from "../infra/node-sqlite.js";
import type { DeliveryContext } from "../utils/delivery-context.js";
import { resolveFlowRegistryDir, resolveFlowRegistrySqlitePath } from "./flow-registry.paths.js";
import type { FlowRegistryStoreSnapshot } from "./flow-registry.store.js";
import type { FlowRecord, FlowSyncMode, JsonValue } from "./flow-registry.types.js";
type FlowRegistryRow = {
flow_id: string;
sync_mode: FlowSyncMode | null;
shape?: string | null;
owner_key: string;
requester_origin_json: string | null;
controller_id: string | null;
revision: number | bigint | null;
status: FlowRecord["status"];
notify_policy: FlowRecord["notifyPolicy"];
goal: string;
current_step: string | null;
blocked_task_id: string | null;
blocked_summary: string | null;
state_json: string | null;
wait_json: string | null;
cancel_requested_at: number | bigint | null;
created_at: number | bigint;
updated_at: number | bigint;
ended_at: number | bigint | null;
};
type FlowRegistryStatements = {
selectAll: StatementSync;
upsertRow: StatementSync;
deleteRow: StatementSync;
clearRows: StatementSync;
};
type FlowRegistryDatabase = {
db: DatabaseSync;
path: string;
statements: FlowRegistryStatements;
};
let cachedDatabase: FlowRegistryDatabase | null = null;
const FLOW_REGISTRY_DIR_MODE = 0o700;
const FLOW_REGISTRY_FILE_MODE = 0o600;
const FLOW_REGISTRY_SIDECAR_SUFFIXES = ["", "-shm", "-wal"] as const;
function normalizeNumber(value: number | bigint | null): number | undefined {
if (typeof value === "bigint") {
return Number(value);
}
return typeof value === "number" ? value : undefined;
}
function serializeJson(value: unknown): string | null {
return value === undefined ? null : JSON.stringify(value);
}
function parseJsonValue<T>(raw: string | null): T | undefined {
if (!raw?.trim()) {
return undefined;
}
try {
return JSON.parse(raw) as T;
} catch {
return undefined;
}
}
function rowToSyncMode(row: FlowRegistryRow): FlowSyncMode {
if (row.sync_mode === "task_mirrored" || row.sync_mode === "managed") {
return row.sync_mode;
}
return row.shape === "single_task" ? "task_mirrored" : "managed";
}
function rowToFlowRecord(row: FlowRegistryRow): FlowRecord {
const endedAt = normalizeNumber(row.ended_at);
const cancelRequestedAt = normalizeNumber(row.cancel_requested_at);
const requesterOrigin = parseJsonValue<DeliveryContext>(row.requester_origin_json);
const stateJson = parseJsonValue<JsonValue>(row.state_json);
const waitJson = parseJsonValue<JsonValue>(row.wait_json);
return {
flowId: row.flow_id,
syncMode: rowToSyncMode(row),
ownerKey: row.owner_key,
...(requesterOrigin ? { requesterOrigin } : {}),
...(row.controller_id ? { controllerId: row.controller_id } : {}),
revision: normalizeNumber(row.revision) ?? 0,
status: row.status,
notifyPolicy: row.notify_policy,
goal: row.goal,
...(row.current_step ? { currentStep: row.current_step } : {}),
...(row.blocked_task_id ? { blockedTaskId: row.blocked_task_id } : {}),
...(row.blocked_summary ? { blockedSummary: row.blocked_summary } : {}),
...(stateJson !== undefined ? { stateJson } : {}),
...(waitJson !== undefined ? { waitJson } : {}),
...(cancelRequestedAt != null ? { cancelRequestedAt } : {}),
createdAt: normalizeNumber(row.created_at) ?? 0,
updatedAt: normalizeNumber(row.updated_at) ?? 0,
...(endedAt != null ? { endedAt } : {}),
};
}
function bindFlowRecord(record: FlowRecord) {
return {
flow_id: record.flowId,
sync_mode: record.syncMode,
owner_key: record.ownerKey,
requester_origin_json: serializeJson(record.requesterOrigin),
controller_id: record.controllerId ?? null,
revision: record.revision,
status: record.status,
notify_policy: record.notifyPolicy,
goal: record.goal,
current_step: record.currentStep ?? null,
blocked_task_id: record.blockedTaskId ?? null,
blocked_summary: record.blockedSummary ?? null,
state_json: serializeJson(record.stateJson),
wait_json: serializeJson(record.waitJson),
cancel_requested_at: record.cancelRequestedAt ?? null,
created_at: record.createdAt,
updated_at: record.updatedAt,
ended_at: record.endedAt ?? null,
};
}
function createStatements(db: DatabaseSync): FlowRegistryStatements {
return {
selectAll: db.prepare(`
SELECT
flow_id,
sync_mode,
shape,
owner_key,
requester_origin_json,
controller_id,
revision,
status,
notify_policy,
goal,
current_step,
blocked_task_id,
blocked_summary,
state_json,
wait_json,
cancel_requested_at,
created_at,
updated_at,
ended_at
FROM flow_runs
ORDER BY created_at ASC, flow_id ASC
`),
upsertRow: db.prepare(`
INSERT INTO flow_runs (
flow_id,
sync_mode,
owner_key,
requester_origin_json,
controller_id,
revision,
status,
notify_policy,
goal,
current_step,
blocked_task_id,
blocked_summary,
state_json,
wait_json,
cancel_requested_at,
created_at,
updated_at,
ended_at
) VALUES (
@flow_id,
@sync_mode,
@owner_key,
@requester_origin_json,
@controller_id,
@revision,
@status,
@notify_policy,
@goal,
@current_step,
@blocked_task_id,
@blocked_summary,
@state_json,
@wait_json,
@cancel_requested_at,
@created_at,
@updated_at,
@ended_at
)
ON CONFLICT(flow_id) DO UPDATE SET
sync_mode = excluded.sync_mode,
owner_key = excluded.owner_key,
requester_origin_json = excluded.requester_origin_json,
controller_id = excluded.controller_id,
revision = excluded.revision,
status = excluded.status,
notify_policy = excluded.notify_policy,
goal = excluded.goal,
current_step = excluded.current_step,
blocked_task_id = excluded.blocked_task_id,
blocked_summary = excluded.blocked_summary,
state_json = excluded.state_json,
wait_json = excluded.wait_json,
cancel_requested_at = excluded.cancel_requested_at,
created_at = excluded.created_at,
updated_at = excluded.updated_at,
ended_at = excluded.ended_at
`),
deleteRow: db.prepare(`DELETE FROM flow_runs WHERE flow_id = ?`),
clearRows: db.prepare(`DELETE FROM flow_runs`),
};
}
function hasFlowRunsColumn(db: DatabaseSync, columnName: string): boolean {
const rows = db.prepare(`PRAGMA table_info(flow_runs)`).all() as Array<{ name?: string }>;
return rows.some((row) => row.name === columnName);
}
function ensureSchema(db: DatabaseSync) {
db.exec(`
CREATE TABLE IF NOT EXISTS flow_runs (
flow_id TEXT PRIMARY KEY,
shape TEXT,
sync_mode TEXT NOT NULL DEFAULT 'managed',
owner_key TEXT NOT NULL,
requester_origin_json TEXT,
controller_id TEXT,
revision INTEGER NOT NULL DEFAULT 0,
status TEXT NOT NULL,
notify_policy TEXT NOT NULL,
goal TEXT NOT NULL,
current_step TEXT,
blocked_task_id TEXT,
blocked_summary TEXT,
state_json TEXT,
wait_json TEXT,
cancel_requested_at INTEGER,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL,
ended_at INTEGER
);
`);
if (!hasFlowRunsColumn(db, "owner_key") && hasFlowRunsColumn(db, "owner_session_key")) {
db.exec(`ALTER TABLE flow_runs ADD COLUMN owner_key TEXT;`);
db.exec(`
UPDATE flow_runs
SET owner_key = owner_session_key
WHERE owner_key IS NULL
`);
}
if (!hasFlowRunsColumn(db, "shape")) {
db.exec(`ALTER TABLE flow_runs ADD COLUMN shape TEXT;`);
}
if (!hasFlowRunsColumn(db, "sync_mode")) {
db.exec(`ALTER TABLE flow_runs ADD COLUMN sync_mode TEXT;`);
if (hasFlowRunsColumn(db, "shape")) {
db.exec(`
UPDATE flow_runs
SET sync_mode = CASE
WHEN shape = 'single_task' THEN 'task_mirrored'
ELSE 'managed'
END
WHERE sync_mode IS NULL
`);
} else {
db.exec(`
UPDATE flow_runs
SET sync_mode = 'managed'
WHERE sync_mode IS NULL
`);
}
}
if (!hasFlowRunsColumn(db, "controller_id")) {
db.exec(`ALTER TABLE flow_runs ADD COLUMN controller_id TEXT;`);
}
db.exec(`
UPDATE flow_runs
SET controller_id = 'core/legacy-restored'
WHERE sync_mode = 'managed'
AND (controller_id IS NULL OR trim(controller_id) = '')
`);
if (!hasFlowRunsColumn(db, "revision")) {
db.exec(`ALTER TABLE flow_runs ADD COLUMN revision INTEGER;`);
db.exec(`
UPDATE flow_runs
SET revision = 0
WHERE revision IS NULL
`);
}
if (!hasFlowRunsColumn(db, "blocked_task_id")) {
db.exec(`ALTER TABLE flow_runs ADD COLUMN blocked_task_id TEXT;`);
}
if (!hasFlowRunsColumn(db, "blocked_summary")) {
db.exec(`ALTER TABLE flow_runs ADD COLUMN blocked_summary TEXT;`);
}
if (!hasFlowRunsColumn(db, "state_json")) {
db.exec(`ALTER TABLE flow_runs ADD COLUMN state_json TEXT;`);
}
if (!hasFlowRunsColumn(db, "wait_json")) {
db.exec(`ALTER TABLE flow_runs ADD COLUMN wait_json TEXT;`);
}
if (!hasFlowRunsColumn(db, "cancel_requested_at")) {
db.exec(`ALTER TABLE flow_runs ADD COLUMN cancel_requested_at INTEGER;`);
}
db.exec(`CREATE INDEX IF NOT EXISTS idx_flow_runs_status ON flow_runs(status);`);
db.exec(`CREATE INDEX IF NOT EXISTS idx_flow_runs_owner_key ON flow_runs(owner_key);`);
db.exec(`CREATE INDEX IF NOT EXISTS idx_flow_runs_updated_at ON flow_runs(updated_at);`);
}
function ensureFlowRegistryPermissions(pathname: string) {
const dir = resolveFlowRegistryDir(process.env);
mkdirSync(dir, { recursive: true, mode: FLOW_REGISTRY_DIR_MODE });
chmodSync(dir, FLOW_REGISTRY_DIR_MODE);
for (const suffix of FLOW_REGISTRY_SIDECAR_SUFFIXES) {
const candidate = `${pathname}${suffix}`;
if (!existsSync(candidate)) {
continue;
}
chmodSync(candidate, FLOW_REGISTRY_FILE_MODE);
}
}
function openFlowRegistryDatabase(): FlowRegistryDatabase {
const pathname = resolveFlowRegistrySqlitePath(process.env);
if (cachedDatabase && cachedDatabase.path === pathname) {
return cachedDatabase;
}
if (cachedDatabase) {
cachedDatabase.db.close();
cachedDatabase = null;
}
ensureFlowRegistryPermissions(pathname);
const { DatabaseSync } = requireNodeSqlite();
const db = new DatabaseSync(pathname);
db.exec(`PRAGMA journal_mode = WAL;`);
db.exec(`PRAGMA synchronous = NORMAL;`);
db.exec(`PRAGMA busy_timeout = 5000;`);
ensureSchema(db);
ensureFlowRegistryPermissions(pathname);
cachedDatabase = {
db,
path: pathname,
statements: createStatements(db),
};
return cachedDatabase;
}
function withWriteTransaction(write: (statements: FlowRegistryStatements) => void) {
const { db, path, statements } = openFlowRegistryDatabase();
db.exec("BEGIN IMMEDIATE");
try {
write(statements);
db.exec("COMMIT");
ensureFlowRegistryPermissions(path);
} catch (error) {
db.exec("ROLLBACK");
throw error;
}
}
export function loadFlowRegistryStateFromSqlite(): FlowRegistryStoreSnapshot {
const { statements } = openFlowRegistryDatabase();
const rows = statements.selectAll.all() as FlowRegistryRow[];
return {
flows: new Map(rows.map((row) => [row.flow_id, rowToFlowRecord(row)])),
};
}
export function saveFlowRegistryStateToSqlite(snapshot: FlowRegistryStoreSnapshot) {
withWriteTransaction((statements) => {
statements.clearRows.run();
for (const flow of snapshot.flows.values()) {
statements.upsertRow.run(bindFlowRecord(flow));
}
});
}
export function upsertFlowRegistryRecordToSqlite(flow: FlowRecord) {
const store = openFlowRegistryDatabase();
store.statements.upsertRow.run(bindFlowRecord(flow));
ensureFlowRegistryPermissions(store.path);
}
export function deleteFlowRegistryRecordFromSqlite(flowId: string) {
const store = openFlowRegistryDatabase();
store.statements.deleteRow.run(flowId);
ensureFlowRegistryPermissions(store.path);
}
export function closeFlowRegistrySqliteStore() {
if (!cachedDatabase) {
return;
}
cachedDatabase.db.close();
cachedDatabase = null;
}

View File

@ -0,0 +1,195 @@
import { statSync } from "node:fs";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { withTempDir } from "../test-helpers/temp-dir.js";
import {
createManagedFlow,
getFlowById,
requestFlowCancel,
resetFlowRegistryForTests,
setFlowWaiting,
} from "./flow-registry.js";
import { resolveFlowRegistryDir, resolveFlowRegistrySqlitePath } from "./flow-registry.paths.js";
import { configureFlowRegistryRuntime } from "./flow-registry.store.js";
import type { FlowRecord } from "./flow-registry.types.js";
function createStoredFlow(): FlowRecord {
return {
flowId: "flow-restored",
syncMode: "managed",
ownerKey: "agent:main:main",
controllerId: "tests/restored-controller",
revision: 4,
status: "blocked",
notifyPolicy: "done_only",
goal: "Restored flow",
currentStep: "spawn_task",
blockedTaskId: "task-restored",
blockedSummary: "Writable session required.",
stateJson: { lane: "triage", done: 3 },
waitJson: { kind: "task", taskId: "task-restored" },
cancelRequestedAt: 115,
createdAt: 100,
updatedAt: 120,
endedAt: 120,
};
}
async function withFlowRegistryTempDir<T>(run: (root: string) => Promise<T>): Promise<T> {
return await withTempDir({ prefix: "openclaw-flow-store-" }, async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetFlowRegistryForTests();
try {
return await run(root);
} finally {
resetFlowRegistryForTests();
}
});
}
describe("flow-registry store runtime", () => {
beforeEach(() => {
vi.useRealTimers();
});
afterEach(() => {
vi.useRealTimers();
delete process.env.OPENCLAW_STATE_DIR;
resetFlowRegistryForTests();
});
it("uses the configured flow store for restore and save", () => {
const storedFlow = createStoredFlow();
const loadSnapshot = vi.fn(() => ({
flows: new Map([[storedFlow.flowId, storedFlow]]),
}));
const saveSnapshot = vi.fn();
configureFlowRegistryRuntime({
store: {
loadSnapshot,
saveSnapshot,
},
});
expect(getFlowById("flow-restored")).toMatchObject({
flowId: "flow-restored",
syncMode: "managed",
controllerId: "tests/restored-controller",
revision: 4,
stateJson: { lane: "triage", done: 3 },
waitJson: { kind: "task", taskId: "task-restored" },
cancelRequestedAt: 115,
});
expect(loadSnapshot).toHaveBeenCalledTimes(1);
createManagedFlow({
ownerKey: "agent:main:main",
controllerId: "tests/new-flow",
goal: "New flow",
status: "running",
currentStep: "wait_for",
});
expect(saveSnapshot).toHaveBeenCalled();
const latestSnapshot = saveSnapshot.mock.calls.at(-1)?.[0] as {
flows: ReadonlyMap<string, FlowRecord>;
};
expect(latestSnapshot.flows.size).toBe(2);
expect(latestSnapshot.flows.get("flow-restored")?.goal).toBe("Restored flow");
});
it("restores persisted wait-state, revision, and cancel intent from sqlite", async () => {
await withFlowRegistryTempDir(async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetFlowRegistryForTests();
const created = createManagedFlow({
ownerKey: "agent:main:main",
controllerId: "tests/persisted-flow",
goal: "Persisted flow",
status: "running",
currentStep: "spawn_task",
stateJson: { phase: "spawn" },
});
const waiting = setFlowWaiting({
flowId: created.flowId,
expectedRevision: created.revision,
currentStep: "ask_user",
stateJson: { phase: "ask_user" },
waitJson: { kind: "external_event", topic: "telegram" },
});
expect(waiting).toMatchObject({
applied: true,
});
const cancelRequested = requestFlowCancel({
flowId: created.flowId,
expectedRevision: waiting.applied ? waiting.flow.revision : -1,
cancelRequestedAt: 444,
});
expect(cancelRequested).toMatchObject({
applied: true,
});
resetFlowRegistryForTests({ persist: false });
expect(getFlowById(created.flowId)).toMatchObject({
flowId: created.flowId,
syncMode: "managed",
controllerId: "tests/persisted-flow",
revision: 2,
status: "waiting",
currentStep: "ask_user",
stateJson: { phase: "ask_user" },
waitJson: { kind: "external_event", topic: "telegram" },
cancelRequestedAt: 444,
});
});
});
it("round-trips explicit json null through sqlite", async () => {
await withFlowRegistryTempDir(async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetFlowRegistryForTests();
const created = createManagedFlow({
ownerKey: "agent:main:main",
controllerId: "tests/null-roundtrip",
goal: "Persist null payloads",
stateJson: null,
waitJson: null,
});
resetFlowRegistryForTests({ persist: false });
expect(getFlowById(created.flowId)).toMatchObject({
flowId: created.flowId,
stateJson: null,
waitJson: null,
});
});
});
it("hardens the sqlite flow store directory and file modes", async () => {
if (process.platform === "win32") {
return;
}
await withFlowRegistryTempDir(async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetFlowRegistryForTests();
createManagedFlow({
ownerKey: "agent:main:main",
controllerId: "tests/secured-flow",
goal: "Secured flow",
status: "blocked",
blockedTaskId: "task-secured",
blockedSummary: "Need auth.",
waitJson: { kind: "task", taskId: "task-secured" },
});
const registryDir = resolveFlowRegistryDir(process.env);
const sqlitePath = resolveFlowRegistrySqlitePath(process.env);
expect(statSync(registryDir).mode & 0o777).toBe(0o700);
expect(statSync(sqlitePath).mode & 0o777).toBe(0o600);
});
});
});

View File

@ -0,0 +1,78 @@
import {
closeFlowRegistrySqliteStore,
deleteFlowRegistryRecordFromSqlite,
loadFlowRegistryStateFromSqlite,
saveFlowRegistryStateToSqlite,
upsertFlowRegistryRecordToSqlite,
} from "./flow-registry.store.sqlite.js";
import type { FlowRecord } from "./flow-registry.types.js";
export type FlowRegistryStoreSnapshot = {
flows: Map<string, FlowRecord>;
};
export type FlowRegistryStore = {
loadSnapshot: () => FlowRegistryStoreSnapshot;
saveSnapshot: (snapshot: FlowRegistryStoreSnapshot) => void;
upsertFlow?: (flow: FlowRecord) => void;
deleteFlow?: (flowId: string) => void;
close?: () => void;
};
export type FlowRegistryHookEvent =
| {
kind: "restored";
flows: FlowRecord[];
}
| {
kind: "upserted";
flow: FlowRecord;
previous?: FlowRecord;
}
| {
kind: "deleted";
flowId: string;
previous: FlowRecord;
};
export type FlowRegistryHooks = {
// Hooks are incremental/observational. Snapshot persistence belongs to FlowRegistryStore.
onEvent?: (event: FlowRegistryHookEvent) => void;
};
const defaultFlowRegistryStore: FlowRegistryStore = {
loadSnapshot: loadFlowRegistryStateFromSqlite,
saveSnapshot: saveFlowRegistryStateToSqlite,
upsertFlow: upsertFlowRegistryRecordToSqlite,
deleteFlow: deleteFlowRegistryRecordFromSqlite,
close: closeFlowRegistrySqliteStore,
};
let configuredFlowRegistryStore: FlowRegistryStore = defaultFlowRegistryStore;
let configuredFlowRegistryHooks: FlowRegistryHooks | null = null;
export function getFlowRegistryStore(): FlowRegistryStore {
return configuredFlowRegistryStore;
}
export function getFlowRegistryHooks(): FlowRegistryHooks | null {
return configuredFlowRegistryHooks;
}
export function configureFlowRegistryRuntime(params: {
store?: FlowRegistryStore;
hooks?: FlowRegistryHooks | null;
}) {
if (params.store) {
configuredFlowRegistryStore = params.store;
}
if ("hooks" in params) {
configuredFlowRegistryHooks = params.hooks ?? null;
}
}
export function resetFlowRegistryRuntimeForTests() {
configuredFlowRegistryStore.close?.();
configuredFlowRegistryStore = defaultFlowRegistryStore;
configuredFlowRegistryHooks = null;
}

View File

@ -0,0 +1,371 @@
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { withTempDir } from "../test-helpers/temp-dir.js";
import {
createFlowRecord,
createFlowForTask,
createManagedFlow,
deleteFlowRecordById,
failFlow,
getFlowById,
listFlowRecords,
requestFlowCancel,
resetFlowRegistryForTests,
resumeFlow,
setFlowWaiting,
syncFlowFromTask,
updateFlowRecordByIdExpectedRevision,
} from "./flow-registry.js";
import { configureFlowRegistryRuntime } from "./flow-registry.store.js";
const ORIGINAL_STATE_DIR = process.env.OPENCLAW_STATE_DIR;
async function withFlowRegistryTempDir<T>(run: (root: string) => Promise<T>): Promise<T> {
return await withTempDir({ prefix: "openclaw-flow-registry-" }, async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetFlowRegistryForTests();
try {
return await run(root);
} finally {
resetFlowRegistryForTests();
}
});
}
describe("flow-registry", () => {
beforeEach(() => {
vi.useRealTimers();
});
afterEach(() => {
vi.useRealTimers();
if (ORIGINAL_STATE_DIR === undefined) {
delete process.env.OPENCLAW_STATE_DIR;
} else {
process.env.OPENCLAW_STATE_DIR = ORIGINAL_STATE_DIR;
}
resetFlowRegistryForTests();
});
it("creates managed flows and updates them through revision-checked helpers", async () => {
await withFlowRegistryTempDir(async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetFlowRegistryForTests();
const created = createManagedFlow({
ownerKey: "agent:main:main",
controllerId: "tests/managed-controller",
goal: "Investigate flaky test",
currentStep: "spawn_task",
stateJson: { phase: "spawn" },
});
expect(created).toMatchObject({
flowId: created.flowId,
syncMode: "managed",
controllerId: "tests/managed-controller",
revision: 0,
status: "queued",
currentStep: "spawn_task",
stateJson: { phase: "spawn" },
});
const waiting = setFlowWaiting({
flowId: created.flowId,
expectedRevision: created.revision,
currentStep: "await_review",
stateJson: { phase: "await_review" },
waitJson: { kind: "task", taskId: "task-123" },
});
expect(waiting).toMatchObject({
applied: true,
flow: expect.objectContaining({
flowId: created.flowId,
revision: 1,
status: "waiting",
currentStep: "await_review",
waitJson: { kind: "task", taskId: "task-123" },
}),
});
const conflict = updateFlowRecordByIdExpectedRevision({
flowId: created.flowId,
expectedRevision: 0,
patch: {
currentStep: "stale",
},
});
expect(conflict).toMatchObject({
applied: false,
reason: "revision_conflict",
current: expect.objectContaining({
flowId: created.flowId,
revision: 1,
}),
});
const resumed = resumeFlow({
flowId: created.flowId,
expectedRevision: 1,
status: "running",
currentStep: "resume_work",
});
expect(resumed).toMatchObject({
applied: true,
flow: expect.objectContaining({
flowId: created.flowId,
revision: 2,
status: "running",
currentStep: "resume_work",
waitJson: null,
}),
});
const cancelRequested = requestFlowCancel({
flowId: created.flowId,
expectedRevision: 2,
cancelRequestedAt: 400,
});
expect(cancelRequested).toMatchObject({
applied: true,
flow: expect.objectContaining({
flowId: created.flowId,
revision: 3,
cancelRequestedAt: 400,
}),
});
const failed = failFlow({
flowId: created.flowId,
expectedRevision: 3,
blockedSummary: "Task runner failed.",
endedAt: 500,
});
expect(failed).toMatchObject({
applied: true,
flow: expect.objectContaining({
flowId: created.flowId,
revision: 4,
status: "failed",
blockedSummary: "Task runner failed.",
endedAt: 500,
}),
});
expect(listFlowRecords()).toEqual([
expect.objectContaining({
flowId: created.flowId,
revision: 4,
cancelRequestedAt: 400,
}),
]);
expect(deleteFlowRecordById(created.flowId)).toBe(true);
expect(getFlowById(created.flowId)).toBeUndefined();
});
});
it("requires a controller for managed flows and rejects clearing it later", async () => {
await withFlowRegistryTempDir(async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetFlowRegistryForTests();
expect(() =>
createFlowRecord({
ownerKey: "agent:main:main",
goal: "Missing controller",
}),
).toThrow("Managed flow controllerId is required.");
const created = createManagedFlow({
ownerKey: "agent:main:main",
controllerId: "tests/managed-controller",
goal: "Protected controller",
});
expect(() =>
updateFlowRecordByIdExpectedRevision({
flowId: created.flowId,
expectedRevision: created.revision,
patch: {
controllerId: null,
},
}),
).toThrow("Managed flow controllerId is required.");
});
});
it("emits restored, upserted, and deleted flow hook events", () => {
const onEvent = vi.fn();
configureFlowRegistryRuntime({
store: {
loadSnapshot: () => ({
flows: new Map(),
}),
saveSnapshot: () => {},
},
hooks: {
onEvent,
},
});
const created = createManagedFlow({
ownerKey: "agent:main:main",
controllerId: "tests/hooks",
goal: "Observe hooks",
});
deleteFlowRecordById(created.flowId);
expect(onEvent).toHaveBeenCalledWith({
kind: "restored",
flows: [],
});
expect(onEvent).toHaveBeenCalledWith(
expect.objectContaining({
kind: "upserted",
flow: expect.objectContaining({
flowId: created.flowId,
}),
}),
);
expect(onEvent).toHaveBeenCalledWith(
expect.objectContaining({
kind: "deleted",
flowId: created.flowId,
}),
);
});
it("normalizes restored managed flows without a controller id", () => {
configureFlowRegistryRuntime({
store: {
loadSnapshot: () => ({
flows: new Map([
[
"legacy-managed",
{
flowId: "legacy-managed",
syncMode: "managed",
ownerKey: "agent:main:main",
revision: 0,
status: "queued",
notifyPolicy: "done_only",
goal: "Legacy managed flow",
createdAt: 10,
updatedAt: 10,
},
],
]),
}),
saveSnapshot: () => {},
},
});
expect(getFlowById("legacy-managed")).toMatchObject({
flowId: "legacy-managed",
syncMode: "managed",
controllerId: "core/legacy-restored",
});
});
it("mirrors one-task flow state from tasks and leaves managed flows alone", async () => {
await withFlowRegistryTempDir(async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetFlowRegistryForTests();
const mirrored = createFlowForTask({
task: {
ownerKey: "agent:main:main",
taskId: "task-running",
notifyPolicy: "done_only",
status: "running",
label: "Fix permissions",
task: "Fix permissions",
createdAt: 100,
lastEventAt: 100,
},
});
const blocked = syncFlowFromTask({
taskId: "task-blocked",
parentFlowId: mirrored.flowId,
status: "succeeded",
terminalOutcome: "blocked",
notifyPolicy: "done_only",
label: "Fix permissions",
task: "Fix permissions",
lastEventAt: 200,
endedAt: 200,
terminalSummary: "Writable session required.",
});
expect(blocked).toMatchObject({
flowId: mirrored.flowId,
syncMode: "task_mirrored",
status: "blocked",
blockedTaskId: "task-blocked",
blockedSummary: "Writable session required.",
});
const managed = createManagedFlow({
ownerKey: "agent:main:main",
controllerId: "tests/managed",
goal: "Cluster PRs",
currentStep: "wait_for",
status: "waiting",
waitJson: { kind: "external_event" },
});
const syncedManaged = syncFlowFromTask({
taskId: "task-child",
parentFlowId: managed.flowId,
status: "running",
notifyPolicy: "done_only",
label: "Child task",
task: "Child task",
lastEventAt: 250,
progressSummary: "Running child task",
});
expect(syncedManaged).toMatchObject({
flowId: managed.flowId,
syncMode: "managed",
status: "waiting",
currentStep: "wait_for",
waitJson: { kind: "external_event" },
});
});
});
it("preserves explicit json null in state and wait payloads", async () => {
await withFlowRegistryTempDir(async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetFlowRegistryForTests();
const created = createManagedFlow({
ownerKey: "agent:main:main",
controllerId: "tests/null-state",
goal: "Null payloads",
stateJson: null,
waitJson: null,
});
expect(created).toMatchObject({
flowId: created.flowId,
stateJson: null,
waitJson: null,
});
const resumed = resumeFlow({
flowId: created.flowId,
expectedRevision: created.revision,
stateJson: null,
});
expect(resumed).toMatchObject({
applied: true,
flow: expect.objectContaining({
flowId: created.flowId,
stateJson: null,
}),
});
});
});
});

690
src/tasks/flow-registry.ts Normal file
View File

@ -0,0 +1,690 @@
import crypto from "node:crypto";
import {
getFlowRegistryHooks,
getFlowRegistryStore,
resetFlowRegistryRuntimeForTests,
type FlowRegistryHookEvent,
} from "./flow-registry.store.js";
import type { FlowRecord, FlowStatus, FlowSyncMode, JsonValue } from "./flow-registry.types.js";
import type { TaskNotifyPolicy, TaskRecord } from "./task-registry.types.js";
const flows = new Map<string, FlowRecord>();
let restoreAttempted = false;
type FlowRecordPatch = Omit<
Partial<
Pick<
FlowRecord,
| "status"
| "notifyPolicy"
| "goal"
| "currentStep"
| "blockedTaskId"
| "blockedSummary"
| "controllerId"
| "stateJson"
| "waitJson"
| "cancelRequestedAt"
| "updatedAt"
| "endedAt"
>
>,
| "currentStep"
| "blockedTaskId"
| "blockedSummary"
| "controllerId"
| "stateJson"
| "waitJson"
| "cancelRequestedAt"
| "endedAt"
> & {
currentStep?: string | null;
blockedTaskId?: string | null;
blockedSummary?: string | null;
controllerId?: string | null;
stateJson?: JsonValue | null;
waitJson?: JsonValue | null;
cancelRequestedAt?: number | null;
endedAt?: number | null;
};
export type FlowUpdateResult =
| {
applied: true;
flow: FlowRecord;
}
| {
applied: false;
reason: "not_found" | "revision_conflict";
current?: FlowRecord;
};
function cloneStructuredValue<T>(value: T | undefined): T | undefined {
if (value === undefined) {
return undefined;
}
return structuredClone(value);
}
function cloneFlowRecord(record: FlowRecord): FlowRecord {
return {
...record,
...(record.requesterOrigin
? { requesterOrigin: cloneStructuredValue(record.requesterOrigin)! }
: {}),
...(record.stateJson !== undefined
? { stateJson: cloneStructuredValue(record.stateJson)! }
: {}),
...(record.waitJson !== undefined ? { waitJson: cloneStructuredValue(record.waitJson)! } : {}),
};
}
function normalizeRestoredFlowRecord(record: FlowRecord): FlowRecord {
const syncMode = record.syncMode === "task_mirrored" ? "task_mirrored" : "managed";
const controllerId =
syncMode === "managed"
? (normalizeText(record.controllerId) ?? "core/legacy-restored")
: undefined;
return {
...record,
syncMode,
ownerKey: assertFlowOwnerKey(record.ownerKey),
...(record.requesterOrigin
? { requesterOrigin: cloneStructuredValue(record.requesterOrigin)! }
: {}),
...(controllerId ? { controllerId } : {}),
currentStep: normalizeText(record.currentStep),
blockedTaskId: normalizeText(record.blockedTaskId),
blockedSummary: normalizeText(record.blockedSummary),
...(record.stateJson !== undefined
? { stateJson: cloneStructuredValue(record.stateJson)! }
: {}),
...(record.waitJson !== undefined ? { waitJson: cloneStructuredValue(record.waitJson)! } : {}),
revision: Math.max(0, record.revision),
cancelRequestedAt: record.cancelRequestedAt ?? undefined,
endedAt: record.endedAt ?? undefined,
};
}
function snapshotFlowRecords(source: ReadonlyMap<string, FlowRecord>): FlowRecord[] {
return [...source.values()].map((record) => cloneFlowRecord(record));
}
function emitFlowRegistryHookEvent(createEvent: () => FlowRegistryHookEvent): void {
const hooks = getFlowRegistryHooks();
if (!hooks?.onEvent) {
return;
}
try {
hooks.onEvent(createEvent());
} catch {
// Flow hooks are observational. They must not break registry writes.
}
}
function ensureNotifyPolicy(notifyPolicy?: TaskNotifyPolicy): TaskNotifyPolicy {
return notifyPolicy ?? "done_only";
}
function normalizeOwnerKey(ownerKey?: string): string | undefined {
const trimmed = ownerKey?.trim();
return trimmed ? trimmed : undefined;
}
function normalizeText(value?: string | null): string | undefined {
const trimmed = value?.trim();
return trimmed ? trimmed : undefined;
}
function normalizeJsonBlob(value: JsonValue | null | undefined): JsonValue | undefined {
return value === undefined ? undefined : cloneStructuredValue(value);
}
function assertFlowOwnerKey(ownerKey: string): string {
const normalized = normalizeOwnerKey(ownerKey);
if (!normalized) {
throw new Error("Flow ownerKey is required.");
}
return normalized;
}
function assertControllerId(controllerId?: string | null): string {
const normalized = normalizeText(controllerId);
if (!normalized) {
throw new Error("Managed flow controllerId is required.");
}
return normalized;
}
function resolveFlowGoal(task: Pick<TaskRecord, "label" | "task">): string {
return task.label?.trim() || task.task.trim() || "Background task";
}
function resolveFlowBlockedSummary(
task: Pick<TaskRecord, "status" | "terminalOutcome" | "terminalSummary" | "progressSummary">,
): string | undefined {
if (task.status !== "succeeded" || task.terminalOutcome !== "blocked") {
return undefined;
}
return task.terminalSummary?.trim() || task.progressSummary?.trim() || undefined;
}
export function deriveFlowStatusFromTask(
task: Pick<TaskRecord, "status" | "terminalOutcome">,
): FlowStatus {
if (task.status === "queued") {
return "queued";
}
if (task.status === "running") {
return "running";
}
if (task.status === "succeeded") {
return task.terminalOutcome === "blocked" ? "blocked" : "succeeded";
}
if (task.status === "cancelled") {
return "cancelled";
}
if (task.status === "lost") {
return "lost";
}
return "failed";
}
function ensureFlowRegistryReady() {
if (restoreAttempted) {
return;
}
restoreAttempted = true;
const restored = getFlowRegistryStore().loadSnapshot();
flows.clear();
for (const [flowId, flow] of restored.flows) {
flows.set(flowId, normalizeRestoredFlowRecord(flow));
}
emitFlowRegistryHookEvent(() => ({
kind: "restored",
flows: snapshotFlowRecords(flows),
}));
}
function persistFlowRegistry() {
getFlowRegistryStore().saveSnapshot({
flows: new Map(snapshotFlowRecords(flows).map((flow) => [flow.flowId, flow])),
});
}
function persistFlowUpsert(flow: FlowRecord) {
const store = getFlowRegistryStore();
if (store.upsertFlow) {
store.upsertFlow(cloneFlowRecord(flow));
return;
}
persistFlowRegistry();
}
function persistFlowDelete(flowId: string) {
const store = getFlowRegistryStore();
if (store.deleteFlow) {
store.deleteFlow(flowId);
return;
}
persistFlowRegistry();
}
function buildFlowRecord(params: {
syncMode?: FlowSyncMode;
ownerKey: string;
requesterOrigin?: FlowRecord["requesterOrigin"];
controllerId?: string | null;
revision?: number;
status?: FlowStatus;
notifyPolicy?: TaskNotifyPolicy;
goal: string;
currentStep?: string | null;
blockedTaskId?: string | null;
blockedSummary?: string | null;
stateJson?: JsonValue | null;
waitJson?: JsonValue | null;
cancelRequestedAt?: number | null;
createdAt?: number;
updatedAt?: number;
endedAt?: number | null;
}): FlowRecord {
const now = params.createdAt ?? Date.now();
const syncMode = params.syncMode ?? "managed";
const controllerId = syncMode === "managed" ? assertControllerId(params.controllerId) : undefined;
return {
flowId: crypto.randomUUID(),
syncMode,
ownerKey: assertFlowOwnerKey(params.ownerKey),
...(params.requesterOrigin
? { requesterOrigin: cloneStructuredValue(params.requesterOrigin)! }
: {}),
...(controllerId ? { controllerId } : {}),
revision: Math.max(0, params.revision ?? 0),
status: params.status ?? "queued",
notifyPolicy: ensureNotifyPolicy(params.notifyPolicy),
goal: params.goal,
currentStep: normalizeText(params.currentStep),
blockedTaskId: normalizeText(params.blockedTaskId),
blockedSummary: normalizeText(params.blockedSummary),
...(normalizeJsonBlob(params.stateJson) !== undefined
? { stateJson: normalizeJsonBlob(params.stateJson)! }
: {}),
...(normalizeJsonBlob(params.waitJson) !== undefined
? { waitJson: normalizeJsonBlob(params.waitJson)! }
: {}),
...(params.cancelRequestedAt != null ? { cancelRequestedAt: params.cancelRequestedAt } : {}),
createdAt: now,
updatedAt: params.updatedAt ?? now,
...(params.endedAt != null ? { endedAt: params.endedAt } : {}),
};
}
function applyFlowPatch(current: FlowRecord, patch: FlowRecordPatch): FlowRecord {
const controllerId =
patch.controllerId === undefined ? current.controllerId : normalizeText(patch.controllerId);
if (current.syncMode === "managed") {
assertControllerId(controllerId);
}
return {
...current,
...(patch.status ? { status: patch.status } : {}),
...(patch.notifyPolicy ? { notifyPolicy: patch.notifyPolicy } : {}),
...(patch.goal ? { goal: patch.goal } : {}),
controllerId,
currentStep:
patch.currentStep === undefined ? current.currentStep : normalizeText(patch.currentStep),
blockedTaskId:
patch.blockedTaskId === undefined
? current.blockedTaskId
: normalizeText(patch.blockedTaskId),
blockedSummary:
patch.blockedSummary === undefined
? current.blockedSummary
: normalizeText(patch.blockedSummary),
stateJson:
patch.stateJson === undefined ? current.stateJson : normalizeJsonBlob(patch.stateJson),
waitJson: patch.waitJson === undefined ? current.waitJson : normalizeJsonBlob(patch.waitJson),
cancelRequestedAt:
patch.cancelRequestedAt === undefined
? current.cancelRequestedAt
: (patch.cancelRequestedAt ?? undefined),
revision: current.revision + 1,
updatedAt: patch.updatedAt ?? Date.now(),
endedAt: patch.endedAt === undefined ? current.endedAt : (patch.endedAt ?? undefined),
};
}
function writeFlowRecord(next: FlowRecord, previous?: FlowRecord): FlowRecord {
flows.set(next.flowId, next);
persistFlowUpsert(next);
emitFlowRegistryHookEvent(() => ({
kind: "upserted",
flow: cloneFlowRecord(next),
...(previous ? { previous: cloneFlowRecord(previous) } : {}),
}));
return cloneFlowRecord(next);
}
export function createFlowRecord(params: {
syncMode?: FlowSyncMode;
ownerKey: string;
requesterOrigin?: FlowRecord["requesterOrigin"];
controllerId?: string | null;
revision?: number;
status?: FlowStatus;
notifyPolicy?: TaskNotifyPolicy;
goal: string;
currentStep?: string | null;
blockedTaskId?: string | null;
blockedSummary?: string | null;
stateJson?: JsonValue | null;
waitJson?: JsonValue | null;
cancelRequestedAt?: number | null;
createdAt?: number;
updatedAt?: number;
endedAt?: number | null;
}): FlowRecord {
ensureFlowRegistryReady();
const record = buildFlowRecord(params);
return writeFlowRecord(record);
}
export function createManagedFlow(params: {
ownerKey: string;
controllerId: string;
requesterOrigin?: FlowRecord["requesterOrigin"];
status?: FlowStatus;
notifyPolicy?: TaskNotifyPolicy;
goal: string;
currentStep?: string | null;
blockedTaskId?: string | null;
blockedSummary?: string | null;
stateJson?: JsonValue | null;
waitJson?: JsonValue | null;
cancelRequestedAt?: number | null;
createdAt?: number;
updatedAt?: number;
endedAt?: number | null;
}): FlowRecord {
return createFlowRecord({
...params,
syncMode: "managed",
controllerId: assertControllerId(params.controllerId),
});
}
export function createFlowForTask(params: {
task: Pick<
TaskRecord,
| "ownerKey"
| "taskId"
| "notifyPolicy"
| "status"
| "terminalOutcome"
| "label"
| "task"
| "createdAt"
| "lastEventAt"
| "endedAt"
| "terminalSummary"
| "progressSummary"
>;
requesterOrigin?: FlowRecord["requesterOrigin"];
}): FlowRecord {
const terminalFlowStatus = deriveFlowStatusFromTask(params.task);
const isTerminal =
terminalFlowStatus === "succeeded" ||
terminalFlowStatus === "blocked" ||
terminalFlowStatus === "failed" ||
terminalFlowStatus === "cancelled" ||
terminalFlowStatus === "lost";
const endedAt = isTerminal
? (params.task.endedAt ?? params.task.lastEventAt ?? params.task.createdAt)
: undefined;
return createFlowRecord({
syncMode: "task_mirrored",
ownerKey: params.task.ownerKey,
requesterOrigin: params.requesterOrigin,
status: terminalFlowStatus,
notifyPolicy: params.task.notifyPolicy,
goal: resolveFlowGoal(params.task),
blockedTaskId:
terminalFlowStatus === "blocked" ? params.task.taskId.trim() || undefined : undefined,
blockedSummary: resolveFlowBlockedSummary(params.task),
createdAt: params.task.createdAt,
updatedAt: params.task.lastEventAt ?? params.task.createdAt,
...(endedAt !== undefined ? { endedAt } : {}),
});
}
function updateFlowRecordByIdUnchecked(flowId: string, patch: FlowRecordPatch): FlowRecord | null {
ensureFlowRegistryReady();
const current = flows.get(flowId);
if (!current) {
return null;
}
return writeFlowRecord(applyFlowPatch(current, patch), current);
}
export function updateFlowRecordByIdExpectedRevision(params: {
flowId: string;
expectedRevision: number;
patch: FlowRecordPatch;
}): FlowUpdateResult {
ensureFlowRegistryReady();
const current = flows.get(params.flowId);
if (!current) {
return {
applied: false,
reason: "not_found",
};
}
if (current.revision !== params.expectedRevision) {
return {
applied: false,
reason: "revision_conflict",
current: cloneFlowRecord(current),
};
}
return {
applied: true,
flow: writeFlowRecord(applyFlowPatch(current, params.patch), current),
};
}
export function setFlowWaiting(params: {
flowId: string;
expectedRevision: number;
currentStep?: string | null;
stateJson?: JsonValue | null;
waitJson?: JsonValue | null;
blockedTaskId?: string | null;
blockedSummary?: string | null;
updatedAt?: number;
}): FlowUpdateResult {
return updateFlowRecordByIdExpectedRevision({
flowId: params.flowId,
expectedRevision: params.expectedRevision,
patch: {
status:
normalizeText(params.blockedTaskId) || normalizeText(params.blockedSummary)
? "blocked"
: "waiting",
currentStep: params.currentStep,
stateJson: params.stateJson,
waitJson: params.waitJson,
blockedTaskId: params.blockedTaskId,
blockedSummary: params.blockedSummary,
endedAt: null,
updatedAt: params.updatedAt,
},
});
}
export function resumeFlow(params: {
flowId: string;
expectedRevision: number;
status?: Extract<FlowStatus, "queued" | "running">;
currentStep?: string | null;
stateJson?: JsonValue | null;
updatedAt?: number;
}): FlowUpdateResult {
return updateFlowRecordByIdExpectedRevision({
flowId: params.flowId,
expectedRevision: params.expectedRevision,
patch: {
status: params.status ?? "queued",
currentStep: params.currentStep,
stateJson: params.stateJson,
waitJson: null,
blockedTaskId: null,
blockedSummary: null,
endedAt: null,
updatedAt: params.updatedAt,
},
});
}
export function finishFlow(params: {
flowId: string;
expectedRevision: number;
currentStep?: string | null;
stateJson?: JsonValue | null;
updatedAt?: number;
endedAt?: number;
}): FlowUpdateResult {
const endedAt = params.endedAt ?? params.updatedAt ?? Date.now();
return updateFlowRecordByIdExpectedRevision({
flowId: params.flowId,
expectedRevision: params.expectedRevision,
patch: {
status: "succeeded",
currentStep: params.currentStep,
stateJson: params.stateJson,
waitJson: null,
blockedTaskId: null,
blockedSummary: null,
endedAt,
updatedAt: params.updatedAt ?? endedAt,
},
});
}
export function failFlow(params: {
flowId: string;
expectedRevision: number;
currentStep?: string | null;
stateJson?: JsonValue | null;
blockedTaskId?: string | null;
blockedSummary?: string | null;
updatedAt?: number;
endedAt?: number;
}): FlowUpdateResult {
const endedAt = params.endedAt ?? params.updatedAt ?? Date.now();
return updateFlowRecordByIdExpectedRevision({
flowId: params.flowId,
expectedRevision: params.expectedRevision,
patch: {
status: "failed",
currentStep: params.currentStep,
stateJson: params.stateJson,
waitJson: null,
blockedTaskId: params.blockedTaskId,
blockedSummary: params.blockedSummary,
endedAt,
updatedAt: params.updatedAt ?? endedAt,
},
});
}
export function requestFlowCancel(params: {
flowId: string;
expectedRevision: number;
cancelRequestedAt?: number;
updatedAt?: number;
}): FlowUpdateResult {
return updateFlowRecordByIdExpectedRevision({
flowId: params.flowId,
expectedRevision: params.expectedRevision,
patch: {
cancelRequestedAt: params.cancelRequestedAt ?? params.updatedAt ?? Date.now(),
updatedAt: params.updatedAt,
},
});
}
export function syncFlowFromTask(
task: Pick<
TaskRecord,
| "parentFlowId"
| "status"
| "terminalOutcome"
| "notifyPolicy"
| "label"
| "task"
| "lastEventAt"
| "endedAt"
| "taskId"
| "terminalSummary"
| "progressSummary"
>,
): FlowRecord | null {
const flowId = task.parentFlowId?.trim();
if (!flowId) {
return null;
}
const flow = getFlowById(flowId);
if (!flow) {
return null;
}
if (flow.syncMode !== "task_mirrored") {
return flow;
}
const terminalFlowStatus = deriveFlowStatusFromTask(task);
const isTerminal =
terminalFlowStatus === "succeeded" ||
terminalFlowStatus === "blocked" ||
terminalFlowStatus === "failed" ||
terminalFlowStatus === "cancelled" ||
terminalFlowStatus === "lost";
return updateFlowRecordByIdUnchecked(flowId, {
status: terminalFlowStatus,
notifyPolicy: task.notifyPolicy,
goal: resolveFlowGoal(task),
blockedTaskId: terminalFlowStatus === "blocked" ? task.taskId.trim() || null : null,
blockedSummary:
terminalFlowStatus === "blocked" ? (resolveFlowBlockedSummary(task) ?? null) : null,
waitJson: null,
updatedAt: task.lastEventAt ?? Date.now(),
...(isTerminal
? {
endedAt: task.endedAt ?? task.lastEventAt ?? Date.now(),
}
: { endedAt: null }),
});
}
export function getFlowById(flowId: string): FlowRecord | undefined {
ensureFlowRegistryReady();
const flow = flows.get(flowId);
return flow ? cloneFlowRecord(flow) : undefined;
}
export function listFlowsForOwnerKey(ownerKey: string): FlowRecord[] {
ensureFlowRegistryReady();
const normalizedOwnerKey = ownerKey.trim();
if (!normalizedOwnerKey) {
return [];
}
return [...flows.values()]
.filter((flow) => flow.ownerKey.trim() === normalizedOwnerKey)
.map((flow) => cloneFlowRecord(flow))
.toSorted((left, right) => right.createdAt - left.createdAt);
}
export function findLatestFlowForOwnerKey(ownerKey: string): FlowRecord | undefined {
const flow = listFlowsForOwnerKey(ownerKey)[0];
return flow ? cloneFlowRecord(flow) : undefined;
}
export function resolveFlowForLookupToken(token: string): FlowRecord | undefined {
const lookup = token.trim();
if (!lookup) {
return undefined;
}
return getFlowById(lookup) ?? findLatestFlowForOwnerKey(lookup);
}
export function listFlowRecords(): FlowRecord[] {
ensureFlowRegistryReady();
return [...flows.values()]
.map((flow) => cloneFlowRecord(flow))
.toSorted((left, right) => right.createdAt - left.createdAt);
}
export function deleteFlowRecordById(flowId: string): boolean {
ensureFlowRegistryReady();
const current = flows.get(flowId);
if (!current) {
return false;
}
flows.delete(flowId);
persistFlowDelete(flowId);
emitFlowRegistryHookEvent(() => ({
kind: "deleted",
flowId,
previous: cloneFlowRecord(current),
}));
return true;
}
export function resetFlowRegistryForTests(opts?: { persist?: boolean }) {
flows.clear();
restoreAttempted = false;
resetFlowRegistryRuntimeForTests();
if (opts?.persist !== false) {
persistFlowRegistry();
getFlowRegistryStore().close?.();
}
}

View File

@ -0,0 +1,43 @@
import type { DeliveryContext } from "../utils/delivery-context.js";
import type { TaskNotifyPolicy } from "./task-registry.types.js";
export type JsonValue =
| null
| boolean
| number
| string
| JsonValue[]
| { [key: string]: JsonValue };
export type FlowSyncMode = "task_mirrored" | "managed";
export type FlowStatus =
| "queued"
| "running"
| "waiting"
| "blocked"
| "succeeded"
| "failed"
| "cancelled"
| "lost";
export type FlowRecord = {
flowId: string;
syncMode: FlowSyncMode;
ownerKey: string;
requesterOrigin?: DeliveryContext;
controllerId?: string;
revision: number;
status: FlowStatus;
notifyPolicy: TaskNotifyPolicy;
goal: string;
currentStep?: string;
blockedTaskId?: string;
blockedSummary?: string;
stateJson?: JsonValue;
waitJson?: JsonValue;
cancelRequestedAt?: number;
createdAt: number;
updatedAt: number;
endedAt?: number;
};

View File

@ -0,0 +1,17 @@
export {
createFlowForTask,
createFlowRecord,
createManagedFlow,
deleteFlowRecordById,
findLatestFlowForOwnerKey,
getFlowById,
listFlowRecords,
listFlowsForOwnerKey,
requestFlowCancel,
resolveFlowForLookupToken,
resetFlowRegistryForTests,
resumeFlow,
setFlowWaiting,
syncFlowFromTask,
updateFlowRecordByIdExpectedRevision,
} from "./flow-registry.js";

View File

@ -4,14 +4,17 @@ export {
deleteTaskRecordById,
ensureTaskRegistryReady,
findLatestTaskForOwnerKey,
findLatestTaskForFlowId,
findLatestTaskForRelatedSessionKey,
findTaskByRunId,
getTaskById,
getTaskRegistrySnapshot,
getTaskRegistrySummary,
listTaskRecords,
listTasksForFlowId,
listTasksForOwnerKey,
listTasksForRelatedSessionKey,
linkTaskToFlowById,
markTaskLostById,
markTaskRunningByRunId,
markTaskTerminalById,

View File

@ -1,16 +1,30 @@
import { afterEach, describe, expect, it, vi } from "vitest";
import { withTempDir } from "../test-helpers/temp-dir.js";
import {
createManagedFlow,
getFlowById,
listFlowRecords,
resetFlowRegistryForTests,
} from "./flow-registry.js";
import {
cancelFlowById,
cancelFlowByIdForOwner,
cancelDetachedTaskRunById,
completeTaskRunByRunId,
createQueuedTaskRun,
createRunningTaskRun,
failTaskRunByRunId,
recordTaskRunProgressByRunId,
retryBlockedFlowAsQueuedTaskRun,
setDetachedTaskDeliveryStatusByRunId,
startTaskRunByRunId,
} from "./task-executor.js";
import { getTaskById, resetTaskRegistryForTests } from "./task-registry.js";
import {
getTaskById,
findLatestTaskForFlowId,
findTaskByRunId,
resetTaskRegistryForTests,
} from "./task-registry.js";
const ORIGINAL_STATE_DIR = process.env.OPENCLAW_STATE_DIR;
const hoisted = vi.hoisted(() => {
@ -42,10 +56,12 @@ async function withTaskExecutorStateDir(run: (root: string) => Promise<void>): P
await withTempDir({ prefix: "openclaw-task-executor-" }, async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests();
resetFlowRegistryForTests();
try {
await run(root);
} finally {
resetTaskRegistryForTests();
resetFlowRegistryForTests();
}
});
}
@ -58,6 +74,7 @@ describe("task-executor", () => {
process.env.OPENCLAW_STATE_DIR = ORIGINAL_STATE_DIR;
}
resetTaskRegistryForTests();
resetFlowRegistryForTests();
hoisted.sendMessageMock.mockReset();
hoisted.cancelSessionMock.mockReset();
hoisted.killSubagentRunAdminMock.mockReset();
@ -141,7 +158,64 @@ describe("task-executor", () => {
});
});
it("records blocked task outcomes without wrapping them in a separate flow model", async () => {
it("auto-creates a one-task flow and keeps it synced with task status", async () => {
await withTaskExecutorStateDir(async () => {
const created = createRunningTaskRun({
runtime: "subagent",
ownerKey: "agent:main:main",
scopeKind: "session",
childSessionKey: "agent:codex:subagent:child",
runId: "run-executor-flow",
task: "Write summary",
startedAt: 10,
deliveryStatus: "pending",
});
expect(created.parentFlowId).toEqual(expect.any(String));
expect(getFlowById(created.parentFlowId!)).toMatchObject({
flowId: created.parentFlowId,
ownerKey: "agent:main:main",
status: "running",
goal: "Write summary",
notifyPolicy: "done_only",
});
completeTaskRunByRunId({
runId: "run-executor-flow",
endedAt: 40,
lastEventAt: 40,
terminalSummary: "Done.",
});
expect(getFlowById(created.parentFlowId!)).toMatchObject({
flowId: created.parentFlowId,
status: "succeeded",
endedAt: 40,
goal: "Write summary",
notifyPolicy: "done_only",
});
});
});
it("does not auto-create one-task flows for non-returning bookkeeping runs", async () => {
await withTaskExecutorStateDir(async () => {
const created = createRunningTaskRun({
runtime: "cli",
ownerKey: "agent:main:main",
scopeKind: "session",
childSessionKey: "agent:main:main",
runId: "run-executor-cli",
task: "Foreground gateway run",
deliveryStatus: "not_applicable",
startedAt: 10,
});
expect(created.parentFlowId).toBeUndefined();
expect(listFlowRecords()).toEqual([]);
});
});
it("records blocked metadata on one-task flows and reuses the same flow for queued retries", async () => {
await withTaskExecutorStateDir(async () => {
const created = createRunningTaskRun({
runtime: "acp",
@ -156,7 +230,6 @@ describe("task-executor", () => {
task: "Patch file",
startedAt: 10,
deliveryStatus: "pending",
notifyPolicy: "silent",
});
completeTaskRunByRunId({
@ -173,6 +246,113 @@ describe("task-executor", () => {
terminalOutcome: "blocked",
terminalSummary: "Writable session required.",
});
expect(getFlowById(created.parentFlowId!)).toMatchObject({
flowId: created.parentFlowId,
status: "blocked",
blockedTaskId: created.taskId,
blockedSummary: "Writable session required.",
endedAt: 40,
});
const retried = retryBlockedFlowAsQueuedTaskRun({
flowId: created.parentFlowId!,
runId: "run-executor-retry",
childSessionKey: "agent:codex:acp:retry-child",
});
expect(retried).toMatchObject({
found: true,
retried: true,
previousTask: expect.objectContaining({
taskId: created.taskId,
}),
task: expect.objectContaining({
parentFlowId: created.parentFlowId,
parentTaskId: created.taskId,
status: "queued",
runId: "run-executor-retry",
}),
});
expect(getFlowById(created.parentFlowId!)).toMatchObject({
flowId: created.parentFlowId,
status: "queued",
});
expect(findLatestTaskForFlowId(created.parentFlowId!)).toMatchObject({
runId: "run-executor-retry",
});
expect(findTaskByRunId("run-executor-blocked")).toMatchObject({
taskId: created.taskId,
status: "succeeded",
terminalOutcome: "blocked",
terminalSummary: "Writable session required.",
});
});
});
it("cancels active tasks linked to a managed flow", async () => {
await withTaskExecutorStateDir(async () => {
hoisted.cancelSessionMock.mockResolvedValue(undefined);
const flow = createManagedFlow({
ownerKey: "agent:main:main",
controllerId: "tests/managed-flow",
goal: "Inspect PR batch",
});
const child = createRunningTaskRun({
runtime: "acp",
ownerKey: "agent:main:main",
scopeKind: "session",
parentFlowId: flow.flowId,
childSessionKey: "agent:codex:acp:child",
runId: "run-linear-cancel",
task: "Inspect a PR",
startedAt: 10,
deliveryStatus: "pending",
});
const cancelled = await cancelFlowById({
cfg: {} as never,
flowId: flow.flowId,
});
expect(cancelled).toMatchObject({
found: true,
cancelled: true,
});
expect(findTaskByRunId("run-linear-cancel")).toMatchObject({
taskId: child.taskId,
status: "cancelled",
});
expect(getFlowById(flow.flowId)).toMatchObject({
flowId: flow.flowId,
status: "cancelled",
});
});
});
it("denies cross-owner flow cancellation through the owner-scoped wrapper", async () => {
await withTaskExecutorStateDir(async () => {
const flow = createManagedFlow({
ownerKey: "agent:main:main",
controllerId: "tests/managed-flow",
goal: "Protected flow",
});
const cancelled = await cancelFlowByIdForOwner({
cfg: {} as never,
flowId: flow.flowId,
callerOwnerKey: "agent:main:other",
});
expect(cancelled).toMatchObject({
found: false,
cancelled: false,
reason: "Flow not found.",
});
expect(getFlowById(flow.flowId)).toMatchObject({
flowId: flow.flowId,
status: "queued",
});
});
});

View File

@ -1,24 +1,85 @@
import type { OpenClawConfig } from "../config/config.js";
import { createSubsystemLogger } from "../logging/subsystem.js";
import { getFlowByIdForOwner } from "./flow-owner-access.js";
import type { FlowRecord } from "./flow-registry.types.js";
import {
createFlowForTask,
deleteFlowRecordById,
getFlowById,
updateFlowRecordByIdExpectedRevision,
} from "./flow-runtime-internal.js";
import {
cancelTaskById,
createTaskRecord,
findLatestTaskForFlowId,
linkTaskToFlowById,
listTasksForFlowId,
markTaskLostById,
markTaskRunningByRunId,
markTaskTerminalByRunId,
recordTaskProgressByRunId,
setTaskRunDeliveryStatusByRunId,
} from "./runtime-internal.js";
import { summarizeTaskRecords } from "./task-registry.summary.js";
import type {
TaskDeliveryState,
TaskDeliveryStatus,
TaskNotifyPolicy,
TaskRecord,
TaskRegistrySummary,
TaskRuntime,
TaskScopeKind,
TaskStatus,
TaskTerminalOutcome,
} from "./task-registry.types.js";
const log = createSubsystemLogger("tasks/executor");
function isOneTaskFlowEligible(task: TaskRecord): boolean {
if (task.parentFlowId?.trim() || task.scopeKind !== "session") {
return false;
}
if (task.deliveryStatus === "not_applicable") {
return false;
}
return task.runtime === "acp" || task.runtime === "subagent";
}
function ensureSingleTaskFlow(params: {
task: TaskRecord;
requesterOrigin?: TaskDeliveryState["requesterOrigin"];
}): TaskRecord {
if (!isOneTaskFlowEligible(params.task)) {
return params.task;
}
try {
const flow = createFlowForTask({
task: params.task,
requesterOrigin: params.requesterOrigin,
});
const linked = linkTaskToFlowById({
taskId: params.task.taskId,
flowId: flow.flowId,
});
if (!linked) {
deleteFlowRecordById(flow.flowId);
return params.task;
}
if (linked.parentFlowId !== flow.flowId) {
deleteFlowRecordById(flow.flowId);
return linked;
}
return linked;
} catch (error) {
log.warn("Failed to create one-task flow for detached run", {
taskId: params.task.taskId,
runId: params.task.runId,
error,
});
return params.task;
}
}
export function createQueuedTaskRun(params: {
runtime: TaskRuntime;
sourceId?: string;
@ -26,6 +87,7 @@ export function createQueuedTaskRun(params: {
ownerKey?: string;
scopeKind?: TaskScopeKind;
requesterOrigin?: TaskDeliveryState["requesterOrigin"];
parentFlowId?: string;
childSessionKey?: string;
parentTaskId?: string;
agentId?: string;
@ -36,10 +98,18 @@ export function createQueuedTaskRun(params: {
notifyPolicy?: TaskNotifyPolicy;
deliveryStatus?: TaskDeliveryStatus;
}): TaskRecord {
return createTaskRecord({
const task = createTaskRecord({
...params,
status: "queued",
});
return ensureSingleTaskFlow({
task,
requesterOrigin: params.requesterOrigin,
});
}
export function getFlowTaskSummary(flowId: string): TaskRegistrySummary {
return summarizeTaskRecords(listTasksForFlowId(flowId));
}
export function createRunningTaskRun(params: {
@ -49,6 +119,7 @@ export function createRunningTaskRun(params: {
ownerKey?: string;
scopeKind?: TaskScopeKind;
requesterOrigin?: TaskDeliveryState["requesterOrigin"];
parentFlowId?: string;
childSessionKey?: string;
parentTaskId?: string;
agentId?: string;
@ -62,10 +133,14 @@ export function createRunningTaskRun(params: {
lastEventAt?: number;
progressSummary?: string | null;
}): TaskRecord {
return createTaskRecord({
const task = createTaskRecord({
...params,
status: "running",
});
return ensureSingleTaskFlow({
task,
requesterOrigin: params.requesterOrigin,
});
}
export function startTaskRunByRunId(params: {
@ -157,6 +232,255 @@ export function setDetachedTaskDeliveryStatusByRunId(params: {
return setTaskRunDeliveryStatusByRunId(params);
}
type RetryBlockedFlowResult = {
found: boolean;
retried: boolean;
reason?: string;
previousTask?: TaskRecord;
task?: TaskRecord;
};
type RetryBlockedFlowParams = {
flowId: string;
sourceId?: string;
requesterOrigin?: TaskDeliveryState["requesterOrigin"];
childSessionKey?: string;
agentId?: string;
runId?: string;
label?: string;
task?: string;
preferMetadata?: boolean;
notifyPolicy?: TaskNotifyPolicy;
deliveryStatus?: TaskDeliveryStatus;
status: "queued" | "running";
startedAt?: number;
lastEventAt?: number;
progressSummary?: string | null;
};
function resolveRetryableBlockedFlowTask(flowId: string): {
flowFound: boolean;
retryable: boolean;
latestTask?: TaskRecord;
reason?: string;
} {
const flow = getFlowById(flowId);
if (!flow) {
return {
flowFound: false,
retryable: false,
reason: "Flow not found.",
};
}
const latestTask = findLatestTaskForFlowId(flowId);
if (!latestTask) {
return {
flowFound: true,
retryable: false,
reason: "Flow has no retryable task.",
};
}
if (flow.status !== "blocked") {
return {
flowFound: true,
retryable: false,
latestTask,
reason: "Flow is not blocked.",
};
}
if (latestTask.status !== "succeeded" || latestTask.terminalOutcome !== "blocked") {
return {
flowFound: true,
retryable: false,
latestTask,
reason: "Latest flow task is not blocked.",
};
}
return {
flowFound: true,
retryable: true,
latestTask,
};
}
function retryBlockedFlowTask(params: RetryBlockedFlowParams): RetryBlockedFlowResult {
const resolved = resolveRetryableBlockedFlowTask(params.flowId);
if (!resolved.retryable || !resolved.latestTask) {
return {
found: resolved.flowFound,
retried: false,
reason: resolved.reason,
};
}
const flow = getFlowById(params.flowId);
if (!flow) {
return {
found: false,
retried: false,
reason: "Flow not found.",
previousTask: resolved.latestTask,
};
}
const task = createTaskRecord({
runtime: resolved.latestTask.runtime,
sourceId: params.sourceId ?? resolved.latestTask.sourceId,
ownerKey: flow.ownerKey,
scopeKind: "session",
requesterOrigin: params.requesterOrigin ?? flow.requesterOrigin,
parentFlowId: flow.flowId,
childSessionKey: params.childSessionKey,
parentTaskId: resolved.latestTask.taskId,
agentId: params.agentId ?? resolved.latestTask.agentId,
runId: params.runId,
label: params.label ?? resolved.latestTask.label,
task: params.task ?? resolved.latestTask.task,
preferMetadata: params.preferMetadata,
notifyPolicy: params.notifyPolicy ?? resolved.latestTask.notifyPolicy,
deliveryStatus: params.deliveryStatus ?? "pending",
status: params.status,
startedAt: params.startedAt,
lastEventAt: params.lastEventAt,
progressSummary: params.progressSummary,
});
return {
found: true,
retried: true,
previousTask: resolved.latestTask,
task,
};
}
export function retryBlockedFlowAsQueuedTaskRun(
params: Omit<RetryBlockedFlowParams, "status" | "startedAt" | "lastEventAt" | "progressSummary">,
): RetryBlockedFlowResult {
return retryBlockedFlowTask({
...params,
status: "queued",
});
}
export function retryBlockedFlowAsRunningTaskRun(
params: Omit<RetryBlockedFlowParams, "status">,
): RetryBlockedFlowResult {
return retryBlockedFlowTask({
...params,
status: "running",
});
}
type CancelFlowResult = {
found: boolean;
cancelled: boolean;
reason?: string;
flow?: FlowRecord;
tasks?: TaskRecord[];
};
function isActiveTaskStatus(status: TaskStatus): boolean {
return status === "queued" || status === "running";
}
function isTerminalFlowStatus(status: FlowRecord["status"]): boolean {
return (
status === "succeeded" || status === "failed" || status === "cancelled" || status === "lost"
);
}
export async function cancelFlowById(params: {
cfg: OpenClawConfig;
flowId: string;
}): Promise<CancelFlowResult> {
const flow = getFlowById(params.flowId);
if (!flow) {
return {
found: false,
cancelled: false,
reason: "Flow not found.",
};
}
const linkedTasks = listTasksForFlowId(flow.flowId);
const activeTasks = linkedTasks.filter((task) => isActiveTaskStatus(task.status));
for (const task of activeTasks) {
await cancelTaskById({
cfg: params.cfg,
taskId: task.taskId,
});
}
const refreshedTasks = listTasksForFlowId(flow.flowId);
const remainingActive = refreshedTasks.filter((task) => isActiveTaskStatus(task.status));
if (remainingActive.length > 0) {
return {
found: true,
cancelled: false,
reason: "One or more child tasks are still active.",
flow: getFlowById(flow.flowId),
tasks: refreshedTasks,
};
}
if (isTerminalFlowStatus(flow.status)) {
return {
found: true,
cancelled: false,
reason: `Flow is already ${flow.status}.`,
flow,
tasks: refreshedTasks,
};
}
const now = Date.now();
const refreshedFlow = getFlowById(flow.flowId) ?? flow;
const updatedFlowResult = updateFlowRecordByIdExpectedRevision({
flowId: refreshedFlow.flowId,
expectedRevision: refreshedFlow.revision,
patch: {
status: "cancelled",
blockedTaskId: null,
blockedSummary: null,
endedAt: now,
updatedAt: now,
},
});
if (!updatedFlowResult.applied) {
return {
found: true,
cancelled: false,
reason:
updatedFlowResult.reason === "revision_conflict"
? "Flow changed while cancellation was in progress."
: "Flow not found.",
flow: updatedFlowResult.current ?? getFlowById(flow.flowId),
tasks: refreshedTasks,
};
}
return {
found: true,
cancelled: true,
flow: updatedFlowResult.flow,
tasks: refreshedTasks,
};
}
export async function cancelFlowByIdForOwner(params: {
cfg: OpenClawConfig;
flowId: string;
callerOwnerKey: string;
}): Promise<CancelFlowResult> {
const flow = getFlowByIdForOwner({
flowId: params.flowId,
callerOwnerKey: params.callerOwnerKey,
});
if (!flow) {
return {
found: false,
cancelled: false,
reason: "Flow not found.",
};
}
return cancelFlowById({
cfg: params.cfg,
flowId: flow.flowId,
});
}
export async function cancelDetachedTaskRunById(params: { cfg: OpenClawConfig; taskId: string }) {
return cancelTaskById(params);
}

View File

@ -13,6 +13,7 @@ type TaskRegistryRow = {
owner_key: string;
scope_kind: TaskRecord["scopeKind"];
child_session_key: string | null;
parent_flow_id: string | null;
parent_task_id: string | null;
agent_id: string | null;
run_id: string | null;
@ -99,6 +100,7 @@ function rowToTaskRecord(row: TaskRegistryRow): TaskRecord {
ownerKey: row.owner_key,
scopeKind: row.scope_kind,
...(row.child_session_key ? { childSessionKey: row.child_session_key } : {}),
...(row.parent_flow_id ? { parentFlowId: row.parent_flow_id } : {}),
...(row.parent_task_id ? { parentTaskId: row.parent_task_id } : {}),
...(row.agent_id ? { agentId: row.agent_id } : {}),
...(row.run_id ? { runId: row.run_id } : {}),
@ -137,6 +139,7 @@ function bindTaskRecord(record: TaskRecord) {
owner_key: record.ownerKey,
scope_kind: record.scopeKind,
child_session_key: record.childSessionKey ?? null,
parent_flow_id: record.parentFlowId ?? null,
parent_task_id: record.parentTaskId ?? null,
agent_id: record.agentId ?? null,
run_id: record.runId ?? null,
@ -175,6 +178,7 @@ function createStatements(db: DatabaseSync): TaskRegistryStatements {
owner_key,
scope_kind,
child_session_key,
parent_flow_id,
parent_task_id,
agent_id,
run_id,
@ -211,6 +215,7 @@ function createStatements(db: DatabaseSync): TaskRegistryStatements {
owner_key,
scope_kind,
child_session_key,
parent_flow_id,
parent_task_id,
agent_id,
run_id,
@ -235,6 +240,7 @@ function createStatements(db: DatabaseSync): TaskRegistryStatements {
@owner_key,
@scope_kind,
@child_session_key,
@parent_flow_id,
@parent_task_id,
@agent_id,
@run_id,
@ -259,6 +265,7 @@ function createStatements(db: DatabaseSync): TaskRegistryStatements {
owner_key = excluded.owner_key,
scope_kind = excluded.scope_kind,
child_session_key = excluded.child_session_key,
parent_flow_id = excluded.parent_flow_id,
parent_task_id = excluded.parent_task_id,
agent_id = excluded.agent_id,
run_id = excluded.run_id,
@ -340,6 +347,7 @@ function ensureSchema(db: DatabaseSync) {
owner_key TEXT NOT NULL,
scope_kind TEXT NOT NULL,
child_session_key TEXT,
parent_flow_id TEXT,
parent_task_id TEXT,
agent_id TEXT,
run_id TEXT,
@ -360,6 +368,9 @@ function ensureSchema(db: DatabaseSync) {
);
`);
migrateLegacyOwnerColumns(db);
if (!hasTaskRunsColumn(db, "parent_flow_id")) {
db.exec(`ALTER TABLE task_runs ADD COLUMN parent_flow_id TEXT;`);
}
db.exec(`
CREATE TABLE IF NOT EXISTS task_delivery_state (
task_id TEXT PRIMARY KEY,
@ -373,6 +384,7 @@ function ensureSchema(db: DatabaseSync) {
db.exec(`CREATE INDEX IF NOT EXISTS idx_task_runs_cleanup_after ON task_runs(cleanup_after);`);
db.exec(`CREATE INDEX IF NOT EXISTS idx_task_runs_last_event_at ON task_runs(last_event_at);`);
db.exec(`CREATE INDEX IF NOT EXISTS idx_task_runs_owner_key ON task_runs(owner_key);`);
db.exec(`CREATE INDEX IF NOT EXISTS idx_task_runs_parent_flow_id ON task_runs(parent_flow_id);`);
db.exec(
`CREATE INDEX IF NOT EXISTS idx_task_runs_child_session_key ON task_runs(child_session_key);`,
);

View File

@ -194,6 +194,27 @@ describe("task-registry store runtime", () => {
});
});
it("persists parentFlowId with task rows", () => {
const created = createTaskRecord({
runtime: "acp",
ownerKey: "agent:main:main",
scopeKind: "session",
parentFlowId: "flow-123",
childSessionKey: "agent:codex:acp:new",
runId: "run-flow-linked",
task: "Linked task",
status: "running",
deliveryStatus: "pending",
});
resetTaskRegistryForTests({ persist: false });
expect(findTaskByRunId("run-flow-linked")).toMatchObject({
taskId: created.taskId,
parentFlowId: "flow-123",
});
});
it("hardens the sqlite task store directory and file modes", () => {
if (process.platform === "win32") {
return;

View File

@ -11,6 +11,7 @@ import {
} from "../infra/heartbeat-wake.js";
import { peekSystemEvents, resetSystemEventsForTest } from "../infra/system-events.js";
import { withTempDir } from "../test-helpers/temp-dir.js";
import { createManagedFlow, resetFlowRegistryForTests } from "./flow-registry.js";
import {
createTaskRecord,
findLatestTaskForOwnerKey,
@ -20,6 +21,7 @@ import {
getTaskRegistrySummary,
listTasksForOwnerKey,
listTaskRecords,
linkTaskToFlowById,
maybeDeliverTaskStateChangeUpdate,
maybeDeliverTaskTerminalUpdate,
markTaskRunningByRunId,
@ -190,6 +192,7 @@ describe("task-registry", () => {
resetHeartbeatWakeStateForTests();
resetAgentRunContextForTest();
resetTaskRegistryForTests({ persist: false });
resetFlowRegistryForTests({ persist: false });
hoisted.sendMessageMock.mockReset();
hoisted.cancelSessionMock.mockReset();
hoisted.killSubagentRunAdminMock.mockReset();
@ -293,6 +296,89 @@ describe("task-registry", () => {
});
});
it("rejects cross-owner parent flow links during task creation", async () => {
await withTaskRegistryTempDir(async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests();
resetFlowRegistryForTests();
const flow = createManagedFlow({
ownerKey: "agent:main:main",
controllerId: "tests/task-registry",
goal: "Owner main flow",
});
expect(() =>
createTaskRecord({
runtime: "acp",
ownerKey: "agent:main:other",
scopeKind: "session",
parentFlowId: flow.flowId,
runId: "cross-owner-run",
task: "Attempt hijack",
}),
).toThrow("Task ownerKey must match parent flow ownerKey.");
});
});
it("rejects system-scoped parent flow links during task creation", async () => {
await withTaskRegistryTempDir(async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests();
resetFlowRegistryForTests();
const flow = createManagedFlow({
ownerKey: "agent:main:main",
controllerId: "tests/task-registry",
goal: "Owner main flow",
});
expect(() =>
createTaskRecord({
runtime: "cron",
ownerKey: "agent:main:main",
scopeKind: "system",
parentFlowId: flow.flowId,
runId: "system-link-run",
task: "System task",
deliveryStatus: "not_applicable",
}),
).toThrow("Only session-scoped tasks can link to flows.");
});
});
it("rejects cross-owner flow links for existing tasks", async () => {
await withTaskRegistryTempDir(async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests();
resetFlowRegistryForTests();
const task = createTaskRecord({
runtime: "acp",
ownerKey: "agent:main:main",
scopeKind: "session",
runId: "owner-main-task",
task: "Safe task",
});
const flow = createManagedFlow({
ownerKey: "agent:main:other",
controllerId: "tests/task-registry",
goal: "Other owner flow",
});
expect(() =>
linkTaskToFlowById({
taskId: task.taskId,
flowId: flow.flowId,
}),
).toThrow("Task ownerKey must match parent flow ownerKey.");
expect(getTaskById(task.taskId)).toMatchObject({
taskId: task.taskId,
parentFlowId: undefined,
});
});
});
it("delivers ACP completion to the requester channel when a delivery origin exists", async () => {
await withTaskRegistryTempDir(async (root) => {
process.env.OPENCLAW_STATE_DIR = root;

View File

@ -9,6 +9,7 @@ import { createSubsystemLogger } from "../logging/subsystem.js";
import { parseAgentSessionKey } from "../routing/session-key.js";
import { normalizeDeliveryContext } from "../utils/delivery-context.js";
import { isDeliverableMessageChannel } from "../utils/message-channel.js";
import { getFlowById, syncFlowFromTask } from "./flow-runtime-internal.js";
import {
formatTaskBlockedFollowupMessage,
formatTaskStateChangeMessage,
@ -47,6 +48,7 @@ const tasks = new Map<string, TaskRecord>();
const taskDeliveryStates = new Map<string, TaskDeliveryState>();
const taskIdsByRunId = new Map<string, Set<string>>();
const taskIdsByOwnerKey = new Map<string, Set<string>>();
const taskIdsByParentFlowId = new Map<string, Set<string>>();
const taskIdsByRelatedSessionKey = new Map<string, Set<string>>();
const tasksWithPendingDelivery = new Set<string>();
let listenerStarted = false;
@ -58,6 +60,7 @@ let deliveryRuntimePromise: Promise<typeof import("./task-registry-delivery-runt
type TaskDeliveryOwner = {
sessionKey?: string;
requesterOrigin?: TaskDeliveryState["requesterOrigin"];
flowId?: string;
};
function assertTaskOwner(params: { ownerKey: string; scopeKind: TaskScopeKind }) {
@ -67,6 +70,32 @@ function assertTaskOwner(params: { ownerKey: string; scopeKind: TaskScopeKind })
}
}
function normalizeOwnerKey(ownerKey?: string): string | undefined {
const trimmed = ownerKey?.trim();
return trimmed ? trimmed : undefined;
}
function assertParentFlowLinkAllowed(params: {
ownerKey: string;
scopeKind: TaskScopeKind;
parentFlowId?: string;
}) {
const flowId = params.parentFlowId?.trim();
if (!flowId) {
return;
}
if (params.scopeKind !== "session") {
throw new Error("Only session-scoped tasks can link to flows.");
}
const flow = getFlowById(flowId);
if (!flow) {
throw new Error(`Parent flow not found: ${flowId}`);
}
if (normalizeOwnerKey(flow.ownerKey) !== normalizeOwnerKey(params.ownerKey)) {
throw new Error("Task ownerKey must match parent flow ownerKey.");
}
}
function cloneTaskRecord(record: TaskRecord): TaskRecord {
return { ...record };
}
@ -338,6 +367,22 @@ function deleteOwnerKeyIndex(taskId: string, task: Pick<TaskRecord, "ownerKey">)
deleteIndexedKey(taskIdsByOwnerKey, key, taskId);
}
function addParentFlowIdIndex(taskId: string, task: Pick<TaskRecord, "parentFlowId">) {
const key = task.parentFlowId?.trim();
if (!key) {
return;
}
addIndexedKey(taskIdsByParentFlowId, key, taskId);
}
function deleteParentFlowIdIndex(taskId: string, task: Pick<TaskRecord, "parentFlowId">) {
const key = task.parentFlowId?.trim();
if (!key) {
return;
}
deleteIndexedKey(taskIdsByParentFlowId, key, taskId);
}
function addRelatedSessionKeyIndex(
taskId: string,
task: Pick<TaskRecord, "ownerKey" | "childSessionKey">,
@ -370,6 +415,13 @@ function rebuildOwnerKeyIndex() {
}
}
function rebuildParentFlowIdIndex() {
taskIdsByParentFlowId.clear();
for (const [taskId, task] of tasks.entries()) {
addParentFlowIdIndex(taskId, task);
}
}
function rebuildRelatedSessionKeyIndex() {
taskIdsByRelatedSessionKey.clear();
for (const [taskId, task] of tasks.entries()) {
@ -473,6 +525,7 @@ function findExistingTaskForCreate(params: {
ownerKey: string;
scopeKind: TaskScopeKind;
childSessionKey?: string;
parentFlowId?: string;
runId?: string;
label?: string;
task: string;
@ -485,7 +538,9 @@ function findExistingTaskForCreate(params: {
task.scopeKind === params.scopeKind &&
normalizeComparableText(task.ownerKey) === normalizeComparableText(params.ownerKey) &&
normalizeComparableText(task.childSessionKey) ===
normalizeComparableText(params.childSessionKey),
normalizeComparableText(params.childSessionKey) &&
normalizeComparableText(task.parentFlowId) ===
normalizeComparableText(params.parentFlowId),
)
: [];
const exact = runId
@ -512,6 +567,7 @@ function mergeExistingTaskForCreate(
params: {
requesterOrigin?: TaskDeliveryState["requesterOrigin"];
sourceId?: string;
parentFlowId?: string;
parentTaskId?: string;
agentId?: string;
label?: string;
@ -534,6 +590,14 @@ function mergeExistingTaskForCreate(
if (params.sourceId?.trim() && !existing.sourceId?.trim()) {
patch.sourceId = params.sourceId.trim();
}
if (params.parentFlowId?.trim() && !existing.parentFlowId?.trim()) {
assertParentFlowLinkAllowed({
ownerKey: existing.ownerKey,
scopeKind: existing.scopeKind,
parentFlowId: params.parentFlowId,
});
patch.parentFlowId = params.parentFlowId.trim();
}
if (params.parentTaskId?.trim() && !existing.parentTaskId?.trim()) {
patch.parentTaskId = params.parentTaskId.trim();
}
@ -580,14 +644,47 @@ function resolveTaskStateChangeIdempotencyKey(params: {
latestEvent: TaskEventRecord;
owner: TaskDeliveryOwner;
}): string {
if (params.owner.flowId) {
return `flow-event:${params.owner.flowId}:${params.task.taskId}:${params.latestEvent.at}:${params.latestEvent.kind}`;
}
return `task-event:${params.task.taskId}:${params.latestEvent.at}:${params.latestEvent.kind}`;
}
function resolveTaskTerminalIdempotencyKey(task: TaskRecord): string {
const owner = resolveTaskDeliveryOwner(task);
if (owner.flowId) {
const outcome = task.status === "succeeded" ? (task.terminalOutcome ?? "default") : "default";
return `flow-terminal:${owner.flowId}:${task.taskId}:${task.status}:${outcome}`;
}
return taskTerminalDeliveryIdempotencyKey(task);
}
function getLinkedFlowForDelivery(task: TaskRecord) {
const flowId = task.parentFlowId?.trim();
if (!flowId || task.scopeKind !== "session") {
return undefined;
}
const flow = getFlowById(flowId);
if (!flow) {
return undefined;
}
if (normalizeOwnerKey(flow.ownerKey) !== normalizeOwnerKey(task.ownerKey)) {
return undefined;
}
return flow;
}
function resolveTaskDeliveryOwner(task: TaskRecord): TaskDeliveryOwner {
const flow = getLinkedFlowForDelivery(task);
if (flow) {
return {
sessionKey: flow.ownerKey.trim(),
requesterOrigin: normalizeDeliveryContext(
flow.requesterOrigin ?? taskDeliveryStates.get(task.taskId)?.requesterOrigin,
),
flowId: flow.flowId,
};
}
if (task.scopeKind !== "session") {
return {};
}
@ -615,6 +712,7 @@ function restoreTaskRegistryOnce() {
}
rebuildRunIdIndex();
rebuildOwnerKeyIndex();
rebuildParentFlowIdIndex();
rebuildRelatedSessionKeyIndex();
emitTaskRegistryHookEvent(() => ({
kind: "restored",
@ -644,6 +742,7 @@ function updateTask(taskId: string, patch: Partial<TaskRecord>): TaskRecord | nu
normalizeSessionIndexKey(current.ownerKey) !== normalizeSessionIndexKey(next.ownerKey) ||
normalizeSessionIndexKey(current.childSessionKey) !==
normalizeSessionIndexKey(next.childSessionKey);
const parentFlowIndexChanged = current.parentFlowId?.trim() !== next.parentFlowId?.trim();
tasks.set(taskId, next);
if (patch.runId && patch.runId !== current.runId) {
rebuildRunIdIndex();
@ -654,7 +753,20 @@ function updateTask(taskId: string, patch: Partial<TaskRecord>): TaskRecord | nu
deleteRelatedSessionKeyIndex(taskId, current);
addRelatedSessionKeyIndex(taskId, next);
}
if (parentFlowIndexChanged) {
deleteParentFlowIdIndex(taskId, current);
addParentFlowIdIndex(taskId, next);
}
persistTaskUpsert(next);
try {
syncFlowFromTask(next);
} catch (error) {
log.warn("Failed to sync parent flow from task update", {
taskId,
flowId: next.parentFlowId,
error,
});
}
emitTaskRegistryHookEvent(() => ({
kind: "upserted",
task: cloneTaskRecord(next),
@ -1107,6 +1219,7 @@ export function createTaskRecord(params: {
scopeKind?: TaskScopeKind;
requesterOrigin?: TaskDeliveryState["requesterOrigin"];
childSessionKey?: string;
parentFlowId?: string;
parentTaskId?: string;
agentId?: string;
runId?: string;
@ -1137,11 +1250,17 @@ export function createTaskRecord(params: {
ownerKey,
scopeKind,
});
assertParentFlowLinkAllowed({
ownerKey,
scopeKind,
parentFlowId: params.parentFlowId,
});
const existing = findExistingTaskForCreate({
runtime: params.runtime,
ownerKey,
scopeKind,
childSessionKey: params.childSessionKey,
parentFlowId: params.parentFlowId,
runId: params.runId,
label: params.label,
task: params.task,
@ -1173,6 +1292,7 @@ export function createTaskRecord(params: {
ownerKey,
scopeKind,
childSessionKey: params.childSessionKey,
parentFlowId: params.parentFlowId?.trim() || undefined,
parentTaskId: params.parentTaskId?.trim() || undefined,
agentId: params.agentId?.trim() || undefined,
runId: params.runId?.trim() || undefined,
@ -1203,8 +1323,18 @@ export function createTaskRecord(params: {
});
addRunIdIndex(taskId, record.runId);
addOwnerKeyIndex(taskId, record);
addParentFlowIdIndex(taskId, record);
addRelatedSessionKeyIndex(taskId, record);
persistTaskUpsert(record);
try {
syncFlowFromTask(record);
} catch (error) {
log.warn("Failed to sync parent flow from task create", {
taskId: record.taskId,
flowId: record.parentFlowId,
error,
});
}
emitTaskRegistryHookEvent(() => ({
kind: "upserted",
task: cloneTaskRecord(record),
@ -1400,6 +1530,29 @@ export function updateTaskNotifyPolicyById(params: {
});
}
export function linkTaskToFlowById(params: { taskId: string; flowId: string }): TaskRecord | null {
ensureTaskRegistryReady();
const flowId = params.flowId.trim();
if (!flowId) {
return null;
}
const current = tasks.get(params.taskId);
if (!current) {
return null;
}
if (current.parentFlowId?.trim()) {
return cloneTaskRecord(current);
}
assertParentFlowLinkAllowed({
ownerKey: current.ownerKey,
scopeKind: current.scopeKind,
parentFlowId: flowId,
});
return updateTask(params.taskId, {
parentFlowId: flowId,
});
}
export async function cancelTaskById(params: {
cfg: OpenClawConfig;
taskId: string;
@ -1567,6 +1720,11 @@ export function findLatestTaskForOwnerKey(ownerKey: string): TaskRecord | undefi
return task ? cloneTaskRecord(task) : undefined;
}
export function findLatestTaskForFlowId(flowId: string): TaskRecord | undefined {
const task = listTasksForFlowId(flowId)[0];
return task ? cloneTaskRecord(task) : undefined;
}
export function listTasksForOwnerKey(ownerKey: string): TaskRecord[] {
ensureTaskRegistryReady();
const key = normalizeSessionIndexKey(ownerKey);
@ -1576,6 +1734,15 @@ export function listTasksForOwnerKey(ownerKey: string): TaskRecord[] {
return listTasksFromIndex(taskIdsByOwnerKey, key);
}
export function listTasksForFlowId(flowId: string): TaskRecord[] {
ensureTaskRegistryReady();
const key = flowId.trim();
if (!key) {
return [];
}
return listTasksFromIndex(taskIdsByParentFlowId, key);
}
export function findLatestTaskForRelatedSessionKey(sessionKey: string): TaskRecord | undefined {
const task = listTasksForRelatedSessionKey(sessionKey)[0];
return task ? cloneTaskRecord(task) : undefined;
@ -1607,6 +1774,7 @@ export function deleteTaskRecordById(taskId: string): boolean {
return false;
}
deleteOwnerKeyIndex(taskId, current);
deleteParentFlowIdIndex(taskId, current);
deleteRelatedSessionKeyIndex(taskId, current);
tasks.delete(taskId);
taskDeliveryStates.delete(taskId);
@ -1626,6 +1794,7 @@ export function resetTaskRegistryForTests(opts?: { persist?: boolean }) {
taskDeliveryStates.clear();
taskIdsByRunId.clear();
taskIdsByOwnerKey.clear();
taskIdsByParentFlowId.clear();
taskIdsByRelatedSessionKey.clear();
tasksWithPendingDelivery.clear();
restoreAttempted = false;

View File

@ -58,6 +58,7 @@ export type TaskRecord = {
ownerKey: string;
scopeKind: TaskScopeKind;
childSessionKey?: string;
parentFlowId?: string;
parentTaskId?: string;
agentId?: string;
runId?: string;