mirror of https://github.com/openclaw/openclaw.git
refactor: move cron legacy delivery migration to doctor
This commit is contained in:
parent
31ed09bc96
commit
802bdb099e
|
|
@ -246,4 +246,47 @@ describe("maybeRepairLegacyCronStore", () => {
|
|||
to: "https://example.invalid/cron-finished",
|
||||
});
|
||||
});
|
||||
|
||||
it("repairs legacy root delivery threadId hints into delivery", async () => {
|
||||
const storePath = await makeTempStorePath();
|
||||
await writeCronStore(storePath, [
|
||||
{
|
||||
id: "legacy-thread-hint",
|
||||
name: "Legacy thread hint",
|
||||
enabled: true,
|
||||
createdAtMs: Date.parse("2026-02-01T00:00:00.000Z"),
|
||||
updatedAtMs: Date.parse("2026-02-02T00:00:00.000Z"),
|
||||
schedule: { kind: "cron", cron: "0 7 * * *", tz: "UTC" },
|
||||
sessionTarget: "isolated",
|
||||
wakeMode: "now",
|
||||
payload: {
|
||||
kind: "agentTurn",
|
||||
message: "Morning brief",
|
||||
},
|
||||
channel: " telegram ",
|
||||
to: "-1001234567890",
|
||||
threadId: " 99 ",
|
||||
state: {},
|
||||
},
|
||||
]);
|
||||
|
||||
await maybeRepairLegacyCronStore({
|
||||
cfg: createCronConfig(storePath),
|
||||
options: {},
|
||||
prompter: makePrompter(true),
|
||||
});
|
||||
|
||||
const persisted = JSON.parse(await fs.readFile(storePath, "utf-8")) as {
|
||||
jobs: Array<Record<string, unknown>>;
|
||||
};
|
||||
expect(persisted.jobs[0]?.channel).toBeUndefined();
|
||||
expect(persisted.jobs[0]?.to).toBeUndefined();
|
||||
expect(persisted.jobs[0]?.threadId).toBeUndefined();
|
||||
expect(persisted.jobs[0]?.delivery).toMatchObject({
|
||||
mode: "announce",
|
||||
channel: "telegram",
|
||||
to: "-1001234567890",
|
||||
threadId: "99",
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -32,6 +32,10 @@ function expectAnnounceDeliveryTarget(
|
|||
function expectPayloadDeliveryHintsCleared(payload: Record<string, unknown>): void {
|
||||
expect(payload.channel).toBeUndefined();
|
||||
expect(payload.deliver).toBeUndefined();
|
||||
expect(payload.to).toBeUndefined();
|
||||
expect(payload.threadId).toBeUndefined();
|
||||
expect(payload.bestEffortDeliver).toBeUndefined();
|
||||
expect(payload.provider).toBeUndefined();
|
||||
}
|
||||
|
||||
function normalizeIsolatedAgentTurnCreateJob(params: {
|
||||
|
|
@ -72,7 +76,7 @@ function normalizeMainSystemEventCreateJob(params: {
|
|||
}
|
||||
|
||||
describe("normalizeCronJobCreate", () => {
|
||||
it("maps legacy payload.provider to payload.channel and strips provider", () => {
|
||||
it("strips payload-level legacy delivery hints from live input", () => {
|
||||
const normalized = normalizeIsolatedAgentTurnCreateJob({
|
||||
name: "legacy",
|
||||
payload: {
|
||||
|
|
@ -84,10 +88,9 @@ describe("normalizeCronJobCreate", () => {
|
|||
|
||||
const payload = normalized.payload as Record<string, unknown>;
|
||||
expectPayloadDeliveryHintsCleared(payload);
|
||||
expect("provider" in payload).toBe(false);
|
||||
|
||||
const delivery = normalized.delivery as Record<string, unknown>;
|
||||
expectAnnounceDeliveryTarget(delivery, { channel: "telegram", to: "7200373102" });
|
||||
expect(delivery).toEqual({ mode: "announce" });
|
||||
});
|
||||
|
||||
it("trims agentId and drops null", () => {
|
||||
|
|
@ -146,19 +149,52 @@ describe("normalizeCronJobCreate", () => {
|
|||
expect("sessionKey" in cleared).toBe(false);
|
||||
});
|
||||
|
||||
it("canonicalizes payload.channel casing", () => {
|
||||
it("strips top-level legacy delivery hints from live input", () => {
|
||||
const normalized = normalizeIsolatedAgentTurnCreateJob({
|
||||
name: "legacy provider",
|
||||
name: "legacy top-level delivery",
|
||||
payload: {
|
||||
deliver: true,
|
||||
kind: "agentTurn",
|
||||
message: "hi",
|
||||
},
|
||||
delivery: undefined,
|
||||
});
|
||||
|
||||
const withLegacyTopLevel = normalizeCronJobCreate({
|
||||
name: "legacy top-level delivery",
|
||||
enabled: true,
|
||||
schedule: { kind: "cron", expr: "* * * * *" },
|
||||
sessionTarget: "isolated",
|
||||
wakeMode: "now",
|
||||
payload: {
|
||||
kind: "agentTurn",
|
||||
message: "hi",
|
||||
},
|
||||
deliver: false,
|
||||
channel: "Telegram",
|
||||
to: "-1001234567890",
|
||||
threadId: " 99 ",
|
||||
}) as unknown as Record<string, unknown>;
|
||||
|
||||
expect(normalized.delivery).toEqual({ mode: "announce" });
|
||||
expect(withLegacyTopLevel.deliver).toBeUndefined();
|
||||
expect(withLegacyTopLevel.channel).toBeUndefined();
|
||||
expect(withLegacyTopLevel.to).toBeUndefined();
|
||||
expect(withLegacyTopLevel.threadId).toBeUndefined();
|
||||
|
||||
const delivery = withLegacyTopLevel.delivery as Record<string, unknown>;
|
||||
expect(delivery).toEqual({ mode: "announce" });
|
||||
});
|
||||
|
||||
it("canonicalizes delivery.channel casing", () => {
|
||||
const normalized = normalizeIsolatedAgentTurnCreateJob({
|
||||
name: "delivery channel casing",
|
||||
delivery: {
|
||||
mode: "announce",
|
||||
channel: "Telegram",
|
||||
to: "7200373102",
|
||||
},
|
||||
});
|
||||
|
||||
const payload = normalized.payload as Record<string, unknown>;
|
||||
expectPayloadDeliveryHintsCleared(payload);
|
||||
|
||||
const delivery = normalized.delivery as Record<string, unknown>;
|
||||
expectAnnounceDeliveryTarget(delivery, { channel: "telegram", to: "7200373102" });
|
||||
});
|
||||
|
|
@ -316,64 +352,6 @@ describe("normalizeCronJobCreate", () => {
|
|||
expect(delivery.mode).toBe("announce");
|
||||
});
|
||||
|
||||
it("migrates legacy delivery fields to delivery", () => {
|
||||
const normalized = normalizeCronJobCreate({
|
||||
name: "legacy deliver",
|
||||
enabled: true,
|
||||
schedule: { kind: "cron", expr: "* * * * *" },
|
||||
payload: {
|
||||
kind: "agentTurn",
|
||||
message: "hi",
|
||||
deliver: true,
|
||||
channel: "telegram",
|
||||
to: "7200373102",
|
||||
bestEffortDeliver: true,
|
||||
},
|
||||
}) as unknown as Record<string, unknown>;
|
||||
|
||||
const delivery = normalized.delivery as Record<string, unknown>;
|
||||
expectAnnounceDeliveryTarget(delivery, { channel: "telegram", to: "7200373102" });
|
||||
expect(delivery.bestEffort).toBe(true);
|
||||
});
|
||||
|
||||
it("migrates legacy top-level threadId hints into delivery", () => {
|
||||
const normalized = normalizeCronJobCreate({
|
||||
name: "legacy root thread",
|
||||
enabled: true,
|
||||
schedule: { kind: "cron", expr: "* * * * *" },
|
||||
payload: {
|
||||
kind: "agentTurn",
|
||||
message: "hi",
|
||||
},
|
||||
channel: "telegram",
|
||||
to: "-1001234567890",
|
||||
threadId: " 99 ",
|
||||
}) as unknown as Record<string, unknown>;
|
||||
|
||||
const delivery = normalized.delivery as Record<string, unknown>;
|
||||
expectAnnounceDeliveryTarget(delivery, { channel: "telegram", to: "-1001234567890" });
|
||||
expect(delivery.threadId).toBe("99");
|
||||
expect(normalized.threadId).toBeUndefined();
|
||||
});
|
||||
|
||||
it("maps legacy deliver=false to delivery none", () => {
|
||||
const normalized = normalizeCronJobCreate({
|
||||
name: "legacy off",
|
||||
enabled: true,
|
||||
schedule: { kind: "cron", expr: "* * * * *" },
|
||||
payload: {
|
||||
kind: "agentTurn",
|
||||
message: "hi",
|
||||
deliver: false,
|
||||
channel: "telegram",
|
||||
to: "7200373102",
|
||||
},
|
||||
}) as unknown as Record<string, unknown>;
|
||||
|
||||
const delivery = normalized.delivery as Record<string, unknown>;
|
||||
expect(delivery.mode).toBe("none");
|
||||
});
|
||||
|
||||
it("migrates legacy isolation settings to announce delivery", () => {
|
||||
const normalized = normalizeCronJobCreate({
|
||||
name: "legacy isolation",
|
||||
|
|
@ -521,8 +499,7 @@ describe("normalizeCronJobPatch", () => {
|
|||
|
||||
const payload = normalized.payload as Record<string, unknown>;
|
||||
expect(payload.kind).toBeUndefined();
|
||||
expect(payload.channel).toBe("telegram");
|
||||
expect(payload.to).toBe("+15550001111");
|
||||
expectPayloadDeliveryHintsCleared(payload);
|
||||
});
|
||||
|
||||
it("preserves null sessionKey patches and trims string values", () => {
|
||||
|
|
@ -546,7 +523,7 @@ describe("normalizeCronJobPatch", () => {
|
|||
expect(schedule.staggerMs).toBe(30_000);
|
||||
});
|
||||
|
||||
it("preserves legacy patch threadId hints for downstream delivery migration", () => {
|
||||
it("strips legacy patch threadId hints from live input", () => {
|
||||
const normalized = normalizeCronJobPatch({
|
||||
payload: {
|
||||
kind: "agentTurn",
|
||||
|
|
@ -555,6 +532,6 @@ describe("normalizeCronJobPatch", () => {
|
|||
}) as unknown as Record<string, unknown>;
|
||||
|
||||
expect(normalized.delivery).toBeUndefined();
|
||||
expect((normalized.payload as Record<string, unknown>).threadId).toBe(77);
|
||||
expect((normalized.payload as Record<string, unknown>).threadId).toBeUndefined();
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -1,16 +1,12 @@
|
|||
import { sanitizeAgentId } from "../routing/session-key.js";
|
||||
import { isRecord } from "../utils.js";
|
||||
import {
|
||||
DeliveryThreadIdFieldSchema,
|
||||
TimeoutSecondsFieldSchema,
|
||||
TrimmedNonEmptyStringFieldSchema,
|
||||
parseDeliveryInput,
|
||||
parseLegacyDeliveryHintsInput,
|
||||
parseOptionalField,
|
||||
} from "./delivery-field-schemas.js";
|
||||
import { normalizeLegacyDeliveryInput } from "./legacy-delivery.js";
|
||||
import { parseAbsoluteTimeMs } from "./parse.js";
|
||||
import { migrateLegacyCronPayload } from "./payload-migration.js";
|
||||
import { inferLegacyName } from "./service/normalize.js";
|
||||
import { normalizeCronStaggerMs, resolveDefaultCronStaggerMs } from "./stagger.js";
|
||||
import type { CronJobCreate, CronJobPatch } from "./types.js";
|
||||
|
|
@ -92,8 +88,6 @@ function coerceSchedule(schedule: UnknownRecord) {
|
|||
|
||||
function coercePayload(payload: UnknownRecord) {
|
||||
const next: UnknownRecord = { ...payload };
|
||||
// Back-compat: older configs used `provider` for delivery channel.
|
||||
migrateLegacyCronPayload(next);
|
||||
const kindRaw = typeof next.kind === "string" ? next.kind.trim().toLowerCase() : "";
|
||||
if (kindRaw === "agentturn") {
|
||||
next.kind = "agentTurn";
|
||||
|
|
@ -176,6 +170,24 @@ function coercePayload(payload: UnknownRecord) {
|
|||
delete next.toolsAllow;
|
||||
}
|
||||
}
|
||||
if ("deliver" in next) {
|
||||
delete next.deliver;
|
||||
}
|
||||
if ("channel" in next) {
|
||||
delete next.channel;
|
||||
}
|
||||
if ("to" in next) {
|
||||
delete next.to;
|
||||
}
|
||||
if ("threadId" in next) {
|
||||
delete next.threadId;
|
||||
}
|
||||
if ("bestEffortDeliver" in next) {
|
||||
delete next.bestEffortDeliver;
|
||||
}
|
||||
if ("provider" in next) {
|
||||
delete next.provider;
|
||||
}
|
||||
return next;
|
||||
}
|
||||
|
||||
|
|
@ -274,29 +286,6 @@ function copyTopLevelAgentTurnFields(next: UnknownRecord, payload: UnknownRecord
|
|||
}
|
||||
}
|
||||
|
||||
function copyTopLevelLegacyDeliveryFields(next: UnknownRecord, payload: UnknownRecord) {
|
||||
const hints = parseLegacyDeliveryHintsInput(next);
|
||||
if (typeof payload.deliver !== "boolean" && hints.deliver !== undefined) {
|
||||
payload.deliver = hints.deliver;
|
||||
}
|
||||
if (typeof payload.channel !== "string" && hints.channel !== undefined) {
|
||||
payload.channel = hints.channel;
|
||||
}
|
||||
if (typeof payload.to !== "string" && hints.to !== undefined) {
|
||||
payload.to = hints.to;
|
||||
}
|
||||
const threadId = parseOptionalField(DeliveryThreadIdFieldSchema, next.threadId);
|
||||
if (!("threadId" in payload) && threadId !== undefined) {
|
||||
payload.threadId = threadId;
|
||||
}
|
||||
if (typeof payload.bestEffortDeliver !== "boolean" && hints.bestEffortDeliver !== undefined) {
|
||||
payload.bestEffortDeliver = hints.bestEffortDeliver;
|
||||
}
|
||||
if (typeof payload.provider !== "string" && hints.provider !== undefined) {
|
||||
payload.provider = hints.provider;
|
||||
}
|
||||
}
|
||||
|
||||
function stripLegacyTopLevelFields(next: UnknownRecord) {
|
||||
delete next.model;
|
||||
delete next.thinking;
|
||||
|
|
@ -412,7 +401,6 @@ export function normalizeCronJobInput(
|
|||
const payload = isRecord(next.payload) ? next.payload : null;
|
||||
if (payload && payload.kind === "agentTurn") {
|
||||
copyTopLevelAgentTurnFields(next, payload);
|
||||
copyTopLevelLegacyDeliveryFields(next, payload);
|
||||
}
|
||||
stripLegacyTopLevelFields(next);
|
||||
|
||||
|
|
@ -505,19 +493,7 @@ export function normalizeCronJobInput(
|
|||
sessionTarget.startsWith("session:") ||
|
||||
(sessionTarget === "" && payloadKind === "agentTurn");
|
||||
const hasDelivery = "delivery" in next && next.delivery !== undefined;
|
||||
const normalizedLegacy = normalizeLegacyDeliveryInput({
|
||||
delivery: isRecord(next.delivery) ? next.delivery : null,
|
||||
payload,
|
||||
});
|
||||
if (normalizedLegacy.mutated && normalizedLegacy.delivery) {
|
||||
next.delivery = normalizedLegacy.delivery;
|
||||
}
|
||||
if (
|
||||
!hasDelivery &&
|
||||
!normalizedLegacy.delivery &&
|
||||
isIsolatedAgentTurn &&
|
||||
payloadKind === "agentTurn"
|
||||
) {
|
||||
if (!hasDelivery && isIsolatedAgentTurn && payloadKind === "agentTurn") {
|
||||
next.delivery = { mode: "announce" };
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,218 +0,0 @@
|
|||
import fs from "node:fs/promises";
|
||||
import path from "node:path";
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
import { CronService } from "./service.js";
|
||||
import { setupCronServiceSuite } from "./service.test-harness.js";
|
||||
|
||||
const { logger: noopLogger, makeStorePath } = setupCronServiceSuite({
|
||||
prefix: "openclaw-cron-",
|
||||
baseTimeIso: "2026-02-06T17:00:00.000Z",
|
||||
});
|
||||
|
||||
function createStartedCron(storePath: string) {
|
||||
const cron = new CronService({
|
||||
storePath,
|
||||
cronEnabled: true,
|
||||
log: noopLogger,
|
||||
enqueueSystemEvent: vi.fn(),
|
||||
requestHeartbeatNow: vi.fn(),
|
||||
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" as const, summary: "ok" })),
|
||||
});
|
||||
return {
|
||||
cron,
|
||||
start: async () => {
|
||||
await cron.start();
|
||||
return cron;
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
async function listJobById(cron: CronService, jobId: string) {
|
||||
const jobs = await cron.list({ includeDisabled: true });
|
||||
return jobs.find((entry) => entry.id === jobId);
|
||||
}
|
||||
|
||||
async function startCronWithStoredJobs(jobs: Array<Record<string, unknown>>) {
|
||||
const store = await makeStorePath();
|
||||
await fs.mkdir(path.dirname(store.storePath), { recursive: true });
|
||||
await fs.writeFile(
|
||||
store.storePath,
|
||||
JSON.stringify(
|
||||
{
|
||||
version: 1,
|
||||
jobs,
|
||||
},
|
||||
null,
|
||||
2,
|
||||
),
|
||||
"utf-8",
|
||||
);
|
||||
const cron = await createStartedCron(store.storePath).start();
|
||||
return { store, cron };
|
||||
}
|
||||
|
||||
async function stopCronAndCleanup(cron: CronService, store: { cleanup: () => Promise<void> }) {
|
||||
cron.stop();
|
||||
await store.cleanup();
|
||||
}
|
||||
|
||||
function createLegacyIsolatedAgentTurnJob(
|
||||
overrides: Record<string, unknown>,
|
||||
): Record<string, unknown> {
|
||||
return {
|
||||
enabled: true,
|
||||
createdAtMs: Date.parse("2026-02-01T12:00:00.000Z"),
|
||||
updatedAtMs: Date.parse("2026-02-05T12:00:00.000Z"),
|
||||
schedule: { kind: "cron", expr: "0 23 * * *", tz: "UTC" },
|
||||
sessionTarget: "isolated",
|
||||
wakeMode: "next-heartbeat",
|
||||
payload: { kind: "agentTurn", message: "legacy payload fields" },
|
||||
...overrides,
|
||||
};
|
||||
}
|
||||
|
||||
describe("CronService store migrations", () => {
|
||||
it("treats stored current session targets as isolated-like for default delivery migration", async () => {
|
||||
const { store, cron } = await startCronWithStoredJobs([
|
||||
createLegacyIsolatedAgentTurnJob({
|
||||
id: "stored-current-job",
|
||||
name: "stored current",
|
||||
sessionTarget: "current",
|
||||
}),
|
||||
]);
|
||||
|
||||
const job = await listJobById(cron, "stored-current-job");
|
||||
expect(job).toBeDefined();
|
||||
expect(job?.sessionTarget).toBe("isolated");
|
||||
expect(job?.delivery).toEqual({ mode: "announce" });
|
||||
|
||||
await stopCronAndCleanup(cron, store);
|
||||
});
|
||||
|
||||
it("preserves stored custom session targets", async () => {
|
||||
const { store, cron } = await startCronWithStoredJobs([
|
||||
createLegacyIsolatedAgentTurnJob({
|
||||
id: "custom-session-job",
|
||||
name: "custom session",
|
||||
sessionTarget: "session:ProjectAlpha",
|
||||
}),
|
||||
]);
|
||||
|
||||
const job = await listJobById(cron, "custom-session-job");
|
||||
expect(job?.sessionTarget).toBe("session:ProjectAlpha");
|
||||
expect(job?.delivery).toEqual({ mode: "announce" });
|
||||
|
||||
await stopCronAndCleanup(cron, store);
|
||||
});
|
||||
|
||||
it("migrates legacy top-level agentTurn fields and initializes missing state", async () => {
|
||||
const { store, cron } = await startCronWithStoredJobs([
|
||||
createLegacyIsolatedAgentTurnJob({
|
||||
id: "legacy-agentturn-job",
|
||||
name: "legacy agentturn",
|
||||
model: "openrouter/deepseek/deepseek-r1",
|
||||
thinking: "high",
|
||||
timeoutSeconds: 120,
|
||||
allowUnsafeExternalContent: true,
|
||||
deliver: true,
|
||||
channel: "telegram",
|
||||
to: "12345",
|
||||
bestEffortDeliver: true,
|
||||
}),
|
||||
]);
|
||||
|
||||
const status = await cron.status();
|
||||
expect(status.enabled).toBe(true);
|
||||
|
||||
const job = await listJobById(cron, "legacy-agentturn-job");
|
||||
expect(job).toBeDefined();
|
||||
expect(job?.state).toBeDefined();
|
||||
expect(job?.sessionTarget).toBe("isolated");
|
||||
expect(job?.payload.kind).toBe("agentTurn");
|
||||
if (job?.payload.kind === "agentTurn") {
|
||||
expect(job.payload.model).toBe("openrouter/deepseek/deepseek-r1");
|
||||
expect(job.payload.thinking).toBe("high");
|
||||
expect(job.payload.timeoutSeconds).toBe(120);
|
||||
expect(job.payload.allowUnsafeExternalContent).toBe(true);
|
||||
}
|
||||
expect(job?.delivery).toEqual({
|
||||
mode: "announce",
|
||||
channel: "telegram",
|
||||
to: "12345",
|
||||
bestEffort: true,
|
||||
});
|
||||
|
||||
const persisted = JSON.parse(await fs.readFile(store.storePath, "utf-8")) as {
|
||||
jobs: Array<Record<string, unknown>>;
|
||||
};
|
||||
const persistedJob = persisted.jobs.find((entry) => entry.id === "legacy-agentturn-job");
|
||||
expect(persistedJob).toBeDefined();
|
||||
expect(persistedJob?.state).toEqual(expect.any(Object));
|
||||
expect(persistedJob?.model).toBeUndefined();
|
||||
expect(persistedJob?.thinking).toBeUndefined();
|
||||
expect(persistedJob?.timeoutSeconds).toBeUndefined();
|
||||
expect(persistedJob?.deliver).toBeUndefined();
|
||||
expect(persistedJob?.channel).toBeUndefined();
|
||||
expect(persistedJob?.to).toBeUndefined();
|
||||
expect(persistedJob?.bestEffortDeliver).toBeUndefined();
|
||||
|
||||
await stopCronAndCleanup(cron, store);
|
||||
});
|
||||
|
||||
it("preserves legacy timeoutSeconds=0 during top-level agentTurn field migration", async () => {
|
||||
const { store, cron } = await startCronWithStoredJobs([
|
||||
createLegacyIsolatedAgentTurnJob({
|
||||
id: "legacy-agentturn-no-timeout",
|
||||
name: "legacy no-timeout",
|
||||
timeoutSeconds: 0,
|
||||
}),
|
||||
]);
|
||||
|
||||
const job = await listJobById(cron, "legacy-agentturn-no-timeout");
|
||||
expect(job).toBeDefined();
|
||||
expect(job?.payload.kind).toBe("agentTurn");
|
||||
if (job?.payload.kind === "agentTurn") {
|
||||
expect(job.payload.timeoutSeconds).toBe(0);
|
||||
}
|
||||
|
||||
await stopCronAndCleanup(cron, store);
|
||||
});
|
||||
|
||||
it("migrates legacy cron fields (jobId + schedule.cron) and defaults wakeMode", async () => {
|
||||
const { store, cron } = await startCronWithStoredJobs([
|
||||
{
|
||||
jobId: "legacy-cron-field-job",
|
||||
name: "legacy cron field",
|
||||
enabled: true,
|
||||
createdAtMs: Date.parse("2026-02-01T12:00:00.000Z"),
|
||||
updatedAtMs: Date.parse("2026-02-05T12:00:00.000Z"),
|
||||
schedule: { kind: "cron", cron: "*/5 * * * *", tz: "UTC" },
|
||||
payload: { kind: "systemEvent", text: "tick" },
|
||||
state: {},
|
||||
},
|
||||
]);
|
||||
const job = await listJobById(cron, "legacy-cron-field-job");
|
||||
expect(job).toBeDefined();
|
||||
expect(job?.wakeMode).toBe("now");
|
||||
expect(job?.schedule.kind).toBe("cron");
|
||||
if (job?.schedule.kind === "cron") {
|
||||
expect(job.schedule.expr).toBe("*/5 * * * *");
|
||||
}
|
||||
|
||||
const persisted = JSON.parse(await fs.readFile(store.storePath, "utf-8")) as {
|
||||
jobs: Array<Record<string, unknown>>;
|
||||
};
|
||||
const persistedJob = persisted.jobs.find((entry) => entry.id === "legacy-cron-field-job");
|
||||
expect(persistedJob).toBeDefined();
|
||||
expect(persistedJob?.jobId).toBeUndefined();
|
||||
expect(persistedJob?.wakeMode).toBe("now");
|
||||
const persistedSchedule =
|
||||
persistedJob?.schedule && typeof persistedJob.schedule === "object"
|
||||
? (persistedJob.schedule as Record<string, unknown>)
|
||||
: null;
|
||||
expect(persistedSchedule?.cron).toBeUndefined();
|
||||
expect(persistedSchedule?.expr).toBe("*/5 * * * *");
|
||||
|
||||
await stopCronAndCleanup(cron, store);
|
||||
});
|
||||
});
|
||||
|
|
@ -10,7 +10,7 @@ const { logger, makeStorePath } = setupCronServiceSuite({
|
|||
});
|
||||
|
||||
describe("cron service store seam coverage", () => {
|
||||
it("loads, normalizes legacy stored jobs, recomputes next runs, and persists the migrated shape", async () => {
|
||||
it("loads stored jobs, recomputes next runs, and does not rewrite the store on load", async () => {
|
||||
const { storePath } = await makeStorePath();
|
||||
const now = Date.parse("2026-03-23T12:00:00.000Z");
|
||||
|
||||
|
|
@ -22,18 +22,16 @@ describe("cron service store seam coverage", () => {
|
|||
version: 1,
|
||||
jobs: [
|
||||
{
|
||||
id: "legacy-current-job",
|
||||
name: "legacy current job",
|
||||
id: "modern-job",
|
||||
name: "modern job",
|
||||
enabled: true,
|
||||
createdAtMs: now - 60_000,
|
||||
updatedAtMs: now - 60_000,
|
||||
schedule: { kind: "every", everyMs: 60_000 },
|
||||
sessionTarget: "current",
|
||||
wakeMode: "next-heartbeat",
|
||||
message: "legacy message-only payload",
|
||||
provider: "demo-channel",
|
||||
to: "123",
|
||||
deliver: true,
|
||||
sessionTarget: "isolated",
|
||||
wakeMode: "now",
|
||||
payload: { kind: "agentTurn", message: "ping" },
|
||||
delivery: { mode: "announce", channel: "telegram", to: "123" },
|
||||
state: {},
|
||||
},
|
||||
],
|
||||
|
|
@ -61,14 +59,11 @@ describe("cron service store seam coverage", () => {
|
|||
expect(job?.sessionTarget).toBe("isolated");
|
||||
expect(job?.payload.kind).toBe("agentTurn");
|
||||
if (job?.payload.kind === "agentTurn") {
|
||||
expect(job.payload.message).toBe("legacy message-only payload");
|
||||
expect(job.payload.channel).toBeUndefined();
|
||||
expect(job.payload.to).toBeUndefined();
|
||||
expect(job.payload.deliver).toBeUndefined();
|
||||
expect(job.payload.message).toBe("ping");
|
||||
}
|
||||
expect(job?.delivery).toMatchObject({
|
||||
mode: "announce",
|
||||
channel: "demo-channel",
|
||||
channel: "telegram",
|
||||
to: "123",
|
||||
});
|
||||
expect(job?.state.nextRunAtMs).toBe(now);
|
||||
|
|
@ -77,17 +72,13 @@ describe("cron service store seam coverage", () => {
|
|||
jobs: Array<Record<string, unknown>>;
|
||||
};
|
||||
const persistedJob = persisted.jobs[0];
|
||||
expect(persistedJob?.message).toBeUndefined();
|
||||
expect(persistedJob?.provider).toBeUndefined();
|
||||
expect(persistedJob?.to).toBeUndefined();
|
||||
expect(persistedJob?.deliver).toBeUndefined();
|
||||
expect(persistedJob?.payload).toMatchObject({
|
||||
kind: "agentTurn",
|
||||
message: "legacy message-only payload",
|
||||
message: "ping",
|
||||
});
|
||||
expect(persistedJob?.delivery).toMatchObject({
|
||||
mode: "announce",
|
||||
channel: "demo-channel",
|
||||
channel: "telegram",
|
||||
to: "123",
|
||||
});
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,4 @@
|
|||
import fs from "node:fs";
|
||||
import { normalizeStoredCronJobs } from "../store-migration.js";
|
||||
import { loadCronStore, saveCronStore } from "../store.js";
|
||||
import type { CronJob } from "../types.js";
|
||||
import { recomputeNextRuns } from "./jobs.js";
|
||||
|
|
@ -33,19 +32,16 @@ export async function ensureLoaded(
|
|||
|
||||
const fileMtimeMs = await getFileMtimeMs(state.deps.storePath);
|
||||
const loaded = await loadCronStore(state.deps.storePath);
|
||||
const jobs = (loaded.jobs ?? []) as unknown as Array<Record<string, unknown>>;
|
||||
const { mutated } = normalizeStoredCronJobs(jobs);
|
||||
state.store = { version: 1, jobs: jobs as unknown as CronJob[] };
|
||||
state.store = {
|
||||
version: 1,
|
||||
jobs: (loaded.jobs ?? []) as unknown as CronJob[],
|
||||
};
|
||||
state.storeLoadedAtMs = state.deps.nowMs();
|
||||
state.storeFileMtimeMs = fileMtimeMs;
|
||||
|
||||
if (!opts?.skipRecompute) {
|
||||
recomputeNextRuns(state);
|
||||
}
|
||||
|
||||
if (mutated) {
|
||||
await persist(state, { skipBackup: true });
|
||||
}
|
||||
}
|
||||
|
||||
export function warnIfDisabled(state: CronServiceState, action: string) {
|
||||
|
|
|
|||
|
|
@ -12,6 +12,7 @@ describe("normalizeStoredCronJobs", () => {
|
|||
deliver: true,
|
||||
provider: " TeLeGrAm ",
|
||||
to: "12345",
|
||||
threadId: " 77 ",
|
||||
},
|
||||
] as Array<Record<string, unknown>>;
|
||||
|
||||
|
|
@ -39,6 +40,7 @@ describe("normalizeStoredCronJobs", () => {
|
|||
mode: "announce",
|
||||
channel: "telegram",
|
||||
to: "12345",
|
||||
threadId: "77",
|
||||
});
|
||||
expect(job?.payload).toMatchObject({
|
||||
kind: "agentTurn",
|
||||
|
|
|
|||
|
|
@ -118,6 +118,14 @@ function copyTopLevelAgentTurnFields(
|
|||
payload.to = raw.to.trim();
|
||||
mutated = true;
|
||||
}
|
||||
if (
|
||||
!("threadId" in payload) &&
|
||||
((typeof raw.threadId === "number" && Number.isFinite(raw.threadId)) ||
|
||||
(typeof raw.threadId === "string" && raw.threadId.trim()))
|
||||
) {
|
||||
payload.threadId = typeof raw.threadId === "string" ? raw.threadId.trim() : raw.threadId;
|
||||
mutated = true;
|
||||
}
|
||||
if (
|
||||
typeof payload.bestEffortDeliver !== "boolean" &&
|
||||
typeof raw.bestEffortDeliver === "boolean"
|
||||
|
|
@ -165,6 +173,9 @@ function stripLegacyTopLevelFields(raw: Record<string, unknown>) {
|
|||
if ("to" in raw) {
|
||||
delete raw.to;
|
||||
}
|
||||
if ("threadId" in raw) {
|
||||
delete raw.threadId;
|
||||
}
|
||||
if ("bestEffortDeliver" in raw) {
|
||||
delete raw.bestEffortDeliver;
|
||||
}
|
||||
|
|
@ -319,6 +330,7 @@ export function normalizeStoredCronJobs(
|
|||
"deliver" in raw ||
|
||||
"channel" in raw ||
|
||||
"to" in raw ||
|
||||
"threadId" in raw ||
|
||||
"bestEffortDeliver" in raw ||
|
||||
"provider" in raw;
|
||||
if (hadLegacyTopLevelPayloadFields || hadLegacyTopLevelDeliveryFields) {
|
||||
|
|
|
|||
|
|
@ -12,10 +12,6 @@ function cronAgentTurnPayloadSchema(params: { message: TSchema }) {
|
|||
timeoutSeconds: Type.Optional(Type.Integer({ minimum: 0 })),
|
||||
allowUnsafeExternalContent: Type.Optional(Type.Boolean()),
|
||||
lightContext: Type.Optional(Type.Boolean()),
|
||||
deliver: Type.Optional(Type.Boolean()),
|
||||
channel: Type.Optional(Type.String()),
|
||||
to: Type.Optional(Type.String()),
|
||||
bestEffortDeliver: Type.Optional(Type.Boolean()),
|
||||
},
|
||||
{ additionalProperties: false },
|
||||
);
|
||||
|
|
|
|||
Loading…
Reference in New Issue