refactor(tasks): split delivery state from task runs

This commit is contained in:
Vincent Koc 2026-03-30 13:03:05 +09:00
parent ca2a67e07e
commit fa5827079f
11 changed files with 419 additions and 207 deletions

View File

@ -3,7 +3,11 @@ import type { OpenClawConfig } from "../../config/config.js";
import { logVerbose } from "../../globals.js";
import { normalizeAgentId } from "../../routing/session-key.js";
import { isAcpSessionKey } from "../../sessions/session-key-utils.js";
import { createTaskRecord, updateTaskStateByRunId } from "../../tasks/task-registry.js";
import {
createTaskRecord,
markTaskRunningByRunId,
markTaskTerminalByRunId,
} from "../../tasks/task-registry.js";
import type { DeliveryContext } from "../../utils/delivery-context.js";
import {
AcpRuntimeError,
@ -144,8 +148,6 @@ type BackgroundTaskContext = {
task: string;
};
type BackgroundTaskStatePatch = Omit<Parameters<typeof updateTaskStateByRunId>[0], "runId">;
export class AcpSessionManager {
private readonly actorQueue = new SessionActorQueue();
private readonly actorTailBySession = this.actorQueue.getTailMapForTesting();
@ -786,8 +788,7 @@ export class AcpSessionManager {
);
}
if (taskContext) {
this.updateBackgroundTaskState(taskContext.runId, {
status: "running",
this.markBackgroundTaskRunning(taskContext.runId, {
lastEventAt: Date.now(),
progressSummary: taskProgressSummary || null,
});
@ -832,7 +833,7 @@ export class AcpSessionManager {
});
if (taskContext) {
const terminalResult = resolveBackgroundTaskTerminalResult(taskProgressSummary);
this.updateBackgroundTaskState(taskContext.runId, {
this.markBackgroundTaskTerminal(taskContext.runId, {
status: "succeeded",
endedAt: Date.now(),
lastEventAt: Date.now(),
@ -871,7 +872,7 @@ export class AcpSessionManager {
errorCode: acpError.code,
});
if (taskContext) {
this.updateBackgroundTaskState(taskContext.runId, {
this.markBackgroundTaskTerminal(taskContext.runId, {
status: resolveBackgroundTaskFailureStatus(acpError),
endedAt: Date.now(),
lastEventAt: Date.now(),
@ -1898,11 +1899,46 @@ export class AcpSessionManager {
}
}
private updateBackgroundTaskState(runId: string, patch: BackgroundTaskStatePatch): void {
private markBackgroundTaskRunning(
runId: string,
params: {
lastEventAt?: number;
progressSummary?: string | null;
},
): void {
try {
updateTaskStateByRunId({
...patch,
markTaskRunningByRunId({
runId,
lastEventAt: params.lastEventAt,
progressSummary: params.progressSummary,
});
} catch (error) {
logVerbose(`acp-manager: failed updating background task for ${runId}: ${String(error)}`);
}
}
private markBackgroundTaskTerminal(
runId: string,
params: {
status: "succeeded" | "failed" | "timed_out";
endedAt: number;
lastEventAt?: number;
error?: string;
progressSummary?: string | null;
terminalSummary?: string | null;
terminalOutcome?: "succeeded" | "blocked" | null;
},
): void {
try {
markTaskTerminalByRunId({
runId,
status: params.status,
endedAt: params.endedAt,
lastEventAt: params.lastEventAt,
error: params.error,
progressSummary: params.progressSummary,
terminalSummary: params.terminalSummary,
terminalOutcome: params.terminalOutcome,
});
} catch (error) {
logVerbose(`acp-manager: failed updating background task for ${runId}: ${String(error)}`);

View File

@ -6,7 +6,7 @@ import { onAgentEvent } from "../infra/agent-events.js";
import { requestHeartbeatNow } from "../infra/heartbeat-wake.js";
import { enqueueSystemEvent } from "../infra/system-events.js";
import { scopedHeartbeatWakeOptions } from "../routing/session-key.js";
import { updateTaskStateByRunId } from "../tasks/task-registry.js";
import { recordTaskProgressByRunId } from "../tasks/task-registry.js";
const DEFAULT_STREAM_FLUSH_MS = 2_500;
const DEFAULT_NO_OUTPUT_NOTICE_MS = 60_000;
@ -204,7 +204,7 @@ export function startAcpSpawnParentStreamRelay(params: {
wake();
};
const emitStartNotice = () => {
updateTaskStateByRunId({
recordTaskProgressByRunId({
runId,
lastEventAt: Date.now(),
eventSummary: "Started.",
@ -271,7 +271,7 @@ export function startAcpSpawnParentStreamRelay(params: {
return;
}
stallNotified = true;
updateTaskStateByRunId({
recordTaskProgressByRunId({
runId,
lastEventAt: Date.now(),
eventSummary: `No output for ${Math.round(noOutputNoticeMs / 1000)}s. It may be waiting for input.`,
@ -317,7 +317,7 @@ export function startAcpSpawnParentStreamRelay(params: {
if (stallNotified) {
stallNotified = false;
updateTaskStateByRunId({
recordTaskProgressByRunId({
runId,
lastEventAt: Date.now(),
eventSummary: "Resumed output.",

View File

@ -1,7 +1,10 @@
import { isSilentReplyText, SILENT_REPLY_TOKEN } from "../auto-reply/tokens.js";
import { defaultRuntime } from "../runtime.js";
import { emitSessionLifecycleEvent } from "../sessions/session-lifecycle-events.js";
import { updateTaskDeliveryByRunId, updateTaskStateByRunId } from "../tasks/task-registry.js";
import {
markTaskTerminalByRunId,
setTaskRunDeliveryStatusByRunId,
} from "../tasks/task-registry.js";
import { normalizeDeliveryContext } from "../utils/delivery-context.js";
import {
captureSubagentCompletionReply,
@ -154,7 +157,7 @@ export function createSubagentRegistryLifecycleController(params: {
entry: SubagentRunRecord;
reason: "retry-limit" | "expiry";
}) => {
updateTaskDeliveryByRunId({
setTaskRunDeliveryStatusByRunId({
runId: giveUpParams.runId,
deliveryStatus: "failed",
});
@ -270,7 +273,7 @@ export function createSubagentRegistryLifecycleController(params: {
return;
}
if (didAnnounce) {
updateTaskDeliveryByRunId({
setTaskRunDeliveryStatusByRunId({
runId,
deliveryStatus: "delivered",
});
@ -326,7 +329,7 @@ export function createSubagentRegistryLifecycleController(params: {
}
if (deferredDecision.kind === "give-up") {
updateTaskDeliveryByRunId({
setTaskRunDeliveryStatusByRunId({
runId,
deliveryStatus: "failed",
});
@ -377,26 +380,27 @@ export function createSubagentRegistryLifecycleController(params: {
});
};
void params.runSubagentAnnounceFlow({
childSessionKey: entry.childSessionKey,
childRunId: entry.runId,
requesterSessionKey: entry.requesterSessionKey,
requesterOrigin,
requesterDisplayKey: entry.requesterDisplayKey,
task: entry.task,
timeoutMs: params.subagentAnnounceTimeoutMs,
cleanup: entry.cleanup,
roundOneReply: entry.frozenResultText ?? undefined,
fallbackReply: entry.fallbackFrozenResultText ?? undefined,
waitForCompletion: false,
startedAt: entry.startedAt,
endedAt: entry.endedAt,
label: entry.label,
outcome: entry.outcome,
spawnMode: entry.spawnMode,
expectsCompletionMessage: entry.expectsCompletionMessage,
wakeOnDescendantSettle: entry.wakeOnDescendantSettle === true,
})
void params
.runSubagentAnnounceFlow({
childSessionKey: entry.childSessionKey,
childRunId: entry.runId,
requesterSessionKey: entry.requesterSessionKey,
requesterOrigin,
requesterDisplayKey: entry.requesterDisplayKey,
task: entry.task,
timeoutMs: params.subagentAnnounceTimeoutMs,
cleanup: entry.cleanup,
roundOneReply: entry.frozenResultText ?? undefined,
fallbackReply: entry.fallbackFrozenResultText ?? undefined,
waitForCompletion: false,
startedAt: entry.startedAt,
endedAt: entry.endedAt,
label: entry.label,
outcome: entry.outcome,
spawnMode: entry.spawnMode,
expectsCompletionMessage: entry.expectsCompletionMessage,
wakeOnDescendantSettle: entry.wakeOnDescendantSettle === true,
})
.then((didAnnounce) => {
finalizeAnnounceCleanup(didAnnounce);
})
@ -458,7 +462,7 @@ export function createSubagentRegistryLifecycleController(params: {
if (mutated) {
params.persist();
}
updateTaskStateByRunId({
markTaskTerminalByRunId({
runId: entry.runId,
status:
completeParams.outcome.status === "ok"

View File

@ -72,13 +72,6 @@ const taskFixture = {
createdAt: Date.parse("2026-03-29T10:00:00.000Z"),
lastEventAt: Date.parse("2026-03-29T10:00:10.000Z"),
progressSummary: "No output for 60s. It may be waiting for input.",
recentEvents: [
{
at: Date.parse("2026-03-29T10:00:10.000Z"),
kind: "progress",
summary: "No output for 60s. It may be waiting for input.",
},
],
} as const;
beforeAll(async () => {
@ -180,7 +173,6 @@ describe("tasks commands", () => {
expect(runtimeLogs.join("\n")).toContain(
"progressSummary: No output for 60s. It may be waiting for input.",
);
expect(runtimeLogs.join("\n")).toContain("recentEvent[0]: 2026-03-29T10:00:10.000Z progress");
});
it("updates notify policy for an existing task", async () => {

View File

@ -246,14 +246,6 @@ export async function tasksShowCommand(
...(task.error ? [`error: ${task.error}`] : []),
...(task.progressSummary ? [`progressSummary: ${task.progressSummary}`] : []),
...(task.terminalSummary ? [`terminalSummary: ${task.terminalSummary}`] : []),
...(task.recentEvents?.length
? task.recentEvents.map(
(event, index) =>
`recentEvent[${index}]: ${new Date(event.at).toISOString()} ${event.kind}${
event.summary ? ` ${event.summary}` : ""
}`,
)
: []),
];
for (const line of lines) {
runtime.log(line);

View File

@ -3,14 +3,14 @@ import type { DatabaseSync, StatementSync } from "node:sqlite";
import { requireNodeSqlite } from "../infra/node-sqlite.js";
import type { DeliveryContext } from "../utils/delivery-context.js";
import { resolveTaskRegistryDir, resolveTaskRegistrySqlitePath } from "./task-registry.paths.js";
import type { TaskEventRecord, TaskRecord } from "./task-registry.types.js";
import type { TaskRegistryStoreSnapshot } from "./task-registry.store.js";
import type { TaskDeliveryState, TaskRecord } from "./task-registry.types.js";
type TaskRegistryRow = {
task_id: string;
runtime: TaskRecord["runtime"];
source_id: string | null;
requester_session_key: string;
requester_origin_json: string | null;
child_session_key: string | null;
parent_task_id: string | null;
agent_id: string | null;
@ -29,15 +29,23 @@ type TaskRegistryRow = {
progress_summary: string | null;
terminal_summary: string | null;
terminal_outcome: TaskRecord["terminalOutcome"] | null;
recent_events_json: string | null;
};
type TaskDeliveryStateRow = {
task_id: string;
requester_origin_json: string | null;
last_notified_event_at: number | bigint | null;
};
type TaskRegistryStatements = {
selectAll: StatementSync;
selectAllDeliveryStates: StatementSync;
replaceRow: StatementSync;
replaceDeliveryState: StatementSync;
deleteRow: StatementSync;
deleteDeliveryState: StatementSync;
clearRows: StatementSync;
clearDeliveryStates: StatementSync;
};
type TaskRegistryDatabase = {
@ -74,19 +82,15 @@ function parseJsonValue<T>(raw: string | null): T | undefined {
}
function rowToTaskRecord(row: TaskRegistryRow): TaskRecord {
const requesterOrigin = parseJsonValue<DeliveryContext>(row.requester_origin_json);
const recentEvents = parseJsonValue<TaskEventRecord[]>(row.recent_events_json);
const startedAt = normalizeNumber(row.started_at);
const endedAt = normalizeNumber(row.ended_at);
const lastEventAt = normalizeNumber(row.last_event_at);
const cleanupAfter = normalizeNumber(row.cleanup_after);
const lastNotifiedEventAt = normalizeNumber(row.last_notified_event_at);
return {
taskId: row.task_id,
runtime: row.runtime,
...(row.source_id ? { sourceId: row.source_id } : {}),
requesterSessionKey: row.requester_session_key,
...(requesterOrigin ? { requesterOrigin } : {}),
...(row.child_session_key ? { childSessionKey: row.child_session_key } : {}),
...(row.parent_task_id ? { parentTaskId: row.parent_task_id } : {}),
...(row.agent_id ? { agentId: row.agent_id } : {}),
@ -105,7 +109,15 @@ function rowToTaskRecord(row: TaskRegistryRow): TaskRecord {
...(row.progress_summary ? { progressSummary: row.progress_summary } : {}),
...(row.terminal_summary ? { terminalSummary: row.terminal_summary } : {}),
...(row.terminal_outcome ? { terminalOutcome: row.terminal_outcome } : {}),
...(recentEvents?.length ? { recentEvents } : {}),
};
}
function rowToTaskDeliveryState(row: TaskDeliveryStateRow): TaskDeliveryState {
const requesterOrigin = parseJsonValue<DeliveryContext>(row.requester_origin_json);
const lastNotifiedEventAt = normalizeNumber(row.last_notified_event_at);
return {
taskId: row.task_id,
...(requesterOrigin ? { requesterOrigin } : {}),
...(lastNotifiedEventAt != null ? { lastNotifiedEventAt } : {}),
};
}
@ -116,7 +128,6 @@ function bindTaskRecord(record: TaskRecord) {
runtime: record.runtime,
source_id: record.sourceId ?? null,
requester_session_key: record.requesterSessionKey,
requester_origin_json: serializeJson(record.requesterOrigin),
child_session_key: record.childSessionKey ?? null,
parent_task_id: record.parentTaskId ?? null,
agent_id: record.agentId ?? null,
@ -135,8 +146,14 @@ function bindTaskRecord(record: TaskRecord) {
progress_summary: record.progressSummary ?? null,
terminal_summary: record.terminalSummary ?? null,
terminal_outcome: record.terminalOutcome ?? null,
recent_events_json: serializeJson(record.recentEvents),
last_notified_event_at: record.lastNotifiedEventAt ?? null,
};
}
function bindTaskDeliveryState(state: TaskDeliveryState) {
return {
task_id: state.taskId,
requester_origin_json: serializeJson(state.requesterOrigin),
last_notified_event_at: state.lastNotifiedEventAt ?? null,
};
}
@ -148,7 +165,6 @@ function createStatements(db: DatabaseSync): TaskRegistryStatements {
runtime,
source_id,
requester_session_key,
requester_origin_json,
child_session_key,
parent_task_id,
agent_id,
@ -166,19 +182,24 @@ function createStatements(db: DatabaseSync): TaskRegistryStatements {
error,
progress_summary,
terminal_summary,
terminal_outcome,
recent_events_json,
last_notified_event_at
terminal_outcome
FROM task_runs
ORDER BY created_at ASC, task_id ASC
`),
selectAllDeliveryStates: db.prepare(`
SELECT
task_id,
requester_origin_json,
last_notified_event_at
FROM task_delivery_state
ORDER BY task_id ASC
`),
replaceRow: db.prepare(`
INSERT OR REPLACE INTO task_runs (
task_id,
runtime,
source_id,
requester_session_key,
requester_origin_json,
child_session_key,
parent_task_id,
agent_id,
@ -196,15 +217,12 @@ function createStatements(db: DatabaseSync): TaskRegistryStatements {
error,
progress_summary,
terminal_summary,
terminal_outcome,
recent_events_json,
last_notified_event_at
terminal_outcome
) VALUES (
@task_id,
@runtime,
@source_id,
@requester_session_key,
@requester_origin_json,
@child_session_key,
@parent_task_id,
@agent_id,
@ -222,13 +240,24 @@ function createStatements(db: DatabaseSync): TaskRegistryStatements {
@error,
@progress_summary,
@terminal_summary,
@terminal_outcome,
@recent_events_json,
@terminal_outcome
)
`),
replaceDeliveryState: db.prepare(`
INSERT OR REPLACE INTO task_delivery_state (
task_id,
requester_origin_json,
last_notified_event_at
) VALUES (
@task_id,
@requester_origin_json,
@last_notified_event_at
)
`),
deleteRow: db.prepare(`DELETE FROM task_runs WHERE task_id = ?`),
deleteDeliveryState: db.prepare(`DELETE FROM task_delivery_state WHERE task_id = ?`),
clearRows: db.prepare(`DELETE FROM task_runs`),
clearDeliveryStates: db.prepare(`DELETE FROM task_delivery_state`),
};
}
@ -239,7 +268,6 @@ function ensureSchema(db: DatabaseSync) {
runtime TEXT NOT NULL,
source_id TEXT,
requester_session_key TEXT NOT NULL,
requester_origin_json TEXT,
child_session_key TEXT,
parent_task_id TEXT,
agent_id TEXT,
@ -257,8 +285,13 @@ function ensureSchema(db: DatabaseSync) {
error TEXT,
progress_summary TEXT,
terminal_summary TEXT,
terminal_outcome TEXT,
recent_events_json TEXT,
terminal_outcome TEXT
);
`);
db.exec(`
CREATE TABLE IF NOT EXISTS task_delivery_state (
task_id TEXT PRIMARY KEY,
requester_origin_json TEXT,
last_notified_event_at INTEGER
);
`);
@ -323,18 +356,26 @@ function withWriteTransaction(write: (statements: TaskRegistryStatements) => voi
}
}
export function loadTaskRegistrySnapshotFromSqlite(): Map<string, TaskRecord> {
export function loadTaskRegistryStateFromSqlite(): TaskRegistryStoreSnapshot {
const { statements } = openTaskRegistryDatabase();
const rows = statements.selectAll.all() as TaskRegistryRow[];
return new Map(rows.map((row) => [row.task_id, rowToTaskRecord(row)]));
const taskRows = statements.selectAll.all() as TaskRegistryRow[];
const deliveryRows = statements.selectAllDeliveryStates.all() as TaskDeliveryStateRow[];
return {
tasks: new Map(taskRows.map((row) => [row.task_id, rowToTaskRecord(row)])),
deliveryStates: new Map(deliveryRows.map((row) => [row.task_id, rowToTaskDeliveryState(row)])),
};
}
export function saveTaskRegistrySnapshotToSqlite(tasks: ReadonlyMap<string, TaskRecord>) {
export function saveTaskRegistryStateToSqlite(snapshot: TaskRegistryStoreSnapshot) {
withWriteTransaction((statements) => {
statements.clearDeliveryStates.run();
statements.clearRows.run();
for (const task of tasks.values()) {
for (const task of snapshot.tasks.values()) {
statements.replaceRow.run(bindTaskRecord(task));
}
for (const state of snapshot.deliveryStates.values()) {
statements.replaceDeliveryState.run(bindTaskDeliveryState(state));
}
});
}
@ -347,6 +388,19 @@ export function upsertTaskRegistryRecordToSqlite(task: TaskRecord) {
export function deleteTaskRegistryRecordFromSqlite(taskId: string) {
const store = openTaskRegistryDatabase();
store.statements.deleteRow.run(taskId);
store.statements.deleteDeliveryState.run(taskId);
ensureTaskRegistryPermissions(store.path);
}
export function upsertTaskDeliveryStateToSqlite(state: TaskDeliveryState) {
const store = openTaskRegistryDatabase();
store.statements.replaceDeliveryState.run(bindTaskDeliveryState(state));
ensureTaskRegistryPermissions(store.path);
}
export function deleteTaskDeliveryStateFromSqlite(taskId: string) {
const store = openTaskRegistryDatabase();
store.statements.deleteDeliveryState.run(taskId);
ensureTaskRegistryPermissions(store.path);
}

View File

@ -37,7 +37,10 @@ describe("task-registry store runtime", () => {
it("uses the configured task store for restore and save", () => {
const storedTask = createStoredTask();
const loadSnapshot = vi.fn(() => new Map([[storedTask.taskId, storedTask]]));
const loadSnapshot = vi.fn(() => ({
tasks: new Map([[storedTask.taskId, storedTask]]),
deliveryStates: new Map(),
}));
const saveSnapshot = vi.fn();
configureTaskRegistryRuntime({
store: {
@ -63,16 +66,21 @@ describe("task-registry store runtime", () => {
});
expect(saveSnapshot).toHaveBeenCalled();
const latestSnapshot = saveSnapshot.mock.calls.at(-1)?.[0] as ReadonlyMap<string, TaskRecord>;
expect(latestSnapshot.size).toBe(2);
expect(latestSnapshot.get("task-restored")?.task).toBe("Restored task");
const latestSnapshot = saveSnapshot.mock.calls.at(-1)?.[0] as {
tasks: ReadonlyMap<string, TaskRecord>;
};
expect(latestSnapshot.tasks.size).toBe(2);
expect(latestSnapshot.tasks.get("task-restored")?.task).toBe("Restored task");
});
it("emits incremental hook events for restore, mutation, and delete", () => {
const events: TaskRegistryHookEvent[] = [];
configureTaskRegistryRuntime({
store: {
loadSnapshot: () => new Map([[createStoredTask().taskId, createStoredTask()]]),
loadSnapshot: () => ({
tasks: new Map([[createStoredTask().taskId, createStoredTask()]]),
deliveryStates: new Map(),
}),
saveSnapshot: () => {},
},
hooks: {

View File

@ -1,17 +1,26 @@
import {
closeTaskRegistrySqliteStore,
deleteTaskDeliveryStateFromSqlite,
deleteTaskRegistryRecordFromSqlite,
loadTaskRegistrySnapshotFromSqlite,
saveTaskRegistrySnapshotToSqlite,
loadTaskRegistryStateFromSqlite,
saveTaskRegistryStateToSqlite,
upsertTaskDeliveryStateToSqlite,
upsertTaskRegistryRecordToSqlite,
} from "./task-registry.store.sqlite.js";
import type { TaskRecord } from "./task-registry.types.js";
import type { TaskDeliveryState, TaskRecord } from "./task-registry.types.js";
export type TaskRegistryStoreSnapshot = {
tasks: Map<string, TaskRecord>;
deliveryStates: Map<string, TaskDeliveryState>;
};
export type TaskRegistryStore = {
loadSnapshot: () => Map<string, TaskRecord>;
saveSnapshot: (tasks: ReadonlyMap<string, TaskRecord>) => void;
loadSnapshot: () => TaskRegistryStoreSnapshot;
saveSnapshot: (snapshot: TaskRegistryStoreSnapshot) => void;
upsertTask?: (task: TaskRecord) => void;
deleteTask?: (taskId: string) => void;
upsertDeliveryState?: (state: TaskDeliveryState) => void;
deleteDeliveryState?: (taskId: string) => void;
close?: () => void;
};
@ -37,10 +46,12 @@ export type TaskRegistryHooks = {
};
const defaultTaskRegistryStore: TaskRegistryStore = {
loadSnapshot: loadTaskRegistrySnapshotFromSqlite,
saveSnapshot: saveTaskRegistrySnapshotToSqlite,
loadSnapshot: loadTaskRegistryStateFromSqlite,
saveSnapshot: saveTaskRegistryStateToSqlite,
upsertTask: upsertTaskRegistryRecordToSqlite,
deleteTask: deleteTaskRegistryRecordFromSqlite,
upsertDeliveryState: upsertTaskDeliveryStateToSqlite,
deleteDeliveryState: deleteTaskDeliveryStateFromSqlite,
close: closeTaskRegistrySqliteStore,
};

View File

@ -15,12 +15,13 @@ import {
listTaskRecords,
maybeDeliverTaskStateChangeUpdate,
maybeDeliverTaskTerminalUpdate,
markTaskRunningByRunId,
recordTaskProgressByRunId,
resetTaskRegistryForTests,
resolveTaskForLookupToken,
setTaskProgressById,
setTaskTimingById,
updateTaskNotifyPolicyById,
updateTaskStateByRunId,
} from "./task-registry.js";
import {
getInspectableTaskAuditSummary,
@ -890,8 +891,8 @@ describe("task-registry", () => {
const now = Date.now();
configureTaskRegistryRuntime({
store: {
loadSnapshot: () =>
new Map([
loadSnapshot: () => ({
tasks: new Map([
[
"task-missing-cleanup",
{
@ -909,6 +910,8 @@ describe("task-registry", () => {
},
],
]),
deliveryStates: new Map(),
}),
saveSnapshot: () => {},
},
});
@ -935,8 +938,8 @@ describe("task-registry", () => {
const now = Date.now();
configureTaskRegistryRuntime({
store: {
loadSnapshot: () =>
new Map([
loadSnapshot: () => ({
tasks: new Map([
[
"task-audit-summary",
{
@ -954,6 +957,8 @@ describe("task-registry", () => {
},
],
]),
deliveryStates: new Map(),
}),
saveSnapshot: () => {},
},
});
@ -998,9 +1003,8 @@ describe("task-registry", () => {
notifyPolicy: "done_only",
});
updateTaskStateByRunId({
markTaskRunningByRunId({
runId: "run-state-change",
status: "running",
eventSummary: "Started.",
});
await waitForAssertion(() => expect(hoisted.sendMessageMock).not.toHaveBeenCalled());
@ -1009,7 +1013,7 @@ describe("task-registry", () => {
taskId: task.taskId,
notifyPolicy: "state_changes",
});
updateTaskStateByRunId({
recordTaskProgressByRunId({
runId: "run-state-change",
eventSummary: "No output for 60s. It may be waiting for input.",
});
@ -1024,13 +1028,6 @@ describe("task-registry", () => {
);
expect(findTaskByRunId("run-state-change")).toMatchObject({
notifyPolicy: "state_changes",
lastNotifiedEventAt: expect.any(Number),
recentEvents: expect.arrayContaining([
expect.objectContaining({
kind: "progress",
summary: "No output for 60s. It may be waiting for input.",
}),
]),
});
await maybeDeliverTaskStateChangeUpdate(task.taskId);
expect(hoisted.sendMessageMock).toHaveBeenCalledTimes(1);

View File

@ -17,6 +17,7 @@ import {
} from "./task-registry.store.js";
import { summarizeTaskRecords } from "./task-registry.summary.js";
import type {
TaskDeliveryState,
TaskDeliveryStatus,
TaskEventKind,
TaskEventRecord,
@ -33,6 +34,7 @@ const log = createSubsystemLogger("tasks/registry");
const DEFAULT_TASK_RETENTION_MS = 7 * 24 * 60 * 60_000;
const tasks = new Map<string, TaskRecord>();
const taskDeliveryStates = new Map<string, TaskDeliveryState>();
const taskIdsByRunId = new Map<string, Set<string>>();
const tasksWithPendingDelivery = new Set<string>();
let listenerStarted = false;
@ -42,12 +44,13 @@ let deliveryRuntimePromise: Promise<typeof import("./task-registry-delivery-runt
null;
function cloneTaskRecord(record: TaskRecord): TaskRecord {
return { ...record };
}
function cloneTaskDeliveryState(state: TaskDeliveryState): TaskDeliveryState {
return {
...record,
...(record.requesterOrigin ? { requesterOrigin: { ...record.requesterOrigin } } : {}),
...(record.recentEvents
? { recentEvents: record.recentEvents.map((event) => ({ ...event })) }
: {}),
...state,
...(state.requesterOrigin ? { requesterOrigin: { ...state.requesterOrigin } } : {}),
};
}
@ -71,7 +74,10 @@ function emitTaskRegistryHookEvent(createEvent: () => TaskRegistryHookEvent): vo
}
function persistTaskRegistry() {
getTaskRegistryStore().saveSnapshot(tasks);
getTaskRegistryStore().saveSnapshot({
tasks,
deliveryStates: taskDeliveryStates,
});
}
function persistTaskUpsert(task: TaskRecord) {
@ -80,7 +86,10 @@ function persistTaskUpsert(task: TaskRecord) {
store.upsertTask(task);
return;
}
store.saveSnapshot(tasks);
store.saveSnapshot({
tasks,
deliveryStates: taskDeliveryStates,
});
}
function persistTaskDelete(taskId: string) {
@ -89,7 +98,34 @@ function persistTaskDelete(taskId: string) {
store.deleteTask(taskId);
return;
}
store.saveSnapshot(tasks);
store.saveSnapshot({
tasks,
deliveryStates: taskDeliveryStates,
});
}
function persistTaskDeliveryStateUpsert(state: TaskDeliveryState) {
const store = getTaskRegistryStore();
if (store.upsertDeliveryState) {
store.upsertDeliveryState(state);
return;
}
store.saveSnapshot({
tasks,
deliveryStates: taskDeliveryStates,
});
}
function persistTaskDeliveryStateDelete(taskId: string) {
const store = getTaskRegistryStore();
if (store.deleteDeliveryState) {
store.deleteDeliveryState(taskId);
return;
}
store.saveSnapshot({
tasks,
deliveryStates: taskDeliveryStates,
});
}
function ensureDeliveryStatus(requesterSessionKey: string): TaskDeliveryStatus {
@ -142,25 +178,17 @@ function resolveTaskTerminalOutcome(params: {
return params.status === "succeeded" ? "succeeded" : undefined;
}
const TASK_RECENT_EVENT_LIMIT = 12;
function appendTaskEvent(
current: TaskRecord,
event: {
at: number;
kind: TaskEventKind;
summary?: string | null;
},
): TaskEventRecord[] {
function appendTaskEvent(event: {
at: number;
kind: TaskEventKind;
summary?: string | null;
}): TaskEventRecord {
const summary = normalizeTaskSummary(event.summary);
const nextEvent: TaskEventRecord = {
return {
at: event.at,
kind: event.kind,
...(summary ? { summary } : {}),
};
const previous = current.recentEvents ?? [];
const merged = [...previous, nextEvent];
return merged.slice(-TASK_RECENT_EVENT_LIMIT);
}
function loadTaskRegistryDeliveryRuntime() {
@ -261,7 +289,7 @@ function findExistingTaskForCreate(params: {
function mergeExistingTaskForCreate(
existing: TaskRecord,
params: {
requesterOrigin?: TaskRecord["requesterOrigin"];
requesterOrigin?: TaskDeliveryState["requesterOrigin"];
sourceId?: string;
parentTaskId?: string;
agentId?: string;
@ -274,8 +302,13 @@ function mergeExistingTaskForCreate(
): TaskRecord {
const patch: Partial<TaskRecord> = {};
const requesterOrigin = normalizeDeliveryContext(params.requesterOrigin);
if (requesterOrigin && !existing.requesterOrigin) {
patch.requesterOrigin = requesterOrigin;
const currentDeliveryState = taskDeliveryStates.get(existing.taskId);
if (requesterOrigin && !currentDeliveryState?.requesterOrigin) {
upsertTaskDeliveryState({
taskId: existing.taskId,
requesterOrigin,
lastNotifiedEventAt: currentDeliveryState?.lastNotifiedEventAt,
});
}
if (params.sourceId?.trim() && !existing.sourceId?.trim()) {
patch.sourceId = params.sourceId.trim();
@ -327,12 +360,15 @@ function restoreTaskRegistryOnce() {
restoreAttempted = true;
try {
const restored = getTaskRegistryStore().loadSnapshot();
if (restored.size === 0) {
if (restored.tasks.size === 0 && restored.deliveryStates.size === 0) {
return;
}
for (const [taskId, task] of restored.entries()) {
for (const [taskId, task] of restored.tasks.entries()) {
tasks.set(taskId, task);
}
for (const [taskId, state] of restored.deliveryStates.entries()) {
taskDeliveryStates.set(taskId, state);
}
rebuildRunIdIndex();
emitTaskRegistryHookEvent(() => ({
kind: "restored",
@ -381,6 +417,30 @@ function updateTask(taskId: string, patch: Partial<TaskRecord>): TaskRecord | nu
return cloneTaskRecord(next);
}
function upsertTaskDeliveryState(state: TaskDeliveryState): TaskDeliveryState {
const current = taskDeliveryStates.get(state.taskId);
const next: TaskDeliveryState = {
taskId: state.taskId,
...(state.requesterOrigin
? { requesterOrigin: normalizeDeliveryContext(state.requesterOrigin) }
: {}),
...(state.lastNotifiedEventAt != null
? { lastNotifiedEventAt: state.lastNotifiedEventAt }
: {}),
};
if (!next.requesterOrigin && typeof next.lastNotifiedEventAt !== "number" && !current) {
return cloneTaskDeliveryState({ taskId: state.taskId });
}
taskDeliveryStates.set(state.taskId, next);
persistTaskDeliveryStateUpsert(next);
return cloneTaskDeliveryState(next);
}
function getTaskDeliveryState(taskId: string): TaskDeliveryState | undefined {
const state = taskDeliveryStates.get(taskId);
return state ? cloneTaskDeliveryState(state) : undefined;
}
function formatTaskTerminalEvent(task: TaskRecord): string {
// User-facing task notifications stay intentionally terse. Detailed runtime chatter lives
// in task metadata for inspection, not in the default channel ping.
@ -419,7 +479,7 @@ function formatTaskTerminalEvent(task: TaskRecord): string {
}
function canDeliverTaskToRequesterOrigin(task: TaskRecord): boolean {
const origin = normalizeDeliveryContext(task.requesterOrigin);
const origin = normalizeDeliveryContext(taskDeliveryStates.get(task.taskId)?.requesterOrigin);
const channel = origin?.channel?.trim();
const to = origin?.to?.trim();
return Boolean(channel && to && isDeliverableMessageChannel(channel));
@ -433,7 +493,7 @@ function queueTaskSystemEvent(task: TaskRecord, text: string) {
enqueueSystemEvent(text, {
sessionKey: requesterSessionKey,
contextKey: `task:${task.taskId}`,
deliveryContext: task.requesterOrigin,
deliveryContext: taskDeliveryStates.get(task.taskId)?.requesterOrigin,
});
requestHeartbeatNow({
reason: "background-task",
@ -462,7 +522,7 @@ function queueBlockedTaskFollowup(task: TaskRecord) {
enqueueSystemEvent(`Task needs follow-up: ${title}${runLabel}. ${summary}`, {
sessionKey: requesterSessionKey,
contextKey: `task:${task.taskId}:blocked-followup`,
deliveryContext: task.requesterOrigin,
deliveryContext: taskDeliveryStates.get(task.taskId)?.requesterOrigin,
});
requestHeartbeatNow({
reason: "background-task-blocked",
@ -579,7 +639,7 @@ export async function maybeDeliverTaskTerminalUpdate(taskId: string): Promise<Ta
}
try {
const { sendMessage } = await loadTaskRegistryDeliveryRuntime();
const origin = normalizeDeliveryContext(latest.requesterOrigin);
const origin = normalizeDeliveryContext(taskDeliveryStates.get(taskId)?.requesterOrigin);
const requesterAgentId = parseAgentSessionKey(latest.requesterSessionKey)?.agentId;
await sendMessage({
channel: origin?.channel,
@ -606,7 +666,7 @@ export async function maybeDeliverTaskTerminalUpdate(taskId: string): Promise<Ta
log.warn("Failed to deliver background task update", {
taskId,
requesterSessionKey: latest.requesterSessionKey,
requesterOrigin: latest.requesterOrigin,
requesterOrigin: taskDeliveryStates.get(taskId)?.requesterOrigin,
error,
});
try {
@ -633,14 +693,15 @@ export async function maybeDeliverTaskTerminalUpdate(taskId: string): Promise<Ta
export async function maybeDeliverTaskStateChangeUpdate(
taskId: string,
latestEvent?: TaskEventRecord,
): Promise<TaskRecord | null> {
ensureTaskRegistryReady();
const current = tasks.get(taskId);
if (!current || !shouldAutoDeliverTaskStateChange(current)) {
return current ? cloneTaskRecord(current) : null;
}
const latestEvent = current.recentEvents?.at(-1);
if (!latestEvent || (current.lastNotifiedEventAt ?? 0) >= latestEvent.at) {
const deliveryState = getTaskDeliveryState(taskId);
if (!latestEvent || (deliveryState?.lastNotifiedEventAt ?? 0) >= latestEvent.at) {
return cloneTaskRecord(current);
}
const eventText = formatTaskStateChangeEvent(current, latestEvent);
@ -650,13 +711,17 @@ export async function maybeDeliverTaskStateChangeUpdate(
try {
if (!canDeliverTaskToRequesterOrigin(current)) {
queueTaskSystemEvent(current, eventText);
return updateTask(taskId, {
upsertTaskDeliveryState({
taskId,
requesterOrigin: deliveryState?.requesterOrigin,
lastNotifiedEventAt: latestEvent.at,
});
return updateTask(taskId, {
lastEventAt: Date.now(),
});
}
const { sendMessage } = await loadTaskRegistryDeliveryRuntime();
const origin = normalizeDeliveryContext(current.requesterOrigin);
const origin = normalizeDeliveryContext(deliveryState?.requesterOrigin);
const requesterAgentId = parseAgentSessionKey(current.requesterSessionKey)?.agentId;
await sendMessage({
channel: origin?.channel,
@ -672,8 +737,12 @@ export async function maybeDeliverTaskStateChangeUpdate(
idempotencyKey: `task-event:${current.taskId}:${latestEvent.at}:${latestEvent.kind}`,
},
});
return updateTask(taskId, {
upsertTaskDeliveryState({
taskId,
requesterOrigin: deliveryState?.requesterOrigin,
lastNotifiedEventAt: latestEvent.at,
});
return updateTask(taskId, {
lastEventAt: Date.now(),
});
} catch (error) {
@ -834,21 +903,22 @@ function ensureListener() {
} else if (evt.stream === "error") {
patch.error = typeof evt.data?.error === "string" ? evt.data.error : current.error;
}
if (patch.status && patch.status !== current.status) {
patch.recentEvents = appendTaskEvent(current, {
at: now,
kind: patch.status,
summary:
patch.status === "failed"
? (patch.error ?? current.error)
: patch.status === "succeeded"
? current.terminalSummary
: undefined,
});
}
const stateChangeEvent =
patch.status && patch.status !== current.status
? appendTaskEvent({
at: now,
kind: patch.status,
summary:
patch.status === "failed"
? (patch.error ?? current.error)
: patch.status === "succeeded"
? current.terminalSummary
: undefined,
})
: undefined;
const updated = updateTask(taskId, patch);
if (updated) {
void maybeDeliverTaskStateChangeUpdate(taskId);
void maybeDeliverTaskStateChangeUpdate(taskId, stateChangeEvent);
void maybeDeliverTaskTerminalUpdate(taskId);
}
}
@ -859,7 +929,7 @@ export function createTaskRecord(params: {
runtime: TaskRuntime;
sourceId?: string;
requesterSessionKey: string;
requesterOrigin?: TaskRecord["requesterOrigin"];
requesterOrigin?: TaskDeliveryState["requesterOrigin"];
childSessionKey?: string;
parentTaskId?: string;
agentId?: string;
@ -897,7 +967,6 @@ export function createTaskRecord(params: {
runtime: params.runtime,
sourceId: params.sourceId?.trim() || undefined,
requesterSessionKey: params.requesterSessionKey,
requesterOrigin: normalizeDeliveryContext(params.requesterOrigin),
childSessionKey: params.childSessionKey,
parentTaskId: params.parentTaskId?.trim() || undefined,
agentId: params.agentId?.trim() || undefined,
@ -917,28 +986,16 @@ export function createTaskRecord(params: {
status,
terminalOutcome: params.terminalOutcome,
}),
recentEvents: appendTaskEvent(
{
taskId,
runtime: params.runtime,
requesterSessionKey: params.requesterSessionKey,
task: params.task,
status,
deliveryStatus,
notifyPolicy,
createdAt: now,
} as TaskRecord,
{
at: lastEventAt,
kind: status,
},
),
};
if (isTerminalTaskStatus(record.status) && typeof record.cleanupAfter !== "number") {
record.cleanupAfter =
(record.endedAt ?? record.lastEventAt ?? record.createdAt) + DEFAULT_TASK_RETENTION_MS;
}
tasks.set(taskId, record);
upsertTaskDeliveryState({
taskId,
requesterOrigin: normalizeDeliveryContext(params.requesterOrigin),
});
addRunIdIndex(taskId, record.runId);
persistTaskUpsert(record);
emitTaskRegistryHookEvent(() => ({
@ -951,7 +1008,7 @@ export function createTaskRecord(params: {
return cloneTaskRecord(record);
}
export function updateTaskStateByRunId(params: {
function updateTaskStateByRunId(params: {
runId: string;
status?: TaskStatus;
startedAt?: number;
@ -1014,38 +1071,95 @@ export function updateTaskStateByRunId(params: {
const shouldAppendEvent =
(params.status && params.status !== current.status) ||
Boolean(normalizeTaskSummary(params.eventSummary));
if (shouldAppendEvent) {
patch.recentEvents = appendTaskEvent(current, {
at: eventAt,
kind:
params.status && normalizeTaskStatus(params.status) !== current.status
? normalizeTaskStatus(params.status)
: "progress",
summary: eventSummary,
});
}
const nextEvent = shouldAppendEvent
? appendTaskEvent({
at: eventAt,
kind:
params.status && normalizeTaskStatus(params.status) !== current.status
? normalizeTaskStatus(params.status)
: "progress",
summary: eventSummary,
})
: undefined;
const task = updateTask(taskId, patch);
if (task) {
updated.push(task);
void maybeDeliverTaskStateChangeUpdate(task.taskId, nextEvent);
void maybeDeliverTaskTerminalUpdate(task.taskId);
}
}
for (const task of updated) {
void maybeDeliverTaskStateChangeUpdate(task.taskId);
void maybeDeliverTaskTerminalUpdate(task.taskId);
}
return updated;
}
export function updateTaskDeliveryByRunId(params: {
runId: string;
deliveryStatus: TaskDeliveryStatus;
}) {
function updateTaskDeliveryByRunId(params: { runId: string; deliveryStatus: TaskDeliveryStatus }) {
ensureTaskRegistryReady();
return updateTasksByRunId(params.runId, {
deliveryStatus: params.deliveryStatus,
});
}
export function markTaskRunningByRunId(params: {
runId: string;
startedAt?: number;
lastEventAt?: number;
progressSummary?: string | null;
eventSummary?: string | null;
}) {
return updateTaskStateByRunId({
runId: params.runId,
status: "running",
startedAt: params.startedAt,
lastEventAt: params.lastEventAt,
progressSummary: params.progressSummary,
eventSummary: params.eventSummary,
});
}
export function recordTaskProgressByRunId(params: {
runId: string;
lastEventAt?: number;
progressSummary?: string | null;
eventSummary?: string | null;
}) {
return updateTaskStateByRunId({
runId: params.runId,
lastEventAt: params.lastEventAt,
progressSummary: params.progressSummary,
eventSummary: params.eventSummary,
});
}
export function markTaskTerminalByRunId(params: {
runId: string;
status: Extract<TaskStatus, "succeeded" | "failed" | "timed_out" | "cancelled">;
startedAt?: number;
endedAt: number;
lastEventAt?: number;
error?: string;
progressSummary?: string | null;
terminalSummary?: string | null;
terminalOutcome?: TaskTerminalOutcome | null;
}) {
return updateTaskStateByRunId({
runId: params.runId,
status: params.status,
startedAt: params.startedAt,
endedAt: params.endedAt,
lastEventAt: params.lastEventAt,
error: params.error,
progressSummary: params.progressSummary,
terminalSummary: params.terminalSummary,
terminalOutcome: params.terminalOutcome,
});
}
export function setTaskRunDeliveryStatusByRunId(params: {
runId: string;
deliveryStatus: TaskDeliveryStatus;
}) {
return updateTaskDeliveryByRunId(params);
}
export function updateTaskNotifyPolicyById(params: {
taskId: string;
notifyPolicy: TaskNotifyPolicy;
@ -1122,11 +1236,6 @@ export async function cancelTaskById(params: {
endedAt: Date.now(),
lastEventAt: Date.now(),
error: "Cancelled by operator.",
recentEvents: appendTaskEvent(task, {
at: Date.now(),
kind: "cancelled",
summary: "Cancelled by operator.",
}),
});
if (updated) {
void maybeDeliverTaskTerminalUpdate(updated.taskId);
@ -1161,6 +1270,7 @@ export function getTaskRegistrySummary(): TaskRegistrySummary {
export function getTaskRegistrySnapshot(): TaskRegistrySnapshot {
return {
tasks: listTaskRecords(),
deliveryStates: [...taskDeliveryStates.values()].map((state) => cloneTaskDeliveryState(state)),
};
}
@ -1201,8 +1311,10 @@ export function deleteTaskRecordById(taskId: string): boolean {
return false;
}
tasks.delete(taskId);
taskDeliveryStates.delete(taskId);
rebuildRunIdIndex();
persistTaskDelete(taskId);
persistTaskDeliveryStateDelete(taskId);
emitTaskRegistryHookEvent(() => ({
kind: "deleted",
taskId: current.taskId,
@ -1213,7 +1325,9 @@ export function deleteTaskRecordById(taskId: string): boolean {
export function resetTaskRegistryForTests(opts?: { persist?: boolean }) {
tasks.clear();
taskDeliveryStates.clear();
taskIdsByRunId.clear();
tasksWithPendingDelivery.clear();
restoreAttempted = false;
resetTaskRegistryRuntimeForTests();
if (listenerStop) {

View File

@ -43,12 +43,17 @@ export type TaskEventRecord = {
summary?: string;
};
export type TaskDeliveryState = {
taskId: string;
requesterOrigin?: DeliveryContext;
lastNotifiedEventAt?: number;
};
export type TaskRecord = {
taskId: string;
runtime: TaskRuntime;
sourceId?: string;
requesterSessionKey: string;
requesterOrigin?: DeliveryContext;
childSessionKey?: string;
parentTaskId?: string;
agentId?: string;
@ -67,10 +72,9 @@ export type TaskRecord = {
progressSummary?: string;
terminalSummary?: string;
terminalOutcome?: TaskTerminalOutcome;
recentEvents?: TaskEventRecord[];
lastNotifiedEventAt?: number;
};
export type TaskRegistrySnapshot = {
tasks: TaskRecord[];
deliveryStates: TaskDeliveryState[];
};