mirror of https://github.com/openclaw/openclaw.git
refactor: add doctor cron migration helpers
This commit is contained in:
parent
19d0c2dd1d
commit
2bc8a0d67c
|
|
@ -0,0 +1,66 @@
|
|||
import { describe, expect, it } from "vitest";
|
||||
import {
|
||||
buildDeliveryFromLegacyPayload,
|
||||
buildDeliveryPatchFromLegacyPayload,
|
||||
hasLegacyDeliveryHints,
|
||||
mergeLegacyDeliveryInto,
|
||||
normalizeLegacyDeliveryInput,
|
||||
} from "./doctor-cron-legacy-delivery.js";
|
||||
|
||||
describe("legacy delivery threadId support", () => {
|
||||
it("treats threadId as a legacy delivery hint", () => {
|
||||
expect(hasLegacyDeliveryHints({ threadId: "42" })).toBe(true);
|
||||
expect(hasLegacyDeliveryHints({ threadId: 42 })).toBe(true);
|
||||
});
|
||||
|
||||
it("hydrates threadId into new delivery payloads", () => {
|
||||
expect(
|
||||
buildDeliveryFromLegacyPayload({
|
||||
channel: "telegram",
|
||||
to: "-100123:topic:42",
|
||||
threadId: 42,
|
||||
}),
|
||||
).toEqual({
|
||||
mode: "announce",
|
||||
channel: "telegram",
|
||||
to: "-100123:topic:42",
|
||||
threadId: "42",
|
||||
});
|
||||
});
|
||||
|
||||
it("patches and merges threadId into existing deliveries", () => {
|
||||
expect(buildDeliveryPatchFromLegacyPayload({ threadId: "77" })).toEqual({
|
||||
mode: "announce",
|
||||
threadId: "77",
|
||||
});
|
||||
|
||||
expect(
|
||||
mergeLegacyDeliveryInto(
|
||||
{ mode: "announce", channel: "telegram", to: "-100123", threadId: "1" },
|
||||
{ threadId: 77 },
|
||||
),
|
||||
).toEqual({
|
||||
delivery: { mode: "announce", channel: "telegram", to: "-100123", threadId: "77" },
|
||||
mutated: true,
|
||||
});
|
||||
});
|
||||
|
||||
it("strips threadId from legacy payloads after normalization", () => {
|
||||
const payload: Record<string, unknown> = {
|
||||
channel: "telegram",
|
||||
to: "-100123:topic:42",
|
||||
threadId: 42,
|
||||
};
|
||||
|
||||
expect(normalizeLegacyDeliveryInput({ payload })).toEqual({
|
||||
delivery: {
|
||||
mode: "announce",
|
||||
channel: "telegram",
|
||||
to: "-100123:topic:42",
|
||||
threadId: "42",
|
||||
},
|
||||
mutated: true,
|
||||
});
|
||||
expect(payload.threadId).toBeUndefined();
|
||||
});
|
||||
});
|
||||
|
|
@ -0,0 +1,174 @@
|
|||
import { z } from "zod";
|
||||
import {
|
||||
DeliveryThreadIdFieldSchema,
|
||||
LowercaseNonEmptyStringFieldSchema,
|
||||
TrimmedNonEmptyStringFieldSchema,
|
||||
parseOptionalField,
|
||||
} from "../cron/delivery-field-schemas.js";
|
||||
|
||||
function parseLegacyDeliveryHintsInput(payload: Record<string, unknown>) {
|
||||
return {
|
||||
deliver: parseOptionalField(z.boolean(), payload.deliver),
|
||||
bestEffortDeliver: parseOptionalField(z.boolean(), payload.bestEffortDeliver),
|
||||
channel: parseOptionalField(LowercaseNonEmptyStringFieldSchema, payload.channel),
|
||||
provider: parseOptionalField(LowercaseNonEmptyStringFieldSchema, payload.provider),
|
||||
to: parseOptionalField(TrimmedNonEmptyStringFieldSchema, payload.to),
|
||||
threadId: parseOptionalField(
|
||||
DeliveryThreadIdFieldSchema.transform((value) => String(value)),
|
||||
payload.threadId,
|
||||
),
|
||||
};
|
||||
}
|
||||
|
||||
export function hasLegacyDeliveryHints(payload: Record<string, unknown>) {
|
||||
const hints = parseLegacyDeliveryHintsInput(payload);
|
||||
return (
|
||||
hints.deliver !== undefined ||
|
||||
hints.bestEffortDeliver !== undefined ||
|
||||
hints.channel !== undefined ||
|
||||
hints.provider !== undefined ||
|
||||
hints.to !== undefined ||
|
||||
hints.threadId !== undefined
|
||||
);
|
||||
}
|
||||
|
||||
export function buildDeliveryFromLegacyPayload(
|
||||
payload: Record<string, unknown>,
|
||||
): Record<string, unknown> {
|
||||
const hints = parseLegacyDeliveryHintsInput(payload);
|
||||
const mode = hints.deliver === false ? "none" : "announce";
|
||||
const next: Record<string, unknown> = { mode };
|
||||
if (hints.channel ?? hints.provider) {
|
||||
next.channel = hints.channel ?? hints.provider;
|
||||
}
|
||||
if (hints.to) {
|
||||
next.to = hints.to;
|
||||
}
|
||||
if (hints.threadId) {
|
||||
next.threadId = hints.threadId;
|
||||
}
|
||||
if (hints.bestEffortDeliver !== undefined) {
|
||||
next.bestEffort = hints.bestEffortDeliver;
|
||||
}
|
||||
return next;
|
||||
}
|
||||
|
||||
export function buildDeliveryPatchFromLegacyPayload(payload: Record<string, unknown>) {
|
||||
const hints = parseLegacyDeliveryHintsInput(payload);
|
||||
const next: Record<string, unknown> = {};
|
||||
let hasPatch = false;
|
||||
|
||||
if (hints.deliver === false) {
|
||||
next.mode = "none";
|
||||
hasPatch = true;
|
||||
} else if (
|
||||
hints.deliver === true ||
|
||||
hints.channel ||
|
||||
hints.provider ||
|
||||
hints.to ||
|
||||
hints.threadId ||
|
||||
hints.bestEffortDeliver !== undefined
|
||||
) {
|
||||
next.mode = "announce";
|
||||
hasPatch = true;
|
||||
}
|
||||
if (hints.channel ?? hints.provider) {
|
||||
next.channel = hints.channel ?? hints.provider;
|
||||
hasPatch = true;
|
||||
}
|
||||
if (hints.to) {
|
||||
next.to = hints.to;
|
||||
hasPatch = true;
|
||||
}
|
||||
if (hints.threadId) {
|
||||
next.threadId = hints.threadId;
|
||||
hasPatch = true;
|
||||
}
|
||||
if (hints.bestEffortDeliver !== undefined) {
|
||||
next.bestEffort = hints.bestEffortDeliver;
|
||||
hasPatch = true;
|
||||
}
|
||||
|
||||
return hasPatch ? next : null;
|
||||
}
|
||||
|
||||
export function mergeLegacyDeliveryInto(
|
||||
delivery: Record<string, unknown>,
|
||||
payload: Record<string, unknown>,
|
||||
) {
|
||||
const patch = buildDeliveryPatchFromLegacyPayload(payload);
|
||||
if (!patch) {
|
||||
return { delivery, mutated: false };
|
||||
}
|
||||
|
||||
const next = { ...delivery };
|
||||
let mutated = false;
|
||||
|
||||
if ("mode" in patch && patch.mode !== next.mode) {
|
||||
next.mode = patch.mode;
|
||||
mutated = true;
|
||||
}
|
||||
if ("channel" in patch && patch.channel !== next.channel) {
|
||||
next.channel = patch.channel;
|
||||
mutated = true;
|
||||
}
|
||||
if ("to" in patch && patch.to !== next.to) {
|
||||
next.to = patch.to;
|
||||
mutated = true;
|
||||
}
|
||||
if ("threadId" in patch && patch.threadId !== next.threadId) {
|
||||
next.threadId = patch.threadId;
|
||||
mutated = true;
|
||||
}
|
||||
if ("bestEffort" in patch && patch.bestEffort !== next.bestEffort) {
|
||||
next.bestEffort = patch.bestEffort;
|
||||
mutated = true;
|
||||
}
|
||||
|
||||
return { delivery: next, mutated };
|
||||
}
|
||||
|
||||
export function normalizeLegacyDeliveryInput(params: {
|
||||
delivery?: Record<string, unknown> | null;
|
||||
payload?: Record<string, unknown> | null;
|
||||
}) {
|
||||
if (!params.payload || !hasLegacyDeliveryHints(params.payload)) {
|
||||
return {
|
||||
delivery: params.delivery ?? undefined,
|
||||
mutated: false,
|
||||
};
|
||||
}
|
||||
|
||||
const nextDelivery = params.delivery
|
||||
? mergeLegacyDeliveryInto(params.delivery, params.payload)
|
||||
: {
|
||||
delivery: buildDeliveryFromLegacyPayload(params.payload),
|
||||
mutated: true,
|
||||
};
|
||||
stripLegacyDeliveryFields(params.payload);
|
||||
return {
|
||||
delivery: nextDelivery.delivery,
|
||||
mutated: true,
|
||||
};
|
||||
}
|
||||
|
||||
export function stripLegacyDeliveryFields(payload: Record<string, unknown>) {
|
||||
if ("deliver" in payload) {
|
||||
delete payload.deliver;
|
||||
}
|
||||
if ("channel" in payload) {
|
||||
delete payload.channel;
|
||||
}
|
||||
if ("provider" in payload) {
|
||||
delete payload.provider;
|
||||
}
|
||||
if ("to" in payload) {
|
||||
delete payload.to;
|
||||
}
|
||||
if ("threadId" in payload) {
|
||||
delete payload.threadId;
|
||||
}
|
||||
if ("bestEffortDeliver" in payload) {
|
||||
delete payload.bestEffortDeliver;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,40 @@
|
|||
type UnknownRecord = Record<string, unknown>;
|
||||
|
||||
function readString(value: unknown): string | undefined {
|
||||
if (typeof value !== "string") {
|
||||
return undefined;
|
||||
}
|
||||
return value;
|
||||
}
|
||||
|
||||
function normalizeChannel(value: string): string {
|
||||
return value.trim().toLowerCase();
|
||||
}
|
||||
|
||||
export function migrateLegacyCronPayload(payload: UnknownRecord): boolean {
|
||||
let mutated = false;
|
||||
|
||||
const channelValue = readString(payload.channel);
|
||||
const providerValue = readString(payload.provider);
|
||||
|
||||
const nextChannel =
|
||||
typeof channelValue === "string" && channelValue.trim().length > 0
|
||||
? normalizeChannel(channelValue)
|
||||
: typeof providerValue === "string" && providerValue.trim().length > 0
|
||||
? normalizeChannel(providerValue)
|
||||
: "";
|
||||
|
||||
if (nextChannel) {
|
||||
if (channelValue !== nextChannel) {
|
||||
payload.channel = nextChannel;
|
||||
mutated = true;
|
||||
}
|
||||
}
|
||||
|
||||
if ("provider" in payload) {
|
||||
delete payload.provider;
|
||||
mutated = true;
|
||||
}
|
||||
|
||||
return mutated;
|
||||
}
|
||||
|
|
@ -0,0 +1,135 @@
|
|||
import { describe, expect, it } from "vitest";
|
||||
import { normalizeStoredCronJobs } from "./doctor-cron-store-migration.js";
|
||||
|
||||
describe("normalizeStoredCronJobs", () => {
|
||||
it("normalizes legacy cron fields and reports migration issues", () => {
|
||||
const jobs = [
|
||||
{
|
||||
jobId: "legacy-job",
|
||||
schedule: { kind: "cron", cron: "*/5 * * * *", tz: "UTC" },
|
||||
message: "say hi",
|
||||
model: "openai/gpt-5.4",
|
||||
deliver: true,
|
||||
provider: " TeLeGrAm ",
|
||||
to: "12345",
|
||||
threadId: " 77 ",
|
||||
},
|
||||
] as Array<Record<string, unknown>>;
|
||||
|
||||
const result = normalizeStoredCronJobs(jobs);
|
||||
|
||||
expect(result.mutated).toBe(true);
|
||||
expect(result.issues).toMatchObject({
|
||||
jobId: 1,
|
||||
legacyScheduleCron: 1,
|
||||
legacyTopLevelPayloadFields: 1,
|
||||
legacyTopLevelDeliveryFields: 1,
|
||||
});
|
||||
|
||||
const [job] = jobs;
|
||||
expect(job?.jobId).toBeUndefined();
|
||||
expect(job?.id).toBe("legacy-job");
|
||||
expect(job?.schedule).toMatchObject({
|
||||
kind: "cron",
|
||||
expr: "*/5 * * * *",
|
||||
tz: "UTC",
|
||||
});
|
||||
expect(job?.message).toBeUndefined();
|
||||
expect(job?.provider).toBeUndefined();
|
||||
expect(job?.delivery).toMatchObject({
|
||||
mode: "announce",
|
||||
channel: "telegram",
|
||||
to: "12345",
|
||||
threadId: "77",
|
||||
});
|
||||
expect(job?.payload).toMatchObject({
|
||||
kind: "agentTurn",
|
||||
message: "say hi",
|
||||
model: "openai/gpt-5.4",
|
||||
});
|
||||
});
|
||||
|
||||
it("normalizes payload provider alias into channel", () => {
|
||||
const jobs = [
|
||||
{
|
||||
id: "legacy-provider",
|
||||
schedule: { kind: "every", everyMs: 60_000 },
|
||||
payload: {
|
||||
kind: "agentTurn",
|
||||
message: "ping",
|
||||
provider: " Slack ",
|
||||
},
|
||||
},
|
||||
] as Array<Record<string, unknown>>;
|
||||
|
||||
const result = normalizeStoredCronJobs(jobs);
|
||||
|
||||
expect(result.mutated).toBe(true);
|
||||
expect(result.issues.legacyPayloadProvider).toBe(1);
|
||||
expect(jobs[0]?.payload).toMatchObject({
|
||||
kind: "agentTurn",
|
||||
message: "ping",
|
||||
});
|
||||
const payload = jobs[0]?.payload as Record<string, unknown> | undefined;
|
||||
expect(payload?.provider).toBeUndefined();
|
||||
expect(jobs[0]?.delivery).toMatchObject({
|
||||
mode: "announce",
|
||||
channel: "slack",
|
||||
});
|
||||
});
|
||||
|
||||
it("does not report legacyPayloadKind for already-normalized payload kinds", () => {
|
||||
const jobs = [
|
||||
{
|
||||
id: "normalized-agent-turn",
|
||||
name: "normalized",
|
||||
enabled: true,
|
||||
wakeMode: "now",
|
||||
schedule: { kind: "every", everyMs: 60_000, anchorMs: 1 },
|
||||
payload: { kind: "agentTurn", message: "ping" },
|
||||
sessionTarget: "isolated",
|
||||
delivery: { mode: "announce" },
|
||||
state: {},
|
||||
},
|
||||
] as Array<Record<string, unknown>>;
|
||||
|
||||
const result = normalizeStoredCronJobs(jobs);
|
||||
|
||||
expect(result.mutated).toBe(false);
|
||||
expect(result.issues.legacyPayloadKind).toBeUndefined();
|
||||
});
|
||||
|
||||
it("normalizes whitespace-padded and non-canonical payload kinds", () => {
|
||||
const jobs = [
|
||||
{
|
||||
id: "spaced-agent-turn",
|
||||
name: "normalized",
|
||||
enabled: true,
|
||||
wakeMode: "now",
|
||||
schedule: { kind: "every", everyMs: 60_000, anchorMs: 1 },
|
||||
payload: { kind: " agentTurn ", message: "ping" },
|
||||
sessionTarget: "isolated",
|
||||
delivery: { mode: "announce" },
|
||||
state: {},
|
||||
},
|
||||
{
|
||||
id: "upper-system-event",
|
||||
name: "normalized",
|
||||
enabled: true,
|
||||
wakeMode: "now",
|
||||
schedule: { kind: "every", everyMs: 60_000, anchorMs: 1 },
|
||||
payload: { kind: "SYSTEMEVENT", text: "pong" },
|
||||
sessionTarget: "main",
|
||||
delivery: { mode: "announce" },
|
||||
state: {},
|
||||
},
|
||||
] as Array<Record<string, unknown>>;
|
||||
|
||||
const result = normalizeStoredCronJobs(jobs);
|
||||
|
||||
expect(result.mutated).toBe(true);
|
||||
expect(result.issues.legacyPayloadKind).toBe(2);
|
||||
expect(jobs[0]?.payload).toMatchObject({ kind: "agentTurn", message: "ping" });
|
||||
expect(jobs[1]?.payload).toMatchObject({ kind: "systemEvent", text: "pong" });
|
||||
});
|
||||
});
|
||||
|
|
@ -0,0 +1,526 @@
|
|||
import { parseAbsoluteTimeMs } from "../cron/parse.js";
|
||||
import { coerceFiniteScheduleNumber } from "../cron/schedule.js";
|
||||
import { inferLegacyName, normalizeOptionalText } from "../cron/service/normalize.js";
|
||||
import { normalizeCronStaggerMs, resolveDefaultCronStaggerMs } from "../cron/stagger.js";
|
||||
import { normalizeLegacyDeliveryInput } from "./doctor-cron-legacy-delivery.js";
|
||||
import { migrateLegacyCronPayload } from "./doctor-cron-payload-migration.js";
|
||||
|
||||
type CronStoreIssueKey =
|
||||
| "jobId"
|
||||
| "legacyScheduleString"
|
||||
| "legacyScheduleCron"
|
||||
| "legacyPayloadKind"
|
||||
| "legacyPayloadProvider"
|
||||
| "legacyTopLevelPayloadFields"
|
||||
| "legacyTopLevelDeliveryFields"
|
||||
| "legacyDeliveryMode";
|
||||
|
||||
type CronStoreIssues = Partial<Record<CronStoreIssueKey, number>>;
|
||||
|
||||
type NormalizeCronStoreJobsResult = {
|
||||
issues: CronStoreIssues;
|
||||
jobs: Array<Record<string, unknown>>;
|
||||
mutated: boolean;
|
||||
};
|
||||
|
||||
function incrementIssue(issues: CronStoreIssues, key: CronStoreIssueKey) {
|
||||
issues[key] = (issues[key] ?? 0) + 1;
|
||||
}
|
||||
|
||||
function normalizePayloadKind(payload: Record<string, unknown>) {
|
||||
const raw = typeof payload.kind === "string" ? payload.kind.trim().toLowerCase() : "";
|
||||
if (raw === "agentturn") {
|
||||
if (payload.kind !== "agentTurn") {
|
||||
payload.kind = "agentTurn";
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
if (raw === "systemevent") {
|
||||
if (payload.kind !== "systemEvent") {
|
||||
payload.kind = "systemEvent";
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
function inferPayloadIfMissing(raw: Record<string, unknown>) {
|
||||
const message = typeof raw.message === "string" ? raw.message.trim() : "";
|
||||
const text = typeof raw.text === "string" ? raw.text.trim() : "";
|
||||
const command = typeof raw.command === "string" ? raw.command.trim() : "";
|
||||
if (message) {
|
||||
raw.payload = { kind: "agentTurn", message };
|
||||
return true;
|
||||
}
|
||||
if (text) {
|
||||
raw.payload = { kind: "systemEvent", text };
|
||||
return true;
|
||||
}
|
||||
if (command) {
|
||||
raw.payload = { kind: "systemEvent", text: command };
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
function copyTopLevelAgentTurnFields(
|
||||
raw: Record<string, unknown>,
|
||||
payload: Record<string, unknown>,
|
||||
) {
|
||||
let mutated = false;
|
||||
|
||||
const copyTrimmedString = (field: "model" | "thinking") => {
|
||||
const existing = payload[field];
|
||||
if (typeof existing === "string" && existing.trim()) {
|
||||
return;
|
||||
}
|
||||
const value = raw[field];
|
||||
if (typeof value === "string" && value.trim()) {
|
||||
payload[field] = value.trim();
|
||||
mutated = true;
|
||||
}
|
||||
};
|
||||
copyTrimmedString("model");
|
||||
copyTrimmedString("thinking");
|
||||
|
||||
if (
|
||||
typeof payload.timeoutSeconds !== "number" &&
|
||||
typeof raw.timeoutSeconds === "number" &&
|
||||
Number.isFinite(raw.timeoutSeconds)
|
||||
) {
|
||||
payload.timeoutSeconds = Math.max(0, Math.floor(raw.timeoutSeconds));
|
||||
mutated = true;
|
||||
}
|
||||
|
||||
if (
|
||||
typeof payload.allowUnsafeExternalContent !== "boolean" &&
|
||||
typeof raw.allowUnsafeExternalContent === "boolean"
|
||||
) {
|
||||
payload.allowUnsafeExternalContent = raw.allowUnsafeExternalContent;
|
||||
mutated = true;
|
||||
}
|
||||
|
||||
if (typeof payload.deliver !== "boolean" && typeof raw.deliver === "boolean") {
|
||||
payload.deliver = raw.deliver;
|
||||
mutated = true;
|
||||
}
|
||||
if (
|
||||
typeof payload.channel !== "string" &&
|
||||
typeof raw.channel === "string" &&
|
||||
raw.channel.trim()
|
||||
) {
|
||||
payload.channel = raw.channel.trim();
|
||||
mutated = true;
|
||||
}
|
||||
if (typeof payload.to !== "string" && typeof raw.to === "string" && raw.to.trim()) {
|
||||
payload.to = raw.to.trim();
|
||||
mutated = true;
|
||||
}
|
||||
if (
|
||||
!("threadId" in payload) &&
|
||||
((typeof raw.threadId === "number" && Number.isFinite(raw.threadId)) ||
|
||||
(typeof raw.threadId === "string" && raw.threadId.trim()))
|
||||
) {
|
||||
payload.threadId = typeof raw.threadId === "string" ? raw.threadId.trim() : raw.threadId;
|
||||
mutated = true;
|
||||
}
|
||||
if (
|
||||
typeof payload.bestEffortDeliver !== "boolean" &&
|
||||
typeof raw.bestEffortDeliver === "boolean"
|
||||
) {
|
||||
payload.bestEffortDeliver = raw.bestEffortDeliver;
|
||||
mutated = true;
|
||||
}
|
||||
if (
|
||||
typeof payload.provider !== "string" &&
|
||||
typeof raw.provider === "string" &&
|
||||
raw.provider.trim()
|
||||
) {
|
||||
payload.provider = raw.provider.trim();
|
||||
mutated = true;
|
||||
}
|
||||
|
||||
return mutated;
|
||||
}
|
||||
|
||||
function stripLegacyTopLevelFields(raw: Record<string, unknown>) {
|
||||
if ("model" in raw) {
|
||||
delete raw.model;
|
||||
}
|
||||
if ("thinking" in raw) {
|
||||
delete raw.thinking;
|
||||
}
|
||||
if ("timeoutSeconds" in raw) {
|
||||
delete raw.timeoutSeconds;
|
||||
}
|
||||
if ("allowUnsafeExternalContent" in raw) {
|
||||
delete raw.allowUnsafeExternalContent;
|
||||
}
|
||||
if ("message" in raw) {
|
||||
delete raw.message;
|
||||
}
|
||||
if ("text" in raw) {
|
||||
delete raw.text;
|
||||
}
|
||||
if ("deliver" in raw) {
|
||||
delete raw.deliver;
|
||||
}
|
||||
if ("channel" in raw) {
|
||||
delete raw.channel;
|
||||
}
|
||||
if ("to" in raw) {
|
||||
delete raw.to;
|
||||
}
|
||||
if ("threadId" in raw) {
|
||||
delete raw.threadId;
|
||||
}
|
||||
if ("bestEffortDeliver" in raw) {
|
||||
delete raw.bestEffortDeliver;
|
||||
}
|
||||
if ("provider" in raw) {
|
||||
delete raw.provider;
|
||||
}
|
||||
if ("command" in raw) {
|
||||
delete raw.command;
|
||||
}
|
||||
if ("timeout" in raw) {
|
||||
delete raw.timeout;
|
||||
}
|
||||
}
|
||||
|
||||
export function normalizeStoredCronJobs(
|
||||
jobs: Array<Record<string, unknown>>,
|
||||
): NormalizeCronStoreJobsResult {
|
||||
const issues: CronStoreIssues = {};
|
||||
let mutated = false;
|
||||
|
||||
for (const raw of jobs) {
|
||||
const jobIssues = new Set<CronStoreIssueKey>();
|
||||
const trackIssue = (key: CronStoreIssueKey) => {
|
||||
if (jobIssues.has(key)) {
|
||||
return;
|
||||
}
|
||||
jobIssues.add(key);
|
||||
incrementIssue(issues, key);
|
||||
};
|
||||
|
||||
const state = raw.state;
|
||||
if (!state || typeof state !== "object" || Array.isArray(state)) {
|
||||
raw.state = {};
|
||||
mutated = true;
|
||||
}
|
||||
|
||||
const rawId = typeof raw.id === "string" ? raw.id.trim() : "";
|
||||
const legacyJobId = typeof raw.jobId === "string" ? raw.jobId.trim() : "";
|
||||
if (!rawId && legacyJobId) {
|
||||
raw.id = legacyJobId;
|
||||
mutated = true;
|
||||
trackIssue("jobId");
|
||||
} else if (rawId && raw.id !== rawId) {
|
||||
raw.id = rawId;
|
||||
mutated = true;
|
||||
}
|
||||
if ("jobId" in raw) {
|
||||
delete raw.jobId;
|
||||
mutated = true;
|
||||
trackIssue("jobId");
|
||||
}
|
||||
|
||||
if (typeof raw.schedule === "string") {
|
||||
const expr = raw.schedule.trim();
|
||||
raw.schedule = { kind: "cron", expr };
|
||||
mutated = true;
|
||||
trackIssue("legacyScheduleString");
|
||||
}
|
||||
|
||||
const nameRaw = raw.name;
|
||||
if (typeof nameRaw !== "string" || nameRaw.trim().length === 0) {
|
||||
raw.name = inferLegacyName({
|
||||
schedule: raw.schedule as never,
|
||||
payload: raw.payload as never,
|
||||
});
|
||||
mutated = true;
|
||||
} else {
|
||||
raw.name = nameRaw.trim();
|
||||
}
|
||||
|
||||
const desc = normalizeOptionalText(raw.description);
|
||||
if (raw.description !== desc) {
|
||||
raw.description = desc;
|
||||
mutated = true;
|
||||
}
|
||||
|
||||
if ("sessionKey" in raw) {
|
||||
const sessionKey =
|
||||
typeof raw.sessionKey === "string" ? normalizeOptionalText(raw.sessionKey) : undefined;
|
||||
if (raw.sessionKey !== sessionKey) {
|
||||
raw.sessionKey = sessionKey;
|
||||
mutated = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (typeof raw.enabled !== "boolean") {
|
||||
raw.enabled = true;
|
||||
mutated = true;
|
||||
}
|
||||
|
||||
const wakeModeRaw = typeof raw.wakeMode === "string" ? raw.wakeMode.trim().toLowerCase() : "";
|
||||
if (wakeModeRaw === "next-heartbeat") {
|
||||
if (raw.wakeMode !== "next-heartbeat") {
|
||||
raw.wakeMode = "next-heartbeat";
|
||||
mutated = true;
|
||||
}
|
||||
} else if (wakeModeRaw === "now") {
|
||||
if (raw.wakeMode !== "now") {
|
||||
raw.wakeMode = "now";
|
||||
mutated = true;
|
||||
}
|
||||
} else {
|
||||
raw.wakeMode = "now";
|
||||
mutated = true;
|
||||
}
|
||||
|
||||
const payload = raw.payload;
|
||||
if (
|
||||
(!payload || typeof payload !== "object" || Array.isArray(payload)) &&
|
||||
inferPayloadIfMissing(raw)
|
||||
) {
|
||||
mutated = true;
|
||||
trackIssue("legacyTopLevelPayloadFields");
|
||||
}
|
||||
|
||||
const payloadRecord =
|
||||
raw.payload && typeof raw.payload === "object" && !Array.isArray(raw.payload)
|
||||
? (raw.payload as Record<string, unknown>)
|
||||
: null;
|
||||
|
||||
if (payloadRecord) {
|
||||
if (normalizePayloadKind(payloadRecord)) {
|
||||
mutated = true;
|
||||
trackIssue("legacyPayloadKind");
|
||||
}
|
||||
if (!payloadRecord.kind) {
|
||||
if (typeof payloadRecord.message === "string" && payloadRecord.message.trim()) {
|
||||
payloadRecord.kind = "agentTurn";
|
||||
mutated = true;
|
||||
trackIssue("legacyPayloadKind");
|
||||
} else if (typeof payloadRecord.text === "string" && payloadRecord.text.trim()) {
|
||||
payloadRecord.kind = "systemEvent";
|
||||
mutated = true;
|
||||
trackIssue("legacyPayloadKind");
|
||||
}
|
||||
}
|
||||
if (payloadRecord.kind === "agentTurn" && copyTopLevelAgentTurnFields(raw, payloadRecord)) {
|
||||
mutated = true;
|
||||
}
|
||||
}
|
||||
|
||||
const hadLegacyTopLevelPayloadFields =
|
||||
"model" in raw ||
|
||||
"thinking" in raw ||
|
||||
"timeoutSeconds" in raw ||
|
||||
"allowUnsafeExternalContent" in raw ||
|
||||
"message" in raw ||
|
||||
"text" in raw ||
|
||||
"command" in raw ||
|
||||
"timeout" in raw;
|
||||
const hadLegacyTopLevelDeliveryFields =
|
||||
"deliver" in raw ||
|
||||
"channel" in raw ||
|
||||
"to" in raw ||
|
||||
"threadId" in raw ||
|
||||
"bestEffortDeliver" in raw ||
|
||||
"provider" in raw;
|
||||
if (hadLegacyTopLevelPayloadFields || hadLegacyTopLevelDeliveryFields) {
|
||||
stripLegacyTopLevelFields(raw);
|
||||
mutated = true;
|
||||
if (hadLegacyTopLevelPayloadFields) {
|
||||
trackIssue("legacyTopLevelPayloadFields");
|
||||
}
|
||||
if (hadLegacyTopLevelDeliveryFields) {
|
||||
trackIssue("legacyTopLevelDeliveryFields");
|
||||
}
|
||||
}
|
||||
|
||||
if (payloadRecord) {
|
||||
const hadLegacyPayloadProvider =
|
||||
typeof payloadRecord.provider === "string" && payloadRecord.provider.trim().length > 0;
|
||||
if (migrateLegacyCronPayload(payloadRecord)) {
|
||||
mutated = true;
|
||||
if (hadLegacyPayloadProvider) {
|
||||
trackIssue("legacyPayloadProvider");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const schedule = raw.schedule;
|
||||
if (schedule && typeof schedule === "object" && !Array.isArray(schedule)) {
|
||||
const sched = schedule as Record<string, unknown>;
|
||||
const kind = typeof sched.kind === "string" ? sched.kind.trim().toLowerCase() : "";
|
||||
if (!kind && ("at" in sched || "atMs" in sched)) {
|
||||
sched.kind = "at";
|
||||
mutated = true;
|
||||
}
|
||||
const atRaw = typeof sched.at === "string" ? sched.at.trim() : "";
|
||||
const atMsRaw = sched.atMs;
|
||||
const parsedAtMs =
|
||||
typeof atMsRaw === "number"
|
||||
? atMsRaw
|
||||
: typeof atMsRaw === "string"
|
||||
? parseAbsoluteTimeMs(atMsRaw)
|
||||
: atRaw
|
||||
? parseAbsoluteTimeMs(atRaw)
|
||||
: null;
|
||||
if (parsedAtMs !== null) {
|
||||
sched.at = new Date(parsedAtMs).toISOString();
|
||||
if ("atMs" in sched) {
|
||||
delete sched.atMs;
|
||||
}
|
||||
mutated = true;
|
||||
}
|
||||
|
||||
const everyMsRaw = sched.everyMs;
|
||||
const everyMsCoerced = coerceFiniteScheduleNumber(everyMsRaw);
|
||||
const everyMs = everyMsCoerced !== undefined ? Math.floor(everyMsCoerced) : null;
|
||||
if (everyMs !== null && everyMsRaw !== everyMs) {
|
||||
sched.everyMs = everyMs;
|
||||
mutated = true;
|
||||
}
|
||||
if ((kind === "every" || sched.kind === "every") && everyMs !== null) {
|
||||
const anchorRaw = sched.anchorMs;
|
||||
const anchorCoerced = coerceFiniteScheduleNumber(anchorRaw);
|
||||
const normalizedAnchor =
|
||||
anchorCoerced !== undefined
|
||||
? Math.max(0, Math.floor(anchorCoerced))
|
||||
: typeof raw.createdAtMs === "number" && Number.isFinite(raw.createdAtMs)
|
||||
? Math.max(0, Math.floor(raw.createdAtMs))
|
||||
: typeof raw.updatedAtMs === "number" && Number.isFinite(raw.updatedAtMs)
|
||||
? Math.max(0, Math.floor(raw.updatedAtMs))
|
||||
: null;
|
||||
if (normalizedAnchor !== null && anchorRaw !== normalizedAnchor) {
|
||||
sched.anchorMs = normalizedAnchor;
|
||||
mutated = true;
|
||||
}
|
||||
}
|
||||
|
||||
const exprRaw = typeof sched.expr === "string" ? sched.expr.trim() : "";
|
||||
const legacyCronRaw = typeof sched.cron === "string" ? sched.cron.trim() : "";
|
||||
let normalizedExpr = exprRaw;
|
||||
if (!normalizedExpr && legacyCronRaw) {
|
||||
normalizedExpr = legacyCronRaw;
|
||||
sched.expr = normalizedExpr;
|
||||
mutated = true;
|
||||
trackIssue("legacyScheduleCron");
|
||||
}
|
||||
if (typeof sched.expr === "string" && sched.expr !== normalizedExpr) {
|
||||
sched.expr = normalizedExpr;
|
||||
mutated = true;
|
||||
}
|
||||
if ("cron" in sched) {
|
||||
delete sched.cron;
|
||||
mutated = true;
|
||||
trackIssue("legacyScheduleCron");
|
||||
}
|
||||
if ((kind === "cron" || sched.kind === "cron") && normalizedExpr) {
|
||||
const explicitStaggerMs = normalizeCronStaggerMs(sched.staggerMs);
|
||||
const defaultStaggerMs = resolveDefaultCronStaggerMs(normalizedExpr);
|
||||
const targetStaggerMs = explicitStaggerMs ?? defaultStaggerMs;
|
||||
if (targetStaggerMs === undefined) {
|
||||
if ("staggerMs" in sched) {
|
||||
delete sched.staggerMs;
|
||||
mutated = true;
|
||||
}
|
||||
} else if (sched.staggerMs !== targetStaggerMs) {
|
||||
sched.staggerMs = targetStaggerMs;
|
||||
mutated = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const delivery = raw.delivery;
|
||||
if (delivery && typeof delivery === "object" && !Array.isArray(delivery)) {
|
||||
const modeRaw = (delivery as { mode?: unknown }).mode;
|
||||
if (typeof modeRaw === "string") {
|
||||
const lowered = modeRaw.trim().toLowerCase();
|
||||
if (lowered === "deliver") {
|
||||
(delivery as { mode?: unknown }).mode = "announce";
|
||||
mutated = true;
|
||||
trackIssue("legacyDeliveryMode");
|
||||
}
|
||||
} else if (modeRaw === undefined || modeRaw === null) {
|
||||
(delivery as { mode?: unknown }).mode = "announce";
|
||||
mutated = true;
|
||||
}
|
||||
}
|
||||
|
||||
const isolation = raw.isolation;
|
||||
if (isolation && typeof isolation === "object" && !Array.isArray(isolation)) {
|
||||
delete raw.isolation;
|
||||
mutated = true;
|
||||
}
|
||||
|
||||
const payloadKind =
|
||||
payloadRecord && typeof payloadRecord.kind === "string" ? payloadRecord.kind : "";
|
||||
const rawSessionTarget = typeof raw.sessionTarget === "string" ? raw.sessionTarget.trim() : "";
|
||||
const loweredSessionTarget = rawSessionTarget.toLowerCase();
|
||||
if (loweredSessionTarget === "main" || loweredSessionTarget === "isolated") {
|
||||
if (raw.sessionTarget !== loweredSessionTarget) {
|
||||
raw.sessionTarget = loweredSessionTarget;
|
||||
mutated = true;
|
||||
}
|
||||
} else if (loweredSessionTarget.startsWith("session:")) {
|
||||
const customSessionId = rawSessionTarget.slice(8).trim();
|
||||
if (customSessionId) {
|
||||
const normalizedSessionTarget = `session:${customSessionId}`;
|
||||
if (raw.sessionTarget !== normalizedSessionTarget) {
|
||||
raw.sessionTarget = normalizedSessionTarget;
|
||||
mutated = true;
|
||||
}
|
||||
}
|
||||
} else if (loweredSessionTarget === "current") {
|
||||
if (raw.sessionTarget !== "isolated") {
|
||||
raw.sessionTarget = "isolated";
|
||||
mutated = true;
|
||||
}
|
||||
} else {
|
||||
const inferredSessionTarget = payloadKind === "agentTurn" ? "isolated" : "main";
|
||||
if (raw.sessionTarget !== inferredSessionTarget) {
|
||||
raw.sessionTarget = inferredSessionTarget;
|
||||
mutated = true;
|
||||
}
|
||||
}
|
||||
|
||||
const sessionTarget =
|
||||
typeof raw.sessionTarget === "string" ? raw.sessionTarget.trim().toLowerCase() : "";
|
||||
const isIsolatedAgentTurn =
|
||||
sessionTarget === "isolated" ||
|
||||
sessionTarget === "current" ||
|
||||
sessionTarget.startsWith("session:") ||
|
||||
(sessionTarget === "" && payloadKind === "agentTurn");
|
||||
const hasDelivery = delivery && typeof delivery === "object" && !Array.isArray(delivery);
|
||||
const normalizedLegacy = normalizeLegacyDeliveryInput({
|
||||
delivery: hasDelivery ? (delivery as Record<string, unknown>) : null,
|
||||
payload: payloadRecord,
|
||||
});
|
||||
|
||||
if (isIsolatedAgentTurn && payloadKind === "agentTurn") {
|
||||
if (!hasDelivery && normalizedLegacy.delivery) {
|
||||
raw.delivery = normalizedLegacy.delivery;
|
||||
mutated = true;
|
||||
} else if (!hasDelivery) {
|
||||
raw.delivery = { mode: "announce" };
|
||||
mutated = true;
|
||||
} else if (normalizedLegacy.mutated && normalizedLegacy.delivery) {
|
||||
raw.delivery = normalizedLegacy.delivery;
|
||||
mutated = true;
|
||||
}
|
||||
} else if (normalizedLegacy.mutated && normalizedLegacy.delivery) {
|
||||
raw.delivery = normalizedLegacy.delivery;
|
||||
mutated = true;
|
||||
}
|
||||
}
|
||||
|
||||
return { issues, jobs, mutated };
|
||||
}
|
||||
Loading…
Reference in New Issue