Matrix: narrow history ingress lock

This commit is contained in:
Gustavo Madeira Santana 2026-03-30 02:34:00 -04:00
parent 28fbd2cf6c
commit c7052853f3
No known key found for this signature in database
2 changed files with 131 additions and 116 deletions

View File

@ -581,7 +581,9 @@ Current behavior:
- `channels.matrix.historyLimit` controls how many recent room messages are included as `InboundHistory` when a Matrix room message triggers the agent.
- It falls back to `messages.groupChat.historyLimit`. Set `0` to disable.
- Matrix room history is room-only. DMs keep using normal session history.
- Matrix room history is pending-only: OpenClaw buffers room messages that did not trigger a reply yet, then snapshots that window when a mention or other trigger arrives.
- The current trigger message is not included in `InboundHistory`; it stays in the main inbound body for that turn.
- Retries of the same Matrix event reuse the original history snapshot instead of drifting forward to newer room messages.
## DM and room policy example

View File

@ -774,29 +774,6 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
return;
}
const senderName = await getSenderName();
const roomInfo = isRoom ? await getRoomInfo(roomId) : undefined;
const roomName = roomInfo?.name;
const replyToEventId = content["m.relates_to"]?.["m.in_reply_to"]?.event_id;
const threadTarget = resolveMatrixThreadTarget({
threadReplies,
messageId: _messageId,
threadRootId: _threadRootId,
isThreadRoot: false,
});
const threadContext = _threadRootId
? await resolveThreadContext({ roomId, threadRootId: _threadRootId })
: undefined;
const replyContext =
replyToEventId && replyToEventId === _threadRootId && threadContext?.summary
? {
replyToBody: threadContext.summary,
replyToSender: threadContext.senderLabel,
}
: replyToEventId
? await resolveReplyContext({ roomId, eventId: replyToEventId })
: undefined;
if (_configuredBinding) {
const ensured = await ensureConfiguredAcpBindingReady({
cfg,
@ -815,7 +792,6 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
if (_runtimeBindingId) {
getSessionBindingService().touch(_runtimeBindingId, eventTs ?? undefined);
}
const envelopeFrom = isDirectMessage ? senderName : (roomName ?? roomId);
const preparedTrigger =
isRoom && historyLimit > 0
? roomHistoryTracker.prepareTrigger(_route.agentId, roomId, historyLimit, {
@ -828,95 +804,10 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
const inboundHistory = preparedTrigger?.history;
const triggerSnapshotIdx = preparedTrigger?.snapshotIdx ?? -1;
const textWithId = `${bodyText}\n[matrix event id: ${_messageId} room: ${roomId}]`;
const storePath = core.channel.session.resolveStorePath(cfg.session?.store, {
agentId: _route.agentId,
});
const envelopeOptions = core.channel.reply.resolveEnvelopeFormatOptions(cfg);
const previousTimestamp = core.channel.session.readSessionUpdatedAt({
storePath,
sessionKey: _route.sessionKey,
});
const body = core.channel.reply.formatAgentEnvelope({
channel: "Matrix",
from: envelopeFrom,
timestamp: eventTs ?? undefined,
previousTimestamp,
envelope: envelopeOptions,
body: textWithId,
});
const groupSystemPrompt = roomConfig?.systemPrompt?.trim() || undefined;
const ctxPayload = core.channel.reply.finalizeInboundContext({
Body: body,
RawBody: bodyText,
CommandBody: bodyText,
InboundHistory: inboundHistory && inboundHistory.length > 0 ? inboundHistory : undefined,
From: isDirectMessage ? `matrix:${senderId}` : `matrix:channel:${roomId}`,
To: `room:${roomId}`,
SessionKey: _route.sessionKey,
AccountId: _route.accountId,
ChatType: isDirectMessage ? "direct" : "channel",
ConversationLabel: envelopeFrom,
SenderName: senderName,
SenderId: senderId,
SenderUsername: senderId.split(":")[0]?.replace(/^@/, ""),
GroupSubject: isRoom ? (roomName ?? roomId) : undefined,
GroupId: isRoom ? roomId : undefined,
GroupSystemPrompt: isRoom ? groupSystemPrompt : undefined,
Provider: "matrix" as const,
Surface: "matrix" as const,
WasMentioned: isRoom ? wasMentioned : undefined,
MessageSid: _messageId,
ReplyToId: threadTarget ? undefined : (replyToEventId ?? undefined),
ReplyToBody: replyContext?.replyToBody,
ReplyToSender: replyContext?.replyToSender,
MessageThreadId: threadTarget,
ThreadStarterBody: threadContext?.threadStarterBody,
Timestamp: eventTs ?? undefined,
MediaPath: media?.path,
MediaType: media?.contentType,
MediaUrl: media?.path,
...locationPayload?.context,
CommandAuthorized: commandAuthorized,
CommandSource: "text" as const,
OriginatingChannel: "matrix" as const,
OriginatingTo: `room:${roomId}`,
});
await core.channel.session.recordInboundSession({
storePath,
sessionKey: ctxPayload.SessionKey ?? _route.sessionKey,
ctx: ctxPayload,
updateLastRoute: isDirectMessage
? {
sessionKey: _route.mainSessionKey,
channel: "matrix",
to: `room:${roomId}`,
accountId: _route.accountId,
}
: undefined,
onRecordError: (err) => {
logger.warn("failed updating session meta", {
error: String(err),
storePath,
sessionKey: ctxPayload.SessionKey ?? _route.sessionKey,
});
},
});
const preview = bodyText.slice(0, 200).replace(/\n/g, "\\n");
logVerboseMessage(`matrix inbound: room=${roomId} from=${senderId} preview="${preview}"`);
const replyTarget = ctxPayload.To;
if (!replyTarget) {
runtime.error?.("matrix: missing reply target");
return;
}
return {
ctxPayload,
route: _route,
configuredBinding: _configuredBinding,
runtimeBindingId: _runtimeBindingId,
roomConfig,
isDirectMessage,
isRoom,
@ -924,10 +815,15 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
wasMentioned,
shouldBypassMention,
canDetectMention,
commandAuthorized,
inboundHistory,
senderName,
bodyText,
media,
locationPayload,
messageId: _messageId,
triggerSnapshotIdx,
threadTarget,
replyTarget,
threadRootId: _threadRootId,
};
});
if (!ingressResult) {
@ -935,8 +831,9 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
}
const {
ctxPayload,
route: _route,
configuredBinding: _configuredBinding,
runtimeBindingId: _runtimeBindingId,
roomConfig,
isDirectMessage,
isRoom,
@ -944,12 +841,128 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
wasMentioned,
shouldBypassMention,
canDetectMention,
commandAuthorized,
inboundHistory,
senderName,
bodyText,
media,
locationPayload,
messageId: _messageId,
triggerSnapshotIdx,
threadTarget,
replyTarget,
threadRootId: _threadRootId,
} = ingressResult;
// Keep the per-room ingress gate focused on ordering-sensitive state updates.
// Prompt/session enrichment below can run concurrently after the history snapshot is fixed.
const replyToEventId = (event.content as RoomMessageEventContent)["m.relates_to"]?.[
"m.in_reply_to"
]?.event_id;
const threadTarget = resolveMatrixThreadTarget({
threadReplies,
messageId: _messageId,
threadRootId: _threadRootId,
isThreadRoot: false,
});
const threadContext = _threadRootId
? await resolveThreadContext({ roomId, threadRootId: _threadRootId })
: undefined;
const replyContext =
replyToEventId && replyToEventId === _threadRootId && threadContext?.summary
? {
replyToBody: threadContext.summary,
replyToSender: threadContext.senderLabel,
}
: replyToEventId
? await resolveReplyContext({ roomId, eventId: replyToEventId })
: undefined;
const roomInfo = isRoom ? await getRoomInfo(roomId) : undefined;
const roomName = roomInfo?.name;
const envelopeFrom = isDirectMessage ? senderName : (roomName ?? roomId);
const textWithId = `${bodyText}\n[matrix event id: ${_messageId} room: ${roomId}]`;
const storePath = core.channel.session.resolveStorePath(cfg.session?.store, {
agentId: _route.agentId,
});
const envelopeOptions = core.channel.reply.resolveEnvelopeFormatOptions(cfg);
const previousTimestamp = core.channel.session.readSessionUpdatedAt({
storePath,
sessionKey: _route.sessionKey,
});
const body = core.channel.reply.formatAgentEnvelope({
channel: "Matrix",
from: envelopeFrom,
timestamp: eventTs ?? undefined,
previousTimestamp,
envelope: envelopeOptions,
body: textWithId,
});
const groupSystemPrompt = roomConfig?.systemPrompt?.trim() || undefined;
const ctxPayload = core.channel.reply.finalizeInboundContext({
Body: body,
RawBody: bodyText,
CommandBody: bodyText,
InboundHistory: inboundHistory && inboundHistory.length > 0 ? inboundHistory : undefined,
From: isDirectMessage ? `matrix:${senderId}` : `matrix:channel:${roomId}`,
To: `room:${roomId}`,
SessionKey: _route.sessionKey,
AccountId: _route.accountId,
ChatType: isDirectMessage ? "direct" : "channel",
ConversationLabel: envelopeFrom,
SenderName: senderName,
SenderId: senderId,
SenderUsername: senderId.split(":")[0]?.replace(/^@/, ""),
GroupSubject: isRoom ? (roomName ?? roomId) : undefined,
GroupId: isRoom ? roomId : undefined,
GroupSystemPrompt: isRoom ? groupSystemPrompt : undefined,
Provider: "matrix" as const,
Surface: "matrix" as const,
WasMentioned: isRoom ? wasMentioned : undefined,
MessageSid: _messageId,
ReplyToId: threadTarget ? undefined : (replyToEventId ?? undefined),
ReplyToBody: replyContext?.replyToBody,
ReplyToSender: replyContext?.replyToSender,
MessageThreadId: threadTarget,
ThreadStarterBody: threadContext?.threadStarterBody,
Timestamp: eventTs ?? undefined,
MediaPath: media?.path,
MediaType: media?.contentType,
MediaUrl: media?.path,
...locationPayload?.context,
CommandAuthorized: commandAuthorized,
CommandSource: "text" as const,
OriginatingChannel: "matrix" as const,
OriginatingTo: `room:${roomId}`,
});
await core.channel.session.recordInboundSession({
storePath,
sessionKey: ctxPayload.SessionKey ?? _route.sessionKey,
ctx: ctxPayload,
updateLastRoute: isDirectMessage
? {
sessionKey: _route.mainSessionKey,
channel: "matrix",
to: `room:${roomId}`,
accountId: _route.accountId,
}
: undefined,
onRecordError: (err) => {
logger.warn("failed updating session meta", {
error: String(err),
storePath,
sessionKey: ctxPayload.SessionKey ?? _route.sessionKey,
});
},
});
const preview = bodyText.slice(0, 200).replace(/\n/g, "\\n");
logVerboseMessage(`matrix inbound: room=${roomId} from=${senderId} preview="${preview}"`);
const replyTarget = ctxPayload.To;
if (!replyTarget) {
runtime.error?.("matrix: missing reply target");
return;
}
const { ackReaction, ackReactionScope: ackScope } = resolveMatrixAckReactionConfig({
cfg,
agentId: _route.agentId,