diff --git a/src/discord/monitor/listeners.test.ts b/src/discord/monitor/listeners.test.ts index 00eef1cb014..6264ab218db 100644 --- a/src/discord/monitor/listeners.test.ts +++ b/src/discord/monitor/listeners.test.ts @@ -8,6 +8,10 @@ function createLogger() { }; } +function fakeEvent(channelId: string) { + return { channel_id: channelId } as never; +} + describe("DiscordMessageListener", () => { it("returns immediately without awaiting handler completion", async () => { let resolveHandler: (() => void) | undefined; @@ -20,7 +24,7 @@ describe("DiscordMessageListener", () => { const logger = createLogger(); const listener = new DiscordMessageListener(handler as never, logger as never); - await expect(listener.handle({} as never, {} as never)).resolves.toBeUndefined(); + await expect(listener.handle(fakeEvent("ch-1"), {} as never)).resolves.toBeUndefined(); expect(handler).toHaveBeenCalledTimes(1); expect(logger.error).not.toHaveBeenCalled(); @@ -28,7 +32,7 @@ describe("DiscordMessageListener", () => { await handlerDone; }); - it("serializes queued handler runs while handle returns immediately", async () => { + it("serializes queued handler runs for the same channel", async () => { let firstResolve: (() => void) | undefined; let secondResolve: (() => void) | undefined; const firstDone = new Promise((resolve) => { @@ -48,10 +52,9 @@ describe("DiscordMessageListener", () => { }); const listener = new DiscordMessageListener(handler as never, createLogger() as never); - await expect(listener.handle({} as never, {} as never)).resolves.toBeUndefined(); - await expect(listener.handle({} as never, {} as never)).resolves.toBeUndefined(); + await expect(listener.handle(fakeEvent("ch-1"), {} as never)).resolves.toBeUndefined(); + await expect(listener.handle(fakeEvent("ch-1"), {} as never)).resolves.toBeUndefined(); - // Second event is queued until the first handler run settles. expect(handler).toHaveBeenCalledTimes(1); firstResolve?.(); await vi.waitFor(() => { @@ -62,6 +65,48 @@ describe("DiscordMessageListener", () => { await secondDone; }); + it("runs handlers for different channels in parallel", async () => { + let resolveA: (() => void) | undefined; + let resolveB: (() => void) | undefined; + const doneA = new Promise((r) => { + resolveA = r; + }); + const doneB = new Promise((r) => { + resolveB = r; + }); + const order: string[] = []; + const handler = vi.fn(async (data: { channel_id: string }) => { + order.push(`start:${data.channel_id}`); + if (data.channel_id === "ch-a") { + await doneA; + } else { + await doneB; + } + order.push(`end:${data.channel_id}`); + }); + const listener = new DiscordMessageListener(handler as never, createLogger() as never); + + await listener.handle(fakeEvent("ch-a"), {} as never); + await listener.handle(fakeEvent("ch-b"), {} as never); + + await vi.waitFor(() => { + expect(handler).toHaveBeenCalledTimes(2); + }); + expect(order).toContain("start:ch-a"); + expect(order).toContain("start:ch-b"); + + resolveB?.(); + await vi.waitFor(() => { + expect(order).toContain("end:ch-b"); + }); + expect(order).not.toContain("end:ch-a"); + + resolveA?.(); + await vi.waitFor(() => { + expect(order).toContain("end:ch-a"); + }); + }); + it("logs async handler failures", async () => { const handler = vi.fn(async () => { throw new Error("boom"); @@ -69,7 +114,7 @@ describe("DiscordMessageListener", () => { const logger = createLogger(); const listener = new DiscordMessageListener(handler as never, logger as never); - await expect(listener.handle({} as never, {} as never)).resolves.toBeUndefined(); + await expect(listener.handle(fakeEvent("ch-1"), {} as never)).resolves.toBeUndefined(); await vi.waitFor(() => { expect(logger.error).toHaveBeenCalledWith( expect.stringContaining("discord handler failed: Error: boom"), diff --git a/src/discord/monitor/listeners.ts b/src/discord/monitor/listeners.ts index 0afd31c9258..516b863e466 100644 --- a/src/discord/monitor/listeners.ts +++ b/src/discord/monitor/listeners.ts @@ -119,7 +119,7 @@ export function registerDiscordListener(listeners: Array, listener: obje } export class DiscordMessageListener extends MessageCreateListener { - private messageQueue: Promise = Promise.resolve(); + private channelQueues = new Map>(); constructor( private handler: DiscordMessageHandler, @@ -131,9 +131,12 @@ export class DiscordMessageListener extends MessageCreateListener { async handle(data: DiscordMessageEvent, client: Client) { this.onEvent?.(); - // Release Carbon's dispatch lane immediately, but keep our message handler - // serialized to avoid unbounded parallel model/IO work on traffic bursts. - this.messageQueue = this.messageQueue + const channelId = data.channel_id; + const prev = this.channelQueues.get(channelId) ?? Promise.resolve(); + // Serialize messages within the same channel to preserve ordering, + // but allow different channels to proceed in parallel so that + // channel-bound agents are not blocked by each other. + const next = prev .catch(() => {}) .then(() => runDiscordListenerWithSlowLog({ @@ -147,10 +150,17 @@ export class DiscordMessageListener extends MessageCreateListener { }, }), ); - void this.messageQueue.catch((err) => { - const logger = this.logger ?? discordEventQueueLog; - logger.error(danger(`discord handler failed: ${String(err)}`)); - }); + this.channelQueues.set(channelId, next); + void next + .then(() => { + if (this.channelQueues.get(channelId) === next) { + this.channelQueues.delete(channelId); + } + }) + .catch((err) => { + const logger = this.logger ?? discordEventQueueLog; + logger.error(danger(`discord handler failed: ${String(err)}`)); + }); } }