Compare commits

...

3 Commits

Author SHA1 Message Date
F_ool c5b4728972
Merge f2e34b7422 into c4265a5f16 2026-03-15 14:35:29 +00:00
jianxh f2e34b7422 Fix CI for telegram proxy PR 2026-03-15 22:35:14 +08:00
jianxh 6f864e21fa fix(telegram): route outbound media downloads through proxy transport 2026-03-15 19:04:42 +08:00
12 changed files with 287 additions and 47 deletions

View File

@ -35,6 +35,7 @@ import type { TelegramStreamMode } from "./bot/types.js";
import type { TelegramInlineButtons } from "./button-types.js"; import type { TelegramInlineButtons } from "./button-types.js";
import { createTelegramDraftStream } from "./draft-stream.js"; import { createTelegramDraftStream } from "./draft-stream.js";
import { shouldSuppressLocalTelegramExecApprovalPrompt } from "./exec-approvals.js"; import { shouldSuppressLocalTelegramExecApprovalPrompt } from "./exec-approvals.js";
import type { TelegramTransport } from "./fetch.js";
import { renderTelegramHtmlText } from "./format.js"; import { renderTelegramHtmlText } from "./format.js";
import { import {
type ArchivedPreview, type ArchivedPreview,
@ -111,6 +112,7 @@ type DispatchTelegramMessageParams = {
textLimit: number; textLimit: number;
telegramCfg: TelegramAccountConfig; telegramCfg: TelegramAccountConfig;
opts: Pick<TelegramBotOptions, "token">; opts: Pick<TelegramBotOptions, "token">;
telegramTransport?: TelegramTransport;
}; };
type TelegramReasoningLevel = "off" | "on" | "stream"; type TelegramReasoningLevel = "off" | "on" | "stream";
@ -148,6 +150,7 @@ export const dispatchTelegramMessage = async ({
textLimit, textLimit,
telegramCfg, telegramCfg,
opts, opts,
telegramTransport,
}: DispatchTelegramMessageParams) => { }: DispatchTelegramMessageParams) => {
const { const {
ctxPayload, ctxPayload,
@ -457,6 +460,7 @@ export const dispatchTelegramMessage = async ({
runtime, runtime,
bot, bot,
mediaLocalRoots, mediaLocalRoots,
telegramTransport,
replyToMode, replyToMode,
textLimit, textLimit,
thread: threadSpec, thread: threadSpec,

View File

@ -10,6 +10,7 @@ import {
import { dispatchTelegramMessage } from "./bot-message-dispatch.js"; import { dispatchTelegramMessage } from "./bot-message-dispatch.js";
import type { TelegramBotOptions } from "./bot.js"; import type { TelegramBotOptions } from "./bot.js";
import type { TelegramContext, TelegramStreamMode } from "./bot/types.js"; import type { TelegramContext, TelegramStreamMode } from "./bot/types.js";
import type { TelegramTransport } from "./fetch.js";
/** Dependencies injected once when creating the message processor. */ /** Dependencies injected once when creating the message processor. */
type TelegramMessageProcessorDeps = Omit< type TelegramMessageProcessorDeps = Omit<
@ -22,6 +23,7 @@ type TelegramMessageProcessorDeps = Omit<
streamMode: TelegramStreamMode; streamMode: TelegramStreamMode;
textLimit: number; textLimit: number;
opts: Pick<TelegramBotOptions, "token">; opts: Pick<TelegramBotOptions, "token">;
telegramTransport?: TelegramTransport;
}; };
export const createTelegramMessageProcessor = (deps: TelegramMessageProcessorDeps) => { export const createTelegramMessageProcessor = (deps: TelegramMessageProcessorDeps) => {
@ -46,6 +48,7 @@ export const createTelegramMessageProcessor = (deps: TelegramMessageProcessorDep
streamMode, streamMode,
textLimit, textLimit,
opts, opts,
telegramTransport,
} = deps; } = deps;
return async ( return async (
@ -90,6 +93,7 @@ export const createTelegramMessageProcessor = (deps: TelegramMessageProcessorDep
textLimit, textLimit,
telegramCfg, telegramCfg,
opts, opts,
telegramTransport,
}); });
} catch (err) { } catch (err) {
runtime.error?.(danger(`telegram message processing failed: ${String(err)}`)); runtime.error?.(danger(`telegram message processing failed: ${String(err)}`));

View File

@ -140,6 +140,7 @@ type RegisterTelegramNativeCommandsParams = {
) => { groupConfig?: TelegramGroupConfig; topicConfig?: TelegramTopicConfig }; ) => { groupConfig?: TelegramGroupConfig; topicConfig?: TelegramTopicConfig };
shouldSkipUpdate: (ctx: TelegramUpdateKeyContext) => boolean; shouldSkipUpdate: (ctx: TelegramUpdateKeyContext) => boolean;
opts: { token: string }; opts: { token: string };
telegramTransport?: TelegramTransport;
}; };
async function resolveTelegramCommandAuth(params: { async function resolveTelegramCommandAuth(params: {
@ -362,6 +363,7 @@ export const registerTelegramNativeCommands = ({
resolveTelegramGroupConfig, resolveTelegramGroupConfig,
shouldSkipUpdate, shouldSkipUpdate,
opts, opts,
telegramTransport,
}: RegisterTelegramNativeCommandsParams) => { }: RegisterTelegramNativeCommandsParams) => {
const boundRoute = const boundRoute =
nativeEnabled && nativeSkillsEnabled nativeEnabled && nativeSkillsEnabled
@ -536,6 +538,7 @@ export const registerTelegramNativeCommands = ({
runtime, runtime,
bot, bot,
mediaLocalRoots: params.mediaLocalRoots, mediaLocalRoots: params.mediaLocalRoots,
telegramTransport,
replyToMode, replyToMode,
textLimit, textLimit,
thread: params.threadSpec, thread: params.threadSpec,

View File

@ -471,6 +471,7 @@ export function createTelegramBot(opts: TelegramBotOptions) {
streamMode, streamMode,
textLimit, textLimit,
opts, opts,
telegramTransport,
}); });
registerTelegramNativeCommands({ registerTelegramNativeCommands({
@ -491,6 +492,7 @@ export function createTelegramBot(opts: TelegramBotOptions) {
resolveTelegramGroupConfig, resolveTelegramGroupConfig,
shouldSkipUpdate, shouldSkipUpdate,
opts, opts,
telegramTransport,
}); });
registerTelegramHandlers({ registerTelegramHandlers({

View File

@ -23,6 +23,7 @@ import type { RuntimeEnv } from "../../../../src/runtime.js";
import { loadWebMedia } from "../../../whatsapp/src/media.js"; import { loadWebMedia } from "../../../whatsapp/src/media.js";
import type { TelegramInlineButtons } from "../button-types.js"; import type { TelegramInlineButtons } from "../button-types.js";
import { splitTelegramCaption } from "../caption.js"; import { splitTelegramCaption } from "../caption.js";
import { shouldRetryTelegramIpv4Fallback, type TelegramTransport } from "../fetch.js";
import { import {
markdownToTelegramChunks, markdownToTelegramChunks,
markdownToTelegramHtml, markdownToTelegramHtml,
@ -234,6 +235,7 @@ async function deliverMediaReply(params: {
thread?: TelegramThreadSpec | null; thread?: TelegramThreadSpec | null;
tableMode?: MarkdownTableMode; tableMode?: MarkdownTableMode;
mediaLocalRoots?: readonly string[]; mediaLocalRoots?: readonly string[];
telegramTransport?: TelegramTransport;
chunkText: ChunkTextFn; chunkText: ChunkTextFn;
onVoiceRecording?: () => Promise<void> | void; onVoiceRecording?: () => Promise<void> | void;
linkPreview?: boolean; linkPreview?: boolean;
@ -250,7 +252,15 @@ async function deliverMediaReply(params: {
const isFirstMedia = first; const isFirstMedia = first;
const media = await loadWebMedia( const media = await loadWebMedia(
mediaUrl, mediaUrl,
buildOutboundMediaLoadOptions({ mediaLocalRoots: params.mediaLocalRoots }), buildOutboundMediaLoadOptions({
mediaLocalRoots: params.mediaLocalRoots,
fetchImpl: params.telegramTransport?.sourceFetch,
dispatcherPolicy: params.telegramTransport?.pinnedDispatcherPolicy,
fallbackDispatcherPolicy: params.telegramTransport?.fallbackPinnedDispatcherPolicy,
shouldRetryFetchError: params.telegramTransport
? shouldRetryTelegramIpv4Fallback
: undefined,
}),
); );
const kind = kindFromMime(media.contentType ?? undefined); const kind = kindFromMime(media.contentType ?? undefined);
const isGif = isGifMedia({ const isGif = isGifMedia({
@ -548,6 +558,7 @@ export async function deliverReplies(params: {
runtime: RuntimeEnv; runtime: RuntimeEnv;
bot: Bot; bot: Bot;
mediaLocalRoots?: readonly string[]; mediaLocalRoots?: readonly string[];
telegramTransport?: TelegramTransport;
replyToMode: ReplyToMode; replyToMode: ReplyToMode;
textLimit: number; textLimit: number;
thread?: TelegramThreadSpec | null; thread?: TelegramThreadSpec | null;
@ -651,6 +662,7 @@ export async function deliverReplies(params: {
thread: params.thread, thread: params.thread,
tableMode: params.tableMode, tableMode: params.tableMode,
mediaLocalRoots: params.mediaLocalRoots, mediaLocalRoots: params.mediaLocalRoots,
telegramTransport: params.telegramTransport,
chunkText, chunkText,
onVoiceRecording: params.onVoiceRecording, onVoiceRecording: params.onVoiceRecording,
linkPreview: params.linkPreview, linkPreview: params.linkPreview,

View File

@ -392,6 +392,44 @@ describe("deliverReplies", () => {
}); });
}); });
it("passes Telegram transport to remote media loading", async () => {
const runtime = createRuntime();
const sendPhoto = vi.fn().mockResolvedValue({
message_id: 13,
chat: { id: "123" },
});
const bot = createBot({ sendPhoto });
const sourceFetch = vi.fn() as unknown as typeof fetch;
const telegramTransport = {
fetch: sourceFetch,
sourceFetch,
pinnedDispatcherPolicy: {
mode: "explicit-proxy",
proxyUrl: "http://proxy.test:8080",
} as const,
fallbackPinnedDispatcherPolicy: { mode: "direct" } as const,
};
mockMediaLoad("photo.jpg", "image/jpeg", "image");
await deliverWith({
replies: [{ mediaUrl: "https://example.com/photo.jpg" }],
runtime,
bot,
telegramTransport,
});
expect(loadWebMedia).toHaveBeenCalledWith(
"https://example.com/photo.jpg",
expect.objectContaining({
fetchImpl: sourceFetch,
dispatcherPolicy: telegramTransport.pinnedDispatcherPolicy,
fallbackDispatcherPolicy: telegramTransport.fallbackPinnedDispatcherPolicy,
shouldRetryFetchError: expect.any(Function),
}),
);
});
it("includes link_preview_options when linkPreview is false", async () => { it("includes link_preview_options when linkPreview is false", async () => {
const runtime = createRuntime(); const runtime = createRuntime();
const sendMessage = vi.fn().mockResolvedValue({ const sendMessage = vi.fn().mockResolvedValue({

View File

@ -3,6 +3,7 @@ import { beforeEach, describe, expect, it, vi } from "vitest";
const { botApi, botCtorSpy } = vi.hoisted(() => ({ const { botApi, botCtorSpy } = vi.hoisted(() => ({
botApi: { botApi: {
sendMessage: vi.fn(), sendMessage: vi.fn(),
sendPhoto: vi.fn(),
setMessageReaction: vi.fn(), setMessageReaction: vi.fn(),
deleteMessage: vi.fn(), deleteMessage: vi.fn(),
}, },
@ -17,8 +18,13 @@ const { makeProxyFetch } = vi.hoisted(() => ({
makeProxyFetch: vi.fn(), makeProxyFetch: vi.fn(),
})); }));
const { resolveTelegramFetch } = vi.hoisted(() => ({ const { resolveTelegramTransport, shouldRetryTelegramIpv4Fallback } = vi.hoisted(() => ({
resolveTelegramFetch: vi.fn(), resolveTelegramTransport: vi.fn(),
shouldRetryTelegramIpv4Fallback: vi.fn(() => true),
}));
const { loadWebMedia } = vi.hoisted(() => ({
loadWebMedia: vi.fn(),
})); }));
vi.mock("../../../src/config/config.js", async (importOriginal) => { vi.mock("../../../src/config/config.js", async (importOriginal) => {
@ -33,13 +39,18 @@ vi.mock("./proxy.js", () => ({
makeProxyFetch, makeProxyFetch,
})); }));
vi.mock("../../whatsapp/src/media.js", () => ({
loadWebMedia,
}));
vi.mock("./fetch.js", () => ({ vi.mock("./fetch.js", () => ({
resolveTelegramFetch, resolveTelegramTransport,
shouldRetryTelegramIpv4Fallback,
})); }));
vi.mock("grammy", () => ({ vi.mock("grammy", () => ({
Bot: class { Bot: class {
api = botApi; api = { ...botApi };
catch = vi.fn(); catch = vi.fn();
constructor( constructor(
public token: string, public token: string,
@ -61,21 +72,30 @@ import {
describe("telegram proxy client", () => { describe("telegram proxy client", () => {
const proxyUrl = "http://proxy.test:8080"; const proxyUrl = "http://proxy.test:8080";
const prepareProxyFetch = () => { const prepareProxyTransport = () => {
const proxyFetch = vi.fn(); const proxyFetch = vi.fn();
const fetchImpl = vi.fn(); const clientFetch = vi.fn();
const sourceFetch = vi.fn();
const transport = {
fetch: clientFetch as unknown as typeof fetch,
sourceFetch: sourceFetch as unknown as typeof fetch,
pinnedDispatcherPolicy: { mode: "explicit-proxy", proxyUrl } as const,
fallbackPinnedDispatcherPolicy: { mode: "direct" } as const,
};
makeProxyFetch.mockReturnValue(proxyFetch as unknown as typeof fetch); makeProxyFetch.mockReturnValue(proxyFetch as unknown as typeof fetch);
resolveTelegramFetch.mockReturnValue(fetchImpl as unknown as typeof fetch); resolveTelegramTransport.mockReturnValue(transport);
return { proxyFetch, fetchImpl }; return { proxyFetch, transport };
}; };
const expectProxyClient = (fetchImpl: ReturnType<typeof vi.fn>) => { const expectProxyClient = (transport: ReturnType<typeof prepareProxyTransport>["transport"]) => {
expect(makeProxyFetch).toHaveBeenCalledWith(proxyUrl); expect(makeProxyFetch).toHaveBeenCalledWith(proxyUrl);
expect(resolveTelegramFetch).toHaveBeenCalledWith(expect.any(Function), { network: undefined }); expect(resolveTelegramTransport).toHaveBeenCalledWith(expect.any(Function), {
network: undefined,
});
expect(botCtorSpy).toHaveBeenCalledWith( expect(botCtorSpy).toHaveBeenCalledWith(
"tok", "tok",
expect.objectContaining({ expect.objectContaining({
client: expect.objectContaining({ fetch: fetchImpl }), client: expect.objectContaining({ fetch: transport.fetch }),
}), }),
); );
}; };
@ -86,16 +106,18 @@ describe("telegram proxy client", () => {
botApi.sendMessage.mockResolvedValue({ message_id: 1, chat: { id: "123" } }); botApi.sendMessage.mockResolvedValue({ message_id: 1, chat: { id: "123" } });
botApi.setMessageReaction.mockResolvedValue(undefined); botApi.setMessageReaction.mockResolvedValue(undefined);
botApi.deleteMessage.mockResolvedValue(true); botApi.deleteMessage.mockResolvedValue(true);
botApi.sendPhoto.mockResolvedValue({ message_id: 2, chat: { id: "123" } });
botCtorSpy.mockClear(); botCtorSpy.mockClear();
loadConfig.mockReturnValue({ loadConfig.mockReturnValue({
channels: { telegram: { accounts: { foo: { proxy: proxyUrl } } } }, channels: { telegram: { accounts: { foo: { proxy: proxyUrl } } } },
}); });
makeProxyFetch.mockClear(); makeProxyFetch.mockClear();
resolveTelegramFetch.mockClear(); resolveTelegramTransport.mockClear();
loadWebMedia.mockReset();
}); });
it("reuses cached Telegram client options for repeated sends with same account transport settings", async () => { it("reuses cached Telegram client options for repeated sends with same account transport settings", async () => {
const { fetchImpl } = prepareProxyFetch(); const { transport } = prepareProxyTransport();
vi.stubEnv("VITEST", ""); vi.stubEnv("VITEST", "");
vi.stubEnv("NODE_ENV", "production"); vi.stubEnv("NODE_ENV", "production");
@ -103,20 +125,20 @@ describe("telegram proxy client", () => {
await sendMessageTelegram("123", "second", { token: "tok", accountId: "foo" }); await sendMessageTelegram("123", "second", { token: "tok", accountId: "foo" });
expect(makeProxyFetch).toHaveBeenCalledTimes(1); expect(makeProxyFetch).toHaveBeenCalledTimes(1);
expect(resolveTelegramFetch).toHaveBeenCalledTimes(1); expect(resolveTelegramTransport).toHaveBeenCalledTimes(1);
expect(botCtorSpy).toHaveBeenCalledTimes(2); expect(botCtorSpy).toHaveBeenCalledTimes(2);
expect(botCtorSpy).toHaveBeenNthCalledWith( expect(botCtorSpy).toHaveBeenNthCalledWith(
1, 1,
"tok", "tok",
expect.objectContaining({ expect.objectContaining({
client: expect.objectContaining({ fetch: fetchImpl }), client: expect.objectContaining({ fetch: transport.fetch }),
}), }),
); );
expect(botCtorSpy).toHaveBeenNthCalledWith( expect(botCtorSpy).toHaveBeenNthCalledWith(
2, 2,
"tok", "tok",
expect.objectContaining({ expect.objectContaining({
client: expect.objectContaining({ fetch: fetchImpl }), client: expect.objectContaining({ fetch: transport.fetch }),
}), }),
); );
}); });
@ -135,10 +157,36 @@ describe("telegram proxy client", () => {
run: () => deleteMessageTelegram("123", "456", { token: "tok", accountId: "foo" }), run: () => deleteMessageTelegram("123", "456", { token: "tok", accountId: "foo" }),
}, },
])("uses proxy fetch for $name", async (testCase) => { ])("uses proxy fetch for $name", async (testCase) => {
const { fetchImpl } = prepareProxyFetch(); const { transport } = prepareProxyTransport();
await testCase.run(); await testCase.run();
expectProxyClient(fetchImpl); expectProxyClient(transport);
});
it("uses proxy-aware transport for outbound media prefetch", async () => {
const { transport } = prepareProxyTransport();
loadWebMedia.mockResolvedValueOnce({
buffer: Buffer.from("image"),
contentType: "image/jpeg",
fileName: "photo.jpg",
});
await sendMessageTelegram("123", "caption", {
token: "tok",
accountId: "foo",
mediaUrl: "https://example.com/photo.jpg",
});
expectProxyClient(transport);
expect(loadWebMedia).toHaveBeenCalledWith(
"https://example.com/photo.jpg",
expect.objectContaining({
fetchImpl: transport.sourceFetch,
dispatcherPolicy: transport.pinnedDispatcherPolicy,
fallbackDispatcherPolicy: transport.fallbackPinnedDispatcherPolicy,
shouldRetryFetchError: expect.any(Function),
}),
);
}); });
}); });

View File

@ -25,7 +25,11 @@ import { withTelegramApiErrorLogging } from "./api-logging.js";
import { buildTelegramThreadParams, buildTypingThreadParams } from "./bot/helpers.js"; import { buildTelegramThreadParams, buildTypingThreadParams } from "./bot/helpers.js";
import type { TelegramInlineButtons } from "./button-types.js"; import type { TelegramInlineButtons } from "./button-types.js";
import { splitTelegramCaption } from "./caption.js"; import { splitTelegramCaption } from "./caption.js";
import { resolveTelegramFetch } from "./fetch.js"; import {
resolveTelegramTransport,
shouldRetryTelegramIpv4Fallback,
type TelegramTransport,
} from "./fetch.js";
import { renderTelegramHtmlText, splitTelegramHtmlChunks } from "./format.js"; import { renderTelegramHtmlText, splitTelegramHtmlChunks } from "./format.js";
import { import {
isRecoverableTelegramNetworkError, isRecoverableTelegramNetworkError,
@ -159,10 +163,12 @@ const CHAT_NOT_FOUND_RE = /400: Bad Request: chat not found/i;
const sendLogger = createSubsystemLogger("telegram/send"); const sendLogger = createSubsystemLogger("telegram/send");
const diagLogger = createSubsystemLogger("telegram/diagnostic"); const diagLogger = createSubsystemLogger("telegram/diagnostic");
const telegramClientOptionsCache = new Map<string, ApiClientOptions | undefined>(); const telegramClientOptionsCache = new Map<string, ApiClientOptions | undefined>();
const telegramTransportCache = new Map<string, TelegramTransport>();
const MAX_TELEGRAM_CLIENT_OPTIONS_CACHE_SIZE = 64; const MAX_TELEGRAM_CLIENT_OPTIONS_CACHE_SIZE = 64;
export function resetTelegramClientOptionsCacheForTests(): void { export function resetTelegramClientOptionsCacheForTests(): void {
telegramClientOptionsCache.clear(); telegramClientOptionsCache.clear();
telegramTransportCache.clear();
} }
function createTelegramHttpLogger(cfg: ReturnType<typeof loadConfig>) { function createTelegramHttpLogger(cfg: ReturnType<typeof loadConfig>) {
@ -183,18 +189,22 @@ function shouldUseTelegramClientOptionsCache(): boolean {
return !process.env.VITEST && process.env.NODE_ENV !== "test"; return !process.env.VITEST && process.env.NODE_ENV !== "test";
} }
function buildTelegramTransportCacheKey(account: ResolvedTelegramAccount): string {
const proxyKey = account.config.proxy?.trim() ?? "";
const autoSelectFamily = account.config.network?.autoSelectFamily;
const autoSelectFamilyKey =
typeof autoSelectFamily === "boolean" ? String(autoSelectFamily) : "default";
const dnsResultOrderKey = account.config.network?.dnsResultOrder ?? "default";
return `${account.accountId}::${proxyKey}::${autoSelectFamilyKey}::${dnsResultOrderKey}`;
}
function buildTelegramClientOptionsCacheKey(params: { function buildTelegramClientOptionsCacheKey(params: {
account: ResolvedTelegramAccount; account: ResolvedTelegramAccount;
timeoutSeconds?: number; timeoutSeconds?: number;
}): string { }): string {
const proxyKey = params.account.config.proxy?.trim() ?? "";
const autoSelectFamily = params.account.config.network?.autoSelectFamily;
const autoSelectFamilyKey =
typeof autoSelectFamily === "boolean" ? String(autoSelectFamily) : "default";
const dnsResultOrderKey = params.account.config.network?.dnsResultOrder ?? "default";
const timeoutSecondsKey = const timeoutSecondsKey =
typeof params.timeoutSeconds === "number" ? String(params.timeoutSeconds) : "default"; typeof params.timeoutSeconds === "number" ? String(params.timeoutSeconds) : "default";
return `${params.account.accountId}::${proxyKey}::${autoSelectFamilyKey}::${dnsResultOrderKey}::${timeoutSecondsKey}`; return `${buildTelegramTransportCacheKey(params.account)}::${timeoutSecondsKey}`;
} }
function setCachedTelegramClientOptions( function setCachedTelegramClientOptions(
@ -211,8 +221,44 @@ function setCachedTelegramClientOptions(
return clientOptions; return clientOptions;
} }
function setCachedTelegramTransport(
cacheKey: string,
transport: TelegramTransport,
): TelegramTransport {
telegramTransportCache.set(cacheKey, transport);
if (telegramTransportCache.size > MAX_TELEGRAM_CLIENT_OPTIONS_CACHE_SIZE) {
const oldestKey = telegramTransportCache.keys().next().value;
if (oldestKey !== undefined) {
telegramTransportCache.delete(oldestKey);
}
}
return transport;
}
function resolveTelegramTransportForAccount(account: ResolvedTelegramAccount): TelegramTransport {
const cacheEnabled = shouldUseTelegramClientOptionsCache();
const cacheKey = cacheEnabled ? buildTelegramTransportCacheKey(account) : null;
if (cacheKey && telegramTransportCache.has(cacheKey)) {
const cached = telegramTransportCache.get(cacheKey);
if (cached) {
return cached;
}
}
const proxyUrl = account.config.proxy?.trim();
const proxyFetch = proxyUrl ? makeProxyFetch(proxyUrl) : undefined;
const transport = resolveTelegramTransport(proxyFetch, {
network: account.config.network,
});
if (cacheKey) {
return setCachedTelegramTransport(cacheKey, transport);
}
return transport;
}
function resolveTelegramClientOptions( function resolveTelegramClientOptions(
account: ResolvedTelegramAccount, account: ResolvedTelegramAccount,
transport: TelegramTransport = resolveTelegramTransportForAccount(account),
): ApiClientOptions | undefined { ): ApiClientOptions | undefined {
const timeoutSeconds = const timeoutSeconds =
typeof account.config.timeoutSeconds === "number" && typeof account.config.timeoutSeconds === "number" &&
@ -231,15 +277,12 @@ function resolveTelegramClientOptions(
return telegramClientOptionsCache.get(cacheKey); return telegramClientOptionsCache.get(cacheKey);
} }
const proxyUrl = account.config.proxy?.trim();
const proxyFetch = proxyUrl ? makeProxyFetch(proxyUrl) : undefined;
const fetchImpl = resolveTelegramFetch(proxyFetch, {
network: account.config.network,
});
const clientOptions = const clientOptions =
fetchImpl || timeoutSeconds transport.fetch || timeoutSeconds
? { ? {
...(fetchImpl ? { fetch: fetchImpl as unknown as ApiClientOptions["fetch"] } : {}), ...(transport.fetch
? { fetch: transport.fetch as unknown as ApiClientOptions["fetch"] }
: {}),
...(timeoutSeconds ? { timeoutSeconds } : {}), ...(timeoutSeconds ? { timeoutSeconds } : {}),
} }
: undefined; : undefined;
@ -426,6 +469,7 @@ type TelegramApiContext = {
cfg: ReturnType<typeof loadConfig>; cfg: ReturnType<typeof loadConfig>;
account: ResolvedTelegramAccount; account: ResolvedTelegramAccount;
api: TelegramApi; api: TelegramApi;
telegramTransport: TelegramTransport;
}; };
function resolveTelegramApiContext(opts: { function resolveTelegramApiContext(opts: {
@ -440,9 +484,10 @@ function resolveTelegramApiContext(opts: {
accountId: opts.accountId, accountId: opts.accountId,
}); });
const token = resolveToken(opts.token, account); const token = resolveToken(opts.token, account);
const client = resolveTelegramClientOptions(account); const telegramTransport = resolveTelegramTransportForAccount(account);
const client = resolveTelegramClientOptions(account, telegramTransport);
const api = (opts.api ?? new Bot(token, client ? { client } : undefined).api) as TelegramApi; const api = (opts.api ?? new Bot(token, client ? { client } : undefined).api) as TelegramApi;
return { cfg, account, api }; return { cfg, account, api, telegramTransport };
} }
type TelegramRequestWithDiag = <T>( type TelegramRequestWithDiag = <T>(
@ -592,7 +637,7 @@ export async function sendMessageTelegram(
text: string, text: string,
opts: TelegramSendOpts = {}, opts: TelegramSendOpts = {},
): Promise<TelegramSendResult> { ): Promise<TelegramSendResult> {
const { cfg, account, api } = resolveTelegramApiContext(opts); const { cfg, account, api, telegramTransport } = resolveTelegramApiContext(opts);
const target = parseTelegramTarget(to); const target = parseTelegramTarget(to);
const chatId = await resolveAndPersistChatId({ const chatId = await resolveAndPersistChatId({
cfg, cfg,
@ -766,6 +811,10 @@ export async function sendMessageTelegram(
maxBytes: mediaMaxBytes, maxBytes: mediaMaxBytes,
mediaLocalRoots: opts.mediaLocalRoots, mediaLocalRoots: opts.mediaLocalRoots,
optimizeImages: opts.forceDocument ? false : undefined, optimizeImages: opts.forceDocument ? false : undefined,
fetchImpl: telegramTransport.sourceFetch,
dispatcherPolicy: telegramTransport.pinnedDispatcherPolicy,
fallbackDispatcherPolicy: telegramTransport.fallbackPinnedDispatcherPolicy,
shouldRetryFetchError: shouldRetryTelegramIpv4Fallback,
}), }),
); );
const kind = kindFromMime(media.contentType ?? undefined); const kind = kindFromMime(media.contentType ?? undefined);

View File

@ -3,9 +3,9 @@ import path from "node:path";
import { fileURLToPath } from "node:url"; import { fileURLToPath } from "node:url";
import { logVerbose, shouldLogVerbose } from "../../../src/globals.js"; import { logVerbose, shouldLogVerbose } from "../../../src/globals.js";
import { SafeOpenError, readLocalFileSafely } from "../../../src/infra/fs-safe.js"; import { SafeOpenError, readLocalFileSafely } from "../../../src/infra/fs-safe.js";
import type { SsrFPolicy } from "../../../src/infra/net/ssrf.js"; import type { PinnedDispatcherPolicy, SsrFPolicy } from "../../../src/infra/net/ssrf.js";
import { type MediaKind, maxBytesForKind } from "../../../src/media/constants.js"; import { type MediaKind, maxBytesForKind } from "../../../src/media/constants.js";
import { fetchRemoteMedia } from "../../../src/media/fetch.js"; import { fetchRemoteMedia, type FetchLike } from "../../../src/media/fetch.js";
import { import {
convertHeicToJpeg, convertHeicToJpeg,
hasAlphaChannel, hasAlphaChannel,
@ -27,6 +27,10 @@ type WebMediaOptions = {
maxBytes?: number; maxBytes?: number;
optimizeImages?: boolean; optimizeImages?: boolean;
ssrfPolicy?: SsrFPolicy; ssrfPolicy?: SsrFPolicy;
fetchImpl?: FetchLike;
dispatcherPolicy?: PinnedDispatcherPolicy;
fallbackDispatcherPolicy?: PinnedDispatcherPolicy;
shouldRetryFetchError?: (error: unknown) => boolean;
/** Allowed root directories for local path reads. "any" is deprecated; prefer sandboxValidated + readFile. */ /** Allowed root directories for local path reads. "any" is deprecated; prefer sandboxValidated + readFile. */
localRoots?: readonly string[] | "any"; localRoots?: readonly string[] | "any";
/** Caller already validated the local path (sandbox/other guards); requires readFile override. */ /** Caller already validated the local path (sandbox/other guards); requires readFile override. */
@ -36,7 +40,14 @@ type WebMediaOptions = {
function resolveWebMediaOptions(params: { function resolveWebMediaOptions(params: {
maxBytesOrOptions?: number | WebMediaOptions; maxBytesOrOptions?: number | WebMediaOptions;
options?: { ssrfPolicy?: SsrFPolicy; localRoots?: readonly string[] | "any" }; options?: {
ssrfPolicy?: SsrFPolicy;
fetchImpl?: FetchLike;
dispatcherPolicy?: PinnedDispatcherPolicy;
fallbackDispatcherPolicy?: PinnedDispatcherPolicy;
shouldRetryFetchError?: (error: unknown) => boolean;
localRoots?: readonly string[] | "any";
};
optimizeImages: boolean; optimizeImages: boolean;
}): WebMediaOptions { }): WebMediaOptions {
if (typeof params.maxBytesOrOptions === "number" || params.maxBytesOrOptions === undefined) { if (typeof params.maxBytesOrOptions === "number" || params.maxBytesOrOptions === undefined) {
@ -44,6 +55,10 @@ function resolveWebMediaOptions(params: {
maxBytes: params.maxBytesOrOptions, maxBytes: params.maxBytesOrOptions,
optimizeImages: params.optimizeImages, optimizeImages: params.optimizeImages,
ssrfPolicy: params.options?.ssrfPolicy, ssrfPolicy: params.options?.ssrfPolicy,
fetchImpl: params.options?.fetchImpl,
dispatcherPolicy: params.options?.dispatcherPolicy,
fallbackDispatcherPolicy: params.options?.fallbackDispatcherPolicy,
shouldRetryFetchError: params.options?.shouldRetryFetchError,
localRoots: params.options?.localRoots, localRoots: params.options?.localRoots,
}; };
} }
@ -238,6 +253,10 @@ async function loadWebMediaInternal(
maxBytes, maxBytes,
optimizeImages = true, optimizeImages = true,
ssrfPolicy, ssrfPolicy,
fetchImpl,
dispatcherPolicy,
fallbackDispatcherPolicy,
shouldRetryFetchError,
localRoots, localRoots,
sandboxValidated = false, sandboxValidated = false,
readFile: readFileOverride, readFile: readFileOverride,
@ -331,7 +350,15 @@ async function loadWebMediaInternal(
: optimizeImages : optimizeImages
? Math.max(maxBytes, defaultFetchCap) ? Math.max(maxBytes, defaultFetchCap)
: maxBytes; : maxBytes;
const fetched = await fetchRemoteMedia({ url: mediaUrl, maxBytes: fetchCap, ssrfPolicy }); const fetched = await fetchRemoteMedia({
url: mediaUrl,
fetchImpl,
maxBytes: fetchCap,
ssrfPolicy,
dispatcherPolicy,
fallbackDispatcherPolicy,
shouldRetryFetchError,
});
const { buffer, contentType, fileName } = fetched; const { buffer, contentType, fileName } = fetched;
const kind = kindFromMime(contentType); const kind = kindFromMime(contentType);
return await clampAndFinalize({ buffer, contentType, kind, fileName }); return await clampAndFinalize({ buffer, contentType, kind, fileName });
@ -404,7 +431,14 @@ async function loadWebMediaInternal(
export async function loadWebMedia( export async function loadWebMedia(
mediaUrl: string, mediaUrl: string,
maxBytesOrOptions?: number | WebMediaOptions, maxBytesOrOptions?: number | WebMediaOptions,
options?: { ssrfPolicy?: SsrFPolicy; localRoots?: readonly string[] | "any" }, options?: {
ssrfPolicy?: SsrFPolicy;
fetchImpl?: FetchLike;
dispatcherPolicy?: PinnedDispatcherPolicy;
fallbackDispatcherPolicy?: PinnedDispatcherPolicy;
shouldRetryFetchError?: (error: unknown) => boolean;
localRoots?: readonly string[] | "any";
},
): Promise<WebMediaResult> { ): Promise<WebMediaResult> {
return await loadWebMediaInternal( return await loadWebMediaInternal(
mediaUrl, mediaUrl,
@ -415,7 +449,14 @@ export async function loadWebMedia(
export async function loadWebMediaRaw( export async function loadWebMediaRaw(
mediaUrl: string, mediaUrl: string,
maxBytesOrOptions?: number | WebMediaOptions, maxBytesOrOptions?: number | WebMediaOptions,
options?: { ssrfPolicy?: SsrFPolicy; localRoots?: readonly string[] | "any" }, options?: {
ssrfPolicy?: SsrFPolicy;
fetchImpl?: FetchLike;
dispatcherPolicy?: PinnedDispatcherPolicy;
fallbackDispatcherPolicy?: PinnedDispatcherPolicy;
shouldRetryFetchError?: (error: unknown) => boolean;
localRoots?: readonly string[] | "any";
},
): Promise<WebMediaResult> { ): Promise<WebMediaResult> {
return await loadWebMediaInternal( return await loadWebMediaInternal(
mediaUrl, mediaUrl,

View File

@ -252,7 +252,7 @@ describe("web monitor inbox", () => {
}); });
}); });
it("handles append messages by marking them read but skipping auto-reply", async () => { it("handles stale append messages by marking them read but skipping auto-reply", async () => {
const { onMessage, listener, sock } = await openInboxMonitor(); const { onMessage, listener, sock } = await openInboxMonitor();
const upsert = { const upsert = {
@ -265,7 +265,7 @@ describe("web monitor inbox", () => {
remoteJid: "999@s.whatsapp.net", remoteJid: "999@s.whatsapp.net",
}, },
message: { conversation: "old message" }, message: { conversation: "old message" },
messageTimestamp: nowSeconds(), messageTimestamp: nowSeconds(-300_000),
pushName: "History Sender", pushName: "History Sender",
}, },
], ],
@ -284,7 +284,7 @@ describe("web monitor inbox", () => {
}, },
]); ]);
// Verify it WAS NOT passed to onMessage // Verify stale history sync messages are not passed to onMessage
expect(onMessage).not.toHaveBeenCalled(); expect(onMessage).not.toHaveBeenCalled();
await listener.close(); await listener.close();

View File

@ -22,4 +22,25 @@ describe("media load options", () => {
localRoots: ["/tmp/workspace"], localRoots: ["/tmp/workspace"],
}); });
}); });
it("preserves custom fetch transport options", () => {
const fetchImpl = (() => Promise.resolve(new Response())) as typeof fetch;
const dispatcherPolicy = { mode: "direct" } as const;
const fallbackDispatcherPolicy = { mode: "env-proxy" } as const;
const shouldRetryFetchError = () => true;
expect(
buildOutboundMediaLoadOptions({
fetchImpl,
dispatcherPolicy,
fallbackDispatcherPolicy,
shouldRetryFetchError,
}),
).toEqual({
fetchImpl,
dispatcherPolicy,
fallbackDispatcherPolicy,
shouldRetryFetchError,
});
});
}); });

View File

@ -1,14 +1,24 @@
import type { PinnedDispatcherPolicy } from "../infra/net/ssrf.js";
import type { FetchLike } from "./fetch.js";
export type OutboundMediaFetchOptions = {
fetchImpl?: FetchLike;
dispatcherPolicy?: PinnedDispatcherPolicy;
fallbackDispatcherPolicy?: PinnedDispatcherPolicy;
shouldRetryFetchError?: (error: unknown) => boolean;
};
export type OutboundMediaLoadParams = { export type OutboundMediaLoadParams = {
maxBytes?: number; maxBytes?: number;
mediaLocalRoots?: readonly string[]; mediaLocalRoots?: readonly string[];
optimizeImages?: boolean; optimizeImages?: boolean;
}; } & OutboundMediaFetchOptions;
export type OutboundMediaLoadOptions = { export type OutboundMediaLoadOptions = {
maxBytes?: number; maxBytes?: number;
localRoots?: readonly string[]; localRoots?: readonly string[];
optimizeImages?: boolean; optimizeImages?: boolean;
}; } & OutboundMediaFetchOptions;
export function resolveOutboundMediaLocalRoots( export function resolveOutboundMediaLocalRoots(
mediaLocalRoots?: readonly string[], mediaLocalRoots?: readonly string[],
@ -24,5 +34,13 @@ export function buildOutboundMediaLoadOptions(
...(params.maxBytes !== undefined ? { maxBytes: params.maxBytes } : {}), ...(params.maxBytes !== undefined ? { maxBytes: params.maxBytes } : {}),
...(localRoots ? { localRoots } : {}), ...(localRoots ? { localRoots } : {}),
...(params.optimizeImages !== undefined ? { optimizeImages: params.optimizeImages } : {}), ...(params.optimizeImages !== undefined ? { optimizeImages: params.optimizeImages } : {}),
...(params.fetchImpl ? { fetchImpl: params.fetchImpl } : {}),
...(params.dispatcherPolicy ? { dispatcherPolicy: params.dispatcherPolicy } : {}),
...(params.fallbackDispatcherPolicy
? { fallbackDispatcherPolicy: params.fallbackDispatcherPolicy }
: {}),
...(params.shouldRetryFetchError
? { shouldRetryFetchError: params.shouldRetryFetchError }
: {}),
}; };
} }