fix: persist context-engine auto-compaction counts (#42629)

Merged via squash.

Prepared head SHA: df8f292039
Co-authored-by: uf-hy <41638541+uf-hy@users.noreply.github.com>
Co-authored-by: jalehman <550978+jalehman@users.noreply.github.com>
Reviewed-by: @jalehman
This commit is contained in:
ufhy 2026-03-15 07:22:10 +08:00 committed by GitHub
parent 8a607d7553
commit 3928b4872a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 614 additions and 223 deletions

View File

@ -99,6 +99,7 @@ Docs: https://docs.openclaw.ai
- Mattermost/thread routing: non-inbound reply paths (TUI/WebUI turns, tool-call callbacks, subagent responses) now correctly route to the originating Mattermost thread when `replyToMode: "all"` is active; also prevents stale `origin.threadId` metadata from resurrecting cleared thread routes. (#44283) thanks @teconomix
- Gateway/websocket pairing bypass for disabled auth: skip device-pairing enforcement when `gateway.auth.mode=none` so Control UI connections behind reverse proxies no longer get stuck on `pairing required` (code 1008) despite auth being explicitly disabled. (#42931)
- Auth/login lockout recovery: clear stale `auth_permanent` and `billing` disabled state for all profiles matching the target provider when `openclaw models auth login` is invoked, so users locked out by expired or revoked OAuth tokens can recover by re-authenticating instead of waiting for the cooldown timer to expire. (#43057)
- Auto-reply/context-engine compaction: persist the exact embedded-run metadata compaction count for main and followup runner session accounting, so metadata-only auto-compactions no longer undercount multi-compaction runs. (#42629) thanks @uf-hy.
## 2026.3.12

View File

@ -64,11 +64,11 @@ export function handleAutoCompactionEnd(
emitAgentEvent({
runId: ctx.params.runId,
stream: "compaction",
data: { phase: "end", willRetry },
data: { phase: "end", willRetry, completed: hasResult && !wasAborted },
});
void ctx.params.onAgentEvent?.({
stream: "compaction",
data: { phase: "end", willRetry },
data: { phase: "end", willRetry, completed: hasResult && !wasAborted },
});
// Run after_compaction plugin hook (fire-and-forget)

View File

@ -67,7 +67,7 @@ export type AgentRunLoopResult =
fallbackModel?: string;
fallbackAttempts: RuntimeFallbackAttempt[];
didLogHeartbeatStrip: boolean;
autoCompactionCompleted: boolean;
autoCompactionCount: number;
/** Payload keys sent directly (not via pipeline) during tool flush. */
directlySentBlockKeys?: Set<string>;
}
@ -103,7 +103,7 @@ export async function runAgentTurnWithFallback(params: {
}): Promise<AgentRunLoopResult> {
const TRANSIENT_HTTP_RETRY_DELAY_MS = 2_500;
let didLogHeartbeatStrip = false;
let autoCompactionCompleted = false;
let autoCompactionCount = 0;
// Track payloads sent directly (not via pipeline) during tool flush to avoid duplicates.
const directlySentBlockKeys = new Set<string>();
@ -319,154 +319,165 @@ export async function runAgentTurnWithFallback(params: {
},
);
return (async () => {
const result = await runEmbeddedPiAgent({
...embeddedContext,
trigger: params.isHeartbeat ? "heartbeat" : "user",
groupId: resolveGroupSessionKey(params.sessionCtx)?.id,
groupChannel:
params.sessionCtx.GroupChannel?.trim() ?? params.sessionCtx.GroupSubject?.trim(),
groupSpace: params.sessionCtx.GroupSpace?.trim() ?? undefined,
...senderContext,
...runBaseParams,
prompt: params.commandBody,
extraSystemPrompt: params.followupRun.run.extraSystemPrompt,
toolResultFormat: (() => {
const channel = resolveMessageChannel(
params.sessionCtx.Surface,
params.sessionCtx.Provider,
);
if (!channel) {
return "markdown";
}
return isMarkdownCapableMessageChannel(channel) ? "markdown" : "plain";
})(),
suppressToolErrorWarnings: params.opts?.suppressToolErrorWarnings,
bootstrapContextMode: params.opts?.bootstrapContextMode,
bootstrapContextRunKind: params.opts?.isHeartbeat ? "heartbeat" : "default",
images: params.opts?.images,
abortSignal: params.opts?.abortSignal,
blockReplyBreak: params.resolvedBlockStreamingBreak,
blockReplyChunking: params.blockReplyChunking,
onPartialReply: async (payload) => {
const textForTyping = await handlePartialForTyping(payload);
if (!params.opts?.onPartialReply || textForTyping === undefined) {
return;
}
await params.opts.onPartialReply({
text: textForTyping,
mediaUrls: payload.mediaUrls,
});
},
onAssistantMessageStart: async () => {
await params.typingSignals.signalMessageStart();
await params.opts?.onAssistantMessageStart?.();
},
onReasoningStream:
params.typingSignals.shouldStartOnReasoning || params.opts?.onReasoningStream
? async (payload) => {
await params.typingSignals.signalReasoningDelta();
await params.opts?.onReasoningStream?.({
text: payload.text,
mediaUrls: payload.mediaUrls,
});
}
: undefined,
onReasoningEnd: params.opts?.onReasoningEnd,
onAgentEvent: async (evt) => {
// Signal run start only after the embedded agent emits real activity.
const hasLifecyclePhase =
evt.stream === "lifecycle" && typeof evt.data.phase === "string";
if (evt.stream !== "lifecycle" || hasLifecyclePhase) {
notifyAgentRunStart();
}
// Trigger typing when tools start executing.
// Must await to ensure typing indicator starts before tool summaries are emitted.
if (evt.stream === "tool") {
const phase = typeof evt.data.phase === "string" ? evt.data.phase : "";
const name = typeof evt.data.name === "string" ? evt.data.name : undefined;
if (phase === "start" || phase === "update") {
await params.typingSignals.signalToolStart();
await params.opts?.onToolStart?.({ name, phase });
let attemptCompactionCount = 0;
try {
const result = await runEmbeddedPiAgent({
...embeddedContext,
trigger: params.isHeartbeat ? "heartbeat" : "user",
groupId: resolveGroupSessionKey(params.sessionCtx)?.id,
groupChannel:
params.sessionCtx.GroupChannel?.trim() ?? params.sessionCtx.GroupSubject?.trim(),
groupSpace: params.sessionCtx.GroupSpace?.trim() ?? undefined,
...senderContext,
...runBaseParams,
prompt: params.commandBody,
extraSystemPrompt: params.followupRun.run.extraSystemPrompt,
toolResultFormat: (() => {
const channel = resolveMessageChannel(
params.sessionCtx.Surface,
params.sessionCtx.Provider,
);
if (!channel) {
return "markdown";
}
}
// Track auto-compaction completion and notify UI layer
if (evt.stream === "compaction") {
const phase = typeof evt.data.phase === "string" ? evt.data.phase : "";
if (phase === "start") {
await params.opts?.onCompactionStart?.();
return isMarkdownCapableMessageChannel(channel) ? "markdown" : "plain";
})(),
suppressToolErrorWarnings: params.opts?.suppressToolErrorWarnings,
bootstrapContextMode: params.opts?.bootstrapContextMode,
bootstrapContextRunKind: params.opts?.isHeartbeat ? "heartbeat" : "default",
images: params.opts?.images,
abortSignal: params.opts?.abortSignal,
blockReplyBreak: params.resolvedBlockStreamingBreak,
blockReplyChunking: params.blockReplyChunking,
onPartialReply: async (payload) => {
const textForTyping = await handlePartialForTyping(payload);
if (!params.opts?.onPartialReply || textForTyping === undefined) {
return;
}
if (phase === "end") {
autoCompactionCompleted = true;
await params.opts?.onCompactionEnd?.();
}
}
},
// Always pass onBlockReply so flushBlockReplyBuffer works before tool execution,
// even when regular block streaming is disabled. The handler sends directly
// via opts.onBlockReply when the pipeline isn't available.
onBlockReply: params.opts?.onBlockReply
? createBlockReplyDeliveryHandler({
onBlockReply: params.opts.onBlockReply,
currentMessageId:
params.sessionCtx.MessageSidFull ?? params.sessionCtx.MessageSid,
normalizeStreamingText,
applyReplyToMode: params.applyReplyToMode,
normalizeMediaPaths: normalizeReplyMediaPaths,
typingSignals: params.typingSignals,
blockStreamingEnabled: params.blockStreamingEnabled,
blockReplyPipeline,
directlySentBlockKeys,
})
: undefined,
onBlockReplyFlush:
params.blockStreamingEnabled && blockReplyPipeline
? async () => {
await blockReplyPipeline.flush({ force: true });
}
: undefined,
shouldEmitToolResult: params.shouldEmitToolResult,
shouldEmitToolOutput: params.shouldEmitToolOutput,
bootstrapPromptWarningSignaturesSeen,
bootstrapPromptWarningSignature:
bootstrapPromptWarningSignaturesSeen[
bootstrapPromptWarningSignaturesSeen.length - 1
],
onToolResult: onToolResult
? (() => {
// Serialize tool result delivery to preserve message ordering.
// Without this, concurrent tool callbacks race through typing signals
// and message sends, causing out-of-order delivery to the user.
// See: https://github.com/openclaw/openclaw/issues/11044
let toolResultChain: Promise<void> = Promise.resolve();
return (payload: ReplyPayload) => {
toolResultChain = toolResultChain
.then(async () => {
const { text, skip } = normalizeStreamingText(payload);
if (skip) {
return;
}
await params.typingSignals.signalTextDelta(text);
await onToolResult({
...payload,
text,
});
})
.catch((err) => {
// Keep chain healthy after an error so later tool results still deliver.
logVerbose(`tool result delivery failed: ${String(err)}`);
await params.opts.onPartialReply({
text: textForTyping,
mediaUrls: payload.mediaUrls,
});
},
onAssistantMessageStart: async () => {
await params.typingSignals.signalMessageStart();
await params.opts?.onAssistantMessageStart?.();
},
onReasoningStream:
params.typingSignals.shouldStartOnReasoning || params.opts?.onReasoningStream
? async (payload) => {
await params.typingSignals.signalReasoningDelta();
await params.opts?.onReasoningStream?.({
text: payload.text,
mediaUrls: payload.mediaUrls,
});
const task = toolResultChain.finally(() => {
params.pendingToolTasks.delete(task);
});
params.pendingToolTasks.add(task);
};
})()
: undefined,
});
bootstrapPromptWarningSignaturesSeen = resolveBootstrapWarningSignaturesSeen(
result.meta?.systemPromptReport,
);
return result;
}
: undefined,
onReasoningEnd: params.opts?.onReasoningEnd,
onAgentEvent: async (evt) => {
// Signal run start only after the embedded agent emits real activity.
const hasLifecyclePhase =
evt.stream === "lifecycle" && typeof evt.data.phase === "string";
if (evt.stream !== "lifecycle" || hasLifecyclePhase) {
notifyAgentRunStart();
}
// Trigger typing when tools start executing.
// Must await to ensure typing indicator starts before tool summaries are emitted.
if (evt.stream === "tool") {
const phase = typeof evt.data.phase === "string" ? evt.data.phase : "";
const name = typeof evt.data.name === "string" ? evt.data.name : undefined;
if (phase === "start" || phase === "update") {
await params.typingSignals.signalToolStart();
await params.opts?.onToolStart?.({ name, phase });
}
}
// Track auto-compaction completion and notify UI layer.
if (evt.stream === "compaction") {
const phase = typeof evt.data.phase === "string" ? evt.data.phase : "";
if (phase === "start") {
await params.opts?.onCompactionStart?.();
}
const completed = evt.data?.completed === true;
if (phase === "end" && completed) {
attemptCompactionCount += 1;
await params.opts?.onCompactionEnd?.();
}
}
},
// Always pass onBlockReply so flushBlockReplyBuffer works before tool execution,
// even when regular block streaming is disabled. The handler sends directly
// via opts.onBlockReply when the pipeline isn't available.
onBlockReply: params.opts?.onBlockReply
? createBlockReplyDeliveryHandler({
onBlockReply: params.opts.onBlockReply,
currentMessageId:
params.sessionCtx.MessageSidFull ?? params.sessionCtx.MessageSid,
normalizeStreamingText,
applyReplyToMode: params.applyReplyToMode,
normalizeMediaPaths: normalizeReplyMediaPaths,
typingSignals: params.typingSignals,
blockStreamingEnabled: params.blockStreamingEnabled,
blockReplyPipeline,
directlySentBlockKeys,
})
: undefined,
onBlockReplyFlush:
params.blockStreamingEnabled && blockReplyPipeline
? async () => {
await blockReplyPipeline.flush({ force: true });
}
: undefined,
shouldEmitToolResult: params.shouldEmitToolResult,
shouldEmitToolOutput: params.shouldEmitToolOutput,
bootstrapPromptWarningSignaturesSeen,
bootstrapPromptWarningSignature:
bootstrapPromptWarningSignaturesSeen[
bootstrapPromptWarningSignaturesSeen.length - 1
],
onToolResult: onToolResult
? (() => {
// Serialize tool result delivery to preserve message ordering.
// Without this, concurrent tool callbacks race through typing signals
// and message sends, causing out-of-order delivery to the user.
// See: https://github.com/openclaw/openclaw/issues/11044
let toolResultChain: Promise<void> = Promise.resolve();
return (payload: ReplyPayload) => {
toolResultChain = toolResultChain
.then(async () => {
const { text, skip } = normalizeStreamingText(payload);
if (skip) {
return;
}
await params.typingSignals.signalTextDelta(text);
await onToolResult({
...payload,
text,
});
})
.catch((err) => {
// Keep chain healthy after an error so later tool results still deliver.
logVerbose(`tool result delivery failed: ${String(err)}`);
});
const task = toolResultChain.finally(() => {
params.pendingToolTasks.delete(task);
});
params.pendingToolTasks.add(task);
};
})()
: undefined,
});
bootstrapPromptWarningSignaturesSeen = resolveBootstrapWarningSignaturesSeen(
result.meta?.systemPromptReport,
);
const resultCompactionCount = Math.max(
0,
result.meta?.agentMeta?.compactionCount ?? 0,
);
attemptCompactionCount = Math.max(attemptCompactionCount, resultCompactionCount);
return result;
} finally {
autoCompactionCount += attemptCompactionCount;
}
})();
},
});
@ -654,7 +665,7 @@ export async function runAgentTurnWithFallback(params: {
fallbackModel,
fallbackAttempts,
didLogHeartbeatStrip,
autoCompactionCompleted,
autoCompactionCount,
directlySentBlockKeys: directlySentBlockKeys.size > 0 ? directlySentBlockKeys : undefined,
};
}

View File

@ -322,7 +322,7 @@ describe("runReplyAgent auto-compaction token update", () => {
extraSystemPrompt?: string;
onAgentEvent?: (evt: {
stream?: string;
data?: { phase?: string; willRetry?: boolean };
data?: { phase?: string; willRetry?: boolean; completed?: boolean };
}) => void;
};
@ -397,7 +397,10 @@ describe("runReplyAgent auto-compaction token update", () => {
runEmbeddedPiAgentMock.mockImplementation(async (params: EmbeddedRunParams) => {
// Simulate auto-compaction during agent run
params.onAgentEvent?.({ stream: "compaction", data: { phase: "start" } });
params.onAgentEvent?.({ stream: "compaction", data: { phase: "end", willRetry: false } });
params.onAgentEvent?.({
stream: "compaction",
data: { phase: "end", willRetry: false, completed: true },
});
return {
payloads: [{ text: "done" }],
meta: {
@ -455,6 +458,238 @@ describe("runReplyAgent auto-compaction token update", () => {
expect(stored[sessionKey].compactionCount).toBe(1);
});
it("tracks auto-compaction from embedded result metadata even when no compaction event is emitted", async () => {
const tmp = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-compact-meta-"));
const storePath = path.join(tmp, "sessions.json");
const sessionKey = "main";
const sessionEntry = {
sessionId: "session",
updatedAt: Date.now(),
totalTokens: 181_000,
compactionCount: 0,
};
await seedSessionStore({ storePath, sessionKey, entry: sessionEntry });
runEmbeddedPiAgentMock.mockResolvedValue({
payloads: [{ text: "done" }],
meta: {
agentMeta: {
usage: { input: 190_000, output: 8_000, total: 198_000 },
lastCallUsage: { input: 10_000, output: 3_000, total: 13_000 },
compactionCount: 2,
},
},
});
const config = {
agents: { defaults: { compaction: { memoryFlush: { enabled: false } } } },
};
const { typing, sessionCtx, resolvedQueue, followupRun } = createBaseRun({
storePath,
sessionEntry,
config,
});
await runReplyAgent({
commandBody: "hello",
followupRun,
queueKey: "main",
resolvedQueue,
shouldSteer: false,
shouldFollowup: false,
isActive: false,
isStreaming: false,
typing,
sessionCtx,
sessionEntry,
sessionStore: { [sessionKey]: sessionEntry },
sessionKey,
storePath,
defaultModel: "anthropic/claude-opus-4-5",
agentCfgContextTokens: 200_000,
resolvedVerboseLevel: "off",
isNewSession: false,
blockStreamingEnabled: false,
resolvedBlockStreamingBreak: "message_end",
shouldInjectGroupIntro: false,
typingMode: "instant",
});
const stored = JSON.parse(await fs.readFile(storePath, "utf-8"));
expect(stored[sessionKey].totalTokens).toBe(10_000);
expect(stored[sessionKey].compactionCount).toBe(2);
});
it("accumulates compactions across fallback attempts without double-counting a single attempt", async () => {
const tmp = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-compact-fallback-"));
const storePath = path.join(tmp, "sessions.json");
const sessionKey = "main";
const sessionEntry = {
sessionId: "session",
updatedAt: Date.now(),
totalTokens: 181_000,
compactionCount: 0,
};
await seedSessionStore({ storePath, sessionKey, entry: sessionEntry });
runWithModelFallbackMock.mockImplementationOnce(async ({ run }: RunWithModelFallbackParams) => {
try {
await run("anthropic", "claude");
} catch {
// Expected first-attempt failure.
}
return {
result: await run("openai", "gpt-5.2"),
provider: "openai",
model: "gpt-5.2",
attempts: [{ provider: "anthropic", model: "claude", error: "attempt failed" }],
};
});
runEmbeddedPiAgentMock
.mockImplementationOnce(async (params: EmbeddedRunParams) => {
params.onAgentEvent?.({
stream: "compaction",
data: { phase: "end", willRetry: true, completed: true },
});
throw new Error("attempt failed");
})
.mockResolvedValueOnce({
payloads: [{ text: "done" }],
meta: {
agentMeta: {
usage: { input: 190_000, output: 8_000, total: 198_000 },
lastCallUsage: { input: 10_000, output: 3_000, total: 13_000 },
compactionCount: 2,
},
},
});
const config = {
agents: { defaults: { compaction: { memoryFlush: { enabled: false } } } },
};
const { typing, sessionCtx, resolvedQueue, followupRun } = createBaseRun({
storePath,
sessionEntry,
config,
});
await runReplyAgent({
commandBody: "hello",
followupRun,
queueKey: "main",
resolvedQueue,
shouldSteer: false,
shouldFollowup: false,
isActive: false,
isStreaming: false,
typing,
sessionCtx,
sessionEntry,
sessionStore: { [sessionKey]: sessionEntry },
sessionKey,
storePath,
defaultModel: "anthropic/claude-opus-4-5",
agentCfgContextTokens: 200_000,
resolvedVerboseLevel: "off",
isNewSession: false,
blockStreamingEnabled: false,
resolvedBlockStreamingBreak: "message_end",
shouldInjectGroupIntro: false,
typingMode: "instant",
});
const stored = JSON.parse(await fs.readFile(storePath, "utf-8"));
expect(stored[sessionKey].totalTokens).toBe(10_000);
expect(stored[sessionKey].compactionCount).toBe(3);
});
it("does not count failed compaction end events from earlier fallback attempts", async () => {
const tmp = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-compact-fallback-failed-"));
const storePath = path.join(tmp, "sessions.json");
const sessionKey = "main";
const sessionEntry = {
sessionId: "session",
updatedAt: Date.now(),
totalTokens: 181_000,
compactionCount: 0,
};
await seedSessionStore({ storePath, sessionKey, entry: sessionEntry });
runWithModelFallbackMock.mockImplementationOnce(async ({ run }: RunWithModelFallbackParams) => {
try {
await run("anthropic", "claude");
} catch {
// Expected first-attempt failure.
}
return {
result: await run("openai", "gpt-5.2"),
provider: "openai",
model: "gpt-5.2",
attempts: [{ provider: "anthropic", model: "claude", error: "attempt failed" }],
};
});
runEmbeddedPiAgentMock
.mockImplementationOnce(async (params: EmbeddedRunParams) => {
params.onAgentEvent?.({
stream: "compaction",
data: { phase: "end", willRetry: true, completed: false },
});
throw new Error("attempt failed");
})
.mockResolvedValueOnce({
payloads: [{ text: "done" }],
meta: {
agentMeta: {
usage: { input: 190_000, output: 8_000, total: 198_000 },
lastCallUsage: { input: 10_000, output: 3_000, total: 13_000 },
compactionCount: 2,
},
},
});
const config = {
agents: { defaults: { compaction: { memoryFlush: { enabled: false } } } },
};
const { typing, sessionCtx, resolvedQueue, followupRun } = createBaseRun({
storePath,
sessionEntry,
config,
});
await runReplyAgent({
commandBody: "hello",
followupRun,
queueKey: "main",
resolvedQueue,
shouldSteer: false,
shouldFollowup: false,
isActive: false,
isStreaming: false,
typing,
sessionCtx,
sessionEntry,
sessionStore: { [sessionKey]: sessionEntry },
sessionKey,
storePath,
defaultModel: "anthropic/claude-opus-4-5",
agentCfgContextTokens: 200_000,
resolvedVerboseLevel: "off",
isNewSession: false,
blockStreamingEnabled: false,
resolvedBlockStreamingBreak: "message_end",
shouldInjectGroupIntro: false,
typingMode: "instant",
});
const stored = JSON.parse(await fs.readFile(storePath, "utf-8"));
expect(stored[sessionKey].totalTokens).toBe(10_000);
expect(stored[sessionKey].compactionCount).toBe(2);
});
it("updates totalTokens from lastCallUsage even without compaction", async () => {
const tmp = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-usage-last-"));
const storePath = path.join(tmp, "sessions.json");
@ -537,7 +772,10 @@ describe("runReplyAgent auto-compaction token update", () => {
runEmbeddedPiAgentMock.mockImplementation(async (params: EmbeddedRunParams) => {
params.onAgentEvent?.({ stream: "compaction", data: { phase: "start" } });
params.onAgentEvent?.({ stream: "compaction", data: { phase: "end", willRetry: false } });
params.onAgentEvent?.({
stream: "compaction",
data: { phase: "end", willRetry: false, completed: true },
});
return {
payloads: [{ text: "done" }],
meta: {

View File

@ -380,7 +380,7 @@ export async function runReplyAgent(params: {
fallbackAttempts,
directlySentBlockKeys,
} = runOutcome;
let { didLogHeartbeatStrip, autoCompactionCompleted } = runOutcome;
let { didLogHeartbeatStrip, autoCompactionCount } = runOutcome;
if (
shouldInjectGroupIntro &&
@ -664,12 +664,13 @@ export async function runReplyAgent(params: {
}
}
if (autoCompactionCompleted) {
if (autoCompactionCount > 0) {
const count = await incrementRunCompactionCount({
sessionEntry: activeSessionEntry,
sessionStore: activeSessionStore,
sessionKey,
storePath,
amount: autoCompactionCount,
lastCallUsage: runResult.meta?.agentMeta?.lastCallUsage,
contextTokensUsed,
});

View File

@ -71,7 +71,7 @@ function mockCompactionRun(params: {
}) => {
args.onAgentEvent?.({
stream: "compaction",
data: { phase: "end", willRetry: params.willRetry },
data: { phase: "end", willRetry: params.willRetry, completed: true },
});
return params.result;
},
@ -126,6 +126,110 @@ describe("createFollowupRunner compaction", () => {
expect(firstCall?.[0]?.text).toContain("Auto-compaction complete");
expect(sessionStore.main.compactionCount).toBe(1);
});
it("tracks auto-compaction from embedded result metadata even when no compaction event is emitted", async () => {
const storePath = path.join(
await fs.mkdtemp(path.join(tmpdir(), "openclaw-compaction-meta-")),
"sessions.json",
);
const sessionEntry: SessionEntry = {
sessionId: "session",
updatedAt: Date.now(),
};
const sessionStore: Record<string, SessionEntry> = {
main: sessionEntry,
};
const onBlockReply = vi.fn(async () => {});
runEmbeddedPiAgentMock.mockResolvedValueOnce({
payloads: [{ text: "final" }],
meta: {
agentMeta: {
compactionCount: 2,
lastCallUsage: { input: 10_000, output: 3_000, total: 13_000 },
},
},
});
const runner = createFollowupRunner({
opts: { onBlockReply },
typing: createMockTypingController(),
typingMode: "instant",
sessionEntry,
sessionStore,
sessionKey: "main",
storePath,
defaultModel: "anthropic/claude-opus-4-5",
});
const queued = createQueuedRun({
run: {
verboseLevel: "on",
},
});
await runner(queued);
expect(onBlockReply).toHaveBeenCalled();
const firstCall = (onBlockReply.mock.calls as unknown as Array<Array<{ text?: string }>>)[0];
expect(firstCall?.[0]?.text).toContain("Auto-compaction complete");
expect(sessionStore.main.compactionCount).toBe(2);
});
it("does not count failed compaction end events in followup runs", async () => {
const storePath = path.join(
await fs.mkdtemp(path.join(tmpdir(), "openclaw-compaction-failed-")),
"sessions.json",
);
const sessionEntry: SessionEntry = {
sessionId: "session",
updatedAt: Date.now(),
};
const sessionStore: Record<string, SessionEntry> = {
main: sessionEntry,
};
const onBlockReply = vi.fn(async () => {});
const runner = createFollowupRunner({
opts: { onBlockReply },
typing: createMockTypingController(),
typingMode: "instant",
sessionEntry,
sessionStore,
sessionKey: "main",
storePath,
defaultModel: "anthropic/claude-opus-4-5",
});
const queued = createQueuedRun({
run: {
verboseLevel: "on",
},
});
runEmbeddedPiAgentMock.mockImplementationOnce(async (args) => {
args.onAgentEvent?.({
stream: "compaction",
data: { phase: "end", willRetry: false, completed: false },
});
return {
payloads: [{ text: "final" }],
meta: {
agentMeta: {
compactionCount: 0,
lastCallUsage: { input: 10_000, output: 3_000, total: 13_000 },
},
},
};
});
await runner(queued);
expect(onBlockReply).toHaveBeenCalledTimes(1);
const firstCall = (onBlockReply.mock.calls as unknown as Array<Array<{ text?: string }>>)[0];
expect(firstCall?.[0]?.text).toBe("final");
expect(sessionStore.main.compactionCount).toBeUndefined();
});
});
describe("createFollowupRunner bootstrap warning dedupe", () => {

View File

@ -145,7 +145,7 @@ export function createFollowupRunner(params: {
isControlUiVisible: shouldSurfaceToControlUi,
});
}
let autoCompactionCompleted = false;
let autoCompactionCount = 0;
let runResult: Awaited<ReturnType<typeof runEmbeddedPiAgent>>;
let fallbackProvider = queued.run.provider;
let fallbackModel = queued.run.model;
@ -168,68 +168,81 @@ export function createFollowupRunner(params: {
}),
run: async (provider, model, runOptions) => {
const authProfile = resolveRunAuthProfile(queued.run, provider);
const result = await runEmbeddedPiAgent({
sessionId: queued.run.sessionId,
sessionKey: queued.run.sessionKey,
agentId: queued.run.agentId,
trigger: "user",
messageChannel: queued.originatingChannel ?? undefined,
messageProvider: queued.run.messageProvider,
agentAccountId: queued.run.agentAccountId,
messageTo: queued.originatingTo,
messageThreadId: queued.originatingThreadId,
currentChannelId: queued.originatingTo,
currentThreadTs:
queued.originatingThreadId != null ? String(queued.originatingThreadId) : undefined,
groupId: queued.run.groupId,
groupChannel: queued.run.groupChannel,
groupSpace: queued.run.groupSpace,
senderId: queued.run.senderId,
senderName: queued.run.senderName,
senderUsername: queued.run.senderUsername,
senderE164: queued.run.senderE164,
senderIsOwner: queued.run.senderIsOwner,
sessionFile: queued.run.sessionFile,
agentDir: queued.run.agentDir,
workspaceDir: queued.run.workspaceDir,
config: queued.run.config,
skillsSnapshot: queued.run.skillsSnapshot,
prompt: queued.prompt,
extraSystemPrompt: queued.run.extraSystemPrompt,
ownerNumbers: queued.run.ownerNumbers,
enforceFinalTag: queued.run.enforceFinalTag,
provider,
model,
...authProfile,
thinkLevel: queued.run.thinkLevel,
verboseLevel: queued.run.verboseLevel,
reasoningLevel: queued.run.reasoningLevel,
suppressToolErrorWarnings: opts?.suppressToolErrorWarnings,
execOverrides: queued.run.execOverrides,
bashElevated: queued.run.bashElevated,
timeoutMs: queued.run.timeoutMs,
runId,
allowTransientCooldownProbe: runOptions?.allowTransientCooldownProbe,
blockReplyBreak: queued.run.blockReplyBreak,
bootstrapPromptWarningSignaturesSeen,
bootstrapPromptWarningSignature:
bootstrapPromptWarningSignaturesSeen[
bootstrapPromptWarningSignaturesSeen.length - 1
],
onAgentEvent: (evt) => {
if (evt.stream !== "compaction") {
return;
}
const phase = typeof evt.data.phase === "string" ? evt.data.phase : "";
if (phase === "end") {
autoCompactionCompleted = true;
}
},
});
bootstrapPromptWarningSignaturesSeen = resolveBootstrapWarningSignaturesSeen(
result.meta?.systemPromptReport,
);
return result;
let attemptCompactionCount = 0;
try {
const result = await runEmbeddedPiAgent({
sessionId: queued.run.sessionId,
sessionKey: queued.run.sessionKey,
agentId: queued.run.agentId,
trigger: "user",
messageChannel: queued.originatingChannel ?? undefined,
messageProvider: queued.run.messageProvider,
agentAccountId: queued.run.agentAccountId,
messageTo: queued.originatingTo,
messageThreadId: queued.originatingThreadId,
currentChannelId: queued.originatingTo,
currentThreadTs:
queued.originatingThreadId != null
? String(queued.originatingThreadId)
: undefined,
groupId: queued.run.groupId,
groupChannel: queued.run.groupChannel,
groupSpace: queued.run.groupSpace,
senderId: queued.run.senderId,
senderName: queued.run.senderName,
senderUsername: queued.run.senderUsername,
senderE164: queued.run.senderE164,
senderIsOwner: queued.run.senderIsOwner,
sessionFile: queued.run.sessionFile,
agentDir: queued.run.agentDir,
workspaceDir: queued.run.workspaceDir,
config: queued.run.config,
skillsSnapshot: queued.run.skillsSnapshot,
prompt: queued.prompt,
extraSystemPrompt: queued.run.extraSystemPrompt,
ownerNumbers: queued.run.ownerNumbers,
enforceFinalTag: queued.run.enforceFinalTag,
provider,
model,
...authProfile,
thinkLevel: queued.run.thinkLevel,
verboseLevel: queued.run.verboseLevel,
reasoningLevel: queued.run.reasoningLevel,
suppressToolErrorWarnings: opts?.suppressToolErrorWarnings,
execOverrides: queued.run.execOverrides,
bashElevated: queued.run.bashElevated,
timeoutMs: queued.run.timeoutMs,
runId,
allowTransientCooldownProbe: runOptions?.allowTransientCooldownProbe,
blockReplyBreak: queued.run.blockReplyBreak,
bootstrapPromptWarningSignaturesSeen,
bootstrapPromptWarningSignature:
bootstrapPromptWarningSignaturesSeen[
bootstrapPromptWarningSignaturesSeen.length - 1
],
onAgentEvent: (evt) => {
if (evt.stream !== "compaction") {
return;
}
const phase = typeof evt.data.phase === "string" ? evt.data.phase : "";
const completed = evt.data?.completed === true;
if (phase === "end" && completed) {
attemptCompactionCount += 1;
}
},
});
bootstrapPromptWarningSignaturesSeen = resolveBootstrapWarningSignaturesSeen(
result.meta?.systemPromptReport,
);
const resultCompactionCount = Math.max(
0,
result.meta?.agentMeta?.compactionCount ?? 0,
);
attemptCompactionCount = Math.max(attemptCompactionCount, resultCompactionCount);
return result;
} finally {
autoCompactionCount += attemptCompactionCount;
}
},
});
runResult = fallbackResult.result;
@ -326,12 +339,13 @@ export function createFollowupRunner(params: {
return;
}
if (autoCompactionCompleted) {
if (autoCompactionCount > 0) {
const count = await incrementRunCompactionCount({
sessionEntry,
sessionStore,
sessionKey,
storePath,
amount: autoCompactionCount,
lastCallUsage: runResult.meta?.agentMeta?.lastCallUsage,
contextTokensUsed,
});

View File

@ -445,6 +445,23 @@ describe("incrementCompactionCount", () => {
expect(stored[sessionKey].outputTokens).toBeUndefined();
});
it("increments compaction count by an explicit amount", async () => {
const entry = { sessionId: "s1", updatedAt: Date.now(), compactionCount: 2 } as SessionEntry;
const { storePath, sessionKey, sessionStore } = await createCompactionSessionFixture(entry);
const count = await incrementCompactionCount({
sessionEntry: entry,
sessionStore,
sessionKey,
storePath,
amount: 2,
});
expect(count).toBe(4);
const stored = JSON.parse(await fs.readFile(storePath, "utf-8"));
expect(stored[sessionKey].compactionCount).toBe(4);
});
it("does not update totalTokens when tokensAfter is not provided", async () => {
const entry = {
sessionId: "s1",

View File

@ -8,6 +8,7 @@ type IncrementRunCompactionCountParams = Omit<
Parameters<typeof incrementCompactionCount>[0],
"tokensAfter"
> & {
amount?: number;
lastCallUsage?: NormalizedUsage;
contextTokensUsed?: number;
};
@ -30,6 +31,7 @@ export async function incrementRunCompactionCount(
sessionStore: params.sessionStore,
sessionKey: params.sessionKey,
storePath: params.storePath,
amount: params.amount,
tokensAfter: tokensAfterCompaction,
});
}

View File

@ -255,6 +255,7 @@ export async function incrementCompactionCount(params: {
sessionKey?: string;
storePath?: string;
now?: number;
amount?: number;
/** Token count after compaction - if provided, updates session token counts */
tokensAfter?: number;
}): Promise<number | undefined> {
@ -264,6 +265,7 @@ export async function incrementCompactionCount(params: {
sessionKey,
storePath,
now = Date.now(),
amount = 1,
tokensAfter,
} = params;
if (!sessionStore || !sessionKey) {
@ -273,7 +275,8 @@ export async function incrementCompactionCount(params: {
if (!entry) {
return undefined;
}
const nextCount = (entry.compactionCount ?? 0) + 1;
const incrementBy = Math.max(0, amount);
const nextCount = (entry.compactionCount ?? 0) + incrementBy;
// Build update payload with compaction count and optionally updated token counts
const updates: Partial<SessionEntry> = {
compactionCount: nextCount,

View File

@ -138,7 +138,7 @@ describe("compaction hook wiring", () => {
expect(emitAgentEvent).toHaveBeenCalledWith({
runId: "r2",
stream: "compaction",
data: { phase: "end", willRetry: false },
data: { phase: "end", willRetry: false, completed: true },
});
});
@ -169,7 +169,7 @@ describe("compaction hook wiring", () => {
expect(emitAgentEvent).toHaveBeenCalledWith({
runId: "r3",
stream: "compaction",
data: { phase: "end", willRetry: true },
data: { phase: "end", willRetry: true, completed: true },
});
});