diff --git a/extensions/telegram/src/bot-message-context.dm-topic-threadid.test.ts b/extensions/telegram/src/bot-message-context.dm-topic-threadid.test.ts index 0050c1c1c56..5acef982483 100644 --- a/extensions/telegram/src/bot-message-context.dm-topic-threadid.test.ts +++ b/extensions/telegram/src/bot-message-context.dm-topic-threadid.test.ts @@ -99,7 +99,7 @@ describe("buildTelegramMessageContext DM topic threadId in deliveryContext (#889 expect(ctx).not.toBeNull(); expect(recordInboundSessionMock).toHaveBeenCalled(); - expectRecordedRoute({ to: "telegram:-1001234567890", threadId: "99" }); + expectRecordedRoute({ to: "telegram:-1001234567890:topic:99", threadId: "99" }); }); it("passes threadId to updateLastRoute for the forum General topic", async () => { @@ -115,6 +115,6 @@ describe("buildTelegramMessageContext DM topic threadId in deliveryContext (#889 expect(ctx).not.toBeNull(); expect(recordInboundSessionMock).toHaveBeenCalled(); - expectRecordedRoute({ to: "telegram:-1001234567890", threadId: "1" }); + expectRecordedRoute({ to: "telegram:-1001234567890:topic:1", threadId: "1" }); }); }); diff --git a/extensions/telegram/src/bot-message-context.session.ts b/extensions/telegram/src/bot-message-context.session.ts index 12f37e7a74e..fe251e2c601 100644 --- a/extensions/telegram/src/bot-message-context.session.ts +++ b/extensions/telegram/src/bot-message-context.session.ts @@ -280,7 +280,10 @@ export async function buildTelegramInboundContextPayload(params: { ? { sessionKey: updateLastRouteSessionKey, channel: "telegram", - to: `telegram:${chatId}`, + to: + isGroup && updateLastRouteThreadId != null + ? `telegram:${chatId}:topic:${updateLastRouteThreadId}` + : `telegram:${chatId}`, accountId: route.accountId, threadId: updateLastRouteThreadId, mainDmOwnerPin: diff --git a/src/agents/subagent-announce-delivery.test.ts b/src/agents/subagent-announce-delivery.test.ts new file mode 100644 index 00000000000..232ef5acba7 --- /dev/null +++ b/src/agents/subagent-announce-delivery.test.ts @@ -0,0 +1,43 @@ +import { describe, expect, it } from "vitest"; +import { resolveAnnounceOrigin } from "./subagent-announce-delivery.js"; + +describe("resolveAnnounceOrigin telegram forum topics", () => { + it("preserves stored forum topic thread ids when requester origin omits one for the same chat", () => { + expect( + resolveAnnounceOrigin( + { + lastChannel: "telegram", + lastTo: "telegram:-1001234567890:topic:99", + lastThreadId: 99, + }, + { + channel: "telegram", + to: "telegram:-1001234567890", + }, + ), + ).toEqual({ + channel: "telegram", + to: "telegram:-1001234567890", + threadId: 99, + }); + }); + + it("still strips stale thread ids when the stored telegram route points at a different chat", () => { + expect( + resolveAnnounceOrigin( + { + lastChannel: "telegram", + lastTo: "telegram:-1009999999999:topic:99", + lastThreadId: 99, + }, + { + channel: "telegram", + to: "telegram:-1001234567890", + }, + ), + ).toEqual({ + channel: "telegram", + to: "telegram:-1001234567890", + }); + }); +}); diff --git a/src/agents/subagent-announce-delivery.ts b/src/agents/subagent-announce-delivery.ts index 9b13903238d..4798b031a66 100644 --- a/src/agents/subagent-announce-delivery.ts +++ b/src/agents/subagent-announce-delivery.ts @@ -92,6 +92,79 @@ function summarizeDeliveryError(error: unknown): string { } } +function stripTelegramAnnouncePrefix(to: string): string { + let trimmed = to.trim(); + let strippedTelegramPrefix = false; + while (true) { + const next = (() => { + if (/^(telegram|tg):/i.test(trimmed)) { + strippedTelegramPrefix = true; + return trimmed.replace(/^(telegram|tg):/i, "").trim(); + } + if (strippedTelegramPrefix && /^group:/i.test(trimmed)) { + return trimmed.replace(/^group:/i, "").trim(); + } + return trimmed; + })(); + if (next === trimmed) { + return trimmed; + } + trimmed = next; + } +} + +function parseTelegramAnnounceTarget(to: string): { + chatId: string; + chatType: "direct" | "group" | "unknown"; +} { + const normalized = stripTelegramAnnouncePrefix(to); + const topicMatch = /^(.+?):topic:(\d+)$/.exec(normalized); + const colonMatch = /^(.+):(\d+)$/.exec(normalized); + const chatId = topicMatch?.[1] ?? colonMatch?.[1] ?? normalized; + const trimmedChatId = chatId.trim(); + const chatType = /^-?\d+$/.test(trimmedChatId) + ? trimmedChatId.startsWith("-") + ? "group" + : "direct" + : "unknown"; + return { chatId: trimmedChatId, chatType }; +} + +function shouldStripThreadFromAnnounceEntry( + normalizedRequester?: DeliveryContext, + normalizedEntry?: DeliveryContext, +): boolean { + if ( + !normalizedRequester?.to || + normalizedRequester.threadId != null || + normalizedEntry?.threadId == null + ) { + return false; + } + const requesterChannel = normalizedRequester.channel?.trim().toLowerCase(); + if (requesterChannel && requesterChannel !== "telegram") { + return true; + } + if (!requesterChannel && !normalizedRequester.to.startsWith("telegram:")) { + return true; + } + try { + const requesterTarget = parseTelegramAnnounceTarget(normalizedRequester.to); + if (requesterTarget.chatType !== "group") { + return true; + } + const entryTarget = normalizedEntry.to + ? parseTelegramAnnounceTarget(normalizedEntry.to) + : undefined; + if (entryTarget && entryTarget.chatId !== requesterTarget.chatId) { + return true; + } + return false; + } catch { + return false; + } +} + const TRANSIENT_ANNOUNCE_DELIVERY_ERROR_PATTERNS: readonly RegExp[] = [ /\berrorcode=unavailable\b/i, /\bstatus\s*[:=]\s*"?unavailable\b/i, @@ -196,9 +269,7 @@ export function resolveAnnounceOrigin( ); } const entryForMerge = - normalizedRequester?.to && - normalizedRequester.threadId == null && - normalizedEntry?.threadId != null + normalizedEntry && shouldStripThreadFromAnnounceEntry(normalizedRequester, normalizedEntry) ? (() => { const { threadId: _ignore, ...rest } = normalizedEntry; return rest; diff --git a/src/config/sessions/delivery-info.test.ts b/src/config/sessions/delivery-info.test.ts index 002b40f6479..b32f5b32089 100644 --- a/src/config/sessions/delivery-info.test.ts +++ b/src/config/sessions/delivery-info.test.ts @@ -123,6 +123,7 @@ describe("extractDeliveryInfo", () => { to: "group:98765", accountId: "main", }); + storeState.store[baseKey].lastThreadId = "55"; const result = extractDeliveryInfo(topicKey); @@ -131,8 +132,33 @@ describe("extractDeliveryInfo", () => { channel: "telegram", to: "group:98765", accountId: "main", + threadId: "55", }, threadId: "55", }); }); + + it("falls back to session metadata thread ids when deliveryContext.threadId is missing", () => { + const sessionKey = "agent:main:telegram:group:98765"; + storeState.store[sessionKey] = { + ...buildEntry({ + channel: "telegram", + to: "group:98765", + accountId: "main", + }), + origin: { threadId: 77 }, + }; + + const result = extractDeliveryInfo(sessionKey); + + expect(result).toEqual({ + deliveryContext: { + channel: "telegram", + to: "group:98765", + accountId: "main", + threadId: "77", + }, + threadId: undefined, + }); + }); }); diff --git a/src/config/sessions/delivery-info.ts b/src/config/sessions/delivery-info.ts index 91295b1814b..43ea169599c 100644 --- a/src/config/sessions/delivery-info.ts +++ b/src/config/sessions/delivery-info.ts @@ -15,7 +15,9 @@ export function parseSessionThreadInfo(sessionKey: string | undefined): { } export function extractDeliveryInfo(sessionKey: string | undefined): { - deliveryContext: { channel?: string; to?: string; accountId?: string } | undefined; + deliveryContext: + | { channel?: string; to?: string; accountId?: string; threadId?: string } + | undefined; threadId: string | undefined; } { const { baseSessionKey, threadId } = parseSessionThreadInfo(sessionKey); @@ -23,7 +25,9 @@ export function extractDeliveryInfo(sessionKey: string | undefined): { return { deliveryContext: undefined, threadId }; } - let deliveryContext: { channel?: string; to?: string; accountId?: string } | undefined; + let deliveryContext: + | { channel?: string; to?: string; accountId?: string; threadId?: string } + | undefined; try { const cfg = loadConfig(); const storePath = resolveStorePath(cfg.session?.store); @@ -33,10 +37,13 @@ export function extractDeliveryInfo(sessionKey: string | undefined): { entry = store[baseSessionKey]; } if (entry?.deliveryContext) { + const resolvedThreadId = + entry.deliveryContext.threadId ?? entry.lastThreadId ?? entry.origin?.threadId; deliveryContext = { channel: entry.deliveryContext.channel, to: entry.deliveryContext.to, accountId: entry.deliveryContext.accountId, + threadId: resolvedThreadId != null ? String(resolvedThreadId) : undefined, }; } } catch { diff --git a/src/cron/legacy-delivery.test.ts b/src/cron/legacy-delivery.test.ts new file mode 100644 index 00000000000..8cdba4de5fa --- /dev/null +++ b/src/cron/legacy-delivery.test.ts @@ -0,0 +1,66 @@ +import { describe, expect, it } from "vitest"; +import { + buildDeliveryFromLegacyPayload, + buildDeliveryPatchFromLegacyPayload, + hasLegacyDeliveryHints, + mergeLegacyDeliveryInto, + normalizeLegacyDeliveryInput, +} from "./legacy-delivery.js"; + +describe("legacy delivery threadId support", () => { + it("treats threadId as a legacy delivery hint", () => { + expect(hasLegacyDeliveryHints({ threadId: "42" })).toBe(true); + expect(hasLegacyDeliveryHints({ threadId: 42 })).toBe(true); + }); + + it("hydrates threadId into new delivery payloads", () => { + expect( + buildDeliveryFromLegacyPayload({ + channel: "telegram", + to: "-100123:topic:42", + threadId: 42, + }), + ).toEqual({ + mode: "announce", + channel: "telegram", + to: "-100123:topic:42", + threadId: "42", + }); + }); + + it("patches and merges threadId into existing deliveries", () => { + expect(buildDeliveryPatchFromLegacyPayload({ threadId: "77" })).toEqual({ + mode: "announce", + threadId: "77", + }); + + expect( + mergeLegacyDeliveryInto( + { mode: "announce", channel: "telegram", to: "-100123", threadId: "1" }, + { threadId: 77 }, + ), + ).toEqual({ + delivery: { mode: "announce", channel: "telegram", to: "-100123", threadId: "77" }, + mutated: true, + }); + }); + + it("strips threadId from legacy payloads after normalization", () => { + const payload: Record = { + channel: "telegram", + to: "-100123:topic:42", + threadId: 42, + }; + + expect(normalizeLegacyDeliveryInput({ payload })).toEqual({ + delivery: { + mode: "announce", + channel: "telegram", + to: "-100123:topic:42", + threadId: "42", + }, + mutated: true, + }); + expect(payload.threadId).toBeUndefined(); + }); +}); diff --git a/src/cron/legacy-delivery.ts b/src/cron/legacy-delivery.ts index 0474f5d7b95..a1dc4210ef0 100644 --- a/src/cron/legacy-delivery.ts +++ b/src/cron/legacy-delivery.ts @@ -14,6 +14,12 @@ export function hasLegacyDeliveryHints(payload: Record) { if (typeof payload.to === "string" && payload.to.trim()) { return true; } + if (typeof payload.threadId === "string" && payload.threadId.trim()) { + return true; + } + if (typeof payload.threadId === "number" && Number.isFinite(payload.threadId)) { + return true; + } return false; } @@ -29,6 +35,12 @@ export function buildDeliveryFromLegacyPayload( ? payload.provider.trim().toLowerCase() : ""; const toRaw = typeof payload.to === "string" ? payload.to.trim() : ""; + const threadIdRaw = + typeof payload.threadId === "string" + ? payload.threadId.trim() + : typeof payload.threadId === "number" && Number.isFinite(payload.threadId) + ? String(payload.threadId) + : ""; const next: Record = { mode }; if (channelRaw) { next.channel = channelRaw; @@ -36,6 +48,9 @@ export function buildDeliveryFromLegacyPayload( if (toRaw) { next.to = toRaw; } + if (threadIdRaw) { + next.threadId = threadIdRaw; + } if (typeof payload.bestEffortDeliver === "boolean") { next.bestEffort = payload.bestEffortDeliver; } @@ -51,6 +66,12 @@ export function buildDeliveryPatchFromLegacyPayload(payload: Record = {}; let hasPatch = false; @@ -61,6 +82,7 @@ export function buildDeliveryPatchFromLegacyPayload(payload: Record) { if ("to" in payload) { delete payload.to; } + if ("threadId" in payload) { + delete payload.threadId; + } if ("bestEffortDeliver" in payload) { delete payload.bestEffortDeliver; }