From ffb12397a8ce97498282e8729b5109659538d32d Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sun, 22 Feb 2026 21:41:36 +0100 Subject: [PATCH] fix(cron): direct-deliver thread and topic announce targets Co-authored-by: Andrei Aratmonov <247877121+AndrewArto@users.noreply.github.com> --- CHANGELOG.md | 1 + ...agent.direct-delivery-forum-topics.test.ts | 101 ++++++++++++++++++ ...p-recipient-besteffortdeliver-true.test.ts | 19 ++-- src/cron/isolated-agent/run.ts | 8 +- 4 files changed, 120 insertions(+), 9 deletions(-) create mode 100644 src/cron/isolated-agent.direct-delivery-forum-topics.test.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index e2ab70b170b..deda3adfd56 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -62,6 +62,7 @@ Docs: https://docs.openclaw.ai - Cron/Timer: keep a watchdog recheck timer armed while `onTimer` is actively executing so the scheduler continues polling even if a due-run tick stalls for an extended period. (#23628) Thanks @dsgraves. - Cron/Run: enforce the same per-job timeout guard for manual `cron.run` executions as timer-driven runs, including abort propagation for isolated agent jobs, so forced runs cannot wedge indefinitely. (#23704) Thanks @tkuehnl. - Delivery/Queue: quarantine queue entries immediately on known permanent delivery errors (for example invalid recipients or missing conversation references) by moving them to `failed/` instead of retrying on every restart. (#23794) Thanks @aldoeliacim. +- Cron/Delivery: route text-only announce jobs with explicit thread/topic targets through direct outbound delivery so forum/thread destinations do not get dropped by intermediary announce turns. (#23841) Thanks @AndrewArto. - Cron/Status: split execution outcome (`lastRunStatus`) from delivery outcome (`lastDeliveryStatus`) in persisted cron state, finished events, and run history so failed/unknown announcement delivery is visible without conflating it with run errors. - Cron/Schedule: for `every` jobs, prefer `lastRunAtMs + everyMs` when still in the future after restarts, then fall back to anchor scheduling for catch-up windows, so NEXT timing matches the last successful cadence. (#22895) Thanks @SidQin-cyber. - Agents/Compaction: restore embedded compaction safeguard/context-pruning extension loading in production by wiring bundled extension factories into the resource loader instead of runtime file-path resolution. (#22349) Thanks @Glucksberg. diff --git a/src/cron/isolated-agent.direct-delivery-forum-topics.test.ts b/src/cron/isolated-agent.direct-delivery-forum-topics.test.ts new file mode 100644 index 00000000000..eda441b2001 --- /dev/null +++ b/src/cron/isolated-agent.direct-delivery-forum-topics.test.ts @@ -0,0 +1,101 @@ +import "./isolated-agent.mocks.js"; +import { beforeEach, describe, expect, it, vi } from "vitest"; +import { runEmbeddedPiAgent } from "../agents/pi-embedded.js"; +import { runSubagentAnnounceFlow } from "../agents/subagent-announce.js"; +import type { CliDeps } from "../cli/deps.js"; +import { runCronIsolatedAgentTurn } from "./isolated-agent.js"; +import { + makeCfg, + makeJob, + withTempCronHome, + writeSessionStore, +} from "./isolated-agent.test-harness.js"; +import { setupIsolatedAgentTurnMocks } from "./isolated-agent.test-setup.js"; + +function createCliDeps(overrides: Partial = {}): CliDeps { + return { + sendMessageSlack: vi.fn(), + sendMessageWhatsApp: vi.fn(), + sendMessageTelegram: vi.fn(), + sendMessageDiscord: vi.fn(), + sendMessageSignal: vi.fn(), + sendMessageIMessage: vi.fn(), + ...overrides, + }; +} + +function mockAgentPayloads(payloads: Array>) { + vi.mocked(runEmbeddedPiAgent).mockResolvedValue({ + payloads, + meta: { + durationMs: 5, + agentMeta: { sessionId: "s", provider: "p", model: "m" }, + }, + }); +} + +describe("runCronIsolatedAgentTurn forum topic delivery", () => { + beforeEach(() => { + setupIsolatedAgentTurnMocks(); + }); + + it("uses direct delivery for text-only forum topic targets", async () => { + await withTempCronHome(async (home) => { + const storePath = await writeSessionStore(home, { lastProvider: "webchat", lastTo: "" }); + const deps = createCliDeps(); + mockAgentPayloads([{ text: "forum message" }]); + + const res = await runCronIsolatedAgentTurn({ + cfg: makeCfg(home, storePath, { + channels: { telegram: { botToken: "t-1" } }, + }), + deps, + job: { + ...makeJob({ kind: "agentTurn", message: "do it" }), + delivery: { mode: "announce", channel: "telegram", to: "123:topic:42" }, + }, + message: "do it", + sessionKey: "cron:job-1", + lane: "cron", + }); + + expect(res.status).toBe("ok"); + expect(res.delivered).toBe(true); + expect(runSubagentAnnounceFlow).not.toHaveBeenCalled(); + expect(deps.sendMessageTelegram).toHaveBeenCalledTimes(1); + expect(deps.sendMessageTelegram).toHaveBeenCalledWith( + "123", + "forum message", + expect.objectContaining({ + messageThreadId: 42, + }), + ); + }); + }); + + it("keeps text-only non-threaded targets on announce flow", async () => { + await withTempCronHome(async (home) => { + const storePath = await writeSessionStore(home, { lastProvider: "webchat", lastTo: "" }); + const deps = createCliDeps(); + mockAgentPayloads([{ text: "plain message" }]); + + const res = await runCronIsolatedAgentTurn({ + cfg: makeCfg(home, storePath, { + channels: { telegram: { botToken: "t-1" } }, + }), + deps, + job: { + ...makeJob({ kind: "agentTurn", message: "do it" }), + delivery: { mode: "announce", channel: "telegram", to: "123" }, + }, + message: "do it", + sessionKey: "cron:job-1", + lane: "cron", + }); + + expect(res.status).toBe("ok"); + expect(runSubagentAnnounceFlow).toHaveBeenCalledTimes(1); + expect(deps.sendMessageTelegram).not.toHaveBeenCalled(); + }); + }); +}); diff --git a/src/cron/isolated-agent.skips-delivery-without-whatsapp-recipient-besteffortdeliver-true.test.ts b/src/cron/isolated-agent.skips-delivery-without-whatsapp-recipient-besteffortdeliver-true.test.ts index edb1599e494..065e5aaa3c8 100644 --- a/src/cron/isolated-agent.skips-delivery-without-whatsapp-recipient-besteffortdeliver-true.test.ts +++ b/src/cron/isolated-agent.skips-delivery-without-whatsapp-recipient-besteffortdeliver-true.test.ts @@ -184,7 +184,7 @@ describe("runCronIsolatedAgentTurn", () => { }); }); - it("passes resolved threadId into shared subagent announce flow", async () => { + it("routes threaded announce targets through direct delivery", async () => { await withTempCronHome(async (home) => { const storePath = await writeSessionStore(home, { lastProvider: "webchat", lastTo: "" }); await fs.writeFile( @@ -214,13 +214,16 @@ describe("runCronIsolatedAgentTurn", () => { }); expect(res.status).toBe("ok"); - expect(runSubagentAnnounceFlow).toHaveBeenCalledTimes(1); - const announceArgs = vi.mocked(runSubagentAnnounceFlow).mock.calls[0]?.[0] as - | { requesterOrigin?: { threadId?: string | number; channel?: string; to?: string } } - | undefined; - expect(announceArgs?.requesterOrigin?.channel).toBe("telegram"); - expect(announceArgs?.requesterOrigin?.to).toBe("123"); - expect(announceArgs?.requesterOrigin?.threadId).toBe(42); + expect(res.delivered).toBe(true); + expect(runSubagentAnnounceFlow).not.toHaveBeenCalled(); + expect(deps.sendMessageTelegram).toHaveBeenCalledTimes(1); + expect(deps.sendMessageTelegram).toHaveBeenCalledWith( + "123", + "Final weather summary", + expect.objectContaining({ + messageThreadId: 42, + }), + ); }); }); diff --git a/src/cron/isolated-agent/run.ts b/src/cron/isolated-agent/run.ts index 91106a14982..6d59a8e14e0 100644 --- a/src/cron/isolated-agent/run.ts +++ b/src/cron/isolated-agent/run.ts @@ -657,7 +657,13 @@ export async function runCronIsolatedAgentTurn(params: { // follows the same system-message injection path as subagent completions. // Keep direct outbound delivery only for structured payloads (media/channel // data), which cannot be represented by the shared announce flow. - if (deliveryPayloadHasStructuredContent) { + // + // Forum/topic targets should also use direct delivery. Announce flow can + // be swallowed by ANNOUNCE_SKIP/NO_REPLY in the target agent turn, which + // silently drops cron output for topic-bound sessions. + const useDirectDelivery = + deliveryPayloadHasStructuredContent || resolvedDelivery.threadId != null; + if (useDirectDelivery) { try { const payloadsForDelivery = deliveryPayloads.length > 0