mirror of https://github.com/openclaw/openclaw.git
628 lines
18 KiB
TypeScript
628 lines
18 KiB
TypeScript
import * as crypto from "crypto";
|
|
import * as http from "http";
|
|
import * as Lark from "@larksuiteoapi/node-sdk";
|
|
import {
|
|
type ClawdbotConfig,
|
|
type RuntimeEnv,
|
|
type HistoryEntry,
|
|
installRequestBodyLimitGuard,
|
|
} from "openclaw/plugin-sdk";
|
|
import { resolveFeishuAccount, listEnabledFeishuAccounts } from "./accounts.js";
|
|
import { handleFeishuMessage, type FeishuMessageEvent, type FeishuBotAddedEvent } from "./bot.js";
|
|
import { handleFeishuCardAction, type FeishuCardActionEvent } from "./card-action.js";
|
|
import { createFeishuWSClient, createEventDispatcher } from "./client.js";
|
|
import { probeFeishu } from "./probe.js";
|
|
import { getMessageFeishu } from "./send.js";
|
|
import type { ResolvedFeishuAccount } from "./types.js";
|
|
|
|
export type MonitorFeishuOpts = {
|
|
config?: ClawdbotConfig;
|
|
runtime?: RuntimeEnv;
|
|
abortSignal?: AbortSignal;
|
|
accountId?: string;
|
|
};
|
|
|
|
// Per-account WebSocket clients, HTTP servers, and bot info
|
|
const wsClients = new Map<string, Lark.WSClient>();
|
|
const httpServers = new Map<string, http.Server>();
|
|
const botOpenIds = new Map<string, string>();
|
|
const FEISHU_WEBHOOK_MAX_BODY_BYTES = 1024 * 1024;
|
|
const FEISHU_WEBHOOK_BODY_TIMEOUT_MS = 30_000;
|
|
const FEISHU_WEBHOOK_RATE_LIMIT_WINDOW_MS = 60_000;
|
|
const FEISHU_WEBHOOK_RATE_LIMIT_MAX_REQUESTS = 120;
|
|
const FEISHU_WEBHOOK_RATE_LIMIT_MAX_TRACKED_KEYS = 4_096;
|
|
const FEISHU_WEBHOOK_COUNTER_LOG_EVERY = 25;
|
|
const FEISHU_REACTION_VERIFY_TIMEOUT_MS = 1_500;
|
|
|
|
export type FeishuReactionCreatedEvent = {
|
|
message_id: string;
|
|
chat_id?: string;
|
|
chat_type?: "p2p" | "group";
|
|
reaction_type?: { emoji_type?: string };
|
|
operator_type?: string;
|
|
user_id?: { open_id?: string };
|
|
action_time?: string;
|
|
};
|
|
|
|
type ResolveReactionSyntheticEventParams = {
|
|
cfg: ClawdbotConfig;
|
|
accountId: string;
|
|
event: FeishuReactionCreatedEvent;
|
|
botOpenId?: string;
|
|
fetchMessage?: typeof getMessageFeishu;
|
|
verificationTimeoutMs?: number;
|
|
logger?: (message: string) => void;
|
|
uuid?: () => string;
|
|
};
|
|
|
|
const feishuWebhookRateLimits = new Map<string, { count: number; windowStartMs: number }>();
|
|
const feishuWebhookStatusCounters = new Map<string, number>();
|
|
let lastWebhookRateLimitCleanupMs = 0;
|
|
|
|
function isJsonContentType(value: string | string[] | undefined): boolean {
|
|
const first = Array.isArray(value) ? value[0] : value;
|
|
if (!first) {
|
|
return false;
|
|
}
|
|
const mediaType = first.split(";", 1)[0]?.trim().toLowerCase();
|
|
return mediaType === "application/json" || Boolean(mediaType?.endsWith("+json"));
|
|
}
|
|
|
|
function trimWebhookRateLimitState(): void {
|
|
while (feishuWebhookRateLimits.size > FEISHU_WEBHOOK_RATE_LIMIT_MAX_TRACKED_KEYS) {
|
|
const oldestKey = feishuWebhookRateLimits.keys().next().value;
|
|
if (typeof oldestKey !== "string") {
|
|
break;
|
|
}
|
|
feishuWebhookRateLimits.delete(oldestKey);
|
|
}
|
|
}
|
|
|
|
function maybePruneWebhookRateLimitState(nowMs: number): void {
|
|
if (
|
|
feishuWebhookRateLimits.size === 0 ||
|
|
nowMs - lastWebhookRateLimitCleanupMs < FEISHU_WEBHOOK_RATE_LIMIT_WINDOW_MS
|
|
) {
|
|
return;
|
|
}
|
|
lastWebhookRateLimitCleanupMs = nowMs;
|
|
for (const [key, state] of feishuWebhookRateLimits) {
|
|
if (nowMs - state.windowStartMs >= FEISHU_WEBHOOK_RATE_LIMIT_WINDOW_MS) {
|
|
feishuWebhookRateLimits.delete(key);
|
|
}
|
|
}
|
|
}
|
|
|
|
export function clearFeishuWebhookRateLimitStateForTest(): void {
|
|
feishuWebhookRateLimits.clear();
|
|
lastWebhookRateLimitCleanupMs = 0;
|
|
}
|
|
|
|
export function getFeishuWebhookRateLimitStateSizeForTest(): number {
|
|
return feishuWebhookRateLimits.size;
|
|
}
|
|
|
|
export function isWebhookRateLimitedForTest(key: string, nowMs: number): boolean {
|
|
maybePruneWebhookRateLimitState(nowMs);
|
|
|
|
const state = feishuWebhookRateLimits.get(key);
|
|
if (!state || nowMs - state.windowStartMs >= FEISHU_WEBHOOK_RATE_LIMIT_WINDOW_MS) {
|
|
feishuWebhookRateLimits.set(key, { count: 1, windowStartMs: nowMs });
|
|
trimWebhookRateLimitState();
|
|
return false;
|
|
}
|
|
|
|
state.count += 1;
|
|
if (state.count > FEISHU_WEBHOOK_RATE_LIMIT_MAX_REQUESTS) {
|
|
return true;
|
|
}
|
|
return false;
|
|
}
|
|
|
|
function isWebhookRateLimited(key: string, nowMs: number): boolean {
|
|
return isWebhookRateLimitedForTest(key, nowMs);
|
|
}
|
|
|
|
function recordWebhookStatus(
|
|
runtime: RuntimeEnv | undefined,
|
|
accountId: string,
|
|
path: string,
|
|
statusCode: number,
|
|
): void {
|
|
if (![400, 401, 408, 413, 415, 429].includes(statusCode)) {
|
|
return;
|
|
}
|
|
const key = `${accountId}:${path}:${statusCode}`;
|
|
const next = (feishuWebhookStatusCounters.get(key) ?? 0) + 1;
|
|
feishuWebhookStatusCounters.set(key, next);
|
|
if (next === 1 || next % FEISHU_WEBHOOK_COUNTER_LOG_EVERY === 0) {
|
|
const log = runtime?.log ?? console.log;
|
|
log(`feishu[${accountId}]: webhook anomaly path=${path} status=${statusCode} count=${next}`);
|
|
}
|
|
}
|
|
|
|
async function withTimeout<T>(promise: Promise<T>, timeoutMs: number): Promise<T | null> {
|
|
let timeoutId: NodeJS.Timeout | undefined;
|
|
try {
|
|
return await Promise.race<T | null>([
|
|
promise,
|
|
new Promise<null>((resolve) => {
|
|
timeoutId = setTimeout(() => resolve(null), timeoutMs);
|
|
}),
|
|
]);
|
|
} finally {
|
|
if (timeoutId) {
|
|
clearTimeout(timeoutId);
|
|
}
|
|
}
|
|
}
|
|
|
|
export async function resolveReactionSyntheticEvent(
|
|
params: ResolveReactionSyntheticEventParams,
|
|
): Promise<FeishuMessageEvent | null> {
|
|
const {
|
|
cfg,
|
|
accountId,
|
|
event,
|
|
botOpenId,
|
|
fetchMessage = getMessageFeishu,
|
|
verificationTimeoutMs = FEISHU_REACTION_VERIFY_TIMEOUT_MS,
|
|
logger,
|
|
uuid = () => crypto.randomUUID(),
|
|
} = params;
|
|
|
|
const emoji = event.reaction_type?.emoji_type;
|
|
const messageId = event.message_id;
|
|
const senderId = event.user_id?.open_id;
|
|
if (!emoji || !messageId || !senderId) {
|
|
return null;
|
|
}
|
|
|
|
const account = resolveFeishuAccount({ cfg, accountId });
|
|
const reactionNotifications = account.config.reactionNotifications ?? "own";
|
|
if (reactionNotifications === "off") {
|
|
return null;
|
|
}
|
|
|
|
// Skip bot self-reactions
|
|
if (event.operator_type === "app" || senderId === botOpenId) {
|
|
return null;
|
|
}
|
|
|
|
// Skip typing indicator emoji
|
|
if (emoji === "Typing") {
|
|
return null;
|
|
}
|
|
|
|
if (reactionNotifications === "own" && !botOpenId) {
|
|
logger?.(
|
|
`feishu[${accountId}]: bot open_id unavailable, skipping reaction ${emoji} on ${messageId}`,
|
|
);
|
|
return null;
|
|
}
|
|
|
|
const reactedMsg = await withTimeout(
|
|
fetchMessage({ cfg, messageId, accountId }),
|
|
verificationTimeoutMs,
|
|
).catch(() => null);
|
|
const isBotMessage = reactedMsg?.senderType === "app" || reactedMsg?.senderOpenId === botOpenId;
|
|
if (!reactedMsg || (reactionNotifications === "own" && !isBotMessage)) {
|
|
logger?.(
|
|
`feishu[${accountId}]: ignoring reaction on non-bot/unverified message ${messageId} ` +
|
|
`(sender: ${reactedMsg?.senderOpenId ?? "unknown"})`,
|
|
);
|
|
return null;
|
|
}
|
|
|
|
const syntheticChatIdRaw = event.chat_id ?? reactedMsg.chatId;
|
|
const syntheticChatId = syntheticChatIdRaw?.trim() ? syntheticChatIdRaw : `p2p:${senderId}`;
|
|
const syntheticChatType: "p2p" | "group" = event.chat_type ?? "p2p";
|
|
return {
|
|
sender: {
|
|
sender_id: { open_id: senderId },
|
|
sender_type: "user",
|
|
},
|
|
message: {
|
|
message_id: `${messageId}:reaction:${emoji}:${uuid()}`,
|
|
chat_id: syntheticChatId,
|
|
chat_type: syntheticChatType,
|
|
message_type: "text",
|
|
content: JSON.stringify({
|
|
text: `[reacted with ${emoji} to message ${messageId}]`,
|
|
}),
|
|
},
|
|
};
|
|
}
|
|
|
|
async function fetchBotOpenId(account: ResolvedFeishuAccount): Promise<string | undefined> {
|
|
try {
|
|
const result = await probeFeishu(account);
|
|
return result.ok ? result.botOpenId : undefined;
|
|
} catch {
|
|
return undefined;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Register common event handlers on an EventDispatcher.
|
|
* When fireAndForget is true (webhook mode), message handling is not awaited
|
|
* to avoid blocking the HTTP response (Lark requires <3s response).
|
|
*/
|
|
function registerEventHandlers(
|
|
eventDispatcher: Lark.EventDispatcher,
|
|
context: {
|
|
cfg: ClawdbotConfig;
|
|
accountId: string;
|
|
runtime?: RuntimeEnv;
|
|
chatHistories: Map<string, HistoryEntry[]>;
|
|
fireAndForget?: boolean;
|
|
},
|
|
) {
|
|
const { cfg, accountId, runtime, chatHistories, fireAndForget } = context;
|
|
const log = runtime?.log ?? console.log;
|
|
const error = runtime?.error ?? console.error;
|
|
|
|
eventDispatcher.register({
|
|
"im.message.receive_v1": async (data) => {
|
|
try {
|
|
const event = data as unknown as FeishuMessageEvent;
|
|
const promise = handleFeishuMessage({
|
|
cfg,
|
|
event,
|
|
botOpenId: botOpenIds.get(accountId),
|
|
runtime,
|
|
chatHistories,
|
|
accountId,
|
|
});
|
|
if (fireAndForget) {
|
|
promise.catch((err) => {
|
|
error(`feishu[${accountId}]: error handling message: ${String(err)}`);
|
|
});
|
|
} else {
|
|
await promise;
|
|
}
|
|
} catch (err) {
|
|
error(`feishu[${accountId}]: error handling message: ${String(err)}`);
|
|
}
|
|
},
|
|
"im.message.message_read_v1": async () => {
|
|
// Ignore read receipts
|
|
},
|
|
"im.chat.member.bot.added_v1": async (data) => {
|
|
try {
|
|
const event = data as unknown as FeishuBotAddedEvent;
|
|
log(`feishu[${accountId}]: bot added to chat ${event.chat_id}`);
|
|
} catch (err) {
|
|
error(`feishu[${accountId}]: error handling bot added event: ${String(err)}`);
|
|
}
|
|
},
|
|
"im.chat.member.bot.deleted_v1": async (data) => {
|
|
try {
|
|
const event = data as unknown as { chat_id: string };
|
|
log(`feishu[${accountId}]: bot removed from chat ${event.chat_id}`);
|
|
} catch (err) {
|
|
error(`feishu[${accountId}]: error handling bot removed event: ${String(err)}`);
|
|
}
|
|
},
|
|
"im.message.reaction.created_v1": async (data) => {
|
|
const processReaction = async () => {
|
|
const event = data as FeishuReactionCreatedEvent;
|
|
const myBotId = botOpenIds.get(accountId);
|
|
const syntheticEvent = await resolveReactionSyntheticEvent({
|
|
cfg,
|
|
accountId,
|
|
event,
|
|
botOpenId: myBotId,
|
|
logger: log,
|
|
});
|
|
if (!syntheticEvent) {
|
|
return;
|
|
}
|
|
const promise = handleFeishuMessage({
|
|
cfg,
|
|
event: syntheticEvent,
|
|
botOpenId: myBotId,
|
|
runtime,
|
|
chatHistories,
|
|
accountId,
|
|
});
|
|
if (fireAndForget) {
|
|
promise.catch((err) => {
|
|
error(`feishu[${accountId}]: error handling reaction: ${String(err)}`);
|
|
});
|
|
return;
|
|
}
|
|
await promise;
|
|
};
|
|
|
|
if (fireAndForget) {
|
|
void processReaction().catch((err) => {
|
|
error(`feishu[${accountId}]: error handling reaction event: ${String(err)}`);
|
|
});
|
|
return;
|
|
}
|
|
|
|
try {
|
|
await processReaction();
|
|
} catch (err) {
|
|
error(`feishu[${accountId}]: error handling reaction event: ${String(err)}`);
|
|
}
|
|
},
|
|
"im.message.reaction.deleted_v1": async () => {
|
|
// Ignore reaction removals
|
|
},
|
|
"card.action.trigger": async (data: unknown) => {
|
|
try {
|
|
const event = data as unknown as FeishuCardActionEvent;
|
|
const promise = handleFeishuCardAction({
|
|
cfg,
|
|
event,
|
|
botOpenId: botOpenIds.get(accountId),
|
|
runtime,
|
|
accountId,
|
|
});
|
|
if (fireAndForget) {
|
|
promise.catch((err) => {
|
|
error(`feishu[${accountId}]: error handling card action: ${String(err)}`);
|
|
});
|
|
} else {
|
|
await promise;
|
|
}
|
|
} catch (err) {
|
|
error(`feishu[${accountId}]: error handling card action: ${String(err)}`);
|
|
}
|
|
},
|
|
});
|
|
}
|
|
|
|
type MonitorAccountParams = {
|
|
cfg: ClawdbotConfig;
|
|
account: ResolvedFeishuAccount;
|
|
runtime?: RuntimeEnv;
|
|
abortSignal?: AbortSignal;
|
|
};
|
|
|
|
/**
|
|
* Monitor a single Feishu account.
|
|
*/
|
|
async function monitorSingleAccount(params: MonitorAccountParams): Promise<void> {
|
|
const { cfg, account, runtime, abortSignal } = params;
|
|
const { accountId } = account;
|
|
const log = runtime?.log ?? console.log;
|
|
|
|
// Fetch bot open_id
|
|
const botOpenId = await fetchBotOpenId(account);
|
|
botOpenIds.set(accountId, botOpenId ?? "");
|
|
log(`feishu[${accountId}]: bot open_id resolved: ${botOpenId ?? "unknown"}`);
|
|
|
|
const connectionMode = account.config.connectionMode ?? "websocket";
|
|
if (connectionMode === "webhook" && !account.verificationToken?.trim()) {
|
|
throw new Error(`Feishu account "${accountId}" webhook mode requires verificationToken`);
|
|
}
|
|
const eventDispatcher = createEventDispatcher(account);
|
|
const chatHistories = new Map<string, HistoryEntry[]>();
|
|
|
|
registerEventHandlers(eventDispatcher, {
|
|
cfg,
|
|
accountId,
|
|
runtime,
|
|
chatHistories,
|
|
fireAndForget: connectionMode === "webhook",
|
|
});
|
|
|
|
if (connectionMode === "webhook") {
|
|
return monitorWebhook({ params, accountId, eventDispatcher });
|
|
}
|
|
|
|
return monitorWebSocket({ params, accountId, eventDispatcher });
|
|
}
|
|
|
|
type ConnectionParams = {
|
|
params: MonitorAccountParams;
|
|
accountId: string;
|
|
eventDispatcher: Lark.EventDispatcher;
|
|
};
|
|
|
|
async function monitorWebSocket({
|
|
params,
|
|
accountId,
|
|
eventDispatcher,
|
|
}: ConnectionParams): Promise<void> {
|
|
const { account, runtime, abortSignal } = params;
|
|
const log = runtime?.log ?? console.log;
|
|
const error = runtime?.error ?? console.error;
|
|
|
|
log(`feishu[${accountId}]: starting WebSocket connection...`);
|
|
|
|
const wsClient = createFeishuWSClient(account);
|
|
wsClients.set(accountId, wsClient);
|
|
|
|
return new Promise((resolve, reject) => {
|
|
const cleanup = () => {
|
|
wsClients.delete(accountId);
|
|
botOpenIds.delete(accountId);
|
|
};
|
|
|
|
const handleAbort = () => {
|
|
log(`feishu[${accountId}]: abort signal received, stopping`);
|
|
cleanup();
|
|
resolve();
|
|
};
|
|
|
|
if (abortSignal?.aborted) {
|
|
cleanup();
|
|
resolve();
|
|
return;
|
|
}
|
|
|
|
abortSignal?.addEventListener("abort", handleAbort, { once: true });
|
|
|
|
try {
|
|
wsClient.start({ eventDispatcher });
|
|
log(`feishu[${accountId}]: WebSocket client started`);
|
|
} catch (err) {
|
|
cleanup();
|
|
abortSignal?.removeEventListener("abort", handleAbort);
|
|
reject(err);
|
|
}
|
|
});
|
|
}
|
|
|
|
async function monitorWebhook({
|
|
params,
|
|
accountId,
|
|
eventDispatcher,
|
|
}: ConnectionParams): Promise<void> {
|
|
const { account, runtime, abortSignal } = params;
|
|
const log = runtime?.log ?? console.log;
|
|
const error = runtime?.error ?? console.error;
|
|
|
|
const port = account.config.webhookPort ?? 3000;
|
|
const path = account.config.webhookPath ?? "/feishu/events";
|
|
const host = account.config.webhookHost ?? "127.0.0.1";
|
|
|
|
log(`feishu[${accountId}]: starting Webhook server on ${host}:${port}, path ${path}...`);
|
|
|
|
const server = http.createServer();
|
|
const webhookHandler = Lark.adaptDefault(path, eventDispatcher, { autoChallenge: true });
|
|
server.on("request", (req, res) => {
|
|
res.on("finish", () => {
|
|
recordWebhookStatus(runtime, accountId, path, res.statusCode);
|
|
});
|
|
|
|
const rateLimitKey = `${accountId}:${path}:${req.socket.remoteAddress ?? "unknown"}`;
|
|
if (isWebhookRateLimited(rateLimitKey, Date.now())) {
|
|
res.statusCode = 429;
|
|
res.end("Too Many Requests");
|
|
return;
|
|
}
|
|
|
|
if (req.method === "POST" && !isJsonContentType(req.headers["content-type"])) {
|
|
res.statusCode = 415;
|
|
res.end("Unsupported Media Type");
|
|
return;
|
|
}
|
|
|
|
const guard = installRequestBodyLimitGuard(req, res, {
|
|
maxBytes: FEISHU_WEBHOOK_MAX_BODY_BYTES,
|
|
timeoutMs: FEISHU_WEBHOOK_BODY_TIMEOUT_MS,
|
|
responseFormat: "text",
|
|
});
|
|
if (guard.isTripped()) {
|
|
return;
|
|
}
|
|
void Promise.resolve(webhookHandler(req, res))
|
|
.catch((err) => {
|
|
if (!guard.isTripped()) {
|
|
error(`feishu[${accountId}]: webhook handler error: ${String(err)}`);
|
|
}
|
|
})
|
|
.finally(() => {
|
|
guard.dispose();
|
|
});
|
|
});
|
|
httpServers.set(accountId, server);
|
|
|
|
return new Promise((resolve, reject) => {
|
|
const cleanup = () => {
|
|
server.close();
|
|
httpServers.delete(accountId);
|
|
botOpenIds.delete(accountId);
|
|
};
|
|
|
|
const handleAbort = () => {
|
|
log(`feishu[${accountId}]: abort signal received, stopping Webhook server`);
|
|
cleanup();
|
|
resolve();
|
|
};
|
|
|
|
if (abortSignal?.aborted) {
|
|
cleanup();
|
|
resolve();
|
|
return;
|
|
}
|
|
|
|
abortSignal?.addEventListener("abort", handleAbort, { once: true });
|
|
|
|
server.listen(port, host, () => {
|
|
log(`feishu[${accountId}]: Webhook server listening on ${host}:${port}`);
|
|
});
|
|
|
|
server.on("error", (err) => {
|
|
error(`feishu[${accountId}]: Webhook server error: ${err}`);
|
|
abortSignal?.removeEventListener("abort", handleAbort);
|
|
reject(err);
|
|
});
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Main entry: start monitoring for all enabled accounts.
|
|
*/
|
|
export async function monitorFeishuProvider(opts: MonitorFeishuOpts = {}): Promise<void> {
|
|
const cfg = opts.config;
|
|
if (!cfg) {
|
|
throw new Error("Config is required for Feishu monitor");
|
|
}
|
|
|
|
const log = opts.runtime?.log ?? console.log;
|
|
|
|
// If accountId is specified, only monitor that account
|
|
if (opts.accountId) {
|
|
const account = resolveFeishuAccount({ cfg, accountId: opts.accountId });
|
|
if (!account.enabled || !account.configured) {
|
|
throw new Error(`Feishu account "${opts.accountId}" not configured or disabled`);
|
|
}
|
|
return monitorSingleAccount({
|
|
cfg,
|
|
account,
|
|
runtime: opts.runtime,
|
|
abortSignal: opts.abortSignal,
|
|
});
|
|
}
|
|
|
|
// Otherwise, start all enabled accounts
|
|
const accounts = listEnabledFeishuAccounts(cfg);
|
|
if (accounts.length === 0) {
|
|
throw new Error("No enabled Feishu accounts configured");
|
|
}
|
|
|
|
log(
|
|
`feishu: starting ${accounts.length} account(s): ${accounts.map((a) => a.accountId).join(", ")}`,
|
|
);
|
|
|
|
// Start all accounts in parallel
|
|
await Promise.all(
|
|
accounts.map((account) =>
|
|
monitorSingleAccount({
|
|
cfg,
|
|
account,
|
|
runtime: opts.runtime,
|
|
abortSignal: opts.abortSignal,
|
|
}),
|
|
),
|
|
);
|
|
}
|
|
|
|
/**
|
|
* Stop monitoring for a specific account or all accounts.
|
|
*/
|
|
export function stopFeishuMonitor(accountId?: string): void {
|
|
if (accountId) {
|
|
wsClients.delete(accountId);
|
|
const server = httpServers.get(accountId);
|
|
if (server) {
|
|
server.close();
|
|
httpServers.delete(accountId);
|
|
}
|
|
botOpenIds.delete(accountId);
|
|
} else {
|
|
wsClients.clear();
|
|
for (const server of httpServers.values()) {
|
|
server.close();
|
|
}
|
|
httpServers.clear();
|
|
botOpenIds.clear();
|
|
}
|
|
}
|