diff --git a/CHANGELOG.md b/CHANGELOG.md index 9820a7aa1e8..563e7e6b132 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -236,6 +236,7 @@ Docs: https://docs.openclaw.ai - Memory/Gemini: normalize returned Gemini embeddings across direct query, direct batch, and async batch paths so memory search uses consistent vector handling for Gemini too. (#43409) Thanks @gumadeiras. - Agents/failover: recognize additional serialized network errno strings plus `EHOSTDOWN` and `EPIPE` structured codes so transient transport failures trigger timeout failover more reliably. (#42830) Thanks @jnMetaCode. - Telegram/model picker: make inline model button selections persist the chosen session model correctly, clear overrides when selecting the configured default, and include effective fallback models in `/models` button validation. (#40105) Thanks @avirweb. +- Agents/embedded runner: carry provider-observed overflow token counts into compaction so overflow retries and diagnostics use the rejected live prompt size instead of only transcript estimates. (#40357) thanks @rabsef-bicrym. ## 2026.3.7 diff --git a/src/agents/anthropic-payload-log.test.ts b/src/agents/anthropic-payload-log.test.ts index fb3cf18e47d..037093fbbf5 100644 --- a/src/agents/anthropic-payload-log.test.ts +++ b/src/agents/anthropic-payload-log.test.ts @@ -29,7 +29,7 @@ describe("createAnthropicPayloadLogger", () => { ], }; const streamFn: StreamFn = ((model, __, options) => { - options?.onPayload?.(payload, model); + options?.onPayload?.(payload); return {} as never; }) as StreamFn; diff --git a/src/agents/anthropic-payload-log.ts b/src/agents/anthropic-payload-log.ts index 2eb5d62e770..d80ed551179 100644 --- a/src/agents/anthropic-payload-log.ts +++ b/src/agents/anthropic-payload-log.ts @@ -145,7 +145,7 @@ export function createAnthropicPayloadLogger(params: { payload: redactedPayload, payloadDigest: digest(redactedPayload), }); - return options?.onPayload?.(payload, model); + return options?.onPayload?.(payload); }; return streamFn(model, context, { ...options, diff --git a/src/agents/auth-profiles/oauth.openai-codex-refresh-fallback.test.ts b/src/agents/auth-profiles/oauth.openai-codex-refresh-fallback.test.ts index 23381d89a05..833936b93f5 100644 --- a/src/agents/auth-profiles/oauth.openai-codex-refresh-fallback.test.ts +++ b/src/agents/auth-profiles/oauth.openai-codex-refresh-fallback.test.ts @@ -17,7 +17,7 @@ const { getOAuthApiKeyMock } = vi.hoisted(() => ({ }), })); -vi.mock("@mariozechner/pi-ai/oauth", () => ({ +vi.mock("@mariozechner/pi-ai", () => ({ getOAuthApiKey: getOAuthApiKeyMock, getOAuthProviders: () => [ { id: "openai-codex", envApiKey: "OPENAI_API_KEY", oauthTokenEnv: "OPENAI_OAUTH_TOKEN" }, // pragma: allowlist secret diff --git a/src/agents/auth-profiles/oauth.ts b/src/agents/auth-profiles/oauth.ts index 072b3a77246..6f2061501b6 100644 --- a/src/agents/auth-profiles/oauth.ts +++ b/src/agents/auth-profiles/oauth.ts @@ -1,5 +1,9 @@ -import type { OAuthCredentials, OAuthProvider } from "@mariozechner/pi-ai"; -import { getOAuthApiKey, getOAuthProviders } from "@mariozechner/pi-ai/oauth"; +import { + getOAuthApiKey, + getOAuthProviders, + type OAuthCredentials, + type OAuthProvider, +} from "@mariozechner/pi-ai"; import { loadConfig, type OpenClawConfig } from "../../config/config.js"; import { coerceSecretRef } from "../../config/types.secrets.js"; import { withFileLock } from "../../infra/file-lock.js"; diff --git a/src/agents/openai-ws-stream.ts b/src/agents/openai-ws-stream.ts index 5b7a80f52ec..04550a665f4 100644 --- a/src/agents/openai-ws-stream.ts +++ b/src/agents/openai-ws-stream.ts @@ -797,7 +797,7 @@ export function createOpenAIWebSocketStreamFn( ...(prevResponseId ? { previous_response_id: prevResponseId } : {}), ...extraParams, }; - const nextPayload = await options?.onPayload?.(payload, model); + const nextPayload = options?.onPayload?.(payload); const requestPayload = (nextPayload ?? payload) as Parameters< OpenAIWebSocketManager["send"] >[0]; diff --git a/src/agents/pi-embedded-helpers.isbillingerrormessage.test.ts b/src/agents/pi-embedded-helpers.isbillingerrormessage.test.ts index 9ed183a6910..b71ad3a7d78 100644 --- a/src/agents/pi-embedded-helpers.isbillingerrormessage.test.ts +++ b/src/agents/pi-embedded-helpers.isbillingerrormessage.test.ts @@ -2,6 +2,7 @@ import { describe, expect, it } from "vitest"; import { classifyFailoverReason, classifyFailoverReasonFromHttpStatus, + extractObservedOverflowTokenCount, isAuthErrorMessage, isAuthPermanentErrorMessage, isBillingErrorMessage, @@ -461,6 +462,29 @@ describe("isLikelyContextOverflowError", () => { }); }); +describe("extractObservedOverflowTokenCount", () => { + it("extracts provider-reported prompt token counts", () => { + expect( + extractObservedOverflowTokenCount( + '400 {"type":"error","error":{"message":"prompt is too long: 277403 tokens > 200000 maximum"}}', + ), + ).toBe(277403); + expect( + extractObservedOverflowTokenCount("Context window exceeded: requested 12000 tokens"), + ).toBe(12000); + expect( + extractObservedOverflowTokenCount( + "This model's maximum context length is 128000 tokens. However, your messages resulted in 145000 tokens.", + ), + ).toBe(145000); + }); + + it("returns undefined when overflow counts are not present", () => { + expect(extractObservedOverflowTokenCount("Prompt too large for this model")).toBeUndefined(); + expect(extractObservedOverflowTokenCount("rate limit exceeded")).toBeUndefined(); + }); +}); + describe("isTransientHttpError", () => { it("returns true for retryable 5xx status codes", () => { expect(isTransientHttpError("499 Client Closed Request")).toBe(true); diff --git a/src/agents/pi-embedded-helpers.ts b/src/agents/pi-embedded-helpers.ts index 53f21814492..77ae492bc32 100644 --- a/src/agents/pi-embedded-helpers.ts +++ b/src/agents/pi-embedded-helpers.ts @@ -22,6 +22,7 @@ export { isAuthPermanentErrorMessage, isModelNotFoundErrorMessage, isBillingAssistantError, + extractObservedOverflowTokenCount, parseApiErrorInfo, sanitizeUserFacingText, isBillingErrorMessage, diff --git a/src/agents/pi-embedded-helpers/errors.ts b/src/agents/pi-embedded-helpers/errors.ts index e9bfd92951e..28fcf328e87 100644 --- a/src/agents/pi-embedded-helpers/errors.ts +++ b/src/agents/pi-embedded-helpers/errors.ts @@ -185,6 +185,32 @@ export function isCompactionFailureError(errorMessage?: string): boolean { return lower.includes("context overflow"); } +const OBSERVED_OVERFLOW_TOKEN_PATTERNS = [ + /prompt is too long:\s*([\d,]+)\s+tokens\s*>\s*[\d,]+\s+maximum/i, + /requested\s+([\d,]+)\s+tokens/i, + /resulted in\s+([\d,]+)\s+tokens/i, +]; + +export function extractObservedOverflowTokenCount(errorMessage?: string): number | undefined { + if (!errorMessage) { + return undefined; + } + + for (const pattern of OBSERVED_OVERFLOW_TOKEN_PATTERNS) { + const match = errorMessage.match(pattern); + const rawCount = match?.[1]?.replaceAll(",", ""); + if (!rawCount) { + continue; + } + const parsed = Number(rawCount); + if (Number.isFinite(parsed) && parsed > 0) { + return Math.floor(parsed); + } + } + + return undefined; +} + const ERROR_PAYLOAD_PREFIX_RE = /^(?:error|api\s*error|apierror|openai\s*error|anthropic\s*error|gateway\s*error)[:\s-]+/i; const FINAL_TAG_RE = /<\s*\/?\s*final\s*>/gi; diff --git a/src/agents/pi-embedded-runner-extraparams.test.ts b/src/agents/pi-embedded-runner-extraparams.test.ts index 3f6fb7a2f5a..2e30450a719 100644 --- a/src/agents/pi-embedded-runner-extraparams.test.ts +++ b/src/agents/pi-embedded-runner-extraparams.test.ts @@ -208,7 +208,7 @@ describe("applyExtraParamsToAgent", () => { }) { const payload = params.payload ?? { store: false }; const baseStreamFn: StreamFn = (model, _context, options) => { - options?.onPayload?.(payload, model); + options?.onPayload?.(payload); return {} as ReturnType; }; const agent = { streamFn: baseStreamFn }; @@ -233,7 +233,7 @@ describe("applyExtraParamsToAgent", () => { }) { const payload = params.payload ?? {}; const baseStreamFn: StreamFn = (model, _context, options) => { - options?.onPayload?.(payload, model); + options?.onPayload?.(payload); return {} as ReturnType; }; const agent = { streamFn: baseStreamFn }; @@ -276,7 +276,7 @@ describe("applyExtraParamsToAgent", () => { const payloads: Record[] = []; const baseStreamFn: StreamFn = (_model, _context, options) => { const payload: Record = { model: "deepseek/deepseek-r1" }; - options?.onPayload?.(payload, model); + options?.onPayload?.(payload); payloads.push(payload); return {} as ReturnType; }; @@ -308,7 +308,7 @@ describe("applyExtraParamsToAgent", () => { const payloads: Record[] = []; const baseStreamFn: StreamFn = (_model, _context, options) => { const payload: Record = {}; - options?.onPayload?.(payload, model); + options?.onPayload?.(payload); payloads.push(payload); return {} as ReturnType; }; @@ -332,7 +332,7 @@ describe("applyExtraParamsToAgent", () => { const payloads: Record[] = []; const baseStreamFn: StreamFn = (_model, _context, options) => { const payload: Record = { reasoning_effort: "high" }; - options?.onPayload?.(payload, model); + options?.onPayload?.(payload); payloads.push(payload); return {} as ReturnType; }; @@ -357,7 +357,7 @@ describe("applyExtraParamsToAgent", () => { const payloads: Record[] = []; const baseStreamFn: StreamFn = (_model, _context, options) => { const payload: Record = { reasoning: { max_tokens: 256 } }; - options?.onPayload?.(payload, model); + options?.onPayload?.(payload); payloads.push(payload); return {} as ReturnType; }; @@ -381,7 +381,7 @@ describe("applyExtraParamsToAgent", () => { const payloads: Record[] = []; const baseStreamFn: StreamFn = (_model, _context, options) => { const payload: Record = { reasoning_effort: "medium" }; - options?.onPayload?.(payload, model); + options?.onPayload?.(payload); payloads.push(payload); return {} as ReturnType; }; @@ -588,7 +588,7 @@ describe("applyExtraParamsToAgent", () => { const payloads: Record[] = []; const baseStreamFn: StreamFn = (_model, _context, options) => { const payload: Record = { thinking: "off" }; - options?.onPayload?.(payload, model); + options?.onPayload?.(payload); payloads.push(payload); return {} as ReturnType; }; @@ -619,7 +619,7 @@ describe("applyExtraParamsToAgent", () => { const payloads: Record[] = []; const baseStreamFn: StreamFn = (_model, _context, options) => { const payload: Record = { thinking: "off" }; - options?.onPayload?.(payload, model); + options?.onPayload?.(payload); payloads.push(payload); return {} as ReturnType; }; @@ -650,7 +650,7 @@ describe("applyExtraParamsToAgent", () => { const payloads: Record[] = []; const baseStreamFn: StreamFn = (_model, _context, options) => { const payload: Record = {}; - options?.onPayload?.(payload, model); + options?.onPayload?.(payload); payloads.push(payload); return {} as ReturnType; }; @@ -674,7 +674,7 @@ describe("applyExtraParamsToAgent", () => { const payloads: Record[] = []; const baseStreamFn: StreamFn = (_model, _context, options) => { const payload: Record = { tool_choice: "required" }; - options?.onPayload?.(payload, model); + options?.onPayload?.(payload); payloads.push(payload); return {} as ReturnType; }; @@ -699,7 +699,7 @@ describe("applyExtraParamsToAgent", () => { const payloads: Record[] = []; const baseStreamFn: StreamFn = (_model, _context, options) => { const payload: Record = {}; - options?.onPayload?.(payload, model); + options?.onPayload?.(payload); payloads.push(payload); return {} as ReturnType; }; @@ -749,7 +749,7 @@ describe("applyExtraParamsToAgent", () => { ], tool_choice: { type: "tool", name: "read" }, }; - options?.onPayload?.(payload, model); + options?.onPayload?.(payload); payloads.push(payload); return {} as ReturnType; }; @@ -793,7 +793,7 @@ describe("applyExtraParamsToAgent", () => { }, ], }; - options?.onPayload?.(payload, model); + options?.onPayload?.(payload); payloads.push(payload); return {} as ReturnType; }; @@ -832,7 +832,7 @@ describe("applyExtraParamsToAgent", () => { }, ], }; - options?.onPayload?.(payload, model); + options?.onPayload?.(payload); payloads.push(payload); return {} as ReturnType; }; @@ -896,7 +896,7 @@ describe("applyExtraParamsToAgent", () => { }, }, }; - options?.onPayload?.(payload, model); + options?.onPayload?.(payload); payloads.push(payload); return {} as ReturnType; }; @@ -943,7 +943,7 @@ describe("applyExtraParamsToAgent", () => { }, }, }; - options?.onPayload?.(payload, model); + options?.onPayload?.(payload); payloads.push(payload); return {} as ReturnType; }; diff --git a/src/agents/pi-embedded-runner/anthropic-stream-wrappers.ts b/src/agents/pi-embedded-runner/anthropic-stream-wrappers.ts index df43d2570c7..66718b9e0aa 100644 --- a/src/agents/pi-embedded-runner/anthropic-stream-wrappers.ts +++ b/src/agents/pi-embedded-runner/anthropic-stream-wrappers.ts @@ -298,7 +298,7 @@ export function createAnthropicToolPayloadCompatibilityWrapper( ); } } - return originalOnPayload?.(payload, model); + return originalOnPayload?.(payload); }, }); }; diff --git a/src/agents/pi-embedded-runner/compact.ts b/src/agents/pi-embedded-runner/compact.ts index feba0f81493..f4ea6819eae 100644 --- a/src/agents/pi-embedded-runner/compact.ts +++ b/src/agents/pi-embedded-runner/compact.ts @@ -114,6 +114,8 @@ export type CompactEmbeddedPiSessionParams = { /** Whether the sender is an owner (required for owner-only tools). */ senderIsOwner?: boolean; sessionFile: string; + /** Optional caller-observed live prompt tokens used for compaction diagnostics. */ + currentTokenCount?: number; workspaceDir: string; agentDir?: string; config?: OpenClawConfig; @@ -152,6 +154,12 @@ function createCompactionDiagId(): string { return `cmp-${Date.now().toString(36)}-${generateSecureToken(4)}`; } +function normalizeObservedTokenCount(value: unknown): number | undefined { + return typeof value === "number" && Number.isFinite(value) && value > 0 + ? Math.floor(value) + : undefined; +} + function getMessageTextChars(msg: AgentMessage): number { const content = (msg as { content?: unknown }).content; if (typeof content === "string") { @@ -228,6 +236,9 @@ function classifyCompactionReason(reason?: string): string { if (text.includes("already compacted")) { return "already_compacted_recently"; } + if (text.includes("still exceeds target")) { + return "live_context_still_exceeds_target"; + } if (text.includes("guard")) { return "guard_blocked"; } @@ -701,6 +712,7 @@ export async function compactEmbeddedPiSessionDirect( const missingSessionKey = !params.sessionKey || !params.sessionKey.trim(); const hookSessionKey = params.sessionKey?.trim() || params.sessionId; const hookRunner = getGlobalHookRunner(); + const observedTokenCount = normalizeObservedTokenCount(params.currentTokenCount); const messageCountOriginal = originalMessages.length; let tokenCountOriginal: number | undefined; try { @@ -712,14 +724,16 @@ export async function compactEmbeddedPiSessionDirect( tokenCountOriginal = undefined; } const messageCountBefore = session.messages.length; - let tokenCountBefore: number | undefined; - try { - tokenCountBefore = 0; - for (const message of session.messages) { - tokenCountBefore += estimateTokens(message); + let tokenCountBefore = observedTokenCount; + if (tokenCountBefore === undefined) { + try { + tokenCountBefore = 0; + for (const message of session.messages) { + tokenCountBefore += estimateTokens(message); + } + } catch { + tokenCountBefore = undefined; } - } catch { - tokenCountBefore = undefined; } // TODO(#7175): Consider exposing full message snapshots or pre-compaction injection // hooks; current events only report counts/metadata. @@ -802,7 +816,7 @@ export async function compactEmbeddedPiSessionDirect( tokensAfter += estimateTokens(message); } // Sanity check: tokensAfter should be less than tokensBefore - if (tokensAfter > result.tokensBefore) { + if (tokensAfter > (observedTokenCount ?? result.tokensBefore)) { tokensAfter = undefined; // Don't trust the estimate } } catch { @@ -876,7 +890,7 @@ export async function compactEmbeddedPiSessionDirect( result: { summary: result.summary, firstKeptEntryId: result.firstKeptEntryId, - tokensBefore: result.tokensBefore, + tokensBefore: observedTokenCount ?? result.tokensBefore, tokensAfter, details: result.details, }, @@ -977,6 +991,7 @@ export async function compactEmbeddedPiSession( sessionId: params.sessionId, sessionFile: params.sessionFile, tokenBudget: ceCtxInfo.tokens, + currentTokenCount: params.currentTokenCount, customInstructions: params.customInstructions, force: params.trigger === "manual", runtimeContext: params as Record, diff --git a/src/agents/pi-embedded-runner/extra-params.kilocode.test.ts b/src/agents/pi-embedded-runner/extra-params.kilocode.test.ts index 0e2fd5ce93b..509cdb5edf4 100644 --- a/src/agents/pi-embedded-runner/extra-params.kilocode.test.ts +++ b/src/agents/pi-embedded-runner/extra-params.kilocode.test.ts @@ -19,7 +19,7 @@ function applyAndCapture(params: { const baseStreamFn: StreamFn = (_model, _context, options) => { captured.headers = options?.headers; - options?.onPayload?.({}, _model); + options?.onPayload?.({}); return createAssistantMessageEventStream(); }; const agent = { streamFn: baseStreamFn }; @@ -97,7 +97,7 @@ describe("extra-params: Kilocode kilo/auto reasoning", () => { const baseStreamFn: StreamFn = (_model, _context, options) => { const payload: Record = { reasoning_effort: "high" }; - options?.onPayload?.(payload, _model); + options?.onPayload?.(payload); capturedPayload = payload; return createAssistantMessageEventStream(); }; @@ -125,7 +125,7 @@ describe("extra-params: Kilocode kilo/auto reasoning", () => { const baseStreamFn: StreamFn = (_model, _context, options) => { const payload: Record = {}; - options?.onPayload?.(payload, _model); + options?.onPayload?.(payload); capturedPayload = payload; return createAssistantMessageEventStream(); }; @@ -158,7 +158,7 @@ describe("extra-params: Kilocode kilo/auto reasoning", () => { const baseStreamFn: StreamFn = (_model, _context, options) => { const payload: Record = { reasoning_effort: "high" }; - options?.onPayload?.(payload, _model); + options?.onPayload?.(payload); capturedPayload = payload; return createAssistantMessageEventStream(); }; diff --git a/src/agents/pi-embedded-runner/extra-params.openrouter-cache-control.test.ts b/src/agents/pi-embedded-runner/extra-params.openrouter-cache-control.test.ts index 58af2239a3d..71af916ccac 100644 --- a/src/agents/pi-embedded-runner/extra-params.openrouter-cache-control.test.ts +++ b/src/agents/pi-embedded-runner/extra-params.openrouter-cache-control.test.ts @@ -13,7 +13,7 @@ type StreamPayload = { function runOpenRouterPayload(payload: StreamPayload, modelId: string) { const baseStreamFn: StreamFn = (_model, _context, options) => { - options?.onPayload?.(payload, _model); + options?.onPayload?.(payload); return createAssistantMessageEventStream(); }; const agent = { streamFn: baseStreamFn }; diff --git a/src/agents/pi-embedded-runner/extra-params.ts b/src/agents/pi-embedded-runner/extra-params.ts index 8f36792f393..6e261463d4a 100644 --- a/src/agents/pi-embedded-runner/extra-params.ts +++ b/src/agents/pi-embedded-runner/extra-params.ts @@ -230,7 +230,7 @@ function createGoogleThinkingPayloadWrapper( thinkingLevel, }); } - return onPayload?.(payload, model); + return onPayload?.(payload); }, }); }; @@ -263,7 +263,7 @@ function createZaiToolStreamWrapper( // Inject tool_stream: true for Z.AI API (payload as Record).tool_stream = true; } - return originalOnPayload?.(payload, model); + return originalOnPayload?.(payload); }, }); }; @@ -310,7 +310,7 @@ function createParallelToolCallsWrapper( if (payload && typeof payload === "object") { (payload as Record).parallel_tool_calls = enabled; } - return originalOnPayload?.(payload, model); + return originalOnPayload?.(payload); }, }); }; diff --git a/src/agents/pi-embedded-runner/extra-params.zai-tool-stream.test.ts b/src/agents/pi-embedded-runner/extra-params.zai-tool-stream.test.ts index f7262a66798..2dab69cd15a 100644 --- a/src/agents/pi-embedded-runner/extra-params.zai-tool-stream.test.ts +++ b/src/agents/pi-embedded-runner/extra-params.zai-tool-stream.test.ts @@ -22,7 +22,7 @@ type ToolStreamCase = { function runToolStreamCase(params: ToolStreamCase) { const payload: Record = { model: params.model.id, messages: [] }; const baseStreamFn: StreamFn = (model, _context, options) => { - options?.onPayload?.(payload, model); + options?.onPayload?.(payload); return {} as ReturnType; }; const agent = { streamFn: baseStreamFn }; diff --git a/src/agents/pi-embedded-runner/moonshot-stream-wrappers.ts b/src/agents/pi-embedded-runner/moonshot-stream-wrappers.ts index 282b0960a9d..aa43260e55e 100644 --- a/src/agents/pi-embedded-runner/moonshot-stream-wrappers.ts +++ b/src/agents/pi-embedded-runner/moonshot-stream-wrappers.ts @@ -60,7 +60,7 @@ export function createSiliconFlowThinkingWrapper(baseStreamFn: StreamFn | undefi payloadObj.thinking = null; } } - return originalOnPayload?.(payload, model); + return originalOnPayload?.(payload); }, }); }; @@ -106,7 +106,7 @@ export function createMoonshotThinkingWrapper( payloadObj.tool_choice = "auto"; } } - return originalOnPayload?.(payload, model); + return originalOnPayload?.(payload); }, }); }; diff --git a/src/agents/pi-embedded-runner/openai-stream-wrappers.ts b/src/agents/pi-embedded-runner/openai-stream-wrappers.ts index c9bc2304f97..5667d39e8a7 100644 --- a/src/agents/pi-embedded-runner/openai-stream-wrappers.ts +++ b/src/agents/pi-embedded-runner/openai-stream-wrappers.ts @@ -197,7 +197,7 @@ export function createOpenAIResponsesContextManagementWrapper( compactThreshold, }); } - return originalOnPayload?.(payload, model); + return originalOnPayload?.(payload); }, }); }; @@ -226,7 +226,7 @@ export function createOpenAIServiceTierWrapper( payloadObj.service_tier = serviceTier; } } - return originalOnPayload?.(payload, model); + return originalOnPayload?.(payload); }, }); }; diff --git a/src/agents/pi-embedded-runner/proxy-stream-wrappers.ts b/src/agents/pi-embedded-runner/proxy-stream-wrappers.ts index 4f77c31cfdd..a5f9f5b1d85 100644 --- a/src/agents/pi-embedded-runner/proxy-stream-wrappers.ts +++ b/src/agents/pi-embedded-runner/proxy-stream-wrappers.ts @@ -92,7 +92,7 @@ export function createOpenRouterSystemCacheWrapper(baseStreamFn: StreamFn | unde } } } - return originalOnPayload?.(payload, model); + return originalOnPayload?.(payload); }, }); }; @@ -113,7 +113,7 @@ export function createOpenRouterWrapper( }, onPayload: (payload) => { normalizeProxyReasoningPayload(payload, thinkingLevel); - return onPayload?.(payload, model); + return onPayload?.(payload); }, }); }; @@ -138,7 +138,7 @@ export function createKilocodeWrapper( }, onPayload: (payload) => { normalizeProxyReasoningPayload(payload, thinkingLevel); - return onPayload?.(payload, model); + return onPayload?.(payload); }, }); }; diff --git a/src/agents/pi-embedded-runner/run.overflow-compaction.mocks.shared.ts b/src/agents/pi-embedded-runner/run.overflow-compaction.mocks.shared.ts index 51f711508b1..3e3d4a83461 100644 --- a/src/agents/pi-embedded-runner/run.overflow-compaction.mocks.shared.ts +++ b/src/agents/pi-embedded-runner/run.overflow-compaction.mocks.shared.ts @@ -109,13 +109,21 @@ vi.mock("../workspace-run.js", () => ({ vi.mock("../pi-embedded-helpers.js", () => ({ formatBillingErrorMessage: vi.fn(() => ""), classifyFailoverReason: vi.fn(() => null), + extractObservedOverflowTokenCount: vi.fn((msg?: string) => { + const match = msg?.match(/prompt is too long:\s*([\d,]+)\s+tokens\s*>\s*[\d,]+\s+maximum/i); + return match?.[1] ? Number(match[1].replaceAll(",", "")) : undefined; + }), formatAssistantErrorText: vi.fn(() => ""), isAuthAssistantError: vi.fn(() => false), isBillingAssistantError: vi.fn(() => false), isCompactionFailureError: vi.fn(() => false), isLikelyContextOverflowError: vi.fn((msg?: string) => { const lower = (msg ?? "").toLowerCase(); - return lower.includes("request_too_large") || lower.includes("context window exceeded"); + return ( + lower.includes("request_too_large") || + lower.includes("context window exceeded") || + lower.includes("prompt is too long") + ); }), isFailoverAssistantError: vi.fn(() => false), isFailoverErrorMessage: vi.fn(() => false), diff --git a/src/agents/pi-embedded-runner/run.overflow-compaction.test.ts b/src/agents/pi-embedded-runner/run.overflow-compaction.test.ts index b29394eedfd..b9f7707c0b6 100644 --- a/src/agents/pi-embedded-runner/run.overflow-compaction.test.ts +++ b/src/agents/pi-embedded-runner/run.overflow-compaction.test.ts @@ -111,6 +111,32 @@ describe("runEmbeddedPiAgent overflow compaction trigger routing", () => { ); }); + it("passes observed overflow token counts into compaction when providers report them", async () => { + const overflowError = new Error( + '400 {"type":"error","error":{"type":"invalid_request_error","message":"prompt is too long: 277403 tokens > 200000 maximum"}}', + ); + + mockedRunEmbeddedAttempt + .mockResolvedValueOnce(makeAttemptResult({ promptError: overflowError })) + .mockResolvedValueOnce(makeAttemptResult({ promptError: null })); + mockedCompactDirect.mockResolvedValueOnce( + makeCompactionSuccess({ + summary: "Compacted session", + firstKeptEntryId: "entry-8", + tokensBefore: 277403, + }), + ); + + const result = await runEmbeddedPiAgent(overflowBaseRunParams); + + expect(mockedCompactDirect).toHaveBeenCalledWith( + expect.objectContaining({ + currentTokenCount: 277403, + }), + ); + expect(result.meta.error).toBeUndefined(); + }); + it("does not reset compaction attempt budget after successful tool-result truncation", async () => { const overflowError = queueOverflowAttemptWithOversizedToolOutput( mockedRunEmbeddedAttempt, diff --git a/src/agents/pi-embedded-runner/run.ts b/src/agents/pi-embedded-runner/run.ts index 09d5adda724..32afe874442 100644 --- a/src/agents/pi-embedded-runner/run.ts +++ b/src/agents/pi-embedded-runner/run.ts @@ -40,6 +40,7 @@ import { ensureOpenClawModelsJson } from "../models-config.js"; import { formatBillingErrorMessage, classifyFailoverReason, + extractObservedOverflowTokenCount, formatAssistantErrorText, isAuthAssistantError, isBillingAssistantError, @@ -988,11 +989,13 @@ export async function runEmbeddedPiAgent( const overflowDiagId = createCompactionDiagId(); const errorText = contextOverflowError.text; const msgCount = attempt.messagesSnapshot?.length ?? 0; + const observedOverflowTokens = extractObservedOverflowTokenCount(errorText); log.warn( `[context-overflow-diag] sessionKey=${params.sessionKey ?? params.sessionId} ` + `provider=${provider}/${modelId} source=${contextOverflowError.source} ` + `messages=${msgCount} sessionFile=${params.sessionFile} ` + `diagId=${overflowDiagId} compactionAttempts=${overflowCompactionAttempts} ` + + `observedTokens=${observedOverflowTokens ?? "unknown"} ` + `error=${errorText.slice(0, 200)}`, ); const isCompactionFailure = isCompactionFailureError(errorText); @@ -1052,6 +1055,9 @@ export async function runEmbeddedPiAgent( sessionId: params.sessionId, sessionFile: params.sessionFile, tokenBudget: ctxInfo.tokens, + ...(observedOverflowTokens !== undefined + ? { currentTokenCount: observedOverflowTokens } + : {}), force: true, compactionTarget: "budget", runtimeContext: { @@ -1074,6 +1080,9 @@ export async function runEmbeddedPiAgent( extraSystemPrompt: params.extraSystemPrompt, ownerNumbers: params.ownerNumbers, trigger: "overflow", + ...(observedOverflowTokens !== undefined + ? { currentTokenCount: observedOverflowTokens } + : {}), diagId: overflowDiagId, attempt: overflowCompactionAttempts, maxAttempts: MAX_OVERFLOW_COMPACTION_ATTEMPTS, diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index 2f77b46aff5..f7c4c75e60a 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -233,14 +233,14 @@ export function wrapOllamaCompatNumCtx(baseFn: StreamFn | undefined, numCtx: num ...options, onPayload: (payload: unknown) => { if (!payload || typeof payload !== "object") { - return options?.onPayload?.(payload, model); + return options?.onPayload?.(payload); } const payloadRecord = payload as Record; if (!payloadRecord.options || typeof payloadRecord.options !== "object") { payloadRecord.options = {}; } (payloadRecord.options as Record).num_ctx = numCtx; - return options?.onPayload?.(payload, model); + return options?.onPayload?.(payload); }, }); } diff --git a/src/commands/openai-codex-oauth.test.ts b/src/commands/openai-codex-oauth.test.ts index 43f1ac41f8a..abe71d0bd42 100644 --- a/src/commands/openai-codex-oauth.test.ts +++ b/src/commands/openai-codex-oauth.test.ts @@ -9,7 +9,7 @@ const mocks = vi.hoisted(() => ({ formatOpenAIOAuthTlsPreflightFix: vi.fn(), })); -vi.mock("@mariozechner/pi-ai/oauth", () => ({ +vi.mock("@mariozechner/pi-ai", () => ({ loginOpenAICodex: mocks.loginOpenAICodex, })); diff --git a/src/commands/openai-codex-oauth.ts b/src/commands/openai-codex-oauth.ts index 1f6a8f9cde8..72a13f654cf 100644 --- a/src/commands/openai-codex-oauth.ts +++ b/src/commands/openai-codex-oauth.ts @@ -1,5 +1,5 @@ import type { OAuthCredentials } from "@mariozechner/pi-ai"; -import { loginOpenAICodex } from "@mariozechner/pi-ai/oauth"; +import { loginOpenAICodex } from "@mariozechner/pi-ai"; import type { RuntimeEnv } from "../runtime.js"; import type { WizardPrompter } from "../wizard/prompts.js"; import { createVpsAwareOAuthHandlers } from "./oauth-flow.js"; diff --git a/src/context-engine/context-engine.test.ts b/src/context-engine/context-engine.test.ts index 9b40008f1a0..3d9b7dc4fc1 100644 --- a/src/context-engine/context-engine.test.ts +++ b/src/context-engine/context-engine.test.ts @@ -1,5 +1,6 @@ import type { AgentMessage } from "@mariozechner/pi-agent-core"; -import { describe, expect, it, beforeEach } from "vitest"; +import { beforeEach, describe, expect, it, vi } from "vitest"; +import { compactEmbeddedPiSessionDirect } from "../agents/pi-embedded-runner/compact.runtime.js"; // --------------------------------------------------------------------------- // We dynamically import the registry so we can get a fresh module per test // group when needed. For most groups we use the shared singleton directly. @@ -19,6 +20,23 @@ import type { IngestResult, } from "./types.js"; +vi.mock("../agents/pi-embedded-runner/compact.runtime.js", () => ({ + compactEmbeddedPiSessionDirect: vi.fn(async () => ({ + ok: true, + compacted: false, + reason: "mock compaction", + result: { + summary: "", + firstKeptEntryId: "", + tokensBefore: 0, + tokensAfter: 0, + details: undefined, + }, + })), +})); + +const mockedCompactEmbeddedPiSessionDirect = vi.mocked(compactEmbeddedPiSessionDirect); + // --------------------------------------------------------------------------- // Helpers // --------------------------------------------------------------------------- @@ -91,6 +109,10 @@ class MockContextEngine implements ContextEngine { // ═══════════════════════════════════════════════════════════════════════════ describe("Engine contract tests", () => { + beforeEach(() => { + mockedCompactEmbeddedPiSessionDirect.mockClear(); + }); + it("a mock engine implementing ContextEngine can be registered and resolved", async () => { const factory = () => new MockContextEngine(); registerContextEngine("mock", factory); @@ -153,6 +175,25 @@ describe("Engine contract tests", () => { // Should complete without error await expect(engine.dispose()).resolves.toBeUndefined(); }); + + it("legacy compact preserves runtimeContext currentTokenCount when top-level value is absent", async () => { + const engine = new LegacyContextEngine(); + + await engine.compact({ + sessionId: "s1", + sessionFile: "/tmp/session.json", + runtimeContext: { + workspaceDir: "/tmp/workspace", + currentTokenCount: 277403, + }, + }); + + expect(mockedCompactEmbeddedPiSessionDirect).toHaveBeenCalledWith( + expect.objectContaining({ + currentTokenCount: 277403, + }), + ); + }); }); // ═══════════════════════════════════════════════════════════════════════════ diff --git a/src/context-engine/legacy.ts b/src/context-engine/legacy.ts index 011022ae26a..ffeb5cab9bd 100644 --- a/src/context-engine/legacy.ts +++ b/src/context-engine/legacy.ts @@ -78,6 +78,13 @@ export class LegacyContextEngine implements ContextEngine { // set by the caller in run.ts. We spread them and override the fields // that come from the ContextEngine compact() signature directly. const runtimeContext = params.runtimeContext ?? {}; + const currentTokenCount = + params.currentTokenCount ?? + (typeof runtimeContext.currentTokenCount === "number" && + Number.isFinite(runtimeContext.currentTokenCount) && + runtimeContext.currentTokenCount > 0 + ? Math.floor(runtimeContext.currentTokenCount) + : undefined); // eslint-disable-next-line @typescript-eslint/no-explicit-any -- bridge runtimeContext matches CompactEmbeddedPiSessionParams const result = await compactEmbeddedPiSessionDirect({ @@ -85,6 +92,7 @@ export class LegacyContextEngine implements ContextEngine { sessionId: params.sessionId, sessionFile: params.sessionFile, tokenBudget: params.tokenBudget, + ...(currentTokenCount !== undefined ? { currentTokenCount } : {}), force: params.force, customInstructions: params.customInstructions, workspaceDir: (runtimeContext.workspaceDir as string) ?? process.cwd(), diff --git a/test/setup.ts b/test/setup.ts index a6f902cb90f..659956cc2c8 100644 --- a/test/setup.ts +++ b/test/setup.ts @@ -10,12 +10,6 @@ vi.mock("@mariozechner/pi-ai", async (importOriginal) => { }; }); -vi.mock("@mariozechner/pi-ai/oauth", () => ({ - getOAuthApiKey: () => undefined, - getOAuthProviders: () => [], - loginOpenAICodex: vi.fn(), -})); - // Ensure Vitest environment is properly set process.env.VITEST = "true"; // Config validation walks plugin manifests; keep an aggressive cache in tests to avoid