openclaw/extensions/telegram/src/polling-session.test.ts

871 lines
28 KiB
TypeScript

import { beforeEach, describe, expect, it, vi } from "vitest";
const runMock = vi.hoisted(() => vi.fn());
const createTelegramBotMock = vi.hoisted(() => vi.fn());
const isRecoverableTelegramNetworkErrorMock = vi.hoisted(() => vi.fn(() => true));
const computeBackoffMock = vi.hoisted(() => vi.fn(() => 0));
const sleepWithAbortMock = vi.hoisted(() => vi.fn(async () => undefined));
vi.mock("@grammyjs/runner", () => ({
run: runMock,
}));
vi.mock("./bot.js", () => ({
createTelegramBot: createTelegramBotMock,
}));
vi.mock("./network-errors.js", () => ({
isRecoverableTelegramNetworkError: isRecoverableTelegramNetworkErrorMock,
}));
vi.mock("./api-logging.js", () => ({
withTelegramApiErrorLogging: async ({ fn }: { fn: () => Promise<unknown> }) => await fn(),
}));
vi.mock("openclaw/plugin-sdk/runtime-env", async (importOriginal) => {
const actual = await importOriginal<typeof import("openclaw/plugin-sdk/runtime-env")>();
return {
...actual,
computeBackoff: computeBackoffMock,
sleepWithAbort: sleepWithAbortMock,
};
});
let TelegramPollingSession: typeof import("./polling-session.js").TelegramPollingSession;
function makeBot() {
return {
api: {
deleteWebhook: vi.fn(async () => true),
getUpdates: vi.fn(async () => []),
config: { use: vi.fn() },
},
stop: vi.fn(async () => undefined),
};
}
function installPollingStallWatchdogHarness() {
let watchdog: (() => void) | undefined;
const setIntervalSpy = vi.spyOn(globalThis, "setInterval").mockImplementation((fn) => {
watchdog = fn as () => void;
return 1 as unknown as ReturnType<typeof setInterval>;
});
const clearIntervalSpy = vi.spyOn(globalThis, "clearInterval").mockImplementation(() => {});
const setTimeoutSpy = vi.spyOn(globalThis, "setTimeout").mockImplementation((fn) => {
void Promise.resolve().then(() => (fn as () => void)());
return 1 as unknown as ReturnType<typeof setTimeout>;
});
const clearTimeoutSpy = vi.spyOn(globalThis, "clearTimeout").mockImplementation(() => {});
const dateNowSpy = vi
.spyOn(Date, "now")
.mockImplementationOnce(() => 0) // lastGetUpdatesAt init
.mockImplementationOnce(() => 0) // lastApiActivityAt init
.mockImplementation(() => 120_001);
return {
async waitForWatchdog() {
for (let attempt = 0; attempt < 20 && !watchdog; attempt += 1) {
await Promise.resolve();
}
expect(watchdog).toBeTypeOf("function");
return watchdog;
},
restore() {
setIntervalSpy.mockRestore();
clearIntervalSpy.mockRestore();
setTimeoutSpy.mockRestore();
clearTimeoutSpy.mockRestore();
dateNowSpy.mockRestore();
},
};
}
function expectTelegramBotTransportSequence(firstTransport: unknown, secondTransport: unknown) {
expect(createTelegramBotMock).toHaveBeenCalledTimes(2);
expect(createTelegramBotMock.mock.calls[0]?.[0]?.telegramTransport).toBe(firstTransport);
expect(createTelegramBotMock.mock.calls[1]?.[0]?.telegramTransport).toBe(secondTransport);
}
function makeTelegramTransport() {
return { fetch: globalThis.fetch, sourceFetch: globalThis.fetch };
}
function mockRestartAfterPollingError(error: unknown, abort: AbortController) {
let firstCycle = true;
runMock.mockImplementation(() => {
if (firstCycle) {
firstCycle = false;
return {
task: async () => {
throw error;
},
stop: vi.fn(async () => undefined),
isRunning: () => false,
};
}
return {
task: async () => {
abort.abort();
},
stop: vi.fn(async () => undefined),
isRunning: () => false,
};
});
}
function createPollingSessionWithTransportRestart(params: {
abortSignal: AbortSignal;
telegramTransport: ReturnType<typeof makeTelegramTransport>;
createTelegramTransport: () => ReturnType<typeof makeTelegramTransport>;
}) {
return new TelegramPollingSession({
token: "tok",
config: {},
accountId: "default",
runtime: undefined,
proxyFetch: undefined,
abortSignal: params.abortSignal,
runnerOptions: {},
getLastUpdateId: () => null,
persistUpdateId: async () => undefined,
log: () => undefined,
telegramTransport: params.telegramTransport,
createTelegramTransport: params.createTelegramTransport,
});
}
describe("TelegramPollingSession", () => {
beforeEach(async () => {
vi.resetModules();
runMock.mockReset();
createTelegramBotMock.mockReset();
isRecoverableTelegramNetworkErrorMock.mockReset().mockReturnValue(true);
computeBackoffMock.mockReset().mockReturnValue(0);
sleepWithAbortMock.mockReset().mockResolvedValue(undefined);
({ TelegramPollingSession } = await import("./polling-session.js"));
});
it("uses backoff helpers for recoverable polling retries", async () => {
const abort = new AbortController();
const recoverableError = new Error("recoverable polling error");
const botStop = vi.fn(async () => undefined);
const runnerStop = vi.fn(async () => undefined);
const bot = {
api: {
deleteWebhook: vi.fn(async () => true),
getUpdates: vi.fn(async () => []),
config: { use: vi.fn() },
},
stop: botStop,
};
createTelegramBotMock.mockReturnValue(bot);
let firstCycle = true;
runMock.mockImplementation(() => {
if (firstCycle) {
firstCycle = false;
return {
task: async () => {
throw recoverableError;
},
stop: runnerStop,
isRunning: () => false,
};
}
return {
task: async () => {
abort.abort();
},
stop: runnerStop,
isRunning: () => false,
};
});
const session = new TelegramPollingSession({
token: "tok",
config: {},
accountId: "default",
runtime: undefined,
proxyFetch: undefined,
abortSignal: abort.signal,
runnerOptions: {},
getLastUpdateId: () => null,
persistUpdateId: async () => undefined,
log: () => undefined,
telegramTransport: undefined,
});
await session.runUntilAbort();
expect(runMock).toHaveBeenCalledTimes(2);
expect(computeBackoffMock).toHaveBeenCalledTimes(1);
expect(sleepWithAbortMock).toHaveBeenCalledTimes(1);
});
it("forces a restart when polling stalls without getUpdates activity", async () => {
const abort = new AbortController();
const botStop = vi.fn(async () => undefined);
const firstRunnerStop = vi.fn(async () => undefined);
const secondRunnerStop = vi.fn(async () => undefined);
const bot = {
api: {
deleteWebhook: vi.fn(async () => true),
getUpdates: vi.fn(async () => []),
config: { use: vi.fn() },
},
stop: botStop,
};
createTelegramBotMock.mockReturnValue(bot);
let firstTaskResolve: (() => void) | undefined;
const firstTask = new Promise<void>((resolve) => {
firstTaskResolve = resolve;
});
let cycle = 0;
runMock.mockImplementation(() => {
cycle += 1;
if (cycle === 1) {
return {
task: () => firstTask,
stop: async () => {
await firstRunnerStop();
firstTaskResolve?.();
},
isRunning: () => true,
};
}
return {
task: async () => {
abort.abort();
},
stop: secondRunnerStop,
isRunning: () => false,
};
});
const watchdogHarness = installPollingStallWatchdogHarness();
const log = vi.fn();
const session = new TelegramPollingSession({
token: "tok",
config: {},
accountId: "default",
runtime: undefined,
proxyFetch: undefined,
abortSignal: abort.signal,
runnerOptions: {},
getLastUpdateId: () => null,
persistUpdateId: async () => undefined,
log,
telegramTransport: undefined,
});
try {
const runPromise = session.runUntilAbort();
const watchdog = await watchdogHarness.waitForWatchdog();
watchdog?.();
await runPromise;
expect(runMock).toHaveBeenCalledTimes(2);
expect(firstRunnerStop).toHaveBeenCalledTimes(1);
expect(botStop).toHaveBeenCalled();
expect(log).toHaveBeenCalledWith(expect.stringContaining("Polling stall detected"));
expect(log).toHaveBeenCalledWith(expect.stringContaining("polling stall detected"));
} finally {
watchdogHarness.restore();
}
});
it("rebuilds the transport after a stalled polling cycle", async () => {
vi.useFakeTimers({ shouldAdvanceTime: true });
const abort = new AbortController();
const firstBot = makeBot();
const secondBot = makeBot();
createTelegramBotMock.mockReturnValueOnce(firstBot).mockReturnValueOnce(secondBot);
let firstTaskResolve: (() => void) | undefined;
const firstTask = new Promise<void>((resolve) => {
firstTaskResolve = resolve;
});
let cycle = 0;
runMock.mockImplementation(() => {
cycle += 1;
if (cycle === 1) {
return {
task: () => firstTask,
stop: async () => {
firstTaskResolve?.();
},
isRunning: () => true,
};
}
return {
task: async () => {
abort.abort();
},
stop: vi.fn(async () => undefined),
isRunning: () => false,
};
});
const watchdogHarness = installPollingStallWatchdogHarness();
const transport1 = { fetch: globalThis.fetch, sourceFetch: globalThis.fetch };
const transport2 = { fetch: globalThis.fetch, sourceFetch: globalThis.fetch };
const createTelegramTransport = vi.fn(() => transport2);
try {
const session = new TelegramPollingSession({
token: "tok",
config: {},
accountId: "default",
runtime: undefined,
proxyFetch: undefined,
abortSignal: abort.signal,
runnerOptions: {},
getLastUpdateId: () => null,
persistUpdateId: async () => undefined,
log: () => undefined,
telegramTransport: transport1,
createTelegramTransport,
});
const runPromise = session.runUntilAbort();
const watchdog = await watchdogHarness.waitForWatchdog();
watchdog?.();
await runPromise;
expectTelegramBotTransportSequence(transport1, transport2);
expect(createTelegramTransport).toHaveBeenCalledTimes(1);
} finally {
watchdogHarness.restore();
vi.useRealTimers();
}
});
it("rebuilds the transport after a recoverable polling error", async () => {
const abort = new AbortController();
const recoverableError = new Error("recoverable polling error");
const transport1 = makeTelegramTransport();
const transport2 = makeTelegramTransport();
const createTelegramTransport = vi.fn(() => transport2);
createTelegramBotMock.mockReturnValueOnce(makeBot()).mockReturnValueOnce(makeBot());
mockRestartAfterPollingError(recoverableError, abort);
const session = createPollingSessionWithTransportRestart({
abortSignal: abort.signal,
telegramTransport: transport1,
createTelegramTransport,
});
await session.runUntilAbort();
expectTelegramBotTransportSequence(transport1, transport2);
expect(createTelegramTransport).toHaveBeenCalledTimes(1);
});
it("does not trigger stall restart when non-getUpdates API calls are active", async () => {
const abort = new AbortController();
const botStop = vi.fn(async () => undefined);
const runnerStop = vi.fn(async () => undefined);
// Capture the API middleware so we can simulate sendMessage calls
let apiMiddleware:
| ((
prev: (...args: unknown[]) => Promise<unknown>,
method: string,
payload: unknown,
) => Promise<unknown>)
| undefined;
const bot = {
api: {
deleteWebhook: vi.fn(async () => true),
getUpdates: vi.fn(async () => []),
config: {
use: vi.fn((fn: typeof apiMiddleware) => {
apiMiddleware = fn;
}),
},
},
stop: botStop,
};
createTelegramBotMock.mockReturnValue(bot);
let firstTaskResolve: (() => void) | undefined;
const firstTask = new Promise<void>((resolve) => {
firstTaskResolve = resolve;
});
runMock.mockImplementation(() => ({
task: () => firstTask,
stop: async () => {
await runnerStop();
firstTaskResolve?.();
},
isRunning: () => true,
}));
// t=0: lastGetUpdatesAt and lastApiActivityAt initialized
// t=120_001: watchdog fires (getUpdates stale for 120s)
// But right before watchdog, a sendMessage succeeded at t=120_000
const setIntervalSpy = vi.spyOn(globalThis, "setInterval").mockImplementation((fn) => {
watchdog = fn as () => void;
return 1 as unknown as ReturnType<typeof setInterval>;
});
const clearIntervalSpy = vi.spyOn(globalThis, "clearInterval").mockImplementation(() => {});
const setTimeoutSpy = vi.spyOn(globalThis, "setTimeout").mockImplementation((fn) => {
void Promise.resolve().then(() => (fn as () => void)());
return 1 as unknown as ReturnType<typeof setTimeout>;
});
const clearTimeoutSpy = vi.spyOn(globalThis, "clearTimeout").mockImplementation(() => {});
const dateNowSpy = vi
.spyOn(Date, "now")
.mockImplementationOnce(() => 0) // lastGetUpdatesAt init
.mockImplementationOnce(() => 0) // lastApiActivityAt init
// All subsequent calls (sendMessage completion + watchdog check) return
// the same value, giving apiIdle = 0 — well below the stall threshold.
.mockImplementation(() => 120_001);
let watchdog: (() => void) | undefined;
const log = vi.fn();
const session = new TelegramPollingSession({
token: "tok",
config: {},
accountId: "default",
runtime: undefined,
proxyFetch: undefined,
abortSignal: abort.signal,
runnerOptions: {},
getLastUpdateId: () => null,
persistUpdateId: async () => undefined,
log,
telegramTransport: undefined,
});
try {
const runPromise = session.runUntilAbort();
// Wait for watchdog to be captured
for (let attempt = 0; attempt < 20 && !watchdog; attempt += 1) {
await Promise.resolve();
}
expect(watchdog).toBeTypeOf("function");
// Simulate a sendMessage call through the middleware before watchdog fires.
// This updates lastApiActivityAt, proving the network is alive.
if (apiMiddleware) {
const fakePrev = vi.fn(async () => ({ ok: true }));
await apiMiddleware(fakePrev, "sendMessage", { chat_id: 123, text: "hello" });
}
// Now fire the watchdog — getUpdates is stale (120s) but API was just active
watchdog?.();
// The watchdog should NOT have triggered a restart
expect(runnerStop).not.toHaveBeenCalled();
expect(botStop).not.toHaveBeenCalled();
expect(log).not.toHaveBeenCalledWith(expect.stringContaining("Polling stall detected"));
// Clean up: abort to end the session
abort.abort();
firstTaskResolve?.();
await runPromise;
} finally {
setIntervalSpy.mockRestore();
clearIntervalSpy.mockRestore();
setTimeoutSpy.mockRestore();
clearTimeoutSpy.mockRestore();
dateNowSpy.mockRestore();
}
});
it("does not trigger stall restart while a recent non-getUpdates API call is in-flight", async () => {
const abort = new AbortController();
const botStop = vi.fn(async () => undefined);
const runnerStop = vi.fn(async () => undefined);
let apiMiddleware:
| ((
prev: (...args: unknown[]) => Promise<unknown>,
method: string,
payload: unknown,
) => Promise<unknown>)
| undefined;
createTelegramBotMock.mockReturnValueOnce({
api: {
deleteWebhook: vi.fn(async () => true),
getUpdates: vi.fn(async () => []),
config: {
use: vi.fn((fn: typeof apiMiddleware) => {
apiMiddleware = fn;
}),
},
},
stop: botStop,
});
let firstTaskResolve: (() => void) | undefined;
runMock.mockReturnValue({
task: () =>
new Promise<void>((resolve) => {
firstTaskResolve = resolve;
}),
stop: async () => {
await runnerStop();
firstTaskResolve?.();
},
isRunning: () => true,
});
// t=0: lastGetUpdatesAt and lastApiActivityAt initialized
const setIntervalSpy = vi.spyOn(globalThis, "setInterval").mockImplementation((fn) => {
watchdog = fn as () => void;
return 1 as unknown as ReturnType<typeof setInterval>;
});
const clearIntervalSpy = vi.spyOn(globalThis, "clearInterval").mockImplementation(() => {});
const setTimeoutSpy = vi.spyOn(globalThis, "setTimeout").mockImplementation((fn) => {
void Promise.resolve().then(() => (fn as () => void)());
return 1 as unknown as ReturnType<typeof setTimeout>;
});
const clearTimeoutSpy = vi.spyOn(globalThis, "clearTimeout").mockImplementation(() => {});
const dateNowSpy = vi
.spyOn(Date, "now")
.mockImplementationOnce(() => 0) // lastGetUpdatesAt init
.mockImplementationOnce(() => 0) // lastApiActivityAt init
.mockImplementationOnce(() => 60_000) // sendMessage start
.mockImplementation(() => 120_001);
let watchdog: (() => void) | undefined;
const log = vi.fn();
const session = new TelegramPollingSession({
token: "tok",
config: {},
accountId: "default",
runtime: undefined,
proxyFetch: undefined,
abortSignal: abort.signal,
runnerOptions: {},
getLastUpdateId: () => null,
persistUpdateId: async () => undefined,
log,
telegramTransport: undefined,
});
try {
const runPromise = session.runUntilAbort();
for (let attempt = 0; attempt < 20 && !watchdog; attempt += 1) {
await Promise.resolve();
}
expect(watchdog).toBeTypeOf("function");
// Start an in-flight sendMessage that has NOT yet resolved.
// This simulates a slow delivery where the API call is still pending.
let resolveSendMessage: ((v: unknown) => void) | undefined;
if (apiMiddleware) {
const slowPrev = vi.fn(
() =>
new Promise((resolve) => {
resolveSendMessage = resolve;
}),
);
// Fire-and-forget: the call is in-flight but not awaited yet
const sendPromise = apiMiddleware(slowPrev, "sendMessage", { chat_id: 123, text: "hello" });
// Fire the watchdog while sendMessage is still in-flight.
// The in-flight call started 60s ago, so API liveness is still recent.
watchdog?.();
// The watchdog should NOT have triggered a restart
expect(runnerStop).not.toHaveBeenCalled();
expect(botStop).not.toHaveBeenCalled();
expect(log).not.toHaveBeenCalledWith(expect.stringContaining("Polling stall detected"));
// Resolve the in-flight call to clean up
resolveSendMessage?.({ ok: true });
await sendPromise;
}
abort.abort();
firstTaskResolve?.();
await runPromise;
} finally {
setIntervalSpy.mockRestore();
clearIntervalSpy.mockRestore();
setTimeoutSpy.mockRestore();
clearTimeoutSpy.mockRestore();
dateNowSpy.mockRestore();
}
});
it("triggers stall restart when a non-getUpdates API call has been in-flight past the threshold", async () => {
const abort = new AbortController();
const botStop = vi.fn(async () => undefined);
const runnerStop = vi.fn(async () => undefined);
let apiMiddleware:
| ((
prev: (...args: unknown[]) => Promise<unknown>,
method: string,
payload: unknown,
) => Promise<unknown>)
| undefined;
createTelegramBotMock.mockReturnValueOnce({
api: {
deleteWebhook: vi.fn(async () => true),
getUpdates: vi.fn(async () => []),
config: {
use: vi.fn((fn: typeof apiMiddleware) => {
apiMiddleware = fn;
}),
},
},
stop: botStop,
});
let firstTaskResolve: (() => void) | undefined;
runMock.mockReturnValue({
task: () =>
new Promise<void>((resolve) => {
firstTaskResolve = resolve;
}),
stop: async () => {
await runnerStop();
firstTaskResolve?.();
},
isRunning: () => true,
});
const setIntervalSpy = vi.spyOn(globalThis, "setInterval").mockImplementation((fn) => {
watchdog = fn as () => void;
return 1 as unknown as ReturnType<typeof setInterval>;
});
const clearIntervalSpy = vi.spyOn(globalThis, "clearInterval").mockImplementation(() => {});
const setTimeoutSpy = vi.spyOn(globalThis, "setTimeout").mockImplementation((fn) => {
void Promise.resolve().then(() => (fn as () => void)());
return 1 as unknown as ReturnType<typeof setTimeout>;
});
const clearTimeoutSpy = vi.spyOn(globalThis, "clearTimeout").mockImplementation(() => {});
const dateNowSpy = vi
.spyOn(Date, "now")
.mockImplementationOnce(() => 0) // lastGetUpdatesAt init
.mockImplementationOnce(() => 0) // lastApiActivityAt init
.mockImplementationOnce(() => 1) // sendMessage start
.mockImplementation(() => 120_001);
let watchdog: (() => void) | undefined;
const log = vi.fn();
const session = new TelegramPollingSession({
token: "tok",
config: {},
accountId: "default",
runtime: undefined,
proxyFetch: undefined,
abortSignal: abort.signal,
runnerOptions: {},
getLastUpdateId: () => null,
persistUpdateId: async () => undefined,
log,
telegramTransport: undefined,
});
try {
const runPromise = session.runUntilAbort();
for (let attempt = 0; attempt < 20 && !watchdog; attempt += 1) {
await Promise.resolve();
}
expect(watchdog).toBeTypeOf("function");
let resolveSendMessage: ((v: unknown) => void) | undefined;
if (apiMiddleware) {
const slowPrev = vi.fn(
() =>
new Promise((resolve) => {
resolveSendMessage = resolve;
}),
);
const sendPromise = apiMiddleware(slowPrev, "sendMessage", { chat_id: 123, text: "hello" });
// The in-flight send started at t=1 and is still stuck at t=120_001.
// That is older than the watchdog threshold, so restart should proceed.
watchdog?.();
expect(runnerStop).toHaveBeenCalledTimes(1);
expect(botStop).toHaveBeenCalledTimes(1);
expect(log).toHaveBeenCalledWith(expect.stringContaining("Polling stall detected"));
resolveSendMessage?.({ ok: true });
await sendPromise;
}
abort.abort();
firstTaskResolve?.();
await runPromise;
} finally {
setIntervalSpy.mockRestore();
clearIntervalSpy.mockRestore();
setTimeoutSpy.mockRestore();
clearTimeoutSpy.mockRestore();
dateNowSpy.mockRestore();
}
});
it("does not trigger stall restart when a newer non-getUpdates API call starts while an older one is still in-flight", async () => {
const abort = new AbortController();
const botStop = vi.fn(async () => undefined);
const runnerStop = vi.fn(async () => undefined);
let apiMiddleware:
| ((
prev: (...args: unknown[]) => Promise<unknown>,
method: string,
payload: unknown,
) => Promise<unknown>)
| undefined;
createTelegramBotMock.mockReturnValueOnce({
api: {
deleteWebhook: vi.fn(async () => true),
getUpdates: vi.fn(async () => []),
config: {
use: vi.fn((fn: typeof apiMiddleware) => {
apiMiddleware = fn;
}),
},
},
stop: botStop,
});
let firstTaskResolve: (() => void) | undefined;
runMock.mockReturnValue({
task: () =>
new Promise<void>((resolve) => {
firstTaskResolve = resolve;
}),
stop: async () => {
await runnerStop();
firstTaskResolve?.();
},
isRunning: () => true,
});
const setIntervalSpy = vi.spyOn(globalThis, "setInterval").mockImplementation((fn) => {
watchdog = fn as () => void;
return 1 as unknown as ReturnType<typeof setInterval>;
});
const clearIntervalSpy = vi.spyOn(globalThis, "clearInterval").mockImplementation(() => {});
const setTimeoutSpy = vi.spyOn(globalThis, "setTimeout").mockImplementation((fn) => {
void Promise.resolve().then(() => (fn as () => void)());
return 1 as unknown as ReturnType<typeof setTimeout>;
});
const clearTimeoutSpy = vi.spyOn(globalThis, "clearTimeout").mockImplementation(() => {});
const dateNowSpy = vi
.spyOn(Date, "now")
.mockImplementationOnce(() => 0) // lastGetUpdatesAt init
.mockImplementationOnce(() => 0) // lastApiActivityAt init
.mockImplementationOnce(() => 1) // first sendMessage start
.mockImplementationOnce(() => 120_000) // second sendMessage start
.mockImplementation(() => 120_001);
let watchdog: (() => void) | undefined;
const log = vi.fn();
const session = new TelegramPollingSession({
token: "tok",
config: {},
accountId: "default",
runtime: undefined,
proxyFetch: undefined,
abortSignal: abort.signal,
runnerOptions: {},
getLastUpdateId: () => null,
persistUpdateId: async () => undefined,
log,
telegramTransport: undefined,
});
try {
const runPromise = session.runUntilAbort();
for (let attempt = 0; attempt < 20 && !watchdog; attempt += 1) {
await Promise.resolve();
}
expect(watchdog).toBeTypeOf("function");
let resolveFirstSend: ((v: unknown) => void) | undefined;
let resolveSecondSend: ((v: unknown) => void) | undefined;
if (apiMiddleware) {
const firstSendPromise = apiMiddleware(
vi.fn(
() =>
new Promise((resolve) => {
resolveFirstSend = resolve;
}),
),
"sendMessage",
{ chat_id: 123, text: "older" },
);
const secondSendPromise = apiMiddleware(
vi.fn(
() =>
new Promise((resolve) => {
resolveSecondSend = resolve;
}),
),
"sendMessage",
{ chat_id: 123, text: "newer" },
);
// The older send is stale, but the newer send started just now.
// Watchdog liveness must follow the newest active non-getUpdates call.
watchdog?.();
expect(runnerStop).not.toHaveBeenCalled();
expect(botStop).not.toHaveBeenCalled();
expect(log).not.toHaveBeenCalledWith(expect.stringContaining("Polling stall detected"));
resolveFirstSend?.({ ok: true });
resolveSecondSend?.({ ok: true });
await firstSendPromise;
await secondSendPromise;
}
abort.abort();
firstTaskResolve?.();
await runPromise;
} finally {
setIntervalSpy.mockRestore();
clearIntervalSpy.mockRestore();
setTimeoutSpy.mockRestore();
clearTimeoutSpy.mockRestore();
dateNowSpy.mockRestore();
}
});
it("reuses the transport after a getUpdates conflict", async () => {
const abort = new AbortController();
const conflictError = Object.assign(
new Error("Conflict: terminated by other getUpdates request"),
{
error_code: 409,
method: "getUpdates",
},
);
const transport1 = makeTelegramTransport();
const createTelegramTransport = vi.fn(() => makeTelegramTransport());
createTelegramBotMock.mockReturnValueOnce(makeBot()).mockReturnValueOnce(makeBot());
isRecoverableTelegramNetworkErrorMock.mockReturnValue(false);
mockRestartAfterPollingError(conflictError, abort);
const session = createPollingSessionWithTransportRestart({
abortSignal: abort.signal,
telegramTransport: transport1,
createTelegramTransport,
});
await session.runUntilAbort();
expectTelegramBotTransportSequence(transport1, transport1);
expect(createTelegramTransport).not.toHaveBeenCalled();
});
});