openclaw/extensions/matrix/src/matrix/monitor/handler.test.ts

1970 lines
60 KiB
TypeScript

import {
__testing as sessionBindingTesting,
registerSessionBindingAdapter,
} from "openclaw/plugin-sdk/conversation-runtime";
import { beforeEach, describe, expect, it, vi } from "vitest";
import { installMatrixMonitorTestRuntime } from "../../test-runtime.js";
import { createMatrixRoomMessageHandler } from "./handler.js";
import {
createMatrixHandlerTestHarness,
createMatrixReactionEvent,
createMatrixRoomMessageEvent,
createMatrixTextMessageEvent,
} from "./handler.test-helpers.js";
import type { MatrixRawEvent } from "./types.js";
import { EventType } from "./types.js";
const sendMessageMatrixMock = vi.hoisted(() =>
vi.fn(async (..._args: unknown[]) => ({ messageId: "evt", roomId: "!room" })),
);
const sendSingleTextMessageMatrixMock = vi.hoisted(() =>
vi.fn(async (..._args: unknown[]) => ({ messageId: "$draft1", roomId: "!room" })),
);
const editMessageMatrixMock = vi.hoisted(() => vi.fn(async () => "$edited"));
const prepareMatrixSingleTextMock = vi.hoisted(() =>
vi.fn((text: string) => {
const trimmedText = text.trim();
return {
trimmedText,
convertedText: trimmedText,
singleEventLimit: 4000,
fitsInSingleEvent: true,
};
}),
);
vi.mock("../send.js", () => ({
editMessageMatrix: editMessageMatrixMock,
prepareMatrixSingleText: prepareMatrixSingleTextMock,
reactMatrixMessage: vi.fn(async () => {}),
sendMessageMatrix: sendMessageMatrixMock,
sendSingleTextMessageMatrix: sendSingleTextMessageMatrixMock,
sendReadReceiptMatrix: vi.fn(async () => {}),
sendTypingMatrix: vi.fn(async () => {}),
}));
const deliverMatrixRepliesMock = vi.hoisted(() => vi.fn(async () => {}));
vi.mock("./replies.js", () => ({
deliverMatrixReplies: deliverMatrixRepliesMock,
}));
beforeEach(() => {
sessionBindingTesting.resetSessionBindingAdaptersForTests();
installMatrixMonitorTestRuntime();
prepareMatrixSingleTextMock.mockReset().mockImplementation((text: string) => {
const trimmedText = text.trim();
return {
trimmedText,
convertedText: trimmedText,
singleEventLimit: 4000,
fitsInSingleEvent: true,
};
});
});
function createReactionHarness(params?: {
cfg?: unknown;
dmPolicy?: "pairing" | "allowlist" | "open" | "disabled";
allowFrom?: string[];
storeAllowFrom?: string[];
targetSender?: string;
isDirectMessage?: boolean;
senderName?: string;
client?: NonNullable<Parameters<typeof createMatrixHandlerTestHarness>[0]>["client"];
}) {
return createMatrixHandlerTestHarness({
cfg: params?.cfg,
dmPolicy: params?.dmPolicy,
allowFrom: params?.allowFrom,
readAllowFromStore: vi.fn(async () => params?.storeAllowFrom ?? []),
client: {
getEvent: async () => ({ sender: params?.targetSender ?? "@bot:example.org" }),
...params?.client,
},
isDirectMessage: params?.isDirectMessage,
getMemberDisplayName: async () => params?.senderName ?? "sender",
});
}
describe("matrix monitor handler pairing account scope", () => {
it("caches account-scoped allowFrom store reads on hot path", async () => {
const readAllowFromStore = vi.fn(async () => [] as string[]);
sendMessageMatrixMock.mockClear();
const { handler } = createMatrixHandlerTestHarness({
readAllowFromStore,
dmPolicy: "pairing",
buildPairingReply: () => "pairing",
});
await handler(
"!room:example.org",
createMatrixTextMessageEvent({
eventId: "$event1",
body: "@room hello",
mentions: { room: true },
}),
);
await handler(
"!room:example.org",
createMatrixTextMessageEvent({
eventId: "$event2",
body: "@room hello again",
mentions: { room: true },
}),
);
expect(readAllowFromStore).toHaveBeenCalledTimes(1);
});
it("refreshes the account-scoped allowFrom cache after its ttl expires", async () => {
vi.useFakeTimers();
vi.setSystemTime(new Date("2026-03-01T10:00:00.000Z"));
try {
const readAllowFromStore = vi.fn(async () => [] as string[]);
const { handler } = createMatrixHandlerTestHarness({
readAllowFromStore,
dmPolicy: "pairing",
buildPairingReply: () => "pairing",
});
const makeEvent = (id: string): MatrixRawEvent =>
createMatrixTextMessageEvent({
eventId: id,
body: "@room hello",
mentions: { room: true },
});
await handler("!room:example.org", makeEvent("$event1"));
await handler("!room:example.org", makeEvent("$event2"));
expect(readAllowFromStore).toHaveBeenCalledTimes(1);
await vi.advanceTimersByTimeAsync(30_001);
await handler("!room:example.org", makeEvent("$event3"));
expect(readAllowFromStore).toHaveBeenCalledTimes(2);
} finally {
vi.useRealTimers();
}
});
it("sends pairing reminders for pending requests with cooldown", async () => {
vi.useFakeTimers();
vi.setSystemTime(new Date("2026-03-01T10:00:00.000Z"));
try {
const readAllowFromStore = vi.fn(async () => [] as string[]);
sendMessageMatrixMock.mockClear();
const { handler } = createMatrixHandlerTestHarness({
readAllowFromStore,
dmPolicy: "pairing",
buildPairingReply: () => "Pairing code: ABCDEFGH",
isDirectMessage: true,
getMemberDisplayName: async () => "sender",
});
const makeEvent = (id: string): MatrixRawEvent =>
createMatrixTextMessageEvent({
eventId: id,
body: "hello",
mentions: { room: true },
});
await handler("!room:example.org", makeEvent("$event1"));
await handler("!room:example.org", makeEvent("$event2"));
expect(sendMessageMatrixMock).toHaveBeenCalledTimes(1);
expect(String(sendMessageMatrixMock.mock.calls[0]?.[1] ?? "")).toContain(
"Pairing request is still pending approval.",
);
await vi.advanceTimersByTimeAsync(5 * 60_000 + 1);
await handler("!room:example.org", makeEvent("$event3"));
expect(sendMessageMatrixMock).toHaveBeenCalledTimes(2);
} finally {
vi.useRealTimers();
}
});
it("uses account-scoped pairing store reads and upserts for dm pairing", async () => {
const readAllowFromStore = vi.fn(async () => [] as string[]);
const upsertPairingRequest = vi.fn(async () => ({ code: "ABCDEFGH", created: false }));
const { handler } = createMatrixHandlerTestHarness({
readAllowFromStore,
upsertPairingRequest,
dmPolicy: "pairing",
isDirectMessage: true,
getMemberDisplayName: async () => "sender",
dropPreStartupMessages: true,
needsRoomAliasesForConfig: false,
});
await handler(
"!room:example.org",
createMatrixTextMessageEvent({
eventId: "$event1",
body: "hello",
mentions: { room: true },
}),
);
expect(readAllowFromStore).toHaveBeenCalledWith({
channel: "matrix",
env: process.env,
accountId: "ops",
});
expect(upsertPairingRequest).toHaveBeenCalledWith({
channel: "matrix",
id: "@user:example.org",
accountId: "ops",
meta: { name: "sender" },
});
});
it("passes accountId into route resolution for inbound dm messages", async () => {
const resolveAgentRoute = vi.fn(() => ({
agentId: "ops",
channel: "matrix",
accountId: "ops",
sessionKey: "agent:ops:main",
mainSessionKey: "agent:ops:main",
matchedBy: "binding.account" as const,
}));
const { handler } = createMatrixHandlerTestHarness({
resolveAgentRoute,
isDirectMessage: true,
getMemberDisplayName: async () => "sender",
});
await handler(
"!room:example.org",
createMatrixTextMessageEvent({
eventId: "$event2",
body: "hello",
mentions: { room: true },
}),
);
expect(resolveAgentRoute).toHaveBeenCalledWith(
expect.objectContaining({
channel: "matrix",
accountId: "ops",
}),
);
});
it("does not enqueue delivered text messages into system events", async () => {
const dispatchReplyFromConfig = vi.fn(async () => ({
queuedFinal: true,
counts: { final: 1, block: 0, tool: 0 },
}));
const { handler, enqueueSystemEvent } = createMatrixHandlerTestHarness({
dispatchReplyFromConfig,
isDirectMessage: true,
getMemberDisplayName: async () => "sender",
});
await handler(
"!room:example.org",
createMatrixTextMessageEvent({
eventId: "$event-system-preview",
body: "hello from matrix",
mentions: { room: true },
}),
);
expect(dispatchReplyFromConfig).toHaveBeenCalled();
expect(enqueueSystemEvent).not.toHaveBeenCalled();
});
it("drops room messages from configured Matrix bot accounts when allowBots is off", async () => {
const { handler, recordInboundSession } = createMatrixHandlerTestHarness({
isDirectMessage: false,
configuredBotUserIds: new Set(["@ops:example.org"]),
roomsConfig: {
"!room:example.org": { requireMention: false },
},
getMemberDisplayName: async () => "ops-bot",
});
await handler(
"!room:example.org",
createMatrixTextMessageEvent({
eventId: "$bot-off",
sender: "@ops:example.org",
body: "hello from bot",
}),
);
expect(recordInboundSession).not.toHaveBeenCalled();
});
it("accepts room messages from configured Matrix bot accounts when allowBots is true", async () => {
const { handler, recordInboundSession } = createMatrixHandlerTestHarness({
isDirectMessage: false,
accountAllowBots: true,
configuredBotUserIds: new Set(["@ops:example.org"]),
roomsConfig: {
"!room:example.org": { requireMention: false },
},
getMemberDisplayName: async () => "ops-bot",
});
await handler(
"!room:example.org",
createMatrixTextMessageEvent({
eventId: "$bot-on",
sender: "@ops:example.org",
body: "hello from bot",
}),
);
expect(recordInboundSession).toHaveBeenCalled();
});
it("does not treat unconfigured Matrix users as bots when allowBots is off", async () => {
const { handler, resolveAgentRoute, recordInboundSession } = createMatrixHandlerTestHarness({
isDirectMessage: false,
configuredBotUserIds: new Set(["@ops:example.org"]),
roomsConfig: {
"!room:example.org": { requireMention: false },
},
getMemberDisplayName: async () => "human",
});
await handler(
"!room:example.org",
createMatrixTextMessageEvent({
eventId: "$non-bot",
sender: "@alice:example.org",
body: "hello from human",
}),
);
expect(resolveAgentRoute).toHaveBeenCalled();
expect(recordInboundSession).toHaveBeenCalled();
});
it('drops configured Matrix bot room messages without a mention when allowBots="mentions"', async () => {
const { handler, recordInboundSession } = createMatrixHandlerTestHarness({
isDirectMessage: false,
accountAllowBots: "mentions",
configuredBotUserIds: new Set(["@ops:example.org"]),
roomsConfig: {
"!room:example.org": { requireMention: false },
},
mentionRegexes: [/@bot/i],
getMemberDisplayName: async () => "ops-bot",
});
await handler(
"!room:example.org",
createMatrixTextMessageEvent({
eventId: "$bot-mentions-off",
sender: "@ops:example.org",
body: "hello from bot",
}),
);
expect(recordInboundSession).not.toHaveBeenCalled();
});
it('accepts configured Matrix bot room messages with a mention when allowBots="mentions"', async () => {
const { handler, recordInboundSession } = createMatrixHandlerTestHarness({
isDirectMessage: false,
accountAllowBots: "mentions",
configuredBotUserIds: new Set(["@ops:example.org"]),
roomsConfig: {
"!room:example.org": { requireMention: false },
},
mentionRegexes: [/@bot/i],
getMemberDisplayName: async () => "ops-bot",
});
await handler(
"!room:example.org",
createMatrixTextMessageEvent({
eventId: "$bot-mentions-on",
sender: "@ops:example.org",
body: "hello @bot",
mentions: { user_ids: ["@bot:example.org"] },
}),
);
expect(recordInboundSession).toHaveBeenCalled();
});
it('accepts configured Matrix bot DMs without a mention when allowBots="mentions"', async () => {
const { handler, recordInboundSession } = createMatrixHandlerTestHarness({
isDirectMessage: true,
accountAllowBots: "mentions",
configuredBotUserIds: new Set(["@ops:example.org"]),
getMemberDisplayName: async () => "ops-bot",
});
await handler(
"!dm:example.org",
createMatrixTextMessageEvent({
eventId: "$bot-dm-mentions",
sender: "@ops:example.org",
body: "hello from dm bot",
}),
);
expect(recordInboundSession).toHaveBeenCalled();
});
it("lets room-level allowBots override a permissive account default", async () => {
const { handler, recordInboundSession } = createMatrixHandlerTestHarness({
isDirectMessage: false,
accountAllowBots: true,
configuredBotUserIds: new Set(["@ops:example.org"]),
roomsConfig: {
"!room:example.org": { requireMention: false, allowBots: false },
},
getMemberDisplayName: async () => "ops-bot",
});
await handler(
"!room:example.org",
createMatrixTextMessageEvent({
eventId: "$bot-room-override",
sender: "@ops:example.org",
body: "hello from bot",
}),
);
expect(recordInboundSession).not.toHaveBeenCalled();
});
it("processes room messages mentioned via displayName in formatted_body", async () => {
const recordInboundSession = vi.fn(async () => {});
const { handler } = createMatrixHandlerTestHarness({
isDirectMessage: false,
getMemberDisplayName: async () => "Tom Servo",
recordInboundSession,
});
await handler(
"!room:example.org",
createMatrixRoomMessageEvent({
eventId: "$display-name-mention",
content: {
msgtype: "m.text",
body: "Tom Servo: hello",
formatted_body: '<a href="https://matrix.to/#/@bot:example.org">Tom Servo</a>: hello',
},
}),
);
expect(recordInboundSession).toHaveBeenCalled();
});
it("does not fetch self displayName for plain-text room mentions", async () => {
const getMemberDisplayName = vi.fn(async () => "Tom Servo");
const { handler, recordInboundSession } = createMatrixHandlerTestHarness({
isDirectMessage: false,
mentionRegexes: [/\btom servo\b/i],
getMemberDisplayName,
});
await handler(
"!room:example.org",
createMatrixTextMessageEvent({
eventId: "$plain-text-mention",
body: "Tom Servo: hello",
}),
);
expect(recordInboundSession).toHaveBeenCalled();
expect(getMemberDisplayName).not.toHaveBeenCalledWith("!room:example.org", "@bot:example.org");
});
it("drops forged metadata-only mentions before agent routing", async () => {
const { handler, recordInboundSession, resolveAgentRoute } = createMatrixHandlerTestHarness({
isDirectMessage: false,
mentionRegexes: [/@bot/i],
getMemberDisplayName: async () => "sender",
});
await handler(
"!room:example.org",
createMatrixTextMessageEvent({
eventId: "$spoofed-mention",
body: "hello there",
mentions: { user_ids: ["@bot:example.org"] },
}),
);
expect(recordInboundSession).not.toHaveBeenCalled();
});
it("skips media downloads for unmentioned group media messages", async () => {
const downloadContent = vi.fn(async () => Buffer.from("image"));
const getMemberDisplayName = vi.fn(async () => "sender");
const getRoomInfo = vi.fn(async () => ({ altAliases: [] }));
const { handler } = createMatrixHandlerTestHarness({
client: {
downloadContent,
},
isDirectMessage: false,
mentionRegexes: [/@bot/i],
getMemberDisplayName,
getRoomInfo,
});
await handler("!room:example.org", {
type: EventType.RoomMessage,
sender: "@user:example.org",
event_id: "$media1",
origin_server_ts: Date.now(),
content: {
msgtype: "m.image",
body: "",
url: "mxc://example.org/media",
info: {
mimetype: "image/png",
size: 5,
},
},
} as MatrixRawEvent);
expect(downloadContent).not.toHaveBeenCalled();
expect(getMemberDisplayName).not.toHaveBeenCalled();
expect(getRoomInfo).not.toHaveBeenCalled();
});
it("skips poll snapshot fetches for unmentioned group poll responses", async () => {
const getEvent = vi.fn(async () => ({
event_id: "$poll",
sender: "@user:example.org",
type: "m.poll.start",
origin_server_ts: Date.now(),
content: {
"m.poll.start": {
question: { "m.text": "Lunch?" },
kind: "m.poll.disclosed",
max_selections: 1,
answers: [{ id: "a1", "m.text": "Pizza" }],
},
},
}));
const getRelations = vi.fn(async () => ({
events: [],
nextBatch: null,
prevBatch: null,
}));
const getMemberDisplayName = vi.fn(async () => "sender");
const getRoomInfo = vi.fn(async () => ({ altAliases: [] }));
const { handler } = createMatrixHandlerTestHarness({
client: {
getEvent,
getRelations,
},
isDirectMessage: false,
mentionRegexes: [/@bot/i],
getMemberDisplayName,
getRoomInfo,
});
await handler("!room:example.org", {
type: "m.poll.response",
sender: "@user:example.org",
event_id: "$poll-response-1",
origin_server_ts: Date.now(),
content: {
"m.poll.response": {
answers: ["a1"],
},
"m.relates_to": {
rel_type: "m.reference",
event_id: "$poll",
},
},
} as MatrixRawEvent);
expect(getEvent).not.toHaveBeenCalled();
expect(getRelations).not.toHaveBeenCalled();
expect(getMemberDisplayName).not.toHaveBeenCalled();
expect(getRoomInfo).not.toHaveBeenCalled();
});
it("records thread starter context for inbound thread replies", async () => {
const { handler, finalizeInboundContext, recordInboundSession } =
createMatrixHandlerTestHarness({
client: {
getEvent: async () =>
createMatrixTextMessageEvent({
eventId: "$root",
sender: "@alice:example.org",
body: "Root topic",
}),
},
isDirectMessage: false,
getMemberDisplayName: async (_roomId, userId) =>
userId === "@alice:example.org" ? "Alice" : "sender",
});
await handler(
"!room:example.org",
createMatrixTextMessageEvent({
eventId: "$reply1",
body: "@room follow up",
relatesTo: {
rel_type: "m.thread",
event_id: "$root",
"m.in_reply_to": { event_id: "$root" },
},
mentions: { room: true },
}),
);
expect(finalizeInboundContext).toHaveBeenCalledWith(
expect.objectContaining({
MessageThreadId: "$root",
ThreadStarterBody: "Matrix thread root $root from Alice:\nRoot topic",
}),
);
expect(recordInboundSession).toHaveBeenCalledWith(
expect.objectContaining({
sessionKey: "agent:ops:main:thread:$root",
}),
);
});
it("keeps threaded DMs flat when dm threadReplies is off", async () => {
const { handler, finalizeInboundContext, recordInboundSession } =
createMatrixHandlerTestHarness({
threadReplies: "always",
dmThreadReplies: "off",
isDirectMessage: true,
client: {
getEvent: async (_roomId, eventId) =>
eventId === "$root"
? createMatrixTextMessageEvent({
eventId: "$root",
sender: "@alice:example.org",
body: "Root topic",
})
: ({ sender: "@bot:example.org" } as never),
},
getMemberDisplayName: async (_roomId, userId) =>
userId === "@alice:example.org" ? "Alice" : "sender",
});
await handler(
"!dm:example.org",
createMatrixTextMessageEvent({
eventId: "$reply1",
body: "follow up",
relatesTo: {
rel_type: "m.thread",
event_id: "$root",
"m.in_reply_to": { event_id: "$root" },
},
}),
);
expect(finalizeInboundContext).toHaveBeenCalledWith(
expect.objectContaining({
MessageThreadId: undefined,
ReplyToId: "$root",
ThreadStarterBody: "Matrix thread root $root from Alice:\nRoot topic",
}),
);
expect(recordInboundSession).toHaveBeenCalledWith(
expect.objectContaining({
sessionKey: "agent:ops:main",
}),
);
});
it("uses stable room ids instead of room-declared aliases in group context", async () => {
const { handler, finalizeInboundContext } = createMatrixHandlerTestHarness({
isDirectMessage: false,
getRoomInfo: async () => ({
name: "Ops Room",
canonicalAlias: "#spoofed:example.org",
altAliases: ["#alt:example.org"],
}),
getMemberDisplayName: async () => "sender",
dispatchReplyFromConfig: async () => ({
queuedFinal: false,
counts: { final: 0, block: 0, tool: 0 },
}),
});
await handler(
"!room:example.org",
createMatrixTextMessageEvent({
eventId: "$group1",
body: "@room hello",
mentions: { room: true },
}),
);
const finalized = vi.mocked(finalizeInboundContext).mock.calls.at(-1)?.[0];
expect(finalized).toEqual(
expect.objectContaining({
GroupSubject: "Ops Room",
GroupId: "!room:example.org",
}),
);
expect(finalized).not.toHaveProperty("GroupChannel");
});
it("routes bound Matrix threads to the target session key", async () => {
const touch = vi.fn();
registerSessionBindingAdapter({
channel: "matrix",
accountId: "ops",
listBySession: () => [],
resolveByConversation: (ref) =>
ref.conversationId === "$root"
? {
bindingId: "ops:!room:example:$root",
targetSessionKey: "agent:bound:session-1",
targetKind: "session",
conversation: {
channel: "matrix",
accountId: "ops",
conversationId: "$root",
parentConversationId: "!room:example",
},
status: "active",
boundAt: Date.now(),
metadata: {
boundBy: "user-1",
},
}
: null,
touch,
});
const { handler, recordInboundSession } = createMatrixHandlerTestHarness({
client: {
getEvent: async () =>
createMatrixTextMessageEvent({
eventId: "$root",
sender: "@alice:example.org",
body: "Root topic",
}),
},
isDirectMessage: false,
finalizeInboundContext: (ctx: unknown) => ctx,
getMemberDisplayName: async () => "sender",
});
await handler(
"!room:example",
createMatrixTextMessageEvent({
eventId: "$reply1",
body: "@room follow up",
relatesTo: {
rel_type: "m.thread",
event_id: "$root",
"m.in_reply_to": { event_id: "$root" },
},
mentions: { room: true },
}),
);
expect(recordInboundSession).toHaveBeenCalledWith(
expect.objectContaining({
sessionKey: "agent:bound:session-1",
}),
);
expect(touch).toHaveBeenCalledTimes(1);
});
it("does not refresh bound Matrix thread bindings for room messages dropped before routing", async () => {
const touch = vi.fn();
registerSessionBindingAdapter({
channel: "matrix",
accountId: "ops",
listBySession: () => [],
resolveByConversation: (ref) =>
ref.conversationId === "$root"
? {
bindingId: "ops:!room:example:$root",
targetSessionKey: "agent:bound:session-1",
targetKind: "session",
conversation: {
channel: "matrix",
accountId: "ops",
conversationId: "$root",
parentConversationId: "!room:example",
},
status: "active",
boundAt: Date.now(),
metadata: {
boundBy: "user-1",
},
}
: null,
touch,
});
const { handler, recordInboundSession } = createMatrixHandlerTestHarness({
client: {
getEvent: async () =>
createMatrixTextMessageEvent({
eventId: "$root",
sender: "@alice:example.org",
body: "Root topic",
}),
},
isDirectMessage: false,
getMemberDisplayName: async () => "sender",
});
await handler(
"!room:example",
createMatrixTextMessageEvent({
eventId: "$reply-no-mention",
body: "follow up without mention",
relatesTo: {
rel_type: "m.thread",
event_id: "$root",
"m.in_reply_to": { event_id: "$root" },
},
}),
);
expect(recordInboundSession).not.toHaveBeenCalled();
expect(touch).not.toHaveBeenCalled();
});
it("does not enqueue system events for delivered text replies", async () => {
const enqueueSystemEvent = vi.fn();
const handler = createMatrixRoomMessageHandler({
client: {
getUserId: async () => "@bot:example.org",
} as never,
core: {
channel: {
pairing: {
readAllowFromStore: async () => [] as string[],
upsertPairingRequest: async () => ({ code: "ABCDEFGH", created: false }),
buildPairingReply: () => "pairing",
},
commands: {
shouldHandleTextCommands: () => false,
},
text: {
hasControlCommand: () => false,
resolveMarkdownTableMode: () => "preserve",
},
routing: {
resolveAgentRoute: () => ({
agentId: "ops",
channel: "matrix",
accountId: "ops",
sessionKey: "agent:ops:main",
mainSessionKey: "agent:ops:main",
matchedBy: "binding.account",
}),
},
mentions: {
buildMentionRegexes: () => [],
},
session: {
resolveStorePath: () => "/tmp/session-store",
readSessionUpdatedAt: () => undefined,
recordInboundSession: vi.fn(async () => {}),
},
reply: {
resolveEnvelopeFormatOptions: () => ({}),
formatAgentEnvelope: ({ body }: { body: string }) => body,
finalizeInboundContext: (ctx: unknown) => ctx,
createReplyDispatcherWithTyping: () => ({
dispatcher: {},
replyOptions: {},
markDispatchIdle: () => {},
markRunComplete: () => {},
}),
resolveHumanDelayConfig: () => undefined,
dispatchReplyFromConfig: async () => ({
queuedFinal: true,
counts: { final: 1, block: 0, tool: 0 },
}),
withReplyDispatcher: async <T>({
dispatcher,
run,
onSettled,
}: {
dispatcher: {
markComplete?: () => void;
waitForIdle?: () => Promise<void>;
};
run: () => Promise<T>;
onSettled?: () => void | Promise<void>;
}) => {
try {
return await run();
} finally {
dispatcher.markComplete?.();
try {
await dispatcher.waitForIdle?.();
} finally {
await onSettled?.();
}
}
},
},
reactions: {
shouldAckReaction: () => false,
},
},
system: {
enqueueSystemEvent,
},
} as never,
cfg: {} as never,
accountId: "ops",
runtime: {
error: () => {},
} as never,
logger: {
info: () => {},
warn: () => {},
} as never,
logVerboseMessage: () => {},
allowFrom: [],
groupPolicy: "open",
replyToMode: "off",
threadReplies: "inbound",
streaming: "off",
blockStreamingEnabled: false,
dmEnabled: true,
dmPolicy: "open",
textLimit: 8_000,
mediaMaxBytes: 10_000_000,
historyLimit: 0,
startupMs: 0,
startupGraceMs: 0,
directTracker: {
isDirectMessage: async () => false,
},
dropPreStartupMessages: true,
getRoomInfo: async () => ({ altAliases: [] }),
getMemberDisplayName: async () => "sender",
needsRoomAliasesForConfig: false,
});
await handler(
"!room:example.org",
createMatrixTextMessageEvent({
eventId: "$message1",
sender: "@user:example.org",
body: "hello there",
mentions: { room: true },
}) as MatrixRawEvent,
);
expect(enqueueSystemEvent).not.toHaveBeenCalled();
});
it("enqueues system events for reactions on bot-authored messages", async () => {
const { handler, enqueueSystemEvent, resolveAgentRoute } = createReactionHarness();
await handler(
"!room:example.org",
createMatrixReactionEvent({
eventId: "$reaction1",
targetEventId: "$msg1",
key: "👍",
}),
);
expect(resolveAgentRoute).toHaveBeenCalledWith(
expect.objectContaining({
channel: "matrix",
accountId: "ops",
}),
);
expect(enqueueSystemEvent).toHaveBeenCalledWith(
"Matrix reaction added: 👍 by sender on msg $msg1",
{
sessionKey: "agent:ops:main",
contextKey: "matrix:reaction:add:!room:example.org:$msg1:@user:example.org:👍",
},
);
});
it("routes reaction notifications for bound thread messages to the bound session", async () => {
registerSessionBindingAdapter({
channel: "matrix",
accountId: "ops",
listBySession: () => [],
resolveByConversation: (ref) =>
ref.conversationId === "$root"
? {
bindingId: "ops:!room:example.org:$root",
targetSessionKey: "agent:bound:session-1",
targetKind: "session",
conversation: {
channel: "matrix",
accountId: "ops",
conversationId: "$root",
parentConversationId: "!room:example.org",
},
status: "active",
boundAt: Date.now(),
metadata: {
boundBy: "user-1",
},
}
: null,
touch: vi.fn(),
});
const { handler, enqueueSystemEvent } = createMatrixHandlerTestHarness({
client: {
getEvent: async () =>
createMatrixTextMessageEvent({
eventId: "$reply1",
sender: "@bot:example.org",
body: "follow up",
relatesTo: {
rel_type: "m.thread",
event_id: "$root",
"m.in_reply_to": { event_id: "$root" },
},
}),
},
isDirectMessage: false,
});
await handler(
"!room:example.org",
createMatrixReactionEvent({
eventId: "$reaction-thread",
targetEventId: "$reply1",
key: "🎯",
}),
);
expect(enqueueSystemEvent).toHaveBeenCalledWith(
"Matrix reaction added: 🎯 by sender on msg $reply1",
{
sessionKey: "agent:bound:session-1",
contextKey: "matrix:reaction:add:!room:example.org:$reply1:@user:example.org:🎯",
},
);
});
it("keeps threaded DM reaction notifications on the flat session when dm threadReplies is off", async () => {
const { handler, enqueueSystemEvent } = createReactionHarness({
cfg: {
channels: {
matrix: {
threadReplies: "always",
dm: { threadReplies: "off" },
},
},
},
isDirectMessage: true,
client: {
getEvent: async () =>
createMatrixTextMessageEvent({
eventId: "$reply1",
sender: "@bot:example.org",
body: "follow up",
relatesTo: {
rel_type: "m.thread",
event_id: "$root",
"m.in_reply_to": { event_id: "$root" },
},
}),
},
});
await handler(
"!dm:example.org",
createMatrixReactionEvent({
eventId: "$reaction-thread",
targetEventId: "$reply1",
key: "🎯",
}),
);
expect(enqueueSystemEvent).toHaveBeenCalledWith(
"Matrix reaction added: 🎯 by sender on msg $reply1",
{
sessionKey: "agent:ops:main",
contextKey: "matrix:reaction:add:!dm:example.org:$reply1:@user:example.org:🎯",
},
);
});
it("routes thread-root reaction notifications to the thread session when threadReplies is always", async () => {
const { handler, enqueueSystemEvent } = createReactionHarness({
cfg: {
channels: {
matrix: {
threadReplies: "always",
},
},
},
isDirectMessage: false,
client: {
getEvent: async () =>
createMatrixTextMessageEvent({
eventId: "$root",
sender: "@bot:example.org",
body: "start thread",
}),
},
});
await handler(
"!room:example.org",
createMatrixReactionEvent({
eventId: "$reaction-root",
targetEventId: "$root",
key: "🧵",
}),
);
expect(enqueueSystemEvent).toHaveBeenCalledWith(
"Matrix reaction added: 🧵 by sender on msg $root",
{
sessionKey: "agent:ops:main:thread:$root",
contextKey: "matrix:reaction:add:!room:example.org:$root:@user:example.org:🧵",
},
);
});
it("ignores reactions that do not target bot-authored messages", async () => {
const { handler, enqueueSystemEvent, resolveAgentRoute } = createReactionHarness({
targetSender: "@other:example.org",
});
await handler(
"!room:example.org",
createMatrixReactionEvent({
eventId: "$reaction2",
targetEventId: "$msg2",
key: "👀",
}),
);
expect(enqueueSystemEvent).not.toHaveBeenCalled();
expect(resolveAgentRoute).not.toHaveBeenCalled();
});
it("does not create pairing requests for unauthorized dm reactions", async () => {
const { handler, enqueueSystemEvent, upsertPairingRequest } = createReactionHarness({
dmPolicy: "pairing",
});
await handler(
"!room:example.org",
createMatrixReactionEvent({
eventId: "$reaction3",
targetEventId: "$msg3",
key: "🔥",
}),
);
expect(upsertPairingRequest).not.toHaveBeenCalled();
expect(enqueueSystemEvent).not.toHaveBeenCalled();
});
it("honors account-scoped reaction notification overrides", async () => {
const { handler, enqueueSystemEvent } = createReactionHarness({
cfg: {
channels: {
matrix: {
reactionNotifications: "own",
accounts: {
ops: {
reactionNotifications: "off",
},
},
},
},
},
});
await handler(
"!room:example.org",
createMatrixReactionEvent({
eventId: "$reaction4",
targetEventId: "$msg4",
key: "✅",
}),
);
expect(enqueueSystemEvent).not.toHaveBeenCalled();
});
it("drops pre-startup dm messages on cold start", async () => {
const resolveAgentRoute = vi.fn(() => ({
agentId: "ops",
channel: "matrix",
accountId: "ops",
sessionKey: "agent:ops:main",
mainSessionKey: "agent:ops:main",
matchedBy: "binding.account" as const,
}));
const { handler } = createMatrixHandlerTestHarness({
resolveAgentRoute,
isDirectMessage: true,
startupMs: 1_000,
startupGraceMs: 0,
dropPreStartupMessages: true,
});
await handler(
"!room:example.org",
createMatrixTextMessageEvent({
eventId: "$old-cold-start",
body: "hello",
originServerTs: 999,
}),
);
expect(resolveAgentRoute).not.toHaveBeenCalled();
});
it("replays pre-startup dm messages when persisted sync state exists", async () => {
const resolveAgentRoute = vi.fn(() => ({
agentId: "ops",
channel: "matrix",
accountId: "ops",
sessionKey: "agent:ops:main",
mainSessionKey: "agent:ops:main",
matchedBy: "binding.account" as const,
}));
const { handler } = createMatrixHandlerTestHarness({
resolveAgentRoute,
isDirectMessage: true,
startupMs: 1_000,
startupGraceMs: 0,
dropPreStartupMessages: false,
});
await handler(
"!room:example.org",
createMatrixTextMessageEvent({
eventId: "$old-resume",
body: "hello",
originServerTs: 999,
}),
);
expect(resolveAgentRoute).toHaveBeenCalledTimes(1);
});
});
describe("matrix monitor handler durable inbound dedupe", () => {
it("skips replayed inbound events before session recording", async () => {
const inboundDeduper = {
claimEvent: vi.fn(() => false),
commitEvent: vi.fn(async () => undefined),
releaseEvent: vi.fn(),
};
const { handler, recordInboundSession } = createMatrixHandlerTestHarness({
inboundDeduper,
dispatchReplyFromConfig: vi.fn(async () => ({
queuedFinal: true,
counts: { final: 1, block: 0, tool: 0 },
})),
});
await handler(
"!room:example.org",
createMatrixTextMessageEvent({
eventId: "$dup",
body: "hello",
}),
);
expect(inboundDeduper.claimEvent).toHaveBeenCalledWith({
roomId: "!room:example.org",
eventId: "$dup",
});
expect(recordInboundSession).not.toHaveBeenCalled();
expect(inboundDeduper.commitEvent).not.toHaveBeenCalled();
expect(inboundDeduper.releaseEvent).not.toHaveBeenCalled();
});
it("commits inbound events only after queued replies finish delivering", async () => {
const callOrder: string[] = [];
const inboundDeduper = {
claimEvent: vi.fn(() => {
callOrder.push("claim");
return true;
}),
commitEvent: vi.fn(async () => {
callOrder.push("commit");
}),
releaseEvent: vi.fn(() => {
callOrder.push("release");
}),
};
const recordInboundSession = vi.fn(async () => {
callOrder.push("record");
});
const dispatchReplyFromConfig = vi.fn(async () => {
callOrder.push("dispatch");
return {
queuedFinal: true,
counts: { final: 1, block: 0, tool: 0 },
};
});
const { handler } = createMatrixHandlerTestHarness({
inboundDeduper,
recordInboundSession,
dispatchReplyFromConfig,
createReplyDispatcherWithTyping: () => ({
dispatcher: {
markComplete: () => {
callOrder.push("mark-complete");
},
waitForIdle: async () => {
callOrder.push("wait-for-idle");
},
},
replyOptions: {},
markDispatchIdle: () => {
callOrder.push("dispatch-idle");
},
markRunComplete: () => {
callOrder.push("run-complete");
},
}),
});
await handler(
"!room:example.org",
createMatrixTextMessageEvent({
eventId: "$commit-order",
body: "hello",
}),
);
expect(callOrder).toEqual([
"claim",
"record",
"dispatch",
"run-complete",
"mark-complete",
"wait-for-idle",
"dispatch-idle",
"commit",
]);
expect(inboundDeduper.releaseEvent).not.toHaveBeenCalled();
});
it("releases a claimed event when reply dispatch fails before completion", async () => {
const inboundDeduper = {
claimEvent: vi.fn(() => true),
commitEvent: vi.fn(async () => undefined),
releaseEvent: vi.fn(),
};
const runtime = {
error: vi.fn(),
};
const { handler } = createMatrixHandlerTestHarness({
inboundDeduper,
runtime: runtime as never,
recordInboundSession: vi.fn(async () => {
throw new Error("disk failed");
}),
dispatchReplyFromConfig: vi.fn(async () => ({
queuedFinal: true,
counts: { final: 1, block: 0, tool: 0 },
})),
});
await handler(
"!room:example.org",
createMatrixTextMessageEvent({
eventId: "$release-on-error",
body: "hello",
}),
);
expect(inboundDeduper.commitEvent).not.toHaveBeenCalled();
expect(inboundDeduper.releaseEvent).toHaveBeenCalledWith({
roomId: "!room:example.org",
eventId: "$release-on-error",
});
expect(runtime.error).toHaveBeenCalledWith(expect.stringContaining("matrix handler failed"));
});
it("releases a claimed event when queued final delivery fails", async () => {
const inboundDeduper = {
claimEvent: vi.fn(() => true),
commitEvent: vi.fn(async () => undefined),
releaseEvent: vi.fn(),
};
const runtime = {
error: vi.fn(),
};
const { handler } = createMatrixHandlerTestHarness({
inboundDeduper,
runtime: runtime as never,
dispatchReplyFromConfig: vi.fn(async () => ({
queuedFinal: true,
counts: { final: 1, block: 0, tool: 0 },
})),
createReplyDispatcherWithTyping: (params) => ({
dispatcher: {
markComplete: () => {},
waitForIdle: async () => {
params?.onError?.(new Error("send failed"), { kind: "final" });
},
},
replyOptions: {},
markDispatchIdle: () => {},
markRunComplete: () => {},
}),
});
await handler(
"!room:example.org",
createMatrixTextMessageEvent({
eventId: "$release-on-final-delivery-error",
body: "hello",
}),
);
expect(inboundDeduper.commitEvent).not.toHaveBeenCalled();
expect(inboundDeduper.releaseEvent).toHaveBeenCalledWith({
roomId: "!room:example.org",
eventId: "$release-on-final-delivery-error",
});
expect(runtime.error).toHaveBeenCalledWith(
expect.stringContaining("matrix final reply failed"),
);
});
it.each(["tool", "block"] as const)(
"releases a claimed event when queued %s delivery fails and no final reply exists",
async (kind) => {
const inboundDeduper = {
claimEvent: vi.fn(() => true),
commitEvent: vi.fn(async () => undefined),
releaseEvent: vi.fn(),
};
const runtime = {
error: vi.fn(),
};
const { handler } = createMatrixHandlerTestHarness({
inboundDeduper,
runtime: runtime as never,
dispatchReplyFromConfig: vi.fn(async () => ({
queuedFinal: false,
counts: {
final: 0,
block: kind === "block" ? 1 : 0,
tool: kind === "tool" ? 1 : 0,
},
})),
createReplyDispatcherWithTyping: (params) => ({
dispatcher: {
markComplete: () => {},
waitForIdle: async () => {
params?.onError?.(new Error("send failed"), { kind });
},
},
replyOptions: {},
markDispatchIdle: () => {},
markRunComplete: () => {},
}),
});
await handler(
"!room:example.org",
createMatrixTextMessageEvent({
eventId: `$release-on-${kind}-delivery-error`,
body: "hello",
}),
);
expect(inboundDeduper.commitEvent).not.toHaveBeenCalled();
expect(inboundDeduper.releaseEvent).toHaveBeenCalledWith({
roomId: "!room:example.org",
eventId: `$release-on-${kind}-delivery-error`,
});
expect(runtime.error).toHaveBeenCalledWith(
expect.stringContaining(`matrix ${kind} reply failed`),
);
},
);
it("commits a claimed event when dispatch completes without a final reply", async () => {
const callOrder: string[] = [];
const inboundDeduper = {
claimEvent: vi.fn(() => {
callOrder.push("claim");
return true;
}),
commitEvent: vi.fn(async () => {
callOrder.push("commit");
}),
releaseEvent: vi.fn(() => {
callOrder.push("release");
}),
};
const { handler } = createMatrixHandlerTestHarness({
inboundDeduper,
recordInboundSession: vi.fn(async () => {
callOrder.push("record");
}),
dispatchReplyFromConfig: vi.fn(async () => {
callOrder.push("dispatch");
return {
queuedFinal: false,
counts: { final: 0, block: 0, tool: 0 },
};
}),
});
await handler(
"!room:example.org",
createMatrixTextMessageEvent({
eventId: "$no-final",
body: "hello",
}),
);
expect(callOrder).toEqual(["claim", "record", "dispatch", "commit"]);
expect(inboundDeduper.releaseEvent).not.toHaveBeenCalled();
});
});
describe("matrix monitor handler draft streaming", () => {
type DeliverFn = (
payload: {
text?: string;
mediaUrl?: string;
mediaUrls?: string[];
isCompactionNotice?: boolean;
replyToId?: string;
},
info: { kind: string },
) => Promise<void>;
type ReplyOpts = {
onPartialReply?: (payload: { text: string }) => void;
onAssistantMessageStart?: () => void;
disableBlockStreaming?: boolean;
};
function createStreamingHarness(opts?: { replyToMode?: "off" | "first" | "all" }) {
let capturedDeliver: DeliverFn | undefined;
let capturedReplyOpts: ReplyOpts | undefined;
// Gate that keeps the handler's model run alive until the test releases it.
let resolveRunGate: (() => void) | undefined;
const runGate = new Promise<void>((resolve) => {
resolveRunGate = resolve;
});
sendMessageMatrixMock.mockReset().mockResolvedValue({ messageId: "$draft1", roomId: "!room" });
sendSingleTextMessageMatrixMock
.mockReset()
.mockResolvedValue({ messageId: "$draft1", roomId: "!room" });
editMessageMatrixMock.mockReset().mockResolvedValue("$edited");
deliverMatrixRepliesMock.mockReset().mockResolvedValue(undefined);
const redactEventMock = vi.fn(async () => "$redacted");
const { handler } = createMatrixHandlerTestHarness({
streaming: "partial",
replyToMode: opts?.replyToMode ?? "off",
client: { redactEvent: redactEventMock },
createReplyDispatcherWithTyping: (params: Record<string, unknown> | undefined) => {
capturedDeliver = params?.deliver as DeliverFn | undefined;
return {
dispatcher: {
markComplete: () => {},
waitForIdle: async () => {},
},
replyOptions: {},
markDispatchIdle: () => {},
markRunComplete: () => {},
};
},
dispatchReplyFromConfig: vi.fn(async (args: { replyOptions?: ReplyOpts }) => {
capturedReplyOpts = args?.replyOptions;
// Block until the test is done exercising callbacks.
await runGate;
return { queuedFinal: true, counts: { final: 1, block: 0, tool: 0 } };
}) as never,
withReplyDispatcher: async <T>(params: {
dispatcher: { markComplete?: () => void; waitForIdle?: () => Promise<void> };
run: () => Promise<T>;
onSettled?: () => void | Promise<void>;
}) => {
const result = await params.run();
await params.onSettled?.();
return result;
},
});
const dispatch = async () => {
// Start handler without awaiting — it blocks on runGate.
const handlerDone = handler(
"!room:example.org",
createMatrixTextMessageEvent({ eventId: "$msg1", body: "hello" }),
);
// Wait for callbacks to be captured.
await vi.waitFor(() => {
if (!capturedDeliver || !capturedReplyOpts) {
throw new Error("Streaming callbacks not captured yet");
}
});
return {
deliver: capturedDeliver!,
opts: capturedReplyOpts!,
// Release the run gate and wait for the handler to finish
// (including the finally block that stops the draft stream).
finish: async () => {
resolveRunGate?.();
await handlerDone;
},
};
};
return { dispatch, redactEventMock };
}
it("falls back to deliverMatrixReplies when final edit fails", async () => {
const { dispatch } = createStreamingHarness();
const { deliver, opts, finish } = await dispatch();
// Simulate streaming: partial reply creates draft message.
opts.onPartialReply?.({ text: "Hello" });
// Wait for the draft stream's immediate send to complete.
await vi.waitFor(() => {
expect(sendSingleTextMessageMatrixMock).toHaveBeenCalledTimes(1);
});
// Make the final edit fail.
editMessageMatrixMock.mockRejectedValueOnce(new Error("rate limited"));
// Deliver final — should catch edit failure and fall back.
await deliver({ text: "Hello world" }, { kind: "block" });
expect(deliverMatrixRepliesMock).toHaveBeenCalledTimes(1);
await finish();
});
it("does not reset draft stream after final delivery", async () => {
vi.useFakeTimers();
try {
const { dispatch } = createStreamingHarness();
const { deliver, opts, finish } = await dispatch();
opts.onPartialReply?.({ text: "Hello" });
await vi.waitFor(() => {
expect(sendSingleTextMessageMatrixMock).toHaveBeenCalledTimes(1);
});
// Final delivery — stream should stay stopped.
await deliver({ text: "Hello" }, { kind: "final" });
// Further partial updates should NOT create new messages.
sendSingleTextMessageMatrixMock.mockClear();
opts.onPartialReply?.({ text: "Ghost" });
await vi.advanceTimersByTimeAsync(50);
expect(sendSingleTextMessageMatrixMock).not.toHaveBeenCalled();
await finish();
} finally {
vi.useRealTimers();
}
});
it("resets materializedTextLength on assistant message start", async () => {
const { dispatch } = createStreamingHarness();
const { deliver, opts, finish } = await dispatch();
// Block 1: stream and deliver.
opts.onPartialReply?.({ text: "Block one" });
await vi.waitFor(() => {
expect(sendSingleTextMessageMatrixMock).toHaveBeenCalledTimes(1);
});
await deliver({ text: "Block one" }, { kind: "block" });
// Tool call delivered (bypasses draft stream).
await deliver({ text: "tool result" }, { kind: "tool" });
// New assistant message starts — payload.text will reset upstream.
opts.onAssistantMessageStart?.();
// Block 2: partial text starts fresh (no stale offset).
sendSingleTextMessageMatrixMock.mockClear();
sendSingleTextMessageMatrixMock.mockResolvedValue({ messageId: "$draft2", roomId: "!room" });
opts.onPartialReply?.({ text: "Block two" });
await vi.waitFor(() => {
expect(sendSingleTextMessageMatrixMock).toHaveBeenCalledTimes(1);
});
// The draft stream should have received "Block two", not empty string.
const sentBody = sendSingleTextMessageMatrixMock.mock.calls[0]?.[1];
expect(sentBody).toBeTruthy();
await finish();
});
it("stops draft stream on handler error (no leaked timer)", async () => {
vi.useFakeTimers();
try {
sendSingleTextMessageMatrixMock
.mockReset()
.mockResolvedValue({ messageId: "$draft1", roomId: "!room" });
editMessageMatrixMock.mockReset().mockResolvedValue("$edited");
deliverMatrixRepliesMock.mockReset().mockResolvedValue(undefined);
let capturedReplyOpts: ReplyOpts | undefined;
const { handler } = createMatrixHandlerTestHarness({
streaming: "partial",
createReplyDispatcherWithTyping: () => ({
dispatcher: { markComplete: () => {}, waitForIdle: async () => {} },
replyOptions: {},
markDispatchIdle: () => {},
markRunComplete: () => {},
}),
dispatchReplyFromConfig: vi.fn(async (args: { replyOptions?: ReplyOpts }) => {
capturedReplyOpts = args?.replyOptions;
// Simulate streaming then model error.
capturedReplyOpts?.onPartialReply?.({ text: "partial" });
await vi.waitFor(() => {
expect(sendSingleTextMessageMatrixMock).toHaveBeenCalledTimes(1);
});
throw new Error("model timeout");
}) as never,
withReplyDispatcher: async <T>(params: {
dispatcher: { markComplete?: () => void; waitForIdle?: () => Promise<void> };
run: () => Promise<T>;
onSettled?: () => void | Promise<void>;
}) => {
const result = await params.run();
await params.onSettled?.();
return result;
},
});
// Handler should not throw (outer catch absorbs it).
await handler(
"!room:example.org",
createMatrixTextMessageEvent({ eventId: "$msg1", body: "hello" }),
);
// After handler exits, draft stream timer must not fire.
sendSingleTextMessageMatrixMock.mockClear();
editMessageMatrixMock.mockClear();
await vi.advanceTimersByTimeAsync(50);
expect(sendSingleTextMessageMatrixMock).not.toHaveBeenCalled();
expect(editMessageMatrixMock).not.toHaveBeenCalled();
} finally {
vi.useRealTimers();
}
});
it("skips compaction notices in draft finalization", async () => {
const { dispatch } = createStreamingHarness();
const { deliver, opts, finish } = await dispatch();
opts.onPartialReply?.({ text: "Streaming" });
await vi.waitFor(() => {
expect(sendSingleTextMessageMatrixMock).toHaveBeenCalledTimes(1);
});
// Compaction notice should bypass draft path and go to normal delivery.
deliverMatrixRepliesMock.mockClear();
await deliver({ text: "Compacting...", isCompactionNotice: true }, { kind: "block" });
expect(deliverMatrixRepliesMock).toHaveBeenCalledTimes(1);
// Edit should NOT have been called for the compaction notice.
expect(editMessageMatrixMock).not.toHaveBeenCalled();
await finish();
});
it("redacts stale draft when payload reply target mismatches", async () => {
const { dispatch, redactEventMock } = createStreamingHarness({ replyToMode: "first" });
const { deliver, opts, finish } = await dispatch();
// Simulate streaming: partial reply creates draft message.
opts.onPartialReply?.({ text: "Partial reply" });
await vi.waitFor(() => {
expect(sendSingleTextMessageMatrixMock).toHaveBeenCalledTimes(1);
});
// Final delivery carries a different replyToId than the draft's.
deliverMatrixRepliesMock.mockClear();
await deliver({ text: "Final text", replyToId: "$different_msg" }, { kind: "final" });
// Draft should be redacted since it can't change reply relation.
expect(redactEventMock).toHaveBeenCalledWith("!room:example.org", "$draft1");
// Final answer delivered via normal path.
expect(deliverMatrixRepliesMock).toHaveBeenCalledTimes(1);
await finish();
});
it("redacts stale draft when final payload intentionally drops reply threading", async () => {
const { dispatch, redactEventMock } = createStreamingHarness({ replyToMode: "first" });
const { deliver, opts, finish } = await dispatch();
// A tool payload can consume the first reply slot upstream while draft
// streaming for the next assistant block still starts from the original
// reply target.
await deliver({ text: "tool result", replyToId: "$msg1" }, { kind: "tool" });
opts.onAssistantMessageStart?.();
opts.onPartialReply?.({ text: "Partial reply" });
await vi.waitFor(() => {
expect(sendSingleTextMessageMatrixMock).toHaveBeenCalledTimes(1);
});
deliverMatrixRepliesMock.mockClear();
await deliver({ text: "Final text" }, { kind: "final" });
expect(redactEventMock).toHaveBeenCalledWith("!room:example.org", "$draft1");
expect(deliverMatrixRepliesMock).toHaveBeenCalledTimes(1);
await finish();
});
it("redacts stale draft for media-only finals", async () => {
const { dispatch, redactEventMock } = createStreamingHarness();
const { deliver, opts, finish } = await dispatch();
opts.onPartialReply?.({ text: "Partial reply" });
await vi.waitFor(() => {
expect(sendSingleTextMessageMatrixMock).toHaveBeenCalledTimes(1);
});
deliverMatrixRepliesMock.mockClear();
await deliver({ mediaUrl: "https://example.com/image.png" }, { kind: "final" });
expect(redactEventMock).toHaveBeenCalledWith("!room:example.org", "$draft1");
expect(deliverMatrixRepliesMock).toHaveBeenCalledTimes(1);
await finish();
});
it("redacts stale draft and sends the final once when a later preview exceeds the event limit", async () => {
const { dispatch, redactEventMock } = createStreamingHarness();
const { deliver, opts, finish } = await dispatch();
opts.onPartialReply?.({ text: "1234" });
await vi.waitFor(() => {
expect(sendSingleTextMessageMatrixMock).toHaveBeenCalledTimes(1);
});
prepareMatrixSingleTextMock.mockImplementation((text: string) => {
const trimmedText = text.trim();
return {
trimmedText,
convertedText: trimmedText,
singleEventLimit: 5,
fitsInSingleEvent: trimmedText.length <= 5,
};
});
opts.onPartialReply?.({ text: "123456" });
await deliver({ text: "123456" }, { kind: "final" });
expect(editMessageMatrixMock).not.toHaveBeenCalled();
expect(redactEventMock).toHaveBeenCalledWith("!room:example.org", "$draft1");
expect(deliverMatrixRepliesMock).toHaveBeenCalledTimes(1);
expect(sendSingleTextMessageMatrixMock).toHaveBeenCalledTimes(1);
await finish();
});
});
describe("matrix monitor handler block streaming config", () => {
it("keeps final-only delivery when draft streaming is off by default", async () => {
let capturedDisableBlockStreaming: boolean | undefined;
const { handler } = createMatrixHandlerTestHarness({
streaming: "off",
dispatchReplyFromConfig: vi.fn(
async (args: { replyOptions?: { disableBlockStreaming?: boolean } }) => {
capturedDisableBlockStreaming = args.replyOptions?.disableBlockStreaming;
return { queuedFinal: false, counts: { final: 0, block: 0, tool: 0 } };
},
) as never,
});
await handler(
"!room:example.org",
createMatrixTextMessageEvent({ eventId: "$msg1", body: "hello" }),
);
expect(capturedDisableBlockStreaming).toBe(true);
});
it("disables shared block streaming when draft streaming is partial", async () => {
let capturedDisableBlockStreaming: boolean | undefined;
const { handler } = createMatrixHandlerTestHarness({
streaming: "partial",
dispatchReplyFromConfig: vi.fn(
async (args: { replyOptions?: { disableBlockStreaming?: boolean } }) => {
capturedDisableBlockStreaming = args.replyOptions?.disableBlockStreaming;
return { queuedFinal: false, counts: { final: 0, block: 0, tool: 0 } };
},
) as never,
});
await handler(
"!room:example.org",
createMatrixTextMessageEvent({ eventId: "$msg1", body: "hello" }),
);
expect(capturedDisableBlockStreaming).toBe(true);
});
it("keeps draft streaming authoritative when partial and block streaming are both enabled", async () => {
let capturedDisableBlockStreaming: boolean | undefined;
const { handler } = createMatrixHandlerTestHarness({
streaming: "partial",
blockStreamingEnabled: true,
dispatchReplyFromConfig: vi.fn(
async (args: { replyOptions?: { disableBlockStreaming?: boolean } }) => {
capturedDisableBlockStreaming = args.replyOptions?.disableBlockStreaming;
return { queuedFinal: false, counts: { final: 0, block: 0, tool: 0 } };
},
) as never,
});
await handler(
"!room:example.org",
createMatrixTextMessageEvent({ eventId: "$msg1", body: "hello" }),
);
expect(capturedDisableBlockStreaming).toBe(true);
});
it("uses shared block streaming when explicitly enabled for Matrix", async () => {
let capturedDisableBlockStreaming: boolean | undefined;
const { handler } = createMatrixHandlerTestHarness({
streaming: "off",
blockStreamingEnabled: true,
dispatchReplyFromConfig: vi.fn(
async (args: { replyOptions?: { disableBlockStreaming?: boolean } }) => {
capturedDisableBlockStreaming = args.replyOptions?.disableBlockStreaming;
return { queuedFinal: false, counts: { final: 0, block: 0, tool: 0 } };
},
) as never,
});
await handler(
"!room:example.org",
createMatrixTextMessageEvent({ eventId: "$msg1", body: "hello" }),
);
expect(capturedDisableBlockStreaming).toBe(false);
});
});