diff --git a/CHANGELOG.md b/CHANGELOG.md index b85028835bd..b1d3844d475 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,7 @@ Docs: https://docs.openclaw.ai - Agents/failover: classify AbortError and stream-abort messages as timeout so Ollama NDJSON stream aborts stop showing `reason=unknown` in model fallback logs. (#58324) Thanks @yelog - Exec approvals: route Slack, Discord, and Telegram approvals through the shared channel approval-capability path so native approval auth, delivery, and `/approve` handling stay aligned across channels while preserving Telegram session-key agent filtering. (#58634) thanks @gumadeiras - Matrix/runtime: resolve the verification/bootstrap runtime from a distinct packaged Matrix entry so global npm installs stop failing on crypto bootstrap with missing-module or recursive runtime alias errors. (#59249) Thanks @gumadeiras. +- Matrix/streaming: preserve ordered block flushes before tool, message, and agent boundaries, add explicit `channels.matrix.blockStreaming` opt-in so Matrix `streaming: "off"` stays final-only by default, and move MiniMax plain-text final handling into the MiniMax provider runtime instead of the shared core heuristic. (#59266) thanks @gumadeiras ## 2026.4.2 diff --git a/docs/channels/matrix.md b/docs/channels/matrix.md index 583e84ca703..8519faf49a0 100644 --- a/docs/channels/matrix.md +++ b/docs/channels/matrix.md @@ -184,6 +184,8 @@ done: - `streaming: "off"` is the default. OpenClaw waits for the final reply and sends it once. - `streaming: "partial"` creates one editable preview message instead of sending multiple partial messages. +- `blockStreaming: true` enables separate Matrix progress messages instead of final-only delivery when `streaming` is off. +- When `streaming: "partial"`, Matrix disables shared block streaming so draft edits do not double-send. - If the preview no longer fits in one Matrix event, OpenClaw stops preview streaming and falls back to normal final delivery. 
- Media replies still send attachments normally. If a stale preview can no longer be reused safely, OpenClaw redacts it before sending the final media reply. - Preview edits cost extra Matrix API calls. Leave streaming off if you want the most conservative rate-limit behavior. @@ -752,6 +754,7 @@ Live directory lookup uses the logged-in Matrix account: - `historyLimit`: max room messages to include as group history context. Falls back to `messages.groupChat.historyLimit`. Set `0` to disable. - `replyToMode`: `off`, `first`, or `all`. - `streaming`: `off` (default) or `partial`. `partial` enables single-message draft previews with edit-in-place updates. +- `blockStreaming`: `true` enables separate progress messages; when unset, Matrix keeps `streaming: "off"` as final-only delivery. - `threadReplies`: `off`, `inbound`, or `always`. - `threadBindings`: per-channel overrides for thread-bound session routing and lifecycle. - `startupVerification`: automatic self-verification request mode on startup (`if-unverified`, `off`). 
diff --git a/extensions/matrix/src/config-schema.ts b/extensions/matrix/src/config-schema.ts index bebacd3908a..1a1789d36c6 100644 --- a/extensions/matrix/src/config-schema.ts +++ b/extensions/matrix/src/config-schema.ts @@ -65,6 +65,7 @@ export const MatrixConfigSchema = z.object({ allowlistOnly: z.boolean().optional(), allowBots: z.union([z.boolean(), z.literal("mentions")]).optional(), groupPolicy: GroupPolicySchema.optional(), + blockStreaming: z.boolean().optional(), streaming: z.union([z.enum(["partial", "off"]), z.boolean()]).optional(), replyToMode: z.enum(["off", "first", "all"]).optional(), threadReplies: z.enum(["off", "inbound", "always"]).optional(), diff --git a/extensions/matrix/src/matrix/monitor/handler.test-helpers.ts b/extensions/matrix/src/matrix/monitor/handler.test-helpers.ts index df727311ed4..02b424c04a9 100644 --- a/extensions/matrix/src/matrix/monitor/handler.test-helpers.ts +++ b/extensions/matrix/src/matrix/monitor/handler.test-helpers.ts @@ -32,6 +32,7 @@ type MatrixHandlerTestHarnessOptions = { threadReplies?: "off" | "inbound" | "always"; dmThreadReplies?: "off" | "inbound" | "always"; streaming?: "partial" | "off"; + blockStreamingEnabled?: boolean; dmEnabled?: boolean; dmPolicy?: "pairing" | "allowlist" | "open" | "disabled"; textLimit?: number; @@ -214,6 +215,7 @@ export function createMatrixHandlerTestHarness( threadReplies: options.threadReplies ?? "inbound", dmThreadReplies: options.dmThreadReplies, streaming: options.streaming ?? "off", + blockStreamingEnabled: options.blockStreamingEnabled ?? false, dmEnabled: options.dmEnabled ?? true, dmPolicy: options.dmPolicy ?? "open", textLimit: options.textLimit ?? 
8_000, diff --git a/extensions/matrix/src/matrix/monitor/handler.test.ts b/extensions/matrix/src/matrix/monitor/handler.test.ts index e4cc8491741..a792bf9919a 100644 --- a/extensions/matrix/src/matrix/monitor/handler.test.ts +++ b/extensions/matrix/src/matrix/monitor/handler.test.ts @@ -937,6 +937,7 @@ describe("matrix monitor handler pairing account scope", () => { replyToMode: "off", threadReplies: "inbound", streaming: "off", + blockStreamingEnabled: false, dmEnabled: true, dmPolicy: "open", textLimit: 8_000, @@ -1849,3 +1850,91 @@ describe("matrix monitor handler draft streaming", () => { await finish(); }); }); + +describe("matrix monitor handler block streaming config", () => { + it("keeps final-only delivery when draft streaming is off by default", async () => { + let capturedDisableBlockStreaming: boolean | undefined; + + const { handler } = createMatrixHandlerTestHarness({ + streaming: "off", + dispatchReplyFromConfig: vi.fn( + async (args: { replyOptions?: { disableBlockStreaming?: boolean } }) => { + capturedDisableBlockStreaming = args.replyOptions?.disableBlockStreaming; + return { queuedFinal: false, counts: { final: 0, block: 0, tool: 0 } }; + }, + ) as never, + }); + + await handler( + "!room:example.org", + createMatrixTextMessageEvent({ eventId: "$msg1", body: "hello" }), + ); + + expect(capturedDisableBlockStreaming).toBe(true); + }); + + it("disables shared block streaming when draft streaming is partial", async () => { + let capturedDisableBlockStreaming: boolean | undefined; + + const { handler } = createMatrixHandlerTestHarness({ + streaming: "partial", + dispatchReplyFromConfig: vi.fn( + async (args: { replyOptions?: { disableBlockStreaming?: boolean } }) => { + capturedDisableBlockStreaming = args.replyOptions?.disableBlockStreaming; + return { queuedFinal: false, counts: { final: 0, block: 0, tool: 0 } }; + }, + ) as never, + }); + + await handler( + "!room:example.org", + createMatrixTextMessageEvent({ eventId: "$msg1", body: "hello" }), 
+ ); + + expect(capturedDisableBlockStreaming).toBe(true); + }); + + it("keeps draft streaming authoritative when partial and block streaming are both enabled", async () => { + let capturedDisableBlockStreaming: boolean | undefined; + + const { handler } = createMatrixHandlerTestHarness({ + streaming: "partial", + blockStreamingEnabled: true, + dispatchReplyFromConfig: vi.fn( + async (args: { replyOptions?: { disableBlockStreaming?: boolean } }) => { + capturedDisableBlockStreaming = args.replyOptions?.disableBlockStreaming; + return { queuedFinal: false, counts: { final: 0, block: 0, tool: 0 } }; + }, + ) as never, + }); + + await handler( + "!room:example.org", + createMatrixTextMessageEvent({ eventId: "$msg1", body: "hello" }), + ); + + expect(capturedDisableBlockStreaming).toBe(true); + }); + + it("uses shared block streaming when explicitly enabled for Matrix", async () => { + let capturedDisableBlockStreaming: boolean | undefined; + + const { handler } = createMatrixHandlerTestHarness({ + streaming: "off", + blockStreamingEnabled: true, + dispatchReplyFromConfig: vi.fn( + async (args: { replyOptions?: { disableBlockStreaming?: boolean } }) => { + capturedDisableBlockStreaming = args.replyOptions?.disableBlockStreaming; + return { queuedFinal: false, counts: { final: 0, block: 0, tool: 0 } }; + }, + ) as never, + }); + + await handler( + "!room:example.org", + createMatrixTextMessageEvent({ eventId: "$msg1", body: "hello" }), + ); + + expect(capturedDisableBlockStreaming).toBe(false); + }); +}); diff --git a/extensions/matrix/src/matrix/monitor/handler.ts b/extensions/matrix/src/matrix/monitor/handler.ts index 5e2ec946137..3ffd7bb7a43 100644 --- a/extensions/matrix/src/matrix/monitor/handler.ts +++ b/extensions/matrix/src/matrix/monitor/handler.ts @@ -84,6 +84,7 @@ export type MatrixMonitorHandlerParams = { /** DM-specific threadReplies override. Falls back to threadReplies when absent. 
*/ dmThreadReplies?: "off" | "inbound" | "always"; streaming: "partial" | "off"; + blockStreamingEnabled: boolean; dmEnabled: boolean; dmPolicy: "open" | "pairing" | "allowlist" | "disabled"; textLimit: number; @@ -201,6 +202,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam threadReplies, dmThreadReplies, streaming, + blockStreamingEnabled, dmEnabled, dmPolicy, textLimit, @@ -1127,10 +1129,10 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam }); }, }); - const streamingEnabled = streaming === "partial"; + const draftStreamingEnabled = streaming === "partial"; const draftReplyToId = replyToMode !== "off" && !threadTarget ? _messageId : undefined; let currentDraftReplyToId = draftReplyToId; - const draftStream = streamingEnabled + const draftStream = draftStreamingEnabled ? createMatrixDraftStream({ roomId, client, @@ -1350,9 +1352,11 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam replyOptions: { ...replyOptions, skillFilter: roomConfig?.skills, - // When streaming is active, disable block streaming — draft - // streaming replaces it with edit-in-place updates. - disableBlockStreaming: streamingEnabled ? true : undefined, + // Matrix expects explicit assistant progress updates as + // separate messages only when block streaming is explicitly + // enabled. Partial draft streaming still disables the shared + // block pipeline so draft edits do not double-send. + disableBlockStreaming: draftStream ? true : !blockStreamingEnabled, onPartialReply: draftStream ? (payload) => { const fullText = payload.text ?? 
""; diff --git a/extensions/matrix/src/matrix/monitor/index.ts b/extensions/matrix/src/matrix/monitor/index.ts index b0dae1cddc1..1da020c3f18 100644 --- a/extensions/matrix/src/matrix/monitor/index.ts +++ b/extensions/matrix/src/matrix/monitor/index.ts @@ -210,6 +210,7 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi const mediaMaxBytes = Math.max(1, mediaMaxMb) * 1024 * 1024; const streaming: "partial" | "off" = accountConfig.streaming === true || accountConfig.streaming === "partial" ? "partial" : "off"; + const blockStreamingEnabled = accountConfig.blockStreaming === true; const startupMs = Date.now(); const startupGraceMs = 0; // Cold starts should ignore old room history, but once we have a persisted @@ -265,6 +266,7 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi threadReplies, dmThreadReplies, streaming, + blockStreamingEnabled, dmEnabled, dmPolicy, textLimit, diff --git a/extensions/matrix/src/setup-config.ts b/extensions/matrix/src/setup-config.ts index 2a82f3169a5..1660272fbef 100644 --- a/extensions/matrix/src/setup-config.ts +++ b/extensions/matrix/src/setup-config.ts @@ -39,6 +39,7 @@ const MATRIX_SINGLE_ACCOUNT_KEYS_TO_MOVE = new Set([ "encryption", "allowlistOnly", "allowBots", + "blockStreaming", "replyToMode", "threadReplies", "textChunkLimit", @@ -59,6 +60,9 @@ const MATRIX_SINGLE_ACCOUNT_KEYS_TO_MOVE = new Set([ "actions", ]); const MATRIX_NAMED_ACCOUNT_PROMOTION_KEYS = new Set([ + // When named accounts already exist, only move auth/bootstrap fields into the + // promoted account. Delivery-policy fields stay at the top level so they + // remain shared inherited defaults for every account. 
"name", "homeserver", "userId", diff --git a/extensions/matrix/src/setup-core.test.ts b/extensions/matrix/src/setup-core.test.ts index 5966b971a14..5fbff6d8b87 100644 --- a/extensions/matrix/src/setup-core.test.ts +++ b/extensions/matrix/src/setup-core.test.ts @@ -104,4 +104,45 @@ describe("matrixSetupAdapter", () => { proxy: "http://127.0.0.1:7890", }); }); + + it("keeps top-level block streaming as a shared default when named accounts already exist", () => { + const cfg = { + channels: { + matrix: { + homeserver: "https://matrix.example.org", + userId: "@default:example.org", + accessToken: "default-token", + blockStreaming: true, + accounts: { + support: { + homeserver: "https://matrix.example.org", + userId: "@support:example.org", + accessToken: "support-token", + }, + }, + }, + }, + } as CoreConfig; + + const next = matrixSetupAdapter.applyAccountConfig({ + cfg, + accountId: "ops", + input: { + name: "Ops", + homeserver: "https://matrix.example.org", + userId: "@ops:example.org", + accessToken: "ops-token", + }, + }) as CoreConfig; + + expect(next.channels?.matrix?.blockStreaming).toBe(true); + expect(next.channels?.matrix?.accounts?.ops).toMatchObject({ + name: "Ops", + enabled: true, + homeserver: "https://matrix.example.org", + userId: "@ops:example.org", + accessToken: "ops-token", + }); + expect(next.channels?.matrix?.accounts?.ops?.blockStreaming).toBeUndefined(); + }); }); diff --git a/extensions/matrix/src/types.ts b/extensions/matrix/src/types.ts index 1ac81018a84..82efecf9b49 100644 --- a/extensions/matrix/src/types.ts +++ b/extensions/matrix/src/types.ts @@ -101,6 +101,13 @@ export type MatrixConfig = { allowBots?: boolean | "mentions"; /** Group message policy (default: allowlist). */ groupPolicy?: GroupPolicy; + /** + * Enable shared block-streaming replies for Matrix. + * + * Default: false. Matrix keeps `streaming: "off"` as final-only delivery + * unless block streaming is explicitly enabled. 
+ */ + blockStreaming?: boolean; /** Allowlist for group senders (matrix user IDs). */ groupAllowFrom?: Array; /** Control reply threading when reply tags are present (off|first|all). */ @@ -149,6 +156,8 @@ export type MatrixConfig = { * Streaming mode for Matrix replies. * - `"partial"`: edit a single message in place as the model generates text. * - `"off"`: deliver the full reply once the model finishes. + * - Use `blockStreaming: true` when you want separate progress messages + * while `streaming` remains `"off"`. * - `true` maps to `"partial"`, `false` maps to `"off"`. * Default: `"off"`. */ diff --git a/extensions/minimax/index.ts b/extensions/minimax/index.ts index 61729c23eb8..0011bfd9069 100644 --- a/extensions/minimax/index.ts +++ b/extensions/minimax/index.ts @@ -166,7 +166,9 @@ export default definePluginEntry({ id: API_PROVIDER_ID, label: PROVIDER_LABEL, docsPath: "/providers/minimax", + aliases: ["minimax-cn"], envVars: ["MINIMAX_API_KEY"], + resolveReasoningOutputMode: () => "native", auth: [ createProviderApiKeyAuthMethod({ providerId: API_PROVIDER_ID, @@ -240,6 +242,7 @@ export default definePluginEntry({ label: PROVIDER_LABEL, docsPath: "/providers/minimax", envVars: ["MINIMAX_OAUTH_TOKEN", "MINIMAX_API_KEY"], + resolveReasoningOutputMode: () => "native", catalog: { run: async (ctx) => resolvePortalCatalog(ctx), }, diff --git a/src/agents/pi-embedded-subscribe.handlers.lifecycle.test.ts b/src/agents/pi-embedded-subscribe.handlers.lifecycle.test.ts index e104c1ef895..b986c559763 100644 --- a/src/agents/pi-embedded-subscribe.handlers.lifecycle.test.ts +++ b/src/agents/pi-embedded-subscribe.handlers.lifecycle.test.ts @@ -43,7 +43,7 @@ function createContext( } describe("handleAgentEnd", () => { - it("logs the resolved error message when run ends with assistant error", () => { + it("logs the resolved error message when run ends with assistant error", async () => { const onAgentEvent = vi.fn(); const ctx = createContext( { @@ -55,7 +55,7 @@ 
describe("handleAgentEnd", () => { { onAgentEvent }, ); - handleAgentEnd(ctx); + await handleAgentEnd(ctx); const warn = vi.mocked(ctx.log.warn); expect(warn).toHaveBeenCalledTimes(1); @@ -77,7 +77,7 @@ describe("handleAgentEnd", () => { }); }); - it("attaches raw provider error metadata and includes model/provider in console output", () => { + it("attaches raw provider error metadata and includes model/provider in console output", async () => { const ctx = createContext({ role: "assistant", stopReason: "error", @@ -87,7 +87,7 @@ describe("handleAgentEnd", () => { content: [{ type: "text", text: "" }], }); - handleAgentEnd(ctx); + await handleAgentEnd(ctx); const warn = vi.mocked(ctx.log.warn); expect(warn).toHaveBeenCalledTimes(1); @@ -103,7 +103,7 @@ describe("handleAgentEnd", () => { }); }); - it("sanitizes model and provider before writing consoleMessage", () => { + it("sanitizes model and provider before writing consoleMessage", async () => { const ctx = createContext({ role: "assistant", stopReason: "error", @@ -113,7 +113,7 @@ describe("handleAgentEnd", () => { content: [{ type: "text", text: "" }], }); - handleAgentEnd(ctx); + await handleAgentEnd(ctx); const warn = vi.mocked(ctx.log.warn); const meta = warn.mock.calls[0]?.[1]; @@ -127,7 +127,7 @@ describe("handleAgentEnd", () => { expect(meta?.consoleMessage).not.toContain("\u001b"); }); - it("redacts logged error text before emitting lifecycle events", () => { + it("redacts logged error text before emitting lifecycle events", async () => { const onAgentEvent = vi.fn(); const ctx = createContext( { @@ -139,7 +139,7 @@ describe("handleAgentEnd", () => { { onAgentEvent }, ); - handleAgentEnd(ctx); + await handleAgentEnd(ctx); const warn = vi.mocked(ctx.log.warn); expect(warn.mock.calls[0]?.[1]).toMatchObject({ @@ -156,21 +156,21 @@ describe("handleAgentEnd", () => { }); }); - it("keeps non-error run-end logging on debug only", () => { + it("keeps non-error run-end logging on debug only", async () => { const 
ctx = createContext(undefined); - handleAgentEnd(ctx); + await handleAgentEnd(ctx); expect(ctx.log.warn).not.toHaveBeenCalled(); expect(ctx.log.debug).toHaveBeenCalledWith("embedded run agent end: runId=run-1 isError=false"); }); - it("flushes orphaned tool media as a media-only block reply", () => { + it("flushes orphaned tool media as a media-only block reply", async () => { const ctx = createContext(undefined); ctx.state.pendingToolMediaUrls = ["/tmp/reply.opus"]; ctx.state.pendingToolAudioAsVoice = true; - handleAgentEnd(ctx); + await handleAgentEnd(ctx); expect(ctx.emitBlockReply).toHaveBeenCalledWith({ mediaUrls: ["/tmp/reply.opus"], diff --git a/src/agents/pi-embedded-subscribe.handlers.lifecycle.ts b/src/agents/pi-embedded-subscribe.handlers.lifecycle.ts index 4d8debf430e..2b16c996bff 100644 --- a/src/agents/pi-embedded-subscribe.handlers.lifecycle.ts +++ b/src/agents/pi-embedded-subscribe.handlers.lifecycle.ts @@ -11,6 +11,7 @@ import { hasAssistantVisibleReply, } from "./pi-embedded-subscribe.handlers.messages.js"; import type { EmbeddedPiSubscribeContext } from "./pi-embedded-subscribe.handlers.types.js"; +import { isPromiseLike } from "./pi-embedded-subscribe.promise.js"; import { isAssistantMessage } from "./pi-embedded-utils.js"; export { @@ -100,24 +101,55 @@ export function handleAgentEnd(ctx: EmbeddedPiSubscribeContext) { }); } - ctx.flushBlockReplyBuffer(); - const pendingToolMediaReply = consumePendingToolMediaReply(ctx.state); - if (pendingToolMediaReply && hasAssistantVisibleReply(pendingToolMediaReply)) { - ctx.emitBlockReply(pendingToolMediaReply); - } - // Flush the reply pipeline so the response reaches the channel before - // compaction wait blocks the run. This mirrors the pattern used by - // handleToolExecutionStart and ensures delivery is not held hostage to - // long-running compaction (#35074). 
- void ctx.params.onBlockReplyFlush?.(); + const finalizeAgentEnd = () => { + ctx.state.blockState.thinking = false; + ctx.state.blockState.final = false; + ctx.state.blockState.inlineCode = createInlineCodeState(); - ctx.state.blockState.thinking = false; - ctx.state.blockState.final = false; - ctx.state.blockState.inlineCode = createInlineCodeState(); + if (ctx.state.pendingCompactionRetry > 0) { + ctx.resolveCompactionRetry(); + } else { + ctx.maybeResolveCompactionWait(); + } + }; - if (ctx.state.pendingCompactionRetry > 0) { - ctx.resolveCompactionRetry(); - } else { - ctx.maybeResolveCompactionWait(); + const flushPendingMediaAndChannel = () => { + const pendingToolMediaReply = consumePendingToolMediaReply(ctx.state); + if (pendingToolMediaReply && hasAssistantVisibleReply(pendingToolMediaReply)) { + ctx.emitBlockReply(pendingToolMediaReply); + } + + const postMediaFlushResult = ctx.flushBlockReplyBuffer(); + if (isPromiseLike(postMediaFlushResult)) { + return postMediaFlushResult.then(() => { + const onBlockReplyFlushResult = ctx.params.onBlockReplyFlush?.(); + if (isPromiseLike(onBlockReplyFlushResult)) { + return onBlockReplyFlushResult; + } + }); + } + + const onBlockReplyFlushResult = ctx.params.onBlockReplyFlush?.(); + if (isPromiseLike(onBlockReplyFlushResult)) { + return onBlockReplyFlushResult; + } + }; + + const flushBlockReplyBufferResult = ctx.flushBlockReplyBuffer(); + if (isPromiseLike(flushBlockReplyBufferResult)) { + return flushBlockReplyBufferResult + .then(() => flushPendingMediaAndChannel()) + .finally(() => { + finalizeAgentEnd(); + }); } + + const flushPendingMediaAndChannelResult = flushPendingMediaAndChannel(); + if (isPromiseLike(flushPendingMediaAndChannelResult)) { + return flushPendingMediaAndChannelResult.finally(() => { + finalizeAgentEnd(); + }); + } + + finalizeAgentEnd(); } diff --git a/src/agents/pi-embedded-subscribe.handlers.messages.test.ts b/src/agents/pi-embedded-subscribe.handlers.messages.test.ts index 
688f0029e88..41ec1cfce01 100644 --- a/src/agents/pi-embedded-subscribe.handlers.messages.test.ts +++ b/src/agents/pi-embedded-subscribe.handlers.messages.test.ts @@ -1,11 +1,14 @@ -import { describe, expect, it } from "vitest"; +import { describe, expect, it, vi } from "vitest"; +import { createInlineCodeState } from "../markdown/code-spans.js"; import { buildAssistantStreamData, consumePendingToolMediaIntoReply, consumePendingToolMediaReply, + handleMessageUpdate, hasAssistantVisibleReply, resolveSilentReplyFallbackText, } from "./pi-embedded-subscribe.handlers.messages.js"; +import type { EmbeddedPiSubscribeContext } from "./pi-embedded-subscribe.handlers.types.js"; describe("resolveSilentReplyFallbackText", () => { it("replaces NO_REPLY with latest messaging tool text when available", () => { @@ -135,3 +138,48 @@ describe("consumePendingToolMediaReply", () => { expect(state.pendingToolAudioAsVoice).toBe(false); }); }); + +describe("handleMessageUpdate", () => { + it("contains synchronous text_end flush failures", async () => { + const debug = vi.fn(); + const ctx = { + params: { + runId: "run-1", + session: { id: "session-1" }, + }, + state: { + deterministicApprovalPromptSent: false, + reasoningStreamOpen: false, + streamReasoning: false, + deltaBuffer: "", + blockBuffer: "", + partialBlockState: { + thinking: false, + final: false, + inlineCode: createInlineCodeState(), + }, + lastStreamedAssistantCleaned: undefined, + emittedAssistantUpdate: false, + shouldEmitPartialReplies: false, + blockReplyBreak: "text_end", + }, + log: { debug }, + noteLastAssistant: vi.fn(), + stripBlockTags: (text: string) => text, + consumePartialReplyDirectives: vi.fn(() => null), + flushBlockReplyBuffer: vi.fn(() => { + throw new Error("boom"); + }), + } as unknown as EmbeddedPiSubscribeContext; + + handleMessageUpdate(ctx, { + type: "message_update", + message: { role: "assistant", content: [] }, + assistantMessageEvent: { type: "text_end" }, + } as never); + + await vi.waitFor(() 
=> { + expect(debug).toHaveBeenCalledWith("text_end block reply flush failed: Error: boom"); + }); + }); +}); diff --git a/src/agents/pi-embedded-subscribe.handlers.messages.ts b/src/agents/pi-embedded-subscribe.handlers.messages.ts index 5434ea71001..ae0191bcef8 100644 --- a/src/agents/pi-embedded-subscribe.handlers.messages.ts +++ b/src/agents/pi-embedded-subscribe.handlers.messages.ts @@ -13,6 +13,7 @@ import type { EmbeddedPiSubscribeContext, EmbeddedPiSubscribeState, } from "./pi-embedded-subscribe.handlers.types.js"; +import { isPromiseLike } from "./pi-embedded-subscribe.promise.js"; import { appendRawStream } from "./pi-embedded-subscribe.raw-stream.js"; import { extractAssistantText, @@ -367,7 +368,11 @@ export function handleMessageUpdate( evtType === "text_end" && ctx.state.blockReplyBreak === "text_end" ) { - ctx.flushBlockReplyBuffer(); + void Promise.resolve() + .then(() => ctx.flushBlockReplyBuffer()) + .catch((err) => { + ctx.log.debug(`text_end block reply flush failed: ${String(err)}`); + }); } } @@ -457,16 +462,6 @@ export function handleMessageEnd( }); const onBlockReply = ctx.params.onBlockReply; - const emitBlockReplySafely = (payload: Parameters>[0]) => { - if (!onBlockReply) { - return; - } - void Promise.resolve() - .then(() => onBlockReply(payload)) - .catch((err) => { - ctx.log.warn(`block reply callback failed: ${String(err)}`); - }); - }; const shouldEmitReasoning = Boolean( !ctx.params.silentExpected && ctx.state.includeReasoning && @@ -481,7 +476,7 @@ export function handleMessageEnd( return; } ctx.state.lastReasoningSent = formattedReasoning; - emitBlockReplySafely({ text: formattedReasoning, isReasoning: true }); + ctx.emitBlockReply({ text: formattedReasoning, isReasoning: true }); }; if (shouldEmitReasoningBeforeAnswer) { @@ -555,21 +550,43 @@ export function handleMessageEnd( emitSplitResultAsBlockReply(ctx.consumeReplyDirectives("", { final: true })); } + const finalizeMessageEnd = () => { + ctx.state.deltaBuffer = ""; + 
ctx.state.blockBuffer = ""; + ctx.blockChunker?.reset(); + ctx.state.blockState.thinking = false; + ctx.state.blockState.final = false; + ctx.state.blockState.inlineCode = createInlineCodeState(); + ctx.state.lastStreamedAssistant = undefined; + ctx.state.lastStreamedAssistantCleaned = undefined; + ctx.state.reasoningStreamOpen = false; + }; + if ( !ctx.params.silentExpected && ctx.state.blockReplyBreak === "message_end" && ctx.params.onBlockReplyFlush ) { - void ctx.params.onBlockReplyFlush(); + const flushBlockReplyBufferResult = ctx.flushBlockReplyBuffer(); + if (isPromiseLike(flushBlockReplyBufferResult)) { + return flushBlockReplyBufferResult + .then(() => { + const onBlockReplyFlushResult = ctx.params.onBlockReplyFlush?.(); + if (isPromiseLike(onBlockReplyFlushResult)) { + return onBlockReplyFlushResult; + } + }) + .finally(() => { + finalizeMessageEnd(); + }); + } + const onBlockReplyFlushResult = ctx.params.onBlockReplyFlush(); + if (isPromiseLike(onBlockReplyFlushResult)) { + return onBlockReplyFlushResult.finally(() => { + finalizeMessageEnd(); + }); + } } - ctx.state.deltaBuffer = ""; - ctx.state.blockBuffer = ""; - ctx.blockChunker?.reset(); - ctx.state.blockState.thinking = false; - ctx.state.blockState.final = false; - ctx.state.blockState.inlineCode = createInlineCodeState(); - ctx.state.lastStreamedAssistant = undefined; - ctx.state.lastStreamedAssistantCleaned = undefined; - ctx.state.reasoningStreamOpen = false; + finalizeMessageEnd(); } diff --git a/src/agents/pi-embedded-subscribe.handlers.tools.ts b/src/agents/pi-embedded-subscribe.handlers.tools.ts index 1662d2b2de6..ad55fd02723 100644 --- a/src/agents/pi-embedded-subscribe.handlers.tools.ts +++ b/src/agents/pi-embedded-subscribe.handlers.tools.ts @@ -12,6 +12,7 @@ import type { ToolCallSummary, ToolHandlerContext, } from "./pi-embedded-subscribe.handlers.types.js"; +import { isPromiseLike } from "./pi-embedded-subscribe.promise.js"; import { extractToolResultMediaArtifact, 
extractMessagingToolSend, @@ -326,96 +327,109 @@ async function emitToolResultOutput(params: { }); } -export async function handleToolExecutionStart( +export function handleToolExecutionStart( ctx: ToolHandlerContext, evt: AgentEvent & { toolName: string; toolCallId: string; args: unknown }, ) { + const continueAfterBlockReplyFlush = () => { + const onBlockReplyFlushResult = ctx.params.onBlockReplyFlush?.(); + if (isPromiseLike(onBlockReplyFlushResult)) { + return onBlockReplyFlushResult.then(() => { + continueToolExecutionStart(); + }); + } + continueToolExecutionStart(); + }; + + const continueToolExecutionStart = () => { + const rawToolName = String(evt.toolName); + const toolName = normalizeToolName(rawToolName); + const toolCallId = String(evt.toolCallId); + const args = evt.args; + const runId = ctx.params.runId; + + // Track start time and args for after_tool_call hook + toolStartData.set(buildToolStartKey(runId, toolCallId), { startTime: Date.now(), args }); + + if (toolName === "read") { + const record = args && typeof args === "object" ? (args as Record) : {}; + const filePathValue = + typeof record.path === "string" + ? record.path + : typeof record.file_path === "string" + ? record.file_path + : ""; + const filePath = filePathValue.trim(); + if (!filePath) { + const argsPreview = typeof args === "string" ? args.slice(0, 200) : undefined; + ctx.log.warn( + `read tool called without path: toolCallId=${toolCallId} argsType=${typeof args}${argsPreview ? 
` argsPreview=${argsPreview}` : ""}`, + ); + } + } + + const meta = extendExecMeta(toolName, args, inferToolMetaFromArgs(toolName, args)); + ctx.state.toolMetaById.set(toolCallId, buildToolCallSummary(toolName, args, meta)); + ctx.log.debug( + `embedded run tool start: runId=${ctx.params.runId} tool=${toolName} toolCallId=${toolCallId}`, + ); + + const shouldEmitToolEvents = ctx.shouldEmitToolResult(); + emitAgentEvent({ + runId: ctx.params.runId, + stream: "tool", + data: { + phase: "start", + name: toolName, + toolCallId, + args: args as Record, + }, + }); + // Best-effort typing signal; do not block tool summaries on slow emitters. + void ctx.params.onAgentEvent?.({ + stream: "tool", + data: { phase: "start", name: toolName, toolCallId }, + }); + + if ( + ctx.params.onToolResult && + shouldEmitToolEvents && + !ctx.state.toolSummaryById.has(toolCallId) + ) { + ctx.state.toolSummaryById.add(toolCallId); + ctx.emitToolSummary(toolName, meta); + } + + // Track messaging tool sends (pending until confirmed in tool_execution_end). + if (isMessagingTool(toolName)) { + const argsRecord = args && typeof args === "object" ? (args as Record) : {}; + const isMessagingSend = isMessagingToolSendAction(toolName, argsRecord); + if (isMessagingSend) { + const sendTarget = extractMessagingToolSend(toolName, argsRecord); + if (sendTarget) { + ctx.state.pendingMessagingTargets.set(toolCallId, sendTarget); + } + // Field names vary by tool: Discord/Slack use "content", sessions_send uses "message" + const text = (argsRecord.content as string) ?? (argsRecord.message as string); + if (text && typeof text === "string") { + ctx.state.pendingMessagingTexts.set(toolCallId, text); + ctx.log.debug(`Tracking pending messaging text: tool=${toolName} len=${text.length}`); + } + // Track media URLs from messaging tool args (pending until tool_execution_end). 
+ const mediaUrls = collectMessagingMediaUrlsFromRecord(argsRecord); + if (mediaUrls.length > 0) { + ctx.state.pendingMessagingMediaUrls.set(toolCallId, mediaUrls); + } + } + } + }; + // Flush pending block replies to preserve message boundaries before tool execution. - ctx.flushBlockReplyBuffer(); - if (ctx.params.onBlockReplyFlush) { - await ctx.params.onBlockReplyFlush(); - } - - const rawToolName = String(evt.toolName); - const toolName = normalizeToolName(rawToolName); - const toolCallId = String(evt.toolCallId); - const args = evt.args; - const runId = ctx.params.runId; - - // Track start time and args for after_tool_call hook - toolStartData.set(buildToolStartKey(runId, toolCallId), { startTime: Date.now(), args }); - - if (toolName === "read") { - const record = args && typeof args === "object" ? (args as Record) : {}; - const filePathValue = - typeof record.path === "string" - ? record.path - : typeof record.file_path === "string" - ? record.file_path - : ""; - const filePath = filePathValue.trim(); - if (!filePath) { - const argsPreview = typeof args === "string" ? args.slice(0, 200) : undefined; - ctx.log.warn( - `read tool called without path: toolCallId=${toolCallId} argsType=${typeof args}${argsPreview ? ` argsPreview=${argsPreview}` : ""}`, - ); - } - } - - const meta = extendExecMeta(toolName, args, inferToolMetaFromArgs(toolName, args)); - ctx.state.toolMetaById.set(toolCallId, buildToolCallSummary(toolName, args, meta)); - ctx.log.debug( - `embedded run tool start: runId=${ctx.params.runId} tool=${toolName} toolCallId=${toolCallId}`, - ); - - const shouldEmitToolEvents = ctx.shouldEmitToolResult(); - emitAgentEvent({ - runId: ctx.params.runId, - stream: "tool", - data: { - phase: "start", - name: toolName, - toolCallId, - args: args as Record, - }, - }); - // Best-effort typing signal; do not block tool summaries on slow emitters. 
- void ctx.params.onAgentEvent?.({ - stream: "tool", - data: { phase: "start", name: toolName, toolCallId }, - }); - - if ( - ctx.params.onToolResult && - shouldEmitToolEvents && - !ctx.state.toolSummaryById.has(toolCallId) - ) { - ctx.state.toolSummaryById.add(toolCallId); - ctx.emitToolSummary(toolName, meta); - } - - // Track messaging tool sends (pending until confirmed in tool_execution_end). - if (isMessagingTool(toolName)) { - const argsRecord = args && typeof args === "object" ? (args as Record<string, unknown>) : {}; - const isMessagingSend = isMessagingToolSendAction(toolName, argsRecord); - if (isMessagingSend) { - const sendTarget = extractMessagingToolSend(toolName, argsRecord); - if (sendTarget) { - ctx.state.pendingMessagingTargets.set(toolCallId, sendTarget); - } - // Field names vary by tool: Discord/Slack use "content", sessions_send uses "message" - const text = (argsRecord.content as string) ?? (argsRecord.message as string); - if (text && typeof text === "string") { - ctx.state.pendingMessagingTexts.set(toolCallId, text); - ctx.log.debug(`Tracking pending messaging text: tool=${toolName} len=${text.length}`); - } - // Track media URLs from messaging tool args (pending until tool_execution_end). 
- const mediaUrls = collectMessagingMediaUrlsFromRecord(argsRecord); - if (mediaUrls.length > 0) { - ctx.state.pendingMessagingMediaUrls.set(toolCallId, mediaUrls); - } - } + const flushBlockReplyBufferResult = ctx.flushBlockReplyBuffer(); + if (isPromiseLike(flushBlockReplyBufferResult)) { + return flushBlockReplyBufferResult.then(() => continueAfterBlockReplyFlush()); } + return continueAfterBlockReplyFlush(); } export function handleToolExecutionUpdate( diff --git a/src/agents/pi-embedded-subscribe.handlers.ts b/src/agents/pi-embedded-subscribe.handlers.ts index 96ebe52ff1b..00ecb8654f9 100644 --- a/src/agents/pi-embedded-subscribe.handlers.ts +++ b/src/agents/pi-embedded-subscribe.handlers.ts @@ -18,46 +18,115 @@ import type { EmbeddedPiSubscribeContext, EmbeddedPiSubscribeEvent, } from "./pi-embedded-subscribe.handlers.types.js"; +import { isPromiseLike } from "./pi-embedded-subscribe.promise.js"; export function createEmbeddedPiSessionEventHandler(ctx: EmbeddedPiSubscribeContext) { + let pendingEventChain: Promise<void> | null = null; + + const scheduleEvent = ( + evt: EmbeddedPiSubscribeEvent, + handler: () => void | Promise<void>, + options?: { detach?: boolean }, + ): void => { + const run = () => { + try { + return handler(); + } catch (err) { + ctx.log.debug(`${evt.type} handler failed: ${String(err)}`); + return; + } + }; + + if (!pendingEventChain) { + const result = run(); + if (!isPromiseLike(result)) { + return; + } + const task = result + .catch((err) => { + ctx.log.debug(`${evt.type} handler failed: ${String(err)}`); + }) + .finally(() => { + if (pendingEventChain === task) { + pendingEventChain = null; + } + }); + if (!options?.detach) { + pendingEventChain = task; + } + return; + } + + const task = pendingEventChain + .then(() => run()) + .catch((err) => { + ctx.log.debug(`${evt.type} handler failed: ${String(err)}`); + }) + .finally(() => { + if (pendingEventChain === task) { + pendingEventChain = null; + } + }); + if (!options?.detach) { 
pendingEventChain = task; + } + }; + return (evt: EmbeddedPiSubscribeEvent) => { switch (evt.type) { case "message_start": - handleMessageStart(ctx, evt as never); + scheduleEvent(evt, () => { + handleMessageStart(ctx, evt as never); + }); return; case "message_update": - handleMessageUpdate(ctx, evt as never); + scheduleEvent(evt, () => { + handleMessageUpdate(ctx, evt as never); + }); return; case "message_end": - handleMessageEnd(ctx, evt as never); + scheduleEvent(evt, () => { + return handleMessageEnd(ctx, evt as never); + }); return; case "tool_execution_start": - // Async handler - best-effort typing indicator, avoids blocking tool summaries. - // Catch rejections to avoid unhandled promise rejection crashes. - handleToolExecutionStart(ctx, evt as never).catch((err) => { - ctx.log.debug(`tool_execution_start handler failed: ${String(err)}`); + scheduleEvent(evt, () => { + return handleToolExecutionStart(ctx, evt as never); }); return; case "tool_execution_update": - handleToolExecutionUpdate(ctx, evt as never); - return; - case "tool_execution_end": - // Async handler - best-effort, non-blocking - handleToolExecutionEnd(ctx, evt as never).catch((err) => { - ctx.log.debug(`tool_execution_end handler failed: ${String(err)}`); + scheduleEvent(evt, () => { + handleToolExecutionUpdate(ctx, evt as never); }); return; + case "tool_execution_end": + scheduleEvent( + evt, + () => { + return handleToolExecutionEnd(ctx, evt as never); + }, + { detach: true }, + ); + return; case "agent_start": - handleAgentStart(ctx); + scheduleEvent(evt, () => { + handleAgentStart(ctx); + }); return; case "auto_compaction_start": - handleAutoCompactionStart(ctx); + scheduleEvent(evt, () => { + handleAutoCompactionStart(ctx); + }); return; case "auto_compaction_end": - handleAutoCompactionEnd(ctx, evt as never); + scheduleEvent(evt, () => { + handleAutoCompactionEnd(ctx, evt as never); + }); return; case "agent_end": - handleAgentEnd(ctx); + scheduleEvent(evt, () => { + return 
handleAgentEnd(ctx); + }); return; default: return; diff --git a/src/agents/pi-embedded-subscribe.handlers.types.ts b/src/agents/pi-embedded-subscribe.handlers.types.ts index 211ac887642..92ced5bd931 100644 --- a/src/agents/pi-embedded-subscribe.handlers.types.ts +++ b/src/agents/pi-embedded-subscribe.handlers.types.ts @@ -101,7 +101,7 @@ export type EmbeddedPiSubscribeContext = { state: { thinking: boolean; final: boolean; inlineCode?: InlineCodeState }, ) => string; emitBlockChunk: (text: string) => void; - flushBlockReplyBuffer: () => void; + flushBlockReplyBuffer: () => void | Promise<void>; emitReasoningStream: (text: string) => void; consumeReplyDirectives: ( text: string, @@ -170,7 +170,7 @@ export type ToolHandlerContext = { state: ToolHandlerState; log: EmbeddedSubscribeLogger; hookRunner?: HookRunner; - flushBlockReplyBuffer: () => void; + flushBlockReplyBuffer: () => void | Promise<void>; shouldEmitToolResult: () => boolean; shouldEmitToolOutput: () => boolean; emitToolSummary: (toolName?: string, meta?: string) => void; diff --git a/src/agents/pi-embedded-subscribe.promise.ts b/src/agents/pi-embedded-subscribe.promise.ts new file mode 100644 index 00000000000..0348290c1b9 --- /dev/null +++ b/src/agents/pi-embedded-subscribe.promise.ts @@ -0,0 +1,8 @@ +export function isPromiseLike(value: unknown): value is PromiseLike<unknown> { + return Boolean( + value && + (typeof value === "object" || typeof value === "function") && + "then" in value && + typeof (value as { then?: unknown }).then === "function", + ); +} diff --git a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.calls-onblockreplyflush-before-tool-execution-start-preserve.test.ts b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.calls-onblockreplyflush-before-tool-execution-start-preserve.test.ts index 0cdab75d804..cdfb7ae1301 100644 --- a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.calls-onblockreplyflush-before-tool-execution-start-preserve.test.ts +++ 
b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.calls-onblockreplyflush-before-tool-execution-start-preserve.test.ts @@ -87,6 +87,45 @@ describe("subscribeEmbeddedPiSession", () => { expect(onBlockReplyFlush).toHaveBeenCalledTimes(1); }); + it("waits for async block replies before tool_execution_start flush", async () => { + const { session, emit } = createStubSessionHarness(); + const delivered: string[] = []; + const flushSnapshots: string[][] = []; + + subscribeEmbeddedPiSession({ + session: session as unknown as Parameters<typeof subscribeEmbeddedPiSession>[0]["session"], + runId: "run-async-tool-flush", + onBlockReply: async (payload) => { + await Promise.resolve(); + if (payload.text) { + delivered.push(payload.text); + } + }, + onBlockReplyFlush: vi.fn(() => { + flushSnapshots.push([...delivered]); + }), + blockReplyBreak: "text_end", + blockReplyChunking: { minChars: 50, maxChars: 200 }, + }); + + emit({ + type: "message_start", + message: { role: "assistant" }, + }); + emitAssistantTextDelta({ emit, delta: "Short chunk." 
}); + + emit({ + type: "tool_execution_start", + toolName: "bash", + toolCallId: "tool-async-flush-1", + args: { command: "echo flush" }, + }); + await vi.waitFor(() => { + expect(delivered).toEqual(["Short chunk."]); + expect(flushSnapshots).toEqual([["Short chunk."]]); + }); + }); + it("calls onBlockReplyFlush at message_end for message-boundary turns", async () => { const { session, emit } = createStubSessionHarness(); @@ -121,4 +160,43 @@ describe("subscribeEmbeddedPiSession", () => { expect(onBlockReply.mock.calls[0]?.[0]?.text).toBe("Final reply before lifecycle end."); expect(onBlockReplyFlush).toHaveBeenCalledTimes(1); }); + + it("waits for async block replies before message_end flush", async () => { + const { session, emit } = createStubSessionHarness(); + const delivered: string[] = []; + const flushSnapshots: string[][] = []; + + subscribeEmbeddedPiSession({ + session: session as unknown as Parameters<typeof subscribeEmbeddedPiSession>[0]["session"], + runId: "run-async-message-end-flush", + onBlockReply: async (payload) => { + await Promise.resolve(); + if (payload.text) { + delivered.push(payload.text); + } + }, + onBlockReplyFlush: vi.fn(() => { + flushSnapshots.push([...delivered]); + }), + blockReplyBreak: "message_end", + }); + + emit({ + type: "message_start", + message: { role: "assistant" }, + }); + emitAssistantTextDelta({ emit, delta: "Final reply before lifecycle end." }); + + emit({ + type: "message_end", + message: { + role: "assistant", + content: [{ type: "text", text: "Final reply before lifecycle end." 
}], + }, + }); + await vi.waitFor(() => { + expect(delivered).toEqual(["Final reply before lifecycle end."]); + expect(flushSnapshots).toEqual([["Final reply before lifecycle end."]]); + }); + }); }); diff --git a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.subscribeembeddedpisession.test.ts b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.subscribeembeddedpisession.test.ts index d50df122f8d..fa9a06ddd3a 100644 --- a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.subscribeembeddedpisession.test.ts +++ b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.subscribeembeddedpisession.test.ts @@ -153,6 +153,62 @@ describe("subscribeEmbeddedPiSession", () => { ]); }, ); + + it("does not let tool_execution_end delivery stall later assistant streaming", async () => { + let resolveToolResult: (() => void) | undefined; + const onToolResult = vi.fn( + () => + new Promise<void>((resolve) => { + resolveToolResult = resolve; + }), + ); + const onPartialReply = vi.fn(); + + const { emit } = createSubscribedHarness({ + runId: "run", + onToolResult, + onPartialReply, + }); + + emit({ + type: "tool_execution_start", + toolName: "exec", + toolCallId: "tool-1", + args: { command: "echo hi" }, + }); + emit({ + type: "tool_execution_end", + toolName: "exec", + toolCallId: "tool-1", + isError: false, + result: { + details: { + status: "approval-pending", + approvalId: "12345678-1234-1234-1234-123456789012", + approvalSlug: "12345678", + host: "gateway", + command: "echo hi", + }, + }, + }); + + emit({ + type: "message_start", + message: { role: "assistant" }, + }); + emitAssistantTextDelta(emit, "After tool"); + + await vi.waitFor(() => { + expect(onToolResult).toHaveBeenCalledTimes(1); + expect(onPartialReply).toHaveBeenCalledWith( + expect.objectContaining({ text: "After tool", delta: "After tool" }), + ); + }); + + expect(resolveToolResult).toBeTypeOf("function"); + resolveToolResult?.(); + }); + it.each(THINKING_TAG_CASES)( 
"suppresses <%s> blocks across chunk boundaries", async ({ open, close }) => { diff --git a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.suppresses-message-end-block-replies-message-tool.test.ts b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.suppresses-message-end-block-replies-message-tool.test.ts index fae1018bf1e..13972d951c1 100644 --- a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.suppresses-message-end-block-replies-message-tool.test.ts +++ b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.suppresses-message-end-block-replies-message-tool.test.ts @@ -88,9 +88,9 @@ describe("subscribeEmbeddedPiSession", () => { result: { details: { status: "error" } }, }); emitAssistantMessageEnd(emit, messageText); - await Promise.resolve(); - - expect(onBlockReply).toHaveBeenCalledTimes(1); + await vi.waitFor(() => { + expect(onBlockReply).toHaveBeenCalledTimes(1); + }); }); it("ignores delivery-mirror assistant messages", async () => { diff --git a/src/agents/pi-embedded-subscribe.ts b/src/agents/pi-embedded-subscribe.ts index f82d9b36ea1..6203bd748f8 100644 --- a/src/agents/pi-embedded-subscribe.ts +++ b/src/agents/pi-embedded-subscribe.ts @@ -19,6 +19,7 @@ import type { EmbeddedPiSubscribeContext, EmbeddedPiSubscribeState, } from "./pi-embedded-subscribe.handlers.types.js"; +import { isPromiseLike } from "./pi-embedded-subscribe.promise.js"; import { filterToolResultMediaUrls } from "./pi-embedded-subscribe.tools.js"; import type { SubscribeEmbeddedPiSessionParams } from "./pi-embedded-subscribe.types.js"; import { formatReasoningMessage, stripDowngradedToolCallText } from "./pi-embedded-utils.js"; @@ -104,6 +105,7 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar const messagingToolSentMediaUrls = state.messagingToolSentMediaUrls; const pendingMessagingTexts = state.pendingMessagingTexts; const pendingMessagingTargets = state.pendingMessagingTargets; + const 
pendingBlockReplyTasks = new Set<Promise<void>>(); const replyDirectiveAccumulator = createStreamingDirectiveAccumulator(); const partialReplyDirectiveAccumulator = createStreamingDirectiveAccumulator(); const shouldAllowSilentTurnText = (text: string | undefined) => @@ -114,11 +116,21 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar if (!params.onBlockReply) { return; } - void Promise.resolve() - .then(() => params.onBlockReply?.(payload)) - .catch((err) => { + try { + const maybeTask = params.onBlockReply(payload); + if (!isPromiseLike(maybeTask)) { + return; + } + const task = Promise.resolve(maybeTask).catch((err) => { log.warn(`block reply callback failed: ${String(err)}`); }); + pendingBlockReplyTasks.add(task); + void task.finally(() => { + pendingBlockReplyTasks.delete(task); + }); + } catch (err) { + log.warn(`block reply callback failed: ${String(err)}`); + } }; const emitBlockReply = (payload: BlockReplyPayload) => { emitBlockReplySafely(consumePendingToolMediaIntoReply(state, payload)); @@ -554,19 +566,25 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar const consumePartialReplyDirectives = (text: string, options?: { final?: boolean }) => partialReplyDirectiveAccumulator.consume(text, options); - const flushBlockReplyBuffer = () => { + const flushBlockReplyBuffer = (): void | Promise<void> => { if (!params.onBlockReply) { return; } if (blockChunker?.hasBuffered()) { blockChunker.drain({ force: true, emit: emitBlockChunk }); blockChunker.reset(); - return; - } - if (state.blockBuffer.length > 0) { + } else if (state.blockBuffer.length > 0) { emitBlockChunk(state.blockBuffer); state.blockBuffer = ""; } + if (pendingBlockReplyTasks.size === 0) { + return; + } + return (async () => { + while (pendingBlockReplyTasks.size > 0) { + await Promise.allSettled(pendingBlockReplyTasks); + } + })(); }; const emitReasoningStream = (text: string) => { diff --git a/src/auto-reply/reply/agent-runner-utils.test.ts 
b/src/auto-reply/reply/agent-runner-utils.test.ts index 2ab06633a88..cefebda08d5 100644 --- a/src/auto-reply/reply/agent-runner-utils.test.ts +++ b/src/auto-reply/reply/agent-runner-utils.test.ts @@ -127,6 +127,26 @@ describe("agent-runner-utils", () => { }); }); + it("does not force final-tag enforcement for minimax providers", () => { + const run = makeRun({ workspaceDir: process.cwd() }); + const authProfile = resolveProviderScopedAuthProfile({ + provider: "minimax", + primaryProvider: "minimax", + authProfileId: "profile-minimax", + authProfileIdSource: "user", + }); + + const resolved = buildEmbeddedRunBaseParams({ + run, + provider: "minimax", + model: "MiniMax-M2.7", + runId: "run-1", + authProfile, + }); + + expect(resolved.enforceFinalTag).toBe(false); + }); + it("builds embedded contexts and scopes auth profile by provider", () => { const run = makeRun({ authProfileId: "profile-openai", diff --git a/src/utils/provider-utils.ts b/src/utils/provider-utils.ts index ef2931fab0b..6949f5b366b 100644 --- a/src/utils/provider-utils.ts +++ b/src/utils/provider-utils.ts @@ -46,6 +46,9 @@ export function resolveReasoningOutputMode(params: { // handles reasoning natively via the `reasoning` field in streaming chunks, // so tag-based enforcement is unnecessary and causes all output to be // discarded as "(no output)" (#2279). + // Note: MiniMax is also intentionally excluded. In production it does not + // reliably wrap user-visible output in tags, so forcing tag + // enforcement suppresses normal assistant replies. 
if ( normalized === "google" || normalized === "google-gemini-cli" || @@ -54,11 +57,6 @@ export function resolveReasoningOutputMode(params: { return "tagged"; } - // Handle Minimax (M2.5 is chatty/reasoning-like) - if (normalized.includes("minimax")) { - return "tagged"; - } - return "native"; } diff --git a/src/utils/utils-misc.test.ts b/src/utils/utils-misc.test.ts index ea585bfecdf..2e79ac91ee6 100644 --- a/src/utils/utils-misc.test.ts +++ b/src/utils/utils-misc.test.ts @@ -70,8 +70,16 @@ describe("isReasoningTagProvider", () => { value: "google-generative-ai", expected: true, }, - { name: "returns true for minimax", value: "minimax", expected: true }, - { name: "returns true for minimax-cn", value: "minimax-cn", expected: true }, + { + name: "returns false for minimax - does not reliably honor wrappers in production", + value: "minimax", + expected: false, + }, + { + name: "returns false for minimax-cn", + value: "minimax-cn", + expected: false, + }, { name: "returns false for null", value: null, expected: false }, { name: "returns false for undefined", value: undefined, expected: false }, { name: "returns false for empty", value: "", expected: false }, @@ -83,7 +91,7 @@ describe("isReasoningTagProvider", () => { value: string | null | undefined; expected: boolean; }>)("$name", ({ value, expected }) => { - expect(isReasoningTagProvider(value)).toBe(expected); + expect(isReasoningTagProvider(value, { workspaceDir: process.cwd() })).toBe(expected); }); });