From 1a313caff3c59798cc8006395cfd8fdeb4cf6002 Mon Sep 17 00:00:00 2001 From: Vincent Koc Date: Wed, 1 Apr 2026 02:24:51 +0900 Subject: [PATCH] refactor(tasks): remove flow registry layer --- src/acp/control-plane/manager.test.ts | 14 +- .../register.status-health-sessions.test.ts | 50 --- .../register.status-health-sessions.ts | 81 ---- src/commands/doctor-workspace-status.test.ts | 62 --- src/commands/doctor-workspace-status.ts | 57 --- src/commands/flows.test.ts | 160 -------- src/commands/flows.ts | 219 ---------- src/tasks/flow-registry.paths.ts | 10 - src/tasks/flow-registry.store.sqlite.ts | 318 --------------- src/tasks/flow-registry.store.test.ts | 148 ------- src/tasks/flow-registry.store.ts | 45 --- src/tasks/flow-registry.test.ts | 256 ------------ src/tasks/flow-registry.ts | 349 ---------------- src/tasks/flow-registry.types.ts | 42 -- src/tasks/flow-runtime.test.ts | 281 ------------- src/tasks/flow-runtime.ts | 377 ------------------ src/tasks/task-executor.test.ts | 294 ++------------ src/tasks/task-executor.ts | 309 +------------- .../task-registry-import-boundary.test.ts | 3 - src/tasks/task-registry.store.sqlite.ts | 27 +- src/tasks/task-registry.store.test.ts | 20 - src/tasks/task-registry.test.ts | 233 ----------- src/tasks/task-registry.ts | 104 +---- src/tasks/task-registry.types.ts | 1 - ...ry-runtime.ts => task-registry-runtime.ts} | 43 +- 25 files changed, 55 insertions(+), 3448 deletions(-) delete mode 100644 src/commands/flows.test.ts delete mode 100644 src/commands/flows.ts delete mode 100644 src/tasks/flow-registry.paths.ts delete mode 100644 src/tasks/flow-registry.store.sqlite.ts delete mode 100644 src/tasks/flow-registry.store.test.ts delete mode 100644 src/tasks/flow-registry.store.ts delete mode 100644 src/tasks/flow-registry.test.ts delete mode 100644 src/tasks/flow-registry.ts delete mode 100644 src/tasks/flow-registry.types.ts delete mode 100644 src/tasks/flow-runtime.test.ts delete mode 100644 src/tasks/flow-runtime.ts rename src/test-utils/{task-flow-registry-runtime.ts => task-registry-runtime.ts} (60%) diff --git a/src/acp/control-plane/manager.test.ts b/src/acp/control-plane/manager.test.ts index 477c4451cf8..ef034c742c1 100644 --- a/src/acp/control-plane/manager.test.ts +++ b/src/acp/control-plane/manager.test.ts @@ -39,8 +39,7 @@ let AcpRuntimeError: typeof import("../runtime/errors.js").AcpRuntimeError; let resetAcpSessionManagerForTests: typeof import("./manager.js").__testing.resetAcpSessionManagerForTests; let findTaskByRunId: typeof import("../../tasks/task-registry.js").findTaskByRunId; let resetTaskRegistryForTests: typeof import("../../tasks/task-registry.js").resetTaskRegistryForTests; -let resetFlowRegistryForTests: typeof import("../../tasks/flow-registry.js").resetFlowRegistryForTests; -let installInMemoryTaskAndFlowRegistryRuntime: typeof import("../../test-utils/task-flow-registry-runtime.js").installInMemoryTaskAndFlowRegistryRuntime; +let installInMemoryTaskRegistryRuntime: typeof import("../../test-utils/task-registry-runtime.js").installInMemoryTaskRegistryRuntime; const baseCfg = { acp: { @@ -55,13 +54,11 @@ async function withAcpManagerTaskStateDir(run: (root: string) => Promise): await withTempDir({ prefix: "openclaw-acp-manager-task-" }, async (root) => { process.env.OPENCLAW_STATE_DIR = root; resetTaskRegistryForTests({ persist: false }); - resetFlowRegistryForTests({ persist: false }); - installInMemoryTaskAndFlowRegistryRuntime(); + installInMemoryTaskRegistryRuntime(); try { await run(root); } finally { resetTaskRegistryForTests({ persist: false }); - resetFlowRegistryForTests({ persist: false }); } }); } @@ -178,16 +175,14 @@ function extractRuntimeOptionsFromUpserts(): Array { beforeAll(async () => { - vi.resetModules(); ({ AcpSessionManager, __testing: { resetAcpSessionManagerForTests }, } = await import("./manager.js")); ({ AcpRuntimeError } = await import("../runtime/errors.js")); ({ findTaskByRunId, resetTaskRegistryForTests } = await import("../../tasks/task-registry.js")); - ({ resetFlowRegistryForTests } = await import("../../tasks/flow-registry.js")); - ({ installInMemoryTaskAndFlowRegistryRuntime } = - await import("../../test-utils/task-flow-registry-runtime.js")); + ({ installInMemoryTaskRegistryRuntime } = + await import("../../test-utils/task-registry-runtime.js")); }); beforeEach(() => { @@ -206,7 +201,6 @@ describe("AcpSessionManager", () => { process.env.OPENCLAW_STATE_DIR = ORIGINAL_STATE_DIR; } resetTaskRegistryForTests({ persist: false }); - resetFlowRegistryForTests({ persist: false }); }); it("marks ACP-shaped sessions without metadata as stale", () => { diff --git a/src/cli/program/register.status-health-sessions.test.ts b/src/cli/program/register.status-health-sessions.test.ts index f7c222a6893..f5cc4495ab4 100644 --- a/src/cli/program/register.status-health-sessions.test.ts +++ b/src/cli/program/register.status-health-sessions.test.ts @@ -5,9 +5,6 @@ import { registerStatusHealthSessionsCommands } from "./register.status-health-s const mocks = vi.hoisted(() => ({ statusCommand: vi.fn(), healthCommand: vi.fn(), - flowsListCommand: vi.fn(), - flowsShowCommand: vi.fn(), - flowsCancelCommand: vi.fn(), sessionsCommand: vi.fn(), sessionsCleanupCommand: vi.fn(), tasksListCommand: vi.fn(), @@ -26,9 +23,6 @@ const mocks = vi.hoisted(() => ({ const statusCommand = mocks.statusCommand; const healthCommand = mocks.healthCommand; -const flowsListCommand = mocks.flowsListCommand; -const flowsShowCommand = mocks.flowsShowCommand; -const flowsCancelCommand = mocks.flowsCancelCommand; const sessionsCommand = mocks.sessionsCommand; const sessionsCleanupCommand = mocks.sessionsCleanupCommand; const tasksListCommand = mocks.tasksListCommand; @@ -48,12 +42,6 @@ vi.mock("../../commands/health.js", () => ({ healthCommand: mocks.healthCommand, })); -vi.mock("../../commands/flows.js", () => ({ - flowsListCommand: mocks.flowsListCommand, - flowsShowCommand: mocks.flowsShowCommand, - flowsCancelCommand: mocks.flowsCancelCommand, -})); - vi.mock("../../commands/sessions.js", () => ({ sessionsCommand: mocks.sessionsCommand, })); @@ -91,9 +79,6 @@ describe("registerStatusHealthSessionsCommands", () => { runtime.exit.mockImplementation(() => {}); statusCommand.mockResolvedValue(undefined); healthCommand.mockResolvedValue(undefined); - flowsListCommand.mockResolvedValue(undefined); - flowsShowCommand.mockResolvedValue(undefined); - flowsCancelCommand.mockResolvedValue(undefined); sessionsCommand.mockResolvedValue(undefined); sessionsCleanupCommand.mockResolvedValue(undefined); tasksListCommand.mockResolvedValue(undefined); @@ -332,39 +317,4 @@ describe("registerStatusHealthSessionsCommands", () => { runtime, ); }); - - it("runs flows list from the parent command", async () => { - await runCli(["flows", "--json", "--status", "blocked"]); - - expect(flowsListCommand).toHaveBeenCalledWith( - expect.objectContaining({ - json: true, - status: "blocked", - }), - runtime, - ); - }); - - it("runs flows show subcommand with lookup forwarding", async () => { - await runCli(["flows", "show", "flow-123", "--json"]); - - expect(flowsShowCommand).toHaveBeenCalledWith( - expect.objectContaining({ - lookup: "flow-123", - json: true, - }), - runtime, - ); - }); - - it("runs flows cancel subcommand with lookup forwarding", async () => { - await runCli(["flows", "cancel", "flow-123"]); - - expect(flowsCancelCommand).toHaveBeenCalledWith( - expect.objectContaining({ - lookup: "flow-123", - }), - runtime, - ); - }); }); diff --git a/src/cli/program/register.status-health-sessions.ts b/src/cli/program/register.status-health-sessions.ts index 4affcdc933c..1467be8fe72 100644 --- a/src/cli/program/register.status-health-sessions.ts +++ b/src/cli/program/register.status-health-sessions.ts @@ -1,5 +1,4 @@ import type { Command } from "commander"; -import { flowsCancelCommand, flowsListCommand, flowsShowCommand } from "../../commands/flows.js"; import { healthCommand } from "../../commands/health.js"; import { sessionsCleanupCommand } from "../../commands/sessions-cleanup.js"; import { sessionsCommand } from "../../commands/sessions.js"; @@ -374,84 +373,4 @@ export function registerStatusHealthSessionsCommands(program: Command) { ); }); }); - - const flowsCmd = program - .command("flows") - .description("Inspect ClawFlow state") - .option("--json", "Output as JSON", false) - .option( - "--status ", - "Filter by status (queued, running, waiting, blocked, succeeded, failed, cancelled, lost)", - ) - .action(async (opts) => { - await runCommandWithRuntime(defaultRuntime, async () => { - await flowsListCommand( - { - json: Boolean(opts.json), - status: opts.status as string | undefined, - }, - defaultRuntime, - ); - }); - }); - flowsCmd.enablePositionalOptions(); - - flowsCmd - .command("list") - .description("List tracked ClawFlow runs") - .option("--json", "Output as JSON", false) - .option( - "--status ", - "Filter by status (queued, running, waiting, blocked, succeeded, failed, cancelled, lost)", - ) - .action(async (opts, command) => { - const parentOpts = command.parent?.opts() as - | { - json?: boolean; - status?: string; - } - | undefined; - await runCommandWithRuntime(defaultRuntime, async () => { - await flowsListCommand( - { - json: Boolean(opts.json || parentOpts?.json), - status: (opts.status as string | undefined) ?? parentOpts?.status, - }, - defaultRuntime, - ); - }); - }); - - flowsCmd - .command("show") - .description("Show one ClawFlow by flow id or owner session key") - .argument("", "Flow id or owner session key") - .option("--json", "Output as JSON", false) - .action(async (lookup, opts, command) => { - const parentOpts = command.parent?.opts() as { json?: boolean } | undefined; - await runCommandWithRuntime(defaultRuntime, async () => { - await flowsShowCommand( - { - lookup, - json: Boolean(opts.json || parentOpts?.json), - }, - defaultRuntime, - ); - }); - }); - - flowsCmd - .command("cancel") - .description("Cancel a ClawFlow and its active child tasks") - .argument("", "Flow id or owner session key") - .action(async (lookup) => { - await runCommandWithRuntime(defaultRuntime, async () => { - await flowsCancelCommand( - { - lookup, - }, - defaultRuntime, - ); - }); - }); } diff --git a/src/commands/doctor-workspace-status.test.ts b/src/commands/doctor-workspace-status.test.ts index 595734fd2c1..d3803b22ccf 100644 --- a/src/commands/doctor-workspace-status.test.ts +++ b/src/commands/doctor-workspace-status.test.ts @@ -13,8 +13,6 @@ const mocks = vi.hoisted(() => ({ buildWorkspaceSkillStatus: vi.fn(), buildPluginStatusReport: vi.fn(), buildPluginCompatibilityWarnings: vi.fn(), - listFlowRecords: vi.fn(), - listTasksForFlowId: vi.fn(), })); vi.mock("../agents/agent-scope.js", () => ({ @@ -32,14 +30,6 @@ vi.mock("../plugins/status.js", () => ({ mocks.buildPluginCompatibilityWarnings(...args), })); -vi.mock("../tasks/flow-registry.js", () => ({ - listFlowRecords: (...args: unknown[]) => mocks.listFlowRecords(...args), -})); - -vi.mock("../tasks/task-registry.js", () => ({ - listTasksForFlowId: (...args: unknown[]) => mocks.listTasksForFlowId(...args), -})); - async function runNoteWorkspaceStatusForTest( loadResult: ReturnType, compatibilityWarnings: string[] = [], @@ -54,8 +44,6 @@ async function runNoteWorkspaceStatusForTest( ...loadResult, }); mocks.buildPluginCompatibilityWarnings.mockReturnValue(compatibilityWarnings); - mocks.listFlowRecords.mockReturnValue([]); - mocks.listTasksForFlowId.mockReturnValue([]); const noteSpy = vi.spyOn(noteModule, "note").mockImplementation(() => {}); noteWorkspaceStatus({}); @@ -171,54 +159,4 @@ describe("noteWorkspaceStatus", () => { noteSpy.mockRestore(); } }); - - it("surfaces ClawFlow recovery guidance for suspicious linear flows", async () => { - const noteSpy = await runNoteWorkspaceStatusForTest(createPluginLoadResult({ plugins: [] })); - mocks.listFlowRecords.mockReturnValue([ - { - flowId: "flow-orphaned", - shape: "linear", - ownerSessionKey: "agent:main:main", - status: "waiting", - notifyPolicy: "done_only", - goal: "Process PRs", - waitingOnTaskId: "task-wait-missing", - createdAt: 10, - updatedAt: 20, - }, - { - flowId: "flow-blocked", - shape: "single_task", - ownerSessionKey: "agent:main:main", - status: "blocked", - notifyPolicy: "done_only", - goal: "Patch file", - blockedTaskId: "task-missing", - createdAt: 10, - updatedAt: 20, - }, - ]); - mocks.listTasksForFlowId.mockImplementation((flowId: string) => { - if (flowId === "flow-blocked") { - return [{ taskId: "task-other" }]; - } - return []; - }); - - noteWorkspaceStatus({}); - - try { - const recoveryCalls = noteSpy.mock.calls.filter(([, title]) => title === "ClawFlow recovery"); - expect(recoveryCalls).toHaveLength(1); - const body = String(recoveryCalls[0]?.[0]); - expect(body).toContain( - "flow-orphaned: waiting flow points at missing task task-wait-missing", - ); - expect(body).toContain("flow-blocked: blocked flow points at missing task task-missing"); - expect(body).toContain("openclaw flows show "); - expect(body).toContain("openclaw flows cancel "); - } finally { - noteSpy.mockRestore(); - } - }); }); diff --git a/src/commands/doctor-workspace-status.ts b/src/commands/doctor-workspace-status.ts index 21a5c56de4d..c4c8d0a4fea 100644 --- a/src/commands/doctor-workspace-status.ts +++ b/src/commands/doctor-workspace-status.ts @@ -1,65 +1,10 @@ import { resolveAgentWorkspaceDir, resolveDefaultAgentId } from "../agents/agent-scope.js"; import { buildWorkspaceSkillStatus } from "../agents/skills-status.js"; -import { formatCliCommand } from "../cli/command-format.js"; import type { OpenClawConfig } from "../config/config.js"; import { buildPluginCompatibilityWarnings, buildPluginStatusReport } from "../plugins/status.js"; -import { listFlowRecords } from "../tasks/flow-registry.js"; -import { listTasksForFlowId } from "../tasks/task-registry.js"; import { note } from "../terminal/note.js"; import { detectLegacyWorkspaceDirs, formatLegacyWorkspaceWarning } from "./doctor-workspace.js"; -function noteFlowRecoveryHints() { - const suspicious = listFlowRecords().flatMap((flow) => { - const tasks = listTasksForFlowId(flow.flowId); - const findings: string[] = []; - const missingWaitingTask = - flow.shape === "linear" && - flow.status === "waiting" && - flow.waitingOnTaskId && - !tasks.some((task) => task.taskId === flow.waitingOnTaskId); - const missingBlockedTask = - flow.status === "blocked" && - flow.blockedTaskId && - !tasks.some((task) => task.taskId === flow.blockedTaskId); - if ( - flow.shape === "linear" && - (flow.status === "running" || flow.status === "waiting" || flow.status === "blocked") && - tasks.length === 0 && - !missingWaitingTask && - !missingBlockedTask - ) { - findings.push( - `${flow.flowId}: ${flow.status} linear flow has no linked tasks; inspect or cancel it manually.`, - ); - } - if (missingWaitingTask) { - findings.push( - `${flow.flowId}: waiting flow points at missing task ${flow.waitingOnTaskId}; inspect or cancel it manually.`, - ); - } - if (missingBlockedTask) { - findings.push( - `${flow.flowId}: blocked flow points at missing task ${flow.blockedTaskId}; inspect before retrying.`, - ); - } - return findings; - }); - if (suspicious.length === 0) { - return; - } - note( - [ - ...suspicious.slice(0, 5), - suspicious.length > 5 ? `...and ${suspicious.length - 5} more.` : null, - `Inspect: ${formatCliCommand("openclaw flows show ")}`, - `Cancel: ${formatCliCommand("openclaw flows cancel ")}`, - ] - .filter((line): line is string => Boolean(line)) - .join("\n"), - "ClawFlow recovery", - ); -} - export function noteWorkspaceStatus(cfg: OpenClawConfig) { const workspaceDir = resolveAgentWorkspaceDir(cfg, resolveDefaultAgentId(cfg)); const legacyWorkspace = detectLegacyWorkspaceDirs({ workspaceDir }); @@ -129,7 +74,5 @@ export function noteWorkspaceStatus(cfg: OpenClawConfig) { note(lines.join("\n"), "Plugin diagnostics"); } - noteFlowRecoveryHints(); - return { workspaceDir }; } diff --git a/src/commands/flows.test.ts b/src/commands/flows.test.ts deleted file mode 100644 index 21bceee11a9..00000000000 --- a/src/commands/flows.test.ts +++ /dev/null @@ -1,160 +0,0 @@ -import { beforeEach, describe, expect, it, vi } from "vitest"; -import { createCliRuntimeCapture } from "../cli/test-runtime-capture.js"; -import { flowsCancelCommand, flowsListCommand, flowsShowCommand } from "./flows.js"; - -const mocks = vi.hoisted(() => ({ - listFlowRecordsMock: vi.fn(), - resolveFlowForLookupTokenMock: vi.fn(), - getFlowByIdMock: vi.fn(), - listTasksForFlowIdMock: vi.fn(), - getFlowTaskSummaryMock: vi.fn(), - cancelFlowByIdMock: vi.fn(), - loadConfigMock: vi.fn(() => ({ loaded: true })), -})); - -vi.mock("../tasks/flow-registry.js", () => ({ - listFlowRecords: (...args: unknown[]) => mocks.listFlowRecordsMock(...args), - resolveFlowForLookupToken: (...args: unknown[]) => mocks.resolveFlowForLookupTokenMock(...args), - getFlowById: (...args: unknown[]) => mocks.getFlowByIdMock(...args), -})); - -vi.mock("../tasks/task-registry.js", () => ({ - listTasksForFlowId: (...args: unknown[]) => mocks.listTasksForFlowIdMock(...args), -})); - -vi.mock("../tasks/task-executor.js", () => ({ - getFlowTaskSummary: (...args: unknown[]) => mocks.getFlowTaskSummaryMock(...args), - cancelFlowById: (...args: unknown[]) => mocks.cancelFlowByIdMock(...args), -})); - -vi.mock("../config/config.js", () => ({ - loadConfig: () => mocks.loadConfigMock(), -})); - -const { - defaultRuntime: runtime, - runtimeLogs, - runtimeErrors, - resetRuntimeCapture, -} = createCliRuntimeCapture(); - -const flowFixture = { - flowId: "flow-12345678", - shape: "linear", - ownerSessionKey: "agent:main:main", - status: "waiting", - notifyPolicy: "done_only", - goal: "Process related PRs", - currentStep: "wait_for", - waitingOnTaskId: "task-12345678", - outputs: { - bucket: ["personal"], - }, - createdAt: Date.parse("2026-03-31T10:00:00.000Z"), - updatedAt: Date.parse("2026-03-31T10:05:00.000Z"), -} as const; - -const taskSummaryFixture = { - total: 2, - active: 1, - terminal: 1, - failures: 0, - byStatus: { - queued: 0, - running: 1, - succeeded: 1, - failed: 0, - timed_out: 0, - cancelled: 0, - lost: 0, - }, - byRuntime: { - subagent: 1, - acp: 1, - cli: 0, - cron: 0, - }, -} as const; - -const taskFixture = { - taskId: "task-12345678", - runtime: "acp", - requesterSessionKey: "agent:main:main", - parentFlowId: "flow-12345678", - childSessionKey: "agent:codex:acp:child", - runId: "run-12345678", - task: "Review PR", - status: "running", - deliveryStatus: "pending", - notifyPolicy: "done_only", - createdAt: Date.parse("2026-03-31T10:00:00.000Z"), - lastEventAt: Date.parse("2026-03-31T10:05:00.000Z"), -} as const; - -describe("flows commands", () => { - beforeEach(() => { - vi.clearAllMocks(); - resetRuntimeCapture(); - mocks.listFlowRecordsMock.mockReturnValue([]); - mocks.resolveFlowForLookupTokenMock.mockReturnValue(undefined); - mocks.getFlowByIdMock.mockReturnValue(undefined); - mocks.listTasksForFlowIdMock.mockReturnValue([]); - mocks.getFlowTaskSummaryMock.mockReturnValue(taskSummaryFixture); - mocks.cancelFlowByIdMock.mockResolvedValue({ - found: false, - cancelled: false, - reason: "missing", - }); - }); - - it("lists flow rows with task summary counts", async () => { - mocks.listFlowRecordsMock.mockReturnValue([flowFixture]); - - await flowsListCommand({}, runtime); - - expect(runtimeLogs[0]).toContain("Flows: 1"); - expect(runtimeLogs[1]).toContain("Flow pressure: 1 active · 0 blocked · 1 total"); - expect(runtimeLogs.join("\n")).toContain("Process related PRs"); - expect(runtimeLogs.join("\n")).toContain("1 active/2 total"); - }); - - it("shows one flow with linked tasks", async () => { - mocks.resolveFlowForLookupTokenMock.mockReturnValue(flowFixture); - mocks.listTasksForFlowIdMock.mockReturnValue([taskFixture]); - - await flowsShowCommand({ lookup: "flow-12345678" }, runtime); - - expect(runtimeLogs.join("\n")).toContain("shape: linear"); - expect(runtimeLogs.join("\n")).toContain("currentStep: wait_for"); - expect(runtimeLogs.join("\n")).toContain("waitingOnTaskId: task-12345678"); - expect(runtimeLogs.join("\n")).toContain("outputKeys: bucket"); - expect(runtimeLogs.join("\n")).toContain("tasks: 2 total · 1 active · 0 issues"); - expect(runtimeLogs.join("\n")).toContain("task-12345678 running run-12345678 Review PR"); - }); - - it("cancels a flow and reports the updated state", async () => { - mocks.resolveFlowForLookupTokenMock.mockReturnValue(flowFixture); - mocks.cancelFlowByIdMock.mockResolvedValue({ - found: true, - cancelled: true, - flow: { - ...flowFixture, - status: "cancelled", - }, - }); - mocks.getFlowByIdMock.mockReturnValue({ - ...flowFixture, - status: "cancelled", - }); - - await flowsCancelCommand({ lookup: "flow-12345678" }, runtime); - - expect(mocks.loadConfigMock).toHaveBeenCalled(); - expect(mocks.cancelFlowByIdMock).toHaveBeenCalledWith({ - cfg: { loaded: true }, - flowId: "flow-12345678", - }); - expect(runtimeLogs[0]).toContain("Cancelled flow-12345678 (linear) with status cancelled."); - expect(runtimeErrors).toEqual([]); - }); -}); diff --git a/src/commands/flows.ts b/src/commands/flows.ts deleted file mode 100644 index b52a2b6f33e..00000000000 --- a/src/commands/flows.ts +++ /dev/null @@ -1,219 +0,0 @@ -import { loadConfig } from "../config/config.js"; -import { info } from "../globals.js"; -import type { RuntimeEnv } from "../runtime.js"; -import { getFlowById, listFlowRecords, resolveFlowForLookupToken } from "../tasks/flow-registry.js"; -import type { FlowRecord, FlowStatus } from "../tasks/flow-registry.types.js"; -import { cancelFlowById, getFlowTaskSummary } from "../tasks/task-executor.js"; -import { listTasksForFlowId } from "../tasks/task-registry.js"; -import { isRich, theme } from "../terminal/theme.js"; - -const ID_PAD = 10; -const STATUS_PAD = 10; -const SHAPE_PAD = 12; - -function truncate(value: string, maxChars: number) { - if (value.length <= maxChars) { - return value; - } - if (maxChars <= 1) { - return value.slice(0, maxChars); - } - return `${value.slice(0, maxChars - 1)}…`; -} - -function shortToken(value: string | undefined, maxChars = ID_PAD): string { - const trimmed = value?.trim(); - if (!trimmed) { - return "n/a"; - } - return truncate(trimmed, maxChars); -} - -function formatFlowStatusCell(status: FlowStatus, rich: boolean) { - const padded = status.padEnd(STATUS_PAD); - if (!rich) { - return padded; - } - if (status === "succeeded") { - return theme.success(padded); - } - if (status === "failed" || status === "lost") { - return theme.error(padded); - } - if (status === "running") { - return theme.accentBright(padded); - } - if (status === "blocked") { - return theme.warn(padded); - } - return theme.muted(padded); -} - -function formatFlowRows(flows: FlowRecord[], rich: boolean) { - const header = [ - "Flow".padEnd(ID_PAD), - "Shape".padEnd(SHAPE_PAD), - "Status".padEnd(STATUS_PAD), - "Owner".padEnd(24), - "Tasks".padEnd(14), - "Goal", - ].join(" "); - const lines = [rich ? theme.heading(header) : header]; - for (const flow of flows) { - const taskSummary = getFlowTaskSummary(flow.flowId); - const counts = `${taskSummary.active} active/${taskSummary.total} total`; - lines.push( - [ - shortToken(flow.flowId).padEnd(ID_PAD), - flow.shape.padEnd(SHAPE_PAD), - formatFlowStatusCell(flow.status, rich), - truncate(flow.ownerSessionKey, 24).padEnd(24), - counts.padEnd(14), - truncate(flow.goal, 80), - ].join(" "), - ); - } - return lines; -} - -function formatFlowListSummary(flows: FlowRecord[]) { - const active = flows.filter( - (flow) => flow.status === "queued" || flow.status === "running" || flow.status === "waiting", - ).length; - const blocked = flows.filter((flow) => flow.status === "blocked").length; - return `${active} active · ${blocked} blocked · ${flows.length} total`; -} - -export async function flowsListCommand( - opts: { json?: boolean; status?: string }, - runtime: RuntimeEnv, -) { - const statusFilter = opts.status?.trim(); - const flows = listFlowRecords().filter((flow) => { - if (statusFilter && flow.status !== statusFilter) { - return false; - } - return true; - }); - - if (opts.json) { - runtime.log( - JSON.stringify( - { - count: flows.length, - status: statusFilter ?? null, - flows: flows.map((flow) => ({ - ...flow, - tasks: listTasksForFlowId(flow.flowId), - taskSummary: getFlowTaskSummary(flow.flowId), - })), - }, - null, - 2, - ), - ); - return; - } - - runtime.log(info(`Flows: ${flows.length}`)); - runtime.log(info(`Flow pressure: ${formatFlowListSummary(flows)}`)); - if (statusFilter) { - runtime.log(info(`Status filter: ${statusFilter}`)); - } - if (flows.length === 0) { - runtime.log("No flows found."); - return; - } - const rich = isRich(); - for (const line of formatFlowRows(flows, rich)) { - runtime.log(line); - } -} - -export async function flowsShowCommand( - opts: { json?: boolean; lookup: string }, - runtime: RuntimeEnv, -) { - const flow = resolveFlowForLookupToken(opts.lookup); - if (!flow) { - runtime.error(`Flow not found: ${opts.lookup}`); - runtime.exit(1); - return; - } - const tasks = listTasksForFlowId(flow.flowId); - const taskSummary = getFlowTaskSummary(flow.flowId); - - if (opts.json) { - runtime.log( - JSON.stringify( - { - ...flow, - tasks, - taskSummary, - }, - null, - 2, - ), - ); - return; - } - - const lines = [ - "Flow:", - `flowId: ${flow.flowId}`, - `shape: ${flow.shape}`, - `status: ${flow.status}`, - `notify: ${flow.notifyPolicy}`, - `ownerSessionKey: ${flow.ownerSessionKey}`, - `goal: ${flow.goal}`, - `currentStep: ${flow.currentStep ?? "n/a"}`, - `waitingOnTaskId: ${flow.waitingOnTaskId ?? "n/a"}`, - `outputKeys: ${ - flow.outputs ? Object.keys(flow.outputs).toSorted().join(", ") || "n/a" : "n/a" - }`, - `blockedTaskId: ${flow.blockedTaskId ?? "n/a"}`, - `blockedSummary: ${flow.blockedSummary ?? "n/a"}`, - `createdAt: ${new Date(flow.createdAt).toISOString()}`, - `updatedAt: ${new Date(flow.updatedAt).toISOString()}`, - `endedAt: ${flow.endedAt ? new Date(flow.endedAt).toISOString() : "n/a"}`, - `tasks: ${taskSummary.total} total · ${taskSummary.active} active · ${taskSummary.failures} issues`, - ]; - for (const line of lines) { - runtime.log(line); - } - if (tasks.length === 0) { - runtime.log("Linked tasks: none"); - return; - } - runtime.log("Linked tasks:"); - for (const task of tasks) { - runtime.log( - `- ${task.taskId} ${task.status} ${task.runId ?? "n/a"} ${task.label ?? task.task}`, - ); - } -} - -export async function flowsCancelCommand(opts: { lookup: string }, runtime: RuntimeEnv) { - const flow = resolveFlowForLookupToken(opts.lookup); - if (!flow) { - runtime.error(`Flow not found: ${opts.lookup}`); - runtime.exit(1); - return; - } - const result = await cancelFlowById({ - cfg: loadConfig(), - flowId: flow.flowId, - }); - if (!result.found) { - runtime.error(result.reason ?? `Flow not found: ${opts.lookup}`); - runtime.exit(1); - return; - } - if (!result.cancelled) { - runtime.error(result.reason ?? `Could not cancel flow: ${opts.lookup}`); - runtime.exit(1); - return; - } - const updated = getFlowById(flow.flowId) ?? result.flow ?? flow; - runtime.log(`Cancelled ${updated.flowId} (${updated.shape}) with status ${updated.status}.`); -} diff --git a/src/tasks/flow-registry.paths.ts b/src/tasks/flow-registry.paths.ts deleted file mode 100644 index 8d35c657434..00000000000 --- a/src/tasks/flow-registry.paths.ts +++ /dev/null @@ -1,10 +0,0 @@ -import path from "node:path"; -import { resolveTaskStateDir } from "./task-registry.paths.js"; - -export function resolveFlowRegistryDir(env: NodeJS.ProcessEnv = process.env): string { - return path.join(resolveTaskStateDir(env), "flows"); -} - -export function resolveFlowRegistrySqlitePath(env: NodeJS.ProcessEnv = process.env): string { - return path.join(resolveFlowRegistryDir(env), "registry.sqlite"); -} diff --git a/src/tasks/flow-registry.store.sqlite.ts b/src/tasks/flow-registry.store.sqlite.ts deleted file mode 100644 index ecbd3e000dd..00000000000 --- a/src/tasks/flow-registry.store.sqlite.ts +++ /dev/null @@ -1,318 +0,0 @@ -import { chmodSync, existsSync, mkdirSync } from "node:fs"; -import type { DatabaseSync, StatementSync } from "node:sqlite"; -import { requireNodeSqlite } from "../infra/node-sqlite.js"; -import type { DeliveryContext } from "../utils/delivery-context.js"; -import { resolveFlowRegistryDir, resolveFlowRegistrySqlitePath } from "./flow-registry.paths.js"; -import type { FlowRegistryStoreSnapshot } from "./flow-registry.store.js"; -import type { FlowOutputBag, FlowRecord, FlowShape } from "./flow-registry.types.js"; - -type FlowRegistryRow = { - flow_id: string; - shape: FlowShape | null; - owner_session_key: string; - requester_origin_json: string | null; - status: FlowRecord["status"]; - notify_policy: FlowRecord["notifyPolicy"]; - goal: string; - current_step: string | null; - waiting_on_task_id: string | null; - outputs_json: string | null; - blocked_task_id: string | null; - blocked_summary: string | null; - created_at: number | bigint; - updated_at: number | bigint; - ended_at: number | bigint | null; -}; - -type FlowRegistryStatements = { - selectAll: StatementSync; - upsertRow: StatementSync; - deleteRow: StatementSync; - clearRows: StatementSync; -}; - -type FlowRegistryDatabase = { - db: DatabaseSync; - path: string; - statements: FlowRegistryStatements; -}; - -let cachedDatabase: FlowRegistryDatabase | null = null; -const FLOW_REGISTRY_DIR_MODE = 0o700; -const FLOW_REGISTRY_FILE_MODE = 0o600; -const FLOW_REGISTRY_SIDECAR_SUFFIXES = ["", "-shm", "-wal"] as const; - -function normalizeNumber(value: number | bigint | null): number | undefined { - if (typeof value === "bigint") { - return Number(value); - } - return typeof value === "number" ? value : undefined; -} - -function serializeJson(value: unknown): string | null { - return value == null ? null : JSON.stringify(value); -} - -function parseJsonValue(raw: string | null): T | undefined { - if (!raw?.trim()) { - return undefined; - } - try { - return JSON.parse(raw) as T; - } catch { - return undefined; - } -} - -function rowToFlowRecord(row: FlowRegistryRow): FlowRecord { - const endedAt = normalizeNumber(row.ended_at); - const requesterOrigin = parseJsonValue(row.requester_origin_json); - const outputs = parseJsonValue(row.outputs_json); - return { - flowId: row.flow_id, - shape: row.shape === "linear" ? "linear" : "single_task", - ownerSessionKey: row.owner_session_key, - ...(requesterOrigin ? { requesterOrigin } : {}), - status: row.status, - notifyPolicy: row.notify_policy, - goal: row.goal, - ...(row.current_step ? { currentStep: row.current_step } : {}), - ...(row.waiting_on_task_id ? { waitingOnTaskId: row.waiting_on_task_id } : {}), - ...(outputs ? { outputs } : {}), - ...(row.blocked_task_id ? { blockedTaskId: row.blocked_task_id } : {}), - ...(row.blocked_summary ? { blockedSummary: row.blocked_summary } : {}), - createdAt: normalizeNumber(row.created_at) ?? 0, - updatedAt: normalizeNumber(row.updated_at) ?? 0, - ...(endedAt != null ? { endedAt } : {}), - }; -} - -function bindFlowRecord(record: FlowRecord) { - return { - flow_id: record.flowId, - shape: record.shape, - owner_session_key: record.ownerSessionKey, - requester_origin_json: serializeJson(record.requesterOrigin), - status: record.status, - notify_policy: record.notifyPolicy, - goal: record.goal, - current_step: record.currentStep ?? null, - waiting_on_task_id: record.waitingOnTaskId ?? null, - outputs_json: serializeJson(record.outputs), - blocked_task_id: record.blockedTaskId ?? null, - blocked_summary: record.blockedSummary ?? null, - created_at: record.createdAt, - updated_at: record.updatedAt, - ended_at: record.endedAt ?? null, - }; -} - -function createStatements(db: DatabaseSync): FlowRegistryStatements { - return { - selectAll: db.prepare(` - SELECT - flow_id, - shape, - owner_session_key, - requester_origin_json, - status, - notify_policy, - goal, - current_step, - waiting_on_task_id, - outputs_json, - blocked_task_id, - blocked_summary, - created_at, - updated_at, - ended_at - FROM flow_runs - ORDER BY created_at ASC, flow_id ASC - `), - upsertRow: db.prepare(` - INSERT INTO flow_runs ( - flow_id, - shape, - owner_session_key, - requester_origin_json, - status, - notify_policy, - goal, - current_step, - waiting_on_task_id, - outputs_json, - blocked_task_id, - blocked_summary, - created_at, - updated_at, - ended_at - ) VALUES ( - @flow_id, - @shape, - @owner_session_key, - @requester_origin_json, - @status, - @notify_policy, - @goal, - @current_step, - @waiting_on_task_id, - @outputs_json, - @blocked_task_id, - @blocked_summary, - @created_at, - @updated_at, - @ended_at - ) - ON CONFLICT(flow_id) DO UPDATE SET - shape = excluded.shape, - owner_session_key = excluded.owner_session_key, - requester_origin_json = excluded.requester_origin_json, - status = excluded.status, - notify_policy = excluded.notify_policy, - goal = excluded.goal, - current_step = excluded.current_step, - waiting_on_task_id = excluded.waiting_on_task_id, - outputs_json = excluded.outputs_json, - blocked_task_id = excluded.blocked_task_id, - blocked_summary = excluded.blocked_summary, - created_at = excluded.created_at, - updated_at = excluded.updated_at, - ended_at = excluded.ended_at - `), - deleteRow: db.prepare(`DELETE FROM flow_runs WHERE flow_id = ?`), - clearRows: db.prepare(`DELETE FROM flow_runs`), - }; -} - -function ensureSchema(db: DatabaseSync) { - db.exec(` - CREATE TABLE IF NOT EXISTS flow_runs ( - flow_id TEXT PRIMARY KEY, - shape TEXT NOT NULL, - owner_session_key TEXT NOT NULL, - requester_origin_json TEXT, - status TEXT NOT NULL, - notify_policy TEXT NOT NULL, - goal TEXT NOT NULL, - current_step TEXT, - waiting_on_task_id TEXT, - outputs_json TEXT, - blocked_task_id TEXT, - blocked_summary TEXT, - created_at INTEGER NOT NULL, - updated_at INTEGER NOT NULL, - ended_at INTEGER - ); - `); - ensureColumn(db, "flow_runs", "shape", "TEXT"); - ensureColumn(db, "flow_runs", "waiting_on_task_id", "TEXT"); - ensureColumn(db, "flow_runs", "outputs_json", "TEXT"); - ensureColumn(db, "flow_runs", "blocked_task_id", "TEXT"); - ensureColumn(db, "flow_runs", "blocked_summary", "TEXT"); - db.exec(`CREATE INDEX IF NOT EXISTS idx_flow_runs_status ON flow_runs(status);`); - db.exec( - `CREATE INDEX IF NOT EXISTS idx_flow_runs_owner_session_key ON flow_runs(owner_session_key);`, - ); - db.exec(`CREATE INDEX IF NOT EXISTS idx_flow_runs_updated_at ON flow_runs(updated_at);`); -} - -function ensureColumn( - db: DatabaseSync, - tableName: string, - columnName: string, - columnDefinition: string, -) { - const rows = db.prepare(`PRAGMA table_info(${tableName})`).all() as Array<{ name?: string }>; - if (rows.some((row) => row.name === columnName)) { - return; - } - db.exec(`ALTER TABLE ${tableName} ADD COLUMN ${columnName} ${columnDefinition};`); -} - -function ensureFlowRegistryPermissions(pathname: string) { - const dir = resolveFlowRegistryDir(process.env); - mkdirSync(dir, { recursive: true, mode: FLOW_REGISTRY_DIR_MODE }); - chmodSync(dir, FLOW_REGISTRY_DIR_MODE); - for (const suffix of FLOW_REGISTRY_SIDECAR_SUFFIXES) { - const candidate = `${pathname}${suffix}`; - if (!existsSync(candidate)) { - continue; - } - chmodSync(candidate, FLOW_REGISTRY_FILE_MODE); - } -} - -function openFlowRegistryDatabase(): FlowRegistryDatabase { - const pathname = resolveFlowRegistrySqlitePath(process.env); - if (cachedDatabase && cachedDatabase.path === pathname) { - return cachedDatabase; - } - if (cachedDatabase) { - cachedDatabase.db.close(); - cachedDatabase = null; - } - ensureFlowRegistryPermissions(pathname); - const { DatabaseSync } = requireNodeSqlite(); - const db = new DatabaseSync(pathname); - db.exec(`PRAGMA journal_mode = WAL;`); - db.exec(`PRAGMA synchronous = NORMAL;`); - db.exec(`PRAGMA busy_timeout = 5000;`); - ensureSchema(db); - ensureFlowRegistryPermissions(pathname); - cachedDatabase = { - db, - path: pathname, - statements: createStatements(db), - }; - return cachedDatabase; -} - -function withWriteTransaction(write: (statements: FlowRegistryStatements) => void) { - const { db, path, statements } = openFlowRegistryDatabase(); - db.exec("BEGIN IMMEDIATE"); - try { - write(statements); - db.exec("COMMIT"); - ensureFlowRegistryPermissions(path); - } catch (error) { - db.exec("ROLLBACK"); - throw error; - } -} - -export function loadFlowRegistryStateFromSqlite(): FlowRegistryStoreSnapshot { - const { statements } = openFlowRegistryDatabase(); - const rows = statements.selectAll.all() as FlowRegistryRow[]; - return { - flows: new Map(rows.map((row) => [row.flow_id, rowToFlowRecord(row)])), - }; -} - -export function saveFlowRegistryStateToSqlite(snapshot: FlowRegistryStoreSnapshot) { - withWriteTransaction((statements) => { - statements.clearRows.run(); - for (const flow of snapshot.flows.values()) { - statements.upsertRow.run(bindFlowRecord(flow)); - } - }); -} - -export function upsertFlowRegistryRecordToSqlite(flow: FlowRecord) { - const store = openFlowRegistryDatabase(); - store.statements.upsertRow.run(bindFlowRecord(flow)); - ensureFlowRegistryPermissions(store.path); -} - -export function deleteFlowRegistryRecordFromSqlite(flowId: string) { - const store = openFlowRegistryDatabase(); - store.statements.deleteRow.run(flowId); - ensureFlowRegistryPermissions(store.path); -} - -export function closeFlowRegistrySqliteStore() { - if (!cachedDatabase) { - return; - } - cachedDatabase.db.close(); - cachedDatabase = null; -} diff --git a/src/tasks/flow-registry.store.test.ts b/src/tasks/flow-registry.store.test.ts deleted file mode 100644 index c0e742bd2f8..00000000000 --- a/src/tasks/flow-registry.store.test.ts +++ /dev/null @@ -1,148 +0,0 @@ -import { statSync } from "node:fs"; -import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; -import { withTempDir } from "../test-helpers/temp-dir.js"; -import { createFlowRecord, getFlowById, resetFlowRegistryForTests } from "./flow-registry.js"; -import { resolveFlowRegistryDir, resolveFlowRegistrySqlitePath } from "./flow-registry.paths.js"; -import { configureFlowRegistryRuntime } from "./flow-registry.store.js"; -import type { FlowRecord } from "./flow-registry.types.js"; - -function createStoredFlow(): FlowRecord { - return { - flowId: "flow-restored", - shape: "linear", - ownerSessionKey: "agent:main:main", - status: "blocked", - notifyPolicy: "done_only", - goal: "Restored flow", - currentStep: "spawn_task", - waitingOnTaskId: "task-waiting", - outputs: { - bucket: ["business"], - }, - blockedTaskId: "task-restored", - blockedSummary: "Writable session required.", - createdAt: 100, - updatedAt: 100, - endedAt: 120, - }; -} - -async function withFlowRegistryTempDir(run: (root: string) => Promise): Promise { - return await withTempDir({ prefix: "openclaw-flow-store-" }, async (root) => { - process.env.OPENCLAW_STATE_DIR = root; - resetFlowRegistryForTests(); - try { - return await run(root); - } finally { - // Close the sqlite-backed registry before Windows temp-dir cleanup removes the store root. - resetFlowRegistryForTests(); - } - }); -} - -describe("flow-registry store runtime", () => { - beforeEach(() => { - vi.useRealTimers(); - }); - - afterEach(() => { - vi.useRealTimers(); - delete process.env.OPENCLAW_STATE_DIR; - resetFlowRegistryForTests(); - }); - - it("uses the configured flow store for restore and save", () => { - const storedFlow = createStoredFlow(); - const loadSnapshot = vi.fn(() => ({ - flows: new Map([[storedFlow.flowId, storedFlow]]), - })); - const saveSnapshot = vi.fn(); - configureFlowRegistryRuntime({ - store: { - loadSnapshot, - saveSnapshot, - }, - }); - - expect(getFlowById("flow-restored")).toMatchObject({ - flowId: "flow-restored", - shape: "linear", - goal: "Restored flow", - waitingOnTaskId: "task-waiting", - outputs: { - bucket: ["business"], - }, - blockedTaskId: "task-restored", - blockedSummary: "Writable session required.", - }); - expect(loadSnapshot).toHaveBeenCalledTimes(1); - - createFlowRecord({ - ownerSessionKey: "agent:main:main", - goal: "New flow", - status: "running", - currentStep: "wait_for", - }); - - expect(saveSnapshot).toHaveBeenCalled(); - const latestSnapshot = saveSnapshot.mock.calls.at(-1)?.[0] as { - flows: ReadonlyMap; - }; - expect(latestSnapshot.flows.size).toBe(2); - expect(latestSnapshot.flows.get("flow-restored")?.goal).toBe("Restored flow"); - }); - - it("restores persisted flows from the default sqlite store", async () => { - await withFlowRegistryTempDir(async (root) => { - process.env.OPENCLAW_STATE_DIR = root; - resetFlowRegistryForTests(); - - const created = createFlowRecord({ - ownerSessionKey: "agent:main:main", - goal: "Persisted flow", - status: "waiting", - currentStep: "ask_user", - waitingOnTaskId: "task-restored", - outputs: { - bucket: ["personal"], - }, - }); - - resetFlowRegistryForTests({ persist: false }); - - expect(getFlowById(created.flowId)).toMatchObject({ - flowId: created.flowId, - shape: "linear", - status: "waiting", - currentStep: "ask_user", - waitingOnTaskId: "task-restored", - outputs: { - bucket: ["personal"], - }, - }); - }); - }); - - it("hardens the sqlite flow store directory and file modes", async () => { - if (process.platform === "win32") { - return; - } - await withFlowRegistryTempDir(async (root) => { - process.env.OPENCLAW_STATE_DIR = root; - resetFlowRegistryForTests(); - - createFlowRecord({ - ownerSessionKey: "agent:main:main", - goal: "Secured flow", - status: "blocked", - blockedTaskId: "task-secured", - blockedSummary: "Need auth.", - }); - - const registryDir = resolveFlowRegistryDir(process.env); - const sqlitePath = resolveFlowRegistrySqlitePath(process.env); - expect(statSync(registryDir).mode & 0o777).toBe(0o700); - expect(statSync(sqlitePath).mode & 0o777).toBe(0o600); - }); - }); -}); diff --git a/src/tasks/flow-registry.store.ts b/src/tasks/flow-registry.store.ts deleted file mode 100644 index 1574b0fde3a..00000000000 --- a/src/tasks/flow-registry.store.ts +++ /dev/null @@ -1,45 +0,0 @@ -import { - closeFlowRegistrySqliteStore, - deleteFlowRegistryRecordFromSqlite, - loadFlowRegistryStateFromSqlite, - saveFlowRegistryStateToSqlite, - upsertFlowRegistryRecordToSqlite, -} from "./flow-registry.store.sqlite.js"; -import type { FlowRecord } from "./flow-registry.types.js"; - -export type FlowRegistryStoreSnapshot = { - flows: Map; -}; - -export type FlowRegistryStore = { - loadSnapshot: () => FlowRegistryStoreSnapshot; - saveSnapshot: (snapshot: FlowRegistryStoreSnapshot) => void; - upsertFlow?: (flow: FlowRecord) => void; - deleteFlow?: (flowId: string) => void; - close?: () => void; -}; - -const defaultFlowRegistryStore: FlowRegistryStore = { - loadSnapshot: loadFlowRegistryStateFromSqlite, - saveSnapshot: saveFlowRegistryStateToSqlite, - upsertFlow: upsertFlowRegistryRecordToSqlite, - deleteFlow: deleteFlowRegistryRecordFromSqlite, - close: closeFlowRegistrySqliteStore, -}; - -let configuredFlowRegistryStore: FlowRegistryStore = defaultFlowRegistryStore; - -export function getFlowRegistryStore(): FlowRegistryStore { - return configuredFlowRegistryStore; -} - -export function configureFlowRegistryRuntime(params: { store?: FlowRegistryStore }) { - if (params.store) { - configuredFlowRegistryStore = params.store; - } -} - -export function resetFlowRegistryRuntimeForTests() { - configuredFlowRegistryStore.close?.(); - configuredFlowRegistryStore = defaultFlowRegistryStore; -} diff --git a/src/tasks/flow-registry.test.ts b/src/tasks/flow-registry.test.ts deleted file mode 100644 index a6a72322904..00000000000 --- a/src/tasks/flow-registry.test.ts +++ /dev/null @@ -1,256 +0,0 @@ -import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; -import { withTempDir } from "../test-helpers/temp-dir.js"; -import { - createFlowRecord, - deleteFlowRecordById, - getFlowById, - listFlowRecords, - resetFlowRegistryForTests, - syncFlowFromTask, - updateFlowRecordById, -} from "./flow-registry.js"; - -const ORIGINAL_STATE_DIR = process.env.OPENCLAW_STATE_DIR; - -async function withFlowRegistryTempDir(run: (root: string) => Promise): Promise { - return await withTempDir({ prefix: "openclaw-flow-registry-" }, async (root) => { - process.env.OPENCLAW_STATE_DIR = root; - resetFlowRegistryForTests(); - try { - return await run(root); - } finally { - // Close the sqlite-backed registry before Windows temp-dir cleanup removes the store root. - resetFlowRegistryForTests(); - } - }); -} - -describe("flow-registry", () => { - beforeEach(() => { - vi.useRealTimers(); - }); - - afterEach(() => { - vi.useRealTimers(); - if (ORIGINAL_STATE_DIR === undefined) { - delete process.env.OPENCLAW_STATE_DIR; - } else { - process.env.OPENCLAW_STATE_DIR = ORIGINAL_STATE_DIR; - } - resetFlowRegistryForTests(); - }); - - it("creates, updates, lists, and deletes flow records", async () => { - await withFlowRegistryTempDir(async (root) => { - process.env.OPENCLAW_STATE_DIR = root; - resetFlowRegistryForTests(); - - const created = createFlowRecord({ - ownerSessionKey: "agent:main:main", - goal: "Investigate flaky test", - status: "running", - currentStep: "spawn_task", - }); - - expect(getFlowById(created.flowId)).toMatchObject({ - flowId: created.flowId, - status: "running", - currentStep: "spawn_task", - }); - - const updated = updateFlowRecordById(created.flowId, { - status: "waiting", - currentStep: "ask_user", - waitingOnTaskId: "task-123", - outputs: { - bucket: ["personal"], - }, - }); - expect(updated).toMatchObject({ - flowId: created.flowId, - status: "waiting", - currentStep: "ask_user", - waitingOnTaskId: "task-123", - outputs: { - bucket: ["personal"], - }, - }); - - expect(listFlowRecords()).toEqual([ - expect.objectContaining({ - flowId: created.flowId, - goal: "Investigate flaky test", - status: "waiting", - }), - ]); - - expect(deleteFlowRecordById(created.flowId)).toBe(true); - expect(getFlowById(created.flowId)).toBeUndefined(); - expect(listFlowRecords()).toEqual([]); - }); - }); - - it("lists newest flows first", async () => { - await withFlowRegistryTempDir(async (root) => { - process.env.OPENCLAW_STATE_DIR = root; - resetFlowRegistryForTests(); - - const earlier = createFlowRecord({ - ownerSessionKey: "agent:main:main", - goal: "First flow", - createdAt: 100, - updatedAt: 100, - }); - const later = createFlowRecord({ - ownerSessionKey: "agent:main:main", - goal: "Second flow", - createdAt: 200, - updatedAt: 200, - }); - - expect(listFlowRecords().map((flow) => flow.flowId)).toEqual([later.flowId, earlier.flowId]); - }); - }); - - it("applies minimal defaults for new flow records", async () => { - await withFlowRegistryTempDir(async (root) => { - process.env.OPENCLAW_STATE_DIR = root; - resetFlowRegistryForTests(); - - const created = createFlowRecord({ - ownerSessionKey: "agent:main:main", - goal: "Background job", - }); - - expect(created).toMatchObject({ - flowId: expect.any(String), - shape: "linear", - ownerSessionKey: "agent:main:main", - goal: "Background job", - status: "queued", - notifyPolicy: "done_only", - }); - expect(created.currentStep).toBeUndefined(); - expect(created.endedAt).toBeUndefined(); - }); - }); - - it("preserves endedAt when later updates change other flow fields", async () => { - await withFlowRegistryTempDir(async (root) => { - process.env.OPENCLAW_STATE_DIR = root; - resetFlowRegistryForTests(); - - const created = createFlowRecord({ - ownerSessionKey: "agent:main:main", - goal: "Finish a task", - status: "succeeded", - endedAt: 456, - }); - - const updated = updateFlowRecordById(created.flowId, { - currentStep: "finish", - }); - - expect(updated).toMatchObject({ - flowId: created.flowId, - currentStep: "finish", - endedAt: 456, - }); - expect(getFlowById(created.flowId)).toMatchObject({ - flowId: created.flowId, - endedAt: 456, - }); - }); - }); - - it("stores blocked metadata and clears it when a later task resumes the same flow", async () => { - await withFlowRegistryTempDir(async (root) => { - process.env.OPENCLAW_STATE_DIR = root; - resetFlowRegistryForTests(); - - const created = createFlowRecord({ - shape: "single_task", - ownerSessionKey: "agent:main:main", - goal: "Fix permissions", - status: "running", - }); - - const blocked = syncFlowFromTask({ - taskId: "task-blocked", - parentFlowId: created.flowId, - status: "succeeded", - terminalOutcome: "blocked", - notifyPolicy: "done_only", - label: "Fix permissions", - task: "Fix permissions", - lastEventAt: 200, - endedAt: 200, - terminalSummary: "Writable session required.", - }); - - expect(blocked).toMatchObject({ - flowId: created.flowId, - status: "blocked", - blockedTaskId: "task-blocked", - blockedSummary: "Writable session required.", - endedAt: 200, - }); - - const resumed = syncFlowFromTask({ - taskId: "task-retry", - parentFlowId: created.flowId, - status: "running", - notifyPolicy: "done_only", - label: "Fix permissions", - task: "Fix permissions", - lastEventAt: 260, - progressSummary: "Retrying with writable session", - }); - - expect(resumed).toMatchObject({ - flowId: created.flowId, - status: "running", - }); - expect(resumed?.blockedTaskId).toBeUndefined(); - expect(resumed?.blockedSummary).toBeUndefined(); - expect(resumed?.endedAt).toBeUndefined(); - }); - }); - - it("does not auto-sync linear flow state from linked child tasks", async () => { - await withFlowRegistryTempDir(async (root) => { - process.env.OPENCLAW_STATE_DIR = root; - resetFlowRegistryForTests(); - - const created = createFlowRecord({ - ownerSessionKey: "agent:main:main", - goal: "Cluster PRs", - status: "waiting", - currentStep: "wait_for", - }); - - const synced = syncFlowFromTask({ - taskId: "task-child", - parentFlowId: created.flowId, - status: "running", - notifyPolicy: "done_only", - label: "Child task", - task: "Child task", - lastEventAt: 250, - progressSummary: "Running child task", - }); - - expect(synced).toMatchObject({ - flowId: created.flowId, - shape: "linear", - status: "waiting", - currentStep: "wait_for", - }); - expect(getFlowById(created.flowId)).toMatchObject({ - flowId: created.flowId, - status: "waiting", - currentStep: "wait_for", - }); - }); - }); -}); diff --git a/src/tasks/flow-registry.ts b/src/tasks/flow-registry.ts deleted file mode 100644 index f1defc59cac..00000000000 --- a/src/tasks/flow-registry.ts +++ /dev/null @@ -1,349 +0,0 @@ -import crypto from "node:crypto"; -import { getFlowRegistryStore, resetFlowRegistryRuntimeForTests } from "./flow-registry.store.js"; -import type { FlowOutputBag, FlowRecord, FlowShape, FlowStatus } from "./flow-registry.types.js"; -import type { TaskNotifyPolicy, TaskRecord } from "./task-registry.types.js"; - -const flows = new Map(); -let restoreAttempted = false; - -function cloneFlowOutputs(outputs: FlowOutputBag | undefined): FlowOutputBag | undefined { - if (!outputs) { - return undefined; - } - return JSON.parse(JSON.stringify(outputs)) as FlowOutputBag; -} - -function cloneFlowRecord(record: FlowRecord): FlowRecord { - return { - ...record, - ...(record.requesterOrigin ? { requesterOrigin: { ...record.requesterOrigin } } : {}), - ...(record.outputs ? { outputs: cloneFlowOutputs(record.outputs) } : {}), - }; -} - -function snapshotFlowRecords(source: ReadonlyMap): FlowRecord[] { - return [...source.values()].map((record) => cloneFlowRecord(record)); -} - -function ensureNotifyPolicy(notifyPolicy?: TaskNotifyPolicy): TaskNotifyPolicy { - return notifyPolicy ?? "done_only"; -} - -function ensureFlowShape(shape?: FlowShape): FlowShape { - return shape ?? "linear"; -} - -function resolveFlowGoal(task: Pick): string { - return task.label?.trim() || task.task.trim() || "Background task"; -} - -function resolveFlowBlockedSummary( - task: Pick, -): string | undefined { - if (task.status !== "succeeded" || task.terminalOutcome !== "blocked") { - return undefined; - } - return task.terminalSummary?.trim() || task.progressSummary?.trim() || undefined; -} - -type FlowRecordPatch = { - status?: FlowStatus; - notifyPolicy?: TaskNotifyPolicy; - goal?: string; - currentStep?: string | null; - waitingOnTaskId?: string | null; - outputs?: FlowOutputBag | null; - blockedTaskId?: string | null; - blockedSummary?: string | null; - updatedAt?: number; - endedAt?: number | null; -}; - -export function deriveFlowStatusFromTask( - task: Pick, -): FlowStatus { - if (task.status === "queued") { - return "queued"; - } - if (task.status === "running") { - return "running"; - } - if (task.status === "succeeded") { - return task.terminalOutcome === "blocked" ? "blocked" : "succeeded"; - } - if (task.status === "cancelled") { - return "cancelled"; - } - if (task.status === "lost") { - return "lost"; - } - return "failed"; -} - -function ensureFlowRegistryReady() { - if (restoreAttempted) { - return; - } - restoreAttempted = true; - const restored = getFlowRegistryStore().loadSnapshot(); - flows.clear(); - for (const [flowId, flow] of restored.flows) { - flows.set(flowId, cloneFlowRecord(flow)); - } -} - -function persistFlowRegistry() { - getFlowRegistryStore().saveSnapshot({ - flows: new Map(snapshotFlowRecords(flows).map((flow) => [flow.flowId, flow])), - }); -} - -function persistFlowUpsert(flow: FlowRecord) { - const store = getFlowRegistryStore(); - if (store.upsertFlow) { - store.upsertFlow(cloneFlowRecord(flow)); - return; - } - persistFlowRegistry(); -} - -function persistFlowDelete(flowId: string) { - const store = getFlowRegistryStore(); - if (store.deleteFlow) { - store.deleteFlow(flowId); - return; - } - persistFlowRegistry(); -} - -export function createFlowRecord(params: { - shape?: FlowShape; - ownerSessionKey: string; - requesterOrigin?: FlowRecord["requesterOrigin"]; - status?: FlowStatus; - notifyPolicy?: TaskNotifyPolicy; - goal: string; - currentStep?: string; - waitingOnTaskId?: string; - outputs?: FlowOutputBag; - blockedTaskId?: string; - blockedSummary?: string; - createdAt?: number; - updatedAt?: number; - endedAt?: number; -}): FlowRecord { - ensureFlowRegistryReady(); - const now = params.createdAt ?? Date.now(); - const record: FlowRecord = { - flowId: crypto.randomUUID(), - shape: ensureFlowShape(params.shape), - ownerSessionKey: params.ownerSessionKey, - ...(params.requesterOrigin ? { requesterOrigin: { ...params.requesterOrigin } } : {}), - status: params.status ?? "queued", - notifyPolicy: ensureNotifyPolicy(params.notifyPolicy), - goal: params.goal, - currentStep: params.currentStep?.trim() || undefined, - waitingOnTaskId: params.waitingOnTaskId?.trim() || undefined, - outputs: cloneFlowOutputs(params.outputs), - blockedTaskId: params.blockedTaskId?.trim() || undefined, - blockedSummary: params.blockedSummary?.trim() || undefined, - createdAt: now, - updatedAt: params.updatedAt ?? now, - ...(params.endedAt !== undefined ? { endedAt: params.endedAt } : {}), - }; - flows.set(record.flowId, record); - persistFlowUpsert(record); - return cloneFlowRecord(record); -} - -export function createFlowForTask(params: { - task: Pick< - TaskRecord, - | "requesterSessionKey" - | "taskId" - | "notifyPolicy" - | "status" - | "terminalOutcome" - | "label" - | "task" - | "createdAt" - | "lastEventAt" - | "endedAt" - | "terminalSummary" - | "progressSummary" - >; - requesterOrigin?: FlowRecord["requesterOrigin"]; -}): FlowRecord { - const terminalFlowStatus = deriveFlowStatusFromTask(params.task); - const isTerminal = - terminalFlowStatus === "succeeded" || - terminalFlowStatus === "blocked" || - terminalFlowStatus === "failed" || - terminalFlowStatus === "cancelled" || - terminalFlowStatus === "lost"; - const endedAt = isTerminal - ? (params.task.endedAt ?? params.task.lastEventAt ?? params.task.createdAt) - : undefined; - return createFlowRecord({ - shape: "single_task", - ownerSessionKey: params.task.requesterSessionKey, - requesterOrigin: params.requesterOrigin, - status: terminalFlowStatus, - notifyPolicy: params.task.notifyPolicy, - goal: resolveFlowGoal(params.task), - blockedTaskId: - terminalFlowStatus === "blocked" ? params.task.taskId.trim() || undefined : undefined, - blockedSummary: resolveFlowBlockedSummary(params.task), - createdAt: params.task.createdAt, - updatedAt: params.task.lastEventAt ?? params.task.createdAt, - ...(endedAt !== undefined ? { endedAt } : {}), - }); -} - -export function updateFlowRecordById(flowId: string, patch: FlowRecordPatch): FlowRecord | null { - ensureFlowRegistryReady(); - const current = flows.get(flowId); - if (!current) { - return null; - } - const next: FlowRecord = { - ...current, - ...(patch.status ? { status: patch.status } : {}), - ...(patch.notifyPolicy ? { notifyPolicy: patch.notifyPolicy } : {}), - ...(patch.goal ? { goal: patch.goal } : {}), - currentStep: - patch.currentStep === undefined - ? current.currentStep - : patch.currentStep?.trim() || undefined, - waitingOnTaskId: - patch.waitingOnTaskId === undefined - ? current.waitingOnTaskId - : patch.waitingOnTaskId?.trim() || undefined, - outputs: - patch.outputs === undefined - ? cloneFlowOutputs(current.outputs) - : (cloneFlowOutputs(patch.outputs ?? undefined) ?? undefined), - blockedTaskId: - patch.blockedTaskId === undefined - ? current.blockedTaskId - : patch.blockedTaskId?.trim() || undefined, - blockedSummary: - patch.blockedSummary === undefined - ? current.blockedSummary - : patch.blockedSummary?.trim() || undefined, - updatedAt: patch.updatedAt ?? Date.now(), - endedAt: patch.endedAt === undefined ? current.endedAt : (patch.endedAt ?? undefined), - }; - flows.set(flowId, next); - persistFlowUpsert(next); - return cloneFlowRecord(next); -} - -export function syncFlowFromTask( - task: Pick< - TaskRecord, - | "parentFlowId" - | "status" - | "terminalOutcome" - | "notifyPolicy" - | "label" - | "task" - | "lastEventAt" - | "endedAt" - | "taskId" - | "terminalSummary" - | "progressSummary" - >, -): FlowRecord | null { - const flowId = task.parentFlowId?.trim(); - if (!flowId) { - return null; - } - const flow = getFlowById(flowId); - if (!flow) { - return null; - } - if (flow.shape !== "single_task") { - return flow; - } - const terminalFlowStatus = deriveFlowStatusFromTask(task); - const isTerminal = - terminalFlowStatus === "succeeded" || - terminalFlowStatus === "blocked" || - terminalFlowStatus === "failed" || - terminalFlowStatus === "cancelled" || - terminalFlowStatus === "lost"; - return updateFlowRecordById(flowId, { - status: terminalFlowStatus, - notifyPolicy: task.notifyPolicy, - goal: resolveFlowGoal(task), - blockedTaskId: terminalFlowStatus === "blocked" ? task.taskId.trim() || null : null, - blockedSummary: - terminalFlowStatus === "blocked" ? (resolveFlowBlockedSummary(task) ?? null) : null, - updatedAt: task.lastEventAt ?? Date.now(), - ...(isTerminal - ? { - endedAt: task.endedAt ?? task.lastEventAt ?? Date.now(), - } - : { endedAt: null }), - }); -} - -export function getFlowById(flowId: string): FlowRecord | undefined { - ensureFlowRegistryReady(); - const flow = flows.get(flowId); - return flow ? cloneFlowRecord(flow) : undefined; -} - -export function listFlowsForOwnerSessionKey(sessionKey: string): FlowRecord[] { - ensureFlowRegistryReady(); - const normalizedSessionKey = sessionKey.trim(); - if (!normalizedSessionKey) { - return []; - } - return [...flows.values()] - .filter((flow) => flow.ownerSessionKey.trim() === normalizedSessionKey) - .map((flow) => cloneFlowRecord(flow)) - .toSorted((left, right) => right.createdAt - left.createdAt); -} - -export function findLatestFlowForOwnerSessionKey(sessionKey: string): FlowRecord | undefined { - const flow = listFlowsForOwnerSessionKey(sessionKey)[0]; - return flow ? cloneFlowRecord(flow) : undefined; -} - -export function resolveFlowForLookupToken(token: string): FlowRecord | undefined { - const lookup = token.trim(); - if (!lookup) { - return undefined; - } - return getFlowById(lookup) ?? findLatestFlowForOwnerSessionKey(lookup); -} - -export function listFlowRecords(): FlowRecord[] { - ensureFlowRegistryReady(); - return [...flows.values()] - .map((flow) => cloneFlowRecord(flow)) - .toSorted((left, right) => right.createdAt - left.createdAt); -} - -export function deleteFlowRecordById(flowId: string): boolean { - ensureFlowRegistryReady(); - const current = flows.get(flowId); - if (!current) { - return false; - } - flows.delete(flowId); - persistFlowDelete(flowId); - return true; -} - -export function resetFlowRegistryForTests(opts?: { persist?: boolean }) { - flows.clear(); - restoreAttempted = false; - resetFlowRegistryRuntimeForTests(); - if (opts?.persist !== false) { - persistFlowRegistry(); - getFlowRegistryStore().close?.(); - } -} diff --git a/src/tasks/flow-registry.types.ts b/src/tasks/flow-registry.types.ts deleted file mode 100644 index 188f5c9ceeb..00000000000 --- a/src/tasks/flow-registry.types.ts +++ /dev/null @@ -1,42 +0,0 @@ -import type { DeliveryContext } from "../utils/delivery-context.js"; -import type { TaskNotifyPolicy } from "./task-registry.types.js"; - -export type FlowShape = "single_task" | "linear"; - -export type FlowOutputValue = - | null - | boolean - | number - | string - | FlowOutputValue[] - | { [key: string]: FlowOutputValue }; - -export type FlowOutputBag = Record; - -export type FlowStatus = - | "queued" - | "running" - | "waiting" - | "blocked" - | "succeeded" - | "failed" - | "cancelled" - | "lost"; - -export type FlowRecord = { - flowId: string; - shape: FlowShape; - ownerSessionKey: string; - requesterOrigin?: DeliveryContext; - status: FlowStatus; - notifyPolicy: TaskNotifyPolicy; - goal: string; - currentStep?: string; - waitingOnTaskId?: string; - outputs?: FlowOutputBag; - blockedTaskId?: string; - blockedSummary?: string; - createdAt: number; - updatedAt: number; - endedAt?: number; -}; diff --git a/src/tasks/flow-runtime.test.ts b/src/tasks/flow-runtime.test.ts deleted file mode 100644 index 5fdf009bbed..00000000000 --- a/src/tasks/flow-runtime.test.ts +++ /dev/null @@ -1,281 +0,0 @@ -import { afterEach, describe, expect, it, vi } from "vitest"; -import { withTempDir } from "../test-helpers/temp-dir.js"; -import { getFlowById, resetFlowRegistryForTests, updateFlowRecordById } from "./flow-registry.js"; -import { - appendFlowOutput, - createFlow, - emitFlowUpdate, - failFlow, - finishFlow, - resumeFlow, - runTaskInFlow, - setFlowOutput, -} from "./flow-runtime.js"; -import { listTasksForFlowId, resetTaskRegistryForTests } from "./task-registry.js"; - -const ORIGINAL_STATE_DIR = process.env.OPENCLAW_STATE_DIR; -const mocks = vi.hoisted(() => ({ - sendMessageMock: vi.fn(), - enqueueSystemEventMock: vi.fn(), - requestHeartbeatNowMock: vi.fn(), -})); - -vi.mock("./task-registry-delivery-runtime.js", () => ({ - sendMessage: (...args: unknown[]) => mocks.sendMessageMock(...args), -})); - -vi.mock("../infra/system-events.js", () => ({ - enqueueSystemEvent: (...args: unknown[]) => mocks.enqueueSystemEventMock(...args), -})); - -vi.mock("../infra/heartbeat-wake.js", () => ({ - requestHeartbeatNow: (...args: unknown[]) => mocks.requestHeartbeatNowMock(...args), -})); - -vi.mock("../infra/agent-events.js", () => ({ - onAgentEvent: () => () => {}, -})); - -vi.mock("../acp/control-plane/manager.js", () => ({ - getAcpSessionManager: () => ({ - cancelSession: vi.fn(), - }), -})); - -vi.mock("../agents/subagent-control.js", () => ({ - killSubagentRunAdmin: vi.fn(), -})); - -async function withFlowRuntimeStateDir(run: (root: string) => Promise): Promise { - await withTempDir({ prefix: "openclaw-flow-runtime-" }, async (root) => { - process.env.OPENCLAW_STATE_DIR = root; - resetTaskRegistryForTests(); - resetFlowRegistryForTests(); - try { - await run(root); - } finally { - resetTaskRegistryForTests(); - resetFlowRegistryForTests(); - } - }); -} - -describe("flow-runtime", () => { - afterEach(() => { - if (ORIGINAL_STATE_DIR === undefined) { - delete process.env.OPENCLAW_STATE_DIR; - } else { - process.env.OPENCLAW_STATE_DIR = ORIGINAL_STATE_DIR; - } - resetTaskRegistryForTests(); - resetFlowRegistryForTests(); - mocks.sendMessageMock.mockReset(); - mocks.enqueueSystemEventMock.mockReset(); - mocks.requestHeartbeatNowMock.mockReset(); - }); - - it("runs a child task under a linear flow and marks the flow as waiting on it", async () => { - await withFlowRuntimeStateDir(async () => { - const flow = createFlow({ - ownerSessionKey: "agent:main:main", - requesterOrigin: { - channel: "telegram", - to: "telegram:123", - }, - goal: "Triage inbox", - }); - - const started = runTaskInFlow({ - flowId: flow.flowId, - runtime: "acp", - childSessionKey: "agent:codex:acp:child", - runId: "run-flow-runtime-1", - task: "Classify inbox messages", - currentStep: "wait_for_classification", - }); - - expect(started.task).toMatchObject({ - requesterSessionKey: "agent:main:main", - parentFlowId: flow.flowId, - childSessionKey: "agent:codex:acp:child", - runId: "run-flow-runtime-1", - status: "queued", - }); - expect(started.flow).toMatchObject({ - flowId: flow.flowId, - status: "waiting", - currentStep: "wait_for_classification", - waitingOnTaskId: started.task.taskId, - }); - expect(listTasksForFlowId(flow.flowId)).toHaveLength(1); - }); - }); - - it("stores outputs and waiting metadata across sqlite restore", async () => { - await withFlowRuntimeStateDir(async () => { - const flow = createFlow({ - ownerSessionKey: "agent:main:main", - goal: "Inbox routing", - }); - - const started = runTaskInFlow({ - flowId: flow.flowId, - runtime: "subagent", - childSessionKey: "agent:codex:subagent:child", - runId: "run-flow-runtime-restore", - task: "Bucket messages", - }); - - setFlowOutput({ - flowId: flow.flowId, - key: "classification", - value: { - business: 1, - personal: 2, - }, - }); - appendFlowOutput({ - flowId: flow.flowId, - key: "eod_summary", - value: { - subject: "Newsletter", - }, - }); - - resetTaskRegistryForTests({ persist: false }); - resetFlowRegistryForTests({ persist: false }); - - expect(getFlowById(flow.flowId)).toMatchObject({ - flowId: flow.flowId, - status: "waiting", - waitingOnTaskId: started.task.taskId, - outputs: { - classification: { - business: 1, - personal: 2, - }, - eod_summary: [ - { - subject: "Newsletter", - }, - ], - }, - }); - }); - }); - - it("reopens a blocked flow with resume and marks terminal states with finish/fail", async () => { - await withFlowRuntimeStateDir(async () => { - const flow = createFlow({ - ownerSessionKey: "agent:main:main", - goal: "Review inbox", - }); - const started = runTaskInFlow({ - flowId: flow.flowId, - runtime: "acp", - childSessionKey: "agent:codex:acp:child", - runId: "run-flow-runtime-reopen", - task: "Review inbox", - }); - - updateFlowRecordById(flow.flowId, { - status: "blocked", - blockedTaskId: started.task.taskId, - blockedSummary: "Need auth.", - endedAt: 120, - }); - - expect(resumeFlow({ flowId: flow.flowId, currentStep: "retry_auth" })).toMatchObject({ - flowId: flow.flowId, - status: "running", - currentStep: "retry_auth", - }); - expect(getFlowById(flow.flowId)?.blockedTaskId).toBeUndefined(); - expect(getFlowById(flow.flowId)?.waitingOnTaskId).toBeUndefined(); - expect(getFlowById(flow.flowId)?.endedAt).toBeUndefined(); - - expect( - finishFlow({ flowId: flow.flowId, currentStep: "finish", endedAt: 200 }), - ).toMatchObject({ - flowId: flow.flowId, - status: "succeeded", - currentStep: "finish", - endedAt: 200, - }); - - const failed = createFlow({ - ownerSessionKey: "agent:main:main", - goal: "Failing flow", - }); - expect(failFlow({ flowId: failed.flowId, currentStep: "abort", endedAt: 300 })).toMatchObject( - { - flowId: failed.flowId, - status: "failed", - currentStep: "abort", - endedAt: 300, - }, - ); - }); - }); - - it("delivers explicit flow updates through the flow owner context when possible", async () => { - await withFlowRuntimeStateDir(async () => { - const flow = createFlow({ - ownerSessionKey: "agent:main:main", - requesterOrigin: { - channel: "telegram", - to: "telegram:123", - threadId: "42", - }, - goal: "Inbox routing", - }); - - const result = await emitFlowUpdate({ - flowId: flow.flowId, - content: "Personal message needs your attention.", - eventKey: "personal-alert", - }); - - expect(result.delivery).toBe("direct"); - expect(mocks.sendMessageMock).toHaveBeenCalledWith( - expect.objectContaining({ - channel: "telegram", - to: "telegram:123", - threadId: "42", - content: "Personal message needs your attention.", - idempotencyKey: `flow:${flow.flowId}:update:personal-alert`, - mirror: expect.objectContaining({ - sessionKey: "agent:main:main", - }), - }), - ); - }); - }); - - it("falls back to session-queued flow updates when direct delivery is unavailable", async () => { - await withFlowRuntimeStateDir(async () => { - const flow = createFlow({ - ownerSessionKey: "agent:main:main", - goal: "Inbox routing", - }); - - const result = await emitFlowUpdate({ - flowId: flow.flowId, - content: "Business email sent to Slack and waiting for reply.", - }); - - expect(result.delivery).toBe("session_queued"); - expect(mocks.enqueueSystemEventMock).toHaveBeenCalledWith( - "Business email sent to Slack and waiting for reply.", - expect.objectContaining({ - sessionKey: "agent:main:main", - contextKey: `flow:${flow.flowId}`, - }), - ); - expect(mocks.requestHeartbeatNowMock).toHaveBeenCalledWith({ - reason: "clawflow-update", - sessionKey: "agent:main:main", - }); - }); - }); -}); diff --git a/src/tasks/flow-runtime.ts b/src/tasks/flow-runtime.ts deleted file mode 100644 index 0384dc5f139..00000000000 --- a/src/tasks/flow-runtime.ts +++ /dev/null @@ -1,377 +0,0 @@ -import { requestHeartbeatNow } from "../infra/heartbeat-wake.js"; -import { enqueueSystemEvent } from "../infra/system-events.js"; -import { parseAgentSessionKey } from "../routing/session-key.js"; -import { isDeliverableMessageChannel } from "../utils/message-channel.js"; -import { createFlowRecord, getFlowById, updateFlowRecordById } from "./flow-registry.js"; -import type { FlowOutputBag, FlowOutputValue, FlowRecord } from "./flow-registry.types.js"; -import { createQueuedTaskRun, createRunningTaskRun } from "./task-executor.js"; -import { listTasksForFlowId } from "./task-registry.js"; -import type { - TaskDeliveryStatus, - TaskNotifyPolicy, - TaskRecord, - TaskRuntime, -} from "./task-registry.types.js"; - -let deliveryRuntimePromise: Promise | null = - null; - -type FlowTaskLaunch = "queued" | "running"; - -export type FlowUpdateDelivery = "direct" | "session_queued" | "parent_missing" | "failed"; - -function loadFlowDeliveryRuntime() { - deliveryRuntimePromise ??= import("./task-registry-delivery-runtime.js"); - return deliveryRuntimePromise; -} - -function requireFlow(flowId: string): FlowRecord { - const flow = getFlowById(flowId); - if (!flow) { - throw new Error(`Flow not found: ${flowId}`); - } - return flow; -} - -function requireLinearFlow(flowId: string): FlowRecord { - const flow = requireFlow(flowId); - if (flow.shape !== "linear") { - throw new Error(`Flow is not linear: ${flowId}`); - } - return flow; -} - -function cloneOutputValue(value: T): T { - return JSON.parse(JSON.stringify(value)) as T; -} - -function updateRequiredFlow( - flowId: string, - patch: Parameters[1], -): FlowRecord { - const updated = updateFlowRecordById(flowId, patch); - if (!updated) { - throw new Error(`Flow not found: ${flowId}`); - } - return updated; -} - -function resolveFlowOutputs(flow: FlowRecord): FlowOutputBag { - return flow.outputs ? cloneOutputValue(flow.outputs) : {}; -} - -function canDeliverFlowToRequesterOrigin(flow: FlowRecord): boolean { - const channel = flow.requesterOrigin?.channel?.trim(); - const to = flow.requesterOrigin?.to?.trim(); - return Boolean(channel && to && isDeliverableMessageChannel(channel)); -} - -export function createFlow(params: { - ownerSessionKey: string; - requesterOrigin?: FlowRecord["requesterOrigin"]; - goal: string; - notifyPolicy?: TaskNotifyPolicy; - currentStep?: string; - createdAt?: number; - updatedAt?: number; -}): FlowRecord { - return createFlowRecord({ - shape: "linear", - ownerSessionKey: params.ownerSessionKey, - requesterOrigin: params.requesterOrigin, - goal: params.goal, - notifyPolicy: params.notifyPolicy, - currentStep: params.currentStep, - status: "queued", - createdAt: params.createdAt, - updatedAt: params.updatedAt, - }); -} - -export function runTaskInFlow(params: { - flowId: string; - runtime: TaskRuntime; - sourceId?: string; - childSessionKey?: string; - parentTaskId?: string; - agentId?: string; - runId?: string; - label?: string; - task: string; - preferMetadata?: boolean; - notifyPolicy?: TaskNotifyPolicy; - deliveryStatus?: TaskDeliveryStatus; - launch?: FlowTaskLaunch; - startedAt?: number; - lastEventAt?: number; - progressSummary?: string | null; - currentStep?: string; -}): { flow: FlowRecord; task: TaskRecord } { - const flow = requireLinearFlow(params.flowId); - const launch = params.launch ?? "queued"; - const task = - launch === "running" - ? createRunningTaskRun({ - runtime: params.runtime, - sourceId: params.sourceId, - requesterSessionKey: flow.ownerSessionKey, - requesterOrigin: flow.requesterOrigin, - parentFlowId: flow.flowId, - childSessionKey: params.childSessionKey, - parentTaskId: params.parentTaskId, - agentId: params.agentId, - runId: params.runId, - label: params.label, - task: params.task, - preferMetadata: params.preferMetadata, - notifyPolicy: params.notifyPolicy ?? flow.notifyPolicy, - deliveryStatus: params.deliveryStatus, - startedAt: params.startedAt, - lastEventAt: params.lastEventAt, - progressSummary: params.progressSummary, - }) - : createQueuedTaskRun({ - runtime: params.runtime, - sourceId: params.sourceId, - requesterSessionKey: flow.ownerSessionKey, - requesterOrigin: flow.requesterOrigin, - parentFlowId: flow.flowId, - childSessionKey: params.childSessionKey, - parentTaskId: params.parentTaskId, - agentId: params.agentId, - runId: params.runId, - label: params.label, - task: params.task, - preferMetadata: params.preferMetadata, - notifyPolicy: params.notifyPolicy ?? flow.notifyPolicy, - deliveryStatus: params.deliveryStatus, - }); - return { - task, - flow: updateRequiredFlow(flow.flowId, { - status: "waiting", - currentStep: params.currentStep ?? flow.currentStep ?? "wait_for_task", - waitingOnTaskId: task.taskId, - blockedTaskId: null, - blockedSummary: null, - endedAt: null, - updatedAt: task.lastEventAt ?? task.startedAt ?? Date.now(), - }), - }; -} - -export function setFlowWaiting(params: { - flowId: string; - currentStep?: string | null; - waitingOnTaskId?: string | null; - updatedAt?: number; -}): FlowRecord { - const flow = requireLinearFlow(params.flowId); - if (params.waitingOnTaskId?.trim()) { - const waitingOnTaskId = params.waitingOnTaskId.trim(); - const linkedTaskIds = new Set(listTasksForFlowId(flow.flowId).map((task) => task.taskId)); - if (!linkedTaskIds.has(waitingOnTaskId)) { - throw new Error(`Flow ${flow.flowId} is not linked to task ${waitingOnTaskId}`); - } - } - return updateRequiredFlow(flow.flowId, { - status: "waiting", - currentStep: params.currentStep, - waitingOnTaskId: params.waitingOnTaskId, - endedAt: null, - updatedAt: params.updatedAt ?? Date.now(), - }); -} - -export function setFlowOutput(params: { - flowId: string; - key: string; - value: FlowOutputValue; - updatedAt?: number; -}): FlowRecord { - const flow = requireLinearFlow(params.flowId); - const key = params.key.trim(); - if (!key) { - throw new Error("Flow output key is required."); - } - const outputs = resolveFlowOutputs(flow); - outputs[key] = cloneOutputValue(params.value); - return updateRequiredFlow(flow.flowId, { - outputs, - updatedAt: params.updatedAt ?? Date.now(), - }); -} - -export function appendFlowOutput(params: { - flowId: string; - key: string; - value: FlowOutputValue; - updatedAt?: number; -}): FlowRecord { - const flow = requireLinearFlow(params.flowId); - const key = params.key.trim(); - if (!key) { - throw new Error("Flow output key is required."); - } - const outputs = resolveFlowOutputs(flow); - const nextValue = cloneOutputValue(params.value); - const current = outputs[key]; - if (current === undefined) { - outputs[key] = [nextValue]; - } else if (Array.isArray(current)) { - outputs[key] = [...current, nextValue]; - } else { - throw new Error(`Flow output ${key} is not an array.`); - } - return updateRequiredFlow(flow.flowId, { - outputs, - updatedAt: params.updatedAt ?? Date.now(), - }); -} - -export function resumeFlow(params: { - flowId: string; - currentStep?: string | null; - updatedAt?: number; -}): FlowRecord { - const flow = requireLinearFlow(params.flowId); - return updateRequiredFlow(flow.flowId, { - status: "running", - currentStep: params.currentStep, - waitingOnTaskId: null, - blockedTaskId: null, - blockedSummary: null, - endedAt: null, - updatedAt: params.updatedAt ?? Date.now(), - }); -} - -export function finishFlow(params: { - flowId: string; - currentStep?: string | null; - updatedAt?: number; - endedAt?: number; -}): FlowRecord { - const flow = requireLinearFlow(params.flowId); - const endedAt = params.endedAt ?? params.updatedAt ?? Date.now(); - return updateRequiredFlow(flow.flowId, { - status: "succeeded", - currentStep: params.currentStep, - waitingOnTaskId: null, - blockedTaskId: null, - blockedSummary: null, - updatedAt: params.updatedAt ?? endedAt, - endedAt, - }); -} - -export function failFlow(params: { - flowId: string; - currentStep?: string | null; - updatedAt?: number; - endedAt?: number; -}): FlowRecord { - const flow = requireLinearFlow(params.flowId); - const endedAt = params.endedAt ?? params.updatedAt ?? Date.now(); - return updateRequiredFlow(flow.flowId, { - status: "failed", - currentStep: params.currentStep, - waitingOnTaskId: null, - blockedTaskId: null, - blockedSummary: null, - updatedAt: params.updatedAt ?? endedAt, - endedAt, - }); -} - -export async function emitFlowUpdate(params: { - flowId: string; - content: string; - eventKey?: string; - currentStep?: string | null; - updatedAt?: number; -}): Promise<{ flow: FlowRecord; delivery: FlowUpdateDelivery }> { - const flow = requireFlow(params.flowId); - const content = params.content.trim(); - if (!content) { - throw new Error("Flow update content is required."); - } - const ownerSessionKey = flow.ownerSessionKey.trim(); - const updatedAt = params.updatedAt ?? Date.now(); - const updatedFlow = updateRequiredFlow(flow.flowId, { - currentStep: params.currentStep, - updatedAt, - }); - if (!ownerSessionKey) { - return { - flow: updatedFlow, - delivery: "parent_missing", - }; - } - if (!canDeliverFlowToRequesterOrigin(updatedFlow)) { - try { - enqueueSystemEvent(content, { - sessionKey: ownerSessionKey, - contextKey: `flow:${updatedFlow.flowId}`, - deliveryContext: updatedFlow.requesterOrigin, - }); - requestHeartbeatNow({ - reason: "clawflow-update", - sessionKey: ownerSessionKey, - }); - return { - flow: updatedFlow, - delivery: "session_queued", - }; - } catch { - return { - flow: updatedFlow, - delivery: "failed", - }; - } - } - try { - const requesterAgentId = parseAgentSessionKey(ownerSessionKey)?.agentId; - const idempotencyKey = `flow:${updatedFlow.flowId}:update:${params.eventKey?.trim() || updatedAt}`; - const { sendMessage } = await loadFlowDeliveryRuntime(); - await sendMessage({ - channel: updatedFlow.requesterOrigin?.channel, - to: updatedFlow.requesterOrigin?.to ?? "", - accountId: updatedFlow.requesterOrigin?.accountId, - threadId: updatedFlow.requesterOrigin?.threadId, - content, - agentId: requesterAgentId, - idempotencyKey, - mirror: { - sessionKey: ownerSessionKey, - agentId: requesterAgentId, - idempotencyKey, - }, - }); - return { - flow: updatedFlow, - delivery: "direct", - }; - } catch { - try { - enqueueSystemEvent(content, { - sessionKey: ownerSessionKey, - contextKey: `flow:${updatedFlow.flowId}`, - deliveryContext: updatedFlow.requesterOrigin, - }); - requestHeartbeatNow({ - reason: "clawflow-update", - sessionKey: ownerSessionKey, - }); - return { - flow: updatedFlow, - delivery: "session_queued", - }; - } catch { - return { - flow: updatedFlow, - delivery: "failed", - }; - } - } -} diff --git a/src/tasks/task-executor.test.ts b/src/tasks/task-executor.test.ts index 7404ac78e84..46883aff569 100644 --- a/src/tasks/task-executor.test.ts +++ b/src/tasks/task-executor.test.ts @@ -1,29 +1,16 @@ import { afterEach, describe, expect, it, vi } from "vitest"; import { withTempDir } from "../test-helpers/temp-dir.js"; import { - getFlowById, - listFlowRecords, - resetFlowRegistryForTests, - updateFlowRecordById, -} from "./flow-registry.js"; -import { - cancelFlowById, + cancelDetachedTaskRunById, completeTaskRunByRunId, - createLinearFlow, createQueuedTaskRun, createRunningTaskRun, failTaskRunByRunId, recordTaskRunProgressByRunId, - retryBlockedFlowAsQueuedTaskRun, - retryBlockedFlowAsRunningTaskRun, setDetachedTaskDeliveryStatusByRunId, startTaskRunByRunId, } from "./task-executor.js"; -import { - findLatestTaskForFlowId, - findTaskByRunId, - resetTaskRegistryForTests, -} from "./task-registry.js"; +import { findTaskByRunId, resetTaskRegistryForTests } from "./task-registry.js"; const ORIGINAL_STATE_DIR = process.env.OPENCLAW_STATE_DIR; const hoisted = vi.hoisted(() => { @@ -55,12 +42,10 @@ async function withTaskExecutorStateDir(run: (root: string) => Promise): P await withTempDir({ prefix: "openclaw-task-executor-" }, async (root) => { process.env.OPENCLAW_STATE_DIR = root; resetTaskRegistryForTests(); - resetFlowRegistryForTests(); try { await run(root); } finally { resetTaskRegistryForTests(); - resetFlowRegistryForTests(); } }); } @@ -73,7 +58,6 @@ describe("task-executor", () => { process.env.OPENCLAW_STATE_DIR = ORIGINAL_STATE_DIR; } resetTaskRegistryForTests(); - resetFlowRegistryForTests(); hoisted.sendMessageMock.mockReset(); hoisted.cancelSessionMock.mockReset(); hoisted.killSubagentRunAdminMock.mockReset(); @@ -155,62 +139,7 @@ describe("task-executor", () => { }); }); - it("auto-creates a one-task flow and keeps it synced with task status", async () => { - await withTaskExecutorStateDir(async () => { - const created = createRunningTaskRun({ - runtime: "subagent", - requesterSessionKey: "agent:main:main", - childSessionKey: "agent:codex:subagent:child", - runId: "run-executor-flow", - task: "Write summary", - startedAt: 10, - deliveryStatus: "pending", - }); - - expect(created.parentFlowId).toEqual(expect.any(String)); - expect(getFlowById(created.parentFlowId!)).toMatchObject({ - flowId: created.parentFlowId, - ownerSessionKey: "agent:main:main", - status: "running", - goal: "Write summary", - notifyPolicy: "done_only", - }); - - completeTaskRunByRunId({ - runId: "run-executor-flow", - endedAt: 40, - lastEventAt: 40, - terminalSummary: "Done.", - }); - - expect(getFlowById(created.parentFlowId!)).toMatchObject({ - flowId: created.parentFlowId, - status: "succeeded", - endedAt: 40, - goal: "Write summary", - notifyPolicy: "done_only", - }); - }); - }); - - it("does not auto-create one-task flows for non-returning bookkeeping runs", async () => { - await withTaskExecutorStateDir(async () => { - const created = createRunningTaskRun({ - runtime: "cli", - requesterSessionKey: "agent:main:main", - childSessionKey: "agent:main:main", - runId: "run-executor-cli", - task: "Foreground gateway run", - deliveryStatus: "not_applicable", - startedAt: 10, - }); - - expect(created.parentFlowId).toBeUndefined(); - expect(listFlowRecords()).toEqual([]); - }); - }); - - it("records blocked metadata on one-task flows and reuses the same flow for queued retries", async () => { + it("records blocked task outcomes without wrapping them in a separate flow model", async () => { await withTaskExecutorStateDir(async () => { const created = createRunningTaskRun({ runtime: "acp", @@ -235,171 +164,22 @@ describe("task-executor", () => { terminalSummary: "Writable session required.", }); - expect(getFlowById(created.parentFlowId!)).toMatchObject({ - flowId: created.parentFlowId, - status: "blocked", - blockedTaskId: created.taskId, - blockedSummary: "Writable session required.", - endedAt: 40, - }); - - const retried = retryBlockedFlowAsQueuedTaskRun({ - flowId: created.parentFlowId!, - runId: "run-executor-retry", - childSessionKey: "agent:codex:acp:retry-child", - }); - - expect(retried).toMatchObject({ - found: true, - retried: true, - previousTask: expect.objectContaining({ - taskId: created.taskId, - }), - task: expect.objectContaining({ - parentFlowId: created.parentFlowId, - parentTaskId: created.taskId, - status: "queued", - runId: "run-executor-retry", - }), - }); - - expect(getFlowById(created.parentFlowId!)).toMatchObject({ - flowId: created.parentFlowId, - status: "queued", - }); - expect(getFlowById(created.parentFlowId!)?.blockedTaskId).toBeUndefined(); - expect(getFlowById(created.parentFlowId!)?.blockedSummary).toBeUndefined(); - expect(getFlowById(created.parentFlowId!)?.endedAt).toBeUndefined(); - expect(findLatestTaskForFlowId(created.parentFlowId!)).toMatchObject({ - taskId: retried.task?.taskId, - }); - }); - }); - - it("can reopen blocked one-task flows directly into a running retry", async () => { - await withTaskExecutorStateDir(async () => { - const created = createRunningTaskRun({ - runtime: "subagent", - requesterSessionKey: "agent:main:main", - childSessionKey: "agent:codex:subagent:child", - runId: "run-executor-blocked-running", - task: "Write summary", - startedAt: 10, - deliveryStatus: "pending", - }); - - completeTaskRunByRunId({ - runId: "run-executor-blocked-running", - endedAt: 40, - lastEventAt: 40, + expect(findTaskByRunId("run-executor-blocked")).toMatchObject({ + taskId: created.taskId, + status: "succeeded", terminalOutcome: "blocked", - terminalSummary: "Need write approval.", - }); - - const retried = retryBlockedFlowAsRunningTaskRun({ - flowId: created.parentFlowId!, - runId: "run-executor-running-retry", - childSessionKey: "agent:codex:subagent:retry", - startedAt: 55, - lastEventAt: 55, - progressSummary: "Retrying with approval", - }); - - expect(retried).toMatchObject({ - found: true, - retried: true, - task: expect.objectContaining({ - parentFlowId: created.parentFlowId, - status: "running", - runId: "run-executor-running-retry", - progressSummary: "Retrying with approval", - }), - }); - - expect(getFlowById(created.parentFlowId!)).toMatchObject({ - flowId: created.parentFlowId, - status: "running", + terminalSummary: "Writable session required.", }); }); }); - it("refuses to retry flows that are not currently blocked", async () => { - await withTaskExecutorStateDir(async () => { - const created = createRunningTaskRun({ - runtime: "acp", - requesterSessionKey: "agent:main:main", - childSessionKey: "agent:codex:acp:child", - runId: "run-executor-not-blocked", - task: "Patch file", - startedAt: 10, - deliveryStatus: "pending", - }); - - const retried = retryBlockedFlowAsQueuedTaskRun({ - flowId: created.parentFlowId!, - runId: "run-should-not-exist", - }); - - expect(retried).toMatchObject({ - found: true, - retried: false, - reason: "Flow is not blocked.", - }); - expect(findTaskByRunId("run-should-not-exist")).toBeUndefined(); - }); - }); - - it("keeps linear flows under explicit control instead of auto-syncing child task status", async () => { - await withTaskExecutorStateDir(async () => { - const flow = createLinearFlow({ - ownerSessionKey: "agent:main:main", - goal: "Triage a PR cluster", - currentStep: "wait_for", - notifyPolicy: "done_only", - }); - - const child = createRunningTaskRun({ - runtime: "acp", - requesterSessionKey: "agent:main:main", - parentFlowId: flow.flowId, - childSessionKey: "agent:codex:acp:child", - runId: "run-linear-child", - task: "Inspect a PR", - startedAt: 10, - deliveryStatus: "pending", - }); - - completeTaskRunByRunId({ - runId: "run-linear-child", - endedAt: 40, - lastEventAt: 40, - terminalSummary: "Done.", - }); - - expect(child.parentFlowId).toBe(flow.flowId); - expect(getFlowById(flow.flowId)).toMatchObject({ - flowId: flow.flowId, - shape: "linear", - status: "queued", - currentStep: "wait_for", - }); - }); - }); - - it("cancels active child tasks and marks a linear flow cancelled", async () => { + it("cancels active ACP child tasks", async () => { await withTaskExecutorStateDir(async () => { hoisted.cancelSessionMock.mockResolvedValue(undefined); - const flow = createLinearFlow({ - ownerSessionKey: "agent:main:main", - goal: "Cluster related PRs", - currentStep: "wait_for", - }); - const child = createRunningTaskRun({ runtime: "acp", requesterSessionKey: "agent:main:main", - parentFlowId: flow.flowId, childSessionKey: "agent:codex:acp:child", runId: "run-linear-cancel", task: "Inspect a PR", @@ -407,58 +187,60 @@ describe("task-executor", () => { deliveryStatus: "pending", }); - const cancelled = await cancelFlowById({ + const cancelled = await cancelDetachedTaskRunById({ cfg: {} as never, - flowId: flow.flowId, + taskId: child.taskId, }); expect(cancelled).toMatchObject({ found: true, cancelled: true, - flow: expect.objectContaining({ - flowId: flow.flowId, - status: "cancelled", - }), }); expect(findTaskByRunId("run-linear-cancel")).toMatchObject({ taskId: child.taskId, status: "cancelled", }); - expect(getFlowById(flow.flowId)).toMatchObject({ - flowId: flow.flowId, - status: "cancelled", + expect(hoisted.cancelSessionMock).toHaveBeenCalledWith({ + cfg: {} as never, + sessionKey: "agent:codex:acp:child", + reason: "task-cancel", }); - expect(hoisted.cancelSessionMock).toHaveBeenCalled(); }); }); - it("refuses to rewrite terminal linear flows when cancel is requested", async () => { + it("cancels active subagent child tasks", async () => { await withTaskExecutorStateDir(async () => { - const flow = createLinearFlow({ - ownerSessionKey: "agent:main:main", - goal: "Cluster related PRs", - currentStep: "finish", - }); - updateFlowRecordById(flow.flowId, { - status: "succeeded", - endedAt: 55, - updatedAt: 55, + hoisted.killSubagentRunAdminMock.mockResolvedValue({ + found: true, + killed: true, }); - const cancelled = await cancelFlowById({ + const child = createRunningTaskRun({ + runtime: "subagent", + requesterSessionKey: "agent:main:main", + childSessionKey: "agent:codex:subagent:child", + runId: "run-subagent-cancel", + task: "Inspect a PR", + startedAt: 10, + deliveryStatus: "pending", + }); + + const cancelled = await cancelDetachedTaskRunById({ cfg: {} as never, - flowId: flow.flowId, + taskId: child.taskId, }); expect(cancelled).toMatchObject({ found: true, - cancelled: false, - reason: "Flow is already succeeded.", + cancelled: true, }); - expect(getFlowById(flow.flowId)).toMatchObject({ - flowId: flow.flowId, - status: "succeeded", - endedAt: 55, + expect(findTaskByRunId("run-subagent-cancel")).toMatchObject({ + taskId: child.taskId, + status: "cancelled", + }); + expect(hoisted.killSubagentRunAdminMock).toHaveBeenCalledWith({ + cfg: {} as never, + sessionKey: "agent:codex:subagent:child", }); }); }); diff --git a/src/tasks/task-executor.ts b/src/tasks/task-executor.ts index b2e955ce768..8bfde5b8c07 100644 --- a/src/tasks/task-executor.ts +++ b/src/tasks/task-executor.ts @@ -1,90 +1,28 @@ import type { OpenClawConfig } from "../config/config.js"; -import { createSubsystemLogger } from "../logging/subsystem.js"; -import { - createFlowForTask, - createFlowRecord, - deleteFlowRecordById, - getFlowById, - updateFlowRecordById, -} from "./flow-registry.js"; -import type { FlowRecord } from "./flow-registry.types.js"; import { cancelTaskById, createTaskRecord, - findLatestTaskForFlowId, - linkTaskToFlowById, - listTasksForFlowId, markTaskLostById, markTaskRunningByRunId, markTaskTerminalByRunId, recordTaskProgressByRunId, setTaskRunDeliveryStatusByRunId, } from "./task-registry.js"; -import { summarizeTaskRecords } from "./task-registry.summary.js"; import type { TaskDeliveryState, TaskDeliveryStatus, TaskNotifyPolicy, TaskRecord, - TaskRegistrySummary, TaskRuntime, TaskStatus, TaskTerminalOutcome, } from "./task-registry.types.js"; -const log = createSubsystemLogger("tasks/executor"); - -function isOneTaskFlowEligible(task: TaskRecord): boolean { - if (task.parentFlowId?.trim() || !task.requesterSessionKey.trim()) { - return false; - } - if (task.deliveryStatus === "not_applicable") { - return false; - } - return task.runtime === "acp" || task.runtime === "subagent"; -} - -function ensureSingleTaskFlow(params: { - task: TaskRecord; - requesterOrigin?: TaskDeliveryState["requesterOrigin"]; -}): TaskRecord { - if (!isOneTaskFlowEligible(params.task)) { - return params.task; - } - try { - const flow = createFlowForTask({ - task: params.task, - requesterOrigin: params.requesterOrigin, - }); - const linked = linkTaskToFlowById({ - taskId: params.task.taskId, - flowId: flow.flowId, - }); - if (!linked) { - deleteFlowRecordById(flow.flowId); - return params.task; - } - if (linked.parentFlowId !== flow.flowId) { - deleteFlowRecordById(flow.flowId); - return linked; - } - return linked; - } catch (error) { - log.warn("Failed to create one-task flow for detached run", { - taskId: params.task.taskId, - runId: params.task.runId, - error, - }); - return params.task; - } -} - export function createQueuedTaskRun(params: { runtime: TaskRuntime; sourceId?: string; requesterSessionKey: string; requesterOrigin?: TaskDeliveryState["requesterOrigin"]; - parentFlowId?: string; childSessionKey?: string; parentTaskId?: string; agentId?: string; @@ -95,248 +33,10 @@ export function createQueuedTaskRun(params: { notifyPolicy?: TaskNotifyPolicy; deliveryStatus?: TaskDeliveryStatus; }): TaskRecord { - const task = createTaskRecord({ + return createTaskRecord({ ...params, status: "queued", }); - return ensureSingleTaskFlow({ - task, - requesterOrigin: params.requesterOrigin, - }); -} - -export function createLinearFlow(params: { - ownerSessionKey: string; - requesterOrigin?: TaskDeliveryState["requesterOrigin"]; - goal: string; - notifyPolicy?: TaskNotifyPolicy; - currentStep?: string; - createdAt?: number; - updatedAt?: number; -}): FlowRecord { - return createFlowRecord({ - shape: "linear", - ownerSessionKey: params.ownerSessionKey, - requesterOrigin: params.requesterOrigin, - goal: params.goal, - notifyPolicy: params.notifyPolicy, - currentStep: params.currentStep, - status: "queued", - createdAt: params.createdAt, - updatedAt: params.updatedAt, - }); -} - -export function getFlowTaskSummary(flowId: string): TaskRegistrySummary { - return summarizeTaskRecords(listTasksForFlowId(flowId)); -} - -type RetryBlockedFlowResult = { - found: boolean; - retried: boolean; - reason?: string; - previousTask?: TaskRecord; - task?: TaskRecord; -}; - -type RetryBlockedFlowParams = { - flowId: string; - sourceId?: string; - requesterOrigin?: TaskDeliveryState["requesterOrigin"]; - childSessionKey?: string; - agentId?: string; - runId?: string; - label?: string; - task?: string; - preferMetadata?: boolean; - notifyPolicy?: TaskNotifyPolicy; - deliveryStatus?: TaskDeliveryStatus; - status: "queued" | "running"; - startedAt?: number; - lastEventAt?: number; - progressSummary?: string | null; -}; - -function resolveRetryableBlockedFlowTask(flowId: string): { - flowFound: boolean; - retryable: boolean; - latestTask?: TaskRecord; - reason?: string; -} { - const flow = getFlowById(flowId); - if (!flow) { - return { - flowFound: false, - retryable: false, - reason: "Flow not found.", - }; - } - const latestTask = findLatestTaskForFlowId(flowId); - if (!latestTask) { - return { - flowFound: true, - retryable: false, - reason: "Flow has no retryable task.", - }; - } - if (flow.status !== "blocked") { - return { - flowFound: true, - retryable: false, - latestTask, - reason: "Flow is not blocked.", - }; - } - if (latestTask.status !== "succeeded" || latestTask.terminalOutcome !== "blocked") { - return { - flowFound: true, - retryable: false, - latestTask, - reason: "Latest flow task is not blocked.", - }; - } - return { - flowFound: true, - retryable: true, - latestTask, - }; -} - -function retryBlockedFlowTask(params: RetryBlockedFlowParams): RetryBlockedFlowResult { - const resolved = resolveRetryableBlockedFlowTask(params.flowId); - if (!resolved.retryable || !resolved.latestTask) { - return { - found: resolved.flowFound, - retried: false, - reason: resolved.reason, - }; - } - const flow = getFlowById(params.flowId); - if (!flow) { - return { - found: false, - retried: false, - reason: "Flow not found.", - previousTask: resolved.latestTask, - }; - } - const task = createTaskRecord({ - runtime: resolved.latestTask.runtime, - sourceId: params.sourceId ?? resolved.latestTask.sourceId, - requesterSessionKey: flow.ownerSessionKey, - requesterOrigin: params.requesterOrigin ?? flow.requesterOrigin, - parentFlowId: flow.flowId, - childSessionKey: params.childSessionKey, - parentTaskId: resolved.latestTask.taskId, - agentId: params.agentId ?? resolved.latestTask.agentId, - runId: params.runId, - label: params.label ?? resolved.latestTask.label, - task: params.task ?? resolved.latestTask.task, - preferMetadata: params.preferMetadata, - notifyPolicy: params.notifyPolicy ?? resolved.latestTask.notifyPolicy, - deliveryStatus: params.deliveryStatus ?? "pending", - status: params.status, - startedAt: params.startedAt, - lastEventAt: params.lastEventAt, - progressSummary: params.progressSummary, - }); - return { - found: true, - retried: true, - previousTask: resolved.latestTask, - task, - }; -} - -export function retryBlockedFlowAsQueuedTaskRun( - params: Omit, -): RetryBlockedFlowResult { - return retryBlockedFlowTask({ - ...params, - status: "queued", - }); -} - -export function retryBlockedFlowAsRunningTaskRun( - params: Omit, -): RetryBlockedFlowResult { - return retryBlockedFlowTask({ - ...params, - status: "running", - }); -} - -type CancelFlowResult = { - found: boolean; - cancelled: boolean; - reason?: string; - flow?: FlowRecord; - tasks?: TaskRecord[]; -}; - -function isActiveTaskStatus(status: TaskStatus): boolean { - return status === "queued" || status === "running"; -} - -function isTerminalFlowStatus(status: FlowRecord["status"]): boolean { - return ( - status === "succeeded" || status === "failed" || status === "cancelled" || status === "lost" - ); -} - -export async function cancelFlowById(params: { - cfg: OpenClawConfig; - flowId: string; -}): Promise { - const flow = getFlowById(params.flowId); - if (!flow) { - return { - found: false, - cancelled: false, - reason: "Flow not found.", - }; - } - const linkedTasks = listTasksForFlowId(flow.flowId); - const activeTasks = linkedTasks.filter((task) => isActiveTaskStatus(task.status)); - for (const task of activeTasks) { - await cancelTaskById({ - cfg: params.cfg, - taskId: task.taskId, - }); - } - const refreshedTasks = listTasksForFlowId(flow.flowId); - const remainingActive = refreshedTasks.filter((task) => isActiveTaskStatus(task.status)); - if (remainingActive.length > 0) { - return { - found: true, - cancelled: false, - reason: "One or more child tasks are still active.", - flow: getFlowById(flow.flowId), - tasks: refreshedTasks, - }; - } - if (isTerminalFlowStatus(flow.status)) { - return { - found: true, - cancelled: false, - reason: `Flow is already ${flow.status}.`, - flow, - tasks: refreshedTasks, - }; - } - const updatedFlow = updateFlowRecordById(flow.flowId, { - status: "cancelled", - blockedTaskId: null, - blockedSummary: null, - endedAt: Date.now(), - updatedAt: Date.now(), - }); - return { - found: true, - cancelled: true, - flow: updatedFlow ?? getFlowById(flow.flowId), - tasks: refreshedTasks, - }; } export function createRunningTaskRun(params: { @@ -344,7 +44,6 @@ export function createRunningTaskRun(params: { sourceId?: string; requesterSessionKey: string; requesterOrigin?: TaskDeliveryState["requesterOrigin"]; - parentFlowId?: string; childSessionKey?: string; parentTaskId?: string; agentId?: string; @@ -358,14 +57,10 @@ export function createRunningTaskRun(params: { lastEventAt?: number; progressSummary?: string | null; }): TaskRecord { - const task = createTaskRecord({ + return createTaskRecord({ ...params, status: "running", }); - return ensureSingleTaskFlow({ - task, - requesterOrigin: params.requesterOrigin, - }); } export function startTaskRunByRunId(params: { diff --git a/src/tasks/task-registry-import-boundary.test.ts b/src/tasks/task-registry-import-boundary.test.ts index d6c5091f9de..d99ef7775bd 100644 --- a/src/tasks/task-registry-import-boundary.test.ts +++ b/src/tasks/task-registry-import-boundary.test.ts @@ -9,10 +9,7 @@ const ALLOWED_IMPORTERS = new Set([ "agents/tools/session-status-tool.ts", "auto-reply/reply/commands-acp/runtime-options.ts", "auto-reply/reply/commands-subagents/action-info.ts", - "commands/doctor-workspace-status.ts", - "commands/flows.ts", "commands/tasks.ts", - "tasks/flow-runtime.ts", "tasks/task-executor.ts", "tasks/task-registry.maintenance.ts", ]); diff --git a/src/tasks/task-registry.store.sqlite.ts b/src/tasks/task-registry.store.sqlite.ts index 80ad7de93b1..9cf0746bbb0 100644 --- a/src/tasks/task-registry.store.sqlite.ts +++ b/src/tasks/task-registry.store.sqlite.ts @@ -11,7 +11,6 @@ type TaskRegistryRow = { runtime: TaskRecord["runtime"]; source_id: string | null; requester_session_key: string; - parent_flow_id: string | null; child_session_key: string | null; parent_task_id: string | null; agent_id: string | null; @@ -58,7 +57,7 @@ type TaskRegistryDatabase = { let cachedDatabase: TaskRegistryDatabase | null = null; const TASK_REGISTRY_DIR_MODE = 0o700; const TASK_REGISTRY_FILE_MODE = 0o600; -const TASK_REGISTRY_SIDEcar_SUFFIXES = ["", "-shm", "-wal"] as const; +const TASK_REGISTRY_SIDECAR_SUFFIXES = ["", "-shm", "-wal"] as const; function normalizeNumber(value: number | bigint | null): number | undefined { if (typeof value === "bigint") { @@ -92,7 +91,6 @@ function rowToTaskRecord(row: TaskRegistryRow): TaskRecord { runtime: row.runtime, ...(row.source_id ? { sourceId: row.source_id } : {}), requesterSessionKey: row.requester_session_key, - ...(row.parent_flow_id ? { parentFlowId: row.parent_flow_id } : {}), ...(row.child_session_key ? { childSessionKey: row.child_session_key } : {}), ...(row.parent_task_id ? { parentTaskId: row.parent_task_id } : {}), ...(row.agent_id ? { agentId: row.agent_id } : {}), @@ -130,7 +128,6 @@ function bindTaskRecord(record: TaskRecord) { runtime: record.runtime, source_id: record.sourceId ?? null, requester_session_key: record.requesterSessionKey, - parent_flow_id: record.parentFlowId ?? null, child_session_key: record.childSessionKey ?? null, parent_task_id: record.parentTaskId ?? null, agent_id: record.agentId ?? null, @@ -168,7 +165,6 @@ function createStatements(db: DatabaseSync): TaskRegistryStatements { runtime, source_id, requester_session_key, - parent_flow_id, child_session_key, parent_task_id, agent_id, @@ -204,7 +200,6 @@ function createStatements(db: DatabaseSync): TaskRegistryStatements { runtime, source_id, requester_session_key, - parent_flow_id, child_session_key, parent_task_id, agent_id, @@ -228,7 +223,6 @@ function createStatements(db: DatabaseSync): TaskRegistryStatements { @runtime, @source_id, @requester_session_key, - @parent_flow_id, @child_session_key, @parent_task_id, @agent_id, @@ -252,7 +246,6 @@ function createStatements(db: DatabaseSync): TaskRegistryStatements { runtime = excluded.runtime, source_id = excluded.source_id, requester_session_key = excluded.requester_session_key, - parent_flow_id = excluded.parent_flow_id, child_session_key = excluded.child_session_key, parent_task_id = excluded.parent_task_id, agent_id = excluded.agent_id, @@ -297,7 +290,6 @@ function ensureSchema(db: DatabaseSync) { runtime TEXT NOT NULL, source_id TEXT, requester_session_key TEXT NOT NULL, - parent_flow_id TEXT, child_session_key TEXT, parent_task_id TEXT, agent_id TEXT, @@ -326,35 +318,20 @@ function ensureSchema(db: DatabaseSync) { ); `); db.exec(`CREATE INDEX IF NOT EXISTS idx_task_runs_run_id ON task_runs(run_id);`); - ensureColumn(db, "task_runs", "parent_flow_id", "TEXT"); db.exec(`CREATE INDEX IF NOT EXISTS idx_task_runs_status ON task_runs(status);`); db.exec(`CREATE INDEX IF NOT EXISTS idx_task_runs_runtime_status ON task_runs(runtime, status);`); db.exec(`CREATE INDEX IF NOT EXISTS idx_task_runs_cleanup_after ON task_runs(cleanup_after);`); db.exec(`CREATE INDEX IF NOT EXISTS idx_task_runs_last_event_at ON task_runs(last_event_at);`); - db.exec(`CREATE INDEX IF NOT EXISTS idx_task_runs_parent_flow_id ON task_runs(parent_flow_id);`); db.exec( `CREATE INDEX IF NOT EXISTS idx_task_runs_child_session_key ON task_runs(child_session_key);`, ); } -function ensureColumn( - db: DatabaseSync, - tableName: string, - columnName: string, - columnDefinition: string, -) { - const rows = db.prepare(`PRAGMA table_info(${tableName})`).all() as Array<{ name?: string }>; - if (rows.some((row) => row.name === columnName)) { - return; - } - db.exec(`ALTER TABLE ${tableName} ADD COLUMN ${columnName} ${columnDefinition};`); -} - function ensureTaskRegistryPermissions(pathname: string) { const dir = resolveTaskRegistryDir(process.env); mkdirSync(dir, { recursive: true, mode: TASK_REGISTRY_DIR_MODE }); chmodSync(dir, TASK_REGISTRY_DIR_MODE); - for (const suffix of TASK_REGISTRY_SIDEcar_SUFFIXES) { + for (const suffix of TASK_REGISTRY_SIDECAR_SUFFIXES) { const candidate = `${pathname}${suffix}`; if (!existsSync(candidate)) { continue; diff --git a/src/tasks/task-registry.store.test.ts b/src/tasks/task-registry.store.test.ts index 3de5b4776bb..b4b3b6aca3c 100644 --- a/src/tasks/task-registry.store.test.ts +++ b/src/tasks/task-registry.store.test.ts @@ -138,26 +138,6 @@ describe("task-registry store runtime", () => { }); }); - it("persists parent flow linkage on task records", () => { - const created = createTaskRecord({ - runtime: "acp", - requesterSessionKey: "agent:main:main", - parentFlowId: "flow-123", - runId: "run-linked", - task: "Linked task", - status: "running", - deliveryStatus: "pending", - }); - - resetTaskRegistryForTests({ persist: false }); - - expect(findTaskByRunId("run-linked")).toMatchObject({ - taskId: created.taskId, - parentFlowId: "flow-123", - task: "Linked task", - }); - }); - it("hardens the sqlite task store directory and file modes", () => { if (process.platform === "win32") { return; diff --git a/src/tasks/task-registry.test.ts b/src/tasks/task-registry.test.ts index 8153e8effbc..b69cc7af650 100644 --- a/src/tasks/task-registry.test.ts +++ b/src/tasks/task-registry.test.ts @@ -7,8 +7,6 @@ import { } from "../infra/heartbeat-wake.js"; import { peekSystemEvents, resetSystemEventsForTest } from "../infra/system-events.js"; import { withTempDir } from "../test-helpers/temp-dir.js"; -import { installInMemoryTaskAndFlowRegistryRuntime } from "../test-utils/task-flow-registry-runtime.js"; -import { createFlowRecord, getFlowById, resetFlowRegistryForTests } from "./flow-registry.js"; import { createTaskRecord, findLatestTaskForSessionKey, @@ -103,13 +101,11 @@ async function withTaskRegistryTempDir(run: (root: string) => Promise): Pr return await withTempDir({ prefix: "openclaw-task-registry-" }, async (root) => { process.env.OPENCLAW_STATE_DIR = root; resetTaskRegistryForTests(); - resetFlowRegistryForTests(); try { return await run(root); } finally { // Close the sqlite-backed registry before Windows temp-dir cleanup tries to remove it. resetTaskRegistryForTests(); - resetFlowRegistryForTests(); } }); } @@ -125,7 +121,6 @@ describe("task-registry", () => { resetSystemEventsForTest(); resetHeartbeatWakeStateForTests(); resetTaskRegistryForTests({ persist: false }); - resetFlowRegistryForTests({ persist: false }); hoisted.sendMessageMock.mockReset(); hoisted.cancelSessionMock.mockReset(); hoisted.killSubagentRunAdminMock.mockReset(); @@ -697,50 +692,6 @@ describe("task-registry", () => { }); }); - it("adopts parent flow linkage when collapsing onto an earlier ACP record", async () => { - await withTaskRegistryTempDir(async (root) => { - process.env.OPENCLAW_STATE_DIR = root; - resetTaskRegistryForTests({ persist: false }); - resetFlowRegistryForTests({ persist: false }); - installInMemoryTaskAndFlowRegistryRuntime(); - - const directTask = createTaskRecord({ - runtime: "acp", - requesterSessionKey: "agent:main:main", - requesterOrigin: { - channel: "telegram", - to: "telegram:123", - }, - childSessionKey: "agent:main:acp:child", - runId: "run-collapse-parent-flow", - task: "Direct ACP child", - status: "running", - deliveryStatus: "pending", - }); - - const spawnedTask = createTaskRecord({ - runtime: "acp", - requesterSessionKey: "agent:main:main", - requesterOrigin: { - channel: "telegram", - to: "telegram:123", - }, - parentFlowId: "flow-123", - childSessionKey: "agent:main:acp:child", - runId: "run-collapse-parent-flow", - task: "Spawn ACP child", - status: "running", - deliveryStatus: "pending", - }); - - expect(spawnedTask.taskId).toBe(directTask.taskId); - expect(findTaskByRunId("run-collapse-parent-flow")).toMatchObject({ - taskId: directTask.taskId, - parentFlowId: "flow-123", - }); - }); - }); - it("collapses ACP run-owned task creation onto the existing spawned task", async () => { await withTaskRegistryTempDir(async (root) => { process.env.OPENCLAW_STATE_DIR = root; @@ -834,7 +785,6 @@ describe("task-registry", () => { const task = createTaskRecord({ runtime: "subagent", requesterSessionKey: "agent:main:main", - parentFlowId: "flow-restore", childSessionKey: "agent:main:subagent:child", runId: "run-restore", task: "Restore me", @@ -848,7 +798,6 @@ describe("task-registry", () => { expect(resolveTaskForLookupToken(task.taskId)).toMatchObject({ taskId: task.taskId, - parentFlowId: "flow-restore", runId: "run-restore", task: "Restore me", }); @@ -1132,71 +1081,6 @@ describe("task-registry", () => { }); }); - it("routes state-change updates through the parent flow owner when a task is flow-linked", async () => { - await withTaskRegistryTempDir(async (root) => { - process.env.OPENCLAW_STATE_DIR = root; - resetTaskRegistryForTests(); - resetFlowRegistryForTests(); - hoisted.sendMessageMock.mockResolvedValue({ - channel: "discord", - to: "discord:flow", - via: "direct", - }); - - const flow = createFlowRecord({ - shape: "single_task", - ownerSessionKey: "agent:flow:owner", - requesterOrigin: { - channel: "discord", - to: "discord:flow", - threadId: "444", - }, - status: "queued", - notifyPolicy: "state_changes", - goal: "Investigate issue", - }); - - const task = createTaskRecord({ - runtime: "acp", - requesterSessionKey: "agent:main:main", - requesterOrigin: { - channel: "telegram", - to: "telegram:123", - }, - parentFlowId: flow.flowId, - childSessionKey: "agent:codex:acp:child", - runId: "run-flow-state", - task: "Investigate issue", - status: "queued", - notifyPolicy: "state_changes", - }); - - markTaskRunningByRunId({ - runId: "run-flow-state", - eventSummary: "Started.", - }); - - await waitForAssertion(() => - expect(hoisted.sendMessageMock).toHaveBeenCalledWith( - expect.objectContaining({ - channel: "discord", - to: "discord:flow", - threadId: "444", - idempotencyKey: expect.stringContaining(`flow-event:${flow.flowId}:${task.taskId}:`), - mirror: expect.objectContaining({ - sessionKey: "agent:flow:owner", - idempotencyKey: expect.stringContaining(`flow-event:${flow.flowId}:${task.taskId}:`), - }), - }), - ), - ); - expect(getFlowById(flow.flowId)).toMatchObject({ - flowId: flow.flowId, - status: "running", - }); - }); - }); - it("keeps background ACP progress off the foreground lane and only sends a terminal notify", async () => { await withTaskRegistryTempDir(async (root) => { process.env.OPENCLAW_STATE_DIR = root; @@ -1320,123 +1204,6 @@ describe("task-registry", () => { }); }); - it("routes terminal delivery through the parent flow owner when a task is flow-linked", async () => { - await withTaskRegistryTempDir(async (root) => { - process.env.OPENCLAW_STATE_DIR = root; - resetTaskRegistryForTests(); - resetFlowRegistryForTests(); - resetSystemEventsForTest(); - hoisted.sendMessageMock.mockResolvedValue({ - channel: "discord", - to: "discord:flow", - via: "direct", - }); - - const flow = createFlowRecord({ - shape: "single_task", - ownerSessionKey: "agent:flow:owner", - requesterOrigin: { - channel: "discord", - to: "discord:flow", - threadId: "444", - }, - status: "running", - goal: "Investigate issue", - }); - - createTaskRecord({ - runtime: "acp", - requesterSessionKey: "agent:main:main", - requesterOrigin: { - channel: "telegram", - to: "telegram:123", - }, - parentFlowId: flow.flowId, - childSessionKey: "agent:codex:acp:child", - runId: "run-flow-terminal", - task: "Investigate issue", - status: "running", - deliveryStatus: "pending", - }); - - emitAgentEvent({ - runId: "run-flow-terminal", - stream: "lifecycle", - data: { - phase: "end", - endedAt: 250, - }, - }); - await flushAsyncWork(); - - await waitForAssertion(() => - expect(hoisted.sendMessageMock).toHaveBeenCalledWith( - expect.objectContaining({ - channel: "discord", - to: "discord:flow", - threadId: "444", - idempotencyKey: expect.stringContaining(`flow-terminal:${flow.flowId}:`), - mirror: expect.objectContaining({ - sessionKey: "agent:flow:owner", - }), - }), - ), - ); - expect(getFlowById(flow.flowId)).toMatchObject({ - flowId: flow.flowId, - status: "succeeded", - endedAt: 250, - }); - expect(peekSystemEvents("agent:main:main")).toEqual([]); - }); - }); - - it("queues fallback terminal delivery on the parent flow owner session when a task is flow-linked", async () => { - await withTaskRegistryTempDir(async (root) => { - process.env.OPENCLAW_STATE_DIR = root; - resetTaskRegistryForTests(); - resetFlowRegistryForTests(); - resetSystemEventsForTest(); - - const flow = createFlowRecord({ - ownerSessionKey: "agent:flow:owner", - status: "running", - goal: "Investigate issue", - }); - - createTaskRecord({ - runtime: "acp", - requesterSessionKey: "agent:main:main", - parentFlowId: flow.flowId, - childSessionKey: "agent:codex:acp:child", - runId: "run-flow-fallback", - task: "Investigate issue", - status: "running", - deliveryStatus: "pending", - }); - - emitAgentEvent({ - runId: "run-flow-fallback", - stream: "lifecycle", - data: { - phase: "end", - endedAt: 250, - }, - }); - await flushAsyncWork(); - - await waitForAssertion(() => - expect(peekSystemEvents("agent:flow:owner")).toEqual([ - "Background task done: ACP background task (run run-flow).", - ]), - ); - expect(peekSystemEvents("agent:main:main")).toEqual([]); - expect(findTaskByRunId("run-flow-fallback")).toMatchObject({ - deliveryStatus: "session_queued", - }); - }); - }); - it("emits concise state-change updates without surfacing raw ACP chatter", async () => { await withTaskRegistryTempDir(async (root) => { process.env.OPENCLAW_STATE_DIR = root; diff --git a/src/tasks/task-registry.ts b/src/tasks/task-registry.ts index 89c83f2eea7..e279cc577e6 100644 --- a/src/tasks/task-registry.ts +++ b/src/tasks/task-registry.ts @@ -9,7 +9,6 @@ import { createSubsystemLogger } from "../logging/subsystem.js"; import { parseAgentSessionKey } from "../routing/session-key.js"; import { normalizeDeliveryContext } from "../utils/delivery-context.js"; import { isDeliverableMessageChannel } from "../utils/message-channel.js"; -import { getFlowById, syncFlowFromTask } from "./flow-registry.js"; import { formatTaskBlockedFollowupMessage, formatTaskStateChangeMessage, @@ -57,7 +56,6 @@ let deliveryRuntimePromise: Promise): TaskRecord | nu addSessionKeyIndex(taskId, next); } persistTaskUpsert(next); - try { - syncFlowFromTask(next); - } catch (error) { - log.warn("Failed to sync parent flow from task update", { - taskId, - flowId: next.parentFlowId, - error, - }); - } emitTaskRegistryHookEvent(() => ({ kind: "upserted", task: cloneTaskRecord(next), @@ -585,7 +556,7 @@ function queueTaskSystemEvent(task: TaskRecord, text: string) { } enqueueSystemEvent(text, { sessionKey: requesterSessionKey, - contextKey: owner.flowId ? `flow:${owner.flowId}` : `task:${task.taskId}`, + contextKey: `task:${task.taskId}`, deliveryContext: owner.requesterOrigin, }); requestHeartbeatNow({ @@ -607,9 +578,7 @@ function queueBlockedTaskFollowup(task: TaskRecord) { } enqueueSystemEvent(followupText, { sessionKey: requesterSessionKey, - contextKey: owner.flowId - ? `flow:${owner.flowId}:blocked-followup` - : `task:${task.taskId}:blocked-followup`, + contextKey: `task:${task.taskId}:blocked-followup`, deliveryContext: owner.requesterOrigin, }); requestHeartbeatNow({ @@ -678,7 +647,7 @@ export async function maybeDeliverTaskTerminalUpdate(taskId: string): Promise - task.parentFlowId?.trim() === normalizedFlowId - ? { ...cloneTaskRecord(task), insertionIndex } - : null, - ) - .filter( - ( - task, - ): task is TaskRecord & { - insertionIndex: number; - } => Boolean(task), - ) - .toSorted(compareTasksNewestFirst) - .map(({ insertionIndex: _, ...task }) => task); -} - -export function findLatestTaskForFlowId(flowId: string): TaskRecord | undefined { - const task = listTasksForFlowId(flowId)[0]; - return task ? cloneTaskRecord(task) : undefined; -} - export function listTasksForSessionKey(sessionKey: string): TaskRecord[] { ensureTaskRegistryReady(); const key = normalizeSessionIndexKey(sessionKey); diff --git a/src/tasks/task-registry.types.ts b/src/tasks/task-registry.types.ts index 74a2296dba0..2228f16d998 100644 --- a/src/tasks/task-registry.types.ts +++ b/src/tasks/task-registry.types.ts @@ -54,7 +54,6 @@ export type TaskRecord = { runtime: TaskRuntime; sourceId?: string; requesterSessionKey: string; - parentFlowId?: string; childSessionKey?: string; parentTaskId?: string; agentId?: string; diff --git a/src/test-utils/task-flow-registry-runtime.ts b/src/test-utils/task-registry-runtime.ts similarity index 60% rename from src/test-utils/task-flow-registry-runtime.ts rename to src/test-utils/task-registry-runtime.ts index 371c264c102..c9ccbe16ea7 100644 --- a/src/test-utils/task-flow-registry-runtime.ts +++ b/src/test-utils/task-registry-runtime.ts @@ -1,9 +1,3 @@ -import { - configureFlowRegistryRuntime, - type FlowRegistryStore, - type FlowRegistryStoreSnapshot, -} from "../tasks/flow-registry.store.js"; -import type { FlowRecord } from "../tasks/flow-registry.types.js"; import { configureTaskRegistryRuntime, type TaskRegistryStore, @@ -22,24 +16,13 @@ function cloneDeliveryState(state: TaskDeliveryState): TaskDeliveryState { }; } -function cloneFlow(flow: FlowRecord): FlowRecord { - return { - ...flow, - ...(flow.requesterOrigin ? { requesterOrigin: { ...flow.requesterOrigin } } : {}), - }; -} - -export function installInMemoryTaskAndFlowRegistryRuntime(): { +export function installInMemoryTaskRegistryRuntime(): { taskStore: TaskRegistryStore; - flowStore: FlowRegistryStore; } { let taskSnapshot: TaskRegistryStoreSnapshot = { tasks: new Map(), deliveryStates: new Map(), }; - let flowSnapshot: FlowRegistryStoreSnapshot = { - flows: new Map(), - }; const taskStore: TaskRegistryStore = { loadSnapshot: () => ({ @@ -80,28 +63,6 @@ export function installInMemoryTaskAndFlowRegistryRuntime(): { }, }; - const flowStore: FlowRegistryStore = { - loadSnapshot: () => ({ - flows: new Map( - [...flowSnapshot.flows.entries()].map(([flowId, flow]) => [flowId, cloneFlow(flow)]), - ), - }), - saveSnapshot: (snapshot) => { - flowSnapshot = { - flows: new Map( - [...snapshot.flows.entries()].map(([flowId, flow]) => [flowId, cloneFlow(flow)]), - ), - }; - }, - upsertFlow: (flow) => { - flowSnapshot.flows.set(flow.flowId, cloneFlow(flow)); - }, - deleteFlow: (flowId) => { - flowSnapshot.flows.delete(flowId); - }, - }; - configureTaskRegistryRuntime({ store: taskStore }); - configureFlowRegistryRuntime({ store: flowStore }); - return { taskStore, flowStore }; + return { taskStore }; }