mirror of https://github.com/openclaw/openclaw.git
test: stabilize recurring windows ci suites
This commit is contained in:
parent
6ab0f62b3b
commit
da03d857f9
|
|
@ -3,8 +3,6 @@ import { setTimeout as sleep } from "node:timers/promises";
|
|||
import { afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import type { OpenClawConfig } from "../../config/config.js";
|
||||
import type { AcpSessionRuntimeOptions, SessionAcpMeta } from "../../config/sessions/types.js";
|
||||
import { resetFlowRegistryForTests } from "../../tasks/flow-registry.js";
|
||||
import { findTaskByRunId, resetTaskRegistryForTests } from "../../tasks/task-registry.js";
|
||||
import { withTempDir } from "../../test-helpers/temp-dir.js";
|
||||
import type { AcpRuntime, AcpRuntimeCapabilities } from "../runtime/types.js";
|
||||
|
||||
|
|
@ -39,6 +37,10 @@ vi.mock("../runtime/registry.js", async (importOriginal) => {
|
|||
let AcpSessionManager: typeof import("./manager.js").AcpSessionManager;
|
||||
let AcpRuntimeError: typeof import("../runtime/errors.js").AcpRuntimeError;
|
||||
let resetAcpSessionManagerForTests: typeof import("./manager.js").__testing.resetAcpSessionManagerForTests;
|
||||
let findTaskByRunId: typeof import("../../tasks/task-registry.js").findTaskByRunId;
|
||||
let resetTaskRegistryForTests: typeof import("../../tasks/task-registry.js").resetTaskRegistryForTests;
|
||||
let resetFlowRegistryForTests: typeof import("../../tasks/flow-registry.js").resetFlowRegistryForTests;
|
||||
let installInMemoryTaskAndFlowRegistryRuntime: typeof import("../../test-utils/task-flow-registry-runtime.js").installInMemoryTaskAndFlowRegistryRuntime;
|
||||
|
||||
const baseCfg = {
|
||||
acp: {
|
||||
|
|
@ -52,13 +54,14 @@ const ORIGINAL_STATE_DIR = process.env.OPENCLAW_STATE_DIR;
|
|||
async function withAcpManagerTaskStateDir(run: (root: string) => Promise<void>): Promise<void> {
|
||||
await withTempDir({ prefix: "openclaw-acp-manager-task-" }, async (root) => {
|
||||
process.env.OPENCLAW_STATE_DIR = root;
|
||||
resetTaskRegistryForTests();
|
||||
resetFlowRegistryForTests();
|
||||
resetTaskRegistryForTests({ persist: false });
|
||||
resetFlowRegistryForTests({ persist: false });
|
||||
installInMemoryTaskAndFlowRegistryRuntime();
|
||||
try {
|
||||
await run(root);
|
||||
} finally {
|
||||
resetTaskRegistryForTests();
|
||||
resetFlowRegistryForTests();
|
||||
resetTaskRegistryForTests({ persist: false });
|
||||
resetFlowRegistryForTests({ persist: false });
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
@ -190,6 +193,10 @@ describe("AcpSessionManager", () => {
|
|||
__testing: { resetAcpSessionManagerForTests },
|
||||
} = await import("./manager.js"));
|
||||
({ AcpRuntimeError } = await import("../runtime/errors.js"));
|
||||
({ findTaskByRunId, resetTaskRegistryForTests } = await import("../../tasks/task-registry.js"));
|
||||
({ resetFlowRegistryForTests } = await import("../../tasks/flow-registry.js"));
|
||||
({ installInMemoryTaskAndFlowRegistryRuntime } =
|
||||
await import("../../test-utils/task-flow-registry-runtime.js"));
|
||||
});
|
||||
|
||||
beforeEach(() => {
|
||||
|
|
@ -207,8 +214,8 @@ describe("AcpSessionManager", () => {
|
|||
} else {
|
||||
process.env.OPENCLAW_STATE_DIR = ORIGINAL_STATE_DIR;
|
||||
}
|
||||
resetTaskRegistryForTests();
|
||||
resetFlowRegistryForTests();
|
||||
resetTaskRegistryForTests({ persist: false });
|
||||
resetFlowRegistryForTests({ persist: false });
|
||||
});
|
||||
|
||||
it("marks ACP-shaped sessions without metadata as stale", () => {
|
||||
|
|
|
|||
|
|
@ -1,32 +1,24 @@
|
|||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import {
|
||||
clearSessionStoreCacheForTest,
|
||||
resetSessionStoreLockRuntimeForTests,
|
||||
setSessionWriteLockAcquirerForTests,
|
||||
withSessionStoreLockForTest,
|
||||
} from "./store.js";
|
||||
|
||||
const acquireSessionWriteLockMock = vi.hoisted(() =>
|
||||
vi.fn(async () => ({ release: vi.fn(async () => {}) })),
|
||||
);
|
||||
|
||||
let withSessionStoreLockForTest: typeof import("./store.js").withSessionStoreLockForTest;
|
||||
let clearSessionStoreCacheForTest: typeof import("./store.js").clearSessionStoreCacheForTest;
|
||||
|
||||
async function loadFreshStoreModule() {
|
||||
vi.resetModules();
|
||||
vi.doMock("../../agents/session-write-lock.js", async (importOriginal) => {
|
||||
const original = await importOriginal<typeof import("../../agents/session-write-lock.js")>();
|
||||
return {
|
||||
...original,
|
||||
acquireSessionWriteLock: acquireSessionWriteLockMock,
|
||||
};
|
||||
});
|
||||
({ withSessionStoreLockForTest, clearSessionStoreCacheForTest } = await import("./store.js"));
|
||||
}
|
||||
|
||||
describe("withSessionStoreLock", () => {
|
||||
beforeEach(async () => {
|
||||
beforeEach(() => {
|
||||
acquireSessionWriteLockMock.mockClear();
|
||||
await loadFreshStoreModule();
|
||||
setSessionWriteLockAcquirerForTests(acquireSessionWriteLockMock);
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
clearSessionStoreCacheForTest();
|
||||
resetSessionStoreLockRuntimeForTests();
|
||||
vi.restoreAllMocks();
|
||||
});
|
||||
|
||||
|
|
|
|||
|
|
@ -47,6 +47,7 @@ const log = createSubsystemLogger("sessions/store");
|
|||
let sessionArchiveRuntimePromise: Promise<
|
||||
typeof import("../../gateway/session-archive.runtime.js")
|
||||
> | null = null;
|
||||
let sessionWriteLockAcquirerForTests: typeof acquireSessionWriteLock | null = null;
|
||||
|
||||
function loadSessionArchiveRuntime() {
|
||||
sessionArchiveRuntimePromise ??= import("../../gateway/session-archive.runtime.js");
|
||||
|
|
@ -166,6 +167,16 @@ export function clearSessionStoreCacheForTest(): void {
|
|||
LOCK_QUEUES.clear();
|
||||
}
|
||||
|
||||
export function setSessionWriteLockAcquirerForTests(
|
||||
acquirer: typeof acquireSessionWriteLock | null,
|
||||
): void {
|
||||
sessionWriteLockAcquirerForTests = acquirer;
|
||||
}
|
||||
|
||||
export function resetSessionStoreLockRuntimeForTests(): void {
|
||||
sessionWriteLockAcquirerForTests = null;
|
||||
}
|
||||
|
||||
export async function drainSessionStoreLockQueuesForTest(): Promise<void> {
|
||||
while (LOCK_QUEUES.size > 0) {
|
||||
const queues = [...LOCK_QUEUES.values()];
|
||||
|
|
@ -764,7 +775,7 @@ async function drainSessionStoreLockQueue(storePath: string): Promise<void> {
|
|||
let failed: unknown;
|
||||
let hasFailure = false;
|
||||
try {
|
||||
lock = await acquireSessionWriteLock({
|
||||
lock = await (sessionWriteLockAcquirerForTests ?? acquireSessionWriteLock)({
|
||||
sessionFile: storePath,
|
||||
timeoutMs: remainingTimeoutMs,
|
||||
staleMs: task.staleMs,
|
||||
|
|
|
|||
|
|
@ -113,7 +113,7 @@ async function connectMcp(params: {
|
|||
|
||||
describe("openclaw channel mcp server", () => {
|
||||
describe("gateway-backed flows", () => {
|
||||
installGatewayTestHooks({ scope: "suite" });
|
||||
installGatewayTestHooks();
|
||||
|
||||
test("lists conversations, reads messages, and waits for events", async () => {
|
||||
const storePath = await createSessionStoreFile();
|
||||
|
|
|
|||
|
|
@ -0,0 +1,108 @@
|
|||
import type { ChannelAccountSnapshot } from "../channels/plugins/types.core.js";
|
||||
|
||||
type CloseAwareServer = {
|
||||
once: (event: "close", listener: () => void) => unknown;
|
||||
};
|
||||
|
||||
type PassiveAccountLifecycleParams<Handle> = {
|
||||
abortSignal?: AbortSignal;
|
||||
start: () => Promise<Handle>;
|
||||
stop?: (handle: Handle) => void | Promise<void>;
|
||||
onStop?: () => void | Promise<void>;
|
||||
};
|
||||
|
||||
/** Bind a fixed account id into a status writer so lifecycle code can emit partial snapshots. */
|
||||
export function createAccountStatusSink(params: {
|
||||
accountId: string;
|
||||
setStatus: (next: ChannelAccountSnapshot) => void;
|
||||
}): (patch: Omit<ChannelAccountSnapshot, "accountId">) => void {
|
||||
return (patch) => {
|
||||
params.setStatus({ accountId: params.accountId, ...patch });
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a promise that resolves when the signal is aborted.
|
||||
*
|
||||
* If no signal is provided, the promise stays pending forever. When provided,
|
||||
* `onAbort` runs once before the promise resolves.
|
||||
*/
|
||||
export function waitUntilAbort(
|
||||
signal?: AbortSignal,
|
||||
onAbort?: () => void | Promise<void>,
|
||||
): Promise<void> {
|
||||
return new Promise<void>((resolve, reject) => {
|
||||
const complete = () => {
|
||||
Promise.resolve(onAbort?.()).then(() => resolve(), reject);
|
||||
};
|
||||
if (!signal) {
|
||||
return;
|
||||
}
|
||||
if (signal.aborted) {
|
||||
complete();
|
||||
return;
|
||||
}
|
||||
signal.addEventListener("abort", complete, { once: true });
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Keep a passive account task alive until abort, then run optional cleanup.
|
||||
*/
|
||||
export async function runPassiveAccountLifecycle<Handle>(
|
||||
params: PassiveAccountLifecycleParams<Handle>,
|
||||
): Promise<void> {
|
||||
const handle = await params.start();
|
||||
|
||||
try {
|
||||
await waitUntilAbort(params.abortSignal);
|
||||
} finally {
|
||||
await params.stop?.(handle);
|
||||
await params.onStop?.();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Keep a channel/provider task pending until the HTTP server closes.
|
||||
*
|
||||
* When an abort signal is provided, `onAbort` is invoked once and should
|
||||
* trigger server shutdown. The returned promise resolves only after `close`.
|
||||
*/
|
||||
export async function keepHttpServerTaskAlive(params: {
|
||||
server: CloseAwareServer;
|
||||
abortSignal?: AbortSignal;
|
||||
onAbort?: () => void | Promise<void>;
|
||||
}): Promise<void> {
|
||||
const { server, abortSignal, onAbort } = params;
|
||||
let abortTask: Promise<void> = Promise.resolve();
|
||||
let abortTriggered = false;
|
||||
|
||||
const triggerAbort = () => {
|
||||
if (abortTriggered) {
|
||||
return;
|
||||
}
|
||||
abortTriggered = true;
|
||||
abortTask = Promise.resolve(onAbort?.()).then(() => undefined);
|
||||
};
|
||||
|
||||
const onAbortSignal = () => {
|
||||
triggerAbort();
|
||||
};
|
||||
|
||||
if (abortSignal) {
|
||||
if (abortSignal.aborted) {
|
||||
triggerAbort();
|
||||
} else {
|
||||
abortSignal.addEventListener("abort", onAbortSignal, { once: true });
|
||||
}
|
||||
}
|
||||
|
||||
await new Promise<void>((resolve) => {
|
||||
server.once("close", () => resolve());
|
||||
});
|
||||
|
||||
if (abortSignal) {
|
||||
abortSignal.removeEventListener("abort", onAbortSignal);
|
||||
}
|
||||
await abortTask;
|
||||
}
|
||||
|
|
@ -5,7 +5,7 @@ import {
|
|||
keepHttpServerTaskAlive,
|
||||
runPassiveAccountLifecycle,
|
||||
waitUntilAbort,
|
||||
} from "./channel-lifecycle.js";
|
||||
} from "./channel-lifecycle.core.js";
|
||||
|
||||
type FakeServer = EventEmitter & {
|
||||
close: (callback?: () => void) => void;
|
||||
|
|
|
|||
|
|
@ -1,4 +1,4 @@
|
|||
import type { ChannelAccountSnapshot } from "../channels/plugins/types.core.js";
|
||||
export * from "./channel-lifecycle.core.js";
|
||||
export * from "../channels/draft-stream-controls.js";
|
||||
export * from "../channels/draft-stream-loop.js";
|
||||
export { createRunStateMachine } from "../channels/run-state-machine.js";
|
||||
|
|
@ -7,110 +7,3 @@ export {
|
|||
type ArmableStallWatchdog,
|
||||
type StallWatchdogTimeoutMeta,
|
||||
} from "../channels/transport/stall-watchdog.js";
|
||||
|
||||
type CloseAwareServer = {
|
||||
once: (event: "close", listener: () => void) => unknown;
|
||||
};
|
||||
|
||||
type PassiveAccountLifecycleParams<Handle> = {
|
||||
abortSignal?: AbortSignal;
|
||||
start: () => Promise<Handle>;
|
||||
stop?: (handle: Handle) => void | Promise<void>;
|
||||
onStop?: () => void | Promise<void>;
|
||||
};
|
||||
|
||||
/** Bind a fixed account id into a status writer so lifecycle code can emit partial snapshots. */
|
||||
export function createAccountStatusSink(params: {
|
||||
accountId: string;
|
||||
setStatus: (next: ChannelAccountSnapshot) => void;
|
||||
}): (patch: Omit<ChannelAccountSnapshot, "accountId">) => void {
|
||||
return (patch) => {
|
||||
params.setStatus({ accountId: params.accountId, ...patch });
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a promise that resolves when the signal is aborted.
|
||||
*
|
||||
* If no signal is provided, the promise stays pending forever. When provided,
|
||||
* `onAbort` runs once before the promise resolves.
|
||||
*/
|
||||
export function waitUntilAbort(
|
||||
signal?: AbortSignal,
|
||||
onAbort?: () => void | Promise<void>,
|
||||
): Promise<void> {
|
||||
return new Promise<void>((resolve, reject) => {
|
||||
const complete = () => {
|
||||
Promise.resolve(onAbort?.()).then(() => resolve(), reject);
|
||||
};
|
||||
if (!signal) {
|
||||
return;
|
||||
}
|
||||
if (signal.aborted) {
|
||||
complete();
|
||||
return;
|
||||
}
|
||||
signal.addEventListener("abort", complete, { once: true });
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Keep a passive account task alive until abort, then run optional cleanup.
|
||||
*/
|
||||
export async function runPassiveAccountLifecycle<Handle>(
|
||||
params: PassiveAccountLifecycleParams<Handle>,
|
||||
): Promise<void> {
|
||||
const handle = await params.start();
|
||||
|
||||
try {
|
||||
await waitUntilAbort(params.abortSignal);
|
||||
} finally {
|
||||
await params.stop?.(handle);
|
||||
await params.onStop?.();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Keep a channel/provider task pending until the HTTP server closes.
|
||||
*
|
||||
* When an abort signal is provided, `onAbort` is invoked once and should
|
||||
* trigger server shutdown. The returned promise resolves only after `close`.
|
||||
*/
|
||||
export async function keepHttpServerTaskAlive(params: {
|
||||
server: CloseAwareServer;
|
||||
abortSignal?: AbortSignal;
|
||||
onAbort?: () => void | Promise<void>;
|
||||
}): Promise<void> {
|
||||
const { server, abortSignal, onAbort } = params;
|
||||
let abortTask: Promise<void> = Promise.resolve();
|
||||
let abortTriggered = false;
|
||||
|
||||
const triggerAbort = () => {
|
||||
if (abortTriggered) {
|
||||
return;
|
||||
}
|
||||
abortTriggered = true;
|
||||
abortTask = Promise.resolve(onAbort?.()).then(() => undefined);
|
||||
};
|
||||
|
||||
const onAbortSignal = () => {
|
||||
triggerAbort();
|
||||
};
|
||||
|
||||
if (abortSignal) {
|
||||
if (abortSignal.aborted) {
|
||||
triggerAbort();
|
||||
} else {
|
||||
abortSignal.addEventListener("abort", onAbortSignal, { once: true });
|
||||
}
|
||||
}
|
||||
|
||||
await new Promise<void>((resolve) => {
|
||||
server.once("close", () => resolve());
|
||||
});
|
||||
|
||||
if (abortSignal) {
|
||||
abortSignal.removeEventListener("abort", onAbortSignal);
|
||||
}
|
||||
await abortTask;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -25,4 +25,4 @@ export {
|
|||
createAccountStatusSink,
|
||||
keepHttpServerTaskAlive,
|
||||
waitUntilAbort,
|
||||
} from "./channel-lifecycle.js";
|
||||
} from "./channel-lifecycle.core.js";
|
||||
|
|
|
|||
|
|
@ -23,7 +23,7 @@ export { delegateCompactionToRuntime } from "../context-engine/delegate.js";
|
|||
export type { DiagnosticEventPayload } from "../infra/diagnostic-events.js";
|
||||
export { onDiagnosticEvent } from "../infra/diagnostic-events.js";
|
||||
|
||||
export { createAccountStatusSink } from "./channel-lifecycle.js";
|
||||
export { createAccountStatusSink } from "./channel-lifecycle.core.js";
|
||||
export { createPluginRuntimeStore } from "./runtime-store.js";
|
||||
export { KeyedAsyncQueue } from "./keyed-async-queue.js";
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
import type { z } from "zod";
|
||||
import { runPassiveAccountLifecycle } from "./channel-lifecycle.js";
|
||||
import { runPassiveAccountLifecycle } from "./channel-lifecycle.core.js";
|
||||
import { createLoggerBackedRuntime } from "./runtime.js";
|
||||
export { safeParseJsonWithSchema, safeParseWithSchema } from "../utils/zod-parse.js";
|
||||
|
||||
|
|
|
|||
|
|
@ -22,7 +22,7 @@ export {
|
|||
} from "../channels/plugins/directory-config-helpers.js";
|
||||
export { buildComputedAccountStatusSnapshot } from "./status-helpers.js";
|
||||
export { buildChannelConfigSchema } from "../channels/plugins/config-schema.js";
|
||||
export { createAccountStatusSink, runPassiveAccountLifecycle } from "./channel-lifecycle.js";
|
||||
export { createAccountStatusSink, runPassiveAccountLifecycle } from "./channel-lifecycle.core.js";
|
||||
export { formatPairingApproveHint } from "../channels/plugins/helpers.js";
|
||||
export { fetchRemoteMedia } from "../media/fetch.js";
|
||||
export { resolveChannelMediaMaxBytes } from "../channels/plugins/media-limits.js";
|
||||
|
|
|
|||
|
|
@ -58,7 +58,7 @@ export type { PluginRuntime } from "../plugins/runtime/types.js";
|
|||
export type { OpenClawPluginApi } from "../plugins/types.js";
|
||||
export { DEFAULT_ACCOUNT_ID } from "../routing/session-key.js";
|
||||
export type { RuntimeEnv } from "../runtime.js";
|
||||
export { createAccountStatusSink, runPassiveAccountLifecycle } from "./channel-lifecycle.js";
|
||||
export { createAccountStatusSink, runPassiveAccountLifecycle } from "./channel-lifecycle.core.js";
|
||||
export { listIrcAccountIds, resolveDefaultIrcAccountId, resolveIrcAccount } from "./irc-surface.js";
|
||||
export {
|
||||
readStoreAllowFromForDmPolicy,
|
||||
|
|
|
|||
|
|
@ -40,7 +40,7 @@ export {
|
|||
createSetupInputPresenceValidator,
|
||||
migrateBaseNameToDefaultAccount,
|
||||
} from "../channels/plugins/setup-helpers.js";
|
||||
export { createAccountStatusSink } from "./channel-lifecycle.js";
|
||||
export { createAccountStatusSink } from "./channel-lifecycle.core.js";
|
||||
export { buildComputedAccountStatusSnapshot } from "./status-helpers.js";
|
||||
export { createAccountListHelpers } from "../channels/plugins/account-helpers.js";
|
||||
export type {
|
||||
|
|
|
|||
|
|
@ -101,7 +101,7 @@ export { formatDocsLink } from "../terminal/links.js";
|
|||
export { sleep } from "../utils.js";
|
||||
export { loadWebMedia } from "./web-media.js";
|
||||
export type { WizardPrompter } from "../wizard/prompts.js";
|
||||
export { keepHttpServerTaskAlive } from "./channel-lifecycle.js";
|
||||
export { keepHttpServerTaskAlive } from "./channel-lifecycle.core.js";
|
||||
export { withFileLock } from "./file-lock.js";
|
||||
export { dispatchReplyFromConfigWithSettledDispatcher } from "./inbound-reply-dispatch.js";
|
||||
export { readJsonFileWithFallback, writeJsonFileAtomically } from "./json-store.js";
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@ import {
|
|||
} from "../infra/heartbeat-wake.js";
|
||||
import { peekSystemEvents, resetSystemEventsForTest } from "../infra/system-events.js";
|
||||
import { withTempDir } from "../test-helpers/temp-dir.js";
|
||||
import { installInMemoryTaskAndFlowRegistryRuntime } from "../test-utils/task-flow-registry-runtime.js";
|
||||
import { createFlowRecord, getFlowById, resetFlowRegistryForTests } from "./flow-registry.js";
|
||||
import {
|
||||
createTaskRecord,
|
||||
|
|
@ -123,8 +124,8 @@ describe("task-registry", () => {
|
|||
}
|
||||
resetSystemEventsForTest();
|
||||
resetHeartbeatWakeStateForTests();
|
||||
resetTaskRegistryForTests();
|
||||
resetFlowRegistryForTests();
|
||||
resetTaskRegistryForTests({ persist: false });
|
||||
resetFlowRegistryForTests({ persist: false });
|
||||
hoisted.sendMessageMock.mockReset();
|
||||
hoisted.cancelSessionMock.mockReset();
|
||||
hoisted.killSubagentRunAdminMock.mockReset();
|
||||
|
|
@ -699,7 +700,9 @@ describe("task-registry", () => {
|
|||
it("adopts parent flow linkage when collapsing onto an earlier ACP record", async () => {
|
||||
await withTaskRegistryTempDir(async (root) => {
|
||||
process.env.OPENCLAW_STATE_DIR = root;
|
||||
resetTaskRegistryForTests();
|
||||
resetTaskRegistryForTests({ persist: false });
|
||||
resetFlowRegistryForTests({ persist: false });
|
||||
installInMemoryTaskAndFlowRegistryRuntime();
|
||||
|
||||
const directTask = createTaskRecord({
|
||||
runtime: "acp",
|
||||
|
|
|
|||
|
|
@ -2,37 +2,21 @@ import fs from "node:fs/promises";
|
|||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { resetSessionWriteLockStateForTest } from "../agents/session-write-lock.js";
|
||||
import {
|
||||
clearSessionStoreCacheForTest,
|
||||
getSessionStoreLockQueueSizeForTest,
|
||||
resetSessionStoreLockRuntimeForTests,
|
||||
setSessionWriteLockAcquirerForTests,
|
||||
withSessionStoreLockForTest,
|
||||
} from "../config/sessions/store.js";
|
||||
import { resetFileLockStateForTest } from "../infra/file-lock.js";
|
||||
import { cleanupSessionStateForTest } from "./session-state-cleanup.js";
|
||||
|
||||
const acquireSessionWriteLockMock = vi.hoisted(() =>
|
||||
vi.fn(async () => ({ release: vi.fn(async () => {}) })),
|
||||
);
|
||||
|
||||
let cleanupSessionStateForTest: typeof import("./session-state-cleanup.js").cleanupSessionStateForTest;
|
||||
let withSessionStoreLockForTest: typeof import("../config/sessions/store.js").withSessionStoreLockForTest;
|
||||
let getSessionStoreLockQueueSizeForTest: typeof import("../config/sessions/store.js").getSessionStoreLockQueueSizeForTest;
|
||||
let clearSessionStoreCacheForTest: typeof import("../config/sessions/store.js").clearSessionStoreCacheForTest;
|
||||
let resetFileLockStateForTest: typeof import("../infra/file-lock.js").resetFileLockStateForTest;
|
||||
let resetSessionWriteLockStateForTest: typeof import("../agents/session-write-lock.js").resetSessionWriteLockStateForTest;
|
||||
|
||||
async function loadFreshSessionCleanupModules() {
|
||||
vi.resetModules();
|
||||
vi.doMock("../agents/session-write-lock.js", async (importOriginal) => {
|
||||
const original = await importOriginal<typeof import("../agents/session-write-lock.js")>();
|
||||
return {
|
||||
...original,
|
||||
acquireSessionWriteLock: acquireSessionWriteLockMock,
|
||||
};
|
||||
});
|
||||
({
|
||||
withSessionStoreLockForTest,
|
||||
getSessionStoreLockQueueSizeForTest,
|
||||
clearSessionStoreCacheForTest,
|
||||
} = await import("../config/sessions/store.js"));
|
||||
({ cleanupSessionStateForTest } = await import("./session-state-cleanup.js"));
|
||||
({ resetFileLockStateForTest } = await import("../infra/file-lock.js"));
|
||||
({ resetSessionWriteLockStateForTest } = await import("../agents/session-write-lock.js"));
|
||||
}
|
||||
|
||||
function createDeferred<T>() {
|
||||
let resolve!: (value: T | PromiseLike<T>) => void;
|
||||
let reject!: (reason?: unknown) => void;
|
||||
|
|
@ -44,13 +28,13 @@ function createDeferred<T>() {
|
|||
}
|
||||
|
||||
describe("cleanupSessionStateForTest", () => {
|
||||
beforeEach(async () => {
|
||||
await loadFreshSessionCleanupModules();
|
||||
beforeEach(() => {
|
||||
vi.useRealTimers();
|
||||
clearSessionStoreCacheForTest();
|
||||
resetFileLockStateForTest();
|
||||
resetSessionWriteLockStateForTest();
|
||||
acquireSessionWriteLockMock.mockClear();
|
||||
setSessionWriteLockAcquirerForTests(acquireSessionWriteLockMock);
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
|
|
@ -58,8 +42,8 @@ describe("cleanupSessionStateForTest", () => {
|
|||
clearSessionStoreCacheForTest();
|
||||
resetFileLockStateForTest();
|
||||
resetSessionWriteLockStateForTest();
|
||||
resetSessionStoreLockRuntimeForTests();
|
||||
vi.restoreAllMocks();
|
||||
vi.doUnmock("../agents/session-write-lock.js");
|
||||
});
|
||||
|
||||
it("waits for in-flight session store locks before clearing test state", async () => {
|
||||
|
|
|
|||
|
|
@ -0,0 +1,107 @@
|
|||
import {
|
||||
configureFlowRegistryRuntime,
|
||||
type FlowRegistryStore,
|
||||
type FlowRegistryStoreSnapshot,
|
||||
} from "../tasks/flow-registry.store.js";
|
||||
import type { FlowRecord } from "../tasks/flow-registry.types.js";
|
||||
import {
|
||||
configureTaskRegistryRuntime,
|
||||
type TaskRegistryStore,
|
||||
type TaskRegistryStoreSnapshot,
|
||||
} from "../tasks/task-registry.store.js";
|
||||
import type { TaskDeliveryState, TaskRecord } from "../tasks/task-registry.types.js";
|
||||
|
||||
function cloneTask(task: TaskRecord): TaskRecord {
|
||||
return { ...task };
|
||||
}
|
||||
|
||||
function cloneDeliveryState(state: TaskDeliveryState): TaskDeliveryState {
|
||||
return {
|
||||
...state,
|
||||
...(state.requesterOrigin ? { requesterOrigin: { ...state.requesterOrigin } } : {}),
|
||||
};
|
||||
}
|
||||
|
||||
function cloneFlow(flow: FlowRecord): FlowRecord {
|
||||
return {
|
||||
...flow,
|
||||
...(flow.requesterOrigin ? { requesterOrigin: { ...flow.requesterOrigin } } : {}),
|
||||
};
|
||||
}
|
||||
|
||||
export function installInMemoryTaskAndFlowRegistryRuntime(): {
|
||||
taskStore: TaskRegistryStore;
|
||||
flowStore: FlowRegistryStore;
|
||||
} {
|
||||
let taskSnapshot: TaskRegistryStoreSnapshot = {
|
||||
tasks: new Map<string, TaskRecord>(),
|
||||
deliveryStates: new Map<string, TaskDeliveryState>(),
|
||||
};
|
||||
let flowSnapshot: FlowRegistryStoreSnapshot = {
|
||||
flows: new Map<string, FlowRecord>(),
|
||||
};
|
||||
|
||||
const taskStore: TaskRegistryStore = {
|
||||
loadSnapshot: () => ({
|
||||
tasks: new Map(
|
||||
[...taskSnapshot.tasks.entries()].map(([taskId, task]) => [taskId, cloneTask(task)]),
|
||||
),
|
||||
deliveryStates: new Map(
|
||||
[...taskSnapshot.deliveryStates.entries()].map(([taskId, state]) => [
|
||||
taskId,
|
||||
cloneDeliveryState(state),
|
||||
]),
|
||||
),
|
||||
}),
|
||||
saveSnapshot: (snapshot) => {
|
||||
taskSnapshot = {
|
||||
tasks: new Map(
|
||||
[...snapshot.tasks.entries()].map(([taskId, task]) => [taskId, cloneTask(task)]),
|
||||
),
|
||||
deliveryStates: new Map(
|
||||
[...snapshot.deliveryStates.entries()].map(([taskId, state]) => [
|
||||
taskId,
|
||||
cloneDeliveryState(state),
|
||||
]),
|
||||
),
|
||||
};
|
||||
},
|
||||
upsertTask: (task) => {
|
||||
taskSnapshot.tasks.set(task.taskId, cloneTask(task));
|
||||
},
|
||||
deleteTask: (taskId) => {
|
||||
taskSnapshot.tasks.delete(taskId);
|
||||
},
|
||||
upsertDeliveryState: (state) => {
|
||||
taskSnapshot.deliveryStates.set(state.taskId, cloneDeliveryState(state));
|
||||
},
|
||||
deleteDeliveryState: (taskId) => {
|
||||
taskSnapshot.deliveryStates.delete(taskId);
|
||||
},
|
||||
};
|
||||
|
||||
const flowStore: FlowRegistryStore = {
|
||||
loadSnapshot: () => ({
|
||||
flows: new Map(
|
||||
[...flowSnapshot.flows.entries()].map(([flowId, flow]) => [flowId, cloneFlow(flow)]),
|
||||
),
|
||||
}),
|
||||
saveSnapshot: (snapshot) => {
|
||||
flowSnapshot = {
|
||||
flows: new Map(
|
||||
[...snapshot.flows.entries()].map(([flowId, flow]) => [flowId, cloneFlow(flow)]),
|
||||
),
|
||||
};
|
||||
},
|
||||
upsertFlow: (flow) => {
|
||||
flowSnapshot.flows.set(flow.flowId, cloneFlow(flow));
|
||||
},
|
||||
deleteFlow: (flowId) => {
|
||||
flowSnapshot.flows.delete(flowId);
|
||||
},
|
||||
};
|
||||
|
||||
configureTaskRegistryRuntime({ store: taskStore });
|
||||
configureFlowRegistryRuntime({ store: flowStore });
|
||||
return { taskStore, flowStore };
|
||||
}
|
||||
Loading…
Reference in New Issue