From bd4237c16c2c7d3cd0a30e58dd264938306bb03d Mon Sep 17 00:00:00 2001 From: Lin Z Date: Wed, 25 Mar 2026 23:32:21 +0800 Subject: [PATCH] fix(feishu): close WebSocket connections on monitor stop (#52844) * fix(feishu): close WebSocket connections on monitor stop/abort Co-Authored-By: Claude Opus 4.6 (1M context) * test(feishu): add WebSocket cleanup tests Co-Authored-By: Claude Opus 4.6 (1M context) * fix(feishu): close WebSocket connections on monitor stop (#52844) (thanks @schumilin) --------- Co-authored-by: Claude Opus 4.6 (1M context) Co-authored-by: George Zhang --- CHANGELOG.md | 1 + extensions/feishu/src/monitor.cleanup.test.ts | 122 ++++++++++++++++++ extensions/feishu/src/monitor.state.ts | 13 ++ extensions/feishu/src/monitor.test-mocks.ts | 4 +- extensions/feishu/src/monitor.transport.ts | 23 +++- 5 files changed, 155 insertions(+), 8 deletions(-) create mode 100644 extensions/feishu/src/monitor.cleanup.test.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index fbd00a0f345..5a3aa0ec8ba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ Docs: https://docs.openclaw.ai ### Fixes - Agents/sandbox: honor `tools.sandbox.tools.alsoAllow`, let explicit sandbox re-allows remove matching built-in default-deny tools, and keep sandbox explain/error guidance aligned with the effective sandbox tool policy. (#54492) Thanks @ngutman. +- Feishu: close WebSocket connections on monitor stop/abort so ghost connections no longer persist, preventing duplicate event processing and resource leaks across restart cycles. (#52844) Thanks @schumilin. ## 2026.3.24-beta.2 diff --git a/extensions/feishu/src/monitor.cleanup.test.ts b/extensions/feishu/src/monitor.cleanup.test.ts new file mode 100644 index 00000000000..b74417f0dd5 --- /dev/null +++ b/extensions/feishu/src/monitor.cleanup.test.ts @@ -0,0 +1,122 @@ +import { afterEach, describe, expect, it, vi } from "vitest"; +import { botNames, botOpenIds, stopFeishuMonitorState, wsClients } from "./monitor.state.js"; +import type { ResolvedFeishuAccount } from "./types.js"; + +const createFeishuWSClientMock = vi.hoisted(() => vi.fn()); + +vi.mock("./client.js", () => ({ + createFeishuWSClient: createFeishuWSClientMock, +})); + +import { monitorWebSocket } from "./monitor.transport.js"; + +type MockWsClient = { + start: ReturnType; + close: ReturnType; +}; + +function createAccount(accountId: string): ResolvedFeishuAccount { + return { + accountId, + enabled: true, + configured: true, + appId: `cli_${accountId}`, + appSecret: `secret_${accountId}`, // pragma: allowlist secret + domain: "feishu", + config: { + enabled: true, + connectionMode: "websocket", + }, + } as ResolvedFeishuAccount; +} + +function createWsClient(): MockWsClient { + return { + start: vi.fn(), + close: vi.fn(), + }; +} + +afterEach(() => { + stopFeishuMonitorState(); + vi.clearAllMocks(); +}); + +describe("feishu websocket cleanup", () => { + it("closes the websocket client when the monitor aborts", async () => { + const wsClient = createWsClient(); + createFeishuWSClientMock.mockReturnValue(wsClient); + + const abortController = new AbortController(); + const accountId = "alpha"; + + botOpenIds.set(accountId, "ou_alpha"); + botNames.set(accountId, "Alpha"); + + const monitorPromise = monitorWebSocket({ + account: createAccount(accountId), + accountId, + runtime: { + log: vi.fn(), + error: vi.fn(), + exit: vi.fn(), + }, + abortSignal: abortController.signal, + eventDispatcher: {} as never, + }); + + expect(wsClient.start).toHaveBeenCalledTimes(1); + expect(wsClients.get(accountId)).toBe(wsClient); + + abortController.abort(); + await monitorPromise; + + expect(wsClient.close).toHaveBeenCalledTimes(1); + expect(wsClients.has(accountId)).toBe(false); + expect(botOpenIds.has(accountId)).toBe(false); + expect(botNames.has(accountId)).toBe(false); + }); + + it("closes targeted websocket clients during stop cleanup", () => { + const alphaClient = createWsClient(); + const betaClient = createWsClient(); + + wsClients.set("alpha", alphaClient as never); + wsClients.set("beta", betaClient as never); + botOpenIds.set("alpha", "ou_alpha"); + botOpenIds.set("beta", "ou_beta"); + botNames.set("alpha", "Alpha"); + botNames.set("beta", "Beta"); + + stopFeishuMonitorState("alpha"); + + expect(alphaClient.close).toHaveBeenCalledTimes(1); + expect(betaClient.close).not.toHaveBeenCalled(); + expect(wsClients.has("alpha")).toBe(false); + expect(wsClients.has("beta")).toBe(true); + expect(botOpenIds.has("alpha")).toBe(false); + expect(botOpenIds.has("beta")).toBe(true); + expect(botNames.has("alpha")).toBe(false); + expect(botNames.has("beta")).toBe(true); + }); + + it("closes all websocket clients during global stop cleanup", () => { + const alphaClient = createWsClient(); + const betaClient = createWsClient(); + + wsClients.set("alpha", alphaClient as never); + wsClients.set("beta", betaClient as never); + botOpenIds.set("alpha", "ou_alpha"); + botOpenIds.set("beta", "ou_beta"); + botNames.set("alpha", "Alpha"); + botNames.set("beta", "Beta"); + + stopFeishuMonitorState(); + + expect(alphaClient.close).toHaveBeenCalledTimes(1); + expect(betaClient.close).toHaveBeenCalledTimes(1); + expect(wsClients.size).toBe(0); + expect(botOpenIds.size).toBe(0); + expect(botNames.size).toBe(0); + }); +}); diff --git a/extensions/feishu/src/monitor.state.ts b/extensions/feishu/src/monitor.state.ts index 3100c1da839..4e55109e66a 100644 --- a/extensions/feishu/src/monitor.state.ts +++ b/extensions/feishu/src/monitor.state.ts @@ -104,6 +104,15 @@ const feishuWebhookAnomalyTracker = createWebhookAnomalyTracker({ logEvery: feishuWebhookAnomalyDefaults.logEvery, }); +function closeWsClient(client: Lark.WSClient | undefined): void { + if (!client) return; + try { + client.close(); + } catch { + /* Best-effort cleanup */ + } +} + export function clearFeishuWebhookRateLimitStateForTest(): void { feishuWebhookRateLimiter.clear(); feishuWebhookAnomalyTracker.clear(); @@ -134,6 +143,7 @@ export function recordWebhookStatus( export function stopFeishuMonitorState(accountId?: string): void { if (accountId) { + closeWsClient(wsClients.get(accountId)); wsClients.delete(accountId); const server = httpServers.get(accountId); if (server) { @@ -145,6 +155,9 @@ export function stopFeishuMonitorState(accountId?: string): void { return; } + for (const client of wsClients.values()) { + closeWsClient(client); + } wsClients.clear(); for (const server of httpServers.values()) { server.close(); diff --git a/extensions/feishu/src/monitor.test-mocks.ts b/extensions/feishu/src/monitor.test-mocks.ts index 276d6375464..691337f1f92 100644 --- a/extensions/feishu/src/monitor.test-mocks.ts +++ b/extensions/feishu/src/monitor.test-mocks.ts @@ -1,11 +1,11 @@ import { vi } from "vitest"; export function createFeishuClientMockModule(): { - createFeishuWSClient: () => { start: () => void }; + createFeishuWSClient: () => { start: () => void; close: () => void }; createEventDispatcher: () => { register: () => void }; } { return { - createFeishuWSClient: vi.fn(() => ({ start: vi.fn() })), + createFeishuWSClient: vi.fn(() => ({ start: vi.fn(), close: vi.fn() })), createEventDispatcher: vi.fn(() => ({ register: vi.fn() })), }; } diff --git a/extensions/feishu/src/monitor.transport.ts b/extensions/feishu/src/monitor.transport.ts index 06ffc2c17e9..4ff870f6acb 100644 --- a/extensions/feishu/src/monitor.transport.ts +++ b/extensions/feishu/src/monitor.transport.ts @@ -89,23 +89,35 @@ export async function monitorWebSocket({ eventDispatcher, }: MonitorTransportParams): Promise { const log = runtime?.log ?? console.log; + const error = runtime?.error ?? console.error; log(`feishu[${accountId}]: starting WebSocket connection...`); const wsClient = createFeishuWSClient(account); wsClients.set(accountId, wsClient); return new Promise((resolve, reject) => { + let cleanedUp = false; + const cleanup = () => { - wsClients.delete(accountId); - botOpenIds.delete(accountId); - botNames.delete(accountId); + if (cleanedUp) return; + cleanedUp = true; + abortSignal?.removeEventListener("abort", handleAbort); + try { + wsClient.close(); + } catch (err) { + error(`feishu[${accountId}]: error closing WebSocket client: ${String(err)}`); + } finally { + wsClients.delete(accountId); + botOpenIds.delete(accountId); + botNames.delete(accountId); + } }; - const handleAbort = () => { + function handleAbort() { log(`feishu[${accountId}]: abort signal received, stopping`); cleanup(); resolve(); - }; + } if (abortSignal?.aborted) { cleanup(); @@ -120,7 +132,6 @@ export async function monitorWebSocket({ log(`feishu[${accountId}]: WebSocket client started`); } catch (err) { cleanup(); - abortSignal?.removeEventListener("abort", handleAbort); reject(err); } });