fix: stabilize Telegram draft boundaries and suppress NO_REPLY lead leaks (#33169)

* fix: stabilize telegram draft stream message boundaries

* fix: suppress NO_REPLY lead-fragment leaks

* fix: keep underscore guard for non-NO_REPLY prefixes

* fix: skip assistant-start rotation only after real lane rotation

* fix: preserve finalized state when pre-rotation does not force

* fix: reset finalized preview state on message-start boundary

* fix: document Telegram draft boundary + NO_REPLY reliability updates (#33169) (thanks @obviyus)
This commit is contained in:
Ayaan Zaidi 2026-03-03 22:49:33 +05:30 committed by GitHub
parent a7a9a3d3c8
commit 3d998828b9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 212 additions and 137 deletions

View File

@ -17,6 +17,7 @@ Docs: https://docs.openclaw.ai
- Discord/typing cleanup: stop typing indicators after silent/NO_REPLY runs by marking the run complete before dispatch idle cleanup. Thanks @thewilloftheshadow.
- Discord/voice messages: request upload slots with JSON fetch calls so voice message uploads no longer fail with content-type errors. Thanks @thewilloftheshadow.
- Telegram/DM draft finalization reliability: require verified final-text draft emission before treating preview finalization as delivered, and fall back to normal payload send when final draft delivery is not confirmed (preventing missing final responses and preserving media/button delivery). (#32118) Thanks @OpenCils.
- Telegram/draft preview boundary + silent-token reliability: stabilize answer-lane message boundaries across late-partial/message-start races, preserve/reset finalized preview state at the correct boundaries, and suppress `NO_REPLY` lead-fragment leaks without broad heartbeat-prefix false positives. (#33169) Thanks @obviyus.
- Discord/audit wildcard warnings: ignore "\*" wildcard keys when counting unresolved guild channels so doctor/status no longer warns on allow-all configs. (#33125) Thanks @thewilloftheshadow.
- Discord/channel resolution: default bare numeric recipients to channels, harden allowlist numeric ID handling with safe fallbacks, and avoid inbound WS heartbeat stalls. (#33142) Thanks @thewilloftheshadow.
- Discord/chunk delivery reliability: preserve chunk ordering when using a REST client and retry chunk sends on 429/5xx using account retry settings. (#33226) Thanks @thewilloftheshadow.

View File

@ -410,7 +410,7 @@ describe("runReplyAgent typing (heartbeat)", () => {
shouldType: false,
},
{
partials: ["NO_", "NO_RE", "NO_REPLY"],
partials: ["NO", "NO_", "NO_RE", "NO_REPLY"],
finalText: "NO_REPLY",
expectedForwarded: [] as string[],
shouldType: false,

View File

@ -74,7 +74,8 @@ describe("stripSilentToken", () => {
});
describe("isSilentReplyPrefixText", () => {
it("matches uppercase underscore prefixes", () => {
it("matches uppercase token lead fragments", () => {
expect(isSilentReplyPrefixText("NO")).toBe(true);
expect(isSilentReplyPrefixText("NO_")).toBe(true);
expect(isSilentReplyPrefixText("NO_RE")).toBe(true);
expect(isSilentReplyPrefixText("NO_REPLY")).toBe(true);
@ -84,9 +85,17 @@ describe("isSilentReplyPrefixText", () => {
it("rejects ambiguous natural-language prefixes", () => {
expect(isSilentReplyPrefixText("N")).toBe(false);
expect(isSilentReplyPrefixText("No")).toBe(false);
expect(isSilentReplyPrefixText("no")).toBe(false);
expect(isSilentReplyPrefixText("Hello")).toBe(false);
});
it("keeps underscore guard for non-NO_REPLY tokens", () => {
// For generic silent tokens, only fragments that already contain an
// underscore count as a prefix match; the bare two-letter lead ("HE",
// "HEART", ...) must NOT be suppressed. The bare-"NO" exception exists
// only for the NO_REPLY token itself.
expect(isSilentReplyPrefixText("HE", "HEARTBEAT_OK")).toBe(false);
expect(isSilentReplyPrefixText("HEART", "HEARTBEAT_OK")).toBe(false);
expect(isSilentReplyPrefixText("HEARTBEAT", "HEARTBEAT_OK")).toBe(false);
expect(isSilentReplyPrefixText("HEARTBEAT_", "HEARTBEAT_OK")).toBe(true);
});
it("rejects non-prefixes and mixed characters", () => {
expect(isSilentReplyPrefixText("NO_X")).toBe(false);
expect(isSilentReplyPrefixText("NO_REPLY more")).toBe(false);

View File

@ -56,15 +56,34 @@ export function isSilentReplyPrefixText(
if (!text) {
return false;
}
const normalized = text.trimStart().toUpperCase();
const trimmed = text.trimStart();
if (!trimmed) {
return false;
}
// Guard against suppressing natural-language "No..." text while still
// catching uppercase lead fragments like "NO" from streamed NO_REPLY.
if (trimmed !== trimmed.toUpperCase()) {
return false;
}
const normalized = trimmed.toUpperCase();
if (!normalized) {
return false;
}
if (!normalized.includes("_")) {
if (normalized.length < 2) {
return false;
}
if (/[^A-Z_]/.test(normalized)) {
return false;
}
return token.toUpperCase().startsWith(normalized);
const tokenUpper = token.toUpperCase();
if (!tokenUpper.startsWith(normalized)) {
return false;
}
if (normalized.includes("_")) {
return true;
}
// Keep underscore guard for generic tokens to avoid suppressing unrelated
// uppercase words (e.g. HEART/HE with HEARTBEAT_OK). Only allow bare "NO"
// because NO_REPLY streaming can transiently emit that fragment.
return tokenUpper === SILENT_REPLY_TOKEN && normalized === "NO";
}

View File

@ -444,6 +444,133 @@ describe("dispatchTelegramMessage draft streaming", () => {
expect(draftStream.forceNewMessage).not.toHaveBeenCalled();
});
// Regression test: when the first partial of message B arrives BEFORE the
// onAssistantMessageStart callback, the answer lane must rotate preemptively
// so message B's text does not overwrite the already-finalized preview of
// message A (preview message 1001).
it("rotates before a late second-message partial so finalized preview is not overwritten", async () => {
const answerDraftStream = createSequencedDraftStream(1001);
const reasoningDraftStream = createDraftStream();
createTelegramDraftStream
.mockImplementationOnce(() => answerDraftStream)
.mockImplementationOnce(() => reasoningDraftStream);
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
await replyOptions?.onPartialReply?.({ text: "Message A partial" });
await dispatcherOptions.deliver({ text: "Message A final" }, { kind: "final" });
// Simulate provider ordering bug: first chunk arrives before message-start callback.
await replyOptions?.onPartialReply?.({ text: "Message B early" });
await replyOptions?.onAssistantMessageStart?.();
await replyOptions?.onPartialReply?.({ text: "Message B partial" });
await dispatcherOptions.deliver({ text: "Message B final" }, { kind: "final" });
return { queuedFinal: true };
},
);
deliverReplies.mockResolvedValue({ delivered: true });
editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "1001" });
await dispatchWithContext({ context: createContext(), streamMode: "partial" });
// Exactly one rotation: the early pre-rotation; the later explicit
// message-start must not rotate a second time.
expect(answerDraftStream.forceNewMessage).toHaveBeenCalledTimes(1);
expect(answerDraftStream.update).toHaveBeenNthCalledWith(2, "Message B early");
// The rotation must land before the "Message B early" update so that update
// targets a fresh preview message, not message A's finalized preview.
const boundaryRotationOrder = answerDraftStream.forceNewMessage.mock.invocationCallOrder[0];
const secondUpdateOrder = answerDraftStream.update.mock.invocationCallOrder[1];
expect(boundaryRotationOrder).toBeLessThan(secondUpdateOrder);
// Each message finalizes into its own preview: A -> 1001, B -> 1002.
expect(editMessageTelegram).toHaveBeenNthCalledWith(
1,
123,
1001,
"Message A final",
expect.any(Object),
);
expect(editMessageTelegram).toHaveBeenNthCalledWith(
2,
123,
1002,
"Message B final",
expect.any(Object),
);
});
// Companion regression: if the early pre-rotation could NOT force a new
// message (message A never streamed partials, so hasStreamedMessage is
// false), the real onAssistantMessageStart must still perform the rotation
// rather than being skipped.
it("does not skip message-start rotation when pre-rotation did not force a new message", async () => {
const answerDraftStream = createSequencedDraftStream(1002);
answerDraftStream.setMessageId(1001);
const reasoningDraftStream = createDraftStream();
createTelegramDraftStream
.mockImplementationOnce(() => answerDraftStream)
.mockImplementationOnce(() => reasoningDraftStream);
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
// First message has only final text (no streamed partials), so answer lane
// reaches finalized state with hasStreamedMessage still false.
await dispatcherOptions.deliver({ text: "Message A final" }, { kind: "final" });
// Provider ordering bug: next message partial arrives before message-start.
await replyOptions?.onPartialReply?.({ text: "Message B early" });
await replyOptions?.onAssistantMessageStart?.();
await replyOptions?.onPartialReply?.({ text: "Message B partial" });
await dispatcherOptions.deliver({ text: "Message B final" }, { kind: "final" });
return { queuedFinal: true };
},
);
deliverReplies.mockResolvedValue({ delivered: true });
editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "1001" });
const bot = createBot();
await dispatchWithContext({ context: createContext(), streamMode: "partial", bot });
// Early pre-rotation could not force (no streamed partials yet), so the
// real assistant message_start must still rotate once.
expect(answerDraftStream.forceNewMessage).toHaveBeenCalledTimes(1);
expect(answerDraftStream.update).toHaveBeenNthCalledWith(1, "Message B early");
expect(answerDraftStream.update).toHaveBeenNthCalledWith(2, "Message B partial");
// Expected ordering: early update -> message-start rotation -> next update.
const earlyUpdateOrder = answerDraftStream.update.mock.invocationCallOrder[0];
const boundaryRotationOrder = answerDraftStream.forceNewMessage.mock.invocationCallOrder[0];
const secondUpdateOrder = answerDraftStream.update.mock.invocationCallOrder[1];
expect(earlyUpdateOrder).toBeLessThan(boundaryRotationOrder);
expect(boundaryRotationOrder).toBeLessThan(secondUpdateOrder);
expect(editMessageTelegram).toHaveBeenNthCalledWith(
1,
123,
1001,
"Message A final",
expect.any(Object),
);
expect(editMessageTelegram).toHaveBeenNthCalledWith(
2,
123,
1002,
"Message B final",
expect.any(Object),
);
// No preview message may be deleted: both finalized messages stay visible.
expect((bot.api.deleteMessage as ReturnType<typeof vi.fn>).mock.calls).toHaveLength(0);
});
// Guard against over-rotation: a normal message-start boundary must clear
// the finalized-preview flag so subsequent partials of the SAME message do
// not trip the late pre-rotation path and split one message across previews.
it("does not trigger late pre-rotation mid-message after an explicit assistant message start", async () => {
const answerDraftStream = createDraftStream(1001);
const reasoningDraftStream = createDraftStream();
createTelegramDraftStream
.mockImplementationOnce(() => answerDraftStream)
.mockImplementationOnce(() => reasoningDraftStream);
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
// Message A finalizes without streamed partials.
await dispatcherOptions.deliver({ text: "Message A final" }, { kind: "final" });
// Message B starts normally before partials.
await replyOptions?.onAssistantMessageStart?.();
await replyOptions?.onPartialReply?.({ text: "Message B first chunk" });
await replyOptions?.onPartialReply?.({ text: "Message B second chunk" });
await dispatcherOptions.deliver({ text: "Message B final" }, { kind: "final" });
return { queuedFinal: true };
},
);
deliverReplies.mockResolvedValue({ delivered: true });
editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "1001" });
await dispatchWithContext({ context: createContext(), streamMode: "partial" });
// The explicit message_start boundary must clear finalized state so
// same-message partials do not force a new preview mid-stream.
expect(answerDraftStream.forceNewMessage).not.toHaveBeenCalled();
expect(answerDraftStream.update).toHaveBeenNthCalledWith(1, "Message B first chunk");
expect(answerDraftStream.update).toHaveBeenNthCalledWith(2, "Message B second chunk");
});
it("finalizes multi-message assistant stream to matching preview messages in order", async () => {
const answerDraftStream = createSequencedDraftStream(1001);
const reasoningDraftStream = createDraftStream();

View File

@ -225,16 +225,20 @@ export const dispatchTelegramMessage = async ({
stream,
lastPartialText: "",
hasStreamedMessage: false,
previewRevisionBaseline: stream?.previewRevision?.() ?? 0,
};
};
const lanes: Record<LaneName, DraftLaneState> = {
answer: createDraftLane("answer", canStreamAnswerDraft),
reasoning: createDraftLane("reasoning", canStreamReasoningDraft),
};
const finalizedPreviewByLane: Record<LaneName, boolean> = {
answer: false,
reasoning: false,
};
const answerLane = lanes.answer;
const reasoningLane = lanes.reasoning;
let splitReasoningOnNextStream = false;
let skipNextAnswerMessageStartRotation = false;
const reasoningStepState = createTelegramReasoningStepState();
type SplitLaneSegment = { lane: LaneName; text: string };
type SplitLaneSegmentsResult = {
@ -260,7 +264,29 @@ export const dispatchTelegramMessage = async ({
const resetDraftLaneState = (lane: DraftLaneState) => {
lane.lastPartialText = "";
lane.hasStreamedMessage = false;
lane.previewRevisionBaseline = lane.stream?.previewRevision?.() ?? lane.previewRevisionBaseline;
};
// Rotate the answer lane at an assistant-message boundary: archive the
// current preview when it still awaits its final-text update, force the
// draft stream onto a fresh message, and reset the lane's tracking state.
// Returns true only when a real rotation (forceNewMessage) occurred.
const rotateAnswerLaneForNewAssistantMessage = () => {
const rotated = answerLane.hasStreamedMessage;
if (rotated) {
const currentPreviewId = answerLane.stream?.messageId();
// Archiving an already-finalized preview would let cleanup delete a
// user-visible final message on later media-only turns, so archive only
// previews that still need their matching final text update.
if (!finalizedPreviewByLane.answer && typeof currentPreviewId === "number") {
archivedAnswerPreviews.push({
messageId: currentPreviewId,
textSnapshot: answerLane.lastPartialText,
});
}
answerLane.stream?.forceNewMessage();
}
resetDraftLaneState(answerLane);
if (rotated) {
// A fresh preview lifecycle begins at this assistant-message boundary.
finalizedPreviewByLane.answer = false;
}
return rotated;
};
const updateDraftFromPartial = (lane: DraftLaneState, text: string | undefined) => {
const laneStream = lane.stream;
@ -287,6 +313,13 @@ export const dispatchTelegramMessage = async ({
};
const ingestDraftLaneSegments = (text: string | undefined) => {
const split = splitTextIntoLaneSegments(text);
const hasAnswerSegment = split.segments.some((segment) => segment.lane === "answer");
if (hasAnswerSegment && finalizedPreviewByLane.answer) {
// Some providers can emit the first partial of a new assistant message before
// onAssistantMessageStart() arrives. Rotate preemptively so we do not edit
// the previously finalized preview message with the next message's text.
skipNextAnswerMessageStartRotation = rotateAnswerLaneForNewAssistantMessage();
}
for (const segment of split.segments) {
if (segment.lane === "reasoning") {
reasoningStepState.noteReasoningHint();
@ -376,10 +409,6 @@ export const dispatchTelegramMessage = async ({
? ctxPayload.ReplyToBody.trim() || undefined
: undefined;
const deliveryState = createLaneDeliveryStateTracker();
const finalizedPreviewByLane: Record<LaneName, boolean> = {
answer: false,
reasoning: false,
};
const clearGroupHistory = () => {
if (isGroup && historyKey) {
clearHistoryEntriesIfEnabled({ historyMap: groupHistories, historyKey, limit: historyLimit });
@ -599,21 +628,16 @@ export const dispatchTelegramMessage = async ({
onAssistantMessageStart: answerLane.stream
? async () => {
reasoningStepState.resetForNextStep();
if (answerLane.hasStreamedMessage) {
const previewMessageId = answerLane.stream?.messageId();
// Only archive previews that still need a matching final text update.
// Once a preview has already been finalized, archiving it here causes
// cleanup to delete a user-visible final message on later media-only turns.
if (typeof previewMessageId === "number" && !finalizedPreviewByLane.answer) {
archivedAnswerPreviews.push({
messageId: previewMessageId,
textSnapshot: answerLane.lastPartialText,
});
}
answerLane.stream?.forceNewMessage();
if (skipNextAnswerMessageStartRotation) {
skipNextAnswerMessageStartRotation = false;
finalizedPreviewByLane.answer = false;
return;
}
resetDraftLaneState(answerLane);
// New assistant message boundary: this lane now tracks a fresh preview lifecycle.
rotateAnswerLaneForNewAssistantMessage();
// Message-start is an explicit assistant-message boundary.
// Even when no forceNewMessage happened (e.g. prior answer had no
// streamed partials), the next partial belongs to a fresh lifecycle
// and must not trigger late pre-rotation mid-message.
finalizedPreviewByLane.answer = false;
}
: undefined,

View File

@ -10,7 +10,6 @@ function createHarness(params?: {
answerStream?: DraftLaneState["stream"];
answerHasStreamedMessage?: boolean;
answerLastPartialText?: string;
answerPreviewRevisionBaseline?: number;
}) {
const answer =
params?.answerStream ?? createTestDraftStream({ messageId: params?.answerMessageId });
@ -20,13 +19,11 @@ function createHarness(params?: {
stream: answer,
lastPartialText: params?.answerLastPartialText ?? "",
hasStreamedMessage: params?.answerHasStreamedMessage ?? false,
previewRevisionBaseline: params?.answerPreviewRevisionBaseline ?? 0,
},
reasoning: {
stream: reasoning as DraftLaneState["stream"],
lastPartialText: "",
hasStreamedMessage: false,
previewRevisionBaseline: 0,
},
};
const sendPayload = vi.fn().mockResolvedValue(true);
@ -212,10 +209,8 @@ describe("createLaneTextDeliverer", () => {
expect(harness.log).toHaveBeenCalledWith(expect.stringContaining("preview final too long"));
});
it("treats unchanged DM draft final text as already finalized", async () => {
it("sends a final message after DM draft streaming even when text is unchanged", async () => {
const answerStream = createTestDraftStream({ previewMode: "draft" });
answerStream.previewRevision.mockReturnValue(7);
answerStream.lastDeliveredText.mockReturnValue("Hello final");
answerStream.update.mockImplementation(() => {});
const harness = createHarness({
answerStream: answerStream as DraftLaneState["stream"],
@ -230,76 +225,19 @@ describe("createLaneTextDeliverer", () => {
infoKind: "final",
});
expect(result).toBe("preview-finalized");
expect(harness.flushDraftLane).toHaveBeenCalledTimes(1);
expect(harness.stopDraftLane).toHaveBeenCalledTimes(1);
expect(harness.sendPayload).not.toHaveBeenCalled();
expect(harness.markDelivered).toHaveBeenCalledTimes(1);
});
it("falls back once when DM draft finalization emits no update", async () => {
const answerStream = createTestDraftStream({ previewMode: "draft" });
answerStream.previewRevision.mockReturnValue(3);
answerStream.update.mockImplementation(() => {});
const harness = createHarness({
answerStream: answerStream as DraftLaneState["stream"],
answerHasStreamedMessage: true,
answerLastPartialText: "Partial",
});
const result = await harness.deliverLaneText({
laneName: "answer",
text: "Final answer",
payload: { text: "Final answer" },
infoKind: "final",
});
expect(result).toBe("sent");
expect(harness.flushDraftLane).toHaveBeenCalledTimes(1);
expect(harness.stopDraftLane).toHaveBeenCalledTimes(1);
expect(harness.sendPayload).toHaveBeenCalledWith(
expect.objectContaining({ text: "Final answer" }),
);
expect(harness.markDelivered).not.toHaveBeenCalled();
expect(harness.log).toHaveBeenCalledWith(
expect.stringContaining("draft final text not emitted"),
);
});
it("falls back when unchanged final text has no emitted draft preview in current lane", async () => {
const answerStream = createTestDraftStream({ previewMode: "draft" });
answerStream.previewRevision.mockReturnValue(7);
answerStream.update.mockImplementation(() => {});
const harness = createHarness({
answerStream: answerStream as DraftLaneState["stream"],
answerHasStreamedMessage: true,
answerLastPartialText: "Hello final",
answerPreviewRevisionBaseline: 7,
});
const result = await harness.deliverLaneText({
laneName: "answer",
text: "Hello final",
payload: { text: "Hello final" },
infoKind: "final",
});
expect(result).toBe("sent");
expect(harness.stopDraftLane).toHaveBeenCalledTimes(1);
expect(harness.flushDraftLane).toHaveBeenCalled();
expect(harness.stopDraftLane).toHaveBeenCalled();
expect(harness.sendPayload).toHaveBeenCalledWith(
expect.objectContaining({ text: "Hello final" }),
);
expect(harness.markDelivered).not.toHaveBeenCalled();
expect(harness.log).toHaveBeenCalledWith(
expect.stringContaining("draft final text not emitted"),
);
});
it("falls back when revision advances but final text was not emitted", async () => {
let previewRevision = 7;
it("sends a final message after DM draft streaming when revision changes", async () => {
let previewRevision = 3;
const answerStream = createTestDraftStream({ previewMode: "draft" });
answerStream.previewRevision.mockImplementation(() => previewRevision);
answerStream.lastDeliveredText.mockReturnValue("Older partial");
answerStream.update.mockImplementation(() => {});
answerStream.flush.mockImplementation(async () => {
previewRevision += 1;
@ -322,9 +260,6 @@ describe("createLaneTextDeliverer", () => {
expect.objectContaining({ text: "Final answer" }),
);
expect(harness.markDelivered).not.toHaveBeenCalled();
expect(harness.log).toHaveBeenCalledWith(
expect.stringContaining("draft final text not emitted"),
);
});
it("does not use DM draft final shortcut for media payloads", async () => {

View File

@ -8,7 +8,6 @@ export type DraftLaneState = {
stream: TelegramDraftStream | undefined;
lastPartialText: string;
hasStreamedMessage: boolean;
previewRevisionBaseline: number;
};
export type ArchivedPreview = {
@ -329,43 +328,6 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) {
!hasMedia && text.length > 0 && text.length <= params.draftMaxChars && !payload.isError;
if (infoKind === "final") {
const hasPreviewButtons = Boolean(previewButtons?.some((row) => row.length > 0));
const canFinalizeDraftPreviewDirectly =
isDraftPreviewLane(lane) &&
lane.hasStreamedMessage &&
canEditViaPreview &&
!hasPreviewButtons;
let draftPreviewStopped = false;
if (canFinalizeDraftPreviewDirectly) {
const previewRevisionBeforeFlush = lane.stream?.previewRevision?.() ?? 0;
const finalTextSnapshot = text.trimEnd();
const hasEmittedPreviewInCurrentLane =
previewRevisionBeforeFlush > lane.previewRevisionBaseline;
const deliveredPreviewTextBeforeFinal = lane.stream?.lastDeliveredText?.() ?? "";
const finalTextAlreadyDelivered =
deliveredPreviewTextBeforeFinal === finalTextSnapshot && hasEmittedPreviewInCurrentLane;
const unchangedFinalText = text === lane.lastPartialText;
lane.stream?.update(text);
await params.flushDraftLane(lane);
await params.stopDraftLane(lane);
draftPreviewStopped = true;
const previewUpdated = (lane.stream?.previewRevision?.() ?? 0) > previewRevisionBeforeFlush;
const deliveredPreviewTextAfterFinal =
lane.stream?.lastDeliveredText?.() ?? deliveredPreviewTextBeforeFinal;
if (
(previewUpdated && deliveredPreviewTextAfterFinal === finalTextSnapshot) ||
(unchangedFinalText && finalTextAlreadyDelivered)
) {
lane.lastPartialText = text;
params.finalizedPreviewByLane[laneName] = true;
params.markDelivered();
return "preview-finalized";
}
params.log(
`telegram: ${laneName} draft final text not emitted; falling back to standard send`,
);
}
if (laneName === "answer") {
const archivedResult = await consumeArchivedAnswerPreviewForFinal({
lane,
@ -378,7 +340,7 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) {
return archivedResult;
}
}
if (canEditViaPreview && !params.finalizedPreviewByLane[laneName] && !draftPreviewStopped) {
if (canEditViaPreview && !params.finalizedPreviewByLane[laneName]) {
await params.flushDraftLane(lane);
if (laneName === "answer") {
const archivedResultAfterFlush = await consumeArchivedAnswerPreviewForFinal({
@ -410,9 +372,7 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) {
`telegram: preview final too long for edit (${text.length} > ${params.draftMaxChars}); falling back to standard send`,
);
}
if (!draftPreviewStopped) {
await params.stopDraftLane(lane);
}
await params.stopDraftLane(lane);
const delivered = await params.sendPayload(params.applyTextToPayload(payload, text));
return delivered ? "sent" : "skipped";
}