From 2de32fbf14bb5af43fa12db9a27402120f6f65ea Mon Sep 17 00:00:00 2001 From: Jackal Xin Date: Wed, 25 Mar 2026 13:00:41 -0400 Subject: [PATCH] fix: reconcile session compaction count after late compaction success (#45493) Merged via squash. Prepared head SHA: d0715a5555791dd44a406d4732843454a3e9619e Co-authored-by: jackal092927 <3854860+jackal092927@users.noreply.github.com> Co-authored-by: jalehman <550978+jalehman@users.noreply.github.com> Reviewed-by: @jalehman --- CHANGELOG.md | 1 + ...dded-subscribe.handlers.compaction.test.ts | 172 +++++++++++++++++ ...-embedded-subscribe.handlers.compaction.ts | 42 ++++- ...ers.lifecycle.compaction-reconcile.test.ts | 173 ++++++++++++++++++ 4 files changed, 387 insertions(+), 1 deletion(-) create mode 100644 src/agents/pi-embedded-subscribe.handlers.compaction.test.ts create mode 100644 src/agents/pi-embedded-subscribe.handlers.lifecycle.compaction-reconcile.test.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index d395fa191c0..ea125e140e8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -528,6 +528,7 @@ Docs: https://docs.openclaw.ai ### Fixes - Agents/edit tool: accept common path/text alias spellings, show current file contents on exact-match failures, and avoid false edit failures after successful writes. (#52516) thanks @mbelinky. +- Agents/compaction: reconcile `sessions.json.compactionCount` after a late embedded auto-compaction success so persisted session counts catch up once the handler reports completion. (#45493) Thanks @jackal092927. ## 2026.3.13 diff --git a/src/agents/pi-embedded-subscribe.handlers.compaction.test.ts b/src/agents/pi-embedded-subscribe.handlers.compaction.test.ts new file mode 100644 index 00000000000..b6ae77ba05b --- /dev/null +++ b/src/agents/pi-embedded-subscribe.handlers.compaction.test.ts @@ -0,0 +1,172 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; +import { describe, expect, it, vi } from "vitest"; +import { + handleAutoCompactionEnd, + reconcileSessionStoreCompactionCountAfterSuccess, +} from "./pi-embedded-subscribe.handlers.compaction.js"; +import type { EmbeddedPiSubscribeContext } from "./pi-embedded-subscribe.handlers.types.js"; + +async function seedSessionStore(params: { + storePath: string; + sessionKey: string; + compactionCount: number; + updatedAt?: number; +}) { + await fs.mkdir(path.dirname(params.storePath), { recursive: true }); + await fs.writeFile( + params.storePath, + JSON.stringify( + { + [params.sessionKey]: { + sessionId: "session-1", + updatedAt: params.updatedAt ?? 1_000, + compactionCount: params.compactionCount, + }, + }, + null, + 2, + ), + "utf-8", + ); +} + +async function readCompactionCount(storePath: string, sessionKey: string): Promise { + const store = JSON.parse(await fs.readFile(storePath, "utf-8")) as Record< + string, + { compactionCount?: number } + >; + return store[sessionKey]?.compactionCount ?? 0; +} + +async function waitForCompactionCount(params: { + storePath: string; + sessionKey: string; + expected: number; +}) { + for (let attempt = 0; attempt < 40; attempt += 1) { + if ((await readCompactionCount(params.storePath, params.sessionKey)) === params.expected) { + return; + } + await new Promise((resolve) => setTimeout(resolve, 10)); + } + throw new Error(`timed out waiting for compactionCount=${params.expected}`); +} + +function createCompactionContext(params: { + storePath: string; + sessionKey: string; + agentId?: string; + initialCount: number; +}): EmbeddedPiSubscribeContext { + let compactionCount = params.initialCount; + return { + params: { + runId: "run-test", + session: { messages: [] } as never, + config: { session: { store: params.storePath } } as never, + sessionKey: params.sessionKey, + sessionId: "session-1", + agentId: params.agentId ?? "test-agent", + onAgentEvent: undefined, + }, + state: { + compactionInFlight: true, + pendingCompactionRetry: 0, + } as never, + log: { + debug: vi.fn(), + warn: vi.fn(), + }, + ensureCompactionPromise: vi.fn(), + noteCompactionRetry: vi.fn(), + maybeResolveCompactionWait: vi.fn(), + resolveCompactionRetry: vi.fn(), + resetForCompactionRetry: vi.fn(), + incrementCompactionCount: () => { + compactionCount += 1; + }, + getCompactionCount: () => compactionCount, + } as unknown as EmbeddedPiSubscribeContext; +} + +describe("reconcileSessionStoreCompactionCountAfterSuccess", () => { + it("raises the stored compaction count to the observed value", async () => { + const tmp = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-compaction-reconcile-")); + const storePath = path.join(tmp, "sessions.json"); + const sessionKey = "main"; + await seedSessionStore({ + storePath, + sessionKey, + compactionCount: 1, + }); + + const nextCount = await reconcileSessionStoreCompactionCountAfterSuccess({ + sessionKey, + agentId: "test-agent", + configStore: storePath, + observedCompactionCount: 2, + now: 2_000, + }); + + expect(nextCount).toBe(2); + expect(await readCompactionCount(storePath, sessionKey)).toBe(2); + }); + + it("does not double count when the store is already at or above the observed value", async () => { + const tmp = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-compaction-idempotent-")); + const storePath = path.join(tmp, "sessions.json"); + const sessionKey = "main"; + await seedSessionStore({ + storePath, + sessionKey, + compactionCount: 3, + }); + + const nextCount = await reconcileSessionStoreCompactionCountAfterSuccess({ + sessionKey, + agentId: "test-agent", + configStore: storePath, + observedCompactionCount: 2, + now: 2_000, + }); + + expect(nextCount).toBe(3); + expect(await readCompactionCount(storePath, sessionKey)).toBe(3); + }); +}); + +describe("handleAutoCompactionEnd", () => { + it("reconciles the session store after a successful compaction end event", async () => { + const tmp = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-compaction-handler-")); + const storePath = path.join(tmp, "sessions.json"); + const sessionKey = "main"; + await seedSessionStore({ + storePath, + sessionKey, + compactionCount: 1, + }); + + const ctx = createCompactionContext({ + storePath, + sessionKey, + initialCount: 1, + }); + + handleAutoCompactionEnd(ctx, { + type: "auto_compaction_end", + result: { kept: 12 }, + willRetry: false, + aborted: false, + } as never); + + await waitForCompactionCount({ + storePath, + sessionKey, + expected: 2, + }); + + expect(await readCompactionCount(storePath, sessionKey)).toBe(2); + }); +}); diff --git a/src/agents/pi-embedded-subscribe.handlers.compaction.ts b/src/agents/pi-embedded-subscribe.handlers.compaction.ts index f0717f140cf..6b165a1f8f9 100644 --- a/src/agents/pi-embedded-subscribe.handlers.compaction.ts +++ b/src/agents/pi-embedded-subscribe.handlers.compaction.ts @@ -1,4 +1,5 @@ import type { AgentEvent } from "@mariozechner/pi-agent-core"; +import { resolveStorePath, updateSessionStoreEntry } from "../config/sessions.js"; import { emitAgentEvent } from "../infra/agent-events.js"; import { getGlobalHookRunner } from "../plugins/hook-runner-global.js"; import type { EmbeddedPiSubscribeContext } from "./pi-embedded-subscribe.handlers.types.js"; @@ -51,7 +52,16 @@ export function handleAutoCompactionEnd( const hasResult = evt.result != null; const wasAborted = Boolean(evt.aborted); if (hasResult && !wasAborted) { - ctx.incrementCompactionCount?.(); + ctx.incrementCompactionCount(); + const observedCompactionCount = ctx.getCompactionCount(); + void reconcileSessionStoreCompactionCountAfterSuccess({ + sessionKey: ctx.params.sessionKey, + agentId: ctx.params.agentId, + configStore: ctx.params.config?.session?.store, + observedCompactionCount, + }).catch((err) => { + ctx.log.warn(`late compaction count reconcile failed: ${String(err)}`); + }); } if (willRetry) { ctx.noteCompactionRetry(); @@ -91,6 +101,36 @@ export function handleAutoCompactionEnd( } } +export async function reconcileSessionStoreCompactionCountAfterSuccess(params: { + sessionKey?: string; + agentId?: string; + configStore?: string; + observedCompactionCount: number; + now?: number; +}): Promise { + const { sessionKey, agentId, configStore, observedCompactionCount, now = Date.now() } = params; + if (!sessionKey || observedCompactionCount <= 0) { + return undefined; + } + const storePath = resolveStorePath(configStore, { agentId }); + const nextEntry = await updateSessionStoreEntry({ + storePath, + sessionKey, + update: async (entry) => { + const currentCount = Math.max(0, entry.compactionCount ?? 0); + const nextCount = Math.max(currentCount, observedCompactionCount); + if (nextCount === currentCount) { + return null; + } + return { + compactionCount: nextCount, + updatedAt: Math.max(entry.updatedAt ?? 0, now), + }; + }, + }); + return nextEntry?.compactionCount; +} + function clearStaleAssistantUsageOnSessionMessages(ctx: EmbeddedPiSubscribeContext): void { const messages = ctx.params.session.messages; if (!Array.isArray(messages)) { diff --git a/src/agents/pi-embedded-subscribe.handlers.lifecycle.compaction-reconcile.test.ts b/src/agents/pi-embedded-subscribe.handlers.lifecycle.compaction-reconcile.test.ts new file mode 100644 index 00000000000..8b21f376b38 --- /dev/null +++ b/src/agents/pi-embedded-subscribe.handlers.lifecycle.compaction-reconcile.test.ts @@ -0,0 +1,173 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; +import { describe, expect, it, vi } from "vitest"; +import { createEmbeddedPiSessionEventHandler } from "./pi-embedded-subscribe.handlers.js"; +import type { EmbeddedPiSubscribeContext } from "./pi-embedded-subscribe.handlers.types.js"; + +async function seedSessionStore(params: { + storePath: string; + sessionKey: string; + compactionCount: number; + updatedAt?: number; +}) { + await fs.mkdir(path.dirname(params.storePath), { recursive: true }); + await fs.writeFile( + params.storePath, + JSON.stringify( + { + [params.sessionKey]: { + sessionId: "session-1", + updatedAt: params.updatedAt ?? 1_000, + compactionCount: params.compactionCount, + }, + }, + null, + 2, + ), + "utf-8", + ); +} + +async function readCompactionCount(storePath: string, sessionKey: string): Promise { + const store = JSON.parse(await fs.readFile(storePath, "utf-8")) as Record< + string, + { compactionCount?: number } + >; + return store[sessionKey]?.compactionCount ?? 0; +} + +async function waitForCompactionCount(params: { + storePath: string; + sessionKey: string; + expected: number; +}) { + for (let attempt = 0; attempt < 40; attempt += 1) { + if ((await readCompactionCount(params.storePath, params.sessionKey)) === params.expected) { + return; + } + await new Promise((resolve) => setTimeout(resolve, 10)); + } + throw new Error(`timed out waiting for compactionCount=${params.expected}`); +} + +function createLifecycleContext(params: { + storePath: string; + sessionKey: string; + initialCount: number; + agentId?: string; +}): EmbeddedPiSubscribeContext { + let compactionCount = params.initialCount; + return { + params: { + runId: "run-lifecycle-test", + session: { messages: [] } as never, + config: { session: { store: params.storePath } } as never, + sessionKey: params.sessionKey, + sessionId: "session-1", + agentId: params.agentId ?? "test-agent", + onAgentEvent: undefined, + }, + state: { + assistantTexts: [], + toolMetas: [], + toolMetaById: new Map(), + toolSummaryById: new Set(), + deltaBuffer: "", + blockBuffer: "", + blockState: { thinking: false, final: false, inlineCode: {} as never }, + partialBlockState: { thinking: false, final: false, inlineCode: {} as never }, + emittedAssistantUpdate: false, + reasoningMode: "off", + includeReasoning: false, + shouldEmitPartialReplies: true, + streamReasoning: false, + assistantMessageIndex: 0, + lastAssistantTextMessageIndex: -1, + assistantTextBaseline: 0, + suppressBlockChunks: false, + compactionInFlight: false, + pendingCompactionRetry: 0, + compactionRetryPromise: null, + unsubscribed: false, + messagingToolSentTexts: [], + messagingToolSentTextsNormalized: [], + messagingToolSentTargets: [], + messagingToolSentMediaUrls: [], + pendingMessagingTexts: new Map(), + pendingMessagingTargets: new Map(), + successfulCronAdds: 0, + pendingMessagingMediaUrls: new Map(), + deterministicApprovalPromptSent: false, + } as never, + log: { + debug: vi.fn(), + warn: vi.fn(), + }, + blockChunker: null, + noteLastAssistant: vi.fn(), + shouldEmitToolResult: () => false, + shouldEmitToolOutput: () => false, + emitToolSummary: vi.fn(), + emitToolOutput: vi.fn(), + stripBlockTags: vi.fn((text: string) => text), + emitBlockChunk: vi.fn(), + flushBlockReplyBuffer: vi.fn(), + emitReasoningStream: vi.fn(), + consumeReplyDirectives: vi.fn(), + consumePartialReplyDirectives: vi.fn(), + resetAssistantMessageState: vi.fn(), + resetForCompactionRetry: vi.fn(), + finalizeAssistantTexts: vi.fn(), + trimMessagingToolSent: vi.fn(), + ensureCompactionPromise: vi.fn(), + noteCompactionRetry: vi.fn(), + resolveCompactionRetry: vi.fn(), + maybeResolveCompactionWait: vi.fn(), + recordAssistantUsage: vi.fn(), + incrementCompactionCount: () => { + compactionCount += 1; + }, + getUsageTotals: vi.fn(), + getCompactionCount: () => compactionCount, + } as unknown as EmbeddedPiSubscribeContext; +} + +describe("createEmbeddedPiSessionEventHandler compaction reconciliation", () => { + it("reconciles sessions.json on routed auto_compaction_end success", async () => { + const tmp = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-lifecycle-compaction-")); + const storePath = path.join(tmp, "sessions.json"); + const sessionKey = "main"; + await seedSessionStore({ + storePath, + sessionKey, + compactionCount: 1, + }); + + const ctx = createLifecycleContext({ + storePath, + sessionKey, + initialCount: 1, + }); + const handleEvent = createEmbeddedPiSessionEventHandler(ctx); + + handleEvent({ type: "auto_compaction_start" }); + expect(ctx.state.compactionInFlight).toBe(true); + + handleEvent({ + type: "auto_compaction_end", + willRetry: false, + aborted: false, + result: { kept: 12 }, + }); + + await waitForCompactionCount({ + storePath, + sessionKey, + expected: 2, + }); + + expect(ctx.getCompactionCount()).toBe(2); + expect(await readCompactionCount(storePath, sessionKey)).toBe(2); + }); +});