diff --git a/CHANGELOG.md b/CHANGELOG.md index abe57c8108b..b2a2c7138ac 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -99,6 +99,7 @@ Docs: https://docs.openclaw.ai - Mattermost/thread routing: non-inbound reply paths (TUI/WebUI turns, tool-call callbacks, subagent responses) now correctly route to the originating Mattermost thread when `replyToMode: "all"` is active; also prevents stale `origin.threadId` metadata from resurrecting cleared thread routes. (#44283) thanks @teconomix - Gateway/websocket pairing bypass for disabled auth: skip device-pairing enforcement when `gateway.auth.mode=none` so Control UI connections behind reverse proxies no longer get stuck on `pairing required` (code 1008) despite auth being explicitly disabled. (#42931) - Auth/login lockout recovery: clear stale `auth_permanent` and `billing` disabled state for all profiles matching the target provider when `openclaw models auth login` is invoked, so users locked out by expired or revoked OAuth tokens can recover by re-authenticating instead of waiting for the cooldown timer to expire. (#43057) +- Auto-reply/context-engine compaction: persist the exact compaction count reported by embedded-run metadata in main and followup runner session accounting, so runs that auto-compact more than once, or whose compactions are reported only through metadata, are no longer undercounted. (#42629) thanks @uf-hy
## 2026.3.12 diff --git a/src/agents/pi-embedded-subscribe.handlers.compaction.ts b/src/agents/pi-embedded-subscribe.handlers.compaction.ts index 705ffb7cf89..7b9c4499eff 100644 --- a/src/agents/pi-embedded-subscribe.handlers.compaction.ts +++ b/src/agents/pi-embedded-subscribe.handlers.compaction.ts @@ -64,11 +64,11 @@ export function handleAutoCompactionEnd( emitAgentEvent({ runId: ctx.params.runId, stream: "compaction", - data: { phase: "end", willRetry }, + data: { phase: "end", willRetry, completed: hasResult && !wasAborted }, }); void ctx.params.onAgentEvent?.({ stream: "compaction", - data: { phase: "end", willRetry }, + data: { phase: "end", willRetry, completed: hasResult && !wasAborted }, }); // Run after_compaction plugin hook (fire-and-forget) diff --git a/src/auto-reply/reply/agent-runner-execution.ts b/src/auto-reply/reply/agent-runner-execution.ts index 27a31c2387a..9ebc239f7ff 100644 --- a/src/auto-reply/reply/agent-runner-execution.ts +++ b/src/auto-reply/reply/agent-runner-execution.ts @@ -67,7 +67,7 @@ export type AgentRunLoopResult = fallbackModel?: string; fallbackAttempts: RuntimeFallbackAttempt[]; didLogHeartbeatStrip: boolean; - autoCompactionCompleted: boolean; + autoCompactionCount: number; /** Payload keys sent directly (not via pipeline) during tool flush. */ directlySentBlockKeys?: Set; } @@ -103,7 +103,7 @@ export async function runAgentTurnWithFallback(params: { }): Promise { const TRANSIENT_HTTP_RETRY_DELAY_MS = 2_500; let didLogHeartbeatStrip = false; - let autoCompactionCompleted = false; + let autoCompactionCount = 0; // Track payloads sent directly (not via pipeline) during tool flush to avoid duplicates. const directlySentBlockKeys = new Set(); @@ -319,154 +319,165 @@ export async function runAgentTurnWithFallback(params: { }, ); return (async () => { - const result = await runEmbeddedPiAgent({ - ...embeddedContext, - trigger: params.isHeartbeat ? 
"heartbeat" : "user", - groupId: resolveGroupSessionKey(params.sessionCtx)?.id, - groupChannel: - params.sessionCtx.GroupChannel?.trim() ?? params.sessionCtx.GroupSubject?.trim(), - groupSpace: params.sessionCtx.GroupSpace?.trim() ?? undefined, - ...senderContext, - ...runBaseParams, - prompt: params.commandBody, - extraSystemPrompt: params.followupRun.run.extraSystemPrompt, - toolResultFormat: (() => { - const channel = resolveMessageChannel( - params.sessionCtx.Surface, - params.sessionCtx.Provider, - ); - if (!channel) { - return "markdown"; - } - return isMarkdownCapableMessageChannel(channel) ? "markdown" : "plain"; - })(), - suppressToolErrorWarnings: params.opts?.suppressToolErrorWarnings, - bootstrapContextMode: params.opts?.bootstrapContextMode, - bootstrapContextRunKind: params.opts?.isHeartbeat ? "heartbeat" : "default", - images: params.opts?.images, - abortSignal: params.opts?.abortSignal, - blockReplyBreak: params.resolvedBlockStreamingBreak, - blockReplyChunking: params.blockReplyChunking, - onPartialReply: async (payload) => { - const textForTyping = await handlePartialForTyping(payload); - if (!params.opts?.onPartialReply || textForTyping === undefined) { - return; - } - await params.opts.onPartialReply({ - text: textForTyping, - mediaUrls: payload.mediaUrls, - }); - }, - onAssistantMessageStart: async () => { - await params.typingSignals.signalMessageStart(); - await params.opts?.onAssistantMessageStart?.(); - }, - onReasoningStream: - params.typingSignals.shouldStartOnReasoning || params.opts?.onReasoningStream - ? async (payload) => { - await params.typingSignals.signalReasoningDelta(); - await params.opts?.onReasoningStream?.({ - text: payload.text, - mediaUrls: payload.mediaUrls, - }); - } - : undefined, - onReasoningEnd: params.opts?.onReasoningEnd, - onAgentEvent: async (evt) => { - // Signal run start only after the embedded agent emits real activity. 
- const hasLifecyclePhase = - evt.stream === "lifecycle" && typeof evt.data.phase === "string"; - if (evt.stream !== "lifecycle" || hasLifecyclePhase) { - notifyAgentRunStart(); - } - // Trigger typing when tools start executing. - // Must await to ensure typing indicator starts before tool summaries are emitted. - if (evt.stream === "tool") { - const phase = typeof evt.data.phase === "string" ? evt.data.phase : ""; - const name = typeof evt.data.name === "string" ? evt.data.name : undefined; - if (phase === "start" || phase === "update") { - await params.typingSignals.signalToolStart(); - await params.opts?.onToolStart?.({ name, phase }); + let attemptCompactionCount = 0; + try { + const result = await runEmbeddedPiAgent({ + ...embeddedContext, + trigger: params.isHeartbeat ? "heartbeat" : "user", + groupId: resolveGroupSessionKey(params.sessionCtx)?.id, + groupChannel: + params.sessionCtx.GroupChannel?.trim() ?? params.sessionCtx.GroupSubject?.trim(), + groupSpace: params.sessionCtx.GroupSpace?.trim() ?? undefined, + ...senderContext, + ...runBaseParams, + prompt: params.commandBody, + extraSystemPrompt: params.followupRun.run.extraSystemPrompt, + toolResultFormat: (() => { + const channel = resolveMessageChannel( + params.sessionCtx.Surface, + params.sessionCtx.Provider, + ); + if (!channel) { + return "markdown"; } - } - // Track auto-compaction completion and notify UI layer - if (evt.stream === "compaction") { - const phase = typeof evt.data.phase === "string" ? evt.data.phase : ""; - if (phase === "start") { - await params.opts?.onCompactionStart?.(); + return isMarkdownCapableMessageChannel(channel) ? "markdown" : "plain"; + })(), + suppressToolErrorWarnings: params.opts?.suppressToolErrorWarnings, + bootstrapContextMode: params.opts?.bootstrapContextMode, + bootstrapContextRunKind: params.opts?.isHeartbeat ? 
"heartbeat" : "default", + images: params.opts?.images, + abortSignal: params.opts?.abortSignal, + blockReplyBreak: params.resolvedBlockStreamingBreak, + blockReplyChunking: params.blockReplyChunking, + onPartialReply: async (payload) => { + const textForTyping = await handlePartialForTyping(payload); + if (!params.opts?.onPartialReply || textForTyping === undefined) { + return; } - if (phase === "end") { - autoCompactionCompleted = true; - await params.opts?.onCompactionEnd?.(); - } - } - }, - // Always pass onBlockReply so flushBlockReplyBuffer works before tool execution, - // even when regular block streaming is disabled. The handler sends directly - // via opts.onBlockReply when the pipeline isn't available. - onBlockReply: params.opts?.onBlockReply - ? createBlockReplyDeliveryHandler({ - onBlockReply: params.opts.onBlockReply, - currentMessageId: - params.sessionCtx.MessageSidFull ?? params.sessionCtx.MessageSid, - normalizeStreamingText, - applyReplyToMode: params.applyReplyToMode, - normalizeMediaPaths: normalizeReplyMediaPaths, - typingSignals: params.typingSignals, - blockStreamingEnabled: params.blockStreamingEnabled, - blockReplyPipeline, - directlySentBlockKeys, - }) - : undefined, - onBlockReplyFlush: - params.blockStreamingEnabled && blockReplyPipeline - ? async () => { - await blockReplyPipeline.flush({ force: true }); - } - : undefined, - shouldEmitToolResult: params.shouldEmitToolResult, - shouldEmitToolOutput: params.shouldEmitToolOutput, - bootstrapPromptWarningSignaturesSeen, - bootstrapPromptWarningSignature: - bootstrapPromptWarningSignaturesSeen[ - bootstrapPromptWarningSignaturesSeen.length - 1 - ], - onToolResult: onToolResult - ? (() => { - // Serialize tool result delivery to preserve message ordering. - // Without this, concurrent tool callbacks race through typing signals - // and message sends, causing out-of-order delivery to the user. 
- // See: https://github.com/openclaw/openclaw/issues/11044 - let toolResultChain: Promise = Promise.resolve(); - return (payload: ReplyPayload) => { - toolResultChain = toolResultChain - .then(async () => { - const { text, skip } = normalizeStreamingText(payload); - if (skip) { - return; - } - await params.typingSignals.signalTextDelta(text); - await onToolResult({ - ...payload, - text, - }); - }) - .catch((err) => { - // Keep chain healthy after an error so later tool results still deliver. - logVerbose(`tool result delivery failed: ${String(err)}`); + await params.opts.onPartialReply({ + text: textForTyping, + mediaUrls: payload.mediaUrls, + }); + }, + onAssistantMessageStart: async () => { + await params.typingSignals.signalMessageStart(); + await params.opts?.onAssistantMessageStart?.(); + }, + onReasoningStream: + params.typingSignals.shouldStartOnReasoning || params.opts?.onReasoningStream + ? async (payload) => { + await params.typingSignals.signalReasoningDelta(); + await params.opts?.onReasoningStream?.({ + text: payload.text, + mediaUrls: payload.mediaUrls, }); - const task = toolResultChain.finally(() => { - params.pendingToolTasks.delete(task); - }); - params.pendingToolTasks.add(task); - }; - })() - : undefined, - }); - bootstrapPromptWarningSignaturesSeen = resolveBootstrapWarningSignaturesSeen( - result.meta?.systemPromptReport, - ); - return result; + } + : undefined, + onReasoningEnd: params.opts?.onReasoningEnd, + onAgentEvent: async (evt) => { + // Signal run start only after the embedded agent emits real activity. + const hasLifecyclePhase = + evt.stream === "lifecycle" && typeof evt.data.phase === "string"; + if (evt.stream !== "lifecycle" || hasLifecyclePhase) { + notifyAgentRunStart(); + } + // Trigger typing when tools start executing. + // Must await to ensure typing indicator starts before tool summaries are emitted. + if (evt.stream === "tool") { + const phase = typeof evt.data.phase === "string" ? 
evt.data.phase : ""; + const name = typeof evt.data.name === "string" ? evt.data.name : undefined; + if (phase === "start" || phase === "update") { + await params.typingSignals.signalToolStart(); + await params.opts?.onToolStart?.({ name, phase }); + } + } + // Track auto-compaction completion and notify UI layer. + if (evt.stream === "compaction") { + const phase = typeof evt.data.phase === "string" ? evt.data.phase : ""; + if (phase === "start") { + await params.opts?.onCompactionStart?.(); + } + const completed = evt.data?.completed === true; + if (phase === "end" && completed) { + attemptCompactionCount += 1; + await params.opts?.onCompactionEnd?.(); + } + } + }, + // Always pass onBlockReply so flushBlockReplyBuffer works before tool execution, + // even when regular block streaming is disabled. The handler sends directly + // via opts.onBlockReply when the pipeline isn't available. + onBlockReply: params.opts?.onBlockReply + ? createBlockReplyDeliveryHandler({ + onBlockReply: params.opts.onBlockReply, + currentMessageId: + params.sessionCtx.MessageSidFull ?? params.sessionCtx.MessageSid, + normalizeStreamingText, + applyReplyToMode: params.applyReplyToMode, + normalizeMediaPaths: normalizeReplyMediaPaths, + typingSignals: params.typingSignals, + blockStreamingEnabled: params.blockStreamingEnabled, + blockReplyPipeline, + directlySentBlockKeys, + }) + : undefined, + onBlockReplyFlush: + params.blockStreamingEnabled && blockReplyPipeline + ? async () => { + await blockReplyPipeline.flush({ force: true }); + } + : undefined, + shouldEmitToolResult: params.shouldEmitToolResult, + shouldEmitToolOutput: params.shouldEmitToolOutput, + bootstrapPromptWarningSignaturesSeen, + bootstrapPromptWarningSignature: + bootstrapPromptWarningSignaturesSeen[ + bootstrapPromptWarningSignaturesSeen.length - 1 + ], + onToolResult: onToolResult + ? (() => { + // Serialize tool result delivery to preserve message ordering. 
+ // Without this, concurrent tool callbacks race through typing signals + // and message sends, causing out-of-order delivery to the user. + // See: https://github.com/openclaw/openclaw/issues/11044 + let toolResultChain: Promise = Promise.resolve(); + return (payload: ReplyPayload) => { + toolResultChain = toolResultChain + .then(async () => { + const { text, skip } = normalizeStreamingText(payload); + if (skip) { + return; + } + await params.typingSignals.signalTextDelta(text); + await onToolResult({ + ...payload, + text, + }); + }) + .catch((err) => { + // Keep chain healthy after an error so later tool results still deliver. + logVerbose(`tool result delivery failed: ${String(err)}`); + }); + const task = toolResultChain.finally(() => { + params.pendingToolTasks.delete(task); + }); + params.pendingToolTasks.add(task); + }; + })() + : undefined, + }); + bootstrapPromptWarningSignaturesSeen = resolveBootstrapWarningSignaturesSeen( + result.meta?.systemPromptReport, + ); + const resultCompactionCount = Math.max( + 0, + result.meta?.agentMeta?.compactionCount ?? 0, + ); + attemptCompactionCount = Math.max(attemptCompactionCount, resultCompactionCount); + return result; + } finally { + autoCompactionCount += attemptCompactionCount; + } })(); }, }); @@ -654,7 +665,7 @@ export async function runAgentTurnWithFallback(params: { fallbackModel, fallbackAttempts, didLogHeartbeatStrip, - autoCompactionCompleted, + autoCompactionCount, directlySentBlockKeys: directlySentBlockKeys.size > 0 ? 
directlySentBlockKeys : undefined, }; } diff --git a/src/auto-reply/reply/agent-runner.misc.runreplyagent.test.ts b/src/auto-reply/reply/agent-runner.misc.runreplyagent.test.ts index 14731dbb0ff..90535e69fb9 100644 --- a/src/auto-reply/reply/agent-runner.misc.runreplyagent.test.ts +++ b/src/auto-reply/reply/agent-runner.misc.runreplyagent.test.ts @@ -322,7 +322,7 @@ describe("runReplyAgent auto-compaction token update", () => { extraSystemPrompt?: string; onAgentEvent?: (evt: { stream?: string; - data?: { phase?: string; willRetry?: boolean }; + data?: { phase?: string; willRetry?: boolean; completed?: boolean }; }) => void; }; @@ -397,7 +397,10 @@ describe("runReplyAgent auto-compaction token update", () => { runEmbeddedPiAgentMock.mockImplementation(async (params: EmbeddedRunParams) => { // Simulate auto-compaction during agent run params.onAgentEvent?.({ stream: "compaction", data: { phase: "start" } }); - params.onAgentEvent?.({ stream: "compaction", data: { phase: "end", willRetry: false } }); + params.onAgentEvent?.({ + stream: "compaction", + data: { phase: "end", willRetry: false, completed: true }, + }); return { payloads: [{ text: "done" }], meta: { @@ -455,6 +458,238 @@ describe("runReplyAgent auto-compaction token update", () => { expect(stored[sessionKey].compactionCount).toBe(1); }); + it("tracks auto-compaction from embedded result metadata even when no compaction event is emitted", async () => { + const tmp = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-compact-meta-")); + const storePath = path.join(tmp, "sessions.json"); + const sessionKey = "main"; + const sessionEntry = { + sessionId: "session", + updatedAt: Date.now(), + totalTokens: 181_000, + compactionCount: 0, + }; + + await seedSessionStore({ storePath, sessionKey, entry: sessionEntry }); + + runEmbeddedPiAgentMock.mockResolvedValue({ + payloads: [{ text: "done" }], + meta: { + agentMeta: { + usage: { input: 190_000, output: 8_000, total: 198_000 }, + lastCallUsage: { input: 10_000, 
output: 3_000, total: 13_000 }, + compactionCount: 2, + }, + }, + }); + + const config = { + agents: { defaults: { compaction: { memoryFlush: { enabled: false } } } }, + }; + const { typing, sessionCtx, resolvedQueue, followupRun } = createBaseRun({ + storePath, + sessionEntry, + config, + }); + + await runReplyAgent({ + commandBody: "hello", + followupRun, + queueKey: "main", + resolvedQueue, + shouldSteer: false, + shouldFollowup: false, + isActive: false, + isStreaming: false, + typing, + sessionCtx, + sessionEntry, + sessionStore: { [sessionKey]: sessionEntry }, + sessionKey, + storePath, + defaultModel: "anthropic/claude-opus-4-5", + agentCfgContextTokens: 200_000, + resolvedVerboseLevel: "off", + isNewSession: false, + blockStreamingEnabled: false, + resolvedBlockStreamingBreak: "message_end", + shouldInjectGroupIntro: false, + typingMode: "instant", + }); + + const stored = JSON.parse(await fs.readFile(storePath, "utf-8")); + expect(stored[sessionKey].totalTokens).toBe(10_000); + expect(stored[sessionKey].compactionCount).toBe(2); + }); + + it("accumulates compactions across fallback attempts without double-counting a single attempt", async () => { + const tmp = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-compact-fallback-")); + const storePath = path.join(tmp, "sessions.json"); + const sessionKey = "main"; + const sessionEntry = { + sessionId: "session", + updatedAt: Date.now(), + totalTokens: 181_000, + compactionCount: 0, + }; + + await seedSessionStore({ storePath, sessionKey, entry: sessionEntry }); + + runWithModelFallbackMock.mockImplementationOnce(async ({ run }: RunWithModelFallbackParams) => { + try { + await run("anthropic", "claude"); + } catch { + // Expected first-attempt failure. 
+ } + return { + result: await run("openai", "gpt-5.2"), + provider: "openai", + model: "gpt-5.2", + attempts: [{ provider: "anthropic", model: "claude", error: "attempt failed" }], + }; + }); + + runEmbeddedPiAgentMock + .mockImplementationOnce(async (params: EmbeddedRunParams) => { + params.onAgentEvent?.({ + stream: "compaction", + data: { phase: "end", willRetry: true, completed: true }, + }); + throw new Error("attempt failed"); + }) + .mockResolvedValueOnce({ + payloads: [{ text: "done" }], + meta: { + agentMeta: { + usage: { input: 190_000, output: 8_000, total: 198_000 }, + lastCallUsage: { input: 10_000, output: 3_000, total: 13_000 }, + compactionCount: 2, + }, + }, + }); + + const config = { + agents: { defaults: { compaction: { memoryFlush: { enabled: false } } } }, + }; + const { typing, sessionCtx, resolvedQueue, followupRun } = createBaseRun({ + storePath, + sessionEntry, + config, + }); + + await runReplyAgent({ + commandBody: "hello", + followupRun, + queueKey: "main", + resolvedQueue, + shouldSteer: false, + shouldFollowup: false, + isActive: false, + isStreaming: false, + typing, + sessionCtx, + sessionEntry, + sessionStore: { [sessionKey]: sessionEntry }, + sessionKey, + storePath, + defaultModel: "anthropic/claude-opus-4-5", + agentCfgContextTokens: 200_000, + resolvedVerboseLevel: "off", + isNewSession: false, + blockStreamingEnabled: false, + resolvedBlockStreamingBreak: "message_end", + shouldInjectGroupIntro: false, + typingMode: "instant", + }); + + const stored = JSON.parse(await fs.readFile(storePath, "utf-8")); + expect(stored[sessionKey].totalTokens).toBe(10_000); + expect(stored[sessionKey].compactionCount).toBe(3); + }); + + it("does not count failed compaction end events from earlier fallback attempts", async () => { + const tmp = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-compact-fallback-failed-")); + const storePath = path.join(tmp, "sessions.json"); + const sessionKey = "main"; + const sessionEntry = { + sessionId: 
"session", + updatedAt: Date.now(), + totalTokens: 181_000, + compactionCount: 0, + }; + + await seedSessionStore({ storePath, sessionKey, entry: sessionEntry }); + + runWithModelFallbackMock.mockImplementationOnce(async ({ run }: RunWithModelFallbackParams) => { + try { + await run("anthropic", "claude"); + } catch { + // Expected first-attempt failure. + } + return { + result: await run("openai", "gpt-5.2"), + provider: "openai", + model: "gpt-5.2", + attempts: [{ provider: "anthropic", model: "claude", error: "attempt failed" }], + }; + }); + + runEmbeddedPiAgentMock + .mockImplementationOnce(async (params: EmbeddedRunParams) => { + params.onAgentEvent?.({ + stream: "compaction", + data: { phase: "end", willRetry: true, completed: false }, + }); + throw new Error("attempt failed"); + }) + .mockResolvedValueOnce({ + payloads: [{ text: "done" }], + meta: { + agentMeta: { + usage: { input: 190_000, output: 8_000, total: 198_000 }, + lastCallUsage: { input: 10_000, output: 3_000, total: 13_000 }, + compactionCount: 2, + }, + }, + }); + + const config = { + agents: { defaults: { compaction: { memoryFlush: { enabled: false } } } }, + }; + const { typing, sessionCtx, resolvedQueue, followupRun } = createBaseRun({ + storePath, + sessionEntry, + config, + }); + + await runReplyAgent({ + commandBody: "hello", + followupRun, + queueKey: "main", + resolvedQueue, + shouldSteer: false, + shouldFollowup: false, + isActive: false, + isStreaming: false, + typing, + sessionCtx, + sessionEntry, + sessionStore: { [sessionKey]: sessionEntry }, + sessionKey, + storePath, + defaultModel: "anthropic/claude-opus-4-5", + agentCfgContextTokens: 200_000, + resolvedVerboseLevel: "off", + isNewSession: false, + blockStreamingEnabled: false, + resolvedBlockStreamingBreak: "message_end", + shouldInjectGroupIntro: false, + typingMode: "instant", + }); + + const stored = JSON.parse(await fs.readFile(storePath, "utf-8")); + expect(stored[sessionKey].totalTokens).toBe(10_000); + 
expect(stored[sessionKey].compactionCount).toBe(2); + }); it("updates totalTokens from lastCallUsage even without compaction", async () => { const tmp = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-usage-last-")); const storePath = path.join(tmp, "sessions.json"); @@ -537,7 +772,10 @@ describe("runReplyAgent auto-compaction token update", () => { runEmbeddedPiAgentMock.mockImplementation(async (params: EmbeddedRunParams) => { params.onAgentEvent?.({ stream: "compaction", data: { phase: "start" } }); - params.onAgentEvent?.({ stream: "compaction", data: { phase: "end", willRetry: false } }); + params.onAgentEvent?.({ + stream: "compaction", + data: { phase: "end", willRetry: false, completed: true }, + }); return { payloads: [{ text: "done" }], meta: { diff --git a/src/auto-reply/reply/agent-runner.ts b/src/auto-reply/reply/agent-runner.ts index edc441a2552..76d86c45b05 100644 --- a/src/auto-reply/reply/agent-runner.ts +++ b/src/auto-reply/reply/agent-runner.ts @@ -380,7 +380,7 @@ export async function runReplyAgent(params: { fallbackAttempts, directlySentBlockKeys, } = runOutcome; - let { didLogHeartbeatStrip, autoCompactionCompleted } = runOutcome; + let { didLogHeartbeatStrip, autoCompactionCount } = runOutcome; if ( shouldInjectGroupIntro && @@ -664,12 +664,13 @@ export async function runReplyAgent(params: { } } - if (autoCompactionCompleted) { + if (autoCompactionCount > 0) { const count = await incrementRunCompactionCount({ sessionEntry: activeSessionEntry, sessionStore: activeSessionStore, sessionKey, storePath, + amount: autoCompactionCount, lastCallUsage: runResult.meta?.agentMeta?.lastCallUsage, contextTokensUsed, }); diff --git a/src/auto-reply/reply/followup-runner.test.ts b/src/auto-reply/reply/followup-runner.test.ts index 8d12e815685..c8e33397a2a 100644 --- a/src/auto-reply/reply/followup-runner.test.ts +++ b/src/auto-reply/reply/followup-runner.test.ts @@ -71,7 +71,7 @@ function mockCompactionRun(params: { }) => { args.onAgentEvent?.({ stream: 
"compaction", - data: { phase: "end", willRetry: params.willRetry }, + data: { phase: "end", willRetry: params.willRetry, completed: true }, }); return params.result; }, @@ -126,6 +126,110 @@ describe("createFollowupRunner compaction", () => { expect(firstCall?.[0]?.text).toContain("Auto-compaction complete"); expect(sessionStore.main.compactionCount).toBe(1); }); + + it("tracks auto-compaction from embedded result metadata even when no compaction event is emitted", async () => { + const storePath = path.join( + await fs.mkdtemp(path.join(tmpdir(), "openclaw-compaction-meta-")), + "sessions.json", + ); + const sessionEntry: SessionEntry = { + sessionId: "session", + updatedAt: Date.now(), + }; + const sessionStore: Record = { + main: sessionEntry, + }; + const onBlockReply = vi.fn(async () => {}); + + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "final" }], + meta: { + agentMeta: { + compactionCount: 2, + lastCallUsage: { input: 10_000, output: 3_000, total: 13_000 }, + }, + }, + }); + + const runner = createFollowupRunner({ + opts: { onBlockReply }, + typing: createMockTypingController(), + typingMode: "instant", + sessionEntry, + sessionStore, + sessionKey: "main", + storePath, + defaultModel: "anthropic/claude-opus-4-5", + }); + + const queued = createQueuedRun({ + run: { + verboseLevel: "on", + }, + }); + + await runner(queued); + + expect(onBlockReply).toHaveBeenCalled(); + const firstCall = (onBlockReply.mock.calls as unknown as Array>)[0]; + expect(firstCall?.[0]?.text).toContain("Auto-compaction complete"); + expect(sessionStore.main.compactionCount).toBe(2); + }); + + it("does not count failed compaction end events in followup runs", async () => { + const storePath = path.join( + await fs.mkdtemp(path.join(tmpdir(), "openclaw-compaction-failed-")), + "sessions.json", + ); + const sessionEntry: SessionEntry = { + sessionId: "session", + updatedAt: Date.now(), + }; + const sessionStore: Record = { + main: sessionEntry, + }; + const 
onBlockReply = vi.fn(async () => {}); + + const runner = createFollowupRunner({ + opts: { onBlockReply }, + typing: createMockTypingController(), + typingMode: "instant", + sessionEntry, + sessionStore, + sessionKey: "main", + storePath, + defaultModel: "anthropic/claude-opus-4-5", + }); + + const queued = createQueuedRun({ + run: { + verboseLevel: "on", + }, + }); + + runEmbeddedPiAgentMock.mockImplementationOnce(async (args) => { + args.onAgentEvent?.({ + stream: "compaction", + data: { phase: "end", willRetry: false, completed: false }, + }); + return { + payloads: [{ text: "final" }], + meta: { + agentMeta: { + compactionCount: 0, + lastCallUsage: { input: 10_000, output: 3_000, total: 13_000 }, + }, + }, + }; + }); + + await runner(queued); + + expect(onBlockReply).toHaveBeenCalledTimes(1); + const firstCall = (onBlockReply.mock.calls as unknown as Array>)[0]; + expect(firstCall?.[0]?.text).toBe("final"); + expect(sessionStore.main.compactionCount).toBeUndefined(); + }); }); describe("createFollowupRunner bootstrap warning dedupe", () => { diff --git a/src/auto-reply/reply/followup-runner.ts b/src/auto-reply/reply/followup-runner.ts index 8c7eccb5f02..fe90d56433c 100644 --- a/src/auto-reply/reply/followup-runner.ts +++ b/src/auto-reply/reply/followup-runner.ts @@ -145,7 +145,7 @@ export function createFollowupRunner(params: { isControlUiVisible: shouldSurfaceToControlUi, }); } - let autoCompactionCompleted = false; + let autoCompactionCount = 0; let runResult: Awaited>; let fallbackProvider = queued.run.provider; let fallbackModel = queued.run.model; @@ -168,68 +168,81 @@ export function createFollowupRunner(params: { }), run: async (provider, model, runOptions) => { const authProfile = resolveRunAuthProfile(queued.run, provider); - const result = await runEmbeddedPiAgent({ - sessionId: queued.run.sessionId, - sessionKey: queued.run.sessionKey, - agentId: queued.run.agentId, - trigger: "user", - messageChannel: queued.originatingChannel ?? 
undefined, - messageProvider: queued.run.messageProvider, - agentAccountId: queued.run.agentAccountId, - messageTo: queued.originatingTo, - messageThreadId: queued.originatingThreadId, - currentChannelId: queued.originatingTo, - currentThreadTs: - queued.originatingThreadId != null ? String(queued.originatingThreadId) : undefined, - groupId: queued.run.groupId, - groupChannel: queued.run.groupChannel, - groupSpace: queued.run.groupSpace, - senderId: queued.run.senderId, - senderName: queued.run.senderName, - senderUsername: queued.run.senderUsername, - senderE164: queued.run.senderE164, - senderIsOwner: queued.run.senderIsOwner, - sessionFile: queued.run.sessionFile, - agentDir: queued.run.agentDir, - workspaceDir: queued.run.workspaceDir, - config: queued.run.config, - skillsSnapshot: queued.run.skillsSnapshot, - prompt: queued.prompt, - extraSystemPrompt: queued.run.extraSystemPrompt, - ownerNumbers: queued.run.ownerNumbers, - enforceFinalTag: queued.run.enforceFinalTag, - provider, - model, - ...authProfile, - thinkLevel: queued.run.thinkLevel, - verboseLevel: queued.run.verboseLevel, - reasoningLevel: queued.run.reasoningLevel, - suppressToolErrorWarnings: opts?.suppressToolErrorWarnings, - execOverrides: queued.run.execOverrides, - bashElevated: queued.run.bashElevated, - timeoutMs: queued.run.timeoutMs, - runId, - allowTransientCooldownProbe: runOptions?.allowTransientCooldownProbe, - blockReplyBreak: queued.run.blockReplyBreak, - bootstrapPromptWarningSignaturesSeen, - bootstrapPromptWarningSignature: - bootstrapPromptWarningSignaturesSeen[ - bootstrapPromptWarningSignaturesSeen.length - 1 - ], - onAgentEvent: (evt) => { - if (evt.stream !== "compaction") { - return; - } - const phase = typeof evt.data.phase === "string" ? 
evt.data.phase : ""; - if (phase === "end") { - autoCompactionCompleted = true; - } - }, - }); - bootstrapPromptWarningSignaturesSeen = resolveBootstrapWarningSignaturesSeen( - result.meta?.systemPromptReport, - ); - return result; + let attemptCompactionCount = 0; + try { + const result = await runEmbeddedPiAgent({ + sessionId: queued.run.sessionId, + sessionKey: queued.run.sessionKey, + agentId: queued.run.agentId, + trigger: "user", + messageChannel: queued.originatingChannel ?? undefined, + messageProvider: queued.run.messageProvider, + agentAccountId: queued.run.agentAccountId, + messageTo: queued.originatingTo, + messageThreadId: queued.originatingThreadId, + currentChannelId: queued.originatingTo, + currentThreadTs: + queued.originatingThreadId != null + ? String(queued.originatingThreadId) + : undefined, + groupId: queued.run.groupId, + groupChannel: queued.run.groupChannel, + groupSpace: queued.run.groupSpace, + senderId: queued.run.senderId, + senderName: queued.run.senderName, + senderUsername: queued.run.senderUsername, + senderE164: queued.run.senderE164, + senderIsOwner: queued.run.senderIsOwner, + sessionFile: queued.run.sessionFile, + agentDir: queued.run.agentDir, + workspaceDir: queued.run.workspaceDir, + config: queued.run.config, + skillsSnapshot: queued.run.skillsSnapshot, + prompt: queued.prompt, + extraSystemPrompt: queued.run.extraSystemPrompt, + ownerNumbers: queued.run.ownerNumbers, + enforceFinalTag: queued.run.enforceFinalTag, + provider, + model, + ...authProfile, + thinkLevel: queued.run.thinkLevel, + verboseLevel: queued.run.verboseLevel, + reasoningLevel: queued.run.reasoningLevel, + suppressToolErrorWarnings: opts?.suppressToolErrorWarnings, + execOverrides: queued.run.execOverrides, + bashElevated: queued.run.bashElevated, + timeoutMs: queued.run.timeoutMs, + runId, + allowTransientCooldownProbe: runOptions?.allowTransientCooldownProbe, + blockReplyBreak: queued.run.blockReplyBreak, + bootstrapPromptWarningSignaturesSeen, + 
bootstrapPromptWarningSignature: + bootstrapPromptWarningSignaturesSeen[ + bootstrapPromptWarningSignaturesSeen.length - 1 + ], + onAgentEvent: (evt) => { + if (evt.stream !== "compaction") { + return; + } + const phase = typeof evt.data.phase === "string" ? evt.data.phase : ""; + const completed = evt.data?.completed === true; + if (phase === "end" && completed) { + attemptCompactionCount += 1; + } + }, + }); + bootstrapPromptWarningSignaturesSeen = resolveBootstrapWarningSignaturesSeen( + result.meta?.systemPromptReport, + ); + const resultCompactionCount = Math.max( + 0, + result.meta?.agentMeta?.compactionCount ?? 0, + ); + attemptCompactionCount = Math.max(attemptCompactionCount, resultCompactionCount); + return result; + } finally { + autoCompactionCount += attemptCompactionCount; + } }, }); runResult = fallbackResult.result; @@ -326,12 +339,13 @@ export function createFollowupRunner(params: { return; } - if (autoCompactionCompleted) { + if (autoCompactionCount > 0) { const count = await incrementRunCompactionCount({ sessionEntry, sessionStore, sessionKey, storePath, + amount: autoCompactionCount, lastCallUsage: runResult.meta?.agentMeta?.lastCallUsage, contextTokensUsed, }); diff --git a/src/auto-reply/reply/reply-state.test.ts b/src/auto-reply/reply/reply-state.test.ts index 69dbad531e7..f83d313e2d3 100644 --- a/src/auto-reply/reply/reply-state.test.ts +++ b/src/auto-reply/reply/reply-state.test.ts @@ -445,6 +445,23 @@ describe("incrementCompactionCount", () => { expect(stored[sessionKey].outputTokens).toBeUndefined(); }); + it("increments compaction count by an explicit amount", async () => { + const entry = { sessionId: "s1", updatedAt: Date.now(), compactionCount: 2 } as SessionEntry; + const { storePath, sessionKey, sessionStore } = await createCompactionSessionFixture(entry); + + const count = await incrementCompactionCount({ + sessionEntry: entry, + sessionStore, + sessionKey, + storePath, + amount: 2, + }); + expect(count).toBe(4); + + const stored = 
JSON.parse(await fs.readFile(storePath, "utf-8")); + expect(stored[sessionKey].compactionCount).toBe(4); + }); + it("does not update totalTokens when tokensAfter is not provided", async () => { const entry = { sessionId: "s1", diff --git a/src/auto-reply/reply/session-run-accounting.ts b/src/auto-reply/reply/session-run-accounting.ts index fe4b91a7cdc..1a8a0d83640 100644 --- a/src/auto-reply/reply/session-run-accounting.ts +++ b/src/auto-reply/reply/session-run-accounting.ts @@ -8,6 +8,7 @@ type IncrementRunCompactionCountParams = Omit< Parameters[0], "tokensAfter" > & { + amount?: number; lastCallUsage?: NormalizedUsage; contextTokensUsed?: number; }; @@ -30,6 +31,7 @@ export async function incrementRunCompactionCount( sessionStore: params.sessionStore, sessionKey: params.sessionKey, storePath: params.storePath, + amount: params.amount, tokensAfter: tokensAfterCompaction, }); } diff --git a/src/auto-reply/reply/session-updates.ts b/src/auto-reply/reply/session-updates.ts index 55b4d4eb15b..bea6cd326e0 100644 --- a/src/auto-reply/reply/session-updates.ts +++ b/src/auto-reply/reply/session-updates.ts @@ -255,6 +255,7 @@ export async function incrementCompactionCount(params: { sessionKey?: string; storePath?: string; now?: number; + amount?: number; /** Token count after compaction - if provided, updates session token counts */ tokensAfter?: number; }): Promise { @@ -264,6 +265,7 @@ export async function incrementCompactionCount(params: { sessionKey, storePath, now = Date.now(), + amount = 1, tokensAfter, } = params; if (!sessionStore || !sessionKey) { @@ -273,7 +275,8 @@ export async function incrementCompactionCount(params: { if (!entry) { return undefined; } - const nextCount = (entry.compactionCount ?? 0) + 1; + const incrementBy = Math.max(0, amount); + const nextCount = (entry.compactionCount ?? 
0) + incrementBy; // Build update payload with compaction count and optionally updated token counts const updates: Partial = { compactionCount: nextCount, diff --git a/src/plugins/wired-hooks-compaction.test.ts b/src/plugins/wired-hooks-compaction.test.ts index 1e3f0021e29..694f4a1f4b4 100644 --- a/src/plugins/wired-hooks-compaction.test.ts +++ b/src/plugins/wired-hooks-compaction.test.ts @@ -138,7 +138,7 @@ describe("compaction hook wiring", () => { expect(emitAgentEvent).toHaveBeenCalledWith({ runId: "r2", stream: "compaction", - data: { phase: "end", willRetry: false }, + data: { phase: "end", willRetry: false, completed: true }, }); }); @@ -169,7 +169,7 @@ describe("compaction hook wiring", () => { expect(emitAgentEvent).toHaveBeenCalledWith({ runId: "r3", stream: "compaction", - data: { phase: "end", willRetry: true }, + data: { phase: "end", willRetry: true, completed: true }, }); });