diff --git a/src/telegram/bot-handlers.ts b/src/telegram/bot-handlers.ts index f8873aa9da2..9970b2cd9b1 100644 --- a/src/telegram/bot-handlers.ts +++ b/src/telegram/bot-handlers.ts @@ -1,4 +1,5 @@ import type { Message } from "@grammyjs/types"; +import type { TelegramGroupConfig, TelegramTopicConfig } from "../config/types.js"; import type { TelegramMediaRef } from "./bot-message-context.js"; import type { TelegramContext } from "./bot/types.js"; import { resolveDefaultAgentId } from "../agents/agent-scope.js"; @@ -21,7 +22,11 @@ import { readChannelAllowFromStore } from "../pairing/pairing-store.js"; import { resolveAgentRoute } from "../routing/resolve-route.js"; import { resolveThreadSessionKeys } from "../routing/session-key.js"; import { withTelegramApiErrorLogging } from "./api-logging.js"; -import { firstDefined, isSenderAllowed, normalizeAllowFromWithStore } from "./bot-access.js"; +import { + isSenderAllowed, + normalizeAllowFromWithStore, + type NormalizedAllowFrom, +} from "./bot-access.js"; import { RegisterTelegramHandlerParams } from "./bot-native-commands.js"; import { MEDIA_GROUP_TIMEOUT_MS, type MediaGroupEntry } from "./bot-updates.js"; import { resolveMedia } from "./bot/delivery.js"; @@ -31,6 +36,10 @@ import { resolveTelegramForumThreadId, resolveTelegramGroupAllowFromContext, } from "./bot/helpers.js"; +import { + evaluateTelegramGroupBaseAccess, + evaluateTelegramGroupPolicyAccess, +} from "./group-access.js"; import { migrateTelegramGroupConfig } from "./group-migration.js"; import { resolveTelegramInlineButtonsScope } from "./inline-buttons.js"; import { @@ -227,11 +236,7 @@ export const registerTelegramHandlers = ({ } } - const storeAllowFrom = await readChannelAllowFromStore( - "telegram", - process.env, - accountId, - ).catch(() => []); + const storeAllowFrom = await loadStoreAllowFrom(); await processMessage(primaryEntry.ctx, allMedia, storeAllowFrom); } catch (err) { runtime.error?.(danger(`media group handler failed: ${String(err)}`)); @@ -262,11 +267,7 @@ export const registerTelegramHandlers = ({ date: last.msg.date ?? first.msg.date, }; - const storeAllowFrom = await readChannelAllowFromStore( - "telegram", - process.env, - accountId, - ).catch(() => []); + const storeAllowFrom = await loadStoreAllowFrom(); const baseCtx = first.ctx; const getFile = typeof baseCtx.getFile === "function" ? baseCtx.getFile.bind(baseCtx) : async () => ({}); @@ -282,19 +283,170 @@ export const registerTelegramHandlers = ({ } }; + const queueTextFragmentFlush = async (entry: TextFragmentEntry) => { + textFragmentProcessing = textFragmentProcessing + .then(async () => { + await flushTextFragments(entry); + }) + .catch(() => undefined); + await textFragmentProcessing; + }; + + const runTextFragmentFlush = async (entry: TextFragmentEntry) => { + textFragmentBuffer.delete(entry.key); + await queueTextFragmentFlush(entry); + }; + const scheduleTextFragmentFlush = (entry: TextFragmentEntry) => { clearTimeout(entry.timer); entry.timer = setTimeout(async () => { - textFragmentBuffer.delete(entry.key); - textFragmentProcessing = textFragmentProcessing - .then(async () => { - await flushTextFragments(entry); - }) - .catch(() => undefined); - await textFragmentProcessing; + await runTextFragmentFlush(entry); }, TELEGRAM_TEXT_FRAGMENT_MAX_GAP_MS); }; + const enqueueMediaGroupFlush = async (mediaGroupId: string, entry: MediaGroupEntry) => { + mediaGroupBuffer.delete(mediaGroupId); + mediaGroupProcessing = mediaGroupProcessing + .then(async () => { + await processMediaGroup(entry); + }) + .catch(() => undefined); + await mediaGroupProcessing; + }; + + const scheduleMediaGroupFlush = (mediaGroupId: string, entry: MediaGroupEntry) => { + clearTimeout(entry.timer); + entry.timer = setTimeout(async () => { + await enqueueMediaGroupFlush(mediaGroupId, entry); + }, mediaGroupTimeoutMs); + }; + + const getOrCreateMediaGroupEntry = (mediaGroupId: string) => { + const existing = mediaGroupBuffer.get(mediaGroupId); + if (existing) { + return existing; + } + const entry: MediaGroupEntry = { + messages: [], + timer: setTimeout(() => undefined, mediaGroupTimeoutMs), + }; + mediaGroupBuffer.set(mediaGroupId, entry); + return entry; + }; + + const loadStoreAllowFrom = async () => + readChannelAllowFromStore("telegram", process.env, accountId).catch(() => []); + + const isAllowlistAuthorized = ( + allow: NormalizedAllowFrom, + senderId: string, + senderUsername: string, + ) => + allow.hasWildcard || + (allow.hasEntries && + isSenderAllowed({ + allow, + senderId, + senderUsername, + })); + + const shouldSkipGroupMessage = (params: { + isGroup: boolean; + chatId: string | number; + chatTitle?: string; + resolvedThreadId?: number; + senderId: string; + senderUsername: string; + effectiveGroupAllow: NormalizedAllowFrom; + hasGroupAllowOverride: boolean; + groupConfig?: TelegramGroupConfig; + topicConfig?: TelegramTopicConfig; + }) => { + const { + isGroup, + chatId, + chatTitle, + resolvedThreadId, + senderId, + senderUsername, + effectiveGroupAllow, + hasGroupAllowOverride, + groupConfig, + topicConfig, + } = params; + const baseAccess = evaluateTelegramGroupBaseAccess({ + isGroup, + groupConfig, + topicConfig, + hasGroupAllowOverride, + effectiveGroupAllow, + senderId, + senderUsername, + enforceAllowOverride: true, + requireSenderForAllowOverride: true, + }); + if (!baseAccess.allowed) { + if (baseAccess.reason === "group-disabled") { + logVerbose(`Blocked telegram group ${chatId} (group disabled)`); + return true; + } + if (baseAccess.reason === "topic-disabled") { + logVerbose( + `Blocked telegram topic ${chatId} (${resolvedThreadId ?? "unknown"}) (topic disabled)`, + ); + return true; + } + logVerbose( + `Blocked telegram group sender ${senderId || "unknown"} (group allowFrom override)`, + ); + return true; + } + if (!isGroup) { + return false; + } + const policyAccess = evaluateTelegramGroupPolicyAccess({ + isGroup, + chatId, + cfg, + telegramCfg, + topicConfig, + groupConfig, + effectiveGroupAllow, + senderId, + senderUsername, + resolveGroupPolicy, + enforcePolicy: true, + useTopicAndGroupOverrides: true, + enforceAllowlistAuthorization: true, + allowEmptyAllowlistEntries: false, + requireSenderForAllowlistAuthorization: true, + checkChatAllowlist: true, + }); + if (!policyAccess.allowed) { + if (policyAccess.reason === "group-policy-disabled") { + logVerbose("Blocked telegram group message (groupPolicy: disabled)"); + return true; + } + if (policyAccess.reason === "group-policy-allowlist-no-sender") { + logVerbose("Blocked telegram group message (no sender ID, groupPolicy: allowlist)"); + return true; + } + if (policyAccess.reason === "group-policy-allowlist-empty") { + logVerbose( + "Blocked telegram group message (groupPolicy: allowlist, no group allowlist entries)", + ); + return true; + } + if (policyAccess.reason === "group-policy-allowlist-unauthorized") { + logVerbose(`Blocked telegram group message from ${senderId} (groupPolicy: allowlist)`); + return true; + } + logger.info({ chatId, title: chatTitle, reason: "not-allowed" }, "skipping group message"); + return true; + } + return false; + }; + bot.on("callback_query", async (ctx) => { const callback = ctx.callbackQuery; if (!callback) { @@ -303,11 +455,15 @@ export const registerTelegramHandlers = ({ if (shouldSkipUpdate(ctx)) { return; } + const answerCallbackQuery = + typeof (ctx as { answerCallbackQuery?: unknown }).answerCallbackQuery === "function" + ? () => ctx.answerCallbackQuery() + : () => bot.api.answerCallbackQuery(callback.id); // Answer immediately to prevent Telegram from retrying while we process await withTelegramApiErrorLogging({ operation: "answerCallbackQuery", runtime, - fn: () => bot.api.answerCallbackQuery(callback.id), + fn: answerCallbackQuery, }).catch(() => {}); try { const data = (callback.data ?? "").trim(); @@ -315,6 +471,38 @@ export const registerTelegramHandlers = ({ if (!data || !callbackMessage) { return; } + const editCallbackMessage = async ( + text: string, + params?: Parameters[3], + ) => { + const editTextFn = (ctx as { editMessageText?: unknown }).editMessageText; + if (typeof editTextFn === "function") { + return await ctx.editMessageText(text, params); + } + return await bot.api.editMessageText( + callbackMessage.chat.id, + callbackMessage.message_id, + text, + params, + ); + }; + const deleteCallbackMessage = async () => { + const deleteFn = (ctx as { deleteMessage?: unknown }).deleteMessage; + if (typeof deleteFn === "function") { + return await ctx.deleteMessage(); + } + return await bot.api.deleteMessage(callbackMessage.chat.id, callbackMessage.message_id); + }; + const replyToCallbackChat = async ( + text: string, + params?: Parameters[2], + ) => { + const replyFn = (ctx as { reply?: unknown }).reply; + if (typeof replyFn === "function") { + return await ctx.reply(text, params); + } + return await bot.api.sendMessage(callbackMessage.chat.id, text, params); + }; const inlineButtonsScope = resolveTelegramInlineButtonsScope({ cfg, @@ -344,8 +532,14 @@ export const registerTelegramHandlers = ({ groupAllowFrom, resolveTelegramGroupConfig, }); - const { resolvedThreadId, storeAllowFrom, groupConfig, topicConfig, effectiveGroupAllow } = - groupAllowContext; + const { + resolvedThreadId, + storeAllowFrom, + groupConfig, + topicConfig, + effectiveGroupAllow, + hasGroupAllowOverride, + } = groupAllowContext; const effectiveDmAllow = normalizeAllowFromWithStore({ allowFrom: telegramCfg.allowFrom, storeAllowFrom, @@ -353,75 +547,21 @@ export const registerTelegramHandlers = ({ const dmPolicy = telegramCfg.dmPolicy ?? "pairing"; const senderId = callback.from?.id ? String(callback.from.id) : ""; const senderUsername = callback.from?.username ?? ""; - - if (isGroup) { - if (groupConfig?.enabled === false) { - logVerbose(`Blocked telegram group ${chatId} (group disabled)`); - return; - } - if (topicConfig?.enabled === false) { - logVerbose( - `Blocked telegram topic ${chatId} (${resolvedThreadId ?? "unknown"}) (topic disabled)`, - ); - return; - } - if (groupAllowContext.hasGroupAllowOverride) { - const allowed = - senderId && - isSenderAllowed({ - allow: effectiveGroupAllow, - senderId, - senderUsername, - }); - if (!allowed) { - logVerbose( - `Blocked telegram group sender ${senderId || "unknown"} (group allowFrom override)`, - ); - return; - } - } - const defaultGroupPolicy = cfg.channels?.defaults?.groupPolicy; - const groupPolicy = firstDefined( - topicConfig?.groupPolicy, - groupConfig?.groupPolicy, - telegramCfg.groupPolicy, - defaultGroupPolicy, - "open", - ); - if (groupPolicy === "disabled") { - logVerbose(`Blocked telegram group message (groupPolicy: disabled)`); - return; - } - if (groupPolicy === "allowlist") { - if (!senderId) { - logVerbose(`Blocked telegram group message (no sender ID, groupPolicy: allowlist)`); - return; - } - if (!effectiveGroupAllow.hasEntries) { - logVerbose( - "Blocked telegram group message (groupPolicy: allowlist, no group allowlist entries)", - ); - return; - } - if ( - !isSenderAllowed({ - allow: effectiveGroupAllow, - senderId, - senderUsername, - }) - ) { - logVerbose(`Blocked telegram group message from ${senderId} (groupPolicy: allowlist)`); - return; - } - } - const groupAllowlist = resolveGroupPolicy(chatId); - if (groupAllowlist.allowlistEnabled && !groupAllowlist.allowed) { - logger.info( - { chatId, title: callbackMessage.chat.title, reason: "not-allowed" }, - "skipping group message", - ); - return; - } + if ( + shouldSkipGroupMessage({ + isGroup, + chatId, + chatTitle: callbackMessage.chat.title, + resolvedThreadId, + senderId, + senderUsername, + effectiveGroupAllow, + hasGroupAllowOverride, + groupConfig, + topicConfig, + }) + ) { + return; } if (inlineButtonsScope === "allowlist") { @@ -430,27 +570,13 @@ export const registerTelegramHandlers = ({ return; } if (dmPolicy !== "open") { - const allowed = - effectiveDmAllow.hasWildcard || - (effectiveDmAllow.hasEntries && - isSenderAllowed({ - allow: effectiveDmAllow, - senderId, - senderUsername, - })); + const allowed = isAllowlistAuthorized(effectiveDmAllow, senderId, senderUsername); if (!allowed) { return; } } } else { - const allowed = - effectiveGroupAllow.hasWildcard || - (effectiveGroupAllow.hasEntries && - isSenderAllowed({ - allow: effectiveGroupAllow, - senderId, - senderUsername, - })); + const allowed = isAllowlistAuthorized(effectiveGroupAllow, senderId, senderUsername); if (!allowed) { return; } @@ -487,12 +613,7 @@ export const registerTelegramHandlers = ({ : undefined; try { - await bot.api.editMessageText( - callbackMessage.chat.id, - callbackMessage.message_id, - result.text, - keyboard ? { reply_markup: keyboard } : undefined, - ); + await editCallbackMessage(result.text, keyboard ? { reply_markup: keyboard } : undefined); } catch (editErr) { const errStr = String(editErr); if (!errStr.includes("message is not modified")) { @@ -514,23 +635,14 @@ export const registerTelegramHandlers = ({ ) => { const keyboard = buildInlineKeyboard(buttons); try { - await bot.api.editMessageText( - callbackMessage.chat.id, - callbackMessage.message_id, - text, - keyboard ? { reply_markup: keyboard } : undefined, - ); + await editCallbackMessage(text, keyboard ? { reply_markup: keyboard } : undefined); } catch (editErr) { const errStr = String(editErr); if (errStr.includes("no text in the message")) { try { - await bot.api.deleteMessage(callbackMessage.chat.id, callbackMessage.message_id); + await deleteCallbackMessage(); } catch {} - await bot.api.sendMessage( - callbackMessage.chat.id, - text, - keyboard ? { reply_markup: keyboard } : undefined, - ); + await replyToCallbackChat(text, keyboard ? { reply_markup: keyboard } : undefined); } else if (!errStr.includes("message is not modified")) { throw editErr; } @@ -723,85 +835,23 @@ export const registerTelegramHandlers = ({ hasGroupAllowOverride, } = groupAllowContext; - if (isGroup) { - if (groupConfig?.enabled === false) { - logVerbose(`Blocked telegram group ${chatId} (group disabled)`); - return; - } - if (topicConfig?.enabled === false) { - logVerbose( - `Blocked telegram topic ${chatId} (${resolvedThreadId ?? "unknown"}) (topic disabled)`, - ); - return; - } - if (hasGroupAllowOverride) { - const senderId = msg.from?.id; - const senderUsername = msg.from?.username ?? ""; - const allowed = - senderId != null && - isSenderAllowed({ - allow: effectiveGroupAllow, - senderId: String(senderId), - senderUsername, - }); - if (!allowed) { - logVerbose( - `Blocked telegram group sender ${senderId ?? "unknown"} (group allowFrom override)`, - ); - return; - } - } - // Group policy filtering: controls how group messages are handled - // - "open": groups bypass allowFrom, only mention-gating applies - // - "disabled": block all group messages entirely - // - "allowlist": only allow group messages from senders in groupAllowFrom/allowFrom - const defaultGroupPolicy = cfg.channels?.defaults?.groupPolicy; - const groupPolicy = firstDefined( - topicConfig?.groupPolicy, - groupConfig?.groupPolicy, - telegramCfg.groupPolicy, - defaultGroupPolicy, - "open", - ); - if (groupPolicy === "disabled") { - logVerbose(`Blocked telegram group message (groupPolicy: disabled)`); - return; - } - if (groupPolicy === "allowlist") { - // For allowlist mode, the sender (msg.from.id) must be in allowFrom - const senderId = msg.from?.id; - if (senderId == null) { - logVerbose(`Blocked telegram group message (no sender ID, groupPolicy: allowlist)`); - return; - } - if (!effectiveGroupAllow.hasEntries) { - logVerbose( - "Blocked telegram group message (groupPolicy: allowlist, no group allowlist entries)", - ); - return; - } - const senderUsername = msg.from?.username ?? ""; - if ( - !isSenderAllowed({ - allow: effectiveGroupAllow, - senderId: String(senderId), - senderUsername, - }) - ) { - logVerbose(`Blocked telegram group message from ${senderId} (groupPolicy: allowlist)`); - return; - } - } - - // Group allowlist based on configured group IDs. - const groupAllowlist = resolveGroupPolicy(chatId); - if (groupAllowlist.allowlistEnabled && !groupAllowlist.allowed) { - logger.info( - { chatId, title: msg.chat.title, reason: "not-allowed" }, - "skipping group message", - ); - return; - } + const senderId = msg.from?.id != null ? String(msg.from.id) : ""; + const senderUsername = msg.from?.username ?? ""; + if ( + shouldSkipGroupMessage({ + isGroup, + chatId, + chatTitle: msg.chat.title, + resolvedThreadId, + senderId, + senderUsername, + effectiveGroupAllow, + hasGroupAllowOverride, + groupConfig, + topicConfig, + }) + ) { + return; } // Text fragment handling - Telegram splits long pastes into multiple inbound messages (~4096 chars). @@ -844,13 +894,7 @@ export const registerTelegramHandlers = ({ // Not appendable (or limits exceeded): flush buffered entry first, then continue normally. clearTimeout(existing.timer); - textFragmentBuffer.delete(key); - textFragmentProcessing = textFragmentProcessing - .then(async () => { - await flushTextFragments(existing); - }) - .catch(() => undefined); - await textFragmentProcessing; + await runTextFragmentFlush(existing); } const shouldStart = text.length >= TELEGRAM_TEXT_FRAGMENT_START_THRESHOLD_CHARS; @@ -869,34 +913,9 @@ export const registerTelegramHandlers = ({ // Media group handling - buffer multi-image messages const mediaGroupId = msg.media_group_id; if (mediaGroupId) { - const existing = mediaGroupBuffer.get(mediaGroupId); - if (existing) { - clearTimeout(existing.timer); - existing.messages.push({ msg, ctx }); - existing.timer = setTimeout(async () => { - mediaGroupBuffer.delete(mediaGroupId); - mediaGroupProcessing = mediaGroupProcessing - .then(async () => { - await processMediaGroup(existing); - }) - .catch(() => undefined); - await mediaGroupProcessing; - }, mediaGroupTimeoutMs); - } else { - const entry: MediaGroupEntry = { - messages: [{ msg, ctx }], - timer: setTimeout(async () => { - mediaGroupBuffer.delete(mediaGroupId); - mediaGroupProcessing = mediaGroupProcessing - .then(async () => { - await processMediaGroup(entry); - }) - .catch(() => undefined); - await mediaGroupProcessing; - }, mediaGroupTimeoutMs), - }; - mediaGroupBuffer.set(mediaGroupId, entry); - } + const entry = getOrCreateMediaGroupEntry(mediaGroupId); + entry.messages.push({ msg, ctx }); + scheduleMediaGroupFlush(mediaGroupId, entry); return; } @@ -938,7 +957,6 @@ export const registerTelegramHandlers = ({ }, ] : []; - const senderId = msg.from?.id ? String(msg.from.id) : ""; const conversationKey = resolvedThreadId != null ? `${chatId}:topic:${resolvedThreadId}` : String(chatId); const debounceKey = senderId diff --git a/src/telegram/bot-message-context.ts b/src/telegram/bot-message-context.ts index a8ae50db178..78db031fe12 100644 --- a/src/telegram/bot-message-context.ts +++ b/src/telegram/bot-message-context.ts @@ -56,6 +56,7 @@ import { hasBotMention, resolveTelegramThreadSpec, } from "./bot/helpers.js"; +import { evaluateTelegramGroupBaseAccess } from "./group-access.js"; export type TelegramMediaRef = { path: string; @@ -192,15 +193,31 @@ export const buildTelegramMessageContext = async ({ storeAllowFrom, }); const hasGroupAllowOverride = typeof groupAllowOverride !== "undefined"; - - if (isGroup && groupConfig?.enabled === false) { - logVerbose(`Blocked telegram group ${chatId} (group disabled)`); - return null; - } - if (isGroup && topicConfig?.enabled === false) { - logVerbose( - `Blocked telegram topic ${chatId} (${resolvedThreadId ?? "unknown"}) (topic disabled)`, - ); + const senderId = msg.from?.id ? String(msg.from.id) : ""; + const senderUsername = msg.from?.username ?? ""; + const baseAccess = evaluateTelegramGroupBaseAccess({ + isGroup, + groupConfig, + topicConfig, + hasGroupAllowOverride, + effectiveGroupAllow, + senderId, + senderUsername, + enforceAllowOverride: true, + requireSenderForAllowOverride: false, + }); + if (!baseAccess.allowed) { + if (baseAccess.reason === "group-disabled") { + logVerbose(`Blocked telegram group ${chatId} (group disabled)`); + return null; + } + if (baseAccess.reason === "topic-disabled") { + logVerbose( + `Blocked telegram topic ${chatId} (${resolvedThreadId ?? "unknown"}) (topic disabled)`, + ); + return null; + } + logVerbose(`Blocked telegram group sender ${senderId || "unknown"} (group allowFrom override)`); return null; } @@ -320,21 +337,6 @@ export const buildTelegramMessageContext = async ({ } const botUsername = primaryCtx.me?.username?.toLowerCase(); - const senderId = msg.from?.id ? String(msg.from.id) : ""; - const senderUsername = msg.from?.username ?? ""; - if (isGroup && hasGroupAllowOverride) { - const allowed = isSenderAllowed({ - allow: effectiveGroupAllow, - senderId, - senderUsername, - }); - if (!allowed) { - logVerbose( - `Blocked telegram group sender ${senderId || "unknown"} (group allowFrom override)`, - ); - return null; - } - } const allowForCommands = isGroup ? effectiveGroupAllow : effectiveDmAllow; const senderAllowedForCommands = isSenderAllowed({ allow: allowForCommands, diff --git a/src/telegram/bot-native-commands.ts b/src/telegram/bot-native-commands.ts index 468726e2bb8..29636f6cca2 100644 --- a/src/telegram/bot-native-commands.ts +++ b/src/telegram/bot-native-commands.ts @@ -55,6 +55,10 @@ import { resolveTelegramGroupAllowFromContext, resolveTelegramThreadSpec, } from "./bot/helpers.js"; +import { + evaluateTelegramGroupBaseAccess, + evaluateTelegramGroupPolicyAccess, +} from "./group-access.js"; import { buildInlineKeyboard } from "./send.js"; const EMPTY_RESPONSE_FALLBACK = "No response generated. Please try again."; @@ -172,18 +176,9 @@ async function resolveTelegramCommandAuth(params: { effectiveGroupAllow, hasGroupAllowOverride, } = groupAllowContext; - const senderIdRaw = msg.from?.id; - const senderId = senderIdRaw ? String(senderIdRaw) : ""; + const senderId = msg.from?.id ? String(msg.from.id) : ""; const senderUsername = msg.from?.username ?? ""; - const isGroupSenderAllowed = () => - senderIdRaw != null && - isSenderAllowed({ - allow: effectiveGroupAllow, - senderId: String(senderIdRaw), - senderUsername, - }); - const rejectNotAuthorized = async () => { await withTelegramApiErrorLogging({ operation: "sendMessage", @@ -192,43 +187,68 @@ async function resolveTelegramCommandAuth(params: { return null; }; - if (isGroup && groupConfig?.enabled === false) { - await withTelegramApiErrorLogging({ - operation: "sendMessage", - fn: () => bot.api.sendMessage(chatId, "This group is disabled."), - }); - return null; - } - if (isGroup && topicConfig?.enabled === false) { - await withTelegramApiErrorLogging({ - operation: "sendMessage", - fn: () => bot.api.sendMessage(chatId, "This topic is disabled."), - }); - return null; - } - if (requireAuth && isGroup && hasGroupAllowOverride) { - if (!isGroupSenderAllowed()) { - return await rejectNotAuthorized(); + const baseAccess = evaluateTelegramGroupBaseAccess({ + isGroup, + groupConfig, + topicConfig, + hasGroupAllowOverride, + effectiveGroupAllow, + senderId, + senderUsername, + enforceAllowOverride: requireAuth, + requireSenderForAllowOverride: true, + }); + if (!baseAccess.allowed) { + if (baseAccess.reason === "group-disabled") { + await withTelegramApiErrorLogging({ + operation: "sendMessage", + fn: () => bot.api.sendMessage(chatId, "This group is disabled."), + }); + return null; } + if (baseAccess.reason === "topic-disabled") { + await withTelegramApiErrorLogging({ + operation: "sendMessage", + fn: () => bot.api.sendMessage(chatId, "This topic is disabled."), + }); + return null; + } + return await rejectNotAuthorized(); } - if (isGroup && useAccessGroups) { - const defaultGroupPolicy = cfg.channels?.defaults?.groupPolicy; - const groupPolicy = telegramCfg.groupPolicy ?? defaultGroupPolicy ?? "open"; - if (groupPolicy === "disabled") { + const policyAccess = evaluateTelegramGroupPolicyAccess({ + isGroup, + chatId, + cfg, + telegramCfg, + topicConfig, + groupConfig, + effectiveGroupAllow, + senderId, + senderUsername, + resolveGroupPolicy, + enforcePolicy: useAccessGroups, + useTopicAndGroupOverrides: false, + enforceAllowlistAuthorization: requireAuth, + allowEmptyAllowlistEntries: true, + requireSenderForAllowlistAuthorization: true, + checkChatAllowlist: useAccessGroups, + }); + if (!policyAccess.allowed) { + if (policyAccess.reason === "group-policy-disabled") { await withTelegramApiErrorLogging({ operation: "sendMessage", fn: () => bot.api.sendMessage(chatId, "Telegram group commands are disabled."), }); return null; } - if (groupPolicy === "allowlist" && requireAuth) { - if (!isGroupSenderAllowed()) { - return await rejectNotAuthorized(); - } + if ( + policyAccess.reason === "group-policy-allowlist-no-sender" || + policyAccess.reason === "group-policy-allowlist-unauthorized" + ) { + return await rejectNotAuthorized(); } - const groupAllowlist = resolveGroupPolicy(chatId); - if (groupAllowlist.allowlistEnabled && !groupAllowlist.allowed) { + if (policyAccess.reason === "group-chat-not-allowed") { await withTelegramApiErrorLogging({ operation: "sendMessage", fn: () => bot.api.sendMessage(chatId, "This group is not allowed."), @@ -357,6 +377,41 @@ export const registerTelegramNativeCommands = ({ // Keep hidden commands callable by registering handlers for the full catalog. syncTelegramMenuCommands({ bot, runtime, commandsToRegister }); + const resolveCommandRuntimeContext = (params: { + msg: NonNullable; + isGroup: boolean; + isForum: boolean; + resolvedThreadId?: number; + }) => { + const { msg, isGroup, isForum, resolvedThreadId } = params; + const chatId = msg.chat.id; + const messageThreadId = (msg as { message_thread_id?: number }).message_thread_id; + const threadSpec = resolveTelegramThreadSpec({ + isGroup, + isForum, + messageThreadId, + }); + const parentPeer = buildTelegramParentPeer({ isGroup, resolvedThreadId, chatId }); + const route = resolveAgentRoute({ + cfg, + channel: "telegram", + accountId, + peer: { + kind: isGroup ? "group" : "direct", + id: isGroup ? buildTelegramGroupPeerId(chatId, resolvedThreadId) : String(chatId), + }, + parentPeer, + }); + const mediaLocalRoots = getAgentScopedMediaLocalRoots(cfg, route.agentId); + const tableMode = resolveMarkdownTableMode({ + cfg, + channel: "telegram", + accountId: route.accountId, + }); + const chunkMode = resolveChunkMode(cfg, "telegram", route.accountId); + return { chatId, threadSpec, route, mediaLocalRoots, tableMode, chunkMode }; + }; + if (commandsToRegister.length > 0 || pluginCatalog.commands.length > 0) { if (typeof (bot as unknown as { command?: unknown }).command !== "function") { logVerbose("telegram: bot.command unavailable; skipping native handlers"); @@ -397,12 +452,13 @@ export const registerTelegramNativeCommands = ({ topicConfig, commandAuthorized, } = auth; - const messageThreadId = (msg as { message_thread_id?: number }).message_thread_id; - const threadSpec = resolveTelegramThreadSpec({ - isGroup, - isForum, - messageThreadId, - }); + const { threadSpec, route, mediaLocalRoots, tableMode, chunkMode } = + resolveCommandRuntimeContext({ + msg, + isGroup, + isForum, + resolvedThreadId, + }); const threadParams = buildTelegramThreadParams(threadSpec) ?? {}; const commandDefinition = findCommandByNativeName(command.name, "telegram"); @@ -455,18 +511,6 @@ export const registerTelegramNativeCommands = ({ }); return; } - const parentPeer = buildTelegramParentPeer({ isGroup, resolvedThreadId, chatId }); - const route = resolveAgentRoute({ - cfg, - channel: "telegram", - accountId, - peer: { - kind: isGroup ? "group" : "direct", - id: isGroup ? buildTelegramGroupPeerId(chatId, resolvedThreadId) : String(chatId), - }, - parentPeer, - }); - const mediaLocalRoots = getAgentScopedMediaLocalRoots(cfg, route.agentId); const baseSessionKey = route.sessionKey; // DMs: use raw messageThreadId for thread sessions (not resolvedThreadId which is for forums) const dmThreadId = threadSpec.scope === "dm" ? threadSpec.id : undefined; @@ -478,11 +522,6 @@ export const registerTelegramNativeCommands = ({ }) : null; const sessionKey = threadKeys?.sessionKey ?? baseSessionKey; - const tableMode = resolveMarkdownTableMode({ - cfg, - channel: "telegram", - accountId: route.accountId, - }); const skillFilter = firstDefined(topicConfig?.skills, groupConfig?.skills); const systemPromptParts = [ groupConfig?.systemPrompt?.trim() || null, @@ -530,7 +569,6 @@ export const registerTelegramNativeCommands = ({ typeof telegramCfg.blockStreaming === "boolean" ? !telegramCfg.blockStreaming : undefined; - const chunkMode = resolveChunkMode(cfg, "telegram", route.accountId); const deliveryState = { delivered: false, @@ -640,24 +678,13 @@ export const registerTelegramNativeCommands = ({ return; } const { senderId, commandAuthorized, isGroup, isForum, resolvedThreadId } = auth; - const messageThreadId = (msg as { message_thread_id?: number }).message_thread_id; - const threadSpec = resolveTelegramThreadSpec({ - isGroup, - isForum, - messageThreadId, - }); - const parentPeer = buildTelegramParentPeer({ isGroup, resolvedThreadId, chatId }); - const route = resolveAgentRoute({ - cfg, - channel: "telegram", - accountId, - peer: { - kind: isGroup ? "group" : "direct", - id: isGroup ? buildTelegramGroupPeerId(chatId, resolvedThreadId) : String(chatId), - }, - parentPeer, - }); - const mediaLocalRoots = getAgentScopedMediaLocalRoots(cfg, route.agentId); + const { threadSpec, mediaLocalRoots, tableMode, chunkMode } = + resolveCommandRuntimeContext({ + msg, + isGroup, + isForum, + resolvedThreadId, + }); const from = isGroup ? buildTelegramGroupFrom(chatId, threadSpec.id) : `telegram:${chatId}`; @@ -676,12 +703,6 @@ export const registerTelegramNativeCommands = ({ accountId, messageThreadId: threadSpec.id, }); - const tableMode = resolveMarkdownTableMode({ - cfg, - channel: "telegram", - accountId: route.accountId, - }); - const chunkMode = resolveChunkMode(cfg, "telegram", route.accountId); await deliverReplies({ replies: [result], diff --git a/src/telegram/download.ts b/src/telegram/download.ts deleted file mode 100644 index 8da41eab312..00000000000 --- a/src/telegram/download.ts +++ /dev/null @@ -1,57 +0,0 @@ -import { detectMime } from "../media/mime.js"; -import { type SavedMedia, saveMediaBuffer } from "../media/store.js"; - -export type TelegramFileInfo = { - file_id: string; - file_unique_id?: string; - file_size?: number; - file_path?: string; -}; - -export async function getTelegramFile( - token: string, - fileId: string, - timeoutMs = 30_000, -): Promise { - const res = await fetch( - `https://api.telegram.org/bot${token}/getFile?file_id=${encodeURIComponent(fileId)}`, - { signal: AbortSignal.timeout(timeoutMs) }, - ); - if (!res.ok) { - throw new Error(`getFile failed: ${res.status} ${res.statusText}`); - } - const json = (await res.json()) as { ok: boolean; result?: TelegramFileInfo }; - if (!json.ok || !json.result?.file_path) { - throw new Error("getFile returned no file_path"); - } - return json.result; -} - -export async function downloadTelegramFile( - token: string, - info: TelegramFileInfo, - maxBytes?: number, - timeoutMs = 60_000, -): Promise { - if (!info.file_path) { - throw new Error("file_path missing"); - } - const url = `https://api.telegram.org/file/bot${token}/${info.file_path}`; - const res = await fetch(url, { signal: AbortSignal.timeout(timeoutMs) }); - if (!res.ok || !res.body) { - throw new Error(`Failed to download telegram file: HTTP ${res.status}`); - } - const array = Buffer.from(await res.arrayBuffer()); - const mime = await detectMime({ - buffer: array, - headerMime: res.headers.get("content-type"), - filePath: info.file_path, - }); - // save with inbound subdir - const saved = await saveMediaBuffer(array, mime, "inbound", maxBytes, info.file_path); - // Ensure extension matches mime if possible - if (!saved.contentType && mime) { - saved.contentType = mime; - } - return saved; -} diff --git a/src/telegram/fetch.test.ts b/src/telegram/fetch.test.ts index a4f0d88c297..c20d3ad5e9a 100644 --- a/src/telegram/fetch.test.ts +++ b/src/telegram/fetch.test.ts @@ -1,5 +1,4 @@ import { afterEach, describe, expect, it, vi } from "vitest"; -import { downloadTelegramFile, getTelegramFile, type TelegramFileInfo } from "./download.js"; import { resetTelegramFetchStateForTests, resolveTelegramFetch } from "./fetch.js"; const setDefaultAutoSelectFamily = vi.hoisted(() => vi.fn()); @@ -61,36 +60,3 @@ describe("resolveTelegramFetch", () => { expect(setDefaultAutoSelectFamily).toHaveBeenCalledWith(false); }); }); - -describe("telegram download", () => { - it("fetches file info", async () => { - const json = vi.fn().mockResolvedValue({ ok: true, result: { file_path: "photos/1.jpg" } }); - vi.spyOn(globalThis, "fetch" as never).mockResolvedValueOnce({ - ok: true, - status: 200, - statusText: "OK", - json, - } as Response); - const info = await getTelegramFile("tok", "fid"); - expect(info.file_path).toBe("photos/1.jpg"); - }); - - it("downloads and saves", async () => { - const info: TelegramFileInfo = { - file_id: "fid", - file_path: "photos/1.jpg", - }; - const arrayBuffer = async () => new Uint8Array([1, 2, 3, 4]).buffer; - vi.spyOn(globalThis, "fetch" as never).mockResolvedValueOnce({ - ok: true, - status: 200, - statusText: "OK", - body: true, - arrayBuffer, - headers: { get: () => "image/jpeg" }, - } as Response); - const saved = await downloadTelegramFile("tok", info, 1024 * 1024); - expect(saved.path).toBeTruthy(); - expect(saved.contentType).toBe("image/jpeg"); - }); -}); diff --git a/src/telegram/group-access.ts b/src/telegram/group-access.ts new file mode 100644 index 00000000000..02375218171 --- /dev/null +++ b/src/telegram/group-access.ts @@ -0,0 +1,141 @@ +import type { OpenClawConfig } from "../config/config.js"; +import type { ChannelGroupPolicy } from "../config/group-policy.js"; +import type { + TelegramAccountConfig, + TelegramGroupConfig, + TelegramTopicConfig, +} from "../config/types.js"; +import { isSenderAllowed, type NormalizedAllowFrom } from "./bot-access.js"; +import { firstDefined } from "./bot-access.js"; + +export type TelegramGroupBaseBlockReason = + | "group-disabled" + | "topic-disabled" + | "group-override-unauthorized"; + +export type TelegramGroupBaseAccessResult = + | { allowed: true } + | { allowed: false; reason: TelegramGroupBaseBlockReason }; + +export const evaluateTelegramGroupBaseAccess = (params: { + isGroup: boolean; + groupConfig?: TelegramGroupConfig; + topicConfig?: TelegramTopicConfig; + hasGroupAllowOverride: boolean; + effectiveGroupAllow: NormalizedAllowFrom; + senderId?: string; + senderUsername?: string; + enforceAllowOverride: boolean; + requireSenderForAllowOverride: boolean; +}): TelegramGroupBaseAccessResult => { + if (!params.isGroup) { + return { allowed: true }; + } + if (params.groupConfig?.enabled === false) { + return { allowed: false, reason: "group-disabled" }; + } + if (params.topicConfig?.enabled === false) { + return { allowed: false, reason: "topic-disabled" }; + } + if (!params.enforceAllowOverride || !params.hasGroupAllowOverride) { + return { allowed: true }; + } + + const senderId = params.senderId ?? ""; + if (params.requireSenderForAllowOverride && !senderId) { + return { allowed: false, reason: "group-override-unauthorized" }; + } + + const allowed = isSenderAllowed({ + allow: params.effectiveGroupAllow, + senderId, + senderUsername: params.senderUsername ?? "", + }); + if (!allowed) { + return { allowed: false, reason: "group-override-unauthorized" }; + } + return { allowed: true }; +}; + +export type TelegramGroupPolicyBlockReason = + | "group-policy-disabled" + | "group-policy-allowlist-no-sender" + | "group-policy-allowlist-empty" + | "group-policy-allowlist-unauthorized" + | "group-chat-not-allowed"; + +export type TelegramGroupPolicyAccessResult = + | { allowed: true; groupPolicy: "open" | "disabled" | "allowlist" } + | { + allowed: false; + reason: TelegramGroupPolicyBlockReason; + groupPolicy: "open" | "disabled" | "allowlist"; + }; + +export const evaluateTelegramGroupPolicyAccess = (params: { + isGroup: boolean; + chatId: string | number; + cfg: OpenClawConfig; + telegramCfg: TelegramAccountConfig; + topicConfig?: TelegramTopicConfig; + groupConfig?: TelegramGroupConfig; + effectiveGroupAllow: NormalizedAllowFrom; + senderId?: string; + senderUsername?: string; + resolveGroupPolicy: (chatId: string | number) => ChannelGroupPolicy; + enforcePolicy: boolean; + useTopicAndGroupOverrides: boolean; + enforceAllowlistAuthorization: boolean; + allowEmptyAllowlistEntries: boolean; + requireSenderForAllowlistAuthorization: boolean; + checkChatAllowlist: boolean; +}): TelegramGroupPolicyAccessResult => { + const fallbackPolicy = + firstDefined( + params.telegramCfg.groupPolicy, + params.cfg.channels?.defaults?.groupPolicy, + "open", + ) ?? "open"; + const groupPolicy = params.useTopicAndGroupOverrides + ? (firstDefined( + params.topicConfig?.groupPolicy, + params.groupConfig?.groupPolicy, + params.telegramCfg.groupPolicy, + params.cfg.channels?.defaults?.groupPolicy, + "open", + ) ?? "open") + : fallbackPolicy; + + if (!params.isGroup || !params.enforcePolicy) { + return { allowed: true, groupPolicy }; + } + if (groupPolicy === "disabled") { + return { allowed: false, reason: "group-policy-disabled", groupPolicy }; + } + if (groupPolicy === "allowlist" && params.enforceAllowlistAuthorization) { + const senderId = params.senderId ?? ""; + if (params.requireSenderForAllowlistAuthorization && !senderId) { + return { allowed: false, reason: "group-policy-allowlist-no-sender", groupPolicy }; + } + if (!params.allowEmptyAllowlistEntries && !params.effectiveGroupAllow.hasEntries) { + return { allowed: false, reason: "group-policy-allowlist-empty", groupPolicy }; + } + const senderUsername = params.senderUsername ?? ""; + if ( + !isSenderAllowed({ + allow: params.effectiveGroupAllow, + senderId, + senderUsername, + }) + ) { + return { allowed: false, reason: "group-policy-allowlist-unauthorized", groupPolicy }; + } + } + if (params.checkChatAllowlist) { + const groupAllowlist = params.resolveGroupPolicy(params.chatId); + if (groupAllowlist.allowlistEnabled && !groupAllowlist.allowed) { + return { allowed: false, reason: "group-chat-not-allowed", groupPolicy }; + } + } + return { allowed: true, groupPolicy }; +}; diff --git a/src/telegram/index.ts b/src/telegram/index.ts deleted file mode 100644 index 5ffb8dacaf6..00000000000 --- a/src/telegram/index.ts +++ /dev/null @@ -1,4 +0,0 @@ -export { createTelegramBot, createTelegramWebhookCallback } from "./bot.js"; -export { monitorTelegramProvider } from "./monitor.js"; -export { reactMessageTelegram, sendMessageTelegram, sendPollTelegram } from "./send.js"; -export { startTelegramWebhook } from "./webhook.js"; diff --git a/src/telegram/send.ts b/src/telegram/send.ts index ec451844f65..414d080bfc8 100644 --- a/src/telegram/send.ts +++ b/src/telegram/send.ts @@ -63,6 +63,11 @@ type TelegramSendResult = { chatId: string; }; +type TelegramMessageLike = { + message_id?: number; + chat?: { id?: string | number }; +}; + type TelegramReactionOpts = { token?: string; accountId?: string; @@ -76,6 +81,7 @@ const PARSE_ERR_RE = /can't parse entities|parse entities|find end of the entity const THREAD_NOT_FOUND_RE = /400:\s*Bad Request:\s*message thread not found/i; const MESSAGE_NOT_MODIFIED_RE = /400:\s*Bad Request:\s*message is not modified|MESSAGE_NOT_MODIFIED/i; +const CHAT_NOT_FOUND_RE = /400: Bad Request: chat not found/i; const diagLogger = createSubsystemLogger("telegram/diagnostic"); function createTelegramHttpLogger(cfg: ReturnType) { @@ -213,6 +219,123 @@ function removeMessageThreadIdParam( return Object.keys(next).length > 0 ? next : undefined; } +type TelegramApiContext = { + cfg: ReturnType; + account: ResolvedTelegramAccount; + api: Bot["api"]; +}; + +function resolveTelegramApiContext(opts: { + token?: string; + accountId?: string; + api?: Bot["api"]; + cfg?: ReturnType; +}): TelegramApiContext { + const cfg = opts.cfg ?? loadConfig(); + const account = resolveTelegramAccount({ + cfg, + accountId: opts.accountId, + }); + const token = resolveToken(opts.token, account); + const client = resolveTelegramClientOptions(account); + const api = opts.api ?? new Bot(token, client ? { client } : undefined).api; + return { cfg, account, api }; +} + +type TelegramRequestWithDiag = ( + fn: () => Promise, + label?: string, + options?: { shouldLog?: (err: unknown) => boolean }, +) => Promise; + +function createTelegramRequestWithDiag(params: { + cfg: ReturnType; + account: ResolvedTelegramAccount; + retry?: RetryConfig; + verbose?: boolean; + shouldRetry?: (err: unknown) => boolean; + useApiErrorLogging?: boolean; +}): TelegramRequestWithDiag { + const request = createTelegramRetryRunner({ + retry: params.retry, + configRetry: params.account.config.retry, + verbose: params.verbose, + ...(params.shouldRetry ? { shouldRetry: params.shouldRetry } : {}), + }); + const logHttpError = createTelegramHttpLogger(params.cfg); + return ( + fn: () => Promise, + label?: string, + options?: { shouldLog?: (err: unknown) => boolean }, + ) => { + const runRequest = () => request(fn, label); + const call = + params.useApiErrorLogging === false + ? runRequest() + : withTelegramApiErrorLogging({ + operation: label ?? "request", + fn: runRequest, + ...(options?.shouldLog ? { shouldLog: options.shouldLog } : {}), + }); + return call.catch((err) => { + logHttpError(label ?? "request", err); + throw err; + }); + }; +} + +function wrapTelegramChatNotFoundError(err: unknown, params: { chatId: string; input: string }) { + if (!CHAT_NOT_FOUND_RE.test(formatErrorMessage(err))) { + return err; + } + return new Error( + [ + `Telegram send failed: chat not found (chat_id=${params.chatId}).`, + "Likely: bot not started in DM, bot removed from group/channel, group migrated (new -100… id), or wrong bot token.", + `Input was: ${JSON.stringify(params.input)}.`, + ].join(" "), + ); +} + +async function withTelegramThreadFallback( + params: Record | undefined, + label: string, + verbose: boolean | undefined, + attempt: ( + effectiveParams: Record | undefined, + effectiveLabel: string, + ) => Promise, +): Promise { + try { + return await attempt(params, label); + } catch (err) { + if (!hasMessageThreadIdParam(params) || !isTelegramThreadNotFoundError(err)) { + throw err; + } + if (verbose) { + console.warn( + `telegram ${label} failed with message_thread_id, retrying without thread: ${formatErrorMessage(err)}`, + ); + } + const retriedParams = removeMessageThreadIdParam(params); + return await attempt(retriedParams, `${label}-threadless`); + } +} + +function createRequestWithChatNotFound(params: { + requestWithDiag: TelegramRequestWithDiag; + chatId: string; + input: string; +}) { + return async (fn: () => Promise, label: string) => + params.requestWithDiag(fn, label).catch((err) => { + throw wrapTelegramChatNotFoundError(err, { + chatId: params.chatId, + input: params.input, + }); + }); +} + export function buildInlineKeyboard( buttons?: TelegramSendOpts["buttons"], ): InlineKeyboardMarkup | undefined { @@ -242,18 +365,9 @@ export async function sendMessageTelegram( text: string, opts: TelegramSendOpts = {}, ): Promise { - const cfg = loadConfig(); - const account = resolveTelegramAccount({ - cfg, - accountId: opts.accountId, - }); - const token = resolveToken(opts.token, account); + const { cfg, account, api } = resolveTelegramApiContext(opts); const target = parseTelegramTarget(to); const chatId = normalizeChatId(target.chatId); - // Use provided api or create a new Bot instance. The nullish coalescing - // operator ensures api is always defined (Bot.api is always non-null). - const client = resolveTelegramClientOptions(account); - const api = opts.api ?? new Bot(token, client ? { client } : undefined).api; const mediaUrl = opts.mediaUrl?.trim(); const replyMarkup = buildInlineKeyboard(opts.buttons); @@ -277,57 +391,18 @@ export async function sendMessageTelegram( } } const hasThreadParams = Object.keys(threadParams).length > 0; - const request = createTelegramRetryRunner({ + const requestWithDiag = createTelegramRequestWithDiag({ + cfg, + account, retry: opts.retry, - configRetry: account.config.retry, verbose: opts.verbose, shouldRetry: (err) => isRecoverableTelegramNetworkError(err, { context: "send" }), }); - const logHttpError = createTelegramHttpLogger(cfg); - const requestWithDiag = (fn: () => Promise, label?: string) => - withTelegramApiErrorLogging({ - operation: label ?? "request", - fn: () => request(fn, label), - }).catch((err) => { - logHttpError(label ?? "request", err); - throw err; - }); - const wrapChatNotFound = (err: unknown) => { - if (!/400: Bad Request: chat not found/i.test(formatErrorMessage(err))) { - return err; - } - return new Error( - [ - `Telegram send failed: chat not found (chat_id=${chatId}).`, - "Likely: bot not started in DM, bot removed from group/channel, group migrated (new -100… id), or wrong bot token.", - `Input was: ${JSON.stringify(to)}.`, - ].join(" "), - ); - }; - - const sendWithThreadFallback = async ( - params: Record | undefined, - label: string, - attempt: ( - effectiveParams: Record | undefined, - effectiveLabel: string, - ) => Promise, - ): Promise => { - try { - return await attempt(params, label); - } catch (err) { - if (!hasMessageThreadIdParam(params) || !isTelegramThreadNotFoundError(err)) { - throw err; - } - if (opts.verbose) { - console.warn( - `telegram ${label} failed with message_thread_id, retrying without thread: ${formatErrorMessage(err)}`, - ); - } - const retriedParams = removeMessageThreadIdParam(params); - return await attempt(retriedParams, `${label}-threadless`); - } - }; + const requestWithChatNotFound = createRequestWithChatNotFound({ + requestWithDiag, + chatId, + input: to, + }); const textMode = opts.textMode ?? "markdown"; const tableMode = resolveMarkdownTableMode({ @@ -346,48 +421,51 @@ export async function sendMessageTelegram( params?: Record, fallbackText?: string, ) => { - return await sendWithThreadFallback(params, "message", async (effectiveParams, label) => { - const htmlText = renderHtmlText(rawText); - const baseParams = effectiveParams ? { ...effectiveParams } : {}; - if (linkPreviewOptions) { - baseParams.link_preview_options = linkPreviewOptions; - } - const hasBaseParams = Object.keys(baseParams).length > 0; - const sendParams = { - parse_mode: "HTML" as const, - ...baseParams, - ...(opts.silent === true ? { disable_notification: true } : {}), - }; - const res = await requestWithDiag( - () => - api.sendMessage(chatId, htmlText, sendParams as Parameters[2]), - label, - ).catch(async (err) => { - // Telegram rejects malformed HTML (e.g., unsupported tags or entities). - // When that happens, fall back to plain text so the message still delivers. - const errText = formatErrorMessage(err); - if (PARSE_ERR_RE.test(errText)) { - if (opts.verbose) { - console.warn(`telegram HTML parse failed, retrying as plain text: ${errText}`); - } - const fallback = fallbackText ?? rawText; - const plainParams = hasBaseParams - ? (baseParams as Parameters[2]) - : undefined; - return await requestWithDiag( - () => - plainParams - ? api.sendMessage(chatId, fallback, plainParams) - : api.sendMessage(chatId, fallback), - `${label}-plain`, - ).catch((err2) => { - throw wrapChatNotFound(err2); - }); + return await withTelegramThreadFallback( + params, + "message", + opts.verbose, + async (effectiveParams, label) => { + const htmlText = renderHtmlText(rawText); + const baseParams = effectiveParams ? { ...effectiveParams } : {}; + if (linkPreviewOptions) { + baseParams.link_preview_options = linkPreviewOptions; } - throw wrapChatNotFound(err); - }); - return res; - }); + const hasBaseParams = Object.keys(baseParams).length > 0; + const sendParams = { + parse_mode: "HTML" as const, + ...baseParams, + ...(opts.silent === true ? { disable_notification: true } : {}), + }; + const res = await requestWithChatNotFound( + () => + api.sendMessage(chatId, htmlText, sendParams as Parameters[2]), + label, + ).catch(async (err) => { + // Telegram rejects malformed HTML (e.g., unsupported tags or entities). + // When that happens, fall back to plain text so the message still delivers. + const errText = formatErrorMessage(err); + if (PARSE_ERR_RE.test(errText)) { + if (opts.verbose) { + console.warn(`telegram HTML parse failed, retrying as plain text: ${errText}`); + } + const fallback = fallbackText ?? rawText; + const plainParams = hasBaseParams + ? (baseParams as Parameters[2]) + : undefined; + return await requestWithChatNotFound( + () => + plainParams + ? api.sendMessage(chatId, fallback, plainParams) + : api.sendMessage(chatId, fallback), + `${label}-plain`, + ); + } + throw err; + }); + return res; + }, + ); }; if (mediaUrl) { @@ -429,124 +507,105 @@ export async function sendMessageTelegram( ...baseMediaParams, ...(opts.silent === true ? { disable_notification: true } : {}), }; - let result: - | Awaited> - | Awaited> - | Awaited> - | Awaited> - | Awaited> - | Awaited> - | Awaited>; - if (isGif) { - result = await sendWithThreadFallback( + const sendMedia = async ( + label: string, + sender: ( + effectiveParams: Record | undefined, + ) => Promise, + ) => + await withTelegramThreadFallback( mediaParams, - "animation", - async (effectiveParams, label) => - requestWithDiag( - () => - api.sendAnimation( + label, + opts.verbose, + async (effectiveParams, retryLabel) => + requestWithChatNotFound(() => sender(effectiveParams), retryLabel), + ); + + const mediaSender = (() => { + if (isGif) { + return { + label: "animation", + sender: (effectiveParams: Record | undefined) => + api.sendAnimation( + chatId, + file, + effectiveParams as Parameters[2], + ) as Promise, + }; + } + if (kind === "image") { + return { + label: "photo", + sender: (effectiveParams: Record | undefined) => + api.sendPhoto( + chatId, + file, + effectiveParams as Parameters[2], + ) as Promise, + }; + } + if (kind === "video") { + if (isVideoNote) { + return { + label: "video_note", + sender: (effectiveParams: Record | undefined) => + api.sendVideoNote( chatId, file, - effectiveParams as Parameters[2], - ), - label, - ).catch((err) => { - throw wrapChatNotFound(err); - }), - ); - } else if (kind === "image") { - result = await sendWithThreadFallback(mediaParams, "photo", async (effectiveParams, label) => - requestWithDiag( - () => api.sendPhoto(chatId, file, effectiveParams as Parameters[2]), - label, - ).catch((err) => { - throw wrapChatNotFound(err); - }), - ); - } else if (kind === "video") { - if (isVideoNote) { - result = await sendWithThreadFallback( - mediaParams, - "video_note", - async (effectiveParams, label) => - requestWithDiag( - () => - api.sendVideoNote( - chatId, - file, - effectiveParams as Parameters[2], - ), - label, - ).catch((err) => { - throw wrapChatNotFound(err); - }), - ); - } else { - result = await sendWithThreadFallback( - mediaParams, - "video", - async (effectiveParams, label) => - requestWithDiag( - () => - api.sendVideo(chatId, file, effectiveParams as Parameters[2]), - label, - ).catch((err) => { - throw wrapChatNotFound(err); - }), - ); + effectiveParams as Parameters[2], + ) as Promise, + }; + } + return { + label: "video", + sender: (effectiveParams: Record | undefined) => + api.sendVideo( + chatId, + file, + effectiveParams as Parameters[2], + ) as Promise, + }; } - } else if (kind === "audio") { - const { useVoice } = resolveTelegramVoiceSend({ - wantsVoice: opts.asVoice === true, // default false (backward compatible) - contentType: media.contentType, - fileName, - logFallback: logVerbose, - }); - if (useVoice) { - result = await sendWithThreadFallback( - mediaParams, - "voice", - async (effectiveParams, label) => - requestWithDiag( - () => - api.sendVoice(chatId, file, effectiveParams as Parameters[2]), - label, - ).catch((err) => { - throw wrapChatNotFound(err); - }), - ); - } else { - result = await sendWithThreadFallback( - mediaParams, - "audio", - async (effectiveParams, label) => - requestWithDiag( - () => - api.sendAudio(chatId, file, effectiveParams as Parameters[2]), - label, - ).catch((err) => { - throw wrapChatNotFound(err); - }), - ); - } - } else { - result = await sendWithThreadFallback( - mediaParams, - "document", - async (effectiveParams, label) => - requestWithDiag( - () => - api.sendDocument( + if (kind === "audio") { + const { useVoice } = resolveTelegramVoiceSend({ + wantsVoice: opts.asVoice === true, // default false (backward compatible) + contentType: media.contentType, + fileName, + logFallback: logVerbose, + }); + if (useVoice) { + return { + label: "voice", + sender: (effectiveParams: Record | undefined) => + api.sendVoice( chatId, file, - effectiveParams as Parameters[2], - ), - label, - ).catch((err) => { - throw wrapChatNotFound(err); - }), - ); - } + effectiveParams as Parameters[2], + ) as Promise, + }; + } + return { + label: "audio", + sender: (effectiveParams: Record | undefined) => + api.sendAudio( + chatId, + file, + effectiveParams as Parameters[2], + ) as Promise, + }; + } + return { + label: "document", + sender: (effectiveParams: Record | undefined) => + api.sendDocument( + chatId, + file, + effectiveParams as Parameters[2], + ) as Promise, + }; + })(); + + const result = await sendMedia(mediaSender.label, mediaSender.sender); const mediaMessageId = String(result?.message_id ?? "unknown"); const resolvedChatId = String(result?.chat?.id ?? chatId); if (result?.message_id) { @@ -608,31 +667,16 @@ export async function reactMessageTelegram( emoji: string, opts: TelegramReactionOpts = {}, ): Promise<{ ok: true } | { ok: false; warning: string }> { - const cfg = loadConfig(); - const account = resolveTelegramAccount({ - cfg, - accountId: opts.accountId, - }); - const token = resolveToken(opts.token, account); + const { cfg, account, api } = resolveTelegramApiContext(opts); const chatId = normalizeChatId(String(chatIdInput)); const messageId = normalizeMessageId(messageIdInput); - const client = resolveTelegramClientOptions(account); - const api = opts.api ?? new Bot(token, client ? { client } : undefined).api; - const request = createTelegramRetryRunner({ + const requestWithDiag = createTelegramRequestWithDiag({ + cfg, + account, retry: opts.retry, - configRetry: account.config.retry, verbose: opts.verbose, shouldRetry: (err) => isRecoverableTelegramNetworkError(err, { context: "send" }), }); - const logHttpError = createTelegramHttpLogger(cfg); - const requestWithDiag = (fn: () => Promise, label?: string) => - withTelegramApiErrorLogging({ - operation: label ?? "request", - fn: () => request(fn, label), - }).catch((err) => { - logHttpError(label ?? "request", err); - throw err; - }); const remove = opts.remove === true; const trimmedEmoji = emoji.trim(); // Build the reaction array. We cast emoji to the grammY union type since @@ -669,31 +713,16 @@ export async function deleteMessageTelegram( messageIdInput: string | number, opts: TelegramDeleteOpts = {}, ): Promise<{ ok: true }> { - const cfg = loadConfig(); - const account = resolveTelegramAccount({ - cfg, - accountId: opts.accountId, - }); - const token = resolveToken(opts.token, account); + const { cfg, account, api } = resolveTelegramApiContext(opts); const chatId = normalizeChatId(String(chatIdInput)); const messageId = normalizeMessageId(messageIdInput); - const client = resolveTelegramClientOptions(account); - const api = opts.api ?? new Bot(token, client ? { client } : undefined).api; - const request = createTelegramRetryRunner({ + const requestWithDiag = createTelegramRequestWithDiag({ + cfg, + account, retry: opts.retry, - configRetry: account.config.retry, verbose: opts.verbose, shouldRetry: (err) => isRecoverableTelegramNetworkError(err, { context: "send" }), }); - const logHttpError = createTelegramHttpLogger(cfg); - const requestWithDiag = (fn: () => Promise, label?: string) => - withTelegramApiErrorLogging({ - operation: label ?? "request", - fn: () => request(fn, label), - }).catch((err) => { - logHttpError(label ?? "request", err); - throw err; - }); await requestWithDiag(() => api.deleteMessage(chatId, messageId), "deleteMessage"); logVerbose(`[telegram] Deleted message ${messageId} from chat ${chatId}`); return { ok: true }; @@ -720,35 +749,23 @@ export async function editMessageTelegram( text: string, opts: TelegramEditOpts = {}, ): Promise<{ ok: true; messageId: string; chatId: string }> { - const cfg = opts.cfg ?? loadConfig(); - const account = resolveTelegramAccount({ - cfg, - accountId: opts.accountId, + const { cfg, account, api } = resolveTelegramApiContext({ + ...opts, + cfg: opts.cfg, }); - const token = resolveToken(opts.token, account); const chatId = normalizeChatId(String(chatIdInput)); const messageId = normalizeMessageId(messageIdInput); - const client = resolveTelegramClientOptions(account); - const api = opts.api ?? new Bot(token, client ? { client } : undefined).api; - const request = createTelegramRetryRunner({ + const requestWithDiag = createTelegramRequestWithDiag({ + cfg, + account, retry: opts.retry, - configRetry: account.config.retry, verbose: opts.verbose, }); - const logHttpError = createTelegramHttpLogger(cfg); - const requestWithDiag = ( + const requestWithEditShouldLog = ( fn: () => Promise, label?: string, shouldLog?: (err: unknown) => boolean, - ) => - withTelegramApiErrorLogging({ - operation: label ?? "request", - fn: () => request(fn, label), - shouldLog, - }).catch((err) => { - logHttpError(label ?? "request", err); - throw err; - }); + ) => requestWithDiag(fn, label, shouldLog ? { shouldLog } : undefined); const textMode = opts.textMode ?? "markdown"; const tableMode = resolveMarkdownTableMode({ @@ -776,7 +793,7 @@ export async function editMessageTelegram( editParams.reply_markup = replyMarkup; } - await requestWithDiag( + await requestWithEditShouldLog( () => api.editMessageText(chatId, messageId, htmlText, editParams), "editMessage", (err) => !isTelegramMessageNotModifiedError(err), @@ -798,7 +815,7 @@ export async function editMessageTelegram( if (replyMarkup !== undefined) { plainParams.reply_markup = replyMarkup; } - return await requestWithDiag( + return await requestWithEditShouldLog( () => Object.keys(plainParams).length > 0 ? api.editMessageText(chatId, messageId, text, plainParams) @@ -859,90 +876,42 @@ export async function sendStickerTelegram( throw new Error("Telegram sticker file_id is required"); } - const cfg = loadConfig(); - const account = resolveTelegramAccount({ - cfg, - accountId: opts.accountId, - }); - const token = resolveToken(opts.token, account); + const { cfg, account, api } = resolveTelegramApiContext(opts); const target = parseTelegramTarget(to); const chatId = normalizeChatId(target.chatId); - const client = resolveTelegramClientOptions(account); - const api = opts.api ?? new Bot(token, client ? { client } : undefined).api; const messageThreadId = opts.messageThreadId != null ? opts.messageThreadId : target.messageThreadId; const threadSpec = messageThreadId != null ? { id: messageThreadId, scope: "forum" as const } : undefined; const threadIdParams = buildTelegramThreadParams(threadSpec); - const threadParams: Record = threadIdParams ? { ...threadIdParams } : {}; + const threadParams: Record = threadIdParams ? { ...threadIdParams } : {}; if (opts.replyToMessageId != null) { threadParams.reply_to_message_id = Math.trunc(opts.replyToMessageId); } const hasThreadParams = Object.keys(threadParams).length > 0; - const request = createTelegramRetryRunner({ + const requestWithDiag = createTelegramRequestWithDiag({ + cfg, + account, retry: opts.retry, - configRetry: account.config.retry, verbose: opts.verbose, + useApiErrorLogging: false, + }); + const requestWithChatNotFound = createRequestWithChatNotFound({ + requestWithDiag, + chatId, + input: to, }); - const logHttpError = createTelegramHttpLogger(cfg); - const requestWithDiag = (fn: () => Promise, label?: string) => - request(fn, label).catch((err) => { - logHttpError(label ?? "request", err); - throw err; - }); - - const wrapChatNotFound = (err: unknown) => { - if (!/400: Bad Request: chat not found/i.test(formatErrorMessage(err))) { - return err; - } - return new Error( - [ - `Telegram send failed: chat not found (chat_id=${chatId}).`, - "Likely: bot not started in DM, bot removed from group/channel, group migrated (new -100… id), or wrong bot token.", - `Input was: ${JSON.stringify(to)}.`, - ].join(" "), - ); - }; - - const sendWithThreadFallback = async ( - params: Record | undefined, - label: string, - attempt: ( - effectiveParams: Record | undefined, - effectiveLabel: string, - ) => Promise, - ): Promise => { - try { - return await attempt(params, label); - } catch (err) { - if (!hasMessageThreadIdParam(params) || !isTelegramThreadNotFoundError(err)) { - throw err; - } - if (opts.verbose) { - console.warn( - `telegram ${label} failed with message_thread_id, retrying without thread: ${formatErrorMessage(err)}`, - ); - } - const retriedParams = removeMessageThreadIdParam(params) as - | Record - | undefined; - return await attempt(retriedParams, `${label}-threadless`); - } - }; const stickerParams = hasThreadParams ? threadParams : undefined; - const result = await sendWithThreadFallback( + const result = await withTelegramThreadFallback( stickerParams, "sticker", + opts.verbose, async (effectiveParams, label) => - requestWithDiag(() => api.sendSticker(chatId, fileId.trim(), effectiveParams), label).catch( - (err) => { - throw wrapChatNotFound(err); - }, - ), + requestWithChatNotFound(() => api.sendSticker(chatId, fileId.trim(), effectiveParams), label), ); const messageId = String(result?.message_id ?? "unknown"); @@ -986,16 +955,9 @@ export async function sendPollTelegram( poll: PollInput, opts: TelegramPollOpts = {}, ): Promise<{ messageId: string; chatId: string; pollId?: string }> { - const cfg = loadConfig(); - const account = resolveTelegramAccount({ - cfg, - accountId: opts.accountId, - }); - const token = resolveToken(opts.token, account); + const { cfg, account, api } = resolveTelegramApiContext(opts); const target = parseTelegramTarget(to); const chatId = normalizeChatId(target.chatId); - const client = resolveTelegramClientOptions(account); - const api = opts.api ?? new Bot(token, client ? { client } : undefined).api; // Normalize the poll input (validates question, options, maxSelections) const normalizedPoll = normalizePollInput(poll, { maxOptions: 10 }); @@ -1009,58 +971,18 @@ export async function sendPollTelegram( // Build poll options as simple strings (Grammy accepts string[] or InputPollOption[]) const pollOptions = normalizedPoll.options; - const request = createTelegramRetryRunner({ + const requestWithDiag = createTelegramRequestWithDiag({ + cfg, + account, retry: opts.retry, - configRetry: account.config.retry, verbose: opts.verbose, shouldRetry: (err) => isRecoverableTelegramNetworkError(err, { context: "send" }), }); - const logHttpError = createTelegramHttpLogger(cfg); - const requestWithDiag = (fn: () => Promise, label?: string) => - withTelegramApiErrorLogging({ - operation: label ?? "request", - fn: () => request(fn, label), - }).catch((err) => { - logHttpError(label ?? "request", err); - throw err; - }); - - const wrapChatNotFound = (err: unknown) => { - if (!/400: Bad Request: chat not found/i.test(formatErrorMessage(err))) { - return err; - } - return new Error( - [ - `Telegram send failed: chat not found (chat_id=${chatId}).`, - "Likely: bot not started in DM, bot removed from group/channel, group migrated (new -100… id), or wrong bot token.", - `Input was: ${JSON.stringify(to)}.`, - ].join(" "), - ); - }; - - const sendWithThreadFallback = async ( - params: Record | undefined, - label: string, - attempt: ( - effectiveParams: Record | undefined, - effectiveLabel: string, - ) => Promise, - ): Promise => { - try { - return await attempt(params, label); - } catch (err) { - if (!hasMessageThreadIdParam(params) || !isTelegramThreadNotFoundError(err)) { - throw err; - } - if (opts.verbose) { - console.warn( - `telegram ${label} failed with message_thread_id, retrying without thread: ${formatErrorMessage(err)}`, - ); - } - const retriedParams = removeMessageThreadIdParam(params); - return await attempt(retriedParams, `${label}-threadless`); - } - }; + const requestWithChatNotFound = createRequestWithChatNotFound({ + requestWithDiag, + chatId, + input: to, + }); const durationSeconds = normalizedPoll.durationSeconds; if (durationSeconds === undefined && normalizedPoll.durationHours !== undefined) { @@ -1085,13 +1007,15 @@ export async function sendPollTelegram( ...(opts.silent === true ? { disable_notification: true } : {}), }; - const result = await sendWithThreadFallback(pollParams, "poll", async (effectiveParams, label) => - requestWithDiag( - () => api.sendPoll(chatId, normalizedPoll.question, pollOptions, effectiveParams), - label, - ).catch((err) => { - throw wrapChatNotFound(err); - }), + const result = await withTelegramThreadFallback( + pollParams, + "poll", + opts.verbose, + async (effectiveParams, label) => + requestWithChatNotFound( + () => api.sendPoll(chatId, normalizedPoll.question, pollOptions, effectiveParams), + label, + ), ); const messageId = String(result?.message_id ?? "unknown"); diff --git a/src/telegram/webhook-set.ts b/src/telegram/webhook-set.ts deleted file mode 100644 index 1bee5248526..00000000000 --- a/src/telegram/webhook-set.ts +++ /dev/null @@ -1,41 +0,0 @@ -import { type ApiClientOptions, Bot } from "grammy"; -import type { TelegramNetworkConfig } from "../config/types.telegram.js"; -import { withTelegramApiErrorLogging } from "./api-logging.js"; -import { resolveTelegramFetch } from "./fetch.js"; - -export async function setTelegramWebhook(opts: { - token: string; - url: string; - secret?: string; - dropPendingUpdates?: boolean; - network?: TelegramNetworkConfig; -}) { - const fetchImpl = resolveTelegramFetch(undefined, { network: opts.network }); - const client: ApiClientOptions | undefined = fetchImpl - ? { fetch: fetchImpl as unknown as ApiClientOptions["fetch"] } - : undefined; - const bot = new Bot(opts.token, client ? { client } : undefined); - await withTelegramApiErrorLogging({ - operation: "setWebhook", - fn: () => - bot.api.setWebhook(opts.url, { - secret_token: opts.secret, - drop_pending_updates: opts.dropPendingUpdates ?? false, - }), - }); -} - -export async function deleteTelegramWebhook(opts: { - token: string; - network?: TelegramNetworkConfig; -}) { - const fetchImpl = resolveTelegramFetch(undefined, { network: opts.network }); - const client: ApiClientOptions | undefined = fetchImpl - ? { fetch: fetchImpl as unknown as ApiClientOptions["fetch"] } - : undefined; - const bot = new Bot(opts.token, client ? { client } : undefined); - await withTelegramApiErrorLogging({ - operation: "deleteWebhook", - fn: () => bot.api.deleteWebhook(), - }); -}