diff --git a/CHANGELOG.md b/CHANGELOG.md index e40d4ca3055..040899c6265 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ Docs: https://docs.openclaw.ai - WhatsApp/reactions: add `reactionLevel` guidance for agent reactions. Thanks @mcaxtr. - Feishu/comments: add a dedicated Drive comment-event flow with comment-thread context resolution, in-thread replies, and `feishu_drive` comment actions for document collaboration workflows. (#58497) thanks @wittam-01. - Tasks/chat: add `/tasks` as a chat-native background task board for the current session, with recent task details and agent-local fallback counts when no linked tasks are visible. Related #54226. Thanks @vincentkoc. +- Agents/failover: cap prompt-side and assistant-side same-provider auth-profile retries for rate-limit failures before cross-provider model fallback, add the `auth.cooldowns.rateLimitedProfileRotations` knob, and document the new fallback behavior. (#58707) Thanks @Forgely3D ### Fixes diff --git a/docs/.generated/config-baseline.json b/docs/.generated/config-baseline.json index db18d6af8c4..c42b45d1abf 100644 --- a/docs/.generated/config-baseline.json +++ b/docs/.generated/config-baseline.json @@ -7907,6 +7907,23 @@ "help": "Maximum same-provider auth-profile rotations allowed for overloaded errors before switching to model fallback (default: 1).", "hasChildren": false }, + { + "path": "auth.cooldowns.rateLimitedProfileRotations", + "kind": "core", + "type": "integer", + "required": false, + "deprecated": false, + "sensitive": false, + "tags": [ + "access", + "auth", + "performance", + "storage" + ], + "label": "Rate-Limited Profile Rotations", + "help": "Maximum same-provider auth-profile rotations allowed for rate-limit errors before switching to model fallback (default: 1).", + "hasChildren": false + }, { "path": "auth.order", "kind": "core", diff --git a/docs/.generated/config-baseline.jsonl b/docs/.generated/config-baseline.jsonl index 4db7e3e5b8c..33ea8f0d31d 100644 --- a/docs/.generated/config-baseline.jsonl +++ b/docs/.generated/config-baseline.jsonl @@ -1,4 +1,4 @@ -{"generatedBy":"scripts/generate-config-doc-baseline.ts","recordType":"meta","totalPaths":5729} +{"generatedBy":"scripts/generate-config-doc-baseline.ts","recordType":"meta","totalPaths":5730} {"recordType":"path","path":"acp","kind":"core","type":"object","required":false,"deprecated":false,"sensitive":false,"tags":["advanced"],"label":"ACP","help":"ACP runtime controls for enabling dispatch, selecting backends, constraining allowed agent targets, and tuning streamed turn projection behavior.","hasChildren":true} {"recordType":"path","path":"acp.allowedAgents","kind":"core","type":"array","required":false,"deprecated":false,"sensitive":false,"tags":["access"],"label":"ACP Allowed Agents","help":"Allowlist of ACP target agent ids permitted for ACP runtime sessions. Empty means no additional allowlist restriction.","hasChildren":true} {"recordType":"path","path":"acp.allowedAgents.*","kind":"core","type":"string","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false} @@ -701,6 +701,7 @@ {"recordType":"path","path":"auth.cooldowns.failureWindowHours","kind":"core","type":"number","required":false,"deprecated":false,"sensitive":false,"tags":["access","auth"],"label":"Failover Window (hours)","help":"Failure window (hours) for backoff counters (default: 24).","hasChildren":false} {"recordType":"path","path":"auth.cooldowns.overloadedBackoffMs","kind":"core","type":"integer","required":false,"deprecated":false,"sensitive":false,"tags":["access","auth","reliability","storage"],"label":"Overloaded Backoff (ms)","help":"Fixed delay in milliseconds before retrying an overloaded provider/profile rotation (default: 0).","hasChildren":false} {"recordType":"path","path":"auth.cooldowns.overloadedProfileRotations","kind":"core","type":"integer","required":false,"deprecated":false,"sensitive":false,"tags":["access","auth","storage"],"label":"Overloaded Profile Rotations","help":"Maximum same-provider auth-profile rotations allowed for overloaded errors before switching to model fallback (default: 1).","hasChildren":false} +{"recordType":"path","path":"auth.cooldowns.rateLimitedProfileRotations","kind":"core","type":"integer","required":false,"deprecated":false,"sensitive":false,"tags":["access","auth","performance","storage"],"label":"Rate-Limited Profile Rotations","help":"Maximum same-provider auth-profile rotations allowed for rate-limit errors before switching to model fallback (default: 1).","hasChildren":false} {"recordType":"path","path":"auth.order","kind":"core","type":"object","required":false,"deprecated":false,"sensitive":false,"tags":["access","auth"],"label":"Auth Profile Order","help":"Ordered auth profile IDs per provider (used for automatic failover).","hasChildren":true} {"recordType":"path","path":"auth.order.*","kind":"core","type":"array","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":true} {"recordType":"path","path":"auth.order.*.*","kind":"core","type":"string","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false} diff --git a/docs/concepts/model-failover.md b/docs/concepts/model-failover.md index b99803b10e6..790c116f7bb 100644 --- a/docs/concepts/model-failover.md +++ b/docs/concepts/model-failover.md @@ -138,10 +138,12 @@ If all profiles for a provider fail, OpenClaw moves to the next model in `agents.defaults.model.fallbacks`. This applies to auth failures, rate limits, and timeouts that exhausted profile rotation (other errors do not advance fallback). -Overloaded errors are handled more aggressively than billing cooldowns. By default, -OpenClaw allows one same-provider auth-profile retry, then switches to the next -configured model fallback without waiting. Tune this with -`auth.cooldowns.overloadedProfileRotations` and `auth.cooldowns.overloadedBackoffMs`. +Overloaded and rate-limit errors are handled more aggressively than billing +cooldowns. By default, OpenClaw allows one same-provider auth-profile retry, +then switches to the next configured model fallback without waiting. Tune this +with `auth.cooldowns.overloadedProfileRotations`, +`auth.cooldowns.overloadedBackoffMs`, and +`auth.cooldowns.rateLimitedProfileRotations`. When a run starts with a model override (hooks or CLI), fallbacks still end at `agents.defaults.model.primary` after trying any configured fallbacks. @@ -154,6 +156,7 @@ See [Gateway configuration](/gateway/configuration) for: - `auth.cooldowns.billingBackoffHours` / `auth.cooldowns.billingBackoffHoursByProvider` - `auth.cooldowns.billingMaxHours` / `auth.cooldowns.failureWindowHours` - `auth.cooldowns.overloadedProfileRotations` / `auth.cooldowns.overloadedBackoffMs` +- `auth.cooldowns.rateLimitedProfileRotations` - `agents.defaults.model.primary` / `agents.defaults.model.fallbacks` - `agents.defaults.imageModel` routing diff --git a/docs/gateway/configuration-reference.md b/docs/gateway/configuration-reference.md index 46ba51d6534..62113f1e49c 100644 --- a/docs/gateway/configuration-reference.md +++ b/docs/gateway/configuration-reference.md @@ -3031,6 +3031,7 @@ Notes: failureWindowHours: 24, overloadedProfileRotations: 1, overloadedBackoffMs: 0, + rateLimitedProfileRotations: 1, }, }, } @@ -3042,6 +3043,7 @@ Notes: - `failureWindowHours`: rolling window in hours used for backoff counters (default: `24`). - `overloadedProfileRotations`: maximum same-provider auth-profile rotations for overloaded errors before switching to model fallback (default: `1`). - `overloadedBackoffMs`: fixed delay before retrying an overloaded provider/profile rotation (default: `0`). +- `rateLimitedProfileRotations`: maximum same-provider auth-profile rotations for rate-limit errors before switching to model fallback (default: `1`). --- diff --git a/scripts/openclaw-npm-release-check.ts b/scripts/openclaw-npm-release-check.ts index 0da9da36214..0f3c681dba9 100644 --- a/scripts/openclaw-npm-release-check.ts +++ b/scripts/openclaw-npm-release-check.ts @@ -90,9 +90,12 @@ export function resolveNpmDistTagMirrorAuth(params?: { nodeAuthToken?: string | null; npmToken?: string | null; }): NpmDistTagMirrorAuth { + const nodeAuthToken = + params && "nodeAuthToken" in params ? params.nodeAuthToken : process.env.NODE_AUTH_TOKEN; + const npmToken = params && "npmToken" in params ? params.npmToken : process.env.NPM_TOKEN; return resolveNpmDistTagMirrorAuthBase({ - nodeAuthToken: params?.nodeAuthToken ?? process.env.NODE_AUTH_TOKEN, - npmToken: params?.npmToken ?? process.env.NPM_TOKEN, + nodeAuthToken, + npmToken, }) as NpmDistTagMirrorAuth; } diff --git a/src/agents/model-fallback.run-embedded.e2e.test.ts b/src/agents/model-fallback.run-embedded.e2e.test.ts index 4232ab75cf4..66206447ac7 100644 --- a/src/agents/model-fallback.run-embedded.e2e.test.ts +++ b/src/agents/model-fallback.run-embedded.e2e.test.ts @@ -94,6 +94,7 @@ beforeEach(() => { const OVERLOADED_ERROR_PAYLOAD = '{"type":"error","error":{"type":"overloaded_error","message":"Overloaded"}}'; +const RATE_LIMIT_ERROR_MESSAGE = "rate limit exceeded"; function makeConfig(): OpenClawConfig { const apiKeyField = ["api", "Key"].join(""); @@ -196,6 +197,27 @@ async function readUsageStats(agentDir: string) { return JSON.parse(raw).usageStats as Record | undefined>; } +async function writeMultiProfileAuthStore(agentDir: string) { + await fs.writeFile( + path.join(agentDir, "auth-profiles.json"), + JSON.stringify({ + version: 1, + profiles: { + "openai:p1": { type: "api_key", provider: "openai", key: "sk-openai-1" }, + "openai:p2": { type: "api_key", provider: "openai", key: "sk-openai-2" }, + "openai:p3": { type: "api_key", provider: "openai", key: "sk-openai-3" }, + "groq:p1": { type: "api_key", provider: "groq", key: "sk-groq" }, + }, + usageStats: { + "openai:p1": { lastUsed: 1 }, + "openai:p2": { lastUsed: 2 }, + "openai:p3": { lastUsed: 3 }, + "groq:p1": { lastUsed: 4 }, + }, + }), + ); +} + async function runEmbeddedFallback(params: { agentDir: string; workspaceDir: string; @@ -236,6 +258,29 @@ function mockPrimaryOverloadedThenFallbackSuccess() { mockPrimaryErrorThenFallbackSuccess(OVERLOADED_ERROR_PAYLOAD); } +function mockPrimaryPromptErrorThenFallbackSuccess(errorMessage: string) { + runEmbeddedAttemptMock.mockImplementation(async (params: unknown) => { + const attemptParams = params as { provider: string }; + if (attemptParams.provider === "openai") { + return makeEmbeddedRunnerAttempt({ + promptError: new Error(errorMessage), + }); + } + if (attemptParams.provider === "groq") { + return makeEmbeddedRunnerAttempt({ + assistantTexts: ["fallback ok"], + lastAssistant: buildEmbeddedRunnerAssistant({ + provider: "groq", + model: "mock-2", + stopReason: "stop", + content: [{ type: "text", text: "fallback ok" }], + }), + }); + } + throw new Error(`Unexpected provider ${attemptParams.provider}`); + }); +} + function mockPrimaryErrorThenFallbackSuccess(errorMessage: string) { runEmbeddedAttemptMock.mockImplementation(async (params: unknown) => { const attemptParams = params as { provider: string; modelId: string; authProfileId?: string }; @@ -572,22 +617,7 @@ describe("runWithModelFallback + runEmbeddedPiAgent overload policy", () => { it("respects overloadedProfileRotations=0 and falls back immediately", async () => { await withAgentWorkspace(async ({ agentDir, workspaceDir }) => { - await fs.writeFile( - path.join(agentDir, "auth-profiles.json"), - JSON.stringify({ - version: 1, - profiles: { - "openai:p1": { type: "api_key", provider: "openai", key: "sk-openai-1" }, - "openai:p2": { type: "api_key", provider: "openai", key: "sk-openai-2" }, - "groq:p1": { type: "api_key", provider: "groq", key: "sk-groq" }, - }, - usageStats: { - "openai:p1": { lastUsed: 1 }, - "openai:p2": { lastUsed: 2 }, - "groq:p1": { lastUsed: 3 }, - }, - }), - ); + await writeMultiProfileAuthStore(agentDir); runEmbeddedAttemptMock.mockImplementation(async (params: unknown) => { const attemptParams = params as { provider: string }; @@ -638,4 +668,117 @@ describe("runWithModelFallback + runEmbeddedPiAgent overload policy", () => { expect(groqAttempts.length).toBe(1); }); }); + + it("caps rate-limit profile rotations and escalates to cross-provider fallback (#58572)", async () => { + await withAgentWorkspace(async ({ agentDir, workspaceDir }) => { + await writeMultiProfileAuthStore(agentDir); + + mockPrimaryErrorThenFallbackSuccess(RATE_LIMIT_ERROR_MESSAGE); + + const result = await runEmbeddedFallback({ + agentDir, + workspaceDir, + sessionKey: "agent:test:rate-limit-multi-profile-cap", + runId: "run:rate-limit-multi-profile-cap", + }); + + expect(result.provider).toBe("groq"); + expect(result.model).toBe("mock-2"); + expect(result.result.payloads?.[0]?.text ?? "").toContain("fallback ok"); + + const openaiAttempts = runEmbeddedAttemptMock.mock.calls.filter( + (call) => (call[0] as { provider?: string })?.provider === "openai", + ); + const groqAttempts = runEmbeddedAttemptMock.mock.calls.filter( + (call) => (call[0] as { provider?: string })?.provider === "groq", + ); + expect(openaiAttempts.length).toBe(2); + expect(groqAttempts.length).toBe(1); + }); + }); + + it("respects rateLimitedProfileRotations=0 and falls back immediately", async () => { + await withAgentWorkspace(async ({ agentDir, workspaceDir }) => { + await writeMultiProfileAuthStore(agentDir); + + mockPrimaryErrorThenFallbackSuccess(RATE_LIMIT_ERROR_MESSAGE); + + const result = await runEmbeddedFallback({ + agentDir, + workspaceDir, + sessionKey: "agent:test:rate-limit-no-rotation", + runId: "run:rate-limit-no-rotation", + config: { + ...makeConfig(), + auth: { cooldowns: { rateLimitedProfileRotations: 0 } }, + }, + }); + + expect(result.provider).toBe("groq"); + const openaiAttempts = runEmbeddedAttemptMock.mock.calls.filter( + (call) => (call[0] as { provider?: string })?.provider === "openai", + ); + const groqAttempts = runEmbeddedAttemptMock.mock.calls.filter( + (call) => (call[0] as { provider?: string })?.provider === "groq", + ); + expect(openaiAttempts.length).toBe(1); + expect(groqAttempts.length).toBe(1); + }); + }); + + it("caps prompt-side rate-limit profile rotations before cross-provider fallback", async () => { + await withAgentWorkspace(async ({ agentDir, workspaceDir }) => { + await writeMultiProfileAuthStore(agentDir); + + mockPrimaryPromptErrorThenFallbackSuccess(RATE_LIMIT_ERROR_MESSAGE); + + const result = await runEmbeddedFallback({ + agentDir, + workspaceDir, + sessionKey: "agent:test:prompt-rate-limit-multi-profile-cap", + runId: "run:prompt-rate-limit-multi-profile-cap", + }); + + expect(result.provider).toBe("groq"); + expect(result.model).toBe("mock-2"); + + const openaiAttempts = runEmbeddedAttemptMock.mock.calls.filter( + (call) => (call[0] as { provider?: string })?.provider === "openai", + ); + const groqAttempts = runEmbeddedAttemptMock.mock.calls.filter( + (call) => (call[0] as { provider?: string })?.provider === "groq", + ); + expect(openaiAttempts.length).toBe(2); + expect(groqAttempts.length).toBe(1); + }); + }); + + it("respects prompt-side rateLimitedProfileRotations=0 and falls back immediately", async () => { + await withAgentWorkspace(async ({ agentDir, workspaceDir }) => { + await writeMultiProfileAuthStore(agentDir); + + mockPrimaryPromptErrorThenFallbackSuccess(RATE_LIMIT_ERROR_MESSAGE); + + const result = await runEmbeddedFallback({ + agentDir, + workspaceDir, + sessionKey: "agent:test:prompt-rate-limit-no-rotation", + runId: "run:prompt-rate-limit-no-rotation", + config: { + ...makeConfig(), + auth: { cooldowns: { rateLimitedProfileRotations: 0 } }, + }, + }); + + expect(result.provider).toBe("groq"); + const openaiAttempts = runEmbeddedAttemptMock.mock.calls.filter( + (call) => (call[0] as { provider?: string })?.provider === "openai", + ); + const groqAttempts = runEmbeddedAttemptMock.mock.calls.filter( + (call) => (call[0] as { provider?: string })?.provider === "groq", + ); + expect(openaiAttempts.length).toBe(1); + expect(groqAttempts.length).toBe(1); + }); + }); }); diff --git a/src/agents/pi-embedded-runner/run.ts b/src/agents/pi-embedded-runner/run.ts index fc34f8ac152..1b9963fac2f 100644 --- a/src/agents/pi-embedded-runner/run.ts +++ b/src/agents/pi-embedded-runner/run.ts @@ -78,6 +78,7 @@ import { resolveMaxRunRetryIterations, resolveOverloadFailoverBackoffMs, resolveOverloadProfileRotationLimit, + resolveRateLimitProfileRotationLimit, type RuntimeAuthState, scrubAnthropicRefusalMagic, } from "./run/helpers.js"; @@ -304,9 +305,36 @@ export async function runEmbeddedPiAgent( let autoCompactionCount = 0; let runLoopIterations = 0; let overloadProfileRotations = 0; + let rateLimitProfileRotations = 0; let timeoutCompactionAttempts = 0; const overloadFailoverBackoffMs = resolveOverloadFailoverBackoffMs(params.config); const overloadProfileRotationLimit = resolveOverloadProfileRotationLimit(params.config); + const rateLimitProfileRotationLimit = resolveRateLimitProfileRotationLimit(params.config); + const maybeEscalateRateLimitProfileFallback = (params: { + failoverProvider: string; + failoverModel: string; + logFallbackDecision: (decision: "fallback_model", extra?: { status?: number }) => void; + }) => { + rateLimitProfileRotations += 1; + if (rateLimitProfileRotations <= rateLimitProfileRotationLimit || !fallbackConfigured) { + return; + } + const status = resolveFailoverStatus("rate_limit"); + log.warn( + `rate-limit profile rotation cap reached for ${sanitizeForLog(provider)}/${sanitizeForLog(modelId)} after ${rateLimitProfileRotations} rotations; escalating to model fallback`, + ); + params.logFallbackDecision("fallback_model", { status }); + throw new FailoverError( + "The AI service is temporarily rate-limited. Please try again in a moment.", + { + reason: "rate_limit", + provider: params.failoverProvider, + model: params.failoverModel, + profileId: lastProfileId, + status, + }, + ); + }; const maybeMarkAuthProfileFailure = async (failure: { profileId?: string; reason?: AuthProfileFailureReason | null; @@ -1022,6 +1050,13 @@ export async function runEmbeddedPiAgent( fallbackConfigured, aborted, }); + if (promptFailoverReason === "rate_limit") { + maybeEscalateRateLimitProfileFallback({ + failoverProvider: provider, + failoverModel: modelId, + logFallbackDecision: logPromptFailoverDecision, + }); + } if ( promptFailoverFailure && promptFailoverReason !== "timeout" && @@ -1185,6 +1220,19 @@ export async function runEmbeddedPiAgent( } } + // For rate-limit errors, apply the same rotation cap so that + // per-model quota exhaustion (e.g. Anthropic Sonnet-only limits) + // escalates to cross-provider model fallback instead of spinning + // forever across profiles that share the same model quota. + // See: https://github.com/openclaw/openclaw/issues/58572 + if (assistantFailoverReason === "rate_limit") { + maybeEscalateRateLimitProfileFallback({ + failoverProvider: activeErrorContext.provider, + failoverModel: activeErrorContext.model, + logFallbackDecision: logAssistantFailoverDecision, + }); + } + const rotated = await advanceAuthProfile(); if (rotated) { logAssistantFailoverDecision("rotate_profile"); diff --git a/src/agents/pi-embedded-runner/run/helpers.ts b/src/agents/pi-embedded-runner/run/helpers.ts index a61b0254d13..5f257504ee6 100644 --- a/src/agents/pi-embedded-runner/run/helpers.ts +++ b/src/agents/pi-embedded-runner/run/helpers.ts @@ -27,6 +27,7 @@ export const RUNTIME_AUTH_REFRESH_MIN_DELAY_MS = 5 * 1000; export const DEFAULT_OVERLOAD_FAILOVER_BACKOFF_MS = 0; export const DEFAULT_MAX_OVERLOAD_PROFILE_ROTATIONS = 1; +export const DEFAULT_MAX_RATE_LIMIT_PROFILE_ROTATIONS = 1; export function resolveOverloadFailoverBackoffMs(cfg?: OpenClawConfig): number { return cfg?.auth?.cooldowns?.overloadedBackoffMs ?? DEFAULT_OVERLOAD_FAILOVER_BACKOFF_MS; @@ -36,6 +37,10 @@ export function resolveOverloadProfileRotationLimit(cfg?: OpenClawConfig): numbe return cfg?.auth?.cooldowns?.overloadedProfileRotations ?? DEFAULT_MAX_OVERLOAD_PROFILE_ROTATIONS; } +export function resolveRateLimitProfileRotationLimit(cfg?: OpenClawConfig): number { + return cfg?.auth?.cooldowns?.rateLimitedProfileRotations ?? DEFAULT_MAX_RATE_LIMIT_PROFILE_ROTATIONS; +} + const ANTHROPIC_MAGIC_STRING_TRIGGER_REFUSAL = "ANTHROPIC_MAGIC_STRING_TRIGGER_REFUSAL"; const ANTHROPIC_MAGIC_STRING_REPLACEMENT = "ANTHROPIC MAGIC STRING TRIGGER REFUSAL (redacted)"; diff --git a/src/auto-reply/reply/commands-status.ts b/src/auto-reply/reply/commands-status.ts index 968d14ef35d..a8870ebee6d 100644 --- a/src/auto-reply/reply/commands-status.ts +++ b/src/auto-reply/reply/commands-status.ts @@ -22,7 +22,10 @@ import { resolveUsageProviderId, } from "../../infra/provider-usage.js"; import type { MediaUnderstandingDecision } from "../../media-understanding/types.js"; -import { listTasksForAgentId, listTasksForSessionKey } from "../../tasks/task-registry.js"; +import { + listTasksForAgentIdForStatus, + listTasksForSessionKeyForStatus, +} from "../../tasks/task-status-access.js"; import { buildTaskStatusSnapshot, formatTaskStatusDetail, @@ -61,7 +64,7 @@ function shouldLoadUsageSummary(params: { } function formatSessionTaskLine(sessionKey: string): string | undefined { - const snapshot = buildTaskStatusSnapshot(listTasksForSessionKey(sessionKey)); + const snapshot = buildTaskStatusSnapshot(listTasksForSessionKeyForStatus(sessionKey)); const task = snapshot.focus; if (!task) { return undefined; @@ -79,7 +82,7 @@ function formatSessionTaskLine(sessionKey: string): string | undefined { } function formatAgentTaskCountsLine(agentId: string): string | undefined { - const snapshot = buildTaskStatusSnapshot(listTasksForAgentId(agentId)); + const snapshot = buildTaskStatusSnapshot(listTasksForAgentIdForStatus(agentId)); if (snapshot.totalCount === 0) { return undefined; } diff --git a/src/auto-reply/reply/commands-tasks.ts b/src/auto-reply/reply/commands-tasks.ts index be0343e23ab..adf37e09670 100644 --- a/src/auto-reply/reply/commands-tasks.ts +++ b/src/auto-reply/reply/commands-tasks.ts @@ -2,8 +2,11 @@ import { resolveSessionAgentId } from "../../agents/agent-scope.js"; import { logVerbose } from "../../globals.js"; import { formatDurationCompact } from "../../infra/format-time/format-duration.ts"; import { formatTimeAgo } from "../../infra/format-time/format-relative.ts"; -import { listTasksForAgentId, listTasksForSessionKey } from "../../tasks/task-registry.js"; import type { TaskRecord } from "../../tasks/task-registry.types.js"; +import { + listTasksForAgentIdForStatus, + listTasksForSessionKeyForStatus, +} from "../../tasks/task-status-access.js"; import { buildTaskStatusSnapshot } from "../../tasks/task-status.js"; import type { ReplyPayload } from "../types.js"; import type { CommandHandler, HandleCommandsParams } from "./commands-types.js"; @@ -35,7 +38,7 @@ function formatTaskHeadline(snapshot: ReturnType } function formatAgentFallbackLine(agentId: string): string | undefined { - const snapshot = buildTaskStatusSnapshot(listTasksForAgentId(agentId)); + const snapshot = buildTaskStatusSnapshot(listTasksForAgentIdForStatus(agentId)); if (snapshot.totalCount === 0) { return undefined; } @@ -75,7 +78,9 @@ function formatVisibleTask(task: TaskRecord, index: number): string { } export function buildTasksText(params: { sessionKey: string; agentId: string }): string { - const sessionSnapshot = buildTaskStatusSnapshot(listTasksForSessionKey(params.sessionKey)); + const sessionSnapshot = buildTaskStatusSnapshot( + listTasksForSessionKeyForStatus(params.sessionKey), + ); const lines = ["📋 Tasks", formatTaskHeadline(sessionSnapshot)]; if (sessionSnapshot.totalCount > 0) { diff --git a/src/config/schema.base.generated.ts b/src/config/schema.base.generated.ts index 540b999c582..7e779e00c44 100644 --- a/src/config/schema.base.generated.ts +++ b/src/config/schema.base.generated.ts @@ -799,6 +799,11 @@ export const GENERATED_BASE_CONFIG_SCHEMA = { minimum: 0, maximum: 9007199254740991, }, + rateLimitedProfileRotations: { + type: "integer", + minimum: 0, + maximum: 9007199254740991, + }, }, additionalProperties: false, }, @@ -13672,6 +13677,11 @@ export const GENERATED_BASE_CONFIG_SCHEMA = { help: "Fixed delay in milliseconds before retrying an overloaded provider/profile rotation (default: 0).", tags: ["auth", "access", "reliability", "storage"], }, + "auth.cooldowns.rateLimitedProfileRotations": { + label: "Rate-Limited Profile Rotations", + help: "Maximum same-provider auth-profile rotations allowed for rate-limit errors before switching to model fallback (default: 1).", + tags: ["auth", "access", "performance", "storage"], + }, "agents.defaults.models": { label: "Models", help: "Configured model catalog (keys are full provider/model IDs).", diff --git a/src/config/schema.help.ts b/src/config/schema.help.ts index 05f45e280b3..c08d93d989c 100644 --- a/src/config/schema.help.ts +++ b/src/config/schema.help.ts @@ -805,6 +805,8 @@ export const FIELD_HELP: Record = { "Maximum same-provider auth-profile rotations allowed for overloaded errors before switching to model fallback (default: 1).", "auth.cooldowns.overloadedBackoffMs": "Fixed delay in milliseconds before retrying an overloaded provider/profile rotation (default: 0).", + "auth.cooldowns.rateLimitedProfileRotations": + "Maximum same-provider auth-profile rotations allowed for rate-limit errors before switching to model fallback (default: 1).", "agents.defaults.workspace": "Default workspace path exposed to agent runtime tools for filesystem context and repo-aware behavior. Set this explicitly when running from wrappers so path resolution stays deterministic.", "agents.defaults.bootstrapMaxChars": diff --git a/src/config/schema.labels.ts b/src/config/schema.labels.ts index 4ccce2268a1..acefbc0d337 100644 --- a/src/config/schema.labels.ts +++ b/src/config/schema.labels.ts @@ -473,6 +473,7 @@ export const FIELD_LABELS: Record = { "auth.cooldowns.failureWindowHours": "Failover Window (hours)", "auth.cooldowns.overloadedProfileRotations": "Overloaded Profile Rotations", "auth.cooldowns.overloadedBackoffMs": "Overloaded Backoff (ms)", + "auth.cooldowns.rateLimitedProfileRotations": "Rate-Limited Profile Rotations", "agents.defaults.models": "Models", "agents.defaults.model.primary": "Primary Model", "agents.defaults.model.fallbacks": "Model Fallbacks", diff --git a/src/config/types.auth.ts b/src/config/types.auth.ts index 010295c86b3..d02461ce0a8 100644 --- a/src/config/types.auth.ts +++ b/src/config/types.auth.ts @@ -36,5 +36,10 @@ export type AuthConfig = { * Default: 0. */ overloadedBackoffMs?: number; + /** + * Maximum same-provider auth-profile rotations to allow for rate-limit + * errors before escalating to cross-provider model fallback. Default: 1. + */ + rateLimitedProfileRotations?: number; }; }; diff --git a/src/config/zod-schema.ts b/src/config/zod-schema.ts index b27a98765fb..90de086f4db 100644 --- a/src/config/zod-schema.ts +++ b/src/config/zod-schema.ts @@ -452,6 +452,7 @@ export const OpenClawSchema = z failureWindowHours: z.number().positive().optional(), overloadedProfileRotations: z.number().int().nonnegative().optional(), overloadedBackoffMs: z.number().int().nonnegative().optional(), + rateLimitedProfileRotations: z.number().int().nonnegative().optional(), }) .strict() .optional(), diff --git a/src/cron/service.jobs.test.ts b/src/cron/service.jobs.test.ts index 869b1bece9e..c310cc34307 100644 --- a/src/cron/service.jobs.test.ts +++ b/src/cron/service.jobs.test.ts @@ -1,5 +1,5 @@ import { describe, expect, it } from "vitest"; -import { applyJobPatch, createJob } from "./service/jobs.js"; +import { applyJobPatch, createJob, recomputeNextRuns } from "./service/jobs.js"; import type { CronServiceState } from "./service/state.js"; import { DEFAULT_TOP_OF_HOUR_STAGGER_MS } from "./stagger.js"; import type { CronJob, CronJobPatch } from "./types.js"; @@ -538,3 +538,33 @@ describe("createJob delivery defaults", () => { expect(job.delivery).toBeUndefined(); }); }); + +describe("recomputeNextRuns", () => { + it("backfills missing every anchorMs for legacy loaded jobs", () => { + const now = Date.parse("2026-03-01T12:00:00.000Z"); + const createdAtMs = now - 120_000; + const job: CronJob = { + id: "legacy-every", + name: "legacy-every", + enabled: true, + createdAtMs, + updatedAtMs: createdAtMs, + schedule: { kind: "every", everyMs: 60_000 }, + sessionTarget: "main", + wakeMode: "now", + payload: { kind: "systemEvent", text: "tick" }, + state: {}, + }; + const state = { + ...createMockState(now), + store: { version: 1 as const, jobs: [job] }, + } as CronServiceState; + + expect(recomputeNextRuns(state)).toBe(true); + expect(job.schedule.kind).toBe("every"); + if (job.schedule.kind === "every") { + expect(job.schedule.anchorMs).toBe(createdAtMs); + } + expect(job.state.nextRunAtMs).toBe(now); + }); +}); diff --git a/src/cron/service/jobs.ts b/src/cron/service/jobs.ts index 629b8bc3c3a..03ac0b3039b 100644 --- a/src/cron/service/jobs.ts +++ b/src/cron/service/jobs.ts @@ -238,8 +238,12 @@ export function findJobOrThrow(state: CronServiceState, id: string) { return job; } +export function isJobEnabled(job: Pick): boolean { + return job.enabled ?? true; +} + export function computeJobNextRunAtMs(job: CronJob, nowMs: number): number | undefined { - if (!job.enabled) { + if (!isJobEnabled(job)) { return undefined; } if (job.schedule.kind === "every") { @@ -295,7 +299,7 @@ export function computeJobNextRunAtMs(job: CronJob, nowMs: number): number | und } export function computeJobPreviousRunAtMs(job: CronJob, nowMs: number): number | undefined { - if (!job.enabled || job.schedule.kind !== "cron") { + if (!isJobEnabled(job) || job.schedule.kind !== "cron") { return undefined; } const previous = computeStaggeredCronPreviousRunAtMs(job, nowMs); @@ -359,7 +363,21 @@ function normalizeJobTickState(params: { state: CronServiceState; job: CronJob; changed = true; } - if (!job.enabled) { + if (job.schedule.kind === "every") { + const normalizedAnchorMs = resolveEveryAnchorMs({ + schedule: job.schedule, + fallbackAnchorMs: isFiniteTimestamp(job.createdAtMs) ? job.createdAtMs : nowMs, + }); + if (job.schedule.anchorMs !== normalizedAnchorMs) { + job.schedule = { + ...job.schedule, + anchorMs: normalizedAnchorMs, + }; + changed = true; + } + } + + if (!isJobEnabled(job)) { if (job.state.nextRunAtMs !== undefined) { job.state.nextRunAtMs = undefined; changed = true; @@ -840,7 +858,9 @@ export function isJobDue(job: CronJob, nowMs: number, opts: { forced: boolean }) if (opts.forced) { return true; } - return job.enabled && typeof job.state.nextRunAtMs === "number" && nowMs >= job.state.nextRunAtMs; + return ( + isJobEnabled(job) && typeof job.state.nextRunAtMs === "number" && nowMs >= job.state.nextRunAtMs + ); } export function resolveJobPayloadTextForMain(job: CronJob): string | undefined { diff --git a/src/cron/service/ops.ts b/src/cron/service/ops.ts index 78be6c4ea3a..868a8ac09df 100644 --- a/src/cron/service/ops.ts +++ b/src/cron/service/ops.ts @@ -11,6 +11,7 @@ import { computeJobNextRunAtMs, createJob, findJobOrThrow, + isJobEnabled, isJobDue, nextWakeAtMs, recomputeNextRuns, @@ -162,7 +163,7 @@ export async function list(state: CronServiceState, opts?: { includeDisabled?: b return await locked(state, async () => { await ensureLoadedForRead(state); const includeDisabled = opts?.includeDisabled === true; - const jobs = (state.store?.jobs ?? []).filter((j) => includeDisabled || j.enabled); + const jobs = (state.store?.jobs ?? []).filter((j) => includeDisabled || isJobEnabled(j)); return jobs.toSorted((a, b) => (a.state.nextRunAtMs ?? 0) - (b.state.nextRunAtMs ?? 0)); }); } @@ -215,10 +216,10 @@ export async function listPage(state: CronServiceState, opts?: CronListPageOptio const sortDir = opts?.sortDir ?? "asc"; const source = state.store?.jobs ?? []; const filtered = source.filter((job) => { - if (enabledFilter === "enabled" && !job.enabled) { + if (enabledFilter === "enabled" && !isJobEnabled(job)) { return false; } - if (enabledFilter === "disabled" && job.enabled) { + if (enabledFilter === "disabled" && isJobEnabled(job)) { return false; } if (!query) { @@ -307,13 +308,13 @@ export async function update(state: CronServiceState, id: string, patch: CronJob job.updatedAtMs = now; if (scheduleChanged || enabledChanged) { - if (job.enabled) { + if (isJobEnabled(job)) { job.state.nextRunAtMs = computeJobNextRunAtMs(job, now); } else { job.state.nextRunAtMs = undefined; job.state.runningAtMs = undefined; } - } else if (job.enabled) { + } else if (isJobEnabled(job)) { // Non-schedule edits should not mutate other jobs, but still repair a // missing/corrupt nextRunAtMs for the updated job. const nextRun = job.state.nextRunAtMs; diff --git a/src/cron/service/timer.ts b/src/cron/service/timer.ts index c5cad14d4f3..36442c06b53 100644 --- a/src/cron/service/timer.ts +++ b/src/cron/service/timer.ts @@ -20,6 +20,7 @@ import type { import { computeJobPreviousRunAtMs, computeJobNextRunAtMs, + isJobEnabled, nextWakeAtMs, recomputeNextRunsForMaintenance, recordScheduleComputeError, @@ -499,7 +500,7 @@ export function applyJobResult( ); } } - } else if (result.status === "error" && job.enabled) { + } else if (result.status === "error" && isJobEnabled(job)) { // Apply exponential backoff for errored jobs to prevent retry storms. const backoff = errorBackoffMs(job.state.consecutiveErrors ?? 1); let normalNext: number | undefined; @@ -527,7 +528,7 @@ export function applyJobResult( }, "cron: applying error backoff", ); - } else if (job.enabled) { + } else if (isJobEnabled(job)) { let naturalNext: number | undefined; try { naturalNext = @@ -836,7 +837,7 @@ function isRunnableJob(params: { if (!job.state) { job.state = {}; } - if (!job.enabled) { + if (!isJobEnabled(job)) { return false; } if (params.skipJobIds?.has(job.id)) { @@ -853,7 +854,7 @@ function isRunnableJob(params: { const nextRun = job.state.nextRunAtMs; if ( job.state.lastStatus === "error" && - job.enabled && + isJobEnabled(job) && typeof nextRun === "number" && typeof lastRun === "number" && nextRun > lastRun @@ -1079,7 +1080,7 @@ async function applyStartupCatchupOutcomes( let offset = staggerMs; for (const jobId of plan.deferredJobIds) { const job = state.store.jobs.find((entry) => entry.id === jobId); - if (!job || !job.enabled) { + if (!job || !isJobEnabled(job)) { continue; } job.state.nextRunAtMs = baseNow + offset; diff --git a/src/tasks/task-registry-import-boundary.test.ts b/src/tasks/task-registry-import-boundary.test.ts index b7767d5c527..4b4cc3ca3e4 100644 --- a/src/tasks/task-registry-import-boundary.test.ts +++ b/src/tasks/task-registry-import-boundary.test.ts @@ -6,9 +6,9 @@ const TASK_ROOT = path.resolve(import.meta.dirname); const SRC_ROOT = path.resolve(TASK_ROOT, ".."); const ALLOWED_IMPORTERS = new Set([ - "auto-reply/reply/commands-status.ts", "tasks/runtime-internal.ts", "tasks/task-owner-access.ts", + "tasks/task-status-access.ts", ]); async function listSourceFiles(root: string): Promise { diff --git a/src/tasks/task-status-access.ts b/src/tasks/task-status-access.ts new file mode 100644 index 00000000000..0e33b3587b9 --- /dev/null +++ b/src/tasks/task-status-access.ts @@ -0,0 +1,10 @@ +import { listTasksForAgentId, listTasksForSessionKey } from "./task-registry.js"; +import type { TaskRecord } from "./task-registry.types.js"; + +export function listTasksForSessionKeyForStatus(sessionKey: string): TaskRecord[] { + return listTasksForSessionKey(sessionKey); +} + +export function listTasksForAgentIdForStatus(agentId: string): TaskRecord[] { + return listTasksForAgentId(agentId); +}