mirror of https://github.com/openclaw/openclaw.git
fix(gateway): stop stale-socket restarts before first event (#38643)
* fix(gateway): guard stale-socket restarts by event liveness * fix(gateway): centralize connect-time liveness tracking * fix(web): apply connected status patch atomically * fix(gateway): require active socket for stale checks * fix(gateway): ignore inherited stale event timestamps
This commit is contained in:
parent
a5c07fa115
commit
8873e13f1e
|
|
@ -35,6 +35,7 @@ Docs: https://docs.openclaw.ai
|
|||
|
||||
### Fixes
|
||||
|
||||
- Gateway/Telegram stale-socket restart guard: only apply stale-socket restarts to channels that publish event-liveness timestamps, preventing Telegram providers from being misclassified as stale solely due to long uptime and avoiding restart/pairing storms after upgrade. (openclaw#38464)
|
||||
- Onboarding/headless Linux daemon probe hardening: treat `systemctl --user is-enabled` probe failures as non-fatal during daemon install flow so onboarding no longer crashes on SSH/headless VPS environments before showing install guidance. (#37297) Thanks @acarbajal-web.
|
||||
- Memory/QMD mcporter Windows spawn hardening: when `mcporter.cmd` launch fails with `spawn EINVAL`, retry via bare `mcporter` shell resolution so QMD recall can continue instead of falling back to builtin memory search. (#27402) Thanks @i0ivi0i.
|
||||
- Tools/web_search Brave language-code validation: align `search_lang` handling with Brave-supported codes (including `zh-hans`, `zh-hant`, `en-gb`, and `pt-br`), map common alias inputs (`zh`, `ja`) to valid Brave values, and reject unsupported codes before upstream requests to prevent 422 failures. (#37260) Thanks @heyanming.
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
import type { Client } from "@buape/carbon";
|
||||
import type { GatewayPlugin } from "@buape/carbon/gateway";
|
||||
import { createArmableStallWatchdog } from "../../channels/transport/stall-watchdog.js";
|
||||
import { createConnectedChannelStatusPatch } from "../../gateway/channel-status-patches.js";
|
||||
import { danger } from "../../globals.js";
|
||||
import type { RuntimeEnv } from "../../runtime.js";
|
||||
import { attachDiscordGatewayLogging } from "../gateway-logging.js";
|
||||
|
|
@ -180,8 +181,7 @@ export async function runDiscordGatewayLifecycle(params: {
|
|||
let sawConnected = gateway?.isConnected === true;
|
||||
if (sawConnected) {
|
||||
pushStatus({
|
||||
connected: true,
|
||||
lastConnectedAt: at,
|
||||
...createConnectedChannelStatusPatch(at),
|
||||
lastDisconnect: null,
|
||||
});
|
||||
}
|
||||
|
|
@ -194,9 +194,7 @@ export async function runDiscordGatewayLifecycle(params: {
|
|||
const connectedAt = Date.now();
|
||||
reconnectStallWatchdog.disarm();
|
||||
pushStatus({
|
||||
connected: true,
|
||||
lastEventAt: connectedAt,
|
||||
lastConnectedAt: connectedAt,
|
||||
...createConnectedChannelStatusPatch(connectedAt),
|
||||
lastDisconnect: null,
|
||||
});
|
||||
if (helloConnectedPollId) {
|
||||
|
|
@ -253,9 +251,7 @@ export async function runDiscordGatewayLifecycle(params: {
|
|||
if (gateway?.isConnected && !lifecycleStopping) {
|
||||
const at = Date.now();
|
||||
pushStatus({
|
||||
connected: true,
|
||||
lastEventAt: at,
|
||||
lastConnectedAt: at,
|
||||
...createConnectedChannelStatusPatch(at),
|
||||
lastDisconnect: null,
|
||||
});
|
||||
}
|
||||
|
|
|
|||
|
|
@ -36,6 +36,7 @@ import {
|
|||
resolveDefaultGroupPolicy,
|
||||
warnMissingProviderGroupPolicyFallbackOnce,
|
||||
} from "../../config/runtime-group-policy.js";
|
||||
import { createConnectedChannelStatusPatch } from "../../gateway/channel-status-patches.js";
|
||||
import { danger, logVerbose, shouldLogVerbose, warn } from "../../globals.js";
|
||||
import { formatErrorMessage } from "../../infra/errors.js";
|
||||
import { createDiscordRetryRunner } from "../../infra/retry-policy.js";
|
||||
|
|
@ -752,7 +753,7 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) {
|
|||
botUserId && botUserName ? `${botUserId} (${botUserName})` : (botUserId ?? botUserName ?? "");
|
||||
runtime.log?.(`logged in to discord${botIdentity ? ` as ${botIdentity}` : ""}`);
|
||||
if (lifecycleGateway?.isConnected) {
|
||||
opts.setStatus?.({ connected: true });
|
||||
opts.setStatus?.(createConnectedChannelStatusPatch());
|
||||
}
|
||||
|
||||
lifecycleStarted = true;
|
||||
|
|
|
|||
|
|
@ -489,16 +489,34 @@ describe("channel-health-monitor", () => {
|
|||
await expectNoRestart(manager);
|
||||
});
|
||||
|
||||
it("restarts a channel that never received any event past the stale threshold", async () => {
|
||||
it("restarts a channel that has seen no events since connect past the stale threshold", async () => {
|
||||
const now = Date.now();
|
||||
const manager = createSlackSnapshotManager(
|
||||
runningConnectedSlackAccount({
|
||||
lastStartAt: now - STALE_THRESHOLD - 60_000,
|
||||
lastEventAt: now - STALE_THRESHOLD - 60_000,
|
||||
}),
|
||||
);
|
||||
await expectRestartedChannel(manager, "slack");
|
||||
});
|
||||
|
||||
it("skips connected channels that do not report event liveness", async () => {
|
||||
const now = Date.now();
|
||||
const manager = createSnapshotManager({
|
||||
telegram: {
|
||||
default: {
|
||||
running: true,
|
||||
connected: true,
|
||||
enabled: true,
|
||||
configured: true,
|
||||
lastStartAt: now - STALE_THRESHOLD - 60_000,
|
||||
lastEventAt: null,
|
||||
},
|
||||
},
|
||||
});
|
||||
await expectNoRestart(manager);
|
||||
});
|
||||
|
||||
it("respects custom staleEventThresholdMs", async () => {
|
||||
const customThreshold = 10 * 60_000;
|
||||
const now = Date.now();
|
||||
|
|
|
|||
|
|
@ -111,7 +111,7 @@ describe("evaluateChannelHealth", () => {
|
|||
enabled: true,
|
||||
configured: true,
|
||||
lastStartAt: 0,
|
||||
lastEventAt: null,
|
||||
lastEventAt: 0,
|
||||
},
|
||||
{
|
||||
channelId: "discord",
|
||||
|
|
@ -142,6 +142,65 @@ describe("evaluateChannelHealth", () => {
|
|||
);
|
||||
expect(evaluation).toEqual({ healthy: true, reason: "healthy" });
|
||||
});
|
||||
|
||||
it("does not flag stale sockets for channels without event tracking", () => {
|
||||
const evaluation = evaluateChannelHealth(
|
||||
{
|
||||
running: true,
|
||||
connected: true,
|
||||
enabled: true,
|
||||
configured: true,
|
||||
lastStartAt: 0,
|
||||
lastEventAt: null,
|
||||
},
|
||||
{
|
||||
channelId: "discord",
|
||||
now: 100_000,
|
||||
channelConnectGraceMs: 10_000,
|
||||
staleEventThresholdMs: 30_000,
|
||||
},
|
||||
);
|
||||
expect(evaluation).toEqual({ healthy: true, reason: "healthy" });
|
||||
});
|
||||
|
||||
it("does not flag stale sockets without an active connected socket", () => {
|
||||
const evaluation = evaluateChannelHealth(
|
||||
{
|
||||
running: true,
|
||||
enabled: true,
|
||||
configured: true,
|
||||
lastStartAt: 0,
|
||||
lastEventAt: 0,
|
||||
},
|
||||
{
|
||||
channelId: "slack",
|
||||
now: 100_000,
|
||||
channelConnectGraceMs: 10_000,
|
||||
staleEventThresholdMs: 30_000,
|
||||
},
|
||||
);
|
||||
expect(evaluation).toEqual({ healthy: true, reason: "healthy" });
|
||||
});
|
||||
|
||||
it("ignores inherited event timestamps from a previous lifecycle", () => {
|
||||
const evaluation = evaluateChannelHealth(
|
||||
{
|
||||
running: true,
|
||||
connected: true,
|
||||
enabled: true,
|
||||
configured: true,
|
||||
lastStartAt: 50_000,
|
||||
lastEventAt: 10_000,
|
||||
},
|
||||
{
|
||||
channelId: "slack",
|
||||
now: 100_000,
|
||||
channelConnectGraceMs: 10_000,
|
||||
staleEventThresholdMs: 30_000,
|
||||
},
|
||||
);
|
||||
expect(evaluation).toEqual({ healthy: true, reason: "healthy" });
|
||||
});
|
||||
});
|
||||
|
||||
describe("resolveChannelRestartReason", () => {
|
||||
|
|
|
|||
|
|
@ -103,17 +103,17 @@ export function evaluateChannelHealth(
|
|||
// Skip stale-socket check for Telegram (long-polling mode). Each polling request
|
||||
// acts as a heartbeat, so the half-dead WebSocket scenario this check is designed
|
||||
// to catch does not apply to Telegram's long-polling architecture.
|
||||
if (policy.channelId !== "telegram") {
|
||||
if (snapshot.lastEventAt != null || snapshot.lastStartAt != null) {
|
||||
const upSince = snapshot.lastStartAt ?? 0;
|
||||
const upDuration = policy.now - upSince;
|
||||
if (upDuration > policy.staleEventThresholdMs) {
|
||||
const lastEvent = snapshot.lastEventAt ?? 0;
|
||||
const eventAge = policy.now - lastEvent;
|
||||
if (eventAge > policy.staleEventThresholdMs) {
|
||||
return { healthy: false, reason: "stale-socket" };
|
||||
}
|
||||
}
|
||||
if (
|
||||
policy.channelId !== "telegram" &&
|
||||
snapshot.connected === true &&
|
||||
snapshot.lastEventAt != null
|
||||
) {
|
||||
if (lastStartAt != null && snapshot.lastEventAt < lastStartAt) {
|
||||
return { healthy: true, reason: "healthy" };
|
||||
}
|
||||
const eventAge = policy.now - snapshot.lastEventAt;
|
||||
if (eventAge > policy.staleEventThresholdMs) {
|
||||
return { healthy: false, reason: "stale-socket" };
|
||||
}
|
||||
}
|
||||
return { healthy: true, reason: "healthy" };
|
||||
|
|
|
|||
|
|
@ -0,0 +1,12 @@
|
|||
import { describe, expect, it } from "vitest";
|
||||
import { createConnectedChannelStatusPatch } from "./channel-status-patches.js";
|
||||
|
||||
// Covers both call shapes of createConnectedChannelStatusPatch: an explicit
// timestamp and the default-argument path.
describe("createConnectedChannelStatusPatch", () => {
  it("uses one timestamp for connected event-liveness state", () => {
    expect(createConnectedChannelStatusPatch(1234)).toEqual({
      connected: true,
      lastConnectedAt: 1234,
      lastEventAt: 1234,
    });
  });

  it("defaults to the current time and keeps both timestamps equal", () => {
    // Bracket the call with two clock readings instead of mocking timers so
    // the test needs no imports beyond describe/expect/it.
    const before = Date.now();
    const patch = createConnectedChannelStatusPatch();
    const after = Date.now();

    expect(patch.connected).toBe(true);
    expect(patch.lastEventAt).toBe(patch.lastConnectedAt);
    expect(patch.lastConnectedAt).toBeGreaterThanOrEqual(before);
    expect(patch.lastConnectedAt).toBeLessThanOrEqual(after);
  });
});
|
||||
|
|
@ -0,0 +1,15 @@
|
|||
/**
 * Status fields published when a channel reports that it is connected.
 *
 * `lastConnectedAt` and `lastEventAt` are always written together so both
 * describe the same connect instant.
 */
export type ConnectedChannelStatusPatch = {
  /** Always `true`; this patch is only built for the connected state. */
  connected: true;
  /** Timestamp recorded for the connect. */
  lastConnectedAt: number;
  /** Event-liveness timestamp, seeded with the same value as `lastConnectedAt`. */
  lastEventAt: number;
};

/**
 * Builds the status patch for a channel that has just connected.
 *
 * @param at - Timestamp to record in both fields; defaults to `Date.now()`.
 * @returns A patch with `connected: true` whose `lastConnectedAt` and
 *   `lastEventAt` both equal `at`.
 */
export function createConnectedChannelStatusPatch(
  at: number = Date.now(),
): ConnectedChannelStatusPatch {
  return {
    connected: true,
    lastConnectedAt: at,
    lastEventAt: at,
  };
}
|
||||
|
|
@ -1,4 +1,4 @@
|
|||
import { describe, expect, it } from "vitest";
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
import { __testing } from "./provider.js";
|
||||
|
||||
class FakeEmitter {
|
||||
|
|
@ -22,6 +22,22 @@ class FakeEmitter {
|
|||
}
|
||||
|
||||
describe("slack socket reconnect helpers", () => {
|
||||
it("seeds event liveness when socket mode connects", () => {
|
||||
const setStatus = vi.fn();
|
||||
|
||||
__testing.publishSlackConnectedStatus(setStatus);
|
||||
|
||||
expect(setStatus).toHaveBeenCalledTimes(1);
|
||||
expect(setStatus).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
connected: true,
|
||||
lastConnectedAt: expect.any(Number),
|
||||
lastEventAt: expect.any(Number),
|
||||
lastError: null,
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("resolves disconnect waiter on socket disconnect event", async () => {
|
||||
const client = new FakeEmitter();
|
||||
const app = { receiver: { client } };
|
||||
|
|
|
|||
|
|
@ -18,6 +18,7 @@ import {
|
|||
} from "../../config/runtime-group-policy.js";
|
||||
import type { SessionScope } from "../../config/sessions.js";
|
||||
import { normalizeResolvedSecretInputString } from "../../config/types.secrets.js";
|
||||
import { createConnectedChannelStatusPatch } from "../../gateway/channel-status-patches.js";
|
||||
import { warn } from "../../globals.js";
|
||||
import { computeBackoff, sleepWithAbort } from "../../infra/backoff.js";
|
||||
import { installRequestBodyLimitGuard } from "../../infra/http-body.js";
|
||||
|
|
@ -65,6 +66,17 @@ function parseApiAppIdFromAppToken(raw?: string) {
|
|||
return match?.[1]?.toUpperCase();
|
||||
}
|
||||
|
||||
/**
 * Publishes the connected status for the Slack provider.
 *
 * Does nothing when no callback is supplied. Otherwise calls `setStatus`
 * exactly once with the connected patch built from a single `Date.now()`
 * reading, plus `lastError: null`.
 *
 * @param setStatus - Optional status sink; when omitted the call is a no-op.
 */
function publishSlackConnectedStatus(
  setStatus?: (next: Record<string, unknown>) => void,
): void {
  if (!setStatus) {
    return;
  }
  // Read the clock once so every timestamp in the patch is identical.
  const now = Date.now();
  setStatus({
    ...createConnectedChannelStatusPatch(now),
    lastError: null,
  });
}
|
||||
|
||||
export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) {
|
||||
const cfg = opts.config ?? loadConfig();
|
||||
const runtime: RuntimeEnv = opts.runtime ?? createNonExitingRuntime();
|
||||
|
|
@ -390,6 +402,7 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) {
|
|||
try {
|
||||
await app.start();
|
||||
reconnectAttempts = 0;
|
||||
publishSlackConnectedStatus(opts.setStatus);
|
||||
runtime.log?.("slack socket mode connected");
|
||||
} catch (err) {
|
||||
// Auth errors (account_inactive, invalid_auth, etc.) are permanent —
|
||||
|
|
@ -481,6 +494,7 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) {
|
|||
export { isNonRecoverableSlackAuthError } from "./reconnect-policy.js";
|
||||
|
||||
export const __testing = {
|
||||
publishSlackConnectedStatus,
|
||||
resolveSlackRuntimeGroupPolicy: resolveOpenProviderRuntimeGroupPolicy,
|
||||
resolveDefaultGroupPolicy,
|
||||
getSocketEmitter,
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ import { DEFAULT_GROUP_HISTORY_LIMIT } from "../../auto-reply/reply/history.js";
|
|||
import { formatCliCommand } from "../../cli/command-format.js";
|
||||
import { waitForever } from "../../cli/wait.js";
|
||||
import { loadConfig } from "../../config/config.js";
|
||||
import { createConnectedChannelStatusPatch } from "../../gateway/channel-status-patches.js";
|
||||
import { logVerbose } from "../../globals.js";
|
||||
import { formatDurationPrecise } from "../../infra/format-time/format-duration.ts";
|
||||
import { enqueueSystemEvent } from "../../infra/system-events.js";
|
||||
|
|
@ -210,9 +211,7 @@ export async function monitorWebChannel(
|
|||
},
|
||||
});
|
||||
|
||||
status.connected = true;
|
||||
status.lastConnectedAt = Date.now();
|
||||
status.lastEventAt = status.lastConnectedAt;
|
||||
Object.assign(status, createConnectedChannelStatusPatch());
|
||||
status.lastError = null;
|
||||
emitStatus();
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue