mirror of https://github.com/openclaw/openclaw.git
991 lines
30 KiB
TypeScript
991 lines
30 KiB
TypeScript
import {
|
|
DM_GROUP_ACCESS_REASON,
|
|
DEFAULT_GROUP_HISTORY_LIMIT,
|
|
type HistoryEntry,
|
|
KeyedAsyncQueue,
|
|
buildPendingHistoryContextFromMap,
|
|
clearHistoryEntriesIfEnabled,
|
|
recordPendingHistoryEntryIfEnabled,
|
|
resolveDmGroupAccessWithLists,
|
|
} from "openclaw/plugin-sdk/compat";
|
|
import type {
|
|
MarkdownTableMode,
|
|
OpenClawConfig,
|
|
OutboundReplyPayload,
|
|
RuntimeEnv,
|
|
} from "openclaw/plugin-sdk/zalouser";
|
|
import {
|
|
createTypingCallbacks,
|
|
createScopedPairingAccess,
|
|
createReplyPrefixOptions,
|
|
evaluateGroupRouteAccessForPolicy,
|
|
isDangerousNameMatchingEnabled,
|
|
issuePairingChallenge,
|
|
resolveOutboundMediaUrls,
|
|
mergeAllowlist,
|
|
resolveMentionGatingWithBypass,
|
|
resolveOpenProviderRuntimeGroupPolicy,
|
|
resolveDefaultGroupPolicy,
|
|
resolveSenderCommandAuthorization,
|
|
resolveSenderScopedGroupPolicy,
|
|
sendMediaWithLeadingCaption,
|
|
summarizeMapping,
|
|
warnMissingProviderGroupPolicyFallbackOnce,
|
|
} from "openclaw/plugin-sdk/zalouser";
|
|
import { createDeferred } from "../../shared/deferred.js";
|
|
import {
|
|
buildZalouserGroupCandidates,
|
|
findZalouserGroupEntry,
|
|
isZalouserGroupEntryAllowed,
|
|
} from "./group-policy.js";
|
|
import { formatZalouserMessageSidFull, resolveZalouserMessageSid } from "./message-sid.js";
|
|
import { getZalouserRuntime } from "./runtime.js";
|
|
import {
|
|
sendDeliveredZalouser,
|
|
sendMessageZalouser,
|
|
sendSeenZalouser,
|
|
sendTypingZalouser,
|
|
} from "./send.js";
|
|
import type { ResolvedZalouserAccount, ZaloInboundMessage } from "./types.js";
|
|
import {
|
|
listZaloFriends,
|
|
listZaloGroups,
|
|
resolveZaloGroupContext,
|
|
startZaloListener,
|
|
} from "./zalo-js.js";
|
|
|
|
export type ZalouserMonitorOptions = {
|
|
account: ResolvedZalouserAccount;
|
|
config: OpenClawConfig;
|
|
runtime: RuntimeEnv;
|
|
abortSignal: AbortSignal;
|
|
statusSink?: (patch: { lastInboundAt?: number; lastOutboundAt?: number }) => void;
|
|
};
|
|
|
|
export type ZalouserMonitorResult = {
|
|
stop: () => void;
|
|
};
|
|
|
|
const ZALOUSER_TEXT_LIMIT = 2000;
|
|
|
|
function normalizeZalouserEntry(entry: string): string {
|
|
return entry.replace(/^(zalouser|zlu):/i, "").trim();
|
|
}
|
|
|
|
function buildNameIndex<T>(items: T[], nameFn: (item: T) => string | undefined): Map<string, T[]> {
|
|
const index = new Map<string, T[]>();
|
|
for (const item of items) {
|
|
const name = nameFn(item)?.trim().toLowerCase();
|
|
if (!name) {
|
|
continue;
|
|
}
|
|
const list = index.get(name) ?? [];
|
|
list.push(item);
|
|
index.set(name, list);
|
|
}
|
|
return index;
|
|
}
|
|
|
|
function resolveUserAllowlistEntries(
|
|
entries: string[],
|
|
byName: Map<string, Array<{ userId: string }>>,
|
|
): {
|
|
additions: string[];
|
|
mapping: string[];
|
|
unresolved: string[];
|
|
} {
|
|
const additions: string[] = [];
|
|
const mapping: string[] = [];
|
|
const unresolved: string[] = [];
|
|
for (const entry of entries) {
|
|
if (/^\d+$/.test(entry)) {
|
|
additions.push(entry);
|
|
continue;
|
|
}
|
|
const matches = byName.get(entry.toLowerCase()) ?? [];
|
|
const match = matches[0];
|
|
const id = match?.userId ? String(match.userId) : undefined;
|
|
if (id) {
|
|
additions.push(id);
|
|
mapping.push(`${entry}->${id}`);
|
|
} else {
|
|
unresolved.push(entry);
|
|
}
|
|
}
|
|
return { additions, mapping, unresolved };
|
|
}
|
|
|
|
type ZalouserCoreRuntime = ReturnType<typeof getZalouserRuntime>;
|
|
|
|
type ZalouserGroupHistoryState = {
|
|
historyLimit: number;
|
|
groupHistories: Map<string, HistoryEntry[]>;
|
|
};
|
|
|
|
function resolveInboundQueueKey(message: ZaloInboundMessage): string {
|
|
const threadId = message.threadId?.trim() || "unknown";
|
|
if (message.isGroup) {
|
|
return `group:${threadId}`;
|
|
}
|
|
const senderId = message.senderId?.trim();
|
|
return `direct:${senderId || threadId}`;
|
|
}
|
|
|
|
function resolveZalouserDmSessionScope(config: OpenClawConfig) {
|
|
const configured = config.session?.dmScope;
|
|
return configured === "main" || !configured ? "per-channel-peer" : configured;
|
|
}
|
|
|
|
function resolveZalouserInboundSessionKey(params: {
|
|
core: ZalouserCoreRuntime;
|
|
config: OpenClawConfig;
|
|
route: { agentId: string; accountId: string; sessionKey: string };
|
|
storePath: string;
|
|
isGroup: boolean;
|
|
senderId: string;
|
|
}): string {
|
|
if (params.isGroup) {
|
|
return params.route.sessionKey;
|
|
}
|
|
|
|
const directSessionKey = params.core.channel.routing
|
|
.buildAgentSessionKey({
|
|
agentId: params.route.agentId,
|
|
channel: "zalouser",
|
|
accountId: params.route.accountId,
|
|
peer: { kind: "direct", id: params.senderId },
|
|
dmScope: resolveZalouserDmSessionScope(params.config),
|
|
identityLinks: params.config.session?.identityLinks,
|
|
})
|
|
.toLowerCase();
|
|
const legacySessionKey = params.core.channel.routing
|
|
.buildAgentSessionKey({
|
|
agentId: params.route.agentId,
|
|
channel: "zalouser",
|
|
accountId: params.route.accountId,
|
|
peer: { kind: "group", id: params.senderId },
|
|
})
|
|
.toLowerCase();
|
|
const hasDirectSession =
|
|
params.core.channel.session.readSessionUpdatedAt({
|
|
storePath: params.storePath,
|
|
sessionKey: directSessionKey,
|
|
}) !== undefined;
|
|
const hasLegacySession =
|
|
params.core.channel.session.readSessionUpdatedAt({
|
|
storePath: params.storePath,
|
|
sessionKey: legacySessionKey,
|
|
}) !== undefined;
|
|
|
|
// Keep existing DM history on upgrade, but use canonical direct keys for new sessions.
|
|
return hasLegacySession && !hasDirectSession ? legacySessionKey : directSessionKey;
|
|
}
|
|
|
|
function logVerbose(core: ZalouserCoreRuntime, runtime: RuntimeEnv, message: string): void {
|
|
if (core.logging.shouldLogVerbose()) {
|
|
runtime.log(`[zalouser] ${message}`);
|
|
}
|
|
}
|
|
|
|
function isSenderAllowed(senderId: string | undefined, allowFrom: string[]): boolean {
|
|
if (allowFrom.includes("*")) {
|
|
return true;
|
|
}
|
|
const normalizedSenderId = senderId?.trim().toLowerCase();
|
|
if (!normalizedSenderId) {
|
|
return false;
|
|
}
|
|
return allowFrom.some((entry) => {
|
|
const normalized = entry.toLowerCase().replace(/^(zalouser|zlu):/i, "");
|
|
return normalized === normalizedSenderId;
|
|
});
|
|
}
|
|
|
|
function resolveGroupRequireMention(params: {
|
|
groupId: string;
|
|
groupName?: string | null;
|
|
groups: Record<string, { allow?: boolean; enabled?: boolean; requireMention?: boolean }>;
|
|
allowNameMatching?: boolean;
|
|
}): boolean {
|
|
const entry = findZalouserGroupEntry(
|
|
params.groups ?? {},
|
|
buildZalouserGroupCandidates({
|
|
groupId: params.groupId,
|
|
groupName: params.groupName,
|
|
includeGroupIdAlias: true,
|
|
includeWildcard: true,
|
|
allowNameMatching: params.allowNameMatching,
|
|
}),
|
|
);
|
|
if (typeof entry?.requireMention === "boolean") {
|
|
return entry.requireMention;
|
|
}
|
|
return true;
|
|
}
|
|
|
|
async function sendZalouserDeliveryAcks(params: {
|
|
profile: string;
|
|
isGroup: boolean;
|
|
message: NonNullable<ZaloInboundMessage["eventMessage"]>;
|
|
}): Promise<void> {
|
|
await sendDeliveredZalouser({
|
|
profile: params.profile,
|
|
isGroup: params.isGroup,
|
|
message: params.message,
|
|
isSeen: true,
|
|
});
|
|
await sendSeenZalouser({
|
|
profile: params.profile,
|
|
isGroup: params.isGroup,
|
|
message: params.message,
|
|
});
|
|
}
|
|
|
|
async function processMessage(
|
|
message: ZaloInboundMessage,
|
|
account: ResolvedZalouserAccount,
|
|
config: OpenClawConfig,
|
|
core: ZalouserCoreRuntime,
|
|
runtime: RuntimeEnv,
|
|
historyState: ZalouserGroupHistoryState,
|
|
statusSink?: (patch: { lastInboundAt?: number; lastOutboundAt?: number }) => void,
|
|
): Promise<void> {
|
|
const pairing = createScopedPairingAccess({
|
|
core,
|
|
channel: "zalouser",
|
|
accountId: account.accountId,
|
|
});
|
|
|
|
const rawBody = message.content?.trim();
|
|
if (!rawBody) {
|
|
return;
|
|
}
|
|
const commandBody = message.commandContent?.trim() || rawBody;
|
|
|
|
const isGroup = message.isGroup;
|
|
const chatId = message.threadId;
|
|
const senderId = message.senderId?.trim();
|
|
if (!senderId) {
|
|
logVerbose(core, runtime, `zalouser: drop message ${chatId} (missing senderId)`);
|
|
return;
|
|
}
|
|
const senderName = message.senderName ?? "";
|
|
const configuredGroupName = message.groupName?.trim() || "";
|
|
const groupContext =
|
|
isGroup && !configuredGroupName
|
|
? await resolveZaloGroupContext(account.profile, chatId).catch((err) => {
|
|
logVerbose(
|
|
core,
|
|
runtime,
|
|
`zalouser: group context lookup failed for ${chatId}: ${String(err)}`,
|
|
);
|
|
return null;
|
|
})
|
|
: null;
|
|
const groupName = configuredGroupName || groupContext?.name?.trim() || "";
|
|
const groupMembers = groupContext?.members?.slice(0, 20).join(", ") || undefined;
|
|
|
|
if (message.eventMessage) {
|
|
try {
|
|
await sendZalouserDeliveryAcks({
|
|
profile: account.profile,
|
|
isGroup,
|
|
message: message.eventMessage,
|
|
});
|
|
} catch (err) {
|
|
logVerbose(core, runtime, `zalouser: delivery/seen ack failed for ${chatId}: ${String(err)}`);
|
|
}
|
|
}
|
|
|
|
const defaultGroupPolicy = resolveDefaultGroupPolicy(config);
|
|
const { groupPolicy, providerMissingFallbackApplied } = resolveOpenProviderRuntimeGroupPolicy({
|
|
providerConfigPresent: config.channels?.zalouser !== undefined,
|
|
groupPolicy: account.config.groupPolicy,
|
|
defaultGroupPolicy,
|
|
});
|
|
warnMissingProviderGroupPolicyFallbackOnce({
|
|
providerMissingFallbackApplied,
|
|
providerKey: "zalouser",
|
|
accountId: account.accountId,
|
|
log: (entry) => logVerbose(core, runtime, entry),
|
|
});
|
|
|
|
const groups = account.config.groups ?? {};
|
|
const allowNameMatching = isDangerousNameMatchingEnabled(account.config);
|
|
if (isGroup) {
|
|
const groupEntry = findZalouserGroupEntry(
|
|
groups,
|
|
buildZalouserGroupCandidates({
|
|
groupId: chatId,
|
|
groupName,
|
|
includeGroupIdAlias: true,
|
|
includeWildcard: true,
|
|
allowNameMatching,
|
|
}),
|
|
);
|
|
const routeAccess = evaluateGroupRouteAccessForPolicy({
|
|
groupPolicy,
|
|
routeAllowlistConfigured: Object.keys(groups).length > 0,
|
|
routeMatched: Boolean(groupEntry),
|
|
routeEnabled: isZalouserGroupEntryAllowed(groupEntry),
|
|
});
|
|
if (!routeAccess.allowed) {
|
|
if (routeAccess.reason === "disabled") {
|
|
logVerbose(core, runtime, `zalouser: drop group ${chatId} (groupPolicy=disabled)`);
|
|
} else if (routeAccess.reason === "empty_allowlist") {
|
|
logVerbose(
|
|
core,
|
|
runtime,
|
|
`zalouser: drop group ${chatId} (groupPolicy=allowlist, no allowlist)`,
|
|
);
|
|
} else if (routeAccess.reason === "route_not_allowlisted") {
|
|
logVerbose(core, runtime, `zalouser: drop group ${chatId} (not allowlisted)`);
|
|
} else if (routeAccess.reason === "route_disabled") {
|
|
logVerbose(core, runtime, `zalouser: drop group ${chatId} (group disabled)`);
|
|
}
|
|
return;
|
|
}
|
|
}
|
|
|
|
const dmPolicy = account.config.dmPolicy ?? "pairing";
|
|
const configAllowFrom = (account.config.allowFrom ?? []).map((v) => String(v));
|
|
const configGroupAllowFrom = (account.config.groupAllowFrom ?? []).map((v) => String(v));
|
|
const senderGroupPolicy = resolveSenderScopedGroupPolicy({
|
|
groupPolicy,
|
|
groupAllowFrom: configGroupAllowFrom,
|
|
});
|
|
const shouldComputeCommandAuth = core.channel.commands.shouldComputeCommandAuthorized(
|
|
commandBody,
|
|
config,
|
|
);
|
|
const storeAllowFrom =
|
|
!isGroup && dmPolicy !== "allowlist" && (dmPolicy !== "open" || shouldComputeCommandAuth)
|
|
? await pairing.readAllowFromStore().catch(() => [])
|
|
: [];
|
|
const accessDecision = resolveDmGroupAccessWithLists({
|
|
isGroup,
|
|
dmPolicy,
|
|
groupPolicy: senderGroupPolicy,
|
|
allowFrom: configAllowFrom,
|
|
groupAllowFrom: configGroupAllowFrom,
|
|
storeAllowFrom,
|
|
groupAllowFromFallbackToAllowFrom: false,
|
|
isSenderAllowed: (allowFrom) => isSenderAllowed(senderId, allowFrom),
|
|
});
|
|
if (isGroup && accessDecision.decision !== "allow") {
|
|
if (accessDecision.reasonCode === DM_GROUP_ACCESS_REASON.GROUP_POLICY_EMPTY_ALLOWLIST) {
|
|
logVerbose(core, runtime, "Blocked zalouser group message (no group allowlist)");
|
|
} else if (accessDecision.reasonCode === DM_GROUP_ACCESS_REASON.GROUP_POLICY_NOT_ALLOWLISTED) {
|
|
logVerbose(
|
|
core,
|
|
runtime,
|
|
`Blocked zalouser sender ${senderId} (not in groupAllowFrom/allowFrom)`,
|
|
);
|
|
}
|
|
return;
|
|
}
|
|
|
|
if (!isGroup && accessDecision.decision !== "allow") {
|
|
if (accessDecision.decision === "pairing") {
|
|
await issuePairingChallenge({
|
|
channel: "zalouser",
|
|
senderId,
|
|
senderIdLine: `Your Zalo user id: ${senderId}`,
|
|
meta: { name: senderName || undefined },
|
|
upsertPairingRequest: pairing.upsertPairingRequest,
|
|
onCreated: () => {
|
|
logVerbose(core, runtime, `zalouser pairing request sender=${senderId}`);
|
|
},
|
|
sendPairingReply: async (text) => {
|
|
await sendMessageZalouser(chatId, text, { profile: account.profile });
|
|
statusSink?.({ lastOutboundAt: Date.now() });
|
|
},
|
|
onReplyError: (err) => {
|
|
logVerbose(
|
|
core,
|
|
runtime,
|
|
`zalouser pairing reply failed for ${senderId}: ${String(err)}`,
|
|
);
|
|
},
|
|
});
|
|
return;
|
|
}
|
|
if (accessDecision.reasonCode === DM_GROUP_ACCESS_REASON.DM_POLICY_DISABLED) {
|
|
logVerbose(core, runtime, `Blocked zalouser DM from ${senderId} (dmPolicy=disabled)`);
|
|
} else {
|
|
logVerbose(
|
|
core,
|
|
runtime,
|
|
`Blocked unauthorized zalouser sender ${senderId} (dmPolicy=${dmPolicy})`,
|
|
);
|
|
}
|
|
return;
|
|
}
|
|
|
|
const { commandAuthorized } = await resolveSenderCommandAuthorization({
|
|
cfg: config,
|
|
rawBody: commandBody,
|
|
isGroup,
|
|
dmPolicy,
|
|
configuredAllowFrom: configAllowFrom,
|
|
configuredGroupAllowFrom: configGroupAllowFrom,
|
|
senderId,
|
|
isSenderAllowed,
|
|
readAllowFromStore: async () => storeAllowFrom,
|
|
shouldComputeCommandAuthorized: (body, cfg) =>
|
|
core.channel.commands.shouldComputeCommandAuthorized(body, cfg),
|
|
resolveCommandAuthorizedFromAuthorizers: (params) =>
|
|
core.channel.commands.resolveCommandAuthorizedFromAuthorizers(params),
|
|
});
|
|
const hasControlCommand = core.channel.commands.isControlCommandMessage(commandBody, config);
|
|
if (isGroup && hasControlCommand && commandAuthorized !== true) {
|
|
logVerbose(
|
|
core,
|
|
runtime,
|
|
`zalouser: drop control command from unauthorized sender ${senderId}`,
|
|
);
|
|
return;
|
|
}
|
|
|
|
const peer = isGroup
|
|
? { kind: "group" as const, id: chatId }
|
|
: { kind: "direct" as const, id: senderId };
|
|
|
|
const route = core.channel.routing.resolveAgentRoute({
|
|
cfg: config,
|
|
channel: "zalouser",
|
|
accountId: account.accountId,
|
|
peer: {
|
|
// Keep DM peer kind as "direct" so session keys follow dmScope and UI labels stay DM-shaped.
|
|
kind: peer.kind,
|
|
id: peer.id,
|
|
},
|
|
});
|
|
const historyKey = isGroup ? route.sessionKey : undefined;
|
|
|
|
const requireMention = isGroup
|
|
? resolveGroupRequireMention({
|
|
groupId: chatId,
|
|
groupName,
|
|
groups,
|
|
allowNameMatching,
|
|
})
|
|
: false;
|
|
const mentionRegexes = core.channel.mentions.buildMentionRegexes(config, route.agentId);
|
|
const explicitMention = {
|
|
hasAnyMention: message.hasAnyMention === true,
|
|
isExplicitlyMentioned: message.wasExplicitlyMentioned === true,
|
|
canResolveExplicit: message.canResolveExplicitMention === true,
|
|
};
|
|
const wasMentioned = isGroup
|
|
? core.channel.mentions.matchesMentionWithExplicit({
|
|
text: rawBody,
|
|
mentionRegexes,
|
|
explicit: explicitMention,
|
|
})
|
|
: true;
|
|
const canDetectMention = mentionRegexes.length > 0 || explicitMention.canResolveExplicit;
|
|
const mentionGate = resolveMentionGatingWithBypass({
|
|
isGroup,
|
|
requireMention,
|
|
canDetectMention,
|
|
wasMentioned,
|
|
implicitMention: message.implicitMention === true,
|
|
hasAnyMention: explicitMention.hasAnyMention,
|
|
allowTextCommands: core.channel.commands.shouldHandleTextCommands({
|
|
cfg: config,
|
|
surface: "zalouser",
|
|
}),
|
|
hasControlCommand,
|
|
commandAuthorized: commandAuthorized === true,
|
|
});
|
|
if (isGroup && requireMention && !canDetectMention && !mentionGate.effectiveWasMentioned) {
|
|
runtime.error?.(
|
|
`[${account.accountId}] zalouser mention required but detection unavailable ` +
|
|
`(missing mention regexes and bot self id); dropping group ${chatId}`,
|
|
);
|
|
return;
|
|
}
|
|
if (isGroup && mentionGate.shouldSkip) {
|
|
recordPendingHistoryEntryIfEnabled({
|
|
historyMap: historyState.groupHistories,
|
|
historyKey: historyKey ?? "",
|
|
limit: historyState.historyLimit,
|
|
entry:
|
|
historyKey && rawBody
|
|
? {
|
|
sender: senderName || senderId,
|
|
body: rawBody,
|
|
timestamp: message.timestampMs,
|
|
messageId: resolveZalouserMessageSid({
|
|
msgId: message.msgId,
|
|
cliMsgId: message.cliMsgId,
|
|
fallback: `${message.timestampMs}`,
|
|
}),
|
|
}
|
|
: null,
|
|
});
|
|
logVerbose(core, runtime, `zalouser: skip group ${chatId} (mention required, not mentioned)`);
|
|
return;
|
|
}
|
|
|
|
const fromLabel = isGroup ? groupName || `group:${chatId}` : senderName || `user:${senderId}`;
|
|
const storePath = core.channel.session.resolveStorePath(config.session?.store, {
|
|
agentId: route.agentId,
|
|
});
|
|
const inboundSessionKey = resolveZalouserInboundSessionKey({
|
|
core,
|
|
config,
|
|
route,
|
|
storePath,
|
|
isGroup,
|
|
senderId,
|
|
});
|
|
const envelopeOptions = core.channel.reply.resolveEnvelopeFormatOptions(config);
|
|
const previousTimestamp = core.channel.session.readSessionUpdatedAt({
|
|
storePath,
|
|
sessionKey: inboundSessionKey,
|
|
});
|
|
const body = core.channel.reply.formatAgentEnvelope({
|
|
channel: "Zalo Personal",
|
|
from: fromLabel,
|
|
timestamp: message.timestampMs,
|
|
previousTimestamp,
|
|
envelope: envelopeOptions,
|
|
body: rawBody,
|
|
});
|
|
const combinedBody =
|
|
isGroup && historyKey
|
|
? buildPendingHistoryContextFromMap({
|
|
historyMap: historyState.groupHistories,
|
|
historyKey,
|
|
limit: historyState.historyLimit,
|
|
currentMessage: body,
|
|
formatEntry: (entry) =>
|
|
core.channel.reply.formatAgentEnvelope({
|
|
channel: "Zalo Personal",
|
|
from: fromLabel,
|
|
timestamp: entry.timestamp,
|
|
envelope: envelopeOptions,
|
|
body: `${entry.sender}: ${entry.body}${
|
|
entry.messageId ? ` [id:${entry.messageId}]` : ""
|
|
}`,
|
|
}),
|
|
})
|
|
: body;
|
|
const inboundHistory =
|
|
isGroup && historyKey && historyState.historyLimit > 0
|
|
? (historyState.groupHistories.get(historyKey) ?? []).map((entry) => ({
|
|
sender: entry.sender,
|
|
body: entry.body,
|
|
timestamp: entry.timestamp,
|
|
}))
|
|
: undefined;
|
|
|
|
const normalizedTo = isGroup ? `zalouser:group:${chatId}` : `zalouser:${chatId}`;
|
|
|
|
const ctxPayload = core.channel.reply.finalizeInboundContext({
|
|
Body: combinedBody,
|
|
BodyForAgent: rawBody,
|
|
InboundHistory: inboundHistory,
|
|
RawBody: rawBody,
|
|
CommandBody: commandBody,
|
|
BodyForCommands: commandBody,
|
|
From: isGroup ? `zalouser:group:${chatId}` : `zalouser:${senderId}`,
|
|
To: normalizedTo,
|
|
SessionKey: inboundSessionKey,
|
|
AccountId: route.accountId,
|
|
ChatType: isGroup ? "group" : "direct",
|
|
ConversationLabel: fromLabel,
|
|
GroupSubject: isGroup ? groupName || undefined : undefined,
|
|
GroupChannel: isGroup ? groupName || undefined : undefined,
|
|
GroupMembers: isGroup ? groupMembers : undefined,
|
|
SenderName: senderName || undefined,
|
|
SenderId: senderId,
|
|
WasMentioned: isGroup ? mentionGate.effectiveWasMentioned : undefined,
|
|
CommandAuthorized: commandAuthorized,
|
|
Provider: "zalouser",
|
|
Surface: "zalouser",
|
|
MessageSid: resolveZalouserMessageSid({
|
|
msgId: message.msgId,
|
|
cliMsgId: message.cliMsgId,
|
|
fallback: `${message.timestampMs}`,
|
|
}),
|
|
MessageSidFull: formatZalouserMessageSidFull({
|
|
msgId: message.msgId,
|
|
cliMsgId: message.cliMsgId,
|
|
}),
|
|
OriginatingChannel: "zalouser",
|
|
OriginatingTo: normalizedTo,
|
|
});
|
|
|
|
await core.channel.session.recordInboundSession({
|
|
storePath,
|
|
sessionKey: ctxPayload.SessionKey ?? route.sessionKey,
|
|
ctx: ctxPayload,
|
|
onRecordError: (err) => {
|
|
runtime.error?.(`zalouser: failed updating session meta: ${String(err)}`);
|
|
},
|
|
});
|
|
|
|
const { onModelSelected, ...prefixOptions } = createReplyPrefixOptions({
|
|
cfg: config,
|
|
agentId: route.agentId,
|
|
channel: "zalouser",
|
|
accountId: account.accountId,
|
|
});
|
|
const typingCallbacks = createTypingCallbacks({
|
|
start: async () => {
|
|
await sendTypingZalouser(chatId, {
|
|
profile: account.profile,
|
|
isGroup,
|
|
});
|
|
},
|
|
onStartError: (err) => {
|
|
runtime.error?.(
|
|
`[${account.accountId}] zalouser typing start failed for ${chatId}: ${String(err)}`,
|
|
);
|
|
logVerbose(core, runtime, `zalouser typing failed for ${chatId}: ${String(err)}`);
|
|
},
|
|
});
|
|
|
|
await core.channel.reply.dispatchReplyWithBufferedBlockDispatcher({
|
|
ctx: ctxPayload,
|
|
cfg: config,
|
|
dispatcherOptions: {
|
|
...prefixOptions,
|
|
typingCallbacks,
|
|
deliver: async (payload) => {
|
|
await deliverZalouserReply({
|
|
payload: payload as { text?: string; mediaUrls?: string[]; mediaUrl?: string },
|
|
profile: account.profile,
|
|
chatId,
|
|
isGroup,
|
|
runtime,
|
|
core,
|
|
config,
|
|
accountId: account.accountId,
|
|
statusSink,
|
|
tableMode: core.channel.text.resolveMarkdownTableMode({
|
|
cfg: config,
|
|
channel: "zalouser",
|
|
accountId: account.accountId,
|
|
}),
|
|
});
|
|
},
|
|
onError: (err, info) => {
|
|
runtime.error(`[${account.accountId}] Zalouser ${info.kind} reply failed: ${String(err)}`);
|
|
},
|
|
},
|
|
replyOptions: {
|
|
onModelSelected,
|
|
},
|
|
});
|
|
if (isGroup && historyKey) {
|
|
clearHistoryEntriesIfEnabled({
|
|
historyMap: historyState.groupHistories,
|
|
historyKey,
|
|
limit: historyState.historyLimit,
|
|
});
|
|
}
|
|
}
|
|
|
|
async function deliverZalouserReply(params: {
|
|
payload: OutboundReplyPayload;
|
|
profile: string;
|
|
chatId: string;
|
|
isGroup: boolean;
|
|
runtime: RuntimeEnv;
|
|
core: ZalouserCoreRuntime;
|
|
config: OpenClawConfig;
|
|
accountId?: string;
|
|
statusSink?: (patch: { lastInboundAt?: number; lastOutboundAt?: number }) => void;
|
|
tableMode?: MarkdownTableMode;
|
|
}): Promise<void> {
|
|
const { payload, profile, chatId, isGroup, runtime, core, config, accountId, statusSink } =
|
|
params;
|
|
const tableMode = params.tableMode ?? "code";
|
|
const text = core.channel.text.convertMarkdownTables(payload.text ?? "", tableMode);
|
|
const chunkMode = core.channel.text.resolveChunkMode(config, "zalouser", accountId);
|
|
const textChunkLimit = core.channel.text.resolveTextChunkLimit(config, "zalouser", accountId, {
|
|
fallbackLimit: ZALOUSER_TEXT_LIMIT,
|
|
});
|
|
|
|
const sentMedia = await sendMediaWithLeadingCaption({
|
|
mediaUrls: resolveOutboundMediaUrls(payload),
|
|
caption: text,
|
|
send: async ({ mediaUrl, caption }) => {
|
|
logVerbose(core, runtime, `Sending media to ${chatId}`);
|
|
await sendMessageZalouser(chatId, caption ?? "", {
|
|
profile,
|
|
mediaUrl,
|
|
isGroup,
|
|
textMode: "markdown",
|
|
textChunkMode: chunkMode,
|
|
textChunkLimit,
|
|
});
|
|
statusSink?.({ lastOutboundAt: Date.now() });
|
|
},
|
|
onError: (error) => {
|
|
runtime.error(`Zalouser media send failed: ${String(error)}`);
|
|
},
|
|
});
|
|
if (sentMedia) {
|
|
return;
|
|
}
|
|
|
|
if (text) {
|
|
try {
|
|
await sendMessageZalouser(chatId, text, {
|
|
profile,
|
|
isGroup,
|
|
textMode: "markdown",
|
|
textChunkMode: chunkMode,
|
|
textChunkLimit,
|
|
});
|
|
statusSink?.({ lastOutboundAt: Date.now() });
|
|
} catch (err) {
|
|
runtime.error(`Zalouser message send failed: ${String(err)}`);
|
|
}
|
|
}
|
|
}
|
|
|
|
export async function monitorZalouserProvider(
|
|
options: ZalouserMonitorOptions,
|
|
): Promise<ZalouserMonitorResult> {
|
|
let { account, config } = options;
|
|
const { abortSignal, statusSink, runtime } = options;
|
|
|
|
const core = getZalouserRuntime();
|
|
const inboundQueue = new KeyedAsyncQueue();
|
|
const historyLimit = Math.max(
|
|
0,
|
|
account.config.historyLimit ??
|
|
config.messages?.groupChat?.historyLimit ??
|
|
DEFAULT_GROUP_HISTORY_LIMIT,
|
|
);
|
|
const groupHistories = new Map<string, HistoryEntry[]>();
|
|
|
|
try {
|
|
const profile = account.profile;
|
|
const allowFromEntries = (account.config.allowFrom ?? [])
|
|
.map((entry) => normalizeZalouserEntry(String(entry)))
|
|
.filter((entry) => entry && entry !== "*");
|
|
const groupAllowFromEntries = (account.config.groupAllowFrom ?? [])
|
|
.map((entry) => normalizeZalouserEntry(String(entry)))
|
|
.filter((entry) => entry && entry !== "*");
|
|
|
|
if (allowFromEntries.length > 0 || groupAllowFromEntries.length > 0) {
|
|
const friends = await listZaloFriends(profile);
|
|
const byName = buildNameIndex(friends, (friend) => friend.displayName);
|
|
if (allowFromEntries.length > 0) {
|
|
const { additions, mapping, unresolved } = resolveUserAllowlistEntries(
|
|
allowFromEntries,
|
|
byName,
|
|
);
|
|
const allowFrom = mergeAllowlist({ existing: account.config.allowFrom, additions });
|
|
account = {
|
|
...account,
|
|
config: {
|
|
...account.config,
|
|
allowFrom,
|
|
},
|
|
};
|
|
summarizeMapping("zalouser users", mapping, unresolved, runtime);
|
|
}
|
|
if (groupAllowFromEntries.length > 0) {
|
|
const { additions, mapping, unresolved } = resolveUserAllowlistEntries(
|
|
groupAllowFromEntries,
|
|
byName,
|
|
);
|
|
const groupAllowFrom = mergeAllowlist({
|
|
existing: account.config.groupAllowFrom,
|
|
additions,
|
|
});
|
|
account = {
|
|
...account,
|
|
config: {
|
|
...account.config,
|
|
groupAllowFrom,
|
|
},
|
|
};
|
|
summarizeMapping("zalouser group users", mapping, unresolved, runtime);
|
|
}
|
|
}
|
|
|
|
const groupsConfig = account.config.groups ?? {};
|
|
const groupKeys = Object.keys(groupsConfig).filter((key) => key !== "*");
|
|
if (groupKeys.length > 0) {
|
|
const groups = await listZaloGroups(profile);
|
|
const byName = buildNameIndex(groups, (group) => group.name);
|
|
const mapping: string[] = [];
|
|
const unresolved: string[] = [];
|
|
const nextGroups = { ...groupsConfig };
|
|
for (const entry of groupKeys) {
|
|
const cleaned = normalizeZalouserEntry(entry);
|
|
if (/^\d+$/.test(cleaned)) {
|
|
if (!nextGroups[cleaned]) {
|
|
nextGroups[cleaned] = groupsConfig[entry];
|
|
}
|
|
mapping.push(`${entry}→${cleaned}`);
|
|
continue;
|
|
}
|
|
const matches = byName.get(cleaned.toLowerCase()) ?? [];
|
|
const match = matches[0];
|
|
const id = match?.groupId ? String(match.groupId) : undefined;
|
|
if (id) {
|
|
if (!nextGroups[id]) {
|
|
nextGroups[id] = groupsConfig[entry];
|
|
}
|
|
mapping.push(`${entry}→${id}`);
|
|
} else {
|
|
unresolved.push(entry);
|
|
}
|
|
}
|
|
account = {
|
|
...account,
|
|
config: {
|
|
...account.config,
|
|
groups: nextGroups,
|
|
},
|
|
};
|
|
summarizeMapping("zalouser groups", mapping, unresolved, runtime);
|
|
}
|
|
} catch (err) {
|
|
runtime.log?.(`zalouser resolve failed; using config entries. ${String(err)}`);
|
|
}
|
|
|
|
let listenerStop: (() => void) | null = null;
|
|
let stopped = false;
|
|
|
|
const stop = () => {
|
|
if (stopped) {
|
|
return;
|
|
}
|
|
stopped = true;
|
|
listenerStop?.();
|
|
listenerStop = null;
|
|
};
|
|
|
|
let settled = false;
|
|
const { promise: waitForExit, resolve: resolveRun, reject: rejectRun } = createDeferred<void>();
|
|
|
|
const settleSuccess = () => {
|
|
if (settled) {
|
|
return;
|
|
}
|
|
settled = true;
|
|
stop();
|
|
resolveRun();
|
|
};
|
|
|
|
const settleFailure = (error: unknown) => {
|
|
if (settled) {
|
|
return;
|
|
}
|
|
settled = true;
|
|
stop();
|
|
rejectRun(error instanceof Error ? error : new Error(String(error)));
|
|
};
|
|
|
|
const onAbort = () => {
|
|
settleSuccess();
|
|
};
|
|
abortSignal.addEventListener("abort", onAbort, { once: true });
|
|
|
|
let listener: Awaited<ReturnType<typeof startZaloListener>>;
|
|
try {
|
|
listener = await startZaloListener({
|
|
accountId: account.accountId,
|
|
profile: account.profile,
|
|
abortSignal,
|
|
onMessage: (msg) => {
|
|
if (stopped) {
|
|
return;
|
|
}
|
|
logVerbose(core, runtime, `[${account.accountId}] inbound message`);
|
|
statusSink?.({ lastInboundAt: Date.now() });
|
|
const queueKey = resolveInboundQueueKey(msg);
|
|
void inboundQueue
|
|
.enqueue(queueKey, async () => {
|
|
if (stopped || abortSignal.aborted) {
|
|
return;
|
|
}
|
|
await processMessage(
|
|
msg,
|
|
account,
|
|
config,
|
|
core,
|
|
runtime,
|
|
{ historyLimit, groupHistories },
|
|
statusSink,
|
|
);
|
|
})
|
|
.catch((err) => {
|
|
runtime.error(`[${account.accountId}] Failed to process message: ${String(err)}`);
|
|
});
|
|
},
|
|
onError: (err) => {
|
|
if (stopped || abortSignal.aborted) {
|
|
return;
|
|
}
|
|
runtime.error(`[${account.accountId}] Zalo listener error: ${String(err)}`);
|
|
settleFailure(err);
|
|
},
|
|
});
|
|
} catch (error) {
|
|
abortSignal.removeEventListener("abort", onAbort);
|
|
throw error;
|
|
}
|
|
|
|
listenerStop = listener.stop;
|
|
if (stopped) {
|
|
listenerStop();
|
|
listenerStop = null;
|
|
}
|
|
|
|
if (abortSignal.aborted) {
|
|
settleSuccess();
|
|
}
|
|
|
|
try {
|
|
await waitForExit;
|
|
} finally {
|
|
abortSignal.removeEventListener("abort", onAbort);
|
|
}
|
|
|
|
return { stop };
|
|
}
|
|
|
|
export const __testing = {
|
|
processMessage: async (params: {
|
|
message: ZaloInboundMessage;
|
|
account: ResolvedZalouserAccount;
|
|
config: OpenClawConfig;
|
|
runtime: RuntimeEnv;
|
|
historyState?: {
|
|
historyLimit?: number;
|
|
groupHistories?: Map<string, HistoryEntry[]>;
|
|
};
|
|
statusSink?: (patch: { lastInboundAt?: number; lastOutboundAt?: number }) => void;
|
|
}) => {
|
|
const historyLimit = Math.max(
|
|
0,
|
|
params.historyState?.historyLimit ??
|
|
params.account.config.historyLimit ??
|
|
params.config.messages?.groupChat?.historyLimit ??
|
|
DEFAULT_GROUP_HISTORY_LIMIT,
|
|
);
|
|
const groupHistories = params.historyState?.groupHistories ?? new Map<string, HistoryEntry[]>();
|
|
await processMessage(
|
|
params.message,
|
|
params.account,
|
|
params.config,
|
|
getZalouserRuntime(),
|
|
params.runtime,
|
|
{ historyLimit, groupHistories },
|
|
params.statusSink,
|
|
);
|
|
},
|
|
};
|