diff --git a/extensions/matrix/src/matrix/sdk.test.ts b/extensions/matrix/src/matrix/sdk.test.ts index d76dcbdb2c3..b5170fba545 100644 --- a/extensions/matrix/src/matrix/sdk.test.ts +++ b/extensions/matrix/src/matrix/sdk.test.ts @@ -255,6 +255,75 @@ describe("MatrixClient request hardening", () => { }); }); + it("serializes outbound sends per room across message and event sends", async () => { + const client = new MatrixClient("https://matrix.example.org", "token"); + let releaseFirst: (() => void) | undefined; + const started: string[] = []; + matrixJsClient.sendMessage = vi.fn(async () => { + started.push("message"); + await new Promise((resolve) => { + releaseFirst = resolve; + }); + return { event_id: "$message" }; + }); + matrixJsClient.sendEvent = vi.fn(async () => { + started.push("event"); + return { event_id: "$event" }; + }); + + const first = client.sendMessage("!room:example.org", { + msgtype: "m.text", + body: "hello", + }); + const second = client.sendEvent("!room:example.org", "m.reaction", { + "m.relates_to": { event_id: "$target", key: "👍", rel_type: "m.annotation" }, + }); + + await Promise.resolve(); + await Promise.resolve(); + expect(started).toEqual(["message"]); + expect(matrixJsClient.sendEvent).not.toHaveBeenCalled(); + + releaseFirst?.(); + + await expect(first).resolves.toBe("$message"); + await expect(second).resolves.toBe("$event"); + expect(started).toEqual(["message", "event"]); + }); + + it("does not serialize sends across different rooms", async () => { + const client = new MatrixClient("https://matrix.example.org", "token"); + let releaseFirst: (() => void) | undefined; + const started: string[] = []; + matrixJsClient.sendMessage = vi.fn(async (roomId: string) => { + started.push(roomId); + if (roomId === "!room-a:example.org") { + await new Promise((resolve) => { + releaseFirst = resolve; + }); + } + return { event_id: `$${roomId}` }; + }); + + const first = client.sendMessage("!room-a:example.org", { + msgtype: "m.text", + body: "a", + }); + const second = client.sendMessage("!room-b:example.org", { + msgtype: "m.text", + body: "b", + }); + + await Promise.resolve(); + await Promise.resolve(); + expect(started).toEqual(["!room-a:example.org", "!room-b:example.org"]); + + releaseFirst?.(); + + await expect(first).resolves.toBe("$!room-a:example.org"); + await expect(second).resolves.toBe("$!room-b:example.org"); + }); + it("maps relations pages back to raw events", async () => { const client = new MatrixClient("https://matrix.example.org", "token"); matrixJsClient.relations = vi.fn(async () => ({ diff --git a/extensions/matrix/src/matrix/sdk.ts b/extensions/matrix/src/matrix/sdk.ts index 1a8c220d6ec..31aa518ba87 100644 --- a/extensions/matrix/src/matrix/sdk.ts +++ b/extensions/matrix/src/matrix/sdk.ts @@ -9,6 +9,7 @@ import { type MatrixEvent, } from "matrix-js-sdk"; import { VerificationMethod } from "matrix-js-sdk/lib/types.js"; +import { KeyedAsyncQueue } from "openclaw/plugin-sdk/keyed-async-queue"; import { resolveMatrixRoomKeyBackupReadinessError } from "./backup-health.js"; import { createMatrixJsSdkClientLogger } from "./client/logging.js"; import { MatrixCryptoBootstrapper } from "./sdk/crypto-bootstrap.js"; @@ -169,6 +170,7 @@ export class MatrixClient { private cryptoInitialized = false; private readonly decryptBridge: MatrixDecryptBridge; private readonly verificationManager = new MatrixVerificationManager(); + private readonly sendQueue = new KeyedAsyncQueue(); private readonly recoveryKeyStore: MatrixRecoveryKeyStore; private readonly cryptoBootstrapper: MatrixCryptoBootstrapper; private readonly autoBootstrapCrypto: boolean; @@ -526,8 +528,10 @@ export class MatrixClient { } async sendMessage(roomId: string, content: MessageEventContent): Promise { - const sent = await this.client.sendMessage(roomId, content as never); - return sent.event_id; + return await this.runSerializedRoomSend(roomId, async () => { + const sent = await this.client.sendMessage(roomId, content as never); + return sent.event_id; + }); } async sendEvent( @@ -535,8 +539,16 @@ export class MatrixClient { eventType: string, content: Record, ): Promise { - const sent = await this.client.sendEvent(roomId, eventType as never, content as never); - return sent.event_id; + return await this.runSerializedRoomSend(roomId, async () => { + const sent = await this.client.sendEvent(roomId, eventType as never, content as never); + return sent.event_id; + }); + } + + // Keep outbound room events ordered when multiple plugin paths emit + // messages/reactions/polls into the same Matrix room concurrently. + private async runSerializedRoomSend(roomId: string, task: () => Promise): Promise { + return await this.sendQueue.enqueue(roomId, task); } async sendStateEvent(