fix: keep session transcript pointers fresh after compaction (#50688)

Co-authored-by: Frank Yang <frank.ekn@gmail.com>
This commit is contained in:
Sathvik Veerapaneni 2026-03-23 10:58:07 -04:00 committed by GitHub
parent dd132ea77b
commit d2e8ed3632
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 976 additions and 774 deletions

View File

@ -37,9 +37,8 @@ import {
resolveMemoryFlushPromptForRun,
resolveMemoryFlushSettings,
shouldRunMemoryFlush,
computeContextHash,
} from "./memory-flush.js";
import type { FollowupRun } from "./queue.js";
import { refreshQueuedFollowupSession, type FollowupRun } from "./queue.js";
import { incrementCompactionCount } from "./session-updates.js";
export function estimatePromptTokensForMemoryFlush(prompt?: string): number | undefined {
@ -448,47 +447,6 @@ export async function runMemoryFlushIfNeeded(params: {
return entry ?? params.sessionEntry;
}
// --- Content hash dedup (state-based) ---
// Read the tail of the session transcript and compute a lightweight hash.
// If the hash matches the last flush, the context hasn't materially changed
// and flushing again would produce duplicate memory entries (#30115).
const sessionFilePath = await resolveSessionFilePathForFlush(
params.followupRun.run.sessionId,
entry ?? params.sessionEntry,
params.storePath,
params.sessionKey ? resolveAgentIdFromSessionKey(params.sessionKey) : undefined,
);
let contextHashBeforeFlush: string | undefined;
if (sessionFilePath) {
try {
const tailMessages = await readTranscriptTailMessages(sessionFilePath, 10);
// Include the pending prompt in the hash — runMemoryFlushIfNeeded runs
// before the current prompt is appended to the transcript, so the
// persisted tail alone would match the post-flush hash and incorrectly
// skip the next flush even when a new user message arrived.
const currentPrompt = params.followupRun.prompt;
if (currentPrompt) {
tailMessages.push({ role: "user", content: currentPrompt });
}
if (tailMessages.length === 0) {
logVerbose(
`memoryFlush dedup skipped (no tail messages extracted): sessionKey=${params.sessionKey}`,
);
}
contextHashBeforeFlush =
tailMessages.length > 0 ? computeContextHash(tailMessages) : undefined;
const previousHash = entry?.memoryFlushContextHash;
if (previousHash && contextHashBeforeFlush === previousHash) {
logVerbose(
`memoryFlush skipped (context hash unchanged): sessionKey=${params.sessionKey} hash=${contextHashBeforeFlush}`,
);
return entry ?? params.sessionEntry;
}
} catch (err) {
logVerbose(`memoryFlush hash check failed, proceeding with flush: ${String(err)}`);
}
}
logVerbose(
`memoryFlush triggered: sessionKey=${params.sessionKey} tokenCount=${tokenCountForFlush ?? "undefined"} threshold=${flushThreshold}`,
);
@ -507,7 +465,6 @@ export async function runMemoryFlushIfNeeded(params: {
});
}
let memoryCompactionCompleted = false;
let fallbackFlushAttemptedForCurrentHash = false;
const memoryFlushNowMs = Date.now();
const memoryFlushWritePath = resolveMemoryFlushRelativePathForRun({
cfg: params.cfg,
@ -519,21 +476,12 @@ export async function runMemoryFlushIfNeeded(params: {
]
.filter(Boolean)
.join("\n\n");
let postCompactionSessionId: string | undefined;
try {
await runWithModelFallback({
...resolveModelFallbackOptions(params.followupRun.run),
runId: flushRunId,
run: async (provider, model, runOptions) => {
if (contextHashBeforeFlush && fallbackFlushAttemptedForCurrentHash) {
logVerbose(
`memoryFlush fallback candidate skipped (context hash already attempted): sessionKey=${params.sessionKey} hash=${contextHashBeforeFlush} provider=${provider} model=${model}`,
);
// A prior candidate already attempted this exact flush context. Be
// conservative and skip later candidates so a write-then-throw failure
// cannot append the same memory twice during a single fallback cycle.
return { payloads: [], meta: {} };
}
fallbackFlushAttemptedForCurrentHash = Boolean(contextHashBeforeFlush);
const { embeddedContext, senderContext, runBaseParams } = buildEmbeddedRunExecutionParams({
run: params.followupRun.run,
sessionCtx: params.sessionCtx,
@ -562,12 +510,15 @@ export async function runMemoryFlushIfNeeded(params: {
onAgentEvent: (evt) => {
if (evt.stream === "compaction") {
const phase = typeof evt.data.phase === "string" ? evt.data.phase : "";
if (phase === "end" && evt.data.completed === true) {
if (phase === "end") {
memoryCompactionCompleted = true;
}
}
},
});
if (result.meta?.agentMeta?.sessionId) {
postCompactionSessionId = result.meta.agentMeta.sessionId;
}
bootstrapPromptWarningSignaturesSeen = resolveBootstrapWarningSignaturesSeen(
result.meta?.systemPromptReport,
);
@ -579,45 +530,51 @@ export async function runMemoryFlushIfNeeded(params: {
(params.sessionKey ? activeSessionStore?.[params.sessionKey]?.compactionCount : 0) ??
0;
if (memoryCompactionCompleted) {
const previousSessionId = activeSessionEntry?.sessionId ?? params.followupRun.run.sessionId;
const nextCount = await incrementCompactionCount({
sessionEntry: activeSessionEntry,
sessionStore: activeSessionStore,
sessionKey: params.sessionKey,
storePath: params.storePath,
newSessionId: postCompactionSessionId,
});
const updatedEntry = params.sessionKey ? activeSessionStore?.[params.sessionKey] : undefined;
if (updatedEntry) {
activeSessionEntry = updatedEntry;
params.followupRun.run.sessionId = updatedEntry.sessionId;
if (updatedEntry.sessionFile) {
params.followupRun.run.sessionFile = updatedEntry.sessionFile;
}
const queueKey = params.followupRun.run.sessionKey ?? params.sessionKey;
if (queueKey) {
refreshQueuedFollowupSession({
key: queueKey,
previousSessionId,
nextSessionId: updatedEntry.sessionId,
nextSessionFile: updatedEntry.sessionFile,
});
}
}
if (typeof nextCount === "number") {
memoryFlushCompactionCount = nextCount;
}
}
if (params.storePath && params.sessionKey) {
try {
// Re-hash the transcript AFTER the flush so the stored hash matches
// what the next pre-flush check will compute (the transcript now
// includes the flush turn's messages). (#34222)
let contextHashAfterFlush = contextHashBeforeFlush;
if (sessionFilePath) {
try {
const postFlushMessages = await readTranscriptTailMessages(sessionFilePath, 10);
if (postFlushMessages.length > 0) {
contextHashAfterFlush = computeContextHash(postFlushMessages);
}
} catch {
// Best-effort: fall back to pre-flush hash if re-read fails.
}
}
const updatedEntry = await updateSessionStoreEntry({
storePath: params.storePath,
sessionKey: params.sessionKey,
update: async () => ({
memoryFlushAt: Date.now(),
memoryFlushCompactionCount,
// Always write the hash field — when rehashing fails, clearing
// the stale value prevents incorrect dedup on subsequent flushes.
memoryFlushContextHash: contextHashAfterFlush ?? undefined,
}),
});
if (updatedEntry) {
activeSessionEntry = updatedEntry;
params.followupRun.run.sessionId = updatedEntry.sessionId;
if (updatedEntry.sessionFile) {
params.followupRun.run.sessionFile = updatedEntry.sessionFile;
}
}
} catch (err) {
logVerbose(`failed to persist memory flush metadata: ${String(err)}`);
@ -629,64 +586,3 @@ export async function runMemoryFlushIfNeeded(params: {
return activeSessionEntry;
}
/**
 * Resolve the on-disk transcript path for a session so the pre-flush
 * context hash can be computed from its tail.
 *
 * Returns `undefined` when there is no session id to resolve, or when the
 * underlying resolver cannot produce a path (it may return null).
 */
async function resolveSessionFilePathForFlush(
  sessionId: string | undefined,
  entry: SessionEntry | undefined,
  storePath: string | undefined,
  agentId: string | undefined,
): Promise<string | undefined> {
  if (!sessionId) {
    return undefined;
  }
  const options = resolveSessionFilePathOptions({ agentId, storePath });
  const filePath = resolveSessionFilePath(sessionId, entry, options);
  return filePath ?? undefined;
}
/**
 * Read the last N messages from a JSONL session transcript file.
 *
 * Only the final 64 KiB of the file is read, so multi-MB transcripts are
 * never loaded wholesale. When the read window starts mid-file, the first
 * (almost certainly partial) line is discarded so only whole JSONL records
 * are parsed. Lines that are blank, malformed JSON, or lack a
 * `message.role` field are skipped. Messages are returned in file order.
 */
async function readTranscriptTailMessages(
  filePath: string,
  maxMessages: number,
): Promise<Array<{ role?: string; content?: unknown }>> {
  const WINDOW_BYTES = 64 * 1024;
  const handle = await fs.promises.open(filePath, "r");
  try {
    const { size } = await handle.stat();
    const offset = size > WINDOW_BYTES ? size - WINDOW_BYTES : 0;
    const length = size < WINDOW_BYTES ? size : WINDOW_BYTES;
    const chunk = Buffer.alloc(length);
    await handle.read(chunk, 0, length, offset);
    let text = chunk.toString("utf-8");
    if (offset > 0) {
      // The window starts mid-line; drop everything up to and including the
      // first newline so only complete lines remain.
      const firstNewline = text.indexOf("\n");
      text = firstNewline >= 0 ? text.slice(firstNewline + 1) : "";
    }
    const rows = text.split(/\r?\n/);
    const collected: Array<{ role?: string; content?: unknown }> = [];
    let idx = rows.length - 1;
    while (idx >= 0 && collected.length < maxMessages) {
      const row = rows[idx].trim();
      idx -= 1;
      if (!row) {
        continue;
      }
      try {
        const record = JSON.parse(row);
        if (record?.message?.role) {
          collected.unshift({ role: record.message.role, content: record.message.content });
        }
      } catch {
        // Skip malformed lines rather than aborting the whole tail read.
      }
    }
    return collected;
  } finally {
    await handle.close();
  }
}

View File

@ -15,16 +15,6 @@ const runEmbeddedPiAgentMock = vi.fn();
const runCliAgentMock = vi.fn();
const runWithModelFallbackMock = vi.fn();
const runtimeErrorMock = vi.fn();
const runMemoryFlushIfNeededMock = vi.hoisted(() =>
vi.fn(async ({ sessionEntry }) => sessionEntry),
);
const createReplyMediaPathNormalizerMock = vi.hoisted(() =>
vi.fn(
(_params?: unknown) =>
async <T>(payload: T) =>
payload,
),
);
vi.mock("../../agents/model-fallback.js", () => ({
runWithModelFallback: (params: {
@ -68,14 +58,6 @@ vi.mock("../../runtime.js", async () => {
};
});
vi.mock("./agent-runner-memory.runtime.js", () => ({
runMemoryFlushIfNeeded: (params: unknown) => runMemoryFlushIfNeededMock(params),
}));
vi.mock("./reply-media-paths.runtime.js", () => ({
createReplyMediaPathNormalizer: (params: unknown) => createReplyMediaPathNormalizerMock(params),
}));
vi.mock("./queue.js", async () => {
const actual = await vi.importActual<typeof import("./queue.js")>("./queue.js");
return {
@ -103,40 +85,10 @@ type RunWithModelFallbackParams = {
};
beforeEach(() => {
vi.useRealTimers();
vi.clearAllTimers();
runEmbeddedPiAgentMock.mockClear();
runCliAgentMock.mockClear();
runWithModelFallbackMock.mockClear();
runtimeErrorMock.mockClear();
runMemoryFlushIfNeededMock.mockClear();
runMemoryFlushIfNeededMock.mockImplementation(
async ({
sessionEntry,
followupRun,
}: {
sessionEntry?: SessionEntry;
followupRun: FollowupRun;
}) => {
if (!sessionEntry || (sessionEntry.totalTokens ?? 0) < 1_000_000) {
return sessionEntry;
}
await runWithModelFallbackMock({
provider: followupRun.run.provider,
model: followupRun.run.model,
run: async (provider: string, model: string) =>
await runEmbeddedPiAgentMock({
provider,
model,
prompt: "Pre-compaction memory flush.",
enforceFinalTag: provider.includes("gemini") ? true : undefined,
}),
});
return sessionEntry;
},
);
createReplyMediaPathNormalizerMock.mockClear();
createReplyMediaPathNormalizerMock.mockImplementation(() => async (payload) => payload);
loadCronStoreMock.mockClear();
// Default: no cron jobs in store.
loadCronStoreMock.mockResolvedValue({ version: 1, jobs: [] });
@ -153,7 +105,6 @@ beforeEach(() => {
});
afterEach(() => {
vi.clearAllTimers();
vi.useRealTimers();
resetSystemEventsForTest();
});
@ -388,6 +339,11 @@ describe("runReplyAgent auto-compaction token update", () => {
);
}
/**
 * Canonicalize a path's parent directory via realpath so two spellings of
 * the same file (e.g. through a symlinked temp dir like macOS's
 * /tmp -> /private/tmp) compare equal. The basename is reattached
 * unchanged; if realpath fails (directory missing), the raw parent is kept.
 */
async function normalizeComparablePath(filePath: string): Promise<string> {
  const rawParent = path.dirname(filePath);
  let resolvedParent: string;
  try {
    resolvedParent = await fs.realpath(rawParent);
  } catch {
    resolvedParent = rawParent;
  }
  return path.join(resolvedParent, path.basename(filePath));
}
function createBaseRun(params: {
storePath: string;
sessionEntry: Record<string, unknown>;
@ -436,6 +392,7 @@ describe("runReplyAgent auto-compaction token update", () => {
const sessionKey = "main";
const sessionEntry = {
sessionId: "session",
sessionFile: path.join(tmp, "session.jsonl"),
updatedAt: Date.now(),
totalTokens: 181_000,
compactionCount: 0,
@ -524,6 +481,7 @@ describe("runReplyAgent auto-compaction token update", () => {
payloads: [{ text: "done" }],
meta: {
agentMeta: {
sessionId: "session-rotated",
usage: { input: 190_000, output: 8_000, total: 198_000 },
lastCallUsage: { input: 10_000, output: 3_000, total: 13_000 },
compactionCount: 2,
@ -568,6 +526,10 @@ describe("runReplyAgent auto-compaction token update", () => {
const stored = JSON.parse(await fs.readFile(storePath, "utf-8"));
expect(stored[sessionKey].totalTokens).toBe(10_000);
expect(stored[sessionKey].compactionCount).toBe(2);
expect(stored[sessionKey].sessionId).toBe("session-rotated");
expect(await normalizeComparablePath(stored[sessionKey].sessionFile)).toBe(
await normalizeComparablePath(path.join(tmp, "session-rotated.jsonl")),
);
});
it("accumulates compactions across fallback attempts without double-counting a single attempt", async () => {

View File

@ -8,9 +8,14 @@ import type { TypingMode } from "../../config/types.js";
import { withStateDirEnv } from "../../test-helpers/state-dir-env.js";
import type { TemplateContext } from "../templating.js";
import type { GetReplyOptions } from "../types.js";
import type { FollowupRun, QueueSettings } from "./queue.js";
import {
enqueueFollowupRun,
refreshQueuedFollowupSession,
scheduleFollowupDrain,
type FollowupRun,
type QueueSettings,
} from "./queue.js";
import { createMockTypingController } from "./test-helpers.js";
import { createTypingSignaler } from "./typing-mode.js";
type AgentRunParams = {
onPartialReply?: (payload: { text?: string }) => Promise<void> | void;
@ -25,12 +30,11 @@ type EmbeddedRunParams = {
prompt?: string;
extraSystemPrompt?: string;
memoryFlushWritePath?: string;
sessionId?: string;
sessionFile?: string;
bootstrapPromptWarningSignaturesSeen?: string[];
bootstrapPromptWarningSignature?: string;
onAgentEvent?: (evt: {
stream?: string;
data?: { phase?: string; willRetry?: boolean; completed?: boolean };
}) => void;
onAgentEvent?: (evt: { stream?: string; data?: { phase?: string; willRetry?: boolean } }) => void;
};
const state = vi.hoisted(() => ({
@ -38,19 +42,8 @@ const state = vi.hoisted(() => ({
runCliAgentMock: vi.fn(),
}));
const accountingState = vi.hoisted(() => ({
persistRunSessionUsageMock: vi.fn(),
incrementRunCompactionCountMock: vi.fn(),
persistRunSessionUsageActual: null as null | ((params: unknown) => Promise<void>),
incrementRunCompactionCountActual: null as
| null
| ((params: unknown) => Promise<number | undefined>),
}));
let modelFallbackModule: typeof import("../../agents/model-fallback.js");
let onAgentEvent: typeof import("../../infra/agent-events.js").onAgentEvent;
let enqueueFollowupRunMock: typeof import("./queue/enqueue.js").enqueueFollowupRun;
let scheduleFollowupDrainMock: typeof import("./queue.js").scheduleFollowupDrain;
let runReplyAgentPromise:
| Promise<(typeof import("./agent-runner.js"))["runReplyAgent"]>
@ -63,99 +56,51 @@ async function getRunReplyAgent() {
return await runReplyAgentPromise;
}
vi.mock("../../agents/model-fallback.js", async (importOriginal) => {
const actual = await importOriginal<typeof import("../../agents/model-fallback.js")>();
return {
...actual,
runWithModelFallback: async ({
provider,
model,
run,
}: {
provider: string;
model: string;
run: (provider: string, model: string) => Promise<unknown>;
}) => ({
result: await run(provider, model),
provider,
model,
attempts: [],
}),
};
});
vi.mock("../../agents/model-fallback.js", () => ({
runWithModelFallback: async ({
provider,
model,
run,
}: {
provider: string;
model: string;
run: (provider: string, model: string) => Promise<unknown>;
}) => ({
result: await run(provider, model),
provider,
model,
attempts: [],
}),
}));
vi.mock("../../agents/pi-embedded.js", async (importOriginal) => {
const actual = await importOriginal<typeof import("../../agents/pi-embedded.js")>();
return {
...actual,
queueEmbeddedPiMessage: vi.fn().mockReturnValue(false),
runEmbeddedPiAgent: (params: unknown) => state.runEmbeddedPiAgentMock(params),
};
});
vi.mock("../../agents/pi-embedded.js", () => ({
queueEmbeddedPiMessage: vi.fn().mockReturnValue(false),
runEmbeddedPiAgent: (params: unknown) => state.runEmbeddedPiAgentMock(params),
}));
vi.mock("../../agents/cli-runner.js", async (importOriginal) => {
const actual = await importOriginal<typeof import("../../agents/cli-runner.js")>();
return {
...actual,
runCliAgent: (params: unknown) => state.runCliAgentMock(params),
};
});
vi.mock("../../agents/cli-runner.js", () => ({
runCliAgent: (params: unknown) => state.runCliAgentMock(params),
}));
vi.mock("./queue/enqueue.js", async (importOriginal) => {
const actual = await importOriginal<typeof import("./queue/enqueue.js")>();
return {
...actual,
enqueueFollowupRun: vi.fn(),
};
});
vi.mock("./queue.js", async (importOriginal) => {
const actual = await importOriginal<typeof import("./queue.js")>();
return {
...actual,
scheduleFollowupDrain: vi.fn(),
};
});
vi.mock("./session-run-accounting.js", async (importOriginal) => {
const actual = await importOriginal<typeof import("./session-run-accounting.js")>();
accountingState.persistRunSessionUsageActual = actual.persistRunSessionUsage as (
params: unknown,
) => Promise<void>;
accountingState.incrementRunCompactionCountActual = actual.incrementRunCompactionCount as (
params: unknown,
) => Promise<number | undefined>;
return {
...actual,
persistRunSessionUsage: (params: unknown) => accountingState.persistRunSessionUsageMock(params),
incrementRunCompactionCount: (params: unknown) =>
accountingState.incrementRunCompactionCountMock(params),
};
});
vi.mock("./queue.js", () => ({
enqueueFollowupRun: vi.fn(),
refreshQueuedFollowupSession: vi.fn(),
scheduleFollowupDrain: vi.fn(),
}));
beforeAll(async () => {
// Avoid attributing the initial agent-runner import cost to the first test case.
modelFallbackModule = await import("../../agents/model-fallback.js");
({ onAgentEvent } = await import("../../infra/agent-events.js"));
({ enqueueFollowupRun: enqueueFollowupRunMock } = await import("./queue/enqueue.js"));
({ scheduleFollowupDrain: scheduleFollowupDrainMock } = await import("./queue.js"));
await getRunReplyAgent();
});
beforeEach(async () => {
({ enqueueFollowupRun: enqueueFollowupRunMock } = await import("./queue/enqueue.js"));
({ scheduleFollowupDrain: scheduleFollowupDrainMock } = await import("./queue.js"));
beforeEach(() => {
state.runEmbeddedPiAgentMock.mockClear();
state.runCliAgentMock.mockClear();
vi.mocked(enqueueFollowupRunMock).mockClear();
vi.mocked(scheduleFollowupDrainMock).mockClear();
accountingState.persistRunSessionUsageMock.mockReset();
accountingState.incrementRunCompactionCountMock.mockReset();
accountingState.persistRunSessionUsageMock.mockImplementation(async (params: unknown) => {
await accountingState.persistRunSessionUsageActual?.(params);
});
accountingState.incrementRunCompactionCountMock.mockImplementation(async (params: unknown) => {
return await accountingState.incrementRunCompactionCountActual?.(params);
});
vi.mocked(enqueueFollowupRun).mockClear();
vi.mocked(refreshQueuedFollowupSession).mockClear();
vi.mocked(scheduleFollowupDrain).mockClear();
vi.stubEnv("OPENCLAW_TEST_FAST", "1");
});
@ -361,7 +306,7 @@ describe("runReplyAgent heartbeat followup guard", () => {
const result = await run();
expect(result).toBeUndefined();
expect(vi.mocked(enqueueFollowupRunMock)).not.toHaveBeenCalled();
expect(vi.mocked(enqueueFollowupRun)).not.toHaveBeenCalled();
expect(state.runEmbeddedPiAgentMock).not.toHaveBeenCalled();
expect(typing.cleanup).toHaveBeenCalledTimes(1);
});
@ -377,20 +322,27 @@ describe("runReplyAgent heartbeat followup guard", () => {
const result = await run();
expect(result).toBeUndefined();
expect(vi.mocked(enqueueFollowupRunMock)).toHaveBeenCalledTimes(1);
expect(vi.mocked(enqueueFollowupRun)).toHaveBeenCalledTimes(1);
expect(state.runEmbeddedPiAgentMock).not.toHaveBeenCalled();
});
it("drains followup queue when an unexpected exception escapes the run path", async () => {
accountingState.persistRunSessionUsageMock.mockRejectedValueOnce(new Error("persist exploded"));
const accounting = await import("./session-run-accounting.js");
const persistSpy = vi
.spyOn(accounting, "persistRunSessionUsage")
.mockRejectedValueOnce(new Error("persist exploded"));
state.runEmbeddedPiAgentMock.mockResolvedValueOnce({
payloads: [{ text: "ok" }],
meta: { agentMeta: { usage: { input: 1, output: 1 } } },
});
const { run } = createMinimalRun();
await expect(run()).rejects.toThrow("persist exploded");
expect(vi.mocked(scheduleFollowupDrainMock)).toHaveBeenCalledTimes(1);
try {
const { run } = createMinimalRun();
await expect(run()).rejects.toThrow("persist exploded");
expect(vi.mocked(scheduleFollowupDrain)).toHaveBeenCalledTimes(1);
} finally {
persistSpy.mockRestore();
}
});
});
@ -425,16 +377,15 @@ describe("runReplyAgent typing (heartbeat)", () => {
it("signals typing for normal runs", async () => {
const onPartialReply = vi.fn();
const typing = createMockTypingController();
const typingSignals = createTypingSignaler({
typing,
mode: "instant",
isHeartbeat: false,
state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: AgentRunParams) => {
await params.onPartialReply?.({ text: "hi" });
return { payloads: [{ text: "final" }], meta: {} };
});
await typingSignals.signalRunStart();
await typingSignals.signalTextDelta("hi");
await onPartialReply({ text: "hi", mediaUrls: undefined });
const { run, typing } = createMinimalRun({
opts: { isHeartbeat: false, onPartialReply },
});
await run();
expect(onPartialReply).toHaveBeenCalled();
expect(typing.startTypingOnText).toHaveBeenCalledWith("hi");
@ -443,16 +394,15 @@ describe("runReplyAgent typing (heartbeat)", () => {
it("never signals typing for heartbeat runs", async () => {
const onPartialReply = vi.fn();
const typing = createMockTypingController();
const typingSignals = createTypingSignaler({
typing,
mode: "instant",
isHeartbeat: true,
state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: AgentRunParams) => {
await params.onPartialReply?.({ text: "hi" });
return { payloads: [{ text: "final" }], meta: {} };
});
await typingSignals.signalRunStart();
await typingSignals.signalTextDelta("hi");
await onPartialReply({ text: "hi", mediaUrls: undefined });
const { run, typing } = createMinimalRun({
opts: { isHeartbeat: true, onPartialReply },
});
await run();
expect(onPartialReply).toHaveBeenCalled();
expect(typing.startTypingOnText).not.toHaveBeenCalled();
@ -686,37 +636,26 @@ describe("runReplyAgent typing (heartbeat)", () => {
it("retries transient HTTP failures once with timer-driven backoff", async () => {
vi.useFakeTimers();
try {
let calls = 0;
state.runEmbeddedPiAgentMock.mockImplementation(async () => {
calls += 1;
if (calls === 1) {
throw new Error("502 Bad Gateway");
}
return { payloads: [{ text: "final" }], meta: {} };
});
let calls = 0;
state.runEmbeddedPiAgentMock.mockImplementation(async () => {
calls += 1;
if (calls === 1) {
throw new Error("502 Bad Gateway");
}
return { payloads: [{ text: "final" }], meta: {} };
});
const { run } = createMinimalRun({
typingMode: "message",
});
const runPromise = run();
void runPromise.catch(() => {});
await vi.dynamicImportSettled();
const { run } = createMinimalRun({
typingMode: "message",
});
const runPromise = run();
vi.advanceTimersByTime(2_499);
await Promise.resolve();
expect(calls).toBe(1);
vi.advanceTimersByTime(1);
await Promise.resolve();
await Promise.resolve();
expect(calls).toBe(2);
// Restore real timers before awaiting the settled run to avoid Vitest
// fake-timer bookkeeping stalling the test worker after the retry fires.
vi.useRealTimers();
} finally {
vi.useRealTimers();
}
await vi.advanceTimersByTimeAsync(2_499);
expect(calls).toBe(1);
await vi.advanceTimersByTimeAsync(1);
await runPromise;
expect(calls).toBe(2);
vi.useRealTimers();
});
it("delivers tool results in order even when dispatched concurrently", async () => {
@ -782,7 +721,7 @@ describe("runReplyAgent typing (heartbeat)", () => {
state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: AgentRunParams) => {
params.onAgentEvent?.({
stream: "compaction",
data: { phase: "end", willRetry: false, completed: true },
data: { phase: "end", willRetry: false },
});
return { payloads: [{ text: "final" }], meta: {} };
});
@ -803,6 +742,39 @@ describe("runReplyAgent typing (heartbeat)", () => {
});
});
// Regression coverage for session rotation during auto-compaction: when the
// embedded agent's result metadata reports a new sessionId, any queued
// followup runs must be re-pointed at the rotated session id and transcript
// file instead of the stale pre-compaction pointers.
it("refreshes queued followups when auto-compaction rotates the session", async () => {
await withTempStateDir(async (stateDir) => {
const storePath = path.join(stateDir, "sessions", "sessions.json");
const sessionEntry: SessionEntry = { sessionId: "session", updatedAt: Date.now() };
const sessionStore = { main: sessionEntry };
// Simulate a run whose agent metadata reports a rotated session id after
// one compaction cycle.
state.runEmbeddedPiAgentMock.mockResolvedValueOnce({
payloads: [{ text: "final" }],
meta: {
agentMeta: {
sessionId: "session-rotated",
compactionCount: 1,
},
},
});
const { run } = createMinimalRun({
sessionEntry,
sessionStore,
sessionKey: "main",
storePath,
});
await run();
// The queue must be told to migrate followups from the old session
// pointer to the rotated id/file pair.
expect(vi.mocked(refreshQueuedFollowupSession)).toHaveBeenCalledWith({
key: "main",
previousSessionId: "session",
nextSessionId: "session-rotated",
nextSessionFile: expect.stringContaining("session-rotated.jsonl"),
});
});
});
it("announces model fallback only when verbose mode is enabled", async () => {
const cases = [
{ name: "verbose on", verbose: "on" as const, expectNotice: true },
@ -1315,6 +1287,12 @@ describe("runReplyAgent typing (heartbeat)", () => {
}
expect(payload.text?.toLowerCase()).toContain("reset");
expect(sessionStore.main.sessionId).not.toBe(sessionId);
expect(vi.mocked(refreshQueuedFollowupSession)).toHaveBeenCalledWith({
key: "main",
previousSessionId: sessionId,
nextSessionId: sessionStore.main.sessionId,
nextSessionFile: expect.stringContaining(".jsonl"),
});
const persisted = JSON.parse(await fs.readFile(storePath, "utf-8"));
expect(persisted.main.sessionId).toBe(sessionStore.main.sessionId);
@ -1646,6 +1624,11 @@ describe("runReplyAgent memory flush", () => {
return await fn(path.join(dir, "sessions.json"));
}
// Resolve the parent directory through realpath (best-effort) so symlinked
// temp dirs (e.g. /tmp -> /private/tmp on macOS) compare equal across the
// test's expected/actual paths; falls back to the literal parent when
// realpath fails (e.g. the directory does not exist). The basename is
// reattached unchanged.
async function normalizeComparablePath(filePath: string): Promise<string> {
const parent = await fs.realpath(path.dirname(filePath)).catch(() => path.dirname(filePath));
return path.join(parent, path.basename(filePath));
}
beforeAll(async () => {
fixtureRoot = await fs.mkdtemp(path.join(tmpdir(), "openclaw-memory-flush-"));
});
@ -1850,15 +1833,26 @@ describe("runReplyAgent memory flush", () => {
prompt?: string;
extraSystemPrompt?: string;
memoryFlushWritePath?: string;
sessionId?: string;
sessionFile?: string;
}> = [];
state.runEmbeddedPiAgentMock.mockImplementation(async (params: EmbeddedRunParams) => {
calls.push({
prompt: params.prompt,
extraSystemPrompt: params.extraSystemPrompt,
memoryFlushWritePath: params.memoryFlushWritePath,
sessionId: params.sessionId,
sessionFile: params.sessionFile,
});
if (params.prompt?.includes("Pre-compaction memory flush.")) {
return { payloads: [], meta: {} };
params.onAgentEvent?.({
stream: "compaction",
data: { phase: "end", willRetry: false },
});
return {
payloads: [],
meta: { agentMeta: { sessionId: "session-rotated" } },
};
}
return {
payloads: [{ text: "ok" }],
@ -1888,10 +1882,25 @@ describe("runReplyAgent memory flush", () => {
expect(calls[0]?.extraSystemPrompt).toContain("memory/YYYY-MM-DD.md");
expect(calls[0]?.extraSystemPrompt).toContain("MEMORY.md");
expect(calls[1]?.prompt).toBe("hello");
expect(calls[1]?.sessionId).toBe("session-rotated");
expect(await normalizeComparablePath(calls[1]?.sessionFile ?? "")).toBe(
await normalizeComparablePath(path.join(path.dirname(storePath), "session-rotated.jsonl")),
);
expect(vi.mocked(refreshQueuedFollowupSession)).toHaveBeenCalledWith({
key: sessionKey,
previousSessionId: "session",
nextSessionId: "session-rotated",
nextSessionFile: expect.stringContaining("session-rotated.jsonl"),
});
const stored = JSON.parse(await fs.readFile(storePath, "utf-8"));
expect(stored[sessionKey].memoryFlushAt).toBeTypeOf("number");
expect(stored[sessionKey].memoryFlushCompactionCount).toBe(1);
expect(stored[sessionKey].memoryFlushCompactionCount).toBe(2);
expect(stored[sessionKey].compactionCount).toBe(2);
expect(stored[sessionKey].sessionId).toBe("session-rotated");
expect(await normalizeComparablePath(stored[sessionKey].sessionFile)).toBe(
await normalizeComparablePath(path.join(path.dirname(storePath), "session-rotated.jsonl")),
);
});
});
@ -2098,121 +2107,6 @@ describe("runReplyAgent memory flush", () => {
});
});
it("skips duplicate memory writes across memory-flush fallback retries", async () => {
await withTempStore(async (storePath) => {
const sessionKey = "main";
const sessionFile = "session-relative.jsonl";
const fixtureDir = path.dirname(storePath);
const transcriptPath = path.join(fixtureDir, sessionFile);
await fs.mkdir(path.dirname(transcriptPath), { recursive: true });
await fs.writeFile(
transcriptPath,
[
JSON.stringify({ message: { role: "user", content: "Remember alpha." } }),
JSON.stringify({ message: { role: "assistant", content: "Stored alpha." } }),
].join("\n") + "\n",
"utf-8",
);
const sessionEntry = {
sessionId: "session",
updatedAt: Date.now(),
sessionFile,
totalTokens: 80_000,
compactionCount: 1,
};
await seedSessionStore({ storePath, sessionKey, entry: sessionEntry });
let flushAttemptCount = 0;
let memoryFilePath: string | undefined;
const prompts: string[] = [];
state.runEmbeddedPiAgentMock.mockImplementation(async (params: EmbeddedRunParams) => {
prompts.push(params.prompt ?? "");
if (params.prompt?.includes("Pre-compaction memory flush.")) {
flushAttemptCount += 1;
memoryFilePath = path.join(fixtureDir, params.memoryFlushWritePath ?? "memory/flush.md");
await fs.mkdir(path.dirname(memoryFilePath), { recursive: true });
await fs.appendFile(memoryFilePath, "remember alpha\n", "utf-8");
if (flushAttemptCount === 1) {
throw new Error("flush failed after write");
}
return { payloads: [], meta: {} };
}
return {
payloads: [{ text: "ok" }],
meta: { agentMeta: { usage: { input: 1, output: 1 } } },
};
});
const fallbackSpy = vi
.spyOn(modelFallbackModule, "runWithModelFallback")
.mockImplementationOnce(
async ({
provider,
model,
run,
}: {
provider: string;
model: string;
run: (provider: string, model: string) => Promise<unknown>;
}) => {
try {
await run(provider, model);
} catch {
// Simulate advancing to the next fallback candidate after the first
// memory flush attempt already wrote and then failed.
}
return {
result: await run("openai", "gpt-5.4"),
provider: "openai",
model: "gpt-5.4",
attempts: [
{
provider,
model,
error: "flush failed after write",
reason: "unknown",
},
],
};
},
);
try {
const baseRun = createBaseRun({
storePath,
sessionEntry,
runOverrides: {
sessionFile,
workspaceDir: fixtureDir,
},
});
await runReplyAgentWithBase({
baseRun,
storePath,
sessionKey,
sessionEntry,
commandBody: "hello",
});
} finally {
fallbackSpy.mockRestore();
}
expect(flushAttemptCount).toBe(1);
expect(
prompts.filter((prompt) => prompt.includes("Pre-compaction memory flush.")),
).toHaveLength(1);
expect(memoryFilePath).toBeDefined();
await expect(fs.readFile(memoryFilePath!, "utf-8")).resolves.toBe("remember alpha\n");
const stored = JSON.parse(await fs.readFile(storePath, "utf-8"));
expect(stored[sessionKey].memoryFlushAt).toBeTypeOf("number");
expect(stored[sessionKey].memoryFlushContextHash).toMatch(/^[0-9a-f]{16}$/);
});
});
it("increments compaction count when flush compaction completes", async () => {
await withTempStore(async (storePath) => {
const sessionKey = "main";
@ -2229,7 +2123,7 @@ describe("runReplyAgent memory flush", () => {
if (params.prompt?.includes("Pre-compaction memory flush.")) {
params.onAgentEvent?.({
stream: "compaction",
data: { phase: "end", willRetry: false, completed: true },
data: { phase: "end", willRetry: false },
});
return { payloads: [], meta: {} };
}
@ -2258,25 +2152,4 @@ describe("runReplyAgent memory flush", () => {
});
});
});
describe("runReplyAgent error followup drain", () => {
it("drains followup queue when an unexpected exception escapes the run path", async () => {
vi.resetModules();
vi.doMock("./agent-runner-execution.runtime.js", () => ({
runAgentTurnWithFallback: vi.fn().mockRejectedValueOnce(new Error("persist exploded")),
}));
try {
({ scheduleFollowupDrain: scheduleFollowupDrainMock } = await import("./queue.js"));
vi.mocked(scheduleFollowupDrainMock).mockClear();
runReplyAgentPromise = undefined;
const { run } = createMinimalRun();
await expect(run()).rejects.toThrow("persist exploded");
expect(vi.mocked(scheduleFollowupDrainMock)).toHaveBeenCalledTimes(1);
} finally {
vi.doUnmock("./agent-runner-execution.runtime.js");
runReplyAgentPromise = undefined;
}
});
});
import type { ReplyPayload } from "../types.js";

View File

@ -1,23 +1,26 @@
import fs from "node:fs";
import { lookupCachedContextTokens } from "../../agents/context-cache.js";
import { lookupContextTokens } from "../../agents/context-tokens.runtime.js";
import { lookupContextTokens } from "../../agents/context.js";
import { DEFAULT_CONTEXT_TOKENS } from "../../agents/defaults.js";
import { resolveModelAuthMode } from "../../agents/model-auth.js";
import { isCliProvider } from "../../agents/model-selection.js";
import { queueEmbeddedPiMessage } from "../../agents/pi-embedded.js";
import { hasNonzeroUsage } from "../../agents/usage.js";
import {
resolveAgentIdFromSessionKey,
resolveSessionFilePath,
resolveSessionFilePathOptions,
resolveSessionTranscriptPath,
} from "../../config/sessions/paths.js";
import type { SessionEntry } from "../../config/sessions/types.js";
type SessionEntry,
updateSessionStore,
updateSessionStoreEntry,
} from "../../config/sessions.js";
import type { TypingMode } from "../../config/types.js";
import { emitAgentEvent } from "../../infra/agent-events.js";
import { emitDiagnosticEvent, isDiagnosticsEnabled } from "../../infra/diagnostic-events.js";
import { generateSecureUuid } from "../../infra/secure-random.js";
import { enqueueSystemEvent } from "../../infra/system-events.js";
import { resolveAgentIdFromSessionKey } from "../../routing/session-key.js";
import { defaultRuntime } from "../../runtime.js";
import { estimateUsageCost, resolveModelCostConfig } from "../../utils/usage-format.js";
import {
buildFallbackClearedNotice,
buildFallbackNotice,
@ -26,7 +29,7 @@ import {
import type { OriginatingChannelType, TemplateContext } from "../templating.js";
import { resolveResponseUsageMode, type VerboseLevel } from "../thinking.js";
import type { GetReplyOptions, ReplyPayload } from "../types.js";
import { runAgentTurnWithFallback } from "./agent-runner-execution.runtime.js";
import { runAgentTurnWithFallback } from "./agent-runner-execution.js";
import {
createShouldEmitToolOutput,
createShouldEmitToolResult,
@ -34,7 +37,7 @@ import {
isAudioPayload,
signalTypingIfNeeded,
} from "./agent-runner-helpers.js";
import { runMemoryFlushIfNeeded } from "./agent-runner-memory.runtime.js";
import { runMemoryFlushIfNeeded } from "./agent-runner-memory.js";
import { buildReplyPayloads } from "./agent-runner-payloads.js";
import {
appendUnscheduledReminderNote,
@ -48,37 +51,19 @@ import { createFollowupRunner } from "./followup-runner.js";
import { resolveOriginMessageProvider, resolveOriginMessageTo } from "./origin-routing.js";
import { readPostCompactionContext } from "./post-compaction-context.js";
import { resolveActiveRunQueueAction } from "./queue-policy.js";
import type { FollowupRun, QueueSettings } from "./queue.js";
import { enqueueFollowupRun } from "./queue/enqueue.js";
import { createReplyMediaPathNormalizer } from "./reply-media-paths.runtime.js";
import {
enqueueFollowupRun,
refreshQueuedFollowupSession,
type FollowupRun,
type QueueSettings,
} from "./queue.js";
import { createReplyMediaPathNormalizer } from "./reply-media-paths.js";
import { createReplyToModeFilterForChannel, resolveReplyToMode } from "./reply-threading.js";
import { incrementRunCompactionCount, persistRunSessionUsage } from "./session-run-accounting.js";
import { createTypingSignaler } from "./typing-mode.js";
import type { TypingController } from "./typing.js";
const BLOCK_REPLY_SEND_TIMEOUT_MS = 15_000;
let piEmbeddedQueueRuntimePromise: Promise<
typeof import("../../agents/pi-embedded-queue.runtime.js")
> | null = null;
let usageCostRuntimePromise: Promise<typeof import("./usage-cost.runtime.js")> | null = null;
let sessionStoreRuntimePromise: Promise<
typeof import("../../config/sessions/store.runtime.js")
> | null = null;
function loadPiEmbeddedQueueRuntime() {
piEmbeddedQueueRuntimePromise ??= import("../../agents/pi-embedded-queue.runtime.js");
return piEmbeddedQueueRuntimePromise;
}
function loadUsageCostRuntime() {
usageCostRuntimePromise ??= import("./usage-cost.runtime.js");
return usageCostRuntimePromise;
}
function loadSessionStoreRuntime() {
sessionStoreRuntimePromise ??= import("../../config/sessions/store.runtime.js");
return sessionStoreRuntimePromise;
}
export async function runReplyAgent(params: {
commandBody: string;
@ -206,7 +191,6 @@ export async function runReplyAgent(params: {
activeSessionEntry.updatedAt = updatedAt;
activeSessionStore[sessionKey] = activeSessionEntry;
if (storePath) {
const { updateSessionStoreEntry } = await loadSessionStoreRuntime();
await updateSessionStoreEntry({
storePath,
sessionKey,
@ -216,7 +200,6 @@ export async function runReplyAgent(params: {
};
if (shouldSteer && isStreaming) {
const { queueEmbeddedPiMessage } = await loadPiEmbeddedQueueRuntime();
const steered = queueEmbeddedPiMessage(followupRun.run.sessionId, followupRun.prompt);
if (steered && !shouldFollowup) {
await touchActiveSessionEntry();
@ -314,7 +297,6 @@ export async function runReplyAgent(params: {
fallbackNoticeSelectedModel: undefined,
fallbackNoticeActiveModel: undefined,
fallbackNoticeReason: undefined,
memoryFlushContextHash: undefined,
};
const agentId = resolveAgentIdFromSessionKey(sessionKey);
const nextSessionFile = resolveSessionTranscriptPath(
@ -325,7 +307,6 @@ export async function runReplyAgent(params: {
nextEntry.sessionFile = nextSessionFile;
activeSessionStore[sessionKey] = nextEntry;
try {
const { updateSessionStore } = await loadSessionStoreRuntime();
await updateSessionStore(storePath, (store) => {
store[sessionKey] = nextEntry;
});
@ -336,6 +317,12 @@ export async function runReplyAgent(params: {
}
followupRun.run.sessionId = nextSessionId;
followupRun.run.sessionFile = nextSessionFile;
refreshQueuedFollowupSession({
key: queueKey,
previousSessionId: prevEntry.sessionId,
nextSessionId,
nextSessionFile,
});
activeSessionEntry = nextEntry;
activeIsNewSession = true;
defaultRuntime.error(buildLogMessage(nextSessionId));
@ -425,7 +412,6 @@ export async function runReplyAgent(params: {
activeSessionEntry.updatedAt = updatedAt;
activeSessionStore[sessionKey] = activeSessionEntry;
if (storePath) {
const { updateSessionStoreEntry } = await loadSessionStoreRuntime();
await updateSessionStoreEntry({
storePath,
sessionKey,
@ -443,11 +429,6 @@ export async function runReplyAgent(params: {
await blockReplyPipeline.flush({ force: true });
blockReplyPipeline.stop();
}
// NOTE: The compaction completion notice for block-streaming mode is sent
// further below — after incrementRunCompactionCount — so it can include
// the `(count N)` suffix. Sending it here (before the count is known)
// would omit that information.
if (pendingToolTasks.size > 0) {
await Promise.allSettled(pendingToolTasks);
}
@ -482,7 +463,6 @@ export async function runReplyAgent(params: {
activeSessionStore[sessionKey] = fallbackStateEntry;
}
if (sessionKey && storePath) {
const { updateSessionStoreEntry } = await loadSessionStoreRuntime();
await updateSessionStoreEntry({
storePath,
sessionKey,
@ -497,11 +477,9 @@ export async function runReplyAgent(params: {
const cliSessionId = isCliProvider(providerUsed, cfg)
? runResult.meta?.agentMeta?.sessionId?.trim()
: undefined;
const cachedContextTokens = lookupCachedContextTokens(modelUsed);
const contextTokensUsed =
agentCfgContextTokens ??
cachedContextTokens ??
lookupContextTokens(modelUsed, { allowAsyncLoad: false }) ??
lookupContextTokens(modelUsed) ??
activeSessionEntry?.contextTokens ??
DEFAULT_CONTEXT_TOKENS;
@ -579,7 +557,6 @@ export async function runReplyAgent(params: {
await signalTypingIfNeeded(guardedReplyPayloads, typingSignals);
if (isDiagnosticsEnabled(cfg) && hasNonzeroUsage(usage)) {
const { estimateUsageCost, resolveModelCostConfig } = await loadUsageCostRuntime();
const input = usage.input ?? 0;
const output = usage.output ?? 0;
const cacheRead = usage.cacheRead ?? 0;
@ -622,7 +599,6 @@ export async function runReplyAgent(params: {
(sessionKey ? activeSessionStore?.[sessionKey]?.responseUsage : undefined);
const responseUsageMode = resolveResponseUsageMode(responseUsageRaw);
if (responseUsageMode !== "off" && hasNonzeroUsage(usage)) {
const { resolveModelCostConfig } = await loadUsageCostRuntime();
const authMode = resolveModelAuthMode(providerUsed, cfg);
const showCost = authMode === "api-key";
const costConfig = showCost
@ -708,6 +684,7 @@ export async function runReplyAgent(params: {
}
if (autoCompactionCount > 0) {
const previousSessionId = activeSessionEntry?.sessionId ?? followupRun.run.sessionId;
const count = await incrementRunCompactionCount({
sessionEntry: activeSessionEntry,
sessionStore: activeSessionStore,
@ -716,7 +693,19 @@ export async function runReplyAgent(params: {
amount: autoCompactionCount,
lastCallUsage: runResult.meta?.agentMeta?.lastCallUsage,
contextTokensUsed,
newSessionId: runResult.meta?.agentMeta?.sessionId,
});
const refreshedSessionEntry =
sessionKey && activeSessionStore ? activeSessionStore[sessionKey] : undefined;
if (refreshedSessionEntry) {
activeSessionEntry = refreshedSessionEntry;
refreshQueuedFollowupSession({
key: queueKey,
previousSessionId,
nextSessionId: refreshedSessionEntry.sessionId,
nextSessionFile: refreshedSessionEntry.sessionFile,
});
}
// Inject post-compaction workspace context for the next agent turn
if (sessionKey) {
@ -732,48 +721,9 @@ export async function runReplyAgent(params: {
});
}
// Always notify the user when compaction completes — not just in verbose
// mode. The "🧹 Compacting context..." notice was already sent at start,
// so the completion message closes the loop for every user regardless of
// their verbose setting.
const suffix = typeof count === "number" ? ` (count ${count})` : "";
const completionText = verboseEnabled
? `🧹 Auto-compaction complete${suffix}.`
: `✅ Context compacted${suffix}.`;
if (blockReplyPipeline && opts?.onBlockReply) {
// In block-streaming mode, send the completion notice via
// fire-and-forget *after* the pipeline has flushed (so it does not set
// didStream()=true and cause buildReplyPayloads to discard the real
// assistant reply). Now that the count is known we can include it.
const currentMessageId = sessionCtx.MessageSidFull ?? sessionCtx.MessageSid;
const noticePayload = applyReplyToMode({
text: completionText,
replyToId: currentMessageId,
replyToCurrent: true,
isCompactionNotice: true,
});
void Promise.race([
opts.onBlockReply(noticePayload),
new Promise<void>((_, reject) =>
setTimeout(() => reject(new Error("compaction notice timeout")), blockReplyTimeoutMs),
),
]).catch(() => {
// Intentionally swallowed — the notice is informational only.
});
} else {
// Non-streaming: push into verboseNotices with full compaction metadata
// so threading exemptions apply and replyToMode=first does not thread
// the notice instead of the real assistant reply.
const currentMessageId = sessionCtx.MessageSidFull ?? sessionCtx.MessageSid;
verboseNotices.push(
applyReplyToMode({
text: completionText,
replyToId: currentMessageId,
replyToCurrent: true,
isCompactionNotice: true,
}),
);
if (verboseEnabled) {
const suffix = typeof count === "number" ? ` (count ${count})` : "";
verboseNotices.push({ text: `🧹 Auto-compaction complete${suffix}.` });
}
}
if (verboseNotices.length > 0) {

View File

@ -3,7 +3,12 @@ import { tmpdir } from "node:os";
import path from "node:path";
import { beforeEach, describe, expect, it, vi } from "vitest";
import { loadSessionStore, saveSessionStore, type SessionEntry } from "../../config/sessions.js";
import type { FollowupRun } from "./queue.js";
import {
clearFollowupQueue,
enqueueFollowupRun,
type FollowupRun,
type QueueSettings,
} from "./queue.js";
import * as sessionRunAccounting from "./session-run-accounting.js";
import { createMockFollowupRun, createMockTypingController } from "./test-helpers.js";
@ -16,14 +21,18 @@ vi.mock(
async () => await import("../../test-utils/model-fallback.mock.js"),
);
vi.mock("../../agents/pi-embedded.runtime.js", () => ({
vi.mock("../../agents/pi-embedded.js", () => ({
runEmbeddedPiAgent: (params: unknown) => runEmbeddedPiAgentMock(params),
}));
vi.mock("./route-reply.runtime.js", () => ({
isRoutableChannel: (...args: unknown[]) => isRoutableChannelMock(...args),
routeReply: (...args: unknown[]) => routeReplyMock(...args),
}));
vi.mock("./route-reply.js", async (importOriginal) => {
const actual = await importOriginal<typeof import("./route-reply.js")>();
return {
...actual,
isRoutableChannel: (...args: unknown[]) => isRoutableChannelMock(...args),
routeReply: (...args: unknown[]) => routeReplyMock(...args),
};
});
import { createFollowupRunner } from "./followup-runner.js";
@ -44,6 +53,7 @@ beforeEach(() => {
isRoutableChannelMock.mockImplementation((ch: string | undefined) =>
Boolean(ch?.trim() && ROUTABLE_TEST_CHANNELS.has(ch.trim().toLowerCase())),
);
clearFollowupQueue("main");
});
const baseQueuedRun = (messageProvider = "whatsapp"): FollowupRun =>
@ -55,6 +65,11 @@ function createQueuedRun(
return createMockFollowupRun(overrides);
}
/**
 * Normalizes a path for comparison by resolving symlinks in its parent
 * directory (tmpdir is often a symlink, e.g. on macOS), falling back to the
 * raw parent when resolution fails. The basename is kept as-is.
 */
async function normalizeComparablePath(filePath: string): Promise<string> {
  const rawParent = path.dirname(filePath);
  let resolvedParent: string;
  try {
    resolvedParent = await fs.realpath(rawParent);
  } catch {
    resolvedParent = rawParent;
  }
  return path.join(resolvedParent, path.basename(filePath));
}
function mockCompactionRun(params: {
willRetry: boolean;
result: {
@ -66,10 +81,6 @@ function mockCompactionRun(params: {
async (args: {
onAgentEvent?: (evt: { stream: string; data: Record<string, unknown> }) => void;
}) => {
args.onAgentEvent?.({
stream: "compaction",
data: { phase: "start" },
});
args.onAgentEvent?.({
stream: "compaction",
data: { phase: "end", willRetry: params.willRetry, completed: true },
@ -84,7 +95,7 @@ function createAsyncReplySpy() {
}
describe("createFollowupRunner compaction", () => {
it("adds compaction notices and tracks count in verbose mode", async () => {
it("adds verbose auto-compaction notice and tracks count", async () => {
const storePath = path.join(
await fs.mkdtemp(path.join(tmpdir(), "openclaw-compaction-")),
"sessions.json",
@ -122,15 +133,9 @@ describe("createFollowupRunner compaction", () => {
await runner(queued);
expect(onBlockReply).toHaveBeenCalledTimes(3);
const calls = onBlockReply.mock.calls as unknown as Array<
Array<{ text?: string; isCompactionNotice?: boolean }>
>;
expect(calls[0]?.[0]?.text).toBe("🧹 Compacting context...");
expect(calls[0]?.[0]?.isCompactionNotice).toBe(true);
expect(calls[1]?.[0]?.text).toContain("Auto-compaction complete");
expect(calls[1]?.[0]?.isCompactionNotice).toBe(true);
expect(calls[2]?.[0]?.text).toBe("final");
expect(onBlockReply).toHaveBeenCalled();
const firstCall = (onBlockReply.mock.calls as unknown as Array<Array<{ text?: string }>>)[0];
expect(firstCall?.[0]?.text).toContain("Auto-compaction complete");
expect(sessionStore.main.compactionCount).toBe(1);
});
@ -141,6 +146,7 @@ describe("createFollowupRunner compaction", () => {
);
const sessionEntry: SessionEntry = {
sessionId: "session",
sessionFile: path.join(path.dirname(storePath), "session.jsonl"),
updatedAt: Date.now(),
};
const sessionStore: Record<string, SessionEntry> = {
@ -152,6 +158,7 @@ describe("createFollowupRunner compaction", () => {
payloads: [{ text: "final" }],
meta: {
agentMeta: {
sessionId: "session-rotated",
compactionCount: 2,
lastCallUsage: { input: 10_000, output: 3_000, total: 13_000 },
},
@ -177,37 +184,43 @@ describe("createFollowupRunner compaction", () => {
await runner(queued);
expect(onBlockReply).toHaveBeenCalledTimes(2);
const calls = onBlockReply.mock.calls as unknown as Array<
Array<{ text?: string; isCompactionNotice?: boolean }>
>;
expect(calls[0]?.[0]?.text).toContain("Auto-compaction complete");
expect(calls[0]?.[0]?.isCompactionNotice).toBe(true);
expect(calls[1]?.[0]?.text).toBe("final");
expect(onBlockReply).toHaveBeenCalled();
const firstCall = (onBlockReply.mock.calls as unknown as Array<Array<{ text?: string }>>)[0];
expect(firstCall?.[0]?.text).toContain("Auto-compaction complete");
expect(sessionStore.main.compactionCount).toBe(2);
expect(sessionStore.main.sessionId).toBe("session-rotated");
expect(await normalizeComparablePath(sessionStore.main.sessionFile ?? "")).toBe(
await normalizeComparablePath(path.join(path.dirname(storePath), "session-rotated.jsonl")),
);
});
it("threads followup compaction notices without consuming the first reply slot", async () => {
it("refreshes queued followup runs to the rotated transcript", async () => {
const storePath = path.join(
await fs.mkdtemp(path.join(tmpdir(), "openclaw-compaction-threading-")),
await fs.mkdtemp(path.join(tmpdir(), "openclaw-compaction-queue-")),
"sessions.json",
);
const sessionEntry: SessionEntry = {
sessionId: "session",
sessionFile: path.join(path.dirname(storePath), "session.jsonl"),
updatedAt: Date.now(),
};
const sessionStore: Record<string, SessionEntry> = {
main: sessionEntry,
};
const onBlockReply = vi.fn(async () => {});
mockCompactionRun({
willRetry: true,
result: { payloads: [{ text: "final" }], meta: {} },
runEmbeddedPiAgentMock.mockResolvedValueOnce({
payloads: [{ text: "final" }],
meta: {
agentMeta: {
sessionId: "session-rotated",
compactionCount: 1,
lastCallUsage: { input: 10_000, output: 3_000, total: 13_000 },
},
},
});
const runner = createFollowupRunner({
opts: { onBlockReply },
opts: { onBlockReply: vi.fn(async () => {}) },
typing: createMockTypingController(),
typingMode: "instant",
sessionEntry,
@ -217,42 +230,30 @@ describe("createFollowupRunner compaction", () => {
defaultModel: "anthropic/claude-opus-4-5",
});
const queued = createQueuedRun({
messageId: "msg-42",
const queuedNext = createQueuedRun({
prompt: "next",
run: {
messageProvider: "discord",
config: {
channels: {
discord: {
replyToMode: "first",
},
},
},
verboseLevel: "off",
sessionId: "session",
sessionFile: path.join(path.dirname(storePath), "session.jsonl"),
},
});
const queueSettings: QueueSettings = { mode: "queue" };
enqueueFollowupRun("main", queuedNext, queueSettings);
const current = createQueuedRun({
run: {
verboseLevel: "on",
sessionId: "session",
sessionFile: path.join(path.dirname(storePath), "session.jsonl"),
},
});
await runner(queued);
await runner(current);
expect(onBlockReply).toHaveBeenCalledTimes(3);
const calls = onBlockReply.mock.calls as unknown as Array<
Array<{ text?: string; replyToId?: string; isCompactionNotice?: boolean }>
>;
expect(calls[0]?.[0]).toMatchObject({
text: "🧹 Compacting context...",
replyToId: "msg-42",
isCompactionNotice: true,
});
expect(calls[1]?.[0]).toMatchObject({
text: "✅ Context compacted (count 1).",
replyToId: "msg-42",
isCompactionNotice: true,
});
expect(calls[2]?.[0]).toMatchObject({
text: "final",
replyToId: "msg-42",
});
expect(calls[2]?.[0]?.isCompactionNotice).toBeUndefined();
expect(queuedNext.run.sessionId).toBe("session-rotated");
expect(await normalizeComparablePath(queuedNext.run.sessionFile)).toBe(
await normalizeComparablePath(path.join(path.dirname(storePath), "session-rotated.jsonl")),
);
});
it("does not count failed compaction end events in followup runs", async () => {

View File

@ -5,10 +5,10 @@ import {
} from "openclaw/plugin-sdk/reply-payload";
import { resolveRunModelFallbacksOverride } from "../../agents/agent-scope.js";
import { resolveBootstrapWarningSignaturesSeen } from "../../agents/bootstrap-budget.js";
import { lookupCachedContextTokens } from "../../agents/context-cache.js";
import { lookupContextTokens } from "../../agents/context.js";
import { DEFAULT_CONTEXT_TOKENS } from "../../agents/defaults.js";
import { runWithModelFallback } from "../../agents/model-fallback.js";
import { runEmbeddedPiAgent } from "../../agents/pi-embedded.js";
import type { SessionEntry } from "../../config/sessions.js";
import type { TypingMode } from "../../config/types.js";
import { logVerbose } from "../../globals.js";
@ -19,38 +19,25 @@ import { stripHeartbeatToken } from "../heartbeat.js";
import type { OriginatingChannelType } from "../templating.js";
import { isSilentReplyText, SILENT_REPLY_TOKEN } from "../tokens.js";
import type { GetReplyOptions, ReplyPayload } from "../types.js";
import { resolveRunAuthProfile } from "./agent-runner-auth-profile.js";
import { resolveRunAuthProfile } from "./agent-runner-utils.js";
import {
resolveOriginAccountId,
resolveOriginMessageProvider,
resolveOriginMessageTo,
} from "./origin-routing.js";
import type { FollowupRun } from "./queue.js";
import { refreshQueuedFollowupSession, type FollowupRun } from "./queue.js";
import {
applyReplyThreading,
filterMessagingToolDuplicates,
filterMessagingToolMediaDuplicates,
shouldSuppressMessagingToolReplies,
} from "./reply-payloads.js";
import { resolveReplyToMode } from "./reply-threading.js";
import { isRoutableChannel, routeReply } from "./route-reply.js";
import { incrementRunCompactionCount, persistRunSessionUsage } from "./session-run-accounting.js";
import { createTypingSignaler } from "./typing-mode.js";
import type { TypingController } from "./typing.js";
let piEmbeddedRuntimePromise: Promise<typeof import("../../agents/pi-embedded.runtime.js")> | null =
null;
let routeReplyRuntimePromise: Promise<typeof import("./route-reply.runtime.js")> | null = null;
let replyPayloadsRuntimePromise: Promise<typeof import("./reply-payloads.runtime.js")> | null =
null;
function loadPiEmbeddedRuntime() {
piEmbeddedRuntimePromise ??= import("../../agents/pi-embedded.runtime.js");
return piEmbeddedRuntimePromise;
}
function loadRouteReplyRuntime() {
routeReplyRuntimePromise ??= import("./route-reply.runtime.js");
return routeReplyRuntimePromise;
}
function loadReplyPayloadsRuntime() {
replyPayloadsRuntimePromise ??= import("./reply-payloads.runtime.js");
return replyPayloadsRuntimePromise;
}
export function createFollowupRunner(params: {
opts?: GetReplyOptions;
typing: TypingController;
@ -90,7 +77,6 @@ export function createFollowupRunner(params: {
const sendFollowupPayloads = async (payloads: ReplyPayload[], queued: FollowupRun) => {
// Check if we should route to originating channel.
const { originatingChannel, originatingTo } = queued;
const { isRoutableChannel, routeReply } = await loadRouteReplyRuntime();
const shouldRouteToOriginating = isRoutableChannel(originatingChannel) && originatingTo;
if (!shouldRouteToOriginating && !opts?.onBlockReply) {
@ -162,47 +148,8 @@ export function createFollowupRunner(params: {
isControlUiVisible: shouldSurfaceToControlUi,
});
}
const replyToChannel = resolveOriginMessageProvider({
originatingChannel: queued.originatingChannel,
provider: queued.run.messageProvider,
}) as OriginatingChannelType | undefined;
const replyToMode = resolveReplyToMode(
queued.run.config,
replyToChannel,
queued.originatingAccountId,
queued.originatingChatType,
);
const currentMessageId = queued.messageId?.trim() || undefined;
const applyFollowupReplyThreading = async (payloads: ReplyPayload[]) => {
const { applyReplyThreading } = await loadReplyPayloadsRuntime();
return applyReplyThreading({
payloads,
replyToMode,
replyToChannel,
currentMessageId,
});
};
const sendCompactionNotice = async (text: string) => {
try {
const noticePayloads = await applyFollowupReplyThreading([
{
text,
replyToCurrent: true,
isCompactionNotice: true,
},
]);
if (noticePayloads.length === 0) {
return;
}
await sendFollowupPayloads(noticePayloads, queued);
} catch (err) {
logVerbose(`followup queue: compaction notice failed (non-fatal): ${String(err)}`);
}
};
let autoCompactionCount = 0;
let runResult: Awaited<
ReturnType<typeof import("../../agents/pi-embedded.runtime.js").runEmbeddedPiAgent>
>;
let runResult: Awaited<ReturnType<typeof runEmbeddedPiAgent>>;
let fallbackProvider = queued.run.provider;
let fallbackModel = queued.run.model;
const activeSessionEntry =
@ -226,7 +173,6 @@ export function createFollowupRunner(params: {
const authProfile = resolveRunAuthProfile(queued.run, provider);
let attemptCompactionCount = 0;
try {
const { runEmbeddedPiAgent } = await loadPiEmbeddedRuntime();
const result = await runEmbeddedPiAgent({
allowGatewaySubagentBinding: true,
sessionId: queued.run.sessionId,
@ -278,14 +224,11 @@ export function createFollowupRunner(params: {
bootstrapPromptWarningSignaturesSeen[
bootstrapPromptWarningSignaturesSeen.length - 1
],
onAgentEvent: (evt: { stream: string; data?: Record<string, unknown> }) => {
onAgentEvent: (evt) => {
if (evt.stream !== "compaction") {
return;
}
const phase = typeof evt.data?.phase === "string" ? evt.data.phase : "";
if (phase === "start") {
void sendCompactionNotice("🧹 Compacting context...");
}
const phase = typeof evt.data.phase === "string" ? evt.data.phase : "";
const completed = evt.data?.completed === true;
if (phase === "end" && completed) {
attemptCompactionCount += 1;
@ -318,15 +261,9 @@ export function createFollowupRunner(params: {
const usage = runResult.meta?.agentMeta?.usage;
const promptTokens = runResult.meta?.agentMeta?.promptTokens;
const modelUsed = runResult.meta?.agentMeta?.model ?? fallbackModel ?? defaultModel;
const cachedContextTokens = lookupCachedContextTokens(modelUsed);
const lazyContextTokens =
agentCfgContextTokens == null && cachedContextTokens == null
? lookupContextTokens(modelUsed, { allowAsyncLoad: false })
: undefined;
const contextTokensUsed =
agentCfgContextTokens ??
cachedContextTokens ??
lazyContextTokens ??
lookupContextTokens(modelUsed) ??
sessionEntry?.contextTokens ??
DEFAULT_CONTEXT_TOKENS;
@ -347,7 +284,10 @@ export function createFollowupRunner(params: {
}
const payloadArray = runResult.payloads ?? [];
const sanitizedPayloads = payloadArray.flatMap((payload: ReplyPayload) => {
if (payloadArray.length === 0) {
return;
}
const sanitizedPayloads = payloadArray.flatMap((payload) => {
const text = payload.text;
if (!text || !text.includes("HEARTBEAT_OK")) {
return [payload];
@ -359,13 +299,23 @@ export function createFollowupRunner(params: {
}
return [{ ...payload, text: stripped.text }];
});
const replyTaggedPayloads = await applyFollowupReplyThreading(sanitizedPayloads);
const replyToChannel = resolveOriginMessageProvider({
originatingChannel: queued.originatingChannel,
provider: queued.run.messageProvider,
}) as OriginatingChannelType | undefined;
const replyToMode = resolveReplyToMode(
queued.run.config,
replyToChannel,
queued.originatingAccountId,
queued.originatingChatType,
);
const replyTaggedPayloads: ReplyPayload[] = applyReplyThreading({
payloads: sanitizedPayloads,
replyToMode,
replyToChannel,
});
const {
filterMessagingToolDuplicates,
filterMessagingToolMediaDuplicates,
shouldSuppressMessagingToolReplies,
} = await loadReplyPayloadsRuntime();
const dedupedPayloads = filterMessagingToolDuplicates({
payloads: replyTaggedPayloads,
sentTexts: runResult.messagingToolSentTexts ?? [],
@ -388,9 +338,14 @@ export function createFollowupRunner(params: {
accountId: queued.run.agentAccountId,
}),
});
let finalPayloads = suppressMessagingToolReplies ? [] : mediaFilteredPayloads;
const finalPayloads = suppressMessagingToolReplies ? [] : mediaFilteredPayloads;
if (finalPayloads.length === 0) {
return;
}
if (autoCompactionCount > 0) {
const previousSessionId = queued.run.sessionId;
const count = await incrementRunCompactionCount({
sessionEntry,
sessionStore,
@ -399,26 +354,27 @@ export function createFollowupRunner(params: {
amount: autoCompactionCount,
lastCallUsage: runResult.meta?.agentMeta?.lastCallUsage,
contextTokensUsed,
newSessionId: runResult.meta?.agentMeta?.sessionId,
});
const suffix = typeof count === "number" ? ` (count ${count})` : "";
const completionText =
queued.run.verboseLevel && queued.run.verboseLevel !== "off"
? `🧹 Auto-compaction complete${suffix}.`
: `✅ Context compacted${suffix}.`;
finalPayloads = [
...(await applyFollowupReplyThreading([
{
text: completionText,
replyToCurrent: true,
isCompactionNotice: true,
},
])),
...finalPayloads,
];
}
if (finalPayloads.length === 0) {
return;
const refreshedSessionEntry =
sessionKey && sessionStore ? sessionStore[sessionKey] : undefined;
if (refreshedSessionEntry) {
const queueKey = queued.run.sessionKey ?? sessionKey;
if (queueKey) {
refreshQueuedFollowupSession({
key: queueKey,
previousSessionId,
nextSessionId: refreshedSessionEntry.sessionId,
nextSessionFile: refreshedSessionEntry.sessionFile,
});
}
}
if (queued.run.verboseLevel && queued.run.verboseLevel !== "off") {
const suffix = typeof count === "number" ? ` (count ${count})` : "";
finalPayloads.unshift({
text: `🧹 Auto-compaction complete${suffix}.`,
});
}
}
await sendFollowupPayloads(finalPayloads, queued);

View File

@ -8,7 +8,7 @@ export {
resetRecentQueuedMessageIdDedupe,
} from "./queue/enqueue.js";
export { resolveQueueSettings } from "./queue/settings.js";
export { clearFollowupQueue } from "./queue/state.js";
export { clearFollowupQueue, refreshQueuedFollowupSession } from "./queue/state.js";
export type {
FollowupRun,
QueueDedupeMode,

View File

@ -85,3 +85,37 @@ export function clearFollowupQueue(key: string): number {
FOLLOWUP_QUEUES.delete(cleaned);
return cleared;
}
/**
 * Repoints queued followup runs from a rotated (post-compaction) session to
 * the new transcript, so later queue drains do not resume against a stale
 * session id.
 *
 * No-ops when the key is blank, either session id is missing, the session id
 * did not actually change, or no queue exists for the key. Only runs still
 * referencing `previousSessionId` are rewritten; `nextSessionFile` is applied
 * only when non-blank.
 *
 * @param params.key queue key (trimmed before lookup).
 * @param params.previousSessionId session id queued runs were created against.
 * @param params.nextSessionId session id produced by the rotation.
 * @param params.nextSessionFile transcript path for the rotated session.
 */
export function refreshQueuedFollowupSession(params: {
  key: string;
  previousSessionId?: string;
  nextSessionId?: string;
  nextSessionFile?: string;
}): void {
  // Destructure to consts so the narrowing from the guards below survives
  // inside the closure — this removes the need for non-null assertions.
  const { previousSessionId, nextSessionId, nextSessionFile } = params;
  const cleaned = params.key.trim();
  if (!cleaned || !previousSessionId || !nextSessionId) {
    return;
  }
  if (previousSessionId === nextSessionId) {
    return; // Nothing rotated — leave queue state untouched.
  }
  const queue = getExistingFollowupQueue(cleaned);
  if (!queue) {
    return;
  }
  const rewriteRun = (run?: FollowupRun["run"]) => {
    if (!run || run.sessionId !== previousSessionId) {
      return;
    }
    run.sessionId = nextSessionId;
    if (nextSessionFile?.trim()) {
      run.sessionFile = nextSessionFile;
    }
  };
  rewriteRun(queue.lastRun);
  for (const item of queue.items) {
    rewriteRun(item.run);
  }
}

View File

@ -445,6 +445,93 @@ describe("incrementCompactionCount", () => {
expect(stored[sessionKey].outputTokens).toBeUndefined();
});
// When compaction rotates the transcript (a newSessionId is reported), the
// stored entry must be repointed at the new session id AND its sessionFile
// must be rewritten to the matching transcript path.
it("updates sessionId and sessionFile when compaction rotated transcripts", async () => {
const tmp = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-compact-rotate-"));
tempDirs.push(tmp);
const storePath = path.join(tmp, "sessions.json");
const sessionKey = "main";
const entry = {
sessionId: "s1",
// Filename carries a suffix after the id; the rewrite should keep it.
sessionFile: path.join(tmp, "s1-topic-456.jsonl"),
updatedAt: Date.now(),
compactionCount: 0,
} as SessionEntry;
const sessionStore: Record<string, SessionEntry> = { [sessionKey]: entry };
await seedSessionStore({ storePath, sessionKey, entry });
await incrementCompactionCount({
sessionEntry: entry,
sessionStore,
sessionKey,
storePath,
newSessionId: "s2",
});
const stored = JSON.parse(await fs.readFile(storePath, "utf-8"));
// realpath: tmpdir may be a symlink, so compare against the resolved dir.
const expectedDir = await fs.realpath(tmp);
expect(stored[sessionKey].sessionId).toBe("s2");
expect(stored[sessionKey].sessionFile).toBe(path.join(expectedDir, "s2-topic-456.jsonl"));
});
// Fork transcripts embed the session id mid-filename (timestamp prefix);
// rotation must substitute only the id portion and keep the prefix intact.
it("preserves fork transcript filenames when compaction rotates transcripts", async () => {
const tmp = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-compact-fork-"));
tempDirs.push(tmp);
const storePath = path.join(tmp, "sessions.json");
const sessionKey = "main";
const entry = {
sessionId: "s1",
// Timestamped fork-style filename with the id as the trailing segment.
sessionFile: path.join(tmp, "2026-03-23T12-34-56-789Z_s1.jsonl"),
updatedAt: Date.now(),
compactionCount: 0,
} as SessionEntry;
const sessionStore: Record<string, SessionEntry> = { [sessionKey]: entry };
await seedSessionStore({ storePath, sessionKey, entry });
await incrementCompactionCount({
sessionEntry: entry,
sessionStore,
sessionKey,
storePath,
newSessionId: "s2",
});
const stored = JSON.parse(await fs.readFile(storePath, "utf-8"));
// realpath: tmpdir may be a symlink, so compare against the resolved dir.
const expectedDir = await fs.realpath(tmp);
expect(stored[sessionKey].sessionId).toBe("s2");
expect(stored[sessionKey].sessionFile).toBe(
path.join(expectedDir, "2026-03-23T12-34-56-789Z_s2.jsonl"),
);
});
// If rewriting the stored absolute sessionFile would land outside the safe
// session directory, the update must fall back to the canonical derived path
// (<session-dir>/<newSessionId>.jsonl) instead of trusting the unsafe path.
it("falls back to the derived transcript path when rewritten absolute sessionFile is unsafe", async () => {
const tmp = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-compact-unsafe-"));
tempDirs.push(tmp);
const storePath = path.join(tmp, "sessions.json");
const sessionKey = "main";
// Deliberately points into a subdirectory that is not the session dir.
const unsafePath = path.join(tmp, "outside", "s1.jsonl");
const entry = {
sessionId: "s1",
sessionFile: unsafePath,
updatedAt: Date.now(),
compactionCount: 0,
} as SessionEntry;
const sessionStore: Record<string, SessionEntry> = { [sessionKey]: entry };
await seedSessionStore({ storePath, sessionKey, entry });
await incrementCompactionCount({
sessionEntry: entry,
sessionStore,
sessionKey,
storePath,
newSessionId: "s2",
});
const stored = JSON.parse(await fs.readFile(storePath, "utf-8"));
// realpath: tmpdir may be a symlink, so compare against the resolved dir.
const expectedDir = await fs.realpath(tmp);
expect(stored[sessionKey].sessionId).toBe("s2");
expect(stored[sessionKey].sessionFile).toBe(path.join(expectedDir, "s2.jsonl"));
});
it("increments compaction count by an explicit amount", async () => {
const entry = { sessionId: "s1", updatedAt: Date.now(), compactionCount: 2 } as SessionEntry;
const { storePath, sessionKey, sessionStore } = await createCompactionSessionFixture(entry);
@ -462,6 +549,55 @@ describe("incrementCompactionCount", () => {
expect(stored[sessionKey].compactionCount).toBe(4);
});
it("updates sessionId and sessionFile when newSessionId is provided", async () => {
const entry = {
sessionId: "old-session-id",
sessionFile: "old-session-id.jsonl",
updatedAt: Date.now(),
compactionCount: 1,
} as SessionEntry;
const { storePath, sessionKey, sessionStore } = await createCompactionSessionFixture(entry);
await incrementCompactionCount({
sessionEntry: entry,
sessionStore,
sessionKey,
storePath,
newSessionId: "new-session-id",
});
const stored = JSON.parse(await fs.readFile(storePath, "utf-8"));
const expectedSessionDir = await fs.realpath(path.dirname(storePath));
expect(stored[sessionKey].sessionId).toBe("new-session-id");
expect(stored[sessionKey].sessionFile).toBe(
path.join(expectedSessionDir, "new-session-id.jsonl"),
);
expect(stored[sessionKey].compactionCount).toBe(2);
});
it("does not update sessionFile when newSessionId matches current sessionId", async () => {
const entry = {
sessionId: "same-id",
sessionFile: "same-id.jsonl",
updatedAt: Date.now(),
compactionCount: 0,
} as SessionEntry;
const { storePath, sessionKey, sessionStore } = await createCompactionSessionFixture(entry);
await incrementCompactionCount({
sessionEntry: entry,
sessionStore,
sessionKey,
storePath,
newSessionId: "same-id",
});
const stored = JSON.parse(await fs.readFile(storePath, "utf-8"));
expect(stored[sessionKey].sessionId).toBe("same-id");
expect(stored[sessionKey].sessionFile).toBe("same-id.jsonl");
expect(stored[sessionKey].compactionCount).toBe(1);
});
it("does not update totalTokens when tokensAfter is not provided", async () => {
const entry = {
sessionId: "s1",

View File

@ -11,6 +11,7 @@ type IncrementRunCompactionCountParams = Omit<
amount?: number;
lastCallUsage?: NormalizedUsage;
contextTokensUsed?: number;
newSessionId?: string;
};
export async function persistRunSessionUsage(params: PersistRunSessionUsageParams): Promise<void> {
@ -33,5 +34,6 @@ export async function incrementRunCompactionCount(
storePath: params.storePath,
amount: params.amount,
tokensAfter: tokensAfterCompaction,
newSessionId: params.newSessionId,
});
}

View File

@ -1,10 +1,129 @@
import crypto from "node:crypto";
import fs from "node:fs";
import path from "node:path";
import { resolveUserTimezone } from "../../agents/date-time.js";
import { buildWorkspaceSkillSnapshot } from "../../agents/skills.js";
import { ensureSkillsWatcher, getSkillsSnapshotVersion } from "../../agents/skills/refresh.js";
import type { OpenClawConfig } from "../../config/config.js";
import { type SessionEntry, updateSessionStore } from "../../config/sessions.js";
import {
resolveSessionFilePath,
resolveSessionFilePathOptions,
type SessionEntry,
updateSessionStore,
} from "../../config/sessions.js";
import { buildChannelSummary } from "../../infra/channel-summary.js";
import {
resolveTimezone,
formatUtcTimestamp,
formatZonedTimestamp,
} from "../../infra/format-time/format-datetime.ts";
import { getRemoteSkillEligibility } from "../../infra/skills-remote.js";
export { drainFormattedSystemEvents } from "./session-system-events.js";
import { drainSystemEventEntries } from "../../infra/system-events.js";
import { resolveAgentIdFromSessionKey } from "../../routing/session-key.js";
/**
 * Drain queued system events, format as `System:` lines, return the block (or undefined).
 *
 * Events are pulled from the per-session queue (drainSystemEventEntries), filtered
 * for heartbeat/periodic noise, and timestamped per the configured envelope
 * timezone. On a new main session, a channel summary is prepended first.
 * Returns undefined when nothing survives filtering.
 */
export async function drainFormattedSystemEvents(params: {
  cfg: OpenClawConfig;
  sessionKey: string;
  isMainSession: boolean;
  isNewSession: boolean;
}): Promise<string | undefined> {
  // Normalize a single raw event line; returns null for noise that should not
  // reach the model (blank lines, periodic reasons, heartbeat prompts/polls).
  const compactSystemEvent = (line: string): string | null => {
    const trimmed = line.trim();
    if (!trimmed) {
      return null;
    }
    const lower = trimmed.toLowerCase();
    if (lower.includes("reason periodic")) {
      return null;
    }
    // Filter out the actual heartbeat prompt, but not cron jobs that mention "heartbeat"
    // The heartbeat prompt starts with "Read HEARTBEAT.md" - cron payloads won't match this
    if (lower.startsWith("read heartbeat.md")) {
      return null;
    }
    // Also filter heartbeat poll/wake noise
    if (lower.includes("heartbeat poll") || lower.includes("heartbeat wake")) {
      return null;
    }
    // "Node:" status lines keep their text minus the volatile "last input" segment.
    if (trimmed.startsWith("Node:")) {
      return trimmed.replace(/ · last input [^·]+/i, "").trim();
    }
    return trimmed;
  };
  // Map the configured envelopeTimezone string to a formatting mode:
  // unset → local; "utc"/"gmt" → UTC; "local"/"host" → local; "user" → the
  // user's resolved IANA zone; anything else → resolveTimezone, falling back
  // to local when the value isn't a recognizable zone.
  const resolveSystemEventTimezone = (cfg: OpenClawConfig) => {
    const raw = cfg.agents?.defaults?.envelopeTimezone?.trim();
    if (!raw) {
      return { mode: "local" as const };
    }
    const lowered = raw.toLowerCase();
    if (lowered === "utc" || lowered === "gmt") {
      return { mode: "utc" as const };
    }
    if (lowered === "local" || lowered === "host") {
      return { mode: "local" as const };
    }
    if (lowered === "user") {
      return {
        mode: "iana" as const,
        timeZone: resolveUserTimezone(cfg.agents?.defaults?.userTimezone),
      };
    }
    const explicit = resolveTimezone(raw);
    return explicit ? { mode: "iana" as const, timeZone: explicit } : { mode: "local" as const };
  };
  // Render an event timestamp in the resolved zone; invalid or unformattable
  // timestamps degrade to the literal "unknown-time".
  const formatSystemEventTimestamp = (ts: number, cfg: OpenClawConfig) => {
    const date = new Date(ts);
    if (Number.isNaN(date.getTime())) {
      return "unknown-time";
    }
    const zone = resolveSystemEventTimezone(cfg);
    if (zone.mode === "utc") {
      return formatUtcTimestamp(date, { displaySeconds: true });
    }
    if (zone.mode === "local") {
      return formatZonedTimestamp(date, { displaySeconds: true }) ?? "unknown-time";
    }
    return (
      formatZonedTimestamp(date, { timeZone: zone.timeZone, displaySeconds: true }) ??
      "unknown-time"
    );
  };
  const systemLines: string[] = [];
  // Draining consumes the queue — events are emitted at most once per session key.
  const queued = drainSystemEventEntries(params.sessionKey);
  systemLines.push(
    ...queued
      .map((event) => {
        const compacted = compactSystemEvent(event.text);
        if (!compacted) {
          return null;
        }
        return `[${formatSystemEventTimestamp(event.ts, params.cfg)}] ${compacted}`;
      })
      .filter((v): v is string => Boolean(v)),
  );
  // A brand-new main session gets the channel summary ahead of queued events.
  if (params.isMainSession && params.isNewSession) {
    const summary = await buildChannelSummary(params.cfg);
    if (summary.length > 0) {
      systemLines.unshift(...summary);
    }
  }
  if (systemLines.length === 0) {
    return undefined;
  }
  // Format events as trusted System: lines for the message timeline.
  // Inbound sanitization rewrites any user-supplied "System:" to "System (untrusted):",
  // so these gateway-originated lines are distinguishable by the model.
  // Each sub-line of a multi-line event gets its own System: prefix so continuation
  // lines can't be mistaken for user content.
  return systemLines
    .flatMap((line) => line.split("\n").map((subline) => `System: ${subline}`))
    .join("\n");
}
async function persistSessionEntryUpdate(params: {
sessionStore?: Record<string, SessionEntry>;
@ -147,6 +266,8 @@ export async function incrementCompactionCount(params: {
amount?: number;
/** Token count after compaction - if provided, updates session token counts */
tokensAfter?: number;
/** Session id after compaction, when the runtime rotated transcripts. */
newSessionId?: string;
}): Promise<number | undefined> {
const {
sessionEntry,
@ -156,6 +277,7 @@ export async function incrementCompactionCount(params: {
now = Date.now(),
amount = 1,
tokensAfter,
newSessionId,
} = params;
if (!sessionStore || !sessionKey) {
return undefined;
@ -171,6 +293,15 @@ export async function incrementCompactionCount(params: {
compactionCount: nextCount,
updatedAt: now,
};
if (newSessionId && newSessionId !== entry.sessionId) {
updates.sessionId = newSessionId;
updates.sessionFile = resolveCompactionSessionFile({
entry,
sessionKey,
storePath,
newSessionId,
});
}
// If tokensAfter is provided, update the cached token counts to reflect post-compaction state
if (tokensAfter != null && tokensAfter > 0) {
updates.totalTokens = tokensAfter;
@ -195,3 +326,72 @@ export async function incrementCompactionCount(params: {
}
return nextCount;
}
/**
 * Derive the transcript path for a compaction-rotated session id, reusing the
 * previous entry's generated filename pattern (plain/topic/fork) when possible
 * and otherwise letting resolveSessionFilePath synthesize the default path.
 */
function resolveCompactionSessionFile(params: {
  entry: SessionEntry;
  sessionKey: string;
  storePath?: string;
  newSessionId: string;
}): string {
  const pathOpts = resolveSessionFilePathOptions({
    agentId: resolveAgentIdFromSessionKey(params.sessionKey),
    storePath: params.storePath,
  });
  let rewritten = rewriteSessionFileForNewSessionId({
    sessionFile: params.entry.sessionFile,
    previousSessionId: params.entry.sessionId,
    nextSessionId: params.newSessionId,
  });
  // Canonicalize absolute paths (realpath of the parent dir) before resolution.
  if (rewritten && path.isAbsolute(rewritten)) {
    rewritten = canonicalizeAbsoluteSessionFilePath(rewritten);
  }
  return resolveSessionFilePath(
    params.newSessionId,
    rewritten ? { sessionFile: rewritten } : undefined,
    pathOpts,
  );
}
/**
 * Resolve the parent directory of an absolute transcript path through
 * realpath (so symlinked session dirs compare equal) while keeping the
 * basename as-is. Falls back to the plain resolved path when the parent
 * directory cannot be realpath'd (e.g. it does not exist yet).
 */
function canonicalizeAbsoluteSessionFilePath(filePath: string): string {
  const absolute = path.resolve(filePath);
  const dir = path.dirname(absolute);
  const name = path.basename(absolute);
  try {
    return path.join(fs.realpathSync(dir), name);
  } catch {
    // Parent directory is missing or unreadable; keep the resolved path.
    return absolute;
  }
}
/**
 * Map a transcript filename that was generated from the previous session id
 * onto the new session id, preserving "-topic-" suffixes and fork timestamp
 * prefixes. Returns undefined when the filename does not follow one of the
 * generated patterns (custom filenames are left to the caller's fallback).
 */
function rewriteSessionFileForNewSessionId(params: {
  sessionFile?: string;
  previousSessionId: string;
  nextSessionId: string;
}): string | undefined {
  const { previousSessionId, nextSessionId } = params;
  const sessionFile = params.sessionFile?.trim();
  if (!sessionFile) {
    return undefined;
  }
  const dir = path.dirname(sessionFile);
  const base = path.basename(sessionFile);
  if (!base.endsWith(".jsonl")) {
    return undefined;
  }
  const stem = base.slice(0, -".jsonl".length);
  // Plain generated transcript: "<sessionId>.jsonl".
  if (stem === previousSessionId) {
    return path.join(dir, `${nextSessionId}.jsonl`);
  }
  // Topic transcript: "<sessionId>-topic-<slug>.jsonl" — keep the suffix verbatim.
  if (stem.startsWith(`${previousSessionId}-topic-`)) {
    return path.join(dir, `${nextSessionId}${base.slice(previousSessionId.length)}`);
  }
  // Fork transcript: "<timestamp>_<sessionId>.jsonl" — keep the timestamp prefix.
  const forkMatch = stem.match(/^(\d{4}-\d{2}-\d{2}T[\w-]+(?:Z|[+-]\d{2}(?:-\d{2})?)?)_(.+)$/);
  if (forkMatch?.[2] === previousSessionId) {
    return path.join(dir, `${forkMatch[1]}_${nextSessionId}.jsonl`);
  }
  return undefined;
}

View File

@ -556,39 +556,6 @@ describe("readSessionMessages", () => {
expect((out[0] as { __openclaw?: { seq?: number } }).__openclaw?.seq).toBe(1);
}
});
test("preserves raw assistant transcript content on disk reads", () => {
const sessionId = "assistant-scaffolding";
const transcriptPath = path.join(tmpDir, `${sessionId}.jsonl`);
fs.writeFileSync(
transcriptPath,
[
JSON.stringify({ type: "session", version: 1, id: sessionId }),
JSON.stringify({
message: {
role: "assistant",
text: "<think>hidden</think>Visible top-level",
content: [
{ type: "text", text: "<think>secret</think>Visible content" },
{ type: "tool_result", text: "<think>keep?</think>Visible tool text" },
],
},
}),
].join("\n"),
"utf-8",
);
const out = readSessionMessages(sessionId, storePath);
expect(out).toHaveLength(1);
expect(out[0]).toMatchObject({
role: "assistant",
text: "<think>hidden</think>Visible top-level",
content: [
{ type: "text", text: "<think>secret</think>Visible content" },
{ type: "tool_result", text: "<think>keep?</think>Visible tool text" },
],
});
});
});
describe("readSessionPreviewItemsFromTranscript", () => {
@ -898,6 +865,78 @@ describe("resolveSessionTranscriptCandidates safety", () => {
expect(candidates.some((value) => value.includes("etc/passwd"))).toBe(false);
expect(normalizedCandidates).toContain(expectedFallback);
});
test("prefers the current sessionId transcript before a stale sessionFile candidate", () => {
const storePath = "/tmp/openclaw/agents/main/sessions/sessions.json";
const candidates = resolveSessionTranscriptCandidates(
"11111111-1111-4111-8111-111111111111",
storePath,
"/tmp/openclaw/agents/main/sessions/22222222-2222-4222-8222-222222222222.jsonl",
);
expect(candidates[0]).toBe(
path.resolve("/tmp/openclaw/agents/main/sessions/11111111-1111-4111-8111-111111111111.jsonl"),
);
expect(candidates).toContain(
path.resolve("/tmp/openclaw/agents/main/sessions/22222222-2222-4222-8222-222222222222.jsonl"),
);
});
test("keeps explicit custom sessionFile ahead of synthesized fallback", () => {
const storePath = "/tmp/openclaw/agents/main/sessions/sessions.json";
const sessionFile = "/tmp/openclaw/agents/main/sessions/custom-transcript.jsonl";
const candidates = resolveSessionTranscriptCandidates(
"11111111-1111-4111-8111-111111111111",
storePath,
sessionFile,
);
expect(candidates[0]).toBe(path.resolve(sessionFile));
});
test("keeps custom topic-like transcript paths ahead of synthesized fallback", () => {
const storePath = "/tmp/openclaw/agents/main/sessions/sessions.json";
const sessionFile = "/tmp/openclaw/agents/main/sessions/custom-topic-notes.jsonl";
const candidates = resolveSessionTranscriptCandidates(
"11111111-1111-4111-8111-111111111111",
storePath,
sessionFile,
);
expect(candidates[0]).toBe(path.resolve(sessionFile));
});
test("keeps forked transcript paths ahead of synthesized fallback", () => {
const storePath = "/tmp/openclaw/agents/main/sessions/sessions.json";
const sessionId = "11111111-1111-4111-8111-111111111111";
const sessionFile =
"/tmp/openclaw/agents/main/sessions/2026-03-23T16-30-00-000Z_11111111-1111-4111-8111-111111111111.jsonl";
const candidates = resolveSessionTranscriptCandidates(sessionId, storePath, sessionFile);
expect(candidates[0]).toBe(path.resolve(sessionFile));
});
test("keeps timestamped custom transcript paths ahead of synthesized fallback", () => {
const storePath = "/tmp/openclaw/agents/main/sessions/sessions.json";
const sessionId = "11111111-1111-4111-8111-111111111111";
const sessionFile = "/tmp/openclaw/agents/main/sessions/2026-03-23T16-30-00-000Z_notes.jsonl";
const candidates = resolveSessionTranscriptCandidates(sessionId, storePath, sessionFile);
expect(candidates[0]).toBe(path.resolve(sessionFile));
});
test("still treats generated topic transcripts from another session as stale", () => {
const storePath = "/tmp/openclaw/agents/main/sessions/sessions.json";
const sessionId = "11111111-1111-4111-8111-111111111111";
const staleSessionFile =
"/tmp/openclaw/agents/main/sessions/22222222-2222-4222-8222-222222222222-topic-thread.jsonl";
const candidates = resolveSessionTranscriptCandidates(sessionId, storePath, staleSessionFile);
expect(candidates[0]).toBe(
path.resolve("/tmp/openclaw/agents/main/sessions/11111111-1111-4111-8111-111111111111.jsonl"),
);
expect(candidates).toContain(path.resolve(staleSessionFile));
});
});
describe("archiveSessionTranscripts", () => {

View File

@ -3,16 +3,13 @@ import os from "node:os";
import path from "node:path";
import { deriveSessionTotalTokens, hasNonzeroUsage, normalizeUsage } from "../agents/usage.js";
import {
formatSessionArchiveTimestamp,
parseSessionArchiveTimestamp,
type SessionArchiveReason,
resolveSessionFilePath,
resolveSessionTranscriptPath,
resolveSessionTranscriptPathInDir,
} from "../config/sessions.js";
export {
archiveFileOnDisk,
archiveSessionTranscripts,
cleanupArchivedSessionTranscripts,
type ArchiveFileReason,
} from "../gateway/session-archive.fs.js";
import { resolveRequiredHomeDir } from "../infra/home-dir.js";
import { jsonUtf8Bytes } from "../infra/json-utf8-bytes.js";
import { hasInterSessionUserProvenance } from "../sessions/input-provenance.js";
@ -159,6 +156,7 @@ export function resolveSessionTranscriptCandidates(
agentId?: string,
): string[] {
const candidates: string[] = [];
const sessionFileState = classifySessionTranscriptCandidate(sessionId, sessionFile);
const pushCandidate = (resolve: () => string): void => {
try {
candidates.push(resolve());
@ -169,15 +167,22 @@ export function resolveSessionTranscriptCandidates(
if (storePath) {
const sessionsDir = path.dirname(storePath);
if (sessionFile) {
if (sessionFile && sessionFileState !== "stale") {
pushCandidate(() =>
resolveSessionFilePath(sessionId, { sessionFile }, { sessionsDir, agentId }),
);
}
pushCandidate(() => resolveSessionTranscriptPathInDir(sessionId, sessionsDir));
if (sessionFile && sessionFileState === "stale") {
pushCandidate(() =>
resolveSessionFilePath(sessionId, { sessionFile }, { sessionsDir, agentId }),
);
}
} else if (sessionFile) {
if (agentId) {
pushCandidate(() => resolveSessionFilePath(sessionId, { sessionFile }, { agentId }));
if (sessionFileState !== "stale") {
pushCandidate(() => resolveSessionFilePath(sessionId, { sessionFile }, { agentId }));
}
} else {
const trimmed = sessionFile.trim();
if (trimmed) {
@ -188,6 +193,9 @@ export function resolveSessionTranscriptCandidates(
if (agentId) {
pushCandidate(() => resolveSessionTranscriptPath(sessionId, agentId));
if (sessionFile && sessionFileState === "stale") {
pushCandidate(() => resolveSessionFilePath(sessionId, { sessionFile }, { agentId }));
}
}
const home = resolveRequiredHomeDir(process.env, os.homedir);
@ -197,6 +205,151 @@ export function resolveSessionTranscriptCandidates(
return Array.from(new Set(candidates));
}
export type ArchiveFileReason = SessionArchiveReason;
/**
 * Classify a stored sessionFile against the active sessionId: "current" when
 * its generated filename encodes the same id, "stale" when it encodes a
 * different id, and "custom" when the filename is not runtime-generated.
 */
function classifySessionTranscriptCandidate(
  sessionId: string,
  sessionFile?: string,
): "current" | "stale" | "custom" {
  const encodedId = extractGeneratedTranscriptSessionId(sessionFile);
  if (encodedId === undefined) {
    return "custom";
  }
  return encodedId === sessionId ? "current" : "stale";
}
function extractGeneratedTranscriptSessionId(sessionFile?: string): string | undefined {
const trimmed = sessionFile?.trim();
if (!trimmed) {
return undefined;
}
const base = path.basename(trimmed);
if (!base.endsWith(".jsonl")) {
return undefined;
}
const withoutExt = base.slice(0, -".jsonl".length);
const topicIndex = withoutExt.indexOf("-topic-");
if (topicIndex > 0) {
const topicSessionId = withoutExt.slice(0, topicIndex);
return looksLikeGeneratedSessionId(topicSessionId) ? topicSessionId : undefined;
}
const forkMatch = withoutExt.match(
/^(\d{4}-\d{2}-\d{2}T[\w-]+(?:Z|[+-]\d{2}(?:-\d{2})?)?)_(.+)$/,
);
if (forkMatch?.[2]) {
return looksLikeGeneratedSessionId(forkMatch[2]) ? forkMatch[2] : undefined;
}
if (looksLikeGeneratedSessionId(withoutExt)) {
return withoutExt;
}
return undefined;
}
/** True when the value matches an RFC 4122-style id (versions 1-8), case-insensitively. */
function looksLikeGeneratedSessionId(value: string): boolean {
  const uuidPattern =
    /^[0-9a-f]{8}-[0-9a-f]{4}-[1-8][0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$/i;
  return uuidPattern.test(value);
}
/**
 * Resolve a path and follow symlinks so two pointers to the same file compare
 * equal. Paths that don't exist (realpath throws) fall back to the resolved form.
 */
function canonicalizePathForComparison(filePath: string): string {
  const absolute = path.resolve(filePath);
  try {
    return fs.realpathSync(absolute);
  } catch {
    // Missing path — compare using the plain resolved form.
    return absolute;
  }
}
/** Rename a file to "<path>.<reason>.<timestamp>" and return the archived path. */
export function archiveFileOnDisk(filePath: string, reason: ArchiveFileReason): string {
  const archivedPath = `${filePath}.${reason}.${formatSessionArchiveTimestamp()}`;
  fs.renameSync(filePath, archivedPath);
  return archivedPath;
}
/**
 * Archives all transcript files for a given session.
 * Best-effort: silently skips files that don't exist or fail to rename.
 */
export function archiveSessionTranscripts(opts: {
  sessionId: string;
  storePath: string | undefined;
  sessionFile?: string;
  agentId?: string;
  reason: "reset" | "deleted";
  /**
   * When true, only archive files resolved under the session store directory.
   * This prevents maintenance operations from mutating paths outside the agent sessions dir.
   */
  restrictToStoreDir?: boolean;
}): string[] {
  const archivedPaths: string[] = [];
  const allowedDir =
    opts.restrictToStoreDir && opts.storePath
      ? canonicalizePathForComparison(path.dirname(opts.storePath))
      : null;
  const candidates = resolveSessionTranscriptCandidates(
    opts.sessionId,
    opts.storePath,
    opts.sessionFile,
    opts.agentId,
  );
  for (const candidate of candidates) {
    const target = canonicalizePathForComparison(candidate);
    if (allowedDir) {
      // Skip the store dir itself, anything above it, and absolute escapes.
      const rel = path.relative(allowedDir, target);
      if (!rel || rel.startsWith("..") || path.isAbsolute(rel)) {
        continue;
      }
    }
    if (!fs.existsSync(target)) {
      continue;
    }
    try {
      archivedPaths.push(archiveFileOnDisk(target, opts.reason));
    } catch {
      // Best-effort.
    }
  }
  return archivedPaths;
}
/**
 * Remove archived transcript files older than `olderThanMs` from the given
 * directories. Only entries whose name parses as an archive of `reason`
 * (default "deleted") are considered. Returns counts of matching entries
 * scanned and files removed. Invalid/negative `olderThanMs` is a no-op.
 */
export async function cleanupArchivedSessionTranscripts(opts: {
  directories: string[];
  olderThanMs: number;
  reason?: ArchiveFileReason;
  nowMs?: number;
}): Promise<{ removed: number; scanned: number }> {
  if (!Number.isFinite(opts.olderThanMs) || opts.olderThanMs < 0) {
    return { removed: 0, scanned: 0 };
  }
  const reason: ArchiveFileReason = opts.reason ?? "deleted";
  const referenceNow = opts.nowMs ?? Date.now();
  const uniqueDirs = [...new Set(opts.directories.map((dir) => path.resolve(dir)))];
  let scanned = 0;
  let removed = 0;
  for (const dir of uniqueDirs) {
    // Unreadable directories are treated as empty (best-effort cleanup).
    const entries = await fs.promises.readdir(dir).catch(() => []);
    for (const entry of entries) {
      const archivedAt = parseSessionArchiveTimestamp(entry, reason);
      if (archivedAt == null) {
        continue;
      }
      scanned += 1;
      if (referenceNow - archivedAt <= opts.olderThanMs) {
        continue;
      }
      const fullPath = path.join(dir, entry);
      // Only delete regular files; skip anything else (or stat failures).
      const stat = await fs.promises.stat(fullPath).catch(() => null);
      if (!stat?.isFile()) {
        continue;
      }
      await fs.promises.rm(fullPath).catch(() => undefined);
      removed += 1;
    }
  }
  return { removed, scanned };
}
export function capArrayByJsonBytes<T>(
items: T[],
maxBytes: number,