From 2b02e8a7a8c144675ec55b579fe773591588dc5e Mon Sep 17 00:00:00 2001 From: Nate <12980165+nk1tz@users.noreply.github.com> Date: Tue, 10 Feb 2026 19:17:21 -0600 Subject: [PATCH] feat(gateway): stream thinking events and decouple tool events from verbose level (#10568) --- src/agents/pi-embedded-subscribe.ts | 16 ++++++++++++++++ src/gateway/server-chat.agent-events.test.ts | 8 ++++++-- src/gateway/server-chat.ts | 15 ++++++++++----- src/gateway/server-methods/agent.ts | 8 ++++++++ src/gateway/server-methods/chat.ts | 8 ++++++++ 5 files changed, 48 insertions(+), 7 deletions(-) diff --git a/src/agents/pi-embedded-subscribe.ts b/src/agents/pi-embedded-subscribe.ts index cf073b92c1b..75b6a8d1dbb 100644 --- a/src/agents/pi-embedded-subscribe.ts +++ b/src/agents/pi-embedded-subscribe.ts @@ -7,6 +7,7 @@ import type { SubscribeEmbeddedPiSessionParams } from "./pi-embedded-subscribe.t import { parseReplyDirectives } from "../auto-reply/reply/reply-directives.js"; import { createStreamingDirectiveAccumulator } from "../auto-reply/reply/streaming-directives.js"; import { formatToolAggregate } from "../auto-reply/tool-meta.js"; +import { emitAgentEvent } from "../infra/agent-events.js"; import { createSubsystemLogger } from "../logging/subsystem.js"; import { buildCodeSpanIndex, createInlineCodeState } from "../markdown/code-spans.js"; import { EmbeddedBlockChunker } from "./pi-embedded-block-chunker.js"; @@ -533,7 +534,22 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar if (formatted === state.lastStreamedReasoning) { return; } + // Compute delta: new text since the last emitted reasoning. + // Guard against non-prefix changes (e.g. trim/format altering earlier content). + const prior = state.lastStreamedReasoning ?? ""; + const delta = formatted.startsWith(prior) ? formatted.slice(prior.length) : formatted; state.lastStreamedReasoning = formatted; + + // Broadcast thinking event to WebSocket clients in real-time + emitAgentEvent({ + runId: params.runId, + stream: "thinking", + data: { + text: formatted, + delta, + }, + }); + void params.onReasoningStream({ text: formatted, }); diff --git a/src/gateway/server-chat.agent-events.test.ts b/src/gateway/server-chat.agent-events.test.ts index 33dc90155bc..95fd32d496d 100644 --- a/src/gateway/server-chat.agent-events.test.ts +++ b/src/gateway/server-chat.agent-events.test.ts @@ -84,7 +84,7 @@ describe("agent event handler", () => { resetAgentRunContextForTest(); }); - it("suppresses tool events when verbose is off", () => { + it("broadcasts tool events to WS recipients even when verbose is off, but skips node send", () => { const broadcast = vi.fn(); const broadcastToConnIds = vi.fn(); const nodeSendToSession = vi.fn(); @@ -114,7 +114,11 @@ describe("agent event handler", () => { data: { phase: "start", name: "read", toolCallId: "t2" }, }); - expect(broadcastToConnIds).not.toHaveBeenCalled(); + // Tool events always broadcast to registered WS recipients + expect(broadcastToConnIds).toHaveBeenCalledTimes(1); + // But node/channel subscribers should NOT receive when verbose is off + const nodeToolCalls = nodeSendToSession.mock.calls.filter(([, event]) => event === "agent"); + expect(nodeToolCalls).toHaveLength(0); resetAgentRunContextForTest(); }); diff --git a/src/gateway/server-chat.ts b/src/gateway/server-chat.ts index 2f9d17d577a..23586291446 100644 --- a/src/gateway/server-chat.ts +++ b/src/gateway/server-chat.ts @@ -328,10 +328,7 @@ export function createAgentEventHandler({ const last = agentRunSeq.get(evt.runId) ?? 0; const isToolEvent = evt.stream === "tool"; const toolVerbose = isToolEvent ? resolveToolVerboseLevel(evt.runId, sessionKey) : "off"; - if (isToolEvent && toolVerbose === "off") { - agentRunSeq.set(evt.runId, evt.seq); - return; - } + // Build tool payload: strip result/partialResult unless verbose=full const toolPayload = isToolEvent && toolVerbose !== "full" ? (() => { @@ -356,6 +353,10 @@ export function createAgentEventHandler({ } agentRunSeq.set(evt.runId, evt.seq); if (isToolEvent) { + // Always broadcast tool events to registered WS recipients with + // tool-events capability, regardless of verboseLevel. The verbose + // setting only controls whether tool details are sent as channel + // messages to messaging surfaces (Telegram, Discord, etc.). const recipients = toolEventRecipients.get(evt.runId); if (recipients && recipients.size > 0) { broadcastToConnIds("agent", toolPayload, recipients); @@ -368,7 +369,11 @@ export function createAgentEventHandler({ evt.stream === "lifecycle" && typeof evt.data?.phase === "string" ? evt.data.phase : null; if (sessionKey) { - nodeSendToSession(sessionKey, "agent", isToolEvent ? toolPayload : agentPayload); + // Send tool events to node/channel subscribers only when verbose is enabled; + // WS clients already received the event above via broadcastToConnIds. + if (!isToolEvent || toolVerbose !== "off") { + nodeSendToSession(sessionKey, "agent", isToolEvent ? toolPayload : agentPayload); + } if (!isAborted && evt.stream === "assistant" && typeof evt.data?.text === "string") { emitChatDelta(sessionKey, clientRunId, evt.seq, evt.data.text); } else if (!isAborted && (lifecyclePhase === "end" || lifecyclePhase === "error")) { diff --git a/src/gateway/server-methods/agent.ts b/src/gateway/server-methods/agent.ts index 6ba6f9731fd..3f828103ab5 100644 --- a/src/gateway/server-methods/agent.ts +++ b/src/gateway/server-methods/agent.ts @@ -304,6 +304,14 @@ export const agentHandlers: GatewayRequestHandlers = { ); if (connId && wantsToolEvents) { context.registerToolEventRecipient(runId, connId); + // Register for any other active runs *in the same session* so + // late-joining clients (e.g. page refresh mid-response) receive + // in-progress tool events without leaking cross-session data. + for (const [activeRunId, active] of context.chatAbortControllers) { + if (activeRunId !== runId && active.sessionKey === requestedSessionKey) { + context.registerToolEventRecipient(activeRunId, connId); + } + } } const wantsDelivery = request.deliver === true; diff --git a/src/gateway/server-methods/chat.ts b/src/gateway/server-methods/chat.ts index af2b50a8899..d19d98072b6 100644 --- a/src/gateway/server-methods/chat.ts +++ b/src/gateway/server-methods/chat.ts @@ -535,6 +535,14 @@ export const chatHandlers: GatewayRequestHandlers = { ); if (connId && wantsToolEvents) { context.registerToolEventRecipient(runId, connId); + // Register for any other active runs *in the same session* so + // late-joining clients (e.g. page refresh mid-response) receive + // in-progress tool events without leaking cross-session data. + for (const [activeRunId, active] of context.chatAbortControllers) { + if (activeRunId !== runId && active.sessionKey === p.sessionKey) { + context.registerToolEventRecipient(activeRunId, connId); + } + } } }, onModelSelected,