mirror of https://github.com/openclaw/openclaw.git
fix: preserve gateway plugin webhook routes
This commit is contained in:
parent
179208c914
commit
6783f59232
|
|
@ -11,6 +11,7 @@ export function createGatewayCloseHandler(params: {
|
|||
tailscaleCleanup: (() => Promise<void>) | null;
|
||||
canvasHost: CanvasHostHandler | null;
|
||||
canvasHostServer: CanvasHostServer | null;
|
||||
releasePluginRouteRegistry?: (() => void) | null;
|
||||
stopChannel: (name: ChannelId, accountId?: string) => Promise<void>;
|
||||
pluginServices: PluginServicesHandle | null;
|
||||
cron: { stop: () => void };
|
||||
|
|
@ -33,106 +34,114 @@ export function createGatewayCloseHandler(params: {
|
|||
httpServers?: HttpServer[];
|
||||
}) {
|
||||
return async (opts?: { reason?: string; restartExpectedMs?: number | null }) => {
|
||||
const reasonRaw = typeof opts?.reason === "string" ? opts.reason.trim() : "";
|
||||
const reason = reasonRaw || "gateway stopping";
|
||||
const restartExpectedMs =
|
||||
typeof opts?.restartExpectedMs === "number" && Number.isFinite(opts.restartExpectedMs)
|
||||
? Math.max(0, Math.floor(opts.restartExpectedMs))
|
||||
: null;
|
||||
if (params.bonjourStop) {
|
||||
try {
|
||||
await params.bonjourStop();
|
||||
} catch {
|
||||
/* ignore */
|
||||
}
|
||||
}
|
||||
if (params.tailscaleCleanup) {
|
||||
await params.tailscaleCleanup();
|
||||
}
|
||||
if (params.canvasHost) {
|
||||
try {
|
||||
await params.canvasHost.close();
|
||||
} catch {
|
||||
/* ignore */
|
||||
}
|
||||
}
|
||||
if (params.canvasHostServer) {
|
||||
try {
|
||||
await params.canvasHostServer.close();
|
||||
} catch {
|
||||
/* ignore */
|
||||
}
|
||||
}
|
||||
for (const plugin of listChannelPlugins()) {
|
||||
await params.stopChannel(plugin.id);
|
||||
}
|
||||
if (params.pluginServices) {
|
||||
await params.pluginServices.stop().catch(() => {});
|
||||
}
|
||||
await stopGmailWatcher();
|
||||
params.cron.stop();
|
||||
params.heartbeatRunner.stop();
|
||||
try {
|
||||
params.updateCheckStop?.();
|
||||
} catch {
|
||||
/* ignore */
|
||||
}
|
||||
for (const timer of params.nodePresenceTimers.values()) {
|
||||
clearInterval(timer);
|
||||
}
|
||||
params.nodePresenceTimers.clear();
|
||||
params.broadcast("shutdown", {
|
||||
reason,
|
||||
restartExpectedMs,
|
||||
});
|
||||
clearInterval(params.tickInterval);
|
||||
clearInterval(params.healthInterval);
|
||||
clearInterval(params.dedupeCleanup);
|
||||
if (params.mediaCleanup) {
|
||||
clearInterval(params.mediaCleanup);
|
||||
}
|
||||
if (params.agentUnsub) {
|
||||
const reasonRaw = typeof opts?.reason === "string" ? opts.reason.trim() : "";
|
||||
const reason = reasonRaw || "gateway stopping";
|
||||
const restartExpectedMs =
|
||||
typeof opts?.restartExpectedMs === "number" && Number.isFinite(opts.restartExpectedMs)
|
||||
? Math.max(0, Math.floor(opts.restartExpectedMs))
|
||||
: null;
|
||||
if (params.bonjourStop) {
|
||||
try {
|
||||
await params.bonjourStop();
|
||||
} catch {
|
||||
/* ignore */
|
||||
}
|
||||
}
|
||||
if (params.tailscaleCleanup) {
|
||||
await params.tailscaleCleanup();
|
||||
}
|
||||
if (params.canvasHost) {
|
||||
try {
|
||||
await params.canvasHost.close();
|
||||
} catch {
|
||||
/* ignore */
|
||||
}
|
||||
}
|
||||
if (params.canvasHostServer) {
|
||||
try {
|
||||
await params.canvasHostServer.close();
|
||||
} catch {
|
||||
/* ignore */
|
||||
}
|
||||
}
|
||||
for (const plugin of listChannelPlugins()) {
|
||||
await params.stopChannel(plugin.id);
|
||||
}
|
||||
if (params.pluginServices) {
|
||||
await params.pluginServices.stop().catch(() => {});
|
||||
}
|
||||
await stopGmailWatcher();
|
||||
params.cron.stop();
|
||||
params.heartbeatRunner.stop();
|
||||
try {
|
||||
params.agentUnsub();
|
||||
params.updateCheckStop?.();
|
||||
} catch {
|
||||
/* ignore */
|
||||
}
|
||||
}
|
||||
if (params.heartbeatUnsub) {
|
||||
for (const timer of params.nodePresenceTimers.values()) {
|
||||
clearInterval(timer);
|
||||
}
|
||||
params.nodePresenceTimers.clear();
|
||||
params.broadcast("shutdown", {
|
||||
reason,
|
||||
restartExpectedMs,
|
||||
});
|
||||
clearInterval(params.tickInterval);
|
||||
clearInterval(params.healthInterval);
|
||||
clearInterval(params.dedupeCleanup);
|
||||
if (params.mediaCleanup) {
|
||||
clearInterval(params.mediaCleanup);
|
||||
}
|
||||
if (params.agentUnsub) {
|
||||
try {
|
||||
params.agentUnsub();
|
||||
} catch {
|
||||
/* ignore */
|
||||
}
|
||||
}
|
||||
if (params.heartbeatUnsub) {
|
||||
try {
|
||||
params.heartbeatUnsub();
|
||||
} catch {
|
||||
/* ignore */
|
||||
}
|
||||
}
|
||||
params.chatRunState.clear();
|
||||
for (const c of params.clients) {
|
||||
try {
|
||||
c.socket.close(1012, "service restart");
|
||||
} catch {
|
||||
/* ignore */
|
||||
}
|
||||
}
|
||||
params.clients.clear();
|
||||
await params.configReloader.stop().catch(() => {});
|
||||
if (params.browserControl) {
|
||||
await params.browserControl.stop().catch(() => {});
|
||||
}
|
||||
await new Promise<void>((resolve) => params.wss.close(() => resolve()));
|
||||
const servers =
|
||||
params.httpServers && params.httpServers.length > 0
|
||||
? params.httpServers
|
||||
: [params.httpServer];
|
||||
for (const server of servers) {
|
||||
const httpServer = server as HttpServer & {
|
||||
closeIdleConnections?: () => void;
|
||||
};
|
||||
if (typeof httpServer.closeIdleConnections === "function") {
|
||||
httpServer.closeIdleConnections();
|
||||
}
|
||||
await new Promise<void>((resolve, reject) =>
|
||||
httpServer.close((err) => (err ? reject(err) : resolve())),
|
||||
);
|
||||
}
|
||||
} finally {
|
||||
try {
|
||||
params.heartbeatUnsub();
|
||||
params.releasePluginRouteRegistry?.();
|
||||
} catch {
|
||||
/* ignore */
|
||||
}
|
||||
}
|
||||
params.chatRunState.clear();
|
||||
for (const c of params.clients) {
|
||||
try {
|
||||
c.socket.close(1012, "service restart");
|
||||
} catch {
|
||||
/* ignore */
|
||||
}
|
||||
}
|
||||
params.clients.clear();
|
||||
await params.configReloader.stop().catch(() => {});
|
||||
if (params.browserControl) {
|
||||
await params.browserControl.stop().catch(() => {});
|
||||
}
|
||||
await new Promise<void>((resolve) => params.wss.close(() => resolve()));
|
||||
const servers =
|
||||
params.httpServers && params.httpServers.length > 0
|
||||
? params.httpServers
|
||||
: [params.httpServer];
|
||||
for (const server of servers) {
|
||||
const httpServer = server as HttpServer & {
|
||||
closeIdleConnections?: () => void;
|
||||
};
|
||||
if (typeof httpServer.closeIdleConnections === "function") {
|
||||
httpServer.closeIdleConnections();
|
||||
}
|
||||
await new Promise<void>((resolve, reject) =>
|
||||
httpServer.close((err) => (err ? reject(err) : resolve())),
|
||||
);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,6 +5,11 @@ import { type CanvasHostHandler, createCanvasHostHandler } from "../canvas-host/
|
|||
import type { CliDeps } from "../cli/deps.js";
|
||||
import type { createSubsystemLogger } from "../logging/subsystem.js";
|
||||
import type { PluginRegistry } from "../plugins/registry.js";
|
||||
import {
|
||||
getActivePluginHttpRouteRegistry,
|
||||
pinActivePluginHttpRouteRegistry,
|
||||
releasePinnedPluginHttpRouteRegistry,
|
||||
} from "../plugins/runtime.js";
|
||||
import type { RuntimeEnv } from "../runtime.js";
|
||||
import type { AuthRateLimiter } from "./auth-rate-limit.js";
|
||||
import type { ResolvedGatewayAuth } from "./auth.js";
|
||||
|
|
@ -70,6 +75,7 @@ export async function createGatewayRuntimeState(params: {
|
|||
getReadiness?: ReadinessChecker;
|
||||
}): Promise<{
|
||||
canvasHost: CanvasHostHandler | null;
|
||||
releasePluginRouteRegistry: () => void;
|
||||
httpServer: HttpServer;
|
||||
httpServers: HttpServer[];
|
||||
httpBindHosts: string[];
|
||||
|
|
@ -91,147 +97,157 @@ export async function createGatewayRuntimeState(params: {
|
|||
chatAbortControllers: Map<string, ChatAbortControllerEntry>;
|
||||
toolEventRecipients: ReturnType<typeof createToolEventRecipientRegistry>;
|
||||
}> {
|
||||
let canvasHost: CanvasHostHandler | null = null;
|
||||
if (params.canvasHostEnabled) {
|
||||
try {
|
||||
const handler = await createCanvasHostHandler({
|
||||
runtime: params.canvasRuntime,
|
||||
rootDir: params.cfg.canvasHost?.root,
|
||||
basePath: CANVAS_HOST_PATH,
|
||||
allowInTests: params.allowCanvasHostInTests,
|
||||
liveReload: params.cfg.canvasHost?.liveReload,
|
||||
});
|
||||
if (handler.rootDir) {
|
||||
canvasHost = handler;
|
||||
params.logCanvas.info(
|
||||
`canvas host mounted at http://${params.bindHost}:${params.port}${CANVAS_HOST_PATH}/ (root ${handler.rootDir})`,
|
||||
);
|
||||
pinActivePluginHttpRouteRegistry(params.pluginRegistry);
|
||||
try {
|
||||
let canvasHost: CanvasHostHandler | null = null;
|
||||
if (params.canvasHostEnabled) {
|
||||
try {
|
||||
const handler = await createCanvasHostHandler({
|
||||
runtime: params.canvasRuntime,
|
||||
rootDir: params.cfg.canvasHost?.root,
|
||||
basePath: CANVAS_HOST_PATH,
|
||||
allowInTests: params.allowCanvasHostInTests,
|
||||
liveReload: params.cfg.canvasHost?.liveReload,
|
||||
});
|
||||
if (handler.rootDir) {
|
||||
canvasHost = handler;
|
||||
params.logCanvas.info(
|
||||
`canvas host mounted at http://${params.bindHost}:${params.port}${CANVAS_HOST_PATH}/ (root ${handler.rootDir})`,
|
||||
);
|
||||
}
|
||||
} catch (err) {
|
||||
params.logCanvas.warn(`canvas host failed to start: ${String(err)}`);
|
||||
}
|
||||
} catch (err) {
|
||||
params.logCanvas.warn(`canvas host failed to start: ${String(err)}`);
|
||||
}
|
||||
}
|
||||
|
||||
const clients = new Set<GatewayWsClient>();
|
||||
const { broadcast, broadcastToConnIds } = createGatewayBroadcaster({ clients });
|
||||
const clients = new Set<GatewayWsClient>();
|
||||
const { broadcast, broadcastToConnIds } = createGatewayBroadcaster({ clients });
|
||||
|
||||
const handleHooksRequest = createGatewayHooksRequestHandler({
|
||||
deps: params.deps,
|
||||
getHooksConfig: params.hooksConfig,
|
||||
getClientIpConfig: params.getHookClientIpConfig,
|
||||
bindHost: params.bindHost,
|
||||
port: params.port,
|
||||
logHooks: params.logHooks,
|
||||
});
|
||||
|
||||
const handlePluginRequest = createGatewayPluginRequestHandler({
|
||||
registry: params.pluginRegistry,
|
||||
log: params.logPlugins,
|
||||
});
|
||||
const shouldEnforcePluginGatewayAuth = (pathContext: PluginRoutePathContext): boolean => {
|
||||
return shouldEnforceGatewayAuthForPluginPath(params.pluginRegistry, pathContext);
|
||||
};
|
||||
|
||||
const bindHosts = await resolveGatewayListenHosts(params.bindHost);
|
||||
if (!isLoopbackHost(params.bindHost)) {
|
||||
params.log.warn(
|
||||
"⚠️ Gateway is binding to a non-loopback address. " +
|
||||
"Ensure authentication is configured before exposing to public networks.",
|
||||
);
|
||||
}
|
||||
if (params.cfg.gateway?.controlUi?.dangerouslyAllowHostHeaderOriginFallback === true) {
|
||||
params.log.warn(
|
||||
"⚠️ gateway.controlUi.dangerouslyAllowHostHeaderOriginFallback=true is enabled. " +
|
||||
"Host-header origin fallback weakens origin checks and should only be used as break-glass.",
|
||||
);
|
||||
}
|
||||
const httpServers: HttpServer[] = [];
|
||||
const httpBindHosts: string[] = [];
|
||||
for (const host of bindHosts) {
|
||||
const httpServer = createGatewayHttpServer({
|
||||
canvasHost,
|
||||
clients,
|
||||
controlUiEnabled: params.controlUiEnabled,
|
||||
controlUiBasePath: params.controlUiBasePath,
|
||||
controlUiRoot: params.controlUiRoot,
|
||||
openAiChatCompletionsEnabled: params.openAiChatCompletionsEnabled,
|
||||
openAiChatCompletionsConfig: params.openAiChatCompletionsConfig,
|
||||
openResponsesEnabled: params.openResponsesEnabled,
|
||||
openResponsesConfig: params.openResponsesConfig,
|
||||
strictTransportSecurityHeader: params.strictTransportSecurityHeader,
|
||||
handleHooksRequest,
|
||||
handlePluginRequest,
|
||||
shouldEnforcePluginGatewayAuth,
|
||||
resolvedAuth: params.resolvedAuth,
|
||||
rateLimiter: params.rateLimiter,
|
||||
getReadiness: params.getReadiness,
|
||||
tlsOptions: params.gatewayTls?.enabled ? params.gatewayTls.tlsOptions : undefined,
|
||||
const handleHooksRequest = createGatewayHooksRequestHandler({
|
||||
deps: params.deps,
|
||||
getHooksConfig: params.hooksConfig,
|
||||
getClientIpConfig: params.getHookClientIpConfig,
|
||||
bindHost: params.bindHost,
|
||||
port: params.port,
|
||||
logHooks: params.logHooks,
|
||||
});
|
||||
try {
|
||||
await listenGatewayHttpServer({
|
||||
httpServer,
|
||||
bindHost: host,
|
||||
port: params.port,
|
||||
});
|
||||
httpServers.push(httpServer);
|
||||
httpBindHosts.push(host);
|
||||
} catch (err) {
|
||||
if (host === bindHosts[0]) {
|
||||
throw err;
|
||||
}
|
||||
|
||||
const handlePluginRequest = createGatewayPluginRequestHandler({
|
||||
registry: params.pluginRegistry,
|
||||
log: params.logPlugins,
|
||||
});
|
||||
const shouldEnforcePluginGatewayAuth = (pathContext: PluginRoutePathContext): boolean => {
|
||||
return shouldEnforceGatewayAuthForPluginPath(
|
||||
getActivePluginHttpRouteRegistry() ?? params.pluginRegistry,
|
||||
pathContext,
|
||||
);
|
||||
};
|
||||
|
||||
const bindHosts = await resolveGatewayListenHosts(params.bindHost);
|
||||
if (!isLoopbackHost(params.bindHost)) {
|
||||
params.log.warn(
|
||||
`gateway: failed to bind loopback alias ${host}:${params.port} (${String(err)})`,
|
||||
"⚠️ Gateway is binding to a non-loopback address. " +
|
||||
"Ensure authentication is configured before exposing to public networks.",
|
||||
);
|
||||
}
|
||||
}
|
||||
const httpServer = httpServers[0];
|
||||
if (!httpServer) {
|
||||
throw new Error("Gateway HTTP server failed to start");
|
||||
}
|
||||
if (params.cfg.gateway?.controlUi?.dangerouslyAllowHostHeaderOriginFallback === true) {
|
||||
params.log.warn(
|
||||
"⚠️ gateway.controlUi.dangerouslyAllowHostHeaderOriginFallback=true is enabled. " +
|
||||
"Host-header origin fallback weakens origin checks and should only be used as break-glass.",
|
||||
);
|
||||
}
|
||||
const httpServers: HttpServer[] = [];
|
||||
const httpBindHosts: string[] = [];
|
||||
for (const host of bindHosts) {
|
||||
const httpServer = createGatewayHttpServer({
|
||||
canvasHost,
|
||||
clients,
|
||||
controlUiEnabled: params.controlUiEnabled,
|
||||
controlUiBasePath: params.controlUiBasePath,
|
||||
controlUiRoot: params.controlUiRoot,
|
||||
openAiChatCompletionsEnabled: params.openAiChatCompletionsEnabled,
|
||||
openAiChatCompletionsConfig: params.openAiChatCompletionsConfig,
|
||||
openResponsesEnabled: params.openResponsesEnabled,
|
||||
openResponsesConfig: params.openResponsesConfig,
|
||||
strictTransportSecurityHeader: params.strictTransportSecurityHeader,
|
||||
handleHooksRequest,
|
||||
handlePluginRequest,
|
||||
shouldEnforcePluginGatewayAuth,
|
||||
resolvedAuth: params.resolvedAuth,
|
||||
rateLimiter: params.rateLimiter,
|
||||
getReadiness: params.getReadiness,
|
||||
tlsOptions: params.gatewayTls?.enabled ? params.gatewayTls.tlsOptions : undefined,
|
||||
});
|
||||
try {
|
||||
await listenGatewayHttpServer({
|
||||
httpServer,
|
||||
bindHost: host,
|
||||
port: params.port,
|
||||
});
|
||||
httpServers.push(httpServer);
|
||||
httpBindHosts.push(host);
|
||||
} catch (err) {
|
||||
if (host === bindHosts[0]) {
|
||||
throw err;
|
||||
}
|
||||
params.log.warn(
|
||||
`gateway: failed to bind loopback alias ${host}:${params.port} (${String(err)})`,
|
||||
);
|
||||
}
|
||||
}
|
||||
const httpServer = httpServers[0];
|
||||
if (!httpServer) {
|
||||
throw new Error("Gateway HTTP server failed to start");
|
||||
}
|
||||
|
||||
const wss = new WebSocketServer({
|
||||
noServer: true,
|
||||
maxPayload: MAX_PREAUTH_PAYLOAD_BYTES,
|
||||
});
|
||||
for (const server of httpServers) {
|
||||
attachGatewayUpgradeHandler({
|
||||
httpServer: server,
|
||||
wss,
|
||||
canvasHost,
|
||||
clients,
|
||||
resolvedAuth: params.resolvedAuth,
|
||||
rateLimiter: params.rateLimiter,
|
||||
const wss = new WebSocketServer({
|
||||
noServer: true,
|
||||
maxPayload: MAX_PREAUTH_PAYLOAD_BYTES,
|
||||
});
|
||||
for (const server of httpServers) {
|
||||
attachGatewayUpgradeHandler({
|
||||
httpServer: server,
|
||||
wss,
|
||||
canvasHost,
|
||||
clients,
|
||||
resolvedAuth: params.resolvedAuth,
|
||||
rateLimiter: params.rateLimiter,
|
||||
});
|
||||
}
|
||||
|
||||
const agentRunSeq = new Map<string, number>();
|
||||
const dedupe = new Map<string, DedupeEntry>();
|
||||
const chatRunState = createChatRunState();
|
||||
const chatRunRegistry = chatRunState.registry;
|
||||
const chatRunBuffers = chatRunState.buffers;
|
||||
const chatDeltaSentAt = chatRunState.deltaSentAt;
|
||||
const addChatRun = chatRunRegistry.add;
|
||||
const removeChatRun = chatRunRegistry.remove;
|
||||
const chatAbortControllers = new Map<string, ChatAbortControllerEntry>();
|
||||
const toolEventRecipients = createToolEventRecipientRegistry();
|
||||
|
||||
return {
|
||||
canvasHost,
|
||||
releasePluginRouteRegistry: () => releasePinnedPluginHttpRouteRegistry(params.pluginRegistry),
|
||||
httpServer,
|
||||
httpServers,
|
||||
httpBindHosts,
|
||||
wss,
|
||||
clients,
|
||||
broadcast,
|
||||
broadcastToConnIds,
|
||||
agentRunSeq,
|
||||
dedupe,
|
||||
chatRunState,
|
||||
chatRunBuffers,
|
||||
chatDeltaSentAt,
|
||||
addChatRun,
|
||||
removeChatRun,
|
||||
chatAbortControllers,
|
||||
toolEventRecipients,
|
||||
};
|
||||
} catch (err) {
|
||||
releasePinnedPluginHttpRouteRegistry(params.pluginRegistry);
|
||||
throw err;
|
||||
}
|
||||
|
||||
const agentRunSeq = new Map<string, number>();
|
||||
const dedupe = new Map<string, DedupeEntry>();
|
||||
const chatRunState = createChatRunState();
|
||||
const chatRunRegistry = chatRunState.registry;
|
||||
const chatRunBuffers = chatRunState.buffers;
|
||||
const chatDeltaSentAt = chatRunState.deltaSentAt;
|
||||
const addChatRun = chatRunRegistry.add;
|
||||
const removeChatRun = chatRunRegistry.remove;
|
||||
const chatAbortControllers = new Map<string, ChatAbortControllerEntry>();
|
||||
const toolEventRecipients = createToolEventRecipientRegistry();
|
||||
|
||||
return {
|
||||
canvasHost,
|
||||
httpServer,
|
||||
httpServers,
|
||||
httpBindHosts,
|
||||
wss,
|
||||
clients,
|
||||
broadcast,
|
||||
broadcastToConnIds,
|
||||
agentRunSeq,
|
||||
dedupe,
|
||||
chatRunState,
|
||||
chatRunBuffers,
|
||||
chatDeltaSentAt,
|
||||
addChatRun,
|
||||
removeChatRun,
|
||||
chatAbortControllers,
|
||||
toolEventRecipients,
|
||||
};
|
||||
}
|
||||
|
|
|
|||
|
|
@ -583,6 +583,7 @@ export async function startGatewayServer(
|
|||
});
|
||||
const {
|
||||
canvasHost,
|
||||
releasePluginRouteRegistry,
|
||||
httpServer,
|
||||
httpServers,
|
||||
httpBindHosts,
|
||||
|
|
@ -1022,6 +1023,7 @@ export async function startGatewayServer(
|
|||
tailscaleCleanup,
|
||||
canvasHost,
|
||||
canvasHostServer,
|
||||
releasePluginRouteRegistry,
|
||||
stopChannel,
|
||||
pluginServices,
|
||||
cron,
|
||||
|
|
|
|||
|
|
@ -1,5 +1,12 @@
|
|||
import type { IncomingMessage, ServerResponse } from "node:http";
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
import { registerPluginHttpRoute } from "../../plugins/http-registry.js";
|
||||
import { createEmptyPluginRegistry } from "../../plugins/registry.js";
|
||||
import {
|
||||
pinActivePluginHttpRouteRegistry,
|
||||
releasePinnedPluginHttpRouteRegistry,
|
||||
setActivePluginRegistry,
|
||||
} from "../../plugins/runtime.js";
|
||||
import type { PluginRuntime } from "../../plugins/runtime/types.js";
|
||||
import type { GatewayRequestContext, GatewayRequestOptions } from "../server-methods/types.js";
|
||||
import { makeMockHttpResponse } from "../test-http-response.js";
|
||||
|
|
@ -292,6 +299,81 @@ describe("createGatewayPluginRequestHandler", () => {
|
|||
expect(routeHandler).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("handles routes registered into the pinned startup registry after the active registry changes", async () => {
|
||||
const startupRegistry = createTestRegistry();
|
||||
const laterActiveRegistry = createTestRegistry();
|
||||
const routeHandler = vi.fn(async (_req, res: ServerResponse) => {
|
||||
res.statusCode = 202;
|
||||
return true;
|
||||
});
|
||||
|
||||
setActivePluginRegistry(startupRegistry);
|
||||
pinActivePluginHttpRouteRegistry(startupRegistry);
|
||||
setActivePluginRegistry(laterActiveRegistry);
|
||||
|
||||
const unregister = registerPluginHttpRoute({
|
||||
path: "/bluebubbles-webhook",
|
||||
auth: "plugin",
|
||||
handler: routeHandler,
|
||||
});
|
||||
|
||||
try {
|
||||
const handler = createGatewayPluginRequestHandler({
|
||||
registry: startupRegistry,
|
||||
log: createPluginLog(),
|
||||
});
|
||||
|
||||
const { res } = makeMockHttpResponse();
|
||||
const handled = await handler({ url: "/bluebubbles-webhook" } as IncomingMessage, res);
|
||||
expect(handled).toBe(true);
|
||||
expect(routeHandler).toHaveBeenCalledTimes(1);
|
||||
expect(laterActiveRegistry.httpRoutes).toHaveLength(0);
|
||||
} finally {
|
||||
unregister();
|
||||
releasePinnedPluginHttpRouteRegistry(startupRegistry);
|
||||
setActivePluginRegistry(createEmptyPluginRegistry());
|
||||
}
|
||||
});
|
||||
|
||||
it("dispatches using the pinned route registry when both startup and active registries are stale", async () => {
|
||||
const startupRegistry = createTestRegistry();
|
||||
const routeRegistry = createTestRegistry();
|
||||
const laterActiveRegistry = createTestRegistry();
|
||||
const routeHandler = vi.fn(async (_req, res: ServerResponse) => {
|
||||
res.statusCode = 204;
|
||||
return true;
|
||||
});
|
||||
|
||||
setActivePluginRegistry(startupRegistry);
|
||||
pinActivePluginHttpRouteRegistry(routeRegistry);
|
||||
setActivePluginRegistry(laterActiveRegistry);
|
||||
|
||||
const unregister = registerPluginHttpRoute({
|
||||
path: "/bluebubbles-webhook",
|
||||
auth: "plugin",
|
||||
handler: routeHandler,
|
||||
});
|
||||
|
||||
try {
|
||||
const handler = createGatewayPluginRequestHandler({
|
||||
registry: startupRegistry,
|
||||
log: createPluginLog(),
|
||||
});
|
||||
|
||||
const { res } = makeMockHttpResponse();
|
||||
const handled = await handler({ url: "/bluebubbles-webhook" } as IncomingMessage, res);
|
||||
expect(handled).toBe(true);
|
||||
expect(routeHandler).toHaveBeenCalledTimes(1);
|
||||
expect(startupRegistry.httpRoutes).toHaveLength(0);
|
||||
expect(laterActiveRegistry.httpRoutes).toHaveLength(0);
|
||||
expect(routeRegistry.httpRoutes).toHaveLength(1);
|
||||
} finally {
|
||||
unregister();
|
||||
releasePinnedPluginHttpRouteRegistry(routeRegistry);
|
||||
setActivePluginRegistry(createEmptyPluginRegistry());
|
||||
}
|
||||
});
|
||||
|
||||
it("logs and responds with 500 when a route throws", async () => {
|
||||
const log = createPluginLog();
|
||||
const handler = createGatewayPluginRequestHandler({
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
import type { IncomingMessage, ServerResponse } from "node:http";
|
||||
import type { createSubsystemLogger } from "../../logging/subsystem.js";
|
||||
import type { PluginRegistry } from "../../plugins/registry.js";
|
||||
import { getActivePluginHttpRouteRegistry } from "../../plugins/runtime.js";
|
||||
import { withPluginRuntimeGatewayRequestScope } from "../../plugins/runtime/gateway-request-scope.js";
|
||||
import { ADMIN_SCOPE, APPROVALS_SCOPE, PAIRING_SCOPE, WRITE_SCOPE } from "../method-scopes.js";
|
||||
import { GATEWAY_CLIENT_IDS, GATEWAY_CLIENT_MODES } from "../protocol/client-info.js";
|
||||
|
|
@ -59,12 +60,26 @@ export type PluginHttpRequestHandler = (
|
|||
dispatchContext?: { gatewayAuthSatisfied?: boolean },
|
||||
) => Promise<boolean>;
|
||||
|
||||
function resolveCurrentPluginRegistry(fallback: PluginRegistry): PluginRegistry {
|
||||
const routeRegistry = getActivePluginHttpRouteRegistry();
|
||||
if (!routeRegistry) {
|
||||
return fallback;
|
||||
}
|
||||
const routeCount = routeRegistry.httpRoutes?.length ?? 0;
|
||||
const fallbackRouteCount = fallback.httpRoutes?.length ?? 0;
|
||||
if (routeCount === 0 && fallbackRouteCount > 0) {
|
||||
return fallback;
|
||||
}
|
||||
return routeRegistry;
|
||||
}
|
||||
|
||||
export function createGatewayPluginRequestHandler(params: {
|
||||
registry: PluginRegistry;
|
||||
log: SubsystemLogger;
|
||||
}): PluginHttpRequestHandler {
|
||||
const { registry, log } = params;
|
||||
const { log } = params;
|
||||
return async (req, res, providedPathContext, dispatchContext) => {
|
||||
const registry = resolveCurrentPluginRegistry(params.registry);
|
||||
const routes = registry.httpRoutes ?? [];
|
||||
if (routes.length === 0) {
|
||||
return false;
|
||||
|
|
|
|||
|
|
@ -1,6 +1,11 @@
|
|||
import { describe, expect, it, vi } from "vitest";
|
||||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
import { registerPluginHttpRoute } from "./http-registry.js";
|
||||
import { createEmptyPluginRegistry } from "./registry.js";
|
||||
import {
|
||||
pinActivePluginHttpRouteRegistry,
|
||||
releasePinnedPluginHttpRouteRegistry,
|
||||
setActivePluginRegistry,
|
||||
} from "./runtime.js";
|
||||
|
||||
function expectRouteRegistrationDenied(params: {
|
||||
replaceExisting: boolean;
|
||||
|
|
@ -38,6 +43,11 @@ function expectRouteRegistrationDenied(params: {
|
|||
}
|
||||
|
||||
describe("registerPluginHttpRoute", () => {
|
||||
afterEach(() => {
|
||||
releasePinnedPluginHttpRouteRegistry();
|
||||
setActivePluginRegistry(createEmptyPluginRegistry());
|
||||
});
|
||||
|
||||
it("registers route and unregisters it", () => {
|
||||
const registry = createEmptyPluginRegistry();
|
||||
const handler = vi.fn();
|
||||
|
|
@ -164,4 +174,26 @@ describe("registerPluginHttpRoute", () => {
|
|||
unregister();
|
||||
expect(registry.httpRoutes).toHaveLength(1);
|
||||
});
|
||||
|
||||
it("uses the pinned route registry when the active registry changes later", () => {
|
||||
const startupRegistry = createEmptyPluginRegistry();
|
||||
const laterActiveRegistry = createEmptyPluginRegistry();
|
||||
|
||||
setActivePluginRegistry(startupRegistry);
|
||||
pinActivePluginHttpRouteRegistry(startupRegistry);
|
||||
setActivePluginRegistry(laterActiveRegistry);
|
||||
|
||||
const unregister = registerPluginHttpRoute({
|
||||
path: "/bluebubbles-webhook",
|
||||
auth: "plugin",
|
||||
handler: vi.fn(),
|
||||
});
|
||||
|
||||
expect(startupRegistry.httpRoutes).toHaveLength(1);
|
||||
expect(startupRegistry.httpRoutes[0]?.path).toBe("/bluebubbles-webhook");
|
||||
expect(laterActiveRegistry.httpRoutes).toHaveLength(0);
|
||||
|
||||
unregister();
|
||||
expect(startupRegistry.httpRoutes).toHaveLength(0);
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@ import type { IncomingMessage, ServerResponse } from "node:http";
|
|||
import { normalizePluginHttpPath } from "./http-path.js";
|
||||
import { findOverlappingPluginHttpRoute } from "./http-route-overlap.js";
|
||||
import type { PluginHttpRouteRegistration, PluginRegistry } from "./registry.js";
|
||||
import { requireActivePluginRegistry } from "./runtime.js";
|
||||
import { requireActivePluginHttpRouteRegistry } from "./runtime.js";
|
||||
|
||||
export type PluginHttpRouteHandler = (
|
||||
req: IncomingMessage,
|
||||
|
|
@ -22,7 +22,7 @@ export function registerPluginHttpRoute(params: {
|
|||
log?: (message: string) => void;
|
||||
registry?: PluginRegistry;
|
||||
}): () => void {
|
||||
const registry = params.registry ?? requireActivePluginRegistry();
|
||||
const registry = params.registry ?? requireActivePluginHttpRouteRegistry();
|
||||
const routes = registry.httpRoutes ?? [];
|
||||
registry.httpRoutes = routes;
|
||||
|
||||
|
|
|
|||
|
|
@ -4,6 +4,8 @@ const REGISTRY_STATE = Symbol.for("openclaw.pluginRegistryState");
|
|||
|
||||
type RegistryState = {
|
||||
registry: PluginRegistry | null;
|
||||
httpRouteRegistry: PluginRegistry | null;
|
||||
httpRouteRegistryPinned: boolean;
|
||||
key: string | null;
|
||||
version: number;
|
||||
};
|
||||
|
|
@ -15,6 +17,8 @@ const state: RegistryState = (() => {
|
|||
if (!globalState[REGISTRY_STATE]) {
|
||||
globalState[REGISTRY_STATE] = {
|
||||
registry: createEmptyPluginRegistry(),
|
||||
httpRouteRegistry: null,
|
||||
httpRouteRegistryPinned: false,
|
||||
key: null,
|
||||
version: 0,
|
||||
};
|
||||
|
|
@ -24,6 +28,9 @@ const state: RegistryState = (() => {
|
|||
|
||||
export function setActivePluginRegistry(registry: PluginRegistry, cacheKey?: string) {
|
||||
state.registry = registry;
|
||||
if (!state.httpRouteRegistryPinned) {
|
||||
state.httpRouteRegistry = registry;
|
||||
}
|
||||
state.key = cacheKey ?? null;
|
||||
state.version += 1;
|
||||
}
|
||||
|
|
@ -35,11 +42,41 @@ export function getActivePluginRegistry(): PluginRegistry | null {
|
|||
export function requireActivePluginRegistry(): PluginRegistry {
|
||||
if (!state.registry) {
|
||||
state.registry = createEmptyPluginRegistry();
|
||||
if (!state.httpRouteRegistryPinned) {
|
||||
state.httpRouteRegistry = state.registry;
|
||||
}
|
||||
state.version += 1;
|
||||
}
|
||||
return state.registry;
|
||||
}
|
||||
|
||||
export function pinActivePluginHttpRouteRegistry(registry: PluginRegistry) {
|
||||
state.httpRouteRegistry = registry;
|
||||
state.httpRouteRegistryPinned = true;
|
||||
}
|
||||
|
||||
export function releasePinnedPluginHttpRouteRegistry(registry?: PluginRegistry) {
|
||||
if (registry && state.httpRouteRegistry !== registry) {
|
||||
return;
|
||||
}
|
||||
state.httpRouteRegistryPinned = false;
|
||||
state.httpRouteRegistry = state.registry;
|
||||
}
|
||||
|
||||
export function getActivePluginHttpRouteRegistry(): PluginRegistry | null {
|
||||
return state.httpRouteRegistry ?? state.registry;
|
||||
}
|
||||
|
||||
export function requireActivePluginHttpRouteRegistry(): PluginRegistry {
|
||||
const existing = getActivePluginHttpRouteRegistry();
|
||||
if (existing) {
|
||||
return existing;
|
||||
}
|
||||
const created = requireActivePluginRegistry();
|
||||
state.httpRouteRegistry = created;
|
||||
return created;
|
||||
}
|
||||
|
||||
export function getActivePluginRegistryKey(): string | null {
|
||||
return state.key;
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue