refactor: share cron restart catchup harness

This commit is contained in:
Peter Steinberger 2026-03-13 20:31:03 +00:00
parent e762a57d62
commit 9dafcd417d
1 changed files with 216 additions and 268 deletions

View File

@ -47,326 +47,274 @@ describe("CronService restart catch-up", () => {
}; };
} }
it("executes an overdue recurring job immediately on start", async () => { async function withRestartedCron(
jobs: unknown[],
run: (params: {
cron: CronService;
enqueueSystemEvent: ReturnType<typeof vi.fn>;
requestHeartbeatNow: ReturnType<typeof vi.fn>;
}) => Promise<void>,
) {
const store = await makeStorePath(); const store = await makeStorePath();
const enqueueSystemEvent = vi.fn(); const enqueueSystemEvent = vi.fn();
const requestHeartbeatNow = vi.fn(); const requestHeartbeatNow = vi.fn();
await writeStoreJobs(store.storePath, jobs);
const cron = createRestartCronService({
storePath: store.storePath,
enqueueSystemEvent,
requestHeartbeatNow,
});
try {
await cron.start();
await run({ cron, enqueueSystemEvent, requestHeartbeatNow });
} finally {
cron.stop();
await store.cleanup();
}
}
it("executes an overdue recurring job immediately on start", async () => {
const dueAt = Date.parse("2025-12-13T15:00:00.000Z"); const dueAt = Date.parse("2025-12-13T15:00:00.000Z");
const lastRunAt = Date.parse("2025-12-12T15:00:00.000Z"); const lastRunAt = Date.parse("2025-12-12T15:00:00.000Z");
await writeStoreJobs(store.storePath, [ await withRestartedCron(
{ [
id: "restart-overdue-job", {
name: "daily digest", id: "restart-overdue-job",
enabled: true, name: "daily digest",
createdAtMs: Date.parse("2025-12-10T12:00:00.000Z"), enabled: true,
updatedAtMs: Date.parse("2025-12-12T15:00:00.000Z"), createdAtMs: Date.parse("2025-12-10T12:00:00.000Z"),
schedule: { kind: "cron", expr: "0 15 * * *", tz: "UTC" }, updatedAtMs: Date.parse("2025-12-12T15:00:00.000Z"),
sessionTarget: "main", schedule: { kind: "cron", expr: "0 15 * * *", tz: "UTC" },
wakeMode: "next-heartbeat", sessionTarget: "main",
payload: { kind: "systemEvent", text: "digest now" }, wakeMode: "next-heartbeat",
state: { payload: { kind: "systemEvent", text: "digest now" },
nextRunAtMs: dueAt, state: {
lastRunAtMs: lastRunAt, nextRunAtMs: dueAt,
lastStatus: "ok", lastRunAtMs: lastRunAt,
lastStatus: "ok",
},
}, },
],
async ({ cron, enqueueSystemEvent, requestHeartbeatNow }) => {
expect(enqueueSystemEvent).toHaveBeenCalledWith(
"digest now",
expect.objectContaining({ agentId: undefined }),
);
expect(requestHeartbeatNow).toHaveBeenCalled();
const listedJobs = await cron.list({ includeDisabled: true });
const updated = listedJobs.find((job) => job.id === "restart-overdue-job");
expect(updated?.state.lastStatus).toBe("ok");
expect(updated?.state.lastRunAtMs).toBe(Date.parse("2025-12-13T17:00:00.000Z"));
expect(updated?.state.nextRunAtMs).toBeGreaterThan(Date.parse("2025-12-13T17:00:00.000Z"));
}, },
]);
const cron = createRestartCronService({
storePath: store.storePath,
enqueueSystemEvent,
requestHeartbeatNow,
});
await cron.start();
expect(enqueueSystemEvent).toHaveBeenCalledWith(
"digest now",
expect.objectContaining({ agentId: undefined }),
); );
expect(requestHeartbeatNow).toHaveBeenCalled();
const jobs = await cron.list({ includeDisabled: true });
const updated = jobs.find((job) => job.id === "restart-overdue-job");
expect(updated?.state.lastStatus).toBe("ok");
expect(updated?.state.lastRunAtMs).toBe(Date.parse("2025-12-13T17:00:00.000Z"));
expect(updated?.state.nextRunAtMs).toBeGreaterThan(Date.parse("2025-12-13T17:00:00.000Z"));
cron.stop();
await store.cleanup();
}); });
it("clears stale running markers without replaying interrupted startup jobs", async () => { it("clears stale running markers without replaying interrupted startup jobs", async () => {
const store = await makeStorePath();
const enqueueSystemEvent = vi.fn();
const requestHeartbeatNow = vi.fn();
const dueAt = Date.parse("2025-12-13T16:00:00.000Z"); const dueAt = Date.parse("2025-12-13T16:00:00.000Z");
const staleRunningAt = Date.parse("2025-12-13T16:30:00.000Z"); const staleRunningAt = Date.parse("2025-12-13T16:30:00.000Z");
await writeStoreJobs(store.storePath, [ await withRestartedCron(
{ [
id: "restart-stale-running", {
name: "daily stale marker", id: "restart-stale-running",
enabled: true, name: "daily stale marker",
createdAtMs: Date.parse("2025-12-10T12:00:00.000Z"), enabled: true,
updatedAtMs: Date.parse("2025-12-13T16:30:00.000Z"), createdAtMs: Date.parse("2025-12-10T12:00:00.000Z"),
schedule: { kind: "cron", expr: "0 16 * * *", tz: "UTC" }, updatedAtMs: Date.parse("2025-12-13T16:30:00.000Z"),
sessionTarget: "main", schedule: { kind: "cron", expr: "0 16 * * *", tz: "UTC" },
wakeMode: "next-heartbeat", sessionTarget: "main",
payload: { kind: "systemEvent", text: "resume stale marker" }, wakeMode: "next-heartbeat",
state: { payload: { kind: "systemEvent", text: "resume stale marker" },
nextRunAtMs: dueAt, state: {
runningAtMs: staleRunningAt, nextRunAtMs: dueAt,
runningAtMs: staleRunningAt,
},
}, },
],
async ({ cron, enqueueSystemEvent }) => {
expect(enqueueSystemEvent).not.toHaveBeenCalled();
expect(noopLogger.warn).toHaveBeenCalledWith(
expect.objectContaining({ jobId: "restart-stale-running" }),
"cron: clearing stale running marker on startup",
);
const listedJobs = await cron.list({ includeDisabled: true });
const updated = listedJobs.find((job) => job.id === "restart-stale-running");
expect(updated?.state.runningAtMs).toBeUndefined();
expect(updated?.state.lastStatus).toBeUndefined();
expect(updated?.state.lastRunAtMs).toBeUndefined();
expect((updated?.state.nextRunAtMs ?? 0) > Date.parse("2025-12-13T17:00:00.000Z")).toBe(
true,
);
}, },
]);
const cron = createRestartCronService({
storePath: store.storePath,
enqueueSystemEvent,
requestHeartbeatNow,
});
await cron.start();
expect(enqueueSystemEvent).not.toHaveBeenCalled();
expect(noopLogger.warn).toHaveBeenCalledWith(
expect.objectContaining({ jobId: "restart-stale-running" }),
"cron: clearing stale running marker on startup",
); );
const jobs = await cron.list({ includeDisabled: true });
const updated = jobs.find((job) => job.id === "restart-stale-running");
expect(updated?.state.runningAtMs).toBeUndefined();
expect(updated?.state.lastStatus).toBeUndefined();
expect(updated?.state.lastRunAtMs).toBeUndefined();
expect((updated?.state.nextRunAtMs ?? 0) > Date.parse("2025-12-13T17:00:00.000Z")).toBe(true);
cron.stop();
await store.cleanup();
}); });
it("replays the most recent missed cron slot after restart when nextRunAtMs already advanced", async () => { it("replays the most recent missed cron slot after restart when nextRunAtMs already advanced", async () => {
vi.setSystemTime(new Date("2025-12-13T04:02:00.000Z")); vi.setSystemTime(new Date("2025-12-13T04:02:00.000Z"));
const store = await makeStorePath(); await withRestartedCron(
const enqueueSystemEvent = vi.fn(); [
const requestHeartbeatNow = vi.fn(); {
id: "restart-missed-slot",
await writeStoreJobs(store.storePath, [ name: "every ten minutes +1",
{ enabled: true,
id: "restart-missed-slot", createdAtMs: Date.parse("2025-12-10T12:00:00.000Z"),
name: "every ten minutes +1", updatedAtMs: Date.parse("2025-12-13T04:01:00.000Z"),
enabled: true, schedule: { kind: "cron", expr: "1,11,21,31,41,51 4-20 * * *", tz: "UTC" },
createdAtMs: Date.parse("2025-12-10T12:00:00.000Z"), sessionTarget: "main",
updatedAtMs: Date.parse("2025-12-13T04:01:00.000Z"), wakeMode: "next-heartbeat",
schedule: { kind: "cron", expr: "1,11,21,31,41,51 4-20 * * *", tz: "UTC" }, payload: { kind: "systemEvent", text: "catch missed slot" },
sessionTarget: "main", state: {
wakeMode: "next-heartbeat", // Persisted state may already be recomputed from restart time and
payload: { kind: "systemEvent", text: "catch missed slot" }, // point to the future slot, even though 04:01 was missed.
state: { nextRunAtMs: Date.parse("2025-12-13T04:11:00.000Z"),
// Persisted state may already be recomputed from restart time and lastRunAtMs: Date.parse("2025-12-13T03:51:00.000Z"),
// point to the future slot, even though 04:01 was missed. lastStatus: "ok",
nextRunAtMs: Date.parse("2025-12-13T04:11:00.000Z"), },
lastRunAtMs: Date.parse("2025-12-13T03:51:00.000Z"),
lastStatus: "ok",
}, },
],
async ({ cron, enqueueSystemEvent, requestHeartbeatNow }) => {
expect(enqueueSystemEvent).toHaveBeenCalledWith(
"catch missed slot",
expect.objectContaining({ agentId: undefined }),
);
expect(requestHeartbeatNow).toHaveBeenCalled();
const listedJobs = await cron.list({ includeDisabled: true });
const updated = listedJobs.find((job) => job.id === "restart-missed-slot");
expect(updated?.state.lastRunAtMs).toBe(Date.parse("2025-12-13T04:02:00.000Z"));
}, },
]);
const cron = createRestartCronService({
storePath: store.storePath,
enqueueSystemEvent,
requestHeartbeatNow,
});
await cron.start();
expect(enqueueSystemEvent).toHaveBeenCalledWith(
"catch missed slot",
expect.objectContaining({ agentId: undefined }),
); );
expect(requestHeartbeatNow).toHaveBeenCalled();
const jobs = await cron.list({ includeDisabled: true });
const updated = jobs.find((job) => job.id === "restart-missed-slot");
expect(updated?.state.lastRunAtMs).toBe(Date.parse("2025-12-13T04:02:00.000Z"));
cron.stop();
await store.cleanup();
}); });
it("does not replay interrupted one-shot jobs on startup", async () => { it("does not replay interrupted one-shot jobs on startup", async () => {
const store = await makeStorePath();
const enqueueSystemEvent = vi.fn();
const requestHeartbeatNow = vi.fn();
const dueAt = Date.parse("2025-12-13T16:00:00.000Z"); const dueAt = Date.parse("2025-12-13T16:00:00.000Z");
const staleRunningAt = Date.parse("2025-12-13T16:30:00.000Z"); const staleRunningAt = Date.parse("2025-12-13T16:30:00.000Z");
await writeStoreJobs(store.storePath, [ await withRestartedCron(
{ [
id: "restart-stale-one-shot", {
name: "one shot stale marker", id: "restart-stale-one-shot",
enabled: true, name: "one shot stale marker",
createdAtMs: Date.parse("2025-12-10T12:00:00.000Z"), enabled: true,
updatedAtMs: Date.parse("2025-12-13T16:30:00.000Z"), createdAtMs: Date.parse("2025-12-10T12:00:00.000Z"),
schedule: { kind: "at", at: "2025-12-13T16:00:00.000Z" }, updatedAtMs: Date.parse("2025-12-13T16:30:00.000Z"),
sessionTarget: "main", schedule: { kind: "at", at: "2025-12-13T16:00:00.000Z" },
wakeMode: "next-heartbeat", sessionTarget: "main",
payload: { kind: "systemEvent", text: "one-shot stale marker" }, wakeMode: "next-heartbeat",
state: { payload: { kind: "systemEvent", text: "one-shot stale marker" },
nextRunAtMs: dueAt, state: {
runningAtMs: staleRunningAt, nextRunAtMs: dueAt,
runningAtMs: staleRunningAt,
},
}, },
],
async ({ cron, enqueueSystemEvent, requestHeartbeatNow }) => {
expect(enqueueSystemEvent).not.toHaveBeenCalled();
expect(requestHeartbeatNow).not.toHaveBeenCalled();
const listedJobs = await cron.list({ includeDisabled: true });
const updated = listedJobs.find((job) => job.id === "restart-stale-one-shot");
expect(updated?.state.runningAtMs).toBeUndefined();
}, },
]); );
const cron = createRestartCronService({
storePath: store.storePath,
enqueueSystemEvent,
requestHeartbeatNow,
});
await cron.start();
expect(enqueueSystemEvent).not.toHaveBeenCalled();
expect(requestHeartbeatNow).not.toHaveBeenCalled();
const jobs = await cron.list({ includeDisabled: true });
const updated = jobs.find((job) => job.id === "restart-stale-one-shot");
expect(updated?.state.runningAtMs).toBeUndefined();
cron.stop();
await store.cleanup();
}); });
it("does not replay cron slot when the latest slot already ran before restart", async () => { it("does not replay cron slot when the latest slot already ran before restart", async () => {
vi.setSystemTime(new Date("2025-12-13T04:02:00.000Z")); vi.setSystemTime(new Date("2025-12-13T04:02:00.000Z"));
const store = await makeStorePath(); await withRestartedCron(
const enqueueSystemEvent = vi.fn(); [
const requestHeartbeatNow = vi.fn(); {
id: "restart-no-duplicate-slot",
await writeStoreJobs(store.storePath, [ name: "every ten minutes +1 no duplicate",
{ enabled: true,
id: "restart-no-duplicate-slot", createdAtMs: Date.parse("2025-12-10T12:00:00.000Z"),
name: "every ten minutes +1 no duplicate", updatedAtMs: Date.parse("2025-12-13T04:01:00.000Z"),
enabled: true, schedule: { kind: "cron", expr: "1,11,21,31,41,51 4-20 * * *", tz: "UTC" },
createdAtMs: Date.parse("2025-12-10T12:00:00.000Z"), sessionTarget: "main",
updatedAtMs: Date.parse("2025-12-13T04:01:00.000Z"), wakeMode: "next-heartbeat",
schedule: { kind: "cron", expr: "1,11,21,31,41,51 4-20 * * *", tz: "UTC" }, payload: { kind: "systemEvent", text: "already ran" },
sessionTarget: "main", state: {
wakeMode: "next-heartbeat", nextRunAtMs: Date.parse("2025-12-13T04:11:00.000Z"),
payload: { kind: "systemEvent", text: "already ran" }, lastRunAtMs: Date.parse("2025-12-13T04:01:00.000Z"),
state: { lastStatus: "ok",
nextRunAtMs: Date.parse("2025-12-13T04:11:00.000Z"), },
lastRunAtMs: Date.parse("2025-12-13T04:01:00.000Z"),
lastStatus: "ok",
}, },
],
async ({ enqueueSystemEvent, requestHeartbeatNow }) => {
expect(enqueueSystemEvent).not.toHaveBeenCalled();
expect(requestHeartbeatNow).not.toHaveBeenCalled();
}, },
]); );
const cron = createRestartCronService({
storePath: store.storePath,
enqueueSystemEvent,
requestHeartbeatNow,
});
await cron.start();
expect(enqueueSystemEvent).not.toHaveBeenCalled();
expect(requestHeartbeatNow).not.toHaveBeenCalled();
cron.stop();
await store.cleanup();
}); });
it("does not replay missed cron slots while error backoff is pending after restart", async () => { it("does not replay missed cron slots while error backoff is pending after restart", async () => {
vi.setSystemTime(new Date("2025-12-13T04:02:00.000Z")); vi.setSystemTime(new Date("2025-12-13T04:02:00.000Z"));
const store = await makeStorePath(); await withRestartedCron(
const enqueueSystemEvent = vi.fn(); [
const requestHeartbeatNow = vi.fn(); {
id: "restart-backoff-pending",
await writeStoreJobs(store.storePath, [ name: "backoff pending",
{ enabled: true,
id: "restart-backoff-pending", createdAtMs: Date.parse("2025-12-10T12:00:00.000Z"),
name: "backoff pending", updatedAtMs: Date.parse("2025-12-13T04:01:10.000Z"),
enabled: true, schedule: { kind: "cron", expr: "* * * * *", tz: "UTC" },
createdAtMs: Date.parse("2025-12-10T12:00:00.000Z"), sessionTarget: "main",
updatedAtMs: Date.parse("2025-12-13T04:01:10.000Z"), wakeMode: "next-heartbeat",
schedule: { kind: "cron", expr: "* * * * *", tz: "UTC" }, payload: { kind: "systemEvent", text: "do not run during backoff" },
sessionTarget: "main", state: {
wakeMode: "next-heartbeat", // Next retry is intentionally delayed by backoff despite a newer cron slot.
payload: { kind: "systemEvent", text: "do not run during backoff" }, nextRunAtMs: Date.parse("2025-12-13T04:10:00.000Z"),
state: { lastRunAtMs: Date.parse("2025-12-13T04:01:00.000Z"),
// Next retry is intentionally delayed by backoff despite a newer cron slot. lastStatus: "error",
nextRunAtMs: Date.parse("2025-12-13T04:10:00.000Z"), consecutiveErrors: 4,
lastRunAtMs: Date.parse("2025-12-13T04:01:00.000Z"), },
lastStatus: "error",
consecutiveErrors: 4,
}, },
],
async ({ enqueueSystemEvent, requestHeartbeatNow }) => {
expect(enqueueSystemEvent).not.toHaveBeenCalled();
expect(requestHeartbeatNow).not.toHaveBeenCalled();
}, },
]); );
const cron = createRestartCronService({
storePath: store.storePath,
enqueueSystemEvent,
requestHeartbeatNow,
});
await cron.start();
expect(enqueueSystemEvent).not.toHaveBeenCalled();
expect(requestHeartbeatNow).not.toHaveBeenCalled();
cron.stop();
await store.cleanup();
}); });
it("replays missed cron slot after restart when error backoff has already elapsed", async () => { it("replays missed cron slot after restart when error backoff has already elapsed", async () => {
vi.setSystemTime(new Date("2025-12-13T04:02:00.000Z")); vi.setSystemTime(new Date("2025-12-13T04:02:00.000Z"));
const store = await makeStorePath(); await withRestartedCron(
const enqueueSystemEvent = vi.fn(); [
const requestHeartbeatNow = vi.fn(); {
id: "restart-backoff-elapsed-replay",
await writeStoreJobs(store.storePath, [ name: "backoff elapsed replay",
{ enabled: true,
id: "restart-backoff-elapsed-replay", createdAtMs: Date.parse("2025-12-10T12:00:00.000Z"),
name: "backoff elapsed replay", updatedAtMs: Date.parse("2025-12-13T04:01:10.000Z"),
enabled: true, schedule: { kind: "cron", expr: "1,11,21,31,41,51 4-20 * * *", tz: "UTC" },
createdAtMs: Date.parse("2025-12-10T12:00:00.000Z"), sessionTarget: "main",
updatedAtMs: Date.parse("2025-12-13T04:01:10.000Z"), wakeMode: "next-heartbeat",
schedule: { kind: "cron", expr: "1,11,21,31,41,51 4-20 * * *", tz: "UTC" }, payload: { kind: "systemEvent", text: "replay after backoff elapsed" },
sessionTarget: "main", state: {
wakeMode: "next-heartbeat", // Startup maintenance may already point to a future slot (04:11) even
payload: { kind: "systemEvent", text: "replay after backoff elapsed" }, // though 04:01 was missed and the 30s error backoff has elapsed.
state: { nextRunAtMs: Date.parse("2025-12-13T04:11:00.000Z"),
// Startup maintenance may already point to a future slot (04:11) even lastRunAtMs: Date.parse("2025-12-13T03:51:00.000Z"),
// though 04:01 was missed and the 30s error backoff has elapsed. lastStatus: "error",
nextRunAtMs: Date.parse("2025-12-13T04:11:00.000Z"), consecutiveErrors: 1,
lastRunAtMs: Date.parse("2025-12-13T03:51:00.000Z"), },
lastStatus: "error",
consecutiveErrors: 1,
}, },
],
async ({ enqueueSystemEvent, requestHeartbeatNow }) => {
expect(enqueueSystemEvent).toHaveBeenCalledWith(
"replay after backoff elapsed",
expect.objectContaining({ agentId: undefined }),
);
expect(requestHeartbeatNow).toHaveBeenCalled();
}, },
]);
const cron = createRestartCronService({
storePath: store.storePath,
enqueueSystemEvent,
requestHeartbeatNow,
});
await cron.start();
expect(enqueueSystemEvent).toHaveBeenCalledWith(
"replay after backoff elapsed",
expect.objectContaining({ agentId: undefined }),
); );
expect(requestHeartbeatNow).toHaveBeenCalled();
cron.stop();
await store.cleanup();
}); });
it("reschedules deferred missed jobs from the post-catchup clock so they stay in the future", async () => { it("reschedules deferred missed jobs from the post-catchup clock so they stay in the future", async () => {