diff --git a/src/infra/outbound/delivery-queue.ts b/src/infra/outbound/delivery-queue.ts index 4928b0ff62f..1e954ea8e39 100644 --- a/src/infra/outbound/delivery-queue.ts +++ b/src/infra/outbound/delivery-queue.ts @@ -51,6 +51,13 @@ export interface QueuedDelivery extends QueuedDeliveryPayload { lastError?: string; } +export type RecoverySummary = { + recovered: number; + failed: number; + skippedMaxRetries: number; + deferredBackoff: number; +}; + function resolveQueueDir(stateDir?: string): string { const base = stateDir ?? resolveStateDir(); return path.join(base, QUEUE_DIRNAME); @@ -161,7 +168,17 @@ export async function loadPendingDeliveries(stateDir?: string): Promise 0; + const baseAttemptAt = hasAttemptTimestamp + ? (entry.lastAttemptAt ?? entry.enqueuedAt) + : entry.enqueuedAt; + const nextEligibleAt = baseAttemptAt + backoff; + if (now >= nextEligibleAt) { + return { eligible: true }; + } + return { eligible: false, remainingBackoffMs: nextEligibleAt - now }; +} + +function normalizeLegacyQueuedDeliveryEntry(entry: QueuedDelivery): { + entry: QueuedDelivery; + migrated: boolean; +} { + const hasAttemptTimestamp = + typeof entry.lastAttemptAt === "number" && + Number.isFinite(entry.lastAttemptAt) && + entry.lastAttemptAt > 0; + if (hasAttemptTimestamp || entry.retryCount <= 0) { + return { entry, migrated: false }; + } + const hasEnqueuedTimestamp = + typeof entry.enqueuedAt === "number" && + Number.isFinite(entry.enqueuedAt) && + entry.enqueuedAt > 0; + if (!hasEnqueuedTimestamp) { + return { entry, migrated: false }; + } + return { + entry: { + ...entry, + lastAttemptAt: entry.enqueuedAt, + }, + migrated: true, + }; +} + export type DeliverFn = ( params: { cfg: OpenClawConfig; @@ -212,10 +282,10 @@ export async function recoverPendingDeliveries(opts: { stateDir?: string; /** Maximum wall-clock time for recovery in ms. Remaining entries are deferred to next restart. Default: 60 000. */ maxRecoveryMs?: number; -}): Promise<{ recovered: number; failed: number; skipped: number }> { +}): Promise { const pending = await loadPendingDeliveries(opts.stateDir); if (pending.length === 0) { - return { recovered: 0, failed: 0, skipped: 0 }; + return { recovered: 0, failed: 0, skippedMaxRetries: 0, deferredBackoff: 0 }; } // Process oldest first. @@ -227,13 +297,13 @@ export async function recoverPendingDeliveries(opts: { let recovered = 0; let failed = 0; - let skipped = 0; - let deferred = 0; + let skippedMaxRetries = 0; + let deferredBackoff = 0; for (const entry of pending) { const now = Date.now(); if (now >= deadline) { - const deferred = pending.length - recovered - failed - skipped; + const deferred = pending.length - recovered - failed - skippedMaxRetries - deferredBackoff; opts.log.warn(`Recovery time budget exceeded — ${deferred} entries deferred to next restart`); break; } @@ -246,24 +316,17 @@ export async function recoverPendingDeliveries(opts: { } catch (err) { opts.log.error(`Failed to move entry ${entry.id} to failed/: ${String(err)}`); } - skipped += 1; + skippedMaxRetries += 1; continue; } - const backoff = computeBackoffMs(entry.retryCount + 1); - if (backoff > 0) { - const firstReplayAfterCrash = entry.retryCount === 0 && entry.lastAttemptAt === undefined; - if (!firstReplayAfterCrash) { - const baseAttemptAt = entry.lastAttemptAt ?? entry.enqueuedAt; - const nextEligibleAt = baseAttemptAt + backoff; - if (now < nextEligibleAt) { - deferred += 1; - opts.log.info( - `Delivery ${entry.id} not ready for retry yet — backoff ${nextEligibleAt - now}ms remaining`, - ); - continue; - } - } + const retryEligibility = isEntryEligibleForRecoveryRetry(entry, now); + if (!retryEligibility.eligible) { + deferredBackoff += 1; + opts.log.info( + `Delivery ${entry.id} not ready for retry yet — backoff ${retryEligibility.remainingBackoffMs}ms remaining`, + ); + continue; } try { @@ -307,9 +370,9 @@ export async function recoverPendingDeliveries(opts: { } opts.log.info( - `Delivery recovery complete: ${recovered} recovered, ${failed} failed, ${skipped} skipped (max retries), ${deferred} deferred (backoff)`, + `Delivery recovery complete: ${recovered} recovered, ${failed} failed, ${skippedMaxRetries} skipped (max retries), ${deferredBackoff} deferred (backoff)`, ); - return { recovered, failed, skipped }; + return { recovered, failed, skippedMaxRetries, deferredBackoff }; } export { MAX_RETRIES }; diff --git a/src/infra/outbound/outbound.test.ts b/src/infra/outbound/outbound.test.ts index bce1c246147..f15f3de3730 100644 --- a/src/infra/outbound/outbound.test.ts +++ b/src/infra/outbound/outbound.test.ts @@ -11,6 +11,7 @@ import { type DeliverFn, enqueueDelivery, failDelivery, + isEntryEligibleForRecoveryRetry, isPermanentDeliveryError, loadPendingDeliveries, MAX_RETRIES, @@ -183,6 +184,25 @@ describe("delivery-queue", () => { const entries = await loadPendingDeliveries(tmpDir); expect(entries).toHaveLength(2); }); + + it("backfills lastAttemptAt for legacy retry entries during load", async () => { + const id = await enqueueDelivery( + { channel: "whatsapp", to: "+1", payloads: [{ text: "legacy" }] }, + tmpDir, + ); + const filePath = path.join(tmpDir, "delivery-queue", `${id}.json`); + const legacyEntry = JSON.parse(fs.readFileSync(filePath, "utf-8")); + legacyEntry.retryCount = 2; + delete legacyEntry.lastAttemptAt; + fs.writeFileSync(filePath, JSON.stringify(legacyEntry), "utf-8"); + + const entries = await loadPendingDeliveries(tmpDir); + expect(entries).toHaveLength(1); + expect(entries[0]?.lastAttemptAt).toBe(entries[0]?.enqueuedAt); + + const persisted = JSON.parse(fs.readFileSync(filePath, "utf-8")); + expect(persisted.lastAttemptAt).toBe(persisted.enqueuedAt); + }); }); describe("computeBackoffMs", () => { @@ -205,6 +225,45 @@ describe("delivery-queue", () => { }); }); + describe("isEntryEligibleForRecoveryRetry", () => { + it("allows first replay after crash for retryCount=0 without lastAttemptAt", () => { + const now = Date.now(); + const result = isEntryEligibleForRecoveryRetry( + { + id: "entry-1", + channel: "whatsapp", + to: "+1", + payloads: [{ text: "a" }], + enqueuedAt: now, + retryCount: 0, + }, + now, + ); + expect(result).toEqual({ eligible: true }); + }); + + it("defers retry entries until backoff window elapses", () => { + const now = Date.now(); + const result = isEntryEligibleForRecoveryRetry( + { + id: "entry-2", + channel: "whatsapp", + to: "+1", + payloads: [{ text: "a" }], + enqueuedAt: now - 30_000, + retryCount: 3, + lastAttemptAt: now, + }, + now, + ); + expect(result.eligible).toBe(false); + if (result.eligible) { + throw new Error("Expected ineligible retry entry"); + } + expect(result.remainingBackoffMs).toBeGreaterThan(0); + }); + }); + describe("recoverPendingDeliveries", () => { const baseCfg = {}; const createLog = () => ({ info: vi.fn(), warn: vi.fn(), error: vi.fn() }); @@ -257,7 +316,8 @@ describe("delivery-queue", () => { expect(deliver).toHaveBeenCalledTimes(2); expect(result.recovered).toBe(2); expect(result.failed).toBe(0); - expect(result.skipped).toBe(0); + expect(result.skippedMaxRetries).toBe(0); + expect(result.deferredBackoff).toBe(0); // Queue should be empty after recovery. const remaining = await loadPendingDeliveries(tmpDir); @@ -276,7 +336,8 @@ describe("delivery-queue", () => { const { result } = await runRecovery({ deliver }); expect(deliver).not.toHaveBeenCalled(); - expect(result.skipped).toBe(1); + expect(result.skippedMaxRetries).toBe(1); + expect(result.deferredBackoff).toBe(0); // Entry should be in failed/ directory. const failedDir = path.join(tmpDir, "delivery-queue", "failed"); @@ -376,7 +437,8 @@ describe("delivery-queue", () => { expect(deliver).not.toHaveBeenCalled(); expect(result.recovered).toBe(0); expect(result.failed).toBe(0); - expect(result.skipped).toBe(0); + expect(result.skippedMaxRetries).toBe(0); + expect(result.deferredBackoff).toBe(0); // All entries should still be in the queue. const remaining = await loadPendingDeliveries(tmpDir); @@ -400,7 +462,12 @@ describe("delivery-queue", () => { }); expect(deliver).not.toHaveBeenCalled(); - expect(result).toEqual({ recovered: 0, failed: 0, skipped: 0 }); + expect(result).toEqual({ + recovered: 0, + failed: 0, + skippedMaxRetries: 0, + deferredBackoff: 1, + }); const remaining = await loadPendingDeliveries(tmpDir); expect(remaining).toHaveLength(1); @@ -425,7 +492,12 @@ describe("delivery-queue", () => { const deliver = vi.fn().mockResolvedValue([]); const { result } = await runRecovery({ deliver, maxRecoveryMs: 60_000 }); - expect(result).toEqual({ recovered: 1, failed: 0, skipped: 0 }); + expect(result).toEqual({ + recovered: 1, + failed: 0, + skippedMaxRetries: 0, + deferredBackoff: 1, + }); expect(deliver).toHaveBeenCalledTimes(1); expect(deliver).toHaveBeenCalledWith( expect.objectContaining({ channel: "telegram", to: "2", skipQueue: true }), @@ -449,13 +521,23 @@ describe("delivery-queue", () => { const firstDeliver = vi.fn().mockResolvedValue([]); const firstRun = await runRecovery({ deliver: firstDeliver, maxRecoveryMs: 60_000 }); - expect(firstRun.result).toEqual({ recovered: 0, failed: 0, skipped: 0 }); + expect(firstRun.result).toEqual({ + recovered: 0, + failed: 0, + skippedMaxRetries: 0, + deferredBackoff: 1, + }); expect(firstDeliver).not.toHaveBeenCalled(); vi.setSystemTime(new Date(start.getTime() + 600_000 + 1)); const secondDeliver = vi.fn().mockResolvedValue([]); const secondRun = await runRecovery({ deliver: secondDeliver, maxRecoveryMs: 60_000 }); - expect(secondRun.result).toEqual({ recovered: 1, failed: 0, skipped: 0 }); + expect(secondRun.result).toEqual({ + recovered: 1, + failed: 0, + skippedMaxRetries: 0, + deferredBackoff: 0, + }); expect(secondDeliver).toHaveBeenCalledTimes(1); const remaining = await loadPendingDeliveries(tmpDir); @@ -468,7 +550,12 @@ describe("delivery-queue", () => { const deliver = vi.fn(); const { result } = await runRecovery({ deliver }); - expect(result).toEqual({ recovered: 0, failed: 0, skipped: 0 }); + expect(result).toEqual({ + recovered: 0, + failed: 0, + skippedMaxRetries: 0, + deferredBackoff: 0, + }); expect(deliver).not.toHaveBeenCalled(); }); });