mirror of https://github.com/openclaw/openclaw.git
Matrix: serialize outbound sends per room
This commit is contained in:
parent
caebc75456
commit
5e480a4c90
|
|
@ -255,6 +255,75 @@ describe("MatrixClient request hardening", () => {
|
|||
});
|
||||
});
|
||||
|
||||
it("serializes outbound sends per room across message and event sends", async () => {
|
||||
const client = new MatrixClient("https://matrix.example.org", "token");
|
||||
let releaseFirst: (() => void) | undefined;
|
||||
const started: string[] = [];
|
||||
matrixJsClient.sendMessage = vi.fn(async () => {
|
||||
started.push("message");
|
||||
await new Promise<void>((resolve) => {
|
||||
releaseFirst = resolve;
|
||||
});
|
||||
return { event_id: "$message" };
|
||||
});
|
||||
matrixJsClient.sendEvent = vi.fn(async () => {
|
||||
started.push("event");
|
||||
return { event_id: "$event" };
|
||||
});
|
||||
|
||||
const first = client.sendMessage("!room:example.org", {
|
||||
msgtype: "m.text",
|
||||
body: "hello",
|
||||
});
|
||||
const second = client.sendEvent("!room:example.org", "m.reaction", {
|
||||
"m.relates_to": { event_id: "$target", key: "👍", rel_type: "m.annotation" },
|
||||
});
|
||||
|
||||
await Promise.resolve();
|
||||
await Promise.resolve();
|
||||
expect(started).toEqual(["message"]);
|
||||
expect(matrixJsClient.sendEvent).not.toHaveBeenCalled();
|
||||
|
||||
releaseFirst?.();
|
||||
|
||||
await expect(first).resolves.toBe("$message");
|
||||
await expect(second).resolves.toBe("$event");
|
||||
expect(started).toEqual(["message", "event"]);
|
||||
});
|
||||
|
||||
it("does not serialize sends across different rooms", async () => {
|
||||
const client = new MatrixClient("https://matrix.example.org", "token");
|
||||
let releaseFirst: (() => void) | undefined;
|
||||
const started: string[] = [];
|
||||
matrixJsClient.sendMessage = vi.fn(async (roomId: string) => {
|
||||
started.push(roomId);
|
||||
if (roomId === "!room-a:example.org") {
|
||||
await new Promise<void>((resolve) => {
|
||||
releaseFirst = resolve;
|
||||
});
|
||||
}
|
||||
return { event_id: `$${roomId}` };
|
||||
});
|
||||
|
||||
const first = client.sendMessage("!room-a:example.org", {
|
||||
msgtype: "m.text",
|
||||
body: "a",
|
||||
});
|
||||
const second = client.sendMessage("!room-b:example.org", {
|
||||
msgtype: "m.text",
|
||||
body: "b",
|
||||
});
|
||||
|
||||
await Promise.resolve();
|
||||
await Promise.resolve();
|
||||
expect(started).toEqual(["!room-a:example.org", "!room-b:example.org"]);
|
||||
|
||||
releaseFirst?.();
|
||||
|
||||
await expect(first).resolves.toBe("$!room-a:example.org");
|
||||
await expect(second).resolves.toBe("$!room-b:example.org");
|
||||
});
|
||||
|
||||
it("maps relations pages back to raw events", async () => {
|
||||
const client = new MatrixClient("https://matrix.example.org", "token");
|
||||
matrixJsClient.relations = vi.fn(async () => ({
|
||||
|
|
|
|||
|
|
@ -9,6 +9,7 @@ import {
|
|||
type MatrixEvent,
|
||||
} from "matrix-js-sdk";
|
||||
import { VerificationMethod } from "matrix-js-sdk/lib/types.js";
|
||||
import { KeyedAsyncQueue } from "openclaw/plugin-sdk/keyed-async-queue";
|
||||
import { resolveMatrixRoomKeyBackupReadinessError } from "./backup-health.js";
|
||||
import { createMatrixJsSdkClientLogger } from "./client/logging.js";
|
||||
import { MatrixCryptoBootstrapper } from "./sdk/crypto-bootstrap.js";
|
||||
|
|
@ -169,6 +170,7 @@ export class MatrixClient {
|
|||
private cryptoInitialized = false;
|
||||
private readonly decryptBridge: MatrixDecryptBridge<MatrixRawEvent>;
|
||||
private readonly verificationManager = new MatrixVerificationManager();
|
||||
private readonly sendQueue = new KeyedAsyncQueue();
|
||||
private readonly recoveryKeyStore: MatrixRecoveryKeyStore;
|
||||
private readonly cryptoBootstrapper: MatrixCryptoBootstrapper<MatrixRawEvent>;
|
||||
private readonly autoBootstrapCrypto: boolean;
|
||||
|
|
@ -526,8 +528,10 @@ export class MatrixClient {
|
|||
}
|
||||
|
||||
async sendMessage(roomId: string, content: MessageEventContent): Promise<string> {
|
||||
const sent = await this.client.sendMessage(roomId, content as never);
|
||||
return sent.event_id;
|
||||
return await this.runSerializedRoomSend(roomId, async () => {
|
||||
const sent = await this.client.sendMessage(roomId, content as never);
|
||||
return sent.event_id;
|
||||
});
|
||||
}
|
||||
|
||||
async sendEvent(
|
||||
|
|
@ -535,8 +539,16 @@ export class MatrixClient {
|
|||
eventType: string,
|
||||
content: Record<string, unknown>,
|
||||
): Promise<string> {
|
||||
const sent = await this.client.sendEvent(roomId, eventType as never, content as never);
|
||||
return sent.event_id;
|
||||
return await this.runSerializedRoomSend(roomId, async () => {
|
||||
const sent = await this.client.sendEvent(roomId, eventType as never, content as never);
|
||||
return sent.event_id;
|
||||
});
|
||||
}
|
||||
|
||||
// Keep outbound room events ordered when multiple plugin paths emit
|
||||
// messages/reactions/polls into the same Matrix room concurrently.
|
||||
private async runSerializedRoomSend<T>(roomId: string, task: () => Promise<T>): Promise<T> {
|
||||
return await this.sendQueue.enqueue(roomId, task);
|
||||
}
|
||||
|
||||
async sendStateEvent(
|
||||
|
|
|
|||
Loading…
Reference in New Issue