diff --git a/extensions/nostr/src/channel.ts b/extensions/nostr/src/channel.ts index cbb1f287297..a755bfacc6d 100644 --- a/extensions/nostr/src/channel.ts +++ b/extensions/nostr/src/channel.ts @@ -5,6 +5,7 @@ import { } from "openclaw/plugin-sdk/channel-config-helpers"; import { createChannelPairingController } from "openclaw/plugin-sdk/channel-pairing"; import { attachChannelToResult } from "openclaw/plugin-sdk/channel-send-result"; +import { createChatChannelPlugin } from "openclaw/plugin-sdk/core"; import { buildPassiveChannelStatusSummary, buildTrafficStatusSummary, @@ -143,39 +144,259 @@ const nostrConfigAdapter = createTopLevelChannelConfigAdapter = { - id: "nostr", - meta: { +export const nostrPlugin: ChannelPlugin = createChatChannelPlugin({ + base: { id: "nostr", - label: "Nostr", - selectionLabel: "Nostr", - docsPath: "/channels/nostr", - docsLabel: "nostr", - blurb: "Decentralized DMs via Nostr relays (NIP-04)", - order: 100, - }, - capabilities: { - chatTypes: ["direct"], // DMs only for MVP - media: false, // No media for MVP - }, - reload: { configPrefixes: ["channels.nostr"] }, - configSchema: buildChannelConfigSchema(NostrConfigSchema), - setup: nostrSetupAdapter, - setupWizard: nostrSetupWizard, - - config: { - ...nostrConfigAdapter, - isConfigured: (account) => account.configured, - describeAccount: (account) => - describeAccountSnapshot({ - account, - configured: account.configured, - extra: { - publicKey: account.publicKey, + meta: { + id: "nostr", + label: "Nostr", + selectionLabel: "Nostr", + docsPath: "/channels/nostr", + docsLabel: "nostr", + blurb: "Decentralized DMs via Nostr relays (NIP-04)", + order: 100, + }, + capabilities: { + chatTypes: ["direct"], // DMs only for MVP + media: false, // No media for MVP + }, + reload: { configPrefixes: ["channels.nostr"] }, + configSchema: buildChannelConfigSchema(NostrConfigSchema), + setup: nostrSetupAdapter, + setupWizard: nostrSetupWizard, + config: { + ...nostrConfigAdapter, + isConfigured: (account) => account.configured, + describeAccount: (account) => + describeAccountSnapshot({ + account, + configured: account.configured, + extra: { + publicKey: account.publicKey, + }, + }), + }, + messaging: { + normalizeTarget: (target) => { + // Strip nostr: prefix if present + const cleaned = target.trim().replace(/^nostr:/i, ""); + try { + return normalizePubkey(cleaned); + } catch { + return cleaned; + } + }, + targetResolver: { + looksLikeId: (input) => { + const trimmed = input.trim(); + return trimmed.startsWith("npub1") || /^[0-9a-fA-F]{64}$/.test(trimmed); }, - }), - }, + hint: "", + }, + resolveOutboundSessionRoute: (params) => resolveNostrOutboundSessionRoute(params), + }, + status: { + defaultRuntime: createDefaultChannelRuntimeState(DEFAULT_ACCOUNT_ID), + collectStatusIssues: (accounts) => collectStatusIssuesFromLastError("nostr", accounts), + buildChannelSummary: ({ snapshot }) => + buildPassiveChannelStatusSummary(snapshot, { + publicKey: snapshot.publicKey ?? null, + }), + buildAccountSnapshot: ({ account, runtime }) => + buildComputedAccountStatusSnapshot( + { + accountId: account.accountId, + name: account.name, + enabled: account.enabled, + configured: account.configured, + runtime, + }, + { + publicKey: account.publicKey, + profile: account.profile, + ...buildTrafficStatusSummary(runtime), + }, + ), + }, + gateway: { + startAccount: async (ctx) => { + const account = ctx.account; + ctx.setStatus({ + accountId: account.accountId, + publicKey: account.publicKey, + }); + ctx.log?.info( + `[${account.accountId}] starting Nostr provider (pubkey: ${account.publicKey})`, + ); + if (!account.configured) { + throw new Error("Nostr private key not configured"); + } + + const runtime = getNostrRuntime(); + const pairing = createChannelPairingController({ + core: runtime, + channel: "nostr", + accountId: account.accountId, + }); + const resolveInboundAccess = async (senderPubkey: string, rawBody: string) => + await resolveNostrDirectAccess({ + cfg: ctx.cfg, + accountId: account.accountId, + dmPolicy: account.config.dmPolicy ?? "pairing", + allowFrom: account.config.allowFrom, + senderPubkey, + rawBody, + runtime: { + shouldComputeCommandAuthorized: + runtime.channel.commands.shouldComputeCommandAuthorized, + resolveCommandAuthorizedFromAuthorizers: + runtime.channel.commands.resolveCommandAuthorizedFromAuthorizers, + }, + }); + + // Track bus handle for metrics callback + let busHandle: NostrBusHandle | null = null; + + const authorizeSender = createPreCryptoDirectDmAuthorizer({ + resolveAccess: async (senderPubkey) => await resolveInboundAccess(senderPubkey, ""), + issuePairingChallenge: async ({ senderId, reply }) => { + await pairing.issueChallenge({ + senderId, + senderIdLine: `Your Nostr pubkey: ${senderId}`, + sendPairingReply: reply, + onCreated: () => { + ctx.log?.debug?.(`[${account.accountId}] nostr pairing request sender=${senderId}`); + }, + onReplyError: (err) => { + ctx.log?.warn?.( + `[${account.accountId}] nostr pairing reply failed for ${senderId}: ${String(err)}`, + ); + }, + }); + }, + onBlocked: ({ senderId, reason }) => { + ctx.log?.debug?.(`[${account.accountId}] blocked Nostr sender ${senderId} (${reason})`); + }, + }); + + const bus = await startNostrBus({ + accountId: account.accountId, + privateKey: account.privateKey, + relays: account.relays, + authorizeSender: async ({ senderPubkey, reply }) => + await authorizeSender({ senderId: senderPubkey, reply }), + onMessage: async (senderPubkey, text, reply, meta) => { + const resolvedAccess = await resolveInboundAccess(senderPubkey, text); + if (resolvedAccess.access.decision !== "allow") { + ctx.log?.warn?.( + `[${account.accountId}] dropping Nostr DM after preflight drift (${senderPubkey}, ${resolvedAccess.access.reason})`, + ); + return; + } + + await dispatchInboundDirectDmWithRuntime({ + cfg: ctx.cfg, + runtime, + channel: "nostr", + channelLabel: "Nostr", + accountId: account.accountId, + peer: { + kind: "direct", + id: senderPubkey, + }, + senderId: senderPubkey, + senderAddress: `nostr:${senderPubkey}`, + recipientAddress: `nostr:${account.publicKey}`, + conversationLabel: senderPubkey, + rawBody: text, + messageId: meta.eventId, + timestamp: meta.createdAt * 1000, + commandAuthorized: resolvedAccess.commandAuthorized, + deliver: async (payload) => { + const outboundText = + payload && typeof payload === "object" && "text" in payload + ? String((payload as { text?: string }).text ?? "") + : ""; + if (!outboundText.trim()) { + return; + } + const tableMode = runtime.channel.text.resolveMarkdownTableMode({ + cfg: ctx.cfg, + channel: "nostr", + accountId: account.accountId, + }); + await reply(runtime.channel.text.convertMarkdownTables(outboundText, tableMode)); + }, + onRecordError: (err) => { + ctx.log?.error?.( + `[${account.accountId}] failed recording Nostr inbound session: ${String(err)}`, + ); + }, + onDispatchError: (err, info) => { + ctx.log?.error?.( + `[${account.accountId}] Nostr ${info.kind} reply failed: ${String(err)}`, + ); + }, + }); + }, + onError: (error, context) => { + ctx.log?.error?.(`[${account.accountId}] Nostr error (${context}): ${error.message}`); + }, + onConnect: (relay) => { + ctx.log?.debug?.(`[${account.accountId}] Connected to relay: ${relay}`); + }, + onDisconnect: (relay) => { + ctx.log?.debug?.(`[${account.accountId}] Disconnected from relay: ${relay}`); + }, + onEose: (relays) => { + ctx.log?.debug?.(`[${account.accountId}] EOSE received from relays: ${relays}`); + }, + onMetric: (event: MetricEvent) => { + // Log significant metrics at appropriate levels + if (event.name.startsWith("event.rejected.")) { + ctx.log?.debug?.( + `[${account.accountId}] Metric: ${event.name} ${JSON.stringify(event.labels)}`, + ); + } else if (event.name === "relay.circuit_breaker.open") { + ctx.log?.warn?.( + `[${account.accountId}] Circuit breaker opened for relay: ${event.labels?.relay}`, + ); + } else if (event.name === "relay.circuit_breaker.close") { + ctx.log?.info?.( + `[${account.accountId}] Circuit breaker closed for relay: ${event.labels?.relay}`, + ); + } else if (event.name === "relay.error") { + ctx.log?.debug?.(`[${account.accountId}] Relay error: ${event.labels?.relay}`); + } + // Update cached metrics snapshot + if (busHandle) { + metricsSnapshots.set(account.accountId, busHandle.getMetrics()); + } + }, + }); + + busHandle = bus; + + // Store the bus handle + activeBuses.set(account.accountId, bus); + + ctx.log?.info( + `[${account.accountId}] Nostr provider started, connected to ${account.relays.length} relay(s)`, + ); + + // Return cleanup function + return { + stop: () => { + bus.close(); + activeBuses.delete(account.accountId); + metricsSnapshots.delete(account.accountId); + ctx.log?.info(`[${account.accountId}] Nostr provider stopped`); + }, + }; + }, + }, + }, pairing: { idLabel: "nostrPubkey", normalizeAllowEntry: (entry) => { @@ -193,31 +414,9 @@ export const nostrPlugin: ChannelPlugin = { } }, }, - security: { resolveDmPolicy: resolveNostrDmPolicy, }, - - messaging: { - normalizeTarget: (target) => { - // Strip nostr: prefix if present - const cleaned = target.trim().replace(/^nostr:/i, ""); - try { - return normalizePubkey(cleaned); - } catch { - return cleaned; - } - }, - targetResolver: { - looksLikeId: (input) => { - const trimmed = input.trim(); - return trimmed.startsWith("npub1") || /^[0-9a-fA-F]{64}$/.test(trimmed); - }, - hint: "", - }, - resolveOutboundSessionRoute: (params) => resolveNostrOutboundSessionRoute(params), - }, - outbound: { deliveryMode: "direct", textChunkLimit: 4000, @@ -242,209 +441,7 @@ export const nostrPlugin: ChannelPlugin = { }); }, }, - - status: { - defaultRuntime: createDefaultChannelRuntimeState(DEFAULT_ACCOUNT_ID), - collectStatusIssues: (accounts) => collectStatusIssuesFromLastError("nostr", accounts), - buildChannelSummary: ({ snapshot }) => - buildPassiveChannelStatusSummary(snapshot, { - publicKey: snapshot.publicKey ?? null, - }), - buildAccountSnapshot: ({ account, runtime }) => - buildComputedAccountStatusSnapshot( - { - accountId: account.accountId, - name: account.name, - enabled: account.enabled, - configured: account.configured, - runtime, - }, - { - publicKey: account.publicKey, - profile: account.profile, - ...buildTrafficStatusSummary(runtime), - }, - ), - }, - - gateway: { - startAccount: async (ctx) => { - const account = ctx.account; - ctx.setStatus({ - accountId: account.accountId, - publicKey: account.publicKey, - }); - ctx.log?.info( - `[${account.accountId}] starting Nostr provider (pubkey: ${account.publicKey})`, - ); - - if (!account.configured) { - throw new Error("Nostr private key not configured"); - } - - const runtime = getNostrRuntime(); - const pairing = createChannelPairingController({ - core: runtime, - channel: "nostr", - accountId: account.accountId, - }); - const resolveInboundAccess = async (senderPubkey: string, rawBody: string) => - await resolveNostrDirectAccess({ - cfg: ctx.cfg, - accountId: account.accountId, - dmPolicy: account.config.dmPolicy ?? "pairing", - allowFrom: account.config.allowFrom, - senderPubkey, - rawBody, - runtime: { - shouldComputeCommandAuthorized: runtime.channel.commands.shouldComputeCommandAuthorized, - resolveCommandAuthorizedFromAuthorizers: - runtime.channel.commands.resolveCommandAuthorizedFromAuthorizers, - }, - }); - - // Track bus handle for metrics callback - let busHandle: NostrBusHandle | null = null; - - const authorizeSender = createPreCryptoDirectDmAuthorizer({ - resolveAccess: async (senderPubkey) => await resolveInboundAccess(senderPubkey, ""), - issuePairingChallenge: async ({ senderId, reply }) => { - await pairing.issueChallenge({ - senderId, - senderIdLine: `Your Nostr pubkey: ${senderId}`, - sendPairingReply: reply, - onCreated: () => { - ctx.log?.debug?.(`[${account.accountId}] nostr pairing request sender=${senderId}`); - }, - onReplyError: (err) => { - ctx.log?.warn?.( - `[${account.accountId}] nostr pairing reply failed for ${senderId}: ${String(err)}`, - ); - }, - }); - }, - onBlocked: ({ senderId, reason }) => { - ctx.log?.debug?.(`[${account.accountId}] blocked Nostr sender ${senderId} (${reason})`); - }, - }); - - const bus = await startNostrBus({ - accountId: account.accountId, - privateKey: account.privateKey, - relays: account.relays, - authorizeSender: async ({ senderPubkey, reply }) => - await authorizeSender({ senderId: senderPubkey, reply }), - onMessage: async (senderPubkey, text, reply, meta) => { - const resolvedAccess = await resolveInboundAccess(senderPubkey, text); - if (resolvedAccess.access.decision !== "allow") { - ctx.log?.warn?.( - `[${account.accountId}] dropping Nostr DM after preflight drift (${senderPubkey}, ${resolvedAccess.access.reason})`, - ); - return; - } - - await dispatchInboundDirectDmWithRuntime({ - cfg: ctx.cfg, - runtime, - channel: "nostr", - channelLabel: "Nostr", - accountId: account.accountId, - peer: { - kind: "direct", - id: senderPubkey, - }, - senderId: senderPubkey, - senderAddress: `nostr:${senderPubkey}`, - recipientAddress: `nostr:${account.publicKey}`, - conversationLabel: senderPubkey, - rawBody: text, - messageId: meta.eventId, - timestamp: meta.createdAt * 1000, - commandAuthorized: resolvedAccess.commandAuthorized, - deliver: async (payload) => { - const outboundText = - payload && typeof payload === "object" && "text" in payload - ? String((payload as { text?: string }).text ?? "") - : ""; - if (!outboundText.trim()) { - return; - } - const tableMode = runtime.channel.text.resolveMarkdownTableMode({ - cfg: ctx.cfg, - channel: "nostr", - accountId: account.accountId, - }); - await reply(runtime.channel.text.convertMarkdownTables(outboundText, tableMode)); - }, - onRecordError: (err) => { - ctx.log?.error?.( - `[${account.accountId}] failed recording Nostr inbound session: ${String(err)}`, - ); - }, - onDispatchError: (err, info) => { - ctx.log?.error?.( - `[${account.accountId}] Nostr ${info.kind} reply failed: ${String(err)}`, - ); - }, - }); - }, - onError: (error, context) => { - ctx.log?.error?.(`[${account.accountId}] Nostr error (${context}): ${error.message}`); - }, - onConnect: (relay) => { - ctx.log?.debug?.(`[${account.accountId}] Connected to relay: ${relay}`); - }, - onDisconnect: (relay) => { - ctx.log?.debug?.(`[${account.accountId}] Disconnected from relay: ${relay}`); - }, - onEose: (relays) => { - ctx.log?.debug?.(`[${account.accountId}] EOSE received from relays: ${relays}`); - }, - onMetric: (event: MetricEvent) => { - // Log significant metrics at appropriate levels - if (event.name.startsWith("event.rejected.")) { - ctx.log?.debug?.( - `[${account.accountId}] Metric: ${event.name} ${JSON.stringify(event.labels)}`, - ); - } else if (event.name === "relay.circuit_breaker.open") { - ctx.log?.warn?.( - `[${account.accountId}] Circuit breaker opened for relay: ${event.labels?.relay}`, - ); - } else if (event.name === "relay.circuit_breaker.close") { - ctx.log?.info?.( - `[${account.accountId}] Circuit breaker closed for relay: ${event.labels?.relay}`, - ); - } else if (event.name === "relay.error") { - ctx.log?.debug?.(`[${account.accountId}] Relay error: ${event.labels?.relay}`); - } - // Update cached metrics snapshot - if (busHandle) { - metricsSnapshots.set(account.accountId, busHandle.getMetrics()); - } - }, - }); - - busHandle = bus; - - // Store the bus handle - activeBuses.set(account.accountId, bus); - - ctx.log?.info( - `[${account.accountId}] Nostr provider started, connected to ${account.relays.length} relay(s)`, - ); - - // Return cleanup function - return { - stop: () => { - bus.close(); - activeBuses.delete(account.accountId); - metricsSnapshots.delete(account.accountId); - ctx.log?.info(`[${account.accountId}] Nostr provider stopped`); - }, - }; - }, - }, -}; +}); /** * Get metrics snapshot for a Nostr account.