diff --git a/CHANGELOG.md b/CHANGELOG.md index 0fd9b6d5f54..94f851757bf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,7 @@ Docs: https://docs.openclaw.ai - Flows/tasks: route one-task ACP and subagent updates through a parent flow owner context, so detached work can emerge back through the intended parent thread/session instead of speaking only as a raw child task. - Matrix/history: add optional room history context for Matrix group triggers via `channels.matrix.historyLimit`, with per-agent watermarks and retry-safe snapshots so failed trigger retries do not drift into newer room messages. (#57022) thanks @chain710. - Diffs: skip unused viewer-versus-file SSR preload work so `diffs` view-only and file-only runs do less render work while keeping mode outputs aligned. (#57909) thanks @gumadeiras. +- Matrix/threads: add per-DM `threadReplies` overrides and keep thread session isolation aligned with the effective room or DM thread policy from the triggering message onward. (#57995) thanks @teconomix. ### Fixes diff --git a/docs/channels/matrix.md b/docs/channels/matrix.md index 6346370e1cc..4b51952800e 100644 --- a/docs/channels/matrix.md +++ b/docs/channels/matrix.md @@ -143,6 +143,7 @@ This is a practical baseline config with DM pairing, room allowlist, and E2EE en dm: { policy: "pairing", + threadReplies: "off", }, groupPolicy: "allowlist", @@ -501,9 +502,10 @@ The repair flow does not delete old rooms automatically. It only picks the healt Matrix supports native Matrix threads for both automatic replies and message-tool sends. -- `threadReplies: "off"` keeps replies top-level. +- `threadReplies: "off"` keeps replies top-level and keeps inbound threaded messages on the parent session. - `threadReplies: "inbound"` replies inside a thread only when the inbound message was already in that thread. -- `threadReplies: "always"` keeps room replies in a thread rooted at the triggering message. +- `threadReplies: "always"` keeps room replies in a thread rooted at the triggering message and routes that conversation through the matching thread-scoped session from the first triggering message. +- `dm.threadReplies` overrides the top-level setting for DMs only. For example, you can keep room threads isolated while keeping DMs flat. - Inbound threaded messages include the thread root message as extra agent context. - Message-tool sends now auto-inherit the current Matrix thread when the target is the same room, or the same DM user target, unless an explicit `threadId` is provided. - Runtime thread bindings are supported for Matrix. `/focus`, `/unfocus`, `/agents`, `/session idle`, `/session max-age`, and thread-bound `/acp spawn` now work in Matrix rooms and DMs. @@ -595,6 +597,7 @@ Current behavior: dm: { policy: "allowlist", allowFrom: ["@admin:example.org"], + threadReplies: "off", }, groupPolicy: "allowlist", groupAllowFrom: ["@admin:example.org"], @@ -642,6 +645,7 @@ See [Pairing](/channels/pairing) for the shared DM pairing flow and storage layo dm: { policy: "allowlist", allowFrom: ["@ops:example.org"], + threadReplies: "off", }, }, }, @@ -757,8 +761,9 @@ Live directory lookup uses the logged-in Matrix account: - `mediaMaxMb`: media size cap in MB for Matrix media handling. It applies to outbound sends and inbound media processing. - `autoJoin`: invite auto-join policy (`always`, `allowlist`, `off`). Default: `off`. - `autoJoinAllowlist`: rooms/aliases allowed when `autoJoin` is `allowlist`. Alias entries are resolved to room IDs during invite handling; OpenClaw does not trust alias state claimed by the invited room. -- `dm`: DM policy block (`enabled`, `policy`, `allowFrom`). +- `dm`: DM policy block (`enabled`, `policy`, `allowFrom`, `threadReplies`). - `dm.allowFrom` entries should be full Matrix user IDs unless you already resolved them through live directory lookup. +- `dm.threadReplies`: DM-only thread policy override (`off`, `inbound`, `always`). It overrides the top-level `threadReplies` setting for both reply placement and session isolation in DMs. - `accounts`: named per-account overrides. Top-level `channels.matrix` values act as defaults for these entries. - `groups`: per-room policy map. Prefer room IDs or aliases; unresolved room names are ignored at runtime. Session/group identity uses the stable room ID after resolution, while human-readable labels still come from room names. - `rooms`: legacy alias for `groups`. diff --git a/extensions/matrix/src/config-schema.test.ts b/extensions/matrix/src/config-schema.test.ts index ee22517999d..ab441289ed0 100644 --- a/extensions/matrix/src/config-schema.test.ts +++ b/extensions/matrix/src/config-schema.test.ts @@ -18,4 +18,16 @@ describe("MatrixConfigSchema SecretInput", () => { }); expect(result.success).toBe(true); }); + + it("accepts dm threadReplies overrides", () => { + const result = MatrixConfigSchema.safeParse({ + homeserver: "https://matrix.example.org", + accessToken: "token", + dm: { + policy: "pairing", + threadReplies: "off", + }, + }); + expect(result.success).toBe(true); + }); }); diff --git a/extensions/matrix/src/config-schema.ts b/extensions/matrix/src/config-schema.ts index 91326daaa34..7dd3c962fcc 100644 --- a/extensions/matrix/src/config-schema.ts +++ b/extensions/matrix/src/config-schema.ts @@ -1,6 +1,5 @@ import { AllowFromListSchema, - buildNestedDmConfigSchema, DmPolicySchema, GroupPolicySchema, MarkdownConfigSchema, @@ -84,7 +83,14 @@ export const MatrixConfigSchema = z.object({ autoJoin: z.enum(["always", "allowlist", "off"]).optional(), autoJoinAllowlist: AllowFromListSchema, groupAllowFrom: AllowFromListSchema, - dm: buildNestedDmConfigSchema(), + dm: z + .object({ + enabled: z.boolean().optional(), + policy: DmPolicySchema.optional(), + allowFrom: AllowFromListSchema, + threadReplies: z.enum(["off", "inbound", "always"]).optional(), + }) + .optional(), groups: z.object({}).catchall(matrixRoomSchema).optional(), rooms: z.object({}).catchall(matrixRoomSchema).optional(), actions: matrixActionSchema, diff --git a/extensions/matrix/src/matrix/monitor/handler.body-for-agent.test.ts b/extensions/matrix/src/matrix/monitor/handler.body-for-agent.test.ts index cbfb5eb1c7d..fea06d85522 100644 --- a/extensions/matrix/src/matrix/monitor/handler.body-for-agent.test.ts +++ b/extensions/matrix/src/matrix/monitor/handler.body-for-agent.test.ts @@ -51,9 +51,39 @@ describe("createMatrixRoomMessageHandler inbound body formatting", () => { ThreadStarterBody: "Matrix thread root $thread-root from Alice:\nRoot topic", }), ); + // Thread messages get thread-scoped session keys (thread isolation feature). expect(recordInboundSession).toHaveBeenCalledWith( expect.objectContaining({ - sessionKey: "agent:ops:main", + sessionKey: "agent:ops:main:thread:$thread-root", + }), + ); + }); + + it("starts the thread-scoped session from the triggering message when threadReplies is always", async () => { + const { handler, finalizeInboundContext, recordInboundSession } = + createMatrixHandlerTestHarness({ + isDirectMessage: false, + threadReplies: "always", + }); + + await handler( + "!room:example.org", + createMatrixTextMessageEvent({ + eventId: "$thread-root", + body: "@room start thread", + mentions: { room: true }, + }), + ); + + expect(finalizeInboundContext).toHaveBeenCalledWith( + expect.objectContaining({ + MessageThreadId: "$thread-root", + ReplyToId: undefined, + }), + ); + expect(recordInboundSession).toHaveBeenCalledWith( + expect.objectContaining({ + sessionKey: "agent:ops:main:thread:$thread-root", }), ); }); diff --git a/extensions/matrix/src/matrix/monitor/handler.test-helpers.ts b/extensions/matrix/src/matrix/monitor/handler.test-helpers.ts index bf06aa24375..df727311ed4 100644 --- a/extensions/matrix/src/matrix/monitor/handler.test-helpers.ts +++ b/extensions/matrix/src/matrix/monitor/handler.test-helpers.ts @@ -30,6 +30,7 @@ type MatrixHandlerTestHarnessOptions = { groupPolicy?: "open" | "allowlist" | "disabled"; replyToMode?: ReplyToMode; threadReplies?: "off" | "inbound" | "always"; + dmThreadReplies?: "off" | "inbound" | "always"; streaming?: "partial" | "off"; dmEnabled?: boolean; dmPolicy?: "pairing" | "allowlist" | "open" | "disabled"; @@ -211,6 +212,7 @@ export function createMatrixHandlerTestHarness( groupPolicy: options.groupPolicy ?? "open", replyToMode: options.replyToMode ?? "off", threadReplies: options.threadReplies ?? "inbound", + dmThreadReplies: options.dmThreadReplies, streaming: options.streaming ?? "off", dmEnabled: options.dmEnabled ?? true, dmPolicy: options.dmPolicy ?? "open", diff --git a/extensions/matrix/src/matrix/monitor/handler.test.ts b/extensions/matrix/src/matrix/monitor/handler.test.ts index 7bbf0a49c26..e4cc8491741 100644 --- a/extensions/matrix/src/matrix/monitor/handler.test.ts +++ b/extensions/matrix/src/matrix/monitor/handler.test.ts @@ -71,6 +71,7 @@ function createReactionHarness(params?: { targetSender?: string; isDirectMessage?: boolean; senderName?: string; + client?: NonNullable[0]>["client"]; }) { return createMatrixHandlerTestHarness({ cfg: params?.cfg, @@ -79,6 +80,7 @@ function createReactionHarness(params?: { readAllowFromStore: vi.fn(async () => params?.storeAllowFrom ?? []), client: { getEvent: async () => ({ sender: params?.targetSender ?? "@bot:example.org" }), + ...params?.client, }, isDirectMessage: params?.isDirectMessage, getMemberDisplayName: async () => params?.senderName ?? "sender", @@ -626,6 +628,53 @@ describe("matrix monitor handler pairing account scope", () => { ThreadStarterBody: "Matrix thread root $root from Alice:\nRoot topic", }), ); + expect(recordInboundSession).toHaveBeenCalledWith( + expect.objectContaining({ + sessionKey: "agent:ops:main:thread:$root", + }), + ); + }); + + it("keeps threaded DMs flat when dm threadReplies is off", async () => { + const { handler, finalizeInboundContext, recordInboundSession } = + createMatrixHandlerTestHarness({ + threadReplies: "always", + dmThreadReplies: "off", + isDirectMessage: true, + client: { + getEvent: async (_roomId, eventId) => + eventId === "$root" + ? createMatrixTextMessageEvent({ + eventId: "$root", + sender: "@alice:example.org", + body: "Root topic", + }) + : ({ sender: "@bot:example.org" } as never), + }, + getMemberDisplayName: async (_roomId, userId) => + userId === "@alice:example.org" ? "Alice" : "sender", + }); + + await handler( + "!dm:example.org", + createMatrixTextMessageEvent({ + eventId: "$reply1", + body: "follow up", + relatesTo: { + rel_type: "m.thread", + event_id: "$root", + "m.in_reply_to": { event_id: "$root" }, + }, + }), + ); + + expect(finalizeInboundContext).toHaveBeenCalledWith( + expect.objectContaining({ + MessageThreadId: undefined, + ReplyToId: "$root", + ThreadStarterBody: "Matrix thread root $root from Alice:\nRoot topic", + }), + ); expect(recordInboundSession).toHaveBeenCalledWith( expect.objectContaining({ sessionKey: "agent:ops:main", @@ -1006,6 +1055,88 @@ describe("matrix monitor handler pairing account scope", () => { ); }); + it("keeps threaded DM reaction notifications on the flat session when dm threadReplies is off", async () => { + const { handler, enqueueSystemEvent } = createReactionHarness({ + cfg: { + channels: { + matrix: { + threadReplies: "always", + dm: { threadReplies: "off" }, + }, + }, + }, + isDirectMessage: true, + client: { + getEvent: async () => + createMatrixTextMessageEvent({ + eventId: "$reply1", + sender: "@bot:example.org", + body: "follow up", + relatesTo: { + rel_type: "m.thread", + event_id: "$root", + "m.in_reply_to": { event_id: "$root" }, + }, + }), + }, + }); + + await handler( + "!dm:example.org", + createMatrixReactionEvent({ + eventId: "$reaction-thread", + targetEventId: "$reply1", + key: "🎯", + }), + ); + + expect(enqueueSystemEvent).toHaveBeenCalledWith( + "Matrix reaction added: 🎯 by sender on msg $reply1", + { + sessionKey: "agent:ops:main", + contextKey: "matrix:reaction:add:!dm:example.org:$reply1:@user:example.org:🎯", + }, + ); + }); + + it("routes thread-root reaction notifications to the thread session when threadReplies is always", async () => { + const { handler, enqueueSystemEvent } = createReactionHarness({ + cfg: { + channels: { + matrix: { + threadReplies: "always", + }, + }, + }, + isDirectMessage: false, + client: { + getEvent: async () => + createMatrixTextMessageEvent({ + eventId: "$root", + sender: "@bot:example.org", + body: "start thread", + }), + }, + }); + + await handler( + "!room:example.org", + createMatrixReactionEvent({ + eventId: "$reaction-root", + targetEventId: "$root", + key: "🧵", + }), + ); + + expect(enqueueSystemEvent).toHaveBeenCalledWith( + "Matrix reaction added: 🧵 by sender on msg $root", + { + sessionKey: "agent:ops:main:thread:$root", + contextKey: "matrix:reaction:add:!room:example.org:$root:@user:example.org:🧵", + }, + ); + }); + it("ignores reactions that do not target bot-authored messages", async () => { const { handler, enqueueSystemEvent, resolveAgentRoute } = createReactionHarness({ targetSender: "@other:example.org", diff --git a/extensions/matrix/src/matrix/monitor/handler.ts b/extensions/matrix/src/matrix/monitor/handler.ts index 5fba58af5c9..31586fb8e62 100644 --- a/extensions/matrix/src/matrix/monitor/handler.ts +++ b/extensions/matrix/src/matrix/monitor/handler.ts @@ -53,7 +53,7 @@ import { createMatrixThreadContextResolver } from "./thread-context.js"; import { resolveMatrixReplyToEventId, resolveMatrixThreadRootId, - resolveMatrixThreadTarget, + resolveMatrixThreadRouting, } from "./threads.js"; import type { MatrixRawEvent, RoomMessageEventContent } from "./types.js"; import { EventType, RelationType } from "./types.js"; @@ -80,6 +80,8 @@ export type MatrixMonitorHandlerParams = { groupPolicy: "open" | "allowlist" | "disabled"; replyToMode: ReplyToMode; threadReplies: "off" | "inbound" | "always"; + /** DM-specific threadReplies override. Falls back to threadReplies when absent. */ + dmThreadReplies?: "off" | "inbound" | "always"; streaming: "partial" | "off"; dmEnabled: boolean; dmPolicy: "open" | "pairing" | "allowlist" | "disabled"; @@ -196,6 +198,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam groupPolicy, replyToMode, threadReplies, + dmThreadReplies, streaming, dmEnabled, dmPolicy, @@ -630,6 +633,13 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam const _messageId = event.event_id ?? ""; const _threadRootId = resolveMatrixThreadRootId({ event, content }); + const thread = resolveMatrixThreadRouting({ + isDirectMessage, + threadReplies, + dmThreadReplies, + messageId: _messageId, + threadRootId: _threadRootId, + }); const { route: _route, configuredBinding: _configuredBinding, @@ -640,8 +650,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam roomId, senderId, isDirectMessage, - messageId: _messageId, - threadRootId: _threadRootId, + threadId: thread.threadId, eventTs: eventTs ?? undefined, resolveAgentRoute: core.channel.routing.resolveAgentRoute, }); @@ -850,6 +859,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam messageId: _messageId, triggerSnapshot, threadRootId: _threadRootId, + thread, }; }; const ingressResult = @@ -899,17 +909,13 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam messageId: _messageId, triggerSnapshot, threadRootId: _threadRootId, + thread, } = resolvedIngressResult; // Keep the per-room ingress gate focused on ordering-sensitive state updates. // Prompt/session enrichment below can run concurrently after the history snapshot is fixed. const replyToEventId = resolveMatrixReplyToEventId(event.content as RoomMessageEventContent); - const threadTarget = resolveMatrixThreadTarget({ - threadReplies, - messageId: _messageId, - threadRootId: _threadRootId, - isThreadRoot: false, - }); + const threadTarget = thread.threadId; const threadContext = _threadRootId ? await resolveThreadContext({ roomId, threadRootId: _threadRootId }) : undefined; diff --git a/extensions/matrix/src/matrix/monitor/index.ts b/extensions/matrix/src/matrix/monitor/index.ts index d695ae30013..01b6a675f0d 100644 --- a/extensions/matrix/src/matrix/monitor/index.ts +++ b/extensions/matrix/src/matrix/monitor/index.ts @@ -186,6 +186,7 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi const groupPolicy = allowlistOnly && groupPolicyRaw === "open" ? "allowlist" : groupPolicyRaw; const replyToMode = opts.replyToMode ?? accountConfig.replyToMode ?? "off"; const threadReplies = accountConfig.threadReplies ?? "inbound"; + const dmThreadReplies = accountConfig.dm?.threadReplies; const threadBindingIdleTimeoutMs = resolveThreadBindingIdleTimeoutMsForChannel({ cfg, channel: "matrix", @@ -244,6 +245,7 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi groupPolicy, replyToMode, threadReplies, + dmThreadReplies, streaming, dmEnabled, dmPolicy, diff --git a/extensions/matrix/src/matrix/monitor/reaction-events.ts b/extensions/matrix/src/matrix/monitor/reaction-events.ts index 08af65857ad..06ffce55183 100644 --- a/extensions/matrix/src/matrix/monitor/reaction-events.ts +++ b/extensions/matrix/src/matrix/monitor/reaction-events.ts @@ -5,7 +5,7 @@ import { resolveMatrixAccountConfig } from "../accounts.js"; import { extractMatrixReactionAnnotation } from "../reaction-common.js"; import type { MatrixClient } from "../sdk.js"; import { resolveMatrixInboundRoute } from "./route.js"; -import { resolveMatrixThreadRootId } from "./threads.js"; +import { resolveMatrixThreadRootId, resolveMatrixThreadRouting } from "./threads.js"; import type { MatrixRawEvent, RoomMessageEventContent } from "./types.js"; export type MatrixReactionNotificationMode = "off" | "own"; @@ -73,14 +73,24 @@ export async function handleInboundMatrixReaction(params: { content: targetContent, }) : undefined; + const accountConfig = resolveMatrixAccountConfig({ + cfg: params.cfg, + accountId: params.accountId, + }); + const thread = resolveMatrixThreadRouting({ + isDirectMessage: params.isDirectMessage, + threadReplies: accountConfig.threadReplies ?? "inbound", + dmThreadReplies: accountConfig.dm?.threadReplies, + messageId: reaction.eventId, + threadRootId, + }); const { route, runtimeBindingId } = resolveMatrixInboundRoute({ cfg: params.cfg, accountId: params.accountId, roomId: params.roomId, senderId: params.senderId, isDirectMessage: params.isDirectMessage, - messageId: reaction.eventId, - threadRootId, + threadId: thread.threadId, eventTs: params.event.origin_server_ts, resolveAgentRoute: params.core.channel.routing.resolveAgentRoute, }); diff --git a/extensions/matrix/src/matrix/monitor/route.test.ts b/extensions/matrix/src/matrix/monitor/route.test.ts index c6f503ba11d..ad8a69dc39e 100644 --- a/extensions/matrix/src/matrix/monitor/route.test.ts +++ b/extensions/matrix/src/matrix/monitor/route.test.ts @@ -24,7 +24,6 @@ function resolveDmRoute(cfg: OpenClawConfig) { roomId: "!dm:example.org", senderId: "@alice:example.org", isDirectMessage: true, - messageId: "$msg1", resolveAgentRoute, }); } @@ -187,3 +186,54 @@ describe("resolveMatrixInboundRoute", () => { expect(touch).not.toHaveBeenCalled(); }); }); + +describe("resolveMatrixInboundRoute thread-isolated sessions", () => { + beforeEach(() => { + sessionBindingTesting.resetSessionBindingAdaptersForTests(); + setActivePluginRegistry( + createTestRegistry([{ pluginId: "matrix", source: "test", plugin: matrixPlugin }]), + ); + }); + + it("scopes session key to thread when a thread id is provided", () => { + const { route } = resolveMatrixInboundRoute({ + cfg: baseCfg as never, + accountId: "ops", + roomId: "!room:example.org", + senderId: "@alice:example.org", + isDirectMessage: false, + threadId: "$thread-root", + resolveAgentRoute, + }); + + expect(route.sessionKey).toContain(":thread:$thread-root"); + expect(route.mainSessionKey).not.toContain(":thread:"); + }); + + it("preserves mixed-case matrix thread ids in session keys", () => { + const { route } = resolveMatrixInboundRoute({ + cfg: baseCfg as never, + accountId: "ops", + roomId: "!room:example.org", + senderId: "@alice:example.org", + isDirectMessage: false, + threadId: "$AbC123:example.org", + resolveAgentRoute, + }); + + expect(route.sessionKey).toContain(":thread:$AbC123:example.org"); + }); + + it("does not scope session key when thread id is absent", () => { + const { route } = resolveMatrixInboundRoute({ + cfg: baseCfg as never, + accountId: "ops", + roomId: "!room:example.org", + senderId: "@alice:example.org", + isDirectMessage: false, + resolveAgentRoute, + }); + + expect(route.sessionKey).not.toContain(":thread:"); + }); +}); diff --git a/extensions/matrix/src/matrix/monitor/route.ts b/extensions/matrix/src/matrix/monitor/route.ts index 0cd6a0a8acf..203dc486ae0 100644 --- a/extensions/matrix/src/matrix/monitor/route.ts +++ b/extensions/matrix/src/matrix/monitor/route.ts @@ -5,6 +5,7 @@ import { type PluginRuntime, } from "../../runtime-api.js"; import type { CoreConfig } from "../../types.js"; +import { resolveMatrixThreadSessionKeys } from "./threads.js"; type MatrixResolvedRoute = ReturnType; @@ -14,8 +15,7 @@ export function resolveMatrixInboundRoute(params: { roomId: string; senderId: string; isDirectMessage: boolean; - messageId: string; - threadRootId?: string; + threadId?: string; eventTs?: number; resolveAgentRoute: PluginRuntime["channel"]["routing"]["resolveAgentRoute"]; }): { @@ -40,12 +40,8 @@ export function resolveMatrixInboundRoute(params: { } : undefined, }); - const bindingConversationId = - params.threadRootId && params.threadRootId !== params.messageId - ? params.threadRootId - : params.roomId; - const bindingParentConversationId = - bindingConversationId === params.roomId ? undefined : params.roomId; + const bindingConversationId = params.threadId ?? params.roomId; + const bindingParentConversationId = params.threadId ? params.roomId : undefined; const sessionBindingService = getSessionBindingService(); const runtimeBinding = sessionBindingService.resolveByConversation({ channel: "matrix", @@ -80,19 +76,39 @@ export function resolveMatrixInboundRoute(params: { : null; const configuredSessionKey = configuredBinding?.record.targetSessionKey?.trim(); + const effectiveRoute = + configuredBinding && configuredSessionKey + ? { + ...baseRoute, + sessionKey: configuredSessionKey, + agentId: + resolveAgentIdFromSessionKey(configuredSessionKey) || + configuredBinding.spec.agentId || + baseRoute.agentId, + matchedBy: "binding.channel" as const, + } + : baseRoute; + + // When no binding overrides the session key, isolate threads into their own sessions. + if (!configuredBinding && !configuredSessionKey && params.threadId) { + const threadKeys = resolveMatrixThreadSessionKeys({ + baseSessionKey: effectiveRoute.sessionKey, + threadId: params.threadId, + parentSessionKey: effectiveRoute.sessionKey, + }); + return { + route: { + ...effectiveRoute, + sessionKey: threadKeys.sessionKey, + mainSessionKey: threadKeys.parentSessionKey ?? effectiveRoute.sessionKey, + }, + configuredBinding, + runtimeBindingId: null, + }; + } + return { - route: - configuredBinding && configuredSessionKey - ? { - ...baseRoute, - sessionKey: configuredSessionKey, - agentId: - resolveAgentIdFromSessionKey(configuredSessionKey) || - configuredBinding.spec.agentId || - baseRoute.agentId, - matchedBy: "binding.channel", - } - : baseRoute, + route: effectiveRoute, configuredBinding, runtimeBindingId: null, }; diff --git a/extensions/matrix/src/matrix/monitor/threads.test.ts b/extensions/matrix/src/matrix/monitor/threads.test.ts new file mode 100644 index 00000000000..cfeb07ee20e --- /dev/null +++ b/extensions/matrix/src/matrix/monitor/threads.test.ts @@ -0,0 +1,68 @@ +import { describe, expect, it } from "vitest"; +import { resolveMatrixThreadRouting } from "./threads.js"; + +describe("resolveMatrixThreadRouting", () => { + it("keeps sessions flat when threadReplies is off", () => { + expect( + resolveMatrixThreadRouting({ + isDirectMessage: false, + threadReplies: "off", + messageId: "$reply1", + threadRootId: "$root", + }), + ).toEqual({ + threadId: undefined, + }); + }); + + it("uses the inbound thread root when replies arrive inside an existing thread", () => { + expect( + resolveMatrixThreadRouting({ + isDirectMessage: false, + threadReplies: "inbound", + messageId: "$reply1", + threadRootId: "$root", + }), + ).toEqual({ + threadId: "$root", + }); + }); + + it("keeps top-level inbound messages flat when threadReplies is inbound", () => { + expect( + resolveMatrixThreadRouting({ + isDirectMessage: false, + threadReplies: "inbound", + messageId: "$root", + }), + ).toEqual({ + threadId: undefined, + }); + }); + + it("uses the triggering message as the thread id when threadReplies is always", () => { + expect( + resolveMatrixThreadRouting({ + isDirectMessage: false, + threadReplies: "always", + messageId: "$root", + }), + ).toEqual({ + threadId: "$root", + }); + }); + + it("lets dm.threadReplies override room threading behavior", () => { + expect( + resolveMatrixThreadRouting({ + isDirectMessage: true, + threadReplies: "always", + dmThreadReplies: "off", + messageId: "$reply1", + threadRootId: "$root", + }), + ).toEqual({ + threadId: undefined, + }); + }); +}); diff --git a/extensions/matrix/src/matrix/monitor/threads.ts b/extensions/matrix/src/matrix/monitor/threads.ts index 592b2d53aaa..01e91b56680 100644 --- a/extensions/matrix/src/matrix/monitor/threads.ts +++ b/extensions/matrix/src/matrix/monitor/threads.ts @@ -1,6 +1,26 @@ +import { resolveThreadSessionKeys } from "openclaw/plugin-sdk/routing"; import type { MatrixRawEvent, RoomMessageEventContent } from "./types.js"; import { RelationType } from "./types.js"; +export type MatrixThreadReplies = "off" | "inbound" | "always"; + +export type MatrixThreadRouting = { + threadId?: string; +}; + +export function resolveMatrixThreadSessionKeys(params: { + baseSessionKey: string; + threadId?: string | null; + parentSessionKey?: string; + useSuffix?: boolean; +}): { sessionKey: string; parentSessionKey?: string } { + return resolveThreadSessionKeys({ + ...params, + // Matrix event IDs are opaque and case-sensitive; keep the exact thread root. + normalizeThreadId: (threadId) => threadId, + }); +} + function resolveMatrixRelatedReplyToEventId(relates: unknown): string | undefined { if (!relates || typeof relates !== "object") { return undefined; @@ -17,25 +37,33 @@ function resolveMatrixRelatedReplyToEventId(relates: unknown): string | undefine return undefined; } -export function resolveMatrixThreadTarget(params: { - threadReplies: "off" | "inbound" | "always"; +export function resolveMatrixThreadRouting(params: { + isDirectMessage: boolean; + threadReplies: MatrixThreadReplies; + dmThreadReplies?: MatrixThreadReplies; messageId: string; threadRootId?: string; isThreadRoot?: boolean; -}): string | undefined { - const { threadReplies, messageId, threadRootId } = params; - if (threadReplies === "off") { - return undefined; - } +}): MatrixThreadRouting { + const effectiveThreadReplies = + params.isDirectMessage && params.dmThreadReplies !== undefined + ? params.dmThreadReplies + : params.threadReplies; + const messageId = params.messageId.trim(); + const threadRootId = params.threadRootId?.trim(); const isThreadRoot = params.isThreadRoot === true; - const hasInboundThread = Boolean(threadRootId && threadRootId !== messageId && !isThreadRoot); - if (threadReplies === "inbound") { - return hasInboundThread ? threadRootId : undefined; - } - if (threadReplies === "always") { - return threadRootId ?? messageId; - } - return undefined; + const inboundThreadId = + threadRootId && threadRootId !== messageId && !isThreadRoot ? threadRootId : undefined; + const threadId = + effectiveThreadReplies === "off" + ? undefined + : effectiveThreadReplies === "inbound" + ? inboundThreadId + : (inboundThreadId ?? (messageId || undefined)); + + return { + threadId, + }; } export function resolveMatrixThreadRootId(params: { diff --git a/extensions/matrix/src/types.ts b/extensions/matrix/src/types.ts index d161f5b23ca..066aee1c7f2 100644 --- a/extensions/matrix/src/types.ts +++ b/extensions/matrix/src/types.ts @@ -10,6 +10,8 @@ export type MatrixDmConfig = { policy?: DmPolicy; /** Allowlist for DM senders (matrix user IDs or "*"). */ allowFrom?: Array; + /** Per-DM thread reply behavior override (off|inbound|always). Overrides top-level threadReplies for direct messages. */ + threadReplies?: "off" | "inbound" | "always"; }; export type MatrixRoomConfig = { diff --git a/src/agents/tools/sessions-send-helpers.test.ts b/src/agents/tools/sessions-send-helpers.test.ts index b08c6d89e71..b66ce99b42d 100644 --- a/src/agents/tools/sessions-send-helpers.test.ts +++ b/src/agents/tools/sessions-send-helpers.test.ts @@ -51,6 +51,28 @@ describe("resolveAnnounceTargetFromKey", () => { }, }, }, + { + pluginId: "matrix", + source: "test", + plugin: { + id: "matrix", + meta: { + id: "matrix", + label: "Matrix", + selectionLabel: "Matrix", + docsPath: "/channels/matrix", + blurb: "Matrix test stub.", + }, + capabilities: { chatTypes: ["direct", "channel", "thread"] }, + messaging: { + resolveSessionTarget: ({ id }: { id: string }) => `channel:${id}`, + }, + config: { + listAccountIds: () => ["default"], + resolveAccount: () => ({}), + }, + }, + }, { pluginId: "telegram", source: "test", @@ -107,4 +129,16 @@ describe("resolveAnnounceTargetFromKey", () => { threadId: "1699999999.0001", }); }); + + it("preserves colon-delimited matrix ids for channel and thread targets", () => { + expect( + resolveAnnounceTargetFromKey( + "agent:main:matrix:channel:!room:example.org:thread:$AbC123:example.org", + ), + ).toEqual({ + channel: "matrix", + to: "channel:!room:example.org", + threadId: "$AbC123:example.org", + }); + }); }); diff --git a/src/agents/tools/sessions-send-helpers.ts b/src/agents/tools/sessions-send-helpers.ts index cb104204e3c..db2d24293aa 100644 --- a/src/agents/tools/sessions-send-helpers.ts +++ b/src/agents/tools/sessions-send-helpers.ts @@ -4,6 +4,7 @@ import { } from "../../channels/plugins/index.js"; import { normalizeChannelId as normalizeChatChannelId } from "../../channels/registry.js"; import type { OpenClawConfig } from "../../config/config.js"; +import { parseSessionThreadInfo } from "../../config/sessions/delivery-info.js"; const ANNOUNCE_SKIP_TOKEN = "ANNOUNCE_SKIP"; const REPLY_SKIP_TOKEN = "REPLY_SKIP"; @@ -28,20 +29,9 @@ export function resolveAnnounceTargetFromKey(sessionKey: string): AnnounceTarget return null; } - // Extract topic/thread ID from rest (supports both :topic: and :thread:) - // Telegram uses :topic:, other platforms use :thread: - let threadId: string | undefined; const restJoined = rest.join(":"); - const topicMatch = restJoined.match(/:topic:([^:]+)$/); - const threadMatch = restJoined.match(/:thread:([^:]+)$/); - const match = topicMatch || threadMatch; - - if (match) { - threadId = match[1]; // Keep as string to match AgentCommandOpts.threadId - } - - // Remove :topic:N or :thread:N suffix from ID for target - const id = match ? restJoined.replace(/:(topic|thread):[^:]+$/, "") : restJoined.trim(); + const { baseSessionKey, threadId } = parseSessionThreadInfo(restJoined); + const id = (baseSessionKey ?? restJoined).trim(); if (!id) { return null; diff --git a/src/auto-reply/reply/commands-session-lifecycle.test.ts b/src/auto-reply/reply/commands-session-lifecycle.test.ts index 2b2b997a6a6..4952334391e 100644 --- a/src/auto-reply/reply/commands-session-lifecycle.test.ts +++ b/src/auto-reply/reply/commands-session-lifecycle.test.ts @@ -176,6 +176,21 @@ function createMatrixThreadCommandParams(commandBody: string, overrides?: Record }); } +function createMatrixTriggerThreadCommandParams( + commandBody: string, + overrides?: Record, +) { + return buildCommandTestParams(commandBody, baseCfg, { + Provider: "matrix", + Surface: "matrix", + OriginatingChannel: "matrix", + OriginatingTo: "room:!room:example.org", + AccountId: "default", + MessageThreadId: "$root", + ...overrides, + }); +} + function createMatrixRoomCommandParams(commandBody: string, overrides?: Record) { return buildCommandTestParams(commandBody, baseCfg, { Provider: "matrix", @@ -248,6 +263,21 @@ function createMatrixBinding(overrides?: Partial): Session }; } +function createMatrixTriggerBinding( + overrides?: Partial, +): SessionBindingRecord { + return createMatrixBinding({ + bindingId: "default:$root", + conversation: { + channel: "matrix", + accountId: "default", + conversationId: "$root", + parentConversationId: "!room:example.org", + }, + ...overrides, + }); +} + function expectIdleTimeoutSetReply( mock: ReturnType, text: string, @@ -409,6 +439,40 @@ describe("/session idle and /session max-age", () => { ); }); + it("sets idle timeout for the triggering Matrix always-thread turn", async () => { + vi.useFakeTimers(); + vi.setSystemTime(new Date("2026-02-20T00:00:00.000Z")); + + hoisted.sessionBindingResolveByConversationMock.mockReturnValue(createMatrixTriggerBinding()); + hoisted.setMatrixThreadBindingIdleTimeoutBySessionKeyMock.mockReturnValue([ + { + targetSessionKey: "agent:main:subagent:child", + boundAt: Date.now(), + lastActivityAt: Date.now(), + idleTimeoutMs: 2 * 60 * 60 * 1000, + }, + ]); + + const result = await handleSessionCommand( + createMatrixTriggerThreadCommandParams("/session idle 2h"), + true, + ); + const text = result?.reply?.text ?? ""; + + expect(hoisted.sessionBindingResolveByConversationMock).toHaveBeenCalledWith({ + channel: "matrix", + accountId: "default", + conversationId: "$root", + parentConversationId: "!room:example.org", + }); + expectIdleTimeoutSetReply( + hoisted.setMatrixThreadBindingIdleTimeoutBySessionKeyMock, + text, + 2 * 60 * 60 * 1000, + "2h", + ); + }); + it("sets max age for focused Matrix threads", async () => { vi.useFakeTimers(); vi.setSystemTime(new Date("2026-02-20T00:00:00.000Z")); diff --git a/src/auto-reply/reply/commands-subagents-focus.test.ts b/src/auto-reply/reply/commands-subagents-focus.test.ts index de799e5208b..b6c14f0bb2c 100644 --- a/src/auto-reply/reply/commands-subagents-focus.test.ts +++ b/src/auto-reply/reply/commands-subagents-focus.test.ts @@ -116,6 +116,22 @@ function createMatrixThreadCommandParams(commandBody: string, cfg: OpenClawConfi return params; } +function createMatrixTriggerThreadCommandParams( + commandBody: string, + cfg: OpenClawConfig = baseCfg, +) { + const params = buildCommandTestParams(commandBody, cfg, { + Provider: "matrix", + Surface: "matrix", + OriginatingChannel: "matrix", + OriginatingTo: "room:!room:example.org", + AccountId: "default", + MessageThreadId: "$root", + }); + params.command.senderId = "user-1"; + return params; +} + function createMatrixRoomCommandParams(commandBody: string, cfg: OpenClawConfig = baseCfg) { const params = buildCommandTestParams(commandBody, cfg, { Provider: "matrix", @@ -282,6 +298,22 @@ describe("/focus, /unfocus, /agents", () => { ); }); + it("/focus treats the triggering Matrix always-thread turn as the current thread", async () => { + const result = await focusCodexAcp(createMatrixTriggerThreadCommandParams("/focus codex-acp")); + + expect(result?.reply?.text).toContain("bound this thread"); + expect(hoisted.sessionBindingBindMock).toHaveBeenCalledWith( + expect.objectContaining({ + placement: "current", + conversation: expect.objectContaining({ + channel: "matrix", + conversationId: "$root", + parentConversationId: "!room:example.org", + }), + }), + ); + }); + it("/focus rejects Matrix top-level thread creation when spawnSubagentSessions is disabled", async () => { const cfg = { ...baseCfg, diff --git a/src/config/sessions/delivery-info.test.ts b/src/config/sessions/delivery-info.test.ts index 9a435c38a7a..88c01c5a0d1 100644 --- a/src/config/sessions/delivery-info.test.ts +++ b/src/config/sessions/delivery-info.test.ts @@ -44,6 +44,14 @@ describe("extractDeliveryInfo", () => { baseSessionKey: "agent:main:slack:channel:C1", threadId: "123.456", }); + expect( + parseSessionThreadInfo( + "agent:main:matrix:channel:!room:example.org:thread:$AbC123:example.org", + ), + ).toEqual({ + baseSessionKey: "agent:main:matrix:channel:!room:example.org", + threadId: "$AbC123:example.org", + }); expect(parseSessionThreadInfo("agent:main:telegram:dm:user-1")).toEqual({ baseSessionKey: "agent:main:telegram:dm:user-1", threadId: undefined,