feat(matrix): add draft streaming (edit-in-place partial replies) (#56387)

Merged via squash.

Prepared head SHA: 53e566bf30
Co-authored-by: jrusz <55534579+jrusz@users.noreply.github.com>
Co-authored-by: gumadeiras <5599352+gumadeiras@users.noreply.github.com>
Reviewed-by: @gumadeiras
This commit is contained in:
Jakub Rusz 2026-03-29 05:43:02 +00:00 committed by GitHub
parent dd61171f5b
commit 7e7e45c2f3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
17 changed files with 1561 additions and 46 deletions

View File

@ -65,6 +65,7 @@ Docs: https://docs.openclaw.ai
- Agents/compaction: surface safeguard-specific cancel reasons and relabel benign manual `/compact` no-op cases as skipped instead of failed. (#51072) Thanks @afurm.
- Docs: add `pnpm docs:check-links:anchors` for Mintlify anchor validation while keeping `scripts/docs-link-audit.mjs` as the stable link-audit entrypoint. (#55912) Thanks @velvet-shark.
- Tavily: mark outbound API requests with `X-Client-Source: openclaw` so Tavily can attribute OpenClaw-originated traffic. (#55335) Thanks @lakshyaag-tavily.
- Matrix/streaming: add `streaming: "partial"` draft replies that stay on a single editable preview message, stop preview streaming once text no longer fits one Matrix event, and clear stale previews before media-only finals. (#56387) Thanks @jrusz.
### Fixes

View File

@ -2608,6 +2608,26 @@
"tags": [],
"hasChildren": false
},
{
"path": "agents.defaults.memorySearch.store.fts",
"kind": "core",
"type": "object",
"required": false,
"deprecated": false,
"sensitive": false,
"tags": [],
"hasChildren": true
},
{
"path": "agents.defaults.memorySearch.store.fts.tokenizer",
"kind": "core",
"type": "string",
"required": false,
"deprecated": false,
"sensitive": false,
"tags": [],
"hasChildren": false
},
{
"path": "agents.defaults.memorySearch.store.path",
"kind": "core",
@ -5028,6 +5048,26 @@
"tags": [],
"hasChildren": false
},
{
"path": "agents.list.*.memorySearch.store.fts",
"kind": "core",
"type": "object",
"required": false,
"deprecated": false,
"sensitive": false,
"tags": [],
"hasChildren": true
},
{
"path": "agents.list.*.memorySearch.store.fts.tokenizer",
"kind": "core",
"type": "string",
"required": false,
"deprecated": false,
"sensitive": false,
"tags": [],
"hasChildren": false
},
{
"path": "agents.list.*.memorySearch.store.path",
"kind": "core",
@ -21273,6 +21313,66 @@
],
"hasChildren": false
},
{
"path": "channels.line.accounts.*.threadBindings",
"kind": "channel",
"type": "object",
"required": false,
"deprecated": false,
"sensitive": false,
"tags": [],
"hasChildren": true
},
{
"path": "channels.line.accounts.*.threadBindings.enabled",
"kind": "channel",
"type": "boolean",
"required": false,
"deprecated": false,
"sensitive": false,
"tags": [],
"hasChildren": false
},
{
"path": "channels.line.accounts.*.threadBindings.idleHours",
"kind": "channel",
"type": "number",
"required": false,
"deprecated": false,
"sensitive": false,
"tags": [],
"hasChildren": false
},
{
"path": "channels.line.accounts.*.threadBindings.maxAgeHours",
"kind": "channel",
"type": "number",
"required": false,
"deprecated": false,
"sensitive": false,
"tags": [],
"hasChildren": false
},
{
"path": "channels.line.accounts.*.threadBindings.spawnAcpSessions",
"kind": "channel",
"type": "boolean",
"required": false,
"deprecated": false,
"sensitive": false,
"tags": [],
"hasChildren": false
},
{
"path": "channels.line.accounts.*.threadBindings.spawnSubagentSessions",
"kind": "channel",
"type": "boolean",
"required": false,
"deprecated": false,
"sensitive": false,
"tags": [],
"hasChildren": false
},
{
"path": "channels.line.accounts.*.tokenFile",
"kind": "channel",
@ -21562,6 +21662,66 @@
],
"hasChildren": false
},
{
"path": "channels.line.threadBindings",
"kind": "channel",
"type": "object",
"required": false,
"deprecated": false,
"sensitive": false,
"tags": [],
"hasChildren": true
},
{
"path": "channels.line.threadBindings.enabled",
"kind": "channel",
"type": "boolean",
"required": false,
"deprecated": false,
"sensitive": false,
"tags": [],
"hasChildren": false
},
{
"path": "channels.line.threadBindings.idleHours",
"kind": "channel",
"type": "number",
"required": false,
"deprecated": false,
"sensitive": false,
"tags": [],
"hasChildren": false
},
{
"path": "channels.line.threadBindings.maxAgeHours",
"kind": "channel",
"type": "number",
"required": false,
"deprecated": false,
"sensitive": false,
"tags": [],
"hasChildren": false
},
{
"path": "channels.line.threadBindings.spawnAcpSessions",
"kind": "channel",
"type": "boolean",
"required": false,
"deprecated": false,
"sensitive": false,
"tags": [],
"hasChildren": false
},
{
"path": "channels.line.threadBindings.spawnSubagentSessions",
"kind": "channel",
"type": "boolean",
"required": false,
"deprecated": false,
"sensitive": false,
"tags": [],
"hasChildren": false
},
{
"path": "channels.line.tokenFile",
"kind": "channel",
@ -22583,6 +22743,23 @@
"tags": [],
"hasChildren": false
},
{
"path": "channels.matrix.streaming",
"kind": "channel",
"type": [
"boolean",
"string"
],
"required": false,
"enumValues": [
"partial",
"off"
],
"deprecated": false,
"sensitive": false,
"tags": [],
"hasChildren": false
},
{
"path": "channels.matrix.textChunkLimit",
"kind": "channel",

View File

@ -1,4 +1,4 @@
{"generatedBy":"scripts/generate-config-doc-baseline.ts","recordType":"meta","totalPaths":5576}
{"generatedBy":"scripts/generate-config-doc-baseline.ts","recordType":"meta","totalPaths":5593}
{"recordType":"path","path":"acp","kind":"core","type":"object","required":false,"deprecated":false,"sensitive":false,"tags":["advanced"],"label":"ACP","help":"ACP runtime controls for enabling dispatch, selecting backends, constraining allowed agent targets, and tuning streamed turn projection behavior.","hasChildren":true}
{"recordType":"path","path":"acp.allowedAgents","kind":"core","type":"array","required":false,"deprecated":false,"sensitive":false,"tags":["access"],"label":"ACP Allowed Agents","help":"Allowlist of ACP target agent ids permitted for ACP runtime sessions. Empty means no additional allowlist restriction.","hasChildren":true}
{"recordType":"path","path":"acp.allowedAgents.*","kind":"core","type":"string","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false}
@ -217,6 +217,8 @@
{"recordType":"path","path":"agents.defaults.memorySearch.sources.*","kind":"core","type":"string","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false}
{"recordType":"path","path":"agents.defaults.memorySearch.store","kind":"core","type":"object","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":true}
{"recordType":"path","path":"agents.defaults.memorySearch.store.driver","kind":"core","type":"string","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false}
{"recordType":"path","path":"agents.defaults.memorySearch.store.fts","kind":"core","type":"object","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":true}
{"recordType":"path","path":"agents.defaults.memorySearch.store.fts.tokenizer","kind":"core","type":"string","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false}
{"recordType":"path","path":"agents.defaults.memorySearch.store.path","kind":"core","type":"string","required":false,"deprecated":false,"sensitive":false,"tags":["storage"],"label":"Memory Search Index Path","help":"Sets where the SQLite memory index is stored on disk for each agent. Keep the default `~/.openclaw/memory/{agentId}.sqlite` unless you need custom storage placement or backup policy alignment.","hasChildren":false}
{"recordType":"path","path":"agents.defaults.memorySearch.store.vector","kind":"core","type":"object","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":true}
{"recordType":"path","path":"agents.defaults.memorySearch.store.vector.enabled","kind":"core","type":"boolean","required":false,"deprecated":false,"sensitive":false,"tags":["storage"],"label":"Memory Search Vector Index","help":"Enables the sqlite-vec extension used for vector similarity queries in memory search (default: true). Keep this enabled for normal semantic recall; disable only for debugging or fallback-only operation.","hasChildren":false}
@ -443,6 +445,8 @@
{"recordType":"path","path":"agents.list.*.memorySearch.sources.*","kind":"core","type":"string","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false}
{"recordType":"path","path":"agents.list.*.memorySearch.store","kind":"core","type":"object","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":true}
{"recordType":"path","path":"agents.list.*.memorySearch.store.driver","kind":"core","type":"string","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false}
{"recordType":"path","path":"agents.list.*.memorySearch.store.fts","kind":"core","type":"object","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":true}
{"recordType":"path","path":"agents.list.*.memorySearch.store.fts.tokenizer","kind":"core","type":"string","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false}
{"recordType":"path","path":"agents.list.*.memorySearch.store.path","kind":"core","type":"string","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false}
{"recordType":"path","path":"agents.list.*.memorySearch.store.vector","kind":"core","type":"object","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":true}
{"recordType":"path","path":"agents.list.*.memorySearch.store.vector.enabled","kind":"core","type":"boolean","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false}
@ -1893,6 +1897,12 @@
{"recordType":"path","path":"channels.line.accounts.*.name","kind":"channel","type":"string","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false}
{"recordType":"path","path":"channels.line.accounts.*.responsePrefix","kind":"channel","type":"string","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false}
{"recordType":"path","path":"channels.line.accounts.*.secretFile","kind":"channel","type":"string","required":false,"deprecated":false,"sensitive":true,"tags":["auth","channels","network","security","storage"],"hasChildren":false}
{"recordType":"path","path":"channels.line.accounts.*.threadBindings","kind":"channel","type":"object","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":true}
{"recordType":"path","path":"channels.line.accounts.*.threadBindings.enabled","kind":"channel","type":"boolean","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false}
{"recordType":"path","path":"channels.line.accounts.*.threadBindings.idleHours","kind":"channel","type":"number","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false}
{"recordType":"path","path":"channels.line.accounts.*.threadBindings.maxAgeHours","kind":"channel","type":"number","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false}
{"recordType":"path","path":"channels.line.accounts.*.threadBindings.spawnAcpSessions","kind":"channel","type":"boolean","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false}
{"recordType":"path","path":"channels.line.accounts.*.threadBindings.spawnSubagentSessions","kind":"channel","type":"boolean","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false}
{"recordType":"path","path":"channels.line.accounts.*.tokenFile","kind":"channel","type":"string","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false}
{"recordType":"path","path":"channels.line.accounts.*.webhookPath","kind":"channel","type":"string","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false}
{"recordType":"path","path":"channels.line.allowFrom","kind":"channel","type":"array","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":true}
@ -1918,6 +1928,12 @@
{"recordType":"path","path":"channels.line.name","kind":"channel","type":"string","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false}
{"recordType":"path","path":"channels.line.responsePrefix","kind":"channel","type":"string","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false}
{"recordType":"path","path":"channels.line.secretFile","kind":"channel","type":"string","required":false,"deprecated":false,"sensitive":true,"tags":["auth","channels","network","security","storage"],"hasChildren":false}
{"recordType":"path","path":"channels.line.threadBindings","kind":"channel","type":"object","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":true}
{"recordType":"path","path":"channels.line.threadBindings.enabled","kind":"channel","type":"boolean","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false}
{"recordType":"path","path":"channels.line.threadBindings.idleHours","kind":"channel","type":"number","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false}
{"recordType":"path","path":"channels.line.threadBindings.maxAgeHours","kind":"channel","type":"number","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false}
{"recordType":"path","path":"channels.line.threadBindings.spawnAcpSessions","kind":"channel","type":"boolean","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false}
{"recordType":"path","path":"channels.line.threadBindings.spawnSubagentSessions","kind":"channel","type":"boolean","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false}
{"recordType":"path","path":"channels.line.tokenFile","kind":"channel","type":"string","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false}
{"recordType":"path","path":"channels.line.webhookPath","kind":"channel","type":"string","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false}
{"recordType":"path","path":"channels.matrix","kind":"channel","type":"object","required":false,"deprecated":false,"sensitive":false,"tags":["channels","network"],"label":"Matrix","help":"open protocol; install the plugin to enable.","hasChildren":true}
@ -2011,6 +2027,7 @@
{"recordType":"path","path":"channels.matrix.rooms.*.users.*","kind":"channel","type":["number","string"],"required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false}
{"recordType":"path","path":"channels.matrix.startupVerification","kind":"channel","type":"string","required":false,"enumValues":["off","if-unverified"],"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false}
{"recordType":"path","path":"channels.matrix.startupVerificationCooldownHours","kind":"channel","type":"number","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false}
{"recordType":"path","path":"channels.matrix.streaming","kind":"channel","type":["boolean","string"],"required":false,"enumValues":["partial","off"],"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false}
{"recordType":"path","path":"channels.matrix.textChunkLimit","kind":"channel","type":"number","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false}
{"recordType":"path","path":"channels.matrix.threadBindings","kind":"channel","type":"object","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":true}
{"recordType":"path","path":"channels.matrix.threadBindings.enabled","kind":"channel","type":"boolean","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false}

View File

@ -157,11 +157,36 @@ This is a practical baseline config with DM pairing, room allowlist, and E2EE en
autoJoinAllowlist: ["!roomid:example.org"],
threadReplies: "inbound",
replyToMode: "off",
streaming: "partial",
},
},
}
```
## Streaming previews
Matrix reply streaming is opt-in.
Set `channels.matrix.streaming` to `"partial"` when you want OpenClaw to send a single draft reply,
edit that draft in place while the model is generating text, and then finalize it when the reply is
done:
```json5
{
channels: {
matrix: {
streaming: "partial",
},
},
}
```
- `streaming: "off"` is the default. OpenClaw waits for the final reply and sends it once.
- `streaming: "partial"` creates one editable preview message instead of sending multiple partial messages.
- If the preview no longer fits in one Matrix event, OpenClaw stops preview streaming and falls back to normal final delivery.
- Media replies still send attachments normally. If a stale preview can no longer be reused safely, OpenClaw redacts it before sending the final media reply.
- Preview edits cost extra Matrix API calls. Leave streaming off if you want the most conservative rate-limit behavior.
## E2EE setup
## Bot-to-bot rooms
@ -673,6 +698,7 @@ Live directory lookup uses the logged-in Matrix account:
- `groupAllowFrom`: allowlist of user IDs for room traffic.
- `groupAllowFrom` entries should be full Matrix user IDs. Unresolved names are ignored at runtime.
- `replyToMode`: `off`, `first`, or `all`.
- `streaming`: `off` (default) or `partial`. `partial` enables single-message draft previews with edit-in-place updates.
- `threadReplies`: `off`, `inbound`, or `always`.
- `threadBindings`: per-channel overrides for thread-bound session routing and lifecycle.
- `startupVerification`: automatic self-verification request mode on startup (`if-unverified`, `off`).

View File

@ -64,6 +64,7 @@ export const MatrixConfigSchema = z.object({
allowlistOnly: z.boolean().optional(),
allowBots: z.union([z.boolean(), z.literal("mentions")]).optional(),
groupPolicy: GroupPolicySchema.optional(),
streaming: z.union([z.enum(["partial", "off"]), z.boolean()]).optional(),
replyToMode: z.enum(["off", "first", "all"]).optional(),
threadReplies: z.enum(["off", "inbound", "always"]).optional(),
textChunkLimit: z.number().optional(),

View File

@ -0,0 +1,326 @@
// Unit tests for the Matrix draft-stream preview helper (draft-stream.ts).
import { afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
import type { PluginRuntime } from "../runtime-api.js";

// Mocked slice of the plugin runtime that draft-stream.ts consults:
// config loading plus the text chunking/conversion helpers.
const loadConfigMock = vi.fn(() => ({}));
const resolveTextChunkLimitMock = vi.fn<
  (cfg: unknown, channel: unknown, accountId?: unknown) => number
>(() => 4000);
const resolveChunkModeMock = vi.fn<(cfg: unknown, channel: unknown, accountId?: unknown) => string>(
  () => "length",
);
const chunkMarkdownTextWithModeMock = vi.fn((text: string) => (text ? [text] : []));
const convertMarkdownTablesMock = vi.fn((text: string) => text);
const runtimeStub = {
  config: { loadConfig: () => loadConfigMock() },
  channel: {
    text: {
      resolveTextChunkLimit: (cfg: unknown, channel: unknown, accountId?: unknown) =>
        resolveTextChunkLimitMock(cfg, channel, accountId),
      resolveChunkMode: (cfg: unknown, channel: unknown, accountId?: unknown) =>
        resolveChunkModeMock(cfg, channel, accountId),
      chunkMarkdownText: (text: string) => (text ? [text] : []),
      chunkMarkdownTextWithMode: (text: string) => chunkMarkdownTextWithModeMock(text),
      resolveMarkdownTableMode: () => "code",
      convertMarkdownTables: (text: string) => convertMarkdownTablesMock(text),
    },
  },
} as unknown as PluginRuntime;

// Bound in beforeAll, after the runtime stub is installed, so the module
// under test resolves the stub instead of the real plugin runtime.
let createMatrixDraftStream: typeof import("./draft-stream.js").createMatrixDraftStream;

// Shared Matrix client mocks; reset per test via createMockClient().
const sendMessageMock = vi.fn();
const sendEventMock = vi.fn();
const joinedRoomsMock = vi.fn().mockResolvedValue([]);

// Builds a fresh mock MatrixClient with deterministic event IDs
// ($evt1 for sends, $evt2 for raw events).
function createMockClient() {
  sendMessageMock.mockReset().mockResolvedValue("$evt1");
  sendEventMock.mockReset().mockResolvedValue("$evt2");
  joinedRoomsMock.mockReset().mockResolvedValue(["!room:test"]);
  return {
    sendMessage: sendMessageMock,
    sendEvent: sendEventMock,
    getJoinedRooms: joinedRoomsMock,
    prepareForOneOff: vi.fn().mockResolvedValue(undefined),
    start: vi.fn().mockResolvedValue(undefined),
  } as unknown as import("./sdk.js").MatrixClient;
}

// Install the runtime stub BEFORE importing draft-stream.js so its
// module-level runtime lookup picks up the stub.
beforeAll(async () => {
  vi.resetModules();
  const runtimeModule = await import("../runtime.js");
  runtimeModule.setMatrixRuntime(runtimeStub);
  ({ createMatrixDraftStream } = await import("./draft-stream.js"));
});
// Behavioral suite for createMatrixDraftStream: first-send vs edit paths,
// throttle coalescing, stop/reset lifecycle, failure handling, and the
// single-event preview size limit. Fake timers drive the throttle window.
describe("createMatrixDraftStream", () => {
  let client: ReturnType<typeof createMockClient>;
  beforeEach(() => {
    vi.useFakeTimers();
    client = createMockClient();
    resolveTextChunkLimitMock.mockReset().mockReturnValue(4000);
    resolveChunkModeMock.mockReset().mockReturnValue("length");
    chunkMarkdownTextWithModeMock
      .mockReset()
      .mockImplementation((text: string) => (text ? [text] : []));
    convertMarkdownTablesMock.mockReset().mockImplementation((text: string) => text);
  });
  afterEach(() => {
    vi.useRealTimers();
  });
  it("sends a new message on first update", async () => {
    const stream = createMatrixDraftStream({
      roomId: "!room:test",
      client,
      cfg: {} as import("../types.js").CoreConfig,
    });
    stream.update("Hello");
    await stream.flush();
    expect(sendMessageMock).toHaveBeenCalledTimes(1);
    expect(stream.eventId()).toBe("$evt1");
  });
  it("edits the message on subsequent updates", async () => {
    const stream = createMatrixDraftStream({
      roomId: "!room:test",
      client,
      cfg: {} as import("../types.js").CoreConfig,
    });
    stream.update("Hello");
    await stream.flush();
    expect(sendMessageMock).toHaveBeenCalledTimes(1);
    // Advance past throttle window so the next update fires immediately.
    vi.advanceTimersByTime(1000);
    stream.update("Hello world");
    await stream.flush();
    // First call = initial send, second call = edit (both go through sendMessage)
    expect(sendMessageMock).toHaveBeenCalledTimes(2);
  });
  it("coalesces rapid updates within throttle window", async () => {
    const stream = createMatrixDraftStream({
      roomId: "!room:test",
      client,
      cfg: {} as import("../types.js").CoreConfig,
    });
    stream.update("A");
    stream.update("AB");
    stream.update("ABC");
    await stream.flush();
    // First update fires immediately (fresh throttle window), then AB/ABC
    // coalesce into a single edit with the latest text.
    expect(sendMessageMock).toHaveBeenCalledTimes(2);
    expect(sendMessageMock.mock.calls[0][1]).toMatchObject({ body: "A" });
    // Edit uses "* <text>" prefix per Matrix m.replace spec.
    expect(sendMessageMock.mock.calls[1][1]).toMatchObject({ body: "* ABC" });
  });
  it("skips no-op updates", async () => {
    const stream = createMatrixDraftStream({
      roomId: "!room:test",
      client,
      cfg: {} as import("../types.js").CoreConfig,
    });
    stream.update("Hello");
    await stream.flush();
    const callCount = sendMessageMock.mock.calls.length;
    vi.advanceTimersByTime(1000);
    // Same text again — should not send
    stream.update("Hello");
    await stream.flush();
    expect(sendMessageMock).toHaveBeenCalledTimes(callCount);
  });
  it("ignores updates after stop", async () => {
    const stream = createMatrixDraftStream({
      roomId: "!room:test",
      client,
      cfg: {} as import("../types.js").CoreConfig,
    });
    stream.update("Hello");
    await stream.stop();
    const callCount = sendMessageMock.mock.calls.length;
    stream.update("Ignored");
    await stream.flush();
    expect(sendMessageMock).toHaveBeenCalledTimes(callCount);
  });
  it("stop returns the event ID", async () => {
    const stream = createMatrixDraftStream({
      roomId: "!room:test",
      client,
      cfg: {} as import("../types.js").CoreConfig,
    });
    stream.update("Hello");
    const eventId = await stream.stop();
    expect(eventId).toBe("$evt1");
  });
  it("reset allows reuse for next block", async () => {
    sendMessageMock.mockResolvedValueOnce("$first").mockResolvedValueOnce("$second");
    const stream = createMatrixDraftStream({
      roomId: "!room:test",
      client,
      cfg: {} as import("../types.js").CoreConfig,
    });
    stream.update("Block 1");
    await stream.stop();
    expect(stream.eventId()).toBe("$first");
    stream.reset();
    expect(stream.eventId()).toBeUndefined();
    stream.update("Block 2");
    await stream.stop();
    expect(stream.eventId()).toBe("$second");
  });
  it("stops retrying after send failure", async () => {
    sendMessageMock.mockRejectedValueOnce(new Error("network error"));
    const log = vi.fn();
    const stream = createMatrixDraftStream({
      roomId: "!room:test",
      client,
      cfg: {} as import("../types.js").CoreConfig,
      log,
    });
    stream.update("Hello");
    await stream.flush();
    // Should have logged the failure
    expect(log).toHaveBeenCalledWith(expect.stringContaining("send/edit failed"));
    vi.advanceTimersByTime(1000);
    // Further updates should not attempt sends (stream is stopped)
    stream.update("More text");
    await stream.flush();
    // Only the initial failed attempt
    expect(sendMessageMock).toHaveBeenCalledTimes(1);
    expect(stream.eventId()).toBeUndefined();
  });
  it("skips empty/whitespace text", async () => {
    const stream = createMatrixDraftStream({
      roomId: "!room:test",
      client,
      cfg: {} as import("../types.js").CoreConfig,
    });
    stream.update("   ");
    await stream.flush();
    expect(sendMessageMock).not.toHaveBeenCalled();
  });
  it("stops on edit failure mid-stream", async () => {
    sendMessageMock
      .mockResolvedValueOnce("$evt1") // initial send succeeds
      .mockRejectedValueOnce(new Error("rate limited")); // edit fails
    const log = vi.fn();
    const stream = createMatrixDraftStream({
      roomId: "!room:test",
      client,
      cfg: {} as import("../types.js").CoreConfig,
      log,
    });
    stream.update("Hello");
    await stream.flush();
    expect(stream.eventId()).toBe("$evt1");
    vi.advanceTimersByTime(1000);
    stream.update("Hello world");
    await stream.flush();
    expect(log).toHaveBeenCalledWith(expect.stringContaining("send/edit failed"));
    vi.advanceTimersByTime(1000);
    // Stream should be stopped — further updates are ignored
    stream.update("More text");
    await stream.flush();
    expect(sendMessageMock).toHaveBeenCalledTimes(2);
  });
  it("bypasses newline chunking for the draft preview message", async () => {
    resolveChunkModeMock.mockReturnValue("newline");
    chunkMarkdownTextWithModeMock.mockImplementation((text: string) => text.split("\n"));
    const stream = createMatrixDraftStream({
      roomId: "!room:test",
      client,
      cfg: {} as import("../types.js").CoreConfig,
    });
    stream.update("line 1\nline 2");
    await stream.flush();
    // The preview must stay a single event even when chunk mode would split.
    expect(sendMessageMock).toHaveBeenCalledTimes(1);
    expect(sendMessageMock.mock.calls[0]?.[1]).toMatchObject({ body: "line 1\nline 2" });
  });
  it("falls back to normal delivery when preview text exceeds one Matrix event", async () => {
    const log = vi.fn();
    resolveTextChunkLimitMock.mockReturnValue(5);
    const stream = createMatrixDraftStream({
      roomId: "!room:test",
      client,
      cfg: {} as import("../types.js").CoreConfig,
      log,
    });
    stream.update("123456");
    await stream.flush();
    expect(sendMessageMock).not.toHaveBeenCalled();
    expect(stream.eventId()).toBeUndefined();
    expect(stream.mustDeliverFinalNormally()).toBe(true);
    expect(log).toHaveBeenCalledWith(
      expect.stringContaining("preview exceeded single-event limit"),
    );
  });
  it("uses converted Matrix text when checking the single-event preview limit", async () => {
    const log = vi.fn();
    resolveTextChunkLimitMock.mockReturnValue(5);
    // Raw text fits (4 chars) but the converted form (6 chars) does not;
    // the limit check must use the converted text.
    convertMarkdownTablesMock.mockImplementation(() => "123456");
    const stream = createMatrixDraftStream({
      roomId: "!room:test",
      client,
      cfg: {} as import("../types.js").CoreConfig,
      log,
    });
    stream.update("1234");
    await stream.flush();
    expect(sendMessageMock).not.toHaveBeenCalled();
    expect(stream.mustDeliverFinalNormally()).toBe(true);
    expect(log).toHaveBeenCalledWith(
      expect.stringContaining("preview exceeded single-event limit"),
    );
  });
});

View File

@ -0,0 +1,154 @@
import { createDraftStreamLoop } from "openclaw/plugin-sdk/channel-lifecycle";
import type { CoreConfig } from "../types.js";
import type { MatrixClient } from "./sdk.js";
import { editMessageMatrix, prepareMatrixSingleText, sendSingleTextMessageMatrix } from "./send.js";

// Minimum gap between preview send/edit calls; bounds Matrix edit traffic
// while text is still streaming in.
const DEFAULT_THROTTLE_MS = 1000;

/**
 * Handle for a single-message streaming draft reply in a Matrix room:
 * the first update sends one message, later updates edit it in place,
 * and stop()/reset() delimit one text block.
 */
export type MatrixDraftStream = {
  /** Update the draft with the latest accumulated text for the current block. */
  update: (text: string) => void;
  /** Ensure the last pending update has been sent. */
  flush: () => Promise<void>;
  /** Flush and mark this block as done. Returns the event ID if a message was sent. */
  stop: () => Promise<string | undefined>;
  /** Reset state for the next text block (after tool calls). */
  reset: () => void;
  /** The event ID of the current draft message, if any. */
  eventId: () => string | undefined;
  /** The last text successfully sent or edited. */
  lastSentText: () => string;
  /** True when preview streaming must fall back to normal final delivery. */
  mustDeliverFinalNormally: () => boolean;
};
/**
 * Creates a draft stream that renders streaming reply text as ONE editable
 * Matrix message: the first non-empty update sends a message, subsequent
 * updates edit it in place (throttled via createDraftStreamLoop), and any
 * failure or single-event overflow stops previewing so the caller can fall
 * back to normal final delivery.
 *
 * @param params.roomId          Target room for the preview message.
 * @param params.client          Matrix client used for send/edit calls.
 * @param params.cfg             Core config consulted for text preparation.
 * @param params.threadId        Optional thread to post the preview into.
 * @param params.replyToId       Optional event the first preview replies to.
 * @param params.preserveReplyId When true, reset() restores the original
 *                               replyToId instead of clearing it.
 * @param params.accountId       Optional account scope for text limits.
 * @param params.log             Optional diagnostic logger.
 */
export function createMatrixDraftStream(params: {
  roomId: string;
  client: MatrixClient;
  cfg: CoreConfig;
  threadId?: string;
  replyToId?: string;
  /** When true, reset() restores the original replyToId instead of clearing it. */
  preserveReplyId?: boolean;
  accountId?: string;
  log?: (message: string) => void;
}): MatrixDraftStream {
  const { roomId, client, cfg, threadId, accountId, log } = params;
  // Per-block state, cleared by reset():
  let currentEventId: string | undefined; // event being edited, once first send succeeds
  let lastSentText = ""; // last text actually delivered, for no-op suppression
  let stopped = false; // block finished or aborted; update() becomes a no-op
  let sendFailed = false; // initial send failed — final must go through normal delivery
  let finalizeInPlaceBlocked = false; // preview can no longer be finalized by editing
  let replyToId = params.replyToId;
  // Sends the first preview message or edits the existing one.
  // Returns true when the text is delivered (or already current),
  // false when this block's previewing has been abandoned.
  const sendOrEdit = async (text: string): Promise<boolean> => {
    const trimmed = text.trimEnd();
    if (!trimmed) {
      return false;
    }
    const preparedText = prepareMatrixSingleText(trimmed, { cfg, accountId });
    if (!preparedText.fitsInSingleEvent) {
      // Text no longer fits one Matrix event: keep the preview as-is and
      // route the final reply through normal delivery.
      finalizeInPlaceBlocked = true;
      if (!currentEventId) {
        sendFailed = true;
      }
      stopped = true;
      log?.(
        `draft-stream: preview exceeded single-event limit (${preparedText.convertedText.length} > ${preparedText.singleEventLimit})`,
      );
      return false;
    }
    // If the initial send failed, stop trying for this block. The deliver
    // callback will fall back to deliverMatrixReplies.
    if (sendFailed) {
      return false;
    }
    if (preparedText.trimmedText === lastSentText) {
      return true;
    }
    try {
      if (!currentEventId) {
        const result = await sendSingleTextMessageMatrix(roomId, preparedText.trimmedText, {
          client,
          cfg,
          replyToId,
          threadId,
          accountId,
        });
        currentEventId = result.messageId;
        lastSentText = preparedText.trimmedText;
        log?.(`draft-stream: created message ${currentEventId}`);
      } else {
        await editMessageMatrix(roomId, currentEventId, preparedText.trimmedText, {
          client,
          cfg,
          threadId,
          accountId,
        });
        lastSentText = preparedText.trimmedText;
      }
      return true;
    } catch (err) {
      log?.(`draft-stream: send/edit failed: ${String(err)}`);
      const isPreviewLimitError =
        err instanceof Error && err.message.startsWith("Matrix single-message text exceeds limit");
      if (isPreviewLimitError) {
        // Once the preview no longer fits in one editable event, preserve the
        // current preview as-is and fall back to normal final delivery.
        finalizeInPlaceBlocked = true;
      }
      if (!currentEventId) {
        // First send failed — give up for this block so the deliver callback
        // falls through to normal delivery.
        sendFailed = true;
      }
      // Signal failure so the loop stops retrying.
      stopped = true;
      return false;
    }
  };
  // Throttled update loop: coalesces rapid update() calls into at most one
  // send/edit per DEFAULT_THROTTLE_MS window.
  const loop = createDraftStreamLoop({
    throttleMs: DEFAULT_THROTTLE_MS,
    isStopped: () => stopped,
    sendOrEditStreamMessage: sendOrEdit,
  });
  log?.(`draft-stream: ready (throttleMs=${DEFAULT_THROTTLE_MS})`);
  const stop = async (): Promise<string | undefined> => {
    // Flush before marking stopped so the loop can drain pending text.
    await loop.flush();
    stopped = true;
    return currentEventId;
  };
  const reset = (): void => {
    // Clear reply context unless preserveReplyId is set (replyToMode "all"),
    // in which case subsequent blocks should keep replying to the original.
    replyToId = params.preserveReplyId ? params.replyToId : undefined;
    currentEventId = undefined;
    lastSentText = "";
    stopped = false;
    sendFailed = false;
    finalizeInPlaceBlocked = false;
    loop.resetPending();
    loop.resetThrottleWindow();
  };
  return {
    update: (text: string) => {
      // Drop updates once the block is finished/aborted.
      if (stopped) {
        return;
      }
      loop.update(text);
    },
    flush: loop.flush,
    stop,
    reset,
    eventId: () => currentEventId,
    lastSentText: () => lastSentText,
    mustDeliverFinalNormally: () => sendFailed || finalizeInPlaceBlocked,
  };
}

View File

@ -30,6 +30,7 @@ type MatrixHandlerTestHarnessOptions = {
groupPolicy?: "open" | "allowlist" | "disabled";
replyToMode?: ReplyToMode;
threadReplies?: "off" | "inbound" | "always";
streaming?: "partial" | "off";
dmEnabled?: boolean;
dmPolicy?: "pairing" | "allowlist" | "open" | "disabled";
textLimit?: number;
@ -210,6 +211,7 @@ export function createMatrixHandlerTestHarness(
groupPolicy: options.groupPolicy ?? "open",
replyToMode: options.replyToMode ?? "off",
threadReplies: options.threadReplies ?? "inbound",
streaming: options.streaming ?? "off",
dmEnabled: options.dmEnabled ?? true,
dmPolicy: options.dmPolicy ?? "open",
textLimit: options.textLimit ?? 8_000,

View File

@ -17,17 +17,50 @@ import { EventType } from "./types.js";
// Hoisted mocks for the Matrix send layer. vi.hoisted runs before the
// vi.mock factories below, so the fn instances exist when the mocked
// modules are first imported by the code under test.
const sendMessageMatrixMock = vi.hoisted(() =>
  vi.fn(async (..._args: unknown[]) => ({ messageId: "evt", roomId: "!room" })),
);
// Draft-stream previews go through the single-event send path; default to a
// stable event id so tests can assert edits against "$draft1".
const sendSingleTextMessageMatrixMock = vi.hoisted(() =>
  vi.fn(async (..._args: unknown[]) => ({ messageId: "$draft1", roomId: "!room" })),
);
const editMessageMatrixMock = vi.hoisted(() => vi.fn(async () => "$edited"));
// Minimal stand-in for prepareMatrixSingleText: everything fits in one
// event by default (limit 4000, fitsInSingleEvent true).
const prepareMatrixSingleTextMock = vi.hoisted(() =>
  vi.fn((text: string) => {
    const trimmedText = text.trim();
    return {
      trimmedText,
      convertedText: trimmedText,
      singleEventLimit: 4000,
      fitsInSingleEvent: true,
    };
  }),
);
vi.mock("../send.js", () => ({
  editMessageMatrix: editMessageMatrixMock,
  prepareMatrixSingleText: prepareMatrixSingleTextMock,
  reactMatrixMessage: vi.fn(async () => {}),
  sendMessageMatrix: sendMessageMatrixMock,
  sendSingleTextMessageMatrix: sendSingleTextMessageMatrixMock,
  sendReadReceiptMatrix: vi.fn(async () => {}),
  sendTypingMatrix: vi.fn(async () => {}),
}));
// Normal (non-draft) delivery path — mocked so tests can assert fallbacks.
const deliverMatrixRepliesMock = vi.hoisted(() => vi.fn(async () => {}));
vi.mock("./replies.js", () => ({
  deliverMatrixReplies: deliverMatrixRepliesMock,
}));
beforeEach(() => {
  // Fresh session-binding + runtime state per test.
  sessionBindingTesting.resetSessionBindingAdaptersForTests();
  installMatrixMonitorTestRuntime();
  // Re-install the default prepareMatrixSingleText behavior — individual
  // tests may have overridden it (e.g. to simulate preview overflow).
  prepareMatrixSingleTextMock.mockReset().mockImplementation((text: string) => {
    const trimmedText = text.trim();
    return {
      trimmedText,
      convertedText: trimmedText,
      singleEventLimit: 4000,
      fitsInSingleEvent: true,
    };
  });
});
function createReactionHarness(params?: {
@ -852,6 +885,7 @@ describe("matrix monitor handler pairing account scope", () => {
groupPolicy: "open",
replyToMode: "off",
threadReplies: "inbound",
streaming: "off",
dmEnabled: true,
dmPolicy: "open",
textLimit: 8_000,
@ -1370,3 +1404,314 @@ describe("matrix monitor handler durable inbound dedupe", () => {
expect(inboundDeduper.releaseEvent).not.toHaveBeenCalled();
});
});
describe("matrix monitor handler draft streaming", () => {
// Shape of the deliver callback the handler passes into
// createReplyDispatcherWithTyping; tests capture and invoke it directly.
type DeliverFn = (
  payload: {
    text?: string;
    mediaUrl?: string;
    mediaUrls?: string[];
    isCompactionNotice?: boolean;
    replyToId?: string;
  },
  info: { kind: string },
) => Promise<void>;
// Subset of reply options the handler wires up for draft streaming.
type ReplyOpts = {
  onPartialReply?: (payload: { text: string }) => void;
  onAssistantMessageStart?: () => void;
  disableBlockStreaming?: boolean;
};
// Builds a handler harness with streaming: "partial" whose model run is
// parked on a promise ("run gate") until the test releases it. This lets
// tests drive onPartialReply / deliver callbacks deterministically while
// the handler is still "running", then finish() to exercise the handler's
// finally block (which stops the draft stream).
function createStreamingHarness(opts?: { replyToMode?: "off" | "first" | "all" }) {
  let capturedDeliver: DeliverFn | undefined;
  let capturedReplyOpts: ReplyOpts | undefined;
  // Gate that keeps the handler's model run alive until the test releases it.
  let resolveRunGate: (() => void) | undefined;
  const runGate = new Promise<void>((resolve) => {
    resolveRunGate = resolve;
  });
  // Reset send-layer mocks to the defaults this harness relies on.
  sendMessageMatrixMock.mockReset().mockResolvedValue({ messageId: "$draft1", roomId: "!room" });
  sendSingleTextMessageMatrixMock
    .mockReset()
    .mockResolvedValue({ messageId: "$draft1", roomId: "!room" });
  editMessageMatrixMock.mockReset().mockResolvedValue("$edited");
  deliverMatrixRepliesMock.mockReset().mockResolvedValue(undefined);
  const redactEventMock = vi.fn(async () => "$redacted");
  const { handler } = createMatrixHandlerTestHarness({
    streaming: "partial",
    replyToMode: opts?.replyToMode ?? "off",
    client: { redactEvent: redactEventMock },
    // Capture the handler's deliver callback instead of dispatching.
    createReplyDispatcherWithTyping: (params: Record<string, unknown> | undefined) => {
      capturedDeliver = params?.deliver as DeliverFn | undefined;
      return {
        dispatcher: {
          markComplete: () => {},
          waitForIdle: async () => {},
        },
        replyOptions: {},
        markDispatchIdle: () => {},
        markRunComplete: () => {},
      };
    },
    dispatchReplyFromConfig: vi.fn(async (args: { replyOptions?: ReplyOpts }) => {
      capturedReplyOpts = args?.replyOptions;
      // Block until the test is done exercising callbacks.
      await runGate;
      return { queuedFinal: true, counts: { final: 1, block: 0, tool: 0 } };
    }) as never,
    // Pass-through dispatcher wrapper that still runs onSettled.
    withReplyDispatcher: async <T>(params: {
      dispatcher: { markComplete?: () => void; waitForIdle?: () => Promise<void> };
      run: () => Promise<T>;
      onSettled?: () => void | Promise<void>;
    }) => {
      const result = await params.run();
      await params.onSettled?.();
      return result;
    },
  });
  const dispatch = async () => {
    // Start handler without awaiting — it blocks on runGate.
    const handlerDone = handler(
      "!room:example.org",
      createMatrixTextMessageEvent({ eventId: "$msg1", body: "hello" }),
    );
    // Wait for callbacks to be captured.
    await vi.waitFor(() => {
      if (!capturedDeliver || !capturedReplyOpts) {
        throw new Error("Streaming callbacks not captured yet");
      }
    });
    return {
      deliver: capturedDeliver!,
      opts: capturedReplyOpts!,
      // Release the run gate and wait for the handler to finish
      // (including the finally block that stops the draft stream).
      finish: async () => {
        resolveRunGate?.();
        await handlerDone;
      },
    };
  };
  return { dispatch, redactEventMock };
}
// A failed finalizing edit must not lose the answer: the handler falls
// back to the normal deliverMatrixReplies path.
it("falls back to deliverMatrixReplies when final edit fails", async () => {
  const { dispatch } = createStreamingHarness();
  const { deliver, opts, finish } = await dispatch();
  // Simulate streaming: partial reply creates draft message.
  opts.onPartialReply?.({ text: "Hello" });
  // Wait for the draft stream's immediate send to complete.
  await vi.waitFor(() => {
    expect(sendSingleTextMessageMatrixMock).toHaveBeenCalledTimes(1);
  });
  // Make the final edit fail.
  editMessageMatrixMock.mockRejectedValueOnce(new Error("rate limited"));
  // Deliver final — should catch edit failure and fall back.
  await deliver({ text: "Hello world" }, { kind: "block" });
  expect(deliverMatrixRepliesMock).toHaveBeenCalledTimes(1);
  await finish();
});
// After the final delivery the stream must stay stopped; late partial
// callbacks must not create "ghost" messages.
it("does not reset draft stream after final delivery", async () => {
  vi.useFakeTimers();
  try {
    const { dispatch } = createStreamingHarness();
    const { deliver, opts, finish } = await dispatch();
    opts.onPartialReply?.({ text: "Hello" });
    await vi.waitFor(() => {
      expect(sendSingleTextMessageMatrixMock).toHaveBeenCalledTimes(1);
    });
    // Final delivery — stream should stay stopped.
    await deliver({ text: "Hello" }, { kind: "final" });
    // Further partial updates should NOT create new messages.
    sendSingleTextMessageMatrixMock.mockClear();
    opts.onPartialReply?.({ text: "Ghost" });
    // Advance past the throttle window; a live stream would have sent.
    await vi.advanceTimersByTimeAsync(50);
    expect(sendSingleTextMessageMatrixMock).not.toHaveBeenCalled();
    await finish();
  } finally {
    vi.useRealTimers();
  }
});
// Partial text accumulates per assistant message upstream; the handler's
// materialized-length offset must reset at each assistant-message boundary
// so post-tool blocks don't stream from a stale offset.
it("resets materializedTextLength on assistant message start", async () => {
  const { dispatch } = createStreamingHarness();
  const { deliver, opts, finish } = await dispatch();
  // Block 1: stream and deliver.
  opts.onPartialReply?.({ text: "Block one" });
  await vi.waitFor(() => {
    expect(sendSingleTextMessageMatrixMock).toHaveBeenCalledTimes(1);
  });
  await deliver({ text: "Block one" }, { kind: "block" });
  // Tool call delivered (bypasses draft stream).
  await deliver({ text: "tool result" }, { kind: "tool" });
  // New assistant message starts — payload.text will reset upstream.
  opts.onAssistantMessageStart?.();
  // Block 2: partial text starts fresh (no stale offset).
  sendSingleTextMessageMatrixMock.mockClear();
  sendSingleTextMessageMatrixMock.mockResolvedValue({ messageId: "$draft2", roomId: "!room" });
  opts.onPartialReply?.({ text: "Block two" });
  await vi.waitFor(() => {
    expect(sendSingleTextMessageMatrixMock).toHaveBeenCalledTimes(1);
  });
  // The draft stream should have received "Block two", not empty string.
  const sentBody = sendSingleTextMessageMatrixMock.mock.calls[0]?.[1];
  expect(sentBody).toBeTruthy();
  await finish();
});
// If the model run throws mid-stream, the handler's finally block must
// stop the draft stream so no throttle timer fires after exit.
it("stops draft stream on handler error (no leaked timer)", async () => {
  vi.useFakeTimers();
  try {
    sendSingleTextMessageMatrixMock
      .mockReset()
      .mockResolvedValue({ messageId: "$draft1", roomId: "!room" });
    editMessageMatrixMock.mockReset().mockResolvedValue("$edited");
    deliverMatrixRepliesMock.mockReset().mockResolvedValue(undefined);
    let capturedReplyOpts: ReplyOpts | undefined;
    // Custom harness (not createStreamingHarness): the dispatch call itself
    // streams one partial and then throws, instead of parking on a gate.
    const { handler } = createMatrixHandlerTestHarness({
      streaming: "partial",
      createReplyDispatcherWithTyping: () => ({
        dispatcher: { markComplete: () => {}, waitForIdle: async () => {} },
        replyOptions: {},
        markDispatchIdle: () => {},
        markRunComplete: () => {},
      }),
      dispatchReplyFromConfig: vi.fn(async (args: { replyOptions?: ReplyOpts }) => {
        capturedReplyOpts = args?.replyOptions;
        // Simulate streaming then model error.
        capturedReplyOpts?.onPartialReply?.({ text: "partial" });
        await vi.waitFor(() => {
          expect(sendSingleTextMessageMatrixMock).toHaveBeenCalledTimes(1);
        });
        throw new Error("model timeout");
      }) as never,
      withReplyDispatcher: async <T>(params: {
        dispatcher: { markComplete?: () => void; waitForIdle?: () => Promise<void> };
        run: () => Promise<T>;
        onSettled?: () => void | Promise<void>;
      }) => {
        const result = await params.run();
        await params.onSettled?.();
        return result;
      },
    });
    // Handler should not throw (outer catch absorbs it).
    await handler(
      "!room:example.org",
      createMatrixTextMessageEvent({ eventId: "$msg1", body: "hello" }),
    );
    // After handler exits, draft stream timer must not fire.
    sendSingleTextMessageMatrixMock.mockClear();
    editMessageMatrixMock.mockClear();
    await vi.advanceTimersByTimeAsync(50);
    expect(sendSingleTextMessageMatrixMock).not.toHaveBeenCalled();
    expect(editMessageMatrixMock).not.toHaveBeenCalled();
  } finally {
    vi.useRealTimers();
  }
});
// Compaction notices are transient status text — they must not overwrite
// the streamed draft; they go through normal delivery instead.
it("skips compaction notices in draft finalization", async () => {
  const { dispatch } = createStreamingHarness();
  const { deliver, opts, finish } = await dispatch();
  opts.onPartialReply?.({ text: "Streaming" });
  await vi.waitFor(() => {
    expect(sendSingleTextMessageMatrixMock).toHaveBeenCalledTimes(1);
  });
  // Compaction notice should bypass draft path and go to normal delivery.
  deliverMatrixRepliesMock.mockClear();
  await deliver({ text: "Compacting...", isCompactionNotice: true }, { kind: "block" });
  expect(deliverMatrixRepliesMock).toHaveBeenCalledTimes(1);
  // Edit should NOT have been called for the compaction notice.
  expect(editMessageMatrixMock).not.toHaveBeenCalled();
  await finish();
});
// Matrix edits cannot change an event's reply relation, so a final with a
// different reply target must redact the draft and deliver normally.
it("redacts stale draft when payload reply target mismatches", async () => {
  const { dispatch, redactEventMock } = createStreamingHarness({ replyToMode: "first" });
  const { deliver, opts, finish } = await dispatch();
  // Simulate streaming: partial reply creates draft message.
  opts.onPartialReply?.({ text: "Partial reply" });
  await vi.waitFor(() => {
    expect(sendSingleTextMessageMatrixMock).toHaveBeenCalledTimes(1);
  });
  // Final delivery carries a different replyToId than the draft's.
  deliverMatrixRepliesMock.mockClear();
  await deliver({ text: "Final text", replyToId: "$different_msg" }, { kind: "final" });
  // Draft should be redacted since it can't change reply relation.
  expect(redactEventMock).toHaveBeenCalledWith("!room:example.org", "$draft1");
  // Final answer delivered via normal path.
  expect(deliverMatrixRepliesMock).toHaveBeenCalledTimes(1);
  await finish();
});
// When the final payload drops reply threading entirely (no replyToId) but
// the draft was created as a reply, the stale draft must still be redacted.
it("redacts stale draft when final payload intentionally drops reply threading", async () => {
  const { dispatch, redactEventMock } = createStreamingHarness({ replyToMode: "first" });
  const { deliver, opts, finish } = await dispatch();
  // A tool payload can consume the first reply slot upstream while draft
  // streaming for the next assistant block still starts from the original
  // reply target.
  await deliver({ text: "tool result", replyToId: "$msg1" }, { kind: "tool" });
  opts.onAssistantMessageStart?.();
  opts.onPartialReply?.({ text: "Partial reply" });
  await vi.waitFor(() => {
    expect(sendSingleTextMessageMatrixMock).toHaveBeenCalledTimes(1);
  });
  deliverMatrixRepliesMock.mockClear();
  await deliver({ text: "Final text" }, { kind: "final" });
  expect(redactEventMock).toHaveBeenCalledWith("!room:example.org", "$draft1");
  expect(deliverMatrixRepliesMock).toHaveBeenCalledTimes(1);
  await finish();
});
// A media-only final (no text to fold into the preview) must clear the
// stale text preview before the media goes out via normal delivery.
it("redacts stale draft for media-only finals", async () => {
  const { dispatch, redactEventMock } = createStreamingHarness();
  const { deliver, opts, finish } = await dispatch();
  opts.onPartialReply?.({ text: "Partial reply" });
  await vi.waitFor(() => {
    expect(sendSingleTextMessageMatrixMock).toHaveBeenCalledTimes(1);
  });
  deliverMatrixRepliesMock.mockClear();
  await deliver({ mediaUrl: "https://example.com/image.png" }, { kind: "final" });
  expect(redactEventMock).toHaveBeenCalledWith("!room:example.org", "$draft1");
  expect(deliverMatrixRepliesMock).toHaveBeenCalledTimes(1);
  await finish();
});
});

View File

@ -14,6 +14,7 @@ import {
type RuntimeLogger,
} from "../../runtime-api.js";
import type { CoreConfig, MatrixRoomConfig, ReplyToMode } from "../../types.js";
import { createMatrixDraftStream } from "../draft-stream.js";
import { formatMatrixMediaUnavailableText } from "../media-text.js";
import { fetchMatrixPollSnapshot } from "../poll-summary.js";
import {
@ -24,6 +25,7 @@ import {
} from "../poll-types.js";
import type { LocationMessageEventContent, MatrixClient } from "../sdk.js";
import {
editMessageMatrix,
reactMatrixMessage,
sendMessageMatrix,
sendReadReceiptMatrix,
@ -68,6 +70,7 @@ export type MatrixMonitorHandlerParams = {
groupPolicy: "open" | "allowlist" | "disabled";
replyToMode: ReplyToMode;
threadReplies: "off" | "inbound" | "always";
streaming: "partial" | "off";
dmEnabled: boolean;
dmPolicy: "open" | "pairing" | "allowlist" | "disabled";
textLimit: number;
@ -160,6 +163,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
groupPolicy,
replyToMode,
threadReplies,
streaming,
dmEnabled,
dmPolicy,
textLimit,
@ -231,6 +235,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
return async (roomId: string, event: MatrixRawEvent) => {
const eventId = typeof event.event_id === "string" ? event.event_id.trim() : "";
let claimedInboundEvent = false;
let draftStreamRef: ReturnType<typeof createMatrixDraftStream> | undefined;
try {
const eventType = event.type;
if (eventType === EventType.RoomMessageEncrypted) {
@ -904,24 +909,202 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
});
},
});
const streamingEnabled = streaming === "partial";
const draftReplyToId = replyToMode !== "off" && !threadTarget ? _messageId : undefined;
let currentDraftReplyToId = draftReplyToId;
const draftStream = streamingEnabled
? createMatrixDraftStream({
roomId,
client,
cfg,
threadId: threadTarget,
replyToId: draftReplyToId,
preserveReplyId: replyToMode === "all",
accountId: _route.accountId,
log: logVerboseMessage,
})
: undefined;
draftStreamRef = draftStream;
// Track how much of the full accumulated text has been materialized
// (delivered) so each new block only streams the new portion.
let materializedTextLength = 0;
let lastPartialFullTextLength = 0;
// Set after the first final payload consumes the draft event so
// subsequent finals go through normal delivery.
let draftConsumed = false;
const { dispatcher, replyOptions, markDispatchIdle, markRunComplete } =
core.channel.reply.createReplyDispatcherWithTyping({
...prefixOptions,
humanDelay: core.channel.reply.resolveHumanDelayConfig(cfg, _route.agentId),
deliver: async (payload: ReplyPayload) => {
await deliverMatrixReplies({
cfg,
replies: [payload],
roomId,
client,
runtime,
textLimit,
replyToMode,
threadId: threadTarget,
accountId: _route.accountId,
mediaLocalRoots,
tableMode,
});
deliver: async (payload: ReplyPayload, info: { kind: string }) => {
if (draftStream && info.kind !== "tool" && !payload.isCompactionNotice) {
const hasMedia = Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0;
await draftStream.stop();
// After the first final payload consumes the draft, subsequent
// finals must go through normal delivery to avoid overwriting.
if (draftConsumed) {
await deliverMatrixReplies({
cfg,
replies: [payload],
roomId,
client,
runtime,
textLimit,
replyToMode,
threadId: threadTarget,
accountId: _route.accountId,
mediaLocalRoots,
tableMode,
});
return;
}
// Read event id after stop() — flush may have created the
// initial message while draining pending text.
const draftEventId = draftStream.eventId();
// If the payload carries a reply target that differs from the
// draft's, fall through to normal delivery — Matrix edits
// cannot change the reply relation on an existing event.
// Skip when replyToMode is "off" (replies stripped anyway)
// or when threadTarget is set (thread relations take
// precedence over replyToId in deliverMatrixReplies).
const payloadReplyToId = payload.replyToId?.trim() || undefined;
const payloadReplyMismatch =
replyToMode !== "off" &&
!threadTarget &&
payloadReplyToId !== currentDraftReplyToId;
const mustDeliverFinalNormally = draftStream.mustDeliverFinalNormally();
if (
draftEventId &&
payload.text &&
!hasMedia &&
!payloadReplyMismatch &&
!mustDeliverFinalNormally
) {
// Text-only: final edit of the draft message. Skip if
// stop() already flushed identical text to avoid a
// redundant API call that wastes rate-limit budget.
if (payload.text !== draftStream.lastSentText()) {
try {
await editMessageMatrix(roomId, draftEventId, payload.text, {
client,
cfg,
threadId: threadTarget,
accountId: _route.accountId,
});
} catch {
// Edit failed (rate limit, server error) — redact the
// stale draft and fall back to normal delivery so the
// user still gets the final answer.
await client.redactEvent(roomId, draftEventId).catch(() => {});
await deliverMatrixReplies({
cfg,
replies: [payload],
roomId,
client,
runtime,
textLimit,
replyToMode,
threadId: threadTarget,
accountId: _route.accountId,
mediaLocalRoots,
tableMode,
});
}
}
draftConsumed = true;
} else if (draftEventId && hasMedia && !payloadReplyMismatch) {
// Media payload: finalize draft text, send media separately.
let textEditOk = !mustDeliverFinalNormally;
if (textEditOk && payload.text && payload.text !== draftStream.lastSentText()) {
textEditOk = await editMessageMatrix(roomId, draftEventId, payload.text, {
client,
cfg,
threadId: threadTarget,
accountId: _route.accountId,
}).then(
() => true,
() => false,
);
}
const reusesDraftAsFinalText = Boolean(payload.text?.trim()) && textEditOk;
// If the text edit failed, or there is no final text to reuse
// the preview, redact the stale draft and include text in media
// delivery so the final caption is not lost.
if (!reusesDraftAsFinalText) {
await client.redactEvent(roomId, draftEventId).catch(() => {});
}
await deliverMatrixReplies({
cfg,
replies: [
{ ...payload, text: reusesDraftAsFinalText ? undefined : payload.text },
],
roomId,
client,
runtime,
textLimit,
replyToMode,
threadId: threadTarget,
accountId: _route.accountId,
mediaLocalRoots,
tableMode,
});
draftConsumed = true;
} else {
// Redact stale draft when the final delivery will create a
// new message (reply-target mismatch, preview overflow, or no
// usable draft).
if (draftEventId && (payloadReplyMismatch || mustDeliverFinalNormally)) {
await client.redactEvent(roomId, draftEventId).catch(() => {});
}
await deliverMatrixReplies({
cfg,
replies: [payload],
roomId,
client,
runtime,
textLimit,
replyToMode,
threadId: threadTarget,
accountId: _route.accountId,
mediaLocalRoots,
tableMode,
});
}
// Only reset for intermediate blocks — after the final delivery
// the stream must stay stopped so late async callbacks cannot
// create ghost messages.
if (info.kind === "block") {
materializedTextLength = lastPartialFullTextLength;
draftConsumed = false;
draftStream.reset();
currentDraftReplyToId = replyToMode === "all" ? draftReplyToId : undefined;
// Re-assert typing so the user still sees the indicator while
// the next block generates.
await sendTypingMatrix(roomId, true, undefined, client).catch(() => {});
}
} else {
await deliverMatrixReplies({
cfg,
replies: [payload],
roomId,
client,
runtime,
textLimit,
replyToMode,
threadId: threadTarget,
accountId: _route.accountId,
mediaLocalRoots,
tableMode,
});
}
},
onError: (err: unknown, info: { kind: "tool" | "block" | "final" }) => {
if (info.kind === "final") {
@ -949,6 +1132,28 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
replyOptions: {
...replyOptions,
skillFilter: roomConfig?.skills,
// When streaming is active, disable block streaming — draft
// streaming replaces it with edit-in-place updates.
disableBlockStreaming: streamingEnabled ? true : undefined,
onPartialReply: draftStream
? (payload) => {
const fullText = payload.text ?? "";
lastPartialFullTextLength = fullText.length;
const blockText = fullText.slice(materializedTextLength);
if (blockText) {
draftStream.update(blockText);
}
}
: undefined,
// Reset text offset on assistant message boundaries so
// post-tool blocks stream correctly (payload.text resets
// per assistant message upstream).
onAssistantMessageStart: draftStream
? () => {
materializedTextLength = 0;
lastPartialFullTextLength = 0;
}
: undefined,
onModelSelected,
},
});
@ -981,6 +1186,11 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
} catch (err) {
runtime.error?.(`matrix handler failed: ${String(err)}`);
} finally {
// Stop the draft stream timer so partial drafts don't leak if the
// model run throws or times out mid-stream.
if (draftStreamRef) {
await draftStreamRef.stop().catch(() => {});
}
if (claimedInboundEvent && inboundDeduper && eventId) {
inboundDeduper.releaseEvent({ roomId, eventId });
}

View File

@ -199,6 +199,8 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi
const textLimit = core.channel.text.resolveTextChunkLimit(cfg, "matrix", account.accountId);
const mediaMaxMb = opts.mediaMaxMb ?? accountConfig.mediaMaxMb ?? DEFAULT_MEDIA_MAX_MB;
const mediaMaxBytes = Math.max(1, mediaMaxMb) * 1024 * 1024;
const streaming: "partial" | "off" =
accountConfig.streaming === true || accountConfig.streaming === "partial" ? "partial" : "off";
const startupMs = Date.now();
const startupGraceMs = 0;
// Cold starts should ignore old room history, but once we have a persisted
@ -227,6 +229,7 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi
groupPolicy,
replyToMode,
threadReplies,
streaming,
dmEnabled,
dmPolicy,
textLimit,

View File

@ -3,8 +3,18 @@ import type { PluginRuntime, RuntimeEnv } from "../../../runtime-api.js";
import type { MatrixClient } from "../sdk.js";
// Hoisted mocks for the send layer used by deliverMatrixReplies tests.
const sendMessageMatrixMock = vi.hoisted(() => vi.fn().mockResolvedValue({ messageId: "mx-1" }));
// chunkMatrixText stand-in: identity conversion, one chunk per non-empty
// input, generous single-event limit so nothing overflows by default.
const chunkMatrixTextMock = vi.hoisted(() =>
  vi.fn((text: string, _opts?: unknown) => ({
    trimmedText: text.trim(),
    convertedText: text,
    singleEventLimit: 4000,
    fitsInSingleEvent: true,
    chunks: text ? [text] : [],
  })),
);
vi.mock("../send.js", () => ({
  chunkMatrixText: (text: string, opts?: unknown) => chunkMatrixTextMock(text, opts),
  sendMessageMatrix: (to: string, message: string, opts?: unknown) =>
    sendMessageMatrixMock(to, message, opts),
}));
@ -48,11 +58,23 @@ describe("deliverMatrixReplies", () => {
beforeEach(() => {
  vi.clearAllMocks();
  setMatrixRuntime(runtimeStub);
  // Restore default implementations that clearAllMocks wipes.
  chunkMarkdownTextWithModeMock.mockImplementation((text: string) => [text]);
  chunkMatrixTextMock.mockReset().mockImplementation((text: string) => ({
    trimmedText: text.trim(),
    convertedText: text,
    singleEventLimit: 4000,
    fitsInSingleEvent: true,
    chunks: text ? [text] : [],
  }));
});
it("keeps replyToId on first reply only when replyToMode=first", async () => {
chunkMarkdownTextWithModeMock.mockImplementation((text: string) => text.split("|"));
chunkMatrixTextMock.mockImplementation((text: string) => ({
trimmedText: text.trim(),
convertedText: text,
singleEventLimit: 4000,
fitsInSingleEvent: true,
chunks: text.split("|"),
}));
await deliverMatrixReplies({
cfg,
@ -124,7 +146,13 @@ describe("deliverMatrixReplies", () => {
});
it("suppresses replyToId when threadId is set", async () => {
chunkMarkdownTextWithModeMock.mockImplementation((text: string) => text.split("|"));
chunkMatrixTextMock.mockImplementation((text: string) => ({
trimmedText: text.trim(),
convertedText: text,
singleEventLimit: 4000,
fitsInSingleEvent: true,
chunks: text.split("|"),
}));
await deliverMatrixReplies({
cfg,
@ -197,7 +225,11 @@ describe("deliverMatrixReplies", () => {
});
expect(loadConfigMock).not.toHaveBeenCalled();
expect(resolveChunkModeMock).toHaveBeenCalledWith(explicitCfg, "matrix", "ops");
expect(chunkMatrixTextMock).toHaveBeenCalledWith("hello", {
cfg: explicitCfg,
accountId: "ops",
tableMode: "code",
});
expect(sendMessageMatrixMock).toHaveBeenCalledWith(
"room:4",
"hello",
@ -208,4 +240,26 @@ describe("deliverMatrixReplies", () => {
}),
);
});
// Media captions must reach sendMessageMatrix untouched — table conversion
// happens inside the send layer, not in deliverMatrixReplies.
it("passes raw media captions through to sendMessageMatrix without pre-converting them", async () => {
  // If deliverMatrixReplies pre-converted, the caption would arrive as
  // "converted:caption" below.
  convertMarkdownTablesMock.mockImplementation((text: string) => `converted:${text}`);
  await deliverMatrixReplies({
    cfg,
    replies: [{ text: "caption", mediaUrl: "https://example.com/a.jpg" }],
    roomId: "room:6",
    client: {} as MatrixClient,
    runtime: runtimeEnv,
    textLimit: 4000,
    replyToMode: "off",
  });
  expect(sendMessageMatrixMock).toHaveBeenCalledWith(
    "room:6",
    "caption",
    expect.objectContaining({
      mediaUrl: "https://example.com/a.jpg",
    }),
  );
});
});

View File

@ -6,7 +6,7 @@ import type {
} from "../../runtime-api.js";
import { getMatrixRuntime } from "../../runtime.js";
import type { MatrixClient } from "../sdk.js";
import { sendMessageMatrix } from "../send.js";
import { chunkMatrixText, sendMessageMatrix } from "../send.js";
const THINKING_TAG_RE = /<\s*\/?\s*(?:think(?:ing)?|thought|antthinking)\b[^<>]*>/gi;
const THINKING_BLOCK_RE =
@ -59,8 +59,6 @@ export async function deliverMatrixReplies(params: {
params.runtime.log?.(message);
}
};
const chunkLimit = Math.min(params.textLimit, 4000);
const chunkMode = core.channel.text.resolveChunkMode(params.cfg, "matrix", params.accountId);
let hasReplied = false;
for (const reply of params.replies) {
if (reply.isReasoning === true || shouldSuppressReasoningReplyText(reply.text)) {
@ -79,7 +77,6 @@ export async function deliverMatrixReplies(params: {
const replyToIdRaw = reply.replyToId?.trim();
const replyToId = params.threadId || params.replyToMode === "off" ? undefined : replyToIdRaw;
const rawText = reply.text ?? "";
const text = core.channel.text.convertMarkdownTables(rawText, tableMode);
const mediaList = reply.mediaUrls?.length
? reply.mediaUrls
: reply.mediaUrl
@ -92,11 +89,12 @@ export async function deliverMatrixReplies(params: {
if (mediaList.length === 0) {
let sentTextChunk = false;
for (const chunk of core.channel.text.chunkMarkdownTextWithMode(
text,
chunkLimit,
chunkMode,
)) {
const { chunks } = chunkMatrixText(rawText, {
cfg: params.cfg,
accountId: params.accountId,
tableMode,
});
for (const chunk of chunks) {
const trimmed = chunk.trim();
if (!trimmed) {
continue;
@ -118,7 +116,7 @@ export async function deliverMatrixReplies(params: {
let first = true;
for (const mediaUrl of mediaList) {
const caption = first ? text : "";
const caption = first ? rawText : "";
await sendMessageMatrix(params.roomId, caption, {
client: params.client,
cfg: params.cfg,

View File

@ -17,6 +17,8 @@ const isVoiceCompatibleAudioMock = vi.fn(
const resolveTextChunkLimitMock = vi.fn<
(cfg: unknown, channel: unknown, accountId?: unknown) => number
>(() => 4000);
const resolveMarkdownTableModeMock = vi.fn(() => "code");
const convertMarkdownTablesMock = vi.fn((text: string) => text);
const runtimeStub = {
config: {
@ -37,13 +39,14 @@ const runtimeStub = {
resolveChunkMode: () => "length",
chunkMarkdownText: (text: string) => (text ? [text] : []),
chunkMarkdownTextWithMode: (text: string) => (text ? [text] : []),
resolveMarkdownTableMode: () => "code",
convertMarkdownTables: (text: string) => text,
resolveMarkdownTableMode: () => resolveMarkdownTableModeMock(),
convertMarkdownTables: (text: string) => convertMarkdownTablesMock(text),
},
},
} as unknown as PluginRuntime;
let sendMessageMatrix: typeof import("./send.js").sendMessageMatrix;
let sendSingleTextMessageMatrix: typeof import("./send.js").sendSingleTextMessageMatrix;
let sendTypingMatrix: typeof import("./send.js").sendTypingMatrix;
let voteMatrixPoll: typeof import("./actions/polls.js").voteMatrixPoll;
@ -52,6 +55,7 @@ async function loadMatrixSendModules() {
const runtimeModule = await import("../runtime.js");
runtimeModule.setMatrixRuntime(runtimeStub);
({ sendMessageMatrix } = await import("./send.js"));
({ sendSingleTextMessageMatrix } = await import("./send.js"));
({ sendTypingMatrix } = await import("./send.js"));
({ voteMatrixPoll } = await import("./actions/polls.js"));
}
@ -127,6 +131,8 @@ describe("sendMessageMatrix media", () => {
mediaKindFromMimeMock.mockReset().mockReturnValue("image");
isVoiceCompatibleAudioMock.mockReset().mockReturnValue(false);
resolveTextChunkLimitMock.mockReset().mockReturnValue(4000);
resolveMarkdownTableModeMock.mockReset().mockReturnValue("code");
convertMarkdownTablesMock.mockReset().mockImplementation((text: string) => text);
await loadMatrixSendModules();
});
@ -404,6 +410,29 @@ describe("sendMessageMatrix threads", () => {
});
});
describe("sendSingleTextMessageMatrix", () => {
  beforeEach(async () => {
    vi.clearAllMocks();
    await resetMatrixSendRuntimeMocks();
    // Re-install table-mode defaults wiped by clearAllMocks.
    resolveMarkdownTableModeMock.mockReset().mockReturnValue("code");
    convertMarkdownTablesMock.mockReset().mockImplementation((text: string) => text);
  });

  it("rejects single-event sends when converted text exceeds the Matrix limit", async () => {
    const { client, sendMessage } = makeClient();
    // Limit 5 with a 6-char converted body forces the overflow rejection.
    resolveTextChunkLimitMock.mockReturnValue(5);
    convertMarkdownTablesMock.mockImplementation(() => "123456");
    await expect(
      sendSingleTextMessageMatrix("room:!room:example", "1234", {
        client,
      }),
    ).rejects.toThrow("Matrix single-message text exceeds limit");
    // The overflow must be detected before any event is sent.
    expect(sendMessage).not.toHaveBeenCalled();
  });
});
describe("voteMatrixPoll", () => {
beforeAll(async () => {
await loadMatrixSendModules();

View File

@ -1,4 +1,4 @@
import type { PollInput } from "../runtime-api.js";
import type { MarkdownTableMode, PollInput } from "../runtime-api.js";
import { getMatrixRuntime } from "../runtime.js";
import type { CoreConfig } from "../types.js";
import { buildPollStartContent, M_POLL_START } from "./poll-types.js";
@ -22,6 +22,7 @@ import { normalizeThreadId, resolveMatrixRoomId } from "./send/targets.js";
import {
EventType,
MsgType,
RelationType,
type MatrixOutboundContent,
type MatrixSendOpts,
type MatrixSendResult,
@ -33,6 +34,17 @@ const getCore = () => getMatrixRuntime();
export type { MatrixSendOpts, MatrixSendResult } from "./send/types.js";
export { resolveMatrixRoomId } from "./send/targets.js";
/**
 * Result of preparing text for a single Matrix event
 * (see prepareMatrixSingleText).
 */
export type MatrixPreparedSingleText = {
  // Input text with surrounding whitespace removed.
  trimmedText: string;
  // Trimmed text after markdown-table conversion.
  convertedText: string;
  // Max characters allowed in one event (config limit capped by the
  // Matrix hard limit).
  singleEventLimit: number;
  // True when convertedText fits within singleEventLimit.
  fitsInSingleEvent: boolean;
};

/** Single-event preparation plus the text split into sendable chunks. */
export type MatrixPreparedChunkedText = MatrixPreparedSingleText & {
  // convertedText partitioned per the resolved chunk mode and limit.
  chunks: string[];
};
type MatrixClientResolveOpts = {
client?: MatrixClient;
cfg?: CoreConfig;
@ -61,6 +73,57 @@ function normalizeMatrixClientResolveOpts(
};
}
/**
 * Normalize outbound text for delivery as one Matrix event.
 *
 * Trims the input, converts markdown tables per the resolved table mode,
 * and computes the effective single-event size limit (the configured chunk
 * limit capped by the Matrix hard limit) so callers can decide whether the
 * text fits in a single editable event.
 *
 * @param text - Raw outbound text.
 * @param opts - Optional config snapshot, account id, and table-mode
 *   override; anything omitted is resolved from the runtime.
 * @returns Trimmed/converted text plus the limit and a fit flag.
 */
export function prepareMatrixSingleText(
  text: string,
  opts: {
    cfg?: CoreConfig;
    accountId?: string;
    tableMode?: MarkdownTableMode;
  } = {},
): MatrixPreparedSingleText {
  const core = getCore();
  const textApi = core.channel.text;
  const trimmedText = text.trim();
  const cfg = opts.cfg ?? core.config.loadConfig();

  // Honor an explicit table-mode override; otherwise resolve it for this
  // channel/account from config.
  let tableMode = opts.tableMode;
  if (tableMode == null) {
    tableMode = textApi.resolveMarkdownTableMode({
      cfg,
      channel: "matrix",
      accountId: opts.accountId,
    });
  }
  const convertedText = textApi.convertMarkdownTables(trimmedText, tableMode);

  // Never allow the configured limit to exceed the Matrix hard cap.
  const configuredLimit = textApi.resolveTextChunkLimit(cfg, "matrix", opts.accountId);
  const singleEventLimit = Math.min(configuredLimit, MATRIX_TEXT_LIMIT);

  return {
    trimmedText,
    convertedText,
    singleEventLimit,
    fitsInSingleEvent: convertedText.length <= singleEventLimit,
  };
}
/**
 * Prepare text for Matrix delivery and split it into event-sized chunks.
 *
 * Builds on prepareMatrixSingleText (trim, table conversion, limit
 * resolution) and adds the chunked representation using the chunk mode
 * resolved for the matrix channel.
 *
 * @param text Raw reply text (may contain markdown tables).
 * @param opts Optional config override, account id, and table mode.
 * @returns Prepared text plus chunks sized by singleEventLimit.
 */
export function chunkMatrixText(
  text: string,
  opts: {
    cfg?: CoreConfig;
    accountId?: string;
    tableMode?: MarkdownTableMode;
  } = {},
): MatrixPreparedChunkedText {
  // Resolve the config once and share it with prepareMatrixSingleText;
  // previously loadConfig() ran twice when opts.cfg was not supplied
  // (once here and once inside prepareMatrixSingleText).
  const cfg = opts.cfg ?? getCore().config.loadConfig();
  const preparedText = prepareMatrixSingleText(text, { ...opts, cfg });
  const chunkMode = getCore().channel.text.resolveChunkMode(cfg, "matrix", opts.accountId);
  return {
    ...preparedText,
    chunks: getCore().channel.text.chunkMarkdownTextWithMode(
      preparedText.convertedText,
      preparedText.singleEventLimit,
      chunkMode,
    ),
  };
}
export async function sendMessageMatrix(
to: string,
message: string | undefined,
@ -80,23 +143,10 @@ export async function sendMessageMatrix(
async (client) => {
const roomId = await resolveMatrixRoomId(client, to);
const cfg = opts.cfg ?? getCore().config.loadConfig();
const tableMode = getCore().channel.text.resolveMarkdownTableMode({
const { chunks } = chunkMatrixText(trimmedMessage, {
cfg,
channel: "matrix",
accountId: opts.accountId,
});
const convertedMessage = getCore().channel.text.convertMarkdownTables(
trimmedMessage,
tableMode,
);
const textLimit = getCore().channel.text.resolveTextChunkLimit(cfg, "matrix", opts.accountId);
const chunkLimit = Math.min(textLimit, MATRIX_TEXT_LIMIT);
const chunkMode = getCore().channel.text.resolveChunkMode(cfg, "matrix", opts.accountId);
const chunks = getCore().channel.text.chunkMarkdownTextWithMode(
convertedMessage,
chunkLimit,
chunkMode,
);
const threadId = normalizeThreadId(opts.threadId);
const relation = threadId
? buildThreadRelation(threadId, opts.replyToId)
@ -256,6 +306,109 @@ export async function sendReadReceiptMatrix(
});
}
/**
 * Send text that must land as exactly one Matrix event (used by the
 * streaming draft path, where the preview stays a single editable
 * message).
 *
 * @param roomId Room id or alias; resolved before sending.
 * @param text Reply text; trimmed and table-converted before sending.
 * @param opts Optional client/config overrides, reply/thread targets.
 * @returns The sent event id and resolved room id.
 * @throws Error when the trimmed text is empty or the converted text
 *   exceeds the single-event limit.
 */
export async function sendSingleTextMessageMatrix(
  roomId: string,
  text: string,
  opts: {
    client?: MatrixClient;
    cfg?: CoreConfig;
    replyToId?: string;
    threadId?: string;
    accountId?: string;
  } = {},
): Promise<MatrixSendResult> {
  const prepared = prepareMatrixSingleText(text, {
    cfg: opts.cfg,
    accountId: opts.accountId,
  });
  if (!prepared.trimmedText) {
    throw new Error("Matrix single-message send requires text");
  }
  if (!prepared.fitsInSingleEvent) {
    throw new Error(
      `Matrix single-message text exceeds limit (${prepared.convertedText.length} > ${prepared.singleEventLimit})`,
    );
  }
  const resolveOpts = {
    client: opts.client,
    cfg: opts.cfg,
    accountId: opts.accountId,
  };
  return await withResolvedMatrixClient(resolveOpts, async (client) => {
    const resolvedRoom = await resolveMatrixRoomId(client, roomId);
    const normalizedThreadId = normalizeThreadId(opts.threadId);
    // Thread replies and plain replies carry different relation shapes.
    const relation = normalizedThreadId
      ? buildThreadRelation(normalizedThreadId, opts.replyToId)
      : buildReplyRelation(opts.replyToId);
    const content = buildTextContent(prepared.convertedText, relation);
    const eventId = await client.sendMessage(resolvedRoom, content);
    return { messageId: eventId ?? "unknown", roomId: resolvedRoom };
  });
}
export async function editMessageMatrix(
roomId: string,
originalEventId: string,
newText: string,
opts: {
client?: MatrixClient;
cfg?: CoreConfig;
threadId?: string;
accountId?: string;
} = {},
): Promise<string> {
return await withResolvedMatrixClient(
{
client: opts.client,
cfg: opts.cfg,
accountId: opts.accountId,
},
async (client) => {
const resolvedRoom = await resolveMatrixRoomId(client, roomId);
const cfg = opts.cfg ?? getCore().config.loadConfig();
const tableMode = getCore().channel.text.resolveMarkdownTableMode({
cfg,
channel: "matrix",
accountId: opts.accountId,
});
const convertedText = getCore().channel.text.convertMarkdownTables(newText, tableMode);
const newContent = buildTextContent(convertedText);
const replaceRelation: Record<string, unknown> = {
rel_type: RelationType.Replace,
event_id: originalEventId,
};
const threadId = normalizeThreadId(opts.threadId);
if (threadId) {
// Thread-aware replace: Synapse needs the thread context to keep the
// edited event visible in the thread timeline.
replaceRelation["m.in_reply_to"] = { event_id: threadId };
}
// Spread newContent into the outer event so clients that don't support
// m.new_content still see properly formatted text (with HTML).
const content: Record<string, unknown> = {
...newContent,
body: `* ${convertedText}`,
...(typeof newContent.formatted_body === "string"
? { formatted_body: `* ${newContent.formatted_body}` }
: {}),
"m.new_content": newContent,
"m.relates_to": replaceRelation,
};
const eventId = await client.sendMessage(resolvedRoom, content);
return eventId ?? "";
},
);
}
export async function reactMatrixMessage(
roomId: string,
messageId: string,

View File

@ -133,6 +133,14 @@ export type MatrixConfig = {
rooms?: Record<string, MatrixRoomConfig>;
/** Per-action tool gating (default: true for all). */
actions?: MatrixActionConfig;
/**
* Streaming mode for Matrix replies.
* - `"partial"`: edit a single message in place as the model generates text.
* - `"off"`: deliver the full reply once the model finishes.
* - `true` maps to `"partial"`, `false` maps to `"off"`.
* Default: `"off"`.
*/
streaming?: "partial" | "off" | boolean;
};
export type CoreConfig = {

View File

@ -8532,6 +8532,17 @@ export const GENERATED_BUNDLED_PLUGIN_METADATA = [
type: "string",
enum: ["open", "disabled", "allowlist"],
},
streaming: {
anyOf: [
{
type: "string",
enum: ["partial", "off"],
},
{
type: "boolean",
},
],
},
replyToMode: {
type: "string",
enum: ["off", "first", "all"],