From f554b736f52fa63ad90c8f855cd715bb42d2d8ce Mon Sep 17 00:00:00 2001 From: huntharo Date: Sun, 15 Mar 2026 16:07:56 -0400 Subject: [PATCH] Plugins: harden bound conversation routing --- .../src/monitor/thread-bindings.lifecycle.ts | 12 +- .../reply/dispatch-from-config.test.ts | 297 +++++++++++++++++- src/auto-reply/reply/dispatch-from-config.ts | 213 +++++++++---- src/plugins/commands.test.ts | 48 ++- src/plugins/conversation-binding.test.ts | 48 +++ src/plugins/conversation-binding.ts | 67 ++++ src/plugins/hooks.test-helpers.ts | 21 ++ src/plugins/hooks.ts | 91 ++++++ src/plugins/types.ts | 2 + src/plugins/wired-hooks-inbound-claim.test.ts | 75 +++++ 10 files changed, 775 insertions(+), 99 deletions(-) diff --git a/extensions/discord/src/monitor/thread-bindings.lifecycle.ts b/extensions/discord/src/monitor/thread-bindings.lifecycle.ts index 44373e03860..d7d96857250 100644 --- a/extensions/discord/src/monitor/thread-bindings.lifecycle.ts +++ b/extensions/discord/src/monitor/thread-bindings.lifecycle.ts @@ -323,12 +323,12 @@ export async function reconcileAcpThreadBindingsOnStartup(params: { }; } - const acpBindings = manager.listBindings().filter((binding) => { - if (binding.targetKind !== "acp") { - return false; - } - return binding.metadata?.pluginBindingOwner !== "plugin"; - }); + const acpBindings = manager + .listBindings() + .filter( + (binding) => + binding.targetKind === "acp" && binding.metadata?.pluginBindingOwner !== "plugin", + ); const staleBindings: ThreadBindingRecord[] = []; const probeTargets: Array<{ binding: ThreadBindingRecord; diff --git a/src/auto-reply/reply/dispatch-from-config.test.ts b/src/auto-reply/reply/dispatch-from-config.test.ts index b39e6762b6c..ed41db9664e 100644 --- a/src/auto-reply/reply/dispatch-from-config.test.ts +++ b/src/auto-reply/reply/dispatch-from-config.test.ts @@ -23,10 +23,17 @@ const diagnosticMocks = vi.hoisted(() => ({ logSessionStateChange: vi.fn(), })); const hookMocks = vi.hoisted(() => ({ + registry: { + plugins: [] as Array<{ + id: string; + status: "loaded" | "disabled" | "error"; + }>, + }, runner: { hasHooks: vi.fn(() => false), runInboundClaim: vi.fn(async () => undefined), runInboundClaimForPlugin: vi.fn(async () => undefined), + runInboundClaimForPluginOutcome: vi.fn(async () => ({ status: "no_handler" as const })), runMessageReceived: vi.fn(async () => {}), }, })); @@ -136,6 +143,7 @@ vi.mock("../../config/sessions.js", async (importOriginal) => { vi.mock("../../plugins/hook-runner-global.js", () => ({ getGlobalHookRunner: () => hookMocks.runner, + getGlobalPluginRegistry: () => hookMocks.registry, })); vi.mock("../../hooks/internal-hooks.js", () => ({ createInternalHookEvent: internalHookMocks.createInternalHookEvent, @@ -181,6 +189,7 @@ vi.mock("../../tts/tts.js", () => ({ const { dispatchReplyFromConfig } = await import("./dispatch-from-config.js"); const { resetInboundDedupe } = await import("./inbound-dedupe.js"); const { __testing: acpManagerTesting } = await import("../../acp/control-plane/manager.js"); +const { __testing: pluginBindingTesting } = await import("../../plugins/conversation-binding.js"); const noAbortResult = { handled: false, aborted: false } as const; const emptyConfig = {} as OpenClawConfig; @@ -254,7 +263,12 @@ describe("dispatchReplyFromConfig", () => { hookMocks.runner.runInboundClaim.mockResolvedValue(undefined); hookMocks.runner.runInboundClaimForPlugin.mockClear(); hookMocks.runner.runInboundClaimForPlugin.mockResolvedValue(undefined); + hookMocks.runner.runInboundClaimForPluginOutcome.mockClear(); + hookMocks.runner.runInboundClaimForPluginOutcome.mockResolvedValue({ + status: "no_handler", + }); hookMocks.runner.runMessageReceived.mockClear(); + hookMocks.registry.plugins = []; internalHookMocks.createInternalHookEvent.mockClear(); internalHookMocks.createInternalHookEvent.mockImplementation(createInternalHookEventPayload); internalHookMocks.triggerInternalHook.mockClear(); @@ -265,13 +279,14 @@ describe("dispatchReplyFromConfig", () => { acpMocks.requireAcpRuntimeBackend.mockReset(); sessionBindingMocks.listBySession.mockReset(); sessionBindingMocks.listBySession.mockReturnValue([]); + pluginBindingTesting.reset(); + sessionBindingMocks.resolveByConversation.mockReset(); + sessionBindingMocks.resolveByConversation.mockReturnValue(null); + sessionBindingMocks.touch.mockReset(); sessionStoreMocks.currentEntry = undefined; sessionStoreMocks.loadSessionStore.mockClear(); sessionStoreMocks.resolveStorePath.mockClear(); sessionStoreMocks.resolveSessionStoreEntry.mockClear(); - sessionBindingMocks.resolveByConversation.mockReset(); - sessionBindingMocks.resolveByConversation.mockReturnValue(null); - sessionBindingMocks.touch.mockReset(); ttsMocks.state.synthesizeFinalAudio = false; ttsMocks.maybeApplyTtsToPayload.mockClear(); ttsMocks.normalizeTtsAutoMode.mockClear(); @@ -2033,6 +2048,11 @@ describe("dispatchReplyFromConfig", () => { ((hookName?: string) => hookName === "inbound_claim" || hookName === "message_received") as () => boolean, ); + hookMocks.registry.plugins = [{ id: "openclaw-codex-app-server", status: "loaded" }]; + hookMocks.runner.runInboundClaimForPluginOutcome.mockResolvedValue({ + status: "handled", + result: { handled: true }, + }); sessionBindingMocks.resolveByConversation.mockReturnValue({ bindingId: "binding-1", targetSessionKey: "plugin-binding:codex:abc123", @@ -2075,7 +2095,7 @@ describe("dispatchReplyFromConfig", () => { expect(result).toEqual({ queuedFinal: false, counts: { tool: 0, block: 0, final: 0 } }); expect(sessionBindingMocks.touch).toHaveBeenCalledWith("binding-1"); - expect(hookMocks.runner.runInboundClaimForPlugin).toHaveBeenCalledWith( + expect(hookMocks.runner.runInboundClaimForPluginOutcome).toHaveBeenCalledWith( "openclaw-codex-app-server", expect.objectContaining({ channel: "discord", @@ -2099,6 +2119,11 @@ describe("dispatchReplyFromConfig", () => { ((hookName?: string) => hookName === "inbound_claim" || hookName === "message_received") as () => boolean, ); + hookMocks.registry.plugins = [{ id: "openclaw-codex-app-server", status: "loaded" }]; + hookMocks.runner.runInboundClaimForPluginOutcome.mockResolvedValue({ + status: "handled", + result: { handled: true }, + }); sessionBindingMocks.resolveByConversation.mockReturnValue({ bindingId: "binding-dm-1", targetSessionKey: "plugin-binding:codex:dm123", @@ -2142,7 +2167,7 @@ describe("dispatchReplyFromConfig", () => { expect(result).toEqual({ queuedFinal: false, counts: { tool: 0, block: 0, final: 0 } }); expect(sessionBindingMocks.touch).toHaveBeenCalledWith("binding-dm-1"); - expect(hookMocks.runner.runInboundClaimForPlugin).toHaveBeenCalledWith( + expect(hookMocks.runner.runInboundClaimForPluginOutcome).toHaveBeenCalledWith( "openclaw-codex-app-server", expect.objectContaining({ channel: "discord", @@ -2160,6 +2185,268 @@ describe("dispatchReplyFromConfig", () => { expect(replyResolver).not.toHaveBeenCalled(); }); + it("falls back to OpenClaw once per startup when a bound plugin is missing", async () => { + setNoAbort(); + hookMocks.runner.hasHooks.mockImplementation( + ((hookName?: string) => + hookName === "inbound_claim" || hookName === "message_received") as () => boolean, + ); + hookMocks.runner.runInboundClaimForPluginOutcome.mockResolvedValue({ + status: "missing_plugin", + }); + sessionBindingMocks.resolveByConversation.mockReturnValue({ + bindingId: "binding-missing-1", + targetSessionKey: "plugin-binding:codex:missing123", + targetKind: "session", + conversation: { + channel: "discord", + accountId: "default", + conversationId: "channel:missing-plugin", + }, + status: "active", + boundAt: 1710000000000, + metadata: { + pluginBindingOwner: "plugin", + pluginId: "openclaw-codex-app-server", + pluginName: "Codex App Server", + pluginRoot: "/Users/huntharo/github/openclaw-app-server", + detachHint: "/codex_detach", + }, + } satisfies SessionBindingRecord); + + const replyResolver = vi.fn(async () => ({ text: "openclaw fallback" }) satisfies ReplyPayload); + + const firstDispatcher = createDispatcher(); + await dispatchReplyFromConfig({ + ctx: buildTestCtx({ + Provider: "discord", + Surface: "discord", + OriginatingChannel: "discord", + OriginatingTo: "discord:channel:missing-plugin", + To: "discord:channel:missing-plugin", + AccountId: "default", + MessageSid: "msg-missing-plugin-1", + SessionKey: "agent:main:discord:channel:missing-plugin", + CommandBody: "hello", + RawBody: "hello", + Body: "hello", + }), + cfg: emptyConfig, + dispatcher: firstDispatcher, + replyResolver, + }); + + const firstNotice = (firstDispatcher.sendToolResult as ReturnType).mock + .calls[0]?.[0] as ReplyPayload | undefined; + expect(firstNotice?.text).toContain("Routing this message to OpenClaw instead."); + expect(firstNotice?.text).toContain("/codex_detach"); + expect(replyResolver).toHaveBeenCalledTimes(1); + expect(hookMocks.runner.runInboundClaim).not.toHaveBeenCalled(); + + replyResolver.mockClear(); + hookMocks.runner.runInboundClaim.mockClear(); + + const secondDispatcher = createDispatcher(); + await dispatchReplyFromConfig({ + ctx: buildTestCtx({ + Provider: "discord", + Surface: "discord", + OriginatingChannel: "discord", + OriginatingTo: "discord:channel:missing-plugin", + To: "discord:channel:missing-plugin", + AccountId: "default", + MessageSid: "msg-missing-plugin-2", + SessionKey: "agent:main:discord:channel:missing-plugin", + CommandBody: "still there?", + RawBody: "still there?", + Body: "still there?", + }), + cfg: emptyConfig, + dispatcher: secondDispatcher, + replyResolver, + }); + + expect(secondDispatcher.sendToolResult).not.toHaveBeenCalled(); + expect(replyResolver).toHaveBeenCalledTimes(1); + expect(hookMocks.runner.runInboundClaim).not.toHaveBeenCalled(); + }); + + it("falls back to OpenClaw when the bound plugin is loaded but has no inbound_claim handler", async () => { + setNoAbort(); + hookMocks.runner.hasHooks.mockImplementation( + ((hookName?: string) => + hookName === "inbound_claim" || hookName === "message_received") as () => boolean, + ); + hookMocks.registry.plugins = [{ id: "openclaw-codex-app-server", status: "loaded" }]; + hookMocks.runner.runInboundClaimForPluginOutcome.mockResolvedValue({ + status: "no_handler", + }); + sessionBindingMocks.resolveByConversation.mockReturnValue({ + bindingId: "binding-no-handler-1", + targetSessionKey: "plugin-binding:codex:nohandler123", + targetKind: "session", + conversation: { + channel: "discord", + accountId: "default", + conversationId: "channel:no-handler", + }, + status: "active", + boundAt: 1710000000000, + metadata: { + pluginBindingOwner: "plugin", + pluginId: "openclaw-codex-app-server", + pluginName: "Codex App Server", + pluginRoot: "/Users/huntharo/github/openclaw-app-server", + }, + } satisfies SessionBindingRecord); + const dispatcher = createDispatcher(); + const replyResolver = vi.fn(async () => ({ text: "openclaw fallback" }) satisfies ReplyPayload); + + await dispatchReplyFromConfig({ + ctx: buildTestCtx({ + Provider: "discord", + Surface: "discord", + OriginatingChannel: "discord", + OriginatingTo: "discord:channel:no-handler", + To: "discord:channel:no-handler", + AccountId: "default", + MessageSid: "msg-no-handler-1", + SessionKey: "agent:main:discord:channel:no-handler", + CommandBody: "hello", + RawBody: "hello", + Body: "hello", + }), + cfg: emptyConfig, + dispatcher, + replyResolver, + }); + + const notice = (dispatcher.sendToolResult as ReturnType).mock.calls[0]?.[0] as + | ReplyPayload + | undefined; + expect(notice?.text).toContain("Routing this message to OpenClaw instead."); + expect(replyResolver).toHaveBeenCalledTimes(1); + expect(hookMocks.runner.runInboundClaim).not.toHaveBeenCalled(); + }); + + it("notifies the user when a bound plugin declines the turn and keeps the binding attached", async () => { + setNoAbort(); + hookMocks.runner.hasHooks.mockImplementation( + ((hookName?: string) => + hookName === "inbound_claim" || hookName === "message_received") as () => boolean, + ); + hookMocks.registry.plugins = [{ id: "openclaw-codex-app-server", status: "loaded" }]; + hookMocks.runner.runInboundClaimForPluginOutcome.mockResolvedValue({ + status: "declined", + }); + sessionBindingMocks.resolveByConversation.mockReturnValue({ + bindingId: "binding-declined-1", + targetSessionKey: "plugin-binding:codex:declined123", + targetKind: "session", + conversation: { + channel: "discord", + accountId: "default", + conversationId: "channel:declined", + }, + status: "active", + boundAt: 1710000000000, + metadata: { + pluginBindingOwner: "plugin", + pluginId: "openclaw-codex-app-server", + pluginName: "Codex App Server", + pluginRoot: "/Users/huntharo/github/openclaw-app-server", + detachHint: "/codex_detach", + }, + } satisfies SessionBindingRecord); + const dispatcher = createDispatcher(); + const replyResolver = vi.fn(async () => ({ text: "should not run" }) satisfies ReplyPayload); + + await dispatchReplyFromConfig({ + ctx: buildTestCtx({ + Provider: "discord", + Surface: "discord", + OriginatingChannel: "discord", + OriginatingTo: "discord:channel:declined", + To: "discord:channel:declined", + AccountId: "default", + MessageSid: "msg-declined-1", + SessionKey: "agent:main:discord:channel:declined", + CommandBody: "hello", + RawBody: "hello", + Body: "hello", + }), + cfg: emptyConfig, + dispatcher, + replyResolver, + }); + + const finalNotice = (dispatcher.sendFinalReply as ReturnType).mock + .calls[0]?.[0] as ReplyPayload | undefined; + expect(finalNotice?.text).toContain("did not handle this message"); + expect(finalNotice?.text).toContain("/codex_detach"); + expect(replyResolver).not.toHaveBeenCalled(); + expect(hookMocks.runner.runInboundClaim).not.toHaveBeenCalled(); + }); + + it("notifies the user when a bound plugin errors and keeps raw details out of the reply", async () => { + setNoAbort(); + hookMocks.runner.hasHooks.mockImplementation( + ((hookName?: string) => + hookName === "inbound_claim" || hookName === "message_received") as () => boolean, + ); + hookMocks.registry.plugins = [{ id: "openclaw-codex-app-server", status: "loaded" }]; + hookMocks.runner.runInboundClaimForPluginOutcome.mockResolvedValue({ + status: "error", + error: "boom", + }); + sessionBindingMocks.resolveByConversation.mockReturnValue({ + bindingId: "binding-error-1", + targetSessionKey: "plugin-binding:codex:error123", + targetKind: "session", + conversation: { + channel: "discord", + accountId: "default", + conversationId: "channel:error", + }, + status: "active", + boundAt: 1710000000000, + metadata: { + pluginBindingOwner: "plugin", + pluginId: "openclaw-codex-app-server", + pluginName: "Codex App Server", + pluginRoot: "/Users/huntharo/github/openclaw-app-server", + }, + } satisfies SessionBindingRecord); + const dispatcher = createDispatcher(); + const replyResolver = vi.fn(async () => ({ text: "should not run" }) satisfies ReplyPayload); + + await dispatchReplyFromConfig({ + ctx: buildTestCtx({ + Provider: "discord", + Surface: "discord", + OriginatingChannel: "discord", + OriginatingTo: "discord:channel:error", + To: "discord:channel:error", + AccountId: "default", + MessageSid: "msg-error-1", + SessionKey: "agent:main:discord:channel:error", + CommandBody: "hello", + RawBody: "hello", + Body: "hello", + }), + cfg: emptyConfig, + dispatcher, + replyResolver, + }); + + const finalNotice = (dispatcher.sendFinalReply as ReturnType).mock + .calls[0]?.[0] as ReplyPayload | undefined; + expect(finalNotice?.text).toContain("hit an error handling this message"); + expect(finalNotice?.text).not.toContain("boom"); + expect(replyResolver).not.toHaveBeenCalled(); + expect(hookMocks.runner.runInboundClaim).not.toHaveBeenCalled(); + }); + it("marks diagnostics skipped for duplicate inbound messages", async () => { setNoAbort(); const cfg = { diagnostics: { enabled: true } } as OpenClawConfig; diff --git a/src/auto-reply/reply/dispatch-from-config.ts b/src/auto-reply/reply/dispatch-from-config.ts index a22e942a8b9..1e90dd58887 100644 --- a/src/auto-reply/reply/dispatch-from-config.ts +++ b/src/auto-reply/reply/dispatch-from-config.ts @@ -27,10 +27,15 @@ import { logSessionStateChange, } from "../../logging/diagnostic.js"; import { + buildPluginBindingDeclinedText, + buildPluginBindingErrorText, + buildPluginBindingUnavailableText, + hasShownPluginBindingFallbackNotice, isPluginOwnedSessionBindingRecord, + markPluginBindingFallbackNoticeShown, toPluginConversationBinding, } from "../../plugins/conversation-binding.js"; -import { getGlobalHookRunner } from "../../plugins/hook-runner-global.js"; +import { getGlobalHookRunner, getGlobalPluginRegistry } from "../../plugins/hook-runner-global.js"; import { resolveSendPolicy } from "../../sessions/send-policy.js"; import { maybeApplyTtsToPayload, normalizeTtsAutoMode, resolveTtsConfig } from "../../tts/tts.js"; import { INTERNAL_MESSAGE_CHANNEL, normalizeMessageChannel } from "../../utils/message-channel.js"; @@ -198,64 +203,11 @@ export async function dispatchReplyFromConfig(params: { const hookContext = deriveInboundMessageHookContext(ctx, { messageId: messageIdForHook }); const { isGroup, groupId } = hookContext; const inboundClaimContext = toPluginInboundClaimContext(hookContext); - - const pluginOwnedBindingRecord = - inboundClaimContext.conversationId && inboundClaimContext.channelId - ? getSessionBindingService().resolveByConversation({ - channel: inboundClaimContext.channelId, - accountId: inboundClaimContext.accountId ?? "default", - conversationId: inboundClaimContext.conversationId, - parentConversationId: inboundClaimContext.parentConversationId, - }) - : null; - const pluginOwnedBinding = isPluginOwnedSessionBindingRecord(pluginOwnedBindingRecord) - ? toPluginConversationBinding(pluginOwnedBindingRecord) - : null; - - if (pluginOwnedBinding) { - getSessionBindingService().touch(pluginOwnedBinding.bindingId); - logVerbose( - `plugin-bound inbound routed to ${pluginOwnedBinding.pluginId} conversation=${pluginOwnedBinding.conversationId}`, - ); - if (hookRunner?.hasHooks("inbound_claim")) { - await hookRunner.runInboundClaimForPlugin( - pluginOwnedBinding.pluginId, - toPluginInboundClaimEvent(hookContext, { - commandAuthorized: - typeof ctx.CommandAuthorized === "boolean" ? ctx.CommandAuthorized : undefined, - wasMentioned: typeof ctx.WasMentioned === "boolean" ? ctx.WasMentioned : undefined, - }), - inboundClaimContext, - ); - } - markIdle("plugin_binding_dispatch"); - recordProcessed("completed", { reason: "plugin-bound" }); - return { queuedFinal: false, counts: dispatcher.getQueuedCounts() }; - } - - // Trigger plugin hooks (fire-and-forget) - if (hookRunner?.hasHooks("message_received")) { - fireAndForgetHook( - hookRunner.runMessageReceived( - toPluginMessageReceivedEvent(hookContext), - toPluginMessageContext(hookContext), - ), - "dispatch-from-config: message_received plugin hook failed", - ); - } - - // Bridge to internal hooks (HOOK.md discovery system) - refs #8807 - if (sessionKey) { - fireAndForgetHook( - triggerInternalHook( - createInternalHookEvent("message", "received", sessionKey, { - ...toInternalMessageReceivedContext(hookContext), - timestamp, - }), - ), - "dispatch-from-config: message_received internal hook failed", - ); - } + const inboundClaimEvent = toPluginInboundClaimEvent(hookContext, { + commandAuthorized: + typeof ctx.CommandAuthorized === "boolean" ? ctx.CommandAuthorized : undefined, + wasMentioned: typeof ctx.WasMentioned === "boolean" ? ctx.WasMentioned : undefined, + }); // Check if we should route replies to originating channel instead of dispatcher. // Only route when the originating channel is DIFFERENT from the current surface. @@ -321,6 +273,144 @@ export async function dispatchReplyFromConfig(params: { } }; + const sendBindingNotice = async ( + payload: ReplyPayload, + mode: "additive" | "terminal", + ): Promise => { + if (shouldRouteToOriginating && originatingChannel && originatingTo) { + const result = await routeReply({ + payload, + channel: originatingChannel, + to: originatingTo, + sessionKey: ctx.SessionKey, + accountId: ctx.AccountId, + threadId: routeThreadId, + cfg, + isGroup, + groupId, + }); + if (!result.ok) { + logVerbose( + `dispatch-from-config: route-reply (plugin binding notice) failed: ${result.error ?? "unknown error"}`, + ); + } + return result.ok; + } + return mode === "additive" + ? dispatcher.sendToolResult(payload) + : dispatcher.sendFinalReply(payload); + }; + + const pluginOwnedBindingRecord = + inboundClaimContext.conversationId && inboundClaimContext.channelId + ? getSessionBindingService().resolveByConversation({ + channel: inboundClaimContext.channelId, + accountId: inboundClaimContext.accountId ?? "default", + conversationId: inboundClaimContext.conversationId, + parentConversationId: inboundClaimContext.parentConversationId, + }) + : null; + const pluginOwnedBinding = isPluginOwnedSessionBindingRecord(pluginOwnedBindingRecord) + ? toPluginConversationBinding(pluginOwnedBindingRecord) + : null; + + let pluginFallbackReason: + | "plugin-bound-fallback-missing-plugin" + | "plugin-bound-fallback-no-handler" + | undefined; + + if (pluginOwnedBinding) { + getSessionBindingService().touch(pluginOwnedBinding.bindingId); + logVerbose( + `plugin-bound inbound routed to ${pluginOwnedBinding.pluginId} conversation=${pluginOwnedBinding.conversationId}`, + ); + const targetedClaimOutcome = hookRunner?.runInboundClaimForPluginOutcome + ? await hookRunner.runInboundClaimForPluginOutcome( + pluginOwnedBinding.pluginId, + inboundClaimEvent, + inboundClaimContext, + ) + : (() => { + const pluginLoaded = + getGlobalPluginRegistry()?.plugins.some( + (plugin) => plugin.id === pluginOwnedBinding.pluginId && plugin.status === "loaded", + ) ?? false; + return pluginLoaded + ? ({ status: "no_handler" } as const) + : ({ status: "missing_plugin" } as const); + })(); + + switch (targetedClaimOutcome.status) { + case "handled": { + markIdle("plugin_binding_dispatch"); + recordProcessed("completed", { reason: "plugin-bound-handled" }); + return { queuedFinal: false, counts: dispatcher.getQueuedCounts() }; + } + case "missing_plugin": + case "no_handler": { + pluginFallbackReason = + targetedClaimOutcome.status === "missing_plugin" + ? "plugin-bound-fallback-missing-plugin" + : "plugin-bound-fallback-no-handler"; + if (!hasShownPluginBindingFallbackNotice(pluginOwnedBinding.bindingId)) { + const didSendNotice = await sendBindingNotice( + { text: buildPluginBindingUnavailableText(pluginOwnedBinding) }, + "additive", + ); + if (didSendNotice) { + markPluginBindingFallbackNoticeShown(pluginOwnedBinding.bindingId); + } + } + break; + } + case "declined": { + await sendBindingNotice( + { text: buildPluginBindingDeclinedText(pluginOwnedBinding) }, + "terminal", + ); + markIdle("plugin_binding_declined"); + recordProcessed("completed", { reason: "plugin-bound-declined" }); + return { queuedFinal: false, counts: dispatcher.getQueuedCounts() }; + } + case "error": { + logVerbose( + `plugin-bound inbound claim failed for ${pluginOwnedBinding.pluginId}: ${targetedClaimOutcome.error}`, + ); + await sendBindingNotice( + { text: buildPluginBindingErrorText(pluginOwnedBinding) }, + "terminal", + ); + markIdle("plugin_binding_error"); + recordProcessed("completed", { reason: "plugin-bound-error" }); + return { queuedFinal: false, counts: dispatcher.getQueuedCounts() }; + } + } + } + + // Trigger plugin hooks (fire-and-forget) + if (hookRunner?.hasHooks("message_received")) { + fireAndForgetHook( + hookRunner.runMessageReceived( + toPluginMessageReceivedEvent(hookContext), + toPluginMessageContext(hookContext), + ), + "dispatch-from-config: message_received plugin hook failed", + ); + } + + // Bridge to internal hooks (HOOK.md discovery system) - refs #8807 + if (sessionKey) { + fireAndForgetHook( + triggerInternalHook( + createInternalHookEvent("message", "received", sessionKey, { + ...toInternalMessageReceivedContext(hookContext), + timestamp, + }), + ), + "dispatch-from-config: message_received internal hook failed", + ); + } + markProcessing(); try { @@ -648,7 +738,10 @@ export async function dispatchReplyFromConfig(params: { const counts = dispatcher.getQueuedCounts(); counts.final += routedFinalCount; - recordProcessed("completed"); + recordProcessed( + "completed", + pluginFallbackReason ? { reason: pluginFallbackReason } : undefined, + ); markIdle("message_completed"); return { queuedFinal, counts }; } catch (err) { diff --git a/src/plugins/commands.test.ts b/src/plugins/commands.test.ts index e60fdff51fc..64f953fb014 100644 --- a/src/plugins/commands.test.ts +++ b/src/plugins/commands.test.ts @@ -139,26 +139,31 @@ describe("registerPluginCommand", () => { }); it("does not expose binding APIs to plugin commands on unsupported channels", async () => { + const handler = async (ctx: { + requestConversationBinding: (params: { summary: string }) => Promise; + getCurrentConversationBinding: () => Promise; + detachConversationBinding: () => Promise; + }) => { + const requested = await ctx.requestConversationBinding({ + summary: "Bind this conversation.", + }); + const current = await ctx.getCurrentConversationBinding(); + const detached = await ctx.detachConversationBinding(); + return { + text: JSON.stringify({ + requested, + current, + detached, + }), + }; + }; registerPluginCommand( "demo-plugin", { name: "bindcheck", description: "Demo command", acceptsArgs: false, - handler: async (ctx) => { - const requested = await ctx.requestConversationBinding({ - summary: "Bind this conversation.", - }); - const current = await ctx.getCurrentConversationBinding(); - const detached = await ctx.detachConversationBinding(); - return { - text: JSON.stringify({ - requested, - current, - detached, - }), - }; - }, + handler, }, { pluginRoot: "/plugins/demo-plugin" }, ); @@ -168,20 +173,7 @@ describe("registerPluginCommand", () => { name: "bindcheck", description: "Demo command", acceptsArgs: false, - handler: async (ctx) => { - const requested = await ctx.requestConversationBinding({ - summary: "Bind this conversation.", - }); - const current = await ctx.getCurrentConversationBinding(); - const detached = await ctx.detachConversationBinding(); - return { - text: JSON.stringify({ - requested, - current, - detached, - }), - }; - }, + handler, pluginId: "demo-plugin", pluginRoot: "/plugins/demo-plugin", }, diff --git a/src/plugins/conversation-binding.test.ts b/src/plugins/conversation-binding.test.ts index 7380adefca5..821fd9e3b48 100644 --- a/src/plugins/conversation-binding.test.ts +++ b/src/plugins/conversation-binding.test.ts @@ -294,6 +294,54 @@ describe("plugin conversation binding approvals", () => { expect(samePluginNewPath.status).toBe("pending"); }); + it("persists detachHint on approved plugin bindings", async () => { + const request = await requestPluginConversationBinding({ + pluginId: "codex", + pluginName: "Codex App Server", + pluginRoot: "/plugins/codex-a", + requestedBySenderId: "user-1", + conversation: { + channel: "discord", + accountId: "isolated", + conversationId: "channel:detach-hint", + }, + binding: { + summary: "Bind this conversation to Codex thread 999.", + detachHint: "/codex_detach", + }, + }); + + expect(["pending", "bound"]).toContain(request.status); + + if (request.status === "pending") { + const approved = await resolvePluginConversationBindingApproval({ + approvalId: request.approvalId, + decision: "allow-once", + senderId: "user-1", + }); + + expect(approved.status).toBe("approved"); + if (approved.status !== "approved") { + throw new Error("expected approved bind request"); + } + + expect(approved.binding.detachHint).toBe("/codex_detach"); + } else { + expect(request.binding.detachHint).toBe("/codex_detach"); + } + + const currentBinding = await getCurrentPluginConversationBinding({ + pluginRoot: "/plugins/codex-a", + conversation: { + channel: "discord", + accountId: "isolated", + conversationId: "channel:detach-hint", + }, + }); + + expect(currentBinding?.detachHint).toBe("/codex_detach"); + }); + it("returns and detaches only bindings owned by the requesting plugin root", async () => { const request = await requestPluginConversationBinding({ pluginId: "codex", diff --git a/src/plugins/conversation-binding.ts b/src/plugins/conversation-binding.ts index 7cae2304c16..3de655abbe1 100644 --- a/src/plugins/conversation-binding.ts +++ b/src/plugins/conversation-binding.ts @@ -61,6 +61,7 @@ type PendingPluginBindingRequest = { requestedAt: number; requestedBySenderId?: string; summary?: string; + detachHint?: string; }; type PluginBindingApprovalAction = { @@ -80,6 +81,7 @@ type PluginBindingMetadata = { pluginName?: string; pluginRoot: string; summary?: string; + detachHint?: string; }; type PluginBindingResolveResult = @@ -99,9 +101,24 @@ type PluginBindingResolveResult = const pendingRequests = new Map(); +type PluginBindingGlobalState = { + fallbackNoticeBindingIds: Set; +}; + +const pluginBindingGlobalStateKey = Symbol.for("openclaw.plugins.binding.global-state"); + let approvalsCache: PluginBindingApprovalsFile | null = null; let approvalsLoaded = false; +function getPluginBindingGlobalState(): PluginBindingGlobalState { + const globalStore = globalThis as typeof globalThis & { + [pluginBindingGlobalStateKey]?: PluginBindingGlobalState; + }; + return (globalStore[pluginBindingGlobalStateKey] ??= { + fallbackNoticeBindingIds: new Set(), + }); +} + class PluginBindingApprovalButton extends Button { customId: string; label: string; @@ -369,6 +386,7 @@ function buildBindingMetadata(params: { pluginName?: string; pluginRoot: string; summary?: string; + detachHint?: string; }): PluginBindingMetadata { return { pluginBindingOwner: PLUGIN_BINDING_OWNER, @@ -376,6 +394,7 @@ function buildBindingMetadata(params: { pluginName: params.pluginName, pluginRoot: params.pluginRoot, summary: params.summary?.trim() || undefined, + detachHint: params.detachHint?.trim() || undefined, }; } @@ -428,6 +447,7 @@ export function toPluginConversationBinding( parentConversationId: record.conversation.parentConversationId, boundAt: record.boundAt, summary: metadata.summary, + detachHint: metadata.detachHint, }; } @@ -435,6 +455,7 @@ async function bindConversationNow(params: { identity: PluginBindingIdentity; conversation: PluginBindingConversation; summary?: string; + detachHint?: string; }): Promise { const ref = toConversationRef(params.conversation); const targetSessionKey = buildPluginBindingSessionKey({ @@ -453,6 +474,7 @@ async function bindConversationNow(params: { pluginName: params.identity.pluginName, pluginRoot: params.identity.pluginRoot, summary: params.summary, + detachHint: params.detachHint, }), }); const binding = toPluginConversationBinding(record); @@ -482,6 +504,46 @@ function buildApprovalMessage(request: PendingPluginBindingRequest): string { return lines.join("\n"); } +function resolvePluginBindingDisplayName(binding: { + pluginId: string; + pluginName?: string; +}): string { + return binding.pluginName?.trim() || binding.pluginId; +} + +function buildDetachHintSuffix(detachHint?: string): string { + const trimmed = detachHint?.trim(); + return trimmed ? ` To detach this conversation, use ${trimmed}.` : ""; +} + +export function buildPluginBindingUnavailableText(binding: PluginConversationBinding): string { + return `The bound plugin ${resolvePluginBindingDisplayName(binding)} is not currently loaded. Routing this message to OpenClaw instead.${buildDetachHintSuffix(binding.detachHint)}`; +} + +export function buildPluginBindingDeclinedText(binding: PluginConversationBinding): string { + return `The bound plugin ${resolvePluginBindingDisplayName(binding)} did not handle this message. This conversation is still bound to that plugin.${buildDetachHintSuffix(binding.detachHint)}`; +} + +export function buildPluginBindingErrorText(binding: PluginConversationBinding): string { + return `The bound plugin ${resolvePluginBindingDisplayName(binding)} hit an error handling this message. This conversation is still bound to that plugin.${buildDetachHintSuffix(binding.detachHint)}`; +} + +export function hasShownPluginBindingFallbackNotice(bindingId: string): boolean { + const normalized = bindingId.trim(); + if (!normalized) { + return false; + } + return getPluginBindingGlobalState().fallbackNoticeBindingIds.has(normalized); +} + +export function markPluginBindingFallbackNoticeShown(bindingId: string): void { + const normalized = bindingId.trim(); + if (!normalized) { + return; + } + getPluginBindingGlobalState().fallbackNoticeBindingIds.add(normalized); +} + function buildPendingReply(request: PendingPluginBindingRequest): ReplyPayload { return { text: buildApprovalMessage(request), @@ -594,6 +656,7 @@ export async function requestPluginConversationBinding(params: { }, conversation, summary: params.binding?.summary, + detachHint: params.binding?.detachHint, }); log.info( `plugin binding auto-refresh plugin=${params.pluginId} root=${params.pluginRoot} channel=${ref.channel} account=${ref.accountId} conversation=${ref.conversationId}`, @@ -616,6 +679,7 @@ export async function requestPluginConversationBinding(params: { }, conversation, summary: params.binding?.summary, + detachHint: params.binding?.detachHint, }); log.info( `plugin binding auto-approved plugin=${params.pluginId} root=${params.pluginRoot} channel=${ref.channel} account=${ref.accountId} conversation=${ref.conversationId}`, @@ -632,6 +696,7 @@ export async function requestPluginConversationBinding(params: { requestedAt: Date.now(), requestedBySenderId: params.requestedBySenderId?.trim() || undefined, summary: params.binding?.summary?.trim() || undefined, + detachHint: params.binding?.detachHint?.trim() || undefined, }; pendingRequests.set(request.id, request); log.info( @@ -723,6 +788,7 @@ export async function resolvePluginConversationBindingApproval(params: { }, conversation: request.conversation, summary: request.summary, + detachHint: request.detachHint, }); log.info( `plugin binding approved plugin=${request.pluginId} root=${request.pluginRoot} decision=${params.decision} channel=${request.conversation.channel} account=${request.conversation.accountId} conversation=${request.conversation.conversationId}`, @@ -754,5 +820,6 @@ export const __testing = { pendingRequests.clear(); approvalsCache = null; approvalsLoaded = false; + getPluginBindingGlobalState().fallbackNoticeBindingIds.clear(); }, }; diff --git a/src/plugins/hooks.test-helpers.ts b/src/plugins/hooks.test-helpers.ts index 8b7076239c2..7954257e714 100644 --- a/src/plugins/hooks.test-helpers.ts +++ b/src/plugins/hooks.test-helpers.ts @@ -5,6 +5,27 @@ export function createMockPluginRegistry( hooks: Array<{ hookName: string; handler: (...args: unknown[]) => unknown }>, ): PluginRegistry { return { + plugins: [ + { + id: "test-plugin", + name: "Test Plugin", + source: "test", + origin: "workspace", + enabled: true, + status: "loaded", + toolNames: [], + hookNames: [], + channelIds: [], + providerIds: [], + gatewayMethods: [], + cliCommands: [], + services: [], + commands: [], + httpRoutes: 0, + hookCount: hooks.length, + configSchema: false, + }, + ], hooks: hooks as never[], typedHooks: hooks.map((h) => ({ pluginId: "test-plugin", diff --git a/src/plugins/hooks.ts b/src/plugins/hooks.ts index d03cb50e09c..cffafd6645d 100644 --- a/src/plugins/hooks.ts +++ b/src/plugins/hooks.ts @@ -114,6 +114,25 @@ export type HookRunnerOptions = { catchErrors?: boolean; }; +export type PluginTargetedInboundClaimOutcome = + | { + status: "handled"; + result: PluginHookInboundClaimResult; + } + | { + status: "missing_plugin"; + } + | { + status: "no_handler"; + } + | { + status: "declined"; + } + | { + status: "error"; + error: string; + }; + /** * Get hooks for a specific hook name, sorted by priority (higher first). */ @@ -210,6 +229,12 @@ export function createHookRunner(registry: PluginRegistry, options: HookRunnerOp throw new Error(msg, { cause: params.error }); }; + const sanitizeHookError = (error: unknown): string => { + const raw = error instanceof Error ? error.message : String(error); + const firstLine = raw.split("\n")[0]?.trim(); + return firstLine || "unknown error"; + }; + /** * Run a hook that doesn't return a value (fire-and-forget style). * All handlers are executed in parallel for performance. @@ -342,6 +367,58 @@ export function createHookRunner(registry: PluginRegistry, options: HookRunnerOp return undefined; } + async function runClaimingHookForPluginOutcome< + K extends PluginHookName, + TResult extends { handled: boolean }, + >( + hookName: K, + pluginId: string, + event: Parameters["handler"]>>[0], + ctx: Parameters["handler"]>>[1], + ): Promise< + | { status: "handled"; result: TResult } + | { status: "missing_plugin" } + | { status: "no_handler" } + | { status: "declined" } + | { status: "error"; error: string } + > { + const pluginLoaded = registry.plugins.some( + (plugin) => plugin.id === pluginId && plugin.status === "loaded", + ); + if (!pluginLoaded) { + return { status: "missing_plugin" }; + } + + const hooks = getHooksForNameAndPlugin(registry, hookName, pluginId); + if (hooks.length === 0) { + return { status: "no_handler" }; + } + + logger?.debug?.( + `[hooks] running ${hookName} for ${pluginId} (${hooks.length} handlers, targeted outcome)`, + ); + + let firstError: string | null = null; + for (const hook of hooks) { + try { + const handlerResult = await ( + hook.handler as (event: unknown, ctx: unknown) => Promise + )(event, ctx); + if (handlerResult?.handled) { + return { status: "handled", result: handlerResult }; + } + } catch (err) { + firstError ??= sanitizeHookError(err); + handleHookError({ hookName, pluginId: hook.pluginId, error: err }); + } + } + + if (firstError) { + return { status: "error", error: firstError }; + } + return { status: "declined" }; + } + // ========================================================================= // Agent Hooks // ========================================================================= @@ -491,6 +568,19 @@ export function createHookRunner(registry: PluginRegistry, options: HookRunnerOp ); } + async function runInboundClaimForPluginOutcome( + pluginId: string, + event: PluginHookInboundClaimEvent, + ctx: PluginHookInboundClaimContext, + ): Promise { + return runClaimingHookForPluginOutcome<"inbound_claim", PluginHookInboundClaimResult>( + "inbound_claim", + pluginId, + event, + ctx, + ); + } + /** * Run message_received hook. * Runs in parallel (fire-and-forget). @@ -843,6 +933,7 @@ export function createHookRunner(registry: PluginRegistry, options: HookRunnerOp // Message hooks runInboundClaim, runInboundClaimForPlugin, + runInboundClaimForPluginOutcome, runMessageReceived, runMessageSending, runMessageSent, diff --git a/src/plugins/types.ts b/src/plugins/types.ts index 6431ad268ac..721b30e6b94 100644 --- a/src/plugins/types.ts +++ b/src/plugins/types.ts @@ -279,6 +279,7 @@ export type PluginCommandContext = { export type PluginConversationBindingRequestParams = { summary?: string; + detachHint?: string; }; export type PluginConversationBinding = { @@ -293,6 +294,7 @@ export type PluginConversationBinding = { threadId?: string | number; boundAt: number; summary?: string; + detachHint?: string; }; export type PluginConversationBindingRequestResult = diff --git a/src/plugins/wired-hooks-inbound-claim.test.ts b/src/plugins/wired-hooks-inbound-claim.test.ts index 1ddb1d34dbf..2af75392fdb 100644 --- a/src/plugins/wired-hooks-inbound-claim.test.ts +++ b/src/plugins/wired-hooks-inbound-claim.test.ts @@ -97,4 +97,79 @@ describe("inbound_claim hook runner", () => { expect(first).toHaveBeenCalledTimes(1); expect(second).not.toHaveBeenCalled(); }); + + it("reports missing_plugin when the bound plugin is not loaded", async () => { + const registry = createMockPluginRegistry([]); + registry.plugins = []; + const runner = createHookRunner(registry); + + const result = await runner.runInboundClaimForPluginOutcome( + "missing-plugin", + { + content: "who are you", + channel: "discord", + accountId: "default", + conversationId: "channel:1", + isGroup: true, + }, + { + channelId: "discord", + accountId: "default", + conversationId: "channel:1", + }, + ); + + expect(result).toEqual({ status: "missing_plugin" }); + }); + + it("reports no_handler when the plugin is loaded but has no targeted hooks", async () => { + const registry = createMockPluginRegistry([]); + const runner = createHookRunner(registry); + + const result = await runner.runInboundClaimForPluginOutcome( + "test-plugin", + { + content: "who are you", + channel: "discord", + accountId: "default", + conversationId: "channel:1", + isGroup: true, + }, + { + channelId: "discord", + accountId: "default", + conversationId: "channel:1", + }, + ); + + expect(result).toEqual({ status: "no_handler" }); + }); + + it("reports error when a targeted handler throws and none claim the event", async () => { + const logger = { + warn: vi.fn(), + error: vi.fn(), + }; + const failing = vi.fn().mockRejectedValue(new Error("boom")); + const registry = createMockPluginRegistry([{ hookName: "inbound_claim", handler: failing }]); + const runner = createHookRunner(registry, { logger }); + + const result = await runner.runInboundClaimForPluginOutcome( + "test-plugin", + { + content: "who are you", + channel: "discord", + accountId: "default", + conversationId: "channel:1", + isGroup: true, + }, + { + channelId: "discord", + accountId: "default", + conversationId: "channel:1", + }, + ); + + expect(result).toEqual({ status: "error", error: "boom" }); + }); });