fix(feishu): use msg_type media for mp4 video (fixes #33674) (#33720)

* fix(feishu): use msg_type media for mp4 video (fixes #33674)

* Feishu: harden streaming merge semantics and final reply dedupe

Use explicit streaming update semantics in the Feishu reply dispatcher:
treat onPartialReply payloads as snapshot updates and block fallback payloads
as delta chunks, then merge final text with the shared overlap-aware
mergeStreamingText helper before closing the stream.

Prevent duplicate final text delivery within the same dispatch cycle, and add
regression tests covering overlap snapshot merge, duplicate final suppression,
and block-as-delta behavior to guard against repeated/truncated output.

* fix(feishu): prefer message.reply for streaming cards in topic threads

* fix: reduce Feishu streaming card print_step to avoid duplicate rendering

Fixes openclaw/openclaw#33751

* Feishu: preserve media sends on duplicate finals and add media synthesis changelog

* Feishu: only dedupe exact duplicate final replies

* Feishu: use scoped plugin-sdk import in streaming-card tests

---------

Co-authored-by: 倪汉杰0668001185 <ni.hanjie@xydigit.com>
Co-authored-by: zhengquanliu <zhengquanliu@bytedance.com>
Co-authored-by: nick <nickzj@qq.com>
Co-authored-by: linhey <linhey@mini.local>
Co-authored-by: Tak Hoffman <781889+Takhoffman@users.noreply.github.com>
This commit is contained in:
Nhj 2026-03-05 10:39:44 +08:00 committed by GitHub
parent 63ce7c74bd
commit 68e68bfb57
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 272 additions and 49 deletions

View File

@ -35,6 +35,7 @@ Docs: https://docs.openclaw.ai
- Runtime/tool-state stability: recover from dangling Anthropic `tool_use` after compaction, serialize long-running Discord handler runs without blocking new inbound events, and prevent stale busy snapshots from suppressing stuck-channel recovery. (from #33630, #33583) Thanks @kevinWangSheng and @theotarr.
- ACP/Discord startup hardening: clean up stuck ACP worker children on gateway restart, unbind stale ACP thread bindings during Discord startup reconciliation, and add per-thread listener watchdog timeouts so wedged turns cannot block later messages. (#33699) Thanks @dutifulbob.
- Extensions/media local-root propagation: consistently forward `mediaLocalRoots` through extension `sendMedia` adapters (Google Chat, Slack, iMessage, Signal, WhatsApp), preserving non-local media behavior while restoring local attachment resolution from configured roots. Synthesis of #33581, #33545, #33540, #33536, #33528. Thanks @bmendonca3.
- Feishu/video media send contract: keep mp4-like outbound payloads on `msg_type: "media"` (including reply and reply-in-thread paths) so videos render as media instead of degrading to file-link behavior, while preserving existing non-video file subtype handling. (from #33720, #33808, #33678) Thanks @polooooo, @dingjianrui, and @kevinWangSheng.
- Gateway/security default response headers: add `Permissions-Policy: camera=(), microphone=(), geolocation=()` to baseline gateway HTTP security headers for all responses. (#30186) thanks @habakan.
- Plugins/startup loading: lazily initialize plugin runtime, split startup-critical plugin SDK imports into `openclaw/plugin-sdk/core` and `openclaw/plugin-sdk/telegram`, and preserve `api.runtime` reflection semantics for plugin compatibility. (#28620) thanks @hmemcpy.
- Plugins/startup performance: reduce bursty plugin discovery/manifest overhead with short in-process caches, skip importing bundled memory plugins that are disabled by slot selection, and speed legacy root `openclaw/plugin-sdk` compatibility via runtime root-alias routing while preserving backward compatibility. Thanks @gumadeiras.

View File

@ -113,7 +113,7 @@ describe("sendMediaFeishu msg_type routing", () => {
messageResourceGetMock.mockResolvedValue(Buffer.from("resource-bytes"));
});
it("uses msg_type=file for mp4", async () => {
it("uses msg_type=media for mp4 video", async () => {
await sendMediaFeishu({
cfg: {} as any,
to: "user:ou_target",
@ -129,7 +129,7 @@ describe("sendMediaFeishu msg_type routing", () => {
expect(messageCreateMock).toHaveBeenCalledWith(
expect.objectContaining({
data: expect.objectContaining({ msg_type: "file" }),
data: expect.objectContaining({ msg_type: "media" }),
}),
);
});
@ -176,7 +176,7 @@ describe("sendMediaFeishu msg_type routing", () => {
);
});
it("uses msg_type=file when replying with mp4", async () => {
it("uses msg_type=media when replying with mp4", async () => {
await sendMediaFeishu({
cfg: {} as any,
to: "user:ou_target",
@ -188,7 +188,7 @@ describe("sendMediaFeishu msg_type routing", () => {
expect(messageReplyMock).toHaveBeenCalledWith(
expect.objectContaining({
path: { message_id: "om_parent" },
data: expect.objectContaining({ msg_type: "file" }),
data: expect.objectContaining({ msg_type: "media" }),
}),
);
@ -208,7 +208,10 @@ describe("sendMediaFeishu msg_type routing", () => {
expect(messageReplyMock).toHaveBeenCalledWith(
expect.objectContaining({
path: { message_id: "om_parent" },
data: expect.objectContaining({ msg_type: "file", reply_in_thread: true }),
data: expect.objectContaining({
msg_type: "media",
reply_in_thread: true,
}),
}),
);
});

View File

@ -328,8 +328,8 @@ export async function sendFileFeishu(params: {
cfg: ClawdbotConfig;
to: string;
fileKey: string;
/** Use "audio" for audio files, "file" for documents and video */
msgType?: "file" | "audio";
/** Use "audio" for audio, "media" for video (mp4), "file" for documents */
msgType?: "file" | "audio" | "media";
replyToMessageId?: string;
replyInThread?: boolean;
accountId?: string;
@ -467,8 +467,8 @@ export async function sendMediaFeishu(params: {
fileType,
accountId,
});
// Feishu API: opus -> "audio", everything else (including video) -> "file"
const msgType = fileType === "opus" ? "audio" : "file";
// Feishu API: opus -> "audio", mp4/video -> "media" (playable), others -> "file"
const msgType = fileType === "opus" ? "audio" : fileType === "mp4" ? "media" : "file";
return sendFileFeishu({
cfg,
to,

View File

@ -26,6 +26,23 @@ vi.mock("./typing.js", () => ({
removeTypingIndicator: removeTypingIndicatorMock,
}));
vi.mock("./streaming-card.js", () => ({
mergeStreamingText: (previousText: string | undefined, nextText: string | undefined) => {
const previous = typeof previousText === "string" ? previousText : "";
const next = typeof nextText === "string" ? nextText : "";
if (!next) {
return previous;
}
if (!previous || next === previous) {
return next;
}
if (next.startsWith(previous)) {
return next;
}
if (previous.startsWith(next)) {
return previous;
}
return `${previous}${next}`;
},
FeishuStreamingSession: class {
active = false;
start = vi.fn(async () => {
@ -244,6 +261,116 @@ describe("createFeishuReplyDispatcher streaming behavior", () => {
expect(streamingInstances[0].close).toHaveBeenCalledWith("```md\npartial answer\n```");
});
it("delivers distinct final payloads after streaming close", async () => {
createFeishuReplyDispatcher({
cfg: {} as never,
agentId: "agent",
runtime: { log: vi.fn(), error: vi.fn() } as never,
chatId: "oc_chat",
});
const options = createReplyDispatcherWithTypingMock.mock.calls[0]?.[0];
await options.deliver({ text: "```md\n完整回复第一段\n```" }, { kind: "final" });
await options.deliver({ text: "```md\n完整回复第一段 + 第二段\n```" }, { kind: "final" });
expect(streamingInstances).toHaveLength(2);
expect(streamingInstances[0].close).toHaveBeenCalledTimes(1);
expect(streamingInstances[0].close).toHaveBeenCalledWith("```md\n完整回复第一段\n```");
expect(streamingInstances[1].close).toHaveBeenCalledTimes(1);
expect(streamingInstances[1].close).toHaveBeenCalledWith("```md\n完整回复第一段 + 第二段\n```");
expect(sendMessageFeishuMock).not.toHaveBeenCalled();
expect(sendMarkdownCardFeishuMock).not.toHaveBeenCalled();
});
it("skips exact duplicate final text after streaming close", async () => {
createFeishuReplyDispatcher({
cfg: {} as never,
agentId: "agent",
runtime: { log: vi.fn(), error: vi.fn() } as never,
chatId: "oc_chat",
});
const options = createReplyDispatcherWithTypingMock.mock.calls[0]?.[0];
await options.deliver({ text: "```md\n同一条回复\n```" }, { kind: "final" });
await options.deliver({ text: "```md\n同一条回复\n```" }, { kind: "final" });
expect(streamingInstances).toHaveLength(1);
expect(streamingInstances[0].close).toHaveBeenCalledTimes(1);
expect(streamingInstances[0].close).toHaveBeenCalledWith("```md\n同一条回复\n```");
expect(sendMessageFeishuMock).not.toHaveBeenCalled();
expect(sendMarkdownCardFeishuMock).not.toHaveBeenCalled();
});
it("suppresses duplicate final text while still sending media", async () => {
resolveFeishuAccountMock.mockReturnValue({
accountId: "main",
appId: "app_id",
appSecret: "app_secret",
domain: "feishu",
config: {
renderMode: "auto",
streaming: false,
},
});
createFeishuReplyDispatcher({
cfg: {} as never,
agentId: "agent",
runtime: { log: vi.fn(), error: vi.fn() } as never,
chatId: "oc_chat",
});
const options = createReplyDispatcherWithTypingMock.mock.calls[0]?.[0];
await options.deliver({ text: "plain final" }, { kind: "final" });
await options.deliver(
{ text: "plain final", mediaUrl: "https://example.com/a.png" },
{ kind: "final" },
);
expect(sendMessageFeishuMock).toHaveBeenCalledTimes(1);
expect(sendMessageFeishuMock).toHaveBeenLastCalledWith(
expect.objectContaining({
text: "plain final",
}),
);
expect(sendMediaFeishuMock).toHaveBeenCalledTimes(1);
expect(sendMediaFeishuMock).toHaveBeenCalledWith(
expect.objectContaining({
mediaUrl: "https://example.com/a.png",
}),
);
});
it("treats block updates as delta chunks", async () => {
resolveFeishuAccountMock.mockReturnValue({
accountId: "main",
appId: "app_id",
appSecret: "app_secret",
domain: "feishu",
config: {
renderMode: "card",
streaming: true,
},
});
const result = createFeishuReplyDispatcher({
cfg: {} as never,
agentId: "agent",
runtime: { log: vi.fn(), error: vi.fn() } as never,
chatId: "oc_chat",
});
const options = createReplyDispatcherWithTypingMock.mock.calls[0]?.[0];
await options.onReplyStart?.();
await result.replyOptions.onPartialReply?.({ text: "hello" });
await options.deliver({ text: "lo world" }, { kind: "block" });
await options.onIdle?.();
expect(streamingInstances).toHaveLength(1);
expect(streamingInstances[0].close).toHaveBeenCalledTimes(1);
expect(streamingInstances[0].close).toHaveBeenCalledWith("hellolo world");
});
it("sends media-only payloads as attachments", async () => {
createFeishuReplyDispatcher({
cfg: {} as never,

View File

@ -13,7 +13,7 @@ import type { MentionTarget } from "./mention.js";
import { buildMentionedCardContent } from "./mention.js";
import { getFeishuRuntime } from "./runtime.js";
import { sendMarkdownCardFeishu, sendMessageFeishu } from "./send.js";
import { FeishuStreamingSession } from "./streaming-card.js";
import { FeishuStreamingSession, mergeStreamingText } from "./streaming-card.js";
import { resolveReceiveIdType } from "./targets.js";
import { addTypingIndicator, removeTypingIndicator, type TypingIndicatorState } from "./typing.js";
@ -143,29 +143,16 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP
let streaming: FeishuStreamingSession | null = null;
let streamText = "";
let lastPartial = "";
let lastFinalText: string | null = null;
let partialUpdateQueue: Promise<void> = Promise.resolve();
let streamingStartPromise: Promise<void> | null = null;
const mergeStreamingText = (nextText: string) => {
if (!streamText) {
streamText = nextText;
return;
}
if (nextText.startsWith(streamText)) {
// Handle cumulative partial payloads where nextText already includes prior text.
streamText = nextText;
return;
}
if (streamText.endsWith(nextText)) {
return;
}
streamText += nextText;
};
type StreamTextUpdateMode = "snapshot" | "delta";
const queueStreamingUpdate = (
nextText: string,
options?: {
dedupeWithLastPartial?: boolean;
mode?: StreamTextUpdateMode;
},
) => {
if (!nextText) {
@ -177,7 +164,9 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP
if (options?.dedupeWithLastPartial) {
lastPartial = nextText;
}
mergeStreamingText(nextText);
const mode = options?.mode ?? "snapshot";
streamText =
mode === "delta" ? `${streamText}${nextText}` : mergeStreamingText(streamText, nextText);
partialUpdateQueue = partialUpdateQueue.then(async () => {
if (streamingStartPromise) {
await streamingStartPromise;
@ -241,6 +230,7 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP
responsePrefixContextProvider: prefixContext.responsePrefixContextProvider,
humanDelay: core.channel.reply.resolveHumanDelayConfig(cfg, agentId),
onReplyStart: () => {
lastFinalText = null;
if (streamingEnabled && renderMode === "card") {
startStreaming();
}
@ -256,12 +246,17 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP
: [];
const hasText = Boolean(text.trim());
const hasMedia = mediaList.length > 0;
// Suppress only exact duplicate final text payloads to avoid
// dropping legitimate multi-part final replies.
const skipTextForDuplicateFinal =
info?.kind === "final" && hasText && lastFinalText === text;
const shouldDeliverText = hasText && !skipTextForDuplicateFinal;
if (!hasText && !hasMedia) {
if (!shouldDeliverText && !hasMedia) {
return;
}
if (hasText) {
if (shouldDeliverText) {
const useCard = renderMode === "card" || (renderMode === "auto" && shouldUseCard(text));
if (info?.kind === "block") {
@ -287,11 +282,12 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP
if (info?.kind === "block") {
// Some runtimes emit block payloads without onPartial/final callbacks.
// Mirror block text into streamText so onIdle close still sends content.
queueStreamingUpdate(text);
queueStreamingUpdate(text, { mode: "delta" });
}
if (info?.kind === "final") {
streamText = text;
streamText = mergeStreamingText(streamText, text);
await closeStreaming();
lastFinalText = text;
}
// Send media even when streaming handled the text
if (hasMedia) {
@ -327,6 +323,9 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP
});
first = false;
}
if (info?.kind === "final") {
lastFinalText = text;
}
} else {
const converted = core.channel.text.convertMarkdownTables(text, tableMode);
for (const chunk of core.channel.text.chunkTextWithMode(
@ -345,6 +344,9 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP
});
first = false;
}
if (info?.kind === "final") {
lastFinalText = text;
}
}
}
@ -387,7 +389,10 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP
if (!payload.text) {
return;
}
queueStreamingUpdate(payload.text, { dedupeWithLastPartial: true });
queueStreamingUpdate(payload.text, {
dedupeWithLastPartial: true,
mode: "snapshot",
});
}
: undefined,
},

View File

@ -1,5 +1,12 @@
import { describe, expect, it } from "vitest";
import { mergeStreamingText } from "./streaming-card.js";
import { beforeEach, describe, expect, it, vi } from "vitest";
const fetchWithSsrFGuardMock = vi.hoisted(() => vi.fn());
vi.mock("openclaw/plugin-sdk/feishu", () => ({
fetchWithSsrFGuard: fetchWithSsrFGuardMock,
}));
import { FeishuStreamingSession, mergeStreamingText } from "./streaming-card.js";
describe("mergeStreamingText", () => {
it("prefers the latest full text when it already includes prior text", () => {
@ -15,4 +22,65 @@ describe("mergeStreamingText", () => {
expect(mergeStreamingText("hello wor", "ld")).toBe("hello world");
expect(mergeStreamingText("line1", "line2")).toBe("line1line2");
});
it("merges overlap between adjacent partial snapshots", () => {
expect(mergeStreamingText("好的,让我", "让我再读取一遍")).toBe("好的,让我再读取一遍");
expect(mergeStreamingText("revision_id: 552", "2一点变化都没有")).toBe(
"revision_id: 552一点变化都没有",
);
});
});
describe("FeishuStreamingSession routing", () => {
beforeEach(() => {
vi.clearAllMocks();
fetchWithSsrFGuardMock.mockReset();
});
it("prefers message.reply when reply target and root id both exist", async () => {
fetchWithSsrFGuardMock
.mockResolvedValueOnce({
response: { json: async () => ({ code: 0, msg: "ok", tenant_access_token: "token" }) },
release: async () => {},
})
.mockResolvedValueOnce({
response: { json: async () => ({ code: 0, msg: "ok", data: { card_id: "card_1" } }) },
release: async () => {},
});
const replyMock = vi.fn(async () => ({ code: 0, data: { message_id: "msg_reply" } }));
const createMock = vi.fn(async () => ({ code: 0, data: { message_id: "msg_create" } }));
const session = new FeishuStreamingSession(
{
im: {
message: {
reply: replyMock,
create: createMock,
},
},
} as never,
{
appId: "app",
appSecret: "secret",
domain: "feishu",
},
);
await session.start("oc_chat", "chat_id", {
replyToMessageId: "om_parent",
replyInThread: true,
rootId: "om_topic_root",
});
expect(replyMock).toHaveBeenCalledTimes(1);
expect(replyMock).toHaveBeenCalledWith({
path: { message_id: "om_parent" },
data: expect.objectContaining({
msg_type: "interactive",
reply_in_thread: true,
}),
});
expect(createMock).not.toHaveBeenCalled();
});
});

View File

@ -94,7 +94,25 @@ export function mergeStreamingText(
if (!next) {
return previous;
}
if (!previous || next === previous || next.includes(previous)) {
if (!previous || next === previous) {
return next;
}
if (next.startsWith(previous)) {
return next;
}
if (previous.startsWith(next)) {
return previous;
}
// Merge partial overlaps, e.g. "这" + "这是" => "这是".
const maxOverlap = Math.min(previous.length, next.length);
for (let overlap = maxOverlap; overlap > 0; overlap -= 1) {
if (previous.slice(-overlap) === next.slice(0, overlap)) {
return `${previous}${next.slice(overlap)}`;
}
}
if (next.includes(previous)) {
return next;
}
if (previous.includes(next)) {
@ -142,7 +160,7 @@ export class FeishuStreamingSession {
config: {
streaming_mode: true,
summary: { content: "[Generating...]" },
streaming_config: { print_frequency_ms: { default: 50 }, print_step: { default: 2 } },
streaming_config: { print_frequency_ms: { default: 50 }, print_step: { default: 1 } },
},
body: {
elements: [{ tag: "markdown", content: "⏳ Thinking...", element_id: "content" }],
@ -181,20 +199,12 @@ export class FeishuStreamingSession {
const cardId = createData.data.card_id;
const cardContent = JSON.stringify({ type: "card", data: { card_id: cardId } });
// Topic-group replies require root_id routing. Prefer create+root_id when available.
// Prefer message.reply when we have a reply target — reply_in_thread
// reliably routes streaming cards into Feishu topics, whereas
// message.create with root_id may silently ignore root_id for card
// references (card_id format).
let sendRes;
if (options?.rootId) {
const createData = {
receive_id: receiveId,
msg_type: "interactive",
content: cardContent,
root_id: options.rootId,
};
sendRes = await this.client.im.message.create({
params: { receive_id_type: receiveIdType },
data: createData,
});
} else if (options?.replyToMessageId) {
if (options?.replyToMessageId) {
sendRes = await this.client.im.message.reply({
path: { message_id: options.replyToMessageId },
data: {
@ -203,6 +213,15 @@ export class FeishuStreamingSession {
...(options.replyInThread ? { reply_in_thread: true } : {}),
},
});
} else if (options?.rootId) {
// root_id is undeclared in the SDK types but accepted at runtime
sendRes = await this.client.im.message.create({
params: { receive_id_type: receiveIdType },
data: Object.assign(
{ receive_id: receiveId, msg_type: "interactive", content: cardContent },
{ root_id: options.rootId },
),
});
} else {
sendRes = await this.client.im.message.create({
params: { receive_id_type: receiveIdType },