diff --git a/CHANGELOG.md b/CHANGELOG.md index 0ab9e08bbd1..b184d7aab46 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -33,6 +33,7 @@ Docs: https://docs.openclaw.ai - Control UI/slash commands: make `/steer` and `/redirect` work from the chat command palette with visible pending state for active-run `/steer`, correct redirected-run tracking, and a single canonical `/steer` entry in the command menu. (#54625) Thanks @fuller-stack-dev. - Exec: fail closed when the implicit sandbox host has no sandbox runtime, and stop denied async approval followups from reusing prior command output from the same session. (#56800) Thanks @scoootscooob. - Plugins/ClawHub: sanitize temporary archive filenames for scoped package names and slash-containing skill slugs so `openclaw plugins install @scope/name` no longer fails with `ENOENT` during archive download. (#56452) Thanks @soimy. +- Telegram/polling: keep the watchdog from aborting long-running reply delivery by treating recent non-polling API activity as bounded liveness instead of a hard stall. (#56343) Thanks @openperf. 
## 2026.3.28 diff --git a/extensions/telegram/src/polling-session.test.ts b/extensions/telegram/src/polling-session.test.ts index 41f1e9e936e..e873a409f2f 100644 --- a/extensions/telegram/src/polling-session.test.ts +++ b/extensions/telegram/src/polling-session.test.ts @@ -58,7 +58,8 @@ function installPollingStallWatchdogHarness() { const clearTimeoutSpy = vi.spyOn(globalThis, "clearTimeout").mockImplementation(() => {}); const dateNowSpy = vi .spyOn(Date, "now") - .mockImplementationOnce(() => 0) + .mockImplementationOnce(() => 0) // lastGetUpdatesAt init + .mockImplementationOnce(() => 0) // lastApiActivityAt init .mockImplementation(() => 120_001); return { @@ -363,6 +364,483 @@ describe("TelegramPollingSession", () => { expect(createTelegramTransport).toHaveBeenCalledTimes(1); }); + it("does not trigger stall restart when non-getUpdates API calls are active", async () => { + const abort = new AbortController(); + const botStop = vi.fn(async () => undefined); + const runnerStop = vi.fn(async () => undefined); + + // Capture the API middleware so we can simulate sendMessage calls + let apiMiddleware: + | (( + prev: (...args: unknown[]) => Promise, + method: string, + payload: unknown, + ) => Promise) + | undefined; + + const bot = { + api: { + deleteWebhook: vi.fn(async () => true), + getUpdates: vi.fn(async () => []), + config: { + use: vi.fn((fn: typeof apiMiddleware) => { + apiMiddleware = fn; + }), + }, + }, + stop: botStop, + }; + createTelegramBotMock.mockReturnValue(bot); + + let firstTaskResolve: (() => void) | undefined; + const firstTask = new Promise((resolve) => { + firstTaskResolve = resolve; + }); + runMock.mockImplementation(() => ({ + task: () => firstTask, + stop: async () => { + await runnerStop(); + firstTaskResolve?.(); + }, + isRunning: () => true, + })); + + // t=0: lastGetUpdatesAt and lastApiActivityAt initialized + // t=120_001: watchdog fires (getUpdates stale for 120s) + // But right before watchdog, a sendMessage succeeded at 
t=120_001 (the same mocked instant) + const setIntervalSpy = vi.spyOn(globalThis, "setInterval").mockImplementation((fn) => { + watchdog = fn as () => void; + return 1 as unknown as ReturnType; + }); + const clearIntervalSpy = vi.spyOn(globalThis, "clearInterval").mockImplementation(() => {}); + const setTimeoutSpy = vi.spyOn(globalThis, "setTimeout").mockImplementation((fn) => { + void Promise.resolve().then(() => (fn as () => void)()); + return 1 as unknown as ReturnType; + }); + const clearTimeoutSpy = vi.spyOn(globalThis, "clearTimeout").mockImplementation(() => {}); + const dateNowSpy = vi + .spyOn(Date, "now") + .mockImplementationOnce(() => 0) // lastGetUpdatesAt init + .mockImplementationOnce(() => 0) // lastApiActivityAt init + // All subsequent calls (sendMessage completion + watchdog check) return + // the same value, giving apiElapsed = 0 — well below the stall threshold. + .mockImplementation(() => 120_001); + + let watchdog: (() => void) | undefined; + const log = vi.fn(); + const session = new TelegramPollingSession({ + token: "tok", + config: {}, + accountId: "default", + runtime: undefined, + proxyFetch: undefined, + abortSignal: abort.signal, + runnerOptions: {}, + getLastUpdateId: () => null, + persistUpdateId: async () => undefined, + log, + telegramTransport: undefined, + }); + + try { + const runPromise = session.runUntilAbort(); + + // Wait for watchdog to be captured + for (let attempt = 0; attempt < 20 && !watchdog; attempt += 1) { + await Promise.resolve(); + } + expect(watchdog).toBeTypeOf("function"); + + // Simulate a sendMessage call through the middleware before watchdog fires. + // This updates lastApiActivityAt, proving the network is alive. 
+ if (apiMiddleware) { + const fakePrev = vi.fn(async () => ({ ok: true })); + await apiMiddleware(fakePrev, "sendMessage", { chat_id: 123, text: "hello" }); + } + + // Now fire the watchdog — getUpdates is stale (120s) but API was just active + watchdog?.(); + + // The watchdog should NOT have triggered a restart + expect(runnerStop).not.toHaveBeenCalled(); + expect(botStop).not.toHaveBeenCalled(); + expect(log).not.toHaveBeenCalledWith(expect.stringContaining("Polling stall detected")); + + // Clean up: abort to end the session + abort.abort(); + firstTaskResolve?.(); + await runPromise; + } finally { + setIntervalSpy.mockRestore(); + clearIntervalSpy.mockRestore(); + setTimeoutSpy.mockRestore(); + clearTimeoutSpy.mockRestore(); + dateNowSpy.mockRestore(); + } + }); + + it("does not trigger stall restart while a recent non-getUpdates API call is in-flight", async () => { + const abort = new AbortController(); + const botStop = vi.fn(async () => undefined); + const runnerStop = vi.fn(async () => undefined); + + let apiMiddleware: + | (( + prev: (...args: unknown[]) => Promise, + method: string, + payload: unknown, + ) => Promise) + | undefined; + createTelegramBotMock.mockReturnValueOnce({ + api: { + deleteWebhook: vi.fn(async () => true), + getUpdates: vi.fn(async () => []), + config: { + use: vi.fn((fn: typeof apiMiddleware) => { + apiMiddleware = fn; + }), + }, + }, + stop: botStop, + }); + + let firstTaskResolve: (() => void) | undefined; + runMock.mockReturnValue({ + task: () => + new Promise((resolve) => { + firstTaskResolve = resolve; + }), + stop: async () => { + await runnerStop(); + firstTaskResolve?.(); + }, + isRunning: () => true, + }); + + // t=0: lastGetUpdatesAt and lastApiActivityAt initialized + const setIntervalSpy = vi.spyOn(globalThis, "setInterval").mockImplementation((fn) => { + watchdog = fn as () => void; + return 1 as unknown as ReturnType; + }); + const clearIntervalSpy = vi.spyOn(globalThis, "clearInterval").mockImplementation(() => 
{}); + const setTimeoutSpy = vi.spyOn(globalThis, "setTimeout").mockImplementation((fn) => { + void Promise.resolve().then(() => (fn as () => void)()); + return 1 as unknown as ReturnType; + }); + const clearTimeoutSpy = vi.spyOn(globalThis, "clearTimeout").mockImplementation(() => {}); + const dateNowSpy = vi + .spyOn(Date, "now") + .mockImplementationOnce(() => 0) // lastGetUpdatesAt init + .mockImplementationOnce(() => 0) // lastApiActivityAt init + .mockImplementationOnce(() => 60_000) // sendMessage start + .mockImplementation(() => 120_001); + + let watchdog: (() => void) | undefined; + const log = vi.fn(); + const session = new TelegramPollingSession({ + token: "tok", + config: {}, + accountId: "default", + runtime: undefined, + proxyFetch: undefined, + abortSignal: abort.signal, + runnerOptions: {}, + getLastUpdateId: () => null, + persistUpdateId: async () => undefined, + log, + telegramTransport: undefined, + }); + + try { + const runPromise = session.runUntilAbort(); + + for (let attempt = 0; attempt < 20 && !watchdog; attempt += 1) { + await Promise.resolve(); + } + expect(watchdog).toBeTypeOf("function"); + + // Start an in-flight sendMessage that has NOT yet resolved. + // This simulates a slow delivery where the API call is still pending. + let resolveSendMessage: ((v: unknown) => void) | undefined; + if (apiMiddleware) { + const slowPrev = vi.fn( + () => + new Promise((resolve) => { + resolveSendMessage = resolve; + }), + ); + // Fire-and-forget: the call is in-flight but not awaited yet + const sendPromise = apiMiddleware(slowPrev, "sendMessage", { chat_id: 123, text: "hello" }); + + // Fire the watchdog while sendMessage is still in-flight. + // The in-flight call started 60s ago, so API liveness is still recent. 
+ watchdog?.(); + + // The watchdog should NOT have triggered a restart + expect(runnerStop).not.toHaveBeenCalled(); + expect(botStop).not.toHaveBeenCalled(); + expect(log).not.toHaveBeenCalledWith(expect.stringContaining("Polling stall detected")); + + // Resolve the in-flight call to clean up + resolveSendMessage?.({ ok: true }); + await sendPromise; + } + + abort.abort(); + firstTaskResolve?.(); + await runPromise; + } finally { + setIntervalSpy.mockRestore(); + clearIntervalSpy.mockRestore(); + setTimeoutSpy.mockRestore(); + clearTimeoutSpy.mockRestore(); + dateNowSpy.mockRestore(); + } + }); + + it("triggers stall restart when a non-getUpdates API call has been in-flight past the threshold", async () => { + const abort = new AbortController(); + const botStop = vi.fn(async () => undefined); + const runnerStop = vi.fn(async () => undefined); + + let apiMiddleware: + | (( + prev: (...args: unknown[]) => Promise, + method: string, + payload: unknown, + ) => Promise) + | undefined; + createTelegramBotMock.mockReturnValueOnce({ + api: { + deleteWebhook: vi.fn(async () => true), + getUpdates: vi.fn(async () => []), + config: { + use: vi.fn((fn: typeof apiMiddleware) => { + apiMiddleware = fn; + }), + }, + }, + stop: botStop, + }); + + let firstTaskResolve: (() => void) | undefined; + runMock.mockReturnValue({ + task: () => + new Promise((resolve) => { + firstTaskResolve = resolve; + }), + stop: async () => { + await runnerStop(); + firstTaskResolve?.(); + }, + isRunning: () => true, + }); + + const setIntervalSpy = vi.spyOn(globalThis, "setInterval").mockImplementation((fn) => { + watchdog = fn as () => void; + return 1 as unknown as ReturnType; + }); + const clearIntervalSpy = vi.spyOn(globalThis, "clearInterval").mockImplementation(() => {}); + const setTimeoutSpy = vi.spyOn(globalThis, "setTimeout").mockImplementation((fn) => { + void Promise.resolve().then(() => (fn as () => void)()); + return 1 as unknown as ReturnType; + }); + const clearTimeoutSpy = 
vi.spyOn(globalThis, "clearTimeout").mockImplementation(() => {}); + const dateNowSpy = vi + .spyOn(Date, "now") + .mockImplementationOnce(() => 0) // lastGetUpdatesAt init + .mockImplementationOnce(() => 0) // lastApiActivityAt init + .mockImplementationOnce(() => 1) // sendMessage start + .mockImplementation(() => 120_001); + + let watchdog: (() => void) | undefined; + const log = vi.fn(); + const session = new TelegramPollingSession({ + token: "tok", + config: {}, + accountId: "default", + runtime: undefined, + proxyFetch: undefined, + abortSignal: abort.signal, + runnerOptions: {}, + getLastUpdateId: () => null, + persistUpdateId: async () => undefined, + log, + telegramTransport: undefined, + }); + + try { + const runPromise = session.runUntilAbort(); + + for (let attempt = 0; attempt < 20 && !watchdog; attempt += 1) { + await Promise.resolve(); + } + expect(watchdog).toBeTypeOf("function"); + + let resolveSendMessage: ((v: unknown) => void) | undefined; + if (apiMiddleware) { + const slowPrev = vi.fn( + () => + new Promise((resolve) => { + resolveSendMessage = resolve; + }), + ); + const sendPromise = apiMiddleware(slowPrev, "sendMessage", { chat_id: 123, text: "hello" }); + + // The in-flight send started at t=1 and is still stuck at t=120_001. + // That is older than the watchdog threshold, so restart should proceed. 
+ watchdog?.(); + + expect(runnerStop).toHaveBeenCalledTimes(1); + expect(botStop).toHaveBeenCalledTimes(1); + expect(log).toHaveBeenCalledWith(expect.stringContaining("Polling stall detected")); + + resolveSendMessage?.({ ok: true }); + await sendPromise; + } + + abort.abort(); + firstTaskResolve?.(); + await runPromise; + } finally { + setIntervalSpy.mockRestore(); + clearIntervalSpy.mockRestore(); + setTimeoutSpy.mockRestore(); + clearTimeoutSpy.mockRestore(); + dateNowSpy.mockRestore(); + } + }); + + it("does not trigger stall restart when a newer non-getUpdates API call starts while an older one is still in-flight", async () => { + const abort = new AbortController(); + const botStop = vi.fn(async () => undefined); + const runnerStop = vi.fn(async () => undefined); + + let apiMiddleware: + | (( + prev: (...args: unknown[]) => Promise, + method: string, + payload: unknown, + ) => Promise) + | undefined; + createTelegramBotMock.mockReturnValueOnce({ + api: { + deleteWebhook: vi.fn(async () => true), + getUpdates: vi.fn(async () => []), + config: { + use: vi.fn((fn: typeof apiMiddleware) => { + apiMiddleware = fn; + }), + }, + }, + stop: botStop, + }); + + let firstTaskResolve: (() => void) | undefined; + runMock.mockReturnValue({ + task: () => + new Promise((resolve) => { + firstTaskResolve = resolve; + }), + stop: async () => { + await runnerStop(); + firstTaskResolve?.(); + }, + isRunning: () => true, + }); + + const setIntervalSpy = vi.spyOn(globalThis, "setInterval").mockImplementation((fn) => { + watchdog = fn as () => void; + return 1 as unknown as ReturnType; + }); + const clearIntervalSpy = vi.spyOn(globalThis, "clearInterval").mockImplementation(() => {}); + const setTimeoutSpy = vi.spyOn(globalThis, "setTimeout").mockImplementation((fn) => { + void Promise.resolve().then(() => (fn as () => void)()); + return 1 as unknown as ReturnType; + }); + const clearTimeoutSpy = vi.spyOn(globalThis, "clearTimeout").mockImplementation(() => {}); + const dateNowSpy 
= vi + .spyOn(Date, "now") + .mockImplementationOnce(() => 0) // lastGetUpdatesAt init + .mockImplementationOnce(() => 0) // lastApiActivityAt init + .mockImplementationOnce(() => 1) // first sendMessage start + .mockImplementationOnce(() => 120_000) // second sendMessage start + .mockImplementation(() => 120_001); + + let watchdog: (() => void) | undefined; + const log = vi.fn(); + const session = new TelegramPollingSession({ + token: "tok", + config: {}, + accountId: "default", + runtime: undefined, + proxyFetch: undefined, + abortSignal: abort.signal, + runnerOptions: {}, + getLastUpdateId: () => null, + persistUpdateId: async () => undefined, + log, + telegramTransport: undefined, + }); + + try { + const runPromise = session.runUntilAbort(); + + for (let attempt = 0; attempt < 20 && !watchdog; attempt += 1) { + await Promise.resolve(); + } + expect(watchdog).toBeTypeOf("function"); + + let resolveFirstSend: ((v: unknown) => void) | undefined; + let resolveSecondSend: ((v: unknown) => void) | undefined; + if (apiMiddleware) { + const firstSendPromise = apiMiddleware( + vi.fn( + () => + new Promise((resolve) => { + resolveFirstSend = resolve; + }), + ), + "sendMessage", + { chat_id: 123, text: "older" }, + ); + const secondSendPromise = apiMiddleware( + vi.fn( + () => + new Promise((resolve) => { + resolveSecondSend = resolve; + }), + ), + "sendMessage", + { chat_id: 123, text: "newer" }, + ); + + // The older send is stale, but the newer send started just now. + // Watchdog liveness must follow the newest active non-getUpdates call. 
+ watchdog?.(); + + expect(runnerStop).not.toHaveBeenCalled(); + expect(botStop).not.toHaveBeenCalled(); + expect(log).not.toHaveBeenCalledWith(expect.stringContaining("Polling stall detected")); + + resolveFirstSend?.({ ok: true }); + resolveSecondSend?.({ ok: true }); + await firstSendPromise; + await secondSendPromise; + } + + abort.abort(); + firstTaskResolve?.(); + await runPromise; + } finally { + setIntervalSpy.mockRestore(); + clearIntervalSpy.mockRestore(); + setTimeoutSpy.mockRestore(); + clearTimeoutSpy.mockRestore(); + dateNowSpy.mockRestore(); + } + }); + it("reuses the transport after a getUpdates conflict", async () => { const abort = new AbortController(); const conflictError = Object.assign( diff --git a/extensions/telegram/src/polling-session.ts b/extensions/telegram/src/polling-session.ts index 908a4a00a97..921f9bd35d8 100644 --- a/extensions/telegram/src/polling-session.ts +++ b/extensions/telegram/src/polling-session.ts @@ -204,6 +204,10 @@ export class TelegramPollingSession { await this.#confirmPersistedOffset(bot); let lastGetUpdatesAt = Date.now(); + let lastApiActivityAt = Date.now(); + let nextInFlightApiCallId = 0; + let latestInFlightApiStartedAt: number | null = null; + const inFlightApiStartedAt = new Map(); let lastGetUpdatesStartedAt: number | null = null; let lastGetUpdatesFinishedAt: number | null = null; let lastGetUpdatesDurationMs: number | null = null; @@ -216,7 +220,31 @@ export class TelegramPollingSession { bot.api.config.use(async (prev, method, payload, signal) => { if (method !== "getUpdates") { - return prev(method, payload, signal); + const startedAt = Date.now(); + const callId = nextInFlightApiCallId; + nextInFlightApiCallId += 1; + inFlightApiStartedAt.set(callId, startedAt); + latestInFlightApiStartedAt = + latestInFlightApiStartedAt == null + ? 
startedAt + : Math.max(latestInFlightApiStartedAt, startedAt); + try { + const result = await prev(method, payload, signal); + lastApiActivityAt = Date.now(); + return result; + } finally { + inFlightApiStartedAt.delete(callId); + if (latestInFlightApiStartedAt === startedAt) { + let newestStartedAt: number | null = null; + for (const activeStartedAt of inFlightApiStartedAt.values()) { + newestStartedAt = + newestStartedAt == null + ? activeStartedAt + : Math.max(newestStartedAt, activeStartedAt); + } + latestInFlightApiStartedAt = newestStartedAt; + } + } } const startedAt = Date.now(); @@ -301,8 +329,20 @@ export class TelegramPollingSession { const idleElapsed = inFlightGetUpdates > 0 ? 0 : now - (lastGetUpdatesFinishedAt ?? lastGetUpdatesAt); const elapsed = inFlightGetUpdates > 0 ? activeElapsed : idleElapsed; + const apiLivenessAt = + latestInFlightApiStartedAt == null + ? lastApiActivityAt + : Math.max(lastApiActivityAt, latestInFlightApiStartedAt); + const apiElapsed = now - apiLivenessAt; - if (elapsed > POLL_STALL_THRESHOLD_MS && runner.isRunning()) { + // Treat recent non-getUpdates success and recent non-getUpdates start as + // the same liveness signal. Slow delivery should suppress the watchdog, + // but only for the same bounded window as recent successful API traffic. + if ( + elapsed > POLL_STALL_THRESHOLD_MS && + apiElapsed > POLL_STALL_THRESHOLD_MS && + runner.isRunning() + ) { if (stallDiagLoggedAt && now - stallDiagLoggedAt < POLL_STALL_THRESHOLD_MS / 2) { return; }