From 79c246666272c4ca8fa7a4432294e9a6d30a09aa Mon Sep 17 00:00:00 2001 From: max <40643627+quotentiroler@users.noreply.github.com> Date: Mon, 9 Feb 2026 00:32:57 -0800 Subject: [PATCH] refactor: consolidate throwIfAborted + fix isCompactionFailureError (#12463) * refactor: consolidate throwIfAborted in outbound module - Create abort.ts with shared throwIfAborted helper - Update deliver.ts, message-action-runner.ts, outbound-send-service.ts * fix: handle context overflow in isCompactionFailureError without requiring colon --- scripts/analyze_code_files.py | 2 +- src/agents/pi-embedded-helpers/errors.ts | 14 ++++++++------ src/infra/outbound/abort.ts | 15 +++++++++++++++ src/infra/outbound/deliver.ts | 7 +------ src/infra/outbound/message-action-runner.ts | 9 +-------- src/infra/outbound/outbound-send-service.ts | 9 +-------- 6 files changed, 27 insertions(+), 29 deletions(-) create mode 100644 src/infra/outbound/abort.ts diff --git a/scripts/analyze_code_files.py b/scripts/analyze_code_files.py index 027f0aefbb3..b5a666efadd 100644 --- a/scripts/analyze_code_files.py +++ b/scripts/analyze_code_files.py @@ -157,7 +157,7 @@ def find_duplicate_functions(files: List[Tuple[Path, int]], root_dir: Path) -> D def main(): parser = argparse.ArgumentParser( - description='List the longest and shortest code files in a project' + description='Analyze code files: list longest/shortest files, find duplicate function names' ) parser.add_argument( '-t', '--threshold', diff --git a/src/agents/pi-embedded-helpers/errors.ts b/src/agents/pi-embedded-helpers/errors.ts index 6c69c593925..829351e20e0 100644 --- a/src/agents/pi-embedded-helpers/errors.ts +++ b/src/agents/pi-embedded-helpers/errors.ts @@ -50,16 +50,18 @@ export function isCompactionFailureError(errorMessage?: string): boolean { if (!errorMessage) { return false; } - if (!isContextOverflowError(errorMessage)) { - return false; - } const lower = errorMessage.toLowerCase(); - return ( + const hasCompactionTerm = lower.includes("summarization failed") || lower.includes("auto-compaction") || lower.includes("compaction failed") || - lower.includes("compaction") - ); + lower.includes("compaction"); + if (!hasCompactionTerm) { + return false; + } + // For compaction failures, also accept "context overflow" without colon + // since the error message itself describes a compaction/summarization failure + return isContextOverflowError(errorMessage) || lower.includes("context overflow"); } const ERROR_PAYLOAD_PREFIX_RE = diff --git a/src/infra/outbound/abort.ts b/src/infra/outbound/abort.ts new file mode 100644 index 00000000000..8d6b0e2cf4d --- /dev/null +++ b/src/infra/outbound/abort.ts @@ -0,0 +1,15 @@ +/** + * Utility for checking AbortSignal state and throwing a standard AbortError. + */ + +/** + * Throws an AbortError if the given signal has been aborted. + * Use at async checkpoints to support cancellation. + */ +export function throwIfAborted(abortSignal?: AbortSignal): void { + if (abortSignal?.aborted) { + const err = new Error("Operation aborted"); + err.name = "AbortError"; + throw err; + } +} diff --git a/src/infra/outbound/deliver.ts b/src/infra/outbound/deliver.ts index de7931a6492..186f30a748b 100644 --- a/src/infra/outbound/deliver.ts +++ b/src/infra/outbound/deliver.ts @@ -23,6 +23,7 @@ import { } from "../../config/sessions.js"; import { markdownToSignalTextChunks, type SignalTextStyleRange } from "../../signal/format.js"; import { sendMessageSignal } from "../../signal/send.js"; +import { throwIfAborted } from "./abort.js"; import { normalizeReplyPayloadsForDelivery } from "./payloads.js"; export type { NormalizedOutboundPayload } from "./payloads.js"; @@ -74,12 +75,6 @@ type ChannelHandler = { sendMedia: (caption: string, mediaUrl: string) => Promise; }; -function throwIfAborted(abortSignal?: AbortSignal): void { - if (abortSignal?.aborted) { - throw new Error("Outbound delivery aborted"); - } -} - // Channel docking: outbound delivery delegates to plugin.outbound adapters. async function createChannelHandler(params: { cfg: OpenClawConfig; diff --git a/src/infra/outbound/message-action-runner.ts b/src/infra/outbound/message-action-runner.ts index 452c76bfa74..eddc7718708 100644 --- a/src/infra/outbound/message-action-runner.ts +++ b/src/infra/outbound/message-action-runner.ts @@ -28,6 +28,7 @@ import { type GatewayClientName, } from "../../utils/message-channel.js"; import { loadWebMedia } from "../../web/media.js"; +import { throwIfAborted } from "./abort.js"; import { listConfiguredMessageChannels, resolveMessageChannelSelection, @@ -720,14 +721,6 @@ async function handleBroadcastAction( }; } -function throwIfAborted(abortSignal?: AbortSignal): void { - if (abortSignal?.aborted) { - const err = new Error("Message send aborted"); - err.name = "AbortError"; - throw err; - } -} - async function handleSendAction(ctx: ResolvedActionContext): Promise { const { cfg, diff --git a/src/infra/outbound/outbound-send-service.ts b/src/infra/outbound/outbound-send-service.ts index cc9cb9476b8..587ab6890bd 100644 --- a/src/infra/outbound/outbound-send-service.ts +++ b/src/infra/outbound/outbound-send-service.ts @@ -6,6 +6,7 @@ import type { OutboundSendDeps } from "./deliver.js"; import type { MessagePollResult, MessageSendResult } from "./message.js"; import { dispatchChannelMessageAction } from "../../channels/plugins/message-actions.js"; import { appendAssistantMessageToSessionTranscript } from "../../config/sessions.js"; +import { throwIfAborted } from "./abort.js"; import { sendMessage, sendPoll } from "./message.js"; export type OutboundGatewayContext = { @@ -59,14 +60,6 @@ function extractToolPayload(result: AgentToolResult): unknown { return result.content ?? result; } -function throwIfAborted(abortSignal?: AbortSignal): void { - if (abortSignal?.aborted) { - const err = new Error("Message send aborted"); - err.name = "AbortError"; - throw err; - } -} - export async function executeSendAction(params: { ctx: OutboundSendContext; to: string;