From 66413487c8de260d089cd63ea77386fbf8b13bf7 Mon Sep 17 00:00:00 2001 From: Vincent Koc Date: Wed, 1 Apr 2026 03:23:06 +0900 Subject: [PATCH] fix(tasks): make task-store writes atomic (#58521) --- src/tasks/task-registry.store.sqlite.ts | 23 ++++++- src/tasks/task-registry.store.test.ts | 50 ++++++++++++++ src/tasks/task-registry.store.ts | 9 +++ src/tasks/task-registry.ts | 87 +++++++++++-------------- 4 files changed, 120 insertions(+), 49 deletions(-) diff --git a/src/tasks/task-registry.store.sqlite.ts b/src/tasks/task-registry.store.sqlite.ts index e6bdf12b0f0..e3c9a0f5fa5 100644 --- a/src/tasks/task-registry.store.sqlite.ts +++ b/src/tasks/task-registry.store.sqlite.ts @@ -403,7 +403,7 @@ function openTaskRegistryDatabase(): TaskRegistryDatabase { const { DatabaseSync } = requireNodeSqlite(); const db = new DatabaseSync(pathname); db.exec(`PRAGMA journal_mode = WAL;`); - db.exec(`PRAGMA synchronous = NORMAL;`); + db.exec(`PRAGMA synchronous = FULL;`); db.exec(`PRAGMA busy_timeout = 5000;`); ensureSchema(db); ensureTaskRegistryPermissions(pathname); @@ -457,6 +457,20 @@ export function upsertTaskRegistryRecordToSqlite(task: TaskRecord) { ensureTaskRegistryPermissions(store.path); } +export function upsertTaskWithDeliveryStateToSqlite(params: { + task: TaskRecord; + deliveryState?: TaskDeliveryState; +}) { + withWriteTransaction((statements) => { + statements.upsertRow.run(bindTaskRecord(params.task)); + if (params.deliveryState) { + statements.replaceDeliveryState.run(bindTaskDeliveryState(params.deliveryState)); + } else { + statements.deleteDeliveryState.run(params.task.taskId); + } + }); +} + export function deleteTaskRegistryRecordFromSqlite(taskId: string) { const store = openTaskRegistryDatabase(); store.statements.deleteRow.run(taskId); @@ -464,6 +478,13 @@ export function deleteTaskRegistryRecordFromSqlite(taskId: string) { ensureTaskRegistryPermissions(store.path); } +export function deleteTaskAndDeliveryStateFromSqlite(taskId: string) { + withWriteTransaction((statements) => { + statements.deleteRow.run(taskId); + statements.deleteDeliveryState.run(taskId); + }); +} + export function upsertTaskDeliveryStateToSqlite(state: TaskDeliveryState) { const store = openTaskRegistryDatabase(); store.statements.replaceDeliveryState.run(bindTaskDeliveryState(state)); diff --git a/src/tasks/task-registry.store.test.ts b/src/tasks/task-registry.store.test.ts index 09de084ddaa..3ef4025db69 100644 --- a/src/tasks/task-registry.store.test.ts +++ b/src/tasks/task-registry.store.test.ts @@ -7,6 +7,7 @@ import { createTaskRecord, deleteTaskRecordById, findTaskByRunId, + maybeDeliverTaskStateChangeUpdate, resetTaskRegistryForTests, } from "./task-registry.js"; import { resolveTaskRegistryDir, resolveTaskRegistrySqlitePath } from "./task-registry.paths.js"; @@ -121,6 +122,55 @@ describe("task-registry store runtime", () => { }); }); + it("uses atomic task-plus-delivery store methods when available", async () => { + const upsertTaskWithDeliveryState = vi.fn(); + const deleteTaskWithDeliveryState = vi.fn(); + configureTaskRegistryRuntime({ + store: { + loadSnapshot: () => ({ + tasks: new Map(), + deliveryStates: new Map(), + }), + saveSnapshot: vi.fn(), + upsertTaskWithDeliveryState, + deleteTaskWithDeliveryState, + }, + }); + + const created = createTaskRecord({ + runtime: "acp", + ownerKey: "agent:main:main", + scopeKind: "session", + childSessionKey: "agent:codex:acp:new", + runId: "run-atomic", + task: "Atomic task", + status: "running", + notifyPolicy: "state_changes", + deliveryStatus: "pending", + }); + + await maybeDeliverTaskStateChangeUpdate(created.taskId, { + at: 200, + kind: "progress", + summary: "working", + }); + expect(deleteTaskRecordById(created.taskId)).toBe(true); + + expect(upsertTaskWithDeliveryState).toHaveBeenCalled(); + expect(upsertTaskWithDeliveryState.mock.calls[0]?.[0]).toMatchObject({ + task: expect.objectContaining({ + taskId: created.taskId, + }), + }); + expect( + upsertTaskWithDeliveryState.mock.calls.some((call) => { + const params = call[0] as { deliveryState?: { lastNotifiedEventAt?: number } }; + return params.deliveryState?.lastNotifiedEventAt === 200; + }), + ).toBe(true); + expect(deleteTaskWithDeliveryState).toHaveBeenCalledWith(created.taskId); + }); + it("restores persisted tasks from the default sqlite store", () => { const created = createTaskRecord({ runtime: "cron", diff --git a/src/tasks/task-registry.store.ts b/src/tasks/task-registry.store.ts index a075272a1b0..ba23bd91b2f 100644 --- a/src/tasks/task-registry.store.ts +++ b/src/tasks/task-registry.store.ts @@ -1,9 +1,11 @@ import { closeTaskRegistrySqliteStore, + deleteTaskAndDeliveryStateFromSqlite, deleteTaskDeliveryStateFromSqlite, deleteTaskRegistryRecordFromSqlite, loadTaskRegistryStateFromSqlite, saveTaskRegistryStateToSqlite, + upsertTaskWithDeliveryStateToSqlite, upsertTaskDeliveryStateToSqlite, upsertTaskRegistryRecordToSqlite, } from "./task-registry.store.sqlite.js"; @@ -17,7 +19,12 @@ export type TaskRegistryStoreSnapshot = { export type TaskRegistryStore = { loadSnapshot: () => TaskRegistryStoreSnapshot; saveSnapshot: (snapshot: TaskRegistryStoreSnapshot) => void; + upsertTaskWithDeliveryState?: (params: { + task: TaskRecord; + deliveryState?: TaskDeliveryState; + }) => void; upsertTask?: (task: TaskRecord) => void; + deleteTaskWithDeliveryState?: (taskId: string) => void; deleteTask?: (taskId: string) => void; upsertDeliveryState?: (state: TaskDeliveryState) => void; deleteDeliveryState?: (taskId: string) => void; @@ -48,7 +55,9 @@ export type TaskRegistryHooks = { const defaultTaskRegistryStore: TaskRegistryStore = { loadSnapshot: loadTaskRegistryStateFromSqlite, saveSnapshot: saveTaskRegistryStateToSqlite, + upsertTaskWithDeliveryState: upsertTaskWithDeliveryStateToSqlite, upsertTask: upsertTaskRegistryRecordToSqlite, + deleteTaskWithDeliveryState: deleteTaskAndDeliveryStateFromSqlite, deleteTask: deleteTaskRegistryRecordFromSqlite, upsertDeliveryState: upsertTaskDeliveryStateToSqlite, deleteDeliveryState: deleteTaskDeliveryStateFromSqlite, diff --git a/src/tasks/task-registry.ts b/src/tasks/task-registry.ts index d97a69e6962..eb6d6775766 100644 --- a/src/tasks/task-registry.ts +++ b/src/tasks/task-registry.ts @@ -78,6 +78,24 @@ function cloneTaskDeliveryState(state: TaskDeliveryState): TaskDeliveryState { }; } +function setTaskDeliveryStateInMemory(state: TaskDeliveryState): TaskDeliveryState { + const current = taskDeliveryStates.get(state.taskId); + const next: TaskDeliveryState = { + taskId: state.taskId, + ...(state.requesterOrigin + ? { requesterOrigin: normalizeDeliveryContext(state.requesterOrigin) } + : {}), + ...(state.lastNotifiedEventAt != null + ? { lastNotifiedEventAt: state.lastNotifiedEventAt } + : {}), + }; + if (!next.requesterOrigin && typeof next.lastNotifiedEventAt !== "number" && !current) { + return cloneTaskDeliveryState({ taskId: state.taskId }); + } + taskDeliveryStates.set(state.taskId, next); + return cloneTaskDeliveryState(next); +} + function snapshotTaskRecords(source: ReadonlyMap): TaskRecord[] { return [...source.values()].map((record) => cloneTaskRecord(record)); } @@ -106,6 +124,14 @@ function persistTaskRegistry() { function persistTaskUpsert(task: TaskRecord) { const store = getTaskRegistryStore(); + const deliveryState = taskDeliveryStates.get(task.taskId); + if (store.upsertTaskWithDeliveryState) { + store.upsertTaskWithDeliveryState({ + task, + ...(deliveryState ? { deliveryState } : {}), + }); + return; + } if (store.upsertTask) { store.upsertTask(task); return; @@ -118,6 +144,10 @@ function persistTaskUpsert(task: TaskRecord) { function persistTaskDelete(taskId: string) { const store = getTaskRegistryStore(); + if (store.deleteTaskWithDeliveryState) { + store.deleteTaskWithDeliveryState(taskId); + return; + } if (store.deleteTask) { store.deleteTask(taskId); return; @@ -128,30 +158,6 @@ function persistTaskDelete(taskId: string) { }); } -function persistTaskDeliveryStateUpsert(state: TaskDeliveryState) { - const store = getTaskRegistryStore(); - if (store.upsertDeliveryState) { - store.upsertDeliveryState(state); - return; - } - store.saveSnapshot({ - tasks, - deliveryStates: taskDeliveryStates, - }); -} - -function persistTaskDeliveryStateDelete(taskId: string) { - const store = getTaskRegistryStore(); - if (store.deleteDeliveryState) { - store.deleteDeliveryState(taskId); - return; - } - store.saveSnapshot({ - tasks, - deliveryStates: taskDeliveryStates, - }); -} - function ensureDeliveryStatus(params: { ownerKey: string; scopeKind: TaskScopeKind; @@ -435,12 +441,14 @@ function mergeExistingTaskForCreate( const patch: Partial = {}; const requesterOrigin = normalizeDeliveryContext(params.requesterOrigin); const currentDeliveryState = taskDeliveryStates.get(existing.taskId); + let deliveryStateChanged = false; if (requesterOrigin && !currentDeliveryState?.requesterOrigin) { - upsertTaskDeliveryState({ + setTaskDeliveryStateInMemory({ taskId: existing.taskId, requesterOrigin, lastNotifiedEventAt: currentDeliveryState?.lastNotifiedEventAt, }); + deliveryStateChanged = true; } if (params.sourceId?.trim() && !existing.sourceId?.trim()) { patch.sourceId = params.sourceId.trim(); @@ -476,6 +484,9 @@ function mergeExistingTaskForCreate( patch.notifyPolicy = notifyPolicy; } if (Object.keys(patch).length === 0) { + if (deliveryStateChanged) { + persistTaskUpsert(existing); + } return cloneTaskRecord(existing); } return updateTask(existing.taskId, patch) ?? cloneTaskRecord(existing); @@ -574,25 +585,6 @@ function updateTask(taskId: string, patch: Partial): TaskRecord | nu return cloneTaskRecord(next); } -function upsertTaskDeliveryState(state: TaskDeliveryState): TaskDeliveryState { - const current = taskDeliveryStates.get(state.taskId); - const next: TaskDeliveryState = { - taskId: state.taskId, - ...(state.requesterOrigin - ? { requesterOrigin: normalizeDeliveryContext(state.requesterOrigin) } - : {}), - ...(state.lastNotifiedEventAt != null - ? { lastNotifiedEventAt: state.lastNotifiedEventAt } - : {}), - }; - if (!next.requesterOrigin && typeof next.lastNotifiedEventAt !== "number" && !current) { - return cloneTaskDeliveryState({ taskId: state.taskId }); - } - taskDeliveryStates.set(state.taskId, next); - persistTaskDeliveryStateUpsert(next); - return cloneTaskDeliveryState(next); -} - function getTaskDeliveryState(taskId: string): TaskDeliveryState | undefined { const state = taskDeliveryStates.get(taskId); return state ? cloneTaskDeliveryState(state) : undefined; @@ -788,7 +780,7 @@ export async function maybeDeliverTaskStateChangeUpdate( } if (!canDeliverTaskToRequesterOrigin(current)) { queueTaskSystemEvent(current, eventText); - upsertTaskDeliveryState({ + setTaskDeliveryStateInMemory({ taskId, requesterOrigin: deliveryState?.requesterOrigin, lastNotifiedEventAt: latestEvent.at, @@ -818,7 +810,7 @@ export async function maybeDeliverTaskStateChangeUpdate( idempotencyKey, }, }); - upsertTaskDeliveryState({ + setTaskDeliveryStateInMemory({ taskId, requesterOrigin: deliveryState?.requesterOrigin, lastNotifiedEventAt: latestEvent.at, @@ -1086,7 +1078,7 @@ export function createTaskRecord(params: { (record.endedAt ?? record.lastEventAt ?? record.createdAt) + DEFAULT_TASK_RETENTION_MS; } tasks.set(taskId, record); - upsertTaskDeliveryState({ + setTaskDeliveryStateInMemory({ taskId, requesterOrigin: normalizeDeliveryContext(params.requesterOrigin), }); @@ -1454,7 +1446,6 @@ export function deleteTaskRecordById(taskId: string): boolean { taskDeliveryStates.delete(taskId); rebuildRunIdIndex(); persistTaskDelete(taskId); - persistTaskDeliveryStateDelete(taskId); emitTaskRegistryHookEvent(() => ({ kind: "deleted", taskId: current.taskId,