fix(tasks): recheck current state during maintenance sweep

This commit is contained in:
Ayaan Zaidi 2026-04-01 09:25:31 +05:30
parent ccb67bd4bf
commit 2c5796c924
No known key found for this signature in database
3 changed files with 148 additions and 6 deletions

View File

@ -12,6 +12,7 @@ Docs: https://docs.openclaw.ai
### Fixes
- Telegram/local Bot API: preserve media MIME types for absolute-path downloads so local audio files still trigger transcription and other MIME-based handling. (#54603) Thanks @jzakirov
- Tasks/gateway: re-check the current task record before maintenance marks runs lost or prunes them, so a task heartbeat or cleanup update that lands during a sweep no longer gets overwritten by stale snapshot state.
- Tasks/gateway: keep the task registry maintenance sweep from stalling the gateway event loop under synchronous SQLite pressure, so upgraded gateways stop hanging about a minute after startup. (#58670) Thanks @openperf
## 2026.3.31

View File

@ -224,8 +224,12 @@ export async function runTaskRegistryMaintenance(): Promise<TaskRegistryMaintena
const tasks = listTaskRecords();
let processed = 0;
for (const task of tasks) {
if (shouldMarkLost(task, now)) {
const next = markTaskLost(task, now);
const current = getTaskById(task.taskId);
if (!current) {
continue;
}
if (shouldMarkLost(current, now)) {
const next = markTaskLost(current, now);
if (next.status === "lost") {
reconciled += 1;
}
@ -235,7 +239,7 @@ export async function runTaskRegistryMaintenance(): Promise<TaskRegistryMaintena
}
continue;
}
if (shouldPruneTerminalTask(task, now) && deleteTaskRecordById(task.taskId)) {
if (shouldPruneTerminalTask(current, now) && deleteTaskRecordById(current.taskId)) {
pruned += 1;
processed += 1;
if (processed % SWEEP_YIELD_BATCH_SIZE === 0) {
@ -244,10 +248,10 @@ export async function runTaskRegistryMaintenance(): Promise<TaskRegistryMaintena
continue;
}
if (
shouldStampCleanupAfter(task) &&
shouldStampCleanupAfter(current) &&
setTaskCleanupAfterById({
taskId: task.taskId,
cleanupAfter: resolveCleanupAfter(task),
taskId: current.taskId,
cleanupAfter: resolveCleanupAfter(current),
})
) {
cleanupStamped += 1;

View File

@ -84,6 +84,66 @@ async function loadFreshTaskRegistryModulesForControlTest() {
return await import("./task-registry.js");
}
async function loadFreshTaskRegistryMaintenanceModuleForTest(params: {
currentTasks: Map<string, ReturnType<typeof createTaskRecord>>;
snapshotTasks: ReturnType<typeof createTaskRecord>[];
}) {
vi.resetModules();
vi.doMock("../acp/runtime/session-meta.js", () => ({
readAcpSessionEntry: () => ({ entry: undefined, storeReadFailed: false }),
}));
vi.doMock("../config/sessions.js", () => ({
loadSessionStore: () => ({}),
resolveStorePath: () => "",
}));
vi.doMock("../routing/session-key.js", () => ({
parseAgentSessionKey: () => undefined,
}));
vi.doMock("./runtime-internal.js", () => ({
deleteTaskRecordById: (taskId: string) => params.currentTasks.delete(taskId),
ensureTaskRegistryReady: () => {},
getTaskById: (taskId: string) => params.currentTasks.get(taskId),
listTaskRecords: () => params.snapshotTasks,
markTaskLostById: (patch: {
taskId: string;
endedAt: number;
lastEventAt?: number;
error?: string;
cleanupAfter?: number;
}) => {
const current = params.currentTasks.get(patch.taskId);
if (!current) {
return null;
}
const next = {
...current,
status: "lost" as const,
endedAt: patch.endedAt,
lastEventAt: patch.lastEventAt ?? patch.endedAt,
...(patch.error !== undefined ? { error: patch.error } : {}),
...(patch.cleanupAfter !== undefined ? { cleanupAfter: patch.cleanupAfter } : {}),
};
params.currentTasks.set(patch.taskId, next);
return next;
},
maybeDeliverTaskTerminalUpdate: () => false,
resolveTaskForLookupToken: () => undefined,
setTaskCleanupAfterById: (patch: { taskId: string; cleanupAfter: number }) => {
const current = params.currentTasks.get(patch.taskId);
if (!current) {
return null;
}
const next = {
...current,
cleanupAfter: patch.cleanupAfter,
};
params.currentTasks.set(patch.taskId, next);
return next;
},
}));
return await import("./task-registry.maintenance.js");
}
async function waitForAssertion(assertion: () => void, timeoutMs = 2_000, stepMs = 5) {
const startedAt = Date.now();
for (;;) {
@ -1155,6 +1215,83 @@ describe("task-registry", () => {
});
});
it("rechecks current task state before marking a task lost", async () => {
const now = Date.now();
const snapshotTask = createTaskRecord({
runtime: "acp",
ownerKey: "agent:main:main",
scopeKind: "session",
childSessionKey: "agent:main:acp:missing-stale",
runId: "run-lost-stale",
task: "Missing child",
status: "running",
deliveryStatus: "pending",
});
const staleTask = {
...snapshotTask,
lastEventAt: now - 10 * 60_000,
};
const currentTask = {
...snapshotTask,
lastEventAt: now,
};
const currentTasks = new Map([[snapshotTask.taskId, currentTask]]);
const { runTaskRegistryMaintenance } = await loadFreshTaskRegistryMaintenanceModuleForTest({
currentTasks,
snapshotTasks: [staleTask],
});
expect(await runTaskRegistryMaintenance()).toEqual({
reconciled: 0,
cleanupStamped: 0,
pruned: 0,
});
expect(currentTasks.get(snapshotTask.taskId)).toMatchObject({
status: "running",
lastEventAt: now,
});
});
it("rechecks current task state before pruning a task", async () => {
const now = Date.now();
const snapshotTask = createTaskRecord({
runtime: "cli",
ownerKey: "agent:main:main",
scopeKind: "session",
childSessionKey: "agent:main:main",
runId: "run-prune-stale",
task: "Old completed task",
status: "succeeded",
deliveryStatus: "not_applicable",
startedAt: now - 9 * 24 * 60 * 60_000,
});
const staleTask = {
...snapshotTask,
endedAt: now - 8 * 24 * 60 * 60_000,
lastEventAt: now - 8 * 24 * 60 * 60_000,
cleanupAfter: now - 1,
};
const currentTask = {
...staleTask,
cleanupAfter: now + 60_000,
};
const currentTasks = new Map([[snapshotTask.taskId, currentTask]]);
const { sweepTaskRegistry } = await loadFreshTaskRegistryMaintenanceModuleForTest({
currentTasks,
snapshotTasks: [staleTask],
});
expect(await sweepTaskRegistry()).toEqual({
reconciled: 0,
cleanupStamped: 0,
pruned: 0,
});
expect(currentTasks.get(snapshotTask.taskId)).toMatchObject({
status: "succeeded",
cleanupAfter: now + 60_000,
});
});
it("summarizes inspectable task audit findings", async () => {
await withTaskRegistryTempDir(async (root) => {
process.env.OPENCLAW_STATE_DIR = root;