import fs from "node:fs/promises"; import os from "node:os"; import type { AgentMessage, StreamFn } from "@mariozechner/pi-agent-core"; import { streamSimple } from "@mariozechner/pi-ai"; import { createAgentSession, DefaultResourceLoader, SessionManager, } from "@mariozechner/pi-coding-agent"; import { resolveSignalReactionLevel } from "../../../../extensions/signal/src/reaction-level.js"; import { resolveTelegramInlineButtonsScope } from "../../../../extensions/telegram/src/inline-buttons.js"; import { resolveTelegramReactionLevel } from "../../../../extensions/telegram/src/reaction-level.js"; import { resolveHeartbeatPrompt } from "../../../auto-reply/heartbeat.js"; import { resolveChannelCapabilities } from "../../../config/channel-capabilities.js"; import type { OpenClawConfig } from "../../../config/config.js"; import { getMachineDisplayName } from "../../../infra/machine-name.js"; import { ensureGlobalUndiciEnvProxyDispatcher, ensureGlobalUndiciStreamTimeouts, } from "../../../infra/net/undici-global-dispatcher.js"; import { MAX_IMAGE_BYTES } from "../../../media/constants.js"; import { getGlobalHookRunner } from "../../../plugins/hook-runner-global.js"; import type { PluginHookAgentContext, PluginHookBeforeAgentStartResult, PluginHookBeforePromptBuildResult, } from "../../../plugins/types.js"; import { isCronSessionKey, isSubagentSessionKey } from "../../../routing/session-key.js"; import { joinPresentTextSegments } from "../../../shared/text/join-segments.js"; import { buildTtsSystemPromptHint } from "../../../tts/tts.js"; import { resolveUserPath } from "../../../utils.js"; import { normalizeMessageChannel } from "../../../utils/message-channel.js"; import { isReasoningTagProvider } from "../../../utils/provider-utils.js"; import { resolveOpenClawAgentDir } from "../../agent-paths.js"; import { resolveSessionAgentIds } from "../../agent-scope.js"; import { createAnthropicPayloadLogger } from "../../anthropic-payload-log.js"; import { analyzeBootstrapBudget, buildBootstrapPromptWarning, buildBootstrapTruncationReportMeta, buildBootstrapInjectionStats, } from "../../bootstrap-budget.js"; import { makeBootstrapWarn, resolveBootstrapContextForRun } from "../../bootstrap-files.js"; import { createCacheTrace } from "../../cache-trace.js"; import { listChannelSupportedActions, resolveChannelMessageToolHints, } from "../../channel-tools.js"; import { ensureCustomApiRegistered } from "../../custom-api-registry.js"; import { DEFAULT_CONTEXT_TOKENS } from "../../defaults.js"; import { resolveOpenClawDocsPath } from "../../docs-path.js"; import { isTimeoutError } from "../../failover-error.js"; import { resolveImageSanitizationLimits } from "../../image-sanitization.js"; import { resolveModelAuthMode } from "../../model-auth.js"; import { normalizeProviderId, resolveDefaultModelForAgent } from "../../model-selection.js"; import { supportsModelTools } from "../../model-tool-support.js"; import { createConfiguredOllamaStreamFn } from "../../ollama-stream.js"; import { createOpenAIWebSocketStreamFn, releaseWsSession } from "../../openai-ws-stream.js"; import { resolveOwnerDisplaySetting } from "../../owner-display.js"; import { downgradeOpenAIFunctionCallReasoningPairs, isCloudCodeAssistFormatError, resolveBootstrapMaxChars, resolveBootstrapPromptTruncationWarningMode, resolveBootstrapTotalMaxChars, validateAnthropicTurns, validateGeminiTurns, } from "../../pi-embedded-helpers.js"; import { subscribeEmbeddedPiSession } from "../../pi-embedded-subscribe.js"; import { createPreparedEmbeddedPiSettingsManager } from "../../pi-project-settings.js"; import { applyPiAutoCompactionGuard } from "../../pi-settings.js"; import { toClientToolDefinitions } from "../../pi-tool-definition-adapter.js"; import { createOpenClawCodingTools, resolveToolLoopDetectionConfig } from "../../pi-tools.js"; import { resolveSandboxContext } from "../../sandbox.js"; import { resolveSandboxRuntimeStatus } from "../../sandbox/runtime-status.js"; import { isXaiProvider } from "../../schema/clean-for-xai.js"; import { repairSessionFileIfNeeded } from "../../session-file-repair.js"; import { guardSessionManager } from "../../session-tool-result-guard-wrapper.js"; import { sanitizeToolUseResultPairing } from "../../session-transcript-repair.js"; import { acquireSessionWriteLock, resolveSessionLockMaxHoldFromTimeout, } from "../../session-write-lock.js"; import { detectRuntimeShell } from "../../shell-utils.js"; import { applySkillEnvOverrides, applySkillEnvOverridesFromSnapshot, resolveSkillsPromptForRun, } from "../../skills.js"; import { buildSystemPromptParams } from "../../system-prompt-params.js"; import { buildSystemPromptReport } from "../../system-prompt-report.js"; import { sanitizeToolCallIdsForCloudCodeAssist } from "../../tool-call-id.js"; import { resolveEffectiveToolFsWorkspaceOnly } from "../../tool-fs-policy.js"; import { normalizeToolName } from "../../tool-policy.js"; import { resolveTranscriptPolicy } from "../../transcript-policy.js"; import { DEFAULT_BOOTSTRAP_FILENAME } from "../../workspace.js"; import { isRunnerAbortError } from "../abort.js"; import { appendCacheTtlTimestamp, isCacheTtlEligibleProvider } from "../cache-ttl.js"; import type { CompactEmbeddedPiSessionParams } from "../compact.js"; import { buildEmbeddedExtensionFactories } from "../extensions.js"; import { applyExtraParamsToAgent } from "../extra-params.js"; import { logToolSchemasForGoogle, sanitizeSessionHistory, sanitizeToolsForGoogle, } from "../google.js"; import { getDmHistoryLimitFromSessionKey, limitHistoryTurns } from "../history.js"; import { log } from "../logger.js"; import { buildModelAliasLines } from "../model.js"; import { clearActiveEmbeddedRun, type EmbeddedPiQueueHandle, setActiveEmbeddedRun, } from "../runs.js"; import { buildEmbeddedSandboxInfo } from "../sandbox-info.js"; import { prewarmSessionFile, trackSessionManagerAccess } from "../session-manager-cache.js"; import { prepareSessionManagerForRun } from "../session-manager-init.js"; import { resolveEmbeddedRunSkillEntries } from "../skills-runtime.js"; import { applySystemPromptOverrideToSession, buildEmbeddedSystemPrompt, createSystemPromptOverride, } from "../system-prompt.js"; import { dropThinkingBlocks } from "../thinking.js"; import { collectAllowedToolNames } from "../tool-name-allowlist.js"; import { installToolResultContextGuard } from "../tool-result-context-guard.js"; import { splitSdkTools } from "../tool-split.js"; import { describeUnknownError, mapThinkingLevel } from "../utils.js"; import { flushPendingToolResultsAfterIdle } from "../wait-for-idle-before-flush.js"; import { waitForCompactionRetryWithAggregateTimeout } from "./compaction-retry-aggregate-timeout.js"; import { selectCompactionTimeoutSnapshot, shouldFlagCompactionTimeout, } from "./compaction-timeout.js"; import { pruneProcessedHistoryImages } from "./history-image-prune.js"; import { detectAndLoadPromptImages } from "./images.js"; import type { EmbeddedRunAttemptParams, EmbeddedRunAttemptResult } from "./types.js"; type PromptBuildHookRunner = { hasHooks: (hookName: "before_prompt_build" | "before_agent_start") => boolean; runBeforePromptBuild: ( event: { prompt: string; messages: unknown[] }, ctx: PluginHookAgentContext, ) => Promise; runBeforeAgentStart: ( event: { prompt: string; messages: unknown[] }, ctx: PluginHookAgentContext, ) => Promise; }; const SESSIONS_YIELD_INTERRUPT_CUSTOM_TYPE = "openclaw.sessions_yield_interrupt"; const SESSIONS_YIELD_CONTEXT_CUSTOM_TYPE = "openclaw.sessions_yield"; // Persist a hidden context reminder so the next turn knows why the runner stopped. function buildSessionsYieldContextMessage(message: string): string { return `${message}\n\n[Context: The previous turn ended intentionally via sessions_yield while waiting for a follow-up event.]`; } // Return a synthetic aborted response so pi-agent-core unwinds without a real provider call. function createYieldAbortedResponse(model: { api?: string; provider?: string; id?: string }): { [Symbol.asyncIterator]: () => AsyncGenerator; result: () => Promise<{ role: "assistant"; content: Array<{ type: "text"; text: string }>; stopReason: "aborted"; api: string; provider: string; model: string; usage: { input: number; output: number; cacheRead: number; cacheWrite: number; totalTokens: number; cost: { input: number; output: number; cacheRead: number; cacheWrite: number; total: number; }; }; timestamp: number; }>; } { const message = { role: "assistant" as const, content: [{ type: "text" as const, text: "" }], stopReason: "aborted" as const, api: model.api ?? "", provider: model.provider ?? "", model: model.id ?? "", usage: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, totalTokens: 0, cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0, }, }, timestamp: Date.now(), }; return { async *[Symbol.asyncIterator]() {}, result: async () => message, }; } // Queue a hidden steering message so pi-agent-core skips any remaining tool calls. function queueSessionsYieldInterruptMessage(activeSession: { agent: { steer: (message: AgentMessage) => void }; }) { activeSession.agent.steer({ role: "custom", customType: SESSIONS_YIELD_INTERRUPT_CUSTOM_TYPE, content: "[sessions_yield interrupt]", display: false, details: { source: "sessions_yield" }, timestamp: Date.now(), }); } // Append the caller-provided yield payload as a hidden session message once the run is idle. async function persistSessionsYieldContextMessage( activeSession: { sendCustomMessage: ( message: { customType: string; content: string; display: boolean; details?: Record; }, options?: { triggerTurn?: boolean }, ) => Promise; }, message: string, ) { await activeSession.sendCustomMessage( { customType: SESSIONS_YIELD_CONTEXT_CUSTOM_TYPE, content: buildSessionsYieldContextMessage(message), display: false, details: { source: "sessions_yield", message }, }, { triggerTurn: false }, ); } // Remove the synthetic yield interrupt + aborted assistant entry from the live transcript. function stripSessionsYieldArtifacts(activeSession: { messages: AgentMessage[]; agent: { replaceMessages: (messages: AgentMessage[]) => void }; sessionManager?: unknown; }) { const strippedMessages = activeSession.messages.slice(); while (strippedMessages.length > 0) { const last = strippedMessages.at(-1) as | AgentMessage | { role?: string; customType?: string; stopReason?: string }; if (last?.role === "assistant" && "stopReason" in last && last.stopReason === "aborted") { strippedMessages.pop(); continue; } if ( last?.role === "custom" && "customType" in last && last.customType === SESSIONS_YIELD_INTERRUPT_CUSTOM_TYPE ) { strippedMessages.pop(); continue; } break; } if (strippedMessages.length !== activeSession.messages.length) { activeSession.agent.replaceMessages(strippedMessages); } const sessionManager = activeSession.sessionManager as | { fileEntries?: Array<{ type?: string; id?: string; parentId?: string | null; message?: { role?: string; stopReason?: string }; customType?: string; }>; byId?: Map; leafId?: string | null; _rewriteFile?: () => void; } | undefined; const fileEntries = sessionManager?.fileEntries; const byId = sessionManager?.byId; if (!fileEntries || !byId) { return; } let changed = false; while (fileEntries.length > 1) { const last = fileEntries.at(-1); if (!last || last.type === "session") { break; } const isYieldAbortAssistant = last.type === "message" && last.message?.role === "assistant" && last.message?.stopReason === "aborted"; const isYieldInterruptMessage = last.type === "custom_message" && last.customType === SESSIONS_YIELD_INTERRUPT_CUSTOM_TYPE; if (!isYieldAbortAssistant && !isYieldInterruptMessage) { break; } fileEntries.pop(); if (last.id) { byId.delete(last.id); } sessionManager.leafId = last.parentId ?? null; changed = true; } if (changed) { sessionManager._rewriteFile?.(); } } export function isOllamaCompatProvider(model: { provider?: string; baseUrl?: string; api?: string; }): boolean { const providerId = normalizeProviderId(model.provider ?? ""); if (providerId === "ollama") { return true; } if (!model.baseUrl) { return false; } try { const parsed = new URL(model.baseUrl); const hostname = parsed.hostname.toLowerCase(); const isLocalhost = hostname === "localhost" || hostname === "127.0.0.1" || hostname === "::1" || hostname === "[::1]"; if (isLocalhost && parsed.port === "11434") { return true; } // Allow remote/LAN Ollama OpenAI-compatible endpoints when the provider id // itself indicates Ollama usage (e.g. "my-ollama"). const providerHintsOllama = providerId.includes("ollama"); const isOllamaPort = parsed.port === "11434"; const isOllamaCompatPath = parsed.pathname === "/" || /^\/v1\/?$/i.test(parsed.pathname); return providerHintsOllama && isOllamaPort && isOllamaCompatPath; } catch { return false; } } export function resolveOllamaCompatNumCtxEnabled(params: { config?: OpenClawConfig; providerId?: string; }): boolean { const providerId = params.providerId?.trim(); if (!providerId) { return true; } const providers = params.config?.models?.providers; if (!providers) { return true; } const direct = providers[providerId]; if (direct) { return direct.injectNumCtxForOpenAICompat ?? true; } const normalized = normalizeProviderId(providerId); for (const [candidateId, candidate] of Object.entries(providers)) { if (normalizeProviderId(candidateId) === normalized) { return candidate.injectNumCtxForOpenAICompat ?? true; } } return true; } export function shouldInjectOllamaCompatNumCtx(params: { model: { api?: string; provider?: string; baseUrl?: string }; config?: OpenClawConfig; providerId?: string; }): boolean { // Restrict to the OpenAI-compatible adapter path only. if (params.model.api !== "openai-completions") { return false; } if (!isOllamaCompatProvider(params.model)) { return false; } return resolveOllamaCompatNumCtxEnabled({ config: params.config, providerId: params.providerId, }); } export function wrapOllamaCompatNumCtx(baseFn: StreamFn | undefined, numCtx: number): StreamFn { const streamFn = baseFn ?? streamSimple; return (model, context, options) => streamFn(model, context, { ...options, onPayload: (payload: unknown) => { if (!payload || typeof payload !== "object") { return options?.onPayload?.(payload, model); } const payloadRecord = payload as Record; if (!payloadRecord.options || typeof payloadRecord.options !== "object") { payloadRecord.options = {}; } (payloadRecord.options as Record).num_ctx = numCtx; return options?.onPayload?.(payload, model); }, }); } function resolveCaseInsensitiveAllowedToolName( rawName: string, allowedToolNames?: Set, ): string | null { if (!allowedToolNames || allowedToolNames.size === 0) { return null; } const folded = rawName.toLowerCase(); let caseInsensitiveMatch: string | null = null; for (const name of allowedToolNames) { if (name.toLowerCase() !== folded) { continue; } if (caseInsensitiveMatch && caseInsensitiveMatch !== name) { return null; } caseInsensitiveMatch = name; } return caseInsensitiveMatch; } function resolveExactAllowedToolName( rawName: string, allowedToolNames?: Set, ): string | null { if (!allowedToolNames || allowedToolNames.size === 0) { return null; } if (allowedToolNames.has(rawName)) { return rawName; } const normalized = normalizeToolName(rawName); if (allowedToolNames.has(normalized)) { return normalized; } return ( resolveCaseInsensitiveAllowedToolName(rawName, allowedToolNames) ?? resolveCaseInsensitiveAllowedToolName(normalized, allowedToolNames) ); } function buildStructuredToolNameCandidates(rawName: string): string[] { const trimmed = rawName.trim(); if (!trimmed) { return []; } const candidates: string[] = []; const seen = new Set(); const addCandidate = (value: string) => { const candidate = value.trim(); if (!candidate || seen.has(candidate)) { return; } seen.add(candidate); candidates.push(candidate); }; addCandidate(trimmed); addCandidate(normalizeToolName(trimmed)); const normalizedDelimiter = trimmed.replace(/\//g, "."); addCandidate(normalizedDelimiter); addCandidate(normalizeToolName(normalizedDelimiter)); const segments = normalizedDelimiter .split(".") .map((segment) => segment.trim()) .filter(Boolean); if (segments.length > 1) { for (let index = 1; index < segments.length; index += 1) { const suffix = segments.slice(index).join("."); addCandidate(suffix); addCandidate(normalizeToolName(suffix)); } } return candidates; } function resolveStructuredAllowedToolName( rawName: string, allowedToolNames?: Set, ): string | null { if (!allowedToolNames || allowedToolNames.size === 0) { return null; } const candidateNames = buildStructuredToolNameCandidates(rawName); for (const candidate of candidateNames) { if (allowedToolNames.has(candidate)) { return candidate; } } for (const candidate of candidateNames) { const caseInsensitiveMatch = resolveCaseInsensitiveAllowedToolName(candidate, allowedToolNames); if (caseInsensitiveMatch) { return caseInsensitiveMatch; } } return null; } function inferToolNameFromToolCallId( rawId: string | undefined, allowedToolNames?: Set, ): string | null { if (!rawId || !allowedToolNames || allowedToolNames.size === 0) { return null; } const id = rawId.trim(); if (!id) { return null; } const candidateTokens = new Set(); const addToken = (value: string) => { const trimmed = value.trim(); if (!trimmed) { return; } candidateTokens.add(trimmed); candidateTokens.add(trimmed.replace(/[:._/-]\d+$/, "")); candidateTokens.add(trimmed.replace(/\d+$/, "")); const normalizedDelimiter = trimmed.replace(/\//g, "."); candidateTokens.add(normalizedDelimiter); candidateTokens.add(normalizedDelimiter.replace(/[:._-]\d+$/, "")); candidateTokens.add(normalizedDelimiter.replace(/\d+$/, "")); for (const prefixPattern of [/^functions?[._-]?/i, /^tools?[._-]?/i]) { const stripped = normalizedDelimiter.replace(prefixPattern, ""); if (stripped !== normalizedDelimiter) { candidateTokens.add(stripped); candidateTokens.add(stripped.replace(/[:._-]\d+$/, "")); candidateTokens.add(stripped.replace(/\d+$/, "")); } } }; const preColon = id.split(":")[0] ?? id; for (const seed of [id, preColon]) { addToken(seed); } let singleMatch: string | null = null; for (const candidate of candidateTokens) { const matched = resolveStructuredAllowedToolName(candidate, allowedToolNames); if (!matched) { continue; } if (singleMatch && singleMatch !== matched) { return null; } singleMatch = matched; } return singleMatch; } function looksLikeMalformedToolNameCounter(rawName: string): boolean { const normalizedDelimiter = rawName.trim().replace(/\//g, "."); return ( /^(?:functions?|tools?)[._-]?/i.test(normalizedDelimiter) && /(?:[:._-]\d+|\d+)$/.test(normalizedDelimiter) ); } function normalizeToolCallNameForDispatch( rawName: string, allowedToolNames?: Set, rawToolCallId?: string, ): string { const trimmed = rawName.trim(); if (!trimmed) { // Keep whitespace-only placeholders unchanged unless we can safely infer // a canonical name from toolCallId and allowlist. return inferToolNameFromToolCallId(rawToolCallId, allowedToolNames) ?? rawName; } if (!allowedToolNames || allowedToolNames.size === 0) { return trimmed; } const exact = resolveExactAllowedToolName(trimmed, allowedToolNames); if (exact) { return exact; } // Some providers put malformed toolCallId-like strings into `name` // itself (for example `functionsread3`). Recover conservatively from the // name token before consulting the separate id so explicit names like // `someOtherTool` are preserved. const inferredFromName = inferToolNameFromToolCallId(trimmed, allowedToolNames); if (inferredFromName) { return inferredFromName; } // If the explicit name looks like a provider-mangled tool-call id with a // numeric suffix, fail closed when inference is ambiguous instead of routing // to whichever structured candidate happens to match. if (looksLikeMalformedToolNameCounter(trimmed)) { return trimmed; } return resolveStructuredAllowedToolName(trimmed, allowedToolNames) ?? trimmed; } function isToolCallBlockType(type: unknown): boolean { return type === "toolCall" || type === "toolUse" || type === "functionCall"; } function normalizeToolCallIdsInMessage(message: unknown): void { if (!message || typeof message !== "object") { return; } const content = (message as { content?: unknown }).content; if (!Array.isArray(content)) { return; } const usedIds = new Set(); for (const block of content) { if (!block || typeof block !== "object") { continue; } const typedBlock = block as { type?: unknown; id?: unknown }; if (!isToolCallBlockType(typedBlock.type) || typeof typedBlock.id !== "string") { continue; } const trimmedId = typedBlock.id.trim(); if (!trimmedId) { continue; } usedIds.add(trimmedId); } let fallbackIndex = 1; for (const block of content) { if (!block || typeof block !== "object") { continue; } const typedBlock = block as { type?: unknown; id?: unknown }; if (!isToolCallBlockType(typedBlock.type)) { continue; } if (typeof typedBlock.id === "string") { const trimmedId = typedBlock.id.trim(); if (trimmedId) { if (typedBlock.id !== trimmedId) { typedBlock.id = trimmedId; } usedIds.add(trimmedId); continue; } } let fallbackId = ""; while (!fallbackId || usedIds.has(fallbackId)) { fallbackId = `call_auto_${fallbackIndex++}`; } typedBlock.id = fallbackId; usedIds.add(fallbackId); } } function trimWhitespaceFromToolCallNamesInMessage( message: unknown, allowedToolNames?: Set, ): void { if (!message || typeof message !== "object") { return; } const content = (message as { content?: unknown }).content; if (!Array.isArray(content)) { return; } for (const block of content) { if (!block || typeof block !== "object") { continue; } const typedBlock = block as { type?: unknown; name?: unknown; id?: unknown }; if (!isToolCallBlockType(typedBlock.type)) { continue; } const rawId = typeof typedBlock.id === "string" ? typedBlock.id : undefined; if (typeof typedBlock.name === "string") { const normalized = normalizeToolCallNameForDispatch(typedBlock.name, allowedToolNames, rawId); if (normalized !== typedBlock.name) { typedBlock.name = normalized; } continue; } const inferred = inferToolNameFromToolCallId(rawId, allowedToolNames); if (inferred) { typedBlock.name = inferred; } } normalizeToolCallIdsInMessage(message); } function wrapStreamTrimToolCallNames( stream: ReturnType, allowedToolNames?: Set, ): ReturnType { const originalResult = stream.result.bind(stream); stream.result = async () => { const message = await originalResult(); trimWhitespaceFromToolCallNamesInMessage(message, allowedToolNames); return message; }; const originalAsyncIterator = stream[Symbol.asyncIterator].bind(stream); (stream as { [Symbol.asyncIterator]: typeof originalAsyncIterator })[Symbol.asyncIterator] = function () { const iterator = originalAsyncIterator(); return { async next() { const result = await iterator.next(); if (!result.done && result.value && typeof result.value === "object") { const event = result.value as { partial?: unknown; message?: unknown; }; trimWhitespaceFromToolCallNamesInMessage(event.partial, allowedToolNames); trimWhitespaceFromToolCallNamesInMessage(event.message, allowedToolNames); } return result; }, async return(value?: unknown) { return iterator.return?.(value) ?? { done: true as const, value: undefined }; }, async throw(error?: unknown) { return iterator.throw?.(error) ?? { done: true as const, value: undefined }; }, }; }; return stream; } export function wrapStreamFnTrimToolCallNames( baseFn: StreamFn, allowedToolNames?: Set, ): StreamFn { return (model, context, options) => { const maybeStream = baseFn(model, context, options); if (maybeStream && typeof maybeStream === "object" && "then" in maybeStream) { return Promise.resolve(maybeStream).then((stream) => wrapStreamTrimToolCallNames(stream, allowedToolNames), ); } return wrapStreamTrimToolCallNames(maybeStream, allowedToolNames); }; } function extractBalancedJsonPrefix(raw: string): string | null { let start = 0; while (start < raw.length && /\s/.test(raw[start] ?? "")) { start += 1; } const startChar = raw[start]; if (startChar !== "{" && startChar !== "[") { return null; } let depth = 0; let inString = false; let escaped = false; for (let i = start; i < raw.length; i += 1) { const char = raw[i]; if (char === undefined) { break; } if (inString) { if (escaped) { escaped = false; } else if (char === "\\") { escaped = true; } else if (char === '"') { inString = false; } continue; } if (char === '"') { inString = true; continue; } if (char === "{" || char === "[") { depth += 1; continue; } if (char === "}" || char === "]") { depth -= 1; if (depth === 0) { return raw.slice(start, i + 1); } } } return null; } const MAX_TOOLCALL_REPAIR_BUFFER_CHARS = 64_000; const MAX_TOOLCALL_REPAIR_TRAILING_CHARS = 3; const TOOLCALL_REPAIR_ALLOWED_TRAILING_RE = /^[^\s{}[\]":,\\]{1,3}$/; function shouldAttemptMalformedToolCallRepair(partialJson: string, delta: string): boolean { if (/[}\]]/.test(delta)) { return true; } const trimmedDelta = delta.trim(); return ( trimmedDelta.length > 0 && trimmedDelta.length <= MAX_TOOLCALL_REPAIR_TRAILING_CHARS && /[}\]]/.test(partialJson) ); } type ToolCallArgumentRepair = { args: Record; trailingSuffix: string; }; function tryParseMalformedToolCallArguments(raw: string): ToolCallArgumentRepair | undefined { if (!raw.trim()) { return undefined; } try { JSON.parse(raw); return undefined; } catch { const jsonPrefix = extractBalancedJsonPrefix(raw); if (!jsonPrefix) { return undefined; } const suffix = raw.slice(raw.indexOf(jsonPrefix) + jsonPrefix.length).trim(); if ( suffix.length === 0 || suffix.length > MAX_TOOLCALL_REPAIR_TRAILING_CHARS || !TOOLCALL_REPAIR_ALLOWED_TRAILING_RE.test(suffix) ) { return undefined; } try { const parsed = JSON.parse(jsonPrefix) as unknown; return parsed && typeof parsed === "object" && !Array.isArray(parsed) ? { args: parsed as Record, trailingSuffix: suffix } : undefined; } catch { return undefined; } } } function repairToolCallArgumentsInMessage( message: unknown, contentIndex: number, repairedArgs: Record, ): void { if (!message || typeof message !== "object") { return; } const content = (message as { content?: unknown }).content; if (!Array.isArray(content)) { return; } const block = content[contentIndex]; if (!block || typeof block !== "object") { return; } const typedBlock = block as { type?: unknown; arguments?: unknown }; if (!isToolCallBlockType(typedBlock.type)) { return; } typedBlock.arguments = repairedArgs; } function clearToolCallArgumentsInMessage(message: unknown, contentIndex: number): void { if (!message || typeof message !== "object") { return; } const content = (message as { content?: unknown }).content; if (!Array.isArray(content)) { return; } const block = content[contentIndex]; if (!block || typeof block !== "object") { return; } const typedBlock = block as { type?: unknown; arguments?: unknown }; if (!isToolCallBlockType(typedBlock.type)) { return; } typedBlock.arguments = {}; } function repairMalformedToolCallArgumentsInMessage( message: unknown, repairedArgsByIndex: Map>, ): void { if (!message || typeof message !== "object") { return; } const content = (message as { content?: unknown }).content; if (!Array.isArray(content)) { return; } for (const [index, repairedArgs] of repairedArgsByIndex.entries()) { repairToolCallArgumentsInMessage(message, index, repairedArgs); } } function wrapStreamRepairMalformedToolCallArguments( stream: ReturnType, ): ReturnType { const partialJsonByIndex = new Map(); const repairedArgsByIndex = new Map>(); const disabledIndices = new Set(); const loggedRepairIndices = new Set(); const originalResult = stream.result.bind(stream); stream.result = async () => { const message = await originalResult(); repairMalformedToolCallArgumentsInMessage(message, repairedArgsByIndex); partialJsonByIndex.clear(); repairedArgsByIndex.clear(); disabledIndices.clear(); loggedRepairIndices.clear(); return message; }; const originalAsyncIterator = stream[Symbol.asyncIterator].bind(stream); (stream as { [Symbol.asyncIterator]: typeof originalAsyncIterator })[Symbol.asyncIterator] = function () { const iterator = originalAsyncIterator(); return { async next() { const result = await iterator.next(); if (!result.done && result.value && typeof result.value === "object") { const event = result.value as { type?: unknown; contentIndex?: unknown; delta?: unknown; partial?: unknown; message?: unknown; toolCall?: unknown; }; if ( typeof event.contentIndex === "number" && Number.isInteger(event.contentIndex) && event.type === "toolcall_delta" && typeof event.delta === "string" ) { if (disabledIndices.has(event.contentIndex)) { return result; } const nextPartialJson = (partialJsonByIndex.get(event.contentIndex) ?? "") + event.delta; if (nextPartialJson.length > MAX_TOOLCALL_REPAIR_BUFFER_CHARS) { partialJsonByIndex.delete(event.contentIndex); repairedArgsByIndex.delete(event.contentIndex); disabledIndices.add(event.contentIndex); return result; } partialJsonByIndex.set(event.contentIndex, nextPartialJson); if (shouldAttemptMalformedToolCallRepair(nextPartialJson, event.delta)) { const repair = tryParseMalformedToolCallArguments(nextPartialJson); if (repair) { repairedArgsByIndex.set(event.contentIndex, repair.args); repairToolCallArgumentsInMessage(event.partial, event.contentIndex, repair.args); repairToolCallArgumentsInMessage(event.message, event.contentIndex, repair.args); if (!loggedRepairIndices.has(event.contentIndex)) { loggedRepairIndices.add(event.contentIndex); log.warn( `repairing kimi-coding tool call arguments after ${repair.trailingSuffix.length} trailing chars`, ); } } else { repairedArgsByIndex.delete(event.contentIndex); clearToolCallArgumentsInMessage(event.partial, event.contentIndex); clearToolCallArgumentsInMessage(event.message, event.contentIndex); } } } if ( typeof event.contentIndex === "number" && Number.isInteger(event.contentIndex) && event.type === "toolcall_end" ) { const repairedArgs = repairedArgsByIndex.get(event.contentIndex); if (repairedArgs) { if (event.toolCall && typeof event.toolCall === "object") { (event.toolCall as { arguments?: unknown }).arguments = repairedArgs; } repairToolCallArgumentsInMessage(event.partial, event.contentIndex, repairedArgs); repairToolCallArgumentsInMessage(event.message, event.contentIndex, repairedArgs); } partialJsonByIndex.delete(event.contentIndex); disabledIndices.delete(event.contentIndex); loggedRepairIndices.delete(event.contentIndex); } } return result; }, async return(value?: unknown) { return iterator.return?.(value) ?? { done: true as const, value: undefined }; }, async throw(error?: unknown) { return iterator.throw?.(error) ?? { done: true as const, value: undefined }; }, }; }; return stream; } export function wrapStreamFnRepairMalformedToolCallArguments(baseFn: StreamFn): StreamFn { return (model, context, options) => { const maybeStream = baseFn(model, context, options); if (maybeStream && typeof maybeStream === "object" && "then" in maybeStream) { return Promise.resolve(maybeStream).then((stream) => wrapStreamRepairMalformedToolCallArguments(stream), ); } return wrapStreamRepairMalformedToolCallArguments(maybeStream); }; } function shouldRepairMalformedAnthropicToolCallArguments(provider?: string): boolean { return normalizeProviderId(provider ?? "") === "kimi-coding"; } // --------------------------------------------------------------------------- // xAI / Grok: decode HTML entities in tool call arguments // --------------------------------------------------------------------------- const HTML_ENTITY_RE = /&(?:amp|lt|gt|quot|apos|#39|#x[0-9a-f]+|#\d+);/i; function decodeHtmlEntities(value: string): string { return value .replace(/&/gi, "&") .replace(/"/gi, '"') .replace(/'/gi, "'") .replace(/'/gi, "'") .replace(/</gi, "<") .replace(/>/gi, ">") .replace(/&#x([0-9a-f]+);/gi, (_, hex) => String.fromCodePoint(Number.parseInt(hex, 16))) .replace(/&#(\d+);/gi, (_, dec) => String.fromCodePoint(Number.parseInt(dec, 10))); } export function decodeHtmlEntitiesInObject(obj: unknown): unknown { if (typeof obj === "string") { return HTML_ENTITY_RE.test(obj) ? decodeHtmlEntities(obj) : obj; } if (Array.isArray(obj)) { return obj.map(decodeHtmlEntitiesInObject); } if (obj && typeof obj === "object") { const result: Record = {}; for (const [key, val] of Object.entries(obj as Record)) { result[key] = decodeHtmlEntitiesInObject(val); } return result; } return obj; } function decodeXaiToolCallArgumentsInMessage(message: unknown): void { if (!message || typeof message !== "object") { return; } const content = (message as { content?: unknown }).content; if (!Array.isArray(content)) { return; } for (const block of content) { if (!block || typeof block !== "object") { continue; } const typedBlock = block as { type?: unknown; arguments?: unknown }; if (typedBlock.type !== "toolCall" || !typedBlock.arguments) { continue; } if (typeof typedBlock.arguments === "object") { typedBlock.arguments = decodeHtmlEntitiesInObject(typedBlock.arguments); } } } function wrapStreamDecodeXaiToolCallArguments( stream: ReturnType, ): ReturnType { const originalResult = stream.result.bind(stream); stream.result = async () => { const message = await originalResult(); decodeXaiToolCallArgumentsInMessage(message); return message; }; const originalAsyncIterator = stream[Symbol.asyncIterator].bind(stream); (stream as { [Symbol.asyncIterator]: typeof originalAsyncIterator })[Symbol.asyncIterator] = function () { const iterator = originalAsyncIterator(); return { async next() { const result = await iterator.next(); if (!result.done && result.value && typeof result.value === "object") { const event = result.value as { partial?: unknown; message?: unknown }; decodeXaiToolCallArgumentsInMessage(event.partial); decodeXaiToolCallArgumentsInMessage(event.message); } return result; }, async return(value?: unknown) { return iterator.return?.(value) ?? { done: true as const, value: undefined }; }, async throw(error?: unknown) { return iterator.throw?.(error) ?? { done: true as const, value: undefined }; }, }; }; return stream; } function wrapStreamFnDecodeXaiToolCallArguments(baseFn: StreamFn): StreamFn { return (model, context, options) => { const maybeStream = baseFn(model, context, options); if (maybeStream && typeof maybeStream === "object" && "then" in maybeStream) { return Promise.resolve(maybeStream).then((stream) => wrapStreamDecodeXaiToolCallArguments(stream), ); } return wrapStreamDecodeXaiToolCallArguments(maybeStream); }; } export async function resolvePromptBuildHookResult(params: { prompt: string; messages: unknown[]; hookCtx: PluginHookAgentContext; hookRunner?: PromptBuildHookRunner | null; legacyBeforeAgentStartResult?: PluginHookBeforeAgentStartResult; }): Promise { const promptBuildResult = params.hookRunner?.hasHooks("before_prompt_build") ? await params.hookRunner .runBeforePromptBuild( { prompt: params.prompt, messages: params.messages, }, params.hookCtx, ) .catch((hookErr: unknown) => { log.warn(`before_prompt_build hook failed: ${String(hookErr)}`); return undefined; }) : undefined; const legacyResult = params.legacyBeforeAgentStartResult ?? (params.hookRunner?.hasHooks("before_agent_start") ? await params.hookRunner .runBeforeAgentStart( { prompt: params.prompt, messages: params.messages, }, params.hookCtx, ) .catch((hookErr: unknown) => { log.warn( `before_agent_start hook (legacy prompt build path) failed: ${String(hookErr)}`, ); return undefined; }) : undefined); return { systemPrompt: promptBuildResult?.systemPrompt ?? legacyResult?.systemPrompt, prependContext: joinPresentTextSegments([ promptBuildResult?.prependContext, legacyResult?.prependContext, ]), prependSystemContext: joinPresentTextSegments([ promptBuildResult?.prependSystemContext, legacyResult?.prependSystemContext, ]), appendSystemContext: joinPresentTextSegments([ promptBuildResult?.appendSystemContext, legacyResult?.appendSystemContext, ]), }; } export function composeSystemPromptWithHookContext(params: { baseSystemPrompt?: string; prependSystemContext?: string; appendSystemContext?: string; }): string | undefined { const prependSystem = params.prependSystemContext?.trim(); const appendSystem = params.appendSystemContext?.trim(); if (!prependSystem && !appendSystem) { return undefined; } return joinPresentTextSegments( [params.prependSystemContext, params.baseSystemPrompt, params.appendSystemContext], { trim: true }, ); } export function resolvePromptModeForSession(sessionKey?: string): "minimal" | "full" { if (!sessionKey) { return "full"; } return isSubagentSessionKey(sessionKey) || isCronSessionKey(sessionKey) ? "minimal" : "full"; } export function resolveAttemptFsWorkspaceOnly(params: { config?: OpenClawConfig; sessionAgentId: string; }): boolean { return resolveEffectiveToolFsWorkspaceOnly({ cfg: params.config, agentId: params.sessionAgentId, }); } export function prependSystemPromptAddition(params: { systemPrompt: string; systemPromptAddition?: string; }): string { if (!params.systemPromptAddition) { return params.systemPrompt; } return `${params.systemPromptAddition}\n\n${params.systemPrompt}`; } /** Build runtime context passed into context-engine afterTurn hooks. */ export function buildAfterTurnRuntimeContext(params: { attempt: Pick< EmbeddedRunAttemptParams, | "sessionKey" | "messageChannel" | "messageProvider" | "agentAccountId" | "config" | "skillsSnapshot" | "senderIsOwner" | "provider" | "modelId" | "thinkLevel" | "reasoningLevel" | "bashElevated" | "extraSystemPrompt" | "ownerNumbers" | "authProfileId" >; workspaceDir: string; agentDir: string; }): Partial { return { sessionKey: params.attempt.sessionKey, messageChannel: params.attempt.messageChannel, messageProvider: params.attempt.messageProvider, agentAccountId: params.attempt.agentAccountId, authProfileId: params.attempt.authProfileId, workspaceDir: params.workspaceDir, agentDir: params.agentDir, config: params.attempt.config, skillsSnapshot: params.attempt.skillsSnapshot, senderIsOwner: params.attempt.senderIsOwner, provider: params.attempt.provider, model: params.attempt.modelId, thinkLevel: params.attempt.thinkLevel, reasoningLevel: params.attempt.reasoningLevel, bashElevated: params.attempt.bashElevated, extraSystemPrompt: params.attempt.extraSystemPrompt, ownerNumbers: params.attempt.ownerNumbers, }; } function summarizeMessagePayload(msg: AgentMessage): { textChars: number; imageBlocks: number } { const content = (msg as { content?: unknown }).content; if (typeof content === "string") { return { textChars: content.length, imageBlocks: 0 }; } if (!Array.isArray(content)) { return { textChars: 0, imageBlocks: 0 }; } let textChars = 0; let imageBlocks = 0; for (const block of content) { if (!block || typeof block !== "object") { continue; } const typedBlock = block as { type?: unknown; text?: unknown }; if (typedBlock.type === "image") { imageBlocks++; continue; } if (typeof typedBlock.text === "string") { textChars += typedBlock.text.length; } } return { textChars, imageBlocks }; } function summarizeSessionContext(messages: AgentMessage[]): { roleCounts: string; totalTextChars: number; totalImageBlocks: number; maxMessageTextChars: number; } { const roleCounts = new Map(); let totalTextChars = 0; let totalImageBlocks = 0; let maxMessageTextChars = 0; for (const msg of messages) { const role = typeof msg.role === "string" ? msg.role : "unknown"; roleCounts.set(role, (roleCounts.get(role) ?? 0) + 1); const payload = summarizeMessagePayload(msg); totalTextChars += payload.textChars; totalImageBlocks += payload.imageBlocks; if (payload.textChars > maxMessageTextChars) { maxMessageTextChars = payload.textChars; } } return { roleCounts: [...roleCounts.entries()] .toSorted((a, b) => a[0].localeCompare(b[0])) .map(([role, count]) => `${role}:${count}`) .join(",") || "none", totalTextChars, totalImageBlocks, maxMessageTextChars, }; } export async function runEmbeddedAttempt( params: EmbeddedRunAttemptParams, ): Promise { const resolvedWorkspace = resolveUserPath(params.workspaceDir); const prevCwd = process.cwd(); const runAbortController = new AbortController(); // Proxy bootstrap must happen before timeout tuning so the timeouts wrap the // active EnvHttpProxyAgent instead of being replaced by a bare proxy dispatcher. ensureGlobalUndiciEnvProxyDispatcher(); ensureGlobalUndiciStreamTimeouts(); log.debug( `embedded run start: runId=${params.runId} sessionId=${params.sessionId} provider=${params.provider} model=${params.modelId} thinking=${params.thinkLevel} messageChannel=${params.messageChannel ?? params.messageProvider ?? "unknown"}`, ); await fs.mkdir(resolvedWorkspace, { recursive: true }); const sandboxSessionKey = params.sessionKey?.trim() || params.sessionId; const sandbox = await resolveSandboxContext({ config: params.config, sessionKey: sandboxSessionKey, workspaceDir: resolvedWorkspace, }); const effectiveWorkspace = sandbox?.enabled ? sandbox.workspaceAccess === "rw" ? resolvedWorkspace : sandbox.workspaceDir : resolvedWorkspace; await fs.mkdir(effectiveWorkspace, { recursive: true }); let restoreSkillEnv: (() => void) | undefined; process.chdir(effectiveWorkspace); try { const { shouldLoadSkillEntries, skillEntries } = resolveEmbeddedRunSkillEntries({ workspaceDir: effectiveWorkspace, config: params.config, skillsSnapshot: params.skillsSnapshot, }); restoreSkillEnv = params.skillsSnapshot ? applySkillEnvOverridesFromSnapshot({ snapshot: params.skillsSnapshot, config: params.config, }) : applySkillEnvOverrides({ skills: skillEntries ?? [], config: params.config, }); const skillsPrompt = resolveSkillsPromptForRun({ skillsSnapshot: params.skillsSnapshot, entries: shouldLoadSkillEntries ? skillEntries : undefined, config: params.config, workspaceDir: effectiveWorkspace, }); const sessionLabel = params.sessionKey ?? params.sessionId; const { bootstrapFiles: hookAdjustedBootstrapFiles, contextFiles } = await resolveBootstrapContextForRun({ workspaceDir: effectiveWorkspace, config: params.config, sessionKey: params.sessionKey, sessionId: params.sessionId, warn: makeBootstrapWarn({ sessionLabel, warn: (message) => log.warn(message) }), contextMode: params.bootstrapContextMode, runKind: params.bootstrapContextRunKind, }); const bootstrapMaxChars = resolveBootstrapMaxChars(params.config); const bootstrapTotalMaxChars = resolveBootstrapTotalMaxChars(params.config); const bootstrapAnalysis = analyzeBootstrapBudget({ files: buildBootstrapInjectionStats({ bootstrapFiles: hookAdjustedBootstrapFiles, injectedFiles: contextFiles, }), bootstrapMaxChars, bootstrapTotalMaxChars, }); const bootstrapPromptWarningMode = resolveBootstrapPromptTruncationWarningMode(params.config); const bootstrapPromptWarning = buildBootstrapPromptWarning({ analysis: bootstrapAnalysis, mode: bootstrapPromptWarningMode, seenSignatures: params.bootstrapPromptWarningSignaturesSeen, previousSignature: params.bootstrapPromptWarningSignature, }); const workspaceNotes = hookAdjustedBootstrapFiles.some( (file) => file.name === DEFAULT_BOOTSTRAP_FILENAME && !file.missing, ) ? ["Reminder: commit your changes in this workspace after edits."] : undefined; const agentDir = params.agentDir ?? resolveOpenClawAgentDir(); const { defaultAgentId, sessionAgentId } = resolveSessionAgentIds({ sessionKey: params.sessionKey, config: params.config, agentId: params.agentId, }); const effectiveFsWorkspaceOnly = resolveAttemptFsWorkspaceOnly({ config: params.config, sessionAgentId, }); // Track sessions_yield tool invocation (callback pattern, like clientToolCallDetected) let yieldDetected = false; let yieldMessage: string | null = null; // Late-binding reference so onYield can abort the session (declared after tool creation) let abortSessionForYield: (() => void) | null = null; let queueYieldInterruptForSession: (() => void) | null = null; let yieldAbortSettled: Promise | null = null; // Check if the model supports native image input const modelHasVision = params.model.input?.includes("image") ?? false; const toolsRaw = params.disableTools ? [] : createOpenClawCodingTools({ agentId: sessionAgentId, exec: { ...params.execOverrides, elevated: params.bashElevated, }, sandbox, messageProvider: params.messageChannel ?? params.messageProvider, agentAccountId: params.agentAccountId, messageTo: params.messageTo, messageThreadId: params.messageThreadId, groupId: params.groupId, groupChannel: params.groupChannel, groupSpace: params.groupSpace, spawnedBy: params.spawnedBy, senderId: params.senderId, senderName: params.senderName, senderUsername: params.senderUsername, senderE164: params.senderE164, senderIsOwner: params.senderIsOwner, sessionKey: sandboxSessionKey, sessionId: params.sessionId, runId: params.runId, agentDir, workspaceDir: effectiveWorkspace, // When sandboxing uses a copied workspace (`ro` or `none`), effectiveWorkspace points // at the sandbox copy. Spawned subagents should inherit the real workspace instead. spawnWorkspaceDir: sandbox?.enabled && sandbox.workspaceAccess !== "rw" ? resolvedWorkspace : undefined, config: params.config, abortSignal: runAbortController.signal, modelProvider: params.model.provider, modelId: params.modelId, modelContextWindowTokens: params.model.contextWindow, modelAuthMode: resolveModelAuthMode(params.model.provider, params.config), currentChannelId: params.currentChannelId, currentThreadTs: params.currentThreadTs, currentMessageId: params.currentMessageId, replyToMode: params.replyToMode, hasRepliedRef: params.hasRepliedRef, modelHasVision, requireExplicitMessageTarget: params.requireExplicitMessageTarget ?? isSubagentSessionKey(params.sessionKey), disableMessageTool: params.disableMessageTool, onYield: (message) => { yieldDetected = true; yieldMessage = message; queueYieldInterruptForSession?.(); runAbortController.abort("sessions_yield"); abortSessionForYield?.(); }, }); const toolsEnabled = supportsModelTools(params.model); const tools = sanitizeToolsForGoogle({ tools: toolsEnabled ? toolsRaw : [], provider: params.provider, }); const clientTools = toolsEnabled ? params.clientTools : undefined; const allowedToolNames = collectAllowedToolNames({ tools, clientTools, }); logToolSchemasForGoogle({ tools, provider: params.provider }); const machineName = await getMachineDisplayName(); const runtimeChannel = normalizeMessageChannel(params.messageChannel ?? params.messageProvider); let runtimeCapabilities = runtimeChannel ? (resolveChannelCapabilities({ cfg: params.config, channel: runtimeChannel, accountId: params.agentAccountId, }) ?? []) : undefined; if (runtimeChannel === "telegram" && params.config) { const inlineButtonsScope = resolveTelegramInlineButtonsScope({ cfg: params.config, accountId: params.agentAccountId ?? undefined, }); if (inlineButtonsScope !== "off") { if (!runtimeCapabilities) { runtimeCapabilities = []; } if ( !runtimeCapabilities.some((cap) => String(cap).trim().toLowerCase() === "inlinebuttons") ) { runtimeCapabilities.push("inlineButtons"); } } } const reactionGuidance = runtimeChannel && params.config ? (() => { if (runtimeChannel === "telegram") { const resolved = resolveTelegramReactionLevel({ cfg: params.config, accountId: params.agentAccountId ?? undefined, }); const level = resolved.agentReactionGuidance; return level ? { level, channel: "Telegram" } : undefined; } if (runtimeChannel === "signal") { const resolved = resolveSignalReactionLevel({ cfg: params.config, accountId: params.agentAccountId ?? undefined, }); const level = resolved.agentReactionGuidance; return level ? { level, channel: "Signal" } : undefined; } return undefined; })() : undefined; const sandboxInfo = buildEmbeddedSandboxInfo(sandbox, params.bashElevated); const reasoningTagHint = isReasoningTagProvider(params.provider); // Resolve channel-specific message actions for system prompt const channelActions = runtimeChannel ? listChannelSupportedActions({ cfg: params.config, channel: runtimeChannel, }) : undefined; const messageToolHints = runtimeChannel ? resolveChannelMessageToolHints({ cfg: params.config, channel: runtimeChannel, accountId: params.agentAccountId, }) : undefined; const defaultModelRef = resolveDefaultModelForAgent({ cfg: params.config ?? {}, agentId: sessionAgentId, }); const defaultModelLabel = `${defaultModelRef.provider}/${defaultModelRef.model}`; const { runtimeInfo, userTimezone, userTime, userTimeFormat } = buildSystemPromptParams({ config: params.config, agentId: sessionAgentId, workspaceDir: effectiveWorkspace, cwd: process.cwd(), runtime: { host: machineName, os: `${os.type()} ${os.release()}`, arch: os.arch(), node: process.version, model: `${params.provider}/${params.modelId}`, defaultModel: defaultModelLabel, shell: detectRuntimeShell(), channel: runtimeChannel, capabilities: runtimeCapabilities, channelActions, }, }); const isDefaultAgent = sessionAgentId === defaultAgentId; const promptMode = resolvePromptModeForSession(params.sessionKey); const docsPath = await resolveOpenClawDocsPath({ workspaceDir: effectiveWorkspace, argv1: process.argv[1], cwd: process.cwd(), moduleUrl: import.meta.url, }); const ttsHint = params.config ? buildTtsSystemPromptHint(params.config) : undefined; const ownerDisplay = resolveOwnerDisplaySetting(params.config); const appendPrompt = buildEmbeddedSystemPrompt({ workspaceDir: effectiveWorkspace, defaultThinkLevel: params.thinkLevel, reasoningLevel: params.reasoningLevel ?? "off", extraSystemPrompt: params.extraSystemPrompt, ownerNumbers: params.ownerNumbers, ownerDisplay: ownerDisplay.ownerDisplay, ownerDisplaySecret: ownerDisplay.ownerDisplaySecret, reasoningTagHint, heartbeatPrompt: isDefaultAgent ? resolveHeartbeatPrompt(params.config?.agents?.defaults?.heartbeat?.prompt) : undefined, skillsPrompt, docsPath: docsPath ?? undefined, ttsHint, workspaceNotes, reactionGuidance, promptMode, acpEnabled: params.config?.acp?.enabled !== false, runtimeInfo, messageToolHints, sandboxInfo, tools, modelAliasLines: buildModelAliasLines(params.config), userTimezone, userTime, userTimeFormat, contextFiles, bootstrapTruncationWarningLines: bootstrapPromptWarning.lines, memoryCitationsMode: params.config?.memory?.citations, }); const systemPromptReport = buildSystemPromptReport({ source: "run", generatedAt: Date.now(), sessionId: params.sessionId, sessionKey: params.sessionKey, provider: params.provider, model: params.modelId, workspaceDir: effectiveWorkspace, bootstrapMaxChars, bootstrapTotalMaxChars, bootstrapTruncation: buildBootstrapTruncationReportMeta({ analysis: bootstrapAnalysis, warningMode: bootstrapPromptWarningMode, warning: bootstrapPromptWarning, }), sandbox: (() => { const runtime = resolveSandboxRuntimeStatus({ cfg: params.config, sessionKey: sandboxSessionKey, }); return { mode: runtime.mode, sandboxed: runtime.sandboxed }; })(), systemPrompt: appendPrompt, bootstrapFiles: hookAdjustedBootstrapFiles, injectedFiles: contextFiles, skillsPrompt, tools, }); const systemPromptOverride = createSystemPromptOverride(appendPrompt); let systemPromptText = systemPromptOverride(); const sessionLock = await acquireSessionWriteLock({ sessionFile: params.sessionFile, maxHoldMs: resolveSessionLockMaxHoldFromTimeout({ timeoutMs: params.timeoutMs, }), }); let sessionManager: ReturnType | undefined; let session: Awaited>["session"] | undefined; let removeToolResultContextGuard: (() => void) | undefined; try { await repairSessionFileIfNeeded({ sessionFile: params.sessionFile, warn: (message) => log.warn(message), }); const hadSessionFile = await fs .stat(params.sessionFile) .then(() => true) .catch(() => false); const transcriptPolicy = resolveTranscriptPolicy({ modelApi: params.model?.api, provider: params.provider, modelId: params.modelId, }); await prewarmSessionFile(params.sessionFile); sessionManager = guardSessionManager(SessionManager.open(params.sessionFile), { agentId: sessionAgentId, sessionKey: params.sessionKey, inputProvenance: params.inputProvenance, allowSyntheticToolResults: transcriptPolicy.allowSyntheticToolResults, allowedToolNames, }); trackSessionManagerAccess(params.sessionFile); if (hadSessionFile && params.contextEngine?.bootstrap) { try { await params.contextEngine.bootstrap({ sessionId: params.sessionId, sessionKey: params.sessionKey, sessionFile: params.sessionFile, }); } catch (bootstrapErr) { log.warn(`context engine bootstrap failed: ${String(bootstrapErr)}`); } } await prepareSessionManagerForRun({ sessionManager, sessionFile: params.sessionFile, hadSessionFile, sessionId: params.sessionId, cwd: effectiveWorkspace, }); const settingsManager = createPreparedEmbeddedPiSettingsManager({ cwd: effectiveWorkspace, agentDir, cfg: params.config, }); applyPiAutoCompactionGuard({ settingsManager, contextEngineInfo: params.contextEngine?.info, }); // Sets compaction/pruning runtime state and returns extension factories // that must be passed to the resource loader for the safeguard to be active. const extensionFactories = buildEmbeddedExtensionFactories({ cfg: params.config, sessionManager, provider: params.provider, modelId: params.modelId, model: params.model, }); // Only create an explicit resource loader when there are extension factories // to register; otherwise let createAgentSession use its built-in default. let resourceLoader: DefaultResourceLoader | undefined; if (extensionFactories.length > 0) { resourceLoader = new DefaultResourceLoader({ cwd: resolvedWorkspace, agentDir, settingsManager, extensionFactories, }); await resourceLoader.reload(); } // Get hook runner early so it's available when creating tools const hookRunner = getGlobalHookRunner(); const { builtInTools, customTools } = splitSdkTools({ tools, sandboxEnabled: !!sandbox?.enabled, }); // Add client tools (OpenResponses hosted tools) to customTools let clientToolCallDetected: { name: string; params: Record } | null = null; const clientToolLoopDetection = resolveToolLoopDetectionConfig({ cfg: params.config, agentId: sessionAgentId, }); const clientToolDefs = clientTools ? toClientToolDefinitions( clientTools, (toolName, toolParams) => { clientToolCallDetected = { name: toolName, params: toolParams }; }, { agentId: sessionAgentId, sessionKey: sandboxSessionKey, sessionId: params.sessionId, runId: params.runId, loopDetection: clientToolLoopDetection, }, ) : []; const allCustomTools = [...customTools, ...clientToolDefs]; ({ session } = await createAgentSession({ cwd: resolvedWorkspace, agentDir, authStorage: params.authStorage, modelRegistry: params.modelRegistry, model: params.model, thinkingLevel: mapThinkingLevel(params.thinkLevel), tools: builtInTools, customTools: allCustomTools, sessionManager, settingsManager, resourceLoader, })); applySystemPromptOverrideToSession(session, systemPromptText); if (!session) { throw new Error("Embedded agent session missing"); } const activeSession = session; abortSessionForYield = () => { yieldAbortSettled = Promise.resolve(activeSession.abort()); }; queueYieldInterruptForSession = () => { queueSessionsYieldInterruptMessage(activeSession); }; removeToolResultContextGuard = installToolResultContextGuard({ agent: activeSession.agent, contextWindowTokens: Math.max( 1, Math.floor( params.model.contextWindow ?? params.model.maxTokens ?? DEFAULT_CONTEXT_TOKENS, ), ), }); const cacheTrace = createCacheTrace({ cfg: params.config, env: process.env, runId: params.runId, sessionId: activeSession.sessionId, sessionKey: params.sessionKey, provider: params.provider, modelId: params.modelId, modelApi: params.model.api, workspaceDir: params.workspaceDir, }); const anthropicPayloadLogger = createAnthropicPayloadLogger({ env: process.env, runId: params.runId, sessionId: activeSession.sessionId, sessionKey: params.sessionKey, provider: params.provider, modelId: params.modelId, modelApi: params.model.api, workspaceDir: params.workspaceDir, }); // Ollama native API: bypass SDK's streamSimple and use direct /api/chat calls // for reliable streaming + tool calling support (#11828). if (params.model.api === "ollama") { // Prioritize configured provider baseUrl so Docker/remote Ollama hosts work reliably. const providerConfig = params.config?.models?.providers?.[params.model.provider]; const providerBaseUrl = typeof providerConfig?.baseUrl === "string" ? providerConfig.baseUrl : undefined; const ollamaStreamFn = createConfiguredOllamaStreamFn({ model: params.model, providerBaseUrl, }); activeSession.agent.streamFn = ollamaStreamFn; ensureCustomApiRegistered(params.model.api, ollamaStreamFn); } else if (params.model.api === "openai-responses" && params.provider === "openai") { const wsApiKey = await params.authStorage.getApiKey(params.provider); if (wsApiKey) { activeSession.agent.streamFn = createOpenAIWebSocketStreamFn(wsApiKey, params.sessionId, { signal: runAbortController.signal, }); } else { log.warn(`[ws-stream] no API key for provider=${params.provider}; using HTTP transport`); activeSession.agent.streamFn = streamSimple; } } else { // Force a stable streamFn reference so vitest can reliably mock @mariozechner/pi-ai. activeSession.agent.streamFn = streamSimple; } // Ollama with OpenAI-compatible API needs num_ctx in payload.options. // Otherwise Ollama defaults to a 4096 context window. const providerIdForNumCtx = typeof params.model.provider === "string" && params.model.provider.trim().length > 0 ? params.model.provider : params.provider; const shouldInjectNumCtx = shouldInjectOllamaCompatNumCtx({ model: params.model, config: params.config, providerId: providerIdForNumCtx, }); if (shouldInjectNumCtx) { const numCtx = Math.max( 1, Math.floor( params.model.contextWindow ?? params.model.maxTokens ?? DEFAULT_CONTEXT_TOKENS, ), ); activeSession.agent.streamFn = wrapOllamaCompatNumCtx(activeSession.agent.streamFn, numCtx); } applyExtraParamsToAgent( activeSession.agent, params.config, params.provider, params.modelId, { ...params.streamParams, fastMode: params.fastMode, }, params.thinkLevel, sessionAgentId, ); if (cacheTrace) { cacheTrace.recordStage("session:loaded", { messages: activeSession.messages, system: systemPromptText, note: "after session create", }); activeSession.agent.streamFn = cacheTrace.wrapStreamFn(activeSession.agent.streamFn); } // Anthropic Claude endpoints can reject replayed `thinking` blocks // (e.g. thinkingSignature:"reasoning_text") on any follow-up provider // call, including tool continuations. Wrap the stream function so every // outbound request sees sanitized messages. if (transcriptPolicy.dropThinkingBlocks) { const inner = activeSession.agent.streamFn; activeSession.agent.streamFn = (model, context, options) => { const ctx = context as unknown as { messages?: unknown }; const messages = ctx?.messages; if (!Array.isArray(messages)) { return inner(model, context, options); } const sanitized = dropThinkingBlocks(messages as unknown as AgentMessage[]) as unknown; if (sanitized === messages) { return inner(model, context, options); } const nextContext = { ...(context as unknown as Record), messages: sanitized, } as unknown; return inner(model, nextContext as typeof context, options); }; } // Mistral (and other strict providers) reject tool call IDs that don't match their // format requirements (e.g. [a-zA-Z0-9]{9}). sanitizeSessionHistory only processes // historical messages at attempt start, but the agent loop's internal tool call → // tool result cycles bypass that path. Wrap streamFn so every outbound request // sees sanitized tool call IDs. if (transcriptPolicy.sanitizeToolCallIds && transcriptPolicy.toolCallIdMode) { const inner = activeSession.agent.streamFn; const mode = transcriptPolicy.toolCallIdMode; activeSession.agent.streamFn = (model, context, options) => { const ctx = context as unknown as { messages?: unknown }; const messages = ctx?.messages; if (!Array.isArray(messages)) { return inner(model, context, options); } const sanitized = sanitizeToolCallIdsForCloudCodeAssist(messages as AgentMessage[], mode); if (sanitized === messages) { return inner(model, context, options); } const nextContext = { ...(context as unknown as Record), messages: sanitized, } as unknown; return inner(model, nextContext as typeof context, options); }; } if ( params.model.api === "openai-responses" || params.model.api === "openai-codex-responses" ) { const inner = activeSession.agent.streamFn; activeSession.agent.streamFn = (model, context, options) => { const ctx = context as unknown as { messages?: unknown }; const messages = ctx?.messages; if (!Array.isArray(messages)) { return inner(model, context, options); } const sanitized = downgradeOpenAIFunctionCallReasoningPairs(messages as AgentMessage[]); if (sanitized === messages) { return inner(model, context, options); } const nextContext = { ...(context as unknown as Record), messages: sanitized, } as unknown; return inner(model, nextContext as typeof context, options); }; } const innerStreamFn = activeSession.agent.streamFn; activeSession.agent.streamFn = (model, context, options) => { const signal = runAbortController.signal as AbortSignal & { reason?: unknown }; if (yieldDetected && signal.aborted && signal.reason === "sessions_yield") { return createYieldAbortedResponse(model) as unknown as Awaited< ReturnType >; } return innerStreamFn(model, context, options); }; // Some models emit tool names with surrounding whitespace (e.g. " read "). // pi-agent-core dispatches tool calls with exact string matching, so normalize // names on the live response stream before tool execution. activeSession.agent.streamFn = wrapStreamFnTrimToolCallNames( activeSession.agent.streamFn, allowedToolNames, ); if ( params.model.api === "anthropic-messages" && shouldRepairMalformedAnthropicToolCallArguments(params.provider) ) { activeSession.agent.streamFn = wrapStreamFnRepairMalformedToolCallArguments( activeSession.agent.streamFn, ); } if (isXaiProvider(params.provider, params.modelId)) { activeSession.agent.streamFn = wrapStreamFnDecodeXaiToolCallArguments( activeSession.agent.streamFn, ); } if (anthropicPayloadLogger) { activeSession.agent.streamFn = anthropicPayloadLogger.wrapStreamFn( activeSession.agent.streamFn, ); } try { const prior = await sanitizeSessionHistory({ messages: activeSession.messages, modelApi: params.model.api, modelId: params.modelId, provider: params.provider, allowedToolNames, config: params.config, sessionManager, sessionId: params.sessionId, policy: transcriptPolicy, }); cacheTrace?.recordStage("session:sanitized", { messages: prior }); const validatedGemini = transcriptPolicy.validateGeminiTurns ? validateGeminiTurns(prior) : prior; const validated = transcriptPolicy.validateAnthropicTurns ? validateAnthropicTurns(validatedGemini) : validatedGemini; const truncated = limitHistoryTurns( validated, getDmHistoryLimitFromSessionKey(params.sessionKey, params.config), ); // Re-run tool_use/tool_result pairing repair after truncation, since // limitHistoryTurns can orphan tool_result blocks by removing the // assistant message that contained the matching tool_use. const limited = transcriptPolicy.repairToolUseResultPairing ? sanitizeToolUseResultPairing(truncated) : truncated; cacheTrace?.recordStage("session:limited", { messages: limited }); if (limited.length > 0) { activeSession.agent.replaceMessages(limited); } if (params.contextEngine) { try { const assembled = await params.contextEngine.assemble({ sessionId: params.sessionId, sessionKey: params.sessionKey, messages: activeSession.messages, tokenBudget: params.contextTokenBudget, }); if (assembled.messages !== activeSession.messages) { activeSession.agent.replaceMessages(assembled.messages); } if (assembled.systemPromptAddition) { systemPromptText = prependSystemPromptAddition({ systemPrompt: systemPromptText, systemPromptAddition: assembled.systemPromptAddition, }); applySystemPromptOverrideToSession(activeSession, systemPromptText); log.debug( `context engine: prepended system prompt addition (${assembled.systemPromptAddition.length} chars)`, ); } } catch (assembleErr) { log.warn( `context engine assemble failed, using pipeline messages: ${String(assembleErr)}`, ); } } } catch (err) { await flushPendingToolResultsAfterIdle({ agent: activeSession?.agent, sessionManager, clearPendingOnTimeout: true, }); activeSession.dispose(); throw err; } let aborted = Boolean(params.abortSignal?.aborted); let yieldAborted = false; let timedOut = false; let timedOutDuringCompaction = false; const getAbortReason = (signal: AbortSignal): unknown => "reason" in signal ? (signal as { reason?: unknown }).reason : undefined; const makeTimeoutAbortReason = (): Error => { const err = new Error("request timed out"); err.name = "TimeoutError"; return err; }; const makeAbortError = (signal: AbortSignal): Error => { const reason = getAbortReason(signal); const err = reason ? new Error("aborted", { cause: reason }) : new Error("aborted"); err.name = "AbortError"; return err; }; const abortRun = (isTimeout = false, reason?: unknown) => { aborted = true; if (isTimeout) { timedOut = true; } if (isTimeout) { runAbortController.abort(reason ?? makeTimeoutAbortReason()); } else { runAbortController.abort(reason); } void activeSession.abort(); }; const abortable = (promise: Promise): Promise => { const signal = runAbortController.signal; if (signal.aborted) { return Promise.reject(makeAbortError(signal)); } return new Promise((resolve, reject) => { const onAbort = () => { signal.removeEventListener("abort", onAbort); reject(makeAbortError(signal)); }; signal.addEventListener("abort", onAbort, { once: true }); promise.then( (value) => { signal.removeEventListener("abort", onAbort); resolve(value); }, (err) => { signal.removeEventListener("abort", onAbort); reject(err); }, ); }); }; const subscription = subscribeEmbeddedPiSession({ session: activeSession, runId: params.runId, hookRunner: getGlobalHookRunner() ?? undefined, verboseLevel: params.verboseLevel, reasoningMode: params.reasoningLevel ?? "off", toolResultFormat: params.toolResultFormat, shouldEmitToolResult: params.shouldEmitToolResult, shouldEmitToolOutput: params.shouldEmitToolOutput, onToolResult: params.onToolResult, onReasoningStream: params.onReasoningStream, onReasoningEnd: params.onReasoningEnd, onBlockReply: params.onBlockReply, onBlockReplyFlush: params.onBlockReplyFlush, blockReplyBreak: params.blockReplyBreak, blockReplyChunking: params.blockReplyChunking, onPartialReply: params.onPartialReply, onAssistantMessageStart: params.onAssistantMessageStart, onAgentEvent: params.onAgentEvent, enforceFinalTag: params.enforceFinalTag, config: params.config, sessionKey: sandboxSessionKey, sessionId: params.sessionId, agentId: sessionAgentId, }); const { assistantTexts, toolMetas, unsubscribe, waitForCompactionRetry, isCompactionInFlight, getMessagingToolSentTexts, getMessagingToolSentMediaUrls, getMessagingToolSentTargets, getSuccessfulCronAdds, didSendViaMessagingTool, getLastToolError, getUsageTotals, getCompactionCount, } = subscription; const queueHandle: EmbeddedPiQueueHandle = { queueMessage: async (text: string) => { await activeSession.steer(text); }, isStreaming: () => activeSession.isStreaming, isCompacting: () => subscription.isCompacting(), abort: abortRun, }; setActiveEmbeddedRun(params.sessionId, queueHandle, params.sessionKey); let abortWarnTimer: NodeJS.Timeout | undefined; const isProbeSession = params.sessionId?.startsWith("probe-") ?? false; const abortTimer = setTimeout( () => { if (!isProbeSession) { log.warn( `embedded run timeout: runId=${params.runId} sessionId=${params.sessionId} timeoutMs=${params.timeoutMs}`, ); } if ( shouldFlagCompactionTimeout({ isTimeout: true, isCompactionPendingOrRetrying: subscription.isCompacting(), isCompactionInFlight: activeSession.isCompacting, }) ) { timedOutDuringCompaction = true; } abortRun(true); if (!abortWarnTimer) { abortWarnTimer = setTimeout(() => { if (!activeSession.isStreaming) { return; } if (!isProbeSession) { log.warn( `embedded run abort still streaming: runId=${params.runId} sessionId=${params.sessionId}`, ); } }, 10_000); } }, Math.max(1, params.timeoutMs), ); let messagesSnapshot: AgentMessage[] = []; let sessionIdUsed = activeSession.sessionId; const onAbort = () => { const reason = params.abortSignal ? getAbortReason(params.abortSignal) : undefined; const timeout = reason ? isTimeoutError(reason) : false; if ( shouldFlagCompactionTimeout({ isTimeout: timeout, isCompactionPendingOrRetrying: subscription.isCompacting(), isCompactionInFlight: activeSession.isCompacting, }) ) { timedOutDuringCompaction = true; } abortRun(timeout, reason); }; if (params.abortSignal) { if (params.abortSignal.aborted) { onAbort(); } else { params.abortSignal.addEventListener("abort", onAbort, { once: true, }); } } // Hook runner was already obtained earlier before tool creation const hookAgentId = sessionAgentId; let promptError: unknown = null; let promptErrorSource: "prompt" | "compaction" | null = null; const prePromptMessageCount = activeSession.messages.length; try { const promptStartedAt = Date.now(); // Run before_prompt_build hooks to allow plugins to inject prompt context. // Legacy compatibility: before_agent_start is also checked for context fields. let effectivePrompt = params.prompt; const hookCtx = { agentId: hookAgentId, sessionKey: params.sessionKey, sessionId: params.sessionId, workspaceDir: params.workspaceDir, messageProvider: params.messageProvider ?? undefined, trigger: params.trigger, channelId: params.messageChannel ?? params.messageProvider ?? undefined, }; const hookResult = await resolvePromptBuildHookResult({ prompt: params.prompt, messages: activeSession.messages, hookCtx, hookRunner, legacyBeforeAgentStartResult: params.legacyBeforeAgentStartResult, }); { if (hookResult?.prependContext) { effectivePrompt = `${hookResult.prependContext}\n\n${params.prompt}`; log.debug( `hooks: prepended context to prompt (${hookResult.prependContext.length} chars)`, ); } const legacySystemPrompt = typeof hookResult?.systemPrompt === "string" ? hookResult.systemPrompt.trim() : ""; if (legacySystemPrompt) { applySystemPromptOverrideToSession(activeSession, legacySystemPrompt); systemPromptText = legacySystemPrompt; log.debug(`hooks: applied systemPrompt override (${legacySystemPrompt.length} chars)`); } const prependedOrAppendedSystemPrompt = composeSystemPromptWithHookContext({ baseSystemPrompt: systemPromptText, prependSystemContext: hookResult?.prependSystemContext, appendSystemContext: hookResult?.appendSystemContext, }); if (prependedOrAppendedSystemPrompt) { const prependSystemLen = hookResult?.prependSystemContext?.trim().length ?? 0; const appendSystemLen = hookResult?.appendSystemContext?.trim().length ?? 0; applySystemPromptOverrideToSession(activeSession, prependedOrAppendedSystemPrompt); systemPromptText = prependedOrAppendedSystemPrompt; log.debug( `hooks: applied prependSystemContext/appendSystemContext (${prependSystemLen}+${appendSystemLen} chars)`, ); } } log.debug(`embedded run prompt start: runId=${params.runId} sessionId=${params.sessionId}`); cacheTrace?.recordStage("prompt:before", { prompt: effectivePrompt, messages: activeSession.messages, }); // Repair orphaned trailing user messages so new prompts don't violate role ordering. const leafEntry = sessionManager.getLeafEntry(); if (leafEntry?.type === "message" && leafEntry.message.role === "user") { if (leafEntry.parentId) { sessionManager.branch(leafEntry.parentId); } else { sessionManager.resetLeaf(); } const sessionContext = sessionManager.buildSessionContext(); activeSession.agent.replaceMessages(sessionContext.messages); log.warn( `Removed orphaned user message to prevent consecutive user turns. ` + `runId=${params.runId} sessionId=${params.sessionId}`, ); } try { // Idempotent cleanup for legacy sessions with persisted image payloads. // Called each run; only mutates already-answered user turns that still carry image blocks. const didPruneImages = pruneProcessedHistoryImages(activeSession.messages); if (didPruneImages) { activeSession.agent.replaceMessages(activeSession.messages); } // Detect and load images referenced in the prompt for vision-capable models. // Images are prompt-local only (pi-like behavior). const imageResult = await detectAndLoadPromptImages({ prompt: effectivePrompt, workspaceDir: effectiveWorkspace, model: params.model, existingImages: params.images, maxBytes: MAX_IMAGE_BYTES, maxDimensionPx: resolveImageSanitizationLimits(params.config).maxDimensionPx, workspaceOnly: effectiveFsWorkspaceOnly, // Enforce sandbox path restrictions when sandbox is enabled sandbox: sandbox?.enabled && sandbox?.fsBridge ? { root: sandbox.workspaceDir, bridge: sandbox.fsBridge } : undefined, }); cacheTrace?.recordStage("prompt:images", { prompt: effectivePrompt, messages: activeSession.messages, note: `images: prompt=${imageResult.images.length}`, }); // Diagnostic: log context sizes before prompt to help debug early overflow errors. if (log.isEnabled("debug")) { const msgCount = activeSession.messages.length; const systemLen = systemPromptText?.length ?? 0; const promptLen = effectivePrompt.length; const sessionSummary = summarizeSessionContext(activeSession.messages); log.debug( `[context-diag] pre-prompt: sessionKey=${params.sessionKey ?? params.sessionId} ` + `messages=${msgCount} roleCounts=${sessionSummary.roleCounts} ` + `historyTextChars=${sessionSummary.totalTextChars} ` + `maxMessageTextChars=${sessionSummary.maxMessageTextChars} ` + `historyImageBlocks=${sessionSummary.totalImageBlocks} ` + `systemPromptChars=${systemLen} promptChars=${promptLen} ` + `promptImages=${imageResult.images.length} ` + `provider=${params.provider}/${params.modelId} sessionFile=${params.sessionFile}`, ); } if (hookRunner?.hasHooks("llm_input")) { hookRunner .runLlmInput( { runId: params.runId, sessionId: params.sessionId, provider: params.provider, model: params.modelId, systemPrompt: systemPromptText, prompt: effectivePrompt, historyMessages: activeSession.messages, imagesCount: imageResult.images.length, }, { agentId: hookAgentId, sessionKey: params.sessionKey, sessionId: params.sessionId, workspaceDir: params.workspaceDir, messageProvider: params.messageProvider ?? undefined, trigger: params.trigger, channelId: params.messageChannel ?? params.messageProvider ?? undefined, }, ) .catch((err) => { log.warn(`llm_input hook failed: ${String(err)}`); }); } // Only pass images option if there are actually images to pass // This avoids potential issues with models that don't expect the images parameter if (imageResult.images.length > 0) { await abortable(activeSession.prompt(effectivePrompt, { images: imageResult.images })); } else { await abortable(activeSession.prompt(effectivePrompt)); } } catch (err) { // Yield-triggered abort is intentional — treat as clean stop, not error. // Check the abort reason to distinguish from external aborts (timeout, user cancel) // that may race after yieldDetected is set. yieldAborted = yieldDetected && isRunnerAbortError(err) && err instanceof Error && err.cause === "sessions_yield"; if (yieldAborted) { aborted = false; // Ensure the session abort has fully settled before proceeding. if (yieldAbortSettled) { // eslint-disable-next-line @typescript-eslint/await-thenable -- abort() returns Promise per AgentSession.d.ts await yieldAbortSettled; } stripSessionsYieldArtifacts(activeSession); if (yieldMessage) { await persistSessionsYieldContextMessage(activeSession, yieldMessage); } } else { promptError = err; promptErrorSource = "prompt"; } } finally { log.debug( `embedded run prompt end: runId=${params.runId} sessionId=${params.sessionId} durationMs=${Date.now() - promptStartedAt}`, ); } // Capture snapshot before compaction wait so we have complete messages if timeout occurs // Check compaction state before and after to avoid race condition where compaction starts during capture // Use session state (not subscription) for snapshot decisions - need instantaneous compaction status const wasCompactingBefore = activeSession.isCompacting; const snapshot = activeSession.messages.slice(); const wasCompactingAfter = activeSession.isCompacting; // Only trust snapshot if compaction wasn't running before or after capture const preCompactionSnapshot = wasCompactingBefore || wasCompactingAfter ? null : snapshot; const preCompactionSessionId = activeSession.sessionId; const COMPACTION_RETRY_AGGREGATE_TIMEOUT_MS = 60_000; try { // Flush buffered block replies before waiting for compaction so the // user receives the assistant response immediately. Without this, // coalesced/buffered blocks stay in the pipeline until compaction // finishes — which can take minutes on large contexts (#35074). if (params.onBlockReplyFlush) { await params.onBlockReplyFlush(); } // Skip compaction wait when yield aborted the run — the signal is // already tripped and abortable() would immediately reject. const compactionRetryWait = yieldAborted ? { timedOut: false } : await waitForCompactionRetryWithAggregateTimeout({ waitForCompactionRetry, abortable, aggregateTimeoutMs: COMPACTION_RETRY_AGGREGATE_TIMEOUT_MS, isCompactionStillInFlight: isCompactionInFlight, }); if (compactionRetryWait.timedOut) { timedOutDuringCompaction = true; if (!isProbeSession) { log.warn( `compaction retry aggregate timeout (${COMPACTION_RETRY_AGGREGATE_TIMEOUT_MS}ms): ` + `proceeding with pre-compaction state runId=${params.runId} sessionId=${params.sessionId}`, ); } } } catch (err) { if (isRunnerAbortError(err)) { if (!promptError) { promptError = err; promptErrorSource = "compaction"; } if (!isProbeSession) { log.debug( `compaction wait aborted: runId=${params.runId} sessionId=${params.sessionId}`, ); } } else { throw err; } } // Check if ANY compaction occurred during the entire attempt (prompt + retry). // Using a cumulative count (> 0) instead of a delta check avoids missing // compactions that complete during activeSession.prompt() before the delta // baseline is sampled. const compactionOccurredThisAttempt = getCompactionCount() > 0; // Append cache-TTL timestamp AFTER prompt + compaction retry completes. // Previously this was before the prompt, which caused a custom entry to be // inserted between compaction and the next prompt — breaking the // prepareCompaction() guard that checks the last entry type, leading to // double-compaction. See: https://github.com/openclaw/openclaw/issues/9282 // Skip when timed out during compaction — session state may be inconsistent. // Also skip when compaction ran this attempt — appending a custom entry // after compaction would break the guard again. See: #28491 if (!timedOutDuringCompaction && !compactionOccurredThisAttempt) { const shouldTrackCacheTtl = params.config?.agents?.defaults?.contextPruning?.mode === "cache-ttl" && isCacheTtlEligibleProvider(params.provider, params.modelId); if (shouldTrackCacheTtl) { appendCacheTtlTimestamp(sessionManager, { timestamp: Date.now(), provider: params.provider, modelId: params.modelId, }); } } // If timeout occurred during compaction, use pre-compaction snapshot when available // (compaction restructures messages but does not add user/assistant turns). const snapshotSelection = selectCompactionTimeoutSnapshot({ timedOutDuringCompaction, preCompactionSnapshot, preCompactionSessionId, currentSnapshot: activeSession.messages.slice(), currentSessionId: activeSession.sessionId, }); if (timedOutDuringCompaction) { if (!isProbeSession) { log.warn( `using ${snapshotSelection.source} snapshot: timed out during compaction runId=${params.runId} sessionId=${params.sessionId}`, ); } } messagesSnapshot = snapshotSelection.messagesSnapshot; sessionIdUsed = snapshotSelection.sessionIdUsed; if (promptError && promptErrorSource === "prompt" && !compactionOccurredThisAttempt) { try { sessionManager.appendCustomEntry("openclaw:prompt-error", { timestamp: Date.now(), runId: params.runId, sessionId: params.sessionId, provider: params.provider, model: params.modelId, api: params.model.api, error: describeUnknownError(promptError), }); } catch (entryErr) { log.warn(`failed to persist prompt error entry: ${String(entryErr)}`); } } // Let the active context engine run its post-turn lifecycle. if (params.contextEngine) { const afterTurnRuntimeContext = buildAfterTurnRuntimeContext({ attempt: params, workspaceDir: effectiveWorkspace, agentDir, }); if (typeof params.contextEngine.afterTurn === "function") { try { await params.contextEngine.afterTurn({ sessionId: sessionIdUsed, sessionKey: params.sessionKey, sessionFile: params.sessionFile, messages: messagesSnapshot, prePromptMessageCount, tokenBudget: params.contextTokenBudget, runtimeContext: afterTurnRuntimeContext, }); } catch (afterTurnErr) { log.warn(`context engine afterTurn failed: ${String(afterTurnErr)}`); } } else { // Fallback: ingest new messages individually const newMessages = messagesSnapshot.slice(prePromptMessageCount); if (newMessages.length > 0) { if (typeof params.contextEngine.ingestBatch === "function") { try { await params.contextEngine.ingestBatch({ sessionId: sessionIdUsed, sessionKey: params.sessionKey, messages: newMessages, }); } catch (ingestErr) { log.warn(`context engine ingest failed: ${String(ingestErr)}`); } } else { for (const msg of newMessages) { try { await params.contextEngine.ingest({ sessionId: sessionIdUsed, sessionKey: params.sessionKey, message: msg, }); } catch (ingestErr) { log.warn(`context engine ingest failed: ${String(ingestErr)}`); } } } } } } cacheTrace?.recordStage("session:after", { messages: messagesSnapshot, note: timedOutDuringCompaction ? "compaction timeout" : promptError ? "prompt error" : undefined, }); anthropicPayloadLogger?.recordUsage(messagesSnapshot, promptError); // Run agent_end hooks to allow plugins to analyze the conversation // This is fire-and-forget, so we don't await // Run even on compaction timeout so plugins can log/cleanup if (hookRunner?.hasHooks("agent_end")) { hookRunner .runAgentEnd( { messages: messagesSnapshot, success: !aborted && !promptError, error: promptError ? describeUnknownError(promptError) : undefined, durationMs: Date.now() - promptStartedAt, }, { agentId: hookAgentId, sessionKey: params.sessionKey, sessionId: params.sessionId, workspaceDir: params.workspaceDir, messageProvider: params.messageProvider ?? undefined, trigger: params.trigger, channelId: params.messageChannel ?? params.messageProvider ?? undefined, }, ) .catch((err) => { log.warn(`agent_end hook failed: ${err}`); }); } } finally { clearTimeout(abortTimer); if (abortWarnTimer) { clearTimeout(abortWarnTimer); } if (!isProbeSession && (aborted || timedOut) && !timedOutDuringCompaction) { log.debug( `run cleanup: runId=${params.runId} sessionId=${params.sessionId} aborted=${aborted} timedOut=${timedOut}`, ); } try { unsubscribe(); } catch (err) { // unsubscribe() should never throw; if it does, it indicates a serious bug. // Log at error level to ensure visibility, but don't rethrow in finally block // as it would mask any exception from the try block above. log.error( `CRITICAL: unsubscribe failed, possible resource leak: runId=${params.runId} ${String(err)}`, ); } clearActiveEmbeddedRun(params.sessionId, queueHandle, params.sessionKey); params.abortSignal?.removeEventListener?.("abort", onAbort); } const lastAssistant = messagesSnapshot .slice() .toReversed() .find((m) => m.role === "assistant"); const toolMetasNormalized = toolMetas .filter( (entry): entry is { toolName: string; meta?: string } => typeof entry.toolName === "string" && entry.toolName.trim().length > 0, ) .map((entry) => ({ toolName: entry.toolName, meta: entry.meta })); if (hookRunner?.hasHooks("llm_output")) { hookRunner .runLlmOutput( { runId: params.runId, sessionId: params.sessionId, provider: params.provider, model: params.modelId, assistantTexts, lastAssistant, usage: getUsageTotals(), }, { agentId: hookAgentId, sessionKey: params.sessionKey, sessionId: params.sessionId, workspaceDir: params.workspaceDir, messageProvider: params.messageProvider ?? undefined, trigger: params.trigger, channelId: params.messageChannel ?? params.messageProvider ?? undefined, }, ) .catch((err) => { log.warn(`llm_output hook failed: ${String(err)}`); }); } return { aborted, timedOut, timedOutDuringCompaction, promptError, sessionIdUsed, bootstrapPromptWarningSignaturesSeen: bootstrapPromptWarning.warningSignaturesSeen, bootstrapPromptWarningSignature: bootstrapPromptWarning.signature, systemPromptReport, messagesSnapshot, assistantTexts, toolMetas: toolMetasNormalized, lastAssistant, lastToolError: getLastToolError?.(), didSendViaMessagingTool: didSendViaMessagingTool(), messagingToolSentTexts: getMessagingToolSentTexts(), messagingToolSentMediaUrls: getMessagingToolSentMediaUrls(), messagingToolSentTargets: getMessagingToolSentTargets(), successfulCronAdds: getSuccessfulCronAdds(), cloudCodeAssistFormatError: Boolean( lastAssistant?.errorMessage && isCloudCodeAssistFormatError(lastAssistant.errorMessage), ), attemptUsage: getUsageTotals(), compactionCount: getCompactionCount(), // Client tool call detected (OpenResponses hosted tools) clientToolCall: clientToolCallDetected ?? undefined, yieldDetected: yieldDetected || undefined, }; } finally { // Always tear down the session (and release the lock) before we leave this attempt. // // BUGFIX: Wait for the agent to be truly idle before flushing pending tool results. // pi-agent-core's auto-retry resolves waitForRetry() on assistant message receipt, // *before* tool execution completes in the retried agent loop. Without this wait, // flushPendingToolResults() fires while tools are still executing, inserting // synthetic "missing tool result" errors and causing silent agent failures. // See: https://github.com/openclaw/openclaw/issues/8643 removeToolResultContextGuard?.(); await flushPendingToolResultsAfterIdle({ agent: session?.agent, sessionManager, clearPendingOnTimeout: true, }); session?.dispose(); releaseWsSession(params.sessionId); await sessionLock.release(); } } finally { restoreSkillEnv?.(); process.chdir(prevCwd); } }