diff --git a/CHANGELOG.md b/CHANGELOG.md index a1ff0267dfa..e1af9a7119c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -33,6 +33,7 @@ Docs: https://docs.openclaw.ai - CLI: keep `openclaw -v` as a root-only version alias so subcommand `-v, --verbose` flags (for example ACP/hooks/skills) are no longer intercepted globally. (#21303) thanks @adhitShet. - Config/Memory: restore schema help/label metadata for hybrid `mmr` and `temporalDecay` settings so configuration surfaces show correct names and guidance. (#18786) Thanks @rodrigouroz. - Tools/web_search: handle xAI Responses API payloads that emit top-level `output_text` blocks (without a `message` wrapper) so Grok web_search no longer returns `No response` for those results. (#20508) Thanks @echoVic. +- Telegram/Streaming: always clean up draft previews even when dispatch throws before fallback handling, preventing orphaned preview messages during failed runs. (#19041) thanks @mudrii. - Discord/Gateway: handle close code 4014 (missing privileged gateway intents) without crashing the gateway. Thanks @thewilloftheshadow. - Security/Net: strip sensitive headers (`Authorization`, `Proxy-Authorization`, `Cookie`, `Cookie2`) on cross-origin redirects in `fetchWithSsrFGuard` to prevent credential forwarding across origin boundaries. (#20313) Thanks @afurm. diff --git a/src/telegram/bot-message-dispatch.test.ts b/src/telegram/bot-message-dispatch.test.ts index 8893628fd17..8f47ce636b9 100644 --- a/src/telegram/bot-message-dispatch.test.ts +++ b/src/telegram/bot-message-dispatch.test.ts @@ -320,6 +320,29 @@ describe("dispatchTelegramMessage draft streaming", () => { ); }); + it("disables block streaming when streamMode is off even if blockStreaming config is true", async () => { + dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => { + await dispatcherOptions.deliver({ text: "Hello" }, { kind: "final" }); + return { queuedFinal: true }; + }); + deliverReplies.mockResolvedValue({ delivered: true }); + + await dispatchWithContext({ + context: createContext(), + streamMode: "off", + telegramCfg: { blockStreaming: true }, + }); + + expect(createTelegramDraftStream).not.toHaveBeenCalled(); + expect(dispatchReplyWithBufferedBlockDispatcher).toHaveBeenCalledWith( + expect.objectContaining({ + replyOptions: expect.objectContaining({ + disableBlockStreaming: true, + }), + }), + ); + }); + it("forces new message when new assistant message starts after previous output", async () => { const draftStream = createDraftStream(999); createTelegramDraftStream.mockReturnValue(draftStream); @@ -479,4 +502,234 @@ describe("dispatchTelegramMessage draft streaming", () => { }), ); }); + + it("clears preview for error-only finals", async () => { + const draftStream = createDraftStream(999); + createTelegramDraftStream.mockReturnValue(draftStream); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => { + await dispatcherOptions.deliver({ text: "tool failed", isError: true }, { kind: "final" }); + await dispatcherOptions.deliver({ text: "another error", isError: true }, { kind: "final" }); + return { queuedFinal: true }; + }); + deliverReplies.mockResolvedValue({ delivered: true }); + + await dispatchWithContext({ context: createContext() }); + + // Error payloads skip preview finalization — preview must be cleaned up + expect(draftStream.clear).toHaveBeenCalledTimes(1); + }); + + it("clears preview after media final delivery", async () => { + const draftStream = createDraftStream(999); + createTelegramDraftStream.mockReturnValue(draftStream); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => { + await dispatcherOptions.deliver({ mediaUrl: "file:///tmp/a.png" }, { kind: "final" }); + return { queuedFinal: true }; + }); + deliverReplies.mockResolvedValue({ delivered: true }); + + await dispatchWithContext({ context: createContext() }); + + expect(draftStream.clear).toHaveBeenCalledTimes(1); + }); + + it("clears stale preview when response is NO_REPLY", async () => { + const draftStream = createDraftStream(999); + createTelegramDraftStream.mockReturnValue(draftStream); + dispatchReplyWithBufferedBlockDispatcher.mockResolvedValue({ + queuedFinal: false, + }); + + await dispatchWithContext({ context: createContext() }); + + // Preview contains stale partial text — must be cleaned up + expect(draftStream.clear).toHaveBeenCalledTimes(1); + }); + + it("falls back when all finals are skipped and clears preview", async () => { + const draftStream = createDraftStream(999); + createTelegramDraftStream.mockReturnValue(draftStream); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => { + dispatcherOptions.onSkip?.({ text: "" }, { reason: "no_reply", kind: "final" }); + return { queuedFinal: false }; + }); + deliverReplies.mockResolvedValueOnce({ delivered: true }); + + await dispatchWithContext({ context: createContext() }); + + expect(deliverReplies).toHaveBeenCalledWith( + expect.objectContaining({ + replies: [ + expect.objectContaining({ + text: expect.stringContaining("No response"), + }), + ], + }), + ); + expect(draftStream.clear).toHaveBeenCalledTimes(1); + }); + + it("sends fallback and clears preview when deliver throws (dispatcher swallows error)", async () => { + const draftStream = createDraftStream(); + createTelegramDraftStream.mockReturnValue(draftStream); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => { + try { + await dispatcherOptions.deliver({ text: "Hello" }, { kind: "final" }); + } catch (err) { + dispatcherOptions.onError(err, { kind: "final" }); + } + return { queuedFinal: false }; + }); + deliverReplies + .mockRejectedValueOnce(new Error("network down")) + .mockResolvedValueOnce({ delivered: true }); + + await expect(dispatchWithContext({ context: createContext() })).resolves.toBeUndefined(); + // Fallback should be sent because failedDeliveries > 0 + expect(deliverReplies).toHaveBeenCalledTimes(2); + expect(deliverReplies).toHaveBeenLastCalledWith( + expect.objectContaining({ + replies: [ + expect.objectContaining({ + text: expect.stringContaining("No response"), + }), + ], + }), + ); + expect(draftStream.clear).toHaveBeenCalledTimes(1); + }); + + it("sends fallback in off mode when deliver throws", async () => { + dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => { + try { + await dispatcherOptions.deliver({ text: "Hello" }, { kind: "final" }); + } catch (err) { + dispatcherOptions.onError(err, { kind: "final" }); + } + return { queuedFinal: false }; + }); + deliverReplies + .mockRejectedValueOnce(new Error("403 bot blocked")) + .mockResolvedValueOnce({ delivered: true }); + + await dispatchWithContext({ context: createContext(), streamMode: "off" }); + + expect(createTelegramDraftStream).not.toHaveBeenCalled(); + expect(deliverReplies).toHaveBeenCalledTimes(2); + expect(deliverReplies).toHaveBeenLastCalledWith( + expect.objectContaining({ + replies: [ + expect.objectContaining({ + text: expect.stringContaining("No response"), + }), + ], + }), + ); + }); + + it("handles error block + response final — error delivered, response finalizes preview", async () => { + const draftStream = createDraftStream(999); + createTelegramDraftStream.mockReturnValue(draftStream); + editMessageTelegram.mockResolvedValue({ ok: true }); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation( + async ({ dispatcherOptions, replyOptions }) => { + replyOptions?.onPartialReply?.({ text: "Processing..." }); + await dispatcherOptions.deliver( + { text: "⚠️ exec failed", isError: true }, + { kind: "block" }, + ); + await dispatcherOptions.deliver( + { text: "The command timed out. Here's what I found..." }, + { kind: "final" }, + ); + return { queuedFinal: true }; + }, + ); + deliverReplies.mockResolvedValue({ delivered: true }); + + await dispatchWithContext({ context: createContext() }); + + // Block error went through deliverReplies + expect(deliverReplies).toHaveBeenCalledTimes(1); + // Final was finalized via preview edit + expect(editMessageTelegram).toHaveBeenCalledWith( + 123, + 999, + "The command timed out. Here's what I found...", + expect.any(Object), + ); + expect(draftStream.clear).not.toHaveBeenCalled(); + }); + + it("cleans up preview even when fallback delivery throws (double failure)", async () => { + const draftStream = createDraftStream(); + createTelegramDraftStream.mockReturnValue(draftStream); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => { + try { + await dispatcherOptions.deliver({ text: "Hello" }, { kind: "final" }); + } catch (err) { + dispatcherOptions.onError(err, { kind: "final" }); + } + return { queuedFinal: false }; + }); + // No preview message id → deliver goes through deliverReplies directly + // Primary delivery fails + deliverReplies + .mockRejectedValueOnce(new Error("network down")) + // Fallback also fails + .mockRejectedValueOnce(new Error("still down")); + + // Fallback throws, but cleanup still runs via try/finally. + await dispatchWithContext({ context: createContext() }).catch(() => {}); + + // Verify fallback was attempted and preview still cleaned up + expect(deliverReplies).toHaveBeenCalledTimes(2); + expect(draftStream.clear).toHaveBeenCalledTimes(1); + }); + + it("clears preview when dispatcher throws before fallback phase", async () => { + const draftStream = createDraftStream(999); + createTelegramDraftStream.mockReturnValue(draftStream); + dispatchReplyWithBufferedBlockDispatcher.mockRejectedValue(new Error("dispatcher exploded")); + + await expect(dispatchWithContext({ context: createContext() })).rejects.toThrow( + "dispatcher exploded", + ); + + expect(draftStream.stop).toHaveBeenCalledTimes(1); + expect(draftStream.clear).toHaveBeenCalledTimes(1); + expect(deliverReplies).not.toHaveBeenCalled(); + }); + + it("supports concurrent dispatches with independent previews", async () => { + const draftA = createDraftStream(11); + const draftB = createDraftStream(22); + createTelegramDraftStream.mockReturnValueOnce(draftA).mockReturnValueOnce(draftB); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation( + async ({ dispatcherOptions, replyOptions }) => { + await replyOptions?.onPartialReply?.({ text: "partial" }); + await dispatcherOptions.deliver({ mediaUrl: "file:///tmp/a.png" }, { kind: "final" }); + return { queuedFinal: true }; + }, + ); + deliverReplies.mockResolvedValue({ delivered: true }); + + await Promise.all([ + dispatchWithContext({ + context: createContext({ + chatId: 1, + msg: { chat: { id: 1, type: "private" }, message_id: 1 } as never, + }), + }), + dispatchWithContext({ + context: createContext({ + chatId: 2, + msg: { chat: { id: 2, type: "private" }, message_id: 2 } as never, + }), + }), + ]); + + expect(draftA.clear).toHaveBeenCalledTimes(1); + expect(draftB.clear).toHaveBeenCalledTimes(1); + }); }); diff --git a/src/telegram/bot-message-dispatch.ts b/src/telegram/bot-message-dispatch.ts index 7cfd0778790..ad62cf54a0b 100644 --- a/src/telegram/bot-message-dispatch.ts +++ b/src/telegram/bot-message-dispatch.ts @@ -189,11 +189,13 @@ export const dispatchTelegramMessage = async ({ }; const disableBlockStreaming = - typeof telegramCfg.blockStreaming === "boolean" - ? !telegramCfg.blockStreaming - : draftStream || streamMode === "off" - ? true - : undefined; + streamMode === "off" + ? true // off mode must always disable block streaming + : typeof telegramCfg.blockStreaming === "boolean" + ? !telegramCfg.blockStreaming + : draftStream + ? true + : undefined; const { onModelSelected, ...prefixOptions } = createReplyPrefixOptions({ cfg, @@ -269,8 +271,26 @@ export const dispatchTelegramMessage = async ({ const deliveryState = { delivered: false, skippedNonSilent: 0, + failedDeliveries: 0, }; let finalizedViaPreviewMessage = false; + + /** + * Clean up the draft preview message. The preview must be removed in every + * case EXCEPT when it was successfully finalized as the actual response via + * an in-place edit (`finalizedViaPreviewMessage === true`). + */ + const clearDraftPreviewIfNeeded = async () => { + if (finalizedViaPreviewMessage) { + return; + } + try { + await draftStream?.clear(); + } catch (err) { + logVerbose(`telegram: draft preview cleanup failed: ${String(err)}`); + } + }; + const clearGroupHistory = () => { if (isGroup && historyKey) { clearHistoryEntriesIfEnabled({ historyMap: groupHistories, historyKey, limit: historyLimit }); @@ -292,6 +312,7 @@ export const dispatchTelegramMessage = async ({ }; let queuedFinal = false; + let dispatchError: unknown; try { ({ queuedFinal } = await dispatchReplyWithBufferedBlockDispatcher({ ctx: ctxPayload, @@ -340,6 +361,9 @@ export const dispatchTelegramMessage = async ({ }); finalizedViaPreviewMessage = true; deliveryState.delivered = true; + logVerbose( + `telegram: finalized response via preview edit (messageId=${previewMessageId})`, + ); return; } catch (err) { logVerbose( @@ -382,6 +406,9 @@ export const dispatchTelegramMessage = async ({ }); finalizedViaPreviewMessage = true; deliveryState.delivered = true; + logVerbose( + `telegram: finalized response via post-stop preview edit (messageId=${messageIdAfterStop})`, + ); return; } catch (err) { logVerbose( @@ -397,6 +424,13 @@ export const dispatchTelegramMessage = async ({ }); if (result.delivered) { deliveryState.delivered = true; + logVerbose( + `telegram: ${info.kind} reply delivered to chat ${chatId}${payload.isError ? " (error payload)" : ""}`, + ); + } else { + logVerbose( + `telegram: ${info.kind} reply delivery returned not-delivered for chat ${chatId}`, + ); } }, onSkip: (_payload, info) => { @@ -405,6 +439,7 @@ export const dispatchTelegramMessage = async ({ } }, onError: (err, info) => { + deliveryState.failedDeliveries += 1; runtime.error?.(danger(`telegram ${info.kind} reply failed: ${String(err)}`)); }, onReplyStart: createTypingCallbacks({ @@ -453,20 +488,29 @@ export const dispatchTelegramMessage = async ({ onModelSelected, }, })); + } catch (err) { + dispatchError = err; } finally { - // Must stop() first to flush debounced content before clear() wipes state await draftStream?.stop(); - if (!finalizedViaPreviewMessage) { - await draftStream?.clear(); - } } let sentFallback = false; - if (!deliveryState.delivered && deliveryState.skippedNonSilent > 0) { - const result = await deliverReplies({ - replies: [{ text: EMPTY_RESPONSE_FALLBACK }], - ...deliveryBaseOptions, - }); - sentFallback = result.delivered; + try { + if ( + !dispatchError && + !deliveryState.delivered && + (deliveryState.skippedNonSilent > 0 || deliveryState.failedDeliveries > 0) + ) { + const result = await deliverReplies({ + replies: [{ text: EMPTY_RESPONSE_FALLBACK }], + ...deliveryBaseOptions, + }); + sentFallback = result.delivered; + } + } finally { + await clearDraftPreviewIfNeeded(); + } + if (dispatchError) { + throw dispatchError; } const hasFinalResponse = queuedFinal || sentFallback; diff --git a/src/telegram/bot/delivery.ts b/src/telegram/bot/delivery.ts index b5fbfc434e5..5e0efa652c3 100644 --- a/src/telegram/bot/delivery.ts +++ b/src/telegram/bot/delivery.ts @@ -558,6 +558,7 @@ async function sendTelegramText( ...baseParams, }), }); + runtime.log?.(`telegram sendMessage ok chat=${chatId} message=${res.message_id}`); return res.message_id; } catch (err) { const errText = formatErrorMessage(err); @@ -574,6 +575,7 @@ async function sendTelegramText( ...baseParams, }), }); + runtime.log?.(`telegram sendMessage ok chat=${chatId} message=${res.message_id} (plain)`); return res.message_id; } throw err; diff --git a/src/telegram/draft-stream.ts b/src/telegram/draft-stream.ts index 1d8d8e81f04..9d87358671d 100644 --- a/src/telegram/draft-stream.ts +++ b/src/telegram/draft-stream.ts @@ -127,6 +127,7 @@ export function createTelegramDraftStream(params: { } try { await params.api.deleteMessage(chatId, messageId); + params.log?.(`telegram stream preview deleted (chat=${chatId}, message=${messageId})`); } catch (err) { params.warn?.( `telegram stream preview cleanup failed: ${err instanceof Error ? err.message : String(err)}`,