fix: preserve telegram topic delivery routing (#58489) (thanks @cwmine)

This commit is contained in:
Peter Steinberger 2026-04-01 16:02:22 +09:00
parent e643ba2f5e
commit 6776306387
14 changed files with 230 additions and 33 deletions

View File

@ -394,6 +394,9 @@ Docs: https://docs.openclaw.ai
- Telegram/splitting: replace proportional text estimate with verified HTML-length search so long messages split at word boundaries instead of mid-word; gracefully degrade when tag overhead exceeds the limit. (#56595)
- Telegram/delivery: skip whitespace-only and hook-blanked text replies in bot delivery to prevent GrammyError 400 empty-text crashes. (#56620)
- Telegram/send: validate `replyToMessageId` at all four API sinks with a shared normalizer that rejects non-numeric, NaN, and mixed-content strings. (#56587)
- Telegram/cron topics: route announce target parsing through the Telegram extension seam and carry explicit `delivery.threadId` through cron delivery resolution, so legacy `group:` routes and topic-targeted cron sends keep their forum topic destination. (#58489) Thanks @cwmine.
- Approvals/UI: keep the newest pending approval at the front of the Control UI queue so approving one request does not accidentally target an older expired id. Thanks @vincentkoc.
- Plugin approvals: accept unique short approval-id prefixes on `plugin.approval.resolve`, matching exec approvals and restoring `/approve` fallback flows on chat approval surfaces. Thanks @vincentkoc.
- Mistral: normalize OpenAI-compatible request flags so official Mistral API runs no longer fail with remaining `422 status code (no body)` chat errors.
- Control UI/config: keep sensitive raw config hidden by default, replace the blank blocked editor with an explicit reveal-to-edit state, and restore raw JSON editing without auto-exposing secrets. Fixes #55322.
- CLI/zsh: defer `compdef` registration until `compinit` is available so zsh completion loads cleanly with plugin managers and manual setups. (#56555)

View File

@ -204,6 +204,7 @@ Delivery config:
- `delivery.mode`: `none` | `announce` | `webhook`.
- `delivery.channel`: `last` or a specific channel.
- `delivery.to`: channel-specific target (announce) or webhook URL (webhook mode).
- `delivery.threadId`: optional explicit thread or topic id when the target channel supports threaded delivery.
- `delivery.bestEffort`: avoid failing the job if announce delivery fails.
Announce delivery suppresses messaging tool sends for the run; use `delivery.channel`/`delivery.to`
@ -272,6 +273,7 @@ Isolated jobs can deliver output to a channel via the top-level `delivery` confi
- `delivery.mode`: `announce` (channel delivery), `webhook` (HTTP POST), or `none`.
- `delivery.channel`: `last` or any deliverable channel id, for example `discord`, `matrix`, `telegram`, or `whatsapp`.
- `delivery.to`: channel-specific recipient target.
- `delivery.threadId`: optional thread/topic override for channels like Telegram, Slack, Discord, or Matrix when you want a specific thread without encoding it into `delivery.to`.
`announce` delivery is only valid for isolated jobs (`sessionTarget: "isolated"`).
`webhook` delivery is valid for both main and isolated jobs.

View File

@ -22,6 +22,26 @@ describe("resolveAnnounceOrigin telegram forum topics", () => {
});
});
it("preserves stored forum topic thread ids for legacy group-prefixed requester targets", () => {
expect(
resolveAnnounceOrigin(
{
lastChannel: "telegram",
lastTo: "telegram:-1001234567890:topic:99",
lastThreadId: 99,
},
{
channel: "telegram",
to: "group:-1001234567890",
},
),
).toEqual({
channel: "telegram",
to: "group:-1001234567890",
threadId: 99,
});
});
it("still strips stale thread ids when the stored telegram route points at a different chat", () => {
expect(
resolveAnnounceOrigin(

View File

@ -1,4 +1,5 @@
import { resolveQueueSettings } from "../auto-reply/reply/queue.js";
import { parseExplicitTargetForChannel } from "../channels/plugins/target-parsing.js";
import { loadConfig } from "../config/config.js";
import {
loadSessionStore,
@ -92,42 +93,15 @@ function summarizeDeliveryError(error: unknown): string {
}
}
function stripTelegramAnnouncePrefix(to: string): string {
let trimmed = to.trim();
let strippedTelegramPrefix = false;
while (true) {
const next = (() => {
if (/^(telegram|tg):/i.test(trimmed)) {
strippedTelegramPrefix = true;
return trimmed.replace(/^(telegram|tg):/i, "").trim();
}
if (strippedTelegramPrefix && /^group:/i.test(trimmed)) {
return trimmed.replace(/^group:/i, "").trim();
}
return trimmed;
})();
if (next === trimmed) {
return trimmed;
}
trimmed = next;
}
}
function parseTelegramAnnounceTarget(to: string): {
chatId: string;
chatType: "direct" | "group" | "unknown";
} {
const normalized = stripTelegramAnnouncePrefix(to);
const topicMatch = /^(.+?):topic:(\d+)$/.exec(normalized);
const colonMatch = /^(.+):(\d+)$/.exec(normalized);
const chatId = topicMatch?.[1] ?? colonMatch?.[1] ?? normalized;
const trimmedChatId = chatId.trim();
const chatType = /^-?\d+$/.test(trimmedChatId)
? trimmedChatId.startsWith("-")
? "group"
: "direct"
: "unknown";
return { chatId: trimmedChatId, chatType };
const parsed = parseExplicitTargetForChannel("telegram", to);
const chatId = parsed?.to?.trim() ?? to.trim();
const chatType =
parsed?.chatType === "direct" || parsed?.chatType === "group" ? parsed.chatType : "unknown";
return { chatId, chatType };
}
function shouldStripThreadFromAnnounceEntry(

View File

@ -84,6 +84,24 @@ describe("resolveCronDeliveryPlan", () => {
expect(plan.to).toBe("123");
expect(plan.accountId).toBe("bot-a");
});
it("threads delivery.threadId when explicitly configured", () => {
const plan = resolveCronDeliveryPlan(
makeJob({
delivery: {
mode: "announce",
channel: "telegram",
to: "-1001234567890",
threadId: "99",
},
}),
);
expect(plan.mode).toBe("announce");
expect(plan.requested).toBe(true);
expect(plan.channel).toBe("telegram");
expect(plan.to).toBe("-1001234567890");
expect(plan.threadId).toBe("99");
});
});
describe("resolveFailureDestination", () => {

View File

@ -14,6 +14,7 @@ export type CronDeliveryPlan = {
mode: CronDeliveryMode;
channel?: CronMessageChannel;
to?: string;
threadId?: string | number;
/** Explicit channel account id from the delivery config, if set. */
accountId?: string;
source: "delivery" | "payload";
@ -47,6 +48,17 @@ function normalizeAccountId(value: unknown): string | undefined {
return trimmed ? trimmed : undefined;
}
function normalizeThreadId(value: unknown): string | number | undefined {
if (typeof value === "number" && Number.isFinite(value)) {
return value;
}
if (typeof value !== "string") {
return undefined;
}
const trimmed = value.trim();
return trimmed ? trimmed : undefined;
}
export function resolveCronDeliveryPlan(job: CronJob): CronDeliveryPlan {
const payload = job.payload.kind === "agentTurn" ? job.payload : null;
const delivery = job.delivery;
@ -70,6 +82,9 @@ export function resolveCronDeliveryPlan(job: CronJob): CronDeliveryPlan {
(delivery as { channel?: unknown } | undefined)?.channel,
);
const deliveryTo = normalizeTo((delivery as { to?: unknown } | undefined)?.to);
const deliveryThreadId = normalizeThreadId(
(delivery as { threadId?: unknown } | undefined)?.threadId,
);
const channel = deliveryChannel ?? payloadChannel ?? "last";
const to = deliveryTo ?? payloadTo;
const deliveryAccountId = normalizeAccountId(
@ -81,6 +96,7 @@ export function resolveCronDeliveryPlan(job: CronJob): CronDeliveryPlan {
mode: resolvedMode,
channel: resolvedMode === "announce" ? channel : undefined,
to,
threadId: resolvedMode === "announce" ? deliveryThreadId : undefined,
accountId: deliveryAccountId,
source: "delivery",
requested: resolvedMode === "announce",
@ -96,6 +112,7 @@ export function resolveCronDeliveryPlan(job: CronJob): CronDeliveryPlan {
mode: requested ? "announce" : "none",
channel,
to,
threadId: undefined,
source: "payload",
requested,
};

View File

@ -161,4 +161,18 @@ describe("resolveDeliveryTarget thread session lookup", () => {
expect(result.to).toBe("63448508");
expect(result.threadId).toBe(1008013);
});
it("preserves explicit delivery.threadId on first run without topic syntax", async () => {
mockStore["/mock/store.json"] = {};
const result = await resolveDeliveryTarget(cfg, "main", {
channel: "telegram",
to: "63448508",
threadId: "1008013",
});
expect(result.to).toBe("63448508");
expect(result.threadId).toBe("1008013");
expect(result.channel).toBe("telegram");
});
});

View File

@ -43,6 +43,7 @@ export async function resolveDeliveryTarget(
jobPayload: {
channel?: "last" | ChannelId;
to?: string;
threadId?: string | number;
/** Explicit accountId from job.delivery — overrides session-derived and binding-derived values. */
accountId?: string;
sessionKey?: string;
@ -67,6 +68,7 @@ export async function resolveDeliveryTarget(
entry: main,
requestedChannel,
explicitTo,
explicitThreadId: jobPayload.threadId,
allowMismatchedLastTo,
});
@ -93,6 +95,7 @@ export async function resolveDeliveryTarget(
entry: main,
requestedChannel,
explicitTo,
explicitThreadId: jobPayload.threadId,
fallbackChannel,
allowMismatchedLastTo,
mode: preliminary.mode,

View File

@ -144,6 +144,7 @@ async function resolveCronDeliveryContext(params: {
const resolvedDelivery = await resolveDeliveryTarget(params.cfg, params.agentId, {
channel: deliveryPlan.channel ?? "last",
to: deliveryPlan.to,
threadId: deliveryPlan.threadId,
accountId: deliveryPlan.accountId,
sessionKey: params.job.sessionKey,
});

View File

@ -248,6 +248,32 @@ describe("normalizeCronJobCreate", () => {
expect(delivery.accountId).toBe("coordinator");
});
it("normalizes delivery threadId and preserves numeric values", () => {
const stringThread = normalizeIsolatedAgentTurnCreateJob({
name: "delivery thread string",
delivery: {
mode: "announce",
channel: "telegram",
to: "-1003816714067",
threadId: " 1008013 ",
},
});
expect((stringThread.delivery as Record<string, unknown>).threadId).toBe("1008013");
const numericThread = normalizeIsolatedAgentTurnCreateJob({
name: "delivery thread number",
delivery: {
mode: "announce",
channel: "telegram",
to: "-1003816714067",
threadId: 1008013,
},
});
expect((numericThread.delivery as Record<string, unknown>).threadId).toBe(1008013);
});
it("strips empty accountId from delivery", () => {
const normalized = normalizeIsolatedAgentTurnCreateJob({
name: "empty account",
@ -310,6 +336,26 @@ describe("normalizeCronJobCreate", () => {
expect(delivery.bestEffort).toBe(true);
});
it("migrates legacy top-level threadId hints into delivery", () => {
const normalized = normalizeCronJobCreate({
name: "legacy root thread",
enabled: true,
schedule: { kind: "cron", expr: "* * * * *" },
payload: {
kind: "agentTurn",
message: "hi",
},
channel: "telegram",
to: "-1001234567890",
threadId: " 99 ",
}) as unknown as Record<string, unknown>;
const delivery = normalized.delivery as Record<string, unknown>;
expectAnnounceDeliveryTarget(delivery, { channel: "telegram", to: "-1001234567890" });
expect(delivery.threadId).toBe("99");
expect(normalized.threadId).toBeUndefined();
});
it("maps legacy deliver=false to delivery none", () => {
const normalized = normalizeCronJobCreate({
name: "legacy off",
@ -499,4 +545,16 @@ describe("normalizeCronJobPatch", () => {
const schedule = normalized.schedule as Record<string, unknown>;
expect(schedule.staggerMs).toBe(30_000);
});
it("preserves legacy patch threadId hints for downstream delivery migration", () => {
const normalized = normalizeCronJobPatch({
payload: {
kind: "agentTurn",
threadId: 77,
},
}) as unknown as Record<string, unknown>;
expect(normalized.delivery).toBeUndefined();
expect((normalized.payload as Record<string, unknown>).threadId).toBe(77);
});
});

View File

@ -208,6 +208,18 @@ function coerceDelivery(delivery: UnknownRecord) {
delete next.to;
}
}
if (typeof delivery.threadId === "number" && Number.isFinite(delivery.threadId)) {
next.threadId = delivery.threadId;
} else if (typeof delivery.threadId === "string") {
const trimmed = delivery.threadId.trim();
if (trimmed) {
next.threadId = trimmed;
} else {
delete next.threadId;
}
} else if ("threadId" in next) {
delete next.threadId;
}
if (typeof delivery.accountId === "string") {
const trimmed = delivery.accountId.trim();
if (trimmed) {
@ -299,6 +311,13 @@ function copyTopLevelLegacyDeliveryFields(next: UnknownRecord, payload: UnknownR
if (typeof payload.to !== "string" && typeof next.to === "string" && next.to.trim()) {
payload.to = next.to.trim();
}
if (
!("threadId" in payload) &&
((typeof next.threadId === "number" && Number.isFinite(next.threadId)) ||
(typeof next.threadId === "string" && next.threadId.trim()))
) {
payload.threadId = typeof next.threadId === "string" ? next.threadId.trim() : next.threadId;
}
if (
typeof payload.bestEffortDeliver !== "boolean" &&
typeof next.bestEffortDeliver === "boolean"
@ -324,6 +343,7 @@ function stripLegacyTopLevelFields(next: UnknownRecord) {
delete next.deliver;
delete next.channel;
delete next.to;
delete next.threadId;
delete next.bestEffortDeliver;
delete next.provider;
}

View File

@ -0,0 +1,42 @@
import { describe, expect, it } from "vitest";
import type { CronJob } from "../types.js";
import { applyJobPatch } from "./jobs.js";
function makeJob(overrides: Partial<CronJob> = {}): CronJob {
const now = Date.now();
return {
id: "job-1",
name: "test",
enabled: true,
createdAtMs: now,
updatedAtMs: now,
schedule: { kind: "every", everyMs: 60_000 },
sessionTarget: "isolated",
wakeMode: "now",
payload: { kind: "agentTurn", message: "hello" },
delivery: { mode: "announce", channel: "telegram", to: "-1001234567890" },
state: {},
...overrides,
};
}
describe("applyJobPatch legacy delivery migration", () => {
it("threads legacy payload threadId hints into delivery", () => {
const job = makeJob();
const patch = {
payload: {
kind: "agentTurn",
threadId: "99",
},
} as unknown as Parameters<typeof applyJobPatch>[1];
applyJobPatch(job, patch);
expect(job.delivery).toEqual({
mode: "announce",
channel: "telegram",
to: "-1001234567890",
threadId: "99",
});
});
});

View File

@ -712,10 +712,18 @@ function buildLegacyDeliveryPatch(
): CronDeliveryPatch | null {
const deliver = payload.deliver;
const toRaw = typeof payload.to === "string" ? payload.to.trim() : "";
const threadIdValue = (payload as { threadId?: unknown }).threadId;
const threadIdRaw =
typeof threadIdValue === "number" && Number.isFinite(threadIdValue)
? threadIdValue
: typeof threadIdValue === "string" && threadIdValue.trim()
? threadIdValue.trim()
: undefined;
const hasLegacyHints =
typeof deliver === "boolean" ||
typeof payload.bestEffortDeliver === "boolean" ||
Boolean(toRaw);
Boolean(toRaw) ||
threadIdRaw != null;
if (!hasLegacyHints) {
return null;
}
@ -740,6 +748,10 @@ function buildLegacyDeliveryPatch(
patch.to = payload.to.trim();
hasPatch = true;
}
if (threadIdRaw != null) {
patch.threadId = threadIdRaw;
hasPatch = true;
}
if (typeof payload.bestEffortDeliver === "boolean") {
patch.bestEffort = payload.bestEffortDeliver;
hasPatch = true;
@ -780,6 +792,13 @@ function normalizeOptionalTrimmedString(value: unknown): string | undefined {
return trimmed ? trimmed : undefined;
}
function normalizeOptionalThreadId(value: unknown): string | number | undefined {
if (typeof value === "number" && Number.isFinite(value)) {
return value;
}
return normalizeOptionalTrimmedString(value);
}
function mergeCronDelivery(
existing: CronDelivery | undefined,
patch: CronDeliveryPatch,
@ -788,6 +807,7 @@ function mergeCronDelivery(
mode: existing?.mode ?? "none",
channel: existing?.channel,
to: existing?.to,
threadId: existing?.threadId,
accountId: existing?.accountId,
bestEffort: existing?.bestEffort,
failureDestination: existing?.failureDestination,
@ -802,6 +822,9 @@ function mergeCronDelivery(
if ("to" in patch) {
next.to = normalizeOptionalTrimmedString(patch.to);
}
if ("threadId" in patch) {
next.threadId = normalizeOptionalThreadId(patch.threadId);
}
if ("accountId" in patch) {
next.accountId = normalizeOptionalTrimmedString(patch.accountId);
}

View File

@ -25,6 +25,8 @@ export type CronDelivery = {
mode: CronDeliveryMode;
channel?: CronMessageChannel;
to?: string;
/** Explicit thread/topic id for channels that support threaded delivery. */
threadId?: string | number;
/** Explicit channel account id for multi-account setups (e.g. multiple Telegram bots). */
accountId?: string;
bestEffort?: boolean;