mirror of https://github.com/openclaw/openclaw.git
884 lines
28 KiB
TypeScript
884 lines
28 KiB
TypeScript
import fs from "node:fs";
|
|
import path from "node:path";
|
|
import { acquireSessionWriteLock } from "../../agents/session-write-lock.js";
|
|
import type { MsgContext } from "../../auto-reply/templating.js";
|
|
import {
|
|
archiveSessionTranscripts,
|
|
cleanupArchivedSessionTranscripts,
|
|
} from "../../gateway/session-utils.fs.js";
|
|
import { writeTextAtomic } from "../../infra/json-files.js";
|
|
import { createSubsystemLogger } from "../../logging/subsystem.js";
|
|
import {
|
|
deliveryContextFromSession,
|
|
mergeDeliveryContext,
|
|
normalizeDeliveryContext,
|
|
normalizeSessionDeliveryFields,
|
|
type DeliveryContext,
|
|
} from "../../utils/delivery-context.js";
|
|
import { getFileStatSnapshot, isCacheEnabled, resolveCacheTtlMs } from "../cache-utils.js";
|
|
import { enforceSessionDiskBudget, type SessionDiskBudgetSweepResult } from "./disk-budget.js";
|
|
import { deriveSessionMetaPatch } from "./metadata.js";
|
|
import {
|
|
clearSessionStoreCaches,
|
|
dropSessionStoreObjectCache,
|
|
getSerializedSessionStore,
|
|
readSessionStoreCache,
|
|
setSerializedSessionStore,
|
|
writeSessionStoreCache,
|
|
} from "./store-cache.js";
|
|
import {
|
|
capEntryCount,
|
|
getActiveSessionMaintenanceWarning,
|
|
pruneStaleEntries,
|
|
resolveMaintenanceConfig,
|
|
rotateSessionFile,
|
|
type ResolvedSessionMaintenanceConfig,
|
|
type SessionMaintenanceWarning,
|
|
} from "./store-maintenance.js";
|
|
import { applySessionStoreMigrations } from "./store-migrations.js";
|
|
import {
|
|
mergeSessionEntry,
|
|
mergeSessionEntryPreserveActivity,
|
|
normalizeSessionRuntimeModelFields,
|
|
type SessionEntry,
|
|
} from "./types.js";
|
|
|
|
const log = createSubsystemLogger("sessions/store");
|
|
|
|
// ============================================================================
|
|
// Session Store Cache with TTL Support
|
|
// ============================================================================
|
|
|
|
const DEFAULT_SESSION_STORE_TTL_MS = 45_000; // 45 seconds (between 30-60s)
|
|
|
|
function isSessionStoreRecord(value: unknown): value is Record<string, SessionEntry> {
|
|
return !!value && typeof value === "object" && !Array.isArray(value);
|
|
}
|
|
|
|
function getSessionStoreTtl(): number {
|
|
return resolveCacheTtlMs({
|
|
envValue: process.env.OPENCLAW_SESSION_CACHE_TTL_MS,
|
|
defaultTtlMs: DEFAULT_SESSION_STORE_TTL_MS,
|
|
});
|
|
}
|
|
|
|
function isSessionStoreCacheEnabled(): boolean {
|
|
return isCacheEnabled(getSessionStoreTtl());
|
|
}
|
|
|
|
function normalizeSessionEntryDelivery(entry: SessionEntry): SessionEntry {
|
|
const normalized = normalizeSessionDeliveryFields({
|
|
channel: entry.channel,
|
|
lastChannel: entry.lastChannel,
|
|
lastTo: entry.lastTo,
|
|
lastAccountId: entry.lastAccountId,
|
|
lastThreadId: entry.lastThreadId ?? entry.deliveryContext?.threadId ?? entry.origin?.threadId,
|
|
deliveryContext: entry.deliveryContext,
|
|
});
|
|
const nextDelivery = normalized.deliveryContext;
|
|
const sameDelivery =
|
|
(entry.deliveryContext?.channel ?? undefined) === nextDelivery?.channel &&
|
|
(entry.deliveryContext?.to ?? undefined) === nextDelivery?.to &&
|
|
(entry.deliveryContext?.accountId ?? undefined) === nextDelivery?.accountId &&
|
|
(entry.deliveryContext?.threadId ?? undefined) === nextDelivery?.threadId;
|
|
const sameLast =
|
|
entry.lastChannel === normalized.lastChannel &&
|
|
entry.lastTo === normalized.lastTo &&
|
|
entry.lastAccountId === normalized.lastAccountId &&
|
|
entry.lastThreadId === normalized.lastThreadId;
|
|
if (sameDelivery && sameLast) {
|
|
return entry;
|
|
}
|
|
return {
|
|
...entry,
|
|
deliveryContext: nextDelivery,
|
|
lastChannel: normalized.lastChannel,
|
|
lastTo: normalized.lastTo,
|
|
lastAccountId: normalized.lastAccountId,
|
|
lastThreadId: normalized.lastThreadId,
|
|
};
|
|
}
|
|
|
|
function removeThreadFromDeliveryContext(context?: DeliveryContext): DeliveryContext | undefined {
|
|
if (!context || context.threadId == null) {
|
|
return context;
|
|
}
|
|
const next: DeliveryContext = { ...context };
|
|
delete next.threadId;
|
|
return next;
|
|
}
|
|
|
|
function normalizeStoreSessionKey(sessionKey: string): string {
|
|
return sessionKey.trim().toLowerCase();
|
|
}
|
|
|
|
function resolveStoreSessionEntry(params: {
|
|
store: Record<string, SessionEntry>;
|
|
sessionKey: string;
|
|
}): {
|
|
normalizedKey: string;
|
|
existing: SessionEntry | undefined;
|
|
legacyKeys: string[];
|
|
} {
|
|
const trimmedKey = params.sessionKey.trim();
|
|
const normalizedKey = normalizeStoreSessionKey(trimmedKey);
|
|
const legacyKeySet = new Set<string>();
|
|
if (
|
|
trimmedKey !== normalizedKey &&
|
|
Object.prototype.hasOwnProperty.call(params.store, trimmedKey)
|
|
) {
|
|
legacyKeySet.add(trimmedKey);
|
|
}
|
|
let existing =
|
|
params.store[normalizedKey] ?? (legacyKeySet.size > 0 ? params.store[trimmedKey] : undefined);
|
|
let existingUpdatedAt = existing?.updatedAt ?? 0;
|
|
for (const [candidateKey, candidateEntry] of Object.entries(params.store)) {
|
|
if (candidateKey === normalizedKey) {
|
|
continue;
|
|
}
|
|
if (candidateKey.toLowerCase() !== normalizedKey) {
|
|
continue;
|
|
}
|
|
legacyKeySet.add(candidateKey);
|
|
const candidateUpdatedAt = candidateEntry?.updatedAt ?? 0;
|
|
if (!existing || candidateUpdatedAt > existingUpdatedAt) {
|
|
existing = candidateEntry;
|
|
existingUpdatedAt = candidateUpdatedAt;
|
|
}
|
|
}
|
|
return {
|
|
normalizedKey,
|
|
existing,
|
|
legacyKeys: [...legacyKeySet],
|
|
};
|
|
}
|
|
|
|
function normalizeSessionStore(store: Record<string, SessionEntry>): void {
|
|
for (const [key, entry] of Object.entries(store)) {
|
|
if (!entry) {
|
|
continue;
|
|
}
|
|
const normalized = normalizeSessionEntryDelivery(normalizeSessionRuntimeModelFields(entry));
|
|
if (normalized !== entry) {
|
|
store[key] = normalized;
|
|
}
|
|
}
|
|
}
|
|
|
|
export function clearSessionStoreCacheForTest(): void {
|
|
clearSessionStoreCaches();
|
|
for (const queue of LOCK_QUEUES.values()) {
|
|
for (const task of queue.pending) {
|
|
task.reject(new Error("session store queue cleared for test"));
|
|
}
|
|
}
|
|
LOCK_QUEUES.clear();
|
|
}
|
|
|
|
/** Expose lock queue size for tests. */
|
|
export function getSessionStoreLockQueueSizeForTest(): number {
|
|
return LOCK_QUEUES.size;
|
|
}
|
|
|
|
export async function withSessionStoreLockForTest<T>(
|
|
storePath: string,
|
|
fn: () => Promise<T>,
|
|
opts: SessionStoreLockOptions = {},
|
|
): Promise<T> {
|
|
return await withSessionStoreLock(storePath, fn, opts);
|
|
}
|
|
|
|
type LoadSessionStoreOptions = {
|
|
skipCache?: boolean;
|
|
};
|
|
|
|
export function loadSessionStore(
|
|
storePath: string,
|
|
opts: LoadSessionStoreOptions = {},
|
|
): Record<string, SessionEntry> {
|
|
// Check cache first if enabled
|
|
if (!opts.skipCache && isSessionStoreCacheEnabled()) {
|
|
const currentFileStat = getFileStatSnapshot(storePath);
|
|
const cached = readSessionStoreCache({
|
|
storePath,
|
|
ttlMs: getSessionStoreTtl(),
|
|
mtimeMs: currentFileStat?.mtimeMs,
|
|
sizeBytes: currentFileStat?.sizeBytes,
|
|
});
|
|
if (cached) {
|
|
return cached;
|
|
}
|
|
}
|
|
|
|
// Cache miss or disabled - load from disk.
|
|
// Retry up to 3 times when the file is empty or unparseable. On Windows the
|
|
// temp-file + rename write is not fully atomic: a concurrent reader can briefly
|
|
// observe a 0-byte file (between truncate and write) or a stale/locked state.
|
|
// A short synchronous backoff (50 ms via `Atomics.wait`) is enough for the
|
|
// writer to finish.
|
|
let store: Record<string, SessionEntry> = {};
|
|
let fileStat = getFileStatSnapshot(storePath);
|
|
let mtimeMs = fileStat?.mtimeMs;
|
|
let serializedFromDisk: string | undefined;
|
|
const maxReadAttempts = process.platform === "win32" ? 3 : 1;
|
|
const retryBuf = maxReadAttempts > 1 ? new Int32Array(new SharedArrayBuffer(4)) : undefined;
|
|
for (let attempt = 0; attempt < maxReadAttempts; attempt++) {
|
|
try {
|
|
const raw = fs.readFileSync(storePath, "utf-8");
|
|
if (raw.length === 0 && attempt < maxReadAttempts - 1) {
|
|
// File is empty — likely caught mid-write; retry after a brief pause.
|
|
Atomics.wait(retryBuf!, 0, 0, 50);
|
|
continue;
|
|
}
|
|
const parsed = JSON.parse(raw);
|
|
if (isSessionStoreRecord(parsed)) {
|
|
store = parsed;
|
|
serializedFromDisk = raw;
|
|
}
|
|
fileStat = getFileStatSnapshot(storePath) ?? fileStat;
|
|
mtimeMs = fileStat?.mtimeMs;
|
|
break;
|
|
} catch {
|
|
// File missing, locked, or transiently corrupt — retry on Windows.
|
|
if (attempt < maxReadAttempts - 1) {
|
|
Atomics.wait(retryBuf!, 0, 0, 50);
|
|
continue;
|
|
}
|
|
// Final attempt failed; proceed with an empty store.
|
|
}
|
|
}
|
|
if (serializedFromDisk !== undefined) {
|
|
setSerializedSessionStore(storePath, serializedFromDisk);
|
|
} else {
|
|
setSerializedSessionStore(storePath, undefined);
|
|
}
|
|
|
|
applySessionStoreMigrations(store);
|
|
|
|
// Cache the result if caching is enabled
|
|
if (!opts.skipCache && isSessionStoreCacheEnabled()) {
|
|
writeSessionStoreCache({
|
|
storePath,
|
|
store,
|
|
mtimeMs,
|
|
sizeBytes: fileStat?.sizeBytes,
|
|
serialized: serializedFromDisk,
|
|
});
|
|
}
|
|
|
|
return structuredClone(store);
|
|
}
|
|
|
|
export function readSessionUpdatedAt(params: {
|
|
storePath: string;
|
|
sessionKey: string;
|
|
}): number | undefined {
|
|
try {
|
|
const store = loadSessionStore(params.storePath);
|
|
const resolved = resolveStoreSessionEntry({ store, sessionKey: params.sessionKey });
|
|
return resolved.existing?.updatedAt;
|
|
} catch {
|
|
return undefined;
|
|
}
|
|
}
|
|
|
|
// ============================================================================
|
|
// Session Store Pruning, Capping & File Rotation
|
|
// ============================================================================
|
|
|
|
export type SessionMaintenanceApplyReport = {
|
|
mode: ResolvedSessionMaintenanceConfig["mode"];
|
|
beforeCount: number;
|
|
afterCount: number;
|
|
pruned: number;
|
|
capped: number;
|
|
diskBudget: SessionDiskBudgetSweepResult | null;
|
|
};
|
|
|
|
export {
|
|
capEntryCount,
|
|
getActiveSessionMaintenanceWarning,
|
|
pruneStaleEntries,
|
|
resolveMaintenanceConfig,
|
|
rotateSessionFile,
|
|
};
|
|
export type { ResolvedSessionMaintenanceConfig, SessionMaintenanceWarning };
|
|
|
|
type SaveSessionStoreOptions = {
|
|
/** Skip pruning, capping, and rotation (e.g. during one-time migrations). */
|
|
skipMaintenance?: boolean;
|
|
/** Active session key for warn-only maintenance. */
|
|
activeSessionKey?: string;
|
|
/** Optional callback for warn-only maintenance. */
|
|
onWarn?: (warning: SessionMaintenanceWarning) => void | Promise<void>;
|
|
/** Optional callback with maintenance stats after a save. */
|
|
onMaintenanceApplied?: (report: SessionMaintenanceApplyReport) => void | Promise<void>;
|
|
/** Optional overrides used by maintenance commands. */
|
|
maintenanceOverride?: Partial<ResolvedSessionMaintenanceConfig>;
|
|
};
|
|
|
|
function updateSessionStoreWriteCaches(params: {
|
|
storePath: string;
|
|
store: Record<string, SessionEntry>;
|
|
serialized: string;
|
|
}): void {
|
|
const fileStat = getFileStatSnapshot(params.storePath);
|
|
setSerializedSessionStore(params.storePath, params.serialized);
|
|
if (!isSessionStoreCacheEnabled()) {
|
|
dropSessionStoreObjectCache(params.storePath);
|
|
return;
|
|
}
|
|
writeSessionStoreCache({
|
|
storePath: params.storePath,
|
|
store: params.store,
|
|
mtimeMs: fileStat?.mtimeMs,
|
|
sizeBytes: fileStat?.sizeBytes,
|
|
serialized: params.serialized,
|
|
});
|
|
}
|
|
|
|
async function saveSessionStoreUnlocked(
|
|
storePath: string,
|
|
store: Record<string, SessionEntry>,
|
|
opts?: SaveSessionStoreOptions,
|
|
): Promise<void> {
|
|
normalizeSessionStore(store);
|
|
|
|
if (!opts?.skipMaintenance) {
|
|
// Resolve maintenance config once (avoids repeated loadConfig() calls).
|
|
const maintenance = { ...resolveMaintenanceConfig(), ...opts?.maintenanceOverride };
|
|
const shouldWarnOnly = maintenance.mode === "warn";
|
|
const beforeCount = Object.keys(store).length;
|
|
|
|
if (shouldWarnOnly) {
|
|
const activeSessionKey = opts?.activeSessionKey?.trim();
|
|
if (activeSessionKey) {
|
|
const warning = getActiveSessionMaintenanceWarning({
|
|
store,
|
|
activeSessionKey,
|
|
pruneAfterMs: maintenance.pruneAfterMs,
|
|
maxEntries: maintenance.maxEntries,
|
|
});
|
|
if (warning) {
|
|
log.warn("session maintenance would evict active session; skipping enforcement", {
|
|
activeSessionKey: warning.activeSessionKey,
|
|
wouldPrune: warning.wouldPrune,
|
|
wouldCap: warning.wouldCap,
|
|
pruneAfterMs: warning.pruneAfterMs,
|
|
maxEntries: warning.maxEntries,
|
|
});
|
|
await opts?.onWarn?.(warning);
|
|
}
|
|
}
|
|
const diskBudget = await enforceSessionDiskBudget({
|
|
store,
|
|
storePath,
|
|
activeSessionKey: opts?.activeSessionKey,
|
|
maintenance,
|
|
warnOnly: true,
|
|
log,
|
|
});
|
|
await opts?.onMaintenanceApplied?.({
|
|
mode: maintenance.mode,
|
|
beforeCount,
|
|
afterCount: Object.keys(store).length,
|
|
pruned: 0,
|
|
capped: 0,
|
|
diskBudget,
|
|
});
|
|
} else {
|
|
// Prune stale entries and cap total count before serializing.
|
|
const removedSessionFiles = new Map<string, string | undefined>();
|
|
const pruned = pruneStaleEntries(store, maintenance.pruneAfterMs, {
|
|
onPruned: ({ entry }) => {
|
|
rememberRemovedSessionFile(removedSessionFiles, entry);
|
|
},
|
|
});
|
|
const capped = capEntryCount(store, maintenance.maxEntries, {
|
|
onCapped: ({ entry }) => {
|
|
rememberRemovedSessionFile(removedSessionFiles, entry);
|
|
},
|
|
});
|
|
const archivedDirs = new Set<string>();
|
|
const referencedSessionIds = new Set(
|
|
Object.values(store)
|
|
.map((entry) => entry?.sessionId)
|
|
.filter((id): id is string => Boolean(id)),
|
|
);
|
|
const archivedForDeletedSessions = archiveRemovedSessionTranscripts({
|
|
removedSessionFiles,
|
|
referencedSessionIds,
|
|
storePath,
|
|
reason: "deleted",
|
|
restrictToStoreDir: true,
|
|
});
|
|
for (const archivedDir of archivedForDeletedSessions) {
|
|
archivedDirs.add(archivedDir);
|
|
}
|
|
if (archivedDirs.size > 0 || maintenance.resetArchiveRetentionMs != null) {
|
|
const targetDirs =
|
|
archivedDirs.size > 0 ? [...archivedDirs] : [path.dirname(path.resolve(storePath))];
|
|
await cleanupArchivedSessionTranscripts({
|
|
directories: targetDirs,
|
|
olderThanMs: maintenance.pruneAfterMs,
|
|
reason: "deleted",
|
|
});
|
|
if (maintenance.resetArchiveRetentionMs != null) {
|
|
await cleanupArchivedSessionTranscripts({
|
|
directories: targetDirs,
|
|
olderThanMs: maintenance.resetArchiveRetentionMs,
|
|
reason: "reset",
|
|
});
|
|
}
|
|
}
|
|
|
|
// Rotate the on-disk file if it exceeds the size threshold.
|
|
await rotateSessionFile(storePath, maintenance.rotateBytes);
|
|
|
|
const diskBudget = await enforceSessionDiskBudget({
|
|
store,
|
|
storePath,
|
|
activeSessionKey: opts?.activeSessionKey,
|
|
maintenance,
|
|
warnOnly: false,
|
|
log,
|
|
});
|
|
await opts?.onMaintenanceApplied?.({
|
|
mode: maintenance.mode,
|
|
beforeCount,
|
|
afterCount: Object.keys(store).length,
|
|
pruned,
|
|
capped,
|
|
diskBudget,
|
|
});
|
|
}
|
|
}
|
|
|
|
await fs.promises.mkdir(path.dirname(storePath), { recursive: true });
|
|
const json = JSON.stringify(store, null, 2);
|
|
if (getSerializedSessionStore(storePath) === json) {
|
|
updateSessionStoreWriteCaches({ storePath, store, serialized: json });
|
|
return;
|
|
}
|
|
|
|
// Windows: keep retry semantics because rename can fail while readers hold locks.
|
|
if (process.platform === "win32") {
|
|
for (let i = 0; i < 5; i++) {
|
|
try {
|
|
await writeSessionStoreAtomic({ storePath, store, serialized: json });
|
|
return;
|
|
} catch (err) {
|
|
const code = getErrorCode(err);
|
|
if (code === "ENOENT") {
|
|
return;
|
|
}
|
|
if (i < 4) {
|
|
await new Promise((r) => setTimeout(r, 50 * (i + 1)));
|
|
continue;
|
|
}
|
|
// Final attempt failed — skip this save. The write lock ensures
|
|
// the next save will retry with fresh data. Log for diagnostics.
|
|
log.warn(`atomic write failed after 5 attempts: ${storePath}`);
|
|
}
|
|
}
|
|
return;
|
|
}
|
|
|
|
try {
|
|
await writeSessionStoreAtomic({ storePath, store, serialized: json });
|
|
} catch (err) {
|
|
const code = getErrorCode(err);
|
|
|
|
if (code === "ENOENT") {
|
|
// In tests the temp session-store directory may be deleted while writes are in-flight.
|
|
// Best-effort: try a direct write (recreating the parent dir), otherwise ignore.
|
|
try {
|
|
await writeSessionStoreAtomic({ storePath, store, serialized: json });
|
|
} catch (err2) {
|
|
const code2 = getErrorCode(err2);
|
|
if (code2 === "ENOENT") {
|
|
return;
|
|
}
|
|
throw err2;
|
|
}
|
|
return;
|
|
}
|
|
|
|
throw err;
|
|
}
|
|
}
|
|
|
|
export async function saveSessionStore(
|
|
storePath: string,
|
|
store: Record<string, SessionEntry>,
|
|
opts?: SaveSessionStoreOptions,
|
|
): Promise<void> {
|
|
await withSessionStoreLock(storePath, async () => {
|
|
await saveSessionStoreUnlocked(storePath, store, opts);
|
|
});
|
|
}
|
|
|
|
export async function updateSessionStore<T>(
|
|
storePath: string,
|
|
mutator: (store: Record<string, SessionEntry>) => Promise<T> | T,
|
|
opts?: SaveSessionStoreOptions,
|
|
): Promise<T> {
|
|
return await withSessionStoreLock(storePath, async () => {
|
|
// Always re-read inside the lock to avoid clobbering concurrent writers.
|
|
const store = loadSessionStore(storePath, { skipCache: true });
|
|
const result = await mutator(store);
|
|
await saveSessionStoreUnlocked(storePath, store, opts);
|
|
return result;
|
|
});
|
|
}
|
|
|
|
type SessionStoreLockOptions = {
|
|
timeoutMs?: number;
|
|
pollIntervalMs?: number;
|
|
staleMs?: number;
|
|
};
|
|
|
|
type SessionStoreLockTask = {
|
|
fn: () => Promise<unknown>;
|
|
resolve: (value: unknown) => void;
|
|
reject: (reason: unknown) => void;
|
|
timeoutMs?: number;
|
|
staleMs: number;
|
|
};
|
|
|
|
type SessionStoreLockQueue = {
|
|
running: boolean;
|
|
pending: SessionStoreLockTask[];
|
|
};
|
|
|
|
const LOCK_QUEUES = new Map<string, SessionStoreLockQueue>();
|
|
|
|
function getErrorCode(error: unknown): string | null {
|
|
if (!error || typeof error !== "object" || !("code" in error)) {
|
|
return null;
|
|
}
|
|
return String((error as { code?: unknown }).code);
|
|
}
|
|
|
|
function rememberRemovedSessionFile(
|
|
removedSessionFiles: Map<string, string | undefined>,
|
|
entry: SessionEntry,
|
|
): void {
|
|
if (!removedSessionFiles.has(entry.sessionId) || entry.sessionFile) {
|
|
removedSessionFiles.set(entry.sessionId, entry.sessionFile);
|
|
}
|
|
}
|
|
|
|
export function archiveRemovedSessionTranscripts(params: {
|
|
removedSessionFiles: Iterable<[string, string | undefined]>;
|
|
referencedSessionIds: ReadonlySet<string>;
|
|
storePath: string;
|
|
reason: "deleted" | "reset";
|
|
restrictToStoreDir?: boolean;
|
|
}): Set<string> {
|
|
const archivedDirs = new Set<string>();
|
|
for (const [sessionId, sessionFile] of params.removedSessionFiles) {
|
|
if (params.referencedSessionIds.has(sessionId)) {
|
|
continue;
|
|
}
|
|
const archived = archiveSessionTranscripts({
|
|
sessionId,
|
|
storePath: params.storePath,
|
|
sessionFile,
|
|
reason: params.reason,
|
|
restrictToStoreDir: params.restrictToStoreDir,
|
|
});
|
|
for (const archivedPath of archived) {
|
|
archivedDirs.add(path.dirname(archivedPath));
|
|
}
|
|
}
|
|
return archivedDirs;
|
|
}
|
|
|
|
async function writeSessionStoreAtomic(params: {
|
|
storePath: string;
|
|
store: Record<string, SessionEntry>;
|
|
serialized: string;
|
|
}): Promise<void> {
|
|
await writeTextAtomic(params.storePath, params.serialized, { mode: 0o600 });
|
|
updateSessionStoreWriteCaches({
|
|
storePath: params.storePath,
|
|
store: params.store,
|
|
serialized: params.serialized,
|
|
});
|
|
}
|
|
|
|
async function persistResolvedSessionEntry(params: {
|
|
storePath: string;
|
|
store: Record<string, SessionEntry>;
|
|
resolved: ReturnType<typeof resolveStoreSessionEntry>;
|
|
next: SessionEntry;
|
|
}): Promise<SessionEntry> {
|
|
params.store[params.resolved.normalizedKey] = params.next;
|
|
for (const legacyKey of params.resolved.legacyKeys) {
|
|
delete params.store[legacyKey];
|
|
}
|
|
await saveSessionStoreUnlocked(params.storePath, params.store, {
|
|
activeSessionKey: params.resolved.normalizedKey,
|
|
});
|
|
return params.next;
|
|
}
|
|
|
|
function lockTimeoutError(storePath: string): Error {
|
|
return new Error(`timeout waiting for session store lock: ${storePath}`);
|
|
}
|
|
|
|
function getOrCreateLockQueue(storePath: string): SessionStoreLockQueue {
|
|
const existing = LOCK_QUEUES.get(storePath);
|
|
if (existing) {
|
|
return existing;
|
|
}
|
|
const created: SessionStoreLockQueue = { running: false, pending: [] };
|
|
LOCK_QUEUES.set(storePath, created);
|
|
return created;
|
|
}
|
|
|
|
async function drainSessionStoreLockQueue(storePath: string): Promise<void> {
|
|
const queue = LOCK_QUEUES.get(storePath);
|
|
if (!queue || queue.running) {
|
|
return;
|
|
}
|
|
queue.running = true;
|
|
try {
|
|
while (queue.pending.length > 0) {
|
|
const task = queue.pending.shift();
|
|
if (!task) {
|
|
continue;
|
|
}
|
|
|
|
const remainingTimeoutMs = task.timeoutMs ?? Number.POSITIVE_INFINITY;
|
|
if (task.timeoutMs != null && remainingTimeoutMs <= 0) {
|
|
task.reject(lockTimeoutError(storePath));
|
|
continue;
|
|
}
|
|
|
|
let lock: { release: () => Promise<void> } | undefined;
|
|
let result: unknown;
|
|
let failed: unknown;
|
|
let hasFailure = false;
|
|
try {
|
|
lock = await acquireSessionWriteLock({
|
|
sessionFile: storePath,
|
|
timeoutMs: remainingTimeoutMs,
|
|
staleMs: task.staleMs,
|
|
});
|
|
result = await task.fn();
|
|
} catch (err) {
|
|
hasFailure = true;
|
|
failed = err;
|
|
} finally {
|
|
await lock?.release().catch(() => undefined);
|
|
}
|
|
if (hasFailure) {
|
|
task.reject(failed);
|
|
continue;
|
|
}
|
|
task.resolve(result);
|
|
}
|
|
} finally {
|
|
queue.running = false;
|
|
if (queue.pending.length === 0) {
|
|
LOCK_QUEUES.delete(storePath);
|
|
} else {
|
|
queueMicrotask(() => {
|
|
void drainSessionStoreLockQueue(storePath);
|
|
});
|
|
}
|
|
}
|
|
}
|
|
|
|
async function withSessionStoreLock<T>(
|
|
storePath: string,
|
|
fn: () => Promise<T>,
|
|
opts: SessionStoreLockOptions = {},
|
|
): Promise<T> {
|
|
if (!storePath || typeof storePath !== "string") {
|
|
throw new Error(
|
|
`withSessionStoreLock: storePath must be a non-empty string, got ${JSON.stringify(storePath)}`,
|
|
);
|
|
}
|
|
const timeoutMs = opts.timeoutMs ?? 10_000;
|
|
const staleMs = opts.staleMs ?? 30_000;
|
|
// `pollIntervalMs` is retained for API compatibility with older lock options.
|
|
void opts.pollIntervalMs;
|
|
|
|
const hasTimeout = timeoutMs > 0 && Number.isFinite(timeoutMs);
|
|
const queue = getOrCreateLockQueue(storePath);
|
|
|
|
const promise = new Promise<T>((resolve, reject) => {
|
|
const task: SessionStoreLockTask = {
|
|
fn: async () => await fn(),
|
|
resolve: (value) => resolve(value as T),
|
|
reject,
|
|
timeoutMs: hasTimeout ? timeoutMs : undefined,
|
|
staleMs,
|
|
};
|
|
|
|
queue.pending.push(task);
|
|
void drainSessionStoreLockQueue(storePath);
|
|
});
|
|
|
|
return await promise;
|
|
}
|
|
|
|
export async function updateSessionStoreEntry(params: {
|
|
storePath: string;
|
|
sessionKey: string;
|
|
update: (entry: SessionEntry) => Promise<Partial<SessionEntry> | null>;
|
|
}): Promise<SessionEntry | null> {
|
|
const { storePath, sessionKey, update } = params;
|
|
return await withSessionStoreLock(storePath, async () => {
|
|
const store = loadSessionStore(storePath, { skipCache: true });
|
|
const resolved = resolveStoreSessionEntry({ store, sessionKey });
|
|
const existing = resolved.existing;
|
|
if (!existing) {
|
|
return null;
|
|
}
|
|
const patch = await update(existing);
|
|
if (!patch) {
|
|
return existing;
|
|
}
|
|
const next = mergeSessionEntry(existing, patch);
|
|
return await persistResolvedSessionEntry({
|
|
storePath,
|
|
store,
|
|
resolved,
|
|
next,
|
|
});
|
|
});
|
|
}
|
|
|
|
export async function recordSessionMetaFromInbound(params: {
|
|
storePath: string;
|
|
sessionKey: string;
|
|
ctx: MsgContext;
|
|
groupResolution?: import("./types.js").GroupKeyResolution | null;
|
|
createIfMissing?: boolean;
|
|
}): Promise<SessionEntry | null> {
|
|
const { storePath, sessionKey, ctx } = params;
|
|
const createIfMissing = params.createIfMissing ?? true;
|
|
return await updateSessionStore(
|
|
storePath,
|
|
(store) => {
|
|
const resolved = resolveStoreSessionEntry({ store, sessionKey });
|
|
const existing = resolved.existing;
|
|
const patch = deriveSessionMetaPatch({
|
|
ctx,
|
|
sessionKey: resolved.normalizedKey,
|
|
existing,
|
|
groupResolution: params.groupResolution,
|
|
});
|
|
if (!patch) {
|
|
if (existing && resolved.legacyKeys.length > 0) {
|
|
store[resolved.normalizedKey] = existing;
|
|
for (const legacyKey of resolved.legacyKeys) {
|
|
delete store[legacyKey];
|
|
}
|
|
}
|
|
return existing ?? null;
|
|
}
|
|
if (!existing && !createIfMissing) {
|
|
return null;
|
|
}
|
|
const next = existing
|
|
? // Inbound metadata updates must not refresh activity timestamps;
|
|
// idle reset evaluation relies on updatedAt from actual session turns.
|
|
mergeSessionEntryPreserveActivity(existing, patch)
|
|
: mergeSessionEntry(existing, patch);
|
|
store[resolved.normalizedKey] = next;
|
|
for (const legacyKey of resolved.legacyKeys) {
|
|
delete store[legacyKey];
|
|
}
|
|
return next;
|
|
},
|
|
{ activeSessionKey: normalizeStoreSessionKey(sessionKey) },
|
|
);
|
|
}
|
|
|
|
export async function updateLastRoute(params: {
|
|
storePath: string;
|
|
sessionKey: string;
|
|
channel?: SessionEntry["lastChannel"];
|
|
to?: string;
|
|
accountId?: string;
|
|
threadId?: string | number;
|
|
deliveryContext?: DeliveryContext;
|
|
ctx?: MsgContext;
|
|
groupResolution?: import("./types.js").GroupKeyResolution | null;
|
|
}) {
|
|
const { storePath, sessionKey, channel, to, accountId, threadId, ctx } = params;
|
|
return await withSessionStoreLock(storePath, async () => {
|
|
const store = loadSessionStore(storePath);
|
|
const resolved = resolveStoreSessionEntry({ store, sessionKey });
|
|
const existing = resolved.existing;
|
|
const now = Date.now();
|
|
const explicitContext = normalizeDeliveryContext(params.deliveryContext);
|
|
const inlineContext = normalizeDeliveryContext({
|
|
channel,
|
|
to,
|
|
accountId,
|
|
threadId,
|
|
});
|
|
const mergedInput = mergeDeliveryContext(explicitContext, inlineContext);
|
|
const explicitDeliveryContext = params.deliveryContext;
|
|
const explicitThreadFromDeliveryContext =
|
|
explicitDeliveryContext != null &&
|
|
Object.prototype.hasOwnProperty.call(explicitDeliveryContext, "threadId")
|
|
? explicitDeliveryContext.threadId
|
|
: undefined;
|
|
const explicitThreadValue =
|
|
explicitThreadFromDeliveryContext ??
|
|
(threadId != null && threadId !== "" ? threadId : undefined);
|
|
const explicitRouteProvided = Boolean(
|
|
explicitContext?.channel ||
|
|
explicitContext?.to ||
|
|
inlineContext?.channel ||
|
|
inlineContext?.to,
|
|
);
|
|
const clearThreadFromFallback = explicitRouteProvided && explicitThreadValue == null;
|
|
const fallbackContext = clearThreadFromFallback
|
|
? removeThreadFromDeliveryContext(deliveryContextFromSession(existing))
|
|
: deliveryContextFromSession(existing);
|
|
const merged = mergeDeliveryContext(mergedInput, fallbackContext);
|
|
const normalized = normalizeSessionDeliveryFields({
|
|
deliveryContext: {
|
|
channel: merged?.channel,
|
|
to: merged?.to,
|
|
accountId: merged?.accountId,
|
|
threadId: merged?.threadId,
|
|
},
|
|
});
|
|
const metaPatch = ctx
|
|
? deriveSessionMetaPatch({
|
|
ctx,
|
|
sessionKey: resolved.normalizedKey,
|
|
existing,
|
|
groupResolution: params.groupResolution,
|
|
})
|
|
: null;
|
|
const basePatch: Partial<SessionEntry> = {
|
|
updatedAt: Math.max(existing?.updatedAt ?? 0, now),
|
|
deliveryContext: normalized.deliveryContext,
|
|
lastChannel: normalized.lastChannel,
|
|
lastTo: normalized.lastTo,
|
|
lastAccountId: normalized.lastAccountId,
|
|
lastThreadId: normalized.lastThreadId,
|
|
};
|
|
const next = mergeSessionEntry(
|
|
existing,
|
|
metaPatch ? { ...basePatch, ...metaPatch } : basePatch,
|
|
);
|
|
return await persistResolvedSessionEntry({
|
|
storePath,
|
|
store,
|
|
resolved,
|
|
next,
|
|
});
|
|
});
|
|
}
|