From 58baf22230e0ec3b816da4dd5114db1734c38469 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Fri, 13 Mar 2026 21:45:01 +0000 Subject: [PATCH] refactor: share zalo monitor processing context --- extensions/zalo/src/monitor.ts | 175 ++++++++++++++------------------- 1 file changed, 72 insertions(+), 103 deletions(-) diff --git a/extensions/zalo/src/monitor.ts b/extensions/zalo/src/monitor.ts index bd1351bd147..2c5c420ce60 100644 --- a/extensions/zalo/src/monitor.ts +++ b/extensions/zalo/src/monitor.ts @@ -75,6 +75,35 @@ const WEBHOOK_CLEANUP_TIMEOUT_MS = 5_000; const ZALO_TYPING_TIMEOUT_MS = 5_000; type ZaloCoreRuntime = ReturnType; +type ZaloStatusSink = (patch: { lastInboundAt?: number; lastOutboundAt?: number }) => void; +type ZaloProcessingContext = { + token: string; + account: ResolvedZaloAccount; + config: OpenClawConfig; + runtime: ZaloRuntimeEnv; + core: ZaloCoreRuntime; + statusSink?: ZaloStatusSink; + fetcher?: ZaloFetch; +}; +type ZaloPollingLoopParams = ZaloProcessingContext & { + abortSignal: AbortSignal; + isStopped: () => boolean; + mediaMaxMb: number; +}; +type ZaloUpdateProcessingParams = ZaloProcessingContext & { + update: ZaloUpdate; + mediaMaxMb: number; +}; +type ZaloMessagePipelineParams = ZaloProcessingContext & { + message: ZaloMessage; + text?: string; + mediaPath?: string; + mediaType?: string; +}; +type ZaloImageMessageParams = ZaloProcessingContext & { + message: ZaloMessage; + mediaMaxMb: number; +}; function formatZaloError(error: unknown): string { if (error instanceof Error) { @@ -135,32 +164,21 @@ export async function handleZaloWebhookRequest( res: ServerResponse, ): Promise { return handleZaloWebhookRequestInternal(req, res, async ({ update, target }) => { - await processUpdate( + await processUpdate({ update, - target.token, - target.account, - target.config, - target.runtime, - target.core as ZaloCoreRuntime, - target.mediaMaxMb, - target.statusSink, - target.fetcher, - ); + token: target.token, + account: target.account, + config: target.config, + runtime: target.runtime, + core: target.core as ZaloCoreRuntime, + mediaMaxMb: target.mediaMaxMb, + statusSink: target.statusSink, + fetcher: target.fetcher, + }); }); } -function startPollingLoop(params: { - token: string; - account: ResolvedZaloAccount; - config: OpenClawConfig; - runtime: ZaloRuntimeEnv; - core: ZaloCoreRuntime; - abortSignal: AbortSignal; - isStopped: () => boolean; - mediaMaxMb: number; - statusSink?: (patch: { lastInboundAt?: number; lastOutboundAt?: number }) => void; - fetcher?: ZaloFetch; -}) { +function startPollingLoop(params: ZaloPollingLoopParams) { const { token, account, @@ -174,6 +192,16 @@ function startPollingLoop(params: { fetcher, } = params; const pollTimeout = 30; + const processingContext = { + token, + account, + config, + runtime, + core, + mediaMaxMb, + statusSink, + fetcher, + }; runtime.log?.(`[${account.accountId}] Zalo polling loop started timeout=${String(pollTimeout)}s`); @@ -186,17 +214,10 @@ function startPollingLoop(params: { const response = await getUpdates(token, { timeout: pollTimeout }, fetcher); if (response.ok && response.result) { statusSink?.({ lastInboundAt: Date.now() }); - await processUpdate( - response.result, - token, - account, - config, - runtime, - core, - mediaMaxMb, - statusSink, - fetcher, - ); + await processUpdate({ + update: response.result, + ...processingContext, + }); } } catch (err) { if (err instanceof ZaloApiError && err.isPollingTimeout) { @@ -215,38 +236,27 @@ function startPollingLoop(params: { void poll(); } -async function processUpdate( - update: ZaloUpdate, - token: string, - account: ResolvedZaloAccount, - config: OpenClawConfig, - runtime: ZaloRuntimeEnv, - core: ZaloCoreRuntime, - mediaMaxMb: number, - statusSink?: (patch: { lastInboundAt?: number; lastOutboundAt?: number }) => void, - fetcher?: ZaloFetch, -): Promise { +async function processUpdate(params: ZaloUpdateProcessingParams): Promise { + const { update, token, account, config, runtime, core, mediaMaxMb, statusSink, fetcher } = params; const { event_name, message } = update; + const sharedContext = { token, account, config, runtime, core, statusSink, fetcher }; if (!message) { return; } switch (event_name) { case "message.text.received": - await handleTextMessage(message, token, account, config, runtime, core, statusSink, fetcher); + await handleTextMessage({ + message, + ...sharedContext, + }); break; case "message.image.received": - await handleImageMessage( + await handleImageMessage({ message, - token, - account, - config, - runtime, - core, + ...sharedContext, mediaMaxMb, - statusSink, - fetcher, - ); + }); break; case "message.sticker.received": logVerbose(core, runtime, `[${account.accountId}] Received sticker from ${message.from.id}`); @@ -262,46 +272,24 @@ async function processUpdate( } async function handleTextMessage( - message: ZaloMessage, - token: string, - account: ResolvedZaloAccount, - config: OpenClawConfig, - runtime: ZaloRuntimeEnv, - core: ZaloCoreRuntime, - statusSink?: (patch: { lastInboundAt?: number; lastOutboundAt?: number }) => void, - fetcher?: ZaloFetch, + params: ZaloProcessingContext & { message: ZaloMessage }, ): Promise { + const { message } = params; const { text } = message; if (!text?.trim()) { return; } await processMessageWithPipeline({ - message, - token, - account, - config, - runtime, - core, + ...params, text, mediaPath: undefined, mediaType: undefined, - statusSink, - fetcher, }); } -async function handleImageMessage( - message: ZaloMessage, - token: string, - account: ResolvedZaloAccount, - config: OpenClawConfig, - runtime: ZaloRuntimeEnv, - core: ZaloCoreRuntime, - mediaMaxMb: number, - statusSink?: (patch: { lastInboundAt?: number; lastOutboundAt?: number }) => void, - fetcher?: ZaloFetch, -): Promise { +async function handleImageMessage(params: ZaloImageMessageParams): Promise { + const { message, mediaMaxMb } = params; const { photo, caption } = message; let mediaPath: string | undefined; @@ -325,33 +313,14 @@ async function handleImageMessage( } await processMessageWithPipeline({ - message, - token, - account, - config, - runtime, - core, + ...params, text: caption, mediaPath, mediaType, - statusSink, - fetcher, }); } -async function processMessageWithPipeline(params: { - message: ZaloMessage; - token: string; - account: ResolvedZaloAccount; - config: OpenClawConfig; - runtime: ZaloRuntimeEnv; - core: ZaloCoreRuntime; - text?: string; - mediaPath?: string; - mediaType?: string; - statusSink?: (patch: { lastInboundAt?: number; lastOutboundAt?: number }) => void; - fetcher?: ZaloFetch; -}): Promise { +async function processMessageWithPipeline(params: ZaloMessagePipelineParams): Promise { const { message, token, @@ -609,7 +578,7 @@ async function deliverZaloReply(params: { core: ZaloCoreRuntime; config: OpenClawConfig; accountId?: string; - statusSink?: (patch: { lastInboundAt?: number; lastOutboundAt?: number }) => void; + statusSink?: ZaloStatusSink; fetcher?: ZaloFetch; tableMode?: MarkdownTableMode; }): Promise {