openclaw/src/auto-reply/reply/agent-runner-memory.ts

785 lines
27 KiB
TypeScript

import crypto from "node:crypto";
import fs from "node:fs";
import type { AgentMessage } from "@mariozechner/pi-agent-core";
import { resolveBootstrapWarningSignaturesSeen } from "../../agents/bootstrap-budget.js";
import { estimateMessagesTokens } from "../../agents/compaction.js";
import { runWithModelFallback } from "../../agents/model-fallback.js";
import { isCliProvider } from "../../agents/model-selection.js";
import { compactEmbeddedPiSession, runEmbeddedPiAgent } from "../../agents/pi-embedded.js";
import { resolveSandboxConfigForAgent, resolveSandboxRuntimeStatus } from "../../agents/sandbox.js";
import {
derivePromptTokens,
hasNonzeroUsage,
normalizeUsage,
type UsageLike,
} from "../../agents/usage.js";
import type { OpenClawConfig } from "../../config/config.js";
import {
resolveAgentIdFromSessionKey,
resolveFreshSessionTotalTokens,
resolveSessionFilePath,
resolveSessionFilePathOptions,
type SessionEntry,
updateSessionStoreEntry,
} from "../../config/sessions.js";
import { readSessionMessages } from "../../gateway/session-utils.fs.js";
import { logVerbose } from "../../globals.js";
import { registerAgentRunContext } from "../../infra/agent-events.js";
import { resolveMemoryFlushPlan } from "../../plugins/memory-state.js";
import type { TemplateContext } from "../templating.js";
import type { VerboseLevel } from "../thinking.js";
import type { GetReplyOptions } from "../types.js";
import {
buildEmbeddedRunExecutionParams,
resolveModelFallbackOptions,
} from "./agent-runner-utils.js";
import {
hasAlreadyFlushedForCurrentCompaction,
resolveMemoryFlushContextWindowTokens,
shouldRunMemoryFlush,
shouldRunPreflightCompaction,
} from "./memory-flush.js";
import { readPostCompactionContext } from "./post-compaction-context.js";
import { refreshQueuedFollowupSession, type FollowupRun } from "./queue.js";
import { incrementCompactionCount } from "./session-updates.js";
export function estimatePromptTokensForMemoryFlush(prompt?: string): number | undefined {
const trimmed = prompt?.trim();
if (!trimmed) {
return undefined;
}
const message: AgentMessage = { role: "user", content: trimmed, timestamp: Date.now() };
const tokens = estimateMessagesTokens([message]);
if (!Number.isFinite(tokens) || tokens <= 0) {
return undefined;
}
return Math.ceil(tokens);
}
export function resolveEffectivePromptTokens(
basePromptTokens?: number,
lastOutputTokens?: number,
promptTokenEstimate?: number,
): number {
const base = Math.max(0, basePromptTokens ?? 0);
const output = Math.max(0, lastOutputTokens ?? 0);
const estimate = Math.max(0, promptTokenEstimate ?? 0);
// Flush gating projects the next input context by adding the previous
// completion and the current user prompt estimate.
return base + output + estimate;
}
export type SessionTranscriptUsageSnapshot = {
promptTokens?: number;
outputTokens?: number;
};
// Keep a generous near-threshold window so large assistant outputs still trigger
// transcript reads in time to flip memory-flush gating when needed.
const TRANSCRIPT_OUTPUT_READ_BUFFER_TOKENS = 8192;
const TRANSCRIPT_TAIL_CHUNK_BYTES = 64 * 1024;
function parseUsageFromTranscriptLine(line: string): ReturnType<typeof normalizeUsage> | undefined {
const trimmed = line.trim();
if (!trimmed) {
return undefined;
}
try {
const parsed = JSON.parse(trimmed) as {
message?: { usage?: UsageLike };
usage?: UsageLike;
};
const usageRaw = parsed.message?.usage ?? parsed.usage;
const usage = normalizeUsage(usageRaw);
if (usage && hasNonzeroUsage(usage)) {
return usage;
}
} catch {
// ignore bad lines
}
return undefined;
}
function resolveSessionLogPath(
sessionId?: string,
sessionEntry?: SessionEntry,
sessionKey?: string,
opts?: { storePath?: string },
): string | undefined {
if (!sessionId) {
return undefined;
}
try {
const transcriptPath = (
sessionEntry as (SessionEntry & { transcriptPath?: string }) | undefined
)?.transcriptPath?.trim();
const sessionFile = sessionEntry?.sessionFile?.trim() || transcriptPath;
const agentId = resolveAgentIdFromSessionKey(sessionKey);
const pathOpts = resolveSessionFilePathOptions({
agentId,
storePath: opts?.storePath,
});
// Normalize sessionFile through resolveSessionFilePath so relative entries
// are resolved against the sessions dir/store layout, not process.cwd().
return resolveSessionFilePath(
sessionId,
sessionFile ? { sessionFile } : sessionEntry,
pathOpts,
);
} catch {
return undefined;
}
}
function deriveTranscriptUsageSnapshot(
usage: ReturnType<typeof normalizeUsage> | undefined,
): SessionTranscriptUsageSnapshot | undefined {
if (!usage) {
return undefined;
}
const promptTokens = derivePromptTokens(usage);
const outputRaw = usage.output;
const outputTokens =
typeof outputRaw === "number" && Number.isFinite(outputRaw) && outputRaw > 0
? outputRaw
: undefined;
if (!(typeof promptTokens === "number") && !(typeof outputTokens === "number")) {
return undefined;
}
return {
promptTokens,
outputTokens,
};
}
type SessionLogSnapshot = {
byteSize?: number;
usage?: SessionTranscriptUsageSnapshot;
};
async function appendPostCompactionRefreshPrompt(params: {
cfg: OpenClawConfig;
followupRun: FollowupRun;
}): Promise<void> {
const refreshPrompt = await readPostCompactionContext(
params.followupRun.run.workspaceDir,
params.cfg,
);
if (!refreshPrompt) {
return;
}
const existingPrompt = params.followupRun.run.extraSystemPrompt?.trim();
if (existingPrompt?.includes(refreshPrompt)) {
return;
}
params.followupRun.run.extraSystemPrompt = [existingPrompt, refreshPrompt]
.filter(Boolean)
.join("\n\n");
}
async function readSessionLogSnapshot(params: {
sessionId?: string;
sessionEntry?: SessionEntry;
sessionKey?: string;
opts?: { storePath?: string };
includeByteSize: boolean;
includeUsage: boolean;
}): Promise<SessionLogSnapshot> {
const logPath = resolveSessionLogPath(
params.sessionId,
params.sessionEntry,
params.sessionKey,
params.opts,
);
if (!logPath) {
return {};
}
const snapshot: SessionLogSnapshot = {};
if (params.includeByteSize) {
try {
const stat = await fs.promises.stat(logPath);
const size = Math.floor(stat.size);
snapshot.byteSize = Number.isFinite(size) && size >= 0 ? size : undefined;
} catch {
snapshot.byteSize = undefined;
}
}
if (params.includeUsage) {
try {
const lastUsage = await readLastNonzeroUsageFromSessionLog(logPath);
snapshot.usage = deriveTranscriptUsageSnapshot(lastUsage);
} catch {
snapshot.usage = undefined;
}
}
return snapshot;
}
async function readLastNonzeroUsageFromSessionLog(logPath: string) {
const handle = await fs.promises.open(logPath, "r");
try {
const stat = await handle.stat();
let position = stat.size;
let leadingPartial = "";
while (position > 0) {
const chunkSize = Math.min(TRANSCRIPT_TAIL_CHUNK_BYTES, position);
const start = position - chunkSize;
const buffer = Buffer.allocUnsafe(chunkSize);
const { bytesRead } = await handle.read(buffer, 0, chunkSize, start);
if (bytesRead <= 0) {
break;
}
const chunk = buffer.toString("utf-8", 0, bytesRead);
const combined = `${chunk}${leadingPartial}`;
const lines = combined.split(/\n+/);
leadingPartial = lines.shift() ?? "";
for (let i = lines.length - 1; i >= 0; i -= 1) {
const usage = parseUsageFromTranscriptLine(lines[i] ?? "");
if (usage) {
return usage;
}
}
position = start;
}
return parseUsageFromTranscriptLine(leadingPartial);
} finally {
await handle.close();
}
}
function estimatePromptTokensFromSessionTranscript(params: {
sessionId?: string;
storePath?: string;
sessionFile?: string;
}): number | undefined {
const sessionId = params.sessionId?.trim();
if (!sessionId) {
return undefined;
}
try {
const messages = readSessionMessages(
sessionId,
params.storePath,
params.sessionFile,
) as AgentMessage[];
if (messages.length === 0) {
return undefined;
}
const estimatedTokens = estimateMessagesTokens(messages);
if (!Number.isFinite(estimatedTokens) || estimatedTokens <= 0) {
return undefined;
}
return Math.ceil(estimatedTokens);
} catch {
return undefined;
}
}
export async function readPromptTokensFromSessionLog(
sessionId?: string,
sessionEntry?: SessionEntry,
sessionKey?: string,
opts?: { storePath?: string },
): Promise<SessionTranscriptUsageSnapshot | undefined> {
const snapshot = await readSessionLogSnapshot({
sessionId,
sessionEntry,
sessionKey,
opts,
includeByteSize: false,
includeUsage: true,
});
return snapshot.usage;
}
export async function runPreflightCompactionIfNeeded(params: {
cfg: OpenClawConfig;
followupRun: FollowupRun;
promptForEstimate?: string;
defaultModel: string;
agentCfgContextTokens?: number;
sessionEntry?: SessionEntry;
sessionStore?: Record<string, SessionEntry>;
sessionKey?: string;
storePath?: string;
isHeartbeat: boolean;
}): Promise<SessionEntry | undefined> {
if (!params.sessionKey) {
return params.sessionEntry;
}
let entry =
params.sessionEntry ??
(params.sessionKey ? params.sessionStore?.[params.sessionKey] : undefined);
if (!entry?.sessionId) {
return entry ?? params.sessionEntry;
}
const isCli = isCliProvider(params.followupRun.run.provider, params.cfg);
if (params.isHeartbeat || isCli) {
return entry ?? params.sessionEntry;
}
const contextWindowTokens = resolveMemoryFlushContextWindowTokens({
modelId: params.followupRun.run.model ?? params.defaultModel,
agentCfgContextTokens: params.agentCfgContextTokens,
});
const memoryFlushPlan = resolveMemoryFlushPlan({ cfg: params.cfg });
const reserveTokensFloor =
memoryFlushPlan?.reserveTokensFloor ??
params.cfg.agents?.defaults?.compaction?.reserveTokensFloor ??
20_000;
const softThresholdTokens = memoryFlushPlan?.softThresholdTokens ?? 4_000;
const freshPersistedTokens = resolveFreshSessionTotalTokens(entry);
const persistedTotalTokens = entry.totalTokens;
const hasPersistedTotalTokens =
typeof persistedTotalTokens === "number" &&
Number.isFinite(persistedTotalTokens) &&
persistedTotalTokens > 0;
const shouldUseTranscriptFallback = entry.totalTokensFresh === false || !hasPersistedTotalTokens;
if (!shouldUseTranscriptFallback) {
return entry ?? params.sessionEntry;
}
const promptTokenEstimate = estimatePromptTokensForMemoryFlush(
params.promptForEstimate ?? params.followupRun.prompt,
);
const transcriptPromptTokens =
typeof freshPersistedTokens === "number"
? undefined
: estimatePromptTokensFromSessionTranscript({
sessionId: entry.sessionId,
storePath: params.storePath,
sessionFile: entry.sessionFile ?? params.followupRun.run.sessionFile,
});
const projectedTokenCount =
typeof transcriptPromptTokens === "number"
? resolveEffectivePromptTokens(transcriptPromptTokens, undefined, promptTokenEstimate)
: undefined;
const tokenCountForCompaction =
typeof projectedTokenCount === "number" &&
Number.isFinite(projectedTokenCount) &&
projectedTokenCount > 0
? projectedTokenCount
: undefined;
const threshold = contextWindowTokens - reserveTokensFloor - softThresholdTokens;
logVerbose(
`preflightCompaction check: sessionKey=${params.sessionKey} ` +
`tokenCount=${tokenCountForCompaction ?? freshPersistedTokens ?? "undefined"} ` +
`contextWindow=${contextWindowTokens} threshold=${threshold} ` +
`isHeartbeat=${params.isHeartbeat} isCli=${isCli} ` +
`persistedFresh=${entry?.totalTokensFresh === true} ` +
`transcriptPromptTokens=${transcriptPromptTokens ?? "undefined"} ` +
`promptTokensEst=${promptTokenEstimate ?? "undefined"}`,
);
const shouldCompact = shouldRunPreflightCompaction({
entry,
tokenCount: tokenCountForCompaction,
contextWindowTokens,
reserveTokensFloor,
softThresholdTokens,
});
if (!shouldCompact) {
return entry ?? params.sessionEntry;
}
logVerbose(
`preflightCompaction triggered: sessionKey=${params.sessionKey} ` +
`tokenCount=${tokenCountForCompaction ?? freshPersistedTokens ?? "undefined"} ` +
`threshold=${threshold}`,
);
const sessionFile = resolveSessionLogPath(
entry.sessionId,
entry.sessionFile ? entry : { ...entry, sessionFile: params.followupRun.run.sessionFile },
params.sessionKey ?? params.followupRun.run.sessionKey,
{ storePath: params.storePath },
);
const result = await compactEmbeddedPiSession({
sessionId: entry.sessionId,
sessionKey: params.sessionKey,
allowGatewaySubagentBinding: true,
messageChannel: params.followupRun.run.messageProvider,
groupId: entry.groupId ?? params.followupRun.run.groupId,
groupChannel: entry.groupChannel ?? params.followupRun.run.groupChannel,
groupSpace: entry.space ?? params.followupRun.run.groupSpace,
sessionFile: sessionFile ?? params.followupRun.run.sessionFile,
workspaceDir: params.followupRun.run.workspaceDir,
agentDir: params.followupRun.run.agentDir,
config: params.cfg,
skillsSnapshot: entry.skillsSnapshot ?? params.followupRun.run.skillsSnapshot,
provider: params.followupRun.run.provider,
model: params.followupRun.run.model,
thinkLevel: params.followupRun.run.thinkLevel,
bashElevated: params.followupRun.run.bashElevated,
trigger: "budget",
currentTokenCount: tokenCountForCompaction,
senderIsOwner: params.followupRun.run.senderIsOwner,
ownerNumbers: params.followupRun.run.ownerNumbers,
});
if (!result?.ok || !result.compacted) {
logVerbose(
`preflightCompaction skipped: sessionKey=${params.sessionKey} reason=${result?.reason ?? "not_compacted"}`,
);
return entry ?? params.sessionEntry;
}
await incrementCompactionCount({
sessionEntry: entry,
sessionStore: params.sessionStore,
sessionKey: params.sessionKey,
storePath: params.storePath,
tokensAfter: result.result?.tokensAfter,
});
await appendPostCompactionRefreshPrompt({
cfg: params.cfg,
followupRun: params.followupRun,
});
entry = params.sessionStore?.[params.sessionKey] ?? entry;
return entry ?? params.sessionEntry;
}
export async function runMemoryFlushIfNeeded(params: {
cfg: OpenClawConfig;
followupRun: FollowupRun;
promptForEstimate?: string;
sessionCtx: TemplateContext;
opts?: GetReplyOptions;
defaultModel: string;
agentCfgContextTokens?: number;
resolvedVerboseLevel: VerboseLevel;
sessionEntry?: SessionEntry;
sessionStore?: Record<string, SessionEntry>;
sessionKey?: string;
storePath?: string;
isHeartbeat: boolean;
}): Promise<SessionEntry | undefined> {
const memoryFlushPlan = resolveMemoryFlushPlan({ cfg: params.cfg });
if (!memoryFlushPlan) {
return params.sessionEntry;
}
const memoryFlushWritable = (() => {
if (!params.sessionKey) {
return true;
}
const runtime = resolveSandboxRuntimeStatus({
cfg: params.cfg,
sessionKey: params.sessionKey,
});
if (!runtime.sandboxed) {
return true;
}
const sandboxCfg = resolveSandboxConfigForAgent(params.cfg, runtime.agentId);
return sandboxCfg.workspaceAccess === "rw";
})();
const isCli = isCliProvider(params.followupRun.run.provider, params.cfg);
const canAttemptFlush = memoryFlushWritable && !params.isHeartbeat && !isCli;
let entry =
params.sessionEntry ??
(params.sessionKey ? params.sessionStore?.[params.sessionKey] : undefined);
const contextWindowTokens = resolveMemoryFlushContextWindowTokens({
modelId: params.followupRun.run.model ?? params.defaultModel,
agentCfgContextTokens: params.agentCfgContextTokens,
});
const promptTokenEstimate = estimatePromptTokensForMemoryFlush(
params.promptForEstimate ?? params.followupRun.prompt,
);
const persistedPromptTokensRaw = entry?.totalTokens;
const persistedPromptTokens =
typeof persistedPromptTokensRaw === "number" &&
Number.isFinite(persistedPromptTokensRaw) &&
persistedPromptTokensRaw > 0
? persistedPromptTokensRaw
: undefined;
const hasFreshPersistedPromptTokens =
typeof persistedPromptTokens === "number" && entry?.totalTokensFresh === true;
const flushThreshold =
contextWindowTokens - memoryFlushPlan.reserveTokensFloor - memoryFlushPlan.softThresholdTokens;
// When totals are stale/unknown, derive prompt + last output from transcript so memory
// flush can still be evaluated against projected next-input size.
//
// When totals are fresh, only read the transcript when we're close enough to the
// threshold that missing the last output tokens could flip the decision.
const shouldReadTranscriptForOutput =
canAttemptFlush &&
entry &&
hasFreshPersistedPromptTokens &&
typeof promptTokenEstimate === "number" &&
Number.isFinite(promptTokenEstimate) &&
flushThreshold > 0 &&
(persistedPromptTokens ?? 0) + promptTokenEstimate >=
flushThreshold - TRANSCRIPT_OUTPUT_READ_BUFFER_TOKENS;
const shouldReadTranscript = Boolean(
canAttemptFlush && entry && (!hasFreshPersistedPromptTokens || shouldReadTranscriptForOutput),
);
const forceFlushTranscriptBytes = memoryFlushPlan.forceFlushTranscriptBytes;
const shouldCheckTranscriptSizeForForcedFlush = Boolean(
canAttemptFlush &&
entry &&
Number.isFinite(forceFlushTranscriptBytes) &&
forceFlushTranscriptBytes > 0,
);
const shouldReadSessionLog = shouldReadTranscript || shouldCheckTranscriptSizeForForcedFlush;
const sessionLogSnapshot = shouldReadSessionLog
? await readSessionLogSnapshot({
sessionId: params.followupRun.run.sessionId,
sessionEntry: entry,
sessionKey: params.sessionKey ?? params.followupRun.run.sessionKey,
opts: { storePath: params.storePath },
includeByteSize: shouldCheckTranscriptSizeForForcedFlush,
includeUsage: shouldReadTranscript,
})
: undefined;
const transcriptByteSize = sessionLogSnapshot?.byteSize;
const shouldForceFlushByTranscriptSize =
typeof transcriptByteSize === "number" && transcriptByteSize >= forceFlushTranscriptBytes;
const transcriptUsageSnapshot = sessionLogSnapshot?.usage;
const transcriptPromptTokens = transcriptUsageSnapshot?.promptTokens;
const transcriptOutputTokens = transcriptUsageSnapshot?.outputTokens;
const hasReliableTranscriptPromptTokens =
typeof transcriptPromptTokens === "number" &&
Number.isFinite(transcriptPromptTokens) &&
transcriptPromptTokens > 0;
const shouldPersistTranscriptPromptTokens =
hasReliableTranscriptPromptTokens &&
(!hasFreshPersistedPromptTokens ||
(transcriptPromptTokens ?? 0) > (persistedPromptTokens ?? 0));
if (entry && shouldPersistTranscriptPromptTokens) {
const nextEntry = {
...entry,
totalTokens: transcriptPromptTokens,
totalTokensFresh: true,
};
entry = nextEntry;
if (params.sessionKey && params.sessionStore) {
params.sessionStore[params.sessionKey] = nextEntry;
}
if (params.storePath && params.sessionKey) {
try {
const updatedEntry = await updateSessionStoreEntry({
storePath: params.storePath,
sessionKey: params.sessionKey,
update: async () => ({ totalTokens: transcriptPromptTokens, totalTokensFresh: true }),
});
if (updatedEntry) {
entry = updatedEntry;
if (params.sessionStore) {
params.sessionStore[params.sessionKey] = updatedEntry;
}
}
} catch (err) {
logVerbose(`failed to persist derived prompt totalTokens: ${String(err)}`);
}
}
}
const promptTokensSnapshot = Math.max(
hasFreshPersistedPromptTokens ? (persistedPromptTokens ?? 0) : 0,
hasReliableTranscriptPromptTokens ? (transcriptPromptTokens ?? 0) : 0,
);
const hasFreshPromptTokensSnapshot =
promptTokensSnapshot > 0 &&
(hasFreshPersistedPromptTokens || hasReliableTranscriptPromptTokens);
const projectedTokenCount = hasFreshPromptTokensSnapshot
? resolveEffectivePromptTokens(
promptTokensSnapshot,
transcriptOutputTokens,
promptTokenEstimate,
)
: undefined;
const tokenCountForFlush =
typeof projectedTokenCount === "number" &&
Number.isFinite(projectedTokenCount) &&
projectedTokenCount > 0
? projectedTokenCount
: undefined;
// Diagnostic logging to understand why memory flush may not trigger.
logVerbose(
`memoryFlush check: sessionKey=${params.sessionKey} ` +
`tokenCount=${tokenCountForFlush ?? "undefined"} ` +
`contextWindow=${contextWindowTokens} threshold=${flushThreshold} ` +
`isHeartbeat=${params.isHeartbeat} isCli=${isCli} memoryFlushWritable=${memoryFlushWritable} ` +
`compactionCount=${entry?.compactionCount ?? 0} memoryFlushCompactionCount=${entry?.memoryFlushCompactionCount ?? "undefined"} ` +
`persistedPromptTokens=${persistedPromptTokens ?? "undefined"} persistedFresh=${entry?.totalTokensFresh === true} ` +
`promptTokensEst=${promptTokenEstimate ?? "undefined"} transcriptPromptTokens=${transcriptPromptTokens ?? "undefined"} transcriptOutputTokens=${transcriptOutputTokens ?? "undefined"} ` +
`projectedTokenCount=${projectedTokenCount ?? "undefined"} transcriptBytes=${transcriptByteSize ?? "undefined"} ` +
`forceFlushTranscriptBytes=${forceFlushTranscriptBytes} forceFlushByTranscriptSize=${shouldForceFlushByTranscriptSize}`,
);
const shouldFlushMemory =
(memoryFlushWritable &&
!params.isHeartbeat &&
!isCli &&
shouldRunMemoryFlush({
entry,
tokenCount: tokenCountForFlush,
contextWindowTokens,
reserveTokensFloor: memoryFlushPlan.reserveTokensFloor,
softThresholdTokens: memoryFlushPlan.softThresholdTokens,
})) ||
(shouldForceFlushByTranscriptSize &&
entry != null &&
!hasAlreadyFlushedForCurrentCompaction(entry));
if (!shouldFlushMemory) {
return entry ?? params.sessionEntry;
}
logVerbose(
`memoryFlush triggered: sessionKey=${params.sessionKey} tokenCount=${tokenCountForFlush ?? "undefined"} threshold=${flushThreshold}`,
);
let activeSessionEntry = entry ?? params.sessionEntry;
const activeSessionStore = params.sessionStore;
let bootstrapPromptWarningSignaturesSeen = resolveBootstrapWarningSignaturesSeen(
activeSessionEntry?.systemPromptReport ??
(params.sessionKey ? activeSessionStore?.[params.sessionKey]?.systemPromptReport : undefined),
);
const flushRunId = crypto.randomUUID();
if (params.sessionKey) {
registerAgentRunContext(flushRunId, {
sessionKey: params.sessionKey,
verboseLevel: params.resolvedVerboseLevel,
});
}
let memoryCompactionCompleted = false;
const memoryFlushNowMs = Date.now();
const activeMemoryFlushPlan =
resolveMemoryFlushPlan({
cfg: params.cfg,
nowMs: memoryFlushNowMs,
}) ?? memoryFlushPlan;
const memoryFlushWritePath = activeMemoryFlushPlan.relativePath;
const flushSystemPrompt = [
params.followupRun.run.extraSystemPrompt,
activeMemoryFlushPlan.systemPrompt,
]
.filter(Boolean)
.join("\n\n");
let postCompactionSessionId: string | undefined;
try {
await runWithModelFallback({
...resolveModelFallbackOptions(params.followupRun.run),
runId: flushRunId,
run: async (provider, model, runOptions) => {
const { embeddedContext, senderContext, runBaseParams } = buildEmbeddedRunExecutionParams({
run: params.followupRun.run,
sessionCtx: params.sessionCtx,
hasRepliedRef: params.opts?.hasRepliedRef,
provider,
model,
runId: flushRunId,
allowTransientCooldownProbe: runOptions?.allowTransientCooldownProbe,
});
const result = await runEmbeddedPiAgent({
...embeddedContext,
...senderContext,
...runBaseParams,
allowGatewaySubagentBinding: true,
trigger: "memory",
memoryFlushWritePath,
prompt: activeMemoryFlushPlan.prompt,
extraSystemPrompt: flushSystemPrompt,
bootstrapPromptWarningSignaturesSeen,
bootstrapPromptWarningSignature:
bootstrapPromptWarningSignaturesSeen[bootstrapPromptWarningSignaturesSeen.length - 1],
onAgentEvent: (evt) => {
if (evt.stream === "compaction") {
const phase = typeof evt.data.phase === "string" ? evt.data.phase : "";
if (phase === "end") {
memoryCompactionCompleted = true;
}
}
},
});
if (result.meta?.agentMeta?.sessionId) {
postCompactionSessionId = result.meta.agentMeta.sessionId;
}
bootstrapPromptWarningSignaturesSeen = resolveBootstrapWarningSignaturesSeen(
result.meta?.systemPromptReport,
);
return result;
},
});
let memoryFlushCompactionCount =
activeSessionEntry?.compactionCount ??
(params.sessionKey ? activeSessionStore?.[params.sessionKey]?.compactionCount : 0) ??
0;
if (memoryCompactionCompleted) {
const previousSessionId = activeSessionEntry?.sessionId ?? params.followupRun.run.sessionId;
const nextCount = await incrementCompactionCount({
sessionEntry: activeSessionEntry,
sessionStore: activeSessionStore,
sessionKey: params.sessionKey,
storePath: params.storePath,
newSessionId: postCompactionSessionId,
});
const updatedEntry = params.sessionKey ? activeSessionStore?.[params.sessionKey] : undefined;
if (updatedEntry) {
activeSessionEntry = updatedEntry;
params.followupRun.run.sessionId = updatedEntry.sessionId;
if (updatedEntry.sessionFile) {
params.followupRun.run.sessionFile = updatedEntry.sessionFile;
}
const queueKey = params.followupRun.run.sessionKey ?? params.sessionKey;
if (queueKey) {
refreshQueuedFollowupSession({
key: queueKey,
previousSessionId,
nextSessionId: updatedEntry.sessionId,
nextSessionFile: updatedEntry.sessionFile,
});
}
}
if (typeof nextCount === "number") {
memoryFlushCompactionCount = nextCount;
}
}
if (params.storePath && params.sessionKey) {
try {
const updatedEntry = await updateSessionStoreEntry({
storePath: params.storePath,
sessionKey: params.sessionKey,
update: async () => ({
memoryFlushAt: Date.now(),
memoryFlushCompactionCount,
}),
});
if (updatedEntry) {
activeSessionEntry = updatedEntry;
params.followupRun.run.sessionId = updatedEntry.sessionId;
if (updatedEntry.sessionFile) {
params.followupRun.run.sessionFile = updatedEntry.sessionFile;
}
}
} catch (err) {
logVerbose(`failed to persist memory flush metadata: ${String(err)}`);
}
}
} catch (err) {
logVerbose(`memory flush run failed: ${String(err)}`);
}
return activeSessionEntry;
}