fix: let carbon own gateway reconnects (#59019)

* refactor(discord): let carbon own gateway reconnects

* fix: finalize Discord gateway reconnect landing (#59019)
This commit is contained in:
Ayaan Zaidi 2026-04-01 19:12:35 +05:30 committed by GitHub
parent c5cfc05104
commit 095e7b830a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 435 additions and 1122 deletions

View File

@ -61,6 +61,7 @@ Docs: https://docs.openclaw.ai
- ACPX/runtime: repair `queue owner unavailable` session recovery by replacing dead named sessions and resuming the backend session when ACPX exposes a stable session id, so the first ACP prompt no longer inherits a dead handle. (#58669) Thanks @neeravmakwana
- ACPX/runtime: retry dead-session queue-owner repair without `--resume-session` when the reported ACPX session id is stale, so recovery still creates a fresh named session instead of failing session init. Thanks @obviyus.
- Auth/OpenAI Codex: persist plugin-refreshed OAuth credentials to `auth-profiles.json` before returning them, so rotated Codex refresh tokens survive restart and stop falling into `refresh_token_reused` loops. (#53082)
- Discord/gateway: hand reconnect ownership back to Carbon, keep runtime status aligned with close/reconnect state, and force-stop sockets that open without reaching READY so Discord monitors recover promptly instead of waiting on stale health timeouts. (#59019) Thanks @obviyus
## 2026.3.31

View File

@ -37,20 +37,25 @@ describe("attachDiscordGatewayLogging", () => {
runtime,
});
emitter.emit("debug", "WebSocket connection opened");
emitter.emit("debug", "WebSocket connection closed with code 1001");
emitter.emit("debug", "Reconnecting with backoff: 1000ms after code 1001");
emitter.emit("debug", "Gateway websocket opened");
emitter.emit("debug", "Gateway websocket closed: 1001");
emitter.emit("debug", "Gateway reconnect scheduled in 1000ms (close, resume=true)");
emitter.emit("debug", "Gateway forcing fresh IDENTIFY after 3 failed resume attempts");
const logVerboseMock = vi.mocked(logVerbose);
expect(logVerboseMock).toHaveBeenCalledTimes(3);
expect(runtime.log).toHaveBeenCalledTimes(2);
expect(logVerboseMock).toHaveBeenCalledTimes(4);
expect(runtime.log).toHaveBeenCalledTimes(3);
expect(runtime.log).toHaveBeenNthCalledWith(
1,
"discord gateway: WebSocket connection closed with code 1001",
"discord gateway: Gateway websocket closed: 1001",
);
expect(runtime.log).toHaveBeenNthCalledWith(
2,
"discord gateway: Reconnecting with backoff: 1000ms after code 1001",
"discord gateway: Gateway reconnect scheduled in 1000ms (close, resume=true)",
);
expect(runtime.log).toHaveBeenNthCalledWith(
3,
"discord gateway: Gateway forcing fresh IDENTIFY after 3 failed resume attempts",
);
cleanup();
@ -88,7 +93,7 @@ describe("attachDiscordGatewayLogging", () => {
const logVerboseMock = vi.mocked(logVerbose);
logVerboseMock.mockClear();
emitter.emit("debug", "WebSocket connection closed with code 1001");
emitter.emit("debug", "Gateway websocket closed: 1001");
emitter.emit("warning", "High latency detected: 1200ms");
emitter.emit("metrics", { latency: 42 });

View File

@ -5,9 +5,9 @@ import type { RuntimeEnv } from "openclaw/plugin-sdk/runtime-env";
type GatewayEmitter = Pick<EventEmitter, "on" | "removeListener">;
const INFO_DEBUG_MARKERS = [
"WebSocket connection closed",
"Reconnecting with backoff",
"Attempting resume with backoff",
"Gateway websocket closed",
"Gateway reconnect scheduled in",
"Gateway forcing fresh IDENTIFY after",
];
const shouldPromoteGatewayDebug = (message: string) =>

View File

@ -127,39 +127,19 @@ describe("waitForDiscordGatewayStop", () => {
it("rejects via registerForceStop and disconnects gateway", async () => {
let forceStop: ((err: unknown) => void) | undefined;
const { detachLifecycle, disconnect, promise } = startGatewayWait({
registerForceStop: (fn) => {
forceStop = fn;
registerForceStop: (handler) => {
forceStop = handler;
},
});
if (!forceStop) {
throw new Error("registerForceStop did not expose a stopper callback");
}
forceStop(new Error("reconnect watchdog timeout"));
forceStop?.(new Error("runtime-not-ready"));
await expect(promise).rejects.toThrow("reconnect watchdog timeout");
await expect(promise).rejects.toThrow("runtime-not-ready");
expect(disconnect).toHaveBeenCalledTimes(1);
expect(detachLifecycle).toHaveBeenCalledTimes(1);
});
// Guards against late force-stop calls: once the wait has settled (here via abort),
// invoking the registered force-stop callback must not trigger another disconnect.
it("ignores forceStop after promise already settled", async () => {
let forceStop: ((err: unknown) => void) | undefined;
const { abort, disconnect, promise } = startGatewayWait({
registerForceStop: (fn) => {
forceStop = fn;
},
});
// Settle the wait first by aborting.
abort.abort();
await expect(promise).resolves.toBeUndefined();
// The late force-stop is expected to be ignored: disconnect count stays at one.
forceStop?.(new Error("too late"));
expect(disconnect).toHaveBeenCalledTimes(1);
});
it("keeps the lifecycle handler active until disconnect returns on abort", async () => {
const onGatewayEvent = vi.fn(() => "stop" as const);
const fatalEvent = createGatewayEvent("fatal", "disconnect emitted error");

View File

@ -65,7 +65,6 @@ export async function waitForDiscordGatewayStop(
const onForceStop = (err: unknown) => {
finishReject(err);
};
if (abortSignal?.aborted) {
onAbort();
return;

View File

@ -1,497 +0,0 @@
import { createArmableStallWatchdog } from "openclaw/plugin-sdk/channel-lifecycle";
import { createConnectedChannelStatusPatch } from "openclaw/plugin-sdk/gateway-runtime";
import { danger } from "openclaw/plugin-sdk/runtime-env";
import type { RuntimeEnv } from "openclaw/plugin-sdk/runtime-env";
import type { MutableDiscordGateway } from "./gateway-handle.js";
import type { DiscordMonitorStatusSink } from "./status.js";
// Maximum time ensureStartupReady waits for gateway.isConnected before forcing a
// fresh reconnect (and again, after that reconnect, before failing startup).
const DISCORD_GATEWAY_READY_TIMEOUT_MS = 15_000;
// Sleep between isConnected checks inside waitForDiscordGatewayReady.
const DISCORD_GATEWAY_READY_POLL_MS = 250;
// How long a disconnect may wait for the old socket's "close" event before
// socket.terminate() is attempted.
const DISCORD_GATEWAY_DISCONNECT_DRAIN_TIMEOUT_MS = 5_000;
// How long to wait for "close" after terminate() before the disconnect is rejected.
const DISCORD_GATEWAY_FORCE_TERMINATE_CLOSE_TIMEOUT_MS = 1_000;
// Time allowed after a "WebSocket connection opened" debug message for the gateway
// to report isConnected before the attempt is counted as a stall.
const DISCORD_GATEWAY_HELLO_TIMEOUT_MS = 30_000;
// Interval at which isConnected is polled while the HELLO timeout is pending.
const DISCORD_GATEWAY_HELLO_CONNECTED_POLL_MS = 250;
// Number of consecutive stalls after which a fresh IDENTIFY is forced instead of a resume.
const DISCORD_GATEWAY_MAX_CONSECUTIVE_HELLO_STALLS = 3;
// Timeout handed to the reconnect stall watchdog (five minutes).
const DISCORD_GATEWAY_RECONNECT_STALL_TIMEOUT_MS = 5 * 60_000;
// Outcome of waitForDiscordGatewayReady: connected, deadline passed, or stopped
// (abort signal fired or beforePoll returned "stop").
type GatewayReadyWaitResult = "ready" | "timeout" | "stopped";
/**
 * Polls the gateway until it reports `isConnected`, the deadline passes, or the
 * wait is stopped.
 *
 * Each round checks, in order: the abort signal, the optional `beforePoll` hook
 * (a "stop" answer ends the wait), the gateway's connected flag, and the deadline.
 * Only then does it sleep for one poll interval.
 *
 * @param params.gateway - Object whose `isConnected` flag is polled; may be absent.
 * @param params.abortSignal - Ends the wait with "stopped" once aborted.
 * @param params.timeoutMs - Budget in milliseconds, measured from the call.
 * @param params.beforePoll - Hook awaited before every connected check.
 * @returns "ready", "timeout", or "stopped".
 */
async function waitForDiscordGatewayReady(params: {
  gateway?: Pick<MutableDiscordGateway, "isConnected">;
  abortSignal?: AbortSignal;
  timeoutMs: number;
  beforePoll?: () => Promise<"continue" | "stop"> | "continue" | "stop";
}): Promise<GatewayReadyWaitResult> {
  const expiresAt = Date.now() + params.timeoutMs;

  // One poll-interval sleep; the timer is unref'd so it never keeps the process alive.
  const sleepOnePoll = (): Promise<void> =>
    new Promise<void>((wake) => {
      const timer = setTimeout(wake, DISCORD_GATEWAY_READY_POLL_MS);
      timer.unref?.();
    });

  for (;;) {
    if (params.abortSignal?.aborted) {
      return "stopped";
    }
    const hookDecision = await params.beforePoll?.();
    if (hookDecision === "stop") {
      return "stopped";
    }
    if (params.gateway?.isConnected) {
      return "ready";
    }
    if (Date.now() >= expiresAt) {
      return "timeout";
    }
    await sleepOnePoll();
  }
}
/**
 * Builds the reconnect controller for one Discord gateway connection.
 *
 * The controller watches gateway debug messages for "WebSocket connection opened" /
 * "WebSocket connection closed", keeps the status sink updated, forces a reconnect
 * when a socket opens but the gateway does not report `isConnected` within the HELLO
 * timeout, and runs a stall watchdog that force-stops the monitor when reconnecting
 * takes too long. It also subscribes to the abort signal (or runs the abort path
 * immediately when the signal is already aborted).
 *
 * @param params.accountId - Used only to label the stall watchdog.
 * @param params.gateway - Gateway to control; every operation tolerates it being absent.
 * @param params.runtime - Logging sink (`log` / `error`), also passed to the watchdog.
 * @param params.abortSignal - Lifecycle abort signal.
 * @param params.pushStatus - Receives status patches (connected, lastDisconnect, ...).
 * @param params.isLifecycleStopping - Reports whether shutdown has begun.
 * @param params.drainPendingGatewayErrors - Passed as the `beforePoll` hook of the startup wait.
 * @returns `ensureStartupReady`, `onAbort`, `onGatewayDebug`, `clearHelloWatch`,
 *   `registerForceStop`, and `dispose`.
 */
export function createDiscordGatewayReconnectController(params: {
accountId: string;
gateway?: MutableDiscordGateway;
runtime: RuntimeEnv;
abortSignal?: AbortSignal;
pushStatus: (patch: Parameters<DiscordMonitorStatusSink>[0]) => void;
isLifecycleStopping: () => boolean;
drainPendingGatewayErrors: () => "continue" | "stop";
}) {
// Force-stop handler supplied later through registerForceStop; an error raised
// before registration is parked in queuedForceStopError.
let forceStopHandler: ((err: unknown) => void) | undefined;
let queuedForceStopError: unknown;
// Timers of the HELLO watch started by each "WebSocket connection opened" message.
let helloTimeoutId: ReturnType<typeof setTimeout> | undefined;
let helloConnectedPollId: ReturnType<typeof setInterval> | undefined;
// Promise of the reconnect currently running, shared by concurrent callers.
let reconnectInFlight: Promise<void> | undefined;
let consecutiveHelloStalls = 0;
// Truthy once the lifecycle is stopping or the abort signal has fired.
const shouldStop = () => params.isLifecycleStopping() || params.abortSignal?.aborted;
const resetHelloStallCounter = () => {
consecutiveHelloStalls = 0;
};
// Cancels the pending HELLO timeout and the isConnected poll, if any.
const clearHelloWatch = () => {
if (helloTimeoutId) {
clearTimeout(helloTimeoutId);
helloTimeoutId = undefined;
}
if (helloConnectedPollId) {
clearInterval(helloConnectedPollId);
helloConnectedPollId = undefined;
}
};
// Extracts the 3-5 digit number that follows the word "code" in a debug message;
// returns undefined when the message carries none.
const parseGatewayCloseCode = (message: string): number | undefined => {
const match = /code\s+(\d{3,5})/i.exec(message);
if (!match?.[1]) {
return undefined;
}
const code = Number.parseInt(match[1], 10);
return Number.isFinite(code) ? code : undefined;
};
// Nulls the cached session id, resume URL and sequence numbers on the gateway.
const clearResumeState = () => {
if (!params.gateway?.state) {
return;
}
params.gateway.state.sessionId = null;
params.gateway.state.resumeGatewayUrl = null;
params.gateway.state.sequence = null;
params.gateway.sequence = null;
};
// Hands the error to the registered force-stop handler, or queues it until one
// is registered. Only the most recent queued error is kept.
const triggerForceStop = (err: unknown) => {
if (forceStopHandler) {
forceStopHandler(err);
return;
}
queuedForceStopError = err;
};
// Watchdog armed on socket close / HELLO stall and disarmed on socket open / connected.
// NOTE(review): arm/disarm/stop semantics live in createArmableStallWatchdog, not shown here.
const reconnectStallWatchdog = createArmableStallWatchdog({
label: `discord:${params.accountId}:reconnect`,
timeoutMs: DISCORD_GATEWAY_RECONNECT_STALL_TIMEOUT_MS,
abortSignal: params.abortSignal,
runtime: params.runtime,
onTimeout: () => {
// Nothing to report when shutdown is already underway.
if (shouldStop()) {
return;
}
const at = Date.now();
const error = new Error(
`discord reconnect watchdog timeout after ${DISCORD_GATEWAY_RECONNECT_STALL_TIMEOUT_MS}ms`,
);
params.pushStatus({
connected: false,
lastEventAt: at,
lastDisconnect: {
at,
error: error.message,
},
lastError: error.message,
});
params.runtime.error?.(
danger(
`discord: reconnect watchdog timeout after ${DISCORD_GATEWAY_RECONNECT_STALL_TIMEOUT_MS}ms; force-stopping monitor task`,
),
);
triggerForceStop(error);
},
});
// Publishes a "connected" status patch and clears any recorded disconnect.
const pushConnectedStatus = (at: number) => {
params.pushStatus({
...createConnectedChannelStatusPatch(at),
lastDisconnect: null,
});
};
// Disconnects the gateway and waits for the current socket to emit "close".
// If the socket has not closed after the drain timeout, terminate() is attempted;
// if that is unavailable, throws, or still yields no "close", the promise rejects.
// While the lifecycle is stopping, those failure paths resolve instead of rejecting.
const disconnectGatewaySocketWithoutAutoReconnect = async () => {
if (!params.gateway) {
return;
}
const gateway = params.gateway;
const socket = gateway.ws;
// No socket handle to wait on: a plain disconnect is all that can be done.
if (!socket) {
gateway.disconnect();
return;
}
// Carbon reconnects from the socket close handler even for intentional
// disconnects. Drop the current socket's close/error listeners so a forced
// reconnect does not race the old socket's automatic resume path.
for (const listener of socket.listeners("close")) {
socket.removeListener("close", listener);
}
for (const listener of socket.listeners("error")) {
socket.removeListener("error", listener);
}
await new Promise<void>((resolve, reject) => {
// settled guards against resolving/rejecting more than once.
let settled = false;
let drainTimeout: ReturnType<typeof setTimeout> | undefined;
let terminateCloseTimeout: ReturnType<typeof setTimeout> | undefined;
// No-op "error" listener installed in place of the removed ones
// (presumably so late socket errors are not left without a handler; confirm).
const ignoreSocketError = () => {};
const clearPendingTimers = () => {
if (drainTimeout) {
clearTimeout(drainTimeout);
drainTimeout = undefined;
}
if (terminateCloseTimeout) {
clearTimeout(terminateCloseTimeout);
terminateCloseTimeout = undefined;
}
};
const cleanup = () => {
clearPendingTimers();
socket.removeListener("close", onClose);
socket.removeListener("error", ignoreSocketError);
};
// Normal path: the socket closed, so drop timers/listeners and resolve.
const onClose = () => {
cleanup();
if (settled) {
return;
}
settled = true;
resolve();
};
// Resolves the wait without a "close" event; used when shutdown has begun.
// NOTE(review): unlike onClose, this path does not remove the temporary listeners.
const resolveStoppedWait = () => {
if (settled) {
return;
}
settled = true;
clearPendingTimers();
resolve();
};
// Rejects the wait, unless the lifecycle is stopping, in which case the
// failed drain is treated as a normal stop.
const rejectClose = (error: Error) => {
if (shouldStop()) {
resolveStoppedWait();
return;
}
if (settled) {
return;
}
settled = true;
clearPendingTimers();
reject(error);
};
// Stage 1: give the socket the drain timeout to close on its own.
drainTimeout = setTimeout(() => {
if (settled) {
return;
}
if (shouldStop()) {
resolveStoppedWait();
return;
}
params.runtime.error?.(
danger(
`discord: gateway socket did not close within ${DISCORD_GATEWAY_DISCONNECT_DRAIN_TIMEOUT_MS}ms before reconnect; attempting forced terminate before giving up`,
),
);
let terminateStarted = false;
try {
if (typeof socket.terminate === "function") {
socket.terminate();
terminateStarted = true;
}
} catch {
// Best-effort only. If terminate fails, fail closed instead of
// opening another socket on top of an unknown old one.
}
if (!terminateStarted) {
params.runtime.error?.(
danger(
`discord: gateway socket did not expose a working terminate() after ${DISCORD_GATEWAY_DISCONNECT_DRAIN_TIMEOUT_MS}ms; force-stopping instead of opening a parallel socket`,
),
);
rejectClose(
new Error(
`discord gateway socket did not close within ${DISCORD_GATEWAY_DISCONNECT_DRAIN_TIMEOUT_MS}ms before reconnect`,
),
);
return;
}
// Stage 2: terminate() was issued; allow a short extra window for "close".
terminateCloseTimeout = setTimeout(() => {
if (settled) {
return;
}
if (shouldStop()) {
resolveStoppedWait();
return;
}
params.runtime.error?.(
danger(
`discord: gateway socket did not close ${DISCORD_GATEWAY_FORCE_TERMINATE_CLOSE_TIMEOUT_MS}ms after forced terminate; force-stopping instead of opening a parallel socket`,
),
);
// NOTE(review): this Error message reports the drain timeout, not the
// post-terminate timeout; the same text is used in the no-terminate path above.
rejectClose(
new Error(
`discord gateway socket did not close within ${DISCORD_GATEWAY_DISCONNECT_DRAIN_TIMEOUT_MS}ms before reconnect`,
),
);
}, DISCORD_GATEWAY_FORCE_TERMINATE_CLOSE_TIMEOUT_MS);
terminateCloseTimeout.unref?.();
}, DISCORD_GATEWAY_DISCONNECT_DRAIN_TIMEOUT_MS);
drainTimeout.unref?.();
// Install the temporary listeners before disconnecting so "close" cannot be missed.
socket.on("error", ignoreSocketError);
socket.on("close", onClose);
gateway.disconnect();
});
};
// Disconnects the old socket and then calls gateway.connect(resume).
// Concurrent callers share the reconnect already in flight, so a later caller's
// own resume/forceFreshIdentify options are not applied to that run.
const reconnectGateway = async (reconnectParams: {
resume: boolean;
forceFreshIdentify?: boolean;
}) => {
if (reconnectInFlight) {
return await reconnectInFlight;
}
reconnectInFlight = (async () => {
if (reconnectParams.forceFreshIdentify) {
// Carbon still sends RESUME on HELLO when session state is populated,
// even after connect(false). Clear cached session data first so this
// path truly forces a fresh IDENTIFY.
clearResumeState();
}
if (shouldStop()) {
return;
}
await disconnectGatewaySocketWithoutAutoReconnect();
// Shutdown may have begun while the old socket was draining; do not reconnect then.
if (shouldStop()) {
return;
}
params.gateway?.connect(reconnectParams.resume);
})().finally(() => {
reconnectInFlight = undefined;
});
return await reconnectInFlight;
};
const reconnectGatewayFresh = async () => {
await reconnectGateway({ resume: false, forceFreshIdentify: true });
};
// Handler for gateway "debug" messages. Every message refreshes lastEventAt;
// only the socket closed / opened messages drive further state changes.
const onGatewayDebug = (msg: unknown) => {
const message = String(msg);
const at = Date.now();
params.pushStatus({ lastEventAt: at });
if (message.includes("WebSocket connection closed")) {
// A close while the gateway still reports connected resets the stall streak.
if (params.gateway?.isConnected) {
resetHelloStallCounter();
}
reconnectStallWatchdog.arm(at);
params.pushStatus({
connected: false,
lastDisconnect: {
at,
status: parseGatewayCloseCode(message),
},
});
clearHelloWatch();
return;
}
if (!message.includes("WebSocket connection opened")) {
return;
}
// Socket opened: stop the reconnect watchdog and start a fresh HELLO watch.
reconnectStallWatchdog.disarm();
clearHelloWatch();
let sawConnected = params.gateway?.isConnected === true;
if (sawConnected) {
pushConnectedStatus(at);
}
// Poll until the gateway reports connected, then publish the status and stop polling.
helloConnectedPollId = setInterval(() => {
if (!params.gateway?.isConnected) {
return;
}
sawConnected = true;
resetHelloStallCounter();
reconnectStallWatchdog.disarm();
pushConnectedStatus(Date.now());
if (helloConnectedPollId) {
clearInterval(helloConnectedPollId);
helloConnectedPollId = undefined;
}
}, DISCORD_GATEWAY_HELLO_CONNECTED_POLL_MS);
// If the gateway never reported connected within the HELLO timeout, count a
// stall and reconnect: resume normally, fresh IDENTIFY once the streak hits the max.
helloTimeoutId = setTimeout(() => {
helloTimeoutId = undefined;
void (async () => {
try {
if (helloConnectedPollId) {
clearInterval(helloConnectedPollId);
helloConnectedPollId = undefined;
}
if (sawConnected || params.gateway?.isConnected) {
resetHelloStallCounter();
return;
}
consecutiveHelloStalls += 1;
const forceFreshIdentify =
consecutiveHelloStalls >= DISCORD_GATEWAY_MAX_CONSECUTIVE_HELLO_STALLS;
const stalledAt = Date.now();
reconnectStallWatchdog.arm(stalledAt);
params.pushStatus({
connected: false,
lastEventAt: stalledAt,
lastDisconnect: {
at: stalledAt,
error: "hello-timeout",
},
});
params.runtime.log?.(
danger(
forceFreshIdentify
? `connection stalled: no HELLO within ${DISCORD_GATEWAY_HELLO_TIMEOUT_MS}ms (${consecutiveHelloStalls}/${DISCORD_GATEWAY_MAX_CONSECUTIVE_HELLO_STALLS}); forcing fresh identify`
: `connection stalled: no HELLO within ${DISCORD_GATEWAY_HELLO_TIMEOUT_MS}ms (${consecutiveHelloStalls}/${DISCORD_GATEWAY_MAX_CONSECUTIVE_HELLO_STALLS}); retrying resume`,
),
);
// The streak restarts once a fresh IDENTIFY has been chosen.
if (forceFreshIdentify) {
resetHelloStallCounter();
}
if (shouldStop()) {
return;
}
if (forceFreshIdentify) {
await reconnectGatewayFresh();
return;
}
await reconnectGateway({ resume: true });
} catch (err) {
// A failed restart is escalated to the force-stop handler.
params.runtime.error?.(
danger(`discord: failed to restart stalled gateway socket: ${String(err)}`),
);
triggerForceStop(err);
}
})();
}, DISCORD_GATEWAY_HELLO_TIMEOUT_MS);
};
// Abort path: disarm the watchdog, report disconnected, set the gateway's
// reconnect maxAttempts to 0, and disconnect.
const onAbort = () => {
reconnectStallWatchdog.disarm();
const at = Date.now();
params.pushStatus({ connected: false, lastEventAt: at });
if (!params.gateway) {
return;
}
params.gateway.options.reconnect = { maxAttempts: 0 };
params.gateway.disconnect();
};
// Startup gate: waits for the gateway to report connected. After one timeout it
// forces a fresh reconnect and waits again; a second timeout throws.
// Returns quietly when there is no gateway or the lifecycle is stopping.
const ensureStartupReady = async () => {
if (!params.gateway || params.gateway.isConnected || shouldStop()) {
if (params.gateway?.isConnected && !shouldStop()) {
pushConnectedStatus(Date.now());
}
return;
}
const initialReady = await waitForDiscordGatewayReady({
gateway: params.gateway,
abortSignal: params.abortSignal,
timeoutMs: DISCORD_GATEWAY_READY_TIMEOUT_MS,
beforePoll: params.drainPendingGatewayErrors,
});
if (initialReady === "stopped" || shouldStop()) {
return;
}
if (initialReady === "timeout") {
params.runtime.error?.(
danger(
`discord: gateway was not ready after ${DISCORD_GATEWAY_READY_TIMEOUT_MS}ms; forcing a fresh reconnect`,
),
);
const startupRetryAt = Date.now();
params.pushStatus({
connected: false,
lastEventAt: startupRetryAt,
lastDisconnect: {
at: startupRetryAt,
error: "startup-not-ready",
},
});
await reconnectGatewayFresh();
const reconnected = await waitForDiscordGatewayReady({
gateway: params.gateway,
abortSignal: params.abortSignal,
timeoutMs: DISCORD_GATEWAY_READY_TIMEOUT_MS,
beforePoll: params.drainPendingGatewayErrors,
});
if (reconnected === "stopped" || shouldStop()) {
return;
}
if (reconnected === "timeout") {
const error = new Error(
`discord gateway did not reach READY within ${DISCORD_GATEWAY_READY_TIMEOUT_MS}ms after a forced reconnect`,
);
const startupFailureAt = Date.now();
params.pushStatus({
connected: false,
lastEventAt: startupFailureAt,
lastDisconnect: {
at: startupFailureAt,
error: "startup-reconnect-timeout",
},
lastError: error.message,
});
throw error;
}
}
if (params.gateway.isConnected && !shouldStop()) {
pushConnectedStatus(Date.now());
}
};
// Run the abort path immediately when already aborted; otherwise subscribe once.
if (params.abortSignal?.aborted) {
onAbort();
} else {
params.abortSignal?.addEventListener("abort", onAbort, { once: true });
}
return {
ensureStartupReady,
onAbort,
onGatewayDebug,
clearHelloWatch,
// Stores the handler and immediately replays an error queued before registration.
registerForceStop: (handler: (err: unknown) => void) => {
forceStopHandler = handler;
if (queuedForceStopError !== undefined) {
const queued = queuedForceStopError;
queuedForceStopError = undefined;
handler(queued);
}
},
// Stops the watchdog, clears the HELLO watch timers, and unsubscribes from abort.
dispose: () => {
reconnectStallWatchdog.stop();
clearHelloWatch();
params.abortSignal?.removeEventListener("abort", onAbort);
},
};
}

View File

@ -14,14 +14,7 @@ type MockGateway = {
options: GatewayPlugin["options"];
disconnect: Mock<() => void>;
connect: Mock<(resume?: boolean) => void>;
state?: {
sessionId?: string | null;
resumeGatewayUrl?: string | null;
sequence?: number | null;
};
sequence?: number | null;
emitter: EventEmitter;
ws?: EventEmitter & { terminate?: () => void };
};
const {
@ -76,14 +69,40 @@ describe("runDiscordGatewayLifecycle", () => {
stopGatewayLoggingMock.mockClear();
});
const createLifecycleHarness = (params?: {
accountId?: string;
/**
 * Creates a disconnected mock gateway together with the event emitter it exposes.
 * The gateway starts with `isConnected: false`, a reconnect budget of 50 attempts,
 * and fresh `vi.fn()` spies for `connect` / `disconnect`.
 */
function createGatewayHarness(): { emitter: EventEmitter; gateway: MockGateway } {
  const emitter = new EventEmitter();
  const gateway: MockGateway = {
    isConnected: false,
    options: { intents: 0, reconnect: { maxAttempts: 50 } } as GatewayPlugin["options"],
    disconnect: vi.fn(),
    connect: vi.fn(),
    emitter,
  };
  return { emitter, gateway };
}
/**
 * Builds a gateway event of the given type around a fresh `Error(message)`.
 * The event's `message` is the stringified error, and every type except "other"
 * is flagged as stopping the lifecycle.
 */
function createGatewayEvent(
  type: DiscordGatewayEvent["type"],
  message: string,
): DiscordGatewayEvent {
  const cause = new Error(message);
  const stopsLifecycle = type !== "other";
  return { type, err: cause, message: String(cause), shouldStopLifecycle: stopsLifecycle };
}
function createLifecycleHarness(params?: {
gateway?: MockGateway;
start?: () => Promise<void>;
stop?: () => Promise<void>;
isDisallowedIntentsError?: (err: unknown) => boolean;
pendingGatewayEvents?: DiscordGatewayEvent[];
gateway?: MockGateway;
}) => {
}) {
const gateway =
params?.gateway ??
(() => {
@ -96,15 +115,11 @@ describe("runDiscordGatewayLifecycle", () => {
const threadStop = vi.fn();
const runtimeLog = vi.fn();
const runtimeError = vi.fn();
const runtimeExit = vi.fn();
const pendingGatewayEvents = params?.pendingGatewayEvents ?? [];
const gatewaySupervisor = {
attachLifecycle: vi.fn(),
detachLifecycle: vi.fn(),
drainPending: vi.fn((handler: (event: DiscordGatewayEvent) => "continue" | "stop") => {
if (pendingGatewayEvents.length === 0) {
return "continue";
}
const queued = [...pendingGatewayEvents];
pendingGatewayEvents.length = 0;
for (const event of queued) {
@ -121,7 +136,7 @@ describe("runDiscordGatewayLifecycle", () => {
const runtime: RuntimeEnv = {
log: runtimeLog,
error: runtimeError,
exit: runtimeExit,
exit: vi.fn(),
};
return {
start,
@ -132,7 +147,7 @@ describe("runDiscordGatewayLifecycle", () => {
gatewaySupervisor,
statusSink,
lifecycleParams: {
accountId: params?.accountId ?? "default",
accountId: "default",
gateway: gateway as unknown as MutableDiscordGateway,
runtime,
isDisallowedIntentsError: params?.isDisallowedIntentsError ?? (() => false),
@ -145,7 +160,7 @@ describe("runDiscordGatewayLifecycle", () => {
abortSignal: undefined as AbortSignal | undefined,
} satisfies LifecycleParams,
};
};
}
function expectLifecycleCleanup(params: {
start: ReturnType<typeof vi.fn>;
@ -163,64 +178,6 @@ describe("runDiscordGatewayLifecycle", () => {
expect(params.gatewaySupervisor.detachLifecycle).toHaveBeenCalledTimes(1);
}
/**
 * Creates a disconnected mock gateway plus its emitter, optionally seeded with
 * session state, a top-level sequence number, and a socket.
 *
 * `state` and `ws` are copied onto the gateway only when truthy; `sequence` is
 * copied whenever it is not `undefined` (so an explicit `null` is kept).
 */
function createGatewayHarness(params?: {
  state?: {
    sessionId?: string | null;
    resumeGatewayUrl?: string | null;
    sequence?: number | null;
  };
  sequence?: number | null;
  ws?: EventEmitter & { terminate?: () => void };
}): { emitter: EventEmitter; gateway: MockGateway } {
  const emitter = new EventEmitter();
  const seededState = params?.state ? { state: params.state } : {};
  const seededSequence = params?.sequence !== undefined ? { sequence: params.sequence } : {};
  const seededSocket = params?.ws ? { ws: params.ws } : {};
  return {
    emitter,
    gateway: {
      isConnected: false,
      options: { intents: 0 } as GatewayPlugin["options"],
      disconnect: vi.fn(),
      connect: vi.fn(),
      ...seededState,
      ...seededSequence,
      ...seededSocket,
      emitter,
    },
  };
}
/**
 * Emits the "WebSocket connection opened" debug message on the emitter, then
 * advances the fake timers by `delayMs` (30 000 ms by default).
 */
async function emitGatewayOpenAndWait(emitter: EventEmitter, delayMs = 30000): Promise<void> {
  const openedMessage = "WebSocket connection opened";
  emitter.emit("debug", openedMessage);
  await vi.advanceTimersByTimeAsync(delayMs);
}
/**
 * Wraps `message` in a new Error and returns a gateway event of the requested type.
 * `message` on the event is `String(err)`; only the "other" type leaves
 * `shouldStopLifecycle` false.
 */
function createGatewayEvent(
  type: DiscordGatewayEvent["type"],
  message: string,
): DiscordGatewayEvent {
  const failure = new Error(message);
  const event: DiscordGatewayEvent = {
    type,
    err: failure,
    message: String(failure),
    shouldStopLifecycle: !(type === "other"),
  };
  return event;
}
/**
 * Asserts that every cached resume field on the gateway has been nulled:
 * `state.sessionId`, `state.resumeGatewayUrl`, `state.sequence`, and the
 * top-level `sequence`.
 *
 * @throws Error when the gateway has no `state` object at all.
 */
function expectGatewaySessionStateCleared(gateway: {
  state?: {
    sessionId?: string | null;
    resumeGatewayUrl?: string | null;
    sequence?: number | null;
  };
  sequence?: number | null;
}) {
  const { state } = gateway;
  if (!state) {
    throw new Error("gateway state was not initialized");
  }
  const resumeFields = [state.sessionId, state.resumeGatewayUrl, state.sequence, gateway.sequence];
  for (const field of resumeFields) {
    expect(field).toBeNull();
  }
}
it("cleans up thread bindings when exec approvals startup fails", async () => {
const { lifecycleParams, start, stop, threadStop, gatewaySupervisor } = createLifecycleHarness({
start: async () => {
@ -257,21 +214,6 @@ describe("runDiscordGatewayLifecycle", () => {
});
});
// Happy path: with a default harness the lifecycle resolves and the shared
// cleanup expectations hold with exactly one gateway wait recorded.
it("cleans up after successful gateway wait", async () => {
const { lifecycleParams, start, stop, threadStop, gatewaySupervisor } =
createLifecycleHarness();
await expect(runDiscordGatewayLifecycle(lifecycleParams)).resolves.toBeUndefined();
expectLifecycleCleanup({
start,
stop,
threadStop,
waitCalls: 1,
gatewaySupervisor,
});
});
it("pushes connected status when gateway is already connected at lifecycle start", async () => {
const { emitter, gateway } = createGatewayHarness();
gateway.isConnected = true;
@ -280,26 +222,20 @@ describe("runDiscordGatewayLifecycle", () => {
const { lifecycleParams, statusSink } = createLifecycleHarness({ gateway });
await expect(runDiscordGatewayLifecycle(lifecycleParams)).resolves.toBeUndefined();
const connectedCall = statusSink.mock.calls.find((call) => {
const patch = (call[0] ?? {}) as Record<string, unknown>;
return patch.connected === true;
});
if (!connectedCall) {
throw new Error("connected status update was not emitted");
}
expect(connectedCall[0]).toMatchObject({
connected: true,
lastDisconnect: null,
});
expect(connectedCall[0].lastConnectedAt).toBeTypeOf("number");
expect(statusSink).toHaveBeenCalledWith(
expect.objectContaining({
connected: true,
lastDisconnect: null,
}),
);
});
it("forces a fresh reconnect when startup never reaches READY, then recovers", async () => {
it("restarts the gateway once when startup never reaches READY, then recovers", async () => {
vi.useFakeTimers();
try {
const { emitter, gateway } = createGatewayHarness();
getDiscordGatewayEmitterMock.mockReturnValueOnce(emitter);
gateway.connect.mockImplementation((_resume?: boolean) => {
gateway.connect.mockImplementation(() => {
setTimeout(() => {
gateway.isConnected = true;
}, 1_000);
@ -307,11 +243,12 @@ describe("runDiscordGatewayLifecycle", () => {
const { lifecycleParams, runtimeError } = createLifecycleHarness({ gateway });
const lifecyclePromise = runDiscordGatewayLifecycle(lifecycleParams);
await vi.advanceTimersByTimeAsync(15_000 + 1_000);
await vi.advanceTimersByTimeAsync(16_500);
await expect(lifecyclePromise).resolves.toBeUndefined();
expect(runtimeError).toHaveBeenCalledWith(
expect.stringContaining("gateway was not ready after 15000ms"),
expect.stringContaining("gateway was not ready after 15000ms; restarting gateway"),
);
expect(gateway.disconnect).toHaveBeenCalledTimes(1);
expect(gateway.connect).toHaveBeenCalledTimes(1);
@ -321,218 +258,7 @@ describe("runDiscordGatewayLifecycle", () => {
}
});
// Startup never reaches READY on a gateway holding stale session state. The forced
// reconnect must wipe that state, must not let the old socket's own close/error
// listeners run (they would call connect(true) and queue an error event), and must
// reconnect exactly once with resume=false.
it("clears resume state and suppresses socket-driven auto-resume during forced startup reconnects", async () => {
vi.useFakeTimers();
try {
const pendingGatewayEvents: DiscordGatewayEvent[] = [];
const socket = new EventEmitter();
const { emitter, gateway } = createGatewayHarness({
state: {
sessionId: "stale-session",
resumeGatewayUrl: "wss://gateway.discord.gg",
sequence: 123,
},
sequence: 123,
ws: socket,
});
getDiscordGatewayEmitterMock.mockReturnValueOnce(emitter);
// Pre-existing socket listeners: if they still fired, the error would be queued
// as a gateway event and the close would trigger a resume.
socket.on("error", (err) => {
pendingGatewayEvents.push({
type: "other",
err,
message: String(err),
shouldStopLifecycle: false,
});
});
socket.on("close", () => {
gateway.connect(true);
});
// Disconnect makes the old socket emit an error followed by close shortly after.
gateway.disconnect.mockImplementation(() => {
setTimeout(() => {
socket.emit(
"error",
new Error("WebSocket was closed before the connection was established"),
);
socket.emit("close", 1006, "");
}, 1);
});
// Only a fresh (non-resume) connect brings the gateway up, one second later.
gateway.connect.mockImplementation((resume?: boolean) => {
if (resume === false) {
setTimeout(() => {
gateway.isConnected = true;
}, 1_000);
}
});
const { lifecycleParams, runtimeError } = createLifecycleHarness({
gateway,
pendingGatewayEvents,
});
const lifecyclePromise = runDiscordGatewayLifecycle(lifecycleParams);
await vi.advanceTimersByTimeAsync(17_000);
await expect(lifecyclePromise).resolves.toBeUndefined();
expect(gateway.connect).toHaveBeenCalledTimes(1);
expect(gateway.connect).toHaveBeenCalledWith(false);
expect(runtimeError).not.toHaveBeenCalledWith(
expect.stringContaining("WebSocket was closed before the connection was established"),
);
expectGatewaySessionStateCleared(gateway);
} finally {
vi.useRealTimers();
}
});
// The mocked disconnect never closes the socket, so the lifecycle has to fall back
// to socket.terminate(). Here terminate does produce a close event, after which a
// single fresh connect is expected.
it("waits for forced terminate to close the old socket before reconnecting", async () => {
vi.useFakeTimers();
try {
// terminate() emits an error and then close on the next timer tick.
const socket = Object.assign(new EventEmitter(), {
terminate: vi.fn(() => {
setTimeout(() => {
socket.emit(
"error",
new Error("WebSocket was closed before the connection was established"),
);
socket.emit("close", 1006, "");
}, 1);
}),
});
const { emitter, gateway } = createGatewayHarness({ ws: socket });
getDiscordGatewayEmitterMock.mockReturnValueOnce(emitter);
gateway.connect.mockImplementation((_resume?: boolean) => {
setTimeout(() => {
gateway.isConnected = true;
}, 1_000);
});
const { lifecycleParams, runtimeError } = createLifecycleHarness({ gateway });
const lifecyclePromise = runDiscordGatewayLifecycle(lifecycleParams);
// Advance past the 15s startup wait, the 5s drain wait, and the reconnect itself.
await vi.advanceTimersByTimeAsync(15_000 + 5_000 + 1_500);
await expect(lifecyclePromise).resolves.toBeUndefined();
expect(socket.terminate).toHaveBeenCalledTimes(1);
expect(gateway.connect).toHaveBeenCalledTimes(1);
expect(gateway.connect).toHaveBeenCalledWith(false);
expect(runtimeError).toHaveBeenCalledWith(
expect.stringContaining("attempting forced terminate before giving up"),
);
// The socket error emitted during terminate must not surface as a runtime error.
expect(runtimeError).not.toHaveBeenCalledWith(
expect.stringContaining("WebSocket was closed before the connection was established"),
);
} finally {
vi.useRealTimers();
}
});
// Same setup as the terminate-fallback case, but terminate() only emits an error and
// never a close event. The lifecycle must reject, never call connect, and still run
// its cleanup.
it("fails closed when forced terminate still does not close the old socket", async () => {
vi.useFakeTimers();
try {
// terminate() emits an error only; the socket never reports "close".
const socket = Object.assign(new EventEmitter(), {
terminate: vi.fn(() => {
setTimeout(() => {
socket.emit(
"error",
new Error("WebSocket was closed before the connection was established"),
);
}, 1);
}),
});
const { emitter, gateway } = createGatewayHarness({ ws: socket });
getDiscordGatewayEmitterMock.mockReturnValueOnce(emitter);
const { lifecycleParams, start, stop, threadStop, runtimeError, gatewaySupervisor } =
createLifecycleHarness({ gateway });
const lifecyclePromise = runDiscordGatewayLifecycle(lifecycleParams);
// Attach a no-op catch so the rejection is not reported as unhandled while timers advance.
lifecyclePromise.catch(() => {});
await vi.advanceTimersByTimeAsync(15_000 + 5_000 + 1_500);
await expect(lifecyclePromise).rejects.toThrow(
"discord gateway socket did not close within 5000ms before reconnect",
);
expect(socket.terminate).toHaveBeenCalledTimes(1);
expect(gateway.connect).not.toHaveBeenCalled();
expect(runtimeError).toHaveBeenCalledWith(
expect.stringContaining("force-stopping instead of opening a parallel socket"),
);
expect(runtimeError).not.toHaveBeenCalledWith(
expect.stringContaining("WebSocket was closed before the connection was established"),
);
// Startup failed before the gateway wait began, hence zero wait calls.
expectLifecycleCleanup({
start,
stop,
threadStop,
waitCalls: 0,
gatewaySupervisor,
});
} finally {
vi.useRealTimers();
}
});
// Abort arrives while the forced reconnect is still waiting for the old socket to
// close. Once the socket closes, no new connect may be issued.
it("does not reconnect after lifecycle shutdown begins during socket drain", async () => {
vi.useFakeTimers();
try {
const socket = new EventEmitter();
const { emitter, gateway } = createGatewayHarness({ ws: socket });
getDiscordGatewayEmitterMock.mockReturnValueOnce(emitter);
// The old socket takes one second to close after disconnect.
gateway.disconnect.mockImplementation(() => {
setTimeout(() => {
socket.emit("close", 1000, "");
}, 1_000);
});
const abortController = new AbortController();
const { lifecycleParams } = createLifecycleHarness({ gateway });
lifecycleParams.abortSignal = abortController.signal;
const lifecyclePromise = runDiscordGatewayLifecycle(lifecycleParams);
// Move just past the 15s startup wait so the drain is in progress, then abort.
await vi.advanceTimersByTimeAsync(15_100);
abortController.abort();
await vi.advanceTimersByTimeAsync(2_000);
await expect(lifecyclePromise).resolves.toBeUndefined();
expect(gateway.connect).not.toHaveBeenCalled();
} finally {
vi.useRealTimers();
}
});
// The old socket never closes and the lifecycle is aborted during the drain wait.
// When the drain timeout fires it must end quietly: no reconnect, no drain-timeout
// error logged, and the lifecycle resolves with normal cleanup.
it("treats drain timeout as a graceful stop after lifecycle abort", async () => {
vi.useFakeTimers();
try {
const socket = new EventEmitter();
const { emitter, gateway } = createGatewayHarness({ ws: socket });
getDiscordGatewayEmitterMock.mockReturnValueOnce(emitter);
const abortController = new AbortController();
const { lifecycleParams, start, stop, threadStop, runtimeError, gatewaySupervisor } =
createLifecycleHarness({ gateway });
lifecycleParams.abortSignal = abortController.signal;
const lifecyclePromise = runDiscordGatewayLifecycle(lifecycleParams);
// Abort shortly after the startup wait expires, then run past the 5s drain timeout.
await vi.advanceTimersByTimeAsync(15_100);
abortController.abort();
await vi.advanceTimersByTimeAsync(5_500);
await expect(lifecyclePromise).resolves.toBeUndefined();
expect(gateway.connect).not.toHaveBeenCalled();
expect(runtimeError).not.toHaveBeenCalledWith(
expect.stringContaining("gateway socket did not close within 5000ms before reconnect"),
);
expectLifecycleCleanup({
start,
stop,
threadStop,
waitCalls: 1,
gatewaySupervisor,
});
} finally {
vi.useRealTimers();
}
});
it("fails fast when startup never reaches READY after a forced reconnect", async () => {
it("fails when startup still is not ready after a restart", async () => {
vi.useFakeTimers();
try {
const { emitter, gateway } = createGatewayHarness();
@ -542,11 +268,11 @@ describe("runDiscordGatewayLifecycle", () => {
const lifecyclePromise = runDiscordGatewayLifecycle(lifecycleParams);
lifecyclePromise.catch(() => {});
await vi.advanceTimersByTimeAsync(15_000 * 2 + 1_000);
await expect(lifecyclePromise).rejects.toThrow(
"discord gateway did not reach READY within 15000ms after a forced reconnect",
);
await vi.advanceTimersByTimeAsync(31_000);
await expect(lifecyclePromise).rejects.toThrow(
"discord gateway did not reach READY within 15000ms after restart",
);
expect(gateway.disconnect).toHaveBeenCalledTimes(1);
expect(gateway.connect).toHaveBeenCalledTimes(1);
expect(gateway.connect).toHaveBeenCalledWith(false);
@ -605,7 +331,7 @@ describe("runDiscordGatewayLifecycle", () => {
});
});
it("throws queued non-disallowed fatal gateway errors", async () => {
it("throws queued fatal startup gateway errors", async () => {
const { lifecycleParams, start, stop, threadStop, gatewaySupervisor } = createLifecycleHarness({
pendingGatewayEvents: [createGatewayEvent("fatal", "Fatal Gateway error: 4000")],
});
@ -623,6 +349,29 @@ describe("runDiscordGatewayLifecycle", () => {
});
});
// Scenario: a "reconnect-exhausted" gateway event is already queued when the lifecycle
// starts. The lifecycle must reject with that event's message and still run cleanup;
// waitCalls: 0 asserts the stop-wait helper was never invoked.
it("throws queued reconnect exhaustion errors", async () => {
const { lifecycleParams, start, stop, threadStop, gatewaySupervisor } = createLifecycleHarness({
pendingGatewayEvents: [
createGatewayEvent(
"reconnect-exhausted",
"Max reconnect attempts (50) reached after code 1005",
),
],
});
await expect(runDiscordGatewayLifecycle(lifecycleParams)).rejects.toThrow(
"Max reconnect attempts (50) reached after code 1005",
);
expectLifecycleCleanup({
start,
stop,
threadStop,
waitCalls: 0,
gatewaySupervisor,
});
});
it("surfaces fatal startup gateway errors while waiting for READY", async () => {
vi.useFakeTimers();
try {
@ -642,8 +391,8 @@ describe("runDiscordGatewayLifecycle", () => {
const lifecyclePromise = runDiscordGatewayLifecycle(lifecycleParams);
lifecyclePromise.catch(() => {});
await vi.advanceTimersByTimeAsync(1_500);
await expect(lifecyclePromise).rejects.toThrow("Fatal Gateway error: 4001");
await expect(lifecyclePromise).rejects.toThrow("Fatal Gateway error: 4001");
expect(runtimeError).toHaveBeenCalledWith(
expect.stringContaining("discord gateway error: Error: Fatal Gateway error: 4001"),
);
@ -661,247 +410,111 @@ describe("runDiscordGatewayLifecycle", () => {
}
});
// Scenario: a gateway with resumable session state closes with code 1006 and then
// "opens" three times in a row (via emitGatewayOpenAndWait) while isConnected stays false.
// Expected: three disconnects, the first two reconnects called with `true`, the third
// with `false`, and the stored session state cleared afterwards.
it("retries stalled HELLO with resume before forcing fresh identify", async () => {
vi.useFakeTimers();
try {
const { emitter, gateway } = createGatewayHarness({
state: {
sessionId: "session-1",
resumeGatewayUrl: "wss://gateway.discord.gg",
sequence: 123,
},
sequence: 123,
});
gateway.isConnected = true;
getDiscordGatewayEmitterMock.mockReturnValueOnce(emitter);
// The stop-wait mock drives the whole scenario: close, then three opens in a row.
waitForDiscordGatewayStopMock.mockImplementationOnce(async () => {
emitter.emit("debug", "WebSocket connection closed with code 1006");
gateway.isConnected = false;
await emitGatewayOpenAndWait(emitter);
await emitGatewayOpenAndWait(emitter);
await emitGatewayOpenAndWait(emitter);
});
const { lifecycleParams } = createLifecycleHarness({ gateway });
await expect(runDiscordGatewayLifecycle(lifecycleParams)).resolves.toBeUndefined();
expect(gateway.disconnect).toHaveBeenCalledTimes(3);
expect(gateway.connect).toHaveBeenNthCalledWith(1, true);
expect(gateway.connect).toHaveBeenNthCalledWith(2, true);
expect(gateway.connect).toHaveBeenNthCalledWith(3, false);
expectGatewaySessionStateCleared(gateway);
} finally {
vi.useRealTimers();
}
});
// Scenario: two stalled opens, then a reconnect that succeeds (isConnected flips to true)
// but drops again shortly after, followed by two more stalled opens.
// Expected: four reconnect attempts, every one called with `true` and none with `false`.
it("resets HELLO stall counter after a successful reconnect that drops quickly", async () => {
vi.useFakeTimers();
try {
const { emitter, gateway } = createGatewayHarness({
state: {
sessionId: "session-2",
resumeGatewayUrl: "wss://gateway.discord.gg",
sequence: 456,
},
sequence: 456,
});
gateway.isConnected = true;
getDiscordGatewayEmitterMock.mockReturnValueOnce(emitter);
waitForDiscordGatewayStopMock.mockImplementationOnce(async () => {
emitter.emit("debug", "WebSocket connection closed with code 1006");
gateway.isConnected = false;
await emitGatewayOpenAndWait(emitter);
await emitGatewayOpenAndWait(emitter);
// Successful reconnect (READY/RESUMED sets isConnected=true), then
// quick drop before the HELLO timeout window finishes.
gateway.isConnected = true;
await emitGatewayOpenAndWait(emitter, 10);
emitter.emit("debug", "WebSocket connection closed with code 1006");
gateway.isConnected = false;
await emitGatewayOpenAndWait(emitter);
await emitGatewayOpenAndWait(emitter);
});
const { lifecycleParams } = createLifecycleHarness({ gateway });
await expect(runDiscordGatewayLifecycle(lifecycleParams)).resolves.toBeUndefined();
expect(gateway.connect).toHaveBeenCalledTimes(4);
expect(gateway.connect).toHaveBeenNthCalledWith(1, true);
expect(gateway.connect).toHaveBeenNthCalledWith(2, true);
expect(gateway.connect).toHaveBeenNthCalledWith(3, true);
expect(gateway.connect).toHaveBeenNthCalledWith(4, true);
expect(gateway.connect).not.toHaveBeenCalledWith(false);
} finally {
vi.useRealTimers();
}
});
// Scenario: after a close debug message (code 1006) nothing else happens for just over
// five minutes of fake time. The stop-wait mock only settles through the force-stop
// callback registered by the lifecycle, so the rejection below proves the watchdog
// invoked it with a "reconnect watchdog timeout" error.
it("force-stops when reconnect stalls after a close event", async () => {
vi.useFakeTimers();
try {
const { emitter, gateway } = createGatewayHarness();
gateway.isConnected = true;
getDiscordGatewayEmitterMock.mockReturnValueOnce(emitter);
waitForDiscordGatewayStopMock.mockImplementationOnce(
(waitParams: WaitForDiscordGatewayStopParams) =>
new Promise<void>((_resolve, reject) => {
waitParams.registerForceStop?.((err) => reject(err));
}),
);
const { lifecycleParams } = createLifecycleHarness({ gateway });
const lifecyclePromise = runDiscordGatewayLifecycle(lifecycleParams);
// Attach a no-op catch so the rejection is not reported as unhandled before the assertion.
lifecyclePromise.catch(() => {});
emitter.emit("debug", "WebSocket connection closed with code 1006");
await vi.advanceTimersByTimeAsync(5 * 60_000 + 1_000);
await expect(lifecyclePromise).rejects.toThrow("reconnect watchdog timeout");
} finally {
vi.useRealTimers();
}
});
// Scenario: the socket closes, then re-opens with isConnected=true one minute later.
// Even after a further five minutes of fake time no "reconnect watchdog timeout" may be
// logged, and the lifecycle resolves once the stop-wait promise is released manually.
it("does not force-stop when reconnect resumes before watchdog timeout", async () => {
vi.useFakeTimers();
try {
const { emitter, gateway } = createGatewayHarness();
gateway.isConnected = true;
getDiscordGatewayEmitterMock.mockReturnValueOnce(emitter);
// Captured so the test decides when the stop-wait promise resolves.
let resolveWait: (() => void) | undefined;
waitForDiscordGatewayStopMock.mockImplementationOnce(
(waitParams: WaitForDiscordGatewayStopParams) =>
new Promise<void>((resolve, reject) => {
resolveWait = resolve;
waitParams.registerForceStop?.((err) => reject(err));
}),
);
const { lifecycleParams, runtimeLog } = createLifecycleHarness({ gateway });
const lifecyclePromise = runDiscordGatewayLifecycle(lifecycleParams);
emitter.emit("debug", "WebSocket connection closed with code 1006");
await vi.advanceTimersByTimeAsync(60_000);
gateway.isConnected = true;
emitter.emit("debug", "WebSocket connection opened");
await vi.advanceTimersByTimeAsync(5 * 60_000 + 1_000);
expect(runtimeLog).not.toHaveBeenCalledWith(
expect.stringContaining("reconnect watchdog timeout"),
);
resolveWait?.();
await expect(lifecyclePromise).resolves.toBeUndefined();
} finally {
vi.useRealTimers();
}
});
it("suppresses reconnect-exhausted already queued before shutdown", async () => {
const pendingGatewayEvents: DiscordGatewayEvent[] = [];
const abortController = new AbortController();
const emitter = new EventEmitter();
const gateway: MockGateway = {
isConnected: true,
options: { intents: 0, reconnect: { maxAttempts: 50 } } as GatewayPlugin["options"],
disconnect: vi.fn(),
connect: vi.fn(),
emitter,
};
it("pushes disconnected status when Carbon closes after startup", async () => {
const { emitter, gateway } = createGatewayHarness();
gateway.isConnected = true;
getDiscordGatewayEmitterMock.mockReturnValueOnce(emitter);
const { lifecycleParams, runtimeLog, runtimeError } = createLifecycleHarness({
gateway,
pendingGatewayEvents,
});
lifecycleParams.abortSignal = abortController.signal;
// Start lifecycle; it yields at execApprovalsHandler.start(). We then
// queue a reconnect-exhausted event and abort. The lifecycle resumes and
// drains the queued event before shutdown teardown flips lifecycleStopping,
// so drainPending must treat it as a graceful stop.
const lifecyclePromise = runDiscordGatewayLifecycle(lifecycleParams);
pendingGatewayEvents.push(
createGatewayEvent(
"reconnect-exhausted",
"Max reconnect attempts (0) reached after code 1005",
),
);
abortController.abort();
await expect(lifecyclePromise).resolves.toBeUndefined();
expect(runtimeLog).not.toHaveBeenCalledWith(
expect.stringContaining("ignoring expected reconnect-exhausted during shutdown"),
);
expect(runtimeError).toHaveBeenCalledWith(expect.stringContaining("Max reconnect attempts"));
});
it("rejects reconnect-exhausted queued before startup when shutdown has not begun", async () => {
const pendingGatewayEvents: DiscordGatewayEvent[] = [];
const emitter = new EventEmitter();
const gateway: MockGateway = {
isConnected: true,
options: { intents: 0, reconnect: { maxAttempts: 50 } } as GatewayPlugin["options"],
disconnect: vi.fn(),
connect: vi.fn(),
emitter,
};
getDiscordGatewayEmitterMock.mockReturnValueOnce(emitter);
const { lifecycleParams } = createLifecycleHarness({
gateway,
pendingGatewayEvents,
waitForDiscordGatewayStopMock.mockImplementationOnce(async () => {
emitter.emit("debug", "Gateway websocket closed: 1006");
});
pendingGatewayEvents.push(
createGatewayEvent(
"reconnect-exhausted",
"Max reconnect attempts (0) reached after code 1005",
),
);
await expect(runDiscordGatewayLifecycle(lifecycleParams)).rejects.toThrow(
"Max reconnect attempts",
);
});
it("does not push connected: true when abortSignal is already aborted", async () => {
const emitter = new EventEmitter();
const gateway: MockGateway = {
isConnected: true,
options: { intents: 0, reconnect: { maxAttempts: 3 } } as GatewayPlugin["options"],
disconnect: vi.fn(),
connect: vi.fn(),
emitter,
};
getDiscordGatewayEmitterMock.mockReturnValueOnce(emitter);
const abortController = new AbortController();
abortController.abort();
const statusUpdates: Array<Record<string, unknown>> = [];
const statusSink = (patch: Record<string, unknown>) => {
statusUpdates.push({ ...patch });
};
const { lifecycleParams } = createLifecycleHarness({ gateway });
lifecycleParams.abortSignal = abortController.signal;
(lifecycleParams as Record<string, unknown>).statusSink = statusSink;
const { lifecycleParams, statusSink } = createLifecycleHarness({ gateway });
await expect(runDiscordGatewayLifecycle(lifecycleParams)).resolves.toBeUndefined();
// onAbort should have pushed connected: false
const connectedFalse = statusUpdates.find((s) => s.connected === false);
expect(connectedFalse).toEqual(expect.objectContaining({ connected: false }));
expect(statusSink).toHaveBeenCalledWith(
expect.objectContaining({
connected: false,
lastDisconnect: expect.objectContaining({ status: 1006 }),
}),
);
});
// No connected: true should appear — the isConnected check must be
// guarded by !lifecycleStopping to avoid contradicting the abort.
const connectedTrue = statusUpdates.find((s) => s.connected === true);
expect(connectedTrue).toBeUndefined();
// Scenario: while the lifecycle is waiting for the gateway to stop, the gateway emits a
// "Gateway reconnect scheduled in …" debug message. The status sink must receive a
// patch with connected: false whose lastError is that debug message verbatim.
it("pushes disconnected status when Carbon schedules a reconnect", async () => {
const { emitter, gateway } = createGatewayHarness();
// Start connected so the lifecycle reaches the stop-wait phase.
gateway.isConnected = true;
getDiscordGatewayEmitterMock.mockReturnValueOnce(emitter);
waitForDiscordGatewayStopMock.mockImplementationOnce(async () => {
emitter.emit("debug", "Gateway reconnect scheduled in 1000ms (zombie, resume=true)");
});
const { lifecycleParams, statusSink } = createLifecycleHarness({ gateway });
await expect(runDiscordGatewayLifecycle(lifecycleParams)).resolves.toBeUndefined();
expect(statusSink).toHaveBeenCalledWith(
expect.objectContaining({
connected: false,
lastError: "Gateway reconnect scheduled in 1000ms (zombie, resume=true)",
}),
);
});
// Scenario: during the stop-wait phase the socket re-opens while isConnected is false,
// and isConnected flips to true one second later. The status sink must first see
// connected: false and then a connected: true patch that clears lastDisconnect.
it("pushes connected status when a runtime reconnect becomes ready", async () => {
vi.useFakeTimers();
try {
const { emitter, gateway } = createGatewayHarness();
gateway.isConnected = true;
getDiscordGatewayEmitterMock.mockReturnValueOnce(emitter);
waitForDiscordGatewayStopMock.mockImplementationOnce(async () => {
gateway.isConnected = false;
emitter.emit("debug", "Gateway websocket opened");
// Flip isConnected back to true one second (fake time) after the open.
setTimeout(() => {
gateway.isConnected = true;
}, 1_000);
// 1.5s of fake time covers the flip above plus at least one 250ms readiness poll.
await vi.advanceTimersByTimeAsync(1_500);
});
const { lifecycleParams, statusSink } = createLifecycleHarness({ gateway });
await expect(runDiscordGatewayLifecycle(lifecycleParams)).resolves.toBeUndefined();
expect(statusSink).toHaveBeenCalledWith(expect.objectContaining({ connected: false }));
expect(statusSink).toHaveBeenCalledWith(
expect.objectContaining({
connected: true,
lastDisconnect: null,
}),
);
} finally {
vi.useRealTimers();
}
});
// Scenario: the socket re-opens during the stop-wait phase but isConnected never becomes
// true. After 30.5s of fake time the lifecycle must reject with the "opened but did not
// reach READY within 30000ms" error, report it through runtime.error, and push a
// disconnected status tagged "runtime-not-ready".
it("force-stops when a runtime reconnect opens but never becomes ready", async () => {
vi.useFakeTimers();
try {
const { emitter, gateway } = createGatewayHarness();
gateway.isConnected = true;
getDiscordGatewayEmitterMock.mockReturnValueOnce(emitter);
// The mock promise only settles through the registered force-stop callback.
waitForDiscordGatewayStopMock.mockImplementationOnce(
(params: WaitForDiscordGatewayStopParams) =>
new Promise<void>((_resolve, reject) => {
params.registerForceStop?.((err) => reject(err));
gateway.isConnected = false;
emitter.emit("debug", "Gateway websocket opened");
}),
);
const { lifecycleParams, runtimeError, statusSink } = createLifecycleHarness({ gateway });
const lifecyclePromise = runDiscordGatewayLifecycle(lifecycleParams);
// Attach a no-op catch so the rejection is not reported as unhandled before the assertion.
lifecyclePromise.catch(() => {});
await vi.advanceTimersByTimeAsync(30_500);
await expect(lifecyclePromise).rejects.toThrow(
"discord gateway opened but did not reach READY within 30000ms",
);
expect(runtimeError).toHaveBeenCalledWith(
expect.stringContaining("did not reach READY within 30000ms"),
);
expect(statusSink).toHaveBeenCalledWith(
expect.objectContaining({
connected: false,
lastDisconnect: expect.objectContaining({ error: "runtime-not-ready" }),
}),
);
} finally {
vi.useRealTimers();
}
});
});

View File

@ -1,3 +1,4 @@
import { createConnectedChannelStatusPatch } from "openclaw/plugin-sdk/gateway-runtime";
import { danger } from "openclaw/plugin-sdk/runtime-env";
import type { RuntimeEnv } from "openclaw/plugin-sdk/runtime-env";
import { attachDiscordGatewayLogging } from "../gateway-logging.js";
@ -6,14 +7,233 @@ import type { DiscordVoiceManager } from "../voice/manager.js";
import type { MutableDiscordGateway } from "./gateway-handle.js";
import { registerGateway, unregisterGateway } from "./gateway-registry.js";
import type { DiscordGatewayEvent, DiscordGatewaySupervisor } from "./gateway-supervisor.js";
import { createDiscordGatewayReconnectController } from "./provider.lifecycle.reconnect.js";
import type { DiscordMonitorStatusSink } from "./status.js";
/** How long each startup readiness attempt in `waitForGatewayReady` may poll before timing out. */
const DISCORD_GATEWAY_READY_TIMEOUT_MS = 15_000;
/** How long a socket opened at runtime may stay not-connected before the status observer force-stops. */
const DISCORD_GATEWAY_RUNTIME_READY_TIMEOUT_MS = 30_000;
/** Interval between `isConnected` checks in both the startup wait and the runtime ready watch. */
const DISCORD_GATEWAY_READY_POLL_MS = 250;
/** Minimal async start/stop contract; its use lies outside this excerpt. */
type ExecApprovalsHandler = {
start: () => Promise<void>;
stop: () => Promise<void>;
};
/**
 * Outcome of one readiness polling attempt: the gateway connected ("ready"), polling was
 * aborted or told to stop ("stopped"), or the deadline passed ("timeout").
 */
type GatewayReadyWaitResult = "ready" | "stopped" | "timeout";
/**
 * Extracts the numeric close code from a gateway debug message of the form
 * `Gateway websocket closed: <code>`.
 *
 * @param message - Debug message text emitted by the gateway.
 * @returns The close code (the first three to five digits after the colon), or
 *   `undefined` when the message does not carry one.
 */
function parseGatewayCloseCode(message: string): number | undefined {
  const digits = message.match(/Gateway websocket closed:\s*(\d{3,5})/)?.[1];
  if (digits === undefined || digits === "") {
    return undefined;
  }
  const parsed = Number.parseInt(digits, 10);
  if (!Number.isFinite(parsed)) {
    return undefined;
  }
  return parsed;
}
/**
 * Builds an observer that turns gateway debug messages into monitor status patches and
 * watches every runtime socket open until the gateway reports itself connected.
 *
 * Reactions, by debug message content:
 * - "Gateway websocket opened": push `connected: false`, then start the ready watch.
 * - "Gateway websocket closed": cancel the ready watch and push a disconnect patch,
 *   including the close code when one can be parsed from the message.
 * - "Gateway reconnect scheduled in": cancel the ready watch and push the message as
 *   `lastError`.
 *
 * If an opened socket is still not connected after
 * `DISCORD_GATEWAY_RUNTIME_READY_TIMEOUT_MS`, the observer pushes a "runtime-not-ready"
 * status, reports the error through `runtime.error`, and invokes the force-stop handler
 * (or queues the error until a handler is registered).
 *
 * @param params.gateway - Gateway whose `isConnected` flag is polled.
 * @param params.abortSignal - When aborted, debug messages and polls are ignored.
 * @param params.runtime - Runtime used for error reporting.
 * @param params.pushStatus - Receives every status patch.
 * @param params.isLifecycleStopping - Treated like an aborted signal when it returns true.
 * @returns The debug listener, a way to cancel the ready watch, force-stop registration,
 *   and a `dispose` that clears timers and any queued force-stop state.
 */
function createGatewayStatusObserver(params: {
  gateway?: Pick<MutableDiscordGateway, "isConnected">;
  abortSignal?: AbortSignal;
  runtime: RuntimeEnv;
  pushStatus: (patch: Parameters<DiscordMonitorStatusSink>[0]) => void;
  isLifecycleStopping: () => boolean;
}) {
  let forceStopHandler: ((err: unknown) => void) | undefined;
  let queuedForceStopError: unknown;
  let readyPollId: ReturnType<typeof setInterval> | undefined;
  let readyTimeoutId: ReturnType<typeof setTimeout> | undefined;

  const shouldStop = () => params.abortSignal?.aborted || params.isLifecycleStopping();

  const clearReadyWatch = () => {
    if (readyPollId) {
      clearInterval(readyPollId);
      readyPollId = undefined;
    }
    if (readyTimeoutId) {
      clearTimeout(readyTimeoutId);
      readyTimeoutId = undefined;
    }
  };

  // Deliver the error now if a handler exists; otherwise hold it for registerForceStop.
  const triggerForceStop = (err: unknown) => {
    if (forceStopHandler) {
      forceStopHandler(err);
      return;
    }
    queuedForceStopError = err;
  };

  const pushConnectedStatus = (at: number) => {
    params.pushStatus({
      ...createConnectedChannelStatusPatch(at),
      lastDisconnect: null,
      lastError: null,
    });
  };

  const startReadyWatch = () => {
    clearReadyWatch();

    // Returns true once the watch has nothing left to do (stopping, or connected and
    // reported), false while the gateway is still not connected.
    const pollConnected = (): boolean => {
      if (shouldStop()) {
        clearReadyWatch();
        return true;
      }
      if (!params.gateway?.isConnected) {
        return false;
      }
      clearReadyWatch();
      pushConnectedStatus(Date.now());
      return true;
    };

    // Only arm the timers when the immediate check did not settle the watch. Arming them
    // unconditionally would push a second, duplicate connected status on the next tick.
    if (pollConnected()) {
      return;
    }

    readyPollId = setInterval(pollConnected, DISCORD_GATEWAY_READY_POLL_MS);
    readyPollId.unref?.();
    readyTimeoutId = setTimeout(() => {
      clearReadyWatch();
      if (shouldStop()) {
        return;
      }
      if (params.gateway?.isConnected) {
        // The gateway became connected between the last poll tick and this deadline;
        // report it so the status does not stay at the `connected: false` pushed on open.
        pushConnectedStatus(Date.now());
        return;
      }
      const at = Date.now();
      const error = new Error(
        `discord gateway opened but did not reach READY within ${DISCORD_GATEWAY_RUNTIME_READY_TIMEOUT_MS}ms`,
      );
      params.pushStatus({
        connected: false,
        lastEventAt: at,
        lastDisconnect: {
          at,
          error: "runtime-not-ready",
        },
        lastError: "runtime-not-ready",
      });
      params.runtime.error?.(danger(error.message));
      triggerForceStop(error);
    }, DISCORD_GATEWAY_RUNTIME_READY_TIMEOUT_MS);
    readyTimeoutId.unref?.();
  };

  const onGatewayDebug = (msg: unknown) => {
    if (shouldStop()) {
      return;
    }
    const at = Date.now();
    const message = String(msg);
    if (message.includes("Gateway websocket opened")) {
      // An open socket is not yet a usable session; report disconnected until the
      // ready watch sees isConnected.
      params.pushStatus({ connected: false, lastEventAt: at });
      startReadyWatch();
      return;
    }
    if (message.includes("Gateway websocket closed")) {
      clearReadyWatch();
      const code = parseGatewayCloseCode(message);
      params.pushStatus({
        connected: false,
        lastEventAt: at,
        lastDisconnect: {
          at,
          ...(code !== undefined ? { status: code } : {}),
        },
      });
      return;
    }
    if (message.includes("Gateway reconnect scheduled in")) {
      clearReadyWatch();
      params.pushStatus({
        connected: false,
        lastEventAt: at,
        lastError: message,
      });
    }
  };

  return {
    onGatewayDebug,
    clearReadyWatch,
    registerForceStop: (handler: (err: unknown) => void) => {
      forceStopHandler = handler;
      // Flush an error raised before any handler was registered.
      if (queuedForceStopError !== undefined) {
        const err = queuedForceStopError;
        queuedForceStopError = undefined;
        handler(err);
      }
    },
    dispose: () => {
      clearReadyWatch();
      forceStopHandler = undefined;
      queuedForceStopError = undefined;
    },
  };
}
/**
 * Waits for the gateway to report itself connected during startup, restarting it once
 * if the first attempt times out.
 *
 * Each attempt polls `gateway.isConnected` every `DISCORD_GATEWAY_READY_POLL_MS` for up
 * to `DISCORD_GATEWAY_READY_TIMEOUT_MS`. A missing gateway counts as connected. An
 * attempt ends early when the abort signal fires or `beforePoll` answers "stop".
 *
 * @param params.gateway - Gateway to poll and, if needed, disconnect and reconnect.
 * @param params.abortSignal - Ends the wait without error once aborted.
 * @param params.beforePoll - Runs before every poll; "stop" ends the wait without error.
 * @param params.pushStatus - Receives the connected patch and the "startup-not-ready" patch.
 * @param params.runtime - Runtime used to report the restart.
 * @param params.beforeRestart - Invoked just before the disconnect/connect restart.
 * @throws Error when the gateway is still not connected after the restart attempt.
 */
async function waitForGatewayReady(params: {
  gateway?: Pick<MutableDiscordGateway, "connect" | "disconnect" | "isConnected">;
  abortSignal?: AbortSignal;
  beforePoll?: () => Promise<"continue" | "stop"> | "continue" | "stop";
  pushStatus?: (patch: Parameters<DiscordMonitorStatusSink>[0]) => void;
  runtime: RuntimeEnv;
  beforeRestart?: () => void;
}): Promise<void> {
  // Sleep for one poll interval without keeping the process alive.
  const pauseBetweenPolls = () =>
    new Promise<void>((wake) => {
      const timer = setTimeout(wake, DISCORD_GATEWAY_READY_POLL_MS);
      timer.unref?.();
    });

  // One bounded polling attempt.
  const pollOnce = async (): Promise<GatewayReadyWaitResult> => {
    const deadlineAt = Date.now() + DISCORD_GATEWAY_READY_TIMEOUT_MS;
    for (;;) {
      if (params.abortSignal?.aborted) {
        return "stopped";
      }
      const decision = await params.beforePoll?.();
      if (decision === "stop") {
        return "stopped";
      }
      const connected = params.gateway?.isConnected ?? true;
      if (connected) {
        const at = Date.now();
        params.pushStatus?.({
          ...createConnectedChannelStatusPatch(at),
          lastDisconnect: null,
        });
        return "ready";
      }
      if (Date.now() >= deadlineAt) {
        return "timeout";
      }
      await pauseBetweenPolls();
    }
  };

  const initialOutcome = await pollOnce();
  if (initialOutcome === "ready" || initialOutcome === "stopped") {
    return;
  }

  if (!params.gateway) {
    throw new Error(
      `discord gateway did not reach READY within ${DISCORD_GATEWAY_READY_TIMEOUT_MS}ms`,
    );
  }

  // First attempt timed out: report it, then restart the gateway once.
  const restartAt = Date.now();
  params.runtime.error?.(
    danger(
      `discord: gateway was not ready after ${DISCORD_GATEWAY_READY_TIMEOUT_MS}ms; restarting gateway`,
    ),
  );
  params.pushStatus?.({
    connected: false,
    lastEventAt: restartAt,
    lastDisconnect: {
      at: restartAt,
      error: "startup-not-ready",
    },
    lastError: "startup-not-ready",
  });
  if (params.abortSignal?.aborted) {
    return;
  }
  params.beforeRestart?.();
  params.gateway.disconnect();
  params.gateway.connect(false);

  const retryOutcome = await pollOnce();
  if (retryOutcome === "timeout") {
    throw new Error(
      `discord gateway did not reach READY within ${DISCORD_GATEWAY_READY_TIMEOUT_MS}ms after restart`,
    );
  }
}
export async function runDiscordGatewayLifecycle(params: {
accountId: string;
gateway?: MutableDiscordGateway;
@ -41,21 +261,19 @@ export async function runDiscordGatewayLifecycle(params: {
const pushStatus = (patch: Parameters<DiscordMonitorStatusSink>[0]) => {
params.statusSink?.(patch);
};
const reconnectController = createDiscordGatewayReconnectController({
accountId: params.accountId,
const statusObserver = createGatewayStatusObserver({
gateway,
runtime: params.runtime,
abortSignal: params.abortSignal,
runtime: params.runtime,
pushStatus,
isLifecycleStopping: () => lifecycleStopping,
drainPendingGatewayErrors: () => drainPendingGatewayErrors(),
});
const onGatewayDebug = reconnectController.onGatewayDebug;
gatewayEmitter?.on("debug", onGatewayDebug);
gatewayEmitter?.on("debug", statusObserver.onGatewayDebug);
let sawDisallowedIntents = false;
const handleGatewayEvent = (event: DiscordGatewayEvent): "continue" | "stop" => {
if (event.type === "disallowed-intents") {
lifecycleStopping = true;
sawDisallowedIntents = true;
params.runtime.error?.(
danger(
@ -64,14 +282,8 @@ export async function runDiscordGatewayLifecycle(params: {
);
return "stop";
}
// When we deliberately set maxAttempts=0 and disconnected (health-monitor
// stale-socket restart), Carbon fires "Max reconnect attempts (0)". This
// is expected — log at info instead of error to avoid false alarms.
if (lifecycleStopping && event.type === "reconnect-exhausted") {
params.runtime.log?.(
`discord: ignoring expected reconnect-exhausted during shutdown: ${event.message}`,
);
return "stop";
if (event.shouldStopLifecycle) {
lifecycleStopping = true;
}
params.runtime.error?.(danger(`discord gateway error: ${event.message}`));
return event.shouldStopLifecycle ? "stop" : "continue";
@ -82,14 +294,7 @@ export async function runDiscordGatewayLifecycle(params: {
if (decision !== "stop") {
return "continue";
}
// Don't throw for expected shutdown events. `reconnect-exhausted` can be
// queued just before an abort-driven shutdown flips `lifecycleStopping`,
// so only suppress it when shutdown is already underway.
if (
event.type === "disallowed-intents" ||
(event.type === "reconnect-exhausted" &&
(lifecycleStopping || params.abortSignal?.aborted === true))
) {
if (event.type === "disallowed-intents") {
return "stop";
}
throw event.err;
@ -104,7 +309,14 @@ export async function runDiscordGatewayLifecycle(params: {
return;
}
await reconnectController.ensureStartupReady();
await waitForGatewayReady({
gateway,
abortSignal: params.abortSignal,
beforePoll: drainPendingGatewayErrors,
pushStatus,
runtime: params.runtime,
beforeRestart: statusObserver.clearReadyWatch,
});
if (drainPendingGatewayErrors() === "stop") {
return;
@ -119,7 +331,7 @@ export async function runDiscordGatewayLifecycle(params: {
abortSignal: params.abortSignal,
gatewaySupervisor: params.gatewaySupervisor,
onGatewayEvent: handleGatewayEvent,
registerForceStop: reconnectController.registerForceStop,
registerForceStop: statusObserver.registerForceStop,
});
} catch (err) {
if (!sawDisallowedIntents && !params.isDisallowedIntentsError(err)) {
@ -130,8 +342,8 @@ export async function runDiscordGatewayLifecycle(params: {
params.gatewaySupervisor.detachLifecycle();
unregisterGateway(params.accountId);
stopGatewayLogging();
reconnectController.dispose();
gatewayEmitter?.removeListener("debug", onGatewayDebug);
statusObserver.dispose();
gatewayEmitter?.removeListener("debug", statusObserver.onGatewayDebug);
if (params.voiceManager) {
await params.voiceManager.destroy();
params.voiceManagerRef.current = null;

View File

@ -703,7 +703,7 @@ describe("monitorDiscordProvider", () => {
name === "gateway" ? gateway : undefined,
);
clientFetchUserMock.mockImplementationOnce(async () => {
emitter.emit("debug", "WebSocket connection opened");
emitter.emit("debug", "Gateway websocket opened");
return { id: "bot-1", username: "Molty" };
});
isVerboseMock.mockReturnValue(true);
@ -722,7 +722,7 @@ describe("monitorDiscordProvider", () => {
expect(messages.some((msg) => msg.includes("fetch-bot-identity:done"))).toBe(true);
expect(
messages.some(
(msg) => msg.includes("gateway-debug") && msg.includes("WebSocket connection opened"),
(msg) => msg.includes("gateway-debug") && msg.includes("Gateway websocket opened"),
),
).toBe(true);
});