From df8f292039e27edec45b8ed2ad65ab0ac7f56194 Mon Sep 17 00:00:00 2001 From: Josh Lehman Date: Sat, 14 Mar 2026 07:54:21 -0700 Subject: [PATCH] fix: accumulate compaction counts across retries --- .../reply/agent-runner-execution.ts | 273 ++++++++---------- src/auto-reply/reply/followup-runner.ts | 1 + 2 files changed, 127 insertions(+), 147 deletions(-) diff --git a/src/auto-reply/reply/agent-runner-execution.ts b/src/auto-reply/reply/agent-runner-execution.ts index 15fc6bf508b..9ebc239f7ff 100644 --- a/src/auto-reply/reply/agent-runner-execution.ts +++ b/src/auto-reply/reply/agent-runner-execution.ts @@ -319,59 +319,79 @@ export async function runAgentTurnWithFallback(params: { }, ); return (async () => { - const result = await runEmbeddedPiAgent({ - ...embeddedContext, - trigger: params.isHeartbeat ? "heartbeat" : "user", - groupId: resolveGroupSessionKey(params.sessionCtx)?.id, - groupChannel: - params.sessionCtx.GroupChannel?.trim() ?? params.sessionCtx.GroupSubject?.trim(), - groupSpace: params.sessionCtx.GroupSpace?.trim() ?? undefined, - ...senderContext, - ...runBaseParams, - prompt: params.commandBody, - extraSystemPrompt: params.followupRun.run.extraSystemPrompt, - toolResultFormat: (() => { - const channel = resolveMessageChannel( - params.sessionCtx.Surface, - params.sessionCtx.Provider, - ); - if (!channel) { - return "markdown"; - } - return isMarkdownCapableMessageChannel(channel) ? "markdown" : "plain"; - })(), - suppressToolErrorWarnings: params.opts?.suppressToolErrorWarnings, - bootstrapContextMode: params.opts?.bootstrapContextMode, - bootstrapContextRunKind: params.opts?.isHeartbeat ? "heartbeat" : "default", - images: params.opts?.images, - abortSignal: params.opts?.abortSignal, - blockReplyBreak: params.resolvedBlockStreamingBreak, - blockReplyChunking: params.blockReplyChunking, - onPartialReply: async (payload) => { - const textForTyping = await handlePartialForTyping(payload); - if (!params.opts?.onPartialReply || textForTyping === undefined) { - return; - } - await params.opts.onPartialReply({ - text: textForTyping, - mediaUrls: payload.mediaUrls, - }); - }, - onAssistantMessageStart: async () => { - await params.typingSignals.signalMessageStart(); - await params.opts?.onAssistantMessageStart?.(); - }, - onReasoningStream: - params.typingSignals.shouldStartOnReasoning || params.opts?.onReasoningStream - ? async (payload) => { - await params.typingSignals.signalReasoningDelta(); - await params.opts?.onReasoningStream?.({ - text: payload.text, - mediaUrls: payload.mediaUrls, - }); + let attemptCompactionCount = 0; + try { + const result = await runEmbeddedPiAgent({ + ...embeddedContext, + trigger: params.isHeartbeat ? "heartbeat" : "user", + groupId: resolveGroupSessionKey(params.sessionCtx)?.id, + groupChannel: + params.sessionCtx.GroupChannel?.trim() ?? params.sessionCtx.GroupSubject?.trim(), + groupSpace: params.sessionCtx.GroupSpace?.trim() ?? undefined, + ...senderContext, + ...runBaseParams, + prompt: params.commandBody, + extraSystemPrompt: params.followupRun.run.extraSystemPrompt, + toolResultFormat: (() => { + const channel = resolveMessageChannel( + params.sessionCtx.Surface, + params.sessionCtx.Provider, + ); + if (!channel) { + return "markdown"; + } + return isMarkdownCapableMessageChannel(channel) ? "markdown" : "plain"; + })(), + suppressToolErrorWarnings: params.opts?.suppressToolErrorWarnings, + bootstrapContextMode: params.opts?.bootstrapContextMode, + bootstrapContextRunKind: params.opts?.isHeartbeat ? "heartbeat" : "default", + images: params.opts?.images, + abortSignal: params.opts?.abortSignal, + blockReplyBreak: params.resolvedBlockStreamingBreak, + blockReplyChunking: params.blockReplyChunking, + onPartialReply: async (payload) => { + const textForTyping = await handlePartialForTyping(payload); + if (!params.opts?.onPartialReply || textForTyping === undefined) { + return; + } + await params.opts.onPartialReply({ + text: textForTyping, + mediaUrls: payload.mediaUrls, + }); + }, + onAssistantMessageStart: async () => { + await params.typingSignals.signalMessageStart(); + await params.opts?.onAssistantMessageStart?.(); + }, + onReasoningStream: + params.typingSignals.shouldStartOnReasoning || params.opts?.onReasoningStream + ? async (payload) => { + await params.typingSignals.signalReasoningDelta(); + await params.opts?.onReasoningStream?.({ + text: payload.text, + mediaUrls: payload.mediaUrls, + }); + } + : undefined, + onReasoningEnd: params.opts?.onReasoningEnd, + onAgentEvent: async (evt) => { + // Signal run start only after the embedded agent emits real activity. + const hasLifecyclePhase = + evt.stream === "lifecycle" && typeof evt.data.phase === "string"; + if (evt.stream !== "lifecycle" || hasLifecyclePhase) { + notifyAgentRunStart(); + } + // Trigger typing when tools start executing. + // Must await to ensure typing indicator starts before tool summaries are emitted. + if (evt.stream === "tool") { + const phase = typeof evt.data.phase === "string" ? evt.data.phase : ""; + const name = typeof evt.data.name === "string" ? evt.data.name : undefined; + if (phase === "start" || phase === "update") { + await params.typingSignals.signalToolStart(); + await params.opts?.onToolStart?.({ name, phase }); } } - // Track auto-compaction completion and notify UI layer + // Track auto-compaction completion and notify UI layer. if (evt.stream === "compaction") { const phase = typeof evt.data.phase === "string" ? evt.data.phase : ""; if (phase === "start") { @@ -401,104 +421,63 @@ export async function runAgentTurnWithFallback(params: { directlySentBlockKeys, }) : undefined, - onReasoningEnd: params.opts?.onReasoningEnd, - onAgentEvent: async (evt) => { - // Signal run start only after the embedded agent emits real activity. - const hasLifecyclePhase = - evt.stream === "lifecycle" && typeof evt.data.phase === "string"; - if (evt.stream !== "lifecycle" || hasLifecyclePhase) { - notifyAgentRunStart(); - } - // Trigger typing when tools start executing. - // Must await to ensure typing indicator starts before tool summaries are emitted. - if (evt.stream === "tool") { - const phase = typeof evt.data.phase === "string" ? evt.data.phase : ""; - const name = typeof evt.data.name === "string" ? evt.data.name : undefined; - if (phase === "start" || phase === "update") { - await params.typingSignals.signalToolStart(); - await params.opts?.onToolStart?.({ name, phase }); - } - } - // Track auto-compaction completion and notify UI layer - if (evt.stream === "compaction") { - const phase = typeof evt.data.phase === "string" ? evt.data.phase : ""; - if (phase === "start") { - await params.opts?.onCompactionStart?.(); - } - if (phase === "end") { - autoCompactionCount += 1; - await params.opts?.onCompactionEnd?.(); - } - } - }, - // Always pass onBlockReply so flushBlockReplyBuffer works before tool execution, - // even when regular block streaming is disabled. The handler sends directly - // via opts.onBlockReply when the pipeline isn't available. - onBlockReply: params.opts?.onBlockReply - ? createBlockReplyDeliveryHandler({ - onBlockReply: params.opts.onBlockReply, - currentMessageId: - params.sessionCtx.MessageSidFull ?? params.sessionCtx.MessageSid, - normalizeStreamingText, - applyReplyToMode: params.applyReplyToMode, - normalizeMediaPaths: normalizeReplyMediaPaths, - typingSignals: params.typingSignals, - blockStreamingEnabled: params.blockStreamingEnabled, - blockReplyPipeline, - directlySentBlockKeys, - }) - : undefined, - onBlockReplyFlush: - params.blockStreamingEnabled && blockReplyPipeline - ? async () => { - await blockReplyPipeline.flush({ force: true }); - } - : undefined, - shouldEmitToolResult: params.shouldEmitToolResult, - shouldEmitToolOutput: params.shouldEmitToolOutput, - bootstrapPromptWarningSignaturesSeen, - bootstrapPromptWarningSignature: - bootstrapPromptWarningSignaturesSeen[ - bootstrapPromptWarningSignaturesSeen.length - 1 - ], - onToolResult: onToolResult - ? (() => { - // Serialize tool result delivery to preserve message ordering. - // Without this, concurrent tool callbacks race through typing signals - // and message sends, causing out-of-order delivery to the user. - // See: https://github.com/openclaw/openclaw/issues/11044 - let toolResultChain: Promise = Promise.resolve(); - return (payload: ReplyPayload) => { - toolResultChain = toolResultChain - .then(async () => { - const { text, skip } = normalizeStreamingText(payload); - if (skip) { - return; - } - await params.typingSignals.signalTextDelta(text); - await onToolResult({ - ...payload, - text, + onBlockReplyFlush: + params.blockStreamingEnabled && blockReplyPipeline + ? async () => { + await blockReplyPipeline.flush({ force: true }); + } + : undefined, + shouldEmitToolResult: params.shouldEmitToolResult, + shouldEmitToolOutput: params.shouldEmitToolOutput, + bootstrapPromptWarningSignaturesSeen, + bootstrapPromptWarningSignature: + bootstrapPromptWarningSignaturesSeen[ + bootstrapPromptWarningSignaturesSeen.length - 1 + ], + onToolResult: onToolResult + ? (() => { + // Serialize tool result delivery to preserve message ordering. + // Without this, concurrent tool callbacks race through typing signals + // and message sends, causing out-of-order delivery to the user. + // See: https://github.com/openclaw/openclaw/issues/11044 + let toolResultChain: Promise = Promise.resolve(); + return (payload: ReplyPayload) => { + toolResultChain = toolResultChain + .then(async () => { + const { text, skip } = normalizeStreamingText(payload); + if (skip) { + return; + } + await params.typingSignals.signalTextDelta(text); + await onToolResult({ + ...payload, + text, + }); + }) + .catch((err) => { + // Keep chain healthy after an error so later tool results still deliver. + logVerbose(`tool result delivery failed: ${String(err)}`); }); - }) - .catch((err) => { - // Keep chain healthy after an error so later tool results still deliver. - logVerbose(`tool result delivery failed: ${String(err)}`); + const task = toolResultChain.finally(() => { + params.pendingToolTasks.delete(task); }); - const task = toolResultChain.finally(() => { - params.pendingToolTasks.delete(task); - }); - params.pendingToolTasks.add(task); - }; - })() - : undefined, - }); - bootstrapPromptWarningSignaturesSeen = resolveBootstrapWarningSignaturesSeen( - result.meta?.systemPromptReport, - ); - const resultCompactionCount = Math.max(0, result.meta?.agentMeta?.compactionCount ?? 0); - autoCompactionCount = Math.max(autoCompactionCount, resultCompactionCount); - return result; + params.pendingToolTasks.add(task); + }; + })() + : undefined, + }); + bootstrapPromptWarningSignaturesSeen = resolveBootstrapWarningSignaturesSeen( + result.meta?.systemPromptReport, + ); + const resultCompactionCount = Math.max( + 0, + result.meta?.agentMeta?.compactionCount ?? 0, + ); + attemptCompactionCount = Math.max(attemptCompactionCount, resultCompactionCount); + return result; + } finally { + autoCompactionCount += attemptCompactionCount; + } })(); }, }); diff --git a/src/auto-reply/reply/followup-runner.ts b/src/auto-reply/reply/followup-runner.ts index e1a66898b9a..fe90d56433c 100644 --- a/src/auto-reply/reply/followup-runner.ts +++ b/src/auto-reply/reply/followup-runner.ts @@ -168,6 +168,7 @@ export function createFollowupRunner(params: { }), run: async (provider, model, runOptions) => { const authProfile = resolveRunAuthProfile(queued.run, provider); + let attemptCompactionCount = 0; try { const result = await runEmbeddedPiAgent({ sessionId: queued.run.sessionId,