diff --git a/CHANGELOG.md b/CHANGELOG.md index b184d7aab46..20554ebd941 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -65,6 +65,7 @@ Docs: https://docs.openclaw.ai - Agents/compaction: surface safeguard-specific cancel reasons and relabel benign manual `/compact` no-op cases as skipped instead of failed. (#51072) Thanks @afurm. - Docs: add `pnpm docs:check-links:anchors` for Mintlify anchor validation while keeping `scripts/docs-link-audit.mjs` as the stable link-audit entrypoint. (#55912) Thanks @velvet-shark. - Tavily: mark outbound API requests with `X-Client-Source: openclaw` so Tavily can attribute OpenClaw-originated traffic. (#55335) Thanks @lakshyaag-tavily. +- Matrix/streaming: add `streaming: "partial"` draft replies that stay on a single editable preview message, stop preview streaming once text no longer fits one Matrix event, and clear stale previews before media-only finals. (#56387) Thanks @jrusz. ### Fixes diff --git a/docs/.generated/config-baseline.json b/docs/.generated/config-baseline.json index 7c6a1200aaf..d84b918133e 100644 --- a/docs/.generated/config-baseline.json +++ b/docs/.generated/config-baseline.json @@ -2608,6 +2608,26 @@ "tags": [], "hasChildren": false }, + { + "path": "agents.defaults.memorySearch.store.fts", + "kind": "core", + "type": "object", + "required": false, + "deprecated": false, + "sensitive": false, + "tags": [], + "hasChildren": true + }, + { + "path": "agents.defaults.memorySearch.store.fts.tokenizer", + "kind": "core", + "type": "string", + "required": false, + "deprecated": false, + "sensitive": false, + "tags": [], + "hasChildren": false + }, { "path": "agents.defaults.memorySearch.store.path", "kind": "core", @@ -5028,6 +5048,26 @@ "tags": [], "hasChildren": false }, + { + "path": "agents.list.*.memorySearch.store.fts", + "kind": "core", + "type": "object", + "required": false, + "deprecated": false, + "sensitive": false, + "tags": [], + "hasChildren": true + }, + { + "path": 
"agents.list.*.memorySearch.store.fts.tokenizer", + "kind": "core", + "type": "string", + "required": false, + "deprecated": false, + "sensitive": false, + "tags": [], + "hasChildren": false + }, { "path": "agents.list.*.memorySearch.store.path", "kind": "core", @@ -21273,6 +21313,66 @@ ], "hasChildren": false }, + { + "path": "channels.line.accounts.*.threadBindings", + "kind": "channel", + "type": "object", + "required": false, + "deprecated": false, + "sensitive": false, + "tags": [], + "hasChildren": true + }, + { + "path": "channels.line.accounts.*.threadBindings.enabled", + "kind": "channel", + "type": "boolean", + "required": false, + "deprecated": false, + "sensitive": false, + "tags": [], + "hasChildren": false + }, + { + "path": "channels.line.accounts.*.threadBindings.idleHours", + "kind": "channel", + "type": "number", + "required": false, + "deprecated": false, + "sensitive": false, + "tags": [], + "hasChildren": false + }, + { + "path": "channels.line.accounts.*.threadBindings.maxAgeHours", + "kind": "channel", + "type": "number", + "required": false, + "deprecated": false, + "sensitive": false, + "tags": [], + "hasChildren": false + }, + { + "path": "channels.line.accounts.*.threadBindings.spawnAcpSessions", + "kind": "channel", + "type": "boolean", + "required": false, + "deprecated": false, + "sensitive": false, + "tags": [], + "hasChildren": false + }, + { + "path": "channels.line.accounts.*.threadBindings.spawnSubagentSessions", + "kind": "channel", + "type": "boolean", + "required": false, + "deprecated": false, + "sensitive": false, + "tags": [], + "hasChildren": false + }, { "path": "channels.line.accounts.*.tokenFile", "kind": "channel", @@ -21562,6 +21662,66 @@ ], "hasChildren": false }, + { + "path": "channels.line.threadBindings", + "kind": "channel", + "type": "object", + "required": false, + "deprecated": false, + "sensitive": false, + "tags": [], + "hasChildren": true + }, + { + "path": "channels.line.threadBindings.enabled", + "kind": 
"channel", + "type": "boolean", + "required": false, + "deprecated": false, + "sensitive": false, + "tags": [], + "hasChildren": false + }, + { + "path": "channels.line.threadBindings.idleHours", + "kind": "channel", + "type": "number", + "required": false, + "deprecated": false, + "sensitive": false, + "tags": [], + "hasChildren": false + }, + { + "path": "channels.line.threadBindings.maxAgeHours", + "kind": "channel", + "type": "number", + "required": false, + "deprecated": false, + "sensitive": false, + "tags": [], + "hasChildren": false + }, + { + "path": "channels.line.threadBindings.spawnAcpSessions", + "kind": "channel", + "type": "boolean", + "required": false, + "deprecated": false, + "sensitive": false, + "tags": [], + "hasChildren": false + }, + { + "path": "channels.line.threadBindings.spawnSubagentSessions", + "kind": "channel", + "type": "boolean", + "required": false, + "deprecated": false, + "sensitive": false, + "tags": [], + "hasChildren": false + }, { "path": "channels.line.tokenFile", "kind": "channel", @@ -22583,6 +22743,23 @@ "tags": [], "hasChildren": false }, + { + "path": "channels.matrix.streaming", + "kind": "channel", + "type": [ + "boolean", + "string" + ], + "required": false, + "enumValues": [ + "partial", + "off" + ], + "deprecated": false, + "sensitive": false, + "tags": [], + "hasChildren": false + }, { "path": "channels.matrix.textChunkLimit", "kind": "channel", diff --git a/docs/.generated/config-baseline.jsonl b/docs/.generated/config-baseline.jsonl index a0a69c7812c..cf1589942aa 100644 --- a/docs/.generated/config-baseline.jsonl +++ b/docs/.generated/config-baseline.jsonl @@ -1,4 +1,4 @@ -{"generatedBy":"scripts/generate-config-doc-baseline.ts","recordType":"meta","totalPaths":5576} +{"generatedBy":"scripts/generate-config-doc-baseline.ts","recordType":"meta","totalPaths":5593} 
{"recordType":"path","path":"acp","kind":"core","type":"object","required":false,"deprecated":false,"sensitive":false,"tags":["advanced"],"label":"ACP","help":"ACP runtime controls for enabling dispatch, selecting backends, constraining allowed agent targets, and tuning streamed turn projection behavior.","hasChildren":true} {"recordType":"path","path":"acp.allowedAgents","kind":"core","type":"array","required":false,"deprecated":false,"sensitive":false,"tags":["access"],"label":"ACP Allowed Agents","help":"Allowlist of ACP target agent ids permitted for ACP runtime sessions. Empty means no additional allowlist restriction.","hasChildren":true} {"recordType":"path","path":"acp.allowedAgents.*","kind":"core","type":"string","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false} @@ -217,6 +217,8 @@ {"recordType":"path","path":"agents.defaults.memorySearch.sources.*","kind":"core","type":"string","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false} {"recordType":"path","path":"agents.defaults.memorySearch.store","kind":"core","type":"object","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":true} {"recordType":"path","path":"agents.defaults.memorySearch.store.driver","kind":"core","type":"string","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false} +{"recordType":"path","path":"agents.defaults.memorySearch.store.fts","kind":"core","type":"object","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":true} +{"recordType":"path","path":"agents.defaults.memorySearch.store.fts.tokenizer","kind":"core","type":"string","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false} {"recordType":"path","path":"agents.defaults.memorySearch.store.path","kind":"core","type":"string","required":false,"deprecated":false,"sensitive":false,"tags":["storage"],"label":"Memory Search Index Path","help":"Sets where the 
SQLite memory index is stored on disk for each agent. Keep the default `~/.openclaw/memory/{agentId}.sqlite` unless you need custom storage placement or backup policy alignment.","hasChildren":false} {"recordType":"path","path":"agents.defaults.memorySearch.store.vector","kind":"core","type":"object","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":true} {"recordType":"path","path":"agents.defaults.memorySearch.store.vector.enabled","kind":"core","type":"boolean","required":false,"deprecated":false,"sensitive":false,"tags":["storage"],"label":"Memory Search Vector Index","help":"Enables the sqlite-vec extension used for vector similarity queries in memory search (default: true). Keep this enabled for normal semantic recall; disable only for debugging or fallback-only operation.","hasChildren":false} @@ -443,6 +445,8 @@ {"recordType":"path","path":"agents.list.*.memorySearch.sources.*","kind":"core","type":"string","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false} {"recordType":"path","path":"agents.list.*.memorySearch.store","kind":"core","type":"object","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":true} {"recordType":"path","path":"agents.list.*.memorySearch.store.driver","kind":"core","type":"string","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false} +{"recordType":"path","path":"agents.list.*.memorySearch.store.fts","kind":"core","type":"object","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":true} +{"recordType":"path","path":"agents.list.*.memorySearch.store.fts.tokenizer","kind":"core","type":"string","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false} {"recordType":"path","path":"agents.list.*.memorySearch.store.path","kind":"core","type":"string","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false} 
{"recordType":"path","path":"agents.list.*.memorySearch.store.vector","kind":"core","type":"object","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":true} {"recordType":"path","path":"agents.list.*.memorySearch.store.vector.enabled","kind":"core","type":"boolean","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false} @@ -1893,6 +1897,12 @@ {"recordType":"path","path":"channels.line.accounts.*.name","kind":"channel","type":"string","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false} {"recordType":"path","path":"channels.line.accounts.*.responsePrefix","kind":"channel","type":"string","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false} {"recordType":"path","path":"channels.line.accounts.*.secretFile","kind":"channel","type":"string","required":false,"deprecated":false,"sensitive":true,"tags":["auth","channels","network","security","storage"],"hasChildren":false} +{"recordType":"path","path":"channels.line.accounts.*.threadBindings","kind":"channel","type":"object","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":true} +{"recordType":"path","path":"channels.line.accounts.*.threadBindings.enabled","kind":"channel","type":"boolean","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false} +{"recordType":"path","path":"channels.line.accounts.*.threadBindings.idleHours","kind":"channel","type":"number","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false} +{"recordType":"path","path":"channels.line.accounts.*.threadBindings.maxAgeHours","kind":"channel","type":"number","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false} +{"recordType":"path","path":"channels.line.accounts.*.threadBindings.spawnAcpSessions","kind":"channel","type":"boolean","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false} 
+{"recordType":"path","path":"channels.line.accounts.*.threadBindings.spawnSubagentSessions","kind":"channel","type":"boolean","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false} {"recordType":"path","path":"channels.line.accounts.*.tokenFile","kind":"channel","type":"string","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false} {"recordType":"path","path":"channels.line.accounts.*.webhookPath","kind":"channel","type":"string","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false} {"recordType":"path","path":"channels.line.allowFrom","kind":"channel","type":"array","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":true} @@ -1918,6 +1928,12 @@ {"recordType":"path","path":"channels.line.name","kind":"channel","type":"string","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false} {"recordType":"path","path":"channels.line.responsePrefix","kind":"channel","type":"string","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false} {"recordType":"path","path":"channels.line.secretFile","kind":"channel","type":"string","required":false,"deprecated":false,"sensitive":true,"tags":["auth","channels","network","security","storage"],"hasChildren":false} +{"recordType":"path","path":"channels.line.threadBindings","kind":"channel","type":"object","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":true} +{"recordType":"path","path":"channels.line.threadBindings.enabled","kind":"channel","type":"boolean","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false} +{"recordType":"path","path":"channels.line.threadBindings.idleHours","kind":"channel","type":"number","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false} 
+{"recordType":"path","path":"channels.line.threadBindings.maxAgeHours","kind":"channel","type":"number","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false} +{"recordType":"path","path":"channels.line.threadBindings.spawnAcpSessions","kind":"channel","type":"boolean","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false} +{"recordType":"path","path":"channels.line.threadBindings.spawnSubagentSessions","kind":"channel","type":"boolean","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false} {"recordType":"path","path":"channels.line.tokenFile","kind":"channel","type":"string","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false} {"recordType":"path","path":"channels.line.webhookPath","kind":"channel","type":"string","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false} {"recordType":"path","path":"channels.matrix","kind":"channel","type":"object","required":false,"deprecated":false,"sensitive":false,"tags":["channels","network"],"label":"Matrix","help":"open protocol; install the plugin to enable.","hasChildren":true} @@ -2011,6 +2027,7 @@ {"recordType":"path","path":"channels.matrix.rooms.*.users.*","kind":"channel","type":["number","string"],"required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false} {"recordType":"path","path":"channels.matrix.startupVerification","kind":"channel","type":"string","required":false,"enumValues":["off","if-unverified"],"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false} {"recordType":"path","path":"channels.matrix.startupVerificationCooldownHours","kind":"channel","type":"number","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false} 
+{"recordType":"path","path":"channels.matrix.streaming","kind":"channel","type":["boolean","string"],"required":false,"enumValues":["partial","off"],"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false} {"recordType":"path","path":"channels.matrix.textChunkLimit","kind":"channel","type":"number","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false} {"recordType":"path","path":"channels.matrix.threadBindings","kind":"channel","type":"object","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":true} {"recordType":"path","path":"channels.matrix.threadBindings.enabled","kind":"channel","type":"boolean","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false} diff --git a/docs/channels/matrix.md b/docs/channels/matrix.md index a2dafba6742..06a03c5fc76 100644 --- a/docs/channels/matrix.md +++ b/docs/channels/matrix.md @@ -157,11 +157,36 @@ This is a practical baseline config with DM pairing, room allowlist, and E2EE en autoJoinAllowlist: ["!roomid:example.org"], threadReplies: "inbound", replyToMode: "off", + streaming: "partial", }, }, } ``` +## Streaming previews + +Matrix reply streaming is opt-in. + +Set `channels.matrix.streaming` to `"partial"` when you want OpenClaw to send a single draft reply, +edit that draft in place while the model is generating text, and then finalize it when the reply is +done: + +```json5 +{ + channels: { + matrix: { + streaming: "partial", + }, + }, +} +``` + +- `streaming: "off"` is the default. OpenClaw waits for the final reply and sends it once. +- `streaming: "partial"` creates one editable preview message instead of sending multiple partial messages. +- If the preview no longer fits in one Matrix event, OpenClaw stops preview streaming and falls back to normal final delivery. +- Media replies still send attachments normally. If a stale preview can no longer be reused safely, OpenClaw redacts it before sending the final media reply. 
+- Preview edits cost extra Matrix API calls. Leave streaming off if you want the most conservative rate-limit behavior. + ## E2EE setup ## Bot to bot rooms @@ -673,6 +698,7 @@ Live directory lookup uses the logged-in Matrix account: - `groupAllowFrom`: allowlist of user IDs for room traffic. - `groupAllowFrom` entries should be full Matrix user IDs. Unresolved names are ignored at runtime. - `replyToMode`: `off`, `first`, or `all`. +- `streaming`: `off` (default) or `partial`. `partial` enables single-message draft previews with edit-in-place updates. - `threadReplies`: `off`, `inbound`, or `always`. - `threadBindings`: per-channel overrides for thread-bound session routing and lifecycle. - `startupVerification`: automatic self-verification request mode on startup (`if-unverified`, `off`). diff --git a/extensions/matrix/src/config-schema.ts b/extensions/matrix/src/config-schema.ts index b20c6146bd5..07e5f4fe65e 100644 --- a/extensions/matrix/src/config-schema.ts +++ b/extensions/matrix/src/config-schema.ts @@ -64,6 +64,7 @@ export const MatrixConfigSchema = z.object({ allowlistOnly: z.boolean().optional(), allowBots: z.union([z.boolean(), z.literal("mentions")]).optional(), groupPolicy: GroupPolicySchema.optional(), + streaming: z.union([z.enum(["partial", "off"]), z.boolean()]).optional(), replyToMode: z.enum(["off", "first", "all"]).optional(), threadReplies: z.enum(["off", "inbound", "always"]).optional(), textChunkLimit: z.number().optional(), diff --git a/extensions/matrix/src/matrix/draft-stream.test.ts b/extensions/matrix/src/matrix/draft-stream.test.ts new file mode 100644 index 00000000000..ec5a0ab7039 --- /dev/null +++ b/extensions/matrix/src/matrix/draft-stream.test.ts @@ -0,0 +1,326 @@ +import { afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; +import type { PluginRuntime } from "../runtime-api.js"; + +const loadConfigMock = vi.fn(() => ({})); +const resolveTextChunkLimitMock = vi.fn< + (cfg: unknown, channel: unknown, 
accountId?: unknown) => number +>(() => 4000); +const resolveChunkModeMock = vi.fn<(cfg: unknown, channel: unknown, accountId?: unknown) => string>( + () => "length", +); +const chunkMarkdownTextWithModeMock = vi.fn((text: string) => (text ? [text] : [])); +const convertMarkdownTablesMock = vi.fn((text: string) => text); +const runtimeStub = { + config: { loadConfig: () => loadConfigMock() }, + channel: { + text: { + resolveTextChunkLimit: (cfg: unknown, channel: unknown, accountId?: unknown) => + resolveTextChunkLimitMock(cfg, channel, accountId), + resolveChunkMode: (cfg: unknown, channel: unknown, accountId?: unknown) => + resolveChunkModeMock(cfg, channel, accountId), + chunkMarkdownText: (text: string) => (text ? [text] : []), + chunkMarkdownTextWithMode: (text: string) => chunkMarkdownTextWithModeMock(text), + resolveMarkdownTableMode: () => "code", + convertMarkdownTables: (text: string) => convertMarkdownTablesMock(text), + }, + }, +} as unknown as PluginRuntime; + +let createMatrixDraftStream: typeof import("./draft-stream.js").createMatrixDraftStream; + +const sendMessageMock = vi.fn(); +const sendEventMock = vi.fn(); +const joinedRoomsMock = vi.fn().mockResolvedValue([]); + +function createMockClient() { + sendMessageMock.mockReset().mockResolvedValue("$evt1"); + sendEventMock.mockReset().mockResolvedValue("$evt2"); + joinedRoomsMock.mockReset().mockResolvedValue(["!room:test"]); + return { + sendMessage: sendMessageMock, + sendEvent: sendEventMock, + getJoinedRooms: joinedRoomsMock, + prepareForOneOff: vi.fn().mockResolvedValue(undefined), + start: vi.fn().mockResolvedValue(undefined), + } as unknown as import("./sdk.js").MatrixClient; +} + +beforeAll(async () => { + vi.resetModules(); + const runtimeModule = await import("../runtime.js"); + runtimeModule.setMatrixRuntime(runtimeStub); + ({ createMatrixDraftStream } = await import("./draft-stream.js")); +}); + +describe("createMatrixDraftStream", () => { + let client: ReturnType<typeof createMockClient>; + + beforeEach(() => { 
+ vi.useFakeTimers(); + client = createMockClient(); + resolveTextChunkLimitMock.mockReset().mockReturnValue(4000); + resolveChunkModeMock.mockReset().mockReturnValue("length"); + chunkMarkdownTextWithModeMock + .mockReset() + .mockImplementation((text: string) => (text ? [text] : [])); + convertMarkdownTablesMock.mockReset().mockImplementation((text: string) => text); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + it("sends a new message on first update", async () => { + const stream = createMatrixDraftStream({ + roomId: "!room:test", + client, + cfg: {} as import("../types.js").CoreConfig, + }); + + stream.update("Hello"); + await stream.flush(); + + expect(sendMessageMock).toHaveBeenCalledTimes(1); + expect(stream.eventId()).toBe("$evt1"); + }); + + it("edits the message on subsequent updates", async () => { + const stream = createMatrixDraftStream({ + roomId: "!room:test", + client, + cfg: {} as import("../types.js").CoreConfig, + }); + + stream.update("Hello"); + await stream.flush(); + expect(sendMessageMock).toHaveBeenCalledTimes(1); + + // Advance past throttle window so the next update fires immediately. + vi.advanceTimersByTime(1000); + + stream.update("Hello world"); + await stream.flush(); + + // First call = initial send, second call = edit (both go through sendMessage) + expect(sendMessageMock).toHaveBeenCalledTimes(2); + }); + + it("coalesces rapid updates within throttle window", async () => { + const stream = createMatrixDraftStream({ + roomId: "!room:test", + client, + cfg: {} as import("../types.js").CoreConfig, + }); + + stream.update("A"); + stream.update("AB"); + stream.update("ABC"); + await stream.flush(); + + // First update fires immediately (fresh throttle window), then AB/ABC + // coalesce into a single edit with the latest text. + expect(sendMessageMock).toHaveBeenCalledTimes(2); + expect(sendMessageMock.mock.calls[0][1]).toMatchObject({ body: "A" }); + // Edit uses "* " prefix per Matrix m.replace spec. 
+ expect(sendMessageMock.mock.calls[1][1]).toMatchObject({ body: "* ABC" }); + }); + + it("skips no-op updates", async () => { + const stream = createMatrixDraftStream({ + roomId: "!room:test", + client, + cfg: {} as import("../types.js").CoreConfig, + }); + + stream.update("Hello"); + await stream.flush(); + const callCount = sendMessageMock.mock.calls.length; + + vi.advanceTimersByTime(1000); + + // Same text again — should not send + stream.update("Hello"); + await stream.flush(); + expect(sendMessageMock).toHaveBeenCalledTimes(callCount); + }); + + it("ignores updates after stop", async () => { + const stream = createMatrixDraftStream({ + roomId: "!room:test", + client, + cfg: {} as import("../types.js").CoreConfig, + }); + + stream.update("Hello"); + await stream.stop(); + const callCount = sendMessageMock.mock.calls.length; + + stream.update("Ignored"); + await stream.flush(); + expect(sendMessageMock).toHaveBeenCalledTimes(callCount); + }); + + it("stop returns the event ID", async () => { + const stream = createMatrixDraftStream({ + roomId: "!room:test", + client, + cfg: {} as import("../types.js").CoreConfig, + }); + + stream.update("Hello"); + const eventId = await stream.stop(); + expect(eventId).toBe("$evt1"); + }); + + it("reset allows reuse for next block", async () => { + sendMessageMock.mockResolvedValueOnce("$first").mockResolvedValueOnce("$second"); + + const stream = createMatrixDraftStream({ + roomId: "!room:test", + client, + cfg: {} as import("../types.js").CoreConfig, + }); + + stream.update("Block 1"); + await stream.stop(); + expect(stream.eventId()).toBe("$first"); + + stream.reset(); + expect(stream.eventId()).toBeUndefined(); + + stream.update("Block 2"); + await stream.stop(); + expect(stream.eventId()).toBe("$second"); + }); + + it("stops retrying after send failure", async () => { + sendMessageMock.mockRejectedValueOnce(new Error("network error")); + + const log = vi.fn(); + const stream = createMatrixDraftStream({ + roomId: 
"!room:test", + client, + cfg: {} as import("../types.js").CoreConfig, + log, + }); + + stream.update("Hello"); + await stream.flush(); + + // Should have logged the failure + expect(log).toHaveBeenCalledWith(expect.stringContaining("send/edit failed")); + + vi.advanceTimersByTime(1000); + + // Further updates should not attempt sends (stream is stopped) + stream.update("More text"); + await stream.flush(); + + // Only the initial failed attempt + expect(sendMessageMock).toHaveBeenCalledTimes(1); + expect(stream.eventId()).toBeUndefined(); + }); + + it("skips empty/whitespace text", async () => { + const stream = createMatrixDraftStream({ + roomId: "!room:test", + client, + cfg: {} as import("../types.js").CoreConfig, + }); + + stream.update(" "); + await stream.flush(); + + expect(sendMessageMock).not.toHaveBeenCalled(); + }); + + it("stops on edit failure mid-stream", async () => { + sendMessageMock + .mockResolvedValueOnce("$evt1") // initial send succeeds + .mockRejectedValueOnce(new Error("rate limited")); // edit fails + + const log = vi.fn(); + const stream = createMatrixDraftStream({ + roomId: "!room:test", + client, + cfg: {} as import("../types.js").CoreConfig, + log, + }); + + stream.update("Hello"); + await stream.flush(); + expect(stream.eventId()).toBe("$evt1"); + + vi.advanceTimersByTime(1000); + + stream.update("Hello world"); + await stream.flush(); + expect(log).toHaveBeenCalledWith(expect.stringContaining("send/edit failed")); + + vi.advanceTimersByTime(1000); + + // Stream should be stopped — further updates are ignored + stream.update("More text"); + await stream.flush(); + expect(sendMessageMock).toHaveBeenCalledTimes(2); + }); + + it("bypasses newline chunking for the draft preview message", async () => { + resolveChunkModeMock.mockReturnValue("newline"); + chunkMarkdownTextWithModeMock.mockImplementation((text: string) => text.split("\n")); + + const stream = createMatrixDraftStream({ + roomId: "!room:test", + client, + cfg: {} as 
import("../types.js").CoreConfig, + }); + + stream.update("line 1\nline 2"); + await stream.flush(); + + expect(sendMessageMock).toHaveBeenCalledTimes(1); + expect(sendMessageMock.mock.calls[0]?.[1]).toMatchObject({ body: "line 1\nline 2" }); + }); + + it("falls back to normal delivery when preview text exceeds one Matrix event", async () => { + const log = vi.fn(); + resolveTextChunkLimitMock.mockReturnValue(5); + const stream = createMatrixDraftStream({ + roomId: "!room:test", + client, + cfg: {} as import("../types.js").CoreConfig, + log, + }); + + stream.update("123456"); + await stream.flush(); + + expect(sendMessageMock).not.toHaveBeenCalled(); + expect(stream.eventId()).toBeUndefined(); + expect(stream.mustDeliverFinalNormally()).toBe(true); + expect(log).toHaveBeenCalledWith( + expect.stringContaining("preview exceeded single-event limit"), + ); + }); + + it("uses converted Matrix text when checking the single-event preview limit", async () => { + const log = vi.fn(); + resolveTextChunkLimitMock.mockReturnValue(5); + convertMarkdownTablesMock.mockImplementation(() => "123456"); + const stream = createMatrixDraftStream({ + roomId: "!room:test", + client, + cfg: {} as import("../types.js").CoreConfig, + log, + }); + + stream.update("1234"); + await stream.flush(); + + expect(sendMessageMock).not.toHaveBeenCalled(); + expect(stream.mustDeliverFinalNormally()).toBe(true); + expect(log).toHaveBeenCalledWith( + expect.stringContaining("preview exceeded single-event limit"), + ); + }); +}); diff --git a/extensions/matrix/src/matrix/draft-stream.ts b/extensions/matrix/src/matrix/draft-stream.ts new file mode 100644 index 00000000000..851fb902001 --- /dev/null +++ b/extensions/matrix/src/matrix/draft-stream.ts @@ -0,0 +1,154 @@ +import { createDraftStreamLoop } from "openclaw/plugin-sdk/channel-lifecycle"; +import type { CoreConfig } from "../types.js"; +import type { MatrixClient } from "./sdk.js"; +import { editMessageMatrix, prepareMatrixSingleText, 
sendSingleTextMessageMatrix } from "./send.js"; + +const DEFAULT_THROTTLE_MS = 1000; + +export type MatrixDraftStream = { + /** Update the draft with the latest accumulated text for the current block. */ + update: (text: string) => void; + /** Ensure the last pending update has been sent. */ + flush: () => Promise<void>; + /** Flush and mark this block as done. Returns the event ID if a message was sent. */ + stop: () => Promise<string | undefined>; + /** Reset state for the next text block (after tool calls). */ + reset: () => void; + /** The event ID of the current draft message, if any. */ + eventId: () => string | undefined; + /** The last text successfully sent or edited. */ + lastSentText: () => string; + /** True when preview streaming must fall back to normal final delivery. */ + mustDeliverFinalNormally: () => boolean; +}; + +export function createMatrixDraftStream(params: { + roomId: string; + client: MatrixClient; + cfg: CoreConfig; + threadId?: string; + replyToId?: string; + /** When true, reset() restores the original replyToId instead of clearing it. 
*/ + preserveReplyId?: boolean; + accountId?: string; + log?: (message: string) => void; +}): MatrixDraftStream { + const { roomId, client, cfg, threadId, accountId, log } = params; + + let currentEventId: string | undefined; + let lastSentText = ""; + let stopped = false; + let sendFailed = false; + let finalizeInPlaceBlocked = false; + let replyToId = params.replyToId; + + const sendOrEdit = async (text: string): Promise<boolean> => { + const trimmed = text.trimEnd(); + if (!trimmed) { + return false; + } + const preparedText = prepareMatrixSingleText(trimmed, { cfg, accountId }); + if (!preparedText.fitsInSingleEvent) { + finalizeInPlaceBlocked = true; + if (!currentEventId) { + sendFailed = true; + } + stopped = true; + log?.( + `draft-stream: preview exceeded single-event limit (${preparedText.convertedText.length} > ${preparedText.singleEventLimit})`, + ); + return false; + } + // If the initial send failed, stop trying for this block. The deliver + // callback will fall back to deliverMatrixReplies. 
+ if (sendFailed) { + return false; + } + if (preparedText.trimmedText === lastSentText) { + return true; + } + try { + if (!currentEventId) { + const result = await sendSingleTextMessageMatrix(roomId, preparedText.trimmedText, { + client, + cfg, + replyToId, + threadId, + accountId, + }); + currentEventId = result.messageId; + lastSentText = preparedText.trimmedText; + log?.(`draft-stream: created message ${currentEventId}`); + } else { + await editMessageMatrix(roomId, currentEventId, preparedText.trimmedText, { + client, + cfg, + threadId, + accountId, + }); + lastSentText = preparedText.trimmedText; + } + return true; + } catch (err) { + log?.(`draft-stream: send/edit failed: ${String(err)}`); + const isPreviewLimitError = + err instanceof Error && err.message.startsWith("Matrix single-message text exceeds limit"); + if (isPreviewLimitError) { + // Once the preview no longer fits in one editable event, preserve the + // current preview as-is and fall back to normal final delivery. + finalizeInPlaceBlocked = true; + } + if (!currentEventId) { + // First send failed — give up for this block so the deliver callback + // falls through to normal delivery. + sendFailed = true; + } + // Signal failure so the loop stops retrying. + stopped = true; + return false; + } + }; + + const loop = createDraftStreamLoop({ + throttleMs: DEFAULT_THROTTLE_MS, + isStopped: () => stopped, + sendOrEditStreamMessage: sendOrEdit, + }); + + log?.(`draft-stream: ready (throttleMs=${DEFAULT_THROTTLE_MS})`); + + const stop = async (): Promise<string | undefined> => { + // Flush before marking stopped so the loop can drain pending text. + await loop.flush(); + stopped = true; + return currentEventId; + }; + + const reset = (): void => { + // Clear reply context unless preserveReplyId is set (replyToMode "all"), + // in which case subsequent blocks should keep replying to the original. + replyToId = params.preserveReplyId ? 
params.replyToId : undefined; + currentEventId = undefined; + lastSentText = ""; + stopped = false; + sendFailed = false; + finalizeInPlaceBlocked = false; + loop.resetPending(); + loop.resetThrottleWindow(); + }; + + return { + update: (text: string) => { + if (stopped) { + return; + } + loop.update(text); + }, + flush: loop.flush, + stop, + reset, + eventId: () => currentEventId, + lastSentText: () => lastSentText, + mustDeliverFinalNormally: () => sendFailed || finalizeInPlaceBlocked, + }; +} diff --git a/extensions/matrix/src/matrix/monitor/handler.test-helpers.ts b/extensions/matrix/src/matrix/monitor/handler.test-helpers.ts index 5d8ff42dd7f..5bd3caf40cd 100644 --- a/extensions/matrix/src/matrix/monitor/handler.test-helpers.ts +++ b/extensions/matrix/src/matrix/monitor/handler.test-helpers.ts @@ -30,6 +30,7 @@ type MatrixHandlerTestHarnessOptions = { groupPolicy?: "open" | "allowlist" | "disabled"; replyToMode?: ReplyToMode; threadReplies?: "off" | "inbound" | "always"; + streaming?: "partial" | "off"; dmEnabled?: boolean; dmPolicy?: "pairing" | "allowlist" | "open" | "disabled"; textLimit?: number; @@ -210,6 +211,7 @@ export function createMatrixHandlerTestHarness( groupPolicy: options.groupPolicy ?? "open", replyToMode: options.replyToMode ?? "off", threadReplies: options.threadReplies ?? "inbound", + streaming: options.streaming ?? "off", dmEnabled: options.dmEnabled ?? true, dmPolicy: options.dmPolicy ?? "open", textLimit: options.textLimit ?? 
8_000, diff --git a/extensions/matrix/src/matrix/monitor/handler.test.ts b/extensions/matrix/src/matrix/monitor/handler.test.ts index 283047d3ba3..ffa18c328fd 100644 --- a/extensions/matrix/src/matrix/monitor/handler.test.ts +++ b/extensions/matrix/src/matrix/monitor/handler.test.ts @@ -17,17 +17,50 @@ import { EventType } from "./types.js"; const sendMessageMatrixMock = vi.hoisted(() => vi.fn(async (..._args: unknown[]) => ({ messageId: "evt", roomId: "!room" })), ); +const sendSingleTextMessageMatrixMock = vi.hoisted(() => + vi.fn(async (..._args: unknown[]) => ({ messageId: "$draft1", roomId: "!room" })), +); +const editMessageMatrixMock = vi.hoisted(() => vi.fn(async () => "$edited")); +const prepareMatrixSingleTextMock = vi.hoisted(() => + vi.fn((text: string) => { + const trimmedText = text.trim(); + return { + trimmedText, + convertedText: trimmedText, + singleEventLimit: 4000, + fitsInSingleEvent: true, + }; + }), +); vi.mock("../send.js", () => ({ + editMessageMatrix: editMessageMatrixMock, + prepareMatrixSingleText: prepareMatrixSingleTextMock, reactMatrixMessage: vi.fn(async () => {}), sendMessageMatrix: sendMessageMatrixMock, + sendSingleTextMessageMatrix: sendSingleTextMessageMatrixMock, sendReadReceiptMatrix: vi.fn(async () => {}), sendTypingMatrix: vi.fn(async () => {}), })); +const deliverMatrixRepliesMock = vi.hoisted(() => vi.fn(async () => {})); + +vi.mock("./replies.js", () => ({ + deliverMatrixReplies: deliverMatrixRepliesMock, +})); + beforeEach(() => { sessionBindingTesting.resetSessionBindingAdaptersForTests(); installMatrixMonitorTestRuntime(); + prepareMatrixSingleTextMock.mockReset().mockImplementation((text: string) => { + const trimmedText = text.trim(); + return { + trimmedText, + convertedText: trimmedText, + singleEventLimit: 4000, + fitsInSingleEvent: true, + }; + }); }); function createReactionHarness(params?: { @@ -852,6 +885,7 @@ describe("matrix monitor handler pairing account scope", () => { groupPolicy: "open", replyToMode: 
"off", threadReplies: "inbound", + streaming: "off", dmEnabled: true, dmPolicy: "open", textLimit: 8_000, @@ -1370,3 +1404,314 @@ describe("matrix monitor handler durable inbound dedupe", () => { expect(inboundDeduper.releaseEvent).not.toHaveBeenCalled(); }); }); + +describe("matrix monitor handler draft streaming", () => { + type DeliverFn = ( + payload: { + text?: string; + mediaUrl?: string; + mediaUrls?: string[]; + isCompactionNotice?: boolean; + replyToId?: string; + }, + info: { kind: string }, + ) => Promise; + type ReplyOpts = { + onPartialReply?: (payload: { text: string }) => void; + onAssistantMessageStart?: () => void; + disableBlockStreaming?: boolean; + }; + + function createStreamingHarness(opts?: { replyToMode?: "off" | "first" | "all" }) { + let capturedDeliver: DeliverFn | undefined; + let capturedReplyOpts: ReplyOpts | undefined; + // Gate that keeps the handler's model run alive until the test releases it. + let resolveRunGate: (() => void) | undefined; + const runGate = new Promise((resolve) => { + resolveRunGate = resolve; + }); + + sendMessageMatrixMock.mockReset().mockResolvedValue({ messageId: "$draft1", roomId: "!room" }); + sendSingleTextMessageMatrixMock + .mockReset() + .mockResolvedValue({ messageId: "$draft1", roomId: "!room" }); + editMessageMatrixMock.mockReset().mockResolvedValue("$edited"); + deliverMatrixRepliesMock.mockReset().mockResolvedValue(undefined); + + const redactEventMock = vi.fn(async () => "$redacted"); + + const { handler } = createMatrixHandlerTestHarness({ + streaming: "partial", + replyToMode: opts?.replyToMode ?? 
"off", + client: { redactEvent: redactEventMock }, + createReplyDispatcherWithTyping: (params: Record | undefined) => { + capturedDeliver = params?.deliver as DeliverFn | undefined; + return { + dispatcher: { + markComplete: () => {}, + waitForIdle: async () => {}, + }, + replyOptions: {}, + markDispatchIdle: () => {}, + markRunComplete: () => {}, + }; + }, + dispatchReplyFromConfig: vi.fn(async (args: { replyOptions?: ReplyOpts }) => { + capturedReplyOpts = args?.replyOptions; + // Block until the test is done exercising callbacks. + await runGate; + return { queuedFinal: true, counts: { final: 1, block: 0, tool: 0 } }; + }) as never, + withReplyDispatcher: async (params: { + dispatcher: { markComplete?: () => void; waitForIdle?: () => Promise }; + run: () => Promise; + onSettled?: () => void | Promise; + }) => { + const result = await params.run(); + await params.onSettled?.(); + return result; + }, + }); + + const dispatch = async () => { + // Start handler without awaiting — it blocks on runGate. + const handlerDone = handler( + "!room:example.org", + createMatrixTextMessageEvent({ eventId: "$msg1", body: "hello" }), + ); + // Wait for callbacks to be captured. + await vi.waitFor(() => { + if (!capturedDeliver || !capturedReplyOpts) { + throw new Error("Streaming callbacks not captured yet"); + } + }); + return { + deliver: capturedDeliver!, + opts: capturedReplyOpts!, + // Release the run gate and wait for the handler to finish + // (including the finally block that stops the draft stream). + finish: async () => { + resolveRunGate?.(); + await handlerDone; + }, + }; + }; + + return { dispatch, redactEventMock }; + } + + it("falls back to deliverMatrixReplies when final edit fails", async () => { + const { dispatch } = createStreamingHarness(); + const { deliver, opts, finish } = await dispatch(); + + // Simulate streaming: partial reply creates draft message. 
+ opts.onPartialReply?.({ text: "Hello" }); + // Wait for the draft stream's immediate send to complete. + await vi.waitFor(() => { + expect(sendSingleTextMessageMatrixMock).toHaveBeenCalledTimes(1); + }); + + // Make the final edit fail. + editMessageMatrixMock.mockRejectedValueOnce(new Error("rate limited")); + + // Deliver final — should catch edit failure and fall back. + await deliver({ text: "Hello world" }, { kind: "block" }); + + expect(deliverMatrixRepliesMock).toHaveBeenCalledTimes(1); + await finish(); + }); + + it("does not reset draft stream after final delivery", async () => { + vi.useFakeTimers(); + try { + const { dispatch } = createStreamingHarness(); + const { deliver, opts, finish } = await dispatch(); + + opts.onPartialReply?.({ text: "Hello" }); + await vi.waitFor(() => { + expect(sendSingleTextMessageMatrixMock).toHaveBeenCalledTimes(1); + }); + + // Final delivery — stream should stay stopped. + await deliver({ text: "Hello" }, { kind: "final" }); + + // Further partial updates should NOT create new messages. + sendSingleTextMessageMatrixMock.mockClear(); + opts.onPartialReply?.({ text: "Ghost" }); + + await vi.advanceTimersByTimeAsync(50); + expect(sendSingleTextMessageMatrixMock).not.toHaveBeenCalled(); + await finish(); + } finally { + vi.useRealTimers(); + } + }); + + it("resets materializedTextLength on assistant message start", async () => { + const { dispatch } = createStreamingHarness(); + const { deliver, opts, finish } = await dispatch(); + + // Block 1: stream and deliver. + opts.onPartialReply?.({ text: "Block one" }); + await vi.waitFor(() => { + expect(sendSingleTextMessageMatrixMock).toHaveBeenCalledTimes(1); + }); + await deliver({ text: "Block one" }, { kind: "block" }); + + // Tool call delivered (bypasses draft stream). + await deliver({ text: "tool result" }, { kind: "tool" }); + + // New assistant message starts — payload.text will reset upstream. 
+ opts.onAssistantMessageStart?.(); + + // Block 2: partial text starts fresh (no stale offset). + sendSingleTextMessageMatrixMock.mockClear(); + sendSingleTextMessageMatrixMock.mockResolvedValue({ messageId: "$draft2", roomId: "!room" }); + + opts.onPartialReply?.({ text: "Block two" }); + await vi.waitFor(() => { + expect(sendSingleTextMessageMatrixMock).toHaveBeenCalledTimes(1); + }); + + // The draft stream should have received "Block two", not empty string. + const sentBody = sendSingleTextMessageMatrixMock.mock.calls[0]?.[1]; + expect(sentBody).toBeTruthy(); + await finish(); + }); + + it("stops draft stream on handler error (no leaked timer)", async () => { + vi.useFakeTimers(); + try { + sendSingleTextMessageMatrixMock + .mockReset() + .mockResolvedValue({ messageId: "$draft1", roomId: "!room" }); + editMessageMatrixMock.mockReset().mockResolvedValue("$edited"); + deliverMatrixRepliesMock.mockReset().mockResolvedValue(undefined); + + let capturedReplyOpts: ReplyOpts | undefined; + + const { handler } = createMatrixHandlerTestHarness({ + streaming: "partial", + createReplyDispatcherWithTyping: () => ({ + dispatcher: { markComplete: () => {}, waitForIdle: async () => {} }, + replyOptions: {}, + markDispatchIdle: () => {}, + markRunComplete: () => {}, + }), + dispatchReplyFromConfig: vi.fn(async (args: { replyOptions?: ReplyOpts }) => { + capturedReplyOpts = args?.replyOptions; + // Simulate streaming then model error. + capturedReplyOpts?.onPartialReply?.({ text: "partial" }); + await vi.waitFor(() => { + expect(sendSingleTextMessageMatrixMock).toHaveBeenCalledTimes(1); + }); + throw new Error("model timeout"); + }) as never, + withReplyDispatcher: async (params: { + dispatcher: { markComplete?: () => void; waitForIdle?: () => Promise }; + run: () => Promise; + onSettled?: () => void | Promise; + }) => { + const result = await params.run(); + await params.onSettled?.(); + return result; + }, + }); + + // Handler should not throw (outer catch absorbs it). 
+ await handler( + "!room:example.org", + createMatrixTextMessageEvent({ eventId: "$msg1", body: "hello" }), + ); + + // After handler exits, draft stream timer must not fire. + sendSingleTextMessageMatrixMock.mockClear(); + editMessageMatrixMock.mockClear(); + await vi.advanceTimersByTimeAsync(50); + expect(sendSingleTextMessageMatrixMock).not.toHaveBeenCalled(); + expect(editMessageMatrixMock).not.toHaveBeenCalled(); + } finally { + vi.useRealTimers(); + } + }); + + it("skips compaction notices in draft finalization", async () => { + const { dispatch } = createStreamingHarness(); + const { deliver, opts, finish } = await dispatch(); + + opts.onPartialReply?.({ text: "Streaming" }); + await vi.waitFor(() => { + expect(sendSingleTextMessageMatrixMock).toHaveBeenCalledTimes(1); + }); + + // Compaction notice should bypass draft path and go to normal delivery. + deliverMatrixRepliesMock.mockClear(); + await deliver({ text: "Compacting...", isCompactionNotice: true }, { kind: "block" }); + + expect(deliverMatrixRepliesMock).toHaveBeenCalledTimes(1); + // Edit should NOT have been called for the compaction notice. + expect(editMessageMatrixMock).not.toHaveBeenCalled(); + await finish(); + }); + + it("redacts stale draft when payload reply target mismatches", async () => { + const { dispatch, redactEventMock } = createStreamingHarness({ replyToMode: "first" }); + const { deliver, opts, finish } = await dispatch(); + + // Simulate streaming: partial reply creates draft message. + opts.onPartialReply?.({ text: "Partial reply" }); + await vi.waitFor(() => { + expect(sendSingleTextMessageMatrixMock).toHaveBeenCalledTimes(1); + }); + + // Final delivery carries a different replyToId than the draft's. + deliverMatrixRepliesMock.mockClear(); + await deliver({ text: "Final text", replyToId: "$different_msg" }, { kind: "final" }); + + // Draft should be redacted since it can't change reply relation. 
+ expect(redactEventMock).toHaveBeenCalledWith("!room:example.org", "$draft1"); + // Final answer delivered via normal path. + expect(deliverMatrixRepliesMock).toHaveBeenCalledTimes(1); + await finish(); + }); + + it("redacts stale draft when final payload intentionally drops reply threading", async () => { + const { dispatch, redactEventMock } = createStreamingHarness({ replyToMode: "first" }); + const { deliver, opts, finish } = await dispatch(); + + // A tool payload can consume the first reply slot upstream while draft + // streaming for the next assistant block still starts from the original + // reply target. + await deliver({ text: "tool result", replyToId: "$msg1" }, { kind: "tool" }); + opts.onAssistantMessageStart?.(); + + opts.onPartialReply?.({ text: "Partial reply" }); + await vi.waitFor(() => { + expect(sendSingleTextMessageMatrixMock).toHaveBeenCalledTimes(1); + }); + + deliverMatrixRepliesMock.mockClear(); + await deliver({ text: "Final text" }, { kind: "final" }); + + expect(redactEventMock).toHaveBeenCalledWith("!room:example.org", "$draft1"); + expect(deliverMatrixRepliesMock).toHaveBeenCalledTimes(1); + await finish(); + }); + + it("redacts stale draft for media-only finals", async () => { + const { dispatch, redactEventMock } = createStreamingHarness(); + const { deliver, opts, finish } = await dispatch(); + + opts.onPartialReply?.({ text: "Partial reply" }); + await vi.waitFor(() => { + expect(sendSingleTextMessageMatrixMock).toHaveBeenCalledTimes(1); + }); + + deliverMatrixRepliesMock.mockClear(); + await deliver({ mediaUrl: "https://example.com/image.png" }, { kind: "final" }); + + expect(redactEventMock).toHaveBeenCalledWith("!room:example.org", "$draft1"); + expect(deliverMatrixRepliesMock).toHaveBeenCalledTimes(1); + await finish(); + }); +}); diff --git a/extensions/matrix/src/matrix/monitor/handler.ts b/extensions/matrix/src/matrix/monitor/handler.ts index 508023344a3..305d7f0a321 100644 --- 
a/extensions/matrix/src/matrix/monitor/handler.ts +++ b/extensions/matrix/src/matrix/monitor/handler.ts @@ -14,6 +14,7 @@ import { type RuntimeLogger, } from "../../runtime-api.js"; import type { CoreConfig, MatrixRoomConfig, ReplyToMode } from "../../types.js"; +import { createMatrixDraftStream } from "../draft-stream.js"; import { formatMatrixMediaUnavailableText } from "../media-text.js"; import { fetchMatrixPollSnapshot } from "../poll-summary.js"; import { @@ -24,6 +25,7 @@ import { } from "../poll-types.js"; import type { LocationMessageEventContent, MatrixClient } from "../sdk.js"; import { + editMessageMatrix, reactMatrixMessage, sendMessageMatrix, sendReadReceiptMatrix, @@ -68,6 +70,7 @@ export type MatrixMonitorHandlerParams = { groupPolicy: "open" | "allowlist" | "disabled"; replyToMode: ReplyToMode; threadReplies: "off" | "inbound" | "always"; + streaming: "partial" | "off"; dmEnabled: boolean; dmPolicy: "open" | "pairing" | "allowlist" | "disabled"; textLimit: number; @@ -160,6 +163,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam groupPolicy, replyToMode, threadReplies, + streaming, dmEnabled, dmPolicy, textLimit, @@ -231,6 +235,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam return async (roomId: string, event: MatrixRawEvent) => { const eventId = typeof event.event_id === "string" ? event.event_id.trim() : ""; let claimedInboundEvent = false; + let draftStreamRef: ReturnType | undefined; try { const eventType = event.type; if (eventType === EventType.RoomMessageEncrypted) { @@ -904,24 +909,202 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam }); }, }); + const streamingEnabled = streaming === "partial"; + const draftReplyToId = replyToMode !== "off" && !threadTarget ? _messageId : undefined; + let currentDraftReplyToId = draftReplyToId; + const draftStream = streamingEnabled + ? 
createMatrixDraftStream({ + roomId, + client, + cfg, + threadId: threadTarget, + replyToId: draftReplyToId, + preserveReplyId: replyToMode === "all", + accountId: _route.accountId, + log: logVerboseMessage, + }) + : undefined; + draftStreamRef = draftStream; + // Track how much of the full accumulated text has been materialized + // (delivered) so each new block only streams the new portion. + let materializedTextLength = 0; + let lastPartialFullTextLength = 0; + // Set after the first final payload consumes the draft event so + // subsequent finals go through normal delivery. + let draftConsumed = false; + const { dispatcher, replyOptions, markDispatchIdle, markRunComplete } = core.channel.reply.createReplyDispatcherWithTyping({ ...prefixOptions, humanDelay: core.channel.reply.resolveHumanDelayConfig(cfg, _route.agentId), - deliver: async (payload: ReplyPayload) => { - await deliverMatrixReplies({ - cfg, - replies: [payload], - roomId, - client, - runtime, - textLimit, - replyToMode, - threadId: threadTarget, - accountId: _route.accountId, - mediaLocalRoots, - tableMode, - }); + deliver: async (payload: ReplyPayload, info: { kind: string }) => { + if (draftStream && info.kind !== "tool" && !payload.isCompactionNotice) { + const hasMedia = Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0; + + await draftStream.stop(); + + // After the first final payload consumes the draft, subsequent + // finals must go through normal delivery to avoid overwriting. + if (draftConsumed) { + await deliverMatrixReplies({ + cfg, + replies: [payload], + roomId, + client, + runtime, + textLimit, + replyToMode, + threadId: threadTarget, + accountId: _route.accountId, + mediaLocalRoots, + tableMode, + }); + return; + } + + // Read event id after stop() — flush may have created the + // initial message while draining pending text. 
+ const draftEventId = draftStream.eventId(); + + // If the payload carries a reply target that differs from the + // draft's, fall through to normal delivery — Matrix edits + // cannot change the reply relation on an existing event. + // Skip when replyToMode is "off" (replies stripped anyway) + // or when threadTarget is set (thread relations take + // precedence over replyToId in deliverMatrixReplies). + const payloadReplyToId = payload.replyToId?.trim() || undefined; + const payloadReplyMismatch = + replyToMode !== "off" && + !threadTarget && + payloadReplyToId !== currentDraftReplyToId; + const mustDeliverFinalNormally = draftStream.mustDeliverFinalNormally(); + + if ( + draftEventId && + payload.text && + !hasMedia && + !payloadReplyMismatch && + !mustDeliverFinalNormally + ) { + // Text-only: final edit of the draft message. Skip if + // stop() already flushed identical text to avoid a + // redundant API call that wastes rate-limit budget. + if (payload.text !== draftStream.lastSentText()) { + try { + await editMessageMatrix(roomId, draftEventId, payload.text, { + client, + cfg, + threadId: threadTarget, + accountId: _route.accountId, + }); + } catch { + // Edit failed (rate limit, server error) — redact the + // stale draft and fall back to normal delivery so the + // user still gets the final answer. + await client.redactEvent(roomId, draftEventId).catch(() => {}); + await deliverMatrixReplies({ + cfg, + replies: [payload], + roomId, + client, + runtime, + textLimit, + replyToMode, + threadId: threadTarget, + accountId: _route.accountId, + mediaLocalRoots, + tableMode, + }); + } + } + draftConsumed = true; + } else if (draftEventId && hasMedia && !payloadReplyMismatch) { + // Media payload: finalize draft text, send media separately. 
+ let textEditOk = !mustDeliverFinalNormally; + if (textEditOk && payload.text && payload.text !== draftStream.lastSentText()) { + textEditOk = await editMessageMatrix(roomId, draftEventId, payload.text, { + client, + cfg, + threadId: threadTarget, + accountId: _route.accountId, + }).then( + () => true, + () => false, + ); + } + const reusesDraftAsFinalText = Boolean(payload.text?.trim()) && textEditOk; + // If the text edit failed, or there is no final text to reuse + // the preview, redact the stale draft and include text in media + // delivery so the final caption is not lost. + if (!reusesDraftAsFinalText) { + await client.redactEvent(roomId, draftEventId).catch(() => {}); + } + await deliverMatrixReplies({ + cfg, + replies: [ + { ...payload, text: reusesDraftAsFinalText ? undefined : payload.text }, + ], + roomId, + client, + runtime, + textLimit, + replyToMode, + threadId: threadTarget, + accountId: _route.accountId, + mediaLocalRoots, + tableMode, + }); + draftConsumed = true; + } else { + // Redact stale draft when the final delivery will create a + // new message (reply-target mismatch, preview overflow, or no + // usable draft). + if (draftEventId && (payloadReplyMismatch || mustDeliverFinalNormally)) { + await client.redactEvent(roomId, draftEventId).catch(() => {}); + } + await deliverMatrixReplies({ + cfg, + replies: [payload], + roomId, + client, + runtime, + textLimit, + replyToMode, + threadId: threadTarget, + accountId: _route.accountId, + mediaLocalRoots, + tableMode, + }); + } + + // Only reset for intermediate blocks — after the final delivery + // the stream must stay stopped so late async callbacks cannot + // create ghost messages. + if (info.kind === "block") { + materializedTextLength = lastPartialFullTextLength; + draftConsumed = false; + draftStream.reset(); + currentDraftReplyToId = replyToMode === "all" ? draftReplyToId : undefined; + + // Re-assert typing so the user still sees the indicator while + // the next block generates. 
+ await sendTypingMatrix(roomId, true, undefined, client).catch(() => {}); + } + } else { + await deliverMatrixReplies({ + cfg, + replies: [payload], + roomId, + client, + runtime, + textLimit, + replyToMode, + threadId: threadTarget, + accountId: _route.accountId, + mediaLocalRoots, + tableMode, + }); + } }, onError: (err: unknown, info: { kind: "tool" | "block" | "final" }) => { if (info.kind === "final") { @@ -949,6 +1132,28 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam replyOptions: { ...replyOptions, skillFilter: roomConfig?.skills, + // When streaming is active, disable block streaming — draft + // streaming replaces it with edit-in-place updates. + disableBlockStreaming: streamingEnabled ? true : undefined, + onPartialReply: draftStream + ? (payload) => { + const fullText = payload.text ?? ""; + lastPartialFullTextLength = fullText.length; + const blockText = fullText.slice(materializedTextLength); + if (blockText) { + draftStream.update(blockText); + } + } + : undefined, + // Reset text offset on assistant message boundaries so + // post-tool blocks stream correctly (payload.text resets + // per assistant message upstream). + onAssistantMessageStart: draftStream + ? () => { + materializedTextLength = 0; + lastPartialFullTextLength = 0; + } + : undefined, onModelSelected, }, }); @@ -981,6 +1186,11 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam } catch (err) { runtime.error?.(`matrix handler failed: ${String(err)}`); } finally { + // Stop the draft stream timer so partial drafts don't leak if the + // model run throws or times out mid-stream. 
+ if (draftStreamRef) { + await draftStreamRef.stop().catch(() => {}); + } if (claimedInboundEvent && inboundDeduper && eventId) { inboundDeduper.releaseEvent({ roomId, eventId }); } diff --git a/extensions/matrix/src/matrix/monitor/index.ts b/extensions/matrix/src/matrix/monitor/index.ts index 035c346b642..9917ac7f309 100644 --- a/extensions/matrix/src/matrix/monitor/index.ts +++ b/extensions/matrix/src/matrix/monitor/index.ts @@ -199,6 +199,8 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi const textLimit = core.channel.text.resolveTextChunkLimit(cfg, "matrix", account.accountId); const mediaMaxMb = opts.mediaMaxMb ?? accountConfig.mediaMaxMb ?? DEFAULT_MEDIA_MAX_MB; const mediaMaxBytes = Math.max(1, mediaMaxMb) * 1024 * 1024; + const streaming: "partial" | "off" = + accountConfig.streaming === true || accountConfig.streaming === "partial" ? "partial" : "off"; const startupMs = Date.now(); const startupGraceMs = 0; // Cold starts should ignore old room history, but once we have a persisted @@ -227,6 +229,7 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi groupPolicy, replyToMode, threadReplies, + streaming, dmEnabled, dmPolicy, textLimit, diff --git a/extensions/matrix/src/matrix/monitor/replies.test.ts b/extensions/matrix/src/matrix/monitor/replies.test.ts index 92146fa4901..4534d7becc4 100644 --- a/extensions/matrix/src/matrix/monitor/replies.test.ts +++ b/extensions/matrix/src/matrix/monitor/replies.test.ts @@ -3,8 +3,18 @@ import type { PluginRuntime, RuntimeEnv } from "../../../runtime-api.js"; import type { MatrixClient } from "../sdk.js"; const sendMessageMatrixMock = vi.hoisted(() => vi.fn().mockResolvedValue({ messageId: "mx-1" })); +const chunkMatrixTextMock = vi.hoisted(() => + vi.fn((text: string, _opts?: unknown) => ({ + trimmedText: text.trim(), + convertedText: text, + singleEventLimit: 4000, + fitsInSingleEvent: true, + chunks: text ? 
[text] : [], + })), +); vi.mock("../send.js", () => ({ + chunkMatrixText: (text: string, opts?: unknown) => chunkMatrixTextMock(text, opts), sendMessageMatrix: (to: string, message: string, opts?: unknown) => sendMessageMatrixMock(to, message, opts), })); @@ -48,11 +58,23 @@ describe("deliverMatrixReplies", () => { beforeEach(() => { vi.clearAllMocks(); setMatrixRuntime(runtimeStub); - chunkMarkdownTextWithModeMock.mockImplementation((text: string) => [text]); + chunkMatrixTextMock.mockReset().mockImplementation((text: string) => ({ + trimmedText: text.trim(), + convertedText: text, + singleEventLimit: 4000, + fitsInSingleEvent: true, + chunks: text ? [text] : [], + })); }); it("keeps replyToId on first reply only when replyToMode=first", async () => { - chunkMarkdownTextWithModeMock.mockImplementation((text: string) => text.split("|")); + chunkMatrixTextMock.mockImplementation((text: string) => ({ + trimmedText: text.trim(), + convertedText: text, + singleEventLimit: 4000, + fitsInSingleEvent: true, + chunks: text.split("|"), + })); await deliverMatrixReplies({ cfg, @@ -124,7 +146,13 @@ describe("deliverMatrixReplies", () => { }); it("suppresses replyToId when threadId is set", async () => { - chunkMarkdownTextWithModeMock.mockImplementation((text: string) => text.split("|")); + chunkMatrixTextMock.mockImplementation((text: string) => ({ + trimmedText: text.trim(), + convertedText: text, + singleEventLimit: 4000, + fitsInSingleEvent: true, + chunks: text.split("|"), + })); await deliverMatrixReplies({ cfg, @@ -197,7 +225,11 @@ describe("deliverMatrixReplies", () => { }); expect(loadConfigMock).not.toHaveBeenCalled(); - expect(resolveChunkModeMock).toHaveBeenCalledWith(explicitCfg, "matrix", "ops"); + expect(chunkMatrixTextMock).toHaveBeenCalledWith("hello", { + cfg: explicitCfg, + accountId: "ops", + tableMode: "code", + }); expect(sendMessageMatrixMock).toHaveBeenCalledWith( "room:4", "hello", @@ -208,4 +240,26 @@ describe("deliverMatrixReplies", () => { }), ); 
}); + + it("passes raw media captions through to sendMessageMatrix without pre-converting them", async () => { + convertMarkdownTablesMock.mockImplementation((text: string) => `converted:${text}`); + + await deliverMatrixReplies({ + cfg, + replies: [{ text: "caption", mediaUrl: "https://example.com/a.jpg" }], + roomId: "room:6", + client: {} as MatrixClient, + runtime: runtimeEnv, + textLimit: 4000, + replyToMode: "off", + }); + + expect(sendMessageMatrixMock).toHaveBeenCalledWith( + "room:6", + "caption", + expect.objectContaining({ + mediaUrl: "https://example.com/a.jpg", + }), + ); + }); }); diff --git a/extensions/matrix/src/matrix/monitor/replies.ts b/extensions/matrix/src/matrix/monitor/replies.ts index 182d7d208f5..5e4f5d3f821 100644 --- a/extensions/matrix/src/matrix/monitor/replies.ts +++ b/extensions/matrix/src/matrix/monitor/replies.ts @@ -6,7 +6,7 @@ import type { } from "../../runtime-api.js"; import { getMatrixRuntime } from "../../runtime.js"; import type { MatrixClient } from "../sdk.js"; -import { sendMessageMatrix } from "../send.js"; +import { chunkMatrixText, sendMessageMatrix } from "../send.js"; const THINKING_TAG_RE = /<\s*\/?\s*(?:think(?:ing)?|thought|antthinking)\b[^<>]*>/gi; const THINKING_BLOCK_RE = @@ -59,8 +59,6 @@ export async function deliverMatrixReplies(params: { params.runtime.log?.(message); } }; - const chunkLimit = Math.min(params.textLimit, 4000); - const chunkMode = core.channel.text.resolveChunkMode(params.cfg, "matrix", params.accountId); let hasReplied = false; for (const reply of params.replies) { if (reply.isReasoning === true || shouldSuppressReasoningReplyText(reply.text)) { @@ -79,7 +77,6 @@ export async function deliverMatrixReplies(params: { const replyToIdRaw = reply.replyToId?.trim(); const replyToId = params.threadId || params.replyToMode === "off" ? undefined : replyToIdRaw; const rawText = reply.text ?? 
""; - const text = core.channel.text.convertMarkdownTables(rawText, tableMode); const mediaList = reply.mediaUrls?.length ? reply.mediaUrls : reply.mediaUrl @@ -92,11 +89,12 @@ export async function deliverMatrixReplies(params: { if (mediaList.length === 0) { let sentTextChunk = false; - for (const chunk of core.channel.text.chunkMarkdownTextWithMode( - text, - chunkLimit, - chunkMode, - )) { + const { chunks } = chunkMatrixText(rawText, { + cfg: params.cfg, + accountId: params.accountId, + tableMode, + }); + for (const chunk of chunks) { const trimmed = chunk.trim(); if (!trimmed) { continue; @@ -118,7 +116,7 @@ export async function deliverMatrixReplies(params: { let first = true; for (const mediaUrl of mediaList) { - const caption = first ? text : ""; + const caption = first ? rawText : ""; await sendMessageMatrix(params.roomId, caption, { client: params.client, cfg: params.cfg, diff --git a/extensions/matrix/src/matrix/send.test.ts b/extensions/matrix/src/matrix/send.test.ts index 8f91f2e6374..45c70a39b19 100644 --- a/extensions/matrix/src/matrix/send.test.ts +++ b/extensions/matrix/src/matrix/send.test.ts @@ -17,6 +17,8 @@ const isVoiceCompatibleAudioMock = vi.fn( const resolveTextChunkLimitMock = vi.fn< (cfg: unknown, channel: unknown, accountId?: unknown) => number >(() => 4000); +const resolveMarkdownTableModeMock = vi.fn(() => "code"); +const convertMarkdownTablesMock = vi.fn((text: string) => text); const runtimeStub = { config: { @@ -37,13 +39,14 @@ const runtimeStub = { resolveChunkMode: () => "length", chunkMarkdownText: (text: string) => (text ? [text] : []), chunkMarkdownTextWithMode: (text: string) => (text ? 
[text] : []), - resolveMarkdownTableMode: () => "code", - convertMarkdownTables: (text: string) => text, + resolveMarkdownTableMode: () => resolveMarkdownTableModeMock(), + convertMarkdownTables: (text: string) => convertMarkdownTablesMock(text), }, }, } as unknown as PluginRuntime; let sendMessageMatrix: typeof import("./send.js").sendMessageMatrix; +let sendSingleTextMessageMatrix: typeof import("./send.js").sendSingleTextMessageMatrix; let sendTypingMatrix: typeof import("./send.js").sendTypingMatrix; let voteMatrixPoll: typeof import("./actions/polls.js").voteMatrixPoll; @@ -52,6 +55,7 @@ async function loadMatrixSendModules() { const runtimeModule = await import("../runtime.js"); runtimeModule.setMatrixRuntime(runtimeStub); ({ sendMessageMatrix } = await import("./send.js")); + ({ sendSingleTextMessageMatrix } = await import("./send.js")); ({ sendTypingMatrix } = await import("./send.js")); ({ voteMatrixPoll } = await import("./actions/polls.js")); } @@ -127,6 +131,8 @@ describe("sendMessageMatrix media", () => { mediaKindFromMimeMock.mockReset().mockReturnValue("image"); isVoiceCompatibleAudioMock.mockReset().mockReturnValue(false); resolveTextChunkLimitMock.mockReset().mockReturnValue(4000); + resolveMarkdownTableModeMock.mockReset().mockReturnValue("code"); + convertMarkdownTablesMock.mockReset().mockImplementation((text: string) => text); await loadMatrixSendModules(); }); @@ -404,6 +410,29 @@ describe("sendMessageMatrix threads", () => { }); }); +describe("sendSingleTextMessageMatrix", () => { + beforeEach(async () => { + vi.clearAllMocks(); + await resetMatrixSendRuntimeMocks(); + resolveMarkdownTableModeMock.mockReset().mockReturnValue("code"); + convertMarkdownTablesMock.mockReset().mockImplementation((text: string) => text); + }); + + it("rejects single-event sends when converted text exceeds the Matrix limit", async () => { + const { client, sendMessage } = makeClient(); + resolveTextChunkLimitMock.mockReturnValue(5); + 
convertMarkdownTablesMock.mockImplementation(() => "123456"); + + await expect( + sendSingleTextMessageMatrix("room:!room:example", "1234", { + client, + }), + ).rejects.toThrow("Matrix single-message text exceeds limit"); + + expect(sendMessage).not.toHaveBeenCalled(); + }); +}); + describe("voteMatrixPoll", () => { beforeAll(async () => { await loadMatrixSendModules(); diff --git a/extensions/matrix/src/matrix/send.ts b/extensions/matrix/src/matrix/send.ts index 9ffd22061a1..be905f065ce 100644 --- a/extensions/matrix/src/matrix/send.ts +++ b/extensions/matrix/src/matrix/send.ts @@ -1,4 +1,4 @@ -import type { PollInput } from "../runtime-api.js"; +import type { MarkdownTableMode, PollInput } from "../runtime-api.js"; import { getMatrixRuntime } from "../runtime.js"; import type { CoreConfig } from "../types.js"; import { buildPollStartContent, M_POLL_START } from "./poll-types.js"; @@ -22,6 +22,7 @@ import { normalizeThreadId, resolveMatrixRoomId } from "./send/targets.js"; import { EventType, MsgType, + RelationType, type MatrixOutboundContent, type MatrixSendOpts, type MatrixSendResult, @@ -33,6 +34,17 @@ const getCore = () => getMatrixRuntime(); export type { MatrixSendOpts, MatrixSendResult } from "./send/types.js"; export { resolveMatrixRoomId } from "./send/targets.js"; +export type MatrixPreparedSingleText = { + trimmedText: string; + convertedText: string; + singleEventLimit: number; + fitsInSingleEvent: boolean; +}; + +export type MatrixPreparedChunkedText = MatrixPreparedSingleText & { + chunks: string[]; +}; + type MatrixClientResolveOpts = { client?: MatrixClient; cfg?: CoreConfig; @@ -61,6 +73,57 @@ function normalizeMatrixClientResolveOpts( }; } +export function prepareMatrixSingleText( + text: string, + opts: { + cfg?: CoreConfig; + accountId?: string; + tableMode?: MarkdownTableMode; + } = {}, +): MatrixPreparedSingleText { + const trimmedText = text.trim(); + const cfg = opts.cfg ?? 
getCore().config.loadConfig(); + const tableMode = + opts.tableMode ?? + getCore().channel.text.resolveMarkdownTableMode({ + cfg, + channel: "matrix", + accountId: opts.accountId, + }); + const convertedText = getCore().channel.text.convertMarkdownTables(trimmedText, tableMode); + const singleEventLimit = Math.min( + getCore().channel.text.resolveTextChunkLimit(cfg, "matrix", opts.accountId), + MATRIX_TEXT_LIMIT, + ); + return { + trimmedText, + convertedText, + singleEventLimit, + fitsInSingleEvent: convertedText.length <= singleEventLimit, + }; +} + +export function chunkMatrixText( + text: string, + opts: { + cfg?: CoreConfig; + accountId?: string; + tableMode?: MarkdownTableMode; + } = {}, +): MatrixPreparedChunkedText { + const preparedText = prepareMatrixSingleText(text, opts); + const cfg = opts.cfg ?? getCore().config.loadConfig(); + const chunkMode = getCore().channel.text.resolveChunkMode(cfg, "matrix", opts.accountId); + return { + ...preparedText, + chunks: getCore().channel.text.chunkMarkdownTextWithMode( + preparedText.convertedText, + preparedText.singleEventLimit, + chunkMode, + ), + }; +} + export async function sendMessageMatrix( to: string, message: string | undefined, @@ -80,23 +143,10 @@ export async function sendMessageMatrix( async (client) => { const roomId = await resolveMatrixRoomId(client, to); const cfg = opts.cfg ?? 
getCore().config.loadConfig(); - const tableMode = getCore().channel.text.resolveMarkdownTableMode({ + const { chunks } = chunkMatrixText(trimmedMessage, { cfg, - channel: "matrix", accountId: opts.accountId, }); - const convertedMessage = getCore().channel.text.convertMarkdownTables( - trimmedMessage, - tableMode, - ); - const textLimit = getCore().channel.text.resolveTextChunkLimit(cfg, "matrix", opts.accountId); - const chunkLimit = Math.min(textLimit, MATRIX_TEXT_LIMIT); - const chunkMode = getCore().channel.text.resolveChunkMode(cfg, "matrix", opts.accountId); - const chunks = getCore().channel.text.chunkMarkdownTextWithMode( - convertedMessage, - chunkLimit, - chunkMode, - ); const threadId = normalizeThreadId(opts.threadId); const relation = threadId ? buildThreadRelation(threadId, opts.replyToId) @@ -256,6 +306,109 @@ export async function sendReadReceiptMatrix( }); } +export async function sendSingleTextMessageMatrix( + roomId: string, + text: string, + opts: { + client?: MatrixClient; + cfg?: CoreConfig; + replyToId?: string; + threadId?: string; + accountId?: string; + } = {}, +): Promise { + const { trimmedText, convertedText, singleEventLimit, fitsInSingleEvent } = + prepareMatrixSingleText(text, { + cfg: opts.cfg, + accountId: opts.accountId, + }); + if (!trimmedText) { + throw new Error("Matrix single-message send requires text"); + } + if (!fitsInSingleEvent) { + throw new Error( + `Matrix single-message text exceeds limit (${convertedText.length} > ${singleEventLimit})`, + ); + } + return await withResolvedMatrixClient( + { + client: opts.client, + cfg: opts.cfg, + accountId: opts.accountId, + }, + async (client) => { + const resolvedRoom = await resolveMatrixRoomId(client, roomId); + const normalizedThreadId = normalizeThreadId(opts.threadId); + const relation = normalizedThreadId + ? 
buildThreadRelation(normalizedThreadId, opts.replyToId) + : buildReplyRelation(opts.replyToId); + const content = buildTextContent(convertedText, relation); + const eventId = await client.sendMessage(resolvedRoom, content); + return { + messageId: eventId ?? "unknown", + roomId: resolvedRoom, + }; + }, + ); +} + +export async function editMessageMatrix( + roomId: string, + originalEventId: string, + newText: string, + opts: { + client?: MatrixClient; + cfg?: CoreConfig; + threadId?: string; + accountId?: string; + } = {}, +): Promise { + return await withResolvedMatrixClient( + { + client: opts.client, + cfg: opts.cfg, + accountId: opts.accountId, + }, + async (client) => { + const resolvedRoom = await resolveMatrixRoomId(client, roomId); + const cfg = opts.cfg ?? getCore().config.loadConfig(); + const tableMode = getCore().channel.text.resolveMarkdownTableMode({ + cfg, + channel: "matrix", + accountId: opts.accountId, + }); + const convertedText = getCore().channel.text.convertMarkdownTables(newText, tableMode); + const newContent = buildTextContent(convertedText); + + const replaceRelation: Record = { + rel_type: RelationType.Replace, + event_id: originalEventId, + }; + const threadId = normalizeThreadId(opts.threadId); + if (threadId) { + // Thread-aware replace: Synapse needs the thread context to keep the + // edited event visible in the thread timeline. + replaceRelation["m.in_reply_to"] = { event_id: threadId }; + } + + // Spread newContent into the outer event so clients that don't support + // m.new_content still see properly formatted text (with HTML). + const content: Record = { + ...newContent, + body: `* ${convertedText}`, + ...(typeof newContent.formatted_body === "string" + ? { formatted_body: `* ${newContent.formatted_body}` } + : {}), + "m.new_content": newContent, + "m.relates_to": replaceRelation, + }; + + const eventId = await client.sendMessage(resolvedRoom, content); + return eventId ?? 
""; + }, + ); +} + export async function reactMatrixMessage( roomId: string, messageId: string, diff --git a/extensions/matrix/src/types.ts b/extensions/matrix/src/types.ts index b45a97141cd..d168fd9dc0d 100644 --- a/extensions/matrix/src/types.ts +++ b/extensions/matrix/src/types.ts @@ -133,6 +133,14 @@ export type MatrixConfig = { rooms?: Record; /** Per-action tool gating (default: true for all). */ actions?: MatrixActionConfig; + /** + * Streaming mode for Matrix replies. + * - `"partial"`: edit a single message in place as the model generates text. + * - `"off"`: deliver the full reply once the model finishes. + * - `true` maps to `"partial"`, `false` maps to `"off"`. + * Default: `"off"`. + */ + streaming?: "partial" | "off" | boolean; }; export type CoreConfig = { diff --git a/src/plugins/bundled-plugin-metadata.generated.ts b/src/plugins/bundled-plugin-metadata.generated.ts index 97a09e37711..35711b3c902 100644 --- a/src/plugins/bundled-plugin-metadata.generated.ts +++ b/src/plugins/bundled-plugin-metadata.generated.ts @@ -8532,6 +8532,17 @@ export const GENERATED_BUNDLED_PLUGIN_METADATA = [ type: "string", enum: ["open", "disabled", "allowlist"], }, + streaming: { + anyOf: [ + { + type: "string", + enum: ["partial", "off"], + }, + { + type: "boolean", + }, + ], + }, replyToMode: { type: "string", enum: ["off", "first", "all"],