mirror of https://github.com/openclaw/openclaw.git
Merge branch 'main' into fix-openclaw-home-nested-dir
This commit is contained in:
commit
369d30bf70
|
|
@ -74,6 +74,8 @@ Docs: https://docs.openclaw.ai
|
|||
- Signal/config validation: add `channels.signal.groups` schema support so per-group `requireMention`, `tools`, and `toolsBySender` overrides no longer get rejected during config validation. (#27199) Thanks @unisone.
|
||||
- Config/discovery: accept `discovery.wideArea.domain` in strict config validation so unicast DNS-SD gateway configs no longer fail with an unrecognized-key error. (#35615) Thanks @ingyukoh.
|
||||
- Telegram/media errors: redact Telegram file URLs before building media fetch errors so failed inbound downloads do not leak bot tokens into logs. Thanks @space08.
|
||||
- Agents/failover: normalize abort-wrapped `429 RESOURCE_EXHAUSTED` provider failures before abort short-circuiting so wrapped Google/Vertex rate limits continue across configured fallback models, including the embedded runner prompt-error path. (#39820) Thanks @lupuletic.
|
||||
- Mattermost/thread routing: non-inbound reply paths (TUI/WebUI turns, tool-call callbacks, subagent responses) now correctly route to the originating Mattermost thread when `replyToMode: "all"` is active; also prevents stale `origin.threadId` metadata from resurrecting cleared thread routes. (#44283) thanks @teconomix
|
||||
|
||||
## 2026.3.12
|
||||
|
||||
|
|
|
|||
|
|
@ -355,6 +355,53 @@ describe("mattermostPlugin", () => {
|
|||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("uses threadId as fallback when replyToId is absent (sendText)", async () => {
|
||||
const sendText = mattermostPlugin.outbound?.sendText;
|
||||
if (!sendText) {
|
||||
return;
|
||||
}
|
||||
|
||||
await sendText({
|
||||
to: "channel:CHAN1",
|
||||
text: "hello",
|
||||
accountId: "default",
|
||||
threadId: "post-root",
|
||||
} as any);
|
||||
|
||||
expect(sendMessageMattermostMock).toHaveBeenCalledWith(
|
||||
"channel:CHAN1",
|
||||
"hello",
|
||||
expect.objectContaining({
|
||||
accountId: "default",
|
||||
replyToId: "post-root",
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("uses threadId as fallback when replyToId is absent (sendMedia)", async () => {
|
||||
const sendMedia = mattermostPlugin.outbound?.sendMedia;
|
||||
if (!sendMedia) {
|
||||
return;
|
||||
}
|
||||
|
||||
await sendMedia({
|
||||
to: "channel:CHAN1",
|
||||
text: "caption",
|
||||
mediaUrl: "https://example.com/image.png",
|
||||
accountId: "default",
|
||||
threadId: "post-root",
|
||||
} as any);
|
||||
|
||||
expect(sendMessageMattermostMock).toHaveBeenCalledWith(
|
||||
"channel:CHAN1",
|
||||
"caption",
|
||||
expect.objectContaining({
|
||||
accountId: "default",
|
||||
replyToId: "post-root",
|
||||
}),
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
describe("config", () => {
|
||||
|
|
|
|||
|
|
@ -390,21 +390,30 @@ export const mattermostPlugin: ChannelPlugin<ResolvedMattermostAccount> = {
|
|||
}
|
||||
return { ok: true, to: trimmed };
|
||||
},
|
||||
sendText: async ({ cfg, to, text, accountId, replyToId }) => {
|
||||
sendText: async ({ cfg, to, text, accountId, replyToId, threadId }) => {
|
||||
const result = await sendMessageMattermost(to, text, {
|
||||
cfg,
|
||||
accountId: accountId ?? undefined,
|
||||
replyToId: replyToId ?? undefined,
|
||||
replyToId: replyToId ?? (threadId != null ? String(threadId) : undefined),
|
||||
});
|
||||
return { channel: "mattermost", ...result };
|
||||
},
|
||||
sendMedia: async ({ cfg, to, text, mediaUrl, mediaLocalRoots, accountId, replyToId }) => {
|
||||
sendMedia: async ({
|
||||
cfg,
|
||||
to,
|
||||
text,
|
||||
mediaUrl,
|
||||
mediaLocalRoots,
|
||||
accountId,
|
||||
replyToId,
|
||||
threadId,
|
||||
}) => {
|
||||
const result = await sendMessageMattermost(to, text, {
|
||||
cfg,
|
||||
accountId: accountId ?? undefined,
|
||||
mediaUrl,
|
||||
mediaLocalRoots,
|
||||
replyToId: replyToId ?? undefined,
|
||||
replyToId: replyToId ?? (threadId != null ? String(threadId) : undefined),
|
||||
});
|
||||
return { channel: "mattermost", ...result };
|
||||
},
|
||||
|
|
|
|||
|
|
@ -364,6 +364,23 @@ describe("failover-error", () => {
|
|||
expect(isTimeoutError(err)).toBe(true);
|
||||
});
|
||||
|
||||
it("classifies abort-wrapped RESOURCE_EXHAUSTED as rate_limit", () => {
|
||||
const err = Object.assign(new Error("request aborted"), {
|
||||
name: "AbortError",
|
||||
cause: {
|
||||
error: {
|
||||
code: 429,
|
||||
message: GEMINI_RESOURCE_EXHAUSTED_MESSAGE,
|
||||
status: "RESOURCE_EXHAUSTED",
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
expect(resolveFailoverReasonFromError(err)).toBe("rate_limit");
|
||||
expect(coerceToFailoverError(err)?.reason).toBe("rate_limit");
|
||||
expect(coerceToFailoverError(err)?.status).toBe(429);
|
||||
});
|
||||
|
||||
it("coerces failover-worthy errors into FailoverError with metadata", () => {
|
||||
const err = coerceToFailoverError("credit balance too low", {
|
||||
provider: "anthropic",
|
||||
|
|
|
|||
|
|
@ -68,7 +68,30 @@ export function resolveFailoverStatus(reason: FailoverReason): number | undefine
|
|||
}
|
||||
}
|
||||
|
||||
function getStatusCode(err: unknown): number | undefined {
|
||||
function findErrorProperty<T>(
|
||||
err: unknown,
|
||||
reader: (candidate: unknown) => T | undefined,
|
||||
seen: Set<object> = new Set(),
|
||||
): T | undefined {
|
||||
const direct = reader(err);
|
||||
if (direct !== undefined) {
|
||||
return direct;
|
||||
}
|
||||
if (!err || typeof err !== "object") {
|
||||
return undefined;
|
||||
}
|
||||
if (seen.has(err)) {
|
||||
return undefined;
|
||||
}
|
||||
seen.add(err);
|
||||
const candidate = err as { error?: unknown; cause?: unknown };
|
||||
return (
|
||||
findErrorProperty(candidate.error, reader, seen) ??
|
||||
findErrorProperty(candidate.cause, reader, seen)
|
||||
);
|
||||
}
|
||||
|
||||
function readDirectStatusCode(err: unknown): number | undefined {
|
||||
if (!err || typeof err !== "object") {
|
||||
return undefined;
|
||||
}
|
||||
|
|
@ -84,38 +107,87 @@ function getStatusCode(err: unknown): number | undefined {
|
|||
return undefined;
|
||||
}
|
||||
|
||||
function getErrorCode(err: unknown): string | undefined {
|
||||
function getStatusCode(err: unknown): number | undefined {
|
||||
return findErrorProperty(err, readDirectStatusCode);
|
||||
}
|
||||
|
||||
function readDirectErrorCode(err: unknown): string | undefined {
|
||||
if (!err || typeof err !== "object") {
|
||||
return undefined;
|
||||
}
|
||||
const candidate = (err as { code?: unknown }).code;
|
||||
if (typeof candidate !== "string") {
|
||||
const directCode = (err as { code?: unknown }).code;
|
||||
if (typeof directCode === "string") {
|
||||
const trimmed = directCode.trim();
|
||||
return trimmed ? trimmed : undefined;
|
||||
}
|
||||
const status = (err as { status?: unknown }).status;
|
||||
if (typeof status !== "string" || /^\d+$/.test(status)) {
|
||||
return undefined;
|
||||
}
|
||||
const trimmed = candidate.trim();
|
||||
const trimmed = status.trim();
|
||||
return trimmed ? trimmed : undefined;
|
||||
}
|
||||
|
||||
function getErrorMessage(err: unknown): string {
|
||||
function getErrorCode(err: unknown): string | undefined {
|
||||
return findErrorProperty(err, readDirectErrorCode);
|
||||
}
|
||||
|
||||
function readDirectErrorMessage(err: unknown): string | undefined {
|
||||
if (err instanceof Error) {
|
||||
return err.message;
|
||||
return err.message || undefined;
|
||||
}
|
||||
if (typeof err === "string") {
|
||||
return err;
|
||||
return err || undefined;
|
||||
}
|
||||
if (typeof err === "number" || typeof err === "boolean" || typeof err === "bigint") {
|
||||
return String(err);
|
||||
}
|
||||
if (typeof err === "symbol") {
|
||||
return err.description ?? "";
|
||||
return err.description ?? undefined;
|
||||
}
|
||||
if (err && typeof err === "object") {
|
||||
const message = (err as { message?: unknown }).message;
|
||||
if (typeof message === "string") {
|
||||
return message;
|
||||
return message || undefined;
|
||||
}
|
||||
}
|
||||
return "";
|
||||
return undefined;
|
||||
}
|
||||
|
||||
function getErrorMessage(err: unknown): string {
|
||||
return findErrorProperty(err, readDirectErrorMessage) ?? "";
|
||||
}
|
||||
|
||||
function getErrorCause(err: unknown): unknown {
|
||||
if (!err || typeof err !== "object" || !("cause" in err)) {
|
||||
return undefined;
|
||||
}
|
||||
return (err as { cause?: unknown }).cause;
|
||||
}
|
||||
|
||||
/** Classify rate-limit / overloaded from symbolic error codes like RESOURCE_EXHAUSTED. */
|
||||
function classifyFailoverReasonFromSymbolicCode(raw: string | undefined): FailoverReason | null {
|
||||
const normalized = raw?.trim().toUpperCase();
|
||||
if (!normalized) {
|
||||
return null;
|
||||
}
|
||||
switch (normalized) {
|
||||
case "RESOURCE_EXHAUSTED":
|
||||
case "RATE_LIMIT":
|
||||
case "RATE_LIMITED":
|
||||
case "RATE_LIMIT_EXCEEDED":
|
||||
case "TOO_MANY_REQUESTS":
|
||||
case "THROTTLED":
|
||||
case "THROTTLING":
|
||||
case "THROTTLINGEXCEPTION":
|
||||
case "THROTTLING_EXCEPTION":
|
||||
return "rate_limit";
|
||||
case "OVERLOADED":
|
||||
case "OVERLOADED_ERROR":
|
||||
return "overloaded";
|
||||
default:
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
function hasTimeoutHint(err: unknown): boolean {
|
||||
|
|
@ -160,6 +232,12 @@ export function resolveFailoverReasonFromError(err: unknown): FailoverReason | n
|
|||
return statusReason;
|
||||
}
|
||||
|
||||
// Check symbolic error codes (e.g. RESOURCE_EXHAUSTED from Google APIs)
|
||||
const symbolicCodeReason = classifyFailoverReasonFromSymbolicCode(getErrorCode(err));
|
||||
if (symbolicCodeReason) {
|
||||
return symbolicCodeReason;
|
||||
}
|
||||
|
||||
const code = (getErrorCode(err) ?? "").toUpperCase();
|
||||
if (
|
||||
[
|
||||
|
|
@ -178,6 +256,16 @@ export function resolveFailoverReasonFromError(err: unknown): FailoverReason | n
|
|||
) {
|
||||
return "timeout";
|
||||
}
|
||||
// Walk into error cause chain *before* timeout heuristics so that a specific
|
||||
// cause (e.g. RESOURCE_EXHAUSTED wrapped in AbortError) overrides a parent
|
||||
// message-based "timeout" guess from isTimeoutError.
|
||||
const cause = getErrorCause(err);
|
||||
if (cause && cause !== err) {
|
||||
const causeReason = resolveFailoverReasonFromError(cause);
|
||||
if (causeReason) {
|
||||
return causeReason;
|
||||
}
|
||||
}
|
||||
if (isTimeoutError(err)) {
|
||||
return "timeout";
|
||||
}
|
||||
|
|
|
|||
|
|
@ -331,6 +331,77 @@ describe("runWithModelFallback – probe logic", () => {
|
|||
});
|
||||
});
|
||||
|
||||
it("keeps walking remaining fallbacks after an abort-wrapped RESOURCE_EXHAUSTED probe failure", async () => {
|
||||
const cfg = makeCfg({
|
||||
agents: {
|
||||
defaults: {
|
||||
model: {
|
||||
primary: "google/gemini-3-flash-preview",
|
||||
fallbacks: ["anthropic/claude-haiku-3-5", "deepseek/deepseek-chat"],
|
||||
},
|
||||
},
|
||||
},
|
||||
} as Partial<OpenClawConfig>);
|
||||
|
||||
mockedResolveAuthProfileOrder.mockImplementation(({ provider }: { provider: string }) => {
|
||||
if (provider === "google") {
|
||||
return ["google-profile-1"];
|
||||
}
|
||||
if (provider === "anthropic") {
|
||||
return ["anthropic-profile-1"];
|
||||
}
|
||||
if (provider === "deepseek") {
|
||||
return ["deepseek-profile-1"];
|
||||
}
|
||||
return [];
|
||||
});
|
||||
mockedIsProfileInCooldown.mockImplementation((_store, profileId: string) =>
|
||||
profileId.startsWith("google"),
|
||||
);
|
||||
mockedGetSoonestCooldownExpiry.mockReturnValue(NOW + 30 * 1000);
|
||||
mockedResolveProfilesUnavailableReason.mockReturnValue("rate_limit");
|
||||
|
||||
// Simulate Google Vertex abort-wrapped RESOURCE_EXHAUSTED (the shape that was
|
||||
// previously swallowed by shouldRethrowAbort before the fallback loop could continue)
|
||||
const primaryAbort = Object.assign(new Error("request aborted"), {
|
||||
name: "AbortError",
|
||||
cause: {
|
||||
error: {
|
||||
code: 429,
|
||||
message: "Resource has been exhausted (e.g. check quota).",
|
||||
status: "RESOURCE_EXHAUSTED",
|
||||
},
|
||||
},
|
||||
});
|
||||
const run = vi
|
||||
.fn()
|
||||
.mockRejectedValueOnce(primaryAbort)
|
||||
.mockRejectedValueOnce(
|
||||
Object.assign(new Error("fallback still rate limited"), { status: 429 }),
|
||||
)
|
||||
.mockRejectedValueOnce(
|
||||
Object.assign(new Error("final fallback still rate limited"), { status: 429 }),
|
||||
);
|
||||
|
||||
await expect(
|
||||
runWithModelFallback({
|
||||
cfg,
|
||||
provider: "google",
|
||||
model: "gemini-3-flash-preview",
|
||||
run,
|
||||
}),
|
||||
).rejects.toThrow(/All models failed \(3\)/);
|
||||
|
||||
// All three candidates must be attempted — the abort must not short-circuit
|
||||
expect(run).toHaveBeenCalledTimes(3);
|
||||
|
||||
expect(run).toHaveBeenNthCalledWith(1, "google", "gemini-3-flash-preview", {
|
||||
allowTransientCooldownProbe: true,
|
||||
});
|
||||
expect(run).toHaveBeenNthCalledWith(2, "anthropic", "claude-haiku-3-5");
|
||||
expect(run).toHaveBeenNthCalledWith(3, "deepseek", "deepseek-chat");
|
||||
});
|
||||
|
||||
it("throttles probe when called within 30s interval", async () => {
|
||||
const cfg = makeCfg();
|
||||
// Cooldown just about to expire (within probe margin)
|
||||
|
|
|
|||
|
|
@ -140,10 +140,16 @@ async function runFallbackCandidate<T>(params: {
|
|||
result,
|
||||
};
|
||||
} catch (err) {
|
||||
if (shouldRethrowAbort(err)) {
|
||||
// Normalize abort-wrapped rate-limit errors (e.g. Google Vertex RESOURCE_EXHAUSTED)
|
||||
// so they become FailoverErrors and continue the fallback loop instead of aborting.
|
||||
const normalizedFailover = coerceToFailoverError(err, {
|
||||
provider: params.provider,
|
||||
model: params.model,
|
||||
});
|
||||
if (shouldRethrowAbort(err) && !normalizedFailover) {
|
||||
throw err;
|
||||
}
|
||||
return { ok: false, error: err };
|
||||
return { ok: false, error: normalizedFailover ?? err };
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -209,9 +209,36 @@ vi.mock("../defaults.js", () => ({
|
|||
DEFAULT_PROVIDER: "anthropic",
|
||||
}));
|
||||
|
||||
type MockFailoverErrorDescription = {
|
||||
message: string;
|
||||
reason: string | undefined;
|
||||
status: number | undefined;
|
||||
code: string | undefined;
|
||||
};
|
||||
|
||||
type MockCoerceToFailoverError = (
|
||||
err: unknown,
|
||||
params?: { provider?: string; model?: string; profileId?: string },
|
||||
) => unknown;
|
||||
type MockDescribeFailoverError = (err: unknown) => MockFailoverErrorDescription;
|
||||
type MockResolveFailoverStatus = (reason: string) => number | undefined;
|
||||
|
||||
export const mockedCoerceToFailoverError = vi.fn<MockCoerceToFailoverError>();
|
||||
export const mockedDescribeFailoverError = vi.fn<MockDescribeFailoverError>(
|
||||
(err: unknown): MockFailoverErrorDescription => ({
|
||||
message: err instanceof Error ? err.message : String(err),
|
||||
reason: undefined,
|
||||
status: undefined,
|
||||
code: undefined,
|
||||
}),
|
||||
);
|
||||
export const mockedResolveFailoverStatus = vi.fn<MockResolveFailoverStatus>();
|
||||
|
||||
vi.mock("../failover-error.js", () => ({
|
||||
FailoverError: class extends Error {},
|
||||
resolveFailoverStatus: vi.fn(),
|
||||
coerceToFailoverError: mockedCoerceToFailoverError,
|
||||
describeFailoverError: mockedDescribeFailoverError,
|
||||
resolveFailoverStatus: mockedResolveFailoverStatus,
|
||||
}));
|
||||
|
||||
vi.mock("./lanes.js", () => ({
|
||||
|
|
|
|||
|
|
@ -9,7 +9,12 @@ import {
|
|||
mockOverflowRetrySuccess,
|
||||
queueOverflowAttemptWithOversizedToolOutput,
|
||||
} from "./run.overflow-compaction.fixture.js";
|
||||
import { mockedGlobalHookRunner } from "./run.overflow-compaction.mocks.shared.js";
|
||||
import {
|
||||
mockedCoerceToFailoverError,
|
||||
mockedDescribeFailoverError,
|
||||
mockedGlobalHookRunner,
|
||||
mockedResolveFailoverStatus,
|
||||
} from "./run.overflow-compaction.mocks.shared.js";
|
||||
import {
|
||||
mockedContextEngine,
|
||||
mockedCompactDirect,
|
||||
|
|
@ -25,6 +30,9 @@ describe("runEmbeddedPiAgent overflow compaction trigger routing", () => {
|
|||
vi.clearAllMocks();
|
||||
mockedRunEmbeddedAttempt.mockReset();
|
||||
mockedCompactDirect.mockReset();
|
||||
mockedCoerceToFailoverError.mockReset();
|
||||
mockedDescribeFailoverError.mockReset();
|
||||
mockedResolveFailoverStatus.mockReset();
|
||||
mockedSessionLikelyHasOversizedToolResults.mockReset();
|
||||
mockedTruncateOversizedToolResultsInSession.mockReset();
|
||||
mockedGlobalHookRunner.runBeforeAgentStart.mockReset();
|
||||
|
|
@ -36,6 +44,13 @@ describe("runEmbeddedPiAgent overflow compaction trigger routing", () => {
|
|||
compacted: false,
|
||||
reason: "nothing to compact",
|
||||
});
|
||||
mockedCoerceToFailoverError.mockReturnValue(null);
|
||||
mockedDescribeFailoverError.mockImplementation((err: unknown) => ({
|
||||
message: err instanceof Error ? err.message : String(err),
|
||||
reason: undefined,
|
||||
status: undefined,
|
||||
code: undefined,
|
||||
}));
|
||||
mockedSessionLikelyHasOversizedToolResults.mockReturnValue(false);
|
||||
mockedTruncateOversizedToolResultsInSession.mockResolvedValue({
|
||||
truncated: false,
|
||||
|
|
@ -255,4 +270,57 @@ describe("runEmbeddedPiAgent overflow compaction trigger routing", () => {
|
|||
expect(result.meta.error?.kind).toBe("retry_limit");
|
||||
expect(result.payloads?.[0]?.isError).toBe(true);
|
||||
});
|
||||
|
||||
it("normalizes abort-wrapped prompt errors before handing off to model fallback", async () => {
|
||||
const promptError = Object.assign(new Error("request aborted"), {
|
||||
name: "AbortError",
|
||||
cause: {
|
||||
error: {
|
||||
code: 429,
|
||||
message: "Resource has been exhausted (e.g. check quota).",
|
||||
status: "RESOURCE_EXHAUSTED",
|
||||
},
|
||||
},
|
||||
});
|
||||
const normalized = Object.assign(new Error("Resource has been exhausted (e.g. check quota)."), {
|
||||
name: "FailoverError",
|
||||
reason: "rate_limit",
|
||||
status: 429,
|
||||
});
|
||||
|
||||
mockedRunEmbeddedAttempt.mockResolvedValueOnce(makeAttemptResult({ promptError }));
|
||||
mockedCoerceToFailoverError.mockReturnValueOnce(normalized);
|
||||
mockedDescribeFailoverError.mockImplementation((err: unknown) => ({
|
||||
message: err instanceof Error ? err.message : String(err),
|
||||
reason: err === normalized ? "rate_limit" : undefined,
|
||||
status: err === normalized ? 429 : undefined,
|
||||
code: undefined,
|
||||
}));
|
||||
mockedResolveFailoverStatus.mockReturnValueOnce(429);
|
||||
|
||||
await expect(
|
||||
runEmbeddedPiAgent({
|
||||
...overflowBaseRunParams,
|
||||
config: {
|
||||
agents: {
|
||||
defaults: {
|
||||
model: {
|
||||
fallbacks: ["openai/gpt-5.2"],
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}),
|
||||
).rejects.toBe(normalized);
|
||||
|
||||
expect(mockedCoerceToFailoverError).toHaveBeenCalledWith(
|
||||
promptError,
|
||||
expect.objectContaining({
|
||||
provider: "anthropic",
|
||||
model: "test-model",
|
||||
profileId: "test-profile",
|
||||
}),
|
||||
);
|
||||
expect(mockedResolveFailoverStatus).toHaveBeenCalledWith("rate_limit");
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -28,7 +28,12 @@ import {
|
|||
resolveContextWindowInfo,
|
||||
} from "../context-window-guard.js";
|
||||
import { DEFAULT_CONTEXT_TOKENS, DEFAULT_MODEL, DEFAULT_PROVIDER } from "../defaults.js";
|
||||
import { FailoverError, resolveFailoverStatus } from "../failover-error.js";
|
||||
import {
|
||||
coerceToFailoverError,
|
||||
describeFailoverError,
|
||||
FailoverError,
|
||||
resolveFailoverStatus,
|
||||
} from "../failover-error.js";
|
||||
import {
|
||||
applyLocalNoAuthHeaderOverride,
|
||||
ensureAuthProfileStore,
|
||||
|
|
@ -1217,7 +1222,17 @@ export async function runEmbeddedPiAgent(
|
|||
}
|
||||
|
||||
if (promptError && !aborted) {
|
||||
const errorText = describeUnknownError(promptError);
|
||||
// Normalize wrapped errors (e.g. abort-wrapped RESOURCE_EXHAUSTED) into
|
||||
// FailoverError so rate-limit classification works even for nested shapes.
|
||||
const normalizedPromptFailover = coerceToFailoverError(promptError, {
|
||||
provider: activeErrorContext.provider,
|
||||
model: activeErrorContext.model,
|
||||
profileId: lastProfileId,
|
||||
});
|
||||
const promptErrorDetails = normalizedPromptFailover
|
||||
? describeFailoverError(normalizedPromptFailover)
|
||||
: describeFailoverError(promptError);
|
||||
const errorText = promptErrorDetails.message || describeUnknownError(promptError);
|
||||
if (await maybeRefreshCopilotForAuthError(errorText, copilotAuthRetry)) {
|
||||
authRetryPending = true;
|
||||
continue;
|
||||
|
|
@ -1281,14 +1296,16 @@ export async function runEmbeddedPiAgent(
|
|||
},
|
||||
};
|
||||
}
|
||||
const promptFailoverReason = classifyFailoverReason(errorText);
|
||||
const promptFailoverReason =
|
||||
promptErrorDetails.reason ?? classifyFailoverReason(errorText);
|
||||
const promptProfileFailureReason =
|
||||
resolveAuthProfileFailureReason(promptFailoverReason);
|
||||
await maybeMarkAuthProfileFailure({
|
||||
profileId: lastProfileId,
|
||||
reason: promptProfileFailureReason,
|
||||
});
|
||||
const promptFailoverFailure = isFailoverErrorMessage(errorText);
|
||||
const promptFailoverFailure =
|
||||
promptFailoverReason !== null || isFailoverErrorMessage(errorText);
|
||||
// Capture the failing profile before auth-profile rotation mutates `lastProfileId`.
|
||||
const failedPromptProfileId = lastProfileId;
|
||||
const logPromptFailoverDecision = createFailoverDecisionLogger({
|
||||
|
|
@ -1330,13 +1347,16 @@ export async function runEmbeddedPiAgent(
|
|||
const status = resolveFailoverStatus(promptFailoverReason ?? "unknown");
|
||||
logPromptFailoverDecision("fallback_model", { status });
|
||||
await maybeBackoffBeforeOverloadFailover(promptFailoverReason);
|
||||
throw new FailoverError(errorText, {
|
||||
reason: promptFailoverReason ?? "unknown",
|
||||
provider,
|
||||
model: modelId,
|
||||
profileId: lastProfileId,
|
||||
status,
|
||||
});
|
||||
throw (
|
||||
normalizedPromptFailover ??
|
||||
new FailoverError(errorText, {
|
||||
reason: promptFailoverReason ?? "unknown",
|
||||
provider,
|
||||
model: modelId,
|
||||
profileId: lastProfileId,
|
||||
status: resolveFailoverStatus(promptFailoverReason ?? "unknown"),
|
||||
})
|
||||
);
|
||||
}
|
||||
if (promptFailoverFailure || promptFailoverReason) {
|
||||
logPromptFailoverDecision("surface_error");
|
||||
|
|
|
|||
|
|
@ -41,6 +41,12 @@ const acpMocks = vi.hoisted(() => ({
|
|||
const sessionBindingMocks = vi.hoisted(() => ({
|
||||
listBySession: vi.fn<(targetSessionKey: string) => SessionBindingRecord[]>(() => []),
|
||||
}));
|
||||
const sessionStoreMocks = vi.hoisted(() => ({
|
||||
currentEntry: undefined as Record<string, unknown> | undefined,
|
||||
loadSessionStore: vi.fn(() => ({})),
|
||||
resolveStorePath: vi.fn(() => "/tmp/mock-sessions.json"),
|
||||
resolveSessionStoreEntry: vi.fn(() => ({ existing: sessionStoreMocks.currentEntry })),
|
||||
}));
|
||||
const ttsMocks = vi.hoisted(() => {
|
||||
const state = {
|
||||
synthesizeFinalAudio: false,
|
||||
|
|
@ -77,9 +83,16 @@ vi.mock("./route-reply.js", () => ({
|
|||
isRoutableChannel: (channel: string | undefined) =>
|
||||
Boolean(
|
||||
channel &&
|
||||
["telegram", "slack", "discord", "signal", "imessage", "whatsapp", "feishu"].includes(
|
||||
channel,
|
||||
),
|
||||
[
|
||||
"telegram",
|
||||
"slack",
|
||||
"discord",
|
||||
"signal",
|
||||
"imessage",
|
||||
"whatsapp",
|
||||
"feishu",
|
||||
"mattermost",
|
||||
].includes(channel),
|
||||
),
|
||||
routeReply: mocks.routeReply,
|
||||
}));
|
||||
|
|
@ -100,6 +113,15 @@ vi.mock("../../logging/diagnostic.js", () => ({
|
|||
logMessageProcessed: diagnosticMocks.logMessageProcessed,
|
||||
logSessionStateChange: diagnosticMocks.logSessionStateChange,
|
||||
}));
|
||||
vi.mock("../../config/sessions.js", async (importOriginal) => {
|
||||
const actual = await importOriginal<typeof import("../../config/sessions.js")>();
|
||||
return {
|
||||
...actual,
|
||||
loadSessionStore: sessionStoreMocks.loadSessionStore,
|
||||
resolveStorePath: sessionStoreMocks.resolveStorePath,
|
||||
resolveSessionStoreEntry: sessionStoreMocks.resolveSessionStoreEntry,
|
||||
};
|
||||
});
|
||||
|
||||
vi.mock("../../plugins/hook-runner-global.js", () => ({
|
||||
getGlobalHookRunner: () => hookMocks.runner,
|
||||
|
|
@ -228,6 +250,10 @@ describe("dispatchReplyFromConfig", () => {
|
|||
acpMocks.requireAcpRuntimeBackend.mockReset();
|
||||
sessionBindingMocks.listBySession.mockReset();
|
||||
sessionBindingMocks.listBySession.mockReturnValue([]);
|
||||
sessionStoreMocks.currentEntry = undefined;
|
||||
sessionStoreMocks.loadSessionStore.mockClear();
|
||||
sessionStoreMocks.resolveStorePath.mockClear();
|
||||
sessionStoreMocks.resolveSessionStoreEntry.mockClear();
|
||||
ttsMocks.state.synthesizeFinalAudio = false;
|
||||
ttsMocks.maybeApplyTtsToPayload.mockClear();
|
||||
ttsMocks.normalizeTtsAutoMode.mockClear();
|
||||
|
|
@ -293,6 +319,88 @@ describe("dispatchReplyFromConfig", () => {
|
|||
);
|
||||
});
|
||||
|
||||
it("falls back to thread-scoped session key when current ctx has no MessageThreadId", async () => {
|
||||
setNoAbort();
|
||||
mocks.routeReply.mockClear();
|
||||
sessionStoreMocks.currentEntry = {
|
||||
deliveryContext: {
|
||||
channel: "mattermost",
|
||||
to: "channel:CHAN1",
|
||||
accountId: "default",
|
||||
},
|
||||
origin: {
|
||||
threadId: "stale-origin-root",
|
||||
},
|
||||
lastThreadId: "stale-origin-root",
|
||||
};
|
||||
const cfg = emptyConfig;
|
||||
const dispatcher = createDispatcher();
|
||||
const ctx = buildTestCtx({
|
||||
Provider: "webchat",
|
||||
Surface: "webchat",
|
||||
SessionKey: "agent:main:mattermost:channel:CHAN1:thread:post-root",
|
||||
AccountId: "default",
|
||||
MessageThreadId: undefined,
|
||||
OriginatingChannel: "mattermost",
|
||||
OriginatingTo: "channel:CHAN1",
|
||||
ExplicitDeliverRoute: true,
|
||||
});
|
||||
|
||||
const replyResolver = async () => ({ text: "hi" }) satisfies ReplyPayload;
|
||||
await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver });
|
||||
|
||||
expect(mocks.routeReply).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
channel: "mattermost",
|
||||
to: "channel:CHAN1",
|
||||
threadId: "post-root",
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("does not resurrect a cleared route thread from origin metadata", async () => {
|
||||
setNoAbort();
|
||||
mocks.routeReply.mockClear();
|
||||
// Simulate the real store: lastThreadId and deliveryContext.threadId may be normalised from
|
||||
// origin.threadId on read, but a non-thread session key must still route to channel root.
|
||||
sessionStoreMocks.currentEntry = {
|
||||
deliveryContext: {
|
||||
channel: "mattermost",
|
||||
to: "channel:CHAN1",
|
||||
accountId: "default",
|
||||
threadId: "stale-root",
|
||||
},
|
||||
lastThreadId: "stale-root",
|
||||
origin: {
|
||||
threadId: "stale-root",
|
||||
},
|
||||
};
|
||||
const cfg = emptyConfig;
|
||||
const dispatcher = createDispatcher();
|
||||
const ctx = buildTestCtx({
|
||||
Provider: "webchat",
|
||||
Surface: "webchat",
|
||||
SessionKey: "agent:main:mattermost:channel:CHAN1",
|
||||
AccountId: "default",
|
||||
MessageThreadId: undefined,
|
||||
OriginatingChannel: "mattermost",
|
||||
OriginatingTo: "channel:CHAN1",
|
||||
ExplicitDeliverRoute: true,
|
||||
});
|
||||
|
||||
const replyResolver = async () => ({ text: "hi" }) satisfies ReplyPayload;
|
||||
await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver });
|
||||
|
||||
const routeCall = mocks.routeReply.mock.calls[0]?.[0] as
|
||||
| { channel?: string; to?: string; threadId?: string | number }
|
||||
| undefined;
|
||||
expect(routeCall).toMatchObject({
|
||||
channel: "mattermost",
|
||||
to: "channel:CHAN1",
|
||||
});
|
||||
expect(routeCall?.threadId).toBeUndefined();
|
||||
});
|
||||
|
||||
it("forces suppressTyping when routing to a different originating channel", async () => {
|
||||
setNoAbort();
|
||||
const cfg = emptyConfig;
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@ import { resolveSessionAgentId } from "../../agents/agent-scope.js";
|
|||
import type { OpenClawConfig } from "../../config/config.js";
|
||||
import {
|
||||
loadSessionStore,
|
||||
parseSessionThreadInfo,
|
||||
resolveSessionStoreEntry,
|
||||
resolveStorePath,
|
||||
type SessionEntry,
|
||||
|
|
@ -172,6 +173,12 @@ export async function dispatchReplyFromConfig(params: {
|
|||
|
||||
const sessionStoreEntry = resolveSessionStoreLookup(ctx, cfg);
|
||||
const acpDispatchSessionKey = sessionStoreEntry.sessionKey ?? sessionKey;
|
||||
// Restore route thread context only from the active turn or the thread-scoped session key.
|
||||
// Do not read thread ids from the normalised session store here: `origin.threadId` can be
|
||||
// folded back into lastThreadId/deliveryContext during store normalisation and resurrect a
|
||||
// stale route after thread delivery was intentionally cleared.
|
||||
const routeThreadId =
|
||||
ctx.MessageThreadId ?? parseSessionThreadInfo(acpDispatchSessionKey).threadId;
|
||||
const inboundAudio = isInboundAudioContext(ctx);
|
||||
const sessionTtsAuto = normalizeTtsAutoMode(sessionStoreEntry.entry?.ttsAuto);
|
||||
const hookRunner = getGlobalHookRunner();
|
||||
|
|
@ -260,7 +267,7 @@ export async function dispatchReplyFromConfig(params: {
|
|||
to: originatingTo,
|
||||
sessionKey: ctx.SessionKey,
|
||||
accountId: ctx.AccountId,
|
||||
threadId: ctx.MessageThreadId,
|
||||
threadId: routeThreadId,
|
||||
cfg,
|
||||
abortSignal,
|
||||
mirror,
|
||||
|
|
@ -289,7 +296,7 @@ export async function dispatchReplyFromConfig(params: {
|
|||
to: originatingTo,
|
||||
sessionKey: ctx.SessionKey,
|
||||
accountId: ctx.AccountId,
|
||||
threadId: ctx.MessageThreadId,
|
||||
threadId: routeThreadId,
|
||||
cfg,
|
||||
isGroup,
|
||||
groupId,
|
||||
|
|
@ -519,7 +526,7 @@ export async function dispatchReplyFromConfig(params: {
|
|||
to: originatingTo,
|
||||
sessionKey: ctx.SessionKey,
|
||||
accountId: ctx.AccountId,
|
||||
threadId: ctx.MessageThreadId,
|
||||
threadId: routeThreadId,
|
||||
cfg,
|
||||
isGroup,
|
||||
groupId,
|
||||
|
|
@ -571,7 +578,7 @@ export async function dispatchReplyFromConfig(params: {
|
|||
to: originatingTo,
|
||||
sessionKey: ctx.SessionKey,
|
||||
accountId: ctx.AccountId,
|
||||
threadId: ctx.MessageThreadId,
|
||||
threadId: routeThreadId,
|
||||
cfg,
|
||||
isGroup,
|
||||
groupId,
|
||||
|
|
|
|||
|
|
@ -1,4 +1,5 @@
|
|||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { mattermostPlugin } from "../../../extensions/mattermost/src/channel.js";
|
||||
import { discordOutbound } from "../../channels/plugins/outbound/discord.js";
|
||||
import { imessageOutbound } from "../../channels/plugins/outbound/imessage.js";
|
||||
import { signalOutbound } from "../../channels/plugins/outbound/signal.js";
|
||||
|
|
@ -24,6 +25,7 @@ const mocks = vi.hoisted(() => ({
|
|||
sendMessageSlack: vi.fn(async () => ({ messageId: "m1", channelId: "c1" })),
|
||||
sendMessageTelegram: vi.fn(async () => ({ messageId: "m1", chatId: "c1" })),
|
||||
sendMessageWhatsApp: vi.fn(async () => ({ messageId: "m1", toJid: "jid" })),
|
||||
sendMessageMattermost: vi.fn(async () => ({ messageId: "m1", channelId: "c1" })),
|
||||
deliverOutboundPayloads: vi.fn(),
|
||||
}));
|
||||
|
||||
|
|
@ -46,6 +48,9 @@ vi.mock("../../web/outbound.js", () => ({
|
|||
sendMessageWhatsApp: mocks.sendMessageWhatsApp,
|
||||
sendPollWhatsApp: mocks.sendMessageWhatsApp,
|
||||
}));
|
||||
vi.mock("../../../extensions/mattermost/src/mattermost/send.js", () => ({
|
||||
sendMessageMattermost: mocks.sendMessageMattermost,
|
||||
}));
|
||||
vi.mock("../../infra/outbound/deliver.js", async () => {
|
||||
const actual = await vi.importActual<typeof import("../../infra/outbound/deliver.js")>(
|
||||
"../../infra/outbound/deliver.js",
|
||||
|
|
@ -335,6 +340,33 @@ describe("routeReply", () => {
|
|||
);
|
||||
});
|
||||
|
||||
it("uses threadId as replyToId for Mattermost when replyToId is missing", async () => {
|
||||
mocks.deliverOutboundPayloads.mockResolvedValue([]);
|
||||
await routeReply({
|
||||
payload: { text: "hi" },
|
||||
channel: "mattermost",
|
||||
to: "channel:CHAN1",
|
||||
threadId: "post-root",
|
||||
cfg: {
|
||||
channels: {
|
||||
mattermost: {
|
||||
enabled: true,
|
||||
botToken: "test-token",
|
||||
baseUrl: "https://chat.example.com",
|
||||
},
|
||||
},
|
||||
} as unknown as OpenClawConfig,
|
||||
});
|
||||
expect(mocks.deliverOutboundPayloads).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
channel: "mattermost",
|
||||
to: "channel:CHAN1",
|
||||
replyToId: "post-root",
|
||||
threadId: "post-root",
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("sends multiple mediaUrls (caption only on first)", async () => {
|
||||
mocks.sendMessageSlack.mockClear();
|
||||
await routeReply({
|
||||
|
|
@ -501,4 +533,9 @@ const defaultRegistry = createTestRegistry([
|
|||
}),
|
||||
source: "test",
|
||||
},
|
||||
{
|
||||
pluginId: "mattermost",
|
||||
plugin: mattermostPlugin,
|
||||
source: "test",
|
||||
},
|
||||
]);
|
||||
|
|
|
|||
|
|
@ -149,7 +149,9 @@ export async function routeReply(params: RouteReplyParams): Promise<RouteReplyRe
|
|||
|
||||
const resolvedReplyToId =
|
||||
replyToId ??
|
||||
(channelId === "slack" && threadId != null && threadId !== "" ? String(threadId) : undefined);
|
||||
((channelId === "slack" || channelId === "mattermost") && threadId != null && threadId !== ""
|
||||
? String(threadId)
|
||||
: undefined);
|
||||
const resolvedThreadId = channelId === "slack" ? null : (threadId ?? null);
|
||||
|
||||
try {
|
||||
|
|
|
|||
|
|
@ -138,11 +138,10 @@ describe("runCronIsolatedAgentTurn", () => {
|
|||
});
|
||||
});
|
||||
|
||||
it("handles media heartbeat delivery and last-target text delivery", async () => {
|
||||
it("delivers media payloads even when heartbeat text is suppressed", async () => {
|
||||
await withTempHome(async (home) => {
|
||||
const { storePath, deps } = await createTelegramDeliveryFixture(home);
|
||||
|
||||
// Media should still be delivered even if text is just HEARTBEAT_OK.
|
||||
mockEmbeddedAgentPayloads([
|
||||
{ text: "HEARTBEAT_OK", mediaUrl: "https://example.com/img.png" },
|
||||
]);
|
||||
|
|
@ -156,9 +155,13 @@ describe("runCronIsolatedAgentTurn", () => {
|
|||
expect(mediaRes.status).toBe("ok");
|
||||
expect(deps.sendMessageTelegram).toHaveBeenCalled();
|
||||
expect(runSubagentAnnounceFlow).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
it("keeps non-empty heartbeat text when last-target ack suppression is disabled", async () => {
|
||||
await withTempHome(async (home) => {
|
||||
const { storePath, deps } = await createTelegramDeliveryFixture(home);
|
||||
|
||||
vi.mocked(runSubagentAnnounceFlow).mockClear();
|
||||
vi.mocked(deps.sendMessageTelegram).mockClear();
|
||||
mockEmbeddedAgentPayloads([{ text: "HEARTBEAT_OK 🦞" }]);
|
||||
|
||||
const cfg = makeCfg(home, storePath);
|
||||
|
|
@ -194,10 +197,23 @@ describe("runCronIsolatedAgentTurn", () => {
|
|||
"HEARTBEAT_OK 🦞",
|
||||
expect.objectContaining({ accountId: undefined }),
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
vi.mocked(deps.sendMessageTelegram).mockClear();
|
||||
vi.mocked(runSubagentAnnounceFlow).mockClear();
|
||||
vi.mocked(callGateway).mockClear();
|
||||
it("deletes the direct cron session after last-target text delivery", async () => {
|
||||
await withTempHome(async (home) => {
|
||||
const { storePath, deps } = await createTelegramDeliveryFixture(home);
|
||||
|
||||
mockEmbeddedAgentPayloads([{ text: "HEARTBEAT_OK 🦞" }]);
|
||||
|
||||
const cfg = makeCfg(home, storePath);
|
||||
cfg.agents = {
|
||||
...cfg.agents,
|
||||
defaults: {
|
||||
...cfg.agents?.defaults,
|
||||
heartbeat: { ackMaxChars: 0 },
|
||||
},
|
||||
};
|
||||
|
||||
const deleteRes = await runCronIsolatedAgentTurn({
|
||||
cfg,
|
||||
|
|
|
|||
|
|
@ -91,12 +91,16 @@ vi.mock("../pairing/pairing-store.js", () => ({
|
|||
upsertChannelPairingRequest: (...args: unknown[]) => upsertPairingRequestMock(...args),
|
||||
}));
|
||||
|
||||
vi.mock("../config/sessions.js", () => ({
|
||||
resolveStorePath: vi.fn(() => "/tmp/openclaw-sessions.json"),
|
||||
updateLastRoute: (...args: unknown[]) => updateLastRouteMock(...args),
|
||||
readSessionUpdatedAt: vi.fn(() => undefined),
|
||||
recordSessionMetaFromInbound: vi.fn().mockResolvedValue(undefined),
|
||||
}));
|
||||
vi.mock("../config/sessions.js", async (importOriginal) => {
|
||||
const actual = await importOriginal<typeof import("../config/sessions.js")>();
|
||||
return {
|
||||
...actual,
|
||||
resolveStorePath: vi.fn(() => "/tmp/openclaw-sessions.json"),
|
||||
updateLastRoute: (...args: unknown[]) => updateLastRouteMock(...args),
|
||||
readSessionUpdatedAt: vi.fn(() => undefined),
|
||||
recordSessionMetaFromInbound: vi.fn().mockResolvedValue(undefined),
|
||||
};
|
||||
});
|
||||
|
||||
vi.mock("./client.js", () => ({
|
||||
streamSignalEvents: (...args: unknown[]) => streamMock(...args),
|
||||
|
|
|
|||
|
|
@ -180,13 +180,17 @@ vi.mock("../pairing/pairing-store.js", () => ({
|
|||
slackTestState.upsertPairingRequestMock(...args),
|
||||
}));
|
||||
|
||||
vi.mock("../config/sessions.js", () => ({
|
||||
resolveStorePath: vi.fn(() => "/tmp/openclaw-sessions.json"),
|
||||
updateLastRoute: (...args: unknown[]) => slackTestState.updateLastRouteMock(...args),
|
||||
resolveSessionKey: vi.fn(),
|
||||
readSessionUpdatedAt: vi.fn(() => undefined),
|
||||
recordSessionMetaFromInbound: vi.fn().mockResolvedValue(undefined),
|
||||
}));
|
||||
vi.mock("../config/sessions.js", async (importOriginal) => {
|
||||
const actual = await importOriginal<typeof import("../config/sessions.js")>();
|
||||
return {
|
||||
...actual,
|
||||
resolveStorePath: vi.fn(() => "/tmp/openclaw-sessions.json"),
|
||||
updateLastRoute: (...args: unknown[]) => slackTestState.updateLastRouteMock(...args),
|
||||
resolveSessionKey: vi.fn(),
|
||||
readSessionUpdatedAt: vi.fn(() => undefined),
|
||||
recordSessionMetaFromInbound: vi.fn().mockResolvedValue(undefined),
|
||||
};
|
||||
});
|
||||
|
||||
vi.mock("@slack/bolt", () => {
|
||||
const handlers = new Map<string, SlackHandler>();
|
||||
|
|
|
|||
Loading…
Reference in New Issue