From 479095bcfba3dd7fbc87f9f9610b15315f572ead Mon Sep 17 00:00:00 2001 From: SidQin-cyber Date: Tue, 3 Mar 2026 00:51:05 +0800 Subject: [PATCH] fix(discord): use per-channel message queues to restore parallel agent dispatch Replace the single per-account messageQueue Promise chain in DiscordMessageListener with per-channel queues. This restores parallel processing for channel-bound agents that regressed in 2026.3.1. Messages within the same channel remain serialized to preserve ordering, while messages to different channels now proceed independently. Completed queue entries are cleaned up to prevent memory accumulation. Closes #31530 --- src/discord/monitor/listeners.test.ts | 57 ++++++++++++++++++++++++--- src/discord/monitor/listeners.ts | 26 ++++++++---- 2 files changed, 69 insertions(+), 14 deletions(-) diff --git a/src/discord/monitor/listeners.test.ts b/src/discord/monitor/listeners.test.ts index 00eef1cb014..6264ab218db 100644 --- a/src/discord/monitor/listeners.test.ts +++ b/src/discord/monitor/listeners.test.ts @@ -8,6 +8,10 @@ function createLogger() { }; } +function fakeEvent(channelId: string) { + return { channel_id: channelId } as never; +} + describe("DiscordMessageListener", () => { it("returns immediately without awaiting handler completion", async () => { let resolveHandler: (() => void) | undefined; @@ -20,7 +24,7 @@ describe("DiscordMessageListener", () => { const logger = createLogger(); const listener = new DiscordMessageListener(handler as never, logger as never); - await expect(listener.handle({} as never, {} as never)).resolves.toBeUndefined(); + await expect(listener.handle(fakeEvent("ch-1"), {} as never)).resolves.toBeUndefined(); expect(handler).toHaveBeenCalledTimes(1); expect(logger.error).not.toHaveBeenCalled(); @@ -28,7 +32,7 @@ describe("DiscordMessageListener", () => { await handlerDone; }); - it("serializes queued handler runs while handle returns immediately", async () => { + it("serializes queued handler runs for the same channel", async () => { let firstResolve: (() => void) | undefined; let secondResolve: (() => void) | undefined; const firstDone = new Promise((resolve) => { @@ -48,10 +52,9 @@ describe("DiscordMessageListener", () => { }); const listener = new DiscordMessageListener(handler as never, createLogger() as never); - await expect(listener.handle({} as never, {} as never)).resolves.toBeUndefined(); - await expect(listener.handle({} as never, {} as never)).resolves.toBeUndefined(); + await expect(listener.handle(fakeEvent("ch-1"), {} as never)).resolves.toBeUndefined(); + await expect(listener.handle(fakeEvent("ch-1"), {} as never)).resolves.toBeUndefined(); - // Second event is queued until the first handler run settles. expect(handler).toHaveBeenCalledTimes(1); firstResolve?.(); await vi.waitFor(() => { @@ -62,6 +65,48 @@ describe("DiscordMessageListener", () => { await secondDone; }); + it("runs handlers for different channels in parallel", async () => { + let resolveA: (() => void) | undefined; + let resolveB: (() => void) | undefined; + const doneA = new Promise((r) => { + resolveA = r; + }); + const doneB = new Promise((r) => { + resolveB = r; + }); + const order: string[] = []; + const handler = vi.fn(async (data: { channel_id: string }) => { + order.push(`start:${data.channel_id}`); + if (data.channel_id === "ch-a") { + await doneA; + } else { + await doneB; + } + order.push(`end:${data.channel_id}`); + }); + const listener = new DiscordMessageListener(handler as never, createLogger() as never); + + await listener.handle(fakeEvent("ch-a"), {} as never); + await listener.handle(fakeEvent("ch-b"), {} as never); + + await vi.waitFor(() => { + expect(handler).toHaveBeenCalledTimes(2); + }); + expect(order).toContain("start:ch-a"); + expect(order).toContain("start:ch-b"); + + resolveB?.(); + await vi.waitFor(() => { + expect(order).toContain("end:ch-b"); + }); + expect(order).not.toContain("end:ch-a"); + + resolveA?.(); + await vi.waitFor(() => { + expect(order).toContain("end:ch-a"); + }); + }); + it("logs async handler failures", async () => { const handler = vi.fn(async () => { throw new Error("boom"); @@ -69,7 +114,7 @@ describe("DiscordMessageListener", () => { const logger = createLogger(); const listener = new DiscordMessageListener(handler as never, logger as never); - await expect(listener.handle({} as never, {} as never)).resolves.toBeUndefined(); + await expect(listener.handle(fakeEvent("ch-1"), {} as never)).resolves.toBeUndefined(); await vi.waitFor(() => { expect(logger.error).toHaveBeenCalledWith( expect.stringContaining("discord handler failed: Error: boom"), diff --git a/src/discord/monitor/listeners.ts b/src/discord/monitor/listeners.ts index 0afd31c9258..516b863e466 100644 --- a/src/discord/monitor/listeners.ts +++ b/src/discord/monitor/listeners.ts @@ -119,7 +119,7 @@ export function registerDiscordListener(listeners: Array, listener: obje } export class DiscordMessageListener extends MessageCreateListener { - private messageQueue: Promise = Promise.resolve(); + private channelQueues = new Map>(); constructor( private handler: DiscordMessageHandler, @@ -131,9 +131,12 @@ export class DiscordMessageListener extends MessageCreateListener { async handle(data: DiscordMessageEvent, client: Client) { this.onEvent?.(); - // Release Carbon's dispatch lane immediately, but keep our message handler - // serialized to avoid unbounded parallel model/IO work on traffic bursts. - this.messageQueue = this.messageQueue + const channelId = data.channel_id; + const prev = this.channelQueues.get(channelId) ?? Promise.resolve(); + // Serialize messages within the same channel to preserve ordering, + // but allow different channels to proceed in parallel so that + // channel-bound agents are not blocked by each other. + const next = prev .catch(() => {}) .then(() => runDiscordListenerWithSlowLog({ @@ -147,10 +150,17 @@ export class DiscordMessageListener extends MessageCreateListener { }, }), ); - void this.messageQueue.catch((err) => { - const logger = this.logger ?? discordEventQueueLog; - logger.error(danger(`discord handler failed: ${String(err)}`)); - }); + this.channelQueues.set(channelId, next); + void next + .then(() => { + if (this.channelQueues.get(channelId) === next) { + this.channelQueues.delete(channelId); + } + }) + .catch((err) => { + const logger = this.logger ?? discordEventQueueLog; + logger.error(danger(`discord handler failed: ${String(err)}`)); + }); } }