From f4a2bbe0c92bc099257bcd93343fe28b9f352a78 Mon Sep 17 00:00:00 2001 From: yunweibang Date: Sat, 14 Mar 2026 11:37:40 +0800 Subject: [PATCH] fix(feishu): add early event-level dedup to prevent duplicate replies (#43762) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix(feishu): add early event-level dedup to prevent duplicate replies Add synchronous in-memory dedup at EventDispatcher handler level using message_id as key with 5-minute TTL and 2000-entry cap. This catches duplicate events immediately when they arrive from the Lark SDK — before the inbound debouncer or processing queue — preventing the race condition where two concurrent dispatches enter the pipeline before either records the messageId in the downstream dedup layer. Fixes the root cause reported in #42687. * fix(feishu): correct inverted dedup condition check() returns false on first call (new key) and true on subsequent calls (duplicate). The previous `!check()` guard was inverted — dropping every first delivery and passing all duplicates. Remove the negation so the guard correctly drops duplicates. * fix(feishu): simplify eventDedup key — drop redundant accountId prefix eventDedup is already scoped per account (one instance per registerEventHandlers call), so the accountId prefix in the cache key is redundant. Use `evt:${messageId}` instead. * fix(feishu): share inbound processing claim dedupe --------- Co-authored-by: Tak Hoffman <781889+Takhoffman@users.noreply.github.com> --- CHANGELOG.md | 1 + extensions/feishu/src/bot.ts | 31 ++++-- extensions/feishu/src/dedup.ts | 103 ++++++++++++++++++ extensions/feishu/src/monitor.account.ts | 37 ++++--- .../feishu/src/monitor.reaction.test.ts | 76 ++++++++++--- src/infra/dedupe.ts | 7 ++ 6 files changed, 210 insertions(+), 45 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index bac756acc1f..71dbee62a2f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -139,6 +139,7 @@ Docs: https://docs.openclaw.ai - Plugins/env-scoped roots: fix plugin discovery/load caches and provenance tracking so same-process `HOME`/`OPENCLAW_HOME` changes no longer reuse stale plugin state or misreport `~/...` plugins as untracked. (#44046) thanks @gumadeiras. - Gateway/session discovery: discover disk-only and retired ACP session stores under custom templated `session.store` roots so ACP reconciliation, session-id/session-label targeting, and run-id fallback keep working after restart. (#44176) thanks @gumadeiras. - Models/OpenRouter native ids: canonicalize native OpenRouter model keys across config writes, runtime lookups, fallback management, and `models list --plain`, and migrate legacy duplicated `openrouter/openrouter/...` config entries forward on write. +- Feishu/event dedupe: keep early duplicate suppression aligned with the shared Feishu message-id contract and release the pre-queue dedupe marker after failed dispatch so retried events can recover instead of being dropped until the short TTL expires. (#43762) Thanks @yunweibang. - Gateway/hooks: bucket hook auth failures by forwarded client IP behind trusted proxies and warn when `hooks.allowedAgentIds` leaves hook routing unrestricted. - Agents/compaction: skip the post-compaction `cache-ttl` marker write when a compaction completed in the same attempt, preventing the next turn from immediately triggering a second tiny compaction. (#28548) thanks @MoerAI. - Native chat/macOS: add `/new`, `/reset`, and `/clear` reset triggers, keep shared main-session aliases aligned, and ignore stale model-selection completions so native chat state stays in sync across reset and fast model changes. (#10898) Thanks @Nachx639. diff --git a/extensions/feishu/src/bot.ts b/extensions/feishu/src/bot.ts index 13a130b3d79..815f935ed94 100644 --- a/extensions/feishu/src/bot.ts +++ b/extensions/feishu/src/bot.ts @@ -15,7 +15,7 @@ import { } from "openclaw/plugin-sdk/feishu"; import { resolveFeishuAccount } from "./accounts.js"; import { createFeishuClient } from "./client.js"; -import { tryRecordMessage, tryRecordMessagePersistent } from "./dedup.js"; +import { finalizeFeishuMessageProcessing, tryRecordMessagePersistent } from "./dedup.js"; import { maybeCreateDynamicAgent } from "./dynamic-agent.js"; import { normalizeFeishuExternalKey } from "./external-keys.js"; import { downloadMessageResourceFeishu } from "./media.js"; @@ -867,8 +867,18 @@ export async function handleFeishuMessage(params: { runtime?: RuntimeEnv; chatHistories?: Map; accountId?: string; + processingClaimHeld?: boolean; }): Promise { - const { cfg, event, botOpenId, botName, runtime, chatHistories, accountId } = params; + const { + cfg, + event, + botOpenId, + botName, + runtime, + chatHistories, + accountId, + processingClaimHeld = false, + } = params; // Resolve account with merged config const account = resolveFeishuAccount({ cfg, accountId }); @@ -877,16 +887,15 @@ export async function handleFeishuMessage(params: { const log = runtime?.log ?? console.log; const error = runtime?.error ?? console.error; - // Dedup: synchronous memory guard prevents concurrent duplicate dispatch - // before the async persistent check completes. const messageId = event.message.message_id; - const memoryDedupeKey = `${account.accountId}:${messageId}`; - if (!tryRecordMessage(memoryDedupeKey)) { - log(`feishu: skipping duplicate message ${messageId} (memory dedup)`); - return; - } - // Persistent dedup survives restarts and reconnects. - if (!(await tryRecordMessagePersistent(messageId, account.accountId, log))) { + if ( + !(await finalizeFeishuMessageProcessing({ + messageId, + namespace: account.accountId, + log, + claimHeld: processingClaimHeld, + })) + ) { log(`feishu: skipping duplicate message ${messageId}`); return; } diff --git a/extensions/feishu/src/dedup.ts b/extensions/feishu/src/dedup.ts index 35f95d5c76b..fc3e9baad65 100644 --- a/extensions/feishu/src/dedup.ts +++ b/extensions/feishu/src/dedup.ts @@ -10,9 +10,15 @@ import { const DEDUP_TTL_MS = 24 * 60 * 60 * 1000; const MEMORY_MAX_SIZE = 1_000; const FILE_MAX_ENTRIES = 10_000; +const EVENT_DEDUP_TTL_MS = 5 * 60 * 1000; +const EVENT_MEMORY_MAX_SIZE = 2_000; type PersistentDedupeData = Record; const memoryDedupe = createDedupeCache({ ttlMs: DEDUP_TTL_MS, maxSize: MEMORY_MAX_SIZE }); +const processingClaims = createDedupeCache({ + ttlMs: EVENT_DEDUP_TTL_MS, + maxSize: EVENT_MEMORY_MAX_SIZE, +}); function resolveStateDirFromEnv(env: NodeJS.ProcessEnv = process.env): string { const stateOverride = env.OPENCLAW_STATE_DIR?.trim() || env.CLAWDBOT_STATE_DIR?.trim(); @@ -37,6 +43,103 @@ const persistentDedupe = createPersistentDedupe({ resolveFilePath: resolveNamespaceFilePath, }); +function resolveEventDedupeKey( + namespace: string, + messageId: string | undefined | null, +): string | null { + const trimmed = messageId?.trim(); + if (!trimmed) { + return null; + } + return `${namespace}:${trimmed}`; +} + +function normalizeMessageId(messageId: string | undefined | null): string | null { + const trimmed = messageId?.trim(); + return trimmed ? trimmed : null; +} + +function resolveMemoryDedupeKey( + namespace: string, + messageId: string | undefined | null, +): string | null { + const trimmed = normalizeMessageId(messageId); + if (!trimmed) { + return null; + } + return `${namespace}:${trimmed}`; +} + +export function tryBeginFeishuMessageProcessing( + messageId: string | undefined | null, + namespace = "global", +): boolean { + return !processingClaims.check(resolveEventDedupeKey(namespace, messageId)); +} + +export function releaseFeishuMessageProcessing( + messageId: string | undefined | null, + namespace = "global", +): void { + processingClaims.delete(resolveEventDedupeKey(namespace, messageId)); +} + +export async function finalizeFeishuMessageProcessing(params: { + messageId: string | undefined | null; + namespace?: string; + log?: (...args: unknown[]) => void; + claimHeld?: boolean; +}): Promise { + const { messageId, namespace = "global", log, claimHeld = false } = params; + const normalizedMessageId = normalizeMessageId(messageId); + const memoryKey = resolveMemoryDedupeKey(namespace, messageId); + if (!memoryKey || !normalizedMessageId) { + return false; + } + if (!claimHeld && !tryBeginFeishuMessageProcessing(normalizedMessageId, namespace)) { + return false; + } + if (!tryRecordMessage(memoryKey)) { + releaseFeishuMessageProcessing(normalizedMessageId, namespace); + return false; + } + if (!(await tryRecordMessagePersistent(normalizedMessageId, namespace, log))) { + releaseFeishuMessageProcessing(normalizedMessageId, namespace); + return false; + } + return true; +} + +export async function recordProcessedFeishuMessage( + messageId: string | undefined | null, + namespace = "global", + log?: (...args: unknown[]) => void, +): Promise { + const normalizedMessageId = normalizeMessageId(messageId); + const memoryKey = resolveMemoryDedupeKey(namespace, messageId); + if (!memoryKey || !normalizedMessageId) { + return false; + } + tryRecordMessage(memoryKey); + return await tryRecordMessagePersistent(normalizedMessageId, namespace, log); +} + +export async function hasProcessedFeishuMessage( + messageId: string | undefined | null, + namespace = "global", + log?: (...args: unknown[]) => void, +): Promise { + const normalizedMessageId = normalizeMessageId(messageId); + const memoryKey = resolveMemoryDedupeKey(namespace, messageId); + if (!memoryKey || !normalizedMessageId) { + return false; + } + if (hasRecordedMessage(memoryKey)) { + return true; + } + return hasRecordedMessagePersistent(normalizedMessageId, namespace, log); +} + /** * Synchronous dedup — memory only. * Kept for backward compatibility; prefer {@link tryRecordMessagePersistent}. diff --git a/extensions/feishu/src/monitor.account.ts b/extensions/feishu/src/monitor.account.ts index f7d40d8e280..3f3cad8ddc3 100644 --- a/extensions/feishu/src/monitor.account.ts +++ b/extensions/feishu/src/monitor.account.ts @@ -12,10 +12,10 @@ import { import { handleFeishuCardAction, type FeishuCardActionEvent } from "./card-action.js"; import { createEventDispatcher } from "./client.js"; import { - hasRecordedMessage, - hasRecordedMessagePersistent, - tryRecordMessage, - tryRecordMessagePersistent, + hasProcessedFeishuMessage, + recordProcessedFeishuMessage, + releaseFeishuMessageProcessing, + tryBeginFeishuMessageProcessing, warmupDedupFromDisk, } from "./dedup.js"; import { isMentionForwardRequest } from "./mention.js"; @@ -264,6 +264,7 @@ function registerEventHandlers( runtime, chatHistories, accountId, + processingClaimHeld: true, }); await enqueue(chatId, task); }; @@ -291,10 +292,8 @@ function registerEventHandlers( return; } for (const messageId of suppressedIds) { - // Keep in-memory dedupe in sync with handleFeishuMessage's keying. - tryRecordMessage(`${accountId}:${messageId}`); try { - await tryRecordMessagePersistent(messageId, accountId, log); + await recordProcessedFeishuMessage(messageId, accountId, log); } catch (err) { error( `feishu[${accountId}]: failed to record merged dedupe id ${messageId}: ${String(err)}`, @@ -303,15 +302,7 @@ function registerEventHandlers( } }; const isMessageAlreadyProcessed = async (entry: FeishuMessageEvent): Promise => { - const messageId = entry.message.message_id?.trim(); - if (!messageId) { - return false; - } - const memoryKey = `${accountId}:${messageId}`; - if (hasRecordedMessage(memoryKey)) { - return true; - } - return hasRecordedMessagePersistent(messageId, accountId, log); + return await hasProcessedFeishuMessage(entry.message.message_id, accountId, log); }; const inboundDebouncer = core.channel.debounce.createInboundDebouncer({ debounceMs: inboundDebounceMs, @@ -384,19 +375,28 @@ function registerEventHandlers( }, }); }, - onError: (err) => { + onError: (err, entries) => { + for (const entry of entries) { + releaseFeishuMessageProcessing(entry.message.message_id, accountId); + } error(`feishu[${accountId}]: inbound debounce flush failed: ${String(err)}`); }, }); eventDispatcher.register({ "im.message.receive_v1": async (data) => { + const event = data as unknown as FeishuMessageEvent; + const messageId = event.message?.message_id?.trim(); + if (!tryBeginFeishuMessageProcessing(messageId, accountId)) { + log(`feishu[${accountId}]: dropping duplicate event for message ${messageId}`); + return; + } const processMessage = async () => { - const event = data as unknown as FeishuMessageEvent; await inboundDebouncer.enqueue(event); }; if (fireAndForget) { void processMessage().catch((err) => { + releaseFeishuMessageProcessing(messageId, accountId); error(`feishu[${accountId}]: error handling message: ${String(err)}`); }); return; @@ -404,6 +404,7 @@ function registerEventHandlers( try { await processMessage(); } catch (err) { + releaseFeishuMessageProcessing(messageId, accountId); error(`feishu[${accountId}]: error handling message: ${String(err)}`); } }, diff --git a/extensions/feishu/src/monitor.reaction.test.ts b/extensions/feishu/src/monitor.reaction.test.ts index 6d3f64a32d0..49da928ea3b 100644 --- a/extensions/feishu/src/monitor.reaction.test.ts +++ b/extensions/feishu/src/monitor.reaction.test.ts @@ -212,10 +212,9 @@ function expectParsedFirstDispatchedEvent(botOpenId = "ou_bot") { } function setDedupPassThroughMocks(): void { - vi.spyOn(dedup, "tryRecordMessage").mockReturnValue(true); - vi.spyOn(dedup, "tryRecordMessagePersistent").mockResolvedValue(true); - vi.spyOn(dedup, "hasRecordedMessage").mockReturnValue(false); - vi.spyOn(dedup, "hasRecordedMessagePersistent").mockResolvedValue(false); + vi.spyOn(dedup, "tryBeginFeishuMessageProcessing").mockReturnValue(true); + vi.spyOn(dedup, "recordProcessedFeishuMessage").mockResolvedValue(true); + vi.spyOn(dedup, "hasProcessedFeishuMessage").mockResolvedValue(false); } function createMention(params: { openId: string; name: string; key?: string }): FeishuMention { @@ -236,8 +235,7 @@ async function enqueueDebouncedMessage( } function setStaleRetryMocks(messageId = "om_old") { - vi.spyOn(dedup, "hasRecordedMessage").mockImplementation((key) => key.endsWith(`:${messageId}`)); - vi.spyOn(dedup, "hasRecordedMessagePersistent").mockImplementation( + vi.spyOn(dedup, "hasProcessedFeishuMessage").mockImplementation( async (currentMessageId) => currentMessageId === messageId, ); } @@ -475,10 +473,9 @@ describe("Feishu inbound debounce regressions", () => { }); it("passes prefetched botName through to handleFeishuMessage", async () => { - vi.spyOn(dedup, "tryRecordMessage").mockReturnValue(true); - vi.spyOn(dedup, "tryRecordMessagePersistent").mockResolvedValue(true); - vi.spyOn(dedup, "hasRecordedMessage").mockReturnValue(false); - vi.spyOn(dedup, "hasRecordedMessagePersistent").mockResolvedValue(false); + vi.spyOn(dedup, "tryBeginFeishuMessageProcessing").mockReturnValue(true); + vi.spyOn(dedup, "recordProcessedFeishuMessage").mockResolvedValue(true); + vi.spyOn(dedup, "hasProcessedFeishuMessage").mockResolvedValue(false); const onMessage = await setupDebounceMonitor({ botName: "OpenClaw Bot" }); await onMessage( @@ -560,8 +557,8 @@ describe("Feishu inbound debounce regressions", () => { }); it("excludes previously processed retries from combined debounce text", async () => { - vi.spyOn(dedup, "tryRecordMessage").mockReturnValue(true); - vi.spyOn(dedup, "tryRecordMessagePersistent").mockResolvedValue(true); + vi.spyOn(dedup, "tryBeginFeishuMessageProcessing").mockReturnValue(true); + vi.spyOn(dedup, "recordProcessedFeishuMessage").mockResolvedValue(true); setStaleRetryMocks(); const onMessage = await setupDebounceMonitor(); @@ -586,8 +583,8 @@ describe("Feishu inbound debounce regressions", () => { }); it("uses latest fresh message id when debounce batch ends with stale retry", async () => { - const recordSpy = vi.spyOn(dedup, "tryRecordMessage").mockReturnValue(true); - vi.spyOn(dedup, "tryRecordMessagePersistent").mockResolvedValue(true); + vi.spyOn(dedup, "tryBeginFeishuMessageProcessing").mockReturnValue(true); + const recordSpy = vi.spyOn(dedup, "recordProcessedFeishuMessage").mockResolvedValue(true); setStaleRetryMocks(); const onMessage = await setupDebounceMonitor(); @@ -603,7 +600,54 @@ describe("Feishu inbound debounce regressions", () => { expect(dispatched.message.message_id).toBe("om_new"); const combined = JSON.parse(dispatched.message.content) as { text?: string }; expect(combined.text).toBe("fresh"); - expect(recordSpy).toHaveBeenCalledWith("default:om_old"); - expect(recordSpy).not.toHaveBeenCalledWith("default:om_new"); + expect(recordSpy).toHaveBeenCalledWith("om_old", "default", expect.any(Function)); + expect(recordSpy).not.toHaveBeenCalledWith("om_new", "default", expect.any(Function)); + }); + + it("releases early event dedupe when debounced dispatch fails", async () => { + setDedupPassThroughMocks(); + const enqueueMock = vi.fn(); + setFeishuRuntime( + createPluginRuntimeMock({ + channel: { + debounce: { + createInboundDebouncer: (params: { + onError?: (err: unknown, items: T[]) => void; + }) => ({ + enqueue: async (item: T) => { + enqueueMock(item); + params.onError?.(new Error("dispatch failed"), [item]); + }, + flushKey: async () => {}, + }), + resolveInboundDebounceMs, + }, + text: { + hasControlCommand, + }, + }, + }), + ); + const onMessage = await setupDebounceMonitor(); + const event = createTextEvent({ messageId: "om_retryable", text: "hello" }); + + await enqueueDebouncedMessage(onMessage, event); + expect(enqueueMock).toHaveBeenCalledTimes(1); + + await enqueueDebouncedMessage(onMessage, event); + expect(enqueueMock).toHaveBeenCalledTimes(2); + expect(handleFeishuMessageMock).not.toHaveBeenCalled(); + }); + + it("drops duplicate inbound events before they re-enter the debounce pipeline", async () => { + const onMessage = await setupDebounceMonitor(); + const event = createTextEvent({ messageId: "om_duplicate", text: "hello" }); + + await enqueueDebouncedMessage(onMessage, event); + await vi.advanceTimersByTimeAsync(25); + await enqueueDebouncedMessage(onMessage, event); + await vi.advanceTimersByTimeAsync(25); + + expect(handleFeishuMessageMock).toHaveBeenCalledTimes(1); }); }); diff --git a/src/infra/dedupe.ts b/src/infra/dedupe.ts index 2103d74c19c..0e609836542 100644 --- a/src/infra/dedupe.ts +++ b/src/infra/dedupe.ts @@ -3,6 +3,7 @@ import { pruneMapToMaxSize } from "./map-size.js"; export type DedupeCache = { check: (key: string | undefined | null, now?: number) => boolean; peek: (key: string | undefined | null, now?: number) => boolean; + delete: (key: string | undefined | null) => void; clear: () => void; size: () => number; }; @@ -71,6 +72,12 @@ export function createDedupeCache(options: DedupeCacheOptions): DedupeCache { } return hasUnexpired(key, now, false); }, + delete: (key) => { + if (!key) { + return; + } + cache.delete(key); + }, clear: () => { cache.clear(); },