From c6ecd2a0445b6fd8c1bd71f07afeb962db98084d Mon Sep 17 00:00:00 2001
From: Kentaro Kuribayashi
Date: Fri, 13 Feb 2026 13:12:59 +0900
Subject: [PATCH] fix: replace file-based session store lock with in-process Promise chain mutex (#14498)

* fix: replace file-based session store lock with in-process Promise chain mutex

Node.js is single-threaded, so file-based locking (open('wx') + polling +
stale eviction) is unnecessary and causes timeouts under heavy session load.
Replace with a simple per-storePath Promise chain that serializes access
without any filesystem overhead.

In a 1159-session environment over 3 hours:
- Lock timeouts: 25
- Stuck sessions: 157 (max 1031s, avg 388s)
- Slow listeners: 39 (max 265s, avg 70s)

Root cause: during sessions.json file I/O, await yields control and other
lock requests hit the 10s timeout waiting for the .lock file to be released.

* test: add comprehensive tests for Promise chain mutex lock

- Concurrent access serialization (10 parallel writers, counter integrity)
- Error resilience (single & multiple consecutive throws don't poison queue)
- Independent storePath parallelism (different paths run concurrently)
- LOCK_QUEUES cleanup after completion and after errors
- No .lock file created on disk

Also fix: store caught promise in LOCK_QUEUES to avoid unhandled rejection
warnings when queued fn() throws.

* fix: add timeout to Promise chain mutex to prevent infinite hangs on Windows * fix(session-store): enforce strict queue timeout + cross-process lock --------- Co-authored-by: Peter Steinberger --- src/config/sessions/store.lock.test.ts | 296 +++++++++++++++++++++++++ src/config/sessions/store.ts | 210 +++++++++++++----- 2 files changed, 449 insertions(+), 57 deletions(-) create mode 100644 src/config/sessions/store.lock.test.ts diff --git a/src/config/sessions/store.lock.test.ts b/src/config/sessions/store.lock.test.ts new file mode 100644 index 00000000000..f8a82f7aed5 --- /dev/null +++ b/src/config/sessions/store.lock.test.ts @@ -0,0 +1,296 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; +import { afterEach, describe, expect, it } from "vitest"; +import type { SessionEntry } from "./types.js"; +import { sleep } from "../../utils.js"; +import { + clearSessionStoreCacheForTest, + getSessionStoreLockQueueSizeForTest, + loadSessionStore, + updateSessionStore, + updateSessionStoreEntry, + withSessionStoreLockForTest, +} from "../sessions.js"; + +describe("session store lock (Promise chain mutex)", () => { + let tmpDirs: string[] = []; + + async function makeTmpStore( + initial: Record = {}, + ): Promise<{ dir: string; storePath: string }> { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-lock-test-")); + tmpDirs.push(dir); + const storePath = path.join(dir, "sessions.json"); + if (Object.keys(initial).length > 0) { + await fs.writeFile(storePath, JSON.stringify(initial, null, 2), "utf-8"); + } + return { dir, storePath }; + } + + afterEach(async () => { + clearSessionStoreCacheForTest(); + for (const dir of tmpDirs) { + await fs.rm(dir, { recursive: true, force: true }).catch(() => undefined); + } + tmpDirs = []; + }); + + // ── 1. 
Concurrent access does not corrupt data ────────────────────── + + it("serializes concurrent updateSessionStore calls without data loss", async () => { + const key = "agent:main:test"; + const { storePath } = await makeTmpStore({ + [key]: { sessionId: "s1", updatedAt: 100, counter: 0 }, + }); + + // Launch 10 concurrent read-modify-write cycles. + const N = 10; + await Promise.all( + Array.from({ length: N }, (_, i) => + updateSessionStore(storePath, async (store) => { + const entry = store[key] as Record; + // Simulate async work so that without proper serialization + // multiple readers would see the same stale value. + await sleep(Math.random() * 20); + entry.counter = (entry.counter as number) + 1; + entry.tag = `writer-${i}`; + }), + ), + ); + + const store = loadSessionStore(storePath); + expect((store[key] as Record).counter).toBe(N); + }); + + it("concurrent updateSessionStoreEntry patches all merge correctly", async () => { + const key = "agent:main:merge"; + const { storePath } = await makeTmpStore({ + [key]: { sessionId: "s1", updatedAt: 100 }, + }); + + await Promise.all([ + updateSessionStoreEntry({ + storePath, + sessionKey: key, + update: async () => { + await sleep(30); + return { modelOverride: "model-a" }; + }, + }), + updateSessionStoreEntry({ + storePath, + sessionKey: key, + update: async () => { + await sleep(10); + return { thinkingLevel: "high" as const }; + }, + }), + updateSessionStoreEntry({ + storePath, + sessionKey: key, + update: async () => { + await sleep(20); + return { systemPromptOverride: "custom" }; + }, + }), + ]); + + const store = loadSessionStore(storePath); + const entry = store[key]; + expect(entry.modelOverride).toBe("model-a"); + expect(entry.thinkingLevel).toBe("high"); + expect(entry.systemPromptOverride).toBe("custom"); + }); + + // ── 2. 
Error in fn() does not break queue ─────────────────────────── + + it("continues processing queued tasks after a preceding task throws", async () => { + const key = "agent:main:err"; + const { storePath } = await makeTmpStore({ + [key]: { sessionId: "s1", updatedAt: 100 }, + }); + + const errorPromise = updateSessionStore(storePath, async () => { + throw new Error("boom"); + }); + + // Queue a second write immediately after the failing one. + const successPromise = updateSessionStore(storePath, async (store) => { + store[key] = { ...store[key], modelOverride: "after-error" } as unknown as SessionEntry; + }); + + await expect(errorPromise).rejects.toThrow("boom"); + await successPromise; // must resolve, not hang or reject + + const store = loadSessionStore(storePath); + expect(store[key]?.modelOverride).toBe("after-error"); + }); + + it("multiple consecutive errors do not permanently poison the queue", async () => { + const key = "agent:main:multi-err"; + const { storePath } = await makeTmpStore({ + [key]: { sessionId: "s1", updatedAt: 100 }, + }); + + const errors = Array.from({ length: 3 }, (_, i) => + updateSessionStore(storePath, async () => { + throw new Error(`fail-${i}`); + }), + ); + + const success = updateSessionStore(storePath, async (store) => { + store[key] = { ...store[key], modelOverride: "recovered" } as unknown as SessionEntry; + }); + + // All error promises reject. + for (const p of errors) { + await expect(p).rejects.toThrow(); + } + // The trailing write succeeds. + await success; + + const store = loadSessionStore(storePath); + expect(store[key]?.modelOverride).toBe("recovered"); + }); + + // ── 3. 
Different storePaths run independently / in parallel ───────── + + it("operations on different storePaths execute concurrently", async () => { + const { storePath: pathA } = await makeTmpStore({ + a: { sessionId: "a", updatedAt: 100 }, + }); + const { storePath: pathB } = await makeTmpStore({ + b: { sessionId: "b", updatedAt: 100 }, + }); + + const order: string[] = []; + + const opA = updateSessionStore(pathA, async (store) => { + order.push("a-start"); + await sleep(50); + store.a = { ...store.a, modelOverride: "done-a" } as unknown as SessionEntry; + order.push("a-end"); + }); + + const opB = updateSessionStore(pathB, async (store) => { + order.push("b-start"); + await sleep(10); + store.b = { ...store.b, modelOverride: "done-b" } as unknown as SessionEntry; + order.push("b-end"); + }); + + await Promise.all([opA, opB]); + + // B should finish before A because they run in parallel and B sleeps less. + expect(order.indexOf("b-end")).toBeLessThan(order.indexOf("a-end")); + + expect(loadSessionStore(pathA).a?.modelOverride).toBe("done-a"); + expect(loadSessionStore(pathB).b?.modelOverride).toBe("done-b"); + }); + + // ── 4. LOCK_QUEUES cleanup ───────────────────────────────────────── + + it("cleans up LOCK_QUEUES entry after all tasks complete", async () => { + const { storePath } = await makeTmpStore({ + x: { sessionId: "x", updatedAt: 100 }, + }); + + await updateSessionStore(storePath, async (store) => { + store.x = { ...store.x, modelOverride: "done" } as unknown as SessionEntry; + }); + + // Allow microtask (finally) to run. + await sleep(0); + + expect(getSessionStoreLockQueueSizeForTest()).toBe(0); + }); + + it("cleans up LOCK_QUEUES entry even after errors", async () => { + const { storePath } = await makeTmpStore({}); + + await updateSessionStore(storePath, async () => { + throw new Error("fail"); + }).catch(() => undefined); + + await sleep(0); + + expect(getSessionStoreLockQueueSizeForTest()).toBe(0); + }); + + // ── 5. 
FIFO order guarantee ────────────────────────────────────────── + + it("executes queued operations in FIFO order", async () => { + const key = "agent:main:fifo"; + const { storePath } = await makeTmpStore({ + [key]: { sessionId: "s1", updatedAt: 100, order: "" }, + }); + + const executionOrder: number[] = []; + + // Queue 5 operations sequentially (no awaiting in between). + const promises = Array.from({ length: 5 }, (_, i) => + updateSessionStore(storePath, async (store) => { + executionOrder.push(i); + const entry = store[key] as Record; + entry.order = ((entry.order as string) || "") + String(i); + }), + ); + + await Promise.all(promises); + + // Execution order must be 0, 1, 2, 3, 4 (FIFO). + expect(executionOrder).toEqual([0, 1, 2, 3, 4]); + + // The store should reflect sequential application. + const store = loadSessionStore(storePath); + expect((store[key] as Record).order).toBe("01234"); + }); + + it("times out queued operations strictly and does not run them later", async () => { + const { storePath } = await makeTmpStore({ + x: { sessionId: "x", updatedAt: 100 }, + }); + let timedOutRan = false; + + const lockHolder = withSessionStoreLockForTest( + storePath, + async () => { + await sleep(80); + }, + { timeoutMs: 2_000 }, + ); + const timedOut = withSessionStoreLockForTest( + storePath, + async () => { + timedOutRan = true; + }, + { timeoutMs: 20 }, + ); + + await expect(timedOut).rejects.toThrow("timeout waiting for session store lock"); + await lockHolder; + await sleep(30); + expect(timedOutRan).toBe(false); + }); + + it("creates and removes lock file while operation runs", async () => { + const key = "agent:main:no-lock-file"; + const { dir, storePath } = await makeTmpStore({ + [key]: { sessionId: "s1", updatedAt: 100 }, + }); + + const write = updateSessionStore(storePath, async (store) => { + await sleep(60); + store[key] = { ...store[key], modelOverride: "v" } as unknown as SessionEntry; + }); + + await sleep(10); + await 
expect(fs.access(`${storePath}.lock`)).resolves.toBeUndefined(); + await write; + + const files = await fs.readdir(dir); + const lockFiles = files.filter((f) => f.endsWith(".lock")); + expect(lockFiles).toHaveLength(0); + }); +}); diff --git a/src/config/sessions/store.ts b/src/config/sessions/store.ts index c8f790b759e..741763b04aa 100644 --- a/src/config/sessions/store.ts +++ b/src/config/sessions/store.ts @@ -3,6 +3,7 @@ import fs from "node:fs"; import path from "node:path"; import type { MsgContext } from "../../auto-reply/templating.js"; import type { SessionMaintenanceConfig, SessionMaintenanceMode } from "../types.base.js"; +import { acquireSessionWriteLock } from "../../agents/session-write-lock.js"; import { parseByteSize } from "../../cli/parse-bytes.js"; import { parseDurationMs } from "../../cli/parse-duration.js"; import { createSubsystemLogger } from "../../logging/subsystem.js"; @@ -115,6 +116,28 @@ function normalizeSessionStore(store: Record): void { export function clearSessionStoreCacheForTest(): void { SESSION_STORE_CACHE.clear(); + for (const queue of LOCK_QUEUES.values()) { + for (const task of queue.pending) { + task.timedOut = true; + if (task.timer) { + clearTimeout(task.timer); + } + } + } + LOCK_QUEUES.clear(); +} + +/** Expose lock queue size for tests. 
*/ +export function getSessionStoreLockQueueSizeForTest(): number { + return LOCK_QUEUES.size; +} + +export async function withSessionStoreLockForTest( + storePath: string, + fn: () => Promise, + opts: SessionStoreLockOptions = {}, +): Promise { + return await withSessionStoreLock(storePath, fn, opts); } type LoadSessionStoreOptions = { @@ -584,76 +607,149 @@ type SessionStoreLockOptions = { staleMs?: number; }; +type SessionStoreLockTask = { + fn: () => Promise; + resolve: (value: unknown) => void; + reject: (reason: unknown) => void; + timeoutAt?: number; + staleMs: number; + timer?: ReturnType; + started: boolean; + timedOut: boolean; +}; + +type SessionStoreLockQueue = { + running: boolean; + pending: SessionStoreLockTask[]; +}; + +const LOCK_QUEUES = new Map(); + +function lockTimeoutError(storePath: string): Error { + return new Error(`timeout waiting for session store lock: ${storePath}`); +} + +function getOrCreateLockQueue(storePath: string): SessionStoreLockQueue { + const existing = LOCK_QUEUES.get(storePath); + if (existing) { + return existing; + } + const created: SessionStoreLockQueue = { running: false, pending: [] }; + LOCK_QUEUES.set(storePath, created); + return created; +} + +function removePendingTask(queue: SessionStoreLockQueue, task: SessionStoreLockTask): void { + const idx = queue.pending.indexOf(task); + if (idx >= 0) { + queue.pending.splice(idx, 1); + } +} + +async function drainSessionStoreLockQueue(storePath: string): Promise { + const queue = LOCK_QUEUES.get(storePath); + if (!queue || queue.running) { + return; + } + queue.running = true; + try { + while (queue.pending.length > 0) { + const task = queue.pending.shift(); + if (!task || task.timedOut) { + continue; + } + + if (task.timer) { + clearTimeout(task.timer); + } + task.started = true; + + const remainingTimeoutMs = + task.timeoutAt != null + ? 
Math.max(0, task.timeoutAt - Date.now()) + : Number.POSITIVE_INFINITY; + if (task.timeoutAt != null && remainingTimeoutMs <= 0) { + task.timedOut = true; + task.reject(lockTimeoutError(storePath)); + continue; + } + + let lock: { release: () => Promise } | undefined; + let result: unknown; + let failed: unknown; + let hasFailure = false; + try { + lock = await acquireSessionWriteLock({ + sessionFile: storePath, + timeoutMs: remainingTimeoutMs, + staleMs: task.staleMs, + }); + result = await task.fn(); + } catch (err) { + hasFailure = true; + failed = err; + } finally { + await lock?.release().catch(() => undefined); + } + if (hasFailure) { + task.reject(failed); + continue; + } + task.resolve(result); + } + } finally { + queue.running = false; + if (queue.pending.length === 0) { + LOCK_QUEUES.delete(storePath); + } else { + queueMicrotask(() => { + void drainSessionStoreLockQueue(storePath); + }); + } + } +} + async function withSessionStoreLock( storePath: string, fn: () => Promise, opts: SessionStoreLockOptions = {}, ): Promise { const timeoutMs = opts.timeoutMs ?? 10_000; - const pollIntervalMs = opts.pollIntervalMs ?? 25; const staleMs = opts.staleMs ?? 30_000; - const lockPath = `${storePath}.lock`; - const startedAt = Date.now(); + // `pollIntervalMs` is retained for API compatibility with older lock options. + void opts.pollIntervalMs; - await fs.promises.mkdir(path.dirname(storePath), { recursive: true }); + const hasTimeout = timeoutMs > 0 && Number.isFinite(timeoutMs); + const timeoutAt = hasTimeout ? Date.now() + timeoutMs : undefined; + const queue = getOrCreateLockQueue(storePath); - while (true) { - try { - const handle = await fs.promises.open(lockPath, "wx"); - try { - await handle.writeFile( - JSON.stringify({ pid: process.pid, startedAt: Date.now() }), - "utf-8", - ); - } catch { - // best-effort - } - await handle.close(); - break; - } catch (err) { - const code = - err && typeof err === "object" && "code" in err - ? 
String((err as { code?: unknown }).code) - : null; - if (code === "ENOENT") { - // Store directory may be deleted/recreated in tests while writes are in-flight. - // Best-effort: recreate the parent dir and retry until timeout. - await fs.promises - .mkdir(path.dirname(storePath), { recursive: true }) - .catch(() => undefined); - await new Promise((r) => setTimeout(r, pollIntervalMs)); - continue; - } - if (code !== "EEXIST") { - throw err; - } + const promise = new Promise((resolve, reject) => { + const task: SessionStoreLockTask = { + fn: async () => await fn(), + resolve: (value) => resolve(value as T), + reject, + timeoutAt, + staleMs, + started: false, + timedOut: false, + }; - const now = Date.now(); - if (now - startedAt > timeoutMs) { - throw new Error(`timeout acquiring session store lock: ${lockPath}`, { cause: err }); - } - - // Best-effort stale lock eviction (e.g. crashed process). - try { - const st = await fs.promises.stat(lockPath); - const ageMs = now - st.mtimeMs; - if (ageMs > staleMs) { - await fs.promises.unlink(lockPath); - continue; + if (hasTimeout) { + task.timer = setTimeout(() => { + if (task.started || task.timedOut) { + return; } - } catch { - // ignore - } - - await new Promise((r) => setTimeout(r, pollIntervalMs)); + task.timedOut = true; + removePendingTask(queue, task); + reject(lockTimeoutError(storePath)); + }, timeoutMs); } - } - try { - return await fn(); - } finally { - await fs.promises.unlink(lockPath).catch(() => undefined); - } + queue.pending.push(task); + void drainSessionStoreLockQueue(storePath); + }); + + return await promise; } export async function updateSessionStoreEntry(params: {