mirror of https://github.com/openclaw/openclaw.git
fix(agents): handle overloaded failover separately (#38301)
* fix(agents): skip auth-profile failure on overload * fix(agents): note overload auth-profile fallback fix * fix(agents): classify overloaded failures separately * fix(agents): back off before overload failover * fix(agents): tighten overload probe and backoff state * fix(agents): persist overloaded cooldown across runs * fix(agents): tighten overloaded status handling * test(agents): add overload regression coverage * fix(agents): restore runner imports after rebase * test(agents): add overload fallback integration coverage * fix(agents): harden overloaded failover abort handling * test(agents): tighten overload classifier coverage * test(agents): cover all-overloaded fallback exhaustion * fix(cron): retry overloaded fallback summaries * fix(cron): treat HTTP 529 as overloaded retry
This commit is contained in:
parent
110ca23bab
commit
6e962d8b9e
|
|
@ -121,6 +121,7 @@ Docs: https://docs.openclaw.ai
|
|||
- Security/auth labels: remove token and API-key snippets from user-facing auth status labels so `/status` and `/models` do not expose credential fragments. (#33262) thanks @cu1ch3n.
|
||||
- Auth/credential semantics: align profile eligibility + probe diagnostics with SecretRef/expiry rules and harden browser download atomic writes. (#33733) thanks @joshavant.
|
||||
- Security/audit denyCommands guidance: suggest likely exact node command IDs for unknown `gateway.nodes.denyCommands` entries so ineffective denylist entries are easier to correct. (#29713) thanks @liquidhorizon88-bot.
|
||||
- Agents/overload failover handling: classify overloaded provider failures separately from rate limits/status timeouts, add short overload backoff before retry/failover, record overloaded prompt/assistant failures as transient auth-profile cooldowns (with probeable same-provider fallback) instead of treating them like persistent auth/billing failures, and keep one-shot cron retry classification aligned so overloaded fallback summaries still count as transient retries.
|
||||
- Docs/security hardening guidance: document Docker `DOCKER-USER` + UFW policy and add cross-linking from Docker install docs for VPS/public-host setups. (#27613) thanks @dorukardahan.
|
||||
- Docs/security threat-model links: replace relative `.md` links with Mintlify-compatible root-relative routes in security docs to prevent broken internal navigation. (#27698) thanks @clawdoo.
|
||||
- Plugins/Update integrity drift: avoid false integrity drift prompts when updating npm-installed plugins from unpinned specs, while keeping drift checks for exact pinned versions. (#37179) Thanks @vincentkoc.
|
||||
|
|
|
|||
|
|
@ -370,6 +370,7 @@ When a job fails, OpenClaw classifies errors as **transient** (retryable) or **p
|
|||
### Transient errors (retried)
|
||||
|
||||
- Rate limit (429, too many requests, resource exhausted)
|
||||
- Provider overload (for example Anthropic `529 overloaded_error`, overload fallback summaries)
|
||||
- Network errors (timeout, ECONNRESET, fetch failed, socket)
|
||||
- Server errors (5xx)
|
||||
- Cloudflare-related errors
|
||||
|
|
@ -407,7 +408,7 @@ Configure `cron.retry` to override these defaults (see [Configuration](/automati
|
|||
retry: {
|
||||
maxAttempts: 3,
|
||||
backoffMs: [60000, 120000, 300000],
|
||||
retryOn: ["rate_limit", "network", "server_error"],
|
||||
retryOn: ["rate_limit", "overloaded", "network", "server_error"],
|
||||
},
|
||||
webhook: "https://example.invalid/legacy", // deprecated fallback for stored notify:true jobs
|
||||
webhookToken: "replace-with-dedicated-webhook-token", // optional bearer token for webhook mode
|
||||
|
|
@ -665,7 +666,7 @@ openclaw system event --mode now --text "Next heartbeat: check battery."
|
|||
- OpenClaw applies exponential retry backoff for recurring jobs after consecutive errors:
|
||||
30s, 1m, 5m, 15m, then 60m between retries.
|
||||
- Backoff resets automatically after the next successful run.
|
||||
- One-shot (`at`) jobs retry transient errors (rate limit, network, server_error) up to 3 times with backoff; permanent errors disable immediately. See [Retry policy](/automation/cron-jobs#retry-policy).
|
||||
- One-shot (`at`) jobs retry transient errors (rate limit, overloaded, network, server_error) up to 3 times with backoff; permanent errors disable immediately. See [Retry policy](/automation/cron-jobs#retry-policy).
|
||||
|
||||
### Telegram delivers to the wrong place
|
||||
|
||||
|
|
|
|||
|
|
@ -114,6 +114,22 @@ describe("markAuthProfileFailure", () => {
|
|||
expect(reloaded.usageStats?.["anthropic:default"]?.cooldownUntil).toBe(firstCooldownUntil);
|
||||
});
|
||||
});
|
||||
it("records overloaded failures in the cooldown bucket", async () => {
|
||||
await withAuthProfileStore(async ({ agentDir, store }) => {
|
||||
await markAuthProfileFailure({
|
||||
store,
|
||||
profileId: "anthropic:default",
|
||||
reason: "overloaded",
|
||||
agentDir,
|
||||
});
|
||||
|
||||
const stats = store.usageStats?.["anthropic:default"];
|
||||
expect(typeof stats?.cooldownUntil).toBe("number");
|
||||
expect(stats?.disabledUntil).toBeUndefined();
|
||||
expect(stats?.disabledReason).toBeUndefined();
|
||||
expect(stats?.failureCounts?.overloaded).toBe(1);
|
||||
});
|
||||
});
|
||||
it("disables auth_permanent failures via disabledUntil (like billing)", async () => {
|
||||
await withAuthProfileStore(async ({ agentDir, store }) => {
|
||||
await markAuthProfileFailure({
|
||||
|
|
|
|||
|
|
@ -39,6 +39,7 @@ export type AuthProfileFailureReason =
|
|||
| "auth"
|
||||
| "auth_permanent"
|
||||
| "format"
|
||||
| "overloaded"
|
||||
| "rate_limit"
|
||||
| "billing"
|
||||
| "timeout"
|
||||
|
|
|
|||
|
|
@ -177,6 +177,24 @@ describe("resolveProfilesUnavailableReason", () => {
|
|||
).toBe("auth");
|
||||
});
|
||||
|
||||
it("returns overloaded for active overloaded cooldown windows", () => {
|
||||
const now = Date.now();
|
||||
const store = makeStore({
|
||||
"anthropic:default": {
|
||||
cooldownUntil: now + 60_000,
|
||||
failureCounts: { overloaded: 2, rate_limit: 1 },
|
||||
},
|
||||
});
|
||||
|
||||
expect(
|
||||
resolveProfilesUnavailableReason({
|
||||
store,
|
||||
profileIds: ["anthropic:default"],
|
||||
now,
|
||||
}),
|
||||
).toBe("overloaded");
|
||||
});
|
||||
|
||||
it("falls back to rate_limit when active cooldown has no reason history", () => {
|
||||
const now = Date.now();
|
||||
const store = makeStore({
|
||||
|
|
|
|||
|
|
@ -9,6 +9,7 @@ const FAILURE_REASON_PRIORITY: AuthProfileFailureReason[] = [
|
|||
"billing",
|
||||
"format",
|
||||
"model_not_found",
|
||||
"overloaded",
|
||||
"timeout",
|
||||
"rate_limit",
|
||||
"unknown",
|
||||
|
|
@ -35,7 +36,7 @@ export function resolveProfileUnusableUntil(
|
|||
}
|
||||
|
||||
/**
|
||||
* Check if a profile is currently in cooldown (due to rate limiting or errors).
|
||||
* Check if a profile is currently in cooldown (due to rate limits, overload, or other transient failures).
|
||||
*/
|
||||
export function isProfileInCooldown(
|
||||
store: AuthProfileStore,
|
||||
|
|
@ -508,7 +509,7 @@ export async function markAuthProfileFailure(params: {
|
|||
}
|
||||
|
||||
/**
|
||||
* Mark a profile as failed/rate-limited. Applies exponential backoff cooldown.
|
||||
* Mark a profile as transiently failed. Applies exponential backoff cooldown.
|
||||
* Cooldown times: 1min, 5min, 25min, max 1 hour.
|
||||
* Uses store lock to avoid overwriting concurrent usage updates.
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -75,7 +75,7 @@ describe("failover-error", () => {
|
|||
expect(resolveFailoverReasonFromError({ status: 522 })).toBeNull();
|
||||
expect(resolveFailoverReasonFromError({ status: 523 })).toBeNull();
|
||||
expect(resolveFailoverReasonFromError({ status: 524 })).toBeNull();
|
||||
expect(resolveFailoverReasonFromError({ status: 529 })).toBe("rate_limit");
|
||||
expect(resolveFailoverReasonFromError({ status: 529 })).toBe("overloaded");
|
||||
});
|
||||
|
||||
it("classifies documented provider error shapes at the error boundary", () => {
|
||||
|
|
@ -90,7 +90,7 @@ describe("failover-error", () => {
|
|||
status: 529,
|
||||
message: ANTHROPIC_OVERLOADED_PAYLOAD,
|
||||
}),
|
||||
).toBe("rate_limit");
|
||||
).toBe("overloaded");
|
||||
expect(
|
||||
resolveFailoverReasonFromError({
|
||||
status: 429,
|
||||
|
|
@ -126,7 +126,22 @@ describe("failover-error", () => {
|
|||
status: 503,
|
||||
message: GROQ_SERVICE_UNAVAILABLE_MESSAGE,
|
||||
}),
|
||||
).toBe("overloaded");
|
||||
});
|
||||
|
||||
it("keeps status-only 503s conservative unless the payload is clearly overloaded", () => {
|
||||
expect(
|
||||
resolveFailoverReasonFromError({
|
||||
status: 503,
|
||||
message: "Internal database error",
|
||||
}),
|
||||
).toBe("timeout");
|
||||
expect(
|
||||
resolveFailoverReasonFromError({
|
||||
status: 503,
|
||||
message: '{"error":{"message":"The model is overloaded. Please try later"}}',
|
||||
}),
|
||||
).toBe("overloaded");
|
||||
});
|
||||
|
||||
it("treats 400 insufficient_quota payloads as billing instead of format", () => {
|
||||
|
|
@ -151,6 +166,14 @@ describe("failover-error", () => {
|
|||
).toBe("rate_limit");
|
||||
});
|
||||
|
||||
it("treats overloaded provider payloads as overloaded", () => {
|
||||
expect(
|
||||
resolveFailoverReasonFromError({
|
||||
message: ANTHROPIC_OVERLOADED_PAYLOAD,
|
||||
}),
|
||||
).toBe("overloaded");
|
||||
});
|
||||
|
||||
it("keeps raw-text 402 weekly/monthly limit errors in billing", () => {
|
||||
expect(
|
||||
resolveFailoverReasonFromError({
|
||||
|
|
@ -221,6 +244,10 @@ describe("failover-error", () => {
|
|||
expect(err?.model).toBe("claude-opus-4-5");
|
||||
});
|
||||
|
||||
it("maps overloaded to a 503 fallback status", () => {
|
||||
expect(resolveFailoverStatus("overloaded")).toBe(503);
|
||||
});
|
||||
|
||||
it("coerces format errors with a 400 status", () => {
|
||||
const err = coerceToFailoverError("invalid request format", {
|
||||
provider: "google",
|
||||
|
|
|
|||
|
|
@ -49,6 +49,8 @@ export function resolveFailoverStatus(reason: FailoverReason): number | undefine
|
|||
return 402;
|
||||
case "rate_limit":
|
||||
return 429;
|
||||
case "overloaded":
|
||||
return 503;
|
||||
case "auth":
|
||||
return 401;
|
||||
case "auth_permanent":
|
||||
|
|
|
|||
|
|
@ -53,7 +53,7 @@ function expectPrimaryProbeSuccess(
|
|||
expect(result.result).toBe(expectedResult);
|
||||
expect(run).toHaveBeenCalledTimes(1);
|
||||
expect(run).toHaveBeenCalledWith("openai", "gpt-4.1-mini", {
|
||||
allowRateLimitCooldownProbe: true,
|
||||
allowTransientCooldownProbe: true,
|
||||
});
|
||||
}
|
||||
|
||||
|
|
@ -200,10 +200,48 @@ describe("runWithModelFallback – probe logic", () => {
|
|||
expect(result.result).toBe("fallback-ok");
|
||||
expect(run).toHaveBeenCalledTimes(2);
|
||||
expect(run).toHaveBeenNthCalledWith(1, "openai", "gpt-4.1-mini", {
|
||||
allowRateLimitCooldownProbe: true,
|
||||
allowTransientCooldownProbe: true,
|
||||
});
|
||||
expect(run).toHaveBeenNthCalledWith(2, "anthropic", "claude-haiku-3-5", {
|
||||
allowRateLimitCooldownProbe: true,
|
||||
allowTransientCooldownProbe: true,
|
||||
});
|
||||
});
|
||||
|
||||
it("attempts non-primary fallbacks during overloaded cooldown after primary probe failure", async () => {
|
||||
const cfg = makeCfg({
|
||||
agents: {
|
||||
defaults: {
|
||||
model: {
|
||||
primary: "openai/gpt-4.1-mini",
|
||||
fallbacks: ["anthropic/claude-haiku-3-5", "google/gemini-2-flash"],
|
||||
},
|
||||
},
|
||||
},
|
||||
} as Partial<OpenClawConfig>);
|
||||
|
||||
mockedIsProfileInCooldown.mockReturnValue(true);
|
||||
mockedGetSoonestCooldownExpiry.mockReturnValue(NOW + 30 * 1000);
|
||||
mockedResolveProfilesUnavailableReason.mockReturnValue("overloaded");
|
||||
|
||||
const run = vi
|
||||
.fn()
|
||||
.mockRejectedValueOnce(Object.assign(new Error("service overloaded"), { status: 503 }))
|
||||
.mockResolvedValue("fallback-ok");
|
||||
|
||||
const result = await runWithModelFallback({
|
||||
cfg,
|
||||
provider: "openai",
|
||||
model: "gpt-4.1-mini",
|
||||
run,
|
||||
});
|
||||
|
||||
expect(result.result).toBe("fallback-ok");
|
||||
expect(run).toHaveBeenCalledTimes(2);
|
||||
expect(run).toHaveBeenNthCalledWith(1, "openai", "gpt-4.1-mini", {
|
||||
allowTransientCooldownProbe: true,
|
||||
});
|
||||
expect(run).toHaveBeenNthCalledWith(2, "anthropic", "claude-haiku-3-5", {
|
||||
allowTransientCooldownProbe: true,
|
||||
});
|
||||
});
|
||||
|
||||
|
|
@ -326,10 +364,10 @@ describe("runWithModelFallback – probe logic", () => {
|
|||
});
|
||||
|
||||
expect(run).toHaveBeenNthCalledWith(1, "openai", "gpt-4.1-mini", {
|
||||
allowRateLimitCooldownProbe: true,
|
||||
allowTransientCooldownProbe: true,
|
||||
});
|
||||
expect(run).toHaveBeenNthCalledWith(2, "openai", "gpt-4.1-mini", {
|
||||
allowRateLimitCooldownProbe: true,
|
||||
allowTransientCooldownProbe: true,
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -0,0 +1,517 @@
|
|||
import fs from "node:fs/promises";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import type { AssistantMessage } from "@mariozechner/pi-ai";
|
||||
import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import type { OpenClawConfig } from "../config/config.js";
|
||||
import type { AuthProfileFailureReason } from "./auth-profiles.js";
|
||||
import { runWithModelFallback } from "./model-fallback.js";
|
||||
import type { EmbeddedRunAttemptResult } from "./pi-embedded-runner/run/types.js";
|
||||
|
||||
const runEmbeddedAttemptMock = vi.fn<(params: unknown) => Promise<EmbeddedRunAttemptResult>>();
|
||||
const { computeBackoffMock, sleepWithAbortMock } = vi.hoisted(() => ({
|
||||
computeBackoffMock: vi.fn(
|
||||
(
|
||||
_policy: { initialMs: number; maxMs: number; factor: number; jitter: number },
|
||||
_attempt: number,
|
||||
) => 321,
|
||||
),
|
||||
sleepWithAbortMock: vi.fn(async (_ms: number, _abortSignal?: AbortSignal) => undefined),
|
||||
}));
|
||||
|
||||
vi.mock("./pi-embedded-runner/run/attempt.js", () => ({
|
||||
runEmbeddedAttempt: (params: unknown) => runEmbeddedAttemptMock(params),
|
||||
}));
|
||||
|
||||
vi.mock("../infra/backoff.js", () => ({
|
||||
computeBackoff: (
|
||||
policy: { initialMs: number; maxMs: number; factor: number; jitter: number },
|
||||
attempt: number,
|
||||
) => computeBackoffMock(policy, attempt),
|
||||
sleepWithAbort: (ms: number, abortSignal?: AbortSignal) => sleepWithAbortMock(ms, abortSignal),
|
||||
}));
|
||||
|
||||
vi.mock("./models-config.js", async (importOriginal) => {
|
||||
const mod = await importOriginal<typeof import("./models-config.js")>();
|
||||
return {
|
||||
...mod,
|
||||
ensureOpenClawModelsJson: vi.fn(async () => ({ wrote: false })),
|
||||
};
|
||||
});
|
||||
|
||||
let runEmbeddedPiAgent: typeof import("./pi-embedded-runner/run.js").runEmbeddedPiAgent;
|
||||
|
||||
beforeAll(async () => {
|
||||
({ runEmbeddedPiAgent } = await import("./pi-embedded-runner/run.js"));
|
||||
});
|
||||
|
||||
beforeEach(() => {
|
||||
runEmbeddedAttemptMock.mockReset();
|
||||
computeBackoffMock.mockClear();
|
||||
sleepWithAbortMock.mockClear();
|
||||
});
|
||||
|
||||
const baseUsage = {
|
||||
input: 0,
|
||||
output: 0,
|
||||
cacheRead: 0,
|
||||
cacheWrite: 0,
|
||||
totalTokens: 0,
|
||||
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
|
||||
};
|
||||
|
||||
const OVERLOADED_ERROR_PAYLOAD =
|
||||
'{"type":"error","error":{"type":"overloaded_error","message":"Overloaded"}}';
|
||||
|
||||
const buildAssistant = (overrides: Partial<AssistantMessage>): AssistantMessage => ({
|
||||
role: "assistant",
|
||||
content: [],
|
||||
api: "openai-responses",
|
||||
provider: "openai",
|
||||
model: "mock-1",
|
||||
usage: baseUsage,
|
||||
stopReason: "stop",
|
||||
timestamp: Date.now(),
|
||||
...overrides,
|
||||
});
|
||||
|
||||
const makeAttempt = (overrides: Partial<EmbeddedRunAttemptResult>): EmbeddedRunAttemptResult => ({
|
||||
aborted: false,
|
||||
timedOut: false,
|
||||
timedOutDuringCompaction: false,
|
||||
promptError: null,
|
||||
sessionIdUsed: "session:test",
|
||||
systemPromptReport: undefined,
|
||||
messagesSnapshot: [],
|
||||
assistantTexts: [],
|
||||
toolMetas: [],
|
||||
lastAssistant: undefined,
|
||||
didSendViaMessagingTool: false,
|
||||
messagingToolSentTexts: [],
|
||||
messagingToolSentMediaUrls: [],
|
||||
messagingToolSentTargets: [],
|
||||
cloudCodeAssistFormatError: false,
|
||||
...overrides,
|
||||
});
|
||||
|
||||
function makeConfig(): OpenClawConfig {
|
||||
return {
|
||||
agents: {
|
||||
defaults: {
|
||||
model: {
|
||||
primary: "openai/mock-1",
|
||||
fallbacks: ["groq/mock-2"],
|
||||
},
|
||||
},
|
||||
},
|
||||
models: {
|
||||
providers: {
|
||||
openai: {
|
||||
api: "openai-responses",
|
||||
apiKey: "sk-openai",
|
||||
baseUrl: "https://example.com/openai",
|
||||
models: [
|
||||
{
|
||||
id: "mock-1",
|
||||
name: "Mock 1",
|
||||
reasoning: false,
|
||||
input: ["text"],
|
||||
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 },
|
||||
contextWindow: 16_000,
|
||||
maxTokens: 2048,
|
||||
},
|
||||
],
|
||||
},
|
||||
groq: {
|
||||
api: "openai-responses",
|
||||
apiKey: "sk-groq",
|
||||
baseUrl: "https://example.com/groq",
|
||||
models: [
|
||||
{
|
||||
id: "mock-2",
|
||||
name: "Mock 2",
|
||||
reasoning: false,
|
||||
input: ["text"],
|
||||
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 },
|
||||
contextWindow: 16_000,
|
||||
maxTokens: 2048,
|
||||
},
|
||||
],
|
||||
},
|
||||
},
|
||||
},
|
||||
} satisfies OpenClawConfig;
|
||||
}
|
||||
|
||||
async function withAgentWorkspace<T>(
|
||||
fn: (ctx: { agentDir: string; workspaceDir: string }) => Promise<T>,
|
||||
): Promise<T> {
|
||||
const root = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-model-fallback-"));
|
||||
const agentDir = path.join(root, "agent");
|
||||
const workspaceDir = path.join(root, "workspace");
|
||||
await fs.mkdir(agentDir, { recursive: true });
|
||||
await fs.mkdir(workspaceDir, { recursive: true });
|
||||
try {
|
||||
return await fn({ agentDir, workspaceDir });
|
||||
} finally {
|
||||
await fs.rm(root, { recursive: true, force: true });
|
||||
}
|
||||
}
|
||||
|
||||
async function writeAuthStore(
|
||||
agentDir: string,
|
||||
usageStats?: Record<
|
||||
string,
|
||||
{
|
||||
lastUsed?: number;
|
||||
cooldownUntil?: number;
|
||||
disabledUntil?: number;
|
||||
disabledReason?: AuthProfileFailureReason;
|
||||
failureCounts?: Partial<Record<AuthProfileFailureReason, number>>;
|
||||
}
|
||||
>,
|
||||
) {
|
||||
await fs.writeFile(
|
||||
path.join(agentDir, "auth-profiles.json"),
|
||||
JSON.stringify({
|
||||
version: 1,
|
||||
profiles: {
|
||||
"openai:p1": { type: "api_key", provider: "openai", key: "sk-openai" },
|
||||
"groq:p1": { type: "api_key", provider: "groq", key: "sk-groq" },
|
||||
},
|
||||
usageStats:
|
||||
usageStats ??
|
||||
({
|
||||
"openai:p1": { lastUsed: 1 },
|
||||
"groq:p1": { lastUsed: 2 },
|
||||
} as const),
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
||||
async function readUsageStats(agentDir: string) {
|
||||
const raw = await fs.readFile(path.join(agentDir, "auth-profiles.json"), "utf-8");
|
||||
return JSON.parse(raw).usageStats as Record<string, Record<string, unknown> | undefined>;
|
||||
}
|
||||
|
||||
async function runEmbeddedFallback(params: {
|
||||
agentDir: string;
|
||||
workspaceDir: string;
|
||||
sessionKey: string;
|
||||
runId: string;
|
||||
abortSignal?: AbortSignal;
|
||||
}) {
|
||||
const cfg = makeConfig();
|
||||
return await runWithModelFallback({
|
||||
cfg,
|
||||
provider: "openai",
|
||||
model: "mock-1",
|
||||
agentDir: params.agentDir,
|
||||
run: (provider, model, options) =>
|
||||
runEmbeddedPiAgent({
|
||||
sessionId: `session:${params.runId}`,
|
||||
sessionKey: params.sessionKey,
|
||||
sessionFile: path.join(params.workspaceDir, `${params.runId}.jsonl`),
|
||||
workspaceDir: params.workspaceDir,
|
||||
agentDir: params.agentDir,
|
||||
config: cfg,
|
||||
prompt: "hello",
|
||||
provider,
|
||||
model,
|
||||
authProfileIdSource: "auto",
|
||||
allowTransientCooldownProbe: options?.allowTransientCooldownProbe,
|
||||
timeoutMs: 5_000,
|
||||
runId: params.runId,
|
||||
abortSignal: params.abortSignal,
|
||||
}),
|
||||
});
|
||||
}
|
||||
|
||||
function mockPrimaryOverloadedThenFallbackSuccess() {
|
||||
runEmbeddedAttemptMock.mockImplementation(async (params: unknown) => {
|
||||
const attemptParams = params as { provider: string; modelId: string; authProfileId?: string };
|
||||
if (attemptParams.provider === "openai") {
|
||||
return makeAttempt({
|
||||
assistantTexts: [],
|
||||
lastAssistant: buildAssistant({
|
||||
provider: "openai",
|
||||
model: "mock-1",
|
||||
stopReason: "error",
|
||||
errorMessage: OVERLOADED_ERROR_PAYLOAD,
|
||||
}),
|
||||
});
|
||||
}
|
||||
if (attemptParams.provider === "groq") {
|
||||
return makeAttempt({
|
||||
assistantTexts: ["fallback ok"],
|
||||
lastAssistant: buildAssistant({
|
||||
provider: "groq",
|
||||
model: "mock-2",
|
||||
stopReason: "stop",
|
||||
content: [{ type: "text", text: "fallback ok" }],
|
||||
}),
|
||||
});
|
||||
}
|
||||
throw new Error(`Unexpected provider ${attemptParams.provider}`);
|
||||
});
|
||||
}
|
||||
|
||||
function mockAllProvidersOverloaded() {
|
||||
runEmbeddedAttemptMock.mockImplementation(async (params: unknown) => {
|
||||
const attemptParams = params as { provider: string; modelId: string; authProfileId?: string };
|
||||
if (attemptParams.provider === "openai" || attemptParams.provider === "groq") {
|
||||
return makeAttempt({
|
||||
assistantTexts: [],
|
||||
lastAssistant: buildAssistant({
|
||||
provider: attemptParams.provider,
|
||||
model: attemptParams.provider === "openai" ? "mock-1" : "mock-2",
|
||||
stopReason: "error",
|
||||
errorMessage: OVERLOADED_ERROR_PAYLOAD,
|
||||
}),
|
||||
});
|
||||
}
|
||||
throw new Error(`Unexpected provider ${attemptParams.provider}`);
|
||||
});
|
||||
}
|
||||
|
||||
describe("runWithModelFallback + runEmbeddedPiAgent overload policy", () => {
|
||||
it("falls back across providers after overloaded primary failure and persists transient cooldown", async () => {
|
||||
await withAgentWorkspace(async ({ agentDir, workspaceDir }) => {
|
||||
await writeAuthStore(agentDir);
|
||||
mockPrimaryOverloadedThenFallbackSuccess();
|
||||
|
||||
const result = await runEmbeddedFallback({
|
||||
agentDir,
|
||||
workspaceDir,
|
||||
sessionKey: "agent:test:overloaded-cross-provider",
|
||||
runId: "run:overloaded-cross-provider",
|
||||
});
|
||||
|
||||
expect(result.provider).toBe("groq");
|
||||
expect(result.model).toBe("mock-2");
|
||||
expect(result.attempts[0]?.reason).toBe("overloaded");
|
||||
expect(result.result.payloads?.[0]?.text ?? "").toContain("fallback ok");
|
||||
|
||||
const usageStats = await readUsageStats(agentDir);
|
||||
expect(typeof usageStats["openai:p1"]?.cooldownUntil).toBe("number");
|
||||
expect(usageStats["openai:p1"]?.failureCounts).toMatchObject({ overloaded: 1 });
|
||||
expect(typeof usageStats["groq:p1"]?.lastUsed).toBe("number");
|
||||
|
||||
expect(runEmbeddedAttemptMock).toHaveBeenCalledTimes(2);
|
||||
const firstCall = runEmbeddedAttemptMock.mock.calls[0]?.[0] as
|
||||
| { provider?: string }
|
||||
| undefined;
|
||||
const secondCall = runEmbeddedAttemptMock.mock.calls[1]?.[0] as
|
||||
| { provider?: string }
|
||||
| undefined;
|
||||
expect(firstCall).toBeDefined();
|
||||
expect(secondCall).toBeDefined();
|
||||
expect(firstCall?.provider).toBe("openai");
|
||||
expect(secondCall?.provider).toBe("groq");
|
||||
expect(computeBackoffMock).toHaveBeenCalledTimes(1);
|
||||
expect(sleepWithAbortMock).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
});
|
||||
|
||||
it("surfaces a bounded overloaded summary when every fallback candidate is overloaded", async () => {
|
||||
await withAgentWorkspace(async ({ agentDir, workspaceDir }) => {
|
||||
await writeAuthStore(agentDir);
|
||||
mockAllProvidersOverloaded();
|
||||
|
||||
let thrown: unknown;
|
||||
try {
|
||||
await runEmbeddedFallback({
|
||||
agentDir,
|
||||
workspaceDir,
|
||||
sessionKey: "agent:test:all-overloaded",
|
||||
runId: "run:all-overloaded",
|
||||
});
|
||||
} catch (err) {
|
||||
thrown = err;
|
||||
}
|
||||
|
||||
expect(thrown).toBeInstanceOf(Error);
|
||||
expect((thrown as Error).message).toMatch(/^All models failed \(2\): /);
|
||||
expect((thrown as Error).message).toMatch(
|
||||
/openai\/mock-1: .* \(overloaded\) \| groq\/mock-2: .* \(overloaded\)/,
|
||||
);
|
||||
|
||||
const usageStats = await readUsageStats(agentDir);
|
||||
expect(typeof usageStats["openai:p1"]?.cooldownUntil).toBe("number");
|
||||
expect(typeof usageStats["groq:p1"]?.cooldownUntil).toBe("number");
|
||||
expect(usageStats["openai:p1"]?.failureCounts).toMatchObject({ overloaded: 1 });
|
||||
expect(usageStats["groq:p1"]?.failureCounts).toMatchObject({ overloaded: 1 });
|
||||
expect(usageStats["openai:p1"]?.disabledUntil).toBeUndefined();
|
||||
expect(usageStats["groq:p1"]?.disabledUntil).toBeUndefined();
|
||||
|
||||
expect(runEmbeddedAttemptMock).toHaveBeenCalledTimes(2);
|
||||
expect(computeBackoffMock).toHaveBeenCalledTimes(2);
|
||||
expect(sleepWithAbortMock).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
});
|
||||
|
||||
it("probes a provider already in overloaded cooldown before falling back", async () => {
|
||||
await withAgentWorkspace(async ({ agentDir, workspaceDir }) => {
|
||||
const now = Date.now();
|
||||
await writeAuthStore(agentDir, {
|
||||
"openai:p1": {
|
||||
lastUsed: 1,
|
||||
cooldownUntil: now + 60_000,
|
||||
failureCounts: { overloaded: 2 },
|
||||
},
|
||||
"groq:p1": { lastUsed: 2 },
|
||||
});
|
||||
mockPrimaryOverloadedThenFallbackSuccess();
|
||||
|
||||
const result = await runEmbeddedFallback({
|
||||
agentDir,
|
||||
workspaceDir,
|
||||
sessionKey: "agent:test:overloaded-probe-fallback",
|
||||
runId: "run:overloaded-probe-fallback",
|
||||
});
|
||||
|
||||
expect(result.provider).toBe("groq");
|
||||
expect(runEmbeddedAttemptMock).toHaveBeenCalledTimes(2);
|
||||
const firstCall = runEmbeddedAttemptMock.mock.calls[0]?.[0] as
|
||||
| { provider?: string; authProfileId?: string }
|
||||
| undefined;
|
||||
const secondCall = runEmbeddedAttemptMock.mock.calls[1]?.[0] as
|
||||
| { provider?: string }
|
||||
| undefined;
|
||||
expect(firstCall).toBeDefined();
|
||||
expect(secondCall).toBeDefined();
|
||||
expect(firstCall?.provider).toBe("openai");
|
||||
expect(firstCall?.authProfileId).toBe("openai:p1");
|
||||
expect(secondCall?.provider).toBe("groq");
|
||||
});
|
||||
});
|
||||
|
||||
it("persists overloaded cooldown across turns while still allowing one probe and fallback", async () => {
|
||||
await withAgentWorkspace(async ({ agentDir, workspaceDir }) => {
|
||||
await writeAuthStore(agentDir);
|
||||
mockPrimaryOverloadedThenFallbackSuccess();
|
||||
|
||||
const firstResult = await runEmbeddedFallback({
|
||||
agentDir,
|
||||
workspaceDir,
|
||||
sessionKey: "agent:test:overloaded-two-turns:first",
|
||||
runId: "run:overloaded-two-turns:first",
|
||||
});
|
||||
|
||||
expect(firstResult.provider).toBe("groq");
|
||||
|
||||
runEmbeddedAttemptMock.mockClear();
|
||||
computeBackoffMock.mockClear();
|
||||
sleepWithAbortMock.mockClear();
|
||||
|
||||
mockPrimaryOverloadedThenFallbackSuccess();
|
||||
|
||||
const secondResult = await runEmbeddedFallback({
|
||||
agentDir,
|
||||
workspaceDir,
|
||||
sessionKey: "agent:test:overloaded-two-turns:second",
|
||||
runId: "run:overloaded-two-turns:second",
|
||||
});
|
||||
|
||||
expect(secondResult.provider).toBe("groq");
|
||||
expect(runEmbeddedAttemptMock).toHaveBeenCalledTimes(2);
|
||||
|
||||
const firstCall = runEmbeddedAttemptMock.mock.calls[0]?.[0] as
|
||||
| { provider?: string; authProfileId?: string }
|
||||
| undefined;
|
||||
const secondCall = runEmbeddedAttemptMock.mock.calls[1]?.[0] as
|
||||
| { provider?: string }
|
||||
| undefined;
|
||||
expect(firstCall).toBeDefined();
|
||||
expect(secondCall).toBeDefined();
|
||||
expect(firstCall?.provider).toBe("openai");
|
||||
expect(firstCall?.authProfileId).toBe("openai:p1");
|
||||
expect(secondCall?.provider).toBe("groq");
|
||||
|
||||
const usageStats = await readUsageStats(agentDir);
|
||||
expect(typeof usageStats["openai:p1"]?.cooldownUntil).toBe("number");
|
||||
expect(usageStats["openai:p1"]?.failureCounts).toMatchObject({ overloaded: 2 });
|
||||
expect(computeBackoffMock).toHaveBeenCalledTimes(1);
|
||||
expect(sleepWithAbortMock).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
});
|
||||
|
||||
it("keeps bare service-unavailable failures in the timeout lane without persisting cooldown", async () => {
|
||||
await withAgentWorkspace(async ({ agentDir, workspaceDir }) => {
|
||||
await writeAuthStore(agentDir);
|
||||
runEmbeddedAttemptMock.mockImplementation(async (params: unknown) => {
|
||||
const attemptParams = params as { provider: string };
|
||||
if (attemptParams.provider === "openai") {
|
||||
return makeAttempt({
|
||||
assistantTexts: [],
|
||||
lastAssistant: buildAssistant({
|
||||
provider: "openai",
|
||||
model: "mock-1",
|
||||
stopReason: "error",
|
||||
errorMessage: "LLM error: service unavailable",
|
||||
}),
|
||||
});
|
||||
}
|
||||
if (attemptParams.provider === "groq") {
|
||||
return makeAttempt({
|
||||
assistantTexts: ["fallback ok"],
|
||||
lastAssistant: buildAssistant({
|
||||
provider: "groq",
|
||||
model: "mock-2",
|
||||
stopReason: "stop",
|
||||
content: [{ type: "text", text: "fallback ok" }],
|
||||
}),
|
||||
});
|
||||
}
|
||||
throw new Error(`Unexpected provider ${attemptParams.provider}`);
|
||||
});
|
||||
|
||||
const result = await runEmbeddedFallback({
|
||||
agentDir,
|
||||
workspaceDir,
|
||||
sessionKey: "agent:test:timeout-cross-provider",
|
||||
runId: "run:timeout-cross-provider",
|
||||
});
|
||||
|
||||
expect(result.provider).toBe("groq");
|
||||
expect(result.attempts[0]?.reason).toBe("timeout");
|
||||
|
||||
const usageStats = await readUsageStats(agentDir);
|
||||
expect(usageStats["openai:p1"]?.cooldownUntil).toBeUndefined();
|
||||
expect(usageStats["openai:p1"]?.failureCounts).toBeUndefined();
|
||||
expect(computeBackoffMock).not.toHaveBeenCalled();
|
||||
expect(sleepWithAbortMock).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
it("rethrows AbortError during overload backoff instead of falling through fallback", async () => {
|
||||
await withAgentWorkspace(async ({ agentDir, workspaceDir }) => {
|
||||
await writeAuthStore(agentDir);
|
||||
const controller = new AbortController();
|
||||
mockPrimaryOverloadedThenFallbackSuccess();
|
||||
sleepWithAbortMock.mockImplementationOnce(async () => {
|
||||
controller.abort();
|
||||
throw new Error("aborted");
|
||||
});
|
||||
|
||||
await expect(
|
||||
runEmbeddedFallback({
|
||||
agentDir,
|
||||
workspaceDir,
|
||||
sessionKey: "agent:test:overloaded-backoff-abort",
|
||||
runId: "run:overloaded-backoff-abort",
|
||||
abortSignal: controller.signal,
|
||||
}),
|
||||
).rejects.toMatchObject({
|
||||
name: "AbortError",
|
||||
message: "Operation aborted",
|
||||
});
|
||||
|
||||
expect(runEmbeddedAttemptMock).toHaveBeenCalledTimes(1);
|
||||
const firstCall = runEmbeddedAttemptMock.mock.calls[0]?.[0] as
|
||||
| { provider?: string }
|
||||
| undefined;
|
||||
expect(firstCall?.provider).toBe("openai");
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
@ -1062,7 +1062,7 @@ describe("runWithModelFallback", () => {
|
|||
describe("fallback behavior with provider cooldowns", () => {
|
||||
async function makeAuthStoreWithCooldown(
|
||||
provider: string,
|
||||
reason: "rate_limit" | "auth" | "billing",
|
||||
reason: "rate_limit" | "overloaded" | "auth" | "billing",
|
||||
): Promise<{ store: AuthProfileStore; dir: string }> {
|
||||
const tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-test-"));
|
||||
const now = Date.now();
|
||||
|
|
@ -1073,12 +1073,12 @@ describe("runWithModelFallback", () => {
|
|||
},
|
||||
usageStats: {
|
||||
[`${provider}:default`]:
|
||||
reason === "rate_limit"
|
||||
reason === "rate_limit" || reason === "overloaded"
|
||||
? {
|
||||
// Real rate-limit cooldowns are tracked through cooldownUntil
|
||||
// and failureCounts, not disabledReason.
|
||||
// Transient cooldown reasons are tracked through
|
||||
// cooldownUntil and failureCounts, not disabledReason.
|
||||
cooldownUntil: now + 300000,
|
||||
failureCounts: { rate_limit: 1 },
|
||||
failureCounts: { [reason]: 1 },
|
||||
}
|
||||
: {
|
||||
// Auth/billing issues use disabledUntil
|
||||
|
|
@ -1117,7 +1117,37 @@ describe("runWithModelFallback", () => {
|
|||
expect(result.result).toBe("sonnet success");
|
||||
expect(run).toHaveBeenCalledTimes(1); // Primary skipped, fallback attempted
|
||||
expect(run).toHaveBeenNthCalledWith(1, "anthropic", "claude-sonnet-4-5", {
|
||||
allowRateLimitCooldownProbe: true,
|
||||
allowTransientCooldownProbe: true,
|
||||
});
|
||||
});
|
||||
|
||||
it("attempts same-provider fallbacks during overloaded cooldown", async () => {
|
||||
const { dir } = await makeAuthStoreWithCooldown("anthropic", "overloaded");
|
||||
const cfg = makeCfg({
|
||||
agents: {
|
||||
defaults: {
|
||||
model: {
|
||||
primary: "anthropic/claude-opus-4-6",
|
||||
fallbacks: ["anthropic/claude-sonnet-4-5", "groq/llama-3.3-70b-versatile"],
|
||||
},
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
const run = vi.fn().mockResolvedValueOnce("sonnet success");
|
||||
|
||||
const result = await runWithModelFallback({
|
||||
cfg,
|
||||
provider: "anthropic",
|
||||
model: "claude-opus-4-6",
|
||||
run,
|
||||
agentDir: dir,
|
||||
});
|
||||
|
||||
expect(result.result).toBe("sonnet success");
|
||||
expect(run).toHaveBeenCalledTimes(1);
|
||||
expect(run).toHaveBeenNthCalledWith(1, "anthropic", "claude-sonnet-4-5", {
|
||||
allowTransientCooldownProbe: true,
|
||||
});
|
||||
});
|
||||
|
||||
|
|
@ -1224,7 +1254,7 @@ describe("runWithModelFallback", () => {
|
|||
expect(result.result).toBe("groq success");
|
||||
expect(run).toHaveBeenCalledTimes(2);
|
||||
expect(run).toHaveBeenNthCalledWith(1, "anthropic", "claude-sonnet-4-5", {
|
||||
allowRateLimitCooldownProbe: true,
|
||||
allowTransientCooldownProbe: true,
|
||||
}); // Rate limit allows attempt
|
||||
expect(run).toHaveBeenNthCalledWith(2, "groq", "llama-3.3-70b-versatile"); // Cross-provider works
|
||||
});
|
||||
|
|
|
|||
|
|
@ -34,7 +34,7 @@ type ModelCandidate = {
|
|||
};
|
||||
|
||||
export type ModelFallbackRunOptions = {
|
||||
allowRateLimitCooldownProbe?: boolean;
|
||||
allowTransientCooldownProbe?: boolean;
|
||||
};
|
||||
|
||||
type ModelFallbackRunFn<T> = (
|
||||
|
|
@ -428,11 +428,11 @@ function resolveCooldownDecision(params: {
|
|||
}
|
||||
|
||||
// For primary: try when requested model or when probe allows.
|
||||
// For same-provider fallbacks: only relax cooldown on rate_limit, which
|
||||
// is commonly model-scoped and can recover on a sibling model.
|
||||
// For same-provider fallbacks: only relax cooldown on transient provider
|
||||
// limits, which are often model-scoped and can recover on a sibling model.
|
||||
const shouldAttemptDespiteCooldown =
|
||||
(params.isPrimary && (!params.requestedModel || shouldProbe)) ||
|
||||
(!params.isPrimary && inferredReason === "rate_limit");
|
||||
(!params.isPrimary && (inferredReason === "rate_limit" || inferredReason === "overloaded"));
|
||||
if (!shouldAttemptDespiteCooldown) {
|
||||
return {
|
||||
type: "skip",
|
||||
|
|
@ -514,8 +514,8 @@ export async function runWithModelFallback<T>(params: {
|
|||
if (decision.markProbe) {
|
||||
lastProbeAttempt.set(probeThrottleKey, now);
|
||||
}
|
||||
if (decision.reason === "rate_limit") {
|
||||
runOptions = { allowRateLimitCooldownProbe: true };
|
||||
if (decision.reason === "rate_limit" || decision.reason === "overloaded") {
|
||||
runOptions = { allowTransientCooldownProbe: true };
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -509,12 +509,12 @@ describe("classifyFailoverReason", () => {
|
|||
it("classifies documented provider error messages", () => {
|
||||
expect(classifyFailoverReason(OPENAI_RATE_LIMIT_MESSAGE)).toBe("rate_limit");
|
||||
expect(classifyFailoverReason(GEMINI_RESOURCE_EXHAUSTED_MESSAGE)).toBe("rate_limit");
|
||||
expect(classifyFailoverReason(ANTHROPIC_OVERLOADED_PAYLOAD)).toBe("rate_limit");
|
||||
expect(classifyFailoverReason(ANTHROPIC_OVERLOADED_PAYLOAD)).toBe("overloaded");
|
||||
expect(classifyFailoverReason(OPENROUTER_CREDITS_MESSAGE)).toBe("billing");
|
||||
expect(classifyFailoverReason(TOGETHER_PAYMENT_REQUIRED_MESSAGE)).toBe("billing");
|
||||
expect(classifyFailoverReason(TOGETHER_ENGINE_OVERLOADED_MESSAGE)).toBe("timeout");
|
||||
expect(classifyFailoverReason(TOGETHER_ENGINE_OVERLOADED_MESSAGE)).toBe("overloaded");
|
||||
expect(classifyFailoverReason(GROQ_TOO_MANY_REQUESTS_MESSAGE)).toBe("rate_limit");
|
||||
expect(classifyFailoverReason(GROQ_SERVICE_UNAVAILABLE_MESSAGE)).toBe("timeout");
|
||||
expect(classifyFailoverReason(GROQ_SERVICE_UNAVAILABLE_MESSAGE)).toBe("overloaded");
|
||||
});
|
||||
|
||||
it("classifies internal and compatibility error messages", () => {
|
||||
|
|
@ -572,25 +572,29 @@ describe("classifyFailoverReason", () => {
|
|||
"rate_limit",
|
||||
);
|
||||
});
|
||||
it("classifies provider high-demand / service-unavailable messages as rate_limit", () => {
|
||||
it("classifies provider high-demand / service-unavailable messages as overloaded", () => {
|
||||
expect(
|
||||
classifyFailoverReason(
|
||||
"This model is currently experiencing high demand. Please try again later.",
|
||||
),
|
||||
).toBe("rate_limit");
|
||||
// "service unavailable" combined with overload/capacity indicator → rate_limit
|
||||
).toBe("overloaded");
|
||||
// "service unavailable" combined with overload/capacity indicator → overloaded
|
||||
// (exercises the new regex — none of the standalone patterns match here)
|
||||
expect(classifyFailoverReason("service unavailable due to capacity limits")).toBe("rate_limit");
|
||||
expect(classifyFailoverReason("service unavailable due to capacity limits")).toBe("overloaded");
|
||||
expect(
|
||||
classifyFailoverReason(
|
||||
'{"error":{"code":503,"message":"The model is overloaded. Please try later","status":"UNAVAILABLE"}}',
|
||||
),
|
||||
).toBe("rate_limit");
|
||||
).toBe("overloaded");
|
||||
});
|
||||
it("classifies bare 'service unavailable' as timeout instead of rate_limit (#32828)", () => {
|
||||
// A generic "service unavailable" from a proxy/CDN should stay retryable,
|
||||
// but it should not be treated as provider overload / rate limit.
|
||||
expect(classifyFailoverReason("LLM error: service unavailable")).toBe("timeout");
|
||||
expect(classifyFailoverReason("503 Internal Database Error")).toBe("timeout");
|
||||
// Raw 529 text without explicit overload keywords still classifies as overloaded.
|
||||
expect(classifyFailoverReason("529 API is busy")).toBe("overloaded");
|
||||
expect(classifyFailoverReason("529 Please try again")).toBe("overloaded");
|
||||
});
|
||||
it("classifies zhipuai Weekly/Monthly Limit Exhausted as rate_limit (#33785)", () => {
|
||||
expect(
|
||||
|
|
|
|||
|
|
@ -293,13 +293,17 @@ export function classifyFailoverReasonFromHttpStatus(
|
|||
if (status === 408) {
|
||||
return "timeout";
|
||||
}
|
||||
// Keep the status-only path conservative and behavior-preserving.
|
||||
// Message-path HTTP heuristics are broader and should not leak in here.
|
||||
if (status === 502 || status === 503 || status === 504) {
|
||||
if (status === 503) {
|
||||
if (message && isOverloadedErrorMessage(message)) {
|
||||
return "overloaded";
|
||||
}
|
||||
return "timeout";
|
||||
}
|
||||
if (status === 502 || status === 504) {
|
||||
return "timeout";
|
||||
}
|
||||
if (status === 529) {
|
||||
return "rate_limit";
|
||||
return "overloaded";
|
||||
}
|
||||
if (status === 400) {
|
||||
// Some providers return quota/balance errors under HTTP 400, so do not
|
||||
|
|
@ -854,13 +858,6 @@ export function classifyFailoverReason(raw: string): FailoverReason | null {
|
|||
if (isModelNotFoundErrorMessage(raw)) {
|
||||
return "model_not_found";
|
||||
}
|
||||
if (isTransientHttpError(raw)) {
|
||||
// Treat transient 5xx provider failures as retryable transport issues.
|
||||
return "timeout";
|
||||
}
|
||||
if (isJsonApiInternalServerError(raw)) {
|
||||
return "timeout";
|
||||
}
|
||||
if (isPeriodicUsageLimitErrorMessage(raw)) {
|
||||
return isBillingErrorMessage(raw) ? "billing" : "rate_limit";
|
||||
}
|
||||
|
|
@ -868,7 +865,19 @@ export function classifyFailoverReason(raw: string): FailoverReason | null {
|
|||
return "rate_limit";
|
||||
}
|
||||
if (isOverloadedErrorMessage(raw)) {
|
||||
return "rate_limit";
|
||||
return "overloaded";
|
||||
}
|
||||
if (isTransientHttpError(raw)) {
|
||||
// 529 is always overloaded, even without explicit overload keywords in the body.
|
||||
const status = extractLeadingHttpStatus(raw.trim());
|
||||
if (status?.code === 529) {
|
||||
return "overloaded";
|
||||
}
|
||||
// Treat remaining transient 5xx provider failures as retryable transport issues.
|
||||
return "timeout";
|
||||
}
|
||||
if (isJsonApiInternalServerError(raw)) {
|
||||
return "timeout";
|
||||
}
|
||||
if (isCloudCodeAssistFormatError(raw)) {
|
||||
return "format";
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ export type FailoverReason =
|
|||
| "auth_permanent"
|
||||
| "format"
|
||||
| "rate_limit"
|
||||
| "overloaded"
|
||||
| "billing"
|
||||
| "timeout"
|
||||
| "model_not_found"
|
||||
|
|
|
|||
|
|
@ -9,11 +9,28 @@ import type { EmbeddedRunAttemptResult } from "./pi-embedded-runner/run/types.js
|
|||
|
||||
const runEmbeddedAttemptMock = vi.fn<(params: unknown) => Promise<EmbeddedRunAttemptResult>>();
|
||||
const resolveCopilotApiTokenMock = vi.fn();
|
||||
const { computeBackoffMock, sleepWithAbortMock } = vi.hoisted(() => ({
|
||||
computeBackoffMock: vi.fn(
|
||||
(
|
||||
_policy: { initialMs: number; maxMs: number; factor: number; jitter: number },
|
||||
_attempt: number,
|
||||
) => 321,
|
||||
),
|
||||
sleepWithAbortMock: vi.fn(async (_ms: number, _abortSignal?: AbortSignal) => undefined),
|
||||
}));
|
||||
|
||||
vi.mock("./pi-embedded-runner/run/attempt.js", () => ({
|
||||
runEmbeddedAttempt: (params: unknown) => runEmbeddedAttemptMock(params),
|
||||
}));
|
||||
|
||||
vi.mock("../infra/backoff.js", () => ({
|
||||
computeBackoff: (
|
||||
policy: { initialMs: number; maxMs: number; factor: number; jitter: number },
|
||||
attempt: number,
|
||||
) => computeBackoffMock(policy, attempt),
|
||||
sleepWithAbort: (ms: number, abortSignal?: AbortSignal) => sleepWithAbortMock(ms, abortSignal),
|
||||
}));
|
||||
|
||||
vi.mock("../providers/github-copilot-token.js", () => ({
|
||||
DEFAULT_COPILOT_API_BASE_URL: "https://api.individual.githubcopilot.com",
|
||||
resolveCopilotApiToken: (...args: unknown[]) => resolveCopilotApiTokenMock(...args),
|
||||
|
|
@ -43,6 +60,8 @@ beforeEach(() => {
|
|||
vi.useRealTimers();
|
||||
runEmbeddedAttemptMock.mockClear();
|
||||
resolveCopilotApiTokenMock.mockReset();
|
||||
computeBackoffMock.mockClear();
|
||||
sleepWithAbortMock.mockClear();
|
||||
});
|
||||
|
||||
const baseUsage = {
|
||||
|
|
@ -252,6 +271,24 @@ const mockFailedThenSuccessfulAttempt = (errorMessage = "rate limit") => {
|
|||
);
|
||||
};
|
||||
|
||||
const mockPromptErrorThenSuccessfulAttempt = (errorMessage: string) => {
|
||||
runEmbeddedAttemptMock
|
||||
.mockResolvedValueOnce(
|
||||
makeAttempt({
|
||||
promptError: new Error(errorMessage),
|
||||
}),
|
||||
)
|
||||
.mockResolvedValueOnce(
|
||||
makeAttempt({
|
||||
assistantTexts: ["ok"],
|
||||
lastAssistant: buildAssistant({
|
||||
stopReason: "stop",
|
||||
content: [{ type: "text", text: "ok" }],
|
||||
}),
|
||||
}),
|
||||
);
|
||||
};
|
||||
|
||||
async function runAutoPinnedOpenAiTurn(params: {
|
||||
agentDir: string;
|
||||
workspaceDir: string;
|
||||
|
|
@ -320,6 +357,28 @@ async function runAutoPinnedRotationCase(params: {
|
|||
});
|
||||
}
|
||||
|
||||
async function runAutoPinnedPromptErrorRotationCase(params: {
|
||||
errorMessage: string;
|
||||
sessionKey: string;
|
||||
runId: string;
|
||||
}) {
|
||||
runEmbeddedAttemptMock.mockClear();
|
||||
return withAgentWorkspace(async ({ agentDir, workspaceDir }) => {
|
||||
await writeAuthStore(agentDir);
|
||||
mockPromptErrorThenSuccessfulAttempt(params.errorMessage);
|
||||
await runAutoPinnedOpenAiTurn({
|
||||
agentDir,
|
||||
workspaceDir,
|
||||
sessionKey: params.sessionKey,
|
||||
runId: params.runId,
|
||||
});
|
||||
|
||||
expect(runEmbeddedAttemptMock).toHaveBeenCalledTimes(2);
|
||||
const usageStats = await readUsageStats(agentDir);
|
||||
return { usageStats };
|
||||
});
|
||||
}
|
||||
|
||||
function mockSingleSuccessfulAttempt() {
|
||||
runEmbeddedAttemptMock.mockResolvedValueOnce(
|
||||
makeAttempt({
|
||||
|
|
@ -639,13 +698,48 @@ describe("runEmbeddedPiAgent auth profile rotation", () => {
|
|||
expect(typeof usageStats["openai:p2"]?.lastUsed).toBe("number");
|
||||
});
|
||||
|
||||
it("rotates for overloaded prompt failures across auto-pinned profiles", async () => {
|
||||
it("rotates for overloaded assistant failures across auto-pinned profiles", async () => {
|
||||
const { usageStats } = await runAutoPinnedRotationCase({
|
||||
errorMessage: '{"type":"error","error":{"type":"overloaded_error","message":"Overloaded"}}',
|
||||
sessionKey: "agent:test:overloaded-rotation",
|
||||
runId: "run:overloaded-rotation",
|
||||
});
|
||||
expect(typeof usageStats["openai:p2"]?.lastUsed).toBe("number");
|
||||
expect(typeof usageStats["openai:p1"]?.cooldownUntil).toBe("number");
|
||||
expect(computeBackoffMock).toHaveBeenCalledTimes(1);
|
||||
expect(computeBackoffMock).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
initialMs: 250,
|
||||
maxMs: 1500,
|
||||
factor: 2,
|
||||
jitter: 0.2,
|
||||
}),
|
||||
1,
|
||||
);
|
||||
expect(sleepWithAbortMock).toHaveBeenCalledTimes(1);
|
||||
expect(sleepWithAbortMock).toHaveBeenCalledWith(321, undefined);
|
||||
});
|
||||
|
||||
it("rotates for overloaded prompt failures across auto-pinned profiles", async () => {
|
||||
const { usageStats } = await runAutoPinnedPromptErrorRotationCase({
|
||||
errorMessage: '{"type":"error","error":{"type":"overloaded_error","message":"Overloaded"}}',
|
||||
sessionKey: "agent:test:overloaded-prompt-rotation",
|
||||
runId: "run:overloaded-prompt-rotation",
|
||||
});
|
||||
expect(typeof usageStats["openai:p2"]?.lastUsed).toBe("number");
|
||||
expect(typeof usageStats["openai:p1"]?.cooldownUntil).toBe("number");
|
||||
expect(computeBackoffMock).toHaveBeenCalledTimes(1);
|
||||
expect(computeBackoffMock).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
initialMs: 250,
|
||||
maxMs: 1500,
|
||||
factor: 2,
|
||||
jitter: 0.2,
|
||||
}),
|
||||
1,
|
||||
);
|
||||
expect(sleepWithAbortMock).toHaveBeenCalledTimes(1);
|
||||
expect(sleepWithAbortMock).toHaveBeenCalledWith(321, undefined);
|
||||
});
|
||||
|
||||
it("rotates on timeout without cooling down the timed-out profile", async () => {
|
||||
|
|
@ -656,6 +750,8 @@ describe("runEmbeddedPiAgent auth profile rotation", () => {
|
|||
});
|
||||
expect(typeof usageStats["openai:p2"]?.lastUsed).toBe("number");
|
||||
expect(usageStats["openai:p1"]?.cooldownUntil).toBeUndefined();
|
||||
expect(computeBackoffMock).not.toHaveBeenCalled();
|
||||
expect(sleepWithAbortMock).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("rotates on bare service unavailable without cooling down the profile", async () => {
|
||||
|
|
@ -829,7 +925,7 @@ describe("runEmbeddedPiAgent auth profile rotation", () => {
|
|||
});
|
||||
});
|
||||
|
||||
it("can probe one cooldowned profile when rate-limit cooldown probe is explicitly allowed", async () => {
|
||||
it("can probe one cooldowned profile when transient cooldown probe is explicitly allowed", async () => {
|
||||
await withTimedAgentWorkspace(async ({ agentDir, workspaceDir, now }) => {
|
||||
await writeAuthStore(agentDir, {
|
||||
usageStats: {
|
||||
|
|
@ -859,7 +955,7 @@ describe("runEmbeddedPiAgent auth profile rotation", () => {
|
|||
provider: "openai",
|
||||
model: "mock-1",
|
||||
authProfileIdSource: "auto",
|
||||
allowRateLimitCooldownProbe: true,
|
||||
allowTransientCooldownProbe: true,
|
||||
timeoutMs: 5_000,
|
||||
runId: "run:cooldown-probe",
|
||||
});
|
||||
|
|
@ -869,6 +965,54 @@ describe("runEmbeddedPiAgent auth profile rotation", () => {
|
|||
});
|
||||
});
|
||||
|
||||
it("can probe one cooldowned profile when overloaded cooldown is explicitly probeable", async () => {
|
||||
await withTimedAgentWorkspace(async ({ agentDir, workspaceDir, now }) => {
|
||||
await writeAuthStore(agentDir, {
|
||||
usageStats: {
|
||||
"openai:p1": {
|
||||
lastUsed: 1,
|
||||
cooldownUntil: now + 60 * 60 * 1000,
|
||||
failureCounts: { overloaded: 4 },
|
||||
},
|
||||
"openai:p2": {
|
||||
lastUsed: 2,
|
||||
cooldownUntil: now + 60 * 60 * 1000,
|
||||
failureCounts: { overloaded: 4 },
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
runEmbeddedAttemptMock.mockResolvedValueOnce(
|
||||
makeAttempt({
|
||||
assistantTexts: ["ok"],
|
||||
lastAssistant: buildAssistant({
|
||||
stopReason: "stop",
|
||||
content: [{ type: "text", text: "ok" }],
|
||||
}),
|
||||
}),
|
||||
);
|
||||
|
||||
const result = await runEmbeddedPiAgent({
|
||||
sessionId: "session:test",
|
||||
sessionKey: "agent:test:overloaded-cooldown-probe",
|
||||
sessionFile: path.join(workspaceDir, "session.jsonl"),
|
||||
workspaceDir,
|
||||
agentDir,
|
||||
config: makeConfig({ fallbacks: ["openai/mock-2"] }),
|
||||
prompt: "hello",
|
||||
provider: "openai",
|
||||
model: "mock-1",
|
||||
authProfileIdSource: "auto",
|
||||
allowTransientCooldownProbe: true,
|
||||
timeoutMs: 5_000,
|
||||
runId: "run:overloaded-cooldown-probe",
|
||||
});
|
||||
|
||||
expect(runEmbeddedAttemptMock).toHaveBeenCalledTimes(1);
|
||||
expect(result.payloads?.[0]?.text ?? "").toContain("ok");
|
||||
});
|
||||
});
|
||||
|
||||
it("treats agent-level fallbacks as configured when defaults have none", async () => {
|
||||
await withTimedAgentWorkspace(async ({ agentDir, workspaceDir, now }) => {
|
||||
await writeAuthStore(agentDir, {
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ import {
|
|||
ensureContextEnginesInitialized,
|
||||
resolveContextEngine,
|
||||
} from "../../context-engine/index.js";
|
||||
import { computeBackoff, sleepWithAbort, type BackoffPolicy } from "../../infra/backoff.js";
|
||||
import { generateSecureToken } from "../../infra/secure-random.js";
|
||||
import { getGlobalHookRunner } from "../../plugins/hook-runner-global.js";
|
||||
import type { PluginHookBeforeAgentStartResult } from "../../plugins/types.js";
|
||||
|
|
@ -14,6 +15,7 @@ import { resolveOpenClawAgentDir } from "../agent-paths.js";
|
|||
import { hasConfiguredModelFallbacks } from "../agent-scope.js";
|
||||
import {
|
||||
isProfileInCooldown,
|
||||
type AuthProfileFailureReason,
|
||||
markAuthProfileFailure,
|
||||
markAuthProfileGood,
|
||||
markAuthProfileUsed,
|
||||
|
|
@ -79,6 +81,14 @@ type CopilotTokenState = {
|
|||
const COPILOT_REFRESH_MARGIN_MS = 5 * 60 * 1000;
|
||||
const COPILOT_REFRESH_RETRY_MS = 60 * 1000;
|
||||
const COPILOT_REFRESH_MIN_DELAY_MS = 5 * 1000;
|
||||
// Keep overload pacing noticeable enough to avoid tight retry bursts, but short
|
||||
// enough that fallback still feels responsive within a single turn.
|
||||
const OVERLOAD_FAILOVER_BACKOFF_POLICY: BackoffPolicy = {
|
||||
initialMs: 250,
|
||||
maxMs: 1_500,
|
||||
factor: 2,
|
||||
jitter: 0.2,
|
||||
};
|
||||
|
||||
// Avoid Anthropic's refusal test token poisoning session transcripts.
|
||||
const ANTHROPIC_MAGIC_STRING_TRIGGER_REFUSAL = "ANTHROPIC_MAGIC_STRING_TRIGGER_REFUSAL";
|
||||
|
|
@ -649,21 +659,21 @@ export async function runEmbeddedPiAgent(
|
|||
profileIds: autoProfileCandidates,
|
||||
}) ?? "rate_limit")
|
||||
: null;
|
||||
const allowRateLimitCooldownProbe =
|
||||
params.allowRateLimitCooldownProbe === true &&
|
||||
const allowTransientCooldownProbe =
|
||||
params.allowTransientCooldownProbe === true &&
|
||||
allAutoProfilesInCooldown &&
|
||||
unavailableReason === "rate_limit";
|
||||
let didRateLimitCooldownProbe = false;
|
||||
(unavailableReason === "rate_limit" || unavailableReason === "overloaded");
|
||||
let didTransientCooldownProbe = false;
|
||||
|
||||
while (profileIndex < profileCandidates.length) {
|
||||
const candidate = profileCandidates[profileIndex];
|
||||
const inCooldown =
|
||||
candidate && candidate !== lockedProfileId && isProfileInCooldown(authStore, candidate);
|
||||
if (inCooldown) {
|
||||
if (allowRateLimitCooldownProbe && !didRateLimitCooldownProbe) {
|
||||
didRateLimitCooldownProbe = true;
|
||||
if (allowTransientCooldownProbe && !didTransientCooldownProbe) {
|
||||
didTransientCooldownProbe = true;
|
||||
log.warn(
|
||||
`probing cooldowned auth profile for ${provider}/${modelId} due to rate_limit unavailability`,
|
||||
`probing cooldowned auth profile for ${provider}/${modelId} due to ${unavailableReason ?? "transient"} unavailability`,
|
||||
);
|
||||
} else {
|
||||
profileIndex += 1;
|
||||
|
|
@ -722,9 +732,10 @@ export async function runEmbeddedPiAgent(
|
|||
let lastRunPromptUsage: ReturnType<typeof normalizeUsage> | undefined;
|
||||
let autoCompactionCount = 0;
|
||||
let runLoopIterations = 0;
|
||||
let overloadFailoverAttempts = 0;
|
||||
const maybeMarkAuthProfileFailure = async (failure: {
|
||||
profileId?: string;
|
||||
reason?: Parameters<typeof markAuthProfileFailure>[0]["reason"] | null;
|
||||
reason?: AuthProfileFailureReason | null;
|
||||
config?: RunEmbeddedPiAgentParams["config"];
|
||||
agentDir?: RunEmbeddedPiAgentParams["agentDir"];
|
||||
}) => {
|
||||
|
|
@ -740,6 +751,36 @@ export async function runEmbeddedPiAgent(
|
|||
agentDir,
|
||||
});
|
||||
};
|
||||
const resolveAuthProfileFailureReason = (
|
||||
failoverReason: FailoverReason | null,
|
||||
): AuthProfileFailureReason | null => {
|
||||
// Timeouts are transport/model-path failures, not auth health signals,
|
||||
// so they should not persist auth-profile failure state.
|
||||
if (!failoverReason || failoverReason === "timeout") {
|
||||
return null;
|
||||
}
|
||||
return failoverReason;
|
||||
};
|
||||
const maybeBackoffBeforeOverloadFailover = async (reason: FailoverReason | null) => {
|
||||
if (reason !== "overloaded") {
|
||||
return;
|
||||
}
|
||||
overloadFailoverAttempts += 1;
|
||||
const delayMs = computeBackoff(OVERLOAD_FAILOVER_BACKOFF_POLICY, overloadFailoverAttempts);
|
||||
log.warn(
|
||||
`overload backoff before failover for ${provider}/${modelId}: attempt=${overloadFailoverAttempts} delayMs=${delayMs}`,
|
||||
);
|
||||
try {
|
||||
await sleepWithAbort(delayMs, params.abortSignal);
|
||||
} catch (err) {
|
||||
if (params.abortSignal?.aborted) {
|
||||
const abortErr = new Error("Operation aborted", { cause: err });
|
||||
abortErr.name = "AbortError";
|
||||
throw abortErr;
|
||||
}
|
||||
throw err;
|
||||
}
|
||||
};
|
||||
// Resolve the context engine once and reuse across retries to avoid
|
||||
// repeated initialization/connection overhead per attempt.
|
||||
ensureContextEnginesInitialized();
|
||||
|
|
@ -1165,15 +1206,19 @@ export async function runEmbeddedPiAgent(
|
|||
};
|
||||
}
|
||||
const promptFailoverReason = classifyFailoverReason(errorText);
|
||||
const promptProfileFailureReason =
|
||||
resolveAuthProfileFailureReason(promptFailoverReason);
|
||||
await maybeMarkAuthProfileFailure({
|
||||
profileId: lastProfileId,
|
||||
reason: promptFailoverReason,
|
||||
reason: promptProfileFailureReason,
|
||||
});
|
||||
const promptFailoverFailure = isFailoverErrorMessage(errorText);
|
||||
if (
|
||||
isFailoverErrorMessage(errorText) &&
|
||||
promptFailoverFailure &&
|
||||
promptFailoverReason !== "timeout" &&
|
||||
(await advanceAuthProfile())
|
||||
) {
|
||||
await maybeBackoffBeforeOverloadFailover(promptFailoverReason);
|
||||
continue;
|
||||
}
|
||||
const fallbackThinking = pickFallbackThinkingLevel({
|
||||
|
|
@ -1187,9 +1232,11 @@ export async function runEmbeddedPiAgent(
|
|||
thinkLevel = fallbackThinking;
|
||||
continue;
|
||||
}
|
||||
// FIX: Throw FailoverError for prompt errors when fallbacks configured
|
||||
// This enables model fallback for quota/rate limit errors during prompt submission
|
||||
if (fallbackConfigured && isFailoverErrorMessage(errorText)) {
|
||||
// Throw FailoverError for prompt-side failover reasons when fallbacks
|
||||
// are configured so outer model fallback can continue on overload,
|
||||
// rate-limit, auth, or billing failures.
|
||||
if (fallbackConfigured && promptFailoverFailure) {
|
||||
await maybeBackoffBeforeOverloadFailover(promptFailoverReason);
|
||||
throw new FailoverError(errorText, {
|
||||
reason: promptFailoverReason ?? "unknown",
|
||||
provider,
|
||||
|
|
@ -1218,6 +1265,8 @@ export async function runEmbeddedPiAgent(
|
|||
const billingFailure = isBillingAssistantError(lastAssistant);
|
||||
const failoverFailure = isFailoverAssistantError(lastAssistant);
|
||||
const assistantFailoverReason = classifyFailoverReason(lastAssistant?.errorMessage ?? "");
|
||||
const assistantProfileFailureReason =
|
||||
resolveAuthProfileFailureReason(assistantFailoverReason);
|
||||
const cloudCodeAssistFormatError = attempt.cloudCodeAssistFormatError;
|
||||
const imageDimensionError = parseImageDimensionError(lastAssistant?.errorMessage ?? "");
|
||||
|
||||
|
|
@ -1257,10 +1306,7 @@ export async function runEmbeddedPiAgent(
|
|||
|
||||
if (shouldRotate) {
|
||||
if (lastProfileId) {
|
||||
const reason =
|
||||
timedOut || assistantFailoverReason === "timeout"
|
||||
? "timeout"
|
||||
: (assistantFailoverReason ?? "unknown");
|
||||
const reason = timedOut ? "timeout" : assistantProfileFailureReason;
|
||||
// Skip cooldown for timeouts: a timeout is model/network-specific,
|
||||
// not an auth issue. Marking the profile would poison fallback models
|
||||
// on the same provider (e.g. gpt-5.3 timeout blocks gpt-5.2).
|
||||
|
|
@ -1280,10 +1326,12 @@ export async function runEmbeddedPiAgent(
|
|||
|
||||
const rotated = await advanceAuthProfile();
|
||||
if (rotated) {
|
||||
await maybeBackoffBeforeOverloadFailover(assistantFailoverReason);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (fallbackConfigured) {
|
||||
await maybeBackoffBeforeOverloadFailover(assistantFailoverReason);
|
||||
// Prefer formatted error message (user-friendly) over raw errorMessage
|
||||
const message =
|
||||
(lastAssistant
|
||||
|
|
|
|||
|
|
@ -115,10 +115,10 @@ export type RunEmbeddedPiAgentParams = {
|
|||
enforceFinalTag?: boolean;
|
||||
/**
|
||||
* Allow a single run attempt even when all auth profiles are in cooldown,
|
||||
* but only for inferred `rate_limit` cooldowns.
|
||||
* but only for inferred transient cooldowns like `rate_limit` or `overloaded`.
|
||||
*
|
||||
* This is used by model fallback when trying sibling models on providers
|
||||
* where rate limits are often model-scoped.
|
||||
* where transient service pressure is often model-scoped.
|
||||
*/
|
||||
allowRateLimitCooldownProbe?: boolean;
|
||||
allowTransientCooldownProbe?: boolean;
|
||||
};
|
||||
|
|
|
|||
|
|
@ -311,7 +311,7 @@ export async function runAgentTurnWithFallback(params: {
|
|||
model,
|
||||
runId,
|
||||
authProfile,
|
||||
allowRateLimitCooldownProbe: runOptions?.allowRateLimitCooldownProbe,
|
||||
allowTransientCooldownProbe: runOptions?.allowTransientCooldownProbe,
|
||||
});
|
||||
return (async () => {
|
||||
const result = await runEmbeddedPiAgent({
|
||||
|
|
|
|||
|
|
@ -487,7 +487,7 @@ export async function runMemoryFlushIfNeeded(params: {
|
|||
model,
|
||||
runId: flushRunId,
|
||||
authProfile,
|
||||
allowRateLimitCooldownProbe: runOptions?.allowRateLimitCooldownProbe,
|
||||
allowTransientCooldownProbe: runOptions?.allowTransientCooldownProbe,
|
||||
});
|
||||
const result = await runEmbeddedPiAgent({
|
||||
...embeddedContext,
|
||||
|
|
|
|||
|
|
@ -166,7 +166,7 @@ export function buildEmbeddedRunBaseParams(params: {
|
|||
model: string;
|
||||
runId: string;
|
||||
authProfile: ReturnType<typeof resolveProviderScopedAuthProfile>;
|
||||
allowRateLimitCooldownProbe?: boolean;
|
||||
allowTransientCooldownProbe?: boolean;
|
||||
}) {
|
||||
return {
|
||||
sessionFile: params.run.sessionFile,
|
||||
|
|
@ -187,7 +187,7 @@ export function buildEmbeddedRunBaseParams(params: {
|
|||
bashElevated: params.run.bashElevated,
|
||||
timeoutMs: params.run.timeoutMs,
|
||||
runId: params.runId,
|
||||
allowRateLimitCooldownProbe: params.allowRateLimitCooldownProbe,
|
||||
allowTransientCooldownProbe: params.allowTransientCooldownProbe,
|
||||
};
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1054,6 +1054,11 @@ describe("runReplyAgent typing (heartbeat)", () => {
|
|||
reportedReason: "rate_limit",
|
||||
expectedReason: "rate limit",
|
||||
},
|
||||
{
|
||||
existingReason: undefined,
|
||||
reportedReason: "overloaded",
|
||||
expectedReason: "overloaded",
|
||||
},
|
||||
{
|
||||
existingReason: "rate limit",
|
||||
reportedReason: "timeout",
|
||||
|
|
|
|||
|
|
@ -208,7 +208,7 @@ export function createFollowupRunner(params: {
|
|||
bashElevated: queued.run.bashElevated,
|
||||
timeoutMs: queued.run.timeoutMs,
|
||||
runId,
|
||||
allowRateLimitCooldownProbe: runOptions?.allowRateLimitCooldownProbe,
|
||||
allowTransientCooldownProbe: runOptions?.allowTransientCooldownProbe,
|
||||
blockReplyBreak: queued.run.blockReplyBreak,
|
||||
bootstrapPromptWarningSignaturesSeen,
|
||||
bootstrapPromptWarningSignature:
|
||||
|
|
|
|||
|
|
@ -174,7 +174,7 @@ function runAgentAttempt(params: {
|
|||
primaryProvider: string;
|
||||
sessionStore?: Record<string, SessionEntry>;
|
||||
storePath?: string;
|
||||
allowRateLimitCooldownProbe?: boolean;
|
||||
allowTransientCooldownProbe?: boolean;
|
||||
}) {
|
||||
const effectivePrompt = resolveFallbackRetryPrompt({
|
||||
body: params.body,
|
||||
|
|
@ -325,7 +325,7 @@ function runAgentAttempt(params: {
|
|||
inputProvenance: params.opts.inputProvenance,
|
||||
streamParams: params.opts.streamParams,
|
||||
agentDir: params.agentDir,
|
||||
allowRateLimitCooldownProbe: params.allowRateLimitCooldownProbe,
|
||||
allowTransientCooldownProbe: params.allowTransientCooldownProbe,
|
||||
onAgentEvent: params.onAgentEvent,
|
||||
bootstrapPromptWarningSignaturesSeen,
|
||||
bootstrapPromptWarningSignature,
|
||||
|
|
@ -868,7 +868,7 @@ async function agentCommandInternal(
|
|||
primaryProvider: provider,
|
||||
sessionStore,
|
||||
storePath,
|
||||
allowRateLimitCooldownProbe: runOptions?.allowRateLimitCooldownProbe,
|
||||
allowTransientCooldownProbe: runOptions?.allowTransientCooldownProbe,
|
||||
onAgentEvent: (evt) => {
|
||||
// Track lifecycle end for fallback emission below.
|
||||
if (
|
||||
|
|
|
|||
|
|
@ -9,6 +9,7 @@ describe("mapFailoverReasonToProbeStatus", () => {
|
|||
it("keeps existing failover reason mappings", () => {
|
||||
expect(mapFailoverReasonToProbeStatus("auth")).toBe("auth");
|
||||
expect(mapFailoverReasonToProbeStatus("rate_limit")).toBe("rate_limit");
|
||||
expect(mapFailoverReasonToProbeStatus("overloaded")).toBe("rate_limit");
|
||||
expect(mapFailoverReasonToProbeStatus("billing")).toBe("billing");
|
||||
expect(mapFailoverReasonToProbeStatus("timeout")).toBe("timeout");
|
||||
expect(mapFailoverReasonToProbeStatus("format")).toBe("format");
|
||||
|
|
|
|||
|
|
@ -106,7 +106,7 @@ export function mapFailoverReasonToProbeStatus(reason?: string | null): AuthProb
|
|||
// surface in the auth bucket instead of showing as unknown.
|
||||
return "auth";
|
||||
}
|
||||
if (reason === "rate_limit") {
|
||||
if (reason === "rate_limit" || reason === "overloaded") {
|
||||
return "rate_limit";
|
||||
}
|
||||
if (reason === "billing") {
|
||||
|
|
|
|||
|
|
@ -258,7 +258,7 @@ describe("cron webhook schema", () => {
|
|||
retry: {
|
||||
maxAttempts: 5,
|
||||
backoffMs: [60000, 120000, 300000],
|
||||
retryOn: ["rate_limit", "network"],
|
||||
retryOn: ["rate_limit", "overloaded", "network"],
|
||||
},
|
||||
},
|
||||
});
|
||||
|
|
|
|||
|
|
@ -1144,13 +1144,13 @@ export const FIELD_HELP: Record<string, string> = {
|
|||
"cron.maxConcurrentRuns":
|
||||
"Limits how many cron jobs can execute at the same time when multiple schedules fire together. Use lower values to protect CPU/memory under heavy automation load, or raise carefully for higher throughput.",
|
||||
"cron.retry":
|
||||
"Overrides the default retry policy for one-shot jobs when they fail with transient errors (rate limit, network, server_error). Omit to use defaults: maxAttempts 3, backoffMs [30000, 60000, 300000], retry all transient types.",
|
||||
"Overrides the default retry policy for one-shot jobs when they fail with transient errors (rate limit, overloaded, network, server_error). Omit to use defaults: maxAttempts 3, backoffMs [30000, 60000, 300000], retry all transient types.",
|
||||
"cron.retry.maxAttempts":
|
||||
"Max retries for one-shot jobs on transient errors before permanent disable (default: 3).",
|
||||
"cron.retry.backoffMs":
|
||||
"Backoff delays in ms for each retry attempt (default: [30000, 60000, 300000]). Use shorter values for faster retries.",
|
||||
"cron.retry.retryOn":
|
||||
"Error types to retry: rate_limit, network, timeout, server_error. Use to restrict which errors trigger retries; omit to retry all transient types.",
|
||||
"Error types to retry: rate_limit, overloaded, network, timeout, server_error. Use to restrict which errors trigger retries; omit to retry all transient types.",
|
||||
"cron.webhook":
|
||||
'Deprecated legacy fallback webhook URL used only for old jobs with `notify=true`. Migrate to per-job delivery using `delivery.mode="webhook"` plus `delivery.to`, and avoid relying on this global field.',
|
||||
"cron.webhookToken":
|
||||
|
|
|
|||
|
|
@ -1,7 +1,7 @@
|
|||
import type { SecretInput } from "./types.secrets.js";
|
||||
|
||||
/** Error types that can trigger retries for one-shot jobs. */
|
||||
export type CronRetryOn = "rate_limit" | "network" | "timeout" | "server_error";
|
||||
export type CronRetryOn = "rate_limit" | "overloaded" | "network" | "timeout" | "server_error";
|
||||
|
||||
export type CronRetryConfig = {
|
||||
/** Max retries for transient errors before permanent disable (default: 3). */
|
||||
|
|
|
|||
|
|
@ -440,7 +440,7 @@ export const OpenClawSchema = z
|
|||
maxAttempts: z.number().int().min(0).max(10).optional(),
|
||||
backoffMs: z.array(z.number().int().nonnegative()).min(1).max(10).optional(),
|
||||
retryOn: z
|
||||
.array(z.enum(["rate_limit", "network", "timeout", "server_error"]))
|
||||
.array(z.enum(["rate_limit", "overloaded", "network", "timeout", "server_error"]))
|
||||
.min(1)
|
||||
.optional(),
|
||||
})
|
||||
|
|
|
|||
|
|
@ -534,7 +534,7 @@ export async function runCronIsolatedAgentTurn(params: {
|
|||
// be blocked by a target it cannot satisfy (#27898).
|
||||
requireExplicitMessageTarget: deliveryRequested && resolvedDelivery.ok,
|
||||
disableMessageTool: deliveryRequested || deliveryPlan.mode === "none",
|
||||
allowRateLimitCooldownProbe: runOptions?.allowRateLimitCooldownProbe,
|
||||
allowTransientCooldownProbe: runOptions?.allowTransientCooldownProbe,
|
||||
abortSignal,
|
||||
bootstrapPromptWarningSignaturesSeen,
|
||||
bootstrapPromptWarningSignature,
|
||||
|
|
|
|||
|
|
@ -580,6 +580,7 @@ describe("Cron issue regressions", () => {
|
|||
const runRetryScenario = async (params: {
|
||||
id: string;
|
||||
deleteAfterRun: boolean;
|
||||
firstError?: string;
|
||||
}): Promise<{
|
||||
state: ReturnType<typeof createCronServiceState>;
|
||||
runIsolatedAgentJob: ReturnType<typeof vi.fn>;
|
||||
|
|
@ -600,7 +601,10 @@ describe("Cron issue regressions", () => {
|
|||
let now = scheduledAt;
|
||||
const runIsolatedAgentJob = vi
|
||||
.fn()
|
||||
.mockResolvedValueOnce({ status: "error", error: "429 rate limit exceeded" })
|
||||
.mockResolvedValueOnce({
|
||||
status: "error",
|
||||
error: params.firstError ?? "429 rate limit exceeded",
|
||||
})
|
||||
.mockResolvedValueOnce({ status: "ok", summary: "done" });
|
||||
const state = createCronServiceState({
|
||||
cronEnabled: true,
|
||||
|
|
@ -644,6 +648,19 @@ describe("Cron issue regressions", () => {
|
|||
);
|
||||
expect(deletedJob).toBeUndefined();
|
||||
expect(deleteResult.runIsolatedAgentJob).toHaveBeenCalledTimes(2);
|
||||
|
||||
const overloadedResult = await runRetryScenario({
|
||||
id: "oneshot-overloaded-retry",
|
||||
deleteAfterRun: false,
|
||||
firstError:
|
||||
"All models failed (2): anthropic/claude-3-5-sonnet: LLM error overloaded_error: overloaded (overloaded); openai/gpt-5.3-codex: LLM error overloaded_error: overloaded (overloaded)",
|
||||
});
|
||||
const overloadedJob = overloadedResult.state.store?.jobs.find(
|
||||
(j) => j.id === "oneshot-overloaded-retry",
|
||||
);
|
||||
expect(overloadedJob).toBeDefined();
|
||||
expect(overloadedJob!.state.lastStatus).toBe("ok");
|
||||
expect(overloadedResult.runIsolatedAgentJob).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
|
||||
it("#24355: one-shot job disabled after max transient retries", async () => {
|
||||
|
|
@ -735,6 +752,54 @@ describe("Cron issue regressions", () => {
|
|||
expect(runIsolatedAgentJob).toHaveBeenCalledTimes(3);
|
||||
});
|
||||
|
||||
it("#24355: one-shot job retries status-only 529 failures when retryOn only includes overloaded", async () => {
|
||||
const store = makeStorePath();
|
||||
const scheduledAt = Date.parse("2026-02-06T10:00:00.000Z");
|
||||
|
||||
const cronJob = createIsolatedRegressionJob({
|
||||
id: "oneshot-overloaded-529-only",
|
||||
name: "reminder",
|
||||
scheduledAt,
|
||||
schedule: { kind: "at", at: new Date(scheduledAt).toISOString() },
|
||||
payload: { kind: "agentTurn", message: "remind me" },
|
||||
state: { nextRunAtMs: scheduledAt },
|
||||
});
|
||||
await writeCronJobs(store.storePath, [cronJob]);
|
||||
|
||||
let now = scheduledAt;
|
||||
const runIsolatedAgentJob = vi
|
||||
.fn()
|
||||
.mockResolvedValueOnce({ status: "error", error: "FailoverError: HTTP 529" })
|
||||
.mockResolvedValueOnce({ status: "ok", summary: "done" });
|
||||
const state = createCronServiceState({
|
||||
cronEnabled: true,
|
||||
storePath: store.storePath,
|
||||
log: noopLogger,
|
||||
nowMs: () => now,
|
||||
enqueueSystemEvent: vi.fn(),
|
||||
requestHeartbeatNow: vi.fn(),
|
||||
runIsolatedAgentJob,
|
||||
cronConfig: {
|
||||
retry: { maxAttempts: 1, backoffMs: [1000], retryOn: ["overloaded"] },
|
||||
},
|
||||
});
|
||||
|
||||
await onTimer(state);
|
||||
const jobAfterRetry = state.store?.jobs.find((j) => j.id === "oneshot-overloaded-529-only");
|
||||
expect(jobAfterRetry).toBeDefined();
|
||||
expect(jobAfterRetry!.enabled).toBe(true);
|
||||
expect(jobAfterRetry!.state.lastStatus).toBe("error");
|
||||
expect(jobAfterRetry!.state.nextRunAtMs).toBeGreaterThan(scheduledAt);
|
||||
|
||||
now = (jobAfterRetry!.state.nextRunAtMs ?? now) + 1;
|
||||
await onTimer(state);
|
||||
|
||||
const finishedJob = state.store?.jobs.find((j) => j.id === "oneshot-overloaded-529-only");
|
||||
expect(finishedJob).toBeDefined();
|
||||
expect(finishedJob!.state.lastStatus).toBe("ok");
|
||||
expect(runIsolatedAgentJob).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
|
||||
it("#24355: one-shot job disabled immediately on permanent error", async () => {
|
||||
const store = makeStorePath();
|
||||
const scheduledAt = Date.parse("2026-02-06T10:00:00.000Z");
|
||||
|
|
|
|||
|
|
@ -120,6 +120,8 @@ const DEFAULT_MAX_TRANSIENT_RETRIES = 3;
|
|||
|
||||
const TRANSIENT_PATTERNS: Record<string, RegExp> = {
|
||||
rate_limit: /(rate[_ ]limit|too many requests|429|resource has been exhausted|cloudflare)/i,
|
||||
overloaded:
|
||||
/\b529\b|\boverloaded(?:_error)?\b|high demand|temporar(?:ily|y) overloaded|capacity exceeded/i,
|
||||
network: /(network|econnreset|econnrefused|fetch failed|socket)/i,
|
||||
timeout: /(timeout|etimedout)/i,
|
||||
server_error: /\b5\d{2}\b/,
|
||||
|
|
|
|||
|
|
@ -50,6 +50,26 @@ describe("discord auto presence", () => {
|
|||
expect(decision?.presence.activities[0]?.state).toBe("token exhausted");
|
||||
});
|
||||
|
||||
it("treats overloaded cooldown as exhausted", () => {
|
||||
const now = Date.now();
|
||||
const decision = resolveDiscordAutoPresenceDecision({
|
||||
discordConfig: {
|
||||
autoPresence: {
|
||||
enabled: true,
|
||||
exhaustedText: "token exhausted",
|
||||
},
|
||||
},
|
||||
authStore: createStore({ cooldownUntil: now + 60_000, failureCounts: { overloaded: 2 } }),
|
||||
gatewayConnected: true,
|
||||
now,
|
||||
});
|
||||
|
||||
expect(decision).toBeTruthy();
|
||||
expect(decision?.state).toBe("exhausted");
|
||||
expect(decision?.presence.status).toBe("dnd");
|
||||
expect(decision?.presence.activities[0]?.state).toBe("token exhausted");
|
||||
});
|
||||
|
||||
it("recovers from exhausted to online once a profile becomes usable", () => {
|
||||
let now = Date.now();
|
||||
let store = createStore({ cooldownUntil: now + 60_000, failureCounts: { rate_limit: 1 } });
|
||||
|
|
|
|||
|
|
@ -104,6 +104,7 @@ function isExhaustedUnavailableReason(reason: AuthProfileFailureReason | null):
|
|||
}
|
||||
return (
|
||||
reason === "rate_limit" ||
|
||||
reason === "overloaded" ||
|
||||
reason === "billing" ||
|
||||
reason === "auth" ||
|
||||
reason === "auth_permanent"
|
||||
|
|
|
|||
|
|
@ -4,7 +4,7 @@ export async function runWithModelFallback(params: {
|
|||
run: (
|
||||
provider: string,
|
||||
model: string,
|
||||
options?: { allowRateLimitCooldownProbe?: boolean },
|
||||
options?: { allowTransientCooldownProbe?: boolean },
|
||||
) => Promise<unknown>;
|
||||
}) {
|
||||
return {
|
||||
|
|
|
|||
Loading…
Reference in New Issue