feat(gateway): stream thinking events and decouple tool events from verbose level (#10568)

This commit is contained in:
Nate 2026-02-10 19:17:21 -06:00 committed by GitHub
parent d2c2f4185b
commit 2b02e8a7a8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 48 additions and 7 deletions

View File

@ -7,6 +7,7 @@ import type { SubscribeEmbeddedPiSessionParams } from "./pi-embedded-subscribe.t
import { parseReplyDirectives } from "../auto-reply/reply/reply-directives.js";
import { createStreamingDirectiveAccumulator } from "../auto-reply/reply/streaming-directives.js";
import { formatToolAggregate } from "../auto-reply/tool-meta.js";
import { emitAgentEvent } from "../infra/agent-events.js";
import { createSubsystemLogger } from "../logging/subsystem.js";
import { buildCodeSpanIndex, createInlineCodeState } from "../markdown/code-spans.js";
import { EmbeddedBlockChunker } from "./pi-embedded-block-chunker.js";
@ -533,7 +534,22 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
if (formatted === state.lastStreamedReasoning) {
return;
}
// Compute delta: new text since the last emitted reasoning.
// Guard against non-prefix changes (e.g. trim/format altering earlier content).
const prior = state.lastStreamedReasoning ?? "";
const delta = formatted.startsWith(prior) ? formatted.slice(prior.length) : formatted;
state.lastStreamedReasoning = formatted;
// Broadcast thinking event to WebSocket clients in real-time
emitAgentEvent({
runId: params.runId,
stream: "thinking",
data: {
text: formatted,
delta,
},
});
void params.onReasoningStream({
text: formatted,
});

View File

@ -84,7 +84,7 @@ describe("agent event handler", () => {
resetAgentRunContextForTest();
});
it("suppresses tool events when verbose is off", () => {
it("broadcasts tool events to WS recipients even when verbose is off, but skips node send", () => {
const broadcast = vi.fn();
const broadcastToConnIds = vi.fn();
const nodeSendToSession = vi.fn();
@ -114,7 +114,11 @@ describe("agent event handler", () => {
data: { phase: "start", name: "read", toolCallId: "t2" },
});
expect(broadcastToConnIds).not.toHaveBeenCalled();
// Tool events always broadcast to registered WS recipients
expect(broadcastToConnIds).toHaveBeenCalledTimes(1);
// But node/channel subscribers should NOT receive when verbose is off
const nodeToolCalls = nodeSendToSession.mock.calls.filter(([, event]) => event === "agent");
expect(nodeToolCalls).toHaveLength(0);
resetAgentRunContextForTest();
});

View File

@ -328,10 +328,7 @@ export function createAgentEventHandler({
const last = agentRunSeq.get(evt.runId) ?? 0;
const isToolEvent = evt.stream === "tool";
const toolVerbose = isToolEvent ? resolveToolVerboseLevel(evt.runId, sessionKey) : "off";
if (isToolEvent && toolVerbose === "off") {
agentRunSeq.set(evt.runId, evt.seq);
return;
}
// Build tool payload: strip result/partialResult unless verbose=full
const toolPayload =
isToolEvent && toolVerbose !== "full"
? (() => {
@ -356,6 +353,10 @@ export function createAgentEventHandler({
}
agentRunSeq.set(evt.runId, evt.seq);
if (isToolEvent) {
// Always broadcast tool events to registered WS recipients with
// tool-events capability, regardless of verboseLevel. The verbose
// setting only controls whether tool details are sent as channel
// messages to messaging surfaces (Telegram, Discord, etc.).
const recipients = toolEventRecipients.get(evt.runId);
if (recipients && recipients.size > 0) {
broadcastToConnIds("agent", toolPayload, recipients);
@ -368,7 +369,11 @@ export function createAgentEventHandler({
evt.stream === "lifecycle" && typeof evt.data?.phase === "string" ? evt.data.phase : null;
if (sessionKey) {
nodeSendToSession(sessionKey, "agent", isToolEvent ? toolPayload : agentPayload);
// Send tool events to node/channel subscribers only when verbose is enabled;
// WS clients already received the event above via broadcastToConnIds.
if (!isToolEvent || toolVerbose !== "off") {
nodeSendToSession(sessionKey, "agent", isToolEvent ? toolPayload : agentPayload);
}
if (!isAborted && evt.stream === "assistant" && typeof evt.data?.text === "string") {
emitChatDelta(sessionKey, clientRunId, evt.seq, evt.data.text);
} else if (!isAborted && (lifecyclePhase === "end" || lifecyclePhase === "error")) {

View File

@ -304,6 +304,14 @@ export const agentHandlers: GatewayRequestHandlers = {
);
if (connId && wantsToolEvents) {
context.registerToolEventRecipient(runId, connId);
// Register for any other active runs *in the same session* so
// late-joining clients (e.g. page refresh mid-response) receive
// in-progress tool events without leaking cross-session data.
for (const [activeRunId, active] of context.chatAbortControllers) {
if (activeRunId !== runId && active.sessionKey === requestedSessionKey) {
context.registerToolEventRecipient(activeRunId, connId);
}
}
}
const wantsDelivery = request.deliver === true;

View File

@ -535,6 +535,14 @@ export const chatHandlers: GatewayRequestHandlers = {
);
if (connId && wantsToolEvents) {
context.registerToolEventRecipient(runId, connId);
// Register for any other active runs *in the same session* so
// late-joining clients (e.g. page refresh mid-response) receive
// in-progress tool events without leaking cross-session data.
for (const [activeRunId, active] of context.chatAbortControllers) {
if (activeRunId !== runId && active.sessionKey === p.sessionKey) {
context.registerToolEventRecipient(activeRunId, connId);
}
}
}
},
onModelSelected,