mirror of https://github.com/openclaw/openclaw.git
telegram: rebuild transport after stalled polling cycles
This commit is contained in:
parent
663ba5a3cd
commit
e035a0d98c
|
|
@ -91,8 +91,10 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) {
|
|||
const activeRunner = pollingSession?.activeRunner;
|
||||
if (isNetworkError && isTelegramPollingError && activeRunner && activeRunner.isRunning()) {
|
||||
pollingSession?.markForceRestarted();
|
||||
pollingSession?.markTransportDirty();
|
||||
pollingSession?.abortActiveFetch();
|
||||
void activeRunner.stop().catch(() => {});
|
||||
log("[telegram][diag] marking transport dirty after polling network failure");
|
||||
log(
|
||||
`[telegram] Restarting polling after unhandled network error: ${formatErrorMessage(err)}`,
|
||||
);
|
||||
|
|
@ -180,9 +182,11 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) {
|
|||
}
|
||||
|
||||
// Create transport once to preserve sticky IPv4 fallback state across polling restarts
|
||||
const telegramTransport = resolveTelegramTransport(proxyFetch, {
|
||||
network: account.config.network,
|
||||
});
|
||||
const createTelegramTransportForPolling = () =>
|
||||
resolveTelegramTransport(proxyFetch, {
|
||||
network: account.config.network,
|
||||
});
|
||||
const telegramTransport = createTelegramTransportForPolling();
|
||||
|
||||
pollingSession = new TelegramPollingSession({
|
||||
token,
|
||||
|
|
@ -196,6 +200,7 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) {
|
|||
persistUpdateId,
|
||||
log,
|
||||
telegramTransport,
|
||||
createTelegramTransport: createTelegramTransportForPolling,
|
||||
});
|
||||
await pollingSession.runUntilAbort();
|
||||
} finally {
|
||||
|
|
|
|||
|
|
@ -50,6 +50,8 @@ type TelegramPollingSessionOpts = {
|
|||
log: (line: string) => void;
|
||||
/** Pre-resolved Telegram transport to reuse across bot instances */
|
||||
telegramTransport?: TelegramTransport;
|
||||
/** Rebuild Telegram transport after stall/network recovery when marked dirty. */
|
||||
createTelegramTransport?: () => TelegramTransport;
|
||||
};
|
||||
|
||||
export class TelegramPollingSession {
|
||||
|
|
@ -58,8 +60,12 @@ export class TelegramPollingSession {
|
|||
#forceRestarted = false;
|
||||
#activeRunner: ReturnType<typeof run> | undefined;
|
||||
#activeFetchAbort: AbortController | undefined;
|
||||
#telegramTransport: TelegramTransport | undefined;
|
||||
#discardTransportOnRestart = false;
|
||||
|
||||
constructor(private readonly opts: TelegramPollingSessionOpts) {}
|
||||
constructor(private readonly opts: TelegramPollingSessionOpts) {
|
||||
this.#telegramTransport = opts.telegramTransport;
|
||||
}
|
||||
|
||||
get activeRunner() {
|
||||
return this.#activeRunner;
|
||||
|
|
@ -69,6 +75,10 @@ export class TelegramPollingSession {
|
|||
this.#forceRestarted = true;
|
||||
}
|
||||
|
||||
markTransportDirty() {
|
||||
this.#discardTransportOnRestart = true;
|
||||
}
|
||||
|
||||
abortActiveFetch() {
|
||||
this.#activeFetchAbort?.abort();
|
||||
}
|
||||
|
|
@ -126,6 +136,15 @@ export class TelegramPollingSession {
|
|||
async #createPollingBot(): Promise<TelegramBot | undefined> {
|
||||
const fetchAbortController = new AbortController();
|
||||
this.#activeFetchAbort = fetchAbortController;
|
||||
const shouldRebuildTransport = this.#discardTransportOnRestart || !this.#telegramTransport;
|
||||
const telegramTransport = shouldRebuildTransport
|
||||
? (this.opts.createTelegramTransport?.() ?? this.#telegramTransport)
|
||||
: this.#telegramTransport;
|
||||
if (shouldRebuildTransport && telegramTransport) {
|
||||
this.opts.log("[telegram][diag] rebuilding transport for next polling cycle");
|
||||
}
|
||||
this.#telegramTransport = telegramTransport;
|
||||
this.#discardTransportOnRestart = false;
|
||||
try {
|
||||
return createTelegramBot({
|
||||
token: this.opts.token,
|
||||
|
|
@ -138,7 +157,7 @@ export class TelegramPollingSession {
|
|||
lastUpdateId: this.opts.getLastUpdateId(),
|
||||
onUpdateId: this.opts.persistUpdateId,
|
||||
},
|
||||
telegramTransport: this.opts.telegramTransport,
|
||||
telegramTransport,
|
||||
});
|
||||
} catch (err) {
|
||||
await this.#waitBeforeRetryOnRecoverableSetupError(err, "Telegram setup network error");
|
||||
|
|
@ -186,11 +205,49 @@ export class TelegramPollingSession {
|
|||
await this.#confirmPersistedOffset(bot);
|
||||
|
||||
let lastGetUpdatesAt = Date.now();
|
||||
bot.api.config.use((prev, method, payload, signal) => {
|
||||
if (method === "getUpdates") {
|
||||
lastGetUpdatesAt = Date.now();
|
||||
let lastGetUpdatesStartedAt: number | null = null;
|
||||
let lastGetUpdatesFinishedAt: number | null = null;
|
||||
let lastGetUpdatesDurationMs: number | null = null;
|
||||
let lastGetUpdatesOutcome = "not-started";
|
||||
let lastGetUpdatesError: string | null = null;
|
||||
let lastGetUpdatesOffset: number | null = null;
|
||||
let inFlightGetUpdates = 0;
|
||||
let stopSequenceLogged = false;
|
||||
let stallDiagLoggedAt = 0;
|
||||
|
||||
bot.api.config.use(async (prev, method, payload, signal) => {
|
||||
if (method !== "getUpdates") {
|
||||
return prev(method, payload, signal);
|
||||
}
|
||||
|
||||
const startedAt = Date.now();
|
||||
lastGetUpdatesAt = startedAt;
|
||||
lastGetUpdatesStartedAt = startedAt;
|
||||
lastGetUpdatesOffset =
|
||||
payload && typeof payload === "object" && "offset" in payload
|
||||
? ((payload as { offset?: number }).offset ?? null)
|
||||
: null;
|
||||
inFlightGetUpdates += 1;
|
||||
lastGetUpdatesOutcome = "started";
|
||||
lastGetUpdatesError = null;
|
||||
|
||||
try {
|
||||
const result = await prev(method, payload, signal);
|
||||
const finishedAt = Date.now();
|
||||
lastGetUpdatesFinishedAt = finishedAt;
|
||||
lastGetUpdatesDurationMs = finishedAt - startedAt;
|
||||
lastGetUpdatesOutcome = Array.isArray(result) ? `ok:${result.length}` : "ok";
|
||||
return result;
|
||||
} catch (err) {
|
||||
const finishedAt = Date.now();
|
||||
lastGetUpdatesFinishedAt = finishedAt;
|
||||
lastGetUpdatesDurationMs = finishedAt - startedAt;
|
||||
lastGetUpdatesOutcome = "error";
|
||||
lastGetUpdatesError = formatErrorMessage(err);
|
||||
throw err;
|
||||
} finally {
|
||||
inFlightGetUpdates = Math.max(0, inFlightGetUpdates - 1);
|
||||
}
|
||||
return prev(method, payload, signal);
|
||||
});
|
||||
|
||||
const runner = run(bot, this.opts.runnerOptions);
|
||||
|
|
@ -236,11 +293,26 @@ export class TelegramPollingSession {
|
|||
if (this.opts.abortSignal?.aborted) {
|
||||
return;
|
||||
}
|
||||
const elapsed = Date.now() - lastGetUpdatesAt;
|
||||
|
||||
const now = Date.now();
|
||||
const activeElapsed =
|
||||
inFlightGetUpdates > 0 && lastGetUpdatesStartedAt != null ? now - lastGetUpdatesStartedAt : 0;
|
||||
const idleElapsed = inFlightGetUpdates > 0 ? 0 : now - (lastGetUpdatesFinishedAt ?? lastGetUpdatesAt);
|
||||
const elapsed = inFlightGetUpdates > 0 ? activeElapsed : idleElapsed;
|
||||
|
||||
if (elapsed > POLL_STALL_THRESHOLD_MS && runner.isRunning()) {
|
||||
if (stallDiagLoggedAt && now - stallDiagLoggedAt < POLL_STALL_THRESHOLD_MS / 2) {
|
||||
return;
|
||||
}
|
||||
stallDiagLoggedAt = now;
|
||||
this.#discardTransportOnRestart = true;
|
||||
stalledRestart = true;
|
||||
const elapsedLabel =
|
||||
inFlightGetUpdates > 0
|
||||
? `active getUpdates stuck for ${formatDurationPrecise(elapsed)}`
|
||||
: `no completed getUpdates for ${formatDurationPrecise(elapsed)}`;
|
||||
this.opts.log(
|
||||
`[telegram] Polling stall detected (no getUpdates for ${formatDurationPrecise(elapsed)}); forcing restart.`,
|
||||
`[telegram] Polling stall detected (${elapsedLabel}); forcing restart. [diag inFlight=${inFlightGetUpdates} outcome=${lastGetUpdatesOutcome} startedAt=${lastGetUpdatesStartedAt ?? "n/a"} finishedAt=${lastGetUpdatesFinishedAt ?? "n/a"} durationMs=${lastGetUpdatesDurationMs ?? "n/a"} offset=${lastGetUpdatesOffset ?? "n/a"}${lastGetUpdatesError ? ` error=${lastGetUpdatesError}` : ""}]`,
|
||||
);
|
||||
void stopRunner();
|
||||
void stopBot();
|
||||
|
|
@ -270,6 +342,9 @@ export class TelegramPollingSession {
|
|||
? "unhandled network error"
|
||||
: "runner stopped (maxRetryTime exceeded or graceful stop)";
|
||||
this.#forceRestarted = false;
|
||||
this.opts.log(
|
||||
`[telegram][diag] polling cycle finished reason=${reason} inFlight=${inFlightGetUpdates} outcome=${lastGetUpdatesOutcome} startedAt=${lastGetUpdatesStartedAt ?? "n/a"} finishedAt=${lastGetUpdatesFinishedAt ?? "n/a"} durationMs=${lastGetUpdatesDurationMs ?? "n/a"} offset=${lastGetUpdatesOffset ?? "n/a"}${lastGetUpdatesError ? ` error=${lastGetUpdatesError}` : ""}`,
|
||||
);
|
||||
const shouldRestart = await this.#waitBeforeRestart(
|
||||
(delay) => `Telegram polling runner stopped (${reason}); restarting in ${delay}.`,
|
||||
);
|
||||
|
|
@ -284,11 +359,17 @@ export class TelegramPollingSession {
|
|||
this.#webhookCleared = false;
|
||||
}
|
||||
const isRecoverable = isRecoverableTelegramNetworkError(err, { context: "polling" });
|
||||
if (isConflict || isRecoverable) {
|
||||
this.#discardTransportOnRestart = true;
|
||||
}
|
||||
if (!isConflict && !isRecoverable) {
|
||||
throw err;
|
||||
}
|
||||
const reason = isConflict ? "getUpdates conflict" : "network error";
|
||||
const errMsg = formatErrorMessage(err);
|
||||
this.opts.log(
|
||||
`[telegram][diag] polling cycle error reason=${reason} inFlight=${inFlightGetUpdates} outcome=${lastGetUpdatesOutcome} startedAt=${lastGetUpdatesStartedAt ?? "n/a"} finishedAt=${lastGetUpdatesFinishedAt ?? "n/a"} durationMs=${lastGetUpdatesDurationMs ?? "n/a"} offset=${lastGetUpdatesOffset ?? "n/a"} err=${errMsg}${lastGetUpdatesError ? ` lastGetUpdatesError=${lastGetUpdatesError}` : ""}`,
|
||||
);
|
||||
const shouldRestart = await this.#waitBeforeRestart(
|
||||
(delay) => `Telegram ${reason}: ${errMsg}; retrying in ${delay}.`,
|
||||
);
|
||||
|
|
|
|||
Loading…
Reference in New Issue