fix(mattermost): detect stale websocket after bot disable/enable cycle (#53604)

Merged via squash.

Prepared head SHA: 818d437a54
Co-authored-by: Qinsam <19649380+Qinsam@users.noreply.github.com>
Co-authored-by: mukhtharcm <56378562+mukhtharcm@users.noreply.github.com>
Reviewed-by: @mukhtharcm
This commit is contained in:
qsam 2026-03-30 10:24:59 +08:00 committed by GitHub
parent c6ded0fa54
commit 47839d3b9a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 339 additions and 27 deletions

View File

@ -230,6 +230,7 @@ Docs: https://docs.openclaw.ai
- Control UI/gateway: reconnect the browser client when gateway event sequence gaps are detected, so stale non-chat state recovers automatically instead of only telling the user to refresh. (#23912) thanks @Olshansk.
- ClawDock/docs: move the helper scripts to `scripts/clawdock`, publish ClawDock as a first-class docs page on the docs site, and document reinstalling local helper copies from the new raw GitHub path. (#23912) thanks @Olshansk.
- Control UI/gateway: clear queued browser connect timeouts on client stop so aborted or replaced gateway clients do not send delayed connect requests after shutdown. (#57338) thanks @gumadeiras.
- Mattermost: detect stale websocket sessions after bot disable/enable cycles by polling the bot account `update_at` and forcing a reconnect when it changes. (#53604) thanks @Qinsam.
## 2026.3.24

View File

@ -18,6 +18,7 @@ export type MattermostUser = {
nickname?: string | null;
first_name?: string | null;
last_name?: string | null;
update_at?: number;
};
export type MattermostChannel = {

View File

@ -227,6 +227,191 @@ describe("mattermost websocket monitor", () => {
emoji_name: "thumbsup",
}),
);
expect(payload.data?.reaction).toBeDefined();
});
// Verifies the stale-session detection path: once the value returned by
// getBotUpdateAt differs from the first value observed after open, the
// socket is terminated and the change is logged.
it("terminates when bot update_at changes (disable/enable cycle)", async () => {
// Fake timers let the test step the 100ms health-check interval deterministically.
vi.useFakeTimers();
const socket = new FakeWebSocket();
const runtime = testRuntime();
// Mutable value returned by the getBotUpdateAt stub below.
let updateAt = 1000;
const connectOnce = createMattermostConnectOnce({
wsUrl: "wss://example.invalid/api/v4/websocket",
botToken: "token",
runtime,
nextSeq: () => 1,
onPosted: async () => {},
webSocketFactory: () => socket,
getBotUpdateAt: async () => updateAt,
healthCheckIntervalMs: 100,
});
const connected = connectOnce();
socket.emitOpen();
// Let initial getBotUpdateAt resolve
await vi.advanceTimersByTimeAsync(0);
// update_at unchanged — no terminate
await vi.advanceTimersByTimeAsync(100);
expect(socket.terminateCalls).toBe(0);
// Simulate disable/enable — update_at changes
updateAt = 2000;
await vi.advanceTimersByTimeAsync(100);
expect(socket.terminateCalls).toBe(1);
expect(runtime.log).toHaveBeenCalledWith(
"mattermost: bot account updated (update_at changed: 1000 → 2000) — reconnecting",
);
// Emit close so the promise returned by connectOnce() settles before timers are restored.
socket.emitClose(1006);
await connected;
vi.useRealTimers();
});
// Negative case: a constant update_at across several polling intervals must
// never cause the socket to be terminated.
it("keeps connection alive when update_at stays the same", async () => {
vi.useFakeTimers();
const socket = new FakeWebSocket();
const connectOnce = createMattermostConnectOnce({
wsUrl: "wss://example.invalid/api/v4/websocket",
botToken: "token",
runtime: testRuntime(),
nextSeq: () => 1,
onPosted: async () => {},
webSocketFactory: () => socket,
getBotUpdateAt: async () => 1000,
healthCheckIntervalMs: 100,
});
const connected = connectOnce();
socket.emitOpen();
// Flush the initial poll that runs right after open.
await vi.advanceTimersByTimeAsync(0);
// Three full 100ms intervals elapse with the same value.
await vi.advanceTimersByTimeAsync(300);
expect(socket.terminateCalls).toBe(0);
socket.emitClose(1000);
await connected;
vi.useRealTimers();
});
// A failing poll after the baseline has been recorded is reported through
// runtime.error with the "health check error" label and leaves the socket open.
it("does not terminate when getBotUpdateAt throws", async () => {
vi.useFakeTimers();
const socket = new FakeWebSocket();
const runtime = testRuntime();
// Flipped to true after the initial poll so only later polls fail.
let shouldThrow = false;
const connectOnce = createMattermostConnectOnce({
wsUrl: "wss://example.invalid/api/v4/websocket",
botToken: "token",
runtime,
nextSeq: () => 1,
onPosted: async () => {},
webSocketFactory: () => socket,
getBotUpdateAt: async () => {
if (shouldThrow) throw new Error("network error");
return 1000;
},
healthCheckIntervalMs: 100,
});
const connected = connectOnce();
socket.emitOpen();
// Initial poll succeeds and records 1000 as the baseline.
await vi.advanceTimersByTimeAsync(0);
// API error — should log but not terminate
shouldThrow = true;
await vi.advanceTimersByTimeAsync(100);
expect(socket.terminateCalls).toBe(0);
expect(runtime.error).toHaveBeenCalledWith(
"mattermost: health check error: Error: network error",
);
socket.emitClose(1000);
await connected;
vi.useRealTimers();
});
// When the very first poll fails, no baseline exists yet: the error is logged
// with the "failed to get initial update_at" label, the next successful poll
// becomes the baseline, and a later differing value triggers the terminate.
it("keeps polling when the initial getBotUpdateAt call fails", async () => {
vi.useFakeTimers();
const socket = new FakeWebSocket();
const runtime = testRuntime();
// Scripted poll results, consumed in order: failure, baseline 1000, changed 2000.
const responses: Array<number | Error> = [new Error("network error"), 1000, 2000];
const connectOnce = createMattermostConnectOnce({
wsUrl: "wss://example.invalid/api/v4/websocket",
botToken: "token",
runtime,
nextSeq: () => 1,
onPosted: async () => {},
webSocketFactory: () => socket,
getBotUpdateAt: async () => {
const next = responses.shift();
if (next instanceof Error) {
throw next;
}
// Once the script is exhausted, keep returning the last value.
return next ?? 2000;
},
healthCheckIntervalMs: 100,
});
const connected = connectOnce();
socket.emitOpen();
// First poll: throws.
await vi.advanceTimersByTimeAsync(0);
expect(runtime.error).toHaveBeenCalledWith(
"mattermost: failed to get initial update_at: Error: network error",
);
// Second poll: returns 1000, which is stored as the baseline.
await vi.advanceTimersByTimeAsync(100);
expect(socket.terminateCalls).toBe(0);
// Third poll: returns 2000, which differs from the baseline.
await vi.advanceTimersByTimeAsync(100);
expect(socket.terminateCalls).toBe(1);
expect(runtime.log).toHaveBeenCalledWith(
"mattermost: bot account updated (update_at changed: 1000 → 2000) — reconnecting",
);
socket.emitClose(1006);
await connected;
vi.useRealTimers();
});
// Ensures polls are serialized: while one getBotUpdateAt call is pending, no
// further call is started, and the next one is only issued an interval after
// the pending call settles.
it("does not overlap health checks when a prior poll is still running", async () => {
vi.useFakeTimers();
const socket = new FakeWebSocket();
// Resolve callbacks of the pending polls, so the test controls when each one completes.
const resolvers: Array<(value: number) => void> = [];
let pollCount = 0;
const connectOnce = createMattermostConnectOnce({
wsUrl: "wss://example.invalid/api/v4/websocket",
botToken: "token",
runtime: testRuntime(),
nextSeq: () => 1,
onPosted: async () => {},
webSocketFactory: () => socket,
getBotUpdateAt: async () => {
pollCount++;
return await new Promise<number>((resolve) => {
resolvers.push(resolve);
});
},
healthCheckIntervalMs: 100,
});
const connected = connectOnce();
socket.emitOpen();
await vi.advanceTimersByTimeAsync(0);
expect(pollCount).toBe(1);
// The first poll is still pending, so three intervals pass without a second call.
await vi.advanceTimersByTimeAsync(300);
expect(pollCount).toBe(1);
// Complete the first poll; the follow-up is scheduled one interval later.
resolvers[0]?.(1000);
await vi.advanceTimersByTimeAsync(0);
await vi.advanceTimersByTimeAsync(100);
expect(pollCount).toBe(2);
socket.emitClose(1000);
await connected;
vi.useRealTimers();
});
});

View File

@ -89,6 +89,15 @@ type CreateMattermostConnectOnceOpts = {
onPosted: (post: MattermostPost, payload: MattermostEventPayload) => Promise<void>;
onReaction?: (payload: MattermostEventPayload) => Promise<void>;
webSocketFactory?: MattermostWebSocketFactory;
/**
* Called periodically to check whether the bot account has been modified
* (e.g. disabled then re-enabled) since the WebSocket was opened.
* Returns the bot's current `update_at` timestamp. When it differs from
* the value recorded at connect time, the connection is terminated so the
* reconnect loop can establish a fresh one.
*/
getBotUpdateAt?: () => Promise<number>;
healthCheckIntervalMs?: number;
};
export const defaultMattermostWebSocketFactory: MattermostWebSocketFactory = (url) =>
@ -126,20 +135,86 @@ export function createMattermostConnectOnce(
opts: CreateMattermostConnectOnceOpts,
): () => Promise<void> {
const webSocketFactory = opts.webSocketFactory ?? defaultMattermostWebSocketFactory;
const healthCheckIntervalMs = opts.healthCheckIntervalMs ?? 30_000;
return async () => {
const ws = webSocketFactory(opts.wsUrl);
const onAbort = () => ws.terminate();
opts.abortSignal?.addEventListener("abort", onAbort, { once: true });
const getBotUpdateAt = opts.getBotUpdateAt;
try {
return await new Promise<void>((resolve, reject) => {
let opened = false;
let settled = false;
let healthCheckEnabled = getBotUpdateAt != null;
let healthCheckInFlight = false;
let healthCheckTimer: ReturnType<typeof setTimeout> | undefined;
let initialUpdateAt: number | undefined;
// Cancels the pending health-check timeout, if any, and forgets its handle.
const clearTimers = () => {
  if (healthCheckTimer === undefined) {
    return;
  }
  clearTimeout(healthCheckTimer);
  healthCheckTimer = undefined;
};
// Permanently turns off polling for this connection: drops any scheduled
// timeout and clears the enabled flag so no new poll is scheduled or run.
const stopHealthChecks = () => {
  clearTimers();
  healthCheckEnabled = false;
};
// Arms a single timeout that runs the next health check after
// `healthCheckIntervalMs`. Does nothing when polling is unavailable, disabled,
// the connection promise has settled, or a poll is currently in flight.
const scheduleHealthCheck = () => {
  const pollable =
    Boolean(getBotUpdateAt) && healthCheckEnabled && !settled && !healthCheckInFlight;
  if (!pollable) {
    return;
  }
  healthCheckTimer = setTimeout(() => {
    // The timer has fired; clear the handle before starting the poll.
    healthCheckTimer = undefined;
    void runHealthCheck();
  }, healthCheckIntervalMs);
};
// Performs one health-check poll of the bot account's `update_at`.
//
// - The first successful poll records the value as the baseline (`initialUpdateAt`).
// - A later poll returning a different value logs the change, stops further
//   polling and terminates the socket.
// - A failing poll is reported via `opts.runtime.error` and never terminates
//   the socket; the label depends on whether a baseline exists yet.
// - Whatever the outcome, the in-flight flag is cleared and the next poll is
//   scheduled from `finally`, so at most one poll is pending at a time.
const runHealthCheck = async () => {
  if (!getBotUpdateAt || !healthCheckEnabled || settled || healthCheckInFlight) {
    return;
  }
  healthCheckInFlight = true;
  try {
    const current = await getBotUpdateAt();
    // Polling may have been stopped or the promise settled while awaiting.
    if (!healthCheckEnabled || settled) {
      return;
    }
    if (initialUpdateAt === undefined) {
      initialUpdateAt = current;
      return;
    }
    if (current !== initialUpdateAt) {
      // Keep the " → " separator between the old and new value; without it the
      // two timestamps run together into one unreadable number.
      opts.runtime.log?.(
        `mattermost: bot account updated (update_at changed: ${initialUpdateAt} → ${current}) — reconnecting`,
      );
      stopHealthChecks();
      ws.terminate();
    }
  } catch (err) {
    // Ignore failures that arrive after polling stopped or the promise settled.
    if (!healthCheckEnabled || settled) {
      return;
    }
    const label =
      initialUpdateAt === undefined
        ? "mattermost: failed to get initial update_at"
        : "mattermost: health check error";
    opts.runtime.error?.(`${label}: ${String(err)}`);
  } finally {
    healthCheckInFlight = false;
    // No-op when polling was stopped above or the promise has settled.
    scheduleHealthCheck();
  }
};
// Resolves the connection promise at most once, stopping health checks first.
const resolveOnce = () => {
  if (!settled) {
    settled = true;
    stopHealthChecks();
    resolve();
  }
};
const rejectOnce = (error: Error) => {
@ -147,6 +222,7 @@ export function createMattermostConnectOnce(
return;
}
settled = true;
stopHealthChecks();
reject(error);
};
@ -164,6 +240,15 @@ export function createMattermostConnectOnce(
data: { token: opts.botToken },
}),
);
// Periodically check if the bot account was modified (e.g. disable/enable).
// After such a cycle the WebSocket silently stops delivering events even
// though the connection itself stays alive. Comparing update_at detects
// this reliably regardless of how quickly the cycle happens.
if (getBotUpdateAt) {
// Use a recursive timeout so only one REST poll can be in flight at a time.
void runHealthCheck();
}
});
ws.on("message", async (data) => {
@ -200,6 +285,7 @@ export function createMattermostConnectOnce(
});
ws.on("close", (code, reason) => {
stopHealthChecks();
const message = reasonToString(reason);
opts.statusSink?.({
connected: false,

View File

@ -275,7 +275,33 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {}
botToken,
allowPrivateNetwork: account.config?.allowPrivateNetwork === true,
});
const botUser = await fetchMattermostMe(client);
// Wait for the Mattermost API to accept our bot token before proceeding.
// When a bot account is disabled and re-enabled, the session is invalidated
// and API calls return 401 until the account is fully active again. Retrying
// here (with exponential backoff) keeps the monitor alive and prevents the
// framework's auto-restart budget from being exhausted.
let botUser!: MattermostUser;
await runWithReconnect(
async () => {
botUser = await fetchMattermostMe(client);
},
{
abortSignal: opts.abortSignal,
jitterRatio: 0.2,
shouldReconnect: ({ outcome }) => outcome === "rejected",
onError: (err) => {
runtime.error?.(`mattermost: API auth failed: ${String(err)}`);
opts.statusSink?.({ lastError: String(err), connected: false });
},
onReconnect: (delayMs) => {
runtime.log?.(`mattermost: API not accessible, retrying in ${Math.round(delayMs / 1000)}s`);
},
},
);
if (opts.abortSignal?.aborted) {
return;
}
const botUserId = botUser.id;
const botUsername = botUser.username?.trim() || undefined;
runtime.log?.(`mattermost connected as ${botUsername ? `@${botUsername}` : botUserId}`);
@ -1645,6 +1671,10 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {}
runtime,
webSocketFactory: opts.webSocketFactory,
nextSeq: () => seq++,
getBotUpdateAt: async () => {
const me = await fetchMattermostMe(client);
return me.update_at ?? 0;
},
onPosted: async (post, payload) => {
await debouncer.enqueue({ post, payload });
},

View File

@ -17,6 +17,7 @@ import {
type SkillInstallSpec,
type SkillsInstallPreferences,
} from "./skills.js";
import { resolveSkillSource } from "./skills/source.js";
export type SkillInstallRequest = {
workspaceDir: string;
@ -503,7 +504,7 @@ export async function installSkill(params: SkillInstallRequest): Promise<SkillIn
const spec = findInstallSpec(entry, params.installId);
const scanResult = await collectSkillInstallScanWarnings(entry);
const warnings = scanResult.warnings;
const skillSource = entry.skill.sourceInfo?.source?.trim() || "unknown";
const skillSource = resolveSkillSource(entry.skill);
// Run before_install so external scanners can augment findings or block installs.
const hookRunner = getGlobalHookRunner();

View File

@ -17,6 +17,7 @@ import {
type SkillsInstallPreferences,
} from "./skills.js";
import { resolveBundledSkillsContext } from "./skills/bundled-context.js";
import { resolveSkillSource } from "./skills/source.js";
export type SkillStatusConfigCheck = RequirementConfigCheck;
@ -186,7 +187,7 @@ function buildSkillStatus(
(skillConfig?.apiKey && entry.metadata?.primaryEnv === envName),
);
const isConfigSatisfied = (pathStr: string) => isConfigPathTruthy(config, pathStr);
const skillSource = entry.skill.sourceInfo?.source?.trim() || "unknown";
const skillSource = resolveSkillSource(entry.skill);
const bundled =
skillSource === "openclaw-bundled" ||
(skillSource === "unknown" && bundledNames?.has(entry.skill.name) === true);

View File

@ -1,6 +1,6 @@
import fs from "node:fs/promises";
import path from "node:path";
import { createSyntheticSourceInfo, type Skill } from "@mariozechner/pi-coding-agent";
import type { Skill } from "@mariozechner/pi-coding-agent";
export async function writeSkill(params: {
dir: string;
@ -31,17 +31,12 @@ export function createCanonicalFixtureSkill(params: {
source: string;
disableModelInvocation?: boolean;
}): Skill {
// Keep skill fixtures on the upstream canonical provenance shape.
return {
name: params.name,
description: params.description,
filePath: params.filePath,
baseDir: params.baseDir,
source: params.source,
sourceInfo: createSyntheticSourceInfo(params.filePath, {
source: params.source,
baseDir: params.baseDir,
}),
disableModelInvocation: params.disableModelInvocation ?? false,
};
}

View File

@ -1,11 +1,8 @@
import os from "node:os";
import {
createSyntheticSourceInfo,
formatSkillsForPrompt,
type Skill,
} from "@mariozechner/pi-coding-agent";
import { formatSkillsForPrompt, type Skill } from "@mariozechner/pi-coding-agent";
import { describe, expect, it } from "vitest";
import type { OpenClawConfig } from "../../config/config.js";
import { createCanonicalFixtureSkill } from "../skills.test-helpers.js";
import type { SkillEntry } from "./types.js";
import {
formatSkillsCompact,
@ -14,18 +11,13 @@ import {
} from "./workspace.js";
function makeSkill(name: string, desc = "A skill", filePath = `/skills/${name}/SKILL.md`): Skill {
return {
return createCanonicalFixtureSkill({
name,
description: desc,
filePath,
baseDir: `/skills/${name}`,
source: "workspace",
sourceInfo: createSyntheticSourceInfo(filePath, {
source: "workspace",
baseDir: `/skills/${name}`,
}),
disableModelInvocation: false,
};
});
}
function makeEntry(skill: Skill): SkillEntry {

View File

@ -8,6 +8,7 @@ import {
} from "../../shared/config-eval.js";
import { normalizeStringEntries } from "../../shared/string-normalization.js";
import { resolveSkillKey } from "./frontmatter.js";
import { resolveSkillSource } from "./source.js";
import type { SkillEligibilityContext, SkillEntry } from "./types.js";
const DEFAULT_CONFIG_VALUES: Record<string, boolean> = {
@ -50,7 +51,7 @@ function normalizeAllowlist(input: unknown): string[] | undefined {
const BUNDLED_SOURCES = new Set(["openclaw-bundled"]);
function isBundledSkill(entry: SkillEntry): boolean {
return BUNDLED_SOURCES.has(entry.skill.sourceInfo?.source ?? "");
return BUNDLED_SOURCES.has(resolveSkillSource(entry.skill));
}
export function resolveBundledAllowlist(config?: OpenClawConfig): string[] | undefined {

View File

@ -0,0 +1,18 @@
import type { Skill } from "@mariozechner/pi-coding-agent";
// View of `Skill` that additionally admits an optional `sourceInfo.source`
// string, which resolveSkillSource reads as a fallback when `source` is empty.
type SkillSourceCompat = Skill & {
sourceInfo?: {
source?: string;
};
};
/**
 * Returns the provenance label of a skill.
 *
 * Prefers the trimmed `skill.source`; when that is missing, not a string, or
 * blank, falls back to the trimmed `skill.sourceInfo.source`; when neither
 * yields a non-empty string, returns `"unknown"`.
 */
export function resolveSkillSource(skill: Skill): string {
  const compat = skill as SkillSourceCompat;
  // Non-string values count as empty so they never reach `.trim()`.
  const trimmedOrEmpty = (value: unknown): string =>
    typeof value === "string" ? value.trim() : "";
  return trimmedOrEmpty(compat.source) || trimmedOrEmpty(compat.sourceInfo?.source) || "unknown";
}

View File

@ -40,7 +40,7 @@ vi.mock("../config/config.js", async (importOriginal) => {
vi.mock(
buildBundledPluginModuleId("telegram", "update-offset-runtime-api.js"),
async (importOriginal) => {
const actual = (await importOriginal()) as Record<string, unknown>;
const actual: Record<string, unknown> = await importOriginal();
return {
...actual,
deleteTelegramUpdateOffset: offsetMocks.deleteTelegramUpdateOffset,

View File

@ -11,6 +11,7 @@ import { SANDBOX_BROWSER_SECURITY_HASH_EPOCH } from "../agents/sandbox/constants
import { execDockerRaw, type ExecDockerRawResult } from "../agents/sandbox/docker.js";
import { resolveSandboxToolPolicyForAgent } from "../agents/sandbox/tool-policy.js";
import type { SandboxToolPolicy } from "../agents/sandbox/types.js";
import { resolveSkillSource } from "../agents/skills/source.js";
import { isToolAllowedByPolicies } from "../agents/tool-policy-match.js";
import { resolveToolProfilePolicy } from "../agents/tool-policy.js";
import { listAgentWorkspaceDirs } from "../agents/workspace-dirs.js";
@ -1261,7 +1262,7 @@ export async function collectInstalledSkillsCodeSafetyFindings(params: {
for (const workspaceDir of workspaceDirs) {
const entries = loadWorkspaceSkillEntries(workspaceDir, { config: params.cfg });
for (const entry of entries) {
if (entry.skill.sourceInfo?.source === "openclaw-bundled") {
if (resolveSkillSource(entry.skill) === "openclaw-bundled") {
continue;
}

View File

@ -227,12 +227,12 @@ function copyFileIfExists(sourcePath: string, targetPath: string): void {
function sanitizeLiveConfig(raw: string): string {
try {
const parsed = JSON5.parse(raw) as {
const parsed: {
agents?: {
defaults?: Record<string, unknown>;
list?: Array<Record<string, unknown>>;
};
};
} = JSON5.parse(raw);
if (!parsed || typeof parsed !== "object") {
return raw;
}