refactor(tasks): distill task registry sweep scheduling

This commit is contained in:
Ayaan Zaidi 2026-04-01 09:04:25 +05:30
parent 97fd6c27a1
commit 2dbfd4ebe2
2 changed files with 49 additions and 20 deletions

View File

@ -205,6 +205,16 @@ function yieldToEventLoop(): Promise<void> {
return new Promise((resolve) => setImmediate(resolve));
}
function startScheduledSweep() {
if (sweepInProgress) {
return;
}
sweepInProgress = true;
void sweepTaskRegistry().finally(() => {
sweepInProgress = false;
});
}
export async function runTaskRegistryMaintenance(): Promise<TaskRegistryMaintenanceSummary> {
ensureTaskRegistryReady();
const now = Date.now();
@ -256,32 +266,15 @@ export async function sweepTaskRegistry(): Promise<TaskRegistryMaintenanceSummar
export function startTaskRegistryMaintenance() {
ensureTaskRegistryReady();
// Defer the first sweep to avoid blocking the critical startup window.
// Use setTimeout instead of running synchronously at startup.
deferredSweep = setTimeout(() => {
deferredSweep = null;
if (sweepInProgress) {
return;
}
sweepInProgress = true;
void sweepTaskRegistry().finally(() => {
sweepInProgress = false;
});
startScheduledSweep();
}, 5_000);
deferredSweep.unref?.();
if (sweeper) {
return;
}
sweeper = setInterval(() => {
// Prevent overlapping sweeps — if a previous async sweep is still
// running, skip this tick entirely.
if (sweepInProgress) {
return;
}
sweepInProgress = true;
void sweepTaskRegistry().finally(() => {
sweepInProgress = false;
});
}, TASK_SWEEP_INTERVAL_MS);
sweeper = setInterval(startScheduledSweep, TASK_SWEEP_INTERVAL_MS);
sweeper.unref?.();
}

View File

@ -36,6 +36,8 @@ import {
previewTaskRegistryMaintenance,
reconcileInspectableTasks,
runTaskRegistryMaintenance,
startTaskRegistryMaintenance,
stopTaskRegistryMaintenanceForTests,
sweepTaskRegistry,
} from "./task-registry.maintenance.js";
import { configureTaskRegistryRuntime } from "./task-registry.store.js";
@ -1119,6 +1121,40 @@ describe("task-registry", () => {
});
});
it("cancels the deferred maintenance sweep during test teardown", async () => {
await withTaskRegistryTempDir(async (root) => {
vi.useFakeTimers();
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests();
const now = Date.now();
const task = createTaskRecord({
runtime: "acp",
ownerKey: "agent:main:main",
scopeKind: "session",
childSessionKey: "agent:main:acp:missing",
runId: "run-deferred-maintenance-stop",
task: "Missing child",
status: "running",
deliveryStatus: "pending",
});
setTaskTimingById({
taskId: task.taskId,
lastEventAt: now - 10 * 60_000,
});
startTaskRegistryMaintenance();
stopTaskRegistryMaintenanceForTests();
await vi.advanceTimersByTimeAsync(5_000);
await flushAsyncWork();
expect(getTaskById(task.taskId)).toMatchObject({
status: "running",
});
});
});
it("summarizes inspectable task audit findings", async () => {
await withTaskRegistryTempDir(async (root) => {
process.env.OPENCLAW_STATE_DIR = root;