diff --git a/src/auto-reply/reply/agent-runner-memory.ts b/src/auto-reply/reply/agent-runner-memory.ts index 78cba1d6d44..d3a1df8e06c 100644 --- a/src/auto-reply/reply/agent-runner-memory.ts +++ b/src/auto-reply/reply/agent-runner-memory.ts @@ -37,9 +37,8 @@ import { resolveMemoryFlushPromptForRun, resolveMemoryFlushSettings, shouldRunMemoryFlush, - computeContextHash, } from "./memory-flush.js"; -import type { FollowupRun } from "./queue.js"; +import { refreshQueuedFollowupSession, type FollowupRun } from "./queue.js"; import { incrementCompactionCount } from "./session-updates.js"; export function estimatePromptTokensForMemoryFlush(prompt?: string): number | undefined { @@ -448,47 +447,6 @@ export async function runMemoryFlushIfNeeded(params: { return entry ?? params.sessionEntry; } - // --- Content hash dedup (state-based) --- - // Read the tail of the session transcript and compute a lightweight hash. - // If the hash matches the last flush, the context hasn't materially changed - // and flushing again would produce duplicate memory entries (#30115). - const sessionFilePath = await resolveSessionFilePathForFlush( - params.followupRun.run.sessionId, - entry ?? params.sessionEntry, - params.storePath, - params.sessionKey ? resolveAgentIdFromSessionKey(params.sessionKey) : undefined, - ); - let contextHashBeforeFlush: string | undefined; - if (sessionFilePath) { - try { - const tailMessages = await readTranscriptTailMessages(sessionFilePath, 10); - // Include the pending prompt in the hash — runMemoryFlushIfNeeded runs - // before the current prompt is appended to the transcript, so the - // persisted tail alone would match the post-flush hash and incorrectly - // skip the next flush even when a new user message arrived. - const currentPrompt = params.followupRun.prompt; - if (currentPrompt) { - tailMessages.push({ role: "user", content: currentPrompt }); - } - if (tailMessages.length === 0) { - logVerbose( - `memoryFlush dedup skipped (no tail messages extracted): sessionKey=${params.sessionKey}`, - ); - } - contextHashBeforeFlush = - tailMessages.length > 0 ? computeContextHash(tailMessages) : undefined; - const previousHash = entry?.memoryFlushContextHash; - if (previousHash && contextHashBeforeFlush === previousHash) { - logVerbose( - `memoryFlush skipped (context hash unchanged): sessionKey=${params.sessionKey} hash=${contextHashBeforeFlush}`, - ); - return entry ?? params.sessionEntry; - } - } catch (err) { - logVerbose(`memoryFlush hash check failed, proceeding with flush: ${String(err)}`); - } - } - logVerbose( `memoryFlush triggered: sessionKey=${params.sessionKey} tokenCount=${tokenCountForFlush ?? "undefined"} threshold=${flushThreshold}`, ); @@ -507,7 +465,6 @@ export async function runMemoryFlushIfNeeded(params: { }); } let memoryCompactionCompleted = false; - let fallbackFlushAttemptedForCurrentHash = false; const memoryFlushNowMs = Date.now(); const memoryFlushWritePath = resolveMemoryFlushRelativePathForRun({ cfg: params.cfg, @@ -519,21 +476,12 @@ export async function runMemoryFlushIfNeeded(params: { ] .filter(Boolean) .join("\n\n"); + let postCompactionSessionId: string | undefined; try { await runWithModelFallback({ ...resolveModelFallbackOptions(params.followupRun.run), runId: flushRunId, run: async (provider, model, runOptions) => { - if (contextHashBeforeFlush && fallbackFlushAttemptedForCurrentHash) { - logVerbose( - `memoryFlush fallback candidate skipped (context hash already attempted): sessionKey=${params.sessionKey} hash=${contextHashBeforeFlush} provider=${provider} model=${model}`, - ); - // A prior candidate already attempted this exact flush context. Be - // conservative and skip later candidates so a write-then-throw failure - // cannot append the same memory twice during a single fallback cycle. - return { payloads: [], meta: {} }; - } - fallbackFlushAttemptedForCurrentHash = Boolean(contextHashBeforeFlush); const { embeddedContext, senderContext, runBaseParams } = buildEmbeddedRunExecutionParams({ run: params.followupRun.run, sessionCtx: params.sessionCtx, @@ -562,12 +510,15 @@ export async function runMemoryFlushIfNeeded(params: { onAgentEvent: (evt) => { if (evt.stream === "compaction") { const phase = typeof evt.data.phase === "string" ? evt.data.phase : ""; - if (phase === "end" && evt.data.completed === true) { + if (phase === "end") { memoryCompactionCompleted = true; } } }, }); + if (result.meta?.agentMeta?.sessionId) { + postCompactionSessionId = result.meta.agentMeta.sessionId; + } bootstrapPromptWarningSignaturesSeen = resolveBootstrapWarningSignaturesSeen( result.meta?.systemPromptReport, ); @@ -579,45 +530,51 @@ export async function runMemoryFlushIfNeeded(params: { (params.sessionKey ? activeSessionStore?.[params.sessionKey]?.compactionCount : 0) ?? 0; if (memoryCompactionCompleted) { + const previousSessionId = activeSessionEntry?.sessionId ?? params.followupRun.run.sessionId; const nextCount = await incrementCompactionCount({ sessionEntry: activeSessionEntry, sessionStore: activeSessionStore, sessionKey: params.sessionKey, storePath: params.storePath, + newSessionId: postCompactionSessionId, }); + const updatedEntry = params.sessionKey ? activeSessionStore?.[params.sessionKey] : undefined; + if (updatedEntry) { + activeSessionEntry = updatedEntry; + params.followupRun.run.sessionId = updatedEntry.sessionId; + if (updatedEntry.sessionFile) { + params.followupRun.run.sessionFile = updatedEntry.sessionFile; + } + const queueKey = params.followupRun.run.sessionKey ?? params.sessionKey; + if (queueKey) { + refreshQueuedFollowupSession({ + key: queueKey, + previousSessionId, + nextSessionId: updatedEntry.sessionId, + nextSessionFile: updatedEntry.sessionFile, + }); + } + } if (typeof nextCount === "number") { memoryFlushCompactionCount = nextCount; } } if (params.storePath && params.sessionKey) { try { - // Re-hash the transcript AFTER the flush so the stored hash matches - // what the next pre-flush check will compute (the transcript now - // includes the flush turn's messages). (#34222) - let contextHashAfterFlush = contextHashBeforeFlush; - if (sessionFilePath) { - try { - const postFlushMessages = await readTranscriptTailMessages(sessionFilePath, 10); - if (postFlushMessages.length > 0) { - contextHashAfterFlush = computeContextHash(postFlushMessages); - } - } catch { - // Best-effort: fall back to pre-flush hash if re-read fails. - } - } const updatedEntry = await updateSessionStoreEntry({ storePath: params.storePath, sessionKey: params.sessionKey, update: async () => ({ memoryFlushAt: Date.now(), memoryFlushCompactionCount, - // Always write the hash field — when rehashing fails, clearing - // the stale value prevents incorrect dedup on subsequent flushes. - memoryFlushContextHash: contextHashAfterFlush ?? undefined, }), }); if (updatedEntry) { activeSessionEntry = updatedEntry; + params.followupRun.run.sessionId = updatedEntry.sessionId; + if (updatedEntry.sessionFile) { + params.followupRun.run.sessionFile = updatedEntry.sessionFile; + } } } catch (err) { logVerbose(`failed to persist memory flush metadata: ${String(err)}`); @@ -629,64 +586,3 @@ export async function runMemoryFlushIfNeeded(params: { return activeSessionEntry; } - -/** - * Resolve the session transcript file path for flush hash computation. - */ -async function resolveSessionFilePathForFlush( - sessionId: string | undefined, - entry: SessionEntry | undefined, - storePath: string | undefined, - agentId: string | undefined, -): Promise { - if (!sessionId) { - return undefined; - } - const resolved = resolveSessionFilePath( - sessionId, - entry, - resolveSessionFilePathOptions({ agentId, storePath }), - ); - return resolved ?? undefined; -} - -/** - * Read the last N messages from a session transcript file. - * Only reads the tail of the file to avoid loading multi-MB transcripts. - */ -async function readTranscriptTailMessages( - filePath: string, - maxMessages: number, -): Promise> { - const TAIL_BYTES = 64 * 1024; - const handle = await fs.promises.open(filePath, "r"); - try { - const stat = await handle.stat(); - const start = Math.max(0, stat.size - TAIL_BYTES); - const readLen = Math.min(stat.size, TAIL_BYTES); - const buf = Buffer.alloc(readLen); - await handle.read(buf, 0, readLen, start); - const tail = buf.toString("utf-8"); - const nlIdx = tail.indexOf("\n"); - const trimmed = start > 0 ? (nlIdx >= 0 ? tail.slice(nlIdx + 1) : "") : tail; - const lines = trimmed.split(/\r?\n/); - const messages: Array<{ role?: string; content?: unknown }> = []; - for (let i = lines.length - 1; i >= 0 && messages.length < maxMessages; i--) { - const line = lines[i].trim(); - if (!line) { - continue; - } - try { - const parsed = JSON.parse(line); - if (parsed?.message?.role) { - messages.unshift({ role: parsed.message.role, content: parsed.message.content }); - } - } catch { - // Skip malformed lines - } - } - return messages; - } finally { - await handle.close(); - } -} diff --git a/src/auto-reply/reply/agent-runner.misc.runreplyagent.test.ts b/src/auto-reply/reply/agent-runner.misc.runreplyagent.test.ts index bd597c8585a..534b2d65a4a 100644 --- a/src/auto-reply/reply/agent-runner.misc.runreplyagent.test.ts +++ b/src/auto-reply/reply/agent-runner.misc.runreplyagent.test.ts @@ -15,16 +15,6 @@ const runEmbeddedPiAgentMock = vi.fn(); const runCliAgentMock = vi.fn(); const runWithModelFallbackMock = vi.fn(); const runtimeErrorMock = vi.fn(); -const runMemoryFlushIfNeededMock = vi.hoisted(() => - vi.fn(async ({ sessionEntry }) => sessionEntry), -); -const createReplyMediaPathNormalizerMock = vi.hoisted(() => - vi.fn( - (_params?: unknown) => - async (payload: T) => - payload, - ), -); vi.mock("../../agents/model-fallback.js", () => ({ runWithModelFallback: (params: { @@ -68,14 +58,6 @@ vi.mock("../../runtime.js", async () => { }; }); -vi.mock("./agent-runner-memory.runtime.js", () => ({ - runMemoryFlushIfNeeded: (params: unknown) => runMemoryFlushIfNeededMock(params), -})); - -vi.mock("./reply-media-paths.runtime.js", () => ({ - createReplyMediaPathNormalizer: (params: unknown) => createReplyMediaPathNormalizerMock(params), -})); - vi.mock("./queue.js", async () => { const actual = await vi.importActual("./queue.js"); return { @@ -103,40 +85,10 @@ type RunWithModelFallbackParams = { }; beforeEach(() => { - vi.useRealTimers(); - vi.clearAllTimers(); runEmbeddedPiAgentMock.mockClear(); runCliAgentMock.mockClear(); runWithModelFallbackMock.mockClear(); runtimeErrorMock.mockClear(); - runMemoryFlushIfNeededMock.mockClear(); - runMemoryFlushIfNeededMock.mockImplementation( - async ({ - sessionEntry, - followupRun, - }: { - sessionEntry?: SessionEntry; - followupRun: FollowupRun; - }) => { - if (!sessionEntry || (sessionEntry.totalTokens ?? 0) < 1_000_000) { - return sessionEntry; - } - await runWithModelFallbackMock({ - provider: followupRun.run.provider, - model: followupRun.run.model, - run: async (provider: string, model: string) => - await runEmbeddedPiAgentMock({ - provider, - model, - prompt: "Pre-compaction memory flush.", - enforceFinalTag: provider.includes("gemini") ? true : undefined, - }), - }); - return sessionEntry; - }, - ); - createReplyMediaPathNormalizerMock.mockClear(); - createReplyMediaPathNormalizerMock.mockImplementation(() => async (payload) => payload); loadCronStoreMock.mockClear(); // Default: no cron jobs in store. loadCronStoreMock.mockResolvedValue({ version: 1, jobs: [] }); @@ -153,7 +105,6 @@ beforeEach(() => { }); afterEach(() => { - vi.clearAllTimers(); vi.useRealTimers(); resetSystemEventsForTest(); }); @@ -388,6 +339,11 @@ describe("runReplyAgent auto-compaction token update", () => { ); } + async function normalizeComparablePath(filePath: string): Promise { + const parent = await fs.realpath(path.dirname(filePath)).catch(() => path.dirname(filePath)); + return path.join(parent, path.basename(filePath)); + } + function createBaseRun(params: { storePath: string; sessionEntry: Record; @@ -436,6 +392,7 @@ describe("runReplyAgent auto-compaction token update", () => { const sessionKey = "main"; const sessionEntry = { sessionId: "session", + sessionFile: path.join(tmp, "session.jsonl"), updatedAt: Date.now(), totalTokens: 181_000, compactionCount: 0, @@ -524,6 +481,7 @@ describe("runReplyAgent auto-compaction token update", () => { payloads: [{ text: "done" }], meta: { agentMeta: { + sessionId: "session-rotated", usage: { input: 190_000, output: 8_000, total: 198_000 }, lastCallUsage: { input: 10_000, output: 3_000, total: 13_000 }, compactionCount: 2, @@ -568,6 +526,10 @@ describe("runReplyAgent auto-compaction token update", () => { const stored = JSON.parse(await fs.readFile(storePath, "utf-8")); expect(stored[sessionKey].totalTokens).toBe(10_000); expect(stored[sessionKey].compactionCount).toBe(2); + expect(stored[sessionKey].sessionId).toBe("session-rotated"); + expect(await normalizeComparablePath(stored[sessionKey].sessionFile)).toBe( + await normalizeComparablePath(path.join(tmp, "session-rotated.jsonl")), + ); }); it("accumulates compactions across fallback attempts without double-counting a single attempt", async () => { diff --git a/src/auto-reply/reply/agent-runner.runreplyagent.e2e.test.ts b/src/auto-reply/reply/agent-runner.runreplyagent.e2e.test.ts index 830a6af8779..8f27f1b3ff6 100644 --- a/src/auto-reply/reply/agent-runner.runreplyagent.e2e.test.ts +++ b/src/auto-reply/reply/agent-runner.runreplyagent.e2e.test.ts @@ -8,9 +8,14 @@ import type { TypingMode } from "../../config/types.js"; import { withStateDirEnv } from "../../test-helpers/state-dir-env.js"; import type { TemplateContext } from "../templating.js"; import type { GetReplyOptions } from "../types.js"; -import type { FollowupRun, QueueSettings } from "./queue.js"; +import { + enqueueFollowupRun, + refreshQueuedFollowupSession, + scheduleFollowupDrain, + type FollowupRun, + type QueueSettings, +} from "./queue.js"; import { createMockTypingController } from "./test-helpers.js"; -import { createTypingSignaler } from "./typing-mode.js"; type AgentRunParams = { onPartialReply?: (payload: { text?: string }) => Promise | void; @@ -25,12 +30,11 @@ type EmbeddedRunParams = { prompt?: string; extraSystemPrompt?: string; memoryFlushWritePath?: string; + sessionId?: string; + sessionFile?: string; bootstrapPromptWarningSignaturesSeen?: string[]; bootstrapPromptWarningSignature?: string; - onAgentEvent?: (evt: { - stream?: string; - data?: { phase?: string; willRetry?: boolean; completed?: boolean }; - }) => void; + onAgentEvent?: (evt: { stream?: string; data?: { phase?: string; willRetry?: boolean } }) => void; }; const state = vi.hoisted(() => ({ @@ -38,19 +42,8 @@ const state = vi.hoisted(() => ({ runCliAgentMock: vi.fn(), })); -const accountingState = vi.hoisted(() => ({ - persistRunSessionUsageMock: vi.fn(), - incrementRunCompactionCountMock: vi.fn(), - persistRunSessionUsageActual: null as null | ((params: unknown) => Promise), - incrementRunCompactionCountActual: null as - | null - | ((params: unknown) => Promise), -})); - let modelFallbackModule: typeof import("../../agents/model-fallback.js"); let onAgentEvent: typeof import("../../infra/agent-events.js").onAgentEvent; -let enqueueFollowupRunMock: typeof import("./queue/enqueue.js").enqueueFollowupRun; -let scheduleFollowupDrainMock: typeof import("./queue.js").scheduleFollowupDrain; let runReplyAgentPromise: | Promise<(typeof import("./agent-runner.js"))["runReplyAgent"]> @@ -63,99 +56,51 @@ async function getRunReplyAgent() { return await runReplyAgentPromise; } -vi.mock("../../agents/model-fallback.js", async (importOriginal) => { - const actual = await importOriginal(); - return { - ...actual, - runWithModelFallback: async ({ - provider, - model, - run, - }: { - provider: string; - model: string; - run: (provider: string, model: string) => Promise; - }) => ({ - result: await run(provider, model), - provider, - model, - attempts: [], - }), - }; -}); +vi.mock("../../agents/model-fallback.js", () => ({ + runWithModelFallback: async ({ + provider, + model, + run, + }: { + provider: string; + model: string; + run: (provider: string, model: string) => Promise; + }) => ({ + result: await run(provider, model), + provider, + model, + attempts: [], + }), +})); -vi.mock("../../agents/pi-embedded.js", async (importOriginal) => { - const actual = await importOriginal(); - return { - ...actual, - queueEmbeddedPiMessage: vi.fn().mockReturnValue(false), - runEmbeddedPiAgent: (params: unknown) => state.runEmbeddedPiAgentMock(params), - }; -}); +vi.mock("../../agents/pi-embedded.js", () => ({ + queueEmbeddedPiMessage: vi.fn().mockReturnValue(false), + runEmbeddedPiAgent: (params: unknown) => state.runEmbeddedPiAgentMock(params), +})); -vi.mock("../../agents/cli-runner.js", async (importOriginal) => { - const actual = await importOriginal(); - return { - ...actual, - runCliAgent: (params: unknown) => state.runCliAgentMock(params), - }; -}); +vi.mock("../../agents/cli-runner.js", () => ({ + runCliAgent: (params: unknown) => state.runCliAgentMock(params), +})); -vi.mock("./queue/enqueue.js", async (importOriginal) => { - const actual = await importOriginal(); - return { - ...actual, - enqueueFollowupRun: vi.fn(), - }; -}); - -vi.mock("./queue.js", async (importOriginal) => { - const actual = await importOriginal(); - return { - ...actual, - scheduleFollowupDrain: vi.fn(), - }; -}); - -vi.mock("./session-run-accounting.js", async (importOriginal) => { - const actual = await importOriginal(); - accountingState.persistRunSessionUsageActual = actual.persistRunSessionUsage as ( - params: unknown, - ) => Promise; - accountingState.incrementRunCompactionCountActual = actual.incrementRunCompactionCount as ( - params: unknown, - ) => Promise; - return { - ...actual, - persistRunSessionUsage: (params: unknown) => accountingState.persistRunSessionUsageMock(params), - incrementRunCompactionCount: (params: unknown) => - accountingState.incrementRunCompactionCountMock(params), - }; -}); +vi.mock("./queue.js", () => ({ + enqueueFollowupRun: vi.fn(), + refreshQueuedFollowupSession: vi.fn(), + scheduleFollowupDrain: vi.fn(), +})); beforeAll(async () => { + // Avoid attributing the initial agent-runner import cost to the first test case. modelFallbackModule = await import("../../agents/model-fallback.js"); ({ onAgentEvent } = await import("../../infra/agent-events.js")); - ({ enqueueFollowupRun: enqueueFollowupRunMock } = await import("./queue/enqueue.js")); - ({ scheduleFollowupDrain: scheduleFollowupDrainMock } = await import("./queue.js")); await getRunReplyAgent(); }); -beforeEach(async () => { - ({ enqueueFollowupRun: enqueueFollowupRunMock } = await import("./queue/enqueue.js")); - ({ scheduleFollowupDrain: scheduleFollowupDrainMock } = await import("./queue.js")); +beforeEach(() => { state.runEmbeddedPiAgentMock.mockClear(); state.runCliAgentMock.mockClear(); - vi.mocked(enqueueFollowupRunMock).mockClear(); - vi.mocked(scheduleFollowupDrainMock).mockClear(); - accountingState.persistRunSessionUsageMock.mockReset(); - accountingState.incrementRunCompactionCountMock.mockReset(); - accountingState.persistRunSessionUsageMock.mockImplementation(async (params: unknown) => { - await accountingState.persistRunSessionUsageActual?.(params); - }); - accountingState.incrementRunCompactionCountMock.mockImplementation(async (params: unknown) => { - return await accountingState.incrementRunCompactionCountActual?.(params); - }); + vi.mocked(enqueueFollowupRun).mockClear(); + vi.mocked(refreshQueuedFollowupSession).mockClear(); + vi.mocked(scheduleFollowupDrain).mockClear(); vi.stubEnv("OPENCLAW_TEST_FAST", "1"); }); @@ -361,7 +306,7 @@ describe("runReplyAgent heartbeat followup guard", () => { const result = await run(); expect(result).toBeUndefined(); - expect(vi.mocked(enqueueFollowupRunMock)).not.toHaveBeenCalled(); + expect(vi.mocked(enqueueFollowupRun)).not.toHaveBeenCalled(); expect(state.runEmbeddedPiAgentMock).not.toHaveBeenCalled(); expect(typing.cleanup).toHaveBeenCalledTimes(1); }); @@ -377,20 +322,27 @@ describe("runReplyAgent heartbeat followup guard", () => { const result = await run(); expect(result).toBeUndefined(); - expect(vi.mocked(enqueueFollowupRunMock)).toHaveBeenCalledTimes(1); + expect(vi.mocked(enqueueFollowupRun)).toHaveBeenCalledTimes(1); expect(state.runEmbeddedPiAgentMock).not.toHaveBeenCalled(); }); it("drains followup queue when an unexpected exception escapes the run path", async () => { - accountingState.persistRunSessionUsageMock.mockRejectedValueOnce(new Error("persist exploded")); + const accounting = await import("./session-run-accounting.js"); + const persistSpy = vi + .spyOn(accounting, "persistRunSessionUsage") + .mockRejectedValueOnce(new Error("persist exploded")); state.runEmbeddedPiAgentMock.mockResolvedValueOnce({ payloads: [{ text: "ok" }], meta: { agentMeta: { usage: { input: 1, output: 1 } } }, }); - const { run } = createMinimalRun(); - await expect(run()).rejects.toThrow("persist exploded"); - expect(vi.mocked(scheduleFollowupDrainMock)).toHaveBeenCalledTimes(1); + try { + const { run } = createMinimalRun(); + await expect(run()).rejects.toThrow("persist exploded"); + expect(vi.mocked(scheduleFollowupDrain)).toHaveBeenCalledTimes(1); + } finally { + persistSpy.mockRestore(); + } }); }); @@ -425,16 +377,15 @@ describe("runReplyAgent typing (heartbeat)", () => { it("signals typing for normal runs", async () => { const onPartialReply = vi.fn(); - const typing = createMockTypingController(); - const typingSignals = createTypingSignaler({ - typing, - mode: "instant", - isHeartbeat: false, + state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: AgentRunParams) => { + await params.onPartialReply?.({ text: "hi" }); + return { payloads: [{ text: "final" }], meta: {} }; }); - await typingSignals.signalRunStart(); - await typingSignals.signalTextDelta("hi"); - await onPartialReply({ text: "hi", mediaUrls: undefined }); + const { run, typing } = createMinimalRun({ + opts: { isHeartbeat: false, onPartialReply }, + }); + await run(); expect(onPartialReply).toHaveBeenCalled(); expect(typing.startTypingOnText).toHaveBeenCalledWith("hi"); @@ -443,16 +394,15 @@ describe("runReplyAgent typing (heartbeat)", () => { it("never signals typing for heartbeat runs", async () => { const onPartialReply = vi.fn(); - const typing = createMockTypingController(); - const typingSignals = createTypingSignaler({ - typing, - mode: "instant", - isHeartbeat: true, + state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: AgentRunParams) => { + await params.onPartialReply?.({ text: "hi" }); + return { payloads: [{ text: "final" }], meta: {} }; }); - await typingSignals.signalRunStart(); - await typingSignals.signalTextDelta("hi"); - await onPartialReply({ text: "hi", mediaUrls: undefined }); + const { run, typing } = createMinimalRun({ + opts: { isHeartbeat: true, onPartialReply }, + }); + await run(); expect(onPartialReply).toHaveBeenCalled(); expect(typing.startTypingOnText).not.toHaveBeenCalled(); @@ -686,37 +636,26 @@ describe("runReplyAgent typing (heartbeat)", () => { it("retries transient HTTP failures once with timer-driven backoff", async () => { vi.useFakeTimers(); - try { - let calls = 0; - state.runEmbeddedPiAgentMock.mockImplementation(async () => { - calls += 1; - if (calls === 1) { - throw new Error("502 Bad Gateway"); - } - return { payloads: [{ text: "final" }], meta: {} }; - }); + let calls = 0; + state.runEmbeddedPiAgentMock.mockImplementation(async () => { + calls += 1; + if (calls === 1) { + throw new Error("502 Bad Gateway"); + } + return { payloads: [{ text: "final" }], meta: {} }; + }); - const { run } = createMinimalRun({ - typingMode: "message", - }); - const runPromise = run(); - void runPromise.catch(() => {}); - await vi.dynamicImportSettled(); + const { run } = createMinimalRun({ + typingMode: "message", + }); + const runPromise = run(); - vi.advanceTimersByTime(2_499); - await Promise.resolve(); - expect(calls).toBe(1); - vi.advanceTimersByTime(1); - await Promise.resolve(); - await Promise.resolve(); - expect(calls).toBe(2); - - // Restore real timers before awaiting the settled run to avoid Vitest - // fake-timer bookkeeping stalling the test worker after the retry fires. - vi.useRealTimers(); - } finally { - vi.useRealTimers(); - } + await vi.advanceTimersByTimeAsync(2_499); + expect(calls).toBe(1); + await vi.advanceTimersByTimeAsync(1); + await runPromise; + expect(calls).toBe(2); + vi.useRealTimers(); }); it("delivers tool results in order even when dispatched concurrently", async () => { @@ -782,7 +721,7 @@ describe("runReplyAgent typing (heartbeat)", () => { state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: AgentRunParams) => { params.onAgentEvent?.({ stream: "compaction", - data: { phase: "end", willRetry: false, completed: true }, + data: { phase: "end", willRetry: false }, }); return { payloads: [{ text: "final" }], meta: {} }; }); @@ -803,6 +742,39 @@ describe("runReplyAgent typing (heartbeat)", () => { }); }); + it("refreshes queued followups when auto-compaction rotates the session", async () => { + await withTempStateDir(async (stateDir) => { + const storePath = path.join(stateDir, "sessions", "sessions.json"); + const sessionEntry: SessionEntry = { sessionId: "session", updatedAt: Date.now() }; + const sessionStore = { main: sessionEntry }; + + state.runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "final" }], + meta: { + agentMeta: { + sessionId: "session-rotated", + compactionCount: 1, + }, + }, + }); + + const { run } = createMinimalRun({ + sessionEntry, + sessionStore, + sessionKey: "main", + storePath, + }); + await run(); + + expect(vi.mocked(refreshQueuedFollowupSession)).toHaveBeenCalledWith({ + key: "main", + previousSessionId: "session", + nextSessionId: "session-rotated", + nextSessionFile: expect.stringContaining("session-rotated.jsonl"), + }); + }); + }); + it("announces model fallback only when verbose mode is enabled", async () => { const cases = [ { name: "verbose on", verbose: "on" as const, expectNotice: true }, @@ -1315,6 +1287,12 @@ describe("runReplyAgent typing (heartbeat)", () => { } expect(payload.text?.toLowerCase()).toContain("reset"); expect(sessionStore.main.sessionId).not.toBe(sessionId); + expect(vi.mocked(refreshQueuedFollowupSession)).toHaveBeenCalledWith({ + key: "main", + previousSessionId: sessionId, + nextSessionId: sessionStore.main.sessionId, + nextSessionFile: expect.stringContaining(".jsonl"), + }); const persisted = JSON.parse(await fs.readFile(storePath, "utf-8")); expect(persisted.main.sessionId).toBe(sessionStore.main.sessionId); @@ -1646,6 +1624,11 @@ describe("runReplyAgent memory flush", () => { return await fn(path.join(dir, "sessions.json")); } + async function normalizeComparablePath(filePath: string): Promise { + const parent = await fs.realpath(path.dirname(filePath)).catch(() => path.dirname(filePath)); + return path.join(parent, path.basename(filePath)); + } + beforeAll(async () => { fixtureRoot = await fs.mkdtemp(path.join(tmpdir(), "openclaw-memory-flush-")); }); @@ -1850,15 +1833,26 @@ describe("runReplyAgent memory flush", () => { prompt?: string; extraSystemPrompt?: string; memoryFlushWritePath?: string; + sessionId?: string; + sessionFile?: string; }> = []; state.runEmbeddedPiAgentMock.mockImplementation(async (params: EmbeddedRunParams) => { calls.push({ prompt: params.prompt, extraSystemPrompt: params.extraSystemPrompt, memoryFlushWritePath: params.memoryFlushWritePath, + sessionId: params.sessionId, + sessionFile: params.sessionFile, }); if (params.prompt?.includes("Pre-compaction memory flush.")) { - return { payloads: [], meta: {} }; + params.onAgentEvent?.({ + stream: "compaction", + data: { phase: "end", willRetry: false }, + }); + return { + payloads: [], + meta: { agentMeta: { sessionId: "session-rotated" } }, + }; } return { payloads: [{ text: "ok" }], @@ -1888,10 +1882,25 @@ describe("runReplyAgent memory flush", () => { expect(calls[0]?.extraSystemPrompt).toContain("memory/YYYY-MM-DD.md"); expect(calls[0]?.extraSystemPrompt).toContain("MEMORY.md"); expect(calls[1]?.prompt).toBe("hello"); + expect(calls[1]?.sessionId).toBe("session-rotated"); + expect(await normalizeComparablePath(calls[1]?.sessionFile ?? "")).toBe( + await normalizeComparablePath(path.join(path.dirname(storePath), "session-rotated.jsonl")), + ); + expect(vi.mocked(refreshQueuedFollowupSession)).toHaveBeenCalledWith({ + key: sessionKey, + previousSessionId: "session", + nextSessionId: "session-rotated", + nextSessionFile: expect.stringContaining("session-rotated.jsonl"), + }); const stored = JSON.parse(await fs.readFile(storePath, "utf-8")); expect(stored[sessionKey].memoryFlushAt).toBeTypeOf("number"); - expect(stored[sessionKey].memoryFlushCompactionCount).toBe(1); + expect(stored[sessionKey].memoryFlushCompactionCount).toBe(2); + expect(stored[sessionKey].compactionCount).toBe(2); + expect(stored[sessionKey].sessionId).toBe("session-rotated"); + expect(await normalizeComparablePath(stored[sessionKey].sessionFile)).toBe( + await normalizeComparablePath(path.join(path.dirname(storePath), "session-rotated.jsonl")), + ); }); }); @@ -2098,121 +2107,6 @@ describe("runReplyAgent memory flush", () => { }); }); - it("skips duplicate memory writes across memory-flush fallback retries", async () => { - await withTempStore(async (storePath) => { - const sessionKey = "main"; - const sessionFile = "session-relative.jsonl"; - const fixtureDir = path.dirname(storePath); - const transcriptPath = path.join(fixtureDir, sessionFile); - await fs.mkdir(path.dirname(transcriptPath), { recursive: true }); - await fs.writeFile( - transcriptPath, - [ - JSON.stringify({ message: { role: "user", content: "Remember alpha." } }), - JSON.stringify({ message: { role: "assistant", content: "Stored alpha." } }), - ].join("\n") + "\n", - "utf-8", - ); - - const sessionEntry = { - sessionId: "session", - updatedAt: Date.now(), - sessionFile, - totalTokens: 80_000, - compactionCount: 1, - }; - - await seedSessionStore({ storePath, sessionKey, entry: sessionEntry }); - - let flushAttemptCount = 0; - let memoryFilePath: string | undefined; - const prompts: string[] = []; - state.runEmbeddedPiAgentMock.mockImplementation(async (params: EmbeddedRunParams) => { - prompts.push(params.prompt ?? ""); - if (params.prompt?.includes("Pre-compaction memory flush.")) { - flushAttemptCount += 1; - memoryFilePath = path.join(fixtureDir, params.memoryFlushWritePath ?? "memory/flush.md"); - await fs.mkdir(path.dirname(memoryFilePath), { recursive: true }); - await fs.appendFile(memoryFilePath, "remember alpha\n", "utf-8"); - if (flushAttemptCount === 1) { - throw new Error("flush failed after write"); - } - return { payloads: [], meta: {} }; - } - return { - payloads: [{ text: "ok" }], - meta: { agentMeta: { usage: { input: 1, output: 1 } } }, - }; - }); - - const fallbackSpy = vi - .spyOn(modelFallbackModule, "runWithModelFallback") - .mockImplementationOnce( - async ({ - provider, - model, - run, - }: { - provider: string; - model: string; - run: (provider: string, model: string) => Promise; - }) => { - try { - await run(provider, model); - } catch { - // Simulate advancing to the next fallback candidate after the first - // memory flush attempt already wrote and then failed. - } - return { - result: await run("openai", "gpt-5.4"), - provider: "openai", - model: "gpt-5.4", - attempts: [ - { - provider, - model, - error: "flush failed after write", - reason: "unknown", - }, - ], - }; - }, - ); - - try { - const baseRun = createBaseRun({ - storePath, - sessionEntry, - runOverrides: { - sessionFile, - workspaceDir: fixtureDir, - }, - }); - - await runReplyAgentWithBase({ - baseRun, - storePath, - sessionKey, - sessionEntry, - commandBody: "hello", - }); - } finally { - fallbackSpy.mockRestore(); - } - - expect(flushAttemptCount).toBe(1); - expect( - prompts.filter((prompt) => prompt.includes("Pre-compaction memory flush.")), - ).toHaveLength(1); - expect(memoryFilePath).toBeDefined(); - await expect(fs.readFile(memoryFilePath!, "utf-8")).resolves.toBe("remember alpha\n"); - - const stored = JSON.parse(await fs.readFile(storePath, "utf-8")); - expect(stored[sessionKey].memoryFlushAt).toBeTypeOf("number"); - expect(stored[sessionKey].memoryFlushContextHash).toMatch(/^[0-9a-f]{16}$/); - }); - }); - it("increments compaction count when flush compaction completes", async () => { await withTempStore(async (storePath) => { const sessionKey = "main"; @@ -2229,7 +2123,7 @@ describe("runReplyAgent memory flush", () => { if (params.prompt?.includes("Pre-compaction memory flush.")) { params.onAgentEvent?.({ stream: "compaction", - data: { phase: "end", willRetry: false, completed: true }, + data: { phase: "end", willRetry: false }, }); return { payloads: [], meta: {} }; } @@ -2258,25 +2152,4 @@ describe("runReplyAgent memory flush", () => { }); }); }); - -describe("runReplyAgent error followup drain", () => { - it("drains followup queue when an unexpected exception escapes the run path", async () => { - vi.resetModules(); - vi.doMock("./agent-runner-execution.runtime.js", () => ({ - runAgentTurnWithFallback: vi.fn().mockRejectedValueOnce(new Error("persist exploded")), - })); - - try { - ({ scheduleFollowupDrain: scheduleFollowupDrainMock } = await import("./queue.js")); - vi.mocked(scheduleFollowupDrainMock).mockClear(); - runReplyAgentPromise = undefined; - const { run } = createMinimalRun(); - await expect(run()).rejects.toThrow("persist exploded"); - expect(vi.mocked(scheduleFollowupDrainMock)).toHaveBeenCalledTimes(1); - } finally { - vi.doUnmock("./agent-runner-execution.runtime.js"); - runReplyAgentPromise = undefined; - } - }); -}); import type { ReplyPayload } from "../types.js"; diff --git a/src/auto-reply/reply/agent-runner.ts b/src/auto-reply/reply/agent-runner.ts index d7ce18d1cf8..659045a3136 100644 --- a/src/auto-reply/reply/agent-runner.ts +++ b/src/auto-reply/reply/agent-runner.ts @@ -1,23 +1,26 @@ import fs from "node:fs"; -import { lookupCachedContextTokens } from "../../agents/context-cache.js"; -import { lookupContextTokens } from "../../agents/context-tokens.runtime.js"; +import { lookupContextTokens } from "../../agents/context.js"; import { DEFAULT_CONTEXT_TOKENS } from "../../agents/defaults.js"; import { resolveModelAuthMode } from "../../agents/model-auth.js"; import { isCliProvider } from "../../agents/model-selection.js"; +import { queueEmbeddedPiMessage } from "../../agents/pi-embedded.js"; import { hasNonzeroUsage } from "../../agents/usage.js"; import { + resolveAgentIdFromSessionKey, resolveSessionFilePath, resolveSessionFilePathOptions, resolveSessionTranscriptPath, -} from "../../config/sessions/paths.js"; -import type { SessionEntry } from "../../config/sessions/types.js"; + type SessionEntry, + updateSessionStore, + updateSessionStoreEntry, +} from "../../config/sessions.js"; import type { TypingMode } from "../../config/types.js"; import { emitAgentEvent } from "../../infra/agent-events.js"; import { emitDiagnosticEvent, isDiagnosticsEnabled } from "../../infra/diagnostic-events.js"; import { generateSecureUuid } from "../../infra/secure-random.js"; import { enqueueSystemEvent } from "../../infra/system-events.js"; -import { resolveAgentIdFromSessionKey } from "../../routing/session-key.js"; import { defaultRuntime } from "../../runtime.js"; +import { estimateUsageCost, resolveModelCostConfig } from "../../utils/usage-format.js"; import { buildFallbackClearedNotice, buildFallbackNotice, @@ -26,7 +29,7 @@ import { import type { OriginatingChannelType, TemplateContext } from "../templating.js"; import { resolveResponseUsageMode, type VerboseLevel } from "../thinking.js"; import type { GetReplyOptions, ReplyPayload } from "../types.js"; -import { runAgentTurnWithFallback } from "./agent-runner-execution.runtime.js"; +import { runAgentTurnWithFallback } from "./agent-runner-execution.js"; import { createShouldEmitToolOutput, createShouldEmitToolResult, @@ -34,7 +37,7 @@ import { isAudioPayload, signalTypingIfNeeded, } from "./agent-runner-helpers.js"; -import { runMemoryFlushIfNeeded } from "./agent-runner-memory.runtime.js"; +import { runMemoryFlushIfNeeded } from "./agent-runner-memory.js"; import { buildReplyPayloads } from "./agent-runner-payloads.js"; import { appendUnscheduledReminderNote, @@ -48,37 +51,19 @@ import { createFollowupRunner } from "./followup-runner.js"; import { resolveOriginMessageProvider, resolveOriginMessageTo } from "./origin-routing.js"; import { readPostCompactionContext } from "./post-compaction-context.js"; import { resolveActiveRunQueueAction } from "./queue-policy.js"; -import type { FollowupRun, QueueSettings } from "./queue.js"; -import { enqueueFollowupRun } from "./queue/enqueue.js"; -import { createReplyMediaPathNormalizer } from "./reply-media-paths.runtime.js"; +import { + enqueueFollowupRun, + refreshQueuedFollowupSession, + type FollowupRun, + type QueueSettings, +} from "./queue.js"; +import { createReplyMediaPathNormalizer } from "./reply-media-paths.js"; import { createReplyToModeFilterForChannel, resolveReplyToMode } from "./reply-threading.js"; import { incrementRunCompactionCount, persistRunSessionUsage } from "./session-run-accounting.js"; import { createTypingSignaler } from "./typing-mode.js"; import type { TypingController } from "./typing.js"; const BLOCK_REPLY_SEND_TIMEOUT_MS = 15_000; -let piEmbeddedQueueRuntimePromise: Promise< - typeof import("../../agents/pi-embedded-queue.runtime.js") -> | null = null; -let usageCostRuntimePromise: Promise | null = null; -let sessionStoreRuntimePromise: Promise< - typeof import("../../config/sessions/store.runtime.js") -> | null = null; - -function loadPiEmbeddedQueueRuntime() { - piEmbeddedQueueRuntimePromise ??= import("../../agents/pi-embedded-queue.runtime.js"); - return piEmbeddedQueueRuntimePromise; -} - -function loadUsageCostRuntime() { - usageCostRuntimePromise ??= import("./usage-cost.runtime.js"); - return usageCostRuntimePromise; -} - -function loadSessionStoreRuntime() { - sessionStoreRuntimePromise ??= import("../../config/sessions/store.runtime.js"); - return sessionStoreRuntimePromise; -} export async function runReplyAgent(params: { commandBody: string; @@ -206,7 +191,6 @@ export async function runReplyAgent(params: { activeSessionEntry.updatedAt = updatedAt; activeSessionStore[sessionKey] = activeSessionEntry; if (storePath) { - const { updateSessionStoreEntry } = await loadSessionStoreRuntime(); await updateSessionStoreEntry({ storePath, sessionKey, @@ -216,7 +200,6 @@ export async function runReplyAgent(params: { }; if (shouldSteer && isStreaming) { - const { queueEmbeddedPiMessage } = await loadPiEmbeddedQueueRuntime(); const steered = queueEmbeddedPiMessage(followupRun.run.sessionId, followupRun.prompt); if (steered && !shouldFollowup) { await touchActiveSessionEntry(); @@ -314,7 +297,6 @@ export async function runReplyAgent(params: { fallbackNoticeSelectedModel: undefined, fallbackNoticeActiveModel: undefined, fallbackNoticeReason: undefined, - memoryFlushContextHash: undefined, }; const agentId = resolveAgentIdFromSessionKey(sessionKey); const nextSessionFile = resolveSessionTranscriptPath( @@ -325,7 +307,6 @@ export async function runReplyAgent(params: { nextEntry.sessionFile = nextSessionFile; activeSessionStore[sessionKey] = nextEntry; try { - const { updateSessionStore } = await loadSessionStoreRuntime(); await updateSessionStore(storePath, (store) => { store[sessionKey] = nextEntry; }); @@ -336,6 +317,12 @@ export async function runReplyAgent(params: { } followupRun.run.sessionId = nextSessionId; followupRun.run.sessionFile = nextSessionFile; + refreshQueuedFollowupSession({ + key: queueKey, + previousSessionId: prevEntry.sessionId, + nextSessionId, + nextSessionFile, + }); activeSessionEntry = nextEntry; activeIsNewSession = true; defaultRuntime.error(buildLogMessage(nextSessionId)); @@ -425,7 +412,6 @@ export async function runReplyAgent(params: { activeSessionEntry.updatedAt = updatedAt; activeSessionStore[sessionKey] = activeSessionEntry; if (storePath) { - const { updateSessionStoreEntry } = await loadSessionStoreRuntime(); await updateSessionStoreEntry({ storePath, sessionKey, @@ -443,11 +429,6 @@ export async function runReplyAgent(params: { await blockReplyPipeline.flush({ force: true }); blockReplyPipeline.stop(); } - - // NOTE: The compaction completion notice for block-streaming mode is sent - // further below — after incrementRunCompactionCount — so it can include - // the `(count N)` suffix. Sending it here (before the count is known) - // would omit that information. if (pendingToolTasks.size > 0) { await Promise.allSettled(pendingToolTasks); } @@ -482,7 +463,6 @@ export async function runReplyAgent(params: { activeSessionStore[sessionKey] = fallbackStateEntry; } if (sessionKey && storePath) { - const { updateSessionStoreEntry } = await loadSessionStoreRuntime(); await updateSessionStoreEntry({ storePath, sessionKey, @@ -497,11 +477,9 @@ export async function runReplyAgent(params: { const cliSessionId = isCliProvider(providerUsed, cfg) ? runResult.meta?.agentMeta?.sessionId?.trim() : undefined; - const cachedContextTokens = lookupCachedContextTokens(modelUsed); const contextTokensUsed = agentCfgContextTokens ?? - cachedContextTokens ?? - lookupContextTokens(modelUsed, { allowAsyncLoad: false }) ?? + lookupContextTokens(modelUsed) ?? activeSessionEntry?.contextTokens ?? DEFAULT_CONTEXT_TOKENS; @@ -579,7 +557,6 @@ export async function runReplyAgent(params: { await signalTypingIfNeeded(guardedReplyPayloads, typingSignals); if (isDiagnosticsEnabled(cfg) && hasNonzeroUsage(usage)) { - const { estimateUsageCost, resolveModelCostConfig } = await loadUsageCostRuntime(); const input = usage.input ?? 0; const output = usage.output ?? 0; const cacheRead = usage.cacheRead ?? 0; @@ -622,7 +599,6 @@ export async function runReplyAgent(params: { (sessionKey ? activeSessionStore?.[sessionKey]?.responseUsage : undefined); const responseUsageMode = resolveResponseUsageMode(responseUsageRaw); if (responseUsageMode !== "off" && hasNonzeroUsage(usage)) { - const { resolveModelCostConfig } = await loadUsageCostRuntime(); const authMode = resolveModelAuthMode(providerUsed, cfg); const showCost = authMode === "api-key"; const costConfig = showCost @@ -708,6 +684,7 @@ export async function runReplyAgent(params: { } if (autoCompactionCount > 0) { + const previousSessionId = activeSessionEntry?.sessionId ?? followupRun.run.sessionId; const count = await incrementRunCompactionCount({ sessionEntry: activeSessionEntry, sessionStore: activeSessionStore, @@ -716,7 +693,19 @@ export async function runReplyAgent(params: { amount: autoCompactionCount, lastCallUsage: runResult.meta?.agentMeta?.lastCallUsage, contextTokensUsed, + newSessionId: runResult.meta?.agentMeta?.sessionId, }); + const refreshedSessionEntry = + sessionKey && activeSessionStore ? activeSessionStore[sessionKey] : undefined; + if (refreshedSessionEntry) { + activeSessionEntry = refreshedSessionEntry; + refreshQueuedFollowupSession({ + key: queueKey, + previousSessionId, + nextSessionId: refreshedSessionEntry.sessionId, + nextSessionFile: refreshedSessionEntry.sessionFile, + }); + } // Inject post-compaction workspace context for the next agent turn if (sessionKey) { @@ -732,48 +721,9 @@ export async function runReplyAgent(params: { }); } - // Always notify the user when compaction completes — not just in verbose - // mode. The "🧹 Compacting context..." notice was already sent at start, - // so the completion message closes the loop for every user regardless of - // their verbose setting. - const suffix = typeof count === "number" ? ` (count ${count})` : ""; - const completionText = verboseEnabled - ? `🧹 Auto-compaction complete${suffix}.` - : `✅ Context compacted${suffix}.`; - - if (blockReplyPipeline && opts?.onBlockReply) { - // In block-streaming mode, send the completion notice via - // fire-and-forget *after* the pipeline has flushed (so it does not set - // didStream()=true and cause buildReplyPayloads to discard the real - // assistant reply). Now that the count is known we can include it. - const currentMessageId = sessionCtx.MessageSidFull ?? sessionCtx.MessageSid; - const noticePayload = applyReplyToMode({ - text: completionText, - replyToId: currentMessageId, - replyToCurrent: true, - isCompactionNotice: true, - }); - void Promise.race([ - opts.onBlockReply(noticePayload), - new Promise((_, reject) => - setTimeout(() => reject(new Error("compaction notice timeout")), blockReplyTimeoutMs), - ), - ]).catch(() => { - // Intentionally swallowed — the notice is informational only. - }); - } else { - // Non-streaming: push into verboseNotices with full compaction metadata - // so threading exemptions apply and replyToMode=first does not thread - // the notice instead of the real assistant reply. - const currentMessageId = sessionCtx.MessageSidFull ?? sessionCtx.MessageSid; - verboseNotices.push( - applyReplyToMode({ - text: completionText, - replyToId: currentMessageId, - replyToCurrent: true, - isCompactionNotice: true, - }), - ); + if (verboseEnabled) { + const suffix = typeof count === "number" ? ` (count ${count})` : ""; + verboseNotices.push({ text: `🧹 Auto-compaction complete${suffix}.` }); } } if (verboseNotices.length > 0) { diff --git a/src/auto-reply/reply/followup-runner.test.ts b/src/auto-reply/reply/followup-runner.test.ts index 8d7804c465e..f91efa40f40 100644 --- a/src/auto-reply/reply/followup-runner.test.ts +++ b/src/auto-reply/reply/followup-runner.test.ts @@ -3,7 +3,12 @@ import { tmpdir } from "node:os"; import path from "node:path"; import { beforeEach, describe, expect, it, vi } from "vitest"; import { loadSessionStore, saveSessionStore, type SessionEntry } from "../../config/sessions.js"; -import type { FollowupRun } from "./queue.js"; +import { + clearFollowupQueue, + enqueueFollowupRun, + type FollowupRun, + type QueueSettings, +} from "./queue.js"; import * as sessionRunAccounting from "./session-run-accounting.js"; import { createMockFollowupRun, createMockTypingController } from "./test-helpers.js"; @@ -16,14 +21,18 @@ vi.mock( async () => await import("../../test-utils/model-fallback.mock.js"), ); -vi.mock("../../agents/pi-embedded.runtime.js", () => ({ +vi.mock("../../agents/pi-embedded.js", () => ({ runEmbeddedPiAgent: (params: unknown) => runEmbeddedPiAgentMock(params), })); -vi.mock("./route-reply.runtime.js", () => ({ - isRoutableChannel: (...args: unknown[]) => isRoutableChannelMock(...args), - routeReply: (...args: unknown[]) => routeReplyMock(...args), -})); +vi.mock("./route-reply.js", async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + isRoutableChannel: (...args: unknown[]) => isRoutableChannelMock(...args), + routeReply: (...args: unknown[]) => routeReplyMock(...args), + }; +}); import { createFollowupRunner } from "./followup-runner.js"; @@ -44,6 +53,7 @@ beforeEach(() => { isRoutableChannelMock.mockImplementation((ch: string | undefined) => Boolean(ch?.trim() && ROUTABLE_TEST_CHANNELS.has(ch.trim().toLowerCase())), ); + clearFollowupQueue("main"); }); const baseQueuedRun = (messageProvider = "whatsapp"): FollowupRun => @@ -55,6 +65,11 @@ function createQueuedRun( return createMockFollowupRun(overrides); } +async function normalizeComparablePath(filePath: string): Promise { + const parent = await fs.realpath(path.dirname(filePath)).catch(() => path.dirname(filePath)); + return path.join(parent, path.basename(filePath)); +} + function mockCompactionRun(params: { willRetry: boolean; result: { @@ -66,10 +81,6 @@ function mockCompactionRun(params: { async (args: { onAgentEvent?: (evt: { stream: string; data: Record }) => void; }) => { - args.onAgentEvent?.({ - stream: "compaction", - data: { phase: "start" }, - }); args.onAgentEvent?.({ stream: "compaction", data: { phase: "end", willRetry: params.willRetry, completed: true }, @@ -84,7 +95,7 @@ function createAsyncReplySpy() { } describe("createFollowupRunner compaction", () => { - it("adds compaction notices and tracks count in verbose mode", async () => { + it("adds verbose auto-compaction notice and tracks count", async () => { const storePath = path.join( await fs.mkdtemp(path.join(tmpdir(), "openclaw-compaction-")), "sessions.json", @@ -122,15 +133,9 @@ describe("createFollowupRunner compaction", () => { await runner(queued); - expect(onBlockReply).toHaveBeenCalledTimes(3); - const calls = onBlockReply.mock.calls as unknown as Array< - Array<{ text?: string; isCompactionNotice?: boolean }> - >; - expect(calls[0]?.[0]?.text).toBe("🧹 Compacting context..."); - expect(calls[0]?.[0]?.isCompactionNotice).toBe(true); - expect(calls[1]?.[0]?.text).toContain("Auto-compaction complete"); - expect(calls[1]?.[0]?.isCompactionNotice).toBe(true); - expect(calls[2]?.[0]?.text).toBe("final"); + expect(onBlockReply).toHaveBeenCalled(); + const firstCall = (onBlockReply.mock.calls as unknown as Array>)[0]; + expect(firstCall?.[0]?.text).toContain("Auto-compaction complete"); expect(sessionStore.main.compactionCount).toBe(1); }); @@ -141,6 +146,7 @@ describe("createFollowupRunner compaction", () => { ); const sessionEntry: SessionEntry = { sessionId: "session", + sessionFile: path.join(path.dirname(storePath), "session.jsonl"), updatedAt: Date.now(), }; const sessionStore: Record = { @@ -152,6 +158,7 @@ describe("createFollowupRunner compaction", () => { payloads: [{ text: "final" }], meta: { agentMeta: { + sessionId: "session-rotated", compactionCount: 2, lastCallUsage: { input: 10_000, output: 3_000, total: 13_000 }, }, @@ -177,37 +184,43 @@ describe("createFollowupRunner compaction", () => { await runner(queued); - expect(onBlockReply).toHaveBeenCalledTimes(2); - const calls = onBlockReply.mock.calls as unknown as Array< - Array<{ text?: string; isCompactionNotice?: boolean }> - >; - expect(calls[0]?.[0]?.text).toContain("Auto-compaction complete"); - expect(calls[0]?.[0]?.isCompactionNotice).toBe(true); - expect(calls[1]?.[0]?.text).toBe("final"); + expect(onBlockReply).toHaveBeenCalled(); + const firstCall = (onBlockReply.mock.calls as unknown as Array>)[0]; + expect(firstCall?.[0]?.text).toContain("Auto-compaction complete"); expect(sessionStore.main.compactionCount).toBe(2); + expect(sessionStore.main.sessionId).toBe("session-rotated"); + expect(await normalizeComparablePath(sessionStore.main.sessionFile ?? "")).toBe( + await normalizeComparablePath(path.join(path.dirname(storePath), "session-rotated.jsonl")), + ); }); - it("threads followup compaction notices without consuming the first reply slot", async () => { + it("refreshes queued followup runs to the rotated transcript", async () => { const storePath = path.join( - await fs.mkdtemp(path.join(tmpdir(), "openclaw-compaction-threading-")), + await fs.mkdtemp(path.join(tmpdir(), "openclaw-compaction-queue-")), "sessions.json", ); const sessionEntry: SessionEntry = { sessionId: "session", + sessionFile: path.join(path.dirname(storePath), "session.jsonl"), updatedAt: Date.now(), }; const sessionStore: Record = { main: sessionEntry, }; - const onBlockReply = vi.fn(async () => {}); - mockCompactionRun({ - willRetry: true, - result: { payloads: [{ text: "final" }], meta: {} }, + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "final" }], + meta: { + agentMeta: { + sessionId: "session-rotated", + compactionCount: 1, + lastCallUsage: { input: 10_000, output: 3_000, total: 13_000 }, + }, + }, }); const runner = createFollowupRunner({ - opts: { onBlockReply }, + opts: { onBlockReply: vi.fn(async () => {}) }, typing: createMockTypingController(), typingMode: "instant", sessionEntry, @@ -217,42 +230,30 @@ describe("createFollowupRunner compaction", () => { defaultModel: "anthropic/claude-opus-4-5", }); - const queued = createQueuedRun({ - messageId: "msg-42", + const queuedNext = createQueuedRun({ + prompt: "next", run: { - messageProvider: "discord", - config: { - channels: { - discord: { - replyToMode: "first", - }, - }, - }, - verboseLevel: "off", + sessionId: "session", + sessionFile: path.join(path.dirname(storePath), "session.jsonl"), + }, + }); + const queueSettings: QueueSettings = { mode: "queue" }; + enqueueFollowupRun("main", queuedNext, queueSettings); + + const current = createQueuedRun({ + run: { + verboseLevel: "on", + sessionId: "session", + sessionFile: path.join(path.dirname(storePath), "session.jsonl"), }, }); - await runner(queued); + await runner(current); - expect(onBlockReply).toHaveBeenCalledTimes(3); - const calls = onBlockReply.mock.calls as unknown as Array< - Array<{ text?: string; replyToId?: string; isCompactionNotice?: boolean }> - >; - expect(calls[0]?.[0]).toMatchObject({ - text: "🧹 Compacting context...", - replyToId: "msg-42", - isCompactionNotice: true, - }); - expect(calls[1]?.[0]).toMatchObject({ - text: "✅ Context compacted (count 1).", - replyToId: "msg-42", - isCompactionNotice: true, - }); - expect(calls[2]?.[0]).toMatchObject({ - text: "final", - replyToId: "msg-42", - }); - expect(calls[2]?.[0]?.isCompactionNotice).toBeUndefined(); + expect(queuedNext.run.sessionId).toBe("session-rotated"); + expect(await normalizeComparablePath(queuedNext.run.sessionFile)).toBe( + await normalizeComparablePath(path.join(path.dirname(storePath), "session-rotated.jsonl")), + ); }); it("does not count failed compaction end events in followup runs", async () => { diff --git a/src/auto-reply/reply/followup-runner.ts b/src/auto-reply/reply/followup-runner.ts index 513f1a60449..f5bd3db9af5 100644 --- a/src/auto-reply/reply/followup-runner.ts +++ b/src/auto-reply/reply/followup-runner.ts @@ -5,10 +5,10 @@ import { } from "openclaw/plugin-sdk/reply-payload"; import { resolveRunModelFallbacksOverride } from "../../agents/agent-scope.js"; import { resolveBootstrapWarningSignaturesSeen } from "../../agents/bootstrap-budget.js"; -import { lookupCachedContextTokens } from "../../agents/context-cache.js"; import { lookupContextTokens } from "../../agents/context.js"; import { DEFAULT_CONTEXT_TOKENS } from "../../agents/defaults.js"; import { runWithModelFallback } from "../../agents/model-fallback.js"; +import { runEmbeddedPiAgent } from "../../agents/pi-embedded.js"; import type { SessionEntry } from "../../config/sessions.js"; import type { TypingMode } from "../../config/types.js"; import { logVerbose } from "../../globals.js"; @@ -19,38 +19,25 @@ import { stripHeartbeatToken } from "../heartbeat.js"; import type { OriginatingChannelType } from "../templating.js"; import { isSilentReplyText, SILENT_REPLY_TOKEN } from "../tokens.js"; import type { GetReplyOptions, ReplyPayload } from "../types.js"; -import { resolveRunAuthProfile } from "./agent-runner-auth-profile.js"; +import { resolveRunAuthProfile } from "./agent-runner-utils.js"; import { resolveOriginAccountId, resolveOriginMessageProvider, resolveOriginMessageTo, } from "./origin-routing.js"; -import type { FollowupRun } from "./queue.js"; +import { refreshQueuedFollowupSession, type FollowupRun } from "./queue.js"; +import { + applyReplyThreading, + filterMessagingToolDuplicates, + filterMessagingToolMediaDuplicates, + shouldSuppressMessagingToolReplies, +} from "./reply-payloads.js"; import { resolveReplyToMode } from "./reply-threading.js"; +import { isRoutableChannel, routeReply } from "./route-reply.js"; import { incrementRunCompactionCount, persistRunSessionUsage } from "./session-run-accounting.js"; import { createTypingSignaler } from "./typing-mode.js"; import type { TypingController } from "./typing.js"; -let piEmbeddedRuntimePromise: Promise | null = - null; -let routeReplyRuntimePromise: Promise | null = null; -let replyPayloadsRuntimePromise: Promise | null = - null; - -function loadPiEmbeddedRuntime() { - piEmbeddedRuntimePromise ??= import("../../agents/pi-embedded.runtime.js"); - return piEmbeddedRuntimePromise; -} - -function loadRouteReplyRuntime() { - routeReplyRuntimePromise ??= import("./route-reply.runtime.js"); - return routeReplyRuntimePromise; -} - -function loadReplyPayloadsRuntime() { - replyPayloadsRuntimePromise ??= import("./reply-payloads.runtime.js"); - return replyPayloadsRuntimePromise; -} export function createFollowupRunner(params: { opts?: GetReplyOptions; typing: TypingController; @@ -90,7 +77,6 @@ export function createFollowupRunner(params: { const sendFollowupPayloads = async (payloads: ReplyPayload[], queued: FollowupRun) => { // Check if we should route to originating channel. const { originatingChannel, originatingTo } = queued; - const { isRoutableChannel, routeReply } = await loadRouteReplyRuntime(); const shouldRouteToOriginating = isRoutableChannel(originatingChannel) && originatingTo; if (!shouldRouteToOriginating && !opts?.onBlockReply) { @@ -162,47 +148,8 @@ export function createFollowupRunner(params: { isControlUiVisible: shouldSurfaceToControlUi, }); } - const replyToChannel = resolveOriginMessageProvider({ - originatingChannel: queued.originatingChannel, - provider: queued.run.messageProvider, - }) as OriginatingChannelType | undefined; - const replyToMode = resolveReplyToMode( - queued.run.config, - replyToChannel, - queued.originatingAccountId, - queued.originatingChatType, - ); - const currentMessageId = queued.messageId?.trim() || undefined; - const applyFollowupReplyThreading = async (payloads: ReplyPayload[]) => { - const { applyReplyThreading } = await loadReplyPayloadsRuntime(); - return applyReplyThreading({ - payloads, - replyToMode, - replyToChannel, - currentMessageId, - }); - }; - const sendCompactionNotice = async (text: string) => { - try { - const noticePayloads = await applyFollowupReplyThreading([ - { - text, - replyToCurrent: true, - isCompactionNotice: true, - }, - ]); - if (noticePayloads.length === 0) { - return; - } - await sendFollowupPayloads(noticePayloads, queued); - } catch (err) { - logVerbose(`followup queue: compaction notice failed (non-fatal): ${String(err)}`); - } - }; let autoCompactionCount = 0; - let runResult: Awaited< - ReturnType - >; + let runResult: Awaited>; let fallbackProvider = queued.run.provider; let fallbackModel = queued.run.model; const activeSessionEntry = @@ -226,7 +173,6 @@ export function createFollowupRunner(params: { const authProfile = resolveRunAuthProfile(queued.run, provider); let attemptCompactionCount = 0; try { - const { runEmbeddedPiAgent } = await loadPiEmbeddedRuntime(); const result = await runEmbeddedPiAgent({ allowGatewaySubagentBinding: true, sessionId: queued.run.sessionId, @@ -278,14 +224,11 @@ export function createFollowupRunner(params: { bootstrapPromptWarningSignaturesSeen[ bootstrapPromptWarningSignaturesSeen.length - 1 ], - onAgentEvent: (evt: { stream: string; data?: Record }) => { + onAgentEvent: (evt) => { if (evt.stream !== "compaction") { return; } - const phase = typeof evt.data?.phase === "string" ? evt.data.phase : ""; - if (phase === "start") { - void sendCompactionNotice("🧹 Compacting context..."); - } + const phase = typeof evt.data.phase === "string" ? evt.data.phase : ""; const completed = evt.data?.completed === true; if (phase === "end" && completed) { attemptCompactionCount += 1; @@ -318,15 +261,9 @@ export function createFollowupRunner(params: { const usage = runResult.meta?.agentMeta?.usage; const promptTokens = runResult.meta?.agentMeta?.promptTokens; const modelUsed = runResult.meta?.agentMeta?.model ?? fallbackModel ?? defaultModel; - const cachedContextTokens = lookupCachedContextTokens(modelUsed); - const lazyContextTokens = - agentCfgContextTokens == null && cachedContextTokens == null - ? lookupContextTokens(modelUsed, { allowAsyncLoad: false }) - : undefined; const contextTokensUsed = agentCfgContextTokens ?? - cachedContextTokens ?? - lazyContextTokens ?? + lookupContextTokens(modelUsed) ?? sessionEntry?.contextTokens ?? DEFAULT_CONTEXT_TOKENS; @@ -347,7 +284,10 @@ export function createFollowupRunner(params: { } const payloadArray = runResult.payloads ?? []; - const sanitizedPayloads = payloadArray.flatMap((payload: ReplyPayload) => { + if (payloadArray.length === 0) { + return; + } + const sanitizedPayloads = payloadArray.flatMap((payload) => { const text = payload.text; if (!text || !text.includes("HEARTBEAT_OK")) { return [payload]; @@ -359,13 +299,23 @@ export function createFollowupRunner(params: { } return [{ ...payload, text: stripped.text }]; }); - const replyTaggedPayloads = await applyFollowupReplyThreading(sanitizedPayloads); + const replyToChannel = resolveOriginMessageProvider({ + originatingChannel: queued.originatingChannel, + provider: queued.run.messageProvider, + }) as OriginatingChannelType | undefined; + const replyToMode = resolveReplyToMode( + queued.run.config, + replyToChannel, + queued.originatingAccountId, + queued.originatingChatType, + ); + + const replyTaggedPayloads: ReplyPayload[] = applyReplyThreading({ + payloads: sanitizedPayloads, + replyToMode, + replyToChannel, + }); - const { - filterMessagingToolDuplicates, - filterMessagingToolMediaDuplicates, - shouldSuppressMessagingToolReplies, - } = await loadReplyPayloadsRuntime(); const dedupedPayloads = filterMessagingToolDuplicates({ payloads: replyTaggedPayloads, sentTexts: runResult.messagingToolSentTexts ?? [], @@ -388,9 +338,14 @@ export function createFollowupRunner(params: { accountId: queued.run.agentAccountId, }), }); - let finalPayloads = suppressMessagingToolReplies ? [] : mediaFilteredPayloads; + const finalPayloads = suppressMessagingToolReplies ? [] : mediaFilteredPayloads; + + if (finalPayloads.length === 0) { + return; + } if (autoCompactionCount > 0) { + const previousSessionId = queued.run.sessionId; const count = await incrementRunCompactionCount({ sessionEntry, sessionStore, @@ -399,26 +354,27 @@ export function createFollowupRunner(params: { amount: autoCompactionCount, lastCallUsage: runResult.meta?.agentMeta?.lastCallUsage, contextTokensUsed, + newSessionId: runResult.meta?.agentMeta?.sessionId, }); - const suffix = typeof count === "number" ? ` (count ${count})` : ""; - const completionText = - queued.run.verboseLevel && queued.run.verboseLevel !== "off" - ? `🧹 Auto-compaction complete${suffix}.` - : `✅ Context compacted${suffix}.`; - finalPayloads = [ - ...(await applyFollowupReplyThreading([ - { - text: completionText, - replyToCurrent: true, - isCompactionNotice: true, - }, - ])), - ...finalPayloads, - ]; - } - - if (finalPayloads.length === 0) { - return; + const refreshedSessionEntry = + sessionKey && sessionStore ? sessionStore[sessionKey] : undefined; + if (refreshedSessionEntry) { + const queueKey = queued.run.sessionKey ?? sessionKey; + if (queueKey) { + refreshQueuedFollowupSession({ + key: queueKey, + previousSessionId, + nextSessionId: refreshedSessionEntry.sessionId, + nextSessionFile: refreshedSessionEntry.sessionFile, + }); + } + } + if (queued.run.verboseLevel && queued.run.verboseLevel !== "off") { + const suffix = typeof count === "number" ? ` (count ${count})` : ""; + finalPayloads.unshift({ + text: `🧹 Auto-compaction complete${suffix}.`, + }); + } } await sendFollowupPayloads(finalPayloads, queued); diff --git a/src/auto-reply/reply/queue.ts b/src/auto-reply/reply/queue.ts index b097b6c5193..0dbb913bd75 100644 --- a/src/auto-reply/reply/queue.ts +++ b/src/auto-reply/reply/queue.ts @@ -8,7 +8,7 @@ export { resetRecentQueuedMessageIdDedupe, } from "./queue/enqueue.js"; export { resolveQueueSettings } from "./queue/settings.js"; -export { clearFollowupQueue } from "./queue/state.js"; +export { clearFollowupQueue, refreshQueuedFollowupSession } from "./queue/state.js"; export type { FollowupRun, QueueDedupeMode, diff --git a/src/auto-reply/reply/queue/state.ts b/src/auto-reply/reply/queue/state.ts index 44208e727dd..36a09ef1ebc 100644 --- a/src/auto-reply/reply/queue/state.ts +++ b/src/auto-reply/reply/queue/state.ts @@ -85,3 +85,37 @@ export function clearFollowupQueue(key: string): number { FOLLOWUP_QUEUES.delete(cleaned); return cleared; } + +export function refreshQueuedFollowupSession(params: { + key: string; + previousSessionId?: string; + nextSessionId?: string; + nextSessionFile?: string; +}): void { + const cleaned = params.key.trim(); + if (!cleaned || !params.previousSessionId || !params.nextSessionId) { + return; + } + if (params.previousSessionId === params.nextSessionId) { + return; + } + const queue = getExistingFollowupQueue(cleaned); + if (!queue) { + return; + } + + const rewriteRun = (run?: FollowupRun["run"]) => { + if (!run || run.sessionId !== params.previousSessionId) { + return; + } + run.sessionId = params.nextSessionId!; + if (params.nextSessionFile?.trim()) { + run.sessionFile = params.nextSessionFile; + } + }; + + rewriteRun(queue.lastRun); + for (const item of queue.items) { + rewriteRun(item.run); + } +} diff --git a/src/auto-reply/reply/reply-state.test.ts b/src/auto-reply/reply/reply-state.test.ts index f83d313e2d3..83b4408e5b4 100644 --- a/src/auto-reply/reply/reply-state.test.ts +++ b/src/auto-reply/reply/reply-state.test.ts @@ -445,6 +445,93 @@ describe("incrementCompactionCount", () => { expect(stored[sessionKey].outputTokens).toBeUndefined(); }); + it("updates sessionId and sessionFile when compaction rotated transcripts", async () => { + const tmp = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-compact-rotate-")); + tempDirs.push(tmp); + const storePath = path.join(tmp, "sessions.json"); + const sessionKey = "main"; + const entry = { + sessionId: "s1", + sessionFile: path.join(tmp, "s1-topic-456.jsonl"), + updatedAt: Date.now(), + compactionCount: 0, + } as SessionEntry; + const sessionStore: Record = { [sessionKey]: entry }; + await seedSessionStore({ storePath, sessionKey, entry }); + + await incrementCompactionCount({ + sessionEntry: entry, + sessionStore, + sessionKey, + storePath, + newSessionId: "s2", + }); + + const stored = JSON.parse(await fs.readFile(storePath, "utf-8")); + const expectedDir = await fs.realpath(tmp); + expect(stored[sessionKey].sessionId).toBe("s2"); + expect(stored[sessionKey].sessionFile).toBe(path.join(expectedDir, "s2-topic-456.jsonl")); + }); + + it("preserves fork transcript filenames when compaction rotates transcripts", async () => { + const tmp = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-compact-fork-")); + tempDirs.push(tmp); + const storePath = path.join(tmp, "sessions.json"); + const sessionKey = "main"; + const entry = { + sessionId: "s1", + sessionFile: path.join(tmp, "2026-03-23T12-34-56-789Z_s1.jsonl"), + updatedAt: Date.now(), + compactionCount: 0, + } as SessionEntry; + const sessionStore: Record = { [sessionKey]: entry }; + await seedSessionStore({ storePath, sessionKey, entry }); + + await incrementCompactionCount({ + sessionEntry: entry, + sessionStore, + sessionKey, + storePath, + newSessionId: "s2", + }); + + const stored = JSON.parse(await fs.readFile(storePath, "utf-8")); + const expectedDir = await fs.realpath(tmp); + expect(stored[sessionKey].sessionId).toBe("s2"); + expect(stored[sessionKey].sessionFile).toBe( + path.join(expectedDir, "2026-03-23T12-34-56-789Z_s2.jsonl"), + ); + }); + + it("falls back to the derived transcript path when rewritten absolute sessionFile is unsafe", async () => { + const tmp = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-compact-unsafe-")); + tempDirs.push(tmp); + const storePath = path.join(tmp, "sessions.json"); + const sessionKey = "main"; + const unsafePath = path.join(tmp, "outside", "s1.jsonl"); + const entry = { + sessionId: "s1", + sessionFile: unsafePath, + updatedAt: Date.now(), + compactionCount: 0, + } as SessionEntry; + const sessionStore: Record = { [sessionKey]: entry }; + await seedSessionStore({ storePath, sessionKey, entry }); + + await incrementCompactionCount({ + sessionEntry: entry, + sessionStore, + sessionKey, + storePath, + newSessionId: "s2", + }); + + const stored = JSON.parse(await fs.readFile(storePath, "utf-8")); + const expectedDir = await fs.realpath(tmp); + expect(stored[sessionKey].sessionId).toBe("s2"); + expect(stored[sessionKey].sessionFile).toBe(path.join(expectedDir, "s2.jsonl")); + }); + it("increments compaction count by an explicit amount", async () => { const entry = { sessionId: "s1", updatedAt: Date.now(), compactionCount: 2 } as SessionEntry; const { storePath, sessionKey, sessionStore } = await createCompactionSessionFixture(entry); @@ -462,6 +549,55 @@ describe("incrementCompactionCount", () => { expect(stored[sessionKey].compactionCount).toBe(4); }); + it("updates sessionId and sessionFile when newSessionId is provided", async () => { + const entry = { + sessionId: "old-session-id", + sessionFile: "old-session-id.jsonl", + updatedAt: Date.now(), + compactionCount: 1, + } as SessionEntry; + const { storePath, sessionKey, sessionStore } = await createCompactionSessionFixture(entry); + + await incrementCompactionCount({ + sessionEntry: entry, + sessionStore, + sessionKey, + storePath, + newSessionId: "new-session-id", + }); + + const stored = JSON.parse(await fs.readFile(storePath, "utf-8")); + const expectedSessionDir = await fs.realpath(path.dirname(storePath)); + expect(stored[sessionKey].sessionId).toBe("new-session-id"); + expect(stored[sessionKey].sessionFile).toBe( + path.join(expectedSessionDir, "new-session-id.jsonl"), + ); + expect(stored[sessionKey].compactionCount).toBe(2); + }); + + it("does not update sessionFile when newSessionId matches current sessionId", async () => { + const entry = { + sessionId: "same-id", + sessionFile: "same-id.jsonl", + updatedAt: Date.now(), + compactionCount: 0, + } as SessionEntry; + const { storePath, sessionKey, sessionStore } = await createCompactionSessionFixture(entry); + + await incrementCompactionCount({ + sessionEntry: entry, + sessionStore, + sessionKey, + storePath, + newSessionId: "same-id", + }); + + const stored = JSON.parse(await fs.readFile(storePath, "utf-8")); + expect(stored[sessionKey].sessionId).toBe("same-id"); + expect(stored[sessionKey].sessionFile).toBe("same-id.jsonl"); + expect(stored[sessionKey].compactionCount).toBe(1); + }); + it("does not update totalTokens when tokensAfter is not provided", async () => { const entry = { sessionId: "s1", diff --git a/src/auto-reply/reply/session-run-accounting.ts b/src/auto-reply/reply/session-run-accounting.ts index 1a8a0d83640..75b9966414f 100644 --- a/src/auto-reply/reply/session-run-accounting.ts +++ b/src/auto-reply/reply/session-run-accounting.ts @@ -11,6 +11,7 @@ type IncrementRunCompactionCountParams = Omit< amount?: number; lastCallUsage?: NormalizedUsage; contextTokensUsed?: number; + newSessionId?: string; }; export async function persistRunSessionUsage(params: PersistRunSessionUsageParams): Promise { @@ -33,5 +34,6 @@ export async function incrementRunCompactionCount( storePath: params.storePath, amount: params.amount, tokensAfter: tokensAfterCompaction, + newSessionId: params.newSessionId, }); } diff --git a/src/auto-reply/reply/session-updates.ts b/src/auto-reply/reply/session-updates.ts index 59981ae7fd3..e18ea3987c0 100644 --- a/src/auto-reply/reply/session-updates.ts +++ b/src/auto-reply/reply/session-updates.ts @@ -1,10 +1,129 @@ import crypto from "node:crypto"; +import fs from "node:fs"; +import path from "node:path"; +import { resolveUserTimezone } from "../../agents/date-time.js"; import { buildWorkspaceSkillSnapshot } from "../../agents/skills.js"; import { ensureSkillsWatcher, getSkillsSnapshotVersion } from "../../agents/skills/refresh.js"; import type { OpenClawConfig } from "../../config/config.js"; -import { type SessionEntry, updateSessionStore } from "../../config/sessions.js"; +import { + resolveSessionFilePath, + resolveSessionFilePathOptions, + type SessionEntry, + updateSessionStore, +} from "../../config/sessions.js"; +import { buildChannelSummary } from "../../infra/channel-summary.js"; +import { + resolveTimezone, + formatUtcTimestamp, + formatZonedTimestamp, +} from "../../infra/format-time/format-datetime.ts"; import { getRemoteSkillEligibility } from "../../infra/skills-remote.js"; -export { drainFormattedSystemEvents } from "./session-system-events.js"; +import { drainSystemEventEntries } from "../../infra/system-events.js"; +import { resolveAgentIdFromSessionKey } from "../../routing/session-key.js"; + +/** Drain queued system events, format as `System:` lines, return the block (or undefined). */ +export async function drainFormattedSystemEvents(params: { + cfg: OpenClawConfig; + sessionKey: string; + isMainSession: boolean; + isNewSession: boolean; +}): Promise { + const compactSystemEvent = (line: string): string | null => { + const trimmed = line.trim(); + if (!trimmed) { + return null; + } + const lower = trimmed.toLowerCase(); + if (lower.includes("reason periodic")) { + return null; + } + // Filter out the actual heartbeat prompt, but not cron jobs that mention "heartbeat" + // The heartbeat prompt starts with "Read HEARTBEAT.md" - cron payloads won't match this + if (lower.startsWith("read heartbeat.md")) { + return null; + } + // Also filter heartbeat poll/wake noise + if (lower.includes("heartbeat poll") || lower.includes("heartbeat wake")) { + return null; + } + if (trimmed.startsWith("Node:")) { + return trimmed.replace(/ · last input [^·]+/i, "").trim(); + } + return trimmed; + }; + + const resolveSystemEventTimezone = (cfg: OpenClawConfig) => { + const raw = cfg.agents?.defaults?.envelopeTimezone?.trim(); + if (!raw) { + return { mode: "local" as const }; + } + const lowered = raw.toLowerCase(); + if (lowered === "utc" || lowered === "gmt") { + return { mode: "utc" as const }; + } + if (lowered === "local" || lowered === "host") { + return { mode: "local" as const }; + } + if (lowered === "user") { + return { + mode: "iana" as const, + timeZone: resolveUserTimezone(cfg.agents?.defaults?.userTimezone), + }; + } + const explicit = resolveTimezone(raw); + return explicit ? { mode: "iana" as const, timeZone: explicit } : { mode: "local" as const }; + }; + + const formatSystemEventTimestamp = (ts: number, cfg: OpenClawConfig) => { + const date = new Date(ts); + if (Number.isNaN(date.getTime())) { + return "unknown-time"; + } + const zone = resolveSystemEventTimezone(cfg); + if (zone.mode === "utc") { + return formatUtcTimestamp(date, { displaySeconds: true }); + } + if (zone.mode === "local") { + return formatZonedTimestamp(date, { displaySeconds: true }) ?? "unknown-time"; + } + return ( + formatZonedTimestamp(date, { timeZone: zone.timeZone, displaySeconds: true }) ?? + "unknown-time" + ); + }; + + const systemLines: string[] = []; + const queued = drainSystemEventEntries(params.sessionKey); + systemLines.push( + ...queued + .map((event) => { + const compacted = compactSystemEvent(event.text); + if (!compacted) { + return null; + } + return `[${formatSystemEventTimestamp(event.ts, params.cfg)}] ${compacted}`; + }) + .filter((v): v is string => Boolean(v)), + ); + if (params.isMainSession && params.isNewSession) { + const summary = await buildChannelSummary(params.cfg); + if (summary.length > 0) { + systemLines.unshift(...summary); + } + } + if (systemLines.length === 0) { + return undefined; + } + + // Format events as trusted System: lines for the message timeline. + // Inbound sanitization rewrites any user-supplied "System:" to "System (untrusted):", + // so these gateway-originated lines are distinguishable by the model. + // Each sub-line of a multi-line event gets its own System: prefix so continuation + // lines can't be mistaken for user content. + return systemLines + .flatMap((line) => line.split("\n").map((subline) => `System: ${subline}`)) + .join("\n"); +} async function persistSessionEntryUpdate(params: { sessionStore?: Record; @@ -147,6 +266,8 @@ export async function incrementCompactionCount(params: { amount?: number; /** Token count after compaction - if provided, updates session token counts */ tokensAfter?: number; + /** Session id after compaction, when the runtime rotated transcripts. */ + newSessionId?: string; }): Promise { const { sessionEntry, @@ -156,6 +277,7 @@ export async function incrementCompactionCount(params: { now = Date.now(), amount = 1, tokensAfter, + newSessionId, } = params; if (!sessionStore || !sessionKey) { return undefined; @@ -171,6 +293,15 @@ export async function incrementCompactionCount(params: { compactionCount: nextCount, updatedAt: now, }; + if (newSessionId && newSessionId !== entry.sessionId) { + updates.sessionId = newSessionId; + updates.sessionFile = resolveCompactionSessionFile({ + entry, + sessionKey, + storePath, + newSessionId, + }); + } // If tokensAfter is provided, update the cached token counts to reflect post-compaction state if (tokensAfter != null && tokensAfter > 0) { updates.totalTokens = tokensAfter; @@ -195,3 +326,72 @@ export async function incrementCompactionCount(params: { } return nextCount; } + +function resolveCompactionSessionFile(params: { + entry: SessionEntry; + sessionKey: string; + storePath?: string; + newSessionId: string; +}): string { + const agentId = resolveAgentIdFromSessionKey(params.sessionKey); + const pathOpts = resolveSessionFilePathOptions({ + agentId, + storePath: params.storePath, + }); + const rewrittenSessionFile = rewriteSessionFileForNewSessionId({ + sessionFile: params.entry.sessionFile, + previousSessionId: params.entry.sessionId, + nextSessionId: params.newSessionId, + }); + const normalizedRewrittenSessionFile = + rewrittenSessionFile && path.isAbsolute(rewrittenSessionFile) + ? canonicalizeAbsoluteSessionFilePath(rewrittenSessionFile) + : rewrittenSessionFile; + return resolveSessionFilePath( + params.newSessionId, + normalizedRewrittenSessionFile ? { sessionFile: normalizedRewrittenSessionFile } : undefined, + pathOpts, + ); +} + +function canonicalizeAbsoluteSessionFilePath(filePath: string): string { + const resolved = path.resolve(filePath); + try { + const parentDir = fs.realpathSync(path.dirname(resolved)); + return path.join(parentDir, path.basename(resolved)); + } catch { + return resolved; + } +} + +function rewriteSessionFileForNewSessionId(params: { + sessionFile?: string; + previousSessionId: string; + nextSessionId: string; +}): string | undefined { + const trimmed = params.sessionFile?.trim(); + if (!trimmed) { + return undefined; + } + const base = path.basename(trimmed); + if (!base.endsWith(".jsonl")) { + return undefined; + } + const withoutExt = base.slice(0, -".jsonl".length); + if (withoutExt === params.previousSessionId) { + return path.join(path.dirname(trimmed), `${params.nextSessionId}.jsonl`); + } + if (withoutExt.startsWith(`${params.previousSessionId}-topic-`)) { + return path.join( + path.dirname(trimmed), + `${params.nextSessionId}${base.slice(params.previousSessionId.length)}`, + ); + } + const forkMatch = withoutExt.match( + /^(\d{4}-\d{2}-\d{2}T[\w-]+(?:Z|[+-]\d{2}(?:-\d{2})?)?)_(.+)$/, + ); + if (forkMatch?.[2] === params.previousSessionId) { + return path.join(path.dirname(trimmed), `${forkMatch[1]}_${params.nextSessionId}.jsonl`); + } + return undefined; +} diff --git a/src/gateway/session-utils.fs.test.ts b/src/gateway/session-utils.fs.test.ts index 711948f5b6f..ee6b0da66df 100644 --- a/src/gateway/session-utils.fs.test.ts +++ b/src/gateway/session-utils.fs.test.ts @@ -556,39 +556,6 @@ describe("readSessionMessages", () => { expect((out[0] as { __openclaw?: { seq?: number } }).__openclaw?.seq).toBe(1); } }); - - test("preserves raw assistant transcript content on disk reads", () => { - const sessionId = "assistant-scaffolding"; - const transcriptPath = path.join(tmpDir, `${sessionId}.jsonl`); - fs.writeFileSync( - transcriptPath, - [ - JSON.stringify({ type: "session", version: 1, id: sessionId }), - JSON.stringify({ - message: { - role: "assistant", - text: "hiddenVisible top-level", - content: [ - { type: "text", text: "secretVisible content" }, - { type: "tool_result", text: "keep?Visible tool text" }, - ], - }, - }), - ].join("\n"), - "utf-8", - ); - - const out = readSessionMessages(sessionId, storePath); - expect(out).toHaveLength(1); - expect(out[0]).toMatchObject({ - role: "assistant", - text: "hiddenVisible top-level", - content: [ - { type: "text", text: "secretVisible content" }, - { type: "tool_result", text: "keep?Visible tool text" }, - ], - }); - }); }); describe("readSessionPreviewItemsFromTranscript", () => { @@ -898,6 +865,78 @@ describe("resolveSessionTranscriptCandidates safety", () => { expect(candidates.some((value) => value.includes("etc/passwd"))).toBe(false); expect(normalizedCandidates).toContain(expectedFallback); }); + + test("prefers the current sessionId transcript before a stale sessionFile candidate", () => { + const storePath = "/tmp/openclaw/agents/main/sessions/sessions.json"; + const candidates = resolveSessionTranscriptCandidates( + "11111111-1111-4111-8111-111111111111", + storePath, + "/tmp/openclaw/agents/main/sessions/22222222-2222-4222-8222-222222222222.jsonl", + ); + + expect(candidates[0]).toBe( + path.resolve("/tmp/openclaw/agents/main/sessions/11111111-1111-4111-8111-111111111111.jsonl"), + ); + expect(candidates).toContain( + path.resolve("/tmp/openclaw/agents/main/sessions/22222222-2222-4222-8222-222222222222.jsonl"), + ); + }); + + test("keeps explicit custom sessionFile ahead of synthesized fallback", () => { + const storePath = "/tmp/openclaw/agents/main/sessions/sessions.json"; + const sessionFile = "/tmp/openclaw/agents/main/sessions/custom-transcript.jsonl"; + const candidates = resolveSessionTranscriptCandidates( + "11111111-1111-4111-8111-111111111111", + storePath, + sessionFile, + ); + + expect(candidates[0]).toBe(path.resolve(sessionFile)); + }); + + test("keeps custom topic-like transcript paths ahead of synthesized fallback", () => { + const storePath = "/tmp/openclaw/agents/main/sessions/sessions.json"; + const sessionFile = "/tmp/openclaw/agents/main/sessions/custom-topic-notes.jsonl"; + const candidates = resolveSessionTranscriptCandidates( + "11111111-1111-4111-8111-111111111111", + storePath, + sessionFile, + ); + + expect(candidates[0]).toBe(path.resolve(sessionFile)); + }); + + test("keeps forked transcript paths ahead of synthesized fallback", () => { + const storePath = "/tmp/openclaw/agents/main/sessions/sessions.json"; + const sessionId = "11111111-1111-4111-8111-111111111111"; + const sessionFile = + "/tmp/openclaw/agents/main/sessions/2026-03-23T16-30-00-000Z_11111111-1111-4111-8111-111111111111.jsonl"; + const candidates = resolveSessionTranscriptCandidates(sessionId, storePath, sessionFile); + + expect(candidates[0]).toBe(path.resolve(sessionFile)); + }); + + test("keeps timestamped custom transcript paths ahead of synthesized fallback", () => { + const storePath = "/tmp/openclaw/agents/main/sessions/sessions.json"; + const sessionId = "11111111-1111-4111-8111-111111111111"; + const sessionFile = "/tmp/openclaw/agents/main/sessions/2026-03-23T16-30-00-000Z_notes.jsonl"; + const candidates = resolveSessionTranscriptCandidates(sessionId, storePath, sessionFile); + + expect(candidates[0]).toBe(path.resolve(sessionFile)); + }); + + test("still treats generated topic transcripts from another session as stale", () => { + const storePath = "/tmp/openclaw/agents/main/sessions/sessions.json"; + const sessionId = "11111111-1111-4111-8111-111111111111"; + const staleSessionFile = + "/tmp/openclaw/agents/main/sessions/22222222-2222-4222-8222-222222222222-topic-thread.jsonl"; + const candidates = resolveSessionTranscriptCandidates(sessionId, storePath, staleSessionFile); + + expect(candidates[0]).toBe( + path.resolve("/tmp/openclaw/agents/main/sessions/11111111-1111-4111-8111-111111111111.jsonl"), + ); + expect(candidates).toContain(path.resolve(staleSessionFile)); + }); }); describe("archiveSessionTranscripts", () => { diff --git a/src/gateway/session-utils.fs.ts b/src/gateway/session-utils.fs.ts index 82bfd55323c..9aa22caf461 100644 --- a/src/gateway/session-utils.fs.ts +++ b/src/gateway/session-utils.fs.ts @@ -3,16 +3,13 @@ import os from "node:os"; import path from "node:path"; import { deriveSessionTotalTokens, hasNonzeroUsage, normalizeUsage } from "../agents/usage.js"; import { + formatSessionArchiveTimestamp, + parseSessionArchiveTimestamp, + type SessionArchiveReason, resolveSessionFilePath, resolveSessionTranscriptPath, resolveSessionTranscriptPathInDir, } from "../config/sessions.js"; -export { - archiveFileOnDisk, - archiveSessionTranscripts, - cleanupArchivedSessionTranscripts, - type ArchiveFileReason, -} from "../gateway/session-archive.fs.js"; import { resolveRequiredHomeDir } from "../infra/home-dir.js"; import { jsonUtf8Bytes } from "../infra/json-utf8-bytes.js"; import { hasInterSessionUserProvenance } from "../sessions/input-provenance.js"; @@ -159,6 +156,7 @@ export function resolveSessionTranscriptCandidates( agentId?: string, ): string[] { const candidates: string[] = []; + const sessionFileState = classifySessionTranscriptCandidate(sessionId, sessionFile); const pushCandidate = (resolve: () => string): void => { try { candidates.push(resolve()); @@ -169,15 +167,22 @@ export function resolveSessionTranscriptCandidates( if (storePath) { const sessionsDir = path.dirname(storePath); - if (sessionFile) { + if (sessionFile && sessionFileState !== "stale") { pushCandidate(() => resolveSessionFilePath(sessionId, { sessionFile }, { sessionsDir, agentId }), ); } pushCandidate(() => resolveSessionTranscriptPathInDir(sessionId, sessionsDir)); + if (sessionFile && sessionFileState === "stale") { + pushCandidate(() => + resolveSessionFilePath(sessionId, { sessionFile }, { sessionsDir, agentId }), + ); + } } else if (sessionFile) { if (agentId) { - pushCandidate(() => resolveSessionFilePath(sessionId, { sessionFile }, { agentId })); + if (sessionFileState !== "stale") { + pushCandidate(() => resolveSessionFilePath(sessionId, { sessionFile }, { agentId })); + } } else { const trimmed = sessionFile.trim(); if (trimmed) { @@ -188,6 +193,9 @@ export function resolveSessionTranscriptCandidates( if (agentId) { pushCandidate(() => resolveSessionTranscriptPath(sessionId, agentId)); + if (sessionFile && sessionFileState === "stale") { + pushCandidate(() => resolveSessionFilePath(sessionId, { sessionFile }, { agentId })); + } } const home = resolveRequiredHomeDir(process.env, os.homedir); @@ -197,6 +205,151 @@ export function resolveSessionTranscriptCandidates( return Array.from(new Set(candidates)); } +export type ArchiveFileReason = SessionArchiveReason; + +function classifySessionTranscriptCandidate( + sessionId: string, + sessionFile?: string, +): "current" | "stale" | "custom" { + const transcriptSessionId = extractGeneratedTranscriptSessionId(sessionFile); + if (!transcriptSessionId) { + return "custom"; + } + return transcriptSessionId === sessionId ? "current" : "stale"; +} + +function extractGeneratedTranscriptSessionId(sessionFile?: string): string | undefined { + const trimmed = sessionFile?.trim(); + if (!trimmed) { + return undefined; + } + const base = path.basename(trimmed); + if (!base.endsWith(".jsonl")) { + return undefined; + } + const withoutExt = base.slice(0, -".jsonl".length); + const topicIndex = withoutExt.indexOf("-topic-"); + if (topicIndex > 0) { + const topicSessionId = withoutExt.slice(0, topicIndex); + return looksLikeGeneratedSessionId(topicSessionId) ? topicSessionId : undefined; + } + const forkMatch = withoutExt.match( + /^(\d{4}-\d{2}-\d{2}T[\w-]+(?:Z|[+-]\d{2}(?:-\d{2})?)?)_(.+)$/, + ); + if (forkMatch?.[2]) { + return looksLikeGeneratedSessionId(forkMatch[2]) ? forkMatch[2] : undefined; + } + if (looksLikeGeneratedSessionId(withoutExt)) { + return withoutExt; + } + return undefined; +} + +function looksLikeGeneratedSessionId(value: string): boolean { + return /^[0-9a-f]{8}-[0-9a-f]{4}-[1-8][0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$/i.test(value); +} + +function canonicalizePathForComparison(filePath: string): string { + const resolved = path.resolve(filePath); + try { + return fs.realpathSync(resolved); + } catch { + return resolved; + } +} + +export function archiveFileOnDisk(filePath: string, reason: ArchiveFileReason): string { + const ts = formatSessionArchiveTimestamp(); + const archived = `${filePath}.${reason}.${ts}`; + fs.renameSync(filePath, archived); + return archived; +} + +/** + * Archives all transcript files for a given session. + * Best-effort: silently skips files that don't exist or fail to rename. + */ +export function archiveSessionTranscripts(opts: { + sessionId: string; + storePath: string | undefined; + sessionFile?: string; + agentId?: string; + reason: "reset" | "deleted"; + /** + * When true, only archive files resolved under the session store directory. + * This prevents maintenance operations from mutating paths outside the agent sessions dir. + */ + restrictToStoreDir?: boolean; +}): string[] { + const archived: string[] = []; + const storeDir = + opts.restrictToStoreDir && opts.storePath + ? canonicalizePathForComparison(path.dirname(opts.storePath)) + : null; + for (const candidate of resolveSessionTranscriptCandidates( + opts.sessionId, + opts.storePath, + opts.sessionFile, + opts.agentId, + )) { + const candidatePath = canonicalizePathForComparison(candidate); + if (storeDir) { + const relative = path.relative(storeDir, candidatePath); + if (!relative || relative.startsWith("..") || path.isAbsolute(relative)) { + continue; + } + } + if (!fs.existsSync(candidatePath)) { + continue; + } + try { + archived.push(archiveFileOnDisk(candidatePath, opts.reason)); + } catch { + // Best-effort. + } + } + return archived; +} + +export async function cleanupArchivedSessionTranscripts(opts: { + directories: string[]; + olderThanMs: number; + reason?: ArchiveFileReason; + nowMs?: number; +}): Promise<{ removed: number; scanned: number }> { + if (!Number.isFinite(opts.olderThanMs) || opts.olderThanMs < 0) { + return { removed: 0, scanned: 0 }; + } + const now = opts.nowMs ?? Date.now(); + const reason: ArchiveFileReason = opts.reason ?? "deleted"; + const directories = Array.from(new Set(opts.directories.map((dir) => path.resolve(dir)))); + let removed = 0; + let scanned = 0; + + for (const dir of directories) { + const entries = await fs.promises.readdir(dir).catch(() => []); + for (const entry of entries) { + const timestamp = parseSessionArchiveTimestamp(entry, reason); + if (timestamp == null) { + continue; + } + scanned += 1; + if (now - timestamp <= opts.olderThanMs) { + continue; + } + const fullPath = path.join(dir, entry); + const stat = await fs.promises.stat(fullPath).catch(() => null); + if (!stat?.isFile()) { + continue; + } + await fs.promises.rm(fullPath).catch(() => undefined); + removed += 1; + } + } + + return { removed, scanned }; +} + export function capArrayByJsonBytes( items: T[], maxBytes: number,