From e1d963ed2ec844ce652512957df2b78af1d2eebd Mon Sep 17 00:00:00 2001 From: Kenny Xie <77253505+aquaright1@users.noreply.github.com> Date: Tue, 31 Mar 2026 22:29:58 -0700 Subject: [PATCH] fix: bound discord inbound media downloads (#58593) (thanks @aquaright1) * fix(discord): bound attachment downloads by timeout * fix(ci): unblock check and clarify discord timeouts * fix: bound discord inbound media downloads (#58593) (thanks @aquaright1) --------- Co-authored-by: Ayaan Zaidi --- CHANGELOG.md | 1 + .../src/monitor/message-handler.process.ts | 16 +- .../discord/src/monitor/message-utils.test.ts | 215 +++++++++++++++++- .../discord/src/monitor/message-utils.ts | 110 ++++++++- extensions/discord/src/monitor/timeouts.ts | 2 + 5 files changed, 323 insertions(+), 21 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 584fe9241cd..9bcae1d7778 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ Docs: https://docs.openclaw.ai ### Fixes +- Discord/inbound media: pass Discord attachment and sticker downloads through the shared idle-timeout and worker-abort path so slow or stuck inbound media fetches stop hanging message processing. (#58593) Thanks @aquaright1 - Telegram/local Bot API: preserve media MIME types for absolute-path downloads so local audio files still trigger transcription and other MIME-based handling. (#54603) Thanks @jzakirov - Tasks/gateway: re-check the current task record before maintenance marks runs lost or prunes them, so a task heartbeat or cleanup update that lands during a sweep no longer gets overwritten by stale snapshot state. - Tasks/gateway: keep the task registry maintenance sweep from stalling the gateway event loop under synchronous SQLite pressure, so upgraded gateways stop hanging about a minute after startup. (#58670) Thanks @openperf diff --git a/extensions/discord/src/monitor/message-handler.process.ts b/extensions/discord/src/monitor/message-handler.process.ts index a33e964dcf8..6b343449757 100644 --- a/extensions/discord/src/monitor/message-handler.process.ts +++ b/extensions/discord/src/monitor/message-handler.process.ts @@ -55,6 +55,10 @@ import { import { buildDirectLabel, buildGuildLabel, resolveReplyContext } from "./reply-context.js"; import { deliverDiscordReply } from "./reply-delivery.js"; import { resolveDiscordAutoThreadReplyPlan, resolveDiscordThreadStarter } from "./threading.js"; +import { + DISCORD_ATTACHMENT_IDLE_TIMEOUT_MS, + DISCORD_ATTACHMENT_TOTAL_TIMEOUT_MS, +} from "./timeouts.js"; import { sendTyping } from "./typing.js"; function sleep(ms: number): Promise { @@ -130,15 +134,21 @@ export async function processDiscordMessage( } const ssrfPolicy = cfg.browser?.ssrfPolicy; - const mediaList = await resolveMediaList(message, mediaMaxBytes, discordRestFetch, ssrfPolicy); + const mediaResolveOptions = { + fetchImpl: discordRestFetch, + ssrfPolicy, + readIdleTimeoutMs: DISCORD_ATTACHMENT_IDLE_TIMEOUT_MS, + totalTimeoutMs: DISCORD_ATTACHMENT_TOTAL_TIMEOUT_MS, + abortSignal, + }; + const mediaList = await resolveMediaList(message, mediaMaxBytes, mediaResolveOptions); if (isProcessAborted(abortSignal)) { return; } const forwardedMediaList = await resolveForwardedMediaList( message, mediaMaxBytes, - discordRestFetch, - ssrfPolicy, + mediaResolveOptions, ); if (isProcessAborted(abortSignal)) { return; diff --git a/extensions/discord/src/monitor/message-utils.test.ts b/extensions/discord/src/monitor/message-utils.test.ts index 4ed4142a616..8990d6f6ac7 100644 --- a/extensions/discord/src/monitor/message-utils.test.ts +++ b/extensions/discord/src/monitor/message-utils.test.ts @@ -69,6 +69,8 @@ function expectSinglePngDownload(params: { filePathHint?: string; maxBytes?: number; fetchImpl?: unknown; + readIdleTimeoutMs?: number; + requestInit?: { signal?: AbortSignal }; ssrfPolicy?: unknown; }; expect(call).toMatchObject({ @@ -216,7 +218,7 @@ describe("resolveForwardedMediaList", () => { }, }), 512, - proxyFetch, + { fetchImpl: proxyFetch }, ); expect(fetchRemoteMedia).toHaveBeenCalledWith( @@ -298,6 +300,67 @@ describe("resolveForwardedMediaList", () => { expect(result).toEqual([]); expect(fetchRemoteMedia).not.toHaveBeenCalled(); }); + + it("passes readIdleTimeoutMs to forwarded attachment downloads", async () => { + const attachment = { + id: "att-timeout-forwarded", + url: "https://cdn.discordapp.com/attachments/1/forwarded-timeout.png", + filename: "forwarded-timeout.png", + content_type: "image/png", + }; + fetchRemoteMedia.mockResolvedValueOnce({ + buffer: Buffer.from("image"), + contentType: "image/png", + }); + saveMediaBuffer.mockResolvedValueOnce({ + path: "/tmp/forwarded-timeout.png", + contentType: "image/png", + }); + + await resolveForwardedMediaList( + asMessage({ + rawData: { + message_snapshots: [{ message: { attachments: [attachment] } }], + }, + }), + 512, + { readIdleTimeoutMs: 60_000 }, + ); + + expect(fetchRemoteMedia).toHaveBeenCalledWith( + expect.objectContaining({ readIdleTimeoutMs: 60_000 }), + ); + }); + + it("passes readIdleTimeoutMs to forwarded sticker downloads", async () => { + const sticker = { + id: "sticker-timeout-forwarded", + name: "timeout-forwarded", + format_type: StickerFormatType.PNG, + }; + fetchRemoteMedia.mockResolvedValueOnce({ + buffer: Buffer.from("sticker"), + contentType: "image/png", + }); + saveMediaBuffer.mockResolvedValueOnce({ + path: "/tmp/forwarded-sticker-timeout.png", + contentType: "image/png", + }); + + await resolveForwardedMediaList( + asMessage({ + rawData: { + message_snapshots: [{ message: { sticker_items: [sticker] } }], + }, + }), + 512, + { readIdleTimeoutMs: 60_000 }, + ); + + expect(fetchRemoteMedia).toHaveBeenCalledWith( + expect.objectContaining({ readIdleTimeoutMs: 60_000 }), + ); + }); }); describe("resolveMediaList", () => { @@ -358,7 +421,7 @@ describe("resolveMediaList", () => { stickers: [sticker], }), 512, - proxyFetch, + { fetchImpl: proxyFetch }, ); expect(fetchRemoteMedia).toHaveBeenCalledWith( @@ -486,6 +549,145 @@ describe("resolveMediaList", () => { }, ]); }); + + it("passes readIdleTimeoutMs to fetchRemoteMedia for attachments", async () => { + const attachment = { + id: "att-timeout", + url: "https://cdn.discordapp.com/attachments/1/timeout.png", + filename: "timeout.png", + content_type: "image/png", + }; + fetchRemoteMedia.mockResolvedValueOnce({ + buffer: Buffer.from("image"), + contentType: "image/png", + }); + saveMediaBuffer.mockResolvedValueOnce({ + path: "/tmp/timeout.png", + contentType: "image/png", + }); + + await resolveMediaList( + asMessage({ + attachments: [attachment], + }), + 512, + { readIdleTimeoutMs: 60_000 }, + ); + + expect(fetchRemoteMedia).toHaveBeenCalledWith( + expect.objectContaining({ readIdleTimeoutMs: 60_000 }), + ); + }); + + it("passes readIdleTimeoutMs to fetchRemoteMedia for stickers", async () => { + const sticker = { + id: "sticker-timeout", + name: "timeout", + format_type: StickerFormatType.PNG, + }; + fetchRemoteMedia.mockResolvedValueOnce({ + buffer: Buffer.from("sticker"), + contentType: "image/png", + }); + saveMediaBuffer.mockResolvedValueOnce({ + path: "/tmp/sticker-timeout.png", + contentType: "image/png", + }); + + await resolveMediaList( + asMessage({ + stickers: [sticker], + }), + 512, + { readIdleTimeoutMs: 60_000 }, + ); + + expect(fetchRemoteMedia).toHaveBeenCalledWith( + expect.objectContaining({ readIdleTimeoutMs: 60_000 }), + ); + }); + + it("times out slow attachment downloads and returns fallback", async () => { + const attachment = { + id: "att-total-timeout", + url: "https://cdn.discordapp.com/attachments/1/slow.png", + filename: "slow.png", + content_type: "image/png", + }; + vi.useFakeTimers(); + fetchRemoteMedia.mockImplementation( + () => + new Promise(() => { + // never resolves + }), + ); + + try { + const resultPromise = resolveMediaList( + asMessage({ + attachments: [attachment], + }), + 512, + { totalTimeoutMs: 100 }, + ); + + await vi.advanceTimersByTimeAsync(100); + + await expect(resultPromise).resolves.toEqual([ + { + path: attachment.url, + contentType: "image/png", + placeholder: "", + }, + ]); + } finally { + vi.useRealTimers(); + } + }); + + it("passes abortSignal to fetchRemoteMedia and falls back when aborted", async () => { + const attachment = { + id: "att-abort", + url: "https://cdn.discordapp.com/attachments/1/abort.png", + filename: "abort.png", + content_type: "image/png", + }; + const abortController = new AbortController(); + fetchRemoteMedia.mockImplementationOnce( + (params: { requestInit?: { signal?: AbortSignal } }) => + new Promise((_, reject) => { + const signal = params.requestInit?.signal; + const abortError = Object.assign(new Error("aborted"), { name: "AbortError" }); + if (signal?.aborted) { + reject(abortError); + return; + } + signal?.addEventListener("abort", () => reject(abortError), { once: true }); + }), + ); + + const resultPromise = resolveMediaList( + asMessage({ + attachments: [attachment], + }), + 512, + { abortSignal: abortController.signal }, + ); + abortController.abort(); + + await expect(resultPromise).resolves.toEqual([ + { + path: attachment.url, + contentType: "image/png", + placeholder: "", + }, + ]); + expect(fetchRemoteMedia).toHaveBeenCalledWith( + expect.objectContaining({ + requestInit: expect.objectContaining({ signal: abortController.signal }), + }), + ); + }); }); describe("Discord media SSRF policy", () => { @@ -530,11 +732,12 @@ describe("Discord media SSRF policy", () => { attachments: [{ id: "b1", url: "https://cdn.discordapp.com/b.png", filename: "b.png" }], }), 1024, - undefined, { - allowPrivateNetwork: true, - hostnameAllowlist: ["assets.example.com"], - allowedHostnames: ["assets.example.com"], + ssrfPolicy: { + allowPrivateNetwork: true, + hostnameAllowlist: ["assets.example.com"], + allowedHostnames: ["assets.example.com"], + }, }, ); diff --git a/extensions/discord/src/monitor/message-utils.ts b/extensions/discord/src/monitor/message-utils.ts index b4949f08424..8de3afbc126 100644 --- a/extensions/discord/src/monitor/message-utils.ts +++ b/extensions/discord/src/monitor/message-utils.ts @@ -5,6 +5,7 @@ import { saveMediaBuffer } from "openclaw/plugin-sdk/media-runtime"; import { buildMediaPayload } from "openclaw/plugin-sdk/reply-payload"; import { logVerbose } from "openclaw/plugin-sdk/runtime-env"; import type { SsrFPolicy } from "openclaw/plugin-sdk/ssrf-runtime"; +import { mergeAbortSignals } from "./timeouts.js"; const DISCORD_CDN_HOSTNAMES = [ "cdn.discordapp.com", @@ -59,6 +60,14 @@ export type DiscordMediaInfo = { placeholder: string; }; +type DiscordMediaResolveOptions = { + fetchImpl?: FetchLike; + ssrfPolicy?: SsrFPolicy; + readIdleTimeoutMs?: number; + totalTimeoutMs?: number; + abortSignal?: AbortSignal; +}; + export type DiscordChannelInfo = { type: ChannelType; name?: string; @@ -209,26 +218,31 @@ export function hasDiscordMessageStickers(message: Message): boolean { export async function resolveMediaList( message: Message, maxBytes: number, - fetchImpl?: FetchLike, - ssrfPolicy?: SsrFPolicy, + options?: DiscordMediaResolveOptions, ): Promise { const out: DiscordMediaInfo[] = []; - const resolvedSsrFPolicy = resolveDiscordMediaSsrFPolicy(ssrfPolicy); + const resolvedSsrFPolicy = resolveDiscordMediaSsrFPolicy(options?.ssrfPolicy); await appendResolvedMediaFromAttachments({ attachments: message.attachments ?? [], maxBytes, out, errorPrefix: "discord: failed to download attachment", - fetchImpl, + fetchImpl: options?.fetchImpl, ssrfPolicy: resolvedSsrFPolicy, + readIdleTimeoutMs: options?.readIdleTimeoutMs, + totalTimeoutMs: options?.totalTimeoutMs, + abortSignal: options?.abortSignal, }); await appendResolvedMediaFromStickers({ stickers: resolveDiscordMessageStickers(message), maxBytes, out, errorPrefix: "discord: failed to download sticker", - fetchImpl, + fetchImpl: options?.fetchImpl, ssrfPolicy: resolvedSsrFPolicy, + readIdleTimeoutMs: options?.readIdleTimeoutMs, + totalTimeoutMs: options?.totalTimeoutMs, + abortSignal: options?.abortSignal, }); return out; } @@ -236,36 +250,96 @@ export async function resolveMediaList( export async function resolveForwardedMediaList( message: Message, maxBytes: number, - fetchImpl?: FetchLike, - ssrfPolicy?: SsrFPolicy, + options?: DiscordMediaResolveOptions, ): Promise { const snapshots = resolveDiscordMessageSnapshots(message); if (snapshots.length === 0) { return []; } const out: DiscordMediaInfo[] = []; - const resolvedSsrFPolicy = resolveDiscordMediaSsrFPolicy(ssrfPolicy); + const resolvedSsrFPolicy = resolveDiscordMediaSsrFPolicy(options?.ssrfPolicy); for (const snapshot of snapshots) { await appendResolvedMediaFromAttachments({ attachments: snapshot.message?.attachments, maxBytes, out, errorPrefix: "discord: failed to download forwarded attachment", - fetchImpl, + fetchImpl: options?.fetchImpl, ssrfPolicy: resolvedSsrFPolicy, + readIdleTimeoutMs: options?.readIdleTimeoutMs, + totalTimeoutMs: options?.totalTimeoutMs, + abortSignal: options?.abortSignal, }); await appendResolvedMediaFromStickers({ stickers: snapshot.message ? resolveDiscordSnapshotStickers(snapshot.message) : [], maxBytes, out, errorPrefix: "discord: failed to download forwarded sticker", - fetchImpl, + fetchImpl: options?.fetchImpl, ssrfPolicy: resolvedSsrFPolicy, + readIdleTimeoutMs: options?.readIdleTimeoutMs, + totalTimeoutMs: options?.totalTimeoutMs, + abortSignal: options?.abortSignal, }); } return out; } +async function fetchDiscordMedia(params: { + url: string; + filePathHint: string; + maxBytes: number; + fetchImpl?: FetchLike; + ssrfPolicy?: SsrFPolicy; + readIdleTimeoutMs?: number; + totalTimeoutMs?: number; + abortSignal?: AbortSignal; +}) { + // `totalTimeoutMs` is enforced per individual attachment or sticker fetch. + // The inbound worker's abort signal remains the outer bound for the message. + const timeoutAbortController = params.totalTimeoutMs ? new AbortController() : undefined; + const signal = mergeAbortSignals([params.abortSignal, timeoutAbortController?.signal]); + let timedOut = false; + let timeoutHandle: ReturnType | null = null; + + const fetchPromise = fetchRemoteMedia({ + url: params.url, + filePathHint: params.filePathHint, + maxBytes: params.maxBytes, + fetchImpl: params.fetchImpl, + ssrfPolicy: params.ssrfPolicy, + readIdleTimeoutMs: params.readIdleTimeoutMs, + ...(signal ? { requestInit: { signal } } : {}), + }).catch((error) => { + if (timedOut) { + // After the timeout wins the race we abort the underlying fetch and keep + // this branch pending so the later AbortError does not surface as an + // unhandled rejection after Promise.race has already settled. + return new Promise(() => {}); + } + throw error; + }); + + try { + if (!params.totalTimeoutMs) { + return await fetchPromise; + } + const timeoutPromise = new Promise((_, reject) => { + timeoutHandle = setTimeout(() => { + timedOut = true; + timeoutAbortController?.abort(); + reject(new Error(`discord media download timed out after ${params.totalTimeoutMs}ms`)); + }, params.totalTimeoutMs); + timeoutHandle.unref?.(); + }); + return await Promise.race([fetchPromise, timeoutPromise]); + } finally { + if (timeoutHandle) { + clearTimeout(timeoutHandle); + } + } +} + async function appendResolvedMediaFromAttachments(params: { attachments?: APIAttachment[] | null; maxBytes: number; @@ -273,6 +347,9 @@ async function appendResolvedMediaFromAttachments(params: { errorPrefix: string; fetchImpl?: FetchLike; ssrfPolicy?: SsrFPolicy; + readIdleTimeoutMs?: number; + totalTimeoutMs?: number; + abortSignal?: AbortSignal; }) { const attachments = params.attachments; if (!attachments || attachments.length === 0) { @@ -280,12 +357,15 @@ async function appendResolvedMediaFromAttachments(params: { } for (const attachment of attachments) { try { - const fetched = await fetchRemoteMedia({ + const fetched = await fetchDiscordMedia({ url: attachment.url, filePathHint: attachment.filename ?? attachment.url, maxBytes: params.maxBytes, fetchImpl: params.fetchImpl, ssrfPolicy: params.ssrfPolicy, + readIdleTimeoutMs: params.readIdleTimeoutMs, + totalTimeoutMs: params.totalTimeoutMs, + abortSignal: params.abortSignal, }); const saved = await saveMediaBuffer( fetched.buffer, @@ -383,6 +463,9 @@ async function appendResolvedMediaFromStickers(params: { errorPrefix: string; fetchImpl?: FetchLike; ssrfPolicy?: SsrFPolicy; + readIdleTimeoutMs?: number; + totalTimeoutMs?: number; + abortSignal?: AbortSignal; }) { const stickers = params.stickers; if (!stickers || stickers.length === 0) { @@ -393,12 +476,15 @@ async function appendResolvedMediaFromStickers(params: { let lastError: unknown; for (const candidate of candidates) { try { - const fetched = await fetchRemoteMedia({ + const fetched = await fetchDiscordMedia({ url: candidate.url, filePathHint: candidate.fileName, maxBytes: params.maxBytes, fetchImpl: params.fetchImpl, ssrfPolicy: params.ssrfPolicy, + readIdleTimeoutMs: params.readIdleTimeoutMs, + totalTimeoutMs: params.totalTimeoutMs, + abortSignal: params.abortSignal, }); const saved = await saveMediaBuffer( fetched.buffer, diff --git a/extensions/discord/src/monitor/timeouts.ts b/extensions/discord/src/monitor/timeouts.ts index a0083adaa0d..f0f682114b2 100644 --- a/extensions/discord/src/monitor/timeouts.ts +++ b/extensions/discord/src/monitor/timeouts.ts @@ -2,6 +2,8 @@ const MAX_DISCORD_TIMEOUT_MS = 2_147_483_647; export const DISCORD_DEFAULT_LISTENER_TIMEOUT_MS = 120_000; export const DISCORD_DEFAULT_INBOUND_WORKER_TIMEOUT_MS = 30 * 60_000; +export const DISCORD_ATTACHMENT_IDLE_TIMEOUT_MS = 60_000; +export const DISCORD_ATTACHMENT_TOTAL_TIMEOUT_MS = 120_000; function clampDiscordTimeoutMs(timeoutMs: number, minimumMs: number): number { return Math.max(minimumMs, Math.min(Math.floor(timeoutMs), MAX_DISCORD_TIMEOUT_MS));