fix: stabilize plugin startup boundaries

This commit is contained in:
Peter Steinberger 2026-03-28 05:01:21 +00:00
parent 838013c87a
commit 23f0486810
6 changed files with 357 additions and 252 deletions

View File

@ -329,6 +329,37 @@ describe("gateway bonjour advertiser", () => {
await started.stop();
});
it("suppresses ciao self-probe retry console noise while advertising", async () => {
  enableAdvertiserUnitMode();

  // Hand stubbed advertise/destroy implementations to the mocked ciao service.
  const destroy = vi.fn().mockResolvedValue(undefined);
  const advertise = vi.fn().mockResolvedValue(undefined);
  mockCiaoService({ advertise, destroy });

  // The exact line shape that should be swallowed while the advertiser is running.
  const selfProbeRetryLine =
    "[test._openclaw-gw._tcp.local.] failed probing with reason: Error: Can't probe for a service which is announced already. Received announcing for service test._openclaw-gw._tcp.local.. Trying again in 2 seconds!";

  // Swap console.log for a spy *before* starting, so the advertiser wraps the spy
  // and we can observe which lines make it through.
  const realConsoleLog = console.log;
  const consoleSpy = vi.fn();
  console.log = consoleSpy as typeof console.log;

  try {
    const advertiser = await startGatewayBonjourAdvertiser({
      gatewayPort: 18789,
      sshPort: 2222,
    });

    console.log(selfProbeRetryLine);
    console.log("ordinary console line");

    // Only the ordinary line may reach the underlying console.log.
    expect(consoleSpy).toHaveBeenCalledTimes(1);
    expect(consoleSpy).toHaveBeenCalledWith("ordinary console line");

    await advertiser.stop();
  } finally {
    // Always put the real console.log back, even when an expectation fails.
    console.log = realConsoleLog;
  }
});
it("recreates the advertiser when ciao gets stuck announcing", async () => {
enableAdvertiserUnitMode();
vi.useFakeTimers();

View File

@ -63,9 +63,13 @@ type ServiceStateTracker = {
sinceMs: number;
};
/** Signature used when capturing and replacing `console.log`. */
type ConsoleLogFn = (...args: unknown[]) => void;
/** Period of the watchdog `setInterval` that inspects service state. */
const WATCHDOG_INTERVAL_MS = 5_000;
/** Minimum gap between watchdog re-advertise attempts for the same service key. */
const REPAIR_DEBOUNCE_MS = 30_000;
/** How long a service may stay in a non-"announced" state before the advertiser is recreated. */
const STUCK_ANNOUNCING_MS = 8_000;
/**
 * Substring identifying ciao's self-probe retry message; `console.log` calls whose
 * string arguments contain it are dropped while the noise filter is installed.
 */
const CIAO_SELF_PROBE_RETRY_FRAGMENT =
"failed probing with reason: Error: Can't probe for a service which is announced already.";
function serviceSummary(label: string, svc: BonjourService): string {
let fqdn = "unknown";
@ -109,6 +113,28 @@ function handleCiaoUnhandledRejection(reason: unknown): boolean {
return true;
}
/**
 * Decides whether a `console.log` call should be dropped as ciao self-probe noise.
 *
 * @param args - The raw arguments passed to `console.log`.
 * @returns `true` when at least one string argument contains
 *   `CIAO_SELF_PROBE_RETRY_FRAGMENT`; `false` otherwise (including for empty input
 *   and for calls that carry no string arguments).
 */
function shouldSuppressCiaoConsoleLog(args: unknown[]): boolean {
  for (const candidate of args) {
    // Non-string arguments (objects, numbers, errors) are never inspected.
    if (typeof candidate !== "string") {
      continue;
    }
    if (candidate.includes(CIAO_SELF_PROBE_RETRY_FRAGMENT)) {
      return true;
    }
  }
  return false;
}
/**
 * Installs a `console.log` wrapper that drops ciao's self-probe retry messages
 * (as decided by `shouldSuppressCiaoConsoleLog`) and forwards everything else to
 * the `console.log` that was in place at install time.
 *
 * The returned restore function is safe to call out of order and more than once:
 * - It always deactivates this wrapper, so if the wrapper is still referenced by a
 *   later-installed wrapper it degrades to a plain pass-through.
 * - It only reassigns `console.log` when this wrapper is still the installed one,
 *   so it never clobbers a wrapper someone else installed afterwards and never
 *   resurrects a stale filter.
 *
 * @returns A function that undoes the installation.
 */
function installCiaoConsoleNoiseFilter(): () => void {
  const originalConsoleLog = console.log as ConsoleLogFn;
  let active = true;
  const filteredConsoleLog: ConsoleLogFn = (...args: unknown[]) => {
    if (active && shouldSuppressCiaoConsoleLog(args)) {
      return;
    }
    originalConsoleLog(...args);
  };
  console.log = filteredConsoleLog;
  return () => {
    // Stop filtering even if we cannot unhook (another wrapper may still call us).
    active = false;
    // Only unwind when we are still on top; otherwise leave the newer wrapper alone.
    if (console.log === filteredConsoleLog) {
      console.log = originalConsoleLog;
    }
  };
}
export async function startGatewayBonjourAdvertiser(
opts: GatewayBonjourAdvertiseOpts,
): Promise<GatewayBonjourAdvertiser> {
@ -117,278 +143,287 @@ export async function startGatewayBonjourAdvertiser(
}
const { getResponder, Protocol } = await import("@homebridge/ciao");
const restoreConsoleLog = installCiaoConsoleNoiseFilter();
try {
// mDNS service instance names are single DNS labels; dots in hostnames (like
// `Mac.localdomain`) can confuse some resolvers/browsers and break discovery.
// Keep only the first label and normalize away a trailing `.local`.
const hostnameRaw = process.env.OPENCLAW_MDNS_HOSTNAME?.trim() || "openclaw";
const hostname =
hostnameRaw
.replace(/\.local$/i, "")
.split(".")[0]
.trim() || "openclaw";
const instanceName =
typeof opts.instanceName === "string" && opts.instanceName.trim()
? opts.instanceName.trim()
: `${hostname} (OpenClaw)`;
const displayName = prettifyInstanceName(instanceName);
// mDNS service instance names are single DNS labels; dots in hostnames (like
// `Mac.localdomain`) can confuse some resolvers/browsers and break discovery.
// Keep only the first label and normalize away a trailing `.local`.
const hostnameRaw = process.env.OPENCLAW_MDNS_HOSTNAME?.trim() || "openclaw";
const hostname =
hostnameRaw
.replace(/\.local$/i, "")
.split(".")[0]
.trim() || "openclaw";
const instanceName =
typeof opts.instanceName === "string" && opts.instanceName.trim()
? opts.instanceName.trim()
: `${hostname} (OpenClaw)`;
const displayName = prettifyInstanceName(instanceName);
const txtBase: Record<string, string> = {
role: "gateway",
gatewayPort: String(opts.gatewayPort),
lanHost: `${hostname}.local`,
displayName,
};
if (opts.gatewayTlsEnabled) {
txtBase.gatewayTls = "1";
if (opts.gatewayTlsFingerprintSha256) {
txtBase.gatewayTlsSha256 = opts.gatewayTlsFingerprintSha256;
}
}
if (typeof opts.canvasPort === "number" && opts.canvasPort > 0) {
txtBase.canvasPort = String(opts.canvasPort);
}
if (typeof opts.tailnetDns === "string" && opts.tailnetDns.trim()) {
txtBase.tailnetDns = opts.tailnetDns.trim();
}
// In minimal mode, omit cliPath to avoid exposing filesystem structure.
// This info can be obtained via the authenticated WebSocket if needed.
if (!opts.minimal && typeof opts.cliPath === "string" && opts.cliPath.trim()) {
txtBase.cliPath = opts.cliPath.trim();
}
// Build TXT record for the gateway service.
// In minimal mode, omit sshPort to avoid advertising SSH availability.
const gatewayTxt: Record<string, string> = {
...txtBase,
transport: "gateway",
};
if (!opts.minimal) {
gatewayTxt.sshPort = String(opts.sshPort ?? 22);
}
function createCycle(): BonjourCycle {
const responder = getResponder();
const services: Array<{ label: string; svc: BonjourService }> = [];
const gateway = responder.createService({
name: safeServiceName(instanceName),
type: "openclaw-gw",
protocol: Protocol.TCP,
port: opts.gatewayPort,
domain: "local",
hostname,
txt: gatewayTxt,
});
services.push({
label: "gateway",
svc: gateway as unknown as BonjourService,
});
const cleanupUnhandledRejection =
services.length > 0
? registerUnhandledRejectionHandler(handleCiaoUnhandledRejection)
: undefined;
return { responder, services, cleanupUnhandledRejection };
}
async function stopCycle(cycle: BonjourCycle | null) {
if (!cycle) {
return;
}
const responder = cycle.responder as unknown as {
advertiseService?: (...args: unknown[]) => unknown;
announce?: (...args: unknown[]) => unknown;
probe?: (...args: unknown[]) => unknown;
republishService?: (...args: unknown[]) => unknown;
const txtBase: Record<string, string> = {
role: "gateway",
gatewayPort: String(opts.gatewayPort),
lanHost: `${hostname}.local`,
displayName,
};
const noopAsync = async () => {};
// ciao schedules its own 2s retry timers after failed probe/announce attempts.
// Those callbacks target the original responder instance, so disarm it before
// destroy/shutdown to prevent a dead cycle from re-entering advertise/probe.
responder.advertiseService = noopAsync;
responder.announce = noopAsync;
responder.probe = noopAsync;
responder.republishService = noopAsync;
for (const { svc } of cycle.services) {
if (opts.gatewayTlsEnabled) {
txtBase.gatewayTls = "1";
if (opts.gatewayTlsFingerprintSha256) {
txtBase.gatewayTlsSha256 = opts.gatewayTlsFingerprintSha256;
}
}
if (typeof opts.canvasPort === "number" && opts.canvasPort > 0) {
txtBase.canvasPort = String(opts.canvasPort);
}
if (typeof opts.tailnetDns === "string" && opts.tailnetDns.trim()) {
txtBase.tailnetDns = opts.tailnetDns.trim();
}
// In minimal mode, omit cliPath to avoid exposing filesystem structure.
// This info can be obtained via the authenticated WebSocket if needed.
if (!opts.minimal && typeof opts.cliPath === "string" && opts.cliPath.trim()) {
txtBase.cliPath = opts.cliPath.trim();
}
// Build TXT record for the gateway service.
// In minimal mode, omit sshPort to avoid advertising SSH availability.
const gatewayTxt: Record<string, string> = {
...txtBase,
transport: "gateway",
};
if (!opts.minimal) {
gatewayTxt.sshPort = String(opts.sshPort ?? 22);
}
function createCycle(): BonjourCycle {
const responder = getResponder();
const services: Array<{ label: string; svc: BonjourService }> = [];
const gateway = responder.createService({
name: safeServiceName(instanceName),
type: "openclaw-gw",
protocol: Protocol.TCP,
port: opts.gatewayPort,
domain: "local",
hostname,
txt: gatewayTxt,
});
services.push({
label: "gateway",
svc: gateway as unknown as BonjourService,
});
const cleanupUnhandledRejection =
services.length > 0
? registerUnhandledRejectionHandler(handleCiaoUnhandledRejection)
: undefined;
return { responder, services, cleanupUnhandledRejection };
}
async function stopCycle(cycle: BonjourCycle | null) {
if (!cycle) {
return;
}
const responder = cycle.responder as unknown as {
advertiseService?: (...args: unknown[]) => unknown;
announce?: (...args: unknown[]) => unknown;
probe?: (...args: unknown[]) => unknown;
republishService?: (...args: unknown[]) => unknown;
};
const noopAsync = async () => {};
// ciao schedules its own 2s retry timers after failed probe/announce attempts.
// Those callbacks target the original responder instance, so disarm it before
// destroy/shutdown to prevent a dead cycle from re-entering advertise/probe.
responder.advertiseService = noopAsync;
responder.announce = noopAsync;
responder.probe = noopAsync;
responder.republishService = noopAsync;
for (const { svc } of cycle.services) {
try {
await svc.destroy();
} catch {
/* ignore */
}
}
try {
await svc.destroy();
await cycle.responder.shutdown();
} catch {
/* ignore */
} finally {
cycle.cleanupUnhandledRejection?.();
}
}
try {
await cycle.responder.shutdown();
} catch {
/* ignore */
} finally {
cycle.cleanupUnhandledRejection?.();
}
}
function attachConflictListeners(services: Array<{ label: string; svc: BonjourService }>) {
for (const { label, svc } of services) {
try {
svc.on("name-change", (name: unknown) => {
const next = typeof name === "string" ? name : String(name);
logWarn(`bonjour: ${label} name conflict resolved; newName=${JSON.stringify(next)}`);
});
svc.on("hostname-change", (nextHostname: unknown) => {
const next = typeof nextHostname === "string" ? nextHostname : String(nextHostname);
logWarn(
`bonjour: ${label} hostname conflict resolved; newHostname=${JSON.stringify(next)}`,
);
});
} catch (err) {
logDebug(`bonjour: failed to attach listeners for ${label}: ${String(err)}`);
}
}
}
function startAdvertising(services: Array<{ label: string; svc: BonjourService }>) {
for (const { label, svc } of services) {
try {
void svc
.advertise()
.then(() => {
// Keep this out of stdout/stderr (menubar + tests) but capture in the rolling log.
getLogger().info(`bonjour: advertised ${serviceSummary(label, svc)}`);
})
.catch((err) => {
function attachConflictListeners(services: Array<{ label: string; svc: BonjourService }>) {
for (const { label, svc } of services) {
try {
svc.on("name-change", (name: unknown) => {
const next = typeof name === "string" ? name : String(name);
logWarn(`bonjour: ${label} name conflict resolved; newName=${JSON.stringify(next)}`);
});
svc.on("hostname-change", (nextHostname: unknown) => {
const next = typeof nextHostname === "string" ? nextHostname : String(nextHostname);
logWarn(
`bonjour: advertise failed (${serviceSummary(label, svc)}): ${formatBonjourError(err)}`,
`bonjour: ${label} hostname conflict resolved; newHostname=${JSON.stringify(next)}`,
);
});
} catch (err) {
logWarn(
`bonjour: advertise threw (${serviceSummary(label, svc)}): ${formatBonjourError(err)}`,
);
} catch (err) {
logDebug(`bonjour: failed to attach listeners for ${label}: ${String(err)}`);
}
}
}
}
logDebug(
`bonjour: starting (hostname=${hostname}, instance=${JSON.stringify(
safeServiceName(instanceName),
)}, gatewayPort=${opts.gatewayPort}${opts.minimal ? ", minimal=true" : `, sshPort=${opts.sshPort ?? 22}`})`,
);
let stopped = false;
let recreatePromise: Promise<void> | null = null;
let cycle = createCycle();
const stateTracker = new Map<string, ServiceStateTracker>();
attachConflictListeners(cycle.services);
startAdvertising(cycle.services);
const updateStateTrackers = (services: Array<{ label: string; svc: BonjourService }>) => {
const now = Date.now();
for (const { label, svc } of services) {
const nextState: BonjourServiceState | "unknown" =
typeof svc.serviceState === "string" ? svc.serviceState : "unknown";
const current = stateTracker.get(label);
const nextEnteredAt =
current && !isAnnouncedState(current.state) && !isAnnouncedState(nextState)
? current.sinceMs
: now;
if (!current || current.state !== nextState || current.sinceMs !== nextEnteredAt) {
stateTracker.set(label, { state: nextState, sinceMs: nextEnteredAt });
function startAdvertising(services: Array<{ label: string; svc: BonjourService }>) {
for (const { label, svc } of services) {
try {
void svc
.advertise()
.then(() => {
// Keep this out of stdout/stderr (menubar + tests) but capture in the rolling log.
getLogger().info(`bonjour: advertised ${serviceSummary(label, svc)}`);
})
.catch((err) => {
logWarn(
`bonjour: advertise failed (${serviceSummary(label, svc)}): ${formatBonjourError(err)}`,
);
});
} catch (err) {
logWarn(
`bonjour: advertise threw (${serviceSummary(label, svc)}): ${formatBonjourError(err)}`,
);
}
}
}
};
const recreateAdvertiser = async (reason: string) => {
if (stopped) {
return;
}
if (recreatePromise) {
logDebug(
`bonjour: starting (hostname=${hostname}, instance=${JSON.stringify(
safeServiceName(instanceName),
)}, gatewayPort=${opts.gatewayPort}${opts.minimal ? ", minimal=true" : `, sshPort=${opts.sshPort ?? 22}`})`,
);
let stopped = false;
let recreatePromise: Promise<void> | null = null;
let cycle = createCycle();
const stateTracker = new Map<string, ServiceStateTracker>();
attachConflictListeners(cycle.services);
startAdvertising(cycle.services);
const updateStateTrackers = (services: Array<{ label: string; svc: BonjourService }>) => {
const now = Date.now();
for (const { label, svc } of services) {
const nextState: BonjourServiceState | "unknown" =
typeof svc.serviceState === "string" ? svc.serviceState : "unknown";
const current = stateTracker.get(label);
const nextEnteredAt =
current && !isAnnouncedState(current.state) && !isAnnouncedState(nextState)
? current.sinceMs
: now;
if (!current || current.state !== nextState || current.sinceMs !== nextEnteredAt) {
stateTracker.set(label, { state: nextState, sinceMs: nextEnteredAt });
}
}
};
const recreateAdvertiser = async (reason: string) => {
if (stopped) {
return;
}
if (recreatePromise) {
return recreatePromise;
}
recreatePromise = (async () => {
logWarn(`bonjour: restarting advertiser (${reason})`);
const previous = cycle;
await stopCycle(previous);
cycle = createCycle();
stateTracker.clear();
attachConflictListeners(cycle.services);
startAdvertising(cycle.services);
})().finally(() => {
recreatePromise = null;
});
return recreatePromise;
}
recreatePromise = (async () => {
logWarn(`bonjour: restarting advertiser (${reason})`);
const previous = cycle;
await stopCycle(previous);
cycle = createCycle();
stateTracker.clear();
attachConflictListeners(cycle.services);
startAdvertising(cycle.services);
})().finally(() => {
recreatePromise = null;
});
return recreatePromise;
};
};
// Watchdog: if we ever end up in an unannounced state (e.g. after sleep/wake or
// interface churn), try to re-advertise instead of requiring a full gateway restart.
const lastRepairAttempt = new Map<string, number>();
const watchdog = setInterval(() => {
if (stopped || recreatePromise) {
return;
}
updateStateTrackers(cycle.services);
for (const { label, svc } of cycle.services) {
const stateUnknown = (svc as { serviceState?: unknown }).serviceState;
if (typeof stateUnknown !== "string") {
continue;
// Watchdog: if we ever end up in an unannounced state (e.g. after sleep/wake or
// interface churn), try to re-advertise instead of requiring a full gateway restart.
const lastRepairAttempt = new Map<string, number>();
const watchdog = setInterval(() => {
if (stopped || recreatePromise) {
return;
}
const tracked = stateTracker.get(label);
if (
stateUnknown !== "announced" &&
tracked &&
Date.now() - tracked.sinceMs >= STUCK_ANNOUNCING_MS
) {
void recreateAdvertiser(
`service stuck in ${stateUnknown} for ${Date.now() - tracked.sinceMs}ms (${serviceSummary(
updateStateTrackers(cycle.services);
for (const { label, svc } of cycle.services) {
const stateUnknown = (svc as { serviceState?: unknown }).serviceState;
if (typeof stateUnknown !== "string") {
continue;
}
const tracked = stateTracker.get(label);
if (
stateUnknown !== "announced" &&
tracked &&
Date.now() - tracked.sinceMs >= STUCK_ANNOUNCING_MS
) {
void recreateAdvertiser(
`service stuck in ${stateUnknown} for ${Date.now() - tracked.sinceMs}ms (${serviceSummary(
label,
svc,
)})`,
);
return;
}
if (stateUnknown === "announced" || stateUnknown === "announcing") {
continue;
}
let key = label;
try {
key = `${label}:${svc.getFQDN()}`;
} catch {
// ignore
}
const now = Date.now();
const last = lastRepairAttempt.get(key) ?? 0;
if (now - last < REPAIR_DEBOUNCE_MS) {
continue;
}
lastRepairAttempt.set(key, now);
logWarn(
`bonjour: watchdog detected non-announced service; attempting re-advertise (${serviceSummary(
label,
svc,
)})`,
);
return;
}
if (stateUnknown === "announced" || stateUnknown === "announcing") {
continue;
}
let key = label;
try {
key = `${label}:${svc.getFQDN()}`;
} catch {
// ignore
}
const now = Date.now();
const last = lastRepairAttempt.get(key) ?? 0;
if (now - last < REPAIR_DEBOUNCE_MS) {
continue;
}
lastRepairAttempt.set(key, now);
logWarn(
`bonjour: watchdog detected non-announced service; attempting re-advertise (${serviceSummary(
label,
svc,
)})`,
);
try {
void svc.advertise().catch((err) => {
try {
void svc.advertise().catch((err) => {
logWarn(
`bonjour: watchdog advertise failed (${serviceSummary(label, svc)}): ${formatBonjourError(err)}`,
);
});
} catch (err) {
logWarn(
`bonjour: watchdog advertise failed (${serviceSummary(label, svc)}): ${formatBonjourError(err)}`,
`bonjour: watchdog advertise threw (${serviceSummary(label, svc)}): ${formatBonjourError(err)}`,
);
});
} catch (err) {
logWarn(
`bonjour: watchdog advertise threw (${serviceSummary(label, svc)}): ${formatBonjourError(err)}`,
);
}
}
}
}, WATCHDOG_INTERVAL_MS);
watchdog.unref?.();
}, WATCHDOG_INTERVAL_MS);
watchdog.unref?.();
return {
stop: async () => {
stopped = true;
clearInterval(watchdog);
await recreatePromise;
await stopCycle(cycle);
},
};
return {
stop: async () => {
stopped = true;
try {
clearInterval(watchdog);
await recreatePromise;
await stopCycle(cycle);
} finally {
restoreConsoleLog();
}
},
};
} catch (err) {
restoreConsoleLog();
throw err;
}
}

View File

@ -10,10 +10,13 @@ const pluginSdkSubpathsCache = new Map();
const isDistRootAlias = __filename.includes(
`${path.sep}dist${path.sep}plugin-sdk${path.sep}root-alias.cjs`,
);
const shouldPreferSourceInTests =
// Source plugin entry loading must stay on the source graph end-to-end. Mixing a
// source root alias with dist compat/runtime shims can split singleton deps
// (for example matrix-js-sdk) across two module graphs.
const shouldPreferSourceGraph =
!isDistRootAlias &&
(Boolean(process.env.VITEST) ||
process.env.NODE_ENV === "test" ||
(process.env.NODE_ENV !== "production" ||
Boolean(process.env.VITEST) ||
process.env.OPENCLAW_PLUGIN_SDK_SOURCE_IN_TESTS === "1");
function emptyPluginConfigSchema() {
@ -160,7 +163,7 @@ function loadMonolithicSdk() {
}
const distCandidate = path.resolve(__dirname, "..", "..", "dist", "plugin-sdk", "compat.js");
if (!shouldPreferSourceInTests && fs.existsSync(distCandidate)) {
if (!shouldPreferSourceGraph && fs.existsSync(distCandidate)) {
try {
monolithicSdk = getJiti(true)(distCandidate);
return monolithicSdk;
@ -186,7 +189,7 @@ function loadDiagnosticEventsModule() {
"infra",
"diagnostic-events.js",
);
if (!shouldPreferSourceInTests) {
if (!shouldPreferSourceGraph) {
const distCandidate =
(fs.existsSync(directDistCandidate) && directDistCandidate) ||
findDistChunkByPrefix("diagnostic-events");

View File

@ -173,10 +173,22 @@ describe("plugin-sdk root alias", () => {
});
it.each([
{
name: "prefers source loading when the source root alias runs in development",
options: {
distExists: true,
env: { NODE_ENV: "development" },
monolithicExports: {
slowHelper: (): string => "loaded",
},
},
expectedTryNative: false,
},
{
name: "prefers native loading when compat resolves to dist",
options: {
distExists: true,
env: { NODE_ENV: "production" },
monolithicExports: {
slowHelper: (): string => "loaded",
},

View File

@ -152,6 +152,22 @@ describe("command queue", () => {
}
});
it("demotes live model switch lane failures to debug noise", async () => {
  // Build an Error whose `name` marks it as a live model switch interruption.
  const switchError = Object.assign(
    new Error("Live session model switch requested: anthropic/claude-opus-4-6"),
    { name: "LiveSessionModelSwitchError" },
  );
  const failingTask = async (): Promise<never> => {
    throw switchError;
  };

  // The caller still sees the rejection with the very same error instance...
  await expect(enqueueCommandInLane("nested", failingTask)).rejects.toBe(switchError);

  // ...but the queue reports it through diag.debug rather than diag.error.
  expect(diagnosticMocks.diag.error).not.toHaveBeenCalled();
  expect(diagnosticMocks.diag.debug).toHaveBeenCalledWith(
    expect.stringContaining("lane task interrupted: lane=nested"),
  );
});
it("getActiveTaskCount returns count of currently executing tasks", async () => {
const { task, release } = enqueueBlockedMainTask();

View File

@ -47,6 +47,10 @@ type LaneState = {
generation: number;
};
/**
 * Identifies lane task failures that are expected interruptions rather than real errors.
 *
 * Matching is by `Error#name` instead of by class identity.
 *
 * @param err - The value thrown by a lane task.
 * @returns `true` only for `Error` instances named "LiveSessionModelSwitchError".
 */
function isExpectedNonErrorLaneFailure(err: unknown): boolean {
  if (!(err instanceof Error)) {
    return false;
  }
  return err.name === "LiveSessionModelSwitchError";
}
/**
* Keep queue runtime state on globalThis so every bundled entry/chunk shares
* the same lanes, counters, and draining flag in production builds.
@ -141,10 +145,14 @@ function drainLane(lane: string) {
} catch (err) {
const completedCurrentGeneration = completeTask(state, taskId, taskGeneration);
const isProbeLane = lane.startsWith("auth-probe:") || lane.startsWith("session:probe-");
if (!isProbeLane) {
if (!isProbeLane && !isExpectedNonErrorLaneFailure(err)) {
diag.error(
`lane task error: lane=${lane} durationMs=${Date.now() - startTime} error="${String(err)}"`,
);
} else if (!isProbeLane) {
diag.debug(
`lane task interrupted: lane=${lane} durationMs=${Date.now() - startTime} reason="${String(err)}"`,
);
}
if (completedCurrentGeneration) {
pump();