mirror of https://github.com/openclaw/openclaw.git
fix(telegram): add download timeout to prevent polling loop hang
When downloading large files via Telegram, if the stream stalls mid-download (e.g. no Content-Length header, network congestion), the polling loop hangs indefinitely since reader.read() has no timeout. Changes: - Add timeoutMs option to fetchRemoteMedia, passed through to fetchWithSsrFGuard - Add per-chunk timeout (30s default) to readResponseWithLimit via Promise.race - Set 30s download timeout for Telegram media fetches This ensures stalled downloads are aborted and the polling loop continues processing subsequent messages. Fixes #40074
This commit is contained in:
parent
f6243916b5
commit
dcf1ea4287
|
|
@ -31,6 +31,9 @@ type FetchMediaOptions = {
|
|||
filePathHint?: string;
|
||||
maxBytes?: number;
|
||||
maxRedirects?: number;
|
||||
/** Abort the download if it takes longer than this (ms). Covers both the
|
||||
* initial connection and streaming the response body. */
|
||||
timeoutMs?: number;
|
||||
ssrfPolicy?: SsrFPolicy;
|
||||
lookupFn?: LookupFn;
|
||||
};
|
||||
|
|
@ -87,6 +90,7 @@ export async function fetchRemoteMedia(options: FetchMediaOptions): Promise<Fetc
|
|||
filePathHint,
|
||||
maxBytes,
|
||||
maxRedirects,
|
||||
timeoutMs,
|
||||
ssrfPolicy,
|
||||
lookupFn,
|
||||
} = options;
|
||||
|
|
@ -101,6 +105,7 @@ export async function fetchRemoteMedia(options: FetchMediaOptions): Promise<Fetc
|
|||
fetchImpl,
|
||||
init: requestInit,
|
||||
maxRedirects,
|
||||
timeoutMs,
|
||||
policy: ssrfPolicy,
|
||||
lookupFn,
|
||||
}),
|
||||
|
|
|
|||
|
|
@ -0,0 +1,68 @@
|
|||
import { describe, expect, it } from "vitest";
|
||||
import { readResponseWithLimit } from "./read-response-with-limit.js";
|
||||
|
||||
function makeStream(chunks: Uint8Array[], delayMs?: number) {
|
||||
return new ReadableStream<Uint8Array>({
|
||||
async start(controller) {
|
||||
for (const chunk of chunks) {
|
||||
if (delayMs) {
|
||||
await new Promise((r) => setTimeout(r, delayMs));
|
||||
}
|
||||
controller.enqueue(chunk);
|
||||
}
|
||||
controller.close();
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
function makeStallingStream(initialChunks: Uint8Array[]) {
|
||||
return new ReadableStream<Uint8Array>({
|
||||
start(controller) {
|
||||
for (const chunk of initialChunks) {
|
||||
controller.enqueue(chunk);
|
||||
}
|
||||
// Never close — simulates a stalled download
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
describe("readResponseWithLimit", () => {
|
||||
it("reads all chunks within the limit", async () => {
|
||||
const body = makeStream([new Uint8Array([1, 2]), new Uint8Array([3, 4])]);
|
||||
const res = new Response(body);
|
||||
const buf = await readResponseWithLimit(res, 100);
|
||||
expect(buf).toEqual(Buffer.from([1, 2, 3, 4]));
|
||||
});
|
||||
|
||||
it("throws when total exceeds maxBytes", async () => {
|
||||
const body = makeStream([new Uint8Array([1, 2, 3]), new Uint8Array([4, 5, 6])]);
|
||||
const res = new Response(body);
|
||||
await expect(readResponseWithLimit(res, 4)).rejects.toThrow(/too large/i);
|
||||
});
|
||||
|
||||
it("calls custom onOverflow", async () => {
|
||||
const body = makeStream([new Uint8Array(10)]);
|
||||
const res = new Response(body);
|
||||
await expect(
|
||||
readResponseWithLimit(res, 5, {
|
||||
onOverflow: ({ size, maxBytes }) =>
|
||||
new Error(`custom: ${size} > ${maxBytes}`),
|
||||
}),
|
||||
).rejects.toThrow("custom: 10 > 5");
|
||||
});
|
||||
|
||||
it("times out when the stream stalls", async () => {
|
||||
const body = makeStallingStream([new Uint8Array([1, 2])]);
|
||||
const res = new Response(body);
|
||||
await expect(
|
||||
readResponseWithLimit(res, 1024, { chunkTimeoutMs: 50 }),
|
||||
).rejects.toThrow(/stalled/i);
|
||||
}, 5_000);
|
||||
|
||||
it("succeeds when chunks arrive within the timeout", async () => {
|
||||
const body = makeStream([new Uint8Array([1]), new Uint8Array([2])], 10);
|
||||
const res = new Response(body);
|
||||
const buf = await readResponseWithLimit(res, 100, { chunkTimeoutMs: 500 });
|
||||
expect(buf).toEqual(Buffer.from([1, 2]));
|
||||
});
|
||||
});
|
||||
|
|
@ -1,14 +1,21 @@
|
|||
/** Default per-chunk read timeout: 30 seconds. */
|
||||
const DEFAULT_CHUNK_TIMEOUT_MS = 30_000;
|
||||
|
||||
export async function readResponseWithLimit(
|
||||
res: Response,
|
||||
maxBytes: number,
|
||||
opts?: {
|
||||
onOverflow?: (params: { size: number; maxBytes: number; res: Response }) => Error;
|
||||
/** Maximum time (ms) to wait for a single chunk before aborting.
|
||||
* Prevents the read loop from hanging indefinitely when a download stalls. */
|
||||
chunkTimeoutMs?: number;
|
||||
},
|
||||
): Promise<Buffer> {
|
||||
const onOverflow =
|
||||
opts?.onOverflow ??
|
||||
((params: { size: number; maxBytes: number }) =>
|
||||
new Error(`Content too large: ${params.size} bytes (limit: ${params.maxBytes} bytes)`));
|
||||
const chunkTimeout = opts?.chunkTimeoutMs ?? DEFAULT_CHUNK_TIMEOUT_MS;
|
||||
|
||||
const body = res.body;
|
||||
if (!body || typeof body.getReader !== "function") {
|
||||
|
|
@ -24,7 +31,16 @@ export async function readResponseWithLimit(
|
|||
let total = 0;
|
||||
try {
|
||||
while (true) {
|
||||
const { done, value } = await reader.read();
|
||||
const chunkResult = await Promise.race([
|
||||
reader.read(),
|
||||
new Promise<never>((_, reject) =>
|
||||
setTimeout(
|
||||
() => reject(new Error(`Media download stalled: no data received for ${chunkTimeout}ms`)),
|
||||
chunkTimeout,
|
||||
),
|
||||
),
|
||||
]);
|
||||
const { done, value } = chunkResult;
|
||||
if (done) {
|
||||
break;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -100,6 +100,10 @@ function resolveRequiredFetchImpl(proxyFetch?: typeof fetch): typeof fetch {
|
|||
return fetchImpl;
|
||||
}
|
||||
|
||||
/** Default timeout for downloading Telegram media files (30 seconds).
|
||||
* Prevents the polling loop from hanging indefinitely when a download stalls. */
|
||||
const TELEGRAM_DOWNLOAD_TIMEOUT_MS = 30_000;
|
||||
|
||||
async function downloadAndSaveTelegramFile(params: {
|
||||
filePath: string;
|
||||
token: string;
|
||||
|
|
@ -113,6 +117,7 @@ async function downloadAndSaveTelegramFile(params: {
|
|||
fetchImpl: params.fetchImpl,
|
||||
filePathHint: params.filePath,
|
||||
maxBytes: params.maxBytes,
|
||||
timeoutMs: TELEGRAM_DOWNLOAD_TIMEOUT_MS,
|
||||
ssrfPolicy: TELEGRAM_MEDIA_SSRF_POLICY,
|
||||
});
|
||||
const originalName = params.telegramFileName ?? fetched.fileName ?? params.filePath;
|
||||
|
|
|
|||
Loading…
Reference in New Issue