diff --git a/src/acp/control-plane/manager.test.ts b/src/acp/control-plane/manager.test.ts index 628956570dd..03027a1509b 100644 --- a/src/acp/control-plane/manager.test.ts +++ b/src/acp/control-plane/manager.test.ts @@ -3,8 +3,6 @@ import { setTimeout as sleep } from "node:timers/promises"; import { afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; import type { OpenClawConfig } from "../../config/config.js"; import type { AcpSessionRuntimeOptions, SessionAcpMeta } from "../../config/sessions/types.js"; -import { resetFlowRegistryForTests } from "../../tasks/flow-registry.js"; -import { findTaskByRunId, resetTaskRegistryForTests } from "../../tasks/task-registry.js"; import { withTempDir } from "../../test-helpers/temp-dir.js"; import type { AcpRuntime, AcpRuntimeCapabilities } from "../runtime/types.js"; @@ -39,6 +37,10 @@ vi.mock("../runtime/registry.js", async (importOriginal) => { let AcpSessionManager: typeof import("./manager.js").AcpSessionManager; let AcpRuntimeError: typeof import("../runtime/errors.js").AcpRuntimeError; let resetAcpSessionManagerForTests: typeof import("./manager.js").__testing.resetAcpSessionManagerForTests; +let findTaskByRunId: typeof import("../../tasks/task-registry.js").findTaskByRunId; +let resetTaskRegistryForTests: typeof import("../../tasks/task-registry.js").resetTaskRegistryForTests; +let resetFlowRegistryForTests: typeof import("../../tasks/flow-registry.js").resetFlowRegistryForTests; +let installInMemoryTaskAndFlowRegistryRuntime: typeof import("../../test-utils/task-flow-registry-runtime.js").installInMemoryTaskAndFlowRegistryRuntime; const baseCfg = { acp: { @@ -52,13 +54,14 @@ const ORIGINAL_STATE_DIR = process.env.OPENCLAW_STATE_DIR; async function withAcpManagerTaskStateDir(run: (root: string) => Promise): Promise { await withTempDir({ prefix: "openclaw-acp-manager-task-" }, async (root) => { process.env.OPENCLAW_STATE_DIR = root; - resetTaskRegistryForTests(); - resetFlowRegistryForTests(); + resetTaskRegistryForTests({ persist: false }); + resetFlowRegistryForTests({ persist: false }); + installInMemoryTaskAndFlowRegistryRuntime(); try { await run(root); } finally { - resetTaskRegistryForTests(); - resetFlowRegistryForTests(); + resetTaskRegistryForTests({ persist: false }); + resetFlowRegistryForTests({ persist: false }); } }); } @@ -190,6 +193,10 @@ describe("AcpSessionManager", () => { __testing: { resetAcpSessionManagerForTests }, } = await import("./manager.js")); ({ AcpRuntimeError } = await import("../runtime/errors.js")); + ({ findTaskByRunId, resetTaskRegistryForTests } = await import("../../tasks/task-registry.js")); + ({ resetFlowRegistryForTests } = await import("../../tasks/flow-registry.js")); + ({ installInMemoryTaskAndFlowRegistryRuntime } = + await import("../../test-utils/task-flow-registry-runtime.js")); }); beforeEach(() => { @@ -207,8 +214,8 @@ describe("AcpSessionManager", () => { } else { process.env.OPENCLAW_STATE_DIR = ORIGINAL_STATE_DIR; } - resetTaskRegistryForTests(); - resetFlowRegistryForTests(); + resetTaskRegistryForTests({ persist: false }); + resetFlowRegistryForTests({ persist: false }); }); it("marks ACP-shaped sessions without metadata as stale", () => { diff --git a/src/config/sessions/store.lock.test.ts b/src/config/sessions/store.lock.test.ts index c19816dc852..b8def5188c9 100644 --- a/src/config/sessions/store.lock.test.ts +++ b/src/config/sessions/store.lock.test.ts @@ -1,32 +1,24 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { + clearSessionStoreCacheForTest, + resetSessionStoreLockRuntimeForTests, + setSessionWriteLockAcquirerForTests, + withSessionStoreLockForTest, +} from "./store.js"; const acquireSessionWriteLockMock = vi.hoisted(() => vi.fn(async () => ({ release: vi.fn(async () => {}) })), ); -let withSessionStoreLockForTest: typeof import("./store.js").withSessionStoreLockForTest; -let clearSessionStoreCacheForTest: typeof import("./store.js").clearSessionStoreCacheForTest; - -async function loadFreshStoreModule() { - vi.resetModules(); - vi.doMock("../../agents/session-write-lock.js", async (importOriginal) => { - const original = await importOriginal(); - return { - ...original, - acquireSessionWriteLock: acquireSessionWriteLockMock, - }; - }); - ({ withSessionStoreLockForTest, clearSessionStoreCacheForTest } = await import("./store.js")); -} - describe("withSessionStoreLock", () => { - beforeEach(async () => { + beforeEach(() => { acquireSessionWriteLockMock.mockClear(); - await loadFreshStoreModule(); + setSessionWriteLockAcquirerForTests(acquireSessionWriteLockMock); }); afterEach(() => { clearSessionStoreCacheForTest(); + resetSessionStoreLockRuntimeForTests(); vi.restoreAllMocks(); }); diff --git a/src/config/sessions/store.ts b/src/config/sessions/store.ts index 61018b63aea..48b54d7590d 100644 --- a/src/config/sessions/store.ts +++ b/src/config/sessions/store.ts @@ -47,6 +47,7 @@ const log = createSubsystemLogger("sessions/store"); let sessionArchiveRuntimePromise: Promise< typeof import("../../gateway/session-archive.runtime.js") > | null = null; +let sessionWriteLockAcquirerForTests: typeof acquireSessionWriteLock | null = null; function loadSessionArchiveRuntime() { sessionArchiveRuntimePromise ??= import("../../gateway/session-archive.runtime.js"); @@ -166,6 +167,16 @@ export function clearSessionStoreCacheForTest(): void { LOCK_QUEUES.clear(); } +export function setSessionWriteLockAcquirerForTests( + acquirer: typeof acquireSessionWriteLock | null, +): void { + sessionWriteLockAcquirerForTests = acquirer; +} + +export function resetSessionStoreLockRuntimeForTests(): void { + sessionWriteLockAcquirerForTests = null; +} + export async function drainSessionStoreLockQueuesForTest(): Promise { while (LOCK_QUEUES.size > 0) { const queues = [...LOCK_QUEUES.values()]; @@ -764,7 +775,7 @@ async function drainSessionStoreLockQueue(storePath: string): Promise { let failed: unknown; let hasFailure = false; try { - lock = await acquireSessionWriteLock({ + lock = await (sessionWriteLockAcquirerForTests ?? acquireSessionWriteLock)({ sessionFile: storePath, timeoutMs: remainingTimeoutMs, staleMs: task.staleMs, diff --git a/src/mcp/channel-server.test.ts b/src/mcp/channel-server.test.ts index 76c565f2bd3..7f7a0054799 100644 --- a/src/mcp/channel-server.test.ts +++ b/src/mcp/channel-server.test.ts @@ -113,7 +113,7 @@ async function connectMcp(params: { describe("openclaw channel mcp server", () => { describe("gateway-backed flows", () => { - installGatewayTestHooks({ scope: "suite" }); + installGatewayTestHooks(); test("lists conversations, reads messages, and waits for events", async () => { const storePath = await createSessionStoreFile(); diff --git a/src/plugin-sdk/channel-lifecycle.core.ts b/src/plugin-sdk/channel-lifecycle.core.ts new file mode 100644 index 00000000000..28045aeb058 --- /dev/null +++ b/src/plugin-sdk/channel-lifecycle.core.ts @@ -0,0 +1,108 @@ +import type { ChannelAccountSnapshot } from "../channels/plugins/types.core.js"; + +type CloseAwareServer = { + once: (event: "close", listener: () => void) => unknown; +}; + +type PassiveAccountLifecycleParams = { + abortSignal?: AbortSignal; + start: () => Promise; + stop?: (handle: Handle) => void | Promise; + onStop?: () => void | Promise; +}; + +/** Bind a fixed account id into a status writer so lifecycle code can emit partial snapshots. */ +export function createAccountStatusSink(params: { + accountId: string; + setStatus: (next: ChannelAccountSnapshot) => void; +}): (patch: Omit) => void { + return (patch) => { + params.setStatus({ accountId: params.accountId, ...patch }); + }; +} + +/** + * Return a promise that resolves when the signal is aborted. + * + * If no signal is provided, the promise stays pending forever. When provided, + * `onAbort` runs once before the promise resolves. + */ +export function waitUntilAbort( + signal?: AbortSignal, + onAbort?: () => void | Promise, +): Promise { + return new Promise((resolve, reject) => { + const complete = () => { + Promise.resolve(onAbort?.()).then(() => resolve(), reject); + }; + if (!signal) { + return; + } + if (signal.aborted) { + complete(); + return; + } + signal.addEventListener("abort", complete, { once: true }); + }); +} + +/** + * Keep a passive account task alive until abort, then run optional cleanup. + */ +export async function runPassiveAccountLifecycle( + params: PassiveAccountLifecycleParams, +): Promise { + const handle = await params.start(); + + try { + await waitUntilAbort(params.abortSignal); + } finally { + await params.stop?.(handle); + await params.onStop?.(); + } +} + +/** + * Keep a channel/provider task pending until the HTTP server closes. + * + * When an abort signal is provided, `onAbort` is invoked once and should + * trigger server shutdown. The returned promise resolves only after `close`. + */ +export async function keepHttpServerTaskAlive(params: { + server: CloseAwareServer; + abortSignal?: AbortSignal; + onAbort?: () => void | Promise; +}): Promise { + const { server, abortSignal, onAbort } = params; + let abortTask: Promise = Promise.resolve(); + let abortTriggered = false; + + const triggerAbort = () => { + if (abortTriggered) { + return; + } + abortTriggered = true; + abortTask = Promise.resolve(onAbort?.()).then(() => undefined); + }; + + const onAbortSignal = () => { + triggerAbort(); + }; + + if (abortSignal) { + if (abortSignal.aborted) { + triggerAbort(); + } else { + abortSignal.addEventListener("abort", onAbortSignal, { once: true }); + } + } + + await new Promise((resolve) => { + server.once("close", () => resolve()); + }); + + if (abortSignal) { + abortSignal.removeEventListener("abort", onAbortSignal); + } + await abortTask; +} diff --git a/src/plugin-sdk/channel-lifecycle.test.ts b/src/plugin-sdk/channel-lifecycle.test.ts index e4bd18ecf68..8534fbe9bbb 100644 --- a/src/plugin-sdk/channel-lifecycle.test.ts +++ b/src/plugin-sdk/channel-lifecycle.test.ts @@ -5,7 +5,7 @@ import { keepHttpServerTaskAlive, runPassiveAccountLifecycle, waitUntilAbort, -} from "./channel-lifecycle.js"; +} from "./channel-lifecycle.core.js"; type FakeServer = EventEmitter & { close: (callback?: () => void) => void; diff --git a/src/plugin-sdk/channel-lifecycle.ts b/src/plugin-sdk/channel-lifecycle.ts index 96a031ce5b7..f078925e5aa 100644 --- a/src/plugin-sdk/channel-lifecycle.ts +++ b/src/plugin-sdk/channel-lifecycle.ts @@ -1,4 +1,4 @@ -import type { ChannelAccountSnapshot } from "../channels/plugins/types.core.js"; +export * from "./channel-lifecycle.core.js"; export * from "../channels/draft-stream-controls.js"; export * from "../channels/draft-stream-loop.js"; export { createRunStateMachine } from "../channels/run-state-machine.js"; @@ -7,110 +7,3 @@ export { type ArmableStallWatchdog, type StallWatchdogTimeoutMeta, } from "../channels/transport/stall-watchdog.js"; - -type CloseAwareServer = { - once: (event: "close", listener: () => void) => unknown; -}; - -type PassiveAccountLifecycleParams = { - abortSignal?: AbortSignal; - start: () => Promise; - stop?: (handle: Handle) => void | Promise; - onStop?: () => void | Promise; -}; - -/** Bind a fixed account id into a status writer so lifecycle code can emit partial snapshots. */ -export function createAccountStatusSink(params: { - accountId: string; - setStatus: (next: ChannelAccountSnapshot) => void; -}): (patch: Omit) => void { - return (patch) => { - params.setStatus({ accountId: params.accountId, ...patch }); - }; -} - -/** - * Return a promise that resolves when the signal is aborted. - * - * If no signal is provided, the promise stays pending forever. When provided, - * `onAbort` runs once before the promise resolves. - */ -export function waitUntilAbort( - signal?: AbortSignal, - onAbort?: () => void | Promise, -): Promise { - return new Promise((resolve, reject) => { - const complete = () => { - Promise.resolve(onAbort?.()).then(() => resolve(), reject); - }; - if (!signal) { - return; - } - if (signal.aborted) { - complete(); - return; - } - signal.addEventListener("abort", complete, { once: true }); - }); -} - -/** - * Keep a passive account task alive until abort, then run optional cleanup. - */ -export async function runPassiveAccountLifecycle( - params: PassiveAccountLifecycleParams, -): Promise { - const handle = await params.start(); - - try { - await waitUntilAbort(params.abortSignal); - } finally { - await params.stop?.(handle); - await params.onStop?.(); - } -} - -/** - * Keep a channel/provider task pending until the HTTP server closes. - * - * When an abort signal is provided, `onAbort` is invoked once and should - * trigger server shutdown. The returned promise resolves only after `close`. - */ -export async function keepHttpServerTaskAlive(params: { - server: CloseAwareServer; - abortSignal?: AbortSignal; - onAbort?: () => void | Promise; -}): Promise { - const { server, abortSignal, onAbort } = params; - let abortTask: Promise = Promise.resolve(); - let abortTriggered = false; - - const triggerAbort = () => { - if (abortTriggered) { - return; - } - abortTriggered = true; - abortTask = Promise.resolve(onAbort?.()).then(() => undefined); - }; - - const onAbortSignal = () => { - triggerAbort(); - }; - - if (abortSignal) { - if (abortSignal.aborted) { - triggerAbort(); - } else { - abortSignal.addEventListener("abort", onAbortSignal, { once: true }); - } - } - - await new Promise((resolve) => { - server.once("close", () => resolve()); - }); - - if (abortSignal) { - abortSignal.removeEventListener("abort", onAbortSignal); - } - await abortTask; -} diff --git a/src/plugin-sdk/channel-runtime.ts b/src/plugin-sdk/channel-runtime.ts index fb3831c1bc2..ffe4bcd2295 100644 --- a/src/plugin-sdk/channel-runtime.ts +++ b/src/plugin-sdk/channel-runtime.ts @@ -25,4 +25,4 @@ export { createAccountStatusSink, keepHttpServerTaskAlive, waitUntilAbort, -} from "./channel-lifecycle.js"; +} from "./channel-lifecycle.core.js"; diff --git a/src/plugin-sdk/compat.ts b/src/plugin-sdk/compat.ts index 1389c0f6149..cb4d35baf6a 100644 --- a/src/plugin-sdk/compat.ts +++ b/src/plugin-sdk/compat.ts @@ -23,7 +23,7 @@ export { delegateCompactionToRuntime } from "../context-engine/delegate.js"; export type { DiagnosticEventPayload } from "../infra/diagnostic-events.js"; export { onDiagnosticEvent } from "../infra/diagnostic-events.js"; -export { createAccountStatusSink } from "./channel-lifecycle.js"; +export { createAccountStatusSink } from "./channel-lifecycle.core.js"; export { createPluginRuntimeStore } from "./runtime-store.js"; export { KeyedAsyncQueue } from "./keyed-async-queue.js"; diff --git a/src/plugin-sdk/extension-shared.ts b/src/plugin-sdk/extension-shared.ts index af754c0385b..346721df023 100644 --- a/src/plugin-sdk/extension-shared.ts +++ b/src/plugin-sdk/extension-shared.ts @@ -1,5 +1,5 @@ import type { z } from "zod"; -import { runPassiveAccountLifecycle } from "./channel-lifecycle.js"; +import { runPassiveAccountLifecycle } from "./channel-lifecycle.core.js"; import { createLoggerBackedRuntime } from "./runtime.js"; export { safeParseJsonWithSchema, safeParseWithSchema } from "../utils/zod-parse.js"; diff --git a/src/plugin-sdk/googlechat.ts b/src/plugin-sdk/googlechat.ts index 19e71e5f6fe..56b0f43919a 100644 --- a/src/plugin-sdk/googlechat.ts +++ b/src/plugin-sdk/googlechat.ts @@ -22,7 +22,7 @@ export { } from "../channels/plugins/directory-config-helpers.js"; export { buildComputedAccountStatusSnapshot } from "./status-helpers.js"; export { buildChannelConfigSchema } from "../channels/plugins/config-schema.js"; -export { createAccountStatusSink, runPassiveAccountLifecycle } from "./channel-lifecycle.js"; +export { createAccountStatusSink, runPassiveAccountLifecycle } from "./channel-lifecycle.core.js"; export { formatPairingApproveHint } from "../channels/plugins/helpers.js"; export { fetchRemoteMedia } from "../media/fetch.js"; export { resolveChannelMediaMaxBytes } from "../channels/plugins/media-limits.js"; diff --git a/src/plugin-sdk/irc.ts b/src/plugin-sdk/irc.ts index 4d4105c8bc7..5d3d65c0f0a 100644 --- a/src/plugin-sdk/irc.ts +++ b/src/plugin-sdk/irc.ts @@ -58,7 +58,7 @@ export type { PluginRuntime } from "../plugins/runtime/types.js"; export type { OpenClawPluginApi } from "../plugins/types.js"; export { DEFAULT_ACCOUNT_ID } from "../routing/session-key.js"; export type { RuntimeEnv } from "../runtime.js"; -export { createAccountStatusSink, runPassiveAccountLifecycle } from "./channel-lifecycle.js"; +export { createAccountStatusSink, runPassiveAccountLifecycle } from "./channel-lifecycle.core.js"; export { listIrcAccountIds, resolveDefaultIrcAccountId, resolveIrcAccount } from "./irc-surface.js"; export { readStoreAllowFromForDmPolicy, diff --git a/src/plugin-sdk/mattermost.ts b/src/plugin-sdk/mattermost.ts index e9df483ad53..1e09ad63ccb 100644 --- a/src/plugin-sdk/mattermost.ts +++ b/src/plugin-sdk/mattermost.ts @@ -40,7 +40,7 @@ export { createSetupInputPresenceValidator, migrateBaseNameToDefaultAccount, } from "../channels/plugins/setup-helpers.js"; -export { createAccountStatusSink } from "./channel-lifecycle.js"; +export { createAccountStatusSink } from "./channel-lifecycle.core.js"; export { buildComputedAccountStatusSnapshot } from "./status-helpers.js"; export { createAccountListHelpers } from "../channels/plugins/account-helpers.js"; export type { diff --git a/src/plugin-sdk/msteams.ts b/src/plugin-sdk/msteams.ts index 043f4f378db..cf0089a3259 100644 --- a/src/plugin-sdk/msteams.ts +++ b/src/plugin-sdk/msteams.ts @@ -101,7 +101,7 @@ export { formatDocsLink } from "../terminal/links.js"; export { sleep } from "../utils.js"; export { loadWebMedia } from "./web-media.js"; export type { WizardPrompter } from "../wizard/prompts.js"; -export { keepHttpServerTaskAlive } from "./channel-lifecycle.js"; +export { keepHttpServerTaskAlive } from "./channel-lifecycle.core.js"; export { withFileLock } from "./file-lock.js"; export { dispatchReplyFromConfigWithSettledDispatcher } from "./inbound-reply-dispatch.js"; export { readJsonFileWithFallback, writeJsonFileAtomically } from "./json-store.js"; diff --git a/src/tasks/task-registry.test.ts b/src/tasks/task-registry.test.ts index e8e1fc16aed..23408636d2d 100644 --- a/src/tasks/task-registry.test.ts +++ b/src/tasks/task-registry.test.ts @@ -7,6 +7,7 @@ import { } from "../infra/heartbeat-wake.js"; import { peekSystemEvents, resetSystemEventsForTest } from "../infra/system-events.js"; import { withTempDir } from "../test-helpers/temp-dir.js"; +import { installInMemoryTaskAndFlowRegistryRuntime } from "../test-utils/task-flow-registry-runtime.js"; import { createFlowRecord, getFlowById, resetFlowRegistryForTests } from "./flow-registry.js"; import { createTaskRecord, @@ -123,8 +124,8 @@ describe("task-registry", () => { } resetSystemEventsForTest(); resetHeartbeatWakeStateForTests(); - resetTaskRegistryForTests(); - resetFlowRegistryForTests(); + resetTaskRegistryForTests({ persist: false }); + resetFlowRegistryForTests({ persist: false }); hoisted.sendMessageMock.mockReset(); hoisted.cancelSessionMock.mockReset(); hoisted.killSubagentRunAdminMock.mockReset(); @@ -699,7 +700,9 @@ describe("task-registry", () => { it("adopts parent flow linkage when collapsing onto an earlier ACP record", async () => { await withTaskRegistryTempDir(async (root) => { process.env.OPENCLAW_STATE_DIR = root; - resetTaskRegistryForTests(); + resetTaskRegistryForTests({ persist: false }); + resetFlowRegistryForTests({ persist: false }); + installInMemoryTaskAndFlowRegistryRuntime(); const directTask = createTaskRecord({ runtime: "acp", diff --git a/src/test-utils/session-state-cleanup.test.ts b/src/test-utils/session-state-cleanup.test.ts index 40598519329..2449dd0e80f 100644 --- a/src/test-utils/session-state-cleanup.test.ts +++ b/src/test-utils/session-state-cleanup.test.ts @@ -2,37 +2,21 @@ import fs from "node:fs/promises"; import os from "node:os"; import path from "node:path"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { resetSessionWriteLockStateForTest } from "../agents/session-write-lock.js"; +import { + clearSessionStoreCacheForTest, + getSessionStoreLockQueueSizeForTest, + resetSessionStoreLockRuntimeForTests, + setSessionWriteLockAcquirerForTests, + withSessionStoreLockForTest, +} from "../config/sessions/store.js"; +import { resetFileLockStateForTest } from "../infra/file-lock.js"; +import { cleanupSessionStateForTest } from "./session-state-cleanup.js"; const acquireSessionWriteLockMock = vi.hoisted(() => vi.fn(async () => ({ release: vi.fn(async () => {}) })), ); -let cleanupSessionStateForTest: typeof import("./session-state-cleanup.js").cleanupSessionStateForTest; -let withSessionStoreLockForTest: typeof import("../config/sessions/store.js").withSessionStoreLockForTest; -let getSessionStoreLockQueueSizeForTest: typeof import("../config/sessions/store.js").getSessionStoreLockQueueSizeForTest; -let clearSessionStoreCacheForTest: typeof import("../config/sessions/store.js").clearSessionStoreCacheForTest; -let resetFileLockStateForTest: typeof import("../infra/file-lock.js").resetFileLockStateForTest; -let resetSessionWriteLockStateForTest: typeof import("../agents/session-write-lock.js").resetSessionWriteLockStateForTest; - -async function loadFreshSessionCleanupModules() { - vi.resetModules(); - vi.doMock("../agents/session-write-lock.js", async (importOriginal) => { - const original = await importOriginal(); - return { - ...original, - acquireSessionWriteLock: acquireSessionWriteLockMock, - }; - }); - ({ - withSessionStoreLockForTest, - getSessionStoreLockQueueSizeForTest, - clearSessionStoreCacheForTest, - } = await import("../config/sessions/store.js")); - ({ cleanupSessionStateForTest } = await import("./session-state-cleanup.js")); - ({ resetFileLockStateForTest } = await import("../infra/file-lock.js")); - ({ resetSessionWriteLockStateForTest } = await import("../agents/session-write-lock.js")); -} - function createDeferred() { let resolve!: (value: T | PromiseLike) => void; let reject!: (reason?: unknown) => void; @@ -44,13 +28,13 @@ function createDeferred() { } describe("cleanupSessionStateForTest", () => { - beforeEach(async () => { - await loadFreshSessionCleanupModules(); + beforeEach(() => { vi.useRealTimers(); clearSessionStoreCacheForTest(); resetFileLockStateForTest(); resetSessionWriteLockStateForTest(); acquireSessionWriteLockMock.mockClear(); + setSessionWriteLockAcquirerForTests(acquireSessionWriteLockMock); }); afterEach(() => { @@ -58,8 +42,8 @@ describe("cleanupSessionStateForTest", () => { clearSessionStoreCacheForTest(); resetFileLockStateForTest(); resetSessionWriteLockStateForTest(); + resetSessionStoreLockRuntimeForTests(); vi.restoreAllMocks(); - vi.doUnmock("../agents/session-write-lock.js"); }); it("waits for in-flight session store locks before clearing test state", async () => { diff --git a/src/test-utils/task-flow-registry-runtime.ts b/src/test-utils/task-flow-registry-runtime.ts new file mode 100644 index 00000000000..371c264c102 --- /dev/null +++ b/src/test-utils/task-flow-registry-runtime.ts @@ -0,0 +1,107 @@ +import { + configureFlowRegistryRuntime, + type FlowRegistryStore, + type FlowRegistryStoreSnapshot, +} from "../tasks/flow-registry.store.js"; +import type { FlowRecord } from "../tasks/flow-registry.types.js"; +import { + configureTaskRegistryRuntime, + type TaskRegistryStore, + type TaskRegistryStoreSnapshot, +} from "../tasks/task-registry.store.js"; +import type { TaskDeliveryState, TaskRecord } from "../tasks/task-registry.types.js"; + +function cloneTask(task: TaskRecord): TaskRecord { + return { ...task }; +} + +function cloneDeliveryState(state: TaskDeliveryState): TaskDeliveryState { + return { + ...state, + ...(state.requesterOrigin ? { requesterOrigin: { ...state.requesterOrigin } } : {}), + }; +} + +function cloneFlow(flow: FlowRecord): FlowRecord { + return { + ...flow, + ...(flow.requesterOrigin ? { requesterOrigin: { ...flow.requesterOrigin } } : {}), + }; +} + +export function installInMemoryTaskAndFlowRegistryRuntime(): { + taskStore: TaskRegistryStore; + flowStore: FlowRegistryStore; +} { + let taskSnapshot: TaskRegistryStoreSnapshot = { + tasks: new Map(), + deliveryStates: new Map(), + }; + let flowSnapshot: FlowRegistryStoreSnapshot = { + flows: new Map(), + }; + + const taskStore: TaskRegistryStore = { + loadSnapshot: () => ({ + tasks: new Map( + [...taskSnapshot.tasks.entries()].map(([taskId, task]) => [taskId, cloneTask(task)]), + ), + deliveryStates: new Map( + [...taskSnapshot.deliveryStates.entries()].map(([taskId, state]) => [ + taskId, + cloneDeliveryState(state), + ]), + ), + }), + saveSnapshot: (snapshot) => { + taskSnapshot = { + tasks: new Map( + [...snapshot.tasks.entries()].map(([taskId, task]) => [taskId, cloneTask(task)]), + ), + deliveryStates: new Map( + [...snapshot.deliveryStates.entries()].map(([taskId, state]) => [ + taskId, + cloneDeliveryState(state), + ]), + ), + }; + }, + upsertTask: (task) => { + taskSnapshot.tasks.set(task.taskId, cloneTask(task)); + }, + deleteTask: (taskId) => { + taskSnapshot.tasks.delete(taskId); + }, + upsertDeliveryState: (state) => { + taskSnapshot.deliveryStates.set(state.taskId, cloneDeliveryState(state)); + }, + deleteDeliveryState: (taskId) => { + taskSnapshot.deliveryStates.delete(taskId); + }, + }; + + const flowStore: FlowRegistryStore = { + loadSnapshot: () => ({ + flows: new Map( + [...flowSnapshot.flows.entries()].map(([flowId, flow]) => [flowId, cloneFlow(flow)]), + ), + }), + saveSnapshot: (snapshot) => { + flowSnapshot = { + flows: new Map( + [...snapshot.flows.entries()].map(([flowId, flow]) => [flowId, cloneFlow(flow)]), + ), + }; + }, + upsertFlow: (flow) => { + flowSnapshot.flows.set(flow.flowId, cloneFlow(flow)); + }, + deleteFlow: (flowId) => { + flowSnapshot.flows.delete(flowId); + }, + }; + + configureTaskRegistryRuntime({ store: taskStore }); + configureFlowRegistryRuntime({ store: flowStore }); + return { taskStore, flowStore }; +}