fix: prevent Telegram polling watchdog from dropping replies (#56343) (thanks @openperf)

* fix(telegram): prevent polling watchdog from aborting in-flight message delivery

The polling-stall watchdog only tracked getUpdates timestamps to detect
network stalls. When the agent takes >90s to process a message (common
with local/large models), getUpdates naturally pauses, and the watchdog
misidentifies this as a stall. It then calls fetchAbortController.abort(),
which cancels all in-flight Telegram API requests — including the
sendMessage call delivering the agent's reply. The message is silently
lost with no retry.

Track a separate lastApiActivityAt timestamp that is updated whenever
any Telegram API call (sendMessage, sendChatAction, etc.) completes
successfully. The watchdog now only triggers when both getUpdates AND
all other API activity have been silent beyond the threshold, proving
the network is genuinely stalled rather than just busy processing.

Update existing stall test to account for the new timestamp, and add a
regression test verifying that recent sendMessage activity suppresses
the watchdog.

Fixes #56065
Related: #53374, #54708

* fix(telegram): guard watchdog against in-flight API calls

* fix(telegram): bound watchdog API liveness

* fix: track newest watchdog API activity (#56343) (thanks @openperf)

* fix: note Telegram watchdog delivery fix (#56343) (thanks @openperf)

---------

Co-authored-by: Ayaan Zaidi <hi@obviy.us>
This commit is contained in:
wangchunyue 2026-03-29 13:41:28 +08:00 committed by GitHub
parent eee8e9679e
commit dd61171f5b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 522 additions and 3 deletions

View File

@ -33,6 +33,7 @@ Docs: https://docs.openclaw.ai
- Control UI/slash commands: make `/steer` and `/redirect` work from the chat command palette with visible pending state for active-run `/steer`, correct redirected-run tracking, and a single canonical `/steer` entry in the command menu. (#54625) Thanks @fuller-stack-dev.
- Exec: fail closed when the implicit sandbox host has no sandbox runtime, and stop denied async approval followups from reusing prior command output from the same session. (#56800) Thanks @scoootscooob.
- Plugins/ClawHub: sanitize temporary archive filenames for scoped package names and slash-containing skill slugs so `openclaw plugins install @scope/name` no longer fails with `ENOENT` during archive download. (#56452) Thanks @soimy.
- Telegram/polling: keep the watchdog from aborting long-running reply delivery by treating recent non-polling API activity as bounded liveness instead of a hard stall. (#56343) Thanks @openperf.
## 2026.3.28

View File

@ -58,7 +58,8 @@ function installPollingStallWatchdogHarness() {
const clearTimeoutSpy = vi.spyOn(globalThis, "clearTimeout").mockImplementation(() => {});
const dateNowSpy = vi
.spyOn(Date, "now")
.mockImplementationOnce(() => 0)
.mockImplementationOnce(() => 0) // lastGetUpdatesAt init
.mockImplementationOnce(() => 0) // lastApiActivityAt init
.mockImplementation(() => 120_001);
return {
@ -363,6 +364,483 @@ describe("TelegramPollingSession", () => {
expect(createTelegramTransport).toHaveBeenCalledTimes(1);
});
it("does not trigger stall restart when non-getUpdates API calls are active", async () => {
const abort = new AbortController();
const botStop = vi.fn(async () => undefined);
const runnerStop = vi.fn(async () => undefined);
// Capture the API middleware so we can simulate sendMessage calls
let apiMiddleware:
| ((
prev: (...args: unknown[]) => Promise<unknown>,
method: string,
payload: unknown,
) => Promise<unknown>)
| undefined;
const bot = {
api: {
deleteWebhook: vi.fn(async () => true),
getUpdates: vi.fn(async () => []),
config: {
use: vi.fn((fn: typeof apiMiddleware) => {
apiMiddleware = fn;
}),
},
},
stop: botStop,
};
createTelegramBotMock.mockReturnValue(bot);
let firstTaskResolve: (() => void) | undefined;
const firstTask = new Promise<void>((resolve) => {
firstTaskResolve = resolve;
});
runMock.mockImplementation(() => ({
task: () => firstTask,
stop: async () => {
await runnerStop();
firstTaskResolve?.();
},
isRunning: () => true,
}));
// t=0: lastGetUpdatesAt and lastApiActivityAt initialized
// t=120_001: watchdog fires (getUpdates stale for 120s)
// But right before watchdog, a sendMessage succeeded at t=120_000
const setIntervalSpy = vi.spyOn(globalThis, "setInterval").mockImplementation((fn) => {
watchdog = fn as () => void;
return 1 as unknown as ReturnType<typeof setInterval>;
});
const clearIntervalSpy = vi.spyOn(globalThis, "clearInterval").mockImplementation(() => {});
const setTimeoutSpy = vi.spyOn(globalThis, "setTimeout").mockImplementation((fn) => {
void Promise.resolve().then(() => (fn as () => void)());
return 1 as unknown as ReturnType<typeof setTimeout>;
});
const clearTimeoutSpy = vi.spyOn(globalThis, "clearTimeout").mockImplementation(() => {});
const dateNowSpy = vi
.spyOn(Date, "now")
.mockImplementationOnce(() => 0) // lastGetUpdatesAt init
.mockImplementationOnce(() => 0) // lastApiActivityAt init
// All subsequent calls (sendMessage completion + watchdog check) return
// the same value, giving apiIdle = 0 — well below the stall threshold.
.mockImplementation(() => 120_001);
let watchdog: (() => void) | undefined;
const log = vi.fn();
const session = new TelegramPollingSession({
token: "tok",
config: {},
accountId: "default",
runtime: undefined,
proxyFetch: undefined,
abortSignal: abort.signal,
runnerOptions: {},
getLastUpdateId: () => null,
persistUpdateId: async () => undefined,
log,
telegramTransport: undefined,
});
try {
const runPromise = session.runUntilAbort();
// Wait for watchdog to be captured
for (let attempt = 0; attempt < 20 && !watchdog; attempt += 1) {
await Promise.resolve();
}
expect(watchdog).toBeTypeOf("function");
// Simulate a sendMessage call through the middleware before watchdog fires.
// This updates lastApiActivityAt, proving the network is alive.
if (apiMiddleware) {
const fakePrev = vi.fn(async () => ({ ok: true }));
await apiMiddleware(fakePrev, "sendMessage", { chat_id: 123, text: "hello" });
}
// Now fire the watchdog — getUpdates is stale (120s) but API was just active
watchdog?.();
// The watchdog should NOT have triggered a restart
expect(runnerStop).not.toHaveBeenCalled();
expect(botStop).not.toHaveBeenCalled();
expect(log).not.toHaveBeenCalledWith(expect.stringContaining("Polling stall detected"));
// Clean up: abort to end the session
abort.abort();
firstTaskResolve?.();
await runPromise;
} finally {
setIntervalSpy.mockRestore();
clearIntervalSpy.mockRestore();
setTimeoutSpy.mockRestore();
clearTimeoutSpy.mockRestore();
dateNowSpy.mockRestore();
}
});
it("does not trigger stall restart while a recent non-getUpdates API call is in-flight", async () => {
const abort = new AbortController();
const botStop = vi.fn(async () => undefined);
const runnerStop = vi.fn(async () => undefined);
let apiMiddleware:
| ((
prev: (...args: unknown[]) => Promise<unknown>,
method: string,
payload: unknown,
) => Promise<unknown>)
| undefined;
createTelegramBotMock.mockReturnValueOnce({
api: {
deleteWebhook: vi.fn(async () => true),
getUpdates: vi.fn(async () => []),
config: {
use: vi.fn((fn: typeof apiMiddleware) => {
apiMiddleware = fn;
}),
},
},
stop: botStop,
});
let firstTaskResolve: (() => void) | undefined;
runMock.mockReturnValue({
task: () =>
new Promise<void>((resolve) => {
firstTaskResolve = resolve;
}),
stop: async () => {
await runnerStop();
firstTaskResolve?.();
},
isRunning: () => true,
});
// t=0: lastGetUpdatesAt and lastApiActivityAt initialized
const setIntervalSpy = vi.spyOn(globalThis, "setInterval").mockImplementation((fn) => {
watchdog = fn as () => void;
return 1 as unknown as ReturnType<typeof setInterval>;
});
const clearIntervalSpy = vi.spyOn(globalThis, "clearInterval").mockImplementation(() => {});
const setTimeoutSpy = vi.spyOn(globalThis, "setTimeout").mockImplementation((fn) => {
void Promise.resolve().then(() => (fn as () => void)());
return 1 as unknown as ReturnType<typeof setTimeout>;
});
const clearTimeoutSpy = vi.spyOn(globalThis, "clearTimeout").mockImplementation(() => {});
const dateNowSpy = vi
.spyOn(Date, "now")
.mockImplementationOnce(() => 0) // lastGetUpdatesAt init
.mockImplementationOnce(() => 0) // lastApiActivityAt init
.mockImplementationOnce(() => 60_000) // sendMessage start
.mockImplementation(() => 120_001);
let watchdog: (() => void) | undefined;
const log = vi.fn();
const session = new TelegramPollingSession({
token: "tok",
config: {},
accountId: "default",
runtime: undefined,
proxyFetch: undefined,
abortSignal: abort.signal,
runnerOptions: {},
getLastUpdateId: () => null,
persistUpdateId: async () => undefined,
log,
telegramTransport: undefined,
});
try {
const runPromise = session.runUntilAbort();
for (let attempt = 0; attempt < 20 && !watchdog; attempt += 1) {
await Promise.resolve();
}
expect(watchdog).toBeTypeOf("function");
// Start an in-flight sendMessage that has NOT yet resolved.
// This simulates a slow delivery where the API call is still pending.
let resolveSendMessage: ((v: unknown) => void) | undefined;
if (apiMiddleware) {
const slowPrev = vi.fn(
() =>
new Promise((resolve) => {
resolveSendMessage = resolve;
}),
);
// Fire-and-forget: the call is in-flight but not awaited yet
const sendPromise = apiMiddleware(slowPrev, "sendMessage", { chat_id: 123, text: "hello" });
// Fire the watchdog while sendMessage is still in-flight.
// The in-flight call started 60s ago, so API liveness is still recent.
watchdog?.();
// The watchdog should NOT have triggered a restart
expect(runnerStop).not.toHaveBeenCalled();
expect(botStop).not.toHaveBeenCalled();
expect(log).not.toHaveBeenCalledWith(expect.stringContaining("Polling stall detected"));
// Resolve the in-flight call to clean up
resolveSendMessage?.({ ok: true });
await sendPromise;
}
abort.abort();
firstTaskResolve?.();
await runPromise;
} finally {
setIntervalSpy.mockRestore();
clearIntervalSpy.mockRestore();
setTimeoutSpy.mockRestore();
clearTimeoutSpy.mockRestore();
dateNowSpy.mockRestore();
}
});
it("triggers stall restart when a non-getUpdates API call has been in-flight past the threshold", async () => {
const abort = new AbortController();
const botStop = vi.fn(async () => undefined);
const runnerStop = vi.fn(async () => undefined);
let apiMiddleware:
| ((
prev: (...args: unknown[]) => Promise<unknown>,
method: string,
payload: unknown,
) => Promise<unknown>)
| undefined;
createTelegramBotMock.mockReturnValueOnce({
api: {
deleteWebhook: vi.fn(async () => true),
getUpdates: vi.fn(async () => []),
config: {
use: vi.fn((fn: typeof apiMiddleware) => {
apiMiddleware = fn;
}),
},
},
stop: botStop,
});
let firstTaskResolve: (() => void) | undefined;
runMock.mockReturnValue({
task: () =>
new Promise<void>((resolve) => {
firstTaskResolve = resolve;
}),
stop: async () => {
await runnerStop();
firstTaskResolve?.();
},
isRunning: () => true,
});
const setIntervalSpy = vi.spyOn(globalThis, "setInterval").mockImplementation((fn) => {
watchdog = fn as () => void;
return 1 as unknown as ReturnType<typeof setInterval>;
});
const clearIntervalSpy = vi.spyOn(globalThis, "clearInterval").mockImplementation(() => {});
const setTimeoutSpy = vi.spyOn(globalThis, "setTimeout").mockImplementation((fn) => {
void Promise.resolve().then(() => (fn as () => void)());
return 1 as unknown as ReturnType<typeof setTimeout>;
});
const clearTimeoutSpy = vi.spyOn(globalThis, "clearTimeout").mockImplementation(() => {});
const dateNowSpy = vi
.spyOn(Date, "now")
.mockImplementationOnce(() => 0) // lastGetUpdatesAt init
.mockImplementationOnce(() => 0) // lastApiActivityAt init
.mockImplementationOnce(() => 1) // sendMessage start
.mockImplementation(() => 120_001);
let watchdog: (() => void) | undefined;
const log = vi.fn();
const session = new TelegramPollingSession({
token: "tok",
config: {},
accountId: "default",
runtime: undefined,
proxyFetch: undefined,
abortSignal: abort.signal,
runnerOptions: {},
getLastUpdateId: () => null,
persistUpdateId: async () => undefined,
log,
telegramTransport: undefined,
});
try {
const runPromise = session.runUntilAbort();
for (let attempt = 0; attempt < 20 && !watchdog; attempt += 1) {
await Promise.resolve();
}
expect(watchdog).toBeTypeOf("function");
let resolveSendMessage: ((v: unknown) => void) | undefined;
if (apiMiddleware) {
const slowPrev = vi.fn(
() =>
new Promise((resolve) => {
resolveSendMessage = resolve;
}),
);
const sendPromise = apiMiddleware(slowPrev, "sendMessage", { chat_id: 123, text: "hello" });
// The in-flight send started at t=1 and is still stuck at t=120_001.
// That is older than the watchdog threshold, so restart should proceed.
watchdog?.();
expect(runnerStop).toHaveBeenCalledTimes(1);
expect(botStop).toHaveBeenCalledTimes(1);
expect(log).toHaveBeenCalledWith(expect.stringContaining("Polling stall detected"));
resolveSendMessage?.({ ok: true });
await sendPromise;
}
abort.abort();
firstTaskResolve?.();
await runPromise;
} finally {
setIntervalSpy.mockRestore();
clearIntervalSpy.mockRestore();
setTimeoutSpy.mockRestore();
clearTimeoutSpy.mockRestore();
dateNowSpy.mockRestore();
}
});
it("does not trigger stall restart when a newer non-getUpdates API call starts while an older one is still in-flight", async () => {
const abort = new AbortController();
const botStop = vi.fn(async () => undefined);
const runnerStop = vi.fn(async () => undefined);
let apiMiddleware:
| ((
prev: (...args: unknown[]) => Promise<unknown>,
method: string,
payload: unknown,
) => Promise<unknown>)
| undefined;
createTelegramBotMock.mockReturnValueOnce({
api: {
deleteWebhook: vi.fn(async () => true),
getUpdates: vi.fn(async () => []),
config: {
use: vi.fn((fn: typeof apiMiddleware) => {
apiMiddleware = fn;
}),
},
},
stop: botStop,
});
let firstTaskResolve: (() => void) | undefined;
runMock.mockReturnValue({
task: () =>
new Promise<void>((resolve) => {
firstTaskResolve = resolve;
}),
stop: async () => {
await runnerStop();
firstTaskResolve?.();
},
isRunning: () => true,
});
const setIntervalSpy = vi.spyOn(globalThis, "setInterval").mockImplementation((fn) => {
watchdog = fn as () => void;
return 1 as unknown as ReturnType<typeof setInterval>;
});
const clearIntervalSpy = vi.spyOn(globalThis, "clearInterval").mockImplementation(() => {});
const setTimeoutSpy = vi.spyOn(globalThis, "setTimeout").mockImplementation((fn) => {
void Promise.resolve().then(() => (fn as () => void)());
return 1 as unknown as ReturnType<typeof setTimeout>;
});
const clearTimeoutSpy = vi.spyOn(globalThis, "clearTimeout").mockImplementation(() => {});
const dateNowSpy = vi
.spyOn(Date, "now")
.mockImplementationOnce(() => 0) // lastGetUpdatesAt init
.mockImplementationOnce(() => 0) // lastApiActivityAt init
.mockImplementationOnce(() => 1) // first sendMessage start
.mockImplementationOnce(() => 120_000) // second sendMessage start
.mockImplementation(() => 120_001);
let watchdog: (() => void) | undefined;
const log = vi.fn();
const session = new TelegramPollingSession({
token: "tok",
config: {},
accountId: "default",
runtime: undefined,
proxyFetch: undefined,
abortSignal: abort.signal,
runnerOptions: {},
getLastUpdateId: () => null,
persistUpdateId: async () => undefined,
log,
telegramTransport: undefined,
});
try {
const runPromise = session.runUntilAbort();
for (let attempt = 0; attempt < 20 && !watchdog; attempt += 1) {
await Promise.resolve();
}
expect(watchdog).toBeTypeOf("function");
let resolveFirstSend: ((v: unknown) => void) | undefined;
let resolveSecondSend: ((v: unknown) => void) | undefined;
if (apiMiddleware) {
const firstSendPromise = apiMiddleware(
vi.fn(
() =>
new Promise((resolve) => {
resolveFirstSend = resolve;
}),
),
"sendMessage",
{ chat_id: 123, text: "older" },
);
const secondSendPromise = apiMiddleware(
vi.fn(
() =>
new Promise((resolve) => {
resolveSecondSend = resolve;
}),
),
"sendMessage",
{ chat_id: 123, text: "newer" },
);
// The older send is stale, but the newer send started just now.
// Watchdog liveness must follow the newest active non-getUpdates call.
watchdog?.();
expect(runnerStop).not.toHaveBeenCalled();
expect(botStop).not.toHaveBeenCalled();
expect(log).not.toHaveBeenCalledWith(expect.stringContaining("Polling stall detected"));
resolveFirstSend?.({ ok: true });
resolveSecondSend?.({ ok: true });
await firstSendPromise;
await secondSendPromise;
}
abort.abort();
firstTaskResolve?.();
await runPromise;
} finally {
setIntervalSpy.mockRestore();
clearIntervalSpy.mockRestore();
setTimeoutSpy.mockRestore();
clearTimeoutSpy.mockRestore();
dateNowSpy.mockRestore();
}
});
it("reuses the transport after a getUpdates conflict", async () => {
const abort = new AbortController();
const conflictError = Object.assign(

View File

@ -204,6 +204,10 @@ export class TelegramPollingSession {
await this.#confirmPersistedOffset(bot);
let lastGetUpdatesAt = Date.now();
let lastApiActivityAt = Date.now();
let nextInFlightApiCallId = 0;
let latestInFlightApiStartedAt: number | null = null;
const inFlightApiStartedAt = new Map<number, number>();
let lastGetUpdatesStartedAt: number | null = null;
let lastGetUpdatesFinishedAt: number | null = null;
let lastGetUpdatesDurationMs: number | null = null;
@ -216,7 +220,31 @@ export class TelegramPollingSession {
bot.api.config.use(async (prev, method, payload, signal) => {
if (method !== "getUpdates") {
return prev(method, payload, signal);
const startedAt = Date.now();
const callId = nextInFlightApiCallId;
nextInFlightApiCallId += 1;
inFlightApiStartedAt.set(callId, startedAt);
latestInFlightApiStartedAt =
latestInFlightApiStartedAt == null
? startedAt
: Math.max(latestInFlightApiStartedAt, startedAt);
try {
const result = await prev(method, payload, signal);
lastApiActivityAt = Date.now();
return result;
} finally {
inFlightApiStartedAt.delete(callId);
if (latestInFlightApiStartedAt === startedAt) {
let newestStartedAt: number | null = null;
for (const activeStartedAt of inFlightApiStartedAt.values()) {
newestStartedAt =
newestStartedAt == null
? activeStartedAt
: Math.max(newestStartedAt, activeStartedAt);
}
latestInFlightApiStartedAt = newestStartedAt;
}
}
}
const startedAt = Date.now();
@ -301,8 +329,20 @@ export class TelegramPollingSession {
const idleElapsed =
inFlightGetUpdates > 0 ? 0 : now - (lastGetUpdatesFinishedAt ?? lastGetUpdatesAt);
const elapsed = inFlightGetUpdates > 0 ? activeElapsed : idleElapsed;
const apiLivenessAt =
latestInFlightApiStartedAt == null
? lastApiActivityAt
: Math.max(lastApiActivityAt, latestInFlightApiStartedAt);
const apiElapsed = now - apiLivenessAt;
if (elapsed > POLL_STALL_THRESHOLD_MS && runner.isRunning()) {
// Treat recent non-getUpdates success and recent non-getUpdates start as
// the same liveness signal. Slow delivery should suppress the watchdog,
// but only for the same bounded window as recent successful API traffic.
if (
elapsed > POLL_STALL_THRESHOLD_MS &&
apiElapsed > POLL_STALL_THRESHOLD_MS &&
runner.isRunning()
) {
if (stallDiagLoggedAt && now - stallDiagLoggedAt < POLL_STALL_THRESHOLD_MS / 2) {
return;
}