From d2dcd6fca6d7854bea0d2e5e90ace5e77477201d Mon Sep 17 00:00:00 2001 From: Vincent Koc Date: Tue, 31 Mar 2026 17:17:20 +0900 Subject: [PATCH] fix(memory): stagger qmd embed maintenance across agents (#58180) * fix(memory): stagger qmd embed maintenance across agents * fix(memory): keep qmd embed serialization in-process * fix(memory): extend qmd embed lock wait budget --- CHANGELOG.md | 1 + .../src/memory/qmd-manager.test.ts | 123 ++++++++++++++++ .../memory-core/src/memory/qmd-manager.ts | 134 +++++++++++++++--- 3 files changed, 236 insertions(+), 22 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 58ecb9b2abc..c75479436ac 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -59,6 +59,7 @@ Docs: https://docs.openclaw.ai - macOS/local gateway: stop OpenClaw.app from killing healthy local gateway listeners after startup by recognizing the current `openclaw-gateway` process title and using the current `openclaw gateway` launch shape. - Gateway/OpenAI compatibility: accept flat Responses API function tool definitions on `/v1/responses` and preserve `strict` when normalizing hosted tools into the embedded runner, so spec-compliant clients like Codex no longer fail validation or silently lose strict tool enforcement. Thanks @malaiwah and @vincentkoc. - Memory/QMD: resolve slugified `memory_search` file hints back to the indexed filesystem path before returning search hits, so `memory_get` works again for mixed-case and spaced paths. (#50313) Thanks @erra9x. +- Memory/QMD: serialize cross-process `qmd embed` runs behind a shared lock and stagger periodic embed timers so multi-agent QMD collections stop thundering-herding on startup and every maintenance interval. Thanks @vincentkoc. - OpenAI/Codex fast mode: map `/fast` to priority processing on native OpenAI and Codex Responses endpoints instead of rewriting reasoning settings, and document the exact endpoint and override behavior. - Memory/QMD: weight CJK-heavy text correctly when estimating chunk sizes, preserve surrogate-pair characters during fine splits, and keep long Latin lines on the old chunk boundaries so memory indexing produces better-sized chunks for CJK notes. (#40271) Thanks @AaronLuo00. - Security/LINE: make webhook signature validation run the timing-safe compare even when the supplied signature length is wrong, closing a small timing side-channel. (#55663) Thanks @gavyngong. diff --git a/extensions/memory-core/src/memory/qmd-manager.test.ts b/extensions/memory-core/src/memory/qmd-manager.test.ts index 41e940869ae..76ed983bff2 100644 --- a/extensions/memory-core/src/memory/qmd-manager.test.ts +++ b/extensions/memory-core/src/memory/qmd-manager.test.ts @@ -20,7 +20,13 @@ const { watchMock } = vi.hoisted(() => ({ }); }), })); +const { withFileLockMock } = vi.hoisted(() => ({ + withFileLockMock: vi.fn( + async (_filePath: string, _options: unknown, fn: () => Promise) => await fn(), + ), +})); const MCPORTER_STATE_KEY = Symbol.for("openclaw.mcporterState"); +const QMD_EMBED_QUEUE_KEY = Symbol.for("openclaw.qmdEmbedQueueTail"); type MockChild = EventEmitter & { stdout: EventEmitter; @@ -106,6 +112,14 @@ vi.mock("chokidar", () => ({ watch: watchMock, })); +vi.mock("openclaw/plugin-sdk/file-lock", async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + withFileLock: withFileLockMock, + }; +}); + import { spawn as mockedSpawn } from "node:child_process"; import type { OpenClawConfig } from "openclaw/plugin-sdk/memory-core-host-engine-foundation"; import { @@ -128,6 +142,7 @@ describe("QmdMemoryManager", () => { let cfg: OpenClawConfig; const agentId = "main"; const openManagers = new Set(); + let embedStartupJitterSpy: ReturnType | null = null; function trackManager(manager: T): T { if (manager) { @@ -166,6 +181,7 @@ describe("QmdMemoryManager", () => { spawnMock.mockClear(); spawnMock.mockImplementation(() => createMockChild()); watchMock.mockClear(); + withFileLockMock.mockClear(); logWarnMock.mockClear(); logDebugMock.mockClear(); logInfoMock.mockClear(); @@ -202,6 +218,14 @@ describe("QmdMemoryManager", () => { }, }, } as OpenClawConfig; + embedStartupJitterSpy = vi + .spyOn( + QmdMemoryManager.prototype as unknown as { + resolveEmbedStartupJitterMs: () => number; + }, + "resolveEmbedStartupJitterMs", + ) + .mockReturnValue(0); }); afterEach(async () => { @@ -212,6 +236,8 @@ describe("QmdMemoryManager", () => { ); openManagers.clear(); await fs.rm(tmpRoot, { recursive: true, force: true }); + embedStartupJitterSpy?.mockRestore(); + embedStartupJitterSpy = null; vi.useRealTimers(); delete process.env.OPENCLAW_STATE_DIR; if (originalPath === undefined) { @@ -230,6 +256,7 @@ describe("QmdMemoryManager", () => { (process.env as NodeJS.ProcessEnv & { Path?: string }).Path = originalWindowsPath; } delete (globalThis as Record)[MCPORTER_STATE_KEY]; + delete (globalThis as Record)[QMD_EMBED_QUEUE_KEY]; }); it("debounces back-to-back sync calls", async () => { @@ -3112,6 +3139,102 @@ describe("QmdMemoryManager", () => { await manager.close(); }); + it("delays the first periodic embed maintenance run by stable startup jitter", async () => { + vi.useFakeTimers(); + embedStartupJitterSpy?.mockRestore(); + embedStartupJitterSpy = vi + .spyOn( + QmdMemoryManager.prototype as unknown as { + resolveEmbedStartupJitterMs: () => number; + }, + "resolveEmbedStartupJitterMs", + ) + .mockReturnValue(60_000); + cfg = { + ...cfg, + memory: { + backend: "qmd", + qmd: { + includeDefaultMemory: false, + searchMode: "query", + update: { + interval: "0s", + debounceMs: 0, + onBoot: false, + embedInterval: "5m", + }, + paths: [{ path: workspaceDir, pattern: "**/*.md", name: "workspace" }], + }, + }, + } as OpenClawConfig; + + const { manager } = await createManager({ mode: "full" }); + + await vi.advanceTimersByTimeAsync(59_999); + const beforeCalls = spawnMock.mock.calls + .map((call: unknown[]) => call[1] as string[]) + .filter((args: string[]) => args[0] === "update" || args[0] === "embed"); + expect(beforeCalls).toHaveLength(0); + + await vi.advanceTimersByTimeAsync(1); + const commandCalls = spawnMock.mock.calls + .map((call: unknown[]) => call[1] as string[]) + .filter((args: string[]) => args[0] === "update" || args[0] === "embed"); + expect(commandCalls).toEqual([["update"], ["embed"]]); + + await manager.close(); + }); + + it("serializes qmd embeds within a process before taking the shared file lock", async () => { + vi.useFakeTimers(); + const embedChildren: MockChild[] = []; + spawnMock.mockImplementation((_cmd: string, args: string[]) => { + if (args[0] === "embed") { + const child = createMockChild({ autoClose: false }); + embedChildren.push(child); + return child; + } + return createMockChild(); + }); + + const first = await createManager({ mode: "status" }); + const second = await createManager({ mode: "status" }); + const firstSync = first.manager.sync({ reason: "manual", force: true }); + await vi.advanceTimersByTimeAsync(0); + expect(embedChildren).toHaveLength(1); + expect(withFileLockMock).toHaveBeenCalledWith( + expect.any(String), + expect.objectContaining({ + retries: expect.objectContaining({ + retries: expect.any(Number), + maxTimeout: 10_000, + }), + stale: expect.any(Number), + }), + expect.any(Function), + ); + const lockOptions = withFileLockMock.mock.calls[0]?.[1] as { + retries: { retries: number }; + stale: number; + }; + expect(lockOptions.retries.retries).toBeGreaterThanOrEqual(90); + expect(lockOptions.stale).toBeGreaterThanOrEqual(15 * 60 * 1000); + + const secondSync = second.manager.sync({ reason: "manual", force: true }); + await vi.advanceTimersByTimeAsync(0); + expect(embedChildren).toHaveLength(1); + + embedChildren[0]?.closeWith(0); + await vi.advanceTimersByTimeAsync(0); + expect(embedChildren).toHaveLength(2); + + embedChildren[1]?.closeWith(0); + await expect(firstSync).resolves.toBeUndefined(); + await expect(secondSync).resolves.toBeUndefined(); + await first.manager.close(); + await second.manager.close(); + }); + it("runs qmd embed in search mode for forced sync", async () => { cfg = { ...cfg, diff --git a/extensions/memory-core/src/memory/qmd-manager.ts b/extensions/memory-core/src/memory/qmd-manager.ts index 99379938359..51565ea3558 100644 --- a/extensions/memory-core/src/memory/qmd-manager.ts +++ b/extensions/memory-core/src/memory/qmd-manager.ts @@ -1,8 +1,10 @@ +import crypto from "node:crypto"; import fs from "node:fs/promises"; import os from "node:os"; import path from "node:path"; import readline from "node:readline"; import chokidar, { type FSWatcher } from "chokidar"; +import { withFileLock } from "openclaw/plugin-sdk/file-lock"; import { createSubsystemLogger, resolveMemorySearchConfig, @@ -52,7 +54,15 @@ const NUL_MARKER_RE = /(?:\^@|\\0|\\x00|\\u0000|null\s*byte|nul\s*byte)/i; const QMD_EMBED_BACKOFF_BASE_MS = 60_000; const QMD_EMBED_BACKOFF_MAX_MS = 60 * 60 * 1000; const HAN_SCRIPT_RE = /[\u3400-\u9fff]/u; +const QMD_EMBED_LOCK_MIN_WAIT_MS = 15 * 60 * 1000; +const QMD_EMBED_LOCK_RETRY_TEMPLATE = { + factor: 1.2, + minTimeout: 250, + maxTimeout: 10_000, + randomize: true, +} as const; const MCPORTER_STATE_KEY = Symbol.for("openclaw.mcporterState"); +const QMD_EMBED_QUEUE_KEY = Symbol.for("openclaw.qmdEmbedQueueTail"); const IGNORED_MEMORY_WATCH_DIR_NAMES = new Set([ ".git", "node_modules", @@ -68,7 +78,9 @@ type McporterState = { daemonStart: Promise | null; }; -let qmdEmbedQueueTail: Promise = Promise.resolve(); +type QmdEmbedQueueState = { + tail: Promise; +}; function getMcporterState(): McporterState { return resolveGlobalSingleton(MCPORTER_STATE_KEY, () => ({ @@ -77,6 +89,12 @@ function getMcporterState(): McporterState { })); } +function getQmdEmbedQueueState(): QmdEmbedQueueState { + return resolveGlobalSingleton(QMD_EMBED_QUEUE_KEY, () => ({ + tail: Promise.resolve(), + })); +} + function hasHanScript(value: string): boolean { return HAN_SCRIPT_RE.test(value); } @@ -87,26 +105,33 @@ function normalizeHanBm25Query(query: string): string { return trimmed; } +function resolveStableJitterMs(params: { seed: string; windowMs: number }): number { + if (params.windowMs <= 0) { + return 0; + } + const hash = crypto.createHash("sha256").update(params.seed).digest(); + const bucket = hash.readUInt32BE(0); + return bucket % (Math.floor(params.windowMs) + 1); +} + +function resolveQmdEmbedLockOptions(embedTimeoutMs: number) { + const expectedEmbedMs = Math.max(1, embedTimeoutMs); + const waitBudgetMs = Math.max(QMD_EMBED_LOCK_MIN_WAIT_MS, expectedEmbedMs * 6); + return { + retries: { + retries: Math.max(60, Math.ceil(waitBudgetMs / QMD_EMBED_LOCK_RETRY_TEMPLATE.maxTimeout)), + ...QMD_EMBED_LOCK_RETRY_TEMPLATE, + }, + stale: Math.max(QMD_EMBED_LOCK_MIN_WAIT_MS, expectedEmbedMs * 2), + }; +} + function shouldIgnoreMemoryWatchPath(watchPath: string): boolean { const normalized = path.normalize(watchPath); const parts = normalized.split(path.sep).map((segment) => segment.trim().toLowerCase()); return parts.some((segment) => IGNORED_MEMORY_WATCH_DIR_NAMES.has(segment)); } -async function runWithQmdEmbedLock(task: () => Promise): Promise { - const previous = qmdEmbedQueueTail; - let release: (() => void) | undefined; - qmdEmbedQueueTail = new Promise((resolve) => { - release = resolve; - }); - await previous.catch(() => undefined); - try { - return await task(); - } finally { - release?.(); - } -} - type CollectionRoot = { path: string; kind: MemorySource; @@ -337,11 +362,33 @@ export class QmdMemoryManager implements MemorySearchManager { }, this.qmd.update.intervalMs); } if (this.shouldScheduleEmbedTimer()) { - this.embedTimer = setInterval(() => { - void this.runUpdate("embed-interval").catch((err) => { - log.warn(`qmd embed interval update failed (${String(err)})`); - }); - }, this.qmd.update.embedIntervalMs); + const startPeriodicEmbedTimer = () => { + this.embedTimer = setInterval(() => { + void this.runUpdate("embed-interval").catch((err) => { + log.warn(`qmd embed interval update failed (${String(err)})`); + }); + }, this.qmd.update.embedIntervalMs); + }; + const initialDelayMs = this.resolveEmbedStartupJitterMs(); + if (initialDelayMs > 0) { + this.embedTimer = setTimeout(() => { + this.embedTimer = null; + if (this.closed) { + return; + } + void this.runUpdate("embed-interval") + .catch((err) => { + log.warn(`qmd embed interval update failed (${String(err)})`); + }) + .finally(() => { + if (!this.closed) { + startPeriodicEmbedTimer(); + } + }); + }, initialDelayMs); + } else { + startPeriodicEmbedTimer(); + } } } @@ -1085,7 +1132,7 @@ export class QmdMemoryManager implements MemorySearchManager { this.updateTimer = null; } if (this.embedTimer) { - clearInterval(this.embedTimer); + clearTimeout(this.embedTimer); this.embedTimer = null; } if (this.watchTimer) { @@ -1136,7 +1183,7 @@ export class QmdMemoryManager implements MemorySearchManager { this.dirty = false; if (this.shouldRunEmbed(force)) { try { - await runWithQmdEmbedLock(async () => { + await this.withQmdEmbedLock(async () => { await this.runQmd(["embed"], { timeoutMs: this.qmd.update.embedTimeoutMs, discardOutput: true, @@ -1303,6 +1350,49 @@ export class QmdMemoryManager implements MemorySearchManager { return updateIntervalMs <= 0 || updateIntervalMs > embedIntervalMs; } + private resolveEmbedStartupJitterMs(): number { + const windowMs = this.qmd.update.embedIntervalMs; + if (windowMs <= 0) { + return 0; + } + const customCollections = this.qmd.collections + .filter((collection) => collection.kind === "custom") + .map((collection) => `${collection.path}\u0000${collection.pattern}`) + .toSorted() + .join("\u0001"); + if (!customCollections) { + return 0; + } + return resolveStableJitterMs({ + seed: `${this.agentId}:${customCollections}`, + windowMs, + }); + } + + private async withQmdEmbedLock(task: () => Promise): Promise { + const lockPath = path.join(this.stateDir, "qmd", "embed.lock"); + const queue = getQmdEmbedQueueState(); + const previous = queue.tail; + let releaseCurrent!: () => void; + const current = new Promise((resolve) => { + releaseCurrent = resolve; + }); + queue.tail = previous.then( + () => current, + () => current, + ); + await previous.catch(() => undefined); + try { + return await withFileLock( + lockPath, + resolveQmdEmbedLockOptions(this.qmd.update.embedTimeoutMs), + task, + ); + } finally { + releaseCurrent(); + } + } + private noteEmbedFailure(reason: string, err: unknown): void { this.embedFailureCount += 1; const delayMs = Math.min(