import { DM_GROUP_ACCESS_REASON, DEFAULT_GROUP_HISTORY_LIMIT, type HistoryEntry, KeyedAsyncQueue, buildPendingHistoryContextFromMap, clearHistoryEntriesIfEnabled, recordPendingHistoryEntryIfEnabled, resolveDmGroupAccessWithLists, } from "openclaw/plugin-sdk/compat"; import type { MarkdownTableMode, OpenClawConfig, OutboundReplyPayload, RuntimeEnv, } from "openclaw/plugin-sdk/zalouser"; import { createTypingCallbacks, createScopedPairingAccess, createReplyPrefixOptions, evaluateGroupRouteAccessForPolicy, isDangerousNameMatchingEnabled, issuePairingChallenge, resolveOutboundMediaUrls, mergeAllowlist, resolveMentionGatingWithBypass, resolveOpenProviderRuntimeGroupPolicy, resolveDefaultGroupPolicy, resolveSenderCommandAuthorization, resolveSenderScopedGroupPolicy, sendMediaWithLeadingCaption, summarizeMapping, warnMissingProviderGroupPolicyFallbackOnce, } from "openclaw/plugin-sdk/zalouser"; import { createDeferred } from "../../shared/deferred.js"; import { buildZalouserGroupCandidates, findZalouserGroupEntry, isZalouserGroupEntryAllowed, } from "./group-policy.js"; import { formatZalouserMessageSidFull, resolveZalouserMessageSid } from "./message-sid.js"; import { getZalouserRuntime } from "./runtime.js"; import { sendDeliveredZalouser, sendMessageZalouser, sendSeenZalouser, sendTypingZalouser, } from "./send.js"; import type { ResolvedZalouserAccount, ZaloInboundMessage } from "./types.js"; import { listZaloFriends, listZaloGroups, resolveZaloGroupContext, startZaloListener, } from "./zalo-js.js"; export type ZalouserMonitorOptions = { account: ResolvedZalouserAccount; config: OpenClawConfig; runtime: RuntimeEnv; abortSignal: AbortSignal; statusSink?: (patch: { lastInboundAt?: number; lastOutboundAt?: number }) => void; }; export type ZalouserMonitorResult = { stop: () => void; }; const ZALOUSER_TEXT_LIMIT = 2000; function normalizeZalouserEntry(entry: string): string { return entry.replace(/^(zalouser|zlu):/i, "").trim(); } function buildNameIndex(items: T[], nameFn: (item: T) => string | undefined): Map { const index = new Map(); for (const item of items) { const name = nameFn(item)?.trim().toLowerCase(); if (!name) { continue; } const list = index.get(name) ?? []; list.push(item); index.set(name, list); } return index; } function resolveUserAllowlistEntries( entries: string[], byName: Map>, ): { additions: string[]; mapping: string[]; unresolved: string[]; } { const additions: string[] = []; const mapping: string[] = []; const unresolved: string[] = []; for (const entry of entries) { if (/^\d+$/.test(entry)) { additions.push(entry); continue; } const matches = byName.get(entry.toLowerCase()) ?? []; const match = matches[0]; const id = match?.userId ? String(match.userId) : undefined; if (id) { additions.push(id); mapping.push(`${entry}->${id}`); } else { unresolved.push(entry); } } return { additions, mapping, unresolved }; } type ZalouserCoreRuntime = ReturnType; type ZalouserGroupHistoryState = { historyLimit: number; groupHistories: Map; }; function resolveInboundQueueKey(message: ZaloInboundMessage): string { const threadId = message.threadId?.trim() || "unknown"; if (message.isGroup) { return `group:${threadId}`; } const senderId = message.senderId?.trim(); return `direct:${senderId || threadId}`; } function resolveZalouserDmSessionScope(config: OpenClawConfig) { const configured = config.session?.dmScope; return configured === "main" || !configured ? "per-channel-peer" : configured; } function resolveZalouserInboundSessionKey(params: { core: ZalouserCoreRuntime; config: OpenClawConfig; route: { agentId: string; accountId: string; sessionKey: string }; storePath: string; isGroup: boolean; senderId: string; }): string { if (params.isGroup) { return params.route.sessionKey; } const directSessionKey = params.core.channel.routing .buildAgentSessionKey({ agentId: params.route.agentId, channel: "zalouser", accountId: params.route.accountId, peer: { kind: "direct", id: params.senderId }, dmScope: resolveZalouserDmSessionScope(params.config), identityLinks: params.config.session?.identityLinks, }) .toLowerCase(); const legacySessionKey = params.core.channel.routing .buildAgentSessionKey({ agentId: params.route.agentId, channel: "zalouser", accountId: params.route.accountId, peer: { kind: "group", id: params.senderId }, }) .toLowerCase(); const hasDirectSession = params.core.channel.session.readSessionUpdatedAt({ storePath: params.storePath, sessionKey: directSessionKey, }) !== undefined; const hasLegacySession = params.core.channel.session.readSessionUpdatedAt({ storePath: params.storePath, sessionKey: legacySessionKey, }) !== undefined; // Keep existing DM history on upgrade, but use canonical direct keys for new sessions. return hasLegacySession && !hasDirectSession ? legacySessionKey : directSessionKey; } function logVerbose(core: ZalouserCoreRuntime, runtime: RuntimeEnv, message: string): void { if (core.logging.shouldLogVerbose()) { runtime.log(`[zalouser] ${message}`); } } function isSenderAllowed(senderId: string | undefined, allowFrom: string[]): boolean { if (allowFrom.includes("*")) { return true; } const normalizedSenderId = senderId?.trim().toLowerCase(); if (!normalizedSenderId) { return false; } return allowFrom.some((entry) => { const normalized = entry.toLowerCase().replace(/^(zalouser|zlu):/i, ""); return normalized === normalizedSenderId; }); } function resolveGroupRequireMention(params: { groupId: string; groupName?: string | null; groups: Record; allowNameMatching?: boolean; }): boolean { const entry = findZalouserGroupEntry( params.groups ?? {}, buildZalouserGroupCandidates({ groupId: params.groupId, groupName: params.groupName, includeGroupIdAlias: true, includeWildcard: true, allowNameMatching: params.allowNameMatching, }), ); if (typeof entry?.requireMention === "boolean") { return entry.requireMention; } return true; } async function sendZalouserDeliveryAcks(params: { profile: string; isGroup: boolean; message: NonNullable; }): Promise { await sendDeliveredZalouser({ profile: params.profile, isGroup: params.isGroup, message: params.message, isSeen: true, }); await sendSeenZalouser({ profile: params.profile, isGroup: params.isGroup, message: params.message, }); } async function processMessage( message: ZaloInboundMessage, account: ResolvedZalouserAccount, config: OpenClawConfig, core: ZalouserCoreRuntime, runtime: RuntimeEnv, historyState: ZalouserGroupHistoryState, statusSink?: (patch: { lastInboundAt?: number; lastOutboundAt?: number }) => void, ): Promise { const pairing = createScopedPairingAccess({ core, channel: "zalouser", accountId: account.accountId, }); const rawBody = message.content?.trim(); if (!rawBody) { return; } const commandBody = message.commandContent?.trim() || rawBody; const isGroup = message.isGroup; const chatId = message.threadId; const senderId = message.senderId?.trim(); if (!senderId) { logVerbose(core, runtime, `zalouser: drop message ${chatId} (missing senderId)`); return; } const senderName = message.senderName ?? ""; const configuredGroupName = message.groupName?.trim() || ""; const groupContext = isGroup && !configuredGroupName ? await resolveZaloGroupContext(account.profile, chatId).catch((err) => { logVerbose( core, runtime, `zalouser: group context lookup failed for ${chatId}: ${String(err)}`, ); return null; }) : null; const groupName = configuredGroupName || groupContext?.name?.trim() || ""; const groupMembers = groupContext?.members?.slice(0, 20).join(", ") || undefined; if (message.eventMessage) { try { await sendZalouserDeliveryAcks({ profile: account.profile, isGroup, message: message.eventMessage, }); } catch (err) { logVerbose(core, runtime, `zalouser: delivery/seen ack failed for ${chatId}: ${String(err)}`); } } const defaultGroupPolicy = resolveDefaultGroupPolicy(config); const { groupPolicy, providerMissingFallbackApplied } = resolveOpenProviderRuntimeGroupPolicy({ providerConfigPresent: config.channels?.zalouser !== undefined, groupPolicy: account.config.groupPolicy, defaultGroupPolicy, }); warnMissingProviderGroupPolicyFallbackOnce({ providerMissingFallbackApplied, providerKey: "zalouser", accountId: account.accountId, log: (entry) => logVerbose(core, runtime, entry), }); const groups = account.config.groups ?? {}; const allowNameMatching = isDangerousNameMatchingEnabled(account.config); if (isGroup) { const groupEntry = findZalouserGroupEntry( groups, buildZalouserGroupCandidates({ groupId: chatId, groupName, includeGroupIdAlias: true, includeWildcard: true, allowNameMatching, }), ); const routeAccess = evaluateGroupRouteAccessForPolicy({ groupPolicy, routeAllowlistConfigured: Object.keys(groups).length > 0, routeMatched: Boolean(groupEntry), routeEnabled: isZalouserGroupEntryAllowed(groupEntry), }); if (!routeAccess.allowed) { if (routeAccess.reason === "disabled") { logVerbose(core, runtime, `zalouser: drop group ${chatId} (groupPolicy=disabled)`); } else if (routeAccess.reason === "empty_allowlist") { logVerbose( core, runtime, `zalouser: drop group ${chatId} (groupPolicy=allowlist, no allowlist)`, ); } else if (routeAccess.reason === "route_not_allowlisted") { logVerbose(core, runtime, `zalouser: drop group ${chatId} (not allowlisted)`); } else if (routeAccess.reason === "route_disabled") { logVerbose(core, runtime, `zalouser: drop group ${chatId} (group disabled)`); } return; } } const dmPolicy = account.config.dmPolicy ?? "pairing"; const configAllowFrom = (account.config.allowFrom ?? []).map((v) => String(v)); const configGroupAllowFrom = (account.config.groupAllowFrom ?? []).map((v) => String(v)); const senderGroupPolicy = resolveSenderScopedGroupPolicy({ groupPolicy, groupAllowFrom: configGroupAllowFrom, }); const shouldComputeCommandAuth = core.channel.commands.shouldComputeCommandAuthorized( commandBody, config, ); const storeAllowFrom = !isGroup && dmPolicy !== "allowlist" && (dmPolicy !== "open" || shouldComputeCommandAuth) ? await pairing.readAllowFromStore().catch(() => []) : []; const accessDecision = resolveDmGroupAccessWithLists({ isGroup, dmPolicy, groupPolicy: senderGroupPolicy, allowFrom: configAllowFrom, groupAllowFrom: configGroupAllowFrom, storeAllowFrom, groupAllowFromFallbackToAllowFrom: false, isSenderAllowed: (allowFrom) => isSenderAllowed(senderId, allowFrom), }); if (isGroup && accessDecision.decision !== "allow") { if (accessDecision.reasonCode === DM_GROUP_ACCESS_REASON.GROUP_POLICY_EMPTY_ALLOWLIST) { logVerbose(core, runtime, "Blocked zalouser group message (no group allowlist)"); } else if (accessDecision.reasonCode === DM_GROUP_ACCESS_REASON.GROUP_POLICY_NOT_ALLOWLISTED) { logVerbose( core, runtime, `Blocked zalouser sender ${senderId} (not in groupAllowFrom/allowFrom)`, ); } return; } if (!isGroup && accessDecision.decision !== "allow") { if (accessDecision.decision === "pairing") { await issuePairingChallenge({ channel: "zalouser", senderId, senderIdLine: `Your Zalo user id: ${senderId}`, meta: { name: senderName || undefined }, upsertPairingRequest: pairing.upsertPairingRequest, onCreated: () => { logVerbose(core, runtime, `zalouser pairing request sender=${senderId}`); }, sendPairingReply: async (text) => { await sendMessageZalouser(chatId, text, { profile: account.profile }); statusSink?.({ lastOutboundAt: Date.now() }); }, onReplyError: (err) => { logVerbose( core, runtime, `zalouser pairing reply failed for ${senderId}: ${String(err)}`, ); }, }); return; } if (accessDecision.reasonCode === DM_GROUP_ACCESS_REASON.DM_POLICY_DISABLED) { logVerbose(core, runtime, `Blocked zalouser DM from ${senderId} (dmPolicy=disabled)`); } else { logVerbose( core, runtime, `Blocked unauthorized zalouser sender ${senderId} (dmPolicy=${dmPolicy})`, ); } return; } const { commandAuthorized } = await resolveSenderCommandAuthorization({ cfg: config, rawBody: commandBody, isGroup, dmPolicy, configuredAllowFrom: configAllowFrom, configuredGroupAllowFrom: configGroupAllowFrom, senderId, isSenderAllowed, readAllowFromStore: async () => storeAllowFrom, shouldComputeCommandAuthorized: (body, cfg) => core.channel.commands.shouldComputeCommandAuthorized(body, cfg), resolveCommandAuthorizedFromAuthorizers: (params) => core.channel.commands.resolveCommandAuthorizedFromAuthorizers(params), }); const hasControlCommand = core.channel.commands.isControlCommandMessage(commandBody, config); if (isGroup && hasControlCommand && commandAuthorized !== true) { logVerbose( core, runtime, `zalouser: drop control command from unauthorized sender ${senderId}`, ); return; } const peer = isGroup ? { kind: "group" as const, id: chatId } : { kind: "direct" as const, id: senderId }; const route = core.channel.routing.resolveAgentRoute({ cfg: config, channel: "zalouser", accountId: account.accountId, peer: { // Keep DM peer kind as "direct" so session keys follow dmScope and UI labels stay DM-shaped. kind: peer.kind, id: peer.id, }, }); const historyKey = isGroup ? route.sessionKey : undefined; const requireMention = isGroup ? resolveGroupRequireMention({ groupId: chatId, groupName, groups, allowNameMatching, }) : false; const mentionRegexes = core.channel.mentions.buildMentionRegexes(config, route.agentId); const explicitMention = { hasAnyMention: message.hasAnyMention === true, isExplicitlyMentioned: message.wasExplicitlyMentioned === true, canResolveExplicit: message.canResolveExplicitMention === true, }; const wasMentioned = isGroup ? core.channel.mentions.matchesMentionWithExplicit({ text: rawBody, mentionRegexes, explicit: explicitMention, }) : true; const canDetectMention = mentionRegexes.length > 0 || explicitMention.canResolveExplicit; const mentionGate = resolveMentionGatingWithBypass({ isGroup, requireMention, canDetectMention, wasMentioned, implicitMention: message.implicitMention === true, hasAnyMention: explicitMention.hasAnyMention, allowTextCommands: core.channel.commands.shouldHandleTextCommands({ cfg: config, surface: "zalouser", }), hasControlCommand, commandAuthorized: commandAuthorized === true, }); if (isGroup && requireMention && !canDetectMention && !mentionGate.effectiveWasMentioned) { runtime.error?.( `[${account.accountId}] zalouser mention required but detection unavailable ` + `(missing mention regexes and bot self id); dropping group ${chatId}`, ); return; } if (isGroup && mentionGate.shouldSkip) { recordPendingHistoryEntryIfEnabled({ historyMap: historyState.groupHistories, historyKey: historyKey ?? "", limit: historyState.historyLimit, entry: historyKey && rawBody ? { sender: senderName || senderId, body: rawBody, timestamp: message.timestampMs, messageId: resolveZalouserMessageSid({ msgId: message.msgId, cliMsgId: message.cliMsgId, fallback: `${message.timestampMs}`, }), } : null, }); logVerbose(core, runtime, `zalouser: skip group ${chatId} (mention required, not mentioned)`); return; } const fromLabel = isGroup ? groupName || `group:${chatId}` : senderName || `user:${senderId}`; const storePath = core.channel.session.resolveStorePath(config.session?.store, { agentId: route.agentId, }); const inboundSessionKey = resolveZalouserInboundSessionKey({ core, config, route, storePath, isGroup, senderId, }); const envelopeOptions = core.channel.reply.resolveEnvelopeFormatOptions(config); const previousTimestamp = core.channel.session.readSessionUpdatedAt({ storePath, sessionKey: inboundSessionKey, }); const body = core.channel.reply.formatAgentEnvelope({ channel: "Zalo Personal", from: fromLabel, timestamp: message.timestampMs, previousTimestamp, envelope: envelopeOptions, body: rawBody, }); const combinedBody = isGroup && historyKey ? buildPendingHistoryContextFromMap({ historyMap: historyState.groupHistories, historyKey, limit: historyState.historyLimit, currentMessage: body, formatEntry: (entry) => core.channel.reply.formatAgentEnvelope({ channel: "Zalo Personal", from: fromLabel, timestamp: entry.timestamp, envelope: envelopeOptions, body: `${entry.sender}: ${entry.body}${ entry.messageId ? ` [id:${entry.messageId}]` : "" }`, }), }) : body; const inboundHistory = isGroup && historyKey && historyState.historyLimit > 0 ? (historyState.groupHistories.get(historyKey) ?? []).map((entry) => ({ sender: entry.sender, body: entry.body, timestamp: entry.timestamp, })) : undefined; const normalizedTo = isGroup ? `zalouser:group:${chatId}` : `zalouser:${chatId}`; const ctxPayload = core.channel.reply.finalizeInboundContext({ Body: combinedBody, BodyForAgent: rawBody, InboundHistory: inboundHistory, RawBody: rawBody, CommandBody: commandBody, BodyForCommands: commandBody, From: isGroup ? `zalouser:group:${chatId}` : `zalouser:${senderId}`, To: normalizedTo, SessionKey: inboundSessionKey, AccountId: route.accountId, ChatType: isGroup ? "group" : "direct", ConversationLabel: fromLabel, GroupSubject: isGroup ? groupName || undefined : undefined, GroupChannel: isGroup ? groupName || undefined : undefined, GroupMembers: isGroup ? groupMembers : undefined, SenderName: senderName || undefined, SenderId: senderId, WasMentioned: isGroup ? mentionGate.effectiveWasMentioned : undefined, CommandAuthorized: commandAuthorized, Provider: "zalouser", Surface: "zalouser", MessageSid: resolveZalouserMessageSid({ msgId: message.msgId, cliMsgId: message.cliMsgId, fallback: `${message.timestampMs}`, }), MessageSidFull: formatZalouserMessageSidFull({ msgId: message.msgId, cliMsgId: message.cliMsgId, }), OriginatingChannel: "zalouser", OriginatingTo: normalizedTo, }); await core.channel.session.recordInboundSession({ storePath, sessionKey: ctxPayload.SessionKey ?? route.sessionKey, ctx: ctxPayload, onRecordError: (err) => { runtime.error?.(`zalouser: failed updating session meta: ${String(err)}`); }, }); const { onModelSelected, ...prefixOptions } = createReplyPrefixOptions({ cfg: config, agentId: route.agentId, channel: "zalouser", accountId: account.accountId, }); const typingCallbacks = createTypingCallbacks({ start: async () => { await sendTypingZalouser(chatId, { profile: account.profile, isGroup, }); }, onStartError: (err) => { runtime.error?.( `[${account.accountId}] zalouser typing start failed for ${chatId}: ${String(err)}`, ); logVerbose(core, runtime, `zalouser typing failed for ${chatId}: ${String(err)}`); }, }); await core.channel.reply.dispatchReplyWithBufferedBlockDispatcher({ ctx: ctxPayload, cfg: config, dispatcherOptions: { ...prefixOptions, typingCallbacks, deliver: async (payload) => { await deliverZalouserReply({ payload: payload as { text?: string; mediaUrls?: string[]; mediaUrl?: string }, profile: account.profile, chatId, isGroup, runtime, core, config, accountId: account.accountId, statusSink, tableMode: core.channel.text.resolveMarkdownTableMode({ cfg: config, channel: "zalouser", accountId: account.accountId, }), }); }, onError: (err, info) => { runtime.error(`[${account.accountId}] Zalouser ${info.kind} reply failed: ${String(err)}`); }, }, replyOptions: { onModelSelected, }, }); if (isGroup && historyKey) { clearHistoryEntriesIfEnabled({ historyMap: historyState.groupHistories, historyKey, limit: historyState.historyLimit, }); } } async function deliverZalouserReply(params: { payload: OutboundReplyPayload; profile: string; chatId: string; isGroup: boolean; runtime: RuntimeEnv; core: ZalouserCoreRuntime; config: OpenClawConfig; accountId?: string; statusSink?: (patch: { lastInboundAt?: number; lastOutboundAt?: number }) => void; tableMode?: MarkdownTableMode; }): Promise { const { payload, profile, chatId, isGroup, runtime, core, config, accountId, statusSink } = params; const tableMode = params.tableMode ?? "code"; const text = core.channel.text.convertMarkdownTables(payload.text ?? "", tableMode); const chunkMode = core.channel.text.resolveChunkMode(config, "zalouser", accountId); const textChunkLimit = core.channel.text.resolveTextChunkLimit(config, "zalouser", accountId, { fallbackLimit: ZALOUSER_TEXT_LIMIT, }); const sentMedia = await sendMediaWithLeadingCaption({ mediaUrls: resolveOutboundMediaUrls(payload), caption: text, send: async ({ mediaUrl, caption }) => { logVerbose(core, runtime, `Sending media to ${chatId}`); await sendMessageZalouser(chatId, caption ?? "", { profile, mediaUrl, isGroup, textMode: "markdown", textChunkMode: chunkMode, textChunkLimit, }); statusSink?.({ lastOutboundAt: Date.now() }); }, onError: (error) => { runtime.error(`Zalouser media send failed: ${String(error)}`); }, }); if (sentMedia) { return; } if (text) { try { await sendMessageZalouser(chatId, text, { profile, isGroup, textMode: "markdown", textChunkMode: chunkMode, textChunkLimit, }); statusSink?.({ lastOutboundAt: Date.now() }); } catch (err) { runtime.error(`Zalouser message send failed: ${String(err)}`); } } } export async function monitorZalouserProvider( options: ZalouserMonitorOptions, ): Promise { let { account, config } = options; const { abortSignal, statusSink, runtime } = options; const core = getZalouserRuntime(); const inboundQueue = new KeyedAsyncQueue(); const historyLimit = Math.max( 0, account.config.historyLimit ?? config.messages?.groupChat?.historyLimit ?? DEFAULT_GROUP_HISTORY_LIMIT, ); const groupHistories = new Map(); try { const profile = account.profile; const allowFromEntries = (account.config.allowFrom ?? []) .map((entry) => normalizeZalouserEntry(String(entry))) .filter((entry) => entry && entry !== "*"); const groupAllowFromEntries = (account.config.groupAllowFrom ?? []) .map((entry) => normalizeZalouserEntry(String(entry))) .filter((entry) => entry && entry !== "*"); if (allowFromEntries.length > 0 || groupAllowFromEntries.length > 0) { const friends = await listZaloFriends(profile); const byName = buildNameIndex(friends, (friend) => friend.displayName); if (allowFromEntries.length > 0) { const { additions, mapping, unresolved } = resolveUserAllowlistEntries( allowFromEntries, byName, ); const allowFrom = mergeAllowlist({ existing: account.config.allowFrom, additions }); account = { ...account, config: { ...account.config, allowFrom, }, }; summarizeMapping("zalouser users", mapping, unresolved, runtime); } if (groupAllowFromEntries.length > 0) { const { additions, mapping, unresolved } = resolveUserAllowlistEntries( groupAllowFromEntries, byName, ); const groupAllowFrom = mergeAllowlist({ existing: account.config.groupAllowFrom, additions, }); account = { ...account, config: { ...account.config, groupAllowFrom, }, }; summarizeMapping("zalouser group users", mapping, unresolved, runtime); } } const groupsConfig = account.config.groups ?? {}; const groupKeys = Object.keys(groupsConfig).filter((key) => key !== "*"); if (groupKeys.length > 0) { const groups = await listZaloGroups(profile); const byName = buildNameIndex(groups, (group) => group.name); const mapping: string[] = []; const unresolved: string[] = []; const nextGroups = { ...groupsConfig }; for (const entry of groupKeys) { const cleaned = normalizeZalouserEntry(entry); if (/^\d+$/.test(cleaned)) { if (!nextGroups[cleaned]) { nextGroups[cleaned] = groupsConfig[entry]; } mapping.push(`${entry}→${cleaned}`); continue; } const matches = byName.get(cleaned.toLowerCase()) ?? []; const match = matches[0]; const id = match?.groupId ? String(match.groupId) : undefined; if (id) { if (!nextGroups[id]) { nextGroups[id] = groupsConfig[entry]; } mapping.push(`${entry}→${id}`); } else { unresolved.push(entry); } } account = { ...account, config: { ...account.config, groups: nextGroups, }, }; summarizeMapping("zalouser groups", mapping, unresolved, runtime); } } catch (err) { runtime.log?.(`zalouser resolve failed; using config entries. ${String(err)}`); } let listenerStop: (() => void) | null = null; let stopped = false; const stop = () => { if (stopped) { return; } stopped = true; listenerStop?.(); listenerStop = null; }; let settled = false; const { promise: waitForExit, resolve: resolveRun, reject: rejectRun } = createDeferred(); const settleSuccess = () => { if (settled) { return; } settled = true; stop(); resolveRun(); }; const settleFailure = (error: unknown) => { if (settled) { return; } settled = true; stop(); rejectRun(error instanceof Error ? error : new Error(String(error))); }; const onAbort = () => { settleSuccess(); }; abortSignal.addEventListener("abort", onAbort, { once: true }); let listener: Awaited>; try { listener = await startZaloListener({ accountId: account.accountId, profile: account.profile, abortSignal, onMessage: (msg) => { if (stopped) { return; } logVerbose(core, runtime, `[${account.accountId}] inbound message`); statusSink?.({ lastInboundAt: Date.now() }); const queueKey = resolveInboundQueueKey(msg); void inboundQueue .enqueue(queueKey, async () => { if (stopped || abortSignal.aborted) { return; } await processMessage( msg, account, config, core, runtime, { historyLimit, groupHistories }, statusSink, ); }) .catch((err) => { runtime.error(`[${account.accountId}] Failed to process message: ${String(err)}`); }); }, onError: (err) => { if (stopped || abortSignal.aborted) { return; } runtime.error(`[${account.accountId}] Zalo listener error: ${String(err)}`); settleFailure(err); }, }); } catch (error) { abortSignal.removeEventListener("abort", onAbort); throw error; } listenerStop = listener.stop; if (stopped) { listenerStop(); listenerStop = null; } if (abortSignal.aborted) { settleSuccess(); } try { await waitForExit; } finally { abortSignal.removeEventListener("abort", onAbort); } return { stop }; } export const __testing = { processMessage: async (params: { message: ZaloInboundMessage; account: ResolvedZalouserAccount; config: OpenClawConfig; runtime: RuntimeEnv; historyState?: { historyLimit?: number; groupHistories?: Map; }; statusSink?: (patch: { lastInboundAt?: number; lastOutboundAt?: number }) => void; }) => { const historyLimit = Math.max( 0, params.historyState?.historyLimit ?? params.account.config.historyLimit ?? params.config.messages?.groupChat?.historyLimit ?? DEFAULT_GROUP_HISTORY_LIMIT, ); const groupHistories = params.historyState?.groupHistories ?? new Map(); await processMessage( params.message, params.account, params.config, getZalouserRuntime(), params.runtime, { historyLimit, groupHistories }, params.statusSink, ); }, };