Fix cron text announce delivery for Telegram targets (#40575)

Merged via squash.

Prepared head SHA: 54b1513c78
Co-authored-by: obviyus <22031114+obviyus@users.noreply.github.com>
Co-authored-by: obviyus <22031114+obviyus@users.noreply.github.com>
Reviewed-by: @obviyus
This commit is contained in:
Ayaan Zaidi 2026-03-09 10:26:17 +05:30 committed by GitHub
parent d4a960fcca
commit a40c29b11a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 516 additions and 422 deletions

View File

@ -55,6 +55,7 @@ Docs: https://docs.openclaw.ai
- Telegram/media downloads: time out only stalled body reads so polling recovers from hung file downloads without aborting slow downloads that are still streaming data. (#40098) thanks @tysoncung.
- Telegram/DM routing: dedupe inbound Telegram DMs per agent instead of per session key so the same DM cannot trigger duplicate replies when both `agent:main:main` and `agent:main:telegram:direct:<id>` resolve for one agent. Fixes #40005. Supersedes #40116. (#40519) thanks @obviyus.
- Matrix/DM routing: add safer fallback detection for broken `m.direct` homeservers, honor explicit room bindings over DM classification, and preserve room-bound agent selection for Matrix DM rooms. (#19736) Thanks @derbronko.
- Cron/Telegram announce delivery: route text-only announce jobs through the real outbound adapters after finalizing descendant output so plain Telegram targets no longer report `delivered: true` when no message actually reached Telegram. (#40575) thanks @obviyus.
## 2026.3.7

View File

@ -4,6 +4,7 @@ import { withTempHome as withTempHomeBase } from "../../test/helpers/temp-home.j
import { runEmbeddedPiAgent } from "../agents/pi-embedded.js";
import { runSubagentAnnounceFlow } from "../agents/subagent-announce.js";
import type { CliDeps } from "../cli/deps.js";
import { callGateway } from "../gateway/call.js";
import { runCronIsolatedAgentTurn } from "./isolated-agent.js";
import { makeCfg, makeJob, writeSessionStore } from "./isolated-agent.test-harness.js";
import { setupIsolatedAgentTurnMocks } from "./isolated-agent.test-setup.js";
@ -137,7 +138,7 @@ describe("runCronIsolatedAgentTurn", () => {
});
});
it("handles media heartbeat delivery and announce cleanup modes", async () => {
it("handles media heartbeat delivery and last-target text delivery", async () => {
await withTempHome(async (home) => {
const { storePath, deps } = await createTelegramDeliveryFixture(home);
@ -185,14 +186,18 @@ describe("runCronIsolatedAgentTurn", () => {
});
expect(keepRes.status).toBe("ok");
expect(runSubagentAnnounceFlow).toHaveBeenCalledTimes(1);
const keepArgs = vi.mocked(runSubagentAnnounceFlow).mock.calls[0]?.[0] as
| { cleanup?: "keep" | "delete" }
| undefined;
expect(keepArgs?.cleanup).toBe("keep");
expect(deps.sendMessageTelegram).not.toHaveBeenCalled();
expect(keepRes.delivered).toBe(true);
expect(runSubagentAnnounceFlow).not.toHaveBeenCalled();
expect(deps.sendMessageTelegram).toHaveBeenCalledTimes(1);
expect(deps.sendMessageTelegram).toHaveBeenCalledWith(
"123",
"HEARTBEAT_OK 🦞",
expect.objectContaining({ accountId: undefined }),
);
vi.mocked(deps.sendMessageTelegram).mockClear();
vi.mocked(runSubagentAnnounceFlow).mockClear();
vi.mocked(callGateway).mockClear();
const deleteRes = await runCronIsolatedAgentTurn({
cfg,
@ -211,12 +216,25 @@ describe("runCronIsolatedAgentTurn", () => {
});
expect(deleteRes.status).toBe("ok");
expect(runSubagentAnnounceFlow).toHaveBeenCalledTimes(1);
const deleteArgs = vi.mocked(runSubagentAnnounceFlow).mock.calls[0]?.[0] as
| { cleanup?: "keep" | "delete" }
| undefined;
expect(deleteArgs?.cleanup).toBe("delete");
expect(deps.sendMessageTelegram).not.toHaveBeenCalled();
expect(deleteRes.delivered).toBe(true);
expect(runSubagentAnnounceFlow).not.toHaveBeenCalled();
expect(deps.sendMessageTelegram).toHaveBeenCalledTimes(1);
expect(deps.sendMessageTelegram).toHaveBeenCalledWith(
"123",
"HEARTBEAT_OK 🦞",
expect.objectContaining({ accountId: undefined }),
);
expect(callGateway).toHaveBeenCalledTimes(1);
expect(callGateway).toHaveBeenCalledWith(
expect.objectContaining({
method: "sessions.delete",
params: expect.objectContaining({
key: "agent:main:cron:job-1",
deleteTranscript: true,
emitLifecycleHooks: false,
}),
}),
);
});
});
@ -243,70 +261,4 @@ describe("runCronIsolatedAgentTurn", () => {
expect(runSubagentAnnounceFlow).not.toHaveBeenCalled();
});
});
it("uses a unique announce childRunId for each cron run", async () => {
await withTempHome(async (home) => {
const storePath = await writeSessionStore(home, {
lastProvider: "telegram",
lastChannel: "telegram",
lastTo: "123",
});
const deps: CliDeps = {
sendMessageSlack: vi.fn(),
sendMessageWhatsApp: vi.fn(),
sendMessageTelegram: vi.fn(),
sendMessageDiscord: vi.fn(),
sendMessageSignal: vi.fn(),
sendMessageIMessage: vi.fn(),
};
vi.mocked(runEmbeddedPiAgent).mockResolvedValue({
payloads: [{ text: "final summary" }],
meta: {
durationMs: 5,
agentMeta: { sessionId: "s", provider: "p", model: "m" },
},
});
const cfg = makeCfg(home, storePath);
const job = makeJob({ kind: "agentTurn", message: "do it" });
job.delivery = { mode: "announce", channel: "last" };
const nowSpy = vi.spyOn(Date, "now");
let now = Date.now();
nowSpy.mockImplementation(() => now);
try {
await runCronIsolatedAgentTurn({
cfg,
deps,
job,
message: "do it",
sessionKey: "cron:job-1",
lane: "cron",
});
now += 5;
await runCronIsolatedAgentTurn({
cfg,
deps,
job,
message: "do it",
sessionKey: "cron:job-1",
lane: "cron",
});
} finally {
nowSpy.mockRestore();
}
expect(runSubagentAnnounceFlow).toHaveBeenCalledTimes(2);
const firstArgs = vi.mocked(runSubagentAnnounceFlow).mock.calls[0]?.[0] as
| { childRunId?: string }
| undefined;
const secondArgs = vi.mocked(runSubagentAnnounceFlow).mock.calls[1]?.[0] as
| { childRunId?: string }
| undefined;
expect(firstArgs?.childRunId).toBeTruthy();
expect(secondArgs?.childRunId).toBeTruthy();
expect(secondArgs?.childRunId).not.toBe(firstArgs?.childRunId);
});
});
});

View File

@ -0,0 +1,158 @@
import "./isolated-agent.mocks.js";
import { beforeEach, describe, expect, it } from "vitest";
import { runSubagentAnnounceFlow } from "../agents/subagent-announce.js";
import { discordOutbound } from "../channels/plugins/outbound/discord.js";
import { imessageOutbound } from "../channels/plugins/outbound/imessage.js";
import { signalOutbound } from "../channels/plugins/outbound/signal.js";
import { slackOutbound } from "../channels/plugins/outbound/slack.js";
import { telegramOutbound } from "../channels/plugins/outbound/telegram.js";
import { whatsappOutbound } from "../channels/plugins/outbound/whatsapp.js";
import type { CliDeps } from "../cli/deps.js";
import { setActivePluginRegistry } from "../plugins/runtime.js";
import { createOutboundTestPlugin, createTestRegistry } from "../test-utils/channel-plugins.js";
import { createCliDeps, mockAgentPayloads } from "./isolated-agent.delivery.test-helpers.js";
import { runCronIsolatedAgentTurn } from "./isolated-agent.js";
import {
makeCfg,
makeJob,
withTempCronHome,
writeSessionStore,
} from "./isolated-agent.test-harness.js";
import { setupIsolatedAgentTurnMocks } from "./isolated-agent.test-setup.js";
type ChannelCase = {
name: string;
channel: "slack" | "discord" | "whatsapp" | "imessage";
to: string;
sendKey: keyof Pick<
CliDeps,
"sendMessageSlack" | "sendMessageDiscord" | "sendMessageWhatsApp" | "sendMessageIMessage"
>;
expectedTo: string;
};
const CASES: ChannelCase[] = [
{
name: "Slack",
channel: "slack",
to: "channel:C12345",
sendKey: "sendMessageSlack",
expectedTo: "channel:C12345",
},
{
name: "Discord",
channel: "discord",
to: "channel:789",
sendKey: "sendMessageDiscord",
expectedTo: "channel:789",
},
{
name: "WhatsApp",
channel: "whatsapp",
to: "+15551234567",
sendKey: "sendMessageWhatsApp",
expectedTo: "+15551234567",
},
{
name: "iMessage",
channel: "imessage",
to: "friend@example.com",
sendKey: "sendMessageIMessage",
expectedTo: "friend@example.com",
},
];
async function runExplicitAnnounceTurn(params: {
home: string;
storePath: string;
deps: CliDeps;
channel: ChannelCase["channel"];
to: string;
}) {
return await runCronIsolatedAgentTurn({
cfg: makeCfg(params.home, params.storePath),
deps: params.deps,
job: {
...makeJob({ kind: "agentTurn", message: "do it" }),
delivery: {
mode: "announce",
channel: params.channel,
to: params.to,
},
},
message: "do it",
sessionKey: "cron:job-1",
lane: "cron",
});
}
describe("runCronIsolatedAgentTurn core-channel direct delivery", () => {
beforeEach(() => {
setupIsolatedAgentTurnMocks();
setActivePluginRegistry(
createTestRegistry([
{
pluginId: "telegram",
plugin: createOutboundTestPlugin({ id: "telegram", outbound: telegramOutbound }),
source: "test",
},
{
pluginId: "signal",
plugin: createOutboundTestPlugin({ id: "signal", outbound: signalOutbound }),
source: "test",
},
{
pluginId: "slack",
plugin: createOutboundTestPlugin({ id: "slack", outbound: slackOutbound }),
source: "test",
},
{
pluginId: "discord",
plugin: createOutboundTestPlugin({ id: "discord", outbound: discordOutbound }),
source: "test",
},
{
pluginId: "whatsapp",
plugin: createOutboundTestPlugin({ id: "whatsapp", outbound: whatsappOutbound }),
source: "test",
},
{
pluginId: "imessage",
plugin: createOutboundTestPlugin({ id: "imessage", outbound: imessageOutbound }),
source: "test",
},
]),
);
});
for (const testCase of CASES) {
it(`routes ${testCase.name} text-only announce delivery through the outbound adapter`, async () => {
await withTempCronHome(async (home) => {
const storePath = await writeSessionStore(home, { lastProvider: "webchat", lastTo: "" });
const deps = createCliDeps();
mockAgentPayloads([{ text: "hello from cron" }]);
const res = await runExplicitAnnounceTurn({
home,
storePath,
deps,
channel: testCase.channel,
to: testCase.to,
});
expect(res.status).toBe("ok");
expect(res.delivered).toBe(true);
expect(res.deliveryAttempted).toBe(true);
expect(runSubagentAnnounceFlow).not.toHaveBeenCalled();
const sendFn = deps[testCase.sendKey];
expect(sendFn).toHaveBeenCalledTimes(1);
expect(sendFn).toHaveBeenCalledWith(
testCase.expectedTo,
"hello from cron",
expect.any(Object),
);
});
});
}
});

View File

@ -48,12 +48,12 @@ describe("runCronIsolatedAgentTurn forum topic delivery", () => {
});
expect(plainRes.status).toBe("ok");
expect(runSubagentAnnounceFlow).toHaveBeenCalledTimes(1);
const announceArgs = vi.mocked(runSubagentAnnounceFlow).mock.calls[0]?.[0] as
| { expectsCompletionMessage?: boolean }
| undefined;
expect(announceArgs?.expectsCompletionMessage).toBe(true);
expect(deps.sendMessageTelegram).not.toHaveBeenCalled();
expect(plainRes.delivered).toBe(true);
expect(runSubagentAnnounceFlow).not.toHaveBeenCalled();
expectDirectTelegramDelivery(deps, {
chatId: "123",
text: "plain message",
});
});
});
});

View File

@ -26,5 +26,9 @@ vi.mock("../agents/subagent-announce.js", () => ({
runSubagentAnnounceFlow: vi.fn(),
}));
vi.mock("../gateway/call.js", () => ({
callGateway: vi.fn(),
}));
export const makeIsolatedAgentJob = makeIsolatedAgentJobFixture;
export const makeIsolatedAgentParams = makeIsolatedAgentParamsFixture;

View File

@ -104,7 +104,7 @@ async function expectStructuredTelegramFailure(params: {
);
}
async function runAnnounceFlowResult(bestEffort: boolean) {
async function runTelegramDeliveryResult(bestEffort: boolean) {
let outcome:
| {
res: Awaited<ReturnType<typeof runCronIsolatedAgentTurn>>;
@ -113,7 +113,6 @@ async function runAnnounceFlowResult(bestEffort: boolean) {
| undefined;
await withTelegramAnnounceFixture(async ({ home, storePath, deps }) => {
mockAgentPayloads([{ text: "hello from cron" }]);
vi.mocked(runSubagentAnnounceFlow).mockResolvedValueOnce(false);
const res = await runTelegramAnnounceTurn({
home,
storePath,
@ -128,12 +127,12 @@ async function runAnnounceFlowResult(bestEffort: boolean) {
outcome = { res, deps };
});
if (!outcome) {
throw new Error("announce flow did not produce an outcome");
throw new Error("telegram delivery did not produce an outcome");
}
return outcome;
}
async function runSignalAnnounceFlowResult(bestEffort: boolean) {
async function runSignalDeliveryResult(bestEffort: boolean) {
let outcome:
| {
res: Awaited<ReturnType<typeof runCronIsolatedAgentTurn>>;
@ -144,7 +143,6 @@ async function runSignalAnnounceFlowResult(bestEffort: boolean) {
const storePath = await writeSessionStore(home, { lastProvider: "webchat", lastTo: "" });
const deps = createCliDeps();
mockAgentPayloads([{ text: "hello from cron" }]);
vi.mocked(runSubagentAnnounceFlow).mockResolvedValueOnce(false);
const res = await runCronIsolatedAgentTurn({
cfg: makeCfg(home, storePath, {
channels: { signal: {} },
@ -166,12 +164,12 @@ async function runSignalAnnounceFlowResult(bestEffort: boolean) {
outcome = { res, deps };
});
if (!outcome) {
throw new Error("signal announce flow did not produce an outcome");
throw new Error("signal delivery did not produce an outcome");
}
return outcome;
}
async function assertExplicitTelegramTargetAnnounce(params: {
async function assertExplicitTelegramTargetDelivery(params: {
home: string;
storePath: string;
deps: CliDeps;
@ -186,22 +184,11 @@ async function assertExplicitTelegramTargetAnnounce(params: {
});
expectDeliveredOk(res);
expect(runSubagentAnnounceFlow).toHaveBeenCalledTimes(1);
const announceArgs = vi.mocked(runSubagentAnnounceFlow).mock.calls[0]?.[0] as
| {
requesterOrigin?: { channel?: string; to?: string };
roundOneReply?: string;
bestEffortDeliver?: boolean;
}
| undefined;
expect(announceArgs?.requesterOrigin?.channel).toBe("telegram");
expect(announceArgs?.requesterOrigin?.to).toBe("123");
expect(announceArgs?.roundOneReply).toBe(params.expectedText);
expect(announceArgs?.bestEffortDeliver).toBe(false);
expect((announceArgs as { expectsCompletionMessage?: boolean })?.expectsCompletionMessage).toBe(
true,
);
expect(params.deps.sendMessageTelegram).not.toHaveBeenCalled();
expect(runSubagentAnnounceFlow).not.toHaveBeenCalled();
expectDirectTelegramDelivery(params.deps, {
chatId: "123",
text: params.expectedText,
});
}
describe("runCronIsolatedAgentTurn", () => {
@ -209,9 +196,9 @@ describe("runCronIsolatedAgentTurn", () => {
setupIsolatedAgentTurnMocks();
});
it("announces explicit targets with direct and final-payload text", async () => {
it("delivers explicit targets with direct and final-payload text", async () => {
await withTelegramAnnounceFixture(async ({ home, storePath, deps }) => {
await assertExplicitTelegramTargetAnnounce({
await assertExplicitTelegramTargetDelivery({
home,
storePath,
deps,
@ -219,7 +206,7 @@ describe("runCronIsolatedAgentTurn", () => {
expectedText: "hello from cron",
});
vi.clearAllMocks();
await assertExplicitTelegramTargetAnnounce({
await assertExplicitTelegramTargetDelivery({
home,
storePath,
deps,
@ -229,7 +216,7 @@ describe("runCronIsolatedAgentTurn", () => {
});
});
it("routes announce injection to the delivery-target session key", async () => {
it("delivers explicit targets directly with per-channel-peer session scoping", async () => {
await withTelegramAnnounceFixture(async ({ home, storePath, deps }) => {
mockAgentPayloads([{ text: "hello from cron" }]);
@ -254,17 +241,12 @@ describe("runCronIsolatedAgentTurn", () => {
lane: "cron",
});
expect(res.status).toBe("ok");
expect(runSubagentAnnounceFlow).toHaveBeenCalledTimes(1);
const announceArgs = vi.mocked(runSubagentAnnounceFlow).mock.calls[0]?.[0] as
| {
requesterSessionKey?: string;
requesterOrigin?: { channel?: string; to?: string };
}
| undefined;
expect(announceArgs?.requesterSessionKey).toBe("agent:main:telegram:direct:123");
expect(announceArgs?.requesterOrigin?.channel).toBe("telegram");
expect(announceArgs?.requesterOrigin?.to).toBe("123");
expectDeliveredOk(res);
expect(runSubagentAnnounceFlow).not.toHaveBeenCalled();
expectDirectTelegramDelivery(deps, {
chatId: "123",
text: "hello from cron",
});
});
});
@ -359,12 +341,42 @@ describe("runCronIsolatedAgentTurn", () => {
});
});
it("falls back to direct delivery when announce reports false and best-effort is disabled", async () => {
it("reports not-delivered when text direct delivery fails and best-effort is enabled", async () => {
await withTelegramAnnounceFixture(
async ({ home, storePath, deps }) => {
mockAgentPayloads([{ text: "hello from cron" }]);
const res = await runTelegramAnnounceTurn({
home,
storePath,
deps,
delivery: {
mode: "announce",
channel: "telegram",
to: "123",
bestEffort: true,
},
});
expect(res.status).toBe("ok");
expect(res.delivered).toBe(false);
expect(res.deliveryAttempted).toBe(true);
expect(runSubagentAnnounceFlow).not.toHaveBeenCalled();
expect(deps.sendMessageTelegram).toHaveBeenCalledTimes(1);
},
{
deps: {
sendMessageTelegram: vi.fn().mockRejectedValue(new Error("boom")),
},
},
);
});
it("delivers text directly when best-effort is disabled", async () => {
await withTempHome(async (home) => {
const storePath = await writeSessionStore(home, { lastProvider: "webchat", lastTo: "" });
const deps = createCliDeps();
mockAgentPayloads([{ text: "hello from cron" }]);
vi.mocked(runSubagentAnnounceFlow).mockResolvedValueOnce(false);
const res = await runTelegramAnnounceTurn({
home,
@ -378,63 +390,124 @@ describe("runCronIsolatedAgentTurn", () => {
},
});
// When announce delivery fails, the direct-delivery fallback fires
// so the message still reaches the target channel.
expect(res.status).toBe("ok");
expect(res.delivered).toBe(true);
expect(res.deliveryAttempted).toBe(true);
expect(deps.sendMessageTelegram).toHaveBeenCalledTimes(1);
expect(runSubagentAnnounceFlow).not.toHaveBeenCalled();
expectDirectTelegramDelivery(deps, {
chatId: "123",
text: "hello from cron",
});
});
});
it("falls back to direct delivery when announce reports false and best-effort is enabled", async () => {
const { res, deps } = await runAnnounceFlowResult(true);
expect(res.status).toBe("ok");
expect(res.delivered).toBe(true);
expect(res.deliveryAttempted).toBe(true);
expect(runSubagentAnnounceFlow).toHaveBeenCalledTimes(1);
expect(deps.sendMessageTelegram).toHaveBeenCalledTimes(1);
it("returns error when text direct delivery fails and best-effort is disabled", async () => {
await withTelegramAnnounceFixture(
async ({ home, storePath, deps }) => {
mockAgentPayloads([{ text: "hello from cron" }]);
const res = await runTelegramAnnounceTurn({
home,
storePath,
deps,
delivery: {
mode: "announce",
channel: "telegram",
to: "123",
bestEffort: false,
},
});
expect(res.status).toBe("error");
expect(res.delivered).toBeUndefined();
expect(res.deliveryAttempted).toBe(true);
expect(res.error).toContain("boom");
expect(runSubagentAnnounceFlow).not.toHaveBeenCalled();
expect(deps.sendMessageTelegram).toHaveBeenCalledTimes(1);
},
{
deps: {
sendMessageTelegram: vi.fn().mockRejectedValue(new Error("boom")),
},
},
);
});
it("falls back to direct delivery for signal when announce reports false and best-effort is enabled", async () => {
const { res, deps } = await runSignalAnnounceFlowResult(true);
expect(res.status).toBe("ok");
expect(res.delivered).toBe(true);
expect(res.deliveryAttempted).toBe(true);
expect(runSubagentAnnounceFlow).toHaveBeenCalledTimes(1);
expect(deps.sendMessageSignal).toHaveBeenCalledTimes(1);
});
it("retries transient text direct delivery failures before succeeding", async () => {
const previousFastMode = process.env.OPENCLAW_TEST_FAST;
process.env.OPENCLAW_TEST_FAST = "1";
try {
await withTelegramAnnounceFixture(
async ({ home, storePath, deps }) => {
mockAgentPayloads([{ text: "hello from cron" }]);
it("falls back to direct delivery when announce flow throws and best-effort is disabled", async () => {
await withTempHome(async (home) => {
const storePath = await writeSessionStore(home, { lastProvider: "webchat", lastTo: "" });
const deps = createCliDeps();
mockAgentPayloads([{ text: "hello from cron" }]);
vi.mocked(runSubagentAnnounceFlow).mockRejectedValueOnce(
new Error("gateway closed (1008): pairing required"),
const res = await runTelegramAnnounceTurn({
home,
storePath,
deps,
delivery: {
mode: "announce",
channel: "telegram",
to: "123",
bestEffort: false,
},
});
expect(res.status).toBe("ok");
expect(res.delivered).toBe(true);
expect(res.deliveryAttempted).toBe(true);
expect(runSubagentAnnounceFlow).not.toHaveBeenCalled();
expect(deps.sendMessageTelegram).toHaveBeenCalledTimes(2);
expect(deps.sendMessageTelegram).toHaveBeenLastCalledWith(
"123",
"hello from cron",
expect.objectContaining({ cfg: expect.any(Object) }),
);
},
{
deps: {
sendMessageTelegram: vi
.fn()
.mockRejectedValueOnce(new Error("UNAVAILABLE: temporary network error"))
.mockResolvedValue({ messageId: 7, chatId: "123", text: "hello from cron" }),
},
},
);
} finally {
if (previousFastMode === undefined) {
delete process.env.OPENCLAW_TEST_FAST;
} else {
process.env.OPENCLAW_TEST_FAST = previousFastMode;
}
}
});
const res = await runTelegramAnnounceTurn({
home,
storePath,
deps,
delivery: {
mode: "announce",
channel: "telegram",
to: "123",
bestEffort: false,
},
});
// When announce throws (e.g. "pairing required"), the direct-delivery
// fallback fires so the message still reaches the target channel.
expect(res.status).toBe("ok");
expect(res.delivered).toBe(true);
expect(res.deliveryAttempted).toBe(true);
expect(deps.sendMessageTelegram).toHaveBeenCalledTimes(1);
it("delivers text directly when best-effort is enabled", async () => {
const { res, deps } = await runTelegramDeliveryResult(true);
expect(res.status).toBe("ok");
expect(res.delivered).toBe(true);
expect(res.deliveryAttempted).toBe(true);
expect(runSubagentAnnounceFlow).not.toHaveBeenCalled();
expectDirectTelegramDelivery(deps, {
chatId: "123",
text: "hello from cron",
});
});
it("delivers text directly for signal when best-effort is enabled", async () => {
const { res, deps } = await runSignalDeliveryResult(true);
expect(res.status).toBe("ok");
expect(res.delivered).toBe(true);
expect(res.deliveryAttempted).toBe(true);
expect(runSubagentAnnounceFlow).not.toHaveBeenCalled();
expect(deps.sendMessageSignal).toHaveBeenCalledTimes(1);
expect(deps.sendMessageSignal).toHaveBeenCalledWith(
"+15551234567",
"hello from cron",
expect.any(Object),
);
});
it("ignores structured direct delivery failures when best-effort is enabled", async () => {
await expectBestEffortTelegramNotDelivered({
text: "hello from cron",

View File

@ -4,6 +4,7 @@ import { runEmbeddedPiAgent } from "../agents/pi-embedded.js";
import { runSubagentAnnounceFlow } from "../agents/subagent-announce.js";
import { signalOutbound } from "../channels/plugins/outbound/signal.js";
import { telegramOutbound } from "../channels/plugins/outbound/telegram.js";
import { callGateway } from "../gateway/call.js";
import { setActivePluginRegistry } from "../plugins/runtime.js";
import { createOutboundTestPlugin, createTestRegistry } from "../test-utils/channel-plugins.js";
@ -14,6 +15,7 @@ export function setupIsolatedAgentTurnMocks(params?: { fast?: boolean }): void {
vi.mocked(runEmbeddedPiAgent).mockReset();
vi.mocked(loadModelCatalog).mockResolvedValue([]);
vi.mocked(runSubagentAnnounceFlow).mockReset().mockResolvedValue(true);
vi.mocked(callGateway).mockReset().mockResolvedValue({ ok: true, deleted: true });
setActivePluginRegistry(
createTestRegistry([
{

View File

@ -1,10 +1,10 @@
/**
* Tests for the double-announce bug in cron delivery dispatch.
*
* Bug: early return paths in deliverViaAnnounce (active subagent suppression
* Bug: early return paths in text finalization (active subagent suppression
* and stale interim message suppression) returned without setting
* deliveryAttempted = true. The timer saw deliveryAttempted = false and
* fired enqueueSystemEvent as a fallback, causing a second announcement.
* fired enqueueSystemEvent as a fallback, causing a second delivery.
*
* Fix: both early return paths now set deliveryAttempted = true before
* returning so the timer correctly skips the system-event fallback.
@ -14,23 +14,10 @@ import { beforeEach, describe, expect, it, vi } from "vitest";
// --- Module mocks (must be hoisted before imports) ---
vi.mock("../../agents/subagent-announce.js", () => ({
runSubagentAnnounceFlow: vi.fn().mockResolvedValue(true),
}));
vi.mock("../../agents/subagent-registry.js", () => ({
countActiveDescendantRuns: vi.fn().mockReturnValue(0),
}));
vi.mock("../../config/sessions.js", () => ({
resolveAgentMainSessionKey: vi.fn().mockReturnValue("agent:main"),
}));
vi.mock("../../infra/outbound/outbound-session.js", () => ({
resolveOutboundSessionRoute: vi.fn().mockResolvedValue(null),
ensureOutboundSessionEntry: vi.fn().mockResolvedValue(undefined),
}));
vi.mock("../../infra/outbound/deliver.js", () => ({
deliverOutboundPayloads: vi.fn().mockResolvedValue([{ ok: true }]),
}));
@ -58,9 +45,9 @@ vi.mock("./subagent-followup.js", () => ({
waitForDescendantSubagentSummary: vi.fn().mockResolvedValue(undefined),
}));
import { runSubagentAnnounceFlow } from "../../agents/subagent-announce.js";
// Import after mocks
import { countActiveDescendantRuns } from "../../agents/subagent-registry.js";
import { deliverOutboundPayloads } from "../../infra/outbound/deliver.js";
import { shouldEnqueueCronMainSummary } from "../heartbeat-policy.js";
import { dispatchCronDelivery } from "./delivery-dispatch.js";
import type { DeliveryTargetResolution } from "./delivery-target.js";
@ -145,7 +132,6 @@ describe("dispatchCronDelivery — double-announce guard", () => {
vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false);
vi.mocked(readDescendantSubagentFallbackReply).mockResolvedValue(undefined);
vi.mocked(waitForDescendantSubagentSummary).mockResolvedValue(undefined);
vi.mocked(runSubagentAnnounceFlow).mockResolvedValue(true);
});
it("early return (active subagent) sets deliveryAttempted=true so timer skips enqueueSystemEvent", async () => {
@ -173,7 +159,7 @@ describe("dispatchCronDelivery — double-announce guard", () => {
).toBe(false);
// No announce should have been attempted (subagents still running)
expect(runSubagentAnnounceFlow).not.toHaveBeenCalled();
expect(deliverOutboundPayloads).not.toHaveBeenCalled();
});
it("early return (stale interim suppression) sets deliveryAttempted=true so timer skips enqueueSystemEvent", async () => {
@ -204,45 +190,42 @@ describe("dispatchCronDelivery — double-announce guard", () => {
}),
).toBe(false);
// No announce or direct delivery should have been sent (stale interim suppressed)
expect(runSubagentAnnounceFlow).not.toHaveBeenCalled();
// No direct delivery should have been sent (stale interim suppressed)
expect(deliverOutboundPayloads).not.toHaveBeenCalled();
});
it("consolidates descendant output into the cron announce path", async () => {
it("consolidates descendant output into the final direct delivery", async () => {
vi.mocked(countActiveDescendantRuns).mockReturnValue(0);
vi.mocked(isLikelyInterimCronMessage).mockReturnValue(true);
vi.mocked(readDescendantSubagentFallbackReply).mockResolvedValue(
"Detailed child result, everything finished successfully.",
);
vi.mocked(runSubagentAnnounceFlow).mockResolvedValue(true);
const params = makeBaseParams({ synthesizedText: "on it" });
const state = await dispatchCronDelivery(params);
expect(state.deliveryAttempted).toBe(true);
expect(state.delivered).toBe(true);
expect(runSubagentAnnounceFlow).toHaveBeenCalledTimes(1);
expect(runSubagentAnnounceFlow).toHaveBeenCalledWith(
expect(deliverOutboundPayloads).toHaveBeenCalledTimes(1);
expect(deliverOutboundPayloads).toHaveBeenCalledWith(
expect.objectContaining({
roundOneReply: "Detailed child result, everything finished successfully.",
expectsCompletionMessage: true,
announceType: "cron job",
channel: "telegram",
to: "123456",
payloads: [{ text: "Detailed child result, everything finished successfully." }],
}),
);
});
it("normal announce success delivers exactly once and sets deliveryAttempted=true", async () => {
it("normal text delivery sends exactly once and sets deliveryAttempted=true", async () => {
vi.mocked(countActiveDescendantRuns).mockReturnValue(0);
vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false);
vi.mocked(runSubagentAnnounceFlow).mockResolvedValue(true);
const params = makeBaseParams({ synthesizedText: "Morning briefing complete." });
const state = await dispatchCronDelivery(params);
expect(state.deliveryAttempted).toBe(true);
expect(state.delivered).toBe(true);
// Announce called exactly once
expect(runSubagentAnnounceFlow).toHaveBeenCalledTimes(1);
expect(deliverOutboundPayloads).toHaveBeenCalledTimes(1);
// Timer should not fire enqueueSystemEvent (delivered=true)
expect(
@ -257,13 +240,9 @@ describe("dispatchCronDelivery — double-announce guard", () => {
).toBe(false);
});
it("announce failure falls back to direct delivery exactly once (no double-deliver)", async () => {
it("text delivery fires exactly once (no double-deliver)", async () => {
vi.mocked(countActiveDescendantRuns).mockReturnValue(0);
vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false);
// Announce fails: runSubagentAnnounceFlow returns false
vi.mocked(runSubagentAnnounceFlow).mockResolvedValue(false);
const { deliverOutboundPayloads } = await import("../../infra/outbound/deliver.js");
vi.mocked(deliverOutboundPayloads).mockResolvedValue([{ ok: true } as never]);
const params = makeBaseParams({ synthesizedText: "Briefing ready." });
@ -273,23 +252,17 @@ describe("dispatchCronDelivery — double-announce guard", () => {
expect(state.deliveryAttempted).toBe(true);
expect(state.delivered).toBe(true);
// Announce was tried exactly once
expect(runSubagentAnnounceFlow).toHaveBeenCalledTimes(1);
// Direct fallback fired exactly once (not zero, not twice)
// This ensures one delivery total reaches the user, not two
expect(deliverOutboundPayloads).toHaveBeenCalledTimes(1);
});
it("no delivery requested means deliveryAttempted stays false and runSubagentAnnounceFlow not called", async () => {
it("no delivery requested means deliveryAttempted stays false and no delivery is sent", async () => {
const params = makeBaseParams({
synthesizedText: "Task done.",
deliveryRequested: false,
});
const state = await dispatchCronDelivery(params);
expect(runSubagentAnnounceFlow).not.toHaveBeenCalled();
// deliveryAttempted starts false (skipMessagingToolDelivery=false) and nothing runs
expect(deliverOutboundPayloads).not.toHaveBeenCalled();
expect(state.deliveryAttempted).toBe(false);
});
});

View File

@ -1,16 +1,12 @@
import { runSubagentAnnounceFlow } from "../../agents/subagent-announce.js";
import { countActiveDescendantRuns } from "../../agents/subagent-registry.js";
import { SILENT_REPLY_TOKEN } from "../../auto-reply/tokens.js";
import type { ReplyPayload } from "../../auto-reply/types.js";
import { createOutboundSendDeps, type CliDeps } from "../../cli/outbound-send-deps.js";
import type { OpenClawConfig } from "../../config/config.js";
import { resolveAgentMainSessionKey } from "../../config/sessions.js";
import { callGateway } from "../../gateway/call.js";
import { sleepWithAbort } from "../../infra/backoff.js";
import { deliverOutboundPayloads } from "../../infra/outbound/deliver.js";
import { resolveAgentOutboundIdentity } from "../../infra/outbound/identity.js";
import {
ensureOutboundSessionEntry,
resolveOutboundSessionRoute,
} from "../../infra/outbound/outbound-session.js";
import { buildOutboundSessionContext } from "../../infra/outbound/session-context.js";
import { logWarn } from "../../logger.js";
import type { CronJob, CronRunTelemetry } from "../types.js";
@ -71,53 +67,6 @@ export function resolveCronDeliveryBestEffort(job: CronJob): boolean {
return false;
}
async function resolveCronAnnounceSessionKey(params: {
cfg: OpenClawConfig;
agentId: string;
fallbackSessionKey: string;
delivery: {
channel: NonNullable<DeliveryTargetResolution["channel"]>;
to?: string;
accountId?: string;
threadId?: string | number;
};
}): Promise<string> {
const to = params.delivery.to?.trim();
if (!to) {
return params.fallbackSessionKey;
}
try {
const route = await resolveOutboundSessionRoute({
cfg: params.cfg,
channel: params.delivery.channel,
agentId: params.agentId,
accountId: params.delivery.accountId,
target: to,
threadId: params.delivery.threadId,
});
const resolved = route?.sessionKey?.trim();
if (route && resolved) {
// Ensure the session entry exists so downstream announce / queue delivery
// can look up channel metadata (lastChannel, to, sessionId). Named agents
// may not have a session entry for this target yet, causing announce
// delivery to silently fail (#32432).
await ensureOutboundSessionEntry({
cfg: params.cfg,
agentId: params.agentId,
channel: params.delivery.channel,
accountId: params.delivery.accountId,
route,
}).catch(() => {
// Best-effort: don't block delivery on session entry creation.
});
return resolved;
}
} catch {
// Fall back to main session routing if announce session resolution fails.
}
return params.fallbackSessionKey;
}
export type SuccessfulDeliveryTarget = Extract<DeliveryTargetResolution, { ok: true }>;
type DispatchCronDeliveryParams = {
@ -160,6 +109,86 @@ export type DispatchCronDeliveryState = {
deliveryPayloads: ReplyPayload[];
};
const TRANSIENT_DIRECT_CRON_DELIVERY_ERROR_PATTERNS: readonly RegExp[] = [
/\berrorcode=unavailable\b/i,
/\bstatus\s*[:=]\s*"?unavailable\b/i,
/\bUNAVAILABLE\b/,
/no active .* listener/i,
/gateway not connected/i,
/gateway closed \(1006/i,
/gateway timeout/i,
/\b(econnreset|econnrefused|etimedout|enotfound|ehostunreach|network error)\b/i,
];
const PERMANENT_DIRECT_CRON_DELIVERY_ERROR_PATTERNS: readonly RegExp[] = [
/unsupported channel/i,
/unknown channel/i,
/chat not found/i,
/user not found/i,
/bot was blocked by the user/i,
/forbidden: bot was kicked/i,
/recipient is not a valid/i,
/outbound not configured for channel/i,
];
function summarizeDirectCronDeliveryError(error: unknown): string {
if (error instanceof Error) {
return error.message || "error";
}
if (typeof error === "string") {
return error;
}
try {
return JSON.stringify(error) || String(error);
} catch {
return String(error);
}
}
function isTransientDirectCronDeliveryError(error: unknown): boolean {
const message = summarizeDirectCronDeliveryError(error);
if (!message) {
return false;
}
if (PERMANENT_DIRECT_CRON_DELIVERY_ERROR_PATTERNS.some((re) => re.test(message))) {
return false;
}
return TRANSIENT_DIRECT_CRON_DELIVERY_ERROR_PATTERNS.some((re) => re.test(message));
}
function resolveDirectCronRetryDelaysMs(): readonly number[] {
return process.env.OPENCLAW_TEST_FAST === "1" ? [8, 16, 32] : [5_000, 10_000, 20_000];
}
async function retryTransientDirectCronDelivery<T>(params: {
jobId: string;
signal?: AbortSignal;
run: () => Promise<T>;
}): Promise<T> {
const retryDelaysMs = resolveDirectCronRetryDelaysMs();
let retryIndex = 0;
for (;;) {
if (params.signal?.aborted) {
throw new Error("cron delivery aborted");
}
try {
return await params.run();
} catch (err) {
const delayMs = retryDelaysMs[retryIndex];
if (delayMs == null || !isTransientDirectCronDeliveryError(err) || params.signal?.aborted) {
throw err;
}
const nextAttempt = retryIndex + 2;
const maxAttempts = retryDelaysMs.length + 1;
logWarn(
`[cron:${params.jobId}] transient direct announce delivery failure, retrying ${nextAttempt}/${maxAttempts} in ${Math.round(delayMs / 1000)}s: ${summarizeDirectCronDeliveryError(err)}`,
);
retryIndex += 1;
await sleepWithAbort(delayMs, params.signal);
}
}
}
export async function dispatchCronDelivery(
params: DispatchCronDeliveryParams,
): Promise<DispatchCronDeliveryState> {
@ -172,12 +201,6 @@ export async function dispatchCronDelivery(
// Keep this strict so timer fallback can safely decide whether to wake main.
let delivered = params.skipMessagingToolDelivery;
let deliveryAttempted = params.skipMessagingToolDelivery;
// Tracks whether `runSubagentAnnounceFlow` was actually called. Early
// returns from `deliverViaAnnounce` (active subagents, interim suppression,
// SILENT_REPLY_TOKEN) are intentional suppressions — not delivery failures —
// so the direct-delivery fallback must only fire when the announce send was
// actually attempted and failed.
let announceDeliveryWasAttempted = false;
const failDeliveryTarget = (error: string) =>
params.withRunSession({
status: "error",
@ -191,6 +214,7 @@ export async function dispatchCronDelivery(
const deliverViaDirect = async (
delivery: SuccessfulDeliveryTarget,
options?: { retryTransient?: boolean },
): Promise<RunCronAgentTurnResult | null> => {
const identity = resolveAgentOutboundIdentity(params.cfgWithAgentDefaults, params.agentId);
try {
@ -217,19 +241,27 @@ export async function dispatchCronDelivery(
agentId: params.agentId,
sessionKey: params.agentSessionKey,
});
const deliveryResults = await deliverOutboundPayloads({
cfg: params.cfgWithAgentDefaults,
channel: delivery.channel,
to: delivery.to,
accountId: delivery.accountId,
threadId: delivery.threadId,
payloads: payloadsForDelivery,
session: deliverySession,
identity,
bestEffort: params.deliveryBestEffort,
deps: createOutboundSendDeps(params.deps),
abortSignal: params.abortSignal,
});
const runDelivery = async () =>
await deliverOutboundPayloads({
cfg: params.cfgWithAgentDefaults,
channel: delivery.channel,
to: delivery.to,
accountId: delivery.accountId,
threadId: delivery.threadId,
payloads: payloadsForDelivery,
session: deliverySession,
identity,
bestEffort: params.deliveryBestEffort,
deps: createOutboundSendDeps(params.deps),
abortSignal: params.abortSignal,
});
const deliveryResults = options?.retryTransient
? await retryTransientDirectCronDelivery({
jobId: params.job.id,
signal: params.abortSignal,
run: runDelivery,
})
: await runDelivery();
delivered = deliveryResults.length > 0;
return null;
} catch (err) {
@ -247,31 +279,31 @@ export async function dispatchCronDelivery(
}
};
const deliverViaAnnounce = async (
const finalizeTextDelivery = async (
delivery: SuccessfulDeliveryTarget,
): Promise<RunCronAgentTurnResult | null> => {
const cleanupDirectCronSessionIfNeeded = async (): Promise<void> => {
if (!params.job.deleteAfterRun) {
return;
}
try {
await callGateway({
method: "sessions.delete",
params: {
key: params.agentSessionKey,
deleteTranscript: true,
emitLifecycleHooks: false,
},
timeoutMs: 10_000,
});
} catch {
// Best-effort; direct delivery result should still be returned.
}
};
if (!synthesizedText) {
return null;
}
const announceMainSessionKey = resolveAgentMainSessionKey({
cfg: params.cfg,
agentId: params.agentId,
});
const announceSessionKey = await resolveCronAnnounceSessionKey({
cfg: params.cfgWithAgentDefaults,
agentId: params.agentId,
fallbackSessionKey: announceMainSessionKey,
delivery: {
channel: delivery.channel,
to: delivery.to,
accountId: delivery.accountId,
threadId: delivery.threadId,
},
});
const taskLabel =
typeof params.job.name === "string" && params.job.name.trim()
? params.job.name.trim()
: `cron:${params.job.id}`;
const initialSynthesizedText = synthesizedText.trim();
let activeSubagentRuns = countActiveDescendantRuns(params.agentSessionKey);
const expectedSubagentFollowup = expectsSubagentFollowup(initialSynthesizedText);
@ -357,84 +389,19 @@ export async function dispatchCronDelivery(
...params.telemetry,
});
}
try {
if (params.isAborted()) {
return params.withRunSession({
status: "error",
error: params.abortReason(),
deliveryAttempted,
...params.telemetry,
});
}
deliveryAttempted = true;
announceDeliveryWasAttempted = true;
const didAnnounce = await runSubagentAnnounceFlow({
childSessionKey: params.agentSessionKey,
childRunId: `${params.job.id}:${params.runSessionId}:${params.runStartedAt}`,
requesterSessionKey: announceSessionKey,
requesterOrigin: {
channel: delivery.channel,
to: delivery.to,
accountId: delivery.accountId,
threadId: delivery.threadId,
},
requesterDisplayKey: announceSessionKey,
task: taskLabel,
timeoutMs: params.timeoutMs,
cleanup: params.job.deleteAfterRun ? "delete" : "keep",
roundOneReply: synthesizedText,
// Cron output is a finished completion message: send it directly to the
// target channel via the completion-direct-send path rather than injecting
// a trigger message into the (likely idle) main agent session.
expectsCompletionMessage: true,
// Keep delivery outcome truthful for cron state: if outbound send fails,
// announce flow must report false so caller can apply best-effort policy.
bestEffortDeliver: false,
waitForCompletion: false,
startedAt: params.runStartedAt,
endedAt: params.runEndedAt,
outcome: { status: "ok" },
announceType: "cron job",
signal: params.abortSignal,
if (params.isAborted()) {
return params.withRunSession({
status: "error",
error: params.abortReason(),
deliveryAttempted,
...params.telemetry,
});
if (didAnnounce) {
delivered = true;
} else {
// Announce delivery failed but the agent execution itself succeeded.
// Return ok so the job isn't penalized for a transient delivery issue
// (e.g. "pairing required" when no active client session exists).
// Delivery failure is tracked separately via delivered/deliveryAttempted.
const message = "cron announce delivery failed";
logWarn(`[cron:${params.job.id}] ${message}`);
if (!params.deliveryBestEffort) {
return params.withRunSession({
status: "ok",
summary,
outputText,
error: message,
delivered: false,
deliveryAttempted,
...params.telemetry,
});
}
}
} catch (err) {
// Same as above: announce delivery errors should not mark a successful
// agent execution as failed.
logWarn(`[cron:${params.job.id}] ${String(err)}`);
if (!params.deliveryBestEffort) {
return params.withRunSession({
status: "ok",
summary,
outputText,
error: String(err),
delivered: false,
deliveryAttempted,
...params.telemetry,
});
}
}
return null;
try {
return await deliverViaDirect(delivery, { retryTransient: true });
} finally {
await cleanupDirectCronSessionIfNeeded();
}
};
if (
@ -472,14 +439,9 @@ export async function dispatchCronDelivery(
};
}
// Route text-only cron announce output back through the main session so it
// follows the same system-message injection path as subagent completions.
// Keep direct outbound delivery only for structured payloads (media/channel
// data), which cannot be represented by the shared announce flow.
//
// Forum/topic targets should also use direct delivery. Announce flow can
// be swallowed by ANNOUNCE_SKIP/NO_REPLY in the target agent turn, which
// silently drops cron output for topic-bound sessions.
// Finalize descendant/subagent output first for text-only cron runs, then
// send through the real outbound adapter so delivered=true always reflects
// an actual channel send instead of internal announce routing.
const useDirectDelivery =
params.deliveryPayloadHasStructuredContent || params.resolvedDelivery.threadId != null;
if (useDirectDelivery) {
@ -496,41 +458,10 @@ export async function dispatchCronDelivery(
};
}
} else {
const announceResult = await deliverViaAnnounce(params.resolvedDelivery);
// Fall back to direct delivery only when the announce send was actually
// attempted and failed. Early returns from deliverViaAnnounce (active
// subagents, interim suppression, SILENT_REPLY_TOKEN) are intentional
// suppressions that must NOT trigger direct delivery — doing so would
// bypass the suppression guard and leak partial/stale content.
if (announceDeliveryWasAttempted && !delivered && !params.isAborted()) {
const directFallback = await deliverViaDirect(params.resolvedDelivery);
if (directFallback) {
return {
result: directFallback,
delivered,
deliveryAttempted,
summary,
outputText,
synthesizedText,
deliveryPayloads,
};
}
// If direct delivery succeeded (returned null without error),
// `delivered` has been set to true by deliverViaDirect.
if (delivered) {
return {
delivered,
deliveryAttempted,
summary,
outputText,
synthesizedText,
deliveryPayloads,
};
}
}
if (announceResult) {
const finalizedTextResult = await finalizeTextDelivery(params.resolvedDelivery);
if (finalizedTextResult) {
return {
result: announceResult,
result: finalizedTextResult,
delivered,
deliveryAttempted,
summary,