openclaw/extensions/matrix-js/src/matrix/monitor/handler.ts

742 lines
25 KiB
TypeScript

import {
createReplyPrefixOptions,
createTypingCallbacks,
formatAllowlistMatchMeta,
logInboundDrop,
logTypingFailure,
resolveControlCommandGate,
type PluginRuntime,
type ReplyPayload,
type RuntimeEnv,
type RuntimeLogger,
} from "openclaw/plugin-sdk/matrix-js";
import type { CoreConfig, MatrixRoomConfig, ReplyToMode } from "../../types.js";
import {
formatPollAsText,
isPollStartType,
parsePollStartContent,
type PollStartContent,
} from "../poll-types.js";
import type { LocationMessageEventContent, MatrixClient } from "../sdk.js";
import {
reactMatrixMessage,
sendMessageMatrix,
sendReadReceiptMatrix,
sendTypingMatrix,
} from "../send.js";
import { resolveMatrixAckReactionConfig } from "./ack-config.js";
import {
normalizeMatrixAllowList,
resolveMatrixAllowListMatch,
resolveMatrixAllowListMatches,
} from "./allowlist.js";
import { resolveMatrixLocation, type MatrixLocationPayload } from "./location.js";
import { downloadMatrixMedia } from "./media.js";
import { resolveMentions } from "./mentions.js";
import { handleInboundMatrixReaction } from "./reaction-events.js";
import { deliverMatrixReplies } from "./replies.js";
import { resolveMatrixRoomConfig } from "./rooms.js";
import { resolveMatrixThreadRootId, resolveMatrixThreadTarget } from "./threads.js";
import type { MatrixRawEvent, RoomMessageEventContent } from "./types.js";
import { EventType, RelationType } from "./types.js";
import { isMatrixVerificationRoomMessage } from "./verification-utils.js";
const ALLOW_FROM_STORE_CACHE_TTL_MS = 30_000;
const PAIRING_REPLY_COOLDOWN_MS = 5 * 60_000;
const MAX_TRACKED_PAIRING_REPLY_SENDERS = 512;
export type MatrixMonitorHandlerParams = {
client: MatrixClient;
core: PluginRuntime;
cfg: CoreConfig;
accountId: string;
runtime: RuntimeEnv;
logger: RuntimeLogger;
logVerboseMessage: (message: string) => void;
allowFrom: string[];
roomsConfig?: Record<string, MatrixRoomConfig>;
mentionRegexes: ReturnType<PluginRuntime["channel"]["mentions"]["buildMentionRegexes"]>;
groupPolicy: "open" | "allowlist" | "disabled";
replyToMode: ReplyToMode;
threadReplies: "off" | "inbound" | "always";
dmEnabled: boolean;
dmPolicy: "open" | "pairing" | "allowlist" | "disabled";
textLimit: number;
mediaMaxBytes: number;
startupMs: number;
startupGraceMs: number;
directTracker: {
isDirectMessage: (params: {
roomId: string;
senderId: string;
selfUserId: string;
}) => Promise<boolean>;
};
getRoomInfo: (
roomId: string,
) => Promise<{ name?: string; canonicalAlias?: string; altAliases: string[] }>;
getMemberDisplayName: (roomId: string, userId: string) => Promise<string>;
};
export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParams) {
const {
client,
core,
cfg,
accountId,
runtime,
logger,
logVerboseMessage,
allowFrom,
roomsConfig,
mentionRegexes,
groupPolicy,
replyToMode,
threadReplies,
dmEnabled,
dmPolicy,
textLimit,
mediaMaxBytes,
startupMs,
startupGraceMs,
directTracker,
getRoomInfo,
getMemberDisplayName,
} = params;
let cachedStoreAllowFrom: {
value: string[];
expiresAtMs: number;
} | null = null;
const pairingReplySentAtMsBySender = new Map<string, number>();
const readStoreAllowFrom = async (): Promise<string[]> => {
const now = Date.now();
if (cachedStoreAllowFrom && now < cachedStoreAllowFrom.expiresAtMs) {
return cachedStoreAllowFrom.value;
}
const value = await core.channel.pairing
.readAllowFromStore({
channel: "matrix-js",
env: process.env,
accountId,
})
.catch(() => []);
cachedStoreAllowFrom = {
value,
expiresAtMs: now + ALLOW_FROM_STORE_CACHE_TTL_MS,
};
return value;
};
const shouldSendPairingReply = (senderId: string, created: boolean): boolean => {
const now = Date.now();
if (created) {
pairingReplySentAtMsBySender.set(senderId, now);
return true;
}
const lastSentAtMs = pairingReplySentAtMsBySender.get(senderId);
if (typeof lastSentAtMs === "number" && now - lastSentAtMs < PAIRING_REPLY_COOLDOWN_MS) {
return false;
}
pairingReplySentAtMsBySender.set(senderId, now);
if (pairingReplySentAtMsBySender.size > MAX_TRACKED_PAIRING_REPLY_SENDERS) {
const oldestSender = pairingReplySentAtMsBySender.keys().next().value;
if (typeof oldestSender === "string") {
pairingReplySentAtMsBySender.delete(oldestSender);
}
}
return true;
};
return async (roomId: string, event: MatrixRawEvent) => {
try {
const eventType = event.type;
if (eventType === EventType.RoomMessageEncrypted) {
// Encrypted payloads are emitted separately after decryption.
return;
}
const isPollEvent = isPollStartType(eventType);
const isReactionEvent = eventType === EventType.Reaction;
const locationContent = event.content as LocationMessageEventContent;
const isLocationEvent =
eventType === EventType.Location ||
(eventType === EventType.RoomMessage && locationContent.msgtype === EventType.Location);
if (
eventType !== EventType.RoomMessage &&
!isPollEvent &&
!isLocationEvent &&
!isReactionEvent
) {
return;
}
logVerboseMessage(
`matrix: inbound event room=${roomId} type=${eventType} id=${event.event_id ?? "unknown"}`,
);
if (event.unsigned?.redacted_because) {
return;
}
const senderId = event.sender;
if (!senderId) {
return;
}
const selfUserId = await client.getUserId();
if (senderId === selfUserId) {
return;
}
const eventTs = event.origin_server_ts;
const eventAge = event.unsigned?.age;
if (typeof eventTs === "number" && eventTs < startupMs - startupGraceMs) {
return;
}
if (
typeof eventTs !== "number" &&
typeof eventAge === "number" &&
eventAge > startupGraceMs
) {
return;
}
const roomInfo = await getRoomInfo(roomId);
const roomName = roomInfo.name;
const roomAliases = [roomInfo.canonicalAlias ?? "", ...roomInfo.altAliases].filter(Boolean);
let content = event.content as RoomMessageEventContent;
if (isPollEvent) {
const pollStartContent = event.content as PollStartContent;
const pollSummary = parsePollStartContent(pollStartContent);
if (pollSummary) {
pollSummary.eventId = event.event_id ?? "";
pollSummary.roomId = roomId;
pollSummary.sender = senderId;
const senderDisplayName = await getMemberDisplayName(roomId, senderId);
pollSummary.senderName = senderDisplayName;
const pollText = formatPollAsText(pollSummary);
content = {
msgtype: "m.text",
body: pollText,
} as unknown as RoomMessageEventContent;
} else {
return;
}
}
if (
eventType === EventType.RoomMessage &&
isMatrixVerificationRoomMessage({
msgtype: (content as { msgtype?: unknown }).msgtype,
body: content.body,
})
) {
logVerboseMessage(`matrix: skip verification/system room message room=${roomId}`);
return;
}
const locationPayload: MatrixLocationPayload | null = resolveMatrixLocation({
eventType,
content: content as LocationMessageEventContent,
});
const relates = content["m.relates_to"];
if (relates && "rel_type" in relates) {
if (relates.rel_type === RelationType.Replace) {
return;
}
}
const isDirectMessage = await directTracker.isDirectMessage({
roomId,
senderId,
selfUserId,
});
const isRoom = !isDirectMessage;
if (isRoom && groupPolicy === "disabled") {
return;
}
const roomConfigInfo = isRoom
? resolveMatrixRoomConfig({
rooms: roomsConfig,
roomId,
aliases: roomAliases,
name: roomName,
})
: undefined;
const roomConfig = roomConfigInfo?.config;
const roomMatchMeta = roomConfigInfo
? `matchKey=${roomConfigInfo.matchKey ?? "none"} matchSource=${
roomConfigInfo.matchSource ?? "none"
}`
: "matchKey=none matchSource=none";
if (isRoom && roomConfig && !roomConfigInfo?.allowed) {
logVerboseMessage(`matrix: room disabled room=${roomId} (${roomMatchMeta})`);
return;
}
if (isRoom && groupPolicy === "allowlist") {
if (!roomConfigInfo?.allowlistConfigured) {
logVerboseMessage(`matrix: drop room message (no allowlist, ${roomMatchMeta})`);
return;
}
if (!roomConfig) {
logVerboseMessage(`matrix: drop room message (not in allowlist, ${roomMatchMeta})`);
return;
}
}
const senderName = await getMemberDisplayName(roomId, senderId);
const storeAllowFrom = await readStoreAllowFrom();
const effectiveAllowFrom = normalizeMatrixAllowList([...allowFrom, ...storeAllowFrom]);
const groupAllowFrom = cfg.channels?.["matrix-js"]?.groupAllowFrom ?? [];
const effectiveGroupAllowFrom = normalizeMatrixAllowList(groupAllowFrom);
const groupAllowConfigured = effectiveGroupAllowFrom.length > 0;
if (isDirectMessage) {
if (!dmEnabled || dmPolicy === "disabled") {
return;
}
if (dmPolicy !== "open") {
const allowMatch = resolveMatrixAllowListMatch({
allowList: effectiveAllowFrom,
userId: senderId,
});
const allowMatchMeta = formatAllowlistMatchMeta(allowMatch);
if (!allowMatch.allowed) {
if (!isReactionEvent && dmPolicy === "pairing") {
const { code, created } = await core.channel.pairing.upsertPairingRequest({
channel: "matrix-js",
id: senderId,
accountId,
meta: { name: senderName },
});
if (shouldSendPairingReply(senderId, created)) {
const pairingReply = core.channel.pairing.buildPairingReply({
channel: "matrix-js",
idLine: `Your Matrix user id: ${senderId}`,
code,
});
logVerboseMessage(
created
? `matrix pairing request sender=${senderId} name=${senderName ?? "unknown"} (${allowMatchMeta})`
: `matrix pairing reminder sender=${senderId} name=${senderName ?? "unknown"} (${allowMatchMeta})`,
);
try {
await sendMessageMatrix(
`room:${roomId}`,
created
? pairingReply
: `${pairingReply}\n\nPairing request is still pending approval. Reusing existing code.`,
{ client },
);
} catch (err) {
logVerboseMessage(`matrix pairing reply failed for ${senderId}: ${String(err)}`);
}
} else {
logVerboseMessage(
`matrix pairing reminder suppressed sender=${senderId} (cooldown)`,
);
}
}
if (isReactionEvent || dmPolicy !== "pairing") {
logVerboseMessage(
`matrix: blocked ${isReactionEvent ? "reaction" : "dm"} sender ${senderId} (dmPolicy=${dmPolicy}, ${allowMatchMeta})`,
);
}
return;
}
}
}
const roomUsers = roomConfig?.users ?? [];
if (isRoom && roomUsers.length > 0) {
const userMatch = resolveMatrixAllowListMatch({
allowList: normalizeMatrixAllowList(roomUsers),
userId: senderId,
});
if (!userMatch.allowed) {
logVerboseMessage(
`matrix: blocked sender ${senderId} (room users allowlist, ${roomMatchMeta}, ${formatAllowlistMatchMeta(
userMatch,
)})`,
);
return;
}
}
if (isRoom && groupPolicy === "allowlist" && roomUsers.length === 0 && groupAllowConfigured) {
const groupAllowMatch = resolveMatrixAllowListMatch({
allowList: effectiveGroupAllowFrom,
userId: senderId,
});
if (!groupAllowMatch.allowed) {
logVerboseMessage(
`matrix: blocked sender ${senderId} (groupAllowFrom, ${roomMatchMeta}, ${formatAllowlistMatchMeta(
groupAllowMatch,
)})`,
);
return;
}
}
if (isRoom) {
logVerboseMessage(`matrix: allow room ${roomId} (${roomMatchMeta})`);
}
if (isReactionEvent) {
await handleInboundMatrixReaction({
client,
core,
cfg,
accountId,
roomId,
event,
senderId,
senderLabel: senderName,
selfUserId,
isDirectMessage,
logVerboseMessage,
});
return;
}
const rawBody =
locationPayload?.text ?? (typeof content.body === "string" ? content.body.trim() : "");
let media: {
path: string;
contentType?: string;
placeholder: string;
} | null = null;
const contentUrl =
"url" in content && typeof content.url === "string" ? content.url : undefined;
const contentFile =
"file" in content && content.file && typeof content.file === "object"
? content.file
: undefined;
const mediaUrl = contentUrl ?? contentFile?.url;
if (!rawBody && !mediaUrl) {
return;
}
const contentInfo =
"info" in content && content.info && typeof content.info === "object"
? (content.info as { mimetype?: string; size?: number })
: undefined;
const contentType = contentInfo?.mimetype;
const contentSize = typeof contentInfo?.size === "number" ? contentInfo.size : undefined;
if (mediaUrl?.startsWith("mxc://")) {
try {
media = await downloadMatrixMedia({
client,
mxcUrl: mediaUrl,
contentType,
sizeBytes: contentSize,
maxBytes: mediaMaxBytes,
file: contentFile,
});
} catch (err) {
logVerboseMessage(`matrix: media download failed: ${String(err)}`);
}
}
const bodyText = rawBody || media?.placeholder || "";
if (!bodyText) {
return;
}
const { wasMentioned, hasExplicitMention } = resolveMentions({
content,
userId: selfUserId,
text: bodyText,
mentionRegexes,
});
const allowTextCommands = core.channel.commands.shouldHandleTextCommands({
cfg,
surface: "matrix-js",
});
const useAccessGroups = cfg.commands?.useAccessGroups !== false;
const senderAllowedForCommands = resolveMatrixAllowListMatches({
allowList: effectiveAllowFrom,
userId: senderId,
});
const senderAllowedForGroup = groupAllowConfigured
? resolveMatrixAllowListMatches({
allowList: effectiveGroupAllowFrom,
userId: senderId,
})
: false;
const senderAllowedForRoomUsers =
isRoom && roomUsers.length > 0
? resolveMatrixAllowListMatches({
allowList: normalizeMatrixAllowList(roomUsers),
userId: senderId,
})
: false;
const hasControlCommandInMessage = core.channel.text.hasControlCommand(bodyText, cfg);
const commandGate = resolveControlCommandGate({
useAccessGroups,
authorizers: [
{ configured: effectiveAllowFrom.length > 0, allowed: senderAllowedForCommands },
{ configured: roomUsers.length > 0, allowed: senderAllowedForRoomUsers },
{ configured: groupAllowConfigured, allowed: senderAllowedForGroup },
],
allowTextCommands,
hasControlCommand: hasControlCommandInMessage,
});
const commandAuthorized = commandGate.commandAuthorized;
if (isRoom && commandGate.shouldBlock) {
logInboundDrop({
log: logVerboseMessage,
channel: "matrix-js",
reason: "control command (unauthorized)",
target: senderId,
});
return;
}
const shouldRequireMention = isRoom
? roomConfig?.autoReply === true
? false
: roomConfig?.autoReply === false
? true
: typeof roomConfig?.requireMention === "boolean"
? roomConfig?.requireMention
: true
: false;
const shouldBypassMention =
allowTextCommands &&
isRoom &&
shouldRequireMention &&
!wasMentioned &&
!hasExplicitMention &&
commandAuthorized &&
hasControlCommandInMessage;
const canDetectMention = mentionRegexes.length > 0 || hasExplicitMention;
if (isRoom && shouldRequireMention && !wasMentioned && !shouldBypassMention) {
logger.info("skipping room message", { roomId, reason: "no-mention" });
return;
}
const messageId = event.event_id ?? "";
const replyToEventId = content["m.relates_to"]?.["m.in_reply_to"]?.event_id;
const threadRootId = resolveMatrixThreadRootId({ event, content });
const threadTarget = resolveMatrixThreadTarget({
threadReplies,
messageId,
threadRootId,
isThreadRoot: false, // Raw event payload does not carry explicit thread-root metadata.
});
const route = core.channel.routing.resolveAgentRoute({
cfg,
channel: "matrix-js",
accountId,
peer: {
kind: isDirectMessage ? "direct" : "channel",
id: isDirectMessage ? senderId : roomId,
},
});
const envelopeFrom = isDirectMessage ? senderName : (roomName ?? roomId);
const textWithId = `${bodyText}\n[matrix event id: ${messageId} room: ${roomId}]`;
const storePath = core.channel.session.resolveStorePath(cfg.session?.store, {
agentId: route.agentId,
});
const envelopeOptions = core.channel.reply.resolveEnvelopeFormatOptions(cfg);
const previousTimestamp = core.channel.session.readSessionUpdatedAt({
storePath,
sessionKey: route.sessionKey,
});
const body = core.channel.reply.formatAgentEnvelope({
channel: "Matrix",
from: envelopeFrom,
timestamp: eventTs ?? undefined,
previousTimestamp,
envelope: envelopeOptions,
body: textWithId,
});
const groupSystemPrompt = roomConfig?.systemPrompt?.trim() || undefined;
const ctxPayload = core.channel.reply.finalizeInboundContext({
Body: body,
RawBody: bodyText,
CommandBody: bodyText,
From: isDirectMessage ? `matrix:${senderId}` : `matrix:channel:${roomId}`,
To: `room:${roomId}`,
SessionKey: route.sessionKey,
AccountId: route.accountId,
ChatType: isDirectMessage ? "direct" : "channel",
ConversationLabel: envelopeFrom,
SenderName: senderName,
SenderId: senderId,
SenderUsername: senderId.split(":")[0]?.replace(/^@/, ""),
GroupSubject: isRoom ? (roomName ?? roomId) : undefined,
GroupChannel: isRoom ? (roomInfo.canonicalAlias ?? roomId) : undefined,
GroupSystemPrompt: isRoom ? groupSystemPrompt : undefined,
Provider: "matrix-js" as const,
Surface: "matrix-js" as const,
WasMentioned: isRoom ? wasMentioned : undefined,
MessageSid: messageId,
ReplyToId: threadTarget ? undefined : (replyToEventId ?? undefined),
MessageThreadId: threadTarget,
Timestamp: eventTs ?? undefined,
MediaPath: media?.path,
MediaType: media?.contentType,
MediaUrl: media?.path,
...locationPayload?.context,
CommandAuthorized: commandAuthorized,
CommandSource: "text" as const,
OriginatingChannel: "matrix-js" as const,
OriginatingTo: `room:${roomId}`,
});
await core.channel.session.recordInboundSession({
storePath,
sessionKey: ctxPayload.SessionKey ?? route.sessionKey,
ctx: ctxPayload,
updateLastRoute: isDirectMessage
? {
sessionKey: route.mainSessionKey,
channel: "matrix-js",
to: `room:${roomId}`,
accountId: route.accountId,
}
: undefined,
onRecordError: (err) => {
logger.warn("failed updating session meta", {
error: String(err),
storePath,
sessionKey: ctxPayload.SessionKey ?? route.sessionKey,
});
},
});
const preview = bodyText.slice(0, 200).replace(/\n/g, "\\n");
logVerboseMessage(`matrix inbound: room=${roomId} from=${senderId} preview="${preview}"`);
const { ackReaction, ackReactionScope: ackScope } = resolveMatrixAckReactionConfig({
cfg,
agentId: route.agentId,
accountId,
});
const shouldAckReaction = () =>
Boolean(
ackReaction &&
core.channel.reactions.shouldAckReaction({
scope: ackScope,
isDirect: isDirectMessage,
isGroup: isRoom,
isMentionableGroup: isRoom,
requireMention: Boolean(shouldRequireMention),
canDetectMention,
effectiveWasMentioned: wasMentioned || shouldBypassMention,
shouldBypassMention,
}),
);
if (shouldAckReaction() && messageId) {
reactMatrixMessage(roomId, messageId, ackReaction, client).catch((err) => {
logVerboseMessage(`matrix react failed for room ${roomId}: ${String(err)}`);
});
}
const replyTarget = ctxPayload.To;
if (!replyTarget) {
runtime.error?.("matrix: missing reply target");
return;
}
if (messageId) {
sendReadReceiptMatrix(roomId, messageId, client).catch((err) => {
logVerboseMessage(
`matrix: read receipt failed room=${roomId} id=${messageId}: ${String(err)}`,
);
});
}
let didSendReply = false;
const tableMode = core.channel.text.resolveMarkdownTableMode({
cfg,
channel: "matrix-js",
accountId: route.accountId,
});
const { onModelSelected, ...prefixOptions } = createReplyPrefixOptions({
cfg,
agentId: route.agentId,
channel: "matrix-js",
accountId: route.accountId,
});
const typingCallbacks = createTypingCallbacks({
start: () => sendTypingMatrix(roomId, true, undefined, client),
stop: () => sendTypingMatrix(roomId, false, undefined, client),
onStartError: (err) => {
logTypingFailure({
log: logVerboseMessage,
channel: "matrix-js",
action: "start",
target: roomId,
error: err,
});
},
onStopError: (err) => {
logTypingFailure({
log: logVerboseMessage,
channel: "matrix-js",
action: "stop",
target: roomId,
error: err,
});
},
});
const { dispatcher, replyOptions, markDispatchIdle } =
core.channel.reply.createReplyDispatcherWithTyping({
...prefixOptions,
humanDelay: core.channel.reply.resolveHumanDelayConfig(cfg, route.agentId),
deliver: async (payload: ReplyPayload) => {
await deliverMatrixReplies({
replies: [payload],
roomId,
client,
runtime,
textLimit,
replyToMode,
threadId: threadTarget,
accountId: route.accountId,
tableMode,
});
didSendReply = true;
},
onError: (err: unknown, info: { kind: "tool" | "block" | "final" }) => {
runtime.error?.(`matrix ${info.kind} reply failed: ${String(err)}`);
},
onReplyStart: typingCallbacks.onReplyStart,
onIdle: typingCallbacks.onIdle,
});
const { queuedFinal, counts } = await core.channel.reply.dispatchReplyFromConfig({
ctx: ctxPayload,
cfg,
dispatcher,
replyOptions: {
...replyOptions,
skillFilter: roomConfig?.skills,
onModelSelected,
},
});
markDispatchIdle();
if (!queuedFinal) {
return;
}
didSendReply = true;
const finalCount = counts.final;
logVerboseMessage(
`matrix: delivered ${finalCount} reply${finalCount === 1 ? "" : "ies"} to ${replyTarget}`,
);
if (didSendReply) {
const previewText = bodyText.replace(/\s+/g, " ").slice(0, 160);
core.system.enqueueSystemEvent(`Matrix message from ${senderName}: ${previewText}`, {
sessionKey: route.sessionKey,
contextKey: `matrix:message:${roomId}:${messageId || "unknown"}`,
});
}
} catch (err) {
runtime.error?.(`matrix handler failed: ${String(err)}`);
}
};
}