diff --git a/docs/automation/cron-jobs.md b/docs/automation/cron-jobs.md index aae5f58fdf2..8d140192607 100644 --- a/docs/automation/cron-jobs.md +++ b/docs/automation/cron-jobs.md @@ -349,7 +349,8 @@ Notes: ## Storage & history - Job store: `~/.openclaw/cron/jobs.json` (Gateway-managed JSON). -- Run history: `~/.openclaw/cron/runs/.jsonl` (JSONL, auto-pruned). +- Run history: `~/.openclaw/cron/runs/.jsonl` (JSONL, auto-pruned by size and line count). +- Isolated cron run sessions in `sessions.json` are pruned by `cron.sessionRetention` (default `24h`; set `false` to disable). - Override store path: `cron.store` in config. ## Configuration @@ -362,10 +363,21 @@ Notes: maxConcurrentRuns: 1, // default 1 webhook: "https://example.invalid/legacy", // deprecated fallback for stored notify:true jobs webhookToken: "replace-with-dedicated-webhook-token", // optional bearer token for webhook mode + sessionRetention: "24h", // duration string or false + runLog: { + maxBytes: "2mb", // default 2_000_000 bytes + keepLines: 2000, // default 2000 + }, }, } ``` +Run-log pruning behavior: + +- `cron.runLog.maxBytes`: max run-log file size before pruning. +- `cron.runLog.keepLines`: when pruning, keep only the newest N lines. +- Both apply to `cron/runs/.jsonl` files. + Webhook behavior: - Preferred: set `delivery.mode: "webhook"` with `delivery.to: "https://..."` per job. @@ -380,6 +392,85 @@ Disable cron entirely: - `cron.enabled: false` (config) - `OPENCLAW_SKIP_CRON=1` (env) +## Maintenance + +Cron has two built-in maintenance paths: isolated run-session retention and run-log pruning. + +### Defaults + +- `cron.sessionRetention`: `24h` (set `false` to disable run-session pruning) +- `cron.runLog.maxBytes`: `2_000_000` bytes +- `cron.runLog.keepLines`: `2000` + +### How it works + +- Isolated runs create session entries (`...:cron::run:`) and transcript files. +- The reaper removes expired run-session entries older than `cron.sessionRetention`. 
+- For removed run sessions no longer referenced by the session store, OpenClaw archives transcript files and purges old deleted archives on the same retention window. +- After each run append, `cron/runs/.jsonl` is size-checked: + - if file size exceeds `runLog.maxBytes`, it is trimmed to the newest `runLog.keepLines` lines. + +### Performance caveat for high volume schedulers + +High-frequency cron setups can generate large run-session and run-log footprints. Maintenance is built in, but loose limits can still create avoidable IO and cleanup work. + +What to watch: + +- long `cron.sessionRetention` windows with many isolated runs +- high `cron.runLog.keepLines` combined with large `runLog.maxBytes` +- many noisy recurring jobs writing to the same `cron/runs/.jsonl` + +What to do: + +- keep `cron.sessionRetention` as short as your debugging/audit needs allow +- keep run logs bounded with moderate `runLog.maxBytes` and `runLog.keepLines` +- move noisy background jobs to isolated mode with delivery rules that avoid unnecessary chatter +- review growth periodically with `openclaw cron runs` and adjust retention before logs become large + +### Customize examples + +Keep run sessions for a week and allow bigger run logs: + +```json5 +{ + cron: { + sessionRetention: "7d", + runLog: { + maxBytes: "10mb", + keepLines: 5000, + }, + }, +} +``` + +Disable isolated run-session pruning but keep run-log pruning: + +```json5 +{ + cron: { + sessionRetention: false, + runLog: { + maxBytes: "5mb", + keepLines: 3000, + }, + }, +} +``` + +Tune for high-volume cron usage (example): + +```json5 +{ + cron: { + sessionRetention: "12h", + runLog: { + maxBytes: "3mb", + keepLines: 1500, + }, + }, +} +``` + ## CLI quickstart One-shot reminder (UTC ISO, auto-delete after success): diff --git a/docs/cli/cron.md b/docs/cli/cron.md index 3e56db9717a..9c129518e21 100644 --- a/docs/cli/cron.md +++ b/docs/cli/cron.md @@ -23,6 +23,11 @@ Note: one-shot (`--at`) jobs delete after success by default. 
Use `--keep-after- Note: recurring jobs now use exponential retry backoff after consecutive errors (30s → 1m → 5m → 15m → 60m), then return to normal schedule after the next successful run. +Note: retention/pruning is controlled in config: + +- `cron.sessionRetention` (default `24h`) prunes completed isolated run sessions. +- `cron.runLog.maxBytes` + `cron.runLog.keepLines` prune `~/.openclaw/cron/runs/.jsonl`. + ## Common edits Update delivery settings without changing the message: diff --git a/docs/cli/doctor.md b/docs/cli/doctor.md index 7dc1f6fc1b8..dff899d7cd2 100644 --- a/docs/cli/doctor.md +++ b/docs/cli/doctor.md @@ -27,6 +27,7 @@ Notes: - Interactive prompts (like keychain/OAuth fixes) only run when stdin is a TTY and `--non-interactive` is **not** set. Headless runs (cron, Telegram, no terminal) will skip prompts. - `--fix` (alias for `--repair`) writes a backup to `~/.openclaw/openclaw.json.bak` and drops unknown config keys, listing each removal. +- State integrity checks now detect orphan transcript files in the sessions directory and can archive them as `.deleted.` to reclaim space safely. ## macOS: `launchctl` env overrides diff --git a/docs/cli/sessions.md b/docs/cli/sessions.md index 0709bc1f0df..a15e4250c13 100644 --- a/docs/cli/sessions.md +++ b/docs/cli/sessions.md @@ -14,3 +14,26 @@ openclaw sessions openclaw sessions --active 120 openclaw sessions --json ``` + +## Cleanup maintenance + +Run maintenance now (instead of waiting for the next write cycle): + +```bash +openclaw sessions cleanup --dry-run +openclaw sessions cleanup --enforce +openclaw sessions cleanup --enforce --active-key "agent:main:telegram:dm:123" +openclaw sessions cleanup --json +``` + +`openclaw sessions cleanup` uses `session.maintenance` settings from config: + +- `--dry-run`: preview how many entries would be pruned/capped without writing. +- `--enforce`: apply maintenance even when `session.maintenance.mode` is `warn`. 
+- `--active-key `: protect a specific active key from disk-budget eviction. +- `--store `: run against a specific `sessions.json` file. +- `--json`: print one JSON summary object. Dry-run output includes projected `diskBudget` impact (`totalBytesBefore/After`, `removedFiles`, `removedEntries`) when disk budgeting is enabled. + +Related: + +- Session config: [Configuration reference](/gateway/configuration-reference#session) diff --git a/docs/concepts/session.md b/docs/concepts/session.md index 3d1503ab80e..81550a032ed 100644 --- a/docs/concepts/session.md +++ b/docs/concepts/session.md @@ -71,6 +71,109 @@ All session state is **owned by the gateway** (the “master” OpenClaw). UI cl - Session entries include `origin` metadata (label + routing hints) so UIs can explain where a session came from. - OpenClaw does **not** read legacy Pi/Tau session folders. +## Maintenance + +OpenClaw applies session-store maintenance to keep `sessions.json` and transcript artifacts bounded over time. + +### Defaults + +- `session.maintenance.mode`: `warn` +- `session.maintenance.pruneAfter`: `30d` +- `session.maintenance.maxEntries`: `500` +- `session.maintenance.rotateBytes`: `10mb` +- `session.maintenance.resetArchiveRetention`: defaults to `pruneAfter` (`30d`) +- `session.maintenance.maxDiskBytes`: unset (disabled) +- `session.maintenance.highWaterBytes`: defaults to `80%` of `maxDiskBytes` when budgeting is enabled + +### How it works + +Maintenance runs during session-store writes, and you can trigger it on demand with `openclaw sessions cleanup`. + +- `mode: "warn"`: reports what would be evicted but does not mutate entries/transcripts. +- `mode: "enforce"`: applies cleanup in this order: + 1. prune stale entries older than `pruneAfter` + 2. cap entry count to `maxEntries` (oldest first) + 3. archive transcript files for removed entries that are no longer referenced + 4. purge old `*.deleted.` and `*.reset.` archives by retention policy + 5. 
rotate `sessions.json` when it exceeds `rotateBytes` + 6. if `maxDiskBytes` is set, enforce disk budget toward `highWaterBytes` (oldest artifacts first, then oldest sessions) + +### Performance caveat for large stores + +Large session stores are common in high-volume setups. Maintenance work is write-path work, so very large stores can increase write latency. + +What increases cost most: + +- very high `session.maintenance.maxEntries` values +- long `pruneAfter` windows that keep stale entries around +- many transcript/archive artifacts in `~/.openclaw/agents//sessions/` +- enabling disk budgets (`maxDiskBytes`) without reasonable pruning/cap limits + +What to do: + +- use `mode: "enforce"` in production so growth is bounded automatically +- set both time and count limits (`pruneAfter` + `maxEntries`), not just one +- set `maxDiskBytes` + `highWaterBytes` for hard upper bounds in large deployments +- keep `highWaterBytes` meaningfully below `maxDiskBytes` (default is 80%) +- run `openclaw sessions cleanup --dry-run --json` after config changes to verify projected impact before enforcing +- for frequent active sessions, pass `--active-key` when running manual cleanup + +### Customize examples + +Use a conservative enforce policy: + +```json5 +{ + session: { + maintenance: { + mode: "enforce", + pruneAfter: "45d", + maxEntries: 800, + rotateBytes: "20mb", + resetArchiveRetention: "14d", + }, + }, +} +``` + +Enable a hard disk budget for the sessions directory: + +```json5 +{ + session: { + maintenance: { + mode: "enforce", + maxDiskBytes: "1gb", + highWaterBytes: "800mb", + }, + }, +} +``` + +Tune for larger installs (example): + +```json5 +{ + session: { + maintenance: { + mode: "enforce", + pruneAfter: "14d", + maxEntries: 2000, + rotateBytes: "25mb", + maxDiskBytes: "2gb", + highWaterBytes: "1.6gb", + }, + }, +} +``` + +Preview or force maintenance from CLI: + +```bash +openclaw sessions cleanup --dry-run +openclaw sessions cleanup --enforce +``` + ## Session 
pruning OpenClaw trims **old tool results** from the in-memory context right before LLM calls by default. diff --git a/docs/gateway/configuration-examples.md b/docs/gateway/configuration-examples.md index 960f37c005b..6d310b0a32d 100644 --- a/docs/gateway/configuration-examples.md +++ b/docs/gateway/configuration-examples.md @@ -169,6 +169,9 @@ Save to `~/.openclaw/openclaw.json` and you can DM the bot from that number. pruneAfter: "30d", maxEntries: 500, rotateBytes: "10mb", + resetArchiveRetention: "30d", // duration or false + maxDiskBytes: "500mb", // optional + highWaterBytes: "400mb", // optional (defaults to 80% of maxDiskBytes) }, typingIntervalSeconds: 5, sendPolicy: { @@ -355,6 +358,10 @@ Save to `~/.openclaw/openclaw.json` and you can DM the bot from that number. store: "~/.openclaw/cron/cron.json", maxConcurrentRuns: 2, sessionRetention: "24h", + runLog: { + maxBytes: "2mb", + keepLines: 2000, + }, }, // Webhooks diff --git a/docs/gateway/configuration-reference.md b/docs/gateway/configuration-reference.md index adde566e886..77379112907 100644 --- a/docs/gateway/configuration-reference.md +++ b/docs/gateway/configuration-reference.md @@ -1246,6 +1246,9 @@ See [Multi-Agent Sandbox & Tools](/tools/multi-agent-sandbox-tools) for preceden pruneAfter: "30d", maxEntries: 500, rotateBytes: "10mb", + resetArchiveRetention: "30d", // duration or false + maxDiskBytes: "500mb", // optional hard budget + highWaterBytes: "400mb", // optional cleanup target }, threadBindings: { enabled: true, @@ -1273,7 +1276,14 @@ See [Multi-Agent Sandbox & Tools](/tools/multi-agent-sandbox-tools) for preceden - **`resetByType`**: per-type overrides (`direct`, `group`, `thread`). Legacy `dm` accepted as alias for `direct`. - **`mainKey`**: legacy field. Runtime now always uses `"main"` for the main direct-chat bucket. - **`sendPolicy`**: match by `channel`, `chatType` (`direct|group|channel`, with legacy `dm` alias), `keyPrefix`, or `rawKeyPrefix`. First deny wins. 
-- **`maintenance`**: `warn` warns the active session on eviction; `enforce` applies pruning and rotation. +- **`maintenance`**: session-store cleanup + retention controls. + - `mode`: `warn` emits warnings only; `enforce` applies cleanup. + - `pruneAfter`: age cutoff for stale entries (default `30d`). + - `maxEntries`: maximum number of entries in `sessions.json` (default `500`). + - `rotateBytes`: rotate `sessions.json` when it exceeds this size (default `10mb`). + - `resetArchiveRetention`: retention for `*.reset.` transcript archives. Defaults to `pruneAfter`; set `false` to disable. + - `maxDiskBytes`: optional sessions-directory disk budget. In `warn` mode it logs warnings; in `enforce` mode it removes oldest artifacts/sessions first. + - `highWaterBytes`: optional target after budget cleanup. Defaults to `80%` of `maxDiskBytes`. - **`threadBindings`**: global defaults for thread-bound session features. - `enabled`: master default switch (providers can override; Discord uses `channels.discord.threadBindings.enabled`) - `ttlHours`: default auto-unfocus TTL in hours (`0` disables; providers can override) @@ -2459,11 +2469,17 @@ Current builds no longer include the TCP bridge. Nodes connect over the Gateway webhook: "https://example.invalid/legacy", // deprecated fallback for stored notify:true jobs webhookToken: "replace-with-dedicated-token", // optional bearer token for outbound webhook auth sessionRetention: "24h", // duration string or false + runLog: { + maxBytes: "2mb", // default 2_000_000 bytes + keepLines: 2000, // default 2000 + }, }, } ``` -- `sessionRetention`: how long to keep completed cron sessions before pruning. Default: `24h`. +- `sessionRetention`: how long to keep completed isolated cron run sessions before pruning from `sessions.json`. Also controls cleanup of archived deleted cron transcripts. Default: `24h`; set `false` to disable. +- `runLog.maxBytes`: max size per run log file (`cron/runs/.jsonl`) before pruning. 
Default: `2_000_000` bytes. +- `runLog.keepLines`: newest lines retained when run-log pruning is triggered. Default: `2000`. - `webhookToken`: bearer token used for cron webhook POST delivery (`delivery.mode = "webhook"`), if omitted no auth header is sent. - `webhook`: deprecated legacy fallback webhook URL (http/https) used only for stored jobs that still have `notify: true`. diff --git a/docs/gateway/configuration.md b/docs/gateway/configuration.md index e367b4caf0d..f4fea3b5a35 100644 --- a/docs/gateway/configuration.md +++ b/docs/gateway/configuration.md @@ -251,11 +251,17 @@ When validation fails: enabled: true, maxConcurrentRuns: 2, sessionRetention: "24h", + runLog: { + maxBytes: "2mb", + keepLines: 2000, + }, }, } ``` - See [Cron jobs](/automation/cron-jobs) for the feature overview and CLI examples. + - `sessionRetention`: prune completed isolated run sessions from `sessions.json` (default `24h`; set `false` to disable). + - `runLog`: prune `cron/runs/.jsonl` by size and retained lines. + - See [Cron jobs](/automation/cron-jobs) for feature overview and CLI examples. diff --git a/docs/reference/session-management-compaction.md b/docs/reference/session-management-compaction.md index 3a08575454e..aff09a303e8 100644 --- a/docs/reference/session-management-compaction.md +++ b/docs/reference/session-management-compaction.md @@ -65,6 +65,44 @@ OpenClaw resolves these via `src/config/sessions.ts`. 
--- +## Store maintenance and disk controls + +Session persistence has automatic maintenance controls (`session.maintenance`) for `sessions.json` and transcript artifacts: + +- `mode`: `warn` (default) or `enforce` +- `pruneAfter`: stale-entry age cutoff (default `30d`) +- `maxEntries`: cap entries in `sessions.json` (default `500`) +- `rotateBytes`: rotate `sessions.json` when oversized (default `10mb`) +- `resetArchiveRetention`: retention for `*.reset.` transcript archives (default: same as `pruneAfter`; `false` disables cleanup) +- `maxDiskBytes`: optional sessions-directory budget +- `highWaterBytes`: optional target after cleanup (default `80%` of `maxDiskBytes`) + +Enforcement order for disk budget cleanup (`mode: "enforce"`): + +1. Remove oldest archived or orphan transcript artifacts first. +2. If still above the target, evict oldest session entries and their transcript files. +3. Keep going until usage is at or below `highWaterBytes`. + +In `mode: "warn"`, OpenClaw reports potential evictions but does not mutate the store/files. + +Run maintenance on demand: + +```bash +openclaw sessions cleanup --dry-run +openclaw sessions cleanup --enforce +``` + +--- + +## Cron sessions and run logs + +Isolated cron runs also create session entries/transcripts, and they have dedicated retention controls: + +- `cron.sessionRetention` (default `24h`) prunes old isolated cron run sessions from the session store (`false` disables). +- `cron.runLog.maxBytes` + `cron.runLog.keepLines` prune `~/.openclaw/cron/runs/.jsonl` files (defaults: `2_000_000` bytes and `2000` lines). + +--- + ## Session keys (`sessionKey`) A `sessionKey` identifies _which conversation bucket_ you’re in (routing + isolation). 
diff --git a/src/cli/cli-utils.test.ts b/src/cli/cli-utils.test.ts index 95a074a6620..69f65cfb3fb 100644 --- a/src/cli/cli-utils.test.ts +++ b/src/cli/cli-utils.test.ts @@ -100,9 +100,16 @@ describe("parseDurationMs", () => { ["parses hours suffix", "2h", 7_200_000], ["parses days suffix", "2d", 172_800_000], ["supports decimals", "0.5s", 500], + ["parses composite hours+minutes", "1h30m", 5_400_000], + ["parses composite with milliseconds", "2m500ms", 120_500], ] as const; for (const [name, input, expected] of cases) { expect(parseDurationMs(input), name).toBe(expected); } }); + + it("rejects invalid composite strings", () => { + expect(() => parseDurationMs("1h30")).toThrow(); + expect(() => parseDurationMs("1h-30m")).toThrow(); + }); }); diff --git a/src/cli/parse-duration.ts b/src/cli/parse-duration.ts index 38e0aedd8cf..4ad673fb39c 100644 --- a/src/cli/parse-duration.ts +++ b/src/cli/parse-duration.ts @@ -2,6 +2,14 @@ export type DurationMsParseOptions = { defaultUnit?: "ms" | "s" | "m" | "h" | "d"; }; +const DURATION_MULTIPLIERS: Record = { + ms: 1, + s: 1000, + m: 60_000, + h: 3_600_000, + d: 86_400_000, +}; + export function parseDurationMs(raw: string, opts?: DurationMsParseOptions): number { const trimmed = String(raw ?? "") .trim() @@ -10,28 +18,51 @@ export function parseDurationMs(raw: string, opts?: DurationMsParseOptions): num throw new Error("invalid duration (empty)"); } - const m = /^(\d+(?:\.\d+)?)(ms|s|m|h|d)?$/.exec(trimmed); - if (!m) { + // Fast path for a single token (supports default unit for bare numbers). + const single = /^(\d+(?:\.\d+)?)(ms|s|m|h|d)?$/.exec(trimmed); + if (single) { + const value = Number(single[1]); + if (!Number.isFinite(value) || value < 0) { + throw new Error(`invalid duration: ${raw}`); + } + const unit = (single[2] ?? opts?.defaultUnit ?? 
"ms") as "ms" | "s" | "m" | "h" | "d"; + const ms = Math.round(value * DURATION_MULTIPLIERS[unit]); + if (!Number.isFinite(ms)) { + throw new Error(`invalid duration: ${raw}`); + } + return ms; + } + + // Composite form (e.g. "1h30m", "2m500ms"); each token must include a unit. + let totalMs = 0; + let consumed = 0; + const tokenRe = /(\d+(?:\.\d+)?)(ms|s|m|h|d)/g; + for (const match of trimmed.matchAll(tokenRe)) { + const [full, valueRaw, unitRaw] = match; + const index = match.index ?? -1; + if (!full || !valueRaw || !unitRaw || index < 0) { + throw new Error(`invalid duration: ${raw}`); + } + if (index !== consumed) { + throw new Error(`invalid duration: ${raw}`); + } + const value = Number(valueRaw); + if (!Number.isFinite(value) || value < 0) { + throw new Error(`invalid duration: ${raw}`); + } + const multiplier = DURATION_MULTIPLIERS[unitRaw]; + if (!multiplier) { + throw new Error(`invalid duration: ${raw}`); + } + totalMs += value * multiplier; + consumed += full.length; + } + + if (consumed !== trimmed.length || consumed === 0) { throw new Error(`invalid duration: ${raw}`); } - const value = Number(m[1]); - if (!Number.isFinite(value) || value < 0) { - throw new Error(`invalid duration: ${raw}`); - } - - const unit = (m[2] ?? opts?.defaultUnit ?? "ms") as "ms" | "s" | "m" | "h" | "d"; - const multiplier = - unit === "ms" - ? 1 - : unit === "s" - ? 1000 - : unit === "m" - ? 60_000 - : unit === "h" - ? 
3_600_000 - : 86_400_000; - const ms = Math.round(value * multiplier); + const ms = Math.round(totalMs); if (!Number.isFinite(ms)) { throw new Error(`invalid duration: ${raw}`); } diff --git a/src/cli/program/register.status-health-sessions.test.ts b/src/cli/program/register.status-health-sessions.test.ts index 10ee685a79c..cdbd188e652 100644 --- a/src/cli/program/register.status-health-sessions.test.ts +++ b/src/cli/program/register.status-health-sessions.test.ts @@ -4,6 +4,7 @@ import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; const statusCommand = vi.fn(); const healthCommand = vi.fn(); const sessionsCommand = vi.fn(); +const sessionsCleanupCommand = vi.fn(); const setVerbose = vi.fn(); const runtime = { @@ -24,6 +25,10 @@ vi.mock("../../commands/sessions.js", () => ({ sessionsCommand, })); +vi.mock("../../commands/sessions-cleanup.js", () => ({ + sessionsCleanupCommand, +})); + vi.mock("../../globals.js", () => ({ setVerbose, })); @@ -50,6 +55,7 @@ describe("registerStatusHealthSessionsCommands", () => { statusCommand.mockResolvedValue(undefined); healthCommand.mockResolvedValue(undefined); sessionsCommand.mockResolvedValue(undefined); + sessionsCleanupCommand.mockResolvedValue(undefined); }); it("runs status command with timeout and debug-derived verbose", async () => { @@ -133,4 +139,29 @@ describe("registerStatusHealthSessionsCommands", () => { runtime, ); }); + + it("runs sessions cleanup subcommand with forwarded options", async () => { + await runCli([ + "sessions", + "cleanup", + "--store", + "/tmp/sessions.json", + "--dry-run", + "--enforce", + "--active-key", + "agent:main:main", + "--json", + ]); + + expect(sessionsCleanupCommand).toHaveBeenCalledWith( + expect.objectContaining({ + store: "/tmp/sessions.json", + dryRun: true, + enforce: true, + activeKey: "agent:main:main", + json: true, + }), + runtime, + ); + }); }); diff --git a/src/cli/program/register.status-health-sessions.ts 
b/src/cli/program/register.status-health-sessions.ts index 1aa092a4fe7..f497f4d6382 100644 --- a/src/cli/program/register.status-health-sessions.ts +++ b/src/cli/program/register.status-health-sessions.ts @@ -1,5 +1,6 @@ import type { Command } from "commander"; import { healthCommand } from "../../commands/health.js"; +import { sessionsCleanupCommand } from "../../commands/sessions-cleanup.js"; import { sessionsCommand } from "../../commands/sessions.js"; import { statusCommand } from "../../commands/status.js"; import { setVerbose } from "../../globals.js"; @@ -111,7 +112,7 @@ export function registerStatusHealthSessionsCommands(program: Command) { }); }); - program + const sessionsCmd = program .command("sessions") .description("List stored conversation sessions") .option("--json", "Output as JSON", false) @@ -146,4 +147,46 @@ export function registerStatusHealthSessionsCommands(program: Command) { defaultRuntime, ); }); + sessionsCmd.enablePositionalOptions(); + + sessionsCmd + .command("cleanup") + .description("Run session-store maintenance now") + .option("--store ", "Path to session store (default: resolved from config)") + .option("--dry-run", "Preview maintenance actions without writing", false) + .option("--enforce", "Apply maintenance even when configured mode is warn", false) + .option("--active-key ", "Protect this session key from budget-eviction") + .option("--json", "Output JSON", false) + .addHelpText( + "after", + () => + `\n${theme.heading("Examples:")}\n${formatHelpExamples([ + ["openclaw sessions cleanup --dry-run", "Preview stale/cap cleanup."], + ["openclaw sessions cleanup --enforce", "Apply maintenance now."], + [ + "openclaw sessions cleanup --enforce --store ./tmp/sessions.json", + "Use a specific store.", + ], + ])}`, + ) + .action(async (opts, command) => { + const parentOpts = command.parent?.opts() as + | { + store?: string; + json?: boolean; + } + | undefined; + await runCommandWithRuntime(defaultRuntime, async () => { + await 
sessionsCleanupCommand( + { + store: (opts.store as string | undefined) ?? parentOpts?.store, + dryRun: Boolean(opts.dryRun), + enforce: Boolean(opts.enforce), + activeKey: opts.activeKey as string | undefined, + json: Boolean(opts.json || parentOpts?.json), + }, + defaultRuntime, + ); + }); + }); } diff --git a/src/commands/doctor-state-integrity.test.ts b/src/commands/doctor-state-integrity.test.ts index 50dd5c89114..7a89677175c 100644 --- a/src/commands/doctor-state-integrity.test.ts +++ b/src/commands/doctor-state-integrity.test.ts @@ -124,4 +124,23 @@ describe("doctor state integrity oauth dir checks", () => { expect(confirmSkipInNonInteractive).toHaveBeenCalledWith(OAUTH_PROMPT_MATCHER); expect(stateIntegrityText()).toContain("CRITICAL: OAuth dir missing"); }); + + it("detects orphan transcripts and offers archival remediation", async () => { + const cfg: OpenClawConfig = {}; + setupSessionState(cfg, process.env, process.env.HOME ?? ""); + const sessionsDir = resolveSessionTranscriptsDirForAgent("main", process.env, () => tempHome); + fs.writeFileSync(path.join(sessionsDir, "orphan-session.jsonl"), '{"type":"session"}\n'); + const confirmSkipInNonInteractive = vi.fn(async (params: { message: string }) => + params.message.includes("orphan transcript file"), + ); + await noteStateIntegrity(cfg, { confirmSkipInNonInteractive }); + expect(stateIntegrityText()).toContain("orphan transcript file"); + expect(confirmSkipInNonInteractive).toHaveBeenCalledWith( + expect.objectContaining({ + message: expect.stringContaining("orphan transcript file"), + }), + ); + const files = fs.readdirSync(sessionsDir); + expect(files.some((name) => name.startsWith("orphan-session.jsonl.deleted."))).toBe(true); + }); }); diff --git a/src/commands/doctor-state-integrity.ts b/src/commands/doctor-state-integrity.ts index d5beae1cec6..be2c99a3f20 100644 --- a/src/commands/doctor-state-integrity.ts +++ b/src/commands/doctor-state-integrity.ts @@ -5,6 +5,8 @@ import { resolveDefaultAgentId 
} from "../agents/agent-scope.js"; import type { OpenClawConfig } from "../config/config.js"; import { resolveOAuthDir, resolveStateDir } from "../config/paths.js"; import { + formatSessionArchiveTimestamp, + isPrimarySessionTranscriptFileName, loadSessionStore, resolveMainSessionKey, resolveSessionFilePath, @@ -435,6 +437,54 @@ export async function noteStateIntegrity( } } + if (existsDir(sessionsDir)) { + const referencedTranscriptPaths = new Set(); + for (const [, entry] of entries) { + if (!entry?.sessionId) { + continue; + } + try { + referencedTranscriptPaths.add( + path.resolve(resolveSessionFilePath(entry.sessionId, entry, sessionPathOpts)), + ); + } catch { + // ignore invalid legacy paths + } + } + const sessionDirEntries = fs.readdirSync(sessionsDir, { withFileTypes: true }); + const orphanTranscriptPaths = sessionDirEntries + .filter((entry) => entry.isFile() && isPrimarySessionTranscriptFileName(entry.name)) + .map((entry) => path.resolve(path.join(sessionsDir, entry.name))) + .filter((filePath) => !referencedTranscriptPaths.has(filePath)); + if (orphanTranscriptPaths.length > 0) { + warnings.push( + `- Found ${orphanTranscriptPaths.length} orphan transcript file(s) in ${displaySessionsDir}. 
They are not referenced by sessions.json and can consume disk over time.`, + ); + const archiveOrphans = await prompter.confirmSkipInNonInteractive({ + message: `Archive ${orphanTranscriptPaths.length} orphan transcript file(s) in ${displaySessionsDir}?`, + initialValue: false, + }); + if (archiveOrphans) { + let archived = 0; + const archivedAt = formatSessionArchiveTimestamp(); + for (const orphanPath of orphanTranscriptPaths) { + const archivedPath = `${orphanPath}.deleted.${archivedAt}`; + try { + fs.renameSync(orphanPath, archivedPath); + archived += 1; + } catch (err) { + warnings.push( + `- Failed to archive orphan transcript ${shortenHomePath(orphanPath)}: ${String(err)}`, + ); + } + } + if (archived > 0) { + changes.push(`- Archived ${archived} orphan transcript file(s) in ${displaySessionsDir}`); + } + } + } + } + if (warnings.length > 0) { note(warnings.join("\n"), "State integrity"); } diff --git a/src/commands/sessions-cleanup.test.ts b/src/commands/sessions-cleanup.test.ts new file mode 100644 index 00000000000..3436d993912 --- /dev/null +++ b/src/commands/sessions-cleanup.test.ts @@ -0,0 +1,154 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; +import type { SessionEntry } from "../config/sessions.js"; +import type { RuntimeEnv } from "../runtime.js"; + +const mocks = vi.hoisted(() => ({ + loadConfig: vi.fn(), + resolveDefaultAgentId: vi.fn(), + resolveStorePath: vi.fn(), + resolveMaintenanceConfig: vi.fn(), + loadSessionStore: vi.fn(), + pruneStaleEntries: vi.fn(), + capEntryCount: vi.fn(), + updateSessionStore: vi.fn(), + enforceSessionDiskBudget: vi.fn(), +})); + +vi.mock("../config/config.js", () => ({ + loadConfig: mocks.loadConfig, +})); + +vi.mock("../agents/agent-scope.js", () => ({ + resolveDefaultAgentId: mocks.resolveDefaultAgentId, +})); + +vi.mock("../config/sessions.js", () => ({ + resolveStorePath: mocks.resolveStorePath, + resolveMaintenanceConfig: mocks.resolveMaintenanceConfig, + loadSessionStore: 
mocks.loadSessionStore, + pruneStaleEntries: mocks.pruneStaleEntries, + capEntryCount: mocks.capEntryCount, + updateSessionStore: mocks.updateSessionStore, + enforceSessionDiskBudget: mocks.enforceSessionDiskBudget, +})); + +import { sessionsCleanupCommand } from "./sessions-cleanup.js"; + +function makeRuntime(): { runtime: RuntimeEnv; logs: string[] } { + const logs: string[] = []; + return { + runtime: { + log: (msg: unknown) => logs.push(String(msg)), + error: () => {}, + exit: () => {}, + }, + logs, + }; +} + +describe("sessionsCleanupCommand", () => { + beforeEach(() => { + vi.clearAllMocks(); + mocks.loadConfig.mockReturnValue({ session: { store: "/cfg/sessions.json" } }); + mocks.resolveDefaultAgentId.mockReturnValue("main"); + mocks.resolveStorePath.mockReturnValue("/resolved/sessions.json"); + mocks.resolveMaintenanceConfig.mockReturnValue({ + mode: "warn", + pruneAfterMs: 7 * 24 * 60 * 60 * 1000, + maxEntries: 500, + rotateBytes: 10_485_760, + resetArchiveRetentionMs: 7 * 24 * 60 * 60 * 1000, + maxDiskBytes: null, + highWaterBytes: null, + }); + mocks.pruneStaleEntries.mockImplementation((store: Record) => { + if (store.stale) { + delete store.stale; + return 1; + } + return 0; + }); + mocks.capEntryCount.mockImplementation(() => 0); + mocks.updateSessionStore.mockResolvedValue(undefined); + mocks.enforceSessionDiskBudget.mockResolvedValue({ + totalBytesBefore: 1000, + totalBytesAfter: 700, + removedFiles: 1, + removedEntries: 1, + freedBytes: 300, + maxBytes: 900, + highWaterBytes: 700, + overBudget: true, + }); + }); + + it("emits a single JSON object for non-dry runs and applies maintenance", async () => { + mocks.loadSessionStore + .mockReturnValueOnce({ + stale: { sessionId: "stale", updatedAt: 1 }, + fresh: { sessionId: "fresh", updatedAt: 2 }, + }) + .mockReturnValueOnce({ + fresh: { sessionId: "fresh", updatedAt: 2 }, + }); + + const { runtime, logs } = makeRuntime(); + await sessionsCleanupCommand( + { + json: true, + enforce: true, + activeKey: 
"agent:main:main", + }, + runtime, + ); + + expect(logs).toHaveLength(1); + const payload = JSON.parse(logs[0] ?? "{}") as Record; + expect(payload.applied).toBe(true); + expect(payload.mode).toBe("enforce"); + expect(payload.beforeCount).toBe(2); + expect(payload.appliedCount).toBe(1); + expect(payload.diskBudget).toEqual( + expect.objectContaining({ + removedFiles: 1, + removedEntries: 1, + }), + ); + expect(mocks.updateSessionStore).toHaveBeenCalledWith( + "/resolved/sessions.json", + expect.any(Function), + expect.objectContaining({ + activeSessionKey: "agent:main:main", + maintenanceOverride: { mode: "enforce" }, + }), + ); + }); + + it("returns dry-run JSON without mutating the store", async () => { + mocks.loadSessionStore.mockReturnValue({ + stale: { sessionId: "stale", updatedAt: 1 }, + fresh: { sessionId: "fresh", updatedAt: 2 }, + }); + + const { runtime, logs } = makeRuntime(); + await sessionsCleanupCommand( + { + json: true, + dryRun: true, + }, + runtime, + ); + + expect(logs).toHaveLength(1); + const payload = JSON.parse(logs[0] ?? 
"{}") as Record; + expect(payload.dryRun).toBe(true); + expect(payload.applied).toBeUndefined(); + expect(mocks.updateSessionStore).not.toHaveBeenCalled(); + expect(payload.diskBudget).toEqual( + expect.objectContaining({ + removedFiles: 1, + removedEntries: 1, + }), + ); + }); +}); diff --git a/src/commands/sessions-cleanup.ts b/src/commands/sessions-cleanup.ts new file mode 100644 index 00000000000..fd2c79f4051 --- /dev/null +++ b/src/commands/sessions-cleanup.ts @@ -0,0 +1,113 @@ +import { resolveDefaultAgentId } from "../agents/agent-scope.js"; +import { loadConfig } from "../config/config.js"; +import { + capEntryCount, + enforceSessionDiskBudget, + loadSessionStore, + pruneStaleEntries, + resolveMaintenanceConfig, + resolveStorePath, + updateSessionStore, +} from "../config/sessions.js"; +import type { RuntimeEnv } from "../runtime.js"; + +export type SessionsCleanupOptions = { + store?: string; + dryRun?: boolean; + enforce?: boolean; + activeKey?: string; + json?: boolean; +}; + +export async function sessionsCleanupCommand(opts: SessionsCleanupOptions, runtime: RuntimeEnv) { + const cfg = loadConfig(); + const defaultAgentId = resolveDefaultAgentId(cfg); + const storePath = resolveStorePath(opts.store ?? cfg.session?.store, { agentId: defaultAgentId }); + const maintenance = resolveMaintenanceConfig(); + const effectiveMode = opts.enforce ? 
"enforce" : maintenance.mode; + + const beforeStore = loadSessionStore(storePath, { skipCache: true }); + const previewStore = structuredClone(beforeStore); + const pruned = pruneStaleEntries(previewStore, maintenance.pruneAfterMs, { log: false }); + const capped = capEntryCount(previewStore, maintenance.maxEntries, { log: false }); + const diskBudget = await enforceSessionDiskBudget({ + store: previewStore, + storePath, + activeSessionKey: opts.activeKey, + maintenance, + warnOnly: false, + dryRun: true, + }); + const beforeCount = Object.keys(beforeStore).length; + const afterPreviewCount = Object.keys(previewStore).length; + const wouldMutate = + pruned > 0 || + capped > 0 || + Boolean((diskBudget?.removedEntries ?? 0) > 0 || (diskBudget?.removedFiles ?? 0) > 0); + + const summary = { + storePath, + mode: effectiveMode, + dryRun: Boolean(opts.dryRun), + beforeCount, + afterCount: afterPreviewCount, + pruned, + capped, + diskBudget, + wouldMutate, + }; + + if (opts.json && opts.dryRun) { + runtime.log(JSON.stringify(summary, null, 2)); + return; + } + + if (!opts.json) { + runtime.log(`Session store: ${storePath}`); + runtime.log(`Maintenance mode: ${effectiveMode}`); + runtime.log(`Entries: ${beforeCount} -> ${afterPreviewCount}`); + runtime.log(`Would prune stale: ${pruned}`); + runtime.log(`Would cap overflow: ${capped}`); + if (diskBudget) { + runtime.log( + `Would enforce disk budget: ${diskBudget.totalBytesBefore} -> ${diskBudget.totalBytesAfter} bytes (files ${diskBudget.removedFiles}, entries ${diskBudget.removedEntries})`, + ); + } + } + + if (opts.dryRun) { + return; + } + + await updateSessionStore( + storePath, + async () => { + // Maintenance runs in saveSessionStoreUnlocked(); no direct store mutation needed here. 
+ }, + { + activeSessionKey: opts.activeKey, + maintenanceOverride: { + mode: effectiveMode, + }, + }, + ); + + const afterStore = loadSessionStore(storePath, { skipCache: true }); + const appliedCount = Object.keys(afterStore).length; + if (opts.json) { + runtime.log( + JSON.stringify( + { + ...summary, + applied: true, + appliedCount, + }, + null, + 2, + ), + ); + return; + } + + runtime.log(`Applied maintenance. Current entries: ${appliedCount}`); +} diff --git a/src/config/schema.help.quality.test.ts b/src/config/schema.help.quality.test.ts index 286005b0aa2..7532cedae47 100644 --- a/src/config/schema.help.quality.test.ts +++ b/src/config/schema.help.quality.test.ts @@ -110,6 +110,9 @@ const TARGET_KEYS = [ "cron.webhook", "cron.webhookToken", "cron.sessionRetention", + "cron.runLog", + "cron.runLog.maxBytes", + "cron.runLog.keepLines", "session", "session.scope", "session.dmScope", @@ -150,6 +153,9 @@ const TARGET_KEYS = [ "session.maintenance.pruneDays", "session.maintenance.maxEntries", "session.maintenance.rotateBytes", + "session.maintenance.resetArchiveRetention", + "session.maintenance.maxDiskBytes", + "session.maintenance.highWaterBytes", "approvals", "approvals.exec", "approvals.exec.enabled", @@ -663,6 +669,27 @@ describe("config help copy quality", () => { const deprecated = FIELD_HELP["session.maintenance.pruneDays"]; expect(/deprecated/i.test(deprecated)).toBe(true); expect(deprecated.includes("session.maintenance.pruneAfter")).toBe(true); + + const resetRetention = FIELD_HELP["session.maintenance.resetArchiveRetention"]; + expect(resetRetention.includes(".reset.")).toBe(true); + expect(/false/i.test(resetRetention)).toBe(true); + + const maxDisk = FIELD_HELP["session.maintenance.maxDiskBytes"]; + expect(maxDisk.includes("500mb")).toBe(true); + + const highWater = FIELD_HELP["session.maintenance.highWaterBytes"]; + expect(highWater.includes("80%")).toBe(true); + }); + + it("documents cron run-log retention controls", () => { + const runLog = 
FIELD_HELP["cron.runLog"]; + expect(runLog.includes("cron/runs")).toBe(true); + + const maxBytes = FIELD_HELP["cron.runLog.maxBytes"]; + expect(maxBytes.includes("2mb")).toBe(true); + + const keepLines = FIELD_HELP["cron.runLog.keepLines"]; + expect(keepLines.includes("2000")).toBe(true); }); it("documents approvals filters and target semantics", () => { diff --git a/src/config/schema.help.ts b/src/config/schema.help.ts index 84536311cf7..72be98576b3 100644 --- a/src/config/schema.help.ts +++ b/src/config/schema.help.ts @@ -999,6 +999,12 @@ export const FIELD_HELP: Record = { "Caps total session entry count retained in the store to prevent unbounded growth over time. Use lower limits for constrained environments, or higher limits when longer history is required.", "session.maintenance.rotateBytes": "Rotates the session store when file size exceeds a threshold such as `10mb` or `1gb`. Use this to bound single-file growth and keep backup/restore operations manageable.", + "session.maintenance.resetArchiveRetention": + "Retention for reset transcript archives (`*.reset.`). Accepts a duration (for example `30d`), or `false` to disable cleanup. Defaults to pruneAfter so reset artifacts do not grow forever.", + "session.maintenance.maxDiskBytes": + "Optional per-agent sessions-directory disk budget (for example `500mb`). When exceeded, warn mode reports pressure and enforce mode performs oldest-first cleanup.", + "session.maintenance.highWaterBytes": + "Target size after disk-budget cleanup (high-water mark). Defaults to 80% of maxDiskBytes; set explicitly for tighter reclaim behavior on constrained disks.", cron: "Global scheduler settings for stored cron jobs, run concurrency, delivery fallback, and run-session retention. Keep defaults unless you are scaling job volume or integrating external webhook receivers.", "cron.enabled": "Enables cron job execution for stored schedules managed by the gateway. 
Keep enabled for normal reminder/automation flows, and disable only to pause all cron execution without deleting jobs.", @@ -1012,6 +1018,12 @@ export const FIELD_HELP: Record = { "Bearer token attached to cron webhook POST deliveries when webhook mode is used. Prefer secret/env substitution and rotate this token regularly if shared webhook endpoints are internet-reachable.", "cron.sessionRetention": "Controls how long completed cron run sessions are kept before pruning (`24h`, `7d`, `1h30m`, or `false` to disable pruning; default: `24h`). Use shorter retention to reduce storage growth on high-frequency schedules.", + "cron.runLog": + "Pruning controls for per-job cron run history files under `cron/runs/.jsonl`, including size and line retention.", + "cron.runLog.maxBytes": + "Maximum bytes per cron run-log file before pruning rewrites to the last keepLines entries (for example `2mb`, default `2000000`).", + "cron.runLog.keepLines": + "How many trailing run-log lines to retain when a file exceeds maxBytes (default `2000`). Increase for longer forensic history or lower for smaller disks.", hooks: "Inbound webhook automation surface for mapping external events into wake or agent actions in OpenClaw. 
Keep this locked down with explicit token/session/agent controls before exposing it beyond trusted networks.", "hooks.enabled": diff --git a/src/config/schema.labels.ts b/src/config/schema.labels.ts index 5142c3ac8b3..aa007c23a9a 100644 --- a/src/config/schema.labels.ts +++ b/src/config/schema.labels.ts @@ -471,6 +471,9 @@ export const FIELD_LABELS: Record = { "session.maintenance.pruneDays": "Session Prune Days (Deprecated)", "session.maintenance.maxEntries": "Session Max Entries", "session.maintenance.rotateBytes": "Session Rotate Size", + "session.maintenance.resetArchiveRetention": "Session Reset Archive Retention", + "session.maintenance.maxDiskBytes": "Session Max Disk Budget", + "session.maintenance.highWaterBytes": "Session Disk High-water Target", cron: "Cron", "cron.enabled": "Cron Enabled", "cron.store": "Cron Store Path", @@ -478,6 +481,9 @@ export const FIELD_LABELS: Record = { "cron.webhook": "Cron Legacy Webhook (Deprecated)", "cron.webhookToken": "Cron Webhook Bearer Token", "cron.sessionRetention": "Cron Session Retention", + "cron.runLog": "Cron Run Log Pruning", + "cron.runLog.maxBytes": "Cron Run Log Max Bytes", + "cron.runLog.keepLines": "Cron Run Log Keep Lines", hooks: "Hooks", "hooks.enabled": "Hooks Enabled", "hooks.path": "Hooks Endpoint Path", diff --git a/src/config/sessions.ts b/src/config/sessions.ts index f4a6cbc0926..701870ec8a7 100644 --- a/src/config/sessions.ts +++ b/src/config/sessions.ts @@ -1,4 +1,5 @@ export * from "./sessions/group.js"; +export * from "./sessions/artifacts.js"; export * from "./sessions/metadata.js"; export * from "./sessions/main-session.js"; export * from "./sessions/paths.js"; @@ -9,3 +10,4 @@ export * from "./sessions/types.js"; export * from "./sessions/transcript.js"; export * from "./sessions/session-file.js"; export * from "./sessions/delivery-info.js"; +export * from "./sessions/disk-budget.js"; diff --git a/src/config/sessions/artifacts.test.ts b/src/config/sessions/artifacts.test.ts new file mode 
100644 index 00000000000..c46cb3466c6 --- /dev/null +++ b/src/config/sessions/artifacts.test.ts @@ -0,0 +1,35 @@ +import { describe, expect, it } from "vitest"; +import { + formatSessionArchiveTimestamp, + isPrimarySessionTranscriptFileName, + isSessionArchiveArtifactName, + parseSessionArchiveTimestamp, +} from "./artifacts.js"; + +describe("session artifact helpers", () => { + it("classifies archived artifact file names", () => { + expect(isSessionArchiveArtifactName("abc.jsonl.deleted.2026-01-01T00-00-00.000Z")).toBe(true); + expect(isSessionArchiveArtifactName("abc.jsonl.reset.2026-01-01T00-00-00.000Z")).toBe(true); + expect(isSessionArchiveArtifactName("abc.jsonl.bak.2026-01-01T00-00-00.000Z")).toBe(true); + expect(isSessionArchiveArtifactName("sessions.json.bak.1737420882")).toBe(true); + expect(isSessionArchiveArtifactName("abc.jsonl")).toBe(false); + }); + + it("classifies primary transcript files", () => { + expect(isPrimarySessionTranscriptFileName("abc.jsonl")).toBe(true); + expect(isPrimarySessionTranscriptFileName("abc.jsonl.deleted.2026-01-01T00-00-00.000Z")).toBe( + false, + ); + expect(isPrimarySessionTranscriptFileName("sessions.json")).toBe(false); + }); + + it("formats and parses archive timestamps", () => { + const now = Date.parse("2026-02-23T12:34:56.000Z"); + const stamp = formatSessionArchiveTimestamp(now); + expect(stamp).toBe("2026-02-23T12-34-56.000Z"); + + const file = `abc.jsonl.deleted.${stamp}`; + expect(parseSessionArchiveTimestamp(file, "deleted")).toBe(now); + expect(parseSessionArchiveTimestamp(file, "reset")).toBeNull(); + }); +}); diff --git a/src/config/sessions/artifacts.ts b/src/config/sessions/artifacts.ts new file mode 100644 index 00000000000..1916047ad72 --- /dev/null +++ b/src/config/sessions/artifacts.ts @@ -0,0 +1,49 @@ +export type SessionArchiveReason = "bak" | "reset" | "deleted"; + +export function isSessionArchiveArtifactName(fileName: string): boolean { + return ( + fileName.includes(".deleted.") || + 
fileName.includes(".reset.") || + fileName.includes(".bak.") || + fileName.startsWith("sessions.json.bak.") + ); +} + +export function isPrimarySessionTranscriptFileName(fileName: string): boolean { + if (fileName === "sessions.json") { + return false; + } + if (!fileName.endsWith(".jsonl")) { + return false; + } + return !isSessionArchiveArtifactName(fileName); +} + +export function formatSessionArchiveTimestamp(nowMs = Date.now()): string { + return new Date(nowMs).toISOString().replaceAll(":", "-"); +} + +function restoreSessionArchiveTimestamp(raw: string): string { + const [datePart, timePart] = raw.split("T"); + if (!datePart || !timePart) { + return raw; + } + return `${datePart}T${timePart.replace(/-/g, ":")}`; +} + +export function parseSessionArchiveTimestamp( + fileName: string, + reason: SessionArchiveReason, +): number | null { + const marker = `.${reason}.`; + const index = fileName.lastIndexOf(marker); + if (index < 0) { + return null; + } + const raw = fileName.slice(index + marker.length); + if (!raw) { + return null; + } + const timestamp = Date.parse(restoreSessionArchiveTimestamp(raw)); + return Number.isNaN(timestamp) ? 
null : timestamp; +} diff --git a/src/config/sessions/disk-budget.ts b/src/config/sessions/disk-budget.ts new file mode 100644 index 00000000000..862992f30a3 --- /dev/null +++ b/src/config/sessions/disk-budget.ts @@ -0,0 +1,360 @@ +import fs from "node:fs"; +import path from "node:path"; +import { isPrimarySessionTranscriptFileName, isSessionArchiveArtifactName } from "./artifacts.js"; +import { resolveSessionFilePath } from "./paths.js"; +import type { SessionEntry } from "./types.js"; + +export type SessionDiskBudgetConfig = { + maxDiskBytes: number | null; + highWaterBytes: number | null; +}; + +export type SessionDiskBudgetSweepResult = { + totalBytesBefore: number; + totalBytesAfter: number; + removedFiles: number; + removedEntries: number; + freedBytes: number; + maxBytes: number; + highWaterBytes: number; + overBudget: boolean; +}; + +export type SessionDiskBudgetLogger = { + warn: (message: string, context?: Record) => void; + info: (message: string, context?: Record) => void; +}; + +const NOOP_LOGGER: SessionDiskBudgetLogger = { + warn: () => {}, + info: () => {}, +}; + +type SessionsDirFileStat = { + path: string; + name: string; + size: number; + mtimeMs: number; +}; + +function measureStoreBytes(store: Record): number { + return Buffer.byteLength(JSON.stringify(store, null, 2), "utf-8"); +} + +function measureStoreEntryChunkBytes(key: string, entry: SessionEntry): number { + const singleEntryStore = JSON.stringify({ [key]: entry }, null, 2); + if (!singleEntryStore.startsWith("{\n") || !singleEntryStore.endsWith("\n}")) { + return measureStoreBytes({ [key]: entry }) - 4; + } + const chunk = singleEntryStore.slice(2, -2); + return Buffer.byteLength(chunk, "utf-8"); +} + +function buildStoreEntryChunkSizeMap(store: Record): Map { + const out = new Map(); + for (const [key, entry] of Object.entries(store)) { + out.set(key, measureStoreEntryChunkBytes(key, entry)); + } + return out; +} + +function getEntryUpdatedAt(entry?: SessionEntry): number { + if 
(!entry) { + return 0; + } + const updatedAt = entry.updatedAt; + return Number.isFinite(updatedAt) ? updatedAt : 0; +} + +function buildSessionIdRefCounts(store: Record): Map { + const counts = new Map(); + for (const entry of Object.values(store)) { + const sessionId = entry?.sessionId; + if (!sessionId) { + continue; + } + counts.set(sessionId, (counts.get(sessionId) ?? 0) + 1); + } + return counts; +} + +function resolveSessionTranscriptPathForEntry(params: { + sessionsDir: string; + entry: SessionEntry; +}): string | null { + if (!params.entry.sessionId) { + return null; + } + try { + const resolved = resolveSessionFilePath(params.entry.sessionId, params.entry, { + sessionsDir: params.sessionsDir, + }); + const resolvedSessionsDir = path.resolve(params.sessionsDir); + const relative = path.relative(resolvedSessionsDir, path.resolve(resolved)); + if (!relative || relative.startsWith("..") || path.isAbsolute(relative)) { + return null; + } + return resolved; + } catch { + return null; + } +} + +function resolveReferencedSessionTranscriptPaths(params: { + sessionsDir: string; + store: Record; +}): Set { + const referenced = new Set(); + for (const entry of Object.values(params.store)) { + const resolved = resolveSessionTranscriptPathForEntry({ + sessionsDir: params.sessionsDir, + entry, + }); + if (resolved) { + referenced.add(resolved); + } + } + return referenced; +} + +async function readSessionsDirFiles(sessionsDir: string): Promise { + const dirEntries = await fs.promises + .readdir(sessionsDir, { withFileTypes: true }) + .catch(() => []); + const files: SessionsDirFileStat[] = []; + for (const dirent of dirEntries) { + if (!dirent.isFile()) { + continue; + } + const filePath = path.join(sessionsDir, dirent.name); + const stat = await fs.promises.stat(filePath).catch(() => null); + if (!stat?.isFile()) { + continue; + } + files.push({ + path: filePath, + name: dirent.name, + size: stat.size, + mtimeMs: stat.mtimeMs, + }); + } + return files; +} + +async 
function removeFileIfExists(filePath: string): Promise { + const stat = await fs.promises.stat(filePath).catch(() => null); + if (!stat?.isFile()) { + return 0; + } + await fs.promises.rm(filePath, { force: true }).catch(() => undefined); + return stat.size; +} + +async function removeFileForBudget(params: { + filePath: string; + dryRun: boolean; + fileSizesByPath: Map; + simulatedRemovedPaths: Set; +}): Promise { + const resolvedPath = path.resolve(params.filePath); + if (params.dryRun) { + if (params.simulatedRemovedPaths.has(resolvedPath)) { + return 0; + } + const size = params.fileSizesByPath.get(resolvedPath) ?? 0; + if (size <= 0) { + return 0; + } + params.simulatedRemovedPaths.add(resolvedPath); + return size; + } + return removeFileIfExists(resolvedPath); +} + +export async function enforceSessionDiskBudget(params: { + store: Record; + storePath: string; + activeSessionKey?: string; + maintenance: SessionDiskBudgetConfig; + warnOnly: boolean; + dryRun?: boolean; + log?: SessionDiskBudgetLogger; +}): Promise { + const maxBytes = params.maintenance.maxDiskBytes; + const highWaterBytes = params.maintenance.highWaterBytes; + if (maxBytes == null || highWaterBytes == null) { + return null; + } + const log = params.log ?? NOOP_LOGGER; + const dryRun = params.dryRun === true; + const sessionsDir = path.dirname(params.storePath); + const files = await readSessionsDirFiles(sessionsDir); + const fileSizesByPath = new Map(files.map((file) => [path.resolve(file.path), file.size])); + const simulatedRemovedPaths = new Set(); + const resolvedStorePath = path.resolve(params.storePath); + const storeFile = files.find((file) => path.resolve(file.path) === resolvedStorePath); + let projectedStoreBytes = measureStoreBytes(params.store); + let total = + files.reduce((sum, file) => sum + file.size, 0) - (storeFile?.size ?? 
0) + projectedStoreBytes; + const totalBefore = total; + if (total <= maxBytes) { + return { + totalBytesBefore: totalBefore, + totalBytesAfter: total, + removedFiles: 0, + removedEntries: 0, + freedBytes: 0, + maxBytes, + highWaterBytes, + overBudget: false, + }; + } + + if (params.warnOnly) { + log.warn("session disk budget exceeded (warn-only mode)", { + sessionsDir, + totalBytes: total, + maxBytes, + highWaterBytes, + }); + return { + totalBytesBefore: totalBefore, + totalBytesAfter: total, + removedFiles: 0, + removedEntries: 0, + freedBytes: 0, + maxBytes, + highWaterBytes, + overBudget: true, + }; + } + + let removedFiles = 0; + let removedEntries = 0; + let freedBytes = 0; + + const referencedPaths = resolveReferencedSessionTranscriptPaths({ + sessionsDir, + store: params.store, + }); + const removableFileQueue = files + .filter( + (file) => + isSessionArchiveArtifactName(file.name) || + (isPrimarySessionTranscriptFileName(file.name) && !referencedPaths.has(file.path)), + ) + .toSorted((a, b) => a.mtimeMs - b.mtimeMs); + for (const file of removableFileQueue) { + if (total <= highWaterBytes) { + break; + } + const deletedBytes = await removeFileForBudget({ + filePath: file.path, + dryRun, + fileSizesByPath, + simulatedRemovedPaths, + }); + if (deletedBytes <= 0) { + continue; + } + total -= deletedBytes; + freedBytes += deletedBytes; + removedFiles += 1; + } + + if (total > highWaterBytes) { + const activeSessionKey = params.activeSessionKey?.trim().toLowerCase(); + const sessionIdRefCounts = buildSessionIdRefCounts(params.store); + const entryChunkBytesByKey = buildStoreEntryChunkSizeMap(params.store); + const keys = Object.keys(params.store).toSorted((a, b) => { + const aTime = getEntryUpdatedAt(params.store[a]); + const bTime = getEntryUpdatedAt(params.store[b]); + return aTime - bTime; + }); + for (const key of keys) { + if (total <= highWaterBytes) { + break; + } + if (activeSessionKey && key.trim().toLowerCase() === activeSessionKey) { + continue; + } 
+ const entry = params.store[key]; + if (!entry) { + continue; + } + const previousProjectedBytes = projectedStoreBytes; + delete params.store[key]; + const chunkBytes = entryChunkBytesByKey.get(key); + entryChunkBytesByKey.delete(key); + if (typeof chunkBytes === "number" && Number.isFinite(chunkBytes) && chunkBytes >= 0) { + // Removing any one pretty-printed top-level entry always removes the entry chunk plus ",\n" (2 bytes). + projectedStoreBytes = Math.max(2, projectedStoreBytes - (chunkBytes + 2)); + } else { + projectedStoreBytes = measureStoreBytes(params.store); + } + total += projectedStoreBytes - previousProjectedBytes; + removedEntries += 1; + + const sessionId = entry.sessionId; + if (!sessionId) { + continue; + } + const nextRefCount = (sessionIdRefCounts.get(sessionId) ?? 1) - 1; + if (nextRefCount > 0) { + sessionIdRefCounts.set(sessionId, nextRefCount); + continue; + } + sessionIdRefCounts.delete(sessionId); + const transcriptPath = resolveSessionTranscriptPathForEntry({ sessionsDir, entry }); + if (!transcriptPath) { + continue; + } + const deletedBytes = await removeFileForBudget({ + filePath: transcriptPath, + dryRun, + fileSizesByPath, + simulatedRemovedPaths, + }); + if (deletedBytes <= 0) { + continue; + } + total -= deletedBytes; + freedBytes += deletedBytes; + removedFiles += 1; + } + } + + if (!dryRun) { + if (total > highWaterBytes) { + log.warn("session disk budget still above high-water target after cleanup", { + sessionsDir, + totalBytes: total, + maxBytes, + highWaterBytes, + removedFiles, + removedEntries, + }); + } else if (removedFiles > 0 || removedEntries > 0) { + log.info("applied session disk budget cleanup", { + sessionsDir, + totalBytesBefore: totalBefore, + totalBytesAfter: total, + maxBytes, + highWaterBytes, + removedFiles, + removedEntries, + }); + } + } + + return { + totalBytesBefore: totalBefore, + totalBytesAfter: total, + removedFiles, + removedEntries, + freedBytes, + maxBytes, + highWaterBytes, + overBudget: true, 
+ }; +} diff --git a/src/config/sessions/store.pruning.integration.test.ts b/src/config/sessions/store.pruning.integration.test.ts index f1ef11e7cd3..75cf27e20a2 100644 --- a/src/config/sessions/store.pruning.integration.test.ts +++ b/src/config/sessions/store.pruning.integration.test.ts @@ -159,6 +159,40 @@ describe("Integration: saveSessionStore with pruning", () => { await expect(fs.stat(bakArchived)).resolves.toBeDefined(); }); + it("cleans up reset archives using resetArchiveRetention", async () => { + mockLoadConfig.mockReturnValue({ + session: { + maintenance: { + mode: "enforce", + pruneAfter: "30d", + resetArchiveRetention: "3d", + maxEntries: 500, + rotateBytes: 10_485_760, + }, + }, + }); + + const now = Date.now(); + const store: Record = { + fresh: { sessionId: "fresh-session", updatedAt: now }, + }; + const oldReset = path.join( + testDir, + `old-reset.jsonl.reset.${archiveTimestamp(now - 10 * DAY_MS)}`, + ); + const freshReset = path.join( + testDir, + `fresh-reset.jsonl.reset.${archiveTimestamp(now - 1 * DAY_MS)}`, + ); + await fs.writeFile(oldReset, "old", "utf-8"); + await fs.writeFile(freshReset, "fresh", "utf-8"); + + await saveSessionStore(storePath, store); + + await expect(fs.stat(oldReset)).rejects.toThrow(); + await expect(fs.stat(freshReset)).resolves.toBeDefined(); + }); + it("saveSessionStore skips enforcement when maintenance mode is warn", async () => { mockLoadConfig.mockReturnValue({ session: { @@ -180,4 +214,181 @@ describe("Integration: saveSessionStore with pruning", () => { expect(loaded.fresh).toBeDefined(); expect(Object.keys(loaded)).toHaveLength(2); }); + + it("archives transcript files for entries evicted by maxEntries capping", async () => { + mockLoadConfig.mockReturnValue({ + session: { + maintenance: { + mode: "enforce", + pruneAfter: "365d", + maxEntries: 1, + rotateBytes: 10_485_760, + }, + }, + }); + + const now = Date.now(); + const oldestSessionId = "oldest-session"; + const newestSessionId = "newest-session"; + 
const store: Record = { + oldest: { sessionId: oldestSessionId, updatedAt: now - DAY_MS }, + newest: { sessionId: newestSessionId, updatedAt: now }, + }; + const oldestTranscript = path.join(testDir, `${oldestSessionId}.jsonl`); + const newestTranscript = path.join(testDir, `${newestSessionId}.jsonl`); + await fs.writeFile(oldestTranscript, '{"type":"session"}\n', "utf-8"); + await fs.writeFile(newestTranscript, '{"type":"session"}\n', "utf-8"); + + await saveSessionStore(storePath, store); + + const loaded = loadSessionStore(storePath); + expect(loaded.oldest).toBeUndefined(); + expect(loaded.newest).toBeDefined(); + await expect(fs.stat(oldestTranscript)).rejects.toThrow(); + await expect(fs.stat(newestTranscript)).resolves.toBeDefined(); + const files = await fs.readdir(testDir); + expect(files.some((name) => name.startsWith(`${oldestSessionId}.jsonl.deleted.`))).toBe(true); + }); + + it("does not archive external transcript paths when capping entries", async () => { + mockLoadConfig.mockReturnValue({ + session: { + maintenance: { + mode: "enforce", + pruneAfter: "365d", + maxEntries: 1, + rotateBytes: 10_485_760, + }, + }, + }); + + const now = Date.now(); + const externalDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-external-cap-")); + const externalTranscript = path.join(externalDir, "outside.jsonl"); + await fs.writeFile(externalTranscript, "external", "utf-8"); + const store: Record = { + oldest: { + sessionId: "outside", + sessionFile: externalTranscript, + updatedAt: now - DAY_MS, + }, + newest: { sessionId: "inside", updatedAt: now }, + }; + await fs.writeFile(path.join(testDir, "inside.jsonl"), '{"type":"session"}\n', "utf-8"); + + try { + await saveSessionStore(storePath, store); + const loaded = loadSessionStore(storePath); + expect(loaded.oldest).toBeUndefined(); + expect(loaded.newest).toBeDefined(); + await expect(fs.stat(externalTranscript)).resolves.toBeDefined(); + } finally { + await fs.rm(externalDir, { recursive: true, force: true 
}); + } + }); + + it("enforces maxDiskBytes with oldest-first session eviction", async () => { + mockLoadConfig.mockReturnValue({ + session: { + maintenance: { + mode: "enforce", + pruneAfter: "365d", + maxEntries: 100, + rotateBytes: 10_485_760, + maxDiskBytes: 900, + highWaterBytes: 700, + }, + }, + }); + + const now = Date.now(); + const oldSessionId = "old-disk-session"; + const newSessionId = "new-disk-session"; + const store: Record = { + old: { sessionId: oldSessionId, updatedAt: now - DAY_MS }, + recent: { sessionId: newSessionId, updatedAt: now }, + }; + await fs.writeFile(path.join(testDir, `${oldSessionId}.jsonl`), "x".repeat(500), "utf-8"); + await fs.writeFile(path.join(testDir, `${newSessionId}.jsonl`), "y".repeat(500), "utf-8"); + + await saveSessionStore(storePath, store); + + const loaded = loadSessionStore(storePath); + expect(Object.keys(loaded).length).toBe(1); + expect(loaded.recent).toBeDefined(); + await expect(fs.stat(path.join(testDir, `${oldSessionId}.jsonl`))).rejects.toThrow(); + await expect(fs.stat(path.join(testDir, `${newSessionId}.jsonl`))).resolves.toBeDefined(); + }); + + it("uses projected sessions.json size to avoid over-eviction", async () => { + mockLoadConfig.mockReturnValue({ + session: { + maintenance: { + mode: "enforce", + pruneAfter: "365d", + maxEntries: 100, + rotateBytes: 10_485_760, + maxDiskBytes: 900, + highWaterBytes: 700, + }, + }, + }); + + // Simulate a stale oversized on-disk sessions.json from a previous write. 
+ await fs.writeFile(storePath, JSON.stringify({ noisy: "x".repeat(10_000) }), "utf-8"); + + const now = Date.now(); + const store: Record = { + older: { sessionId: "older", updatedAt: now - DAY_MS }, + newer: { sessionId: "newer", updatedAt: now }, + }; + await fs.writeFile(path.join(testDir, "older.jsonl"), "x".repeat(80), "utf-8"); + await fs.writeFile(path.join(testDir, "newer.jsonl"), "y".repeat(80), "utf-8"); + + await saveSessionStore(storePath, store); + + const loaded = loadSessionStore(storePath); + expect(loaded.older).toBeDefined(); + expect(loaded.newer).toBeDefined(); + }); + + it("never deletes transcripts outside the agent sessions directory during budget cleanup", async () => { + mockLoadConfig.mockReturnValue({ + session: { + maintenance: { + mode: "enforce", + pruneAfter: "365d", + maxEntries: 100, + rotateBytes: 10_485_760, + maxDiskBytes: 500, + highWaterBytes: 300, + }, + }, + }); + + const now = Date.now(); + const externalDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-external-session-")); + const externalTranscript = path.join(externalDir, "outside.jsonl"); + await fs.writeFile(externalTranscript, "z".repeat(400), "utf-8"); + + const store: Record = { + older: { + sessionId: "outside", + sessionFile: externalTranscript, + updatedAt: now - DAY_MS, + }, + newer: { + sessionId: "inside", + updatedAt: now, + }, + }; + await fs.writeFile(path.join(testDir, "inside.jsonl"), "i".repeat(400), "utf-8"); + + try { + await saveSessionStore(storePath, store); + await expect(fs.stat(externalTranscript)).resolves.toBeDefined(); + } finally { + await fs.rm(externalDir, { recursive: true, force: true }); + } + }); }); diff --git a/src/config/sessions/store.ts b/src/config/sessions/store.ts index 54b0c0c70e0..378b9737bc9 100644 --- a/src/config/sessions/store.ts +++ b/src/config/sessions/store.ts @@ -20,6 +20,7 @@ import { import { getFileMtimeMs, isCacheEnabled, resolveCacheTtlMs } from "../cache-utils.js"; import { loadConfig } from 
"../config.js"; import type { SessionMaintenanceConfig, SessionMaintenanceMode } from "../types.base.js"; +import { enforceSessionDiskBudget } from "./disk-budget.js"; import { deriveSessionMetaPatch } from "./metadata.js"; import { mergeSessionEntry, type SessionEntry } from "./types.js"; @@ -299,6 +300,7 @@ const DEFAULT_SESSION_PRUNE_AFTER_MS = 30 * 24 * 60 * 60 * 1000; const DEFAULT_SESSION_MAX_ENTRIES = 500; const DEFAULT_SESSION_ROTATE_BYTES = 10_485_760; // 10 MB const DEFAULT_SESSION_MAINTENANCE_MODE: SessionMaintenanceMode = "warn"; +const DEFAULT_SESSION_DISK_BUDGET_HIGH_WATER_RATIO = 0.8; export type SessionMaintenanceWarning = { activeSessionKey: string; @@ -315,6 +317,9 @@ type ResolvedSessionMaintenanceConfig = { pruneAfterMs: number; maxEntries: number; rotateBytes: number; + resetArchiveRetentionMs: number | null; + maxDiskBytes: number | null; + highWaterBytes: number | null; }; function resolvePruneAfterMs(maintenance?: SessionMaintenanceConfig): number { @@ -341,6 +346,70 @@ function resolveRotateBytes(maintenance?: SessionMaintenanceConfig): number { } } +function resolveResetArchiveRetentionMs( + maintenance: SessionMaintenanceConfig | undefined, + pruneAfterMs: number, +): number | null { + const raw = maintenance?.resetArchiveRetention; + if (raw === false) { + return null; + } + if (raw === undefined || raw === null || raw === "") { + return pruneAfterMs; + } + try { + return parseDurationMs(String(raw).trim(), { defaultUnit: "d" }); + } catch { + return pruneAfterMs; + } +} + +function resolveMaxDiskBytes(maintenance?: SessionMaintenanceConfig): number | null { + const raw = maintenance?.maxDiskBytes; + if (raw === undefined || raw === null || raw === "") { + return null; + } + try { + return parseByteSize(String(raw).trim(), { defaultUnit: "b" }); + } catch { + return null; + } +} + +function resolveHighWaterBytes( + maintenance: SessionMaintenanceConfig | undefined, + maxDiskBytes: number | null, +): number | null { + const computeDefault 
= () => { + if (maxDiskBytes == null) { + return null; + } + if (maxDiskBytes <= 0) { + return 0; + } + return Math.max( + 1, + Math.min( + maxDiskBytes, + Math.floor(maxDiskBytes * DEFAULT_SESSION_DISK_BUDGET_HIGH_WATER_RATIO), + ), + ); + }; + if (maxDiskBytes == null) { + return null; + } + const raw = maintenance?.highWaterBytes; + if (raw === undefined || raw === null || raw === "") { + return computeDefault(); + } + try { + const parsed = parseByteSize(String(raw).trim(), { defaultUnit: "b" }); + return Math.min(parsed, maxDiskBytes); + } catch { + return computeDefault(); + } +} + /** * Resolve maintenance settings from openclaw.json (`session.maintenance`). * Falls back to built-in defaults when config is missing or unset. @@ -352,11 +421,16 @@ export function resolveMaintenanceConfig(): ResolvedSessionMaintenanceConfig { } catch { // Config may not be available (e.g. in tests). Use defaults. } + const pruneAfterMs = resolvePruneAfterMs(maintenance); + const maxDiskBytes = resolveMaxDiskBytes(maintenance); return { mode: maintenance?.mode ?? DEFAULT_SESSION_MAINTENANCE_MODE, - pruneAfterMs: resolvePruneAfterMs(maintenance), + pruneAfterMs, maxEntries: maintenance?.maxEntries ?? DEFAULT_SESSION_MAX_ENTRIES, rotateBytes: resolveRotateBytes(maintenance), + resetArchiveRetentionMs: resolveResetArchiveRetentionMs(maintenance, pruneAfterMs), + maxDiskBytes, + highWaterBytes: resolveHighWaterBytes(maintenance, maxDiskBytes), }; } @@ -439,7 +513,10 @@ export function getActiveSessionMaintenanceWarning(params: { export function capEntryCount( store: Record, overrideMax?: number, - opts: { log?: boolean } = {}, + opts: { + log?: boolean; + onCapped?: (params: { key: string; entry: SessionEntry }) => void; + } = {}, ): number { const maxEntries = overrideMax ?? 
resolveMaintenanceConfig().maxEntries; const keys = Object.keys(store); @@ -456,6 +533,10 @@ export function capEntryCount( const toRemove = sorted.slice(maxEntries); for (const key of toRemove) { + const entry = store[key]; + if (entry) { + opts.onCapped?.({ key, entry }); + } delete store[key]; } if (opts.log !== false) { @@ -539,6 +620,8 @@ type SaveSessionStoreOptions = { activeSessionKey?: string; /** Optional callback for warn-only maintenance. */ onWarn?: (warning: SessionMaintenanceWarning) => void | Promise<void>; + /** Optional overrides used by maintenance commands. */ + maintenanceOverride?: Partial<ResolvedSessionMaintenanceConfig>; }; async function saveSessionStoreUnlocked( @@ -553,7 +636,7 @@ async function saveSessionStoreUnlocked( if (!opts?.skipMaintenance) { // Resolve maintenance config once (avoids repeated loadConfig() calls). - const maintenance = resolveMaintenanceConfig(); + const maintenance = { ...resolveMaintenanceConfig(), ...opts?.maintenanceOverride }; const shouldWarnOnly = maintenance.mode === "warn"; if (shouldWarnOnly) { @@ -576,39 +659,80 @@ async function saveSessionStoreUnlocked( await opts?.onWarn?.(warning); } } + await enforceSessionDiskBudget({ + store, + storePath, + activeSessionKey: opts?.activeSessionKey, + maintenance, + warnOnly: true, + log, + }); } else { // Prune stale entries and cap total count before serializing. 
- const prunedSessionFiles = new Map(); + const removedSessionFiles = new Map(); pruneStaleEntries(store, maintenance.pruneAfterMs, { onPruned: ({ entry }) => { - if (!prunedSessionFiles.has(entry.sessionId) || entry.sessionFile) { - prunedSessionFiles.set(entry.sessionId, entry.sessionFile); + if (!removedSessionFiles.has(entry.sessionId) || entry.sessionFile) { + removedSessionFiles.set(entry.sessionId, entry.sessionFile); + } + }, + }); + capEntryCount(store, maintenance.maxEntries, { + onCapped: ({ entry }) => { + if (!removedSessionFiles.has(entry.sessionId) || entry.sessionFile) { + removedSessionFiles.set(entry.sessionId, entry.sessionFile); } }, }); - capEntryCount(store, maintenance.maxEntries); const archivedDirs = new Set(); - for (const [sessionId, sessionFile] of prunedSessionFiles) { + const referencedSessionIds = new Set( + Object.values(store) + .map((entry) => entry?.sessionId) + .filter((id): id is string => Boolean(id)), + ); + for (const [sessionId, sessionFile] of removedSessionFiles) { + if (referencedSessionIds.has(sessionId)) { + continue; + } const archived = archiveSessionTranscripts({ sessionId, storePath, sessionFile, reason: "deleted", + restrictToStoreDir: true, }); for (const archivedPath of archived) { archivedDirs.add(path.dirname(archivedPath)); } } - if (archivedDirs.size > 0) { + if (archivedDirs.size > 0 || maintenance.resetArchiveRetentionMs != null) { + const targetDirs = + archivedDirs.size > 0 ? [...archivedDirs] : [path.dirname(path.resolve(storePath))]; await cleanupArchivedSessionTranscripts({ - directories: [...archivedDirs], + directories: targetDirs, olderThanMs: maintenance.pruneAfterMs, reason: "deleted", }); + if (maintenance.resetArchiveRetentionMs != null) { + await cleanupArchivedSessionTranscripts({ + directories: targetDirs, + olderThanMs: maintenance.resetArchiveRetentionMs, + reason: "reset", + }); + } } // Rotate the on-disk file if it exceeds the size threshold. 
await rotateSessionFile(storePath, maintenance.rotateBytes); + + await enforceSessionDiskBudget({ + store, + storePath, + activeSessionKey: opts?.activeSessionKey, + maintenance, + warnOnly: false, + log, + }); } } diff --git a/src/config/types.base.ts b/src/config/types.base.ts index 1f59ed08069..cb1b926b53f 100644 --- a/src/config/types.base.ts +++ b/src/config/types.base.ts @@ -137,6 +137,21 @@ export type SessionMaintenanceConfig = { maxEntries?: number; /** Rotate sessions.json when it exceeds this size (e.g. "10mb"). Default: 10mb. */ rotateBytes?: number | string; + /** + * Retention for archived reset transcripts (`*.reset.`). + * Set `false` to disable reset-archive cleanup. Default: same as `pruneAfter` (30d). + */ + resetArchiveRetention?: string | number | false; + /** + * Optional per-agent sessions-directory disk budget (e.g. "500mb"). + * When exceeded, warn (mode=warn) or enforce oldest-first cleanup (mode=enforce). + */ + maxDiskBytes?: number | string; + /** + * Target size after disk-budget cleanup (high-water mark), e.g. "400mb". + * Default: 80% of maxDiskBytes. + */ + highWaterBytes?: number | string; }; export type LoggingConfig = { diff --git a/src/config/types.cron.ts b/src/config/types.cron.ts index 45a8b715103..300e0c2ceef 100644 --- a/src/config/types.cron.ts +++ b/src/config/types.cron.ts @@ -15,4 +15,12 @@ export type CronConfig = { * Default: "24h". */ sessionRetention?: string | false; + /** + * Run-log pruning controls for `cron/runs/.jsonl`. + * Defaults: `maxBytes=2_000_000`, `keepLines=2000`. 
+ */ + runLog?: { + maxBytes?: number | string; + keepLines?: number; + }; }; diff --git a/src/config/zod-schema.cron-retention.test.ts b/src/config/zod-schema.cron-retention.test.ts new file mode 100644 index 00000000000..a3733872956 --- /dev/null +++ b/src/config/zod-schema.cron-retention.test.ts @@ -0,0 +1,40 @@ +import { describe, expect, it } from "vitest"; +import { OpenClawSchema } from "./zod-schema.js"; + +describe("OpenClawSchema cron retention and run-log validation", () => { + it("accepts valid cron.sessionRetention and runLog values", () => { + expect(() => + OpenClawSchema.parse({ + cron: { + sessionRetention: "1h30m", + runLog: { + maxBytes: "5mb", + keepLines: 2500, + }, + }, + }), + ).not.toThrow(); + }); + + it("rejects invalid cron.sessionRetention", () => { + expect(() => + OpenClawSchema.parse({ + cron: { + sessionRetention: "abc", + }, + }), + ).toThrow(/sessionRetention|duration/i); + }); + + it("rejects invalid cron.runLog.maxBytes", () => { + expect(() => + OpenClawSchema.parse({ + cron: { + runLog: { + maxBytes: "wat", + }, + }, + }), + ).toThrow(/runLog|maxBytes|size/i); + }); +}); diff --git a/src/config/zod-schema.session-maintenance-extensions.test.ts b/src/config/zod-schema.session-maintenance-extensions.test.ts new file mode 100644 index 00000000000..6efe8b39907 --- /dev/null +++ b/src/config/zod-schema.session-maintenance-extensions.test.ts @@ -0,0 +1,44 @@ +import { describe, expect, it } from "vitest"; +import { SessionSchema } from "./zod-schema.session.js"; + +describe("SessionSchema maintenance extensions", () => { + it("accepts valid maintenance extensions", () => { + expect(() => + SessionSchema.parse({ + maintenance: { + resetArchiveRetention: "14d", + maxDiskBytes: "500mb", + highWaterBytes: "350mb", + }, + }), + ).not.toThrow(); + }); + + it("accepts disabling reset archive cleanup", () => { + expect(() => + SessionSchema.parse({ + maintenance: { + resetArchiveRetention: false, + }, + }), + ).not.toThrow(); + }); + + 
it("rejects invalid maintenance extension values", () => { + expect(() => + SessionSchema.parse({ + maintenance: { + resetArchiveRetention: "never", + }, + }), + ).toThrow(/resetArchiveRetention|duration/i); + + expect(() => + SessionSchema.parse({ + maintenance: { + maxDiskBytes: "big", + }, + }), + ).toThrow(/maxDiskBytes|size/i); + }); +}); diff --git a/src/config/zod-schema.session.ts b/src/config/zod-schema.session.ts index 0f38fafd887..5af707b2804 100644 --- a/src/config/zod-schema.session.ts +++ b/src/config/zod-schema.session.ts @@ -75,6 +75,9 @@ export const SessionSchema = z pruneDays: z.number().int().positive().optional(), maxEntries: z.number().int().positive().optional(), rotateBytes: z.union([z.string(), z.number()]).optional(), + resetArchiveRetention: z.union([z.string(), z.number(), z.literal(false)]).optional(), + maxDiskBytes: z.union([z.string(), z.number()]).optional(), + highWaterBytes: z.union([z.string(), z.number()]).optional(), }) .strict() .superRefine((val, ctx) => { @@ -100,6 +103,39 @@ export const SessionSchema = z }); } } + if (val.resetArchiveRetention !== undefined && val.resetArchiveRetention !== false) { + try { + parseDurationMs(String(val.resetArchiveRetention).trim(), { defaultUnit: "d" }); + } catch { + ctx.addIssue({ + code: z.ZodIssueCode.custom, + path: ["resetArchiveRetention"], + message: "invalid duration (use ms, s, m, h, d)", + }); + } + } + if (val.maxDiskBytes !== undefined) { + try { + parseByteSize(String(val.maxDiskBytes).trim(), { defaultUnit: "b" }); + } catch { + ctx.addIssue({ + code: z.ZodIssueCode.custom, + path: ["maxDiskBytes"], + message: "invalid size (use b, kb, mb, gb, tb)", + }); + } + } + if (val.highWaterBytes !== undefined) { + try { + parseByteSize(String(val.highWaterBytes).trim(), { defaultUnit: "b" }); + } catch { + ctx.addIssue({ + code: z.ZodIssueCode.custom, + path: ["highWaterBytes"], + message: "invalid size (use b, kb, mb, gb, tb)", + }); + } + } }) .optional(), }) diff --git 
a/src/config/zod-schema.ts b/src/config/zod-schema.ts index 8c5db8696c9..d29ea965308 100644 --- a/src/config/zod-schema.ts +++ b/src/config/zod-schema.ts @@ -1,4 +1,6 @@ import { z } from "zod"; +import { parseByteSize } from "../cli/parse-bytes.js"; +import { parseDurationMs } from "../cli/parse-duration.js"; import { ToolsSchema } from "./zod-schema.agent-runtime.js"; import { AgentsSchema, AudioSchema, BindingsSchema, BroadcastSchema } from "./zod-schema.agents.js"; import { ApprovalsSchema } from "./zod-schema.approvals.js"; @@ -324,8 +326,39 @@ export const OpenClawSchema = z webhook: HttpUrlSchema.optional(), webhookToken: z.string().optional().register(sensitive), sessionRetention: z.union([z.string(), z.literal(false)]).optional(), + runLog: z + .object({ + maxBytes: z.union([z.string(), z.number()]).optional(), + keepLines: z.number().int().positive().optional(), + }) + .strict() + .optional(), }) .strict() + .superRefine((val, ctx) => { + if (val.sessionRetention !== undefined && val.sessionRetention !== false) { + try { + parseDurationMs(String(val.sessionRetention).trim(), { defaultUnit: "h" }); + } catch { + ctx.addIssue({ + code: z.ZodIssueCode.custom, + path: ["sessionRetention"], + message: "invalid duration (use ms, s, m, h, d)", + }); + } + } + if (val.runLog?.maxBytes !== undefined) { + try { + parseByteSize(String(val.runLog.maxBytes).trim(), { defaultUnit: "b" }); + } catch { + ctx.addIssue({ + code: z.ZodIssueCode.custom, + path: ["runLog", "maxBytes"], + message: "invalid size (use b, kb, mb, gb, tb)", + }); + } + } + }) .optional(), hooks: z .object({ diff --git a/src/cron/run-log.test.ts b/src/cron/run-log.test.ts index 45c3b75b0df..f4eba5fe519 100644 --- a/src/cron/run-log.test.ts +++ b/src/cron/run-log.test.ts @@ -4,12 +4,40 @@ import path from "node:path"; import { describe, expect, it } from "vitest"; import { appendCronRunLog, + DEFAULT_CRON_RUN_LOG_KEEP_LINES, + DEFAULT_CRON_RUN_LOG_MAX_BYTES, getPendingCronRunLogWriteCountForTests, 
readCronRunLogEntries, + resolveCronRunLogPruneOptions, resolveCronRunLogPath, } from "./run-log.js"; describe("cron run log", () => { + it("resolves prune options from config with defaults", () => { + expect(resolveCronRunLogPruneOptions()).toEqual({ + maxBytes: DEFAULT_CRON_RUN_LOG_MAX_BYTES, + keepLines: DEFAULT_CRON_RUN_LOG_KEEP_LINES, + }); + expect( + resolveCronRunLogPruneOptions({ + maxBytes: "5mb", + keepLines: 123, + }), + ).toEqual({ + maxBytes: 5 * 1024 * 1024, + keepLines: 123, + }); + expect( + resolveCronRunLogPruneOptions({ + maxBytes: "invalid", + keepLines: -1, + }), + ).toEqual({ + maxBytes: DEFAULT_CRON_RUN_LOG_MAX_BYTES, + keepLines: DEFAULT_CRON_RUN_LOG_KEEP_LINES, + }); + }); + async function withRunLogDir(prefix: string, run: (dir: string) => Promise<void>) { const dir = await fs.mkdtemp(path.join(os.tmpdir(), prefix)); try { diff --git a/src/cron/run-log.ts b/src/cron/run-log.ts index 6b3240b58c6..44f36446a1a 100644 --- a/src/cron/run-log.ts +++ b/src/cron/run-log.ts @@ -1,5 +1,7 @@ import fs from "node:fs/promises"; import path from "node:path"; +import { parseByteSize } from "../cli/parse-bytes.js"; +import type { CronConfig } from "../config/types.cron.js"; import type { CronDeliveryStatus, CronRunStatus, CronRunTelemetry } from "./types.js"; export type CronRunLogEntry = { @@ -73,6 +75,30 @@ export function resolveCronRunLogPath(params: { storePath: string; jobId: string const writesByPath = new Map<string, Promise<void>>(); +export const DEFAULT_CRON_RUN_LOG_MAX_BYTES = 2_000_000; +export const DEFAULT_CRON_RUN_LOG_KEEP_LINES = 2_000; + +export function resolveCronRunLogPruneOptions(cfg?: CronConfig["runLog"]): { + maxBytes: number; + keepLines: number; +} { + let maxBytes = DEFAULT_CRON_RUN_LOG_MAX_BYTES; + if (cfg?.maxBytes !== undefined) { + try { + maxBytes = parseByteSize(String(cfg.maxBytes).trim(), { defaultUnit: "b" }); + } catch { + maxBytes = DEFAULT_CRON_RUN_LOG_MAX_BYTES; + } + } + + let keepLines = DEFAULT_CRON_RUN_LOG_KEEP_LINES; + if (typeof
cfg?.keepLines === "number" && Number.isFinite(cfg.keepLines) && cfg.keepLines > 0) { + keepLines = Math.floor(cfg.keepLines); + } + + return { maxBytes, keepLines }; +} + export function getPendingCronRunLogWriteCountForTests() { return writesByPath.size; } @@ -108,8 +134,8 @@ export async function appendCronRunLog( await fs.mkdir(path.dirname(resolved), { recursive: true }); await fs.appendFile(resolved, `${JSON.stringify(entry)}\n`, "utf-8"); await pruneIfNeeded(resolved, { - maxBytes: opts?.maxBytes ?? 2_000_000, - keepLines: opts?.keepLines ?? 2_000, + maxBytes: opts?.maxBytes ?? DEFAULT_CRON_RUN_LOG_MAX_BYTES, + keepLines: opts?.keepLines ?? DEFAULT_CRON_RUN_LOG_KEEP_LINES, }); }); writesByPath.set(resolved, next); diff --git a/src/cron/session-reaper.test.ts b/src/cron/session-reaper.test.ts index 0da7cffff95..8797e54d672 100644 --- a/src/cron/session-reaper.test.ts +++ b/src/cron/session-reaper.test.ts @@ -109,6 +109,61 @@ describe("sweepCronRunSessions", () => { expect(updated["agent:main:telegram:dm:123"]).toBeDefined(); }); + it("archives transcript files for pruned run sessions that are no longer referenced", async () => { + const now = Date.now(); + const runSessionId = "old-run"; + const runTranscript = path.join(tmpDir, `${runSessionId}.jsonl`); + fs.writeFileSync(runTranscript, '{"type":"session"}\n'); + const store: Record = { + "agent:main:cron:job1:run:old-run": { + sessionId: runSessionId, + updatedAt: now - 25 * 3_600_000, + }, + }; + fs.writeFileSync(storePath, JSON.stringify(store)); + + const result = await sweepCronRunSessions({ + sessionStorePath: storePath, + nowMs: now, + log, + force: true, + }); + + expect(result.pruned).toBe(1); + expect(fs.existsSync(runTranscript)).toBe(false); + const files = fs.readdirSync(tmpDir); + expect(files.some((name) => name.startsWith(`${runSessionId}.jsonl.deleted.`))).toBe(true); + }); + + it("does not archive external transcript paths for pruned runs", async () => { + const now = Date.now(); + const 
externalDir = fs.mkdtempSync(path.join(os.tmpdir(), "cron-reaper-external-")); + const externalTranscript = path.join(externalDir, "outside.jsonl"); + fs.writeFileSync(externalTranscript, '{"type":"session"}\n'); + const store: Record = { + "agent:main:cron:job1:run:old-run": { + sessionId: "old-run", + sessionFile: externalTranscript, + updatedAt: now - 25 * 3_600_000, + }, + }; + fs.writeFileSync(storePath, JSON.stringify(store)); + + try { + const result = await sweepCronRunSessions({ + sessionStorePath: storePath, + nowMs: now, + log, + force: true, + }); + + expect(result.pruned).toBe(1); + expect(fs.existsSync(externalTranscript)).toBe(true); + } finally { + fs.rmSync(externalDir, { recursive: true, force: true }); + } + }); + it("respects custom retention", async () => { const now = Date.now(); const store: Record = { diff --git a/src/cron/session-reaper.ts b/src/cron/session-reaper.ts index c42236d3645..fa12caa2f56 100644 --- a/src/cron/session-reaper.ts +++ b/src/cron/session-reaper.ts @@ -6,9 +6,14 @@ * run records. The base session (`...:cron:`) is kept as-is. */ +import path from "node:path"; import { parseDurationMs } from "../cli/parse-duration.js"; -import { updateSessionStore } from "../config/sessions.js"; +import { loadSessionStore, updateSessionStore } from "../config/sessions.js"; import type { CronConfig } from "../config/types.cron.js"; +import { + archiveSessionTranscripts, + cleanupArchivedSessionTranscripts, +} from "../gateway/session-utils.fs.js"; import { isCronRunSessionKey } from "../sessions/session-key-utils.js"; import type { Logger } from "./service/state.js"; @@ -74,6 +79,7 @@ export async function sweepCronRunSessions(params: { } let pruned = 0; + const prunedSessions = new Map(); try { await updateSessionStore(storePath, (store) => { const cutoff = now - retentionMs; @@ -87,6 +93,9 @@ export async function sweepCronRunSessions(params: { } const updatedAt = entry.updatedAt ?? 
0; if (updatedAt < cutoff) { + if (!prunedSessions.has(entry.sessionId) || entry.sessionFile) { + prunedSessions.set(entry.sessionId, entry.sessionFile); + } delete store[key]; pruned++; } @@ -99,6 +108,43 @@ lastSweepAtMsByStore.set(storePath, now); + if (prunedSessions.size > 0) { + try { + const store = loadSessionStore(storePath, { skipCache: true }); + const referencedSessionIds = new Set( + Object.values(store) + .map((entry) => entry?.sessionId) + .filter((id): id is string => Boolean(id)), + ); + const archivedDirs = new Set<string>(); + for (const [sessionId, sessionFile] of prunedSessions) { + if (referencedSessionIds.has(sessionId)) { + continue; + } + const archived = archiveSessionTranscripts({ + sessionId, + storePath, + sessionFile, + reason: "deleted", + restrictToStoreDir: true, + }); + for (const archivedPath of archived) { + archivedDirs.add(path.dirname(archivedPath)); + } + } + if (archivedDirs.size > 0) { + await cleanupArchivedSessionTranscripts({ + directories: [...archivedDirs], + olderThanMs: retentionMs, + reason: "deleted", + nowMs: now, + }); + } + } catch (err) { + params.log.warn({ err: String(err) }, "cron-reaper: transcript cleanup failed"); + } + } + if (pruned > 0) { params.log.info( { pruned, retentionMs }, diff --git a/src/gateway/server-cron.ts b/src/gateway/server-cron.ts index be6f63ed134..c97d90b99f3 100644 --- a/src/gateway/server-cron.ts +++ b/src/gateway/server-cron.ts @@ -8,7 +8,11 @@ } from "../config/sessions.js"; import { resolveStorePath } from "../config/sessions/paths.js"; import { runCronIsolatedAgentTurn } from "../cron/isolated-agent.js"; -import { appendCronRunLog, resolveCronRunLogPath } from "../cron/run-log.js"; +import { + appendCronRunLog, + resolveCronRunLogPath, + resolveCronRunLogPruneOptions, +} from "../cron/run-log.js"; import { CronService } from "../cron/service.js"; import { resolveCronStorePath } from "../cron/store.js"; import {
normalizeHttpWebhookUrl } from "../cron/webhook-url.js"; @@ -144,6 +148,7 @@ export function buildGatewayCronService(params: { }; const defaultAgentId = resolveDefaultAgentId(params.cfg); + const runLogPrune = resolveCronRunLogPruneOptions(params.cfg.cron?.runLog); const resolveSessionStorePath = (agentId?: string) => resolveStorePath(params.cfg.session?.store, { agentId: agentId ?? defaultAgentId, @@ -289,25 +294,29 @@ export function buildGatewayCronService(params: { storePath, jobId: evt.jobId, }); - void appendCronRunLog(logPath, { - ts: Date.now(), - jobId: evt.jobId, - action: "finished", - status: evt.status, - error: evt.error, - summary: evt.summary, - delivered: evt.delivered, - deliveryStatus: evt.deliveryStatus, - deliveryError: evt.deliveryError, - sessionId: evt.sessionId, - sessionKey: evt.sessionKey, - runAtMs: evt.runAtMs, - durationMs: evt.durationMs, - nextRunAtMs: evt.nextRunAtMs, - model: evt.model, - provider: evt.provider, - usage: evt.usage, - }).catch((err) => { + void appendCronRunLog( + logPath, + { + ts: Date.now(), + jobId: evt.jobId, + action: "finished", + status: evt.status, + error: evt.error, + summary: evt.summary, + delivered: evt.delivered, + deliveryStatus: evt.deliveryStatus, + deliveryError: evt.deliveryError, + sessionId: evt.sessionId, + sessionKey: evt.sessionKey, + runAtMs: evt.runAtMs, + durationMs: evt.durationMs, + nextRunAtMs: evt.nextRunAtMs, + model: evt.model, + provider: evt.provider, + usage: evt.usage, + }, + runLogPrune, + ).catch((err) => { cronLogger.warn({ err: String(err), logPath }, "cron: run log append failed"); }); } diff --git a/src/gateway/session-utils.fs.ts b/src/gateway/session-utils.fs.ts index 6aa0308eccf..acf14859469 100644 --- a/src/gateway/session-utils.fs.ts +++ b/src/gateway/session-utils.fs.ts @@ -2,6 +2,9 @@ import fs from "node:fs"; import os from "node:os"; import path from "node:path"; import { + formatSessionArchiveTimestamp, + parseSessionArchiveTimestamp, + type SessionArchiveReason, 
resolveSessionFilePath, resolveSessionTranscriptPath, resolveSessionTranscriptPathInDir, @@ -159,10 +162,10 @@ export function resolveSessionTranscriptCandidates( return Array.from(new Set(candidates)); } -export type ArchiveFileReason = "bak" | "reset" | "deleted"; +export type ArchiveFileReason = SessionArchiveReason; export function archiveFileOnDisk(filePath: string, reason: ArchiveFileReason): string { - const ts = new Date().toISOString().replaceAll(":", "-"); + const ts = formatSessionArchiveTimestamp(); const archived = `${filePath}.${reason}.${ts}`; fs.renameSync(filePath, archived); return archived; @@ -178,14 +181,27 @@ export function archiveSessionTranscripts(opts: { sessionFile?: string; agentId?: string; reason: "reset" | "deleted"; + /** + * When true, only archive files resolved under the session store directory. + * This prevents maintenance operations from mutating paths outside the agent sessions dir. + */ + restrictToStoreDir?: boolean; }): string[] { const archived: string[] = []; + const storeDir = + opts.restrictToStoreDir && opts.storePath ? 
path.resolve(path.dirname(opts.storePath)) : null; for (const candidate of resolveSessionTranscriptCandidates( opts.sessionId, opts.storePath, opts.sessionFile, opts.agentId, )) { + if (storeDir) { + const relative = path.relative(storeDir, path.resolve(candidate)); + if (!relative || relative.startsWith("..") || path.isAbsolute(relative)) { + continue; + } + } if (!fs.existsSync(candidate)) { continue; } @@ -198,32 +214,10 @@ export function archiveSessionTranscripts(opts: { return archived; } -function restoreArchiveTimestamp(raw: string): string { - const [datePart, timePart] = raw.split("T"); - if (!datePart || !timePart) { - return raw; - } - return `${datePart}T${timePart.replace(/-/g, ":")}`; -} - -function parseArchivedTimestamp(fileName: string, reason: ArchiveFileReason): number | null { - const marker = `.${reason}.`; - const index = fileName.lastIndexOf(marker); - if (index < 0) { - return null; - } - const raw = fileName.slice(index + marker.length); - if (!raw) { - return null; - } - const timestamp = Date.parse(restoreArchiveTimestamp(raw)); - return Number.isNaN(timestamp) ? null : timestamp; -} - export async function cleanupArchivedSessionTranscripts(opts: { directories: string[]; olderThanMs: number; - reason?: "deleted"; + reason?: ArchiveFileReason; nowMs?: number; }): Promise<{ removed: number; scanned: number }> { if (!Number.isFinite(opts.olderThanMs) || opts.olderThanMs < 0) { @@ -238,7 +232,7 @@ export async function cleanupArchivedSessionTranscripts(opts: { for (const dir of directories) { const entries = await fs.promises.readdir(dir).catch(() => []); for (const entry of entries) { - const timestamp = parseArchivedTimestamp(entry, reason); + const timestamp = parseSessionArchiveTimestamp(entry, reason); if (timestamp == null) { continue; }