mirror of https://github.com/openclaw/openclaw.git
618 lines
21 KiB
TypeScript
618 lines
21 KiB
TypeScript
import { resolveSessionAgentId } from "../../agents/agent-scope.js";
|
|
import type { OpenClawConfig } from "../../config/config.js";
|
|
import {
|
|
loadSessionStore,
|
|
parseSessionThreadInfo,
|
|
resolveSessionStoreEntry,
|
|
resolveStorePath,
|
|
type SessionEntry,
|
|
} from "../../config/sessions.js";
|
|
import { shouldSuppressLocalDiscordExecApprovalPrompt } from "../../discord/exec-approvals.js";
|
|
import { logVerbose } from "../../globals.js";
|
|
import { fireAndForgetHook } from "../../hooks/fire-and-forget.js";
|
|
import { createInternalHookEvent, triggerInternalHook } from "../../hooks/internal-hooks.js";
|
|
import {
|
|
deriveInboundMessageHookContext,
|
|
toInternalMessageReceivedContext,
|
|
toPluginMessageContext,
|
|
toPluginMessageReceivedEvent,
|
|
} from "../../hooks/message-hook-mappers.js";
|
|
import { isDiagnosticsEnabled } from "../../infra/diagnostic-events.js";
|
|
import {
|
|
logMessageProcessed,
|
|
logMessageQueued,
|
|
logSessionStateChange,
|
|
} from "../../logging/diagnostic.js";
|
|
import { getGlobalHookRunner } from "../../plugins/hook-runner-global.js";
|
|
import { resolveSendPolicy } from "../../sessions/send-policy.js";
|
|
import { maybeApplyTtsToPayload, normalizeTtsAutoMode, resolveTtsConfig } from "../../tts/tts.js";
|
|
import { INTERNAL_MESSAGE_CHANNEL, normalizeMessageChannel } from "../../utils/message-channel.js";
|
|
import { getReplyFromConfig } from "../reply.js";
|
|
import type { FinalizedMsgContext } from "../templating.js";
|
|
import type { GetReplyOptions, ReplyPayload } from "../types.js";
|
|
import { formatAbortReplyText, tryFastAbortFromMessage } from "./abort.js";
|
|
import { shouldBypassAcpDispatchForCommand, tryDispatchAcpReply } from "./dispatch-acp.js";
|
|
import { shouldSkipDuplicateInbound } from "./inbound-dedupe.js";
|
|
import type { ReplyDispatcher, ReplyDispatchKind } from "./reply-dispatcher.js";
|
|
import { shouldSuppressReasoningPayload } from "./reply-payloads.js";
|
|
import { isRoutableChannel, routeReply } from "./route-reply.js";
|
|
import { resolveRunTypingPolicy } from "./typing-policy.js";
|
|
|
|
// Matches a body that consists solely of a `<media:audio>` placeholder, optionally followed
// by whitespace and a single parenthesized suffix. Case-insensitive.
const AUDIO_PLACEHOLDER_RE = /^<media:audio>(\s*\([^)]*\))?$/i;
|
|
// Matches a body that begins with "[Audio" as a whole word. Case-insensitive.
const AUDIO_HEADER_RE = /^\[Audio\b/i;
|
|
/**
 * Reduces a media type string to its bare, lower-cased form.
 *
 * Everything from the first `;` onward (parameters such as `codecs=...`) is dropped,
 * then surrounding whitespace is trimmed and the remainder is lower-cased.
 *
 * @param value - Raw media type, e.g. `"Audio/OGG; codecs=opus"`.
 * @returns The normalized media type, e.g. `"audio/ogg"`.
 */
const normalizeMediaType = (value: string): string => {
  const paramStart = value.indexOf(";");
  const essence = paramStart === -1 ? value : value.slice(0, paramStart);
  return essence.trim().toLowerCase();
};
|
|
|
|
/**
 * Reports whether the inbound message looks like an audio message.
 *
 * Two signals are checked, in order:
 * 1. Any string in `ctx.MediaType` / `ctx.MediaTypes` that normalizes to `audio` or
 *    starts with `audio/`.
 * 2. The first string-valued body field (`BodyForCommands`, `CommandBody`, `RawBody`,
 *    `Body`, in that order), trimmed, matching `AUDIO_PLACEHOLDER_RE` or `AUDIO_HEADER_RE`.
 *
 * @param ctx - Finalized inbound message context.
 * @returns `true` when either signal indicates audio, otherwise `false`.
 */
const isInboundAudioContext = (ctx: FinalizedMsgContext): boolean => {
  const declaredTypes: unknown[] = [
    ctx.MediaType,
    ...(Array.isArray(ctx.MediaTypes) ? ctx.MediaTypes : []),
  ];
  // Narrow with a real type guard instead of asserting `as string[]`: a non-string
  // entry would otherwise reach `normalizeMediaType` and throw on `.split`.
  const hasAudioType = declaredTypes
    .filter((type): type is string => typeof type === "string")
    .map((type) => normalizeMediaType(type))
    .some((type) => type === "audio" || type.startsWith("audio/"));
  if (hasAudioType) {
    return true;
  }

  // The first field that is a string wins, even when it is empty; later fields are
  // only consulted when earlier ones are not strings at all.
  const bodyCandidates: unknown[] = [ctx.BodyForCommands, ctx.CommandBody, ctx.RawBody, ctx.Body];
  const body =
    bodyCandidates.find((candidate): candidate is string => typeof candidate === "string") ?? "";
  const trimmed = body.trim();
  if (!trimmed) {
    return false;
  }
  return AUDIO_PLACEHOLDER_RE.test(trimmed) || AUDIO_HEADER_RE.test(trimmed);
};
|
|
|
|
/**
 * Looks up the session store entry that applies to this inbound message.
 *
 * For native commands the command's target session key is preferred; otherwise (or when
 * that key is missing or blank) `ctx.SessionKey` is used. The store path is derived from
 * the agent that owns the session key.
 *
 * @param ctx - Finalized inbound message context.
 * @param cfg - Loaded configuration; `cfg.session?.store` selects the store location.
 * @returns An empty object when no usable session key exists; otherwise the resolved
 *   `sessionKey` plus the existing store `entry`, if one was found. When the store cannot
 *   be loaded, only `sessionKey` is returned and the failure is logged verbosely.
 */
const resolveSessionStoreLookup = (
  ctx: FinalizedMsgContext,
  cfg: OpenClawConfig,
): {
  sessionKey?: string;
  entry?: SessionEntry;
} => {
  const commandTargetKey =
    ctx.CommandSource === "native" ? ctx.CommandTargetSessionKey?.trim() : undefined;
  // `||` on purpose: a whitespace-only target key trims to "" and must fall back to the
  // message's own session key rather than short-circuit the lookup.
  const sessionKey = commandTargetKey || ctx.SessionKey?.trim();
  if (!sessionKey) {
    return {};
  }

  const agentId = resolveSessionAgentId({ sessionKey, config: cfg });
  const storePath = resolveStorePath(cfg.session?.store, { agentId });
  try {
    const store = loadSessionStore(storePath);
    const { existing } = resolveSessionStoreEntry({ store, sessionKey });
    return { sessionKey, entry: existing };
  } catch (err) {
    // Best effort: continue without the entry, but leave a trace so that ignored
    // per-session settings can be diagnosed.
    logVerbose(
      `dispatch-from-config: session store lookup failed for ${sessionKey}: ${err instanceof Error ? err.message : String(err)}`,
    );
    return { sessionKey };
  }
};
|
|
|
|
/** Result returned by {@link dispatchReplyFromConfig}. */
export type DispatchFromConfigResult = {
  /** Whether at least one final reply was queued on the dispatcher or routed successfully. */
  queuedFinal: boolean;
  /** Number of payloads accounted for, keyed by dispatch kind. */
  counts: Record<ReplyDispatchKind, number>;
};
|
|
|
|
/**
 * Processes one finalized inbound message and delivers whatever reply it produces.
 *
 * Steps, in order:
 * 1. Skip the message when `shouldSkipDuplicateInbound` flags it.
 * 2. Fire the plugin and internal `message_received` hooks without awaiting them.
 * 3. If the message is handled as a fast abort, send the abort reply and stop.
 * 4. Stop when the resolved send policy is `"deny"`, unless
 *    `shouldBypassAcpDispatchForCommand` returned true.
 * 5. Return the ACP dispatch result when `tryDispatchAcpReply` produces one.
 * 6. Otherwise resolve the reply with `params.replyResolver` (default `getReplyFromConfig`)
 *    and deliver tool, block and final payloads, passing each through
 *    `maybeApplyTtsToPayload`.
 *
 * Payloads are handed to `dispatcher` unless the turn has to be routed to a different
 * originating channel, in which case they are sent with `routeReply`.
 *
 * @param params.ctx - Finalized inbound message context. `AcpDispatchTailAfterReset` is
 *   reset to `false` on it when the ACP tail dispatch runs.
 * @param params.cfg - Loaded configuration.
 * @param params.dispatcher - Dispatcher used for same-surface delivery.
 * @param params.replyOptions - Extra options forwarded to the reply resolver.
 * @param params.replyResolver - Optional replacement for `getReplyFromConfig`.
 * @returns `queuedFinal` is true when `dispatcher.sendFinalReply` returned true or
 *   `routeReply` succeeded for at least one final payload; `counts` is
 *   `dispatcher.getQueuedCounts()` with successfully routed final replies added to `final`.
 * @throws Rethrows any error raised during processing after recording an `"error"` outcome.
 */
export async function dispatchReplyFromConfig(params: {
  ctx: FinalizedMsgContext;
  cfg: OpenClawConfig;
  dispatcher: ReplyDispatcher;
  replyOptions?: Omit<GetReplyOptions, "onToolResult" | "onBlockReply">;
  replyResolver?: typeof getReplyFromConfig;
}): Promise<DispatchFromConfigResult> {
  const { ctx, cfg, dispatcher } = params;
  const diagnosticsEnabled = isDiagnosticsEnabled(cfg);
  const channel = String(ctx.Surface ?? ctx.Provider ?? "unknown").toLowerCase();
  const chatId = ctx.To ?? ctx.From;
  const messageId = ctx.MessageSid ?? ctx.MessageSidFirst ?? ctx.MessageSidLast;
  const sessionKey = ctx.SessionKey;
  const startTime = diagnosticsEnabled ? Date.now() : 0;
  // Session state changes are only logged with diagnostics on and a session key present.
  const trackedSessionKey = diagnosticsEnabled && sessionKey ? sessionKey : undefined;

  // Emits the "message processed" diagnostic; no-op when diagnostics are disabled.
  const recordProcessed = (
    outcome: "completed" | "skipped" | "error",
    opts?: {
      reason?: string;
      error?: string;
    },
  ) => {
    if (diagnosticsEnabled) {
      logMessageProcessed({
        channel,
        chatId,
        messageId,
        sessionKey,
        durationMs: Date.now() - startTime,
        outcome,
        reason: opts?.reason,
        error: opts?.error,
      });
    }
  };

  // Logs the queued message and the transition of the session into "processing".
  const markProcessing = () => {
    if (!trackedSessionKey) {
      return;
    }
    logMessageQueued({ sessionKey: trackedSessionKey, channel, source: "dispatch" });
    logSessionStateChange({
      sessionKey: trackedSessionKey,
      state: "processing",
      reason: "message_start",
    });
  };

  // Logs the transition of the session back to "idle".
  const markIdle = (reason: string) => {
    if (!trackedSessionKey) {
      return;
    }
    logSessionStateChange({ sessionKey: trackedSessionKey, state: "idle", reason });
  };

  if (shouldSkipDuplicateInbound(ctx)) {
    recordProcessed("skipped", { reason: "duplicate" });
    return { queuedFinal: false, counts: dispatcher.getQueuedCounts() };
  }

  const sessionStoreEntry = resolveSessionStoreLookup(ctx, cfg);
  const acpDispatchSessionKey = sessionStoreEntry.sessionKey ?? sessionKey;
  // Restore route thread context only from the active turn or the thread-scoped session key.
  // Do not read thread ids from the normalised session store here: `origin.threadId` can be
  // folded back into lastThreadId/deliveryContext during store normalisation and resurrect a
  // stale route after thread delivery was intentionally cleared.
  const routeThreadId =
    ctx.MessageThreadId ?? parseSessionThreadInfo(acpDispatchSessionKey).threadId;
  const inboundAudio = isInboundAudioContext(ctx);
  const sessionTtsAuto = normalizeTtsAutoMode(sessionStoreEntry.entry?.ttsAuto);
  const hookRunner = getGlobalHookRunner();

  // Message context shared by the plugin and internal hooks.
  const timestamp =
    typeof ctx.Timestamp === "number" && Number.isFinite(ctx.Timestamp) ? ctx.Timestamp : undefined;
  const messageIdForHook =
    ctx.MessageSidFull ?? ctx.MessageSid ?? ctx.MessageSidFirst ?? ctx.MessageSidLast;
  const hookContext = deriveInboundMessageHookContext(ctx, { messageId: messageIdForHook });
  const { isGroup, groupId } = hookContext;

  // Plugin hooks (fire-and-forget).
  if (hookRunner?.hasHooks("message_received")) {
    fireAndForgetHook(
      hookRunner.runMessageReceived(
        toPluginMessageReceivedEvent(hookContext),
        toPluginMessageContext(hookContext),
      ),
      "dispatch-from-config: message_received plugin hook failed",
    );
  }

  // Bridge to internal hooks (HOOK.md discovery system) - refs #8807
  if (sessionKey) {
    fireAndForgetHook(
      triggerInternalHook(
        createInternalHookEvent("message", "received", sessionKey, {
          ...toInternalMessageReceivedContext(hookContext),
          timestamp,
        }),
      ),
      "dispatch-from-config: message_received internal hook failed",
    );
  }

  // Decide whether replies go to the originating channel instead of the dispatcher.
  // Routing only happens when the originating channel is DIFFERENT from the current surface.
  // This covers cross-provider routing (e.g., a message from Telegram being processed by a
  // shared session that is currently on Slack) while keeping the normal dispatcher flow when
  // the provider handles its own messages.
  //
  // Debug: `pnpm test src/auto-reply/reply/dispatch-from-config.test.ts`
  const originatingChannel = normalizeMessageChannel(ctx.OriginatingChannel);
  const originatingTo = ctx.OriginatingTo;
  const providerChannel = normalizeMessageChannel(ctx.Provider);
  const surfaceChannel = normalizeMessageChannel(ctx.Surface);
  // Prefer provider channel because surface may carry origin metadata in relayed flows.
  const currentSurface = providerChannel ?? surfaceChannel;
  const isInternalWebchatTurn =
    currentSurface === INTERNAL_MESSAGE_CHANNEL &&
    (surfaceChannel === INTERNAL_MESSAGE_CHANNEL || !surfaceChannel) &&
    ctx.ExplicitDeliverRoute !== true;
  const shouldRouteToOriginating = Boolean(
    !isInternalWebchatTurn &&
      isRoutableChannel(originatingChannel) &&
      originatingTo &&
      originatingChannel !== currentSurface,
  );
  const shouldSuppressTyping =
    shouldRouteToOriginating || originatingChannel === INTERNAL_MESSAGE_CHANNEL;
  const ttsChannel = shouldRouteToOriginating ? originatingChannel : currentSurface;

  // Narrowed routing destination; defined only when routing applies and both the
  // channel and the recipient are present.
  const routeTarget =
    shouldRouteToOriginating && originatingChannel && originatingTo
      ? { channel: originatingChannel, to: originatingTo }
      : undefined;

  // Builds the shared `routeReply` arguments. Kept as a function so `ctx.SessionKey` and
  // `ctx.AccountId` are read at the moment of each send.
  const buildRouteArgs = (target: NonNullable<typeof routeTarget>, payload: ReplyPayload) => ({
    payload,
    channel: target.channel,
    to: target.to,
    sessionKey: ctx.SessionKey,
    accountId: ctx.AccountId,
    threadId: routeThreadId,
    cfg,
    isGroup,
    groupId,
  });

  /**
   * Sends a tool or block payload through route-reply.
   * Only invoked when shouldRouteToOriginating is true; it silently does nothing when the
   * destination is incomplete or the abort signal has already fired.
   */
  const sendPayloadAsync = async (
    payload: ReplyPayload,
    abortSignal?: AbortSignal,
    mirror?: boolean,
  ): Promise<void> => {
    if (!routeTarget || abortSignal?.aborted) {
      return;
    }
    const result = await routeReply({
      ...buildRouteArgs(routeTarget, payload),
      abortSignal,
      mirror,
    });
    if (!result.ok) {
      logVerbose(`dispatch-from-config: route-reply failed: ${result.error ?? "unknown error"}`);
    }
  };

  markProcessing();

  try {
    const fastAbort = await tryFastAbortFromMessage({ ctx, cfg });
    if (fastAbort.handled) {
      const abortReply = {
        text: formatAbortReplyText(fastAbort.stoppedSubagents),
      } satisfies ReplyPayload;
      let queuedFinal = false;
      let routedFinalCount = 0;
      if (routeTarget) {
        const result = await routeReply(buildRouteArgs(routeTarget, abortReply));
        queuedFinal = result.ok;
        if (result.ok) {
          routedFinalCount += 1;
        } else {
          logVerbose(
            `dispatch-from-config: route-reply (abort) failed: ${result.error ?? "unknown error"}`,
          );
        }
      } else {
        queuedFinal = dispatcher.sendFinalReply(abortReply);
      }
      const counts = dispatcher.getQueuedCounts();
      counts.final += routedFinalCount;
      recordProcessed("completed", { reason: "fast_abort" });
      markIdle("message_completed");
      return { queuedFinal, counts };
    }

    const bypassAcpForCommand = shouldBypassAcpDispatchForCommand(ctx, cfg);

    const sendPolicy = resolveSendPolicy({
      cfg,
      entry: sessionStoreEntry.entry,
      sessionKey: acpDispatchSessionKey,
      channel:
        sessionStoreEntry.entry?.channel ??
        ctx.OriginatingChannel ??
        ctx.Surface ??
        ctx.Provider ??
        undefined,
      chatType: sessionStoreEntry.entry?.chatType,
    });
    if (sendPolicy === "deny" && !bypassAcpForCommand) {
      logVerbose(`Send blocked by policy for session ${acpDispatchSessionKey ?? "unknown"}`);
      const counts = dispatcher.getQueuedCounts();
      recordProcessed("completed", { reason: "send_policy_deny" });
      markIdle("message_completed");
      return { queuedFinal: false, counts };
    }

    const shouldSendToolSummaries = ctx.ChatType !== "group" && ctx.CommandSource !== "native";

    // Both ACP dispatch attempts share every argument except `bypassForCommand`.
    const dispatchViaAcp = (bypassForCommand: boolean) =>
      tryDispatchAcpReply({
        ctx,
        cfg,
        dispatcher,
        sessionKey: acpDispatchSessionKey,
        inboundAudio,
        sessionTtsAuto,
        ttsChannel,
        shouldRouteToOriginating,
        originatingChannel,
        originatingTo,
        shouldSendToolSummaries,
        bypassForCommand,
        onReplyStart: params.replyOptions?.onReplyStart,
        recordProcessed,
        markIdle,
      });

    const acpDispatch = await dispatchViaAcp(bypassAcpForCommand);
    if (acpDispatch) {
      return acpDispatch;
    }

    // Block text is accumulated so TTS can be generated once streaming is done.
    // When block streaming succeeds there is no final reply, so the audio has to be
    // produced separately from the accumulated block content.
    let accumulatedBlockText = "";
    let blockCount = 0;

    // Decides what (if anything) of a tool result should be delivered.
    const resolveToolDeliveryPayload = (payload: ReplyPayload): ReplyPayload | null => {
      const onDiscord = normalizeMessageChannel(ctx.Surface ?? ctx.Provider) === "discord";
      if (
        onDiscord &&
        shouldSuppressLocalDiscordExecApprovalPrompt({
          cfg,
          accountId: ctx.AccountId,
          payload,
        })
      ) {
        return null;
      }
      if (shouldSendToolSummaries) {
        return payload;
      }
      const { channelData } = payload;
      const execApproval =
        channelData && typeof channelData === "object" && !Array.isArray(channelData)
          ? channelData.execApproval
          : undefined;
      if (execApproval && typeof execApproval === "object" && !Array.isArray(execApproval)) {
        return payload;
      }
      // Group/native flows intentionally suppress tool summary text, but media-only
      // tool results (for example TTS audio) must still be delivered.
      const hasMedia = Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0;
      return hasMedia ? { ...payload, text: undefined } : null;
    };

    const typing = resolveRunTypingPolicy({
      requestedPolicy: params.replyOptions?.typingPolicy,
      suppressTyping: params.replyOptions?.suppressTyping === true || shouldSuppressTyping,
      originatingChannel,
      systemEvent: shouldRouteToOriginating,
    });

    const resolveReply = params.replyResolver ?? getReplyFromConfig;
    const replyResult = await resolveReply(
      ctx,
      {
        ...params.replyOptions,
        typingPolicy: typing.typingPolicy,
        suppressTyping: typing.suppressTyping,
        onToolResult: async (payload: ReplyPayload) => {
          const ttsPayload = await maybeApplyTtsToPayload({
            payload,
            cfg,
            channel: ttsChannel,
            kind: "tool",
            inboundAudio,
            ttsAuto: sessionTtsAuto,
          });
          const deliveryPayload = resolveToolDeliveryPayload(ttsPayload);
          if (!deliveryPayload) {
            return;
          }
          if (shouldRouteToOriginating) {
            await sendPayloadAsync(deliveryPayload, undefined, false);
          } else {
            dispatcher.sendToolResult(deliveryPayload);
          }
        },
        onBlockReply: async (payload: ReplyPayload, context) => {
          // Suppress reasoning payloads — channels using this generic dispatch
          // path (WhatsApp, web, etc.) do not have a dedicated reasoning lane.
          // Telegram has its own dispatch path that handles reasoning splitting.
          if (shouldSuppressReasoningPayload(payload)) {
            return;
          }
          // Remember block text (newline-separated) for TTS after streaming.
          if (payload.text) {
            accumulatedBlockText =
              accumulatedBlockText.length > 0
                ? `${accumulatedBlockText}\n${payload.text}`
                : payload.text;
            blockCount += 1;
          }
          const ttsPayload = await maybeApplyTtsToPayload({
            payload,
            cfg,
            channel: ttsChannel,
            kind: "block",
            inboundAudio,
            ttsAuto: sessionTtsAuto,
          });
          if (shouldRouteToOriginating) {
            await sendPayloadAsync(ttsPayload, context?.abortSignal, false);
          } else {
            dispatcher.sendBlockReply(ttsPayload);
          }
        },
      },
      cfg,
    );

    if (ctx.AcpDispatchTailAfterReset === true) {
      // Command handling prepared a trailing prompt after ACP in-place reset.
      // Route that tail through ACP now (same turn) instead of embedded dispatch.
      ctx.AcpDispatchTailAfterReset = false;
      const acpTailDispatch = await dispatchViaAcp(false);
      if (acpTailDispatch) {
        return acpTailDispatch;
      }
    }

    const replies = !replyResult ? [] : Array.isArray(replyResult) ? replyResult : [replyResult];

    let queuedFinal = false;
    let routedFinalCount = 0;
    for (const reply of replies) {
      // Suppress reasoning payloads from channel delivery — channels using this
      // generic dispatch path do not have a dedicated reasoning lane.
      if (shouldSuppressReasoningPayload(reply)) {
        continue;
      }
      const ttsReply = await maybeApplyTtsToPayload({
        payload: reply,
        cfg,
        channel: ttsChannel,
        kind: "final",
        inboundAudio,
        ttsAuto: sessionTtsAuto,
      });
      if (!routeTarget) {
        queuedFinal = dispatcher.sendFinalReply(ttsReply) || queuedFinal;
        continue;
      }
      // Route final reply to originating channel.
      const result = await routeReply(buildRouteArgs(routeTarget, ttsReply));
      if (result.ok) {
        queuedFinal = true;
        routedFinalCount += 1;
      } else {
        logVerbose(
          `dispatch-from-config: route-reply (final) failed: ${result.error ?? "unknown error"}`,
        );
      }
    }

    const ttsMode = resolveTtsConfig(cfg).mode ?? "final";
    // Generate a TTS-only reply after block streaming completes (when there is no final
    // reply). Block streaming can succeed and drop final payloads, yet audio should still
    // be produced from the accumulated block content.
    if (
      ttsMode === "final" &&
      replies.length === 0 &&
      blockCount > 0 &&
      accumulatedBlockText.trim()
    ) {
      try {
        const ttsSyntheticReply = await maybeApplyTtsToPayload({
          payload: { text: accumulatedBlockText },
          cfg,
          channel: ttsChannel,
          kind: "final",
          inboundAudio,
          ttsAuto: sessionTtsAuto,
        });
        // Only send if TTS was actually applied (mediaUrl exists).
        if (ttsSyntheticReply.mediaUrl) {
          // Audio only, no text, so the block content is not duplicated.
          const ttsOnlyPayload: ReplyPayload = {
            mediaUrl: ttsSyntheticReply.mediaUrl,
            audioAsVoice: ttsSyntheticReply.audioAsVoice,
          };
          if (routeTarget) {
            const result = await routeReply(buildRouteArgs(routeTarget, ttsOnlyPayload));
            if (result.ok) {
              queuedFinal = true;
              routedFinalCount += 1;
            } else {
              logVerbose(
                `dispatch-from-config: route-reply (tts-only) failed: ${result.error ?? "unknown error"}`,
              );
            }
          } else {
            queuedFinal = dispatcher.sendFinalReply(ttsOnlyPayload) || queuedFinal;
          }
        }
      } catch (err) {
        logVerbose(
          `dispatch-from-config: accumulated block TTS failed: ${err instanceof Error ? err.message : String(err)}`,
        );
      }
    }

    const counts = dispatcher.getQueuedCounts();
    counts.final += routedFinalCount;
    recordProcessed("completed");
    markIdle("message_completed");
    return { queuedFinal, counts };
  } catch (err) {
    recordProcessed("error", { error: String(err) });
    markIdle("message_error");
    throw err;
  }
}
|