refactor(outbound): split delivery queue storage and recovery

This commit is contained in:
Peter Steinberger 2026-03-23 01:56:23 +00:00
parent 103098513f
commit 562e4a1791
12 changed files with 1153 additions and 1668 deletions

View File

@ -1,9 +1,9 @@
#!/usr/bin/env node
import { spawnSync } from "node:child_process";
import fs from "node:fs";
import path from "node:path";
import { fileURLToPath } from "node:url";
import { computeBaseConfigSchemaResponse } from "../src/config/schema-base.js";
import { formatGeneratedModule } from "./lib/format-generated-module.mjs";
const GENERATED_BY = "scripts/generate-base-config-schema.ts";
const DEFAULT_OUTPUT_PATH = "src/config/schema.base.generated.ts";
@ -18,26 +18,11 @@ function readIfExists(filePath: string): string | null {
function formatTypeScriptModule(source: string, outputPath: string): string {
const repoRoot = path.resolve(path.dirname(fileURLToPath(import.meta.url)), "..");
const formatter = spawnSync(
process.platform === "win32" ? "pnpm" : "pnpm",
["exec", "oxfmt", "--stdin-filepath", outputPath],
{
cwd: repoRoot,
input: source,
encoding: "utf8",
// Windows requires a shell to launch package-manager shim scripts reliably.
...(process.platform === "win32" ? { shell: true } : {}),
},
);
if (formatter.status !== 0) {
const details =
formatter.stderr?.trim() ||
formatter.stdout?.trim() ||
formatter.error?.message ||
"unknown formatter failure";
throw new Error(`failed to format generated base config schema: ${details}`);
}
return formatter.stdout;
return formatGeneratedModule(source, {
repoRoot,
outputPath,
errorLabel: "base config schema",
});
}
export function renderBaseConfigSchemaModule(params?: { generatedAt?: string }): string {

View File

@ -1,14 +1,13 @@
import { spawnSync } from "node:child_process";
import fs from "node:fs";
import path from "node:path";
import { pathToFileURL } from "node:url";
import { formatGeneratedModule } from "./lib/format-generated-module.mjs";
import { writeTextFileIfChanged } from "./runtime-postbuild-shared.mjs";
const GENERATED_BY = "scripts/generate-bundled-plugin-metadata.mjs";
const DEFAULT_OUTPUT_PATH = "src/plugins/bundled-plugin-metadata.generated.ts";
const MANIFEST_KEY = "openclaw";
const FORMATTER_CWD = path.resolve(import.meta.dirname, "..");
const OXFMT_BIN = path.join(FORMATTER_CWD, "node_modules", ".bin", "oxfmt");
const CANONICAL_PACKAGE_ID_ALIASES = {
"elevenlabs-speech": "elevenlabs",
"microsoft-speech": "microsoft",
@ -128,28 +127,11 @@ function normalizePluginManifest(raw) {
}
function formatTypeScriptModule(source, { outputPath }) {
const formatterPath = path.relative(FORMATTER_CWD, outputPath) || outputPath;
const useDirectFormatter = process.platform !== "win32" && fs.existsSync(OXFMT_BIN);
const command = useDirectFormatter ? OXFMT_BIN : "pnpm";
const args = useDirectFormatter
? ["--stdin-filepath", formatterPath]
: ["exec", "oxfmt", "--stdin-filepath", formatterPath];
const formatter = spawnSync(command, args, {
cwd: FORMATTER_CWD,
input: source,
encoding: "utf8",
// Windows requires a shell to launch package-manager shim scripts reliably.
...(process.platform === "win32" ? { shell: true } : {}),
return formatGeneratedModule(source, {
repoRoot: FORMATTER_CWD,
outputPath,
errorLabel: "bundled plugin metadata",
});
if (formatter.status !== 0) {
const details =
formatter.stderr?.trim() ||
formatter.stdout?.trim() ||
formatter.error?.message ||
"unknown formatter failure";
throw new Error(`failed to format generated bundled plugin metadata: ${details}`);
}
return formatter.stdout;
}
export function collectBundledPluginMetadata(params = {}) {

View File

@ -0,0 +1,34 @@
import { spawnSync } from "node:child_process";
import fs from "node:fs";
import path from "node:path";
/**
 * Format generated TypeScript source with oxfmt before it is written to disk.
 *
 * @param {string} source - Unformatted module text (fed to the formatter via stdin).
 * @param {{ repoRoot: string, outputPath: string, errorLabel: string }} options
 *   repoRoot: repository root containing node_modules; outputPath: destination
 *   file (absolute or repo-relative) used for --stdin-filepath; errorLabel:
 *   human-readable name used in the thrown error message.
 * @returns {string} The formatted module text.
 * @throws {Error} When the formatter exits non-zero or fails to launch.
 */
export function formatGeneratedModule(source, { repoRoot, outputPath, errorLabel }) {
  const rootDir = path.resolve(repoRoot);
  // Normalize to a path relative to the repo root so oxfmt resolves its config
  // consistently regardless of how the caller spelled outputPath.
  const relativeOutput = path.isAbsolute(outputPath)
    ? path.relative(rootDir, outputPath)
    : outputPath;
  const absoluteOutput = path.resolve(rootDir, relativeOutput);
  const stdinFilepath = path.relative(rootDir, absoluteOutput) || absoluteOutput;
  const localOxfmt = path.join(rootDir, "node_modules", ".bin", "oxfmt");
  // Prefer the locally installed binary when it exists; on Windows always go
  // through pnpm since .bin shims there are scripts rather than executables.
  const runDirect = process.platform !== "win32" && fs.existsSync(localOxfmt);
  const result = spawnSync(
    runDirect ? localOxfmt : "pnpm",
    runDirect
      ? ["--stdin-filepath", stdinFilepath]
      : ["exec", "oxfmt", "--stdin-filepath", stdinFilepath],
    {
      cwd: rootDir,
      input: source,
      encoding: "utf8",
      // Windows requires a shell to launch package-manager shim scripts reliably.
      ...(process.platform === "win32" ? { shell: true } : {}),
    },
  );
  if (result.status !== 0) {
    const details =
      result.stderr?.trim() ||
      result.stdout?.trim() ||
      result.error?.message ||
      "unknown formatter failure";
    throw new Error(`failed to format generated ${errorLabel}: ${details}`);
  }
  return result.stdout;
}

View File

@ -0,0 +1,220 @@
import type { OpenClawConfig } from "../../config/config.js";
import {
ackDelivery,
failDelivery,
loadPendingDeliveries,
moveToFailed,
type QueuedDelivery,
type QueuedDeliveryPayload,
} from "./delivery-queue-storage.js";
/** Aggregate counters reported by a single recovery pass over the queue. */
export type RecoverySummary = {
  recovered: number; // entries delivered successfully and acked
  failed: number; // entries whose retry attempt threw (transient or permanent)
  skippedMaxRetries: number; // entries moved to failed/ for exceeding MAX_RETRIES
  deferredBackoff: number; // entries skipped because their backoff window has not elapsed
};
/**
 * Shape of the outbound delivery function recovery replays entries through.
 * `skipQueue` lets recovery bypass re-enqueueing (set by buildRecoveryDeliverParams).
 */
export type DeliverFn = (
  params: {
    cfg: OpenClawConfig;
  } & QueuedDeliveryPayload & {
      skipQueue?: boolean;
    },
) => Promise<unknown>;
/** Minimal logging surface recovery needs from its caller. */
export interface RecoveryLogger {
  info(msg: string): void;
  warn(msg: string): void;
  error(msg: string): void;
}
// Entries that have failed this many attempts are moved to failed/ instead of retried.
const MAX_RETRIES = 5;
/** Backoff delays in milliseconds indexed by retry count (1-based). */
const BACKOFF_MS: readonly number[] = [
  5_000, // retry 1: 5s
  25_000, // retry 2: 25s
  120_000, // retry 3: 2m
  600_000, // retry 4: 10m
];
// Error messages matching any of these patterns are treated as permanent:
// the entry is moved to failed/ immediately rather than retried (see
// recoverPendingDeliveries). Matching is case-insensitive substring search.
const PERMANENT_ERROR_PATTERNS: readonly RegExp[] = [
  /no conversation reference found/i,
  /chat not found/i,
  /user not found/i,
  /bot was blocked by the user/i,
  /forbidden: bot was kicked/i,
  /chat_id is empty/i,
  /recipient is not a valid/i,
  /outbound not configured for channel/i,
  /ambiguous discord recipient/i,
];
/** Build a fresh all-zero summary; recovery mutates the counters in place. */
function createEmptyRecoverySummary(): RecoverySummary {
  const summary: RecoverySummary = {
    recovered: 0,
    failed: 0,
    skippedMaxRetries: 0,
    deferredBackoff: 0,
  };
  return summary;
}
/** Translate a persisted queue entry into arguments for the deliver function. */
function buildRecoveryDeliverParams(entry: QueuedDelivery, cfg: OpenClawConfig) {
  const {
    channel,
    to,
    accountId,
    payloads,
    threadId,
    replyToId,
    bestEffort,
    gifPlayback,
    forceDocument,
    silent,
    mirror,
  } = entry;
  return {
    cfg,
    channel,
    to,
    accountId,
    payloads,
    threadId,
    replyToId,
    bestEffort,
    gifPlayback,
    forceDocument,
    silent,
    mirror,
    skipQueue: true, // Prevent re-enqueueing during recovery.
  } satisfies Parameters<DeliverFn>[0];
}
/** Move an entry to failed/, logging (not throwing) if the move itself fails. */
async function moveEntryToFailedWithLogging(
  entryId: string,
  log: RecoveryLogger,
  stateDir?: string,
): Promise<void> {
  await moveToFailed(entryId, stateDir).catch((err) => {
    log.error(`Failed to move entry ${entryId} to failed/: ${String(err)}`);
  });
}
/** Record a budget-exceeded failure on every entry we did not get to. */
async function deferRemainingEntriesForBudget(
  entries: readonly QueuedDelivery[],
  stateDir: string | undefined,
): Promise<void> {
  // Increment retryCount so entries that are repeatedly deferred by the
  // recovery budget eventually hit MAX_RETRIES and get pruned.
  const updates = entries.map((entry) =>
    failDelivery(entry.id, "recovery time budget exceeded", stateDir),
  );
  await Promise.allSettled(updates);
}
/** Compute the backoff delay in ms for a given retry count. */
export function computeBackoffMs(retryCount: number): number {
  if (retryCount <= 0) {
    return 0;
  }
  // Clamp to the last scheduled delay for retries beyond the table.
  const index = Math.min(retryCount - 1, BACKOFF_MS.length - 1);
  const lastScheduled = BACKOFF_MS.at(-1) ?? 0;
  return BACKOFF_MS[index] ?? lastScheduled;
}
/**
 * Decide whether a queued entry may be retried at time `now`, or how long
 * of its backoff window remains if not.
 */
export function isEntryEligibleForRecoveryRetry(
  entry: QueuedDelivery,
  now: number,
): { eligible: true } | { eligible: false; remainingBackoffMs: number } {
  const backoff = computeBackoffMs(entry.retryCount + 1);
  if (backoff <= 0) {
    return { eligible: true };
  }
  // An entry that was enqueued but never attempted (crash before first send)
  // replays immediately instead of waiting out the first backoff window.
  if (entry.retryCount === 0 && entry.lastAttemptAt === undefined) {
    return { eligible: true };
  }
  const lastAttemptAt = entry.lastAttemptAt;
  // Fall back to enqueuedAt when the attempt timestamp is missing or invalid.
  const baseAttemptAt =
    typeof lastAttemptAt === "number" && Number.isFinite(lastAttemptAt) && lastAttemptAt > 0
      ? lastAttemptAt
      : entry.enqueuedAt;
  const nextEligibleAt = baseAttemptAt + backoff;
  return now >= nextEligibleAt
    ? { eligible: true }
    : { eligible: false, remainingBackoffMs: nextEligibleAt - now };
}
/** True when the error message matches a known non-retryable failure pattern. */
export function isPermanentDeliveryError(error: string): boolean {
  for (const pattern of PERMANENT_ERROR_PATTERNS) {
    if (pattern.test(error)) {
      return true;
    }
  }
  return false;
}
/**
 * On gateway startup, scan the delivery queue and retry any pending entries.
 * Uses exponential backoff and moves entries that exceed MAX_RETRIES to failed/.
 */
export async function recoverPendingDeliveries(opts: {
  deliver: DeliverFn;
  log: RecoveryLogger;
  cfg: OpenClawConfig;
  stateDir?: string;
  /** Maximum wall-clock time for recovery in ms. Remaining entries are deferred to next startup. Default: 60 000. */
  maxRecoveryMs?: number;
}): Promise<RecoverySummary> {
  const summary = createEmptyRecoverySummary();
  const pending = await loadPendingDeliveries(opts.stateDir);
  if (pending.length === 0) {
    return summary;
  }
  // Oldest first so messages replay in the order they were enqueued.
  pending.sort((a, b) => a.enqueuedAt - b.enqueuedAt);
  opts.log.info(`Found ${pending.length} pending delivery entries — starting recovery`);
  const deadline = Date.now() + (opts.maxRecoveryMs ?? 60_000);
  for (const [index, entry] of pending.entries()) {
    const now = Date.now();
    if (now >= deadline) {
      // Budget exhausted: bump retryCount on everything left and stop.
      opts.log.warn(`Recovery time budget exceeded — remaining entries deferred to next startup`);
      await deferRemainingEntriesForBudget(pending.slice(index), opts.stateDir);
      break;
    }
    if (entry.retryCount >= MAX_RETRIES) {
      opts.log.warn(
        `Delivery ${entry.id} exceeded max retries (${entry.retryCount}/${MAX_RETRIES}) — moving to failed/`,
      );
      await moveEntryToFailedWithLogging(entry.id, opts.log, opts.stateDir);
      summary.skippedMaxRetries += 1;
      continue;
    }
    const eligibility = isEntryEligibleForRecoveryRetry(entry, now);
    if (!eligibility.eligible) {
      summary.deferredBackoff += 1;
      opts.log.info(
        `Delivery ${entry.id} not ready for retry yet — backoff ${eligibility.remainingBackoffMs}ms remaining`,
      );
      continue;
    }
    try {
      await opts.deliver(buildRecoveryDeliverParams(entry, opts.cfg));
      await ackDelivery(entry.id, opts.stateDir);
      summary.recovered += 1;
      opts.log.info(`Recovered delivery ${entry.id} to ${entry.channel}:${entry.to}`);
    } catch (err) {
      const errMsg = err instanceof Error ? err.message : String(err);
      if (isPermanentDeliveryError(errMsg)) {
        // Non-retryable: park it in failed/ rather than burning retries.
        opts.log.warn(`Delivery ${entry.id} hit permanent error — moving to failed/: ${errMsg}`);
        await moveEntryToFailedWithLogging(entry.id, opts.log, opts.stateDir);
        summary.failed += 1;
        continue;
      }
      await failDelivery(entry.id, errMsg, opts.stateDir).catch(() => {
        // Best-effort update.
      });
      summary.failed += 1;
      opts.log.warn(`Retry failed for delivery ${entry.id}: ${errMsg}`);
    }
  }
  opts.log.info(
    `Delivery recovery complete: ${summary.recovered} recovered, ${summary.failed} failed, ${summary.skippedMaxRetries} skipped (max retries), ${summary.deferredBackoff} deferred (backoff)`,
  );
  return summary;
}
export { MAX_RETRIES };

View File

@ -0,0 +1,241 @@
import fs from "node:fs";
import path from "node:path";
import type { ReplyPayload } from "../../auto-reply/types.js";
import { resolveStateDir } from "../../config/paths.js";
import { generateSecureUuid } from "../secure-random.js";
import type { OutboundMirror } from "./mirror.js";
import type { OutboundChannel } from "./targets.js";
// Queue lives under <stateDir>/delivery-queue; permanently failed entries are
// moved into its failed/ subdirectory.
const QUEUE_DIRNAME = "delivery-queue";
const FAILED_DIRNAME = "failed";
/** Caller-supplied fields persisted to disk for one outbound delivery. */
export type QueuedDeliveryPayload = {
  channel: Exclude<OutboundChannel, "none">;
  to: string;
  accountId?: string;
  /**
   * Original payloads before plugin hooks. On recovery, hooks re-run on these
   * payloads — this is intentional since hooks are stateless transforms and
   * should produce the same result on replay.
   */
  payloads: ReplyPayload[];
  threadId?: string | number | null;
  replyToId?: string | null;
  bestEffort?: boolean;
  gifPlayback?: boolean;
  forceDocument?: boolean;
  silent?: boolean;
  mirror?: OutboundMirror;
};
/** On-disk queue entry: the payload plus bookkeeping added by the queue. */
export interface QueuedDelivery extends QueuedDeliveryPayload {
  id: string; // UUID; also the basename of the entry's .json file
  enqueuedAt: number; // epoch ms when the entry was first persisted
  retryCount: number; // failed attempts so far (incremented by failDelivery)
  lastAttemptAt?: number; // epoch ms of the most recent failed attempt
  lastError?: string; // message from the most recent failure
}
/** Resolve the queue directory, defaulting to the configured state dir. */
function resolveQueueDir(stateDir?: string): string {
  return path.join(stateDir ?? resolveStateDir(), QUEUE_DIRNAME);
}
/** Resolve the failed/ subdirectory inside the queue directory. */
function resolveFailedDir(stateDir?: string): string {
  const queueDir = resolveQueueDir(stateDir);
  return path.join(queueDir, FAILED_DIRNAME);
}
/** Compute the .json entry path and its .delivered ack-marker path for an id. */
function resolveQueueEntryPaths(
  id: string,
  stateDir?: string,
): {
  jsonPath: string;
  deliveredPath: string;
} {
  const basePath = path.join(resolveQueueDir(stateDir), id);
  return {
    jsonPath: `${basePath}.json`,
    deliveredPath: `${basePath}.delivered`,
  };
}
/**
 * Extract the errno-style `code` string (e.g. "ENOENT") from a thrown value.
 * Returns null for non-objects, objects without a code, or a nullish code —
 * previously a present-but-undefined `code` stringified to "undefined".
 */
function getErrnoCode(err: unknown): string | null {
  if (!err || typeof err !== "object" || !("code" in err)) {
    return null;
  }
  const code = (err as { code?: unknown }).code;
  return code == null ? null : String(code);
}
/** Delete a file, swallowing all errors — callers only care that it is gone. */
async function unlinkBestEffort(filePath: string): Promise<void> {
  await fs.promises.unlink(filePath).catch(() => {
    // Best-effort cleanup.
  });
}
/** Atomically persist an entry: write a temp file, then rename into place. */
async function writeQueueEntry(filePath: string, entry: QueuedDelivery): Promise<void> {
  const stagingPath = `${filePath}.${process.pid}.tmp`;
  const serialized = JSON.stringify(entry, null, 2);
  // 0o600: entries may contain message content — keep them owner-only.
  await fs.promises.writeFile(stagingPath, serialized, { encoding: "utf-8", mode: 0o600 });
  await fs.promises.rename(stagingPath, filePath);
}
/** Read and parse a queue entry file (written by writeQueueEntry). */
async function readQueueEntry(filePath: string): Promise<QueuedDelivery> {
  const raw = await fs.promises.readFile(filePath, "utf-8");
  return JSON.parse(raw) as QueuedDelivery;
}
/**
 * Backfill lastAttemptAt for legacy entries that recorded retries before the
 * field existed, using enqueuedAt as the best available approximation.
 * Returns the (possibly replaced) entry and whether a migration happened.
 */
function normalizeLegacyQueuedDeliveryEntry(entry: QueuedDelivery): {
  entry: QueuedDelivery;
  migrated: boolean;
} {
  const isUsableTimestamp = (value: unknown): boolean =>
    typeof value === "number" && Number.isFinite(value) && value > 0;
  // Nothing to do when the attempt timestamp exists or no retry ever happened.
  if (isUsableTimestamp(entry.lastAttemptAt) || entry.retryCount <= 0) {
    return { entry, migrated: false };
  }
  // Without a trustworthy enqueue time there is nothing sensible to backfill.
  if (!isUsableTimestamp(entry.enqueuedAt)) {
    return { entry, migrated: false };
  }
  return {
    entry: { ...entry, lastAttemptAt: entry.enqueuedAt },
    migrated: true,
  };
}
/** Ensure the queue directory (and failed/ subdirectory) exist. */
export async function ensureQueueDir(stateDir?: string): Promise<string> {
  const queueDir = resolveQueueDir(stateDir);
  // 0o700: queue contents may include message payloads — owner-only access.
  for (const dir of [queueDir, resolveFailedDir(stateDir)]) {
    await fs.promises.mkdir(dir, { recursive: true, mode: 0o700 });
  }
  return queueDir;
}
/** Persist a delivery entry to disk before attempting send. Returns the entry ID. */
export async function enqueueDelivery(
params: QueuedDeliveryPayload,
stateDir?: string,
): Promise<string> {
const queueDir = await ensureQueueDir(stateDir);
const id = generateSecureUuid();
await writeQueueEntry(path.join(queueDir, `${id}.json`), {
id,
enqueuedAt: Date.now(),
channel: params.channel,
to: params.to,
accountId: params.accountId,
payloads: params.payloads,
threadId: params.threadId,
replyToId: params.replyToId,
bestEffort: params.bestEffort,
gifPlayback: params.gifPlayback,
forceDocument: params.forceDocument,
silent: params.silent,
mirror: params.mirror,
retryCount: 0,
});
return id;
}
/** Remove a successfully delivered entry from the queue.
 *
 * Two-phase so that a crash between delivery and cleanup does not cause the
 * message to be replayed on the next recovery scan:
 *   Phase 1: atomic rename {id}.json → {id}.delivered
 *   Phase 2: unlink the .delivered marker
 * A marker orphaned between the phases is swept by
 * {@link loadPendingDeliveries} on the next startup without re-sending.
 */
export async function ackDelivery(id: string, stateDir?: string): Promise<void> {
  const { jsonPath, deliveredPath } = resolveQueueEntryPaths(id, stateDir);
  try {
    // Phase 1: the atomic rename marks the delivery as complete.
    await fs.promises.rename(jsonPath, deliveredPath);
  } catch (err) {
    if (getErrnoCode(err) !== "ENOENT") {
      throw err;
    }
    // .json already gone — a previous ack attempt may have renamed it.
    // Fall through to sweep any leftover marker; ack stays idempotent.
  }
  // Phase 2: remove the marker file (best-effort).
  await unlinkBestEffort(deliveredPath);
}
/** Update a queue entry after a failed delivery attempt. */
export async function failDelivery(id: string, error: string, stateDir?: string): Promise<void> {
  const entryPath = path.join(resolveQueueDir(stateDir), `${id}.json`);
  const entry = await readQueueEntry(entryPath);
  await writeQueueEntry(entryPath, {
    ...entry,
    retryCount: entry.retryCount + 1,
    lastAttemptAt: Date.now(),
    lastError: error,
  });
}
/** Load all pending delivery entries from the queue directory. */
export async function loadPendingDeliveries(stateDir?: string): Promise<QueuedDelivery[]> {
  const queueDir = resolveQueueDir(stateDir);
  let names: string[];
  try {
    names = await fs.promises.readdir(queueDir);
  } catch (err) {
    // A missing directory simply means nothing has ever been enqueued.
    if (getErrnoCode(err) === "ENOENT") {
      return [];
    }
    throw err;
  }
  // Sweep .delivered markers orphaned when a crash hit ackDelivery between
  // its rename and unlink phases — without replaying those deliveries.
  for (const name of names) {
    if (name.endsWith(".delivered")) {
      await unlinkBestEffort(path.join(queueDir, name));
    }
  }
  const pending: QueuedDelivery[] = [];
  for (const name of names) {
    if (!name.endsWith(".json")) {
      continue;
    }
    const entryPath = path.join(queueDir, name);
    try {
      const stat = await fs.promises.stat(entryPath);
      if (!stat.isFile()) {
        continue;
      }
      const normalized = normalizeLegacyQueuedDeliveryEntry(await readQueueEntry(entryPath));
      if (normalized.migrated) {
        // Persist the backfilled lastAttemptAt so future loads skip migration.
        await writeQueueEntry(entryPath, normalized.entry);
      }
      pending.push(normalized.entry);
    } catch {
      // Skip malformed or inaccessible entries.
    }
  }
  return pending;
}
/** Move a queue entry to the failed/ subdirectory. */
export async function moveToFailed(id: string, stateDir?: string): Promise<void> {
  const fileName = `${id}.json`;
  const failedDir = resolveFailedDir(stateDir);
  // mkdir is idempotent; guarantees the target directory exists before renaming.
  await fs.promises.mkdir(failedDir, { recursive: true, mode: 0o700 });
  const sourcePath = path.join(resolveQueueDir(stateDir), fileName);
  await fs.promises.rename(sourcePath, path.join(failedDir, fileName));
}

View File

@ -0,0 +1,90 @@
import { describe, expect, it } from "vitest";
import {
computeBackoffMs,
isEntryEligibleForRecoveryRetry,
isPermanentDeliveryError,
} from "./delivery-queue.js";
// Unit tests for the pure recovery-policy helpers (no filesystem involved).
describe("delivery-queue policy", () => {
  describe("isPermanentDeliveryError", () => {
    // Messages matching PERMANENT_ERROR_PATTERNS must never be retried.
    it.each([
      "No conversation reference found for user:abc",
      "Telegram send failed: chat not found (chat_id=user:123)",
      "user not found",
      "Bot was blocked by the user",
      "Forbidden: bot was kicked from the group chat",
      "chat_id is empty",
      "Outbound not configured for channel: msteams",
    ])("returns true for permanent error: %s", (msg) => {
      expect(isPermanentDeliveryError(msg)).toBe(true);
    });
    // Transient infrastructure failures stay retryable.
    it.each([
      "network down",
      "ETIMEDOUT",
      "socket hang up",
      "rate limited",
      "500 Internal Server Error",
    ])("returns false for transient error: %s", (msg) => {
      expect(isPermanentDeliveryError(msg)).toBe(false);
    });
  });
  describe("computeBackoffMs", () => {
    it("returns scheduled backoff values and clamps at max retry", () => {
      // retryCount 5+ clamps to the last scheduled (10 minute) delay.
      const cases = [
        { retryCount: 0, expected: 0 },
        { retryCount: 1, expected: 5_000 },
        { retryCount: 2, expected: 25_000 },
        { retryCount: 3, expected: 120_000 },
        { retryCount: 4, expected: 600_000 },
        { retryCount: 5, expected: 600_000 },
      ] as const;
      for (const testCase of cases) {
        expect(computeBackoffMs(testCase.retryCount), String(testCase.retryCount)).toBe(
          testCase.expected,
        );
      }
    });
  });
  describe("isEntryEligibleForRecoveryRetry", () => {
    it("allows first replay after crash for retryCount=0 without lastAttemptAt", () => {
      const now = Date.now();
      const result = isEntryEligibleForRecoveryRetry(
        {
          id: "entry-1",
          channel: "whatsapp",
          to: "+1",
          payloads: [{ text: "a" }],
          enqueuedAt: now,
          retryCount: 0,
        },
        now,
      );
      expect(result).toEqual({ eligible: true });
    });
    it("defers retry entries until backoff window elapses", () => {
      const now = Date.now();
      const result = isEntryEligibleForRecoveryRetry(
        {
          id: "entry-2",
          channel: "whatsapp",
          to: "+1",
          payloads: [{ text: "a" }],
          enqueuedAt: now - 30_000,
          retryCount: 3,
          lastAttemptAt: now,
        },
        now,
      );
      expect(result.eligible).toBe(false);
      // Narrow the union for TypeScript before reading remainingBackoffMs.
      if (result.eligible) {
        throw new Error("Expected ineligible retry entry");
      }
      expect(result.remainingBackoffMs).toBeGreaterThan(0);
    });
  });
});

View File

@ -0,0 +1,287 @@
import fs from "node:fs";
import path from "node:path";
import { describe, expect, it, vi } from "vitest";
import {
enqueueDelivery,
loadPendingDeliveries,
MAX_RETRIES,
recoverPendingDeliveries,
} from "./delivery-queue.js";
import {
asDeliverFn,
createRecoveryLog,
installDeliveryQueueTmpDirHooks,
setQueuedEntryState,
} from "./delivery-queue.test-helpers.js";
// Integration-style tests for startup recovery: each test enqueues entries into
// a per-test temp state dir, runs recoverPendingDeliveries with a mocked
// deliver fn, and inspects the returned summary plus on-disk queue state.
describe("delivery-queue recovery", () => {
  const { tmpDir } = installDeliveryQueueTmpDirHooks();
  const baseCfg = {};
  // Two fresh entries simulating deliveries persisted before a crash.
  const enqueueCrashRecoveryEntries = async () => {
    await enqueueDelivery({ channel: "whatsapp", to: "+1", payloads: [{ text: "a" }] }, tmpDir());
    await enqueueDelivery({ channel: "telegram", to: "2", payloads: [{ text: "b" }] }, tmpDir());
  };
  // Shared driver: runs recovery against the temp dir with the given mock.
  const runRecovery = async ({
    deliver,
    log = createRecoveryLog(),
    maxRecoveryMs,
  }: {
    deliver: ReturnType<typeof vi.fn>;
    log?: ReturnType<typeof createRecoveryLog>;
    maxRecoveryMs?: number;
  }) => {
    const result = await recoverPendingDeliveries({
      deliver: asDeliverFn(deliver),
      log,
      cfg: baseCfg,
      stateDir: tmpDir(),
      ...(maxRecoveryMs === undefined ? {} : { maxRecoveryMs }),
    });
    return { result, log };
  };
  it("recovers entries from a simulated crash", async () => {
    await enqueueCrashRecoveryEntries();
    const deliver = vi.fn().mockResolvedValue([]);
    const { result } = await runRecovery({ deliver });
    expect(deliver).toHaveBeenCalledTimes(2);
    expect(result).toEqual({
      recovered: 2,
      failed: 0,
      skippedMaxRetries: 0,
      deferredBackoff: 0,
    });
    expect(await loadPendingDeliveries(tmpDir())).toHaveLength(0);
  });
  it("moves entries that exceeded max retries to failed/", async () => {
    const id = await enqueueDelivery(
      { channel: "whatsapp", to: "+1", payloads: [{ text: "a" }] },
      tmpDir(),
    );
    setQueuedEntryState(tmpDir(), id, { retryCount: MAX_RETRIES });
    const deliver = vi.fn();
    const { result } = await runRecovery({ deliver });
    expect(deliver).not.toHaveBeenCalled();
    expect(result.skippedMaxRetries).toBe(1);
    expect(result.deferredBackoff).toBe(0);
    expect(fs.existsSync(path.join(tmpDir(), "delivery-queue", "failed", `${id}.json`))).toBe(true);
  });
  it("increments retryCount on failed recovery attempt", async () => {
    await enqueueDelivery({ channel: "slack", to: "#ch", payloads: [{ text: "x" }] }, tmpDir());
    const deliver = vi.fn().mockRejectedValue(new Error("network down"));
    const { result } = await runRecovery({ deliver });
    expect(result.failed).toBe(1);
    expect(result.recovered).toBe(0);
    const entries = await loadPendingDeliveries(tmpDir());
    expect(entries).toHaveLength(1);
    expect(entries[0]?.retryCount).toBe(1);
    expect(entries[0]?.lastError).toBe("network down");
  });
  it("moves entries to failed/ immediately on permanent delivery errors", async () => {
    const id = await enqueueDelivery(
      { channel: "msteams", to: "user:abc", payloads: [{ text: "hi" }] },
      tmpDir(),
    );
    const deliver = vi
      .fn()
      .mockRejectedValue(new Error("No conversation reference found for user:abc"));
    const log = createRecoveryLog();
    const { result } = await runRecovery({ deliver, log });
    expect(result.failed).toBe(1);
    expect(result.recovered).toBe(0);
    expect(await loadPendingDeliveries(tmpDir())).toHaveLength(0);
    expect(fs.existsSync(path.join(tmpDir(), "delivery-queue", "failed", `${id}.json`))).toBe(true);
    expect(log.warn).toHaveBeenCalledWith(expect.stringContaining("permanent error"));
  });
  it("passes skipQueue: true to prevent re-enqueueing during recovery", async () => {
    await enqueueDelivery({ channel: "whatsapp", to: "+1", payloads: [{ text: "a" }] }, tmpDir());
    const deliver = vi.fn().mockResolvedValue([]);
    await runRecovery({ deliver });
    expect(deliver).toHaveBeenCalledWith(expect.objectContaining({ skipQueue: true }));
  });
  it("replays stored delivery options during recovery", async () => {
    await enqueueDelivery(
      {
        channel: "whatsapp",
        to: "+1",
        payloads: [{ text: "a" }],
        bestEffort: true,
        gifPlayback: true,
        silent: true,
        mirror: {
          sessionKey: "agent:main:main",
          text: "a",
          mediaUrls: ["https://example.com/a.png"],
        },
      },
      tmpDir(),
    );
    const deliver = vi.fn().mockResolvedValue([]);
    await runRecovery({ deliver });
    expect(deliver).toHaveBeenCalledWith(
      expect.objectContaining({
        bestEffort: true,
        gifPlayback: true,
        silent: true,
        mirror: {
          sessionKey: "agent:main:main",
          text: "a",
          mediaUrls: ["https://example.com/a.png"],
        },
      }),
    );
  });
  it("respects maxRecoveryMs time budget and bumps deferred retries", async () => {
    await enqueueCrashRecoveryEntries();
    await enqueueDelivery({ channel: "slack", to: "#c", payloads: [{ text: "c" }] }, tmpDir());
    const deliver = vi.fn().mockResolvedValue([]);
    // Zero budget: the deadline check fires before any delivery attempt.
    const { result, log } = await runRecovery({
      deliver,
      maxRecoveryMs: 0,
    });
    expect(deliver).not.toHaveBeenCalled();
    expect(result).toEqual({
      recovered: 0,
      failed: 0,
      skippedMaxRetries: 0,
      deferredBackoff: 0,
    });
    const remaining = await loadPendingDeliveries(tmpDir());
    expect(remaining).toHaveLength(3);
    expect(remaining.every((entry) => entry.retryCount === 1)).toBe(true);
    expect(log.warn).toHaveBeenCalledWith(expect.stringContaining("deferred to next startup"));
  });
  it("defers entries until backoff becomes eligible", async () => {
    const id = await enqueueDelivery(
      { channel: "whatsapp", to: "+1", payloads: [{ text: "a" }] },
      tmpDir(),
    );
    setQueuedEntryState(tmpDir(), id, { retryCount: 3, lastAttemptAt: Date.now() });
    const deliver = vi.fn().mockResolvedValue([]);
    const { result, log } = await runRecovery({
      deliver,
      maxRecoveryMs: 60_000,
    });
    expect(deliver).not.toHaveBeenCalled();
    expect(result).toEqual({
      recovered: 0,
      failed: 0,
      skippedMaxRetries: 0,
      deferredBackoff: 1,
    });
    expect(await loadPendingDeliveries(tmpDir())).toHaveLength(1);
    expect(log.info).toHaveBeenCalledWith(expect.stringContaining("not ready for retry yet"));
  });
  it("continues past high-backoff entries and recovers ready entries behind them", async () => {
    const now = Date.now();
    const blockedId = await enqueueDelivery(
      { channel: "whatsapp", to: "+1", payloads: [{ text: "blocked" }] },
      tmpDir(),
    );
    const readyId = await enqueueDelivery(
      { channel: "telegram", to: "2", payloads: [{ text: "ready" }] },
      tmpDir(),
    );
    setQueuedEntryState(tmpDir(), blockedId, {
      retryCount: 3,
      lastAttemptAt: now,
      enqueuedAt: now - 30_000,
    });
    setQueuedEntryState(tmpDir(), readyId, { retryCount: 0, enqueuedAt: now - 10_000 });
    const deliver = vi.fn().mockResolvedValue([]);
    const { result } = await runRecovery({ deliver, maxRecoveryMs: 60_000 });
    expect(result).toEqual({
      recovered: 1,
      failed: 0,
      skippedMaxRetries: 0,
      deferredBackoff: 1,
    });
    expect(deliver).toHaveBeenCalledTimes(1);
    expect(deliver).toHaveBeenCalledWith(
      expect.objectContaining({ channel: "telegram", to: "2", skipQueue: true }),
    );
    const remaining = await loadPendingDeliveries(tmpDir());
    expect(remaining).toHaveLength(1);
    expect(remaining[0]?.id).toBe(blockedId);
  });
  it("recovers deferred entries on a later restart once backoff elapsed", async () => {
    vi.useFakeTimers();
    const start = new Date("2026-01-01T00:00:00.000Z");
    vi.setSystemTime(start);
    const id = await enqueueDelivery(
      { channel: "whatsapp", to: "+1", payloads: [{ text: "later" }] },
      tmpDir(),
    );
    setQueuedEntryState(tmpDir(), id, { retryCount: 3, lastAttemptAt: start.getTime() });
    const firstDeliver = vi.fn().mockResolvedValue([]);
    const firstRun = await runRecovery({ deliver: firstDeliver, maxRecoveryMs: 60_000 });
    expect(firstRun.result).toEqual({
      recovered: 0,
      failed: 0,
      skippedMaxRetries: 0,
      deferredBackoff: 1,
    });
    expect(firstDeliver).not.toHaveBeenCalled();
    // Jump past the retry-4 backoff (600 000 ms) and run recovery again.
    vi.setSystemTime(new Date(start.getTime() + 600_000 + 1));
    const secondDeliver = vi.fn().mockResolvedValue([]);
    const secondRun = await runRecovery({ deliver: secondDeliver, maxRecoveryMs: 60_000 });
    expect(secondRun.result).toEqual({
      recovered: 1,
      failed: 0,
      skippedMaxRetries: 0,
      deferredBackoff: 0,
    });
    expect(secondDeliver).toHaveBeenCalledTimes(1);
    expect(await loadPendingDeliveries(tmpDir())).toHaveLength(0);
    vi.useRealTimers();
  });
  it("returns zeros when queue is empty", async () => {
    const deliver = vi.fn();
    const { result } = await runRecovery({ deliver });
    expect(result).toEqual({
      recovered: 0,
      failed: 0,
      skippedMaxRetries: 0,
      deferredBackoff: 0,
    });
    expect(deliver).not.toHaveBeenCalled();
  });
});

View File

@ -0,0 +1,179 @@
import fs from "node:fs";
import path from "node:path";
import { describe, expect, it } from "vitest";
import {
ackDelivery,
enqueueDelivery,
failDelivery,
loadPendingDeliveries,
moveToFailed,
} from "./delivery-queue.js";
import { installDeliveryQueueTmpDirHooks, readQueuedEntry } from "./delivery-queue.test-helpers.js";
describe("delivery-queue storage", () => {
const { tmpDir } = installDeliveryQueueTmpDirHooks();
describe("enqueue + ack lifecycle", () => {
it("creates and removes a queue entry", async () => {
const id = await enqueueDelivery(
{
channel: "whatsapp",
to: "+1555",
payloads: [{ text: "hello" }],
bestEffort: true,
gifPlayback: true,
silent: true,
mirror: {
sessionKey: "agent:main:main",
text: "hello",
mediaUrls: ["https://example.com/file.png"],
},
},
tmpDir(),
);
const queueDir = path.join(tmpDir(), "delivery-queue");
const files = fs.readdirSync(queueDir).filter((file) => file.endsWith(".json"));
expect(files).toHaveLength(1);
expect(files[0]).toBe(`${id}.json`);
const entry = readQueuedEntry(tmpDir(), id);
expect(entry).toMatchObject({
id,
channel: "whatsapp",
to: "+1555",
bestEffort: true,
gifPlayback: true,
silent: true,
mirror: {
sessionKey: "agent:main:main",
text: "hello",
mediaUrls: ["https://example.com/file.png"],
},
retryCount: 0,
});
expect(entry.payloads).toEqual([{ text: "hello" }]);
await ackDelivery(id, tmpDir());
expect(fs.readdirSync(queueDir).filter((file) => file.endsWith(".json"))).toHaveLength(0);
});
it("ack is idempotent (no error on missing file)", async () => {
await expect(ackDelivery("nonexistent-id", tmpDir())).resolves.toBeUndefined();
});
it("ack cleans up leftover .delivered marker when .json is already gone", async () => {
const id = await enqueueDelivery(
{ channel: "whatsapp", to: "+1", payloads: [{ text: "stale-marker" }] },
tmpDir(),
);
const queueDir = path.join(tmpDir(), "delivery-queue");
fs.renameSync(path.join(queueDir, `${id}.json`), path.join(queueDir, `${id}.delivered`));
await expect(ackDelivery(id, tmpDir())).resolves.toBeUndefined();
expect(fs.existsSync(path.join(queueDir, `${id}.delivered`))).toBe(false);
});
it("ack removes .delivered marker so recovery does not replay", async () => {
const id = await enqueueDelivery(
{ channel: "whatsapp", to: "+1", payloads: [{ text: "ack-test" }] },
tmpDir(),
);
const queueDir = path.join(tmpDir(), "delivery-queue");
await ackDelivery(id, tmpDir());
expect(fs.existsSync(path.join(queueDir, `${id}.json`))).toBe(false);
expect(fs.existsSync(path.join(queueDir, `${id}.delivered`))).toBe(false);
});
it("loadPendingDeliveries cleans up stale .delivered markers without replaying", async () => {
const id = await enqueueDelivery(
{ channel: "telegram", to: "99", payloads: [{ text: "stale" }] },
tmpDir(),
);
const queueDir = path.join(tmpDir(), "delivery-queue");
fs.renameSync(path.join(queueDir, `${id}.json`), path.join(queueDir, `${id}.delivered`));
const entries = await loadPendingDeliveries(tmpDir());
expect(entries).toHaveLength(0);
expect(fs.existsSync(path.join(queueDir, `${id}.delivered`))).toBe(false);
});
});
describe("failDelivery", () => {
  it("increments retryCount, records attempt time, and sets lastError", async () => {
    const deliveryId = await enqueueDelivery(
      { channel: "telegram", to: "123", payloads: [{ text: "test" }] },
      tmpDir(),
    );
    await failDelivery(deliveryId, "connection refused", tmpDir());
    const updated = readQueuedEntry(tmpDir(), deliveryId);
    expect(updated.retryCount).toBe(1);
    // The attempt timestamp feeds backoff math, so it must be a positive number.
    const attemptedAt = updated.lastAttemptAt;
    expect(typeof attemptedAt).toBe("number");
    expect((attemptedAt as number) > 0).toBe(true);
    expect(updated.lastError).toBe("connection refused");
  });
});
describe("moveToFailed", () => {
  it("moves entry to failed/ subdirectory", async () => {
    const deliveryId = await enqueueDelivery(
      { channel: "slack", to: "#general", payloads: [{ text: "hi" }] },
      tmpDir(),
    );
    await moveToFailed(deliveryId, tmpDir());
    const dir = path.join(tmpDir(), "delivery-queue");
    // Entry must leave the pending queue and land under failed/.
    expect(fs.existsSync(path.join(dir, `${deliveryId}.json`))).toBe(false);
    expect(fs.existsSync(path.join(dir, "failed", `${deliveryId}.json`))).toBe(true);
  });
});
describe("loadPendingDeliveries", () => {
  it("returns empty array when queue directory does not exist", async () => {
    const missingDir = path.join(tmpDir(), "no-such-dir");
    await expect(loadPendingDeliveries(missingDir)).resolves.toEqual([]);
  });
  it("loads multiple entries", async () => {
    await enqueueDelivery({ channel: "whatsapp", to: "+1", payloads: [{ text: "a" }] }, tmpDir());
    await enqueueDelivery({ channel: "telegram", to: "2", payloads: [{ text: "b" }] }, tmpDir());
    const loaded = await loadPendingDeliveries(tmpDir());
    expect(loaded).toHaveLength(2);
  });
  it("backfills lastAttemptAt for legacy retry entries during load", async () => {
    const deliveryId = await enqueueDelivery(
      { channel: "whatsapp", to: "+1", payloads: [{ text: "legacy" }] },
      tmpDir(),
    );
    // Rewrite the entry on disk to look pre-migration: retries recorded but
    // no lastAttemptAt timestamp.
    const entryPath = path.join(tmpDir(), "delivery-queue", `${deliveryId}.json`);
    const legacy = readQueuedEntry(tmpDir(), deliveryId);
    legacy.retryCount = 2;
    delete legacy.lastAttemptAt;
    fs.writeFileSync(entryPath, JSON.stringify(legacy), "utf-8");
    const loaded = await loadPendingDeliveries(tmpDir());
    expect(loaded).toHaveLength(1);
    expect(loaded[0]?.lastAttemptAt).toBe(loaded[0]?.enqueuedAt);
    // The backfill must be persisted, not just applied in memory.
    const persisted = readQueuedEntry(tmpDir(), deliveryId);
    expect(persisted.lastAttemptAt).toBe(persisted.enqueuedAt);
  });
});
});

View File

@ -0,0 +1,73 @@
import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import { afterAll, beforeAll, beforeEach, vi } from "vitest";
import type { DeliverFn, RecoveryLogger } from "./delivery-queue.js";
/**
 * Registers vitest lifecycle hooks that give every test case its own scratch
 * directory under one shared mkdtemp root, removed after the whole suite.
 * Returns an accessor for the current case's directory.
 */
export function installDeliveryQueueTmpDirHooks(): { readonly tmpDir: () => string } {
  let root = "";
  let current = "";
  let caseIndex = 0;
  beforeAll(() => {
    root = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-dq-suite-"));
  });
  beforeEach(() => {
    current = path.join(root, `case-${caseIndex}`);
    caseIndex += 1;
    fs.mkdirSync(current, { recursive: true });
  });
  afterAll(() => {
    if (root) {
      fs.rmSync(root, { recursive: true, force: true });
      root = "";
    }
  });
  return { tmpDir: () => current };
}
/** Reads and parses a queued delivery entry (`<id>.json`) from the queue dir. */
export function readQueuedEntry(tmpDir: string, id: string): Record<string, unknown> {
  const entryPath = path.join(tmpDir, "delivery-queue", `${id}.json`);
  const raw = fs.readFileSync(entryPath, "utf-8");
  return JSON.parse(raw) as Record<string, unknown>;
}
/**
 * Mutates a queued entry's retry bookkeeping fields on disk so tests can
 * simulate arbitrary retry/backoff states. Passing `lastAttemptAt: undefined`
 * removes the field entirely (legacy entries have none).
 */
export function setQueuedEntryState(
  tmpDir: string,
  id: string,
  state: { retryCount: number; lastAttemptAt?: number; enqueuedAt?: number },
): void {
  const entryPath = path.join(tmpDir, "delivery-queue", `${id}.json`);
  const entry = JSON.parse(fs.readFileSync(entryPath, "utf-8")) as Record<string, unknown>;
  entry.retryCount = state.retryCount;
  if (state.lastAttemptAt !== undefined) {
    entry.lastAttemptAt = state.lastAttemptAt;
  } else {
    delete entry.lastAttemptAt;
  }
  if (state.enqueuedAt !== undefined) {
    entry.enqueuedAt = state.enqueuedAt;
  }
  fs.writeFileSync(entryPath, JSON.stringify(entry), "utf-8");
}
/** Builds a RecoveryLogger whose channels are individually inspectable vi mocks. */
export function createRecoveryLog(): RecoveryLogger & {
  info: ReturnType<typeof vi.fn<(msg: string) => void>>;
  warn: ReturnType<typeof vi.fn<(msg: string) => void>>;
  error: ReturnType<typeof vi.fn<(msg: string) => void>>;
} {
  const makeChannel = () => vi.fn<(msg: string) => void>();
  return {
    info: makeChannel(),
    warn: makeChannel(),
    error: makeChannel(),
  };
}
/** Narrows a bare vi.fn() mock to the DeliverFn contract expected by recovery helpers. */
export function asDeliverFn(deliver: ReturnType<typeof vi.fn>): DeliverFn {
  return deliver as DeliverFn;
}

View File

@ -1,580 +0,0 @@
import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import { afterAll, beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
import {
ackDelivery,
computeBackoffMs,
type DeliverFn,
enqueueDelivery,
failDelivery,
isEntryEligibleForRecoveryRetry,
isPermanentDeliveryError,
loadPendingDeliveries,
MAX_RETRIES,
moveToFailed,
recoverPendingDeliveries,
} from "./delivery-queue.js";
describe("delivery-queue", () => {
let tmpDir: string;
let fixtureRoot = "";
let fixtureCount = 0;
beforeAll(() => {
fixtureRoot = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-dq-suite-"));
});
beforeEach(() => {
tmpDir = path.join(fixtureRoot, `case-${fixtureCount++}`);
fs.mkdirSync(tmpDir, { recursive: true });
});
afterAll(() => {
if (!fixtureRoot) {
return;
}
fs.rmSync(fixtureRoot, { recursive: true, force: true });
fixtureRoot = "";
});
describe("enqueue + ack lifecycle", () => {
it("creates and removes a queue entry", async () => {
const id = await enqueueDelivery(
{
channel: "whatsapp",
to: "+1555",
payloads: [{ text: "hello" }],
bestEffort: true,
gifPlayback: true,
silent: true,
mirror: {
sessionKey: "agent:main:main",
text: "hello",
mediaUrls: ["https://example.com/file.png"],
},
},
tmpDir,
);
const queueDir = path.join(tmpDir, "delivery-queue");
const files = fs.readdirSync(queueDir).filter((f) => f.endsWith(".json"));
expect(files).toHaveLength(1);
expect(files[0]).toBe(`${id}.json`);
const entry = JSON.parse(fs.readFileSync(path.join(queueDir, files[0]), "utf-8"));
expect(entry).toMatchObject({
id,
channel: "whatsapp",
to: "+1555",
bestEffort: true,
gifPlayback: true,
silent: true,
mirror: {
sessionKey: "agent:main:main",
text: "hello",
mediaUrls: ["https://example.com/file.png"],
},
retryCount: 0,
});
expect(entry.payloads).toEqual([{ text: "hello" }]);
await ackDelivery(id, tmpDir);
const remaining = fs.readdirSync(queueDir).filter((f) => f.endsWith(".json"));
expect(remaining).toHaveLength(0);
});
it("ack is idempotent (no error on missing file)", async () => {
await expect(ackDelivery("nonexistent-id", tmpDir)).resolves.toBeUndefined();
});
it("ack cleans up leftover .delivered marker when .json is already gone", async () => {
const id = await enqueueDelivery(
{ channel: "whatsapp", to: "+1", payloads: [{ text: "stale-marker" }] },
tmpDir,
);
const queueDir = path.join(tmpDir, "delivery-queue");
fs.renameSync(path.join(queueDir, `${id}.json`), path.join(queueDir, `${id}.delivered`));
await expect(ackDelivery(id, tmpDir)).resolves.toBeUndefined();
expect(fs.existsSync(path.join(queueDir, `${id}.delivered`))).toBe(false);
});
it("ack removes .delivered marker so recovery does not replay", async () => {
const id = await enqueueDelivery(
{ channel: "whatsapp", to: "+1", payloads: [{ text: "ack-test" }] },
tmpDir,
);
const queueDir = path.join(tmpDir, "delivery-queue");
await ackDelivery(id, tmpDir);
expect(fs.existsSync(path.join(queueDir, `${id}.json`))).toBe(false);
expect(fs.existsSync(path.join(queueDir, `${id}.delivered`))).toBe(false);
});
it("loadPendingDeliveries cleans up stale .delivered markers without replaying", async () => {
const id = await enqueueDelivery(
{ channel: "telegram", to: "99", payloads: [{ text: "stale" }] },
tmpDir,
);
const queueDir = path.join(tmpDir, "delivery-queue");
fs.renameSync(path.join(queueDir, `${id}.json`), path.join(queueDir, `${id}.delivered`));
const entries = await loadPendingDeliveries(tmpDir);
expect(entries).toHaveLength(0);
expect(fs.existsSync(path.join(queueDir, `${id}.delivered`))).toBe(false);
});
});
describe("failDelivery", () => {
it("increments retryCount, records attempt time, and sets lastError", async () => {
const id = await enqueueDelivery(
{
channel: "telegram",
to: "123",
payloads: [{ text: "test" }],
},
tmpDir,
);
await failDelivery(id, "connection refused", tmpDir);
const queueDir = path.join(tmpDir, "delivery-queue");
const entry = JSON.parse(fs.readFileSync(path.join(queueDir, `${id}.json`), "utf-8"));
expect(entry.retryCount).toBe(1);
expect(typeof entry.lastAttemptAt).toBe("number");
expect(entry.lastAttemptAt).toBeGreaterThan(0);
expect(entry.lastError).toBe("connection refused");
});
});
describe("moveToFailed", () => {
it("moves entry to failed/ subdirectory", async () => {
const id = await enqueueDelivery(
{
channel: "slack",
to: "#general",
payloads: [{ text: "hi" }],
},
tmpDir,
);
await moveToFailed(id, tmpDir);
const queueDir = path.join(tmpDir, "delivery-queue");
const failedDir = path.join(queueDir, "failed");
expect(fs.existsSync(path.join(queueDir, `${id}.json`))).toBe(false);
expect(fs.existsSync(path.join(failedDir, `${id}.json`))).toBe(true);
});
});
describe("isPermanentDeliveryError", () => {
it.each([
"No conversation reference found for user:abc",
"Telegram send failed: chat not found (chat_id=user:123)",
"user not found",
"Bot was blocked by the user",
"Forbidden: bot was kicked from the group chat",
"chat_id is empty",
"Outbound not configured for channel: msteams",
])("returns true for permanent error: %s", (msg) => {
expect(isPermanentDeliveryError(msg)).toBe(true);
});
it.each([
"network down",
"ETIMEDOUT",
"socket hang up",
"rate limited",
"500 Internal Server Error",
])("returns false for transient error: %s", (msg) => {
expect(isPermanentDeliveryError(msg)).toBe(false);
});
});
describe("loadPendingDeliveries", () => {
it("returns empty array when queue directory does not exist", async () => {
const nonexistent = path.join(tmpDir, "no-such-dir");
const entries = await loadPendingDeliveries(nonexistent);
expect(entries).toEqual([]);
});
it("loads multiple entries", async () => {
await enqueueDelivery({ channel: "whatsapp", to: "+1", payloads: [{ text: "a" }] }, tmpDir);
await enqueueDelivery({ channel: "telegram", to: "2", payloads: [{ text: "b" }] }, tmpDir);
const entries = await loadPendingDeliveries(tmpDir);
expect(entries).toHaveLength(2);
});
it("backfills lastAttemptAt for legacy retry entries during load", async () => {
const id = await enqueueDelivery(
{ channel: "whatsapp", to: "+1", payloads: [{ text: "legacy" }] },
tmpDir,
);
const filePath = path.join(tmpDir, "delivery-queue", `${id}.json`);
const legacyEntry = JSON.parse(fs.readFileSync(filePath, "utf-8"));
legacyEntry.retryCount = 2;
delete legacyEntry.lastAttemptAt;
fs.writeFileSync(filePath, JSON.stringify(legacyEntry), "utf-8");
const entries = await loadPendingDeliveries(tmpDir);
expect(entries).toHaveLength(1);
expect(entries[0]?.lastAttemptAt).toBe(entries[0]?.enqueuedAt);
const persisted = JSON.parse(fs.readFileSync(filePath, "utf-8"));
expect(persisted.lastAttemptAt).toBe(persisted.enqueuedAt);
});
});
describe("computeBackoffMs", () => {
it("returns scheduled backoff values and clamps at max retry", () => {
const cases = [
{ retryCount: 0, expected: 0 },
{ retryCount: 1, expected: 5_000 },
{ retryCount: 2, expected: 25_000 },
{ retryCount: 3, expected: 120_000 },
{ retryCount: 4, expected: 600_000 },
{ retryCount: 5, expected: 600_000 },
] as const;
for (const testCase of cases) {
expect(computeBackoffMs(testCase.retryCount), String(testCase.retryCount)).toBe(
testCase.expected,
);
}
});
});
describe("isEntryEligibleForRecoveryRetry", () => {
it("allows first replay after crash for retryCount=0 without lastAttemptAt", () => {
const now = Date.now();
const result = isEntryEligibleForRecoveryRetry(
{
id: "entry-1",
channel: "whatsapp",
to: "+1",
payloads: [{ text: "a" }],
enqueuedAt: now,
retryCount: 0,
},
now,
);
expect(result).toEqual({ eligible: true });
});
it("defers retry entries until backoff window elapses", () => {
const now = Date.now();
const result = isEntryEligibleForRecoveryRetry(
{
id: "entry-2",
channel: "whatsapp",
to: "+1",
payloads: [{ text: "a" }],
enqueuedAt: now - 30_000,
retryCount: 3,
lastAttemptAt: now,
},
now,
);
expect(result.eligible).toBe(false);
if (result.eligible) {
throw new Error("Expected ineligible retry entry");
}
expect(result.remainingBackoffMs).toBeGreaterThan(0);
});
});
describe("recoverPendingDeliveries", () => {
const baseCfg = {};
const createLog = () => ({ info: vi.fn(), warn: vi.fn(), error: vi.fn() });
const enqueueCrashRecoveryEntries = async () => {
await enqueueDelivery({ channel: "whatsapp", to: "+1", payloads: [{ text: "a" }] }, tmpDir);
await enqueueDelivery({ channel: "telegram", to: "2", payloads: [{ text: "b" }] }, tmpDir);
};
const setEntryState = (
id: string,
state: { retryCount: number; lastAttemptAt?: number; enqueuedAt?: number },
) => {
const filePath = path.join(tmpDir, "delivery-queue", `${id}.json`);
const entry = JSON.parse(fs.readFileSync(filePath, "utf-8"));
entry.retryCount = state.retryCount;
if (state.lastAttemptAt === undefined) {
delete entry.lastAttemptAt;
} else {
entry.lastAttemptAt = state.lastAttemptAt;
}
if (state.enqueuedAt !== undefined) {
entry.enqueuedAt = state.enqueuedAt;
}
fs.writeFileSync(filePath, JSON.stringify(entry), "utf-8");
};
const runRecovery = async ({
deliver,
log = createLog(),
maxRecoveryMs,
}: {
deliver: ReturnType<typeof vi.fn>;
log?: ReturnType<typeof createLog>;
maxRecoveryMs?: number;
}) => {
const result = await recoverPendingDeliveries({
deliver: deliver as DeliverFn,
log,
cfg: baseCfg,
stateDir: tmpDir,
...(maxRecoveryMs === undefined ? {} : { maxRecoveryMs }),
});
return { result, log };
};
it("recovers entries from a simulated crash", async () => {
await enqueueCrashRecoveryEntries();
const deliver = vi.fn().mockResolvedValue([]);
const { result } = await runRecovery({ deliver });
expect(deliver).toHaveBeenCalledTimes(2);
expect(result.recovered).toBe(2);
expect(result.failed).toBe(0);
expect(result.skippedMaxRetries).toBe(0);
expect(result.deferredBackoff).toBe(0);
const remaining = await loadPendingDeliveries(tmpDir);
expect(remaining).toHaveLength(0);
});
it("moves entries that exceeded max retries to failed/", async () => {
const id = await enqueueDelivery(
{ channel: "whatsapp", to: "+1", payloads: [{ text: "a" }] },
tmpDir,
);
setEntryState(id, { retryCount: MAX_RETRIES });
const deliver = vi.fn();
const { result } = await runRecovery({ deliver });
expect(deliver).not.toHaveBeenCalled();
expect(result.skippedMaxRetries).toBe(1);
expect(result.deferredBackoff).toBe(0);
const failedDir = path.join(tmpDir, "delivery-queue", "failed");
expect(fs.existsSync(path.join(failedDir, `${id}.json`))).toBe(true);
});
it("increments retryCount on failed recovery attempt", async () => {
await enqueueDelivery({ channel: "slack", to: "#ch", payloads: [{ text: "x" }] }, tmpDir);
const deliver = vi.fn().mockRejectedValue(new Error("network down"));
const { result } = await runRecovery({ deliver });
expect(result.failed).toBe(1);
expect(result.recovered).toBe(0);
const entries = await loadPendingDeliveries(tmpDir);
expect(entries).toHaveLength(1);
expect(entries[0].retryCount).toBe(1);
expect(entries[0].lastError).toBe("network down");
});
it("moves entries to failed/ immediately on permanent delivery errors", async () => {
const id = await enqueueDelivery(
{ channel: "msteams", to: "user:abc", payloads: [{ text: "hi" }] },
tmpDir,
);
const deliver = vi
.fn()
.mockRejectedValue(new Error("No conversation reference found for user:abc"));
const log = createLog();
const { result } = await runRecovery({ deliver, log });
expect(result.failed).toBe(1);
expect(result.recovered).toBe(0);
const remaining = await loadPendingDeliveries(tmpDir);
expect(remaining).toHaveLength(0);
const failedDir = path.join(tmpDir, "delivery-queue", "failed");
expect(fs.existsSync(path.join(failedDir, `${id}.json`))).toBe(true);
expect(log.warn).toHaveBeenCalledWith(expect.stringContaining("permanent error"));
});
it("passes skipQueue: true to prevent re-enqueueing during recovery", async () => {
await enqueueDelivery({ channel: "whatsapp", to: "+1", payloads: [{ text: "a" }] }, tmpDir);
const deliver = vi.fn().mockResolvedValue([]);
await runRecovery({ deliver });
expect(deliver).toHaveBeenCalledWith(expect.objectContaining({ skipQueue: true }));
});
it("replays stored delivery options during recovery", async () => {
await enqueueDelivery(
{
channel: "whatsapp",
to: "+1",
payloads: [{ text: "a" }],
bestEffort: true,
gifPlayback: true,
silent: true,
mirror: {
sessionKey: "agent:main:main",
text: "a",
mediaUrls: ["https://example.com/a.png"],
},
},
tmpDir,
);
const deliver = vi.fn().mockResolvedValue([]);
await runRecovery({ deliver });
expect(deliver).toHaveBeenCalledWith(
expect.objectContaining({
bestEffort: true,
gifPlayback: true,
silent: true,
mirror: {
sessionKey: "agent:main:main",
text: "a",
mediaUrls: ["https://example.com/a.png"],
},
}),
);
});
it("respects maxRecoveryMs time budget", async () => {
await enqueueCrashRecoveryEntries();
await enqueueDelivery({ channel: "slack", to: "#c", payloads: [{ text: "c" }] }, tmpDir);
const deliver = vi.fn().mockResolvedValue([]);
const { result, log } = await runRecovery({
deliver,
maxRecoveryMs: 0,
});
expect(deliver).not.toHaveBeenCalled();
expect(result.recovered).toBe(0);
expect(result.failed).toBe(0);
expect(result.skippedMaxRetries).toBe(0);
expect(result.deferredBackoff).toBe(0);
const remaining = await loadPendingDeliveries(tmpDir);
expect(remaining).toHaveLength(3);
expect(log.warn).toHaveBeenCalledWith(expect.stringContaining("deferred to next startup"));
});
it("defers entries until backoff becomes eligible", async () => {
const id = await enqueueDelivery(
{ channel: "whatsapp", to: "+1", payloads: [{ text: "a" }] },
tmpDir,
);
setEntryState(id, { retryCount: 3, lastAttemptAt: Date.now() });
const deliver = vi.fn().mockResolvedValue([]);
const { result, log } = await runRecovery({
deliver,
maxRecoveryMs: 60_000,
});
expect(deliver).not.toHaveBeenCalled();
expect(result).toEqual({
recovered: 0,
failed: 0,
skippedMaxRetries: 0,
deferredBackoff: 1,
});
const remaining = await loadPendingDeliveries(tmpDir);
expect(remaining).toHaveLength(1);
expect(log.info).toHaveBeenCalledWith(expect.stringContaining("not ready for retry yet"));
});
it("continues past high-backoff entries and recovers ready entries behind them", async () => {
const now = Date.now();
const blockedId = await enqueueDelivery(
{ channel: "whatsapp", to: "+1", payloads: [{ text: "blocked" }] },
tmpDir,
);
const readyId = await enqueueDelivery(
{ channel: "telegram", to: "2", payloads: [{ text: "ready" }] },
tmpDir,
);
setEntryState(blockedId, { retryCount: 3, lastAttemptAt: now, enqueuedAt: now - 30_000 });
setEntryState(readyId, { retryCount: 0, enqueuedAt: now - 10_000 });
const deliver = vi.fn().mockResolvedValue([]);
const { result } = await runRecovery({ deliver, maxRecoveryMs: 60_000 });
expect(result).toEqual({
recovered: 1,
failed: 0,
skippedMaxRetries: 0,
deferredBackoff: 1,
});
expect(deliver).toHaveBeenCalledTimes(1);
expect(deliver).toHaveBeenCalledWith(
expect.objectContaining({ channel: "telegram", to: "2", skipQueue: true }),
);
const remaining = await loadPendingDeliveries(tmpDir);
expect(remaining).toHaveLength(1);
expect(remaining[0]?.id).toBe(blockedId);
});
it("recovers deferred entries on a later restart once backoff elapsed", async () => {
vi.useFakeTimers();
const start = new Date("2026-01-01T00:00:00.000Z");
vi.setSystemTime(start);
const id = await enqueueDelivery(
{ channel: "whatsapp", to: "+1", payloads: [{ text: "later" }] },
tmpDir,
);
setEntryState(id, { retryCount: 3, lastAttemptAt: start.getTime() });
const firstDeliver = vi.fn().mockResolvedValue([]);
const firstRun = await runRecovery({ deliver: firstDeliver, maxRecoveryMs: 60_000 });
expect(firstRun.result).toEqual({
recovered: 0,
failed: 0,
skippedMaxRetries: 0,
deferredBackoff: 1,
});
expect(firstDeliver).not.toHaveBeenCalled();
vi.setSystemTime(new Date(start.getTime() + 600_000 + 1));
const secondDeliver = vi.fn().mockResolvedValue([]);
const secondRun = await runRecovery({ deliver: secondDeliver, maxRecoveryMs: 60_000 });
expect(secondRun.result).toEqual({
recovered: 1,
failed: 0,
skippedMaxRetries: 0,
deferredBackoff: 0,
});
expect(secondDeliver).toHaveBeenCalledTimes(1);
const remaining = await loadPendingDeliveries(tmpDir);
expect(remaining).toHaveLength(0);
vi.useRealTimers();
});
it("returns zeros when queue is empty", async () => {
const deliver = vi.fn();
const { result } = await runRecovery({ deliver });
expect(result).toEqual({
recovered: 0,
failed: 0,
skippedMaxRetries: 0,
deferredBackoff: 0,
});
expect(deliver).not.toHaveBeenCalled();
});
});
});

View File

@ -1,444 +1,17 @@
import fs from "node:fs";
import path from "node:path";
import type { ReplyPayload } from "../../auto-reply/types.js";
import type { OpenClawConfig } from "../../config/config.js";
import { resolveStateDir } from "../../config/paths.js";
import { generateSecureUuid } from "../secure-random.js";
import type { OutboundMirror } from "./mirror.js";
import type { OutboundChannel } from "./targets.js";
// On-disk layout: <stateDir>/delivery-queue/*.json (pending entries) and
// <stateDir>/delivery-queue/failed/*.json (entries that were given up on).
const QUEUE_DIRNAME = "delivery-queue";
const FAILED_DIRNAME = "failed";
// After this many failed attempts an entry is moved to failed/ instead of retried.
const MAX_RETRIES = 5;
/** Backoff delays in milliseconds indexed by retry count (1-based). */
const BACKOFF_MS: readonly number[] = [
  5_000, // retry 1: 5s
  25_000, // retry 2: 25s
  120_000, // retry 3: 2m
  600_000, // retry 4: 10m
];
/** Everything needed to (re)attempt an outbound send, as supplied by the caller. */
type QueuedDeliveryPayload = {
  channel: Exclude<OutboundChannel, "none">;
  to: string;
  accountId?: string;
  /**
   * Original payloads before plugin hooks. On recovery, hooks re-run on these
   * payloads — this is intentional since hooks are stateless transforms and
   * should produce the same result on replay.
   */
  payloads: ReplyPayload[];
  threadId?: string | number | null;
  replyToId?: string | null;
  bestEffort?: boolean;
  gifPlayback?: boolean;
  forceDocument?: boolean;
  silent?: boolean;
  mirror?: OutboundMirror;
};
/** A QueuedDeliveryPayload as persisted to disk, plus retry bookkeeping. */
export interface QueuedDelivery extends QueuedDeliveryPayload {
  id: string;
  // Unix ms when the entry was first written.
  enqueuedAt: number;
  // Number of failed attempts so far (0 for a never-attempted entry).
  retryCount: number;
  // Unix ms of the most recent failed attempt; absent on legacy entries.
  lastAttemptAt?: number;
  lastError?: string;
}
/** Aggregate counters returned by one recoverPendingDeliveries() run. */
export type RecoverySummary = {
  recovered: number;
  failed: number;
  skippedMaxRetries: number;
  deferredBackoff: number;
};
/**
 * Resolve the delivery-queue directory under the given state dir, falling
 * back to the default resolved state dir when none is supplied.
 */
function resolveQueueDir(stateDir?: string): string {
  return path.join(stateDir ?? resolveStateDir(), QUEUE_DIRNAME);
}
/** Resolve the failed/ subdirectory inside the delivery queue. */
function resolveFailedDir(stateDir?: string): string {
  const queueDir = resolveQueueDir(stateDir);
  return path.join(queueDir, FAILED_DIRNAME);
}
/**
 * Paths for one queue entry: the pending file (.json) and the transient
 * delivered marker (.delivered) used by two-phase ack.
 */
function resolveQueueEntryPaths(
  id: string,
  stateDir?: string,
): {
  jsonPath: string;
  deliveredPath: string;
} {
  const queueDir = resolveQueueDir(stateDir);
  const jsonPath = path.join(queueDir, `${id}.json`);
  const deliveredPath = path.join(queueDir, `${id}.delivered`);
  return { jsonPath, deliveredPath };
}
/**
 * Extract the errno-style `code` string (e.g. "ENOENT") from a thrown value.
 * Returns null when the value is not an object, has no `code` property, or
 * the property is not a string. Previously a non-string `code` (including an
 * explicit `undefined`) was coerced with String(), yielding the misleading
 * literal "undefined"/"null" instead of null.
 */
function getErrnoCode(err: unknown): string | null {
  if (err === null || typeof err !== "object" || !("code" in err)) {
    return null;
  }
  const code = (err as { code?: unknown }).code;
  return typeof code === "string" ? code : null;
}
/** Delete a file, swallowing any error (missing file, permissions, etc.). */
async function unlinkBestEffort(filePath: string): Promise<void> {
  await fs.promises.unlink(filePath).catch(() => {
    // Intentionally ignored: cleanup is best-effort.
  });
}
/** Ensure the queue directory (and failed/ subdirectory) exist. Returns the queue dir. */
export async function ensureQueueDir(stateDir?: string): Promise<string> {
  const queueDir = resolveQueueDir(stateDir);
  // 0o700: queue entries contain message content, keep them owner-only.
  for (const dir of [queueDir, resolveFailedDir(stateDir)]) {
    await fs.promises.mkdir(dir, { recursive: true, mode: 0o700 });
  }
  return queueDir;
}
type QueuedDeliveryParams = QueuedDeliveryPayload;
/** Persist a delivery entry to disk before attempting send. Returns the entry ID. */
export async function enqueueDelivery(
  params: QueuedDeliveryParams,
  stateDir?: string,
): Promise<string> {
  const queueDir = await ensureQueueDir(stateDir);
  const id = generateSecureUuid();
  // Snapshot everything needed to replay this delivery after a crash.
  const entry: QueuedDelivery = {
    id,
    enqueuedAt: Date.now(),
    channel: params.channel,
    to: params.to,
    accountId: params.accountId,
    payloads: params.payloads,
    threadId: params.threadId,
    replyToId: params.replyToId,
    bestEffort: params.bestEffort,
    gifPlayback: params.gifPlayback,
    forceDocument: params.forceDocument,
    silent: params.silent,
    mirror: params.mirror,
    retryCount: 0,
  };
  // Write-to-temp-then-rename keeps the .json entry atomic on POSIX filesystems.
  const finalPath = path.join(queueDir, `${id}.json`);
  const stagingPath = `${finalPath}.${process.pid}.tmp`;
  await fs.promises.writeFile(stagingPath, JSON.stringify(entry, null, 2), {
    encoding: "utf-8",
    mode: 0o600,
  });
  await fs.promises.rename(stagingPath, finalPath);
  return id;
}
/**
 * Remove a successfully delivered entry from the queue.
 *
 * Deletion happens in two phases so that a crash between delivery and cleanup
 * does not cause the message to be replayed on the next recovery scan:
 *   Phase 1: atomic rename {id}.json → {id}.delivered
 *   Phase 2: unlink the .delivered marker
 * If the process dies between the phases, the marker is cleaned up by
 * {@link loadPendingDeliveries} on the next startup without re-sending.
 */
export async function ackDelivery(id: string, stateDir?: string): Promise<void> {
  const { jsonPath, deliveredPath } = resolveQueueEntryPaths(id, stateDir);
  try {
    // Phase 1: the atomic rename is the commit point for "delivered".
    await fs.promises.rename(jsonPath, deliveredPath);
  } catch (err) {
    // ENOENT: .json already gone — a previous ack attempt may have renamed it.
    // Fall through and sweep any leftover marker. Anything else is a real failure.
    if (getErrnoCode(err) !== "ENOENT") {
      throw err;
    }
  }
  // Phase 2: remove the marker file (idempotent, best-effort).
  await unlinkBestEffort(deliveredPath);
}
/** Update a queue entry after a failed delivery attempt (bumps retry bookkeeping). */
export async function failDelivery(id: string, error: string, stateDir?: string): Promise<void> {
  const entryPath = path.join(resolveQueueDir(stateDir), `${id}.json`);
  const entry: QueuedDelivery = JSON.parse(await fs.promises.readFile(entryPath, "utf-8"));
  const updated: QueuedDelivery = {
    ...entry,
    retryCount: entry.retryCount + 1,
    lastAttemptAt: Date.now(),
    lastError: error,
  };
  // Same atomic write-then-rename discipline as enqueueDelivery.
  const stagingPath = `${entryPath}.${process.pid}.tmp`;
  await fs.promises.writeFile(stagingPath, JSON.stringify(updated, null, 2), {
    encoding: "utf-8",
    mode: 0o600,
  });
  await fs.promises.rename(stagingPath, entryPath);
}
/** Load all pending delivery entries from the queue directory. */
export async function loadPendingDeliveries(stateDir?: string): Promise<QueuedDelivery[]> {
  const queueDir = resolveQueueDir(stateDir);
  let names: string[];
  try {
    names = await fs.promises.readdir(queueDir);
  } catch (err) {
    if (getErrnoCode(err) === "ENOENT") {
      return []; // Queue never created — nothing pending.
    }
    throw err;
  }
  // Sweep .delivered markers left behind when ackDelivery crashed between its
  // rename and unlink phases; those messages were already sent.
  for (const marker of names.filter((name) => name.endsWith(".delivered"))) {
    await unlinkBestEffort(path.join(queueDir, marker));
  }
  const pending: QueuedDelivery[] = [];
  for (const name of names) {
    if (!name.endsWith(".json")) {
      continue;
    }
    const entryPath = path.join(queueDir, name);
    try {
      const stat = await fs.promises.stat(entryPath);
      if (!stat.isFile()) {
        continue;
      }
      const parsed = JSON.parse(await fs.promises.readFile(entryPath, "utf-8")) as QueuedDelivery;
      const { entry, migrated } = normalizeLegacyQueuedDeliveryEntry(parsed);
      if (migrated) {
        // Persist the backfill atomically so it survives restarts.
        const stagingPath = `${entryPath}.${process.pid}.tmp`;
        await fs.promises.writeFile(stagingPath, JSON.stringify(entry, null, 2), {
          encoding: "utf-8",
          mode: 0o600,
        });
        await fs.promises.rename(stagingPath, entryPath);
      }
      pending.push(entry);
    } catch {
      // Malformed or inaccessible entries are skipped, not fatal.
    }
  }
  return pending;
}
/** Move a queue entry to the failed/ subdirectory (terminal, no more retries). */
export async function moveToFailed(id: string, stateDir?: string): Promise<void> {
  const failedDir = resolveFailedDir(stateDir);
  await fs.promises.mkdir(failedDir, { recursive: true, mode: 0o700 });
  await fs.promises.rename(
    path.join(resolveQueueDir(stateDir), `${id}.json`),
    path.join(failedDir, `${id}.json`),
  );
}
/** Compute the backoff delay in ms for a given retry count. */
export function computeBackoffMs(retryCount: number): number {
  if (retryCount <= 0) {
    return 0;
  }
  // Schedule: retry 1 → 5s, retry 2 → 25s, retry 3 → 2m, retry 4+ → 10m (clamped).
  const schedule = [5_000, 25_000, 120_000, 600_000] as const;
  const index = Math.min(retryCount - 1, schedule.length - 1);
  return schedule[index] ?? 0;
}
/**
 * Decide whether a queued entry may be retried now; if not, report how long
 * until its backoff window opens.
 */
export function isEntryEligibleForRecoveryRetry(
  entry: QueuedDelivery,
  now: number,
): { eligible: true } | { eligible: false; remainingBackoffMs: number } {
  const backoffMs = computeBackoffMs(entry.retryCount + 1);
  const neverAttempted = entry.retryCount === 0 && entry.lastAttemptAt === undefined;
  if (backoffMs <= 0 || neverAttempted) {
    // No backoff configured, or this is the first replay after a crash —
    // deliver immediately.
    return { eligible: true };
  }
  // Legacy entries may carry a missing/invalid lastAttemptAt; fall back to
  // enqueuedAt as the base of the backoff window.
  const hasValidAttempt =
    typeof entry.lastAttemptAt === "number" &&
    Number.isFinite(entry.lastAttemptAt) &&
    entry.lastAttemptAt > 0;
  const baseAt = hasValidAttempt ? (entry.lastAttemptAt as number) : entry.enqueuedAt;
  const readyAt = baseAt + backoffMs;
  return now >= readyAt
    ? { eligible: true }
    : { eligible: false, remainingBackoffMs: readyAt - now };
}
/**
 * Backfill lastAttemptAt for legacy entries written before attempt timestamps
 * existed: an entry with recorded retries but no valid timestamp gets
 * enqueuedAt as a conservative base for backoff math. Returns a new object
 * when migrated; the input is never mutated.
 */
function normalizeLegacyQueuedDeliveryEntry(entry: QueuedDelivery): {
  entry: QueuedDelivery;
  migrated: boolean;
} {
  const isValidTimestamp = (value: unknown): value is number =>
    typeof value === "number" && Number.isFinite(value) && value > 0;
  const needsBackfill =
    entry.retryCount > 0 &&
    !isValidTimestamp(entry.lastAttemptAt) &&
    isValidTimestamp(entry.enqueuedAt);
  if (!needsBackfill) {
    return { entry, migrated: false };
  }
  return {
    entry: { ...entry, lastAttemptAt: entry.enqueuedAt },
    migrated: true,
  };
}
/**
 * Sends one queued delivery. `skipQueue: true` tells the sender not to
 * re-enqueue the payloads (used during recovery replay).
 */
export type DeliverFn = (
  params: {
    cfg: OpenClawConfig;
  } & QueuedDeliveryParams & {
      skipQueue?: boolean;
    },
) => Promise<unknown>;
/** Minimal logging surface needed by recovery (subset of the gateway logger). */
export interface RecoveryLogger {
  info(msg: string): void;
  warn(msg: string): void;
  error(msg: string): void;
}
/**
 * On gateway startup, scan the delivery queue and retry any pending entries.
 * Uses exponential backoff and moves entries that exceed MAX_RETRIES to failed/.
 *
 * Per-entry outcomes map to the RecoverySummary counters:
 *  - delivered + acked          → recovered
 *  - delivery threw             → failed (permanent errors also move to failed/)
 *  - retryCount >= MAX_RETRIES  → skippedMaxRetries (moved to failed/)
 *  - backoff window still open  → deferredBackoff (left for a later startup)
 */
export async function recoverPendingDeliveries(opts: {
  deliver: DeliverFn;
  log: RecoveryLogger;
  cfg: OpenClawConfig;
  stateDir?: string;
  /** Maximum wall-clock time for recovery in ms. Remaining entries are deferred to next startup. Default: 60 000. */
  maxRecoveryMs?: number;
}): Promise<RecoverySummary> {
  const pending = await loadPendingDeliveries(opts.stateDir);
  if (pending.length === 0) {
    return { recovered: 0, failed: 0, skippedMaxRetries: 0, deferredBackoff: 0 };
  }
  // Process oldest first.
  pending.sort((a, b) => a.enqueuedAt - b.enqueuedAt);
  opts.log.info(`Found ${pending.length} pending delivery entries — starting recovery`);
  const deadline = Date.now() + (opts.maxRecoveryMs ?? 60_000);
  let recovered = 0;
  let failed = 0;
  let skippedMaxRetries = 0;
  let deferredBackoff = 0;
  for (let i = 0; i < pending.length; i++) {
    const entry = pending[i];
    const now = Date.now();
    // Budget check runs before any per-entry work so a slow deliver() cannot
    // push us far past the deadline.
    if (now >= deadline) {
      opts.log.warn(`Recovery time budget exceeded — remaining entries deferred to next startup`);
      // Increment retryCount for all remaining entries so that entries which
      // are consistently deferred by the time budget eventually reach
      // MAX_RETRIES and are pruned rather than looping forever.
      await Promise.allSettled(
        pending
          .slice(i)
          .map((e) => failDelivery(e.id, "recovery time budget exceeded", opts.stateDir)),
      );
      break;
    }
    if (entry.retryCount >= MAX_RETRIES) {
      opts.log.warn(
        `Delivery ${entry.id} exceeded max retries (${entry.retryCount}/${MAX_RETRIES}) — moving to failed/`,
      );
      try {
        await moveToFailed(entry.id, opts.stateDir);
      } catch (err) {
        // Keep going: one stuck file must not block the rest of the queue.
        opts.log.error(`Failed to move entry ${entry.id} to failed/: ${String(err)}`);
      }
      skippedMaxRetries += 1;
      continue;
    }
    // Deferring (rather than breaking) lets ready entries behind a
    // high-backoff entry still be recovered in this pass.
    const retryEligibility = isEntryEligibleForRecoveryRetry(entry, now);
    if (!retryEligibility.eligible) {
      deferredBackoff += 1;
      opts.log.info(
        `Delivery ${entry.id} not ready for retry yet — backoff ${retryEligibility.remainingBackoffMs}ms remaining`,
      );
      continue;
    }
    try {
      await opts.deliver({
        cfg: opts.cfg,
        channel: entry.channel,
        to: entry.to,
        accountId: entry.accountId,
        payloads: entry.payloads,
        threadId: entry.threadId,
        replyToId: entry.replyToId,
        bestEffort: entry.bestEffort,
        gifPlayback: entry.gifPlayback,
        forceDocument: entry.forceDocument,
        silent: entry.silent,
        mirror: entry.mirror,
        skipQueue: true, // Prevent re-enqueueing during recovery
      });
      // Ack only after deliver() resolved, so a crash mid-send replays later.
      await ackDelivery(entry.id, opts.stateDir);
      recovered += 1;
      opts.log.info(`Recovered delivery ${entry.id} to ${entry.channel}:${entry.to}`);
    } catch (err) {
      const errMsg = err instanceof Error ? err.message : String(err);
      // Permanent errors (unknown recipient, blocked bot, …) will never
      // succeed on retry — park them in failed/ immediately.
      if (isPermanentDeliveryError(errMsg)) {
        opts.log.warn(`Delivery ${entry.id} hit permanent error — moving to failed/: ${errMsg}`);
        try {
          await moveToFailed(entry.id, opts.stateDir);
        } catch (moveErr) {
          opts.log.error(`Failed to move entry ${entry.id} to failed/: ${String(moveErr)}`);
        }
        failed += 1;
        continue;
      }
      try {
        await failDelivery(entry.id, errMsg, opts.stateDir);
      } catch {
        // Best-effort update.
      }
      failed += 1;
      opts.log.warn(`Retry failed for delivery ${entry.id}: ${errMsg}`);
    }
  }
  opts.log.info(
    `Delivery recovery complete: ${recovered} recovered, ${failed} failed, ${skippedMaxRetries} skipped (max retries), ${deferredBackoff} deferred (backoff)`,
  );
  return { recovered, failed, skippedMaxRetries, deferredBackoff };
}
export { MAX_RETRIES };
/**
 * Error-message fingerprints that mark a delivery as permanently failed
 * (missing/blocked recipient, unconfigured channel, ambiguous target).
 * Matching is case-insensitive regex substring matching.
 */
const PERMANENT_ERROR_PATTERNS: readonly RegExp[] = [
  /no conversation reference found/i,
  /chat not found/i,
  /user not found/i,
  /bot was blocked by the user/i,
  /forbidden: bot was kicked/i,
  /chat_id is empty/i,
  /recipient is not a valid/i,
  /outbound not configured for channel/i,
  /ambiguous discord recipient/i,
];
/**
 * Returns true when the given delivery error message matches a known
 * permanent failure, i.e. retrying the delivery cannot succeed.
 */
export function isPermanentDeliveryError(error: string): boolean {
  for (const pattern of PERMANENT_ERROR_PATTERNS) {
    if (pattern.test(error)) {
      return true;
    }
  }
  return false;
}
export {
ackDelivery,
enqueueDelivery,
ensureQueueDir,
failDelivery,
loadPendingDeliveries,
moveToFailed,
} from "./delivery-queue-storage.js";
export type { QueuedDelivery, QueuedDeliveryPayload } from "./delivery-queue-storage.js";
export {
computeBackoffMs,
isEntryEligibleForRecoveryRetry,
isPermanentDeliveryError,
MAX_RETRIES,
recoverPendingDeliveries,
} from "./delivery-queue-recovery.js";
export type { DeliverFn, RecoveryLogger, RecoverySummary } from "./delivery-queue-recovery.js";

View File

@ -1,26 +1,10 @@
import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import type { ReplyPayload } from "../../auto-reply/types.js";
import { setDefaultChannelPluginRegistryForTests } from "../../commands/channel-test-helpers.js";
import type { OpenClawConfig } from "../../config/config.js";
import { setActivePluginRegistry } from "../../plugins/runtime.js";
import { createTestRegistry } from "../../test-utils/channel-plugins.js";
import { typedCases } from "../../test-utils/typed-cases.js";
import {
ackDelivery,
computeBackoffMs,
type DeliverFn,
enqueueDelivery,
failDelivery,
isEntryEligibleForRecoveryRetry,
isPermanentDeliveryError,
loadPendingDeliveries,
MAX_RETRIES,
moveToFailed,
recoverPendingDeliveries,
} from "./delivery-queue.js";
import { DirectoryCache } from "./directory-cache.js";
import { buildOutboundResultEnvelope } from "./envelope.js";
import type { OutboundDeliveryJson } from "./format.js";
@ -46,589 +30,6 @@ beforeEach(() => {
setActivePluginRegistry(createTestRegistry([]));
});
describe("delivery-queue", () => {
  // Per-test queue root; reassigned in beforeEach so cases never share files.
  let tmpDir: string;
  // Suite-wide fixture root created once in beforeAll, removed in afterAll.
  let fixtureRoot = "";
  let fixtureCount = 0;
  beforeAll(() => {
    fixtureRoot = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-dq-suite-"));
  });
  beforeEach(() => {
    // Fresh numbered subdirectory per test case for isolation.
    tmpDir = path.join(fixtureRoot, `case-${fixtureCount++}`);
    fs.mkdirSync(tmpDir, { recursive: true });
  });
  afterAll(() => {
    if (!fixtureRoot) {
      return;
    }
    fs.rmSync(fixtureRoot, { recursive: true, force: true });
    fixtureRoot = "";
  });
describe("enqueue + ack lifecycle", () => {
  it("creates and removes a queue entry", async () => {
    const id = await enqueueDelivery(
      {
        channel: "whatsapp",
        to: "+1555",
        payloads: [{ text: "hello" }],
        bestEffort: true,
        gifPlayback: true,
        silent: true,
        mirror: {
          sessionKey: "agent:main:main",
          text: "hello",
          mediaUrls: ["https://example.com/file.png"],
        },
      },
      tmpDir,
    );
    // Entry file exists after enqueue.
    const queueDir = path.join(tmpDir, "delivery-queue");
    const files = fs.readdirSync(queueDir).filter((f) => f.endsWith(".json"));
    expect(files).toHaveLength(1);
    expect(files[0]).toBe(`${id}.json`);
    // Entry contents are correct (all optional flags and mirror metadata persisted).
    const entry = JSON.parse(fs.readFileSync(path.join(queueDir, files[0]), "utf-8"));
    expect(entry).toMatchObject({
      id,
      channel: "whatsapp",
      to: "+1555",
      bestEffort: true,
      gifPlayback: true,
      silent: true,
      mirror: {
        sessionKey: "agent:main:main",
        text: "hello",
        mediaUrls: ["https://example.com/file.png"],
      },
      retryCount: 0,
    });
    expect(entry.payloads).toEqual([{ text: "hello" }]);
    // Ack removes the file.
    await ackDelivery(id, tmpDir);
    const remaining = fs.readdirSync(queueDir).filter((f) => f.endsWith(".json"));
    expect(remaining).toHaveLength(0);
  });
  it("ack is idempotent (no error on missing file)", async () => {
    await expect(ackDelivery("nonexistent-id", tmpDir)).resolves.toBeUndefined();
  });
  it("ack cleans up leftover .delivered marker when .json is already gone", async () => {
    const id = await enqueueDelivery(
      { channel: "whatsapp", to: "+1", payloads: [{ text: "stale-marker" }] },
      tmpDir,
    );
    const queueDir = path.join(tmpDir, "delivery-queue");
    // Simulate ack phase 1 having already run: .json renamed to a .delivered marker.
    fs.renameSync(path.join(queueDir, `${id}.json`), path.join(queueDir, `${id}.delivered`));
    await expect(ackDelivery(id, tmpDir)).resolves.toBeUndefined();
    expect(fs.existsSync(path.join(queueDir, `${id}.delivered`))).toBe(false);
  });
  it("ack removes .delivered marker so recovery does not replay", async () => {
    const id = await enqueueDelivery(
      { channel: "whatsapp", to: "+1", payloads: [{ text: "ack-test" }] },
      tmpDir,
    );
    const queueDir = path.join(tmpDir, "delivery-queue");
    await ackDelivery(id, tmpDir);
    // Neither .json nor .delivered should remain.
    expect(fs.existsSync(path.join(queueDir, `${id}.json`))).toBe(false);
    expect(fs.existsSync(path.join(queueDir, `${id}.delivered`))).toBe(false);
  });
  it("loadPendingDeliveries cleans up stale .delivered markers without replaying", async () => {
    const id = await enqueueDelivery(
      { channel: "telegram", to: "99", payloads: [{ text: "stale" }] },
      tmpDir,
    );
    const queueDir = path.join(tmpDir, "delivery-queue");
    // Simulate crash between ack phase 1 (rename) and phase 2 (unlink):
    // rename .json → .delivered, then pretend the process died.
    fs.renameSync(path.join(queueDir, `${id}.json`), path.join(queueDir, `${id}.delivered`));
    const entries = await loadPendingDeliveries(tmpDir);
    // The .delivered entry must NOT appear as pending.
    expect(entries).toHaveLength(0);
    // And the marker file should have been cleaned up.
    expect(fs.existsSync(path.join(queueDir, `${id}.delivered`))).toBe(false);
  });
});
describe("failDelivery", () => {
  it("increments retryCount, records attempt time, and sets lastError", async () => {
    const id = await enqueueDelivery(
      {
        channel: "telegram",
        to: "123",
        payloads: [{ text: "test" }],
      },
      tmpDir,
    );
    await failDelivery(id, "connection refused", tmpDir);
    // Read the persisted entry straight off disk to check the retry bookkeeping.
    const entryPath = path.join(tmpDir, "delivery-queue", `${id}.json`);
    const persisted = JSON.parse(fs.readFileSync(entryPath, "utf-8"));
    expect(persisted.retryCount).toBe(1);
    expect(typeof persisted.lastAttemptAt).toBe("number");
    expect(persisted.lastAttemptAt).toBeGreaterThan(0);
    expect(persisted.lastError).toBe("connection refused");
  });
});
describe("moveToFailed", () => {
  it("moves entry to failed/ subdirectory", async () => {
    const id = await enqueueDelivery(
      {
        channel: "slack",
        to: "#general",
        payloads: [{ text: "hi" }],
      },
      tmpDir,
    );
    await moveToFailed(id, tmpDir);
    // The entry must disappear from the live queue and reappear under failed/.
    const queueDir = path.join(tmpDir, "delivery-queue");
    const liveFile = path.join(queueDir, `${id}.json`);
    const failedFile = path.join(queueDir, "failed", `${id}.json`);
    expect(fs.existsSync(liveFile)).toBe(false);
    expect(fs.existsSync(failedFile)).toBe(true);
  });
});
describe("isPermanentDeliveryError", () => {
  // Messages that should match one of the permanent-error patterns.
  const permanentMessages = [
    "No conversation reference found for user:abc",
    "Telegram send failed: chat not found (chat_id=user:123)",
    "user not found",
    "Bot was blocked by the user",
    "Forbidden: bot was kicked from the group chat",
    "chat_id is empty",
    "Outbound not configured for channel: msteams",
  ];
  // Transient failures that must remain retryable.
  const transientMessages = [
    "network down",
    "ETIMEDOUT",
    "socket hang up",
    "rate limited",
    "500 Internal Server Error",
  ];
  it.each(permanentMessages)("returns true for permanent error: %s", (errorText) => {
    expect(isPermanentDeliveryError(errorText)).toBe(true);
  });
  it.each(transientMessages)("returns false for transient error: %s", (errorText) => {
    expect(isPermanentDeliveryError(errorText)).toBe(false);
  });
});
describe("loadPendingDeliveries", () => {
  it("returns empty array when queue directory does not exist", async () => {
    const nonexistent = path.join(tmpDir, "no-such-dir");
    const entries = await loadPendingDeliveries(nonexistent);
    expect(entries).toEqual([]);
  });
  it("loads multiple entries", async () => {
    await enqueueDelivery({ channel: "whatsapp", to: "+1", payloads: [{ text: "a" }] }, tmpDir);
    await enqueueDelivery({ channel: "telegram", to: "2", payloads: [{ text: "b" }] }, tmpDir);
    const entries = await loadPendingDeliveries(tmpDir);
    expect(entries).toHaveLength(2);
  });
  it("backfills lastAttemptAt for legacy retry entries during load", async () => {
    const id = await enqueueDelivery(
      { channel: "whatsapp", to: "+1", payloads: [{ text: "legacy" }] },
      tmpDir,
    );
    // Rewrite the stored entry into the legacy shape: retryCount > 0 but
    // no lastAttemptAt field on disk.
    const filePath = path.join(tmpDir, "delivery-queue", `${id}.json`);
    const legacyEntry = JSON.parse(fs.readFileSync(filePath, "utf-8"));
    legacyEntry.retryCount = 2;
    delete legacyEntry.lastAttemptAt;
    fs.writeFileSync(filePath, JSON.stringify(legacyEntry), "utf-8");
    const entries = await loadPendingDeliveries(tmpDir);
    expect(entries).toHaveLength(1);
    // Backfill uses enqueuedAt, both in memory and persisted back to disk.
    expect(entries[0]?.lastAttemptAt).toBe(entries[0]?.enqueuedAt);
    const persisted = JSON.parse(fs.readFileSync(filePath, "utf-8"));
    expect(persisted.lastAttemptAt).toBe(persisted.enqueuedAt);
  });
});
describe("computeBackoffMs", () => {
it("returns scheduled backoff values and clamps at max retry", () => {
const cases = [
{ retryCount: 0, expected: 0 },
{ retryCount: 1, expected: 5_000 },
{ retryCount: 2, expected: 25_000 },
{ retryCount: 3, expected: 120_000 },
{ retryCount: 4, expected: 600_000 },
// Beyond defined schedule -- clamps to last value.
{ retryCount: 5, expected: 600_000 },
] as const;
for (const testCase of cases) {
expect(computeBackoffMs(testCase.retryCount), String(testCase.retryCount)).toBe(
testCase.expected,
);
}
});
});
describe("isEntryEligibleForRecoveryRetry", () => {
  it("allows first replay after crash for retryCount=0 without lastAttemptAt", () => {
    const now = Date.now();
    // A never-attempted entry has no backoff window to wait out.
    const verdict = isEntryEligibleForRecoveryRetry(
      {
        id: "entry-1",
        channel: "whatsapp",
        to: "+1",
        payloads: [{ text: "a" }],
        enqueuedAt: now,
        retryCount: 0,
      },
      now,
    );
    expect(verdict).toEqual({ eligible: true });
  });
  it("defers retry entries until backoff window elapses", () => {
    const now = Date.now();
    // retryCount=3 with lastAttemptAt=now is squarely inside its backoff window.
    const verdict = isEntryEligibleForRecoveryRetry(
      {
        id: "entry-2",
        channel: "whatsapp",
        to: "+1",
        payloads: [{ text: "a" }],
        enqueuedAt: now - 30_000,
        retryCount: 3,
        lastAttemptAt: now,
      },
      now,
    );
    expect(verdict.eligible).toBe(false);
    if (verdict.eligible) {
      throw new Error("Expected ineligible retry entry");
    }
    expect(verdict.remainingBackoffMs).toBeGreaterThan(0);
  });
});
describe("recoverPendingDeliveries", () => {
  const baseCfg = {};
  const createLog = () => ({ info: vi.fn(), warn: vi.fn(), error: vi.fn() });
  // Seeds two queue entries as if the gateway crashed before delivering them.
  const enqueueCrashRecoveryEntries = async () => {
    await enqueueDelivery({ channel: "whatsapp", to: "+1", payloads: [{ text: "a" }] }, tmpDir);
    await enqueueDelivery({ channel: "telegram", to: "2", payloads: [{ text: "b" }] }, tmpDir);
  };
  // Edits a persisted queue entry in place to simulate retry/backoff state.
  // Omitting lastAttemptAt deletes the field (legacy shape).
  const setEntryState = (
    id: string,
    state: { retryCount: number; lastAttemptAt?: number; enqueuedAt?: number },
  ) => {
    const filePath = path.join(tmpDir, "delivery-queue", `${id}.json`);
    const entry = JSON.parse(fs.readFileSync(filePath, "utf-8"));
    entry.retryCount = state.retryCount;
    if (state.lastAttemptAt === undefined) {
      delete entry.lastAttemptAt;
    } else {
      entry.lastAttemptAt = state.lastAttemptAt;
    }
    if (state.enqueuedAt !== undefined) {
      entry.enqueuedAt = state.enqueuedAt;
    }
    fs.writeFileSync(filePath, JSON.stringify(entry), "utf-8");
  };
  // Runs recovery against tmpDir with a mock deliver fn; maxRecoveryMs is
  // only passed through when explicitly provided.
  const runRecovery = async ({
    deliver,
    log = createLog(),
    maxRecoveryMs,
  }: {
    deliver: ReturnType<typeof vi.fn>;
    log?: ReturnType<typeof createLog>;
    maxRecoveryMs?: number;
  }) => {
    const result = await recoverPendingDeliveries({
      deliver: deliver as DeliverFn,
      log,
      cfg: baseCfg,
      stateDir: tmpDir,
      ...(maxRecoveryMs === undefined ? {} : { maxRecoveryMs }),
    });
    return { result, log };
  };
  it("recovers entries from a simulated crash", async () => {
    // Manually create queue entries as if gateway crashed before delivery.
    await enqueueCrashRecoveryEntries();
    const deliver = vi.fn().mockResolvedValue([]);
    const { result } = await runRecovery({ deliver });
    expect(deliver).toHaveBeenCalledTimes(2);
    expect(result.recovered).toBe(2);
    expect(result.failed).toBe(0);
    expect(result.skippedMaxRetries).toBe(0);
    expect(result.deferredBackoff).toBe(0);
    // Queue should be empty after recovery.
    const remaining = await loadPendingDeliveries(tmpDir);
    expect(remaining).toHaveLength(0);
  });
  it("moves entries that exceeded max retries to failed/", async () => {
    // Create an entry and manually set retryCount to MAX_RETRIES.
    const id = await enqueueDelivery(
      { channel: "whatsapp", to: "+1", payloads: [{ text: "a" }] },
      tmpDir,
    );
    setEntryState(id, { retryCount: MAX_RETRIES });
    const deliver = vi.fn();
    const { result } = await runRecovery({ deliver });
    expect(deliver).not.toHaveBeenCalled();
    expect(result.skippedMaxRetries).toBe(1);
    expect(result.deferredBackoff).toBe(0);
    // Entry should be in failed/ directory.
    const failedDir = path.join(tmpDir, "delivery-queue", "failed");
    expect(fs.existsSync(path.join(failedDir, `${id}.json`))).toBe(true);
  });
  it("increments retryCount on failed recovery attempt", async () => {
    await enqueueDelivery({ channel: "slack", to: "#ch", payloads: [{ text: "x" }] }, tmpDir);
    // A transient error keeps the entry queued for a later retry.
    const deliver = vi.fn().mockRejectedValue(new Error("network down"));
    const { result } = await runRecovery({ deliver });
    expect(result.failed).toBe(1);
    expect(result.recovered).toBe(0);
    // Entry should still be in queue with incremented retryCount.
    const entries = await loadPendingDeliveries(tmpDir);
    expect(entries).toHaveLength(1);
    expect(entries[0].retryCount).toBe(1);
    expect(entries[0].lastError).toBe("network down");
  });
  it("moves entries to failed/ immediately on permanent delivery errors", async () => {
    const id = await enqueueDelivery(
      { channel: "msteams", to: "user:abc", payloads: [{ text: "hi" }] },
      tmpDir,
    );
    // "No conversation reference found" matches a permanent-error pattern.
    const deliver = vi
      .fn()
      .mockRejectedValue(new Error("No conversation reference found for user:abc"));
    const log = createLog();
    const { result } = await runRecovery({ deliver, log });
    expect(result.failed).toBe(1);
    expect(result.recovered).toBe(0);
    const remaining = await loadPendingDeliveries(tmpDir);
    expect(remaining).toHaveLength(0);
    const failedDir = path.join(tmpDir, "delivery-queue", "failed");
    expect(fs.existsSync(path.join(failedDir, `${id}.json`))).toBe(true);
    expect(log.warn).toHaveBeenCalledWith(expect.stringContaining("permanent error"));
  });
  it("passes skipQueue: true to prevent re-enqueueing during recovery", async () => {
    await enqueueDelivery({ channel: "whatsapp", to: "+1", payloads: [{ text: "a" }] }, tmpDir);
    const deliver = vi.fn().mockResolvedValue([]);
    await runRecovery({ deliver });
    expect(deliver).toHaveBeenCalledWith(expect.objectContaining({ skipQueue: true }));
  });
  it("replays stored delivery options during recovery", async () => {
    await enqueueDelivery(
      {
        channel: "whatsapp",
        to: "+1",
        payloads: [{ text: "a" }],
        bestEffort: true,
        gifPlayback: true,
        silent: true,
        mirror: {
          sessionKey: "agent:main:main",
          text: "a",
          mediaUrls: ["https://example.com/a.png"],
        },
      },
      tmpDir,
    );
    const deliver = vi.fn().mockResolvedValue([]);
    await runRecovery({ deliver });
    // Optional flags and mirror metadata must round-trip through the queue.
    expect(deliver).toHaveBeenCalledWith(
      expect.objectContaining({
        bestEffort: true,
        gifPlayback: true,
        silent: true,
        mirror: {
          sessionKey: "agent:main:main",
          text: "a",
          mediaUrls: ["https://example.com/a.png"],
        },
      }),
    );
  });
  it("respects maxRecoveryMs time budget", async () => {
    await enqueueCrashRecoveryEntries();
    await enqueueDelivery({ channel: "slack", to: "#c", payloads: [{ text: "c" }] }, tmpDir);
    const deliver = vi.fn().mockResolvedValue([]);
    const { result, log } = await runRecovery({
      deliver,
      maxRecoveryMs: 0, // Immediate timeout -- no entries should be processed.
    });
    expect(deliver).not.toHaveBeenCalled();
    expect(result.recovered).toBe(0);
    expect(result.failed).toBe(0);
    expect(result.skippedMaxRetries).toBe(0);
    expect(result.deferredBackoff).toBe(0);
    // All entries should still be in the queue (retryCount < MAX_RETRIES).
    const remaining = await loadPendingDeliveries(tmpDir);
    expect(remaining).toHaveLength(3);
    // retryCount should be incremented on all deferred entries so they
    // eventually reach MAX_RETRIES and are pruned rather than looping forever.
    expect(remaining.every((e) => e.retryCount === 1)).toBe(true);
    // Should have logged a warning about deferred entries.
    expect(log.warn).toHaveBeenCalledWith(expect.stringContaining("deferred to next startup"));
  });
  it("defers entries until backoff becomes eligible", async () => {
    const id = await enqueueDelivery(
      { channel: "whatsapp", to: "+1", payloads: [{ text: "a" }] },
      tmpDir,
    );
    // retryCount=3 with a just-now lastAttemptAt puts the entry inside backoff.
    setEntryState(id, { retryCount: 3, lastAttemptAt: Date.now() });
    const deliver = vi.fn().mockResolvedValue([]);
    const { result, log } = await runRecovery({
      deliver,
      maxRecoveryMs: 60_000,
    });
    expect(deliver).not.toHaveBeenCalled();
    expect(result).toEqual({
      recovered: 0,
      failed: 0,
      skippedMaxRetries: 0,
      deferredBackoff: 1,
    });
    const remaining = await loadPendingDeliveries(tmpDir);
    expect(remaining).toHaveLength(1);
    expect(log.info).toHaveBeenCalledWith(expect.stringContaining("not ready for retry yet"));
  });
  it("continues past high-backoff entries and recovers ready entries behind them", async () => {
    const now = Date.now();
    const blockedId = await enqueueDelivery(
      { channel: "whatsapp", to: "+1", payloads: [{ text: "blocked" }] },
      tmpDir,
    );
    const readyId = await enqueueDelivery(
      { channel: "telegram", to: "2", payloads: [{ text: "ready" }] },
      tmpDir,
    );
    // The blocked entry sorts first (older enqueuedAt) but is inside its
    // backoff window; the ready entry behind it must still be delivered.
    setEntryState(blockedId, { retryCount: 3, lastAttemptAt: now, enqueuedAt: now - 30_000 });
    setEntryState(readyId, { retryCount: 0, enqueuedAt: now - 10_000 });
    const deliver = vi.fn().mockResolvedValue([]);
    const { result } = await runRecovery({ deliver, maxRecoveryMs: 60_000 });
    expect(result).toEqual({
      recovered: 1,
      failed: 0,
      skippedMaxRetries: 0,
      deferredBackoff: 1,
    });
    expect(deliver).toHaveBeenCalledTimes(1);
    expect(deliver).toHaveBeenCalledWith(
      expect.objectContaining({ channel: "telegram", to: "2", skipQueue: true }),
    );
    const remaining = await loadPendingDeliveries(tmpDir);
    expect(remaining).toHaveLength(1);
    expect(remaining[0]?.id).toBe(blockedId);
  });
  it("recovers deferred entries on a later restart once backoff elapsed", async () => {
    vi.useFakeTimers();
    const start = new Date("2026-01-01T00:00:00.000Z");
    vi.setSystemTime(start);
    const id = await enqueueDelivery(
      { channel: "whatsapp", to: "+1", payloads: [{ text: "later" }] },
      tmpDir,
    );
    setEntryState(id, { retryCount: 3, lastAttemptAt: start.getTime() });
    // First startup: still inside the backoff window -- nothing delivered.
    const firstDeliver = vi.fn().mockResolvedValue([]);
    const firstRun = await runRecovery({ deliver: firstDeliver, maxRecoveryMs: 60_000 });
    expect(firstRun.result).toEqual({
      recovered: 0,
      failed: 0,
      skippedMaxRetries: 0,
      deferredBackoff: 1,
    });
    expect(firstDeliver).not.toHaveBeenCalled();
    // Second startup after enough fake time for the backoff window to elapse.
    vi.setSystemTime(new Date(start.getTime() + 600_000 + 1));
    const secondDeliver = vi.fn().mockResolvedValue([]);
    const secondRun = await runRecovery({ deliver: secondDeliver, maxRecoveryMs: 60_000 });
    expect(secondRun.result).toEqual({
      recovered: 1,
      failed: 0,
      skippedMaxRetries: 0,
      deferredBackoff: 0,
    });
    expect(secondDeliver).toHaveBeenCalledTimes(1);
    const remaining = await loadPendingDeliveries(tmpDir);
    expect(remaining).toHaveLength(0);
    vi.useRealTimers();
  });
  it("returns zeros when queue is empty", async () => {
    const deliver = vi.fn();
    const { result } = await runRecovery({ deliver });
    expect(result).toEqual({
      recovered: 0,
      failed: 0,
      skippedMaxRetries: 0,
      deferredBackoff: 0,
    });
    expect(deliver).not.toHaveBeenCalled();
  });
});
});
describe("DirectoryCache", () => {
const cfg = {} as OpenClawConfig;