From d22279d2e873eb829ceda2bda607657405ea106c Mon Sep 17 00:00:00 2001 From: Vincent Koc Date: Mon, 23 Mar 2026 01:00:08 -0700 Subject: [PATCH] fix(plugins): preserve live hook registry during gateway runs --- src/agents/pi-embedded-runner/run/attempt.ts | 30 ++++++-- src/agents/runtime-plugins.test.ts | 52 ++++++++++++++ src/agents/runtime-plugins.ts | 5 ++ src/image-generation/provider-registry.ts | 18 ++--- src/infra/diagnostic-events.ts | 76 +++++++++++++------- src/plugins/hooks.ts | 9 +++ src/plugins/runtime.ts | 33 ++++++--- src/plugins/services.ts | 11 +++ src/plugins/tools.ts | 27 ++++--- 9 files changed, 198 insertions(+), 63 deletions(-) create mode 100644 src/agents/runtime-plugins.test.ts diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index ddc98aa22f1..bfe363da303 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -7,6 +7,10 @@ import { DefaultResourceLoader, SessionManager, } from "@mariozechner/pi-coding-agent"; +import { + resolveTelegramInlineButtonsScope, + resolveTelegramReactionLevel, +} from "../../../../extensions/telegram/api.js"; import { resolveHeartbeatPrompt } from "../../../auto-reply/heartbeat.js"; import { resolveChannelCapabilities } from "../../../config/channel-capabilities.js"; import type { OpenClawConfig } from "../../../config/config.js"; @@ -17,10 +21,6 @@ import { } from "../../../infra/net/undici-global-dispatcher.js"; import { MAX_IMAGE_BYTES } from "../../../media/constants.js"; import { resolveSignalReactionLevel } from "../../../plugin-sdk/signal.js"; -import { - resolveTelegramInlineButtonsScope, - resolveTelegramReactionLevel, -} from "../../../plugin-sdk/telegram.js"; import { getGlobalHookRunner } from "../../../plugins/hook-runner-global.js"; import type { PluginHookAgentContext, @@ -1663,6 +1663,7 @@ export async function runEmbeddedAttempt( params: EmbeddedRunAttemptParams, ): Promise { const resolvedWorkspace = resolveUserPath(params.workspaceDir); + const prevCwd = process.cwd(); const runAbortController = new AbortController(); // Proxy bootstrap must happen before timeout tuning so the timeouts wrap the // active EnvHttpProxyAgent instead of being replaced by a bare proxy dispatcher. @@ -1689,6 +1690,7 @@ export async function runEmbeddedAttempt( await fs.mkdir(effectiveWorkspace, { recursive: true }); let restoreSkillEnv: (() => void) | undefined; + process.chdir(effectiveWorkspace); try { const { shouldLoadSkillEntries, skillEntries } = resolveEmbeddedRunSkillEntries({ workspaceDir: effectiveWorkspace, @@ -1943,7 +1945,7 @@ export async function runEmbeddedAttempt( config: params.config, agentId: sessionAgentId, workspaceDir: effectiveWorkspace, - cwd: effectiveWorkspace, + cwd: process.cwd(), runtime: { host: machineName, os: `${os.type()} ${os.release()}`, @@ -1962,7 +1964,7 @@ export async function runEmbeddedAttempt( const docsPath = await resolveOpenClawDocsPath({ workspaceDir: effectiveWorkspace, argv1: process.argv[1], - cwd: effectiveWorkspace, + cwd: process.cwd(), moduleUrl: import.meta.url, }); const ttsHint = params.config ? buildTtsSystemPromptHint(params.config) : undefined; @@ -2833,6 +2835,11 @@ export async function runEmbeddedAttempt( ); } + if (process.env.OPENCLAW_PLUGIN_CHECKPOINTS === "1") { + log.warn( + `[hooks][checkpoints] attempt llm_input runId=${params.runId} sessionKey=${params.sessionKey ?? "unknown"} pid=${process.pid} hookRunner=${hookRunner ? "present" : "missing"} hasHooks=${hookRunner?.hasHooks("llm_input") === true}`, + ); + } if (hookRunner?.hasHooks("llm_input")) { hookRunner .runLlmInput( @@ -3105,6 +3112,11 @@ export async function runEmbeddedAttempt( // Run agent_end hooks to allow plugins to analyze the conversation // This is fire-and-forget, so we don't await // Run even on compaction timeout so plugins can log/cleanup + if (process.env.OPENCLAW_PLUGIN_CHECKPOINTS === "1") { + log.warn( + `[hooks][checkpoints] attempt agent_end runId=${params.runId} sessionKey=${params.sessionKey ?? "unknown"} pid=${process.pid} hookRunner=${hookRunner ? "present" : "missing"} hasHooks=${hookRunner?.hasHooks("agent_end") === true}`, + ); + } if (hookRunner?.hasHooks("agent_end")) { hookRunner .runAgentEnd( @@ -3164,6 +3176,11 @@ export async function runEmbeddedAttempt( ) .map((entry) => ({ toolName: entry.toolName, meta: entry.meta })); + if (process.env.OPENCLAW_PLUGIN_CHECKPOINTS === "1") { + log.warn( + `[hooks][checkpoints] attempt llm_output runId=${params.runId} sessionKey=${params.sessionKey ?? "unknown"} pid=${process.pid} hookRunner=${hookRunner ? "present" : "missing"} hasHooks=${hookRunner?.hasHooks("llm_output") === true}`, + ); + } if (hookRunner?.hasHooks("llm_output")) { hookRunner .runLlmOutput( @@ -3242,5 +3259,6 @@ export async function runEmbeddedAttempt( } } finally { restoreSkillEnv?.(); + process.chdir(prevCwd); } } diff --git a/src/agents/runtime-plugins.test.ts b/src/agents/runtime-plugins.test.ts new file mode 100644 index 00000000000..137bc23a699 --- /dev/null +++ b/src/agents/runtime-plugins.test.ts @@ -0,0 +1,52 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; + +const hoisted = vi.hoisted(() => ({ + loadOpenClawPlugins: vi.fn(), + getActivePluginRegistryKey: vi.fn<() => string | null>(), +})); + +vi.mock("../plugins/loader.js", () => ({ + loadOpenClawPlugins: hoisted.loadOpenClawPlugins, +})); + +vi.mock("../plugins/runtime.js", () => ({ + getActivePluginRegistryKey: hoisted.getActivePluginRegistryKey, +})); + +const { ensureRuntimePluginsLoaded } = await import("./runtime-plugins.js"); + +describe("ensureRuntimePluginsLoaded", () => { + beforeEach(() => { + hoisted.loadOpenClawPlugins.mockReset(); + hoisted.getActivePluginRegistryKey.mockReset(); + hoisted.getActivePluginRegistryKey.mockReturnValue(null); + }); + + it("does not reactivate plugins when a process already has an active registry", () => { + hoisted.getActivePluginRegistryKey.mockReturnValue("gateway-registry"); + + ensureRuntimePluginsLoaded({ + config: {} as never, + workspaceDir: "/tmp/workspace", + allowGatewaySubagentBinding: true, + }); + + expect(hoisted.loadOpenClawPlugins).not.toHaveBeenCalled(); + }); + + it("loads runtime plugins when no active registry exists", () => { + ensureRuntimePluginsLoaded({ + config: {} as never, + workspaceDir: "/tmp/workspace", + allowGatewaySubagentBinding: true, + }); + + expect(hoisted.loadOpenClawPlugins).toHaveBeenCalledWith({ + config: {} as never, + workspaceDir: "/tmp/workspace", + runtimeOptions: { + allowGatewaySubagentBinding: true, + }, + }); + }); +}); diff --git a/src/agents/runtime-plugins.ts b/src/agents/runtime-plugins.ts index 0bf395b505c..14c9f99af9b 100644 --- a/src/agents/runtime-plugins.ts +++ b/src/agents/runtime-plugins.ts @@ -1,5 +1,6 @@ import type { OpenClawConfig } from "../config/config.js"; import { loadOpenClawPlugins } from "../plugins/loader.js"; +import { getActivePluginRegistryKey } from "../plugins/runtime.js"; import { resolveUserPath } from "../utils.js"; export function ensureRuntimePluginsLoaded(params: { @@ -7,6 +8,10 @@ export function ensureRuntimePluginsLoaded(params: { workspaceDir?: string | null; allowGatewaySubagentBinding?: boolean; }): void { + if (getActivePluginRegistryKey()) { + return; + } + const workspaceDir = typeof params.workspaceDir === "string" && params.workspaceDir.trim() ? resolveUserPath(params.workspaceDir) diff --git a/src/image-generation/provider-registry.ts b/src/image-generation/provider-registry.ts index d91bcba6c7c..6233b270018 100644 --- a/src/image-generation/provider-registry.ts +++ b/src/image-generation/provider-registry.ts @@ -1,31 +1,23 @@ import { normalizeProviderId } from "../agents/model-selection.js"; import type { OpenClawConfig } from "../config/config.js"; -import { isBlockedObjectKey } from "../infra/prototype-keys.js"; import { loadOpenClawPlugins } from "../plugins/loader.js"; -import { getActivePluginRegistry } from "../plugins/runtime.js"; +import { getActivePluginRegistry, getActivePluginRegistryKey } from "../plugins/runtime.js"; import type { ImageGenerationProviderPlugin } from "../plugins/types.js"; const BUILTIN_IMAGE_GENERATION_PROVIDERS: readonly ImageGenerationProviderPlugin[] = []; function normalizeImageGenerationProviderId(id: string | undefined): string | undefined { const normalized = normalizeProviderId(id ?? ""); - if (!normalized || isBlockedObjectKey(normalized)) { - return undefined; - } - return normalized; + return normalized || undefined; } function resolvePluginImageGenerationProviders( cfg?: OpenClawConfig, ): ImageGenerationProviderPlugin[] { const active = getActivePluginRegistry(); - const activeEntries = active?.imageGenerationProviders?.map((entry) => entry.provider) ?? []; - if (activeEntries.length > 0 || !cfg) { - return activeEntries; - } - return loadOpenClawPlugins({ config: cfg }).imageGenerationProviders.map( - (entry) => entry.provider, - ); + const registry = + getActivePluginRegistryKey() || !cfg ? active : loadOpenClawPlugins({ config: cfg }); + return registry?.imageGenerationProviders?.map((entry) => entry.provider) ?? []; } function buildProviderMaps(cfg?: OpenClawConfig): { diff --git a/src/infra/diagnostic-events.ts b/src/infra/diagnostic-events.ts index 4379f975a3b..efad1d790f1 100644 --- a/src/infra/diagnostic-events.ts +++ b/src/infra/diagnostic-events.ts @@ -1,6 +1,6 @@ import type { OpenClawConfig } from "../config/config.js"; -import { resolveGlobalSingleton } from "../shared/global-singleton.js"; -import { notifyListeners, registerListener } from "../shared/listeners.js"; + +const diagnosticCheckpointLogsEnabled = process.env.OPENCLAW_DIAGNOSTIC_CHECKPOINTS === "1"; export type DiagnosticSessionState = "idle" | "processing" | "waiting"; @@ -176,22 +176,26 @@ type DiagnosticEventsGlobalState = { dispatchDepth: number; }; -const DIAGNOSTIC_EVENTS_STATE_KEY = Symbol.for("openclaw.diagnosticEvents.state"); - -const state = resolveGlobalSingleton( - DIAGNOSTIC_EVENTS_STATE_KEY, - () => ({ - seq: 0, - listeners: new Set<(evt: DiagnosticEventPayload) => void>(), - dispatchDepth: 0, - }), -); +function getDiagnosticEventsState(): DiagnosticEventsGlobalState { + const globalStore = globalThis as typeof globalThis & { + __openclawDiagnosticEventsState?: DiagnosticEventsGlobalState; + }; + if (!globalStore.__openclawDiagnosticEventsState) { + globalStore.__openclawDiagnosticEventsState = { + seq: 0, + listeners: new Set<(evt: DiagnosticEventPayload) => void>(), + dispatchDepth: 0, + }; + } + return globalStore.__openclawDiagnosticEventsState; +} export function isDiagnosticsEnabled(config?: OpenClawConfig): boolean { return config?.diagnostics?.enabled === true; } export function emitDiagnosticEvent(event: DiagnosticEventInput) { + const state = getDiagnosticEventsState(); if (state.dispatchDepth > 100) { console.error( `[diagnostic-events] recursion guard tripped at depth=${state.dispatchDepth}, dropping type=${event.type}`, @@ -204,27 +208,49 @@ export function emitDiagnosticEvent(event: DiagnosticEventInput) { seq: (state.seq += 1), ts: Date.now(), } satisfies DiagnosticEventPayload; - state.dispatchDepth += 1; - notifyListeners(state.listeners, enriched, (err) => { - const errorMessage = - err instanceof Error - ? (err.stack ?? err.message) - : typeof err === "string" - ? err - : String(err); - console.error( - `[diagnostic-events] listener error type=${enriched.type} seq=${enriched.seq}: ${errorMessage}`, + if (diagnosticCheckpointLogsEnabled) { + console.warn( + `[diagnostic-events][checkpoints] emit type=${enriched.type} seq=${enriched.seq} listeners=${state.listeners.size}${"sessionKey" in enriched && typeof enriched.sessionKey === "string" ? ` sessionKey=${enriched.sessionKey}` : ""}`, ); - // Ignore listener failures. - }); + } + state.dispatchDepth += 1; + for (const listener of state.listeners) { + try { + listener(enriched); + } catch (err) { + const errorMessage = + err instanceof Error + ? (err.stack ?? err.message) + : typeof err === "string" + ? err + : String(err); + console.error( + `[diagnostic-events] listener error type=${enriched.type} seq=${enriched.seq}: ${errorMessage}`, + ); + // Ignore listener failures. + } + } state.dispatchDepth -= 1; } export function onDiagnosticEvent(listener: (evt: DiagnosticEventPayload) => void): () => void { - return registerListener(state.listeners, listener); + const state = getDiagnosticEventsState(); + state.listeners.add(listener); + if (diagnosticCheckpointLogsEnabled) { + console.warn(`[diagnostic-events][checkpoints] subscribe listeners=${state.listeners.size}`); + } + return () => { + state.listeners.delete(listener); + if (diagnosticCheckpointLogsEnabled) { + console.warn( + `[diagnostic-events][checkpoints] unsubscribe listeners=${state.listeners.size}`, + ); + } + }; } export function resetDiagnosticEventsForTest(): void { + const state = getDiagnosticEventsState(); state.seq = 0; state.listeners.clear(); state.dispatchDepth = 0; diff --git a/src/plugins/hooks.ts b/src/plugins/hooks.ts index e8e1e2aa163..0272481bee8 100644 --- a/src/plugins/hooks.ts +++ b/src/plugins/hooks.ts @@ -159,6 +159,7 @@ function getHooksForNameAndPlugin( export function createHookRunner(registry: PluginRegistry, options: HookRunnerOptions = {}) { const logger = options.logger; const catchErrors = options.catchErrors ?? true; + const hookCheckpointLogsEnabled = process.env.OPENCLAW_PLUGIN_CHECKPOINTS === "1"; const mergeBeforeModelResolve = ( acc: PluginHookBeforeModelResolveResult | undefined, @@ -250,9 +251,17 @@ export function createHookRunner(registry: PluginRegistry, options: HookRunnerOp } logger?.debug?.(`[hooks] running ${hookName} (${hooks.length} handlers)`); + if (hookCheckpointLogsEnabled) { + logger?.warn( + `[hooks][checkpoints] dispatch ${hookName} handlers=${hooks.map((hook) => hook.pluginId).join(",")}`, + ); + } const promises = hooks.map(async (hook) => { try { + if (hookCheckpointLogsEnabled) { + logger?.warn(`[hooks][checkpoints] invoke ${hookName} plugin=${hook.pluginId}`); + } await (hook.handler as (event: unknown, ctx: unknown) => Promise)(event, ctx); } catch (err) { handleHookError({ hookName, pluginId: hook.pluginId, error: err }); diff --git a/src/plugins/runtime.ts b/src/plugins/runtime.ts index 03c43ed4d1b..e1a287233e3 100644 --- a/src/plugins/runtime.ts +++ b/src/plugins/runtime.ts @@ -1,4 +1,3 @@ -import { resolveGlobalSingleton } from "../shared/global-singleton.js"; import { createEmptyPluginRegistry } from "./registry-empty.js"; import type { PluginRegistry } from "./registry.js"; @@ -12,15 +11,33 @@ type RegistryState = { version: number; }; -const state = resolveGlobalSingleton(REGISTRY_STATE, () => ({ - registry: createEmptyPluginRegistry(), - httpRouteRegistry: null, - httpRouteRegistryPinned: false, - key: null, - version: 0, -})); +const state: RegistryState = (() => { + const globalState = globalThis as typeof globalThis & { + [REGISTRY_STATE]?: RegistryState; + }; + if (!globalState[REGISTRY_STATE]) { + globalState[REGISTRY_STATE] = { + registry: createEmptyPluginRegistry(), + httpRouteRegistry: null, + httpRouteRegistryPinned: false, + key: null, + version: 0, + }; + } + return globalState[REGISTRY_STATE]; +})(); export function setActivePluginRegistry(registry: PluginRegistry, cacheKey?: string) { + if (process.env.OPENCLAW_PLUGIN_CHECKPOINTS === "1") { + const stack = new Error().stack + ?.split("\n") + .slice(2, 5) + .map((line) => line.trim()) + .join(" | "); + console.warn( + `[plugins][checkpoints] activate registry key=${cacheKey ?? "none"} plugins=${registry.plugins.length} typedHooks=${registry.typedHooks.length}${stack ? ` caller=${stack}` : ""}`, + ); + } state.registry = registry; if (!state.httpRouteRegistryPinned) { state.httpRouteRegistry = registry; diff --git a/src/plugins/services.ts b/src/plugins/services.ts index 07746e1650a..73e6c901965 100644 --- a/src/plugins/services.ts +++ b/src/plugins/services.ts @@ -5,6 +5,7 @@ import type { PluginRegistry } from "./registry.js"; import type { OpenClawPluginServiceContext, PluginLogger } from "./types.js"; const log = createSubsystemLogger("plugins"); +const pluginCheckpointLogsEnabled = process.env.OPENCLAW_PLUGIN_CHECKPOINTS === "1"; function createPluginLogger(): PluginLogger { return { @@ -47,8 +48,18 @@ export async function startPluginServices(params: { for (const entry of params.registry.services) { const service = entry.service; + const typedHookCountBefore = params.registry.typedHooks.length; try { await service.start(serviceContext); + if (pluginCheckpointLogsEnabled) { + const newTypedHooks = params.registry.typedHooks + .slice(typedHookCountBefore) + .filter((hook) => hook.pluginId === entry.pluginId) + .map((hook) => hook.hookName); + log.warn( + `[plugins][checkpoints] service started (${service.id}, plugin=${entry.pluginId}) typedHooksAdded=${newTypedHooks.length}${newTypedHooks.length > 0 ? ` hooks=${newTypedHooks.join(",")}` : ""}`, + ); + } running.push({ id: service.id, stop: service.stop ? () => service.stop?.(serviceContext) : undefined, diff --git a/src/plugins/tools.ts b/src/plugins/tools.ts index 9a1142a8306..3ab517f9550 100644 --- a/src/plugins/tools.ts +++ b/src/plugins/tools.ts @@ -4,6 +4,7 @@ import { createSubsystemLogger } from "../logging/subsystem.js"; import { applyTestPluginDefaults, normalizePluginsConfig } from "./config-state.js"; import { loadOpenClawPlugins } from "./loader.js"; import { createPluginLoaderLogger } from "./logger.js"; +import { getActivePluginRegistry, getActivePluginRegistryKey } from "./runtime.js"; import type { OpenClawPluginToolContext } from "./types.js"; const log = createSubsystemLogger("plugins"); @@ -59,17 +60,21 @@ export function resolvePluginTools(params: { return []; } - const registry = loadOpenClawPlugins({ - config: effectiveConfig, - workspaceDir: params.context.workspaceDir, - runtimeOptions: params.allowGatewaySubagentBinding - ? { - allowGatewaySubagentBinding: true, - } - : undefined, - env, - logger: createPluginLoaderLogger(log), - }); + const activeRegistry = getActivePluginRegistry(); + const registry = + getActivePluginRegistryKey() && activeRegistry + ? activeRegistry + : loadOpenClawPlugins({ + config: effectiveConfig, + workspaceDir: params.context.workspaceDir, + runtimeOptions: params.allowGatewaySubagentBinding + ? { + allowGatewaySubagentBinding: true, + } + : undefined, + env, + logger: createPluginLoaderLogger(log), + }); const tools: AnyAgentTool[] = []; const existing = params.existingToolNames ?? new Set();