From 6776306387cc87c7d46203521384f32ebbe5658e Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Wed, 1 Apr 2026 16:02:22 +0900 Subject: [PATCH] fix: preserve telegram topic delivery routing (#58489) (thanks @cwmine) --- CHANGELOG.md | 3 + docs/automation/cron-jobs.md | 2 + src/agents/subagent-announce-delivery.test.ts | 20 +++++++ src/agents/subagent-announce-delivery.ts | 38 ++---------- src/cron/delivery.test.ts | 18 ++++++ src/cron/delivery.ts | 17 ++++++ ...ent.delivery-target-thread-session.test.ts | 14 +++++ src/cron/isolated-agent/delivery-target.ts | 3 + src/cron/isolated-agent/run.ts | 1 + src/cron/normalize.test.ts | 58 +++++++++++++++++++ src/cron/normalize.ts | 20 +++++++ src/cron/service/jobs.apply-patch.test.ts | 42 ++++++++++++++ src/cron/service/jobs.ts | 25 +++++++- src/cron/types.ts | 2 + 14 files changed, 230 insertions(+), 33 deletions(-) create mode 100644 src/cron/service/jobs.apply-patch.test.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 1d28dc24fde..35697554ea7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -394,6 +394,9 @@ Docs: https://docs.openclaw.ai - Telegram/splitting: replace proportional text estimate with verified HTML-length search so long messages split at word boundaries instead of mid-word; gracefully degrade when tag overhead exceeds the limit. (#56595) - Telegram/delivery: skip whitespace-only and hook-blanked text replies in bot delivery to prevent GrammyError 400 empty-text crashes. (#56620) - Telegram/send: validate `replyToMessageId` at all four API sinks with a shared normalizer that rejects non-numeric, NaN, and mixed-content strings. (#56587) +- Telegram/cron topics: route announce target parsing through the Telegram extension seam and carry explicit `delivery.threadId` through cron delivery resolution, so legacy `group:` routes and topic-targeted cron sends keep their forum topic destination. (#58489) Thanks @cwmine. +- Approvals/UI: keep the newest pending approval at the front of the Control UI queue so approving one request does not accidentally target an older expired id. Thanks @vincentkoc. +- Plugin approvals: accept unique short approval-id prefixes on `plugin.approval.resolve`, matching exec approvals and restoring `/approve` fallback flows on chat approval surfaces. Thanks @vincentkoc. - Mistral: normalize OpenAI-compatible request flags so official Mistral API runs no longer fail with remaining `422 status code (no body)` chat errors. - Control UI/config: keep sensitive raw config hidden by default, replace the blank blocked editor with an explicit reveal-to-edit state, and restore raw JSON editing without auto-exposing secrets. Fixes #55322. - CLI/zsh: defer `compdef` registration until `compinit` is available so zsh completion loads cleanly with plugin managers and manual setups. (#56555) diff --git a/docs/automation/cron-jobs.md b/docs/automation/cron-jobs.md index 19127ff0c40..f7acb19f66f 100644 --- a/docs/automation/cron-jobs.md +++ b/docs/automation/cron-jobs.md @@ -204,6 +204,7 @@ Delivery config: - `delivery.mode`: `none` | `announce` | `webhook`. - `delivery.channel`: `last` or a specific channel. - `delivery.to`: channel-specific target (announce) or webhook URL (webhook mode). +- `delivery.threadId`: optional explicit thread or topic id when the target channel supports threaded delivery. - `delivery.bestEffort`: avoid failing the job if announce delivery fails. Announce delivery suppresses messaging tool sends for the run; use `delivery.channel`/`delivery.to` @@ -272,6 +273,7 @@ Isolated jobs can deliver output to a channel via the top-level `delivery` confi - `delivery.mode`: `announce` (channel delivery), `webhook` (HTTP POST), or `none`. - `delivery.channel`: `last` or any deliverable channel id, for example `discord`, `matrix`, `telegram`, or `whatsapp`. - `delivery.to`: channel-specific recipient target. +- `delivery.threadId`: optional thread/topic override for channels like Telegram, Slack, Discord, or Matrix when you want a specific thread without encoding it into `delivery.to`. `announce` delivery is only valid for isolated jobs (`sessionTarget: "isolated"`). `webhook` delivery is valid for both main and isolated jobs. diff --git a/src/agents/subagent-announce-delivery.test.ts b/src/agents/subagent-announce-delivery.test.ts index 232ef5acba7..e1cc3f57fa7 100644 --- a/src/agents/subagent-announce-delivery.test.ts +++ b/src/agents/subagent-announce-delivery.test.ts @@ -22,6 +22,26 @@ describe("resolveAnnounceOrigin telegram forum topics", () => { }); }); + it("preserves stored forum topic thread ids for legacy group-prefixed requester targets", () => { + expect( + resolveAnnounceOrigin( + { + lastChannel: "telegram", + lastTo: "telegram:-1001234567890:topic:99", + lastThreadId: 99, + }, + { + channel: "telegram", + to: "group:-1001234567890", + }, + ), + ).toEqual({ + channel: "telegram", + to: "group:-1001234567890", + threadId: 99, + }); + }); + it("still strips stale thread ids when the stored telegram route points at a different chat", () => { expect( resolveAnnounceOrigin( diff --git a/src/agents/subagent-announce-delivery.ts b/src/agents/subagent-announce-delivery.ts index 4798b031a66..2501d3ed98e 100644 --- a/src/agents/subagent-announce-delivery.ts +++ b/src/agents/subagent-announce-delivery.ts @@ -1,4 +1,5 @@ import { resolveQueueSettings } from "../auto-reply/reply/queue.js"; +import { parseExplicitTargetForChannel } from "../channels/plugins/target-parsing.js"; import { loadConfig } from "../config/config.js"; import { loadSessionStore, @@ -92,42 +93,15 @@ function summarizeDeliveryError(error: unknown): string { } } -function stripTelegramAnnouncePrefix(to: string): string { - let trimmed = to.trim(); - let strippedTelegramPrefix = false; - while (true) { - const next = (() => { - if (/^(telegram|tg):/i.test(trimmed)) { - strippedTelegramPrefix = true; - return trimmed.replace(/^(telegram|tg):/i, "").trim(); - } - if (strippedTelegramPrefix && /^group:/i.test(trimmed)) { - return trimmed.replace(/^group:/i, "").trim(); - } - return trimmed; - })(); - if (next === trimmed) { - return trimmed; - } - trimmed = next; - } -} - function parseTelegramAnnounceTarget(to: string): { chatId: string; chatType: "direct" | "group" | "unknown"; } { - const normalized = stripTelegramAnnouncePrefix(to); - const topicMatch = /^(.+?):topic:(\d+)$/.exec(normalized); - const colonMatch = /^(.+):(\d+)$/.exec(normalized); - const chatId = topicMatch?.[1] ?? colonMatch?.[1] ?? normalized; - const trimmedChatId = chatId.trim(); - const chatType = /^-?\d+$/.test(trimmedChatId) - ? trimmedChatId.startsWith("-") - ? "group" - : "direct" - : "unknown"; - return { chatId: trimmedChatId, chatType }; + const parsed = parseExplicitTargetForChannel("telegram", to); + const chatId = parsed?.to?.trim() ?? to.trim(); + const chatType = + parsed?.chatType === "direct" || parsed?.chatType === "group" ? parsed.chatType : "unknown"; + return { chatId, chatType }; } function shouldStripThreadFromAnnounceEntry( diff --git a/src/cron/delivery.test.ts b/src/cron/delivery.test.ts index 43eaa215114..4193b94e170 100644 --- a/src/cron/delivery.test.ts +++ b/src/cron/delivery.test.ts @@ -84,6 +84,24 @@ describe("resolveCronDeliveryPlan", () => { expect(plan.to).toBe("123"); expect(plan.accountId).toBe("bot-a"); }); + + it("threads delivery.threadId when explicitly configured", () => { + const plan = resolveCronDeliveryPlan( + makeJob({ + delivery: { + mode: "announce", + channel: "telegram", + to: "-1001234567890", + threadId: "99", + }, + }), + ); + expect(plan.mode).toBe("announce"); + expect(plan.requested).toBe(true); + expect(plan.channel).toBe("telegram"); + expect(plan.to).toBe("-1001234567890"); + expect(plan.threadId).toBe("99"); + }); }); describe("resolveFailureDestination", () => { diff --git a/src/cron/delivery.ts b/src/cron/delivery.ts index 9d502a74fcb..fe716cc4700 100644 --- a/src/cron/delivery.ts +++ b/src/cron/delivery.ts @@ -14,6 +14,7 @@ export type CronDeliveryPlan = { mode: CronDeliveryMode; channel?: CronMessageChannel; to?: string; + threadId?: string | number; /** Explicit channel account id from the delivery config, if set. */ accountId?: string; source: "delivery" | "payload"; @@ -47,6 +48,17 @@ function normalizeAccountId(value: unknown): string | undefined { return trimmed ? trimmed : undefined; } +function normalizeThreadId(value: unknown): string | number | undefined { + if (typeof value === "number" && Number.isFinite(value)) { + return value; + } + if (typeof value !== "string") { + return undefined; + } + const trimmed = value.trim(); + return trimmed ? trimmed : undefined; +} + export function resolveCronDeliveryPlan(job: CronJob): CronDeliveryPlan { const payload = job.payload.kind === "agentTurn" ? job.payload : null; const delivery = job.delivery; @@ -70,6 +82,9 @@ export function resolveCronDeliveryPlan(job: CronJob): CronDeliveryPlan { (delivery as { channel?: unknown } | undefined)?.channel, ); const deliveryTo = normalizeTo((delivery as { to?: unknown } | undefined)?.to); + const deliveryThreadId = normalizeThreadId( + (delivery as { threadId?: unknown } | undefined)?.threadId, + ); const channel = deliveryChannel ?? payloadChannel ?? "last"; const to = deliveryTo ?? payloadTo; const deliveryAccountId = normalizeAccountId( @@ -81,6 +96,7 @@ export function resolveCronDeliveryPlan(job: CronJob): CronDeliveryPlan { mode: resolvedMode, channel: resolvedMode === "announce" ? channel : undefined, to, + threadId: resolvedMode === "announce" ? deliveryThreadId : undefined, accountId: deliveryAccountId, source: "delivery", requested: resolvedMode === "announce", @@ -96,6 +112,7 @@ export function resolveCronDeliveryPlan(job: CronJob): CronDeliveryPlan { mode: requested ? "announce" : "none", channel, to, + threadId: undefined, source: "payload", requested, }; diff --git a/src/cron/isolated-agent.delivery-target-thread-session.test.ts b/src/cron/isolated-agent.delivery-target-thread-session.test.ts index 93cd16e4381..69151846a85 100644 --- a/src/cron/isolated-agent.delivery-target-thread-session.test.ts +++ b/src/cron/isolated-agent.delivery-target-thread-session.test.ts @@ -161,4 +161,18 @@ describe("resolveDeliveryTarget thread session lookup", () => { expect(result.to).toBe("63448508"); expect(result.threadId).toBe(1008013); }); + + it("preserves explicit delivery.threadId on first run without topic syntax", async () => { + mockStore["/mock/store.json"] = {}; + + const result = await resolveDeliveryTarget(cfg, "main", { + channel: "telegram", + to: "63448508", + threadId: "1008013", + }); + + expect(result.to).toBe("63448508"); + expect(result.threadId).toBe("1008013"); + expect(result.channel).toBe("telegram"); + }); }); diff --git a/src/cron/isolated-agent/delivery-target.ts b/src/cron/isolated-agent/delivery-target.ts index 07d7a7ee8bc..3bc974b3c98 100644 --- a/src/cron/isolated-agent/delivery-target.ts +++ b/src/cron/isolated-agent/delivery-target.ts @@ -43,6 +43,7 @@ export async function resolveDeliveryTarget( jobPayload: { channel?: "last" | ChannelId; to?: string; + threadId?: string | number; /** Explicit accountId from job.delivery — overrides session-derived and binding-derived values. */ accountId?: string; sessionKey?: string; @@ -67,6 +68,7 @@ export async function resolveDeliveryTarget( entry: main, requestedChannel, explicitTo, + explicitThreadId: jobPayload.threadId, allowMismatchedLastTo, }); @@ -93,6 +95,7 @@ export async function resolveDeliveryTarget( entry: main, requestedChannel, explicitTo, + explicitThreadId: jobPayload.threadId, fallbackChannel, allowMismatchedLastTo, mode: preliminary.mode, diff --git a/src/cron/isolated-agent/run.ts b/src/cron/isolated-agent/run.ts index a59e086fbd8..eef449444d5 100644 --- a/src/cron/isolated-agent/run.ts +++ b/src/cron/isolated-agent/run.ts @@ -144,6 +144,7 @@ async function resolveCronDeliveryContext(params: { const resolvedDelivery = await resolveDeliveryTarget(params.cfg, params.agentId, { channel: deliveryPlan.channel ?? "last", to: deliveryPlan.to, + threadId: deliveryPlan.threadId, accountId: deliveryPlan.accountId, sessionKey: params.job.sessionKey, }); diff --git a/src/cron/normalize.test.ts b/src/cron/normalize.test.ts index 969faa6bb6f..273a35c0847 100644 --- a/src/cron/normalize.test.ts +++ b/src/cron/normalize.test.ts @@ -248,6 +248,32 @@ describe("normalizeCronJobCreate", () => { expect(delivery.accountId).toBe("coordinator"); }); + it("normalizes delivery threadId and preserves numeric values", () => { + const stringThread = normalizeIsolatedAgentTurnCreateJob({ + name: "delivery thread string", + delivery: { + mode: "announce", + channel: "telegram", + to: "-1003816714067", + threadId: " 1008013 ", + }, + }); + + expect((stringThread.delivery as Record).threadId).toBe("1008013"); + + const numericThread = normalizeIsolatedAgentTurnCreateJob({ + name: "delivery thread number", + delivery: { + mode: "announce", + channel: "telegram", + to: "-1003816714067", + threadId: 1008013, + }, + }); + + expect((numericThread.delivery as Record).threadId).toBe(1008013); + }); + it("strips empty accountId from delivery", () => { const normalized = normalizeIsolatedAgentTurnCreateJob({ name: "empty account", @@ -310,6 +336,26 @@ describe("normalizeCronJobCreate", () => { expect(delivery.bestEffort).toBe(true); }); + it("migrates legacy top-level threadId hints into delivery", () => { + const normalized = normalizeCronJobCreate({ + name: "legacy root thread", + enabled: true, + schedule: { kind: "cron", expr: "* * * * *" }, + payload: { + kind: "agentTurn", + message: "hi", + }, + channel: "telegram", + to: "-1001234567890", + threadId: " 99 ", + }) as unknown as Record; + + const delivery = normalized.delivery as Record; + expectAnnounceDeliveryTarget(delivery, { channel: "telegram", to: "-1001234567890" }); + expect(delivery.threadId).toBe("99"); + expect(normalized.threadId).toBeUndefined(); + }); + it("maps legacy deliver=false to delivery none", () => { const normalized = normalizeCronJobCreate({ name: "legacy off", @@ -499,4 +545,16 @@ describe("normalizeCronJobPatch", () => { const schedule = normalized.schedule as Record; expect(schedule.staggerMs).toBe(30_000); }); + + it("preserves legacy patch threadId hints for downstream delivery migration", () => { + const normalized = normalizeCronJobPatch({ + payload: { + kind: "agentTurn", + threadId: 77, + }, + }) as unknown as Record; + + expect(normalized.delivery).toBeUndefined(); + expect((normalized.payload as Record).threadId).toBe(77); + }); }); diff --git a/src/cron/normalize.ts b/src/cron/normalize.ts index 7a292809fea..8990c2480a7 100644 --- a/src/cron/normalize.ts +++ b/src/cron/normalize.ts @@ -208,6 +208,18 @@ function coerceDelivery(delivery: UnknownRecord) { delete next.to; } } + if (typeof delivery.threadId === "number" && Number.isFinite(delivery.threadId)) { + next.threadId = delivery.threadId; + } else if (typeof delivery.threadId === "string") { + const trimmed = delivery.threadId.trim(); + if (trimmed) { + next.threadId = trimmed; + } else { + delete next.threadId; + } + } else if ("threadId" in next) { + delete next.threadId; + } if (typeof delivery.accountId === "string") { const trimmed = delivery.accountId.trim(); if (trimmed) { @@ -299,6 +311,13 @@ function copyTopLevelLegacyDeliveryFields(next: UnknownRecord, payload: UnknownR if (typeof payload.to !== "string" && typeof next.to === "string" && next.to.trim()) { payload.to = next.to.trim(); } + if ( + !("threadId" in payload) && + ((typeof next.threadId === "number" && Number.isFinite(next.threadId)) || + (typeof next.threadId === "string" && next.threadId.trim())) + ) { + payload.threadId = typeof next.threadId === "string" ? next.threadId.trim() : next.threadId; + } if ( typeof payload.bestEffortDeliver !== "boolean" && typeof next.bestEffortDeliver === "boolean" @@ -324,6 +343,7 @@ function stripLegacyTopLevelFields(next: UnknownRecord) { delete next.deliver; delete next.channel; delete next.to; + delete next.threadId; delete next.bestEffortDeliver; delete next.provider; } diff --git a/src/cron/service/jobs.apply-patch.test.ts b/src/cron/service/jobs.apply-patch.test.ts new file mode 100644 index 00000000000..7c5e030c4d2 --- /dev/null +++ b/src/cron/service/jobs.apply-patch.test.ts @@ -0,0 +1,42 @@ +import { describe, expect, it } from "vitest"; +import type { CronJob } from "../types.js"; +import { applyJobPatch } from "./jobs.js"; + +function makeJob(overrides: Partial = {}): CronJob { + const now = Date.now(); + return { + id: "job-1", + name: "test", + enabled: true, + createdAtMs: now, + updatedAtMs: now, + schedule: { kind: "every", everyMs: 60_000 }, + sessionTarget: "isolated", + wakeMode: "now", + payload: { kind: "agentTurn", message: "hello" }, + delivery: { mode: "announce", channel: "telegram", to: "-1001234567890" }, + state: {}, + ...overrides, + }; +} + +describe("applyJobPatch legacy delivery migration", () => { + it("threads legacy payload threadId hints into delivery", () => { + const job = makeJob(); + const patch = { + payload: { + kind: "agentTurn", + threadId: "99", + }, + } as unknown as Parameters[1]; + + applyJobPatch(job, patch); + + expect(job.delivery).toEqual({ + mode: "announce", + channel: "telegram", + to: "-1001234567890", + threadId: "99", + }); + }); +}); diff --git a/src/cron/service/jobs.ts b/src/cron/service/jobs.ts index 542ba81053d..d55d41d1937 100644 --- a/src/cron/service/jobs.ts +++ b/src/cron/service/jobs.ts @@ -712,10 +712,18 @@ function buildLegacyDeliveryPatch( ): CronDeliveryPatch | null { const deliver = payload.deliver; const toRaw = typeof payload.to === "string" ? payload.to.trim() : ""; + const threadIdValue = (payload as { threadId?: unknown }).threadId; + const threadIdRaw = + typeof threadIdValue === "number" && Number.isFinite(threadIdValue) + ? threadIdValue + : typeof threadIdValue === "string" && threadIdValue.trim() + ? threadIdValue.trim() + : undefined; const hasLegacyHints = typeof deliver === "boolean" || typeof payload.bestEffortDeliver === "boolean" || - Boolean(toRaw); + Boolean(toRaw) || + threadIdRaw != null; if (!hasLegacyHints) { return null; } @@ -740,6 +748,10 @@ function buildLegacyDeliveryPatch( patch.to = payload.to.trim(); hasPatch = true; } + if (threadIdRaw != null) { + patch.threadId = threadIdRaw; + hasPatch = true; + } if (typeof payload.bestEffortDeliver === "boolean") { patch.bestEffort = payload.bestEffortDeliver; hasPatch = true; @@ -780,6 +792,13 @@ function normalizeOptionalTrimmedString(value: unknown): string | undefined { return trimmed ? trimmed : undefined; } +function normalizeOptionalThreadId(value: unknown): string | number | undefined { + if (typeof value === "number" && Number.isFinite(value)) { + return value; + } + return normalizeOptionalTrimmedString(value); +} + function mergeCronDelivery( existing: CronDelivery | undefined, patch: CronDeliveryPatch, @@ -788,6 +807,7 @@ function mergeCronDelivery( mode: existing?.mode ?? "none", channel: existing?.channel, to: existing?.to, + threadId: existing?.threadId, accountId: existing?.accountId, bestEffort: existing?.bestEffort, failureDestination: existing?.failureDestination, @@ -802,6 +822,9 @@ function mergeCronDelivery( if ("to" in patch) { next.to = normalizeOptionalTrimmedString(patch.to); } + if ("threadId" in patch) { + next.threadId = normalizeOptionalThreadId(patch.threadId); + } if ("accountId" in patch) { next.accountId = normalizeOptionalTrimmedString(patch.accountId); } diff --git a/src/cron/types.ts b/src/cron/types.ts index 9e424f564ac..b1806939b7c 100644 --- a/src/cron/types.ts +++ b/src/cron/types.ts @@ -25,6 +25,8 @@ export type CronDelivery = { mode: CronDeliveryMode; channel?: CronMessageChannel; to?: string; + /** Explicit thread/topic id for channels that support threaded delivery. */ + threadId?: string | number; /** Explicit channel account id for multi-account setups (e.g. multiple Telegram bots). */ accountId?: string; bestEffort?: boolean;