diff --git a/CHANGELOG.md b/CHANGELOG.md index b62f51a41cf..a626f62fa18 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -127,6 +127,7 @@ Docs: https://docs.openclaw.ai - Telegram/audio: transcode Telegram voice-note `.ogg` attachments before the local `whisper-cli` auto fallback runs, and keep mention-preflight transcription enabled in auto mode when `tools.media.audio` is unset. - Matrix/direct rooms: recover fresh auto-joined 1:1 DMs without eagerly persisting invite-only `m.direct` mappings, while keeping named, aliased, and explicitly configured rooms on the room path. (#58024) Thanks @gumadeiras. - TTS: Restore 3.28 schema compatibility and fallback observability. (#57953) Thanks @joshavant. +- Telegram/forum topics: restore reply routing to the active topic and keep ACP `sessions_spawn(..., thread=true, mode="session")` bound to that same topic instead of falling back to root chat or losing follow-up routing. (#56060) Thanks @one27001. ## 2026.3.28 diff --git a/extensions/telegram/src/thread-bindings.test.ts b/extensions/telegram/src/thread-bindings.test.ts index 53af799f7b1..90b704c0b51 100644 --- a/extensions/telegram/src/thread-bindings.test.ts +++ b/extensions/telegram/src/thread-bindings.test.ts @@ -59,7 +59,7 @@ describe("telegram thread bindings", () => { expect(manager.getByConversationId("-100200300:topic:77")?.boundBy).toBe("user-1"); }); - it("does not support child placement", async () => { + it("rejects child placement when conversationId is a bare topic ID with no group context", async () => { createTelegramThreadBindingManager({ accountId: "default", persist: false, @@ -73,12 +73,36 @@ describe("telegram thread bindings", () => { conversation: { channel: "telegram", accountId: "default", - conversationId: "-100200300:topic:77", + conversationId: "77", }, placement: "child", }), ).rejects.toMatchObject({ - code: "BINDING_CAPABILITY_UNSUPPORTED", + code: "BINDING_CREATE_FAILED", + }); + }); + + it("rejects child placement when parentConversationId is also a bare topic ID", async () => { + createTelegramThreadBindingManager({ + accountId: "default", + persist: false, + enableSweeper: false, + }); + + await expect( + getSessionBindingService().bind({ + targetSessionKey: "agent:main:acp:child-acp-1", + targetKind: "session", + conversation: { + channel: "telegram", + accountId: "default", + conversationId: "77", + parentConversationId: "99", + }, + placement: "child", + }), + ).rejects.toMatchObject({ + code: "BINDING_CREATE_FAILED", }); }); diff --git a/extensions/telegram/src/thread-bindings.ts b/extensions/telegram/src/thread-bindings.ts index 57c51a40026..d6b964f1349 100644 --- a/extensions/telegram/src/thread-bindings.ts +++ b/extensions/telegram/src/thread-bindings.ts @@ -1,6 +1,7 @@ import fs from "node:fs"; import os from "node:os"; import path from "node:path"; +import { loadConfig } from "openclaw/plugin-sdk/config-runtime"; import { formatThreadBindingDurationLabel, registerSessionBindingAdapter, @@ -15,6 +16,8 @@ import { writeJsonFileAtomically } from "openclaw/plugin-sdk/json-store"; import { normalizeAccountId } from "openclaw/plugin-sdk/routing"; import { logVerbose } from "openclaw/plugin-sdk/runtime-env"; import { resolveStateDir } from "openclaw/plugin-sdk/state-paths"; +import { createForumTopicTelegram } from "./send.js"; +import { resolveTelegramToken } from "./token.js"; const DEFAULT_THREAD_BINDING_IDLE_TIMEOUT_MS = 24 * 60 * 60 * 1000; const DEFAULT_THREAD_BINDING_MAX_AGE_MS = 0; @@ -556,18 +559,63 @@ export function createTelegramThreadBindingManager( channel: "telegram", accountId, capabilities: { - placements: ["current"], + placements: ["current", "child"], }, bind: async (input) => { if (input.conversation.channel !== "telegram") { return null; } - if (input.placement === "child") { + const targetSessionKey = input.targetSessionKey.trim(); + if (!targetSessionKey) { return null; } - const conversationId = normalizeConversationId(input.conversation.conversationId); - const targetSessionKey = input.targetSessionKey.trim(); - if (!conversationId || !targetSessionKey) { + const placement = input.placement === "child" ? "child" : "current"; + const metadata = input.metadata ?? {}; + let conversationId: string | undefined; + + if (placement === "child") { + const rawConversationId = input.conversation.conversationId?.trim() ?? ""; + const rawParent = input.conversation.parentConversationId?.trim() ?? ""; + const cfg = loadConfig(); + let chatId = rawParent || rawConversationId; + if (!chatId) { + logVerbose( + `telegram: child bind failed: could not resolve group chat ID from conversationId=${rawConversationId}`, + ); + return null; + } + if (!chatId.startsWith("-")) { + logVerbose( + `telegram: child bind failed: conversationId "${chatId}" looks like a bare topic ID, not a group chat ID (expected to start with "-"). Provide a full chatId:topic:topicId conversationId or set parentConversationId to the group chat ID.`, + ); + return null; + } + const threadName = + (typeof metadata.threadName === "string" ? metadata.threadName.trim() : "") || + (typeof metadata.label === "string" ? metadata.label.trim() : "") || + `Agent: ${targetSessionKey.split(":").pop()}`; + try { + const tokenResolution = resolveTelegramToken(cfg, { accountId }); + if (!tokenResolution.token) { + return null; + } + const result = await createForumTopicTelegram(chatId, threadName, { + cfg, + token: tokenResolution.token, + accountId, + }); + conversationId = `${result.chatId}:topic:${result.topicId}`; + } catch (err) { + logVerbose( + `telegram: child thread-binding failed for ${chatId}: ${err instanceof Error ? err.message : String(err)}`, + ); + return null; + } + } else { + conversationId = normalizeConversationId(input.conversation.conversationId); + } + + if (!conversationId) { return null; } const record = fromSessionBindingInput({ diff --git a/src/agents/acp-spawn.test.ts b/src/agents/acp-spawn.test.ts index ad289b5c251..d26932af536 100644 --- a/src/agents/acp-spawn.test.ts +++ b/src/agents/acp-spawn.test.ts @@ -1260,7 +1260,7 @@ describe("spawnAcpDirect", () => { expect(notifyOrder[0] > agentCallOrder).toBe(true); }); - it("keeps inline delivery for thread-bound ACP session mode", async () => { + it("binds Telegram forum-topic ACP sessions to the current topic", async () => { replaceSpawnConfig({ ...hoisted.state.cfg, channels: { @@ -1296,11 +1296,22 @@ describe("spawnAcpDirect", () => { agentAccountId: "default", agentTo: "telegram:-1003342490704", agentThreadId: "2", + agentGroupId: "-1003342490704", }, ); expect(result.status).toBe("accepted"); expect(result.mode).toBe("session"); + expect(hoisted.sessionBindingBindMock).toHaveBeenCalledWith( + expect.objectContaining({ + placement: "current", + conversation: expect.objectContaining({ + channel: "telegram", + accountId: "default", + conversationId: "-1003342490704:topic:2", + }), + }), + ); const agentCall = hoisted.callGatewayMock.mock.calls .map((call: unknown[]) => call[0] as { method?: string; params?: Record }) .find((request) => request.method === "agent"); diff --git a/src/agents/acp-spawn.ts b/src/agents/acp-spawn.ts index 69718136ab6..155e491bf29 100644 --- a/src/agents/acp-spawn.ts +++ b/src/agents/acp-spawn.ts @@ -87,6 +87,8 @@ export type SpawnAcpContext = { agentAccountId?: string; agentTo?: string; agentThreadId?: string | number; + /** Group chat ID for channels that distinguish group vs. topic (e.g. Telegram). */ + agentGroupId?: string; sandboxed?: boolean; }; @@ -360,7 +362,39 @@ function resolveConversationIdForThreadBinding(params: { channel?: string; to?: string; threadId?: string | number; + groupId?: string; }): string | undefined { + const channel = params.channel?.trim().toLowerCase(); + const normalizedThreadId = + params.threadId != null ? String(params.threadId).trim() || undefined : undefined; + if (channel === "telegram") { + const rawChatId = (params.groupId ?? params.to ?? "").trim(); + let chatId = rawChatId; + while (true) { + const next = (() => { + if (/^(telegram|tg):/i.test(chatId)) { + return chatId.replace(/^(telegram|tg):/i, "").trim(); + } + if (/^(group|channel):/i.test(chatId)) { + return chatId.replace(/^(group|channel):/i, "").trim(); + } + return chatId; + })(); + if (next === chatId) { + break; + } + chatId = next; + } + chatId = chatId + .replace(/:topic:\d+$/i, "") + .replace(/:\d+$/i, "") + .trim(); + if (/^-?\d+$/.test(chatId)) { + return normalizedThreadId ? `${chatId}:topic:${normalizedThreadId}` : chatId; + } + return undefined; + } + const genericConversationId = resolveConversationIdFromTargets({ threadId: params.threadId, targets: [params.to], @@ -368,8 +402,6 @@ function resolveConversationIdForThreadBinding(params: { if (genericConversationId) { return genericConversationId; } - - const channel = params.channel?.trim().toLowerCase(); const target = params.to?.trim() || ""; if (channel === "line") { const prefixed = target.match(/^line:(?:(?:user|group|room):)?([UCR][a-f0-9]{32})$/i)?.[1]; @@ -390,6 +422,7 @@ function prepareAcpThreadBinding(params: { accountId?: string; to?: string; threadId?: string | number; + groupId?: string; }): { ok: true; binding: PreparedAcpThreadBinding } | { ok: false; error: string } { const channel = params.channel?.trim().toLowerCase(); if (!channel) { @@ -444,12 +477,13 @@ function prepareAcpThreadBinding(params: { error: `Thread bindings do not support ${placement} placement for ${policy.channel}.`, }; } - const conversationId = resolveConversationIdForThreadBinding({ + const conversationIdRaw = resolveConversationIdForThreadBinding({ channel: policy.channel, to: params.to, threadId: params.threadId, + groupId: params.groupId, }); - if (!conversationId) { + if (!conversationIdRaw) { return { ok: false, error: `Could not resolve a ${policy.channel} conversation for ACP thread spawn.`, @@ -462,7 +496,7 @@ function prepareAcpThreadBinding(params: { channel: policy.channel, accountId: policy.accountId, placement, - conversationId, + conversationId: conversationIdRaw, }, }; } @@ -752,7 +786,7 @@ export async function spawnAcpDirect( }; } - const requestThreadBinding = params.thread === true; + let requestThreadBinding = params.thread === true; const runtimePolicyError = resolveAcpSpawnRuntimePolicyError({ cfg, requesterSessionKey: ctx.agentSessionKey, @@ -819,6 +853,7 @@ export async function spawnAcpDirect( accountId: ctx.agentAccountId, to: ctx.agentTo, threadId: ctx.agentThreadId, + groupId: ctx.agentGroupId, }); if (!prepared.ok) { return { diff --git a/src/agents/tools/sessions-spawn-tool.ts b/src/agents/tools/sessions-spawn-tool.ts index 795ef895b9f..61b21542315 100644 --- a/src/agents/tools/sessions-spawn-tool.ts +++ b/src/agents/tools/sessions-spawn-tool.ts @@ -218,6 +218,7 @@ export function createSessionsSpawnTool( agentAccountId: opts?.agentAccountId, agentTo: opts?.agentTo, agentThreadId: opts?.agentThreadId, + agentGroupId: opts?.agentGroupId ?? undefined, sandboxed: opts?.sandboxed, }, ); diff --git a/src/auto-reply/reply/get-reply-inline-actions.ts b/src/auto-reply/reply/get-reply-inline-actions.ts index b663b0ce519..7e1c2412551 100644 --- a/src/auto-reply/reply/get-reply-inline-actions.ts +++ b/src/auto-reply/reply/get-reply-inline-actions.ts @@ -24,6 +24,7 @@ import { getAbortMemory, isAbortRequestText } from "./abort-primitives.js"; import type { buildStatusReply, handleCommands } from "./commands.runtime.js"; import type { InlineDirectives } from "./directive-handling.parse.js"; import { isDirectiveOnly } from "./directive-handling.parse.js"; +import { extractExplicitGroupId } from "./group-id.js"; import type { createModelSelectionState } from "./model-selection.js"; import { extractInlineSimpleCommand } from "./reply-inline.js"; import type { TypingController } from "./typing.js"; @@ -226,6 +227,7 @@ export async function handleInlineActions(params: { agentAccountId: (ctx as { AccountId?: string }).AccountId, agentTo: ctx.OriginatingTo ?? ctx.To, agentThreadId: ctx.MessageThreadId ?? undefined, + agentGroupId: extractExplicitGroupId(ctx.From), requesterAgentIdOverride: agentId, agentDir, workspaceDir, diff --git a/src/auto-reply/reply/group-id.test.ts b/src/auto-reply/reply/group-id.test.ts new file mode 100644 index 00000000000..cb99ea64cae --- /dev/null +++ b/src/auto-reply/reply/group-id.test.ts @@ -0,0 +1,48 @@ +import { describe, expect, it } from "vitest"; +import { extractExplicitGroupId } from "./group-id.js"; + +describe("extractExplicitGroupId", () => { + it("returns undefined for empty/null input", () => { + expect(extractExplicitGroupId(undefined)).toBeUndefined(); + expect(extractExplicitGroupId(null)).toBeUndefined(); + expect(extractExplicitGroupId("")).toBeUndefined(); + expect(extractExplicitGroupId(" ")).toBeUndefined(); + }); + + it("extracts group ID from telegram group format", () => { + expect(extractExplicitGroupId("telegram:group:-1003776849159")).toBe("-1003776849159"); + }); + + it("extracts group ID from telegram forum topic format, stripping topic suffix", () => { + expect(extractExplicitGroupId("telegram:group:-1003776849159:topic:1264")).toBe( + "-1003776849159", + ); + }); + + it("extracts group ID from channel format", () => { + expect(extractExplicitGroupId("telegram:channel:-1001234567890")).toBe("-1001234567890"); + }); + + it("extracts group ID from channel format with topic", () => { + expect(extractExplicitGroupId("telegram:channel:-1001234567890:topic:42")).toBe( + "-1001234567890", + ); + }); + + it("extracts group ID from bare group: prefix", () => { + expect(extractExplicitGroupId("group:-1003776849159")).toBe("-1003776849159"); + }); + + it("extracts group ID from bare group: prefix with topic", () => { + expect(extractExplicitGroupId("group:-1003776849159:topic:999")).toBe("-1003776849159"); + }); + + it("extracts WhatsApp group ID", () => { + expect(extractExplicitGroupId("whatsapp:120363123456789@g.us")).toBe("120363123456789@g.us"); + }); + + it("returns undefined for unrecognized formats", () => { + expect(extractExplicitGroupId("user:12345")).toBeUndefined(); + expect(extractExplicitGroupId("just-a-string")).toBeUndefined(); + }); +}); diff --git a/src/auto-reply/reply/group-id.ts b/src/auto-reply/reply/group-id.ts index cbd38f2fc06..18ec92a9f5f 100644 --- a/src/auto-reply/reply/group-id.ts +++ b/src/auto-reply/reply/group-id.ts @@ -5,7 +5,8 @@ export function extractExplicitGroupId(raw: string | undefined | null): string | } const parts = trimmed.split(":").filter(Boolean); if (parts.length >= 3 && (parts[1] === "group" || parts[1] === "channel")) { - return parts.slice(2).join(":") || undefined; + const joined = parts.slice(2).join(":"); + return joined.replace(/:topic:.*$/, "") || undefined; } if ( parts.length >= 2 && @@ -15,7 +16,8 @@ export function extractExplicitGroupId(raw: string | undefined | null): string | return parts.slice(1).join(":") || undefined; } if (parts.length >= 2 && (parts[0] === "group" || parts[0] === "channel")) { - return parts.slice(1).join(":") || undefined; + const joined = parts.slice(1).join(":"); + return joined.replace(/:topic:.*$/, "") || undefined; } return undefined; } diff --git a/src/infra/outbound/targets.test.ts b/src/infra/outbound/targets.test.ts index ed467b4eb57..596b6473408 100644 --- a/src/infra/outbound/targets.test.ts +++ b/src/infra/outbound/targets.test.ts @@ -154,9 +154,6 @@ describe("resolveOutboundTarget defaultTo config fallback", () => { }); describe("resolveSessionDeliveryTarget", () => { - type SessionDeliveryRequest = Parameters[0]; - type HeartbeatDeliveryRequest = Parameters[0]; - const expectImplicitRoute = ( resolved: SessionDeliveryTarget, params: { @@ -183,31 +180,13 @@ describe("resolveSessionDeliveryTarget", () => { const expectTopicParsedFromExplicitTo = ( entry: Parameters[0]["entry"], ) => { - expectResolvedSessionTarget({ - request: { - entry, - requestedChannel: "last", - explicitTo: "63448508:topic:1008013", - }, - expected: { - to: "63448508", - threadId: 1008013, - }, + const resolved = resolveSessionDeliveryTarget({ + entry, + requestedChannel: "last", + explicitTo: "63448508:topic:1008013", }); - }; - - const expectResolvedSessionTarget = (params: { - request: SessionDeliveryRequest; - expected: Partial; - }) => { - expect(resolveSessionDeliveryTarget(params.request)).toMatchObject(params.expected); - }; - - const expectResolvedHeartbeatRoute = (params: { - request: HeartbeatDeliveryRequest; - expected: Partial>; - }) => { - expect(resolveHeartbeatDeliveryTarget(params.request)).toMatchObject(params.expected); + expect(resolved.to).toBe("63448508"); + expect(resolved.threadId).toBe(1008013); }; it("derives implicit delivery from the last route", () => { @@ -236,34 +215,6 @@ describe("resolveSessionDeliveryTarget", () => { }); }); - it("uses origin provider and accountId when legacy last route fields are absent", () => { - const resolved = resolveSessionDeliveryTarget({ - entry: { - sessionId: "sess-origin-route", - updatedAt: 1, - lastTo: " +1555 ", - origin: { - provider: " whatsapp ", - accountId: " acct-origin ", - }, - }, - requestedChannel: "last", - }); - - expect(resolved).toEqual({ - channel: "whatsapp", - to: "+1555", - accountId: "acct-origin", - threadId: undefined, - threadIdExplicit: false, - mode: "implicit", - lastChannel: "whatsapp", - lastTo: "+1555", - lastAccountId: "acct-origin", - lastThreadId: undefined, - }); - }); - it("prefers explicit targets without reusing lastTo", () => { const resolved = resolveSessionDeliveryTarget({ entry: { @@ -303,65 +254,53 @@ describe("resolveSessionDeliveryTarget", () => { }); }); - it.each([ - { - name: "passes through explicitThreadId when provided", - request: { - entry: { - sessionId: "sess-thread", - updatedAt: 1, - lastChannel: "telegram", - lastTo: "-100123", - lastThreadId: 999, - }, - requestedChannel: "last", - explicitThreadId: 42, + it("passes through explicitThreadId when provided", () => { + const resolved = resolveSessionDeliveryTarget({ + entry: { + sessionId: "sess-thread", + updatedAt: 1, + lastChannel: "telegram", + lastTo: "-100123", + lastThreadId: 999, }, - expected: { - channel: "telegram", - to: "-100123", - threadId: 42, + requestedChannel: "last", + explicitThreadId: 42, + }); + + expect(resolved.threadId).toBe(42); + expect(resolved.channel).toBe("telegram"); + expect(resolved.to).toBe("-100123"); + }); + + it("uses session lastThreadId when no explicitThreadId", () => { + const resolved = resolveSessionDeliveryTarget({ + entry: { + sessionId: "sess-thread-2", + updatedAt: 1, + lastChannel: "telegram", + lastTo: "-100123", + lastThreadId: 999, }, - }, - { - name: "uses session lastThreadId when no explicitThreadId", - request: { - entry: { - sessionId: "sess-thread-2", - updatedAt: 1, - lastChannel: "telegram", - lastTo: "-100123", - lastThreadId: 999, - }, - requestedChannel: "last", + requestedChannel: "last", + }); + + expect(resolved.threadId).toBe(999); + }); + + it("does not inherit lastThreadId in heartbeat mode", () => { + const resolved = resolveSessionDeliveryTarget({ + entry: { + sessionId: "sess-heartbeat-thread", + updatedAt: 1, + lastChannel: "slack", + lastTo: "user:U123", + lastThreadId: "1739142736.000100", }, - expected: { - threadId: 999, - }, - }, - { - name: "does not inherit lastThreadId in heartbeat mode", - request: { - entry: { - sessionId: "sess-heartbeat-thread", - updatedAt: 1, - lastChannel: "slack", - lastTo: "user:U123", - lastThreadId: "1739142736.000100", - }, - requestedChannel: "last", - mode: "heartbeat", - }, - expected: { - threadId: undefined, - }, - }, - ] satisfies Array<{ - name: string; - request: SessionDeliveryRequest; - expected: Partial; - }>)("$name", ({ request, expected }) => { - expectResolvedSessionTarget({ request, expected }); + requestedChannel: "last", + mode: "heartbeat", + }); + + expect(resolved.threadId).toBeUndefined(); }); it("falls back to a provided channel when requested is unsupported", () => { @@ -401,44 +340,36 @@ describe("resolveSessionDeliveryTarget", () => { }); }); - it.each([ - { - name: "skips :topic: parsing for non-telegram channels", - request: { - entry: { - sessionId: "sess-slack", - updatedAt: 1, - lastChannel: "slack", - lastTo: "C12345", - }, - requestedChannel: "last", - explicitTo: "C12345:topic:999", - }, - }, - { - name: "skips :topic: parsing when channel is explicitly non-telegram even if lastChannel was telegram", - request: { - entry: { - sessionId: "sess-cross", - updatedAt: 1, - lastChannel: "telegram", - lastTo: "63448508", - }, - requestedChannel: "slack", - explicitTo: "C12345:topic:999", - }, - }, - ] satisfies Array<{ - name: string; - request: SessionDeliveryRequest; - }>)("$name", ({ request }) => { - expectResolvedSessionTarget({ - request, - expected: { - to: "C12345:topic:999", - threadId: undefined, + it("skips :topic: parsing for non-telegram channels", () => { + const resolved = resolveSessionDeliveryTarget({ + entry: { + sessionId: "sess-slack", + updatedAt: 1, + lastChannel: "slack", + lastTo: "C12345", }, + requestedChannel: "last", + explicitTo: "C12345:topic:999", }); + + expect(resolved.to).toBe("C12345:topic:999"); + expect(resolved.threadId).toBeUndefined(); + }); + + it("skips :topic: parsing when channel is explicitly non-telegram even if lastChannel was telegram", () => { + const resolved = resolveSessionDeliveryTarget({ + entry: { + sessionId: "sess-cross", + updatedAt: 1, + lastChannel: "telegram", + lastTo: "63448508", + }, + requestedChannel: "slack", + explicitTo: "C12345:topic:999", + }); + + expect(resolved.to).toBe("C12345:topic:999"); + expect(resolved.threadId).toBeUndefined(); }); it("keeps raw :topic: targets when the telegram plugin registry is unavailable", () => { @@ -460,23 +391,20 @@ describe("resolveSessionDeliveryTarget", () => { }); it("explicitThreadId takes priority over :topic: parsed value", () => { - expectResolvedSessionTarget({ - request: { - entry: { - sessionId: "sess-priority", - updatedAt: 1, - lastChannel: "telegram", - lastTo: "63448508", - }, - requestedChannel: "last", - explicitTo: "63448508:topic:1008013", - explicitThreadId: 42, - }, - expected: { - threadId: 42, - to: "63448508", + const resolved = resolveSessionDeliveryTarget({ + entry: { + sessionId: "sess-priority", + updatedAt: 1, + lastChannel: "telegram", + lastTo: "63448508", }, + requestedChannel: "last", + explicitTo: "63448508:topic:1008013", + explicitThreadId: 42, }); + + expect(resolved.threadId).toBe(42); + expect(resolved.to).toBe("63448508"); }); const resolveHeartbeatTarget = (entry: SessionEntry, directPolicy?: "allow" | "block") => @@ -630,290 +558,335 @@ describe("resolveSessionDeliveryTarget", () => { }); }); - it.each([ - { - name: "allows heartbeat delivery to Discord DMs by default", - request: { - cfg: {} as OpenClawConfig, - entry: { - sessionId: "sess-heartbeat-discord-dm", - updatedAt: 1, - lastChannel: "discord", - lastTo: "user:12345", - }, - heartbeat: { - target: "last", - }, + it("allows heartbeat delivery to Discord DMs by default", () => { + const cfg: OpenClawConfig = {}; + const resolved = resolveHeartbeatDeliveryTarget({ + cfg, + entry: { + sessionId: "sess-heartbeat-discord-dm", + updatedAt: 1, + lastChannel: "discord", + lastTo: "user:12345", }, - expected: { - channel: "discord", - to: "user:12345", + heartbeat: { + target: "last", }, - }, - { - name: "keeps heartbeat delivery to Discord channels", - request: { - cfg: {} as OpenClawConfig, - entry: { - sessionId: "sess-heartbeat-discord-channel", - updatedAt: 1, - lastChannel: "discord", - lastTo: "channel:999", - }, - heartbeat: { - target: "last", - }, + }); + + expect(resolved.channel).toBe("discord"); + expect(resolved.to).toBe("user:12345"); + }); + + it("keeps heartbeat delivery to Discord channels", () => { + const cfg: OpenClawConfig = {}; + const resolved = resolveHeartbeatDeliveryTarget({ + cfg, + entry: { + sessionId: "sess-heartbeat-discord-channel", + updatedAt: 1, + lastChannel: "discord", + lastTo: "channel:999", }, - expected: { - channel: "discord", - to: "channel:999", + heartbeat: { + target: "last", }, - }, - { - name: "parses explicit heartbeat topic targets into threadId", - request: { - cfg: {} as OpenClawConfig, - heartbeat: { - target: "telegram", - to: "-10063448508:topic:1008013", - }, - }, - expected: { - channel: "telegram", - to: "-10063448508", - threadId: 1008013, - }, - }, - { - name: "prefers turn-scoped routing over mutable session routing for target=last", - request: { - cfg: {}, - entry: { - sessionId: "sess-heartbeat-turn-source", - updatedAt: 1, - lastChannel: "slack", - lastTo: "U_WRONG", - }, - heartbeat: { - target: "last", - }, - turnSource: { - channel: "telegram", - to: "-100123", - threadId: 42, - }, - }, - expected: { - channel: "telegram", - to: "-100123", - threadId: 42, - }, - }, - { - name: "merges partial turn-scoped metadata with the stored session route for target=last", - request: { - cfg: {}, - entry: { - sessionId: "sess-heartbeat-turn-source-partial", - updatedAt: 1, - lastChannel: "telegram", - lastTo: "-100123", - }, - heartbeat: { - target: "last", - }, - turnSource: { - threadId: 42, - }, - }, - expected: { - channel: "telegram", - to: "-100123", - threadId: 42, - }, - }, - ] satisfies Array<{ - name: string; - request: HeartbeatDeliveryRequest; - expected: Partial>; - }>)("$name", ({ request, expected }) => { - expectResolvedHeartbeatRoute({ request, expected }); + }); + + expect(resolved.channel).toBe("discord"); + expect(resolved.to).toBe("channel:999"); }); it("keeps explicit threadId in heartbeat mode", () => { - expectResolvedSessionTarget({ - request: { - entry: { - sessionId: "sess-heartbeat-explicit-thread", - updatedAt: 1, - lastChannel: "telegram", - lastTo: "-100123", - lastThreadId: 999, - }, - requestedChannel: "last", - mode: "heartbeat", - explicitThreadId: 42, + const resolved = resolveSessionDeliveryTarget({ + entry: { + sessionId: "sess-heartbeat-explicit-thread", + updatedAt: 1, + lastChannel: "telegram", + lastTo: "-100123", + lastThreadId: 999, }, - expected: { + requestedChannel: "last", + mode: "heartbeat", + explicitThreadId: 42, + }); + + expect(resolved.channel).toBe("telegram"); + expect(resolved.to).toBe("-100123"); + expect(resolved.threadId).toBe(42); + expect(resolved.threadIdExplicit).toBe(true); + }); + + it("parses explicit heartbeat topic targets into threadId", () => { + const cfg: OpenClawConfig = {}; + const resolved = resolveHeartbeatDeliveryTarget({ + cfg, + heartbeat: { + target: "telegram", + to: "-10063448508:topic:1008013", + }, + }); + + expect(resolved.channel).toBe("telegram"); + expect(resolved.to).toBe("-10063448508"); + expect(resolved.threadId).toBe(1008013); + }); + + it("prefers turn-scoped routing over mutable session routing for target=last", () => { + const resolved = resolveHeartbeatDeliveryTarget({ + cfg: {}, + entry: { + sessionId: "sess-heartbeat-turn-source", + updatedAt: 1, + lastChannel: "slack", + lastTo: "U_WRONG", + }, + heartbeat: { + target: "last", + }, + turnSource: { channel: "telegram", to: "-100123", threadId: 42, - threadIdExplicit: true, }, }); + + expect(resolved.channel).toBe("telegram"); + expect(resolved.to).toBe("-100123"); + expect(resolved.threadId).toBe(42); + }); + + it("merges partial turn-scoped metadata with the stored session route for target=last", () => { + const resolved = resolveHeartbeatDeliveryTarget({ + cfg: {}, + entry: { + sessionId: "sess-heartbeat-turn-source-partial", + updatedAt: 1, + lastChannel: "telegram", + lastTo: "-100123", + }, + heartbeat: { + target: "last", + }, + turnSource: { + threadId: 42, + }, + }); + + expect(resolved.channel).toBe("telegram"); + expect(resolved.to).toBe("-100123"); + expect(resolved.threadId).toBe(42); }); }); describe("resolveSessionDeliveryTarget — cross-channel reply guard (#24152)", () => { - const expectCrossChannelReplyGuard = (params: { - request: Parameters[0]; - expected: Partial; - }) => { - expect(resolveSessionDeliveryTarget(params.request)).toMatchObject(params.expected); - }; + it("uses turnSourceChannel over session lastChannel when provided", () => { + // Simulate: WhatsApp message originated the turn, but a Slack message + // arrived concurrently and updated lastChannel to "slack" + const resolved = resolveSessionDeliveryTarget({ + entry: { + sessionId: "sess-shared", + updatedAt: 1, + lastChannel: "slack", // <- concurrently overwritten + lastTo: "U0AEMECNCBV", // <- Slack user (wrong target) + }, + requestedChannel: "last", + turnSourceChannel: "whatsapp", // <- originated from WhatsApp + turnSourceTo: "+66972796305", // <- WhatsApp user (correct target) + }); - it.each([ - { - name: "uses turnSourceChannel over session lastChannel when provided", - request: { - entry: { - sessionId: "sess-shared", - updatedAt: 1, - lastChannel: "slack", - lastTo: "U0AEMECNCBV", - }, - requestedChannel: "last", - turnSourceChannel: "whatsapp", - turnSourceTo: "+66972796305", + expect(resolved.channel).toBe("whatsapp"); + expect(resolved.to).toBe("+66972796305"); + }); + + it("falls back to session lastChannel when turnSourceChannel is not set", () => { + const resolved = resolveSessionDeliveryTarget({ + entry: { + sessionId: "sess-normal", + updatedAt: 1, + lastChannel: "telegram", + lastTo: "8587265585", }, - expected: { - channel: "whatsapp", - to: "+66972796305", + requestedChannel: "last", + }); + + expect(resolved.channel).toBe("telegram"); + expect(resolved.to).toBe("8587265585"); + }); + + it("respects explicit requestedChannel over turnSourceChannel", () => { + const resolved = resolveSessionDeliveryTarget({ + entry: { + sessionId: "sess-explicit", + updatedAt: 1, + lastChannel: "slack", + lastTo: "U12345", }, - }, - { - name: "falls back to session lastChannel when turnSourceChannel is not set", - request: { - entry: { - sessionId: "sess-normal", - updatedAt: 1, - lastChannel: "telegram", - lastTo: "8587265585", - }, - requestedChannel: "last", + requestedChannel: "telegram", + explicitTo: "8587265585", + turnSourceChannel: "whatsapp", + turnSourceTo: "+66972796305", + }); + + // Explicit requestedChannel "telegram" is not "last", so it takes priority + expect(resolved.channel).toBe("telegram"); + }); + + it("preserves turnSourceAccountId and turnSourceThreadId", () => { + const resolved = resolveSessionDeliveryTarget({ + entry: { + sessionId: "sess-meta", + updatedAt: 1, + lastChannel: "slack", + lastTo: "U_WRONG", + lastAccountId: "wrong-account", }, - expected: { - channel: "telegram", - to: "8587265585", + requestedChannel: "last", + turnSourceChannel: "telegram", + turnSourceTo: "8587265585", + turnSourceAccountId: "bot-123", + turnSourceThreadId: 42, + }); + + expect(resolved.channel).toBe("telegram"); + expect(resolved.to).toBe("8587265585"); + expect(resolved.accountId).toBe("bot-123"); + expect(resolved.threadId).toBe(42); + }); + + it("does not fall back to session target metadata when turnSourceChannel is set", () => { + const resolved = resolveSessionDeliveryTarget({ + entry: { + sessionId: "sess-no-fallback", + updatedAt: 1, + lastChannel: "slack", + lastTo: "U_WRONG", + lastAccountId: "wrong-account", + lastThreadId: "1739142736.000100", }, - }, - { - name: "respects explicit requestedChannel over turnSourceChannel", - request: { - entry: { - sessionId: "sess-explicit", - updatedAt: 1, - lastChannel: "slack", - lastTo: "U12345", - }, - requestedChannel: "telegram", - explicitTo: "8587265585", - turnSourceChannel: "whatsapp", - turnSourceTo: "+66972796305", + requestedChannel: "last", + turnSourceChannel: "whatsapp", + }); + + expect(resolved.channel).toBe("whatsapp"); + expect(resolved.to).toBeUndefined(); + expect(resolved.accountId).toBeUndefined(); + expect(resolved.threadId).toBeUndefined(); + expect(resolved.lastTo).toBeUndefined(); + expect(resolved.lastAccountId).toBeUndefined(); + expect(resolved.lastThreadId).toBeUndefined(); + }); + + it("falls back to session lastThreadId when turnSourceChannel matches session channel and no explicit turnSourceThreadId", () => { + // Regression: Telegram forum topic replies were landing in the root chat instead of the topic + // thread because turnSourceThreadId was undefined (not explicitly passed), causing lastThreadId + // to be undefined even though the session had the correct lastThreadId from the inbound message. + const resolved = resolveSessionDeliveryTarget({ + entry: { + sessionId: "sess-forum-topic", + updatedAt: 1, + lastChannel: "telegram", + lastTo: "-1001234567890", + lastThreadId: 1122, }, - expected: { - channel: "telegram", + requestedChannel: "last", + turnSourceChannel: "telegram", + turnSourceTo: "-1001234567890", + }); + + expect(resolved.channel).toBe("telegram"); + expect(resolved.to).toBe("-1001234567890"); + expect(resolved.threadId).toBe(1122); + }); + + it("does not fall back to session lastThreadId when turnSourceChannel differs from session channel", () => { + const resolved = resolveSessionDeliveryTarget({ + entry: { + sessionId: "sess-cross-channel-no-thread", + updatedAt: 1, + lastChannel: "slack", + lastTo: "U_SLACK", + lastThreadId: "1739142736.000100", }, - }, - { - name: "preserves turnSourceAccountId and turnSourceThreadId", - request: { - entry: { - sessionId: "sess-meta", - updatedAt: 1, - lastChannel: "slack", - lastTo: "U_WRONG", - lastAccountId: "wrong-account", - }, - requestedChannel: "last", - turnSourceChannel: "telegram", - turnSourceTo: "8587265585", - turnSourceAccountId: "bot-123", - turnSourceThreadId: 42, + requestedChannel: "last", + turnSourceChannel: "telegram", + turnSourceTo: "-1001234567890", + }); + + expect(resolved.channel).toBe("telegram"); + expect(resolved.threadId).toBeUndefined(); + }); + + it("prefers explicit turnSourceThreadId over session lastThreadId on same channel", () => { + const resolved = resolveSessionDeliveryTarget({ + entry: { + sessionId: "sess-explicit-thread-override", + updatedAt: 1, + lastChannel: "telegram", + lastTo: "-1001234567890", + lastThreadId: 1122, }, - expected: { - channel: "telegram", - to: "8587265585", - accountId: "bot-123", - threadId: 42, + requestedChannel: "last", + turnSourceChannel: "telegram", + turnSourceTo: "-1001234567890", + turnSourceThreadId: 9999, + }); + + expect(resolved.channel).toBe("telegram"); + expect(resolved.to).toBe("-1001234567890"); + expect(resolved.threadId).toBe(9999); + }); + + it("drops session threadId when turnSourceTo differs from session to (shared-session race)", () => { + const resolved = resolveSessionDeliveryTarget({ + entry: { + sessionId: "sess-shared-race", + updatedAt: 1, + lastChannel: "telegram", + lastTo: "-1001234567890", + lastThreadId: 1122, }, - }, - { - name: "does not fall back to session target metadata when turnSourceChannel is set", - request: { - entry: { - sessionId: "sess-no-fallback", - updatedAt: 1, - lastChannel: "slack", - lastTo: "U_WRONG", - lastAccountId: "wrong-account", - lastThreadId: "1739142736.000100", - }, - requestedChannel: "last", - turnSourceChannel: "whatsapp", + requestedChannel: "last", + turnSourceChannel: "telegram", + turnSourceTo: "-1009999999999", + }); + + expect(resolved.channel).toBe("telegram"); + expect(resolved.to).toBe("-1009999999999"); + expect(resolved.threadId).toBeUndefined(); + }); + + it("uses explicitTo even when turnSourceTo is omitted", () => { + const resolved = resolveSessionDeliveryTarget({ + entry: { + sessionId: "sess-explicit-to", + updatedAt: 1, + lastChannel: "slack", + lastTo: "U_WRONG", }, - expected: { - channel: "whatsapp", - to: undefined, - accountId: undefined, - threadId: undefined, - lastTo: undefined, - lastAccountId: undefined, - lastThreadId: undefined, + requestedChannel: "last", + explicitTo: "+15551234567", + turnSourceChannel: "whatsapp", + }); + + expect(resolved.channel).toBe("whatsapp"); + expect(resolved.to).toBe("+15551234567"); + }); + + it("still allows mismatched lastTo only from turn-scoped metadata", () => { + const resolved = resolveSessionDeliveryTarget({ + entry: { + sessionId: "sess-mismatch-turn", + updatedAt: 1, + lastChannel: "slack", + lastTo: "U_WRONG", }, - }, - { - name: "uses explicitTo even when turnSourceTo is omitted", - request: { - entry: { - sessionId: "sess-explicit-to", - updatedAt: 1, - lastChannel: "slack", - lastTo: "U_WRONG", - }, - requestedChannel: "last", - explicitTo: "+15551234567", - turnSourceChannel: "whatsapp", - }, - expected: { - channel: "whatsapp", - to: "+15551234567", - }, - }, - { - name: "still allows mismatched lastTo only from turn-scoped metadata", - request: { - entry: { - sessionId: "sess-mismatch-turn", - updatedAt: 1, - lastChannel: "slack", - lastTo: "U_WRONG", - }, - requestedChannel: "telegram", - allowMismatchedLastTo: true, - turnSourceChannel: "whatsapp", - turnSourceTo: "+15550000000", - }, - expected: { - channel: "telegram", - to: "+15550000000", - }, - }, - ] satisfies Array<{ - name: string; - request: Parameters[0]; - expected: Partial; - }>)("$name", ({ request, expected }) => { - expectCrossChannelReplyGuard({ request, expected }); + requestedChannel: "telegram", + allowMismatchedLastTo: true, + turnSourceChannel: "whatsapp", + turnSourceTo: "+15550000000", + }); + + expect(resolved.channel).toBe("telegram"); + expect(resolved.to).toBe("+15550000000"); }); }); diff --git a/src/infra/outbound/targets.ts b/src/infra/outbound/targets.ts index e7f1ebfd6ce..e08d8233ffa 100644 --- a/src/infra/outbound/targets.ts +++ b/src/infra/outbound/targets.ts @@ -117,10 +117,22 @@ export function resolveSessionDeliveryTarget(params: { // When a turn-source channel is provided, use only turn-scoped metadata. // Falling back to mutable session fields would re-introduce routing races. const hasTurnSourceChannel = params.turnSourceChannel != null; + const hasTurnSourceThreadId = + params.turnSourceThreadId != null && params.turnSourceThreadId !== ""; const lastChannel = hasTurnSourceChannel ? params.turnSourceChannel : sessionLastChannel; const lastTo = hasTurnSourceChannel ? params.turnSourceTo : context?.to; const lastAccountId = hasTurnSourceChannel ? params.turnSourceAccountId : context?.accountId; - const lastThreadId = hasTurnSourceChannel ? params.turnSourceThreadId : context?.threadId; + // Fall back to the session's stored threadId only when the turn-source channel AND destination + // match the session context. This avoids mixing a turn-scoped `to` with a stale session-scoped + // threadId from a different chat/topic in shared-session scenarios. + const turnToMatchesSession = + !params.turnSourceTo || !context?.to || params.turnSourceTo === context.to; + const lastThreadId = hasTurnSourceThreadId + ? params.turnSourceThreadId + : hasTurnSourceChannel && + (params.turnSourceChannel !== sessionLastChannel || !turnToMatchesSession) + ? undefined + : context?.threadId; const rawRequested = params.requestedChannel ?? "last"; const requested = rawRequested === "last" ? "last" : normalizeMessageChannel(rawRequested); @@ -166,8 +178,6 @@ export function resolveSessionDeliveryTarget(params: { const mode = params.mode ?? (explicitTo ? "explicit" : "implicit"); const accountId = channel && channel === lastChannel ? lastAccountId : undefined; - const hasTurnSourceThreadId = - params.turnSourceThreadId != null && params.turnSourceThreadId !== ""; const threadId = channel && channel === lastChannel ? mode === "heartbeat"