mirror of https://github.com/openclaw/openclaw.git
fix(matrix): add advisory file locking to IDB crypto persistence (#59851)
Merged via squash.
Prepared head SHA: 392e411ffd
Co-authored-by: al3mart <11448715+al3mart@users.noreply.github.com>
Co-authored-by: gumadeiras <5599352+gumadeiras@users.noreply.github.com>
Reviewed-by: @gumadeiras
This commit is contained in:
parent
b894ca6702
commit
3a91a4f8d4
|
|
@ -29,6 +29,7 @@ Docs: https://docs.openclaw.ai
|
|||
- Agents/tool policy: preserve restrictive plugin-only allowlists instead of silently widening access to core tools, and keep allowlist warnings aligned with the enforced policy. (#58476) Thanks @eleqtrizit.
|
||||
- Telegram/native commands: clean up metadata-driven progress placeholders when replies fall back, edits fail, or local exec approval prompts are suppressed. (#59300) Thanks @jalehman.
|
||||
- Matrix: allow secret-storage recreation during automatic repair bootstrap so clients that lose their recovery key can recover and persist new cross-signing keys. (#59846) Thanks @al3mart.
|
||||
- Matrix/crypto persistence: capture and write the IndexedDB snapshot while holding the snapshot file lock so concurrent gateway and CLI persists cannot overwrite newer crypto state. (#59851) Thanks @al3mart.
|
||||
|
||||
## 2026.4.2
|
||||
|
||||
|
|
|
|||
|
|
@ -422,6 +422,7 @@ OpenClaw currently provides that in Node by:
|
|||
- using `fake-indexeddb` as the IndexedDB API shim expected by the SDK
|
||||
- restoring the Rust crypto IndexedDB contents from `crypto-idb-snapshot.json` before `initRustCrypto`
|
||||
- persisting the updated IndexedDB contents back to `crypto-idb-snapshot.json` after init and during runtime
|
||||
- serializing snapshot restore and persist against `crypto-idb-snapshot.json` with an advisory file lock so gateway runtime persistence and CLI maintenance do not race on the same snapshot file
|
||||
|
||||
This is compatibility/storage plumbing, not a custom crypto implementation.
|
||||
The snapshot file is sensitive runtime state and is stored with restrictive file permissions.
|
||||
|
|
|
|||
|
|
@ -1194,7 +1194,9 @@ describe("MatrixClient crypto bootstrapping", () => {
|
|||
const callsAfterStart = databasesSpy.mock.calls.length;
|
||||
|
||||
await vi.advanceTimersByTimeAsync(60_000);
|
||||
expect(databasesSpy.mock.calls.length).toBeGreaterThan(callsAfterStart);
|
||||
await vi.waitFor(() => {
|
||||
expect(databasesSpy.mock.calls.length).toBeGreaterThan(callsAfterStart);
|
||||
});
|
||||
|
||||
client.stop();
|
||||
const callsAfterStop = databasesSpy.mock.calls.length;
|
||||
|
|
|
|||
|
|
@ -0,0 +1,74 @@
|
|||
import "fake-indexeddb/auto";
|
||||
import fs from "node:fs";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { clearAllIndexedDbState, seedDatabase } from "./idb-persistence.test-helpers.js";
|
||||
|
||||
// Hoisted so the mock exists before the vi.mock factory below executes
// (vitest hoists vi.mock calls to the top of the module at transform time).
// Default behaviour is a transparent pass-through: run the guarded function
// without any real locking.
const { withFileLockMock } = vi.hoisted(() => ({
  withFileLockMock: vi.fn(
    async <T>(_filePath: string, _options: unknown, fn: () => Promise<T>) => await fn(),
  ),
}));

// Replace only withFileLock in the infra-runtime module; every other export
// is passed through unchanged via importOriginal so the module under test
// keeps its real helpers.
vi.mock("openclaw/plugin-sdk/infra-runtime", async (importOriginal) => {
  const actual = await importOriginal<typeof import("openclaw/plugin-sdk/infra-runtime")>();
  return {
    ...actual,
    withFileLock: withFileLockMock,
  };
});

// Imported lazily in beforeAll so the vi.mock above is already in effect when
// ./idb-persistence.js resolves its own import of withFileLock.
let persistIdbToDisk: typeof import("./idb-persistence.js").persistIdbToDisk;

beforeAll(async () => {
  ({ persistIdbToDisk } = await import("./idb-persistence.js"));
});
|
||||
|
||||
describe("Matrix IndexedDB persistence lock ordering", () => {
|
||||
let tmpDir: string;
|
||||
|
||||
beforeEach(async () => {
|
||||
tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "matrix-idb-lock-order-"));
|
||||
withFileLockMock.mockReset();
|
||||
withFileLockMock.mockImplementation(
|
||||
async <T>(_filePath: string, _options: unknown, fn: () => Promise<T>) => await fn(),
|
||||
);
|
||||
await clearAllIndexedDbState();
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
await clearAllIndexedDbState();
|
||||
fs.rmSync(tmpDir, { recursive: true, force: true });
|
||||
});
|
||||
|
||||
it("captures the snapshot after the file lock is acquired", async () => {
|
||||
const snapshotPath = path.join(tmpDir, "crypto-idb-snapshot.json");
|
||||
const dbName = "openclaw-matrix-test::matrix-sdk-crypto";
|
||||
await seedDatabase({
|
||||
name: dbName,
|
||||
storeName: "sessions",
|
||||
records: [{ key: "room-1", value: { session: "old-session" } }],
|
||||
});
|
||||
|
||||
withFileLockMock.mockImplementationOnce(async (_filePath, _options, fn) => {
|
||||
await seedDatabase({
|
||||
name: dbName,
|
||||
storeName: "sessions",
|
||||
records: [{ key: "room-1", value: { session: "new-session" } }],
|
||||
});
|
||||
return await fn();
|
||||
});
|
||||
|
||||
await persistIdbToDisk({ snapshotPath, databasePrefix: "openclaw-matrix-test" });
|
||||
|
||||
const data = JSON.parse(fs.readFileSync(snapshotPath, "utf8")) as Array<{
|
||||
stores: Array<{
|
||||
name: string;
|
||||
records: Array<{ key: IDBValidKey; value: { session: string } }>;
|
||||
}>;
|
||||
}>;
|
||||
const sessionsStore = data[0]?.stores.find((store) => store.name === "sessions");
|
||||
expect(sessionsStore?.records).toEqual([{ key: "room-1", value: { session: "new-session" } }]);
|
||||
});
|
||||
});
|
||||
|
|
@ -0,0 +1,88 @@
|
|||
export async function clearAllIndexedDbState(): Promise<void> {
|
||||
const databases = await indexedDB.databases();
|
||||
await Promise.all(
|
||||
databases
|
||||
.map((entry) => entry.name)
|
||||
.filter((name): name is string => Boolean(name))
|
||||
.map(
|
||||
(name) =>
|
||||
new Promise<void>((resolve, reject) => {
|
||||
const req = indexedDB.deleteDatabase(name);
|
||||
req.onsuccess = () => resolve();
|
||||
req.onerror = () => reject(req.error);
|
||||
req.onblocked = () => resolve();
|
||||
}),
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
export async function seedDatabase(params: {
|
||||
name: string;
|
||||
version?: number;
|
||||
storeName: string;
|
||||
records: Array<{ key: IDBValidKey; value: unknown }>;
|
||||
}): Promise<void> {
|
||||
await new Promise<void>((resolve, reject) => {
|
||||
const req = indexedDB.open(params.name, params.version ?? 1);
|
||||
req.onupgradeneeded = () => {
|
||||
const db = req.result;
|
||||
if (!db.objectStoreNames.contains(params.storeName)) {
|
||||
db.createObjectStore(params.storeName);
|
||||
}
|
||||
};
|
||||
req.onsuccess = () => {
|
||||
const db = req.result;
|
||||
const tx = db.transaction(params.storeName, "readwrite");
|
||||
const store = tx.objectStore(params.storeName);
|
||||
for (const record of params.records) {
|
||||
store.put(record.value, record.key);
|
||||
}
|
||||
tx.oncomplete = () => {
|
||||
db.close();
|
||||
resolve();
|
||||
};
|
||||
tx.onerror = () => reject(tx.error);
|
||||
};
|
||||
req.onerror = () => reject(req.error);
|
||||
});
|
||||
}
|
||||
|
||||
export async function readDatabaseRecords(params: {
|
||||
name: string;
|
||||
version?: number;
|
||||
storeName: string;
|
||||
}): Promise<Array<{ key: IDBValidKey; value: unknown }>> {
|
||||
return await new Promise((resolve, reject) => {
|
||||
const req = indexedDB.open(params.name, params.version ?? 1);
|
||||
req.onsuccess = () => {
|
||||
const db = req.result;
|
||||
const tx = db.transaction(params.storeName, "readonly");
|
||||
const store = tx.objectStore(params.storeName);
|
||||
const keysReq = store.getAllKeys();
|
||||
const valuesReq = store.getAll();
|
||||
let keys: IDBValidKey[] | null = null;
|
||||
let values: unknown[] | null = null;
|
||||
|
||||
const maybeResolve = () => {
|
||||
if (!keys || !values) {
|
||||
return;
|
||||
}
|
||||
db.close();
|
||||
const resolvedValues = values;
|
||||
resolve(keys.map((key, index) => ({ key, value: resolvedValues[index] })));
|
||||
};
|
||||
|
||||
keysReq.onsuccess = () => {
|
||||
keys = keysReq.result;
|
||||
maybeResolve();
|
||||
};
|
||||
valuesReq.onsuccess = () => {
|
||||
values = valuesReq.result;
|
||||
maybeResolve();
|
||||
};
|
||||
keysReq.onerror = () => reject(keysReq.error);
|
||||
valuesReq.onerror = () => reject(valuesReq.error);
|
||||
};
|
||||
req.onerror = () => reject(req.error);
|
||||
});
|
||||
}
|
||||
|
|
@ -2,99 +2,19 @@ import "fake-indexeddb/auto";
|
|||
import fs from "node:fs";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import {
|
||||
drainFileLockStateForTest,
|
||||
resetFileLockStateForTest,
|
||||
} from "openclaw/plugin-sdk/infra-runtime";
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { persistIdbToDisk, restoreIdbFromDisk } from "./idb-persistence.js";
|
||||
import {
|
||||
clearAllIndexedDbState,
|
||||
readDatabaseRecords,
|
||||
seedDatabase,
|
||||
} from "./idb-persistence.test-helpers.js";
|
||||
import { LogService } from "./logger.js";
|
||||
|
||||
// NOTE(review): the three helpers below duplicate the implementations exported
// from ./idb-persistence.test-helpers.js, which this file imports above. In
// the rendered diff these read as the pre-refactor local copies (removed side
// of the `@ -2,99 +2,19` hunk); they must not coexist with the imported
// versions in the final file — confirm they were deleted by the merge.

// Delete every database the IndexedDB shim currently reports.
async function clearAllIndexedDbState(): Promise<void> {
  const databases = await indexedDB.databases();
  await Promise.all(
    databases
      .map((entry) => entry.name)
      .filter((name): name is string => Boolean(name))
      .map(
        (name) =>
          new Promise<void>((resolve, reject) => {
            const req = indexedDB.deleteDatabase(name);
            req.onsuccess = () => resolve();
            req.onerror = () => reject(req.error);
            // A blocked delete still counts as done for cleanup purposes.
            req.onblocked = () => resolve();
          }),
      ),
  );
}

// Open (creating on first use) a database and write the given records into
// one object store with explicit out-of-line keys.
async function seedDatabase(params: {
  name: string;
  version?: number;
  storeName: string;
  records: Array<{ key: IDBValidKey; value: unknown }>;
}): Promise<void> {
  await new Promise<void>((resolve, reject) => {
    const req = indexedDB.open(params.name, params.version ?? 1);
    req.onupgradeneeded = () => {
      const db = req.result;
      if (!db.objectStoreNames.contains(params.storeName)) {
        db.createObjectStore(params.storeName);
      }
    };
    req.onsuccess = () => {
      const db = req.result;
      const tx = db.transaction(params.storeName, "readwrite");
      const store = tx.objectStore(params.storeName);
      for (const record of params.records) {
        store.put(record.value, record.key);
      }
      // Resolve only after the transaction has committed.
      tx.oncomplete = () => {
        db.close();
        resolve();
      };
      tx.onerror = () => reject(tx.error);
    };
    req.onerror = () => reject(req.error);
  });
}

// Read back every key/value pair from one object store by zipping the
// results of parallel getAllKeys()/getAll() requests.
async function readDatabaseRecords(params: {
  name: string;
  version?: number;
  storeName: string;
}): Promise<Array<{ key: IDBValidKey; value: unknown }>> {
  return await new Promise((resolve, reject) => {
    const req = indexedDB.open(params.name, params.version ?? 1);
    req.onsuccess = () => {
      const db = req.result;
      const tx = db.transaction(params.storeName, "readonly");
      const store = tx.objectStore(params.storeName);
      const keysReq = store.getAllKeys();
      const valuesReq = store.getAll();
      let keys: IDBValidKey[] | null = null;
      let values: unknown[] | null = null;

      // Resolve only after BOTH parallel requests have delivered results.
      const maybeResolve = () => {
        if (!keys || !values) {
          return;
        }
        db.close();
        const resolvedValues = values;
        resolve(keys.map((key, index) => ({ key, value: resolvedValues[index] })));
      };

      keysReq.onsuccess = () => {
        keys = keysReq.result;
        maybeResolve();
      };
      valuesReq.onsuccess = () => {
        values = valuesReq.result;
        maybeResolve();
      };
      keysReq.onerror = () => reject(keysReq.error);
      valuesReq.onerror = () => reject(valuesReq.error);
    };
    req.onerror = () => reject(req.error);
  });
}
|
||||
|
||||
describe("Matrix IndexedDB persistence", () => {
|
||||
let tmpDir: string;
|
||||
let warnSpy: ReturnType<typeof vi.spyOn>;
|
||||
|
|
@ -108,6 +28,7 @@ describe("Matrix IndexedDB persistence", () => {
|
|||
  afterEach(async () => {
    // Restore the warn spy installed during setup (setup is outside this hunk).
    warnSpy.mockRestore();
    // Drop every IndexedDB database so tests cannot observe each other's state.
    await clearAllIndexedDbState();
    // Clear advisory-lock test bookkeeping left behind by persist/restore calls.
    resetFileLockStateForTest();
    // Remove the per-test temp directory holding snapshot and lock files.
    fs.rmSync(tmpDir, { recursive: true, force: true });
  });
|
||||
|
||||
|
|
@ -171,4 +92,58 @@ describe("Matrix IndexedDB persistence", () => {
|
|||
const dbs = await indexedDB.databases();
|
||||
expect(dbs).toEqual([]);
|
||||
});
|
||||
|
||||
it("serializes concurrent persist operations via file lock", async () => {
|
||||
const snapshotPath = path.join(tmpDir, "concurrent-persist.json");
|
||||
await seedDatabase({
|
||||
name: "openclaw-matrix-test::matrix-sdk-crypto",
|
||||
storeName: "sessions",
|
||||
records: [{ key: "room-1", value: { session: "abc123" } }],
|
||||
});
|
||||
|
||||
await Promise.all([
|
||||
persistIdbToDisk({ snapshotPath, databasePrefix: "openclaw-matrix-test" }),
|
||||
persistIdbToDisk({ snapshotPath, databasePrefix: "openclaw-matrix-test" }),
|
||||
]);
|
||||
|
||||
expect(fs.existsSync(snapshotPath)).toBe(true);
|
||||
|
||||
const data = JSON.parse(fs.readFileSync(snapshotPath, "utf8"));
|
||||
expect(Array.isArray(data)).toBe(true);
|
||||
expect(data.length).toBe(1);
|
||||
});
|
||||
|
||||
it("releases lock after persist completes", async () => {
|
||||
const snapshotPath = path.join(tmpDir, "lock-release.json");
|
||||
await seedDatabase({
|
||||
name: "openclaw-matrix-test::matrix-sdk-crypto",
|
||||
storeName: "sessions",
|
||||
records: [{ key: "room-1", value: { session: "abc123" } }],
|
||||
});
|
||||
|
||||
await persistIdbToDisk({ snapshotPath, databasePrefix: "openclaw-matrix-test" });
|
||||
|
||||
const lockPath = `${snapshotPath}.lock`;
|
||||
expect(fs.existsSync(lockPath)).toBe(false);
|
||||
await drainFileLockStateForTest();
|
||||
});
|
||||
|
||||
it("releases lock after restore completes", async () => {
|
||||
const snapshotPath = path.join(tmpDir, "lock-release-restore.json");
|
||||
await seedDatabase({
|
||||
name: "openclaw-matrix-test::matrix-sdk-crypto",
|
||||
storeName: "sessions",
|
||||
records: [{ key: "room-1", value: { session: "abc123" } }],
|
||||
});
|
||||
|
||||
await persistIdbToDisk({ snapshotPath, databasePrefix: "openclaw-matrix-test" });
|
||||
await clearAllIndexedDbState();
|
||||
await drainFileLockStateForTest();
|
||||
|
||||
await restoreIdbFromDisk(snapshotPath);
|
||||
|
||||
const lockPath = `${snapshotPath}.lock`;
|
||||
expect(fs.existsSync(lockPath)).toBe(false);
|
||||
await drainFileLockStateForTest();
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -1,8 +1,27 @@
|
|||
import fs from "node:fs";
|
||||
import path from "node:path";
|
||||
import { indexedDB as fakeIndexedDB } from "fake-indexeddb";
|
||||
import type { FileLockOptions } from "openclaw/plugin-sdk/infra-runtime";
|
||||
import { withFileLock } from "openclaw/plugin-sdk/infra-runtime";
|
||||
import { LogService } from "./logger.js";
|
||||
|
||||
// Advisory lock options for IDB snapshot file access. Without locking, the
|
||||
// gateway's periodic 60-second persist cycle and CLI crypto commands (e.g.
|
||||
// `openclaw matrix verify bootstrap`) can corrupt each other's state.
|
||||
// Use a longer stale window than the generic 30s default because snapshot
|
||||
// restore and large crypto-store dumps can legitimately hold the lock for
|
||||
// longer, and reclaiming a live lock would reintroduce concurrent corruption.
|
||||
const IDB_SNAPSHOT_LOCK_OPTIONS: FileLockOptions = {
  // Backoff while waiting for a contended lock: up to 10 attempts, doubling
  // from 50ms toward a 5s cap, randomized to avoid lock-step retries from
  // concurrent gateway/CLI processes.
  retries: {
    retries: 10,
    factor: 2,
    minTimeout: 50,
    maxTimeout: 5_000,
    randomize: true,
  },
  // A lock untouched for 5 minutes is considered stale and may be reclaimed
  // (see the rationale comment above for why this exceeds the 30s default).
  stale: 5 * 60_000,
};
|
||||
|
||||
type IdbStoreSnapshot = {
|
||||
name: string;
|
||||
keyPath: IDBObjectStoreParameters["keyPath"];
|
||||
|
|
@ -198,17 +217,22 @@ export async function restoreIdbFromDisk(snapshotPath?: string): Promise<boolean
|
|||
const candidatePaths = snapshotPath ? [snapshotPath] : [resolveDefaultIdbSnapshotPath()];
|
||||
for (const resolvedPath of candidatePaths) {
|
||||
try {
|
||||
const data = fs.readFileSync(resolvedPath, "utf8");
|
||||
const snapshot = parseSnapshotPayload(data);
|
||||
if (!snapshot) {
|
||||
continue;
|
||||
const restored = await withFileLock(resolvedPath, IDB_SNAPSHOT_LOCK_OPTIONS, async () => {
|
||||
const data = fs.readFileSync(resolvedPath, "utf8");
|
||||
const snapshot = parseSnapshotPayload(data);
|
||||
if (!snapshot) {
|
||||
return false;
|
||||
}
|
||||
await restoreIndexedDatabases(snapshot);
|
||||
LogService.info(
|
||||
"IdbPersistence",
|
||||
`Restored ${snapshot.length} IndexedDB database(s) from ${resolvedPath}`,
|
||||
);
|
||||
return true;
|
||||
});
|
||||
if (restored) {
|
||||
return true;
|
||||
}
|
||||
await restoreIndexedDatabases(snapshot);
|
||||
LogService.info(
|
||||
"IdbPersistence",
|
||||
`Restored ${snapshot.length} IndexedDB database(s) from ${resolvedPath}`,
|
||||
);
|
||||
return true;
|
||||
} catch (err) {
|
||||
LogService.warn(
|
||||
"IdbPersistence",
|
||||
|
|
@ -227,14 +251,20 @@ export async function persistIdbToDisk(params?: {
|
|||
}): Promise<void> {
|
||||
const snapshotPath = params?.snapshotPath ?? resolveDefaultIdbSnapshotPath();
|
||||
try {
|
||||
const snapshot = await dumpIndexedDatabases(params?.databasePrefix);
|
||||
if (snapshot.length === 0) return;
|
||||
fs.mkdirSync(path.dirname(snapshotPath), { recursive: true });
|
||||
fs.writeFileSync(snapshotPath, JSON.stringify(snapshot));
|
||||
fs.chmodSync(snapshotPath, 0o600);
|
||||
const persistedCount = await withFileLock(snapshotPath, IDB_SNAPSHOT_LOCK_OPTIONS, async () => {
|
||||
const snapshot = await dumpIndexedDatabases(params?.databasePrefix);
|
||||
if (snapshot.length === 0) {
|
||||
return 0;
|
||||
}
|
||||
fs.writeFileSync(snapshotPath, JSON.stringify(snapshot));
|
||||
fs.chmodSync(snapshotPath, 0o600);
|
||||
return snapshot.length;
|
||||
});
|
||||
if (persistedCount === 0) return;
|
||||
LogService.debug(
|
||||
"IdbPersistence",
|
||||
`Persisted ${snapshot.length} IndexedDB database(s) to ${snapshotPath}`,
|
||||
`Persisted ${persistedCount} IndexedDB database(s) to ${snapshotPath}`,
|
||||
);
|
||||
} catch (err) {
|
||||
LogService.warn("IdbPersistence", "Failed to persist IndexedDB snapshot:", err);
|
||||
|
|
|
|||
Loading…
Reference in New Issue