diff --git a/docs/help/debugging.md b/docs/help/debugging.md index 04fd150ef20..dc6a83780cf 100644 --- a/docs/help/debugging.md +++ b/docs/help/debugging.md @@ -50,7 +50,8 @@ gateway without forcing a `tsdown` rebuild; source and config changes still rebuild `dist` first. Add any gateway CLI flags after `gateway:watch` and they will be passed through on -each restart. +each restart. Re-running the same watch command for the same repo/flag set now +replaces the older watcher instead of leaving duplicate watcher parents behind. ## Dev profile + dev gateway (--dev) diff --git a/scripts/watch-node.mjs b/scripts/watch-node.mjs index ff6e10e4b37..eecda3bf0b0 100644 --- a/scripts/watch-node.mjs +++ b/scripts/watch-node.mjs @@ -1,5 +1,7 @@ #!/usr/bin/env node import { spawn } from "node:child_process"; +import { createHash } from "node:crypto"; +import fs from "node:fs"; import path from "node:path"; import process from "node:process"; import { pathToFileURL } from "node:url"; @@ -11,6 +13,9 @@ const WATCH_RESTART_SIGNAL = "SIGTERM"; const WATCH_RESTARTABLE_CHILD_EXIT_CODES = new Set([143]); const WATCH_RESTARTABLE_CHILD_SIGNALS = new Set(["SIGTERM"]); const WATCH_IGNORED_PATH_SEGMENTS = new Set([".git", "dist", "node_modules"]); +const WATCH_LOCK_WAIT_MS = 5_000; +const WATCH_LOCK_POLL_MS = 100; +const WATCH_LOCK_DIR = path.join(".local", "watch-node"); const buildRunnerArgs = (args) => [WATCH_NODE_RUNNER, ...args]; @@ -65,6 +70,129 @@ const shouldRestartAfterChildExit = (exitCode, exitSignal) => (typeof exitCode === "number" && WATCH_RESTARTABLE_CHILD_EXIT_CODES.has(exitCode)) || (typeof exitSignal === "string" && WATCH_RESTARTABLE_CHILD_SIGNALS.has(exitSignal)); +const isProcessAlive = (pid, signalProcess) => { + if (!Number.isInteger(pid) || pid <= 0) { + return false; + } + try { + signalProcess(pid, 0); + } catch { + return false; + } + return true; +}; + +const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms)); + +const createWatchLockKey = (cwd, args) => + createHash("sha256").update(cwd).update("\0").update(args.join("\0")).digest("hex").slice(0, 12); + +export const resolveWatchLockPath = (cwd, args = []) => + path.join(cwd, WATCH_LOCK_DIR, `${createWatchLockKey(cwd, args)}.json`); + +const readWatchLock = (lockPath) => { + try { + return JSON.parse(fs.readFileSync(lockPath, "utf8")); + } catch { + return null; + } +}; + +const removeWatchLock = (lockPath) => { + try { + fs.unlinkSync(lockPath); + } catch (error) { + if (error?.code !== "ENOENT") { + throw error; + } + } +}; + +const writeWatchLock = (lockPath, payload) => { + fs.mkdirSync(path.dirname(lockPath), { recursive: true }); + fs.writeFileSync(lockPath, `${JSON.stringify(payload)}\n`, { + encoding: "utf8", + flag: "wx", + }); +}; + +const logWatcher = (message, deps) => { + deps.process.stderr?.write?.(`[openclaw] ${message}\n`); +}; + +const waitForWatcherRelease = async (lockPath, pid, deps) => { + const deadline = deps.now() + WATCH_LOCK_WAIT_MS; + while (deps.now() < deadline) { + if (!isProcessAlive(pid, deps.signalProcess)) { + return true; + } + if (!fs.existsSync(lockPath)) { + return true; + } + await deps.sleep(WATCH_LOCK_POLL_MS); + } + return !isProcessAlive(pid, deps.signalProcess); +}; + +const acquireWatchLock = async (deps, watchSession) => { + const lockPath = resolveWatchLockPath(deps.cwd, deps.args); + const payload = { + pid: deps.process.pid, + command: deps.args.join(" "), + createdAt: new Date(deps.now()).toISOString(), + cwd: deps.cwd, + watchSession, + }; + + while (true) { + try { + writeWatchLock(lockPath, payload); + return { lockPath, pid: deps.process.pid }; + } catch (error) { + if (error?.code !== "EEXIST") { + throw error; + } + } + + const existing = readWatchLock(lockPath); + const existingPid = existing?.pid; + if (!isProcessAlive(existingPid, deps.signalProcess)) { + removeWatchLock(lockPath); + continue; + } + + logWatcher(`Replacing existing watcher pid ${existingPid}.`, deps); + try { + deps.signalProcess(existingPid, WATCH_RESTART_SIGNAL); + } catch (error) { + if (isProcessAlive(existingPid, deps.signalProcess)) { + logWatcher( + `Failed to stop existing watcher pid ${existingPid}: ${error?.message ?? "unknown error"}`, + deps, + ); + return null; + } + } + + const released = await waitForWatcherRelease(lockPath, existingPid, deps); + if (!released) { + logWatcher(`Timed out waiting for watcher pid ${existingPid} to exit.`, deps); + return null; + } + removeWatchLock(lockPath); + } +}; + +const releaseWatchLock = (lockHandle) => { + if (!lockHandle) { + return; + } + const current = readWatchLock(lockHandle.lockPath); + if (current?.pid === lockHandle.pid) { + removeWatchLock(lockHandle.lockPath); + } +}; + /** * @param {{ * spawn?: typeof spawn; @@ -73,6 +201,9 @@ const shouldRestartAfterChildExit = (exitCode, exitSignal) => * args?: string[]; * env?: NodeJS.ProcessEnv; * now?: () => number; + * sleep?: (ms: number) => Promise; + * signalProcess?: (pid: number, signal: string | number) => void; + * lockDisabled?: boolean; * createWatcher?: ( * watchPaths: string[], * options: { ignoreInitial: boolean; ignored: (watchPath: string) => boolean }, @@ -88,6 +219,9 @@ export async function runWatchMain(params = {}) { args: params.args ?? process.argv.slice(2), env: params.env ? { ...params.env } : { ...process.env }, now: params.now ?? Date.now, + sleep: params.sleep ?? sleep, + signalProcess: params.signalProcess ?? ((pid, signal) => process.kill(pid, signal)), + lockDisabled: params.lockDisabled === true, createWatcher: params.createWatcher ?? ((watchPaths, options) => chokidar.watch(watchPaths, options)), watchPaths: params.watchPaths ?? runNodeWatchedPaths, @@ -109,6 +243,7 @@ export async function runWatchMain(params = {}) { let shuttingDown = false; let restartRequested = false; let watchProcess = null; + let lockHandle = null; let onSigInt; let onSigTerm; @@ -129,6 +264,7 @@ export async function runWatchMain(params = {}) { if (onSigTerm) { deps.process.off("SIGTERM", onSigTerm); } + releaseWatchLock(lockHandle); watcher.close?.().catch?.(() => {}); resolve(code); }; @@ -139,6 +275,11 @@ export async function runWatchMain(params = {}) { env: childEnv, stdio: "inherit", }); + watchProcess.on("error", (error) => { + watchProcess = null; + logWatcher(`Failed to spawn watcher child: ${error?.message ?? "unknown error"}`, deps); + settle(1); + }); watchProcess.on("exit", (exitCode, exitSignal) => { watchProcess = null; if (shuttingDown) { @@ -178,8 +319,6 @@ export async function runWatchMain(params = {}) { settle(1); }); - startRunner(); - onSigInt = () => { shuttingDown = true; if (watchProcess && typeof watchProcess.kill === "function") { @@ -197,6 +336,26 @@ export async function runWatchMain(params = {}) { deps.process.on("SIGINT", onSigInt); deps.process.on("SIGTERM", onSigTerm); + + if (deps.lockDisabled) { + lockHandle = { lockPath: "", pid: deps.process.pid }; + startRunner(); + return; + } + + void acquireWatchLock(deps, watchSession) + .then((handle) => { + if (!handle) { + settle(1); + return; + } + lockHandle = handle; + startRunner(); + }) + .catch((error) => { + logWatcher(`Failed to acquire watcher lock: ${error?.message ?? "unknown error"}`, deps); + settle(1); + }); }); } diff --git a/src/infra/scripts-modules.d.ts b/src/infra/scripts-modules.d.ts index 2a911f8116f..4dd21a614bc 100644 --- a/src/infra/scripts-modules.d.ts +++ b/src/infra/scripts-modules.d.ts @@ -1,4 +1,5 @@ declare module "../../scripts/watch-node.mjs" { + export function resolveWatchLockPath(cwd: string, args?: string[]): string; export function runWatchMain(params?: { spawn?: ( cmd: string, @@ -10,6 +11,9 @@ declare module "../../scripts/watch-node.mjs" { args?: string[]; env?: NodeJS.ProcessEnv; now?: () => number; + sleep?: (ms: number) => Promise; + signalProcess?: (pid: number, signal: NodeJS.Signals | 0) => void; + lockDisabled?: boolean; }): Promise; } diff --git a/src/infra/watch-node.test.ts b/src/infra/watch-node.test.ts index 905d89eebb8..8dc33662166 100644 --- a/src/infra/watch-node.test.ts +++ b/src/infra/watch-node.test.ts @@ -1,3 +1,4 @@ +import { createHash } from "node:crypto"; import { EventEmitter } from "node:events"; import fs from "node:fs"; import os from "node:os"; @@ -12,6 +13,20 @@ const VOICE_CALL_MANIFEST = bundledPluginFile("voice-call", "openclaw.plugin.jso const VOICE_CALL_PACKAGE = bundledPluginFile("voice-call", "package.json"); const VOICE_CALL_INDEX = bundledPluginFile("voice-call", "index.ts"); const VOICE_CALL_RUNTIME = bundledPluginFile("voice-call", "src/runtime.ts"); +type WatchRunParams = NonNullable[0]> & { + lockDisabled?: boolean; + signalProcess?: (pid: number, signal: NodeJS.Signals | 0) => void; + sleep?: (ms: number) => Promise; +}; + +const runWatch = (params: WatchRunParams) => runWatchMain(params); +const resolveTestWatchLockPath = (cwd: string, args: string[]) => + path.join( + cwd, + ".local", + "watch-node", + `${createHash("sha256").update(cwd).update("\0").update(args.join("\0")).digest("hex").slice(0, 12)}.json`, + ); const createFakeProcess = () => Object.assign(new EventEmitter(), { @@ -39,11 +54,12 @@ describe("watch-node script", () => { fs.mkdirSync(path.join(cwd, "src", "infra"), { recursive: true }); fs.mkdirSync(path.join(cwd, "extensions", "voice-call"), { recursive: true }); - const runPromise = runWatchMain({ + const runPromise = runWatch({ args: ["gateway", "--force"], cwd, createWatcher, env: { PATH: "/usr/bin" }, + lockDisabled: true, now: () => 1700000000000, process: fakeProcess, spawn, @@ -104,9 +120,10 @@ describe("watch-node script", () => { it("terminates child on SIGINT and returns shell interrupt code", async () => { const { child, spawn, watcher, createWatcher, fakeProcess } = createWatchHarness(); - const runPromise = runWatchMain({ + const runPromise = runWatch({ args: ["gateway", "--force"], createWatcher, + lockDisabled: true, process: fakeProcess, spawn, }); @@ -124,9 +141,10 @@ describe("watch-node script", () => { it("terminates child on SIGTERM and returns shell terminate code", async () => { const { child, spawn, watcher, createWatcher, fakeProcess } = createWatchHarness(); - const runPromise = runWatchMain({ + const runPromise = runWatch({ args: ["gateway", "--force"], createWatcher, + lockDisabled: true, process: fakeProcess, spawn, }); @@ -144,9 +162,10 @@ describe("watch-node script", () => { it("returns the child exit code when the runner exits on its own", async () => { const { child, spawn, watcher, createWatcher, fakeProcess } = createWatchHarness(); - const runPromise = runWatchMain({ + const runPromise = runWatch({ args: ["gateway", "--force", "--help"], createWatcher, + lockDisabled: true, process: fakeProcess, spawn, }); @@ -174,9 +193,10 @@ describe("watch-node script", () => { const createWatcher = vi.fn(() => watcher); const fakeProcess = createFakeProcess(); - const runPromise = runWatchMain({ + const runPromise = runWatch({ args: ["gateway", "--force"], createWatcher, + lockDisabled: true, process: fakeProcess, spawn, }); @@ -195,13 +215,14 @@ describe("watch-node script", () => { it("forces no-respawn for watch children even when supervisor hints are inherited", async () => { const { child, spawn, watcher, createWatcher, fakeProcess } = createWatchHarness(); - const runPromise = runWatchMain({ + const runPromise = runWatch({ args: ["gateway", "--force"], createWatcher, env: { LAUNCH_JOB_LABEL: "ai.openclaw.gateway", PATH: "/usr/bin", }, + lockDisabled: true, process: fakeProcess, spawn, }); @@ -255,9 +276,10 @@ describe("watch-node script", () => { const createWatcher = vi.fn(() => watcher); const fakeProcess = createFakeProcess(); - const runPromise = runWatchMain({ + const runPromise = runWatch({ args: ["gateway", "--force"], createWatcher, + lockDisabled: true, process: fakeProcess, spawn, }); @@ -305,9 +327,10 @@ describe("watch-node script", () => { it("kills child and exits when watcher emits an error", async () => { const { child, spawn, watcher, createWatcher, fakeProcess } = createWatchHarness(); - const runPromise = runWatchMain({ + const runPromise = runWatch({ args: ["gateway", "--force"], createWatcher, + lockDisabled: true, process: fakeProcess, spawn, }); @@ -319,4 +342,66 @@ describe("watch-node script", () => { expect(child.kill).toHaveBeenCalledWith("SIGTERM"); expect(watcher.close).toHaveBeenCalledTimes(1); }); + + it("replaces an existing watcher lock holder before starting", async () => { + const { child, spawn, watcher, createWatcher, fakeProcess } = createWatchHarness(); + const cwd = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-watch-node-lock-")); + const lockPath = resolveTestWatchLockPath(cwd, ["gateway", "--force"]); + fs.mkdirSync(path.dirname(lockPath), { recursive: true }); + fs.writeFileSync( + lockPath, + `${JSON.stringify({ + pid: 2121, + command: "gateway --force", + createdAt: new Date(1_700_000_000_000).toISOString(), + cwd, + watchSession: "existing-session", + })}\n`, + "utf8", + ); + + let existingWatcherAlive = true; + const signalProcess = vi.fn((pid: number, signal: NodeJS.Signals | 0) => { + if (signal === 0) { + if (pid === 2121 && existingWatcherAlive) { + return; + } + throw Object.assign(new Error("ESRCH"), { code: "ESRCH" }); + } + if (pid === 2121 && signal === "SIGTERM") { + existingWatcherAlive = false; + return; + } + throw new Error(`unexpected signal ${signal} for pid ${pid}`); + }); + + const runPromise = runWatch({ + args: ["gateway", "--force"], + createWatcher, + cwd, + now: () => 1_700_000_000_000, + process: fakeProcess, + signalProcess, + sleep: async () => {}, + spawn, + }); + + await new Promise((resolve) => setImmediate(resolve)); + + expect(signalProcess).toHaveBeenCalledWith(2121, "SIGTERM"); + expect(spawn).toHaveBeenCalledTimes(1); + expect(JSON.parse(fs.readFileSync(lockPath, "utf8"))).toMatchObject({ + pid: 4242, + command: "gateway --force", + watchSession: "1700000000000-4242", + }); + + fakeProcess.emit("SIGINT"); + const exitCode = await runPromise; + + expect(exitCode).toBe(130); + expect(child.kill).toHaveBeenCalledWith("SIGTERM"); + expect(fs.existsSync(lockPath)).toBe(false); + expect(watcher.close).toHaveBeenCalledTimes(1); + }); });