mirror of https://github.com/openclaw/openclaw.git
fix(agents): handle unhandled stop reasons gracefully instead of crashing (#56639)
Wrap the embedded agent stream to catch 'Unhandled stop reason: ...' errors from the provider adapter and convert them into structured assistant error messages instead of crashing the agent run. Covers all unknown stop reasons so future provider additions don't crash the runner. The wrapper becomes a harmless no-op once the upstream dependency handles them natively. Fixes #43607
This commit is contained in:
parent
664680318e
commit
468185d1b5
|
|
@ -36,6 +36,7 @@ Docs: https://docs.openclaw.ai
|
|||
|
||||
### Fixes
|
||||
|
||||
- Agents/Anthropic: recover unhandled provider stop reasons (e.g. `sensitive`) as structured assistant errors instead of crashing the agent run. (#56639)
|
||||
- Google/models: resolve Gemini 3.1 pro, flash, and flash-lite for all Google provider aliases by passing the actual runtime provider ID and adding a template-provider fallback; fix flash-lite prefix ordering. (#56567)
|
||||
- OpenAI Codex/image tools: register Codex for media understanding and route image prompts through Codex instructions so image analysis no longer fails on missing provider registration or missing `instructions`. (#54829) Thanks @neeravmakwana.
|
||||
- Agents/image tool: restore the generic image-runtime fallback when no provider-specific media-understanding provider is registered, so image analysis works again for providers like `openrouter` and `minimax-portal`. (#54858) Thanks @MonkeyLeeT.
|
||||
|
|
|
|||
|
|
@ -0,0 +1,72 @@
|
|||
import type { StreamFn } from "@mariozechner/pi-agent-core";
|
||||
import { createAssistantMessageEventStream, type Context, type Model } from "@mariozechner/pi-ai";
|
||||
import { describe, expect, it } from "vitest";
|
||||
import { wrapStreamFnHandleSensitiveStopReason } from "./attempt.stop-reason-recovery.js";
|
||||
|
||||
// Minimal model descriptor passed to the wrapped stream functions under test.
// The `as` assertion narrows the object literal to the branded Model type
// without constructing a complete model record.
const anthropicModel = {
  api: "anthropic-messages",
  provider: "anthropic",
  id: "claude-sonnet-4-6",
} as Model<"anthropic-messages">;

describe("wrapStreamFnHandleSensitiveStopReason", () => {
  it("rewrites unhandled stop-reason errors into structured assistant errors", async () => {
    // Base stream fn that emits a single structured error event whose
    // errorMessage carries the raw adapter text ("Unhandled stop reason: ...")
    // that the wrapper is expected to rewrite into a user-facing message.
    const baseStreamFn: StreamFn = () => {
      const stream = createAssistantMessageEventStream();
      // Push asynchronously so the wrapper receives a still-open stream,
      // as it would from a real provider adapter.
      queueMicrotask(() => {
        stream.push({
          type: "error",
          reason: "error",
          error: {
            role: "assistant",
            content: [],
            api: anthropicModel.api,
            provider: anthropicModel.provider,
            model: anthropicModel.id,
            // Zeroed usage/cost: this error is synthesized before any tokens flow.
            usage: {
              input: 0,
              output: 0,
              cacheRead: 0,
              cacheWrite: 0,
              totalTokens: 0,
              cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
            },
            stopReason: "error",
            errorMessage: "Unhandled stop reason: sensitive",
            timestamp: Date.now(),
          },
        });
        stream.end();
      });
      return stream;
    };

    const wrapped = wrapStreamFnHandleSensitiveStopReason(baseStreamFn);
    // A StreamFn may return a stream or a promise of one; Promise.resolve
    // normalizes both shapes.
    const stream = await Promise.resolve(
      wrapped(anthropicModel, { messages: [] } as Context, undefined),
    );
    const result = await stream.result();

    expect(result.stopReason).toBe("error");
    expect(result.errorMessage).toBe(
      "The model stopped because the provider returned an unhandled stop reason: sensitive. Please rephrase and try again.",
    );
  });

  it("includes the extracted stop reason when converting synchronous throws", async () => {
    // Simulates an adapter that throws before any stream exists at all.
    const baseStreamFn: StreamFn = () => {
      throw new Error("Unhandled stop reason: refusal_policy");
    };

    const wrapped = wrapStreamFnHandleSensitiveStopReason(baseStreamFn);
    const stream = await Promise.resolve(
      wrapped(anthropicModel, { messages: [] } as Context, undefined),
    );
    const result = await stream.result();

    // The synthetic error stream must carry the reason extracted from the
    // thrown message, not a generic placeholder.
    expect(result.stopReason).toBe("error");
    expect(result.errorMessage).toBe(
      "The model stopped because the provider returned an unhandled stop reason: refusal_policy. Please rephrase and try again.",
    );
  });
});
|
||||
|
|
@ -0,0 +1,176 @@
|
|||
import type { StreamFn } from "@mariozechner/pi-agent-core";
|
||||
import { createAssistantMessageEventStream, streamSimple } from "@mariozechner/pi-ai";
|
||||
import { buildStreamErrorAssistantMessage } from "../../stream-message-shared.js";
|
||||
|
||||
const UNHANDLED_STOP_REASON_RE = /^Unhandled stop reason:\s*(.+)$/i;
|
||||
|
||||
function formatUnhandledStopReasonErrorMessage(stopReason: string): string {
|
||||
return `The model stopped because the provider returned an unhandled stop reason: ${stopReason}. Please rephrase and try again.`;
|
||||
}
|
||||
|
||||
function normalizeUnhandledStopReasonMessage(message: unknown): string | undefined {
|
||||
if (typeof message !== "string") {
|
||||
return undefined;
|
||||
}
|
||||
const match = message.trim().match(UNHANDLED_STOP_REASON_RE);
|
||||
const stopReason = match?.[1]?.trim();
|
||||
if (!stopReason) {
|
||||
return undefined;
|
||||
}
|
||||
return formatUnhandledStopReasonErrorMessage(stopReason);
|
||||
}
|
||||
|
||||
function patchUnhandledStopReasonInAssistantMessage(message: unknown): void {
|
||||
if (!message || typeof message !== "object") {
|
||||
return;
|
||||
}
|
||||
|
||||
const assistant = message as { errorMessage?: unknown; stopReason?: unknown };
|
||||
const normalizedMessage = normalizeUnhandledStopReasonMessage(assistant.errorMessage);
|
||||
if (!normalizedMessage) {
|
||||
return;
|
||||
}
|
||||
|
||||
assistant.stopReason = "error";
|
||||
assistant.errorMessage = normalizedMessage;
|
||||
}
|
||||
|
||||
function buildUnhandledStopReasonErrorStream(
|
||||
model: Parameters<StreamFn>[0],
|
||||
errorMessage: string,
|
||||
): ReturnType<typeof streamSimple> {
|
||||
const stream = createAssistantMessageEventStream();
|
||||
queueMicrotask(() => {
|
||||
stream.push({
|
||||
type: "error",
|
||||
reason: "error",
|
||||
error: buildStreamErrorAssistantMessage({
|
||||
model: {
|
||||
api: model.api,
|
||||
provider: model.provider,
|
||||
id: model.id,
|
||||
},
|
||||
errorMessage,
|
||||
}),
|
||||
});
|
||||
stream.end();
|
||||
});
|
||||
return stream;
|
||||
}
|
||||
|
||||
/**
 * Patches an existing assistant event stream in place so that any
 * "Unhandled stop reason: ..." error — surfaced either as a rejected
 * result()/iteration or inside an error event's payload — is rewritten into
 * a structured assistant error message instead of escaping as a crash.
 * Returns the same (now-patched) stream instance.
 */
function wrapStreamHandleUnhandledStopReason(
  model: Parameters<StreamFn>[0],
  stream: ReturnType<typeof streamSimple>,
): ReturnType<typeof streamSimple> {
  // Wrap result(): patch a resolved message in place, or convert a matching
  // rejection into a synthetic assistant error message. Anything that does
  // not match the unhandled-stop-reason shape is rethrown untouched.
  const originalResult = stream.result.bind(stream);
  stream.result = async () => {
    try {
      const message = await originalResult();
      patchUnhandledStopReasonInAssistantMessage(message);
      return message;
    } catch (err) {
      const normalizedMessage = normalizeUnhandledStopReasonMessage(
        err instanceof Error ? err.message : String(err),
      );
      if (!normalizedMessage) {
        throw err;
      }
      return buildStreamErrorAssistantMessage({
        model: {
          api: model.api,
          provider: model.provider,
          id: model.id,
        },
        errorMessage: normalizedMessage,
      });
    }
  };

  // Wrap async iteration: patch error-event payloads as they pass through,
  // and turn a matching mid-stream throw into one synthetic error event
  // followed by a clean end of iteration.
  const originalAsyncIterator = stream[Symbol.asyncIterator].bind(stream);
  (stream as { [Symbol.asyncIterator]: typeof originalAsyncIterator })[Symbol.asyncIterator] =
    function () {
      const iterator = originalAsyncIterator();
      // Set once the synthetic error event has been emitted so the very next
      // next() call terminates iteration instead of pulling from the broken
      // underlying iterator again.
      let emittedSyntheticTerminal = false;
      return {
        async next() {
          if (emittedSyntheticTerminal) {
            return { done: true as const, value: undefined };
          }

          try {
            const result = await iterator.next();
            if (!result.done && result.value && typeof result.value === "object") {
              // Error events carry an assistant message in `error`; rewrite
              // it in place when it matches the unhandled-stop-reason shape.
              const event = result.value as { error?: unknown };
              patchUnhandledStopReasonInAssistantMessage(event.error);
            }
            return result;
          } catch (err) {
            const normalizedMessage = normalizeUnhandledStopReasonMessage(
              err instanceof Error ? err.message : String(err),
            );
            if (!normalizedMessage) {
              throw err;
            }
            emittedSyntheticTerminal = true;
            return {
              done: false as const,
              value: {
                type: "error" as const,
                reason: "error" as const,
                error: buildStreamErrorAssistantMessage({
                  model: {
                    api: model.api,
                    provider: model.provider,
                    id: model.id,
                  },
                  errorMessage: normalizedMessage,
                }),
              },
            };
          }
        },
        // Delegate return()/throw() to the underlying iterator when it
        // implements them; otherwise report a completed iteration.
        async return(value?: unknown) {
          return iterator.return?.(value) ?? { done: true as const, value: undefined };
        },
        async throw(error?: unknown) {
          // NOTE(review): when the underlying iterator has no throw(), the
          // incoming error is swallowed here rather than rethrown — confirm
          // that is intended.
          return iterator.throw?.(error) ?? { done: true as const, value: undefined };
        },
        [Symbol.asyncIterator]() {
          return this;
        },
      };
    };

  return stream;
}
|
||||
|
||||
export function wrapStreamFnHandleSensitiveStopReason(baseFn: StreamFn): StreamFn {
|
||||
return (model, context, options) => {
|
||||
try {
|
||||
const maybeStream = baseFn(model, context, options);
|
||||
if (maybeStream && typeof maybeStream === "object" && "then" in maybeStream) {
|
||||
return Promise.resolve(maybeStream).then(
|
||||
(stream) => wrapStreamHandleUnhandledStopReason(model, stream),
|
||||
(err) => {
|
||||
const normalizedMessage = normalizeUnhandledStopReasonMessage(
|
||||
err instanceof Error ? err.message : String(err),
|
||||
);
|
||||
if (!normalizedMessage) {
|
||||
throw err;
|
||||
}
|
||||
return buildUnhandledStopReasonErrorStream(model, normalizedMessage);
|
||||
},
|
||||
);
|
||||
}
|
||||
return wrapStreamHandleUnhandledStopReason(model, maybeStream);
|
||||
} catch (err) {
|
||||
const normalizedMessage = normalizeUnhandledStopReasonMessage(
|
||||
err instanceof Error ? err.message : String(err),
|
||||
);
|
||||
if (!normalizedMessage) {
|
||||
throw err;
|
||||
}
|
||||
return buildUnhandledStopReasonErrorStream(model, normalizedMessage);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
|
@ -164,6 +164,7 @@ import {
|
|||
wrapStreamFnDecodeXaiToolCallArguments,
|
||||
wrapStreamFnRepairMalformedToolCallArguments,
|
||||
} from "./attempt.tool-call-argument-repair.js";
|
||||
import { wrapStreamFnHandleSensitiveStopReason } from "./attempt.stop-reason-recovery.js";
|
||||
import {
|
||||
wrapStreamFnSanitizeMalformedToolCalls,
|
||||
wrapStreamFnTrimToolCallNames,
|
||||
|
|
@ -1062,6 +1063,13 @@ export async function runEmbeddedAttempt(
|
|||
);
|
||||
}
|
||||
|
||||
// Anthropic-compatible providers can add new stop reasons before pi-ai maps them.
|
||||
// Recover the known "sensitive" stop reason here so a model refusal does not
|
||||
// bubble out as an uncaught runner error and stall channel polling.
|
||||
activeSession.agent.streamFn = wrapStreamFnHandleSensitiveStopReason(
|
||||
activeSession.agent.streamFn,
|
||||
);
|
||||
|
||||
try {
|
||||
const prior = await sanitizeSessionHistory({
|
||||
messages: activeSession.messages,
|
||||
|
|
|
|||
Loading…
Reference in New Issue