refactor(outbound): split recovery counters and normalize legacy retry entries

This commit is contained in:
Peter Steinberger 2026-02-26 22:42:11 +01:00
parent 5dd264d2fb
commit 10c7ae1eca
2 changed files with 181 additions and 31 deletions

View File

@ -51,6 +51,13 @@ export interface QueuedDelivery extends QueuedDeliveryPayload {
lastError?: string;
}
export type RecoverySummary = {
recovered: number;
failed: number;
skippedMaxRetries: number;
deferredBackoff: number;
};
function resolveQueueDir(stateDir?: string): string {
const base = stateDir ?? resolveStateDir();
return path.join(base, QUEUE_DIRNAME);
@ -161,7 +168,17 @@ export async function loadPendingDeliveries(stateDir?: string): Promise<QueuedDe
continue;
}
const raw = await fs.promises.readFile(filePath, "utf-8");
entries.push(JSON.parse(raw));
const parsed = JSON.parse(raw) as QueuedDelivery;
const { entry, migrated } = normalizeLegacyQueuedDeliveryEntry(parsed);
if (migrated) {
const tmp = `${filePath}.${process.pid}.tmp`;
await fs.promises.writeFile(tmp, JSON.stringify(entry, null, 2), {
encoding: "utf-8",
mode: 0o600,
});
await fs.promises.rename(tmp, filePath);
}
entries.push(entry);
} catch {
// Skip malformed or inaccessible entries.
}
@ -187,6 +204,59 @@ export function computeBackoffMs(retryCount: number): number {
return BACKOFF_MS[Math.min(retryCount - 1, BACKOFF_MS.length - 1)] ?? BACKOFF_MS.at(-1) ?? 0;
}
export function isEntryEligibleForRecoveryRetry(
entry: QueuedDelivery,
now: number,
): { eligible: true } | { eligible: false; remainingBackoffMs: number } {
const backoff = computeBackoffMs(entry.retryCount + 1);
if (backoff <= 0) {
return { eligible: true };
}
const firstReplayAfterCrash = entry.retryCount === 0 && entry.lastAttemptAt === undefined;
if (firstReplayAfterCrash) {
return { eligible: true };
}
const hasAttemptTimestamp =
typeof entry.lastAttemptAt === "number" &&
Number.isFinite(entry.lastAttemptAt) &&
entry.lastAttemptAt > 0;
const baseAttemptAt = hasAttemptTimestamp
? (entry.lastAttemptAt ?? entry.enqueuedAt)
: entry.enqueuedAt;
const nextEligibleAt = baseAttemptAt + backoff;
if (now >= nextEligibleAt) {
return { eligible: true };
}
return { eligible: false, remainingBackoffMs: nextEligibleAt - now };
}
function normalizeLegacyQueuedDeliveryEntry(entry: QueuedDelivery): {
entry: QueuedDelivery;
migrated: boolean;
} {
const hasAttemptTimestamp =
typeof entry.lastAttemptAt === "number" &&
Number.isFinite(entry.lastAttemptAt) &&
entry.lastAttemptAt > 0;
if (hasAttemptTimestamp || entry.retryCount <= 0) {
return { entry, migrated: false };
}
const hasEnqueuedTimestamp =
typeof entry.enqueuedAt === "number" &&
Number.isFinite(entry.enqueuedAt) &&
entry.enqueuedAt > 0;
if (!hasEnqueuedTimestamp) {
return { entry, migrated: false };
}
return {
entry: {
...entry,
lastAttemptAt: entry.enqueuedAt,
},
migrated: true,
};
}
export type DeliverFn = (
params: {
cfg: OpenClawConfig;
@ -212,10 +282,10 @@ export async function recoverPendingDeliveries(opts: {
stateDir?: string;
/** Maximum wall-clock time for recovery in ms. Remaining entries are deferred to next restart. Default: 60 000. */
maxRecoveryMs?: number;
}): Promise<{ recovered: number; failed: number; skipped: number }> {
}): Promise<RecoverySummary> {
const pending = await loadPendingDeliveries(opts.stateDir);
if (pending.length === 0) {
return { recovered: 0, failed: 0, skipped: 0 };
return { recovered: 0, failed: 0, skippedMaxRetries: 0, deferredBackoff: 0 };
}
// Process oldest first.
@ -227,13 +297,13 @@ export async function recoverPendingDeliveries(opts: {
let recovered = 0;
let failed = 0;
let skipped = 0;
let deferred = 0;
let skippedMaxRetries = 0;
let deferredBackoff = 0;
for (const entry of pending) {
const now = Date.now();
if (now >= deadline) {
const deferred = pending.length - recovered - failed - skipped;
const deferred = pending.length - recovered - failed - skippedMaxRetries - deferredBackoff;
opts.log.warn(`Recovery time budget exceeded — ${deferred} entries deferred to next restart`);
break;
}
@ -246,24 +316,17 @@ export async function recoverPendingDeliveries(opts: {
} catch (err) {
opts.log.error(`Failed to move entry ${entry.id} to failed/: ${String(err)}`);
}
skipped += 1;
skippedMaxRetries += 1;
continue;
}
const backoff = computeBackoffMs(entry.retryCount + 1);
if (backoff > 0) {
const firstReplayAfterCrash = entry.retryCount === 0 && entry.lastAttemptAt === undefined;
if (!firstReplayAfterCrash) {
const baseAttemptAt = entry.lastAttemptAt ?? entry.enqueuedAt;
const nextEligibleAt = baseAttemptAt + backoff;
if (now < nextEligibleAt) {
deferred += 1;
opts.log.info(
`Delivery ${entry.id} not ready for retry yet — backoff ${nextEligibleAt - now}ms remaining`,
);
continue;
}
}
const retryEligibility = isEntryEligibleForRecoveryRetry(entry, now);
if (!retryEligibility.eligible) {
deferredBackoff += 1;
opts.log.info(
`Delivery ${entry.id} not ready for retry yet — backoff ${retryEligibility.remainingBackoffMs}ms remaining`,
);
continue;
}
try {
@ -307,9 +370,9 @@ export async function recoverPendingDeliveries(opts: {
}
opts.log.info(
`Delivery recovery complete: ${recovered} recovered, ${failed} failed, ${skipped} skipped (max retries), ${deferred} deferred (backoff)`,
`Delivery recovery complete: ${recovered} recovered, ${failed} failed, ${skippedMaxRetries} skipped (max retries), ${deferredBackoff} deferred (backoff)`,
);
return { recovered, failed, skipped };
return { recovered, failed, skippedMaxRetries, deferredBackoff };
}
export { MAX_RETRIES };

View File

@ -11,6 +11,7 @@ import {
type DeliverFn,
enqueueDelivery,
failDelivery,
isEntryEligibleForRecoveryRetry,
isPermanentDeliveryError,
loadPendingDeliveries,
MAX_RETRIES,
@ -183,6 +184,25 @@ describe("delivery-queue", () => {
const entries = await loadPendingDeliveries(tmpDir);
expect(entries).toHaveLength(2);
});
it("backfills lastAttemptAt for legacy retry entries during load", async () => {
const id = await enqueueDelivery(
{ channel: "whatsapp", to: "+1", payloads: [{ text: "legacy" }] },
tmpDir,
);
const filePath = path.join(tmpDir, "delivery-queue", `${id}.json`);
const legacyEntry = JSON.parse(fs.readFileSync(filePath, "utf-8"));
legacyEntry.retryCount = 2;
delete legacyEntry.lastAttemptAt;
fs.writeFileSync(filePath, JSON.stringify(legacyEntry), "utf-8");
const entries = await loadPendingDeliveries(tmpDir);
expect(entries).toHaveLength(1);
expect(entries[0]?.lastAttemptAt).toBe(entries[0]?.enqueuedAt);
const persisted = JSON.parse(fs.readFileSync(filePath, "utf-8"));
expect(persisted.lastAttemptAt).toBe(persisted.enqueuedAt);
});
});
describe("computeBackoffMs", () => {
@ -205,6 +225,45 @@ describe("delivery-queue", () => {
});
});
describe("isEntryEligibleForRecoveryRetry", () => {
it("allows first replay after crash for retryCount=0 without lastAttemptAt", () => {
const now = Date.now();
const result = isEntryEligibleForRecoveryRetry(
{
id: "entry-1",
channel: "whatsapp",
to: "+1",
payloads: [{ text: "a" }],
enqueuedAt: now,
retryCount: 0,
},
now,
);
expect(result).toEqual({ eligible: true });
});
it("defers retry entries until backoff window elapses", () => {
const now = Date.now();
const result = isEntryEligibleForRecoveryRetry(
{
id: "entry-2",
channel: "whatsapp",
to: "+1",
payloads: [{ text: "a" }],
enqueuedAt: now - 30_000,
retryCount: 3,
lastAttemptAt: now,
},
now,
);
expect(result.eligible).toBe(false);
if (result.eligible) {
throw new Error("Expected ineligible retry entry");
}
expect(result.remainingBackoffMs).toBeGreaterThan(0);
});
});
describe("recoverPendingDeliveries", () => {
const baseCfg = {};
const createLog = () => ({ info: vi.fn(), warn: vi.fn(), error: vi.fn() });
@ -257,7 +316,8 @@ describe("delivery-queue", () => {
expect(deliver).toHaveBeenCalledTimes(2);
expect(result.recovered).toBe(2);
expect(result.failed).toBe(0);
expect(result.skipped).toBe(0);
expect(result.skippedMaxRetries).toBe(0);
expect(result.deferredBackoff).toBe(0);
// Queue should be empty after recovery.
const remaining = await loadPendingDeliveries(tmpDir);
@ -276,7 +336,8 @@ describe("delivery-queue", () => {
const { result } = await runRecovery({ deliver });
expect(deliver).not.toHaveBeenCalled();
expect(result.skipped).toBe(1);
expect(result.skippedMaxRetries).toBe(1);
expect(result.deferredBackoff).toBe(0);
// Entry should be in failed/ directory.
const failedDir = path.join(tmpDir, "delivery-queue", "failed");
@ -376,7 +437,8 @@ describe("delivery-queue", () => {
expect(deliver).not.toHaveBeenCalled();
expect(result.recovered).toBe(0);
expect(result.failed).toBe(0);
expect(result.skipped).toBe(0);
expect(result.skippedMaxRetries).toBe(0);
expect(result.deferredBackoff).toBe(0);
// All entries should still be in the queue.
const remaining = await loadPendingDeliveries(tmpDir);
@ -400,7 +462,12 @@ describe("delivery-queue", () => {
});
expect(deliver).not.toHaveBeenCalled();
expect(result).toEqual({ recovered: 0, failed: 0, skipped: 0 });
expect(result).toEqual({
recovered: 0,
failed: 0,
skippedMaxRetries: 0,
deferredBackoff: 1,
});
const remaining = await loadPendingDeliveries(tmpDir);
expect(remaining).toHaveLength(1);
@ -425,7 +492,12 @@ describe("delivery-queue", () => {
const deliver = vi.fn().mockResolvedValue([]);
const { result } = await runRecovery({ deliver, maxRecoveryMs: 60_000 });
expect(result).toEqual({ recovered: 1, failed: 0, skipped: 0 });
expect(result).toEqual({
recovered: 1,
failed: 0,
skippedMaxRetries: 0,
deferredBackoff: 1,
});
expect(deliver).toHaveBeenCalledTimes(1);
expect(deliver).toHaveBeenCalledWith(
expect.objectContaining({ channel: "telegram", to: "2", skipQueue: true }),
@ -449,13 +521,23 @@ describe("delivery-queue", () => {
const firstDeliver = vi.fn().mockResolvedValue([]);
const firstRun = await runRecovery({ deliver: firstDeliver, maxRecoveryMs: 60_000 });
expect(firstRun.result).toEqual({ recovered: 0, failed: 0, skipped: 0 });
expect(firstRun.result).toEqual({
recovered: 0,
failed: 0,
skippedMaxRetries: 0,
deferredBackoff: 1,
});
expect(firstDeliver).not.toHaveBeenCalled();
vi.setSystemTime(new Date(start.getTime() + 600_000 + 1));
const secondDeliver = vi.fn().mockResolvedValue([]);
const secondRun = await runRecovery({ deliver: secondDeliver, maxRecoveryMs: 60_000 });
expect(secondRun.result).toEqual({ recovered: 1, failed: 0, skipped: 0 });
expect(secondRun.result).toEqual({
recovered: 1,
failed: 0,
skippedMaxRetries: 0,
deferredBackoff: 0,
});
expect(secondDeliver).toHaveBeenCalledTimes(1);
const remaining = await loadPendingDeliveries(tmpDir);
@ -468,7 +550,12 @@ describe("delivery-queue", () => {
const deliver = vi.fn();
const { result } = await runRecovery({ deliver });
expect(result).toEqual({ recovered: 0, failed: 0, skipped: 0 });
expect(result).toEqual({
recovered: 0,
failed: 0,
skippedMaxRetries: 0,
deferredBackoff: 0,
});
expect(deliver).not.toHaveBeenCalled();
});
});