From f86e5c0a08fcbc5bd822e74cd321aedf717cdc39 Mon Sep 17 00:00:00 2001 From: Mariano <132747814+mbelinky@users.noreply.github.com> Date: Tue, 31 Mar 2026 10:08:50 +0200 Subject: [PATCH] ClawFlow: add linear flow control surface (#58227) * ClawFlow: add linear flow control surface * Flows: clear blocked metadata on resume --- CHANGELOG.md | 2 + docs/automation/clawflow.md | 62 +++++ docs/automation/index.md | 8 + docs/automation/tasks.md | 8 + docs/cli/flows.md | 54 +++++ docs/cli/index.md | 13 ++ docs/docs.json | 2 + .../register.status-health-sessions.test.ts | 50 ++++ .../register.status-health-sessions.ts | 81 +++++++ src/commands/doctor-workspace-status.test.ts | 59 +++++ src/commands/doctor-workspace-status.ts | 45 ++++ src/commands/flows.test.ts | 154 +++++++++++++ src/commands/flows.ts | 215 ++++++++++++++++++ src/tasks/flow-registry.store.sqlite.ts | 11 +- src/tasks/flow-registry.store.test.ts | 3 + src/tasks/flow-registry.test.ts | 61 +++++ src/tasks/flow-registry.ts | 49 +++- src/tasks/flow-registry.types.ts | 3 + src/tasks/task-executor.test.ts | 123 +++++++++- src/tasks/task-executor.ts | 111 ++++++++- src/tasks/task-registry.test.ts | 2 + 21 files changed, 1108 insertions(+), 8 deletions(-) create mode 100644 docs/automation/clawflow.md create mode 100644 docs/cli/flows.md create mode 100644 src/commands/flows.test.ts create mode 100644 src/commands/flows.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index e7773fdb1c2..35efd48db4f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,6 +25,7 @@ Docs: https://docs.openclaw.ai - Flows/tasks: add a minimal SQLite-backed flow registry plus task-to-flow linkage scaffolding, so orchestrated work can start gaining a first-class parent record without changing current task delivery behavior. - Flows/tasks: route one-task ACP and subagent updates through a parent flow owner context, so detached work can emerge back through the intended parent thread/session instead of speaking only as a raw child task. - Flows/tasks: persist blocked state on one-task flows and let the same flow reopen cleanly on retry, so blocked detached work can carry a parent-level reason and continue without fragmenting into a new job. +- ClawFlow: add the first linear flow control surface with `openclaw flows list|show|cancel`, keep manual multi-task flows separate from one-task auto-sync flows, and surface doctor recovery hints for obviously orphaned or broken flow/task linkage. - Matrix/history: add optional room history context for Matrix group triggers via `channels.matrix.historyLimit`, with per-agent watermarks and retry-safe snapshots so failed trigger retries do not drift into newer room messages. (#57022) thanks @chain710. - Diffs: skip unused viewer-versus-file SSR preload work so `diffs` view-only and file-only runs do less render work while keeping mode outputs aligned. (#57909) thanks @gumadeiras. - Matrix/threads: add per-DM `threadReplies` overrides and keep thread session isolation aligned with the effective room or DM thread policy from the triggering message onward. (#57995) thanks @teconomix. @@ -33,6 +34,7 @@ Docs: https://docs.openclaw.ai - Slack/exec approvals: add native Slack approval routing and approver authorization so exec approval prompts can stay in Slack instead of falling back to the Web UI or terminal. Thanks @vincentkoc. ### Fixes + - Image generation/build: write stable runtime alias files into `dist/` and route provider-auth runtime lookups through those aliases so image-generation providers keep resolving auth/runtime modules after rebuilds instead of crashing on missing hashed chunk files. - Config/runtime: pin the first successful config load in memory for the running process and refresh that snapshot on successful writes/reloads, so hot paths stop reparsing `openclaw.json` between watcher-driven swaps. - Config/legacy cleanup: stop probing obsolete alternate legacy config names and service labels during local config/service detection, while keeping the active `~/.openclaw/openclaw.json` path canonical. diff --git a/docs/automation/clawflow.md b/docs/automation/clawflow.md new file mode 100644 index 00000000000..c488d76d978 --- /dev/null +++ b/docs/automation/clawflow.md @@ -0,0 +1,62 @@ +--- +summary: "ClawFlow workflow orchestration for background tasks and detached runs" +read_when: + - You want a flow to own one or more detached tasks + - You want to inspect or cancel a background job as a unit + - You want to understand how flows relate to tasks and background work +title: "ClawFlow" +--- + +# ClawFlow + +ClawFlow is the flow layer above [Background Tasks](/automation/tasks). Tasks still track detached work. ClawFlow groups those task runs into a single job, keeps the parent owner context, and gives you a flow-level control surface. + +Use ClawFlow when the work is more than a single detached run. A flow can still be one task, but it can also coordinate multiple tasks in a simple linear sequence. + +## TL;DR + +- Tasks are the execution records. +- ClawFlow is the job-level wrapper above tasks. +- A flow keeps one owner/session context for the whole job. +- Use `openclaw flows list`, `openclaw flows show`, and `openclaw flows cancel` to inspect or manage flows. + +## Quick start + +```bash +openclaw flows list +openclaw flows show +openclaw flows cancel +``` + +## How it relates to tasks + +Background tasks still do the low-level work: + +- ACP runs +- subagent runs +- cron executions +- CLI-initiated runs + +ClawFlow sits above that ledger: + +- it keeps related task runs under one flow id +- it tracks the flow state separately from the individual task state +- it makes blocked or multi-step work easier to inspect from one place + +For a single detached run, the flow can be a one-task flow. For more structured work, ClawFlow can keep multiple task runs under the same job. + +## CLI surface + +The flow CLI is intentionally small: + +- `openclaw flows list` shows active and recent flows +- `openclaw flows show ` shows one flow and its linked tasks +- `openclaw flows cancel ` cancels the flow and any active child tasks + +The lookup token accepts either a flow id or the owner session key. + +## Related + +- [Background Tasks](/automation/tasks) — detached work ledger +- [CLI: flows](/cli/flows) — flow inspection and control commands +- [Cron Jobs](/automation/cron-jobs) — scheduled jobs that may create tasks diff --git a/docs/automation/index.md b/docs/automation/index.md index 9f91a25c401..f738786fb30 100644 --- a/docs/automation/index.md +++ b/docs/automation/index.md @@ -51,11 +51,19 @@ The most effective setups combine multiple mechanisms: 3. **Hooks** react to specific events (tool calls, session resets, compaction) with custom scripts. 4. **Standing Orders** give the agent persistent context ("always check the project board before replying"). 5. **Background Tasks** automatically track all detached work so you can inspect and audit it. +6. **ClawFlow** groups related detached tasks into a single flow when the work needs a higher-level job view. See [Cron vs Heartbeat](/automation/cron-vs-heartbeat) for a detailed comparison of the two scheduling mechanisms. +## ClawFlow + +ClawFlow sits above [Background Tasks](/automation/tasks). Tasks still track the detached runs, while ClawFlow groups related task runs into one job that you can inspect or cancel from the CLI. + +See [ClawFlow](/automation/clawflow) for the flow overview and [CLI: flows](/cli/flows) for the command surface. + ## Related - [Cron vs Heartbeat](/automation/cron-vs-heartbeat) — detailed comparison guide +- [ClawFlow](/automation/clawflow) — flow-level orchestration above tasks - [Troubleshooting](/automation/troubleshooting) — debugging automation issues - [Configuration Reference](/gateway/configuration-reference) — all config keys diff --git a/docs/automation/tasks.md b/docs/automation/tasks.md index bb08760692c..fbc608c4f47 100644 --- a/docs/automation/tasks.md +++ b/docs/automation/tasks.md @@ -210,6 +210,12 @@ A sweeper runs every **60 seconds** and handles three things: ## How tasks relate to other systems +### Tasks and ClawFlow + +ClawFlow is the flow layer above tasks. A flow groups one or more task runs into a single job, owns the parent session context, and gives you a higher-level control surface for blocked or multi-step work. + +See [ClawFlow](/automation/clawflow) for the flow overview and [CLI: flows](/cli/flows) for the command surface. + ### Tasks and cron A cron job **definition** lives in `~/.openclaw/cron/jobs.json`. **Every** cron execution creates a task record — both main-session and isolated. Main-session cron tasks default to `silent` notify policy so they track without generating notifications. @@ -233,7 +239,9 @@ A task's `runId` links to the agent run doing the work. Agent lifecycle events ( ## Related - [Automation Overview](/automation) — all automation mechanisms at a glance +- [ClawFlow](/automation/clawflow) — job-level orchestration above tasks - [Cron Jobs](/automation/cron-jobs) — scheduling background work - [Cron vs Heartbeat](/automation/cron-vs-heartbeat) — choosing the right mechanism - [Heartbeat](/gateway/heartbeat) — periodic main-session turns +- [CLI: flows](/cli/flows) — flow inspection and control commands - [CLI: Tasks](/cli/index#tasks) — CLI command reference diff --git a/docs/cli/flows.md b/docs/cli/flows.md new file mode 100644 index 00000000000..e0c495e602e --- /dev/null +++ b/docs/cli/flows.md @@ -0,0 +1,54 @@ +--- +summary: "CLI reference for `openclaw flows` (list, inspect, cancel)" +read_when: + - You want to inspect or cancel a flow + - You want to see how background tasks roll up into a higher-level job +title: "flows" +--- + +# `openclaw flows` + +Inspect and manage [ClawFlow](/automation/clawflow) jobs. + +```bash +openclaw flows list +openclaw flows show +openclaw flows cancel +``` + +## Commands + +### `flows list` + +List tracked flows and their task counts. + +```bash +openclaw flows list +openclaw flows list --status blocked +openclaw flows list --json +``` + +### `flows show` + +Show one flow by flow id or owner session key. + +```bash +openclaw flows show +openclaw flows show --json +``` + +The output includes the flow status, current step, blocked summary when present, and linked tasks. + +### `flows cancel` + +Cancel a flow and any active child tasks. + +```bash +openclaw flows cancel +``` + +## Related + +- [ClawFlow](/automation/clawflow) — job-level orchestration above tasks +- [Background Tasks](/automation/tasks) — detached work ledger +- [CLI reference](/cli/index) — full command tree diff --git a/docs/cli/index.md b/docs/cli/index.md index 3a0637abc01..c69fd40484a 100644 --- a/docs/cli/index.md +++ b/docs/cli/index.md @@ -45,6 +45,7 @@ This page describes the current CLI behavior. If commands change, update this do - [`tui`](/cli/tui) - [`browser`](/cli/browser) - [`cron`](/cli/cron) +- [`flows`](/cli/flows) - [`dns`](/cli/dns) - [`docs`](/cli/docs) - [`hooks`](/cli/hooks) @@ -171,6 +172,10 @@ openclaw [--dev] [--profile ] show notify cancel + flows + list + show + cancel gateway call health @@ -809,6 +814,14 @@ List and manage [background task](/automation/tasks) runs across agents. - `tasks cancel ` — cancel a running task - `tasks audit` — surface operational issues (stale, lost, delivery failures) +### `flows` + +List and manage [ClawFlow](/automation/clawflow) jobs across agents. + +- `flows list` — show active and recent flows +- `flows show ` — show details for a specific flow +- `flows cancel ` — cancel a flow and its active child tasks + ## Gateway ### `gateway` diff --git a/docs/docs.json b/docs/docs.json index 0daefe922a8..446486a43d0 100644 --- a/docs/docs.json +++ b/docs/docs.json @@ -1121,6 +1121,7 @@ "automation/cron-jobs", "automation/cron-vs-heartbeat", "automation/tasks", + "automation/clawflow", "automation/troubleshooting", "automation/webhook", "automation/gmail-pubsub", @@ -1432,6 +1433,7 @@ "cli/approvals", "cli/browser", "cli/cron", + "cli/flows", "cli/node", "cli/nodes", "cli/sandbox" diff --git a/src/cli/program/register.status-health-sessions.test.ts b/src/cli/program/register.status-health-sessions.test.ts index f5cc4495ab4..f7c222a6893 100644 --- a/src/cli/program/register.status-health-sessions.test.ts +++ b/src/cli/program/register.status-health-sessions.test.ts @@ -5,6 +5,9 @@ import { registerStatusHealthSessionsCommands } from "./register.status-health-s const mocks = vi.hoisted(() => ({ statusCommand: vi.fn(), healthCommand: vi.fn(), + flowsListCommand: vi.fn(), + flowsShowCommand: vi.fn(), + flowsCancelCommand: vi.fn(), sessionsCommand: vi.fn(), sessionsCleanupCommand: vi.fn(), tasksListCommand: vi.fn(), @@ -23,6 +26,9 @@ const mocks = vi.hoisted(() => ({ const statusCommand = mocks.statusCommand; const healthCommand = mocks.healthCommand; +const flowsListCommand = mocks.flowsListCommand; +const flowsShowCommand = mocks.flowsShowCommand; +const flowsCancelCommand = mocks.flowsCancelCommand; const sessionsCommand = mocks.sessionsCommand; const sessionsCleanupCommand = mocks.sessionsCleanupCommand; const tasksListCommand = mocks.tasksListCommand; @@ -42,6 +48,12 @@ vi.mock("../../commands/health.js", () => ({ healthCommand: mocks.healthCommand, })); +vi.mock("../../commands/flows.js", () => ({ + flowsListCommand: mocks.flowsListCommand, + flowsShowCommand: mocks.flowsShowCommand, + flowsCancelCommand: mocks.flowsCancelCommand, +})); + vi.mock("../../commands/sessions.js", () => ({ sessionsCommand: mocks.sessionsCommand, })); @@ -79,6 +91,9 @@ describe("registerStatusHealthSessionsCommands", () => { runtime.exit.mockImplementation(() => {}); statusCommand.mockResolvedValue(undefined); healthCommand.mockResolvedValue(undefined); + flowsListCommand.mockResolvedValue(undefined); + flowsShowCommand.mockResolvedValue(undefined); + flowsCancelCommand.mockResolvedValue(undefined); sessionsCommand.mockResolvedValue(undefined); sessionsCleanupCommand.mockResolvedValue(undefined); tasksListCommand.mockResolvedValue(undefined); @@ -317,4 +332,39 @@ describe("registerStatusHealthSessionsCommands", () => { runtime, ); }); + + it("runs flows list from the parent command", async () => { + await runCli(["flows", "--json", "--status", "blocked"]); + + expect(flowsListCommand).toHaveBeenCalledWith( + expect.objectContaining({ + json: true, + status: "blocked", + }), + runtime, + ); + }); + + it("runs flows show subcommand with lookup forwarding", async () => { + await runCli(["flows", "show", "flow-123", "--json"]); + + expect(flowsShowCommand).toHaveBeenCalledWith( + expect.objectContaining({ + lookup: "flow-123", + json: true, + }), + runtime, + ); + }); + + it("runs flows cancel subcommand with lookup forwarding", async () => { + await runCli(["flows", "cancel", "flow-123"]); + + expect(flowsCancelCommand).toHaveBeenCalledWith( + expect.objectContaining({ + lookup: "flow-123", + }), + runtime, + ); + }); }); diff --git a/src/cli/program/register.status-health-sessions.ts b/src/cli/program/register.status-health-sessions.ts index 1467be8fe72..4affcdc933c 100644 --- a/src/cli/program/register.status-health-sessions.ts +++ b/src/cli/program/register.status-health-sessions.ts @@ -1,4 +1,5 @@ import type { Command } from "commander"; +import { flowsCancelCommand, flowsListCommand, flowsShowCommand } from "../../commands/flows.js"; import { healthCommand } from "../../commands/health.js"; import { sessionsCleanupCommand } from "../../commands/sessions-cleanup.js"; import { sessionsCommand } from "../../commands/sessions.js"; @@ -373,4 +374,84 @@ export function registerStatusHealthSessionsCommands(program: Command) { ); }); }); + + const flowsCmd = program + .command("flows") + .description("Inspect ClawFlow state") + .option("--json", "Output as JSON", false) + .option( + "--status ", + "Filter by status (queued, running, waiting, blocked, succeeded, failed, cancelled, lost)", + ) + .action(async (opts) => { + await runCommandWithRuntime(defaultRuntime, async () => { + await flowsListCommand( + { + json: Boolean(opts.json), + status: opts.status as string | undefined, + }, + defaultRuntime, + ); + }); + }); + flowsCmd.enablePositionalOptions(); + + flowsCmd + .command("list") + .description("List tracked ClawFlow runs") + .option("--json", "Output as JSON", false) + .option( + "--status ", + "Filter by status (queued, running, waiting, blocked, succeeded, failed, cancelled, lost)", + ) + .action(async (opts, command) => { + const parentOpts = command.parent?.opts() as + | { + json?: boolean; + status?: string; + } + | undefined; + await runCommandWithRuntime(defaultRuntime, async () => { + await flowsListCommand( + { + json: Boolean(opts.json || parentOpts?.json), + status: (opts.status as string | undefined) ?? parentOpts?.status, + }, + defaultRuntime, + ); + }); + }); + + flowsCmd + .command("show") + .description("Show one ClawFlow by flow id or owner session key") + .argument("", "Flow id or owner session key") + .option("--json", "Output as JSON", false) + .action(async (lookup, opts, command) => { + const parentOpts = command.parent?.opts() as { json?: boolean } | undefined; + await runCommandWithRuntime(defaultRuntime, async () => { + await flowsShowCommand( + { + lookup, + json: Boolean(opts.json || parentOpts?.json), + }, + defaultRuntime, + ); + }); + }); + + flowsCmd + .command("cancel") + .description("Cancel a ClawFlow and its active child tasks") + .argument("", "Flow id or owner session key") + .action(async (lookup) => { + await runCommandWithRuntime(defaultRuntime, async () => { + await flowsCancelCommand( + { + lookup, + }, + defaultRuntime, + ); + }); + }); } diff --git a/src/commands/doctor-workspace-status.test.ts b/src/commands/doctor-workspace-status.test.ts index d3803b22ccf..327eaebba46 100644 --- a/src/commands/doctor-workspace-status.test.ts +++ b/src/commands/doctor-workspace-status.test.ts @@ -13,6 +13,8 @@ const mocks = vi.hoisted(() => ({ buildWorkspaceSkillStatus: vi.fn(), buildPluginStatusReport: vi.fn(), buildPluginCompatibilityWarnings: vi.fn(), + listFlowRecords: vi.fn(), + listTasksForFlowId: vi.fn(), })); vi.mock("../agents/agent-scope.js", () => ({ @@ -30,6 +32,14 @@ vi.mock("../plugins/status.js", () => ({ mocks.buildPluginCompatibilityWarnings(...args), })); +vi.mock("../tasks/flow-registry.js", () => ({ + listFlowRecords: (...args: unknown[]) => mocks.listFlowRecords(...args), +})); + +vi.mock("../tasks/task-registry.js", () => ({ + listTasksForFlowId: (...args: unknown[]) => mocks.listTasksForFlowId(...args), +})); + async function runNoteWorkspaceStatusForTest( loadResult: ReturnType, compatibilityWarnings: string[] = [], @@ -44,6 +54,8 @@ async function runNoteWorkspaceStatusForTest( ...loadResult, }); mocks.buildPluginCompatibilityWarnings.mockReturnValue(compatibilityWarnings); + mocks.listFlowRecords.mockReturnValue([]); + mocks.listTasksForFlowId.mockReturnValue([]); const noteSpy = vi.spyOn(noteModule, "note").mockImplementation(() => {}); noteWorkspaceStatus({}); @@ -159,4 +171,51 @@ describe("noteWorkspaceStatus", () => { noteSpy.mockRestore(); } }); + + it("surfaces ClawFlow recovery guidance for suspicious linear flows", async () => { + const noteSpy = await runNoteWorkspaceStatusForTest(createPluginLoadResult({ plugins: [] })); + mocks.listFlowRecords.mockReturnValue([ + { + flowId: "flow-orphaned", + shape: "linear", + ownerSessionKey: "agent:main:main", + status: "waiting", + notifyPolicy: "done_only", + goal: "Process PRs", + createdAt: 10, + updatedAt: 20, + }, + { + flowId: "flow-blocked", + shape: "single_task", + ownerSessionKey: "agent:main:main", + status: "blocked", + notifyPolicy: "done_only", + goal: "Patch file", + blockedTaskId: "task-missing", + createdAt: 10, + updatedAt: 20, + }, + ]); + mocks.listTasksForFlowId.mockImplementation((flowId: string) => { + if (flowId === "flow-blocked") { + return [{ taskId: "task-other" }]; + } + return []; + }); + + noteWorkspaceStatus({}); + + try { + const recoveryCalls = noteSpy.mock.calls.filter(([, title]) => title === "ClawFlow recovery"); + expect(recoveryCalls).toHaveLength(1); + const body = String(recoveryCalls[0]?.[0]); + expect(body).toContain("flow-orphaned: waiting linear flow has no linked tasks"); + expect(body).toContain("flow-blocked: blocked flow points at missing task task-missing"); + expect(body).toContain("openclaw flows show "); + expect(body).toContain("openclaw flows cancel "); + } finally { + noteSpy.mockRestore(); + } + }); }); diff --git a/src/commands/doctor-workspace-status.ts b/src/commands/doctor-workspace-status.ts index c4c8d0a4fea..d0afa05df0e 100644 --- a/src/commands/doctor-workspace-status.ts +++ b/src/commands/doctor-workspace-status.ts @@ -1,10 +1,53 @@ import { resolveAgentWorkspaceDir, resolveDefaultAgentId } from "../agents/agent-scope.js"; import { buildWorkspaceSkillStatus } from "../agents/skills-status.js"; +import { formatCliCommand } from "../cli/command-format.js"; import type { OpenClawConfig } from "../config/config.js"; import { buildPluginCompatibilityWarnings, buildPluginStatusReport } from "../plugins/status.js"; +import { listFlowRecords } from "../tasks/flow-registry.js"; +import { listTasksForFlowId } from "../tasks/task-registry.js"; import { note } from "../terminal/note.js"; import { detectLegacyWorkspaceDirs, formatLegacyWorkspaceWarning } from "./doctor-workspace.js"; +function noteFlowRecoveryHints() { + const suspicious = listFlowRecords().flatMap((flow) => { + const tasks = listTasksForFlowId(flow.flowId); + const findings: string[] = []; + if ( + flow.shape === "linear" && + (flow.status === "running" || flow.status === "waiting" || flow.status === "blocked") && + tasks.length === 0 + ) { + findings.push( + `${flow.flowId}: ${flow.status} linear flow has no linked tasks; inspect or cancel it manually.`, + ); + } + if ( + flow.status === "blocked" && + flow.blockedTaskId && + !tasks.some((task) => task.taskId === flow.blockedTaskId) + ) { + findings.push( + `${flow.flowId}: blocked flow points at missing task ${flow.blockedTaskId}; inspect before retrying.`, + ); + } + return findings; + }); + if (suspicious.length === 0) { + return; + } + note( + [ + ...suspicious.slice(0, 5), + suspicious.length > 5 ? `...and ${suspicious.length - 5} more.` : null, + `Inspect: ${formatCliCommand("openclaw flows show ")}`, + `Cancel: ${formatCliCommand("openclaw flows cancel ")}`, + ] + .filter((line): line is string => Boolean(line)) + .join("\n"), + "ClawFlow recovery", + ); +} + export function noteWorkspaceStatus(cfg: OpenClawConfig) { const workspaceDir = resolveAgentWorkspaceDir(cfg, resolveDefaultAgentId(cfg)); const legacyWorkspace = detectLegacyWorkspaceDirs({ workspaceDir }); @@ -74,5 +117,7 @@ export function noteWorkspaceStatus(cfg: OpenClawConfig) { note(lines.join("\n"), "Plugin diagnostics"); } + noteFlowRecoveryHints(); + return { workspaceDir }; } diff --git a/src/commands/flows.test.ts b/src/commands/flows.test.ts new file mode 100644 index 00000000000..4f9f817baf9 --- /dev/null +++ b/src/commands/flows.test.ts @@ -0,0 +1,154 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; +import { createCliRuntimeCapture } from "../cli/test-runtime-capture.js"; +import { flowsCancelCommand, flowsListCommand, flowsShowCommand } from "./flows.js"; + +const mocks = vi.hoisted(() => ({ + listFlowRecordsMock: vi.fn(), + resolveFlowForLookupTokenMock: vi.fn(), + getFlowByIdMock: vi.fn(), + listTasksForFlowIdMock: vi.fn(), + getFlowTaskSummaryMock: vi.fn(), + cancelFlowByIdMock: vi.fn(), + loadConfigMock: vi.fn(() => ({ loaded: true })), +})); + +vi.mock("../tasks/flow-registry.js", () => ({ + listFlowRecords: (...args: unknown[]) => mocks.listFlowRecordsMock(...args), + resolveFlowForLookupToken: (...args: unknown[]) => mocks.resolveFlowForLookupTokenMock(...args), + getFlowById: (...args: unknown[]) => mocks.getFlowByIdMock(...args), +})); + +vi.mock("../tasks/task-registry.js", () => ({ + listTasksForFlowId: (...args: unknown[]) => mocks.listTasksForFlowIdMock(...args), +})); + +vi.mock("../tasks/task-executor.js", () => ({ + getFlowTaskSummary: (...args: unknown[]) => mocks.getFlowTaskSummaryMock(...args), + cancelFlowById: (...args: unknown[]) => mocks.cancelFlowByIdMock(...args), +})); + +vi.mock("../config/config.js", () => ({ + loadConfig: () => mocks.loadConfigMock(), +})); + +const { + defaultRuntime: runtime, + runtimeLogs, + runtimeErrors, + resetRuntimeCapture, +} = createCliRuntimeCapture(); + +const flowFixture = { + flowId: "flow-12345678", + shape: "linear", + ownerSessionKey: "agent:main:main", + status: "waiting", + notifyPolicy: "done_only", + goal: "Process related PRs", + currentStep: "wait_for", + createdAt: Date.parse("2026-03-31T10:00:00.000Z"), + updatedAt: Date.parse("2026-03-31T10:05:00.000Z"), +} as const; + +const taskSummaryFixture = { + total: 2, + active: 1, + terminal: 1, + failures: 0, + byStatus: { + queued: 0, + running: 1, + succeeded: 1, + failed: 0, + timed_out: 0, + cancelled: 0, + lost: 0, + }, + byRuntime: { + subagent: 1, + acp: 1, + cli: 0, + cron: 0, + }, +} as const; + +const taskFixture = { + taskId: "task-12345678", + runtime: "acp", + requesterSessionKey: "agent:main:main", + parentFlowId: "flow-12345678", + childSessionKey: "agent:codex:acp:child", + runId: "run-12345678", + task: "Review PR", + status: "running", + deliveryStatus: "pending", + notifyPolicy: "done_only", + createdAt: Date.parse("2026-03-31T10:00:00.000Z"), + lastEventAt: Date.parse("2026-03-31T10:05:00.000Z"), +} as const; + +describe("flows commands", () => { + beforeEach(() => { + vi.clearAllMocks(); + resetRuntimeCapture(); + mocks.listFlowRecordsMock.mockReturnValue([]); + mocks.resolveFlowForLookupTokenMock.mockReturnValue(undefined); + mocks.getFlowByIdMock.mockReturnValue(undefined); + mocks.listTasksForFlowIdMock.mockReturnValue([]); + mocks.getFlowTaskSummaryMock.mockReturnValue(taskSummaryFixture); + mocks.cancelFlowByIdMock.mockResolvedValue({ + found: false, + cancelled: false, + reason: "missing", + }); + }); + + it("lists flow rows with task summary counts", async () => { + mocks.listFlowRecordsMock.mockReturnValue([flowFixture]); + + await flowsListCommand({}, runtime); + + expect(runtimeLogs[0]).toContain("Flows: 1"); + expect(runtimeLogs[1]).toContain("Flow pressure: 0 active · 0 blocked · 1 total"); + expect(runtimeLogs.join("\n")).toContain("Process related PRs"); + expect(runtimeLogs.join("\n")).toContain("1 active/2 total"); + }); + + it("shows one flow with linked tasks", async () => { + mocks.resolveFlowForLookupTokenMock.mockReturnValue(flowFixture); + mocks.listTasksForFlowIdMock.mockReturnValue([taskFixture]); + + await flowsShowCommand({ lookup: "flow-12345678" }, runtime); + + expect(runtimeLogs.join("\n")).toContain("shape: linear"); + expect(runtimeLogs.join("\n")).toContain("currentStep: wait_for"); + expect(runtimeLogs.join("\n")).toContain("tasks: 2 total · 1 active · 0 issues"); + expect(runtimeLogs.join("\n")).toContain("task-12345678 running run-12345678 Review PR"); + }); + + it("cancels a flow and reports the updated state", async () => { + mocks.resolveFlowForLookupTokenMock.mockReturnValue(flowFixture); + mocks.cancelFlowByIdMock.mockResolvedValue({ + found: true, + cancelled: true, + flow: { + ...flowFixture, + status: "cancelled", + }, + }); + mocks.getFlowByIdMock.mockReturnValue({ + ...flowFixture, + status: "cancelled", + }); + + await flowsCancelCommand({ lookup: "flow-12345678" }, runtime); + + expect(mocks.loadConfigMock).toHaveBeenCalled(); + expect(mocks.cancelFlowByIdMock).toHaveBeenCalledWith({ + cfg: { loaded: true }, + flowId: "flow-12345678", + }); + expect(runtimeLogs[0]).toContain("Cancelled flow-12345678 (linear) with status cancelled."); + expect(runtimeErrors).toEqual([]); + }); +}); diff --git a/src/commands/flows.ts b/src/commands/flows.ts new file mode 100644 index 00000000000..1c9732a9d89 --- /dev/null +++ b/src/commands/flows.ts @@ -0,0 +1,215 @@ +import { loadConfig } from "../config/config.js"; +import { info } from "../globals.js"; +import type { RuntimeEnv } from "../runtime.js"; +import { getFlowById, listFlowRecords, resolveFlowForLookupToken } from "../tasks/flow-registry.js"; +import type { FlowRecord, FlowStatus } from "../tasks/flow-registry.types.js"; +import { cancelFlowById, getFlowTaskSummary } from "../tasks/task-executor.js"; +import { listTasksForFlowId } from "../tasks/task-registry.js"; +import { isRich, theme } from "../terminal/theme.js"; + +const ID_PAD = 10; +const STATUS_PAD = 10; +const SHAPE_PAD = 12; + +function truncate(value: string, maxChars: number) { + if (value.length <= maxChars) { + return value; + } + if (maxChars <= 1) { + return value.slice(0, maxChars); + } + return `${value.slice(0, maxChars - 1)}…`; +} + +function shortToken(value: string | undefined, maxChars = ID_PAD): string { + const trimmed = value?.trim(); + if (!trimmed) { + return "n/a"; + } + return truncate(trimmed, maxChars); +} + +function formatFlowStatusCell(status: FlowStatus, rich: boolean) { + const padded = status.padEnd(STATUS_PAD); + if (!rich) { + return padded; + } + if (status === "succeeded") { + return theme.success(padded); + } + if (status === "failed" || status === "lost") { + return theme.error(padded); + } + if (status === "running") { + return theme.accentBright(padded); + } + if (status === "blocked") { + return theme.warn(padded); + } + return theme.muted(padded); +} + +function formatFlowRows(flows: FlowRecord[], rich: boolean) { + const header = [ + "Flow".padEnd(ID_PAD), + "Shape".padEnd(SHAPE_PAD), + "Status".padEnd(STATUS_PAD), + "Owner".padEnd(24), + "Tasks".padEnd(14), + "Goal", + ].join(" "); + const lines = [rich ? theme.heading(header) : header]; + for (const flow of flows) { + const taskSummary = getFlowTaskSummary(flow.flowId); + const counts = `${taskSummary.active} active/${taskSummary.total} total`; + lines.push( + [ + shortToken(flow.flowId).padEnd(ID_PAD), + flow.shape.padEnd(SHAPE_PAD), + formatFlowStatusCell(flow.status, rich), + truncate(flow.ownerSessionKey, 24).padEnd(24), + counts.padEnd(14), + truncate(flow.goal, 80), + ].join(" "), + ); + } + return lines; +} + +function formatFlowListSummary(flows: FlowRecord[]) { + const active = flows.filter( + (flow) => flow.status === "queued" || flow.status === "running", + ).length; + const blocked = flows.filter((flow) => flow.status === "blocked").length; + return `${active} active · ${blocked} blocked · ${flows.length} total`; +} + +export async function flowsListCommand( + opts: { json?: boolean; status?: string }, + runtime: RuntimeEnv, +) { + const statusFilter = opts.status?.trim(); + const flows = listFlowRecords().filter((flow) => { + if (statusFilter && flow.status !== statusFilter) { + return false; + } + return true; + }); + + if (opts.json) { + runtime.log( + JSON.stringify( + { + count: flows.length, + status: statusFilter ?? null, + flows: flows.map((flow) => ({ + ...flow, + tasks: listTasksForFlowId(flow.flowId), + taskSummary: getFlowTaskSummary(flow.flowId), + })), + }, + null, + 2, + ), + ); + return; + } + + runtime.log(info(`Flows: ${flows.length}`)); + runtime.log(info(`Flow pressure: ${formatFlowListSummary(flows)}`)); + if (statusFilter) { + runtime.log(info(`Status filter: ${statusFilter}`)); + } + if (flows.length === 0) { + runtime.log("No flows found."); + return; + } + const rich = isRich(); + for (const line of formatFlowRows(flows, rich)) { + runtime.log(line); + } +} + +export async function flowsShowCommand( + opts: { json?: boolean; lookup: string }, + runtime: RuntimeEnv, +) { + const flow = resolveFlowForLookupToken(opts.lookup); + if (!flow) { + runtime.error(`Flow not found: ${opts.lookup}`); + runtime.exit(1); + return; + } + const tasks = listTasksForFlowId(flow.flowId); + const taskSummary = getFlowTaskSummary(flow.flowId); + + if (opts.json) { + runtime.log( + JSON.stringify( + { + ...flow, + tasks, + taskSummary, + }, + null, + 2, + ), + ); + return; + } + + const lines = [ + "Flow:", + `flowId: ${flow.flowId}`, + `shape: ${flow.shape}`, + `status: ${flow.status}`, + `notify: ${flow.notifyPolicy}`, + `ownerSessionKey: ${flow.ownerSessionKey}`, + `goal: ${flow.goal}`, + `currentStep: ${flow.currentStep ?? "n/a"}`, + `blockedTaskId: ${flow.blockedTaskId ?? "n/a"}`, + `blockedSummary: ${flow.blockedSummary ?? "n/a"}`, + `createdAt: ${new Date(flow.createdAt).toISOString()}`, + `updatedAt: ${new Date(flow.updatedAt).toISOString()}`, + `endedAt: ${flow.endedAt ? new Date(flow.endedAt).toISOString() : "n/a"}`, + `tasks: ${taskSummary.total} total · ${taskSummary.active} active · ${taskSummary.failures} issues`, + ]; + for (const line of lines) { + runtime.log(line); + } + if (tasks.length === 0) { + runtime.log("Linked tasks: none"); + return; + } + runtime.log("Linked tasks:"); + for (const task of tasks) { + runtime.log( + `- ${task.taskId} ${task.status} ${task.runId ?? "n/a"} ${task.label ?? task.task}`, + ); + } +} + +export async function flowsCancelCommand(opts: { lookup: string }, runtime: RuntimeEnv) { + const flow = resolveFlowForLookupToken(opts.lookup); + if (!flow) { + runtime.error(`Flow not found: ${opts.lookup}`); + runtime.exit(1); + return; + } + const result = await cancelFlowById({ + cfg: loadConfig(), + flowId: flow.flowId, + }); + if (!result.found) { + runtime.error(result.reason ?? `Flow not found: ${opts.lookup}`); + runtime.exit(1); + return; + } + if (!result.cancelled) { + runtime.error(result.reason ?? `Could not cancel flow: ${opts.lookup}`); + runtime.exit(1); + return; + } + const updated = getFlowById(flow.flowId) ?? result.flow ?? flow; + runtime.log(`Cancelled ${updated.flowId} (${updated.shape}) with status ${updated.status}.`); +} diff --git a/src/tasks/flow-registry.store.sqlite.ts b/src/tasks/flow-registry.store.sqlite.ts index 5f9b44d196c..041fea73741 100644 --- a/src/tasks/flow-registry.store.sqlite.ts +++ b/src/tasks/flow-registry.store.sqlite.ts @@ -4,10 +4,11 @@ import { requireNodeSqlite } from "../infra/node-sqlite.js"; import type { DeliveryContext } from "../utils/delivery-context.js"; import { resolveFlowRegistryDir, resolveFlowRegistrySqlitePath } from "./flow-registry.paths.js"; import type { FlowRegistryStoreSnapshot } from "./flow-registry.store.js"; -import type { FlowRecord } from "./flow-registry.types.js"; +import type { FlowRecord, FlowShape } from "./flow-registry.types.js"; type FlowRegistryRow = { flow_id: string; + shape: FlowShape | null; owner_session_key: string; requester_origin_json: string | null; status: FlowRecord["status"]; @@ -66,6 +67,7 @@ function rowToFlowRecord(row: FlowRegistryRow): FlowRecord { const requesterOrigin = parseJsonValue(row.requester_origin_json); return { flowId: row.flow_id, + shape: row.shape === "linear" ? "linear" : "single_task", ownerSessionKey: row.owner_session_key, ...(requesterOrigin ? { requesterOrigin } : {}), status: row.status, @@ -83,6 +85,7 @@ function rowToFlowRecord(row: FlowRegistryRow): FlowRecord { function bindFlowRecord(record: FlowRecord) { return { flow_id: record.flowId, + shape: record.shape, owner_session_key: record.ownerSessionKey, requester_origin_json: serializeJson(record.requesterOrigin), status: record.status, @@ -102,6 +105,7 @@ function createStatements(db: DatabaseSync): FlowRegistryStatements { selectAll: db.prepare(` SELECT flow_id, + shape, owner_session_key, requester_origin_json, status, @@ -119,6 +123,7 @@ function createStatements(db: DatabaseSync): FlowRegistryStatements { upsertRow: db.prepare(` INSERT INTO flow_runs ( flow_id, + shape, owner_session_key, requester_origin_json, status, @@ -132,6 +137,7 @@ function createStatements(db: DatabaseSync): FlowRegistryStatements { ended_at ) VALUES ( @flow_id, + @shape, @owner_session_key, @requester_origin_json, @status, @@ -145,6 +151,7 @@ function createStatements(db: DatabaseSync): FlowRegistryStatements { @ended_at ) ON CONFLICT(flow_id) DO UPDATE SET + shape = excluded.shape, owner_session_key = excluded.owner_session_key, requester_origin_json = excluded.requester_origin_json, status = excluded.status, @@ -166,6 +173,7 @@ function ensureSchema(db: DatabaseSync) { db.exec(` CREATE TABLE IF NOT EXISTS flow_runs ( flow_id TEXT PRIMARY KEY, + shape TEXT NOT NULL, owner_session_key TEXT NOT NULL, requester_origin_json TEXT, status TEXT NOT NULL, @@ -179,6 +187,7 @@ function ensureSchema(db: DatabaseSync) { ended_at INTEGER ); `); + ensureColumn(db, "flow_runs", "shape", "TEXT"); ensureColumn(db, "flow_runs", "blocked_task_id", "TEXT"); ensureColumn(db, "flow_runs", "blocked_summary", "TEXT"); db.exec(`CREATE INDEX IF NOT EXISTS idx_flow_runs_status ON flow_runs(status);`); diff --git a/src/tasks/flow-registry.store.test.ts b/src/tasks/flow-registry.store.test.ts index 3dd1d8e835e..e83d20a5080 100644 --- a/src/tasks/flow-registry.store.test.ts +++ b/src/tasks/flow-registry.store.test.ts @@ -9,6 +9,7 @@ import type { FlowRecord } from "./flow-registry.types.js"; function createStoredFlow(): FlowRecord { return { flowId: "flow-restored", + shape: "linear", ownerSessionKey: "agent:main:main", status: "blocked", notifyPolicy: "done_only", @@ -61,6 +62,7 @@ describe("flow-registry store runtime", () => { expect(getFlowById("flow-restored")).toMatchObject({ flowId: "flow-restored", + shape: "linear", goal: "Restored flow", blockedTaskId: "task-restored", blockedSummary: "Writable session required.", @@ -98,6 +100,7 @@ describe("flow-registry store runtime", () => { expect(getFlowById(created.flowId)).toMatchObject({ flowId: created.flowId, + shape: "linear", status: "waiting", currentStep: "ask_user", }); diff --git a/src/tasks/flow-registry.test.ts b/src/tasks/flow-registry.test.ts index 6185218b9b3..5a0af86e4e5 100644 --- a/src/tasks/flow-registry.test.ts +++ b/src/tasks/flow-registry.test.ts @@ -82,6 +82,28 @@ describe("flow-registry", () => { }); }); + it("lists newest flows first", async () => { + await withFlowRegistryTempDir(async (root) => { + process.env.OPENCLAW_STATE_DIR = root; + resetFlowRegistryForTests(); + + const earlier = createFlowRecord({ + ownerSessionKey: "agent:main:main", + goal: "First flow", + createdAt: 100, + updatedAt: 100, + }); + const later = createFlowRecord({ + ownerSessionKey: "agent:main:main", + goal: "Second flow", + createdAt: 200, + updatedAt: 200, + }); + + expect(listFlowRecords().map((flow) => flow.flowId)).toEqual([later.flowId, earlier.flowId]); + }); + }); + it("applies minimal defaults for new flow records", async () => { await withFlowRegistryTempDir(async (root) => { process.env.OPENCLAW_STATE_DIR = root; @@ -94,6 +116,7 @@ describe("flow-registry", () => { expect(created).toMatchObject({ flowId: expect.any(String), + shape: "linear", ownerSessionKey: "agent:main:main", goal: "Background job", status: "queued", @@ -138,6 +161,7 @@ describe("flow-registry", () => { resetFlowRegistryForTests(); const created = createFlowRecord({ + shape: "single_task", ownerSessionKey: "agent:main:main", goal: "Fix permissions", status: "running", @@ -184,4 +208,41 @@ describe("flow-registry", () => { expect(resumed?.endedAt).toBeUndefined(); }); }); + + it("does not auto-sync linear flow state from linked child tasks", async () => { + await withFlowRegistryTempDir(async (root) => { + process.env.OPENCLAW_STATE_DIR = root; + resetFlowRegistryForTests(); + + const created = createFlowRecord({ + ownerSessionKey: "agent:main:main", + goal: "Cluster PRs", + status: "waiting", + currentStep: "wait_for", + }); + + const synced = syncFlowFromTask({ + taskId: "task-child", + parentFlowId: created.flowId, + status: "running", + notifyPolicy: "done_only", + label: "Child task", + task: "Child task", + lastEventAt: 250, + progressSummary: "Running child task", + }); + + expect(synced).toMatchObject({ + flowId: created.flowId, + shape: "linear", + status: "waiting", + currentStep: "wait_for", + }); + expect(getFlowById(created.flowId)).toMatchObject({ + flowId: created.flowId, + status: "waiting", + currentStep: "wait_for", + }); + }); + }); }); diff --git a/src/tasks/flow-registry.ts b/src/tasks/flow-registry.ts index 0fc647de651..74647959b38 100644 --- a/src/tasks/flow-registry.ts +++ b/src/tasks/flow-registry.ts @@ -1,6 +1,6 @@ import crypto from "node:crypto"; import { getFlowRegistryStore, resetFlowRegistryRuntimeForTests } from "./flow-registry.store.js"; -import type { FlowRecord, FlowStatus } from "./flow-registry.types.js"; +import type { FlowRecord, FlowShape, FlowStatus } from "./flow-registry.types.js"; import type { TaskNotifyPolicy, TaskRecord } from "./task-registry.types.js"; const flows = new Map(); @@ -21,6 +21,10 @@ function ensureNotifyPolicy(notifyPolicy?: TaskNotifyPolicy): TaskNotifyPolicy { return notifyPolicy ?? "done_only"; } +function ensureFlowShape(shape?: FlowShape): FlowShape { + return shape ?? "linear"; +} + function resolveFlowGoal(task: Pick): string { return task.label?.trim() || task.task.trim() || "Background task"; } @@ -111,6 +115,7 @@ function persistFlowDelete(flowId: string) { } export function createFlowRecord(params: { + shape?: FlowShape; ownerSessionKey: string; requesterOrigin?: FlowRecord["requesterOrigin"]; status?: FlowStatus; @@ -127,6 +132,7 @@ export function createFlowRecord(params: { const now = params.createdAt ?? Date.now(); const record: FlowRecord = { flowId: crypto.randomUUID(), + shape: ensureFlowShape(params.shape), ownerSessionKey: params.ownerSessionKey, ...(params.requesterOrigin ? { requesterOrigin: { ...params.requesterOrigin } } : {}), status: params.status ?? "queued", @@ -173,6 +179,7 @@ export function createFlowForTask(params: { ? (params.task.endedAt ?? params.task.lastEventAt ?? params.task.createdAt) : undefined; return createFlowRecord({ + shape: "single_task", ownerSessionKey: params.task.requesterSessionKey, requesterOrigin: params.requesterOrigin, status: terminalFlowStatus, @@ -238,6 +245,13 @@ export function syncFlowFromTask( if (!flowId) { return null; } + const flow = getFlowById(flowId); + if (!flow) { + return null; + } + if (flow.shape !== "single_task") { + return flow; + } const terminalFlowStatus = deriveFlowStatusFromTask(task); const isTerminal = terminalFlowStatus === "succeeded" || @@ -249,15 +263,15 @@ export function syncFlowFromTask( status: terminalFlowStatus, notifyPolicy: task.notifyPolicy, goal: resolveFlowGoal(task), - blockedTaskId: terminalFlowStatus === "blocked" ? task.taskId.trim() || undefined : undefined, + blockedTaskId: terminalFlowStatus === "blocked" ? task.taskId.trim() || null : null, blockedSummary: - terminalFlowStatus === "blocked" ? (resolveFlowBlockedSummary(task) ?? undefined) : undefined, + terminalFlowStatus === "blocked" ? (resolveFlowBlockedSummary(task) ?? null) : null, updatedAt: task.lastEventAt ?? Date.now(), ...(isTerminal ? { endedAt: task.endedAt ?? task.lastEventAt ?? Date.now(), } - : { endedAt: undefined }), + : { endedAt: null }), }); } @@ -267,11 +281,36 @@ export function getFlowById(flowId: string): FlowRecord | undefined { return flow ? cloneFlowRecord(flow) : undefined; } +export function listFlowsForOwnerSessionKey(sessionKey: string): FlowRecord[] { + ensureFlowRegistryReady(); + const normalizedSessionKey = sessionKey.trim(); + if (!normalizedSessionKey) { + return []; + } + return [...flows.values()] + .filter((flow) => flow.ownerSessionKey.trim() === normalizedSessionKey) + .map((flow) => cloneFlowRecord(flow)) + .toSorted((left, right) => right.createdAt - left.createdAt); +} + +export function findLatestFlowForOwnerSessionKey(sessionKey: string): FlowRecord | undefined { + const flow = listFlowsForOwnerSessionKey(sessionKey)[0]; + return flow ? cloneFlowRecord(flow) : undefined; +} + +export function resolveFlowForLookupToken(token: string): FlowRecord | undefined { + const lookup = token.trim(); + if (!lookup) { + return undefined; + } + return getFlowById(lookup) ?? findLatestFlowForOwnerSessionKey(lookup); +} + export function listFlowRecords(): FlowRecord[] { ensureFlowRegistryReady(); return [...flows.values()] .map((flow) => cloneFlowRecord(flow)) - .toSorted((left, right) => left.createdAt - right.createdAt); + .toSorted((left, right) => right.createdAt - left.createdAt); } export function deleteFlowRecordById(flowId: string): boolean { diff --git a/src/tasks/flow-registry.types.ts b/src/tasks/flow-registry.types.ts index 5a52e95100b..e984e12132a 100644 --- a/src/tasks/flow-registry.types.ts +++ b/src/tasks/flow-registry.types.ts @@ -1,6 +1,8 @@ import type { DeliveryContext } from "../utils/delivery-context.js"; import type { TaskNotifyPolicy } from "./task-registry.types.js"; +export type FlowShape = "single_task" | "linear"; + export type FlowStatus = | "queued" | "running" @@ -13,6 +15,7 @@ export type FlowStatus = export type FlowRecord = { flowId: string; + shape: FlowShape; ownerSessionKey: string; requesterOrigin?: DeliveryContext; status: FlowStatus; diff --git a/src/tasks/task-executor.test.ts b/src/tasks/task-executor.test.ts index 36a14fdb29d..330170063e3 100644 --- a/src/tasks/task-executor.test.ts +++ b/src/tasks/task-executor.test.ts @@ -1,8 +1,15 @@ import { afterEach, describe, expect, it, vi } from "vitest"; import { withTempDir } from "../test-helpers/temp-dir.js"; -import { getFlowById, listFlowRecords, resetFlowRegistryForTests } from "./flow-registry.js"; import { + getFlowById, + listFlowRecords, + resetFlowRegistryForTests, + updateFlowRecordById, +} from "./flow-registry.js"; +import { + cancelFlowById, completeTaskRunByRunId, + createLinearFlow, createQueuedTaskRun, createRunningTaskRun, failTaskRunByRunId, @@ -340,4 +347,118 @@ describe("task-executor", () => { expect(findTaskByRunId("run-should-not-exist")).toBeUndefined(); }); }); + + it("keeps linear flows under explicit control instead of auto-syncing child task status", async () => { + await withTaskExecutorStateDir(async () => { + const flow = createLinearFlow({ + ownerSessionKey: "agent:main:main", + goal: "Triage a PR cluster", + currentStep: "wait_for", + notifyPolicy: "done_only", + }); + + const child = createRunningTaskRun({ + runtime: "acp", + requesterSessionKey: "agent:main:main", + parentFlowId: flow.flowId, + childSessionKey: "agent:codex:acp:child", + runId: "run-linear-child", + task: "Inspect a PR", + startedAt: 10, + deliveryStatus: "pending", + }); + + completeTaskRunByRunId({ + runId: "run-linear-child", + endedAt: 40, + lastEventAt: 40, + terminalSummary: "Done.", + }); + + expect(child.parentFlowId).toBe(flow.flowId); + expect(getFlowById(flow.flowId)).toMatchObject({ + flowId: flow.flowId, + shape: "linear", + status: "queued", + currentStep: "wait_for", + }); + }); + }); + + it("cancels active child tasks and marks a linear flow cancelled", async () => { + await withTaskExecutorStateDir(async () => { + hoisted.cancelSessionMock.mockResolvedValue(undefined); + + const flow = createLinearFlow({ + ownerSessionKey: "agent:main:main", + goal: "Cluster related PRs", + currentStep: "wait_for", + }); + + const child = createRunningTaskRun({ + runtime: "acp", + requesterSessionKey: "agent:main:main", + parentFlowId: flow.flowId, + childSessionKey: "agent:codex:acp:child", + runId: "run-linear-cancel", + task: "Inspect a PR", + startedAt: 10, + deliveryStatus: "pending", + }); + + const cancelled = await cancelFlowById({ + cfg: {} as never, + flowId: flow.flowId, + }); + + expect(cancelled).toMatchObject({ + found: true, + cancelled: true, + flow: expect.objectContaining({ + flowId: flow.flowId, + status: "cancelled", + }), + }); + expect(findTaskByRunId("run-linear-cancel")).toMatchObject({ + taskId: child.taskId, + status: "cancelled", + }); + expect(getFlowById(flow.flowId)).toMatchObject({ + flowId: flow.flowId, + status: "cancelled", + }); + expect(hoisted.cancelSessionMock).toHaveBeenCalled(); + }); + }); + + it("refuses to rewrite terminal linear flows when cancel is requested", async () => { + await withTaskExecutorStateDir(async () => { + const flow = createLinearFlow({ + ownerSessionKey: "agent:main:main", + goal: "Cluster related PRs", + currentStep: "finish", + }); + updateFlowRecordById(flow.flowId, { + status: "succeeded", + endedAt: 55, + updatedAt: 55, + }); + + const cancelled = await cancelFlowById({ + cfg: {} as never, + flowId: flow.flowId, + }); + + expect(cancelled).toMatchObject({ + found: true, + cancelled: false, + reason: "Flow is already succeeded.", + }); + expect(getFlowById(flow.flowId)).toMatchObject({ + flowId: flow.flowId, + status: "succeeded", + endedAt: 55, + }); + }); + }); }); diff --git a/src/tasks/task-executor.ts b/src/tasks/task-executor.ts index 4a5dbf7da23..b2e955ce768 100644 --- a/src/tasks/task-executor.ts +++ b/src/tasks/task-executor.ts @@ -1,22 +1,32 @@ import type { OpenClawConfig } from "../config/config.js"; import { createSubsystemLogger } from "../logging/subsystem.js"; -import { createFlowForTask, deleteFlowRecordById, getFlowById } from "./flow-registry.js"; +import { + createFlowForTask, + createFlowRecord, + deleteFlowRecordById, + getFlowById, + updateFlowRecordById, +} from "./flow-registry.js"; +import type { FlowRecord } from "./flow-registry.types.js"; import { cancelTaskById, createTaskRecord, findLatestTaskForFlowId, linkTaskToFlowById, + listTasksForFlowId, markTaskLostById, markTaskRunningByRunId, markTaskTerminalByRunId, recordTaskProgressByRunId, setTaskRunDeliveryStatusByRunId, } from "./task-registry.js"; +import { summarizeTaskRecords } from "./task-registry.summary.js"; import type { TaskDeliveryState, TaskDeliveryStatus, TaskNotifyPolicy, TaskRecord, + TaskRegistrySummary, TaskRuntime, TaskStatus, TaskTerminalOutcome, @@ -95,6 +105,32 @@ export function createQueuedTaskRun(params: { }); } +export function createLinearFlow(params: { + ownerSessionKey: string; + requesterOrigin?: TaskDeliveryState["requesterOrigin"]; + goal: string; + notifyPolicy?: TaskNotifyPolicy; + currentStep?: string; + createdAt?: number; + updatedAt?: number; +}): FlowRecord { + return createFlowRecord({ + shape: "linear", + ownerSessionKey: params.ownerSessionKey, + requesterOrigin: params.requesterOrigin, + goal: params.goal, + notifyPolicy: params.notifyPolicy, + currentStep: params.currentStep, + status: "queued", + createdAt: params.createdAt, + updatedAt: params.updatedAt, + }); +} + +export function getFlowTaskSummary(flowId: string): TaskRegistrySummary { + return summarizeTaskRecords(listTasksForFlowId(flowId)); +} + type RetryBlockedFlowResult = { found: boolean; retried: boolean; @@ -230,6 +266,79 @@ export function retryBlockedFlowAsRunningTaskRun( }); } +type CancelFlowResult = { + found: boolean; + cancelled: boolean; + reason?: string; + flow?: FlowRecord; + tasks?: TaskRecord[]; +}; + +function isActiveTaskStatus(status: TaskStatus): boolean { + return status === "queued" || status === "running"; +} + +function isTerminalFlowStatus(status: FlowRecord["status"]): boolean { + return ( + status === "succeeded" || status === "failed" || status === "cancelled" || status === "lost" + ); +} + +export async function cancelFlowById(params: { + cfg: OpenClawConfig; + flowId: string; +}): Promise { + const flow = getFlowById(params.flowId); + if (!flow) { + return { + found: false, + cancelled: false, + reason: "Flow not found.", + }; + } + const linkedTasks = listTasksForFlowId(flow.flowId); + const activeTasks = linkedTasks.filter((task) => isActiveTaskStatus(task.status)); + for (const task of activeTasks) { + await cancelTaskById({ + cfg: params.cfg, + taskId: task.taskId, + }); + } + const refreshedTasks = listTasksForFlowId(flow.flowId); + const remainingActive = refreshedTasks.filter((task) => isActiveTaskStatus(task.status)); + if (remainingActive.length > 0) { + return { + found: true, + cancelled: false, + reason: "One or more child tasks are still active.", + flow: getFlowById(flow.flowId), + tasks: refreshedTasks, + }; + } + if (isTerminalFlowStatus(flow.status)) { + return { + found: true, + cancelled: false, + reason: `Flow is already ${flow.status}.`, + flow, + tasks: refreshedTasks, + }; + } + const updatedFlow = updateFlowRecordById(flow.flowId, { + status: "cancelled", + blockedTaskId: null, + blockedSummary: null, + endedAt: Date.now(), + updatedAt: Date.now(), + }); + return { + found: true, + cancelled: true, + flow: updatedFlow ?? getFlowById(flow.flowId), + tasks: refreshedTasks, + }; +} + export function createRunningTaskRun(params: { runtime: TaskRuntime; sourceId?: string; diff --git a/src/tasks/task-registry.test.ts b/src/tasks/task-registry.test.ts index ba97cbfbb92..8153e8effbc 100644 --- a/src/tasks/task-registry.test.ts +++ b/src/tasks/task-registry.test.ts @@ -1144,6 +1144,7 @@ describe("task-registry", () => { }); const flow = createFlowRecord({ + shape: "single_task", ownerSessionKey: "agent:flow:owner", requesterOrigin: { channel: "discord", @@ -1332,6 +1333,7 @@ describe("task-registry", () => { }); const flow = createFlowRecord({ + shape: "single_task", ownerSessionKey: "agent:flow:owner", requesterOrigin: { channel: "discord",