fix(routing): unify session delivery invariants for duplicate suppression (#33786)

* Routing: unify session delivery invariants

* Routing: address PR review feedback

* Routing: tighten topic and session-scope suppression

* fix(chat): inherit routes for per-account channel-peer sessions
This commit is contained in:
Tak Hoffman 2026-03-03 21:40:38 -06:00 committed by GitHub
parent 1be39d4250
commit 7f2708a8c3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 436 additions and 28 deletions

View File

@ -20,6 +20,7 @@ Docs: https://docs.openclaw.ai
- Build/lazy runtime boundaries: replace ineffective dynamic import sites with dedicated lazy runtime boundaries across Slack slash handling, Telegram audit, CLI send deps, memory fallback, and outbound delivery paths while preserving behavior. (#33690) thanks @gumadeiras.
- Config/heartbeat legacy-path handling: auto-migrate top-level `heartbeat` into `agents.defaults.heartbeat` (with merge semantics that preserve explicit defaults), and keep startup failures on non-migratable legacy entries in the detailed invalid-config path instead of generic migration-failed errors. (#32706) thanks @xiwan.
- Plugins/SDK subpath parity: add channel-specific plugin SDK subpaths for Discord, Slack, Signal, iMessage, WhatsApp, and LINE; migrate bundled plugin entrypoints to scoped subpaths/core with CI guardrails; and keep `openclaw/plugin-sdk` root import compatibility for existing external plugins. (#33737) thanks @gumadeiras.
- Routing/session duplicate suppression synthesis: align shared session delivery-context inheritance, channel-paired route-field merges, and reply-surface target matching so dmScope=main turns avoid cross-surface duplicate replies while thread-aware forwarding keeps intended routing semantics. (from #33629, #26889, #17337, #33250) Thanks @Yuandiaodiaodiao, @kevinwildenradt, @Glucksberg, and @bmendonca3.
- Security/auth labels: remove token and API-key snippets from user-facing auth status labels so `/status` and `/models` do not expose credential fragments. (#33262) thanks @cu1ch3n.
- Auth/credential semantics: align profile eligibility + probe diagnostics with SecretRef/expiry rules and harden browser download atomic writes. (#33733) thanks @joshavant.
- Security/audit denyCommands guidance: suggest likely exact node command IDs for unknown `gateway.nodes.denyCommands` entries so ineffective denylist entries are easier to correct. (#29713) thanks @liquidhorizon88-bot.

View File

@ -5,6 +5,7 @@ export type MessagingToolSend = {
provider: string;
accountId?: string;
to?: string;
threadId?: string;
};
const CORE_MESSAGING_TOOLS = new Set(["sessions_send", "message"]);

View File

@ -1,5 +1,8 @@
import { describe, expect, it } from "vitest";
import { filterMessagingToolMediaDuplicates } from "./reply-payloads.js";
import {
filterMessagingToolMediaDuplicates,
shouldSuppressMessagingToolReplies,
} from "./reply-payloads.js";
describe("filterMessagingToolMediaDuplicates", () => {
it("strips mediaUrl when it matches sentMediaUrls", () => {
@ -75,3 +78,79 @@ describe("filterMessagingToolMediaDuplicates", () => {
expect(result).toEqual([{ text: "hello", mediaUrl: undefined, mediaUrls: undefined }]);
});
});
describe("shouldSuppressMessagingToolReplies", () => {
it("suppresses when target provider is missing but target matches current provider route", () => {
expect(
shouldSuppressMessagingToolReplies({
messageProvider: "telegram",
originatingTo: "123",
messagingToolSentTargets: [{ tool: "message", provider: "", to: "123" }],
}),
).toBe(true);
});
it('suppresses when target provider uses "message" placeholder and target matches', () => {
expect(
shouldSuppressMessagingToolReplies({
messageProvider: "telegram",
originatingTo: "123",
messagingToolSentTargets: [{ tool: "message", provider: "message", to: "123" }],
}),
).toBe(true);
});
it("does not suppress when providerless target does not match origin route", () => {
expect(
shouldSuppressMessagingToolReplies({
messageProvider: "telegram",
originatingTo: "123",
messagingToolSentTargets: [{ tool: "message", provider: "", to: "456" }],
}),
).toBe(false);
});
it("suppresses telegram topic-origin replies when explicit threadId matches", () => {
expect(
shouldSuppressMessagingToolReplies({
messageProvider: "telegram",
originatingTo: "telegram:group:-100123:topic:77",
messagingToolSentTargets: [
{ tool: "message", provider: "telegram", to: "-100123", threadId: "77" },
],
}),
).toBe(true);
});
it("does not suppress telegram topic-origin replies when explicit threadId differs", () => {
expect(
shouldSuppressMessagingToolReplies({
messageProvider: "telegram",
originatingTo: "telegram:group:-100123:topic:77",
messagingToolSentTargets: [
{ tool: "message", provider: "telegram", to: "-100123", threadId: "88" },
],
}),
).toBe(false);
});
it("does not suppress telegram topic-origin replies when target omits topic metadata", () => {
expect(
shouldSuppressMessagingToolReplies({
messageProvider: "telegram",
originatingTo: "telegram:group:-100123:topic:77",
messagingToolSentTargets: [{ tool: "message", provider: "telegram", to: "-100123" }],
}),
).toBe(false);
});
it("suppresses telegram replies when chatId matches but target forms differ", () => {
expect(
shouldSuppressMessagingToolReplies({
messageProvider: "telegram",
originatingTo: "telegram:group:-100123",
messagingToolSentTargets: [{ tool: "message", provider: "telegram", to: "-100123" }],
}),
).toBe(true);
});
});

View File

@ -4,6 +4,7 @@ import { normalizeChannelId } from "../../channels/plugins/index.js";
import type { ReplyToMode } from "../../config/types.js";
import { normalizeTargetForProvider } from "../../infra/outbound/target-normalization.js";
import { normalizeOptionalAccountId } from "../../routing/account-id.js";
import { parseTelegramTarget } from "../../telegram/targets.js";
import type { OriginatingChannelType } from "../templating.js";
import type { ReplyPayload } from "../types.js";
import { extractReplyToTag } from "./reply-tags.js";
@ -162,6 +163,62 @@ function normalizeProviderForComparison(value?: string): string | undefined {
return PROVIDER_ALIAS_MAP[lowered] ?? lowered;
}
function normalizeThreadIdForComparison(value?: string): string | undefined {
const trimmed = value?.trim();
if (!trimmed) {
return undefined;
}
if (/^-?\d+$/.test(trimmed)) {
return String(Number.parseInt(trimmed, 10));
}
return trimmed.toLowerCase();
}
function resolveTargetProviderForComparison(params: {
currentProvider: string;
targetProvider?: string;
}): string {
const targetProvider = normalizeProviderForComparison(params.targetProvider);
if (!targetProvider || targetProvider === "message") {
return params.currentProvider;
}
return targetProvider;
}
function targetsMatchForSuppression(params: {
provider: string;
originTarget: string;
targetKey: string;
targetThreadId?: string;
}): boolean {
if (params.provider !== "telegram") {
return params.targetKey === params.originTarget;
}
const origin = parseTelegramTarget(params.originTarget);
const target = parseTelegramTarget(params.targetKey);
const explicitTargetThreadId = normalizeThreadIdForComparison(params.targetThreadId);
const targetThreadId =
explicitTargetThreadId ??
(target.messageThreadId != null ? String(target.messageThreadId) : undefined);
const originThreadId =
origin.messageThreadId != null ? String(origin.messageThreadId) : undefined;
if (origin.chatId.trim().toLowerCase() !== target.chatId.trim().toLowerCase()) {
return false;
}
if (originThreadId && targetThreadId != null) {
return originThreadId === targetThreadId;
}
if (originThreadId && targetThreadId == null) {
return false;
}
if (!originThreadId && targetThreadId != null) {
return false;
}
// chatId already matched and neither side carries thread context.
return true;
}
export function shouldSuppressMessagingToolReplies(params: {
messageProvider?: string;
messagingToolSentTargets?: MessagingToolSend[];
@ -182,16 +239,14 @@ export function shouldSuppressMessagingToolReplies(params: {
return false;
}
return sentTargets.some((target) => {
const targetProvider = normalizeProviderForComparison(target?.provider);
if (!targetProvider) {
const targetProvider = resolveTargetProviderForComparison({
currentProvider: provider,
targetProvider: target?.provider,
});
if (targetProvider !== provider) {
return false;
}
const isGenericMessageProvider = targetProvider === "message";
if (!isGenericMessageProvider && targetProvider !== provider) {
return false;
}
const targetNormalizationProvider = isGenericMessageProvider ? provider : targetProvider;
const targetKey = normalizeTargetForProvider(targetNormalizationProvider, target.to);
const targetKey = normalizeTargetForProvider(targetProvider, target.to);
if (!targetKey) {
return false;
}
@ -199,6 +254,11 @@ export function shouldSuppressMessagingToolReplies(params: {
if (originAccount && targetAccount && originAccount !== targetAccount) {
return false;
}
return targetKey === originTarget;
return targetsMatchForSuppression({
provider,
originTarget,
targetKey,
targetThreadId: target.threadId,
});
});
}

View File

@ -30,6 +30,14 @@ function resolveSessionKeyChannelHint(sessionKey?: string): string | undefined {
return normalizeMessageChannel(head);
}
function isMainSessionKey(sessionKey?: string): boolean {
const parsed = parseAgentSessionKey(sessionKey);
if (!parsed) {
return (sessionKey ?? "").trim().toLowerCase() === "main";
}
return parsed.rest.trim().toLowerCase() === "main";
}
function isExternalRoutingChannel(channel?: string): channel is string {
return Boolean(
channel && channel !== INTERNAL_MESSAGE_CHANNEL && isDeliverableMessageChannel(channel),
@ -42,6 +50,9 @@ export function resolveLastChannelRaw(params: {
sessionKey?: string;
}): string | undefined {
const originatingChannel = normalizeMessageChannel(params.originatingChannelRaw);
if (originatingChannel === INTERNAL_MESSAGE_CHANNEL && isMainSessionKey(params.sessionKey)) {
return params.originatingChannelRaw;
}
const persistedChannel = normalizeMessageChannel(params.persistedLastChannel);
const sessionKeyChannelHint = resolveSessionKeyChannelHint(params.sessionKey);
let resolved = params.originatingChannelRaw || params.persistedLastChannel;
@ -66,6 +77,9 @@ export function resolveLastToRaw(params: {
sessionKey?: string;
}): string | undefined {
const originatingChannel = normalizeMessageChannel(params.originatingChannelRaw);
if (originatingChannel === INTERNAL_MESSAGE_CHANNEL && isMainSessionKey(params.sessionKey)) {
return params.originatingToRaw || params.toRaw;
}
const persistedChannel = normalizeMessageChannel(params.persistedLastChannel);
const sessionKeyChannelHint = resolveSessionKeyChannelHint(params.sessionKey);

View File

@ -1609,4 +1609,69 @@ describe("initSessionState internal channel routing preservation", () => {
expect(result.sessionEntry.lastChannel).toBe("webchat");
});
it("does not reuse stale external lastTo for webchat/main turns without destination", async () => {
const storePath = await createStorePath("webchat-main-no-stale-lastto-");
const sessionKey = "agent:main:main";
await writeSessionStoreFast(storePath, {
[sessionKey]: {
sessionId: "sess-webchat-main-1",
updatedAt: Date.now(),
lastChannel: "whatsapp",
lastTo: "+15555550123",
deliveryContext: {
channel: "whatsapp",
to: "+15555550123",
},
},
});
const cfg = { session: { store: storePath } } as OpenClawConfig;
const result = await initSessionState({
ctx: {
Body: "webchat follow-up",
SessionKey: sessionKey,
OriginatingChannel: "webchat",
},
cfg,
commandAuthorized: true,
});
expect(result.sessionEntry.lastChannel).toBe("webchat");
expect(result.sessionEntry.lastTo).toBeUndefined();
});
it("prefers webchat route over persisted external route for main session turns", async () => {
const storePath = await createStorePath("prefer-webchat-main-route-");
const sessionKey = "agent:main:main";
await writeSessionStoreFast(storePath, {
[sessionKey]: {
sessionId: "sess-webchat-main-2",
updatedAt: Date.now(),
lastChannel: "whatsapp",
lastTo: "+15555550123",
deliveryContext: {
channel: "whatsapp",
to: "+15555550123",
},
},
});
const cfg = { session: { store: storePath } } as OpenClawConfig;
const result = await initSessionState({
ctx: {
Body: "reply only here",
SessionKey: sessionKey,
OriginatingChannel: "webchat",
OriginatingTo: "session:webchat-main",
},
cfg,
commandAuthorized: true,
});
expect(result.sessionEntry.lastChannel).toBe("webchat");
expect(result.sessionEntry.lastTo).toBe("session:webchat-main");
expect(result.sessionEntry.deliveryContext?.channel).toBe("webchat");
expect(result.sessionEntry.deliveryContext?.to).toBe("session:webchat-main");
});
});

View File

@ -332,6 +332,7 @@ export type ChannelMessageActionContext = {
export type ChannelToolSend = {
to: string;
accountId?: string | null;
threadId?: string | null;
};
export type ChannelMessageActionAdapter = {

View File

@ -30,7 +30,7 @@ vi.mock("../session-utils.js", async (importOriginal) => {
const original = await importOriginal<typeof import("../session-utils.js")>();
return {
...original,
loadSessionEntry: () => ({
loadSessionEntry: (rawKey: string) => ({
cfg: {},
storePath: path.join(path.dirname(mockState.transcriptPath), "sessions.json"),
entry: {
@ -38,7 +38,7 @@ vi.mock("../session-utils.js", async (importOriginal) => {
sessionFile: mockState.transcriptPath,
...mockState.sessionEntry,
},
canonicalKey: "main",
canonicalKey: rawKey || "main",
}),
};
});
@ -147,12 +147,13 @@ async function runNonStreamingChatSend(params: {
respond: ReturnType<typeof vi.fn>;
idempotencyKey: string;
message?: string;
sessionKey?: string;
client?: unknown;
expectBroadcast?: boolean;
}) {
await chatHandlers["chat.send"]({
params: {
sessionKey: "main",
sessionKey: params.sessionKey ?? "main",
message: params.message ?? "hello",
idempotencyKey: params.idempotencyKey,
},
@ -367,6 +368,7 @@ describe("chat directive tag stripping for non-streaming final payloads", () =>
context,
respond,
idempotencyKey: "idem-origin-routing",
sessionKey: "agent:main:telegram:direct:6812765697",
expectBroadcast: false,
});
@ -400,6 +402,7 @@ describe("chat directive tag stripping for non-streaming final payloads", () =>
context,
respond,
idempotencyKey: "idem-feishu-origin-routing",
sessionKey: "agent:main:feishu:direct:ou_feishu_direct_123",
expectBroadcast: false,
});
@ -411,4 +414,103 @@ describe("chat directive tag stripping for non-streaming final payloads", () =>
}),
);
});
it("chat.send inherits routing metadata for per-account channel-peer session keys", async () => {
createTranscriptFixture("openclaw-chat-send-per-account-channel-peer-routing-");
mockState.finalText = "ok";
mockState.sessionEntry = {
deliveryContext: {
channel: "telegram",
to: "telegram:6812765697",
accountId: "account-a",
},
lastChannel: "telegram",
lastTo: "telegram:6812765697",
lastAccountId: "account-a",
};
const respond = vi.fn();
const context = createChatContext();
await runNonStreamingChatSend({
context,
respond,
idempotencyKey: "idem-per-account-channel-peer-routing",
sessionKey: "agent:main:telegram:account-a:direct:6812765697",
expectBroadcast: false,
});
expect(mockState.lastDispatchCtx).toEqual(
expect.objectContaining({
OriginatingChannel: "telegram",
OriginatingTo: "telegram:6812765697",
AccountId: "account-a",
}),
);
});
it("chat.send does not inherit external delivery context for shared main sessions", async () => {
createTranscriptFixture("openclaw-chat-send-main-no-cross-route-");
mockState.finalText = "ok";
mockState.sessionEntry = {
deliveryContext: {
channel: "discord",
to: "discord:1234567890",
accountId: "default",
},
lastChannel: "discord",
lastTo: "discord:1234567890",
lastAccountId: "default",
};
const respond = vi.fn();
const context = createChatContext();
await runNonStreamingChatSend({
context,
respond,
idempotencyKey: "idem-main-no-cross-route",
sessionKey: "main",
expectBroadcast: false,
});
expect(mockState.lastDispatchCtx).toEqual(
expect.objectContaining({
OriginatingChannel: "webchat",
OriginatingTo: undefined,
AccountId: undefined,
}),
);
});
it("chat.send does not inherit external delivery context for non-channel custom sessions", async () => {
createTranscriptFixture("openclaw-chat-send-custom-no-cross-route-");
mockState.finalText = "ok";
mockState.sessionEntry = {
deliveryContext: {
channel: "discord",
to: "discord:1234567890",
accountId: "default",
},
lastChannel: "discord",
lastTo: "discord:1234567890",
lastAccountId: "default",
};
const respond = vi.fn();
const context = createChatContext();
await runNonStreamingChatSend({
context,
respond,
idempotencyKey: "idem-custom-no-cross-route",
sessionKey: "agent:main:work",
expectBroadcast: false,
});
expect(mockState.lastDispatchCtx).toEqual(
expect.objectContaining({
OriginatingChannel: "webchat",
OriginatingTo: undefined,
AccountId: undefined,
}),
);
});
});

View File

@ -12,6 +12,7 @@ import { createReplyPrefixOptions } from "../../channels/reply-prefix.js";
import { resolveSessionFilePath } from "../../config/sessions.js";
import { jsonUtf8Bytes } from "../../infra/json-utf8-bytes.js";
import { resolveSendPolicy } from "../../sessions/send-policy.js";
import { parseAgentSessionKey } from "../../sessions/session-key-utils.js";
import {
stripInlineDirectiveTagsForDisplay,
stripInlineDirectiveTagsFromMessageForDisplay,
@ -70,6 +71,20 @@ const CHAT_HISTORY_TEXT_MAX_CHARS = 12_000;
const CHAT_HISTORY_MAX_SINGLE_MESSAGE_BYTES = 128 * 1024;
const CHAT_HISTORY_OVERSIZED_PLACEHOLDER = "[chat.history omitted: message too large]";
let chatHistoryPlaceholderEmitCount = 0;
const CHANNEL_AGNOSTIC_SESSION_SCOPES = new Set([
"main",
"direct",
"dm",
"group",
"channel",
"cron",
"run",
"subagent",
"acp",
"thread",
"topic",
]);
const CHANNEL_SCOPED_SESSION_SHAPES = new Set(["direct", "dm", "group", "channel"]);
function stripDisallowedChatControlChars(message: string): string {
let output = "";
@ -847,7 +862,30 @@ export const chatHandlers: GatewayRequestHandlers = {
const routeAccountIdCandidate =
entry?.deliveryContext?.accountId ?? entry?.lastAccountId ?? undefined;
const routeThreadIdCandidate = entry?.deliveryContext?.threadId ?? entry?.lastThreadId;
const parsedSessionKey = parseAgentSessionKey(sessionKey);
const sessionScopeParts = (parsedSessionKey?.rest ?? sessionKey).split(":").filter(Boolean);
const sessionScopeHead = sessionScopeParts[0];
const sessionChannelHint = normalizeMessageChannel(sessionScopeHead);
const sessionPeerShapeCandidates = [sessionScopeParts[1], sessionScopeParts[2]]
.map((part) => (part ?? "").trim().toLowerCase())
.filter(Boolean);
const isChannelAgnosticSessionScope = CHANNEL_AGNOSTIC_SESSION_SCOPES.has(
(sessionScopeHead ?? "").trim().toLowerCase(),
);
const isChannelScopedSession = sessionPeerShapeCandidates.some((part) =>
CHANNEL_SCOPED_SESSION_SHAPES.has(part),
);
// Only inherit prior external route metadata for channel-scoped sessions.
// Channel-agnostic sessions (main, direct:<peer>, etc.) can otherwise
// leak stale routes across surfaces.
const canInheritDeliverableRoute = Boolean(
sessionChannelHint &&
sessionChannelHint !== INTERNAL_MESSAGE_CHANNEL &&
!isChannelAgnosticSessionScope &&
isChannelScopedSession,
);
const hasDeliverableRoute =
canInheritDeliverableRoute &&
routeChannelCandidate &&
routeChannelCandidate !== INTERNAL_MESSAGE_CHANNEL &&
typeof routeToCandidate === "string" &&

View File

@ -1,7 +1,7 @@
export function extractToolSend(
args: Record<string, unknown>,
expectedAction = "sendMessage",
): { to: string; accountId?: string } | null {
): { to: string; accountId?: string; threadId?: string } | null {
const action = typeof args.action === "string" ? args.action.trim() : "";
if (action !== expectedAction) {
return null;
@ -11,5 +11,12 @@ export function extractToolSend(
return null;
}
const accountId = typeof args.accountId === "string" ? args.accountId.trim() : undefined;
return { to, accountId };
const threadIdRaw =
typeof args.threadId === "string"
? args.threadId.trim()
: typeof args.threadId === "number"
? String(args.threadId)
: "";
const threadId = threadIdRaw.length > 0 ? threadIdRaw : undefined;
return { to, accountId, threadId };
}

View File

@ -24,16 +24,45 @@ describe("delivery context helpers", () => {
expect(normalizeDeliveryContext({ channel: " " })).toBeUndefined();
});
it("merges primary values over fallback", () => {
it("does not inherit route fields from fallback when channels conflict", () => {
const merged = mergeDeliveryContext(
{ channel: "whatsapp", to: "channel:abc" },
{ channel: "slack", to: "channel:def", accountId: "acct" },
{ channel: "telegram" },
{ channel: "discord", to: "channel:def", accountId: "acct", threadId: "99" },
);
expect(merged).toEqual({
channel: "whatsapp",
to: "channel:abc",
channel: "telegram",
to: undefined,
accountId: undefined,
});
expect(merged?.threadId).toBeUndefined();
});
it("inherits missing route fields when channels match", () => {
const merged = mergeDeliveryContext(
{ channel: "telegram" },
{ channel: "telegram", to: "123", accountId: "acct", threadId: "99" },
);
expect(merged).toEqual({
channel: "telegram",
to: "123",
accountId: "acct",
threadId: "99",
});
});
it("uses fallback route fields when fallback has no channel", () => {
const merged = mergeDeliveryContext(
{ channel: "telegram" },
{ to: "123", accountId: "acct", threadId: "99" },
);
expect(merged).toEqual({
channel: "telegram",
to: "123",
accountId: "acct",
threadId: "99",
});
});
@ -103,7 +132,7 @@ describe("delivery context helpers", () => {
});
});
it("normalizes delivery fields and mirrors them on session entries", () => {
it("normalizes delivery fields, mirrors session fields, and avoids cross-channel carryover", () => {
const normalized = normalizeSessionDeliveryFields({
deliveryContext: {
channel: " Slack ",
@ -118,12 +147,11 @@ describe("delivery context helpers", () => {
expect(normalized.deliveryContext).toEqual({
channel: "whatsapp",
to: "+1555",
accountId: "acct-2",
threadId: "444",
accountId: undefined,
});
expect(normalized.lastChannel).toBe("whatsapp");
expect(normalized.lastTo).toBe("+1555");
expect(normalized.lastAccountId).toBe("acct-2");
expect(normalized.lastThreadId).toBe("444");
expect(normalized.lastAccountId).toBeUndefined();
expect(normalized.lastThreadId).toBeUndefined();
});
});

View File

@ -121,11 +121,23 @@ export function mergeDeliveryContext(
if (!normalizedPrimary && !normalizedFallback) {
return undefined;
}
const channelsConflict =
normalizedPrimary?.channel &&
normalizedFallback?.channel &&
normalizedPrimary.channel !== normalizedFallback.channel;
return normalizeDeliveryContext({
channel: normalizedPrimary?.channel ?? normalizedFallback?.channel,
to: normalizedPrimary?.to ?? normalizedFallback?.to,
accountId: normalizedPrimary?.accountId ?? normalizedFallback?.accountId,
threadId: normalizedPrimary?.threadId ?? normalizedFallback?.threadId,
// Keep route fields paired to their channel; avoid crossing fields between
// unrelated channels during session context merges.
to: channelsConflict
? normalizedPrimary?.to
: (normalizedPrimary?.to ?? normalizedFallback?.to),
accountId: channelsConflict
? normalizedPrimary?.accountId
: (normalizedPrimary?.accountId ?? normalizedFallback?.accountId),
threadId: channelsConflict
? normalizedPrimary?.threadId
: (normalizedPrimary?.threadId ?? normalizedFallback?.threadId),
});
}