refactor: split session store loader from maintenance

This commit is contained in:
Shakker 2026-04-01 17:13:53 +01:00 committed by Shakker
parent 883a35a38c
commit c2e93c76bd
6 changed files with 155 additions and 145 deletions

View File

@ -0,0 +1,136 @@
import fs from "node:fs";
import {
normalizeSessionDeliveryFields,
type DeliveryContext,
} from "../../utils/delivery-context.js";
import { getFileStatSnapshot } from "../cache-utils.js";
import {
isSessionStoreCacheEnabled,
readSessionStoreCache,
setSerializedSessionStore,
writeSessionStoreCache,
} from "./store-cache.js";
import { applySessionStoreMigrations } from "./store-migrations.js";
import { normalizeSessionRuntimeModelFields, type SessionEntry } from "./types.js";
export type LoadSessionStoreOptions = {
skipCache?: boolean;
};
function isSessionStoreRecord(value: unknown): value is Record<string, SessionEntry> {
return !!value && typeof value === "object" && !Array.isArray(value);
}
function normalizeSessionEntryDelivery(entry: SessionEntry): SessionEntry {
const normalized = normalizeSessionDeliveryFields({
channel: entry.channel,
lastChannel: entry.lastChannel,
lastTo: entry.lastTo,
lastAccountId: entry.lastAccountId,
lastThreadId: entry.lastThreadId ?? entry.deliveryContext?.threadId ?? entry.origin?.threadId,
deliveryContext: entry.deliveryContext,
});
const nextDelivery = normalized.deliveryContext;
const sameDelivery =
(entry.deliveryContext?.channel ?? undefined) === nextDelivery?.channel &&
(entry.deliveryContext?.to ?? undefined) === nextDelivery?.to &&
(entry.deliveryContext?.accountId ?? undefined) === nextDelivery?.accountId &&
(entry.deliveryContext?.threadId ?? undefined) === nextDelivery?.threadId;
const sameLast =
entry.lastChannel === normalized.lastChannel &&
entry.lastTo === normalized.lastTo &&
entry.lastAccountId === normalized.lastAccountId &&
entry.lastThreadId === normalized.lastThreadId;
if (sameDelivery && sameLast) {
return entry;
}
return {
...entry,
deliveryContext: nextDelivery,
lastChannel: normalized.lastChannel,
lastTo: normalized.lastTo,
lastAccountId: normalized.lastAccountId,
lastThreadId: normalized.lastThreadId,
};
}
export function normalizeSessionStore(store: Record<string, SessionEntry>): void {
for (const [key, entry] of Object.entries(store)) {
if (!entry) {
continue;
}
const normalized = normalizeSessionEntryDelivery(normalizeSessionRuntimeModelFields(entry));
if (normalized !== entry) {
store[key] = normalized;
}
}
}
export function loadSessionStore(
storePath: string,
opts: LoadSessionStoreOptions = {},
): Record<string, SessionEntry> {
if (!opts.skipCache && isSessionStoreCacheEnabled()) {
const currentFileStat = getFileStatSnapshot(storePath);
const cached = readSessionStoreCache({
storePath,
mtimeMs: currentFileStat?.mtimeMs,
sizeBytes: currentFileStat?.sizeBytes,
});
if (cached) {
return cached;
}
}
// Retry a few times on Windows because readers can briefly observe empty or
// transiently invalid content while another process is swapping the file.
let store: Record<string, SessionEntry> = {};
let fileStat = getFileStatSnapshot(storePath);
let mtimeMs = fileStat?.mtimeMs;
let serializedFromDisk: string | undefined;
const maxReadAttempts = process.platform === "win32" ? 3 : 1;
const retryBuf = maxReadAttempts > 1 ? new Int32Array(new SharedArrayBuffer(4)) : undefined;
for (let attempt = 0; attempt < maxReadAttempts; attempt += 1) {
try {
const raw = fs.readFileSync(storePath, "utf-8");
if (raw.length === 0 && attempt < maxReadAttempts - 1) {
Atomics.wait(retryBuf!, 0, 0, 50);
continue;
}
const parsed = JSON.parse(raw);
if (isSessionStoreRecord(parsed)) {
store = parsed;
serializedFromDisk = raw;
}
fileStat = getFileStatSnapshot(storePath) ?? fileStat;
mtimeMs = fileStat?.mtimeMs;
break;
} catch {
if (attempt < maxReadAttempts - 1) {
Atomics.wait(retryBuf!, 0, 0, 50);
continue;
}
}
}
if (serializedFromDisk !== undefined) {
setSerializedSessionStore(storePath, serializedFromDisk);
} else {
setSerializedSessionStore(storePath, undefined);
}
applySessionStoreMigrations(store);
normalizeSessionStore(store);
if (!opts.skipCache && isSessionStoreCacheEnabled()) {
writeSessionStoreCache({
storePath,
store,
mtimeMs,
sizeBytes: fileStat?.sizeBytes,
serialized: serializedFromDisk,
});
}
return structuredClone(store);
}

View File

@ -21,10 +21,10 @@ import {
dropSessionStoreObjectCache,
getSerializedSessionStore,
isSessionStoreCacheEnabled,
readSessionStoreCache,
setSerializedSessionStore,
writeSessionStoreCache,
} from "./store-cache.js";
import { loadSessionStore, normalizeSessionStore } from "./store-load.js";
import {
clearSessionStoreCacheForTest,
drainSessionStoreLockQueuesForTest,
@ -42,11 +42,9 @@ import {
type ResolvedSessionMaintenanceConfig,
type SessionMaintenanceWarning,
} from "./store-maintenance.js";
import { applySessionStoreMigrations } from "./store-migrations.js";
import {
mergeSessionEntry,
mergeSessionEntryPreserveActivity,
normalizeSessionRuntimeModelFields,
type SessionEntry,
} from "./types.js";
@ -55,6 +53,7 @@ export {
drainSessionStoreLockQueuesForTest,
getSessionStoreLockQueueSizeForTest,
} from "./store-lock-state.js";
export { loadSessionStore } from "./store-load.js";
const log = createSubsystemLogger("sessions/store");
let sessionArchiveRuntimePromise: Promise<
@ -67,43 +66,6 @@ function loadSessionArchiveRuntime() {
return sessionArchiveRuntimePromise;
}
function isSessionStoreRecord(value: unknown): value is Record<string, SessionEntry> {
return !!value && typeof value === "object" && !Array.isArray(value);
}
function normalizeSessionEntryDelivery(entry: SessionEntry): SessionEntry {
const normalized = normalizeSessionDeliveryFields({
channel: entry.channel,
lastChannel: entry.lastChannel,
lastTo: entry.lastTo,
lastAccountId: entry.lastAccountId,
lastThreadId: entry.lastThreadId ?? entry.deliveryContext?.threadId ?? entry.origin?.threadId,
deliveryContext: entry.deliveryContext,
});
const nextDelivery = normalized.deliveryContext;
const sameDelivery =
(entry.deliveryContext?.channel ?? undefined) === nextDelivery?.channel &&
(entry.deliveryContext?.to ?? undefined) === nextDelivery?.to &&
(entry.deliveryContext?.accountId ?? undefined) === nextDelivery?.accountId &&
(entry.deliveryContext?.threadId ?? undefined) === nextDelivery?.threadId;
const sameLast =
entry.lastChannel === normalized.lastChannel &&
entry.lastTo === normalized.lastTo &&
entry.lastAccountId === normalized.lastAccountId &&
entry.lastThreadId === normalized.lastThreadId;
if (sameDelivery && sameLast) {
return entry;
}
return {
...entry,
deliveryContext: nextDelivery,
lastChannel: normalized.lastChannel,
lastTo: normalized.lastTo,
lastAccountId: normalized.lastAccountId,
lastThreadId: normalized.lastThreadId,
};
}
function removeThreadFromDeliveryContext(context?: DeliveryContext): DeliveryContext | undefined {
if (!context || context.threadId == null) {
return context;
@ -158,18 +120,6 @@ export function resolveSessionStoreEntry(params: {
};
}
function normalizeSessionStore(store: Record<string, SessionEntry>): void {
for (const [key, entry] of Object.entries(store)) {
if (!entry) {
continue;
}
const normalized = normalizeSessionEntryDelivery(normalizeSessionRuntimeModelFields(entry));
if (normalized !== entry) {
store[key] = normalized;
}
}
}
export function setSessionWriteLockAcquirerForTests(
acquirer: typeof acquireSessionWriteLock | null,
): void {
@ -188,86 +138,6 @@ export async function withSessionStoreLockForTest<T>(
return await withSessionStoreLock(storePath, fn, opts);
}
type LoadSessionStoreOptions = {
skipCache?: boolean;
};
export function loadSessionStore(
storePath: string,
opts: LoadSessionStoreOptions = {},
): Record<string, SessionEntry> {
// Check cache first if enabled
if (!opts.skipCache && isSessionStoreCacheEnabled()) {
const currentFileStat = getFileStatSnapshot(storePath);
const cached = readSessionStoreCache({
storePath,
mtimeMs: currentFileStat?.mtimeMs,
sizeBytes: currentFileStat?.sizeBytes,
});
if (cached) {
return cached;
}
}
// Cache miss or disabled - load from disk.
// Retry up to 3 times when the file is empty or unparseable. On Windows the
// temp-file + rename write is not fully atomic: a concurrent reader can briefly
// observe a 0-byte file (between truncate and write) or a stale/locked state.
// A short synchronous backoff (50 ms via `Atomics.wait`) is enough for the
// writer to finish.
let store: Record<string, SessionEntry> = {};
let fileStat = getFileStatSnapshot(storePath);
let mtimeMs = fileStat?.mtimeMs;
let serializedFromDisk: string | undefined;
const maxReadAttempts = process.platform === "win32" ? 3 : 1;
const retryBuf = maxReadAttempts > 1 ? new Int32Array(new SharedArrayBuffer(4)) : undefined;
for (let attempt = 0; attempt < maxReadAttempts; attempt++) {
try {
const raw = fs.readFileSync(storePath, "utf-8");
if (raw.length === 0 && attempt < maxReadAttempts - 1) {
// File is empty — likely caught mid-write; retry after a brief pause.
Atomics.wait(retryBuf!, 0, 0, 50);
continue;
}
const parsed = JSON.parse(raw);
if (isSessionStoreRecord(parsed)) {
store = parsed;
serializedFromDisk = raw;
}
fileStat = getFileStatSnapshot(storePath) ?? fileStat;
mtimeMs = fileStat?.mtimeMs;
break;
} catch {
// File missing, locked, or transiently corrupt — retry on Windows.
if (attempt < maxReadAttempts - 1) {
Atomics.wait(retryBuf!, 0, 0, 50);
continue;
}
// Final attempt failed; proceed with an empty store.
}
}
if (serializedFromDisk !== undefined) {
setSerializedSessionStore(storePath, serializedFromDisk);
} else {
setSerializedSessionStore(storePath, undefined);
}
applySessionStoreMigrations(store);
// Cache the result if caching is enabled
if (!opts.skipCache && isSessionStoreCacheEnabled()) {
writeSessionStoreCache({
storePath,
store,
mtimeMs,
sizeBytes: fileStat?.sizeBytes,
serialized: serializedFromDisk,
});
}
return structuredClone(store);
}
export function readSessionUpdatedAt(params: {
storePath: string;
sessionKey: string;

View File

@ -12,7 +12,7 @@ vi.mock("../../config/sessions/paths.js", () => ({
resolveStorePath: vi.fn().mockReturnValue("/tmp/test-store.json"),
}));
vi.mock("../../config/sessions/store.js", () => ({
vi.mock("../../config/sessions/store-load.js", () => ({
loadSessionStore: vi.fn().mockReturnValue({}),
}));
@ -33,13 +33,13 @@ vi.mock("../../pairing/pairing-store.js", () => ({
const mockedModuleIds = [
"../../config/sessions/main-session.js",
"../../config/sessions/paths.js",
"../../config/sessions/store.js",
"../../config/sessions/store-load.js",
"../../infra/outbound/channel-selection.js",
"../../infra/outbound/target-resolver.js",
"../../pairing/pairing-store.js",
];
import { loadSessionStore } from "../../config/sessions/store.js";
import { loadSessionStore } from "../../config/sessions/store-load.js";
import { resolveMessageChannelSelection } from "../../infra/outbound/channel-selection.js";
import { maybeResolveIdLikeTarget } from "../../infra/outbound/target-resolver.js";
import { readChannelAllowFromStoreSync } from "../../pairing/pairing-store.js";

View File

@ -3,7 +3,7 @@ import type { ChannelId } from "../../channels/plugins/types.js";
import type { OpenClawConfig } from "../../config/config.js";
import { resolveAgentMainSessionKey } from "../../config/sessions/main-session.js";
import { resolveStorePath } from "../../config/sessions/paths.js";
import { loadSessionStore } from "../../config/sessions/store.js";
import { loadSessionStore } from "../../config/sessions/store-load.js";
import { resolveMessageChannelSelection } from "../../infra/outbound/channel-selection.js";
import { maybeResolveIdLikeTarget } from "../../infra/outbound/target-resolver.js";
import type { OutboundChannel } from "../../infra/outbound/targets.js";

View File

@ -1,9 +1,15 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
import type { OpenClawConfig } from "../../config/config.js";
vi.mock("../../config/sessions.js", () => ({
vi.mock("../../config/sessions/store-load.js", () => ({
loadSessionStore: vi.fn(),
}));
vi.mock("../../config/sessions/paths.js", () => ({
resolveStorePath: vi.fn().mockReturnValue("/tmp/test-store.json"),
}));
vi.mock("../../config/sessions/reset.js", () => ({
evaluateSessionFreshness: vi.fn().mockReturnValue({ fresh: true }),
resolveSessionResetPolicy: vi.fn().mockReturnValue({ mode: "idle", idleMinutes: 60 }),
}));
@ -18,7 +24,8 @@ vi.mock("../../agents/bootstrap-cache.js", () => ({
}));
import { clearBootstrapSnapshot } from "../../agents/bootstrap-cache.js";
import { loadSessionStore, evaluateSessionFreshness } from "../../config/sessions.js";
import { evaluateSessionFreshness } from "../../config/sessions/reset.js";
import { loadSessionStore } from "../../config/sessions/store-load.js";
import { resolveCronSession } from "./session.js";
const NOW_MS = 1_737_600_000_000;

View File

@ -1,13 +1,10 @@
import crypto from "node:crypto";
import { clearBootstrapSnapshotOnSessionRollover } from "../../agents/bootstrap-cache.js";
import type { OpenClawConfig } from "../../config/config.js";
import {
evaluateSessionFreshness,
loadSessionStore,
resolveSessionResetPolicy,
resolveStorePath,
type SessionEntry,
} from "../../config/sessions.js";
} from "../../config/sessions/reset.js";
import { resolveStorePath } from "../../config/sessions/paths.js";
import { loadSessionStore } from "../../config/sessions/store-load.js";
import type { SessionEntry } from "../../config/sessions/types.js";
export function resolveCronSession(params: {
cfg: OpenClawConfig;