diff --git a/src/infra/bonjour.test.ts b/src/infra/bonjour.test.ts index fbb80603bb3..8a24f2a50d6 100644 --- a/src/infra/bonjour.test.ts +++ b/src/infra/bonjour.test.ts @@ -329,6 +329,37 @@ describe("gateway bonjour advertiser", () => { await started.stop(); }); + it("suppresses ciao self-probe retry console noise while advertising", async () => { + enableAdvertiserUnitMode(); + + const destroy = vi.fn().mockResolvedValue(undefined); + const advertise = vi.fn().mockResolvedValue(undefined); + mockCiaoService({ advertise, destroy }); + + const originalConsoleLog = console.log; + const baseConsoleLog = vi.fn(); + console.log = baseConsoleLog as typeof console.log; + + try { + const started = await startGatewayBonjourAdvertiser({ + gatewayPort: 18789, + sshPort: 2222, + }); + + console.log( + "[test._openclaw-gw._tcp.local.] failed probing with reason: Error: Can't probe for a service which is announced already. Received announcing for service test._openclaw-gw._tcp.local.. Trying again in 2 seconds!", + ); + console.log("ordinary console line"); + + expect(baseConsoleLog).toHaveBeenCalledTimes(1); + expect(baseConsoleLog).toHaveBeenCalledWith("ordinary console line"); + + await started.stop(); + } finally { + console.log = originalConsoleLog; + } + }); + it("recreates the advertiser when ciao gets stuck announcing", async () => { enableAdvertiserUnitMode(); vi.useFakeTimers(); diff --git a/src/infra/bonjour.ts b/src/infra/bonjour.ts index 02775a8f086..f58c905eca0 100644 --- a/src/infra/bonjour.ts +++ b/src/infra/bonjour.ts @@ -63,9 +63,13 @@ type ServiceStateTracker = { sinceMs: number; }; +type ConsoleLogFn = (...args: unknown[]) => void; + const WATCHDOG_INTERVAL_MS = 5_000; const REPAIR_DEBOUNCE_MS = 30_000; const STUCK_ANNOUNCING_MS = 8_000; +const CIAO_SELF_PROBE_RETRY_FRAGMENT = + "failed probing with reason: Error: Can't probe for a service which is announced already."; function serviceSummary(label: string, svc: BonjourService): string { let fqdn = "unknown"; @@ -109,6 +113,28 @@ function handleCiaoUnhandledRejection(reason: unknown): boolean { return true; } +function shouldSuppressCiaoConsoleLog(args: unknown[]): boolean { + return args.some( + (arg) => typeof arg === "string" && arg.includes(CIAO_SELF_PROBE_RETRY_FRAGMENT), + ); +} + +function installCiaoConsoleNoiseFilter(): () => void { + const originalConsoleLog = console.log as ConsoleLogFn; + console.log = ((...args: unknown[]) => { + if (shouldSuppressCiaoConsoleLog(args)) { + return; + } + originalConsoleLog(...args); + }) as ConsoleLogFn; + return () => { + if (console.log === originalConsoleLog) { + return; + } + console.log = originalConsoleLog; + }; +} + export async function startGatewayBonjourAdvertiser( opts: GatewayBonjourAdvertiseOpts, ): Promise { @@ -117,278 +143,287 @@ export async function startGatewayBonjourAdvertiser( } const { getResponder, Protocol } = await import("@homebridge/ciao"); + const restoreConsoleLog = installCiaoConsoleNoiseFilter(); + try { + // mDNS service instance names are single DNS labels; dots in hostnames (like + // `Mac.localdomain`) can confuse some resolvers/browsers and break discovery. + // Keep only the first label and normalize away a trailing `.local`. + const hostnameRaw = process.env.OPENCLAW_MDNS_HOSTNAME?.trim() || "openclaw"; + const hostname = + hostnameRaw + .replace(/\.local$/i, "") + .split(".")[0] + .trim() || "openclaw"; + const instanceName = + typeof opts.instanceName === "string" && opts.instanceName.trim() + ? opts.instanceName.trim() + : `${hostname} (OpenClaw)`; + const displayName = prettifyInstanceName(instanceName); - // mDNS service instance names are single DNS labels; dots in hostnames (like - // `Mac.localdomain`) can confuse some resolvers/browsers and break discovery. - // Keep only the first label and normalize away a trailing `.local`. - const hostnameRaw = process.env.OPENCLAW_MDNS_HOSTNAME?.trim() || "openclaw"; - const hostname = - hostnameRaw - .replace(/\.local$/i, "") - .split(".")[0] - .trim() || "openclaw"; - const instanceName = - typeof opts.instanceName === "string" && opts.instanceName.trim() - ? opts.instanceName.trim() - : `${hostname} (OpenClaw)`; - const displayName = prettifyInstanceName(instanceName); - - const txtBase: Record = { - role: "gateway", - gatewayPort: String(opts.gatewayPort), - lanHost: `${hostname}.local`, - displayName, - }; - if (opts.gatewayTlsEnabled) { - txtBase.gatewayTls = "1"; - if (opts.gatewayTlsFingerprintSha256) { - txtBase.gatewayTlsSha256 = opts.gatewayTlsFingerprintSha256; - } - } - if (typeof opts.canvasPort === "number" && opts.canvasPort > 0) { - txtBase.canvasPort = String(opts.canvasPort); - } - if (typeof opts.tailnetDns === "string" && opts.tailnetDns.trim()) { - txtBase.tailnetDns = opts.tailnetDns.trim(); - } - // In minimal mode, omit cliPath to avoid exposing filesystem structure. - // This info can be obtained via the authenticated WebSocket if needed. - if (!opts.minimal && typeof opts.cliPath === "string" && opts.cliPath.trim()) { - txtBase.cliPath = opts.cliPath.trim(); - } - - // Build TXT record for the gateway service. - // In minimal mode, omit sshPort to avoid advertising SSH availability. - const gatewayTxt: Record = { - ...txtBase, - transport: "gateway", - }; - if (!opts.minimal) { - gatewayTxt.sshPort = String(opts.sshPort ?? 22); - } - - function createCycle(): BonjourCycle { - const responder = getResponder(); - const services: Array<{ label: string; svc: BonjourService }> = []; - - const gateway = responder.createService({ - name: safeServiceName(instanceName), - type: "openclaw-gw", - protocol: Protocol.TCP, - port: opts.gatewayPort, - domain: "local", - hostname, - txt: gatewayTxt, - }); - services.push({ - label: "gateway", - svc: gateway as unknown as BonjourService, - }); - - const cleanupUnhandledRejection = - services.length > 0 - ? registerUnhandledRejectionHandler(handleCiaoUnhandledRejection) - : undefined; - - return { responder, services, cleanupUnhandledRejection }; - } - - async function stopCycle(cycle: BonjourCycle | null) { - if (!cycle) { - return; - } - const responder = cycle.responder as unknown as { - advertiseService?: (...args: unknown[]) => unknown; - announce?: (...args: unknown[]) => unknown; - probe?: (...args: unknown[]) => unknown; - republishService?: (...args: unknown[]) => unknown; + const txtBase: Record = { + role: "gateway", + gatewayPort: String(opts.gatewayPort), + lanHost: `${hostname}.local`, + displayName, }; - const noopAsync = async () => {}; - // ciao schedules its own 2s retry timers after failed probe/announce attempts. - // Those callbacks target the original responder instance, so disarm it before - // destroy/shutdown to prevent a dead cycle from re-entering advertise/probe. - responder.advertiseService = noopAsync; - responder.announce = noopAsync; - responder.probe = noopAsync; - responder.republishService = noopAsync; - for (const { svc } of cycle.services) { + if (opts.gatewayTlsEnabled) { + txtBase.gatewayTls = "1"; + if (opts.gatewayTlsFingerprintSha256) { + txtBase.gatewayTlsSha256 = opts.gatewayTlsFingerprintSha256; + } + } + if (typeof opts.canvasPort === "number" && opts.canvasPort > 0) { + txtBase.canvasPort = String(opts.canvasPort); + } + if (typeof opts.tailnetDns === "string" && opts.tailnetDns.trim()) { + txtBase.tailnetDns = opts.tailnetDns.trim(); + } + // In minimal mode, omit cliPath to avoid exposing filesystem structure. + // This info can be obtained via the authenticated WebSocket if needed. + if (!opts.minimal && typeof opts.cliPath === "string" && opts.cliPath.trim()) { + txtBase.cliPath = opts.cliPath.trim(); + } + + // Build TXT record for the gateway service. + // In minimal mode, omit sshPort to avoid advertising SSH availability. + const gatewayTxt: Record = { + ...txtBase, + transport: "gateway", + }; + if (!opts.minimal) { + gatewayTxt.sshPort = String(opts.sshPort ?? 22); + } + + function createCycle(): BonjourCycle { + const responder = getResponder(); + const services: Array<{ label: string; svc: BonjourService }> = []; + + const gateway = responder.createService({ + name: safeServiceName(instanceName), + type: "openclaw-gw", + protocol: Protocol.TCP, + port: opts.gatewayPort, + domain: "local", + hostname, + txt: gatewayTxt, + }); + services.push({ + label: "gateway", + svc: gateway as unknown as BonjourService, + }); + + const cleanupUnhandledRejection = + services.length > 0 + ? registerUnhandledRejectionHandler(handleCiaoUnhandledRejection) + : undefined; + + return { responder, services, cleanupUnhandledRejection }; + } + + async function stopCycle(cycle: BonjourCycle | null) { + if (!cycle) { + return; + } + const responder = cycle.responder as unknown as { + advertiseService?: (...args: unknown[]) => unknown; + announce?: (...args: unknown[]) => unknown; + probe?: (...args: unknown[]) => unknown; + republishService?: (...args: unknown[]) => unknown; + }; + const noopAsync = async () => {}; + // ciao schedules its own 2s retry timers after failed probe/announce attempts. + // Those callbacks target the original responder instance, so disarm it before + // destroy/shutdown to prevent a dead cycle from re-entering advertise/probe. + responder.advertiseService = noopAsync; + responder.announce = noopAsync; + responder.probe = noopAsync; + responder.republishService = noopAsync; + for (const { svc } of cycle.services) { + try { + await svc.destroy(); + } catch { + /* ignore */ + } + } try { - await svc.destroy(); + await cycle.responder.shutdown(); } catch { /* ignore */ + } finally { + cycle.cleanupUnhandledRejection?.(); } } - try { - await cycle.responder.shutdown(); - } catch { - /* ignore */ - } finally { - cycle.cleanupUnhandledRejection?.(); - } - } - function attachConflictListeners(services: Array<{ label: string; svc: BonjourService }>) { - for (const { label, svc } of services) { - try { - svc.on("name-change", (name: unknown) => { - const next = typeof name === "string" ? name : String(name); - logWarn(`bonjour: ${label} name conflict resolved; newName=${JSON.stringify(next)}`); - }); - svc.on("hostname-change", (nextHostname: unknown) => { - const next = typeof nextHostname === "string" ? nextHostname : String(nextHostname); - logWarn( - `bonjour: ${label} hostname conflict resolved; newHostname=${JSON.stringify(next)}`, - ); - }); - } catch (err) { - logDebug(`bonjour: failed to attach listeners for ${label}: ${String(err)}`); - } - } - } - - function startAdvertising(services: Array<{ label: string; svc: BonjourService }>) { - for (const { label, svc } of services) { - try { - void svc - .advertise() - .then(() => { - // Keep this out of stdout/stderr (menubar + tests) but capture in the rolling log. - getLogger().info(`bonjour: advertised ${serviceSummary(label, svc)}`); - }) - .catch((err) => { + function attachConflictListeners(services: Array<{ label: string; svc: BonjourService }>) { + for (const { label, svc } of services) { + try { + svc.on("name-change", (name: unknown) => { + const next = typeof name === "string" ? name : String(name); + logWarn(`bonjour: ${label} name conflict resolved; newName=${JSON.stringify(next)}`); + }); + svc.on("hostname-change", (nextHostname: unknown) => { + const next = typeof nextHostname === "string" ? nextHostname : String(nextHostname); logWarn( - `bonjour: advertise failed (${serviceSummary(label, svc)}): ${formatBonjourError(err)}`, + `bonjour: ${label} hostname conflict resolved; newHostname=${JSON.stringify(next)}`, ); }); - } catch (err) { - logWarn( - `bonjour: advertise threw (${serviceSummary(label, svc)}): ${formatBonjourError(err)}`, - ); + } catch (err) { + logDebug(`bonjour: failed to attach listeners for ${label}: ${String(err)}`); + } } } - } - logDebug( - `bonjour: starting (hostname=${hostname}, instance=${JSON.stringify( - safeServiceName(instanceName), - )}, gatewayPort=${opts.gatewayPort}${opts.minimal ? ", minimal=true" : `, sshPort=${opts.sshPort ?? 22}`})`, - ); - - let stopped = false; - let recreatePromise: Promise | null = null; - let cycle = createCycle(); - const stateTracker = new Map(); - attachConflictListeners(cycle.services); - startAdvertising(cycle.services); - - const updateStateTrackers = (services: Array<{ label: string; svc: BonjourService }>) => { - const now = Date.now(); - for (const { label, svc } of services) { - const nextState: BonjourServiceState | "unknown" = - typeof svc.serviceState === "string" ? svc.serviceState : "unknown"; - const current = stateTracker.get(label); - const nextEnteredAt = - current && !isAnnouncedState(current.state) && !isAnnouncedState(nextState) - ? current.sinceMs - : now; - if (!current || current.state !== nextState || current.sinceMs !== nextEnteredAt) { - stateTracker.set(label, { state: nextState, sinceMs: nextEnteredAt }); + function startAdvertising(services: Array<{ label: string; svc: BonjourService }>) { + for (const { label, svc } of services) { + try { + void svc + .advertise() + .then(() => { + // Keep this out of stdout/stderr (menubar + tests) but capture in the rolling log. + getLogger().info(`bonjour: advertised ${serviceSummary(label, svc)}`); + }) + .catch((err) => { + logWarn( + `bonjour: advertise failed (${serviceSummary(label, svc)}): ${formatBonjourError(err)}`, + ); + }); + } catch (err) { + logWarn( + `bonjour: advertise threw (${serviceSummary(label, svc)}): ${formatBonjourError(err)}`, + ); + } } } - }; - const recreateAdvertiser = async (reason: string) => { - if (stopped) { - return; - } - if (recreatePromise) { + logDebug( + `bonjour: starting (hostname=${hostname}, instance=${JSON.stringify( + safeServiceName(instanceName), + )}, gatewayPort=${opts.gatewayPort}${opts.minimal ? ", minimal=true" : `, sshPort=${opts.sshPort ?? 22}`})`, + ); + + let stopped = false; + let recreatePromise: Promise | null = null; + let cycle = createCycle(); + const stateTracker = new Map(); + attachConflictListeners(cycle.services); + startAdvertising(cycle.services); + + const updateStateTrackers = (services: Array<{ label: string; svc: BonjourService }>) => { + const now = Date.now(); + for (const { label, svc } of services) { + const nextState: BonjourServiceState | "unknown" = + typeof svc.serviceState === "string" ? svc.serviceState : "unknown"; + const current = stateTracker.get(label); + const nextEnteredAt = + current && !isAnnouncedState(current.state) && !isAnnouncedState(nextState) + ? current.sinceMs + : now; + if (!current || current.state !== nextState || current.sinceMs !== nextEnteredAt) { + stateTracker.set(label, { state: nextState, sinceMs: nextEnteredAt }); + } + } + }; + + const recreateAdvertiser = async (reason: string) => { + if (stopped) { + return; + } + if (recreatePromise) { + return recreatePromise; + } + recreatePromise = (async () => { + logWarn(`bonjour: restarting advertiser (${reason})`); + const previous = cycle; + await stopCycle(previous); + cycle = createCycle(); + stateTracker.clear(); + attachConflictListeners(cycle.services); + startAdvertising(cycle.services); + })().finally(() => { + recreatePromise = null; + }); return recreatePromise; - } - recreatePromise = (async () => { - logWarn(`bonjour: restarting advertiser (${reason})`); - const previous = cycle; - await stopCycle(previous); - cycle = createCycle(); - stateTracker.clear(); - attachConflictListeners(cycle.services); - startAdvertising(cycle.services); - })().finally(() => { - recreatePromise = null; - }); - return recreatePromise; - }; + }; - // Watchdog: if we ever end up in an unannounced state (e.g. after sleep/wake or - // interface churn), try to re-advertise instead of requiring a full gateway restart. - const lastRepairAttempt = new Map(); - const watchdog = setInterval(() => { - if (stopped || recreatePromise) { - return; - } - updateStateTrackers(cycle.services); - for (const { label, svc } of cycle.services) { - const stateUnknown = (svc as { serviceState?: unknown }).serviceState; - if (typeof stateUnknown !== "string") { - continue; + // Watchdog: if we ever end up in an unannounced state (e.g. after sleep/wake or + // interface churn), try to re-advertise instead of requiring a full gateway restart. + const lastRepairAttempt = new Map(); + const watchdog = setInterval(() => { + if (stopped || recreatePromise) { + return; } - const tracked = stateTracker.get(label); - if ( - stateUnknown !== "announced" && - tracked && - Date.now() - tracked.sinceMs >= STUCK_ANNOUNCING_MS - ) { - void recreateAdvertiser( - `service stuck in ${stateUnknown} for ${Date.now() - tracked.sinceMs}ms (${serviceSummary( + updateStateTrackers(cycle.services); + for (const { label, svc } of cycle.services) { + const stateUnknown = (svc as { serviceState?: unknown }).serviceState; + if (typeof stateUnknown !== "string") { + continue; + } + const tracked = stateTracker.get(label); + if ( + stateUnknown !== "announced" && + tracked && + Date.now() - tracked.sinceMs >= STUCK_ANNOUNCING_MS + ) { + void recreateAdvertiser( + `service stuck in ${stateUnknown} for ${Date.now() - tracked.sinceMs}ms (${serviceSummary( + label, + svc, + )})`, + ); + return; + } + if (stateUnknown === "announced" || stateUnknown === "announcing") { + continue; + } + + let key = label; + try { + key = `${label}:${svc.getFQDN()}`; + } catch { + // ignore + } + const now = Date.now(); + const last = lastRepairAttempt.get(key) ?? 0; + if (now - last < REPAIR_DEBOUNCE_MS) { + continue; + } + lastRepairAttempt.set(key, now); + + logWarn( + `bonjour: watchdog detected non-announced service; attempting re-advertise (${serviceSummary( label, svc, )})`, ); - return; - } - if (stateUnknown === "announced" || stateUnknown === "announcing") { - continue; - } - - let key = label; - try { - key = `${label}:${svc.getFQDN()}`; - } catch { - // ignore - } - const now = Date.now(); - const last = lastRepairAttempt.get(key) ?? 0; - if (now - last < REPAIR_DEBOUNCE_MS) { - continue; - } - lastRepairAttempt.set(key, now); - - logWarn( - `bonjour: watchdog detected non-announced service; attempting re-advertise (${serviceSummary( - label, - svc, - )})`, - ); - try { - void svc.advertise().catch((err) => { + try { + void svc.advertise().catch((err) => { + logWarn( + `bonjour: watchdog advertise failed (${serviceSummary(label, svc)}): ${formatBonjourError(err)}`, + ); + }); + } catch (err) { logWarn( - `bonjour: watchdog advertise failed (${serviceSummary(label, svc)}): ${formatBonjourError(err)}`, + `bonjour: watchdog advertise threw (${serviceSummary(label, svc)}): ${formatBonjourError(err)}`, ); - }); - } catch (err) { - logWarn( - `bonjour: watchdog advertise threw (${serviceSummary(label, svc)}): ${formatBonjourError(err)}`, - ); + } } - } - }, WATCHDOG_INTERVAL_MS); - watchdog.unref?.(); + }, WATCHDOG_INTERVAL_MS); + watchdog.unref?.(); - return { - stop: async () => { - stopped = true; - clearInterval(watchdog); - await recreatePromise; - await stopCycle(cycle); - }, - }; + return { + stop: async () => { + stopped = true; + try { + clearInterval(watchdog); + await recreatePromise; + await stopCycle(cycle); + } finally { + restoreConsoleLog(); + } + }, + }; + } catch (err) { + restoreConsoleLog(); + throw err; + } } diff --git a/src/plugin-sdk/root-alias.cjs b/src/plugin-sdk/root-alias.cjs index d94e21f2282..a818c8d8e4e 100644 --- a/src/plugin-sdk/root-alias.cjs +++ b/src/plugin-sdk/root-alias.cjs @@ -10,10 +10,13 @@ const pluginSdkSubpathsCache = new Map(); const isDistRootAlias = __filename.includes( `${path.sep}dist${path.sep}plugin-sdk${path.sep}root-alias.cjs`, ); -const shouldPreferSourceInTests = +// Source plugin entry loading must stay on the source graph end-to-end. Mixing a +// source root alias with dist compat/runtime shims can split singleton deps +// (for example matrix-js-sdk) across two module graphs. +const shouldPreferSourceGraph = !isDistRootAlias && - (Boolean(process.env.VITEST) || - process.env.NODE_ENV === "test" || + (process.env.NODE_ENV !== "production" || + Boolean(process.env.VITEST) || process.env.OPENCLAW_PLUGIN_SDK_SOURCE_IN_TESTS === "1"); function emptyPluginConfigSchema() { @@ -160,7 +163,7 @@ function loadMonolithicSdk() { } const distCandidate = path.resolve(__dirname, "..", "..", "dist", "plugin-sdk", "compat.js"); - if (!shouldPreferSourceInTests && fs.existsSync(distCandidate)) { + if (!shouldPreferSourceGraph && fs.existsSync(distCandidate)) { try { monolithicSdk = getJiti(true)(distCandidate); return monolithicSdk; @@ -186,7 +189,7 @@ function loadDiagnosticEventsModule() { "infra", "diagnostic-events.js", ); - if (!shouldPreferSourceInTests) { + if (!shouldPreferSourceGraph) { const distCandidate = (fs.existsSync(directDistCandidate) && directDistCandidate) || findDistChunkByPrefix("diagnostic-events"); diff --git a/src/plugin-sdk/root-alias.test.ts b/src/plugin-sdk/root-alias.test.ts index 7ef757ca5ab..2350d517cc4 100644 --- a/src/plugin-sdk/root-alias.test.ts +++ b/src/plugin-sdk/root-alias.test.ts @@ -173,10 +173,22 @@ describe("plugin-sdk root alias", () => { }); it.each([ + { + name: "prefers source loading when the source root alias runs in development", + options: { + distExists: true, + env: { NODE_ENV: "development" }, + monolithicExports: { + slowHelper: (): string => "loaded", + }, + }, + expectedTryNative: false, + }, { name: "prefers native loading when compat resolves to dist", options: { distExists: true, + env: { NODE_ENV: "production" }, monolithicExports: { slowHelper: (): string => "loaded", }, diff --git a/src/process/command-queue.test.ts b/src/process/command-queue.test.ts index a35512d4f0d..2288e98fcec 100644 --- a/src/process/command-queue.test.ts +++ b/src/process/command-queue.test.ts @@ -152,6 +152,22 @@ describe("command queue", () => { } }); + it("demotes live model switch lane failures to debug noise", async () => { + const error = new Error("Live session model switch requested: anthropic/claude-opus-4-6"); + error.name = "LiveSessionModelSwitchError"; + + await expect( + enqueueCommandInLane("nested", async () => { + throw error; + }), + ).rejects.toBe(error); + + expect(diagnosticMocks.diag.error).not.toHaveBeenCalled(); + expect(diagnosticMocks.diag.debug).toHaveBeenCalledWith( + expect.stringContaining("lane task interrupted: lane=nested"), + ); + }); + it("getActiveTaskCount returns count of currently executing tasks", async () => { const { task, release } = enqueueBlockedMainTask(); diff --git a/src/process/command-queue.ts b/src/process/command-queue.ts index 42da633e846..85fec647ea6 100644 --- a/src/process/command-queue.ts +++ b/src/process/command-queue.ts @@ -47,6 +47,10 @@ type LaneState = { generation: number; }; +function isExpectedNonErrorLaneFailure(err: unknown): boolean { + return err instanceof Error && err.name === "LiveSessionModelSwitchError"; +} + /** * Keep queue runtime state on globalThis so every bundled entry/chunk shares * the same lanes, counters, and draining flag in production builds. @@ -141,10 +145,14 @@ function drainLane(lane: string) { } catch (err) { const completedCurrentGeneration = completeTask(state, taskId, taskGeneration); const isProbeLane = lane.startsWith("auth-probe:") || lane.startsWith("session:probe-"); - if (!isProbeLane) { + if (!isProbeLane && !isExpectedNonErrorLaneFailure(err)) { diag.error( `lane task error: lane=${lane} durationMs=${Date.now() - startTime} error="${String(err)}"`, ); + } else if (!isProbeLane) { + diag.debug( + `lane task interrupted: lane=${lane} durationMs=${Date.now() - startTime} reason="${String(err)}"`, + ); } if (completedCurrentGeneration) { pump();