test: share heartbeat scheduler helpers

This commit is contained in:
Peter Steinberger 2026-03-14 00:04:34 +00:00
parent 1243927cfb
commit ed14682d63
1 changed files with 89 additions and 90 deletions

View File

@ -4,15 +4,60 @@ import { startHeartbeatRunner } from "./heartbeat-runner.js";
import { requestHeartbeatNow, resetHeartbeatWakeStateForTests } from "./heartbeat-wake.js"; import { requestHeartbeatNow, resetHeartbeatWakeStateForTests } from "./heartbeat-wake.js";
describe("startHeartbeatRunner", () => { describe("startHeartbeatRunner", () => {
function useFakeHeartbeatTime() {
vi.useFakeTimers();
vi.setSystemTime(new Date(0));
}
function startDefaultRunner(runOnce: Parameters<typeof startHeartbeatRunner>[0]["runOnce"]) { function startDefaultRunner(runOnce: Parameters<typeof startHeartbeatRunner>[0]["runOnce"]) {
return startHeartbeatRunner({ return startHeartbeatRunner({
cfg: { cfg: heartbeatConfig(),
agents: { defaults: { heartbeat: { every: "30m" } } },
} as OpenClawConfig,
runOnce, runOnce,
}); });
} }
function heartbeatConfig(
list?: NonNullable<NonNullable<OpenClawConfig["agents"]>["list"]>,
): OpenClawConfig {
return {
agents: {
defaults: { heartbeat: { every: "30m" } },
...(list ? { list } : {}),
},
} as OpenClawConfig;
}
function createRequestsInFlightRunSpy(skipCount: number) {
let callCount = 0;
return vi.fn().mockImplementation(async () => {
callCount++;
if (callCount <= skipCount) {
return { status: "skipped", reason: "requests-in-flight" } as const;
}
return { status: "ran", durationMs: 1 } as const;
});
}
async function expectWakeDispatch(params: {
cfg: OpenClawConfig;
runSpy: ReturnType<typeof vi.fn>;
wake: { reason: string; agentId?: string; sessionKey?: string; coalesceMs: number };
expectedCall: Record<string, unknown>;
}) {
const runner = startHeartbeatRunner({
cfg: params.cfg,
runOnce: params.runSpy,
});
requestHeartbeatNow(params.wake);
await vi.advanceTimersByTimeAsync(1);
expect(params.runSpy).toHaveBeenCalledTimes(1);
expect(params.runSpy).toHaveBeenCalledWith(expect.objectContaining(params.expectedCall));
return runner;
}
afterEach(() => { afterEach(() => {
resetHeartbeatWakeStateForTests(); resetHeartbeatWakeStateForTests();
vi.useRealTimers(); vi.useRealTimers();
@ -20,8 +65,7 @@ describe("startHeartbeatRunner", () => {
}); });
it("updates scheduling when config changes without restart", async () => { it("updates scheduling when config changes without restart", async () => {
vi.useFakeTimers(); useFakeHeartbeatTime();
vi.setSystemTime(new Date(0));
const runSpy = vi.fn().mockResolvedValue({ status: "ran", durationMs: 1 }); const runSpy = vi.fn().mockResolvedValue({ status: "ran", durationMs: 1 });
@ -62,8 +106,7 @@ describe("startHeartbeatRunner", () => {
}); });
it("continues scheduling after runOnce throws an unhandled error", async () => { it("continues scheduling after runOnce throws an unhandled error", async () => {
vi.useFakeTimers(); useFakeHeartbeatTime();
vi.setSystemTime(new Date(0));
let callCount = 0; let callCount = 0;
const runSpy = vi.fn().mockImplementation(async () => { const runSpy = vi.fn().mockImplementation(async () => {
@ -89,8 +132,7 @@ describe("startHeartbeatRunner", () => {
}); });
it("cleanup is idempotent and does not clear a newer runner's handler", async () => { it("cleanup is idempotent and does not clear a newer runner's handler", async () => {
vi.useFakeTimers(); useFakeHeartbeatTime();
vi.setSystemTime(new Date(0));
const runSpy1 = vi.fn().mockResolvedValue({ status: "ran", durationMs: 1 }); const runSpy1 = vi.fn().mockResolvedValue({ status: "ran", durationMs: 1 });
const runSpy2 = vi.fn().mockResolvedValue({ status: "ran", durationMs: 1 }); const runSpy2 = vi.fn().mockResolvedValue({ status: "ran", durationMs: 1 });
@ -120,8 +162,7 @@ describe("startHeartbeatRunner", () => {
}); });
it("run() returns skipped when runner is stopped", async () => { it("run() returns skipped when runner is stopped", async () => {
vi.useFakeTimers(); useFakeHeartbeatTime();
vi.setSystemTime(new Date(0));
const runSpy = vi.fn().mockResolvedValue({ status: "ran", durationMs: 1 }); const runSpy = vi.fn().mockResolvedValue({ status: "ran", durationMs: 1 });
@ -135,22 +176,12 @@ describe("startHeartbeatRunner", () => {
}); });
it("reschedules timer when runOnce returns requests-in-flight", async () => { it("reschedules timer when runOnce returns requests-in-flight", async () => {
vi.useFakeTimers(); useFakeHeartbeatTime();
vi.setSystemTime(new Date(0));
let callCount = 0; const runSpy = createRequestsInFlightRunSpy(1);
const runSpy = vi.fn().mockImplementation(async () => {
callCount++;
if (callCount === 1) {
return { status: "skipped", reason: "requests-in-flight" };
}
return { status: "ran", durationMs: 1 };
});
const runner = startHeartbeatRunner({ const runner = startHeartbeatRunner({
cfg: { cfg: heartbeatConfig(),
agents: { defaults: { heartbeat: { every: "30m" } } },
} as OpenClawConfig,
runOnce: runSpy, runOnce: runSpy,
}); });
@ -167,24 +198,14 @@ describe("startHeartbeatRunner", () => {
}); });
it("does not push nextDueMs forward on repeated requests-in-flight skips", async () => { it("does not push nextDueMs forward on repeated requests-in-flight skips", async () => {
vi.useFakeTimers(); useFakeHeartbeatTime();
vi.setSystemTime(new Date(0));
// Simulate a long-running heartbeat: the first 5 calls return // Simulate a long-running heartbeat: the first 5 calls return
// requests-in-flight (retries from the wake layer), then the 6th succeeds. // requests-in-flight (retries from the wake layer), then the 6th succeeds.
let callCount = 0; const runSpy = createRequestsInFlightRunSpy(5);
const runSpy = vi.fn().mockImplementation(async () => {
callCount++;
if (callCount <= 5) {
return { status: "skipped", reason: "requests-in-flight" };
}
return { status: "ran", durationMs: 1 };
});
const runner = startHeartbeatRunner({ const runner = startHeartbeatRunner({
cfg: { cfg: heartbeatConfig(),
agents: { defaults: { heartbeat: { every: "30m" } } },
} as OpenClawConfig,
runOnce: runSpy, runOnce: runSpy,
}); });
@ -208,76 +229,54 @@ describe("startHeartbeatRunner", () => {
}); });
it("routes targeted wake requests to the requested agent/session", async () => { it("routes targeted wake requests to the requested agent/session", async () => {
vi.useFakeTimers(); useFakeHeartbeatTime();
vi.setSystemTime(new Date(0));
const runSpy = vi.fn().mockResolvedValue({ status: "ran", durationMs: 1 }); const runSpy = vi.fn().mockResolvedValue({ status: "ran", durationMs: 1 });
const runner = startHeartbeatRunner({ const runner = await expectWakeDispatch({
cfg: { cfg: {
agents: { ...heartbeatConfig([
defaults: { heartbeat: { every: "30m" } }, { id: "main", heartbeat: { every: "30m" } },
list: [ { id: "ops", heartbeat: { every: "15m" } },
{ id: "main", heartbeat: { every: "30m" } }, ]),
{ id: "ops", heartbeat: { every: "15m" } },
],
},
} as OpenClawConfig, } as OpenClawConfig,
runOnce: runSpy, runSpy,
}); wake: {
reason: "cron:job-123",
requestHeartbeatNow({ agentId: "ops",
reason: "cron:job-123", sessionKey: "agent:ops:discord:channel:alerts",
agentId: "ops", coalesceMs: 0,
sessionKey: "agent:ops:discord:channel:alerts", },
coalesceMs: 0, expectedCall: {
});
await vi.advanceTimersByTimeAsync(1);
expect(runSpy).toHaveBeenCalledTimes(1);
expect(runSpy).toHaveBeenCalledWith(
expect.objectContaining({
agentId: "ops", agentId: "ops",
reason: "cron:job-123", reason: "cron:job-123",
sessionKey: "agent:ops:discord:channel:alerts", sessionKey: "agent:ops:discord:channel:alerts",
}), },
); });
runner.stop(); runner.stop();
}); });
it("does not fan out to unrelated agents for session-scoped exec wakes", async () => { it("does not fan out to unrelated agents for session-scoped exec wakes", async () => {
vi.useFakeTimers(); useFakeHeartbeatTime();
vi.setSystemTime(new Date(0));
const runSpy = vi.fn().mockResolvedValue({ status: "ran", durationMs: 1 }); const runSpy = vi.fn().mockResolvedValue({ status: "ran", durationMs: 1 });
const runner = startHeartbeatRunner({ const runner = await expectWakeDispatch({
cfg: { cfg: {
agents: { ...heartbeatConfig([
defaults: { heartbeat: { every: "30m" } }, { id: "main", heartbeat: { every: "30m" } },
list: [ { id: "finance", heartbeat: { every: "30m" } },
{ id: "main", heartbeat: { every: "30m" } }, ]),
{ id: "finance", heartbeat: { every: "30m" } },
],
},
} as OpenClawConfig, } as OpenClawConfig,
runOnce: runSpy, runSpy,
}); wake: {
reason: "exec-event",
requestHeartbeatNow({ sessionKey: "agent:main:main",
reason: "exec-event", coalesceMs: 0,
sessionKey: "agent:main:main", },
coalesceMs: 0, expectedCall: {
});
await vi.advanceTimersByTimeAsync(1);
expect(runSpy).toHaveBeenCalledTimes(1);
expect(runSpy).toHaveBeenCalledWith(
expect.objectContaining({
agentId: "main", agentId: "main",
reason: "exec-event", reason: "exec-event",
sessionKey: "agent:main:main", sessionKey: "agent:main:main",
}), },
); });
expect(runSpy.mock.calls.some((call) => call[0]?.agentId === "finance")).toBe(false); expect(runSpy.mock.calls.some((call) => call[0]?.agentId === "finance")).toBe(false);
runner.stop(); runner.stop();