mirror of https://github.com/openclaw/openclaw.git
fix(feishu): close WebSocket connections on monitor stop (#52844)
* fix(feishu): close WebSocket connections on monitor stop/abort Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * test(feishu): add WebSocket cleanup tests Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix(feishu): close WebSocket connections on monitor stop (#52844) (thanks @schumilin) --------- Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com> Co-authored-by: George Zhang <georgezhangtj97@gmail.com>
This commit is contained in:
parent
edb5123f26
commit
bd4237c16c
|
|
@ -11,6 +11,7 @@ Docs: https://docs.openclaw.ai
|
|||
### Fixes
|
||||
|
||||
- Agents/sandbox: honor `tools.sandbox.tools.alsoAllow`, let explicit sandbox re-allows remove matching built-in default-deny tools, and keep sandbox explain/error guidance aligned with the effective sandbox tool policy. (#54492) Thanks @ngutman.
|
||||
- Feishu: close WebSocket connections on monitor stop/abort so ghost connections no longer persist, preventing duplicate event processing and resource leaks across restart cycles. (#52844) Thanks @schumilin.
|
||||
|
||||
## 2026.3.24-beta.2
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,122 @@
|
|||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
import { botNames, botOpenIds, stopFeishuMonitorState, wsClients } from "./monitor.state.js";
|
||||
import type { ResolvedFeishuAccount } from "./types.js";
|
||||
|
||||
const createFeishuWSClientMock = vi.hoisted(() => vi.fn());
|
||||
|
||||
vi.mock("./client.js", () => ({
|
||||
createFeishuWSClient: createFeishuWSClientMock,
|
||||
}));
|
||||
|
||||
import { monitorWebSocket } from "./monitor.transport.js";
|
||||
|
||||
type MockWsClient = {
|
||||
start: ReturnType<typeof vi.fn>;
|
||||
close: ReturnType<typeof vi.fn>;
|
||||
};
|
||||
|
||||
function createAccount(accountId: string): ResolvedFeishuAccount {
|
||||
return {
|
||||
accountId,
|
||||
enabled: true,
|
||||
configured: true,
|
||||
appId: `cli_${accountId}`,
|
||||
appSecret: `secret_${accountId}`, // pragma: allowlist secret
|
||||
domain: "feishu",
|
||||
config: {
|
||||
enabled: true,
|
||||
connectionMode: "websocket",
|
||||
},
|
||||
} as ResolvedFeishuAccount;
|
||||
}
|
||||
|
||||
function createWsClient(): MockWsClient {
|
||||
return {
|
||||
start: vi.fn(),
|
||||
close: vi.fn(),
|
||||
};
|
||||
}
|
||||
|
||||
afterEach(() => {
|
||||
stopFeishuMonitorState();
|
||||
vi.clearAllMocks();
|
||||
});
|
||||
|
||||
describe("feishu websocket cleanup", () => {
|
||||
it("closes the websocket client when the monitor aborts", async () => {
|
||||
const wsClient = createWsClient();
|
||||
createFeishuWSClientMock.mockReturnValue(wsClient);
|
||||
|
||||
const abortController = new AbortController();
|
||||
const accountId = "alpha";
|
||||
|
||||
botOpenIds.set(accountId, "ou_alpha");
|
||||
botNames.set(accountId, "Alpha");
|
||||
|
||||
const monitorPromise = monitorWebSocket({
|
||||
account: createAccount(accountId),
|
||||
accountId,
|
||||
runtime: {
|
||||
log: vi.fn(),
|
||||
error: vi.fn(),
|
||||
exit: vi.fn(),
|
||||
},
|
||||
abortSignal: abortController.signal,
|
||||
eventDispatcher: {} as never,
|
||||
});
|
||||
|
||||
expect(wsClient.start).toHaveBeenCalledTimes(1);
|
||||
expect(wsClients.get(accountId)).toBe(wsClient);
|
||||
|
||||
abortController.abort();
|
||||
await monitorPromise;
|
||||
|
||||
expect(wsClient.close).toHaveBeenCalledTimes(1);
|
||||
expect(wsClients.has(accountId)).toBe(false);
|
||||
expect(botOpenIds.has(accountId)).toBe(false);
|
||||
expect(botNames.has(accountId)).toBe(false);
|
||||
});
|
||||
|
||||
it("closes targeted websocket clients during stop cleanup", () => {
|
||||
const alphaClient = createWsClient();
|
||||
const betaClient = createWsClient();
|
||||
|
||||
wsClients.set("alpha", alphaClient as never);
|
||||
wsClients.set("beta", betaClient as never);
|
||||
botOpenIds.set("alpha", "ou_alpha");
|
||||
botOpenIds.set("beta", "ou_beta");
|
||||
botNames.set("alpha", "Alpha");
|
||||
botNames.set("beta", "Beta");
|
||||
|
||||
stopFeishuMonitorState("alpha");
|
||||
|
||||
expect(alphaClient.close).toHaveBeenCalledTimes(1);
|
||||
expect(betaClient.close).not.toHaveBeenCalled();
|
||||
expect(wsClients.has("alpha")).toBe(false);
|
||||
expect(wsClients.has("beta")).toBe(true);
|
||||
expect(botOpenIds.has("alpha")).toBe(false);
|
||||
expect(botOpenIds.has("beta")).toBe(true);
|
||||
expect(botNames.has("alpha")).toBe(false);
|
||||
expect(botNames.has("beta")).toBe(true);
|
||||
});
|
||||
|
||||
it("closes all websocket clients during global stop cleanup", () => {
|
||||
const alphaClient = createWsClient();
|
||||
const betaClient = createWsClient();
|
||||
|
||||
wsClients.set("alpha", alphaClient as never);
|
||||
wsClients.set("beta", betaClient as never);
|
||||
botOpenIds.set("alpha", "ou_alpha");
|
||||
botOpenIds.set("beta", "ou_beta");
|
||||
botNames.set("alpha", "Alpha");
|
||||
botNames.set("beta", "Beta");
|
||||
|
||||
stopFeishuMonitorState();
|
||||
|
||||
expect(alphaClient.close).toHaveBeenCalledTimes(1);
|
||||
expect(betaClient.close).toHaveBeenCalledTimes(1);
|
||||
expect(wsClients.size).toBe(0);
|
||||
expect(botOpenIds.size).toBe(0);
|
||||
expect(botNames.size).toBe(0);
|
||||
});
|
||||
});
|
||||
|
|
@ -104,6 +104,15 @@ const feishuWebhookAnomalyTracker = createWebhookAnomalyTracker({
|
|||
logEvery: feishuWebhookAnomalyDefaults.logEvery,
|
||||
});
|
||||
|
||||
function closeWsClient(client: Lark.WSClient | undefined): void {
|
||||
if (!client) return;
|
||||
try {
|
||||
client.close();
|
||||
} catch {
|
||||
/* Best-effort cleanup */
|
||||
}
|
||||
}
|
||||
|
||||
export function clearFeishuWebhookRateLimitStateForTest(): void {
|
||||
feishuWebhookRateLimiter.clear();
|
||||
feishuWebhookAnomalyTracker.clear();
|
||||
|
|
@ -134,6 +143,7 @@ export function recordWebhookStatus(
|
|||
|
||||
export function stopFeishuMonitorState(accountId?: string): void {
|
||||
if (accountId) {
|
||||
closeWsClient(wsClients.get(accountId));
|
||||
wsClients.delete(accountId);
|
||||
const server = httpServers.get(accountId);
|
||||
if (server) {
|
||||
|
|
@ -145,6 +155,9 @@ export function stopFeishuMonitorState(accountId?: string): void {
|
|||
return;
|
||||
}
|
||||
|
||||
for (const client of wsClients.values()) {
|
||||
closeWsClient(client);
|
||||
}
|
||||
wsClients.clear();
|
||||
for (const server of httpServers.values()) {
|
||||
server.close();
|
||||
|
|
|
|||
|
|
@ -1,11 +1,11 @@
|
|||
import { vi } from "vitest";
|
||||
|
||||
export function createFeishuClientMockModule(): {
|
||||
createFeishuWSClient: () => { start: () => void };
|
||||
createFeishuWSClient: () => { start: () => void; close: () => void };
|
||||
createEventDispatcher: () => { register: () => void };
|
||||
} {
|
||||
return {
|
||||
createFeishuWSClient: vi.fn(() => ({ start: vi.fn() })),
|
||||
createFeishuWSClient: vi.fn(() => ({ start: vi.fn(), close: vi.fn() })),
|
||||
createEventDispatcher: vi.fn(() => ({ register: vi.fn() })),
|
||||
};
|
||||
}
|
||||
|
|
|
|||
|
|
@ -89,23 +89,35 @@ export async function monitorWebSocket({
|
|||
eventDispatcher,
|
||||
}: MonitorTransportParams): Promise<void> {
|
||||
const log = runtime?.log ?? console.log;
|
||||
const error = runtime?.error ?? console.error;
|
||||
log(`feishu[${accountId}]: starting WebSocket connection...`);
|
||||
|
||||
const wsClient = createFeishuWSClient(account);
|
||||
wsClients.set(accountId, wsClient);
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
let cleanedUp = false;
|
||||
|
||||
const cleanup = () => {
|
||||
wsClients.delete(accountId);
|
||||
botOpenIds.delete(accountId);
|
||||
botNames.delete(accountId);
|
||||
if (cleanedUp) return;
|
||||
cleanedUp = true;
|
||||
abortSignal?.removeEventListener("abort", handleAbort);
|
||||
try {
|
||||
wsClient.close();
|
||||
} catch (err) {
|
||||
error(`feishu[${accountId}]: error closing WebSocket client: ${String(err)}`);
|
||||
} finally {
|
||||
wsClients.delete(accountId);
|
||||
botOpenIds.delete(accountId);
|
||||
botNames.delete(accountId);
|
||||
}
|
||||
};
|
||||
|
||||
const handleAbort = () => {
|
||||
function handleAbort() {
|
||||
log(`feishu[${accountId}]: abort signal received, stopping`);
|
||||
cleanup();
|
||||
resolve();
|
||||
};
|
||||
}
|
||||
|
||||
if (abortSignal?.aborted) {
|
||||
cleanup();
|
||||
|
|
@ -120,7 +132,6 @@ export async function monitorWebSocket({
|
|||
log(`feishu[${accountId}]: WebSocket client started`);
|
||||
} catch (err) {
|
||||
cleanup();
|
||||
abortSignal?.removeEventListener("abort", handleAbort);
|
||||
reject(err);
|
||||
}
|
||||
});
|
||||
|
|
|
|||
Loading…
Reference in New Issue