From c7052853f3b17afdb252b4daf3b40eceaeff2809 Mon Sep 17 00:00:00 2001 From: Gustavo Madeira Santana Date: Mon, 30 Mar 2026 02:34:00 -0400 Subject: [PATCH] Matrix: narrow history ingress lock --- docs/channels/matrix.md | 2 + .../matrix/src/matrix/monitor/handler.ts | 245 +++++++++--------- 2 files changed, 131 insertions(+), 116 deletions(-) diff --git a/docs/channels/matrix.md b/docs/channels/matrix.md index 3394ba814f7..6346370e1cc 100644 --- a/docs/channels/matrix.md +++ b/docs/channels/matrix.md @@ -581,7 +581,9 @@ Current behavior: - `channels.matrix.historyLimit` controls how many recent room messages are included as `InboundHistory` when a Matrix room message triggers the agent. - It falls back to `messages.groupChat.historyLimit`. Set `0` to disable. +- Matrix room history is room-only. DMs keep using normal session history. - Matrix room history is pending-only: OpenClaw buffers room messages that did not trigger a reply yet, then snapshots that window when a mention or other trigger arrives. +- The current trigger message is not included in `InboundHistory`; it stays in the main inbound body for that turn. - Retries of the same Matrix event reuse the original history snapshot instead of drifting forward to newer room messages. ## DM and room policy example diff --git a/extensions/matrix/src/matrix/monitor/handler.ts b/extensions/matrix/src/matrix/monitor/handler.ts index ae46003cf73..5592a5ae09d 100644 --- a/extensions/matrix/src/matrix/monitor/handler.ts +++ b/extensions/matrix/src/matrix/monitor/handler.ts @@ -774,29 +774,6 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam return; } const senderName = await getSenderName(); - const roomInfo = isRoom ? await getRoomInfo(roomId) : undefined; - const roomName = roomInfo?.name; - - const replyToEventId = content["m.relates_to"]?.["m.in_reply_to"]?.event_id; - const threadTarget = resolveMatrixThreadTarget({ - threadReplies, - messageId: _messageId, - threadRootId: _threadRootId, - isThreadRoot: false, - }); - const threadContext = _threadRootId - ? await resolveThreadContext({ roomId, threadRootId: _threadRootId }) - : undefined; - const replyContext = - replyToEventId && replyToEventId === _threadRootId && threadContext?.summary - ? { - replyToBody: threadContext.summary, - replyToSender: threadContext.senderLabel, - } - : replyToEventId - ? await resolveReplyContext({ roomId, eventId: replyToEventId }) - : undefined; - if (_configuredBinding) { const ensured = await ensureConfiguredAcpBindingReady({ cfg, @@ -815,7 +792,6 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam if (_runtimeBindingId) { getSessionBindingService().touch(_runtimeBindingId, eventTs ?? undefined); } - const envelopeFrom = isDirectMessage ? senderName : (roomName ?? roomId); const preparedTrigger = isRoom && historyLimit > 0 ? roomHistoryTracker.prepareTrigger(_route.agentId, roomId, historyLimit, { @@ -828,95 +804,10 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam const inboundHistory = preparedTrigger?.history; const triggerSnapshotIdx = preparedTrigger?.snapshotIdx ?? -1; - const textWithId = `${bodyText}\n[matrix event id: ${_messageId} room: ${roomId}]`; - const storePath = core.channel.session.resolveStorePath(cfg.session?.store, { - agentId: _route.agentId, - }); - const envelopeOptions = core.channel.reply.resolveEnvelopeFormatOptions(cfg); - const previousTimestamp = core.channel.session.readSessionUpdatedAt({ - storePath, - sessionKey: _route.sessionKey, - }); - const body = core.channel.reply.formatAgentEnvelope({ - channel: "Matrix", - from: envelopeFrom, - timestamp: eventTs ?? undefined, - previousTimestamp, - envelope: envelopeOptions, - body: textWithId, - }); - - const groupSystemPrompt = roomConfig?.systemPrompt?.trim() || undefined; - const ctxPayload = core.channel.reply.finalizeInboundContext({ - Body: body, - RawBody: bodyText, - CommandBody: bodyText, - InboundHistory: inboundHistory && inboundHistory.length > 0 ? inboundHistory : undefined, - From: isDirectMessage ? `matrix:${senderId}` : `matrix:channel:${roomId}`, - To: `room:${roomId}`, - SessionKey: _route.sessionKey, - AccountId: _route.accountId, - ChatType: isDirectMessage ? "direct" : "channel", - ConversationLabel: envelopeFrom, - SenderName: senderName, - SenderId: senderId, - SenderUsername: senderId.split(":")[0]?.replace(/^@/, ""), - GroupSubject: isRoom ? (roomName ?? roomId) : undefined, - GroupId: isRoom ? roomId : undefined, - GroupSystemPrompt: isRoom ? groupSystemPrompt : undefined, - Provider: "matrix" as const, - Surface: "matrix" as const, - WasMentioned: isRoom ? wasMentioned : undefined, - MessageSid: _messageId, - ReplyToId: threadTarget ? undefined : (replyToEventId ?? undefined), - ReplyToBody: replyContext?.replyToBody, - ReplyToSender: replyContext?.replyToSender, - MessageThreadId: threadTarget, - ThreadStarterBody: threadContext?.threadStarterBody, - Timestamp: eventTs ?? undefined, - MediaPath: media?.path, - MediaType: media?.contentType, - MediaUrl: media?.path, - ...locationPayload?.context, - CommandAuthorized: commandAuthorized, - CommandSource: "text" as const, - OriginatingChannel: "matrix" as const, - OriginatingTo: `room:${roomId}`, - }); - - await core.channel.session.recordInboundSession({ - storePath, - sessionKey: ctxPayload.SessionKey ?? _route.sessionKey, - ctx: ctxPayload, - updateLastRoute: isDirectMessage - ? { - sessionKey: _route.mainSessionKey, - channel: "matrix", - to: `room:${roomId}`, - accountId: _route.accountId, - } - : undefined, - onRecordError: (err) => { - logger.warn("failed updating session meta", { - error: String(err), - storePath, - sessionKey: ctxPayload.SessionKey ?? _route.sessionKey, - }); - }, - }); - - const preview = bodyText.slice(0, 200).replace(/\n/g, "\\n"); - logVerboseMessage(`matrix inbound: room=${roomId} from=${senderId} preview="${preview}"`); - - const replyTarget = ctxPayload.To; - if (!replyTarget) { - runtime.error?.("matrix: missing reply target"); - return; - } - return { - ctxPayload, route: _route, + configuredBinding: _configuredBinding, + runtimeBindingId: _runtimeBindingId, roomConfig, isDirectMessage, isRoom, @@ -924,10 +815,15 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam wasMentioned, shouldBypassMention, canDetectMention, + commandAuthorized, + inboundHistory, + senderName, + bodyText, + media, + locationPayload, messageId: _messageId, triggerSnapshotIdx, - threadTarget, - replyTarget, + threadRootId: _threadRootId, }; }); if (!ingressResult) { @@ -935,8 +831,9 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam } const { - ctxPayload, route: _route, + configuredBinding: _configuredBinding, + runtimeBindingId: _runtimeBindingId, roomConfig, isDirectMessage, isRoom, @@ -944,12 +841,128 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam wasMentioned, shouldBypassMention, canDetectMention, + commandAuthorized, + inboundHistory, + senderName, + bodyText, + media, + locationPayload, messageId: _messageId, triggerSnapshotIdx, - threadTarget, - replyTarget, + threadRootId: _threadRootId, } = ingressResult; + // Keep the per-room ingress gate focused on ordering-sensitive state updates. + // Prompt/session enrichment below can run concurrently after the history snapshot is fixed. + const replyToEventId = (event.content as RoomMessageEventContent)["m.relates_to"]?.[ + "m.in_reply_to" + ]?.event_id; + const threadTarget = resolveMatrixThreadTarget({ + threadReplies, + messageId: _messageId, + threadRootId: _threadRootId, + isThreadRoot: false, + }); + const threadContext = _threadRootId + ? await resolveThreadContext({ roomId, threadRootId: _threadRootId }) + : undefined; + const replyContext = + replyToEventId && replyToEventId === _threadRootId && threadContext?.summary + ? { + replyToBody: threadContext.summary, + replyToSender: threadContext.senderLabel, + } + : replyToEventId + ? await resolveReplyContext({ roomId, eventId: replyToEventId }) + : undefined; + const roomInfo = isRoom ? await getRoomInfo(roomId) : undefined; + const roomName = roomInfo?.name; + const envelopeFrom = isDirectMessage ? senderName : (roomName ?? roomId); + const textWithId = `${bodyText}\n[matrix event id: ${_messageId} room: ${roomId}]`; + const storePath = core.channel.session.resolveStorePath(cfg.session?.store, { + agentId: _route.agentId, + }); + const envelopeOptions = core.channel.reply.resolveEnvelopeFormatOptions(cfg); + const previousTimestamp = core.channel.session.readSessionUpdatedAt({ + storePath, + sessionKey: _route.sessionKey, + }); + const body = core.channel.reply.formatAgentEnvelope({ + channel: "Matrix", + from: envelopeFrom, + timestamp: eventTs ?? undefined, + previousTimestamp, + envelope: envelopeOptions, + body: textWithId, + }); + const groupSystemPrompt = roomConfig?.systemPrompt?.trim() || undefined; + const ctxPayload = core.channel.reply.finalizeInboundContext({ + Body: body, + RawBody: bodyText, + CommandBody: bodyText, + InboundHistory: inboundHistory && inboundHistory.length > 0 ? inboundHistory : undefined, + From: isDirectMessage ? `matrix:${senderId}` : `matrix:channel:${roomId}`, + To: `room:${roomId}`, + SessionKey: _route.sessionKey, + AccountId: _route.accountId, + ChatType: isDirectMessage ? "direct" : "channel", + ConversationLabel: envelopeFrom, + SenderName: senderName, + SenderId: senderId, + SenderUsername: senderId.split(":")[0]?.replace(/^@/, ""), + GroupSubject: isRoom ? (roomName ?? roomId) : undefined, + GroupId: isRoom ? roomId : undefined, + GroupSystemPrompt: isRoom ? groupSystemPrompt : undefined, + Provider: "matrix" as const, + Surface: "matrix" as const, + WasMentioned: isRoom ? wasMentioned : undefined, + MessageSid: _messageId, + ReplyToId: threadTarget ? undefined : (replyToEventId ?? undefined), + ReplyToBody: replyContext?.replyToBody, + ReplyToSender: replyContext?.replyToSender, + MessageThreadId: threadTarget, + ThreadStarterBody: threadContext?.threadStarterBody, + Timestamp: eventTs ?? undefined, + MediaPath: media?.path, + MediaType: media?.contentType, + MediaUrl: media?.path, + ...locationPayload?.context, + CommandAuthorized: commandAuthorized, + CommandSource: "text" as const, + OriginatingChannel: "matrix" as const, + OriginatingTo: `room:${roomId}`, + }); + + await core.channel.session.recordInboundSession({ + storePath, + sessionKey: ctxPayload.SessionKey ?? _route.sessionKey, + ctx: ctxPayload, + updateLastRoute: isDirectMessage + ? { + sessionKey: _route.mainSessionKey, + channel: "matrix", + to: `room:${roomId}`, + accountId: _route.accountId, + } + : undefined, + onRecordError: (err) => { + logger.warn("failed updating session meta", { + error: String(err), + storePath, + sessionKey: ctxPayload.SessionKey ?? _route.sessionKey, + }); + }, + }); + + const preview = bodyText.slice(0, 200).replace(/\n/g, "\\n"); + logVerboseMessage(`matrix inbound: room=${roomId} from=${senderId} preview="${preview}"`); + + const replyTarget = ctxPayload.To; + if (!replyTarget) { + runtime.error?.("matrix: missing reply target"); + return; + } + const { ackReaction, ackReactionScope: ackScope } = resolveMatrixAckReactionConfig({ cfg, agentId: _route.agentId,