diff --git a/CHANGELOG.md b/CHANGELOG.md index f3002819abe..2cc1570b2cd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -98,6 +98,7 @@ Docs: https://docs.openclaw.ai - Slack/Usage footer formatting: wrap session keys in inline code in full response-usage footers so Slack does not parse colon-delimited session segments as emoji shortcodes. (#30258) Thanks @pushkarsingh32. - Slack/Socket Mode slash startup: treat `app.options()` registration as best-effort and fall back to static arg menus when listener registration fails, preventing Slack monitor startup crash loops on receiver init edge cases. (#21715) - Slack/Legacy streaming config: map boolean `channels.slack.streaming=false` to unified streaming mode `off` (with `nativeStreaming=false`) so legacy configs correctly disable draft preview/native streaming instead of defaulting to `partial`. (#25990) Thanks @chilu18. +- Slack/Socket reconnect reliability: reconnect Socket Mode after disconnect/start failures using bounded exponential backoff with abort-aware waits, while preserving clean shutdown behavior and adding disconnect/error helper tests. (#27232) Thanks @pandego. - Onboarding/Custom providers: raise default custom-provider model context window to the runtime hard minimum (16k) and auto-heal existing custom model entries below that threshold during reconfiguration, preventing immediate `Model context window too small (4096 tokens)` failures. (#21653) Thanks @r4jiv007. - Web UI/Assistant text: strip internal `...` scaffolding from rendered assistant messages (while preserving code-fence literals), preventing memory-context leakage in chat output for models that echo internal blocks. (#29851) Thanks @Valkster70. - Dashboard/Sessions: allow authenticated Control UI clients to delete and patch sessions while still blocking regular webchat clients from session mutation RPCs, fixing Dashboard session delete failures. (#21264) Thanks @jskoiz. diff --git a/src/slack/monitor/provider.reconnect.test.ts b/src/slack/monitor/provider.reconnect.test.ts new file mode 100644 index 00000000000..f2e36ad1fd0 --- /dev/null +++ b/src/slack/monitor/provider.reconnect.test.ts @@ -0,0 +1,45 @@ +import { describe, expect, it } from "vitest"; +import { __testing } from "./provider.js"; + +class FakeEmitter { + private listeners = new Map void>>(); + + on(event: string, listener: (...args: unknown[]) => void) { + const bucket = this.listeners.get(event) ?? new Set<(...args: unknown[]) => void>(); + bucket.add(listener); + this.listeners.set(event, bucket); + } + + off(event: string, listener: (...args: unknown[]) => void) { + this.listeners.get(event)?.delete(listener); + } + + emit(event: string, ...args: unknown[]) { + for (const listener of this.listeners.get(event) ?? []) { + listener(...args); + } + } +} + +describe("slack socket reconnect helpers", () => { + it("resolves disconnect waiter on socket disconnect event", async () => { + const client = new FakeEmitter(); + const app = { receiver: { client } }; + + const waiter = __testing.waitForSlackSocketDisconnect(app as never); + client.emit("disconnected"); + + await expect(waiter).resolves.toEqual({ event: "disconnect" }); + }); + + it("resolves disconnect waiter on socket error event", async () => { + const client = new FakeEmitter(); + const app = { receiver: { client } }; + const err = new Error("dns down"); + + const waiter = __testing.waitForSlackSocketDisconnect(app as never); + client.emit("error", err); + + await expect(waiter).resolves.toEqual({ event: "error", error: err }); + }); +}); diff --git a/src/slack/monitor/provider.ts b/src/slack/monitor/provider.ts index 168c5873b0a..316791460f9 100644 --- a/src/slack/monitor/provider.ts +++ b/src/slack/monitor/provider.ts @@ -18,6 +18,7 @@ import { } from "../../config/runtime-group-policy.js"; import type { SessionScope } from "../../config/sessions.js"; import { warn } from "../../globals.js"; +import { computeBackoff, sleepWithAbort } from "../../infra/backoff.js"; import { installRequestBodyLimitGuard } from "../../infra/http-body.js"; import { normalizeMainKey } from "../../routing/session-key.js"; import { createNonExitingRuntime, type RuntimeEnv } from "../../runtime.js"; @@ -46,6 +47,100 @@ const { App, HTTPReceiver } = slackBolt; const SLACK_WEBHOOK_MAX_BODY_BYTES = 1024 * 1024; const SLACK_WEBHOOK_BODY_TIMEOUT_MS = 30_000; +const SLACK_SOCKET_RECONNECT_POLICY = { + initialMs: 2_000, + maxMs: 30_000, + factor: 1.8, + jitter: 0.25, + maxAttempts: 12, +} as const; + +type SlackSocketDisconnectEvent = "disconnect" | "unable_to_socket_mode_start" | "error"; + +type EmitterLike = { + on: (event: string, listener: (...args: unknown[]) => void) => unknown; + off: (event: string, listener: (...args: unknown[]) => void) => unknown; +}; + +function getSocketEmitter(app: unknown): EmitterLike | null { + const receiver = (app as { receiver?: unknown }).receiver; + const client = + receiver && typeof receiver === "object" + ? (receiver as { client?: unknown }).client + : undefined; + if (!client || typeof client !== "object") { + return null; + } + const on = (client as { on?: unknown }).on; + const off = (client as { off?: unknown }).off; + if (typeof on !== "function" || typeof off !== "function") { + return null; + } + return { + on: (event, listener) => + ( + on as (this: unknown, event: string, listener: (...args: unknown[]) => void) => unknown + ).call(client, event, listener), + off: (event, listener) => + ( + off as (this: unknown, event: string, listener: (...args: unknown[]) => void) => unknown + ).call(client, event, listener), + }; +} + +function waitForSlackSocketDisconnect( + app: unknown, + abortSignal?: AbortSignal, +): Promise<{ + event: SlackSocketDisconnectEvent; + error?: unknown; +}> { + return new Promise((resolve) => { + const emitter = getSocketEmitter(app); + if (!emitter) { + abortSignal?.addEventListener("abort", () => resolve({ event: "disconnect" }), { + once: true, + }); + return; + } + + const disconnectListener = () => resolveOnce({ event: "disconnect" }); + const startFailListener = () => resolveOnce({ event: "unable_to_socket_mode_start" }); + const errorListener = (error: unknown) => resolveOnce({ event: "error", error }); + const abortListener = () => resolveOnce({ event: "disconnect" }); + + const cleanup = () => { + emitter.off("disconnected", disconnectListener); + emitter.off("unable_to_socket_mode_start", startFailListener); + emitter.off("error", errorListener); + abortSignal?.removeEventListener("abort", abortListener); + }; + + const resolveOnce = (value: { event: SlackSocketDisconnectEvent; error?: unknown }) => { + cleanup(); + resolve(value); + }; + + emitter.on("disconnected", disconnectListener); + emitter.on("unable_to_socket_mode_start", startFailListener); + emitter.on("error", errorListener); + abortSignal?.addEventListener("abort", abortListener, { once: true }); + }); +} + +function formatUnknownError(error: unknown): string { + if (error instanceof Error) { + return error.message; + } + if (typeof error === "string") { + return error; + } + try { + return JSON.stringify(error); + } catch { + return "unknown error"; + } +} function parseApiAppIdFromAppToken(raw?: string) { const token = raw?.trim(); @@ -362,19 +457,74 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) { try { if (slackMode === "socket") { - await app.start(); - runtime.log?.("slack socket mode connected"); + let reconnectAttempts = 0; + while (!opts.abortSignal?.aborted) { + try { + await app.start(); + reconnectAttempts = 0; + runtime.log?.("slack socket mode connected"); + } catch (err) { + reconnectAttempts += 1; + if ( + SLACK_SOCKET_RECONNECT_POLICY.maxAttempts > 0 && + reconnectAttempts >= SLACK_SOCKET_RECONNECT_POLICY.maxAttempts + ) { + throw err; + } + const delayMs = computeBackoff(SLACK_SOCKET_RECONNECT_POLICY, reconnectAttempts); + runtime.error?.( + `slack socket mode failed to start. retry ${reconnectAttempts}/${SLACK_SOCKET_RECONNECT_POLICY.maxAttempts || "∞"} in ${Math.round(delayMs / 1000)}s (${formatUnknownError(err)})`, + ); + try { + await sleepWithAbort(delayMs, opts.abortSignal); + } catch { + break; + } + continue; + } + + if (opts.abortSignal?.aborted) { + break; + } + + const disconnect = await waitForSlackSocketDisconnect(app, opts.abortSignal); + if (opts.abortSignal?.aborted) { + break; + } + + reconnectAttempts += 1; + if ( + SLACK_SOCKET_RECONNECT_POLICY.maxAttempts > 0 && + reconnectAttempts >= SLACK_SOCKET_RECONNECT_POLICY.maxAttempts + ) { + throw new Error( + `Slack socket mode reconnect max attempts reached (${reconnectAttempts}/${SLACK_SOCKET_RECONNECT_POLICY.maxAttempts}) after ${disconnect.event}`, + ); + } + + const delayMs = computeBackoff(SLACK_SOCKET_RECONNECT_POLICY, reconnectAttempts); + runtime.error?.( + `slack socket disconnected (${disconnect.event}). retry ${reconnectAttempts}/${SLACK_SOCKET_RECONNECT_POLICY.maxAttempts || "∞"} in ${Math.round(delayMs / 1000)}s${ + disconnect.error ? ` (${formatUnknownError(disconnect.error)})` : "" + }`, + ); + await app.stop().catch(() => undefined); + try { + await sleepWithAbort(delayMs, opts.abortSignal); + } catch { + break; + } + } } else { runtime.log?.(`slack http mode listening at ${slackWebhookPath}`); + if (!opts.abortSignal?.aborted) { + await new Promise((resolve) => { + opts.abortSignal?.addEventListener("abort", () => resolve(), { + once: true, + }); + }); + } } - if (opts.abortSignal?.aborted) { - return; - } - await new Promise((resolve) => { - opts.abortSignal?.addEventListener("abort", () => resolve(), { - once: true, - }); - }); } finally { opts.abortSignal?.removeEventListener("abort", stopOnAbort); unregisterHttpHandler?.(); @@ -385,4 +535,6 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) { export const __testing = { resolveSlackRuntimeGroupPolicy: resolveOpenProviderRuntimeGroupPolicy, resolveDefaultGroupPolicy, + getSocketEmitter, + waitForSlackSocketDisconnect, };