diff --git a/CHANGELOG.md b/CHANGELOG.md index 79a5dec0456..dfe012b9065 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -55,6 +55,7 @@ Docs: https://docs.openclaw.ai - Telegram/media downloads: time out only stalled body reads so polling recovers from hung file downloads without aborting slow downloads that are still streaming data. (#40098) thanks @tysoncung. - Telegram/DM routing: dedupe inbound Telegram DMs per agent instead of per session key so the same DM cannot trigger duplicate replies when both `agent:main:main` and `agent:main:telegram:direct:` resolve for one agent. Fixes #40005. Supersedes #40116. (#40519) thanks @obviyus. - Matrix/DM routing: add safer fallback detection for broken `m.direct` homeservers, honor explicit room bindings over DM classification, and preserve room-bound agent selection for Matrix DM rooms. (#19736) Thanks @derbronko. +- Cron/Telegram announce delivery: route text-only announce jobs through the real outbound adapters after finalizing descendant output so plain Telegram targets no longer report `delivered: true` when no message actually reached Telegram. (#40575) thanks @obviyus. ## 2026.3.7 diff --git a/src/cron/isolated-agent.delivers-response-has-heartbeat-ok-but-includes.test.ts b/src/cron/isolated-agent.delivers-response-has-heartbeat-ok-but-includes.test.ts index 7b65101e8da..023c1e9eedc 100644 --- a/src/cron/isolated-agent.delivers-response-has-heartbeat-ok-but-includes.test.ts +++ b/src/cron/isolated-agent.delivers-response-has-heartbeat-ok-but-includes.test.ts @@ -4,6 +4,7 @@ import { withTempHome as withTempHomeBase } from "../../test/helpers/temp-home.j import { runEmbeddedPiAgent } from "../agents/pi-embedded.js"; import { runSubagentAnnounceFlow } from "../agents/subagent-announce.js"; import type { CliDeps } from "../cli/deps.js"; +import { callGateway } from "../gateway/call.js"; import { runCronIsolatedAgentTurn } from "./isolated-agent.js"; import { makeCfg, makeJob, writeSessionStore } from "./isolated-agent.test-harness.js"; import { setupIsolatedAgentTurnMocks } from "./isolated-agent.test-setup.js"; @@ -137,7 +138,7 @@ describe("runCronIsolatedAgentTurn", () => { }); }); - it("handles media heartbeat delivery and announce cleanup modes", async () => { + it("handles media heartbeat delivery and last-target text delivery", async () => { await withTempHome(async (home) => { const { storePath, deps } = await createTelegramDeliveryFixture(home); @@ -185,14 +186,18 @@ describe("runCronIsolatedAgentTurn", () => { }); expect(keepRes.status).toBe("ok"); - expect(runSubagentAnnounceFlow).toHaveBeenCalledTimes(1); - const keepArgs = vi.mocked(runSubagentAnnounceFlow).mock.calls[0]?.[0] as - | { cleanup?: "keep" | "delete" } - | undefined; - expect(keepArgs?.cleanup).toBe("keep"); - expect(deps.sendMessageTelegram).not.toHaveBeenCalled(); + expect(keepRes.delivered).toBe(true); + expect(runSubagentAnnounceFlow).not.toHaveBeenCalled(); + expect(deps.sendMessageTelegram).toHaveBeenCalledTimes(1); + expect(deps.sendMessageTelegram).toHaveBeenCalledWith( + "123", + "HEARTBEAT_OK 🦞", + expect.objectContaining({ accountId: undefined }), + ); + vi.mocked(deps.sendMessageTelegram).mockClear(); vi.mocked(runSubagentAnnounceFlow).mockClear(); + vi.mocked(callGateway).mockClear(); const deleteRes = await runCronIsolatedAgentTurn({ cfg, @@ -211,12 +216,25 @@ describe("runCronIsolatedAgentTurn", () => { }); expect(deleteRes.status).toBe("ok"); - expect(runSubagentAnnounceFlow).toHaveBeenCalledTimes(1); - const deleteArgs = vi.mocked(runSubagentAnnounceFlow).mock.calls[0]?.[0] as - | { cleanup?: "keep" | "delete" } - | undefined; - expect(deleteArgs?.cleanup).toBe("delete"); - expect(deps.sendMessageTelegram).not.toHaveBeenCalled(); + expect(deleteRes.delivered).toBe(true); + expect(runSubagentAnnounceFlow).not.toHaveBeenCalled(); + expect(deps.sendMessageTelegram).toHaveBeenCalledTimes(1); + expect(deps.sendMessageTelegram).toHaveBeenCalledWith( + "123", + "HEARTBEAT_OK 🦞", + expect.objectContaining({ accountId: undefined }), + ); + expect(callGateway).toHaveBeenCalledTimes(1); + expect(callGateway).toHaveBeenCalledWith( + expect.objectContaining({ + method: "sessions.delete", + params: expect.objectContaining({ + key: "agent:main:cron:job-1", + deleteTranscript: true, + emitLifecycleHooks: false, + }), + }), + ); }); }); @@ -243,70 +261,4 @@ describe("runCronIsolatedAgentTurn", () => { expect(runSubagentAnnounceFlow).not.toHaveBeenCalled(); }); }); - - it("uses a unique announce childRunId for each cron run", async () => { - await withTempHome(async (home) => { - const storePath = await writeSessionStore(home, { - lastProvider: "telegram", - lastChannel: "telegram", - lastTo: "123", - }); - const deps: CliDeps = { - sendMessageSlack: vi.fn(), - sendMessageWhatsApp: vi.fn(), - sendMessageTelegram: vi.fn(), - sendMessageDiscord: vi.fn(), - sendMessageSignal: vi.fn(), - sendMessageIMessage: vi.fn(), - }; - - vi.mocked(runEmbeddedPiAgent).mockResolvedValue({ - payloads: [{ text: "final summary" }], - meta: { - durationMs: 5, - agentMeta: { sessionId: "s", provider: "p", model: "m" }, - }, - }); - - const cfg = makeCfg(home, storePath); - const job = makeJob({ kind: "agentTurn", message: "do it" }); - job.delivery = { mode: "announce", channel: "last" }; - - const nowSpy = vi.spyOn(Date, "now"); - let now = Date.now(); - nowSpy.mockImplementation(() => now); - try { - await runCronIsolatedAgentTurn({ - cfg, - deps, - job, - message: "do it", - sessionKey: "cron:job-1", - lane: "cron", - }); - now += 5; - await runCronIsolatedAgentTurn({ - cfg, - deps, - job, - message: "do it", - sessionKey: "cron:job-1", - lane: "cron", - }); - } finally { - nowSpy.mockRestore(); - } - - expect(runSubagentAnnounceFlow).toHaveBeenCalledTimes(2); - const firstArgs = vi.mocked(runSubagentAnnounceFlow).mock.calls[0]?.[0] as - | { childRunId?: string } - | undefined; - const secondArgs = vi.mocked(runSubagentAnnounceFlow).mock.calls[1]?.[0] as - | { childRunId?: string } - | undefined; - expect(firstArgs?.childRunId).toBeTruthy(); - expect(secondArgs?.childRunId).toBeTruthy(); - expect(secondArgs?.childRunId).not.toBe(firstArgs?.childRunId); - }); - }); }); diff --git a/src/cron/isolated-agent.direct-delivery-core-channels.test.ts b/src/cron/isolated-agent.direct-delivery-core-channels.test.ts new file mode 100644 index 00000000000..1950e361068 --- /dev/null +++ b/src/cron/isolated-agent.direct-delivery-core-channels.test.ts @@ -0,0 +1,158 @@ +import "./isolated-agent.mocks.js"; +import { beforeEach, describe, expect, it } from "vitest"; +import { runSubagentAnnounceFlow } from "../agents/subagent-announce.js"; +import { discordOutbound } from "../channels/plugins/outbound/discord.js"; +import { imessageOutbound } from "../channels/plugins/outbound/imessage.js"; +import { signalOutbound } from "../channels/plugins/outbound/signal.js"; +import { slackOutbound } from "../channels/plugins/outbound/slack.js"; +import { telegramOutbound } from "../channels/plugins/outbound/telegram.js"; +import { whatsappOutbound } from "../channels/plugins/outbound/whatsapp.js"; +import type { CliDeps } from "../cli/deps.js"; +import { setActivePluginRegistry } from "../plugins/runtime.js"; +import { createOutboundTestPlugin, createTestRegistry } from "../test-utils/channel-plugins.js"; +import { createCliDeps, mockAgentPayloads } from "./isolated-agent.delivery.test-helpers.js"; +import { runCronIsolatedAgentTurn } from "./isolated-agent.js"; +import { + makeCfg, + makeJob, + withTempCronHome, + writeSessionStore, +} from "./isolated-agent.test-harness.js"; +import { setupIsolatedAgentTurnMocks } from "./isolated-agent.test-setup.js"; + +type ChannelCase = { + name: string; + channel: "slack" | "discord" | "whatsapp" | "imessage"; + to: string; + sendKey: keyof Pick< + CliDeps, + "sendMessageSlack" | "sendMessageDiscord" | "sendMessageWhatsApp" | "sendMessageIMessage" + >; + expectedTo: string; +}; + +const CASES: ChannelCase[] = [ + { + name: "Slack", + channel: "slack", + to: "channel:C12345", + sendKey: "sendMessageSlack", + expectedTo: "channel:C12345", + }, + { + name: "Discord", + channel: "discord", + to: "channel:789", + sendKey: "sendMessageDiscord", + expectedTo: "channel:789", + }, + { + name: "WhatsApp", + channel: "whatsapp", + to: "+15551234567", + sendKey: "sendMessageWhatsApp", + expectedTo: "+15551234567", + }, + { + name: "iMessage", + channel: "imessage", + to: "friend@example.com", + sendKey: "sendMessageIMessage", + expectedTo: "friend@example.com", + }, +]; + +async function runExplicitAnnounceTurn(params: { + home: string; + storePath: string; + deps: CliDeps; + channel: ChannelCase["channel"]; + to: string; +}) { + return await runCronIsolatedAgentTurn({ + cfg: makeCfg(params.home, params.storePath), + deps: params.deps, + job: { + ...makeJob({ kind: "agentTurn", message: "do it" }), + delivery: { + mode: "announce", + channel: params.channel, + to: params.to, + }, + }, + message: "do it", + sessionKey: "cron:job-1", + lane: "cron", + }); +} + +describe("runCronIsolatedAgentTurn core-channel direct delivery", () => { + beforeEach(() => { + setupIsolatedAgentTurnMocks(); + setActivePluginRegistry( + createTestRegistry([ + { + pluginId: "telegram", + plugin: createOutboundTestPlugin({ id: "telegram", outbound: telegramOutbound }), + source: "test", + }, + { + pluginId: "signal", + plugin: createOutboundTestPlugin({ id: "signal", outbound: signalOutbound }), + source: "test", + }, + { + pluginId: "slack", + plugin: createOutboundTestPlugin({ id: "slack", outbound: slackOutbound }), + source: "test", + }, + { + pluginId: "discord", + plugin: createOutboundTestPlugin({ id: "discord", outbound: discordOutbound }), + source: "test", + }, + { + pluginId: "whatsapp", + plugin: createOutboundTestPlugin({ id: "whatsapp", outbound: whatsappOutbound }), + source: "test", + }, + { + pluginId: "imessage", + plugin: createOutboundTestPlugin({ id: "imessage", outbound: imessageOutbound }), + source: "test", + }, + ]), + ); + }); + + for (const testCase of CASES) { + it(`routes ${testCase.name} text-only announce delivery through the outbound adapter`, async () => { + await withTempCronHome(async (home) => { + const storePath = await writeSessionStore(home, { lastProvider: "webchat", lastTo: "" }); + const deps = createCliDeps(); + mockAgentPayloads([{ text: "hello from cron" }]); + + const res = await runExplicitAnnounceTurn({ + home, + storePath, + deps, + channel: testCase.channel, + to: testCase.to, + }); + + expect(res.status).toBe("ok"); + expect(res.delivered).toBe(true); + expect(res.deliveryAttempted).toBe(true); + expect(runSubagentAnnounceFlow).not.toHaveBeenCalled(); + + const sendFn = deps[testCase.sendKey]; + expect(sendFn).toHaveBeenCalledTimes(1); + expect(sendFn).toHaveBeenCalledWith( + testCase.expectedTo, + "hello from cron", + expect.any(Object), + ); + }); + }); + } +}); diff --git a/src/cron/isolated-agent.direct-delivery-forum-topics.test.ts b/src/cron/isolated-agent.direct-delivery-forum-topics.test.ts index 7f7df209418..836369fedb6 100644 --- a/src/cron/isolated-agent.direct-delivery-forum-topics.test.ts +++ b/src/cron/isolated-agent.direct-delivery-forum-topics.test.ts @@ -48,12 +48,12 @@ describe("runCronIsolatedAgentTurn forum topic delivery", () => { }); expect(plainRes.status).toBe("ok"); - expect(runSubagentAnnounceFlow).toHaveBeenCalledTimes(1); - const announceArgs = vi.mocked(runSubagentAnnounceFlow).mock.calls[0]?.[0] as - | { expectsCompletionMessage?: boolean } - | undefined; - expect(announceArgs?.expectsCompletionMessage).toBe(true); - expect(deps.sendMessageTelegram).not.toHaveBeenCalled(); + expect(plainRes.delivered).toBe(true); + expect(runSubagentAnnounceFlow).not.toHaveBeenCalled(); + expectDirectTelegramDelivery(deps, { + chatId: "123", + text: "plain message", + }); }); }); }); diff --git a/src/cron/isolated-agent.mocks.ts b/src/cron/isolated-agent.mocks.ts index 913f5ab74d4..72e031dc3f4 100644 --- a/src/cron/isolated-agent.mocks.ts +++ b/src/cron/isolated-agent.mocks.ts @@ -26,5 +26,9 @@ vi.mock("../agents/subagent-announce.js", () => ({ runSubagentAnnounceFlow: vi.fn(), })); +vi.mock("../gateway/call.js", () => ({ + callGateway: vi.fn(), +})); + export const makeIsolatedAgentJob = makeIsolatedAgentJobFixture; export const makeIsolatedAgentParams = makeIsolatedAgentParamsFixture; diff --git a/src/cron/isolated-agent.skips-delivery-without-whatsapp-recipient-besteffortdeliver-true.test.ts b/src/cron/isolated-agent.skips-delivery-without-whatsapp-recipient-besteffortdeliver-true.test.ts index bc763a7a588..6b2ab85739a 100644 --- a/src/cron/isolated-agent.skips-delivery-without-whatsapp-recipient-besteffortdeliver-true.test.ts +++ b/src/cron/isolated-agent.skips-delivery-without-whatsapp-recipient-besteffortdeliver-true.test.ts @@ -104,7 +104,7 @@ async function expectStructuredTelegramFailure(params: { ); } -async function runAnnounceFlowResult(bestEffort: boolean) { +async function runTelegramDeliveryResult(bestEffort: boolean) { let outcome: | { res: Awaited>; @@ -113,7 +113,6 @@ async function runAnnounceFlowResult(bestEffort: boolean) { | undefined; await withTelegramAnnounceFixture(async ({ home, storePath, deps }) => { mockAgentPayloads([{ text: "hello from cron" }]); - vi.mocked(runSubagentAnnounceFlow).mockResolvedValueOnce(false); const res = await runTelegramAnnounceTurn({ home, storePath, @@ -128,12 +127,12 @@ async function runAnnounceFlowResult(bestEffort: boolean) { outcome = { res, deps }; }); if (!outcome) { - throw new Error("announce flow did not produce an outcome"); + throw new Error("telegram delivery did not produce an outcome"); } return outcome; } -async function runSignalAnnounceFlowResult(bestEffort: boolean) { +async function runSignalDeliveryResult(bestEffort: boolean) { let outcome: | { res: Awaited>; @@ -144,7 +143,6 @@ async function runSignalAnnounceFlowResult(bestEffort: boolean) { const storePath = await writeSessionStore(home, { lastProvider: "webchat", lastTo: "" }); const deps = createCliDeps(); mockAgentPayloads([{ text: "hello from cron" }]); - vi.mocked(runSubagentAnnounceFlow).mockResolvedValueOnce(false); const res = await runCronIsolatedAgentTurn({ cfg: makeCfg(home, storePath, { channels: { signal: {} }, @@ -166,12 +164,12 @@ async function runSignalAnnounceFlowResult(bestEffort: boolean) { outcome = { res, deps }; }); if (!outcome) { - throw new Error("signal announce flow did not produce an outcome"); + throw new Error("signal delivery did not produce an outcome"); } return outcome; } -async function assertExplicitTelegramTargetAnnounce(params: { +async function assertExplicitTelegramTargetDelivery(params: { home: string; storePath: string; deps: CliDeps; @@ -186,22 +184,11 @@ async function assertExplicitTelegramTargetAnnounce(params: { }); expectDeliveredOk(res); - expect(runSubagentAnnounceFlow).toHaveBeenCalledTimes(1); - const announceArgs = vi.mocked(runSubagentAnnounceFlow).mock.calls[0]?.[0] as - | { - requesterOrigin?: { channel?: string; to?: string }; - roundOneReply?: string; - bestEffortDeliver?: boolean; - } - | undefined; - expect(announceArgs?.requesterOrigin?.channel).toBe("telegram"); - expect(announceArgs?.requesterOrigin?.to).toBe("123"); - expect(announceArgs?.roundOneReply).toBe(params.expectedText); - expect(announceArgs?.bestEffortDeliver).toBe(false); - expect((announceArgs as { expectsCompletionMessage?: boolean })?.expectsCompletionMessage).toBe( - true, - ); - expect(params.deps.sendMessageTelegram).not.toHaveBeenCalled(); + expect(runSubagentAnnounceFlow).not.toHaveBeenCalled(); + expectDirectTelegramDelivery(params.deps, { + chatId: "123", + text: params.expectedText, + }); } describe("runCronIsolatedAgentTurn", () => { @@ -209,9 +196,9 @@ describe("runCronIsolatedAgentTurn", () => { setupIsolatedAgentTurnMocks(); }); - it("announces explicit targets with direct and final-payload text", async () => { + it("delivers explicit targets with direct and final-payload text", async () => { await withTelegramAnnounceFixture(async ({ home, storePath, deps }) => { - await assertExplicitTelegramTargetAnnounce({ + await assertExplicitTelegramTargetDelivery({ home, storePath, deps, @@ -219,7 +206,7 @@ describe("runCronIsolatedAgentTurn", () => { expectedText: "hello from cron", }); vi.clearAllMocks(); - await assertExplicitTelegramTargetAnnounce({ + await assertExplicitTelegramTargetDelivery({ home, storePath, deps, @@ -229,7 +216,7 @@ describe("runCronIsolatedAgentTurn", () => { }); }); - it("routes announce injection to the delivery-target session key", async () => { + it("delivers explicit targets directly with per-channel-peer session scoping", async () => { await withTelegramAnnounceFixture(async ({ home, storePath, deps }) => { mockAgentPayloads([{ text: "hello from cron" }]); @@ -254,17 +241,12 @@ describe("runCronIsolatedAgentTurn", () => { lane: "cron", }); - expect(res.status).toBe("ok"); - expect(runSubagentAnnounceFlow).toHaveBeenCalledTimes(1); - const announceArgs = vi.mocked(runSubagentAnnounceFlow).mock.calls[0]?.[0] as - | { - requesterSessionKey?: string; - requesterOrigin?: { channel?: string; to?: string }; - } - | undefined; - expect(announceArgs?.requesterSessionKey).toBe("agent:main:telegram:direct:123"); - expect(announceArgs?.requesterOrigin?.channel).toBe("telegram"); - expect(announceArgs?.requesterOrigin?.to).toBe("123"); + expectDeliveredOk(res); + expect(runSubagentAnnounceFlow).not.toHaveBeenCalled(); + expectDirectTelegramDelivery(deps, { + chatId: "123", + text: "hello from cron", + }); }); }); @@ -359,12 +341,42 @@ describe("runCronIsolatedAgentTurn", () => { }); }); - it("falls back to direct delivery when announce reports false and best-effort is disabled", async () => { + it("reports not-delivered when text direct delivery fails and best-effort is enabled", async () => { + await withTelegramAnnounceFixture( + async ({ home, storePath, deps }) => { + mockAgentPayloads([{ text: "hello from cron" }]); + + const res = await runTelegramAnnounceTurn({ + home, + storePath, + deps, + delivery: { + mode: "announce", + channel: "telegram", + to: "123", + bestEffort: true, + }, + }); + + expect(res.status).toBe("ok"); + expect(res.delivered).toBe(false); + expect(res.deliveryAttempted).toBe(true); + expect(runSubagentAnnounceFlow).not.toHaveBeenCalled(); + expect(deps.sendMessageTelegram).toHaveBeenCalledTimes(1); + }, + { + deps: { + sendMessageTelegram: vi.fn().mockRejectedValue(new Error("boom")), + }, + }, + ); + }); + + it("delivers text directly when best-effort is disabled", async () => { await withTempHome(async (home) => { const storePath = await writeSessionStore(home, { lastProvider: "webchat", lastTo: "" }); const deps = createCliDeps(); mockAgentPayloads([{ text: "hello from cron" }]); - vi.mocked(runSubagentAnnounceFlow).mockResolvedValueOnce(false); const res = await runTelegramAnnounceTurn({ home, @@ -378,63 +390,124 @@ describe("runCronIsolatedAgentTurn", () => { }, }); - // When announce delivery fails, the direct-delivery fallback fires - // so the message still reaches the target channel. expect(res.status).toBe("ok"); expect(res.delivered).toBe(true); expect(res.deliveryAttempted).toBe(true); - expect(deps.sendMessageTelegram).toHaveBeenCalledTimes(1); + expect(runSubagentAnnounceFlow).not.toHaveBeenCalled(); + expectDirectTelegramDelivery(deps, { + chatId: "123", + text: "hello from cron", + }); }); }); - it("falls back to direct delivery when announce reports false and best-effort is enabled", async () => { - const { res, deps } = await runAnnounceFlowResult(true); - expect(res.status).toBe("ok"); - expect(res.delivered).toBe(true); - expect(res.deliveryAttempted).toBe(true); - expect(runSubagentAnnounceFlow).toHaveBeenCalledTimes(1); - expect(deps.sendMessageTelegram).toHaveBeenCalledTimes(1); + it("returns error when text direct delivery fails and best-effort is disabled", async () => { + await withTelegramAnnounceFixture( + async ({ home, storePath, deps }) => { + mockAgentPayloads([{ text: "hello from cron" }]); + + const res = await runTelegramAnnounceTurn({ + home, + storePath, + deps, + delivery: { + mode: "announce", + channel: "telegram", + to: "123", + bestEffort: false, + }, + }); + + expect(res.status).toBe("error"); + expect(res.delivered).toBeUndefined(); + expect(res.deliveryAttempted).toBe(true); + expect(res.error).toContain("boom"); + expect(runSubagentAnnounceFlow).not.toHaveBeenCalled(); + expect(deps.sendMessageTelegram).toHaveBeenCalledTimes(1); + }, + { + deps: { + sendMessageTelegram: vi.fn().mockRejectedValue(new Error("boom")), + }, + }, + ); }); - it("falls back to direct delivery for signal when announce reports false and best-effort is enabled", async () => { - const { res, deps } = await runSignalAnnounceFlowResult(true); - expect(res.status).toBe("ok"); - expect(res.delivered).toBe(true); - expect(res.deliveryAttempted).toBe(true); - expect(runSubagentAnnounceFlow).toHaveBeenCalledTimes(1); - expect(deps.sendMessageSignal).toHaveBeenCalledTimes(1); - }); + it("retries transient text direct delivery failures before succeeding", async () => { + const previousFastMode = process.env.OPENCLAW_TEST_FAST; + process.env.OPENCLAW_TEST_FAST = "1"; + try { + await withTelegramAnnounceFixture( + async ({ home, storePath, deps }) => { + mockAgentPayloads([{ text: "hello from cron" }]); - it("falls back to direct delivery when announce flow throws and best-effort is disabled", async () => { - await withTempHome(async (home) => { - const storePath = await writeSessionStore(home, { lastProvider: "webchat", lastTo: "" }); - const deps = createCliDeps(); - mockAgentPayloads([{ text: "hello from cron" }]); - vi.mocked(runSubagentAnnounceFlow).mockRejectedValueOnce( - new Error("gateway closed (1008): pairing required"), + const res = await runTelegramAnnounceTurn({ + home, + storePath, + deps, + delivery: { + mode: "announce", + channel: "telegram", + to: "123", + bestEffort: false, + }, + }); + + expect(res.status).toBe("ok"); + expect(res.delivered).toBe(true); + expect(res.deliveryAttempted).toBe(true); + expect(runSubagentAnnounceFlow).not.toHaveBeenCalled(); + expect(deps.sendMessageTelegram).toHaveBeenCalledTimes(2); + expect(deps.sendMessageTelegram).toHaveBeenLastCalledWith( + "123", + "hello from cron", + expect.objectContaining({ cfg: expect.any(Object) }), + ); + }, + { + deps: { + sendMessageTelegram: vi + .fn() + .mockRejectedValueOnce(new Error("UNAVAILABLE: temporary network error")) + .mockResolvedValue({ messageId: 7, chatId: "123", text: "hello from cron" }), + }, + }, ); + } finally { + if (previousFastMode === undefined) { + delete process.env.OPENCLAW_TEST_FAST; + } else { + process.env.OPENCLAW_TEST_FAST = previousFastMode; + } + } + }); - const res = await runTelegramAnnounceTurn({ - home, - storePath, - deps, - delivery: { - mode: "announce", - channel: "telegram", - to: "123", - bestEffort: false, - }, - }); - - // When announce throws (e.g. "pairing required"), the direct-delivery - // fallback fires so the message still reaches the target channel. - expect(res.status).toBe("ok"); - expect(res.delivered).toBe(true); - expect(res.deliveryAttempted).toBe(true); - expect(deps.sendMessageTelegram).toHaveBeenCalledTimes(1); + it("delivers text directly when best-effort is enabled", async () => { + const { res, deps } = await runTelegramDeliveryResult(true); + expect(res.status).toBe("ok"); + expect(res.delivered).toBe(true); + expect(res.deliveryAttempted).toBe(true); + expect(runSubagentAnnounceFlow).not.toHaveBeenCalled(); + expectDirectTelegramDelivery(deps, { + chatId: "123", + text: "hello from cron", }); }); + it("delivers text directly for signal when best-effort is enabled", async () => { + const { res, deps } = await runSignalDeliveryResult(true); + expect(res.status).toBe("ok"); + expect(res.delivered).toBe(true); + expect(res.deliveryAttempted).toBe(true); + expect(runSubagentAnnounceFlow).not.toHaveBeenCalled(); + expect(deps.sendMessageSignal).toHaveBeenCalledTimes(1); + expect(deps.sendMessageSignal).toHaveBeenCalledWith( + "+15551234567", + "hello from cron", + expect.any(Object), + ); + }); + it("ignores structured direct delivery failures when best-effort is enabled", async () => { await expectBestEffortTelegramNotDelivered({ text: "hello from cron", diff --git a/src/cron/isolated-agent.test-setup.ts b/src/cron/isolated-agent.test-setup.ts index 6a776b323d9..e6357531ad3 100644 --- a/src/cron/isolated-agent.test-setup.ts +++ b/src/cron/isolated-agent.test-setup.ts @@ -4,6 +4,7 @@ import { runEmbeddedPiAgent } from "../agents/pi-embedded.js"; import { runSubagentAnnounceFlow } from "../agents/subagent-announce.js"; import { signalOutbound } from "../channels/plugins/outbound/signal.js"; import { telegramOutbound } from "../channels/plugins/outbound/telegram.js"; +import { callGateway } from "../gateway/call.js"; import { setActivePluginRegistry } from "../plugins/runtime.js"; import { createOutboundTestPlugin, createTestRegistry } from "../test-utils/channel-plugins.js"; @@ -14,6 +15,7 @@ export function setupIsolatedAgentTurnMocks(params?: { fast?: boolean }): void { vi.mocked(runEmbeddedPiAgent).mockReset(); vi.mocked(loadModelCatalog).mockResolvedValue([]); vi.mocked(runSubagentAnnounceFlow).mockReset().mockResolvedValue(true); + vi.mocked(callGateway).mockReset().mockResolvedValue({ ok: true, deleted: true }); setActivePluginRegistry( createTestRegistry([ { diff --git a/src/cron/isolated-agent/delivery-dispatch.double-announce.test.ts b/src/cron/isolated-agent/delivery-dispatch.double-announce.test.ts index abaf1ae5349..f9a7d90a276 100644 --- a/src/cron/isolated-agent/delivery-dispatch.double-announce.test.ts +++ b/src/cron/isolated-agent/delivery-dispatch.double-announce.test.ts @@ -1,10 +1,10 @@ /** * Tests for the double-announce bug in cron delivery dispatch. * - * Bug: early return paths in deliverViaAnnounce (active subagent suppression + * Bug: early return paths in text finalization (active subagent suppression * and stale interim message suppression) returned without setting * deliveryAttempted = true. The timer saw deliveryAttempted = false and - * fired enqueueSystemEvent as a fallback, causing a second announcement. + * fired enqueueSystemEvent as a fallback, causing a second delivery. * * Fix: both early return paths now set deliveryAttempted = true before * returning so the timer correctly skips the system-event fallback. @@ -14,23 +14,10 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; // --- Module mocks (must be hoisted before imports) --- -vi.mock("../../agents/subagent-announce.js", () => ({ - runSubagentAnnounceFlow: vi.fn().mockResolvedValue(true), -})); - vi.mock("../../agents/subagent-registry.js", () => ({ countActiveDescendantRuns: vi.fn().mockReturnValue(0), })); -vi.mock("../../config/sessions.js", () => ({ - resolveAgentMainSessionKey: vi.fn().mockReturnValue("agent:main"), -})); - -vi.mock("../../infra/outbound/outbound-session.js", () => ({ - resolveOutboundSessionRoute: vi.fn().mockResolvedValue(null), - ensureOutboundSessionEntry: vi.fn().mockResolvedValue(undefined), -})); - vi.mock("../../infra/outbound/deliver.js", () => ({ deliverOutboundPayloads: vi.fn().mockResolvedValue([{ ok: true }]), })); @@ -58,9 +45,9 @@ vi.mock("./subagent-followup.js", () => ({ waitForDescendantSubagentSummary: vi.fn().mockResolvedValue(undefined), })); -import { runSubagentAnnounceFlow } from "../../agents/subagent-announce.js"; // Import after mocks import { countActiveDescendantRuns } from "../../agents/subagent-registry.js"; +import { deliverOutboundPayloads } from "../../infra/outbound/deliver.js"; import { shouldEnqueueCronMainSummary } from "../heartbeat-policy.js"; import { dispatchCronDelivery } from "./delivery-dispatch.js"; import type { DeliveryTargetResolution } from "./delivery-target.js"; @@ -145,7 +132,6 @@ describe("dispatchCronDelivery — double-announce guard", () => { vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false); vi.mocked(readDescendantSubagentFallbackReply).mockResolvedValue(undefined); vi.mocked(waitForDescendantSubagentSummary).mockResolvedValue(undefined); - vi.mocked(runSubagentAnnounceFlow).mockResolvedValue(true); }); it("early return (active subagent) sets deliveryAttempted=true so timer skips enqueueSystemEvent", async () => { @@ -173,7 +159,7 @@ describe("dispatchCronDelivery — double-announce guard", () => { ).toBe(false); // No announce should have been attempted (subagents still running) - expect(runSubagentAnnounceFlow).not.toHaveBeenCalled(); + expect(deliverOutboundPayloads).not.toHaveBeenCalled(); }); it("early return (stale interim suppression) sets deliveryAttempted=true so timer skips enqueueSystemEvent", async () => { @@ -204,45 +190,42 @@ describe("dispatchCronDelivery — double-announce guard", () => { }), ).toBe(false); - // No announce or direct delivery should have been sent (stale interim suppressed) - expect(runSubagentAnnounceFlow).not.toHaveBeenCalled(); + // No direct delivery should have been sent (stale interim suppressed) + expect(deliverOutboundPayloads).not.toHaveBeenCalled(); }); - it("consolidates descendant output into the cron announce path", async () => { + it("consolidates descendant output into the final direct delivery", async () => { vi.mocked(countActiveDescendantRuns).mockReturnValue(0); vi.mocked(isLikelyInterimCronMessage).mockReturnValue(true); vi.mocked(readDescendantSubagentFallbackReply).mockResolvedValue( "Detailed child result, everything finished successfully.", ); - vi.mocked(runSubagentAnnounceFlow).mockResolvedValue(true); const params = makeBaseParams({ synthesizedText: "on it" }); const state = await dispatchCronDelivery(params); expect(state.deliveryAttempted).toBe(true); expect(state.delivered).toBe(true); - expect(runSubagentAnnounceFlow).toHaveBeenCalledTimes(1); - expect(runSubagentAnnounceFlow).toHaveBeenCalledWith( + expect(deliverOutboundPayloads).toHaveBeenCalledTimes(1); + expect(deliverOutboundPayloads).toHaveBeenCalledWith( expect.objectContaining({ - roundOneReply: "Detailed child result, everything finished successfully.", - expectsCompletionMessage: true, - announceType: "cron job", + channel: "telegram", + to: "123456", + payloads: [{ text: "Detailed child result, everything finished successfully." }], }), ); }); - it("normal announce success delivers exactly once and sets deliveryAttempted=true", async () => { + it("normal text delivery sends exactly once and sets deliveryAttempted=true", async () => { vi.mocked(countActiveDescendantRuns).mockReturnValue(0); vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false); - vi.mocked(runSubagentAnnounceFlow).mockResolvedValue(true); const params = makeBaseParams({ synthesizedText: "Morning briefing complete." }); const state = await dispatchCronDelivery(params); expect(state.deliveryAttempted).toBe(true); expect(state.delivered).toBe(true); - // Announce called exactly once - expect(runSubagentAnnounceFlow).toHaveBeenCalledTimes(1); + expect(deliverOutboundPayloads).toHaveBeenCalledTimes(1); // Timer should not fire enqueueSystemEvent (delivered=true) expect( @@ -257,13 +240,9 @@ describe("dispatchCronDelivery — double-announce guard", () => { ).toBe(false); }); - it("announce failure falls back to direct delivery exactly once (no double-deliver)", async () => { + it("text delivery fires exactly once (no double-deliver)", async () => { vi.mocked(countActiveDescendantRuns).mockReturnValue(0); vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false); - // Announce fails: runSubagentAnnounceFlow returns false - vi.mocked(runSubagentAnnounceFlow).mockResolvedValue(false); - - const { deliverOutboundPayloads } = await import("../../infra/outbound/deliver.js"); vi.mocked(deliverOutboundPayloads).mockResolvedValue([{ ok: true } as never]); const params = makeBaseParams({ synthesizedText: "Briefing ready." }); @@ -273,23 +252,17 @@ describe("dispatchCronDelivery — double-announce guard", () => { expect(state.deliveryAttempted).toBe(true); expect(state.delivered).toBe(true); - // Announce was tried exactly once - expect(runSubagentAnnounceFlow).toHaveBeenCalledTimes(1); - - // Direct fallback fired exactly once (not zero, not twice) - // This ensures one delivery total reaches the user, not two expect(deliverOutboundPayloads).toHaveBeenCalledTimes(1); }); - it("no delivery requested means deliveryAttempted stays false and runSubagentAnnounceFlow not called", async () => { + it("no delivery requested means deliveryAttempted stays false and no delivery is sent", async () => { const params = makeBaseParams({ synthesizedText: "Task done.", deliveryRequested: false, }); const state = await dispatchCronDelivery(params); - expect(runSubagentAnnounceFlow).not.toHaveBeenCalled(); - // deliveryAttempted starts false (skipMessagingToolDelivery=false) and nothing runs + expect(deliverOutboundPayloads).not.toHaveBeenCalled(); expect(state.deliveryAttempted).toBe(false); }); }); diff --git a/src/cron/isolated-agent/delivery-dispatch.ts b/src/cron/isolated-agent/delivery-dispatch.ts index fffa5fcb8b8..a3a98b245d0 100644 --- a/src/cron/isolated-agent/delivery-dispatch.ts +++ b/src/cron/isolated-agent/delivery-dispatch.ts @@ -1,16 +1,12 @@ -import { runSubagentAnnounceFlow } from "../../agents/subagent-announce.js"; import { countActiveDescendantRuns } from "../../agents/subagent-registry.js"; import { SILENT_REPLY_TOKEN } from "../../auto-reply/tokens.js"; import type { ReplyPayload } from "../../auto-reply/types.js"; import { createOutboundSendDeps, type CliDeps } from "../../cli/outbound-send-deps.js"; import type { OpenClawConfig } from "../../config/config.js"; -import { resolveAgentMainSessionKey } from "../../config/sessions.js"; +import { callGateway } from "../../gateway/call.js"; +import { sleepWithAbort } from "../../infra/backoff.js"; import { deliverOutboundPayloads } from "../../infra/outbound/deliver.js"; import { resolveAgentOutboundIdentity } from "../../infra/outbound/identity.js"; -import { - ensureOutboundSessionEntry, - resolveOutboundSessionRoute, -} from "../../infra/outbound/outbound-session.js"; import { buildOutboundSessionContext } from "../../infra/outbound/session-context.js"; import { logWarn } from "../../logger.js"; import type { CronJob, CronRunTelemetry } from "../types.js"; @@ -71,53 +67,6 @@ export function resolveCronDeliveryBestEffort(job: CronJob): boolean { return false; } -async function resolveCronAnnounceSessionKey(params: { - cfg: OpenClawConfig; - agentId: string; - fallbackSessionKey: string; - delivery: { - channel: NonNullable; - to?: string; - accountId?: string; - threadId?: string | number; - }; -}): Promise { - const to = params.delivery.to?.trim(); - if (!to) { - return params.fallbackSessionKey; - } - try { - const route = await resolveOutboundSessionRoute({ - cfg: params.cfg, - channel: params.delivery.channel, - agentId: params.agentId, - accountId: params.delivery.accountId, - target: to, - threadId: params.delivery.threadId, - }); - const resolved = route?.sessionKey?.trim(); - if (route && resolved) { - // Ensure the session entry exists so downstream announce / queue delivery - // can look up channel metadata (lastChannel, to, sessionId). Named agents - // may not have a session entry for this target yet, causing announce - // delivery to silently fail (#32432). - await ensureOutboundSessionEntry({ - cfg: params.cfg, - agentId: params.agentId, - channel: params.delivery.channel, - accountId: params.delivery.accountId, - route, - }).catch(() => { - // Best-effort: don't block delivery on session entry creation. - }); - return resolved; - } - } catch { - // Fall back to main session routing if announce session resolution fails. - } - return params.fallbackSessionKey; -} - export type SuccessfulDeliveryTarget = Extract; type DispatchCronDeliveryParams = { @@ -160,6 +109,86 @@ export type DispatchCronDeliveryState = { deliveryPayloads: ReplyPayload[]; }; +const TRANSIENT_DIRECT_CRON_DELIVERY_ERROR_PATTERNS: readonly RegExp[] = [ + /\berrorcode=unavailable\b/i, + /\bstatus\s*[:=]\s*"?unavailable\b/i, + /\bUNAVAILABLE\b/, + /no active .* listener/i, + /gateway not connected/i, + /gateway closed \(1006/i, + /gateway timeout/i, + /\b(econnreset|econnrefused|etimedout|enotfound|ehostunreach|network error)\b/i, +]; + +const PERMANENT_DIRECT_CRON_DELIVERY_ERROR_PATTERNS: readonly RegExp[] = [ + /unsupported channel/i, + /unknown channel/i, + /chat not found/i, + /user not found/i, + /bot was blocked by the user/i, + /forbidden: bot was kicked/i, + /recipient is not a valid/i, + /outbound not configured for channel/i, +]; + +function summarizeDirectCronDeliveryError(error: unknown): string { + if (error instanceof Error) { + return error.message || "error"; + } + if (typeof error === "string") { + return error; + } + try { + return JSON.stringify(error) || String(error); + } catch { + return String(error); + } +} + +function isTransientDirectCronDeliveryError(error: unknown): boolean { + const message = summarizeDirectCronDeliveryError(error); + if (!message) { + return false; + } + if (PERMANENT_DIRECT_CRON_DELIVERY_ERROR_PATTERNS.some((re) => re.test(message))) { + return false; + } + return TRANSIENT_DIRECT_CRON_DELIVERY_ERROR_PATTERNS.some((re) => re.test(message)); +} + +function resolveDirectCronRetryDelaysMs(): readonly number[] { + return process.env.OPENCLAW_TEST_FAST === "1" ? [8, 16, 32] : [5_000, 10_000, 20_000]; +} + +async function retryTransientDirectCronDelivery(params: { + jobId: string; + signal?: AbortSignal; + run: () => Promise; +}): Promise { + const retryDelaysMs = resolveDirectCronRetryDelaysMs(); + let retryIndex = 0; + for (;;) { + if (params.signal?.aborted) { + throw new Error("cron delivery aborted"); + } + try { + return await params.run(); + } catch (err) { + const delayMs = retryDelaysMs[retryIndex]; + if (delayMs == null || !isTransientDirectCronDeliveryError(err) || params.signal?.aborted) { + throw err; + } + const nextAttempt = retryIndex + 2; + const maxAttempts = retryDelaysMs.length + 1; + logWarn( + `[cron:${params.jobId}] transient direct announce delivery failure, retrying ${nextAttempt}/${maxAttempts} in ${Math.round(delayMs / 1000)}s: ${summarizeDirectCronDeliveryError(err)}`, + ); + retryIndex += 1; + await sleepWithAbort(delayMs, params.signal); + } + } +} + export async function dispatchCronDelivery( params: DispatchCronDeliveryParams, ): Promise { @@ -172,12 +201,6 @@ export async function dispatchCronDelivery( // Keep this strict so timer fallback can safely decide whether to wake main. let delivered = params.skipMessagingToolDelivery; let deliveryAttempted = params.skipMessagingToolDelivery; - // Tracks whether `runSubagentAnnounceFlow` was actually called. Early - // returns from `deliverViaAnnounce` (active subagents, interim suppression, - // SILENT_REPLY_TOKEN) are intentional suppressions — not delivery failures — - // so the direct-delivery fallback must only fire when the announce send was - // actually attempted and failed. - let announceDeliveryWasAttempted = false; const failDeliveryTarget = (error: string) => params.withRunSession({ status: "error", @@ -191,6 +214,7 @@ export async function dispatchCronDelivery( const deliverViaDirect = async ( delivery: SuccessfulDeliveryTarget, + options?: { retryTransient?: boolean }, ): Promise => { const identity = resolveAgentOutboundIdentity(params.cfgWithAgentDefaults, params.agentId); try { @@ -217,19 +241,27 @@ export async function dispatchCronDelivery( agentId: params.agentId, sessionKey: params.agentSessionKey, }); - const deliveryResults = await deliverOutboundPayloads({ - cfg: params.cfgWithAgentDefaults, - channel: delivery.channel, - to: delivery.to, - accountId: delivery.accountId, - threadId: delivery.threadId, - payloads: payloadsForDelivery, - session: deliverySession, - identity, - bestEffort: params.deliveryBestEffort, - deps: createOutboundSendDeps(params.deps), - abortSignal: params.abortSignal, - }); + const runDelivery = async () => + await deliverOutboundPayloads({ + cfg: params.cfgWithAgentDefaults, + channel: delivery.channel, + to: delivery.to, + accountId: delivery.accountId, + threadId: delivery.threadId, + payloads: payloadsForDelivery, + session: deliverySession, + identity, + bestEffort: params.deliveryBestEffort, + deps: createOutboundSendDeps(params.deps), + abortSignal: params.abortSignal, + }); + const deliveryResults = options?.retryTransient + ? await retryTransientDirectCronDelivery({ + jobId: params.job.id, + signal: params.abortSignal, + run: runDelivery, + }) + : await runDelivery(); delivered = deliveryResults.length > 0; return null; } catch (err) { @@ -247,31 +279,31 @@ export async function dispatchCronDelivery( } }; - const deliverViaAnnounce = async ( + const finalizeTextDelivery = async ( delivery: SuccessfulDeliveryTarget, ): Promise => { + const cleanupDirectCronSessionIfNeeded = async (): Promise => { + if (!params.job.deleteAfterRun) { + return; + } + try { + await callGateway({ + method: "sessions.delete", + params: { + key: params.agentSessionKey, + deleteTranscript: true, + emitLifecycleHooks: false, + }, + timeoutMs: 10_000, + }); + } catch { + // Best-effort; direct delivery result should still be returned. + } + }; + if (!synthesizedText) { return null; } - const announceMainSessionKey = resolveAgentMainSessionKey({ - cfg: params.cfg, - agentId: params.agentId, - }); - const announceSessionKey = await resolveCronAnnounceSessionKey({ - cfg: params.cfgWithAgentDefaults, - agentId: params.agentId, - fallbackSessionKey: announceMainSessionKey, - delivery: { - channel: delivery.channel, - to: delivery.to, - accountId: delivery.accountId, - threadId: delivery.threadId, - }, - }); - const taskLabel = - typeof params.job.name === "string" && params.job.name.trim() - ? params.job.name.trim() - : `cron:${params.job.id}`; const initialSynthesizedText = synthesizedText.trim(); let activeSubagentRuns = countActiveDescendantRuns(params.agentSessionKey); const expectedSubagentFollowup = expectsSubagentFollowup(initialSynthesizedText); @@ -357,84 +389,19 @@ export async function dispatchCronDelivery( ...params.telemetry, }); } - try { - if (params.isAborted()) { - return params.withRunSession({ - status: "error", - error: params.abortReason(), - deliveryAttempted, - ...params.telemetry, - }); - } - deliveryAttempted = true; - announceDeliveryWasAttempted = true; - const didAnnounce = await runSubagentAnnounceFlow({ - childSessionKey: params.agentSessionKey, - childRunId: `${params.job.id}:${params.runSessionId}:${params.runStartedAt}`, - requesterSessionKey: announceSessionKey, - requesterOrigin: { - channel: delivery.channel, - to: delivery.to, - accountId: delivery.accountId, - threadId: delivery.threadId, - }, - requesterDisplayKey: announceSessionKey, - task: taskLabel, - timeoutMs: params.timeoutMs, - cleanup: params.job.deleteAfterRun ? "delete" : "keep", - roundOneReply: synthesizedText, - // Cron output is a finished completion message: send it directly to the - // target channel via the completion-direct-send path rather than injecting - // a trigger message into the (likely idle) main agent session. - expectsCompletionMessage: true, - // Keep delivery outcome truthful for cron state: if outbound send fails, - // announce flow must report false so caller can apply best-effort policy. - bestEffortDeliver: false, - waitForCompletion: false, - startedAt: params.runStartedAt, - endedAt: params.runEndedAt, - outcome: { status: "ok" }, - announceType: "cron job", - signal: params.abortSignal, + if (params.isAborted()) { + return params.withRunSession({ + status: "error", + error: params.abortReason(), + deliveryAttempted, + ...params.telemetry, }); - if (didAnnounce) { - delivered = true; - } else { - // Announce delivery failed but the agent execution itself succeeded. - // Return ok so the job isn't penalized for a transient delivery issue - // (e.g. "pairing required" when no active client session exists). - // Delivery failure is tracked separately via delivered/deliveryAttempted. - const message = "cron announce delivery failed"; - logWarn(`[cron:${params.job.id}] ${message}`); - if (!params.deliveryBestEffort) { - return params.withRunSession({ - status: "ok", - summary, - outputText, - error: message, - delivered: false, - deliveryAttempted, - ...params.telemetry, - }); - } - } - } catch (err) { - // Same as above: announce delivery errors should not mark a successful - // agent execution as failed. - logWarn(`[cron:${params.job.id}] ${String(err)}`); - if (!params.deliveryBestEffort) { - return params.withRunSession({ - status: "ok", - summary, - outputText, - error: String(err), - delivered: false, - deliveryAttempted, - ...params.telemetry, - }); - } } - return null; + try { + return await deliverViaDirect(delivery, { retryTransient: true }); + } finally { + await cleanupDirectCronSessionIfNeeded(); + } }; if ( @@ -472,14 +439,9 @@ export async function dispatchCronDelivery( }; } - // Route text-only cron announce output back through the main session so it - // follows the same system-message injection path as subagent completions. - // Keep direct outbound delivery only for structured payloads (media/channel - // data), which cannot be represented by the shared announce flow. - // - // Forum/topic targets should also use direct delivery. Announce flow can - // be swallowed by ANNOUNCE_SKIP/NO_REPLY in the target agent turn, which - // silently drops cron output for topic-bound sessions. + // Finalize descendant/subagent output first for text-only cron runs, then + // send through the real outbound adapter so delivered=true always reflects + // an actual channel send instead of internal announce routing. const useDirectDelivery = params.deliveryPayloadHasStructuredContent || params.resolvedDelivery.threadId != null; if (useDirectDelivery) { @@ -496,41 +458,10 @@ export async function dispatchCronDelivery( }; } } else { - const announceResult = await deliverViaAnnounce(params.resolvedDelivery); - // Fall back to direct delivery only when the announce send was actually - // attempted and failed. Early returns from deliverViaAnnounce (active - // subagents, interim suppression, SILENT_REPLY_TOKEN) are intentional - // suppressions that must NOT trigger direct delivery — doing so would - // bypass the suppression guard and leak partial/stale content. - if (announceDeliveryWasAttempted && !delivered && !params.isAborted()) { - const directFallback = await deliverViaDirect(params.resolvedDelivery); - if (directFallback) { - return { - result: directFallback, - delivered, - deliveryAttempted, - summary, - outputText, - synthesizedText, - deliveryPayloads, - }; - } - // If direct delivery succeeded (returned null without error), - // `delivered` has been set to true by deliverViaDirect. - if (delivered) { - return { - delivered, - deliveryAttempted, - summary, - outputText, - synthesizedText, - deliveryPayloads, - }; - } - } - if (announceResult) { + const finalizedTextResult = await finalizeTextDelivery(params.resolvedDelivery); + if (finalizedTextResult) { return { - result: announceResult, + result: finalizedTextResult, delivered, deliveryAttempted, summary,