mirror of https://github.com/openclaw/openclaw.git
Agents: split transcript hygiene and preserve latest reasoning turns
This commit is contained in:
parent
83865f1921
commit
de35fba9b3
|
|
@ -37,7 +37,7 @@ If you need transcript storage details, see:
|
|||
All transcript hygiene is centralized in the embedded runner:
|
||||
|
||||
- Policy selection: `src/agents/transcript-policy.ts`
|
||||
- Sanitization/repair application: `sanitizeSessionHistory` in `src/agents/pi-embedded-runner/google.ts`
|
||||
- Sanitization/repair application: `sanitizeSessionHistory` in `src/agents/pi-embedded-runner/transcript-hygiene.ts`
|
||||
|
||||
The policy uses `provider`, `modelApi`, and `modelId` to decide what to apply.
|
||||
|
||||
|
|
@ -118,6 +118,12 @@ external end-user instructions.
|
|||
|
||||
- Tool result pairing repair and synthetic tool results.
|
||||
- Turn validation (merge consecutive user turns to satisfy strict alternation).
|
||||
- Preserve the latest assistant turn verbatim when it contains `thinking`/`redacted_thinking` blocks.
|
||||
|
||||
**GitHub Copilot Claude**
|
||||
|
||||
- Drop persisted `thinking` blocks from older assistant turns with invalid Copilot signatures.
|
||||
- Preserve the latest assistant turn unchanged so Anthropic replay requirements stay intact.
|
||||
|
||||
**Mistral (including model-id based detection)**
|
||||
|
||||
|
|
|
|||
|
|
@ -4,7 +4,7 @@ import {
|
|||
makeInMemorySessionManager,
|
||||
makeModelSnapshotEntry,
|
||||
} from "./pi-embedded-runner.sanitize-session-history.test-harness.js";
|
||||
import { sanitizeSessionHistory } from "./pi-embedded-runner/google.js";
|
||||
import { sanitizeSessionHistory } from "./pi-embedded-runner/transcript-hygiene.js";
|
||||
import { castAgentMessage } from "./test-helpers/agent-message-fixtures.js";
|
||||
|
||||
describe("sanitizeSessionHistory openai tool id preservation", () => {
|
||||
|
|
|
|||
|
|
@ -57,7 +57,7 @@ export function makeSimpleUserMessages(): AgentMessage[] {
|
|||
export async function loadSanitizeSessionHistoryWithCleanMocks(): Promise<SanitizeSessionHistoryFn> {
|
||||
vi.resetAllMocks();
|
||||
vi.mocked(helpers.sanitizeSessionMessagesImages).mockImplementation(async (msgs) => msgs);
|
||||
const mod = await import("./pi-embedded-runner/google.js");
|
||||
const mod = await import("./pi-embedded-runner/transcript-hygiene.js");
|
||||
return mod.sanitizeSessionHistory;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -175,14 +175,14 @@ describe("sanitizeSessionHistory", () => {
|
|||
});
|
||||
});
|
||||
|
||||
it("sanitizes tool call ids with strict9 for Mistral models", async () => {
|
||||
it("sanitizes tool call ids with strict9 for Mistral providers", async () => {
|
||||
setNonGoogleModelApi();
|
||||
|
||||
await sanitizeSessionHistory({
|
||||
messages: mockMessages,
|
||||
modelApi: "openai-responses",
|
||||
provider: "openrouter",
|
||||
modelId: "mistralai/devstral-2512:free",
|
||||
provider: "mistral",
|
||||
modelId: "codestral-latest",
|
||||
sessionManager: mockSessionManager,
|
||||
sessionId: TEST_SESSION_ID,
|
||||
});
|
||||
|
|
@ -720,17 +720,7 @@ describe("sanitizeSessionHistory", () => {
|
|||
).toBe(false);
|
||||
});
|
||||
|
||||
it("drops assistant thinking blocks for github-copilot models", async () => {
|
||||
setNonGoogleModelApi();
|
||||
|
||||
const messages = makeThinkingAndTextAssistantMessages("reasoning_text");
|
||||
|
||||
const result = await sanitizeGithubCopilotHistory({ messages });
|
||||
const assistant = getAssistantMessage(result);
|
||||
expect(assistant.content).toEqual([{ type: "text", text: "hi" }]);
|
||||
});
|
||||
|
||||
it("preserves assistant turn when all content is thinking blocks (github-copilot)", async () => {
|
||||
it("drops thinking blocks from older github-copilot assistant turns", async () => {
|
||||
setNonGoogleModelApi();
|
||||
|
||||
const messages: AgentMessage[] = [
|
||||
|
|
@ -738,22 +728,50 @@ describe("sanitizeSessionHistory", () => {
|
|||
makeAssistantMessage([
|
||||
{
|
||||
type: "thinking",
|
||||
thinking: "some reasoning",
|
||||
thinking: "older reasoning",
|
||||
thinkingSignature: "reasoning_text",
|
||||
},
|
||||
{ type: "text", text: "older answer" },
|
||||
]),
|
||||
makeUserMessage("follow up"),
|
||||
makeAssistantMessage([{ type: "text", text: "latest answer" }]),
|
||||
];
|
||||
|
||||
const result = await sanitizeGithubCopilotHistory({ messages });
|
||||
|
||||
// Assistant turn should be preserved (not dropped) to maintain turn alternation
|
||||
expect(result).toHaveLength(3);
|
||||
const assistant = getAssistantMessage(result);
|
||||
expect(assistant.content).toEqual([{ type: "text", text: "" }]);
|
||||
expect(assistant.content).toEqual([{ type: "text", text: "older answer" }]);
|
||||
});
|
||||
|
||||
it("preserves tool_use blocks when dropping thinking blocks (github-copilot)", async () => {
|
||||
it("preserves the latest github-copilot assistant turn unchanged", async () => {
|
||||
setNonGoogleModelApi();
|
||||
|
||||
const messages: AgentMessage[] = [
|
||||
makeUserMessage("hello"),
|
||||
makeAssistantMessage([{ type: "text", text: "previous answer" }]),
|
||||
makeUserMessage("follow up"),
|
||||
makeAssistantMessage([
|
||||
{
|
||||
type: "thinking",
|
||||
thinking: "some reasoning",
|
||||
thinkingSignature: "reasoning_text",
|
||||
},
|
||||
{ type: "text", text: "latest answer" },
|
||||
]),
|
||||
];
|
||||
|
||||
const result = await sanitizeGithubCopilotHistory({ messages });
|
||||
const latestAssistant = result.at(-1) as Extract<AgentMessage, { role: "assistant" }>;
|
||||
expect(latestAssistant.content).toEqual([
|
||||
{
|
||||
type: "thinking",
|
||||
thinking: "some reasoning",
|
||||
thinkingSignature: "reasoning_text",
|
||||
},
|
||||
{ type: "text", text: "latest answer" },
|
||||
]);
|
||||
});
|
||||
|
||||
it("preserves tool_use blocks when dropping thinking blocks from older github-copilot turns", async () => {
|
||||
setNonGoogleModelApi();
|
||||
|
||||
const messages: AgentMessage[] = [
|
||||
|
|
@ -767,6 +785,8 @@ describe("sanitizeSessionHistory", () => {
|
|||
{ type: "toolCall", id: "tool_123", name: "read", arguments: { path: "/tmp/test" } },
|
||||
{ type: "text", text: "Let me read that file." },
|
||||
]),
|
||||
makeUserMessage("keep latest stable"),
|
||||
makeAssistantMessage([{ type: "text", text: "latest answer" }]),
|
||||
];
|
||||
|
||||
const result = await sanitizeGithubCopilotHistory({ messages });
|
||||
|
|
@ -794,6 +814,35 @@ describe("sanitizeSessionHistory", () => {
|
|||
expect(types).toContain("thinking");
|
||||
});
|
||||
|
||||
it("preserves the latest anthropic assistant turn with thinking blocks verbatim", async () => {
|
||||
setNonGoogleModelApi();
|
||||
|
||||
const messages: AgentMessage[] = [
|
||||
makeUserMessage("hello"),
|
||||
makeAssistantMessage(
|
||||
[
|
||||
{ type: "text", text: "previous answer" },
|
||||
{ type: "thinking", thinking: "latest reasoning", thinkingSignature: "sig" },
|
||||
{ type: "text", text: "" },
|
||||
{ type: "text", text: "latest answer" },
|
||||
],
|
||||
{ timestamp: nextTimestamp() },
|
||||
),
|
||||
makeUserMessage("follow up"),
|
||||
];
|
||||
|
||||
const result = await sanitizeSessionHistory({
|
||||
messages,
|
||||
modelApi: "anthropic-messages",
|
||||
provider: "anthropic",
|
||||
modelId: "claude-opus-4-6",
|
||||
sessionManager: makeMockSessionManager(),
|
||||
sessionId: TEST_SESSION_ID,
|
||||
});
|
||||
|
||||
expect(result[1]).toEqual(messages[1]);
|
||||
});
|
||||
|
||||
it("does not drop thinking blocks for non-claude copilot models", async () => {
|
||||
setNonGoogleModelApi();
|
||||
|
||||
|
|
|
|||
|
|
@ -73,11 +73,7 @@ import {
|
|||
EMBEDDED_COMPACTION_TIMEOUT_MS,
|
||||
} from "./compaction-safety-timeout.js";
|
||||
import { buildEmbeddedExtensionFactories } from "./extensions.js";
|
||||
import {
|
||||
logToolSchemasForGoogle,
|
||||
sanitizeSessionHistory,
|
||||
sanitizeToolsForGoogle,
|
||||
} from "./google.js";
|
||||
import { logToolSchemasForGoogle, sanitizeToolsForGoogle } from "./google.js";
|
||||
import { getDmHistoryLimitFromSessionKey, limitHistoryTurns } from "./history.js";
|
||||
import { resolveGlobalLane, resolveSessionLane } from "./lanes.js";
|
||||
import { log } from "./logger.js";
|
||||
|
|
@ -92,6 +88,7 @@ import {
|
|||
} from "./system-prompt.js";
|
||||
import { collectAllowedToolNames } from "./tool-name-allowlist.js";
|
||||
import { splitSdkTools } from "./tool-split.js";
|
||||
import { sanitizeSessionHistory } from "./transcript-hygiene.js";
|
||||
import type { EmbeddedPiCompactResult } from "./types.js";
|
||||
import { describeUnknownError, mapThinkingLevel } from "./utils.js";
|
||||
import { flushPendingToolResultsAfterIdle } from "./wait-for-idle-before-flush.js";
|
||||
|
|
|
|||
|
|
@ -0,0 +1,24 @@
|
|||
import { EventEmitter } from "node:events";
|
||||
import { registerUnhandledRejectionHandler } from "../../infra/unhandled-rejections.js";
|
||||
import { isCompactionFailureError } from "../pi-embedded-helpers.js";
|
||||
import { log } from "./logger.js";
|
||||
import { describeUnknownError } from "./utils.js";
|
||||
|
||||
const compactionFailureEmitter = new EventEmitter();
|
||||
|
||||
export type CompactionFailureListener = (reason: string) => void;
|
||||
|
||||
export function onUnhandledCompactionFailure(cb: CompactionFailureListener): () => void {
|
||||
compactionFailureEmitter.on("failure", cb);
|
||||
return () => compactionFailureEmitter.off("failure", cb);
|
||||
}
|
||||
|
||||
registerUnhandledRejectionHandler((reason) => {
|
||||
const message = describeUnknownError(reason);
|
||||
if (!isCompactionFailureError(message)) {
|
||||
return false;
|
||||
}
|
||||
log.error(`Auto-compaction failed (unhandled): ${message}`);
|
||||
compactionFailureEmitter.emit("failure", message);
|
||||
return true;
|
||||
});
|
||||
|
|
@ -1,39 +1,9 @@
|
|||
import { EventEmitter } from "node:events";
|
||||
import type { AgentMessage, AgentTool } from "@mariozechner/pi-agent-core";
|
||||
import type { SessionManager } from "@mariozechner/pi-coding-agent";
|
||||
import type { TSchema } from "@sinclair/typebox";
|
||||
import type { OpenClawConfig } from "../../config/config.js";
|
||||
import { registerUnhandledRejectionHandler } from "../../infra/unhandled-rejections.js";
|
||||
import {
|
||||
hasInterSessionUserProvenance,
|
||||
normalizeInputProvenance,
|
||||
} from "../../sessions/input-provenance.js";
|
||||
import { resolveImageSanitizationLimits } from "../image-sanitization.js";
|
||||
import {
|
||||
downgradeOpenAIFunctionCallReasoningPairs,
|
||||
downgradeOpenAIReasoningBlocks,
|
||||
isCompactionFailureError,
|
||||
isGoogleModelApi,
|
||||
sanitizeGoogleTurnOrdering,
|
||||
sanitizeSessionMessagesImages,
|
||||
} from "../pi-embedded-helpers.js";
|
||||
import { isGoogleModelApi, sanitizeGoogleTurnOrdering } from "../pi-embedded-helpers.js";
|
||||
import { cleanToolSchemaForGemini } from "../pi-tools.schema.js";
|
||||
import {
|
||||
sanitizeToolCallInputs,
|
||||
stripToolResultDetails,
|
||||
sanitizeToolUseResultPairing,
|
||||
} from "../session-transcript-repair.js";
|
||||
import type { TranscriptPolicy } from "../transcript-policy.js";
|
||||
import { resolveTranscriptPolicy } from "../transcript-policy.js";
|
||||
import {
|
||||
makeZeroUsageSnapshot,
|
||||
normalizeUsage,
|
||||
type AssistantUsageSnapshot,
|
||||
type UsageLike,
|
||||
} from "../usage.js";
|
||||
import { log } from "./logger.js";
|
||||
import { dropThinkingBlocks } from "./thinking.js";
|
||||
import { describeUnknownError } from "./utils.js";
|
||||
|
||||
const GOOGLE_TURN_ORDERING_CUSTOM_TYPE = "google-turn-ordering-bootstrap";
|
||||
const GOOGLE_SCHEMA_UNSUPPORTED_KEYWORDS = new Set([
|
||||
|
|
@ -59,256 +29,7 @@ const GOOGLE_SCHEMA_UNSUPPORTED_KEYWORDS = new Set([
|
|||
"maxProperties",
|
||||
]);
|
||||
|
||||
const INTER_SESSION_PREFIX_BASE = "[Inter-session message]";
|
||||
|
||||
function buildInterSessionPrefix(message: AgentMessage): string {
|
||||
const provenance = normalizeInputProvenance((message as { provenance?: unknown }).provenance);
|
||||
if (!provenance) {
|
||||
return INTER_SESSION_PREFIX_BASE;
|
||||
}
|
||||
const details = [
|
||||
provenance.sourceSessionKey ? `sourceSession=${provenance.sourceSessionKey}` : undefined,
|
||||
provenance.sourceChannel ? `sourceChannel=${provenance.sourceChannel}` : undefined,
|
||||
provenance.sourceTool ? `sourceTool=${provenance.sourceTool}` : undefined,
|
||||
].filter(Boolean);
|
||||
if (details.length === 0) {
|
||||
return INTER_SESSION_PREFIX_BASE;
|
||||
}
|
||||
return `${INTER_SESSION_PREFIX_BASE} ${details.join(" ")}`;
|
||||
}
|
||||
|
||||
function annotateInterSessionUserMessages(messages: AgentMessage[]): AgentMessage[] {
|
||||
let touched = false;
|
||||
const out: AgentMessage[] = [];
|
||||
for (const msg of messages) {
|
||||
if (!hasInterSessionUserProvenance(msg as { role?: unknown; provenance?: unknown })) {
|
||||
out.push(msg);
|
||||
continue;
|
||||
}
|
||||
const prefix = buildInterSessionPrefix(msg);
|
||||
const user = msg as Extract<AgentMessage, { role: "user" }>;
|
||||
if (typeof user.content === "string") {
|
||||
if (user.content.startsWith(prefix)) {
|
||||
out.push(msg);
|
||||
continue;
|
||||
}
|
||||
touched = true;
|
||||
out.push({
|
||||
...(msg as unknown as Record<string, unknown>),
|
||||
content: `${prefix}\n${user.content}`,
|
||||
} as AgentMessage);
|
||||
continue;
|
||||
}
|
||||
if (!Array.isArray(user.content)) {
|
||||
out.push(msg);
|
||||
continue;
|
||||
}
|
||||
|
||||
const textIndex = user.content.findIndex(
|
||||
(block) =>
|
||||
block &&
|
||||
typeof block === "object" &&
|
||||
(block as { type?: unknown }).type === "text" &&
|
||||
typeof (block as { text?: unknown }).text === "string",
|
||||
);
|
||||
|
||||
if (textIndex >= 0) {
|
||||
const existing = user.content[textIndex] as { type: "text"; text: string };
|
||||
if (existing.text.startsWith(prefix)) {
|
||||
out.push(msg);
|
||||
continue;
|
||||
}
|
||||
const nextContent = [...user.content];
|
||||
nextContent[textIndex] = {
|
||||
...existing,
|
||||
text: `${prefix}\n${existing.text}`,
|
||||
};
|
||||
touched = true;
|
||||
out.push({
|
||||
...(msg as unknown as Record<string, unknown>),
|
||||
content: nextContent,
|
||||
} as AgentMessage);
|
||||
continue;
|
||||
}
|
||||
|
||||
touched = true;
|
||||
out.push({
|
||||
...(msg as unknown as Record<string, unknown>),
|
||||
content: [{ type: "text", text: prefix }, ...user.content],
|
||||
} as AgentMessage);
|
||||
}
|
||||
return touched ? out : messages;
|
||||
}
|
||||
|
||||
function parseMessageTimestamp(value: unknown): number | null {
|
||||
if (typeof value === "number" && Number.isFinite(value)) {
|
||||
return value;
|
||||
}
|
||||
if (typeof value === "string") {
|
||||
const parsed = Date.parse(value);
|
||||
if (Number.isFinite(parsed)) {
|
||||
return parsed;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
function stripStaleAssistantUsageBeforeLatestCompaction(messages: AgentMessage[]): AgentMessage[] {
|
||||
let latestCompactionSummaryIndex = -1;
|
||||
let latestCompactionTimestamp: number | null = null;
|
||||
for (let i = 0; i < messages.length; i += 1) {
|
||||
const entry = messages[i];
|
||||
if (entry?.role !== "compactionSummary") {
|
||||
continue;
|
||||
}
|
||||
latestCompactionSummaryIndex = i;
|
||||
latestCompactionTimestamp = parseMessageTimestamp(
|
||||
(entry as { timestamp?: unknown }).timestamp ?? null,
|
||||
);
|
||||
}
|
||||
if (latestCompactionSummaryIndex === -1) {
|
||||
return messages;
|
||||
}
|
||||
|
||||
const out = [...messages];
|
||||
let touched = false;
|
||||
for (let i = 0; i < out.length; i += 1) {
|
||||
const candidate = out[i] as
|
||||
| (AgentMessage & { usage?: unknown; timestamp?: unknown })
|
||||
| undefined;
|
||||
if (!candidate || candidate.role !== "assistant") {
|
||||
continue;
|
||||
}
|
||||
if (!candidate.usage || typeof candidate.usage !== "object") {
|
||||
continue;
|
||||
}
|
||||
|
||||
const messageTimestamp = parseMessageTimestamp(candidate.timestamp);
|
||||
const staleByTimestamp =
|
||||
latestCompactionTimestamp !== null &&
|
||||
messageTimestamp !== null &&
|
||||
messageTimestamp <= latestCompactionTimestamp;
|
||||
const staleByLegacyOrdering = i < latestCompactionSummaryIndex;
|
||||
if (!staleByTimestamp && !staleByLegacyOrdering) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// pi-coding-agent expects assistant usage to always be present during context
|
||||
// accounting. Keep stale snapshots structurally valid, but zeroed out.
|
||||
const candidateRecord = candidate as unknown as Record<string, unknown>;
|
||||
out[i] = {
|
||||
...candidateRecord,
|
||||
usage: makeZeroUsageSnapshot(),
|
||||
} as unknown as AgentMessage;
|
||||
touched = true;
|
||||
}
|
||||
return touched ? out : messages;
|
||||
}
|
||||
|
||||
function normalizeAssistantUsageSnapshot(usage: unknown) {
|
||||
const normalized = normalizeUsage((usage ?? undefined) as UsageLike | undefined);
|
||||
if (!normalized) {
|
||||
return makeZeroUsageSnapshot();
|
||||
}
|
||||
const input = normalized.input ?? 0;
|
||||
const output = normalized.output ?? 0;
|
||||
const cacheRead = normalized.cacheRead ?? 0;
|
||||
const cacheWrite = normalized.cacheWrite ?? 0;
|
||||
const totalTokens = normalized.total ?? input + output + cacheRead + cacheWrite;
|
||||
const cost = normalizeAssistantUsageCost(usage);
|
||||
return {
|
||||
input,
|
||||
output,
|
||||
cacheRead,
|
||||
cacheWrite,
|
||||
totalTokens,
|
||||
...(cost ? { cost } : {}),
|
||||
};
|
||||
}
|
||||
|
||||
function normalizeAssistantUsageCost(usage: unknown): AssistantUsageSnapshot["cost"] | undefined {
|
||||
const base = makeZeroUsageSnapshot().cost;
|
||||
if (!usage || typeof usage !== "object") {
|
||||
return undefined;
|
||||
}
|
||||
const rawCost = (usage as { cost?: unknown }).cost;
|
||||
if (!rawCost || typeof rawCost !== "object") {
|
||||
return undefined;
|
||||
}
|
||||
const cost = rawCost as Record<string, unknown>;
|
||||
const inputRaw = toFiniteCostNumber(cost.input);
|
||||
const outputRaw = toFiniteCostNumber(cost.output);
|
||||
const cacheReadRaw = toFiniteCostNumber(cost.cacheRead);
|
||||
const cacheWriteRaw = toFiniteCostNumber(cost.cacheWrite);
|
||||
const totalRaw = toFiniteCostNumber(cost.total);
|
||||
if (
|
||||
inputRaw === undefined &&
|
||||
outputRaw === undefined &&
|
||||
cacheReadRaw === undefined &&
|
||||
cacheWriteRaw === undefined &&
|
||||
totalRaw === undefined
|
||||
) {
|
||||
return undefined;
|
||||
}
|
||||
const input = inputRaw ?? base.input;
|
||||
const output = outputRaw ?? base.output;
|
||||
const cacheRead = cacheReadRaw ?? base.cacheRead;
|
||||
const cacheWrite = cacheWriteRaw ?? base.cacheWrite;
|
||||
const total = totalRaw ?? input + output + cacheRead + cacheWrite;
|
||||
return { input, output, cacheRead, cacheWrite, total };
|
||||
}
|
||||
|
||||
function toFiniteCostNumber(value: unknown): number | undefined {
|
||||
return typeof value === "number" && Number.isFinite(value) ? value : undefined;
|
||||
}
|
||||
|
||||
function ensureAssistantUsageSnapshots(messages: AgentMessage[]): AgentMessage[] {
|
||||
if (messages.length === 0) {
|
||||
return messages;
|
||||
}
|
||||
|
||||
let touched = false;
|
||||
const out = [...messages];
|
||||
for (let i = 0; i < out.length; i += 1) {
|
||||
const message = out[i] as (AgentMessage & { role?: unknown; usage?: unknown }) | undefined;
|
||||
if (!message || message.role !== "assistant") {
|
||||
continue;
|
||||
}
|
||||
const normalizedUsage = normalizeAssistantUsageSnapshot(message.usage);
|
||||
const usageCost =
|
||||
message.usage && typeof message.usage === "object"
|
||||
? (message.usage as { cost?: unknown }).cost
|
||||
: undefined;
|
||||
const normalizedCost = normalizedUsage.cost;
|
||||
if (
|
||||
message.usage &&
|
||||
typeof message.usage === "object" &&
|
||||
(message.usage as { input?: unknown }).input === normalizedUsage.input &&
|
||||
(message.usage as { output?: unknown }).output === normalizedUsage.output &&
|
||||
(message.usage as { cacheRead?: unknown }).cacheRead === normalizedUsage.cacheRead &&
|
||||
(message.usage as { cacheWrite?: unknown }).cacheWrite === normalizedUsage.cacheWrite &&
|
||||
(message.usage as { totalTokens?: unknown }).totalTokens === normalizedUsage.totalTokens &&
|
||||
((normalizedCost &&
|
||||
usageCost &&
|
||||
typeof usageCost === "object" &&
|
||||
(usageCost as { input?: unknown }).input === normalizedCost.input &&
|
||||
(usageCost as { output?: unknown }).output === normalizedCost.output &&
|
||||
(usageCost as { cacheRead?: unknown }).cacheRead === normalizedCost.cacheRead &&
|
||||
(usageCost as { cacheWrite?: unknown }).cacheWrite === normalizedCost.cacheWrite &&
|
||||
(usageCost as { total?: unknown }).total === normalizedCost.total) ||
|
||||
(!normalizedCost && usageCost === undefined))
|
||||
) {
|
||||
continue;
|
||||
}
|
||||
out[i] = {
|
||||
...(message as unknown as Record<string, unknown>),
|
||||
usage: normalizedUsage,
|
||||
} as AgentMessage;
|
||||
touched = true;
|
||||
}
|
||||
|
||||
return touched ? out : messages;
|
||||
}
|
||||
type CustomEntryLike = { type?: unknown; customType?: unknown; data?: unknown };
|
||||
|
||||
export function findUnsupportedSchemaKeywords(schema: unknown, path: string): string[] {
|
||||
if (!schema || typeof schema !== "object") {
|
||||
|
|
@ -352,9 +73,7 @@ export function sanitizeToolsForGoogle<
|
|||
provider: string;
|
||||
}): AgentTool<TSchemaType, TResult>[] {
|
||||
// Cloud Code Assist uses the OpenAPI 3.03 `parameters` field for both Gemini
|
||||
// AND Claude models. This field does not support JSON Schema keywords such as
|
||||
// patternProperties, additionalProperties, $ref, etc. We must clean schemas
|
||||
// for every provider that routes through this path.
|
||||
// AND Claude models. This field does not support many JSON Schema keywords.
|
||||
if (params.provider !== "google-gemini-cli") {
|
||||
return params.tools;
|
||||
}
|
||||
|
|
@ -395,80 +114,6 @@ export function logToolSchemasForGoogle(params: { tools: AgentTool[]; provider:
|
|||
}
|
||||
}
|
||||
|
||||
// Event emitter for unhandled compaction failures that escape try-catch blocks.
|
||||
// Listeners can use this to trigger session recovery with retry.
|
||||
const compactionFailureEmitter = new EventEmitter();
|
||||
|
||||
export type CompactionFailureListener = (reason: string) => void;
|
||||
|
||||
/**
|
||||
* Register a listener for unhandled compaction failures.
|
||||
* Called when auto-compaction fails in a way that escapes the normal try-catch,
|
||||
* e.g., when the summarization request itself exceeds the model's token limit.
|
||||
* Returns an unsubscribe function.
|
||||
*/
|
||||
export function onUnhandledCompactionFailure(cb: CompactionFailureListener): () => void {
|
||||
compactionFailureEmitter.on("failure", cb);
|
||||
return () => compactionFailureEmitter.off("failure", cb);
|
||||
}
|
||||
|
||||
registerUnhandledRejectionHandler((reason) => {
|
||||
const message = describeUnknownError(reason);
|
||||
if (!isCompactionFailureError(message)) {
|
||||
return false;
|
||||
}
|
||||
log.error(`Auto-compaction failed (unhandled): ${message}`);
|
||||
compactionFailureEmitter.emit("failure", message);
|
||||
return true;
|
||||
});
|
||||
|
||||
type CustomEntryLike = { type?: unknown; customType?: unknown; data?: unknown };
|
||||
|
||||
type ModelSnapshotEntry = {
|
||||
timestamp: number;
|
||||
provider?: string;
|
||||
modelApi?: string | null;
|
||||
modelId?: string;
|
||||
};
|
||||
|
||||
const MODEL_SNAPSHOT_CUSTOM_TYPE = "model-snapshot";
|
||||
|
||||
function readLastModelSnapshot(sessionManager: SessionManager): ModelSnapshotEntry | null {
|
||||
try {
|
||||
const entries = sessionManager.getEntries();
|
||||
for (let i = entries.length - 1; i >= 0; i--) {
|
||||
const entry = entries[i] as CustomEntryLike;
|
||||
if (entry?.type !== "custom" || entry?.customType !== MODEL_SNAPSHOT_CUSTOM_TYPE) {
|
||||
continue;
|
||||
}
|
||||
const data = entry?.data as ModelSnapshotEntry | undefined;
|
||||
if (data && typeof data === "object") {
|
||||
return data;
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
function appendModelSnapshot(sessionManager: SessionManager, data: ModelSnapshotEntry): void {
|
||||
try {
|
||||
sessionManager.appendCustomEntry(MODEL_SNAPSHOT_CUSTOM_TYPE, data);
|
||||
} catch {
|
||||
// ignore persistence failures
|
||||
}
|
||||
}
|
||||
|
||||
function isSameModelSnapshot(a: ModelSnapshotEntry, b: ModelSnapshotEntry): boolean {
|
||||
const normalize = (value?: string | null) => value ?? "";
|
||||
return (
|
||||
normalize(a.provider) === normalize(b.provider) &&
|
||||
normalize(a.modelApi) === normalize(b.modelApi) &&
|
||||
normalize(a.modelId) === normalize(b.modelId)
|
||||
);
|
||||
}
|
||||
|
||||
function hasGoogleTurnOrderingMarker(sessionManager: SessionManager): boolean {
|
||||
try {
|
||||
return sessionManager
|
||||
|
|
@ -503,7 +148,7 @@ export function applyGoogleTurnOrderingFix(params: {
|
|||
if (!isGoogleModelApi(params.modelApi)) {
|
||||
return { messages: params.messages, didPrepend: false };
|
||||
}
|
||||
const first = params.messages[0] as { role?: unknown; content?: unknown } | undefined;
|
||||
const first = params.messages[0] as { role?: unknown } | undefined;
|
||||
if (first?.role !== "assistant") {
|
||||
return { messages: params.messages, didPrepend: false };
|
||||
}
|
||||
|
|
@ -516,97 +161,3 @@ export function applyGoogleTurnOrderingFix(params: {
|
|||
}
|
||||
return { messages: sanitized, didPrepend };
|
||||
}
|
||||
|
||||
export async function sanitizeSessionHistory(params: {
|
||||
messages: AgentMessage[];
|
||||
modelApi?: string | null;
|
||||
modelId?: string;
|
||||
provider?: string;
|
||||
allowedToolNames?: Iterable<string>;
|
||||
config?: OpenClawConfig;
|
||||
sessionManager: SessionManager;
|
||||
sessionId: string;
|
||||
policy?: TranscriptPolicy;
|
||||
}): Promise<AgentMessage[]> {
|
||||
// Keep docs/reference/transcript-hygiene.md in sync with any logic changes here.
|
||||
const policy =
|
||||
params.policy ??
|
||||
resolveTranscriptPolicy({
|
||||
modelApi: params.modelApi,
|
||||
provider: params.provider,
|
||||
modelId: params.modelId,
|
||||
});
|
||||
const withInterSessionMarkers = annotateInterSessionUserMessages(params.messages);
|
||||
const sanitizedImages = await sanitizeSessionMessagesImages(
|
||||
withInterSessionMarkers,
|
||||
"session:history",
|
||||
{
|
||||
sanitizeMode: policy.sanitizeMode,
|
||||
sanitizeToolCallIds: policy.sanitizeToolCallIds,
|
||||
toolCallIdMode: policy.toolCallIdMode,
|
||||
preserveSignatures: policy.preserveSignatures,
|
||||
sanitizeThoughtSignatures: policy.sanitizeThoughtSignatures,
|
||||
...resolveImageSanitizationLimits(params.config),
|
||||
},
|
||||
);
|
||||
const droppedThinking = policy.dropThinkingBlocks
|
||||
? dropThinkingBlocks(sanitizedImages)
|
||||
: sanitizedImages;
|
||||
const sanitizedToolCalls = sanitizeToolCallInputs(droppedThinking, {
|
||||
allowedToolNames: params.allowedToolNames,
|
||||
});
|
||||
const repairedTools = policy.repairToolUseResultPairing
|
||||
? sanitizeToolUseResultPairing(sanitizedToolCalls)
|
||||
: sanitizedToolCalls;
|
||||
const sanitizedToolResults = stripToolResultDetails(repairedTools);
|
||||
const sanitizedCompactionUsage = ensureAssistantUsageSnapshots(
|
||||
stripStaleAssistantUsageBeforeLatestCompaction(sanitizedToolResults),
|
||||
);
|
||||
|
||||
const isOpenAIResponsesApi =
|
||||
params.modelApi === "openai-responses" || params.modelApi === "openai-codex-responses";
|
||||
const hasSnapshot = Boolean(params.provider || params.modelApi || params.modelId);
|
||||
const priorSnapshot = hasSnapshot ? readLastModelSnapshot(params.sessionManager) : null;
|
||||
const modelChanged = priorSnapshot
|
||||
? !isSameModelSnapshot(priorSnapshot, {
|
||||
timestamp: 0,
|
||||
provider: params.provider,
|
||||
modelApi: params.modelApi,
|
||||
modelId: params.modelId,
|
||||
})
|
||||
: false;
|
||||
const sanitizedOpenAI = isOpenAIResponsesApi
|
||||
? downgradeOpenAIFunctionCallReasoningPairs(
|
||||
downgradeOpenAIReasoningBlocks(sanitizedCompactionUsage),
|
||||
)
|
||||
: sanitizedCompactionUsage;
|
||||
|
||||
if (hasSnapshot && (!priorSnapshot || modelChanged)) {
|
||||
appendModelSnapshot(params.sessionManager, {
|
||||
timestamp: Date.now(),
|
||||
provider: params.provider,
|
||||
modelApi: params.modelApi,
|
||||
modelId: params.modelId,
|
||||
});
|
||||
}
|
||||
|
||||
if (!policy.applyGoogleTurnOrdering) {
|
||||
return sanitizedOpenAI;
|
||||
}
|
||||
|
||||
// Google models use the full wrapper with logging and session markers.
|
||||
if (isGoogleModelApi(params.modelApi)) {
|
||||
return applyGoogleTurnOrderingFix({
|
||||
messages: sanitizedOpenAI,
|
||||
modelApi: params.modelApi,
|
||||
sessionManager: params.sessionManager,
|
||||
sessionId: params.sessionId,
|
||||
}).messages;
|
||||
}
|
||||
|
||||
// Strict OpenAI-compatible providers (vLLM, Gemma, etc.) also reject
|
||||
// conversations that start with an assistant turn (e.g. delivery-mirror
|
||||
// messages after /new). Apply the same ordering fix without the
|
||||
// Google-specific session markers. See #38962.
|
||||
return sanitizeGoogleTurnOrdering(sanitizedOpenAI);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -99,11 +99,7 @@ import { appendCacheTtlTimestamp, isCacheTtlEligibleProvider } from "../cache-tt
|
|||
import type { CompactEmbeddedPiSessionParams } from "../compact.js";
|
||||
import { buildEmbeddedExtensionFactories } from "../extensions.js";
|
||||
import { applyExtraParamsToAgent } from "../extra-params.js";
|
||||
import {
|
||||
logToolSchemasForGoogle,
|
||||
sanitizeSessionHistory,
|
||||
sanitizeToolsForGoogle,
|
||||
} from "../google.js";
|
||||
import { logToolSchemasForGoogle, sanitizeToolsForGoogle } from "../google.js";
|
||||
import { getDmHistoryLimitFromSessionKey, limitHistoryTurns } from "../history.js";
|
||||
import { log } from "../logger.js";
|
||||
import { buildModelAliasLines } from "../model.js";
|
||||
|
|
@ -125,6 +121,7 @@ import { dropThinkingBlocks } from "../thinking.js";
|
|||
import { collectAllowedToolNames } from "../tool-name-allowlist.js";
|
||||
import { installToolResultContextGuard } from "../tool-result-context-guard.js";
|
||||
import { splitSdkTools } from "../tool-split.js";
|
||||
import { sanitizeSessionHistory } from "../transcript-hygiene.js";
|
||||
import { describeUnknownError, mapThinkingLevel } from "../utils.js";
|
||||
import { flushPendingToolResultsAfterIdle } from "../wait-for-idle-before-flush.js";
|
||||
import { waitForCompactionRetryWithAggregateTimeout } from "./compaction-retry-aggregate-timeout.js";
|
||||
|
|
@ -1774,9 +1771,10 @@ export async function runEmbeddedAttempt(
|
|||
activeSession.agent.streamFn = cacheTrace.wrapStreamFn(activeSession.agent.streamFn);
|
||||
}
|
||||
|
||||
// Copilot/Claude can reject persisted `thinking` blocks (e.g. thinkingSignature:"reasoning_text")
|
||||
// on *any* follow-up provider call (including tool continuations). Wrap the stream function
|
||||
// so every outbound request sees sanitized messages.
|
||||
// Copilot/Claude can reject persisted `thinking` blocks on replay, but
|
||||
// Anthropic also requires the latest assistant turn to stay byte-for-byte
|
||||
// stable. Strip older replayed thinking blocks while preserving the most
|
||||
// recent assistant message for every outbound request.
|
||||
if (transcriptPolicy.dropThinkingBlocks) {
|
||||
const inner = activeSession.agent.streamFn;
|
||||
activeSession.agent.streamFn = (model, context, options) => {
|
||||
|
|
@ -1785,7 +1783,9 @@ export async function runEmbeddedAttempt(
|
|||
if (!Array.isArray(messages)) {
|
||||
return inner(model, context, options);
|
||||
}
|
||||
const sanitized = dropThinkingBlocks(messages as unknown as AgentMessage[]) as unknown;
|
||||
const sanitized = dropThinkingBlocks(messages as unknown as AgentMessage[], {
|
||||
preserveLatestAssistant: true,
|
||||
}) as unknown;
|
||||
if (sanitized === messages) {
|
||||
return inner(model, context, options);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -3,7 +3,7 @@ import type { ToolResultMessage, UserMessage } from "@mariozechner/pi-ai";
|
|||
import { SessionManager } from "@mariozechner/pi-coding-agent";
|
||||
import { describe, expect, it } from "vitest";
|
||||
import { makeAgentAssistantMessage } from "../test-helpers/agent-message-fixtures.js";
|
||||
import { sanitizeSessionHistory } from "./google.js";
|
||||
import { sanitizeSessionHistory } from "./transcript-hygiene.js";
|
||||
|
||||
describe("sanitizeSessionHistory toolResult details stripping", () => {
|
||||
it("strips toolResult.details so untrusted payloads are not fed back to the model", async () => {
|
||||
|
|
|
|||
|
|
@ -58,4 +58,34 @@ describe("dropThinkingBlocks", () => {
|
|||
const assistant = result[0] as Extract<AgentMessage, { role: "assistant" }>;
|
||||
expect(assistant.content).toEqual([{ type: "text", text: "" }]);
|
||||
});
|
||||
|
||||
it("preserves the latest assistant turn when requested", () => {
|
||||
const messages: AgentMessage[] = [
|
||||
castAgentMessage({ role: "user", content: "hello" }),
|
||||
castAgentMessage({
|
||||
role: "assistant",
|
||||
content: [
|
||||
{ type: "thinking", thinking: "older reasoning" },
|
||||
{ type: "text", text: "older answer" },
|
||||
],
|
||||
}),
|
||||
castAgentMessage({ role: "user", content: "follow up" }),
|
||||
castAgentMessage({
|
||||
role: "assistant",
|
||||
content: [
|
||||
{ type: "thinking", thinking: "latest reasoning" },
|
||||
{ type: "text", text: "latest answer" },
|
||||
],
|
||||
}),
|
||||
];
|
||||
|
||||
const result = dropThinkingBlocks(messages, { preserveLatestAssistant: true });
|
||||
const olderAssistant = result[1] as Extract<AgentMessage, { role: "assistant" }>;
|
||||
const latestAssistant = result[3] as Extract<AgentMessage, { role: "assistant" }>;
|
||||
expect(olderAssistant.content).toEqual([{ type: "text", text: "older answer" }]);
|
||||
expect(latestAssistant.content).toEqual([
|
||||
{ type: "thinking", thinking: "latest reasoning" },
|
||||
{ type: "text", text: "latest answer" },
|
||||
]);
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -12,6 +12,15 @@ export function isAssistantMessageWithContent(message: AgentMessage): message is
|
|||
);
|
||||
}
|
||||
|
||||
function findLatestAssistantMessageIndex(messages: AgentMessage[]): number {
|
||||
for (let i = messages.length - 1; i >= 0; i -= 1) {
|
||||
if (isAssistantMessageWithContent(messages[i])) {
|
||||
return i;
|
||||
}
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
/**
|
||||
* Strip all `type: "thinking"` content blocks from assistant messages.
|
||||
*
|
||||
|
|
@ -19,17 +28,32 @@ export function isAssistantMessageWithContent(message: AgentMessage): message is
|
|||
* a synthetic `{ type: "text", text: "" }` block to preserve turn structure
|
||||
* (some providers require strict user/assistant alternation).
|
||||
*
|
||||
* When `preserveLatestAssistant` is enabled, the most recent assistant turn is
|
||||
* left untouched. Anthropic-backed endpoints require the latest assistant
|
||||
* message to preserve its original `thinking` blocks on replay.
|
||||
*
|
||||
* Returns the original array reference when nothing was changed (callers can
|
||||
* use reference equality to skip downstream work).
|
||||
*/
|
||||
export function dropThinkingBlocks(messages: AgentMessage[]): AgentMessage[] {
|
||||
export function dropThinkingBlocks(
|
||||
messages: AgentMessage[],
|
||||
opts?: { preserveLatestAssistant?: boolean },
|
||||
): AgentMessage[] {
|
||||
let touched = false;
|
||||
const out: AgentMessage[] = [];
|
||||
for (const msg of messages) {
|
||||
const latestAssistantIndex = opts?.preserveLatestAssistant
|
||||
? findLatestAssistantMessageIndex(messages)
|
||||
: -1;
|
||||
for (let index = 0; index < messages.length; index += 1) {
|
||||
const msg = messages[index];
|
||||
if (!isAssistantMessageWithContent(msg)) {
|
||||
out.push(msg);
|
||||
continue;
|
||||
}
|
||||
if (index === latestAssistantIndex) {
|
||||
out.push(msg);
|
||||
continue;
|
||||
}
|
||||
const nextContent: AssistantContentBlock[] = [];
|
||||
let changed = false;
|
||||
for (const block of msg.content) {
|
||||
|
|
|
|||
|
|
@ -0,0 +1,470 @@
|
|||
import type { AgentMessage } from "@mariozechner/pi-agent-core";
|
||||
import type { SessionManager } from "@mariozechner/pi-coding-agent";
|
||||
import type { OpenClawConfig } from "../../config/config.js";
|
||||
import {
|
||||
hasInterSessionUserProvenance,
|
||||
normalizeInputProvenance,
|
||||
} from "../../sessions/input-provenance.js";
|
||||
import { resolveImageSanitizationLimits } from "../image-sanitization.js";
|
||||
import {
|
||||
downgradeOpenAIFunctionCallReasoningPairs,
|
||||
downgradeOpenAIReasoningBlocks,
|
||||
isGoogleModelApi,
|
||||
sanitizeGoogleTurnOrdering,
|
||||
sanitizeSessionMessagesImages,
|
||||
} from "../pi-embedded-helpers.js";
|
||||
import {
|
||||
sanitizeToolCallInputs,
|
||||
stripToolResultDetails,
|
||||
sanitizeToolUseResultPairing,
|
||||
} from "../session-transcript-repair.js";
|
||||
import type { TranscriptPolicy } from "../transcript-policy.js";
|
||||
import { resolveTranscriptPolicy } from "../transcript-policy.js";
|
||||
import {
|
||||
makeZeroUsageSnapshot,
|
||||
normalizeUsage,
|
||||
type AssistantUsageSnapshot,
|
||||
type UsageLike,
|
||||
} from "../usage.js";
|
||||
import { applyGoogleTurnOrderingFix } from "./google.js";
|
||||
import { dropThinkingBlocks } from "./thinking.js";
|
||||
|
||||
const INTER_SESSION_PREFIX_BASE = "[Inter-session message]";
|
||||
const MODEL_SNAPSHOT_CUSTOM_TYPE = "model-snapshot";
|
||||
|
||||
type CustomEntryLike = { type?: unknown; customType?: unknown; data?: unknown };
|
||||
|
||||
type ModelSnapshotEntry = {
|
||||
timestamp: number;
|
||||
provider?: string;
|
||||
modelApi?: string | null;
|
||||
modelId?: string;
|
||||
};
|
||||
|
||||
function hasThinkingLikeBlock(block: unknown): block is { type: "thinking" | "redacted_thinking" } {
|
||||
if (!block || typeof block !== "object") {
|
||||
return false;
|
||||
}
|
||||
const type = (block as { type?: unknown }).type;
|
||||
return type === "thinking" || type === "redacted_thinking";
|
||||
}
|
||||
|
||||
function findLatestAssistantMessageWithThinking(
|
||||
messages: AgentMessage[],
|
||||
): Extract<AgentMessage, { role: "assistant" }> | null {
|
||||
for (let i = messages.length - 1; i >= 0; i -= 1) {
|
||||
const message = messages[i];
|
||||
if (!message || typeof message !== "object" || message.role !== "assistant") {
|
||||
continue;
|
||||
}
|
||||
const assistant = message;
|
||||
if (Array.isArray(assistant.content) && assistant.content.some(hasThinkingLikeBlock)) {
|
||||
return assistant;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
function restoreLatestAssistantMessageWithThinking(
|
||||
originalMessages: AgentMessage[],
|
||||
sanitizedMessages: AgentMessage[],
|
||||
): AgentMessage[] {
|
||||
const originalLatestAssistant = findLatestAssistantMessageWithThinking(originalMessages);
|
||||
if (!originalLatestAssistant) {
|
||||
return sanitizedMessages;
|
||||
}
|
||||
|
||||
for (let i = sanitizedMessages.length - 1; i >= 0; i -= 1) {
|
||||
const candidate = sanitizedMessages[i];
|
||||
if (!candidate || typeof candidate !== "object" || candidate.role !== "assistant") {
|
||||
continue;
|
||||
}
|
||||
if (candidate === originalLatestAssistant) {
|
||||
return sanitizedMessages;
|
||||
}
|
||||
const restored = [...sanitizedMessages];
|
||||
restored[i] = originalLatestAssistant;
|
||||
return restored;
|
||||
}
|
||||
|
||||
return sanitizedMessages;
|
||||
}
|
||||
|
||||
function buildInterSessionPrefix(message: AgentMessage): string {
|
||||
const provenance = normalizeInputProvenance((message as { provenance?: unknown }).provenance);
|
||||
if (!provenance) {
|
||||
return INTER_SESSION_PREFIX_BASE;
|
||||
}
|
||||
const details = [
|
||||
provenance.sourceSessionKey ? `sourceSession=${provenance.sourceSessionKey}` : undefined,
|
||||
provenance.sourceChannel ? `sourceChannel=${provenance.sourceChannel}` : undefined,
|
||||
provenance.sourceTool ? `sourceTool=${provenance.sourceTool}` : undefined,
|
||||
].filter(Boolean);
|
||||
if (details.length === 0) {
|
||||
return INTER_SESSION_PREFIX_BASE;
|
||||
}
|
||||
return `${INTER_SESSION_PREFIX_BASE} ${details.join(" ")}`;
|
||||
}
|
||||
|
||||
/**
 * Prefix inter-session user messages with a provenance marker so the model
 * can tell relayed input from direct end-user input.
 *
 * Copy-on-write: returns the input array by reference when no message
 * needed annotation. Messages are never reordered or removed.
 */
function annotateInterSessionUserMessages(messages: AgentMessage[]): AgentMessage[] {
  let touched = false;
  const out: AgentMessage[] = [];
  for (const msg of messages) {
    // Only user messages whose provenance marks them as inter-session get a prefix.
    if (!hasInterSessionUserProvenance(msg as { role?: unknown; provenance?: unknown })) {
      out.push(msg);
      continue;
    }
    const prefix = buildInterSessionPrefix(msg);
    const user = msg as Extract<AgentMessage, { role: "user" }>;
    // Case 1: plain-string content — prepend the prefix on its own line.
    if (typeof user.content === "string") {
      // Idempotence guard: already-annotated messages pass through unchanged.
      if (user.content.startsWith(prefix)) {
        out.push(msg);
        continue;
      }
      touched = true;
      out.push({
        ...(msg as unknown as Record<string, unknown>),
        content: `${prefix}\n${user.content}`,
      } as AgentMessage);
      continue;
    }
    // Neither string nor block array: leave unknown content shapes alone.
    if (!Array.isArray(user.content)) {
      out.push(msg);
      continue;
    }

    // Case 2: block-array content — annotate the first text block if one exists.
    const textIndex = user.content.findIndex(
      (block) =>
        block &&
        typeof block === "object" &&
        (block as { type?: unknown }).type === "text" &&
        typeof (block as { text?: unknown }).text === "string",
    );

    if (textIndex >= 0) {
      const existing = user.content[textIndex] as { type: "text"; text: string };
      // Idempotence guard for block content.
      if (existing.text.startsWith(prefix)) {
        out.push(msg);
        continue;
      }
      const nextContent = [...user.content];
      nextContent[textIndex] = {
        ...existing,
        text: `${prefix}\n${existing.text}`,
      };
      touched = true;
      out.push({
        ...(msg as unknown as Record<string, unknown>),
        content: nextContent,
      } as AgentMessage);
      continue;
    }

    // Case 3: block array with no text block — insert a dedicated prefix block first.
    touched = true;
    out.push({
      ...(msg as unknown as Record<string, unknown>),
      content: [{ type: "text", text: prefix }, ...user.content],
    } as AgentMessage);
  }
  return touched ? out : messages;
}
|
||||
|
||||
function parseMessageTimestamp(value: unknown): number | null {
|
||||
if (typeof value === "number" && Number.isFinite(value)) {
|
||||
return value;
|
||||
}
|
||||
if (typeof value === "string") {
|
||||
const parsed = Date.parse(value);
|
||||
if (Number.isFinite(parsed)) {
|
||||
return parsed;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
 * Zero out usage snapshots on assistant messages that predate the latest
 * compaction summary, so already-compacted turns no longer count toward
 * context accounting.
 *
 * Staleness is decided two ways: by timestamp (message at or before the
 * compaction summary's timestamp) and, as a legacy fallback, by array
 * position (message stored before the summary entry).
 *
 * Copy-on-write: returns the input array by reference when no compaction
 * summary exists or nothing needed zeroing.
 */
function stripStaleAssistantUsageBeforeLatestCompaction(messages: AgentMessage[]): AgentMessage[] {
  // Pass 1: find the last compaction summary and its timestamp (if parseable).
  let latestCompactionSummaryIndex = -1;
  let latestCompactionTimestamp: number | null = null;
  for (let i = 0; i < messages.length; i += 1) {
    const entry = messages[i];
    if (entry?.role !== "compactionSummary") {
      continue;
    }
    latestCompactionSummaryIndex = i;
    latestCompactionTimestamp = parseMessageTimestamp(
      (entry as { timestamp?: unknown }).timestamp ?? null,
    );
  }
  if (latestCompactionSummaryIndex === -1) {
    return messages;
  }

  // Pass 2: zero out usage on assistant messages that are stale relative to it.
  const out = [...messages];
  let touched = false;
  for (let i = 0; i < out.length; i += 1) {
    const candidate = out[i] as
      | (AgentMessage & { usage?: unknown; timestamp?: unknown })
      | undefined;
    if (!candidate || candidate.role !== "assistant") {
      continue;
    }
    // Messages without a structured usage object have nothing to strip.
    if (!candidate.usage || typeof candidate.usage !== "object") {
      continue;
    }

    const messageTimestamp = parseMessageTimestamp(candidate.timestamp);
    const staleByTimestamp =
      latestCompactionTimestamp !== null &&
      messageTimestamp !== null &&
      messageTimestamp <= latestCompactionTimestamp;
    // Legacy transcripts may lack timestamps; fall back to storage order.
    const staleByLegacyOrdering = i < latestCompactionSummaryIndex;
    if (!staleByTimestamp && !staleByLegacyOrdering) {
      continue;
    }

    // pi-coding-agent expects assistant usage to always be present during context
    // accounting. Keep stale snapshots structurally valid, but zeroed out.
    const candidateRecord = candidate as unknown as Record<string, unknown>;
    out[i] = {
      ...candidateRecord,
      usage: makeZeroUsageSnapshot(),
    } as unknown as AgentMessage;
    touched = true;
  }
  return touched ? out : messages;
}
|
||||
|
||||
function normalizeAssistantUsageCost(usage: unknown): AssistantUsageSnapshot["cost"] | undefined {
|
||||
const base = makeZeroUsageSnapshot().cost;
|
||||
if (!usage || typeof usage !== "object") {
|
||||
return undefined;
|
||||
}
|
||||
const rawCost = (usage as { cost?: unknown }).cost;
|
||||
if (!rawCost || typeof rawCost !== "object") {
|
||||
return undefined;
|
||||
}
|
||||
const cost = rawCost as Record<string, unknown>;
|
||||
const inputRaw = toFiniteCostNumber(cost.input);
|
||||
const outputRaw = toFiniteCostNumber(cost.output);
|
||||
const cacheReadRaw = toFiniteCostNumber(cost.cacheRead);
|
||||
const cacheWriteRaw = toFiniteCostNumber(cost.cacheWrite);
|
||||
const totalRaw = toFiniteCostNumber(cost.total);
|
||||
if (
|
||||
inputRaw === undefined &&
|
||||
outputRaw === undefined &&
|
||||
cacheReadRaw === undefined &&
|
||||
cacheWriteRaw === undefined &&
|
||||
totalRaw === undefined
|
||||
) {
|
||||
return undefined;
|
||||
}
|
||||
const input = inputRaw ?? base.input;
|
||||
const output = outputRaw ?? base.output;
|
||||
const cacheRead = cacheReadRaw ?? base.cacheRead;
|
||||
const cacheWrite = cacheWriteRaw ?? base.cacheWrite;
|
||||
const total = totalRaw ?? input + output + cacheRead + cacheWrite;
|
||||
return { input, output, cacheRead, cacheWrite, total };
|
||||
}
|
||||
|
||||
function normalizeAssistantUsageSnapshot(usage: unknown) {
|
||||
const normalized = normalizeUsage((usage ?? undefined) as UsageLike | undefined);
|
||||
if (!normalized) {
|
||||
return makeZeroUsageSnapshot();
|
||||
}
|
||||
const input = normalized.input ?? 0;
|
||||
const output = normalized.output ?? 0;
|
||||
const cacheRead = normalized.cacheRead ?? 0;
|
||||
const cacheWrite = normalized.cacheWrite ?? 0;
|
||||
const totalTokens = normalized.total ?? input + output + cacheRead + cacheWrite;
|
||||
const cost = normalizeAssistantUsageCost(usage);
|
||||
return {
|
||||
input,
|
||||
output,
|
||||
cacheRead,
|
||||
cacheWrite,
|
||||
totalTokens,
|
||||
...(cost ? { cost } : {}),
|
||||
};
|
||||
}
|
||||
|
||||
function toFiniteCostNumber(value: unknown): number | undefined {
|
||||
return typeof value === "number" && Number.isFinite(value) ? value : undefined;
|
||||
}
|
||||
|
||||
function ensureAssistantUsageSnapshots(messages: AgentMessage[]): AgentMessage[] {
|
||||
if (messages.length === 0) {
|
||||
return messages;
|
||||
}
|
||||
|
||||
let touched = false;
|
||||
const out = [...messages];
|
||||
for (let i = 0; i < out.length; i += 1) {
|
||||
const message = out[i] as (AgentMessage & { role?: unknown; usage?: unknown }) | undefined;
|
||||
if (!message || message.role !== "assistant") {
|
||||
continue;
|
||||
}
|
||||
const normalizedUsage = normalizeAssistantUsageSnapshot(message.usage);
|
||||
const usageCost =
|
||||
message.usage && typeof message.usage === "object"
|
||||
? (message.usage as { cost?: unknown }).cost
|
||||
: undefined;
|
||||
const normalizedCost = normalizedUsage.cost;
|
||||
if (
|
||||
message.usage &&
|
||||
typeof message.usage === "object" &&
|
||||
(message.usage as { input?: unknown }).input === normalizedUsage.input &&
|
||||
(message.usage as { output?: unknown }).output === normalizedUsage.output &&
|
||||
(message.usage as { cacheRead?: unknown }).cacheRead === normalizedUsage.cacheRead &&
|
||||
(message.usage as { cacheWrite?: unknown }).cacheWrite === normalizedUsage.cacheWrite &&
|
||||
(message.usage as { totalTokens?: unknown }).totalTokens === normalizedUsage.totalTokens &&
|
||||
((normalizedCost &&
|
||||
usageCost &&
|
||||
typeof usageCost === "object" &&
|
||||
(usageCost as { input?: unknown }).input === normalizedCost.input &&
|
||||
(usageCost as { output?: unknown }).output === normalizedCost.output &&
|
||||
(usageCost as { cacheRead?: unknown }).cacheRead === normalizedCost.cacheRead &&
|
||||
(usageCost as { cacheWrite?: unknown }).cacheWrite === normalizedCost.cacheWrite &&
|
||||
(usageCost as { total?: unknown }).total === normalizedCost.total) ||
|
||||
(!normalizedCost && usageCost === undefined))
|
||||
) {
|
||||
continue;
|
||||
}
|
||||
out[i] = {
|
||||
...(message as unknown as Record<string, unknown>),
|
||||
usage: normalizedUsage,
|
||||
} as AgentMessage;
|
||||
touched = true;
|
||||
}
|
||||
|
||||
return touched ? out : messages;
|
||||
}
|
||||
|
||||
function readLastModelSnapshot(sessionManager: SessionManager): ModelSnapshotEntry | null {
|
||||
try {
|
||||
const entries = sessionManager.getEntries();
|
||||
for (let i = entries.length - 1; i >= 0; i -= 1) {
|
||||
const entry = entries[i] as CustomEntryLike;
|
||||
if (entry?.type !== "custom" || entry?.customType !== MODEL_SNAPSHOT_CUSTOM_TYPE) {
|
||||
continue;
|
||||
}
|
||||
const data = entry?.data as ModelSnapshotEntry | undefined;
|
||||
if (data && typeof data === "object") {
|
||||
return data;
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
function appendModelSnapshot(sessionManager: SessionManager, data: ModelSnapshotEntry): void {
|
||||
try {
|
||||
sessionManager.appendCustomEntry(MODEL_SNAPSHOT_CUSTOM_TYPE, data);
|
||||
} catch {
|
||||
// ignore persistence failures
|
||||
}
|
||||
}
|
||||
|
||||
function isSameModelSnapshot(a: ModelSnapshotEntry, b: ModelSnapshotEntry): boolean {
|
||||
const normalize = (value?: string | null) => value ?? "";
|
||||
return (
|
||||
normalize(a.provider) === normalize(b.provider) &&
|
||||
normalize(a.modelApi) === normalize(b.modelApi) &&
|
||||
normalize(a.modelId) === normalize(b.modelId)
|
||||
);
|
||||
}
|
||||
|
||||
/**
 * Apply the full transcript-hygiene pipeline to a session history before it
 * is sent to a provider. Stages, in order:
 *
 *  1. Annotate inter-session user messages with a provenance prefix.
 *  2. Sanitize images / tool-call ids / signatures per the transcript policy.
 *  3. Drop replayed thinking blocks (preserving the latest assistant turn)
 *     when the policy requires it.
 *  4. Sanitize tool-call inputs; optionally repair tool-use/result pairing;
 *     strip untrusted toolResult details.
 *  5. Zero stale assistant usage before the latest compaction and normalize
 *     remaining usage snapshots.
 *  6. For OpenAI Responses APIs, downgrade reasoning blocks and
 *     reasoning/function-call pairs.
 *  7. Restore the latest assistant turn that carried thinking blocks, then
 *     persist a model snapshot when the model identity changed, and finally
 *     apply turn-ordering fixes when the policy asks for them.
 *
 * Keep docs/reference/transcript-hygiene.md in sync with any logic changes.
 *
 * @param params.messages persisted session history to sanitize (not mutated).
 * @param params.policy   optional pre-resolved policy; otherwise resolved
 *                        from provider/modelApi/modelId.
 * @returns the sanitized message list; may share structure with the input.
 */
export async function sanitizeSessionHistory(params: {
  messages: AgentMessage[];
  modelApi?: string | null;
  modelId?: string;
  provider?: string;
  allowedToolNames?: Iterable<string>;
  config?: OpenClawConfig;
  sessionManager: SessionManager;
  sessionId: string;
  policy?: TranscriptPolicy;
}): Promise<AgentMessage[]> {
  // Keep docs/reference/transcript-hygiene.md in sync with any logic changes here.
  const policy =
    params.policy ??
    resolveTranscriptPolicy({
      modelApi: params.modelApi,
      provider: params.provider,
      modelId: params.modelId,
    });
  const withInterSessionMarkers = annotateInterSessionUserMessages(params.messages);
  const sanitizedImages = await sanitizeSessionMessagesImages(
    withInterSessionMarkers,
    "session:history",
    {
      sanitizeMode: policy.sanitizeMode,
      sanitizeToolCallIds: policy.sanitizeToolCallIds,
      toolCallIdMode: policy.toolCallIdMode,
      preserveSignatures: policy.preserveSignatures,
      sanitizeThoughtSignatures: policy.sanitizeThoughtSignatures,
      ...resolveImageSanitizationLimits(params.config),
    },
  );
  // Latest assistant turn is preserved here and re-asserted verbatim below.
  const droppedThinking = policy.dropThinkingBlocks
    ? dropThinkingBlocks(sanitizedImages, { preserveLatestAssistant: true })
    : sanitizedImages;
  const sanitizedToolCalls = sanitizeToolCallInputs(droppedThinking, {
    allowedToolNames: params.allowedToolNames,
  });
  const repairedTools = policy.repairToolUseResultPairing
    ? sanitizeToolUseResultPairing(sanitizedToolCalls)
    : sanitizedToolCalls;
  const sanitizedToolResults = stripToolResultDetails(repairedTools);
  const sanitizedCompactionUsage = ensureAssistantUsageSnapshots(
    stripStaleAssistantUsageBeforeLatestCompaction(sanitizedToolResults),
  );

  const isOpenAIResponsesApi =
    params.modelApi === "openai-responses" || params.modelApi === "openai-codex-responses";
  // Model-snapshot bookkeeping: detect a provider/model switch between calls.
  const hasSnapshot = Boolean(params.provider || params.modelApi || params.modelId);
  const priorSnapshot = hasSnapshot ? readLastModelSnapshot(params.sessionManager) : null;
  const modelChanged = priorSnapshot
    ? !isSameModelSnapshot(priorSnapshot, {
        timestamp: 0,
        provider: params.provider,
        modelApi: params.modelApi,
        modelId: params.modelId,
      })
    : false;
  const sanitizedOpenAI = isOpenAIResponsesApi
    ? downgradeOpenAIFunctionCallReasoningPairs(
        downgradeOpenAIReasoningBlocks(sanitizedCompactionUsage),
      )
    : sanitizedCompactionUsage;
  // NOTE(review): this restore runs unconditionally (not only when
  // dropThinkingBlocks applied), so any earlier-stage edits to the latest
  // thinking-bearing assistant turn are reverted — presumably intentional to
  // satisfy Anthropic's verbatim-replay requirement; confirm.
  const stableLatestAssistant = restoreLatestAssistantMessageWithThinking(
    params.messages,
    sanitizedOpenAI,
  );

  if (hasSnapshot && (!priorSnapshot || modelChanged)) {
    appendModelSnapshot(params.sessionManager, {
      timestamp: Date.now(),
      provider: params.provider,
      modelApi: params.modelApi,
      modelId: params.modelId,
    });
  }

  if (!policy.applyGoogleTurnOrdering) {
    return stableLatestAssistant;
  }

  if (isGoogleModelApi(params.modelApi)) {
    return applyGoogleTurnOrderingFix({
      messages: stableLatestAssistant,
      modelApi: params.modelApi,
      sessionManager: params.sessionManager,
      sessionId: params.sessionId,
    }).messages;
  }

  // Strict OpenAI-compatible providers also reject assistant-first histories.
  return sanitizeGoogleTurnOrdering(stableLatestAssistant);
}
|
||||
Loading…
Reference in New Issue