fix: accumulate compaction counts across retries

This commit is contained in:
Josh Lehman 2026-03-14 07:54:21 -07:00
parent 37313844c1
commit df8f292039
No known key found for this signature in database
GPG Key ID: D141B425AC7F876B
2 changed files with 127 additions and 147 deletions

View File

@ -319,59 +319,79 @@ export async function runAgentTurnWithFallback(params: {
},
);
return (async () => {
const result = await runEmbeddedPiAgent({
...embeddedContext,
trigger: params.isHeartbeat ? "heartbeat" : "user",
groupId: resolveGroupSessionKey(params.sessionCtx)?.id,
groupChannel:
params.sessionCtx.GroupChannel?.trim() ?? params.sessionCtx.GroupSubject?.trim(),
groupSpace: params.sessionCtx.GroupSpace?.trim() ?? undefined,
...senderContext,
...runBaseParams,
prompt: params.commandBody,
extraSystemPrompt: params.followupRun.run.extraSystemPrompt,
toolResultFormat: (() => {
const channel = resolveMessageChannel(
params.sessionCtx.Surface,
params.sessionCtx.Provider,
);
if (!channel) {
return "markdown";
}
return isMarkdownCapableMessageChannel(channel) ? "markdown" : "plain";
})(),
suppressToolErrorWarnings: params.opts?.suppressToolErrorWarnings,
bootstrapContextMode: params.opts?.bootstrapContextMode,
bootstrapContextRunKind: params.opts?.isHeartbeat ? "heartbeat" : "default",
images: params.opts?.images,
abortSignal: params.opts?.abortSignal,
blockReplyBreak: params.resolvedBlockStreamingBreak,
blockReplyChunking: params.blockReplyChunking,
onPartialReply: async (payload) => {
const textForTyping = await handlePartialForTyping(payload);
if (!params.opts?.onPartialReply || textForTyping === undefined) {
return;
}
await params.opts.onPartialReply({
text: textForTyping,
mediaUrls: payload.mediaUrls,
});
},
onAssistantMessageStart: async () => {
await params.typingSignals.signalMessageStart();
await params.opts?.onAssistantMessageStart?.();
},
onReasoningStream:
params.typingSignals.shouldStartOnReasoning || params.opts?.onReasoningStream
? async (payload) => {
await params.typingSignals.signalReasoningDelta();
await params.opts?.onReasoningStream?.({
text: payload.text,
mediaUrls: payload.mediaUrls,
});
let attemptCompactionCount = 0;
try {
const result = await runEmbeddedPiAgent({
...embeddedContext,
trigger: params.isHeartbeat ? "heartbeat" : "user",
groupId: resolveGroupSessionKey(params.sessionCtx)?.id,
groupChannel:
params.sessionCtx.GroupChannel?.trim() ?? params.sessionCtx.GroupSubject?.trim(),
groupSpace: params.sessionCtx.GroupSpace?.trim() ?? undefined,
...senderContext,
...runBaseParams,
prompt: params.commandBody,
extraSystemPrompt: params.followupRun.run.extraSystemPrompt,
toolResultFormat: (() => {
const channel = resolveMessageChannel(
params.sessionCtx.Surface,
params.sessionCtx.Provider,
);
if (!channel) {
return "markdown";
}
return isMarkdownCapableMessageChannel(channel) ? "markdown" : "plain";
})(),
suppressToolErrorWarnings: params.opts?.suppressToolErrorWarnings,
bootstrapContextMode: params.opts?.bootstrapContextMode,
bootstrapContextRunKind: params.opts?.isHeartbeat ? "heartbeat" : "default",
images: params.opts?.images,
abortSignal: params.opts?.abortSignal,
blockReplyBreak: params.resolvedBlockStreamingBreak,
blockReplyChunking: params.blockReplyChunking,
onPartialReply: async (payload) => {
const textForTyping = await handlePartialForTyping(payload);
if (!params.opts?.onPartialReply || textForTyping === undefined) {
return;
}
await params.opts.onPartialReply({
text: textForTyping,
mediaUrls: payload.mediaUrls,
});
},
onAssistantMessageStart: async () => {
await params.typingSignals.signalMessageStart();
await params.opts?.onAssistantMessageStart?.();
},
onReasoningStream:
params.typingSignals.shouldStartOnReasoning || params.opts?.onReasoningStream
? async (payload) => {
await params.typingSignals.signalReasoningDelta();
await params.opts?.onReasoningStream?.({
text: payload.text,
mediaUrls: payload.mediaUrls,
});
}
: undefined,
onReasoningEnd: params.opts?.onReasoningEnd,
onAgentEvent: async (evt) => {
// Signal run start only after the embedded agent emits real activity.
const hasLifecyclePhase =
evt.stream === "lifecycle" && typeof evt.data.phase === "string";
if (evt.stream !== "lifecycle" || hasLifecyclePhase) {
notifyAgentRunStart();
}
// Trigger typing when tools start executing.
// Must await to ensure typing indicator starts before tool summaries are emitted.
if (evt.stream === "tool") {
const phase = typeof evt.data.phase === "string" ? evt.data.phase : "";
const name = typeof evt.data.name === "string" ? evt.data.name : undefined;
if (phase === "start" || phase === "update") {
await params.typingSignals.signalToolStart();
await params.opts?.onToolStart?.({ name, phase });
}
}
// Track auto-compaction completion and notify UI layer
// Track auto-compaction completion and notify UI layer.
if (evt.stream === "compaction") {
const phase = typeof evt.data.phase === "string" ? evt.data.phase : "";
if (phase === "start") {
@ -401,104 +421,63 @@ export async function runAgentTurnWithFallback(params: {
directlySentBlockKeys,
})
: undefined,
onReasoningEnd: params.opts?.onReasoningEnd,
onAgentEvent: async (evt) => {
// Signal run start only after the embedded agent emits real activity.
const hasLifecyclePhase =
evt.stream === "lifecycle" && typeof evt.data.phase === "string";
if (evt.stream !== "lifecycle" || hasLifecyclePhase) {
notifyAgentRunStart();
}
// Trigger typing when tools start executing.
// Must await to ensure typing indicator starts before tool summaries are emitted.
if (evt.stream === "tool") {
const phase = typeof evt.data.phase === "string" ? evt.data.phase : "";
const name = typeof evt.data.name === "string" ? evt.data.name : undefined;
if (phase === "start" || phase === "update") {
await params.typingSignals.signalToolStart();
await params.opts?.onToolStart?.({ name, phase });
}
}
// Track auto-compaction completion and notify UI layer
if (evt.stream === "compaction") {
const phase = typeof evt.data.phase === "string" ? evt.data.phase : "";
if (phase === "start") {
await params.opts?.onCompactionStart?.();
}
if (phase === "end") {
autoCompactionCount += 1;
await params.opts?.onCompactionEnd?.();
}
}
},
// Always pass onBlockReply so flushBlockReplyBuffer works before tool execution,
// even when regular block streaming is disabled. The handler sends directly
// via opts.onBlockReply when the pipeline isn't available.
onBlockReply: params.opts?.onBlockReply
? createBlockReplyDeliveryHandler({
onBlockReply: params.opts.onBlockReply,
currentMessageId:
params.sessionCtx.MessageSidFull ?? params.sessionCtx.MessageSid,
normalizeStreamingText,
applyReplyToMode: params.applyReplyToMode,
normalizeMediaPaths: normalizeReplyMediaPaths,
typingSignals: params.typingSignals,
blockStreamingEnabled: params.blockStreamingEnabled,
blockReplyPipeline,
directlySentBlockKeys,
})
: undefined,
onBlockReplyFlush:
params.blockStreamingEnabled && blockReplyPipeline
? async () => {
await blockReplyPipeline.flush({ force: true });
}
: undefined,
shouldEmitToolResult: params.shouldEmitToolResult,
shouldEmitToolOutput: params.shouldEmitToolOutput,
bootstrapPromptWarningSignaturesSeen,
bootstrapPromptWarningSignature:
bootstrapPromptWarningSignaturesSeen[
bootstrapPromptWarningSignaturesSeen.length - 1
],
onToolResult: onToolResult
? (() => {
// Serialize tool result delivery to preserve message ordering.
// Without this, concurrent tool callbacks race through typing signals
// and message sends, causing out-of-order delivery to the user.
// See: https://github.com/openclaw/openclaw/issues/11044
let toolResultChain: Promise<void> = Promise.resolve();
return (payload: ReplyPayload) => {
toolResultChain = toolResultChain
.then(async () => {
const { text, skip } = normalizeStreamingText(payload);
if (skip) {
return;
}
await params.typingSignals.signalTextDelta(text);
await onToolResult({
...payload,
text,
onBlockReplyFlush:
params.blockStreamingEnabled && blockReplyPipeline
? async () => {
await blockReplyPipeline.flush({ force: true });
}
: undefined,
shouldEmitToolResult: params.shouldEmitToolResult,
shouldEmitToolOutput: params.shouldEmitToolOutput,
bootstrapPromptWarningSignaturesSeen,
bootstrapPromptWarningSignature:
bootstrapPromptWarningSignaturesSeen[
bootstrapPromptWarningSignaturesSeen.length - 1
],
onToolResult: onToolResult
? (() => {
// Serialize tool result delivery to preserve message ordering.
// Without this, concurrent tool callbacks race through typing signals
// and message sends, causing out-of-order delivery to the user.
// See: https://github.com/openclaw/openclaw/issues/11044
let toolResultChain: Promise<void> = Promise.resolve();
return (payload: ReplyPayload) => {
toolResultChain = toolResultChain
.then(async () => {
const { text, skip } = normalizeStreamingText(payload);
if (skip) {
return;
}
await params.typingSignals.signalTextDelta(text);
await onToolResult({
...payload,
text,
});
})
.catch((err) => {
// Keep chain healthy after an error so later tool results still deliver.
logVerbose(`tool result delivery failed: ${String(err)}`);
});
})
.catch((err) => {
// Keep chain healthy after an error so later tool results still deliver.
logVerbose(`tool result delivery failed: ${String(err)}`);
const task = toolResultChain.finally(() => {
params.pendingToolTasks.delete(task);
});
const task = toolResultChain.finally(() => {
params.pendingToolTasks.delete(task);
});
params.pendingToolTasks.add(task);
};
})()
: undefined,
});
bootstrapPromptWarningSignaturesSeen = resolveBootstrapWarningSignaturesSeen(
result.meta?.systemPromptReport,
);
const resultCompactionCount = Math.max(0, result.meta?.agentMeta?.compactionCount ?? 0);
autoCompactionCount = Math.max(autoCompactionCount, resultCompactionCount);
return result;
params.pendingToolTasks.add(task);
};
})()
: undefined,
});
bootstrapPromptWarningSignaturesSeen = resolveBootstrapWarningSignaturesSeen(
result.meta?.systemPromptReport,
);
const resultCompactionCount = Math.max(
0,
result.meta?.agentMeta?.compactionCount ?? 0,
);
attemptCompactionCount = Math.max(attemptCompactionCount, resultCompactionCount);
return result;
} finally {
autoCompactionCount += attemptCompactionCount;
}
})();
},
});

View File

@ -168,6 +168,7 @@ export function createFollowupRunner(params: {
}),
run: async (provider, model, runOptions) => {
const authProfile = resolveRunAuthProfile(queued.run, provider);
let attemptCompactionCount = 0;
try {
const result = await runEmbeddedPiAgent({
sessionId: queued.run.sessionId,