fix(nostr): verify inbound dm signatures before pairing replies (#58236)

* fix(nostr): verify inbound dm signatures before pairing

* fix(nostr): authorize senders before rate limiting

* test(nostr): cover pending auth rate-limit starvation

* fix(nostr): rate limit oversized inbound ciphertext

* fix(nostr): dedupe blocked inbound replays

* fix(nostr): rate limit before auth work
This commit is contained in:
Vincent Koc 2026-03-31 22:51:22 +09:00 committed by GitHub
parent 5fc8f6ca8f
commit 4ee742174f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 304 additions and 25 deletions

View File

@ -11,6 +11,7 @@ Docs: https://docs.openclaw.ai
### Changes
- Nostr/inbound DMs: verify inbound event signatures before pairing or sender-authorization side effects, so forged DM events no longer create pairing requests or trigger reply attempts. Thanks @smaeljaish771 and @vincentkoc.
- LINE/outbound media: add LINE image, video, and audio outbound sends on the LINE-specific delivery path, including explicit preview/tracking handling for videos while keeping generic media sends on the existing image-only route. (#45826) Thanks @masatohoshino.
- WhatsApp/reactions: agents can now react with emoji on incoming WhatsApp messages, enabling more natural conversational interactions like acknowledging a photo with ❤️ instead of typing a reply. Thanks @mcaxtr.
- MCP: add remote HTTP/SSE server support for `mcp.servers` URL configs, including auth headers and safer config redaction for MCP credentials. (#50396) Thanks @dhananjai1729.

View File

@ -101,7 +101,7 @@ describe("startNostrBus inbound guards", () => {
mockState.handlers = null;
});
it("checks sender authorization before verify/decrypt", async () => {
it("checks sender authorization after verify and before decrypt", async () => {
const onMessage = vi.fn(async () => {});
const authorizeSender = vi.fn(async () => "block" as const);
const bus = await startNostrBus({
@ -114,7 +114,7 @@ describe("startNostrBus inbound guards", () => {
await emitEvent(createEvent());
expect(authorizeSender).toHaveBeenCalledTimes(1);
expect(mockState.verifyEvent).not.toHaveBeenCalled();
expect(mockState.verifyEvent).toHaveBeenCalledTimes(1);
expect(mockState.decrypt).not.toHaveBeenCalled();
expect(onMessage).not.toHaveBeenCalled();
expect(bus.getMetrics().eventsReceived).toBe(1);
@ -122,6 +122,28 @@ describe("startNostrBus inbound guards", () => {
bus.close();
});
it("rejects invalid signatures before sender authorization", async () => {
mockState.verifyEvent.mockReturnValueOnce(false);
const onMessage = vi.fn(async () => {});
const authorizeSender = vi.fn(async () => "allow" as const);
const bus = await startNostrBus({
privateKey: TEST_HEX_PRIVATE_KEY,
onMessage,
authorizeSender,
onMetric: () => {},
});
await emitEvent(createEvent());
expect(mockState.verifyEvent).toHaveBeenCalledTimes(1);
expect(authorizeSender).not.toHaveBeenCalled();
expect(mockState.decrypt).not.toHaveBeenCalled();
expect(onMessage).not.toHaveBeenCalled();
expect(bus.getMetrics().eventsRejected.invalidSignature).toBe(1);
bus.close();
});
it("rate limits repeated events before decrypt", async () => {
const onMessage = vi.fn(async () => {});
const bus = await startNostrBus({
@ -146,6 +168,237 @@ describe("startNostrBus inbound guards", () => {
bus.close();
});
it("does not let a blocked sender starve a different verified sender", async () => {
const onMessage = vi.fn(async () => {});
const authorizeSender = vi.fn(async ({ senderPubkey }: { senderPubkey: string }) =>
senderPubkey.startsWith("blocked") ? ("block" as const) : ("allow" as const),
);
const bus = await startNostrBus({
privateKey: TEST_HEX_PRIVATE_KEY,
onMessage,
authorizeSender,
onMetric: () => {},
guardPolicy: {
rateLimit: {
windowMs: 60_000,
maxGlobalPerWindow: 2,
maxPerSenderPerWindow: 1,
maxTrackedSenderKeys: 32,
},
},
});
await emitEvent(
createEvent({
id: "blocked-event",
pubkey: `blocked${"a".repeat(57)}`,
}),
);
await emitEvent(
createEvent({
id: "allowed-event",
pubkey: `allowed${"b".repeat(57)}`,
}),
);
expect(authorizeSender).toHaveBeenCalledTimes(2);
expect(mockState.decrypt).toHaveBeenCalledTimes(1);
expect(onMessage).toHaveBeenCalledTimes(1);
expect(bus.getMetrics().eventsRejected.rateLimited).toBe(0);
bus.close();
});
it("dedupes replayed verified events that authorization blocks", async () => {
const onMessage = vi.fn(async () => {});
const authorizeSender = vi.fn(async () => "block" as const);
const bus = await startNostrBus({
privateKey: TEST_HEX_PRIVATE_KEY,
onMessage,
authorizeSender,
onMetric: () => {},
});
const blockedEvent = createEvent({
id: "blocked-replay",
pubkey: `blocked${"a".repeat(57)}`,
});
await emitEvent(blockedEvent);
await emitEvent(blockedEvent);
expect(mockState.verifyEvent).toHaveBeenCalledTimes(1);
expect(authorizeSender).toHaveBeenCalledTimes(1);
expect(mockState.decrypt).not.toHaveBeenCalled();
expect(onMessage).not.toHaveBeenCalled();
bus.close();
});
it("does not rate limit an allowed sender while another authorization is still pending", async () => {
const onMessage = vi.fn(async () => {});
let resolveBlocked: ((value: "block") => void) | undefined;
const blockedPromise = new Promise<"block">((resolve) => {
resolveBlocked = resolve;
});
const authorizeSender = vi
.fn<(params: { senderPubkey: string }) => Promise<"allow" | "block" | "pairing">>()
.mockImplementationOnce(async () => await blockedPromise)
.mockResolvedValueOnce("allow");
const bus = await startNostrBus({
privateKey: TEST_HEX_PRIVATE_KEY,
onMessage,
authorizeSender,
onMetric: () => {},
guardPolicy: {
rateLimit: {
windowMs: 60_000,
maxGlobalPerWindow: 2,
maxPerSenderPerWindow: 1,
maxTrackedSenderKeys: 32,
},
},
});
const blockedEventPromise = emitEvent(
createEvent({
id: "blocked-pending",
pubkey: `blocked${"a".repeat(57)}`,
}),
);
await emitEvent(
createEvent({
id: "allowed-during-pending-auth",
pubkey: `allowed${"b".repeat(57)}`,
}),
);
resolveBlocked?.("block");
await blockedEventPromise;
expect(authorizeSender).toHaveBeenCalledTimes(2);
expect(mockState.decrypt).toHaveBeenCalledTimes(1);
expect(onMessage).toHaveBeenCalledTimes(1);
expect(bus.getMetrics().eventsRejected.rateLimited).toBe(0);
bus.close();
});
it("rate limits repeated invalid signatures before authorization work fans out", async () => {
mockState.verifyEvent.mockReturnValue(false);
const onMessage = vi.fn(async () => {});
const authorizeSender = vi.fn(async () => "allow" as const);
const bus = await startNostrBus({
privateKey: TEST_HEX_PRIVATE_KEY,
onMessage,
authorizeSender,
onMetric: () => {},
guardPolicy: {
rateLimit: {
windowMs: 60_000,
maxGlobalPerWindow: 1,
maxPerSenderPerWindow: 10,
maxTrackedSenderKeys: 32,
},
},
});
await emitEvent(createEvent({ id: "invalid-1" }));
await emitEvent(createEvent({ id: "invalid-2" }));
expect(mockState.verifyEvent).toHaveBeenCalledTimes(1);
expect(authorizeSender).not.toHaveBeenCalled();
expect(bus.getMetrics().eventsRejected.invalidSignature).toBe(1);
expect(bus.getMetrics().eventsRejected.rateLimited).toBe(1);
bus.close();
});
it("counts oversized ciphertext toward the global inbound rate limit", async () => {
const onMessage = vi.fn(async () => {});
const bus = await startNostrBus({
privateKey: TEST_HEX_PRIVATE_KEY,
onMessage,
onMetric: () => {},
guardPolicy: {
maxCiphertextBytes: 4,
rateLimit: {
windowMs: 60_000,
maxGlobalPerWindow: 1,
maxPerSenderPerWindow: 10,
maxTrackedSenderKeys: 32,
},
},
});
await emitEvent(
createEvent({
id: "oversized-global-1",
pubkey: `sender1${"a".repeat(57)}`,
content: "ciphertext-too-large",
}),
);
await emitEvent(
createEvent({
id: "oversized-global-2",
pubkey: `sender2${"b".repeat(57)}`,
content: "ciphertext-too-large",
}),
);
expect(bus.getMetrics().eventsRejected.oversizedCiphertext).toBe(1);
expect(bus.getMetrics().eventsRejected.rateLimited).toBe(1);
expect(mockState.verifyEvent).not.toHaveBeenCalled();
expect(mockState.decrypt).not.toHaveBeenCalled();
expect(onMessage).not.toHaveBeenCalled();
bus.close();
});
it("does not spend per-sender buckets on oversized ciphertext before verification", async () => {
const onMessage = vi.fn(async () => {});
const bus = await startNostrBus({
privateKey: TEST_HEX_PRIVATE_KEY,
onMessage,
onMetric: () => {},
guardPolicy: {
maxCiphertextBytes: 4,
rateLimit: {
windowMs: 60_000,
maxGlobalPerWindow: 10,
maxPerSenderPerWindow: 1,
maxTrackedSenderKeys: 32,
},
},
});
await emitEvent(
createEvent({
id: "oversized-sender-1",
content: "ciphertext-too-large",
}),
);
await emitEvent(
createEvent({
id: "oversized-sender-2",
content: "ciphertext-too-large",
}),
);
await emitEvent(
createEvent({
id: "allowed-after-oversized",
content: "ok",
}),
);
expect(bus.getMetrics().eventsRejected.oversizedCiphertext).toBe(2);
expect(bus.getMetrics().eventsRejected.rateLimited).toBe(0);
expect(mockState.verifyEvent).toHaveBeenCalledTimes(1);
expect(mockState.decrypt).toHaveBeenCalledTimes(1);
expect(onMessage).toHaveBeenCalledTimes(1);
bus.close();
});
it("rejects far-future events before crypto", async () => {
const onMessage = vi.fn(async () => {});
const bus = await startNostrBus({

View File

@ -64,7 +64,7 @@ export interface NostrBusOptions {
reply: (text: string) => Promise<void>,
meta: { eventId: string; createdAt: number },
) => Promise<void>;
/** Called before expensive crypto to allow sender policy checks (optional) */
/** Called after signature verification and before decrypt to allow sender policy checks (optional) */
authorizeSender?: (params: {
senderPubkey: string;
reply: (text: string) => Promise<void>;
@ -553,36 +553,47 @@ export async function startNostrBus(options: NostrBusOptions): Promise<NostrBusH
);
};
if (authorizeSender) {
const decision = await authorizeSender({
senderPubkey: event.pubkey,
reply: replyTo,
});
if (decision !== "allow") {
return;
const rejectIfGlobalRateLimited = (): boolean => {
updateRateLimiterSizeMetric();
if (globalRateLimiter.isRateLimited("global")) {
metrics.emit("rate_limit.global");
metrics.emit("event.rejected.rate_limited");
updateRateLimiterSizeMetric();
return true;
}
}
updateRateLimiterSizeMetric();
return false;
};
updateRateLimiterSizeMetric();
if (globalRateLimiter.isRateLimited("global")) {
metrics.emit("rate_limit.global");
metrics.emit("event.rejected.rate_limited");
const rejectIfVerifiedSenderRateLimited = (): boolean => {
updateRateLimiterSizeMetric();
return;
}
if (perSenderRateLimiter.isRateLimited(event.pubkey)) {
metrics.emit("rate_limit.per_sender");
metrics.emit("event.rejected.rate_limited");
if (perSenderRateLimiter.isRateLimited(event.pubkey)) {
metrics.emit("rate_limit.per_sender");
metrics.emit("event.rejected.rate_limited");
updateRateLimiterSizeMetric();
return true;
}
updateRateLimiterSizeMetric();
return;
}
updateRateLimiterSizeMetric();
return false;
};
const markSeen = () => {
seen.add(event.id);
metrics.emit("memory.seen_tracker_size", seen.size());
};
if (Buffer.byteLength(event.content, "utf8") > guardPolicy.maxCiphertextBytes) {
if (rejectIfGlobalRateLimited()) {
return;
}
metrics.emit("event.rejected.oversized_ciphertext");
return;
}
if (rejectIfGlobalRateLimited()) {
return;
}
// Verify signature (must pass before we trust the event)
if (!verifyEvent(event)) {
metrics.emit("event.rejected.invalid_signature");
@ -590,9 +601,23 @@ export async function startNostrBus(options: NostrBusOptions): Promise<NostrBusH
return;
}
if (rejectIfVerifiedSenderRateLimited()) {
return;
}
if (authorizeSender) {
const decision = await authorizeSender({
senderPubkey: event.pubkey,
reply: replyTo,
});
if (decision !== "allow") {
markSeen();
return;
}
}
// Mark seen AFTER verify (don't cache invalid IDs)
seen.add(event.id);
metrics.emit("memory.seen_tracker_size", seen.size());
markSeen();
// Decrypt the message
let plaintext: string;