refactor(gateway): simplify restart flow and expand lock tests

This commit is contained in:
Peter Steinberger 2026-02-22 10:44:35 +01:00
parent bd4f670544
commit edaa5ef7a5
5 changed files with 252 additions and 164 deletions

View File

@ -57,62 +57,86 @@ function removeNewSignalListeners(
}
}
async function withIsolatedSignals(run: () => Promise<void>) {
const beforeSigterm = new Set(
process.listeners("SIGTERM") as Array<(...args: unknown[]) => void>,
);
const beforeSigint = new Set(process.listeners("SIGINT") as Array<(...args: unknown[]) => void>);
const beforeSigusr1 = new Set(
process.listeners("SIGUSR1") as Array<(...args: unknown[]) => void>,
);
try {
await run();
} finally {
removeNewSignalListeners("SIGTERM", beforeSigterm);
removeNewSignalListeners("SIGINT", beforeSigint);
removeNewSignalListeners("SIGUSR1", beforeSigusr1);
}
}
function createRuntimeWithExitSignal(exitCallOrder?: string[]) {
let resolveExit: (code: number) => void = () => {};
const exited = new Promise<number>((resolve) => {
resolveExit = resolve;
});
const runtime = {
log: vi.fn(),
error: vi.fn(),
exit: vi.fn((code: number) => {
exitCallOrder?.push("exit");
resolveExit(code);
}),
};
return { runtime, exited };
}
describe("runGatewayLoop", () => {
it("restarts after SIGUSR1 even when drain times out, and resets lanes for the new iteration", async () => {
vi.clearAllMocks();
getActiveTaskCount.mockReturnValueOnce(2).mockReturnValueOnce(0);
waitForActiveTasks.mockResolvedValueOnce({ drained: false });
type StartServer = () => Promise<{
close: (opts: { reason: string; restartExpectedMs: number | null }) => Promise<void>;
}>;
await withIsolatedSignals(async () => {
getActiveTaskCount.mockReturnValueOnce(2).mockReturnValueOnce(0);
waitForActiveTasks.mockResolvedValueOnce({ drained: false });
const closeFirst = vi.fn(async () => {});
const closeSecond = vi.fn(async () => {});
type StartServer = () => Promise<{
close: (opts: { reason: string; restartExpectedMs: number | null }) => Promise<void>;
}>;
const start = vi.fn<StartServer>();
let resolveFirst: (() => void) | null = null;
const startedFirst = new Promise<void>((resolve) => {
resolveFirst = resolve;
});
start.mockImplementationOnce(async () => {
resolveFirst?.();
return { close: closeFirst };
});
const closeFirst = vi.fn(async () => {});
const closeSecond = vi.fn(async () => {});
let resolveSecond: (() => void) | null = null;
const startedSecond = new Promise<void>((resolve) => {
resolveSecond = resolve;
});
start.mockImplementationOnce(async () => {
resolveSecond?.();
return { close: closeSecond };
});
const start = vi.fn<StartServer>();
let resolveFirst: (() => void) | null = null;
const startedFirst = new Promise<void>((resolve) => {
resolveFirst = resolve;
});
start.mockImplementationOnce(async () => {
resolveFirst?.();
return { close: closeFirst };
});
start.mockRejectedValueOnce(new Error("stop-loop"));
let resolveSecond: (() => void) | null = null;
const startedSecond = new Promise<void>((resolve) => {
resolveSecond = resolve;
});
start.mockImplementationOnce(async () => {
resolveSecond?.();
return { close: closeSecond };
});
const beforeSigterm = new Set(
process.listeners("SIGTERM") as Array<(...args: unknown[]) => void>,
);
const beforeSigint = new Set(
process.listeners("SIGINT") as Array<(...args: unknown[]) => void>,
);
const beforeSigusr1 = new Set(
process.listeners("SIGUSR1") as Array<(...args: unknown[]) => void>,
);
start.mockRejectedValueOnce(new Error("stop-loop"));
const { runGatewayLoop } = await import("./run-loop.js");
const runtime = {
log: vi.fn(),
error: vi.fn(),
exit: vi.fn(),
};
const loopPromise = runGatewayLoop({
start: start as unknown as Parameters<typeof runGatewayLoop>[0]["start"],
runtime: runtime as unknown as Parameters<typeof runGatewayLoop>[0]["runtime"],
});
const { runGatewayLoop } = await import("./run-loop.js");
const runtime = {
log: vi.fn(),
error: vi.fn(),
exit: vi.fn(),
};
const loopPromise = runGatewayLoop({
start: start as unknown as Parameters<typeof runGatewayLoop>[0]["start"],
runtime: runtime as unknown as Parameters<typeof runGatewayLoop>[0]["runtime"],
});
try {
await startedFirst;
expect(start).toHaveBeenCalledTimes(1);
await new Promise<void>((resolve) => setImmediate(resolve));
@ -142,86 +166,105 @@ describe("runGatewayLoop", () => {
expect(markGatewaySigusr1RestartHandled).toHaveBeenCalledTimes(2);
expect(resetAllLanes).toHaveBeenCalledTimes(2);
expect(acquireGatewayLock).toHaveBeenCalledTimes(3);
} finally {
removeNewSignalListeners("SIGTERM", beforeSigterm);
removeNewSignalListeners("SIGINT", beforeSigint);
removeNewSignalListeners("SIGUSR1", beforeSigusr1);
}
});
});
it("releases the lock before exiting on spawned restart", async () => {
vi.clearAllMocks();
const lockRelease = vi.fn(async () => {});
acquireGatewayLock.mockResolvedValueOnce({
release: lockRelease,
});
await withIsolatedSignals(async () => {
const lockRelease = vi.fn(async () => {});
acquireGatewayLock.mockResolvedValueOnce({
release: lockRelease,
});
// Override process-respawn to return "spawned" mode
restartGatewayProcessWithFreshPid.mockReturnValueOnce({
mode: "spawned",
pid: 9999,
});
// Override process-respawn to return "spawned" mode
restartGatewayProcessWithFreshPid.mockReturnValueOnce({
mode: "spawned",
pid: 9999,
});
const close = vi.fn(async () => {});
let resolveStarted: (() => void) | null = null;
const started = new Promise<void>((resolve) => {
resolveStarted = resolve;
});
const close = vi.fn(async () => {});
let resolveStarted: (() => void) | null = null;
const started = new Promise<void>((resolve) => {
resolveStarted = resolve;
});
const start = vi.fn(async () => {
resolveStarted?.();
return { close };
});
const start = vi.fn(async () => {
resolveStarted?.();
return { close };
});
const exitCallOrder: string[] = [];
const runtime = {
log: vi.fn(),
error: vi.fn(),
exit: vi.fn(() => {
exitCallOrder.push("exit");
}),
};
const exitCallOrder: string[] = [];
const { runtime, exited } = createRuntimeWithExitSignal(exitCallOrder);
lockRelease.mockImplementation(async () => {
exitCallOrder.push("lockRelease");
});
lockRelease.mockImplementation(async () => {
exitCallOrder.push("lockRelease");
});
vi.resetModules();
const { runGatewayLoop } = await import("./run-loop.js");
const _loopPromise = runGatewayLoop({
start: start as unknown as Parameters<typeof runGatewayLoop>[0]["start"],
runtime: runtime as unknown as Parameters<typeof runGatewayLoop>[0]["runtime"],
});
const beforeSigterm = new Set(
process.listeners("SIGTERM") as Array<(...args: unknown[]) => void>,
);
const beforeSigint = new Set(
process.listeners("SIGINT") as Array<(...args: unknown[]) => void>,
);
const beforeSigusr1 = new Set(
process.listeners("SIGUSR1") as Array<(...args: unknown[]) => void>,
);
vi.resetModules();
const { runGatewayLoop } = await import("./run-loop.js");
const _loopPromise = runGatewayLoop({
start: start as unknown as Parameters<typeof runGatewayLoop>[0]["start"],
runtime: runtime as unknown as Parameters<typeof runGatewayLoop>[0]["runtime"],
});
try {
await started;
await new Promise<void>((resolve) => setImmediate(resolve));
process.emit("SIGUSR1");
// Wait for the shutdown path to complete
await new Promise<void>((resolve) => setTimeout(resolve, 100));
await exited;
expect(lockRelease).toHaveBeenCalled();
expect(runtime.exit).toHaveBeenCalledWith(0);
// Lock must be released BEFORE exit
expect(exitCallOrder).toEqual(["lockRelease", "exit"]);
} finally {
removeNewSignalListeners("SIGTERM", beforeSigterm);
removeNewSignalListeners("SIGINT", beforeSigint);
removeNewSignalListeners("SIGUSR1", beforeSigusr1);
}
});
});
it("exits when lock reacquire fails during in-process restart fallback", async () => {
vi.clearAllMocks();
await withIsolatedSignals(async () => {
const lockRelease = vi.fn(async () => {});
acquireGatewayLock
.mockResolvedValueOnce({
release: lockRelease,
})
.mockRejectedValueOnce(new Error("lock timeout"));
restartGatewayProcessWithFreshPid.mockReturnValueOnce({
mode: "disabled",
});
const close = vi.fn(async () => {});
let resolveStarted: (() => void) | null = null;
const started = new Promise<void>((resolve) => {
resolveStarted = resolve;
});
const start = vi.fn(async () => {
resolveStarted?.();
return { close };
});
const { runtime, exited } = createRuntimeWithExitSignal();
vi.resetModules();
const { runGatewayLoop } = await import("./run-loop.js");
const _loopPromise = runGatewayLoop({
start: start as unknown as Parameters<typeof runGatewayLoop>[0]["start"],
runtime: runtime as unknown as Parameters<typeof runGatewayLoop>[0]["runtime"],
});
await started;
await new Promise<void>((resolve) => setImmediate(resolve));
process.emit("SIGUSR1");
await expect(exited).resolves.toBe(1);
expect(acquireGatewayLock).toHaveBeenCalledTimes(2);
expect(start).toHaveBeenCalledTimes(1);
expect(gatewayLog.error).toHaveBeenCalledWith(
expect.stringContaining("failed to reacquire gateway lock for in-process restart"),
);
});
});
});

View File

@ -33,6 +33,58 @@ export async function runGatewayLoop(params: {
process.removeListener("SIGINT", onSigint);
process.removeListener("SIGUSR1", onSigusr1);
};
const exitProcess = (code: number) => {
cleanupSignals();
params.runtime.exit(code);
};
const releaseLockIfHeld = async (): Promise<boolean> => {
if (!lock) {
return false;
}
await lock.release();
lock = null;
return true;
};
const reacquireLockForInProcessRestart = async (): Promise<boolean> => {
try {
lock = await acquireGatewayLock();
return true;
} catch (err) {
gatewayLog.error(`failed to reacquire gateway lock for in-process restart: ${String(err)}`);
exitProcess(1);
return false;
}
};
const handleRestartAfterServerClose = async () => {
const hadLock = await releaseLockIfHeld();
// Release the lock BEFORE spawning so the child can acquire it immediately.
const respawn = restartGatewayProcessWithFreshPid();
if (respawn.mode === "spawned" || respawn.mode === "supervised") {
const modeLabel =
respawn.mode === "spawned"
? `spawned pid ${respawn.pid ?? "unknown"}`
: "supervisor restart";
gatewayLog.info(`restart mode: full process restart (${modeLabel})`);
exitProcess(0);
return;
}
if (respawn.mode === "failed") {
gatewayLog.warn(
`full process restart failed (${respawn.detail ?? "unknown error"}); falling back to in-process restart`,
);
} else {
gatewayLog.info("restart mode: in-process restart (OPENCLAW_NO_RESPAWN)");
}
if (hadLock && !(await reacquireLockForInProcessRestart())) {
return;
}
shuttingDown = false;
restartResolver?.();
};
const handleStopAfterServerClose = async () => {
await releaseLockIfHeld();
exitProcess(0);
};
const DRAIN_TIMEOUT_MS = 30_000;
const SHUTDOWN_TIMEOUT_MS = 5_000;
@ -50,8 +102,7 @@ export async function runGatewayLoop(params: {
const forceExitMs = isRestart ? DRAIN_TIMEOUT_MS + SHUTDOWN_TIMEOUT_MS : SHUTDOWN_TIMEOUT_MS;
const forceExitTimer = setTimeout(() => {
gatewayLog.error("shutdown timed out; exiting without full cleanup");
cleanupSignals();
params.runtime.exit(0);
exitProcess(0);
}, forceExitMs);
void (async () => {
@ -83,54 +134,9 @@ export async function runGatewayLoop(params: {
clearTimeout(forceExitTimer);
server = null;
if (isRestart) {
const hadLock = lock != null;
// Release the lock BEFORE spawning so the child can acquire it immediately.
if (lock) {
await lock.release();
lock = null;
}
const respawn = restartGatewayProcessWithFreshPid();
if (respawn.mode === "spawned" || respawn.mode === "supervised") {
const modeLabel =
respawn.mode === "spawned"
? `spawned pid ${respawn.pid ?? "unknown"}`
: "supervisor restart";
gatewayLog.info(`restart mode: full process restart (${modeLabel})`);
cleanupSignals();
params.runtime.exit(0);
} else {
if (respawn.mode === "failed") {
gatewayLog.warn(
`full process restart failed (${respawn.detail ?? "unknown error"}); falling back to in-process restart`,
);
} else {
gatewayLog.info("restart mode: in-process restart (OPENCLAW_NO_RESPAWN)");
}
let canContinueInProcessRestart = true;
if (hadLock) {
try {
lock = await acquireGatewayLock();
} catch (err) {
gatewayLog.error(
`failed to reacquire gateway lock for in-process restart: ${String(err)}`,
);
cleanupSignals();
params.runtime.exit(1);
canContinueInProcessRestart = false;
}
}
if (canContinueInProcessRestart) {
shuttingDown = false;
restartResolver?.();
}
}
await handleRestartAfterServerClose();
} else {
if (lock) {
await lock.release();
lock = null;
}
cleanupSignals();
params.runtime.exit(0);
await handleStopAfterServerClose();
}
}
})();
@ -183,10 +189,7 @@ export async function runGatewayLoop(params: {
});
}
} finally {
if (lock) {
await lock.release();
lock = null;
}
await releaseLockIfHeld();
cleanupSignals();
}
}

View File

@ -10,12 +10,22 @@ import { isMainModule } from "./infra/is-main.js";
import { installProcessWarningFilter } from "./infra/warning-filter.js";
import { attachChildProcessBridge } from "./process/child-process-bridge.js";
const ENTRY_WRAPPER_PAIRS = [
{ wrapperBasename: "openclaw.mjs", entryBasename: "entry.js" },
{ wrapperBasename: "openclaw.js", entryBasename: "entry.js" },
] as const;
// Guard: only run entry-point logic when this file is the main module.
// The bundler may import entry.js as a shared dependency when dist/index.js
// is the actual entry point; without this guard the top-level code below
// would call runCli a second time, starting a duplicate gateway that fails
// on the lock / port and crashes the process.
if (!isMainModule({ currentFile: fileURLToPath(import.meta.url) })) {
if (
!isMainModule({
currentFile: fileURLToPath(import.meta.url),
wrapperEntryPairs: [...ENTRY_WRAPPER_PAIRS],
})
) {
// Imported as a dependency — skip all entry-point side effects.
} else {
process.title = "openclaw";

View File

@ -63,10 +63,34 @@ describe("infra parsing", () => {
argv: ["node", "/repo/openclaw.mjs"],
cwd: "/repo",
env: {},
wrapperEntryPairs: [{ wrapperBasename: "openclaw.mjs", entryBasename: "entry.js" }],
}),
).toBe(true);
});
it("returns false for wrapper launches when wrapper pair is not configured", () => {
expect(
isMainModule({
currentFile: "/repo/dist/entry.js",
argv: ["node", "/repo/openclaw.mjs"],
cwd: "/repo",
env: {},
}),
).toBe(false);
});
it("returns false when wrapper pair targets a different entry basename", () => {
expect(
isMainModule({
currentFile: "/repo/dist/index.js",
argv: ["node", "/repo/openclaw.mjs"],
cwd: "/repo",
env: {},
wrapperEntryPairs: [{ wrapperBasename: "openclaw.mjs", entryBasename: "entry.js" }],
}),
).toBe(false);
});
it("returns false when running under PM2 but this module is imported", () => {
expect(
isMainModule({

View File

@ -6,6 +6,10 @@ type IsMainModuleOptions = {
argv?: string[];
env?: NodeJS.ProcessEnv;
cwd?: string;
wrapperEntryPairs?: Array<{
wrapperBasename: string;
entryBasename: string;
}>;
};
function normalizePathCandidate(candidate: string | undefined, cwd: string): string | undefined {
@ -26,6 +30,7 @@ export function isMainModule({
argv = process.argv,
env = process.env,
cwd = process.cwd(),
wrapperEntryPairs = [],
}: IsMainModuleOptions): boolean {
const normalizedCurrent = normalizePathCandidate(currentFile, cwd);
const normalizedArgv1 = normalizePathCandidate(argv[1], cwd);
@ -41,12 +46,15 @@ export function isMainModule({
return true;
}
// The published/open-source wrapper binary is openclaw.mjs, which then imports
// dist/entry.js. Treat that pair as the main module so entry bootstrap runs.
if (normalizedCurrent && normalizedArgv1) {
// Optional wrapper->entry mapping for wrapper launchers that import the real entry.
if (normalizedCurrent && normalizedArgv1 && wrapperEntryPairs.length > 0) {
const currentBase = path.basename(normalizedCurrent);
const argvBase = path.basename(normalizedArgv1);
if (currentBase === "entry.js" && (argvBase === "openclaw.mjs" || argvBase === "openclaw.js")) {
const matched = wrapperEntryPairs.some(
({ wrapperBasename, entryBasename }) =>
currentBase === entryBasename && argvBase === wrapperBasename,
);
if (matched) {
return true;
}
}