From 9d9cf0d8ffd5763d69352b24c56c107cb3ac419e Mon Sep 17 00:00:00 2001 From: Mariano <132747814+mbelinky@users.noreply.github.com> Date: Mon, 30 Mar 2026 20:25:01 +0200 Subject: [PATCH] Tasks: route one-task emergence through parent flows (#57874) --- CHANGELOG.md | 1 + src/tasks/flow-registry.ts | 101 +++++++++++++++++- src/tasks/task-executor.test.ts | 88 ++++++++++++++- src/tasks/task-executor.ts | 62 ++++++++++- src/tasks/task-registry.test.ts | 182 ++++++++++++++++++++++++++++++++ src/tasks/task-registry.ts | 133 ++++++++++++++++++----- 6 files changed, 535 insertions(+), 32 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c256fbbc5fa..c41f84ffac1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ Docs: https://docs.openclaw.ai - Matrix/network: add explicit `channels.matrix.proxy` config for routing Matrix traffic through an HTTP(S) proxy, including account-level overrides and matching probe/runtime behavior. (#56931) thanks @patrick-yingxi-pan. - Background tasks: turn tasks into a real shared background-run control plane instead of ACP-only bookkeeping by unifying ACP, subagent, cron, and background CLI execution under one SQLite-backed ledger, routing detached lifecycle updates through the executor seam, adding audit/maintenance/status visibility, tightening auto-cleanup and lost-run recovery, improving task awareness in internal status/tool surfaces, and clarifying the split between heartbeat/main-session automation and detached scheduled runs. Thanks @vincentkoc and @mbelinky. - Flows/tasks: add a minimal SQLite-backed flow registry plus task-to-flow linkage scaffolding, so orchestrated work can start gaining a first-class parent record without changing current task delivery behavior. +- Flows/tasks: route one-task ACP and subagent updates through a parent flow owner context, so detached work can emerge back through the intended parent thread/session instead of speaking only as a raw child task. ### Fixes diff --git a/src/tasks/flow-registry.ts b/src/tasks/flow-registry.ts index 628040b14ec..4d68524b3ae 100644 --- a/src/tasks/flow-registry.ts +++ b/src/tasks/flow-registry.ts @@ -1,7 +1,7 @@ import crypto from "node:crypto"; import { getFlowRegistryStore, resetFlowRegistryRuntimeForTests } from "./flow-registry.store.js"; import type { FlowRecord, FlowStatus } from "./flow-registry.types.js"; -import type { TaskNotifyPolicy } from "./task-registry.types.js"; +import type { TaskNotifyPolicy, TaskRecord } from "./task-registry.types.js"; const flows = new Map(); let restoreAttempted = false; @@ -21,6 +21,31 @@ function ensureNotifyPolicy(notifyPolicy?: TaskNotifyPolicy): TaskNotifyPolicy { return notifyPolicy ?? "done_only"; } +function resolveFlowGoal(task: Pick): string { + return task.label?.trim() || task.task.trim() || "Background task"; +} + +export function deriveFlowStatusFromTask( + task: Pick, +): FlowStatus { + if (task.status === "queued") { + return "queued"; + } + if (task.status === "running") { + return "running"; + } + if (task.status === "succeeded") { + return task.terminalOutcome === "blocked" ? "blocked" : "succeeded"; + } + if (task.status === "cancelled") { + return "cancelled"; + } + if (task.status === "lost") { + return "lost"; + } + return "failed"; +} + function ensureFlowRegistryReady() { if (restoreAttempted) { return; @@ -87,6 +112,43 @@ export function createFlowRecord(params: { return cloneFlowRecord(record); } +export function createFlowForTask(params: { + task: Pick< + TaskRecord, + | "requesterSessionKey" + | "notifyPolicy" + | "status" + | "terminalOutcome" + | "label" + | "task" + | "createdAt" + | "lastEventAt" + | "endedAt" + >; + requesterOrigin?: FlowRecord["requesterOrigin"]; +}): FlowRecord { + const terminalFlowStatus = deriveFlowStatusFromTask(params.task); + const isTerminal = + terminalFlowStatus === "succeeded" || + terminalFlowStatus === "blocked" || + terminalFlowStatus === "failed" || + terminalFlowStatus === "cancelled" || + terminalFlowStatus === "lost"; + const endedAt = isTerminal + ? (params.task.endedAt ?? params.task.lastEventAt ?? params.task.createdAt) + : undefined; + return createFlowRecord({ + ownerSessionKey: params.task.requesterSessionKey, + requesterOrigin: params.requesterOrigin, + status: terminalFlowStatus, + notifyPolicy: params.task.notifyPolicy, + goal: resolveFlowGoal(params.task), + createdAt: params.task.createdAt, + updatedAt: params.task.lastEventAt ?? params.task.createdAt, + ...(endedAt !== undefined ? { endedAt } : {}), + }); +} + export function updateFlowRecordById( flowId: string, patch: Partial< @@ -113,6 +175,43 @@ export function updateFlowRecordById( return cloneFlowRecord(next); } +export function syncFlowFromTask( + task: Pick< + TaskRecord, + | "parentFlowId" + | "status" + | "terminalOutcome" + | "notifyPolicy" + | "label" + | "task" + | "lastEventAt" + | "endedAt" + >, +): FlowRecord | null { + const flowId = task.parentFlowId?.trim(); + if (!flowId) { + return null; + } + const terminalFlowStatus = deriveFlowStatusFromTask(task); + const isTerminal = + terminalFlowStatus === "succeeded" || + terminalFlowStatus === "blocked" || + terminalFlowStatus === "failed" || + terminalFlowStatus === "cancelled" || + terminalFlowStatus === "lost"; + return updateFlowRecordById(flowId, { + status: terminalFlowStatus, + notifyPolicy: task.notifyPolicy, + goal: resolveFlowGoal(task), + updatedAt: task.lastEventAt ?? Date.now(), + ...(isTerminal + ? { + endedAt: task.endedAt ?? task.lastEventAt ?? Date.now(), + } + : {}), + }); +} + export function getFlowById(flowId: string): FlowRecord | undefined { ensureFlowRegistryReady(); const flow = flows.get(flowId); diff --git a/src/tasks/task-executor.test.ts b/src/tasks/task-executor.test.ts index 84074eeb01d..928bff1d76c 100644 --- a/src/tasks/task-executor.test.ts +++ b/src/tasks/task-executor.test.ts @@ -1,5 +1,6 @@ -import { afterEach, describe, expect, it } from "vitest"; +import { afterEach, describe, expect, it, vi } from "vitest"; import { withTempDir } from "../test-helpers/temp-dir.js"; +import { getFlowById, listFlowRecords, resetFlowRegistryForTests } from "./flow-registry.js"; import { completeTaskRunByRunId, createQueuedTaskRun, @@ -12,15 +13,41 @@ import { import { findTaskByRunId, resetTaskRegistryForTests } from "./task-registry.js"; const ORIGINAL_STATE_DIR = process.env.OPENCLAW_STATE_DIR; +const hoisted = vi.hoisted(() => { + const sendMessageMock = vi.fn(); + const cancelSessionMock = vi.fn(); + const killSubagentRunAdminMock = vi.fn(); + return { + sendMessageMock, + cancelSessionMock, + killSubagentRunAdminMock, + }; +}); + +vi.mock("./task-registry-delivery-runtime.js", () => ({ + sendMessage: hoisted.sendMessageMock, +})); + +vi.mock("../acp/control-plane/manager.js", () => ({ + getAcpSessionManager: () => ({ + cancelSession: hoisted.cancelSessionMock, + }), +})); + +vi.mock("../agents/subagent-control.js", () => ({ + killSubagentRunAdmin: (params: unknown) => hoisted.killSubagentRunAdminMock(params), +})); async function withTaskExecutorStateDir(run: (root: string) => Promise): Promise { await withTempDir({ prefix: "openclaw-task-executor-" }, async (root) => { process.env.OPENCLAW_STATE_DIR = root; resetTaskRegistryForTests(); + resetFlowRegistryForTests(); try { await run(root); } finally { resetTaskRegistryForTests(); + resetFlowRegistryForTests(); } }); } @@ -33,6 +60,10 @@ describe("task-executor", () => { process.env.OPENCLAW_STATE_DIR = ORIGINAL_STATE_DIR; } resetTaskRegistryForTests(); + resetFlowRegistryForTests(); + hoisted.sendMessageMock.mockReset(); + hoisted.cancelSessionMock.mockReset(); + hoisted.killSubagentRunAdminMock.mockReset(); }); it("advances a queued run through start and completion", async () => { @@ -110,4 +141,59 @@ describe("task-executor", () => { }); }); }); + + it("auto-creates a one-task flow and keeps it synced with task status", async () => { + await withTaskExecutorStateDir(async () => { + const created = createRunningTaskRun({ + runtime: "subagent", + requesterSessionKey: "agent:main:main", + childSessionKey: "agent:codex:subagent:child", + runId: "run-executor-flow", + task: "Write summary", + startedAt: 10, + deliveryStatus: "pending", + }); + + expect(created.parentFlowId).toEqual(expect.any(String)); + expect(getFlowById(created.parentFlowId!)).toMatchObject({ + flowId: created.parentFlowId, + ownerSessionKey: "agent:main:main", + status: "running", + goal: "Write summary", + notifyPolicy: "done_only", + }); + + completeTaskRunByRunId({ + runId: "run-executor-flow", + endedAt: 40, + lastEventAt: 40, + terminalSummary: "Done.", + }); + + expect(getFlowById(created.parentFlowId!)).toMatchObject({ + flowId: created.parentFlowId, + status: "succeeded", + endedAt: 40, + goal: "Write summary", + notifyPolicy: "done_only", + }); + }); + }); + + it("does not auto-create one-task flows for non-returning bookkeeping runs", async () => { + await withTaskExecutorStateDir(async () => { + const created = createRunningTaskRun({ + runtime: "cli", + requesterSessionKey: "agent:main:main", + childSessionKey: "agent:main:main", + runId: "run-executor-cli", + task: "Foreground gateway run", + deliveryStatus: "not_applicable", + startedAt: 10, + }); + + expect(created.parentFlowId).toBeUndefined(); + expect(listFlowRecords()).toEqual([]); + }); + }); }); diff --git a/src/tasks/task-executor.ts b/src/tasks/task-executor.ts index 4e973a6694e..fd90c37a210 100644 --- a/src/tasks/task-executor.ts +++ b/src/tasks/task-executor.ts @@ -1,7 +1,10 @@ import type { OpenClawConfig } from "../config/config.js"; +import { createSubsystemLogger } from "../logging/subsystem.js"; +import { createFlowForTask, deleteFlowRecordById } from "./flow-registry.js"; import { cancelTaskById, createTaskRecord, + linkTaskToFlowById, markTaskLostById, markTaskRunningByRunId, markTaskTerminalByRunId, @@ -18,6 +21,53 @@ import type { TaskTerminalOutcome, } from "./task-registry.types.js"; +const log = createSubsystemLogger("tasks/executor"); + +function isOneTaskFlowEligible(task: TaskRecord): boolean { + if (task.parentFlowId?.trim() || !task.requesterSessionKey.trim()) { + return false; + } + if (task.deliveryStatus === "not_applicable") { + return false; + } + return task.runtime === "acp" || task.runtime === "subagent"; +} + +function ensureSingleTaskFlow(params: { + task: TaskRecord; + requesterOrigin?: TaskDeliveryState["requesterOrigin"]; +}): TaskRecord { + if (!isOneTaskFlowEligible(params.task)) { + return params.task; + } + try { + const flow = createFlowForTask({ + task: params.task, + requesterOrigin: params.requesterOrigin, + }); + const linked = linkTaskToFlowById({ + taskId: params.task.taskId, + flowId: flow.flowId, + }); + if (!linked) { + deleteFlowRecordById(flow.flowId); + return params.task; + } + if (linked.parentFlowId !== flow.flowId) { + deleteFlowRecordById(flow.flowId); + return linked; + } + return linked; + } catch (error) { + log.warn("Failed to create one-task flow for detached run", { + taskId: params.task.taskId, + runId: params.task.runId, + error, + }); + return params.task; + } +} + export function createQueuedTaskRun(params: { runtime: TaskRuntime; sourceId?: string; @@ -34,10 +84,14 @@ export function createQueuedTaskRun(params: { notifyPolicy?: TaskNotifyPolicy; deliveryStatus?: TaskDeliveryStatus; }): TaskRecord { - return createTaskRecord({ + const task = createTaskRecord({ ...params, status: "queued", }); + return ensureSingleTaskFlow({ + task, + requesterOrigin: params.requesterOrigin, + }); } export function createRunningTaskRun(params: { @@ -59,10 +113,14 @@ export function createRunningTaskRun(params: { lastEventAt?: number; progressSummary?: string | null; }): TaskRecord { - return createTaskRecord({ + const task = createTaskRecord({ ...params, status: "running", }); + return ensureSingleTaskFlow({ + task, + requesterOrigin: params.requesterOrigin, + }); } export function startTaskRunByRunId(params: { diff --git a/src/tasks/task-registry.test.ts b/src/tasks/task-registry.test.ts index 175893287b1..64111206abb 100644 --- a/src/tasks/task-registry.test.ts +++ b/src/tasks/task-registry.test.ts @@ -7,6 +7,7 @@ import { } from "../infra/heartbeat-wake.js"; import { peekSystemEvents, resetSystemEventsForTest } from "../infra/system-events.js"; import { withTempDir } from "../test-helpers/temp-dir.js"; +import { createFlowRecord, getFlowById, resetFlowRegistryForTests } from "./flow-registry.js"; import { createTaskRecord, findLatestTaskForSessionKey, @@ -108,6 +109,7 @@ describe("task-registry", () => { resetSystemEventsForTest(); resetHeartbeatWakeStateForTests(); resetTaskRegistryForTests(); + resetFlowRegistryForTests(); hoisted.sendMessageMock.mockReset(); hoisted.cancelSessionMock.mockReset(); hoisted.killSubagentRunAdminMock.mockReset(); @@ -1112,6 +1114,70 @@ describe("task-registry", () => { }); }); + it("routes state-change updates through the parent flow owner when a task is flow-linked", async () => { + await withTempDir({ prefix: "openclaw-task-registry-" }, async (root) => { + process.env.OPENCLAW_STATE_DIR = root; + resetTaskRegistryForTests(); + resetFlowRegistryForTests(); + hoisted.sendMessageMock.mockResolvedValue({ + channel: "discord", + to: "discord:flow", + via: "direct", + }); + + const flow = createFlowRecord({ + ownerSessionKey: "agent:flow:owner", + requesterOrigin: { + channel: "discord", + to: "discord:flow", + threadId: "444", + }, + status: "queued", + notifyPolicy: "state_changes", + goal: "Investigate issue", + }); + + const task = createTaskRecord({ + runtime: "acp", + requesterSessionKey: "agent:main:main", + requesterOrigin: { + channel: "telegram", + to: "telegram:123", + }, + parentFlowId: flow.flowId, + childSessionKey: "agent:codex:acp:child", + runId: "run-flow-state", + task: "Investigate issue", + status: "queued", + notifyPolicy: "state_changes", + }); + + markTaskRunningByRunId({ + runId: "run-flow-state", + eventSummary: "Started.", + }); + + await waitForAssertion(() => + expect(hoisted.sendMessageMock).toHaveBeenCalledWith( + expect.objectContaining({ + channel: "discord", + to: "discord:flow", + threadId: "444", + idempotencyKey: expect.stringContaining(`flow-event:${flow.flowId}:${task.taskId}:`), + mirror: expect.objectContaining({ + sessionKey: "agent:flow:owner", + idempotencyKey: expect.stringContaining(`flow-event:${flow.flowId}:${task.taskId}:`), + }), + }), + ), + ); + expect(getFlowById(flow.flowId)).toMatchObject({ + flowId: flow.flowId, + status: "running", + }); + }); + }); + it("keeps background ACP progress off the foreground lane and only sends a terminal notify", async () => { await withTempDir({ prefix: "openclaw-task-registry-" }, async (root) => { process.env.OPENCLAW_STATE_DIR = root; @@ -1235,6 +1301,122 @@ describe("task-registry", () => { }); }); + it("routes terminal delivery through the parent flow owner when a task is flow-linked", async () => { + await withTempDir({ prefix: "openclaw-task-registry-" }, async (root) => { + process.env.OPENCLAW_STATE_DIR = root; + resetTaskRegistryForTests(); + resetFlowRegistryForTests(); + resetSystemEventsForTest(); + hoisted.sendMessageMock.mockResolvedValue({ + channel: "discord", + to: "discord:flow", + via: "direct", + }); + + const flow = createFlowRecord({ + ownerSessionKey: "agent:flow:owner", + requesterOrigin: { + channel: "discord", + to: "discord:flow", + threadId: "444", + }, + status: "running", + goal: "Investigate issue", + }); + + createTaskRecord({ + runtime: "acp", + requesterSessionKey: "agent:main:main", + requesterOrigin: { + channel: "telegram", + to: "telegram:123", + }, + parentFlowId: flow.flowId, + childSessionKey: "agent:codex:acp:child", + runId: "run-flow-terminal", + task: "Investigate issue", + status: "running", + deliveryStatus: "pending", + }); + + emitAgentEvent({ + runId: "run-flow-terminal", + stream: "lifecycle", + data: { + phase: "end", + endedAt: 250, + }, + }); + await flushAsyncWork(); + + await waitForAssertion(() => + expect(hoisted.sendMessageMock).toHaveBeenCalledWith( + expect.objectContaining({ + channel: "discord", + to: "discord:flow", + threadId: "444", + idempotencyKey: expect.stringContaining(`flow-terminal:${flow.flowId}:`), + mirror: expect.objectContaining({ + sessionKey: "agent:flow:owner", + }), + }), + ), + ); + expect(getFlowById(flow.flowId)).toMatchObject({ + flowId: flow.flowId, + status: "succeeded", + endedAt: 250, + }); + expect(peekSystemEvents("agent:main:main")).toEqual([]); + }); + }); + + it("queues fallback terminal delivery on the parent flow owner session when a task is flow-linked", async () => { + await withTempDir({ prefix: "openclaw-task-registry-" }, async (root) => { + process.env.OPENCLAW_STATE_DIR = root; + resetTaskRegistryForTests(); + resetFlowRegistryForTests(); + resetSystemEventsForTest(); + + const flow = createFlowRecord({ + ownerSessionKey: "agent:flow:owner", + status: "running", + goal: "Investigate issue", + }); + + createTaskRecord({ + runtime: "acp", + requesterSessionKey: "agent:main:main", + parentFlowId: flow.flowId, + childSessionKey: "agent:codex:acp:child", + runId: "run-flow-fallback", + task: "Investigate issue", + status: "running", + deliveryStatus: "pending", + }); + + emitAgentEvent({ + runId: "run-flow-fallback", + stream: "lifecycle", + data: { + phase: "end", + endedAt: 250, + }, + }); + await flushAsyncWork(); + + await waitForAssertion(() => + expect(peekSystemEvents("agent:flow:owner")).toEqual([ + "Background task done: ACP background task (run run-flow).", + ]), + ); + expect(peekSystemEvents("agent:main:main")).toEqual([]); + expect(findTaskByRunId("run-flow-fallback")).toMatchObject({ + deliveryStatus: "session_queued", + }); + }); + }); + it("emits concise state-change updates without surfacing raw ACP chatter", async () => { await withTempDir({ prefix: "openclaw-task-registry-" }, async (root) => { process.env.OPENCLAW_STATE_DIR = root; diff --git a/src/tasks/task-registry.ts b/src/tasks/task-registry.ts index 3e67669612e..447ad70804f 100644 --- a/src/tasks/task-registry.ts +++ b/src/tasks/task-registry.ts @@ -9,6 +9,7 @@ import { createSubsystemLogger } from "../logging/subsystem.js"; import { parseAgentSessionKey } from "../routing/session-key.js"; import { normalizeDeliveryContext } from "../utils/delivery-context.js"; import { isDeliverableMessageChannel } from "../utils/message-channel.js"; +import { getFlowById, syncFlowFromTask } from "./flow-registry.js"; import { formatTaskBlockedFollowupMessage, formatTaskStateChangeMessage, @@ -53,6 +54,12 @@ let restoreAttempted = false; let deliveryRuntimePromise: Promise | null = null; +type TaskDeliveryOwner = { + sessionKey: string; + requesterOrigin?: TaskDeliveryState["requesterOrigin"]; + flowId?: string; +}; + function cloneTaskRecord(record: TaskRecord): TaskRecord { return { ...record }; } @@ -433,6 +440,39 @@ function taskTerminalDeliveryIdempotencyKey(task: TaskRecord): string { return `task-terminal:${task.taskId}:${task.status}:${outcome}`; } +function flowTerminalDeliveryIdempotencyKey(flowId: string, task: TaskRecord): string { + const outcome = task.status === "succeeded" ? (task.terminalOutcome ?? "default") : "default"; + return `flow-terminal:${flowId}:${task.taskId}:${task.status}:${outcome}`; +} + +function resolveTaskStateChangeIdempotencyKey(params: { + task: TaskRecord; + latestEvent: TaskEventRecord; + owner: TaskDeliveryOwner; +}): string { + if (params.owner.flowId) { + return `flow-event:${params.owner.flowId}:${params.task.taskId}:${params.latestEvent.at}:${params.latestEvent.kind}`; + } + return `task-event:${params.task.taskId}:${params.latestEvent.at}:${params.latestEvent.kind}`; +} + +function resolveTaskTerminalIdempotencyKey(task: TaskRecord, owner: TaskDeliveryOwner): string { + return owner.flowId + ? flowTerminalDeliveryIdempotencyKey(owner.flowId, task) + : taskTerminalDeliveryIdempotencyKey(task); +} + +function resolveTaskDeliveryOwner(task: TaskRecord): TaskDeliveryOwner { + const flow = task.parentFlowId?.trim() ? getFlowById(task.parentFlowId) : undefined; + return { + sessionKey: flow?.ownerSessionKey?.trim() || task.requesterSessionKey.trim(), + requesterOrigin: normalizeDeliveryContext( + flow?.requesterOrigin ?? taskDeliveryStates.get(task.taskId)?.requesterOrigin, + ), + ...(flow ? { flowId: flow.flowId } : {}), + }; +} + function restoreTaskRegistryOnce() { if (restoreAttempted) { return; @@ -489,6 +529,15 @@ function updateTask(taskId: string, patch: Partial): TaskRecord | nu addSessionKeyIndex(taskId, next); } persistTaskUpsert(next); + try { + syncFlowFromTask(next); + } catch (error) { + log.warn("Failed to sync parent flow from task update", { + taskId, + flowId: next.parentFlowId, + error, + }); + } emitTaskRegistryHookEvent(() => ({ kind: "upserted", task: cloneTaskRecord(next), @@ -522,21 +571,22 @@ function getTaskDeliveryState(taskId: string): TaskDeliveryState | undefined { } function canDeliverTaskToRequesterOrigin(task: TaskRecord): boolean { - const origin = normalizeDeliveryContext(taskDeliveryStates.get(task.taskId)?.requesterOrigin); + const origin = resolveTaskDeliveryOwner(task).requesterOrigin; const channel = origin?.channel?.trim(); const to = origin?.to?.trim(); return Boolean(channel && to && isDeliverableMessageChannel(channel)); } function queueTaskSystemEvent(task: TaskRecord, text: string) { - const requesterSessionKey = task.requesterSessionKey.trim(); + const owner = resolveTaskDeliveryOwner(task); + const requesterSessionKey = owner.sessionKey.trim(); if (!requesterSessionKey) { return false; } enqueueSystemEvent(text, { sessionKey: requesterSessionKey, - contextKey: `task:${task.taskId}`, - deliveryContext: taskDeliveryStates.get(task.taskId)?.requesterOrigin, + contextKey: owner.flowId ? `flow:${owner.flowId}` : `task:${task.taskId}`, + deliveryContext: owner.requesterOrigin, }); requestHeartbeatNow({ reason: "background-task", @@ -550,14 +600,17 @@ function queueBlockedTaskFollowup(task: TaskRecord) { if (!followupText) { return false; } - const requesterSessionKey = task.requesterSessionKey.trim(); + const owner = resolveTaskDeliveryOwner(task); + const requesterSessionKey = owner.sessionKey.trim(); if (!requesterSessionKey) { return false; } enqueueSystemEvent(followupText, { sessionKey: requesterSessionKey, - contextKey: `task:${task.taskId}:blocked-followup`, - deliveryContext: taskDeliveryStates.get(task.taskId)?.requesterOrigin, + contextKey: owner.flowId + ? `flow:${owner.flowId}:blocked-followup` + : `task:${task.taskId}:blocked-followup`, + deliveryContext: owner.requesterOrigin, }); requestHeartbeatNow({ reason: "background-task-blocked", @@ -592,7 +645,8 @@ export async function maybeDeliverTaskTerminalUpdate(taskId: string): Promise