fix(feishu): add early event-level dedup to prevent duplicate replies (#43762)

* fix(feishu): add early event-level dedup to prevent duplicate replies

Add a synchronous in-memory dedup at the EventDispatcher handler level,
keyed by message_id, with a 5-minute TTL and a 2000-entry cap.

This catches duplicate events immediately when they arrive from the Lark
SDK — before the inbound debouncer or processing queue — preventing the
race condition where two concurrent dispatches enter the pipeline before
either records the messageId in the downstream dedup layer.

Fixes the root cause reported in #42687.
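
The race closes only if the claim is recorded synchronously, before the first `await`. A minimal sketch of that check-and-set pattern (illustrative names, not the plugin's actual API):

```typescript
// Minimal sketch of a synchronous claim that closes the concurrent-dispatch
// race: the Set is mutated before any await, so a second dispatch of the
// same message_id sees the claim immediately. Names are illustrative.
const claimed = new Set<string>();

function tryClaim(messageId: string): boolean {
  if (claimed.has(messageId)) {
    return false; // duplicate delivery: drop it
  }
  claimed.add(messageId); // recorded synchronously, no await in between
  return true;
}

// Two back-to-back dispatches of the same event: only the first wins.
const results = ["om_123", "om_123"].map((id) => tryClaim(id));
// results is [true, false]
```

Because JavaScript runs handler code to completion between awaits, the `Set` mutation is visible to the second dispatch before either can enter the async pipeline.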

* fix(feishu): correct inverted dedup condition

check() returns false on first call (new key) and true on subsequent
calls (duplicate). The previous `!check()` guard was inverted —
dropping every first delivery and passing all duplicates.

Remove the negation so the guard correctly drops duplicates.
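
For reference, a sketch of the `check()` contract described above, with the corrected (non-negated) guard; the cache here is a bare `Set` stand-in, not the real implementation:

```typescript
// check() contract: returns false the first time a key is seen, true on
// every subsequent call. Stand-in implementation for illustration only.
function createCheck(): (key: string) => boolean {
  const seen = new Set<string>();
  return (key) => {
    if (seen.has(key)) return true; // duplicate
    seen.add(key);
    return false; // first call for this key
  };
}

const check = createCheck();

// Correct guard: drop when check() reports a duplicate.
// The buggy version was `!check(key)`, which inverted both paths.
function shouldDrop(key: string): boolean {
  return check(key);
}

console.log(shouldDrop("evt:om_1")); // first delivery: false (process it)
console.log(shouldDrop("evt:om_1")); // redelivery: true (drop it)
```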

* fix(feishu): simplify eventDedup key — drop redundant accountId prefix

eventDedup is already scoped per account (one instance per
registerEventHandlers call), so the accountId prefix in the cache key
is redundant. Use `evt:${messageId}` instead.

* fix(feishu): share inbound processing claim dedupe

---------

Co-authored-by: Tak Hoffman <781889+Takhoffman@users.noreply.github.com>
Commit: f4a2bbe0c9 (parent: 2659fc6c97)
Author: yunweibang, 2026-03-14 11:37:40 +08:00, committed by GitHub
Signature: GPG Key ID B5690EEEBB952194 (no known key found for this signature in database)
6 changed files with 210 additions and 45 deletions


@@ -139,6 +139,7 @@ Docs: https://docs.openclaw.ai
 - Plugins/env-scoped roots: fix plugin discovery/load caches and provenance tracking so same-process `HOME`/`OPENCLAW_HOME` changes no longer reuse stale plugin state or misreport `~/...` plugins as untracked. (#44046) thanks @gumadeiras.
 - Gateway/session discovery: discover disk-only and retired ACP session stores under custom templated `session.store` roots so ACP reconciliation, session-id/session-label targeting, and run-id fallback keep working after restart. (#44176) thanks @gumadeiras.
 - Models/OpenRouter native ids: canonicalize native OpenRouter model keys across config writes, runtime lookups, fallback management, and `models list --plain`, and migrate legacy duplicated `openrouter/openrouter/...` config entries forward on write.
+- Feishu/event dedupe: keep early duplicate suppression aligned with the shared Feishu message-id contract and release the pre-queue dedupe marker after failed dispatch so retried events can recover instead of being dropped until the short TTL expires. (#43762) Thanks @yunweibang.
 - Gateway/hooks: bucket hook auth failures by forwarded client IP behind trusted proxies and warn when `hooks.allowedAgentIds` leaves hook routing unrestricted.
 - Agents/compaction: skip the post-compaction `cache-ttl` marker write when a compaction completed in the same attempt, preventing the next turn from immediately triggering a second tiny compaction. (#28548) thanks @MoerAI.
 - Native chat/macOS: add `/new`, `/reset`, and `/clear` reset triggers, keep shared main-session aliases aligned, and ignore stale model-selection completions so native chat state stays in sync across reset and fast model changes. (#10898) Thanks @Nachx639.


@@ -15,7 +15,7 @@ import {
 } from "openclaw/plugin-sdk/feishu";
 import { resolveFeishuAccount } from "./accounts.js";
 import { createFeishuClient } from "./client.js";
-import { tryRecordMessage, tryRecordMessagePersistent } from "./dedup.js";
+import { finalizeFeishuMessageProcessing, tryRecordMessagePersistent } from "./dedup.js";
 import { maybeCreateDynamicAgent } from "./dynamic-agent.js";
 import { normalizeFeishuExternalKey } from "./external-keys.js";
 import { downloadMessageResourceFeishu } from "./media.js";
@@ -867,8 +867,18 @@ export async function handleFeishuMessage(params: {
   runtime?: RuntimeEnv;
   chatHistories?: Map<string, HistoryEntry[]>;
   accountId?: string;
+  processingClaimHeld?: boolean;
 }): Promise<void> {
-  const { cfg, event, botOpenId, botName, runtime, chatHistories, accountId } = params;
+  const {
+    cfg,
+    event,
+    botOpenId,
+    botName,
+    runtime,
+    chatHistories,
+    accountId,
+    processingClaimHeld = false,
+  } = params;
   // Resolve account with merged config
   const account = resolveFeishuAccount({ cfg, accountId });

@@ -877,16 +887,15 @@ export async function handleFeishuMessage(params: {
   const log = runtime?.log ?? console.log;
   const error = runtime?.error ?? console.error;
-  // Dedup: synchronous memory guard prevents concurrent duplicate dispatch
-  // before the async persistent check completes.
   const messageId = event.message.message_id;
-  const memoryDedupeKey = `${account.accountId}:${messageId}`;
-  if (!tryRecordMessage(memoryDedupeKey)) {
-    log(`feishu: skipping duplicate message ${messageId} (memory dedup)`);
-    return;
-  }
-  // Persistent dedup survives restarts and reconnects.
-  if (!(await tryRecordMessagePersistent(messageId, account.accountId, log))) {
+  if (
+    !(await finalizeFeishuMessageProcessing({
+      messageId,
+      namespace: account.accountId,
+      log,
+      claimHeld: processingClaimHeld,
+    }))
+  ) {
     log(`feishu: skipping duplicate message ${messageId}`);
     return;
   }


@@ -10,9 +10,15 @@ import {
 const DEDUP_TTL_MS = 24 * 60 * 60 * 1000;
 const MEMORY_MAX_SIZE = 1_000;
 const FILE_MAX_ENTRIES = 10_000;
+const EVENT_DEDUP_TTL_MS = 5 * 60 * 1000;
+const EVENT_MEMORY_MAX_SIZE = 2_000;

 type PersistentDedupeData = Record<string, number>;

 const memoryDedupe = createDedupeCache({ ttlMs: DEDUP_TTL_MS, maxSize: MEMORY_MAX_SIZE });
+const processingClaims = createDedupeCache({
+  ttlMs: EVENT_DEDUP_TTL_MS,
+  maxSize: EVENT_MEMORY_MAX_SIZE,
+});

 function resolveStateDirFromEnv(env: NodeJS.ProcessEnv = process.env): string {
   const stateOverride = env.OPENCLAW_STATE_DIR?.trim() || env.CLAWDBOT_STATE_DIR?.trim();

@@ -37,6 +43,103 @@ const persistentDedupe = createPersistentDedupe({
   resolveFilePath: resolveNamespaceFilePath,
 });

+function resolveEventDedupeKey(
+  namespace: string,
+  messageId: string | undefined | null,
+): string | null {
+  const trimmed = messageId?.trim();
+  if (!trimmed) {
+    return null;
+  }
+  return `${namespace}:${trimmed}`;
+}
+
+function normalizeMessageId(messageId: string | undefined | null): string | null {
+  const trimmed = messageId?.trim();
+  return trimmed ? trimmed : null;
+}
+
+function resolveMemoryDedupeKey(
+  namespace: string,
+  messageId: string | undefined | null,
+): string | null {
+  const trimmed = normalizeMessageId(messageId);
+  if (!trimmed) {
+    return null;
+  }
+  return `${namespace}:${trimmed}`;
+}
+
+export function tryBeginFeishuMessageProcessing(
+  messageId: string | undefined | null,
+  namespace = "global",
+): boolean {
+  return !processingClaims.check(resolveEventDedupeKey(namespace, messageId));
+}
+
+export function releaseFeishuMessageProcessing(
+  messageId: string | undefined | null,
+  namespace = "global",
+): void {
+  processingClaims.delete(resolveEventDedupeKey(namespace, messageId));
+}
+
+export async function finalizeFeishuMessageProcessing(params: {
+  messageId: string | undefined | null;
+  namespace?: string;
+  log?: (...args: unknown[]) => void;
+  claimHeld?: boolean;
+}): Promise<boolean> {
+  const { messageId, namespace = "global", log, claimHeld = false } = params;
+  const normalizedMessageId = normalizeMessageId(messageId);
+  const memoryKey = resolveMemoryDedupeKey(namespace, messageId);
+  if (!memoryKey || !normalizedMessageId) {
+    return false;
+  }
+  if (!claimHeld && !tryBeginFeishuMessageProcessing(normalizedMessageId, namespace)) {
+    return false;
+  }
+  if (!tryRecordMessage(memoryKey)) {
+    releaseFeishuMessageProcessing(normalizedMessageId, namespace);
+    return false;
+  }
+  if (!(await tryRecordMessagePersistent(normalizedMessageId, namespace, log))) {
+    releaseFeishuMessageProcessing(normalizedMessageId, namespace);
+    return false;
+  }
+  return true;
+}
+
+export async function recordProcessedFeishuMessage(
+  messageId: string | undefined | null,
+  namespace = "global",
+  log?: (...args: unknown[]) => void,
+): Promise<boolean> {
+  const normalizedMessageId = normalizeMessageId(messageId);
+  const memoryKey = resolveMemoryDedupeKey(namespace, messageId);
+  if (!memoryKey || !normalizedMessageId) {
+    return false;
+  }
+  tryRecordMessage(memoryKey);
+  return await tryRecordMessagePersistent(normalizedMessageId, namespace, log);
+}
+
+export async function hasProcessedFeishuMessage(
+  messageId: string | undefined | null,
+  namespace = "global",
+  log?: (...args: unknown[]) => void,
+): Promise<boolean> {
+  const normalizedMessageId = normalizeMessageId(messageId);
+  const memoryKey = resolveMemoryDedupeKey(namespace, messageId);
+  if (!memoryKey || !normalizedMessageId) {
+    return false;
+  }
+  if (hasRecordedMessage(memoryKey)) {
+    return true;
+  }
+  return hasRecordedMessagePersistent(normalizedMessageId, namespace, log);
+}
+
 /**
  * Synchronous dedup memory only.
  * Kept for backward compatibility; prefer {@link tryRecordMessagePersistent}.
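
Taken together, the new helpers form a begin/finalize/release lifecycle. A condensed in-memory sketch of that flow (simplified stand-ins; no TTL and no persistent layer):

```typescript
// Condensed sketch of the claim lifecycle: begin() takes a synchronous
// claim, finalize() records the message as processed, and release()
// returns the claim so a failed dispatch can retry. In-memory only;
// the real module also records to a persistent layer.
const claims = new Set<string>();
const processed = new Set<string>();

function begin(key: string): boolean {
  if (claims.has(key)) return false; // another dispatch holds the claim
  claims.add(key);
  return true;
}

function release(key: string): void {
  claims.delete(key); // failed dispatch: let a retry re-claim immediately
}

function finalize(key: string): boolean {
  if (processed.has(key)) {
    release(key); // already handled elsewhere: give the claim back and drop
    return false;
  }
  processed.add(key);
  return true;
}

// Happy path: claim, then finalize.
console.log(begin("default:om_1") && finalize("default:om_1")); // true

// Failure path: releasing a failed claim lets a retry recover
// instead of waiting out the TTL.
begin("default:om_2");
release("default:om_2"); // dispatch failed
console.log(begin("default:om_2")); // true
```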


@@ -12,10 +12,10 @@ import {
 import { handleFeishuCardAction, type FeishuCardActionEvent } from "./card-action.js";
 import { createEventDispatcher } from "./client.js";
 import {
-  hasRecordedMessage,
-  hasRecordedMessagePersistent,
-  tryRecordMessage,
-  tryRecordMessagePersistent,
+  hasProcessedFeishuMessage,
+  recordProcessedFeishuMessage,
+  releaseFeishuMessageProcessing,
+  tryBeginFeishuMessageProcessing,
   warmupDedupFromDisk,
 } from "./dedup.js";
 import { isMentionForwardRequest } from "./mention.js";
@@ -264,6 +264,7 @@ function registerEventHandlers(
       runtime,
       chatHistories,
       accountId,
+      processingClaimHeld: true,
     });
     await enqueue(chatId, task);
   };

@@ -291,10 +292,8 @@ function registerEventHandlers(
       return;
     }
     for (const messageId of suppressedIds) {
-      // Keep in-memory dedupe in sync with handleFeishuMessage's keying.
-      tryRecordMessage(`${accountId}:${messageId}`);
       try {
-        await tryRecordMessagePersistent(messageId, accountId, log);
+        await recordProcessedFeishuMessage(messageId, accountId, log);
       } catch (err) {
         error(
           `feishu[${accountId}]: failed to record merged dedupe id ${messageId}: ${String(err)}`,

@@ -303,15 +302,7 @@ function registerEventHandlers(
     }
   };

   const isMessageAlreadyProcessed = async (entry: FeishuMessageEvent): Promise<boolean> => {
-    const messageId = entry.message.message_id?.trim();
-    if (!messageId) {
-      return false;
-    }
-    const memoryKey = `${accountId}:${messageId}`;
-    if (hasRecordedMessage(memoryKey)) {
-      return true;
-    }
-    return hasRecordedMessagePersistent(messageId, accountId, log);
+    return await hasProcessedFeishuMessage(entry.message.message_id, accountId, log);
   };

   const inboundDebouncer = core.channel.debounce.createInboundDebouncer<FeishuMessageEvent>({
     debounceMs: inboundDebounceMs,
const inboundDebouncer = core.channel.debounce.createInboundDebouncer<FeishuMessageEvent>({ const inboundDebouncer = core.channel.debounce.createInboundDebouncer<FeishuMessageEvent>({
debounceMs: inboundDebounceMs, debounceMs: inboundDebounceMs,
@@ -384,19 +375,28 @@ function registerEventHandlers(
         },
       });
     },
-    onError: (err) => {
+    onError: (err, entries) => {
+      for (const entry of entries) {
+        releaseFeishuMessageProcessing(entry.message.message_id, accountId);
+      }
       error(`feishu[${accountId}]: inbound debounce flush failed: ${String(err)}`);
     },
   });

   eventDispatcher.register({
     "im.message.receive_v1": async (data) => {
+      const event = data as unknown as FeishuMessageEvent;
+      const messageId = event.message?.message_id?.trim();
+      if (!tryBeginFeishuMessageProcessing(messageId, accountId)) {
+        log(`feishu[${accountId}]: dropping duplicate event for message ${messageId}`);
+        return;
+      }
       const processMessage = async () => {
-        const event = data as unknown as FeishuMessageEvent;
         await inboundDebouncer.enqueue(event);
       };
       if (fireAndForget) {
         void processMessage().catch((err) => {
+          releaseFeishuMessageProcessing(messageId, accountId);
           error(`feishu[${accountId}]: error handling message: ${String(err)}`);
         });
         return;

@@ -404,6 +404,7 @@ function registerEventHandlers(
       try {
         await processMessage();
       } catch (err) {
+        releaseFeishuMessageProcessing(messageId, accountId);
         error(`feishu[${accountId}]: error handling message: ${String(err)}`);
       }
     },


@@ -212,10 +212,9 @@ function expectParsedFirstDispatchedEvent(botOpenId = "ou_bot") {
 }

 function setDedupPassThroughMocks(): void {
-  vi.spyOn(dedup, "tryRecordMessage").mockReturnValue(true);
-  vi.spyOn(dedup, "tryRecordMessagePersistent").mockResolvedValue(true);
-  vi.spyOn(dedup, "hasRecordedMessage").mockReturnValue(false);
-  vi.spyOn(dedup, "hasRecordedMessagePersistent").mockResolvedValue(false);
+  vi.spyOn(dedup, "tryBeginFeishuMessageProcessing").mockReturnValue(true);
+  vi.spyOn(dedup, "recordProcessedFeishuMessage").mockResolvedValue(true);
+  vi.spyOn(dedup, "hasProcessedFeishuMessage").mockResolvedValue(false);
 }

 function createMention(params: { openId: string; name: string; key?: string }): FeishuMention {

@@ -236,8 +235,7 @@ async function enqueueDebouncedMessage(
 }

 function setStaleRetryMocks(messageId = "om_old") {
-  vi.spyOn(dedup, "hasRecordedMessage").mockImplementation((key) => key.endsWith(`:${messageId}`));
-  vi.spyOn(dedup, "hasRecordedMessagePersistent").mockImplementation(
+  vi.spyOn(dedup, "hasProcessedFeishuMessage").mockImplementation(
     async (currentMessageId) => currentMessageId === messageId,
   );
 }

@@ -475,10 +473,9 @@ describe("Feishu inbound debounce regressions", () => {
   });

   it("passes prefetched botName through to handleFeishuMessage", async () => {
-    vi.spyOn(dedup, "tryRecordMessage").mockReturnValue(true);
-    vi.spyOn(dedup, "tryRecordMessagePersistent").mockResolvedValue(true);
-    vi.spyOn(dedup, "hasRecordedMessage").mockReturnValue(false);
-    vi.spyOn(dedup, "hasRecordedMessagePersistent").mockResolvedValue(false);
+    vi.spyOn(dedup, "tryBeginFeishuMessageProcessing").mockReturnValue(true);
+    vi.spyOn(dedup, "recordProcessedFeishuMessage").mockResolvedValue(true);
+    vi.spyOn(dedup, "hasProcessedFeishuMessage").mockResolvedValue(false);

     const onMessage = await setupDebounceMonitor({ botName: "OpenClaw Bot" });
     await onMessage(

@@ -560,8 +557,8 @@ describe("Feishu inbound debounce regressions", () => {
   });

   it("excludes previously processed retries from combined debounce text", async () => {
-    vi.spyOn(dedup, "tryRecordMessage").mockReturnValue(true);
-    vi.spyOn(dedup, "tryRecordMessagePersistent").mockResolvedValue(true);
+    vi.spyOn(dedup, "tryBeginFeishuMessageProcessing").mockReturnValue(true);
+    vi.spyOn(dedup, "recordProcessedFeishuMessage").mockResolvedValue(true);
     setStaleRetryMocks();

     const onMessage = await setupDebounceMonitor();

@@ -586,8 +583,8 @@ describe("Feishu inbound debounce regressions", () => {
   });

   it("uses latest fresh message id when debounce batch ends with stale retry", async () => {
-    const recordSpy = vi.spyOn(dedup, "tryRecordMessage").mockReturnValue(true);
-    vi.spyOn(dedup, "tryRecordMessagePersistent").mockResolvedValue(true);
+    vi.spyOn(dedup, "tryBeginFeishuMessageProcessing").mockReturnValue(true);
+    const recordSpy = vi.spyOn(dedup, "recordProcessedFeishuMessage").mockResolvedValue(true);
     setStaleRetryMocks();

     const onMessage = await setupDebounceMonitor();

@@ -603,7 +600,54 @@ describe("Feishu inbound debounce regressions", () => {
     expect(dispatched.message.message_id).toBe("om_new");
     const combined = JSON.parse(dispatched.message.content) as { text?: string };
     expect(combined.text).toBe("fresh");
-    expect(recordSpy).toHaveBeenCalledWith("default:om_old");
-    expect(recordSpy).not.toHaveBeenCalledWith("default:om_new");
+    expect(recordSpy).toHaveBeenCalledWith("om_old", "default", expect.any(Function));
+    expect(recordSpy).not.toHaveBeenCalledWith("om_new", "default", expect.any(Function));
   });

+  it("releases early event dedupe when debounced dispatch fails", async () => {
+    setDedupPassThroughMocks();
+    const enqueueMock = vi.fn();
+    setFeishuRuntime(
+      createPluginRuntimeMock({
+        channel: {
+          debounce: {
+            createInboundDebouncer: <T>(params: {
+              onError?: (err: unknown, items: T[]) => void;
+            }) => ({
+              enqueue: async (item: T) => {
+                enqueueMock(item);
+                params.onError?.(new Error("dispatch failed"), [item]);
+              },
+              flushKey: async () => {},
+            }),
+            resolveInboundDebounceMs,
+          },
+          text: {
+            hasControlCommand,
+          },
+        },
+      }),
+    );
+
+    const onMessage = await setupDebounceMonitor();
+    const event = createTextEvent({ messageId: "om_retryable", text: "hello" });
+
+    await enqueueDebouncedMessage(onMessage, event);
+    expect(enqueueMock).toHaveBeenCalledTimes(1);
+    await enqueueDebouncedMessage(onMessage, event);
+    expect(enqueueMock).toHaveBeenCalledTimes(2);
+    expect(handleFeishuMessageMock).not.toHaveBeenCalled();
+  });
+
+  it("drops duplicate inbound events before they re-enter the debounce pipeline", async () => {
+    const onMessage = await setupDebounceMonitor();
+    const event = createTextEvent({ messageId: "om_duplicate", text: "hello" });
+
+    await enqueueDebouncedMessage(onMessage, event);
+    await vi.advanceTimersByTimeAsync(25);
+    await enqueueDebouncedMessage(onMessage, event);
+    await vi.advanceTimersByTimeAsync(25);
+
+    expect(handleFeishuMessageMock).toHaveBeenCalledTimes(1);
   });
 });


@@ -3,6 +3,7 @@ import { pruneMapToMaxSize } from "./map-size.js";
 export type DedupeCache = {
   check: (key: string | undefined | null, now?: number) => boolean;
   peek: (key: string | undefined | null, now?: number) => boolean;
+  delete: (key: string | undefined | null) => void;
   clear: () => void;
   size: () => number;
 };

@@ -71,6 +72,12 @@ export function createDedupeCache(options: DedupeCacheOptions): DedupeCache {
       }
       return hasUnexpired(key, now, false);
     },
+    delete: (key) => {
+      if (!key) {
+        return;
+      }
+      cache.delete(key);
+    },
    clear: () => {
      cache.clear();
    },
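
With `delete` in place, a failed claim can be returned before its TTL expires. A self-contained sketch matching the `DedupeCache` shape from the diff; the refresh and eviction details are assumptions for illustration, not copied from the repo:

```typescript
// Sketch of a TTL + max-size dedupe cache with the DedupeCache shape shown
// above. check() records the key and reports whether it was already present
// and unexpired; delete() releases a key early (e.g. after a failed
// dispatch). Refresh/eviction semantics are assumed, not from the repo.
type DedupeCache = {
  check: (key: string | undefined | null, now?: number) => boolean;
  peek: (key: string | undefined | null, now?: number) => boolean;
  delete: (key: string | undefined | null) => void;
  clear: () => void;
  size: () => number;
};

function createDedupeCache(options: { ttlMs: number; maxSize: number }): DedupeCache {
  const cache = new Map<string, number>(); // key -> expiry timestamp

  const hasUnexpired = (key: string, now: number, record: boolean): boolean => {
    const expires = cache.get(key);
    const hit = expires !== undefined && expires > now;
    if (record) {
      cache.set(key, now + options.ttlMs); // insert or refresh the entry
      if (cache.size > options.maxSize) {
        // Map iterates in insertion order, so the first key is the oldest.
        const oldest = cache.keys().next().value;
        if (oldest !== undefined) cache.delete(oldest);
      }
    }
    return hit;
  };

  return {
    check: (key, now = Date.now()) => (key ? hasUnexpired(key, now, true) : false),
    peek: (key, now = Date.now()) => (key ? hasUnexpired(key, now, false) : false),
    delete: (key) => {
      if (key) cache.delete(key);
    },
    clear: () => cache.clear(),
    size: () => cache.size,
  };
}

const dedupe = createDedupeCache({ ttlMs: 5 * 60 * 1000, maxSize: 2_000 });
console.log(dedupe.check("evt:om_1")); // false: first sighting
console.log(dedupe.check("evt:om_1")); // true: duplicate within TTL
dedupe.delete("evt:om_1"); // release early, before the TTL expires
console.log(dedupe.check("evt:om_1")); // false: the claim can be retaken
```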