From efd52d28f36cd8ac55d1b364fecfec6a171197f7 Mon Sep 17 00:00:00 2001 From: Gustavo Madeira Santana Date: Sun, 5 Apr 2026 15:18:18 -0400 Subject: [PATCH] fix(matrix): split partial and quiet preview modes --- docs/channels/matrix.md | 26 ++++---- extensions/matrix/src/config-schema.test.ts | 9 +++ extensions/matrix/src/config-schema.ts | 2 +- .../matrix/src/matrix/draft-stream.test.ts | 30 +++++++-- extensions/matrix/src/matrix/draft-stream.ts | 27 ++++++-- .../matrix/monitor/handler.test-helpers.ts | 4 +- .../matrix/src/matrix/monitor/handler.test.ts | 61 ++++++++++++++++++- .../matrix/src/matrix/monitor/handler.ts | 17 ++++-- extensions/matrix/src/matrix/monitor/index.ts | 8 ++- extensions/matrix/src/types.ts | 12 +++- ...ndled-channel-config-metadata.generated.ts | 2 +- 11 files changed, 160 insertions(+), 38 deletions(-) diff --git a/docs/channels/matrix.md b/docs/channels/matrix.md index 109898669b6..baf798c8adb 100644 --- a/docs/channels/matrix.md +++ b/docs/channels/matrix.md @@ -178,9 +178,9 @@ This is a practical baseline config with DM pairing, room allowlist, and E2EE en Matrix reply streaming is opt-in. -Set `channels.matrix.streaming` to `"partial"` when you want OpenClaw to send a single draft reply, -edit that draft in place while the model is generating text, and then finalize it when the reply is -done: +Set `channels.matrix.streaming` to `"partial"` when you want OpenClaw to send a single live preview +reply, edit that preview in place while the model is generating text, and then finalize it when the +reply is done: ```json5 { @@ -193,26 +193,26 @@ done: ``` - `streaming: "off"` is the default. OpenClaw waits for the final reply and sends it once. -- `streaming: "partial"` creates one editable preview message for the current assistant block instead of sending multiple partial messages. -- `blockStreaming: true` enables separate Matrix progress messages. With `streaming: "partial"`, Matrix keeps the live draft for the current block and preserves completed blocks as separate messages. -- When `streaming: "partial"` and `blockStreaming` is off, Matrix only edits the live draft and sends the completed reply once that block or turn finishes. -- Draft preview events use quiet Matrix notices. On stock Matrix push rules, notice previews and later edit events are both non-notifying. +- `streaming: "partial"` creates one editable preview message for the current assistant block using normal Matrix text messages. This preserves Matrix's legacy preview-first notification behavior, so stock clients may notify on the first streamed preview text instead of the finished block. +- `streaming: "quiet"` creates one editable quiet preview notice for the current assistant block. Use this only when you also configure recipient push rules for finalized preview edits. +- `blockStreaming: true` enables separate Matrix progress messages. With preview streaming enabled, Matrix keeps the live draft for the current block and preserves completed blocks as separate messages. +- When preview streaming is on and `blockStreaming` is off, Matrix edits the live draft in place and finalizes that same event when the block or turn finishes. - If the preview no longer fits in one Matrix event, OpenClaw stops preview streaming and falls back to normal final delivery. - Media replies still send attachments normally. If a stale preview can no longer be reused safely, OpenClaw redacts it before sending the final media reply. - Preview edits cost extra Matrix API calls. Leave streaming off if you want the most conservative rate-limit behavior. `blockStreaming` does not enable draft previews by itself. -Use `streaming: "partial"` for preview edits; then add `blockStreaming: true` only if you also want completed assistant blocks to remain visible as separate progress messages. +Use `streaming: "partial"` or `streaming: "quiet"` for preview edits; then add `blockStreaming: true` only if you also want completed assistant blocks to remain visible as separate progress messages. -If you need notifications without custom Matrix push rules, leave `streaming` off. Then: +If you need stock Matrix notifications without custom push rules, use `streaming: "partial"` for preview-first behavior or leave `streaming` off for final-only delivery. With `streaming: "off"`: - `blockStreaming: true` sends each finished block as a normal notifying Matrix message. - `blockStreaming: false` sends only the final completed reply as a normal notifying Matrix message. -### Self-hosted push rules for finalized previews +### Self-hosted push rules for quiet finalized previews -If you run your own Matrix infrastructure and want `streaming: "partial"` previews to notify only when a -block or final reply is done, add a per-user push rule for finalized preview edits. +If you run your own Matrix infrastructure and want quiet previews to notify only when a block or +final reply is done, set `streaming: "quiet"` and add a per-user push rule for finalized preview edits. OpenClaw marks finalized text-only preview edits with: @@ -906,7 +906,7 @@ Live directory lookup uses the logged-in Matrix account: - `historyLimit`: max room messages to include as group history context. Falls back to `messages.groupChat.historyLimit`. Set `0` to disable. - `replyToMode`: `off`, `first`, or `all`. - `markdown`: optional Markdown rendering configuration for outbound Matrix text. -- `streaming`: `off` (default), `partial`, `true`, or `false`. `partial` and `true` enable single-message draft previews with edit-in-place updates. +- `streaming`: `off` (default), `partial`, `quiet`, `true`, or `false`. `partial` and `true` enable preview-first draft updates with normal Matrix text messages. `quiet` uses non-notifying preview notices for self-hosted push-rule setups. - `blockStreaming`: `true` enables separate progress messages for completed assistant blocks while draft preview streaming is active. - `threadReplies`: `off`, `inbound`, or `always`. - `threadBindings`: per-channel overrides for thread-bound session routing and lifecycle. diff --git a/extensions/matrix/src/config-schema.test.ts b/extensions/matrix/src/config-schema.test.ts index b9c11aa3a02..b820d0e2ad3 100644 --- a/extensions/matrix/src/config-schema.test.ts +++ b/extensions/matrix/src/config-schema.test.ts @@ -78,4 +78,13 @@ describe("MatrixConfigSchema SecretInput", () => { } expect(result.data.rooms?.["!room:example.org"]?.account).toBe("axis"); }); + + it("accepts quiet Matrix streaming mode", () => { + const result = MatrixConfigSchema.safeParse({ + homeserver: "https://matrix.example.org", + accessToken: "token", + streaming: "quiet", + }); + expect(result.success).toBe(true); + }); }); diff --git a/extensions/matrix/src/config-schema.ts b/extensions/matrix/src/config-schema.ts index e2d03713cb2..2bbc360712e 100644 --- a/extensions/matrix/src/config-schema.ts +++ b/extensions/matrix/src/config-schema.ts @@ -84,7 +84,7 @@ export const MatrixConfigSchema = z.object({ groupPolicy: GroupPolicySchema.optional(), contextVisibility: ContextVisibilityModeSchema.optional(), blockStreaming: z.boolean().optional(), - streaming: z.union([z.enum(["partial", "off"]), z.boolean()]).optional(), + streaming: z.union([z.enum(["partial", "quiet", "off"]), z.boolean()]).optional(), replyToMode: z.enum(["off", "first", "all", "batched"]).optional(), threadReplies: z.enum(["off", "inbound", "always"]).optional(), textChunkLimit: z.number().optional(), diff --git a/extensions/matrix/src/matrix/draft-stream.test.ts b/extensions/matrix/src/matrix/draft-stream.test.ts index 4c6567779dc..b5b5bbab980 100644 --- a/extensions/matrix/src/matrix/draft-stream.test.ts +++ b/extensions/matrix/src/matrix/draft-stream.test.ts @@ -69,7 +69,7 @@ describe("createMatrixDraftStream", () => { vi.useRealTimers(); }); - it("sends a new message on first update", async () => { + it("sends a normal text preview on first partial update", async () => { const stream = createMatrixDraftStream({ roomId: "!room:test", client, @@ -81,17 +81,35 @@ describe("createMatrixDraftStream", () => { expect(sendMessageMock).toHaveBeenCalledTimes(1); expect(sendMessageMock.mock.calls[0]?.[1]).toMatchObject({ - msgtype: "m.notice", + msgtype: "m.text", }); - expect(sendMessageMock.mock.calls[0]?.[1]).not.toHaveProperty("m.mentions"); expect(stream.eventId()).toBe("$evt1"); }); - it("edits the message on subsequent updates", async () => { + it("sends quiet preview notices when quiet mode is enabled", async () => { const stream = createMatrixDraftStream({ roomId: "!room:test", client, cfg: {} as import("../types.js").CoreConfig, + mode: "quiet", + }); + + stream.update("Hello"); + await stream.flush(); + + expect(sendMessageMock).toHaveBeenCalledTimes(1); + expect(sendMessageMock.mock.calls[0]?.[1]).toMatchObject({ + msgtype: "m.notice", + }); + expect(sendMessageMock.mock.calls[0]?.[1]).not.toHaveProperty("m.mentions"); + }); + + it("edits the message on subsequent quiet updates", async () => { + const stream = createMatrixDraftStream({ + roomId: "!room:test", + client, + cfg: {} as import("../types.js").CoreConfig, + mode: "quiet", }); stream.update("Hello"); @@ -112,11 +130,12 @@ describe("createMatrixDraftStream", () => { }); }); - it("coalesces rapid updates within throttle window", async () => { + it("coalesces rapid quiet updates within throttle window", async () => { const stream = createMatrixDraftStream({ roomId: "!room:test", client, cfg: {} as import("../types.js").CoreConfig, + mode: "quiet", }); stream.update("A"); @@ -191,6 +210,7 @@ describe("createMatrixDraftStream", () => { roomId: "!room:test", client, cfg: {} as import("../types.js").CoreConfig, + mode: "quiet", }); stream.update("Block 1"); diff --git a/extensions/matrix/src/matrix/draft-stream.ts b/extensions/matrix/src/matrix/draft-stream.ts index 333f219023f..cbf322a504e 100644 --- a/extensions/matrix/src/matrix/draft-stream.ts +++ b/extensions/matrix/src/matrix/draft-stream.ts @@ -5,7 +5,22 @@ import { editMessageMatrix, prepareMatrixSingleText, sendSingleTextMessageMatrix import { MsgType } from "./send/types.js"; const DEFAULT_THROTTLE_MS = 1000; -const DRAFT_PREVIEW_MSGTYPE = MsgType.Notice; +type MatrixDraftPreviewMode = "partial" | "quiet"; + +function resolveDraftPreviewOptions(mode: MatrixDraftPreviewMode): { + msgtype: typeof MsgType.Text | typeof MsgType.Notice; + includeMentions?: boolean; +} { + if (mode === "quiet") { + return { + msgtype: MsgType.Notice, + includeMentions: false, + }; + } + return { + msgtype: MsgType.Text, + }; +} export type MatrixDraftStream = { /** Update the draft with the latest accumulated text for the current block. */ @@ -28,6 +43,7 @@ export function createMatrixDraftStream(params: { roomId: string; client: MatrixClient; cfg: CoreConfig; + mode?: MatrixDraftPreviewMode; threadId?: string; replyToId?: string; /** When true, reset() restores the original replyToId instead of clearing it. */ @@ -36,6 +52,7 @@ export function createMatrixDraftStream(params: { log?: (message: string) => void; }): MatrixDraftStream { const { roomId, client, cfg, threadId, accountId, log } = params; + const preview = resolveDraftPreviewOptions(params.mode ?? "partial"); let currentEventId: string | undefined; let lastSentText = ""; @@ -75,8 +92,8 @@ export function createMatrixDraftStream(params: { replyToId, threadId, accountId, - msgtype: DRAFT_PREVIEW_MSGTYPE, - includeMentions: false, + msgtype: preview.msgtype, + includeMentions: preview.includeMentions, }); currentEventId = result.messageId; lastSentText = preparedText.trimmedText; @@ -87,8 +104,8 @@ export function createMatrixDraftStream(params: { cfg, threadId, accountId, - msgtype: DRAFT_PREVIEW_MSGTYPE, - includeMentions: false, + msgtype: preview.msgtype, + includeMentions: preview.includeMentions, }); lastSentText = preparedText.trimmedText; } diff --git a/extensions/matrix/src/matrix/monitor/handler.test-helpers.ts b/extensions/matrix/src/matrix/monitor/handler.test-helpers.ts index 393b64ab682..dbb7614348e 100644 --- a/extensions/matrix/src/matrix/monitor/handler.test-helpers.ts +++ b/extensions/matrix/src/matrix/monitor/handler.test-helpers.ts @@ -1,6 +1,6 @@ import { vi } from "vitest"; import type { RuntimeEnv, RuntimeLogger } from "../../runtime-api.js"; -import type { MatrixRoomConfig, ReplyToMode } from "../../types.js"; +import type { MatrixRoomConfig, MatrixStreamingMode, ReplyToMode } from "../../types.js"; import type { MatrixClient } from "../sdk.js"; import { createMatrixRoomMessageHandler, type MatrixMonitorHandlerParams } from "./handler.js"; import { EventType, type MatrixRawEvent, type RoomMessageEventContent } from "./types.js"; @@ -32,7 +32,7 @@ type MatrixHandlerTestHarnessOptions = { threadReplies?: "off" | "inbound" | "always"; dmThreadReplies?: "off" | "inbound" | "always"; dmSessionScope?: "per-user" | "per-room"; - streaming?: "partial" | "off"; + streaming?: MatrixStreamingMode; blockStreamingEnabled?: boolean; dmEnabled?: boolean; dmPolicy?: "pairing" | "allowlist" | "open" | "disabled"; diff --git a/extensions/matrix/src/matrix/monitor/handler.test.ts b/extensions/matrix/src/matrix/monitor/handler.test.ts index 17a7b3683d5..fd3514c1aeb 100644 --- a/extensions/matrix/src/matrix/monitor/handler.test.ts +++ b/extensions/matrix/src/matrix/monitor/handler.test.ts @@ -1989,6 +1989,7 @@ describe("matrix monitor handler draft streaming", () => { function createStreamingHarness(opts?: { replyToMode?: "off" | "first" | "all" | "batched"; blockStreamingEnabled?: boolean; + streaming?: "partial" | "quiet"; }) { let capturedDeliver: DeliverFn | undefined; let capturedReplyOpts: ReplyOpts | undefined; @@ -2008,7 +2009,7 @@ describe("matrix monitor handler draft streaming", () => { const redactEventMock = vi.fn(async () => "$redacted"); const { handler } = createMatrixHandlerTestHarness({ - streaming: "partial", + streaming: opts?.streaming ?? "quiet", blockStreamingEnabled: opts?.blockStreamingEnabled ?? false, replyToMode: opts?.replyToMode ?? "off", client: { redactEvent: redactEventMock }, @@ -2094,6 +2095,41 @@ describe("matrix monitor handler draft streaming", () => { await finish(); }); + it("keeps partial preview-first finalization free of quiet-preview markers", async () => { + const { dispatch, redactEventMock } = createStreamingHarness({ + blockStreamingEnabled: true, + streaming: "partial", + }); + const { deliver, opts, finish } = await dispatch(); + + opts.onPartialReply?.({ text: "Single block" }); + await vi.waitFor(() => { + expect(sendSingleTextMessageMatrixMock).toHaveBeenCalledTimes(1); + }); + + expect(sendSingleTextMessageMatrixMock).toHaveBeenCalledWith( + "!room:example.org", + "Single block", + expect.not.objectContaining({ + msgtype: "m.notice", + includeMentions: false, + }), + ); + + await deliver({ text: "Single block" }, { kind: "final" }); + + expect(editMessageMatrixMock).toHaveBeenCalledWith( + "!room:example.org", + "$draft1", + "Single block", + expect.not.objectContaining({ + extraContent: { [MATRIX_OPENCLAW_FINALIZED_PREVIEW_KEY]: true }, + }), + ); + expect(redactEventMock).not.toHaveBeenCalled(); + await finish(); + }); + it("preserves completed blocks by rotating to a new quiet preview", async () => { const { dispatch, redactEventMock } = createStreamingHarness({ blockStreamingEnabled: true }); const { deliver, opts, finish } = await dispatch(); @@ -2470,7 +2506,7 @@ describe("matrix monitor handler draft streaming", () => { let capturedReplyOpts: ReplyOpts | undefined; const { handler } = createMatrixHandlerTestHarness({ - streaming: "partial", + streaming: "quiet", createReplyDispatcherWithTyping: () => ({ dispatcher: { markComplete: () => {}, waitForIdle: async () => {} }, replyOptions: {}, @@ -2666,6 +2702,27 @@ describe("matrix monitor handler block streaming config", () => { expect(capturedDisableBlockStreaming).toBe(true); }); + it("keeps block streaming disabled when quiet previews are on and block streaming is off", async () => { + let capturedDisableBlockStreaming: boolean | undefined; + + const { handler } = createMatrixHandlerTestHarness({ + streaming: "quiet", + dispatchReplyFromConfig: vi.fn( + async (args: { replyOptions?: { disableBlockStreaming?: boolean } }) => { + capturedDisableBlockStreaming = args.replyOptions?.disableBlockStreaming; + return { queuedFinal: false, counts: { final: 0, block: 0, tool: 0 } }; + }, + ) as never, + }); + + await handler( + "!room:example.org", + createMatrixTextMessageEvent({ eventId: "$msg1", body: "hello" }), + ); + + expect(capturedDisableBlockStreaming).toBe(true); + }); + it("allows shared block streaming when partial previews and block streaming are both enabled", async () => { let capturedDisableBlockStreaming: boolean | undefined; diff --git a/extensions/matrix/src/matrix/monitor/handler.ts b/extensions/matrix/src/matrix/monitor/handler.ts index 54342d2de07..62563669061 100644 --- a/extensions/matrix/src/matrix/monitor/handler.ts +++ b/extensions/matrix/src/matrix/monitor/handler.ts @@ -6,7 +6,12 @@ import { } from "openclaw/plugin-sdk/config-runtime"; import { getSessionBindingService } from "openclaw/plugin-sdk/conversation-runtime"; import { evaluateSupplementalContextVisibility } from "openclaw/plugin-sdk/security-runtime"; -import type { CoreConfig, MatrixRoomConfig, ReplyToMode } from "../../types.js"; +import type { + CoreConfig, + MatrixRoomConfig, + MatrixStreamingMode, + ReplyToMode, +} from "../../types.js"; import { createMatrixDraftStream } from "../draft-stream.js"; import { isMatrixMediaSizeLimitError } from "../media-errors.js"; import { @@ -109,7 +114,7 @@ export type MatrixMonitorHandlerParams = { dmThreadReplies?: "off" | "inbound" | "always"; /** DM session grouping behavior. */ dmSessionScope?: "per-user" | "per-room"; - streaming: "partial" | "off"; + streaming: MatrixStreamingMode; blockStreamingEnabled: boolean; dmEnabled: boolean; dmPolicy: "open" | "pairing" | "allowlist" | "disabled"; @@ -1293,13 +1298,15 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam }); }, }); - const draftStreamingEnabled = streaming === "partial"; + const draftStreamingEnabled = streaming !== "off"; + const quietDraftStreaming = streaming === "quiet"; const draftReplyToId = replyToMode !== "off" && !threadTarget ? _messageId : undefined; const draftStream = draftStreamingEnabled ? createMatrixDraftStream({ roomId, client, cfg, + mode: quietDraftStreaming ? "quiet" : "partial", threadId: threadTarget, replyToId: draftReplyToId, preserveReplyId: replyToMode === "all", @@ -1432,7 +1439,9 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam cfg, threadId: threadTarget, accountId: _route.accountId, - extraContent: buildMatrixFinalizedPreviewContent(), + extraContent: quietDraftStreaming + ? buildMatrixFinalizedPreviewContent() + : undefined, }); } catch { await redactMatrixDraftEvent(client, roomId, draftEventId); diff --git a/extensions/matrix/src/matrix/monitor/index.ts b/extensions/matrix/src/matrix/monitor/index.ts index 5c70504c2ce..7b301084d53 100644 --- a/extensions/matrix/src/matrix/monitor/index.ts +++ b/extensions/matrix/src/matrix/monitor/index.ts @@ -215,8 +215,12 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi const historyLimit = Math.max(0, accountConfig.historyLimit ?? globalGroupChatHistoryLimit ?? 0); const mediaMaxMb = opts.mediaMaxMb ?? accountConfig.mediaMaxMb ?? DEFAULT_MEDIA_MAX_MB; const mediaMaxBytes = Math.max(1, mediaMaxMb) * 1024 * 1024; - const streaming: "partial" | "off" = - accountConfig.streaming === true || accountConfig.streaming === "partial" ? "partial" : "off"; + const streaming: "partial" | "quiet" | "off" = + accountConfig.streaming === true || accountConfig.streaming === "partial" + ? "partial" + : accountConfig.streaming === "quiet" + ? "quiet" + : "off"; const blockStreamingEnabled = accountConfig.blockStreaming === true; const startupMs = Date.now(); const startupGraceMs = 0; diff --git a/extensions/matrix/src/types.ts b/extensions/matrix/src/types.ts index 725a08b5a27..ff1bb9bff9a 100644 --- a/extensions/matrix/src/types.ts +++ b/extensions/matrix/src/types.ts @@ -83,6 +83,8 @@ export type MatrixExecApprovalConfig = { target?: MatrixExecApprovalTarget; }; +export type MatrixStreamingMode = "partial" | "quiet" | "off"; + export type MatrixNetworkConfig = { /** Dangerous opt-in for trusted private/internal Matrix homeservers. */ dangerouslyAllowPrivateNetwork?: boolean; @@ -189,16 +191,20 @@ export type MatrixConfig = { /** * Streaming mode for Matrix replies. * - `"partial"`: edit a single draft message in place for the current + * assistant block as the model generates text using normal Matrix text + * messages. This preserves legacy preview-first notification behavior. + * - `"quiet"`: edit a single quiet draft notice in place for the current * assistant block as the model generates text. * - `"off"`: deliver the full reply once the model finishes. * - Use `blockStreaming: true` when you want completed assistant blocks to * stay visible as separate progress messages. When combined with - * `"partial"`, Matrix keeps a live draft for the current block and + * preview streaming, Matrix keeps a live draft for the current block and * preserves completed blocks as separate messages. - * - `true` maps to `"partial"`, `false` maps to `"off"`. + * - `true` maps to `"partial"`, `false` maps to `"off"` for backward + * compatibility. * Default: `"off"`. */ - streaming?: "partial" | "off" | boolean; + streaming?: MatrixStreamingMode | boolean; }; export type CoreConfig = { diff --git a/src/config/bundled-channel-config-metadata.generated.ts b/src/config/bundled-channel-config-metadata.generated.ts index a2eac944777..a473364860d 100644 --- a/src/config/bundled-channel-config-metadata.generated.ts +++ b/src/config/bundled-channel-config-metadata.generated.ts @@ -6692,7 +6692,7 @@ export const GENERATED_BUNDLED_CHANNEL_CONFIG_METADATA = [ anyOf: [ { type: "string", - enum: ["partial", "off"], + enum: ["partial", "quiet", "off"], }, { type: "boolean",