test: reduce subagent announce import overhead

This commit is contained in:
Peter Steinberger 2026-04-03 15:59:26 +01:00
parent 25a187568f
commit a6816cb59c
No known key found for this signature in database
11 changed files with 231 additions and 143 deletions

View File

@ -0,0 +1,4 @@
export { resolveQueueSettings } from "../auto-reply/reply/queue.js";
export { resolveExternalBestEffortDeliveryTarget } from "../infra/outbound/best-effort-delivery.js";
export { createBoundDeliveryRouter } from "../infra/outbound/bound-delivery-router.js";
export { resolveConversationIdFromTargets } from "../infra/outbound/conversation-id.js";

View File

@ -1,16 +1,4 @@
import { resolveQueueSettings } from "../auto-reply/reply/queue.js";
import { parseExplicitTargetForChannel } from "../channels/plugins/target-parsing.js";
import { loadConfig } from "../config/config.js";
import {
loadSessionStore,
resolveAgentIdFromSessionKey,
resolveMainSessionKey,
resolveStorePath,
} from "../config/sessions.js";
import { callGateway } from "../gateway/call.js";
import { resolveExternalBestEffortDeliveryTarget } from "../infra/outbound/best-effort-delivery.js";
import { createBoundDeliveryRouter } from "../infra/outbound/bound-delivery-router.js";
import { resolveConversationIdFromTargets } from "../infra/outbound/conversation-id.js";
import type { ConversationRef } from "../infra/outbound/session-binding-service.js";
import { getGlobalHookRunner } from "../plugins/hook-runner-global.js";
import { normalizeAccountId, normalizeMainKey } from "../routing/session-key.js";
@ -31,12 +19,21 @@ import {
} from "../utils/message-channel.js";
import { buildAnnounceIdempotencyKey, resolveQueueAnnounceId } from "./announce-idempotency.js";
import type { AgentInternalEvent } from "./internal-events.js";
import { isEmbeddedPiRunActive, queueEmbeddedPiMessage } from "./pi-embedded.js";
import {
runSubagentAnnounceDispatch,
type SubagentAnnounceDeliveryResult,
} from "./subagent-announce-dispatch.js";
import { type AnnounceQueueItem, enqueueAnnounce } from "./subagent-announce-queue.js";
import {
callGateway,
isEmbeddedPiRunActive,
loadConfig,
loadSessionStore,
queueEmbeddedPiMessage,
resolveAgentIdFromSessionKey,
resolveMainSessionKey,
resolveStorePath,
} from "./subagent-announce.runtime.js";
import { getSubagentDepthFromSessionStore } from "./subagent-depth.js";
import type { SpawnSubagentMode } from "./subagent-spawn.js";
@ -56,6 +53,15 @@ const defaultSubagentAnnounceDeliveryDeps: SubagentAnnounceDeliveryDeps = {
let subagentAnnounceDeliveryDeps: SubagentAnnounceDeliveryDeps =
defaultSubagentAnnounceDeliveryDeps;
let subagentAnnounceDeliveryRuntimePromise: Promise<
typeof import("./subagent-announce-delivery.runtime.js")
> | null = null;
function loadSubagentAnnounceDeliveryRuntime() {
subagentAnnounceDeliveryRuntimePromise ??= import("./subagent-announce-delivery.runtime.js");
return subagentAnnounceDeliveryRuntimePromise;
}
function resolveDirectAnnounceTransientRetryDelaysMs() {
return process.env.OPENCLAW_TEST_FAST === "1"
? ([8, 16, 32] as const)
@ -260,6 +266,7 @@ export async function resolveSubagentCompletionOrigin(params: {
spawnMode?: SpawnSubagentMode;
expectsCompletionMessage: boolean;
}): Promise<DeliveryContext | undefined> {
const deliveryRuntime = await loadSubagentAnnounceDeliveryRuntime();
const requesterOrigin = normalizeDeliveryContext(params.requesterOrigin);
const channel = requesterOrigin?.channel?.trim().toLowerCase();
const to = requesterOrigin?.to?.trim();
@ -270,14 +277,14 @@ export async function resolveSubagentCompletionOrigin(params: {
: undefined;
const conversationId =
threadId ||
resolveConversationIdFromTargets({
deliveryRuntime.resolveConversationIdFromTargets({
targets: [to],
}) ||
"";
const requesterConversation: ConversationRef | undefined =
channel && conversationId ? { channel, accountId, conversationId } : undefined;
const route = createBoundDeliveryRouter().resolveDestination({
const route = deliveryRuntime.createBoundDeliveryRouter().resolveDestination({
eventKind: "task_completion",
targetSessionKey: params.childSessionKey,
requester: requesterConversation,
@ -438,6 +445,7 @@ async function maybeQueueSubagentAnnounce(params: {
if (params.signal?.aborted) {
return "none";
}
const deliveryRuntime = await loadSubagentAnnounceDeliveryRuntime();
const { cfg, entry } = loadRequesterSessionEntry(params.requesterSessionKey);
const canonicalKey = resolveRequesterStoreKey(cfg, params.requesterSessionKey);
const sessionId = entry?.sessionId;
@ -445,7 +453,7 @@ async function maybeQueueSubagentAnnounce(params: {
return "none";
}
const queueSettings = resolveQueueSettings({
const queueSettings = deliveryRuntime.resolveQueueSettings({
cfg,
channel: entry?.channel ?? entry?.lastChannel ?? entry?.origin?.provider,
sessionEntry: entry,
@ -512,6 +520,7 @@ async function sendSubagentAnnounceDirectly(params: {
path: "none",
};
}
const deliveryRuntime = await loadSubagentAnnounceDeliveryRuntime();
const cfg = subagentAnnounceDeliveryDeps.loadConfig();
const announceTimeoutMs = resolveSubagentAnnounceTimeoutMs(cfg);
const canonicalRequesterSessionKey = resolveRequesterStoreKey(
@ -530,7 +539,7 @@ async function sendSubagentAnnounceDirectly(params: {
? effectiveDirectOrigin
: requesterSessionOrigin;
const deliveryTarget = !params.requesterIsSubagent
? resolveExternalBestEffortDeliveryTarget({
? deliveryRuntime.resolveExternalBestEffortDeliveryTarget({
channel: effectiveDirectOrigin?.channel,
to: effectiveDirectOrigin?.to,
accountId: effectiveDirectOrigin?.accountId,

View File

@ -1,15 +1,15 @@
import { isSilentReplyText, SILENT_REPLY_TOKEN } from "../auto-reply/tokens.js";
import { loadConfig } from "../config/config.js";
import { extractTextFromChatContent } from "../shared/chat-content.js";
import {
callGateway,
loadConfig,
loadSessionStore,
resolveAgentIdFromSessionKey,
resolveStorePath,
} from "../config/sessions.js";
import { callGateway } from "../gateway/call.js";
import { extractTextFromChatContent } from "../shared/chat-content.js";
} from "./subagent-announce.runtime.js";
import { readLatestAssistantReply } from "./tools/agent-step.js";
import { sanitizeTextContent, extractAssistantText } from "./tools/sessions-helpers.js";
import { isAnnounceSkip } from "./tools/sessions-send-helpers.js";
import { extractAssistantText, sanitizeTextContent } from "./tools/session-message-text.js";
import { isAnnounceSkip } from "./tools/sessions-send-tokens.js";
const FAST_TEST_RETRY_INTERVAL_MS = 8;

View File

@ -0,0 +1,11 @@
export {
countActiveDescendantRuns,
countPendingDescendantRuns,
countPendingDescendantRunsExcludingRun,
getLatestSubagentRunByChildSessionKey,
isSubagentSessionRunActive,
listSubagentRunsForRequester,
replaceSubagentRunAfterSteer,
resolveRequesterForChildSession,
shouldIgnorePostCompletionAnnounceForSession,
} from "./subagent-registry-runtime.js";

View File

@ -0,0 +1,13 @@
export { loadConfig } from "../config/config.js";
export {
loadSessionStore,
resolveAgentIdFromSessionKey,
resolveMainSessionKey,
resolveStorePath,
} from "../config/sessions.js";
export { callGateway } from "../gateway/call.js";
export {
isEmbeddedPiRunActive,
queueEmbeddedPiMessage,
waitForEmbeddedPiRunEnd,
} from "./pi-embedded.js";

View File

@ -35,65 +35,60 @@ const { subagentRegistryRuntimeMock } = vi.hoisted(() => ({
},
}));
vi.mock("../config/config.js", async (importOriginal) => {
const actual = await importOriginal<typeof import("../config/config.js")>();
return {
...actual,
loadConfig: () => mockConfig,
resolveGatewayPort: () => 18789,
};
});
vi.mock("../config/sessions.js", async (importOriginal) => {
const actual = await importOriginal<typeof import("../config/sessions.js")>();
return {
...actual,
loadSessionStore: (storePath: string) => loadSessionStoreMock(storePath),
resolveAgentIdFromSessionKey: (sessionKey: string) =>
resolveAgentIdFromSessionKeyMock(sessionKey),
resolveMainSessionKey: (cfg: unknown) => resolveMainSessionKeyMock(cfg),
resolveStorePath: (store: unknown, options: unknown) => resolveStorePathMock(store, options),
};
});
vi.mock("../gateway/call.js", () => ({
callGateway: (request: unknown) => callGatewayMock(request),
}));
vi.mock("../plugins/hook-runner-global.js", () => ({
getGlobalHookRunner: () => ({ hasHooks: () => false }),
}));
vi.mock("./pi-embedded.js", async (importOriginal) => {
const actual = await importOriginal<typeof import("./pi-embedded.js")>();
return {
...actual,
isEmbeddedPiRunActive: (sessionId: string) => isEmbeddedPiRunActiveMock(sessionId),
queueEmbeddedPiMessage: (sessionId: string, text: string) =>
queueEmbeddedPiMessageMock(sessionId, text),
waitForEmbeddedPiRunEnd: (sessionId: string, timeoutMs?: number) =>
waitForEmbeddedPiRunEndMock(sessionId, timeoutMs),
};
});
vi.mock("./subagent-announce.runtime.js", () => ({
callGateway: (request: unknown) => callGatewayMock(request),
isEmbeddedPiRunActive: (sessionId: string) => isEmbeddedPiRunActiveMock(sessionId),
loadConfig: () => mockConfig,
loadSessionStore: (storePath: string) => loadSessionStoreMock(storePath),
queueEmbeddedPiMessage: (sessionId: string, text: string) =>
queueEmbeddedPiMessageMock(sessionId, text),
resolveAgentIdFromSessionKey: (sessionKey: string) =>
resolveAgentIdFromSessionKeyMock(sessionKey),
resolveMainSessionKey: (cfg: unknown) => resolveMainSessionKeyMock(cfg),
resolveStorePath: (store: unknown, options: unknown) => resolveStorePathMock(store, options),
waitForEmbeddedPiRunEnd: (sessionId: string, timeoutMs?: number) =>
waitForEmbeddedPiRunEndMock(sessionId, timeoutMs),
}));
vi.mock("./tools/agent-step.js", () => ({
readLatestAssistantReply: (params?: unknown) => readLatestAssistantReplyMock(params),
}));
vi.mock("./subagent-registry.js", async (importOriginal) => {
const actual = await importOriginal<typeof import("./subagent-registry.js")>();
return {
...actual,
...subagentRegistryRuntimeMock,
};
});
vi.mock("./subagent-registry-runtime.js", async (importOriginal) => {
const actual = await importOriginal<typeof import("./subagent-registry-runtime.js")>();
return {
...actual,
...subagentRegistryRuntimeMock,
};
});
vi.mock("./subagent-announce-delivery.runtime.js", () => ({
createBoundDeliveryRouter: () => ({
resolveDestination: () => ({ mode: "none" }),
}),
resolveConversationIdFromTargets: () => "",
resolveExternalBestEffortDeliveryTarget: (params: {
channel?: string;
to?: string;
accountId?: string;
threadId?: string;
}) => ({
deliver: Boolean(params.channel && params.to),
channel: params.channel,
to: params.to,
accountId: params.accountId,
threadId: params.threadId,
}),
resolveQueueSettings: (params: {
cfg?: {
messages?: {
queue?: {
byChannel?: Record<string, string>;
};
};
};
channel?: string;
}) => ({
mode: (params.channel && params.cfg?.messages?.queue?.byChannel?.[params.channel]) ?? "none",
}),
}));
vi.mock("./subagent-announce.registry.runtime.js", () => subagentRegistryRuntimeMock);
import { runSubagentAnnounceFlow } from "./subagent-announce.js";
describe("subagent announce seam flow", () => {

View File

@ -79,60 +79,56 @@ function createTimeoutHistoryWithNoReply() {
}
vi.mock("../gateway/call.js", createGatewayCallModuleMock);
vi.mock("../config/config.js", async (importOriginal) => {
const actual = await importOriginal<typeof import("../config/config.js")>();
return {
...actual,
loadConfig: () => configOverride,
resolveGatewayPort: () => 18789,
};
});
vi.mock("../config/sessions.js", async (importOriginal) => {
const actual = await importOriginal<typeof import("../config/sessions.js")>();
return {
...actual,
...createSessionsModuleMock(),
};
});
vi.mock("./subagent-depth.js", createSubagentDepthModuleMock);
vi.mock("./pi-embedded.js", async (importOriginal) => {
const actual = await importOriginal<typeof import("./pi-embedded.js")>();
return {
...actual,
isEmbeddedPiRunActive: (sessionId: string) => isEmbeddedPiRunActiveMock(sessionId),
queueEmbeddedPiMessage: (_sessionId: string, _text: string) => false,
waitForEmbeddedPiRunEnd: (sessionId: string, timeoutMs?: number) =>
waitForEmbeddedPiRunEndMock(sessionId, timeoutMs),
};
});
vi.mock("./subagent-registry.js", async (importOriginal) => {
const actual = await importOriginal<typeof import("./subagent-registry.js")>();
return {
...actual,
countActiveDescendantRuns: () => 0,
countPendingDescendantRuns: () => pendingDescendantRuns,
countPendingDescendantRunsExcludingRun: () => 0,
listSubagentRunsForRequester: () => [],
isSubagentSessionRunActive: () => subagentSessionRunActive,
shouldIgnorePostCompletionAnnounceForSession: () => shouldIgnorePostCompletion,
replaceSubagentRunAfterSteer: () => true,
resolveRequesterForChildSession: () => fallbackRequesterResolution,
};
});
vi.mock("./subagent-registry-runtime.js", async (importOriginal) => {
const actual = await importOriginal<typeof import("./subagent-registry-runtime.js")>();
return {
...actual,
countActiveDescendantRuns: () => 0,
countPendingDescendantRuns: () => pendingDescendantRuns,
countPendingDescendantRunsExcludingRun: () => 0,
listSubagentRunsForRequester: () => [],
isSubagentSessionRunActive: () => subagentSessionRunActive,
shouldIgnorePostCompletionAnnounceForSession: () => shouldIgnorePostCompletion,
replaceSubagentRunAfterSteer: () => true,
resolveRequesterForChildSession: () => fallbackRequesterResolution,
};
});
vi.mock("./subagent-announce-delivery.runtime.js", () => ({
createBoundDeliveryRouter: () => ({
resolveDestination: () => ({ mode: "none" }),
}),
resolveConversationIdFromTargets: () => "",
resolveExternalBestEffortDeliveryTarget: (params: {
channel?: string;
to?: string;
accountId?: string;
threadId?: string;
}) => ({
deliver: Boolean(params.channel && params.to),
channel: params.channel,
to: params.to,
accountId: params.accountId,
threadId: params.threadId,
}),
resolveQueueSettings: (params: {
cfg?: {
messages?: {
queue?: {
byChannel?: Record<string, string>;
};
};
};
channel?: string;
}) => ({
mode: (params.channel && params.cfg?.messages?.queue?.byChannel?.[params.channel]) ?? "none",
}),
}));
vi.mock("./subagent-announce.runtime.js", () => ({
callGateway: createGatewayCallModuleMock().callGateway,
loadConfig: () => configOverride,
...createSessionsModuleMock(),
isEmbeddedPiRunActive: (sessionId: string) => isEmbeddedPiRunActiveMock(sessionId),
queueEmbeddedPiMessage: (_sessionId: string, _text: string) => false,
waitForEmbeddedPiRunEnd: (sessionId: string, timeoutMs?: number) =>
waitForEmbeddedPiRunEndMock(sessionId, timeoutMs),
}));
vi.mock("./subagent-announce.registry.runtime.js", () => ({
countActiveDescendantRuns: () => 0,
countPendingDescendantRuns: () => pendingDescendantRuns,
countPendingDescendantRunsExcludingRun: () => 0,
listSubagentRunsForRequester: () => [],
isSubagentSessionRunActive: () => subagentSessionRunActive,
shouldIgnorePostCompletionAnnounceForSession: () => shouldIgnorePostCompletion,
replaceSubagentRunAfterSteer: () => true,
resolveRequesterForChildSession: () => fallbackRequesterResolution,
}));
import { runSubagentAnnounceFlow } from "./subagent-announce.js";
type AnnounceFlowParams = Parameters<
typeof import("./subagent-announce.js").runSubagentAnnounceFlow

View File

@ -1,7 +1,5 @@
import { isSilentReplyText, SILENT_REPLY_TOKEN } from "../auto-reply/tokens.js";
import { DEFAULT_SUBAGENT_MAX_SPAWN_DEPTH } from "../config/agent-limits.js";
import { loadConfig } from "../config/config.js";
import { callGateway } from "../gateway/call.js";
import { defaultRuntime } from "../runtime.js";
import { isCronSessionKey } from "../sessions/session-key-utils.js";
import { type DeliveryContext, normalizeDeliveryContext } from "../utils/delivery-context.js";
@ -11,7 +9,6 @@ import {
buildAnnounceIdempotencyKey,
} from "./announce-idempotency.js";
import { formatAgentInternalEventsForPrompt, type AgentInternalEvent } from "./internal-events.js";
import { isEmbeddedPiRunActive, waitForEmbeddedPiRunEnd } from "./pi-embedded.js";
import {
deliverSubagentAnnouncement,
loadRequesterSessionEntry,
@ -32,9 +29,15 @@ import {
type SubagentRunOutcome,
waitForSubagentRunOutcome,
} from "./subagent-announce-output.js";
import {
callGateway,
isEmbeddedPiRunActive,
loadConfig,
waitForEmbeddedPiRunEnd,
} from "./subagent-announce.runtime.js";
import { getSubagentDepthFromSessionStore } from "./subagent-depth.js";
import type { SpawnSubagentMode } from "./subagent-spawn.js";
import { isAnnounceSkip } from "./tools/sessions-send-helpers.js";
import { isAnnounceSkip } from "./tools/sessions-send-tokens.js";
type SubagentAnnounceDeps = {
callGateway: typeof callGateway;
@ -49,11 +52,11 @@ const defaultSubagentAnnounceDeps: SubagentAnnounceDeps = {
let subagentAnnounceDeps: SubagentAnnounceDeps = defaultSubagentAnnounceDeps;
let subagentRegistryRuntimePromise: Promise<
typeof import("./subagent-registry-runtime.js")
typeof import("./subagent-announce.registry.runtime.js")
> | null = null;
function loadSubagentRegistryRuntime() {
subagentRegistryRuntimePromise ??= import("./subagent-registry-runtime.js");
subagentRegistryRuntimePromise ??= import("./subagent-announce.registry.runtime.js");
return subagentRegistryRuntimePromise;
}

View File

@ -0,0 +1,50 @@
import { extractTextFromChatContent } from "../../shared/chat-content.js";
import { sanitizeUserFacingText } from "../pi-embedded-helpers.js";
import {
stripDowngradedToolCallText,
stripMinimaxToolCallXml,
stripModelSpecialTokens,
stripThinkingTagsFromText,
} from "../pi-embedded-utils.js";
export function stripToolMessages(messages: unknown[]): unknown[] {
return messages.filter((msg) => {
if (!msg || typeof msg !== "object") {
return true;
}
const role = (msg as { role?: unknown }).role;
return role !== "toolResult" && role !== "tool";
});
}
export function sanitizeTextContent(text: string): string {
if (!text) {
return text;
}
return stripThinkingTagsFromText(
stripDowngradedToolCallText(stripModelSpecialTokens(stripMinimaxToolCallXml(text))),
);
}
export function extractAssistantText(message: unknown): string | undefined {
if (!message || typeof message !== "object") {
return undefined;
}
if ((message as { role?: unknown }).role !== "assistant") {
return undefined;
}
const content = (message as { content?: unknown }).content;
if (!Array.isArray(content)) {
return undefined;
}
const joined =
extractTextFromChatContent(content, {
sanitizeText: sanitizeTextContent,
joinWith: "",
normalizeText: (text) => text.trim(),
}) ?? "";
const stopReason = (message as { stopReason?: unknown }).stopReason;
const errorContext = stopReason === "error";
return joined ? sanitizeUserFacingText(joined, { errorContext }) : undefined;
}

View File

@ -5,9 +5,14 @@ import {
import { resolveSessionConversationRef } from "../../channels/plugins/session-conversation.js";
import { normalizeChannelId as normalizeChatChannelId } from "../../channels/registry.js";
import type { OpenClawConfig } from "../../config/config.js";
import { ANNOUNCE_SKIP_TOKEN, REPLY_SKIP_TOKEN } from "./sessions-send-tokens.js";
export {
ANNOUNCE_SKIP_TOKEN,
REPLY_SKIP_TOKEN,
isAnnounceSkip,
isReplySkip,
} from "./sessions-send-tokens.js";
const ANNOUNCE_SKIP_TOKEN = "ANNOUNCE_SKIP";
const REPLY_SKIP_TOKEN = "REPLY_SKIP";
const DEFAULT_PING_PONG_TURNS = 5;
const MAX_PING_PONG_TURNS = 5;
@ -115,14 +120,6 @@ export function buildAgentToAgentAnnounceContext(params: {
return lines.join("\n");
}
export function isAnnounceSkip(text?: string) {
return (text ?? "").trim() === ANNOUNCE_SKIP_TOKEN;
}
export function isReplySkip(text?: string) {
return (text ?? "").trim() === REPLY_SKIP_TOKEN;
}
export function resolvePingPongTurns(cfg?: OpenClawConfig) {
const raw = cfg?.session?.agentToAgent?.maxPingPongTurns;
const fallback = DEFAULT_PING_PONG_TURNS;

View File

@ -0,0 +1,10 @@
export const ANNOUNCE_SKIP_TOKEN = "ANNOUNCE_SKIP";
export const REPLY_SKIP_TOKEN = "REPLY_SKIP";
export function isAnnounceSkip(text?: string) {
return (text ?? "").trim() === ANNOUNCE_SKIP_TOKEN;
}
export function isReplySkip(text?: string) {
return (text ?? "").trim() === REPLY_SKIP_TOKEN;
}