diff --git a/CHANGELOG.md b/CHANGELOG.md index 83736de43a0..7694e4e68f4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ Docs: https://docs.openclaw.ai ### Changes +- Nostr/inbound DMs: verify inbound event signatures before pairing or sender-authorization side effects, so forged DM events no longer create pairing requests or trigger reply attempts. Thanks @smaeljaish771 and @vincentkoc. - LINE/outbound media: add LINE image, video, and audio outbound sends on the LINE-specific delivery path, including explicit preview/tracking handling for videos while keeping generic media sends on the existing image-only route. (#45826) Thanks @masatohoshino. - WhatsApp/reactions: agents can now react with emoji on incoming WhatsApp messages, enabling more natural conversational interactions like acknowledging a photo with ❤️ instead of typing a reply. Thanks @mcaxtr. - MCP: add remote HTTP/SSE server support for `mcp.servers` URL configs, including auth headers and safer config redaction for MCP credentials. (#50396) Thanks @dhananjai1729. diff --git a/extensions/nostr/src/nostr-bus.inbound.test.ts b/extensions/nostr/src/nostr-bus.inbound.test.ts index 86ca8fa8d22..0f5f6b274ad 100644 --- a/extensions/nostr/src/nostr-bus.inbound.test.ts +++ b/extensions/nostr/src/nostr-bus.inbound.test.ts @@ -101,7 +101,7 @@ describe("startNostrBus inbound guards", () => { mockState.handlers = null; }); - it("checks sender authorization before verify/decrypt", async () => { + it("checks sender authorization after verify and before decrypt", async () => { const onMessage = vi.fn(async () => {}); const authorizeSender = vi.fn(async () => "block" as const); const bus = await startNostrBus({ @@ -114,7 +114,7 @@ describe("startNostrBus inbound guards", () => { await emitEvent(createEvent()); expect(authorizeSender).toHaveBeenCalledTimes(1); - expect(mockState.verifyEvent).not.toHaveBeenCalled(); + expect(mockState.verifyEvent).toHaveBeenCalledTimes(1); expect(mockState.decrypt).not.toHaveBeenCalled(); expect(onMessage).not.toHaveBeenCalled(); expect(bus.getMetrics().eventsReceived).toBe(1); @@ -122,6 +122,28 @@ describe("startNostrBus inbound guards", () => { bus.close(); }); + it("rejects invalid signatures before sender authorization", async () => { + mockState.verifyEvent.mockReturnValueOnce(false); + const onMessage = vi.fn(async () => {}); + const authorizeSender = vi.fn(async () => "allow" as const); + const bus = await startNostrBus({ + privateKey: TEST_HEX_PRIVATE_KEY, + onMessage, + authorizeSender, + onMetric: () => {}, + }); + + await emitEvent(createEvent()); + + expect(mockState.verifyEvent).toHaveBeenCalledTimes(1); + expect(authorizeSender).not.toHaveBeenCalled(); + expect(mockState.decrypt).not.toHaveBeenCalled(); + expect(onMessage).not.toHaveBeenCalled(); + expect(bus.getMetrics().eventsRejected.invalidSignature).toBe(1); + + bus.close(); + }); + it("rate limits repeated events before decrypt", async () => { const onMessage = vi.fn(async () => {}); const bus = await startNostrBus({ @@ -146,6 +168,237 @@ describe("startNostrBus inbound guards", () => { bus.close(); }); + it("does not let a blocked sender starve a different verified sender", async () => { + const onMessage = vi.fn(async () => {}); + const authorizeSender = vi.fn(async ({ senderPubkey }: { senderPubkey: string }) => + senderPubkey.startsWith("blocked") ? ("block" as const) : ("allow" as const), + ); + const bus = await startNostrBus({ + privateKey: TEST_HEX_PRIVATE_KEY, + onMessage, + authorizeSender, + onMetric: () => {}, + guardPolicy: { + rateLimit: { + windowMs: 60_000, + maxGlobalPerWindow: 2, + maxPerSenderPerWindow: 1, + maxTrackedSenderKeys: 32, + }, + }, + }); + + await emitEvent( + createEvent({ + id: "blocked-event", + pubkey: `blocked${"a".repeat(57)}`, + }), + ); + await emitEvent( + createEvent({ + id: "allowed-event", + pubkey: `allowed${"b".repeat(57)}`, + }), + ); + + expect(authorizeSender).toHaveBeenCalledTimes(2); + expect(mockState.decrypt).toHaveBeenCalledTimes(1); + expect(onMessage).toHaveBeenCalledTimes(1); + expect(bus.getMetrics().eventsRejected.rateLimited).toBe(0); + + bus.close(); + }); + + it("dedupes replayed verified events that authorization blocks", async () => { + const onMessage = vi.fn(async () => {}); + const authorizeSender = vi.fn(async () => "block" as const); + const bus = await startNostrBus({ + privateKey: TEST_HEX_PRIVATE_KEY, + onMessage, + authorizeSender, + onMetric: () => {}, + }); + + const blockedEvent = createEvent({ + id: "blocked-replay", + pubkey: `blocked${"a".repeat(57)}`, + }); + + await emitEvent(blockedEvent); + await emitEvent(blockedEvent); + + expect(mockState.verifyEvent).toHaveBeenCalledTimes(1); + expect(authorizeSender).toHaveBeenCalledTimes(1); + expect(mockState.decrypt).not.toHaveBeenCalled(); + expect(onMessage).not.toHaveBeenCalled(); + + bus.close(); + }); + + it("does not rate limit an allowed sender while another authorization is still pending", async () => { + const onMessage = vi.fn(async () => {}); + let resolveBlocked: ((value: "block") => void) | undefined; + const blockedPromise = new Promise<"block">((resolve) => { + resolveBlocked = resolve; + }); + const authorizeSender = vi + .fn<(params: { senderPubkey: string }) => Promise<"allow" | "block" | "pairing">>() + .mockImplementationOnce(async () => await blockedPromise) + .mockResolvedValueOnce("allow"); + const bus = await startNostrBus({ + privateKey: TEST_HEX_PRIVATE_KEY, + onMessage, + authorizeSender, + onMetric: () => {}, + guardPolicy: { + rateLimit: { + windowMs: 60_000, + maxGlobalPerWindow: 2, + maxPerSenderPerWindow: 1, + maxTrackedSenderKeys: 32, + }, + }, + }); + + const blockedEventPromise = emitEvent( + createEvent({ + id: "blocked-pending", + pubkey: `blocked${"a".repeat(57)}`, + }), + ); + await emitEvent( + createEvent({ + id: "allowed-during-pending-auth", + pubkey: `allowed${"b".repeat(57)}`, + }), + ); + resolveBlocked?.("block"); + await blockedEventPromise; + + expect(authorizeSender).toHaveBeenCalledTimes(2); + expect(mockState.decrypt).toHaveBeenCalledTimes(1); + expect(onMessage).toHaveBeenCalledTimes(1); + expect(bus.getMetrics().eventsRejected.rateLimited).toBe(0); + + bus.close(); + }); + + it("rate limits repeated invalid signatures before authorization work fans out", async () => { + mockState.verifyEvent.mockReturnValue(false); + const onMessage = vi.fn(async () => {}); + const authorizeSender = vi.fn(async () => "allow" as const); + const bus = await startNostrBus({ + privateKey: TEST_HEX_PRIVATE_KEY, + onMessage, + authorizeSender, + onMetric: () => {}, + guardPolicy: { + rateLimit: { + windowMs: 60_000, + maxGlobalPerWindow: 1, + maxPerSenderPerWindow: 10, + maxTrackedSenderKeys: 32, + }, + }, + }); + + await emitEvent(createEvent({ id: "invalid-1" })); + await emitEvent(createEvent({ id: "invalid-2" })); + + expect(mockState.verifyEvent).toHaveBeenCalledTimes(1); + expect(authorizeSender).not.toHaveBeenCalled(); + expect(bus.getMetrics().eventsRejected.invalidSignature).toBe(1); + expect(bus.getMetrics().eventsRejected.rateLimited).toBe(1); + + bus.close(); + }); + + it("counts oversized ciphertext toward the global inbound rate limit", async () => { + const onMessage = vi.fn(async () => {}); + const bus = await startNostrBus({ + privateKey: TEST_HEX_PRIVATE_KEY, + onMessage, + onMetric: () => {}, + guardPolicy: { + maxCiphertextBytes: 4, + rateLimit: { + windowMs: 60_000, + maxGlobalPerWindow: 1, + maxPerSenderPerWindow: 10, + maxTrackedSenderKeys: 32, + }, + }, + }); + + await emitEvent( + createEvent({ + id: "oversized-global-1", + pubkey: `sender1${"a".repeat(57)}`, + content: "ciphertext-too-large", + }), + ); + await emitEvent( + createEvent({ + id: "oversized-global-2", + pubkey: `sender2${"b".repeat(57)}`, + content: "ciphertext-too-large", + }), + ); + + expect(bus.getMetrics().eventsRejected.oversizedCiphertext).toBe(1); + expect(bus.getMetrics().eventsRejected.rateLimited).toBe(1); + expect(mockState.verifyEvent).not.toHaveBeenCalled(); + expect(mockState.decrypt).not.toHaveBeenCalled(); + expect(onMessage).not.toHaveBeenCalled(); + + bus.close(); + }); + + it("does not spend per-sender buckets on oversized ciphertext before verification", async () => { + const onMessage = vi.fn(async () => {}); + const bus = await startNostrBus({ + privateKey: TEST_HEX_PRIVATE_KEY, + onMessage, + onMetric: () => {}, + guardPolicy: { + maxCiphertextBytes: 4, + rateLimit: { + windowMs: 60_000, + maxGlobalPerWindow: 10, + maxPerSenderPerWindow: 1, + maxTrackedSenderKeys: 32, + }, + }, + }); + + await emitEvent( + createEvent({ + id: "oversized-sender-1", + content: "ciphertext-too-large", + }), + ); + await emitEvent( + createEvent({ + id: "oversized-sender-2", + content: "ciphertext-too-large", + }), + ); + await emitEvent( + createEvent({ + id: "allowed-after-oversized", + content: "ok", + }), + ); + + expect(bus.getMetrics().eventsRejected.oversizedCiphertext).toBe(2); + expect(bus.getMetrics().eventsRejected.rateLimited).toBe(0); + expect(mockState.verifyEvent).toHaveBeenCalledTimes(1); + expect(mockState.decrypt).toHaveBeenCalledTimes(1); + expect(onMessage).toHaveBeenCalledTimes(1); + + bus.close(); + }); + it("rejects far-future events before crypto", async () => { const onMessage = vi.fn(async () => {}); const bus = await startNostrBus({ diff --git a/extensions/nostr/src/nostr-bus.ts b/extensions/nostr/src/nostr-bus.ts index 703e05897f3..943ce373526 100644 --- a/extensions/nostr/src/nostr-bus.ts +++ b/extensions/nostr/src/nostr-bus.ts @@ -64,7 +64,7 @@ export interface NostrBusOptions { reply: (text: string) => Promise, meta: { eventId: string; createdAt: number }, ) => Promise; - /** Called before expensive crypto to allow sender policy checks (optional) */ + /** Called after signature verification and before decrypt to allow sender policy checks (optional) */ authorizeSender?: (params: { senderPubkey: string; reply: (text: string) => Promise; @@ -553,36 +553,47 @@ export async function startNostrBus(options: NostrBusOptions): Promise { + updateRateLimiterSizeMetric(); + if (globalRateLimiter.isRateLimited("global")) { + metrics.emit("rate_limit.global"); + metrics.emit("event.rejected.rate_limited"); + updateRateLimiterSizeMetric(); + return true; } - } + updateRateLimiterSizeMetric(); + return false; + }; - updateRateLimiterSizeMetric(); - if (globalRateLimiter.isRateLimited("global")) { - metrics.emit("rate_limit.global"); - metrics.emit("event.rejected.rate_limited"); + const rejectIfVerifiedSenderRateLimited = (): boolean => { updateRateLimiterSizeMetric(); - return; - } - if (perSenderRateLimiter.isRateLimited(event.pubkey)) { - metrics.emit("rate_limit.per_sender"); - metrics.emit("event.rejected.rate_limited"); + if (perSenderRateLimiter.isRateLimited(event.pubkey)) { + metrics.emit("rate_limit.per_sender"); + metrics.emit("event.rejected.rate_limited"); + updateRateLimiterSizeMetric(); + return true; + } updateRateLimiterSizeMetric(); - return; - } - updateRateLimiterSizeMetric(); + return false; + }; + + const markSeen = () => { + seen.add(event.id); + metrics.emit("memory.seen_tracker_size", seen.size()); + }; if (Buffer.byteLength(event.content, "utf8") > guardPolicy.maxCiphertextBytes) { + if (rejectIfGlobalRateLimited()) { + return; + } metrics.emit("event.rejected.oversized_ciphertext"); return; } + if (rejectIfGlobalRateLimited()) { + return; + } + // Verify signature (must pass before we trust the event) if (!verifyEvent(event)) { metrics.emit("event.rejected.invalid_signature"); @@ -590,9 +601,23 @@ export async function startNostrBus(options: NostrBusOptions): Promise