fix(discord): use per-channel message queues to restore parallel agent dispatch

Replace the single per-account messageQueue Promise chain in
DiscordMessageListener with per-channel queues. This restores parallel
processing for channel-bound agents that regressed in 2026.3.1.

Messages within the same channel remain serialized to preserve ordering,
while messages to different channels now proceed independently. Completed
queue entries are cleaned up to prevent memory accumulation.

Closes #31530
This commit is contained in:
SidQin-cyber 2026-03-03 00:51:05 +08:00 committed by Peter Steinberger
parent 5b63417fec
commit 479095bcfb
2 changed files with 69 additions and 14 deletions

View File

@ -8,6 +8,10 @@ function createLogger() {
};
}
function fakeEvent(channelId: string) {
return { channel_id: channelId } as never;
}
describe("DiscordMessageListener", () => {
it("returns immediately without awaiting handler completion", async () => {
let resolveHandler: (() => void) | undefined;
@ -20,7 +24,7 @@ describe("DiscordMessageListener", () => {
const logger = createLogger();
const listener = new DiscordMessageListener(handler as never, logger as never);
await expect(listener.handle({} as never, {} as never)).resolves.toBeUndefined();
await expect(listener.handle(fakeEvent("ch-1"), {} as never)).resolves.toBeUndefined();
expect(handler).toHaveBeenCalledTimes(1);
expect(logger.error).not.toHaveBeenCalled();
@ -28,7 +32,7 @@ describe("DiscordMessageListener", () => {
await handlerDone;
});
it("serializes queued handler runs while handle returns immediately", async () => {
it("serializes queued handler runs for the same channel", async () => {
let firstResolve: (() => void) | undefined;
let secondResolve: (() => void) | undefined;
const firstDone = new Promise<void>((resolve) => {
@ -48,10 +52,9 @@ describe("DiscordMessageListener", () => {
});
const listener = new DiscordMessageListener(handler as never, createLogger() as never);
await expect(listener.handle({} as never, {} as never)).resolves.toBeUndefined();
await expect(listener.handle({} as never, {} as never)).resolves.toBeUndefined();
await expect(listener.handle(fakeEvent("ch-1"), {} as never)).resolves.toBeUndefined();
await expect(listener.handle(fakeEvent("ch-1"), {} as never)).resolves.toBeUndefined();
// Second event is queued until the first handler run settles.
expect(handler).toHaveBeenCalledTimes(1);
firstResolve?.();
await vi.waitFor(() => {
@ -62,6 +65,48 @@ describe("DiscordMessageListener", () => {
await secondDone;
});
it("runs handlers for different channels in parallel", async () => {
let resolveA: (() => void) | undefined;
let resolveB: (() => void) | undefined;
const doneA = new Promise<void>((r) => {
resolveA = r;
});
const doneB = new Promise<void>((r) => {
resolveB = r;
});
const order: string[] = [];
const handler = vi.fn(async (data: { channel_id: string }) => {
order.push(`start:${data.channel_id}`);
if (data.channel_id === "ch-a") {
await doneA;
} else {
await doneB;
}
order.push(`end:${data.channel_id}`);
});
const listener = new DiscordMessageListener(handler as never, createLogger() as never);
await listener.handle(fakeEvent("ch-a"), {} as never);
await listener.handle(fakeEvent("ch-b"), {} as never);
await vi.waitFor(() => {
expect(handler).toHaveBeenCalledTimes(2);
});
expect(order).toContain("start:ch-a");
expect(order).toContain("start:ch-b");
resolveB?.();
await vi.waitFor(() => {
expect(order).toContain("end:ch-b");
});
expect(order).not.toContain("end:ch-a");
resolveA?.();
await vi.waitFor(() => {
expect(order).toContain("end:ch-a");
});
});
it("logs async handler failures", async () => {
const handler = vi.fn(async () => {
throw new Error("boom");
@ -69,7 +114,7 @@ describe("DiscordMessageListener", () => {
const logger = createLogger();
const listener = new DiscordMessageListener(handler as never, logger as never);
await expect(listener.handle({} as never, {} as never)).resolves.toBeUndefined();
await expect(listener.handle(fakeEvent("ch-1"), {} as never)).resolves.toBeUndefined();
await vi.waitFor(() => {
expect(logger.error).toHaveBeenCalledWith(
expect.stringContaining("discord handler failed: Error: boom"),

View File

@ -119,7 +119,7 @@ export function registerDiscordListener(listeners: Array<object>, listener: obje
}
export class DiscordMessageListener extends MessageCreateListener {
private messageQueue: Promise<void> = Promise.resolve();
private channelQueues = new Map<string, Promise<void>>();
constructor(
private handler: DiscordMessageHandler,
@ -131,9 +131,12 @@ export class DiscordMessageListener extends MessageCreateListener {
async handle(data: DiscordMessageEvent, client: Client) {
this.onEvent?.();
// Release Carbon's dispatch lane immediately, but keep our message handler
// serialized to avoid unbounded parallel model/IO work on traffic bursts.
this.messageQueue = this.messageQueue
const channelId = data.channel_id;
const prev = this.channelQueues.get(channelId) ?? Promise.resolve();
// Serialize messages within the same channel to preserve ordering,
// but allow different channels to proceed in parallel so that
// channel-bound agents are not blocked by each other.
const next = prev
.catch(() => {})
.then(() =>
runDiscordListenerWithSlowLog({
@ -147,10 +150,17 @@ export class DiscordMessageListener extends MessageCreateListener {
},
}),
);
void this.messageQueue.catch((err) => {
const logger = this.logger ?? discordEventQueueLog;
logger.error(danger(`discord handler failed: ${String(err)}`));
});
this.channelQueues.set(channelId, next);
void next
.then(() => {
if (this.channelQueues.get(channelId) === next) {
this.channelQueues.delete(channelId);
}
})
.catch((err) => {
const logger = this.logger ?? discordEventQueueLog;
logger.error(danger(`discord handler failed: ${String(err)}`));
});
}
}