diff --git a/extensions/telegram/src/bot-message-dispatch.ts b/extensions/telegram/src/bot-message-dispatch.ts index a9c0e625508..26a2925470f 100644 --- a/extensions/telegram/src/bot-message-dispatch.ts +++ b/extensions/telegram/src/bot-message-dispatch.ts @@ -35,6 +35,7 @@ import type { TelegramStreamMode } from "./bot/types.js"; import type { TelegramInlineButtons } from "./button-types.js"; import { createTelegramDraftStream } from "./draft-stream.js"; import { shouldSuppressLocalTelegramExecApprovalPrompt } from "./exec-approvals.js"; +import type { TelegramTransport } from "./fetch.js"; import { renderTelegramHtmlText } from "./format.js"; import { type ArchivedPreview, @@ -111,6 +112,7 @@ type DispatchTelegramMessageParams = { textLimit: number; telegramCfg: TelegramAccountConfig; opts: Pick; + telegramTransport?: TelegramTransport; }; type TelegramReasoningLevel = "off" | "on" | "stream"; @@ -148,6 +150,7 @@ export const dispatchTelegramMessage = async ({ textLimit, telegramCfg, opts, + telegramTransport, }: DispatchTelegramMessageParams) => { const { ctxPayload, @@ -457,6 +460,7 @@ export const dispatchTelegramMessage = async ({ runtime, bot, mediaLocalRoots, + telegramTransport, replyToMode, textLimit, thread: threadSpec, diff --git a/extensions/telegram/src/bot-message.ts b/extensions/telegram/src/bot-message.ts index 0a5d44c65db..fc40b91009a 100644 --- a/extensions/telegram/src/bot-message.ts +++ b/extensions/telegram/src/bot-message.ts @@ -10,6 +10,7 @@ import { import { dispatchTelegramMessage } from "./bot-message-dispatch.js"; import type { TelegramBotOptions } from "./bot.js"; import type { TelegramContext, TelegramStreamMode } from "./bot/types.js"; +import type { TelegramTransport } from "./fetch.js"; /** Dependencies injected once when creating the message processor. */ type TelegramMessageProcessorDeps = Omit< @@ -22,6 +23,7 @@ type TelegramMessageProcessorDeps = Omit< streamMode: TelegramStreamMode; textLimit: number; opts: Pick; + telegramTransport?: TelegramTransport; }; export const createTelegramMessageProcessor = (deps: TelegramMessageProcessorDeps) => { @@ -46,6 +48,7 @@ export const createTelegramMessageProcessor = (deps: TelegramMessageProcessorDep streamMode, textLimit, opts, + telegramTransport, } = deps; return async ( @@ -90,6 +93,7 @@ export const createTelegramMessageProcessor = (deps: TelegramMessageProcessorDep textLimit, telegramCfg, opts, + telegramTransport, }); } catch (err) { runtime.error?.(danger(`telegram message processing failed: ${String(err)}`)); diff --git a/extensions/telegram/src/bot-native-commands.ts b/extensions/telegram/src/bot-native-commands.ts index 7dd91f6ad63..c620e428bd6 100644 --- a/extensions/telegram/src/bot-native-commands.ts +++ b/extensions/telegram/src/bot-native-commands.ts @@ -140,6 +140,7 @@ type RegisterTelegramNativeCommandsParams = { ) => { groupConfig?: TelegramGroupConfig; topicConfig?: TelegramTopicConfig }; shouldSkipUpdate: (ctx: TelegramUpdateKeyContext) => boolean; opts: { token: string }; + telegramTransport?: TelegramTransport; }; async function resolveTelegramCommandAuth(params: { @@ -362,6 +363,7 @@ export const registerTelegramNativeCommands = ({ resolveTelegramGroupConfig, shouldSkipUpdate, opts, + telegramTransport, }: RegisterTelegramNativeCommandsParams) => { const boundRoute = nativeEnabled && nativeSkillsEnabled @@ -536,6 +538,7 @@ export const registerTelegramNativeCommands = ({ runtime, bot, mediaLocalRoots: params.mediaLocalRoots, + telegramTransport, replyToMode, textLimit, thread: params.threadSpec, diff --git a/extensions/telegram/src/bot.ts b/extensions/telegram/src/bot.ts index a817e10cbac..35ba22cfd25 100644 --- a/extensions/telegram/src/bot.ts +++ b/extensions/telegram/src/bot.ts @@ -471,6 +471,7 @@ export function createTelegramBot(opts: TelegramBotOptions) { streamMode, textLimit, opts, + telegramTransport, }); registerTelegramNativeCommands({ @@ -491,6 +492,7 @@ export function createTelegramBot(opts: TelegramBotOptions) { resolveTelegramGroupConfig, shouldSkipUpdate, opts, + telegramTransport, }); registerTelegramHandlers({ diff --git a/extensions/telegram/src/bot/delivery.replies.ts b/extensions/telegram/src/bot/delivery.replies.ts index 84d66fec12b..a473d44e4c5 100644 --- a/extensions/telegram/src/bot/delivery.replies.ts +++ b/extensions/telegram/src/bot/delivery.replies.ts @@ -23,6 +23,7 @@ import type { RuntimeEnv } from "../../../../src/runtime.js"; import { loadWebMedia } from "../../../whatsapp/src/media.js"; import type { TelegramInlineButtons } from "../button-types.js"; import { splitTelegramCaption } from "../caption.js"; +import { shouldRetryTelegramIpv4Fallback, type TelegramTransport } from "../fetch.js"; import { markdownToTelegramChunks, markdownToTelegramHtml, @@ -234,6 +235,7 @@ async function deliverMediaReply(params: { thread?: TelegramThreadSpec | null; tableMode?: MarkdownTableMode; mediaLocalRoots?: readonly string[]; + telegramTransport?: TelegramTransport; chunkText: ChunkTextFn; onVoiceRecording?: () => Promise | void; linkPreview?: boolean; @@ -250,7 +252,15 @@ async function deliverMediaReply(params: { const isFirstMedia = first; const media = await loadWebMedia( mediaUrl, - buildOutboundMediaLoadOptions({ mediaLocalRoots: params.mediaLocalRoots }), + buildOutboundMediaLoadOptions({ + mediaLocalRoots: params.mediaLocalRoots, + fetchImpl: params.telegramTransport?.sourceFetch, + dispatcherPolicy: params.telegramTransport?.pinnedDispatcherPolicy, + fallbackDispatcherPolicy: params.telegramTransport?.fallbackPinnedDispatcherPolicy, + shouldRetryFetchError: params.telegramTransport + ? shouldRetryTelegramIpv4Fallback + : undefined, + }), ); const kind = kindFromMime(media.contentType ?? undefined); const isGif = isGifMedia({ @@ -548,6 +558,7 @@ export async function deliverReplies(params: { runtime: RuntimeEnv; bot: Bot; mediaLocalRoots?: readonly string[]; + telegramTransport?: TelegramTransport; replyToMode: ReplyToMode; textLimit: number; thread?: TelegramThreadSpec | null; @@ -651,6 +662,7 @@ export async function deliverReplies(params: { thread: params.thread, tableMode: params.tableMode, mediaLocalRoots: params.mediaLocalRoots, + telegramTransport: params.telegramTransport, chunkText, onVoiceRecording: params.onVoiceRecording, linkPreview: params.linkPreview, diff --git a/extensions/telegram/src/bot/delivery.test.ts b/extensions/telegram/src/bot/delivery.test.ts index a1dce34dceb..045f41a511e 100644 --- a/extensions/telegram/src/bot/delivery.test.ts +++ b/extensions/telegram/src/bot/delivery.test.ts @@ -392,6 +392,44 @@ describe("deliverReplies", () => { }); }); + it("passes Telegram transport to remote media loading", async () => { + const runtime = createRuntime(); + const sendPhoto = vi.fn().mockResolvedValue({ + message_id: 13, + chat: { id: "123" }, + }); + const bot = createBot({ sendPhoto }); + const sourceFetch = vi.fn() as unknown as typeof fetch; + const telegramTransport = { + fetch: sourceFetch, + sourceFetch, + pinnedDispatcherPolicy: { + mode: "explicit-proxy", + proxyUrl: "http://proxy.test:8080", + } as const, + fallbackPinnedDispatcherPolicy: { mode: "direct" } as const, + }; + + mockMediaLoad("photo.jpg", "image/jpeg", "image"); + + await deliverWith({ + replies: [{ mediaUrl: "https://example.com/photo.jpg" }], + runtime, + bot, + telegramTransport, + }); + + expect(loadWebMedia).toHaveBeenCalledWith( + "https://example.com/photo.jpg", + expect.objectContaining({ + fetchImpl: sourceFetch, + dispatcherPolicy: telegramTransport.pinnedDispatcherPolicy, + fallbackDispatcherPolicy: telegramTransport.fallbackPinnedDispatcherPolicy, + shouldRetryFetchError: expect.any(Function), + }), + ); + }); + it("includes link_preview_options when linkPreview is false", async () => { const runtime = createRuntime(); const sendMessage = vi.fn().mockResolvedValue({ diff --git a/extensions/telegram/src/send.proxy.test.ts b/extensions/telegram/src/send.proxy.test.ts index 6c17b33fe38..6cb6bf2a2a5 100644 --- a/extensions/telegram/src/send.proxy.test.ts +++ b/extensions/telegram/src/send.proxy.test.ts @@ -3,6 +3,7 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; const { botApi, botCtorSpy } = vi.hoisted(() => ({ botApi: { sendMessage: vi.fn(), + sendPhoto: vi.fn(), setMessageReaction: vi.fn(), deleteMessage: vi.fn(), }, @@ -17,8 +18,13 @@ const { makeProxyFetch } = vi.hoisted(() => ({ makeProxyFetch: vi.fn(), })); -const { resolveTelegramFetch } = vi.hoisted(() => ({ - resolveTelegramFetch: vi.fn(), +const { resolveTelegramTransport, shouldRetryTelegramIpv4Fallback } = vi.hoisted(() => ({ + resolveTelegramTransport: vi.fn(), + shouldRetryTelegramIpv4Fallback: vi.fn(() => true), +})); + +const { loadWebMedia } = vi.hoisted(() => ({ + loadWebMedia: vi.fn(), })); vi.mock("../../../src/config/config.js", async (importOriginal) => { @@ -33,13 +39,18 @@ vi.mock("./proxy.js", () => ({ makeProxyFetch, })); +vi.mock("../../whatsapp/src/media.js", () => ({ + loadWebMedia, +})); + vi.mock("./fetch.js", () => ({ - resolveTelegramFetch, + resolveTelegramTransport, + shouldRetryTelegramIpv4Fallback, })); vi.mock("grammy", () => ({ Bot: class { - api = botApi; + api = { ...botApi }; catch = vi.fn(); constructor( public token: string, @@ -61,21 +72,30 @@ import { describe("telegram proxy client", () => { const proxyUrl = "http://proxy.test:8080"; - const prepareProxyFetch = () => { + const prepareProxyTransport = () => { const proxyFetch = vi.fn(); - const fetchImpl = vi.fn(); + const clientFetch = vi.fn(); + const sourceFetch = vi.fn(); + const transport = { + fetch: clientFetch as unknown as typeof fetch, + sourceFetch: sourceFetch as unknown as typeof fetch, + pinnedDispatcherPolicy: { mode: "explicit-proxy", proxyUrl } as const, + fallbackPinnedDispatcherPolicy: { mode: "direct" } as const, + }; makeProxyFetch.mockReturnValue(proxyFetch as unknown as typeof fetch); - resolveTelegramFetch.mockReturnValue(fetchImpl as unknown as typeof fetch); - return { proxyFetch, fetchImpl }; + resolveTelegramTransport.mockReturnValue(transport); + return { proxyFetch, transport }; }; - const expectProxyClient = (fetchImpl: ReturnType) => { + const expectProxyClient = (transport: ReturnType["transport"]) => { expect(makeProxyFetch).toHaveBeenCalledWith(proxyUrl); - expect(resolveTelegramFetch).toHaveBeenCalledWith(expect.any(Function), { network: undefined }); + expect(resolveTelegramTransport).toHaveBeenCalledWith(expect.any(Function), { + network: undefined, + }); expect(botCtorSpy).toHaveBeenCalledWith( "tok", expect.objectContaining({ - client: expect.objectContaining({ fetch: fetchImpl }), + client: expect.objectContaining({ fetch: transport.fetch }), }), ); }; @@ -86,16 +106,18 @@ describe("telegram proxy client", () => { botApi.sendMessage.mockResolvedValue({ message_id: 1, chat: { id: "123" } }); botApi.setMessageReaction.mockResolvedValue(undefined); botApi.deleteMessage.mockResolvedValue(true); + botApi.sendPhoto.mockResolvedValue({ message_id: 2, chat: { id: "123" } }); botCtorSpy.mockClear(); loadConfig.mockReturnValue({ channels: { telegram: { accounts: { foo: { proxy: proxyUrl } } } }, }); makeProxyFetch.mockClear(); - resolveTelegramFetch.mockClear(); + resolveTelegramTransport.mockClear(); + loadWebMedia.mockReset(); }); it("reuses cached Telegram client options for repeated sends with same account transport settings", async () => { - const { fetchImpl } = prepareProxyFetch(); + const { transport } = prepareProxyTransport(); vi.stubEnv("VITEST", ""); vi.stubEnv("NODE_ENV", "production"); @@ -103,20 +125,20 @@ describe("telegram proxy client", () => { await sendMessageTelegram("123", "second", { token: "tok", accountId: "foo" }); expect(makeProxyFetch).toHaveBeenCalledTimes(1); - expect(resolveTelegramFetch).toHaveBeenCalledTimes(1); + expect(resolveTelegramTransport).toHaveBeenCalledTimes(1); expect(botCtorSpy).toHaveBeenCalledTimes(2); expect(botCtorSpy).toHaveBeenNthCalledWith( 1, "tok", expect.objectContaining({ - client: expect.objectContaining({ fetch: fetchImpl }), + client: expect.objectContaining({ fetch: transport.fetch }), }), ); expect(botCtorSpy).toHaveBeenNthCalledWith( 2, "tok", expect.objectContaining({ - client: expect.objectContaining({ fetch: fetchImpl }), + client: expect.objectContaining({ fetch: transport.fetch }), }), ); }); @@ -135,10 +157,36 @@ describe("telegram proxy client", () => { run: () => deleteMessageTelegram("123", "456", { token: "tok", accountId: "foo" }), }, ])("uses proxy fetch for $name", async (testCase) => { - const { fetchImpl } = prepareProxyFetch(); + const { transport } = prepareProxyTransport(); await testCase.run(); - expectProxyClient(fetchImpl); + expectProxyClient(transport); + }); + + it("uses proxy-aware transport for outbound media prefetch", async () => { + const { transport } = prepareProxyTransport(); + loadWebMedia.mockResolvedValueOnce({ + buffer: Buffer.from("image"), + contentType: "image/jpeg", + fileName: "photo.jpg", + }); + + await sendMessageTelegram("123", "caption", { + token: "tok", + accountId: "foo", + mediaUrl: "https://example.com/photo.jpg", + }); + + expectProxyClient(transport); + expect(loadWebMedia).toHaveBeenCalledWith( + "https://example.com/photo.jpg", + expect.objectContaining({ + fetchImpl: transport.sourceFetch, + dispatcherPolicy: transport.pinnedDispatcherPolicy, + fallbackDispatcherPolicy: transport.fallbackPinnedDispatcherPolicy, + shouldRetryFetchError: expect.any(Function), + }), + ); }); }); diff --git a/extensions/telegram/src/send.ts b/extensions/telegram/src/send.ts index e7d2c48e9fc..4509be33a6b 100644 --- a/extensions/telegram/src/send.ts +++ b/extensions/telegram/src/send.ts @@ -25,7 +25,11 @@ import { withTelegramApiErrorLogging } from "./api-logging.js"; import { buildTelegramThreadParams, buildTypingThreadParams } from "./bot/helpers.js"; import type { TelegramInlineButtons } from "./button-types.js"; import { splitTelegramCaption } from "./caption.js"; -import { resolveTelegramFetch } from "./fetch.js"; +import { + resolveTelegramTransport, + shouldRetryTelegramIpv4Fallback, + type TelegramTransport, +} from "./fetch.js"; import { renderTelegramHtmlText, splitTelegramHtmlChunks } from "./format.js"; import { isRecoverableTelegramNetworkError, @@ -159,10 +163,12 @@ const CHAT_NOT_FOUND_RE = /400: Bad Request: chat not found/i; const sendLogger = createSubsystemLogger("telegram/send"); const diagLogger = createSubsystemLogger("telegram/diagnostic"); const telegramClientOptionsCache = new Map(); +const telegramTransportCache = new Map(); const MAX_TELEGRAM_CLIENT_OPTIONS_CACHE_SIZE = 64; export function resetTelegramClientOptionsCacheForTests(): void { telegramClientOptionsCache.clear(); + telegramTransportCache.clear(); } function createTelegramHttpLogger(cfg: ReturnType) { @@ -183,18 +189,22 @@ function shouldUseTelegramClientOptionsCache(): boolean { return !process.env.VITEST && process.env.NODE_ENV !== "test"; } +function buildTelegramTransportCacheKey(account: ResolvedTelegramAccount): string { + const proxyKey = account.config.proxy?.trim() ?? ""; + const autoSelectFamily = account.config.network?.autoSelectFamily; + const autoSelectFamilyKey = + typeof autoSelectFamily === "boolean" ? String(autoSelectFamily) : "default"; + const dnsResultOrderKey = account.config.network?.dnsResultOrder ?? "default"; + return `${account.accountId}::${proxyKey}::${autoSelectFamilyKey}::${dnsResultOrderKey}`; +} + function buildTelegramClientOptionsCacheKey(params: { account: ResolvedTelegramAccount; timeoutSeconds?: number; }): string { - const proxyKey = params.account.config.proxy?.trim() ?? ""; - const autoSelectFamily = params.account.config.network?.autoSelectFamily; - const autoSelectFamilyKey = - typeof autoSelectFamily === "boolean" ? String(autoSelectFamily) : "default"; - const dnsResultOrderKey = params.account.config.network?.dnsResultOrder ?? "default"; const timeoutSecondsKey = typeof params.timeoutSeconds === "number" ? String(params.timeoutSeconds) : "default"; - return `${params.account.accountId}::${proxyKey}::${autoSelectFamilyKey}::${dnsResultOrderKey}::${timeoutSecondsKey}`; + return `${buildTelegramTransportCacheKey(params.account)}::${timeoutSecondsKey}`; } function setCachedTelegramClientOptions( @@ -211,8 +221,44 @@ function setCachedTelegramClientOptions( return clientOptions; } +function setCachedTelegramTransport( + cacheKey: string, + transport: TelegramTransport, +): TelegramTransport { + telegramTransportCache.set(cacheKey, transport); + if (telegramTransportCache.size > MAX_TELEGRAM_CLIENT_OPTIONS_CACHE_SIZE) { + const oldestKey = telegramTransportCache.keys().next().value; + if (oldestKey !== undefined) { + telegramTransportCache.delete(oldestKey); + } + } + return transport; +} + +function resolveTelegramTransportForAccount(account: ResolvedTelegramAccount): TelegramTransport { + const cacheEnabled = shouldUseTelegramClientOptionsCache(); + const cacheKey = cacheEnabled ? buildTelegramTransportCacheKey(account) : null; + if (cacheKey && telegramTransportCache.has(cacheKey)) { + const cached = telegramTransportCache.get(cacheKey); + if (cached) { + return cached; + } + } + + const proxyUrl = account.config.proxy?.trim(); + const proxyFetch = proxyUrl ? makeProxyFetch(proxyUrl) : undefined; + const transport = resolveTelegramTransport(proxyFetch, { + network: account.config.network, + }); + if (cacheKey) { + return setCachedTelegramTransport(cacheKey, transport); + } + return transport; +} + function resolveTelegramClientOptions( account: ResolvedTelegramAccount, + transport: TelegramTransport = resolveTelegramTransportForAccount(account), ): ApiClientOptions | undefined { const timeoutSeconds = typeof account.config.timeoutSeconds === "number" && @@ -231,15 +277,12 @@ function resolveTelegramClientOptions( return telegramClientOptionsCache.get(cacheKey); } - const proxyUrl = account.config.proxy?.trim(); - const proxyFetch = proxyUrl ? makeProxyFetch(proxyUrl) : undefined; - const fetchImpl = resolveTelegramFetch(proxyFetch, { - network: account.config.network, - }); const clientOptions = - fetchImpl || timeoutSeconds + transport.fetch || timeoutSeconds ? { - ...(fetchImpl ? { fetch: fetchImpl as unknown as ApiClientOptions["fetch"] } : {}), + ...(transport.fetch + ? { fetch: transport.fetch as unknown as ApiClientOptions["fetch"] } + : {}), ...(timeoutSeconds ? { timeoutSeconds } : {}), } : undefined; @@ -426,6 +469,7 @@ type TelegramApiContext = { cfg: ReturnType; account: ResolvedTelegramAccount; api: TelegramApi; + telegramTransport: TelegramTransport; }; function resolveTelegramApiContext(opts: { @@ -440,9 +484,10 @@ function resolveTelegramApiContext(opts: { accountId: opts.accountId, }); const token = resolveToken(opts.token, account); - const client = resolveTelegramClientOptions(account); + const telegramTransport = resolveTelegramTransportForAccount(account); + const client = resolveTelegramClientOptions(account, telegramTransport); const api = (opts.api ?? new Bot(token, client ? { client } : undefined).api) as TelegramApi; - return { cfg, account, api }; + return { cfg, account, api, telegramTransport }; } type TelegramRequestWithDiag = ( @@ -592,7 +637,7 @@ export async function sendMessageTelegram( text: string, opts: TelegramSendOpts = {}, ): Promise { - const { cfg, account, api } = resolveTelegramApiContext(opts); + const { cfg, account, api, telegramTransport } = resolveTelegramApiContext(opts); const target = parseTelegramTarget(to); const chatId = await resolveAndPersistChatId({ cfg, @@ -766,6 +811,10 @@ export async function sendMessageTelegram( maxBytes: mediaMaxBytes, mediaLocalRoots: opts.mediaLocalRoots, optimizeImages: opts.forceDocument ? false : undefined, + fetchImpl: telegramTransport.sourceFetch, + dispatcherPolicy: telegramTransport.pinnedDispatcherPolicy, + fallbackDispatcherPolicy: telegramTransport.fallbackPinnedDispatcherPolicy, + shouldRetryFetchError: shouldRetryTelegramIpv4Fallback, }), ); const kind = kindFromMime(media.contentType ?? undefined); diff --git a/extensions/whatsapp/src/media.ts b/extensions/whatsapp/src/media.ts index 2b297ef8907..23d59df82fd 100644 --- a/extensions/whatsapp/src/media.ts +++ b/extensions/whatsapp/src/media.ts @@ -3,9 +3,9 @@ import path from "node:path"; import { fileURLToPath } from "node:url"; import { logVerbose, shouldLogVerbose } from "../../../src/globals.js"; import { SafeOpenError, readLocalFileSafely } from "../../../src/infra/fs-safe.js"; -import type { SsrFPolicy } from "../../../src/infra/net/ssrf.js"; +import type { PinnedDispatcherPolicy, SsrFPolicy } from "../../../src/infra/net/ssrf.js"; import { type MediaKind, maxBytesForKind } from "../../../src/media/constants.js"; -import { fetchRemoteMedia } from "../../../src/media/fetch.js"; +import { fetchRemoteMedia, type FetchLike } from "../../../src/media/fetch.js"; import { convertHeicToJpeg, hasAlphaChannel, @@ -27,6 +27,10 @@ type WebMediaOptions = { maxBytes?: number; optimizeImages?: boolean; ssrfPolicy?: SsrFPolicy; + fetchImpl?: FetchLike; + dispatcherPolicy?: PinnedDispatcherPolicy; + fallbackDispatcherPolicy?: PinnedDispatcherPolicy; + shouldRetryFetchError?: (error: unknown) => boolean; /** Allowed root directories for local path reads. "any" is deprecated; prefer sandboxValidated + readFile. */ localRoots?: readonly string[] | "any"; /** Caller already validated the local path (sandbox/other guards); requires readFile override. */ @@ -36,7 +40,14 @@ type WebMediaOptions = { function resolveWebMediaOptions(params: { maxBytesOrOptions?: number | WebMediaOptions; - options?: { ssrfPolicy?: SsrFPolicy; localRoots?: readonly string[] | "any" }; + options?: { + ssrfPolicy?: SsrFPolicy; + fetchImpl?: FetchLike; + dispatcherPolicy?: PinnedDispatcherPolicy; + fallbackDispatcherPolicy?: PinnedDispatcherPolicy; + shouldRetryFetchError?: (error: unknown) => boolean; + localRoots?: readonly string[] | "any"; + }; optimizeImages: boolean; }): WebMediaOptions { if (typeof params.maxBytesOrOptions === "number" || params.maxBytesOrOptions === undefined) { @@ -44,6 +55,10 @@ function resolveWebMediaOptions(params: { maxBytes: params.maxBytesOrOptions, optimizeImages: params.optimizeImages, ssrfPolicy: params.options?.ssrfPolicy, + fetchImpl: params.options?.fetchImpl, + dispatcherPolicy: params.options?.dispatcherPolicy, + fallbackDispatcherPolicy: params.options?.fallbackDispatcherPolicy, + shouldRetryFetchError: params.options?.shouldRetryFetchError, localRoots: params.options?.localRoots, }; } @@ -238,6 +253,10 @@ async function loadWebMediaInternal( maxBytes, optimizeImages = true, ssrfPolicy, + fetchImpl, + dispatcherPolicy, + fallbackDispatcherPolicy, + shouldRetryFetchError, localRoots, sandboxValidated = false, readFile: readFileOverride, @@ -331,7 +350,15 @@ async function loadWebMediaInternal( : optimizeImages ? Math.max(maxBytes, defaultFetchCap) : maxBytes; - const fetched = await fetchRemoteMedia({ url: mediaUrl, maxBytes: fetchCap, ssrfPolicy }); + const fetched = await fetchRemoteMedia({ + url: mediaUrl, + fetchImpl, + maxBytes: fetchCap, + ssrfPolicy, + dispatcherPolicy, + fallbackDispatcherPolicy, + shouldRetryFetchError, + }); const { buffer, contentType, fileName } = fetched; const kind = kindFromMime(contentType); return await clampAndFinalize({ buffer, contentType, kind, fileName }); @@ -404,7 +431,14 @@ async function loadWebMediaInternal( export async function loadWebMedia( mediaUrl: string, maxBytesOrOptions?: number | WebMediaOptions, - options?: { ssrfPolicy?: SsrFPolicy; localRoots?: readonly string[] | "any" }, + options?: { + ssrfPolicy?: SsrFPolicy; + fetchImpl?: FetchLike; + dispatcherPolicy?: PinnedDispatcherPolicy; + fallbackDispatcherPolicy?: PinnedDispatcherPolicy; + shouldRetryFetchError?: (error: unknown) => boolean; + localRoots?: readonly string[] | "any"; + }, ): Promise { return await loadWebMediaInternal( mediaUrl, @@ -415,7 +449,14 @@ export async function loadWebMedia( export async function loadWebMediaRaw( mediaUrl: string, maxBytesOrOptions?: number | WebMediaOptions, - options?: { ssrfPolicy?: SsrFPolicy; localRoots?: readonly string[] | "any" }, + options?: { + ssrfPolicy?: SsrFPolicy; + fetchImpl?: FetchLike; + dispatcherPolicy?: PinnedDispatcherPolicy; + fallbackDispatcherPolicy?: PinnedDispatcherPolicy; + shouldRetryFetchError?: (error: unknown) => boolean; + localRoots?: readonly string[] | "any"; + }, ): Promise { return await loadWebMediaInternal( mediaUrl, diff --git a/extensions/whatsapp/src/monitor-inbox.allows-messages-from-senders-allowfrom-list.test.ts b/extensions/whatsapp/src/monitor-inbox.allows-messages-from-senders-allowfrom-list.test.ts index 545a010ed50..c0902b3d113 100644 --- a/extensions/whatsapp/src/monitor-inbox.allows-messages-from-senders-allowfrom-list.test.ts +++ b/extensions/whatsapp/src/monitor-inbox.allows-messages-from-senders-allowfrom-list.test.ts @@ -252,7 +252,7 @@ describe("web monitor inbox", () => { }); }); - it("handles append messages by marking them read but skipping auto-reply", async () => { + it("handles stale append messages by marking them read but skipping auto-reply", async () => { const { onMessage, listener, sock } = await openInboxMonitor(); const upsert = { @@ -265,7 +265,7 @@ describe("web monitor inbox", () => { remoteJid: "999@s.whatsapp.net", }, message: { conversation: "old message" }, - messageTimestamp: nowSeconds(), + messageTimestamp: nowSeconds(-300_000), pushName: "History Sender", }, ], @@ -284,7 +284,7 @@ describe("web monitor inbox", () => { }, ]); - // Verify it WAS NOT passed to onMessage + // Verify stale history sync messages are not passed to onMessage expect(onMessage).not.toHaveBeenCalled(); await listener.close(); diff --git a/src/media/load-options.test.ts b/src/media/load-options.test.ts index 52e61e59cc7..1da7a0430b4 100644 --- a/src/media/load-options.test.ts +++ b/src/media/load-options.test.ts @@ -22,4 +22,25 @@ describe("media load options", () => { localRoots: ["/tmp/workspace"], }); }); + + it("preserves custom fetch transport options", () => { + const fetchImpl = (() => Promise.resolve(new Response())) as typeof fetch; + const dispatcherPolicy = { mode: "direct" } as const; + const fallbackDispatcherPolicy = { mode: "env-proxy" } as const; + const shouldRetryFetchError = () => true; + + expect( + buildOutboundMediaLoadOptions({ + fetchImpl, + dispatcherPolicy, + fallbackDispatcherPolicy, + shouldRetryFetchError, + }), + ).toEqual({ + fetchImpl, + dispatcherPolicy, + fallbackDispatcherPolicy, + shouldRetryFetchError, + }); + }); }); diff --git a/src/media/load-options.ts b/src/media/load-options.ts index da4545ae10e..a82a5f40ca0 100644 --- a/src/media/load-options.ts +++ b/src/media/load-options.ts @@ -1,14 +1,24 @@ +import type { PinnedDispatcherPolicy } from "../infra/net/ssrf.js"; +import type { FetchLike } from "./fetch.js"; + +export type OutboundMediaFetchOptions = { + fetchImpl?: FetchLike; + dispatcherPolicy?: PinnedDispatcherPolicy; + fallbackDispatcherPolicy?: PinnedDispatcherPolicy; + shouldRetryFetchError?: (error: unknown) => boolean; +}; + export type OutboundMediaLoadParams = { maxBytes?: number; mediaLocalRoots?: readonly string[]; optimizeImages?: boolean; -}; +} & OutboundMediaFetchOptions; export type OutboundMediaLoadOptions = { maxBytes?: number; localRoots?: readonly string[]; optimizeImages?: boolean; -}; +} & OutboundMediaFetchOptions; export function resolveOutboundMediaLocalRoots( mediaLocalRoots?: readonly string[], @@ -24,5 +34,13 @@ export function buildOutboundMediaLoadOptions( ...(params.maxBytes !== undefined ? { maxBytes: params.maxBytes } : {}), ...(localRoots ? { localRoots } : {}), ...(params.optimizeImages !== undefined ? { optimizeImages: params.optimizeImages } : {}), + ...(params.fetchImpl ? { fetchImpl: params.fetchImpl } : {}), + ...(params.dispatcherPolicy ? { dispatcherPolicy: params.dispatcherPolicy } : {}), + ...(params.fallbackDispatcherPolicy + ? { fallbackDispatcherPolicy: params.fallbackDispatcherPolicy } + : {}), + ...(params.shouldRetryFetchError + ? { shouldRetryFetchError: params.shouldRetryFetchError } + : {}), }; }