mirror of https://github.com/openclaw/openclaw.git
feat: implement memory flush run decision logic
Added a new function `shouldAttemptMemoryFlushRun` to determine if a memory flush should be attempted based on session parameters. Updated the `runMemoryFlushIfNeeded` function to utilize this new logic and ensure proper handling of session keys, including checks for subagent sessions. This enhances the memory management capabilities of the agent runner.
This commit is contained in:
parent
6783f59232
commit
006ba42c35
|
|
@ -0,0 +1,45 @@
|
|||
import { describe, expect, it } from "vitest";
|
||||
import { shouldAttemptMemoryFlushRun } from "./agent-runner-memory.js";
|
||||
|
||||
describe("shouldAttemptMemoryFlushRun", () => {
|
||||
it("skips subagent sessions", () => {
|
||||
expect(
|
||||
shouldAttemptMemoryFlushRun({
|
||||
memoryFlushWritable: true,
|
||||
isHeartbeat: false,
|
||||
isCli: false,
|
||||
sessionKey: "agent:main:subagent:worker",
|
||||
}),
|
||||
).toBe(false);
|
||||
});
|
||||
|
||||
it("skips heartbeat and cli runs", () => {
|
||||
expect(
|
||||
shouldAttemptMemoryFlushRun({
|
||||
memoryFlushWritable: true,
|
||||
isHeartbeat: true,
|
||||
isCli: false,
|
||||
sessionKey: "main",
|
||||
}),
|
||||
).toBe(false);
|
||||
expect(
|
||||
shouldAttemptMemoryFlushRun({
|
||||
memoryFlushWritable: true,
|
||||
isHeartbeat: false,
|
||||
isCli: true,
|
||||
sessionKey: "main",
|
||||
}),
|
||||
).toBe(false);
|
||||
});
|
||||
|
||||
it("allows regular writable sessions", () => {
|
||||
expect(
|
||||
shouldAttemptMemoryFlushRun({
|
||||
memoryFlushWritable: true,
|
||||
isHeartbeat: false,
|
||||
isCli: false,
|
||||
sessionKey: "main",
|
||||
}),
|
||||
).toBe(true);
|
||||
});
|
||||
});
|
||||
|
|
@ -23,6 +23,7 @@ import {
|
|||
} from "../../config/sessions.js";
|
||||
import { logVerbose } from "../../globals.js";
|
||||
import { registerAgentRunContext } from "../../infra/agent-events.js";
|
||||
import { isSubagentSessionKey } from "../../routing/session-key.js";
|
||||
import type { TemplateContext } from "../templating.js";
|
||||
import type { VerboseLevel } from "../thinking.js";
|
||||
import type { GetReplyOptions } from "../types.js";
|
||||
|
|
@ -68,6 +69,20 @@ export function resolveEffectivePromptTokens(
|
|||
return base + output + estimate;
|
||||
}
|
||||
|
||||
export function shouldAttemptMemoryFlushRun(params: {
|
||||
memoryFlushWritable: boolean;
|
||||
isHeartbeat: boolean;
|
||||
isCli: boolean;
|
||||
sessionKey?: string;
|
||||
}): boolean {
|
||||
return (
|
||||
params.memoryFlushWritable &&
|
||||
!params.isHeartbeat &&
|
||||
!params.isCli &&
|
||||
!isSubagentSessionKey(params.sessionKey)
|
||||
);
|
||||
}
|
||||
|
||||
export type SessionTranscriptUsageSnapshot = {
|
||||
promptTokens?: number;
|
||||
outputTokens?: number;
|
||||
|
|
@ -267,14 +282,15 @@ export async function runMemoryFlushIfNeeded(params: {
|
|||
if (!memoryFlushSettings) {
|
||||
return params.sessionEntry;
|
||||
}
|
||||
const effectiveSessionKey = params.sessionKey ?? params.followupRun.run.sessionKey;
|
||||
|
||||
const memoryFlushWritable = (() => {
|
||||
if (!params.sessionKey) {
|
||||
if (!effectiveSessionKey) {
|
||||
return true;
|
||||
}
|
||||
const runtime = resolveSandboxRuntimeStatus({
|
||||
cfg: params.cfg,
|
||||
sessionKey: params.sessionKey,
|
||||
sessionKey: effectiveSessionKey,
|
||||
});
|
||||
if (!runtime.sandboxed) {
|
||||
return true;
|
||||
|
|
@ -284,10 +300,16 @@ export async function runMemoryFlushIfNeeded(params: {
|
|||
})();
|
||||
|
||||
const isCli = isCliProvider(params.followupRun.run.provider, params.cfg);
|
||||
const canAttemptFlush = memoryFlushWritable && !params.isHeartbeat && !isCli;
|
||||
const canAttemptFlush = shouldAttemptMemoryFlushRun({
|
||||
memoryFlushWritable,
|
||||
isHeartbeat: params.isHeartbeat,
|
||||
isCli,
|
||||
sessionKey: effectiveSessionKey,
|
||||
});
|
||||
const isSubagentSession = isSubagentSessionKey(effectiveSessionKey);
|
||||
let entry =
|
||||
params.sessionEntry ??
|
||||
(params.sessionKey ? params.sessionStore?.[params.sessionKey] : undefined);
|
||||
(effectiveSessionKey ? params.sessionStore?.[effectiveSessionKey] : undefined);
|
||||
const contextWindowTokens = resolveMemoryFlushContextWindowTokens({
|
||||
modelId: params.followupRun.run.model ?? params.defaultModel,
|
||||
agentCfgContextTokens: params.agentCfgContextTokens,
|
||||
|
|
@ -342,7 +364,7 @@ export async function runMemoryFlushIfNeeded(params: {
|
|||
? await readSessionLogSnapshot({
|
||||
sessionId: params.followupRun.run.sessionId,
|
||||
sessionEntry: entry,
|
||||
sessionKey: params.sessionKey ?? params.followupRun.run.sessionKey,
|
||||
sessionKey: effectiveSessionKey,
|
||||
opts: { storePath: params.storePath },
|
||||
includeByteSize: shouldCheckTranscriptSizeForForcedFlush,
|
||||
includeUsage: shouldReadTranscript,
|
||||
|
|
@ -371,20 +393,20 @@ export async function runMemoryFlushIfNeeded(params: {
|
|||
totalTokensFresh: true,
|
||||
};
|
||||
entry = nextEntry;
|
||||
if (params.sessionKey && params.sessionStore) {
|
||||
params.sessionStore[params.sessionKey] = nextEntry;
|
||||
if (effectiveSessionKey && params.sessionStore) {
|
||||
params.sessionStore[effectiveSessionKey] = nextEntry;
|
||||
}
|
||||
if (params.storePath && params.sessionKey) {
|
||||
if (params.storePath && effectiveSessionKey) {
|
||||
try {
|
||||
const updatedEntry = await updateSessionStoreEntry({
|
||||
storePath: params.storePath,
|
||||
sessionKey: params.sessionKey,
|
||||
sessionKey: effectiveSessionKey,
|
||||
update: async () => ({ totalTokens: transcriptPromptTokens, totalTokensFresh: true }),
|
||||
});
|
||||
if (updatedEntry) {
|
||||
entry = updatedEntry;
|
||||
if (params.sessionStore) {
|
||||
params.sessionStore[params.sessionKey] = updatedEntry;
|
||||
params.sessionStore[effectiveSessionKey] = updatedEntry;
|
||||
}
|
||||
}
|
||||
} catch (err) {
|
||||
|
|
@ -421,6 +443,7 @@ export async function runMemoryFlushIfNeeded(params: {
|
|||
`tokenCount=${tokenCountForFlush ?? "undefined"} ` +
|
||||
`contextWindow=${contextWindowTokens} threshold=${flushThreshold} ` +
|
||||
`isHeartbeat=${params.isHeartbeat} isCli=${isCli} memoryFlushWritable=${memoryFlushWritable} ` +
|
||||
`isSubagentSession=${isSubagentSession} ` +
|
||||
`compactionCount=${entry?.compactionCount ?? 0} memoryFlushCompactionCount=${entry?.memoryFlushCompactionCount ?? "undefined"} ` +
|
||||
`persistedPromptTokens=${persistedPromptTokens ?? "undefined"} persistedFresh=${entry?.totalTokensFresh === true} ` +
|
||||
`promptTokensEst=${promptTokenEstimate ?? "undefined"} transcriptPromptTokens=${transcriptPromptTokens ?? "undefined"} transcriptOutputTokens=${transcriptOutputTokens ?? "undefined"} ` +
|
||||
|
|
@ -456,12 +479,14 @@ export async function runMemoryFlushIfNeeded(params: {
|
|||
const activeSessionStore = params.sessionStore;
|
||||
let bootstrapPromptWarningSignaturesSeen = resolveBootstrapWarningSignaturesSeen(
|
||||
activeSessionEntry?.systemPromptReport ??
|
||||
(params.sessionKey ? activeSessionStore?.[params.sessionKey]?.systemPromptReport : undefined),
|
||||
(effectiveSessionKey
|
||||
? activeSessionStore?.[effectiveSessionKey]?.systemPromptReport
|
||||
: undefined),
|
||||
);
|
||||
const flushRunId = crypto.randomUUID();
|
||||
if (params.sessionKey) {
|
||||
if (effectiveSessionKey) {
|
||||
registerAgentRunContext(flushRunId, {
|
||||
sessionKey: params.sessionKey,
|
||||
sessionKey: effectiveSessionKey,
|
||||
verboseLevel: params.resolvedVerboseLevel,
|
||||
});
|
||||
}
|
||||
|
|
@ -528,24 +553,24 @@ export async function runMemoryFlushIfNeeded(params: {
|
|||
});
|
||||
let memoryFlushCompactionCount =
|
||||
activeSessionEntry?.compactionCount ??
|
||||
(params.sessionKey ? activeSessionStore?.[params.sessionKey]?.compactionCount : 0) ??
|
||||
(effectiveSessionKey ? activeSessionStore?.[effectiveSessionKey]?.compactionCount : 0) ??
|
||||
0;
|
||||
if (memoryCompactionCompleted) {
|
||||
const nextCount = await incrementCompactionCount({
|
||||
sessionEntry: activeSessionEntry,
|
||||
sessionStore: activeSessionStore,
|
||||
sessionKey: params.sessionKey,
|
||||
sessionKey: effectiveSessionKey,
|
||||
storePath: params.storePath,
|
||||
});
|
||||
if (typeof nextCount === "number") {
|
||||
memoryFlushCompactionCount = nextCount;
|
||||
}
|
||||
}
|
||||
if (params.storePath && params.sessionKey) {
|
||||
if (params.storePath && effectiveSessionKey) {
|
||||
try {
|
||||
const updatedEntry = await updateSessionStoreEntry({
|
||||
storePath: params.storePath,
|
||||
sessionKey: params.sessionKey,
|
||||
sessionKey: effectiveSessionKey,
|
||||
update: async () => ({
|
||||
memoryFlushAt: Date.now(),
|
||||
memoryFlushCompactionCount,
|
||||
|
|
|
|||
Loading…
Reference in New Issue