diff --git a/CHANGELOG.md b/CHANGELOG.md index 57886d8ed9e..a0d88e88d27 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -61,6 +61,7 @@ Docs: https://docs.openclaw.ai - ACPX/runtime: repair `queue owner unavailable` session recovery by replacing dead named sessions and resuming the backend session when ACPX exposes a stable session id, so the first ACP prompt no longer inherits a dead handle. (#58669) Thanks @neeravmakwana - ACPX/runtime: retry dead-session queue-owner repair without `--resume-session` when the reported ACPX session id is stale, so recovery still creates a fresh named session instead of failing session init. Thanks @obviyus. - Auth/OpenAI Codex: persist plugin-refreshed OAuth credentials to `auth-profiles.json` before returning them, so rotated Codex refresh tokens survive restart and stop falling into `refresh_token_reused` loops. (#53082) +- Discord/gateway: hand reconnect ownership back to Carbon, keep runtime status aligned with close/reconnect state, and force-stop sockets that open without reaching READY so Discord monitors recover promptly instead of waiting on stale health timeouts. (#59019) Thanks @obviyus ## 2026.3.31 diff --git a/extensions/discord/src/gateway-logging.test.ts b/extensions/discord/src/gateway-logging.test.ts index f88449fed16..203c2aa1803 100644 --- a/extensions/discord/src/gateway-logging.test.ts +++ b/extensions/discord/src/gateway-logging.test.ts @@ -37,20 +37,25 @@ describe("attachDiscordGatewayLogging", () => { runtime, }); - emitter.emit("debug", "WebSocket connection opened"); - emitter.emit("debug", "WebSocket connection closed with code 1001"); - emitter.emit("debug", "Reconnecting with backoff: 1000ms after code 1001"); + emitter.emit("debug", "Gateway websocket opened"); + emitter.emit("debug", "Gateway websocket closed: 1001"); + emitter.emit("debug", "Gateway reconnect scheduled in 1000ms (close, resume=true)"); + emitter.emit("debug", "Gateway forcing fresh IDENTIFY after 3 failed resume attempts"); const logVerboseMock = vi.mocked(logVerbose); - expect(logVerboseMock).toHaveBeenCalledTimes(3); - expect(runtime.log).toHaveBeenCalledTimes(2); + expect(logVerboseMock).toHaveBeenCalledTimes(4); + expect(runtime.log).toHaveBeenCalledTimes(3); expect(runtime.log).toHaveBeenNthCalledWith( 1, - "discord gateway: WebSocket connection closed with code 1001", + "discord gateway: Gateway websocket closed: 1001", ); expect(runtime.log).toHaveBeenNthCalledWith( 2, - "discord gateway: Reconnecting with backoff: 1000ms after code 1001", + "discord gateway: Gateway reconnect scheduled in 1000ms (close, resume=true)", + ); + expect(runtime.log).toHaveBeenNthCalledWith( + 3, + "discord gateway: Gateway forcing fresh IDENTIFY after 3 failed resume attempts", ); cleanup(); @@ -88,7 +93,7 @@ describe("attachDiscordGatewayLogging", () => { const logVerboseMock = vi.mocked(logVerbose); logVerboseMock.mockClear(); - emitter.emit("debug", "WebSocket connection closed with code 1001"); + emitter.emit("debug", "Gateway websocket closed: 1001"); emitter.emit("warning", "High latency detected: 1200ms"); emitter.emit("metrics", { latency: 42 }); diff --git a/extensions/discord/src/gateway-logging.ts b/extensions/discord/src/gateway-logging.ts index 3a6802ccaef..eebe542252f 100644 --- a/extensions/discord/src/gateway-logging.ts +++ b/extensions/discord/src/gateway-logging.ts @@ -5,9 +5,9 @@ import type { RuntimeEnv } from "openclaw/plugin-sdk/runtime-env"; type GatewayEmitter = Pick; const INFO_DEBUG_MARKERS = [ - "WebSocket connection closed", - "Reconnecting with backoff", - "Attempting resume with backoff", + "Gateway websocket closed", + "Gateway reconnect scheduled in", + "Gateway forcing fresh IDENTIFY after", ]; const shouldPromoteGatewayDebug = (message: string) => diff --git a/extensions/discord/src/monitor.gateway.test.ts b/extensions/discord/src/monitor.gateway.test.ts index f4b0b13d04d..cde3427965a 100644 --- a/extensions/discord/src/monitor.gateway.test.ts +++ b/extensions/discord/src/monitor.gateway.test.ts @@ -127,39 +127,19 @@ describe("waitForDiscordGatewayStop", () => { it("rejects via registerForceStop and disconnects gateway", async () => { let forceStop: ((err: unknown) => void) | undefined; - const { detachLifecycle, disconnect, promise } = startGatewayWait({ - registerForceStop: (fn) => { - forceStop = fn; + registerForceStop: (handler) => { + forceStop = handler; }, }); - if (!forceStop) { - throw new Error("registerForceStop did not expose a stopper callback"); - } - forceStop(new Error("reconnect watchdog timeout")); + forceStop?.(new Error("runtime-not-ready")); - await expect(promise).rejects.toThrow("reconnect watchdog timeout"); + await expect(promise).rejects.toThrow("runtime-not-ready"); expect(disconnect).toHaveBeenCalledTimes(1); expect(detachLifecycle).toHaveBeenCalledTimes(1); }); - it("ignores forceStop after promise already settled", async () => { - let forceStop: ((err: unknown) => void) | undefined; - - const { abort, disconnect, promise } = startGatewayWait({ - registerForceStop: (fn) => { - forceStop = fn; - }, - }); - - abort.abort(); - await expect(promise).resolves.toBeUndefined(); - - forceStop?.(new Error("too late")); - expect(disconnect).toHaveBeenCalledTimes(1); - }); - it("keeps the lifecycle handler active until disconnect returns on abort", async () => { const onGatewayEvent = vi.fn(() => "stop" as const); const fatalEvent = createGatewayEvent("fatal", "disconnect emitted error"); diff --git a/extensions/discord/src/monitor.gateway.ts b/extensions/discord/src/monitor.gateway.ts index 6ef9cb95d2d..fee17555e76 100644 --- a/extensions/discord/src/monitor.gateway.ts +++ b/extensions/discord/src/monitor.gateway.ts @@ -65,7 +65,6 @@ export async function waitForDiscordGatewayStop( const onForceStop = (err: unknown) => { finishReject(err); }; - if (abortSignal?.aborted) { onAbort(); return; diff --git a/extensions/discord/src/monitor/provider.lifecycle.reconnect.ts b/extensions/discord/src/monitor/provider.lifecycle.reconnect.ts deleted file mode 100644 index fcdc7a6722a..00000000000 --- a/extensions/discord/src/monitor/provider.lifecycle.reconnect.ts +++ /dev/null @@ -1,497 +0,0 @@ -import { createArmableStallWatchdog } from "openclaw/plugin-sdk/channel-lifecycle"; -import { createConnectedChannelStatusPatch } from "openclaw/plugin-sdk/gateway-runtime"; -import { danger } from "openclaw/plugin-sdk/runtime-env"; -import type { RuntimeEnv } from "openclaw/plugin-sdk/runtime-env"; -import type { MutableDiscordGateway } from "./gateway-handle.js"; -import type { DiscordMonitorStatusSink } from "./status.js"; - -const DISCORD_GATEWAY_READY_TIMEOUT_MS = 15_000; -const DISCORD_GATEWAY_READY_POLL_MS = 250; -const DISCORD_GATEWAY_DISCONNECT_DRAIN_TIMEOUT_MS = 5_000; -const DISCORD_GATEWAY_FORCE_TERMINATE_CLOSE_TIMEOUT_MS = 1_000; -const DISCORD_GATEWAY_HELLO_TIMEOUT_MS = 30_000; -const DISCORD_GATEWAY_HELLO_CONNECTED_POLL_MS = 250; -const DISCORD_GATEWAY_MAX_CONSECUTIVE_HELLO_STALLS = 3; -const DISCORD_GATEWAY_RECONNECT_STALL_TIMEOUT_MS = 5 * 60_000; - -type GatewayReadyWaitResult = "ready" | "timeout" | "stopped"; - -async function waitForDiscordGatewayReady(params: { - gateway?: Pick; - abortSignal?: AbortSignal; - timeoutMs: number; - beforePoll?: () => Promise<"continue" | "stop"> | "continue" | "stop"; -}): Promise { - const deadlineAt = Date.now() + params.timeoutMs; - while (!params.abortSignal?.aborted) { - const pollDecision = await params.beforePoll?.(); - if (pollDecision === "stop") { - return "stopped"; - } - if (params.gateway?.isConnected) { - return "ready"; - } - if (Date.now() >= deadlineAt) { - return "timeout"; - } - await new Promise((resolve) => { - const timeout = setTimeout(resolve, DISCORD_GATEWAY_READY_POLL_MS); - timeout.unref?.(); - }); - } - return "stopped"; -} - -export function createDiscordGatewayReconnectController(params: { - accountId: string; - gateway?: MutableDiscordGateway; - runtime: RuntimeEnv; - abortSignal?: AbortSignal; - pushStatus: (patch: Parameters[0]) => void; - isLifecycleStopping: () => boolean; - drainPendingGatewayErrors: () => "continue" | "stop"; -}) { - let forceStopHandler: ((err: unknown) => void) | undefined; - let queuedForceStopError: unknown; - let helloTimeoutId: ReturnType | undefined; - let helloConnectedPollId: ReturnType | undefined; - let reconnectInFlight: Promise | undefined; - let consecutiveHelloStalls = 0; - - const shouldStop = () => params.isLifecycleStopping() || params.abortSignal?.aborted; - const resetHelloStallCounter = () => { - consecutiveHelloStalls = 0; - }; - const clearHelloWatch = () => { - if (helloTimeoutId) { - clearTimeout(helloTimeoutId); - helloTimeoutId = undefined; - } - if (helloConnectedPollId) { - clearInterval(helloConnectedPollId); - helloConnectedPollId = undefined; - } - }; - const parseGatewayCloseCode = (message: string): number | undefined => { - const match = /code\s+(\d{3,5})/i.exec(message); - if (!match?.[1]) { - return undefined; - } - const code = Number.parseInt(match[1], 10); - return Number.isFinite(code) ? code : undefined; - }; - const clearResumeState = () => { - if (!params.gateway?.state) { - return; - } - params.gateway.state.sessionId = null; - params.gateway.state.resumeGatewayUrl = null; - params.gateway.state.sequence = null; - params.gateway.sequence = null; - }; - const triggerForceStop = (err: unknown) => { - if (forceStopHandler) { - forceStopHandler(err); - return; - } - queuedForceStopError = err; - }; - const reconnectStallWatchdog = createArmableStallWatchdog({ - label: `discord:${params.accountId}:reconnect`, - timeoutMs: DISCORD_GATEWAY_RECONNECT_STALL_TIMEOUT_MS, - abortSignal: params.abortSignal, - runtime: params.runtime, - onTimeout: () => { - if (shouldStop()) { - return; - } - const at = Date.now(); - const error = new Error( - `discord reconnect watchdog timeout after ${DISCORD_GATEWAY_RECONNECT_STALL_TIMEOUT_MS}ms`, - ); - params.pushStatus({ - connected: false, - lastEventAt: at, - lastDisconnect: { - at, - error: error.message, - }, - lastError: error.message, - }); - params.runtime.error?.( - danger( - `discord: reconnect watchdog timeout after ${DISCORD_GATEWAY_RECONNECT_STALL_TIMEOUT_MS}ms; force-stopping monitor task`, - ), - ); - triggerForceStop(error); - }, - }); - const pushConnectedStatus = (at: number) => { - params.pushStatus({ - ...createConnectedChannelStatusPatch(at), - lastDisconnect: null, - }); - }; - const disconnectGatewaySocketWithoutAutoReconnect = async () => { - if (!params.gateway) { - return; - } - const gateway = params.gateway; - const socket = gateway.ws; - if (!socket) { - gateway.disconnect(); - return; - } - - // Carbon reconnects from the socket close handler even for intentional - // disconnects. Drop the current socket's close/error listeners so a forced - // reconnect does not race the old socket's automatic resume path. - for (const listener of socket.listeners("close")) { - socket.removeListener("close", listener); - } - for (const listener of socket.listeners("error")) { - socket.removeListener("error", listener); - } - - await new Promise((resolve, reject) => { - let settled = false; - let drainTimeout: ReturnType | undefined; - let terminateCloseTimeout: ReturnType | undefined; - const ignoreSocketError = () => {}; - const clearPendingTimers = () => { - if (drainTimeout) { - clearTimeout(drainTimeout); - drainTimeout = undefined; - } - if (terminateCloseTimeout) { - clearTimeout(terminateCloseTimeout); - terminateCloseTimeout = undefined; - } - }; - const cleanup = () => { - clearPendingTimers(); - socket.removeListener("close", onClose); - socket.removeListener("error", ignoreSocketError); - }; - const onClose = () => { - cleanup(); - if (settled) { - return; - } - settled = true; - resolve(); - }; - const resolveStoppedWait = () => { - if (settled) { - return; - } - settled = true; - clearPendingTimers(); - resolve(); - }; - const rejectClose = (error: Error) => { - if (shouldStop()) { - resolveStoppedWait(); - return; - } - if (settled) { - return; - } - settled = true; - clearPendingTimers(); - reject(error); - }; - - drainTimeout = setTimeout(() => { - if (settled) { - return; - } - if (shouldStop()) { - resolveStoppedWait(); - return; - } - params.runtime.error?.( - danger( - `discord: gateway socket did not close within ${DISCORD_GATEWAY_DISCONNECT_DRAIN_TIMEOUT_MS}ms before reconnect; attempting forced terminate before giving up`, - ), - ); - - let terminateStarted = false; - try { - if (typeof socket.terminate === "function") { - socket.terminate(); - terminateStarted = true; - } - } catch { - // Best-effort only. If terminate fails, fail closed instead of - // opening another socket on top of an unknown old one. - } - - if (!terminateStarted) { - params.runtime.error?.( - danger( - `discord: gateway socket did not expose a working terminate() after ${DISCORD_GATEWAY_DISCONNECT_DRAIN_TIMEOUT_MS}ms; force-stopping instead of opening a parallel socket`, - ), - ); - rejectClose( - new Error( - `discord gateway socket did not close within ${DISCORD_GATEWAY_DISCONNECT_DRAIN_TIMEOUT_MS}ms before reconnect`, - ), - ); - return; - } - - terminateCloseTimeout = setTimeout(() => { - if (settled) { - return; - } - if (shouldStop()) { - resolveStoppedWait(); - return; - } - params.runtime.error?.( - danger( - `discord: gateway socket did not close ${DISCORD_GATEWAY_FORCE_TERMINATE_CLOSE_TIMEOUT_MS}ms after forced terminate; force-stopping instead of opening a parallel socket`, - ), - ); - rejectClose( - new Error( - `discord gateway socket did not close within ${DISCORD_GATEWAY_DISCONNECT_DRAIN_TIMEOUT_MS}ms before reconnect`, - ), - ); - }, DISCORD_GATEWAY_FORCE_TERMINATE_CLOSE_TIMEOUT_MS); - terminateCloseTimeout.unref?.(); - }, DISCORD_GATEWAY_DISCONNECT_DRAIN_TIMEOUT_MS); - drainTimeout.unref?.(); - socket.on("error", ignoreSocketError); - socket.on("close", onClose); - gateway.disconnect(); - }); - }; - const reconnectGateway = async (reconnectParams: { - resume: boolean; - forceFreshIdentify?: boolean; - }) => { - if (reconnectInFlight) { - return await reconnectInFlight; - } - reconnectInFlight = (async () => { - if (reconnectParams.forceFreshIdentify) { - // Carbon still sends RESUME on HELLO when session state is populated, - // even after connect(false). Clear cached session data first so this - // path truly forces a fresh IDENTIFY. - clearResumeState(); - } - if (shouldStop()) { - return; - } - await disconnectGatewaySocketWithoutAutoReconnect(); - if (shouldStop()) { - return; - } - params.gateway?.connect(reconnectParams.resume); - })().finally(() => { - reconnectInFlight = undefined; - }); - return await reconnectInFlight; - }; - const reconnectGatewayFresh = async () => { - await reconnectGateway({ resume: false, forceFreshIdentify: true }); - }; - const onGatewayDebug = (msg: unknown) => { - const message = String(msg); - const at = Date.now(); - params.pushStatus({ lastEventAt: at }); - if (message.includes("WebSocket connection closed")) { - if (params.gateway?.isConnected) { - resetHelloStallCounter(); - } - reconnectStallWatchdog.arm(at); - params.pushStatus({ - connected: false, - lastDisconnect: { - at, - status: parseGatewayCloseCode(message), - }, - }); - clearHelloWatch(); - return; - } - if (!message.includes("WebSocket connection opened")) { - return; - } - reconnectStallWatchdog.disarm(); - clearHelloWatch(); - - let sawConnected = params.gateway?.isConnected === true; - if (sawConnected) { - pushConnectedStatus(at); - } - helloConnectedPollId = setInterval(() => { - if (!params.gateway?.isConnected) { - return; - } - sawConnected = true; - resetHelloStallCounter(); - reconnectStallWatchdog.disarm(); - pushConnectedStatus(Date.now()); - if (helloConnectedPollId) { - clearInterval(helloConnectedPollId); - helloConnectedPollId = undefined; - } - }, DISCORD_GATEWAY_HELLO_CONNECTED_POLL_MS); - - helloTimeoutId = setTimeout(() => { - helloTimeoutId = undefined; - void (async () => { - try { - if (helloConnectedPollId) { - clearInterval(helloConnectedPollId); - helloConnectedPollId = undefined; - } - if (sawConnected || params.gateway?.isConnected) { - resetHelloStallCounter(); - return; - } - - consecutiveHelloStalls += 1; - const forceFreshIdentify = - consecutiveHelloStalls >= DISCORD_GATEWAY_MAX_CONSECUTIVE_HELLO_STALLS; - const stalledAt = Date.now(); - reconnectStallWatchdog.arm(stalledAt); - params.pushStatus({ - connected: false, - lastEventAt: stalledAt, - lastDisconnect: { - at: stalledAt, - error: "hello-timeout", - }, - }); - params.runtime.log?.( - danger( - forceFreshIdentify - ? `connection stalled: no HELLO within ${DISCORD_GATEWAY_HELLO_TIMEOUT_MS}ms (${consecutiveHelloStalls}/${DISCORD_GATEWAY_MAX_CONSECUTIVE_HELLO_STALLS}); forcing fresh identify` - : `connection stalled: no HELLO within ${DISCORD_GATEWAY_HELLO_TIMEOUT_MS}ms (${consecutiveHelloStalls}/${DISCORD_GATEWAY_MAX_CONSECUTIVE_HELLO_STALLS}); retrying resume`, - ), - ); - if (forceFreshIdentify) { - resetHelloStallCounter(); - } - if (shouldStop()) { - return; - } - if (forceFreshIdentify) { - await reconnectGatewayFresh(); - return; - } - await reconnectGateway({ resume: true }); - } catch (err) { - params.runtime.error?.( - danger(`discord: failed to restart stalled gateway socket: ${String(err)}`), - ); - triggerForceStop(err); - } - })(); - }, DISCORD_GATEWAY_HELLO_TIMEOUT_MS); - }; - const onAbort = () => { - reconnectStallWatchdog.disarm(); - const at = Date.now(); - params.pushStatus({ connected: false, lastEventAt: at }); - if (!params.gateway) { - return; - } - params.gateway.options.reconnect = { maxAttempts: 0 }; - params.gateway.disconnect(); - }; - const ensureStartupReady = async () => { - if (!params.gateway || params.gateway.isConnected || shouldStop()) { - if (params.gateway?.isConnected && !shouldStop()) { - pushConnectedStatus(Date.now()); - } - return; - } - - const initialReady = await waitForDiscordGatewayReady({ - gateway: params.gateway, - abortSignal: params.abortSignal, - timeoutMs: DISCORD_GATEWAY_READY_TIMEOUT_MS, - beforePoll: params.drainPendingGatewayErrors, - }); - if (initialReady === "stopped" || shouldStop()) { - return; - } - if (initialReady === "timeout") { - params.runtime.error?.( - danger( - `discord: gateway was not ready after ${DISCORD_GATEWAY_READY_TIMEOUT_MS}ms; forcing a fresh reconnect`, - ), - ); - const startupRetryAt = Date.now(); - params.pushStatus({ - connected: false, - lastEventAt: startupRetryAt, - lastDisconnect: { - at: startupRetryAt, - error: "startup-not-ready", - }, - }); - await reconnectGatewayFresh(); - const reconnected = await waitForDiscordGatewayReady({ - gateway: params.gateway, - abortSignal: params.abortSignal, - timeoutMs: DISCORD_GATEWAY_READY_TIMEOUT_MS, - beforePoll: params.drainPendingGatewayErrors, - }); - if (reconnected === "stopped" || shouldStop()) { - return; - } - if (reconnected === "timeout") { - const error = new Error( - `discord gateway did not reach READY within ${DISCORD_GATEWAY_READY_TIMEOUT_MS}ms after a forced reconnect`, - ); - const startupFailureAt = Date.now(); - params.pushStatus({ - connected: false, - lastEventAt: startupFailureAt, - lastDisconnect: { - at: startupFailureAt, - error: "startup-reconnect-timeout", - }, - lastError: error.message, - }); - throw error; - } - } - - if (params.gateway.isConnected && !shouldStop()) { - pushConnectedStatus(Date.now()); - } - }; - - if (params.abortSignal?.aborted) { - onAbort(); - } else { - params.abortSignal?.addEventListener("abort", onAbort, { once: true }); - } - - return { - ensureStartupReady, - onAbort, - onGatewayDebug, - clearHelloWatch, - registerForceStop: (handler: (err: unknown) => void) => { - forceStopHandler = handler; - if (queuedForceStopError !== undefined) { - const queued = queuedForceStopError; - queuedForceStopError = undefined; - handler(queued); - } - }, - dispose: () => { - reconnectStallWatchdog.stop(); - clearHelloWatch(); - params.abortSignal?.removeEventListener("abort", onAbort); - }, - }; -} diff --git a/extensions/discord/src/monitor/provider.lifecycle.test.ts b/extensions/discord/src/monitor/provider.lifecycle.test.ts index 00780e51847..5feb89dbd28 100644 --- a/extensions/discord/src/monitor/provider.lifecycle.test.ts +++ b/extensions/discord/src/monitor/provider.lifecycle.test.ts @@ -14,14 +14,7 @@ type MockGateway = { options: GatewayPlugin["options"]; disconnect: Mock<() => void>; connect: Mock<(resume?: boolean) => void>; - state?: { - sessionId?: string | null; - resumeGatewayUrl?: string | null; - sequence?: number | null; - }; - sequence?: number | null; emitter: EventEmitter; - ws?: EventEmitter & { terminate?: () => void }; }; const { @@ -76,14 +69,40 @@ describe("runDiscordGatewayLifecycle", () => { stopGatewayLoggingMock.mockClear(); }); - const createLifecycleHarness = (params?: { - accountId?: string; + function createGatewayHarness(): { emitter: EventEmitter; gateway: MockGateway } { + const emitter = new EventEmitter(); + return { + emitter, + gateway: { + isConnected: false, + options: { intents: 0, reconnect: { maxAttempts: 50 } } as GatewayPlugin["options"], + disconnect: vi.fn(), + connect: vi.fn(), + emitter, + }, + }; + } + + function createGatewayEvent( + type: DiscordGatewayEvent["type"], + message: string, + ): DiscordGatewayEvent { + const err = new Error(message); + return { + type, + err, + message: String(err), + shouldStopLifecycle: type !== "other", + }; + } + + function createLifecycleHarness(params?: { + gateway?: MockGateway; start?: () => Promise; stop?: () => Promise; isDisallowedIntentsError?: (err: unknown) => boolean; pendingGatewayEvents?: DiscordGatewayEvent[]; - gateway?: MockGateway; - }) => { + }) { const gateway = params?.gateway ?? (() => { @@ -96,15 +115,11 @@ describe("runDiscordGatewayLifecycle", () => { const threadStop = vi.fn(); const runtimeLog = vi.fn(); const runtimeError = vi.fn(); - const runtimeExit = vi.fn(); const pendingGatewayEvents = params?.pendingGatewayEvents ?? []; const gatewaySupervisor = { attachLifecycle: vi.fn(), detachLifecycle: vi.fn(), drainPending: vi.fn((handler: (event: DiscordGatewayEvent) => "continue" | "stop") => { - if (pendingGatewayEvents.length === 0) { - return "continue"; - } const queued = [...pendingGatewayEvents]; pendingGatewayEvents.length = 0; for (const event of queued) { @@ -121,7 +136,7 @@ describe("runDiscordGatewayLifecycle", () => { const runtime: RuntimeEnv = { log: runtimeLog, error: runtimeError, - exit: runtimeExit, + exit: vi.fn(), }; return { start, @@ -132,7 +147,7 @@ describe("runDiscordGatewayLifecycle", () => { gatewaySupervisor, statusSink, lifecycleParams: { - accountId: params?.accountId ?? "default", + accountId: "default", gateway: gateway as unknown as MutableDiscordGateway, runtime, isDisallowedIntentsError: params?.isDisallowedIntentsError ?? (() => false), @@ -145,7 +160,7 @@ describe("runDiscordGatewayLifecycle", () => { abortSignal: undefined as AbortSignal | undefined, } satisfies LifecycleParams, }; - }; + } function expectLifecycleCleanup(params: { start: ReturnType; @@ -163,64 +178,6 @@ describe("runDiscordGatewayLifecycle", () => { expect(params.gatewaySupervisor.detachLifecycle).toHaveBeenCalledTimes(1); } - function createGatewayHarness(params?: { - state?: { - sessionId?: string | null; - resumeGatewayUrl?: string | null; - sequence?: number | null; - }; - sequence?: number | null; - ws?: EventEmitter & { terminate?: () => void }; - }): { emitter: EventEmitter; gateway: MockGateway } { - const emitter = new EventEmitter(); - const gateway: MockGateway = { - isConnected: false, - options: { intents: 0 } as GatewayPlugin["options"], - disconnect: vi.fn(), - connect: vi.fn(), - ...(params?.state ? { state: params.state } : {}), - ...(params?.sequence !== undefined ? { sequence: params.sequence } : {}), - ...(params?.ws ? { ws: params.ws } : {}), - emitter, - }; - return { emitter, gateway }; - } - - async function emitGatewayOpenAndWait(emitter: EventEmitter, delayMs = 30000): Promise { - emitter.emit("debug", "WebSocket connection opened"); - await vi.advanceTimersByTimeAsync(delayMs); - } - - function createGatewayEvent( - type: DiscordGatewayEvent["type"], - message: string, - ): DiscordGatewayEvent { - const err = new Error(message); - return { - type, - err, - message: String(err), - shouldStopLifecycle: type !== "other", - }; - } - - function expectGatewaySessionStateCleared(gateway: { - state?: { - sessionId?: string | null; - resumeGatewayUrl?: string | null; - sequence?: number | null; - }; - sequence?: number | null; - }) { - if (!gateway.state) { - throw new Error("gateway state was not initialized"); - } - expect(gateway.state.sessionId).toBeNull(); - expect(gateway.state.resumeGatewayUrl).toBeNull(); - expect(gateway.state.sequence).toBeNull(); - expect(gateway.sequence).toBeNull(); - } - it("cleans up thread bindings when exec approvals startup fails", async () => { const { lifecycleParams, start, stop, threadStop, gatewaySupervisor } = createLifecycleHarness({ start: async () => { @@ -257,21 +214,6 @@ describe("runDiscordGatewayLifecycle", () => { }); }); - it("cleans up after successful gateway wait", async () => { - const { lifecycleParams, start, stop, threadStop, gatewaySupervisor } = - createLifecycleHarness(); - - await expect(runDiscordGatewayLifecycle(lifecycleParams)).resolves.toBeUndefined(); - - expectLifecycleCleanup({ - start, - stop, - threadStop, - waitCalls: 1, - gatewaySupervisor, - }); - }); - it("pushes connected status when gateway is already connected at lifecycle start", async () => { const { emitter, gateway } = createGatewayHarness(); gateway.isConnected = true; @@ -280,26 +222,20 @@ describe("runDiscordGatewayLifecycle", () => { const { lifecycleParams, statusSink } = createLifecycleHarness({ gateway }); await expect(runDiscordGatewayLifecycle(lifecycleParams)).resolves.toBeUndefined(); - const connectedCall = statusSink.mock.calls.find((call) => { - const patch = (call[0] ?? {}) as Record; - return patch.connected === true; - }); - if (!connectedCall) { - throw new Error("connected status update was not emitted"); - } - expect(connectedCall[0]).toMatchObject({ - connected: true, - lastDisconnect: null, - }); - expect(connectedCall[0].lastConnectedAt).toBeTypeOf("number"); + expect(statusSink).toHaveBeenCalledWith( + expect.objectContaining({ + connected: true, + lastDisconnect: null, + }), + ); }); - it("forces a fresh reconnect when startup never reaches READY, then recovers", async () => { + it("restarts the gateway once when startup never reaches READY, then recovers", async () => { vi.useFakeTimers(); try { const { emitter, gateway } = createGatewayHarness(); getDiscordGatewayEmitterMock.mockReturnValueOnce(emitter); - gateway.connect.mockImplementation((_resume?: boolean) => { + gateway.connect.mockImplementation(() => { setTimeout(() => { gateway.isConnected = true; }, 1_000); @@ -307,11 +243,12 @@ describe("runDiscordGatewayLifecycle", () => { const { lifecycleParams, runtimeError } = createLifecycleHarness({ gateway }); const lifecyclePromise = runDiscordGatewayLifecycle(lifecycleParams); - await vi.advanceTimersByTimeAsync(15_000 + 1_000); + + await vi.advanceTimersByTimeAsync(16_500); await expect(lifecyclePromise).resolves.toBeUndefined(); expect(runtimeError).toHaveBeenCalledWith( - expect.stringContaining("gateway was not ready after 15000ms"), + expect.stringContaining("gateway was not ready after 15000ms; restarting gateway"), ); expect(gateway.disconnect).toHaveBeenCalledTimes(1); expect(gateway.connect).toHaveBeenCalledTimes(1); @@ -321,218 +258,7 @@ describe("runDiscordGatewayLifecycle", () => { } }); - it("clears resume state and suppresses socket-driven auto-resume during forced startup reconnects", async () => { - vi.useFakeTimers(); - try { - const pendingGatewayEvents: DiscordGatewayEvent[] = []; - const socket = new EventEmitter(); - const { emitter, gateway } = createGatewayHarness({ - state: { - sessionId: "stale-session", - resumeGatewayUrl: "wss://gateway.discord.gg", - sequence: 123, - }, - sequence: 123, - ws: socket, - }); - getDiscordGatewayEmitterMock.mockReturnValueOnce(emitter); - - socket.on("error", (err) => { - pendingGatewayEvents.push({ - type: "other", - err, - message: String(err), - shouldStopLifecycle: false, - }); - }); - socket.on("close", () => { - gateway.connect(true); - }); - gateway.disconnect.mockImplementation(() => { - setTimeout(() => { - socket.emit( - "error", - new Error("WebSocket was closed before the connection was established"), - ); - socket.emit("close", 1006, ""); - }, 1); - }); - gateway.connect.mockImplementation((resume?: boolean) => { - if (resume === false) { - setTimeout(() => { - gateway.isConnected = true; - }, 1_000); - } - }); - - const { lifecycleParams, runtimeError } = createLifecycleHarness({ - gateway, - pendingGatewayEvents, - }); - const lifecyclePromise = runDiscordGatewayLifecycle(lifecycleParams); - await vi.advanceTimersByTimeAsync(17_000); - await expect(lifecyclePromise).resolves.toBeUndefined(); - - expect(gateway.connect).toHaveBeenCalledTimes(1); - expect(gateway.connect).toHaveBeenCalledWith(false); - expect(runtimeError).not.toHaveBeenCalledWith( - expect.stringContaining("WebSocket was closed before the connection was established"), - ); - expectGatewaySessionStateCleared(gateway); - } finally { - vi.useRealTimers(); - } - }); - - it("waits for forced terminate to close the old socket before reconnecting", async () => { - vi.useFakeTimers(); - try { - const socket = Object.assign(new EventEmitter(), { - terminate: vi.fn(() => { - setTimeout(() => { - socket.emit( - "error", - new Error("WebSocket was closed before the connection was established"), - ); - socket.emit("close", 1006, ""); - }, 1); - }), - }); - const { emitter, gateway } = createGatewayHarness({ ws: socket }); - getDiscordGatewayEmitterMock.mockReturnValueOnce(emitter); - gateway.connect.mockImplementation((_resume?: boolean) => { - setTimeout(() => { - gateway.isConnected = true; - }, 1_000); - }); - - const { lifecycleParams, runtimeError } = createLifecycleHarness({ gateway }); - const lifecyclePromise = runDiscordGatewayLifecycle(lifecycleParams); - await vi.advanceTimersByTimeAsync(15_000 + 5_000 + 1_500); - await expect(lifecyclePromise).resolves.toBeUndefined(); - - expect(socket.terminate).toHaveBeenCalledTimes(1); - expect(gateway.connect).toHaveBeenCalledTimes(1); - expect(gateway.connect).toHaveBeenCalledWith(false); - expect(runtimeError).toHaveBeenCalledWith( - expect.stringContaining("attempting forced terminate before giving up"), - ); - expect(runtimeError).not.toHaveBeenCalledWith( - expect.stringContaining("WebSocket was closed before the connection was established"), - ); - } finally { - vi.useRealTimers(); - } - }); - - it("fails closed when forced terminate still does not close the old socket", async () => { - vi.useFakeTimers(); - try { - const socket = Object.assign(new EventEmitter(), { - terminate: vi.fn(() => { - setTimeout(() => { - socket.emit( - "error", - new Error("WebSocket was closed before the connection was established"), - ); - }, 1); - }), - }); - const { emitter, gateway } = createGatewayHarness({ ws: socket }); - getDiscordGatewayEmitterMock.mockReturnValueOnce(emitter); - const { lifecycleParams, start, stop, threadStop, runtimeError, gatewaySupervisor } = - createLifecycleHarness({ gateway }); - - const lifecyclePromise = runDiscordGatewayLifecycle(lifecycleParams); - lifecyclePromise.catch(() => {}); - await vi.advanceTimersByTimeAsync(15_000 + 5_000 + 1_500); - await expect(lifecyclePromise).rejects.toThrow( - "discord gateway socket did not close within 5000ms before reconnect", - ); - - expect(socket.terminate).toHaveBeenCalledTimes(1); - expect(gateway.connect).not.toHaveBeenCalled(); - expect(runtimeError).toHaveBeenCalledWith( - expect.stringContaining("force-stopping instead of opening a parallel socket"), - ); - expect(runtimeError).not.toHaveBeenCalledWith( - expect.stringContaining("WebSocket was closed before the connection was established"), - ); - expectLifecycleCleanup({ - start, - stop, - threadStop, - waitCalls: 0, - gatewaySupervisor, - }); - } finally { - vi.useRealTimers(); - } - }); - - it("does not reconnect after lifecycle shutdown begins during socket drain", async () => { - vi.useFakeTimers(); - try { - const socket = new EventEmitter(); - const { emitter, gateway } = createGatewayHarness({ ws: socket }); - getDiscordGatewayEmitterMock.mockReturnValueOnce(emitter); - gateway.disconnect.mockImplementation(() => { - setTimeout(() => { - socket.emit("close", 1000, ""); - }, 1_000); - }); - - const abortController = new AbortController(); - const { lifecycleParams } = createLifecycleHarness({ gateway }); - lifecycleParams.abortSignal = abortController.signal; - - const lifecyclePromise = runDiscordGatewayLifecycle(lifecycleParams); - await vi.advanceTimersByTimeAsync(15_100); - abortController.abort(); - await vi.advanceTimersByTimeAsync(2_000); - await expect(lifecyclePromise).resolves.toBeUndefined(); - - expect(gateway.connect).not.toHaveBeenCalled(); - } finally { - vi.useRealTimers(); - } - }); - - it("treats drain timeout as a graceful stop after lifecycle abort", async () => { - vi.useFakeTimers(); - try { - const socket = new EventEmitter(); - const { emitter, gateway } = createGatewayHarness({ ws: socket }); - getDiscordGatewayEmitterMock.mockReturnValueOnce(emitter); - - const abortController = new AbortController(); - const { lifecycleParams, start, stop, threadStop, runtimeError, gatewaySupervisor } = - createLifecycleHarness({ gateway }); - lifecycleParams.abortSignal = abortController.signal; - - const lifecyclePromise = runDiscordGatewayLifecycle(lifecycleParams); - await vi.advanceTimersByTimeAsync(15_100); - abortController.abort(); - await vi.advanceTimersByTimeAsync(5_500); - await expect(lifecyclePromise).resolves.toBeUndefined(); - - expect(gateway.connect).not.toHaveBeenCalled(); - expect(runtimeError).not.toHaveBeenCalledWith( - expect.stringContaining("gateway socket did not close within 5000ms before reconnect"), - ); - expectLifecycleCleanup({ - start, - stop, - threadStop, - waitCalls: 1, - gatewaySupervisor, - }); - } finally { - vi.useRealTimers(); - } - }); - - it("fails fast when startup never reaches READY after a forced reconnect", async () => { + it("fails when startup still is not ready after a restart", async () => { vi.useFakeTimers(); try { const { emitter, gateway } = createGatewayHarness(); @@ -542,11 +268,11 @@ describe("runDiscordGatewayLifecycle", () => { const lifecyclePromise = runDiscordGatewayLifecycle(lifecycleParams); lifecyclePromise.catch(() => {}); - await vi.advanceTimersByTimeAsync(15_000 * 2 + 1_000); - await expect(lifecyclePromise).rejects.toThrow( - "discord gateway did not reach READY within 15000ms after a forced reconnect", - ); + await vi.advanceTimersByTimeAsync(31_000); + await expect(lifecyclePromise).rejects.toThrow( + "discord gateway did not reach READY within 15000ms after restart", + ); expect(gateway.disconnect).toHaveBeenCalledTimes(1); expect(gateway.connect).toHaveBeenCalledTimes(1); expect(gateway.connect).toHaveBeenCalledWith(false); @@ -605,7 +331,7 @@ describe("runDiscordGatewayLifecycle", () => { }); }); - it("throws queued non-disallowed fatal gateway errors", async () => { + it("throws queued fatal startup gateway errors", async () => { const { lifecycleParams, start, stop, threadStop, gatewaySupervisor } = createLifecycleHarness({ pendingGatewayEvents: [createGatewayEvent("fatal", "Fatal Gateway error: 4000")], }); @@ -623,6 +349,29 @@ describe("runDiscordGatewayLifecycle", () => { }); }); + it("throws queued reconnect exhaustion errors", async () => { + const { lifecycleParams, start, stop, threadStop, gatewaySupervisor } = createLifecycleHarness({ + pendingGatewayEvents: [ + createGatewayEvent( + "reconnect-exhausted", + "Max reconnect attempts (50) reached after code 1005", + ), + ], + }); + + await expect(runDiscordGatewayLifecycle(lifecycleParams)).rejects.toThrow( + "Max reconnect attempts (50) reached after code 1005", + ); + + expectLifecycleCleanup({ + start, + stop, + threadStop, + waitCalls: 0, + gatewaySupervisor, + }); + }); + it("surfaces fatal startup gateway errors while waiting for READY", async () => { vi.useFakeTimers(); try { @@ -642,8 +391,8 @@ describe("runDiscordGatewayLifecycle", () => { const lifecyclePromise = runDiscordGatewayLifecycle(lifecycleParams); lifecyclePromise.catch(() => {}); await vi.advanceTimersByTimeAsync(1_500); - await expect(lifecyclePromise).rejects.toThrow("Fatal Gateway error: 4001"); + await expect(lifecyclePromise).rejects.toThrow("Fatal Gateway error: 4001"); expect(runtimeError).toHaveBeenCalledWith( expect.stringContaining("discord gateway error: Error: Fatal Gateway error: 4001"), ); @@ -661,247 +410,111 @@ describe("runDiscordGatewayLifecycle", () => { } }); - it("retries stalled HELLO with resume before forcing fresh identify", async () => { - vi.useFakeTimers(); - try { - const { emitter, gateway } = createGatewayHarness({ - state: { - sessionId: "session-1", - resumeGatewayUrl: "wss://gateway.discord.gg", - sequence: 123, - }, - sequence: 123, - }); - gateway.isConnected = true; - getDiscordGatewayEmitterMock.mockReturnValueOnce(emitter); - waitForDiscordGatewayStopMock.mockImplementationOnce(async () => { - emitter.emit("debug", "WebSocket connection closed with code 1006"); - gateway.isConnected = false; - await emitGatewayOpenAndWait(emitter); - await emitGatewayOpenAndWait(emitter); - await emitGatewayOpenAndWait(emitter); - }); - - const { lifecycleParams } = createLifecycleHarness({ gateway }); - await expect(runDiscordGatewayLifecycle(lifecycleParams)).resolves.toBeUndefined(); - - expect(gateway.disconnect).toHaveBeenCalledTimes(3); - expect(gateway.connect).toHaveBeenNthCalledWith(1, true); - expect(gateway.connect).toHaveBeenNthCalledWith(2, true); - expect(gateway.connect).toHaveBeenNthCalledWith(3, false); - expectGatewaySessionStateCleared(gateway); - } finally { - vi.useRealTimers(); - } - }); - - it("resets HELLO stall counter after a successful reconnect that drops quickly", async () => { - vi.useFakeTimers(); - try { - const { emitter, gateway } = createGatewayHarness({ - state: { - sessionId: "session-2", - resumeGatewayUrl: "wss://gateway.discord.gg", - sequence: 456, - }, - sequence: 456, - }); - gateway.isConnected = true; - getDiscordGatewayEmitterMock.mockReturnValueOnce(emitter); - waitForDiscordGatewayStopMock.mockImplementationOnce(async () => { - emitter.emit("debug", "WebSocket connection closed with code 1006"); - gateway.isConnected = false; - await emitGatewayOpenAndWait(emitter); - - await emitGatewayOpenAndWait(emitter); - - // Successful reconnect (READY/RESUMED sets isConnected=true), then - // quick drop before the HELLO timeout window finishes. - gateway.isConnected = true; - await emitGatewayOpenAndWait(emitter, 10); - emitter.emit("debug", "WebSocket connection closed with code 1006"); - gateway.isConnected = false; - - await emitGatewayOpenAndWait(emitter); - await emitGatewayOpenAndWait(emitter); - }); - - const { lifecycleParams } = createLifecycleHarness({ gateway }); - await expect(runDiscordGatewayLifecycle(lifecycleParams)).resolves.toBeUndefined(); - - expect(gateway.connect).toHaveBeenCalledTimes(4); - expect(gateway.connect).toHaveBeenNthCalledWith(1, true); - expect(gateway.connect).toHaveBeenNthCalledWith(2, true); - expect(gateway.connect).toHaveBeenNthCalledWith(3, true); - expect(gateway.connect).toHaveBeenNthCalledWith(4, true); - expect(gateway.connect).not.toHaveBeenCalledWith(false); - } finally { - vi.useRealTimers(); - } - }); - - it("force-stops when reconnect stalls after a close event", async () => { - vi.useFakeTimers(); - try { - const { emitter, gateway } = createGatewayHarness(); - gateway.isConnected = true; - getDiscordGatewayEmitterMock.mockReturnValueOnce(emitter); - waitForDiscordGatewayStopMock.mockImplementationOnce( - (waitParams: WaitForDiscordGatewayStopParams) => - new Promise((_resolve, reject) => { - waitParams.registerForceStop?.((err) => reject(err)); - }), - ); - const { lifecycleParams } = createLifecycleHarness({ gateway }); - - const lifecyclePromise = runDiscordGatewayLifecycle(lifecycleParams); - lifecyclePromise.catch(() => {}); - emitter.emit("debug", "WebSocket connection closed with code 1006"); - - await vi.advanceTimersByTimeAsync(5 * 60_000 + 1_000); - await expect(lifecyclePromise).rejects.toThrow("reconnect watchdog timeout"); - } finally { - vi.useRealTimers(); - } - }); - - it("does not force-stop when reconnect resumes before watchdog timeout", async () => { - vi.useFakeTimers(); - try { - const { emitter, gateway } = createGatewayHarness(); - gateway.isConnected = true; - getDiscordGatewayEmitterMock.mockReturnValueOnce(emitter); - let resolveWait: (() => void) | undefined; - waitForDiscordGatewayStopMock.mockImplementationOnce( - (waitParams: WaitForDiscordGatewayStopParams) => - new Promise((resolve, reject) => { - resolveWait = resolve; - waitParams.registerForceStop?.((err) => reject(err)); - }), - ); - const { lifecycleParams, runtimeLog } = createLifecycleHarness({ gateway }); - - const lifecyclePromise = runDiscordGatewayLifecycle(lifecycleParams); - emitter.emit("debug", "WebSocket connection closed with code 1006"); - await vi.advanceTimersByTimeAsync(60_000); - - gateway.isConnected = true; - emitter.emit("debug", "WebSocket connection opened"); - await vi.advanceTimersByTimeAsync(5 * 60_000 + 1_000); - - expect(runtimeLog).not.toHaveBeenCalledWith( - expect.stringContaining("reconnect watchdog timeout"), - ); - resolveWait?.(); - await expect(lifecyclePromise).resolves.toBeUndefined(); - } finally { - vi.useRealTimers(); - } - }); - - it("suppresses reconnect-exhausted already queued before shutdown", async () => { - const pendingGatewayEvents: DiscordGatewayEvent[] = []; - const abortController = new AbortController(); - - const emitter = new EventEmitter(); - const gateway: MockGateway = { - isConnected: true, - options: { intents: 0, reconnect: { maxAttempts: 50 } } as GatewayPlugin["options"], - disconnect: vi.fn(), - connect: vi.fn(), - emitter, - }; + it("pushes disconnected status when Carbon closes after startup", async () => { + const { emitter, gateway } = createGatewayHarness(); + gateway.isConnected = true; getDiscordGatewayEmitterMock.mockReturnValueOnce(emitter); - - const { lifecycleParams, runtimeLog, runtimeError } = createLifecycleHarness({ - gateway, - pendingGatewayEvents, - }); - lifecycleParams.abortSignal = abortController.signal; - - // Start lifecycle; it yields at execApprovalsHandler.start(). We then - // queue a reconnect-exhausted event and abort. The lifecycle resumes and - // drains the queued event before shutdown teardown flips lifecycleStopping, - // so drainPending must treat it as a graceful stop. - const lifecyclePromise = runDiscordGatewayLifecycle(lifecycleParams); - - pendingGatewayEvents.push( - createGatewayEvent( - "reconnect-exhausted", - "Max reconnect attempts (0) reached after code 1005", - ), - ); - abortController.abort(); - - await expect(lifecyclePromise).resolves.toBeUndefined(); - expect(runtimeLog).not.toHaveBeenCalledWith( - expect.stringContaining("ignoring expected reconnect-exhausted during shutdown"), - ); - expect(runtimeError).toHaveBeenCalledWith(expect.stringContaining("Max reconnect attempts")); - }); - - it("rejects reconnect-exhausted queued before startup when shutdown has not begun", async () => { - const pendingGatewayEvents: DiscordGatewayEvent[] = []; - - const emitter = new EventEmitter(); - const gateway: MockGateway = { - isConnected: true, - options: { intents: 0, reconnect: { maxAttempts: 50 } } as GatewayPlugin["options"], - disconnect: vi.fn(), - connect: vi.fn(), - emitter, - }; - getDiscordGatewayEmitterMock.mockReturnValueOnce(emitter); - - const { lifecycleParams } = createLifecycleHarness({ - gateway, - pendingGatewayEvents, + waitForDiscordGatewayStopMock.mockImplementationOnce(async () => { + emitter.emit("debug", "Gateway websocket closed: 1006"); }); - pendingGatewayEvents.push( - createGatewayEvent( - "reconnect-exhausted", - "Max reconnect attempts (0) reached after code 1005", - ), - ); - - await expect(runDiscordGatewayLifecycle(lifecycleParams)).rejects.toThrow( - "Max reconnect attempts", - ); - }); - - it("does not push connected: true when abortSignal is already aborted", async () => { - const emitter = new EventEmitter(); - const gateway: MockGateway = { - isConnected: true, - options: { intents: 0, reconnect: { maxAttempts: 3 } } as GatewayPlugin["options"], - disconnect: vi.fn(), - connect: vi.fn(), - emitter, - }; - getDiscordGatewayEmitterMock.mockReturnValueOnce(emitter); - - const abortController = new AbortController(); - abortController.abort(); - - const statusUpdates: Array> = []; - const statusSink = (patch: Record) => { - statusUpdates.push({ ...patch }); - }; - - const { lifecycleParams } = createLifecycleHarness({ gateway }); - lifecycleParams.abortSignal = abortController.signal; - (lifecycleParams as Record).statusSink = statusSink; + const { lifecycleParams, statusSink } = createLifecycleHarness({ gateway }); await expect(runDiscordGatewayLifecycle(lifecycleParams)).resolves.toBeUndefined(); - // onAbort should have pushed connected: false - const connectedFalse = statusUpdates.find((s) => s.connected === false); - expect(connectedFalse).toEqual(expect.objectContaining({ connected: false })); + expect(statusSink).toHaveBeenCalledWith( + expect.objectContaining({ + connected: false, + lastDisconnect: expect.objectContaining({ status: 1006 }), + }), + ); + }); - // No connected: true should appear — the isConnected check must be - // guarded by !lifecycleStopping to avoid contradicting the abort. - const connectedTrue = statusUpdates.find((s) => s.connected === true); - expect(connectedTrue).toBeUndefined(); + it("pushes disconnected status when Carbon schedules a reconnect", async () => { + const { emitter, gateway } = createGatewayHarness(); + gateway.isConnected = true; + getDiscordGatewayEmitterMock.mockReturnValueOnce(emitter); + waitForDiscordGatewayStopMock.mockImplementationOnce(async () => { + emitter.emit("debug", "Gateway reconnect scheduled in 1000ms (zombie, resume=true)"); + }); + + const { lifecycleParams, statusSink } = createLifecycleHarness({ gateway }); + + await expect(runDiscordGatewayLifecycle(lifecycleParams)).resolves.toBeUndefined(); + + expect(statusSink).toHaveBeenCalledWith( + expect.objectContaining({ + connected: false, + lastError: "Gateway reconnect scheduled in 1000ms (zombie, resume=true)", + }), + ); + }); + + it("pushes connected status when a runtime reconnect becomes ready", async () => { + vi.useFakeTimers(); + try { + const { emitter, gateway } = createGatewayHarness(); + gateway.isConnected = true; + getDiscordGatewayEmitterMock.mockReturnValueOnce(emitter); + waitForDiscordGatewayStopMock.mockImplementationOnce(async () => { + gateway.isConnected = false; + emitter.emit("debug", "Gateway websocket opened"); + setTimeout(() => { + gateway.isConnected = true; + }, 1_000); + await vi.advanceTimersByTimeAsync(1_500); + }); + + const { lifecycleParams, statusSink } = createLifecycleHarness({ gateway }); + + await expect(runDiscordGatewayLifecycle(lifecycleParams)).resolves.toBeUndefined(); + + expect(statusSink).toHaveBeenCalledWith(expect.objectContaining({ connected: false })); + expect(statusSink).toHaveBeenCalledWith( + expect.objectContaining({ + connected: true, + lastDisconnect: null, + }), + ); + } finally { + vi.useRealTimers(); + } + }); + + it("force-stops when a runtime reconnect opens but never becomes ready", async () => { + vi.useFakeTimers(); + try { + const { emitter, gateway } = createGatewayHarness(); + gateway.isConnected = true; + getDiscordGatewayEmitterMock.mockReturnValueOnce(emitter); + waitForDiscordGatewayStopMock.mockImplementationOnce( + (params: WaitForDiscordGatewayStopParams) => + new Promise((_resolve, reject) => { + params.registerForceStop?.((err) => reject(err)); + gateway.isConnected = false; + emitter.emit("debug", "Gateway websocket opened"); + }), + ); + + const { lifecycleParams, runtimeError, statusSink } = createLifecycleHarness({ gateway }); + const lifecyclePromise = runDiscordGatewayLifecycle(lifecycleParams); + lifecyclePromise.catch(() => {}); + + await vi.advanceTimersByTimeAsync(30_500); + await expect(lifecyclePromise).rejects.toThrow( + "discord gateway opened but did not reach READY within 30000ms", + ); + expect(runtimeError).toHaveBeenCalledWith( + expect.stringContaining("did not reach READY within 30000ms"), + ); + expect(statusSink).toHaveBeenCalledWith( + expect.objectContaining({ + connected: false, + lastDisconnect: expect.objectContaining({ error: "runtime-not-ready" }), + }), + ); + } finally { + vi.useRealTimers(); + } }); }); diff --git a/extensions/discord/src/monitor/provider.lifecycle.ts b/extensions/discord/src/monitor/provider.lifecycle.ts index c206cc60ba9..2afdfd59f8e 100644 --- a/extensions/discord/src/monitor/provider.lifecycle.ts +++ b/extensions/discord/src/monitor/provider.lifecycle.ts @@ -1,3 +1,4 @@ +import { createConnectedChannelStatusPatch } from "openclaw/plugin-sdk/gateway-runtime"; import { danger } from "openclaw/plugin-sdk/runtime-env"; import type { RuntimeEnv } from "openclaw/plugin-sdk/runtime-env"; import { attachDiscordGatewayLogging } from "../gateway-logging.js"; @@ -6,14 +7,233 @@ import type { DiscordVoiceManager } from "../voice/manager.js"; import type { MutableDiscordGateway } from "./gateway-handle.js"; import { registerGateway, unregisterGateway } from "./gateway-registry.js"; import type { DiscordGatewayEvent, DiscordGatewaySupervisor } from "./gateway-supervisor.js"; -import { createDiscordGatewayReconnectController } from "./provider.lifecycle.reconnect.js"; import type { DiscordMonitorStatusSink } from "./status.js"; +const DISCORD_GATEWAY_READY_TIMEOUT_MS = 15_000; +const DISCORD_GATEWAY_RUNTIME_READY_TIMEOUT_MS = 30_000; +const DISCORD_GATEWAY_READY_POLL_MS = 250; + type ExecApprovalsHandler = { start: () => Promise; stop: () => Promise; }; +type GatewayReadyWaitResult = "ready" | "stopped" | "timeout"; + +function parseGatewayCloseCode(message: string): number | undefined { + const match = /Gateway websocket closed:\s*(\d{3,5})/.exec(message); + if (!match?.[1]) { + return undefined; + } + const code = Number.parseInt(match[1], 10); + return Number.isFinite(code) ? code : undefined; +} + +function createGatewayStatusObserver(params: { + gateway?: Pick; + abortSignal?: AbortSignal; + runtime: RuntimeEnv; + pushStatus: (patch: Parameters[0]) => void; + isLifecycleStopping: () => boolean; +}) { + let forceStopHandler: ((err: unknown) => void) | undefined; + let queuedForceStopError: unknown; + let readyPollId: ReturnType | undefined; + let readyTimeoutId: ReturnType | undefined; + + const shouldStop = () => params.abortSignal?.aborted || params.isLifecycleStopping(); + const clearReadyWatch = () => { + if (readyPollId) { + clearInterval(readyPollId); + readyPollId = undefined; + } + if (readyTimeoutId) { + clearTimeout(readyTimeoutId); + readyTimeoutId = undefined; + } + }; + const triggerForceStop = (err: unknown) => { + if (forceStopHandler) { + forceStopHandler(err); + return; + } + queuedForceStopError = err; + }; + const pushConnectedStatus = (at: number) => { + params.pushStatus({ + ...createConnectedChannelStatusPatch(at), + lastDisconnect: null, + lastError: null, + }); + }; + const startReadyWatch = () => { + clearReadyWatch(); + const pollConnected = () => { + if (shouldStop()) { + clearReadyWatch(); + return; + } + if (!params.gateway?.isConnected) { + return; + } + clearReadyWatch(); + pushConnectedStatus(Date.now()); + }; + + pollConnected(); + if (!readyTimeoutId) { + readyPollId = setInterval(pollConnected, DISCORD_GATEWAY_READY_POLL_MS); + readyPollId.unref?.(); + readyTimeoutId = setTimeout(() => { + clearReadyWatch(); + if (shouldStop() || params.gateway?.isConnected) { + return; + } + const at = Date.now(); + const error = new Error( + `discord gateway opened but did not reach READY within ${DISCORD_GATEWAY_RUNTIME_READY_TIMEOUT_MS}ms`, + ); + params.pushStatus({ + connected: false, + lastEventAt: at, + lastDisconnect: { + at, + error: "runtime-not-ready", + }, + lastError: "runtime-not-ready", + }); + params.runtime.error?.(danger(error.message)); + triggerForceStop(error); + }, DISCORD_GATEWAY_RUNTIME_READY_TIMEOUT_MS); + readyTimeoutId.unref?.(); + } + }; + + const onGatewayDebug = (msg: unknown) => { + if (shouldStop()) { + return; + } + const at = Date.now(); + const message = String(msg); + if (message.includes("Gateway websocket opened")) { + params.pushStatus({ connected: false, lastEventAt: at }); + startReadyWatch(); + return; + } + if (message.includes("Gateway websocket closed")) { + clearReadyWatch(); + const code = parseGatewayCloseCode(message); + params.pushStatus({ + connected: false, + lastEventAt: at, + lastDisconnect: { + at, + ...(code !== undefined ? { status: code } : {}), + }, + }); + return; + } + if (message.includes("Gateway reconnect scheduled in")) { + clearReadyWatch(); + params.pushStatus({ + connected: false, + lastEventAt: at, + lastError: message, + }); + } + }; + + return { + onGatewayDebug, + clearReadyWatch, + registerForceStop: (handler: (err: unknown) => void) => { + forceStopHandler = handler; + if (queuedForceStopError !== undefined) { + const err = queuedForceStopError; + queuedForceStopError = undefined; + handler(err); + } + }, + dispose: () => { + clearReadyWatch(); + forceStopHandler = undefined; + queuedForceStopError = undefined; + }, + }; +} + +async function waitForGatewayReady(params: { + gateway?: Pick; + abortSignal?: AbortSignal; + beforePoll?: () => Promise<"continue" | "stop"> | "continue" | "stop"; + pushStatus?: (patch: Parameters[0]) => void; + runtime: RuntimeEnv; + beforeRestart?: () => void; +}): Promise { + const waitUntilReady = async (): Promise => { + const deadlineAt = Date.now() + DISCORD_GATEWAY_READY_TIMEOUT_MS; + while (!params.abortSignal?.aborted) { + if ((await params.beforePoll?.()) === "stop") { + return "stopped"; + } + if (params.gateway?.isConnected ?? true) { + const at = Date.now(); + params.pushStatus?.({ + ...createConnectedChannelStatusPatch(at), + lastDisconnect: null, + }); + return "ready"; + } + if (Date.now() >= deadlineAt) { + return "timeout"; + } + await new Promise((resolve) => { + const timeout = setTimeout(resolve, DISCORD_GATEWAY_READY_POLL_MS); + timeout.unref?.(); + }); + } + return "stopped"; + }; + + const firstAttempt = await waitUntilReady(); + if (firstAttempt !== "timeout") { + return; + } + if (!params.gateway) { + throw new Error( + `discord gateway did not reach READY within ${DISCORD_GATEWAY_READY_TIMEOUT_MS}ms`, + ); + } + + const restartAt = Date.now(); + params.runtime.error?.( + danger( + `discord: gateway was not ready after ${DISCORD_GATEWAY_READY_TIMEOUT_MS}ms; restarting gateway`, + ), + ); + params.pushStatus?.({ + connected: false, + lastEventAt: restartAt, + lastDisconnect: { + at: restartAt, + error: "startup-not-ready", + }, + lastError: "startup-not-ready", + }); + if (params.abortSignal?.aborted) { + return; + } + params.beforeRestart?.(); + params.gateway.disconnect(); + params.gateway.connect(false); + + if ((await waitUntilReady()) === "timeout") { + throw new Error( + `discord gateway did not reach READY within ${DISCORD_GATEWAY_READY_TIMEOUT_MS}ms after restart`, + ); + } +} + export async function runDiscordGatewayLifecycle(params: { accountId: string; gateway?: MutableDiscordGateway; @@ -41,21 +261,19 @@ export async function runDiscordGatewayLifecycle(params: { const pushStatus = (patch: Parameters[0]) => { params.statusSink?.(patch); }; - const reconnectController = createDiscordGatewayReconnectController({ - accountId: params.accountId, + const statusObserver = createGatewayStatusObserver({ gateway, - runtime: params.runtime, abortSignal: params.abortSignal, + runtime: params.runtime, pushStatus, isLifecycleStopping: () => lifecycleStopping, - drainPendingGatewayErrors: () => drainPendingGatewayErrors(), }); - const onGatewayDebug = reconnectController.onGatewayDebug; - gatewayEmitter?.on("debug", onGatewayDebug); + gatewayEmitter?.on("debug", statusObserver.onGatewayDebug); let sawDisallowedIntents = false; const handleGatewayEvent = (event: DiscordGatewayEvent): "continue" | "stop" => { if (event.type === "disallowed-intents") { + lifecycleStopping = true; sawDisallowedIntents = true; params.runtime.error?.( danger( @@ -64,14 +282,8 @@ export async function runDiscordGatewayLifecycle(params: { ); return "stop"; } - // When we deliberately set maxAttempts=0 and disconnected (health-monitor - // stale-socket restart), Carbon fires "Max reconnect attempts (0)". This - // is expected — log at info instead of error to avoid false alarms. - if (lifecycleStopping && event.type === "reconnect-exhausted") { - params.runtime.log?.( - `discord: ignoring expected reconnect-exhausted during shutdown: ${event.message}`, - ); - return "stop"; + if (event.shouldStopLifecycle) { + lifecycleStopping = true; } params.runtime.error?.(danger(`discord gateway error: ${event.message}`)); return event.shouldStopLifecycle ? "stop" : "continue"; @@ -82,14 +294,7 @@ export async function runDiscordGatewayLifecycle(params: { if (decision !== "stop") { return "continue"; } - // Don't throw for expected shutdown events. `reconnect-exhausted` can be - // queued just before an abort-driven shutdown flips `lifecycleStopping`, - // so only suppress it when shutdown is already underway. - if ( - event.type === "disallowed-intents" || - (event.type === "reconnect-exhausted" && - (lifecycleStopping || params.abortSignal?.aborted === true)) - ) { + if (event.type === "disallowed-intents") { return "stop"; } throw event.err; @@ -104,7 +309,14 @@ export async function runDiscordGatewayLifecycle(params: { return; } - await reconnectController.ensureStartupReady(); + await waitForGatewayReady({ + gateway, + abortSignal: params.abortSignal, + beforePoll: drainPendingGatewayErrors, + pushStatus, + runtime: params.runtime, + beforeRestart: statusObserver.clearReadyWatch, + }); if (drainPendingGatewayErrors() === "stop") { return; @@ -119,7 +331,7 @@ export async function runDiscordGatewayLifecycle(params: { abortSignal: params.abortSignal, gatewaySupervisor: params.gatewaySupervisor, onGatewayEvent: handleGatewayEvent, - registerForceStop: reconnectController.registerForceStop, + registerForceStop: statusObserver.registerForceStop, }); } catch (err) { if (!sawDisallowedIntents && !params.isDisallowedIntentsError(err)) { @@ -130,8 +342,8 @@ export async function runDiscordGatewayLifecycle(params: { params.gatewaySupervisor.detachLifecycle(); unregisterGateway(params.accountId); stopGatewayLogging(); - reconnectController.dispose(); - gatewayEmitter?.removeListener("debug", onGatewayDebug); + statusObserver.dispose(); + gatewayEmitter?.removeListener("debug", statusObserver.onGatewayDebug); if (params.voiceManager) { await params.voiceManager.destroy(); params.voiceManagerRef.current = null; diff --git a/extensions/discord/src/monitor/provider.test.ts b/extensions/discord/src/monitor/provider.test.ts index 594473e27e0..7bef408bb9f 100644 --- a/extensions/discord/src/monitor/provider.test.ts +++ b/extensions/discord/src/monitor/provider.test.ts @@ -703,7 +703,7 @@ describe("monitorDiscordProvider", () => { name === "gateway" ? gateway : undefined, ); clientFetchUserMock.mockImplementationOnce(async () => { - emitter.emit("debug", "WebSocket connection opened"); + emitter.emit("debug", "Gateway websocket opened"); return { id: "bot-1", username: "Molty" }; }); isVerboseMock.mockReturnValue(true); @@ -722,7 +722,7 @@ describe("monitorDiscordProvider", () => { expect(messages.some((msg) => msg.includes("fetch-bot-identity:done"))).toBe(true); expect( messages.some( - (msg) => msg.includes("gateway-debug") && msg.includes("WebSocket connection opened"), + (msg) => msg.includes("gateway-debug") && msg.includes("Gateway websocket opened"), ), ).toBe(true); });