diff --git a/CHANGELOG.md b/CHANGELOG.md index 195e06ff224..3dd872231bb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -36,6 +36,7 @@ Docs: https://docs.openclaw.ai ### Fixes +- Agents/Anthropic: recover unhandled provider stop reasons (e.g. `sensitive`) as structured assistant errors instead of crashing the agent run. (#56639) - Google/models: resolve Gemini 3.1 pro, flash, and flash-lite for all Google provider aliases by passing the actual runtime provider ID and adding a template-provider fallback; fix flash-lite prefix ordering. (#56567) - OpenAI Codex/image tools: register Codex for media understanding and route image prompts through Codex instructions so image analysis no longer fails on missing provider registration or missing `instructions`. (#54829) Thanks @neeravmakwana. - Agents/image tool: restore the generic image-runtime fallback when no provider-specific media-understanding provider is registered, so image analysis works again for providers like `openrouter` and `minimax-portal`. (#54858) Thanks @MonkeyLeeT. diff --git a/src/agents/pi-embedded-runner/run/attempt.stop-reason-recovery.test.ts b/src/agents/pi-embedded-runner/run/attempt.stop-reason-recovery.test.ts new file mode 100644 index 00000000000..9d848bcd05b --- /dev/null +++ b/src/agents/pi-embedded-runner/run/attempt.stop-reason-recovery.test.ts @@ -0,0 +1,72 @@ +import type { StreamFn } from "@mariozechner/pi-agent-core"; +import { createAssistantMessageEventStream, type Context, type Model } from "@mariozechner/pi-ai"; +import { describe, expect, it } from "vitest"; +import { wrapStreamFnHandleSensitiveStopReason } from "./attempt.stop-reason-recovery.js"; + +const anthropicModel = { + api: "anthropic-messages", + provider: "anthropic", + id: "claude-sonnet-4-6", +} as Model<"anthropic-messages">; + +describe("wrapStreamFnHandleSensitiveStopReason", () => { + it("rewrites unhandled stop-reason errors into structured assistant errors", async () => { + const baseStreamFn: StreamFn = () => { + const stream = createAssistantMessageEventStream(); + queueMicrotask(() => { + stream.push({ + type: "error", + reason: "error", + error: { + role: "assistant", + content: [], + api: anthropicModel.api, + provider: anthropicModel.provider, + model: anthropicModel.id, + usage: { + input: 0, + output: 0, + cacheRead: 0, + cacheWrite: 0, + totalTokens: 0, + cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 }, + }, + stopReason: "error", + errorMessage: "Unhandled stop reason: sensitive", + timestamp: Date.now(), + }, + }); + stream.end(); + }); + return stream; + }; + + const wrapped = wrapStreamFnHandleSensitiveStopReason(baseStreamFn); + const stream = await Promise.resolve( + wrapped(anthropicModel, { messages: [] } as Context, undefined), + ); + const result = await stream.result(); + + expect(result.stopReason).toBe("error"); + expect(result.errorMessage).toBe( + "The model stopped because the provider returned an unhandled stop reason: sensitive. Please rephrase and try again.", + ); + }); + + it("includes the extracted stop reason when converting synchronous throws", async () => { + const baseStreamFn: StreamFn = () => { + throw new Error("Unhandled stop reason: refusal_policy"); + }; + + const wrapped = wrapStreamFnHandleSensitiveStopReason(baseStreamFn); + const stream = await Promise.resolve( + wrapped(anthropicModel, { messages: [] } as Context, undefined), + ); + const result = await stream.result(); + + expect(result.stopReason).toBe("error"); + expect(result.errorMessage).toBe( + "The model stopped because the provider returned an unhandled stop reason: refusal_policy. Please rephrase and try again.", + ); + }); +}); diff --git a/src/agents/pi-embedded-runner/run/attempt.stop-reason-recovery.ts b/src/agents/pi-embedded-runner/run/attempt.stop-reason-recovery.ts new file mode 100644 index 00000000000..da20cbe1516 --- /dev/null +++ b/src/agents/pi-embedded-runner/run/attempt.stop-reason-recovery.ts @@ -0,0 +1,176 @@ +import type { StreamFn } from "@mariozechner/pi-agent-core"; +import { createAssistantMessageEventStream, streamSimple } from "@mariozechner/pi-ai"; +import { buildStreamErrorAssistantMessage } from "../../stream-message-shared.js"; + +const UNHANDLED_STOP_REASON_RE = /^Unhandled stop reason:\s*(.+)$/i; + +function formatUnhandledStopReasonErrorMessage(stopReason: string): string { + return `The model stopped because the provider returned an unhandled stop reason: ${stopReason}. Please rephrase and try again.`; +} + +function normalizeUnhandledStopReasonMessage(message: unknown): string | undefined { + if (typeof message !== "string") { + return undefined; + } + const match = message.trim().match(UNHANDLED_STOP_REASON_RE); + const stopReason = match?.[1]?.trim(); + if (!stopReason) { + return undefined; + } + return formatUnhandledStopReasonErrorMessage(stopReason); +} + +function patchUnhandledStopReasonInAssistantMessage(message: unknown): void { + if (!message || typeof message !== "object") { + return; + } + + const assistant = message as { errorMessage?: unknown; stopReason?: unknown }; + const normalizedMessage = normalizeUnhandledStopReasonMessage(assistant.errorMessage); + if (!normalizedMessage) { + return; + } + + assistant.stopReason = "error"; + assistant.errorMessage = normalizedMessage; +} + +function buildUnhandledStopReasonErrorStream( + model: Parameters[0], + errorMessage: string, +): ReturnType { + const stream = createAssistantMessageEventStream(); + queueMicrotask(() => { + stream.push({ + type: "error", + reason: "error", + error: buildStreamErrorAssistantMessage({ + model: { + api: model.api, + provider: model.provider, + id: model.id, + }, + errorMessage, + }), + }); + stream.end(); + }); + return stream; +} + +function wrapStreamHandleUnhandledStopReason( + model: Parameters[0], + stream: ReturnType, +): ReturnType { + const originalResult = stream.result.bind(stream); + stream.result = async () => { + try { + const message = await originalResult(); + patchUnhandledStopReasonInAssistantMessage(message); + return message; + } catch (err) { + const normalizedMessage = normalizeUnhandledStopReasonMessage( + err instanceof Error ? err.message : String(err), + ); + if (!normalizedMessage) { + throw err; + } + return buildStreamErrorAssistantMessage({ + model: { + api: model.api, + provider: model.provider, + id: model.id, + }, + errorMessage: normalizedMessage, + }); + } + }; + + const originalAsyncIterator = stream[Symbol.asyncIterator].bind(stream); + (stream as { [Symbol.asyncIterator]: typeof originalAsyncIterator })[Symbol.asyncIterator] = + function () { + const iterator = originalAsyncIterator(); + let emittedSyntheticTerminal = false; + return { + async next() { + if (emittedSyntheticTerminal) { + return { done: true as const, value: undefined }; + } + + try { + const result = await iterator.next(); + if (!result.done && result.value && typeof result.value === "object") { + const event = result.value as { error?: unknown }; + patchUnhandledStopReasonInAssistantMessage(event.error); + } + return result; + } catch (err) { + const normalizedMessage = normalizeUnhandledStopReasonMessage( + err instanceof Error ? err.message : String(err), + ); + if (!normalizedMessage) { + throw err; + } + emittedSyntheticTerminal = true; + return { + done: false as const, + value: { + type: "error" as const, + reason: "error" as const, + error: buildStreamErrorAssistantMessage({ + model: { + api: model.api, + provider: model.provider, + id: model.id, + }, + errorMessage: normalizedMessage, + }), + }, + }; + } + }, + async return(value?: unknown) { + return iterator.return?.(value) ?? { done: true as const, value: undefined }; + }, + async throw(error?: unknown) { + return iterator.throw?.(error) ?? { done: true as const, value: undefined }; + }, + [Symbol.asyncIterator]() { + return this; + }, + }; + }; + + return stream; +} + +export function wrapStreamFnHandleSensitiveStopReason(baseFn: StreamFn): StreamFn { + return (model, context, options) => { + try { + const maybeStream = baseFn(model, context, options); + if (maybeStream && typeof maybeStream === "object" && "then" in maybeStream) { + return Promise.resolve(maybeStream).then( + (stream) => wrapStreamHandleUnhandledStopReason(model, stream), + (err) => { + const normalizedMessage = normalizeUnhandledStopReasonMessage( + err instanceof Error ? err.message : String(err), + ); + if (!normalizedMessage) { + throw err; + } + return buildUnhandledStopReasonErrorStream(model, normalizedMessage); + }, + ); + } + return wrapStreamHandleUnhandledStopReason(model, maybeStream); + } catch (err) { + const normalizedMessage = normalizeUnhandledStopReasonMessage( + err instanceof Error ? err.message : String(err), + ); + if (!normalizedMessage) { + throw err; + } + return buildUnhandledStopReasonErrorStream(model, normalizedMessage); + } + }; +} diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index c3859aaa1c8..3004875ea12 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -164,6 +164,7 @@ import { wrapStreamFnDecodeXaiToolCallArguments, wrapStreamFnRepairMalformedToolCallArguments, } from "./attempt.tool-call-argument-repair.js"; +import { wrapStreamFnHandleSensitiveStopReason } from "./attempt.stop-reason-recovery.js"; import { wrapStreamFnSanitizeMalformedToolCalls, wrapStreamFnTrimToolCallNames, @@ -1062,6 +1063,13 @@ export async function runEmbeddedAttempt( ); } + // Anthropic-compatible providers can add new stop reasons before pi-ai maps them. + // Recover the known "sensitive" stop reason here so a model refusal does not + // bubble out as an uncaught runner error and stall channel polling. + activeSession.agent.streamFn = wrapStreamFnHandleSensitiveStopReason( + activeSession.agent.streamFn, + ); + try { const prior = await sanitizeSessionHistory({ messages: activeSession.messages,