feat(feishu): add reasoning stream support to streaming cards (openclaw#46029)

Verified:
- pnpm build
- pnpm check
- pnpm test:macmini

Co-authored-by: day253 <9634619+day253@users.noreply.github.com>
Co-authored-by: Tak Hoffman <781889+Takhoffman@users.noreply.github.com>
This commit is contained in:
day253 2026-03-15 07:23:03 +08:00 committed by GitHub
parent 3928b4872a
commit 9e8df16732
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 168 additions and 9 deletions

View File

@ -8,6 +8,7 @@ Docs: https://docs.openclaw.ai
- Commands/btw: add `/btw` side questions for quick tool-less answers about the current session without changing future session context, with dismissible in-session TUI answers and explicit BTW replies on external channels. (#45444) Thanks @ngutman.
- Refactor/channels: remove the legacy channel shim directories and point channel-specific imports directly at the extension-owned implementations. (#45967) thanks @scoootscooob.
- Feishu/streaming: add `onReasoningStream` and `onReasoningEnd` support to streaming cards, so `/reasoning stream` renders thinking tokens as markdown blockquotes in the same card — matching the Telegram channel's reasoning lane behavior.
### Fixes

View File

@ -462,6 +462,126 @@ describe("createFeishuReplyDispatcher streaming behavior", () => {
); );
}); });
it("streams reasoning content as blockquote before answer", async () => {
  const { result, options } = createDispatcherHarness({
    runtime: createRuntimeLogger(),
  });
  await options.onReplyStart?.();

  // The core agent delivers pre-formatted snapshots from formatReasoningMessage,
  // growing with each chunk; answer tokens arrive interleaved on onPartialReply.
  result.replyOptions.onReasoningStream?.({ text: "Reasoning:\n_thinking step 1_" });
  result.replyOptions.onReasoningStream?.({
    text: "Reasoning:\n_thinking step 1_\n_step 2_",
  });
  result.replyOptions.onPartialReply?.({ text: "answer part" });
  result.replyOptions.onReasoningEnd?.();
  await options.deliver({ text: "answer part final" }, { kind: "final" });

  expect(streamingInstances).toHaveLength(1);
  const card = streamingInstances[0];
  const cardUpdates = card.update.mock.calls.map((call: unknown[]) => call[0]);

  // At least one update renders the reasoning lane as a blockquote header.
  const reasoningUpdate = cardUpdates.find((text: string) => text.includes("Thinking"));
  expect(reasoningUpdate).toContain("> 💭 **Thinking**");
  // The "Reasoning:" label and italic markers are stripped by formatReasoningPrefix.
  expect(reasoningUpdate).toContain("> thinking step");
  expect(reasoningUpdate).not.toContain("Reasoning:");
  expect(reasoningUpdate).not.toMatch(/> _.*_/);

  // Once answer text arrives, reasoning and answer share one card, separated by ---.
  const combinedUpdate = cardUpdates.find(
    (text: string) => text.includes("Thinking") && text.includes("---"),
  );
  expect(combinedUpdate).toBeDefined();

  // The final close keeps the full combined card: lane, separator, final answer.
  expect(card.close).toHaveBeenCalledTimes(1);
  const closeArg = card.close.mock.calls[0][0] as string;
  expect(closeArg).toContain("> 💭 **Thinking**");
  expect(closeArg).toContain("---");
  expect(closeArg).toContain("answer part final");
});
it("provides onReasoningStream and onReasoningEnd when streaming is enabled", () => {
  // Harness default enables streaming, so both reasoning hooks must be wired.
  const { result } = createDispatcherHarness({ runtime: createRuntimeLogger() });
  const { onReasoningStream, onReasoningEnd } = result.replyOptions;
  expect(onReasoningStream).toBeTypeOf("function");
  expect(onReasoningEnd).toBeTypeOf("function");
});
it("omits reasoning callbacks when streaming is disabled", () => {
  // Re-resolve the account with streaming switched off; without streaming
  // cards there is nowhere to render reasoning, so the hooks are absent.
  resolveFeishuAccountMock.mockReturnValue({
    accountId: "main",
    appId: "app_id",
    appSecret: "app_secret",
    domain: "feishu",
    config: {
      renderMode: "auto",
      streaming: false,
    },
  });
  const { result } = createDispatcherHarness({ runtime: createRuntimeLogger() });
  const { onReasoningStream, onReasoningEnd } = result.replyOptions;
  expect(onReasoningStream).toBeUndefined();
  expect(onReasoningEnd).toBeUndefined();
});
it("renders reasoning-only card when no answer text arrives", async () => {
  const { result, options } = createDispatcherHarness({ runtime: createRuntimeLogger() });
  await options.onReplyStart?.();

  // Reasoning streams in, but the model never emits answer tokens.
  result.replyOptions.onReasoningStream?.({ text: "Reasoning:\n_deep thought_" });
  result.replyOptions.onReasoningEnd?.();
  await options.onIdle?.();

  expect(streamingInstances).toHaveLength(1);
  const card = streamingInstances[0];
  expect(card.close).toHaveBeenCalledTimes(1);
  const closeArg = card.close.mock.calls[0][0] as string;
  // Only the blockquote lane is rendered: no raw label, no --- separator.
  expect(closeArg).toContain("> 💭 **Thinking**");
  expect(closeArg).toContain("> deep thought");
  expect(closeArg).not.toContain("Reasoning:");
  expect(closeArg).not.toContain("---");
});
it("ignores empty reasoning payloads", async () => {
  const { result, options } = createDispatcherHarness({ runtime: createRuntimeLogger() });
  await options.onReplyStart?.();

  // An empty reasoning chunk must not open a "Thinking" lane;
  // the answer must pass through the card completely untouched.
  result.replyOptions.onReasoningStream?.({ text: "" });
  result.replyOptions.onPartialReply?.({ text: "```ts\ncode\n```" });
  await options.deliver({ text: "```ts\ncode\n```" }, { kind: "final" });

  expect(streamingInstances).toHaveLength(1);
  const closeArg = streamingInstances[0].close.mock.calls[0][0] as string;
  expect(closeArg).not.toContain("Thinking");
  expect(closeArg).toBe("```ts\ncode\n```");
});
it("deduplicates final text by raw answer payload, not combined card text", async () => {
  const { result, options } = createDispatcherHarness({ runtime: createRuntimeLogger() });
  await options.onReplyStart?.();
  result.replyOptions.onReasoningStream?.({ text: "Reasoning:\n_thought_" });
  result.replyOptions.onReasoningEnd?.();

  await options.deliver({ text: "```ts\nfinal answer\n```" }, { kind: "final" });
  expect(streamingInstances).toHaveLength(1);
  expect(streamingInstances[0].close).toHaveBeenCalledTimes(1);

  // Re-delivering the identical raw answer text is deduped even though the
  // combined card text (reasoning + answer) differs from the raw payload —
  // no second streaming session may be opened.
  await options.deliver({ text: "```ts\nfinal answer\n```" }, { kind: "final" });
  expect(streamingInstances).toHaveLength(1);
});
it("passes replyToMessageId and replyInThread to streaming.start()", async () => { it("passes replyToMessageId and replyInThread to streaming.start()", async () => {
const { options } = createDispatcherHarness({ const { options } = createDispatcherHarness({
runtime: createRuntimeLogger(), runtime: createRuntimeLogger(),

View File

@ -143,11 +143,39 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP
let streaming: FeishuStreamingSession | null = null; let streaming: FeishuStreamingSession | null = null;
let streamText = ""; let streamText = "";
let lastPartial = ""; let lastPartial = "";
let reasoningText = "";
const deliveredFinalTexts = new Set<string>(); const deliveredFinalTexts = new Set<string>();
let partialUpdateQueue: Promise<void> = Promise.resolve(); let partialUpdateQueue: Promise<void> = Promise.resolve();
let streamingStartPromise: Promise<void> | null = null; let streamingStartPromise: Promise<void> | null = null;
type StreamTextUpdateMode = "snapshot" | "delta"; type StreamTextUpdateMode = "snapshot" | "delta";
/**
 * Convert a raw reasoning snapshot into a Feishu markdown blockquote lane.
 *
 * The core agent sends pre-formatted text shaped like
 * "Reasoning:\n_token …_" (presumably from formatReasoningMessage —
 * confirm against the core formatter). This strips the leading
 * "Reasoning:" label and per-line italic markers, then prefixes every
 * remaining line with "> " under a bold "💭 **Thinking**" header.
 *
 * Returns "" for empty input so callers can skip the lane entirely.
 */
const formatReasoningPrefix = (thinking: string): string => {
  if (!thinking) return "";
  // `\n?` strips the label even when it arrives without a trailing newline
  // (the original /^Reasoning:\n/ would leak the label into the card then).
  const withoutLabel = thinking.replace(/^Reasoning:\n?/, "");
  // Unwrap lines that are fully italicised (_like this_); leave others as-is.
  const plain = withoutLabel.replace(/^_(.*)_$/gm, "$1");
  const quoted = plain.split("\n").map((line) => `> ${line}`);
  return `> 💭 **Thinking**\n${quoted.join("\n")}`;
};
/**
 * Compose the single streaming-card body: the reasoning blockquote lane
 * first, then a "---" separator, then the answer. Either half may be
 * absent, in which case the other is returned alone ("" when both are).
 */
const buildCombinedStreamText = (thinking: string, answer: string): string => {
  const reasoningLane = thinking ? formatReasoningPrefix(thinking) : "";
  if (reasoningLane && answer) {
    return `${reasoningLane}\n\n---\n\n${answer}`;
  }
  return reasoningLane || answer || "";
};
/**
 * Queue one card update with the given combined text. Updates are chained
 * onto partialUpdateQueue so they apply strictly in order; each waits for
 * the in-flight streaming.start() (if any) and is a no-op once the
 * streaming session is closed or was never opened.
 */
const flushStreamingCardUpdate = (combined: string) => {
  const applyUpdate = async (): Promise<void> => {
    if (streamingStartPromise) {
      await streamingStartPromise;
    }
    if (streaming?.isActive()) {
      await streaming.update(combined);
    }
  };
  partialUpdateQueue = partialUpdateQueue.then(applyUpdate);
};
const queueStreamingUpdate = ( const queueStreamingUpdate = (
nextText: string, nextText: string,
options?: { options?: {
@ -167,14 +195,13 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP
const mode = options?.mode ?? "snapshot"; const mode = options?.mode ?? "snapshot";
streamText = streamText =
mode === "delta" ? `${streamText}${nextText}` : mergeStreamingText(streamText, nextText); mode === "delta" ? `${streamText}${nextText}` : mergeStreamingText(streamText, nextText);
partialUpdateQueue = partialUpdateQueue.then(async () => { flushStreamingCardUpdate(buildCombinedStreamText(reasoningText, streamText));
if (streamingStartPromise) { };
await streamingStartPromise;
} const queueReasoningUpdate = (nextThinking: string) => {
if (streaming?.isActive()) { if (!nextThinking) return;
await streaming.update(streamText); reasoningText = nextThinking;
} flushStreamingCardUpdate(buildCombinedStreamText(reasoningText, streamText));
});
}; };
const startStreaming = () => { const startStreaming = () => {
@ -213,7 +240,7 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP
} }
await partialUpdateQueue; await partialUpdateQueue;
if (streaming?.isActive()) { if (streaming?.isActive()) {
let text = streamText; let text = buildCombinedStreamText(reasoningText, streamText);
if (mentionTargets?.length) { if (mentionTargets?.length) {
text = buildMentionedCardContent(mentionTargets, text); text = buildMentionedCardContent(mentionTargets, text);
} }
@ -223,6 +250,7 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP
streamingStartPromise = null; streamingStartPromise = null;
streamText = ""; streamText = "";
lastPartial = ""; lastPartial = "";
reasoningText = "";
}; };
const sendChunkedTextReply = async (params: { const sendChunkedTextReply = async (params: {
@ -392,6 +420,16 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP
}); });
} }
: undefined, : undefined,
onReasoningStream: streamingEnabled
? (payload: ReplyPayload) => {
if (!payload.text) {
return;
}
startStreaming();
queueReasoningUpdate(payload.text);
}
: undefined,
onReasoningEnd: streamingEnabled ? () => {} : undefined,
}, },
markDispatchIdle, markDispatchIdle,
}; };