From 5ea03efe92d639169769762e5b5082b54d60dbb6 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Fri, 13 Mar 2026 18:33:59 +0000 Subject: [PATCH] fix: harden windows gateway lifecycle --- src/cli/daemon-cli/lifecycle.test.ts | 106 +++------- src/cli/daemon-cli/lifecycle.ts | 89 +-------- src/cli/daemon-cli/restart-health.test.ts | 26 +++ src/cli/daemon-cli/restart-health.ts | 19 +- src/daemon/schtasks.startup-fallback.test.ts | 10 +- src/daemon/schtasks.stop.test.ts | 197 +++++++++++++++++++ src/daemon/schtasks.ts | 180 ++++++++++++++++- src/gateway/client.ts | 7 +- src/gateway/probe.test.ts | 31 +++ src/gateway/probe.ts | 24 +++ src/infra/gateway-processes.ts | 162 +++++++++++++++ 11 files changed, 680 insertions(+), 171 deletions(-) create mode 100644 src/daemon/schtasks.stop.test.ts create mode 100644 src/infra/gateway-processes.ts diff --git a/src/cli/daemon-cli/lifecycle.test.ts b/src/cli/daemon-cli/lifecycle.test.ts index 61899e4e78c..7d03656f86b 100644 --- a/src/cli/daemon-cli/lifecycle.test.ts +++ b/src/cli/daemon-cli/lifecycle.test.ts @@ -1,8 +1,5 @@ import { afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; -const mockReadFileSync = vi.hoisted(() => vi.fn()); -const mockSpawnSync = vi.hoisted(() => vi.fn()); - type RestartHealthSnapshot = { healthy: boolean; staleGatewayPids: number[]; @@ -35,7 +32,9 @@ const terminateStaleGatewayPids = vi.fn(); const renderGatewayPortHealthDiagnostics = vi.fn(() => ["diag: unhealthy port"]); const renderRestartDiagnostics = vi.fn(() => ["diag: unhealthy runtime"]); const resolveGatewayPort = vi.fn(() => 18789); -const findGatewayPidsOnPortSync = vi.fn<(port: number) => number[]>(() => []); +const findVerifiedGatewayListenerPidsOnPortSync = vi.fn<(port: number) => number[]>(() => []); +const signalVerifiedGatewayPidSync = vi.fn<(pid: number, signal: "SIGTERM" | "SIGUSR1") => void>(); +const formatGatewayPidList = vi.fn<(pids: number[]) => string>((pids) => pids.join(", ")); const probeGateway 
= vi.fn< (opts: { url: string; @@ -49,24 +48,18 @@ const probeGateway = vi.fn< const isRestartEnabled = vi.fn<(config?: { commands?: unknown }) => boolean>(() => true); const loadConfig = vi.fn(() => ({})); -vi.mock("node:fs", () => ({ - default: { - readFileSync: (...args: unknown[]) => mockReadFileSync(...args), - }, -})); - -vi.mock("node:child_process", () => ({ - spawnSync: (...args: unknown[]) => mockSpawnSync(...args), -})); - vi.mock("../../config/config.js", () => ({ loadConfig: () => loadConfig(), readBestEffortConfig: async () => loadConfig(), resolveGatewayPort, })); -vi.mock("../../infra/restart.js", () => ({ - findGatewayPidsOnPortSync: (port: number) => findGatewayPidsOnPortSync(port), +vi.mock("../../infra/gateway-processes.js", () => ({ + findVerifiedGatewayListenerPidsOnPortSync: (port: number) => + findVerifiedGatewayListenerPidsOnPortSync(port), + signalVerifiedGatewayPidSync: (pid: number, signal: "SIGTERM" | "SIGUSR1") => + signalVerifiedGatewayPidSync(pid, signal), + formatGatewayPidList: (pids: number[]) => formatGatewayPidList(pids), })); vi.mock("../../gateway/probe.js", () => ({ @@ -121,12 +114,12 @@ describe("runDaemonRestart health checks", () => { renderGatewayPortHealthDiagnostics.mockReset(); renderRestartDiagnostics.mockReset(); resolveGatewayPort.mockReset(); - findGatewayPidsOnPortSync.mockReset(); + findVerifiedGatewayListenerPidsOnPortSync.mockReset(); + signalVerifiedGatewayPidSync.mockReset(); + formatGatewayPidList.mockReset(); probeGateway.mockReset(); isRestartEnabled.mockReset(); loadConfig.mockReset(); - mockReadFileSync.mockReset(); - mockSpawnSync.mockReset(); service.readCommand.mockResolvedValue({ programArguments: ["openclaw", "gateway", "--port", "18789"], @@ -158,23 +151,8 @@ describe("runDaemonRestart health checks", () => { configSnapshot: { commands: { restart: true } }, }); isRestartEnabled.mockReturnValue(true); - mockReadFileSync.mockImplementation((path: string) => { - const match = 
path.match(/\/proc\/(\d+)\/cmdline$/); - if (!match) { - throw new Error(`unexpected path ${path}`); - } - const pid = Number.parseInt(match[1] ?? "", 10); - if ([4200, 4300].includes(pid)) { - return ["openclaw", "gateway", "--port", "18789", ""].join("\0"); - } - throw new Error(`unknown pid ${pid}`); - }); - mockSpawnSync.mockReturnValue({ - error: null, - status: 0, - stdout: "openclaw gateway --port 18789", - stderr: "", - }); + signalVerifiedGatewayPidSync.mockImplementation(() => {}); + formatGatewayPidList.mockImplementation((pids) => pids.join(", ")); }); afterEach(() => { @@ -242,38 +220,20 @@ describe("runDaemonRestart health checks", () => { }); it("signals an unmanaged gateway process on stop", async () => { - vi.spyOn(process, "platform", "get").mockReturnValue("win32"); - const killSpy = vi.spyOn(process, "kill").mockImplementation(() => true); - findGatewayPidsOnPortSync.mockReturnValue([4200, 4200, 4300]); - mockSpawnSync.mockReturnValue({ - error: null, - status: 0, - stdout: - 'CommandLine="C:\\\\Program Files\\\\OpenClaw\\\\openclaw.exe" gateway --port 18789\r\n', - stderr: "", - }); + findVerifiedGatewayListenerPidsOnPortSync.mockReturnValue([4200, 4200, 4300]); runServiceStop.mockImplementation(async (params: { onNotLoaded?: () => Promise }) => { await params.onNotLoaded?.(); }); await runDaemonStop({ json: true }); - expect(findGatewayPidsOnPortSync).toHaveBeenCalledWith(18789); - expect(killSpy).toHaveBeenCalledWith(4200, "SIGTERM"); - expect(killSpy).toHaveBeenCalledWith(4300, "SIGTERM"); + expect(findVerifiedGatewayListenerPidsOnPortSync).toHaveBeenCalledWith(18789); + expect(signalVerifiedGatewayPidSync).toHaveBeenCalledWith(4200, "SIGTERM"); + expect(signalVerifiedGatewayPidSync).toHaveBeenCalledWith(4300, "SIGTERM"); }); it("signals a single unmanaged gateway process on restart", async () => { - vi.spyOn(process, "platform", "get").mockReturnValue("win32"); - const killSpy = vi.spyOn(process, "kill").mockImplementation(() => true); - 
findGatewayPidsOnPortSync.mockReturnValue([4200]); - mockSpawnSync.mockReturnValue({ - error: null, - status: 0, - stdout: - 'CommandLine="C:\\\\Program Files\\\\OpenClaw\\\\openclaw.exe" gateway --port 18789\r\n', - stderr: "", - }); + findVerifiedGatewayListenerPidsOnPortSync.mockReturnValue([4200]); runServiceRestart.mockImplementation( async (params: RestartParams & { onNotLoaded?: () => Promise }) => { await params.onNotLoaded?.(); @@ -291,8 +251,8 @@ describe("runDaemonRestart health checks", () => { await runDaemonRestart({ json: true }); - expect(findGatewayPidsOnPortSync).toHaveBeenCalledWith(18789); - expect(killSpy).toHaveBeenCalledWith(4200, "SIGUSR1"); + expect(findVerifiedGatewayListenerPidsOnPortSync).toHaveBeenCalledWith(18789); + expect(signalVerifiedGatewayPidSync).toHaveBeenCalledWith(4200, "SIGUSR1"); expect(probeGateway).toHaveBeenCalledTimes(1); expect(waitForGatewayHealthyListener).toHaveBeenCalledTimes(1); expect(waitForGatewayHealthyRestart).not.toHaveBeenCalled(); @@ -301,15 +261,7 @@ describe("runDaemonRestart health checks", () => { }); it("fails unmanaged restart when multiple gateway listeners are present", async () => { - vi.spyOn(process, "platform", "get").mockReturnValue("win32"); - findGatewayPidsOnPortSync.mockReturnValue([4200, 4300]); - mockSpawnSync.mockReturnValue({ - error: null, - status: 0, - stdout: - 'CommandLine="C:\\\\Program Files\\\\OpenClaw\\\\openclaw.exe" gateway --port 18789\r\n', - stderr: "", - }); + findVerifiedGatewayListenerPidsOnPortSync.mockReturnValue([4200, 4300]); runServiceRestart.mockImplementation( async (params: RestartParams & { onNotLoaded?: () => Promise }) => { await params.onNotLoaded?.(); @@ -323,7 +275,7 @@ describe("runDaemonRestart health checks", () => { }); it("fails unmanaged restart when the running gateway has commands.restart disabled", async () => { - findGatewayPidsOnPortSync.mockReturnValue([4200]); + findVerifiedGatewayListenerPidsOnPortSync.mockReturnValue([4200]); 
probeGateway.mockResolvedValue({ ok: true, configSnapshot: { commands: { restart: false } }, @@ -342,21 +294,13 @@ describe("runDaemonRestart health checks", () => { }); it("skips unmanaged signaling for pids that are not live gateway processes", async () => { - const killSpy = vi.spyOn(process, "kill").mockImplementation(() => true); - findGatewayPidsOnPortSync.mockReturnValue([4200]); - mockReadFileSync.mockReturnValue(["python", "-m", "http.server", ""].join("\0")); - mockSpawnSync.mockReturnValue({ - error: null, - status: 0, - stdout: "python -m http.server", - stderr: "", - }); + findVerifiedGatewayListenerPidsOnPortSync.mockReturnValue([]); runServiceStop.mockImplementation(async (params: { onNotLoaded?: () => Promise }) => { await params.onNotLoaded?.(); }); await runDaemonStop({ json: true }); - expect(killSpy).not.toHaveBeenCalled(); + expect(signalVerifiedGatewayPidSync).not.toHaveBeenCalled(); }); }); diff --git a/src/cli/daemon-cli/lifecycle.ts b/src/cli/daemon-cli/lifecycle.ts index 2b0775b0c48..53efaff9495 100644 --- a/src/cli/daemon-cli/lifecycle.ts +++ b/src/cli/daemon-cli/lifecycle.ts @@ -1,12 +1,12 @@ -import { spawnSync } from "node:child_process"; -import fsSync from "node:fs"; import { isRestartEnabled } from "../../config/commands.js"; import { readBestEffortConfig, resolveGatewayPort } from "../../config/config.js"; -import { parseCmdScriptCommandLine } from "../../daemon/cmd-argv.js"; import { resolveGatewayService } from "../../daemon/service.js"; import { probeGateway } from "../../gateway/probe.js"; -import { isGatewayArgv, parseProcCmdline } from "../../infra/gateway-process-argv.js"; -import { findGatewayPidsOnPortSync } from "../../infra/restart.js"; +import { + findVerifiedGatewayListenerPidsOnPortSync, + formatGatewayPidList, + signalVerifiedGatewayPidSync, +} from "../../infra/gateway-processes.js"; import { defaultRuntime } from "../../runtime.js"; import { theme } from "../../terminal/theme.js"; import { formatCliCommand } from 
"../command-format.js"; @@ -43,85 +43,12 @@ async function resolveGatewayLifecyclePort(service = resolveGatewayService()) { return portFromArgs ?? resolveGatewayPort(await readBestEffortConfig(), mergedEnv); } -function extractWindowsCommandLine(raw: string): string | null { - const lines = raw - .split(/\r?\n/) - .map((line) => line.trim()) - .filter(Boolean); - for (const line of lines) { - if (!line.toLowerCase().startsWith("commandline=")) { - continue; - } - const value = line.slice("commandline=".length).trim(); - return value || null; - } - return lines.find((line) => line.toLowerCase() !== "commandline") ?? null; -} - -function readGatewayProcessArgsSync(pid: number): string[] | null { - if (process.platform === "linux") { - try { - return parseProcCmdline(fsSync.readFileSync(`/proc/${pid}/cmdline`, "utf8")); - } catch { - return null; - } - } - if (process.platform === "darwin") { - const ps = spawnSync("ps", ["-o", "command=", "-p", String(pid)], { - encoding: "utf8", - timeout: 1000, - }); - if (ps.error || ps.status !== 0) { - return null; - } - const command = ps.stdout.trim(); - return command ? command.split(/\s+/) : null; - } - if (process.platform === "win32") { - const wmic = spawnSync( - "wmic", - ["process", "where", `ProcessId=${pid}`, "get", "CommandLine", "/value"], - { - encoding: "utf8", - timeout: 1000, - }, - ); - if (wmic.error || wmic.status !== 0) { - return null; - } - const command = extractWindowsCommandLine(wmic.stdout); - return command ? 
parseCmdScriptCommandLine(command) : null; - } - return null; -} - -function resolveGatewayListenerPids(port: number): number[] { - return Array.from(new Set(findGatewayPidsOnPortSync(port))) - .filter((pid): pid is number => Number.isFinite(pid) && pid > 0) - .filter((pid) => { - const args = readGatewayProcessArgsSync(pid); - return args != null && isGatewayArgv(args, { allowGatewayBinary: true }); - }); -} - function resolveGatewayPortFallback(): Promise { return readBestEffortConfig() .then((cfg) => resolveGatewayPort(cfg, process.env)) .catch(() => resolveGatewayPort(undefined, process.env)); } -function signalGatewayPid(pid: number, signal: "SIGTERM" | "SIGUSR1") { - const args = readGatewayProcessArgsSync(pid); - if (!args || !isGatewayArgv(args, { allowGatewayBinary: true })) { - throw new Error(`refusing to signal non-gateway process pid ${pid}`); - } - process.kill(pid, signal); -} - -function formatGatewayPidList(pids: number[]): string { - return pids.join(", "); -} - async function assertUnmanagedGatewayRestartEnabled(port: number): Promise { const probe = await probeGateway({ url: `ws://127.0.0.1:${port}`, @@ -143,7 +70,7 @@ async function assertUnmanagedGatewayRestartEnabled(port: number): Promise } function resolveVerifiedGatewayListenerPids(port: number): number[] { - return resolveGatewayListenerPids(port).filter( + return findVerifiedGatewayListenerPidsOnPortSync(port).filter( (pid): pid is number => Number.isFinite(pid) && pid > 0, ); } @@ -154,7 +81,7 @@ async function stopGatewayWithoutServiceManager(port: number) { return null; } for (const pid of pids) { - signalGatewayPid(pid, "SIGTERM"); + signalVerifiedGatewayPidSync(pid, "SIGTERM"); } return { result: "stopped" as const, @@ -173,7 +100,7 @@ async function restartGatewayWithoutServiceManager(port: number) { `multiple gateway processes are listening on port ${port}: ${formatGatewayPidList(pids)}; use "openclaw gateway status --deep" before retrying restart`, ); } - 
signalGatewayPid(pids[0], "SIGUSR1"); + signalVerifiedGatewayPidSync(pids[0], "SIGUSR1"); return { result: "restarted" as const, message: `Gateway restart signal sent to unmanaged process on port ${port}: ${pids[0]}.`, diff --git a/src/cli/daemon-cli/restart-health.test.ts b/src/cli/daemon-cli/restart-health.test.ts index 0202f591cc2..1a26f1a80dc 100644 --- a/src/cli/daemon-cli/restart-health.test.ts +++ b/src/cli/daemon-cli/restart-health.test.ts @@ -190,6 +190,32 @@ describe("inspectGatewayRestart", () => { ); }); + it("treats a busy port as healthy when runtime status lags but the probe succeeds", async () => { + Object.defineProperty(process, "platform", { value: "win32", configurable: true }); + + const service = { + readRuntime: vi.fn(async () => ({ status: "stopped" })), + } as unknown as GatewayService; + + inspectPortUsage.mockResolvedValue({ + port: 18789, + status: "busy", + listeners: [{ pid: 9100, commandLine: "openclaw-gateway" }], + hints: [], + }); + classifyPortListener.mockReturnValue("gateway"); + probeGateway.mockResolvedValue({ + ok: true, + close: null, + }); + + const { inspectGatewayRestart } = await import("./restart-health.js"); + const snapshot = await inspectGatewayRestart({ service, port: 18789 }); + + expect(snapshot.healthy).toBe(true); + expect(snapshot.staleGatewayPids).toEqual([]); + }); + it("treats auth-closed probe as healthy gateway reachability", async () => { const snapshot = await inspectAmbiguousOwnershipWithProbe({ ok: false, diff --git a/src/cli/daemon-cli/restart-health.ts b/src/cli/daemon-cli/restart-health.ts index 13741d2e9c4..9bfe3476ee6 100644 --- a/src/cli/daemon-cli/restart-health.ts +++ b/src/cli/daemon-cli/restart-health.ts @@ -65,7 +65,8 @@ async function confirmGatewayReachable(port: number): Promise { const probe = await probeGateway({ url: `ws://127.0.0.1:${port}`, auth: token || password ? 
{ token, password } : undefined, - timeoutMs: 1_000, + timeoutMs: 3_000, + includeDetails: false, }); return probe.ok || looksLikeAuthClose(probe.close?.code, probe.close?.reason); } @@ -123,6 +124,22 @@ export async function inspectGatewayRestart(params: { }; } + if (portUsage.status === "busy" && runtime.status !== "running") { + try { + const reachable = await confirmGatewayReachable(params.port); + if (reachable) { + return { + runtime, + portUsage, + healthy: true, + staleGatewayPids: [], + }; + } + } catch { + // Probe is best-effort; keep the ownership-based diagnostics. + } + } + const gatewayListeners = portUsage.status === "busy" ? portUsage.listeners.filter( diff --git a/src/daemon/schtasks.startup-fallback.test.ts b/src/daemon/schtasks.startup-fallback.test.ts index 8b26a98e4ed..1a949856a09 100644 --- a/src/daemon/schtasks.startup-fallback.test.ts +++ b/src/daemon/schtasks.startup-fallback.test.ts @@ -29,9 +29,13 @@ vi.mock("../process/kill-tree.js", () => ({ killProcessTree: (...args: unknown[]) => killProcessTree(...args), })); -vi.mock("node:child_process", () => ({ - spawn, -})); +vi.mock("node:child_process", async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + spawn, + }; +}); const { installScheduledTask, diff --git a/src/daemon/schtasks.stop.test.ts b/src/daemon/schtasks.stop.test.ts new file mode 100644 index 00000000000..d2d43de3ca2 --- /dev/null +++ b/src/daemon/schtasks.stop.test.ts @@ -0,0 +1,197 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; +import { PassThrough } from "node:stream"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; + +const schtasksResponses = vi.hoisted( + () => [] as Array<{ code: number; stdout: string; stderr: string }>, +); +const schtasksCalls = vi.hoisted(() => [] as string[][]); +const inspectPortUsage = vi.hoisted(() => vi.fn()); +const killProcessTree = vi.hoisted(() => vi.fn()); +const 
findVerifiedGatewayListenerPidsOnPortSync = vi.hoisted(() => + vi.fn<(port: number) => number[]>(() => []), +); + +vi.mock("./schtasks-exec.js", () => ({ + execSchtasks: async (argv: string[]) => { + schtasksCalls.push(argv); + return schtasksResponses.shift() ?? { code: 0, stdout: "", stderr: "" }; + }, +})); + +vi.mock("../infra/ports.js", () => ({ + inspectPortUsage: (...args: unknown[]) => inspectPortUsage(...args), +})); + +vi.mock("../process/kill-tree.js", () => ({ + killProcessTree: (...args: unknown[]) => killProcessTree(...args), +})); + +vi.mock("../infra/gateway-processes.js", () => ({ + findVerifiedGatewayListenerPidsOnPortSync: (port: number) => + findVerifiedGatewayListenerPidsOnPortSync(port), +})); + +const { restartScheduledTask, resolveTaskScriptPath, stopScheduledTask } = + await import("./schtasks.js"); + +async function withWindowsEnv( + run: (params: { tmpDir: string; env: Record }) => Promise, +) { + const tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-win-stop-")); + const env = { + USERPROFILE: tmpDir, + APPDATA: path.join(tmpDir, "AppData", "Roaming"), + OPENCLAW_PROFILE: "default", + OPENCLAW_GATEWAY_PORT: "18789", + }; + try { + await run({ tmpDir, env }); + } finally { + await fs.rm(tmpDir, { recursive: true, force: true }); + } +} + +async function writeGatewayScript(env: Record, port = 18789) { + const scriptPath = resolveTaskScriptPath(env); + await fs.mkdir(path.dirname(scriptPath), { recursive: true }); + await fs.writeFile( + scriptPath, + [ + "@echo off", + `set "OPENCLAW_GATEWAY_PORT=${port}"`, + `"C:\\Program Files\\nodejs\\node.exe" "C:\\Users\\steipete\\AppData\\Roaming\\npm\\node_modules\\openclaw\\dist\\index.js" gateway --port ${port}`, + "", + ].join("\r\n"), + "utf8", + ); +} + +beforeEach(() => { + schtasksResponses.length = 0; + schtasksCalls.length = 0; + inspectPortUsage.mockReset(); + killProcessTree.mockReset(); + findVerifiedGatewayListenerPidsOnPortSync.mockReset(); + 
findVerifiedGatewayListenerPidsOnPortSync.mockReturnValue([]); + inspectPortUsage.mockResolvedValue({ + port: 18789, + status: "free", + listeners: [], + hints: [], + }); +}); + +afterEach(() => { + vi.restoreAllMocks(); +}); + +describe("Scheduled Task stop/restart cleanup", () => { + it("kills lingering verified gateway listeners after schtasks stop", async () => { + await withWindowsEnv(async ({ env }) => { + await writeGatewayScript(env); + schtasksResponses.push( + { code: 0, stdout: "", stderr: "" }, + { code: 0, stdout: "", stderr: "" }, + { code: 0, stdout: "", stderr: "" }, + ); + findVerifiedGatewayListenerPidsOnPortSync.mockReturnValue([4242]); + inspectPortUsage + .mockResolvedValueOnce({ + port: 18789, + status: "busy", + listeners: [{ pid: 4242, command: "node.exe" }], + hints: [], + }) + .mockResolvedValueOnce({ + port: 18789, + status: "free", + listeners: [], + hints: [], + }); + + const stdout = new PassThrough(); + await stopScheduledTask({ env, stdout }); + + expect(findVerifiedGatewayListenerPidsOnPortSync).toHaveBeenCalledWith(18789); + expect(killProcessTree).toHaveBeenCalledWith(4242, { graceMs: 300 }); + expect(inspectPortUsage).toHaveBeenCalledTimes(2); + }); + }); + + it("falls back to inspected gateway listeners when sync verification misses on Windows", async () => { + await withWindowsEnv(async ({ env }) => { + await writeGatewayScript(env); + schtasksResponses.push( + { code: 0, stdout: "", stderr: "" }, + { code: 0, stdout: "", stderr: "" }, + { code: 0, stdout: "", stderr: "" }, + ); + findVerifiedGatewayListenerPidsOnPortSync.mockReturnValue([]); + inspectPortUsage + .mockResolvedValueOnce({ + port: 18789, + status: "busy", + listeners: [ + { + pid: 6262, + command: "node.exe", + commandLine: + '"C:\\Program Files\\nodejs\\node.exe" "C:\\Users\\steipete\\AppData\\Roaming\\npm\\node_modules\\openclaw\\dist\\index.js" gateway --port 18789', + }, + ], + hints: [], + }) + .mockResolvedValueOnce({ + port: 18789, + status: "free", + 
listeners: [], + hints: [], + }); + + const stdout = new PassThrough(); + await stopScheduledTask({ env, stdout }); + + expect(killProcessTree).toHaveBeenCalledWith(6262, { graceMs: 300 }); + expect(inspectPortUsage).toHaveBeenCalledTimes(2); + }); + }); + + it("kills lingering verified gateway listeners and waits for port release before restart", async () => { + await withWindowsEnv(async ({ env }) => { + await writeGatewayScript(env); + schtasksResponses.push( + { code: 0, stdout: "", stderr: "" }, + { code: 0, stdout: "", stderr: "" }, + { code: 0, stdout: "", stderr: "" }, + { code: 0, stdout: "", stderr: "" }, + ); + findVerifiedGatewayListenerPidsOnPortSync.mockReturnValue([5151]); + inspectPortUsage + .mockResolvedValueOnce({ + port: 18789, + status: "busy", + listeners: [{ pid: 5151, command: "node.exe" }], + hints: [], + }) + .mockResolvedValueOnce({ + port: 18789, + status: "free", + listeners: [], + hints: [], + }); + + const stdout = new PassThrough(); + await expect(restartScheduledTask({ env, stdout })).resolves.toEqual({ + outcome: "completed", + }); + + expect(findVerifiedGatewayListenerPidsOnPortSync).toHaveBeenCalledWith(18789); + expect(killProcessTree).toHaveBeenCalledWith(5151, { graceMs: 300 }); + expect(inspectPortUsage).toHaveBeenCalledTimes(2); + expect(schtasksCalls.at(-1)).toEqual(["/Run", "/TN", "OpenClaw Gateway"]); + }); + }); +}); diff --git a/src/daemon/schtasks.ts b/src/daemon/schtasks.ts index 2c74cf26a61..fcd8b08b1af 100644 --- a/src/daemon/schtasks.ts +++ b/src/daemon/schtasks.ts @@ -1,8 +1,11 @@ -import { spawn } from "node:child_process"; +import { spawn, spawnSync } from "node:child_process"; import fs from "node:fs/promises"; import path from "node:path"; +import { isGatewayArgv } from "../infra/gateway-process-argv.js"; +import { findVerifiedGatewayListenerPidsOnPortSync } from "../infra/gateway-processes.js"; import { inspectPortUsage } from "../infra/ports.js"; import { killProcessTree } from "../process/kill-tree.js"; 
+import { sleep } from "../utils.js"; import { parseCmdScriptCommandLine, quoteCmdScriptArg } from "./cmd-argv.js"; import { assertNoCmdLineBreak, parseCmdSetAssignment, renderCmdSetAssignment } from "./cmd-set.js"; import { resolveGatewayServiceDescription, resolveGatewayWindowsTaskName } from "./constants.js"; @@ -311,6 +314,155 @@ function resolveConfiguredGatewayPort(env: GatewayServiceEnv): number | null { return Number.isFinite(parsed) && parsed > 0 ? parsed : null; } +function parsePositivePort(raw: string | undefined): number | null { + const value = raw?.trim(); + if (!value) { + return null; + } + if (!/^\d+$/.test(value)) { + return null; + } + const parsed = Number.parseInt(value, 10); + return Number.isFinite(parsed) && parsed > 0 && parsed <= 65535 ? parsed : null; +} + +function parsePortFromProgramArguments(programArguments?: string[]): number | null { + if (!programArguments?.length) { + return null; + } + for (let i = 0; i < programArguments.length; i += 1) { + const arg = programArguments[i]; + if (!arg) { + continue; + } + const inlineMatch = arg.match(/^--port=(\d+)$/); + if (inlineMatch) { + return parsePositivePort(inlineMatch[1]); + } + if (arg === "--port") { + return parsePositivePort(programArguments[i + 1]); + } + } + return null; +} + +async function resolveScheduledTaskPort(env: GatewayServiceEnv): Promise { + const command = await readScheduledTaskCommand(env).catch(() => null); + return ( + parsePortFromProgramArguments(command?.programArguments) ?? + parsePositivePort(command?.environment?.OPENCLAW_GATEWAY_PORT) ?? 
+ resolveConfiguredGatewayPort(env) + ); +} + +async function resolveScheduledTaskGatewayListenerPids(port: number): Promise { + const verified = findVerifiedGatewayListenerPidsOnPortSync(port); + if (verified.length > 0) { + return verified; + } + + const diagnostics = await inspectPortUsage(port).catch(() => null); + if (diagnostics?.status !== "busy") { + return []; + } + + const matchedGatewayPids = Array.from( + new Set( + diagnostics.listeners + .filter( + (listener) => + typeof listener.pid === "number" && + listener.commandLine && + isGatewayArgv(parseCmdScriptCommandLine(listener.commandLine), { + allowGatewayBinary: true, + }), + ) + .map((listener) => listener.pid as number), + ), + ); + if (matchedGatewayPids.length > 0) { + return matchedGatewayPids; + } + + return Array.from( + new Set( + diagnostics.listeners + .map((listener) => listener.pid) + .filter((pid): pid is number => Number.isFinite(pid) && pid > 0), + ), + ); +} + +async function terminateScheduledTaskGatewayListeners(env: GatewayServiceEnv): Promise { + const port = await resolveScheduledTaskPort(env); + if (!port) { + return []; + } + const pids = await resolveScheduledTaskGatewayListenerPids(port); + for (const pid of pids) { + await terminateGatewayProcessTree(pid, 300); + } + return pids; +} + +function isProcessAlive(pid: number): boolean { + try { + process.kill(pid, 0); + return true; + } catch { + return false; + } +} + +async function waitForProcessExit(pid: number, timeoutMs: number): Promise { + const deadline = Date.now() + timeoutMs; + while (Date.now() < deadline) { + if (!isProcessAlive(pid)) { + return true; + } + await sleep(100); + } + return !isProcessAlive(pid); +} + +async function terminateGatewayProcessTree(pid: number, graceMs: number): Promise { + if (process.platform !== "win32") { + killProcessTree(pid, { graceMs }); + return; + } + const taskkillPath = path.join( + process.env.SystemRoot ?? 
"C:\\Windows", + "System32", + "taskkill.exe", + ); + spawnSync(taskkillPath, ["/T", "/PID", String(pid)], { + stdio: "ignore", + timeout: 5_000, + windowsHide: true, + }); + if (await waitForProcessExit(pid, graceMs)) { + return; + } + spawnSync(taskkillPath, ["/F", "/T", "/PID", String(pid)], { + stdio: "ignore", + timeout: 5_000, + windowsHide: true, + }); + await waitForProcessExit(pid, 5_000); +} + +async function waitForGatewayPortRelease(port: number, timeoutMs = 5_000): Promise { + const deadline = Date.now() + timeoutMs; + while (Date.now() < deadline) { + const diagnostics = await inspectPortUsage(port).catch(() => null); + if (diagnostics?.status === "free") { + return true; + } + await sleep(250); + } + return false; +} + async function resolveFallbackRuntime(env: GatewayServiceEnv): Promise { const port = resolveConfiguredGatewayPort(env); if (!port) { @@ -343,18 +495,28 @@ async function stopStartupEntry( ): Promise { const runtime = await resolveFallbackRuntime(env); if (typeof runtime.pid === "number" && runtime.pid > 0) { - killProcessTree(runtime.pid, { graceMs: 300 }); + await terminateGatewayProcessTree(runtime.pid, 300); } stdout.write(`${formatLine("Stopped Windows login item", resolveTaskName(env))}\n`); } +async function terminateInstalledStartupRuntime(env: GatewayServiceEnv): Promise { + if (!(await isStartupEntryInstalled(env))) { + return; + } + const runtime = await resolveFallbackRuntime(env); + if (typeof runtime.pid === "number" && runtime.pid > 0) { + await terminateGatewayProcessTree(runtime.pid, 300); + } +} + async function restartStartupEntry( env: GatewayServiceEnv, stdout: NodeJS.WritableStream, ): Promise { const runtime = await resolveFallbackRuntime(env); if (typeof runtime.pid === "number" && runtime.pid > 0) { - killProcessTree(runtime.pid, { graceMs: 300 }); + await terminateGatewayProcessTree(runtime.pid, 300); } launchFallbackTaskScript(resolveTaskScriptPath(env)); stdout.write(`${formatLine("Restarted Windows login 
item", resolveTaskName(env))}\n`); @@ -489,6 +651,12 @@ export async function stopScheduledTask({ stdout, env }: GatewayServiceControlAr if (res.code !== 0 && !isTaskNotRunning(res)) { throw new Error(`schtasks end failed: ${res.stderr || res.stdout}`.trim()); } + const stopPort = await resolveScheduledTaskPort(effectiveEnv); + await terminateScheduledTaskGatewayListeners(effectiveEnv); + await terminateInstalledStartupRuntime(effectiveEnv); + if (stopPort) { + await waitForGatewayPortRelease(stopPort); + } stdout.write(`${formatLine("Stopped Scheduled Task", taskName)}\n`); } @@ -512,6 +680,12 @@ export async function restartScheduledTask({ } const taskName = resolveTaskName(effectiveEnv); await execSchtasks(["/End", "/TN", taskName]); + const restartPort = await resolveScheduledTaskPort(effectiveEnv); + await terminateScheduledTaskGatewayListeners(effectiveEnv); + await terminateInstalledStartupRuntime(effectiveEnv); + if (restartPort) { + await waitForGatewayPortRelease(restartPort); + } const res = await execSchtasks(["/Run", "/TN", taskName]); if (res.code !== 0) { throw new Error(`schtasks run failed: ${res.stderr || res.stdout}`.trim()); diff --git a/src/gateway/client.ts b/src/gateway/client.ts index 9e98a9bc0c4..f2c7a184dd8 100644 --- a/src/gateway/client.ts +++ b/src/gateway/client.ts @@ -95,7 +95,7 @@ export type GatewayClientOptions = { commands?: string[]; permissions?: Record; pathEnv?: string; - deviceIdentity?: DeviceIdentity; + deviceIdentity?: DeviceIdentity | null; minProtocol?: number; maxProtocol?: number; tlsFingerprint?: string; @@ -138,7 +138,10 @@ export class GatewayClient { constructor(opts: GatewayClientOptions) { this.opts = { ...opts, - deviceIdentity: opts.deviceIdentity ?? loadOrCreateDeviceIdentity(), + deviceIdentity: + opts.deviceIdentity === null + ? undefined + : (opts.deviceIdentity ?? 
loadOrCreateDeviceIdentity()), }; } diff --git a/src/gateway/probe.test.ts b/src/gateway/probe.test.ts index b5927389c4d..6cd7d64fc51 100644 --- a/src/gateway/probe.test.ts +++ b/src/gateway/probe.test.ts @@ -2,6 +2,7 @@ import { describe, expect, it, vi } from "vitest"; const gatewayClientState = vi.hoisted(() => ({ options: null as Record | null, + requests: [] as string[], })); class MockGatewayClient { @@ -10,6 +11,7 @@ class MockGatewayClient { constructor(opts: Record) { this.opts = opts; gatewayClientState.options = opts; + gatewayClientState.requests = []; } start(): void { @@ -26,6 +28,7 @@ class MockGatewayClient { stop(): void {} async request(method: string): Promise { + gatewayClientState.requests.push(method); if (method === "system-presence") { return []; } @@ -48,6 +51,34 @@ describe("probeGateway", () => { }); expect(gatewayClientState.options?.scopes).toEqual(["operator.read"]); + expect(gatewayClientState.options?.deviceIdentity).toBeNull(); + expect(gatewayClientState.requests).toEqual([ + "health", + "status", + "system-presence", + "config.get", + ]); expect(result.ok).toBe(true); }); + + it("keeps device identity enabled for remote probes", async () => { + await probeGateway({ + url: "wss://gateway.example/ws", + auth: { token: "secret" }, + timeoutMs: 1_000, + }); + + expect(gatewayClientState.options?.deviceIdentity).toBeUndefined(); + }); + + it("skips detail RPCs for lightweight reachability probes", async () => { + const result = await probeGateway({ + url: "ws://127.0.0.1:18789", + timeoutMs: 1_000, + includeDetails: false, + }); + + expect(result.ok).toBe(true); + expect(gatewayClientState.requests).toEqual([]); + }); }); diff --git a/src/gateway/probe.ts b/src/gateway/probe.ts index 0521e84d9c8..40740987fb0 100644 --- a/src/gateway/probe.ts +++ b/src/gateway/probe.ts @@ -4,6 +4,7 @@ import type { SystemPresence } from "../infra/system-presence.js"; import { GATEWAY_CLIENT_MODES, GATEWAY_CLIENT_NAMES } from 
"../utils/message-channel.js"; import { GatewayClient } from "./client.js"; import { READ_SCOPE } from "./method-scopes.js"; +import { isLoopbackHost } from "./net.js"; export type GatewayProbeAuth = { token?: string; @@ -32,6 +33,7 @@ export async function probeGateway(opts: { url: string; auth?: GatewayProbeAuth; timeoutMs: number; + includeDetails?: boolean; }): Promise { const startedAt = Date.now(); const instanceId = randomUUID(); @@ -39,6 +41,14 @@ export async function probeGateway(opts: { let connectError: string | null = null; let close: GatewayProbeClose | null = null; + const disableDeviceIdentity = (() => { + try { + return isLoopbackHost(new URL(opts.url).hostname); + } catch { + return false; + } + })(); + return await new Promise((resolve) => { let settled = false; const settle = (result: Omit) => { @@ -60,6 +70,7 @@ export async function probeGateway(opts: { clientVersion: "dev", mode: GATEWAY_CLIENT_MODES.PROBE, instanceId, + deviceIdentity: disableDeviceIdentity ? null : undefined, onConnectError: (err) => { connectError = formatErrorMessage(err); }, @@ -68,6 +79,19 @@ export async function probeGateway(opts: { }, onHelloOk: async () => { connectLatencyMs = Date.now() - startedAt; + if (opts.includeDetails === false) { + settle({ + ok: true, + connectLatencyMs, + error: null, + close, + health: null, + status: null, + presence: null, + configSnapshot: null, + }); + return; + } try { const [health, status, presence, configSnapshot] = await Promise.all([ client.request("health"), diff --git a/src/infra/gateway-processes.ts b/src/infra/gateway-processes.ts new file mode 100644 index 00000000000..340b54a259f --- /dev/null +++ b/src/infra/gateway-processes.ts @@ -0,0 +1,162 @@ +import { spawnSync } from "node:child_process"; +import fsSync from "node:fs"; +import { parseCmdScriptCommandLine } from "../daemon/cmd-argv.js"; +import { isGatewayArgv, parseProcCmdline } from "./gateway-process-argv.js"; +import { findGatewayPidsOnPortSync as 
/** Per-command timeout applied to every Windows discovery subprocess spawned in this module. */
const WINDOWS_GATEWAY_DISCOVERY_TIMEOUT_MS = 5_000;

/**
 * One TCP row of Windows `netstat -ano` output, captured as:
 * 1 = local address, 2 = local port, 3 = foreign address, 4 = state, 5 = owning PID.
 * The state column is captured rather than hard-coded because netstat localizes it.
 */
const NETSTAT_TCP_ROW = /^\s*TCP\s+(\S+):(\d+)\s+(\S+)\s+(\S+)\s+(\d+)\s*$/i;

/** Foreign-address values netstat prints for a TCP socket that has no peer, i.e. a listener. */
const NETSTAT_LISTENER_FOREIGN_ADDRESSES: ReadonlySet<string> = new Set(["0.0.0.0:0", "[::]:0"]);

/**
 * Pulls the command line out of `wmic process ... get CommandLine` output.
 *
 * Handles the `/value` form (`CommandLine=<value>`) first; if no such line exists, falls back to
 * the first non-empty line that is not the bare `CommandLine` column header.
 *
 * @param raw - Raw stdout of the WMIC invocation.
 * @returns The command line, or `null` when the output carries no usable value.
 */
function extractWindowsCommandLine(raw: string): string | null {
  const lines = raw
    .split(/\r?\n/)
    .map((line) => line.trim())
    .filter(Boolean);
  for (const line of lines) {
    if (!line.toLowerCase().startsWith("commandline=")) {
      continue;
    }
    // The first `CommandLine=` line decides the result, even when its value is empty.
    const value = line.slice("commandline=".length).trim();
    return value || null;
  }
  return lines.find((line) => line.toLowerCase() !== "commandline") ?? null;
}

/**
 * Reads a process command line through PowerShell (`Get-CimInstance Win32_Process`).
 *
 * @param pid - Process id to look up.
 * @returns The value produced by `parseCmdScriptCommandLine` for the command line, or `null` when
 *   PowerShell could not be spawned, exited non-zero, or printed nothing.
 */
function readWindowsProcessArgsViaPowerShell(pid: number): string[] | null {
  const ps = spawnSync(
    "powershell",
    [
      "-NoProfile",
      "-Command",
      `(Get-CimInstance Win32_Process -Filter "ProcessId = ${pid}" | Select-Object -ExpandProperty CommandLine)`,
    ],
    {
      encoding: "utf8",
      timeout: WINDOWS_GATEWAY_DISCOVERY_TIMEOUT_MS,
      windowsHide: true,
    },
  );
  if (ps.error || ps.status !== 0) {
    return null;
  }
  const command = ps.stdout.trim();
  return command ? parseCmdScriptCommandLine(command) : null;
}

/**
 * Reads a process command line through `wmic`; used when the PowerShell lookup yields `null`.
 *
 * @param pid - Process id to look up.
 * @returns The value produced by `parseCmdScriptCommandLine` for the command line, or `null` when
 *   WMIC could not be spawned, exited non-zero, or printed no command line.
 */
function readWindowsProcessArgsViaWmic(pid: number): string[] | null {
  const wmic = spawnSync(
    "wmic",
    ["process", "where", `ProcessId=${pid}`, "get", "CommandLine", "/value"],
    {
      encoding: "utf8",
      timeout: WINDOWS_GATEWAY_DISCOVERY_TIMEOUT_MS,
      windowsHide: true,
    },
  );
  if (wmic.error || wmic.status !== 0) {
    return null;
  }
  const command = extractWindowsCommandLine(wmic.stdout);
  return command ? parseCmdScriptCommandLine(command) : null;
}

/**
 * Lists the owning PIDs of TCP listeners on `port` via PowerShell `Get-NetTCPConnection`.
 *
 * @param port - Local TCP port to inspect.
 * @returns The PIDs printed by PowerShell (possibly empty, possibly with duplicates), or `null`
 *   when PowerShell could not be spawned or exited non-zero, so the caller can fall back.
 */
function readWindowsListeningPidsViaPowerShell(port: number): number[] | null {
  const ps = spawnSync(
    "powershell",
    [
      "-NoProfile",
      "-Command",
      `(Get-NetTCPConnection -LocalPort ${port} -State Listen -ErrorAction SilentlyContinue | Select-Object -ExpandProperty OwningProcess)`,
    ],
    {
      encoding: "utf8",
      timeout: WINDOWS_GATEWAY_DISCOVERY_TIMEOUT_MS,
      windowsHide: true,
    },
  );
  if (ps.error || ps.status !== 0) {
    return null;
  }
  return ps.stdout
    .split(/\r?\n/)
    .map((line) => Number.parseInt(line.trim(), 10))
    .filter((pid) => Number.isFinite(pid) && pid > 0);
}

/**
 * Extracts the PIDs of TCP listeners on `port` from `netstat -ano` output.
 *
 * A row counts as a listener when its state column reads `LISTENING` (any case) or when its
 * foreign address is the no-peer wildcard. The second test keeps detection working on Windows
 * installations whose netstat prints a localized state name instead of `LISTENING`.
 *
 * @param output - Raw netstat stdout.
 * @param port - Local TCP port to match.
 * @returns Unique PIDs in order of first appearance.
 */
function parseNetstatListeningPids(output: string, port: number): number[] {
  const pids = new Set<number>();
  for (const line of output.split(/\r?\n/)) {
    const match = NETSTAT_TCP_ROW.exec(line);
    if (!match) {
      continue;
    }
    const parsedPort = Number.parseInt(match[2] ?? "", 10);
    const foreignAddress = match[3] ?? "";
    const state = match[4] ?? "";
    const pid = Number.parseInt(match[5] ?? "", 10);
    const isListener =
      state.toUpperCase() === "LISTENING" || NETSTAT_LISTENER_FOREIGN_ADDRESSES.has(foreignAddress);
    if (isListener && parsedPort === port && Number.isFinite(pid) && pid > 0) {
      pids.add(pid);
    }
  }
  return [...pids];
}

/**
 * Lists the owning PIDs of TCP listeners on `port` by parsing `netstat -ano -p tcp`.
 *
 * @param port - Local TCP port to inspect.
 * @returns Unique listener PIDs; empty when netstat could not be spawned or exited non-zero.
 */
function readWindowsListeningPidsViaNetstat(port: number): number[] {
  const netstat = spawnSync("netstat", ["-ano", "-p", "tcp"], {
    encoding: "utf8",
    timeout: WINDOWS_GATEWAY_DISCOVERY_TIMEOUT_MS,
    windowsHide: true,
  });
  if (netstat.error || netstat.status !== 0) {
    return [];
  }
  return parseNetstatListeningPids(netstat.stdout, port);
}

/**
 * Windows listener discovery: PowerShell first, netstat only when PowerShell itself failed.
 * An empty PowerShell result is trusted as "no listeners" and does not trigger the fallback.
 */
function readWindowsListeningPidsOnPortSync(port: number): number[] {
  return readWindowsListeningPidsViaPowerShell(port) ?? readWindowsListeningPidsViaNetstat(port);
}

/**
 * Reads the argument vector of a running process using the platform's native mechanism:
 * `/proc/<pid>/cmdline` on Linux, `ps -o command=` on macOS, PowerShell then WMIC on Windows.
 *
 * @param pid - Process id to inspect; anything that is not a positive integer yields `null`
 *   without touching the filesystem or spawning a subprocess.
 * @returns The process arguments, or `null` when they cannot be determined or the platform is
 *   not one of the three above.
 */
export function readGatewayProcessArgsSync(pid: number): string[] | null {
  // Never interpolate NaN, fractions, zero, or negative ids into paths, shell commands, or WMI filters.
  if (!Number.isInteger(pid) || pid <= 0) {
    return null;
  }
  if (process.platform === "linux") {
    try {
      return parseProcCmdline(fsSync.readFileSync(`/proc/${pid}/cmdline`, "utf8"));
    } catch {
      // Unreadable or vanished /proc entry: treated as "unknown".
      return null;
    }
  }
  if (process.platform === "darwin") {
    const ps = spawnSync("ps", ["-o", "command=", "-p", String(pid)], {
      encoding: "utf8",
      timeout: 1000,
    });
    if (ps.error || ps.status !== 0) {
      return null;
    }
    const command = ps.stdout.trim();
    // Whitespace split: arguments that themselves contain spaces are not preserved as one entry.
    return command ? command.split(/\s+/) : null;
  }
  if (process.platform === "win32") {
    return readWindowsProcessArgsViaPowerShell(pid) ?? readWindowsProcessArgsViaWmic(pid);
  }
  return null;
}

/**
 * Sends `signal` to `pid` only after its argument vector passes `isGatewayArgv`.
 *
 * @param pid - Target process id.
 * @param signal - Signal to deliver.
 * @throws Error when the arguments cannot be read or `isGatewayArgv` rejects them; errors
 *   raised by `process.kill` itself also propagate.
 */
export function signalVerifiedGatewayPidSync(pid: number, signal: "SIGTERM" | "SIGUSR1"): void {
  const args = readGatewayProcessArgsSync(pid);
  if (!args || !isGatewayArgv(args, { allowGatewayBinary: true })) {
    throw new Error(`refusing to signal non-gateway process pid ${pid}`);
  }
  process.kill(pid, signal);
}

/**
 * Finds processes listening on `port` whose argument vector passes `isGatewayArgv`.
 *
 * Candidate PIDs come from the Windows discovery helpers on `win32` and from
 * `findUnixGatewayPidsOnPortSync` elsewhere. Candidates are de-duplicated, the current process
 * is excluded, and every remaining PID is verified by reading its arguments.
 *
 * @param port - Local TCP port to inspect.
 * @returns Verified listener PIDs.
 */
export function findVerifiedGatewayListenerPidsOnPortSync(port: number): number[] {
  const rawPids =
    process.platform === "win32"
      ? readWindowsListeningPidsOnPortSync(port)
      : findUnixGatewayPidsOnPortSync(port);

  return Array.from(new Set(rawPids))
    .filter((pid): pid is number => Number.isFinite(pid) && pid > 0 && pid !== process.pid)
    .filter((pid) => {
      const args = readGatewayProcessArgsSync(pid);
      return args != null && isGatewayArgv(args, { allowGatewayBinary: true });
    });
}

/** Renders a PID list for log and error messages as a comma-separated string. */
export function formatGatewayPidList(pids: number[]): string {
  return pids.join(", ");
}