Matrix: restore explicit DM detection

This commit is contained in:
Gustavo Madeira Santana 2026-03-27 18:53:46 -04:00
parent eaebed0e38
commit e32d220ef0
No known key found for this signature in database
5 changed files with 220 additions and 32 deletions

View File

@ -83,6 +83,7 @@ Docs: https://docs.openclaw.ai
- Heartbeat/runner: guarantee the interval timer is re-armed after heartbeat runs and unexpected runner errors so scheduled heartbeats do not silently stop after an interrupted cycle. (#52270) Thanks @MiloStack.
- Config/Doctor: rewrite stale bundled plugin load paths from legacy `extensions/*` locations to the packaged bundled path, including directory-name mismatches and slash-suffixed config entries. (#55054) Thanks @SnowSky1.
- WhatsApp/mentions: stop treating mentions embedded in quoted messages as direct mentions so replying to a message that @mentioned the bot no longer falsely triggers mention gating. (#52711) Thanks @lurebat.
- Matrix: keep separate 2-person rooms out of DM routing after `m.direct` seeds successfully, while still honoring explicit `is_direct` state and startup fallback recovery. (#54890) thanks @private-peter
- Agents/ollama fallback: surface non-2xx Ollama HTTP errors with a leading status code so HTTP 503 responses trigger model fallback again. (#55214) Thanks @bugkill3r.
- Feishu/tools: stop synthetic agent ids like `agent-spawner` from being treated as Feishu account ids during tool execution, so tools fall back to the configured/default Feishu account unless the contextual id is a real enabled Feishu account. (#55627) Thanks @MonkeyLeeT.
- Google/tools: strip empty `required: []` arrays from Gemini tool schemas so optional-only tool parameters no longer trigger Google validator 400s. (#52106) Thanks @oliviareid-svg.

View File

@ -45,6 +45,23 @@ export async function readJoinedMatrixMembers(
}
}
export async function hasDirectMatrixMemberFlag(
client: MatrixClient,
roomId: string,
userId?: string | null,
): Promise<boolean> {
const normalizedUserId = trimMaybeString(userId);
if (!normalizedUserId) {
return false;
}
try {
const state = await client.getRoomStateEvent(roomId, "m.room.member", normalizedUserId);
return state?.is_direct === true;
} catch {
return false;
}
}
export async function isStrictDirectRoom(params: {
client: MatrixClient;
roomId: string;

View File

@ -2,15 +2,33 @@ import { afterEach, describe, expect, it, vi } from "vitest";
import type { MatrixClient } from "../sdk.js";
import { createDirectRoomTracker } from "./direct.js";
function createMockClient(params: { isDm?: boolean; members?: string[] }) {
type MockStateEvents = Record<string, Record<string, unknown>>;
function createMockClient(params: {
isDm?: boolean;
members?: string[];
stateEvents?: MockStateEvents;
dmCacheAvailable?: boolean;
}) {
let members = params.members ?? ["@alice:example.org", "@bot:example.org"];
const stateEvents = params.stateEvents ?? {};
return {
dms: {
update: vi.fn().mockResolvedValue(undefined),
update: vi.fn().mockResolvedValue(params.dmCacheAvailable !== false),
isDm: vi.fn().mockReturnValue(params.isDm === true),
},
getUserId: vi.fn().mockResolvedValue("@bot:example.org"),
getJoinedRoomMembers: vi.fn().mockImplementation(async () => members),
getRoomStateEvent: vi
.fn()
.mockImplementation(async (roomId: string, eventType: string, stateKey = "") => {
const key = `${roomId}|${eventType}|${stateKey}`;
const state = stateEvents[key];
if (state === undefined) {
throw new Error(`State event not found: ${key}`);
}
return state;
}),
__setMembers(next: string[]) {
members = next;
},
@ -20,6 +38,7 @@ function createMockClient(params: { isDm?: boolean; members?: string[] }) {
isDm: ReturnType<typeof vi.fn>;
};
getJoinedRoomMembers: ReturnType<typeof vi.fn>;
getRoomStateEvent: ReturnType<typeof vi.fn>;
__setMembers: (members: string[]) => void;
};
}
@ -61,7 +80,7 @@ describe("createDirectRoomTracker", () => {
});
it("does not classify 2-member rooms as DMs when the dm cache refresh succeeds", async () => {
const client = createMockClient({ isDm: false });
const client = createMockClient({ isDm: false, dmCacheAvailable: true });
const tracker = createDirectRoomTracker(client);
await expect(
@ -74,9 +93,8 @@ describe("createDirectRoomTracker", () => {
expect(client.getJoinedRoomMembers).toHaveBeenCalledWith("!room:example.org");
});
it("falls back to strict 2-member membership when dm cache refresh fails", async () => {
const client = createMockClient({ isDm: false });
client.dms.update.mockRejectedValue(new Error("dm cache unavailable"));
it("falls back to strict 2-member membership before m.direct account data is available", async () => {
const client = createMockClient({ isDm: false, dmCacheAvailable: false });
const tracker = createDirectRoomTracker(client);
await expect(
@ -89,12 +107,84 @@ describe("createDirectRoomTracker", () => {
expect(client.getJoinedRoomMembers).toHaveBeenCalledWith("!room:example.org");
});
it("keeps using the strict 2-member fallback until the dm cache seeds successfully", async () => {
const client = createMockClient({ isDm: false, dmCacheAvailable: false });
const tracker = createDirectRoomTracker(client);
await expect(
tracker.isDirectMessage({
roomId: "!room:example.org",
senderId: "@alice:example.org",
}),
).resolves.toBe(true);
await expect(
tracker.isDirectMessage({
roomId: "!room:example.org",
senderId: "@alice:example.org",
}),
).resolves.toBe(true);
expect(client.dms.update).toHaveBeenCalledTimes(1);
});
it("does not classify rooms with extra members as DMs when falling back", async () => {
const client = createMockClient({
isDm: false,
members: ["@alice:example.org", "@bot:example.org", "@observer:example.org"],
dmCacheAvailable: false,
});
const tracker = createDirectRoomTracker(client);
await expect(
tracker.isDirectMessage({
roomId: "!room:example.org",
senderId: "@alice:example.org",
}),
).resolves.toBe(false);
expect(client.getRoomStateEvent).not.toHaveBeenCalled();
});
it("treats sender is_direct member state as a DM signal", async () => {
const client = createMockClient({
isDm: false,
stateEvents: {
"!room:example.org|m.room.member|@alice:example.org": { is_direct: true },
},
});
const tracker = createDirectRoomTracker(client);
await expect(
tracker.isDirectMessage({
roomId: "!room:example.org",
senderId: "@alice:example.org",
}),
).resolves.toBe(true);
});
it("treats self is_direct member state as a DM signal", async () => {
const client = createMockClient({
isDm: false,
stateEvents: {
"!room:example.org|m.room.member|@bot:example.org": { is_direct: true },
},
});
const tracker = createDirectRoomTracker(client);
await expect(
tracker.isDirectMessage({
roomId: "!room:example.org",
senderId: "@alice:example.org",
}),
).resolves.toBe(true);
});
it("does not classify 2-member rooms whose sender is not a joined member when falling back", async () => {
const client = createMockClient({
isDm: false,
members: ["@mallory:example.org", "@bot:example.org"],
dmCacheAvailable: false,
});
client.dms.update.mockRejectedValue(new Error("dm cache unavailable"));
const tracker = createDirectRoomTracker(client);
await expect(
@ -105,12 +195,8 @@ describe("createDirectRoomTracker", () => {
).resolves.toBe(false);
});
it("does not classify 2-member rooms whose sender is not a joined member when falling back", async () => {
const client = createMockClient({
isDm: false,
members: ["@mallory:example.org", "@bot:example.org"],
});
client.dms.update.mockRejectedValue(new Error("dm cache unavailable"));
it("does not re-enable the strict 2-member fallback after the dm cache has seeded", async () => {
const client = createMockClient({ isDm: false, dmCacheAvailable: true });
const tracker = createDirectRoomTracker(client);
await expect(
@ -119,11 +205,20 @@ describe("createDirectRoomTracker", () => {
senderId: "@alice:example.org",
}),
).resolves.toBe(false);
client.dms.update.mockResolvedValue(false);
tracker.invalidateRoom("!room:example.org");
await expect(
tracker.isDirectMessage({
roomId: "!room:example.org",
senderId: "@alice:example.org",
}),
).resolves.toBe(false);
});
it("re-checks room membership after invalidation when fallback membership changes", async () => {
const client = createMockClient({ isDm: false });
client.dms.update.mockRejectedValue(new Error("dm cache unavailable"));
const client = createMockClient({ isDm: false, dmCacheAvailable: false });
const tracker = createDirectRoomTracker(client);
await expect(
@ -145,8 +240,7 @@ describe("createDirectRoomTracker", () => {
});
it("bounds joined-room membership cache size", async () => {
const client = createMockClient({ isDm: false });
client.dms.update.mockRejectedValue(new Error("dm cache unavailable"));
const client = createMockClient({ isDm: false, dmCacheAvailable: false });
const tracker = createDirectRoomTracker(client);
for (let i = 0; i <= 1024; i += 1) {
@ -192,4 +286,37 @@ describe("createDirectRoomTracker", () => {
expect(client.dms.update).toHaveBeenCalledTimes(2);
expect(client.getJoinedRoomMembers).toHaveBeenCalledTimes(2);
});
it("caches member-state direct flag lookups until the ttl expires", async () => {
vi.useFakeTimers();
vi.setSystemTime(new Date("2026-03-12T10:00:00Z"));
const client = createMockClient({
isDm: false,
dmCacheAvailable: true,
stateEvents: {
"!room:example.org|m.room.member|@alice:example.org": { is_direct: true },
},
});
const tracker = createDirectRoomTracker(client);
await tracker.isDirectMessage({
roomId: "!room:example.org",
senderId: "@alice:example.org",
});
await tracker.isDirectMessage({
roomId: "!room:example.org",
senderId: "@alice:example.org",
});
expect(client.getRoomStateEvent).toHaveBeenCalledTimes(1);
vi.setSystemTime(new Date("2026-03-12T10:00:31Z"));
await tracker.isDirectMessage({
roomId: "!room:example.org",
senderId: "@alice:example.org",
});
expect(client.getRoomStateEvent).toHaveBeenCalledTimes(2);
});
});

View File

@ -1,4 +1,8 @@
import { isStrictDirectMembership, readJoinedMatrixMembers } from "../direct-room.js";
import {
hasDirectMatrixMemberFlag,
isStrictDirectMembership,
readJoinedMatrixMembers,
} from "../direct-room.js";
import type { MatrixClient } from "../sdk.js";
type DirectMessageCheck = {
@ -13,6 +17,7 @@ type DirectRoomTrackerOptions = {
const DM_CACHE_TTL_MS = 30_000;
const MAX_TRACKED_DM_ROOMS = 1024;
const MAX_TRACKED_DM_MEMBER_FLAGS = 2048;
function rememberBounded<T>(map: Map<string, T>, key: string, value: T): void {
map.set(key, value);
@ -27,8 +32,12 @@ function rememberBounded<T>(map: Map<string, T>, key: string, value: T): void {
export function createDirectRoomTracker(client: MatrixClient, opts: DirectRoomTrackerOptions = {}) {
const log = opts.log ?? (() => {});
let lastDmUpdateMs = 0;
// Once m.direct has seeded successfully, prefer the explicit cache over
// re-enabling the broad 2-person fallback after a later transient failure.
let hasSeededDmCache = false;
let cachedSelfUserId: string | null = null;
const joinedMembersCache = new Map<string, { members: string[]; ts: number }>();
const directMemberFlagCache = new Map<string, { isDirect: boolean; ts: number }>();
const ensureSelfUserId = async (): Promise<string | null> => {
if (cachedSelfUserId) {
@ -48,7 +57,7 @@ export function createDirectRoomTracker(client: MatrixClient, opts: DirectRoomTr
return;
}
lastDmUpdateMs = now;
await client.dms.update();
hasSeededDmCache = (await client.dms.update()) || hasSeededDmCache;
};
const resolveJoinedMembers = async (roomId: string): Promise<string[] | null> => {
@ -70,9 +79,33 @@ export function createDirectRoomTracker(client: MatrixClient, opts: DirectRoomTr
}
};
const resolveDirectMemberFlag = async (
roomId: string,
userId?: string | null,
): Promise<boolean> => {
const normalizedUserId = userId?.trim();
if (!normalizedUserId) {
return false;
}
const cacheKey = `${roomId}\n${normalizedUserId}`;
const cached = directMemberFlagCache.get(cacheKey);
const now = Date.now();
if (cached && now - cached.ts < DM_CACHE_TTL_MS) {
return cached.isDirect;
}
const isDirect = await hasDirectMatrixMemberFlag(client, roomId, normalizedUserId);
rememberBounded(directMemberFlagCache, cacheKey, { isDirect, ts: now });
return isDirect;
};
return {
invalidateRoom: (roomId: string): void => {
joinedMembersCache.delete(roomId);
for (const key of directMemberFlagCache.keys()) {
if (key.startsWith(`${roomId}\n`)) {
directMemberFlagCache.delete(key);
}
}
lastDmUpdateMs = 0;
log(`matrix: invalidated dm cache room=${roomId}`);
},
@ -85,27 +118,36 @@ export function createDirectRoomTracker(client: MatrixClient, opts: DirectRoomTr
remoteUserId: senderId,
joinedMembers,
});
let refreshFailed = false;
try {
await refreshDmCache();
} catch (err) {
log(`matrix: dm cache refresh failed (${String(err)})`);
refreshFailed = true;
}
if (refreshFailed) {
if (strictDirectMembership) {
log(`matrix: dm detected via exact 2-member room room=${roomId}`);
return true;
}
} else if (client.dms.isDm(roomId)) {
if (client.dms.isDm(roomId)) {
if (strictDirectMembership) {
log(`matrix: dm detected via m.direct room=${roomId}`);
return true;
}
log(`matrix: ignoring stale m.direct classification room=${roomId}`);
return false;
}
if (strictDirectMembership) {
const directViaState =
(await resolveDirectMemberFlag(roomId, senderId)) ||
(await resolveDirectMemberFlag(roomId, selfUserId));
if (directViaState) {
log(`matrix: dm detected via member state room=${roomId}`);
return true;
}
if (!hasSeededDmCache) {
log(
`matrix: dm detected via exact 2-member fallback before dm cache seed room=${roomId}`,
);
return true;
}
}
log(

View File

@ -195,8 +195,8 @@ export class MatrixClient {
private stopPersistPromise: Promise<void> | null = null;
readonly dms = {
update: async (): Promise<void> => {
await this.refreshDmCache();
update: async (): Promise<boolean> => {
return await this.refreshDmCache();
},
isDm: (roomId: string): boolean => this.dmRoomIds.has(roomId),
};
@ -1510,11 +1510,11 @@ export class MatrixClient {
}
}
private async refreshDmCache(): Promise<void> {
private async refreshDmCache(): Promise<boolean> {
const direct = await this.getAccountData("m.direct");
this.dmRoomIds.clear();
if (!direct || typeof direct !== "object") {
return;
return false;
}
for (const value of Object.values(direct)) {
if (!Array.isArray(value)) {
@ -1526,5 +1526,6 @@ export class MatrixClient {
}
}
}
return true;
}
}