Matrix: restore ordered progress delivery with explicit streaming modes (#59266)

Merged via squash.

Prepared head SHA: 523623b7e1
Co-authored-by: gumadeiras <5599352+gumadeiras@users.noreply.github.com>
Co-authored-by: gumadeiras <5599352+gumadeiras@users.noreply.github.com>
Reviewed-by: @gumadeiras
This commit is contained in:
Gustavo Madeira Santana 2026-04-01 19:35:03 -04:00 committed by GitHub
parent 91a7505af6
commit 560ea25294
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
26 changed files with 704 additions and 179 deletions

View File

@ -26,6 +26,7 @@ Docs: https://docs.openclaw.ai
- Agents/failover: classify AbortError and stream-abort messages as timeout so Ollama NDJSON stream aborts stop showing `reason=unknown` in model fallback logs. (#58324) Thanks @yelog
- Exec approvals: route Slack, Discord, and Telegram approvals through the shared channel approval-capability path so native approval auth, delivery, and `/approve` handling stay aligned across channels while preserving Telegram session-key agent filtering. (#58634) Thanks @gumadeiras
- Matrix/runtime: resolve the verification/bootstrap runtime from a distinct packaged Matrix entry so global npm installs stop failing on crypto bootstrap with missing-module or recursive runtime alias errors. (#59249) Thanks @gumadeiras.
- Matrix/streaming: preserve ordered block flushes before tool, message, and agent boundaries, add explicit `channels.matrix.blockStreaming` opt-in so Matrix `streaming: "off"` stays final-only by default, and move MiniMax plain-text final handling into the MiniMax provider runtime instead of the shared core heuristic. (#59266) Thanks @gumadeiras
## 2026.4.2

View File

@ -184,6 +184,8 @@ done:
- `streaming: "off"` is the default. OpenClaw waits for the final reply and sends it once.
- `streaming: "partial"` creates one editable preview message instead of sending multiple partial messages.
- `blockStreaming: true` enables separate Matrix progress messages instead of final-only delivery when `streaming` is off.
- When `streaming: "partial"`, Matrix disables shared block streaming so draft edits do not double-send.
- If the preview no longer fits in one Matrix event, OpenClaw stops preview streaming and falls back to normal final delivery.
- Media replies still send attachments normally. If a stale preview can no longer be reused safely, OpenClaw redacts it before sending the final media reply.
- Preview edits cost extra Matrix API calls. Leave streaming off if you want the most conservative rate-limit behavior.
@ -752,6 +754,7 @@ Live directory lookup uses the logged-in Matrix account:
- `historyLimit`: max room messages to include as group history context. Falls back to `messages.groupChat.historyLimit`. Set `0` to disable.
- `replyToMode`: `off`, `first`, or `all`.
- `streaming`: `off` (default) or `partial`. `partial` enables single-message draft previews with edit-in-place updates.
- `blockStreaming`: `true` enables separate progress messages; when unset, Matrix keeps `streaming: "off"` as final-only delivery.
- `threadReplies`: `off`, `inbound`, or `always`.
- `threadBindings`: per-channel overrides for thread-bound session routing and lifecycle.
- `startupVerification`: automatic self-verification request mode on startup (`if-unverified`, `off`).

View File

@ -65,6 +65,7 @@ export const MatrixConfigSchema = z.object({
allowlistOnly: z.boolean().optional(),
allowBots: z.union([z.boolean(), z.literal("mentions")]).optional(),
groupPolicy: GroupPolicySchema.optional(),
blockStreaming: z.boolean().optional(),
streaming: z.union([z.enum(["partial", "off"]), z.boolean()]).optional(),
replyToMode: z.enum(["off", "first", "all"]).optional(),
threadReplies: z.enum(["off", "inbound", "always"]).optional(),

View File

@ -32,6 +32,7 @@ type MatrixHandlerTestHarnessOptions = {
threadReplies?: "off" | "inbound" | "always";
dmThreadReplies?: "off" | "inbound" | "always";
streaming?: "partial" | "off";
blockStreamingEnabled?: boolean;
dmEnabled?: boolean;
dmPolicy?: "pairing" | "allowlist" | "open" | "disabled";
textLimit?: number;
@ -214,6 +215,7 @@ export function createMatrixHandlerTestHarness(
threadReplies: options.threadReplies ?? "inbound",
dmThreadReplies: options.dmThreadReplies,
streaming: options.streaming ?? "off",
blockStreamingEnabled: options.blockStreamingEnabled ?? false,
dmEnabled: options.dmEnabled ?? true,
dmPolicy: options.dmPolicy ?? "open",
textLimit: options.textLimit ?? 8_000,

View File

@ -937,6 +937,7 @@ describe("matrix monitor handler pairing account scope", () => {
replyToMode: "off",
threadReplies: "inbound",
streaming: "off",
blockStreamingEnabled: false,
dmEnabled: true,
dmPolicy: "open",
textLimit: 8_000,
@ -1849,3 +1850,91 @@ describe("matrix monitor handler draft streaming", () => {
await finish();
});
});
// Exercises how the Matrix handler maps the `streaming` / `blockStreamingEnabled`
// config pair onto the `disableBlockStreaming` reply option handed to
// dispatchReplyFromConfig: block streaming is opt-in, and draft ("partial")
// streaming always suppresses the shared block pipeline.
describe("matrix monitor handler block streaming config", () => {
  // Shared driver: run one inbound room message through a harness built with
  // the given streaming options and return the captured reply option.
  const captureDisableBlockStreaming = async (options: {
    streaming: "partial" | "off";
    blockStreamingEnabled?: boolean;
  }) => {
    let observed: boolean | undefined;
    const { handler } = createMatrixHandlerTestHarness({
      ...options,
      dispatchReplyFromConfig: vi.fn(
        async (args: { replyOptions?: { disableBlockStreaming?: boolean } }) => {
          observed = args.replyOptions?.disableBlockStreaming;
          return { queuedFinal: false, counts: { final: 0, block: 0, tool: 0 } };
        },
      ) as never,
    });
    await handler(
      "!room:example.org",
      createMatrixTextMessageEvent({ eventId: "$msg1", body: "hello" }),
    );
    return observed;
  };
  it("keeps final-only delivery when draft streaming is off by default", async () => {
    expect(await captureDisableBlockStreaming({ streaming: "off" })).toBe(true);
  });
  it("disables shared block streaming when draft streaming is partial", async () => {
    expect(await captureDisableBlockStreaming({ streaming: "partial" })).toBe(true);
  });
  it("keeps draft streaming authoritative when partial and block streaming are both enabled", async () => {
    expect(
      await captureDisableBlockStreaming({ streaming: "partial", blockStreamingEnabled: true }),
    ).toBe(true);
  });
  it("uses shared block streaming when explicitly enabled for Matrix", async () => {
    expect(
      await captureDisableBlockStreaming({ streaming: "off", blockStreamingEnabled: true }),
    ).toBe(false);
  });
});

View File

@ -84,6 +84,7 @@ export type MatrixMonitorHandlerParams = {
/** DM-specific threadReplies override. Falls back to threadReplies when absent. */
dmThreadReplies?: "off" | "inbound" | "always";
streaming: "partial" | "off";
blockStreamingEnabled: boolean;
dmEnabled: boolean;
dmPolicy: "open" | "pairing" | "allowlist" | "disabled";
textLimit: number;
@ -201,6 +202,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
threadReplies,
dmThreadReplies,
streaming,
blockStreamingEnabled,
dmEnabled,
dmPolicy,
textLimit,
@ -1127,10 +1129,10 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
});
},
});
const streamingEnabled = streaming === "partial";
const draftStreamingEnabled = streaming === "partial";
const draftReplyToId = replyToMode !== "off" && !threadTarget ? _messageId : undefined;
let currentDraftReplyToId = draftReplyToId;
const draftStream = streamingEnabled
const draftStream = draftStreamingEnabled
? createMatrixDraftStream({
roomId,
client,
@ -1350,9 +1352,11 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
replyOptions: {
...replyOptions,
skillFilter: roomConfig?.skills,
// When streaming is active, disable block streaming — draft
// streaming replaces it with edit-in-place updates.
disableBlockStreaming: streamingEnabled ? true : undefined,
// Matrix expects explicit assistant progress updates as
// separate messages only when block streaming is explicitly
// enabled. Partial draft streaming still disables the shared
// block pipeline so draft edits do not double-send.
disableBlockStreaming: draftStream ? true : !blockStreamingEnabled,
onPartialReply: draftStream
? (payload) => {
const fullText = payload.text ?? "";

View File

@ -210,6 +210,7 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi
const mediaMaxBytes = Math.max(1, mediaMaxMb) * 1024 * 1024;
const streaming: "partial" | "off" =
accountConfig.streaming === true || accountConfig.streaming === "partial" ? "partial" : "off";
const blockStreamingEnabled = accountConfig.blockStreaming === true;
const startupMs = Date.now();
const startupGraceMs = 0;
// Cold starts should ignore old room history, but once we have a persisted
@ -265,6 +266,7 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi
threadReplies,
dmThreadReplies,
streaming,
blockStreamingEnabled,
dmEnabled,
dmPolicy,
textLimit,

View File

@ -39,6 +39,7 @@ const MATRIX_SINGLE_ACCOUNT_KEYS_TO_MOVE = new Set([
"encryption",
"allowlistOnly",
"allowBots",
"blockStreaming",
"replyToMode",
"threadReplies",
"textChunkLimit",
@ -59,6 +60,9 @@ const MATRIX_SINGLE_ACCOUNT_KEYS_TO_MOVE = new Set([
"actions",
]);
const MATRIX_NAMED_ACCOUNT_PROMOTION_KEYS = new Set([
// When named accounts already exist, only move auth/bootstrap fields into the
// promoted account. Delivery-policy fields stay at the top level so they
// remain shared inherited defaults for every account.
"name",
"homeserver",
"userId",

View File

@ -104,4 +104,45 @@ describe("matrixSetupAdapter", () => {
proxy: "http://127.0.0.1:7890",
});
});
// When named accounts already exist, delivery-policy fields such as
// `blockStreaming` must stay at the top level as shared inherited defaults
// rather than being copied into the newly applied account entry.
it("keeps top-level block streaming as a shared default when named accounts already exist", () => {
  const baseConfig = {
    channels: {
      matrix: {
        homeserver: "https://matrix.example.org",
        userId: "@default:example.org",
        accessToken: "default-token",
        blockStreaming: true,
        accounts: {
          support: {
            homeserver: "https://matrix.example.org",
            userId: "@support:example.org",
            accessToken: "support-token",
          },
        },
      },
    },
  } as CoreConfig;
  const updated = matrixSetupAdapter.applyAccountConfig({
    cfg: baseConfig,
    accountId: "ops",
    input: {
      name: "Ops",
      homeserver: "https://matrix.example.org",
      userId: "@ops:example.org",
      accessToken: "ops-token",
    },
  }) as CoreConfig;
  // The shared default survives at the top level…
  expect(updated.channels?.matrix?.blockStreaming).toBe(true);
  // …while the new account receives only its own identity/auth fields.
  expect(updated.channels?.matrix?.accounts?.ops).toMatchObject({
    name: "Ops",
    enabled: true,
    homeserver: "https://matrix.example.org",
    userId: "@ops:example.org",
    accessToken: "ops-token",
  });
  expect(updated.channels?.matrix?.accounts?.ops?.blockStreaming).toBeUndefined();
});
});

View File

@ -101,6 +101,13 @@ export type MatrixConfig = {
allowBots?: boolean | "mentions";
/** Group message policy (default: allowlist). */
groupPolicy?: GroupPolicy;
/**
* Enable shared block-streaming replies for Matrix.
*
* Default: false. Matrix keeps `streaming: "off"` as final-only delivery
* unless block streaming is explicitly enabled.
*/
blockStreaming?: boolean;
/** Allowlist for group senders (matrix user IDs). */
groupAllowFrom?: Array<string | number>;
/** Control reply threading when reply tags are present (off|first|all). */
@ -149,6 +156,8 @@ export type MatrixConfig = {
* Streaming mode for Matrix replies.
* - `"partial"`: edit a single message in place as the model generates text.
* - `"off"`: deliver the full reply once the model finishes.
* - Use `blockStreaming: true` when you want separate progress messages
* while `streaming` remains `"off"`.
* - `true` maps to `"partial"`, `false` maps to `"off"`.
* Default: `"off"`.
*/

View File

@ -166,7 +166,9 @@ export default definePluginEntry({
id: API_PROVIDER_ID,
label: PROVIDER_LABEL,
docsPath: "/providers/minimax",
aliases: ["minimax-cn"],
envVars: ["MINIMAX_API_KEY"],
resolveReasoningOutputMode: () => "native",
auth: [
createProviderApiKeyAuthMethod({
providerId: API_PROVIDER_ID,
@ -240,6 +242,7 @@ export default definePluginEntry({
label: PROVIDER_LABEL,
docsPath: "/providers/minimax",
envVars: ["MINIMAX_OAUTH_TOKEN", "MINIMAX_API_KEY"],
resolveReasoningOutputMode: () => "native",
catalog: {
run: async (ctx) => resolvePortalCatalog(ctx),
},

View File

@ -43,7 +43,7 @@ function createContext(
}
describe("handleAgentEnd", () => {
it("logs the resolved error message when run ends with assistant error", () => {
it("logs the resolved error message when run ends with assistant error", async () => {
const onAgentEvent = vi.fn();
const ctx = createContext(
{
@ -55,7 +55,7 @@ describe("handleAgentEnd", () => {
{ onAgentEvent },
);
handleAgentEnd(ctx);
await handleAgentEnd(ctx);
const warn = vi.mocked(ctx.log.warn);
expect(warn).toHaveBeenCalledTimes(1);
@ -77,7 +77,7 @@ describe("handleAgentEnd", () => {
});
});
it("attaches raw provider error metadata and includes model/provider in console output", () => {
it("attaches raw provider error metadata and includes model/provider in console output", async () => {
const ctx = createContext({
role: "assistant",
stopReason: "error",
@ -87,7 +87,7 @@ describe("handleAgentEnd", () => {
content: [{ type: "text", text: "" }],
});
handleAgentEnd(ctx);
await handleAgentEnd(ctx);
const warn = vi.mocked(ctx.log.warn);
expect(warn).toHaveBeenCalledTimes(1);
@ -103,7 +103,7 @@ describe("handleAgentEnd", () => {
});
});
it("sanitizes model and provider before writing consoleMessage", () => {
it("sanitizes model and provider before writing consoleMessage", async () => {
const ctx = createContext({
role: "assistant",
stopReason: "error",
@ -113,7 +113,7 @@ describe("handleAgentEnd", () => {
content: [{ type: "text", text: "" }],
});
handleAgentEnd(ctx);
await handleAgentEnd(ctx);
const warn = vi.mocked(ctx.log.warn);
const meta = warn.mock.calls[0]?.[1];
@ -127,7 +127,7 @@ describe("handleAgentEnd", () => {
expect(meta?.consoleMessage).not.toContain("\u001b");
});
it("redacts logged error text before emitting lifecycle events", () => {
it("redacts logged error text before emitting lifecycle events", async () => {
const onAgentEvent = vi.fn();
const ctx = createContext(
{
@ -139,7 +139,7 @@ describe("handleAgentEnd", () => {
{ onAgentEvent },
);
handleAgentEnd(ctx);
await handleAgentEnd(ctx);
const warn = vi.mocked(ctx.log.warn);
expect(warn.mock.calls[0]?.[1]).toMatchObject({
@ -156,21 +156,21 @@ describe("handleAgentEnd", () => {
});
});
it("keeps non-error run-end logging on debug only", () => {
it("keeps non-error run-end logging on debug only", async () => {
const ctx = createContext(undefined);
handleAgentEnd(ctx);
await handleAgentEnd(ctx);
expect(ctx.log.warn).not.toHaveBeenCalled();
expect(ctx.log.debug).toHaveBeenCalledWith("embedded run agent end: runId=run-1 isError=false");
});
it("flushes orphaned tool media as a media-only block reply", () => {
it("flushes orphaned tool media as a media-only block reply", async () => {
const ctx = createContext(undefined);
ctx.state.pendingToolMediaUrls = ["/tmp/reply.opus"];
ctx.state.pendingToolAudioAsVoice = true;
handleAgentEnd(ctx);
await handleAgentEnd(ctx);
expect(ctx.emitBlockReply).toHaveBeenCalledWith({
mediaUrls: ["/tmp/reply.opus"],

View File

@ -11,6 +11,7 @@ import {
hasAssistantVisibleReply,
} from "./pi-embedded-subscribe.handlers.messages.js";
import type { EmbeddedPiSubscribeContext } from "./pi-embedded-subscribe.handlers.types.js";
import { isPromiseLike } from "./pi-embedded-subscribe.promise.js";
import { isAssistantMessage } from "./pi-embedded-utils.js";
export {
@ -100,24 +101,55 @@ export function handleAgentEnd(ctx: EmbeddedPiSubscribeContext) {
});
}
ctx.flushBlockReplyBuffer();
const pendingToolMediaReply = consumePendingToolMediaReply(ctx.state);
if (pendingToolMediaReply && hasAssistantVisibleReply(pendingToolMediaReply)) {
ctx.emitBlockReply(pendingToolMediaReply);
}
// Flush the reply pipeline so the response reaches the channel before
// compaction wait blocks the run. This mirrors the pattern used by
// handleToolExecutionStart and ensures delivery is not held hostage to
// long-running compaction (#35074).
void ctx.params.onBlockReplyFlush?.();
const finalizeAgentEnd = () => {
ctx.state.blockState.thinking = false;
ctx.state.blockState.final = false;
ctx.state.blockState.inlineCode = createInlineCodeState();
ctx.state.blockState.thinking = false;
ctx.state.blockState.final = false;
ctx.state.blockState.inlineCode = createInlineCodeState();
if (ctx.state.pendingCompactionRetry > 0) {
ctx.resolveCompactionRetry();
} else {
ctx.maybeResolveCompactionWait();
}
};
if (ctx.state.pendingCompactionRetry > 0) {
ctx.resolveCompactionRetry();
} else {
ctx.maybeResolveCompactionWait();
const flushPendingMediaAndChannel = () => {
const pendingToolMediaReply = consumePendingToolMediaReply(ctx.state);
if (pendingToolMediaReply && hasAssistantVisibleReply(pendingToolMediaReply)) {
ctx.emitBlockReply(pendingToolMediaReply);
}
const postMediaFlushResult = ctx.flushBlockReplyBuffer();
if (isPromiseLike<void>(postMediaFlushResult)) {
return postMediaFlushResult.then(() => {
const onBlockReplyFlushResult = ctx.params.onBlockReplyFlush?.();
if (isPromiseLike<void>(onBlockReplyFlushResult)) {
return onBlockReplyFlushResult;
}
});
}
const onBlockReplyFlushResult = ctx.params.onBlockReplyFlush?.();
if (isPromiseLike<void>(onBlockReplyFlushResult)) {
return onBlockReplyFlushResult;
}
};
const flushBlockReplyBufferResult = ctx.flushBlockReplyBuffer();
if (isPromiseLike<void>(flushBlockReplyBufferResult)) {
return flushBlockReplyBufferResult
.then(() => flushPendingMediaAndChannel())
.finally(() => {
finalizeAgentEnd();
});
}
const flushPendingMediaAndChannelResult = flushPendingMediaAndChannel();
if (isPromiseLike<void>(flushPendingMediaAndChannelResult)) {
return flushPendingMediaAndChannelResult.finally(() => {
finalizeAgentEnd();
});
}
finalizeAgentEnd();
}

View File

@ -1,11 +1,14 @@
import { describe, expect, it } from "vitest";
import { describe, expect, it, vi } from "vitest";
import { createInlineCodeState } from "../markdown/code-spans.js";
import {
buildAssistantStreamData,
consumePendingToolMediaIntoReply,
consumePendingToolMediaReply,
handleMessageUpdate,
hasAssistantVisibleReply,
resolveSilentReplyFallbackText,
} from "./pi-embedded-subscribe.handlers.messages.js";
import type { EmbeddedPiSubscribeContext } from "./pi-embedded-subscribe.handlers.types.js";
describe("resolveSilentReplyFallbackText", () => {
it("replaces NO_REPLY with latest messaging tool text when available", () => {
@ -135,3 +138,48 @@ describe("consumePendingToolMediaReply", () => {
expect(state.pendingToolAudioAsVoice).toBe(false);
});
});
// Regression coverage: a synchronous throw from flushBlockReplyBuffer during a
// text_end message update must be contained and logged, not crash the caller.
describe("handleMessageUpdate", () => {
  it("contains synchronous text_end flush failures", async () => {
    const debugLog = vi.fn();
    const failingFlush = vi.fn(() => {
      throw new Error("boom");
    });
    // Minimal subscriber state: text_end break mode with empty buffers so the
    // handler reaches the flush path directly.
    const subscribeState = {
      deterministicApprovalPromptSent: false,
      reasoningStreamOpen: false,
      streamReasoning: false,
      deltaBuffer: "",
      blockBuffer: "",
      partialBlockState: {
        thinking: false,
        final: false,
        inlineCode: createInlineCodeState(),
      },
      lastStreamedAssistantCleaned: undefined,
      emittedAssistantUpdate: false,
      shouldEmitPartialReplies: false,
      blockReplyBreak: "text_end",
    };
    const ctx = {
      params: {
        runId: "run-1",
        session: { id: "session-1" },
      },
      state: subscribeState,
      log: { debug: debugLog },
      noteLastAssistant: vi.fn(),
      stripBlockTags: (text: string) => text,
      consumePartialReplyDirectives: vi.fn(() => null),
      flushBlockReplyBuffer: failingFlush,
    } as unknown as EmbeddedPiSubscribeContext;
    handleMessageUpdate(ctx, {
      type: "message_update",
      message: { role: "assistant", content: [] },
      assistantMessageEvent: { type: "text_end" },
    } as never);
    // The flush runs in a deferred microtask, so wait for the log side effect.
    await vi.waitFor(() => {
      expect(debugLog).toHaveBeenCalledWith("text_end block reply flush failed: Error: boom");
    });
  });
});

View File

@ -13,6 +13,7 @@ import type {
EmbeddedPiSubscribeContext,
EmbeddedPiSubscribeState,
} from "./pi-embedded-subscribe.handlers.types.js";
import { isPromiseLike } from "./pi-embedded-subscribe.promise.js";
import { appendRawStream } from "./pi-embedded-subscribe.raw-stream.js";
import {
extractAssistantText,
@ -367,7 +368,11 @@ export function handleMessageUpdate(
evtType === "text_end" &&
ctx.state.blockReplyBreak === "text_end"
) {
ctx.flushBlockReplyBuffer();
void Promise.resolve()
.then(() => ctx.flushBlockReplyBuffer())
.catch((err) => {
ctx.log.debug(`text_end block reply flush failed: ${String(err)}`);
});
}
}
@ -457,16 +462,6 @@ export function handleMessageEnd(
});
const onBlockReply = ctx.params.onBlockReply;
const emitBlockReplySafely = (payload: Parameters<NonNullable<typeof onBlockReply>>[0]) => {
if (!onBlockReply) {
return;
}
void Promise.resolve()
.then(() => onBlockReply(payload))
.catch((err) => {
ctx.log.warn(`block reply callback failed: ${String(err)}`);
});
};
const shouldEmitReasoning = Boolean(
!ctx.params.silentExpected &&
ctx.state.includeReasoning &&
@ -481,7 +476,7 @@ export function handleMessageEnd(
return;
}
ctx.state.lastReasoningSent = formattedReasoning;
emitBlockReplySafely({ text: formattedReasoning, isReasoning: true });
ctx.emitBlockReply({ text: formattedReasoning, isReasoning: true });
};
if (shouldEmitReasoningBeforeAnswer) {
@ -555,21 +550,43 @@ export function handleMessageEnd(
emitSplitResultAsBlockReply(ctx.consumeReplyDirectives("", { final: true }));
}
const finalizeMessageEnd = () => {
ctx.state.deltaBuffer = "";
ctx.state.blockBuffer = "";
ctx.blockChunker?.reset();
ctx.state.blockState.thinking = false;
ctx.state.blockState.final = false;
ctx.state.blockState.inlineCode = createInlineCodeState();
ctx.state.lastStreamedAssistant = undefined;
ctx.state.lastStreamedAssistantCleaned = undefined;
ctx.state.reasoningStreamOpen = false;
};
if (
!ctx.params.silentExpected &&
ctx.state.blockReplyBreak === "message_end" &&
ctx.params.onBlockReplyFlush
) {
void ctx.params.onBlockReplyFlush();
const flushBlockReplyBufferResult = ctx.flushBlockReplyBuffer();
if (isPromiseLike<void>(flushBlockReplyBufferResult)) {
return flushBlockReplyBufferResult
.then(() => {
const onBlockReplyFlushResult = ctx.params.onBlockReplyFlush?.();
if (isPromiseLike<void>(onBlockReplyFlushResult)) {
return onBlockReplyFlushResult;
}
})
.finally(() => {
finalizeMessageEnd();
});
}
const onBlockReplyFlushResult = ctx.params.onBlockReplyFlush();
if (isPromiseLike<void>(onBlockReplyFlushResult)) {
return onBlockReplyFlushResult.finally(() => {
finalizeMessageEnd();
});
}
}
ctx.state.deltaBuffer = "";
ctx.state.blockBuffer = "";
ctx.blockChunker?.reset();
ctx.state.blockState.thinking = false;
ctx.state.blockState.final = false;
ctx.state.blockState.inlineCode = createInlineCodeState();
ctx.state.lastStreamedAssistant = undefined;
ctx.state.lastStreamedAssistantCleaned = undefined;
ctx.state.reasoningStreamOpen = false;
finalizeMessageEnd();
}

View File

@ -12,6 +12,7 @@ import type {
ToolCallSummary,
ToolHandlerContext,
} from "./pi-embedded-subscribe.handlers.types.js";
import { isPromiseLike } from "./pi-embedded-subscribe.promise.js";
import {
extractToolResultMediaArtifact,
extractMessagingToolSend,
@ -326,96 +327,109 @@ async function emitToolResultOutput(params: {
});
}
export async function handleToolExecutionStart(
export function handleToolExecutionStart(
ctx: ToolHandlerContext,
evt: AgentEvent & { toolName: string; toolCallId: string; args: unknown },
) {
const continueAfterBlockReplyFlush = () => {
const onBlockReplyFlushResult = ctx.params.onBlockReplyFlush?.();
if (isPromiseLike<void>(onBlockReplyFlushResult)) {
return onBlockReplyFlushResult.then(() => {
continueToolExecutionStart();
});
}
continueToolExecutionStart();
};
const continueToolExecutionStart = () => {
const rawToolName = String(evt.toolName);
const toolName = normalizeToolName(rawToolName);
const toolCallId = String(evt.toolCallId);
const args = evt.args;
const runId = ctx.params.runId;
// Track start time and args for after_tool_call hook
toolStartData.set(buildToolStartKey(runId, toolCallId), { startTime: Date.now(), args });
if (toolName === "read") {
const record = args && typeof args === "object" ? (args as Record<string, unknown>) : {};
const filePathValue =
typeof record.path === "string"
? record.path
: typeof record.file_path === "string"
? record.file_path
: "";
const filePath = filePathValue.trim();
if (!filePath) {
const argsPreview = typeof args === "string" ? args.slice(0, 200) : undefined;
ctx.log.warn(
`read tool called without path: toolCallId=${toolCallId} argsType=${typeof args}${argsPreview ? ` argsPreview=${argsPreview}` : ""}`,
);
}
}
const meta = extendExecMeta(toolName, args, inferToolMetaFromArgs(toolName, args));
ctx.state.toolMetaById.set(toolCallId, buildToolCallSummary(toolName, args, meta));
ctx.log.debug(
`embedded run tool start: runId=${ctx.params.runId} tool=${toolName} toolCallId=${toolCallId}`,
);
const shouldEmitToolEvents = ctx.shouldEmitToolResult();
emitAgentEvent({
runId: ctx.params.runId,
stream: "tool",
data: {
phase: "start",
name: toolName,
toolCallId,
args: args as Record<string, unknown>,
},
});
// Best-effort typing signal; do not block tool summaries on slow emitters.
void ctx.params.onAgentEvent?.({
stream: "tool",
data: { phase: "start", name: toolName, toolCallId },
});
if (
ctx.params.onToolResult &&
shouldEmitToolEvents &&
!ctx.state.toolSummaryById.has(toolCallId)
) {
ctx.state.toolSummaryById.add(toolCallId);
ctx.emitToolSummary(toolName, meta);
}
// Track messaging tool sends (pending until confirmed in tool_execution_end).
if (isMessagingTool(toolName)) {
const argsRecord = args && typeof args === "object" ? (args as Record<string, unknown>) : {};
const isMessagingSend = isMessagingToolSendAction(toolName, argsRecord);
if (isMessagingSend) {
const sendTarget = extractMessagingToolSend(toolName, argsRecord);
if (sendTarget) {
ctx.state.pendingMessagingTargets.set(toolCallId, sendTarget);
}
// Field names vary by tool: Discord/Slack use "content", sessions_send uses "message"
const text = (argsRecord.content as string) ?? (argsRecord.message as string);
if (text && typeof text === "string") {
ctx.state.pendingMessagingTexts.set(toolCallId, text);
ctx.log.debug(`Tracking pending messaging text: tool=${toolName} len=${text.length}`);
}
// Track media URLs from messaging tool args (pending until tool_execution_end).
const mediaUrls = collectMessagingMediaUrlsFromRecord(argsRecord);
if (mediaUrls.length > 0) {
ctx.state.pendingMessagingMediaUrls.set(toolCallId, mediaUrls);
}
}
}
};
// Flush pending block replies to preserve message boundaries before tool execution.
ctx.flushBlockReplyBuffer();
if (ctx.params.onBlockReplyFlush) {
await ctx.params.onBlockReplyFlush();
}
const rawToolName = String(evt.toolName);
const toolName = normalizeToolName(rawToolName);
const toolCallId = String(evt.toolCallId);
const args = evt.args;
const runId = ctx.params.runId;
// Track start time and args for after_tool_call hook
toolStartData.set(buildToolStartKey(runId, toolCallId), { startTime: Date.now(), args });
if (toolName === "read") {
const record = args && typeof args === "object" ? (args as Record<string, unknown>) : {};
const filePathValue =
typeof record.path === "string"
? record.path
: typeof record.file_path === "string"
? record.file_path
: "";
const filePath = filePathValue.trim();
if (!filePath) {
const argsPreview = typeof args === "string" ? args.slice(0, 200) : undefined;
ctx.log.warn(
`read tool called without path: toolCallId=${toolCallId} argsType=${typeof args}${argsPreview ? ` argsPreview=${argsPreview}` : ""}`,
);
}
}
const meta = extendExecMeta(toolName, args, inferToolMetaFromArgs(toolName, args));
ctx.state.toolMetaById.set(toolCallId, buildToolCallSummary(toolName, args, meta));
ctx.log.debug(
`embedded run tool start: runId=${ctx.params.runId} tool=${toolName} toolCallId=${toolCallId}`,
);
const shouldEmitToolEvents = ctx.shouldEmitToolResult();
emitAgentEvent({
runId: ctx.params.runId,
stream: "tool",
data: {
phase: "start",
name: toolName,
toolCallId,
args: args as Record<string, unknown>,
},
});
// Best-effort typing signal; do not block tool summaries on slow emitters.
void ctx.params.onAgentEvent?.({
stream: "tool",
data: { phase: "start", name: toolName, toolCallId },
});
if (
ctx.params.onToolResult &&
shouldEmitToolEvents &&
!ctx.state.toolSummaryById.has(toolCallId)
) {
ctx.state.toolSummaryById.add(toolCallId);
ctx.emitToolSummary(toolName, meta);
}
// Track messaging tool sends (pending until confirmed in tool_execution_end).
if (isMessagingTool(toolName)) {
const argsRecord = args && typeof args === "object" ? (args as Record<string, unknown>) : {};
const isMessagingSend = isMessagingToolSendAction(toolName, argsRecord);
if (isMessagingSend) {
const sendTarget = extractMessagingToolSend(toolName, argsRecord);
if (sendTarget) {
ctx.state.pendingMessagingTargets.set(toolCallId, sendTarget);
}
// Field names vary by tool: Discord/Slack use "content", sessions_send uses "message"
const text = (argsRecord.content as string) ?? (argsRecord.message as string);
if (text && typeof text === "string") {
ctx.state.pendingMessagingTexts.set(toolCallId, text);
ctx.log.debug(`Tracking pending messaging text: tool=${toolName} len=${text.length}`);
}
// Track media URLs from messaging tool args (pending until tool_execution_end).
const mediaUrls = collectMessagingMediaUrlsFromRecord(argsRecord);
if (mediaUrls.length > 0) {
ctx.state.pendingMessagingMediaUrls.set(toolCallId, mediaUrls);
}
}
const flushBlockReplyBufferResult = ctx.flushBlockReplyBuffer();
if (isPromiseLike<void>(flushBlockReplyBufferResult)) {
return flushBlockReplyBufferResult.then(() => continueAfterBlockReplyFlush());
}
return continueAfterBlockReplyFlush();
}
export function handleToolExecutionUpdate(

View File

@ -18,46 +18,115 @@ import type {
EmbeddedPiSubscribeContext,
EmbeddedPiSubscribeEvent,
} from "./pi-embedded-subscribe.handlers.types.js";
import { isPromiseLike } from "./pi-embedded-subscribe.promise.js";
export function createEmbeddedPiSessionEventHandler(ctx: EmbeddedPiSubscribeContext) {
let pendingEventChain: Promise<void> | null = null;
// Runs one subscribe-event handler, preserving delivery order: if an earlier
// handler returned a still-pending promise, the next handler is chained after
// it. Synchronous handlers on an empty chain run inline with no promise
// overhead. `detach` runs the handler in order but does not make later events
// wait on its completion (used for tool_execution_end — see the dispatcher).
const scheduleEvent = (
  evt: EmbeddedPiSubscribeEvent,
  handler: () => void | Promise<void>,
  options?: { detach?: boolean },
): void => {
  // Invoke the handler, containing synchronous throws so one bad handler
  // cannot break event dispatch; failures are logged at debug level only.
  const run = () => {
    try {
      return handler();
    } catch (err) {
      ctx.log.debug(`${evt.type} handler failed: ${String(err)}`);
      return;
    }
  };
  if (!pendingEventChain) {
    // Fast path: nothing in flight. Only start tracking a chain if the
    // handler actually returned a promise-like value.
    const result = run();
    if (!isPromiseLike<void>(result)) {
      return;
    }
    const task = result
      .catch((err) => {
        ctx.log.debug(`${evt.type} handler failed: ${String(err)}`);
      })
      .finally(() => {
        // Clear the chain only if no later event has replaced it.
        if (pendingEventChain === task) {
          pendingEventChain = null;
        }
      });
    if (!options?.detach) {
      pendingEventChain = task;
    }
    return;
  }
  // Slow path: queue behind the in-flight handler so events complete in the
  // order they arrived.
  const task = pendingEventChain
    .then(() => run())
    .catch((err) => {
      ctx.log.debug(`${evt.type} handler failed: ${String(err)}`);
    })
    .finally(() => {
      if (pendingEventChain === task) {
        pendingEventChain = null;
      }
    });
  // Detached tasks still start in order (chained above) but later events do
  // not wait for them to finish.
  if (!options?.detach) {
    pendingEventChain = task;
  }
};
return (evt: EmbeddedPiSubscribeEvent) => {
switch (evt.type) {
case "message_start":
handleMessageStart(ctx, evt as never);
scheduleEvent(evt, () => {
handleMessageStart(ctx, evt as never);
});
return;
case "message_update":
handleMessageUpdate(ctx, evt as never);
scheduleEvent(evt, () => {
handleMessageUpdate(ctx, evt as never);
});
return;
case "message_end":
handleMessageEnd(ctx, evt as never);
scheduleEvent(evt, () => {
return handleMessageEnd(ctx, evt as never);
});
return;
case "tool_execution_start":
// Async handler - best-effort typing indicator, avoids blocking tool summaries.
// Catch rejections to avoid unhandled promise rejection crashes.
handleToolExecutionStart(ctx, evt as never).catch((err) => {
ctx.log.debug(`tool_execution_start handler failed: ${String(err)}`);
scheduleEvent(evt, () => {
return handleToolExecutionStart(ctx, evt as never);
});
return;
case "tool_execution_update":
handleToolExecutionUpdate(ctx, evt as never);
return;
case "tool_execution_end":
// Async handler - best-effort, non-blocking
handleToolExecutionEnd(ctx, evt as never).catch((err) => {
ctx.log.debug(`tool_execution_end handler failed: ${String(err)}`);
scheduleEvent(evt, () => {
handleToolExecutionUpdate(ctx, evt as never);
});
return;
case "tool_execution_end":
scheduleEvent(
evt,
() => {
return handleToolExecutionEnd(ctx, evt as never);
},
{ detach: true },
);
return;
case "agent_start":
handleAgentStart(ctx);
scheduleEvent(evt, () => {
handleAgentStart(ctx);
});
return;
case "auto_compaction_start":
handleAutoCompactionStart(ctx);
scheduleEvent(evt, () => {
handleAutoCompactionStart(ctx);
});
return;
case "auto_compaction_end":
handleAutoCompactionEnd(ctx, evt as never);
scheduleEvent(evt, () => {
handleAutoCompactionEnd(ctx, evt as never);
});
return;
case "agent_end":
handleAgentEnd(ctx);
scheduleEvent(evt, () => {
return handleAgentEnd(ctx);
});
return;
default:
return;

View File

@ -101,7 +101,7 @@ export type EmbeddedPiSubscribeContext = {
state: { thinking: boolean; final: boolean; inlineCode?: InlineCodeState },
) => string;
emitBlockChunk: (text: string) => void;
flushBlockReplyBuffer: () => void;
flushBlockReplyBuffer: () => void | Promise<void>;
emitReasoningStream: (text: string) => void;
consumeReplyDirectives: (
text: string,
@ -170,7 +170,7 @@ export type ToolHandlerContext = {
state: ToolHandlerState;
log: EmbeddedSubscribeLogger;
hookRunner?: HookRunner;
flushBlockReplyBuffer: () => void;
flushBlockReplyBuffer: () => void | Promise<void>;
shouldEmitToolResult: () => boolean;
shouldEmitToolOutput: () => boolean;
emitToolSummary: (toolName?: string, meta?: string) => void;

View File

@ -0,0 +1,8 @@
/**
 * Duck-types a value as a thenable (PromiseLike).
 *
 * Follows the Promises/A+ assimilation rule: any non-null object or function
 * exposing a callable `then` property qualifies. Callers use this to decide
 * whether a handler result must be chained/awaited before continuing.
 *
 * @param value - Arbitrary value to inspect.
 * @returns True when `value` is an object/function whose `then` is a function.
 */
export function isPromiseLike<T>(value: unknown): value is PromiseLike<T> {
  // Reject null/undefined first: `typeof null` is "object", and probing
  // `"then" in null` would throw.
  if (value == null) {
    return false;
  }
  const objectLike = typeof value === "object" || typeof value === "function";
  if (!objectLike) {
    return false;
  }
  // Check presence via `in` before reading, then confirm it is callable.
  return "then" in value && typeof (value as { then?: unknown }).then === "function";
}

View File

@ -87,6 +87,45 @@ describe("subscribeEmbeddedPiSession", () => {
expect(onBlockReplyFlush).toHaveBeenCalledTimes(1);
});
// Regression guard (#59266): when onBlockReply is async, the forced buffer
// flush at a tool boundary must wait for in-flight deliveries, so
// onBlockReplyFlush only ever observes fully delivered block text.
it("waits for async block replies before tool_execution_start flush", async () => {
  const { session, emit } = createStubSessionHarness();
  // Text actually handed to the channel callback, in delivery order.
  const delivered: string[] = [];
  // Snapshot of `delivered` taken at each flush, to prove ordering.
  const flushSnapshots: string[][] = [];
  subscribeEmbeddedPiSession({
    session: session as unknown as Parameters<typeof subscribeEmbeddedPiSession>[0]["session"],
    runId: "run-async-tool-flush",
    // Deliberately async: yields one microtask before recording delivery,
    // which exposes any flush that runs without awaiting the callback.
    onBlockReply: async (payload) => {
      await Promise.resolve();
      if (payload.text) {
        delivered.push(payload.text);
      }
    },
    onBlockReplyFlush: vi.fn(() => {
      flushSnapshots.push([...delivered]);
    }),
    blockReplyBreak: "text_end",
    // minChars 50 keeps "Short chunk." buffered until the forced flush below.
    blockReplyChunking: { minChars: 50, maxChars: 200 },
  });
  emit({
    type: "message_start",
    message: { role: "assistant" },
  });
  emitAssistantTextDelta({ emit, delta: "Short chunk." });
  // Tool start triggers the forced flush; it must not outrun the async delivery.
  emit({
    type: "tool_execution_start",
    toolName: "bash",
    toolCallId: "tool-async-flush-1",
    args: { command: "echo flush" },
  });
  await vi.waitFor(() => {
    expect(delivered).toEqual(["Short chunk."]);
    // Exactly one flush, and only after the chunk had been delivered.
    expect(flushSnapshots).toEqual([["Short chunk."]]);
  });
});
it("calls onBlockReplyFlush at message_end for message-boundary turns", async () => {
const { session, emit } = createStubSessionHarness();
@ -121,4 +160,43 @@ describe("subscribeEmbeddedPiSession", () => {
expect(onBlockReply.mock.calls[0]?.[0]?.text).toBe("Final reply before lifecycle end.");
expect(onBlockReplyFlush).toHaveBeenCalledTimes(1);
});
// Companion to the tool-boundary case (#59266): with message_end as the block
// break, the flush at end-of-message must also await async deliveries so the
// final reply is never flushed half-delivered.
it("waits for async block replies before message_end flush", async () => {
  const { session, emit } = createStubSessionHarness();
  // Text actually handed to the channel callback, in delivery order.
  const delivered: string[] = [];
  // Snapshot of `delivered` taken at each flush, to prove ordering.
  const flushSnapshots: string[][] = [];
  subscribeEmbeddedPiSession({
    session: session as unknown as Parameters<typeof subscribeEmbeddedPiSession>[0]["session"],
    runId: "run-async-message-end-flush",
    // Deliberately async: yields one microtask before recording delivery.
    onBlockReply: async (payload) => {
      await Promise.resolve();
      if (payload.text) {
        delivered.push(payload.text);
      }
    },
    onBlockReplyFlush: vi.fn(() => {
      flushSnapshots.push([...delivered]);
    }),
    blockReplyBreak: "message_end",
  });
  emit({
    type: "message_start",
    message: { role: "assistant" },
  });
  emitAssistantTextDelta({ emit, delta: "Final reply before lifecycle end." });
  // message_end triggers the flush for message-boundary turns.
  emit({
    type: "message_end",
    message: {
      role: "assistant",
      content: [{ type: "text", text: "Final reply before lifecycle end." }],
    },
  });
  await vi.waitFor(() => {
    expect(delivered).toEqual(["Final reply before lifecycle end."]);
    // Flush observed the fully delivered reply, not an empty snapshot.
    expect(flushSnapshots).toEqual([["Final reply before lifecycle end."]]);
  });
});
});

View File

@ -153,6 +153,62 @@ describe("subscribeEmbeddedPiSession", () => {
]);
},
);
// tool_execution_end is scheduled detached (see the { detach: true } handling
// in the event handler): a slow onToolResult must not block subsequent
// assistant streaming. This pins that by never resolving onToolResult until
// after the later partial reply has been observed.
it("does not let tool_execution_end delivery stall later assistant streaming", async () => {
  // Captured resolver lets the test hold the tool-result promise open.
  let resolveToolResult: (() => void) | undefined;
  const onToolResult = vi.fn(
    () =>
      new Promise<void>((resolve) => {
        resolveToolResult = resolve;
      }),
  );
  const onPartialReply = vi.fn();
  const { emit } = createSubscribedHarness({
    runId: "run",
    onToolResult,
    onPartialReply,
  });
  emit({
    type: "tool_execution_start",
    toolName: "exec",
    toolCallId: "tool-1",
    args: { command: "echo hi" },
  });
  // approval-pending result routes through onToolResult, which stays pending.
  emit({
    type: "tool_execution_end",
    toolName: "exec",
    toolCallId: "tool-1",
    isError: false,
    result: {
      details: {
        status: "approval-pending",
        approvalId: "12345678-1234-1234-1234-123456789012",
        approvalSlug: "12345678",
        host: "gateway",
        command: "echo hi",
      },
    },
  });
  emit({
    type: "message_start",
    message: { role: "assistant" },
  });
  emitAssistantTextDelta(emit, "After tool");
  // Streaming must proceed while onToolResult is still unresolved.
  await vi.waitFor(() => {
    expect(onToolResult).toHaveBeenCalledTimes(1);
    expect(onPartialReply).toHaveBeenCalledWith(
      expect.objectContaining({ text: "After tool", delta: "After tool" }),
    );
  });
  expect(resolveToolResult).toBeTypeOf("function");
  // Release the held promise so nothing leaks past the test.
  resolveToolResult?.();
});
it.each(THINKING_TAG_CASES)(
"suppresses <%s> blocks across chunk boundaries",
async ({ open, close }) => {

View File

@ -88,9 +88,9 @@ describe("subscribeEmbeddedPiSession", () => {
result: { details: { status: "error" } },
});
emitAssistantMessageEnd(emit, messageText);
await Promise.resolve();
expect(onBlockReply).toHaveBeenCalledTimes(1);
await vi.waitFor(() => {
expect(onBlockReply).toHaveBeenCalledTimes(1);
});
});
it("ignores delivery-mirror assistant messages", async () => {

View File

@ -19,6 +19,7 @@ import type {
EmbeddedPiSubscribeContext,
EmbeddedPiSubscribeState,
} from "./pi-embedded-subscribe.handlers.types.js";
import { isPromiseLike } from "./pi-embedded-subscribe.promise.js";
import { filterToolResultMediaUrls } from "./pi-embedded-subscribe.tools.js";
import type { SubscribeEmbeddedPiSessionParams } from "./pi-embedded-subscribe.types.js";
import { formatReasoningMessage, stripDowngradedToolCallText } from "./pi-embedded-utils.js";
@ -104,6 +105,7 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
const messagingToolSentMediaUrls = state.messagingToolSentMediaUrls;
const pendingMessagingTexts = state.pendingMessagingTexts;
const pendingMessagingTargets = state.pendingMessagingTargets;
const pendingBlockReplyTasks = new Set<Promise<void>>();
const replyDirectiveAccumulator = createStreamingDirectiveAccumulator();
const partialReplyDirectiveAccumulator = createStreamingDirectiveAccumulator();
const shouldAllowSilentTurnText = (text: string | undefined) =>
@ -114,11 +116,21 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
if (!params.onBlockReply) {
return;
}
void Promise.resolve()
.then(() => params.onBlockReply?.(payload))
.catch((err) => {
try {
const maybeTask = params.onBlockReply(payload);
if (!isPromiseLike<void>(maybeTask)) {
return;
}
const task = Promise.resolve(maybeTask).catch((err) => {
log.warn(`block reply callback failed: ${String(err)}`);
});
pendingBlockReplyTasks.add(task);
void task.finally(() => {
pendingBlockReplyTasks.delete(task);
});
} catch (err) {
log.warn(`block reply callback failed: ${String(err)}`);
}
};
const emitBlockReply = (payload: BlockReplyPayload) => {
emitBlockReplySafely(consumePendingToolMediaIntoReply(state, payload));
@ -554,19 +566,25 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
const consumePartialReplyDirectives = (text: string, options?: { final?: boolean }) =>
partialReplyDirectiveAccumulator.consume(text, options);
const flushBlockReplyBuffer = () => {
const flushBlockReplyBuffer = (): void | Promise<void> => {
if (!params.onBlockReply) {
return;
}
if (blockChunker?.hasBuffered()) {
blockChunker.drain({ force: true, emit: emitBlockChunk });
blockChunker.reset();
return;
}
if (state.blockBuffer.length > 0) {
} else if (state.blockBuffer.length > 0) {
emitBlockChunk(state.blockBuffer);
state.blockBuffer = "";
}
if (pendingBlockReplyTasks.size === 0) {
return;
}
return (async () => {
while (pendingBlockReplyTasks.size > 0) {
await Promise.allSettled(pendingBlockReplyTasks);
}
})();
};
const emitReasoningStream = (text: string) => {

View File

@ -127,6 +127,26 @@ describe("agent-runner-utils", () => {
});
});
// MiniMax was removed from the tagged-reasoning provider list (#59266): it
// does not reliably wrap output in <final> tags, so enforcing the tag
// suppressed normal replies. Pin that enforceFinalTag stays false here.
it("does not force final-tag enforcement for minimax providers", () => {
  const run = makeRun({ workspaceDir: process.cwd() });
  const authProfile = resolveProviderScopedAuthProfile({
    provider: "minimax",
    primaryProvider: "minimax",
    authProfileId: "profile-minimax",
    authProfileIdSource: "user",
  });
  const resolved = buildEmbeddedRunBaseParams({
    run,
    provider: "minimax",
    model: "MiniMax-M2.7",
    runId: "run-1",
    authProfile,
  });
  expect(resolved.enforceFinalTag).toBe(false);
});
it("builds embedded contexts and scopes auth profile by provider", () => {
const run = makeRun({
authProfileId: "profile-openai",

View File

@ -46,6 +46,9 @@ export function resolveReasoningOutputMode(params: {
// handles reasoning natively via the `reasoning` field in streaming chunks,
// so tag-based enforcement is unnecessary and causes all output to be
// discarded as "(no output)" (#2279).
// Note: MiniMax is also intentionally excluded. In production it does not
// reliably wrap user-visible output in <final> tags, so forcing tag
// enforcement suppresses normal assistant replies.
if (
normalized === "google" ||
normalized === "google-gemini-cli" ||
@ -54,11 +57,6 @@ export function resolveReasoningOutputMode(params: {
return "tagged";
}
// Handle Minimax (M2.5 is chatty/reasoning-like)
if (normalized.includes("minimax")) {
return "tagged";
}
return "native";
}

View File

@ -70,8 +70,16 @@ describe("isReasoningTagProvider", () => {
value: "google-generative-ai",
expected: true,
},
{ name: "returns true for minimax", value: "minimax", expected: true },
{ name: "returns true for minimax-cn", value: "minimax-cn", expected: true },
{
name: "returns false for minimax - does not reliably honor <final> wrappers in production",
value: "minimax",
expected: false,
},
{
name: "returns false for minimax-cn",
value: "minimax-cn",
expected: false,
},
{ name: "returns false for null", value: null, expected: false },
{ name: "returns false for undefined", value: undefined, expected: false },
{ name: "returns false for empty", value: "", expected: false },
@ -83,7 +91,7 @@ describe("isReasoningTagProvider", () => {
value: string | null | undefined;
expected: boolean;
}>)("$name", ({ value, expected }) => {
expect(isReasoningTagProvider(value)).toBe(expected);
expect(isReasoningTagProvider(value, { workspaceDir: process.cwd() })).toBe(expected);
});
});