mirror of https://github.com/openclaw/openclaw.git
486 lines
16 KiB
TypeScript
486 lines
16 KiB
TypeScript
import { resolveChannelDefaultAccountId } from "../channels/plugins/helpers.js";
|
|
import { type ChannelId, getChannelPlugin, listChannelPlugins } from "../channels/plugins/index.js";
|
|
import type { ChannelAccountSnapshot } from "../channels/plugins/types.js";
|
|
import type { OpenClawConfig } from "../config/config.js";
|
|
import { type BackoffPolicy, computeBackoff, sleepWithAbort } from "../infra/backoff.js";
|
|
import { formatErrorMessage } from "../infra/errors.js";
|
|
import { resetDirectoryCache } from "../infra/outbound/target-resolver.js";
|
|
import type { createSubsystemLogger } from "../logging/subsystem.js";
|
|
import type { PluginRuntime } from "../plugins/runtime/types.js";
|
|
import { DEFAULT_ACCOUNT_ID } from "../routing/session-key.js";
|
|
import type { RuntimeEnv } from "../runtime.js";
|
|
|
|
/**
 * Backoff settings handed to `computeBackoff` when an exited channel account
 * is scheduled for an automatic restart.
 */
const CHANNEL_RESTART_POLICY: BackoffPolicy = {
  initialMs: 5 * 1_000,
  maxMs: 5 * 60 * 1_000,
  factor: 2,
  jitter: 0.1,
};

/** Auto-restart gives up once an account's attempt counter exceeds this value. */
const MAX_RESTART_ATTEMPTS = 10;
|
|
|
|
/** Status view returned by `ChannelManager.getRuntimeSnapshot`. */
export type ChannelRuntimeSnapshot = {
  /** Per channel: the snapshot of that channel's default account. */
  channels: Partial<Record<ChannelId, ChannelAccountSnapshot>>;
  /** Per channel: snapshots of its accounts, keyed by account id. */
  channelAccounts: Partial<Record<ChannelId, Record<string, ChannelAccountSnapshot>>>;
};
|
|
|
|
/** Logger type produced by `createSubsystemLogger` (type-only import). */
type SubsystemLogger = ReturnType<typeof createSubsystemLogger>;

/** Per-channel bookkeeping; every map is keyed by account id. */
type ChannelRuntimeStore = {
  /** Abort controller registered when an account's gateway task is started. */
  aborts: Map<string, AbortController>;
  /** Tracked promise of the account's gateway task, including its restart handling. */
  tasks: Map<string, Promise<unknown>>;
  /** Most recently recorded status snapshot for the account. */
  runtimes: Map<string, ChannelAccountSnapshot>;
};
|
|
|
|
/** Minimal config shape read when looking up a `healthMonitor.enabled` flag. */
type RawHealthMonitorEntry = {
  healthMonitor?: {
    enabled?: boolean;
  };
};

/** Channel-level health-monitor entry plus optional per-account entries keyed by account id. */
type RawChannelConfig = RawHealthMonitorEntry & {
  accounts?: Record<string, RawHealthMonitorEntry | undefined>;
};
|
|
|
|
/** Builds an empty per-channel store with fresh, unshared maps. */
function createRuntimeStore(): ChannelRuntimeStore {
  const aborts = new Map<string, AbortController>();
  const tasks = new Map<string, Promise<unknown>>();
  const runtimes = new Map<string, ChannelAccountSnapshot>();
  return { aborts, tasks, runtimes };
}
|
|
|
|
/**
 * Fallback enabled check used when a plugin supplies no `isEnabled` hook.
 *
 * @param account Resolved account value of unknown shape.
 * @returns `false` only for an object whose `enabled` property is exactly
 *   `false`; any other value (including `null`, `undefined` and non-objects)
 *   counts as enabled.
 */
function isAccountEnabled(account: unknown): boolean {
  if (typeof account === "object" && account !== null) {
    return (account as { enabled?: boolean }).enabled !== false;
  }
  return true;
}
|
|
|
|
/**
 * Returns the plugin's `status.defaultRuntime` snapshot, or a snapshot that
 * carries only `DEFAULT_ACCOUNT_ID` when the plugin (or that field) is absent.
 */
function resolveDefaultRuntime(channelId: ChannelId): ChannelAccountSnapshot {
  const declared = getChannelPlugin(channelId)?.status?.defaultRuntime;
  if (declared !== undefined && declared !== null) {
    return declared;
  }
  return { accountId: DEFAULT_ACCOUNT_ID };
}
|
|
|
|
/** Shallow-copies the channel's default runtime snapshot and stamps it with `accountId`. */
function cloneDefaultRuntime(channelId: ChannelId, accountId: string): ChannelAccountSnapshot {
  const template = resolveDefaultRuntime(channelId);
  return { ...template, accountId };
}
|
|
|
|
/** Dependencies injected into `createChannelManager`. */
type ChannelManagerOptions = {
  /** Supplies the configuration; the manager calls it whenever an operation needs config. */
  loadConfig: () => OpenClawConfig;
  /** Logger per channel id, forwarded to the plugin gateway hooks as `log`. */
  channelLogs: Record<ChannelId, SubsystemLogger>;
  /** Runtime environment per channel id, forwarded to the plugin gateway hooks as `runtime`. */
  channelRuntimeEnvs: Record<ChannelId, RuntimeEnv>;
  /**
   * Channel runtime helpers for external channel plugins (optional).
   *
   * If supplied, the value is handed to channel plugins through the
   * `channelRuntime` field of `ChannelGatewayContext`, which lets external
   * plugins reach advanced Plugin SDK features such as AI dispatch, routing
   * and text processing.
   *
   * Built-in channels (slack, discord, telegram) usually have no need for it,
   * since they can import internal monorepo modules directly.
   *
   * Leaving the field out keeps existing channels working unchanged.
   *
   * @example
   * ```typescript
   * import { createPluginRuntime } from "../plugins/runtime/index.js";
   *
   * const channelManager = createChannelManager({
   *   loadConfig,
   *   channelLogs,
   *   channelRuntimeEnvs,
   *   channelRuntime: createPluginRuntime().channel,
   * });
   * ```
   *
   * @since Plugin SDK 2026.2.19
   * @see {@link ChannelGatewayContext.channelRuntime}
   */
  channelRuntime?: PluginRuntime["channel"];
};
|
|
|
|
/** Internal switches for starting an account; both default to `false`. */
type StartChannelOptions = {
  /** Keep the account's restart-attempt counter instead of clearing it on start. */
  preserveRestartAttempts?: boolean;
  /** Keep the account's manually-stopped mark instead of clearing it on start. */
  preserveManualStop?: boolean;
};
|
|
|
|
/** Operations exposed by the object that `createChannelManager` returns. */
export type ChannelManager = {
  /** Builds a status snapshot for the configured accounts of every listed channel plugin. */
  getRuntimeSnapshot: () => ChannelRuntimeSnapshot;
  /** Starts every listed channel plugin, one after another. */
  startChannels: () => Promise<void>;
  /** Starts one account of a channel, or all of its accounts when `accountId` is omitted. */
  startChannel: (channel: ChannelId, accountId?: string) => Promise<void>;
  /** Stops one account of a channel, or all known accounts when `accountId` is omitted. */
  stopChannel: (channel: ChannelId, accountId?: string) => Promise<void>;
  /** Records an account as not running; `cleared` sets its last error to "logged out". */
  markChannelLoggedOut: (channelId: ChannelId, cleared: boolean, accountId?: string) => void;
  /** Whether the account is currently marked as stopped on request. */
  isManuallyStopped: (channelId: ChannelId, accountId: string) => boolean;
  /** Clears the account's auto-restart attempt counter. */
  resetRestartAttempts: (channelId: ChannelId, accountId: string) => void;
  /** Whether the health monitor is enabled for the account according to config. */
  isHealthMonitorEnabled: (channelId: ChannelId, accountId: string) => boolean;
};
|
|
|
|
// Channel docking: every `plugin.gateway` lifecycle hook is driven through this manager.
export function createChannelManager(opts: ChannelManagerOptions): ChannelManager {
const { loadConfig, channelLogs, channelRuntimeEnvs, channelRuntime } = opts;

// One runtime store per channel id.
const channelStores = new Map<ChannelId, ChannelRuntimeStore>();
// Auto-restart attempt counter per `channel:account` key.
const restartAttempts = new Map<string, number>();
// `channel:account` keys that were stopped on request and must not be auto-restarted.
const manuallyStopped = new Set<string>();

// Builds the key shared by `restartAttempts` and `manuallyStopped`.
const restartKey = (channelId: ChannelId, accountId: string) => {
  return `${channelId}:${accountId}`;
};
|
|
|
|
/**
 * Resolves the health-monitor switch for an account from the current config.
 * A boolean on the account entry wins, then a boolean on the channel entry;
 * with neither present the monitor counts as enabled.
 */
const isHealthMonitorEnabled = (channelId: ChannelId, accountId: string): boolean => {
  const section = loadConfig().channels?.[channelId] as RawChannelConfig | undefined;

  const perAccount = section?.accounts?.[accountId]?.healthMonitor?.enabled;
  if (typeof perAccount === "boolean") {
    return perAccount;
  }

  const perChannel = section?.healthMonitor?.enabled;
  return typeof perChannel === "boolean" ? perChannel : true;
};
|
|
|
|
/** Returns the channel's runtime store, creating and registering it on first use. */
const getStore = (channelId: ChannelId): ChannelRuntimeStore => {
  let store = channelStores.get(channelId);
  if (!store) {
    store = createRuntimeStore();
    channelStores.set(channelId, store);
  }
  return store;
};
|
|
|
|
/** Reads the recorded snapshot for an account, or a copy of the channel default if none exists. */
const getRuntime = (channelId: ChannelId, accountId: string): ChannelAccountSnapshot => {
  const recorded = getStore(channelId).runtimes.get(accountId);
  return recorded ?? cloneDefaultRuntime(channelId, accountId);
};
|
|
|
|
/**
 * Shallow-merges `patch` over the account's current snapshot, stores the
 * result and returns it. The stored `accountId` is always the `accountId`
 * argument, whatever the patch carries.
 */
const setRuntime = (
  channelId: ChannelId,
  accountId: string,
  patch: ChannelAccountSnapshot,
): ChannelAccountSnapshot => {
  const { runtimes } = getStore(channelId);
  const merged = { ...getRuntime(channelId, accountId), ...patch, accountId };
  runtimes.set(accountId, merged);
  return merged;
};
|
|
|
|
/**
 * Starts the gateway task of one account, or of every account the plugin
 * lists when `accountId` is omitted, and wires up automatic restarts.
 *
 * Does nothing when the channel has no plugin or the plugin has no
 * `gateway.startAccount` hook. Accounts that already have a tracked task are
 * skipped; disabled or unconfigured accounts only get their status recorded.
 *
 * @param channelId Channel whose accounts should be started.
 * @param accountId Single account to start; all listed accounts when omitted.
 * @param opts Whether to keep the restart counter and the manual-stop mark.
 */
const startChannelInternal = async (
  channelId: ChannelId,
  accountId?: string,
  opts: StartChannelOptions = {},
) => {
  const plugin = getChannelPlugin(channelId);
  const startAccount = plugin?.gateway?.startAccount;
  if (!startAccount) {
    return;
  }
  const { preserveRestartAttempts = false, preserveManualStop = false } = opts;
  const cfg = loadConfig();
  resetDirectoryCache({ channel: channelId, accountId });
  const store = getStore(channelId);
  const accountIds = accountId ? [accountId] : plugin.config.listAccountIds(cfg);
  if (accountIds.length === 0) {
    return;
  }

  // Launches a single account and registers its tracked task in the store.
  const launchAccount = async (targetId: string): Promise<void> => {
    // A tracked task (running, or waiting for its restart delay) already owns this account.
    if (store.tasks.has(targetId)) {
      return;
    }

    const account = plugin.config.resolveAccount(cfg, targetId);
    const enabled = plugin.config.isEnabled
      ? plugin.config.isEnabled(account, cfg)
      : isAccountEnabled(account);
    if (!enabled) {
      setRuntime(channelId, targetId, {
        accountId: targetId,
        enabled: false,
        configured: true,
        running: false,
        restartPending: false,
        lastError: plugin.config.disabledReason?.(account, cfg) ?? "disabled",
      });
      return;
    }

    const configured = plugin.config.isConfigured
      ? await plugin.config.isConfigured(account, cfg)
      : true;
    if (!configured) {
      setRuntime(channelId, targetId, {
        accountId: targetId,
        enabled: true,
        configured: false,
        running: false,
        restartPending: false,
        lastError: plugin.config.unconfiguredReason?.(account, cfg) ?? "not configured",
      });
      return;
    }

    const key = restartKey(channelId, targetId);
    if (!preserveManualStop) {
      manuallyStopped.delete(key);
    }

    const controller = new AbortController();
    store.aborts.set(targetId, controller);
    if (!preserveRestartAttempts) {
      restartAttempts.delete(key);
    }
    setRuntime(channelId, targetId, {
      accountId: targetId,
      enabled: true,
      configured: true,
      running: true,
      restartPending: false,
      lastStartAt: Date.now(),
      lastError: null,
      reconnectAttempts: preserveRestartAttempts ? (restartAttempts.get(key) ?? 0) : 0,
    });

    // Drops this launch's store entries, but only while they still belong to it,
    // so a newer launch of the same account is never unregistered by an older one.
    const releaseSlot = (): void => {
      if (store.tasks.get(targetId) === tracked) {
        store.tasks.delete(targetId);
      }
      if (store.aborts.get(targetId) === controller) {
        store.aborts.delete(targetId);
      }
    };

    const logger = channelLogs[channelId];
    const running = startAccount({
      cfg,
      accountId: targetId,
      account,
      runtime: channelRuntimeEnvs[channelId],
      abortSignal: controller.signal,
      log: logger,
      getStatus: () => getRuntime(channelId, targetId),
      setStatus: (next) => setRuntime(channelId, targetId, next),
      ...(channelRuntime ? { channelRuntime } : {}),
    });

    const tracked = Promise.resolve(running)
      // 1. A rejected task is recorded and logged, then treated like a normal exit.
      .catch((err) => {
        const message = formatErrorMessage(err);
        setRuntime(channelId, targetId, { accountId: targetId, lastError: message });
        logger.error?.(`[${targetId}] channel exited: ${message}`);
      })
      // 2. Either way the account is no longer running.
      .finally(() => {
        setRuntime(channelId, targetId, {
          accountId: targetId,
          running: false,
          lastStopAt: Date.now(),
        });
      })
      // 3. Unless it was stopped on request, schedule a restart with backoff.
      .then(async () => {
        if (manuallyStopped.has(key)) {
          return;
        }
        const attemptNo = (restartAttempts.get(key) ?? 0) + 1;
        restartAttempts.set(key, attemptNo);
        if (attemptNo > MAX_RESTART_ATTEMPTS) {
          setRuntime(channelId, targetId, {
            accountId: targetId,
            restartPending: false,
            reconnectAttempts: attemptNo,
          });
          logger.error?.(`[${targetId}] giving up after ${MAX_RESTART_ATTEMPTS} restart attempts`);
          return;
        }
        const waitMs = computeBackoff(CHANNEL_RESTART_POLICY, attemptNo);
        logger.info?.(
          `[${targetId}] auto-restart attempt ${attemptNo}/${MAX_RESTART_ATTEMPTS} in ${Math.round(waitMs / 1000)}s`,
        );
        setRuntime(channelId, targetId, {
          accountId: targetId,
          restartPending: true,
          reconnectAttempts: attemptNo,
        });
        try {
          await sleepWithAbort(waitMs, controller.signal);
          // A stop request may have arrived while waiting.
          if (manuallyStopped.has(key)) {
            return;
          }
          // Free the slot first; otherwise the restart would see the existing task and skip.
          releaseSlot();
          await startChannelInternal(channelId, targetId, {
            preserveRestartAttempts: true,
            preserveManualStop: true,
          });
        } catch {
          // abort or startup failure — next crash will retry
          // NOTE(review): an error thrown by the restart call itself is swallowed here too,
          // and nothing visible here resets `restartPending: true` afterwards — confirm intended.
        }
      })
      // 4. Final cleanup for every path through the chain.
      .finally(() => {
        releaseSlot();
      });

    store.tasks.set(targetId, tracked);
  };

  await Promise.all(accountIds.map((targetId) => launchAccount(targetId)));
};
|
|
|
|
/**
 * Public start entry point: delegates to `startChannelInternal` with default
 * options, so the restart counter and manual-stop mark are cleared for every
 * account that is actually launched.
 */
const startChannel = async (channelId: ChannelId, accountId?: string): Promise<void> => {
  await startChannelInternal(channelId, accountId);
};
|
|
|
|
/**
 * Stops one account of a channel, or every known account when `accountId`
 * is omitted. Known accounts are those with a registered abort controller or
 * task plus those the plugin lists for the current config.
 *
 * Each stopped account is marked as manually stopped (which suppresses
 * auto-restart), its abort signal is fired, the plugin's `stopAccount` hook
 * is awaited when present, and the tracked task is awaited before the store
 * entries and status are updated.
 */
const stopChannel = async (channelId: ChannelId, accountId?: string) => {
  const plugin = getChannelPlugin(channelId);
  const store = getStore(channelId);

  // Fast path: nothing running and no explicit plugin shutdown hook to run.
  const nothingTracked = store.aborts.size === 0 && store.tasks.size === 0;
  if (!plugin?.gateway?.stopAccount && nothingTracked) {
    return;
  }

  const cfg = loadConfig();
  const discovered = new Set<string>([
    ...store.aborts.keys(),
    ...store.tasks.keys(),
    ...(plugin ? plugin.config.listAccountIds(cfg) : []),
  ]);
  // An explicit account id replaces the discovered set entirely.
  const targets = accountId ? [accountId] : Array.from(discovered);

  // Stops a single account and records the resulting status.
  const haltAccount = async (id: string): Promise<void> => {
    const controller = store.aborts.get(id);
    const pending = store.tasks.get(id);
    if (!controller && !pending && !plugin?.gateway?.stopAccount) {
      return;
    }

    // Mark before aborting so the task's restart logic sees the stop request.
    manuallyStopped.add(restartKey(channelId, id));
    controller?.abort();

    if (plugin?.gateway?.stopAccount) {
      const account = plugin.config.resolveAccount(cfg, id);
      // NOTE(review): if this hook rejects, the cleanup below is skipped for this account.
      await plugin.gateway.stopAccount({
        cfg,
        accountId: id,
        account,
        runtime: channelRuntimeEnvs[channelId],
        abortSignal: controller?.signal ?? new AbortController().signal,
        log: channelLogs[channelId],
        getStatus: () => getRuntime(channelId, id),
        setStatus: (next) => setRuntime(channelId, id, next),
      });
    }

    try {
      await pending;
    } catch {
      // ignore
    }

    store.aborts.delete(id);
    store.tasks.delete(id);
    setRuntime(channelId, id, {
      accountId: id,
      running: false,
      restartPending: false,
      lastStopAt: Date.now(),
    });
  };

  await Promise.all(targets.map((id) => haltAccount(id)));
};
|
|
|
|
/** Starts every listed channel plugin sequentially, awaiting each before the next. */
const startChannels = async (): Promise<void> => {
  for (const { id } of listChannelPlugins()) {
    await startChannel(id);
  }
};
|
|
|
|
/**
 * Records that an account is no longer running after a logout.
 *
 * @param channelId Channel the account belongs to; unknown channels are ignored.
 * @param cleared When true the last error becomes "logged out"; otherwise the
 *   existing last error is kept.
 * @param accountId Account to update; the channel's default account when omitted.
 */
const markChannelLoggedOut = (channelId: ChannelId, cleared: boolean, accountId?: string) => {
  const plugin = getChannelPlugin(channelId);
  if (!plugin) {
    return;
  }
  const cfg = loadConfig();
  const resolvedId = accountId ?? resolveChannelDefaultAccountId({ plugin, cfg });
  const previous = getRuntime(channelId, resolvedId);

  const patch: ChannelAccountSnapshot = {
    accountId: resolvedId,
    running: false,
    restartPending: false,
    lastError: cleared ? "logged out" : previous.lastError,
    // Only snapshots that already carry a boolean `connected` get it reset.
    ...(typeof previous.connected === "boolean" ? { connected: false } : {}),
  };
  setRuntime(channelId, resolvedId, patch);
};
|
|
|
|
/**
 * Builds a status snapshot for every listed channel plugin.
 *
 * For each account the plugin lists, the recorded snapshot (or a copy of the
 * channel default) is overlaid with the current `enabled` and `configured`
 * values. Accounts that are not running and have no last error get a
 * "disabled" / "not configured" reason filled in. Stored snapshots are not
 * modified; the returned entries are copies.
 */
const getRuntimeSnapshot = (): ChannelRuntimeSnapshot => {
  const cfg = loadConfig();
  const channels: ChannelRuntimeSnapshot["channels"] = {};
  const channelAccounts: ChannelRuntimeSnapshot["channelAccounts"] = {};

  for (const plugin of listChannelPlugins()) {
    const { runtimes } = getStore(plugin.id);
    const accountIds = plugin.config.listAccountIds(cfg);
    const defaultAccountId = resolveChannelDefaultAccountId({ plugin, cfg, accountIds });
    const accounts: Record<string, ChannelAccountSnapshot> = {};

    for (const id of accountIds) {
      const account = plugin.config.resolveAccount(cfg, id);
      const enabled = plugin.config.isEnabled
        ? plugin.config.isEnabled(account, cfg)
        : isAccountEnabled(account);
      const declaredConfigured = plugin.config.describeAccount?.(account, cfg)?.configured;
      const recorded = runtimes.get(id) ?? cloneDefaultRuntime(plugin.id, id);

      const entry = { ...recorded, accountId: id };
      entry.enabled = enabled;
      // The plugin's own description wins; otherwise keep the recorded value, defaulting to true.
      entry.configured =
        typeof declaredConfigured === "boolean" ? declaredConfigured : (entry.configured ?? true);

      const lacksError = entry.lastError === undefined || entry.lastError === null;
      if (!entry.running && lacksError) {
        if (!enabled) {
          entry.lastError = plugin.config.disabledReason?.(account, cfg) ?? "disabled";
        } else if (declaredConfigured === false) {
          entry.lastError = plugin.config.unconfiguredReason?.(account, cfg) ?? "not configured";
        }
      }
      accounts[id] = entry;
    }

    // The channel-level entry mirrors the default account, even if it is not listed.
    channels[plugin.id] =
      accounts[defaultAccountId] ?? cloneDefaultRuntime(plugin.id, defaultAccountId);
    channelAccounts[plugin.id] = accounts;
  }

  return { channels, channelAccounts };
};
|
|
|
|
/** Whether the account's `channel:account` key is currently in the manually-stopped set. */
const isManuallyStopped_ = (channelId: ChannelId, accountId: string): boolean =>
  manuallyStopped.has(restartKey(channelId, accountId));
|
|
|
|
/** Forgets the auto-restart attempt count recorded for the account. */
const resetRestartAttempts_ = (channelId: ChannelId, accountId: string): void => {
  const key = restartKey(channelId, accountId);
  restartAttempts.delete(key);
};
|
|
|
|
return {
  // Lifecycle and status operations.
  getRuntimeSnapshot,
  startChannels,
  startChannel,
  stopChannel,
  markChannelLoggedOut,
  // The underscore-suffixed locals are exposed under their plain names.
  isManuallyStopped: isManuallyStopped_,
  resetRestartAttempts: resetRestartAttempts_,
  isHealthMonitorEnabled,
};
}
|