openclaw/extensions/zalo/src/monitor.ts

775 lines
22 KiB
TypeScript

import type { IncomingMessage, ServerResponse } from "node:http";
import type {
MarkdownTableMode,
OpenClawConfig,
OutboundReplyPayload,
} from "openclaw/plugin-sdk/zalo";
import {
createTypingCallbacks,
createScopedPairingAccess,
createReplyPrefixOptions,
issuePairingChallenge,
logTypingFailure,
resolveDirectDmAuthorizationOutcome,
resolveSenderCommandAuthorizationWithRuntime,
resolveOutboundMediaUrls,
resolveDefaultGroupPolicy,
resolveInboundRouteEnvelopeBuilderWithRuntime,
sendMediaWithLeadingCaption,
resolveWebhookPath,
waitForAbortSignal,
warnMissingProviderGroupPolicyFallbackOnce,
} from "openclaw/plugin-sdk/zalo";
import type { ResolvedZaloAccount } from "./accounts.js";
import {
ZaloApiError,
deleteWebhook,
getWebhookInfo,
getUpdates,
sendChatAction,
sendMessage,
sendPhoto,
setWebhook,
type ZaloFetch,
type ZaloMessage,
type ZaloUpdate,
} from "./api.js";
import {
evaluateZaloGroupAccess,
isZaloSenderAllowed,
resolveZaloRuntimeGroupPolicy,
} from "./group-access.js";
import {
clearZaloWebhookSecurityStateForTest,
getZaloWebhookRateLimitStateSizeForTest,
getZaloWebhookStatusCounterSizeForTest,
handleZaloWebhookRequest as handleZaloWebhookRequestInternal,
registerZaloWebhookTarget as registerZaloWebhookTargetInternal,
type ZaloWebhookTarget,
} from "./monitor.webhook.js";
import { resolveZaloProxyFetch } from "./proxy.js";
import { getZaloRuntime } from "./runtime.js";
export type ZaloRuntimeEnv = {
log?: (message: string) => void;
error?: (message: string) => void;
};
export type ZaloMonitorOptions = {
token: string;
account: ResolvedZaloAccount;
config: OpenClawConfig;
runtime: ZaloRuntimeEnv;
abortSignal: AbortSignal;
useWebhook?: boolean;
webhookUrl?: string;
webhookSecret?: string;
webhookPath?: string;
fetcher?: ZaloFetch;
statusSink?: (patch: { lastInboundAt?: number; lastOutboundAt?: number }) => void;
};
const ZALO_TEXT_LIMIT = 2000;
const DEFAULT_MEDIA_MAX_MB = 5;
const WEBHOOK_CLEANUP_TIMEOUT_MS = 5_000;
const ZALO_TYPING_TIMEOUT_MS = 5_000;
type ZaloCoreRuntime = ReturnType<typeof getZaloRuntime>;
type ZaloStatusSink = (patch: { lastInboundAt?: number; lastOutboundAt?: number }) => void;
type ZaloProcessingContext = {
token: string;
account: ResolvedZaloAccount;
config: OpenClawConfig;
runtime: ZaloRuntimeEnv;
core: ZaloCoreRuntime;
statusSink?: ZaloStatusSink;
fetcher?: ZaloFetch;
};
type ZaloPollingLoopParams = ZaloProcessingContext & {
abortSignal: AbortSignal;
isStopped: () => boolean;
mediaMaxMb: number;
};
type ZaloUpdateProcessingParams = ZaloProcessingContext & {
update: ZaloUpdate;
mediaMaxMb: number;
};
type ZaloMessagePipelineParams = ZaloProcessingContext & {
message: ZaloMessage;
text?: string;
mediaPath?: string;
mediaType?: string;
};
type ZaloImageMessageParams = ZaloProcessingContext & {
message: ZaloMessage;
mediaMaxMb: number;
};
function formatZaloError(error: unknown): string {
if (error instanceof Error) {
return error.stack ?? `${error.name}: ${error.message}`;
}
return String(error);
}
function describeWebhookTarget(rawUrl: string): string {
try {
const parsed = new URL(rawUrl);
return `${parsed.origin}${parsed.pathname}`;
} catch {
return rawUrl;
}
}
function normalizeWebhookUrl(url: string | undefined): string | undefined {
const trimmed = url?.trim();
return trimmed ? trimmed : undefined;
}
function logVerbose(core: ZaloCoreRuntime, runtime: ZaloRuntimeEnv, message: string): void {
if (core.logging.shouldLogVerbose()) {
runtime.log?.(`[zalo] ${message}`);
}
}
export function registerZaloWebhookTarget(target: ZaloWebhookTarget): () => void {
return registerZaloWebhookTargetInternal(target, {
route: {
auth: "plugin",
match: "exact",
pluginId: "zalo",
source: "zalo-webhook",
accountId: target.account.accountId,
log: target.runtime.log,
handler: async (req, res) => {
const handled = await handleZaloWebhookRequest(req, res);
if (!handled && !res.headersSent) {
res.statusCode = 404;
res.setHeader("Content-Type", "text/plain; charset=utf-8");
res.end("Not Found");
}
},
},
});
}
export {
clearZaloWebhookSecurityStateForTest,
getZaloWebhookRateLimitStateSizeForTest,
getZaloWebhookStatusCounterSizeForTest,
};
export async function handleZaloWebhookRequest(
req: IncomingMessage,
res: ServerResponse,
): Promise<boolean> {
return handleZaloWebhookRequestInternal(req, res, async ({ update, target }) => {
await processUpdate({
update,
token: target.token,
account: target.account,
config: target.config,
runtime: target.runtime,
core: target.core as ZaloCoreRuntime,
mediaMaxMb: target.mediaMaxMb,
statusSink: target.statusSink,
fetcher: target.fetcher,
});
});
}
function startPollingLoop(params: ZaloPollingLoopParams) {
const {
token,
account,
config,
runtime,
core,
abortSignal,
isStopped,
mediaMaxMb,
statusSink,
fetcher,
} = params;
const pollTimeout = 30;
const processingContext = {
token,
account,
config,
runtime,
core,
mediaMaxMb,
statusSink,
fetcher,
};
runtime.log?.(`[${account.accountId}] Zalo polling loop started timeout=${String(pollTimeout)}s`);
const poll = async () => {
if (isStopped() || abortSignal.aborted) {
return;
}
try {
const response = await getUpdates(token, { timeout: pollTimeout }, fetcher);
if (response.ok && response.result) {
statusSink?.({ lastInboundAt: Date.now() });
await processUpdate({
update: response.result,
...processingContext,
});
}
} catch (err) {
if (err instanceof ZaloApiError && err.isPollingTimeout) {
// no updates
} else if (!isStopped() && !abortSignal.aborted) {
runtime.error?.(`[${account.accountId}] Zalo polling error: ${formatZaloError(err)}`);
await new Promise((resolve) => setTimeout(resolve, 5000));
}
}
if (!isStopped() && !abortSignal.aborted) {
setImmediate(poll);
}
};
void poll();
}
async function processUpdate(params: ZaloUpdateProcessingParams): Promise<void> {
const { update, token, account, config, runtime, core, mediaMaxMb, statusSink, fetcher } = params;
const { event_name, message } = update;
const sharedContext = { token, account, config, runtime, core, statusSink, fetcher };
if (!message) {
return;
}
switch (event_name) {
case "message.text.received":
await handleTextMessage({
message,
...sharedContext,
});
break;
case "message.image.received":
await handleImageMessage({
message,
...sharedContext,
mediaMaxMb,
});
break;
case "message.sticker.received":
logVerbose(core, runtime, `[${account.accountId}] Received sticker from ${message.from.id}`);
break;
case "message.unsupported.received":
logVerbose(
core,
runtime,
`[${account.accountId}] Received unsupported message type from ${message.from.id}`,
);
break;
}
}
async function handleTextMessage(
params: ZaloProcessingContext & { message: ZaloMessage },
): Promise<void> {
const { message } = params;
const { text } = message;
if (!text?.trim()) {
return;
}
await processMessageWithPipeline({
...params,
text,
mediaPath: undefined,
mediaType: undefined,
});
}
async function handleImageMessage(params: ZaloImageMessageParams): Promise<void> {
const { message, mediaMaxMb } = params;
const { photo, caption } = message;
let mediaPath: string | undefined;
let mediaType: string | undefined;
if (photo) {
try {
const maxBytes = mediaMaxMb * 1024 * 1024;
const fetched = await core.channel.media.fetchRemoteMedia({ url: photo, maxBytes });
const saved = await core.channel.media.saveMediaBuffer(
fetched.buffer,
fetched.contentType,
"inbound",
maxBytes,
);
mediaPath = saved.path;
mediaType = saved.contentType;
} catch (err) {
runtime.error?.(`[${account.accountId}] Failed to download Zalo image: ${String(err)}`);
}
}
await processMessageWithPipeline({
...params,
text: caption,
mediaPath,
mediaType,
});
}
async function processMessageWithPipeline(params: ZaloMessagePipelineParams): Promise<void> {
const {
message,
token,
account,
config,
runtime,
core,
text,
mediaPath,
mediaType,
statusSink,
fetcher,
} = params;
const pairing = createScopedPairingAccess({
core,
channel: "zalo",
accountId: account.accountId,
});
const { from, chat, message_id, date } = message;
const isGroup = chat.chat_type === "GROUP";
const chatId = chat.id;
const senderId = from.id;
const senderName = from.name;
const dmPolicy = account.config.dmPolicy ?? "pairing";
const configAllowFrom = (account.config.allowFrom ?? []).map((v) => String(v));
const configuredGroupAllowFrom = (account.config.groupAllowFrom ?? []).map((v) => String(v));
const groupAllowFrom =
configuredGroupAllowFrom.length > 0 ? configuredGroupAllowFrom : configAllowFrom;
const defaultGroupPolicy = resolveDefaultGroupPolicy(config);
const groupAccess = isGroup
? evaluateZaloGroupAccess({
providerConfigPresent: config.channels?.zalo !== undefined,
configuredGroupPolicy: account.config.groupPolicy,
defaultGroupPolicy,
groupAllowFrom,
senderId,
})
: undefined;
if (groupAccess) {
warnMissingProviderGroupPolicyFallbackOnce({
providerMissingFallbackApplied: groupAccess.providerMissingFallbackApplied,
providerKey: "zalo",
accountId: account.accountId,
log: (message) => logVerbose(core, runtime, message),
});
if (!groupAccess.allowed) {
if (groupAccess.reason === "disabled") {
logVerbose(core, runtime, `zalo: drop group ${chatId} (groupPolicy=disabled)`);
} else if (groupAccess.reason === "empty_allowlist") {
logVerbose(
core,
runtime,
`zalo: drop group ${chatId} (groupPolicy=allowlist, no groupAllowFrom)`,
);
} else if (groupAccess.reason === "sender_not_allowlisted") {
logVerbose(core, runtime, `zalo: drop group sender ${senderId} (groupPolicy=allowlist)`);
}
return;
}
}
const rawBody = text?.trim() || (mediaPath ? "<media:image>" : "");
const { senderAllowedForCommands, commandAuthorized } =
await resolveSenderCommandAuthorizationWithRuntime({
cfg: config,
rawBody,
isGroup,
dmPolicy,
configuredAllowFrom: configAllowFrom,
configuredGroupAllowFrom: groupAllowFrom,
senderId,
isSenderAllowed: isZaloSenderAllowed,
readAllowFromStore: pairing.readAllowFromStore,
runtime: core.channel.commands,
});
const directDmOutcome = resolveDirectDmAuthorizationOutcome({
isGroup,
dmPolicy,
senderAllowedForCommands,
});
if (directDmOutcome === "disabled") {
logVerbose(core, runtime, `Blocked zalo DM from ${senderId} (dmPolicy=disabled)`);
return;
}
if (directDmOutcome === "unauthorized") {
if (dmPolicy === "pairing") {
await issuePairingChallenge({
channel: "zalo",
senderId,
senderIdLine: `Your Zalo user id: ${senderId}`,
meta: { name: senderName ?? undefined },
upsertPairingRequest: pairing.upsertPairingRequest,
onCreated: () => {
logVerbose(core, runtime, `zalo pairing request sender=${senderId}`);
},
sendPairingReply: async (text) => {
await sendMessage(
token,
{
chat_id: chatId,
text,
},
fetcher,
);
statusSink?.({ lastOutboundAt: Date.now() });
},
onReplyError: (err) => {
logVerbose(core, runtime, `zalo pairing reply failed for ${senderId}: ${String(err)}`);
},
});
} else {
logVerbose(
core,
runtime,
`Blocked unauthorized zalo sender ${senderId} (dmPolicy=${dmPolicy})`,
);
}
return;
}
const { route, buildEnvelope } = resolveInboundRouteEnvelopeBuilderWithRuntime({
cfg: config,
channel: "zalo",
accountId: account.accountId,
peer: {
kind: isGroup ? ("group" as const) : ("direct" as const),
id: chatId,
},
runtime: core.channel,
sessionStore: config.session?.store,
});
if (
isGroup &&
core.channel.commands.isControlCommandMessage(rawBody, config) &&
commandAuthorized !== true
) {
logVerbose(core, runtime, `zalo: drop control command from unauthorized sender ${senderId}`);
return;
}
const fromLabel = isGroup ? `group:${chatId}` : senderName || `user:${senderId}`;
const { storePath, body } = buildEnvelope({
channel: "Zalo",
from: fromLabel,
timestamp: date ? date * 1000 : undefined,
body: rawBody,
});
const ctxPayload = core.channel.reply.finalizeInboundContext({
Body: body,
BodyForAgent: rawBody,
RawBody: rawBody,
CommandBody: rawBody,
From: isGroup ? `zalo:group:${chatId}` : `zalo:${senderId}`,
To: `zalo:${chatId}`,
SessionKey: route.sessionKey,
AccountId: route.accountId,
ChatType: isGroup ? "group" : "direct",
ConversationLabel: fromLabel,
SenderName: senderName || undefined,
SenderId: senderId,
CommandAuthorized: commandAuthorized,
Provider: "zalo",
Surface: "zalo",
MessageSid: message_id,
MediaPath: mediaPath,
MediaType: mediaType,
MediaUrl: mediaPath,
OriginatingChannel: "zalo",
OriginatingTo: `zalo:${chatId}`,
});
await core.channel.session.recordInboundSession({
storePath,
sessionKey: ctxPayload.SessionKey ?? route.sessionKey,
ctx: ctxPayload,
onRecordError: (err) => {
runtime.error?.(`zalo: failed updating session meta: ${String(err)}`);
},
});
const tableMode = core.channel.text.resolveMarkdownTableMode({
cfg: config,
channel: "zalo",
accountId: account.accountId,
});
const { onModelSelected, ...prefixOptions } = createReplyPrefixOptions({
cfg: config,
agentId: route.agentId,
channel: "zalo",
accountId: account.accountId,
});
const typingCallbacks = createTypingCallbacks({
start: async () => {
await sendChatAction(
token,
{
chat_id: chatId,
action: "typing",
},
fetcher,
ZALO_TYPING_TIMEOUT_MS,
);
},
onStartError: (err) => {
logTypingFailure({
log: (message) => logVerbose(core, runtime, message),
channel: "zalo",
action: "start",
target: chatId,
error: err,
});
},
});
await core.channel.reply.dispatchReplyWithBufferedBlockDispatcher({
ctx: ctxPayload,
cfg: config,
dispatcherOptions: {
...prefixOptions,
typingCallbacks,
deliver: async (payload) => {
await deliverZaloReply({
payload,
token,
chatId,
runtime,
core,
config,
accountId: account.accountId,
statusSink,
fetcher,
tableMode,
});
},
onError: (err, info) => {
runtime.error?.(`[${account.accountId}] Zalo ${info.kind} reply failed: ${String(err)}`);
},
},
replyOptions: {
onModelSelected,
},
});
}
async function deliverZaloReply(params: {
payload: OutboundReplyPayload;
token: string;
chatId: string;
runtime: ZaloRuntimeEnv;
core: ZaloCoreRuntime;
config: OpenClawConfig;
accountId?: string;
statusSink?: ZaloStatusSink;
fetcher?: ZaloFetch;
tableMode?: MarkdownTableMode;
}): Promise<void> {
const { payload, token, chatId, runtime, core, config, accountId, statusSink, fetcher } = params;
const tableMode = params.tableMode ?? "code";
const text = core.channel.text.convertMarkdownTables(payload.text ?? "", tableMode);
const sentMedia = await sendMediaWithLeadingCaption({
mediaUrls: resolveOutboundMediaUrls(payload),
caption: text,
send: async ({ mediaUrl, caption }) => {
await sendPhoto(token, { chat_id: chatId, photo: mediaUrl, caption }, fetcher);
statusSink?.({ lastOutboundAt: Date.now() });
},
onError: (error) => {
runtime.error?.(`Zalo photo send failed: ${String(error)}`);
},
});
if (sentMedia) {
return;
}
if (text) {
const chunkMode = core.channel.text.resolveChunkMode(config, "zalo", accountId);
const chunks = core.channel.text.chunkMarkdownTextWithMode(text, ZALO_TEXT_LIMIT, chunkMode);
for (const chunk of chunks) {
try {
await sendMessage(token, { chat_id: chatId, text: chunk }, fetcher);
statusSink?.({ lastOutboundAt: Date.now() });
} catch (err) {
runtime.error?.(`Zalo message send failed: ${String(err)}`);
}
}
}
}
export async function monitorZaloProvider(options: ZaloMonitorOptions): Promise<void> {
const {
token,
account,
config,
runtime,
abortSignal,
useWebhook,
webhookUrl,
webhookSecret,
webhookPath,
statusSink,
fetcher: fetcherOverride,
} = options;
const core = getZaloRuntime();
const effectiveMediaMaxMb = account.config.mediaMaxMb ?? DEFAULT_MEDIA_MAX_MB;
const fetcher = fetcherOverride ?? resolveZaloProxyFetch(account.config.proxy);
const mode = useWebhook ? "webhook" : "polling";
let stopped = false;
const stopHandlers: Array<() => void> = [];
let cleanupWebhook: (() => Promise<void>) | undefined;
const stop = () => {
if (stopped) {
return;
}
stopped = true;
for (const handler of stopHandlers) {
handler();
}
};
runtime.log?.(
`[${account.accountId}] Zalo provider init mode=${mode} mediaMaxMb=${String(effectiveMediaMaxMb)}`,
);
try {
if (useWebhook) {
if (!webhookUrl || !webhookSecret) {
throw new Error("Zalo webhookUrl and webhookSecret are required for webhook mode");
}
if (!webhookUrl.startsWith("https://")) {
throw new Error("Zalo webhook URL must use HTTPS");
}
if (webhookSecret.length < 8 || webhookSecret.length > 256) {
throw new Error("Zalo webhook secret must be 8-256 characters");
}
const path = resolveWebhookPath({ webhookPath, webhookUrl, defaultPath: null });
if (!path) {
throw new Error("Zalo webhookPath could not be derived");
}
runtime.log?.(
`[${account.accountId}] Zalo configuring webhook path=${path} target=${describeWebhookTarget(webhookUrl)}`,
);
await setWebhook(token, { url: webhookUrl, secret_token: webhookSecret }, fetcher);
let webhookCleanupPromise: Promise<void> | undefined;
cleanupWebhook = async () => {
if (!webhookCleanupPromise) {
webhookCleanupPromise = (async () => {
runtime.log?.(`[${account.accountId}] Zalo stopping; deleting webhook`);
try {
await deleteWebhook(token, fetcher, WEBHOOK_CLEANUP_TIMEOUT_MS);
runtime.log?.(`[${account.accountId}] Zalo webhook deleted`);
} catch (err) {
const detail =
err instanceof Error && err.name === "AbortError"
? `timed out after ${String(WEBHOOK_CLEANUP_TIMEOUT_MS)}ms`
: formatZaloError(err);
runtime.error?.(`[${account.accountId}] Zalo webhook delete failed: ${detail}`);
}
})();
}
await webhookCleanupPromise;
};
runtime.log?.(`[${account.accountId}] Zalo webhook registered path=${path}`);
const unregister = registerZaloWebhookTarget({
token,
account,
config,
runtime,
core,
path,
secret: webhookSecret,
statusSink: (patch) => statusSink?.(patch),
mediaMaxMb: effectiveMediaMaxMb,
fetcher,
});
stopHandlers.push(unregister);
await waitForAbortSignal(abortSignal);
return;
}
runtime.log?.(`[${account.accountId}] Zalo polling mode: clearing webhook before startup`);
try {
try {
const currentWebhookUrl = normalizeWebhookUrl(
(await getWebhookInfo(token, fetcher)).result?.url,
);
if (!currentWebhookUrl) {
runtime.log?.(`[${account.accountId}] Zalo polling mode ready (no webhook configured)`);
} else {
runtime.log?.(
`[${account.accountId}] Zalo polling mode disabling existing webhook ${describeWebhookTarget(currentWebhookUrl)}`,
);
await deleteWebhook(token, fetcher);
runtime.log?.(`[${account.accountId}] Zalo polling mode ready (webhook disabled)`);
}
} catch (err) {
if (err instanceof ZaloApiError && err.errorCode === 404) {
// Some Zalo environments do not expose webhook inspection for polling bots.
runtime.log?.(
`[${account.accountId}] Zalo polling mode webhook inspection unavailable; continuing without webhook cleanup`,
);
} else {
throw err;
}
}
} catch (err) {
runtime.error?.(
`[${account.accountId}] Zalo polling startup could not clear webhook: ${formatZaloError(err)}`,
);
}
startPollingLoop({
token,
account,
config,
runtime,
core,
abortSignal,
isStopped: () => stopped,
mediaMaxMb: effectiveMediaMaxMb,
statusSink,
fetcher,
});
await waitForAbortSignal(abortSignal);
} catch (err) {
runtime.error?.(
`[${account.accountId}] Zalo provider startup failed mode=${mode}: ${formatZaloError(err)}`,
);
throw err;
} finally {
await cleanupWebhook?.();
stop();
runtime.log?.(`[${account.accountId}] Zalo provider stopped mode=${mode}`);
}
}
export const __testing = {
evaluateZaloGroupAccess,
resolveZaloRuntimeGroupPolicy,
};