mirror of https://github.com/openclaw/openclaw.git
refactor: simplify telegram pipeline and remove dead modules
This commit is contained in:
parent
1f607bec49
commit
732f68f259
|
|
@ -1,4 +1,5 @@
|
|||
import type { Message } from "@grammyjs/types";
|
||||
import type { TelegramGroupConfig, TelegramTopicConfig } from "../config/types.js";
|
||||
import type { TelegramMediaRef } from "./bot-message-context.js";
|
||||
import type { TelegramContext } from "./bot/types.js";
|
||||
import { resolveDefaultAgentId } from "../agents/agent-scope.js";
|
||||
|
|
@ -21,7 +22,11 @@ import { readChannelAllowFromStore } from "../pairing/pairing-store.js";
|
|||
import { resolveAgentRoute } from "../routing/resolve-route.js";
|
||||
import { resolveThreadSessionKeys } from "../routing/session-key.js";
|
||||
import { withTelegramApiErrorLogging } from "./api-logging.js";
|
||||
import { firstDefined, isSenderAllowed, normalizeAllowFromWithStore } from "./bot-access.js";
|
||||
import {
|
||||
isSenderAllowed,
|
||||
normalizeAllowFromWithStore,
|
||||
type NormalizedAllowFrom,
|
||||
} from "./bot-access.js";
|
||||
import { RegisterTelegramHandlerParams } from "./bot-native-commands.js";
|
||||
import { MEDIA_GROUP_TIMEOUT_MS, type MediaGroupEntry } from "./bot-updates.js";
|
||||
import { resolveMedia } from "./bot/delivery.js";
|
||||
|
|
@ -31,6 +36,10 @@ import {
|
|||
resolveTelegramForumThreadId,
|
||||
resolveTelegramGroupAllowFromContext,
|
||||
} from "./bot/helpers.js";
|
||||
import {
|
||||
evaluateTelegramGroupBaseAccess,
|
||||
evaluateTelegramGroupPolicyAccess,
|
||||
} from "./group-access.js";
|
||||
import { migrateTelegramGroupConfig } from "./group-migration.js";
|
||||
import { resolveTelegramInlineButtonsScope } from "./inline-buttons.js";
|
||||
import {
|
||||
|
|
@ -227,11 +236,7 @@ export const registerTelegramHandlers = ({
|
|||
}
|
||||
}
|
||||
|
||||
const storeAllowFrom = await readChannelAllowFromStore(
|
||||
"telegram",
|
||||
process.env,
|
||||
accountId,
|
||||
).catch(() => []);
|
||||
const storeAllowFrom = await loadStoreAllowFrom();
|
||||
await processMessage(primaryEntry.ctx, allMedia, storeAllowFrom);
|
||||
} catch (err) {
|
||||
runtime.error?.(danger(`media group handler failed: ${String(err)}`));
|
||||
|
|
@ -262,11 +267,7 @@ export const registerTelegramHandlers = ({
|
|||
date: last.msg.date ?? first.msg.date,
|
||||
};
|
||||
|
||||
const storeAllowFrom = await readChannelAllowFromStore(
|
||||
"telegram",
|
||||
process.env,
|
||||
accountId,
|
||||
).catch(() => []);
|
||||
const storeAllowFrom = await loadStoreAllowFrom();
|
||||
const baseCtx = first.ctx;
|
||||
const getFile =
|
||||
typeof baseCtx.getFile === "function" ? baseCtx.getFile.bind(baseCtx) : async () => ({});
|
||||
|
|
@ -282,19 +283,170 @@ export const registerTelegramHandlers = ({
|
|||
}
|
||||
};
|
||||
|
||||
const queueTextFragmentFlush = async (entry: TextFragmentEntry) => {
|
||||
textFragmentProcessing = textFragmentProcessing
|
||||
.then(async () => {
|
||||
await flushTextFragments(entry);
|
||||
})
|
||||
.catch(() => undefined);
|
||||
await textFragmentProcessing;
|
||||
};
|
||||
|
||||
const runTextFragmentFlush = async (entry: TextFragmentEntry) => {
|
||||
textFragmentBuffer.delete(entry.key);
|
||||
await queueTextFragmentFlush(entry);
|
||||
};
|
||||
|
||||
const scheduleTextFragmentFlush = (entry: TextFragmentEntry) => {
|
||||
clearTimeout(entry.timer);
|
||||
entry.timer = setTimeout(async () => {
|
||||
textFragmentBuffer.delete(entry.key);
|
||||
textFragmentProcessing = textFragmentProcessing
|
||||
.then(async () => {
|
||||
await flushTextFragments(entry);
|
||||
})
|
||||
.catch(() => undefined);
|
||||
await textFragmentProcessing;
|
||||
await runTextFragmentFlush(entry);
|
||||
}, TELEGRAM_TEXT_FRAGMENT_MAX_GAP_MS);
|
||||
};
|
||||
|
||||
const enqueueMediaGroupFlush = async (mediaGroupId: string, entry: MediaGroupEntry) => {
|
||||
mediaGroupBuffer.delete(mediaGroupId);
|
||||
mediaGroupProcessing = mediaGroupProcessing
|
||||
.then(async () => {
|
||||
await processMediaGroup(entry);
|
||||
})
|
||||
.catch(() => undefined);
|
||||
await mediaGroupProcessing;
|
||||
};
|
||||
|
||||
const scheduleMediaGroupFlush = (mediaGroupId: string, entry: MediaGroupEntry) => {
|
||||
clearTimeout(entry.timer);
|
||||
entry.timer = setTimeout(async () => {
|
||||
await enqueueMediaGroupFlush(mediaGroupId, entry);
|
||||
}, mediaGroupTimeoutMs);
|
||||
};
|
||||
|
||||
const getOrCreateMediaGroupEntry = (mediaGroupId: string) => {
|
||||
const existing = mediaGroupBuffer.get(mediaGroupId);
|
||||
if (existing) {
|
||||
return existing;
|
||||
}
|
||||
const entry: MediaGroupEntry = {
|
||||
messages: [],
|
||||
timer: setTimeout(() => undefined, mediaGroupTimeoutMs),
|
||||
};
|
||||
mediaGroupBuffer.set(mediaGroupId, entry);
|
||||
return entry;
|
||||
};
|
||||
|
||||
const loadStoreAllowFrom = async () =>
|
||||
readChannelAllowFromStore("telegram", process.env, accountId).catch(() => []);
|
||||
|
||||
const isAllowlistAuthorized = (
|
||||
allow: NormalizedAllowFrom,
|
||||
senderId: string,
|
||||
senderUsername: string,
|
||||
) =>
|
||||
allow.hasWildcard ||
|
||||
(allow.hasEntries &&
|
||||
isSenderAllowed({
|
||||
allow,
|
||||
senderId,
|
||||
senderUsername,
|
||||
}));
|
||||
|
||||
const shouldSkipGroupMessage = (params: {
|
||||
isGroup: boolean;
|
||||
chatId: string | number;
|
||||
chatTitle?: string;
|
||||
resolvedThreadId?: number;
|
||||
senderId: string;
|
||||
senderUsername: string;
|
||||
effectiveGroupAllow: NormalizedAllowFrom;
|
||||
hasGroupAllowOverride: boolean;
|
||||
groupConfig?: TelegramGroupConfig;
|
||||
topicConfig?: TelegramTopicConfig;
|
||||
}) => {
|
||||
const {
|
||||
isGroup,
|
||||
chatId,
|
||||
chatTitle,
|
||||
resolvedThreadId,
|
||||
senderId,
|
||||
senderUsername,
|
||||
effectiveGroupAllow,
|
||||
hasGroupAllowOverride,
|
||||
groupConfig,
|
||||
topicConfig,
|
||||
} = params;
|
||||
const baseAccess = evaluateTelegramGroupBaseAccess({
|
||||
isGroup,
|
||||
groupConfig,
|
||||
topicConfig,
|
||||
hasGroupAllowOverride,
|
||||
effectiveGroupAllow,
|
||||
senderId,
|
||||
senderUsername,
|
||||
enforceAllowOverride: true,
|
||||
requireSenderForAllowOverride: true,
|
||||
});
|
||||
if (!baseAccess.allowed) {
|
||||
if (baseAccess.reason === "group-disabled") {
|
||||
logVerbose(`Blocked telegram group ${chatId} (group disabled)`);
|
||||
return true;
|
||||
}
|
||||
if (baseAccess.reason === "topic-disabled") {
|
||||
logVerbose(
|
||||
`Blocked telegram topic ${chatId} (${resolvedThreadId ?? "unknown"}) (topic disabled)`,
|
||||
);
|
||||
return true;
|
||||
}
|
||||
logVerbose(
|
||||
`Blocked telegram group sender ${senderId || "unknown"} (group allowFrom override)`,
|
||||
);
|
||||
return true;
|
||||
}
|
||||
if (!isGroup) {
|
||||
return false;
|
||||
}
|
||||
const policyAccess = evaluateTelegramGroupPolicyAccess({
|
||||
isGroup,
|
||||
chatId,
|
||||
cfg,
|
||||
telegramCfg,
|
||||
topicConfig,
|
||||
groupConfig,
|
||||
effectiveGroupAllow,
|
||||
senderId,
|
||||
senderUsername,
|
||||
resolveGroupPolicy,
|
||||
enforcePolicy: true,
|
||||
useTopicAndGroupOverrides: true,
|
||||
enforceAllowlistAuthorization: true,
|
||||
allowEmptyAllowlistEntries: false,
|
||||
requireSenderForAllowlistAuthorization: true,
|
||||
checkChatAllowlist: true,
|
||||
});
|
||||
if (!policyAccess.allowed) {
|
||||
if (policyAccess.reason === "group-policy-disabled") {
|
||||
logVerbose("Blocked telegram group message (groupPolicy: disabled)");
|
||||
return true;
|
||||
}
|
||||
if (policyAccess.reason === "group-policy-allowlist-no-sender") {
|
||||
logVerbose("Blocked telegram group message (no sender ID, groupPolicy: allowlist)");
|
||||
return true;
|
||||
}
|
||||
if (policyAccess.reason === "group-policy-allowlist-empty") {
|
||||
logVerbose(
|
||||
"Blocked telegram group message (groupPolicy: allowlist, no group allowlist entries)",
|
||||
);
|
||||
return true;
|
||||
}
|
||||
if (policyAccess.reason === "group-policy-allowlist-unauthorized") {
|
||||
logVerbose(`Blocked telegram group message from ${senderId} (groupPolicy: allowlist)`);
|
||||
return true;
|
||||
}
|
||||
logger.info({ chatId, title: chatTitle, reason: "not-allowed" }, "skipping group message");
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
};
|
||||
|
||||
bot.on("callback_query", async (ctx) => {
|
||||
const callback = ctx.callbackQuery;
|
||||
if (!callback) {
|
||||
|
|
@ -303,11 +455,15 @@ export const registerTelegramHandlers = ({
|
|||
if (shouldSkipUpdate(ctx)) {
|
||||
return;
|
||||
}
|
||||
const answerCallbackQuery =
|
||||
typeof (ctx as { answerCallbackQuery?: unknown }).answerCallbackQuery === "function"
|
||||
? () => ctx.answerCallbackQuery()
|
||||
: () => bot.api.answerCallbackQuery(callback.id);
|
||||
// Answer immediately to prevent Telegram from retrying while we process
|
||||
await withTelegramApiErrorLogging({
|
||||
operation: "answerCallbackQuery",
|
||||
runtime,
|
||||
fn: () => bot.api.answerCallbackQuery(callback.id),
|
||||
fn: answerCallbackQuery,
|
||||
}).catch(() => {});
|
||||
try {
|
||||
const data = (callback.data ?? "").trim();
|
||||
|
|
@ -315,6 +471,38 @@ export const registerTelegramHandlers = ({
|
|||
if (!data || !callbackMessage) {
|
||||
return;
|
||||
}
|
||||
const editCallbackMessage = async (
|
||||
text: string,
|
||||
params?: Parameters<typeof bot.api.editMessageText>[3],
|
||||
) => {
|
||||
const editTextFn = (ctx as { editMessageText?: unknown }).editMessageText;
|
||||
if (typeof editTextFn === "function") {
|
||||
return await ctx.editMessageText(text, params);
|
||||
}
|
||||
return await bot.api.editMessageText(
|
||||
callbackMessage.chat.id,
|
||||
callbackMessage.message_id,
|
||||
text,
|
||||
params,
|
||||
);
|
||||
};
|
||||
const deleteCallbackMessage = async () => {
|
||||
const deleteFn = (ctx as { deleteMessage?: unknown }).deleteMessage;
|
||||
if (typeof deleteFn === "function") {
|
||||
return await ctx.deleteMessage();
|
||||
}
|
||||
return await bot.api.deleteMessage(callbackMessage.chat.id, callbackMessage.message_id);
|
||||
};
|
||||
const replyToCallbackChat = async (
|
||||
text: string,
|
||||
params?: Parameters<typeof bot.api.sendMessage>[2],
|
||||
) => {
|
||||
const replyFn = (ctx as { reply?: unknown }).reply;
|
||||
if (typeof replyFn === "function") {
|
||||
return await ctx.reply(text, params);
|
||||
}
|
||||
return await bot.api.sendMessage(callbackMessage.chat.id, text, params);
|
||||
};
|
||||
|
||||
const inlineButtonsScope = resolveTelegramInlineButtonsScope({
|
||||
cfg,
|
||||
|
|
@ -344,8 +532,14 @@ export const registerTelegramHandlers = ({
|
|||
groupAllowFrom,
|
||||
resolveTelegramGroupConfig,
|
||||
});
|
||||
const { resolvedThreadId, storeAllowFrom, groupConfig, topicConfig, effectiveGroupAllow } =
|
||||
groupAllowContext;
|
||||
const {
|
||||
resolvedThreadId,
|
||||
storeAllowFrom,
|
||||
groupConfig,
|
||||
topicConfig,
|
||||
effectiveGroupAllow,
|
||||
hasGroupAllowOverride,
|
||||
} = groupAllowContext;
|
||||
const effectiveDmAllow = normalizeAllowFromWithStore({
|
||||
allowFrom: telegramCfg.allowFrom,
|
||||
storeAllowFrom,
|
||||
|
|
@ -353,75 +547,21 @@ export const registerTelegramHandlers = ({
|
|||
const dmPolicy = telegramCfg.dmPolicy ?? "pairing";
|
||||
const senderId = callback.from?.id ? String(callback.from.id) : "";
|
||||
const senderUsername = callback.from?.username ?? "";
|
||||
|
||||
if (isGroup) {
|
||||
if (groupConfig?.enabled === false) {
|
||||
logVerbose(`Blocked telegram group ${chatId} (group disabled)`);
|
||||
return;
|
||||
}
|
||||
if (topicConfig?.enabled === false) {
|
||||
logVerbose(
|
||||
`Blocked telegram topic ${chatId} (${resolvedThreadId ?? "unknown"}) (topic disabled)`,
|
||||
);
|
||||
return;
|
||||
}
|
||||
if (groupAllowContext.hasGroupAllowOverride) {
|
||||
const allowed =
|
||||
senderId &&
|
||||
isSenderAllowed({
|
||||
allow: effectiveGroupAllow,
|
||||
senderId,
|
||||
senderUsername,
|
||||
});
|
||||
if (!allowed) {
|
||||
logVerbose(
|
||||
`Blocked telegram group sender ${senderId || "unknown"} (group allowFrom override)`,
|
||||
);
|
||||
return;
|
||||
}
|
||||
}
|
||||
const defaultGroupPolicy = cfg.channels?.defaults?.groupPolicy;
|
||||
const groupPolicy = firstDefined(
|
||||
topicConfig?.groupPolicy,
|
||||
groupConfig?.groupPolicy,
|
||||
telegramCfg.groupPolicy,
|
||||
defaultGroupPolicy,
|
||||
"open",
|
||||
);
|
||||
if (groupPolicy === "disabled") {
|
||||
logVerbose(`Blocked telegram group message (groupPolicy: disabled)`);
|
||||
return;
|
||||
}
|
||||
if (groupPolicy === "allowlist") {
|
||||
if (!senderId) {
|
||||
logVerbose(`Blocked telegram group message (no sender ID, groupPolicy: allowlist)`);
|
||||
return;
|
||||
}
|
||||
if (!effectiveGroupAllow.hasEntries) {
|
||||
logVerbose(
|
||||
"Blocked telegram group message (groupPolicy: allowlist, no group allowlist entries)",
|
||||
);
|
||||
return;
|
||||
}
|
||||
if (
|
||||
!isSenderAllowed({
|
||||
allow: effectiveGroupAllow,
|
||||
senderId,
|
||||
senderUsername,
|
||||
})
|
||||
) {
|
||||
logVerbose(`Blocked telegram group message from ${senderId} (groupPolicy: allowlist)`);
|
||||
return;
|
||||
}
|
||||
}
|
||||
const groupAllowlist = resolveGroupPolicy(chatId);
|
||||
if (groupAllowlist.allowlistEnabled && !groupAllowlist.allowed) {
|
||||
logger.info(
|
||||
{ chatId, title: callbackMessage.chat.title, reason: "not-allowed" },
|
||||
"skipping group message",
|
||||
);
|
||||
return;
|
||||
}
|
||||
if (
|
||||
shouldSkipGroupMessage({
|
||||
isGroup,
|
||||
chatId,
|
||||
chatTitle: callbackMessage.chat.title,
|
||||
resolvedThreadId,
|
||||
senderId,
|
||||
senderUsername,
|
||||
effectiveGroupAllow,
|
||||
hasGroupAllowOverride,
|
||||
groupConfig,
|
||||
topicConfig,
|
||||
})
|
||||
) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (inlineButtonsScope === "allowlist") {
|
||||
|
|
@ -430,27 +570,13 @@ export const registerTelegramHandlers = ({
|
|||
return;
|
||||
}
|
||||
if (dmPolicy !== "open") {
|
||||
const allowed =
|
||||
effectiveDmAllow.hasWildcard ||
|
||||
(effectiveDmAllow.hasEntries &&
|
||||
isSenderAllowed({
|
||||
allow: effectiveDmAllow,
|
||||
senderId,
|
||||
senderUsername,
|
||||
}));
|
||||
const allowed = isAllowlistAuthorized(effectiveDmAllow, senderId, senderUsername);
|
||||
if (!allowed) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
const allowed =
|
||||
effectiveGroupAllow.hasWildcard ||
|
||||
(effectiveGroupAllow.hasEntries &&
|
||||
isSenderAllowed({
|
||||
allow: effectiveGroupAllow,
|
||||
senderId,
|
||||
senderUsername,
|
||||
}));
|
||||
const allowed = isAllowlistAuthorized(effectiveGroupAllow, senderId, senderUsername);
|
||||
if (!allowed) {
|
||||
return;
|
||||
}
|
||||
|
|
@ -487,12 +613,7 @@ export const registerTelegramHandlers = ({
|
|||
: undefined;
|
||||
|
||||
try {
|
||||
await bot.api.editMessageText(
|
||||
callbackMessage.chat.id,
|
||||
callbackMessage.message_id,
|
||||
result.text,
|
||||
keyboard ? { reply_markup: keyboard } : undefined,
|
||||
);
|
||||
await editCallbackMessage(result.text, keyboard ? { reply_markup: keyboard } : undefined);
|
||||
} catch (editErr) {
|
||||
const errStr = String(editErr);
|
||||
if (!errStr.includes("message is not modified")) {
|
||||
|
|
@ -514,23 +635,14 @@ export const registerTelegramHandlers = ({
|
|||
) => {
|
||||
const keyboard = buildInlineKeyboard(buttons);
|
||||
try {
|
||||
await bot.api.editMessageText(
|
||||
callbackMessage.chat.id,
|
||||
callbackMessage.message_id,
|
||||
text,
|
||||
keyboard ? { reply_markup: keyboard } : undefined,
|
||||
);
|
||||
await editCallbackMessage(text, keyboard ? { reply_markup: keyboard } : undefined);
|
||||
} catch (editErr) {
|
||||
const errStr = String(editErr);
|
||||
if (errStr.includes("no text in the message")) {
|
||||
try {
|
||||
await bot.api.deleteMessage(callbackMessage.chat.id, callbackMessage.message_id);
|
||||
await deleteCallbackMessage();
|
||||
} catch {}
|
||||
await bot.api.sendMessage(
|
||||
callbackMessage.chat.id,
|
||||
text,
|
||||
keyboard ? { reply_markup: keyboard } : undefined,
|
||||
);
|
||||
await replyToCallbackChat(text, keyboard ? { reply_markup: keyboard } : undefined);
|
||||
} else if (!errStr.includes("message is not modified")) {
|
||||
throw editErr;
|
||||
}
|
||||
|
|
@ -723,85 +835,23 @@ export const registerTelegramHandlers = ({
|
|||
hasGroupAllowOverride,
|
||||
} = groupAllowContext;
|
||||
|
||||
if (isGroup) {
|
||||
if (groupConfig?.enabled === false) {
|
||||
logVerbose(`Blocked telegram group ${chatId} (group disabled)`);
|
||||
return;
|
||||
}
|
||||
if (topicConfig?.enabled === false) {
|
||||
logVerbose(
|
||||
`Blocked telegram topic ${chatId} (${resolvedThreadId ?? "unknown"}) (topic disabled)`,
|
||||
);
|
||||
return;
|
||||
}
|
||||
if (hasGroupAllowOverride) {
|
||||
const senderId = msg.from?.id;
|
||||
const senderUsername = msg.from?.username ?? "";
|
||||
const allowed =
|
||||
senderId != null &&
|
||||
isSenderAllowed({
|
||||
allow: effectiveGroupAllow,
|
||||
senderId: String(senderId),
|
||||
senderUsername,
|
||||
});
|
||||
if (!allowed) {
|
||||
logVerbose(
|
||||
`Blocked telegram group sender ${senderId ?? "unknown"} (group allowFrom override)`,
|
||||
);
|
||||
return;
|
||||
}
|
||||
}
|
||||
// Group policy filtering: controls how group messages are handled
|
||||
// - "open": groups bypass allowFrom, only mention-gating applies
|
||||
// - "disabled": block all group messages entirely
|
||||
// - "allowlist": only allow group messages from senders in groupAllowFrom/allowFrom
|
||||
const defaultGroupPolicy = cfg.channels?.defaults?.groupPolicy;
|
||||
const groupPolicy = firstDefined(
|
||||
topicConfig?.groupPolicy,
|
||||
groupConfig?.groupPolicy,
|
||||
telegramCfg.groupPolicy,
|
||||
defaultGroupPolicy,
|
||||
"open",
|
||||
);
|
||||
if (groupPolicy === "disabled") {
|
||||
logVerbose(`Blocked telegram group message (groupPolicy: disabled)`);
|
||||
return;
|
||||
}
|
||||
if (groupPolicy === "allowlist") {
|
||||
// For allowlist mode, the sender (msg.from.id) must be in allowFrom
|
||||
const senderId = msg.from?.id;
|
||||
if (senderId == null) {
|
||||
logVerbose(`Blocked telegram group message (no sender ID, groupPolicy: allowlist)`);
|
||||
return;
|
||||
}
|
||||
if (!effectiveGroupAllow.hasEntries) {
|
||||
logVerbose(
|
||||
"Blocked telegram group message (groupPolicy: allowlist, no group allowlist entries)",
|
||||
);
|
||||
return;
|
||||
}
|
||||
const senderUsername = msg.from?.username ?? "";
|
||||
if (
|
||||
!isSenderAllowed({
|
||||
allow: effectiveGroupAllow,
|
||||
senderId: String(senderId),
|
||||
senderUsername,
|
||||
})
|
||||
) {
|
||||
logVerbose(`Blocked telegram group message from ${senderId} (groupPolicy: allowlist)`);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// Group allowlist based on configured group IDs.
|
||||
const groupAllowlist = resolveGroupPolicy(chatId);
|
||||
if (groupAllowlist.allowlistEnabled && !groupAllowlist.allowed) {
|
||||
logger.info(
|
||||
{ chatId, title: msg.chat.title, reason: "not-allowed" },
|
||||
"skipping group message",
|
||||
);
|
||||
return;
|
||||
}
|
||||
const senderId = msg.from?.id != null ? String(msg.from.id) : "";
|
||||
const senderUsername = msg.from?.username ?? "";
|
||||
if (
|
||||
shouldSkipGroupMessage({
|
||||
isGroup,
|
||||
chatId,
|
||||
chatTitle: msg.chat.title,
|
||||
resolvedThreadId,
|
||||
senderId,
|
||||
senderUsername,
|
||||
effectiveGroupAllow,
|
||||
hasGroupAllowOverride,
|
||||
groupConfig,
|
||||
topicConfig,
|
||||
})
|
||||
) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Text fragment handling - Telegram splits long pastes into multiple inbound messages (~4096 chars).
|
||||
|
|
@ -844,13 +894,7 @@ export const registerTelegramHandlers = ({
|
|||
|
||||
// Not appendable (or limits exceeded): flush buffered entry first, then continue normally.
|
||||
clearTimeout(existing.timer);
|
||||
textFragmentBuffer.delete(key);
|
||||
textFragmentProcessing = textFragmentProcessing
|
||||
.then(async () => {
|
||||
await flushTextFragments(existing);
|
||||
})
|
||||
.catch(() => undefined);
|
||||
await textFragmentProcessing;
|
||||
await runTextFragmentFlush(existing);
|
||||
}
|
||||
|
||||
const shouldStart = text.length >= TELEGRAM_TEXT_FRAGMENT_START_THRESHOLD_CHARS;
|
||||
|
|
@ -869,34 +913,9 @@ export const registerTelegramHandlers = ({
|
|||
// Media group handling - buffer multi-image messages
|
||||
const mediaGroupId = msg.media_group_id;
|
||||
if (mediaGroupId) {
|
||||
const existing = mediaGroupBuffer.get(mediaGroupId);
|
||||
if (existing) {
|
||||
clearTimeout(existing.timer);
|
||||
existing.messages.push({ msg, ctx });
|
||||
existing.timer = setTimeout(async () => {
|
||||
mediaGroupBuffer.delete(mediaGroupId);
|
||||
mediaGroupProcessing = mediaGroupProcessing
|
||||
.then(async () => {
|
||||
await processMediaGroup(existing);
|
||||
})
|
||||
.catch(() => undefined);
|
||||
await mediaGroupProcessing;
|
||||
}, mediaGroupTimeoutMs);
|
||||
} else {
|
||||
const entry: MediaGroupEntry = {
|
||||
messages: [{ msg, ctx }],
|
||||
timer: setTimeout(async () => {
|
||||
mediaGroupBuffer.delete(mediaGroupId);
|
||||
mediaGroupProcessing = mediaGroupProcessing
|
||||
.then(async () => {
|
||||
await processMediaGroup(entry);
|
||||
})
|
||||
.catch(() => undefined);
|
||||
await mediaGroupProcessing;
|
||||
}, mediaGroupTimeoutMs),
|
||||
};
|
||||
mediaGroupBuffer.set(mediaGroupId, entry);
|
||||
}
|
||||
const entry = getOrCreateMediaGroupEntry(mediaGroupId);
|
||||
entry.messages.push({ msg, ctx });
|
||||
scheduleMediaGroupFlush(mediaGroupId, entry);
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
@ -938,7 +957,6 @@ export const registerTelegramHandlers = ({
|
|||
},
|
||||
]
|
||||
: [];
|
||||
const senderId = msg.from?.id ? String(msg.from.id) : "";
|
||||
const conversationKey =
|
||||
resolvedThreadId != null ? `${chatId}:topic:${resolvedThreadId}` : String(chatId);
|
||||
const debounceKey = senderId
|
||||
|
|
|
|||
|
|
@ -56,6 +56,7 @@ import {
|
|||
hasBotMention,
|
||||
resolveTelegramThreadSpec,
|
||||
} from "./bot/helpers.js";
|
||||
import { evaluateTelegramGroupBaseAccess } from "./group-access.js";
|
||||
|
||||
export type TelegramMediaRef = {
|
||||
path: string;
|
||||
|
|
@ -192,15 +193,31 @@ export const buildTelegramMessageContext = async ({
|
|||
storeAllowFrom,
|
||||
});
|
||||
const hasGroupAllowOverride = typeof groupAllowOverride !== "undefined";
|
||||
|
||||
if (isGroup && groupConfig?.enabled === false) {
|
||||
logVerbose(`Blocked telegram group ${chatId} (group disabled)`);
|
||||
return null;
|
||||
}
|
||||
if (isGroup && topicConfig?.enabled === false) {
|
||||
logVerbose(
|
||||
`Blocked telegram topic ${chatId} (${resolvedThreadId ?? "unknown"}) (topic disabled)`,
|
||||
);
|
||||
const senderId = msg.from?.id ? String(msg.from.id) : "";
|
||||
const senderUsername = msg.from?.username ?? "";
|
||||
const baseAccess = evaluateTelegramGroupBaseAccess({
|
||||
isGroup,
|
||||
groupConfig,
|
||||
topicConfig,
|
||||
hasGroupAllowOverride,
|
||||
effectiveGroupAllow,
|
||||
senderId,
|
||||
senderUsername,
|
||||
enforceAllowOverride: true,
|
||||
requireSenderForAllowOverride: false,
|
||||
});
|
||||
if (!baseAccess.allowed) {
|
||||
if (baseAccess.reason === "group-disabled") {
|
||||
logVerbose(`Blocked telegram group ${chatId} (group disabled)`);
|
||||
return null;
|
||||
}
|
||||
if (baseAccess.reason === "topic-disabled") {
|
||||
logVerbose(
|
||||
`Blocked telegram topic ${chatId} (${resolvedThreadId ?? "unknown"}) (topic disabled)`,
|
||||
);
|
||||
return null;
|
||||
}
|
||||
logVerbose(`Blocked telegram group sender ${senderId || "unknown"} (group allowFrom override)`);
|
||||
return null;
|
||||
}
|
||||
|
||||
|
|
@ -320,21 +337,6 @@ export const buildTelegramMessageContext = async ({
|
|||
}
|
||||
|
||||
const botUsername = primaryCtx.me?.username?.toLowerCase();
|
||||
const senderId = msg.from?.id ? String(msg.from.id) : "";
|
||||
const senderUsername = msg.from?.username ?? "";
|
||||
if (isGroup && hasGroupAllowOverride) {
|
||||
const allowed = isSenderAllowed({
|
||||
allow: effectiveGroupAllow,
|
||||
senderId,
|
||||
senderUsername,
|
||||
});
|
||||
if (!allowed) {
|
||||
logVerbose(
|
||||
`Blocked telegram group sender ${senderId || "unknown"} (group allowFrom override)`,
|
||||
);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
const allowForCommands = isGroup ? effectiveGroupAllow : effectiveDmAllow;
|
||||
const senderAllowedForCommands = isSenderAllowed({
|
||||
allow: allowForCommands,
|
||||
|
|
|
|||
|
|
@ -55,6 +55,10 @@ import {
|
|||
resolveTelegramGroupAllowFromContext,
|
||||
resolveTelegramThreadSpec,
|
||||
} from "./bot/helpers.js";
|
||||
import {
|
||||
evaluateTelegramGroupBaseAccess,
|
||||
evaluateTelegramGroupPolicyAccess,
|
||||
} from "./group-access.js";
|
||||
import { buildInlineKeyboard } from "./send.js";
|
||||
|
||||
const EMPTY_RESPONSE_FALLBACK = "No response generated. Please try again.";
|
||||
|
|
@ -172,18 +176,9 @@ async function resolveTelegramCommandAuth(params: {
|
|||
effectiveGroupAllow,
|
||||
hasGroupAllowOverride,
|
||||
} = groupAllowContext;
|
||||
const senderIdRaw = msg.from?.id;
|
||||
const senderId = senderIdRaw ? String(senderIdRaw) : "";
|
||||
const senderId = msg.from?.id ? String(msg.from.id) : "";
|
||||
const senderUsername = msg.from?.username ?? "";
|
||||
|
||||
const isGroupSenderAllowed = () =>
|
||||
senderIdRaw != null &&
|
||||
isSenderAllowed({
|
||||
allow: effectiveGroupAllow,
|
||||
senderId: String(senderIdRaw),
|
||||
senderUsername,
|
||||
});
|
||||
|
||||
const rejectNotAuthorized = async () => {
|
||||
await withTelegramApiErrorLogging({
|
||||
operation: "sendMessage",
|
||||
|
|
@ -192,43 +187,68 @@ async function resolveTelegramCommandAuth(params: {
|
|||
return null;
|
||||
};
|
||||
|
||||
if (isGroup && groupConfig?.enabled === false) {
|
||||
await withTelegramApiErrorLogging({
|
||||
operation: "sendMessage",
|
||||
fn: () => bot.api.sendMessage(chatId, "This group is disabled."),
|
||||
});
|
||||
return null;
|
||||
}
|
||||
if (isGroup && topicConfig?.enabled === false) {
|
||||
await withTelegramApiErrorLogging({
|
||||
operation: "sendMessage",
|
||||
fn: () => bot.api.sendMessage(chatId, "This topic is disabled."),
|
||||
});
|
||||
return null;
|
||||
}
|
||||
if (requireAuth && isGroup && hasGroupAllowOverride) {
|
||||
if (!isGroupSenderAllowed()) {
|
||||
return await rejectNotAuthorized();
|
||||
const baseAccess = evaluateTelegramGroupBaseAccess({
|
||||
isGroup,
|
||||
groupConfig,
|
||||
topicConfig,
|
||||
hasGroupAllowOverride,
|
||||
effectiveGroupAllow,
|
||||
senderId,
|
||||
senderUsername,
|
||||
enforceAllowOverride: requireAuth,
|
||||
requireSenderForAllowOverride: true,
|
||||
});
|
||||
if (!baseAccess.allowed) {
|
||||
if (baseAccess.reason === "group-disabled") {
|
||||
await withTelegramApiErrorLogging({
|
||||
operation: "sendMessage",
|
||||
fn: () => bot.api.sendMessage(chatId, "This group is disabled."),
|
||||
});
|
||||
return null;
|
||||
}
|
||||
if (baseAccess.reason === "topic-disabled") {
|
||||
await withTelegramApiErrorLogging({
|
||||
operation: "sendMessage",
|
||||
fn: () => bot.api.sendMessage(chatId, "This topic is disabled."),
|
||||
});
|
||||
return null;
|
||||
}
|
||||
return await rejectNotAuthorized();
|
||||
}
|
||||
|
||||
if (isGroup && useAccessGroups) {
|
||||
const defaultGroupPolicy = cfg.channels?.defaults?.groupPolicy;
|
||||
const groupPolicy = telegramCfg.groupPolicy ?? defaultGroupPolicy ?? "open";
|
||||
if (groupPolicy === "disabled") {
|
||||
const policyAccess = evaluateTelegramGroupPolicyAccess({
|
||||
isGroup,
|
||||
chatId,
|
||||
cfg,
|
||||
telegramCfg,
|
||||
topicConfig,
|
||||
groupConfig,
|
||||
effectiveGroupAllow,
|
||||
senderId,
|
||||
senderUsername,
|
||||
resolveGroupPolicy,
|
||||
enforcePolicy: useAccessGroups,
|
||||
useTopicAndGroupOverrides: false,
|
||||
enforceAllowlistAuthorization: requireAuth,
|
||||
allowEmptyAllowlistEntries: true,
|
||||
requireSenderForAllowlistAuthorization: true,
|
||||
checkChatAllowlist: useAccessGroups,
|
||||
});
|
||||
if (!policyAccess.allowed) {
|
||||
if (policyAccess.reason === "group-policy-disabled") {
|
||||
await withTelegramApiErrorLogging({
|
||||
operation: "sendMessage",
|
||||
fn: () => bot.api.sendMessage(chatId, "Telegram group commands are disabled."),
|
||||
});
|
||||
return null;
|
||||
}
|
||||
if (groupPolicy === "allowlist" && requireAuth) {
|
||||
if (!isGroupSenderAllowed()) {
|
||||
return await rejectNotAuthorized();
|
||||
}
|
||||
if (
|
||||
policyAccess.reason === "group-policy-allowlist-no-sender" ||
|
||||
policyAccess.reason === "group-policy-allowlist-unauthorized"
|
||||
) {
|
||||
return await rejectNotAuthorized();
|
||||
}
|
||||
const groupAllowlist = resolveGroupPolicy(chatId);
|
||||
if (groupAllowlist.allowlistEnabled && !groupAllowlist.allowed) {
|
||||
if (policyAccess.reason === "group-chat-not-allowed") {
|
||||
await withTelegramApiErrorLogging({
|
||||
operation: "sendMessage",
|
||||
fn: () => bot.api.sendMessage(chatId, "This group is not allowed."),
|
||||
|
|
@ -357,6 +377,41 @@ export const registerTelegramNativeCommands = ({
|
|||
// Keep hidden commands callable by registering handlers for the full catalog.
|
||||
syncTelegramMenuCommands({ bot, runtime, commandsToRegister });
|
||||
|
||||
const resolveCommandRuntimeContext = (params: {
|
||||
msg: NonNullable<TelegramNativeCommandContext["message"]>;
|
||||
isGroup: boolean;
|
||||
isForum: boolean;
|
||||
resolvedThreadId?: number;
|
||||
}) => {
|
||||
const { msg, isGroup, isForum, resolvedThreadId } = params;
|
||||
const chatId = msg.chat.id;
|
||||
const messageThreadId = (msg as { message_thread_id?: number }).message_thread_id;
|
||||
const threadSpec = resolveTelegramThreadSpec({
|
||||
isGroup,
|
||||
isForum,
|
||||
messageThreadId,
|
||||
});
|
||||
const parentPeer = buildTelegramParentPeer({ isGroup, resolvedThreadId, chatId });
|
||||
const route = resolveAgentRoute({
|
||||
cfg,
|
||||
channel: "telegram",
|
||||
accountId,
|
||||
peer: {
|
||||
kind: isGroup ? "group" : "direct",
|
||||
id: isGroup ? buildTelegramGroupPeerId(chatId, resolvedThreadId) : String(chatId),
|
||||
},
|
||||
parentPeer,
|
||||
});
|
||||
const mediaLocalRoots = getAgentScopedMediaLocalRoots(cfg, route.agentId);
|
||||
const tableMode = resolveMarkdownTableMode({
|
||||
cfg,
|
||||
channel: "telegram",
|
||||
accountId: route.accountId,
|
||||
});
|
||||
const chunkMode = resolveChunkMode(cfg, "telegram", route.accountId);
|
||||
return { chatId, threadSpec, route, mediaLocalRoots, tableMode, chunkMode };
|
||||
};
|
||||
|
||||
if (commandsToRegister.length > 0 || pluginCatalog.commands.length > 0) {
|
||||
if (typeof (bot as unknown as { command?: unknown }).command !== "function") {
|
||||
logVerbose("telegram: bot.command unavailable; skipping native handlers");
|
||||
|
|
@ -397,12 +452,13 @@ export const registerTelegramNativeCommands = ({
|
|||
topicConfig,
|
||||
commandAuthorized,
|
||||
} = auth;
|
||||
const messageThreadId = (msg as { message_thread_id?: number }).message_thread_id;
|
||||
const threadSpec = resolveTelegramThreadSpec({
|
||||
isGroup,
|
||||
isForum,
|
||||
messageThreadId,
|
||||
});
|
||||
const { threadSpec, route, mediaLocalRoots, tableMode, chunkMode } =
|
||||
resolveCommandRuntimeContext({
|
||||
msg,
|
||||
isGroup,
|
||||
isForum,
|
||||
resolvedThreadId,
|
||||
});
|
||||
const threadParams = buildTelegramThreadParams(threadSpec) ?? {};
|
||||
|
||||
const commandDefinition = findCommandByNativeName(command.name, "telegram");
|
||||
|
|
@ -455,18 +511,6 @@ export const registerTelegramNativeCommands = ({
|
|||
});
|
||||
return;
|
||||
}
|
||||
const parentPeer = buildTelegramParentPeer({ isGroup, resolvedThreadId, chatId });
|
||||
const route = resolveAgentRoute({
|
||||
cfg,
|
||||
channel: "telegram",
|
||||
accountId,
|
||||
peer: {
|
||||
kind: isGroup ? "group" : "direct",
|
||||
id: isGroup ? buildTelegramGroupPeerId(chatId, resolvedThreadId) : String(chatId),
|
||||
},
|
||||
parentPeer,
|
||||
});
|
||||
const mediaLocalRoots = getAgentScopedMediaLocalRoots(cfg, route.agentId);
|
||||
const baseSessionKey = route.sessionKey;
|
||||
// DMs: use raw messageThreadId for thread sessions (not resolvedThreadId which is for forums)
|
||||
const dmThreadId = threadSpec.scope === "dm" ? threadSpec.id : undefined;
|
||||
|
|
@ -478,11 +522,6 @@ export const registerTelegramNativeCommands = ({
|
|||
})
|
||||
: null;
|
||||
const sessionKey = threadKeys?.sessionKey ?? baseSessionKey;
|
||||
const tableMode = resolveMarkdownTableMode({
|
||||
cfg,
|
||||
channel: "telegram",
|
||||
accountId: route.accountId,
|
||||
});
|
||||
const skillFilter = firstDefined(topicConfig?.skills, groupConfig?.skills);
|
||||
const systemPromptParts = [
|
||||
groupConfig?.systemPrompt?.trim() || null,
|
||||
|
|
@ -530,7 +569,6 @@ export const registerTelegramNativeCommands = ({
|
|||
typeof telegramCfg.blockStreaming === "boolean"
|
||||
? !telegramCfg.blockStreaming
|
||||
: undefined;
|
||||
const chunkMode = resolveChunkMode(cfg, "telegram", route.accountId);
|
||||
|
||||
const deliveryState = {
|
||||
delivered: false,
|
||||
|
|
@ -640,24 +678,13 @@ export const registerTelegramNativeCommands = ({
|
|||
return;
|
||||
}
|
||||
const { senderId, commandAuthorized, isGroup, isForum, resolvedThreadId } = auth;
|
||||
const messageThreadId = (msg as { message_thread_id?: number }).message_thread_id;
|
||||
const threadSpec = resolveTelegramThreadSpec({
|
||||
isGroup,
|
||||
isForum,
|
||||
messageThreadId,
|
||||
});
|
||||
const parentPeer = buildTelegramParentPeer({ isGroup, resolvedThreadId, chatId });
|
||||
const route = resolveAgentRoute({
|
||||
cfg,
|
||||
channel: "telegram",
|
||||
accountId,
|
||||
peer: {
|
||||
kind: isGroup ? "group" : "direct",
|
||||
id: isGroup ? buildTelegramGroupPeerId(chatId, resolvedThreadId) : String(chatId),
|
||||
},
|
||||
parentPeer,
|
||||
});
|
||||
const mediaLocalRoots = getAgentScopedMediaLocalRoots(cfg, route.agentId);
|
||||
const { threadSpec, mediaLocalRoots, tableMode, chunkMode } =
|
||||
resolveCommandRuntimeContext({
|
||||
msg,
|
||||
isGroup,
|
||||
isForum,
|
||||
resolvedThreadId,
|
||||
});
|
||||
const from = isGroup
|
||||
? buildTelegramGroupFrom(chatId, threadSpec.id)
|
||||
: `telegram:${chatId}`;
|
||||
|
|
@ -676,12 +703,6 @@ export const registerTelegramNativeCommands = ({
|
|||
accountId,
|
||||
messageThreadId: threadSpec.id,
|
||||
});
|
||||
const tableMode = resolveMarkdownTableMode({
|
||||
cfg,
|
||||
channel: "telegram",
|
||||
accountId: route.accountId,
|
||||
});
|
||||
const chunkMode = resolveChunkMode(cfg, "telegram", route.accountId);
|
||||
|
||||
await deliverReplies({
|
||||
replies: [result],
|
||||
|
|
|
|||
|
|
@ -1,57 +0,0 @@
|
|||
import { detectMime } from "../media/mime.js";
|
||||
import { type SavedMedia, saveMediaBuffer } from "../media/store.js";
|
||||
|
||||
export type TelegramFileInfo = {
|
||||
file_id: string;
|
||||
file_unique_id?: string;
|
||||
file_size?: number;
|
||||
file_path?: string;
|
||||
};
|
||||
|
||||
export async function getTelegramFile(
|
||||
token: string,
|
||||
fileId: string,
|
||||
timeoutMs = 30_000,
|
||||
): Promise<TelegramFileInfo> {
|
||||
const res = await fetch(
|
||||
`https://api.telegram.org/bot${token}/getFile?file_id=${encodeURIComponent(fileId)}`,
|
||||
{ signal: AbortSignal.timeout(timeoutMs) },
|
||||
);
|
||||
if (!res.ok) {
|
||||
throw new Error(`getFile failed: ${res.status} ${res.statusText}`);
|
||||
}
|
||||
const json = (await res.json()) as { ok: boolean; result?: TelegramFileInfo };
|
||||
if (!json.ok || !json.result?.file_path) {
|
||||
throw new Error("getFile returned no file_path");
|
||||
}
|
||||
return json.result;
|
||||
}
|
||||
|
||||
export async function downloadTelegramFile(
|
||||
token: string,
|
||||
info: TelegramFileInfo,
|
||||
maxBytes?: number,
|
||||
timeoutMs = 60_000,
|
||||
): Promise<SavedMedia> {
|
||||
if (!info.file_path) {
|
||||
throw new Error("file_path missing");
|
||||
}
|
||||
const url = `https://api.telegram.org/file/bot${token}/${info.file_path}`;
|
||||
const res = await fetch(url, { signal: AbortSignal.timeout(timeoutMs) });
|
||||
if (!res.ok || !res.body) {
|
||||
throw new Error(`Failed to download telegram file: HTTP ${res.status}`);
|
||||
}
|
||||
const array = Buffer.from(await res.arrayBuffer());
|
||||
const mime = await detectMime({
|
||||
buffer: array,
|
||||
headerMime: res.headers.get("content-type"),
|
||||
filePath: info.file_path,
|
||||
});
|
||||
// save with inbound subdir
|
||||
const saved = await saveMediaBuffer(array, mime, "inbound", maxBytes, info.file_path);
|
||||
// Ensure extension matches mime if possible
|
||||
if (!saved.contentType && mime) {
|
||||
saved.contentType = mime;
|
||||
}
|
||||
return saved;
|
||||
}
|
||||
|
|
@ -1,5 +1,4 @@
|
|||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
import { downloadTelegramFile, getTelegramFile, type TelegramFileInfo } from "./download.js";
|
||||
import { resetTelegramFetchStateForTests, resolveTelegramFetch } from "./fetch.js";
|
||||
|
||||
const setDefaultAutoSelectFamily = vi.hoisted(() => vi.fn());
|
||||
|
|
@ -61,36 +60,3 @@ describe("resolveTelegramFetch", () => {
|
|||
expect(setDefaultAutoSelectFamily).toHaveBeenCalledWith(false);
|
||||
});
|
||||
});
|
||||
|
||||
describe("telegram download", () => {
|
||||
it("fetches file info", async () => {
|
||||
const json = vi.fn().mockResolvedValue({ ok: true, result: { file_path: "photos/1.jpg" } });
|
||||
vi.spyOn(globalThis, "fetch" as never).mockResolvedValueOnce({
|
||||
ok: true,
|
||||
status: 200,
|
||||
statusText: "OK",
|
||||
json,
|
||||
} as Response);
|
||||
const info = await getTelegramFile("tok", "fid");
|
||||
expect(info.file_path).toBe("photos/1.jpg");
|
||||
});
|
||||
|
||||
it("downloads and saves", async () => {
|
||||
const info: TelegramFileInfo = {
|
||||
file_id: "fid",
|
||||
file_path: "photos/1.jpg",
|
||||
};
|
||||
const arrayBuffer = async () => new Uint8Array([1, 2, 3, 4]).buffer;
|
||||
vi.spyOn(globalThis, "fetch" as never).mockResolvedValueOnce({
|
||||
ok: true,
|
||||
status: 200,
|
||||
statusText: "OK",
|
||||
body: true,
|
||||
arrayBuffer,
|
||||
headers: { get: () => "image/jpeg" },
|
||||
} as Response);
|
||||
const saved = await downloadTelegramFile("tok", info, 1024 * 1024);
|
||||
expect(saved.path).toBeTruthy();
|
||||
expect(saved.contentType).toBe("image/jpeg");
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -0,0 +1,141 @@
|
|||
import type { OpenClawConfig } from "../config/config.js";
|
||||
import type { ChannelGroupPolicy } from "../config/group-policy.js";
|
||||
import type {
|
||||
TelegramAccountConfig,
|
||||
TelegramGroupConfig,
|
||||
TelegramTopicConfig,
|
||||
} from "../config/types.js";
|
||||
import { isSenderAllowed, type NormalizedAllowFrom } from "./bot-access.js";
|
||||
import { firstDefined } from "./bot-access.js";
|
||||
|
||||
export type TelegramGroupBaseBlockReason =
|
||||
| "group-disabled"
|
||||
| "topic-disabled"
|
||||
| "group-override-unauthorized";
|
||||
|
||||
export type TelegramGroupBaseAccessResult =
|
||||
| { allowed: true }
|
||||
| { allowed: false; reason: TelegramGroupBaseBlockReason };
|
||||
|
||||
export const evaluateTelegramGroupBaseAccess = (params: {
|
||||
isGroup: boolean;
|
||||
groupConfig?: TelegramGroupConfig;
|
||||
topicConfig?: TelegramTopicConfig;
|
||||
hasGroupAllowOverride: boolean;
|
||||
effectiveGroupAllow: NormalizedAllowFrom;
|
||||
senderId?: string;
|
||||
senderUsername?: string;
|
||||
enforceAllowOverride: boolean;
|
||||
requireSenderForAllowOverride: boolean;
|
||||
}): TelegramGroupBaseAccessResult => {
|
||||
if (!params.isGroup) {
|
||||
return { allowed: true };
|
||||
}
|
||||
if (params.groupConfig?.enabled === false) {
|
||||
return { allowed: false, reason: "group-disabled" };
|
||||
}
|
||||
if (params.topicConfig?.enabled === false) {
|
||||
return { allowed: false, reason: "topic-disabled" };
|
||||
}
|
||||
if (!params.enforceAllowOverride || !params.hasGroupAllowOverride) {
|
||||
return { allowed: true };
|
||||
}
|
||||
|
||||
const senderId = params.senderId ?? "";
|
||||
if (params.requireSenderForAllowOverride && !senderId) {
|
||||
return { allowed: false, reason: "group-override-unauthorized" };
|
||||
}
|
||||
|
||||
const allowed = isSenderAllowed({
|
||||
allow: params.effectiveGroupAllow,
|
||||
senderId,
|
||||
senderUsername: params.senderUsername ?? "",
|
||||
});
|
||||
if (!allowed) {
|
||||
return { allowed: false, reason: "group-override-unauthorized" };
|
||||
}
|
||||
return { allowed: true };
|
||||
};
|
||||
|
||||
export type TelegramGroupPolicyBlockReason =
|
||||
| "group-policy-disabled"
|
||||
| "group-policy-allowlist-no-sender"
|
||||
| "group-policy-allowlist-empty"
|
||||
| "group-policy-allowlist-unauthorized"
|
||||
| "group-chat-not-allowed";
|
||||
|
||||
export type TelegramGroupPolicyAccessResult =
|
||||
| { allowed: true; groupPolicy: "open" | "disabled" | "allowlist" }
|
||||
| {
|
||||
allowed: false;
|
||||
reason: TelegramGroupPolicyBlockReason;
|
||||
groupPolicy: "open" | "disabled" | "allowlist";
|
||||
};
|
||||
|
||||
export const evaluateTelegramGroupPolicyAccess = (params: {
|
||||
isGroup: boolean;
|
||||
chatId: string | number;
|
||||
cfg: OpenClawConfig;
|
||||
telegramCfg: TelegramAccountConfig;
|
||||
topicConfig?: TelegramTopicConfig;
|
||||
groupConfig?: TelegramGroupConfig;
|
||||
effectiveGroupAllow: NormalizedAllowFrom;
|
||||
senderId?: string;
|
||||
senderUsername?: string;
|
||||
resolveGroupPolicy: (chatId: string | number) => ChannelGroupPolicy;
|
||||
enforcePolicy: boolean;
|
||||
useTopicAndGroupOverrides: boolean;
|
||||
enforceAllowlistAuthorization: boolean;
|
||||
allowEmptyAllowlistEntries: boolean;
|
||||
requireSenderForAllowlistAuthorization: boolean;
|
||||
checkChatAllowlist: boolean;
|
||||
}): TelegramGroupPolicyAccessResult => {
|
||||
const fallbackPolicy =
|
||||
firstDefined(
|
||||
params.telegramCfg.groupPolicy,
|
||||
params.cfg.channels?.defaults?.groupPolicy,
|
||||
"open",
|
||||
) ?? "open";
|
||||
const groupPolicy = params.useTopicAndGroupOverrides
|
||||
? (firstDefined(
|
||||
params.topicConfig?.groupPolicy,
|
||||
params.groupConfig?.groupPolicy,
|
||||
params.telegramCfg.groupPolicy,
|
||||
params.cfg.channels?.defaults?.groupPolicy,
|
||||
"open",
|
||||
) ?? "open")
|
||||
: fallbackPolicy;
|
||||
|
||||
if (!params.isGroup || !params.enforcePolicy) {
|
||||
return { allowed: true, groupPolicy };
|
||||
}
|
||||
if (groupPolicy === "disabled") {
|
||||
return { allowed: false, reason: "group-policy-disabled", groupPolicy };
|
||||
}
|
||||
if (groupPolicy === "allowlist" && params.enforceAllowlistAuthorization) {
|
||||
const senderId = params.senderId ?? "";
|
||||
if (params.requireSenderForAllowlistAuthorization && !senderId) {
|
||||
return { allowed: false, reason: "group-policy-allowlist-no-sender", groupPolicy };
|
||||
}
|
||||
if (!params.allowEmptyAllowlistEntries && !params.effectiveGroupAllow.hasEntries) {
|
||||
return { allowed: false, reason: "group-policy-allowlist-empty", groupPolicy };
|
||||
}
|
||||
const senderUsername = params.senderUsername ?? "";
|
||||
if (
|
||||
!isSenderAllowed({
|
||||
allow: params.effectiveGroupAllow,
|
||||
senderId,
|
||||
senderUsername,
|
||||
})
|
||||
) {
|
||||
return { allowed: false, reason: "group-policy-allowlist-unauthorized", groupPolicy };
|
||||
}
|
||||
}
|
||||
if (params.checkChatAllowlist) {
|
||||
const groupAllowlist = params.resolveGroupPolicy(params.chatId);
|
||||
if (groupAllowlist.allowlistEnabled && !groupAllowlist.allowed) {
|
||||
return { allowed: false, reason: "group-chat-not-allowed", groupPolicy };
|
||||
}
|
||||
}
|
||||
return { allowed: true, groupPolicy };
|
||||
};
|
||||
|
|
@ -1,4 +0,0 @@
|
|||
export { createTelegramBot, createTelegramWebhookCallback } from "./bot.js";
|
||||
export { monitorTelegramProvider } from "./monitor.js";
|
||||
export { reactMessageTelegram, sendMessageTelegram, sendPollTelegram } from "./send.js";
|
||||
export { startTelegramWebhook } from "./webhook.js";
|
||||
|
|
@ -63,6 +63,11 @@ type TelegramSendResult = {
|
|||
chatId: string;
|
||||
};
|
||||
|
||||
type TelegramMessageLike = {
|
||||
message_id?: number;
|
||||
chat?: { id?: string | number };
|
||||
};
|
||||
|
||||
type TelegramReactionOpts = {
|
||||
token?: string;
|
||||
accountId?: string;
|
||||
|
|
@ -76,6 +81,7 @@ const PARSE_ERR_RE = /can't parse entities|parse entities|find end of the entity
|
|||
const THREAD_NOT_FOUND_RE = /400:\s*Bad Request:\s*message thread not found/i;
|
||||
const MESSAGE_NOT_MODIFIED_RE =
|
||||
/400:\s*Bad Request:\s*message is not modified|MESSAGE_NOT_MODIFIED/i;
|
||||
const CHAT_NOT_FOUND_RE = /400: Bad Request: chat not found/i;
|
||||
const diagLogger = createSubsystemLogger("telegram/diagnostic");
|
||||
|
||||
function createTelegramHttpLogger(cfg: ReturnType<typeof loadConfig>) {
|
||||
|
|
@ -213,6 +219,123 @@ function removeMessageThreadIdParam(
|
|||
return Object.keys(next).length > 0 ? next : undefined;
|
||||
}
|
||||
|
||||
type TelegramApiContext = {
|
||||
cfg: ReturnType<typeof loadConfig>;
|
||||
account: ResolvedTelegramAccount;
|
||||
api: Bot["api"];
|
||||
};
|
||||
|
||||
function resolveTelegramApiContext(opts: {
|
||||
token?: string;
|
||||
accountId?: string;
|
||||
api?: Bot["api"];
|
||||
cfg?: ReturnType<typeof loadConfig>;
|
||||
}): TelegramApiContext {
|
||||
const cfg = opts.cfg ?? loadConfig();
|
||||
const account = resolveTelegramAccount({
|
||||
cfg,
|
||||
accountId: opts.accountId,
|
||||
});
|
||||
const token = resolveToken(opts.token, account);
|
||||
const client = resolveTelegramClientOptions(account);
|
||||
const api = opts.api ?? new Bot(token, client ? { client } : undefined).api;
|
||||
return { cfg, account, api };
|
||||
}
|
||||
|
||||
type TelegramRequestWithDiag = <T>(
|
||||
fn: () => Promise<T>,
|
||||
label?: string,
|
||||
options?: { shouldLog?: (err: unknown) => boolean },
|
||||
) => Promise<T>;
|
||||
|
||||
function createTelegramRequestWithDiag(params: {
|
||||
cfg: ReturnType<typeof loadConfig>;
|
||||
account: ResolvedTelegramAccount;
|
||||
retry?: RetryConfig;
|
||||
verbose?: boolean;
|
||||
shouldRetry?: (err: unknown) => boolean;
|
||||
useApiErrorLogging?: boolean;
|
||||
}): TelegramRequestWithDiag {
|
||||
const request = createTelegramRetryRunner({
|
||||
retry: params.retry,
|
||||
configRetry: params.account.config.retry,
|
||||
verbose: params.verbose,
|
||||
...(params.shouldRetry ? { shouldRetry: params.shouldRetry } : {}),
|
||||
});
|
||||
const logHttpError = createTelegramHttpLogger(params.cfg);
|
||||
return <T>(
|
||||
fn: () => Promise<T>,
|
||||
label?: string,
|
||||
options?: { shouldLog?: (err: unknown) => boolean },
|
||||
) => {
|
||||
const runRequest = () => request(fn, label);
|
||||
const call =
|
||||
params.useApiErrorLogging === false
|
||||
? runRequest()
|
||||
: withTelegramApiErrorLogging({
|
||||
operation: label ?? "request",
|
||||
fn: runRequest,
|
||||
...(options?.shouldLog ? { shouldLog: options.shouldLog } : {}),
|
||||
});
|
||||
return call.catch((err) => {
|
||||
logHttpError(label ?? "request", err);
|
||||
throw err;
|
||||
});
|
||||
};
|
||||
}
|
||||
|
||||
function wrapTelegramChatNotFoundError(err: unknown, params: { chatId: string; input: string }) {
|
||||
if (!CHAT_NOT_FOUND_RE.test(formatErrorMessage(err))) {
|
||||
return err;
|
||||
}
|
||||
return new Error(
|
||||
[
|
||||
`Telegram send failed: chat not found (chat_id=${params.chatId}).`,
|
||||
"Likely: bot not started in DM, bot removed from group/channel, group migrated (new -100… id), or wrong bot token.",
|
||||
`Input was: ${JSON.stringify(params.input)}.`,
|
||||
].join(" "),
|
||||
);
|
||||
}
|
||||
|
||||
async function withTelegramThreadFallback<T>(
|
||||
params: Record<string, unknown> | undefined,
|
||||
label: string,
|
||||
verbose: boolean | undefined,
|
||||
attempt: (
|
||||
effectiveParams: Record<string, unknown> | undefined,
|
||||
effectiveLabel: string,
|
||||
) => Promise<T>,
|
||||
): Promise<T> {
|
||||
try {
|
||||
return await attempt(params, label);
|
||||
} catch (err) {
|
||||
if (!hasMessageThreadIdParam(params) || !isTelegramThreadNotFoundError(err)) {
|
||||
throw err;
|
||||
}
|
||||
if (verbose) {
|
||||
console.warn(
|
||||
`telegram ${label} failed with message_thread_id, retrying without thread: ${formatErrorMessage(err)}`,
|
||||
);
|
||||
}
|
||||
const retriedParams = removeMessageThreadIdParam(params);
|
||||
return await attempt(retriedParams, `${label}-threadless`);
|
||||
}
|
||||
}
|
||||
|
||||
function createRequestWithChatNotFound(params: {
|
||||
requestWithDiag: TelegramRequestWithDiag;
|
||||
chatId: string;
|
||||
input: string;
|
||||
}) {
|
||||
return async <T>(fn: () => Promise<T>, label: string) =>
|
||||
params.requestWithDiag(fn, label).catch((err) => {
|
||||
throw wrapTelegramChatNotFoundError(err, {
|
||||
chatId: params.chatId,
|
||||
input: params.input,
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
export function buildInlineKeyboard(
|
||||
buttons?: TelegramSendOpts["buttons"],
|
||||
): InlineKeyboardMarkup | undefined {
|
||||
|
|
@ -242,18 +365,9 @@ export async function sendMessageTelegram(
|
|||
text: string,
|
||||
opts: TelegramSendOpts = {},
|
||||
): Promise<TelegramSendResult> {
|
||||
const cfg = loadConfig();
|
||||
const account = resolveTelegramAccount({
|
||||
cfg,
|
||||
accountId: opts.accountId,
|
||||
});
|
||||
const token = resolveToken(opts.token, account);
|
||||
const { cfg, account, api } = resolveTelegramApiContext(opts);
|
||||
const target = parseTelegramTarget(to);
|
||||
const chatId = normalizeChatId(target.chatId);
|
||||
// Use provided api or create a new Bot instance. The nullish coalescing
|
||||
// operator ensures api is always defined (Bot.api is always non-null).
|
||||
const client = resolveTelegramClientOptions(account);
|
||||
const api = opts.api ?? new Bot(token, client ? { client } : undefined).api;
|
||||
const mediaUrl = opts.mediaUrl?.trim();
|
||||
const replyMarkup = buildInlineKeyboard(opts.buttons);
|
||||
|
||||
|
|
@ -277,57 +391,18 @@ export async function sendMessageTelegram(
|
|||
}
|
||||
}
|
||||
const hasThreadParams = Object.keys(threadParams).length > 0;
|
||||
const request = createTelegramRetryRunner({
|
||||
const requestWithDiag = createTelegramRequestWithDiag({
|
||||
cfg,
|
||||
account,
|
||||
retry: opts.retry,
|
||||
configRetry: account.config.retry,
|
||||
verbose: opts.verbose,
|
||||
shouldRetry: (err) => isRecoverableTelegramNetworkError(err, { context: "send" }),
|
||||
});
|
||||
const logHttpError = createTelegramHttpLogger(cfg);
|
||||
const requestWithDiag = <T>(fn: () => Promise<T>, label?: string) =>
|
||||
withTelegramApiErrorLogging({
|
||||
operation: label ?? "request",
|
||||
fn: () => request(fn, label),
|
||||
}).catch((err) => {
|
||||
logHttpError(label ?? "request", err);
|
||||
throw err;
|
||||
});
|
||||
const wrapChatNotFound = (err: unknown) => {
|
||||
if (!/400: Bad Request: chat not found/i.test(formatErrorMessage(err))) {
|
||||
return err;
|
||||
}
|
||||
return new Error(
|
||||
[
|
||||
`Telegram send failed: chat not found (chat_id=${chatId}).`,
|
||||
"Likely: bot not started in DM, bot removed from group/channel, group migrated (new -100… id), or wrong bot token.",
|
||||
`Input was: ${JSON.stringify(to)}.`,
|
||||
].join(" "),
|
||||
);
|
||||
};
|
||||
|
||||
const sendWithThreadFallback = async <T>(
|
||||
params: Record<string, unknown> | undefined,
|
||||
label: string,
|
||||
attempt: (
|
||||
effectiveParams: Record<string, unknown> | undefined,
|
||||
effectiveLabel: string,
|
||||
) => Promise<T>,
|
||||
): Promise<T> => {
|
||||
try {
|
||||
return await attempt(params, label);
|
||||
} catch (err) {
|
||||
if (!hasMessageThreadIdParam(params) || !isTelegramThreadNotFoundError(err)) {
|
||||
throw err;
|
||||
}
|
||||
if (opts.verbose) {
|
||||
console.warn(
|
||||
`telegram ${label} failed with message_thread_id, retrying without thread: ${formatErrorMessage(err)}`,
|
||||
);
|
||||
}
|
||||
const retriedParams = removeMessageThreadIdParam(params);
|
||||
return await attempt(retriedParams, `${label}-threadless`);
|
||||
}
|
||||
};
|
||||
const requestWithChatNotFound = createRequestWithChatNotFound({
|
||||
requestWithDiag,
|
||||
chatId,
|
||||
input: to,
|
||||
});
|
||||
|
||||
const textMode = opts.textMode ?? "markdown";
|
||||
const tableMode = resolveMarkdownTableMode({
|
||||
|
|
@ -346,48 +421,51 @@ export async function sendMessageTelegram(
|
|||
params?: Record<string, unknown>,
|
||||
fallbackText?: string,
|
||||
) => {
|
||||
return await sendWithThreadFallback(params, "message", async (effectiveParams, label) => {
|
||||
const htmlText = renderHtmlText(rawText);
|
||||
const baseParams = effectiveParams ? { ...effectiveParams } : {};
|
||||
if (linkPreviewOptions) {
|
||||
baseParams.link_preview_options = linkPreviewOptions;
|
||||
}
|
||||
const hasBaseParams = Object.keys(baseParams).length > 0;
|
||||
const sendParams = {
|
||||
parse_mode: "HTML" as const,
|
||||
...baseParams,
|
||||
...(opts.silent === true ? { disable_notification: true } : {}),
|
||||
};
|
||||
const res = await requestWithDiag(
|
||||
() =>
|
||||
api.sendMessage(chatId, htmlText, sendParams as Parameters<typeof api.sendMessage>[2]),
|
||||
label,
|
||||
).catch(async (err) => {
|
||||
// Telegram rejects malformed HTML (e.g., unsupported tags or entities).
|
||||
// When that happens, fall back to plain text so the message still delivers.
|
||||
const errText = formatErrorMessage(err);
|
||||
if (PARSE_ERR_RE.test(errText)) {
|
||||
if (opts.verbose) {
|
||||
console.warn(`telegram HTML parse failed, retrying as plain text: ${errText}`);
|
||||
}
|
||||
const fallback = fallbackText ?? rawText;
|
||||
const plainParams = hasBaseParams
|
||||
? (baseParams as Parameters<typeof api.sendMessage>[2])
|
||||
: undefined;
|
||||
return await requestWithDiag(
|
||||
() =>
|
||||
plainParams
|
||||
? api.sendMessage(chatId, fallback, plainParams)
|
||||
: api.sendMessage(chatId, fallback),
|
||||
`${label}-plain`,
|
||||
).catch((err2) => {
|
||||
throw wrapChatNotFound(err2);
|
||||
});
|
||||
return await withTelegramThreadFallback(
|
||||
params,
|
||||
"message",
|
||||
opts.verbose,
|
||||
async (effectiveParams, label) => {
|
||||
const htmlText = renderHtmlText(rawText);
|
||||
const baseParams = effectiveParams ? { ...effectiveParams } : {};
|
||||
if (linkPreviewOptions) {
|
||||
baseParams.link_preview_options = linkPreviewOptions;
|
||||
}
|
||||
throw wrapChatNotFound(err);
|
||||
});
|
||||
return res;
|
||||
});
|
||||
const hasBaseParams = Object.keys(baseParams).length > 0;
|
||||
const sendParams = {
|
||||
parse_mode: "HTML" as const,
|
||||
...baseParams,
|
||||
...(opts.silent === true ? { disable_notification: true } : {}),
|
||||
};
|
||||
const res = await requestWithChatNotFound(
|
||||
() =>
|
||||
api.sendMessage(chatId, htmlText, sendParams as Parameters<typeof api.sendMessage>[2]),
|
||||
label,
|
||||
).catch(async (err) => {
|
||||
// Telegram rejects malformed HTML (e.g., unsupported tags or entities).
|
||||
// When that happens, fall back to plain text so the message still delivers.
|
||||
const errText = formatErrorMessage(err);
|
||||
if (PARSE_ERR_RE.test(errText)) {
|
||||
if (opts.verbose) {
|
||||
console.warn(`telegram HTML parse failed, retrying as plain text: ${errText}`);
|
||||
}
|
||||
const fallback = fallbackText ?? rawText;
|
||||
const plainParams = hasBaseParams
|
||||
? (baseParams as Parameters<typeof api.sendMessage>[2])
|
||||
: undefined;
|
||||
return await requestWithChatNotFound(
|
||||
() =>
|
||||
plainParams
|
||||
? api.sendMessage(chatId, fallback, plainParams)
|
||||
: api.sendMessage(chatId, fallback),
|
||||
`${label}-plain`,
|
||||
);
|
||||
}
|
||||
throw err;
|
||||
});
|
||||
return res;
|
||||
},
|
||||
);
|
||||
};
|
||||
|
||||
if (mediaUrl) {
|
||||
|
|
@ -429,124 +507,105 @@ export async function sendMessageTelegram(
|
|||
...baseMediaParams,
|
||||
...(opts.silent === true ? { disable_notification: true } : {}),
|
||||
};
|
||||
let result:
|
||||
| Awaited<ReturnType<typeof api.sendPhoto>>
|
||||
| Awaited<ReturnType<typeof api.sendVideo>>
|
||||
| Awaited<ReturnType<typeof api.sendVideoNote>>
|
||||
| Awaited<ReturnType<typeof api.sendAudio>>
|
||||
| Awaited<ReturnType<typeof api.sendVoice>>
|
||||
| Awaited<ReturnType<typeof api.sendAnimation>>
|
||||
| Awaited<ReturnType<typeof api.sendDocument>>;
|
||||
if (isGif) {
|
||||
result = await sendWithThreadFallback(
|
||||
const sendMedia = async (
|
||||
label: string,
|
||||
sender: (
|
||||
effectiveParams: Record<string, unknown> | undefined,
|
||||
) => Promise<TelegramMessageLike>,
|
||||
) =>
|
||||
await withTelegramThreadFallback(
|
||||
mediaParams,
|
||||
"animation",
|
||||
async (effectiveParams, label) =>
|
||||
requestWithDiag(
|
||||
() =>
|
||||
api.sendAnimation(
|
||||
label,
|
||||
opts.verbose,
|
||||
async (effectiveParams, retryLabel) =>
|
||||
requestWithChatNotFound(() => sender(effectiveParams), retryLabel),
|
||||
);
|
||||
|
||||
const mediaSender = (() => {
|
||||
if (isGif) {
|
||||
return {
|
||||
label: "animation",
|
||||
sender: (effectiveParams: Record<string, unknown> | undefined) =>
|
||||
api.sendAnimation(
|
||||
chatId,
|
||||
file,
|
||||
effectiveParams as Parameters<typeof api.sendAnimation>[2],
|
||||
) as Promise<TelegramMessageLike>,
|
||||
};
|
||||
}
|
||||
if (kind === "image") {
|
||||
return {
|
||||
label: "photo",
|
||||
sender: (effectiveParams: Record<string, unknown> | undefined) =>
|
||||
api.sendPhoto(
|
||||
chatId,
|
||||
file,
|
||||
effectiveParams as Parameters<typeof api.sendPhoto>[2],
|
||||
) as Promise<TelegramMessageLike>,
|
||||
};
|
||||
}
|
||||
if (kind === "video") {
|
||||
if (isVideoNote) {
|
||||
return {
|
||||
label: "video_note",
|
||||
sender: (effectiveParams: Record<string, unknown> | undefined) =>
|
||||
api.sendVideoNote(
|
||||
chatId,
|
||||
file,
|
||||
effectiveParams as Parameters<typeof api.sendAnimation>[2],
|
||||
),
|
||||
label,
|
||||
).catch((err) => {
|
||||
throw wrapChatNotFound(err);
|
||||
}),
|
||||
);
|
||||
} else if (kind === "image") {
|
||||
result = await sendWithThreadFallback(mediaParams, "photo", async (effectiveParams, label) =>
|
||||
requestWithDiag(
|
||||
() => api.sendPhoto(chatId, file, effectiveParams as Parameters<typeof api.sendPhoto>[2]),
|
||||
label,
|
||||
).catch((err) => {
|
||||
throw wrapChatNotFound(err);
|
||||
}),
|
||||
);
|
||||
} else if (kind === "video") {
|
||||
if (isVideoNote) {
|
||||
result = await sendWithThreadFallback(
|
||||
mediaParams,
|
||||
"video_note",
|
||||
async (effectiveParams, label) =>
|
||||
requestWithDiag(
|
||||
() =>
|
||||
api.sendVideoNote(
|
||||
chatId,
|
||||
file,
|
||||
effectiveParams as Parameters<typeof api.sendVideoNote>[2],
|
||||
),
|
||||
label,
|
||||
).catch((err) => {
|
||||
throw wrapChatNotFound(err);
|
||||
}),
|
||||
);
|
||||
} else {
|
||||
result = await sendWithThreadFallback(
|
||||
mediaParams,
|
||||
"video",
|
||||
async (effectiveParams, label) =>
|
||||
requestWithDiag(
|
||||
() =>
|
||||
api.sendVideo(chatId, file, effectiveParams as Parameters<typeof api.sendVideo>[2]),
|
||||
label,
|
||||
).catch((err) => {
|
||||
throw wrapChatNotFound(err);
|
||||
}),
|
||||
);
|
||||
effectiveParams as Parameters<typeof api.sendVideoNote>[2],
|
||||
) as Promise<TelegramMessageLike>,
|
||||
};
|
||||
}
|
||||
return {
|
||||
label: "video",
|
||||
sender: (effectiveParams: Record<string, unknown> | undefined) =>
|
||||
api.sendVideo(
|
||||
chatId,
|
||||
file,
|
||||
effectiveParams as Parameters<typeof api.sendVideo>[2],
|
||||
) as Promise<TelegramMessageLike>,
|
||||
};
|
||||
}
|
||||
} else if (kind === "audio") {
|
||||
const { useVoice } = resolveTelegramVoiceSend({
|
||||
wantsVoice: opts.asVoice === true, // default false (backward compatible)
|
||||
contentType: media.contentType,
|
||||
fileName,
|
||||
logFallback: logVerbose,
|
||||
});
|
||||
if (useVoice) {
|
||||
result = await sendWithThreadFallback(
|
||||
mediaParams,
|
||||
"voice",
|
||||
async (effectiveParams, label) =>
|
||||
requestWithDiag(
|
||||
() =>
|
||||
api.sendVoice(chatId, file, effectiveParams as Parameters<typeof api.sendVoice>[2]),
|
||||
label,
|
||||
).catch((err) => {
|
||||
throw wrapChatNotFound(err);
|
||||
}),
|
||||
);
|
||||
} else {
|
||||
result = await sendWithThreadFallback(
|
||||
mediaParams,
|
||||
"audio",
|
||||
async (effectiveParams, label) =>
|
||||
requestWithDiag(
|
||||
() =>
|
||||
api.sendAudio(chatId, file, effectiveParams as Parameters<typeof api.sendAudio>[2]),
|
||||
label,
|
||||
).catch((err) => {
|
||||
throw wrapChatNotFound(err);
|
||||
}),
|
||||
);
|
||||
}
|
||||
} else {
|
||||
result = await sendWithThreadFallback(
|
||||
mediaParams,
|
||||
"document",
|
||||
async (effectiveParams, label) =>
|
||||
requestWithDiag(
|
||||
() =>
|
||||
api.sendDocument(
|
||||
if (kind === "audio") {
|
||||
const { useVoice } = resolveTelegramVoiceSend({
|
||||
wantsVoice: opts.asVoice === true, // default false (backward compatible)
|
||||
contentType: media.contentType,
|
||||
fileName,
|
||||
logFallback: logVerbose,
|
||||
});
|
||||
if (useVoice) {
|
||||
return {
|
||||
label: "voice",
|
||||
sender: (effectiveParams: Record<string, unknown> | undefined) =>
|
||||
api.sendVoice(
|
||||
chatId,
|
||||
file,
|
||||
effectiveParams as Parameters<typeof api.sendDocument>[2],
|
||||
),
|
||||
label,
|
||||
).catch((err) => {
|
||||
throw wrapChatNotFound(err);
|
||||
}),
|
||||
);
|
||||
}
|
||||
effectiveParams as Parameters<typeof api.sendVoice>[2],
|
||||
) as Promise<TelegramMessageLike>,
|
||||
};
|
||||
}
|
||||
return {
|
||||
label: "audio",
|
||||
sender: (effectiveParams: Record<string, unknown> | undefined) =>
|
||||
api.sendAudio(
|
||||
chatId,
|
||||
file,
|
||||
effectiveParams as Parameters<typeof api.sendAudio>[2],
|
||||
) as Promise<TelegramMessageLike>,
|
||||
};
|
||||
}
|
||||
return {
|
||||
label: "document",
|
||||
sender: (effectiveParams: Record<string, unknown> | undefined) =>
|
||||
api.sendDocument(
|
||||
chatId,
|
||||
file,
|
||||
effectiveParams as Parameters<typeof api.sendDocument>[2],
|
||||
) as Promise<TelegramMessageLike>,
|
||||
};
|
||||
})();
|
||||
|
||||
const result = await sendMedia(mediaSender.label, mediaSender.sender);
|
||||
const mediaMessageId = String(result?.message_id ?? "unknown");
|
||||
const resolvedChatId = String(result?.chat?.id ?? chatId);
|
||||
if (result?.message_id) {
|
||||
|
|
@ -608,31 +667,16 @@ export async function reactMessageTelegram(
|
|||
emoji: string,
|
||||
opts: TelegramReactionOpts = {},
|
||||
): Promise<{ ok: true } | { ok: false; warning: string }> {
|
||||
const cfg = loadConfig();
|
||||
const account = resolveTelegramAccount({
|
||||
cfg,
|
||||
accountId: opts.accountId,
|
||||
});
|
||||
const token = resolveToken(opts.token, account);
|
||||
const { cfg, account, api } = resolveTelegramApiContext(opts);
|
||||
const chatId = normalizeChatId(String(chatIdInput));
|
||||
const messageId = normalizeMessageId(messageIdInput);
|
||||
const client = resolveTelegramClientOptions(account);
|
||||
const api = opts.api ?? new Bot(token, client ? { client } : undefined).api;
|
||||
const request = createTelegramRetryRunner({
|
||||
const requestWithDiag = createTelegramRequestWithDiag({
|
||||
cfg,
|
||||
account,
|
||||
retry: opts.retry,
|
||||
configRetry: account.config.retry,
|
||||
verbose: opts.verbose,
|
||||
shouldRetry: (err) => isRecoverableTelegramNetworkError(err, { context: "send" }),
|
||||
});
|
||||
const logHttpError = createTelegramHttpLogger(cfg);
|
||||
const requestWithDiag = <T>(fn: () => Promise<T>, label?: string) =>
|
||||
withTelegramApiErrorLogging({
|
||||
operation: label ?? "request",
|
||||
fn: () => request(fn, label),
|
||||
}).catch((err) => {
|
||||
logHttpError(label ?? "request", err);
|
||||
throw err;
|
||||
});
|
||||
const remove = opts.remove === true;
|
||||
const trimmedEmoji = emoji.trim();
|
||||
// Build the reaction array. We cast emoji to the grammY union type since
|
||||
|
|
@ -669,31 +713,16 @@ export async function deleteMessageTelegram(
|
|||
messageIdInput: string | number,
|
||||
opts: TelegramDeleteOpts = {},
|
||||
): Promise<{ ok: true }> {
|
||||
const cfg = loadConfig();
|
||||
const account = resolveTelegramAccount({
|
||||
cfg,
|
||||
accountId: opts.accountId,
|
||||
});
|
||||
const token = resolveToken(opts.token, account);
|
||||
const { cfg, account, api } = resolveTelegramApiContext(opts);
|
||||
const chatId = normalizeChatId(String(chatIdInput));
|
||||
const messageId = normalizeMessageId(messageIdInput);
|
||||
const client = resolveTelegramClientOptions(account);
|
||||
const api = opts.api ?? new Bot(token, client ? { client } : undefined).api;
|
||||
const request = createTelegramRetryRunner({
|
||||
const requestWithDiag = createTelegramRequestWithDiag({
|
||||
cfg,
|
||||
account,
|
||||
retry: opts.retry,
|
||||
configRetry: account.config.retry,
|
||||
verbose: opts.verbose,
|
||||
shouldRetry: (err) => isRecoverableTelegramNetworkError(err, { context: "send" }),
|
||||
});
|
||||
const logHttpError = createTelegramHttpLogger(cfg);
|
||||
const requestWithDiag = <T>(fn: () => Promise<T>, label?: string) =>
|
||||
withTelegramApiErrorLogging({
|
||||
operation: label ?? "request",
|
||||
fn: () => request(fn, label),
|
||||
}).catch((err) => {
|
||||
logHttpError(label ?? "request", err);
|
||||
throw err;
|
||||
});
|
||||
await requestWithDiag(() => api.deleteMessage(chatId, messageId), "deleteMessage");
|
||||
logVerbose(`[telegram] Deleted message ${messageId} from chat ${chatId}`);
|
||||
return { ok: true };
|
||||
|
|
@ -720,35 +749,23 @@ export async function editMessageTelegram(
|
|||
text: string,
|
||||
opts: TelegramEditOpts = {},
|
||||
): Promise<{ ok: true; messageId: string; chatId: string }> {
|
||||
const cfg = opts.cfg ?? loadConfig();
|
||||
const account = resolveTelegramAccount({
|
||||
cfg,
|
||||
accountId: opts.accountId,
|
||||
const { cfg, account, api } = resolveTelegramApiContext({
|
||||
...opts,
|
||||
cfg: opts.cfg,
|
||||
});
|
||||
const token = resolveToken(opts.token, account);
|
||||
const chatId = normalizeChatId(String(chatIdInput));
|
||||
const messageId = normalizeMessageId(messageIdInput);
|
||||
const client = resolveTelegramClientOptions(account);
|
||||
const api = opts.api ?? new Bot(token, client ? { client } : undefined).api;
|
||||
const request = createTelegramRetryRunner({
|
||||
const requestWithDiag = createTelegramRequestWithDiag({
|
||||
cfg,
|
||||
account,
|
||||
retry: opts.retry,
|
||||
configRetry: account.config.retry,
|
||||
verbose: opts.verbose,
|
||||
});
|
||||
const logHttpError = createTelegramHttpLogger(cfg);
|
||||
const requestWithDiag = <T>(
|
||||
const requestWithEditShouldLog = <T>(
|
||||
fn: () => Promise<T>,
|
||||
label?: string,
|
||||
shouldLog?: (err: unknown) => boolean,
|
||||
) =>
|
||||
withTelegramApiErrorLogging({
|
||||
operation: label ?? "request",
|
||||
fn: () => request(fn, label),
|
||||
shouldLog,
|
||||
}).catch((err) => {
|
||||
logHttpError(label ?? "request", err);
|
||||
throw err;
|
||||
});
|
||||
) => requestWithDiag(fn, label, shouldLog ? { shouldLog } : undefined);
|
||||
|
||||
const textMode = opts.textMode ?? "markdown";
|
||||
const tableMode = resolveMarkdownTableMode({
|
||||
|
|
@ -776,7 +793,7 @@ export async function editMessageTelegram(
|
|||
editParams.reply_markup = replyMarkup;
|
||||
}
|
||||
|
||||
await requestWithDiag(
|
||||
await requestWithEditShouldLog(
|
||||
() => api.editMessageText(chatId, messageId, htmlText, editParams),
|
||||
"editMessage",
|
||||
(err) => !isTelegramMessageNotModifiedError(err),
|
||||
|
|
@ -798,7 +815,7 @@ export async function editMessageTelegram(
|
|||
if (replyMarkup !== undefined) {
|
||||
plainParams.reply_markup = replyMarkup;
|
||||
}
|
||||
return await requestWithDiag(
|
||||
return await requestWithEditShouldLog(
|
||||
() =>
|
||||
Object.keys(plainParams).length > 0
|
||||
? api.editMessageText(chatId, messageId, text, plainParams)
|
||||
|
|
@ -859,90 +876,42 @@ export async function sendStickerTelegram(
|
|||
throw new Error("Telegram sticker file_id is required");
|
||||
}
|
||||
|
||||
const cfg = loadConfig();
|
||||
const account = resolveTelegramAccount({
|
||||
cfg,
|
||||
accountId: opts.accountId,
|
||||
});
|
||||
const token = resolveToken(opts.token, account);
|
||||
const { cfg, account, api } = resolveTelegramApiContext(opts);
|
||||
const target = parseTelegramTarget(to);
|
||||
const chatId = normalizeChatId(target.chatId);
|
||||
const client = resolveTelegramClientOptions(account);
|
||||
const api = opts.api ?? new Bot(token, client ? { client } : undefined).api;
|
||||
|
||||
const messageThreadId =
|
||||
opts.messageThreadId != null ? opts.messageThreadId : target.messageThreadId;
|
||||
const threadSpec =
|
||||
messageThreadId != null ? { id: messageThreadId, scope: "forum" as const } : undefined;
|
||||
const threadIdParams = buildTelegramThreadParams(threadSpec);
|
||||
const threadParams: Record<string, number> = threadIdParams ? { ...threadIdParams } : {};
|
||||
const threadParams: Record<string, unknown> = threadIdParams ? { ...threadIdParams } : {};
|
||||
if (opts.replyToMessageId != null) {
|
||||
threadParams.reply_to_message_id = Math.trunc(opts.replyToMessageId);
|
||||
}
|
||||
const hasThreadParams = Object.keys(threadParams).length > 0;
|
||||
|
||||
const request = createTelegramRetryRunner({
|
||||
const requestWithDiag = createTelegramRequestWithDiag({
|
||||
cfg,
|
||||
account,
|
||||
retry: opts.retry,
|
||||
configRetry: account.config.retry,
|
||||
verbose: opts.verbose,
|
||||
useApiErrorLogging: false,
|
||||
});
|
||||
const requestWithChatNotFound = createRequestWithChatNotFound({
|
||||
requestWithDiag,
|
||||
chatId,
|
||||
input: to,
|
||||
});
|
||||
const logHttpError = createTelegramHttpLogger(cfg);
|
||||
const requestWithDiag = <T>(fn: () => Promise<T>, label?: string) =>
|
||||
request(fn, label).catch((err) => {
|
||||
logHttpError(label ?? "request", err);
|
||||
throw err;
|
||||
});
|
||||
|
||||
const wrapChatNotFound = (err: unknown) => {
|
||||
if (!/400: Bad Request: chat not found/i.test(formatErrorMessage(err))) {
|
||||
return err;
|
||||
}
|
||||
return new Error(
|
||||
[
|
||||
`Telegram send failed: chat not found (chat_id=${chatId}).`,
|
||||
"Likely: bot not started in DM, bot removed from group/channel, group migrated (new -100… id), or wrong bot token.",
|
||||
`Input was: ${JSON.stringify(to)}.`,
|
||||
].join(" "),
|
||||
);
|
||||
};
|
||||
|
||||
const sendWithThreadFallback = async <T>(
|
||||
params: Record<string, number> | undefined,
|
||||
label: string,
|
||||
attempt: (
|
||||
effectiveParams: Record<string, number> | undefined,
|
||||
effectiveLabel: string,
|
||||
) => Promise<T>,
|
||||
): Promise<T> => {
|
||||
try {
|
||||
return await attempt(params, label);
|
||||
} catch (err) {
|
||||
if (!hasMessageThreadIdParam(params) || !isTelegramThreadNotFoundError(err)) {
|
||||
throw err;
|
||||
}
|
||||
if (opts.verbose) {
|
||||
console.warn(
|
||||
`telegram ${label} failed with message_thread_id, retrying without thread: ${formatErrorMessage(err)}`,
|
||||
);
|
||||
}
|
||||
const retriedParams = removeMessageThreadIdParam(params) as
|
||||
| Record<string, number>
|
||||
| undefined;
|
||||
return await attempt(retriedParams, `${label}-threadless`);
|
||||
}
|
||||
};
|
||||
|
||||
const stickerParams = hasThreadParams ? threadParams : undefined;
|
||||
|
||||
const result = await sendWithThreadFallback(
|
||||
const result = await withTelegramThreadFallback(
|
||||
stickerParams,
|
||||
"sticker",
|
||||
opts.verbose,
|
||||
async (effectiveParams, label) =>
|
||||
requestWithDiag(() => api.sendSticker(chatId, fileId.trim(), effectiveParams), label).catch(
|
||||
(err) => {
|
||||
throw wrapChatNotFound(err);
|
||||
},
|
||||
),
|
||||
requestWithChatNotFound(() => api.sendSticker(chatId, fileId.trim(), effectiveParams), label),
|
||||
);
|
||||
|
||||
const messageId = String(result?.message_id ?? "unknown");
|
||||
|
|
@ -986,16 +955,9 @@ export async function sendPollTelegram(
|
|||
poll: PollInput,
|
||||
opts: TelegramPollOpts = {},
|
||||
): Promise<{ messageId: string; chatId: string; pollId?: string }> {
|
||||
const cfg = loadConfig();
|
||||
const account = resolveTelegramAccount({
|
||||
cfg,
|
||||
accountId: opts.accountId,
|
||||
});
|
||||
const token = resolveToken(opts.token, account);
|
||||
const { cfg, account, api } = resolveTelegramApiContext(opts);
|
||||
const target = parseTelegramTarget(to);
|
||||
const chatId = normalizeChatId(target.chatId);
|
||||
const client = resolveTelegramClientOptions(account);
|
||||
const api = opts.api ?? new Bot(token, client ? { client } : undefined).api;
|
||||
|
||||
// Normalize the poll input (validates question, options, maxSelections)
|
||||
const normalizedPoll = normalizePollInput(poll, { maxOptions: 10 });
|
||||
|
|
@ -1009,58 +971,18 @@ export async function sendPollTelegram(
|
|||
// Build poll options as simple strings (Grammy accepts string[] or InputPollOption[])
|
||||
const pollOptions = normalizedPoll.options;
|
||||
|
||||
const request = createTelegramRetryRunner({
|
||||
const requestWithDiag = createTelegramRequestWithDiag({
|
||||
cfg,
|
||||
account,
|
||||
retry: opts.retry,
|
||||
configRetry: account.config.retry,
|
||||
verbose: opts.verbose,
|
||||
shouldRetry: (err) => isRecoverableTelegramNetworkError(err, { context: "send" }),
|
||||
});
|
||||
const logHttpError = createTelegramHttpLogger(cfg);
|
||||
const requestWithDiag = <T>(fn: () => Promise<T>, label?: string) =>
|
||||
withTelegramApiErrorLogging({
|
||||
operation: label ?? "request",
|
||||
fn: () => request(fn, label),
|
||||
}).catch((err) => {
|
||||
logHttpError(label ?? "request", err);
|
||||
throw err;
|
||||
});
|
||||
|
||||
const wrapChatNotFound = (err: unknown) => {
|
||||
if (!/400: Bad Request: chat not found/i.test(formatErrorMessage(err))) {
|
||||
return err;
|
||||
}
|
||||
return new Error(
|
||||
[
|
||||
`Telegram send failed: chat not found (chat_id=${chatId}).`,
|
||||
"Likely: bot not started in DM, bot removed from group/channel, group migrated (new -100… id), or wrong bot token.",
|
||||
`Input was: ${JSON.stringify(to)}.`,
|
||||
].join(" "),
|
||||
);
|
||||
};
|
||||
|
||||
const sendWithThreadFallback = async <T>(
|
||||
params: Record<string, unknown> | undefined,
|
||||
label: string,
|
||||
attempt: (
|
||||
effectiveParams: Record<string, unknown> | undefined,
|
||||
effectiveLabel: string,
|
||||
) => Promise<T>,
|
||||
): Promise<T> => {
|
||||
try {
|
||||
return await attempt(params, label);
|
||||
} catch (err) {
|
||||
if (!hasMessageThreadIdParam(params) || !isTelegramThreadNotFoundError(err)) {
|
||||
throw err;
|
||||
}
|
||||
if (opts.verbose) {
|
||||
console.warn(
|
||||
`telegram ${label} failed with message_thread_id, retrying without thread: ${formatErrorMessage(err)}`,
|
||||
);
|
||||
}
|
||||
const retriedParams = removeMessageThreadIdParam(params);
|
||||
return await attempt(retriedParams, `${label}-threadless`);
|
||||
}
|
||||
};
|
||||
const requestWithChatNotFound = createRequestWithChatNotFound({
|
||||
requestWithDiag,
|
||||
chatId,
|
||||
input: to,
|
||||
});
|
||||
|
||||
const durationSeconds = normalizedPoll.durationSeconds;
|
||||
if (durationSeconds === undefined && normalizedPoll.durationHours !== undefined) {
|
||||
|
|
@ -1085,13 +1007,15 @@ export async function sendPollTelegram(
|
|||
...(opts.silent === true ? { disable_notification: true } : {}),
|
||||
};
|
||||
|
||||
const result = await sendWithThreadFallback(pollParams, "poll", async (effectiveParams, label) =>
|
||||
requestWithDiag(
|
||||
() => api.sendPoll(chatId, normalizedPoll.question, pollOptions, effectiveParams),
|
||||
label,
|
||||
).catch((err) => {
|
||||
throw wrapChatNotFound(err);
|
||||
}),
|
||||
const result = await withTelegramThreadFallback(
|
||||
pollParams,
|
||||
"poll",
|
||||
opts.verbose,
|
||||
async (effectiveParams, label) =>
|
||||
requestWithChatNotFound(
|
||||
() => api.sendPoll(chatId, normalizedPoll.question, pollOptions, effectiveParams),
|
||||
label,
|
||||
),
|
||||
);
|
||||
|
||||
const messageId = String(result?.message_id ?? "unknown");
|
||||
|
|
|
|||
|
|
@ -1,41 +0,0 @@
|
|||
import { type ApiClientOptions, Bot } from "grammy";
|
||||
import type { TelegramNetworkConfig } from "../config/types.telegram.js";
|
||||
import { withTelegramApiErrorLogging } from "./api-logging.js";
|
||||
import { resolveTelegramFetch } from "./fetch.js";
|
||||
|
||||
export async function setTelegramWebhook(opts: {
|
||||
token: string;
|
||||
url: string;
|
||||
secret?: string;
|
||||
dropPendingUpdates?: boolean;
|
||||
network?: TelegramNetworkConfig;
|
||||
}) {
|
||||
const fetchImpl = resolveTelegramFetch(undefined, { network: opts.network });
|
||||
const client: ApiClientOptions | undefined = fetchImpl
|
||||
? { fetch: fetchImpl as unknown as ApiClientOptions["fetch"] }
|
||||
: undefined;
|
||||
const bot = new Bot(opts.token, client ? { client } : undefined);
|
||||
await withTelegramApiErrorLogging({
|
||||
operation: "setWebhook",
|
||||
fn: () =>
|
||||
bot.api.setWebhook(opts.url, {
|
||||
secret_token: opts.secret,
|
||||
drop_pending_updates: opts.dropPendingUpdates ?? false,
|
||||
}),
|
||||
});
|
||||
}
|
||||
|
||||
export async function deleteTelegramWebhook(opts: {
|
||||
token: string;
|
||||
network?: TelegramNetworkConfig;
|
||||
}) {
|
||||
const fetchImpl = resolveTelegramFetch(undefined, { network: opts.network });
|
||||
const client: ApiClientOptions | undefined = fetchImpl
|
||||
? { fetch: fetchImpl as unknown as ApiClientOptions["fetch"] }
|
||||
: undefined;
|
||||
const bot = new Bot(opts.token, client ? { client } : undefined);
|
||||
await withTelegramApiErrorLogging({
|
||||
operation: "deleteWebhook",
|
||||
fn: () => bot.api.deleteWebhook(),
|
||||
});
|
||||
}
|
||||
Loading…
Reference in New Issue