diff --git a/CHANGELOG.md b/CHANGELOG.md index b10d6b20bfb..147a35c8a2e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -230,6 +230,7 @@ Docs: https://docs.openclaw.ai - Control UI/gateway: reconnect the browser client when gateway event sequence gaps are detected, so stale non-chat state recovers automatically instead of only telling the user to refresh. (#23912) thanks @Olshansk. - ClawDock/docs: move the helper scripts to `scripts/clawdock`, publish ClawDock as a first-class docs page on the docs site, and document reinstalling local helper copies from the new raw GitHub path. (#23912) thanks @Olshansk. - Control UI/gateway: clear queued browser connect timeouts on client stop so aborted or replaced gateway clients do not send delayed connect requests after shutdown. (#57338) thanks @gumadeiras. +- Mattermost: detect stale websocket sessions after bot disable/enable cycles by polling the bot account `update_at` and forcing a reconnect when it changes. (#53604) Thanks @Qinsam. ## 2026.3.24 diff --git a/extensions/mattermost/src/mattermost/client.ts b/extensions/mattermost/src/mattermost/client.ts index cd8a6d85dc3..b6ef607b681 100644 --- a/extensions/mattermost/src/mattermost/client.ts +++ b/extensions/mattermost/src/mattermost/client.ts @@ -18,6 +18,7 @@ export type MattermostUser = { nickname?: string | null; first_name?: string | null; last_name?: string | null; + update_at?: number; }; export type MattermostChannel = { diff --git a/extensions/mattermost/src/mattermost/monitor-websocket.test.ts b/extensions/mattermost/src/mattermost/monitor-websocket.test.ts index 28aa67a7f8d..ed4656499a8 100644 --- a/extensions/mattermost/src/mattermost/monitor-websocket.test.ts +++ b/extensions/mattermost/src/mattermost/monitor-websocket.test.ts @@ -227,6 +227,191 @@ describe("mattermost websocket monitor", () => { emoji_name: "thumbsup", }), ); - expect(payload.data?.reaction).toBeDefined(); + }); + + it("terminates when bot update_at changes (disable/enable cycle)", async () => { + vi.useFakeTimers(); + const socket = new FakeWebSocket(); + const runtime = testRuntime(); + let updateAt = 1000; + const connectOnce = createMattermostConnectOnce({ + wsUrl: "wss://example.invalid/api/v4/websocket", + botToken: "token", + runtime, + nextSeq: () => 1, + onPosted: async () => {}, + webSocketFactory: () => socket, + getBotUpdateAt: async () => updateAt, + healthCheckIntervalMs: 100, + }); + + const connected = connectOnce(); + socket.emitOpen(); + + // Let initial getBotUpdateAt resolve + await vi.advanceTimersByTimeAsync(0); + + // update_at unchanged — no terminate + await vi.advanceTimersByTimeAsync(100); + expect(socket.terminateCalls).toBe(0); + + // Simulate disable/enable — update_at changes + updateAt = 2000; + await vi.advanceTimersByTimeAsync(100); + expect(socket.terminateCalls).toBe(1); + expect(runtime.log).toHaveBeenCalledWith( + "mattermost: bot account updated (update_at changed: 1000 → 2000) — reconnecting", + ); + + socket.emitClose(1006); + await connected; + vi.useRealTimers(); + }); + + it("keeps connection alive when update_at stays the same", async () => { + vi.useFakeTimers(); + const socket = new FakeWebSocket(); + const connectOnce = createMattermostConnectOnce({ + wsUrl: "wss://example.invalid/api/v4/websocket", + botToken: "token", + runtime: testRuntime(), + nextSeq: () => 1, + onPosted: async () => {}, + webSocketFactory: () => socket, + getBotUpdateAt: async () => 1000, + healthCheckIntervalMs: 100, + }); + + const connected = connectOnce(); + socket.emitOpen(); + + await vi.advanceTimersByTimeAsync(0); + await vi.advanceTimersByTimeAsync(300); + expect(socket.terminateCalls).toBe(0); + + socket.emitClose(1000); + await connected; + vi.useRealTimers(); + }); + + it("does not terminate when getBotUpdateAt throws", async () => { + vi.useFakeTimers(); + const socket = new FakeWebSocket(); + const runtime = testRuntime(); + let shouldThrow = false; + const connectOnce = createMattermostConnectOnce({ + wsUrl: "wss://example.invalid/api/v4/websocket", + botToken: "token", + runtime, + nextSeq: () => 1, + onPosted: async () => {}, + webSocketFactory: () => socket, + getBotUpdateAt: async () => { + if (shouldThrow) throw new Error("network error"); + return 1000; + }, + healthCheckIntervalMs: 100, + }); + + const connected = connectOnce(); + socket.emitOpen(); + + await vi.advanceTimersByTimeAsync(0); + + // API error — should log but not terminate + shouldThrow = true; + await vi.advanceTimersByTimeAsync(100); + expect(socket.terminateCalls).toBe(0); + expect(runtime.error).toHaveBeenCalledWith( + "mattermost: health check error: Error: network error", + ); + + socket.emitClose(1000); + await connected; + vi.useRealTimers(); + }); + + it("keeps polling when the initial getBotUpdateAt call fails", async () => { + vi.useFakeTimers(); + const socket = new FakeWebSocket(); + const runtime = testRuntime(); + const responses: Array = [new Error("network error"), 1000, 2000]; + const connectOnce = createMattermostConnectOnce({ + wsUrl: "wss://example.invalid/api/v4/websocket", + botToken: "token", + runtime, + nextSeq: () => 1, + onPosted: async () => {}, + webSocketFactory: () => socket, + getBotUpdateAt: async () => { + const next = responses.shift(); + if (next instanceof Error) { + throw next; + } + return next ?? 2000; + }, + healthCheckIntervalMs: 100, + }); + + const connected = connectOnce(); + socket.emitOpen(); + + await vi.advanceTimersByTimeAsync(0); + expect(runtime.error).toHaveBeenCalledWith( + "mattermost: failed to get initial update_at: Error: network error", + ); + + await vi.advanceTimersByTimeAsync(100); + expect(socket.terminateCalls).toBe(0); + + await vi.advanceTimersByTimeAsync(100); + expect(socket.terminateCalls).toBe(1); + expect(runtime.log).toHaveBeenCalledWith( + "mattermost: bot account updated (update_at changed: 1000 → 2000) — reconnecting", + ); + + socket.emitClose(1006); + await connected; + vi.useRealTimers(); + }); + + it("does not overlap health checks when a prior poll is still running", async () => { + vi.useFakeTimers(); + const socket = new FakeWebSocket(); + const resolvers: Array<(value: number) => void> = []; + let pollCount = 0; + const connectOnce = createMattermostConnectOnce({ + wsUrl: "wss://example.invalid/api/v4/websocket", + botToken: "token", + runtime: testRuntime(), + nextSeq: () => 1, + onPosted: async () => {}, + webSocketFactory: () => socket, + getBotUpdateAt: async () => { + pollCount++; + return await new Promise((resolve) => { + resolvers.push(resolve); + }); + }, + healthCheckIntervalMs: 100, + }); + + const connected = connectOnce(); + socket.emitOpen(); + + await vi.advanceTimersByTimeAsync(0); + expect(pollCount).toBe(1); + + await vi.advanceTimersByTimeAsync(300); + expect(pollCount).toBe(1); + + resolvers[0]?.(1000); + await vi.advanceTimersByTimeAsync(0); + await vi.advanceTimersByTimeAsync(100); + expect(pollCount).toBe(2); + + socket.emitClose(1000); + await connected; + vi.useRealTimers(); }); }); diff --git a/extensions/mattermost/src/mattermost/monitor-websocket.ts b/extensions/mattermost/src/mattermost/monitor-websocket.ts index fde37e2d61a..747dfe10ddd 100644 --- a/extensions/mattermost/src/mattermost/monitor-websocket.ts +++ b/extensions/mattermost/src/mattermost/monitor-websocket.ts @@ -89,6 +89,15 @@ type CreateMattermostConnectOnceOpts = { onPosted: (post: MattermostPost, payload: MattermostEventPayload) => Promise; onReaction?: (payload: MattermostEventPayload) => Promise; webSocketFactory?: MattermostWebSocketFactory; + /** + * Called periodically to check whether the bot account has been modified + * (e.g. disabled then re-enabled) since the WebSocket was opened. + * Returns the bot's current `update_at` timestamp. When it differs from + * the value recorded at connect time, the connection is terminated so the + * reconnect loop can establish a fresh one. + */ + getBotUpdateAt?: () => Promise; + healthCheckIntervalMs?: number; }; export const defaultMattermostWebSocketFactory: MattermostWebSocketFactory = (url) => @@ -126,20 +135,86 @@ export function createMattermostConnectOnce( opts: CreateMattermostConnectOnceOpts, ): () => Promise { const webSocketFactory = opts.webSocketFactory ?? defaultMattermostWebSocketFactory; + const healthCheckIntervalMs = opts.healthCheckIntervalMs ?? 30_000; return async () => { const ws = webSocketFactory(opts.wsUrl); const onAbort = () => ws.terminate(); opts.abortSignal?.addEventListener("abort", onAbort, { once: true }); + const getBotUpdateAt = opts.getBotUpdateAt; try { return await new Promise((resolve, reject) => { let opened = false; let settled = false; + let healthCheckEnabled = getBotUpdateAt != null; + let healthCheckInFlight = false; + let healthCheckTimer: ReturnType | undefined; + let initialUpdateAt: number | undefined; + + const clearTimers = () => { + if (healthCheckTimer !== undefined) { + clearTimeout(healthCheckTimer); + healthCheckTimer = undefined; + } + }; + + const stopHealthChecks = () => { + healthCheckEnabled = false; + clearTimers(); + }; + + const scheduleHealthCheck = () => { + if (!getBotUpdateAt || !healthCheckEnabled || settled || healthCheckInFlight) { + return; + } + healthCheckTimer = setTimeout(() => { + healthCheckTimer = undefined; + void runHealthCheck(); + }, healthCheckIntervalMs); + }; + + const runHealthCheck = async () => { + if (!getBotUpdateAt || !healthCheckEnabled || settled || healthCheckInFlight) { + return; + } + healthCheckInFlight = true; + try { + const current = await getBotUpdateAt(); + if (!healthCheckEnabled || settled) { + return; + } + if (initialUpdateAt === undefined) { + initialUpdateAt = current; + return; + } + if (current !== initialUpdateAt) { + opts.runtime.log?.( + `mattermost: bot account updated (update_at changed: ${initialUpdateAt} → ${current}) — reconnecting`, + ); + stopHealthChecks(); + ws.terminate(); + } + } catch (err) { + if (!healthCheckEnabled || settled) { + return; + } + const label = + initialUpdateAt === undefined + ? "mattermost: failed to get initial update_at" + : "mattermost: health check error"; + opts.runtime.error?.(`${label}: ${String(err)}`); + } finally { + healthCheckInFlight = false; + scheduleHealthCheck(); + } + }; + const resolveOnce = () => { if (settled) { return; } settled = true; + stopHealthChecks(); resolve(); }; const rejectOnce = (error: Error) => { @@ -147,6 +222,7 @@ export function createMattermostConnectOnce( return; } settled = true; + stopHealthChecks(); reject(error); }; @@ -164,6 +240,15 @@ export function createMattermostConnectOnce( data: { token: opts.botToken }, }), ); + + // Periodically check if the bot account was modified (e.g. disable/enable). + // After such a cycle the WebSocket silently stops delivering events even + // though the connection itself stays alive. Comparing update_at detects + // this reliably regardless of how quickly the cycle happens. + if (getBotUpdateAt) { + // Use a recursive timeout so only one REST poll can be in flight at a time. + void runHealthCheck(); + } }); ws.on("message", async (data) => { @@ -200,6 +285,7 @@ export function createMattermostConnectOnce( }); ws.on("close", (code, reason) => { + stopHealthChecks(); const message = reasonToString(reason); opts.statusSink?.({ connected: false, diff --git a/extensions/mattermost/src/mattermost/monitor.ts b/extensions/mattermost/src/mattermost/monitor.ts index bbf26a7555b..6d0b3b9b685 100644 --- a/extensions/mattermost/src/mattermost/monitor.ts +++ b/extensions/mattermost/src/mattermost/monitor.ts @@ -275,7 +275,33 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} botToken, allowPrivateNetwork: account.config?.allowPrivateNetwork === true, }); - const botUser = await fetchMattermostMe(client); + + // Wait for the Mattermost API to accept our bot token before proceeding. + // When a bot account is disabled and re-enabled, the session is invalidated + // and API calls return 401 until the account is fully active again. Retrying + // here (with exponential backoff) keeps the monitor alive and prevents the + // framework's auto-restart budget from being exhausted. + let botUser!: MattermostUser; + await runWithReconnect( + async () => { + botUser = await fetchMattermostMe(client); + }, + { + abortSignal: opts.abortSignal, + jitterRatio: 0.2, + shouldReconnect: ({ outcome }) => outcome === "rejected", + onError: (err) => { + runtime.error?.(`mattermost: API auth failed: ${String(err)}`); + opts.statusSink?.({ lastError: String(err), connected: false }); + }, + onReconnect: (delayMs) => { + runtime.log?.(`mattermost: API not accessible, retrying in ${Math.round(delayMs / 1000)}s`); + }, + }, + ); + if (opts.abortSignal?.aborted) { + return; + } const botUserId = botUser.id; const botUsername = botUser.username?.trim() || undefined; runtime.log?.(`mattermost connected as ${botUsername ? `@${botUsername}` : botUserId}`); @@ -1645,6 +1671,10 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} runtime, webSocketFactory: opts.webSocketFactory, nextSeq: () => seq++, + getBotUpdateAt: async () => { + const me = await fetchMattermostMe(client); + return me.update_at ?? 0; + }, onPosted: async (post, payload) => { await debouncer.enqueue({ post, payload }); }, diff --git a/src/agents/skills-install.ts b/src/agents/skills-install.ts index b925f9d3a9b..f1f7d8a4480 100644 --- a/src/agents/skills-install.ts +++ b/src/agents/skills-install.ts @@ -17,6 +17,7 @@ import { type SkillInstallSpec, type SkillsInstallPreferences, } from "./skills.js"; +import { resolveSkillSource } from "./skills/source.js"; export type SkillInstallRequest = { workspaceDir: string; @@ -503,7 +504,7 @@ export async function installSkill(params: SkillInstallRequest): Promise isConfigPathTruthy(config, pathStr); - const skillSource = entry.skill.sourceInfo?.source?.trim() || "unknown"; + const skillSource = resolveSkillSource(entry.skill); const bundled = skillSource === "openclaw-bundled" || (skillSource === "unknown" && bundledNames?.has(entry.skill.name) === true); diff --git a/src/agents/skills.test-helpers.ts b/src/agents/skills.test-helpers.ts index 5450f932208..e0cae977466 100644 --- a/src/agents/skills.test-helpers.ts +++ b/src/agents/skills.test-helpers.ts @@ -1,6 +1,6 @@ import fs from "node:fs/promises"; import path from "node:path"; -import { createSyntheticSourceInfo, type Skill } from "@mariozechner/pi-coding-agent"; +import type { Skill } from "@mariozechner/pi-coding-agent"; export async function writeSkill(params: { dir: string; @@ -31,17 +31,12 @@ export function createCanonicalFixtureSkill(params: { source: string; disableModelInvocation?: boolean; }): Skill { - // Keep skill fixtures on the upstream canonical provenance shape. return { name: params.name, description: params.description, filePath: params.filePath, baseDir: params.baseDir, source: params.source, - sourceInfo: createSyntheticSourceInfo(params.filePath, { - source: params.source, - baseDir: params.baseDir, - }), disableModelInvocation: params.disableModelInvocation ?? false, }; } diff --git a/src/agents/skills/compact-format.test.ts b/src/agents/skills/compact-format.test.ts index 8a7fca7b0b0..788256d1168 100644 --- a/src/agents/skills/compact-format.test.ts +++ b/src/agents/skills/compact-format.test.ts @@ -1,11 +1,8 @@ import os from "node:os"; -import { - createSyntheticSourceInfo, - formatSkillsForPrompt, - type Skill, -} from "@mariozechner/pi-coding-agent"; +import { formatSkillsForPrompt, type Skill } from "@mariozechner/pi-coding-agent"; import { describe, expect, it } from "vitest"; import type { OpenClawConfig } from "../../config/config.js"; +import { createCanonicalFixtureSkill } from "../skills.test-helpers.js"; import type { SkillEntry } from "./types.js"; import { formatSkillsCompact, @@ -14,18 +11,13 @@ import { } from "./workspace.js"; function makeSkill(name: string, desc = "A skill", filePath = `/skills/${name}/SKILL.md`): Skill { - return { + return createCanonicalFixtureSkill({ name, description: desc, filePath, baseDir: `/skills/${name}`, source: "workspace", - sourceInfo: createSyntheticSourceInfo(filePath, { - source: "workspace", - baseDir: `/skills/${name}`, - }), - disableModelInvocation: false, - }; + }); } function makeEntry(skill: Skill): SkillEntry { diff --git a/src/agents/skills/config.ts b/src/agents/skills/config.ts index 0392271264f..fd2e0644bfc 100644 --- a/src/agents/skills/config.ts +++ b/src/agents/skills/config.ts @@ -8,6 +8,7 @@ import { } from "../../shared/config-eval.js"; import { normalizeStringEntries } from "../../shared/string-normalization.js"; import { resolveSkillKey } from "./frontmatter.js"; +import { resolveSkillSource } from "./source.js"; import type { SkillEligibilityContext, SkillEntry } from "./types.js"; const DEFAULT_CONFIG_VALUES: Record = { @@ -50,7 +51,7 @@ function normalizeAllowlist(input: unknown): string[] | undefined { const BUNDLED_SOURCES = new Set(["openclaw-bundled"]); function isBundledSkill(entry: SkillEntry): boolean { - return BUNDLED_SOURCES.has(entry.skill.sourceInfo?.source ?? ""); + return BUNDLED_SOURCES.has(resolveSkillSource(entry.skill)); } export function resolveBundledAllowlist(config?: OpenClawConfig): string[] | undefined { diff --git a/src/agents/skills/source.ts b/src/agents/skills/source.ts new file mode 100644 index 00000000000..17019a6ce07 --- /dev/null +++ b/src/agents/skills/source.ts @@ -0,0 +1,18 @@ +import type { Skill } from "@mariozechner/pi-coding-agent"; + +type SkillSourceCompat = Skill & { + sourceInfo?: { + source?: string; + }; +}; + +export function resolveSkillSource(skill: Skill): string { + const compatSkill = skill as SkillSourceCompat; + const canonical = typeof compatSkill.source === "string" ? compatSkill.source.trim() : ""; + if (canonical) { + return canonical; + } + const legacy = + typeof compatSkill.sourceInfo?.source === "string" ? compatSkill.sourceInfo.source.trim() : ""; + return legacy || "unknown"; +} diff --git a/src/commands/channels.mock-harness.ts b/src/commands/channels.mock-harness.ts index 368604e4fef..8b4eae29522 100644 --- a/src/commands/channels.mock-harness.ts +++ b/src/commands/channels.mock-harness.ts @@ -40,7 +40,7 @@ vi.mock("../config/config.js", async (importOriginal) => { vi.mock( buildBundledPluginModuleId("telegram", "update-offset-runtime-api.js"), async (importOriginal) => { - const actual = (await importOriginal()) as Record; + const actual: Record = await importOriginal(); return { ...actual, deleteTelegramUpdateOffset: offsetMocks.deleteTelegramUpdateOffset, diff --git a/src/security/audit-extra.async.ts b/src/security/audit-extra.async.ts index 026c5ad66a6..1b81411d4e1 100644 --- a/src/security/audit-extra.async.ts +++ b/src/security/audit-extra.async.ts @@ -11,6 +11,7 @@ import { SANDBOX_BROWSER_SECURITY_HASH_EPOCH } from "../agents/sandbox/constants import { execDockerRaw, type ExecDockerRawResult } from "../agents/sandbox/docker.js"; import { resolveSandboxToolPolicyForAgent } from "../agents/sandbox/tool-policy.js"; import type { SandboxToolPolicy } from "../agents/sandbox/types.js"; +import { resolveSkillSource } from "../agents/skills/source.js"; import { isToolAllowedByPolicies } from "../agents/tool-policy-match.js"; import { resolveToolProfilePolicy } from "../agents/tool-policy.js"; import { listAgentWorkspaceDirs } from "../agents/workspace-dirs.js"; @@ -1261,7 +1262,7 @@ export async function collectInstalledSkillsCodeSafetyFindings(params: { for (const workspaceDir of workspaceDirs) { const entries = loadWorkspaceSkillEntries(workspaceDir, { config: params.cfg }); for (const entry of entries) { - if (entry.skill.sourceInfo?.source === "openclaw-bundled") { + if (resolveSkillSource(entry.skill) === "openclaw-bundled") { continue; } diff --git a/test/test-env.ts b/test/test-env.ts index ad01e5d3870..e247f396772 100644 --- a/test/test-env.ts +++ b/test/test-env.ts @@ -227,12 +227,12 @@ function copyFileIfExists(sourcePath: string, targetPath: string): void { function sanitizeLiveConfig(raw: string): string { try { - const parsed = JSON5.parse(raw) as { + const parsed: { agents?: { defaults?: Record; list?: Array>; }; - }; + } = JSON5.parse(raw); if (!parsed || typeof parsed !== "object") { return raw; }