fix(telegram): prevent silent message loss across all streamMode settings (#19041)

Merged via /review-pr -> /prepare-pr -> /merge-pr.

Prepared head SHA: 82898339f0
Co-authored-by: mudrii <220262+mudrii@users.noreply.github.com>
Co-authored-by: obviyus <22031114+obviyus@users.noreply.github.com>
Reviewed-by: @obviyus
This commit is contained in:
mudrii 2026-02-20 13:16:55 +08:00 committed by GitHub
parent 99db4c7903
commit beb2b74b5b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 316 additions and 15 deletions

View File

@ -33,6 +33,7 @@ Docs: https://docs.openclaw.ai
- CLI: keep `openclaw -v` as a root-only version alias so subcommand `-v, --verbose` flags (for example ACP/hooks/skills) are no longer intercepted globally. (#21303) thanks @adhitShet.
- Config/Memory: restore schema help/label metadata for hybrid `mmr` and `temporalDecay` settings so configuration surfaces show correct names and guidance. (#18786) Thanks @rodrigouroz.
- Tools/web_search: handle xAI Responses API payloads that emit top-level `output_text` blocks (without a `message` wrapper) so Grok web_search no longer returns `No response` for those results. (#20508) Thanks @echoVic.
- Telegram/Streaming: always clean up draft previews even when dispatch throws before fallback handling, preventing orphaned preview messages during failed runs. (#19041) thanks @mudrii.
- Discord/Gateway: handle close code 4014 (missing privileged gateway intents) without crashing the gateway. Thanks @thewilloftheshadow.
- Security/Net: strip sensitive headers (`Authorization`, `Proxy-Authorization`, `Cookie`, `Cookie2`) on cross-origin redirects in `fetchWithSsrFGuard` to prevent credential forwarding across origin boundaries. (#20313) Thanks @afurm.

View File

@ -320,6 +320,29 @@ describe("dispatchTelegramMessage draft streaming", () => {
);
});
it("disables block streaming when streamMode is off even if blockStreaming config is true", async () => {
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {
await dispatcherOptions.deliver({ text: "Hello" }, { kind: "final" });
return { queuedFinal: true };
});
deliverReplies.mockResolvedValue({ delivered: true });
await dispatchWithContext({
context: createContext(),
streamMode: "off",
telegramCfg: { blockStreaming: true },
});
expect(createTelegramDraftStream).not.toHaveBeenCalled();
expect(dispatchReplyWithBufferedBlockDispatcher).toHaveBeenCalledWith(
expect.objectContaining({
replyOptions: expect.objectContaining({
disableBlockStreaming: true,
}),
}),
);
});
it("forces new message when new assistant message starts after previous output", async () => {
const draftStream = createDraftStream(999);
createTelegramDraftStream.mockReturnValue(draftStream);
@ -479,4 +502,234 @@ describe("dispatchTelegramMessage draft streaming", () => {
}),
);
});
it("clears preview for error-only finals", async () => {
const draftStream = createDraftStream(999);
createTelegramDraftStream.mockReturnValue(draftStream);
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {
await dispatcherOptions.deliver({ text: "tool failed", isError: true }, { kind: "final" });
await dispatcherOptions.deliver({ text: "another error", isError: true }, { kind: "final" });
return { queuedFinal: true };
});
deliverReplies.mockResolvedValue({ delivered: true });
await dispatchWithContext({ context: createContext() });
// Error payloads skip preview finalization — preview must be cleaned up
expect(draftStream.clear).toHaveBeenCalledTimes(1);
});
it("clears preview after media final delivery", async () => {
const draftStream = createDraftStream(999);
createTelegramDraftStream.mockReturnValue(draftStream);
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {
await dispatcherOptions.deliver({ mediaUrl: "file:///tmp/a.png" }, { kind: "final" });
return { queuedFinal: true };
});
deliverReplies.mockResolvedValue({ delivered: true });
await dispatchWithContext({ context: createContext() });
expect(draftStream.clear).toHaveBeenCalledTimes(1);
});
it("clears stale preview when response is NO_REPLY", async () => {
const draftStream = createDraftStream(999);
createTelegramDraftStream.mockReturnValue(draftStream);
dispatchReplyWithBufferedBlockDispatcher.mockResolvedValue({
queuedFinal: false,
});
await dispatchWithContext({ context: createContext() });
// Preview contains stale partial text — must be cleaned up
expect(draftStream.clear).toHaveBeenCalledTimes(1);
});
it("falls back when all finals are skipped and clears preview", async () => {
const draftStream = createDraftStream(999);
createTelegramDraftStream.mockReturnValue(draftStream);
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {
dispatcherOptions.onSkip?.({ text: "" }, { reason: "no_reply", kind: "final" });
return { queuedFinal: false };
});
deliverReplies.mockResolvedValueOnce({ delivered: true });
await dispatchWithContext({ context: createContext() });
expect(deliverReplies).toHaveBeenCalledWith(
expect.objectContaining({
replies: [
expect.objectContaining({
text: expect.stringContaining("No response"),
}),
],
}),
);
expect(draftStream.clear).toHaveBeenCalledTimes(1);
});
it("sends fallback and clears preview when deliver throws (dispatcher swallows error)", async () => {
const draftStream = createDraftStream();
createTelegramDraftStream.mockReturnValue(draftStream);
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {
try {
await dispatcherOptions.deliver({ text: "Hello" }, { kind: "final" });
} catch (err) {
dispatcherOptions.onError(err, { kind: "final" });
}
return { queuedFinal: false };
});
deliverReplies
.mockRejectedValueOnce(new Error("network down"))
.mockResolvedValueOnce({ delivered: true });
await expect(dispatchWithContext({ context: createContext() })).resolves.toBeUndefined();
// Fallback should be sent because failedDeliveries > 0
expect(deliverReplies).toHaveBeenCalledTimes(2);
expect(deliverReplies).toHaveBeenLastCalledWith(
expect.objectContaining({
replies: [
expect.objectContaining({
text: expect.stringContaining("No response"),
}),
],
}),
);
expect(draftStream.clear).toHaveBeenCalledTimes(1);
});
it("sends fallback in off mode when deliver throws", async () => {
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {
try {
await dispatcherOptions.deliver({ text: "Hello" }, { kind: "final" });
} catch (err) {
dispatcherOptions.onError(err, { kind: "final" });
}
return { queuedFinal: false };
});
deliverReplies
.mockRejectedValueOnce(new Error("403 bot blocked"))
.mockResolvedValueOnce({ delivered: true });
await dispatchWithContext({ context: createContext(), streamMode: "off" });
expect(createTelegramDraftStream).not.toHaveBeenCalled();
expect(deliverReplies).toHaveBeenCalledTimes(2);
expect(deliverReplies).toHaveBeenLastCalledWith(
expect.objectContaining({
replies: [
expect.objectContaining({
text: expect.stringContaining("No response"),
}),
],
}),
);
});
it("handles error block + response final — error delivered, response finalizes preview", async () => {
const draftStream = createDraftStream(999);
createTelegramDraftStream.mockReturnValue(draftStream);
editMessageTelegram.mockResolvedValue({ ok: true });
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
replyOptions?.onPartialReply?.({ text: "Processing..." });
await dispatcherOptions.deliver(
{ text: "⚠️ exec failed", isError: true },
{ kind: "block" },
);
await dispatcherOptions.deliver(
{ text: "The command timed out. Here's what I found..." },
{ kind: "final" },
);
return { queuedFinal: true };
},
);
deliverReplies.mockResolvedValue({ delivered: true });
await dispatchWithContext({ context: createContext() });
// Block error went through deliverReplies
expect(deliverReplies).toHaveBeenCalledTimes(1);
// Final was finalized via preview edit
expect(editMessageTelegram).toHaveBeenCalledWith(
123,
999,
"The command timed out. Here's what I found...",
expect.any(Object),
);
expect(draftStream.clear).not.toHaveBeenCalled();
});
it("cleans up preview even when fallback delivery throws (double failure)", async () => {
const draftStream = createDraftStream();
createTelegramDraftStream.mockReturnValue(draftStream);
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {
try {
await dispatcherOptions.deliver({ text: "Hello" }, { kind: "final" });
} catch (err) {
dispatcherOptions.onError(err, { kind: "final" });
}
return { queuedFinal: false };
});
// No preview message id → deliver goes through deliverReplies directly
// Primary delivery fails
deliverReplies
.mockRejectedValueOnce(new Error("network down"))
// Fallback also fails
.mockRejectedValueOnce(new Error("still down"));
// Fallback throws, but cleanup still runs via try/finally.
await dispatchWithContext({ context: createContext() }).catch(() => {});
// Verify fallback was attempted and preview still cleaned up
expect(deliverReplies).toHaveBeenCalledTimes(2);
expect(draftStream.clear).toHaveBeenCalledTimes(1);
});
it("clears preview when dispatcher throws before fallback phase", async () => {
const draftStream = createDraftStream(999);
createTelegramDraftStream.mockReturnValue(draftStream);
dispatchReplyWithBufferedBlockDispatcher.mockRejectedValue(new Error("dispatcher exploded"));
await expect(dispatchWithContext({ context: createContext() })).rejects.toThrow(
"dispatcher exploded",
);
expect(draftStream.stop).toHaveBeenCalledTimes(1);
expect(draftStream.clear).toHaveBeenCalledTimes(1);
expect(deliverReplies).not.toHaveBeenCalled();
});
it("supports concurrent dispatches with independent previews", async () => {
const draftA = createDraftStream(11);
const draftB = createDraftStream(22);
createTelegramDraftStream.mockReturnValueOnce(draftA).mockReturnValueOnce(draftB);
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
await replyOptions?.onPartialReply?.({ text: "partial" });
await dispatcherOptions.deliver({ mediaUrl: "file:///tmp/a.png" }, { kind: "final" });
return { queuedFinal: true };
},
);
deliverReplies.mockResolvedValue({ delivered: true });
await Promise.all([
dispatchWithContext({
context: createContext({
chatId: 1,
msg: { chat: { id: 1, type: "private" }, message_id: 1 } as never,
}),
}),
dispatchWithContext({
context: createContext({
chatId: 2,
msg: { chat: { id: 2, type: "private" }, message_id: 2 } as never,
}),
}),
]);
expect(draftA.clear).toHaveBeenCalledTimes(1);
expect(draftB.clear).toHaveBeenCalledTimes(1);
});
});

View File

@ -189,11 +189,13 @@ export const dispatchTelegramMessage = async ({
};
const disableBlockStreaming =
typeof telegramCfg.blockStreaming === "boolean"
? !telegramCfg.blockStreaming
: draftStream || streamMode === "off"
? true
: undefined;
streamMode === "off"
? true // off mode must always disable block streaming
: typeof telegramCfg.blockStreaming === "boolean"
? !telegramCfg.blockStreaming
: draftStream
? true
: undefined;
const { onModelSelected, ...prefixOptions } = createReplyPrefixOptions({
cfg,
@ -269,8 +271,26 @@ export const dispatchTelegramMessage = async ({
const deliveryState = {
delivered: false,
skippedNonSilent: 0,
failedDeliveries: 0,
};
let finalizedViaPreviewMessage = false;
/**
* Clean up the draft preview message. The preview must be removed in every
* case EXCEPT when it was successfully finalized as the actual response via
* an in-place edit (`finalizedViaPreviewMessage === true`).
*/
const clearDraftPreviewIfNeeded = async () => {
if (finalizedViaPreviewMessage) {
return;
}
try {
await draftStream?.clear();
} catch (err) {
logVerbose(`telegram: draft preview cleanup failed: ${String(err)}`);
}
};
const clearGroupHistory = () => {
if (isGroup && historyKey) {
clearHistoryEntriesIfEnabled({ historyMap: groupHistories, historyKey, limit: historyLimit });
@ -292,6 +312,7 @@ export const dispatchTelegramMessage = async ({
};
let queuedFinal = false;
let dispatchError: unknown;
try {
({ queuedFinal } = await dispatchReplyWithBufferedBlockDispatcher({
ctx: ctxPayload,
@ -340,6 +361,9 @@ export const dispatchTelegramMessage = async ({
});
finalizedViaPreviewMessage = true;
deliveryState.delivered = true;
logVerbose(
`telegram: finalized response via preview edit (messageId=${previewMessageId})`,
);
return;
} catch (err) {
logVerbose(
@ -382,6 +406,9 @@ export const dispatchTelegramMessage = async ({
});
finalizedViaPreviewMessage = true;
deliveryState.delivered = true;
logVerbose(
`telegram: finalized response via post-stop preview edit (messageId=${messageIdAfterStop})`,
);
return;
} catch (err) {
logVerbose(
@ -397,6 +424,13 @@ export const dispatchTelegramMessage = async ({
});
if (result.delivered) {
deliveryState.delivered = true;
logVerbose(
`telegram: ${info.kind} reply delivered to chat ${chatId}${payload.isError ? " (error payload)" : ""}`,
);
} else {
logVerbose(
`telegram: ${info.kind} reply delivery returned not-delivered for chat ${chatId}`,
);
}
},
onSkip: (_payload, info) => {
@ -405,6 +439,7 @@ export const dispatchTelegramMessage = async ({
}
},
onError: (err, info) => {
deliveryState.failedDeliveries += 1;
runtime.error?.(danger(`telegram ${info.kind} reply failed: ${String(err)}`));
},
onReplyStart: createTypingCallbacks({
@ -453,20 +488,29 @@ export const dispatchTelegramMessage = async ({
onModelSelected,
},
}));
} catch (err) {
dispatchError = err;
} finally {
// Must stop() first to flush debounced content before clear() wipes state
await draftStream?.stop();
if (!finalizedViaPreviewMessage) {
await draftStream?.clear();
}
}
let sentFallback = false;
if (!deliveryState.delivered && deliveryState.skippedNonSilent > 0) {
const result = await deliverReplies({
replies: [{ text: EMPTY_RESPONSE_FALLBACK }],
...deliveryBaseOptions,
});
sentFallback = result.delivered;
try {
if (
!dispatchError &&
!deliveryState.delivered &&
(deliveryState.skippedNonSilent > 0 || deliveryState.failedDeliveries > 0)
) {
const result = await deliverReplies({
replies: [{ text: EMPTY_RESPONSE_FALLBACK }],
...deliveryBaseOptions,
});
sentFallback = result.delivered;
}
} finally {
await clearDraftPreviewIfNeeded();
}
if (dispatchError) {
throw dispatchError;
}
const hasFinalResponse = queuedFinal || sentFallback;

View File

@ -558,6 +558,7 @@ async function sendTelegramText(
...baseParams,
}),
});
runtime.log?.(`telegram sendMessage ok chat=${chatId} message=${res.message_id}`);
return res.message_id;
} catch (err) {
const errText = formatErrorMessage(err);
@ -574,6 +575,7 @@ async function sendTelegramText(
...baseParams,
}),
});
runtime.log?.(`telegram sendMessage ok chat=${chatId} message=${res.message_id} (plain)`);
return res.message_id;
}
throw err;

View File

@ -127,6 +127,7 @@ export function createTelegramDraftStream(params: {
}
try {
await params.api.deleteMessage(chatId, messageId);
params.log?.(`telegram stream preview deleted (chat=${chatId}, message=${messageId})`);
} catch (err) {
params.warn?.(
`telegram stream preview cleanup failed: ${err instanceof Error ? err.message : String(err)}`,