openclaw/src/auto-reply/reply/session-updates.ts

287 lines
8.8 KiB
TypeScript

import crypto from "node:crypto";
import fs from "node:fs";
import path from "node:path";
import { buildWorkspaceSkillSnapshot } from "../../agents/skills.js";
import { ensureSkillsWatcher, getSkillsSnapshotVersion } from "../../agents/skills/refresh.js";
import type { OpenClawConfig } from "../../config/config.js";
import {
resolveSessionFilePath,
resolveSessionFilePathOptions,
type SessionEntry,
updateSessionStore,
} from "../../config/sessions.js";
import { getRemoteSkillEligibility } from "../../infra/skills-remote.js";
import { resolveAgentIdFromSessionKey } from "../../routing/session-key.js";
export { drainFormattedSystemEvents } from "./session-system-events.js";
async function persistSessionEntryUpdate(params: {
sessionStore?: Record<string, SessionEntry>;
sessionKey?: string;
storePath?: string;
nextEntry: SessionEntry;
}) {
if (!params.sessionStore || !params.sessionKey) {
return;
}
params.sessionStore[params.sessionKey] = {
...params.sessionStore[params.sessionKey],
...params.nextEntry,
};
if (!params.storePath) {
return;
}
await updateSessionStore(params.storePath, (store) => {
store[params.sessionKey!] = { ...store[params.sessionKey!], ...params.nextEntry };
});
}
export async function ensureSkillSnapshot(params: {
sessionEntry?: SessionEntry;
sessionStore?: Record<string, SessionEntry>;
sessionKey?: string;
storePath?: string;
sessionId?: string;
isFirstTurnInSession: boolean;
workspaceDir: string;
cfg: OpenClawConfig;
/** If provided, only load skills with these names (for per-channel skill filtering) */
skillFilter?: string[];
}): Promise<{
sessionEntry?: SessionEntry;
skillsSnapshot?: SessionEntry["skillsSnapshot"];
systemSent: boolean;
}> {
if (process.env.OPENCLAW_TEST_FAST === "1") {
// In fast unit-test runs we skip filesystem scanning, watchers, and session-store writes.
// Dedicated skills tests cover snapshot generation behavior.
return {
sessionEntry: params.sessionEntry,
skillsSnapshot: params.sessionEntry?.skillsSnapshot,
systemSent: params.sessionEntry?.systemSent ?? false,
};
}
const {
sessionEntry,
sessionStore,
sessionKey,
storePath,
sessionId,
isFirstTurnInSession,
workspaceDir,
cfg,
skillFilter,
} = params;
let nextEntry = sessionEntry;
let systemSent = sessionEntry?.systemSent ?? false;
const remoteEligibility = getRemoteSkillEligibility();
const snapshotVersion = getSkillsSnapshotVersion(workspaceDir);
ensureSkillsWatcher({ workspaceDir, config: cfg });
const shouldRefreshSnapshot =
snapshotVersion > 0 && (nextEntry?.skillsSnapshot?.version ?? 0) < snapshotVersion;
if (isFirstTurnInSession && sessionStore && sessionKey) {
const current = nextEntry ??
sessionStore[sessionKey] ?? {
sessionId: sessionId ?? crypto.randomUUID(),
updatedAt: Date.now(),
};
const skillSnapshot =
isFirstTurnInSession || !current.skillsSnapshot || shouldRefreshSnapshot
? buildWorkspaceSkillSnapshot(workspaceDir, {
config: cfg,
skillFilter,
eligibility: { remote: remoteEligibility },
snapshotVersion,
})
: current.skillsSnapshot;
nextEntry = {
...current,
sessionId: sessionId ?? current.sessionId ?? crypto.randomUUID(),
updatedAt: Date.now(),
systemSent: true,
skillsSnapshot: skillSnapshot,
};
await persistSessionEntryUpdate({ sessionStore, sessionKey, storePath, nextEntry });
systemSent = true;
}
const skillsSnapshot = shouldRefreshSnapshot
? buildWorkspaceSkillSnapshot(workspaceDir, {
config: cfg,
skillFilter,
eligibility: { remote: remoteEligibility },
snapshotVersion,
})
: (nextEntry?.skillsSnapshot ??
(isFirstTurnInSession
? undefined
: buildWorkspaceSkillSnapshot(workspaceDir, {
config: cfg,
skillFilter,
eligibility: { remote: remoteEligibility },
snapshotVersion,
})));
if (
skillsSnapshot &&
sessionStore &&
sessionKey &&
!isFirstTurnInSession &&
(!nextEntry?.skillsSnapshot || shouldRefreshSnapshot)
) {
const current = nextEntry ?? {
sessionId: sessionId ?? crypto.randomUUID(),
updatedAt: Date.now(),
};
nextEntry = {
...current,
sessionId: sessionId ?? current.sessionId ?? crypto.randomUUID(),
updatedAt: Date.now(),
skillsSnapshot,
};
await persistSessionEntryUpdate({ sessionStore, sessionKey, storePath, nextEntry });
}
return { sessionEntry: nextEntry, skillsSnapshot, systemSent };
}
export async function incrementCompactionCount(params: {
sessionEntry?: SessionEntry;
sessionStore?: Record<string, SessionEntry>;
sessionKey?: string;
storePath?: string;
now?: number;
amount?: number;
/** Token count after compaction - if provided, updates session token counts */
tokensAfter?: number;
/** Session id after compaction, when the runtime rotated transcripts. */
newSessionId?: string;
}): Promise<number | undefined> {
const {
sessionEntry,
sessionStore,
sessionKey,
storePath,
now = Date.now(),
amount = 1,
tokensAfter,
newSessionId,
} = params;
if (!sessionStore || !sessionKey) {
return undefined;
}
const entry = sessionStore[sessionKey] ?? sessionEntry;
if (!entry) {
return undefined;
}
const incrementBy = Math.max(0, amount);
const nextCount = (entry.compactionCount ?? 0) + incrementBy;
// Build update payload with compaction count and optionally updated token counts
const updates: Partial<SessionEntry> = {
compactionCount: nextCount,
updatedAt: now,
};
if (newSessionId && newSessionId !== entry.sessionId) {
updates.sessionId = newSessionId;
updates.sessionFile = resolveCompactionSessionFile({
entry,
sessionKey,
storePath,
newSessionId,
});
}
// If tokensAfter is provided, update the cached token counts to reflect post-compaction state
if (tokensAfter != null && tokensAfter > 0) {
updates.totalTokens = tokensAfter;
updates.totalTokensFresh = true;
// Clear input/output breakdown since we only have the total estimate after compaction
updates.inputTokens = undefined;
updates.outputTokens = undefined;
updates.cacheRead = undefined;
updates.cacheWrite = undefined;
}
sessionStore[sessionKey] = {
...entry,
...updates,
};
if (storePath) {
await updateSessionStore(storePath, (store) => {
store[sessionKey] = {
...store[sessionKey],
...updates,
};
});
}
return nextCount;
}
function resolveCompactionSessionFile(params: {
entry: SessionEntry;
sessionKey: string;
storePath?: string;
newSessionId: string;
}): string {
const agentId = resolveAgentIdFromSessionKey(params.sessionKey);
const pathOpts = resolveSessionFilePathOptions({
agentId,
storePath: params.storePath,
});
const rewrittenSessionFile = rewriteSessionFileForNewSessionId({
sessionFile: params.entry.sessionFile,
previousSessionId: params.entry.sessionId,
nextSessionId: params.newSessionId,
});
const normalizedRewrittenSessionFile =
rewrittenSessionFile && path.isAbsolute(rewrittenSessionFile)
? canonicalizeAbsoluteSessionFilePath(rewrittenSessionFile)
: rewrittenSessionFile;
return resolveSessionFilePath(
params.newSessionId,
normalizedRewrittenSessionFile ? { sessionFile: normalizedRewrittenSessionFile } : undefined,
pathOpts,
);
}
function canonicalizeAbsoluteSessionFilePath(filePath: string): string {
const resolved = path.resolve(filePath);
try {
const parentDir = fs.realpathSync(path.dirname(resolved));
return path.join(parentDir, path.basename(resolved));
} catch {
return resolved;
}
}
function rewriteSessionFileForNewSessionId(params: {
sessionFile?: string;
previousSessionId: string;
nextSessionId: string;
}): string | undefined {
const trimmed = params.sessionFile?.trim();
if (!trimmed) {
return undefined;
}
const base = path.basename(trimmed);
if (!base.endsWith(".jsonl")) {
return undefined;
}
const withoutExt = base.slice(0, -".jsonl".length);
if (withoutExt === params.previousSessionId) {
return path.join(path.dirname(trimmed), `${params.nextSessionId}.jsonl`);
}
if (withoutExt.startsWith(`${params.previousSessionId}-topic-`)) {
return path.join(
path.dirname(trimmed),
`${params.nextSessionId}${base.slice(params.previousSessionId.length)}`,
);
}
const forkMatch = withoutExt.match(
/^(\d{4}-\d{2}-\d{2}T[\w-]+(?:Z|[+-]\d{2}(?:-\d{2})?)?)_(.+)$/,
);
if (forkMatch?.[2] === params.previousSessionId) {
return path.join(path.dirname(trimmed), `${forkMatch[1]}_${params.nextSessionId}.jsonl`);
}
return undefined;
}