mirror of https://github.com/openclaw/openclaw.git
763 lines
24 KiB
TypeScript
763 lines
24 KiB
TypeScript
import crypto from "node:crypto";
|
|
import { getAcpSessionManager } from "../acp/control-plane/manager.js";
|
|
import {
|
|
cleanupFailedAcpSpawn,
|
|
type AcpSpawnRuntimeCloseHandle,
|
|
} from "../acp/control-plane/spawn.js";
|
|
import { isAcpEnabledByPolicy, resolveAcpAgentPolicyError } from "../acp/policy.js";
|
|
import {
|
|
resolveAcpSessionCwd,
|
|
resolveAcpThreadSessionDetailLines,
|
|
} from "../acp/runtime/session-identifiers.js";
|
|
import type { AcpRuntimeSessionMode } from "../acp/runtime/types.js";
|
|
import { DEFAULT_HEARTBEAT_EVERY } from "../auto-reply/heartbeat.js";
|
|
import {
|
|
resolveThreadBindingIntroText,
|
|
resolveThreadBindingThreadName,
|
|
} from "../channels/thread-bindings-messages.js";
|
|
import {
|
|
formatThreadBindingDisabledError,
|
|
formatThreadBindingSpawnDisabledError,
|
|
resolveThreadBindingIdleTimeoutMsForChannel,
|
|
resolveThreadBindingMaxAgeMsForChannel,
|
|
resolveThreadBindingSpawnPolicy,
|
|
} from "../channels/thread-bindings-policy.js";
|
|
import { parseDurationMs } from "../cli/parse-duration.js";
|
|
import { loadConfig } from "../config/config.js";
|
|
import type { OpenClawConfig } from "../config/config.js";
|
|
import { loadSessionStore, resolveStorePath, type SessionEntry } from "../config/sessions.js";
|
|
import { resolveSessionTranscriptFile } from "../config/sessions/transcript.js";
|
|
import { callGateway } from "../gateway/call.js";
|
|
import { areHeartbeatsEnabled } from "../infra/heartbeat-wake.js";
|
|
import { resolveConversationIdFromTargets } from "../infra/outbound/conversation-id.js";
|
|
import {
|
|
getSessionBindingService,
|
|
isSessionBindingError,
|
|
type SessionBindingRecord,
|
|
} from "../infra/outbound/session-binding-service.js";
|
|
import { createSubsystemLogger } from "../logging/subsystem.js";
|
|
import {
|
|
isSubagentSessionKey,
|
|
normalizeAgentId,
|
|
parseAgentSessionKey,
|
|
} from "../routing/session-key.js";
|
|
import { deliveryContextFromSession, normalizeDeliveryContext } from "../utils/delivery-context.js";
|
|
import {
|
|
type AcpSpawnParentRelayHandle,
|
|
resolveAcpSpawnStreamLogPath,
|
|
startAcpSpawnParentStreamRelay,
|
|
} from "./acp-spawn-parent-stream.js";
|
|
import { resolveAgentConfig, resolveDefaultAgentId } from "./agent-scope.js";
|
|
import { resolveSandboxRuntimeStatus } from "./sandbox/runtime-status.js";
|
|
import { resolveInternalSessionKey, resolveMainSessionAlias } from "./tools/sessions-helpers.js";
|
|
|
|
const log = createSubsystemLogger("agents/acp-spawn");
|
|
|
|
export const ACP_SPAWN_MODES = ["run", "session"] as const;
|
|
export type SpawnAcpMode = (typeof ACP_SPAWN_MODES)[number];
|
|
export const ACP_SPAWN_SANDBOX_MODES = ["inherit", "require"] as const;
|
|
export type SpawnAcpSandboxMode = (typeof ACP_SPAWN_SANDBOX_MODES)[number];
|
|
export const ACP_SPAWN_STREAM_TARGETS = ["parent"] as const;
|
|
export type SpawnAcpStreamTarget = (typeof ACP_SPAWN_STREAM_TARGETS)[number];
|
|
|
|
export type SpawnAcpParams = {
|
|
task: string;
|
|
label?: string;
|
|
agentId?: string;
|
|
resumeSessionId?: string;
|
|
cwd?: string;
|
|
mode?: SpawnAcpMode;
|
|
thread?: boolean;
|
|
sandbox?: SpawnAcpSandboxMode;
|
|
streamTo?: SpawnAcpStreamTarget;
|
|
};
|
|
|
|
export type SpawnAcpContext = {
|
|
agentSessionKey?: string;
|
|
agentChannel?: string;
|
|
agentAccountId?: string;
|
|
agentTo?: string;
|
|
agentThreadId?: string | number;
|
|
sandboxed?: boolean;
|
|
};
|
|
|
|
export type SpawnAcpResult = {
|
|
status: "accepted" | "forbidden" | "error";
|
|
childSessionKey?: string;
|
|
runId?: string;
|
|
mode?: SpawnAcpMode;
|
|
streamLogPath?: string;
|
|
note?: string;
|
|
error?: string;
|
|
};
|
|
|
|
export const ACP_SPAWN_ACCEPTED_NOTE =
|
|
"initial ACP task queued in isolated session; follow-ups continue in the bound thread.";
|
|
export const ACP_SPAWN_SESSION_ACCEPTED_NOTE =
|
|
"thread-bound ACP session stays active after this task; continue in-thread for follow-ups.";
|
|
|
|
export function resolveAcpSpawnRuntimePolicyError(params: {
|
|
cfg: OpenClawConfig;
|
|
requesterSessionKey?: string;
|
|
requesterSandboxed?: boolean;
|
|
sandbox?: SpawnAcpSandboxMode;
|
|
}): string | undefined {
|
|
const sandboxMode = params.sandbox === "require" ? "require" : "inherit";
|
|
const requesterRuntime = resolveSandboxRuntimeStatus({
|
|
cfg: params.cfg,
|
|
sessionKey: params.requesterSessionKey,
|
|
});
|
|
const requesterSandboxed = params.requesterSandboxed === true || requesterRuntime.sandboxed;
|
|
if (requesterSandboxed) {
|
|
return 'Sandboxed sessions cannot spawn ACP sessions because runtime="acp" runs on the host. Use runtime="subagent" from sandboxed sessions.';
|
|
}
|
|
if (sandboxMode === "require") {
|
|
return 'sessions_spawn sandbox="require" is unsupported for runtime="acp" because ACP sessions run outside the sandbox. Use runtime="subagent" or sandbox="inherit".';
|
|
}
|
|
return undefined;
|
|
}
|
|
|
|
type PreparedAcpThreadBinding = {
|
|
channel: string;
|
|
accountId: string;
|
|
conversationId: string;
|
|
};
|
|
|
|
function resolveSpawnMode(params: {
|
|
requestedMode?: SpawnAcpMode;
|
|
threadRequested: boolean;
|
|
}): SpawnAcpMode {
|
|
if (params.requestedMode === "run" || params.requestedMode === "session") {
|
|
return params.requestedMode;
|
|
}
|
|
// Thread-bound spawns should default to persistent sessions.
|
|
return params.threadRequested ? "session" : "run";
|
|
}
|
|
|
|
function resolveAcpSessionMode(mode: SpawnAcpMode): AcpRuntimeSessionMode {
|
|
return mode === "session" ? "persistent" : "oneshot";
|
|
}
|
|
|
|
function isHeartbeatEnabledForSessionAgent(params: {
|
|
cfg: OpenClawConfig;
|
|
sessionKey?: string;
|
|
}): boolean {
|
|
if (!areHeartbeatsEnabled()) {
|
|
return false;
|
|
}
|
|
const requesterAgentId = parseAgentSessionKey(params.sessionKey)?.agentId;
|
|
if (!requesterAgentId) {
|
|
return true;
|
|
}
|
|
|
|
const agentEntries = params.cfg.agents?.list ?? [];
|
|
const hasExplicitHeartbeatAgents = agentEntries.some((entry) => Boolean(entry?.heartbeat));
|
|
const enabledByPolicy = hasExplicitHeartbeatAgents
|
|
? agentEntries.some(
|
|
(entry) => Boolean(entry?.heartbeat) && normalizeAgentId(entry?.id) === requesterAgentId,
|
|
)
|
|
: requesterAgentId === resolveDefaultAgentId(params.cfg);
|
|
if (!enabledByPolicy) {
|
|
return false;
|
|
}
|
|
|
|
const heartbeatEvery =
|
|
resolveAgentConfig(params.cfg, requesterAgentId)?.heartbeat?.every ??
|
|
params.cfg.agents?.defaults?.heartbeat?.every ??
|
|
DEFAULT_HEARTBEAT_EVERY;
|
|
const trimmedEvery = typeof heartbeatEvery === "string" ? heartbeatEvery.trim() : "";
|
|
if (!trimmedEvery) {
|
|
return false;
|
|
}
|
|
try {
|
|
return parseDurationMs(trimmedEvery, { defaultUnit: "m" }) > 0;
|
|
} catch {
|
|
return false;
|
|
}
|
|
}
|
|
|
|
function resolveHeartbeatConfigForAgent(params: {
|
|
cfg: OpenClawConfig;
|
|
agentId: string;
|
|
}): NonNullable<NonNullable<OpenClawConfig["agents"]>["defaults"]>["heartbeat"] {
|
|
const defaults = params.cfg.agents?.defaults?.heartbeat;
|
|
const overrides = resolveAgentConfig(params.cfg, params.agentId)?.heartbeat;
|
|
if (!defaults && !overrides) {
|
|
return undefined;
|
|
}
|
|
return {
|
|
...defaults,
|
|
...overrides,
|
|
};
|
|
}
|
|
|
|
function hasSessionLocalHeartbeatRelayRoute(params: {
|
|
cfg: OpenClawConfig;
|
|
parentSessionKey: string;
|
|
requesterAgentId: string;
|
|
}): boolean {
|
|
const scope = params.cfg.session?.scope ?? "per-sender";
|
|
if (scope === "global") {
|
|
return false;
|
|
}
|
|
|
|
const heartbeat = resolveHeartbeatConfigForAgent({
|
|
cfg: params.cfg,
|
|
agentId: params.requesterAgentId,
|
|
});
|
|
if ((heartbeat?.target ?? "none") !== "last") {
|
|
return false;
|
|
}
|
|
|
|
// Explicit delivery overrides are not session-local and can route updates
|
|
// to unrelated destinations (for example a pinned ops channel).
|
|
if (typeof heartbeat?.to === "string" && heartbeat.to.trim().length > 0) {
|
|
return false;
|
|
}
|
|
if (typeof heartbeat?.accountId === "string" && heartbeat.accountId.trim().length > 0) {
|
|
return false;
|
|
}
|
|
|
|
const storePath = resolveStorePath(params.cfg.session?.store, {
|
|
agentId: params.requesterAgentId,
|
|
});
|
|
const sessionStore = loadSessionStore(storePath);
|
|
const parentEntry = sessionStore[params.parentSessionKey];
|
|
const parentDeliveryContext = deliveryContextFromSession(parentEntry);
|
|
return Boolean(parentDeliveryContext?.channel && parentDeliveryContext.to);
|
|
}
|
|
|
|
function resolveTargetAcpAgentId(params: {
|
|
requestedAgentId?: string;
|
|
cfg: OpenClawConfig;
|
|
}): { ok: true; agentId: string } | { ok: false; error: string } {
|
|
const requested = normalizeOptionalAgentId(params.requestedAgentId);
|
|
if (requested) {
|
|
return { ok: true, agentId: requested };
|
|
}
|
|
|
|
const configuredDefault = normalizeOptionalAgentId(params.cfg.acp?.defaultAgent);
|
|
if (configuredDefault) {
|
|
return { ok: true, agentId: configuredDefault };
|
|
}
|
|
|
|
return {
|
|
ok: false,
|
|
error:
|
|
"ACP target agent is not configured. Pass `agentId` in `sessions_spawn` or set `acp.defaultAgent` in config.",
|
|
};
|
|
}
|
|
|
|
function normalizeOptionalAgentId(value: string | undefined | null): string | undefined {
|
|
const trimmed = (value ?? "").trim();
|
|
if (!trimmed) {
|
|
return undefined;
|
|
}
|
|
return normalizeAgentId(trimmed);
|
|
}
|
|
|
|
function summarizeError(err: unknown): string {
|
|
if (err instanceof Error) {
|
|
return err.message;
|
|
}
|
|
if (typeof err === "string") {
|
|
return err;
|
|
}
|
|
return "error";
|
|
}
|
|
|
|
function resolveRequesterInternalSessionKey(params: {
|
|
cfg: OpenClawConfig;
|
|
requesterSessionKey?: string;
|
|
}): string {
|
|
const { mainKey, alias } = resolveMainSessionAlias(params.cfg);
|
|
const requesterSessionKey = params.requesterSessionKey?.trim();
|
|
return requesterSessionKey
|
|
? resolveInternalSessionKey({
|
|
key: requesterSessionKey,
|
|
alias,
|
|
mainKey,
|
|
})
|
|
: alias;
|
|
}
|
|
|
|
async function persistAcpSpawnSessionFileBestEffort(params: {
|
|
sessionId: string;
|
|
sessionKey: string;
|
|
sessionEntry: SessionEntry | undefined;
|
|
sessionStore: Record<string, SessionEntry>;
|
|
storePath: string;
|
|
agentId: string;
|
|
threadId?: string | number;
|
|
stage: "spawn" | "thread-bind";
|
|
}): Promise<SessionEntry | undefined> {
|
|
try {
|
|
const resolvedSessionFile = await resolveSessionTranscriptFile({
|
|
sessionId: params.sessionId,
|
|
sessionKey: params.sessionKey,
|
|
sessionEntry: params.sessionEntry,
|
|
sessionStore: params.sessionStore,
|
|
storePath: params.storePath,
|
|
agentId: params.agentId,
|
|
threadId: params.threadId,
|
|
});
|
|
return resolvedSessionFile.sessionEntry;
|
|
} catch (error) {
|
|
log.warn(
|
|
`ACP session-file persistence failed during ${params.stage} for ${params.sessionKey}: ${summarizeError(error)}`,
|
|
);
|
|
return params.sessionEntry;
|
|
}
|
|
}
|
|
|
|
function resolveConversationIdForThreadBinding(params: {
|
|
to?: string;
|
|
threadId?: string | number;
|
|
}): string | undefined {
|
|
return resolveConversationIdFromTargets({
|
|
threadId: params.threadId,
|
|
targets: [params.to],
|
|
});
|
|
}
|
|
|
|
function prepareAcpThreadBinding(params: {
|
|
cfg: OpenClawConfig;
|
|
channel?: string;
|
|
accountId?: string;
|
|
to?: string;
|
|
threadId?: string | number;
|
|
}): { ok: true; binding: PreparedAcpThreadBinding } | { ok: false; error: string } {
|
|
const channel = params.channel?.trim().toLowerCase();
|
|
if (!channel) {
|
|
return {
|
|
ok: false,
|
|
error: "thread=true for ACP sessions requires a channel context.",
|
|
};
|
|
}
|
|
|
|
const accountId = params.accountId?.trim() || "default";
|
|
const policy = resolveThreadBindingSpawnPolicy({
|
|
cfg: params.cfg,
|
|
channel,
|
|
accountId,
|
|
kind: "acp",
|
|
});
|
|
if (!policy.enabled) {
|
|
return {
|
|
ok: false,
|
|
error: formatThreadBindingDisabledError({
|
|
channel: policy.channel,
|
|
accountId: policy.accountId,
|
|
kind: "acp",
|
|
}),
|
|
};
|
|
}
|
|
if (!policy.spawnEnabled) {
|
|
return {
|
|
ok: false,
|
|
error: formatThreadBindingSpawnDisabledError({
|
|
channel: policy.channel,
|
|
accountId: policy.accountId,
|
|
kind: "acp",
|
|
}),
|
|
};
|
|
}
|
|
const bindingService = getSessionBindingService();
|
|
const capabilities = bindingService.getCapabilities({
|
|
channel: policy.channel,
|
|
accountId: policy.accountId,
|
|
});
|
|
if (!capabilities.adapterAvailable) {
|
|
return {
|
|
ok: false,
|
|
error: `Thread bindings are unavailable for ${policy.channel}.`,
|
|
};
|
|
}
|
|
if (!capabilities.bindSupported || !capabilities.placements.includes("child")) {
|
|
return {
|
|
ok: false,
|
|
error: `Thread bindings do not support ACP thread spawn for ${policy.channel}.`,
|
|
};
|
|
}
|
|
const conversationId = resolveConversationIdForThreadBinding({
|
|
to: params.to,
|
|
threadId: params.threadId,
|
|
});
|
|
if (!conversationId) {
|
|
return {
|
|
ok: false,
|
|
error: `Could not resolve a ${policy.channel} conversation for ACP thread spawn.`,
|
|
};
|
|
}
|
|
|
|
return {
|
|
ok: true,
|
|
binding: {
|
|
channel: policy.channel,
|
|
accountId: policy.accountId,
|
|
conversationId,
|
|
},
|
|
};
|
|
}
|
|
|
|
export async function spawnAcpDirect(
|
|
params: SpawnAcpParams,
|
|
ctx: SpawnAcpContext,
|
|
): Promise<SpawnAcpResult> {
|
|
const cfg = loadConfig();
|
|
const requesterInternalKey = resolveRequesterInternalSessionKey({
|
|
cfg,
|
|
requesterSessionKey: ctx.agentSessionKey,
|
|
});
|
|
if (!isAcpEnabledByPolicy(cfg)) {
|
|
return {
|
|
status: "forbidden",
|
|
error: "ACP is disabled by policy (`acp.enabled=false`).",
|
|
};
|
|
}
|
|
const streamToParentRequested = params.streamTo === "parent";
|
|
const parentSessionKey = ctx.agentSessionKey?.trim();
|
|
if (streamToParentRequested && !parentSessionKey) {
|
|
return {
|
|
status: "error",
|
|
error: 'sessions_spawn streamTo="parent" requires an active requester session context.',
|
|
};
|
|
}
|
|
|
|
const requestThreadBinding = params.thread === true;
|
|
const runtimePolicyError = resolveAcpSpawnRuntimePolicyError({
|
|
cfg,
|
|
requesterSessionKey: ctx.agentSessionKey,
|
|
requesterSandboxed: ctx.sandboxed,
|
|
sandbox: params.sandbox,
|
|
});
|
|
if (runtimePolicyError) {
|
|
return {
|
|
status: "forbidden",
|
|
error: runtimePolicyError,
|
|
};
|
|
}
|
|
|
|
const spawnMode = resolveSpawnMode({
|
|
requestedMode: params.mode,
|
|
threadRequested: requestThreadBinding,
|
|
});
|
|
if (spawnMode === "session" && !requestThreadBinding) {
|
|
return {
|
|
status: "error",
|
|
error: 'mode="session" requires thread=true so the ACP session can stay bound to a thread.',
|
|
};
|
|
}
|
|
|
|
const bindingService = getSessionBindingService();
|
|
const requesterParsedSession = parseAgentSessionKey(parentSessionKey);
|
|
const requesterIsSubagentSession =
|
|
Boolean(requesterParsedSession) && isSubagentSessionKey(parentSessionKey);
|
|
const requesterHasActiveSubagentBinding =
|
|
requesterIsSubagentSession && parentSessionKey
|
|
? bindingService
|
|
.listBySession(parentSessionKey)
|
|
.some((record) => record.targetKind === "subagent" && record.status !== "ended")
|
|
: false;
|
|
const requesterHasThreadContext =
|
|
typeof ctx.agentThreadId === "string"
|
|
? ctx.agentThreadId.trim().length > 0
|
|
: ctx.agentThreadId != null;
|
|
const requesterHeartbeatEnabled = isHeartbeatEnabledForSessionAgent({
|
|
cfg,
|
|
sessionKey: parentSessionKey,
|
|
});
|
|
const requesterAgentId = requesterParsedSession?.agentId;
|
|
const requesterHeartbeatRelayRouteUsable =
|
|
parentSessionKey && requesterAgentId
|
|
? hasSessionLocalHeartbeatRelayRoute({
|
|
cfg,
|
|
parentSessionKey,
|
|
requesterAgentId,
|
|
})
|
|
: false;
|
|
|
|
// For mode=run without thread binding, implicitly route output to parent
|
|
// only for spawned subagent orchestrator sessions with heartbeat enabled
|
|
// AND a session-local heartbeat delivery route (target=last + usable last route).
|
|
// Skip requester sessions that are thread-bound (or carrying thread context)
|
|
// so user-facing threads do not receive unsolicited ACP progress chatter
|
|
// unless streamTo="parent" is explicitly requested. Use resolved spawnMode
|
|
// (not params.mode) so default mode selection works.
|
|
const implicitStreamToParent =
|
|
!streamToParentRequested &&
|
|
spawnMode === "run" &&
|
|
!requestThreadBinding &&
|
|
requesterIsSubagentSession &&
|
|
!requesterHasActiveSubagentBinding &&
|
|
!requesterHasThreadContext &&
|
|
requesterHeartbeatEnabled &&
|
|
requesterHeartbeatRelayRouteUsable;
|
|
const effectiveStreamToParent = streamToParentRequested || implicitStreamToParent;
|
|
|
|
const targetAgentResult = resolveTargetAcpAgentId({
|
|
requestedAgentId: params.agentId,
|
|
cfg,
|
|
});
|
|
if (!targetAgentResult.ok) {
|
|
return {
|
|
status: "error",
|
|
error: targetAgentResult.error,
|
|
};
|
|
}
|
|
const targetAgentId = targetAgentResult.agentId;
|
|
const agentPolicyError = resolveAcpAgentPolicyError(cfg, targetAgentId);
|
|
if (agentPolicyError) {
|
|
return {
|
|
status: "forbidden",
|
|
error: agentPolicyError.message,
|
|
};
|
|
}
|
|
|
|
const sessionKey = `agent:${targetAgentId}:acp:${crypto.randomUUID()}`;
|
|
const runtimeMode = resolveAcpSessionMode(spawnMode);
|
|
|
|
let preparedBinding: PreparedAcpThreadBinding | null = null;
|
|
if (requestThreadBinding) {
|
|
const prepared = prepareAcpThreadBinding({
|
|
cfg,
|
|
channel: ctx.agentChannel,
|
|
accountId: ctx.agentAccountId,
|
|
to: ctx.agentTo,
|
|
threadId: ctx.agentThreadId,
|
|
});
|
|
if (!prepared.ok) {
|
|
return {
|
|
status: "error",
|
|
error: prepared.error,
|
|
};
|
|
}
|
|
preparedBinding = prepared.binding;
|
|
}
|
|
|
|
const acpManager = getAcpSessionManager();
|
|
let binding: SessionBindingRecord | null = null;
|
|
let sessionCreated = false;
|
|
let initializedRuntime: AcpSpawnRuntimeCloseHandle | undefined;
|
|
try {
|
|
await callGateway({
|
|
method: "sessions.patch",
|
|
params: {
|
|
key: sessionKey,
|
|
spawnedBy: requesterInternalKey,
|
|
...(params.label ? { label: params.label } : {}),
|
|
},
|
|
timeoutMs: 10_000,
|
|
});
|
|
sessionCreated = true;
|
|
const storePath = resolveStorePath(cfg.session?.store, { agentId: targetAgentId });
|
|
const sessionStore = loadSessionStore(storePath);
|
|
let sessionEntry: SessionEntry | undefined = sessionStore[sessionKey];
|
|
const sessionId = sessionEntry?.sessionId;
|
|
if (sessionId) {
|
|
sessionEntry = await persistAcpSpawnSessionFileBestEffort({
|
|
sessionId,
|
|
sessionKey,
|
|
sessionStore,
|
|
storePath,
|
|
sessionEntry,
|
|
agentId: targetAgentId,
|
|
stage: "spawn",
|
|
});
|
|
}
|
|
const initialized = await acpManager.initializeSession({
|
|
cfg,
|
|
sessionKey,
|
|
agent: targetAgentId,
|
|
mode: runtimeMode,
|
|
resumeSessionId: params.resumeSessionId,
|
|
cwd: params.cwd,
|
|
backendId: cfg.acp?.backend,
|
|
});
|
|
initializedRuntime = {
|
|
runtime: initialized.runtime,
|
|
handle: initialized.handle,
|
|
};
|
|
|
|
if (preparedBinding) {
|
|
binding = await bindingService.bind({
|
|
targetSessionKey: sessionKey,
|
|
targetKind: "session",
|
|
conversation: {
|
|
channel: preparedBinding.channel,
|
|
accountId: preparedBinding.accountId,
|
|
conversationId: preparedBinding.conversationId,
|
|
},
|
|
placement: "child",
|
|
metadata: {
|
|
threadName: resolveThreadBindingThreadName({
|
|
agentId: targetAgentId,
|
|
label: params.label || targetAgentId,
|
|
}),
|
|
agentId: targetAgentId,
|
|
label: params.label || undefined,
|
|
boundBy: "system",
|
|
introText: resolveThreadBindingIntroText({
|
|
agentId: targetAgentId,
|
|
label: params.label || undefined,
|
|
idleTimeoutMs: resolveThreadBindingIdleTimeoutMsForChannel({
|
|
cfg,
|
|
channel: preparedBinding.channel,
|
|
accountId: preparedBinding.accountId,
|
|
}),
|
|
maxAgeMs: resolveThreadBindingMaxAgeMsForChannel({
|
|
cfg,
|
|
channel: preparedBinding.channel,
|
|
accountId: preparedBinding.accountId,
|
|
}),
|
|
sessionCwd: resolveAcpSessionCwd(initialized.meta),
|
|
sessionDetails: resolveAcpThreadSessionDetailLines({
|
|
sessionKey,
|
|
meta: initialized.meta,
|
|
}),
|
|
}),
|
|
},
|
|
});
|
|
if (!binding?.conversation.conversationId) {
|
|
throw new Error(
|
|
`Failed to create and bind a ${preparedBinding.channel} thread for this ACP session.`,
|
|
);
|
|
}
|
|
if (sessionId) {
|
|
const boundThreadId = String(binding.conversation.conversationId).trim() || undefined;
|
|
if (boundThreadId) {
|
|
sessionEntry = await persistAcpSpawnSessionFileBestEffort({
|
|
sessionId,
|
|
sessionKey,
|
|
sessionStore,
|
|
storePath,
|
|
sessionEntry,
|
|
agentId: targetAgentId,
|
|
threadId: boundThreadId,
|
|
stage: "thread-bind",
|
|
});
|
|
}
|
|
}
|
|
}
|
|
} catch (err) {
|
|
await cleanupFailedAcpSpawn({
|
|
cfg,
|
|
sessionKey,
|
|
shouldDeleteSession: sessionCreated,
|
|
deleteTranscript: true,
|
|
runtimeCloseHandle: initializedRuntime,
|
|
});
|
|
return {
|
|
status: "error",
|
|
error: isSessionBindingError(err) ? err.message : summarizeError(err),
|
|
};
|
|
}
|
|
|
|
const requesterOrigin = normalizeDeliveryContext({
|
|
channel: ctx.agentChannel,
|
|
accountId: ctx.agentAccountId,
|
|
to: ctx.agentTo,
|
|
threadId: ctx.agentThreadId,
|
|
});
|
|
// For thread-bound ACP spawns, force bootstrap delivery to the new child thread.
|
|
const boundThreadIdRaw = binding?.conversation.conversationId;
|
|
const boundThreadId = boundThreadIdRaw ? String(boundThreadIdRaw).trim() || undefined : undefined;
|
|
const fallbackThreadIdRaw = requesterOrigin?.threadId;
|
|
const fallbackThreadId =
|
|
fallbackThreadIdRaw != null ? String(fallbackThreadIdRaw).trim() || undefined : undefined;
|
|
const deliveryThreadId = boundThreadId ?? fallbackThreadId;
|
|
const inferredDeliveryTo = boundThreadId
|
|
? `channel:${boundThreadId}`
|
|
: requesterOrigin?.to?.trim() || (deliveryThreadId ? `channel:${deliveryThreadId}` : undefined);
|
|
const hasDeliveryTarget = Boolean(requesterOrigin?.channel && inferredDeliveryTo);
|
|
// Fresh one-shot ACP runs should bootstrap the worker first, then let higher layers
|
|
// decide how to relay status. Inline delivery is reserved for thread-bound sessions.
|
|
const useInlineDelivery =
|
|
hasDeliveryTarget && spawnMode === "session" && !effectiveStreamToParent;
|
|
const childIdem = crypto.randomUUID();
|
|
let childRunId: string = childIdem;
|
|
const streamLogPath =
|
|
effectiveStreamToParent && parentSessionKey
|
|
? resolveAcpSpawnStreamLogPath({
|
|
childSessionKey: sessionKey,
|
|
})
|
|
: undefined;
|
|
let parentRelay: AcpSpawnParentRelayHandle | undefined;
|
|
if (effectiveStreamToParent && parentSessionKey) {
|
|
// Register relay before dispatch so fast lifecycle failures are not missed.
|
|
parentRelay = startAcpSpawnParentStreamRelay({
|
|
runId: childIdem,
|
|
parentSessionKey,
|
|
childSessionKey: sessionKey,
|
|
agentId: targetAgentId,
|
|
logPath: streamLogPath,
|
|
emitStartNotice: false,
|
|
});
|
|
}
|
|
try {
|
|
const response = await callGateway<{ runId?: string }>({
|
|
method: "agent",
|
|
params: {
|
|
message: params.task,
|
|
sessionKey,
|
|
channel: useInlineDelivery ? requesterOrigin?.channel : undefined,
|
|
to: useInlineDelivery ? inferredDeliveryTo : undefined,
|
|
accountId: useInlineDelivery ? (requesterOrigin?.accountId ?? undefined) : undefined,
|
|
threadId: useInlineDelivery ? deliveryThreadId : undefined,
|
|
idempotencyKey: childIdem,
|
|
deliver: useInlineDelivery,
|
|
label: params.label || undefined,
|
|
},
|
|
timeoutMs: 10_000,
|
|
});
|
|
if (typeof response?.runId === "string" && response.runId.trim()) {
|
|
childRunId = response.runId.trim();
|
|
}
|
|
} catch (err) {
|
|
parentRelay?.dispose();
|
|
await cleanupFailedAcpSpawn({
|
|
cfg,
|
|
sessionKey,
|
|
shouldDeleteSession: true,
|
|
deleteTranscript: true,
|
|
});
|
|
return {
|
|
status: "error",
|
|
error: summarizeError(err),
|
|
childSessionKey: sessionKey,
|
|
};
|
|
}
|
|
|
|
if (effectiveStreamToParent && parentSessionKey) {
|
|
if (parentRelay && childRunId !== childIdem) {
|
|
parentRelay.dispose();
|
|
// Defensive fallback if gateway returns a runId that differs from idempotency key.
|
|
parentRelay = startAcpSpawnParentStreamRelay({
|
|
runId: childRunId,
|
|
parentSessionKey,
|
|
childSessionKey: sessionKey,
|
|
agentId: targetAgentId,
|
|
logPath: streamLogPath,
|
|
emitStartNotice: false,
|
|
});
|
|
}
|
|
parentRelay?.notifyStarted();
|
|
return {
|
|
status: "accepted",
|
|
childSessionKey: sessionKey,
|
|
runId: childRunId,
|
|
mode: spawnMode,
|
|
...(streamLogPath ? { streamLogPath } : {}),
|
|
note: spawnMode === "session" ? ACP_SPAWN_SESSION_ACCEPTED_NOTE : ACP_SPAWN_ACCEPTED_NOTE,
|
|
};
|
|
}
|
|
|
|
return {
|
|
status: "accepted",
|
|
childSessionKey: sessionKey,
|
|
runId: childRunId,
|
|
mode: spawnMode,
|
|
note: spawnMode === "session" ? ACP_SPAWN_SESSION_ACCEPTED_NOTE : ACP_SPAWN_ACCEPTED_NOTE,
|
|
};
|
|
}
|