fix: dedupe inbound replies per agent

This commit is contained in:
Ayaan Zaidi 2026-03-09 08:57:56 +05:30
parent 8befd88119
commit 2e8bb57c6b
4 changed files with 79 additions and 4 deletions

View File

@ -29,6 +29,7 @@ Docs: https://docs.openclaw.ai
- Agents/failover: detect Amazon Bedrock `Too many tokens per day` quota errors as rate limits across fallback, cron retry, and memory embeddings while keeping context-window `too many tokens per request` errors out of the rate-limit lane. (#39377) Thanks @gambletan.
- Android/Play distribution: remove self-update, background location, `screen.record`, and background mic capture from the Android app, narrow the foreground service to `dataSync` only, and clean up the legacy `location.enabledMode=always` preference migration. (#39660) Thanks @obviyus.
- Telegram/DM partial streaming: keep DM preview lanes on real message edits instead of native draft materialization so final replies no longer flash a second duplicate copy before collapsing back to one.
- Telegram/DM routing: dedupe inbound Telegram DMs per agent instead of per session key so the same DM cannot trigger duplicate replies when both `agent:main:main` and `agent:main:telegram:direct:<id>` resolve for one agent. Fixes #40005. Supersedes #40116.
- macOS overlays: fix VoiceWake, Talk, and Notify overlay exclusivity crashes by removing shared `inout` visibility mutation from `OverlayPanelFactory.present`, and add a repeated Talk overlay smoke test. (#39275, #39321) Thanks @fellanH.
- macOS Talk Mode: set the speech recognition request `taskHint` to `.dictation` for mic capture, and add regression coverage for the request defaults. (#38445) Thanks @dmiv.
- macOS release packaging: default `scripts/package-mac-app.sh` to universal binaries for `BUILD_CONFIG=release`, and clarify that `scripts/package-mac-dist.sh` already produces the release zip + DMG. (#33891) Thanks @cgdusek.

View File

@ -236,7 +236,7 @@ describe("inbound dedupe", () => {
).toBe(false);
});
it("does not dedupe across session keys", () => {
it("does not dedupe across agent ids", () => {
resetInboundDedupe();
const base: MsgContext = {
Provider: "whatsapp",
@ -248,12 +248,36 @@ describe("inbound dedupe", () => {
shouldSkipDuplicateInbound({ ...base, SessionKey: "agent:alpha:main" }, { now: 100 }),
).toBe(false);
expect(
shouldSkipDuplicateInbound({ ...base, SessionKey: "agent:bravo:main" }, { now: 200 }),
shouldSkipDuplicateInbound(
{ ...base, SessionKey: "agent:bravo:whatsapp:direct:+1555" },
{
now: 200,
},
),
).toBe(false);
expect(
shouldSkipDuplicateInbound({ ...base, SessionKey: "agent:alpha:main" }, { now: 300 }),
).toBe(true);
});
it("dedupes when the same agent sees the same inbound message under different session keys", () => {
resetInboundDedupe();
const base: MsgContext = {
Provider: "telegram",
OriginatingChannel: "telegram",
OriginatingTo: "telegram:7463849194",
MessageSid: "msg-1",
};
expect(
shouldSkipDuplicateInbound({ ...base, SessionKey: "agent:main:main" }, { now: 100 }),
).toBe(false);
expect(
shouldSkipDuplicateInbound(
{ ...base, SessionKey: "agent:main:telegram:direct:7463849194" },
{ now: 200 },
),
).toBe(true);
});
});
describe("createInboundDebouncer", () => {

View File

@ -1539,6 +1539,38 @@ describe("dispatchReplyFromConfig", () => {
expect(replyResolver).toHaveBeenCalledTimes(1);
});
it("deduplicates same-agent inbound replies across main and direct session keys", async () => {
setNoAbort();
const cfg = emptyConfig;
const replyResolver = vi.fn(async () => ({ text: "hi" }) as ReplyPayload);
const baseCtx = buildTestCtx({
Provider: "telegram",
Surface: "telegram",
OriginatingChannel: "telegram",
OriginatingTo: "telegram:7463849194",
MessageSid: "msg-1",
SessionKey: "agent:main:main",
});
await dispatchReplyFromConfig({
ctx: baseCtx,
cfg,
dispatcher: createDispatcher(),
replyResolver,
});
await dispatchReplyFromConfig({
ctx: {
...baseCtx,
SessionKey: "agent:main:telegram:direct:7463849194",
},
cfg,
dispatcher: createDispatcher(),
replyResolver,
});
expect(replyResolver).toHaveBeenCalledTimes(1);
});
it("emits message_received hook with originating channel metadata", async () => {
setNoAbort();
hookMocks.runner.hasHooks.mockReturnValue(true);

View File

@ -1,5 +1,6 @@
import { logVerbose, shouldLogVerbose } from "../../globals.js";
import { createDedupeCache, type DedupeCache } from "../../infra/dedupe.js";
import { parseAgentSessionKey } from "../../sessions/session-key-utils.js";
import type { MsgContext } from "../templating.js";
const DEFAULT_INBOUND_DEDUPE_TTL_MS = 20 * 60_000;
@ -15,6 +16,23 @@ const normalizeProvider = (value?: string | null) => value?.trim().toLowerCase()
const resolveInboundPeerId = (ctx: MsgContext) =>
ctx.OriginatingTo ?? ctx.To ?? ctx.From ?? ctx.SessionKey;
function resolveInboundDedupeSessionScope(ctx: MsgContext): string {
const sessionKey =
(ctx.CommandSource === "native" ? ctx.CommandTargetSessionKey : undefined)?.trim() ||
ctx.SessionKey?.trim() ||
"";
if (!sessionKey) {
return "";
}
const parsed = parseAgentSessionKey(sessionKey);
if (!parsed) {
return sessionKey;
}
// The same physical inbound message should never run twice for the same
// agent, even if a routing bug presents it under both main and direct keys.
return `agent:${parsed.agentId}`;
}
export function buildInboundDedupeKey(ctx: MsgContext): string | null {
const provider = normalizeProvider(ctx.OriginatingChannel ?? ctx.Provider ?? ctx.Surface);
const messageId = ctx.MessageSid?.trim();
@ -25,13 +43,13 @@ export function buildInboundDedupeKey(ctx: MsgContext): string | null {
if (!peerId) {
return null;
}
const sessionKey = ctx.SessionKey?.trim() ?? "";
const sessionScope = resolveInboundDedupeSessionScope(ctx);
const accountId = ctx.AccountId?.trim() ?? "";
const threadId =
ctx.MessageThreadId !== undefined && ctx.MessageThreadId !== null
? String(ctx.MessageThreadId)
: "";
return [provider, accountId, sessionKey, peerId, threadId, messageId].filter(Boolean).join("|");
return [provider, accountId, sessionScope, peerId, threadId, messageId].filter(Boolean).join("|");
}
export function shouldSkipDuplicateInbound(