mirror of https://github.com/openclaw/openclaw.git
refactor(whatsapp): centralize web monitor state
This commit is contained in:
parent
cfd9242e5d
commit
66743b84fa
|
|
@ -0,0 +1,94 @@
|
|||
import { createConnectedChannelStatusPatch } from "openclaw/plugin-sdk/gateway-runtime";
|
||||
import type { WebChannelHealthState, WebChannelStatus } from "./types.js";
|
||||
|
||||
function cloneStatus(status: WebChannelStatus): WebChannelStatus {
|
||||
return {
|
||||
...status,
|
||||
lastDisconnect: status.lastDisconnect ? { ...status.lastDisconnect } : null,
|
||||
};
|
||||
}
|
||||
|
||||
function isTerminalHealthState(healthState: WebChannelHealthState | undefined): boolean {
|
||||
return healthState === "conflict" || healthState === "logged-out" || healthState === "stopped";
|
||||
}
|
||||
|
||||
export function createWebChannelStatusController(statusSink?: (status: WebChannelStatus) => void) {
|
||||
const status: WebChannelStatus = {
|
||||
running: true,
|
||||
connected: false,
|
||||
reconnectAttempts: 0,
|
||||
lastConnectedAt: null,
|
||||
lastDisconnect: null,
|
||||
lastInboundAt: null,
|
||||
lastMessageAt: null,
|
||||
lastEventAt: null,
|
||||
lastError: null,
|
||||
healthState: "starting",
|
||||
};
|
||||
|
||||
const emit = () => {
|
||||
statusSink?.(cloneStatus(status));
|
||||
};
|
||||
|
||||
return {
|
||||
emit,
|
||||
snapshot: () => status,
|
||||
noteConnected(at = Date.now()) {
|
||||
Object.assign(status, createConnectedChannelStatusPatch(at));
|
||||
status.lastError = null;
|
||||
status.healthState = "healthy";
|
||||
emit();
|
||||
},
|
||||
noteInbound(at = Date.now()) {
|
||||
status.lastInboundAt = at;
|
||||
status.lastMessageAt = at;
|
||||
status.lastEventAt = at;
|
||||
if (status.connected) {
|
||||
status.healthState = "healthy";
|
||||
}
|
||||
emit();
|
||||
},
|
||||
noteWatchdogStale(at = Date.now()) {
|
||||
status.lastEventAt = at;
|
||||
if (status.connected) {
|
||||
status.healthState = "stale";
|
||||
}
|
||||
emit();
|
||||
},
|
||||
noteReconnectAttempts(reconnectAttempts: number) {
|
||||
status.reconnectAttempts = reconnectAttempts;
|
||||
emit();
|
||||
},
|
||||
noteClose(params: {
|
||||
at?: number;
|
||||
statusCode?: number;
|
||||
loggedOut?: boolean;
|
||||
error?: string;
|
||||
reconnectAttempts: number;
|
||||
healthState: WebChannelHealthState;
|
||||
}) {
|
||||
const at = params.at ?? Date.now();
|
||||
status.connected = false;
|
||||
status.lastEventAt = at;
|
||||
status.lastDisconnect = {
|
||||
at,
|
||||
status: params.statusCode,
|
||||
error: params.error,
|
||||
loggedOut: Boolean(params.loggedOut),
|
||||
};
|
||||
status.lastError = params.error ?? null;
|
||||
status.reconnectAttempts = params.reconnectAttempts;
|
||||
status.healthState = params.healthState;
|
||||
emit();
|
||||
},
|
||||
markStopped(at = Date.now()) {
|
||||
status.running = false;
|
||||
status.connected = false;
|
||||
status.lastEventAt = at;
|
||||
if (!isTerminalHealthState(status.healthState)) {
|
||||
status.healthState = "stopped";
|
||||
}
|
||||
emit();
|
||||
},
|
||||
};
|
||||
}
|
||||
|
|
@ -3,7 +3,6 @@ import { formatCliCommand } from "openclaw/plugin-sdk/cli-runtime";
|
|||
import { waitForever } from "openclaw/plugin-sdk/cli-runtime";
|
||||
import { hasControlCommand } from "openclaw/plugin-sdk/command-auth";
|
||||
import { loadConfig } from "openclaw/plugin-sdk/config-runtime";
|
||||
import { createConnectedChannelStatusPatch } from "openclaw/plugin-sdk/gateway-runtime";
|
||||
import { formatDurationPrecise } from "openclaw/plugin-sdk/infra-runtime";
|
||||
import { enqueueSystemEvent } from "openclaw/plugin-sdk/infra-runtime";
|
||||
import { DEFAULT_GROUP_HISTORY_LIMIT } from "openclaw/plugin-sdk/reply-history";
|
||||
|
|
@ -26,9 +25,10 @@ import {
|
|||
import { formatError, getWebAuthAgeMs, readWebSelfId } from "../session.js";
|
||||
import { whatsappHeartbeatLog, whatsappLog } from "./loggers.js";
|
||||
import { buildMentionConfig } from "./mentions.js";
|
||||
import { createWebChannelStatusController } from "./monitor-state.js";
|
||||
import { createEchoTracker } from "./monitor/echo.js";
|
||||
import { createWebOnMessageHandler } from "./monitor/on-message.js";
|
||||
import type { WebChannelStatus, WebInboundMsg, WebMonitorTuning } from "./types.js";
|
||||
import type { WebInboundMsg, WebMonitorTuning } from "./types.js";
|
||||
import { isLikelyWhatsAppCryptoError } from "./util.js";
|
||||
|
||||
function isNonRetryableWebCloseStatus(statusCode: unknown): boolean {
|
||||
|
|
@ -37,6 +37,30 @@ function isNonRetryableWebCloseStatus(statusCode: unknown): boolean {
|
|||
return statusCode === 440;
|
||||
}
|
||||
|
||||
type ActiveConnectionRun = {
|
||||
connectionId: string;
|
||||
startedAt: number;
|
||||
heartbeat: NodeJS.Timeout | null;
|
||||
watchdogTimer: NodeJS.Timeout | null;
|
||||
lastInboundAt: number | null;
|
||||
handledMessages: number;
|
||||
unregisterUnhandled: (() => void) | null;
|
||||
backgroundTasks: Set<Promise<unknown>>;
|
||||
};
|
||||
|
||||
function createActiveConnectionRun(lastInboundAt: number | null): ActiveConnectionRun {
|
||||
return {
|
||||
connectionId: newConnectionId(),
|
||||
startedAt: Date.now(),
|
||||
heartbeat: null,
|
||||
watchdogTimer: null,
|
||||
lastInboundAt,
|
||||
handledMessages: 0,
|
||||
unregisterUnhandled: null,
|
||||
backgroundTasks: new Set<Promise<unknown>>(),
|
||||
};
|
||||
}
|
||||
|
||||
export async function monitorWebChannel(
|
||||
verbose: boolean,
|
||||
listenerFactory: typeof monitorWebInbox | undefined = monitorWebInbox,
|
||||
|
|
@ -50,23 +74,9 @@ export async function monitorWebChannel(
|
|||
const replyLogger = getChildLogger({ module: "web-auto-reply", runId });
|
||||
const heartbeatLogger = getChildLogger({ module: "web-heartbeat", runId });
|
||||
const reconnectLogger = getChildLogger({ module: "web-reconnect", runId });
|
||||
const status: WebChannelStatus = {
|
||||
running: true,
|
||||
connected: false,
|
||||
reconnectAttempts: 0,
|
||||
lastConnectedAt: null,
|
||||
lastDisconnect: null,
|
||||
lastMessageAt: null,
|
||||
lastEventAt: null,
|
||||
lastError: null,
|
||||
};
|
||||
const emitStatus = () => {
|
||||
tuning.statusSink?.({
|
||||
...status,
|
||||
lastDisconnect: status.lastDisconnect ? { ...status.lastDisconnect } : null,
|
||||
});
|
||||
};
|
||||
emitStatus();
|
||||
const statusController = createWebChannelStatusController(tuning.statusSink);
|
||||
const status = statusController.snapshot();
|
||||
statusController.emit();
|
||||
|
||||
const baseCfg = loadConfig();
|
||||
const account = resolveWhatsAppAccount({
|
||||
|
|
@ -147,34 +157,23 @@ export async function monitorWebChannel(
|
|||
break;
|
||||
}
|
||||
|
||||
const connectionId = newConnectionId();
|
||||
const startedAt = Date.now();
|
||||
let heartbeat: NodeJS.Timeout | null = null;
|
||||
let watchdogTimer: NodeJS.Timeout | null = null;
|
||||
// Preserve the last known inbound timestamp across reconnects so the watchdog
|
||||
// can still detect a listener that comes back "connected" but never receives
|
||||
// another message after a restart cycle.
|
||||
let lastMessageAt: number | null = status.lastMessageAt ?? null;
|
||||
let handledMessages = 0;
|
||||
let _lastInboundMsg: WebInboundMsg | null = null;
|
||||
let unregisterUnhandled: (() => void) | null = null;
|
||||
const active = createActiveConnectionRun(status.lastInboundAt ?? status.lastMessageAt ?? null);
|
||||
|
||||
// Watchdog to detect stuck message processing (e.g., event emitter died).
|
||||
// Tuning overrides are test-oriented; production defaults remain unchanged.
|
||||
const MESSAGE_TIMEOUT_MS = tuning.messageTimeoutMs ?? 30 * 60 * 1000; // 30m default
|
||||
const WATCHDOG_CHECK_MS = tuning.watchdogCheckMs ?? 60 * 1000; // 1m default
|
||||
|
||||
const backgroundTasks = new Set<Promise<unknown>>();
|
||||
const onMessage = createWebOnMessageHandler({
|
||||
cfg,
|
||||
verbose,
|
||||
connectionId,
|
||||
connectionId: active.connectionId,
|
||||
maxMediaBytes,
|
||||
groupHistoryLimit,
|
||||
groupHistories,
|
||||
groupMemberNames,
|
||||
echoTracker,
|
||||
backgroundTasks,
|
||||
backgroundTasks: active.backgroundTasks,
|
||||
replyResolver: replyResolver ?? getReplyFromConfig,
|
||||
replyLogger,
|
||||
baseMentionConfig,
|
||||
|
|
@ -204,19 +203,14 @@ export async function monitorWebChannel(
|
|||
debounceMs: inboundDebounceMs,
|
||||
shouldDebounce,
|
||||
onMessage: async (msg: WebInboundMsg) => {
|
||||
handledMessages += 1;
|
||||
lastMessageAt = Date.now();
|
||||
status.lastMessageAt = lastMessageAt;
|
||||
status.lastEventAt = lastMessageAt;
|
||||
emitStatus();
|
||||
_lastInboundMsg = msg;
|
||||
active.handledMessages += 1;
|
||||
active.lastInboundAt = Date.now();
|
||||
statusController.noteInbound(active.lastInboundAt);
|
||||
await onMessage(msg);
|
||||
},
|
||||
});
|
||||
|
||||
Object.assign(status, createConnectedChannelStatusPatch());
|
||||
status.lastError = null;
|
||||
emitStatus();
|
||||
statusController.noteConnected();
|
||||
|
||||
// Surface a concise connection event for the next main-session turn/heartbeat.
|
||||
const { e164: selfE164 } = readWebSelfId(account.authDir);
|
||||
|
|
@ -230,13 +224,13 @@ export async function monitorWebChannel(
|
|||
});
|
||||
|
||||
setActiveWebListener(account.accountId, listener);
|
||||
unregisterUnhandled = registerUnhandledRejectionHandler((reason) => {
|
||||
active.unregisterUnhandled = registerUnhandledRejectionHandler((reason) => {
|
||||
if (!isLikelyWhatsAppCryptoError(reason)) {
|
||||
return false;
|
||||
}
|
||||
const errorStr = formatError(reason);
|
||||
reconnectLogger.warn(
|
||||
{ connectionId, error: errorStr },
|
||||
{ connectionId: active.connectionId, error: errorStr },
|
||||
"web reconnect: unhandled rejection from WhatsApp socket; forcing reconnect",
|
||||
);
|
||||
listener.signalClose?.({
|
||||
|
|
@ -249,19 +243,19 @@ export async function monitorWebChannel(
|
|||
|
||||
const closeListener = async () => {
|
||||
setActiveWebListener(account.accountId, null);
|
||||
if (unregisterUnhandled) {
|
||||
unregisterUnhandled();
|
||||
unregisterUnhandled = null;
|
||||
if (active.unregisterUnhandled) {
|
||||
active.unregisterUnhandled();
|
||||
active.unregisterUnhandled = null;
|
||||
}
|
||||
if (heartbeat) {
|
||||
clearInterval(heartbeat);
|
||||
if (active.heartbeat) {
|
||||
clearInterval(active.heartbeat);
|
||||
}
|
||||
if (watchdogTimer) {
|
||||
clearInterval(watchdogTimer);
|
||||
if (active.watchdogTimer) {
|
||||
clearInterval(active.watchdogTimer);
|
||||
}
|
||||
if (backgroundTasks.size > 0) {
|
||||
await Promise.allSettled(backgroundTasks);
|
||||
backgroundTasks.clear();
|
||||
if (active.backgroundTasks.size > 0) {
|
||||
await Promise.allSettled(active.backgroundTasks);
|
||||
active.backgroundTasks.clear();
|
||||
}
|
||||
try {
|
||||
await listener.close();
|
||||
|
|
@ -271,19 +265,19 @@ export async function monitorWebChannel(
|
|||
};
|
||||
|
||||
if (keepAlive) {
|
||||
heartbeat = setInterval(() => {
|
||||
active.heartbeat = setInterval(() => {
|
||||
const authAgeMs = getWebAuthAgeMs(account.authDir);
|
||||
const minutesSinceLastMessage = lastMessageAt
|
||||
? Math.floor((Date.now() - lastMessageAt) / 60000)
|
||||
const minutesSinceLastMessage = active.lastInboundAt
|
||||
? Math.floor((Date.now() - active.lastInboundAt) / 60000)
|
||||
: null;
|
||||
|
||||
const logData = {
|
||||
connectionId,
|
||||
connectionId: active.connectionId,
|
||||
reconnectAttempts,
|
||||
messagesHandled: handledMessages,
|
||||
lastMessageAt,
|
||||
messagesHandled: active.handledMessages,
|
||||
lastInboundAt: active.lastInboundAt,
|
||||
authAgeMs,
|
||||
uptimeMs: Date.now() - startedAt,
|
||||
uptimeMs: Date.now() - active.startedAt,
|
||||
...(minutesSinceLastMessage !== null && minutesSinceLastMessage > 30
|
||||
? { minutesSinceLastMessage }
|
||||
: {}),
|
||||
|
|
@ -296,21 +290,22 @@ export async function monitorWebChannel(
|
|||
}
|
||||
}, heartbeatSeconds * 1000);
|
||||
|
||||
watchdogTimer = setInterval(() => {
|
||||
if (!lastMessageAt) {
|
||||
active.watchdogTimer = setInterval(() => {
|
||||
if (!active.lastInboundAt) {
|
||||
return;
|
||||
}
|
||||
const timeSinceLastMessage = Date.now() - lastMessageAt;
|
||||
const timeSinceLastMessage = Date.now() - active.lastInboundAt;
|
||||
if (timeSinceLastMessage <= MESSAGE_TIMEOUT_MS) {
|
||||
return;
|
||||
}
|
||||
const minutesSinceLastMessage = Math.floor(timeSinceLastMessage / 60000);
|
||||
statusController.noteWatchdogStale();
|
||||
heartbeatLogger.warn(
|
||||
{
|
||||
connectionId,
|
||||
connectionId: active.connectionId,
|
||||
minutesSinceLastMessage,
|
||||
lastMessageAt: new Date(lastMessageAt),
|
||||
messagesHandled: handledMessages,
|
||||
lastInboundAt: new Date(active.lastInboundAt),
|
||||
messagesHandled: active.handledMessages,
|
||||
},
|
||||
"Message timeout detected - forcing reconnect",
|
||||
);
|
||||
|
|
@ -347,12 +342,11 @@ export async function monitorWebChannel(
|
|||
abortPromise ?? waitForever(),
|
||||
]);
|
||||
|
||||
const uptimeMs = Date.now() - startedAt;
|
||||
const uptimeMs = Date.now() - active.startedAt;
|
||||
if (uptimeMs > heartbeatSeconds * 1000) {
|
||||
reconnectAttempts = 0; // Healthy stretch; reset the backoff.
|
||||
}
|
||||
status.reconnectAttempts = reconnectAttempts;
|
||||
emitStatus();
|
||||
statusController.noteReconnectAttempts(reconnectAttempts);
|
||||
|
||||
if (stopRequested() || sigintStop || reason === "aborted") {
|
||||
await closeListener();
|
||||
|
|
@ -370,21 +364,11 @@ export async function monitorWebChannel(
|
|||
(reason as { isLoggedOut?: boolean }).isLoggedOut;
|
||||
|
||||
const errorStr = formatError(reason);
|
||||
status.connected = false;
|
||||
status.lastEventAt = Date.now();
|
||||
status.lastDisconnect = {
|
||||
at: status.lastEventAt,
|
||||
status: typeof statusCode === "number" ? statusCode : undefined,
|
||||
error: errorStr,
|
||||
loggedOut: Boolean(loggedOut),
|
||||
};
|
||||
status.lastError = errorStr;
|
||||
status.reconnectAttempts = reconnectAttempts;
|
||||
emitStatus();
|
||||
const numericStatusCode = typeof statusCode === "number" ? statusCode : undefined;
|
||||
|
||||
reconnectLogger.info(
|
||||
{
|
||||
connectionId,
|
||||
connectionId: active.connectionId,
|
||||
status: statusCode,
|
||||
loggedOut,
|
||||
reconnectAttempts,
|
||||
|
|
@ -398,6 +382,13 @@ export async function monitorWebChannel(
|
|||
});
|
||||
|
||||
if (loggedOut) {
|
||||
statusController.noteClose({
|
||||
statusCode: numericStatusCode,
|
||||
loggedOut: true,
|
||||
error: errorStr,
|
||||
reconnectAttempts,
|
||||
healthState: "logged-out",
|
||||
});
|
||||
runtime.error(
|
||||
`WhatsApp session logged out. Run \`${formatCliCommand("openclaw channels login --channel web")}\` to relink.`,
|
||||
);
|
||||
|
|
@ -406,9 +397,15 @@ export async function monitorWebChannel(
|
|||
}
|
||||
|
||||
if (isNonRetryableWebCloseStatus(statusCode)) {
|
||||
statusController.noteClose({
|
||||
statusCode: numericStatusCode,
|
||||
error: errorStr,
|
||||
reconnectAttempts,
|
||||
healthState: "conflict",
|
||||
});
|
||||
reconnectLogger.warn(
|
||||
{
|
||||
connectionId,
|
||||
connectionId: active.connectionId,
|
||||
status: statusCode,
|
||||
error: errorStr,
|
||||
},
|
||||
|
|
@ -422,12 +419,16 @@ export async function monitorWebChannel(
|
|||
}
|
||||
|
||||
reconnectAttempts += 1;
|
||||
status.reconnectAttempts = reconnectAttempts;
|
||||
emitStatus();
|
||||
if (reconnectPolicy.maxAttempts > 0 && reconnectAttempts >= reconnectPolicy.maxAttempts) {
|
||||
statusController.noteClose({
|
||||
statusCode: numericStatusCode,
|
||||
error: errorStr,
|
||||
reconnectAttempts,
|
||||
healthState: "stopped",
|
||||
});
|
||||
reconnectLogger.warn(
|
||||
{
|
||||
connectionId,
|
||||
connectionId: active.connectionId,
|
||||
status: statusCode,
|
||||
reconnectAttempts,
|
||||
maxAttempts: reconnectPolicy.maxAttempts,
|
||||
|
|
@ -441,10 +442,16 @@ export async function monitorWebChannel(
|
|||
break;
|
||||
}
|
||||
|
||||
statusController.noteClose({
|
||||
statusCode: numericStatusCode,
|
||||
error: errorStr,
|
||||
reconnectAttempts,
|
||||
healthState: "reconnecting",
|
||||
});
|
||||
const delay = computeBackoff(reconnectPolicy, reconnectAttempts);
|
||||
reconnectLogger.info(
|
||||
{
|
||||
connectionId,
|
||||
connectionId: active.connectionId,
|
||||
status: statusCode,
|
||||
reconnectAttempts,
|
||||
maxAttempts: reconnectPolicy.maxAttempts || "unlimited",
|
||||
|
|
@ -463,10 +470,7 @@ export async function monitorWebChannel(
|
|||
}
|
||||
}
|
||||
|
||||
status.running = false;
|
||||
status.connected = false;
|
||||
status.lastEventAt = Date.now();
|
||||
emitStatus();
|
||||
statusController.markStopped();
|
||||
|
||||
process.removeListener("SIGINT", handleSigint);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,6 +1,15 @@
|
|||
import type { monitorWebInbox } from "../inbound.js";
|
||||
import type { ReconnectPolicy } from "../reconnect.js";
|
||||
|
||||
export type WebChannelHealthState =
|
||||
| "starting"
|
||||
| "healthy"
|
||||
| "stale"
|
||||
| "reconnecting"
|
||||
| "conflict"
|
||||
| "logged-out"
|
||||
| "stopped";
|
||||
|
||||
export type WebInboundMsg = Parameters<typeof monitorWebInbox>[0]["onMessage"] extends (
|
||||
msg: infer M,
|
||||
) => unknown
|
||||
|
|
@ -18,9 +27,11 @@ export type WebChannelStatus = {
|
|||
error?: string;
|
||||
loggedOut?: boolean;
|
||||
} | null;
|
||||
lastInboundAt?: number | null;
|
||||
lastMessageAt?: number | null;
|
||||
lastEventAt?: number | null;
|
||||
lastError?: string | null;
|
||||
healthState?: WebChannelHealthState;
|
||||
};
|
||||
|
||||
export type WebMonitorTuning = {
|
||||
|
|
|
|||
|
|
@ -208,9 +208,11 @@ export const whatsappPlugin: ChannelPlugin<ResolvedWhatsAppAccount> = {
|
|||
reconnectAttempts: 0,
|
||||
lastConnectedAt: null,
|
||||
lastDisconnect: null,
|
||||
lastInboundAt: null,
|
||||
lastMessageAt: null,
|
||||
lastEventAt: null,
|
||||
lastError: null,
|
||||
healthState: "stopped",
|
||||
},
|
||||
collectStatusIssues: collectWhatsAppStatusIssues,
|
||||
buildChannelSummary: async ({ account, snapshot }) => {
|
||||
|
|
@ -237,9 +239,11 @@ export const whatsappPlugin: ChannelPlugin<ResolvedWhatsAppAccount> = {
|
|||
lastConnectedAt: snapshot.lastConnectedAt ?? null,
|
||||
lastDisconnect: snapshot.lastDisconnect ?? null,
|
||||
reconnectAttempts: snapshot.reconnectAttempts,
|
||||
lastInboundAt: snapshot.lastInboundAt ?? snapshot.lastMessageAt ?? null,
|
||||
lastMessageAt: snapshot.lastMessageAt ?? null,
|
||||
lastEventAt: snapshot.lastEventAt ?? null,
|
||||
lastError: snapshot.lastError ?? null,
|
||||
healthState: snapshot.healthState ?? undefined,
|
||||
};
|
||||
},
|
||||
buildAccountSnapshot: async ({ account, runtime }) => {
|
||||
|
|
@ -255,9 +259,11 @@ export const whatsappPlugin: ChannelPlugin<ResolvedWhatsAppAccount> = {
|
|||
reconnectAttempts: runtime?.reconnectAttempts,
|
||||
lastConnectedAt: runtime?.lastConnectedAt ?? null,
|
||||
lastDisconnect: runtime?.lastDisconnect ?? null,
|
||||
lastInboundAt: runtime?.lastInboundAt ?? runtime?.lastMessageAt ?? null,
|
||||
lastMessageAt: runtime?.lastMessageAt ?? null,
|
||||
lastEventAt: runtime?.lastEventAt ?? null,
|
||||
lastError: runtime?.lastError ?? null,
|
||||
healthState: runtime?.healthState ?? undefined,
|
||||
dmPolicy: account.dmPolicy,
|
||||
allowFrom: account.allowFrom,
|
||||
};
|
||||
|
|
|
|||
|
|
@ -0,0 +1,34 @@
|
|||
type Listener = (...args: unknown[]) => void;
|
||||
|
||||
type OffCapableEmitter = {
|
||||
on: (event: string, listener: Listener) => void;
|
||||
off?: (event: string, listener: Listener) => void;
|
||||
removeListener?: (event: string, listener: Listener) => void;
|
||||
};
|
||||
|
||||
type ClosableSocket = {
|
||||
ws?: {
|
||||
close?: () => void;
|
||||
};
|
||||
};
|
||||
|
||||
export function attachEmitterListener(
|
||||
emitter: OffCapableEmitter,
|
||||
event: string,
|
||||
listener: Listener,
|
||||
): () => void {
|
||||
emitter.on(event, listener);
|
||||
return () => {
|
||||
if (typeof emitter.off === "function") {
|
||||
emitter.off(event, listener);
|
||||
return;
|
||||
}
|
||||
if (typeof emitter.removeListener === "function") {
|
||||
emitter.removeListener(event, listener);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
export function closeInboundMonitorSocket(sock: ClosableSocket): void {
|
||||
sock.ws?.close?.();
|
||||
}
|
||||
|
|
@ -17,6 +17,7 @@ import {
|
|||
extractMentionedJids,
|
||||
extractText,
|
||||
} from "./extract.js";
|
||||
import { attachEmitterListener, closeInboundMonitorSocket } from "./lifecycle.js";
|
||||
import { downloadInboundMedia } from "./media.js";
|
||||
import { createWebSendApi } from "./send-api.js";
|
||||
import type { WebInboundMessage, WebListenerCloseReason } from "./types.js";
|
||||
|
|
@ -429,8 +430,6 @@ export async function monitorWebInbox(options: {
|
|||
await enqueueInboundMessage(msg, inbound, enriched);
|
||||
}
|
||||
};
|
||||
sock.ev.on("messages.upsert", handleMessagesUpsert);
|
||||
|
||||
const handleConnectionUpdate = (
|
||||
update: Partial<import("@whiskeysockets/baileys").ConnectionState>,
|
||||
) => {
|
||||
|
|
@ -448,7 +447,24 @@ export async function monitorWebInbox(options: {
|
|||
resolveClose({ status: undefined, isLoggedOut: false, error: err });
|
||||
}
|
||||
};
|
||||
sock.ev.on("connection.update", handleConnectionUpdate);
|
||||
const detachMessagesUpsert = attachEmitterListener(
|
||||
sock.ev as unknown as {
|
||||
on: (event: string, listener: (...args: unknown[]) => void) => void;
|
||||
off?: (event: string, listener: (...args: unknown[]) => void) => void;
|
||||
removeListener?: (event: string, listener: (...args: unknown[]) => void) => void;
|
||||
},
|
||||
"messages.upsert",
|
||||
handleMessagesUpsert as unknown as (...args: unknown[]) => void,
|
||||
);
|
||||
const detachConnectionUpdate = attachEmitterListener(
|
||||
sock.ev as unknown as {
|
||||
on: (event: string, listener: (...args: unknown[]) => void) => void;
|
||||
off?: (event: string, listener: (...args: unknown[]) => void) => void;
|
||||
removeListener?: (event: string, listener: (...args: unknown[]) => void) => void;
|
||||
},
|
||||
"connection.update",
|
||||
handleConnectionUpdate as unknown as (...args: unknown[]) => void,
|
||||
);
|
||||
|
||||
const sendApi = createWebSendApi({
|
||||
sock: {
|
||||
|
|
@ -461,24 +477,9 @@ export async function monitorWebInbox(options: {
|
|||
return {
|
||||
close: async () => {
|
||||
try {
|
||||
const ev = sock.ev as unknown as {
|
||||
off?: (event: string, listener: (...args: unknown[]) => void) => void;
|
||||
removeListener?: (event: string, listener: (...args: unknown[]) => void) => void;
|
||||
};
|
||||
const messagesUpsertHandler = handleMessagesUpsert as unknown as (
|
||||
...args: unknown[]
|
||||
) => void;
|
||||
const connectionUpdateHandler = handleConnectionUpdate as unknown as (
|
||||
...args: unknown[]
|
||||
) => void;
|
||||
if (typeof ev.off === "function") {
|
||||
ev.off("messages.upsert", messagesUpsertHandler);
|
||||
ev.off("connection.update", connectionUpdateHandler);
|
||||
} else if (typeof ev.removeListener === "function") {
|
||||
ev.removeListener("messages.upsert", messagesUpsertHandler);
|
||||
ev.removeListener("connection.update", connectionUpdateHandler);
|
||||
}
|
||||
sock.ws?.close();
|
||||
detachMessagesUpsert();
|
||||
detachConnectionUpdate();
|
||||
closeInboundMonitorSocket(sock);
|
||||
} catch (err) {
|
||||
logVerbose(`Socket close failed: ${String(err)}`);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -16,7 +16,9 @@ type WhatsAppAccountStatus = {
|
|||
connected?: unknown;
|
||||
running?: unknown;
|
||||
reconnectAttempts?: unknown;
|
||||
lastInboundAt?: unknown;
|
||||
lastError?: unknown;
|
||||
healthState?: unknown;
|
||||
};
|
||||
|
||||
function readWhatsAppAccountStatus(value: ChannelAccountSnapshot): WhatsAppAccountStatus | null {
|
||||
|
|
@ -30,7 +32,9 @@ function readWhatsAppAccountStatus(value: ChannelAccountSnapshot): WhatsAppAccou
|
|||
connected: value.connected,
|
||||
running: value.running,
|
||||
reconnectAttempts: value.reconnectAttempts,
|
||||
lastInboundAt: value.lastInboundAt,
|
||||
lastError: value.lastError,
|
||||
healthState: value.healthState,
|
||||
};
|
||||
}
|
||||
|
||||
|
|
@ -46,7 +50,10 @@ export function collectWhatsAppStatusIssues(
|
|||
const connected = account.connected === true;
|
||||
const reconnectAttempts =
|
||||
typeof account.reconnectAttempts === "number" ? account.reconnectAttempts : null;
|
||||
const lastInboundAt =
|
||||
typeof account.lastInboundAt === "number" ? account.lastInboundAt : null;
|
||||
const lastError = asString(account.lastError);
|
||||
const healthState = asString(account.healthState);
|
||||
|
||||
if (!linked) {
|
||||
issues.push({
|
||||
|
|
@ -59,6 +66,53 @@ export function collectWhatsAppStatusIssues(
|
|||
return;
|
||||
}
|
||||
|
||||
if (healthState === "stale") {
|
||||
const staleSuffix =
|
||||
lastInboundAt != null
|
||||
? ` (last inbound ${Math.max(0, Math.floor((Date.now() - lastInboundAt) / 60000))}m ago)`
|
||||
: "";
|
||||
issues.push({
|
||||
channel: "whatsapp",
|
||||
accountId,
|
||||
kind: "runtime",
|
||||
message: `Linked but stale${staleSuffix}${lastError ? `: ${lastError}` : "."}`,
|
||||
fix: `Run: ${formatCliCommand("openclaw doctor")} (or restart the gateway). If it persists, relink via channels login and check logs.`,
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
if (
|
||||
healthState === "reconnecting" ||
|
||||
healthState === "conflict" ||
|
||||
healthState === "stopped"
|
||||
) {
|
||||
const stateLabel =
|
||||
healthState === "conflict"
|
||||
? "session conflict"
|
||||
: healthState === "reconnecting"
|
||||
? "reconnecting"
|
||||
: "stopped";
|
||||
issues.push({
|
||||
channel: "whatsapp",
|
||||
accountId,
|
||||
kind: "runtime",
|
||||
message: `Linked but ${stateLabel}${reconnectAttempts != null ? ` (reconnectAttempts=${reconnectAttempts})` : ""}${lastError ? `: ${lastError}` : "."}`,
|
||||
fix: `Run: ${formatCliCommand("openclaw doctor")} (or restart the gateway). If it persists, relink via channels login and check logs.`,
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
if (healthState === "logged-out") {
|
||||
issues.push({
|
||||
channel: "whatsapp",
|
||||
accountId,
|
||||
kind: "auth",
|
||||
message: `Linked session logged out${lastError ? `: ${lastError}` : "."}`,
|
||||
fix: `Run: ${formatCliCommand("openclaw channels login")} (scan QR on the gateway host).`,
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
if (running && !connected) {
|
||||
issues.push({
|
||||
channel: "whatsapp",
|
||||
|
|
|
|||
|
|
@ -195,6 +195,12 @@ export function projectSafeChannelAccountSnapshotFields(
|
|||
...(readNumber(record, "reconnectAttempts") !== undefined
|
||||
? { reconnectAttempts: readNumber(record, "reconnectAttempts") }
|
||||
: {}),
|
||||
...(readNumber(record, "lastInboundAt") !== undefined
|
||||
? { lastInboundAt: readNumber(record, "lastInboundAt") }
|
||||
: {}),
|
||||
...(readTrimmedString(record, "healthState")
|
||||
? { healthState: readTrimmedString(record, "healthState") }
|
||||
: {}),
|
||||
...(readTrimmedString(record, "mode") ? { mode: readTrimmedString(record, "mode") } : {}),
|
||||
...(readTrimmedString(record, "dmPolicy")
|
||||
? { dmPolicy: readTrimmedString(record, "dmPolicy") }
|
||||
|
|
|
|||
|
|
@ -164,6 +164,7 @@ export type ChannelAccountSnapshot = {
|
|||
lastMessageAt?: number | null;
|
||||
lastEventAt?: number | null;
|
||||
lastError?: string | null;
|
||||
healthState?: string;
|
||||
lastStartAt?: number | null;
|
||||
lastStopAt?: number | null;
|
||||
lastInboundAt?: number | null;
|
||||
|
|
|
|||
|
|
@ -137,6 +137,7 @@ export const ChannelAccountSnapshotSchema = Type.Object(
|
|||
reconnectAttempts: Type.Optional(Type.Integer({ minimum: 0 })),
|
||||
lastConnectedAt: Type.Optional(Type.Integer({ minimum: 0 })),
|
||||
lastError: Type.Optional(Type.String()),
|
||||
healthState: Type.Optional(Type.String()),
|
||||
lastStartAt: Type.Optional(Type.Integer({ minimum: 0 })),
|
||||
lastStopAt: Type.Optional(Type.Integer({ minimum: 0 })),
|
||||
lastInboundAt: Type.Optional(Type.Integer({ minimum: 0 })),
|
||||
|
|
|
|||
Loading…
Reference in New Issue