openclaw/src/agents/acp-spawn-parent-stream.ts

379 lines
11 KiB
TypeScript

import { appendFile, mkdir } from "node:fs/promises";
import path from "node:path";
import { readAcpSessionEntry } from "../acp/runtime/session-meta.js";
import { resolveSessionFilePath, resolveSessionFilePathOptions } from "../config/sessions/paths.js";
import { onAgentEvent } from "../infra/agent-events.js";
import { requestHeartbeatNow } from "../infra/heartbeat-wake.js";
import { enqueueSystemEvent } from "../infra/system-events.js";
import { scopedHeartbeatWakeOptions } from "../routing/session-key.js";
/** Fallback delay (ms) before buffered assistant text is relayed when `streamFlushMs` is not supplied. */
const DEFAULT_STREAM_FLUSH_MS = 2.5 * 1_000;
/** Fallback silence threshold (ms) after which a "no output" notice is emitted. */
const DEFAULT_NO_OUTPUT_NOTICE_MS = 60 * 1_000;
/** Fallback interval (ms) at which the relay checks for a stalled child. */
const DEFAULT_NO_OUTPUT_POLL_MS = 15 * 1_000;
/** Fallback upper bound (ms) on how long a relay stays alive before it times out: six hours. */
const DEFAULT_MAX_RELAY_LIFETIME_MS = 6 * 60 * 60 * 1_000;
/** Maximum number of trailing characters of not-yet-relayed assistant text kept in memory. */
const STREAM_BUFFER_MAX_CHARS = 4000;
/** Maximum length of one relayed progress snippet; also the buffer size that forces an immediate flush. */
const STREAM_SNIPPET_MAX_CHARS = 220;
/**
 * Collapses every run of whitespace (spaces, tabs, line breaks) into a single
 * space and strips leading/trailing whitespace.
 *
 * @param value Text to normalize.
 * @returns The single-line, single-spaced form of `value`; `""` when `value`
 *   is empty or whitespace-only.
 */
function compactWhitespace(value: string): string {
  // Splitting on whitespace runs leaves empty strings at the edges when the
  // input starts or ends with whitespace; dropping them is the "trim" step.
  const words = value.split(/\s+/).filter((word) => word.length > 0);
  return words.join(" ");
}
/**
 * Shortens `value` to at most `maxChars` UTF-16 code units, marking the cut
 * with a trailing ellipsis.
 *
 * @param value Text to shorten.
 * @param maxChars Maximum length of the result, ellipsis included.
 * @returns `value` unchanged when it already fits; otherwise the first
 *   `maxChars - 1` code units followed by `…`. A budget of exactly 1 yields the
 *   first code unit (no room for a marker); a budget below 1 yields `""`.
 */
function truncate(value: string, maxChars: number): string {
  if (value.length <= maxChars) {
    return value;
  }
  if (!(maxChars > 1)) {
    // No room for text plus marker. Guarding negatives explicitly matters:
    // `slice(0, -n)` would otherwise return almost the whole string.
    return maxChars === 1 ? value.slice(0, 1) : "";
  }
  // One slot is reserved for the ellipsis so the result is exactly `maxChars`
  // long. Written as an escape so the character survives re-encoding.
  return `${value.slice(0, maxChars - 1)}\u2026`;
}
/**
 * Narrows an untyped value to a non-empty, trimmed string.
 *
 * @param value Any value, typically a field read from loosely typed event data.
 * @returns The trimmed string, or `undefined` when `value` is not a string or
 *   contains only whitespace.
 */
function toTrimmedString(value: unknown): string | undefined {
  const text = typeof value === "string" ? value.trim() : "";
  return text.length > 0 ? text : undefined;
}
/**
 * Narrows an untyped value to a finite number.
 *
 * @param value Any value, typically a field read from loosely typed event data.
 * @returns `value` when it is a number other than `NaN`/`±Infinity`;
 *   otherwise `undefined`. Numeric strings are not converted.
 */
function toFiniteNumber(value: unknown): number | undefined {
  if (typeof value !== "number") {
    return undefined;
  }
  if (!Number.isFinite(value)) {
    return undefined;
  }
  return value;
}
/**
 * Builds the path of the ACP stream log that sits next to a session file.
 *
 * @param sessionFile Path of the session file; relative paths are resolved
 *   against the current working directory.
 * @param sessionId Session identifier used as the log file's base name.
 * @returns Absolute path `<dir of sessionFile>/<sessionId>.acp-stream.jsonl`.
 */
function resolveAcpStreamLogPathFromSessionFile(sessionFile: string, sessionId: string): string {
  const absoluteSessionFile = path.resolve(sessionFile);
  const logFileName = `${sessionId}.acp-stream.jsonl`;
  return path.join(path.dirname(absoluteSessionFile), logFileName);
}
/**
 * Resolves where the stream log for a spawned ACP child session should be
 * written: beside the child's session file, named after its session id.
 *
 * @param params.childSessionKey Session key of the spawned child.
 * @returns The log path, or `undefined` when the key is blank, no stored
 *   session entry with a session id is found for it, or resolving the session
 *   file path throws.
 */
export function resolveAcpSpawnStreamLogPath(params: {
  childSessionKey: string;
}): string | undefined {
  const sessionKey = params.childSessionKey.trim();
  if (sessionKey.length === 0) {
    return undefined;
  }

  const stored = readAcpSessionEntry({ sessionKey });
  const sessionId = stored?.entry?.sessionId?.trim();
  if (!stored || !sessionId) {
    return undefined;
  }

  try {
    const pathOptions = resolveSessionFilePathOptions({ storePath: stored.storePath });
    const sessionFile = resolveSessionFilePath(sessionId, stored.entry, pathOptions);
    return resolveAcpStreamLogPathFromSessionFile(sessionFile, sessionId);
  } catch {
    // Path resolution failures only mean "no log file"; the caller can proceed without one.
    return undefined;
  }
}
/**
 * Relays progress of a spawned ACP child run into its parent session.
 *
 * Subscribes to agent events for `runId` and turns them into system events
 * enqueued on the parent session (each followed by a heartbeat wake request):
 * - assistant text is buffered and relayed as short single-line snippets;
 * - a notice is emitted when the child has been silent for `noOutputNoticeMs`,
 *   and another when output resumes;
 * - lifecycle `end` / `error` phases emit a completion/failure notice and
 *   dispose the relay;
 * - the relay disposes itself with a timeout notice after `maxRelayLifetimeMs`.
 *
 * When `logPath` is set, every relayed event and raw delta is also appended to
 * that file as JSON lines on a best-effort basis (write errors are ignored).
 *
 * @param params.runId Run whose events are relayed; blank disables the relay.
 * @param params.parentSessionKey Session receiving the notices; blank disables the relay.
 * @param params.childSessionKey Child session key, used in the start notice and log records.
 * @param params.agentId Agent id; compacted and truncated to 40 chars it becomes the notice label.
 * @param params.logPath Optional JSONL diagnostics file.
 * @param params.streamFlushMs Delay before buffered text is relayed (min 0; `0` disables the timed flush).
 * @param params.noOutputNoticeMs Silence threshold for the stall notice (min 0; `0` disables it).
 * @param params.noOutputPollMs Stall check interval (min 250).
 * @param params.maxRelayLifetimeMs Maximum relay lifetime (min 1000).
 * @param params.emitStartNotice Pass `false` to suppress the immediate start notice.
 * @returns A handle to stop the relay or to emit the start notice on demand;
 *   both operations are no-ops when `runId` or `parentSessionKey` is blank.
 */
export function startAcpSpawnParentStreamRelay(params: {
  runId: string;
  parentSessionKey: string;
  childSessionKey: string;
  agentId: string;
  logPath?: string;
  streamFlushMs?: number;
  noOutputNoticeMs?: number;
  noOutputPollMs?: number;
  maxRelayLifetimeMs?: number;
  emitStartNotice?: boolean;
}): AcpSpawnParentRelayHandle {
  const runId = params.runId.trim();
  const parentSessionKey = params.parentSessionKey.trim();
  if (!runId || !parentSessionKey) {
    return {
      dispose: () => {},
      notifyStarted: () => {},
    };
  }

  // Shared normalization for the millisecond options: non-finite or missing
  // values fall back to the default, everything else is floored and clamped.
  const resolveMs = (value: number | undefined, minimumMs: number, fallbackMs: number): number =>
    typeof value === "number" && Number.isFinite(value)
      ? Math.max(minimumMs, Math.floor(value))
      : fallbackMs;

  const streamFlushMs = resolveMs(params.streamFlushMs, 0, DEFAULT_STREAM_FLUSH_MS);
  const noOutputNoticeMs = resolveMs(params.noOutputNoticeMs, 0, DEFAULT_NO_OUTPUT_NOTICE_MS);
  const noOutputPollMs = resolveMs(params.noOutputPollMs, 250, DEFAULT_NO_OUTPUT_POLL_MS);
  const maxRelayLifetimeMs = resolveMs(
    params.maxRelayLifetimeMs,
    1_000,
    DEFAULT_MAX_RELAY_LIFETIME_MS,
  );

  const relayLabel = truncate(compactWhitespace(params.agentId), 40) || "ACP child";
  const contextPrefix = `acp-spawn:${runId}`;

  // --- Best-effort JSONL diagnostics log -----------------------------------
  const logPath = toTrimmedString(params.logPath);
  let logDirReady = false;
  let pendingLogLines = "";
  let logFlushScheduled = false;
  // Writes are chained so chunks reach the file in the order they were produced.
  let logWriteChain: Promise<void> = Promise.resolve();

  const flushLogBuffer = () => {
    if (!logPath || !pendingLogLines) {
      return;
    }
    const chunk = pendingLogLines;
    pendingLogLines = "";
    logWriteChain = logWriteChain
      .then(async () => {
        if (!logDirReady) {
          await mkdir(path.dirname(logPath), {
            recursive: true,
          });
          logDirReady = true;
        }
        await appendFile(logPath, chunk, {
          encoding: "utf-8",
          mode: 0o600,
        });
      })
      .catch(() => {
        // Best-effort diagnostics; never break relay flow.
      });
  };

  const scheduleLogFlush = () => {
    if (!logPath || logFlushScheduled) {
      return;
    }
    logFlushScheduled = true;
    // Coalesce all lines produced in the current tick into one append.
    queueMicrotask(() => {
      logFlushScheduled = false;
      flushLogBuffer();
    });
  };

  const writeLogLine = (entry: Record<string, unknown>) => {
    if (!logPath) {
      return;
    }
    try {
      pendingLogLines += `${JSON.stringify(entry)}\n`;
      if (pendingLogLines.length >= 16_384) {
        // Large backlog: write now instead of waiting for the microtask.
        flushLogBuffer();
        return;
      }
      scheduleLogFlush();
    } catch {
      // Best-effort diagnostics; never break relay flow.
    }
  };

  const logEvent = (kind: string, fields?: Record<string, unknown>) => {
    if (!logPath) {
      // Nothing will be written; skip building the record.
      return;
    }
    // One clock read so `ts` and `epochMs` always describe the same instant.
    const now = new Date();
    writeLogLine({
      ts: now.toISOString(),
      epochMs: now.getTime(),
      runId,
      parentSessionKey,
      childSessionKey: params.childSessionKey,
      agentId: params.agentId,
      kind,
      ...fields,
    });
  };

  // --- Parent notification ---------------------------------------------------
  const wake = () => {
    requestHeartbeatNow(
      scopedHeartbeatWakeOptions(parentSessionKey, {
        reason: "acp:spawn:stream",
      }),
    );
  };

  const emit = (text: string, contextKey: string) => {
    const cleaned = text.trim();
    if (!cleaned) {
      return;
    }
    logEvent("system_event", { contextKey, text: cleaned });
    enqueueSystemEvent(cleaned, { sessionKey: parentSessionKey, contextKey });
    wake();
  };

  const emitStartNotice = () => {
    emit(
      `Started ${relayLabel} session ${params.childSessionKey}. Streaming progress updates to parent session.`,
      `${contextPrefix}:start`,
    );
  };

  // --- Relay state -------------------------------------------------------------
  let disposed = false;
  let pendingText = "";
  let lastProgressAt = Date.now();
  let stallNotified = false;
  let flushTimer: NodeJS.Timeout | undefined;
  let relayLifetimeTimer: NodeJS.Timeout | undefined;
  let noOutputWatcherTimer: NodeJS.Timeout | undefined;
  let unsubscribe: (() => void) | undefined;

  const clearFlushTimer = () => {
    if (!flushTimer) {
      return;
    }
    clearTimeout(flushTimer);
    flushTimer = undefined;
  };

  const clearRelayLifetimeTimer = () => {
    if (!relayLifetimeTimer) {
      return;
    }
    clearTimeout(relayLifetimeTimer);
    relayLifetimeTimer = undefined;
  };

  // Declared before the timers and the subscription that call it, so it is
  // initialized no matter when those callbacks first run.
  const dispose = () => {
    if (disposed) {
      return;
    }
    disposed = true;
    clearFlushTimer();
    clearRelayLifetimeTimer();
    flushLogBuffer();
    if (noOutputWatcherTimer) {
      clearInterval(noOutputWatcherTimer);
      noOutputWatcherTimer = undefined;
    }
    unsubscribe?.();
    unsubscribe = undefined;
  };

  const flushPending = () => {
    clearFlushTimer();
    if (!pendingText) {
      return;
    }
    const snippet = truncate(compactWhitespace(pendingText), STREAM_SNIPPET_MAX_CHARS);
    pendingText = "";
    if (!snippet) {
      return;
    }
    emit(`${relayLabel}: ${snippet}`, `${contextPrefix}:progress`);
  };

  const scheduleFlush = () => {
    if (disposed || flushTimer || streamFlushMs <= 0) {
      return;
    }
    flushTimer = setTimeout(() => {
      flushPending();
    }, streamFlushMs);
    // Do not keep the process alive just for a pending progress flush.
    flushTimer.unref?.();
  };

  // Stall watcher: only needed when stall notices are enabled.
  if (noOutputNoticeMs > 0) {
    noOutputWatcherTimer = setInterval(() => {
      if (disposed || stallNotified) {
        return;
      }
      if (Date.now() - lastProgressAt < noOutputNoticeMs) {
        return;
      }
      stallNotified = true;
      emit(
        `${relayLabel} has produced no output for ${Math.round(noOutputNoticeMs / 1000)}s. It may be waiting for interactive input.`,
        `${contextPrefix}:stall`,
      );
    }, noOutputPollMs);
    noOutputWatcherTimer.unref?.();
  }

  // Hard upper bound on relay lifetime in case no terminal lifecycle event arrives.
  relayLifetimeTimer = setTimeout(() => {
    if (disposed) {
      return;
    }
    emit(
      `${relayLabel} stream relay timed out after ${Math.max(1, Math.round(maxRelayLifetimeMs / 1000))}s without completion.`,
      `${contextPrefix}:timeout`,
    );
    dispose();
  }, maxRelayLifetimeMs);
  relayLifetimeTimer.unref?.();

  if (params.emitStartNotice !== false) {
    emitStartNotice();
  }

  unsubscribe = onAgentEvent((event) => {
    if (disposed || event.runId !== runId) {
      return;
    }

    if (event.stream === "assistant") {
      const data = event.data;
      // Accept either an incremental `delta` or a `text` field.
      const deltaCandidate =
        (data as { delta?: unknown } | undefined)?.delta ??
        (data as { text?: unknown } | undefined)?.text;
      const delta = typeof deltaCandidate === "string" ? deltaCandidate : undefined;
      if (!delta || !delta.trim()) {
        return;
      }
      logEvent("assistant_delta", { delta });
      if (stallNotified) {
        stallNotified = false;
        emit(`${relayLabel} resumed output.`, `${contextPrefix}:resumed`);
      }
      lastProgressAt = Date.now();
      pendingText += delta;
      if (pendingText.length > STREAM_BUFFER_MAX_CHARS) {
        // Keep only the most recent text so one huge delta cannot grow the buffer unbounded.
        pendingText = pendingText.slice(-STREAM_BUFFER_MAX_CHARS);
      }
      // Flush right away once a full snippet is available or a paragraph break arrives.
      if (pendingText.length >= STREAM_SNIPPET_MAX_CHARS || delta.includes("\n\n")) {
        flushPending();
        return;
      }
      scheduleFlush();
      return;
    }

    if (event.stream !== "lifecycle") {
      return;
    }

    const phase = toTrimmedString((event.data as { phase?: unknown } | undefined)?.phase);
    logEvent("lifecycle", { phase: phase ?? "unknown", data: event.data });

    if (phase === "end") {
      flushPending();
      const startedAt = toFiniteNumber(
        (event.data as { startedAt?: unknown } | undefined)?.startedAt,
      );
      const endedAt = toFiniteNumber((event.data as { endedAt?: unknown } | undefined)?.endedAt);
      // Report a duration only when both timestamps are present and ordered.
      const durationMs =
        startedAt != null && endedAt != null && endedAt >= startedAt
          ? endedAt - startedAt
          : undefined;
      if (durationMs != null) {
        emit(
          `${relayLabel} run completed in ${Math.max(1, Math.round(durationMs / 1000))}s.`,
          `${contextPrefix}:done`,
        );
      } else {
        emit(`${relayLabel} run completed.`, `${contextPrefix}:done`);
      }
      dispose();
      return;
    }

    if (phase === "error") {
      flushPending();
      const errorText = toTrimmedString((event.data as { error?: unknown } | undefined)?.error);
      if (errorText) {
        emit(`${relayLabel} run failed: ${errorText}`, `${contextPrefix}:error`);
      } else {
        emit(`${relayLabel} run failed.`, `${contextPrefix}:error`);
      }
      dispose();
    }
  });

  // Defensive: if the event source ever delivered a terminal event synchronously
  // while subscribing, `dispose` ran before `unsubscribe` was assigned.
  if (disposed) {
    unsubscribe?.();
    unsubscribe = undefined;
  }

  return {
    dispose,
    notifyStarted: emitStartNotice,
  };
}
/**
 * Handle returned by `startAcpSpawnParentStreamRelay` for controlling a relay.
 * When the relay was started with a blank run id or parent session key, both
 * members are no-ops.
 */
export type AcpSpawnParentRelayHandle = {
/** Stops the relay: clears its timers, flushes buffered log lines and unsubscribes from agent events. Safe to call repeatedly. */
dispose: () => void;
/** Emits the "Started … session" notice to the parent session; for callers that start the relay with `emitStartNotice: false`. */
notifyStarted: () => void;
};