diff --git a/extensions/telegram/src/monitor.ts b/extensions/telegram/src/monitor.ts index e703266f0f0..6ff2e8ef9d0 100644 --- a/extensions/telegram/src/monitor.ts +++ b/extensions/telegram/src/monitor.ts @@ -91,8 +91,10 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) { const activeRunner = pollingSession?.activeRunner; if (isNetworkError && isTelegramPollingError && activeRunner && activeRunner.isRunning()) { pollingSession?.markForceRestarted(); + pollingSession?.markTransportDirty(); pollingSession?.abortActiveFetch(); void activeRunner.stop().catch(() => {}); + log("[telegram][diag] marking transport dirty after polling network failure"); log( `[telegram] Restarting polling after unhandled network error: ${formatErrorMessage(err)}`, ); @@ -180,9 +182,11 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) { } // Create transport once to preserve sticky IPv4 fallback state across polling restarts - const telegramTransport = resolveTelegramTransport(proxyFetch, { - network: account.config.network, - }); + const createTelegramTransportForPolling = () => + resolveTelegramTransport(proxyFetch, { + network: account.config.network, + }); + const telegramTransport = createTelegramTransportForPolling(); pollingSession = new TelegramPollingSession({ token, @@ -196,6 +200,7 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) { persistUpdateId, log, telegramTransport, + createTelegramTransport: createTelegramTransportForPolling, }); await pollingSession.runUntilAbort(); } finally { diff --git a/extensions/telegram/src/polling-session.ts b/extensions/telegram/src/polling-session.ts index 9b92435f4ec..15690b3a34d 100644 --- a/extensions/telegram/src/polling-session.ts +++ b/extensions/telegram/src/polling-session.ts @@ -50,6 +50,8 @@ type TelegramPollingSessionOpts = { log: (line: string) => void; /** Pre-resolved Telegram transport to reuse across bot instances */ telegramTransport?: TelegramTransport; + /** Rebuild Telegram transport after stall/network recovery when marked dirty. */ + createTelegramTransport?: () => TelegramTransport; }; export class TelegramPollingSession { @@ -58,8 +60,12 @@ export class TelegramPollingSession { #forceRestarted = false; #activeRunner: ReturnType | undefined; #activeFetchAbort: AbortController | undefined; + #telegramTransport: TelegramTransport | undefined; + #discardTransportOnRestart = false; - constructor(private readonly opts: TelegramPollingSessionOpts) {} + constructor(private readonly opts: TelegramPollingSessionOpts) { + this.#telegramTransport = opts.telegramTransport; + } get activeRunner() { return this.#activeRunner; @@ -69,6 +75,10 @@ export class TelegramPollingSession { this.#forceRestarted = true; } + markTransportDirty() { + this.#discardTransportOnRestart = true; + } + abortActiveFetch() { this.#activeFetchAbort?.abort(); } @@ -126,6 +136,15 @@ export class TelegramPollingSession { async #createPollingBot(): Promise { const fetchAbortController = new AbortController(); this.#activeFetchAbort = fetchAbortController; + const shouldRebuildTransport = this.#discardTransportOnRestart || !this.#telegramTransport; + const telegramTransport = shouldRebuildTransport + ? (this.opts.createTelegramTransport?.() ?? this.#telegramTransport) + : this.#telegramTransport; + if (shouldRebuildTransport && telegramTransport) { + this.opts.log("[telegram][diag] rebuilding transport for next polling cycle"); + } + this.#telegramTransport = telegramTransport; + this.#discardTransportOnRestart = false; try { return createTelegramBot({ token: this.opts.token, @@ -138,7 +157,7 @@ export class TelegramPollingSession { lastUpdateId: this.opts.getLastUpdateId(), onUpdateId: this.opts.persistUpdateId, }, - telegramTransport: this.opts.telegramTransport, + telegramTransport, }); } catch (err) { await this.#waitBeforeRetryOnRecoverableSetupError(err, "Telegram setup network error"); @@ -186,11 +205,49 @@ export class TelegramPollingSession { await this.#confirmPersistedOffset(bot); let lastGetUpdatesAt = Date.now(); - bot.api.config.use((prev, method, payload, signal) => { - if (method === "getUpdates") { - lastGetUpdatesAt = Date.now(); + let lastGetUpdatesStartedAt: number | null = null; + let lastGetUpdatesFinishedAt: number | null = null; + let lastGetUpdatesDurationMs: number | null = null; + let lastGetUpdatesOutcome = "not-started"; + let lastGetUpdatesError: string | null = null; + let lastGetUpdatesOffset: number | null = null; + let inFlightGetUpdates = 0; + let stopSequenceLogged = false; + let stallDiagLoggedAt = 0; + + bot.api.config.use(async (prev, method, payload, signal) => { + if (method !== "getUpdates") { + return prev(method, payload, signal); + } + + const startedAt = Date.now(); + lastGetUpdatesAt = startedAt; + lastGetUpdatesStartedAt = startedAt; + lastGetUpdatesOffset = + payload && typeof payload === "object" && "offset" in payload + ? ((payload as { offset?: number }).offset ?? null) + : null; + inFlightGetUpdates += 1; + lastGetUpdatesOutcome = "started"; + lastGetUpdatesError = null; + + try { + const result = await prev(method, payload, signal); + const finishedAt = Date.now(); + lastGetUpdatesFinishedAt = finishedAt; + lastGetUpdatesDurationMs = finishedAt - startedAt; + lastGetUpdatesOutcome = Array.isArray(result) ? `ok:${result.length}` : "ok"; + return result; + } catch (err) { + const finishedAt = Date.now(); + lastGetUpdatesFinishedAt = finishedAt; + lastGetUpdatesDurationMs = finishedAt - startedAt; + lastGetUpdatesOutcome = "error"; + lastGetUpdatesError = formatErrorMessage(err); + throw err; + } finally { + inFlightGetUpdates = Math.max(0, inFlightGetUpdates - 1); } - return prev(method, payload, signal); }); const runner = run(bot, this.opts.runnerOptions); @@ -236,11 +293,26 @@ export class TelegramPollingSession { if (this.opts.abortSignal?.aborted) { return; } - const elapsed = Date.now() - lastGetUpdatesAt; + + const now = Date.now(); + const activeElapsed = + inFlightGetUpdates > 0 && lastGetUpdatesStartedAt != null ? now - lastGetUpdatesStartedAt : 0; + const idleElapsed = inFlightGetUpdates > 0 ? 0 : now - (lastGetUpdatesFinishedAt ?? lastGetUpdatesAt); + const elapsed = inFlightGetUpdates > 0 ? activeElapsed : idleElapsed; + if (elapsed > POLL_STALL_THRESHOLD_MS && runner.isRunning()) { + if (stallDiagLoggedAt && now - stallDiagLoggedAt < POLL_STALL_THRESHOLD_MS / 2) { + return; + } + stallDiagLoggedAt = now; + this.#discardTransportOnRestart = true; stalledRestart = true; + const elapsedLabel = + inFlightGetUpdates > 0 + ? `active getUpdates stuck for ${formatDurationPrecise(elapsed)}` + : `no completed getUpdates for ${formatDurationPrecise(elapsed)}`; this.opts.log( - `[telegram] Polling stall detected (no getUpdates for ${formatDurationPrecise(elapsed)}); forcing restart.`, + `[telegram] Polling stall detected (${elapsedLabel}); forcing restart. [diag inFlight=${inFlightGetUpdates} outcome=${lastGetUpdatesOutcome} startedAt=${lastGetUpdatesStartedAt ?? "n/a"} finishedAt=${lastGetUpdatesFinishedAt ?? "n/a"} durationMs=${lastGetUpdatesDurationMs ?? "n/a"} offset=${lastGetUpdatesOffset ?? "n/a"}${lastGetUpdatesError ? ` error=${lastGetUpdatesError}` : ""}]`, ); void stopRunner(); void stopBot(); @@ -270,6 +342,9 @@ export class TelegramPollingSession { ? "unhandled network error" : "runner stopped (maxRetryTime exceeded or graceful stop)"; this.#forceRestarted = false; + this.opts.log( + `[telegram][diag] polling cycle finished reason=${reason} inFlight=${inFlightGetUpdates} outcome=${lastGetUpdatesOutcome} startedAt=${lastGetUpdatesStartedAt ?? "n/a"} finishedAt=${lastGetUpdatesFinishedAt ?? "n/a"} durationMs=${lastGetUpdatesDurationMs ?? "n/a"} offset=${lastGetUpdatesOffset ?? "n/a"}${lastGetUpdatesError ? ` error=${lastGetUpdatesError}` : ""}`, + ); const shouldRestart = await this.#waitBeforeRestart( (delay) => `Telegram polling runner stopped (${reason}); restarting in ${delay}.`, ); @@ -284,11 +359,17 @@ export class TelegramPollingSession { this.#webhookCleared = false; } const isRecoverable = isRecoverableTelegramNetworkError(err, { context: "polling" }); + if (isConflict || isRecoverable) { + this.#discardTransportOnRestart = true; + } if (!isConflict && !isRecoverable) { throw err; } const reason = isConflict ? "getUpdates conflict" : "network error"; const errMsg = formatErrorMessage(err); + this.opts.log( + `[telegram][diag] polling cycle error reason=${reason} inFlight=${inFlightGetUpdates} outcome=${lastGetUpdatesOutcome} startedAt=${lastGetUpdatesStartedAt ?? "n/a"} finishedAt=${lastGetUpdatesFinishedAt ?? "n/a"} durationMs=${lastGetUpdatesDurationMs ?? "n/a"} offset=${lastGetUpdatesOffset ?? "n/a"} err=${errMsg}${lastGetUpdatesError ? ` lastGetUpdatesError=${lastGetUpdatesError}` : ""}`, + ); const shouldRestart = await this.#waitBeforeRestart( (delay) => `Telegram ${reason}: ${errMsg}; retrying in ${delay}.`, );