fix(plugins): preserve live hook registry during gateway runs

This commit is contained in:
Vincent Koc 2026-03-23 01:00:08 -07:00
parent 9105b3723d
commit d22279d2e8
9 changed files with 198 additions and 63 deletions

View File

@ -7,6 +7,10 @@ import {
DefaultResourceLoader,
SessionManager,
} from "@mariozechner/pi-coding-agent";
import {
resolveTelegramInlineButtonsScope,
resolveTelegramReactionLevel,
} from "../../../../extensions/telegram/api.js";
import { resolveHeartbeatPrompt } from "../../../auto-reply/heartbeat.js";
import { resolveChannelCapabilities } from "../../../config/channel-capabilities.js";
import type { OpenClawConfig } from "../../../config/config.js";
@ -17,10 +21,6 @@ import {
} from "../../../infra/net/undici-global-dispatcher.js";
import { MAX_IMAGE_BYTES } from "../../../media/constants.js";
import { resolveSignalReactionLevel } from "../../../plugin-sdk/signal.js";
import {
resolveTelegramInlineButtonsScope,
resolveTelegramReactionLevel,
} from "../../../plugin-sdk/telegram.js";
import { getGlobalHookRunner } from "../../../plugins/hook-runner-global.js";
import type {
PluginHookAgentContext,
@ -1663,6 +1663,7 @@ export async function runEmbeddedAttempt(
params: EmbeddedRunAttemptParams,
): Promise<EmbeddedRunAttemptResult> {
const resolvedWorkspace = resolveUserPath(params.workspaceDir);
const prevCwd = process.cwd();
const runAbortController = new AbortController();
// Proxy bootstrap must happen before timeout tuning so the timeouts wrap the
// active EnvHttpProxyAgent instead of being replaced by a bare proxy dispatcher.
@ -1689,6 +1690,7 @@ export async function runEmbeddedAttempt(
await fs.mkdir(effectiveWorkspace, { recursive: true });
let restoreSkillEnv: (() => void) | undefined;
process.chdir(effectiveWorkspace);
try {
const { shouldLoadSkillEntries, skillEntries } = resolveEmbeddedRunSkillEntries({
workspaceDir: effectiveWorkspace,
@ -1943,7 +1945,7 @@ export async function runEmbeddedAttempt(
config: params.config,
agentId: sessionAgentId,
workspaceDir: effectiveWorkspace,
cwd: effectiveWorkspace,
cwd: process.cwd(),
runtime: {
host: machineName,
os: `${os.type()} ${os.release()}`,
@ -1962,7 +1964,7 @@ export async function runEmbeddedAttempt(
const docsPath = await resolveOpenClawDocsPath({
workspaceDir: effectiveWorkspace,
argv1: process.argv[1],
cwd: effectiveWorkspace,
cwd: process.cwd(),
moduleUrl: import.meta.url,
});
const ttsHint = params.config ? buildTtsSystemPromptHint(params.config) : undefined;
@ -2833,6 +2835,11 @@ export async function runEmbeddedAttempt(
);
}
if (process.env.OPENCLAW_PLUGIN_CHECKPOINTS === "1") {
log.warn(
`[hooks][checkpoints] attempt llm_input runId=${params.runId} sessionKey=${params.sessionKey ?? "unknown"} pid=${process.pid} hookRunner=${hookRunner ? "present" : "missing"} hasHooks=${hookRunner?.hasHooks("llm_input") === true}`,
);
}
if (hookRunner?.hasHooks("llm_input")) {
hookRunner
.runLlmInput(
@ -3105,6 +3112,11 @@ export async function runEmbeddedAttempt(
// Run agent_end hooks to allow plugins to analyze the conversation
// This is fire-and-forget, so we don't await
// Run even on compaction timeout so plugins can log/cleanup
if (process.env.OPENCLAW_PLUGIN_CHECKPOINTS === "1") {
log.warn(
`[hooks][checkpoints] attempt agent_end runId=${params.runId} sessionKey=${params.sessionKey ?? "unknown"} pid=${process.pid} hookRunner=${hookRunner ? "present" : "missing"} hasHooks=${hookRunner?.hasHooks("agent_end") === true}`,
);
}
if (hookRunner?.hasHooks("agent_end")) {
hookRunner
.runAgentEnd(
@ -3164,6 +3176,11 @@ export async function runEmbeddedAttempt(
)
.map((entry) => ({ toolName: entry.toolName, meta: entry.meta }));
if (process.env.OPENCLAW_PLUGIN_CHECKPOINTS === "1") {
log.warn(
`[hooks][checkpoints] attempt llm_output runId=${params.runId} sessionKey=${params.sessionKey ?? "unknown"} pid=${process.pid} hookRunner=${hookRunner ? "present" : "missing"} hasHooks=${hookRunner?.hasHooks("llm_output") === true}`,
);
}
if (hookRunner?.hasHooks("llm_output")) {
hookRunner
.runLlmOutput(
@ -3242,5 +3259,6 @@ export async function runEmbeddedAttempt(
}
} finally {
restoreSkillEnv?.();
process.chdir(prevCwd);
}
}

View File

@ -0,0 +1,52 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
const hoisted = vi.hoisted(() => ({
loadOpenClawPlugins: vi.fn(),
getActivePluginRegistryKey: vi.fn<() => string | null>(),
}));
vi.mock("../plugins/loader.js", () => ({
loadOpenClawPlugins: hoisted.loadOpenClawPlugins,
}));
vi.mock("../plugins/runtime.js", () => ({
getActivePluginRegistryKey: hoisted.getActivePluginRegistryKey,
}));
const { ensureRuntimePluginsLoaded } = await import("./runtime-plugins.js");
describe("ensureRuntimePluginsLoaded", () => {
beforeEach(() => {
hoisted.loadOpenClawPlugins.mockReset();
hoisted.getActivePluginRegistryKey.mockReset();
hoisted.getActivePluginRegistryKey.mockReturnValue(null);
});
it("does not reactivate plugins when a process already has an active registry", () => {
hoisted.getActivePluginRegistryKey.mockReturnValue("gateway-registry");
ensureRuntimePluginsLoaded({
config: {} as never,
workspaceDir: "/tmp/workspace",
allowGatewaySubagentBinding: true,
});
expect(hoisted.loadOpenClawPlugins).not.toHaveBeenCalled();
});
it("loads runtime plugins when no active registry exists", () => {
ensureRuntimePluginsLoaded({
config: {} as never,
workspaceDir: "/tmp/workspace",
allowGatewaySubagentBinding: true,
});
expect(hoisted.loadOpenClawPlugins).toHaveBeenCalledWith({
config: {} as never,
workspaceDir: "/tmp/workspace",
runtimeOptions: {
allowGatewaySubagentBinding: true,
},
});
});
});

View File

@ -1,5 +1,6 @@
import type { OpenClawConfig } from "../config/config.js";
import { loadOpenClawPlugins } from "../plugins/loader.js";
import { getActivePluginRegistryKey } from "../plugins/runtime.js";
import { resolveUserPath } from "../utils.js";
export function ensureRuntimePluginsLoaded(params: {
@ -7,6 +8,10 @@ export function ensureRuntimePluginsLoaded(params: {
workspaceDir?: string | null;
allowGatewaySubagentBinding?: boolean;
}): void {
if (getActivePluginRegistryKey()) {
return;
}
const workspaceDir =
typeof params.workspaceDir === "string" && params.workspaceDir.trim()
? resolveUserPath(params.workspaceDir)

View File

@ -1,31 +1,23 @@
import { normalizeProviderId } from "../agents/model-selection.js";
import type { OpenClawConfig } from "../config/config.js";
import { isBlockedObjectKey } from "../infra/prototype-keys.js";
import { loadOpenClawPlugins } from "../plugins/loader.js";
import { getActivePluginRegistry } from "../plugins/runtime.js";
import { getActivePluginRegistry, getActivePluginRegistryKey } from "../plugins/runtime.js";
import type { ImageGenerationProviderPlugin } from "../plugins/types.js";
const BUILTIN_IMAGE_GENERATION_PROVIDERS: readonly ImageGenerationProviderPlugin[] = [];
function normalizeImageGenerationProviderId(id: string | undefined): string | undefined {
const normalized = normalizeProviderId(id ?? "");
if (!normalized || isBlockedObjectKey(normalized)) {
return undefined;
}
return normalized;
return normalized || undefined;
}
function resolvePluginImageGenerationProviders(
cfg?: OpenClawConfig,
): ImageGenerationProviderPlugin[] {
const active = getActivePluginRegistry();
const activeEntries = active?.imageGenerationProviders?.map((entry) => entry.provider) ?? [];
if (activeEntries.length > 0 || !cfg) {
return activeEntries;
}
return loadOpenClawPlugins({ config: cfg }).imageGenerationProviders.map(
(entry) => entry.provider,
);
const registry =
getActivePluginRegistryKey() || !cfg ? active : loadOpenClawPlugins({ config: cfg });
return registry?.imageGenerationProviders?.map((entry) => entry.provider) ?? [];
}
function buildProviderMaps(cfg?: OpenClawConfig): {

View File

@ -1,6 +1,6 @@
import type { OpenClawConfig } from "../config/config.js";
import { resolveGlobalSingleton } from "../shared/global-singleton.js";
import { notifyListeners, registerListener } from "../shared/listeners.js";
const diagnosticCheckpointLogsEnabled = process.env.OPENCLAW_DIAGNOSTIC_CHECKPOINTS === "1";
export type DiagnosticSessionState = "idle" | "processing" | "waiting";
@ -176,22 +176,26 @@ type DiagnosticEventsGlobalState = {
dispatchDepth: number;
};
const DIAGNOSTIC_EVENTS_STATE_KEY = Symbol.for("openclaw.diagnosticEvents.state");
const state = resolveGlobalSingleton<DiagnosticEventsGlobalState>(
DIAGNOSTIC_EVENTS_STATE_KEY,
() => ({
seq: 0,
listeners: new Set<(evt: DiagnosticEventPayload) => void>(),
dispatchDepth: 0,
}),
);
function getDiagnosticEventsState(): DiagnosticEventsGlobalState {
const globalStore = globalThis as typeof globalThis & {
__openclawDiagnosticEventsState?: DiagnosticEventsGlobalState;
};
if (!globalStore.__openclawDiagnosticEventsState) {
globalStore.__openclawDiagnosticEventsState = {
seq: 0,
listeners: new Set<(evt: DiagnosticEventPayload) => void>(),
dispatchDepth: 0,
};
}
return globalStore.__openclawDiagnosticEventsState;
}
export function isDiagnosticsEnabled(config?: OpenClawConfig): boolean {
return config?.diagnostics?.enabled === true;
}
export function emitDiagnosticEvent(event: DiagnosticEventInput) {
const state = getDiagnosticEventsState();
if (state.dispatchDepth > 100) {
console.error(
`[diagnostic-events] recursion guard tripped at depth=${state.dispatchDepth}, dropping type=${event.type}`,
@ -204,27 +208,49 @@ export function emitDiagnosticEvent(event: DiagnosticEventInput) {
seq: (state.seq += 1),
ts: Date.now(),
} satisfies DiagnosticEventPayload;
state.dispatchDepth += 1;
notifyListeners(state.listeners, enriched, (err) => {
const errorMessage =
err instanceof Error
? (err.stack ?? err.message)
: typeof err === "string"
? err
: String(err);
console.error(
`[diagnostic-events] listener error type=${enriched.type} seq=${enriched.seq}: ${errorMessage}`,
if (diagnosticCheckpointLogsEnabled) {
console.warn(
`[diagnostic-events][checkpoints] emit type=${enriched.type} seq=${enriched.seq} listeners=${state.listeners.size}${"sessionKey" in enriched && typeof enriched.sessionKey === "string" ? ` sessionKey=${enriched.sessionKey}` : ""}`,
);
// Ignore listener failures.
});
}
state.dispatchDepth += 1;
for (const listener of state.listeners) {
try {
listener(enriched);
} catch (err) {
const errorMessage =
err instanceof Error
? (err.stack ?? err.message)
: typeof err === "string"
? err
: String(err);
console.error(
`[diagnostic-events] listener error type=${enriched.type} seq=${enriched.seq}: ${errorMessage}`,
);
// Ignore listener failures.
}
}
state.dispatchDepth -= 1;
}
export function onDiagnosticEvent(listener: (evt: DiagnosticEventPayload) => void): () => void {
return registerListener(state.listeners, listener);
const state = getDiagnosticEventsState();
state.listeners.add(listener);
if (diagnosticCheckpointLogsEnabled) {
console.warn(`[diagnostic-events][checkpoints] subscribe listeners=${state.listeners.size}`);
}
return () => {
state.listeners.delete(listener);
if (diagnosticCheckpointLogsEnabled) {
console.warn(
`[diagnostic-events][checkpoints] unsubscribe listeners=${state.listeners.size}`,
);
}
};
}
export function resetDiagnosticEventsForTest(): void {
const state = getDiagnosticEventsState();
state.seq = 0;
state.listeners.clear();
state.dispatchDepth = 0;

View File

@ -159,6 +159,7 @@ function getHooksForNameAndPlugin<K extends PluginHookName>(
export function createHookRunner(registry: PluginRegistry, options: HookRunnerOptions = {}) {
const logger = options.logger;
const catchErrors = options.catchErrors ?? true;
const hookCheckpointLogsEnabled = process.env.OPENCLAW_PLUGIN_CHECKPOINTS === "1";
const mergeBeforeModelResolve = (
acc: PluginHookBeforeModelResolveResult | undefined,
@ -250,9 +251,17 @@ export function createHookRunner(registry: PluginRegistry, options: HookRunnerOp
}
logger?.debug?.(`[hooks] running ${hookName} (${hooks.length} handlers)`);
if (hookCheckpointLogsEnabled) {
logger?.warn(
`[hooks][checkpoints] dispatch ${hookName} handlers=${hooks.map((hook) => hook.pluginId).join(",")}`,
);
}
const promises = hooks.map(async (hook) => {
try {
if (hookCheckpointLogsEnabled) {
logger?.warn(`[hooks][checkpoints] invoke ${hookName} plugin=${hook.pluginId}`);
}
await (hook.handler as (event: unknown, ctx: unknown) => Promise<void>)(event, ctx);
} catch (err) {
handleHookError({ hookName, pluginId: hook.pluginId, error: err });

View File

@ -1,4 +1,3 @@
import { resolveGlobalSingleton } from "../shared/global-singleton.js";
import { createEmptyPluginRegistry } from "./registry-empty.js";
import type { PluginRegistry } from "./registry.js";
@ -12,15 +11,33 @@ type RegistryState = {
version: number;
};
const state = resolveGlobalSingleton<RegistryState>(REGISTRY_STATE, () => ({
registry: createEmptyPluginRegistry(),
httpRouteRegistry: null,
httpRouteRegistryPinned: false,
key: null,
version: 0,
}));
const state: RegistryState = (() => {
const globalState = globalThis as typeof globalThis & {
[REGISTRY_STATE]?: RegistryState;
};
if (!globalState[REGISTRY_STATE]) {
globalState[REGISTRY_STATE] = {
registry: createEmptyPluginRegistry(),
httpRouteRegistry: null,
httpRouteRegistryPinned: false,
key: null,
version: 0,
};
}
return globalState[REGISTRY_STATE];
})();
export function setActivePluginRegistry(registry: PluginRegistry, cacheKey?: string) {
if (process.env.OPENCLAW_PLUGIN_CHECKPOINTS === "1") {
const stack = new Error().stack
?.split("\n")
.slice(2, 5)
.map((line) => line.trim())
.join(" | ");
console.warn(
`[plugins][checkpoints] activate registry key=${cacheKey ?? "none"} plugins=${registry.plugins.length} typedHooks=${registry.typedHooks.length}${stack ? ` caller=${stack}` : ""}`,
);
}
state.registry = registry;
if (!state.httpRouteRegistryPinned) {
state.httpRouteRegistry = registry;

View File

@ -5,6 +5,7 @@ import type { PluginRegistry } from "./registry.js";
import type { OpenClawPluginServiceContext, PluginLogger } from "./types.js";
const log = createSubsystemLogger("plugins");
const pluginCheckpointLogsEnabled = process.env.OPENCLAW_PLUGIN_CHECKPOINTS === "1";
function createPluginLogger(): PluginLogger {
return {
@ -47,8 +48,18 @@ export async function startPluginServices(params: {
for (const entry of params.registry.services) {
const service = entry.service;
const typedHookCountBefore = params.registry.typedHooks.length;
try {
await service.start(serviceContext);
if (pluginCheckpointLogsEnabled) {
const newTypedHooks = params.registry.typedHooks
.slice(typedHookCountBefore)
.filter((hook) => hook.pluginId === entry.pluginId)
.map((hook) => hook.hookName);
log.warn(
`[plugins][checkpoints] service started (${service.id}, plugin=${entry.pluginId}) typedHooksAdded=${newTypedHooks.length}${newTypedHooks.length > 0 ? ` hooks=${newTypedHooks.join(",")}` : ""}`,
);
}
running.push({
id: service.id,
stop: service.stop ? () => service.stop?.(serviceContext) : undefined,

View File

@ -4,6 +4,7 @@ import { createSubsystemLogger } from "../logging/subsystem.js";
import { applyTestPluginDefaults, normalizePluginsConfig } from "./config-state.js";
import { loadOpenClawPlugins } from "./loader.js";
import { createPluginLoaderLogger } from "./logger.js";
import { getActivePluginRegistry, getActivePluginRegistryKey } from "./runtime.js";
import type { OpenClawPluginToolContext } from "./types.js";
const log = createSubsystemLogger("plugins");
@ -59,17 +60,21 @@ export function resolvePluginTools(params: {
return [];
}
const registry = loadOpenClawPlugins({
config: effectiveConfig,
workspaceDir: params.context.workspaceDir,
runtimeOptions: params.allowGatewaySubagentBinding
? {
allowGatewaySubagentBinding: true,
}
: undefined,
env,
logger: createPluginLoaderLogger(log),
});
const activeRegistry = getActivePluginRegistry();
const registry =
getActivePluginRegistryKey() && activeRegistry
? activeRegistry
: loadOpenClawPlugins({
config: effectiveConfig,
workspaceDir: params.context.workspaceDir,
runtimeOptions: params.allowGatewaySubagentBinding
? {
allowGatewaySubagentBinding: true,
}
: undefined,
env,
logger: createPluginLoaderLogger(log),
});
const tools: AnyAgentTool[] = [];
const existing = params.existingToolNames ?? new Set<string>();