diff --git a/CHANGELOG.md b/CHANGELOG.md index 6f6f3c5cd46..71f034fc489 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -75,6 +75,7 @@ Docs: https://docs.openclaw.ai - Secrets/SecretRef: reject exec SecretRef traversal ids across schema, runtime, and gateway. (#42370) Thanks @joshavant. - Telegram/docs: clarify that `channels.telegram.groups` allowlists chats while `groupAllowFrom` allowlists users inside those chats, and point invalid negative chat IDs at the right config key. (#42451) Thanks @altaywtf. - Models/Alibaba Cloud Model Studio: wire `MODELSTUDIO_API_KEY` through shared env auth, implicit provider discovery, and shell-env fallback so onboarding works outside the wizard too. (#40634) Thanks @pomelo-nwu. +- ACP/sessions_spawn: implicitly stream `mode="run"` ACP spawns to parent only for eligible subagent orchestrator sessions (heartbeat `target: "last"` with a usable session-local route), restoring parent progress relays without thread binding. (#42404) Thanks @davidguttman. ## 2026.3.8 diff --git a/src/agents/acp-spawn-parent-stream.ts b/src/agents/acp-spawn-parent-stream.ts index 94f04ce3940..36b113386c2 100644 --- a/src/agents/acp-spawn-parent-stream.ts +++ b/src/agents/acp-spawn-parent-stream.ts @@ -180,7 +180,9 @@ export function startAcpSpawnParentStreamRelay(params: { }; const wake = () => { requestHeartbeatNow( - scopedHeartbeatWakeOptions(parentSessionKey, { reason: "acp:spawn:stream" }), + scopedHeartbeatWakeOptions(parentSessionKey, { + reason: "acp:spawn:stream", + }), ); }; const emit = (text: string, contextKey: string) => { diff --git a/src/agents/acp-spawn.test.ts b/src/agents/acp-spawn.test.ts index 0f28b709792..c53584cdf55 100644 --- a/src/agents/acp-spawn.test.ts +++ b/src/agents/acp-spawn.test.ts @@ -38,6 +38,7 @@ const hoisted = vi.hoisted(() => { const loadSessionStoreMock = vi.fn(); const resolveStorePathMock = vi.fn(); const resolveSessionTranscriptFileMock = vi.fn(); + const areHeartbeatsEnabledMock = vi.fn(); const state = { cfg: createDefaultSpawnConfig(), }; @@ -55,6 +56,7 @@ const hoisted = vi.hoisted(() => { loadSessionStoreMock, resolveStorePathMock, resolveSessionTranscriptFileMock, + areHeartbeatsEnabledMock, state, }; }); @@ -128,6 +130,14 @@ vi.mock("../infra/outbound/session-binding-service.js", async (importOriginal) = }; }); +vi.mock("../infra/heartbeat-wake.js", async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + areHeartbeatsEnabled: () => hoisted.areHeartbeatsEnabledMock(), + }; +}); + vi.mock("./acp-spawn-parent-stream.js", () => ({ startAcpSpawnParentStreamRelay: (...args: unknown[]) => hoisted.startAcpSpawnParentStreamRelayMock(...args), @@ -192,6 +202,7 @@ function expectResolvedIntroTextInBindMetadata(): void { describe("spawnAcpDirect", () => { beforeEach(() => { hoisted.state.cfg = createDefaultSpawnConfig(); + hoisted.areHeartbeatsEnabledMock.mockReset().mockReturnValue(true); hoisted.callGatewayMock.mockReset().mockImplementation(async (argsUnknown: unknown) => { const args = argsUnknown as { method?: string }; @@ -393,6 +404,8 @@ describe("spawnAcpDirect", () => { expect(result.status).toBe("accepted"); expect(result.mode).toBe("run"); + expect(result.streamLogPath).toBeUndefined(); + expect(hoisted.startAcpSpawnParentStreamRelayMock).not.toHaveBeenCalled(); expect(hoisted.resolveSessionTranscriptFileMock).toHaveBeenCalledWith( expect.objectContaining({ sessionId: "sess-123", @@ -633,6 +646,290 @@ describe("spawnAcpDirect", () => { expect(secondHandle.notifyStarted).toHaveBeenCalledTimes(1); }); + it("implicitly streams mode=run ACP spawns for subagent requester sessions", async () => { + hoisted.state.cfg = { + ...hoisted.state.cfg, + agents: { + defaults: { + heartbeat: { + every: "30m", + target: "last", + }, + }, + }, + }; + const firstHandle = createRelayHandle(); + const secondHandle = createRelayHandle(); + hoisted.startAcpSpawnParentStreamRelayMock + .mockReset() + .mockReturnValueOnce(firstHandle) + .mockReturnValueOnce(secondHandle); + hoisted.loadSessionStoreMock.mockReset().mockImplementation(() => { + const store: Record< + string, + { sessionId: string; updatedAt: number; deliveryContext?: unknown } + > = { + "agent:main:subagent:parent": { + sessionId: "parent-sess-1", + updatedAt: Date.now(), + deliveryContext: { + channel: "discord", + to: "channel:parent-channel", + accountId: "default", + }, + }, + }; + return new Proxy(store, { + get(target, prop) { + if (typeof prop === "string" && prop.startsWith("agent:codex:acp:")) { + return { sessionId: "sess-123", updatedAt: Date.now() }; + } + return target[prop as keyof typeof target]; + }, + }); + }); + + const result = await spawnAcpDirect( + { + task: "Investigate flaky tests", + agentId: "codex", + }, + { + agentSessionKey: "agent:main:subagent:parent", + agentChannel: "discord", + agentAccountId: "default", + agentTo: "channel:parent-channel", + }, + ); + + expect(result.status).toBe("accepted"); + expect(result.mode).toBe("run"); + expect(result.streamLogPath).toBe("/tmp/sess-main.acp-stream.jsonl"); + const agentCall = hoisted.callGatewayMock.mock.calls + .map((call: unknown[]) => call[0] as { method?: string; params?: Record }) + .find((request) => request.method === "agent"); + expect(agentCall?.params?.deliver).toBe(false); + expect(agentCall?.params?.channel).toBeUndefined(); + expect(agentCall?.params?.to).toBeUndefined(); + expect(agentCall?.params?.threadId).toBeUndefined(); + expect(hoisted.startAcpSpawnParentStreamRelayMock).toHaveBeenCalledWith( + expect.objectContaining({ + parentSessionKey: "agent:main:subagent:parent", + agentId: "codex", + logPath: "/tmp/sess-main.acp-stream.jsonl", + emitStartNotice: false, + }), + ); + expect(firstHandle.dispose).toHaveBeenCalledTimes(1); + expect(secondHandle.notifyStarted).toHaveBeenCalledTimes(1); + }); + + it("does not implicitly stream when heartbeat target is not session-local", async () => { + hoisted.state.cfg = { + ...hoisted.state.cfg, + agents: { + defaults: { + heartbeat: { + every: "30m", + target: "discord", + to: "channel:ops-room", + }, + }, + }, + }; + + const result = await spawnAcpDirect( + { + task: "Investigate flaky tests", + agentId: "codex", + }, + { + agentSessionKey: "agent:main:subagent:fixed-target", + }, + ); + + expect(result.status).toBe("accepted"); + expect(result.mode).toBe("run"); + expect(result.streamLogPath).toBeUndefined(); + expect(hoisted.startAcpSpawnParentStreamRelayMock).not.toHaveBeenCalled(); + }); + + it("does not implicitly stream when session scope is global", async () => { + hoisted.state.cfg = { + ...hoisted.state.cfg, + session: { + ...hoisted.state.cfg.session, + scope: "global", + }, + agents: { + defaults: { + heartbeat: { + every: "30m", + target: "last", + }, + }, + }, + }; + + const result = await spawnAcpDirect( + { + task: "Investigate flaky tests", + agentId: "codex", + }, + { + agentSessionKey: "agent:main:subagent:global-scope", + }, + ); + + expect(result.status).toBe("accepted"); + expect(result.mode).toBe("run"); + expect(result.streamLogPath).toBeUndefined(); + expect(hoisted.startAcpSpawnParentStreamRelayMock).not.toHaveBeenCalled(); + }); + + it("does not implicitly stream for subagent requester sessions when heartbeat is disabled", async () => { + hoisted.state.cfg = { + ...hoisted.state.cfg, + agents: { + list: [{ id: "main", heartbeat: { every: "30m" } }, { id: "research" }], + }, + }; + + const result = await spawnAcpDirect( + { + task: "Investigate flaky tests", + agentId: "codex", + }, + { + agentSessionKey: "agent:research:subagent:orchestrator", + }, + ); + + expect(result.status).toBe("accepted"); + expect(result.mode).toBe("run"); + expect(result.streamLogPath).toBeUndefined(); + expect(hoisted.startAcpSpawnParentStreamRelayMock).not.toHaveBeenCalled(); + }); + + it("does not implicitly stream for subagent requester sessions when heartbeat cadence is invalid", async () => { + hoisted.state.cfg = { + ...hoisted.state.cfg, + agents: { + list: [ + { + id: "research", + heartbeat: { every: "0m" }, + }, + ], + }, + }; + + const result = await spawnAcpDirect( + { + task: "Investigate flaky tests", + agentId: "codex", + }, + { + agentSessionKey: "agent:research:subagent:invalid-heartbeat", + }, + ); + + expect(result.status).toBe("accepted"); + expect(result.mode).toBe("run"); + expect(result.streamLogPath).toBeUndefined(); + expect(hoisted.startAcpSpawnParentStreamRelayMock).not.toHaveBeenCalled(); + }); + + it("does not implicitly stream when heartbeats are runtime-disabled", async () => { + hoisted.areHeartbeatsEnabledMock.mockReturnValue(false); + + const result = await spawnAcpDirect( + { + task: "Investigate flaky tests", + agentId: "codex", + }, + { + agentSessionKey: "agent:main:subagent:runtime-disabled", + }, + ); + + expect(result.status).toBe("accepted"); + expect(result.mode).toBe("run"); + expect(result.streamLogPath).toBeUndefined(); + expect(hoisted.startAcpSpawnParentStreamRelayMock).not.toHaveBeenCalled(); + }); + + it("does not implicitly stream for legacy subagent requester session keys", async () => { + const result = await spawnAcpDirect( + { + task: "Investigate flaky tests", + agentId: "codex", + }, + { + agentSessionKey: "subagent:legacy-worker", + }, + ); + + expect(result.status).toBe("accepted"); + expect(result.mode).toBe("run"); + expect(result.streamLogPath).toBeUndefined(); + expect(hoisted.startAcpSpawnParentStreamRelayMock).not.toHaveBeenCalled(); + }); + + it("does not implicitly stream for subagent requester sessions with thread context", async () => { + const result = await spawnAcpDirect( + { + task: "Investigate flaky tests", + agentId: "codex", + }, + { + agentSessionKey: "agent:main:subagent:thread-context", + agentChannel: "discord", + agentAccountId: "default", + agentTo: "channel:parent-channel", + agentThreadId: "requester-thread", + }, + ); + + expect(result.status).toBe("accepted"); + expect(result.mode).toBe("run"); + expect(result.streamLogPath).toBeUndefined(); + expect(hoisted.startAcpSpawnParentStreamRelayMock).not.toHaveBeenCalled(); + }); + + it("does not implicitly stream for thread-bound subagent requester sessions", async () => { + hoisted.sessionBindingListBySessionMock.mockImplementation((targetSessionKey: string) => { + if (targetSessionKey === "agent:main:subagent:thread-bound") { + return [ + createSessionBinding({ + targetSessionKey, + targetKind: "subagent", + status: "active", + }), + ]; + } + return []; + }); + + const result = await spawnAcpDirect( + { + task: "Investigate flaky tests", + agentId: "codex", + }, + { + agentSessionKey: "agent:main:subagent:thread-bound", + agentChannel: "discord", + agentAccountId: "default", + agentTo: "channel:parent-channel", + }, + ); + + expect(result.status).toBe("accepted"); + expect(result.mode).toBe("run"); + expect(result.streamLogPath).toBeUndefined(); + expect(hoisted.startAcpSpawnParentStreamRelayMock).not.toHaveBeenCalled(); + }); + it("announces parent relay start only after successful child dispatch", async () => { const firstHandle = createRelayHandle(); const secondHandle = createRelayHandle(); diff --git a/src/agents/acp-spawn.ts b/src/agents/acp-spawn.ts index 5d305b25f27..9d68a234aea 100644 --- a/src/agents/acp-spawn.ts +++ b/src/agents/acp-spawn.ts @@ -10,6 +10,7 @@ import { resolveAcpThreadSessionDetailLines, } from "../acp/runtime/session-identifiers.js"; import type { AcpRuntimeSessionMode } from "../acp/runtime/types.js"; +import { DEFAULT_HEARTBEAT_EVERY } from "../auto-reply/heartbeat.js"; import { resolveThreadBindingIntroText, resolveThreadBindingThreadName, @@ -21,11 +22,13 @@ import { resolveThreadBindingMaxAgeMsForChannel, resolveThreadBindingSpawnPolicy, } from "../channels/thread-bindings-policy.js"; +import { parseDurationMs } from "../cli/parse-duration.js"; import { loadConfig } from "../config/config.js"; import type { OpenClawConfig } from "../config/config.js"; import { loadSessionStore, resolveStorePath, type SessionEntry } from "../config/sessions.js"; import { resolveSessionTranscriptFile } from "../config/sessions/transcript.js"; import { callGateway } from "../gateway/call.js"; +import { areHeartbeatsEnabled } from "../infra/heartbeat-wake.js"; import { resolveConversationIdFromTargets } from "../infra/outbound/conversation-id.js"; import { getSessionBindingService, @@ -33,13 +36,18 @@ import { type SessionBindingRecord, } from "../infra/outbound/session-binding-service.js"; import { createSubsystemLogger } from "../logging/subsystem.js"; -import { normalizeAgentId } from "../routing/session-key.js"; -import { normalizeDeliveryContext } from "../utils/delivery-context.js"; +import { + isSubagentSessionKey, + normalizeAgentId, + parseAgentSessionKey, +} from "../routing/session-key.js"; +import { deliveryContextFromSession, normalizeDeliveryContext } from "../utils/delivery-context.js"; import { type AcpSpawnParentRelayHandle, resolveAcpSpawnStreamLogPath, startAcpSpawnParentStreamRelay, } from "./acp-spawn-parent-stream.js"; +import { resolveAgentConfig, resolveDefaultAgentId } from "./agent-scope.js"; import { resolveSandboxRuntimeStatus } from "./sandbox/runtime-status.js"; import { resolveInternalSessionKey, resolveMainSessionAlias } from "./tools/sessions-helpers.js"; @@ -130,6 +138,95 @@ function resolveAcpSessionMode(mode: SpawnAcpMode): AcpRuntimeSessionMode { return mode === "session" ? "persistent" : "oneshot"; } +function isHeartbeatEnabledForSessionAgent(params: { + cfg: OpenClawConfig; + sessionKey?: string; +}): boolean { + if (!areHeartbeatsEnabled()) { + return false; + } + const requesterAgentId = parseAgentSessionKey(params.sessionKey)?.agentId; + if (!requesterAgentId) { + return true; + } + + const agentEntries = params.cfg.agents?.list ?? []; + const hasExplicitHeartbeatAgents = agentEntries.some((entry) => Boolean(entry?.heartbeat)); + const enabledByPolicy = hasExplicitHeartbeatAgents + ? agentEntries.some( + (entry) => Boolean(entry?.heartbeat) && normalizeAgentId(entry?.id) === requesterAgentId, + ) + : requesterAgentId === resolveDefaultAgentId(params.cfg); + if (!enabledByPolicy) { + return false; + } + + const heartbeatEvery = + resolveAgentConfig(params.cfg, requesterAgentId)?.heartbeat?.every ?? + params.cfg.agents?.defaults?.heartbeat?.every ?? + DEFAULT_HEARTBEAT_EVERY; + const trimmedEvery = typeof heartbeatEvery === "string" ? heartbeatEvery.trim() : ""; + if (!trimmedEvery) { + return false; + } + try { + return parseDurationMs(trimmedEvery, { defaultUnit: "m" }) > 0; + } catch { + return false; + } +} + +function resolveHeartbeatConfigForAgent(params: { + cfg: OpenClawConfig; + agentId: string; +}): NonNullable["defaults"]>["heartbeat"] { + const defaults = params.cfg.agents?.defaults?.heartbeat; + const overrides = resolveAgentConfig(params.cfg, params.agentId)?.heartbeat; + if (!defaults && !overrides) { + return undefined; + } + return { + ...defaults, + ...overrides, + }; +} + +function hasSessionLocalHeartbeatRelayRoute(params: { + cfg: OpenClawConfig; + parentSessionKey: string; + requesterAgentId: string; +}): boolean { + const scope = params.cfg.session?.scope ?? "per-sender"; + if (scope === "global") { + return false; + } + + const heartbeat = resolveHeartbeatConfigForAgent({ + cfg: params.cfg, + agentId: params.requesterAgentId, + }); + if ((heartbeat?.target ?? "none") !== "last") { + return false; + } + + // Explicit delivery overrides are not session-local and can route updates + // to unrelated destinations (for example a pinned ops channel). + if (typeof heartbeat?.to === "string" && heartbeat.to.trim().length > 0) { + return false; + } + if (typeof heartbeat?.accountId === "string" && heartbeat.accountId.trim().length > 0) { + return false; + } + + const storePath = resolveStorePath(params.cfg.session?.store, { + agentId: params.requesterAgentId, + }); + const sessionStore = loadSessionStore(storePath); + const parentEntry = sessionStore[params.parentSessionKey]; + const parentDeliveryContext = deliveryContextFromSession(parentEntry); + return Boolean(parentDeliveryContext?.channel && parentDeliveryContext.to); +} + function resolveTargetAcpAgentId(params: { requestedAgentId?: string; cfg: OpenClawConfig; @@ -326,6 +423,8 @@ export async function spawnAcpDirect( error: 'sessions_spawn streamTo="parent" requires an active requester session context.', }; } + + const requestThreadBinding = params.thread === true; const runtimePolicyError = resolveAcpSpawnRuntimePolicyError({ cfg, requesterSessionKey: ctx.agentSessionKey, @@ -339,7 +438,6 @@ export async function spawnAcpDirect( }; } - const requestThreadBinding = params.thread === true; const spawnMode = resolveSpawnMode({ requestedMode: params.mode, threadRequested: requestThreadBinding, @@ -351,6 +449,52 @@ export async function spawnAcpDirect( }; } + const bindingService = getSessionBindingService(); + const requesterParsedSession = parseAgentSessionKey(parentSessionKey); + const requesterIsSubagentSession = + Boolean(requesterParsedSession) && isSubagentSessionKey(parentSessionKey); + const requesterHasActiveSubagentBinding = + requesterIsSubagentSession && parentSessionKey + ? bindingService + .listBySession(parentSessionKey) + .some((record) => record.targetKind === "subagent" && record.status !== "ended") + : false; + const requesterHasThreadContext = + typeof ctx.agentThreadId === "string" + ? ctx.agentThreadId.trim().length > 0 + : ctx.agentThreadId != null; + const requesterHeartbeatEnabled = isHeartbeatEnabledForSessionAgent({ + cfg, + sessionKey: parentSessionKey, + }); + const requesterAgentId = requesterParsedSession?.agentId; + const requesterHeartbeatRelayRouteUsable = + parentSessionKey && requesterAgentId + ? hasSessionLocalHeartbeatRelayRoute({ + cfg, + parentSessionKey, + requesterAgentId, + }) + : false; + + // For mode=run without thread binding, implicitly route output to parent + // only for spawned subagent orchestrator sessions with heartbeat enabled + // AND a session-local heartbeat delivery route (target=last + usable last route). + // Skip requester sessions that are thread-bound (or carrying thread context) + // so user-facing threads do not receive unsolicited ACP progress chatter + // unless streamTo="parent" is explicitly requested. Use resolved spawnMode + // (not params.mode) so default mode selection works. + const implicitStreamToParent = + !streamToParentRequested && + spawnMode === "run" && + !requestThreadBinding && + requesterIsSubagentSession && + !requesterHasActiveSubagentBinding && + !requesterHasThreadContext && + requesterHeartbeatEnabled && + requesterHeartbeatRelayRouteUsable; + const effectiveStreamToParent = streamToParentRequested || implicitStreamToParent; + const targetAgentResult = resolveTargetAcpAgentId({ requestedAgentId: params.agentId, cfg, @@ -392,7 +536,6 @@ export async function spawnAcpDirect( } const acpManager = getAcpSessionManager(); - const bindingService = getSessionBindingService(); let binding: SessionBindingRecord | null = null; let sessionCreated = false; let initializedRuntime: AcpSpawnRuntimeCloseHandle | undefined; @@ -530,17 +673,17 @@ export async function spawnAcpDirect( // Fresh one-shot ACP runs should bootstrap the worker first, then let higher layers // decide how to relay status. Inline delivery is reserved for thread-bound sessions. const useInlineDelivery = - hasDeliveryTarget && spawnMode === "session" && !streamToParentRequested; + hasDeliveryTarget && spawnMode === "session" && !effectiveStreamToParent; const childIdem = crypto.randomUUID(); let childRunId: string = childIdem; const streamLogPath = - streamToParentRequested && parentSessionKey + effectiveStreamToParent && parentSessionKey ? resolveAcpSpawnStreamLogPath({ childSessionKey: sessionKey, }) : undefined; let parentRelay: AcpSpawnParentRelayHandle | undefined; - if (streamToParentRequested && parentSessionKey) { + if (effectiveStreamToParent && parentSessionKey) { // Register relay before dispatch so fast lifecycle failures are not missed. parentRelay = startAcpSpawnParentStreamRelay({ runId: childIdem, @@ -585,7 +728,7 @@ export async function spawnAcpDirect( }; } - if (streamToParentRequested && parentSessionKey) { + if (effectiveStreamToParent && parentSessionKey) { if (parentRelay && childRunId !== childIdem) { parentRelay.dispose(); // Defensive fallback if gateway returns a runId that differs from idempotency key. diff --git a/src/infra/heartbeat-reason.test.ts b/src/infra/heartbeat-reason.test.ts index 6c2fdc68f97..69d23e3219d 100644 --- a/src/infra/heartbeat-reason.test.ts +++ b/src/infra/heartbeat-reason.test.ts @@ -19,6 +19,7 @@ describe("heartbeat-reason", () => { expect(resolveHeartbeatReasonKind("manual")).toBe("manual"); expect(resolveHeartbeatReasonKind("exec-event")).toBe("exec-event"); expect(resolveHeartbeatReasonKind("wake")).toBe("wake"); + expect(resolveHeartbeatReasonKind("acp:spawn:stream")).toBe("wake"); expect(resolveHeartbeatReasonKind("cron:job-1")).toBe("cron"); expect(resolveHeartbeatReasonKind("hook:wake")).toBe("hook"); expect(resolveHeartbeatReasonKind(" hook:wake ")).toBe("hook"); @@ -35,6 +36,7 @@ describe("heartbeat-reason", () => { expect(isHeartbeatEventDrivenReason("exec-event")).toBe(true); expect(isHeartbeatEventDrivenReason("cron:job-1")).toBe(true); expect(isHeartbeatEventDrivenReason("wake")).toBe(true); + expect(isHeartbeatEventDrivenReason("acp:spawn:stream")).toBe(true); expect(isHeartbeatEventDrivenReason("hook:gmail:sync")).toBe(true); expect(isHeartbeatEventDrivenReason("interval")).toBe(false); expect(isHeartbeatEventDrivenReason("manual")).toBe(false); diff --git a/src/infra/heartbeat-reason.ts b/src/infra/heartbeat-reason.ts index 968b1e24062..447ca733e53 100644 --- a/src/infra/heartbeat-reason.ts +++ b/src/infra/heartbeat-reason.ts @@ -34,6 +34,9 @@ export function resolveHeartbeatReasonKind(reason?: string): HeartbeatReasonKind if (trimmed === "wake") { return "wake"; } + if (trimmed.startsWith("acp:spawn:")) { + return "wake"; + } if (trimmed.startsWith("cron:")) { return "cron"; } diff --git a/src/infra/heartbeat-runner.ts b/src/infra/heartbeat-runner.ts index c3c58d34c1e..344fd22d8fc 100644 --- a/src/infra/heartbeat-runner.ts +++ b/src/infra/heartbeat-runner.ts @@ -38,7 +38,11 @@ import type { AgentDefaultsConfig } from "../config/types.agent-defaults.js"; import { createSubsystemLogger } from "../logging/subsystem.js"; import { getQueueSize } from "../process/command-queue.js"; import { CommandLane } from "../process/lanes.js"; -import { normalizeAgentId, toAgentStoreSessionKey } from "../routing/session-key.js"; +import { + normalizeAgentId, + parseAgentSessionKey, + toAgentStoreSessionKey, +} from "../routing/session-key.js"; import { defaultRuntime, type RuntimeEnv } from "../runtime.js"; import { escapeRegExp } from "../utils.js"; import { formatErrorMessage, hasErrnoCode } from "./errors.js"; @@ -53,9 +57,11 @@ import { emitHeartbeatEvent, resolveIndicatorType } from "./heartbeat-events.js" import { resolveHeartbeatReasonKind } from "./heartbeat-reason.js"; import { resolveHeartbeatVisibility } from "./heartbeat-visibility.js"; import { + areHeartbeatsEnabled, type HeartbeatRunResult, type HeartbeatWakeHandler, requestHeartbeatNow, + setHeartbeatsEnabled, setHeartbeatWakeHandler, } from "./heartbeat-wake.js"; import type { OutboundSendDeps } from "./outbound/deliver.js"; @@ -75,11 +81,8 @@ export type HeartbeatDeps = OutboundSendDeps & }; const log = createSubsystemLogger("gateway/heartbeat"); -let heartbeatsEnabled = true; -export function setHeartbeatsEnabled(enabled: boolean) { - heartbeatsEnabled = enabled; -} +export { areHeartbeatsEnabled, setHeartbeatsEnabled }; type HeartbeatConfig = AgentDefaultsConfig["heartbeat"]; type HeartbeatAgent = { @@ -611,9 +614,14 @@ export async function runHeartbeatOnce(opts: { deps?: HeartbeatDeps; }): Promise { const cfg = opts.cfg ?? loadConfig(); - const agentId = normalizeAgentId(opts.agentId ?? resolveDefaultAgentId(cfg)); + const explicitAgentId = typeof opts.agentId === "string" ? opts.agentId.trim() : ""; + const forcedSessionAgentId = + explicitAgentId.length > 0 ? undefined : parseAgentSessionKey(opts.sessionKey)?.agentId; + const agentId = normalizeAgentId( + explicitAgentId || forcedSessionAgentId || resolveDefaultAgentId(cfg), + ); const heartbeat = opts.heartbeat ?? resolveHeartbeatConfig(cfg, agentId); - if (!heartbeatsEnabled) { + if (!areHeartbeatsEnabled()) { return { status: "skipped", reason: "disabled" }; } if (!isHeartbeatEnabledForAgent(cfg, agentId)) { @@ -1114,7 +1122,7 @@ export function startHeartbeatRunner(opts: { reason: "disabled", } satisfies HeartbeatRunResult; } - if (!heartbeatsEnabled) { + if (!areHeartbeatsEnabled()) { return { status: "skipped", reason: "disabled", diff --git a/src/infra/heartbeat-wake.ts b/src/infra/heartbeat-wake.ts index bccfdfe9829..3aaaca5ed96 100644 --- a/src/infra/heartbeat-wake.ts +++ b/src/infra/heartbeat-wake.ts @@ -15,6 +15,16 @@ export type HeartbeatWakeHandler = (opts: { sessionKey?: string; }) => Promise; +let heartbeatsEnabled = true; + +export function setHeartbeatsEnabled(enabled: boolean) { + heartbeatsEnabled = enabled; +} + +export function areHeartbeatsEnabled(): boolean { + return heartbeatsEnabled; +} + type WakeTimerKind = "normal" | "retry"; type PendingWakeReason = { reason: string;