From a9da52da50d22c574cf7e7059e5ee378c2f02f81 Mon Sep 17 00:00:00 2001 From: Vincent Koc Date: Tue, 24 Mar 2026 11:45:17 -0700 Subject: [PATCH] refactor(core): make event and queue state lazy --- src/infra/agent-events.ts | 22 ++++++++++++++-------- src/process/command-queue.ts | 27 +++++++++++++++++---------- 2 files changed, 31 insertions(+), 18 deletions(-) diff --git a/src/infra/agent-events.ts b/src/infra/agent-events.ts index c7f4ba5d205..1884774a3c2 100644 --- a/src/infra/agent-events.ts +++ b/src/infra/agent-events.ts @@ -29,16 +29,19 @@ type AgentEventState = { const AGENT_EVENT_STATE_KEY = Symbol.for("openclaw.agentEvents.state"); -const state = resolveGlobalSingleton(AGENT_EVENT_STATE_KEY, () => ({ - seqByRun: new Map(), - listeners: new Set<(evt: AgentEventPayload) => void>(), - runContextById: new Map(), -})); +function getAgentEventState(): AgentEventState { + return resolveGlobalSingleton(AGENT_EVENT_STATE_KEY, () => ({ + seqByRun: new Map(), + listeners: new Set<(evt: AgentEventPayload) => void>(), + runContextById: new Map(), + })); +} export function registerAgentRunContext(runId: string, context: AgentRunContext) { if (!runId) { return; } + const state = getAgentEventState(); const existing = state.runContextById.get(runId); if (!existing) { state.runContextById.set(runId, { ...context }); @@ -59,18 +62,19 @@ export function registerAgentRunContext(runId: string, context: AgentRunContext) } export function getAgentRunContext(runId: string) { - return state.runContextById.get(runId); + return getAgentEventState().runContextById.get(runId); } export function clearAgentRunContext(runId: string) { - state.runContextById.delete(runId); + getAgentEventState().runContextById.delete(runId); } export function resetAgentRunContextForTest() { - state.runContextById.clear(); + getAgentEventState().runContextById.clear(); } export function emitAgentEvent(event: Omit) { + const state = getAgentEventState(); const nextSeq = (state.seqByRun.get(event.runId) ?? 0) + 1; state.seqByRun.set(event.runId, nextSeq); const context = state.runContextById.get(event.runId); @@ -88,10 +92,12 @@ export function emitAgentEvent(event: Omit) { } export function onAgentEvent(listener: (evt: AgentEventPayload) => void) { + const state = getAgentEventState(); return registerListener(state.listeners, listener); } export function resetAgentEventsForTest() { + const state = getAgentEventState(); state.seqByRun.clear(); state.listeners.clear(); state.runContextById.clear(); diff --git a/src/process/command-queue.ts b/src/process/command-queue.ts index 3421f7d9c86..42da633e846 100644 --- a/src/process/command-queue.ts +++ b/src/process/command-queue.ts @@ -53,11 +53,13 @@ type LaneState = { */ const COMMAND_QUEUE_STATE_KEY = Symbol.for("openclaw.commandQueueState"); -const queueState = resolveGlobalSingleton(COMMAND_QUEUE_STATE_KEY, () => ({ - gatewayDraining: false, - lanes: new Map(), - nextTaskId: 1, -})); +function getQueueState() { + return resolveGlobalSingleton(COMMAND_QUEUE_STATE_KEY, () => ({ + gatewayDraining: false, + lanes: new Map(), + nextTaskId: 1, + })); +} function normalizeLane(lane: string): string { return lane.trim() || CommandLane.Main; @@ -68,6 +70,7 @@ function getLaneDepth(state: LaneState): number { } function getLaneState(lane: string): LaneState { + const queueState = getQueueState(); const existing = queueState.lanes.get(lane); if (existing) { return existing; @@ -120,7 +123,7 @@ function drainLane(lane: string) { ); } logLaneDequeue(lane, waitedMs, state.queue.length); - const taskId = queueState.nextTaskId++; + const taskId = getQueueState().nextTaskId++; const taskGeneration = state.generation; state.activeTaskIds.add(taskId); void (async () => { @@ -163,7 +166,7 @@ function drainLane(lane: string) { * `GatewayDrainingError` instead of being silently killed on shutdown. */ export function markGatewayDraining(): void { - queueState.gatewayDraining = true; + getQueueState().gatewayDraining = true; } export function setCommandLaneConcurrency(lane: string, maxConcurrent: number) { @@ -181,6 +184,7 @@ export function enqueueCommandInLane( onWait?: (waitMs: number, queuedAhead: number) => void; }, ): Promise { + const queueState = getQueueState(); if (queueState.gatewayDraining) { return Promise.reject(new GatewayDrainingError()); } @@ -213,7 +217,7 @@ export function enqueueCommand( export function getQueueSize(lane: string = CommandLane.Main) { const resolved = normalizeLane(lane); - const state = queueState.lanes.get(resolved); + const state = getQueueState().lanes.get(resolved); if (!state) { return 0; } @@ -222,7 +226,7 @@ export function getQueueSize(lane: string = CommandLane.Main) { export function getTotalQueueSize() { let total = 0; - for (const s of queueState.lanes.values()) { + for (const s of getQueueState().lanes.values()) { total += getLaneDepth(s); } return total; @@ -230,7 +234,7 @@ export function getTotalQueueSize() { export function clearCommandLane(lane: string = CommandLane.Main) { const cleaned = normalizeLane(lane); - const state = queueState.lanes.get(cleaned); + const state = getQueueState().lanes.get(cleaned); if (!state) { return 0; } @@ -257,6 +261,7 @@ export function clearCommandLane(lane: string = CommandLane.Main) { * `enqueueCommandInLane()` call (which may never come). */ export function resetAllLanes(): void { + const queueState = getQueueState(); queueState.gatewayDraining = false; const lanesToDrain: string[] = []; for (const state of queueState.lanes.values()) { @@ -278,6 +283,7 @@ export function resetAllLanes(): void { * (excludes queued-but-not-started entries). */ export function getActiveTaskCount(): number { + const queueState = getQueueState(); let total = 0; for (const s of queueState.lanes.values()) { total += s.activeTaskIds.size; @@ -297,6 +303,7 @@ export function waitForActiveTasks(timeoutMs: number): Promise<{ drained: boolea // Keep shutdown/drain checks responsive without busy looping. const POLL_INTERVAL_MS = 50; const deadline = Date.now() + timeoutMs; + const queueState = getQueueState(); const activeAtStart = new Set(); for (const state of queueState.lanes.values()) { for (const taskId of state.activeTaskIds) {