import { sequentialize } from "@grammyjs/runner"; import { apiThrottler } from "@grammyjs/transformer-throttler"; import { type Message, type UserFromGetMe } from "@grammyjs/types"; import type { ApiClientOptions } from "grammy"; import { Bot, webhookCallback } from "grammy"; import { resolveDefaultAgentId } from "../agents/agent-scope.js"; import { resolveTextChunkLimit } from "../auto-reply/chunk.js"; import { isAbortRequestText } from "../auto-reply/reply/abort.js"; import { DEFAULT_GROUP_HISTORY_LIMIT, type HistoryEntry } from "../auto-reply/reply/history.js"; import { isNativeCommandsExplicitlyDisabled, resolveNativeCommandsEnabled, resolveNativeSkillsEnabled, } from "../config/commands.js"; import type { OpenClawConfig, ReplyToMode } from "../config/config.js"; import { loadConfig } from "../config/config.js"; import { resolveChannelGroupPolicy, resolveChannelGroupRequireMention, } from "../config/group-policy.js"; import { loadSessionStore, resolveStorePath } from "../config/sessions.js"; import { danger, logVerbose, shouldLogVerbose } from "../globals.js"; import { formatUncaughtError } from "../infra/errors.js"; import { getChildLogger } from "../logging.js"; import { createSubsystemLogger } from "../logging/subsystem.js"; import type { RuntimeEnv } from "../runtime.js"; import { resolveTelegramAccount } from "./accounts.js"; import { registerTelegramHandlers } from "./bot-handlers.js"; import { createTelegramMessageProcessor } from "./bot-message.js"; import { registerTelegramNativeCommands } from "./bot-native-commands.js"; import { buildTelegramUpdateKey, createTelegramUpdateDedupe, resolveTelegramUpdateId, type TelegramUpdateKeyContext, } from "./bot-updates.js"; import { buildTelegramGroupPeerId, resolveTelegramForumThreadId, resolveTelegramStreamMode, } from "./bot/helpers.js"; import { resolveTelegramFetch } from "./fetch.js"; export type TelegramBotOptions = { token: string; accountId?: string; runtime?: RuntimeEnv; requireMention?: boolean; allowFrom?: Array; groupAllowFrom?: Array; mediaMaxMb?: number; replyToMode?: ReplyToMode; proxyFetch?: typeof fetch; config?: OpenClawConfig; updateOffset?: { lastUpdateId?: number | null; onUpdateId?: (updateId: number) => void | Promise; }; testTimings?: { mediaGroupFlushMs?: number; textFragmentGapMs?: number; }; }; export function getTelegramSequentialKey(ctx: { chat?: { id?: number }; me?: UserFromGetMe; message?: Message; channelPost?: Message; editedChannelPost?: Message; update?: { message?: Message; edited_message?: Message; channel_post?: Message; edited_channel_post?: Message; callback_query?: { message?: Message }; message_reaction?: { chat?: { id?: number } }; }; }): string { // Handle reaction updates const reaction = ctx.update?.message_reaction; if (reaction?.chat?.id) { return `telegram:${reaction.chat.id}`; } const msg = ctx.message ?? ctx.channelPost ?? ctx.editedChannelPost ?? ctx.update?.message ?? ctx.update?.edited_message ?? ctx.update?.channel_post ?? ctx.update?.edited_channel_post ?? ctx.update?.callback_query?.message; const chatId = msg?.chat?.id ?? ctx.chat?.id; const rawText = msg?.text ?? msg?.caption; const botUsername = ctx.me?.username; if (isAbortRequestText(rawText, botUsername ? { botUsername } : undefined)) { if (typeof chatId === "number") { return `telegram:${chatId}:control`; } return "telegram:control"; } const isGroup = msg?.chat?.type === "group" || msg?.chat?.type === "supergroup"; const messageThreadId = msg?.message_thread_id; const isForum = msg?.chat?.is_forum; const threadId = isGroup ? resolveTelegramForumThreadId({ isForum, messageThreadId }) : messageThreadId; if (typeof chatId === "number") { return threadId != null ? `telegram:${chatId}:topic:${threadId}` : `telegram:${chatId}`; } return "telegram:unknown"; } export function createTelegramBot(opts: TelegramBotOptions) { const runtime: RuntimeEnv = opts.runtime ?? { log: console.log, error: console.error, exit: (code: number): never => { throw new Error(`exit ${code}`); }, }; const cfg = opts.config ?? loadConfig(); const account = resolveTelegramAccount({ cfg, accountId: opts.accountId, }); const telegramCfg = account.config; const fetchImpl = resolveTelegramFetch(opts.proxyFetch, { network: telegramCfg.network, }) as unknown as ApiClientOptions["fetch"]; const shouldProvideFetch = Boolean(fetchImpl); // grammY's ApiClientOptions types still track `node-fetch` types; Node 22+ global fetch // (undici) is structurally compatible at runtime but not assignable in TS. const fetchForClient = fetchImpl as unknown as NonNullable; const timeoutSeconds = typeof telegramCfg?.timeoutSeconds === "number" && Number.isFinite(telegramCfg.timeoutSeconds) ? Math.max(1, Math.floor(telegramCfg.timeoutSeconds)) : undefined; const client: ApiClientOptions | undefined = shouldProvideFetch || timeoutSeconds ? { ...(shouldProvideFetch && fetchImpl ? { fetch: fetchForClient } : {}), ...(timeoutSeconds ? { timeoutSeconds } : {}), } : undefined; const bot = new Bot(opts.token, client ? { client } : undefined); bot.api.config.use(apiThrottler()); // Catch all errors from bot middleware to prevent unhandled rejections bot.catch((err) => { runtime.error?.(danger(`telegram bot error: ${formatUncaughtError(err)}`)); }); const recentUpdates = createTelegramUpdateDedupe(); const initialUpdateId = typeof opts.updateOffset?.lastUpdateId === "number" ? opts.updateOffset.lastUpdateId : null; // Track update_ids that have entered the middleware pipeline but have not completed yet. // This includes updates that are "queued" behind sequentialize(...) for a chat/topic key. // We only persist a watermark that is strictly less than the smallest pending update_id, // so we never write an offset that would skip an update still waiting to run. const pendingUpdateIds = new Set(); let highestCompletedUpdateId: number | null = initialUpdateId; let highestPersistedUpdateId: number | null = initialUpdateId; const maybePersistSafeWatermark = () => { if (typeof opts.updateOffset?.onUpdateId !== "function") { return; } if (highestCompletedUpdateId === null) { return; } let safe = highestCompletedUpdateId; if (pendingUpdateIds.size > 0) { let minPending: number | null = null; for (const id of pendingUpdateIds) { if (minPending === null || id < minPending) { minPending = id; } } if (minPending !== null) { safe = Math.min(safe, minPending - 1); } } if (highestPersistedUpdateId !== null && safe <= highestPersistedUpdateId) { return; } highestPersistedUpdateId = safe; void opts.updateOffset.onUpdateId(safe); }; const shouldSkipUpdate = (ctx: TelegramUpdateKeyContext) => { const updateId = resolveTelegramUpdateId(ctx); const skipCutoff = highestPersistedUpdateId ?? initialUpdateId; if (typeof updateId === "number" && skipCutoff !== null && updateId <= skipCutoff) { return true; } const key = buildTelegramUpdateKey(ctx); const skipped = recentUpdates.check(key); if (skipped && key && shouldLogVerbose()) { logVerbose(`telegram dedupe: skipped ${key}`); } return skipped; }; bot.use(async (ctx, next) => { const updateId = resolveTelegramUpdateId(ctx); if (typeof updateId === "number") { pendingUpdateIds.add(updateId); } try { await next(); } finally { if (typeof updateId === "number") { pendingUpdateIds.delete(updateId); if (highestCompletedUpdateId === null || updateId > highestCompletedUpdateId) { highestCompletedUpdateId = updateId; } maybePersistSafeWatermark(); } } }); bot.use(sequentialize(getTelegramSequentialKey)); const rawUpdateLogger = createSubsystemLogger("gateway/channels/telegram/raw-update"); const MAX_RAW_UPDATE_CHARS = 8000; const MAX_RAW_UPDATE_STRING = 500; const MAX_RAW_UPDATE_ARRAY = 20; const stringifyUpdate = (update: unknown) => { const seen = new WeakSet(); return JSON.stringify(update ?? null, (key, value) => { if (typeof value === "string" && value.length > MAX_RAW_UPDATE_STRING) { return `${value.slice(0, MAX_RAW_UPDATE_STRING)}...`; } if (Array.isArray(value) && value.length > MAX_RAW_UPDATE_ARRAY) { return [ ...value.slice(0, MAX_RAW_UPDATE_ARRAY), `...(${value.length - MAX_RAW_UPDATE_ARRAY} more)`, ]; } if (value && typeof value === "object") { if (seen.has(value)) { return "[Circular]"; } seen.add(value); } return value; }); }; bot.use(async (ctx, next) => { if (shouldLogVerbose()) { try { const raw = stringifyUpdate(ctx.update); const preview = raw.length > MAX_RAW_UPDATE_CHARS ? `${raw.slice(0, MAX_RAW_UPDATE_CHARS)}...` : raw; rawUpdateLogger.debug(`telegram update: ${preview}`); } catch (err) { rawUpdateLogger.debug(`telegram update log failed: ${String(err)}`); } } await next(); }); const historyLimit = Math.max( 0, telegramCfg.historyLimit ?? cfg.messages?.groupChat?.historyLimit ?? DEFAULT_GROUP_HISTORY_LIMIT, ); const groupHistories = new Map(); const textLimit = resolveTextChunkLimit(cfg, "telegram", account.accountId); const dmPolicy = telegramCfg.dmPolicy ?? "pairing"; const allowFrom = opts.allowFrom ?? telegramCfg.allowFrom; const groupAllowFrom = opts.groupAllowFrom ?? telegramCfg.groupAllowFrom ?? (telegramCfg.allowFrom && telegramCfg.allowFrom.length > 0 ? telegramCfg.allowFrom : undefined) ?? (opts.allowFrom && opts.allowFrom.length > 0 ? opts.allowFrom : undefined); const replyToMode = opts.replyToMode ?? telegramCfg.replyToMode ?? "off"; const nativeEnabled = resolveNativeCommandsEnabled({ providerId: "telegram", providerSetting: telegramCfg.commands?.native, globalSetting: cfg.commands?.native, }); const nativeSkillsEnabled = resolveNativeSkillsEnabled({ providerId: "telegram", providerSetting: telegramCfg.commands?.nativeSkills, globalSetting: cfg.commands?.nativeSkills, }); const nativeDisabledExplicit = isNativeCommandsExplicitlyDisabled({ providerSetting: telegramCfg.commands?.native, globalSetting: cfg.commands?.native, }); const useAccessGroups = cfg.commands?.useAccessGroups !== false; const ackReactionScope = cfg.messages?.ackReactionScope ?? "group-mentions"; const mediaMaxBytes = (opts.mediaMaxMb ?? telegramCfg.mediaMaxMb ?? 5) * 1024 * 1024; const logger = getChildLogger({ module: "telegram-auto-reply" }); const streamMode = resolveTelegramStreamMode(telegramCfg); const resolveGroupPolicy = (chatId: string | number) => resolveChannelGroupPolicy({ cfg, channel: "telegram", accountId: account.accountId, groupId: String(chatId), }); const resolveGroupActivation = (params: { chatId: string | number; agentId?: string; messageThreadId?: number; sessionKey?: string; }) => { const agentId = params.agentId ?? resolveDefaultAgentId(cfg); const sessionKey = params.sessionKey ?? `agent:${agentId}:telegram:group:${buildTelegramGroupPeerId(params.chatId, params.messageThreadId)}`; const storePath = resolveStorePath(cfg.session?.store, { agentId }); try { const store = loadSessionStore(storePath); const entry = store[sessionKey]; if (entry?.groupActivation === "always") { return false; } if (entry?.groupActivation === "mention") { return true; } } catch (err) { logVerbose(`Failed to load session for activation check: ${String(err)}`); } return undefined; }; const resolveGroupRequireMention = (chatId: string | number) => resolveChannelGroupRequireMention({ cfg, channel: "telegram", accountId: account.accountId, groupId: String(chatId), requireMentionOverride: opts.requireMention, overrideOrder: "after-config", }); const resolveTelegramGroupConfig = (chatId: string | number, messageThreadId?: number) => { const groups = telegramCfg.groups; if (!groups) { return { groupConfig: undefined, topicConfig: undefined }; } const groupKey = String(chatId); const groupConfig = groups[groupKey] ?? groups["*"]; const topicConfig = messageThreadId != null ? groupConfig?.topics?.[String(messageThreadId)] : undefined; return { groupConfig, topicConfig }; }; const processMessage = createTelegramMessageProcessor({ bot, cfg, account, telegramCfg, historyLimit, groupHistories, dmPolicy, allowFrom, groupAllowFrom, ackReactionScope, logger, resolveGroupActivation, resolveGroupRequireMention, resolveTelegramGroupConfig, runtime, replyToMode, streamMode, textLimit, opts, }); registerTelegramNativeCommands({ bot, cfg, runtime, accountId: account.accountId, telegramCfg, allowFrom, groupAllowFrom, replyToMode, textLimit, useAccessGroups, nativeEnabled, nativeSkillsEnabled, nativeDisabledExplicit, resolveGroupPolicy, resolveTelegramGroupConfig, shouldSkipUpdate, opts, }); registerTelegramHandlers({ cfg, accountId: account.accountId, bot, opts, runtime, mediaMaxBytes, telegramCfg, groupAllowFrom, resolveGroupPolicy, resolveTelegramGroupConfig, shouldSkipUpdate, processMessage, logger, }); return bot; } export function createTelegramWebhookCallback(bot: Bot, path = "/telegram-webhook") { return { path, handler: webhookCallback(bot, "http") }; }