diff --git a/CHANGELOG.md b/CHANGELOG.md index b2a2c7138ac..375cac0aa5c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ Docs: https://docs.openclaw.ai - Commands/btw: add `/btw` side questions for quick tool-less answers about the current session without changing future session context, with dismissible in-session TUI answers and explicit BTW replies on external channels. (#45444) Thanks @ngutman. - Refactor/channels: remove the legacy channel shim directories and point channel-specific imports directly at the extension-owned implementations. (#45967) thanks @scoootscooob. +- Feishu/streaming: add `onReasoningStream` and `onReasoningEnd` support to streaming cards, so `/reasoning stream` renders thinking tokens as markdown blockquotes in the same card — matching the Telegram channel's reasoning lane behavior. ### Fixes diff --git a/extensions/feishu/src/reply-dispatcher.test.ts b/extensions/feishu/src/reply-dispatcher.test.ts index 338953a7d6d..3f20a594e25 100644 --- a/extensions/feishu/src/reply-dispatcher.test.ts +++ b/extensions/feishu/src/reply-dispatcher.test.ts @@ -462,6 +462,126 @@ describe("createFeishuReplyDispatcher streaming behavior", () => { ); }); + it("streams reasoning content as blockquote before answer", async () => { + const { result, options } = createDispatcherHarness({ + runtime: createRuntimeLogger(), + }); + + await options.onReplyStart?.(); + // Core agent sends pre-formatted text from formatReasoningMessage + result.replyOptions.onReasoningStream?.({ text: "Reasoning:\n_thinking step 1_" }); + result.replyOptions.onReasoningStream?.({ + text: "Reasoning:\n_thinking step 1_\n_step 2_", + }); + result.replyOptions.onPartialReply?.({ text: "answer part" }); + result.replyOptions.onReasoningEnd?.(); + await options.deliver({ text: "answer part final" }, { kind: "final" }); + + expect(streamingInstances).toHaveLength(1); + const updateCalls = streamingInstances[0].update.mock.calls.map((c: unknown[]) => c[0]); + const reasoningUpdate = updateCalls.find((c: string) => c.includes("Thinking")); + expect(reasoningUpdate).toContain("> 💭 **Thinking**"); + // formatReasoningPrefix strips "Reasoning:" prefix and italic markers + expect(reasoningUpdate).toContain("> thinking step"); + expect(reasoningUpdate).not.toContain("Reasoning:"); + expect(reasoningUpdate).not.toMatch(/> _.*_/); + + const combinedUpdate = updateCalls.find( + (c: string) => c.includes("Thinking") && c.includes("---"), + ); + expect(combinedUpdate).toBeDefined(); + + expect(streamingInstances[0].close).toHaveBeenCalledTimes(1); + const closeArg = streamingInstances[0].close.mock.calls[0][0] as string; + expect(closeArg).toContain("> 💭 **Thinking**"); + expect(closeArg).toContain("---"); + expect(closeArg).toContain("answer part final"); + }); + + it("provides onReasoningStream and onReasoningEnd when streaming is enabled", () => { + const { result } = createDispatcherHarness({ + runtime: createRuntimeLogger(), + }); + + expect(result.replyOptions.onReasoningStream).toBeTypeOf("function"); + expect(result.replyOptions.onReasoningEnd).toBeTypeOf("function"); + }); + + it("omits reasoning callbacks when streaming is disabled", () => { + resolveFeishuAccountMock.mockReturnValue({ + accountId: "main", + appId: "app_id", + appSecret: "app_secret", + domain: "feishu", + config: { + renderMode: "auto", + streaming: false, + }, + }); + + const { result } = createDispatcherHarness({ + runtime: createRuntimeLogger(), + }); + + expect(result.replyOptions.onReasoningStream).toBeUndefined(); + expect(result.replyOptions.onReasoningEnd).toBeUndefined(); + }); + + it("renders reasoning-only card when no answer text arrives", async () => { + const { result, options } = createDispatcherHarness({ + runtime: createRuntimeLogger(), + }); + + await options.onReplyStart?.(); + result.replyOptions.onReasoningStream?.({ text: "Reasoning:\n_deep thought_" }); + result.replyOptions.onReasoningEnd?.(); + await options.onIdle?.(); + + expect(streamingInstances).toHaveLength(1); + expect(streamingInstances[0].close).toHaveBeenCalledTimes(1); + const closeArg = streamingInstances[0].close.mock.calls[0][0] as string; + expect(closeArg).toContain("> 💭 **Thinking**"); + expect(closeArg).toContain("> deep thought"); + expect(closeArg).not.toContain("Reasoning:"); + expect(closeArg).not.toContain("---"); + }); + + it("ignores empty reasoning payloads", async () => { + const { result, options } = createDispatcherHarness({ + runtime: createRuntimeLogger(), + }); + + await options.onReplyStart?.(); + result.replyOptions.onReasoningStream?.({ text: "" }); + result.replyOptions.onPartialReply?.({ text: "```ts\ncode\n```" }); + await options.deliver({ text: "```ts\ncode\n```" }, { kind: "final" }); + + expect(streamingInstances).toHaveLength(1); + const closeArg = streamingInstances[0].close.mock.calls[0][0] as string; + expect(closeArg).not.toContain("Thinking"); + expect(closeArg).toBe("```ts\ncode\n```"); + }); + + it("deduplicates final text by raw answer payload, not combined card text", async () => { + const { result, options } = createDispatcherHarness({ + runtime: createRuntimeLogger(), + }); + + await options.onReplyStart?.(); + result.replyOptions.onReasoningStream?.({ text: "Reasoning:\n_thought_" }); + result.replyOptions.onReasoningEnd?.(); + await options.deliver({ text: "```ts\nfinal answer\n```" }, { kind: "final" }); + + expect(streamingInstances).toHaveLength(1); + expect(streamingInstances[0].close).toHaveBeenCalledTimes(1); + + // Deliver the same raw answer text again — should be deduped + await options.deliver({ text: "```ts\nfinal answer\n```" }, { kind: "final" }); + + // No second streaming session since the raw answer text matches + expect(streamingInstances).toHaveLength(1); + }); + it("passes replyToMessageId and replyInThread to streaming.start()", async () => { const { options } = createDispatcherHarness({ runtime: createRuntimeLogger(), diff --git a/extensions/feishu/src/reply-dispatcher.ts b/extensions/feishu/src/reply-dispatcher.ts index 5ebf712ca8b..68f0a2c2a0f 100644 --- a/extensions/feishu/src/reply-dispatcher.ts +++ b/extensions/feishu/src/reply-dispatcher.ts @@ -143,11 +143,39 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP let streaming: FeishuStreamingSession | null = null; let streamText = ""; let lastPartial = ""; + let reasoningText = ""; const deliveredFinalTexts = new Set(); let partialUpdateQueue: Promise = Promise.resolve(); let streamingStartPromise: Promise | null = null; type StreamTextUpdateMode = "snapshot" | "delta"; + const formatReasoningPrefix = (thinking: string): string => { + if (!thinking) return ""; + const withoutLabel = thinking.replace(/^Reasoning:\n/, ""); + const plain = withoutLabel.replace(/^_(.*)_$/gm, "$1"); + const lines = plain.split("\n").map((line) => `> ${line}`); + return `> 💭 **Thinking**\n${lines.join("\n")}`; + }; + + const buildCombinedStreamText = (thinking: string, answer: string): string => { + const parts: string[] = []; + if (thinking) parts.push(formatReasoningPrefix(thinking)); + if (thinking && answer) parts.push("\n\n---\n\n"); + if (answer) parts.push(answer); + return parts.join(""); + }; + + const flushStreamingCardUpdate = (combined: string) => { + partialUpdateQueue = partialUpdateQueue.then(async () => { + if (streamingStartPromise) { + await streamingStartPromise; + } + if (streaming?.isActive()) { + await streaming.update(combined); + } + }); + }; + const queueStreamingUpdate = ( nextText: string, options?: { @@ -167,14 +195,13 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP const mode = options?.mode ?? "snapshot"; streamText = mode === "delta" ? `${streamText}${nextText}` : mergeStreamingText(streamText, nextText); - partialUpdateQueue = partialUpdateQueue.then(async () => { - if (streamingStartPromise) { - await streamingStartPromise; - } - if (streaming?.isActive()) { - await streaming.update(streamText); - } - }); + flushStreamingCardUpdate(buildCombinedStreamText(reasoningText, streamText)); + }; + + const queueReasoningUpdate = (nextThinking: string) => { + if (!nextThinking) return; + reasoningText = nextThinking; + flushStreamingCardUpdate(buildCombinedStreamText(reasoningText, streamText)); }; const startStreaming = () => { @@ -213,7 +240,7 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP } await partialUpdateQueue; if (streaming?.isActive()) { - let text = streamText; + let text = buildCombinedStreamText(reasoningText, streamText); if (mentionTargets?.length) { text = buildMentionedCardContent(mentionTargets, text); } @@ -223,6 +250,7 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP streamingStartPromise = null; streamText = ""; lastPartial = ""; + reasoningText = ""; }; const sendChunkedTextReply = async (params: { @@ -392,6 +420,16 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP }); } : undefined, + onReasoningStream: streamingEnabled + ? (payload: ReplyPayload) => { + if (!payload.text) { + return; + } + startStreaming(); + queueReasoningUpdate(payload.text); + } + : undefined, + onReasoningEnd: streamingEnabled ? () => {} : undefined, }, markDispatchIdle, };