mirror of https://github.com/openclaw/openclaw.git
refactor(core): make event and queue state lazy
This commit is contained in:
parent
f6b3377af2
commit
a9da52da50
|
|
@ -29,16 +29,19 @@ type AgentEventState = {
|
|||
|
||||
const AGENT_EVENT_STATE_KEY = Symbol.for("openclaw.agentEvents.state");
|
||||
|
||||
const state = resolveGlobalSingleton<AgentEventState>(AGENT_EVENT_STATE_KEY, () => ({
|
||||
seqByRun: new Map<string, number>(),
|
||||
listeners: new Set<(evt: AgentEventPayload) => void>(),
|
||||
runContextById: new Map<string, AgentRunContext>(),
|
||||
}));
|
||||
function getAgentEventState(): AgentEventState {
|
||||
return resolveGlobalSingleton<AgentEventState>(AGENT_EVENT_STATE_KEY, () => ({
|
||||
seqByRun: new Map<string, number>(),
|
||||
listeners: new Set<(evt: AgentEventPayload) => void>(),
|
||||
runContextById: new Map<string, AgentRunContext>(),
|
||||
}));
|
||||
}
|
||||
|
||||
export function registerAgentRunContext(runId: string, context: AgentRunContext) {
|
||||
if (!runId) {
|
||||
return;
|
||||
}
|
||||
const state = getAgentEventState();
|
||||
const existing = state.runContextById.get(runId);
|
||||
if (!existing) {
|
||||
state.runContextById.set(runId, { ...context });
|
||||
|
|
@ -59,18 +62,19 @@ export function registerAgentRunContext(runId: string, context: AgentRunContext)
|
|||
}
|
||||
|
||||
export function getAgentRunContext(runId: string) {
|
||||
return state.runContextById.get(runId);
|
||||
return getAgentEventState().runContextById.get(runId);
|
||||
}
|
||||
|
||||
export function clearAgentRunContext(runId: string) {
|
||||
state.runContextById.delete(runId);
|
||||
getAgentEventState().runContextById.delete(runId);
|
||||
}
|
||||
|
||||
export function resetAgentRunContextForTest() {
|
||||
state.runContextById.clear();
|
||||
getAgentEventState().runContextById.clear();
|
||||
}
|
||||
|
||||
export function emitAgentEvent(event: Omit<AgentEventPayload, "seq" | "ts">) {
|
||||
const state = getAgentEventState();
|
||||
const nextSeq = (state.seqByRun.get(event.runId) ?? 0) + 1;
|
||||
state.seqByRun.set(event.runId, nextSeq);
|
||||
const context = state.runContextById.get(event.runId);
|
||||
|
|
@ -88,10 +92,12 @@ export function emitAgentEvent(event: Omit<AgentEventPayload, "seq" | "ts">) {
|
|||
}
|
||||
|
||||
export function onAgentEvent(listener: (evt: AgentEventPayload) => void) {
|
||||
const state = getAgentEventState();
|
||||
return registerListener(state.listeners, listener);
|
||||
}
|
||||
|
||||
export function resetAgentEventsForTest() {
|
||||
const state = getAgentEventState();
|
||||
state.seqByRun.clear();
|
||||
state.listeners.clear();
|
||||
state.runContextById.clear();
|
||||
|
|
|
|||
|
|
@ -53,11 +53,13 @@ type LaneState = {
|
|||
*/
|
||||
const COMMAND_QUEUE_STATE_KEY = Symbol.for("openclaw.commandQueueState");
|
||||
|
||||
const queueState = resolveGlobalSingleton(COMMAND_QUEUE_STATE_KEY, () => ({
|
||||
gatewayDraining: false,
|
||||
lanes: new Map<string, LaneState>(),
|
||||
nextTaskId: 1,
|
||||
}));
|
||||
function getQueueState() {
|
||||
return resolveGlobalSingleton(COMMAND_QUEUE_STATE_KEY, () => ({
|
||||
gatewayDraining: false,
|
||||
lanes: new Map<string, LaneState>(),
|
||||
nextTaskId: 1,
|
||||
}));
|
||||
}
|
||||
|
||||
function normalizeLane(lane: string): string {
|
||||
return lane.trim() || CommandLane.Main;
|
||||
|
|
@ -68,6 +70,7 @@ function getLaneDepth(state: LaneState): number {
|
|||
}
|
||||
|
||||
function getLaneState(lane: string): LaneState {
|
||||
const queueState = getQueueState();
|
||||
const existing = queueState.lanes.get(lane);
|
||||
if (existing) {
|
||||
return existing;
|
||||
|
|
@ -120,7 +123,7 @@ function drainLane(lane: string) {
|
|||
);
|
||||
}
|
||||
logLaneDequeue(lane, waitedMs, state.queue.length);
|
||||
const taskId = queueState.nextTaskId++;
|
||||
const taskId = getQueueState().nextTaskId++;
|
||||
const taskGeneration = state.generation;
|
||||
state.activeTaskIds.add(taskId);
|
||||
void (async () => {
|
||||
|
|
@ -163,7 +166,7 @@ function drainLane(lane: string) {
|
|||
* `GatewayDrainingError` instead of being silently killed on shutdown.
|
||||
*/
|
||||
export function markGatewayDraining(): void {
|
||||
queueState.gatewayDraining = true;
|
||||
getQueueState().gatewayDraining = true;
|
||||
}
|
||||
|
||||
export function setCommandLaneConcurrency(lane: string, maxConcurrent: number) {
|
||||
|
|
@ -181,6 +184,7 @@ export function enqueueCommandInLane<T>(
|
|||
onWait?: (waitMs: number, queuedAhead: number) => void;
|
||||
},
|
||||
): Promise<T> {
|
||||
const queueState = getQueueState();
|
||||
if (queueState.gatewayDraining) {
|
||||
return Promise.reject(new GatewayDrainingError());
|
||||
}
|
||||
|
|
@ -213,7 +217,7 @@ export function enqueueCommand<T>(
|
|||
|
||||
export function getQueueSize(lane: string = CommandLane.Main) {
|
||||
const resolved = normalizeLane(lane);
|
||||
const state = queueState.lanes.get(resolved);
|
||||
const state = getQueueState().lanes.get(resolved);
|
||||
if (!state) {
|
||||
return 0;
|
||||
}
|
||||
|
|
@ -222,7 +226,7 @@ export function getQueueSize(lane: string = CommandLane.Main) {
|
|||
|
||||
export function getTotalQueueSize() {
|
||||
let total = 0;
|
||||
for (const s of queueState.lanes.values()) {
|
||||
for (const s of getQueueState().lanes.values()) {
|
||||
total += getLaneDepth(s);
|
||||
}
|
||||
return total;
|
||||
|
|
@ -230,7 +234,7 @@ export function getTotalQueueSize() {
|
|||
|
||||
export function clearCommandLane(lane: string = CommandLane.Main) {
|
||||
const cleaned = normalizeLane(lane);
|
||||
const state = queueState.lanes.get(cleaned);
|
||||
const state = getQueueState().lanes.get(cleaned);
|
||||
if (!state) {
|
||||
return 0;
|
||||
}
|
||||
|
|
@ -257,6 +261,7 @@ export function clearCommandLane(lane: string = CommandLane.Main) {
|
|||
* `enqueueCommandInLane()` call (which may never come).
|
||||
*/
|
||||
export function resetAllLanes(): void {
|
||||
const queueState = getQueueState();
|
||||
queueState.gatewayDraining = false;
|
||||
const lanesToDrain: string[] = [];
|
||||
for (const state of queueState.lanes.values()) {
|
||||
|
|
@ -278,6 +283,7 @@ export function resetAllLanes(): void {
|
|||
* (excludes queued-but-not-started entries).
|
||||
*/
|
||||
export function getActiveTaskCount(): number {
|
||||
const queueState = getQueueState();
|
||||
let total = 0;
|
||||
for (const s of queueState.lanes.values()) {
|
||||
total += s.activeTaskIds.size;
|
||||
|
|
@ -297,6 +303,7 @@ export function waitForActiveTasks(timeoutMs: number): Promise<{ drained: boolea
|
|||
// Keep shutdown/drain checks responsive without busy looping.
|
||||
const POLL_INTERVAL_MS = 50;
|
||||
const deadline = Date.now() + timeoutMs;
|
||||
const queueState = getQueueState();
|
||||
const activeAtStart = new Set<number>();
|
||||
for (const state of queueState.lanes.values()) {
|
||||
for (const taskId of state.activeTaskIds) {
|
||||
|
|
|
|||
Loading…
Reference in New Issue