diff --git a/docs/reference/transcript-hygiene.md b/docs/reference/transcript-hygiene.md index 1321175ded9..21fdb3006d6 100644 --- a/docs/reference/transcript-hygiene.md +++ b/docs/reference/transcript-hygiene.md @@ -37,7 +37,7 @@ If you need transcript storage details, see: All transcript hygiene is centralized in the embedded runner: - Policy selection: `src/agents/transcript-policy.ts` -- Sanitization/repair application: `sanitizeSessionHistory` in `src/agents/pi-embedded-runner/google.ts` +- Sanitization/repair application: `sanitizeSessionHistory` in `src/agents/pi-embedded-runner/transcript-hygiene.ts` The policy uses `provider`, `modelApi`, and `modelId` to decide what to apply. @@ -118,6 +118,12 @@ external end-user instructions. - Tool result pairing repair and synthetic tool results. - Turn validation (merge consecutive user turns to satisfy strict alternation). +- Preserve the latest assistant turn verbatim when it contains `thinking`/`redacted_thinking` blocks. + +**GitHub Copilot Claude** + +- Drop persisted `thinking` blocks from older assistant turns with invalid Copilot signatures. +- Preserve the latest assistant turn unchanged so Anthropic replay requirements stay intact. **Mistral (including model-id based detection)** diff --git a/src/agents/pi-embedded-runner.openai-tool-id-preservation.test.ts b/src/agents/pi-embedded-runner.openai-tool-id-preservation.test.ts index 43b1e76b2d1..0ff721f6a49 100644 --- a/src/agents/pi-embedded-runner.openai-tool-id-preservation.test.ts +++ b/src/agents/pi-embedded-runner.openai-tool-id-preservation.test.ts @@ -4,7 +4,7 @@ import { makeInMemorySessionManager, makeModelSnapshotEntry, } from "./pi-embedded-runner.sanitize-session-history.test-harness.js"; -import { sanitizeSessionHistory } from "./pi-embedded-runner/google.js"; +import { sanitizeSessionHistory } from "./pi-embedded-runner/transcript-hygiene.js"; import { castAgentMessage } from "./test-helpers/agent-message-fixtures.js"; describe("sanitizeSessionHistory openai tool id preservation", () => { diff --git a/src/agents/pi-embedded-runner.sanitize-session-history.test-harness.ts b/src/agents/pi-embedded-runner.sanitize-session-history.test-harness.ts index 97750fc1dbc..d6774cfadde 100644 --- a/src/agents/pi-embedded-runner.sanitize-session-history.test-harness.ts +++ b/src/agents/pi-embedded-runner.sanitize-session-history.test-harness.ts @@ -57,7 +57,7 @@ export function makeSimpleUserMessages(): AgentMessage[] { export async function loadSanitizeSessionHistoryWithCleanMocks(): Promise { vi.resetAllMocks(); vi.mocked(helpers.sanitizeSessionMessagesImages).mockImplementation(async (msgs) => msgs); - const mod = await import("./pi-embedded-runner/google.js"); + const mod = await import("./pi-embedded-runner/transcript-hygiene.js"); return mod.sanitizeSessionHistory; } diff --git a/src/agents/pi-embedded-runner.sanitize-session-history.test.ts b/src/agents/pi-embedded-runner.sanitize-session-history.test.ts index 4fb4659c15d..229a8ec13ee 100644 --- a/src/agents/pi-embedded-runner.sanitize-session-history.test.ts +++ b/src/agents/pi-embedded-runner.sanitize-session-history.test.ts @@ -175,14 +175,14 @@ describe("sanitizeSessionHistory", () => { }); }); - it("sanitizes tool call ids with strict9 for Mistral models", async () => { + it("sanitizes tool call ids with strict9 for Mistral providers", async () => { setNonGoogleModelApi(); await sanitizeSessionHistory({ messages: mockMessages, modelApi: "openai-responses", - provider: "openrouter", - modelId: "mistralai/devstral-2512:free", + provider: "mistral", + modelId: "codestral-latest", sessionManager: mockSessionManager, sessionId: TEST_SESSION_ID, }); @@ -720,17 +720,7 @@ describe("sanitizeSessionHistory", () => { ).toBe(false); }); - it("drops assistant thinking blocks for github-copilot models", async () => { - setNonGoogleModelApi(); - - const messages = makeThinkingAndTextAssistantMessages("reasoning_text"); - - const result = await sanitizeGithubCopilotHistory({ messages }); - const assistant = getAssistantMessage(result); - expect(assistant.content).toEqual([{ type: "text", text: "hi" }]); - }); - - it("preserves assistant turn when all content is thinking blocks (github-copilot)", async () => { + it("drops thinking blocks from older github-copilot assistant turns", async () => { setNonGoogleModelApi(); const messages: AgentMessage[] = [ @@ -738,22 +728,50 @@ describe("sanitizeSessionHistory", () => { makeAssistantMessage([ { type: "thinking", - thinking: "some reasoning", + thinking: "older reasoning", thinkingSignature: "reasoning_text", }, + { type: "text", text: "older answer" }, ]), makeUserMessage("follow up"), + makeAssistantMessage([{ type: "text", text: "latest answer" }]), ]; const result = await sanitizeGithubCopilotHistory({ messages }); - - // Assistant turn should be preserved (not dropped) to maintain turn alternation - expect(result).toHaveLength(3); const assistant = getAssistantMessage(result); - expect(assistant.content).toEqual([{ type: "text", text: "" }]); + expect(assistant.content).toEqual([{ type: "text", text: "older answer" }]); }); - it("preserves tool_use blocks when dropping thinking blocks (github-copilot)", async () => { + it("preserves the latest github-copilot assistant turn unchanged", async () => { + setNonGoogleModelApi(); + + const messages: AgentMessage[] = [ + makeUserMessage("hello"), + makeAssistantMessage([{ type: "text", text: "previous answer" }]), + makeUserMessage("follow up"), + makeAssistantMessage([ + { + type: "thinking", + thinking: "some reasoning", + thinkingSignature: "reasoning_text", + }, + { type: "text", text: "latest answer" }, + ]), + ]; + + const result = await sanitizeGithubCopilotHistory({ messages }); + const latestAssistant = result.at(-1) as Extract; + expect(latestAssistant.content).toEqual([ + { + type: "thinking", + thinking: "some reasoning", + thinkingSignature: "reasoning_text", + }, + { type: "text", text: "latest answer" }, + ]); + }); + + it("preserves tool_use blocks when dropping thinking blocks from older github-copilot turns", async () => { setNonGoogleModelApi(); const messages: AgentMessage[] = [ @@ -767,6 +785,8 @@ describe("sanitizeSessionHistory", () => { { type: "toolCall", id: "tool_123", name: "read", arguments: { path: "/tmp/test" } }, { type: "text", text: "Let me read that file." }, ]), + makeUserMessage("keep latest stable"), + makeAssistantMessage([{ type: "text", text: "latest answer" }]), ]; const result = await sanitizeGithubCopilotHistory({ messages }); @@ -794,6 +814,35 @@ describe("sanitizeSessionHistory", () => { expect(types).toContain("thinking"); }); + it("preserves the latest anthropic assistant turn with thinking blocks verbatim", async () => { + setNonGoogleModelApi(); + + const messages: AgentMessage[] = [ + makeUserMessage("hello"), + makeAssistantMessage( + [ + { type: "text", text: "previous answer" }, + { type: "thinking", thinking: "latest reasoning", thinkingSignature: "sig" }, + { type: "text", text: "" }, + { type: "text", text: "latest answer" }, + ], + { timestamp: nextTimestamp() }, + ), + makeUserMessage("follow up"), + ]; + + const result = await sanitizeSessionHistory({ + messages, + modelApi: "anthropic-messages", + provider: "anthropic", + modelId: "claude-opus-4-6", + sessionManager: makeMockSessionManager(), + sessionId: TEST_SESSION_ID, + }); + + expect(result[1]).toEqual(messages[1]); + }); + it("does not drop thinking blocks for non-claude copilot models", async () => { setNonGoogleModelApi(); diff --git a/src/agents/pi-embedded-runner/compact.ts b/src/agents/pi-embedded-runner/compact.ts index a62ed2eecb0..57bb35f1429 100644 --- a/src/agents/pi-embedded-runner/compact.ts +++ b/src/agents/pi-embedded-runner/compact.ts @@ -73,11 +73,7 @@ import { EMBEDDED_COMPACTION_TIMEOUT_MS, } from "./compaction-safety-timeout.js"; import { buildEmbeddedExtensionFactories } from "./extensions.js"; -import { - logToolSchemasForGoogle, - sanitizeSessionHistory, - sanitizeToolsForGoogle, -} from "./google.js"; +import { logToolSchemasForGoogle, sanitizeToolsForGoogle } from "./google.js"; import { getDmHistoryLimitFromSessionKey, limitHistoryTurns } from "./history.js"; import { resolveGlobalLane, resolveSessionLane } from "./lanes.js"; import { log } from "./logger.js"; @@ -92,6 +88,7 @@ import { } from "./system-prompt.js"; import { collectAllowedToolNames } from "./tool-name-allowlist.js"; import { splitSdkTools } from "./tool-split.js"; +import { sanitizeSessionHistory } from "./transcript-hygiene.js"; import type { EmbeddedPiCompactResult } from "./types.js"; import { describeUnknownError, mapThinkingLevel } from "./utils.js"; import { flushPendingToolResultsAfterIdle } from "./wait-for-idle-before-flush.js"; diff --git a/src/agents/pi-embedded-runner/compaction-failures.ts b/src/agents/pi-embedded-runner/compaction-failures.ts new file mode 100644 index 00000000000..e3186b648f1 --- /dev/null +++ b/src/agents/pi-embedded-runner/compaction-failures.ts @@ -0,0 +1,24 @@ +import { EventEmitter } from "node:events"; +import { registerUnhandledRejectionHandler } from "../../infra/unhandled-rejections.js"; +import { isCompactionFailureError } from "../pi-embedded-helpers.js"; +import { log } from "./logger.js"; +import { describeUnknownError } from "./utils.js"; + +const compactionFailureEmitter = new EventEmitter(); + +export type CompactionFailureListener = (reason: string) => void; + +export function onUnhandledCompactionFailure(cb: CompactionFailureListener): () => void { + compactionFailureEmitter.on("failure", cb); + return () => compactionFailureEmitter.off("failure", cb); +} + +registerUnhandledRejectionHandler((reason) => { + const message = describeUnknownError(reason); + if (!isCompactionFailureError(message)) { + return false; + } + log.error(`Auto-compaction failed (unhandled): ${message}`); + compactionFailureEmitter.emit("failure", message); + return true; +}); diff --git a/src/agents/pi-embedded-runner/google.ts b/src/agents/pi-embedded-runner/google.ts index 265593f03e0..75b42483d1a 100644 --- a/src/agents/pi-embedded-runner/google.ts +++ b/src/agents/pi-embedded-runner/google.ts @@ -1,39 +1,9 @@ -import { EventEmitter } from "node:events"; import type { AgentMessage, AgentTool } from "@mariozechner/pi-agent-core"; import type { SessionManager } from "@mariozechner/pi-coding-agent"; import type { TSchema } from "@sinclair/typebox"; -import type { OpenClawConfig } from "../../config/config.js"; -import { registerUnhandledRejectionHandler } from "../../infra/unhandled-rejections.js"; -import { - hasInterSessionUserProvenance, - normalizeInputProvenance, -} from "../../sessions/input-provenance.js"; -import { resolveImageSanitizationLimits } from "../image-sanitization.js"; -import { - downgradeOpenAIFunctionCallReasoningPairs, - downgradeOpenAIReasoningBlocks, - isCompactionFailureError, - isGoogleModelApi, - sanitizeGoogleTurnOrdering, - sanitizeSessionMessagesImages, -} from "../pi-embedded-helpers.js"; +import { isGoogleModelApi, sanitizeGoogleTurnOrdering } from "../pi-embedded-helpers.js"; import { cleanToolSchemaForGemini } from "../pi-tools.schema.js"; -import { - sanitizeToolCallInputs, - stripToolResultDetails, - sanitizeToolUseResultPairing, -} from "../session-transcript-repair.js"; -import type { TranscriptPolicy } from "../transcript-policy.js"; -import { resolveTranscriptPolicy } from "../transcript-policy.js"; -import { - makeZeroUsageSnapshot, - normalizeUsage, - type AssistantUsageSnapshot, - type UsageLike, -} from "../usage.js"; import { log } from "./logger.js"; -import { dropThinkingBlocks } from "./thinking.js"; -import { describeUnknownError } from "./utils.js"; const GOOGLE_TURN_ORDERING_CUSTOM_TYPE = "google-turn-ordering-bootstrap"; const GOOGLE_SCHEMA_UNSUPPORTED_KEYWORDS = new Set([ @@ -59,256 +29,7 @@ const GOOGLE_SCHEMA_UNSUPPORTED_KEYWORDS = new Set([ "maxProperties", ]); -const INTER_SESSION_PREFIX_BASE = "[Inter-session message]"; - -function buildInterSessionPrefix(message: AgentMessage): string { - const provenance = normalizeInputProvenance((message as { provenance?: unknown }).provenance); - if (!provenance) { - return INTER_SESSION_PREFIX_BASE; - } - const details = [ - provenance.sourceSessionKey ? `sourceSession=${provenance.sourceSessionKey}` : undefined, - provenance.sourceChannel ? `sourceChannel=${provenance.sourceChannel}` : undefined, - provenance.sourceTool ? `sourceTool=${provenance.sourceTool}` : undefined, - ].filter(Boolean); - if (details.length === 0) { - return INTER_SESSION_PREFIX_BASE; - } - return `${INTER_SESSION_PREFIX_BASE} ${details.join(" ")}`; -} - -function annotateInterSessionUserMessages(messages: AgentMessage[]): AgentMessage[] { - let touched = false; - const out: AgentMessage[] = []; - for (const msg of messages) { - if (!hasInterSessionUserProvenance(msg as { role?: unknown; provenance?: unknown })) { - out.push(msg); - continue; - } - const prefix = buildInterSessionPrefix(msg); - const user = msg as Extract; - if (typeof user.content === "string") { - if (user.content.startsWith(prefix)) { - out.push(msg); - continue; - } - touched = true; - out.push({ - ...(msg as unknown as Record), - content: `${prefix}\n${user.content}`, - } as AgentMessage); - continue; - } - if (!Array.isArray(user.content)) { - out.push(msg); - continue; - } - - const textIndex = user.content.findIndex( - (block) => - block && - typeof block === "object" && - (block as { type?: unknown }).type === "text" && - typeof (block as { text?: unknown }).text === "string", - ); - - if (textIndex >= 0) { - const existing = user.content[textIndex] as { type: "text"; text: string }; - if (existing.text.startsWith(prefix)) { - out.push(msg); - continue; - } - const nextContent = [...user.content]; - nextContent[textIndex] = { - ...existing, - text: `${prefix}\n${existing.text}`, - }; - touched = true; - out.push({ - ...(msg as unknown as Record), - content: nextContent, - } as AgentMessage); - continue; - } - - touched = true; - out.push({ - ...(msg as unknown as Record), - content: [{ type: "text", text: prefix }, ...user.content], - } as AgentMessage); - } - return touched ? out : messages; -} - -function parseMessageTimestamp(value: unknown): number | null { - if (typeof value === "number" && Number.isFinite(value)) { - return value; - } - if (typeof value === "string") { - const parsed = Date.parse(value); - if (Number.isFinite(parsed)) { - return parsed; - } - } - return null; -} - -function stripStaleAssistantUsageBeforeLatestCompaction(messages: AgentMessage[]): AgentMessage[] { - let latestCompactionSummaryIndex = -1; - let latestCompactionTimestamp: number | null = null; - for (let i = 0; i < messages.length; i += 1) { - const entry = messages[i]; - if (entry?.role !== "compactionSummary") { - continue; - } - latestCompactionSummaryIndex = i; - latestCompactionTimestamp = parseMessageTimestamp( - (entry as { timestamp?: unknown }).timestamp ?? null, - ); - } - if (latestCompactionSummaryIndex === -1) { - return messages; - } - - const out = [...messages]; - let touched = false; - for (let i = 0; i < out.length; i += 1) { - const candidate = out[i] as - | (AgentMessage & { usage?: unknown; timestamp?: unknown }) - | undefined; - if (!candidate || candidate.role !== "assistant") { - continue; - } - if (!candidate.usage || typeof candidate.usage !== "object") { - continue; - } - - const messageTimestamp = parseMessageTimestamp(candidate.timestamp); - const staleByTimestamp = - latestCompactionTimestamp !== null && - messageTimestamp !== null && - messageTimestamp <= latestCompactionTimestamp; - const staleByLegacyOrdering = i < latestCompactionSummaryIndex; - if (!staleByTimestamp && !staleByLegacyOrdering) { - continue; - } - - // pi-coding-agent expects assistant usage to always be present during context - // accounting. Keep stale snapshots structurally valid, but zeroed out. - const candidateRecord = candidate as unknown as Record; - out[i] = { - ...candidateRecord, - usage: makeZeroUsageSnapshot(), - } as unknown as AgentMessage; - touched = true; - } - return touched ? out : messages; -} - -function normalizeAssistantUsageSnapshot(usage: unknown) { - const normalized = normalizeUsage((usage ?? undefined) as UsageLike | undefined); - if (!normalized) { - return makeZeroUsageSnapshot(); - } - const input = normalized.input ?? 0; - const output = normalized.output ?? 0; - const cacheRead = normalized.cacheRead ?? 0; - const cacheWrite = normalized.cacheWrite ?? 0; - const totalTokens = normalized.total ?? input + output + cacheRead + cacheWrite; - const cost = normalizeAssistantUsageCost(usage); - return { - input, - output, - cacheRead, - cacheWrite, - totalTokens, - ...(cost ? { cost } : {}), - }; -} - -function normalizeAssistantUsageCost(usage: unknown): AssistantUsageSnapshot["cost"] | undefined { - const base = makeZeroUsageSnapshot().cost; - if (!usage || typeof usage !== "object") { - return undefined; - } - const rawCost = (usage as { cost?: unknown }).cost; - if (!rawCost || typeof rawCost !== "object") { - return undefined; - } - const cost = rawCost as Record; - const inputRaw = toFiniteCostNumber(cost.input); - const outputRaw = toFiniteCostNumber(cost.output); - const cacheReadRaw = toFiniteCostNumber(cost.cacheRead); - const cacheWriteRaw = toFiniteCostNumber(cost.cacheWrite); - const totalRaw = toFiniteCostNumber(cost.total); - if ( - inputRaw === undefined && - outputRaw === undefined && - cacheReadRaw === undefined && - cacheWriteRaw === undefined && - totalRaw === undefined - ) { - return undefined; - } - const input = inputRaw ?? base.input; - const output = outputRaw ?? base.output; - const cacheRead = cacheReadRaw ?? base.cacheRead; - const cacheWrite = cacheWriteRaw ?? base.cacheWrite; - const total = totalRaw ?? input + output + cacheRead + cacheWrite; - return { input, output, cacheRead, cacheWrite, total }; -} - -function toFiniteCostNumber(value: unknown): number | undefined { - return typeof value === "number" && Number.isFinite(value) ? value : undefined; -} - -function ensureAssistantUsageSnapshots(messages: AgentMessage[]): AgentMessage[] { - if (messages.length === 0) { - return messages; - } - - let touched = false; - const out = [...messages]; - for (let i = 0; i < out.length; i += 1) { - const message = out[i] as (AgentMessage & { role?: unknown; usage?: unknown }) | undefined; - if (!message || message.role !== "assistant") { - continue; - } - const normalizedUsage = normalizeAssistantUsageSnapshot(message.usage); - const usageCost = - message.usage && typeof message.usage === "object" - ? (message.usage as { cost?: unknown }).cost - : undefined; - const normalizedCost = normalizedUsage.cost; - if ( - message.usage && - typeof message.usage === "object" && - (message.usage as { input?: unknown }).input === normalizedUsage.input && - (message.usage as { output?: unknown }).output === normalizedUsage.output && - (message.usage as { cacheRead?: unknown }).cacheRead === normalizedUsage.cacheRead && - (message.usage as { cacheWrite?: unknown }).cacheWrite === normalizedUsage.cacheWrite && - (message.usage as { totalTokens?: unknown }).totalTokens === normalizedUsage.totalTokens && - ((normalizedCost && - usageCost && - typeof usageCost === "object" && - (usageCost as { input?: unknown }).input === normalizedCost.input && - (usageCost as { output?: unknown }).output === normalizedCost.output && - (usageCost as { cacheRead?: unknown }).cacheRead === normalizedCost.cacheRead && - (usageCost as { cacheWrite?: unknown }).cacheWrite === normalizedCost.cacheWrite && - (usageCost as { total?: unknown }).total === normalizedCost.total) || - (!normalizedCost && usageCost === undefined)) - ) { - continue; - } - out[i] = { - ...(message as unknown as Record), - usage: normalizedUsage, - } as AgentMessage; - touched = true; - } - - return touched ? out : messages; -} +type CustomEntryLike = { type?: unknown; customType?: unknown; data?: unknown }; export function findUnsupportedSchemaKeywords(schema: unknown, path: string): string[] { if (!schema || typeof schema !== "object") { @@ -352,9 +73,7 @@ export function sanitizeToolsForGoogle< provider: string; }): AgentTool[] { // Cloud Code Assist uses the OpenAPI 3.03 `parameters` field for both Gemini - // AND Claude models. This field does not support JSON Schema keywords such as - // patternProperties, additionalProperties, $ref, etc. We must clean schemas - // for every provider that routes through this path. + // AND Claude models. This field does not support many JSON Schema keywords. if (params.provider !== "google-gemini-cli") { return params.tools; } @@ -395,80 +114,6 @@ export function logToolSchemasForGoogle(params: { tools: AgentTool[]; provider: } } -// Event emitter for unhandled compaction failures that escape try-catch blocks. -// Listeners can use this to trigger session recovery with retry. -const compactionFailureEmitter = new EventEmitter(); - -export type CompactionFailureListener = (reason: string) => void; - -/** - * Register a listener for unhandled compaction failures. - * Called when auto-compaction fails in a way that escapes the normal try-catch, - * e.g., when the summarization request itself exceeds the model's token limit. - * Returns an unsubscribe function. - */ -export function onUnhandledCompactionFailure(cb: CompactionFailureListener): () => void { - compactionFailureEmitter.on("failure", cb); - return () => compactionFailureEmitter.off("failure", cb); -} - -registerUnhandledRejectionHandler((reason) => { - const message = describeUnknownError(reason); - if (!isCompactionFailureError(message)) { - return false; - } - log.error(`Auto-compaction failed (unhandled): ${message}`); - compactionFailureEmitter.emit("failure", message); - return true; -}); - -type CustomEntryLike = { type?: unknown; customType?: unknown; data?: unknown }; - -type ModelSnapshotEntry = { - timestamp: number; - provider?: string; - modelApi?: string | null; - modelId?: string; -}; - -const MODEL_SNAPSHOT_CUSTOM_TYPE = "model-snapshot"; - -function readLastModelSnapshot(sessionManager: SessionManager): ModelSnapshotEntry | null { - try { - const entries = sessionManager.getEntries(); - for (let i = entries.length - 1; i >= 0; i--) { - const entry = entries[i] as CustomEntryLike; - if (entry?.type !== "custom" || entry?.customType !== MODEL_SNAPSHOT_CUSTOM_TYPE) { - continue; - } - const data = entry?.data as ModelSnapshotEntry | undefined; - if (data && typeof data === "object") { - return data; - } - } - } catch { - return null; - } - return null; -} - -function appendModelSnapshot(sessionManager: SessionManager, data: ModelSnapshotEntry): void { - try { - sessionManager.appendCustomEntry(MODEL_SNAPSHOT_CUSTOM_TYPE, data); - } catch { - // ignore persistence failures - } -} - -function isSameModelSnapshot(a: ModelSnapshotEntry, b: ModelSnapshotEntry): boolean { - const normalize = (value?: string | null) => value ?? ""; - return ( - normalize(a.provider) === normalize(b.provider) && - normalize(a.modelApi) === normalize(b.modelApi) && - normalize(a.modelId) === normalize(b.modelId) - ); -} - function hasGoogleTurnOrderingMarker(sessionManager: SessionManager): boolean { try { return sessionManager @@ -503,7 +148,7 @@ export function applyGoogleTurnOrderingFix(params: { if (!isGoogleModelApi(params.modelApi)) { return { messages: params.messages, didPrepend: false }; } - const first = params.messages[0] as { role?: unknown; content?: unknown } | undefined; + const first = params.messages[0] as { role?: unknown } | undefined; if (first?.role !== "assistant") { return { messages: params.messages, didPrepend: false }; } @@ -516,97 +161,3 @@ export function applyGoogleTurnOrderingFix(params: { } return { messages: sanitized, didPrepend }; } - -export async function sanitizeSessionHistory(params: { - messages: AgentMessage[]; - modelApi?: string | null; - modelId?: string; - provider?: string; - allowedToolNames?: Iterable; - config?: OpenClawConfig; - sessionManager: SessionManager; - sessionId: string; - policy?: TranscriptPolicy; -}): Promise { - // Keep docs/reference/transcript-hygiene.md in sync with any logic changes here. - const policy = - params.policy ?? - resolveTranscriptPolicy({ - modelApi: params.modelApi, - provider: params.provider, - modelId: params.modelId, - }); - const withInterSessionMarkers = annotateInterSessionUserMessages(params.messages); - const sanitizedImages = await sanitizeSessionMessagesImages( - withInterSessionMarkers, - "session:history", - { - sanitizeMode: policy.sanitizeMode, - sanitizeToolCallIds: policy.sanitizeToolCallIds, - toolCallIdMode: policy.toolCallIdMode, - preserveSignatures: policy.preserveSignatures, - sanitizeThoughtSignatures: policy.sanitizeThoughtSignatures, - ...resolveImageSanitizationLimits(params.config), - }, - ); - const droppedThinking = policy.dropThinkingBlocks - ? dropThinkingBlocks(sanitizedImages) - : sanitizedImages; - const sanitizedToolCalls = sanitizeToolCallInputs(droppedThinking, { - allowedToolNames: params.allowedToolNames, - }); - const repairedTools = policy.repairToolUseResultPairing - ? sanitizeToolUseResultPairing(sanitizedToolCalls) - : sanitizedToolCalls; - const sanitizedToolResults = stripToolResultDetails(repairedTools); - const sanitizedCompactionUsage = ensureAssistantUsageSnapshots( - stripStaleAssistantUsageBeforeLatestCompaction(sanitizedToolResults), - ); - - const isOpenAIResponsesApi = - params.modelApi === "openai-responses" || params.modelApi === "openai-codex-responses"; - const hasSnapshot = Boolean(params.provider || params.modelApi || params.modelId); - const priorSnapshot = hasSnapshot ? readLastModelSnapshot(params.sessionManager) : null; - const modelChanged = priorSnapshot - ? !isSameModelSnapshot(priorSnapshot, { - timestamp: 0, - provider: params.provider, - modelApi: params.modelApi, - modelId: params.modelId, - }) - : false; - const sanitizedOpenAI = isOpenAIResponsesApi - ? downgradeOpenAIFunctionCallReasoningPairs( - downgradeOpenAIReasoningBlocks(sanitizedCompactionUsage), - ) - : sanitizedCompactionUsage; - - if (hasSnapshot && (!priorSnapshot || modelChanged)) { - appendModelSnapshot(params.sessionManager, { - timestamp: Date.now(), - provider: params.provider, - modelApi: params.modelApi, - modelId: params.modelId, - }); - } - - if (!policy.applyGoogleTurnOrdering) { - return sanitizedOpenAI; - } - - // Google models use the full wrapper with logging and session markers. - if (isGoogleModelApi(params.modelApi)) { - return applyGoogleTurnOrderingFix({ - messages: sanitizedOpenAI, - modelApi: params.modelApi, - sessionManager: params.sessionManager, - sessionId: params.sessionId, - }).messages; - } - - // Strict OpenAI-compatible providers (vLLM, Gemma, etc.) also reject - // conversations that start with an assistant turn (e.g. delivery-mirror - // messages after /new). Apply the same ordering fix without the - // Google-specific session markers. See #38962. - return sanitizeGoogleTurnOrdering(sanitizedOpenAI); -} diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index ef36c6deadb..edf976d16de 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -99,11 +99,7 @@ import { appendCacheTtlTimestamp, isCacheTtlEligibleProvider } from "../cache-tt import type { CompactEmbeddedPiSessionParams } from "../compact.js"; import { buildEmbeddedExtensionFactories } from "../extensions.js"; import { applyExtraParamsToAgent } from "../extra-params.js"; -import { - logToolSchemasForGoogle, - sanitizeSessionHistory, - sanitizeToolsForGoogle, -} from "../google.js"; +import { logToolSchemasForGoogle, sanitizeToolsForGoogle } from "../google.js"; import { getDmHistoryLimitFromSessionKey, limitHistoryTurns } from "../history.js"; import { log } from "../logger.js"; import { buildModelAliasLines } from "../model.js"; @@ -125,6 +121,7 @@ import { dropThinkingBlocks } from "../thinking.js"; import { collectAllowedToolNames } from "../tool-name-allowlist.js"; import { installToolResultContextGuard } from "../tool-result-context-guard.js"; import { splitSdkTools } from "../tool-split.js"; +import { sanitizeSessionHistory } from "../transcript-hygiene.js"; import { describeUnknownError, mapThinkingLevel } from "../utils.js"; import { flushPendingToolResultsAfterIdle } from "../wait-for-idle-before-flush.js"; import { waitForCompactionRetryWithAggregateTimeout } from "./compaction-retry-aggregate-timeout.js"; @@ -1774,9 +1771,10 @@ export async function runEmbeddedAttempt( activeSession.agent.streamFn = cacheTrace.wrapStreamFn(activeSession.agent.streamFn); } - // Copilot/Claude can reject persisted `thinking` blocks (e.g. thinkingSignature:"reasoning_text") - // on *any* follow-up provider call (including tool continuations). Wrap the stream function - // so every outbound request sees sanitized messages. + // Copilot/Claude can reject persisted `thinking` blocks on replay, but + // Anthropic also requires the latest assistant turn to stay byte-for-byte + // stable. Strip older replayed thinking blocks while preserving the most + // recent assistant message for every outbound request. if (transcriptPolicy.dropThinkingBlocks) { const inner = activeSession.agent.streamFn; activeSession.agent.streamFn = (model, context, options) => { @@ -1785,7 +1783,9 @@ export async function runEmbeddedAttempt( if (!Array.isArray(messages)) { return inner(model, context, options); } - const sanitized = dropThinkingBlocks(messages as unknown as AgentMessage[]) as unknown; + const sanitized = dropThinkingBlocks(messages as unknown as AgentMessage[], { + preserveLatestAssistant: true, + }) as unknown; if (sanitized === messages) { return inner(model, context, options); } diff --git a/src/agents/pi-embedded-runner/sanitize-session-history.tool-result-details.test.ts b/src/agents/pi-embedded-runner/sanitize-session-history.tool-result-details.test.ts index c888ae2f4ab..1386e359a98 100644 --- a/src/agents/pi-embedded-runner/sanitize-session-history.tool-result-details.test.ts +++ b/src/agents/pi-embedded-runner/sanitize-session-history.tool-result-details.test.ts @@ -3,7 +3,7 @@ import type { ToolResultMessage, UserMessage } from "@mariozechner/pi-ai"; import { SessionManager } from "@mariozechner/pi-coding-agent"; import { describe, expect, it } from "vitest"; import { makeAgentAssistantMessage } from "../test-helpers/agent-message-fixtures.js"; -import { sanitizeSessionHistory } from "./google.js"; +import { sanitizeSessionHistory } from "./transcript-hygiene.js"; describe("sanitizeSessionHistory toolResult details stripping", () => { it("strips toolResult.details so untrusted payloads are not fed back to the model", async () => { diff --git a/src/agents/pi-embedded-runner/thinking.test.ts b/src/agents/pi-embedded-runner/thinking.test.ts index 6a2481748a1..5a42fb93112 100644 --- a/src/agents/pi-embedded-runner/thinking.test.ts +++ b/src/agents/pi-embedded-runner/thinking.test.ts @@ -58,4 +58,34 @@ describe("dropThinkingBlocks", () => { const assistant = result[0] as Extract; expect(assistant.content).toEqual([{ type: "text", text: "" }]); }); + + it("preserves the latest assistant turn when requested", () => { + const messages: AgentMessage[] = [ + castAgentMessage({ role: "user", content: "hello" }), + castAgentMessage({ + role: "assistant", + content: [ + { type: "thinking", thinking: "older reasoning" }, + { type: "text", text: "older answer" }, + ], + }), + castAgentMessage({ role: "user", content: "follow up" }), + castAgentMessage({ + role: "assistant", + content: [ + { type: "thinking", thinking: "latest reasoning" }, + { type: "text", text: "latest answer" }, + ], + }), + ]; + + const result = dropThinkingBlocks(messages, { preserveLatestAssistant: true }); + const olderAssistant = result[1] as Extract; + const latestAssistant = result[3] as Extract; + expect(olderAssistant.content).toEqual([{ type: "text", text: "older answer" }]); + expect(latestAssistant.content).toEqual([ + { type: "thinking", thinking: "latest reasoning" }, + { type: "text", text: "latest answer" }, + ]); + }); }); diff --git a/src/agents/pi-embedded-runner/thinking.ts b/src/agents/pi-embedded-runner/thinking.ts index f503fd3f164..8fcd95db999 100644 --- a/src/agents/pi-embedded-runner/thinking.ts +++ b/src/agents/pi-embedded-runner/thinking.ts @@ -12,6 +12,15 @@ export function isAssistantMessageWithContent(message: AgentMessage): message is ); } +function findLatestAssistantMessageIndex(messages: AgentMessage[]): number { + for (let i = messages.length - 1; i >= 0; i -= 1) { + if (isAssistantMessageWithContent(messages[i])) { + return i; + } + } + return -1; +} + /** * Strip all `type: "thinking"` content blocks from assistant messages. * @@ -19,17 +28,32 @@ export function isAssistantMessageWithContent(message: AgentMessage): message is * a synthetic `{ type: "text", text: "" }` block to preserve turn structure * (some providers require strict user/assistant alternation). * + * When `preserveLatestAssistant` is enabled, the most recent assistant turn is + * left untouched. Anthropic-backed endpoints require the latest assistant + * message to preserve its original `thinking` blocks on replay. + * * Returns the original array reference when nothing was changed (callers can * use reference equality to skip downstream work). */ -export function dropThinkingBlocks(messages: AgentMessage[]): AgentMessage[] { +export function dropThinkingBlocks( + messages: AgentMessage[], + opts?: { preserveLatestAssistant?: boolean }, +): AgentMessage[] { let touched = false; const out: AgentMessage[] = []; - for (const msg of messages) { + const latestAssistantIndex = opts?.preserveLatestAssistant + ? findLatestAssistantMessageIndex(messages) + : -1; + for (let index = 0; index < messages.length; index += 1) { + const msg = messages[index]; if (!isAssistantMessageWithContent(msg)) { out.push(msg); continue; } + if (index === latestAssistantIndex) { + out.push(msg); + continue; + } const nextContent: AssistantContentBlock[] = []; let changed = false; for (const block of msg.content) { diff --git a/src/agents/pi-embedded-runner/transcript-hygiene.ts b/src/agents/pi-embedded-runner/transcript-hygiene.ts new file mode 100644 index 00000000000..9e7d406e1a5 --- /dev/null +++ b/src/agents/pi-embedded-runner/transcript-hygiene.ts @@ -0,0 +1,470 @@ +import type { AgentMessage } from "@mariozechner/pi-agent-core"; +import type { SessionManager } from "@mariozechner/pi-coding-agent"; +import type { OpenClawConfig } from "../../config/config.js"; +import { + hasInterSessionUserProvenance, + normalizeInputProvenance, +} from "../../sessions/input-provenance.js"; +import { resolveImageSanitizationLimits } from "../image-sanitization.js"; +import { + downgradeOpenAIFunctionCallReasoningPairs, + downgradeOpenAIReasoningBlocks, + isGoogleModelApi, + sanitizeGoogleTurnOrdering, + sanitizeSessionMessagesImages, +} from "../pi-embedded-helpers.js"; +import { + sanitizeToolCallInputs, + stripToolResultDetails, + sanitizeToolUseResultPairing, +} from "../session-transcript-repair.js"; +import type { TranscriptPolicy } from "../transcript-policy.js"; +import { resolveTranscriptPolicy } from "../transcript-policy.js"; +import { + makeZeroUsageSnapshot, + normalizeUsage, + type AssistantUsageSnapshot, + type UsageLike, +} from "../usage.js"; +import { applyGoogleTurnOrderingFix } from "./google.js"; +import { dropThinkingBlocks } from "./thinking.js"; + +const INTER_SESSION_PREFIX_BASE = "[Inter-session message]"; +const MODEL_SNAPSHOT_CUSTOM_TYPE = "model-snapshot"; + +type CustomEntryLike = { type?: unknown; customType?: unknown; data?: unknown }; + +type ModelSnapshotEntry = { + timestamp: number; + provider?: string; + modelApi?: string | null; + modelId?: string; +}; + +function hasThinkingLikeBlock(block: unknown): block is { type: "thinking" | "redacted_thinking" } { + if (!block || typeof block !== "object") { + return false; + } + const type = (block as { type?: unknown }).type; + return type === "thinking" || type === "redacted_thinking"; +} + +function findLatestAssistantMessageWithThinking( + messages: AgentMessage[], +): Extract | null { + for (let i = messages.length - 1; i >= 0; i -= 1) { + const message = messages[i]; + if (!message || typeof message !== "object" || message.role !== "assistant") { + continue; + } + const assistant = message; + if (Array.isArray(assistant.content) && assistant.content.some(hasThinkingLikeBlock)) { + return assistant; + } + } + return null; +} + +function restoreLatestAssistantMessageWithThinking( + originalMessages: AgentMessage[], + sanitizedMessages: AgentMessage[], +): AgentMessage[] { + const originalLatestAssistant = findLatestAssistantMessageWithThinking(originalMessages); + if (!originalLatestAssistant) { + return sanitizedMessages; + } + + for (let i = sanitizedMessages.length - 1; i >= 0; i -= 1) { + const candidate = sanitizedMessages[i]; + if (!candidate || typeof candidate !== "object" || candidate.role !== "assistant") { + continue; + } + if (candidate === originalLatestAssistant) { + return sanitizedMessages; + } + const restored = [...sanitizedMessages]; + restored[i] = originalLatestAssistant; + return restored; + } + + return sanitizedMessages; +} + +function buildInterSessionPrefix(message: AgentMessage): string { + const provenance = normalizeInputProvenance((message as { provenance?: unknown }).provenance); + if (!provenance) { + return INTER_SESSION_PREFIX_BASE; + } + const details = [ + provenance.sourceSessionKey ? `sourceSession=${provenance.sourceSessionKey}` : undefined, + provenance.sourceChannel ? `sourceChannel=${provenance.sourceChannel}` : undefined, + provenance.sourceTool ? `sourceTool=${provenance.sourceTool}` : undefined, + ].filter(Boolean); + if (details.length === 0) { + return INTER_SESSION_PREFIX_BASE; + } + return `${INTER_SESSION_PREFIX_BASE} ${details.join(" ")}`; +} + +function annotateInterSessionUserMessages(messages: AgentMessage[]): AgentMessage[] { + let touched = false; + const out: AgentMessage[] = []; + for (const msg of messages) { + if (!hasInterSessionUserProvenance(msg as { role?: unknown; provenance?: unknown })) { + out.push(msg); + continue; + } + const prefix = buildInterSessionPrefix(msg); + const user = msg as Extract; + if (typeof user.content === "string") { + if (user.content.startsWith(prefix)) { + out.push(msg); + continue; + } + touched = true; + out.push({ + ...(msg as unknown as Record), + content: `${prefix}\n${user.content}`, + } as AgentMessage); + continue; + } + if (!Array.isArray(user.content)) { + out.push(msg); + continue; + } + + const textIndex = user.content.findIndex( + (block) => + block && + typeof block === "object" && + (block as { type?: unknown }).type === "text" && + typeof (block as { text?: unknown }).text === "string", + ); + + if (textIndex >= 0) { + const existing = user.content[textIndex] as { type: "text"; text: string }; + if (existing.text.startsWith(prefix)) { + out.push(msg); + continue; + } + const nextContent = [...user.content]; + nextContent[textIndex] = { + ...existing, + text: `${prefix}\n${existing.text}`, + }; + touched = true; + out.push({ + ...(msg as unknown as Record), + content: nextContent, + } as AgentMessage); + continue; + } + + touched = true; + out.push({ + ...(msg as unknown as Record), + content: [{ type: "text", text: prefix }, ...user.content], + } as AgentMessage); + } + return touched ? out : messages; +} + +function parseMessageTimestamp(value: unknown): number | null { + if (typeof value === "number" && Number.isFinite(value)) { + return value; + } + if (typeof value === "string") { + const parsed = Date.parse(value); + if (Number.isFinite(parsed)) { + return parsed; + } + } + return null; +} + +function stripStaleAssistantUsageBeforeLatestCompaction(messages: AgentMessage[]): AgentMessage[] { + let latestCompactionSummaryIndex = -1; + let latestCompactionTimestamp: number | null = null; + for (let i = 0; i < messages.length; i += 1) { + const entry = messages[i]; + if (entry?.role !== "compactionSummary") { + continue; + } + latestCompactionSummaryIndex = i; + latestCompactionTimestamp = parseMessageTimestamp( + (entry as { timestamp?: unknown }).timestamp ?? null, + ); + } + if (latestCompactionSummaryIndex === -1) { + return messages; + } + + const out = [...messages]; + let touched = false; + for (let i = 0; i < out.length; i += 1) { + const candidate = out[i] as + | (AgentMessage & { usage?: unknown; timestamp?: unknown }) + | undefined; + if (!candidate || candidate.role !== "assistant") { + continue; + } + if (!candidate.usage || typeof candidate.usage !== "object") { + continue; + } + + const messageTimestamp = parseMessageTimestamp(candidate.timestamp); + const staleByTimestamp = + latestCompactionTimestamp !== null && + messageTimestamp !== null && + messageTimestamp <= latestCompactionTimestamp; + const staleByLegacyOrdering = i < latestCompactionSummaryIndex; + if (!staleByTimestamp && !staleByLegacyOrdering) { + continue; + } + + // pi-coding-agent expects assistant usage to always be present during context + // accounting. Keep stale snapshots structurally valid, but zeroed out. + const candidateRecord = candidate as unknown as Record; + out[i] = { + ...candidateRecord, + usage: makeZeroUsageSnapshot(), + } as unknown as AgentMessage; + touched = true; + } + return touched ? out : messages; +} + +function normalizeAssistantUsageCost(usage: unknown): AssistantUsageSnapshot["cost"] | undefined { + const base = makeZeroUsageSnapshot().cost; + if (!usage || typeof usage !== "object") { + return undefined; + } + const rawCost = (usage as { cost?: unknown }).cost; + if (!rawCost || typeof rawCost !== "object") { + return undefined; + } + const cost = rawCost as Record; + const inputRaw = toFiniteCostNumber(cost.input); + const outputRaw = toFiniteCostNumber(cost.output); + const cacheReadRaw = toFiniteCostNumber(cost.cacheRead); + const cacheWriteRaw = toFiniteCostNumber(cost.cacheWrite); + const totalRaw = toFiniteCostNumber(cost.total); + if ( + inputRaw === undefined && + outputRaw === undefined && + cacheReadRaw === undefined && + cacheWriteRaw === undefined && + totalRaw === undefined + ) { + return undefined; + } + const input = inputRaw ?? base.input; + const output = outputRaw ?? base.output; + const cacheRead = cacheReadRaw ?? base.cacheRead; + const cacheWrite = cacheWriteRaw ?? base.cacheWrite; + const total = totalRaw ?? input + output + cacheRead + cacheWrite; + return { input, output, cacheRead, cacheWrite, total }; +} + +function normalizeAssistantUsageSnapshot(usage: unknown) { + const normalized = normalizeUsage((usage ?? undefined) as UsageLike | undefined); + if (!normalized) { + return makeZeroUsageSnapshot(); + } + const input = normalized.input ?? 0; + const output = normalized.output ?? 0; + const cacheRead = normalized.cacheRead ?? 0; + const cacheWrite = normalized.cacheWrite ?? 0; + const totalTokens = normalized.total ?? input + output + cacheRead + cacheWrite; + const cost = normalizeAssistantUsageCost(usage); + return { + input, + output, + cacheRead, + cacheWrite, + totalTokens, + ...(cost ? { cost } : {}), + }; +} + +function toFiniteCostNumber(value: unknown): number | undefined { + return typeof value === "number" && Number.isFinite(value) ? value : undefined; +} + +function ensureAssistantUsageSnapshots(messages: AgentMessage[]): AgentMessage[] { + if (messages.length === 0) { + return messages; + } + + let touched = false; + const out = [...messages]; + for (let i = 0; i < out.length; i += 1) { + const message = out[i] as (AgentMessage & { role?: unknown; usage?: unknown }) | undefined; + if (!message || message.role !== "assistant") { + continue; + } + const normalizedUsage = normalizeAssistantUsageSnapshot(message.usage); + const usageCost = + message.usage && typeof message.usage === "object" + ? (message.usage as { cost?: unknown }).cost + : undefined; + const normalizedCost = normalizedUsage.cost; + if ( + message.usage && + typeof message.usage === "object" && + (message.usage as { input?: unknown }).input === normalizedUsage.input && + (message.usage as { output?: unknown }).output === normalizedUsage.output && + (message.usage as { cacheRead?: unknown }).cacheRead === normalizedUsage.cacheRead && + (message.usage as { cacheWrite?: unknown }).cacheWrite === normalizedUsage.cacheWrite && + (message.usage as { totalTokens?: unknown }).totalTokens === normalizedUsage.totalTokens && + ((normalizedCost && + usageCost && + typeof usageCost === "object" && + (usageCost as { input?: unknown }).input === normalizedCost.input && + (usageCost as { output?: unknown }).output === normalizedCost.output && + (usageCost as { cacheRead?: unknown }).cacheRead === normalizedCost.cacheRead && + (usageCost as { cacheWrite?: unknown }).cacheWrite === normalizedCost.cacheWrite && + (usageCost as { total?: unknown }).total === normalizedCost.total) || + (!normalizedCost && usageCost === undefined)) + ) { + continue; + } + out[i] = { + ...(message as unknown as Record), + usage: normalizedUsage, + } as AgentMessage; + touched = true; + } + + return touched ? out : messages; +} + +function readLastModelSnapshot(sessionManager: SessionManager): ModelSnapshotEntry | null { + try { + const entries = sessionManager.getEntries(); + for (let i = entries.length - 1; i >= 0; i -= 1) { + const entry = entries[i] as CustomEntryLike; + if (entry?.type !== "custom" || entry?.customType !== MODEL_SNAPSHOT_CUSTOM_TYPE) { + continue; + } + const data = entry?.data as ModelSnapshotEntry | undefined; + if (data && typeof data === "object") { + return data; + } + } + } catch { + return null; + } + return null; +} + +function appendModelSnapshot(sessionManager: SessionManager, data: ModelSnapshotEntry): void { + try { + sessionManager.appendCustomEntry(MODEL_SNAPSHOT_CUSTOM_TYPE, data); + } catch { + // ignore persistence failures + } +} + +function isSameModelSnapshot(a: ModelSnapshotEntry, b: ModelSnapshotEntry): boolean { + const normalize = (value?: string | null) => value ?? ""; + return ( + normalize(a.provider) === normalize(b.provider) && + normalize(a.modelApi) === normalize(b.modelApi) && + normalize(a.modelId) === normalize(b.modelId) + ); +} + +export async function sanitizeSessionHistory(params: { + messages: AgentMessage[]; + modelApi?: string | null; + modelId?: string; + provider?: string; + allowedToolNames?: Iterable; + config?: OpenClawConfig; + sessionManager: SessionManager; + sessionId: string; + policy?: TranscriptPolicy; +}): Promise { + // Keep docs/reference/transcript-hygiene.md in sync with any logic changes here. + const policy = + params.policy ?? + resolveTranscriptPolicy({ + modelApi: params.modelApi, + provider: params.provider, + modelId: params.modelId, + }); + const withInterSessionMarkers = annotateInterSessionUserMessages(params.messages); + const sanitizedImages = await sanitizeSessionMessagesImages( + withInterSessionMarkers, + "session:history", + { + sanitizeMode: policy.sanitizeMode, + sanitizeToolCallIds: policy.sanitizeToolCallIds, + toolCallIdMode: policy.toolCallIdMode, + preserveSignatures: policy.preserveSignatures, + sanitizeThoughtSignatures: policy.sanitizeThoughtSignatures, + ...resolveImageSanitizationLimits(params.config), + }, + ); + const droppedThinking = policy.dropThinkingBlocks + ? dropThinkingBlocks(sanitizedImages, { preserveLatestAssistant: true }) + : sanitizedImages; + const sanitizedToolCalls = sanitizeToolCallInputs(droppedThinking, { + allowedToolNames: params.allowedToolNames, + }); + const repairedTools = policy.repairToolUseResultPairing + ? sanitizeToolUseResultPairing(sanitizedToolCalls) + : sanitizedToolCalls; + const sanitizedToolResults = stripToolResultDetails(repairedTools); + const sanitizedCompactionUsage = ensureAssistantUsageSnapshots( + stripStaleAssistantUsageBeforeLatestCompaction(sanitizedToolResults), + ); + + const isOpenAIResponsesApi = + params.modelApi === "openai-responses" || params.modelApi === "openai-codex-responses"; + const hasSnapshot = Boolean(params.provider || params.modelApi || params.modelId); + const priorSnapshot = hasSnapshot ? readLastModelSnapshot(params.sessionManager) : null; + const modelChanged = priorSnapshot + ? !isSameModelSnapshot(priorSnapshot, { + timestamp: 0, + provider: params.provider, + modelApi: params.modelApi, + modelId: params.modelId, + }) + : false; + const sanitizedOpenAI = isOpenAIResponsesApi + ? downgradeOpenAIFunctionCallReasoningPairs( + downgradeOpenAIReasoningBlocks(sanitizedCompactionUsage), + ) + : sanitizedCompactionUsage; + const stableLatestAssistant = restoreLatestAssistantMessageWithThinking( + params.messages, + sanitizedOpenAI, + ); + + if (hasSnapshot && (!priorSnapshot || modelChanged)) { + appendModelSnapshot(params.sessionManager, { + timestamp: Date.now(), + provider: params.provider, + modelApi: params.modelApi, + modelId: params.modelId, + }); + } + + if (!policy.applyGoogleTurnOrdering) { + return stableLatestAssistant; + } + + if (isGoogleModelApi(params.modelApi)) { + return applyGoogleTurnOrderingFix({ + messages: stableLatestAssistant, + modelApi: params.modelApi, + sessionManager: params.sessionManager, + sessionId: params.sessionId, + }).messages; + } + + // Strict OpenAI-compatible providers also reject assistant-first histories. + return sanitizeGoogleTurnOrdering(stableLatestAssistant); +}