mirror of https://github.com/openclaw/openclaw.git
fix(matrix): align IDB snapshot lock timing
This commit is contained in:
parent
1efa923ab8
commit
78a842d055
|
|
@ -1,3 +1,4 @@
|
|||
import "fake-indexeddb/auto";
|
||||
import { EventEmitter } from "node:events";
|
||||
import fs from "node:fs";
|
||||
import os from "node:os";
|
||||
|
|
|
|||
|
|
@ -23,6 +23,7 @@ import type { MatrixCryptoFacade } from "./sdk/crypto-facade.js";
|
|||
import type { MatrixDecryptBridge } from "./sdk/decrypt-bridge.js";
|
||||
import { matrixEventToRaw, parseMxc } from "./sdk/event-helpers.js";
|
||||
import { MatrixAuthedHttpClient } from "./sdk/http-client.js";
|
||||
import { MATRIX_IDB_PERSIST_INTERVAL_MS } from "./sdk/idb-persistence-lock.js";
|
||||
import { ConsoleLogger, LogService, noop } from "./sdk/logger.js";
|
||||
import { MatrixRecoveryKeyStore } from "./sdk/recovery-key-store.js";
|
||||
import { createMatrixGuardedFetch, type HttpMethod, type QueryParams } from "./sdk/transport.js";
|
||||
|
|
@ -547,7 +548,7 @@ export class MatrixClient {
|
|||
snapshotPath: this.idbSnapshotPath,
|
||||
databasePrefix: this.cryptoDatabasePrefix,
|
||||
}).catch(noop);
|
||||
}, 60_000);
|
||||
}, MATRIX_IDB_PERSIST_INTERVAL_MS);
|
||||
} catch (err) {
|
||||
LogService.warn("MatrixClientLite", "Failed to initialize rust crypto:", err);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,51 @@
|
|||
import type { FileLockOptions } from "openclaw/plugin-sdk/infra-runtime";
|
||||
|
||||
export const MATRIX_IDB_PERSIST_INTERVAL_MS = 60_000;
|
||||
|
||||
const IDB_SNAPSHOT_LOCK_STALE_MS = 5 * 60_000;
|
||||
const IDB_SNAPSHOT_LOCK_RETRY_BASE = {
|
||||
factor: 2,
|
||||
minTimeout: 50,
|
||||
maxTimeout: 5_000,
|
||||
randomize: true,
|
||||
} satisfies Omit<FileLockOptions["retries"], "retries">;
|
||||
|
||||
function computeRetryDelayMs(retries: FileLockOptions["retries"], attempt: number): number {
|
||||
return Math.min(
|
||||
retries.maxTimeout,
|
||||
Math.max(retries.minTimeout, retries.minTimeout * retries.factor ** attempt),
|
||||
);
|
||||
}
|
||||
|
||||
export function computeMinimumRetryWindowMs(retries: FileLockOptions["retries"]): number {
|
||||
let total = 0;
|
||||
const attempts = Math.max(1, retries.retries + 1);
|
||||
for (let attempt = 0; attempt < attempts - 1; attempt += 1) {
|
||||
total += computeRetryDelayMs(retries, attempt);
|
||||
}
|
||||
return total;
|
||||
}
|
||||
|
||||
function resolveRetriesForMinimumWindowMs(
|
||||
retries: Omit<FileLockOptions["retries"], "retries">,
|
||||
minimumWindowMs: number,
|
||||
): FileLockOptions["retries"] {
|
||||
const resolved: FileLockOptions["retries"] = {
|
||||
...retries,
|
||||
retries: 0,
|
||||
};
|
||||
while (computeMinimumRetryWindowMs(resolved) < minimumWindowMs) {
|
||||
resolved.retries += 1;
|
||||
}
|
||||
return resolved;
|
||||
}
|
||||
|
||||
export const MATRIX_IDB_SNAPSHOT_LOCK_OPTIONS: FileLockOptions = {
|
||||
// Wait longer than one periodic persist interval so a concurrent restore
|
||||
// or large snapshot dump finishes instead of forcing warn-and-continue.
|
||||
retries: resolveRetriesForMinimumWindowMs(
|
||||
IDB_SNAPSHOT_LOCK_RETRY_BASE,
|
||||
MATRIX_IDB_PERSIST_INTERVAL_MS,
|
||||
),
|
||||
stale: IDB_SNAPSHOT_LOCK_STALE_MS,
|
||||
};
|
||||
|
|
@ -3,6 +3,10 @@ import fs from "node:fs";
|
|||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import {
|
||||
computeMinimumRetryWindowMs,
|
||||
MATRIX_IDB_PERSIST_INTERVAL_MS,
|
||||
} from "./idb-persistence-lock.js";
|
||||
import { clearAllIndexedDbState, seedDatabase } from "./idb-persistence.test-helpers.js";
|
||||
|
||||
const { withFileLockMock } = vi.hoisted(() => ({
|
||||
|
|
@ -20,9 +24,12 @@ vi.mock("openclaw/plugin-sdk/infra-runtime", async (importOriginal) => {
|
|||
});
|
||||
|
||||
let persistIdbToDisk: typeof import("./idb-persistence.js").persistIdbToDisk;
|
||||
let restoreIdbFromDisk: typeof import("./idb-persistence.js").restoreIdbFromDisk;
|
||||
type CapturedLockOptions =
|
||||
typeof import("./idb-persistence-lock.js").MATRIX_IDB_SNAPSHOT_LOCK_OPTIONS;
|
||||
|
||||
beforeAll(async () => {
|
||||
({ persistIdbToDisk } = await import("./idb-persistence.js"));
|
||||
({ persistIdbToDisk, restoreIdbFromDisk } = await import("./idb-persistence.js"));
|
||||
});
|
||||
|
||||
describe("Matrix IndexedDB persistence lock ordering", () => {
|
||||
|
|
@ -71,4 +78,29 @@ describe("Matrix IndexedDB persistence lock ordering", () => {
|
|||
const sessionsStore = data[0]?.stores.find((store) => store.name === "sessions");
|
||||
expect(sessionsStore?.records).toEqual([{ key: "room-1", value: { session: "new-session" } }]);
|
||||
});
|
||||
|
||||
it("waits at least one persist interval before timing out on snapshot lock contention", async () => {
|
||||
const snapshotPath = path.join(tmpDir, "crypto-idb-snapshot.json");
|
||||
const capturedOptions: CapturedLockOptions[] = [];
|
||||
|
||||
withFileLockMock.mockImplementationOnce(async (_filePath, options) => {
|
||||
capturedOptions.push(options as CapturedLockOptions);
|
||||
return 0;
|
||||
});
|
||||
await persistIdbToDisk({ snapshotPath, databasePrefix: "openclaw-matrix-test" });
|
||||
|
||||
withFileLockMock.mockImplementationOnce(async (_filePath, options) => {
|
||||
capturedOptions.push(options as CapturedLockOptions);
|
||||
return false;
|
||||
});
|
||||
await restoreIdbFromDisk(snapshotPath);
|
||||
|
||||
expect(capturedOptions).toHaveLength(2);
|
||||
for (const options of capturedOptions) {
|
||||
expect(computeMinimumRetryWindowMs(options.retries)).toBeGreaterThanOrEqual(
|
||||
MATRIX_IDB_PERSIST_INTERVAL_MS,
|
||||
);
|
||||
expect(options.stale).toBe(5 * 60_000);
|
||||
}
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -1,8 +1,8 @@
|
|||
import fs from "node:fs";
|
||||
import path from "node:path";
|
||||
import { indexedDB as fakeIndexedDB } from "fake-indexeddb";
|
||||
import type { FileLockOptions } from "openclaw/plugin-sdk/infra-runtime";
|
||||
import { withFileLock } from "openclaw/plugin-sdk/infra-runtime";
|
||||
import { MATRIX_IDB_SNAPSHOT_LOCK_OPTIONS } from "./idb-persistence-lock.js";
|
||||
import { LogService } from "./logger.js";
|
||||
|
||||
// Advisory lock options for IDB snapshot file access. Without locking, the
|
||||
|
|
@ -11,17 +11,6 @@ import { LogService } from "./logger.js";
|
|||
// Use a longer stale window than the generic 30s default because snapshot
|
||||
// restore and large crypto-store dumps can legitimately hold the lock for
|
||||
// longer, and reclaiming a live lock would reintroduce concurrent corruption.
|
||||
const IDB_SNAPSHOT_LOCK_OPTIONS: FileLockOptions = {
|
||||
retries: {
|
||||
retries: 10,
|
||||
factor: 2,
|
||||
minTimeout: 50,
|
||||
maxTimeout: 5_000,
|
||||
randomize: true,
|
||||
},
|
||||
stale: 5 * 60_000,
|
||||
};
|
||||
|
||||
type IdbStoreSnapshot = {
|
||||
name: string;
|
||||
keyPath: IDBObjectStoreParameters["keyPath"];
|
||||
|
|
@ -217,19 +206,23 @@ export async function restoreIdbFromDisk(snapshotPath?: string): Promise<boolean
|
|||
const candidatePaths = snapshotPath ? [snapshotPath] : [resolveDefaultIdbSnapshotPath()];
|
||||
for (const resolvedPath of candidatePaths) {
|
||||
try {
|
||||
const restored = await withFileLock(resolvedPath, IDB_SNAPSHOT_LOCK_OPTIONS, async () => {
|
||||
const data = fs.readFileSync(resolvedPath, "utf8");
|
||||
const snapshot = parseSnapshotPayload(data);
|
||||
if (!snapshot) {
|
||||
return false;
|
||||
}
|
||||
await restoreIndexedDatabases(snapshot);
|
||||
LogService.info(
|
||||
"IdbPersistence",
|
||||
`Restored ${snapshot.length} IndexedDB database(s) from ${resolvedPath}`,
|
||||
);
|
||||
return true;
|
||||
});
|
||||
const restored = await withFileLock(
|
||||
resolvedPath,
|
||||
MATRIX_IDB_SNAPSHOT_LOCK_OPTIONS,
|
||||
async () => {
|
||||
const data = fs.readFileSync(resolvedPath, "utf8");
|
||||
const snapshot = parseSnapshotPayload(data);
|
||||
if (!snapshot) {
|
||||
return false;
|
||||
}
|
||||
await restoreIndexedDatabases(snapshot);
|
||||
LogService.info(
|
||||
"IdbPersistence",
|
||||
`Restored ${snapshot.length} IndexedDB database(s) from ${resolvedPath}`,
|
||||
);
|
||||
return true;
|
||||
},
|
||||
);
|
||||
if (restored) {
|
||||
return true;
|
||||
}
|
||||
|
|
@ -252,15 +245,19 @@ export async function persistIdbToDisk(params?: {
|
|||
const snapshotPath = params?.snapshotPath ?? resolveDefaultIdbSnapshotPath();
|
||||
try {
|
||||
fs.mkdirSync(path.dirname(snapshotPath), { recursive: true });
|
||||
const persistedCount = await withFileLock(snapshotPath, IDB_SNAPSHOT_LOCK_OPTIONS, async () => {
|
||||
const snapshot = await dumpIndexedDatabases(params?.databasePrefix);
|
||||
if (snapshot.length === 0) {
|
||||
return 0;
|
||||
}
|
||||
fs.writeFileSync(snapshotPath, JSON.stringify(snapshot));
|
||||
fs.chmodSync(snapshotPath, 0o600);
|
||||
return snapshot.length;
|
||||
});
|
||||
const persistedCount = await withFileLock(
|
||||
snapshotPath,
|
||||
MATRIX_IDB_SNAPSHOT_LOCK_OPTIONS,
|
||||
async () => {
|
||||
const snapshot = await dumpIndexedDatabases(params?.databasePrefix);
|
||||
if (snapshot.length === 0) {
|
||||
return 0;
|
||||
}
|
||||
fs.writeFileSync(snapshotPath, JSON.stringify(snapshot));
|
||||
fs.chmodSync(snapshotPath, 0o600);
|
||||
return snapshot.length;
|
||||
},
|
||||
);
|
||||
if (persistedCount === 0) return;
|
||||
LogService.debug(
|
||||
"IdbPersistence",
|
||||
|
|
|
|||
Loading…
Reference in New Issue