openclaw/src/gateway/server-methods/chat.ts

1755 lines
56 KiB
TypeScript

import fs from "node:fs";
import path from "node:path";
import { CURRENT_SESSION_VERSION, SessionManager } from "@mariozechner/pi-coding-agent";
import { resolveSessionAgentId } from "../../agents/agent-scope.js";
import { resolveThinkingDefault } from "../../agents/model-selection.js";
import { rewriteTranscriptEntriesInSessionFile } from "../../agents/pi-embedded-runner/transcript-rewrite.js";
import { resolveAgentTimeoutMs } from "../../agents/timeout.js";
import { dispatchInboundMessage } from "../../auto-reply/dispatch.js";
import { createReplyDispatcher } from "../../auto-reply/reply/reply-dispatcher.js";
import type { MsgContext } from "../../auto-reply/templating.js";
import { isSilentReplyText, SILENT_REPLY_TOKEN } from "../../auto-reply/tokens.js";
import type { ReplyPayload } from "../../auto-reply/types.js";
import { resolveSessionFilePath } from "../../config/sessions.js";
import { jsonUtf8Bytes } from "../../infra/json-utf8-bytes.js";
import { type SavedMedia, saveMediaBuffer } from "../../media/store.js";
import { createChannelReplyPipeline } from "../../plugin-sdk/channel-reply-pipeline.js";
import { normalizeInputProvenance, type InputProvenance } from "../../sessions/input-provenance.js";
import { resolveSendPolicy } from "../../sessions/send-policy.js";
import { parseAgentSessionKey } from "../../sessions/session-key-utils.js";
import { emitSessionTranscriptUpdate } from "../../sessions/transcript-events.js";
import {
stripInlineDirectiveTagsForDisplay,
stripInlineDirectiveTagsFromMessageForDisplay,
} from "../../utils/directive-tags.js";
import {
INTERNAL_MESSAGE_CHANNEL,
isGatewayCliClient,
isWebchatClient,
normalizeMessageChannel,
} from "../../utils/message-channel.js";
import {
abortChatRunById,
type ChatAbortControllerEntry,
type ChatAbortOps,
isChatStopCommandText,
resolveChatRunExpiresAtMs,
} from "../chat-abort.js";
import { type ChatImageContent, parseMessageWithAttachments } from "../chat-attachments.js";
import { stripEnvelopeFromMessage, stripEnvelopeFromMessages } from "../chat-sanitize.js";
import { ADMIN_SCOPE } from "../method-scopes.js";
import {
GATEWAY_CLIENT_CAPS,
GATEWAY_CLIENT_MODES,
GATEWAY_CLIENT_NAMES,
hasGatewayClientCap,
} from "../protocol/client-info.js";
import {
ErrorCodes,
errorShape,
formatValidationErrors,
validateChatAbortParams,
validateChatHistoryParams,
validateChatInjectParams,
validateChatSendParams,
} from "../protocol/index.js";
import { CHAT_SEND_SESSION_KEY_MAX_LENGTH } from "../protocol/schema/primitives.js";
import { getMaxChatHistoryMessagesBytes } from "../server-constants.js";
import {
capArrayByJsonBytes,
loadSessionEntry,
readSessionMessages,
resolveSessionModelRef,
} from "../session-utils.js";
import { formatForLog } from "../ws-log.js";
import { injectTimestamp, timestampOptsFromConfig } from "./agent-timestamp.js";
import { setGatewayDedupeEntry } from "./agent-wait-dedupe.js";
import { normalizeRpcAttachmentsToChatAttachments } from "./attachment-normalize.js";
import { appendInjectedAssistantMessageToTranscript } from "./chat-transcript-inject.js";
import type {
GatewayRequestContext,
GatewayRequestHandlerOptions,
GatewayRequestHandlers,
} from "./types.js";
type TranscriptAppendResult = {
ok: boolean;
messageId?: string;
message?: Record<string, unknown>;
error?: string;
};
type AbortOrigin = "rpc" | "stop-command";
type AbortedPartialSnapshot = {
runId: string;
sessionId: string;
text: string;
abortOrigin: AbortOrigin;
};
type ChatAbortRequester = {
connId?: string;
deviceId?: string;
isAdmin: boolean;
};
const CHAT_HISTORY_TEXT_MAX_CHARS = 12_000;
const CHAT_HISTORY_MAX_SINGLE_MESSAGE_BYTES = 128 * 1024;
const CHAT_HISTORY_OVERSIZED_PLACEHOLDER = "[chat.history omitted: message too large]";
let chatHistoryPlaceholderEmitCount = 0;
const CHANNEL_AGNOSTIC_SESSION_SCOPES = new Set([
"main",
"direct",
"dm",
"group",
"channel",
"cron",
"run",
"subagent",
"acp",
"thread",
"topic",
]);
const CHANNEL_SCOPED_SESSION_SHAPES = new Set(["direct", "dm", "group", "channel"]);
type ChatSendDeliveryEntry = {
deliveryContext?: {
channel?: string;
to?: string;
accountId?: string;
threadId?: string | number;
};
lastChannel?: string;
lastTo?: string;
lastAccountId?: string;
lastThreadId?: string | number;
};
type ChatSendOriginatingRoute = {
originatingChannel: string;
originatingTo?: string;
accountId?: string;
messageThreadId?: string | number;
explicitDeliverRoute: boolean;
};
type SideResultPayload = {
kind: "btw";
runId: string;
sessionKey: string;
question: string;
text: string;
isError?: boolean;
ts: number;
};
function resolveChatSendOriginatingRoute(params: {
client?: { mode?: string | null; id?: string | null } | null;
deliver?: boolean;
entry?: ChatSendDeliveryEntry;
hasConnectedClient?: boolean;
mainKey?: string;
sessionKey: string;
}): ChatSendOriginatingRoute {
const shouldDeliverExternally = params.deliver === true;
if (!shouldDeliverExternally) {
return {
originatingChannel: INTERNAL_MESSAGE_CHANNEL,
explicitDeliverRoute: false,
};
}
const routeChannelCandidate = normalizeMessageChannel(
params.entry?.deliveryContext?.channel ?? params.entry?.lastChannel,
);
const routeToCandidate = params.entry?.deliveryContext?.to ?? params.entry?.lastTo;
const routeAccountIdCandidate =
params.entry?.deliveryContext?.accountId ?? params.entry?.lastAccountId ?? undefined;
const routeThreadIdCandidate =
params.entry?.deliveryContext?.threadId ?? params.entry?.lastThreadId;
if (params.sessionKey.length > CHAT_SEND_SESSION_KEY_MAX_LENGTH) {
return {
originatingChannel: INTERNAL_MESSAGE_CHANNEL,
explicitDeliverRoute: false,
};
}
const parsedSessionKey = parseAgentSessionKey(params.sessionKey);
const sessionScopeParts = (parsedSessionKey?.rest ?? params.sessionKey)
.split(":", 3)
.filter(Boolean);
const sessionScopeHead = sessionScopeParts[0];
const sessionChannelHint = normalizeMessageChannel(sessionScopeHead);
const normalizedSessionScopeHead = (sessionScopeHead ?? "").trim().toLowerCase();
const sessionPeerShapeCandidates = [sessionScopeParts[1], sessionScopeParts[2]]
.map((part) => (part ?? "").trim().toLowerCase())
.filter(Boolean);
const isChannelAgnosticSessionScope = CHANNEL_AGNOSTIC_SESSION_SCOPES.has(
normalizedSessionScopeHead,
);
const isChannelScopedSession = sessionPeerShapeCandidates.some((part) =>
CHANNEL_SCOPED_SESSION_SHAPES.has(part),
);
const hasLegacyChannelPeerShape =
!isChannelScopedSession &&
typeof sessionScopeParts[1] === "string" &&
sessionChannelHint === routeChannelCandidate;
const isFromWebchatClient = isWebchatClient(params.client);
const isFromGatewayCliClient = isGatewayCliClient(params.client);
const hasClientMetadata =
(typeof params.client?.mode === "string" && params.client.mode.trim().length > 0) ||
(typeof params.client?.id === "string" && params.client.id.trim().length > 0);
const configuredMainKey = (params.mainKey ?? "main").trim().toLowerCase();
const isConfiguredMainSessionScope =
normalizedSessionScopeHead.length > 0 && normalizedSessionScopeHead === configuredMainKey;
const canInheritConfiguredMainRoute =
isConfiguredMainSessionScope &&
params.hasConnectedClient &&
(isFromGatewayCliClient || !hasClientMetadata);
// Webchat clients never inherit external delivery routes. Configured-main
// sessions are stricter than channel-scoped sessions: only CLI callers, or
// legacy callers with no client metadata, may inherit the last external route.
const canInheritDeliverableRoute = Boolean(
!isFromWebchatClient &&
sessionChannelHint &&
sessionChannelHint !== INTERNAL_MESSAGE_CHANNEL &&
((!isChannelAgnosticSessionScope && (isChannelScopedSession || hasLegacyChannelPeerShape)) ||
canInheritConfiguredMainRoute),
);
const hasDeliverableRoute =
canInheritDeliverableRoute &&
routeChannelCandidate &&
routeChannelCandidate !== INTERNAL_MESSAGE_CHANNEL &&
typeof routeToCandidate === "string" &&
routeToCandidate.trim().length > 0;
if (!hasDeliverableRoute) {
return {
originatingChannel: INTERNAL_MESSAGE_CHANNEL,
explicitDeliverRoute: false,
};
}
return {
originatingChannel: routeChannelCandidate,
originatingTo: routeToCandidate,
accountId: routeAccountIdCandidate,
messageThreadId: routeThreadIdCandidate,
explicitDeliverRoute: true,
};
}
function stripDisallowedChatControlChars(message: string): string {
let output = "";
for (const char of message) {
const code = char.charCodeAt(0);
if (code === 9 || code === 10 || code === 13 || (code >= 32 && code !== 127)) {
output += char;
}
}
return output;
}
export function sanitizeChatSendMessageInput(
message: string,
): { ok: true; message: string } | { ok: false; error: string } {
const normalized = message.normalize("NFC");
if (normalized.includes("\u0000")) {
return { ok: false, error: "message must not contain null bytes" };
}
return { ok: true, message: stripDisallowedChatControlChars(normalized) };
}
function normalizeOptionalChatSystemReceipt(
value: unknown,
): { ok: true; receipt?: string } | { ok: false; error: string } {
if (value == null) {
return { ok: true };
}
if (typeof value !== "string") {
return { ok: false, error: "systemProvenanceReceipt must be a string" };
}
const sanitized = sanitizeChatSendMessageInput(value);
if (!sanitized.ok) {
return sanitized;
}
const receipt = sanitized.message.trim();
return { ok: true, receipt: receipt || undefined };
}
function isAcpBridgeClient(client: GatewayRequestHandlerOptions["client"]): boolean {
const info = client?.connect?.client;
return (
info?.id === GATEWAY_CLIENT_NAMES.CLI &&
info?.mode === GATEWAY_CLIENT_MODES.CLI &&
info?.displayName === "ACP" &&
info?.version === "acp"
);
}
async function persistChatSendImages(params: {
images: ChatImageContent[];
client: GatewayRequestHandlerOptions["client"];
logGateway: GatewayRequestContext["logGateway"];
}): Promise<SavedMedia[]> {
if (params.images.length === 0 || isAcpBridgeClient(params.client)) {
return [];
}
const saved: SavedMedia[] = [];
for (const img of params.images) {
try {
saved.push(await saveMediaBuffer(Buffer.from(img.data, "base64"), img.mimeType, "inbound"));
} catch (err) {
params.logGateway.warn(
`chat.send: failed to persist inbound image (${img.mimeType}): ${formatForLog(err)}`,
);
}
}
return saved;
}
function buildChatSendTranscriptMessage(params: {
message: string;
savedImages: SavedMedia[];
timestamp: number;
}) {
const mediaFields = resolveChatSendTranscriptMediaFields(params.savedImages);
return {
role: "user" as const,
content: params.message,
timestamp: params.timestamp,
...mediaFields,
};
}
function resolveChatSendTranscriptMediaFields(savedImages: SavedMedia[]) {
const mediaPaths = savedImages.map((entry) => entry.path);
if (mediaPaths.length === 0) {
return {};
}
const mediaTypes = savedImages.map((entry) => entry.contentType ?? "application/octet-stream");
return {
MediaPath: mediaPaths[0],
MediaPaths: mediaPaths,
MediaType: mediaTypes[0],
MediaTypes: mediaTypes,
};
}
function extractTranscriptUserText(content: unknown): string | undefined {
if (typeof content === "string") {
return content;
}
if (!Array.isArray(content)) {
return undefined;
}
const textBlocks = content
.map((block) =>
block && typeof block === "object" && "text" in block ? block.text : undefined,
)
.filter((text): text is string => typeof text === "string");
return textBlocks.length > 0 ? textBlocks.join("") : undefined;
}
async function rewriteChatSendUserTurnMediaPaths(params: {
transcriptPath: string;
sessionKey: string;
message: string;
savedImages: SavedMedia[];
}) {
const mediaFields = resolveChatSendTranscriptMediaFields(params.savedImages);
if (!("MediaPath" in mediaFields)) {
return;
}
const sessionManager = SessionManager.open(params.transcriptPath);
const branch = sessionManager.getBranch();
const target = [...branch].toReversed().find((entry) => {
if (entry.type !== "message" || entry.message.role !== "user") {
return false;
}
const existingPaths = Array.isArray((entry.message as { MediaPaths?: unknown }).MediaPaths)
? (entry.message as { MediaPaths?: unknown[] }).MediaPaths
: undefined;
if (
(typeof (entry.message as { MediaPath?: unknown }).MediaPath === "string" &&
(entry.message as { MediaPath?: string }).MediaPath) ||
(existingPaths && existingPaths.length > 0)
) {
return false;
}
return (
extractTranscriptUserText((entry.message as { content?: unknown }).content) === params.message
);
});
if (!target || target.type !== "message") {
return;
}
const rewrittenMessage = {
...target.message,
...mediaFields,
};
await rewriteTranscriptEntriesInSessionFile({
sessionFile: params.transcriptPath,
sessionKey: params.sessionKey,
request: {
replacements: [
{
entryId: target.id,
message: rewrittenMessage,
},
],
},
});
}
function truncateChatHistoryText(text: string): { text: string; truncated: boolean } {
if (text.length <= CHAT_HISTORY_TEXT_MAX_CHARS) {
return { text, truncated: false };
}
return {
text: `${text.slice(0, CHAT_HISTORY_TEXT_MAX_CHARS)}\n...(truncated)...`,
truncated: true,
};
}
function sanitizeChatHistoryContentBlock(block: unknown): { block: unknown; changed: boolean } {
if (!block || typeof block !== "object") {
return { block, changed: false };
}
const entry = { ...(block as Record<string, unknown>) };
let changed = false;
if (typeof entry.text === "string") {
const stripped = stripInlineDirectiveTagsForDisplay(entry.text);
const res = truncateChatHistoryText(stripped.text);
entry.text = res.text;
changed ||= stripped.changed || res.truncated;
}
if (typeof entry.partialJson === "string") {
const res = truncateChatHistoryText(entry.partialJson);
entry.partialJson = res.text;
changed ||= res.truncated;
}
if (typeof entry.arguments === "string") {
const res = truncateChatHistoryText(entry.arguments);
entry.arguments = res.text;
changed ||= res.truncated;
}
if (typeof entry.thinking === "string") {
const res = truncateChatHistoryText(entry.thinking);
entry.thinking = res.text;
changed ||= res.truncated;
}
if ("thinkingSignature" in entry) {
delete entry.thinkingSignature;
changed = true;
}
const type = typeof entry.type === "string" ? entry.type : "";
if (type === "image" && typeof entry.data === "string") {
const bytes = Buffer.byteLength(entry.data, "utf8");
delete entry.data;
entry.omitted = true;
entry.bytes = bytes;
changed = true;
}
return { block: changed ? entry : block, changed };
}
/**
* Validate that a value is a finite number, returning undefined otherwise.
*/
function toFiniteNumber(x: unknown): number | undefined {
return typeof x === "number" && Number.isFinite(x) ? x : undefined;
}
/**
* Sanitize usage metadata to ensure only finite numeric fields are included.
* Prevents UI crashes from malformed transcript JSON.
*/
function sanitizeUsage(raw: unknown): Record<string, number> | undefined {
if (!raw || typeof raw !== "object") {
return undefined;
}
const u = raw as Record<string, unknown>;
const out: Record<string, number> = {};
// Whitelist known usage fields and validate they're finite numbers
const knownFields = [
"input",
"output",
"totalTokens",
"inputTokens",
"outputTokens",
"cacheRead",
"cacheWrite",
"cache_read_input_tokens",
"cache_creation_input_tokens",
];
for (const k of knownFields) {
const n = toFiniteNumber(u[k]);
if (n !== undefined) {
out[k] = n;
}
}
// Preserve nested usage.cost when present
if ("cost" in u && u.cost != null && typeof u.cost === "object") {
const sanitizedCost = sanitizeCost(u.cost);
if (sanitizedCost) {
(out as Record<string, unknown>).cost = sanitizedCost;
}
}
return Object.keys(out).length > 0 ? out : undefined;
}
/**
* Sanitize cost metadata to ensure only finite numeric fields are included.
* Prevents UI crashes from calling .toFixed() on non-numbers.
*/
function sanitizeCost(raw: unknown): { total?: number } | undefined {
if (!raw || typeof raw !== "object") {
return undefined;
}
const c = raw as Record<string, unknown>;
const total = toFiniteNumber(c.total);
return total !== undefined ? { total } : undefined;
}
function sanitizeChatHistoryMessage(message: unknown): { message: unknown; changed: boolean } {
if (!message || typeof message !== "object") {
return { message, changed: false };
}
const entry = { ...(message as Record<string, unknown>) };
let changed = false;
if ("details" in entry) {
delete entry.details;
changed = true;
}
// Keep usage/cost so the chat UI can render per-message token and cost badges.
// Only retain usage/cost on assistant messages and validate numeric fields to prevent UI crashes.
if (entry.role !== "assistant") {
if ("usage" in entry) {
delete entry.usage;
changed = true;
}
if ("cost" in entry) {
delete entry.cost;
changed = true;
}
} else {
// Validate and sanitize usage/cost for assistant messages
if ("usage" in entry) {
const sanitized = sanitizeUsage(entry.usage);
if (sanitized) {
entry.usage = sanitized;
} else {
delete entry.usage;
}
changed = true;
}
if ("cost" in entry) {
const sanitized = sanitizeCost(entry.cost);
if (sanitized) {
entry.cost = sanitized;
} else {
delete entry.cost;
}
changed = true;
}
}
if (typeof entry.content === "string") {
const stripped = stripInlineDirectiveTagsForDisplay(entry.content);
const res = truncateChatHistoryText(stripped.text);
entry.content = res.text;
changed ||= stripped.changed || res.truncated;
} else if (Array.isArray(entry.content)) {
const updated = entry.content.map((block) => sanitizeChatHistoryContentBlock(block));
if (updated.some((item) => item.changed)) {
entry.content = updated.map((item) => item.block);
changed = true;
}
}
if (typeof entry.text === "string") {
const stripped = stripInlineDirectiveTagsForDisplay(entry.text);
const res = truncateChatHistoryText(stripped.text);
entry.text = res.text;
changed ||= stripped.changed || res.truncated;
}
return { message: changed ? entry : message, changed };
}
/**
* Extract the visible text from an assistant history message for silent-token checks.
* Returns `undefined` for non-assistant messages or messages with no extractable text.
* When `entry.text` is present it takes precedence over `entry.content` to avoid
* dropping messages that carry real text alongside a stale `content: "NO_REPLY"`.
*/
function extractAssistantTextForSilentCheck(message: unknown): string | undefined {
if (!message || typeof message !== "object") {
return undefined;
}
const entry = message as Record<string, unknown>;
if (entry.role !== "assistant") {
return undefined;
}
if (typeof entry.text === "string") {
return entry.text;
}
if (typeof entry.content === "string") {
return entry.content;
}
if (!Array.isArray(entry.content) || entry.content.length === 0) {
return undefined;
}
const texts: string[] = [];
for (const block of entry.content) {
if (!block || typeof block !== "object") {
return undefined;
}
const typed = block as { type?: unknown; text?: unknown };
if (typed.type !== "text" || typeof typed.text !== "string") {
return undefined;
}
texts.push(typed.text);
}
return texts.length > 0 ? texts.join("\n") : undefined;
}
function sanitizeChatHistoryMessages(messages: unknown[]): unknown[] {
if (messages.length === 0) {
return messages;
}
let changed = false;
const next: unknown[] = [];
for (const message of messages) {
const res = sanitizeChatHistoryMessage(message);
changed ||= res.changed;
// Drop assistant messages whose entire visible text is the silent reply token.
const text = extractAssistantTextForSilentCheck(res.message);
if (text !== undefined && isSilentReplyText(text, SILENT_REPLY_TOKEN)) {
changed = true;
continue;
}
next.push(res.message);
}
return changed ? next : messages;
}
function buildOversizedHistoryPlaceholder(message?: unknown): Record<string, unknown> {
const role =
message &&
typeof message === "object" &&
typeof (message as { role?: unknown }).role === "string"
? (message as { role: string }).role
: "assistant";
const timestamp =
message &&
typeof message === "object" &&
typeof (message as { timestamp?: unknown }).timestamp === "number"
? (message as { timestamp: number }).timestamp
: Date.now();
return {
role,
timestamp,
content: [{ type: "text", text: CHAT_HISTORY_OVERSIZED_PLACEHOLDER }],
__openclaw: { truncated: true, reason: "oversized" },
};
}
function replaceOversizedChatHistoryMessages(params: {
messages: unknown[];
maxSingleMessageBytes: number;
}): { messages: unknown[]; replacedCount: number } {
const { messages, maxSingleMessageBytes } = params;
if (messages.length === 0) {
return { messages, replacedCount: 0 };
}
let replacedCount = 0;
const next = messages.map((message) => {
if (jsonUtf8Bytes(message) <= maxSingleMessageBytes) {
return message;
}
replacedCount += 1;
return buildOversizedHistoryPlaceholder(message);
});
return { messages: replacedCount > 0 ? next : messages, replacedCount };
}
function enforceChatHistoryFinalBudget(params: { messages: unknown[]; maxBytes: number }): {
messages: unknown[];
placeholderCount: number;
} {
const { messages, maxBytes } = params;
if (messages.length === 0) {
return { messages, placeholderCount: 0 };
}
if (jsonUtf8Bytes(messages) <= maxBytes) {
return { messages, placeholderCount: 0 };
}
const last = messages.at(-1);
if (last && jsonUtf8Bytes([last]) <= maxBytes) {
return { messages: [last], placeholderCount: 0 };
}
const placeholder = buildOversizedHistoryPlaceholder(last);
if (jsonUtf8Bytes([placeholder]) <= maxBytes) {
return { messages: [placeholder], placeholderCount: 1 };
}
return { messages: [], placeholderCount: 0 };
}
function resolveTranscriptPath(params: {
sessionId: string;
storePath: string | undefined;
sessionFile?: string;
agentId?: string;
}): string | null {
const { sessionId, storePath, sessionFile, agentId } = params;
if (!storePath && !sessionFile) {
return null;
}
try {
const sessionsDir = storePath ? path.dirname(storePath) : undefined;
return resolveSessionFilePath(
sessionId,
sessionFile ? { sessionFile } : undefined,
sessionsDir || agentId ? { sessionsDir, agentId } : undefined,
);
} catch {
return null;
}
}
function ensureTranscriptFile(params: { transcriptPath: string; sessionId: string }): {
ok: boolean;
error?: string;
} {
if (fs.existsSync(params.transcriptPath)) {
return { ok: true };
}
try {
fs.mkdirSync(path.dirname(params.transcriptPath), { recursive: true });
const header = {
type: "session",
version: CURRENT_SESSION_VERSION,
id: params.sessionId,
timestamp: new Date().toISOString(),
cwd: process.cwd(),
};
fs.writeFileSync(params.transcriptPath, `${JSON.stringify(header)}\n`, {
encoding: "utf-8",
mode: 0o600,
});
return { ok: true };
} catch (err) {
return { ok: false, error: err instanceof Error ? err.message : String(err) };
}
}
function transcriptHasIdempotencyKey(transcriptPath: string, idempotencyKey: string): boolean {
try {
const lines = fs.readFileSync(transcriptPath, "utf-8").split(/\r?\n/);
for (const line of lines) {
if (!line.trim()) {
continue;
}
const parsed = JSON.parse(line) as { message?: { idempotencyKey?: unknown } };
if (parsed?.message?.idempotencyKey === idempotencyKey) {
return true;
}
}
return false;
} catch {
return false;
}
}
function appendAssistantTranscriptMessage(params: {
message: string;
label?: string;
sessionId: string;
storePath: string | undefined;
sessionFile?: string;
agentId?: string;
createIfMissing?: boolean;
idempotencyKey?: string;
abortMeta?: {
aborted: true;
origin: AbortOrigin;
runId: string;
};
}): TranscriptAppendResult {
const transcriptPath = resolveTranscriptPath({
sessionId: params.sessionId,
storePath: params.storePath,
sessionFile: params.sessionFile,
agentId: params.agentId,
});
if (!transcriptPath) {
return { ok: false, error: "transcript path not resolved" };
}
if (!fs.existsSync(transcriptPath)) {
if (!params.createIfMissing) {
return { ok: false, error: "transcript file not found" };
}
const ensured = ensureTranscriptFile({
transcriptPath,
sessionId: params.sessionId,
});
if (!ensured.ok) {
return { ok: false, error: ensured.error ?? "failed to create transcript file" };
}
}
if (params.idempotencyKey && transcriptHasIdempotencyKey(transcriptPath, params.idempotencyKey)) {
return { ok: true };
}
return appendInjectedAssistantMessageToTranscript({
transcriptPath,
message: params.message,
label: params.label,
idempotencyKey: params.idempotencyKey,
abortMeta: params.abortMeta,
});
}
function collectSessionAbortPartials(params: {
chatAbortControllers: Map<string, ChatAbortControllerEntry>;
chatRunBuffers: Map<string, string>;
runIds: ReadonlySet<string>;
abortOrigin: AbortOrigin;
}): AbortedPartialSnapshot[] {
const out: AbortedPartialSnapshot[] = [];
for (const [runId, active] of params.chatAbortControllers) {
if (!params.runIds.has(runId)) {
continue;
}
const text = params.chatRunBuffers.get(runId);
if (!text || !text.trim()) {
continue;
}
out.push({
runId,
sessionId: active.sessionId,
text,
abortOrigin: params.abortOrigin,
});
}
return out;
}
function persistAbortedPartials(params: {
context: Pick<GatewayRequestContext, "logGateway">;
sessionKey: string;
snapshots: AbortedPartialSnapshot[];
}) {
if (params.snapshots.length === 0) {
return;
}
const { storePath, entry } = loadSessionEntry(params.sessionKey);
for (const snapshot of params.snapshots) {
const sessionId = entry?.sessionId ?? snapshot.sessionId ?? snapshot.runId;
const appended = appendAssistantTranscriptMessage({
message: snapshot.text,
sessionId,
storePath,
sessionFile: entry?.sessionFile,
createIfMissing: true,
idempotencyKey: `${snapshot.runId}:assistant`,
abortMeta: {
aborted: true,
origin: snapshot.abortOrigin,
runId: snapshot.runId,
},
});
if (!appended.ok) {
params.context.logGateway.warn(
`chat.abort transcript append failed: ${appended.error ?? "unknown error"}`,
);
}
}
}
function createChatAbortOps(context: GatewayRequestContext): ChatAbortOps {
return {
chatAbortControllers: context.chatAbortControllers,
chatRunBuffers: context.chatRunBuffers,
chatDeltaSentAt: context.chatDeltaSentAt,
chatDeltaLastBroadcastLen: context.chatDeltaLastBroadcastLen,
chatAbortedRuns: context.chatAbortedRuns,
removeChatRun: context.removeChatRun,
agentRunSeq: context.agentRunSeq,
broadcast: context.broadcast,
nodeSendToSession: context.nodeSendToSession,
};
}
function normalizeOptionalText(value?: string | null): string | undefined {
const trimmed = value?.trim();
return trimmed || undefined;
}
function resolveChatAbortRequester(
client: GatewayRequestHandlerOptions["client"],
): ChatAbortRequester {
const scopes = Array.isArray(client?.connect?.scopes) ? client.connect.scopes : [];
return {
connId: normalizeOptionalText(client?.connId),
deviceId: normalizeOptionalText(client?.connect?.device?.id),
isAdmin: scopes.includes(ADMIN_SCOPE),
};
}
function canRequesterAbortChatRun(
entry: ChatAbortControllerEntry,
requester: ChatAbortRequester,
): boolean {
if (requester.isAdmin) {
return true;
}
const ownerDeviceId = normalizeOptionalText(entry.ownerDeviceId);
const ownerConnId = normalizeOptionalText(entry.ownerConnId);
if (!ownerDeviceId && !ownerConnId) {
return true;
}
if (ownerDeviceId && requester.deviceId && ownerDeviceId === requester.deviceId) {
return true;
}
if (ownerConnId && requester.connId && ownerConnId === requester.connId) {
return true;
}
return false;
}
function resolveAuthorizedRunIdsForSession(params: {
chatAbortControllers: Map<string, ChatAbortControllerEntry>;
sessionKey: string;
requester: ChatAbortRequester;
}) {
const authorizedRunIds: string[] = [];
let matchedSessionRuns = 0;
for (const [runId, active] of params.chatAbortControllers) {
if (active.sessionKey !== params.sessionKey) {
continue;
}
matchedSessionRuns += 1;
if (canRequesterAbortChatRun(active, params.requester)) {
authorizedRunIds.push(runId);
}
}
return {
matchedSessionRuns,
authorizedRunIds,
};
}
function abortChatRunsForSessionKeyWithPartials(params: {
context: GatewayRequestContext;
ops: ChatAbortOps;
sessionKey: string;
abortOrigin: AbortOrigin;
stopReason?: string;
requester: ChatAbortRequester;
}) {
const { matchedSessionRuns, authorizedRunIds } = resolveAuthorizedRunIdsForSession({
chatAbortControllers: params.context.chatAbortControllers,
sessionKey: params.sessionKey,
requester: params.requester,
});
if (authorizedRunIds.length === 0) {
return {
aborted: false,
runIds: [],
unauthorized: matchedSessionRuns > 0,
};
}
const authorizedRunIdSet = new Set(authorizedRunIds);
const snapshots = collectSessionAbortPartials({
chatAbortControllers: params.context.chatAbortControllers,
chatRunBuffers: params.context.chatRunBuffers,
runIds: authorizedRunIdSet,
abortOrigin: params.abortOrigin,
});
const runIds: string[] = [];
for (const runId of authorizedRunIds) {
const res = abortChatRunById(params.ops, {
runId,
sessionKey: params.sessionKey,
stopReason: params.stopReason,
});
if (res.aborted) {
runIds.push(runId);
}
}
const res = { aborted: runIds.length > 0, runIds, unauthorized: false };
if (res.aborted) {
persistAbortedPartials({
context: params.context,
sessionKey: params.sessionKey,
snapshots,
});
}
return res;
}
function nextChatSeq(context: { agentRunSeq: Map<string, number> }, runId: string) {
const next = (context.agentRunSeq.get(runId) ?? 0) + 1;
context.agentRunSeq.set(runId, next);
return next;
}
function broadcastChatFinal(params: {
context: Pick<GatewayRequestContext, "broadcast" | "nodeSendToSession" | "agentRunSeq">;
runId: string;
sessionKey: string;
message?: Record<string, unknown>;
}) {
const seq = nextChatSeq({ agentRunSeq: params.context.agentRunSeq }, params.runId);
const strippedEnvelopeMessage = stripEnvelopeFromMessage(params.message) as
| Record<string, unknown>
| undefined;
const payload = {
runId: params.runId,
sessionKey: params.sessionKey,
seq,
state: "final" as const,
message: stripInlineDirectiveTagsFromMessageForDisplay(strippedEnvelopeMessage),
};
params.context.broadcast("chat", payload);
params.context.nodeSendToSession(params.sessionKey, "chat", payload);
params.context.agentRunSeq.delete(params.runId);
}
function isBtwReplyPayload(payload: ReplyPayload | undefined): payload is ReplyPayload & {
btw: { question: string };
text: string;
} {
return (
typeof payload?.btw?.question === "string" &&
payload.btw.question.trim().length > 0 &&
typeof payload.text === "string" &&
payload.text.trim().length > 0
);
}
function broadcastSideResult(params: {
context: Pick<GatewayRequestContext, "broadcast" | "nodeSendToSession" | "agentRunSeq">;
payload: SideResultPayload;
}) {
const seq = nextChatSeq({ agentRunSeq: params.context.agentRunSeq }, params.payload.runId);
params.context.broadcast("chat.side_result", {
...params.payload,
seq,
});
params.context.nodeSendToSession(params.payload.sessionKey, "chat.side_result", {
...params.payload,
seq,
});
}
function broadcastChatError(params: {
context: Pick<GatewayRequestContext, "broadcast" | "nodeSendToSession" | "agentRunSeq">;
runId: string;
sessionKey: string;
errorMessage?: string;
}) {
const seq = nextChatSeq({ agentRunSeq: params.context.agentRunSeq }, params.runId);
const payload = {
runId: params.runId,
sessionKey: params.sessionKey,
seq,
state: "error" as const,
errorMessage: params.errorMessage,
};
params.context.broadcast("chat", payload);
params.context.nodeSendToSession(params.sessionKey, "chat", payload);
params.context.agentRunSeq.delete(params.runId);
}
export const chatHandlers: GatewayRequestHandlers = {
"chat.history": async ({ params, respond, context }) => {
if (!validateChatHistoryParams(params)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
`invalid chat.history params: ${formatValidationErrors(validateChatHistoryParams.errors)}`,
),
);
return;
}
const { sessionKey, limit } = params as {
sessionKey: string;
limit?: number;
};
const { cfg, storePath, entry } = loadSessionEntry(sessionKey);
const sessionId = entry?.sessionId;
const rawMessages =
sessionId && storePath ? readSessionMessages(sessionId, storePath, entry?.sessionFile) : [];
const hardMax = 1000;
const defaultLimit = 200;
const requested = typeof limit === "number" ? limit : defaultLimit;
const max = Math.min(hardMax, requested);
const sliced = rawMessages.length > max ? rawMessages.slice(-max) : rawMessages;
const sanitized = stripEnvelopeFromMessages(sliced);
const normalized = sanitizeChatHistoryMessages(sanitized);
const maxHistoryBytes = getMaxChatHistoryMessagesBytes();
const perMessageHardCap = Math.min(CHAT_HISTORY_MAX_SINGLE_MESSAGE_BYTES, maxHistoryBytes);
const replaced = replaceOversizedChatHistoryMessages({
messages: normalized,
maxSingleMessageBytes: perMessageHardCap,
});
const capped = capArrayByJsonBytes(replaced.messages, maxHistoryBytes).items;
const bounded = enforceChatHistoryFinalBudget({ messages: capped, maxBytes: maxHistoryBytes });
const placeholderCount = replaced.replacedCount + bounded.placeholderCount;
if (placeholderCount > 0) {
chatHistoryPlaceholderEmitCount += placeholderCount;
context.logGateway.debug(
`chat.history omitted oversized payloads placeholders=${placeholderCount} total=${chatHistoryPlaceholderEmitCount}`,
);
}
let thinkingLevel = entry?.thinkingLevel;
if (!thinkingLevel) {
const sessionAgentId = resolveSessionAgentId({ sessionKey, config: cfg });
const { provider, model } = resolveSessionModelRef(cfg, entry, sessionAgentId);
const catalog = await context.loadGatewayModelCatalog();
thinkingLevel = resolveThinkingDefault({
cfg,
provider,
model,
catalog,
});
}
const verboseLevel = entry?.verboseLevel ?? cfg.agents?.defaults?.verboseDefault;
respond(true, {
sessionKey,
sessionId,
messages: bounded.messages,
thinkingLevel,
fastMode: entry?.fastMode,
verboseLevel,
});
},
"chat.abort": ({ params, respond, context, client }) => {
if (!validateChatAbortParams(params)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
`invalid chat.abort params: ${formatValidationErrors(validateChatAbortParams.errors)}`,
),
);
return;
}
const { sessionKey: rawSessionKey, runId } = params as {
sessionKey: string;
runId?: string;
};
const ops = createChatAbortOps(context);
const requester = resolveChatAbortRequester(client);
if (!runId) {
const res = abortChatRunsForSessionKeyWithPartials({
context,
ops,
sessionKey: rawSessionKey,
abortOrigin: "rpc",
stopReason: "rpc",
requester,
});
if (res.unauthorized) {
respond(false, undefined, errorShape(ErrorCodes.INVALID_REQUEST, "unauthorized"));
return;
}
respond(true, { ok: true, aborted: res.aborted, runIds: res.runIds });
return;
}
const active = context.chatAbortControllers.get(runId);
if (!active) {
respond(true, { ok: true, aborted: false, runIds: [] });
return;
}
if (active.sessionKey !== rawSessionKey) {
respond(
false,
undefined,
errorShape(ErrorCodes.INVALID_REQUEST, "runId does not match sessionKey"),
);
return;
}
if (!canRequesterAbortChatRun(active, requester)) {
respond(false, undefined, errorShape(ErrorCodes.INVALID_REQUEST, "unauthorized"));
return;
}
const partialText = context.chatRunBuffers.get(runId);
const res = abortChatRunById(ops, {
runId,
sessionKey: rawSessionKey,
stopReason: "rpc",
});
if (res.aborted && partialText && partialText.trim()) {
persistAbortedPartials({
context,
sessionKey: rawSessionKey,
snapshots: [
{
runId,
sessionId: active.sessionId,
text: partialText,
abortOrigin: "rpc",
},
],
});
}
respond(true, {
ok: true,
aborted: res.aborted,
runIds: res.aborted ? [runId] : [],
});
},
"chat.send": async ({ params, respond, context, client }) => {
if (!validateChatSendParams(params)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
`invalid chat.send params: ${formatValidationErrors(validateChatSendParams.errors)}`,
),
);
return;
}
const p = params as {
sessionKey: string;
message: string;
thinking?: string;
deliver?: boolean;
attachments?: Array<{
type?: string;
mimeType?: string;
fileName?: string;
content?: unknown;
}>;
timeoutMs?: number;
systemInputProvenance?: InputProvenance;
systemProvenanceReceipt?: string;
idempotencyKey: string;
};
if ((p.systemInputProvenance || p.systemProvenanceReceipt) && !isAcpBridgeClient(client)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
"system provenance fields are reserved for the ACP bridge",
),
);
return;
}
const sanitizedMessageResult = sanitizeChatSendMessageInput(p.message);
if (!sanitizedMessageResult.ok) {
respond(
false,
undefined,
errorShape(ErrorCodes.INVALID_REQUEST, sanitizedMessageResult.error),
);
return;
}
const systemReceiptResult = normalizeOptionalChatSystemReceipt(p.systemProvenanceReceipt);
if (!systemReceiptResult.ok) {
respond(false, undefined, errorShape(ErrorCodes.INVALID_REQUEST, systemReceiptResult.error));
return;
}
const inboundMessage = sanitizedMessageResult.message;
const systemInputProvenance = normalizeInputProvenance(p.systemInputProvenance);
const systemProvenanceReceipt = systemReceiptResult.receipt;
const stopCommand = isChatStopCommandText(inboundMessage);
const normalizedAttachments = normalizeRpcAttachmentsToChatAttachments(p.attachments);
const rawMessage = inboundMessage.trim();
if (!rawMessage && normalizedAttachments.length === 0) {
respond(
false,
undefined,
errorShape(ErrorCodes.INVALID_REQUEST, "message or attachment required"),
);
return;
}
let parsedMessage = inboundMessage;
let parsedImages: ChatImageContent[] = [];
if (normalizedAttachments.length > 0) {
try {
const parsed = await parseMessageWithAttachments(inboundMessage, normalizedAttachments, {
maxBytes: 5_000_000,
log: context.logGateway,
});
parsedMessage = parsed.message;
parsedImages = parsed.images;
} catch (err) {
respond(false, undefined, errorShape(ErrorCodes.INVALID_REQUEST, String(err)));
return;
}
}
const rawSessionKey = p.sessionKey;
const { cfg, entry, canonicalKey: sessionKey } = loadSessionEntry(rawSessionKey);
const timeoutMs = resolveAgentTimeoutMs({
cfg,
overrideMs: p.timeoutMs,
});
const now = Date.now();
const clientRunId = p.idempotencyKey;
const sendPolicy = resolveSendPolicy({
cfg,
entry,
sessionKey,
channel: entry?.channel,
chatType: entry?.chatType,
});
if (sendPolicy === "deny") {
respond(
false,
undefined,
errorShape(ErrorCodes.INVALID_REQUEST, "send blocked by session policy"),
);
return;
}
if (stopCommand) {
const res = abortChatRunsForSessionKeyWithPartials({
context,
ops: createChatAbortOps(context),
sessionKey: rawSessionKey,
abortOrigin: "stop-command",
stopReason: "stop",
requester: resolveChatAbortRequester(client),
});
if (res.unauthorized) {
respond(false, undefined, errorShape(ErrorCodes.INVALID_REQUEST, "unauthorized"));
return;
}
respond(true, { ok: true, aborted: res.aborted, runIds: res.runIds });
return;
}
const cached = context.dedupe.get(`chat:${clientRunId}`);
if (cached) {
respond(cached.ok, cached.payload, cached.error, {
cached: true,
});
return;
}
const activeExisting = context.chatAbortControllers.get(clientRunId);
if (activeExisting) {
respond(true, { runId: clientRunId, status: "in_flight" as const }, undefined, {
cached: true,
runId: clientRunId,
});
return;
}
try {
const abortController = new AbortController();
context.chatAbortControllers.set(clientRunId, {
controller: abortController,
sessionId: entry?.sessionId ?? clientRunId,
sessionKey: rawSessionKey,
startedAtMs: now,
expiresAtMs: resolveChatRunExpiresAtMs({ now, timeoutMs }),
ownerConnId: normalizeOptionalText(client?.connId),
ownerDeviceId: normalizeOptionalText(client?.connect?.device?.id),
});
const ackPayload = {
runId: clientRunId,
status: "started" as const,
};
respond(true, ackPayload, undefined, { runId: clientRunId });
const persistedImagesPromise = persistChatSendImages({
images: parsedImages,
client,
logGateway: context.logGateway,
});
const trimmedMessage = parsedMessage.trim();
const injectThinking = Boolean(
p.thinking && trimmedMessage && !trimmedMessage.startsWith("/"),
);
const commandBody = injectThinking ? `/think ${p.thinking} ${parsedMessage}` : parsedMessage;
const messageForAgent = systemProvenanceReceipt
? [systemProvenanceReceipt, parsedMessage].filter(Boolean).join("\n\n")
: parsedMessage;
const clientInfo = client?.connect?.client;
const {
originatingChannel,
originatingTo,
accountId,
messageThreadId,
explicitDeliverRoute,
} = resolveChatSendOriginatingRoute({
client: clientInfo,
deliver: p.deliver,
entry,
hasConnectedClient: client?.connect !== undefined,
mainKey: cfg.session?.mainKey,
sessionKey,
});
// Inject timestamp so agents know the current date/time.
// Only BodyForAgent gets the timestamp — Body stays raw for UI display.
// See: https://github.com/moltbot/moltbot/issues/3658
const stampedMessage = injectTimestamp(messageForAgent, timestampOptsFromConfig(cfg));
const ctx: MsgContext = {
Body: messageForAgent,
BodyForAgent: stampedMessage,
BodyForCommands: commandBody,
RawBody: parsedMessage,
CommandBody: commandBody,
InputProvenance: systemInputProvenance,
SessionKey: sessionKey,
Provider: INTERNAL_MESSAGE_CHANNEL,
Surface: INTERNAL_MESSAGE_CHANNEL,
OriginatingChannel: originatingChannel,
OriginatingTo: originatingTo,
ExplicitDeliverRoute: explicitDeliverRoute,
AccountId: accountId,
MessageThreadId: messageThreadId,
ChatType: "direct",
CommandAuthorized: true,
MessageSid: clientRunId,
SenderId: clientInfo?.id,
SenderName: clientInfo?.displayName,
SenderUsername: clientInfo?.displayName,
GatewayClientScopes: client?.connect?.scopes,
};
const agentId = resolveSessionAgentId({
sessionKey,
config: cfg,
});
const { onModelSelected, ...replyPipeline } = createChannelReplyPipeline({
cfg,
agentId,
channel: INTERNAL_MESSAGE_CHANNEL,
});
const deliveredReplies: Array<{ payload: ReplyPayload; kind: "block" | "final" }> = [];
let userTranscriptUpdateEmitted = false;
const emitUserTranscriptUpdate = async () => {
if (userTranscriptUpdateEmitted) {
return;
}
const { storePath: latestStorePath, entry: latestEntry } = loadSessionEntry(sessionKey);
const resolvedSessionId = latestEntry?.sessionId ?? entry?.sessionId;
if (!resolvedSessionId) {
return;
}
const transcriptPath = resolveTranscriptPath({
sessionId: resolvedSessionId,
storePath: latestStorePath,
sessionFile: latestEntry?.sessionFile ?? entry?.sessionFile,
agentId,
});
if (!transcriptPath) {
return;
}
userTranscriptUpdateEmitted = true;
const persistedImages = await persistedImagesPromise;
emitSessionTranscriptUpdate({
sessionFile: transcriptPath,
sessionKey,
message: buildChatSendTranscriptMessage({
message: parsedMessage,
savedImages: persistedImages,
timestamp: now,
}),
});
};
let transcriptMediaRewriteDone = false;
const rewriteUserTranscriptMedia = async () => {
if (transcriptMediaRewriteDone) {
return;
}
const { storePath: latestStorePath, entry: latestEntry } = loadSessionEntry(sessionKey);
const resolvedSessionId = latestEntry?.sessionId ?? entry?.sessionId;
if (!resolvedSessionId) {
return;
}
const transcriptPath = resolveTranscriptPath({
sessionId: resolvedSessionId,
storePath: latestStorePath,
sessionFile: latestEntry?.sessionFile ?? entry?.sessionFile,
agentId,
});
if (!transcriptPath) {
return;
}
transcriptMediaRewriteDone = true;
await rewriteChatSendUserTurnMediaPaths({
transcriptPath,
sessionKey,
message: parsedMessage,
savedImages: await persistedImagesPromise,
});
};
const dispatcher = createReplyDispatcher({
...replyPipeline,
onError: (err) => {
context.logGateway.warn(`webchat dispatch failed: ${formatForLog(err)}`);
},
deliver: async (payload, info) => {
if (info.kind !== "block" && info.kind !== "final") {
return;
}
deliveredReplies.push({ payload, kind: info.kind });
},
});
let agentRunStarted = false;
void dispatchInboundMessage({
ctx,
cfg,
dispatcher,
replyOptions: {
runId: clientRunId,
abortSignal: abortController.signal,
images: parsedImages.length > 0 ? parsedImages : undefined,
onAgentRunStart: (runId) => {
agentRunStarted = true;
void emitUserTranscriptUpdate();
const connId = typeof client?.connId === "string" ? client.connId : undefined;
const wantsToolEvents = hasGatewayClientCap(
client?.connect?.caps,
GATEWAY_CLIENT_CAPS.TOOL_EVENTS,
);
if (connId && wantsToolEvents) {
context.registerToolEventRecipient(runId, connId);
// Register for any other active runs *in the same session* so
// late-joining clients (e.g. page refresh mid-response) receive
// in-progress tool events without leaking cross-session data.
for (const [activeRunId, active] of context.chatAbortControllers) {
if (activeRunId !== runId && active.sessionKey === p.sessionKey) {
context.registerToolEventRecipient(activeRunId, connId);
}
}
}
},
onModelSelected,
},
})
.then(async () => {
await rewriteUserTranscriptMedia();
if (!agentRunStarted) {
await emitUserTranscriptUpdate();
const btwReplies = deliveredReplies
.map((entry) => entry.payload)
.filter(isBtwReplyPayload);
const btwText = btwReplies
.map((payload) => payload.text.trim())
.filter(Boolean)
.join("\n\n")
.trim();
if (btwReplies.length > 0 && btwText) {
broadcastSideResult({
context,
payload: {
kind: "btw",
runId: clientRunId,
sessionKey: rawSessionKey,
question: btwReplies[0].btw.question.trim(),
text: btwText,
isError: btwReplies.some((payload) => payload.isError),
ts: Date.now(),
},
});
broadcastChatFinal({
context,
runId: clientRunId,
sessionKey: rawSessionKey,
});
} else {
const combinedReply = deliveredReplies
.filter((entry) => entry.kind === "final")
.map((entry) => entry.payload)
.map((part) => part.text?.trim() ?? "")
.filter(Boolean)
.join("\n\n")
.trim();
let message: Record<string, unknown> | undefined;
if (combinedReply) {
const { storePath: latestStorePath, entry: latestEntry } =
loadSessionEntry(sessionKey);
const sessionId = latestEntry?.sessionId ?? entry?.sessionId ?? clientRunId;
const appended = appendAssistantTranscriptMessage({
message: combinedReply,
sessionId,
storePath: latestStorePath,
sessionFile: latestEntry?.sessionFile,
agentId,
createIfMissing: true,
});
if (appended.ok) {
message = appended.message;
} else {
context.logGateway.warn(
`webchat transcript append failed: ${appended.error ?? "unknown error"}`,
);
const now = Date.now();
message = {
role: "assistant",
content: [{ type: "text", text: combinedReply }],
timestamp: now,
// Keep this compatible with Pi stopReason enums even though this message isn't
// persisted to the transcript due to the append failure.
stopReason: "stop",
usage: { input: 0, output: 0, totalTokens: 0 },
};
}
}
broadcastChatFinal({
context,
runId: clientRunId,
sessionKey: rawSessionKey,
message,
});
}
} else {
void emitUserTranscriptUpdate();
}
setGatewayDedupeEntry({
dedupe: context.dedupe,
key: `chat:${clientRunId}`,
entry: {
ts: Date.now(),
ok: true,
payload: { runId: clientRunId, status: "ok" as const },
},
});
})
.catch((err) => {
const error = errorShape(ErrorCodes.UNAVAILABLE, String(err));
setGatewayDedupeEntry({
dedupe: context.dedupe,
key: `chat:${clientRunId}`,
entry: {
ts: Date.now(),
ok: false,
payload: {
runId: clientRunId,
status: "error" as const,
summary: String(err),
},
error,
},
});
broadcastChatError({
context,
runId: clientRunId,
sessionKey: rawSessionKey,
errorMessage: String(err),
});
})
.finally(() => {
context.chatAbortControllers.delete(clientRunId);
});
} catch (err) {
const error = errorShape(ErrorCodes.UNAVAILABLE, String(err));
const payload = {
runId: clientRunId,
status: "error" as const,
summary: String(err),
};
setGatewayDedupeEntry({
dedupe: context.dedupe,
key: `chat:${clientRunId}`,
entry: {
ts: Date.now(),
ok: false,
payload,
error,
},
});
respond(false, payload, error, {
runId: clientRunId,
error: formatForLog(err),
});
}
},
"chat.inject": async ({ params, respond, context }) => {
if (!validateChatInjectParams(params)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
`invalid chat.inject params: ${formatValidationErrors(validateChatInjectParams.errors)}`,
),
);
return;
}
const p = params as {
sessionKey: string;
message: string;
label?: string;
};
// Load session to find transcript file
const rawSessionKey = p.sessionKey;
const { cfg, storePath, entry } = loadSessionEntry(rawSessionKey);
const sessionId = entry?.sessionId;
if (!sessionId || !storePath) {
respond(false, undefined, errorShape(ErrorCodes.INVALID_REQUEST, "session not found"));
return;
}
const appended = appendAssistantTranscriptMessage({
message: p.message,
label: p.label,
sessionId,
storePath,
sessionFile: entry?.sessionFile,
agentId: resolveSessionAgentId({ sessionKey: rawSessionKey, config: cfg }),
createIfMissing: true,
});
if (!appended.ok || !appended.messageId || !appended.message) {
respond(
false,
undefined,
errorShape(
ErrorCodes.UNAVAILABLE,
`failed to write transcript: ${appended.error ?? "unknown error"}`,
),
);
return;
}
// Broadcast to webchat for immediate UI update
const chatPayload = {
runId: `inject-${appended.messageId}`,
sessionKey: rawSessionKey,
seq: 0,
state: "final" as const,
message: stripInlineDirectiveTagsFromMessageForDisplay(
stripEnvelopeFromMessage(appended.message) as Record<string, unknown>,
),
};
context.broadcast("chat", chatPayload);
context.nodeSendToSession(rawSessionKey, "chat", chatPayload);
respond(true, { ok: true, messageId: appended.messageId });
},
};