diff --git a/CHANGELOG.md b/CHANGELOG.md index 3dab830ff9f..c10f91d7430 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ Docs: https://docs.openclaw.ai ### Fixes - Telegram/local Bot API: preserve media MIME types for absolute-path downloads so local audio files still trigger transcription and other MIME-based handling. (#54603) Thanks @jzakirov +- Tasks/gateway: re-check the current task record before maintenance marks runs lost or prunes them, so a task heartbeat or cleanup update that lands during a sweep no longer gets overwritten by stale snapshot state. - Tasks/gateway: keep the task registry maintenance sweep from stalling the gateway event loop under synchronous SQLite pressure, so upgraded gateways stop hanging about a minute after startup. (#58670) Thanks @openperf ## 2026.3.31 diff --git a/src/tasks/task-registry.maintenance.ts b/src/tasks/task-registry.maintenance.ts index 55f80ab321d..c7d7bec3114 100644 --- a/src/tasks/task-registry.maintenance.ts +++ b/src/tasks/task-registry.maintenance.ts @@ -224,8 +224,12 @@ export async function runTaskRegistryMaintenance(): Promise>; + snapshotTasks: ReturnType[]; +}) { + vi.resetModules(); + vi.doMock("../acp/runtime/session-meta.js", () => ({ + readAcpSessionEntry: () => ({ entry: undefined, storeReadFailed: false }), + })); + vi.doMock("../config/sessions.js", () => ({ + loadSessionStore: () => ({}), + resolveStorePath: () => "", + })); + vi.doMock("../routing/session-key.js", () => ({ + parseAgentSessionKey: () => undefined, + })); + vi.doMock("./runtime-internal.js", () => ({ + deleteTaskRecordById: (taskId: string) => params.currentTasks.delete(taskId), + ensureTaskRegistryReady: () => {}, + getTaskById: (taskId: string) => params.currentTasks.get(taskId), + listTaskRecords: () => params.snapshotTasks, + markTaskLostById: (patch: { + taskId: string; + endedAt: number; + lastEventAt?: number; + error?: string; + cleanupAfter?: number; + }) => { + const current = params.currentTasks.get(patch.taskId); + if (!current) { + return null; + } + const next = { + ...current, + status: "lost" as const, + endedAt: patch.endedAt, + lastEventAt: patch.lastEventAt ?? patch.endedAt, + ...(patch.error !== undefined ? { error: patch.error } : {}), + ...(patch.cleanupAfter !== undefined ? { cleanupAfter: patch.cleanupAfter } : {}), + }; + params.currentTasks.set(patch.taskId, next); + return next; + }, + maybeDeliverTaskTerminalUpdate: () => false, + resolveTaskForLookupToken: () => undefined, + setTaskCleanupAfterById: (patch: { taskId: string; cleanupAfter: number }) => { + const current = params.currentTasks.get(patch.taskId); + if (!current) { + return null; + } + const next = { + ...current, + cleanupAfter: patch.cleanupAfter, + }; + params.currentTasks.set(patch.taskId, next); + return next; + }, + })); + return await import("./task-registry.maintenance.js"); +} + async function waitForAssertion(assertion: () => void, timeoutMs = 2_000, stepMs = 5) { const startedAt = Date.now(); for (;;) { @@ -1155,6 +1215,83 @@ describe("task-registry", () => { }); }); + it("rechecks current task state before marking a task lost", async () => { + const now = Date.now(); + const snapshotTask = createTaskRecord({ + runtime: "acp", + ownerKey: "agent:main:main", + scopeKind: "session", + childSessionKey: "agent:main:acp:missing-stale", + runId: "run-lost-stale", + task: "Missing child", + status: "running", + deliveryStatus: "pending", + }); + const staleTask = { + ...snapshotTask, + lastEventAt: now - 10 * 60_000, + }; + const currentTask = { + ...snapshotTask, + lastEventAt: now, + }; + const currentTasks = new Map([[snapshotTask.taskId, currentTask]]); + const { runTaskRegistryMaintenance } = await loadFreshTaskRegistryMaintenanceModuleForTest({ + currentTasks, + snapshotTasks: [staleTask], + }); + + expect(await runTaskRegistryMaintenance()).toEqual({ + reconciled: 0, + cleanupStamped: 0, + pruned: 0, + }); + expect(currentTasks.get(snapshotTask.taskId)).toMatchObject({ + status: "running", + lastEventAt: now, + }); + }); + + it("rechecks current task state before pruning a task", async () => { + const now = Date.now(); + const snapshotTask = createTaskRecord({ + runtime: "cli", + ownerKey: "agent:main:main", + scopeKind: "session", + childSessionKey: "agent:main:main", + runId: "run-prune-stale", + task: "Old completed task", + status: "succeeded", + deliveryStatus: "not_applicable", + startedAt: now - 9 * 24 * 60 * 60_000, + }); + const staleTask = { + ...snapshotTask, + endedAt: now - 8 * 24 * 60 * 60_000, + lastEventAt: now - 8 * 24 * 60 * 60_000, + cleanupAfter: now - 1, + }; + const currentTask = { + ...staleTask, + cleanupAfter: now + 60_000, + }; + const currentTasks = new Map([[snapshotTask.taskId, currentTask]]); + const { sweepTaskRegistry } = await loadFreshTaskRegistryMaintenanceModuleForTest({ + currentTasks, + snapshotTasks: [staleTask], + }); + + expect(await sweepTaskRegistry()).toEqual({ + reconciled: 0, + cleanupStamped: 0, + pruned: 0, + }); + expect(currentTasks.get(snapshotTask.taskId)).toMatchObject({ + status: "succeeded", + cleanupAfter: now + 60_000, + }); + }); + it("summarizes inspectable task audit findings", async () => { await withTaskRegistryTempDir(async (root) => { process.env.OPENCLAW_STATE_DIR = root;