import fs from "node:fs"; import os from "node:os"; import path from "node:path"; import { formatThreadBindingDurationLabel, registerSessionBindingAdapter, resolveThreadBindingConversationIdFromBindingId, resolveThreadBindingEffectiveExpiresAt, unregisterSessionBindingAdapter, type BindingTargetKind, type SessionBindingAdapter, type SessionBindingRecord, } from "openclaw/plugin-sdk/conversation-runtime"; import { writeJsonAtomic } from "openclaw/plugin-sdk/infra-runtime"; import { normalizeAccountId } from "openclaw/plugin-sdk/routing"; import { logVerbose } from "openclaw/plugin-sdk/runtime-env"; import { resolveStateDir } from "openclaw/plugin-sdk/state-paths"; import { resolveGlobalSingleton } from "openclaw/plugin-sdk/text-runtime"; const DEFAULT_THREAD_BINDING_IDLE_TIMEOUT_MS = 24 * 60 * 60 * 1000; const DEFAULT_THREAD_BINDING_MAX_AGE_MS = 0; const THREAD_BINDINGS_SWEEP_INTERVAL_MS = 60_000; const STORE_VERSION = 1; type TelegramBindingTargetKind = "subagent" | "acp"; export type TelegramThreadBindingRecord = { accountId: string; conversationId: string; targetKind: TelegramBindingTargetKind; targetSessionKey: string; agentId?: string; label?: string; boundBy?: string; boundAt: number; lastActivityAt: number; idleTimeoutMs?: number; maxAgeMs?: number; metadata?: Record; }; type StoredTelegramBindingState = { version: number; bindings: TelegramThreadBindingRecord[]; }; export type TelegramThreadBindingManager = { accountId: string; shouldPersistMutations: () => boolean; getIdleTimeoutMs: () => number; getMaxAgeMs: () => number; getByConversationId: (conversationId: string) => TelegramThreadBindingRecord | undefined; listBySessionKey: (targetSessionKey: string) => TelegramThreadBindingRecord[]; listBindings: () => TelegramThreadBindingRecord[]; touchConversation: (conversationId: string, at?: number) => TelegramThreadBindingRecord | null; unbindConversation: (params: { conversationId: string; reason?: string; sendFarewell?: boolean; }) => TelegramThreadBindingRecord | null; unbindBySessionKey: (params: { targetSessionKey: string; reason?: string; sendFarewell?: boolean; }) => TelegramThreadBindingRecord[]; stop: () => void; }; type TelegramThreadBindingsState = { managersByAccountId: Map; bindingsByAccountConversation: Map; persistQueueByAccountId: Map>; }; /** * Keep Telegram thread binding state shared across bundled chunks so routing, * binding lookups, and binding mutations all observe the same live registry. */ const TELEGRAM_THREAD_BINDINGS_STATE_KEY = Symbol.for("openclaw.telegramThreadBindingsState"); const threadBindingsState = resolveGlobalSingleton( TELEGRAM_THREAD_BINDINGS_STATE_KEY, () => ({ managersByAccountId: new Map(), bindingsByAccountConversation: new Map(), persistQueueByAccountId: new Map>(), }), ); function getThreadBindingsState(): TelegramThreadBindingsState { return threadBindingsState; } function normalizeDurationMs(raw: unknown, fallback: number): number { if (typeof raw !== "number" || !Number.isFinite(raw)) { return fallback; } return Math.max(0, Math.floor(raw)); } function normalizeConversationId(raw: unknown): string | undefined { if (typeof raw !== "string") { return undefined; } const trimmed = raw.trim(); return trimmed || undefined; } function resolveBindingKey(params: { accountId: string; conversationId: string }): string { return `${params.accountId}:${params.conversationId}`; } function toSessionBindingTargetKind(raw: TelegramBindingTargetKind): BindingTargetKind { return raw === "subagent" ? "subagent" : "session"; } function toTelegramTargetKind(raw: BindingTargetKind): TelegramBindingTargetKind { return raw === "subagent" ? "subagent" : "acp"; } function toSessionBindingRecord( record: TelegramThreadBindingRecord, defaults: { idleTimeoutMs: number; maxAgeMs: number }, ): SessionBindingRecord { return { bindingId: resolveBindingKey({ accountId: record.accountId, conversationId: record.conversationId, }), targetSessionKey: record.targetSessionKey, targetKind: toSessionBindingTargetKind(record.targetKind), conversation: { channel: "telegram", accountId: record.accountId, conversationId: record.conversationId, }, status: "active", boundAt: record.boundAt, expiresAt: resolveThreadBindingEffectiveExpiresAt({ record, defaultIdleTimeoutMs: defaults.idleTimeoutMs, defaultMaxAgeMs: defaults.maxAgeMs, }), metadata: { agentId: record.agentId, label: record.label, boundBy: record.boundBy, lastActivityAt: record.lastActivityAt, idleTimeoutMs: typeof record.idleTimeoutMs === "number" ? Math.max(0, Math.floor(record.idleTimeoutMs)) : defaults.idleTimeoutMs, maxAgeMs: typeof record.maxAgeMs === "number" ? Math.max(0, Math.floor(record.maxAgeMs)) : defaults.maxAgeMs, ...record.metadata, }, }; } function fromSessionBindingInput(params: { accountId: string; input: { targetSessionKey: string; targetKind: BindingTargetKind; conversationId: string; metadata?: Record; }; }): TelegramThreadBindingRecord { const now = Date.now(); const metadata = params.input.metadata ?? {}; const existing = getThreadBindingsState().bindingsByAccountConversation.get( resolveBindingKey({ accountId: params.accountId, conversationId: params.input.conversationId, }), ); const record: TelegramThreadBindingRecord = { accountId: params.accountId, conversationId: params.input.conversationId, targetKind: toTelegramTargetKind(params.input.targetKind), targetSessionKey: params.input.targetSessionKey, agentId: typeof metadata.agentId === "string" && metadata.agentId.trim() ? metadata.agentId.trim() : existing?.agentId, label: typeof metadata.label === "string" && metadata.label.trim() ? metadata.label.trim() : existing?.label, boundBy: typeof metadata.boundBy === "string" && metadata.boundBy.trim() ? metadata.boundBy.trim() : existing?.boundBy, boundAt: now, lastActivityAt: now, metadata: { ...existing?.metadata, ...metadata, }, }; if (typeof metadata.idleTimeoutMs === "number" && Number.isFinite(metadata.idleTimeoutMs)) { record.idleTimeoutMs = Math.max(0, Math.floor(metadata.idleTimeoutMs)); } else if (typeof existing?.idleTimeoutMs === "number") { record.idleTimeoutMs = existing.idleTimeoutMs; } if (typeof metadata.maxAgeMs === "number" && Number.isFinite(metadata.maxAgeMs)) { record.maxAgeMs = Math.max(0, Math.floor(metadata.maxAgeMs)); } else if (typeof existing?.maxAgeMs === "number") { record.maxAgeMs = existing.maxAgeMs; } return record; } function resolveBindingsPath(accountId: string, env: NodeJS.ProcessEnv = process.env): string { const stateDir = resolveStateDir(env, os.homedir); return path.join(stateDir, "telegram", `thread-bindings-${accountId}.json`); } function summarizeLifecycleForLog( record: TelegramThreadBindingRecord, defaults: { idleTimeoutMs: number; maxAgeMs: number; }, ) { const idleTimeoutMs = typeof record.idleTimeoutMs === "number" ? record.idleTimeoutMs : defaults.idleTimeoutMs; const maxAgeMs = typeof record.maxAgeMs === "number" ? record.maxAgeMs : defaults.maxAgeMs; const idleLabel = formatThreadBindingDurationLabel(Math.max(0, Math.floor(idleTimeoutMs))); const maxAgeLabel = formatThreadBindingDurationLabel(Math.max(0, Math.floor(maxAgeMs))); return `idle=${idleLabel} maxAge=${maxAgeLabel}`; } function loadBindingsFromDisk(accountId: string): TelegramThreadBindingRecord[] { const filePath = resolveBindingsPath(accountId); try { const raw = fs.readFileSync(filePath, "utf-8"); const parsed = JSON.parse(raw) as StoredTelegramBindingState; if (parsed?.version !== STORE_VERSION || !Array.isArray(parsed.bindings)) { return []; } const bindings: TelegramThreadBindingRecord[] = []; for (const entry of parsed.bindings) { const conversationId = normalizeConversationId(entry?.conversationId); const targetSessionKey = typeof entry?.targetSessionKey === "string" ? entry.targetSessionKey.trim() : ""; const targetKind = entry?.targetKind === "subagent" ? "subagent" : "acp"; if (!conversationId || !targetSessionKey) { continue; } const boundAt = typeof entry?.boundAt === "number" && Number.isFinite(entry.boundAt) ? Math.floor(entry.boundAt) : Date.now(); const lastActivityAt = typeof entry?.lastActivityAt === "number" && Number.isFinite(entry.lastActivityAt) ? Math.floor(entry.lastActivityAt) : boundAt; const record: TelegramThreadBindingRecord = { accountId, conversationId, targetSessionKey, targetKind, boundAt, lastActivityAt, }; if (typeof entry?.idleTimeoutMs === "number" && Number.isFinite(entry.idleTimeoutMs)) { record.idleTimeoutMs = Math.max(0, Math.floor(entry.idleTimeoutMs)); } if (typeof entry?.maxAgeMs === "number" && Number.isFinite(entry.maxAgeMs)) { record.maxAgeMs = Math.max(0, Math.floor(entry.maxAgeMs)); } if (typeof entry?.agentId === "string" && entry.agentId.trim()) { record.agentId = entry.agentId.trim(); } if (typeof entry?.label === "string" && entry.label.trim()) { record.label = entry.label.trim(); } if (typeof entry?.boundBy === "string" && entry.boundBy.trim()) { record.boundBy = entry.boundBy.trim(); } if (entry?.metadata && typeof entry.metadata === "object") { record.metadata = { ...entry.metadata }; } bindings.push(record); } return bindings; } catch (err) { const code = (err as { code?: string }).code; if (code !== "ENOENT") { logVerbose(`telegram thread bindings load failed (${accountId}): ${String(err)}`); } return []; } } async function persistBindingsToDisk(params: { accountId: string; persist: boolean; bindings?: TelegramThreadBindingRecord[]; }): Promise { if (!params.persist) { return; } const payload: StoredTelegramBindingState = { version: STORE_VERSION, bindings: params.bindings ?? [...getThreadBindingsState().bindingsByAccountConversation.values()].filter( (entry) => entry.accountId === params.accountId, ), }; await writeJsonAtomic(resolveBindingsPath(params.accountId), payload, { mode: 0o600, trailingNewline: true, ensureDirMode: 0o700, }); } function listBindingsForAccount(accountId: string): TelegramThreadBindingRecord[] { return [...getThreadBindingsState().bindingsByAccountConversation.values()].filter( (entry) => entry.accountId === accountId, ); } function enqueuePersistBindings(params: { accountId: string; persist: boolean; bindings?: TelegramThreadBindingRecord[]; }): Promise { if (!params.persist) { return Promise.resolve(); } const previous = getThreadBindingsState().persistQueueByAccountId.get(params.accountId) ?? Promise.resolve(); const next = previous .catch(() => undefined) .then(async () => { await persistBindingsToDisk(params); }); getThreadBindingsState().persistQueueByAccountId.set(params.accountId, next); void next.finally(() => { if (getThreadBindingsState().persistQueueByAccountId.get(params.accountId) === next) { getThreadBindingsState().persistQueueByAccountId.delete(params.accountId); } }); return next; } function persistBindingsSafely(params: { accountId: string; persist: boolean; bindings?: TelegramThreadBindingRecord[]; reason: string; }): void { void enqueuePersistBindings(params).catch((err) => { logVerbose( `telegram thread bindings persist failed (${params.accountId}, ${params.reason}): ${String(err)}`, ); }); } function normalizeTimestampMs(raw: unknown): number { if (typeof raw !== "number" || !Number.isFinite(raw)) { return Date.now(); } return Math.max(0, Math.floor(raw)); } function shouldExpireByIdle(params: { now: number; record: TelegramThreadBindingRecord; defaultIdleTimeoutMs: number; }): boolean { const idleTimeoutMs = typeof params.record.idleTimeoutMs === "number" ? Math.max(0, Math.floor(params.record.idleTimeoutMs)) : params.defaultIdleTimeoutMs; if (idleTimeoutMs <= 0) { return false; } return ( params.now >= Math.max(params.record.lastActivityAt, params.record.boundAt) + idleTimeoutMs ); } function shouldExpireByMaxAge(params: { now: number; record: TelegramThreadBindingRecord; defaultMaxAgeMs: number; }): boolean { const maxAgeMs = typeof params.record.maxAgeMs === "number" ? Math.max(0, Math.floor(params.record.maxAgeMs)) : params.defaultMaxAgeMs; if (maxAgeMs <= 0) { return false; } return params.now >= params.record.boundAt + maxAgeMs; } export function createTelegramThreadBindingManager( params: { accountId?: string; persist?: boolean; idleTimeoutMs?: number; maxAgeMs?: number; enableSweeper?: boolean; } = {}, ): TelegramThreadBindingManager { const accountId = normalizeAccountId(params.accountId); const existing = getThreadBindingsState().managersByAccountId.get(accountId); if (existing) { return existing; } const persist = params.persist ?? true; const idleTimeoutMs = normalizeDurationMs( params.idleTimeoutMs, DEFAULT_THREAD_BINDING_IDLE_TIMEOUT_MS, ); const maxAgeMs = normalizeDurationMs(params.maxAgeMs, DEFAULT_THREAD_BINDING_MAX_AGE_MS); const loaded = loadBindingsFromDisk(accountId); for (const entry of loaded) { const key = resolveBindingKey({ accountId, conversationId: entry.conversationId, }); getThreadBindingsState().bindingsByAccountConversation.set(key, { ...entry, accountId, }); } let sweepTimer: NodeJS.Timeout | null = null; const manager: TelegramThreadBindingManager = { accountId, shouldPersistMutations: () => persist, getIdleTimeoutMs: () => idleTimeoutMs, getMaxAgeMs: () => maxAgeMs, getByConversationId: (conversationIdRaw) => { const conversationId = normalizeConversationId(conversationIdRaw); if (!conversationId) { return undefined; } return getThreadBindingsState().bindingsByAccountConversation.get( resolveBindingKey({ accountId, conversationId, }), ); }, listBySessionKey: (targetSessionKeyRaw) => { const targetSessionKey = targetSessionKeyRaw.trim(); if (!targetSessionKey) { return []; } return listBindingsForAccount(accountId).filter( (entry) => entry.targetSessionKey === targetSessionKey, ); }, listBindings: () => listBindingsForAccount(accountId), touchConversation: (conversationIdRaw, at) => { const conversationId = normalizeConversationId(conversationIdRaw); if (!conversationId) { return null; } const key = resolveBindingKey({ accountId, conversationId }); const existing = getThreadBindingsState().bindingsByAccountConversation.get(key); if (!existing) { return null; } const nextRecord: TelegramThreadBindingRecord = { ...existing, lastActivityAt: normalizeTimestampMs(at ?? Date.now()), }; getThreadBindingsState().bindingsByAccountConversation.set(key, nextRecord); persistBindingsSafely({ accountId, persist: manager.shouldPersistMutations(), bindings: listBindingsForAccount(accountId), reason: "touch", }); return nextRecord; }, unbindConversation: (unbindParams) => { const conversationId = normalizeConversationId(unbindParams.conversationId); if (!conversationId) { return null; } const key = resolveBindingKey({ accountId, conversationId }); const removed = getThreadBindingsState().bindingsByAccountConversation.get(key) ?? null; if (!removed) { return null; } getThreadBindingsState().bindingsByAccountConversation.delete(key); persistBindingsSafely({ accountId, persist: manager.shouldPersistMutations(), bindings: listBindingsForAccount(accountId), reason: "unbind-conversation", }); return removed; }, unbindBySessionKey: (unbindParams) => { const targetSessionKey = unbindParams.targetSessionKey.trim(); if (!targetSessionKey) { return []; } const removed: TelegramThreadBindingRecord[] = []; for (const entry of listBindingsForAccount(accountId)) { if (entry.targetSessionKey !== targetSessionKey) { continue; } const key = resolveBindingKey({ accountId, conversationId: entry.conversationId, }); getThreadBindingsState().bindingsByAccountConversation.delete(key); removed.push(entry); } if (removed.length > 0) { persistBindingsSafely({ accountId, persist: manager.shouldPersistMutations(), bindings: listBindingsForAccount(accountId), reason: "unbind-session", }); } return removed; }, stop: () => { if (sweepTimer) { clearInterval(sweepTimer); sweepTimer = null; } unregisterSessionBindingAdapter({ channel: "telegram", accountId, adapter: sessionBindingAdapter, }); const existingManager = getThreadBindingsState().managersByAccountId.get(accountId); if (existingManager === manager) { getThreadBindingsState().managersByAccountId.delete(accountId); } }, }; const sessionBindingAdapter: SessionBindingAdapter = { channel: "telegram", accountId, capabilities: { placements: ["current"], }, bind: async (input) => { if (input.conversation.channel !== "telegram") { return null; } if (input.placement === "child") { return null; } const conversationId = normalizeConversationId(input.conversation.conversationId); const targetSessionKey = input.targetSessionKey.trim(); if (!conversationId || !targetSessionKey) { return null; } const record = fromSessionBindingInput({ accountId, input: { targetSessionKey, targetKind: input.targetKind, conversationId, metadata: input.metadata, }, }); getThreadBindingsState().bindingsByAccountConversation.set( resolveBindingKey({ accountId, conversationId }), record, ); await enqueuePersistBindings({ accountId, persist: manager.shouldPersistMutations(), bindings: listBindingsForAccount(accountId), }); logVerbose( `telegram: bound conversation ${conversationId} -> ${targetSessionKey} (${summarizeLifecycleForLog( record, { idleTimeoutMs, maxAgeMs, }, )})`, ); return toSessionBindingRecord(record, { idleTimeoutMs, maxAgeMs, }); }, listBySession: (targetSessionKeyRaw) => { const targetSessionKey = targetSessionKeyRaw.trim(); if (!targetSessionKey) { return []; } return manager.listBySessionKey(targetSessionKey).map((entry) => toSessionBindingRecord(entry, { idleTimeoutMs, maxAgeMs, }), ); }, resolveByConversation: (ref) => { if (ref.channel !== "telegram") { return null; } const conversationId = normalizeConversationId(ref.conversationId); if (!conversationId) { return null; } const record = manager.getByConversationId(conversationId); return record ? toSessionBindingRecord(record, { idleTimeoutMs, maxAgeMs, }) : null; }, touch: (bindingId, at) => { const conversationId = resolveThreadBindingConversationIdFromBindingId({ accountId, bindingId, }); if (!conversationId) { return; } manager.touchConversation(conversationId, at); }, unbind: async (input) => { if (input.targetSessionKey?.trim()) { const removed = manager.unbindBySessionKey({ targetSessionKey: input.targetSessionKey, reason: input.reason, sendFarewell: false, }); if (removed.length > 0) { await enqueuePersistBindings({ accountId, persist: manager.shouldPersistMutations(), bindings: listBindingsForAccount(accountId), }); } return removed.map((entry) => toSessionBindingRecord(entry, { idleTimeoutMs, maxAgeMs, }), ); } const conversationId = resolveThreadBindingConversationIdFromBindingId({ accountId, bindingId: input.bindingId, }); if (!conversationId) { return []; } const removed = manager.unbindConversation({ conversationId, reason: input.reason, sendFarewell: false, }); if (removed) { await enqueuePersistBindings({ accountId, persist: manager.shouldPersistMutations(), bindings: listBindingsForAccount(accountId), }); } return removed ? [ toSessionBindingRecord(removed, { idleTimeoutMs, maxAgeMs, }), ] : []; }, }; registerSessionBindingAdapter(sessionBindingAdapter); const sweeperEnabled = params.enableSweeper !== false; if (sweeperEnabled) { sweepTimer = setInterval(() => { const now = Date.now(); for (const record of listBindingsForAccount(accountId)) { const idleExpired = shouldExpireByIdle({ now, record, defaultIdleTimeoutMs: idleTimeoutMs, }); const maxAgeExpired = shouldExpireByMaxAge({ now, record, defaultMaxAgeMs: maxAgeMs, }); if (!idleExpired && !maxAgeExpired) { continue; } manager.unbindConversation({ conversationId: record.conversationId, reason: idleExpired ? "idle-expired" : "max-age-expired", sendFarewell: false, }); } }, THREAD_BINDINGS_SWEEP_INTERVAL_MS); sweepTimer.unref?.(); } getThreadBindingsState().managersByAccountId.set(accountId, manager); return manager; } export function getTelegramThreadBindingManager( accountId?: string, ): TelegramThreadBindingManager | null { return getThreadBindingsState().managersByAccountId.get(normalizeAccountId(accountId)) ?? null; } function updateTelegramBindingsBySessionKey(params: { manager: TelegramThreadBindingManager; targetSessionKey: string; update: (entry: TelegramThreadBindingRecord, now: number) => TelegramThreadBindingRecord; }): TelegramThreadBindingRecord[] { const targetSessionKey = params.targetSessionKey.trim(); if (!targetSessionKey) { return []; } const now = Date.now(); const updated: TelegramThreadBindingRecord[] = []; for (const entry of params.manager.listBySessionKey(targetSessionKey)) { const key = resolveBindingKey({ accountId: params.manager.accountId, conversationId: entry.conversationId, }); const next = params.update(entry, now); getThreadBindingsState().bindingsByAccountConversation.set(key, next); updated.push(next); } if (updated.length > 0) { persistBindingsSafely({ accountId: params.manager.accountId, persist: params.manager.shouldPersistMutations(), bindings: listBindingsForAccount(params.manager.accountId), reason: "session-lifecycle-update", }); } return updated; } export function setTelegramThreadBindingIdleTimeoutBySessionKey(params: { targetSessionKey: string; accountId?: string; idleTimeoutMs: number; }): TelegramThreadBindingRecord[] { const manager = getTelegramThreadBindingManager(params.accountId); if (!manager) { return []; } const idleTimeoutMs = normalizeDurationMs(params.idleTimeoutMs, 0); return updateTelegramBindingsBySessionKey({ manager, targetSessionKey: params.targetSessionKey, update: (entry, now) => ({ ...entry, idleTimeoutMs, lastActivityAt: now, }), }); } export function setTelegramThreadBindingMaxAgeBySessionKey(params: { targetSessionKey: string; accountId?: string; maxAgeMs: number; }): TelegramThreadBindingRecord[] { const manager = getTelegramThreadBindingManager(params.accountId); if (!manager) { return []; } const maxAgeMs = normalizeDurationMs(params.maxAgeMs, 0); return updateTelegramBindingsBySessionKey({ manager, targetSessionKey: params.targetSessionKey, update: (entry, now) => ({ ...entry, maxAgeMs, lastActivityAt: now, }), }); } export const __testing = { async resetTelegramThreadBindingsForTests() { for (const manager of getThreadBindingsState().managersByAccountId.values()) { manager.stop(); } await Promise.allSettled(getThreadBindingsState().persistQueueByAccountId.values()); getThreadBindingsState().persistQueueByAccountId.clear(); getThreadBindingsState().managersByAccountId.clear(); getThreadBindingsState().bindingsByAccountConversation.clear(); }, };