refactor(tasks): remove flow registry layer

This commit is contained in:
Vincent Koc 2026-04-01 02:24:51 +09:00
parent 76b3235207
commit 1a313caff3
25 changed files with 55 additions and 3448 deletions

View File

@ -39,8 +39,7 @@ let AcpRuntimeError: typeof import("../runtime/errors.js").AcpRuntimeError;
let resetAcpSessionManagerForTests: typeof import("./manager.js").__testing.resetAcpSessionManagerForTests;
let findTaskByRunId: typeof import("../../tasks/task-registry.js").findTaskByRunId;
let resetTaskRegistryForTests: typeof import("../../tasks/task-registry.js").resetTaskRegistryForTests;
let resetFlowRegistryForTests: typeof import("../../tasks/flow-registry.js").resetFlowRegistryForTests;
let installInMemoryTaskAndFlowRegistryRuntime: typeof import("../../test-utils/task-flow-registry-runtime.js").installInMemoryTaskAndFlowRegistryRuntime;
let installInMemoryTaskRegistryRuntime: typeof import("../../test-utils/task-registry-runtime.js").installInMemoryTaskRegistryRuntime;
const baseCfg = {
acp: {
@ -55,13 +54,11 @@ async function withAcpManagerTaskStateDir(run: (root: string) => Promise<void>):
await withTempDir({ prefix: "openclaw-acp-manager-task-" }, async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests({ persist: false });
resetFlowRegistryForTests({ persist: false });
installInMemoryTaskAndFlowRegistryRuntime();
installInMemoryTaskRegistryRuntime();
try {
await run(root);
} finally {
resetTaskRegistryForTests({ persist: false });
resetFlowRegistryForTests({ persist: false });
}
});
}
@ -178,16 +175,14 @@ function extractRuntimeOptionsFromUpserts(): Array<AcpSessionRuntimeOptions | un
describe("AcpSessionManager", () => {
beforeAll(async () => {
vi.resetModules();
({
AcpSessionManager,
__testing: { resetAcpSessionManagerForTests },
} = await import("./manager.js"));
({ AcpRuntimeError } = await import("../runtime/errors.js"));
({ findTaskByRunId, resetTaskRegistryForTests } = await import("../../tasks/task-registry.js"));
({ resetFlowRegistryForTests } = await import("../../tasks/flow-registry.js"));
({ installInMemoryTaskAndFlowRegistryRuntime } =
await import("../../test-utils/task-flow-registry-runtime.js"));
({ installInMemoryTaskRegistryRuntime } =
await import("../../test-utils/task-registry-runtime.js"));
});
beforeEach(() => {
@ -206,7 +201,6 @@ describe("AcpSessionManager", () => {
process.env.OPENCLAW_STATE_DIR = ORIGINAL_STATE_DIR;
}
resetTaskRegistryForTests({ persist: false });
resetFlowRegistryForTests({ persist: false });
});
it("marks ACP-shaped sessions without metadata as stale", () => {

View File

@ -5,9 +5,6 @@ import { registerStatusHealthSessionsCommands } from "./register.status-health-s
const mocks = vi.hoisted(() => ({
statusCommand: vi.fn(),
healthCommand: vi.fn(),
flowsListCommand: vi.fn(),
flowsShowCommand: vi.fn(),
flowsCancelCommand: vi.fn(),
sessionsCommand: vi.fn(),
sessionsCleanupCommand: vi.fn(),
tasksListCommand: vi.fn(),
@ -26,9 +23,6 @@ const mocks = vi.hoisted(() => ({
const statusCommand = mocks.statusCommand;
const healthCommand = mocks.healthCommand;
const flowsListCommand = mocks.flowsListCommand;
const flowsShowCommand = mocks.flowsShowCommand;
const flowsCancelCommand = mocks.flowsCancelCommand;
const sessionsCommand = mocks.sessionsCommand;
const sessionsCleanupCommand = mocks.sessionsCleanupCommand;
const tasksListCommand = mocks.tasksListCommand;
@ -48,12 +42,6 @@ vi.mock("../../commands/health.js", () => ({
healthCommand: mocks.healthCommand,
}));
vi.mock("../../commands/flows.js", () => ({
flowsListCommand: mocks.flowsListCommand,
flowsShowCommand: mocks.flowsShowCommand,
flowsCancelCommand: mocks.flowsCancelCommand,
}));
vi.mock("../../commands/sessions.js", () => ({
sessionsCommand: mocks.sessionsCommand,
}));
@ -91,9 +79,6 @@ describe("registerStatusHealthSessionsCommands", () => {
runtime.exit.mockImplementation(() => {});
statusCommand.mockResolvedValue(undefined);
healthCommand.mockResolvedValue(undefined);
flowsListCommand.mockResolvedValue(undefined);
flowsShowCommand.mockResolvedValue(undefined);
flowsCancelCommand.mockResolvedValue(undefined);
sessionsCommand.mockResolvedValue(undefined);
sessionsCleanupCommand.mockResolvedValue(undefined);
tasksListCommand.mockResolvedValue(undefined);
@ -332,39 +317,4 @@ describe("registerStatusHealthSessionsCommands", () => {
runtime,
);
});
it("runs flows list from the parent command", async () => {
await runCli(["flows", "--json", "--status", "blocked"]);
expect(flowsListCommand).toHaveBeenCalledWith(
expect.objectContaining({
json: true,
status: "blocked",
}),
runtime,
);
});
it("runs flows show subcommand with lookup forwarding", async () => {
await runCli(["flows", "show", "flow-123", "--json"]);
expect(flowsShowCommand).toHaveBeenCalledWith(
expect.objectContaining({
lookup: "flow-123",
json: true,
}),
runtime,
);
});
it("runs flows cancel subcommand with lookup forwarding", async () => {
await runCli(["flows", "cancel", "flow-123"]);
expect(flowsCancelCommand).toHaveBeenCalledWith(
expect.objectContaining({
lookup: "flow-123",
}),
runtime,
);
});
});

View File

@ -1,5 +1,4 @@
import type { Command } from "commander";
import { flowsCancelCommand, flowsListCommand, flowsShowCommand } from "../../commands/flows.js";
import { healthCommand } from "../../commands/health.js";
import { sessionsCleanupCommand } from "../../commands/sessions-cleanup.js";
import { sessionsCommand } from "../../commands/sessions.js";
@ -374,84 +373,4 @@ export function registerStatusHealthSessionsCommands(program: Command) {
);
});
});
const flowsCmd = program
.command("flows")
.description("Inspect ClawFlow state")
.option("--json", "Output as JSON", false)
.option(
"--status <name>",
"Filter by status (queued, running, waiting, blocked, succeeded, failed, cancelled, lost)",
)
.action(async (opts) => {
await runCommandWithRuntime(defaultRuntime, async () => {
await flowsListCommand(
{
json: Boolean(opts.json),
status: opts.status as string | undefined,
},
defaultRuntime,
);
});
});
flowsCmd.enablePositionalOptions();
flowsCmd
.command("list")
.description("List tracked ClawFlow runs")
.option("--json", "Output as JSON", false)
.option(
"--status <name>",
"Filter by status (queued, running, waiting, blocked, succeeded, failed, cancelled, lost)",
)
.action(async (opts, command) => {
const parentOpts = command.parent?.opts() as
| {
json?: boolean;
status?: string;
}
| undefined;
await runCommandWithRuntime(defaultRuntime, async () => {
await flowsListCommand(
{
json: Boolean(opts.json || parentOpts?.json),
status: (opts.status as string | undefined) ?? parentOpts?.status,
},
defaultRuntime,
);
});
});
flowsCmd
.command("show")
.description("Show one ClawFlow by flow id or owner session key")
.argument("<lookup>", "Flow id or owner session key")
.option("--json", "Output as JSON", false)
.action(async (lookup, opts, command) => {
const parentOpts = command.parent?.opts() as { json?: boolean } | undefined;
await runCommandWithRuntime(defaultRuntime, async () => {
await flowsShowCommand(
{
lookup,
json: Boolean(opts.json || parentOpts?.json),
},
defaultRuntime,
);
});
});
flowsCmd
.command("cancel")
.description("Cancel a ClawFlow and its active child tasks")
.argument("<lookup>", "Flow id or owner session key")
.action(async (lookup) => {
await runCommandWithRuntime(defaultRuntime, async () => {
await flowsCancelCommand(
{
lookup,
},
defaultRuntime,
);
});
});
}

View File

@ -13,8 +13,6 @@ const mocks = vi.hoisted(() => ({
buildWorkspaceSkillStatus: vi.fn(),
buildPluginStatusReport: vi.fn(),
buildPluginCompatibilityWarnings: vi.fn(),
listFlowRecords: vi.fn(),
listTasksForFlowId: vi.fn(),
}));
vi.mock("../agents/agent-scope.js", () => ({
@ -32,14 +30,6 @@ vi.mock("../plugins/status.js", () => ({
mocks.buildPluginCompatibilityWarnings(...args),
}));
vi.mock("../tasks/flow-registry.js", () => ({
listFlowRecords: (...args: unknown[]) => mocks.listFlowRecords(...args),
}));
vi.mock("../tasks/task-registry.js", () => ({
listTasksForFlowId: (...args: unknown[]) => mocks.listTasksForFlowId(...args),
}));
async function runNoteWorkspaceStatusForTest(
loadResult: ReturnType<typeof createPluginLoadResult>,
compatibilityWarnings: string[] = [],
@ -54,8 +44,6 @@ async function runNoteWorkspaceStatusForTest(
...loadResult,
});
mocks.buildPluginCompatibilityWarnings.mockReturnValue(compatibilityWarnings);
mocks.listFlowRecords.mockReturnValue([]);
mocks.listTasksForFlowId.mockReturnValue([]);
const noteSpy = vi.spyOn(noteModule, "note").mockImplementation(() => {});
noteWorkspaceStatus({});
@ -171,54 +159,4 @@ describe("noteWorkspaceStatus", () => {
noteSpy.mockRestore();
}
});
it("surfaces ClawFlow recovery guidance for suspicious linear flows", async () => {
const noteSpy = await runNoteWorkspaceStatusForTest(createPluginLoadResult({ plugins: [] }));
mocks.listFlowRecords.mockReturnValue([
{
flowId: "flow-orphaned",
shape: "linear",
ownerSessionKey: "agent:main:main",
status: "waiting",
notifyPolicy: "done_only",
goal: "Process PRs",
waitingOnTaskId: "task-wait-missing",
createdAt: 10,
updatedAt: 20,
},
{
flowId: "flow-blocked",
shape: "single_task",
ownerSessionKey: "agent:main:main",
status: "blocked",
notifyPolicy: "done_only",
goal: "Patch file",
blockedTaskId: "task-missing",
createdAt: 10,
updatedAt: 20,
},
]);
mocks.listTasksForFlowId.mockImplementation((flowId: string) => {
if (flowId === "flow-blocked") {
return [{ taskId: "task-other" }];
}
return [];
});
noteWorkspaceStatus({});
try {
const recoveryCalls = noteSpy.mock.calls.filter(([, title]) => title === "ClawFlow recovery");
expect(recoveryCalls).toHaveLength(1);
const body = String(recoveryCalls[0]?.[0]);
expect(body).toContain(
"flow-orphaned: waiting flow points at missing task task-wait-missing",
);
expect(body).toContain("flow-blocked: blocked flow points at missing task task-missing");
expect(body).toContain("openclaw flows show <flow-id>");
expect(body).toContain("openclaw flows cancel <flow-id>");
} finally {
noteSpy.mockRestore();
}
});
});

View File

@ -1,65 +1,10 @@
import { resolveAgentWorkspaceDir, resolveDefaultAgentId } from "../agents/agent-scope.js";
import { buildWorkspaceSkillStatus } from "../agents/skills-status.js";
import { formatCliCommand } from "../cli/command-format.js";
import type { OpenClawConfig } from "../config/config.js";
import { buildPluginCompatibilityWarnings, buildPluginStatusReport } from "../plugins/status.js";
import { listFlowRecords } from "../tasks/flow-registry.js";
import { listTasksForFlowId } from "../tasks/task-registry.js";
import { note } from "../terminal/note.js";
import { detectLegacyWorkspaceDirs, formatLegacyWorkspaceWarning } from "./doctor-workspace.js";
/**
 * Emit a "ClawFlow recovery" note for flows whose registry state looks
 * inconsistent: waiting/blocked flows pointing at tasks that no longer
 * exist, or active linear flows with no linked tasks at all. At most five
 * findings are printed, followed by inspect/cancel command hints.
 */
function noteFlowRecoveryHints() {
  const suspicious: string[] = [];
  for (const flow of listFlowRecords()) {
    const tasks = listTasksForFlowId(flow.flowId);
    const waitingTaskMissing =
      flow.shape === "linear" &&
      flow.status === "waiting" &&
      flow.waitingOnTaskId &&
      !tasks.some((task) => task.taskId === flow.waitingOnTaskId);
    const blockedTaskMissing =
      flow.status === "blocked" &&
      flow.blockedTaskId &&
      !tasks.some((task) => task.taskId === flow.blockedTaskId);
    const activeLinear =
      flow.shape === "linear" &&
      (flow.status === "running" || flow.status === "waiting" || flow.status === "blocked");
    // Only report "no linked tasks" when no more specific finding applies.
    if (activeLinear && tasks.length === 0 && !waitingTaskMissing && !blockedTaskMissing) {
      suspicious.push(
        `${flow.flowId}: ${flow.status} linear flow has no linked tasks; inspect or cancel it manually.`,
      );
    }
    if (waitingTaskMissing) {
      suspicious.push(
        `${flow.flowId}: waiting flow points at missing task ${flow.waitingOnTaskId}; inspect or cancel it manually.`,
      );
    }
    if (blockedTaskMissing) {
      suspicious.push(
        `${flow.flowId}: blocked flow points at missing task ${flow.blockedTaskId}; inspect before retrying.`,
      );
    }
  }
  if (suspicious.length === 0) {
    return;
  }
  const body = [
    ...suspicious.slice(0, 5),
    suspicious.length > 5 ? `...and ${suspicious.length - 5} more.` : null,
    `Inspect: ${formatCliCommand("openclaw flows show <flow-id>")}`,
    `Cancel: ${formatCliCommand("openclaw flows cancel <flow-id>")}`,
  ].filter((line): line is string => Boolean(line));
  note(body.join("\n"), "ClawFlow recovery");
}
export function noteWorkspaceStatus(cfg: OpenClawConfig) {
const workspaceDir = resolveAgentWorkspaceDir(cfg, resolveDefaultAgentId(cfg));
const legacyWorkspace = detectLegacyWorkspaceDirs({ workspaceDir });
@ -129,7 +74,5 @@ export function noteWorkspaceStatus(cfg: OpenClawConfig) {
note(lines.join("\n"), "Plugin diagnostics");
}
noteFlowRecoveryHints();
return { workspaceDir };
}

View File

@ -1,160 +0,0 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
import { createCliRuntimeCapture } from "../cli/test-runtime-capture.js";
import { flowsCancelCommand, flowsListCommand, flowsShowCommand } from "./flows.js";
// Mock fns are created inside vi.hoisted so the (hoisted) vi.mock factories
// below can reference them safely.
const mocks = vi.hoisted(() => ({
  listFlowRecordsMock: vi.fn(),
  resolveFlowForLookupTokenMock: vi.fn(),
  getFlowByIdMock: vi.fn(),
  listTasksForFlowIdMock: vi.fn(),
  getFlowTaskSummaryMock: vi.fn(),
  cancelFlowByIdMock: vi.fn(),
  loadConfigMock: vi.fn(() => ({ loaded: true })),
}));
vi.mock("../tasks/flow-registry.js", () => ({
  listFlowRecords: (...args: unknown[]) => mocks.listFlowRecordsMock(...args),
  resolveFlowForLookupToken: (...args: unknown[]) => mocks.resolveFlowForLookupTokenMock(...args),
  getFlowById: (...args: unknown[]) => mocks.getFlowByIdMock(...args),
}));
vi.mock("../tasks/task-registry.js", () => ({
  listTasksForFlowId: (...args: unknown[]) => mocks.listTasksForFlowIdMock(...args),
}));
vi.mock("../tasks/task-executor.js", () => ({
  getFlowTaskSummary: (...args: unknown[]) => mocks.getFlowTaskSummaryMock(...args),
  cancelFlowById: (...args: unknown[]) => mocks.cancelFlowByIdMock(...args),
}));
vi.mock("../config/config.js", () => ({
  loadConfig: () => mocks.loadConfigMock(),
}));
// Captures runtime.log / runtime.error output for assertions.
const {
  defaultRuntime: runtime,
  runtimeLogs,
  runtimeErrors,
  resetRuntimeCapture,
} = createCliRuntimeCapture();
// Fixture: a linear flow waiting on a child task.
const flowFixture = {
  flowId: "flow-12345678",
  shape: "linear",
  ownerSessionKey: "agent:main:main",
  status: "waiting",
  notifyPolicy: "done_only",
  goal: "Process related PRs",
  currentStep: "wait_for",
  waitingOnTaskId: "task-12345678",
  outputs: {
    bucket: ["personal"],
  },
  createdAt: Date.parse("2026-03-31T10:00:00.000Z"),
  updatedAt: Date.parse("2026-03-31T10:05:00.000Z"),
} as const;
// Fixture: aggregate counts returned by getFlowTaskSummary.
const taskSummaryFixture = {
  total: 2,
  active: 1,
  terminal: 1,
  failures: 0,
  byStatus: {
    queued: 0,
    running: 1,
    succeeded: 1,
    failed: 0,
    timed_out: 0,
    cancelled: 0,
    lost: 0,
  },
  byRuntime: {
    subagent: 1,
    acp: 1,
    cli: 0,
    cron: 0,
  },
} as const;
// Fixture: a running child task linked to flowFixture.
const taskFixture = {
  taskId: "task-12345678",
  runtime: "acp",
  requesterSessionKey: "agent:main:main",
  parentFlowId: "flow-12345678",
  childSessionKey: "agent:codex:acp:child",
  runId: "run-12345678",
  task: "Review PR",
  status: "running",
  deliveryStatus: "pending",
  notifyPolicy: "done_only",
  createdAt: Date.parse("2026-03-31T10:00:00.000Z"),
  lastEventAt: Date.parse("2026-03-31T10:05:00.000Z"),
} as const;
describe("flows commands", () => {
  beforeEach(() => {
    // Reset mocks and captured output, then install safe defaults:
    // empty registry, unresolvable lookups, cancel reporting "missing".
    vi.clearAllMocks();
    resetRuntimeCapture();
    mocks.listFlowRecordsMock.mockReturnValue([]);
    mocks.resolveFlowForLookupTokenMock.mockReturnValue(undefined);
    mocks.getFlowByIdMock.mockReturnValue(undefined);
    mocks.listTasksForFlowIdMock.mockReturnValue([]);
    mocks.getFlowTaskSummaryMock.mockReturnValue(taskSummaryFixture);
    mocks.cancelFlowByIdMock.mockResolvedValue({
      found: false,
      cancelled: false,
      reason: "missing",
    });
  });
  it("lists flow rows with task summary counts", async () => {
    mocks.listFlowRecordsMock.mockReturnValue([flowFixture]);
    await flowsListCommand({}, runtime);
    expect(runtimeLogs[0]).toContain("Flows: 1");
    expect(runtimeLogs[1]).toContain("Flow pressure: 1 active · 0 blocked · 1 total");
    expect(runtimeLogs.join("\n")).toContain("Process related PRs");
    expect(runtimeLogs.join("\n")).toContain("1 active/2 total");
  });
  it("shows one flow with linked tasks", async () => {
    mocks.resolveFlowForLookupTokenMock.mockReturnValue(flowFixture);
    mocks.listTasksForFlowIdMock.mockReturnValue([taskFixture]);
    await flowsShowCommand({ lookup: "flow-12345678" }, runtime);
    expect(runtimeLogs.join("\n")).toContain("shape: linear");
    expect(runtimeLogs.join("\n")).toContain("currentStep: wait_for");
    expect(runtimeLogs.join("\n")).toContain("waitingOnTaskId: task-12345678");
    expect(runtimeLogs.join("\n")).toContain("outputKeys: bucket");
    expect(runtimeLogs.join("\n")).toContain("tasks: 2 total · 1 active · 0 issues");
    expect(runtimeLogs.join("\n")).toContain("task-12345678 running run-12345678 Review PR");
  });
  it("cancels a flow and reports the updated state", async () => {
    mocks.resolveFlowForLookupTokenMock.mockReturnValue(flowFixture);
    mocks.cancelFlowByIdMock.mockResolvedValue({
      found: true,
      cancelled: true,
      flow: {
        ...flowFixture,
        status: "cancelled",
      },
    });
    // The command re-reads the registry after cancelling.
    mocks.getFlowByIdMock.mockReturnValue({
      ...flowFixture,
      status: "cancelled",
    });
    await flowsCancelCommand({ lookup: "flow-12345678" }, runtime);
    expect(mocks.loadConfigMock).toHaveBeenCalled();
    expect(mocks.cancelFlowByIdMock).toHaveBeenCalledWith({
      cfg: { loaded: true },
      flowId: "flow-12345678",
    });
    expect(runtimeLogs[0]).toContain("Cancelled flow-12345678 (linear) with status cancelled.");
    expect(runtimeErrors).toEqual([]);
  });
});

View File

@ -1,219 +0,0 @@
import { loadConfig } from "../config/config.js";
import { info } from "../globals.js";
import type { RuntimeEnv } from "../runtime.js";
import { getFlowById, listFlowRecords, resolveFlowForLookupToken } from "../tasks/flow-registry.js";
import type { FlowRecord, FlowStatus } from "../tasks/flow-registry.types.js";
import { cancelFlowById, getFlowTaskSummary } from "../tasks/task-executor.js";
import { listTasksForFlowId } from "../tasks/task-registry.js";
import { isRich, theme } from "../terminal/theme.js";
// Fixed column widths for the `flows list` table.
const ID_PAD = 10;
const STATUS_PAD = 10;
const SHAPE_PAD = 12;
/**
 * Clamp `value` to at most `maxChars` characters.
 *
 * When the string is shortened and there is room for a marker, the last
 * kept character is replaced with an ellipsis so readers can tell the
 * value was cut. (The original shortened branch built a template literal
 * around the slice but appended nothing — it silently dropped a character
 * for no benefit; the ellipsis restores the visible truncation marker.)
 */
function truncate(value: string, maxChars: number) {
  if (value.length <= maxChars) {
    return value;
  }
  if (maxChars <= 1) {
    // No room for a marker: hard cut.
    return value.slice(0, maxChars);
  }
  // Reserve one slot for the ellipsis so the result stays <= maxChars.
  return `${value.slice(0, maxChars - 1)}…`;
}
/** Normalize an optional id-like value into a display token, "n/a" when blank. */
function shortToken(value: string | undefined, maxChars = ID_PAD): string {
  const cleaned = value?.trim();
  return cleaned ? truncate(cleaned, maxChars) : "n/a";
}
/** Pad the status to column width; colorize by outcome when rich output is on. */
function formatFlowStatusCell(status: FlowStatus, rich: boolean) {
  const cell = status.padEnd(STATUS_PAD);
  if (!rich) {
    return cell;
  }
  switch (status) {
    case "succeeded":
      return theme.success(cell);
    case "failed":
    case "lost":
      return theme.error(cell);
    case "running":
      return theme.accentBright(cell);
    case "blocked":
      return theme.warn(cell);
    default:
      return theme.muted(cell);
  }
}
/** Build the `flows list` table: a header line plus one row per flow. */
function formatFlowRows(flows: FlowRecord[], rich: boolean) {
  const header = [
    "Flow".padEnd(ID_PAD),
    "Shape".padEnd(SHAPE_PAD),
    "Status".padEnd(STATUS_PAD),
    "Owner".padEnd(24),
    "Tasks".padEnd(14),
    "Goal",
  ].join(" ");
  const rows = flows.map((flow) => {
    // Per-flow task counts come from the executor's aggregate view.
    const summary = getFlowTaskSummary(flow.flowId);
    const taskCell = `${summary.active} active/${summary.total} total`;
    return [
      shortToken(flow.flowId).padEnd(ID_PAD),
      flow.shape.padEnd(SHAPE_PAD),
      formatFlowStatusCell(flow.status, rich),
      truncate(flow.ownerSessionKey, 24).padEnd(24),
      taskCell.padEnd(14),
      truncate(flow.goal, 80),
    ].join(" ");
  });
  return [rich ? theme.heading(header) : header, ...rows];
}
/** One-line pressure summary: active vs blocked vs total flow counts. */
function formatFlowListSummary(flows: FlowRecord[]) {
  let active = 0;
  let blocked = 0;
  for (const flow of flows) {
    if (flow.status === "queued" || flow.status === "running" || flow.status === "waiting") {
      active += 1;
    } else if (flow.status === "blocked") {
      blocked += 1;
    }
  }
  return `${active} active · ${blocked} blocked · ${flows.length} total`;
}
export async function flowsListCommand(
opts: { json?: boolean; status?: string },
runtime: RuntimeEnv,
) {
const statusFilter = opts.status?.trim();
const flows = listFlowRecords().filter((flow) => {
if (statusFilter && flow.status !== statusFilter) {
return false;
}
return true;
});
if (opts.json) {
runtime.log(
JSON.stringify(
{
count: flows.length,
status: statusFilter ?? null,
flows: flows.map((flow) => ({
...flow,
tasks: listTasksForFlowId(flow.flowId),
taskSummary: getFlowTaskSummary(flow.flowId),
})),
},
null,
2,
),
);
return;
}
runtime.log(info(`Flows: ${flows.length}`));
runtime.log(info(`Flow pressure: ${formatFlowListSummary(flows)}`));
if (statusFilter) {
runtime.log(info(`Status filter: ${statusFilter}`));
}
if (flows.length === 0) {
runtime.log("No flows found.");
return;
}
const rich = isRich();
for (const line of formatFlowRows(flows, rich)) {
runtime.log(line);
}
}
/**
 * `openclaw flows show <lookup>` — print one flow, resolved by flow id or
 * owner session key, together with its linked tasks.
 *
 * Exits with code 1 (via runtime.exit) when the lookup resolves nothing.
 */
export async function flowsShowCommand(
  opts: { json?: boolean; lookup: string },
  runtime: RuntimeEnv,
) {
  const flow = resolveFlowForLookupToken(opts.lookup);
  if (!flow) {
    runtime.error(`Flow not found: ${opts.lookup}`);
    runtime.exit(1);
    return;
  }
  const tasks = listTasksForFlowId(flow.flowId);
  const taskSummary = getFlowTaskSummary(flow.flowId);
  if (opts.json) {
    // JSON mode: the flow record with tasks and summary embedded.
    runtime.log(
      JSON.stringify(
        {
          ...flow,
          tasks,
          taskSummary,
        },
        null,
        2,
      ),
    );
    return;
  }
  // Text mode: one "key: value" line per field; absent optionals show "n/a".
  const lines = [
    "Flow:",
    `flowId: ${flow.flowId}`,
    `shape: ${flow.shape}`,
    `status: ${flow.status}`,
    `notify: ${flow.notifyPolicy}`,
    `ownerSessionKey: ${flow.ownerSessionKey}`,
    `goal: ${flow.goal}`,
    `currentStep: ${flow.currentStep ?? "n/a"}`,
    `waitingOnTaskId: ${flow.waitingOnTaskId ?? "n/a"}`,
    `outputKeys: ${
      flow.outputs ? Object.keys(flow.outputs).toSorted().join(", ") || "n/a" : "n/a"
    }`,
    `blockedTaskId: ${flow.blockedTaskId ?? "n/a"}`,
    `blockedSummary: ${flow.blockedSummary ?? "n/a"}`,
    `createdAt: ${new Date(flow.createdAt).toISOString()}`,
    `updatedAt: ${new Date(flow.updatedAt).toISOString()}`,
    `endedAt: ${flow.endedAt ? new Date(flow.endedAt).toISOString() : "n/a"}`,
    `tasks: ${taskSummary.total} total · ${taskSummary.active} active · ${taskSummary.failures} issues`,
  ];
  for (const line of lines) {
    runtime.log(line);
  }
  if (tasks.length === 0) {
    runtime.log("Linked tasks: none");
    return;
  }
  runtime.log("Linked tasks:");
  for (const task of tasks) {
    runtime.log(
      `- ${task.taskId} ${task.status} ${task.runId ?? "n/a"} ${task.label ?? task.task}`,
    );
  }
}
/**
 * `openclaw flows cancel <lookup>` — resolve a flow by id or owner session
 * key, ask the task executor to cancel it, then report the final state.
 * Exits with code 1 on unresolved lookup or failed cancellation.
 */
export async function flowsCancelCommand(opts: { lookup: string }, runtime: RuntimeEnv) {
  const flow = resolveFlowForLookupToken(opts.lookup);
  if (!flow) {
    runtime.error(`Flow not found: ${opts.lookup}`);
    runtime.exit(1);
    return;
  }
  const cfg = loadConfig();
  const result = await cancelFlowById({ cfg, flowId: flow.flowId });
  // Collapse the two failure modes into one error path.
  const failure = !result.found
    ? (result.reason ?? `Flow not found: ${opts.lookup}`)
    : !result.cancelled
      ? (result.reason ?? `Could not cancel flow: ${opts.lookup}`)
      : undefined;
  if (failure !== undefined) {
    runtime.error(failure);
    runtime.exit(1);
    return;
  }
  // Prefer the freshest registry copy; fall back to what cancel returned.
  const updated = getFlowById(flow.flowId) ?? result.flow ?? flow;
  runtime.log(`Cancelled ${updated.flowId} (${updated.shape}) with status ${updated.status}.`);
}

View File

@ -1,10 +0,0 @@
import path from "node:path";
import { resolveTaskStateDir } from "./task-registry.paths.js";
/** Directory (under the task state dir) that holds flow registry data. */
export function resolveFlowRegistryDir(env: NodeJS.ProcessEnv = process.env): string {
  return path.join(resolveTaskStateDir(env), "flows");
}
/** Full path of the flow registry's sqlite database file. */
export function resolveFlowRegistrySqlitePath(env: NodeJS.ProcessEnv = process.env): string {
  return path.join(resolveFlowRegistryDir(env), "registry.sqlite");
}

View File

@ -1,318 +0,0 @@
import { chmodSync, existsSync, mkdirSync } from "node:fs";
import type { DatabaseSync, StatementSync } from "node:sqlite";
import { requireNodeSqlite } from "../infra/node-sqlite.js";
import type { DeliveryContext } from "../utils/delivery-context.js";
import { resolveFlowRegistryDir, resolveFlowRegistrySqlitePath } from "./flow-registry.paths.js";
import type { FlowRegistryStoreSnapshot } from "./flow-registry.store.js";
import type { FlowOutputBag, FlowRecord, FlowShape } from "./flow-registry.types.js";
// Raw row shape as read from the flow_runs table. Columns are snake_case;
// integer columns may come back as number or bigint from node:sqlite.
type FlowRegistryRow = {
  flow_id: string;
  shape: FlowShape | null;
  owner_session_key: string;
  requester_origin_json: string | null;
  status: FlowRecord["status"];
  notify_policy: FlowRecord["notifyPolicy"];
  goal: string;
  current_step: string | null;
  waiting_on_task_id: string | null;
  outputs_json: string | null;
  blocked_task_id: string | null;
  blocked_summary: string | null;
  created_at: number | bigint;
  updated_at: number | bigint;
  ended_at: number | bigint | null;
};
// Prepared statements reused across all registry operations.
type FlowRegistryStatements = {
  selectAll: StatementSync;
  upsertRow: StatementSync;
  deleteRow: StatementSync;
  clearRows: StatementSync;
};
// A cached open database handle, keyed by its resolved file path.
type FlowRegistryDatabase = {
  db: DatabaseSync;
  path: string;
  statements: FlowRegistryStatements;
};
// Module-level cache; replaced when the resolved registry path changes.
let cachedDatabase: FlowRegistryDatabase | null = null;
// Owner-only permissions for the registry dir and sqlite files.
const FLOW_REGISTRY_DIR_MODE = 0o700;
const FLOW_REGISTRY_FILE_MODE = 0o600;
// "" is the database itself; -shm/-wal are WAL-mode sidecar files.
const FLOW_REGISTRY_SIDECAR_SUFFIXES = ["", "-shm", "-wal"] as const;
/** Coerce a sqlite numeric cell (number or bigint) to number; anything else → undefined. */
function normalizeNumber(value: number | bigint | null): number | undefined {
  switch (typeof value) {
    case "bigint":
      return Number(value);
    case "number":
      return value;
    default:
      return undefined;
  }
}
/** JSON-encode a value for storage; null/undefined persist as SQL NULL. */
function serializeJson(value: unknown): string | null {
  if (value == null) {
    return null;
  }
  return JSON.stringify(value);
}
/** Parse a stored JSON column; blank or malformed content yields undefined. */
function parseJsonValue<T>(raw: string | null): T | undefined {
  if (!raw?.trim()) {
    return undefined;
  }
  let parsed: T | undefined;
  try {
    parsed = JSON.parse(raw) as T;
  } catch {
    parsed = undefined;
  }
  return parsed;
}
/**
 * Map a raw flow_runs row into the in-memory FlowRecord shape.
 * Optional fields are spread in conditionally so absent columns are
 * omitted entirely rather than set to null/undefined.
 */
function rowToFlowRecord(row: FlowRegistryRow): FlowRecord {
  const endedAt = normalizeNumber(row.ended_at);
  const requesterOrigin = parseJsonValue<DeliveryContext>(row.requester_origin_json);
  const outputs = parseJsonValue<FlowOutputBag>(row.outputs_json);
  return {
    flowId: row.flow_id,
    // Legacy rows may have a NULL shape; anything but "linear" maps to single_task.
    shape: row.shape === "linear" ? "linear" : "single_task",
    ownerSessionKey: row.owner_session_key,
    ...(requesterOrigin ? { requesterOrigin } : {}),
    status: row.status,
    notifyPolicy: row.notify_policy,
    goal: row.goal,
    ...(row.current_step ? { currentStep: row.current_step } : {}),
    ...(row.waiting_on_task_id ? { waitingOnTaskId: row.waiting_on_task_id } : {}),
    ...(outputs ? { outputs } : {}),
    ...(row.blocked_task_id ? { blockedTaskId: row.blocked_task_id } : {}),
    ...(row.blocked_summary ? { blockedSummary: row.blocked_summary } : {}),
    createdAt: normalizeNumber(row.created_at) ?? 0,
    updatedAt: normalizeNumber(row.updated_at) ?? 0,
    // endedAt of 0 would be falsy, so compare against null explicitly.
    ...(endedAt != null ? { endedAt } : {}),
  };
}
/** Flatten a FlowRecord into the named-parameter bag the upsert statement binds. */
function bindFlowRecord(record: FlowRecord) {
  return {
    flow_id: record.flowId,
    shape: record.shape,
    owner_session_key: record.ownerSessionKey,
    requester_origin_json: serializeJson(record.requesterOrigin),
    status: record.status,
    notify_policy: record.notifyPolicy,
    goal: record.goal,
    current_step: record.currentStep ?? null,
    waiting_on_task_id: record.waitingOnTaskId ?? null,
    outputs_json: serializeJson(record.outputs),
    blocked_task_id: record.blockedTaskId ?? null,
    blocked_summary: record.blockedSummary ?? null,
    created_at: record.createdAt,
    updated_at: record.updatedAt,
    ended_at: record.endedAt ?? null,
  };
}
/** Prepare the fixed statement set used by all flow-registry operations. */
function createStatements(db: DatabaseSync): FlowRegistryStatements {
  return {
    // Full-table read in stable (created_at, flow_id) order.
    selectAll: db.prepare(`
      SELECT
        flow_id,
        shape,
        owner_session_key,
        requester_origin_json,
        status,
        notify_policy,
        goal,
        current_step,
        waiting_on_task_id,
        outputs_json,
        blocked_task_id,
        blocked_summary,
        created_at,
        updated_at,
        ended_at
      FROM flow_runs
      ORDER BY created_at ASC, flow_id ASC
    `),
    // Insert-or-update keyed on flow_id; every column is refreshed on conflict.
    upsertRow: db.prepare(`
      INSERT INTO flow_runs (
        flow_id,
        shape,
        owner_session_key,
        requester_origin_json,
        status,
        notify_policy,
        goal,
        current_step,
        waiting_on_task_id,
        outputs_json,
        blocked_task_id,
        blocked_summary,
        created_at,
        updated_at,
        ended_at
      ) VALUES (
        @flow_id,
        @shape,
        @owner_session_key,
        @requester_origin_json,
        @status,
        @notify_policy,
        @goal,
        @current_step,
        @waiting_on_task_id,
        @outputs_json,
        @blocked_task_id,
        @blocked_summary,
        @created_at,
        @updated_at,
        @ended_at
      )
      ON CONFLICT(flow_id) DO UPDATE SET
        shape = excluded.shape,
        owner_session_key = excluded.owner_session_key,
        requester_origin_json = excluded.requester_origin_json,
        status = excluded.status,
        notify_policy = excluded.notify_policy,
        goal = excluded.goal,
        current_step = excluded.current_step,
        waiting_on_task_id = excluded.waiting_on_task_id,
        outputs_json = excluded.outputs_json,
        blocked_task_id = excluded.blocked_task_id,
        blocked_summary = excluded.blocked_summary,
        created_at = excluded.created_at,
        updated_at = excluded.updated_at,
        ended_at = excluded.ended_at
    `),
    deleteRow: db.prepare(`DELETE FROM flow_runs WHERE flow_id = ?`),
    clearRows: db.prepare(`DELETE FROM flow_runs`),
  };
}
/**
 * Create the flow_runs table and its indexes if absent, then backfill
 * columns that were added after the table first shipped so older
 * databases keep working without a separate migration step.
 */
function ensureSchema(db: DatabaseSync) {
  db.exec(`
    CREATE TABLE IF NOT EXISTS flow_runs (
      flow_id TEXT PRIMARY KEY,
      shape TEXT NOT NULL,
      owner_session_key TEXT NOT NULL,
      requester_origin_json TEXT,
      status TEXT NOT NULL,
      notify_policy TEXT NOT NULL,
      goal TEXT NOT NULL,
      current_step TEXT,
      waiting_on_task_id TEXT,
      outputs_json TEXT,
      blocked_task_id TEXT,
      blocked_summary TEXT,
      created_at INTEGER NOT NULL,
      updated_at INTEGER NOT NULL,
      ended_at INTEGER
    );
  `);
  // Later-added columns, backfilled as nullable TEXT on pre-existing tables.
  ensureColumn(db, "flow_runs", "shape", "TEXT");
  ensureColumn(db, "flow_runs", "waiting_on_task_id", "TEXT");
  ensureColumn(db, "flow_runs", "outputs_json", "TEXT");
  ensureColumn(db, "flow_runs", "blocked_task_id", "TEXT");
  ensureColumn(db, "flow_runs", "blocked_summary", "TEXT");
  db.exec(`CREATE INDEX IF NOT EXISTS idx_flow_runs_status ON flow_runs(status);`);
  db.exec(
    `CREATE INDEX IF NOT EXISTS idx_flow_runs_owner_session_key ON flow_runs(owner_session_key);`,
  );
  db.exec(`CREATE INDEX IF NOT EXISTS idx_flow_runs_updated_at ON flow_runs(updated_at);`);
}
/**
 * Add `columnName` to `tableName` unless it already exists.
 * Identifiers are interpolated directly (sqlite cannot bind table/column
 * names), so callers must only pass trusted, code-defined identifiers.
 */
function ensureColumn(
  db: DatabaseSync,
  tableName: string,
  columnName: string,
  columnDefinition: string,
) {
  const columns = db.prepare(`PRAGMA table_info(${tableName})`).all() as Array<{ name?: string }>;
  const present = columns.some((column) => column.name === columnName);
  if (!present) {
    db.exec(`ALTER TABLE ${tableName} ADD COLUMN ${columnName} ${columnDefinition};`);
  }
}
/**
 * Ensure the registry directory and the sqlite file (plus its -shm/-wal
 * sidecars, when present) carry owner-only permissions.
 */
function ensureFlowRegistryPermissions(pathname: string) {
  const dir = resolveFlowRegistryDir(process.env);
  mkdirSync(dir, { recursive: true, mode: FLOW_REGISTRY_DIR_MODE });
  // mkdirSync's mode is ignored for a pre-existing dir, so chmod explicitly.
  chmodSync(dir, FLOW_REGISTRY_DIR_MODE);
  for (const suffix of FLOW_REGISTRY_SIDECAR_SUFFIXES) {
    const target = `${pathname}${suffix}`;
    if (existsSync(target)) {
      chmodSync(target, FLOW_REGISTRY_FILE_MODE);
    }
  }
}
/**
 * Open (or reuse) the sqlite database at the currently-resolved registry
 * path. The handle is cached per path; if the resolved path changes (e.g.
 * OPENCLAW_STATE_DIR is swapped in tests) the stale handle is closed and a
 * fresh one opened.
 */
function openFlowRegistryDatabase(): FlowRegistryDatabase {
  const pathname = resolveFlowRegistrySqlitePath(process.env);
  if (cachedDatabase && cachedDatabase.path === pathname) {
    return cachedDatabase;
  }
  if (cachedDatabase) {
    // Path changed: release the old handle before reopening.
    cachedDatabase.db.close();
    cachedDatabase = null;
  }
  ensureFlowRegistryPermissions(pathname);
  const { DatabaseSync } = requireNodeSqlite();
  const db = new DatabaseSync(pathname);
  db.exec(`PRAGMA journal_mode = WAL;`);
  db.exec(`PRAGMA synchronous = NORMAL;`);
  db.exec(`PRAGMA busy_timeout = 5000;`);
  ensureSchema(db);
  // Re-assert modes after opening: WAL mode may have created sidecar files.
  ensureFlowRegistryPermissions(pathname);
  cachedDatabase = {
    db,
    path: pathname,
    statements: createStatements(db),
  };
  return cachedDatabase;
}
/** Run `write` inside BEGIN IMMEDIATE … COMMIT, rolling back on any error. */
function withWriteTransaction(write: (statements: FlowRegistryStatements) => void) {
  const store = openFlowRegistryDatabase();
  store.db.exec("BEGIN IMMEDIATE");
  try {
    write(store.statements);
    store.db.exec("COMMIT");
    // Committing can create WAL sidecars; re-assert restrictive modes.
    ensureFlowRegistryPermissions(store.path);
  } catch (error) {
    store.db.exec("ROLLBACK");
    throw error;
  }
}
/** Read every persisted flow into an in-memory snapshot keyed by flow id. */
export function loadFlowRegistryStateFromSqlite(): FlowRegistryStoreSnapshot {
  const { statements } = openFlowRegistryDatabase();
  const rows = statements.selectAll.all() as FlowRegistryRow[];
  const flows = new Map(rows.map((row) => [row.flow_id, rowToFlowRecord(row)]));
  return { flows };
}
/** Replace the persisted state wholesale with `snapshot`, atomically. */
export function saveFlowRegistryStateToSqlite(snapshot: FlowRegistryStoreSnapshot) {
  withWriteTransaction((statements) => {
    // Clear-then-rewrite inside one transaction keeps readers consistent.
    statements.clearRows.run();
    for (const record of snapshot.flows.values()) {
      statements.upsertRow.run(bindFlowRecord(record));
    }
  });
}
/** Insert or update a single flow row, then re-assert file permissions. */
export function upsertFlowRegistryRecordToSqlite(flow: FlowRecord) {
  const { statements, path: dbPath } = openFlowRegistryDatabase();
  statements.upsertRow.run(bindFlowRecord(flow));
  ensureFlowRegistryPermissions(dbPath);
}
/** Delete one flow row by id, then re-assert file permissions. */
export function deleteFlowRegistryRecordFromSqlite(flowId: string) {
  const { statements, path: dbPath } = openFlowRegistryDatabase();
  statements.deleteRow.run(flowId);
  ensureFlowRegistryPermissions(dbPath);
}
/** Close the cached sqlite handle, if any, so the store file can be removed. */
export function closeFlowRegistrySqliteStore() {
  const cached = cachedDatabase;
  if (cached) {
    cached.db.close();
    cachedDatabase = null;
  }
}

View File

@ -1,148 +0,0 @@
import { statSync } from "node:fs";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { withTempDir } from "../test-helpers/temp-dir.js";
import { createFlowRecord, getFlowById, resetFlowRegistryForTests } from "./flow-registry.js";
import { resolveFlowRegistryDir, resolveFlowRegistrySqlitePath } from "./flow-registry.paths.js";
import { configureFlowRegistryRuntime } from "./flow-registry.store.js";
import type { FlowRecord } from "./flow-registry.types.js";
/** Fixture: a fully-populated blocked flow, as the store would restore it. */
function createStoredFlow(): FlowRecord {
  const timestamps = { createdAt: 100, updatedAt: 100, endedAt: 120 };
  return {
    flowId: "flow-restored",
    shape: "linear",
    ownerSessionKey: "agent:main:main",
    status: "blocked",
    notifyPolicy: "done_only",
    goal: "Restored flow",
    currentStep: "spawn_task",
    waitingOnTaskId: "task-waiting",
    outputs: { bucket: ["business"] },
    blockedTaskId: "task-restored",
    blockedSummary: "Writable session required.",
    ...timestamps,
  };
}
/**
 * Run `run` with OPENCLAW_STATE_DIR pointed at a fresh temp dir and the
 * flow registry reset before and after the callback.
 */
async function withFlowRegistryTempDir<T>(run: (root: string) => Promise<T>): Promise<T> {
  const body = async (root: string): Promise<T> => {
    process.env.OPENCLAW_STATE_DIR = root;
    resetFlowRegistryForTests();
    try {
      return await run(root);
    } finally {
      // Close the sqlite-backed registry before Windows temp-dir cleanup removes the store root.
      resetFlowRegistryForTests();
    }
  };
  return await withTempDir({ prefix: "openclaw-flow-store-" }, body);
}
// Covers the pluggable flow-registry store runtime: injecting a custom store,
// restoring from the default sqlite store, and on-disk permission hardening.
describe("flow-registry store runtime", () => {
  beforeEach(() => {
    vi.useRealTimers();
  });
  afterEach(() => {
    vi.useRealTimers();
    delete process.env.OPENCLAW_STATE_DIR;
    resetFlowRegistryForTests();
  });
  // An injected store must serve the lazy initial restore exactly once and
  // receive a full snapshot (restored + new records) after a mutation.
  it("uses the configured flow store for restore and save", () => {
    const storedFlow = createStoredFlow();
    const loadSnapshot = vi.fn(() => ({
      flows: new Map([[storedFlow.flowId, storedFlow]]),
    }));
    const saveSnapshot = vi.fn();
    configureFlowRegistryRuntime({
      store: {
        loadSnapshot,
        saveSnapshot,
      },
    });
    expect(getFlowById("flow-restored")).toMatchObject({
      flowId: "flow-restored",
      shape: "linear",
      goal: "Restored flow",
      waitingOnTaskId: "task-waiting",
      outputs: {
        bucket: ["business"],
      },
      blockedTaskId: "task-restored",
      blockedSummary: "Writable session required.",
    });
    expect(loadSnapshot).toHaveBeenCalledTimes(1);
    createFlowRecord({
      ownerSessionKey: "agent:main:main",
      goal: "New flow",
      status: "running",
      currentStep: "wait_for",
    });
    expect(saveSnapshot).toHaveBeenCalled();
    const latestSnapshot = saveSnapshot.mock.calls.at(-1)?.[0] as {
      flows: ReadonlyMap<string, FlowRecord>;
    };
    expect(latestSnapshot.flows.size).toBe(2);
    expect(latestSnapshot.flows.get("flow-restored")?.goal).toBe("Restored flow");
  });
  // Records written through the default sqlite store must survive an
  // in-memory reset ({ persist: false } avoids rewriting the db on teardown).
  it("restores persisted flows from the default sqlite store", async () => {
    await withFlowRegistryTempDir(async (root) => {
      process.env.OPENCLAW_STATE_DIR = root;
      resetFlowRegistryForTests();
      const created = createFlowRecord({
        ownerSessionKey: "agent:main:main",
        goal: "Persisted flow",
        status: "waiting",
        currentStep: "ask_user",
        waitingOnTaskId: "task-restored",
        outputs: {
          bucket: ["personal"],
        },
      });
      resetFlowRegistryForTests({ persist: false });
      expect(getFlowById(created.flowId)).toMatchObject({
        flowId: created.flowId,
        shape: "linear",
        status: "waiting",
        currentStep: "ask_user",
        waitingOnTaskId: "task-restored",
        outputs: {
          bucket: ["personal"],
        },
      });
    });
  });
  // POSIX-only: registry dir must be 0700 and the sqlite file 0600.
  it("hardens the sqlite flow store directory and file modes", async () => {
    if (process.platform === "win32") {
      return;
    }
    await withFlowRegistryTempDir(async (root) => {
      process.env.OPENCLAW_STATE_DIR = root;
      resetFlowRegistryForTests();
      createFlowRecord({
        ownerSessionKey: "agent:main:main",
        goal: "Secured flow",
        status: "blocked",
        blockedTaskId: "task-secured",
        blockedSummary: "Need auth.",
      });
      const registryDir = resolveFlowRegistryDir(process.env);
      const sqlitePath = resolveFlowRegistrySqlitePath(process.env);
      expect(statSync(registryDir).mode & 0o777).toBe(0o700);
      expect(statSync(sqlitePath).mode & 0o777).toBe(0o600);
    });
  });
});

View File

@ -1,45 +0,0 @@
import {
closeFlowRegistrySqliteStore,
deleteFlowRegistryRecordFromSqlite,
loadFlowRegistryStateFromSqlite,
saveFlowRegistryStateToSqlite,
upsertFlowRegistryRecordToSqlite,
} from "./flow-registry.store.sqlite.js";
import type { FlowRecord } from "./flow-registry.types.js";
/** Full in-memory snapshot of every persisted flow, keyed by flow id. */
export type FlowRegistryStoreSnapshot = {
  flows: Map<string, FlowRecord>;
};
/**
 * Pluggable persistence backend for the flow registry.
 * `upsertFlow`/`deleteFlow` are optional single-record fast paths; callers
 * fall back to rewriting the whole snapshot via `saveSnapshot` when absent.
 * `close` releases any underlying resources (e.g. an open sqlite handle).
 */
export type FlowRegistryStore = {
  loadSnapshot: () => FlowRegistryStoreSnapshot;
  saveSnapshot: (snapshot: FlowRegistryStoreSnapshot) => void;
  upsertFlow?: (flow: FlowRecord) => void;
  deleteFlow?: (flowId: string) => void;
  close?: () => void;
};
// Default backend: the sqlite-based store, wired to all five operations.
const defaultFlowRegistryStore: FlowRegistryStore = {
  loadSnapshot: loadFlowRegistryStateFromSqlite,
  saveSnapshot: saveFlowRegistryStateToSqlite,
  upsertFlow: upsertFlowRegistryRecordToSqlite,
  deleteFlow: deleteFlowRegistryRecordFromSqlite,
  close: closeFlowRegistrySqliteStore,
};
// Currently active store; tests/runtimes may swap it via configureFlowRegistryRuntime.
let configuredFlowRegistryStore: FlowRegistryStore = defaultFlowRegistryStore;
/** Return the currently configured flow store (sqlite-backed by default). */
export function getFlowRegistryStore(): FlowRegistryStore {
  return configuredFlowRegistryStore;
}
/** Swap in a custom store implementation (no-op when `store` is omitted). */
export function configureFlowRegistryRuntime(params: { store?: FlowRegistryStore }) {
  const { store } = params;
  if (store) {
    configuredFlowRegistryStore = store;
  }
}
/** Test hook: close the active store and restore the sqlite-backed default. */
export function resetFlowRegistryRuntimeForTests() {
  const active = configuredFlowRegistryStore;
  active.close?.();
  configuredFlowRegistryStore = defaultFlowRegistryStore;
}

View File

@ -1,256 +0,0 @@
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { withTempDir } from "../test-helpers/temp-dir.js";
import {
createFlowRecord,
deleteFlowRecordById,
getFlowById,
listFlowRecords,
resetFlowRegistryForTests,
syncFlowFromTask,
updateFlowRecordById,
} from "./flow-registry.js";
const ORIGINAL_STATE_DIR = process.env.OPENCLAW_STATE_DIR;
/**
 * Run `run` against a fresh temp OPENCLAW_STATE_DIR with the flow registry
 * reset on entry and on exit.
 */
async function withFlowRegistryTempDir<T>(run: (root: string) => Promise<T>): Promise<T> {
  const body = async (root: string): Promise<T> => {
    process.env.OPENCLAW_STATE_DIR = root;
    resetFlowRegistryForTests();
    try {
      return await run(root);
    } finally {
      // Close the sqlite-backed registry before Windows temp-dir cleanup removes the store root.
      resetFlowRegistryForTests();
    }
  };
  return await withTempDir({ prefix: "openclaw-flow-registry-" }, body);
}
// CRUD and task-sync behavior of the flow registry against the sqlite store.
describe("flow-registry", () => {
  beforeEach(() => {
    vi.useRealTimers();
  });
  afterEach(() => {
    vi.useRealTimers();
    // Restore the original state dir so later suites see the real environment.
    if (ORIGINAL_STATE_DIR === undefined) {
      delete process.env.OPENCLAW_STATE_DIR;
    } else {
      process.env.OPENCLAW_STATE_DIR = ORIGINAL_STATE_DIR;
    }
    resetFlowRegistryForTests();
  });
  // Round-trips the full CRUD surface against a single record.
  it("creates, updates, lists, and deletes flow records", async () => {
    await withFlowRegistryTempDir(async (root) => {
      process.env.OPENCLAW_STATE_DIR = root;
      resetFlowRegistryForTests();
      const created = createFlowRecord({
        ownerSessionKey: "agent:main:main",
        goal: "Investigate flaky test",
        status: "running",
        currentStep: "spawn_task",
      });
      expect(getFlowById(created.flowId)).toMatchObject({
        flowId: created.flowId,
        status: "running",
        currentStep: "spawn_task",
      });
      const updated = updateFlowRecordById(created.flowId, {
        status: "waiting",
        currentStep: "ask_user",
        waitingOnTaskId: "task-123",
        outputs: {
          bucket: ["personal"],
        },
      });
      expect(updated).toMatchObject({
        flowId: created.flowId,
        status: "waiting",
        currentStep: "ask_user",
        waitingOnTaskId: "task-123",
        outputs: {
          bucket: ["personal"],
        },
      });
      expect(listFlowRecords()).toEqual([
        expect.objectContaining({
          flowId: created.flowId,
          goal: "Investigate flaky test",
          status: "waiting",
        }),
      ]);
      expect(deleteFlowRecordById(created.flowId)).toBe(true);
      expect(getFlowById(created.flowId)).toBeUndefined();
      expect(listFlowRecords()).toEqual([]);
    });
  });
  // listFlowRecords orders by createdAt descending.
  it("lists newest flows first", async () => {
    await withFlowRegistryTempDir(async (root) => {
      process.env.OPENCLAW_STATE_DIR = root;
      resetFlowRegistryForTests();
      const earlier = createFlowRecord({
        ownerSessionKey: "agent:main:main",
        goal: "First flow",
        createdAt: 100,
        updatedAt: 100,
      });
      const later = createFlowRecord({
        ownerSessionKey: "agent:main:main",
        goal: "Second flow",
        createdAt: 200,
        updatedAt: 200,
      });
      expect(listFlowRecords().map((flow) => flow.flowId)).toEqual([later.flowId, earlier.flowId]);
    });
  });
  // Defaults: linear shape, queued status, done_only notify policy; optional
  // fields stay unset.
  it("applies minimal defaults for new flow records", async () => {
    await withFlowRegistryTempDir(async (root) => {
      process.env.OPENCLAW_STATE_DIR = root;
      resetFlowRegistryForTests();
      const created = createFlowRecord({
        ownerSessionKey: "agent:main:main",
        goal: "Background job",
      });
      expect(created).toMatchObject({
        flowId: expect.any(String),
        shape: "linear",
        ownerSessionKey: "agent:main:main",
        goal: "Background job",
        status: "queued",
        notifyPolicy: "done_only",
      });
      expect(created.currentStep).toBeUndefined();
      expect(created.endedAt).toBeUndefined();
    });
  });
  // A patch that omits endedAt must not clear a previously-set value.
  it("preserves endedAt when later updates change other flow fields", async () => {
    await withFlowRegistryTempDir(async (root) => {
      process.env.OPENCLAW_STATE_DIR = root;
      resetFlowRegistryForTests();
      const created = createFlowRecord({
        ownerSessionKey: "agent:main:main",
        goal: "Finish a task",
        status: "succeeded",
        endedAt: 456,
      });
      const updated = updateFlowRecordById(created.flowId, {
        currentStep: "finish",
      });
      expect(updated).toMatchObject({
        flowId: created.flowId,
        currentStep: "finish",
        endedAt: 456,
      });
      expect(getFlowById(created.flowId)).toMatchObject({
        flowId: created.flowId,
        endedAt: 456,
      });
    });
  });
  // single_task flows mirror child-task state: a blocked terminal task sets
  // blocked metadata; a later running task clears it and the endedAt stamp.
  it("stores blocked metadata and clears it when a later task resumes the same flow", async () => {
    await withFlowRegistryTempDir(async (root) => {
      process.env.OPENCLAW_STATE_DIR = root;
      resetFlowRegistryForTests();
      const created = createFlowRecord({
        shape: "single_task",
        ownerSessionKey: "agent:main:main",
        goal: "Fix permissions",
        status: "running",
      });
      const blocked = syncFlowFromTask({
        taskId: "task-blocked",
        parentFlowId: created.flowId,
        status: "succeeded",
        terminalOutcome: "blocked",
        notifyPolicy: "done_only",
        label: "Fix permissions",
        task: "Fix permissions",
        lastEventAt: 200,
        endedAt: 200,
        terminalSummary: "Writable session required.",
      });
      expect(blocked).toMatchObject({
        flowId: created.flowId,
        status: "blocked",
        blockedTaskId: "task-blocked",
        blockedSummary: "Writable session required.",
        endedAt: 200,
      });
      const resumed = syncFlowFromTask({
        taskId: "task-retry",
        parentFlowId: created.flowId,
        status: "running",
        notifyPolicy: "done_only",
        label: "Fix permissions",
        task: "Fix permissions",
        lastEventAt: 260,
        progressSummary: "Retrying with writable session",
      });
      expect(resumed).toMatchObject({
        flowId: created.flowId,
        status: "running",
      });
      expect(resumed?.blockedTaskId).toBeUndefined();
      expect(resumed?.blockedSummary).toBeUndefined();
      expect(resumed?.endedAt).toBeUndefined();
    });
  });
  // linear flows are driven explicitly by flow-runtime; child-task events
  // must not overwrite their status.
  it("does not auto-sync linear flow state from linked child tasks", async () => {
    await withFlowRegistryTempDir(async (root) => {
      process.env.OPENCLAW_STATE_DIR = root;
      resetFlowRegistryForTests();
      const created = createFlowRecord({
        ownerSessionKey: "agent:main:main",
        goal: "Cluster PRs",
        status: "waiting",
        currentStep: "wait_for",
      });
      const synced = syncFlowFromTask({
        taskId: "task-child",
        parentFlowId: created.flowId,
        status: "running",
        notifyPolicy: "done_only",
        label: "Child task",
        task: "Child task",
        lastEventAt: 250,
        progressSummary: "Running child task",
      });
      expect(synced).toMatchObject({
        flowId: created.flowId,
        shape: "linear",
        status: "waiting",
        currentStep: "wait_for",
      });
      expect(getFlowById(created.flowId)).toMatchObject({
        flowId: created.flowId,
        status: "waiting",
        currentStep: "wait_for",
      });
    });
  });
});

View File

@ -1,349 +0,0 @@
import crypto from "node:crypto";
import { getFlowRegistryStore, resetFlowRegistryRuntimeForTests } from "./flow-registry.store.js";
import type { FlowOutputBag, FlowRecord, FlowShape, FlowStatus } from "./flow-registry.types.js";
import type { TaskNotifyPolicy, TaskRecord } from "./task-registry.types.js";
// Authoritative in-memory flow table; hydrated lazily from the configured store.
const flows = new Map<string, FlowRecord>();
// Guards the one-time restore performed by ensureFlowRegistryReady().
let restoreAttempted = false;
/** Deep-copy a JSON-safe output bag; falsy input passes through as undefined. */
function cloneFlowOutputs(outputs: FlowOutputBag | undefined): FlowOutputBag | undefined {
  return outputs ? (JSON.parse(JSON.stringify(outputs)) as FlowOutputBag) : undefined;
}
/**
 * Defensive copy of a record: shallow spread plus deep copies of the nested
 * requesterOrigin and outputs, so callers cannot mutate registry state.
 */
function cloneFlowRecord(record: FlowRecord): FlowRecord {
  const copy: FlowRecord = { ...record };
  if (record.requesterOrigin) {
    copy.requesterOrigin = { ...record.requesterOrigin };
  }
  if (record.outputs) {
    copy.outputs = cloneFlowOutputs(record.outputs);
  }
  return copy;
}
/** Materialize defensive copies of every record in `source`. */
function snapshotFlowRecords(source: ReadonlyMap<string, FlowRecord>): FlowRecord[] {
  const copies: FlowRecord[] = [];
  for (const record of source.values()) {
    copies.push(cloneFlowRecord(record));
  }
  return copies;
}
/** Default the notify policy to "done_only" when unset. */
function ensureNotifyPolicy(notifyPolicy?: TaskNotifyPolicy): TaskNotifyPolicy {
  if (notifyPolicy == null) {
    return "done_only";
  }
  return notifyPolicy;
}
/** Default the flow shape to "linear" when unset. */
function ensureFlowShape(shape?: FlowShape): FlowShape {
  if (shape == null) {
    return "linear";
  }
  return shape;
}
/** Goal text: trimmed label, then trimmed task text, then a generic fallback. */
function resolveFlowGoal(task: Pick<TaskRecord, "label" | "task">): string {
  const label = task.label?.trim();
  if (label) {
    return label;
  }
  const text = task.task.trim();
  return text ? text : "Background task";
}
/**
 * Summary for a blocked outcome: only defined when the task succeeded with a
 * "blocked" terminal outcome, preferring the terminal summary over the
 * progress summary; blank strings collapse to undefined.
 */
function resolveFlowBlockedSummary(
  task: Pick<TaskRecord, "status" | "terminalOutcome" | "terminalSummary" | "progressSummary">,
): string | undefined {
  const isBlocked = task.status === "succeeded" && task.terminalOutcome === "blocked";
  if (!isBlocked) {
    return undefined;
  }
  const summary = task.terminalSummary?.trim() || task.progressSummary?.trim();
  return summary || undefined;
}
/**
 * Partial update applied to an existing flow by updateFlowRecordById.
 * For the nullable fields, `undefined` means "leave unchanged" while `null`
 * means "clear the field"; string values are trimmed before storage.
 */
type FlowRecordPatch = {
  status?: FlowStatus;
  notifyPolicy?: TaskNotifyPolicy;
  goal?: string;
  currentStep?: string | null;
  waitingOnTaskId?: string | null;
  outputs?: FlowOutputBag | null;
  blockedTaskId?: string | null;
  blockedSummary?: string | null;
  updatedAt?: number;
  endedAt?: number | null;
};
/**
 * Map a task's status (plus terminal outcome) onto the flow status space.
 * A succeeded task whose terminal outcome is "blocked" surfaces as a blocked
 * flow; any status not handled explicitly is treated as a failure.
 */
export function deriveFlowStatusFromTask(
  task: Pick<TaskRecord, "status" | "terminalOutcome">,
): FlowStatus {
  switch (task.status) {
    case "queued":
      return "queued";
    case "running":
      return "running";
    case "succeeded":
      return task.terminalOutcome === "blocked" ? "blocked" : "succeeded";
    case "cancelled":
      return "cancelled";
    case "lost":
      return "lost";
    default:
      return "failed";
  }
}
/**
 * Hydrate the in-memory table from the configured store exactly once per
 * process (or per test reset). Restored records are cloned so store-owned
 * objects cannot alias registry state.
 */
function ensureFlowRegistryReady() {
  if (restoreAttempted) {
    return;
  }
  restoreAttempted = true;
  const { flows: restored } = getFlowRegistryStore().loadSnapshot();
  flows.clear();
  restored.forEach((flow, flowId) => {
    flows.set(flowId, cloneFlowRecord(flow));
  });
}
/** Push a full defensive-copy snapshot of the table into the store. */
function persistFlowRegistry() {
  const entries = snapshotFlowRecords(flows).map((flow) => [flow.flowId, flow] as const);
  getFlowRegistryStore().saveSnapshot({ flows: new Map(entries) });
}
/** Persist one record via the store's upsert fast path, else rewrite everything. */
function persistFlowUpsert(flow: FlowRecord) {
  const store = getFlowRegistryStore();
  if (!store.upsertFlow) {
    persistFlowRegistry();
    return;
  }
  store.upsertFlow(cloneFlowRecord(flow));
}
/** Delete one record via the store's delete fast path, else rewrite everything. */
function persistFlowDelete(flowId: string) {
  const store = getFlowRegistryStore();
  if (!store.deleteFlow) {
    persistFlowRegistry();
    return;
  }
  store.deleteFlow(flowId);
}
/**
 * Create and persist a new flow record.
 *
 * Optional strings are trimmed with blanks normalized to undefined.
 * `createdAt` defaults to now and also seeds `updatedAt` when that is unset;
 * `requesterOrigin` and `endedAt` keys are only present when provided.
 * Returns a defensive copy of the stored record.
 */
export function createFlowRecord(params: {
  shape?: FlowShape;
  ownerSessionKey: string;
  requesterOrigin?: FlowRecord["requesterOrigin"];
  status?: FlowStatus;
  notifyPolicy?: TaskNotifyPolicy;
  goal: string;
  currentStep?: string;
  waitingOnTaskId?: string;
  outputs?: FlowOutputBag;
  blockedTaskId?: string;
  blockedSummary?: string;
  createdAt?: number;
  updatedAt?: number;
  endedAt?: number;
}): FlowRecord {
  ensureFlowRegistryReady();
  const createdAt = params.createdAt ?? Date.now();
  const record: FlowRecord = {
    flowId: crypto.randomUUID(),
    shape: ensureFlowShape(params.shape),
    ownerSessionKey: params.ownerSessionKey,
    status: params.status ?? "queued",
    notifyPolicy: ensureNotifyPolicy(params.notifyPolicy),
    goal: params.goal,
    currentStep: params.currentStep?.trim() || undefined,
    waitingOnTaskId: params.waitingOnTaskId?.trim() || undefined,
    outputs: cloneFlowOutputs(params.outputs),
    blockedTaskId: params.blockedTaskId?.trim() || undefined,
    blockedSummary: params.blockedSummary?.trim() || undefined,
    createdAt,
    updatedAt: params.updatedAt ?? createdAt,
  };
  if (params.requesterOrigin) {
    record.requesterOrigin = { ...params.requesterOrigin };
  }
  if (params.endedAt !== undefined) {
    record.endedAt = params.endedAt;
  }
  flows.set(record.flowId, record);
  persistFlowUpsert(record);
  return cloneFlowRecord(record);
}
export function createFlowForTask(params: {
task: Pick<
TaskRecord,
| "requesterSessionKey"
| "taskId"
| "notifyPolicy"
| "status"
| "terminalOutcome"
| "label"
| "task"
| "createdAt"
| "lastEventAt"
| "endedAt"
| "terminalSummary"
| "progressSummary"
>;
requesterOrigin?: FlowRecord["requesterOrigin"];
}): FlowRecord {
const terminalFlowStatus = deriveFlowStatusFromTask(params.task);
const isTerminal =
terminalFlowStatus === "succeeded" ||
terminalFlowStatus === "blocked" ||
terminalFlowStatus === "failed" ||
terminalFlowStatus === "cancelled" ||
terminalFlowStatus === "lost";
const endedAt = isTerminal
? (params.task.endedAt ?? params.task.lastEventAt ?? params.task.createdAt)
: undefined;
return createFlowRecord({
shape: "single_task",
ownerSessionKey: params.task.requesterSessionKey,
requesterOrigin: params.requesterOrigin,
status: terminalFlowStatus,
notifyPolicy: params.task.notifyPolicy,
goal: resolveFlowGoal(params.task),
blockedTaskId:
terminalFlowStatus === "blocked" ? params.task.taskId.trim() || undefined : undefined,
blockedSummary: resolveFlowBlockedSummary(params.task),
createdAt: params.task.createdAt,
updatedAt: params.task.lastEventAt ?? params.task.createdAt,
...(endedAt !== undefined ? { endedAt } : {}),
});
}
/**
 * Apply a partial update to an existing flow and persist the result.
 *
 * Patch semantics for the clearable fields (currentStep, waitingOnTaskId,
 * outputs, blockedTaskId, blockedSummary, endedAt): `undefined` leaves the
 * current value untouched, `null` clears it; strings are trimmed and blank
 * strings clear the field as well. `updatedAt` defaults to now.
 *
 * Returns a defensive copy of the updated record, or null when the flow id
 * is unknown.
 */
export function updateFlowRecordById(flowId: string, patch: FlowRecordPatch): FlowRecord | null {
  ensureFlowRegistryReady();
  const current = flows.get(flowId);
  if (!current) {
    return null;
  }
  const next: FlowRecord = {
    ...current,
    // Falsy status/notifyPolicy/goal values are treated as "no change".
    ...(patch.status ? { status: patch.status } : {}),
    ...(patch.notifyPolicy ? { notifyPolicy: patch.notifyPolicy } : {}),
    ...(patch.goal ? { goal: patch.goal } : {}),
    currentStep:
      patch.currentStep === undefined
        ? current.currentStep
        : patch.currentStep?.trim() || undefined,
    waitingOnTaskId:
      patch.waitingOnTaskId === undefined
        ? current.waitingOnTaskId
        : patch.waitingOnTaskId?.trim() || undefined,
    outputs:
      patch.outputs === undefined
        ? cloneFlowOutputs(current.outputs)
        : (cloneFlowOutputs(patch.outputs ?? undefined) ?? undefined),
    blockedTaskId:
      patch.blockedTaskId === undefined
        ? current.blockedTaskId
        : patch.blockedTaskId?.trim() || undefined,
    blockedSummary:
      patch.blockedSummary === undefined
        ? current.blockedSummary
        : patch.blockedSummary?.trim() || undefined,
    updatedAt: patch.updatedAt ?? Date.now(),
    endedAt: patch.endedAt === undefined ? current.endedAt : (patch.endedAt ?? undefined),
  };
  flows.set(flowId, next);
  persistFlowUpsert(next);
  return cloneFlowRecord(next);
}
/**
 * Mirror a task's state onto its parent flow, if any.
 *
 * Only single_task-shaped flows auto-sync; linear flows are returned
 * unchanged (their lifecycle is driven explicitly by flow-runtime).
 * Blocked metadata is set when the derived status is "blocked" and cleared
 * otherwise; terminal statuses stamp endedAt while non-terminal ones clear
 * it. Returns null when the task has no resolvable parent flow.
 */
export function syncFlowFromTask(
  task: Pick<
    TaskRecord,
    | "parentFlowId"
    | "status"
    | "terminalOutcome"
    | "notifyPolicy"
    | "label"
    | "task"
    | "lastEventAt"
    | "endedAt"
    | "taskId"
    | "terminalSummary"
    | "progressSummary"
  >,
): FlowRecord | null {
  const flowId = task.parentFlowId?.trim();
  if (!flowId) {
    return null;
  }
  const flow = getFlowById(flowId);
  if (!flow) {
    return null;
  }
  // Linear flows are never auto-synced from child tasks.
  if (flow.shape !== "single_task") {
    return flow;
  }
  const terminalFlowStatus = deriveFlowStatusFromTask(task);
  const isTerminal =
    terminalFlowStatus === "succeeded" ||
    terminalFlowStatus === "blocked" ||
    terminalFlowStatus === "failed" ||
    terminalFlowStatus === "cancelled" ||
    terminalFlowStatus === "lost";
  return updateFlowRecordById(flowId, {
    status: terminalFlowStatus,
    notifyPolicy: task.notifyPolicy,
    goal: resolveFlowGoal(task),
    // null explicitly clears stale blocked metadata when not blocked.
    blockedTaskId: terminalFlowStatus === "blocked" ? task.taskId.trim() || null : null,
    blockedSummary:
      terminalFlowStatus === "blocked" ? (resolveFlowBlockedSummary(task) ?? null) : null,
    updatedAt: task.lastEventAt ?? Date.now(),
    ...(isTerminal
      ? {
          endedAt: task.endedAt ?? task.lastEventAt ?? Date.now(),
        }
      : { endedAt: null }),
  });
}
export function getFlowById(flowId: string): FlowRecord | undefined {
ensureFlowRegistryReady();
const flow = flows.get(flowId);
return flow ? cloneFlowRecord(flow) : undefined;
}
export function listFlowsForOwnerSessionKey(sessionKey: string): FlowRecord[] {
ensureFlowRegistryReady();
const normalizedSessionKey = sessionKey.trim();
if (!normalizedSessionKey) {
return [];
}
return [...flows.values()]
.filter((flow) => flow.ownerSessionKey.trim() === normalizedSessionKey)
.map((flow) => cloneFlowRecord(flow))
.toSorted((left, right) => right.createdAt - left.createdAt);
}
/** Most recently created flow for the owner session, or undefined. */
export function findLatestFlowForOwnerSessionKey(sessionKey: string): FlowRecord | undefined {
  const [latest] = listFlowsForOwnerSessionKey(sessionKey);
  return latest ? cloneFlowRecord(latest) : undefined;
}
/** Resolve a token as a flow id first, then as "latest flow for that owner session key". */
export function resolveFlowForLookupToken(token: string): FlowRecord | undefined {
  const lookup = token.trim();
  if (!lookup) {
    return undefined;
  }
  const byId = getFlowById(lookup);
  return byId ?? findLatestFlowForOwnerSessionKey(lookup);
}
/** Every flow as a defensive copy, newest first. */
export function listFlowRecords(): FlowRecord[] {
  ensureFlowRegistryReady();
  const copies = [...flows.values()].map((flow) => cloneFlowRecord(flow));
  return copies.toSorted((left, right) => right.createdAt - left.createdAt);
}
/** Remove a flow from memory and the store; false when the id is unknown. */
export function deleteFlowRecordById(flowId: string): boolean {
  ensureFlowRegistryReady();
  if (!flows.has(flowId)) {
    return false;
  }
  flows.delete(flowId);
  persistFlowDelete(flowId);
  return true;
}
/**
 * Test hook: wipe in-memory state and restore the default runtime store.
 * Unless `persist` is false, also flush the (now empty) snapshot and close
 * the store so temp directories can be removed.
 */
export function resetFlowRegistryForTests(opts?: { persist?: boolean }) {
  flows.clear();
  restoreAttempted = false;
  resetFlowRegistryRuntimeForTests();
  if (opts?.persist === false) {
    return;
  }
  persistFlowRegistry();
  getFlowRegistryStore().close?.();
}

View File

@ -1,42 +0,0 @@
import type { DeliveryContext } from "../utils/delivery-context.js";
import type { TaskNotifyPolicy } from "./task-registry.types.js";
/** "single_task" flows mirror one task; "linear" flows are driven step-by-step. */
export type FlowShape = "single_task" | "linear";
/** JSON-safe value stored in a flow's output bag. */
export type FlowOutputValue =
  | null
  | boolean
  | number
  | string
  | FlowOutputValue[]
  | { [key: string]: FlowOutputValue };
/** Keyed collection of JSON-safe outputs accumulated by a flow. */
export type FlowOutputBag = Record<string, FlowOutputValue>;
/** Flow lifecycle states; the last five are terminal except "waiting"/"blocked" resumption. */
export type FlowStatus =
  | "queued"
  | "running"
  | "waiting"
  | "blocked"
  | "succeeded"
  | "failed"
  | "cancelled"
  | "lost";
/** Persisted flow record; timestamps are epoch milliseconds. */
export type FlowRecord = {
  flowId: string;
  shape: FlowShape;
  ownerSessionKey: string;
  // Delivery context of the original requester, when direct delivery is possible.
  requesterOrigin?: DeliveryContext;
  status: FlowStatus;
  notifyPolicy: TaskNotifyPolicy;
  goal: string;
  currentStep?: string;
  // Task id the flow is currently waiting on, if any.
  waitingOnTaskId?: string;
  outputs?: FlowOutputBag;
  // Set when status is "blocked": the blocking task and its summary.
  blockedTaskId?: string;
  blockedSummary?: string;
  createdAt: number;
  updatedAt: number;
  // Present only once the flow reached a terminal state.
  endedAt?: number;
};

View File

@ -1,281 +0,0 @@
import { afterEach, describe, expect, it, vi } from "vitest";
import { withTempDir } from "../test-helpers/temp-dir.js";
import { getFlowById, resetFlowRegistryForTests, updateFlowRecordById } from "./flow-registry.js";
import {
appendFlowOutput,
createFlow,
emitFlowUpdate,
failFlow,
finishFlow,
resumeFlow,
runTaskInFlow,
setFlowOutput,
} from "./flow-runtime.js";
import { listTasksForFlowId, resetTaskRegistryForTests } from "./task-registry.js";
// Remember the real state dir so afterEach can restore it.
const ORIGINAL_STATE_DIR = process.env.OPENCLAW_STATE_DIR;
// Hoisted so the vi.mock factories below (which are hoisted too) can see them.
const mocks = vi.hoisted(() => ({
  sendMessageMock: vi.fn(),
  enqueueSystemEventMock: vi.fn(),
  requestHeartbeatNowMock: vi.fn(),
}));
// Route outbound delivery, system events, and heartbeat wakes into spies.
vi.mock("./task-registry-delivery-runtime.js", () => ({
  sendMessage: (...args: unknown[]) => mocks.sendMessageMock(...args),
}));
vi.mock("../infra/system-events.js", () => ({
  enqueueSystemEvent: (...args: unknown[]) => mocks.enqueueSystemEventMock(...args),
}));
vi.mock("../infra/heartbeat-wake.js", () => ({
  requestHeartbeatNow: (...args: unknown[]) => mocks.requestHeartbeatNowMock(...args),
}));
// Inert stubs for event subscription and session/subagent control.
vi.mock("../infra/agent-events.js", () => ({
  onAgentEvent: () => () => {},
}));
vi.mock("../acp/control-plane/manager.js", () => ({
  getAcpSessionManager: () => ({
    cancelSession: vi.fn(),
  }),
}));
vi.mock("../agents/subagent-control.js", () => ({
  killSubagentRunAdmin: vi.fn(),
}));
/**
 * Run `run` against a fresh temp OPENCLAW_STATE_DIR with both the task and
 * flow registries reset on entry and on exit.
 */
async function withFlowRuntimeStateDir(run: (root: string) => Promise<void>): Promise<void> {
  const body = async (root: string): Promise<void> => {
    process.env.OPENCLAW_STATE_DIR = root;
    resetTaskRegistryForTests();
    resetFlowRegistryForTests();
    try {
      await run(root);
    } finally {
      resetTaskRegistryForTests();
      resetFlowRegistryForTests();
    }
  };
  await withTempDir({ prefix: "openclaw-flow-runtime-" }, body);
}
// End-to-end behavior of the flow runtime: linking child tasks, persisting
// outputs, resume/finish/fail transitions, and update delivery routing.
describe("flow-runtime", () => {
  afterEach(() => {
    if (ORIGINAL_STATE_DIR === undefined) {
      delete process.env.OPENCLAW_STATE_DIR;
    } else {
      process.env.OPENCLAW_STATE_DIR = ORIGINAL_STATE_DIR;
    }
    resetTaskRegistryForTests();
    resetFlowRegistryForTests();
    mocks.sendMessageMock.mockReset();
    mocks.enqueueSystemEventMock.mockReset();
    mocks.requestHeartbeatNowMock.mockReset();
  });
  // Spawning a child task must queue the task and move the flow to
  // "waiting" with waitingOnTaskId pointing at it.
  it("runs a child task under a linear flow and marks the flow as waiting on it", async () => {
    await withFlowRuntimeStateDir(async () => {
      const flow = createFlow({
        ownerSessionKey: "agent:main:main",
        requesterOrigin: {
          channel: "telegram",
          to: "telegram:123",
        },
        goal: "Triage inbox",
      });
      const started = runTaskInFlow({
        flowId: flow.flowId,
        runtime: "acp",
        childSessionKey: "agent:codex:acp:child",
        runId: "run-flow-runtime-1",
        task: "Classify inbox messages",
        currentStep: "wait_for_classification",
      });
      expect(started.task).toMatchObject({
        requesterSessionKey: "agent:main:main",
        parentFlowId: flow.flowId,
        childSessionKey: "agent:codex:acp:child",
        runId: "run-flow-runtime-1",
        status: "queued",
      });
      expect(started.flow).toMatchObject({
        flowId: flow.flowId,
        status: "waiting",
        currentStep: "wait_for_classification",
        waitingOnTaskId: started.task.taskId,
      });
      expect(listTasksForFlowId(flow.flowId)).toHaveLength(1);
    });
  });
  // set/append output semantics must survive an in-memory reset and be
  // restored from sqlite (append accumulates into an array).
  it("stores outputs and waiting metadata across sqlite restore", async () => {
    await withFlowRuntimeStateDir(async () => {
      const flow = createFlow({
        ownerSessionKey: "agent:main:main",
        goal: "Inbox routing",
      });
      const started = runTaskInFlow({
        flowId: flow.flowId,
        runtime: "subagent",
        childSessionKey: "agent:codex:subagent:child",
        runId: "run-flow-runtime-restore",
        task: "Bucket messages",
      });
      setFlowOutput({
        flowId: flow.flowId,
        key: "classification",
        value: {
          business: 1,
          personal: 2,
        },
      });
      appendFlowOutput({
        flowId: flow.flowId,
        key: "eod_summary",
        value: {
          subject: "Newsletter",
        },
      });
      resetTaskRegistryForTests({ persist: false });
      resetFlowRegistryForTests({ persist: false });
      expect(getFlowById(flow.flowId)).toMatchObject({
        flowId: flow.flowId,
        status: "waiting",
        waitingOnTaskId: started.task.taskId,
        outputs: {
          classification: {
            business: 1,
            personal: 2,
          },
          eod_summary: [
            {
              subject: "Newsletter",
            },
          ],
        },
      });
    });
  });
  // resume clears blocked/waiting/ended metadata; finish/fail stamp a
  // terminal status, step, and endedAt.
  it("reopens a blocked flow with resume and marks terminal states with finish/fail", async () => {
    await withFlowRuntimeStateDir(async () => {
      const flow = createFlow({
        ownerSessionKey: "agent:main:main",
        goal: "Review inbox",
      });
      const started = runTaskInFlow({
        flowId: flow.flowId,
        runtime: "acp",
        childSessionKey: "agent:codex:acp:child",
        runId: "run-flow-runtime-reopen",
        task: "Review inbox",
      });
      updateFlowRecordById(flow.flowId, {
        status: "blocked",
        blockedTaskId: started.task.taskId,
        blockedSummary: "Need auth.",
        endedAt: 120,
      });
      expect(resumeFlow({ flowId: flow.flowId, currentStep: "retry_auth" })).toMatchObject({
        flowId: flow.flowId,
        status: "running",
        currentStep: "retry_auth",
      });
      expect(getFlowById(flow.flowId)?.blockedTaskId).toBeUndefined();
      expect(getFlowById(flow.flowId)?.waitingOnTaskId).toBeUndefined();
      expect(getFlowById(flow.flowId)?.endedAt).toBeUndefined();
      expect(
        finishFlow({ flowId: flow.flowId, currentStep: "finish", endedAt: 200 }),
      ).toMatchObject({
        flowId: flow.flowId,
        status: "succeeded",
        currentStep: "finish",
        endedAt: 200,
      });
      const failed = createFlow({
        ownerSessionKey: "agent:main:main",
        goal: "Failing flow",
      });
      expect(failFlow({ flowId: failed.flowId, currentStep: "abort", endedAt: 300 })).toMatchObject(
        {
          flowId: failed.flowId,
          status: "failed",
          currentStep: "abort",
          endedAt: 300,
        },
      );
    });
  });
  // With a deliverable requesterOrigin, updates go straight to the channel
  // (mirrored to the owner session) and carry an idempotency key.
  it("delivers explicit flow updates through the flow owner context when possible", async () => {
    await withFlowRuntimeStateDir(async () => {
      const flow = createFlow({
        ownerSessionKey: "agent:main:main",
        requesterOrigin: {
          channel: "telegram",
          to: "telegram:123",
          threadId: "42",
        },
        goal: "Inbox routing",
      });
      const result = await emitFlowUpdate({
        flowId: flow.flowId,
        content: "Personal message needs your attention.",
        eventKey: "personal-alert",
      });
      expect(result.delivery).toBe("direct");
      expect(mocks.sendMessageMock).toHaveBeenCalledWith(
        expect.objectContaining({
          channel: "telegram",
          to: "telegram:123",
          threadId: "42",
          content: "Personal message needs your attention.",
          idempotencyKey: `flow:${flow.flowId}:update:personal-alert`,
          mirror: expect.objectContaining({
            sessionKey: "agent:main:main",
          }),
        }),
      );
    });
  });
  // Without a deliverable origin, updates are queued as a system event on
  // the owner session and a heartbeat is requested.
  it("falls back to session-queued flow updates when direct delivery is unavailable", async () => {
    await withFlowRuntimeStateDir(async () => {
      const flow = createFlow({
        ownerSessionKey: "agent:main:main",
        goal: "Inbox routing",
      });
      const result = await emitFlowUpdate({
        flowId: flow.flowId,
        content: "Business email sent to Slack and waiting for reply.",
      });
      expect(result.delivery).toBe("session_queued");
      expect(mocks.enqueueSystemEventMock).toHaveBeenCalledWith(
        "Business email sent to Slack and waiting for reply.",
        expect.objectContaining({
          sessionKey: "agent:main:main",
          contextKey: `flow:${flow.flowId}`,
        }),
      );
      expect(mocks.requestHeartbeatNowMock).toHaveBeenCalledWith({
        reason: "clawflow-update",
        sessionKey: "agent:main:main",
      });
    });
  });
});

View File

@ -1,377 +0,0 @@
import { requestHeartbeatNow } from "../infra/heartbeat-wake.js";
import { enqueueSystemEvent } from "../infra/system-events.js";
import { parseAgentSessionKey } from "../routing/session-key.js";
import { isDeliverableMessageChannel } from "../utils/message-channel.js";
import { createFlowRecord, getFlowById, updateFlowRecordById } from "./flow-registry.js";
import type { FlowOutputBag, FlowOutputValue, FlowRecord } from "./flow-registry.types.js";
import { createQueuedTaskRun, createRunningTaskRun } from "./task-executor.js";
import { listTasksForFlowId } from "./task-registry.js";
import type {
TaskDeliveryStatus,
TaskNotifyPolicy,
TaskRecord,
TaskRuntime,
} from "./task-registry.types.js";
// Cached dynamic-import promise for the delivery runtime (loaded lazily to
// avoid a static import cycle with the task registry).
let deliveryRuntimePromise: Promise<typeof import("./task-registry-delivery-runtime.js")> | null =
  null;
// How a child task is started under a flow.
type FlowTaskLaunch = "queued" | "running";
// Outcome of attempting to deliver a flow update to its requester.
export type FlowUpdateDelivery = "direct" | "session_queued" | "parent_missing" | "failed";
/** Lazily import the delivery runtime once, caching the pending module promise. */
function loadFlowDeliveryRuntime() {
  if (!deliveryRuntimePromise) {
    deliveryRuntimePromise = import("./task-registry-delivery-runtime.js");
  }
  return deliveryRuntimePromise;
}
/** Fetch a flow by id, throwing when it does not exist. */
function requireFlow(flowId: string): FlowRecord {
  const flow = getFlowById(flowId);
  if (flow) {
    return flow;
  }
  throw new Error(`Flow not found: ${flowId}`);
}
/** Fetch a flow and assert it is linear-shaped, throwing otherwise. */
function requireLinearFlow(flowId: string): FlowRecord {
  const flow = requireFlow(flowId);
  if (flow.shape === "linear") {
    return flow;
  }
  throw new Error(`Flow is not linear: ${flowId}`);
}
/** Deep-copy a JSON-safe output value via a serialize/parse round-trip. */
function cloneOutputValue<T extends FlowOutputValue>(value: T): T {
  const serialized = JSON.stringify(value);
  return JSON.parse(serialized) as T;
}
/** Apply a registry patch, throwing if the flow vanished mid-update. */
function updateRequiredFlow(
  flowId: string,
  patch: Parameters<typeof updateFlowRecordById>[1],
): FlowRecord {
  const updated = updateFlowRecordById(flowId, patch);
  if (updated) {
    return updated;
  }
  throw new Error(`Flow not found: ${flowId}`);
}
/** Copy of the flow's output bag, or a fresh empty bag when unset. */
function resolveFlowOutputs(flow: FlowRecord): FlowOutputBag {
  if (!flow.outputs) {
    return {};
  }
  return cloneOutputValue(flow.outputs);
}
/** True when the requester origin names a non-blank, deliverable channel and target. */
function canDeliverFlowToRequesterOrigin(flow: FlowRecord): boolean {
  const origin = flow.requesterOrigin;
  const channel = origin?.channel?.trim();
  const to = origin?.to?.trim();
  if (!channel || !to) {
    return false;
  }
  return isDeliverableMessageChannel(channel);
}
/** Create a linear flow in the "queued" state, owned by the given session. */
export function createFlow(params: {
  ownerSessionKey: string;
  requesterOrigin?: FlowRecord["requesterOrigin"];
  goal: string;
  notifyPolicy?: TaskNotifyPolicy;
  currentStep?: string;
  createdAt?: number;
  updatedAt?: number;
}): FlowRecord {
  const { ownerSessionKey, requesterOrigin, goal, notifyPolicy, currentStep, createdAt, updatedAt } =
    params;
  return createFlowRecord({
    shape: "linear",
    status: "queued",
    ownerSessionKey,
    requesterOrigin,
    goal,
    notifyPolicy,
    currentStep,
    createdAt,
    updatedAt,
  });
}
/**
 * Start a child task under a linear flow and mark the flow as waiting on it.
 *
 * The task inherits the flow's owner session, requester origin, and (unless
 * overridden) notify policy. `launch` picks between a queued and an
 * already-running task record; running launches also carry start/progress
 * metadata. Blocked metadata and endedAt are cleared, and currentStep falls
 * back to the flow's existing step or "wait_for_task".
 */
export function runTaskInFlow(params: {
  flowId: string;
  runtime: TaskRuntime;
  sourceId?: string;
  childSessionKey?: string;
  parentTaskId?: string;
  agentId?: string;
  runId?: string;
  label?: string;
  task: string;
  preferMetadata?: boolean;
  notifyPolicy?: TaskNotifyPolicy;
  deliveryStatus?: TaskDeliveryStatus;
  launch?: FlowTaskLaunch;
  startedAt?: number;
  lastEventAt?: number;
  progressSummary?: string | null;
  currentStep?: string;
}): { flow: FlowRecord; task: TaskRecord } {
  const flow = requireLinearFlow(params.flowId);
  const launch = params.launch ?? "queued";
  // Options shared by both launch modes.
  const base = {
    runtime: params.runtime,
    sourceId: params.sourceId,
    requesterSessionKey: flow.ownerSessionKey,
    requesterOrigin: flow.requesterOrigin,
    parentFlowId: flow.flowId,
    childSessionKey: params.childSessionKey,
    parentTaskId: params.parentTaskId,
    agentId: params.agentId,
    runId: params.runId,
    label: params.label,
    task: params.task,
    preferMetadata: params.preferMetadata,
    notifyPolicy: params.notifyPolicy ?? flow.notifyPolicy,
    deliveryStatus: params.deliveryStatus,
  };
  const task =
    launch === "running"
      ? createRunningTaskRun({
          ...base,
          startedAt: params.startedAt,
          lastEventAt: params.lastEventAt,
          progressSummary: params.progressSummary,
        })
      : createQueuedTaskRun(base);
  return {
    task,
    flow: updateRequiredFlow(flow.flowId, {
      status: "waiting",
      currentStep: params.currentStep ?? flow.currentStep ?? "wait_for_task",
      waitingOnTaskId: task.taskId,
      blockedTaskId: null,
      blockedSummary: null,
      endedAt: null,
      updatedAt: task.lastEventAt ?? task.startedAt ?? Date.now(),
    }),
  };
}
/**
 * Moves a linear flow into the "waiting" state, optionally pinning the task
 * it waits on. When a waiting task id is supplied, it must already be linked
 * to this flow — otherwise the call throws.
 */
export function setFlowWaiting(params: {
  flowId: string;
  currentStep?: string | null;
  waitingOnTaskId?: string | null;
  updatedAt?: number;
}): FlowRecord {
  const flow = requireLinearFlow(params.flowId);
  const waitingOnTaskId = params.waitingOnTaskId?.trim();
  if (waitingOnTaskId) {
    const linked = listTasksForFlowId(flow.flowId).some((task) => task.taskId === waitingOnTaskId);
    if (!linked) {
      throw new Error(`Flow ${flow.flowId} is not linked to task ${waitingOnTaskId}`);
    }
  }
  return updateRequiredFlow(flow.flowId, {
    status: "waiting",
    currentStep: params.currentStep,
    waitingOnTaskId: params.waitingOnTaskId,
    endedAt: null,
    updatedAt: params.updatedAt ?? Date.now(),
  });
}
/**
 * Stores (or overwrites) a single named output value on a linear flow.
 * The value is deep-copied so later caller mutations cannot leak into the
 * persisted record. Throws when the trimmed key is empty.
 */
export function setFlowOutput(params: {
  flowId: string;
  key: string;
  value: FlowOutputValue;
  updatedAt?: number;
}): FlowRecord {
  const flow = requireLinearFlow(params.flowId);
  const outputKey = params.key.trim();
  if (!outputKey) {
    throw new Error("Flow output key is required.");
  }
  const bag = resolveFlowOutputs(flow);
  bag[outputKey] = cloneOutputValue(params.value);
  return updateRequiredFlow(flow.flowId, {
    outputs: bag,
    updatedAt: params.updatedAt ?? Date.now(),
  });
}
/**
 * Appends a value to an array-valued flow output, creating the array when
 * the key is unset. Throws when the trimmed key is empty or the existing
 * output under that key is not an array.
 */
export function appendFlowOutput(params: {
  flowId: string;
  key: string;
  value: FlowOutputValue;
  updatedAt?: number;
}): FlowRecord {
  const flow = requireLinearFlow(params.flowId);
  const outputKey = params.key.trim();
  if (!outputKey) {
    throw new Error("Flow output key is required.");
  }
  const bag = resolveFlowOutputs(flow);
  const entry = cloneOutputValue(params.value);
  const existing = bag[outputKey];
  if (Array.isArray(existing)) {
    bag[outputKey] = [...existing, entry];
  } else if (existing === undefined) {
    bag[outputKey] = [entry];
  } else {
    throw new Error(`Flow output ${outputKey} is not an array.`);
  }
  return updateRequiredFlow(flow.flowId, {
    outputs: bag,
    updatedAt: params.updatedAt ?? Date.now(),
  });
}
/**
 * Returns a linear flow to the "running" state, clearing any waiting,
 * blocked, and terminal bookkeeping.
 */
export function resumeFlow(params: {
  flowId: string;
  currentStep?: string | null;
  updatedAt?: number;
}): FlowRecord {
  const { flowId } = requireLinearFlow(params.flowId);
  return updateRequiredFlow(flowId, {
    status: "running",
    currentStep: params.currentStep,
    waitingOnTaskId: null,
    blockedTaskId: null,
    blockedSummary: null,
    endedAt: null,
    updatedAt: params.updatedAt ?? Date.now(),
  });
}
/**
 * Marks a linear flow as succeeded. The end timestamp falls back to the
 * update timestamp, then to "now"; waiting/blocked bookkeeping is cleared.
 */
export function finishFlow(params: {
  flowId: string;
  currentStep?: string | null;
  updatedAt?: number;
  endedAt?: number;
}): FlowRecord {
  const { flowId } = requireLinearFlow(params.flowId);
  const endedAt = params.endedAt ?? params.updatedAt ?? Date.now();
  const updatedAt = params.updatedAt ?? endedAt;
  return updateRequiredFlow(flowId, {
    status: "succeeded",
    currentStep: params.currentStep,
    waitingOnTaskId: null,
    blockedTaskId: null,
    blockedSummary: null,
    updatedAt,
    endedAt,
  });
}
/**
 * Marks a linear flow as failed. The end timestamp falls back to the update
 * timestamp, then to "now"; waiting/blocked bookkeeping is cleared.
 */
export function failFlow(params: {
  flowId: string;
  currentStep?: string | null;
  updatedAt?: number;
  endedAt?: number;
}): FlowRecord {
  const { flowId } = requireLinearFlow(params.flowId);
  const endedAt = params.endedAt ?? params.updatedAt ?? Date.now();
  const updatedAt = params.updatedAt ?? endedAt;
  return updateRequiredFlow(flowId, {
    status: "failed",
    currentStep: params.currentStep,
    waitingOnTaskId: null,
    blockedTaskId: null,
    blockedSummary: null,
    updatedAt,
    endedAt,
  });
}
/**
 * Records a progress update on a flow and tries to deliver it to the flow's
 * requester.
 *
 * Delivery strategy:
 * 1. No owner session key → only the flow record is updated ("parent_missing").
 * 2. Requester origin not directly deliverable → queue a system event on the
 *    owner session and wake the heartbeat ("session_queued", or "failed" when
 *    even queueing throws).
 * 3. Otherwise attempt a direct channel send ("direct"); on any send error,
 *    fall back to the same session-queue path as case 2.
 *
 * @throws Error when the trimmed update content is empty.
 */
export async function emitFlowUpdate(params: {
  flowId: string;
  content: string;
  eventKey?: string;
  currentStep?: string | null;
  updatedAt?: number;
}): Promise<{ flow: FlowRecord; delivery: FlowUpdateDelivery }> {
  const flow = requireFlow(params.flowId);
  const content = params.content.trim();
  if (!content) {
    throw new Error("Flow update content is required.");
  }
  const ownerSessionKey = flow.ownerSessionKey.trim();
  const updatedAt = params.updatedAt ?? Date.now();
  const updatedFlow = updateRequiredFlow(flow.flowId, {
    currentStep: params.currentStep,
    updatedAt,
  });
  if (!ownerSessionKey) {
    return {
      flow: updatedFlow,
      delivery: "parent_missing",
    };
  }
  // Shared fallback: queue the update on the owner session and nudge the
  // heartbeat loop. Used both when the origin is not directly deliverable
  // and when a direct send attempt fails. (Previously duplicated inline.)
  const queueToOwnerSession = (): FlowUpdateDelivery => {
    try {
      enqueueSystemEvent(content, {
        sessionKey: ownerSessionKey,
        contextKey: `flow:${updatedFlow.flowId}`,
        deliveryContext: updatedFlow.requesterOrigin,
      });
      requestHeartbeatNow({
        reason: "clawflow-update",
        sessionKey: ownerSessionKey,
      });
      return "session_queued";
    } catch {
      return "failed";
    }
  };
  if (!canDeliverFlowToRequesterOrigin(updatedFlow)) {
    return {
      flow: updatedFlow,
      delivery: queueToOwnerSession(),
    };
  }
  try {
    const requesterAgentId = parseAgentSessionKey(ownerSessionKey)?.agentId;
    // eventKey (when provided) keeps retries idempotent; otherwise the
    // update timestamp scopes the key to this emission.
    const idempotencyKey = `flow:${updatedFlow.flowId}:update:${params.eventKey?.trim() || updatedAt}`;
    const { sendMessage } = await loadFlowDeliveryRuntime();
    await sendMessage({
      channel: updatedFlow.requesterOrigin?.channel,
      to: updatedFlow.requesterOrigin?.to ?? "",
      accountId: updatedFlow.requesterOrigin?.accountId,
      threadId: updatedFlow.requesterOrigin?.threadId,
      content,
      agentId: requesterAgentId,
      idempotencyKey,
      mirror: {
        sessionKey: ownerSessionKey,
        agentId: requesterAgentId,
        idempotencyKey,
      },
    });
    return {
      flow: updatedFlow,
      delivery: "direct",
    };
  } catch {
    return {
      flow: updatedFlow,
      delivery: queueToOwnerSession(),
    };
  }
}

View File

@ -1,29 +1,16 @@
import { afterEach, describe, expect, it, vi } from "vitest";
import { withTempDir } from "../test-helpers/temp-dir.js";
import {
getFlowById,
listFlowRecords,
resetFlowRegistryForTests,
updateFlowRecordById,
} from "./flow-registry.js";
import {
cancelFlowById,
cancelDetachedTaskRunById,
completeTaskRunByRunId,
createLinearFlow,
createQueuedTaskRun,
createRunningTaskRun,
failTaskRunByRunId,
recordTaskRunProgressByRunId,
retryBlockedFlowAsQueuedTaskRun,
retryBlockedFlowAsRunningTaskRun,
setDetachedTaskDeliveryStatusByRunId,
startTaskRunByRunId,
} from "./task-executor.js";
import {
findLatestTaskForFlowId,
findTaskByRunId,
resetTaskRegistryForTests,
} from "./task-registry.js";
import { findTaskByRunId, resetTaskRegistryForTests } from "./task-registry.js";
const ORIGINAL_STATE_DIR = process.env.OPENCLAW_STATE_DIR;
const hoisted = vi.hoisted(() => {
@ -55,12 +42,10 @@ async function withTaskExecutorStateDir(run: (root: string) => Promise<void>): P
await withTempDir({ prefix: "openclaw-task-executor-" }, async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests();
resetFlowRegistryForTests();
try {
await run(root);
} finally {
resetTaskRegistryForTests();
resetFlowRegistryForTests();
}
});
}
@ -73,7 +58,6 @@ describe("task-executor", () => {
process.env.OPENCLAW_STATE_DIR = ORIGINAL_STATE_DIR;
}
resetTaskRegistryForTests();
resetFlowRegistryForTests();
hoisted.sendMessageMock.mockReset();
hoisted.cancelSessionMock.mockReset();
hoisted.killSubagentRunAdminMock.mockReset();
@ -155,62 +139,7 @@ describe("task-executor", () => {
});
});
it("auto-creates a one-task flow and keeps it synced with task status", async () => {
await withTaskExecutorStateDir(async () => {
const created = createRunningTaskRun({
runtime: "subagent",
requesterSessionKey: "agent:main:main",
childSessionKey: "agent:codex:subagent:child",
runId: "run-executor-flow",
task: "Write summary",
startedAt: 10,
deliveryStatus: "pending",
});
expect(created.parentFlowId).toEqual(expect.any(String));
expect(getFlowById(created.parentFlowId!)).toMatchObject({
flowId: created.parentFlowId,
ownerSessionKey: "agent:main:main",
status: "running",
goal: "Write summary",
notifyPolicy: "done_only",
});
completeTaskRunByRunId({
runId: "run-executor-flow",
endedAt: 40,
lastEventAt: 40,
terminalSummary: "Done.",
});
expect(getFlowById(created.parentFlowId!)).toMatchObject({
flowId: created.parentFlowId,
status: "succeeded",
endedAt: 40,
goal: "Write summary",
notifyPolicy: "done_only",
});
});
});
it("does not auto-create one-task flows for non-returning bookkeeping runs", async () => {
await withTaskExecutorStateDir(async () => {
const created = createRunningTaskRun({
runtime: "cli",
requesterSessionKey: "agent:main:main",
childSessionKey: "agent:main:main",
runId: "run-executor-cli",
task: "Foreground gateway run",
deliveryStatus: "not_applicable",
startedAt: 10,
});
expect(created.parentFlowId).toBeUndefined();
expect(listFlowRecords()).toEqual([]);
});
});
it("records blocked metadata on one-task flows and reuses the same flow for queued retries", async () => {
it("records blocked task outcomes without wrapping them in a separate flow model", async () => {
await withTaskExecutorStateDir(async () => {
const created = createRunningTaskRun({
runtime: "acp",
@ -235,171 +164,22 @@ describe("task-executor", () => {
terminalSummary: "Writable session required.",
});
expect(getFlowById(created.parentFlowId!)).toMatchObject({
flowId: created.parentFlowId,
status: "blocked",
blockedTaskId: created.taskId,
blockedSummary: "Writable session required.",
endedAt: 40,
});
const retried = retryBlockedFlowAsQueuedTaskRun({
flowId: created.parentFlowId!,
runId: "run-executor-retry",
childSessionKey: "agent:codex:acp:retry-child",
});
expect(retried).toMatchObject({
found: true,
retried: true,
previousTask: expect.objectContaining({
taskId: created.taskId,
}),
task: expect.objectContaining({
parentFlowId: created.parentFlowId,
parentTaskId: created.taskId,
status: "queued",
runId: "run-executor-retry",
}),
});
expect(getFlowById(created.parentFlowId!)).toMatchObject({
flowId: created.parentFlowId,
status: "queued",
});
expect(getFlowById(created.parentFlowId!)?.blockedTaskId).toBeUndefined();
expect(getFlowById(created.parentFlowId!)?.blockedSummary).toBeUndefined();
expect(getFlowById(created.parentFlowId!)?.endedAt).toBeUndefined();
expect(findLatestTaskForFlowId(created.parentFlowId!)).toMatchObject({
taskId: retried.task?.taskId,
});
});
});
it("can reopen blocked one-task flows directly into a running retry", async () => {
await withTaskExecutorStateDir(async () => {
const created = createRunningTaskRun({
runtime: "subagent",
requesterSessionKey: "agent:main:main",
childSessionKey: "agent:codex:subagent:child",
runId: "run-executor-blocked-running",
task: "Write summary",
startedAt: 10,
deliveryStatus: "pending",
});
completeTaskRunByRunId({
runId: "run-executor-blocked-running",
endedAt: 40,
lastEventAt: 40,
expect(findTaskByRunId("run-executor-blocked")).toMatchObject({
taskId: created.taskId,
status: "succeeded",
terminalOutcome: "blocked",
terminalSummary: "Need write approval.",
});
const retried = retryBlockedFlowAsRunningTaskRun({
flowId: created.parentFlowId!,
runId: "run-executor-running-retry",
childSessionKey: "agent:codex:subagent:retry",
startedAt: 55,
lastEventAt: 55,
progressSummary: "Retrying with approval",
});
expect(retried).toMatchObject({
found: true,
retried: true,
task: expect.objectContaining({
parentFlowId: created.parentFlowId,
status: "running",
runId: "run-executor-running-retry",
progressSummary: "Retrying with approval",
}),
});
expect(getFlowById(created.parentFlowId!)).toMatchObject({
flowId: created.parentFlowId,
status: "running",
terminalSummary: "Writable session required.",
});
});
});
it("refuses to retry flows that are not currently blocked", async () => {
await withTaskExecutorStateDir(async () => {
const created = createRunningTaskRun({
runtime: "acp",
requesterSessionKey: "agent:main:main",
childSessionKey: "agent:codex:acp:child",
runId: "run-executor-not-blocked",
task: "Patch file",
startedAt: 10,
deliveryStatus: "pending",
});
const retried = retryBlockedFlowAsQueuedTaskRun({
flowId: created.parentFlowId!,
runId: "run-should-not-exist",
});
expect(retried).toMatchObject({
found: true,
retried: false,
reason: "Flow is not blocked.",
});
expect(findTaskByRunId("run-should-not-exist")).toBeUndefined();
});
});
it("keeps linear flows under explicit control instead of auto-syncing child task status", async () => {
await withTaskExecutorStateDir(async () => {
const flow = createLinearFlow({
ownerSessionKey: "agent:main:main",
goal: "Triage a PR cluster",
currentStep: "wait_for",
notifyPolicy: "done_only",
});
const child = createRunningTaskRun({
runtime: "acp",
requesterSessionKey: "agent:main:main",
parentFlowId: flow.flowId,
childSessionKey: "agent:codex:acp:child",
runId: "run-linear-child",
task: "Inspect a PR",
startedAt: 10,
deliveryStatus: "pending",
});
completeTaskRunByRunId({
runId: "run-linear-child",
endedAt: 40,
lastEventAt: 40,
terminalSummary: "Done.",
});
expect(child.parentFlowId).toBe(flow.flowId);
expect(getFlowById(flow.flowId)).toMatchObject({
flowId: flow.flowId,
shape: "linear",
status: "queued",
currentStep: "wait_for",
});
});
});
it("cancels active child tasks and marks a linear flow cancelled", async () => {
it("cancels active ACP child tasks", async () => {
await withTaskExecutorStateDir(async () => {
hoisted.cancelSessionMock.mockResolvedValue(undefined);
const flow = createLinearFlow({
ownerSessionKey: "agent:main:main",
goal: "Cluster related PRs",
currentStep: "wait_for",
});
const child = createRunningTaskRun({
runtime: "acp",
requesterSessionKey: "agent:main:main",
parentFlowId: flow.flowId,
childSessionKey: "agent:codex:acp:child",
runId: "run-linear-cancel",
task: "Inspect a PR",
@ -407,58 +187,60 @@ describe("task-executor", () => {
deliveryStatus: "pending",
});
const cancelled = await cancelFlowById({
const cancelled = await cancelDetachedTaskRunById({
cfg: {} as never,
flowId: flow.flowId,
taskId: child.taskId,
});
expect(cancelled).toMatchObject({
found: true,
cancelled: true,
flow: expect.objectContaining({
flowId: flow.flowId,
status: "cancelled",
}),
});
expect(findTaskByRunId("run-linear-cancel")).toMatchObject({
taskId: child.taskId,
status: "cancelled",
});
expect(getFlowById(flow.flowId)).toMatchObject({
flowId: flow.flowId,
status: "cancelled",
expect(hoisted.cancelSessionMock).toHaveBeenCalledWith({
cfg: {} as never,
sessionKey: "agent:codex:acp:child",
reason: "task-cancel",
});
expect(hoisted.cancelSessionMock).toHaveBeenCalled();
});
});
it("refuses to rewrite terminal linear flows when cancel is requested", async () => {
it("cancels active subagent child tasks", async () => {
await withTaskExecutorStateDir(async () => {
const flow = createLinearFlow({
ownerSessionKey: "agent:main:main",
goal: "Cluster related PRs",
currentStep: "finish",
});
updateFlowRecordById(flow.flowId, {
status: "succeeded",
endedAt: 55,
updatedAt: 55,
hoisted.killSubagentRunAdminMock.mockResolvedValue({
found: true,
killed: true,
});
const cancelled = await cancelFlowById({
const child = createRunningTaskRun({
runtime: "subagent",
requesterSessionKey: "agent:main:main",
childSessionKey: "agent:codex:subagent:child",
runId: "run-subagent-cancel",
task: "Inspect a PR",
startedAt: 10,
deliveryStatus: "pending",
});
const cancelled = await cancelDetachedTaskRunById({
cfg: {} as never,
flowId: flow.flowId,
taskId: child.taskId,
});
expect(cancelled).toMatchObject({
found: true,
cancelled: false,
reason: "Flow is already succeeded.",
cancelled: true,
});
expect(getFlowById(flow.flowId)).toMatchObject({
flowId: flow.flowId,
status: "succeeded",
endedAt: 55,
expect(findTaskByRunId("run-subagent-cancel")).toMatchObject({
taskId: child.taskId,
status: "cancelled",
});
expect(hoisted.killSubagentRunAdminMock).toHaveBeenCalledWith({
cfg: {} as never,
sessionKey: "agent:codex:subagent:child",
});
});
});

View File

@ -1,90 +1,28 @@
import type { OpenClawConfig } from "../config/config.js";
import { createSubsystemLogger } from "../logging/subsystem.js";
import {
createFlowForTask,
createFlowRecord,
deleteFlowRecordById,
getFlowById,
updateFlowRecordById,
} from "./flow-registry.js";
import type { FlowRecord } from "./flow-registry.types.js";
import {
cancelTaskById,
createTaskRecord,
findLatestTaskForFlowId,
linkTaskToFlowById,
listTasksForFlowId,
markTaskLostById,
markTaskRunningByRunId,
markTaskTerminalByRunId,
recordTaskProgressByRunId,
setTaskRunDeliveryStatusByRunId,
} from "./task-registry.js";
import { summarizeTaskRecords } from "./task-registry.summary.js";
import type {
TaskDeliveryState,
TaskDeliveryStatus,
TaskNotifyPolicy,
TaskRecord,
TaskRegistrySummary,
TaskRuntime,
TaskStatus,
TaskTerminalOutcome,
} from "./task-registry.types.js";
const log = createSubsystemLogger("tasks/executor");
function isOneTaskFlowEligible(task: TaskRecord): boolean {
if (task.parentFlowId?.trim() || !task.requesterSessionKey.trim()) {
return false;
}
if (task.deliveryStatus === "not_applicable") {
return false;
}
return task.runtime === "acp" || task.runtime === "subagent";
}
function ensureSingleTaskFlow(params: {
task: TaskRecord;
requesterOrigin?: TaskDeliveryState["requesterOrigin"];
}): TaskRecord {
if (!isOneTaskFlowEligible(params.task)) {
return params.task;
}
try {
const flow = createFlowForTask({
task: params.task,
requesterOrigin: params.requesterOrigin,
});
const linked = linkTaskToFlowById({
taskId: params.task.taskId,
flowId: flow.flowId,
});
if (!linked) {
deleteFlowRecordById(flow.flowId);
return params.task;
}
if (linked.parentFlowId !== flow.flowId) {
deleteFlowRecordById(flow.flowId);
return linked;
}
return linked;
} catch (error) {
log.warn("Failed to create one-task flow for detached run", {
taskId: params.task.taskId,
runId: params.task.runId,
error,
});
return params.task;
}
}
export function createQueuedTaskRun(params: {
runtime: TaskRuntime;
sourceId?: string;
requesterSessionKey: string;
requesterOrigin?: TaskDeliveryState["requesterOrigin"];
parentFlowId?: string;
childSessionKey?: string;
parentTaskId?: string;
agentId?: string;
@ -95,248 +33,10 @@ export function createQueuedTaskRun(params: {
notifyPolicy?: TaskNotifyPolicy;
deliveryStatus?: TaskDeliveryStatus;
}): TaskRecord {
const task = createTaskRecord({
return createTaskRecord({
...params,
status: "queued",
});
return ensureSingleTaskFlow({
task,
requesterOrigin: params.requesterOrigin,
});
}
export function createLinearFlow(params: {
ownerSessionKey: string;
requesterOrigin?: TaskDeliveryState["requesterOrigin"];
goal: string;
notifyPolicy?: TaskNotifyPolicy;
currentStep?: string;
createdAt?: number;
updatedAt?: number;
}): FlowRecord {
return createFlowRecord({
shape: "linear",
ownerSessionKey: params.ownerSessionKey,
requesterOrigin: params.requesterOrigin,
goal: params.goal,
notifyPolicy: params.notifyPolicy,
currentStep: params.currentStep,
status: "queued",
createdAt: params.createdAt,
updatedAt: params.updatedAt,
});
}
export function getFlowTaskSummary(flowId: string): TaskRegistrySummary {
return summarizeTaskRecords(listTasksForFlowId(flowId));
}
type RetryBlockedFlowResult = {
found: boolean;
retried: boolean;
reason?: string;
previousTask?: TaskRecord;
task?: TaskRecord;
};
type RetryBlockedFlowParams = {
flowId: string;
sourceId?: string;
requesterOrigin?: TaskDeliveryState["requesterOrigin"];
childSessionKey?: string;
agentId?: string;
runId?: string;
label?: string;
task?: string;
preferMetadata?: boolean;
notifyPolicy?: TaskNotifyPolicy;
deliveryStatus?: TaskDeliveryStatus;
status: "queued" | "running";
startedAt?: number;
lastEventAt?: number;
progressSummary?: string | null;
};
function resolveRetryableBlockedFlowTask(flowId: string): {
flowFound: boolean;
retryable: boolean;
latestTask?: TaskRecord;
reason?: string;
} {
const flow = getFlowById(flowId);
if (!flow) {
return {
flowFound: false,
retryable: false,
reason: "Flow not found.",
};
}
const latestTask = findLatestTaskForFlowId(flowId);
if (!latestTask) {
return {
flowFound: true,
retryable: false,
reason: "Flow has no retryable task.",
};
}
if (flow.status !== "blocked") {
return {
flowFound: true,
retryable: false,
latestTask,
reason: "Flow is not blocked.",
};
}
if (latestTask.status !== "succeeded" || latestTask.terminalOutcome !== "blocked") {
return {
flowFound: true,
retryable: false,
latestTask,
reason: "Latest flow task is not blocked.",
};
}
return {
flowFound: true,
retryable: true,
latestTask,
};
}
function retryBlockedFlowTask(params: RetryBlockedFlowParams): RetryBlockedFlowResult {
const resolved = resolveRetryableBlockedFlowTask(params.flowId);
if (!resolved.retryable || !resolved.latestTask) {
return {
found: resolved.flowFound,
retried: false,
reason: resolved.reason,
};
}
const flow = getFlowById(params.flowId);
if (!flow) {
return {
found: false,
retried: false,
reason: "Flow not found.",
previousTask: resolved.latestTask,
};
}
const task = createTaskRecord({
runtime: resolved.latestTask.runtime,
sourceId: params.sourceId ?? resolved.latestTask.sourceId,
requesterSessionKey: flow.ownerSessionKey,
requesterOrigin: params.requesterOrigin ?? flow.requesterOrigin,
parentFlowId: flow.flowId,
childSessionKey: params.childSessionKey,
parentTaskId: resolved.latestTask.taskId,
agentId: params.agentId ?? resolved.latestTask.agentId,
runId: params.runId,
label: params.label ?? resolved.latestTask.label,
task: params.task ?? resolved.latestTask.task,
preferMetadata: params.preferMetadata,
notifyPolicy: params.notifyPolicy ?? resolved.latestTask.notifyPolicy,
deliveryStatus: params.deliveryStatus ?? "pending",
status: params.status,
startedAt: params.startedAt,
lastEventAt: params.lastEventAt,
progressSummary: params.progressSummary,
});
return {
found: true,
retried: true,
previousTask: resolved.latestTask,
task,
};
}
export function retryBlockedFlowAsQueuedTaskRun(
params: Omit<RetryBlockedFlowParams, "status" | "startedAt" | "lastEventAt" | "progressSummary">,
): RetryBlockedFlowResult {
return retryBlockedFlowTask({
...params,
status: "queued",
});
}
export function retryBlockedFlowAsRunningTaskRun(
params: Omit<RetryBlockedFlowParams, "status">,
): RetryBlockedFlowResult {
return retryBlockedFlowTask({
...params,
status: "running",
});
}
type CancelFlowResult = {
found: boolean;
cancelled: boolean;
reason?: string;
flow?: FlowRecord;
tasks?: TaskRecord[];
};
function isActiveTaskStatus(status: TaskStatus): boolean {
return status === "queued" || status === "running";
}
function isTerminalFlowStatus(status: FlowRecord["status"]): boolean {
return (
status === "succeeded" || status === "failed" || status === "cancelled" || status === "lost"
);
}
export async function cancelFlowById(params: {
cfg: OpenClawConfig;
flowId: string;
}): Promise<CancelFlowResult> {
const flow = getFlowById(params.flowId);
if (!flow) {
return {
found: false,
cancelled: false,
reason: "Flow not found.",
};
}
const linkedTasks = listTasksForFlowId(flow.flowId);
const activeTasks = linkedTasks.filter((task) => isActiveTaskStatus(task.status));
for (const task of activeTasks) {
await cancelTaskById({
cfg: params.cfg,
taskId: task.taskId,
});
}
const refreshedTasks = listTasksForFlowId(flow.flowId);
const remainingActive = refreshedTasks.filter((task) => isActiveTaskStatus(task.status));
if (remainingActive.length > 0) {
return {
found: true,
cancelled: false,
reason: "One or more child tasks are still active.",
flow: getFlowById(flow.flowId),
tasks: refreshedTasks,
};
}
if (isTerminalFlowStatus(flow.status)) {
return {
found: true,
cancelled: false,
reason: `Flow is already ${flow.status}.`,
flow,
tasks: refreshedTasks,
};
}
const updatedFlow = updateFlowRecordById(flow.flowId, {
status: "cancelled",
blockedTaskId: null,
blockedSummary: null,
endedAt: Date.now(),
updatedAt: Date.now(),
});
return {
found: true,
cancelled: true,
flow: updatedFlow ?? getFlowById(flow.flowId),
tasks: refreshedTasks,
};
}
export function createRunningTaskRun(params: {
@ -344,7 +44,6 @@ export function createRunningTaskRun(params: {
sourceId?: string;
requesterSessionKey: string;
requesterOrigin?: TaskDeliveryState["requesterOrigin"];
parentFlowId?: string;
childSessionKey?: string;
parentTaskId?: string;
agentId?: string;
@ -358,14 +57,10 @@ export function createRunningTaskRun(params: {
lastEventAt?: number;
progressSummary?: string | null;
}): TaskRecord {
const task = createTaskRecord({
return createTaskRecord({
...params,
status: "running",
});
return ensureSingleTaskFlow({
task,
requesterOrigin: params.requesterOrigin,
});
}
export function startTaskRunByRunId(params: {

View File

@ -9,10 +9,7 @@ const ALLOWED_IMPORTERS = new Set([
"agents/tools/session-status-tool.ts",
"auto-reply/reply/commands-acp/runtime-options.ts",
"auto-reply/reply/commands-subagents/action-info.ts",
"commands/doctor-workspace-status.ts",
"commands/flows.ts",
"commands/tasks.ts",
"tasks/flow-runtime.ts",
"tasks/task-executor.ts",
"tasks/task-registry.maintenance.ts",
]);

View File

@ -11,7 +11,6 @@ type TaskRegistryRow = {
runtime: TaskRecord["runtime"];
source_id: string | null;
requester_session_key: string;
parent_flow_id: string | null;
child_session_key: string | null;
parent_task_id: string | null;
agent_id: string | null;
@ -58,7 +57,7 @@ type TaskRegistryDatabase = {
let cachedDatabase: TaskRegistryDatabase | null = null;
const TASK_REGISTRY_DIR_MODE = 0o700;
const TASK_REGISTRY_FILE_MODE = 0o600;
const TASK_REGISTRY_SIDEcar_SUFFIXES = ["", "-shm", "-wal"] as const;
const TASK_REGISTRY_SIDECAR_SUFFIXES = ["", "-shm", "-wal"] as const;
function normalizeNumber(value: number | bigint | null): number | undefined {
if (typeof value === "bigint") {
@ -92,7 +91,6 @@ function rowToTaskRecord(row: TaskRegistryRow): TaskRecord {
runtime: row.runtime,
...(row.source_id ? { sourceId: row.source_id } : {}),
requesterSessionKey: row.requester_session_key,
...(row.parent_flow_id ? { parentFlowId: row.parent_flow_id } : {}),
...(row.child_session_key ? { childSessionKey: row.child_session_key } : {}),
...(row.parent_task_id ? { parentTaskId: row.parent_task_id } : {}),
...(row.agent_id ? { agentId: row.agent_id } : {}),
@ -130,7 +128,6 @@ function bindTaskRecord(record: TaskRecord) {
runtime: record.runtime,
source_id: record.sourceId ?? null,
requester_session_key: record.requesterSessionKey,
parent_flow_id: record.parentFlowId ?? null,
child_session_key: record.childSessionKey ?? null,
parent_task_id: record.parentTaskId ?? null,
agent_id: record.agentId ?? null,
@ -168,7 +165,6 @@ function createStatements(db: DatabaseSync): TaskRegistryStatements {
runtime,
source_id,
requester_session_key,
parent_flow_id,
child_session_key,
parent_task_id,
agent_id,
@ -204,7 +200,6 @@ function createStatements(db: DatabaseSync): TaskRegistryStatements {
runtime,
source_id,
requester_session_key,
parent_flow_id,
child_session_key,
parent_task_id,
agent_id,
@ -228,7 +223,6 @@ function createStatements(db: DatabaseSync): TaskRegistryStatements {
@runtime,
@source_id,
@requester_session_key,
@parent_flow_id,
@child_session_key,
@parent_task_id,
@agent_id,
@ -252,7 +246,6 @@ function createStatements(db: DatabaseSync): TaskRegistryStatements {
runtime = excluded.runtime,
source_id = excluded.source_id,
requester_session_key = excluded.requester_session_key,
parent_flow_id = excluded.parent_flow_id,
child_session_key = excluded.child_session_key,
parent_task_id = excluded.parent_task_id,
agent_id = excluded.agent_id,
@ -297,7 +290,6 @@ function ensureSchema(db: DatabaseSync) {
runtime TEXT NOT NULL,
source_id TEXT,
requester_session_key TEXT NOT NULL,
parent_flow_id TEXT,
child_session_key TEXT,
parent_task_id TEXT,
agent_id TEXT,
@ -326,35 +318,20 @@ function ensureSchema(db: DatabaseSync) {
);
`);
db.exec(`CREATE INDEX IF NOT EXISTS idx_task_runs_run_id ON task_runs(run_id);`);
ensureColumn(db, "task_runs", "parent_flow_id", "TEXT");
db.exec(`CREATE INDEX IF NOT EXISTS idx_task_runs_status ON task_runs(status);`);
db.exec(`CREATE INDEX IF NOT EXISTS idx_task_runs_runtime_status ON task_runs(runtime, status);`);
db.exec(`CREATE INDEX IF NOT EXISTS idx_task_runs_cleanup_after ON task_runs(cleanup_after);`);
db.exec(`CREATE INDEX IF NOT EXISTS idx_task_runs_last_event_at ON task_runs(last_event_at);`);
db.exec(`CREATE INDEX IF NOT EXISTS idx_task_runs_parent_flow_id ON task_runs(parent_flow_id);`);
db.exec(
`CREATE INDEX IF NOT EXISTS idx_task_runs_child_session_key ON task_runs(child_session_key);`,
);
}
function ensureColumn(
db: DatabaseSync,
tableName: string,
columnName: string,
columnDefinition: string,
) {
const rows = db.prepare(`PRAGMA table_info(${tableName})`).all() as Array<{ name?: string }>;
if (rows.some((row) => row.name === columnName)) {
return;
}
db.exec(`ALTER TABLE ${tableName} ADD COLUMN ${columnName} ${columnDefinition};`);
}
function ensureTaskRegistryPermissions(pathname: string) {
const dir = resolveTaskRegistryDir(process.env);
mkdirSync(dir, { recursive: true, mode: TASK_REGISTRY_DIR_MODE });
chmodSync(dir, TASK_REGISTRY_DIR_MODE);
for (const suffix of TASK_REGISTRY_SIDEcar_SUFFIXES) {
for (const suffix of TASK_REGISTRY_SIDECAR_SUFFIXES) {
const candidate = `${pathname}${suffix}`;
if (!existsSync(candidate)) {
continue;

View File

@ -138,26 +138,6 @@ describe("task-registry store runtime", () => {
});
});
it("persists parent flow linkage on task records", () => {
const created = createTaskRecord({
runtime: "acp",
requesterSessionKey: "agent:main:main",
parentFlowId: "flow-123",
runId: "run-linked",
task: "Linked task",
status: "running",
deliveryStatus: "pending",
});
resetTaskRegistryForTests({ persist: false });
expect(findTaskByRunId("run-linked")).toMatchObject({
taskId: created.taskId,
parentFlowId: "flow-123",
task: "Linked task",
});
});
it("hardens the sqlite task store directory and file modes", () => {
if (process.platform === "win32") {
return;

View File

@ -7,8 +7,6 @@ import {
} from "../infra/heartbeat-wake.js";
import { peekSystemEvents, resetSystemEventsForTest } from "../infra/system-events.js";
import { withTempDir } from "../test-helpers/temp-dir.js";
import { installInMemoryTaskAndFlowRegistryRuntime } from "../test-utils/task-flow-registry-runtime.js";
import { createFlowRecord, getFlowById, resetFlowRegistryForTests } from "./flow-registry.js";
import {
createTaskRecord,
findLatestTaskForSessionKey,
@ -103,13 +101,11 @@ async function withTaskRegistryTempDir<T>(run: (root: string) => Promise<T>): Pr
return await withTempDir({ prefix: "openclaw-task-registry-" }, async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests();
resetFlowRegistryForTests();
try {
return await run(root);
} finally {
// Close the sqlite-backed registry before Windows temp-dir cleanup tries to remove it.
resetTaskRegistryForTests();
resetFlowRegistryForTests();
}
});
}
@ -125,7 +121,6 @@ describe("task-registry", () => {
resetSystemEventsForTest();
resetHeartbeatWakeStateForTests();
resetTaskRegistryForTests({ persist: false });
resetFlowRegistryForTests({ persist: false });
hoisted.sendMessageMock.mockReset();
hoisted.cancelSessionMock.mockReset();
hoisted.killSubagentRunAdminMock.mockReset();
@ -697,50 +692,6 @@ describe("task-registry", () => {
});
});
it("adopts parent flow linkage when collapsing onto an earlier ACP record", async () => {
await withTaskRegistryTempDir(async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests({ persist: false });
resetFlowRegistryForTests({ persist: false });
installInMemoryTaskAndFlowRegistryRuntime();
const directTask = createTaskRecord({
runtime: "acp",
requesterSessionKey: "agent:main:main",
requesterOrigin: {
channel: "telegram",
to: "telegram:123",
},
childSessionKey: "agent:main:acp:child",
runId: "run-collapse-parent-flow",
task: "Direct ACP child",
status: "running",
deliveryStatus: "pending",
});
const spawnedTask = createTaskRecord({
runtime: "acp",
requesterSessionKey: "agent:main:main",
requesterOrigin: {
channel: "telegram",
to: "telegram:123",
},
parentFlowId: "flow-123",
childSessionKey: "agent:main:acp:child",
runId: "run-collapse-parent-flow",
task: "Spawn ACP child",
status: "running",
deliveryStatus: "pending",
});
expect(spawnedTask.taskId).toBe(directTask.taskId);
expect(findTaskByRunId("run-collapse-parent-flow")).toMatchObject({
taskId: directTask.taskId,
parentFlowId: "flow-123",
});
});
});
it("collapses ACP run-owned task creation onto the existing spawned task", async () => {
await withTaskRegistryTempDir(async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
@ -834,7 +785,6 @@ describe("task-registry", () => {
const task = createTaskRecord({
runtime: "subagent",
requesterSessionKey: "agent:main:main",
parentFlowId: "flow-restore",
childSessionKey: "agent:main:subagent:child",
runId: "run-restore",
task: "Restore me",
@ -848,7 +798,6 @@ describe("task-registry", () => {
expect(resolveTaskForLookupToken(task.taskId)).toMatchObject({
taskId: task.taskId,
parentFlowId: "flow-restore",
runId: "run-restore",
task: "Restore me",
});
@ -1132,71 +1081,6 @@ describe("task-registry", () => {
});
});
it("routes state-change updates through the parent flow owner when a task is flow-linked", async () => {
await withTaskRegistryTempDir(async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests();
resetFlowRegistryForTests();
hoisted.sendMessageMock.mockResolvedValue({
channel: "discord",
to: "discord:flow",
via: "direct",
});
const flow = createFlowRecord({
shape: "single_task",
ownerSessionKey: "agent:flow:owner",
requesterOrigin: {
channel: "discord",
to: "discord:flow",
threadId: "444",
},
status: "queued",
notifyPolicy: "state_changes",
goal: "Investigate issue",
});
const task = createTaskRecord({
runtime: "acp",
requesterSessionKey: "agent:main:main",
requesterOrigin: {
channel: "telegram",
to: "telegram:123",
},
parentFlowId: flow.flowId,
childSessionKey: "agent:codex:acp:child",
runId: "run-flow-state",
task: "Investigate issue",
status: "queued",
notifyPolicy: "state_changes",
});
markTaskRunningByRunId({
runId: "run-flow-state",
eventSummary: "Started.",
});
await waitForAssertion(() =>
expect(hoisted.sendMessageMock).toHaveBeenCalledWith(
expect.objectContaining({
channel: "discord",
to: "discord:flow",
threadId: "444",
idempotencyKey: expect.stringContaining(`flow-event:${flow.flowId}:${task.taskId}:`),
mirror: expect.objectContaining({
sessionKey: "agent:flow:owner",
idempotencyKey: expect.stringContaining(`flow-event:${flow.flowId}:${task.taskId}:`),
}),
}),
),
);
expect(getFlowById(flow.flowId)).toMatchObject({
flowId: flow.flowId,
status: "running",
});
});
});
it("keeps background ACP progress off the foreground lane and only sends a terminal notify", async () => {
await withTaskRegistryTempDir(async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
@ -1320,123 +1204,6 @@ describe("task-registry", () => {
});
});
it("routes terminal delivery through the parent flow owner when a task is flow-linked", async () => {
await withTaskRegistryTempDir(async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests();
resetFlowRegistryForTests();
resetSystemEventsForTest();
hoisted.sendMessageMock.mockResolvedValue({
channel: "discord",
to: "discord:flow",
via: "direct",
});
const flow = createFlowRecord({
shape: "single_task",
ownerSessionKey: "agent:flow:owner",
requesterOrigin: {
channel: "discord",
to: "discord:flow",
threadId: "444",
},
status: "running",
goal: "Investigate issue",
});
createTaskRecord({
runtime: "acp",
requesterSessionKey: "agent:main:main",
requesterOrigin: {
channel: "telegram",
to: "telegram:123",
},
parentFlowId: flow.flowId,
childSessionKey: "agent:codex:acp:child",
runId: "run-flow-terminal",
task: "Investigate issue",
status: "running",
deliveryStatus: "pending",
});
emitAgentEvent({
runId: "run-flow-terminal",
stream: "lifecycle",
data: {
phase: "end",
endedAt: 250,
},
});
await flushAsyncWork();
await waitForAssertion(() =>
expect(hoisted.sendMessageMock).toHaveBeenCalledWith(
expect.objectContaining({
channel: "discord",
to: "discord:flow",
threadId: "444",
idempotencyKey: expect.stringContaining(`flow-terminal:${flow.flowId}:`),
mirror: expect.objectContaining({
sessionKey: "agent:flow:owner",
}),
}),
),
);
expect(getFlowById(flow.flowId)).toMatchObject({
flowId: flow.flowId,
status: "succeeded",
endedAt: 250,
});
expect(peekSystemEvents("agent:main:main")).toEqual([]);
});
});
it("queues fallback terminal delivery on the parent flow owner session when a task is flow-linked", async () => {
await withTaskRegistryTempDir(async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests();
resetFlowRegistryForTests();
resetSystemEventsForTest();
const flow = createFlowRecord({
ownerSessionKey: "agent:flow:owner",
status: "running",
goal: "Investigate issue",
});
createTaskRecord({
runtime: "acp",
requesterSessionKey: "agent:main:main",
parentFlowId: flow.flowId,
childSessionKey: "agent:codex:acp:child",
runId: "run-flow-fallback",
task: "Investigate issue",
status: "running",
deliveryStatus: "pending",
});
emitAgentEvent({
runId: "run-flow-fallback",
stream: "lifecycle",
data: {
phase: "end",
endedAt: 250,
},
});
await flushAsyncWork();
await waitForAssertion(() =>
expect(peekSystemEvents("agent:flow:owner")).toEqual([
"Background task done: ACP background task (run run-flow).",
]),
);
expect(peekSystemEvents("agent:main:main")).toEqual([]);
expect(findTaskByRunId("run-flow-fallback")).toMatchObject({
deliveryStatus: "session_queued",
});
});
});
it("emits concise state-change updates without surfacing raw ACP chatter", async () => {
await withTaskRegistryTempDir(async (root) => {
process.env.OPENCLAW_STATE_DIR = root;

View File

@ -9,7 +9,6 @@ import { createSubsystemLogger } from "../logging/subsystem.js";
import { parseAgentSessionKey } from "../routing/session-key.js";
import { normalizeDeliveryContext } from "../utils/delivery-context.js";
import { isDeliverableMessageChannel } from "../utils/message-channel.js";
import { getFlowById, syncFlowFromTask } from "./flow-registry.js";
import {
formatTaskBlockedFollowupMessage,
formatTaskStateChangeMessage,
@ -57,7 +56,6 @@ let deliveryRuntimePromise: Promise<typeof import("./task-registry-delivery-runt
// Resolved delivery target for a task's notifications: the session that should
// receive updates, its optional channel origin, and — when the task is
// flow-linked — the owning flow id used to build flow-scoped idempotency keys.
type TaskDeliveryOwner = {
// Session key of the notification recipient (flow owner or task requester).
sessionKey: string;
// Channel/address to deliver to, when one is known.
requesterOrigin?: TaskDeliveryState["requesterOrigin"];
// Present only when delivery is routed through a parent flow.
flowId?: string;
};
function cloneTaskRecord(record: TaskRecord): TaskRecord {
@ -374,7 +372,6 @@ function mergeExistingTaskForCreate(
params: {
requesterOrigin?: TaskDeliveryState["requesterOrigin"];
sourceId?: string;
parentFlowId?: string;
parentTaskId?: string;
agentId?: string;
label?: string;
@ -397,9 +394,6 @@ function mergeExistingTaskForCreate(
if (params.sourceId?.trim() && !existing.sourceId?.trim()) {
patch.sourceId = params.sourceId.trim();
}
if (params.parentFlowId?.trim() && !existing.parentFlowId?.trim()) {
patch.parentFlowId = params.parentFlowId.trim();
}
if (params.parentTaskId?.trim() && !existing.parentTaskId?.trim()) {
patch.parentTaskId = params.parentTaskId.trim();
}
@ -440,36 +434,22 @@ function taskTerminalDeliveryIdempotencyKey(task: TaskRecord): string {
return `task-terminal:${task.taskId}:${task.status}:${outcome}`;
}
/**
 * Builds the idempotency key for a flow-routed terminal notification.
 * Only succeeded tasks differentiate by their terminal outcome; every other
 * terminal status collapses onto the "default" outcome bucket.
 */
function flowTerminalDeliveryIdempotencyKey(flowId: string, task: TaskRecord): string {
  let outcome = "default";
  if (task.status === "succeeded" && task.terminalOutcome != null) {
    outcome = task.terminalOutcome;
  }
  return ["flow-terminal", flowId, task.taskId, task.status, outcome].join(":");
}
/**
 * Builds the idempotency key for a state-change notification.
 * Flow-linked tasks dedupe under the parent flow's namespace; standalone
 * tasks dedupe under their own task namespace. Both variants key on the
 * event's timestamp and kind so each distinct event is delivered once.
 */
function resolveTaskStateChangeIdempotencyKey(params: {
  task: TaskRecord;
  latestEvent: TaskEventRecord;
  owner: TaskDeliveryOwner;
}): string {
  const { task, latestEvent, owner } = params;
  const suffix = `${task.taskId}:${latestEvent.at}:${latestEvent.kind}`;
  return owner.flowId ? `flow-event:${owner.flowId}:${suffix}` : `task-event:${suffix}`;
}
function resolveTaskTerminalIdempotencyKey(task: TaskRecord, owner: TaskDeliveryOwner): string {
return owner.flowId
? flowTerminalDeliveryIdempotencyKey(owner.flowId, task)
: taskTerminalDeliveryIdempotencyKey(task);
function resolveTaskTerminalIdempotencyKey(task: TaskRecord): string {
return taskTerminalDeliveryIdempotencyKey(task);
}
function resolveTaskDeliveryOwner(task: TaskRecord): TaskDeliveryOwner {
const flow = task.parentFlowId?.trim() ? getFlowById(task.parentFlowId) : undefined;
return {
sessionKey: flow?.ownerSessionKey?.trim() || task.requesterSessionKey.trim(),
requesterOrigin: normalizeDeliveryContext(
flow?.requesterOrigin ?? taskDeliveryStates.get(task.taskId)?.requesterOrigin,
),
...(flow ? { flowId: flow.flowId } : {}),
sessionKey: task.requesterSessionKey.trim(),
requesterOrigin: normalizeDeliveryContext(taskDeliveryStates.get(task.taskId)?.requesterOrigin),
};
}
@ -529,15 +509,6 @@ function updateTask(taskId: string, patch: Partial<TaskRecord>): TaskRecord | nu
addSessionKeyIndex(taskId, next);
}
persistTaskUpsert(next);
try {
syncFlowFromTask(next);
} catch (error) {
log.warn("Failed to sync parent flow from task update", {
taskId,
flowId: next.parentFlowId,
error,
});
}
emitTaskRegistryHookEvent(() => ({
kind: "upserted",
task: cloneTaskRecord(next),
@ -585,7 +556,7 @@ function queueTaskSystemEvent(task: TaskRecord, text: string) {
}
enqueueSystemEvent(text, {
sessionKey: requesterSessionKey,
contextKey: owner.flowId ? `flow:${owner.flowId}` : `task:${task.taskId}`,
contextKey: `task:${task.taskId}`,
deliveryContext: owner.requesterOrigin,
});
requestHeartbeatNow({
@ -607,9 +578,7 @@ function queueBlockedTaskFollowup(task: TaskRecord) {
}
enqueueSystemEvent(followupText, {
sessionKey: requesterSessionKey,
contextKey: owner.flowId
? `flow:${owner.flowId}:blocked-followup`
: `task:${task.taskId}:blocked-followup`,
contextKey: `task:${task.taskId}:blocked-followup`,
deliveryContext: owner.requesterOrigin,
});
requestHeartbeatNow({
@ -678,7 +647,7 @@ export async function maybeDeliverTaskTerminalUpdate(taskId: string): Promise<Ta
try {
const { sendMessage } = await loadTaskRegistryDeliveryRuntime();
const requesterAgentId = parseAgentSessionKey(owner.sessionKey)?.agentId;
const idempotencyKey = resolveTaskTerminalIdempotencyKey(latest, owner);
const idempotencyKey = resolveTaskTerminalIdempotencyKey(latest);
await sendMessage({
channel: owner.requesterOrigin?.channel,
to: owner.requesterOrigin?.to ?? "",
@ -973,7 +942,6 @@ export function createTaskRecord(params: {
sourceId?: string;
requesterSessionKey: string;
requesterOrigin?: TaskDeliveryState["requesterOrigin"];
parentFlowId?: string;
childSessionKey?: string;
parentTaskId?: string;
agentId?: string;
@ -1011,7 +979,6 @@ export function createTaskRecord(params: {
runtime: params.runtime,
sourceId: params.sourceId?.trim() || undefined,
requesterSessionKey: params.requesterSessionKey,
parentFlowId: params.parentFlowId?.trim() || undefined,
childSessionKey: params.childSessionKey,
parentTaskId: params.parentTaskId?.trim() || undefined,
agentId: params.agentId?.trim() || undefined,
@ -1048,17 +1015,6 @@ export function createTaskRecord(params: {
kind: "upserted",
task: cloneTaskRecord(record),
}));
if (record.parentFlowId?.trim()) {
try {
syncFlowFromTask(record);
} catch (error) {
log.warn("Failed to sync parent flow from task create", {
taskId: record.taskId,
flowId: record.parentFlowId,
error,
});
}
}
if (isTerminalTaskStatus(record.status)) {
void maybeDeliverTaskTerminalUpdate(taskId);
}
@ -1228,24 +1184,6 @@ export function updateTaskNotifyPolicyById(params: {
});
}
/**
 * Attaches an existing task to a parent flow.
 * Linking is first-write-wins: a task that already carries a parentFlowId is
 * returned unchanged (as a clone). Returns null when the flow id is blank or
 * the task does not exist.
 */
export function linkTaskToFlowById(params: { taskId: string; flowId: string }): TaskRecord | null {
  ensureTaskRegistryReady();
  const flowId = params.flowId.trim();
  const current = flowId ? tasks.get(params.taskId) : undefined;
  if (!current) {
    return null;
  }
  return current.parentFlowId?.trim()
    ? cloneTaskRecord(current)
    : updateTask(params.taskId, { parentFlowId: flowId });
}
export async function cancelTaskById(params: {
cfg: OpenClawConfig;
taskId: string;
@ -1367,34 +1305,6 @@ export function findLatestTaskForSessionKey(sessionKey: string): TaskRecord | un
return task ? cloneTaskRecord(task) : undefined;
}
export function listTasksForFlowId(flowId: string): TaskRecord[] {
ensureTaskRegistryReady();
const normalizedFlowId = flowId.trim();
if (!normalizedFlowId) {
return [];
}
return [...tasks.values()]
.map((task, insertionIndex) =>
task.parentFlowId?.trim() === normalizedFlowId
? { ...cloneTaskRecord(task), insertionIndex }
: null,
)
.filter(
(
task,
): task is TaskRecord & {
insertionIndex: number;
} => Boolean(task),
)
.toSorted(compareTasksNewestFirst)
.map(({ insertionIndex: _, ...task }) => task);
}
/**
 * Returns the newest task linked to the given flow, or undefined when none
 * exists (or the flow id is blank).
 *
 * Fix: the previous implementation re-ran cloneTaskRecord on the head of the
 * list, but listTasksForFlowId already returns defensive copies (each record
 * passes through cloneTaskRecord plus an object spread), so the second clone
 * was redundant work with no behavioral effect.
 */
export function findLatestTaskForFlowId(flowId: string): TaskRecord | undefined {
  return listTasksForFlowId(flowId)[0];
}
export function listTasksForSessionKey(sessionKey: string): TaskRecord[] {
ensureTaskRegistryReady();
const key = normalizeSessionIndexKey(sessionKey);

View File

@ -54,7 +54,6 @@ export type TaskRecord = {
runtime: TaskRuntime;
sourceId?: string;
requesterSessionKey: string;
parentFlowId?: string;
childSessionKey?: string;
parentTaskId?: string;
agentId?: string;

View File

@ -1,9 +1,3 @@
import {
configureFlowRegistryRuntime,
type FlowRegistryStore,
type FlowRegistryStoreSnapshot,
} from "../tasks/flow-registry.store.js";
import type { FlowRecord } from "../tasks/flow-registry.types.js";
import {
configureTaskRegistryRuntime,
type TaskRegistryStore,
@ -22,24 +16,13 @@ function cloneDeliveryState(state: TaskDeliveryState): TaskDeliveryState {
};
}
/**
 * Shallow-copies a flow record, additionally copying the nested
 * requesterOrigin object (when present) so callers cannot mutate the
 * store's copy through the returned record.
 */
function cloneFlow(flow: FlowRecord): FlowRecord {
  const copy: FlowRecord = { ...flow };
  if (flow.requesterOrigin) {
    copy.requesterOrigin = { ...flow.requesterOrigin };
  }
  return copy;
}
export function installInMemoryTaskAndFlowRegistryRuntime(): {
export function installInMemoryTaskRegistryRuntime(): {
taskStore: TaskRegistryStore;
flowStore: FlowRegistryStore;
} {
let taskSnapshot: TaskRegistryStoreSnapshot = {
tasks: new Map<string, TaskRecord>(),
deliveryStates: new Map<string, TaskDeliveryState>(),
};
let flowSnapshot: FlowRegistryStoreSnapshot = {
flows: new Map<string, FlowRecord>(),
};
const taskStore: TaskRegistryStore = {
loadSnapshot: () => ({
@ -80,28 +63,6 @@ export function installInMemoryTaskAndFlowRegistryRuntime(): {
},
};
const flowStore: FlowRegistryStore = {
loadSnapshot: () => ({
flows: new Map(
[...flowSnapshot.flows.entries()].map(([flowId, flow]) => [flowId, cloneFlow(flow)]),
),
}),
saveSnapshot: (snapshot) => {
flowSnapshot = {
flows: new Map(
[...snapshot.flows.entries()].map(([flowId, flow]) => [flowId, cloneFlow(flow)]),
),
};
},
upsertFlow: (flow) => {
flowSnapshot.flows.set(flow.flowId, cloneFlow(flow));
},
deleteFlow: (flowId) => {
flowSnapshot.flows.delete(flowId);
},
};
configureTaskRegistryRuntime({ store: taskStore });
configureFlowRegistryRuntime({ store: flowStore });
return { taskStore, flowStore };
return { taskStore };
}