fix: preserve telegram topic routing in announce and delivery context

This commit is contained in:
yi-bot 2026-03-31 16:57:05 +00:00 committed by Peter Steinberger
parent 1b94e8ca14
commit e643ba2f5e
8 changed files with 257 additions and 8 deletions

View File

@ -99,7 +99,7 @@ describe("buildTelegramMessageContext DM topic threadId in deliveryContext (#889
expect(ctx).not.toBeNull();
expect(recordInboundSessionMock).toHaveBeenCalled();
expectRecordedRoute({ to: "telegram:-1001234567890", threadId: "99" });
expectRecordedRoute({ to: "telegram:-1001234567890:topic:99", threadId: "99" });
});
it("passes threadId to updateLastRoute for the forum General topic", async () => {
@ -115,6 +115,6 @@ describe("buildTelegramMessageContext DM topic threadId in deliveryContext (#889
expect(ctx).not.toBeNull();
expect(recordInboundSessionMock).toHaveBeenCalled();
expectRecordedRoute({ to: "telegram:-1001234567890", threadId: "1" });
expectRecordedRoute({ to: "telegram:-1001234567890:topic:1", threadId: "1" });
});
});

View File

@ -280,7 +280,10 @@ export async function buildTelegramInboundContextPayload(params: {
? {
sessionKey: updateLastRouteSessionKey,
channel: "telegram",
to: `telegram:${chatId}`,
to:
isGroup && updateLastRouteThreadId != null
? `telegram:${chatId}:topic:${updateLastRouteThreadId}`
: `telegram:${chatId}`,
accountId: route.accountId,
threadId: updateLastRouteThreadId,
mainDmOwnerPin:

View File

@ -0,0 +1,43 @@
import { describe, expect, it } from "vitest";
import { resolveAnnounceOrigin } from "./subagent-announce-delivery.js";
describe("resolveAnnounceOrigin telegram forum topics", () => {
it("preserves stored forum topic thread ids when requester origin omits one for the same chat", () => {
expect(
resolveAnnounceOrigin(
{
lastChannel: "telegram",
lastTo: "telegram:-1001234567890:topic:99",
lastThreadId: 99,
},
{
channel: "telegram",
to: "telegram:-1001234567890",
},
),
).toEqual({
channel: "telegram",
to: "telegram:-1001234567890",
threadId: 99,
});
});
it("still strips stale thread ids when the stored telegram route points at a different chat", () => {
expect(
resolveAnnounceOrigin(
{
lastChannel: "telegram",
lastTo: "telegram:-1009999999999:topic:99",
lastThreadId: 99,
},
{
channel: "telegram",
to: "telegram:-1001234567890",
},
),
).toEqual({
channel: "telegram",
to: "telegram:-1001234567890",
});
});
});

View File

@ -92,6 +92,79 @@ function summarizeDeliveryError(error: unknown): string {
}
}
function stripTelegramAnnouncePrefix(to: string): string {
let trimmed = to.trim();
let strippedTelegramPrefix = false;
while (true) {
const next = (() => {
if (/^(telegram|tg):/i.test(trimmed)) {
strippedTelegramPrefix = true;
return trimmed.replace(/^(telegram|tg):/i, "").trim();
}
if (strippedTelegramPrefix && /^group:/i.test(trimmed)) {
return trimmed.replace(/^group:/i, "").trim();
}
return trimmed;
})();
if (next === trimmed) {
return trimmed;
}
trimmed = next;
}
}
function parseTelegramAnnounceTarget(to: string): {
chatId: string;
chatType: "direct" | "group" | "unknown";
} {
const normalized = stripTelegramAnnouncePrefix(to);
const topicMatch = /^(.+?):topic:(\d+)$/.exec(normalized);
const colonMatch = /^(.+):(\d+)$/.exec(normalized);
const chatId = topicMatch?.[1] ?? colonMatch?.[1] ?? normalized;
const trimmedChatId = chatId.trim();
const chatType = /^-?\d+$/.test(trimmedChatId)
? trimmedChatId.startsWith("-")
? "group"
: "direct"
: "unknown";
return { chatId: trimmedChatId, chatType };
}
function shouldStripThreadFromAnnounceEntry(
normalizedRequester?: DeliveryContext,
normalizedEntry?: DeliveryContext,
): boolean {
if (
!normalizedRequester?.to ||
normalizedRequester.threadId != null ||
normalizedEntry?.threadId == null
) {
return false;
}
const requesterChannel = normalizedRequester.channel?.trim().toLowerCase();
if (requesterChannel && requesterChannel !== "telegram") {
return true;
}
if (!requesterChannel && !normalizedRequester.to.startsWith("telegram:")) {
return true;
}
try {
const requesterTarget = parseTelegramAnnounceTarget(normalizedRequester.to);
if (requesterTarget.chatType !== "group") {
return true;
}
const entryTarget = normalizedEntry.to
? parseTelegramAnnounceTarget(normalizedEntry.to)
: undefined;
if (entryTarget && entryTarget.chatId !== requesterTarget.chatId) {
return true;
}
return false;
} catch {
return false;
}
}
const TRANSIENT_ANNOUNCE_DELIVERY_ERROR_PATTERNS: readonly RegExp[] = [
/\berrorcode=unavailable\b/i,
/\bstatus\s*[:=]\s*"?unavailable\b/i,
@ -196,9 +269,7 @@ export function resolveAnnounceOrigin(
);
}
const entryForMerge =
normalizedRequester?.to &&
normalizedRequester.threadId == null &&
normalizedEntry?.threadId != null
normalizedEntry && shouldStripThreadFromAnnounceEntry(normalizedRequester, normalizedEntry)
? (() => {
const { threadId: _ignore, ...rest } = normalizedEntry;
return rest;

View File

@ -123,6 +123,7 @@ describe("extractDeliveryInfo", () => {
to: "group:98765",
accountId: "main",
});
storeState.store[baseKey].lastThreadId = "55";
const result = extractDeliveryInfo(topicKey);
@ -131,8 +132,33 @@ describe("extractDeliveryInfo", () => {
channel: "telegram",
to: "group:98765",
accountId: "main",
threadId: "55",
},
threadId: "55",
});
});
it("falls back to session metadata thread ids when deliveryContext.threadId is missing", () => {
const sessionKey = "agent:main:telegram:group:98765";
storeState.store[sessionKey] = {
...buildEntry({
channel: "telegram",
to: "group:98765",
accountId: "main",
}),
origin: { threadId: 77 },
};
const result = extractDeliveryInfo(sessionKey);
expect(result).toEqual({
deliveryContext: {
channel: "telegram",
to: "group:98765",
accountId: "main",
threadId: "77",
},
threadId: undefined,
});
});
});

View File

@ -15,7 +15,9 @@ export function parseSessionThreadInfo(sessionKey: string | undefined): {
}
export function extractDeliveryInfo(sessionKey: string | undefined): {
deliveryContext: { channel?: string; to?: string; accountId?: string } | undefined;
deliveryContext:
| { channel?: string; to?: string; accountId?: string; threadId?: string }
| undefined;
threadId: string | undefined;
} {
const { baseSessionKey, threadId } = parseSessionThreadInfo(sessionKey);
@ -23,7 +25,9 @@ export function extractDeliveryInfo(sessionKey: string | undefined): {
return { deliveryContext: undefined, threadId };
}
let deliveryContext: { channel?: string; to?: string; accountId?: string } | undefined;
let deliveryContext:
| { channel?: string; to?: string; accountId?: string; threadId?: string }
| undefined;
try {
const cfg = loadConfig();
const storePath = resolveStorePath(cfg.session?.store);
@ -33,10 +37,13 @@ export function extractDeliveryInfo(sessionKey: string | undefined): {
entry = store[baseSessionKey];
}
if (entry?.deliveryContext) {
const resolvedThreadId =
entry.deliveryContext.threadId ?? entry.lastThreadId ?? entry.origin?.threadId;
deliveryContext = {
channel: entry.deliveryContext.channel,
to: entry.deliveryContext.to,
accountId: entry.deliveryContext.accountId,
threadId: resolvedThreadId != null ? String(resolvedThreadId) : undefined,
};
}
} catch {

View File

@ -0,0 +1,66 @@
import { describe, expect, it } from "vitest";
import {
buildDeliveryFromLegacyPayload,
buildDeliveryPatchFromLegacyPayload,
hasLegacyDeliveryHints,
mergeLegacyDeliveryInto,
normalizeLegacyDeliveryInput,
} from "./legacy-delivery.js";
describe("legacy delivery threadId support", () => {
it("treats threadId as a legacy delivery hint", () => {
expect(hasLegacyDeliveryHints({ threadId: "42" })).toBe(true);
expect(hasLegacyDeliveryHints({ threadId: 42 })).toBe(true);
});
it("hydrates threadId into new delivery payloads", () => {
expect(
buildDeliveryFromLegacyPayload({
channel: "telegram",
to: "-100123:topic:42",
threadId: 42,
}),
).toEqual({
mode: "announce",
channel: "telegram",
to: "-100123:topic:42",
threadId: "42",
});
});
it("patches and merges threadId into existing deliveries", () => {
expect(buildDeliveryPatchFromLegacyPayload({ threadId: "77" })).toEqual({
mode: "announce",
threadId: "77",
});
expect(
mergeLegacyDeliveryInto(
{ mode: "announce", channel: "telegram", to: "-100123", threadId: "1" },
{ threadId: 77 },
),
).toEqual({
delivery: { mode: "announce", channel: "telegram", to: "-100123", threadId: "77" },
mutated: true,
});
});
it("strips threadId from legacy payloads after normalization", () => {
const payload: Record<string, unknown> = {
channel: "telegram",
to: "-100123:topic:42",
threadId: 42,
};
expect(normalizeLegacyDeliveryInput({ payload })).toEqual({
delivery: {
mode: "announce",
channel: "telegram",
to: "-100123:topic:42",
threadId: "42",
},
mutated: true,
});
expect(payload.threadId).toBeUndefined();
});
});

View File

@ -14,6 +14,12 @@ export function hasLegacyDeliveryHints(payload: Record<string, unknown>) {
if (typeof payload.to === "string" && payload.to.trim()) {
return true;
}
if (typeof payload.threadId === "string" && payload.threadId.trim()) {
return true;
}
if (typeof payload.threadId === "number" && Number.isFinite(payload.threadId)) {
return true;
}
return false;
}
@ -29,6 +35,12 @@ export function buildDeliveryFromLegacyPayload(
? payload.provider.trim().toLowerCase()
: "";
const toRaw = typeof payload.to === "string" ? payload.to.trim() : "";
const threadIdRaw =
typeof payload.threadId === "string"
? payload.threadId.trim()
: typeof payload.threadId === "number" && Number.isFinite(payload.threadId)
? String(payload.threadId)
: "";
const next: Record<string, unknown> = { mode };
if (channelRaw) {
next.channel = channelRaw;
@ -36,6 +48,9 @@ export function buildDeliveryFromLegacyPayload(
if (toRaw) {
next.to = toRaw;
}
if (threadIdRaw) {
next.threadId = threadIdRaw;
}
if (typeof payload.bestEffortDeliver === "boolean") {
next.bestEffort = payload.bestEffortDeliver;
}
@ -51,6 +66,12 @@ export function buildDeliveryPatchFromLegacyPayload(payload: Record<string, unkn
? payload.provider.trim().toLowerCase()
: "";
const toRaw = typeof payload.to === "string" ? payload.to.trim() : "";
const threadIdRaw =
typeof payload.threadId === "string"
? payload.threadId.trim()
: typeof payload.threadId === "number" && Number.isFinite(payload.threadId)
? String(payload.threadId)
: "";
const next: Record<string, unknown> = {};
let hasPatch = false;
@ -61,6 +82,7 @@ export function buildDeliveryPatchFromLegacyPayload(payload: Record<string, unkn
deliver === true ||
channelRaw ||
toRaw ||
threadIdRaw ||
typeof payload.bestEffortDeliver === "boolean"
) {
next.mode = "announce";
@ -74,6 +96,10 @@ export function buildDeliveryPatchFromLegacyPayload(payload: Record<string, unkn
next.to = toRaw;
hasPatch = true;
}
if (threadIdRaw) {
next.threadId = threadIdRaw;
hasPatch = true;
}
if (typeof payload.bestEffortDeliver === "boolean") {
next.bestEffort = payload.bestEffortDeliver;
hasPatch = true;
@ -106,6 +132,10 @@ export function mergeLegacyDeliveryInto(
next.to = patch.to;
mutated = true;
}
if ("threadId" in patch && patch.threadId !== next.threadId) {
next.threadId = patch.threadId;
mutated = true;
}
if ("bestEffort" in patch && patch.bestEffort !== next.bestEffort) {
next.bestEffort = patch.bestEffort;
mutated = true;
@ -151,6 +181,9 @@ export function stripLegacyDeliveryFields(payload: Record<string, unknown>) {
if ("to" in payload) {
delete payload.to;
}
if ("threadId" in payload) {
delete payload.threadId;
}
if ("bestEffortDeliver" in payload) {
delete payload.bestEffortDeliver;
}