diff --git a/CHANGELOG.md b/CHANGELOG.md index 8d1c4c0ccc3..5f6b29b84c0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -83,6 +83,7 @@ Docs: https://docs.openclaw.ai - Heartbeat/runner: guarantee the interval timer is re-armed after heartbeat runs and unexpected runner errors so scheduled heartbeats do not silently stop after an interrupted cycle. (#52270) Thanks @MiloStack. - Config/Doctor: rewrite stale bundled plugin load paths from legacy `extensions/*` locations to the packaged bundled path, including directory-name mismatches and slash-suffixed config entries. (#55054) Thanks @SnowSky1. - WhatsApp/mentions: stop treating mentions embedded in quoted messages as direct mentions so replying to a message that @mentioned the bot no longer falsely triggers mention gating. (#52711) Thanks @lurebat. +- Matrix: keep separate 2-person rooms out of DM routing after `m.direct` seeds successfully, while still honoring explicit `is_direct` state and startup fallback recovery. (#54890) thanks @private-peter - Agents/ollama fallback: surface non-2xx Ollama HTTP errors with a leading status code so HTTP 503 responses trigger model fallback again. (#55214) Thanks @bugkill3r. - Feishu/tools: stop synthetic agent ids like `agent-spawner` from being treated as Feishu account ids during tool execution, so tools fall back to the configured/default Feishu account unless the contextual id is a real enabled Feishu account. (#55627) Thanks @MonkeyLeeT. - Google/tools: strip empty `required: []` arrays from Gemini tool schemas so optional-only tool parameters no longer trigger Google validator 400s. (#52106) Thanks @oliviareid-svg. diff --git a/extensions/matrix/src/matrix/direct-room.ts b/extensions/matrix/src/matrix/direct-room.ts index a25004dbeb1..8060e6fa994 100644 --- a/extensions/matrix/src/matrix/direct-room.ts +++ b/extensions/matrix/src/matrix/direct-room.ts @@ -45,6 +45,23 @@ export async function readJoinedMatrixMembers( } } +export async function hasDirectMatrixMemberFlag( + client: MatrixClient, + roomId: string, + userId?: string | null, +): Promise { + const normalizedUserId = trimMaybeString(userId); + if (!normalizedUserId) { + return false; + } + try { + const state = await client.getRoomStateEvent(roomId, "m.room.member", normalizedUserId); + return state?.is_direct === true; + } catch { + return false; + } +} + export async function isStrictDirectRoom(params: { client: MatrixClient; roomId: string; diff --git a/extensions/matrix/src/matrix/monitor/direct.test.ts b/extensions/matrix/src/matrix/monitor/direct.test.ts index 95528da110d..36d53322c3e 100644 --- a/extensions/matrix/src/matrix/monitor/direct.test.ts +++ b/extensions/matrix/src/matrix/monitor/direct.test.ts @@ -2,15 +2,33 @@ import { afterEach, describe, expect, it, vi } from "vitest"; import type { MatrixClient } from "../sdk.js"; import { createDirectRoomTracker } from "./direct.js"; -function createMockClient(params: { isDm?: boolean; members?: string[] }) { +type MockStateEvents = Record>; + +function createMockClient(params: { + isDm?: boolean; + members?: string[]; + stateEvents?: MockStateEvents; + dmCacheAvailable?: boolean; +}) { let members = params.members ?? ["@alice:example.org", "@bot:example.org"]; + const stateEvents = params.stateEvents ?? {}; return { dms: { - update: vi.fn().mockResolvedValue(undefined), + update: vi.fn().mockResolvedValue(params.dmCacheAvailable !== false), isDm: vi.fn().mockReturnValue(params.isDm === true), }, getUserId: vi.fn().mockResolvedValue("@bot:example.org"), getJoinedRoomMembers: vi.fn().mockImplementation(async () => members), + getRoomStateEvent: vi + .fn() + .mockImplementation(async (roomId: string, eventType: string, stateKey = "") => { + const key = `${roomId}|${eventType}|${stateKey}`; + const state = stateEvents[key]; + if (state === undefined) { + throw new Error(`State event not found: ${key}`); + } + return state; + }), __setMembers(next: string[]) { members = next; }, @@ -20,6 +38,7 @@ function createMockClient(params: { isDm?: boolean; members?: string[] }) { isDm: ReturnType; }; getJoinedRoomMembers: ReturnType; + getRoomStateEvent: ReturnType; __setMembers: (members: string[]) => void; }; } @@ -61,7 +80,7 @@ describe("createDirectRoomTracker", () => { }); it("does not classify 2-member rooms as DMs when the dm cache refresh succeeds", async () => { - const client = createMockClient({ isDm: false }); + const client = createMockClient({ isDm: false, dmCacheAvailable: true }); const tracker = createDirectRoomTracker(client); await expect( @@ -74,9 +93,8 @@ describe("createDirectRoomTracker", () => { expect(client.getJoinedRoomMembers).toHaveBeenCalledWith("!room:example.org"); }); - it("falls back to strict 2-member membership when dm cache refresh fails", async () => { - const client = createMockClient({ isDm: false }); - client.dms.update.mockRejectedValue(new Error("dm cache unavailable")); + it("falls back to strict 2-member membership before m.direct account data is available", async () => { + const client = createMockClient({ isDm: false, dmCacheAvailable: false }); const tracker = createDirectRoomTracker(client); await expect( @@ -89,12 +107,84 @@ describe("createDirectRoomTracker", () => { expect(client.getJoinedRoomMembers).toHaveBeenCalledWith("!room:example.org"); }); + it("keeps using the strict 2-member fallback until the dm cache seeds successfully", async () => { + const client = createMockClient({ isDm: false, dmCacheAvailable: false }); + const tracker = createDirectRoomTracker(client); + + await expect( + tracker.isDirectMessage({ + roomId: "!room:example.org", + senderId: "@alice:example.org", + }), + ).resolves.toBe(true); + await expect( + tracker.isDirectMessage({ + roomId: "!room:example.org", + senderId: "@alice:example.org", + }), + ).resolves.toBe(true); + + expect(client.dms.update).toHaveBeenCalledTimes(1); + }); + it("does not classify rooms with extra members as DMs when falling back", async () => { const client = createMockClient({ isDm: false, members: ["@alice:example.org", "@bot:example.org", "@observer:example.org"], + dmCacheAvailable: false, + }); + const tracker = createDirectRoomTracker(client); + + await expect( + tracker.isDirectMessage({ + roomId: "!room:example.org", + senderId: "@alice:example.org", + }), + ).resolves.toBe(false); + + expect(client.getRoomStateEvent).not.toHaveBeenCalled(); + }); + + it("treats sender is_direct member state as a DM signal", async () => { + const client = createMockClient({ + isDm: false, + stateEvents: { + "!room:example.org|m.room.member|@alice:example.org": { is_direct: true }, + }, + }); + const tracker = createDirectRoomTracker(client); + + await expect( + tracker.isDirectMessage({ + roomId: "!room:example.org", + senderId: "@alice:example.org", + }), + ).resolves.toBe(true); + }); + + it("treats self is_direct member state as a DM signal", async () => { + const client = createMockClient({ + isDm: false, + stateEvents: { + "!room:example.org|m.room.member|@bot:example.org": { is_direct: true }, + }, + }); + const tracker = createDirectRoomTracker(client); + + await expect( + tracker.isDirectMessage({ + roomId: "!room:example.org", + senderId: "@alice:example.org", + }), + ).resolves.toBe(true); + }); + + it("does not classify 2-member rooms whose sender is not a joined member when falling back", async () => { + const client = createMockClient({ + isDm: false, + members: ["@mallory:example.org", "@bot:example.org"], + dmCacheAvailable: false, }); - client.dms.update.mockRejectedValue(new Error("dm cache unavailable")); const tracker = createDirectRoomTracker(client); await expect( @@ -105,12 +195,8 @@ describe("createDirectRoomTracker", () => { ).resolves.toBe(false); }); - it("does not classify 2-member rooms whose sender is not a joined member when falling back", async () => { - const client = createMockClient({ - isDm: false, - members: ["@mallory:example.org", "@bot:example.org"], - }); - client.dms.update.mockRejectedValue(new Error("dm cache unavailable")); + it("does not re-enable the strict 2-member fallback after the dm cache has seeded", async () => { + const client = createMockClient({ isDm: false, dmCacheAvailable: true }); const tracker = createDirectRoomTracker(client); await expect( @@ -119,11 +205,20 @@ describe("createDirectRoomTracker", () => { senderId: "@alice:example.org", }), ).resolves.toBe(false); + + client.dms.update.mockResolvedValue(false); + tracker.invalidateRoom("!room:example.org"); + + await expect( + tracker.isDirectMessage({ + roomId: "!room:example.org", + senderId: "@alice:example.org", + }), + ).resolves.toBe(false); }); it("re-checks room membership after invalidation when fallback membership changes", async () => { - const client = createMockClient({ isDm: false }); - client.dms.update.mockRejectedValue(new Error("dm cache unavailable")); + const client = createMockClient({ isDm: false, dmCacheAvailable: false }); const tracker = createDirectRoomTracker(client); await expect( @@ -145,8 +240,7 @@ describe("createDirectRoomTracker", () => { }); it("bounds joined-room membership cache size", async () => { - const client = createMockClient({ isDm: false }); - client.dms.update.mockRejectedValue(new Error("dm cache unavailable")); + const client = createMockClient({ isDm: false, dmCacheAvailable: false }); const tracker = createDirectRoomTracker(client); for (let i = 0; i <= 1024; i += 1) { @@ -192,4 +286,37 @@ describe("createDirectRoomTracker", () => { expect(client.dms.update).toHaveBeenCalledTimes(2); expect(client.getJoinedRoomMembers).toHaveBeenCalledTimes(2); }); + + it("caches member-state direct flag lookups until the ttl expires", async () => { + vi.useFakeTimers(); + vi.setSystemTime(new Date("2026-03-12T10:00:00Z")); + const client = createMockClient({ + isDm: false, + dmCacheAvailable: true, + stateEvents: { + "!room:example.org|m.room.member|@alice:example.org": { is_direct: true }, + }, + }); + const tracker = createDirectRoomTracker(client); + + await tracker.isDirectMessage({ + roomId: "!room:example.org", + senderId: "@alice:example.org", + }); + await tracker.isDirectMessage({ + roomId: "!room:example.org", + senderId: "@alice:example.org", + }); + + expect(client.getRoomStateEvent).toHaveBeenCalledTimes(1); + + vi.setSystemTime(new Date("2026-03-12T10:00:31Z")); + + await tracker.isDirectMessage({ + roomId: "!room:example.org", + senderId: "@alice:example.org", + }); + + expect(client.getRoomStateEvent).toHaveBeenCalledTimes(2); + }); }); diff --git a/extensions/matrix/src/matrix/monitor/direct.ts b/extensions/matrix/src/matrix/monitor/direct.ts index a44d1c6c92c..d976a794c81 100644 --- a/extensions/matrix/src/matrix/monitor/direct.ts +++ b/extensions/matrix/src/matrix/monitor/direct.ts @@ -1,4 +1,8 @@ -import { isStrictDirectMembership, readJoinedMatrixMembers } from "../direct-room.js"; +import { + hasDirectMatrixMemberFlag, + isStrictDirectMembership, + readJoinedMatrixMembers, +} from "../direct-room.js"; import type { MatrixClient } from "../sdk.js"; type DirectMessageCheck = { @@ -13,6 +17,7 @@ type DirectRoomTrackerOptions = { const DM_CACHE_TTL_MS = 30_000; const MAX_TRACKED_DM_ROOMS = 1024; +const MAX_TRACKED_DM_MEMBER_FLAGS = 2048; function rememberBounded(map: Map, key: string, value: T): void { map.set(key, value); @@ -27,8 +32,12 @@ function rememberBounded(map: Map, key: string, value: T): void { export function createDirectRoomTracker(client: MatrixClient, opts: DirectRoomTrackerOptions = {}) { const log = opts.log ?? (() => {}); let lastDmUpdateMs = 0; + // Once m.direct has seeded successfully, prefer the explicit cache over + // re-enabling the broad 2-person fallback after a later transient failure. + let hasSeededDmCache = false; let cachedSelfUserId: string | null = null; const joinedMembersCache = new Map(); + const directMemberFlagCache = new Map(); const ensureSelfUserId = async (): Promise => { if (cachedSelfUserId) { @@ -48,7 +57,7 @@ export function createDirectRoomTracker(client: MatrixClient, opts: DirectRoomTr return; } lastDmUpdateMs = now; - await client.dms.update(); + hasSeededDmCache = (await client.dms.update()) || hasSeededDmCache; }; const resolveJoinedMembers = async (roomId: string): Promise => { @@ -70,9 +79,33 @@ export function createDirectRoomTracker(client: MatrixClient, opts: DirectRoomTr } }; + const resolveDirectMemberFlag = async ( + roomId: string, + userId?: string | null, + ): Promise => { + const normalizedUserId = userId?.trim(); + if (!normalizedUserId) { + return false; + } + const cacheKey = `${roomId}\n${normalizedUserId}`; + const cached = directMemberFlagCache.get(cacheKey); + const now = Date.now(); + if (cached && now - cached.ts < DM_CACHE_TTL_MS) { + return cached.isDirect; + } + const isDirect = await hasDirectMatrixMemberFlag(client, roomId, normalizedUserId); + rememberBounded(directMemberFlagCache, cacheKey, { isDirect, ts: now }); + return isDirect; + }; + return { invalidateRoom: (roomId: string): void => { joinedMembersCache.delete(roomId); + for (const key of directMemberFlagCache.keys()) { + if (key.startsWith(`${roomId}\n`)) { + directMemberFlagCache.delete(key); + } + } lastDmUpdateMs = 0; log(`matrix: invalidated dm cache room=${roomId}`); }, @@ -85,27 +118,36 @@ export function createDirectRoomTracker(client: MatrixClient, opts: DirectRoomTr remoteUserId: senderId, joinedMembers, }); - let refreshFailed = false; try { await refreshDmCache(); } catch (err) { log(`matrix: dm cache refresh failed (${String(err)})`); - refreshFailed = true; } - if (refreshFailed) { - if (strictDirectMembership) { - log(`matrix: dm detected via exact 2-member room room=${roomId}`); - return true; - } - } else if (client.dms.isDm(roomId)) { + if (client.dms.isDm(roomId)) { if (strictDirectMembership) { log(`matrix: dm detected via m.direct room=${roomId}`); return true; } log(`matrix: ignoring stale m.direct classification room=${roomId}`); - return false; + } + + if (strictDirectMembership) { + const directViaState = + (await resolveDirectMemberFlag(roomId, senderId)) || + (await resolveDirectMemberFlag(roomId, selfUserId)); + if (directViaState) { + log(`matrix: dm detected via member state room=${roomId}`); + return true; + } + + if (!hasSeededDmCache) { + log( + `matrix: dm detected via exact 2-member fallback before dm cache seed room=${roomId}`, + ); + return true; + } } log( diff --git a/extensions/matrix/src/matrix/sdk.ts b/extensions/matrix/src/matrix/sdk.ts index d8321272b96..2d92cc77c27 100644 --- a/extensions/matrix/src/matrix/sdk.ts +++ b/extensions/matrix/src/matrix/sdk.ts @@ -195,8 +195,8 @@ export class MatrixClient { private stopPersistPromise: Promise | null = null; readonly dms = { - update: async (): Promise => { - await this.refreshDmCache(); + update: async (): Promise => { + return await this.refreshDmCache(); }, isDm: (roomId: string): boolean => this.dmRoomIds.has(roomId), }; @@ -1510,11 +1510,11 @@ export class MatrixClient { } } - private async refreshDmCache(): Promise { + private async refreshDmCache(): Promise { const direct = await this.getAccountData("m.direct"); this.dmRoomIds.clear(); if (!direct || typeof direct !== "object") { - return; + return false; } for (const value of Object.values(direct)) { if (!Array.isArray(value)) { @@ -1526,5 +1526,6 @@ export class MatrixClient { } } } + return true; } }