From 6b7206ed35ff4f1f2d80e21753b2ef655952ae7e Mon Sep 17 00:00:00 2001 From: Vincent Koc Date: Sun, 22 Mar 2026 09:05:14 -0700 Subject: [PATCH] perf(inbound): trim reply-run startup imports (#52332) * perf(inbound): trim reply-run startup imports * style(reply): format body runtime import * test(reply): restore runtime seam mocks --- src/agents/pi-embedded.runtime.ts | 6 + src/auto-reply/reply/agent-runner.runtime.ts | 1 + src/auto-reply/reply/body.ts | 13 +- .../reply/get-reply-run.media-only.test.ts | 28 +++-- src/auto-reply/reply/get-reply-run.ts | 53 ++++++-- src/auto-reply/reply/route-reply.runtime.ts | 1 + src/auto-reply/reply/session-system-events.ts | 112 +++++++++++++++++ .../reply/session-updates.runtime.ts | 1 + src/auto-reply/reply/session-updates.ts | 113 +----------------- 9 files changed, 193 insertions(+), 135 deletions(-) create mode 100644 src/agents/pi-embedded.runtime.ts create mode 100644 src/auto-reply/reply/agent-runner.runtime.ts create mode 100644 src/auto-reply/reply/route-reply.runtime.ts create mode 100644 src/auto-reply/reply/session-system-events.ts create mode 100644 src/auto-reply/reply/session-updates.runtime.ts diff --git a/src/agents/pi-embedded.runtime.ts b/src/agents/pi-embedded.runtime.ts new file mode 100644 index 00000000000..96dc2a0853e --- /dev/null +++ b/src/agents/pi-embedded.runtime.ts @@ -0,0 +1,6 @@ +export { + abortEmbeddedPiRun, + isEmbeddedPiRunActive, + isEmbeddedPiRunStreaming, + resolveEmbeddedSessionLane, +} from "./pi-embedded.js"; diff --git a/src/auto-reply/reply/agent-runner.runtime.ts b/src/auto-reply/reply/agent-runner.runtime.ts new file mode 100644 index 00000000000..60933427362 --- /dev/null +++ b/src/auto-reply/reply/agent-runner.runtime.ts @@ -0,0 +1 @@ +export { runReplyAgent } from "./agent-runner.js"; diff --git a/src/auto-reply/reply/body.ts b/src/auto-reply/reply/body.ts index 900c8148c30..147e4147875 100644 --- a/src/auto-reply/reply/body.ts +++ b/src/auto-reply/reply/body.ts @@ -1,7 +1,15 @@ -import type { SessionEntry } from "../../config/sessions.js"; -import { updateSessionStore } from "../../config/sessions.js"; +import type { SessionEntry } from "../../config/sessions/types.js"; import { setAbortMemory } from "./abort-primitives.js"; +let sessionStoreRuntimePromise: Promise< + typeof import("../../config/sessions/store.runtime.js") +> | null = null; + +function loadSessionStoreRuntime() { + sessionStoreRuntimePromise ??= import("../../config/sessions/store.runtime.js"); + return sessionStoreRuntimePromise; +} + export async function applySessionHints(params: { baseBody: string; abortedLastRun: boolean; @@ -23,6 +31,7 @@ export async function applySessionHints(params: { params.sessionStore[params.sessionKey] = params.sessionEntry; if (params.storePath) { const sessionKey = params.sessionKey; + const { updateSessionStore } = await loadSessionStoreRuntime(); await updateSessionStore(params.storePath, (store) => { const entry = store[sessionKey] ?? params.sessionEntry; if (!entry) { diff --git a/src/auto-reply/reply/get-reply-run.media-only.test.ts b/src/auto-reply/reply/get-reply-run.media-only.test.ts index 829b3937009..2c1dd06b8e5 100644 --- a/src/auto-reply/reply/get-reply-run.media-only.test.ts +++ b/src/auto-reply/reply/get-reply-run.media-only.test.ts @@ -5,17 +5,23 @@ vi.mock("../../agents/auth-profiles/session-override.js", () => ({ resolveSessionAuthProfileOverride: vi.fn().mockResolvedValue(undefined), })); -vi.mock("../../agents/pi-embedded.js", () => ({ +vi.mock("../../agents/pi-embedded.runtime.js", () => ({ abortEmbeddedPiRun: vi.fn().mockReturnValue(false), isEmbeddedPiRunActive: vi.fn().mockReturnValue(false), isEmbeddedPiRunStreaming: vi.fn().mockReturnValue(false), resolveEmbeddedSessionLane: vi.fn().mockReturnValue("session:session-key"), })); -vi.mock("../../config/sessions.js", () => ({ +vi.mock("../../config/sessions/group.js", () => ({ resolveGroupSessionKey: vi.fn().mockReturnValue(undefined), +})); + +vi.mock("../../config/sessions/paths.js", () => ({ resolveSessionFilePath: vi.fn().mockReturnValue("/tmp/session.jsonl"), resolveSessionFilePathOptions: vi.fn().mockReturnValue({}), +})); + +vi.mock("../../config/sessions/store.js", () => ({ updateSessionStore: vi.fn(), })); @@ -30,6 +36,7 @@ vi.mock("../../process/command-queue.js", () => ({ vi.mock("../../routing/session-key.js", () => ({ normalizeMainKey: vi.fn().mockReturnValue("main"), + normalizeAgentId: vi.fn((id?: string) => id ?? "default"), })); vi.mock("../../utils/provider-utils.js", () => ({ @@ -40,7 +47,7 @@ vi.mock("../command-detection.js", () => ({ hasControlCommand: vi.fn().mockReturnValue(false), })); -vi.mock("./agent-runner.js", () => ({ +vi.mock("./agent-runner.runtime.js", () => ({ runReplyAgent: vi.fn().mockResolvedValue({ text: "ok" }), })); @@ -58,20 +65,23 @@ vi.mock("./inbound-meta.js", () => ({ buildInboundUserContextPrefix: vi.fn().mockReturnValue(""), })); -vi.mock("./queue.js", () => ({ +vi.mock("./queue/settings.js", () => ({ resolveQueueSettings: vi.fn().mockReturnValue({ mode: "followup" }), })); -vi.mock("./route-reply.js", () => ({ +vi.mock("./route-reply.runtime.js", () => ({ routeReply: vi.fn(), })); -vi.mock("./session-updates.js", () => ({ +vi.mock("./session-updates.runtime.js", () => ({ ensureSkillSnapshot: vi.fn().mockImplementation(async ({ sessionEntry, systemSent }) => ({ sessionEntry, systemSent, skillsSnapshot: undefined, })), +})); + +vi.mock("./session-system-events.js", () => ({ drainFormattedSystemEvents: vi.fn().mockResolvedValue(undefined), })); @@ -79,9 +89,9 @@ vi.mock("./typing-mode.js", () => ({ resolveTypingMode: vi.fn().mockReturnValue("off"), })); -import { runReplyAgent } from "./agent-runner.js"; -import { routeReply } from "./route-reply.js"; -import { drainFormattedSystemEvents } from "./session-updates.js"; +import { runReplyAgent } from "./agent-runner.runtime.js"; +import { routeReply } from "./route-reply.runtime.js"; +import { drainFormattedSystemEvents } from "./session-system-events.js"; import { resolveTypingMode } from "./typing-mode.js"; function baseParams( diff --git a/src/auto-reply/reply/get-reply-run.ts b/src/auto-reply/reply/get-reply-run.ts index 71de418e623..fbfae534954 100644 --- a/src/auto-reply/reply/get-reply-run.ts +++ b/src/auto-reply/reply/get-reply-run.ts @@ -2,12 +2,6 @@ import crypto from "node:crypto"; import { resolveSessionAuthProfileOverride } from "../../agents/auth-profiles/session-override.js"; import type { ExecToolDefaults } from "../../agents/bash-tools.js"; import { resolveFastModeState } from "../../agents/fast-mode.js"; -import { - abortEmbeddedPiRun, - isEmbeddedPiRunActive, - isEmbeddedPiRunStreaming, - resolveEmbeddedSessionLane, -} from "../../agents/pi-embedded.js"; import type { OpenClawConfig } from "../../config/config.js"; import { resolveGroupSessionKey } from "../../config/sessions/group.js"; import { @@ -35,7 +29,6 @@ import { } from "../thinking.js"; import { SILENT_REPLY_TOKEN } from "../tokens.js"; import type { GetReplyOptions, ReplyPayload } from "../types.js"; -import { runReplyAgent } from "./agent-runner.js"; import { applySessionHints } from "./body.js"; import type { buildCommandContext } from "./commands.js"; import type { InlineDirectives } from "./directive-handling.js"; @@ -43,10 +36,10 @@ import { buildGroupChatContext, buildGroupIntro } from "./groups.js"; import { buildInboundMetaSystemPrompt, buildInboundUserContextPrefix } from "./inbound-meta.js"; import type { createModelSelectionState } from "./model-selection.js"; import { resolveOriginMessageProvider } from "./origin-routing.js"; -import { resolveQueueSettings } from "./queue.js"; -import { routeReply } from "./route-reply.js"; +import { resolveQueueSettings } from "./queue/settings.js"; +import type { RouteReplyParams } from "./route-reply.js"; import { buildBareSessionResetPrompt } from "./session-reset-prompt.js"; -import { drainFormattedSystemEvents, ensureSkillSnapshot } from "./session-updates.js"; +import { drainFormattedSystemEvents } from "./session-system-events.js"; import { resolveTypingMode } from "./typing-mode.js"; import { resolveRunTypingPolicy } from "./typing-policy.js"; import type { TypingController } from "./typing.js"; @@ -55,6 +48,33 @@ import { appendUntrustedContext } from "./untrusted-context.js"; type AgentDefaults = NonNullable["defaults"]; type ExecOverrides = Pick; +let piEmbeddedRuntimePromise: Promise | null = + null; +let agentRunnerRuntimePromise: Promise | null = null; +let routeReplyRuntimePromise: Promise | null = null; +let sessionUpdatesRuntimePromise: Promise | null = + null; + +function loadPiEmbeddedRuntime() { + piEmbeddedRuntimePromise ??= import("../../agents/pi-embedded.runtime.js"); + return piEmbeddedRuntimePromise; +} + +function loadAgentRunnerRuntime() { + agentRunnerRuntimePromise ??= import("./agent-runner.runtime.js"); + return agentRunnerRuntimePromise; +} + +function loadRouteReplyRuntime() { + routeReplyRuntimePromise ??= import("./route-reply.runtime.js"); + return routeReplyRuntimePromise; +} + +function loadSessionUpdatesRuntime() { + sessionUpdatesRuntimePromise ??= import("./session-updates.runtime.js"); + return sessionUpdatesRuntimePromise; +} + function buildResetSessionNoticeText(params: { provider: string; model: string; @@ -72,13 +92,13 @@ function resolveResetSessionNoticeRoute(params: { ctx: MsgContext; command: ReturnType; }): { - channel: Parameters[0]["channel"]; + channel: RouteReplyParams["channel"]; to: string; } | null { const commandChannel = params.command.channel?.trim().toLowerCase(); const fallbackChannel = commandChannel && commandChannel !== "webchat" - ? (commandChannel as Parameters[0]["channel"]) + ? (commandChannel as RouteReplyParams["channel"]) : undefined; const channel = params.ctx.OriginatingChannel ?? fallbackChannel; const to = params.ctx.OriginatingTo ?? params.command.from ?? params.command.to; @@ -107,6 +127,7 @@ async function sendResetSessionNotice(params: { if (!route) { return; } + const { routeReply } = await loadRouteReplyRuntime(); await routeReply({ payload: { text: buildResetSessionNoticeText({ @@ -368,6 +389,7 @@ export async function runPreparedReply( : threadStarterBody ? `[Thread starter - for context]\n${threadStarterBody}` : undefined; + const { ensureSkillSnapshot } = await loadSessionUpdatesRuntime(); const skillResult = await ensureSkillSnapshot({ sessionEntry, sessionStore, @@ -446,6 +468,12 @@ export async function runPreparedReply( inlineMode: perMessageQueueMode, inlineOptions: perMessageQueueOptions, }); + const { + abortEmbeddedPiRun, + isEmbeddedPiRunActive, + isEmbeddedPiRunStreaming, + resolveEmbeddedSessionLane, + } = await loadPiEmbeddedRuntime(); const sessionLaneKey = resolveEmbeddedSessionLane(sessionKey ?? sessionIdFinal); const laneSize = getQueueSize(sessionLaneKey); if (resolvedQueue.mode === "interrupt" && laneSize > 0) { @@ -538,6 +566,7 @@ export async function runPreparedReply( }, }; + const { runReplyAgent } = await loadAgentRunnerRuntime(); return runReplyAgent({ commandBody: prefixedCommandBody, followupRun, diff --git a/src/auto-reply/reply/route-reply.runtime.ts b/src/auto-reply/reply/route-reply.runtime.ts new file mode 100644 index 00000000000..4d68fc7ce23 --- /dev/null +++ b/src/auto-reply/reply/route-reply.runtime.ts @@ -0,0 +1 @@ +export { routeReply } from "./route-reply.js"; diff --git a/src/auto-reply/reply/session-system-events.ts b/src/auto-reply/reply/session-system-events.ts new file mode 100644 index 00000000000..b5b07ec2ba9 --- /dev/null +++ b/src/auto-reply/reply/session-system-events.ts @@ -0,0 +1,112 @@ +import { resolveUserTimezone } from "../../agents/date-time.js"; +import type { OpenClawConfig } from "../../config/config.js"; +import { buildChannelSummary } from "../../infra/channel-summary.js"; +import { + formatUtcTimestamp, + formatZonedTimestamp, + resolveTimezone, +} from "../../infra/format-time/format-datetime.ts"; +import { drainSystemEventEntries } from "../../infra/system-events.js"; + +/** Drain queued system events, format as `System:` lines, return the block (or undefined). */ +export async function drainFormattedSystemEvents(params: { + cfg: OpenClawConfig; + sessionKey: string; + isMainSession: boolean; + isNewSession: boolean; +}): Promise { + const compactSystemEvent = (line: string): string | null => { + const trimmed = line.trim(); + if (!trimmed) { + return null; + } + const lower = trimmed.toLowerCase(); + if (lower.includes("reason periodic")) { + return null; + } + // Filter out the actual heartbeat prompt, but not cron jobs that mention "heartbeat". + // The heartbeat prompt starts with "Read HEARTBEAT.md" - cron payloads won't match this. + if (lower.startsWith("read heartbeat.md")) { + return null; + } + if (lower.includes("heartbeat poll") || lower.includes("heartbeat wake")) { + return null; + } + if (trimmed.startsWith("Node:")) { + return trimmed.replace(/ · last input [^·]+/i, "").trim(); + } + return trimmed; + }; + + const resolveSystemEventTimezone = (cfg: OpenClawConfig) => { + const raw = cfg.agents?.defaults?.envelopeTimezone?.trim(); + if (!raw) { + return { mode: "local" as const }; + } + const lowered = raw.toLowerCase(); + if (lowered === "utc" || lowered === "gmt") { + return { mode: "utc" as const }; + } + if (lowered === "local" || lowered === "host") { + return { mode: "local" as const }; + } + if (lowered === "user") { + return { + mode: "iana" as const, + timeZone: resolveUserTimezone(cfg.agents?.defaults?.userTimezone), + }; + } + const explicit = resolveTimezone(raw); + return explicit ? { mode: "iana" as const, timeZone: explicit } : { mode: "local" as const }; + }; + + const formatSystemEventTimestamp = (ts: number, cfg: OpenClawConfig) => { + const date = new Date(ts); + if (Number.isNaN(date.getTime())) { + return "unknown-time"; + } + const zone = resolveSystemEventTimezone(cfg); + if (zone.mode === "utc") { + return formatUtcTimestamp(date, { displaySeconds: true }); + } + if (zone.mode === "local") { + return formatZonedTimestamp(date, { displaySeconds: true }) ?? "unknown-time"; + } + return ( + formatZonedTimestamp(date, { timeZone: zone.timeZone, displaySeconds: true }) ?? + "unknown-time" + ); + }; + + const systemLines: string[] = []; + const queued = drainSystemEventEntries(params.sessionKey); + systemLines.push( + ...queued + .map((event) => { + const compacted = compactSystemEvent(event.text); + if (!compacted) { + return null; + } + return `[${formatSystemEventTimestamp(event.ts, params.cfg)}] ${compacted}`; + }) + .filter((v): v is string => Boolean(v)), + ); + if (params.isMainSession && params.isNewSession) { + const summary = await buildChannelSummary(params.cfg); + if (summary.length > 0) { + systemLines.unshift(...summary); + } + } + if (systemLines.length === 0) { + return undefined; + } + + // Format events as trusted System: lines for the message timeline. + // Inbound sanitization rewrites any user-supplied "System:" to "System (untrusted):", + // so these gateway-originated lines are distinguishable by the model. + // Each sub-line of a multi-line event gets its own System: prefix so continuation + // lines can't be mistaken for user content. + return systemLines + .flatMap((line) => line.split("\n").map((subline) => `System: ${subline}`)) + .join("\n"); +} diff --git a/src/auto-reply/reply/session-updates.runtime.ts b/src/auto-reply/reply/session-updates.runtime.ts new file mode 100644 index 00000000000..a366dccf8fa --- /dev/null +++ b/src/auto-reply/reply/session-updates.runtime.ts @@ -0,0 +1 @@ +export { ensureSkillSnapshot } from "./session-updates.js"; diff --git a/src/auto-reply/reply/session-updates.ts b/src/auto-reply/reply/session-updates.ts index bea6cd326e0..59981ae7fd3 100644 --- a/src/auto-reply/reply/session-updates.ts +++ b/src/auto-reply/reply/session-updates.ts @@ -1,121 +1,10 @@ import crypto from "node:crypto"; -import { resolveUserTimezone } from "../../agents/date-time.js"; import { buildWorkspaceSkillSnapshot } from "../../agents/skills.js"; import { ensureSkillsWatcher, getSkillsSnapshotVersion } from "../../agents/skills/refresh.js"; import type { OpenClawConfig } from "../../config/config.js"; import { type SessionEntry, updateSessionStore } from "../../config/sessions.js"; -import { buildChannelSummary } from "../../infra/channel-summary.js"; -import { - resolveTimezone, - formatUtcTimestamp, - formatZonedTimestamp, -} from "../../infra/format-time/format-datetime.ts"; import { getRemoteSkillEligibility } from "../../infra/skills-remote.js"; -import { drainSystemEventEntries } from "../../infra/system-events.js"; - -/** Drain queued system events, format as `System:` lines, return the block (or undefined). */ -export async function drainFormattedSystemEvents(params: { - cfg: OpenClawConfig; - sessionKey: string; - isMainSession: boolean; - isNewSession: boolean; -}): Promise { - const compactSystemEvent = (line: string): string | null => { - const trimmed = line.trim(); - if (!trimmed) { - return null; - } - const lower = trimmed.toLowerCase(); - if (lower.includes("reason periodic")) { - return null; - } - // Filter out the actual heartbeat prompt, but not cron jobs that mention "heartbeat" - // The heartbeat prompt starts with "Read HEARTBEAT.md" - cron payloads won't match this - if (lower.startsWith("read heartbeat.md")) { - return null; - } - // Also filter heartbeat poll/wake noise - if (lower.includes("heartbeat poll") || lower.includes("heartbeat wake")) { - return null; - } - if (trimmed.startsWith("Node:")) { - return trimmed.replace(/ · last input [^·]+/i, "").trim(); - } - return trimmed; - }; - - const resolveSystemEventTimezone = (cfg: OpenClawConfig) => { - const raw = cfg.agents?.defaults?.envelopeTimezone?.trim(); - if (!raw) { - return { mode: "local" as const }; - } - const lowered = raw.toLowerCase(); - if (lowered === "utc" || lowered === "gmt") { - return { mode: "utc" as const }; - } - if (lowered === "local" || lowered === "host") { - return { mode: "local" as const }; - } - if (lowered === "user") { - return { - mode: "iana" as const, - timeZone: resolveUserTimezone(cfg.agents?.defaults?.userTimezone), - }; - } - const explicit = resolveTimezone(raw); - return explicit ? { mode: "iana" as const, timeZone: explicit } : { mode: "local" as const }; - }; - - const formatSystemEventTimestamp = (ts: number, cfg: OpenClawConfig) => { - const date = new Date(ts); - if (Number.isNaN(date.getTime())) { - return "unknown-time"; - } - const zone = resolveSystemEventTimezone(cfg); - if (zone.mode === "utc") { - return formatUtcTimestamp(date, { displaySeconds: true }); - } - if (zone.mode === "local") { - return formatZonedTimestamp(date, { displaySeconds: true }) ?? "unknown-time"; - } - return ( - formatZonedTimestamp(date, { timeZone: zone.timeZone, displaySeconds: true }) ?? - "unknown-time" - ); - }; - - const systemLines: string[] = []; - const queued = drainSystemEventEntries(params.sessionKey); - systemLines.push( - ...queued - .map((event) => { - const compacted = compactSystemEvent(event.text); - if (!compacted) { - return null; - } - return `[${formatSystemEventTimestamp(event.ts, params.cfg)}] ${compacted}`; - }) - .filter((v): v is string => Boolean(v)), - ); - if (params.isMainSession && params.isNewSession) { - const summary = await buildChannelSummary(params.cfg); - if (summary.length > 0) { - systemLines.unshift(...summary); - } - } - if (systemLines.length === 0) { - return undefined; - } - - // Format events as trusted System: lines for the message timeline. - // Inbound sanitization rewrites any user-supplied "System:" to "System (untrusted):", - // so these gateway-originated lines are distinguishable by the model. - // Each sub-line of a multi-line event gets its own System: prefix so continuation - // lines can't be mistaken for user content. - return systemLines - .flatMap((line) => line.split("\n").map((subline) => `System: ${subline}`)) - .join("\n"); -} +export { drainFormattedSystemEvents } from "./session-system-events.js"; async function persistSessionEntryUpdate(params: { sessionStore?: Record;