diff --git a/extensions/msteams/src/feedback-reflection-prompt.ts b/extensions/msteams/src/feedback-reflection-prompt.ts new file mode 100644 index 00000000000..ec374e25550 --- /dev/null +++ b/extensions/msteams/src/feedback-reflection-prompt.ts @@ -0,0 +1,115 @@ +/** Max chars of the thumbed-down response to include in the reflection prompt. */ +const MAX_RESPONSE_CHARS = 500; + +export type ParsedReflectionResponse = { + learning: string; + followUp: boolean; + userMessage?: string; +}; + +export function buildReflectionPrompt(params: { + thumbedDownResponse?: string; + userComment?: string; +}): string { + const parts: string[] = ["A user indicated your previous response wasn't helpful."]; + + if (params.thumbedDownResponse) { + const truncated = + params.thumbedDownResponse.length > MAX_RESPONSE_CHARS + ? `${params.thumbedDownResponse.slice(0, MAX_RESPONSE_CHARS)}...` + : params.thumbedDownResponse; + parts.push(`\nYour response was:\n> ${truncated}`); + } + + if (params.userComment) { + parts.push(`\nUser's comment: "${params.userComment}"`); + } + + parts.push( + "\nBriefly reflect: what could you improve? Consider tone, length, " + + "accuracy, relevance, and specificity. Reply with a single JSON object " + + 'only, no markdown or prose, using this exact shape:\n{"learning":"...",' + + '"followUp":false,"userMessage":""}\n' + + "- learning: a short internal adjustment note (1-2 sentences) for your " + + "future behavior in this conversation.\n" + + "- followUp: true only if the user needs a direct follow-up message.\n" + + "- userMessage: only the exact user-facing message to send; empty string " + + "when followUp is false.", + ); + + return parts.join("\n"); +} + +function parseBooleanLike(value: unknown): boolean | undefined { + if (typeof value === "boolean") { + return value; + } + if (typeof value === "string") { + const normalized = value.trim().toLowerCase(); + if (normalized === "true" || normalized === "yes") { + return true; + } + if (normalized === "false" || normalized === "no") { + return false; + } + } + return undefined; +} + +function parseStructuredReflectionValue(value: unknown): ParsedReflectionResponse | null { + if (value == null || typeof value !== "object" || Array.isArray(value)) { + return null; + } + + const candidate = value as { + learning?: unknown; + followUp?: unknown; + userMessage?: unknown; + }; + const learning = typeof candidate.learning === "string" ? candidate.learning.trim() : undefined; + if (!learning) { + return null; + } + + return { + learning, + followUp: parseBooleanLike(candidate.followUp) ?? false, + userMessage: + typeof candidate.userMessage === "string" && candidate.userMessage.trim() + ? candidate.userMessage.trim() + : undefined, + }; +} + +export function parseReflectionResponse(text: string): ParsedReflectionResponse | null { + const trimmed = text.trim(); + if (!trimmed) { + return null; + } + + const candidates = [ + trimmed, + ...(trimmed.match(/```(?:json)?\s*([\s\S]*?)```/i)?.slice(1, 2) ?? []), + ]; + + for (const candidateText of candidates) { + const candidate = candidateText.trim(); + if (!candidate) { + continue; + } + try { + const parsed = parseStructuredReflectionValue(JSON.parse(candidate)); + if (parsed) { + return parsed; + } + } catch { + // Fall through to the next parse strategy. + } + } + + // Safe fallback: keep the internal learning, but never auto-message the user. + return { + learning: trimmed, + followUp: false, + }; +} diff --git a/extensions/msteams/src/feedback-reflection-store.ts b/extensions/msteams/src/feedback-reflection-store.ts new file mode 100644 index 00000000000..83125372aba --- /dev/null +++ b/extensions/msteams/src/feedback-reflection-store.ts @@ -0,0 +1,99 @@ +/** Default cooldown between reflections per session (5 minutes). */ +export const DEFAULT_COOLDOWN_MS = 300_000; + +/** Tracks last reflection time per session to enforce cooldown. */ +const lastReflectionBySession = new Map(); + +/** Maximum cooldown entries before pruning expired ones. */ +const MAX_COOLDOWN_ENTRIES = 500; + +function sanitizeSessionKey(sessionKey: string): string { + return sessionKey.replace(/[^a-zA-Z0-9_-]/g, "_"); +} + +/** Prune expired cooldown entries to prevent unbounded memory growth. */ +function pruneExpiredCooldowns(cooldownMs: number): void { + if (lastReflectionBySession.size <= MAX_COOLDOWN_ENTRIES) { + return; + } + const now = Date.now(); + for (const [key, time] of lastReflectionBySession) { + if (now - time >= cooldownMs) { + lastReflectionBySession.delete(key); + } + } +} + +/** Check if a reflection is allowed (cooldown not active). */ +export function isReflectionAllowed(sessionKey: string, cooldownMs?: number): boolean { + const cooldown = cooldownMs ?? DEFAULT_COOLDOWN_MS; + const lastTime = lastReflectionBySession.get(sessionKey); + if (lastTime == null) { + return true; + } + return Date.now() - lastTime >= cooldown; +} + +/** Record that a reflection was run for a session. */ +export function recordReflectionTime(sessionKey: string, cooldownMs?: number): void { + lastReflectionBySession.set(sessionKey, Date.now()); + pruneExpiredCooldowns(cooldownMs ?? DEFAULT_COOLDOWN_MS); +} + +/** Clear reflection cooldown tracking (for tests). */ +export function clearReflectionCooldowns(): void { + lastReflectionBySession.clear(); +} + +/** Store a learning derived from feedback reflection in a session companion file. */ +export async function storeSessionLearning(params: { + storePath: string; + sessionKey: string; + learning: string; +}): Promise { + const fs = await import("node:fs/promises"); + const path = await import("node:path"); + + const learningsFile = path.join( + params.storePath, + `${sanitizeSessionKey(params.sessionKey)}.learnings.json`, + ); + + let learnings: string[] = []; + try { + const existing = await fs.readFile(learningsFile, "utf-8"); + const parsed = JSON.parse(existing); + if (Array.isArray(parsed)) { + learnings = parsed; + } + } catch { + // File doesn't exist yet — start fresh. + } + + learnings.push(params.learning); + if (learnings.length > 10) { + learnings = learnings.slice(-10); + } + + await fs.mkdir(path.dirname(learningsFile), { recursive: true }); + await fs.writeFile(learningsFile, JSON.stringify(learnings, null, 2), "utf-8"); +} + +/** Load session learnings for injection into extraSystemPrompt. */ +export async function loadSessionLearnings( + storePath: string, + sessionKey: string, +): Promise { + const fs = await import("node:fs/promises"); + const path = await import("node:path"); + + const learningsFile = path.join(storePath, `${sanitizeSessionKey(sessionKey)}.learnings.json`); + + try { + const content = await fs.readFile(learningsFile, "utf-8"); + const parsed = JSON.parse(content); + return Array.isArray(parsed) ? parsed : []; + } catch { + return []; + } +} diff --git a/extensions/msteams/src/feedback-reflection.ts b/extensions/msteams/src/feedback-reflection.ts index cba7e35ab7a..7d53310935b 100644 --- a/extensions/msteams/src/feedback-reflection.ts +++ b/extensions/msteams/src/feedback-reflection.ts @@ -2,7 +2,7 @@ * Background reflection triggered by negative user feedback (thumbs-down). * * Flow: - * 1. User thumbs-down → invoke handler acks immediately + * 1. User thumbs-down -> invoke handler acks immediately * 2. This module runs in the background (fire-and-forget) * 3. Reads recent session context * 4. Sends a synthetic reflection prompt to the agent @@ -15,36 +15,20 @@ import { type OpenClawConfig, } from "../runtime-api.js"; import type { StoredConversationReference } from "./conversation-store.js"; +import { buildReflectionPrompt, parseReflectionResponse } from "./feedback-reflection-prompt.js"; +import { + DEFAULT_COOLDOWN_MS, + clearReflectionCooldowns, + isReflectionAllowed, + loadSessionLearnings, + recordReflectionTime, + storeSessionLearning, +} from "./feedback-reflection-store.js"; import type { MSTeamsAdapter } from "./messenger.js"; -import { buildConversationReference, sendMSTeamsMessages } from "./messenger.js"; +import { buildConversationReference } from "./messenger.js"; import type { MSTeamsMonitorLogger } from "./monitor-types.js"; import { getMSTeamsRuntime } from "./runtime.js"; -/** Default cooldown between reflections per session (5 minutes). */ -const DEFAULT_COOLDOWN_MS = 300_000; - -/** Max chars of the thumbed-down response to include in the reflection prompt. */ -const MAX_RESPONSE_CHARS = 500; - -/** Tracks last reflection time per session to enforce cooldown. */ -const lastReflectionBySession = new Map(); - -/** Maximum cooldown entries before pruning expired ones. */ -const MAX_COOLDOWN_ENTRIES = 500; - -/** Prune expired cooldown entries to prevent unbounded memory growth. */ -function pruneExpiredCooldowns(cooldownMs: number): void { - if (lastReflectionBySession.size <= MAX_COOLDOWN_ENTRIES) { - return; - } - const now = Date.now(); - for (const [key, time] of lastReflectionBySession) { - if (now - time >= cooldownMs) { - lastReflectionBySession.delete(key); - } - } -} - export type FeedbackEvent = { type: "custom"; event: "feedback"; @@ -79,146 +63,6 @@ export function buildFeedbackEvent(params: { }; } -export type ParsedReflectionResponse = { - learning: string; - followUp: boolean; - userMessage?: string; -}; - -export function buildReflectionPrompt(params: { - thumbedDownResponse?: string; - userComment?: string; -}): string { - const parts: string[] = ["A user indicated your previous response wasn't helpful."]; - - if (params.thumbedDownResponse) { - const truncated = - params.thumbedDownResponse.length > MAX_RESPONSE_CHARS - ? `${params.thumbedDownResponse.slice(0, MAX_RESPONSE_CHARS)}...` - : params.thumbedDownResponse; - parts.push(`\nYour response was:\n> ${truncated}`); - } - - if (params.userComment) { - parts.push(`\nUser's comment: "${params.userComment}"`); - } - - parts.push( - "\nBriefly reflect: what could you improve? Consider tone, length, " + - "accuracy, relevance, and specificity. Reply with a single JSON object " + - 'only, no markdown or prose, using this exact shape:\n{"learning":"...",' + - '"followUp":false,"userMessage":""}\n' + - "- learning: a short internal adjustment note (1-2 sentences) for your " + - "future behavior in this conversation.\n" + - "- followUp: true only if the user needs a direct follow-up message.\n" + - "- userMessage: only the exact user-facing message to send; empty string " + - "when followUp is false.", - ); - - return parts.join("\n"); -} - -function parseBooleanLike(value: unknown): boolean | undefined { - if (typeof value === "boolean") { - return value; - } - if (typeof value === "string") { - const normalized = value.trim().toLowerCase(); - if (normalized === "true" || normalized === "yes") { - return true; - } - if (normalized === "false" || normalized === "no") { - return false; - } - } - return undefined; -} - -function parseStructuredReflectionValue(value: unknown): ParsedReflectionResponse | null { - if (value == null || typeof value !== "object" || Array.isArray(value)) { - return null; - } - - const candidate = value as { - learning?: unknown; - followUp?: unknown; - userMessage?: unknown; - }; - const learning = typeof candidate.learning === "string" ? candidate.learning.trim() : undefined; - if (!learning) { - return null; - } - - return { - learning, - followUp: parseBooleanLike(candidate.followUp) ?? false, - userMessage: - typeof candidate.userMessage === "string" && candidate.userMessage.trim() - ? candidate.userMessage.trim() - : undefined, - }; -} - -export function parseReflectionResponse(text: string): ParsedReflectionResponse | null { - const trimmed = text.trim(); - if (!trimmed) { - return null; - } - - const candidates = [ - trimmed, - ...(trimmed.match(/```(?:json)?\s*([\s\S]*?)```/i)?.slice(1, 2) ?? []), - ]; - - for (const candidateText of candidates) { - const candidate = candidateText.trim(); - if (!candidate) { - continue; - } - try { - const parsed = parseStructuredReflectionValue(JSON.parse(candidate)); - if (parsed) { - return parsed; - } - } catch { - // Fall through to the next parse strategy. - } - } - - // Safe fallback: keep the internal learning, but never auto-message the user. - return { - learning: trimmed, - followUp: false, - }; -} - -/** - * Check if a reflection is allowed (cooldown not active). - */ -export function isReflectionAllowed(sessionKey: string, cooldownMs?: number): boolean { - const cooldown = cooldownMs ?? DEFAULT_COOLDOWN_MS; - const lastTime = lastReflectionBySession.get(sessionKey); - if (lastTime == null) { - return true; - } - return Date.now() - lastTime >= cooldown; -} - -/** - * Record that a reflection was run for a session. - */ -export function recordReflectionTime(sessionKey: string, cooldownMs?: number): void { - lastReflectionBySession.set(sessionKey, Date.now()); - pruneExpiredCooldowns(cooldownMs ?? DEFAULT_COOLDOWN_MS); -} - -/** - * Clear reflection cooldown tracking (for tests). - */ -export function clearReflectionCooldowns(): void { - lastReflectionBySession.clear(); -} - export type RunFeedbackReflectionParams = { cfg: OpenClawConfig; adapter: MSTeamsAdapter; @@ -233,68 +77,51 @@ export type RunFeedbackReflectionParams = { log: MSTeamsMonitorLogger; }; -/** - * Run a background reflection after negative feedback. - * This is designed to be called fire-and-forget (don't await in the invoke handler). - */ -export async function runFeedbackReflection(params: RunFeedbackReflectionParams): Promise { - const { cfg, log, sessionKey } = params; - const msteamsCfg = cfg.channels?.msteams; - - // Check cooldown - const cooldownMs = msteamsCfg?.feedbackReflectionCooldownMs ?? DEFAULT_COOLDOWN_MS; - if (!isReflectionAllowed(sessionKey, cooldownMs)) { - log.debug?.("skipping reflection (cooldown active)", { sessionKey }); - return; - } - - // Record cooldown after successful dispatch (not before) so transient - // failures don't suppress future reflection attempts. - +function buildReflectionContext(params: { + cfg: OpenClawConfig; + conversationId: string; + sessionKey: string; + reflectionPrompt: string; +}) { const core = getMSTeamsRuntime(); - const reflectionPrompt = buildReflectionPrompt({ - thumbedDownResponse: params.thumbedDownResponse, - userComment: params.userComment, - }); - - // Use the agentId from the feedback handler (already resolved with correct routing) - // instead of re-resolving, which could yield a different agent in peer-specific setups. - const storePath = core.channel.session.resolveStorePath(cfg.session?.store, { - agentId: params.agentId, - }); - - const envelopeOptions = core.channel.reply.resolveEnvelopeFormatOptions(cfg); + const envelopeOptions = core.channel.reply.resolveEnvelopeFormatOptions(params.cfg); const body = core.channel.reply.formatAgentEnvelope({ channel: "Teams", from: "system", - body: reflectionPrompt, + body: params.reflectionPrompt, envelope: envelopeOptions, }); - const ctxPayload = core.channel.reply.finalizeInboundContext({ - Body: body, - BodyForAgent: reflectionPrompt, - RawBody: reflectionPrompt, - CommandBody: reflectionPrompt, - From: `msteams:system:${params.conversationId}`, - To: `conversation:${params.conversationId}`, - SessionKey: params.sessionKey, - ChatType: "direct" as const, - SenderName: "system", - SenderId: "system", - Provider: "msteams" as const, - Surface: "msteams" as const, - Timestamp: Date.now(), - WasMentioned: true, - CommandAuthorized: false, - OriginatingChannel: "msteams" as const, - OriginatingTo: `conversation:${params.conversationId}`, - }); - - // Capture the reflection response instead of sending it directly. - // We only want to proactively message if the agent decides to follow up. - let reflectionResponse = ""; + return { + ctxPayload: core.channel.reply.finalizeInboundContext({ + Body: body, + BodyForAgent: params.reflectionPrompt, + RawBody: params.reflectionPrompt, + CommandBody: params.reflectionPrompt, + From: `msteams:system:${params.conversationId}`, + To: `conversation:${params.conversationId}`, + SessionKey: params.sessionKey, + ChatType: "direct" as const, + SenderName: "system", + SenderId: "system", + Provider: "msteams" as const, + Surface: "msteams" as const, + Timestamp: Date.now(), + WasMentioned: true, + CommandAuthorized: false, + OriginatingChannel: "msteams" as const, + OriginatingTo: `conversation:${params.conversationId}`, + }), + }; +} +function createReflectionCaptureDispatcher(params: { + cfg: OpenClawConfig; + agentId: string; + log: MSTeamsMonitorLogger; +}) { + const core = getMSTeamsRuntime(); + let response = ""; const noopTypingCallbacks = { onReplyStart: async () => {}, onIdle: () => {}, @@ -304,31 +131,88 @@ export async function runFeedbackReflection(params: RunFeedbackReflectionParams) const { dispatcher, replyOptions } = core.channel.reply.createReplyDispatcherWithTyping({ deliver: async (payload) => { if (payload.text) { - reflectionResponse += (reflectionResponse ? "\n" : "") + payload.text; + response += (response ? "\n" : "") + payload.text; } }, typingCallbacks: noopTypingCallbacks, - humanDelay: core.channel.reply.resolveHumanDelayConfig(cfg, params.agentId), + humanDelay: core.channel.reply.resolveHumanDelayConfig(params.cfg, params.agentId), onError: (err) => { - log.debug?.("reflection reply error", { error: String(err) }); + params.log.debug?.("reflection reply error", { error: String(err) }); }, }); + return { + dispatcher, + replyOptions, + readResponse: () => response, + }; +} + +async function sendReflectionFollowUp(params: { + adapter: MSTeamsAdapter; + appId: string; + conversationRef: StoredConversationReference; + userMessage: string; +}): Promise { + const baseRef = buildConversationReference(params.conversationRef); + const proactiveRef = { ...baseRef, activityId: undefined }; + + await params.adapter.continueConversation(params.appId, proactiveRef, async (ctx) => { + await ctx.sendActivity({ + type: "message", + text: params.userMessage, + }); + }); +} + +/** + * Run a background reflection after negative feedback. + * This is designed to be called fire-and-forget (don't await in the invoke handler). + */ +export async function runFeedbackReflection(params: RunFeedbackReflectionParams): Promise { + const { cfg, log, sessionKey } = params; + const cooldownMs = cfg.channels?.msteams?.feedbackReflectionCooldownMs ?? DEFAULT_COOLDOWN_MS; + if (!isReflectionAllowed(sessionKey, cooldownMs)) { + log.debug?.("skipping reflection (cooldown active)", { sessionKey }); + return; + } + + const reflectionPrompt = buildReflectionPrompt({ + thumbedDownResponse: params.thumbedDownResponse, + userComment: params.userComment, + }); + const runtime = getMSTeamsRuntime(); + const storePath = runtime.channel.session.resolveStorePath(cfg.session?.store, { + agentId: params.agentId, + }); + const { ctxPayload } = buildReflectionContext({ + cfg, + conversationId: params.conversationId, + sessionKey: params.sessionKey, + reflectionPrompt, + }); + + const capture = createReflectionCaptureDispatcher({ + cfg, + agentId: params.agentId, + log, + }); + try { await dispatchReplyFromConfigWithSettledDispatcher({ ctxPayload, cfg, - dispatcher, + dispatcher: capture.dispatcher, onSettled: () => {}, - replyOptions, + replyOptions: capture.replyOptions, }); } catch (err) { log.error("reflection dispatch failed", { error: String(err) }); - // Don't record cooldown — allow retry on next feedback return; } - if (!reflectionResponse.trim()) { + const reflectionResponse = capture.readResponse().trim(); + if (!reflectionResponse) { log.debug?.("reflection produced no output"); return; } @@ -339,16 +223,13 @@ export async function runFeedbackReflection(params: RunFeedbackReflectionParams) return; } - // Reflection succeeded — record cooldown now recordReflectionTime(sessionKey, cooldownMs); - log.info("reflection complete", { sessionKey, responseLength: reflectionResponse.length, followUp: parsedReflection.followUp, }); - // Store the learning in the session try { await storeSessionLearning({ storePath, @@ -360,87 +241,39 @@ export async function runFeedbackReflection(params: RunFeedbackReflectionParams) } const conversationType = params.conversationRef.conversation?.conversationType?.toLowerCase(); - const isDirectMessage = conversationType === "personal"; const shouldNotify = - isDirectMessage && parsedReflection.followUp && Boolean(parsedReflection.userMessage); + conversationType === "personal" && + parsedReflection.followUp && + Boolean(parsedReflection.userMessage); - if (shouldNotify) { - try { - const baseRef = buildConversationReference(params.conversationRef); - const proactiveRef = { ...baseRef, activityId: undefined }; - - await params.adapter.continueConversation(params.appId, proactiveRef, async (ctx) => { - await ctx.sendActivity({ - type: "message", - text: parsedReflection.userMessage!, - }); + if (!shouldNotify) { + if (parsedReflection.followUp && conversationType !== "personal") { + log.debug?.("skipping reflection follow-up outside direct message", { + sessionKey, + conversationType, }); - log.info("sent reflection follow-up", { sessionKey }); - } catch (err) { - log.debug?.("failed to send reflection follow-up", { error: String(err) }); } - } else if (parsedReflection.followUp && !isDirectMessage) { - log.debug?.("skipping reflection follow-up outside direct message", { - sessionKey, - conversationType, + return; + } + + try { + await sendReflectionFollowUp({ + adapter: params.adapter, + appId: params.appId, + conversationRef: params.conversationRef, + userMessage: parsedReflection.userMessage!, }); + log.info("sent reflection follow-up", { sessionKey }); + } catch (err) { + log.debug?.("failed to send reflection follow-up", { error: String(err) }); } } -/** - * Store a learning derived from feedback reflection in a session companion file. - */ -async function storeSessionLearning(params: { - storePath: string; - sessionKey: string; - learning: string; -}): Promise { - const fs = await import("node:fs/promises"); - const path = await import("node:path"); - - const safeKey = params.sessionKey.replace(/[^a-zA-Z0-9_-]/g, "_"); - const learningsFile = path.join(params.storePath, `${safeKey}.learnings.json`); - - let learnings: string[] = []; - try { - const existing = await fs.readFile(learningsFile, "utf-8"); - const parsed = JSON.parse(existing); - if (Array.isArray(parsed)) { - learnings = parsed; - } - } catch { - // File doesn't exist yet — start fresh. - } - - learnings.push(params.learning); - - // Cap at 10 learnings to avoid unbounded growth - if (learnings.length > 10) { - learnings = learnings.slice(-10); - } - - await fs.mkdir(path.dirname(learningsFile), { recursive: true }); - await fs.writeFile(learningsFile, JSON.stringify(learnings, null, 2), "utf-8"); -} - -/** - * Load session learnings for injection into extraSystemPrompt. - */ -export async function loadSessionLearnings( - storePath: string, - sessionKey: string, -): Promise { - const fs = await import("node:fs/promises"); - const path = await import("node:path"); - - const safeKey = sessionKey.replace(/[^a-zA-Z0-9_-]/g, "_"); - const learningsFile = path.join(storePath, `${safeKey}.learnings.json`); - - try { - const content = await fs.readFile(learningsFile, "utf-8"); - const parsed = JSON.parse(content); - return Array.isArray(parsed) ? parsed : []; - } catch { - return []; - } -} +export { + buildReflectionPrompt, + clearReflectionCooldowns, + isReflectionAllowed, + loadSessionLearnings, + parseReflectionResponse, + recordReflectionTime, +}; diff --git a/extensions/msteams/src/reply-dispatcher.ts b/extensions/msteams/src/reply-dispatcher.ts index e25f7c441ae..66fc7356b88 100644 --- a/extensions/msteams/src/reply-dispatcher.ts +++ b/extensions/msteams/src/reply-dispatcher.ts @@ -21,22 +21,12 @@ import { sendMSTeamsMessages, } from "./messenger.js"; import type { MSTeamsMonitorLogger } from "./monitor-types.js"; +import { createTeamsReplyStreamController } from "./reply-stream-controller.js"; import { withRevokedProxyFallback } from "./revoked-context.js"; import { getMSTeamsRuntime } from "./runtime.js"; import type { MSTeamsTurnContext } from "./sdk-types.js"; -import { TeamsHttpStream } from "./streaming-message.js"; -const INFORMATIVE_STATUS_TEXTS = [ - "Thinking...", - "Working on that...", - "Checking the details...", - "Putting an answer together...", -]; - -export function pickInformativeStatusText(random = Math.random): string { - const index = Math.floor(random() * INFORMATIVE_STATUS_TEXTS.length); - return INFORMATIVE_STATUS_TEXTS[index] ?? INFORMATIVE_STATUS_TEXTS[0]!; -} +export { pickInformativeStatusText } from "./reply-stream-controller.js"; export function createMSTeamsReplyDispatcher(params: { cfg: OpenClawConfig; @@ -51,53 +41,35 @@ export function createMSTeamsReplyDispatcher(params: { replyStyle: MSTeamsReplyStyle; textLimit: number; onSentMessageIds?: (ids: string[]) => void; - /** Token provider for OneDrive/SharePoint uploads in group chats/channels */ tokenProvider?: MSTeamsAccessTokenProvider; - /** SharePoint site ID for file uploads in group chats/channels */ sharePointSiteId?: string; }) { const core = getMSTeamsRuntime(); - - // Determine conversation type to decide typing vs streaming behavior: - // - personal (1:1): typing bubble + streaming (typing shows immediately, - // streaming takes over once tokens arrive) - // - groupChat: typing bubble only, no streaming - // - channel: neither (Teams doesn't support typing or streaming in channels) const conversationType = params.conversationRef.conversation?.conversationType?.toLowerCase(); - const isPersonal = conversationType === "personal"; - const isGroupChat = conversationType === "groupchat"; - const isChannel = conversationType === "channel"; + const isTypingSupported = conversationType === "personal" || conversationType === "groupchat"; - /** - * Send a typing indicator. - * Sent for personal and group chats so users see immediate feedback. - * Channels don't support typing indicators. - */ - const sendTypingIndicator = - isPersonal || isGroupChat - ? async () => { - await withRevokedProxyFallback({ - run: async () => { - await params.context.sendActivity({ type: "typing" }); - }, - onRevoked: async () => { - const baseRef = buildConversationReference(params.conversationRef); - await params.adapter.continueConversation( - params.appId, - { ...baseRef, activityId: undefined }, - async (ctx) => { - await ctx.sendActivity({ type: "typing" }); - }, - ); - }, - onRevokedLog: () => { - params.log.debug?.("turn context revoked, sending typing via proactive messaging"); - }, - }); - } - : async () => { - // No-op for channels (not supported) - }; + const sendTypingIndicator = isTypingSupported + ? async () => { + await withRevokedProxyFallback({ + run: async () => { + await params.context.sendActivity({ type: "typing" }); + }, + onRevoked: async () => { + const baseRef = buildConversationReference(params.conversationRef); + await params.adapter.continueConversation( + params.appId, + { ...baseRef, activityId: undefined }, + async (ctx) => { + await ctx.sendActivity({ type: "typing" }); + }, + ); + }, + onRevokedLog: () => { + params.log.debug?.("turn context revoked, sending typing via proactive messaging"); + }, + }); + } + : async () => {}; const { onModelSelected, typingCallbacks, ...replyPipeline } = createChannelReplyPipeline({ cfg: params.cfg, @@ -116,6 +88,7 @@ export function createMSTeamsReplyDispatcher(params: { }, }, }); + const chunkMode = core.channel.text.resolveChunkMode(params.cfg, "msteams"); const tableMode = core.channel.text.resolveMarkdownTableMode({ cfg: params.cfg, @@ -126,26 +99,13 @@ export function createMSTeamsReplyDispatcher(params: { resolveChannelLimitMb: ({ cfg }) => cfg.channels?.msteams?.mediaMaxMb, }); const feedbackLoopEnabled = params.cfg.channels?.msteams?.feedbackEnabled !== false; + const streamController = createTeamsReplyStreamController({ + conversationType, + context: params.context, + feedbackLoopEnabled, + log: params.log, + }); - // Streaming for personal (1:1) chats using the Teams streaminfo protocol. - let stream: TeamsHttpStream | undefined; - // Track whether onPartialReply was ever called — if so, the stream - // owns the text delivery and deliver should skip text payloads. - let streamReceivedTokens = false; - let informativeUpdateSent = false; - - if (isPersonal) { - stream = new TeamsHttpStream({ - sendActivity: (activity) => params.context.sendActivity(activity), - feedbackLoopEnabled, - onError: (err) => { - params.log.debug?.(`stream error: ${err instanceof Error ? err.message : String(err)}`); - }, - }); - } - - // Accumulate rendered messages from all deliver() calls so the entire turn's - // reply is sent in a single sendMSTeamsMessages() call. (#29379) const pendingMessages: MSTeamsRenderedMessage[] = []; const sendMessages = async (messages: MSTeamsRenderedMessage[]): Promise => { @@ -211,30 +171,17 @@ export function createMSTeamsReplyDispatcher(params: { ...replyPipeline, humanDelay: core.channel.reply.resolveHumanDelayConfig(params.cfg, params.agentId), onReplyStart: async () => { - if (stream && !informativeUpdateSent) { - informativeUpdateSent = true; - await stream.sendInformativeUpdate(pickInformativeStatusText()); - } + await streamController.onReplyStart(); await typingCallbacks?.onReplyStart?.(); }, typingCallbacks, deliver: async (payload) => { - // When streaming received tokens AND hasn't failed, skip text delivery — - // finalize() handles the final message. If streaming failed (>4000 chars), - // fall through so deliver sends the complete response. - // For payloads with media, strip the text and send media only. - if (stream && streamReceivedTokens && stream.hasContent) { - const hasMedia = Boolean(payload.mediaUrl || payload.mediaUrls?.length); - if (!hasMedia) { - return; - } - payload = { ...payload, text: undefined }; + const preparedPayload = streamController.preparePayload(payload); + if (!preparedPayload) { + return; } - // Render the payload to messages and accumulate them. All messages from - // this turn are flushed together in markDispatchIdle() so they go out - // in a single continueConversation() call. - const messages = renderReplyPayloadsToMessages([payload], { + const messages = renderReplyPayloadsToMessages([preparedPayload], { textChunkLimit: params.textLimit, chunkText: true, mediaMode: "split", @@ -259,7 +206,6 @@ export function createMSTeamsReplyDispatcher(params: { }, }); - // Wrap markDispatchIdle to flush accumulated messages and finalize stream. const markDispatchIdle = (): Promise => { return flushPendingMessages() .catch((err) => { @@ -274,32 +220,27 @@ export function createMSTeamsReplyDispatcher(params: { }); }) .then(() => { - if (stream) { - return stream.finalize().catch((err) => { - params.log.debug?.("stream finalize failed", { error: String(err) }); - }); - } + return streamController.finalize().catch((err) => { + params.log.debug?.("stream finalize failed", { error: String(err) }); + }); }) .finally(() => { baseMarkDispatchIdle(); }); }; - // Build reply options with onPartialReply for streaming. - const streamingReplyOptions = stream - ? { - onPartialReply: (payload: { text?: string }) => { - if (payload.text) { - streamReceivedTokens = true; - stream!.update(payload.text); - } - }, - } - : {}; - return { dispatcher, - replyOptions: { ...replyOptions, ...streamingReplyOptions, onModelSelected }, + replyOptions: { + ...replyOptions, + ...(streamController.hasStream() + ? { + onPartialReply: (payload: { text?: string }) => + streamController.onPartialReply(payload), + } + : {}), + onModelSelected, + }, markDispatchIdle, }; } diff --git a/extensions/msteams/src/reply-stream-controller.ts b/extensions/msteams/src/reply-stream-controller.ts new file mode 100644 index 00000000000..20299caac6b --- /dev/null +++ b/extensions/msteams/src/reply-stream-controller.ts @@ -0,0 +1,76 @@ +import type { ReplyPayload } from "../runtime-api.js"; +import type { MSTeamsMonitorLogger } from "./monitor-types.js"; +import type { MSTeamsTurnContext } from "./sdk-types.js"; +import { TeamsHttpStream } from "./streaming-message.js"; + +const INFORMATIVE_STATUS_TEXTS = [ + "Thinking...", + "Working on that...", + "Checking the details...", + "Putting an answer together...", +]; + +export function pickInformativeStatusText(random = Math.random): string { + const index = Math.floor(random() * INFORMATIVE_STATUS_TEXTS.length); + return INFORMATIVE_STATUS_TEXTS[index] ?? INFORMATIVE_STATUS_TEXTS[0]!; +} + +export function createTeamsReplyStreamController(params: { + conversationType?: string; + context: MSTeamsTurnContext; + feedbackLoopEnabled: boolean; + log: MSTeamsMonitorLogger; + random?: () => number; +}) { + const isPersonal = params.conversationType?.toLowerCase() === "personal"; + const stream = isPersonal + ? new TeamsHttpStream({ + sendActivity: (activity) => params.context.sendActivity(activity), + feedbackLoopEnabled: params.feedbackLoopEnabled, + onError: (err) => { + params.log.debug?.(`stream error: ${err instanceof Error ? err.message : String(err)}`); + }, + }) + : undefined; + + let streamReceivedTokens = false; + let informativeUpdateSent = false; + + return { + async onReplyStart(): Promise { + if (!stream || informativeUpdateSent) { + return; + } + informativeUpdateSent = true; + await stream.sendInformativeUpdate(pickInformativeStatusText(params.random)); + }, + + onPartialReply(payload: { text?: string }): void { + if (!stream || !payload.text) { + return; + } + streamReceivedTokens = true; + stream.update(payload.text); + }, + + preparePayload(payload: ReplyPayload): ReplyPayload | undefined { + if (!stream || !streamReceivedTokens || !stream.hasContent) { + return payload; + } + + const hasMedia = Boolean(payload.mediaUrl || payload.mediaUrls?.length); + if (!hasMedia) { + return undefined; + } + return { ...payload, text: undefined }; + }, + + async finalize(): Promise { + await stream?.finalize(); + }, + + hasStream(): boolean { + return Boolean(stream); + }, + }; +} diff --git a/extensions/msteams/src/sdk.ts b/extensions/msteams/src/sdk.ts index dbde9660cf6..e1cbee5e5a9 100644 --- a/extensions/msteams/src/sdk.ts +++ b/extensions/msteams/src/sdk.ts @@ -24,6 +24,24 @@ export type MSTeamsTokenProvider = { getAccessToken: (scope: string) => Promise; }; +type MSTeamsBotIdentity = { + id?: string; + name?: string; +}; + +type MSTeamsSendContext = { + sendActivity: (textOrActivity: string | object) => Promise; + updateActivity: (activityUpdate: object) => Promise<{ id?: string } | void>; + deleteActivity: (activityId: string) => Promise; +}; + +type MSTeamsProcessContext = MSTeamsSendContext & { + activity: Record | undefined; + sendActivities: ( + activities: Array<{ type: string } & Record>, + ) => Promise; +}; + export async function loadMSTeamsSdk(): Promise { const [appsModule, apiModule] = await Promise.all([ import("@microsoft/teams.apps"), @@ -70,6 +88,157 @@ export function createMSTeamsTokenProvider(app: MSTeamsApp): MSTeamsTokenProvide }; } +function createBotTokenGetter(app: MSTeamsApp): () => Promise { + return async () => { + const token = await ( + app as unknown as { getBotToken(): Promise<{ toString(): string } | null> } + ).getBotToken(); + return token ? String(token) : undefined; + }; +} + +function createApiClient( + sdk: MSTeamsTeamsSdk, + serviceUrl: string, + getToken: () => Promise, +) { + return new sdk.Client(serviceUrl, { + token: async () => (await getToken()) || undefined, + headers: { "User-Agent": buildUserAgent() }, + } as Record); +} + +function normalizeOutboundActivity(textOrActivity: string | object): Record { + return typeof textOrActivity === "string" + ? ({ type: "message", text: textOrActivity } as Record) + : (textOrActivity as Record); +} + +function createSendContext(params: { + sdk: MSTeamsTeamsSdk; + serviceUrl?: string; + conversationId?: string; + conversationType?: string; + bot?: MSTeamsBotIdentity; + replyToActivityId?: string; + getToken: () => Promise; + treatInvokeResponseAsNoop?: boolean; +}): MSTeamsSendContext { + const apiClient = + params.serviceUrl && params.conversationId + ? createApiClient(params.sdk, params.serviceUrl, params.getToken) + : undefined; + + return { + async sendActivity(textOrActivity: string | object): Promise { + const msg = normalizeOutboundActivity(textOrActivity); + if (params.treatInvokeResponseAsNoop && msg.type === "invokeResponse") { + return { id: "invokeResponse" }; + } + if (!apiClient || !params.conversationId) { + return { id: "unknown" }; + } + + return await apiClient.conversations.activities(params.conversationId).create({ + type: "message", + ...msg, + from: params.bot?.id + ? { id: params.bot.id, name: params.bot.name ?? "", role: "bot" } + : undefined, + conversation: { + id: params.conversationId, + conversationType: params.conversationType ?? "personal", + }, + ...(params.replyToActivityId && !msg.replyToId + ? { replyToId: params.replyToActivityId } + : {}), + } as Parameters< + typeof apiClient.conversations.activities extends (id: string) => { + create: (a: infer T) => unknown; + } + ? never + : never + >[0]); + }, + + async updateActivity(activityUpdate: object): Promise<{ id?: string } | void> { + const nextActivity = activityUpdate as { id?: string } & Record; + const activityId = nextActivity.id; + if (!activityId) { + throw new Error("updateActivity requires an activity id"); + } + if (!params.serviceUrl || !params.conversationId) { + return { id: "unknown" }; + } + return await updateActivityViaRest({ + serviceUrl: params.serviceUrl, + conversationId: params.conversationId, + activityId, + activity: nextActivity, + token: await params.getToken(), + }); + }, + + async deleteActivity(activityId: string): Promise { + if (!activityId) { + throw new Error("deleteActivity requires an activity id"); + } + if (!params.serviceUrl || !params.conversationId) { + return; + } + await deleteActivityViaRest({ + serviceUrl: params.serviceUrl, + conversationId: params.conversationId, + activityId, + token: await params.getToken(), + }); + }, + }; +} + +function createProcessContext(params: { + sdk: MSTeamsTeamsSdk; + activity: Record | undefined; + getToken: () => Promise; +}): MSTeamsProcessContext { + const serviceUrl = params.activity?.serviceUrl as string | undefined; + const conversationId = (params.activity?.conversation as Record)?.id as + | string + | undefined; + const conversationType = (params.activity?.conversation as Record) + ?.conversationType as string | undefined; + const replyToActivityId = params.activity?.id as string | undefined; + const bot: MSTeamsBotIdentity | undefined = + params.activity?.recipient && typeof params.activity.recipient === "object" + ? { + id: (params.activity.recipient as Record).id as string | undefined, + name: (params.activity.recipient as Record).name as string | undefined, + } + : undefined; + const sendContext = createSendContext({ + sdk: params.sdk, + serviceUrl, + conversationId, + conversationType, + bot, + replyToActivityId, + getToken: params.getToken, + treatInvokeResponseAsNoop: true, + }); + + return { + activity: params.activity, + ...sendContext, + async sendActivities(activities: Array<{ type: string } & Record>) { + const results = []; + for (const activity of activities) { + results.push(await sendContext.sendActivity(activity)); + } + return results; + }, + }; +} + /** * Update an existing activity via the Bot Framework REST API. * PUT /v3/conversations/{conversationId}/activities/{activityId} @@ -168,76 +337,14 @@ export function createMSTeamsAdapter(app: MSTeamsApp, sdk: MSTeamsTeamsSdk): MST throw new Error("Missing conversation.id in conversation reference"); } - // Fetch a fresh token for each call via a token factory. - // The SDK's App manages token caching/refresh internally. - const getToken = async () => { - const token = await ( - app as unknown as { getBotToken(): Promise<{ toString(): string } | null> } - ).getBotToken(); - return token ? String(token) : undefined; - }; - - // Build a send context that uses the Bot Framework REST API. - // Pass a token factory (not a cached value) so each request gets a fresh token. - const apiClient = new sdk.Client(serviceUrl, { - token: async () => (await getToken()) || undefined, - headers: { "User-Agent": buildUserAgent() }, - } as Record); - - const sendContext = { - async sendActivity(textOrActivity: string | object): Promise { - const activity = - typeof textOrActivity === "string" - ? ({ type: "message", text: textOrActivity } as Record) - : (textOrActivity as Record); - - const response = await apiClient.conversations.activities(conversationId).create({ - type: "message", - ...activity, - from: reference.agent - ? { id: reference.agent.id, name: reference.agent.name ?? "", role: "bot" } - : undefined, - conversation: { - id: conversationId, - conversationType: reference.conversation?.conversationType ?? "personal", - }, - } as Parameters< - typeof apiClient.conversations.activities extends (id: string) => { - create: (a: infer T) => unknown; - } - ? never - : never - >[0]); - - return response; - }, - async updateActivity(activityUpdate: object): Promise<{ id?: string } | void> { - const nextActivity = activityUpdate as { id?: string } & Record; - const activityId = nextActivity.id; - if (!activityId) { - throw new Error("updateActivity requires an activity id"); - } - // Bot Framework REST API: PUT /v3/conversations/{conversationId}/activities/{activityId} - return await updateActivityViaRest({ - serviceUrl, - conversationId, - activityId, - activity: nextActivity, - token: await getToken(), - }); - }, - async deleteActivity(activityId: string): Promise { - if (!activityId) { - throw new Error("deleteActivity requires an activity id"); - } - await deleteActivityViaRest({ - serviceUrl, - conversationId, - activityId, - token: await getToken(), - }); - }, - }; + const sendContext = createSendContext({ + sdk, + serviceUrl, + conversationId, + conversationType: reference.conversation?.conversationType, + bot: reference.agent ?? undefined, + getToken: createBotTokenGetter(app), + }); await logic(sendContext); }, @@ -252,105 +359,11 @@ export function createMSTeamsAdapter(app: MSTeamsApp, sdk: MSTeamsTeamsSdk): MST const isInvoke = (activity as Record)?.type === "invoke"; try { - const serviceUrl = activity?.serviceUrl as string | undefined; - - // Token factory — fetches a fresh token for each API call. - const getToken = async () => { - const token = await ( - app as unknown as { getBotToken(): Promise<{ toString(): string } | null> } - ).getBotToken(); - return token ? String(token) : undefined; - }; - - const context = { + const context = createProcessContext({ + sdk, activity, - async sendActivity(textOrActivity: string | object): Promise { - const msg = - typeof textOrActivity === "string" - ? ({ type: "message", text: textOrActivity } as Record) - : (textOrActivity as Record); - - // invokeResponse is handled by the HTTP response from process(), - // not by posting a new activity to Bot Framework. - if (msg.type === "invokeResponse") { - return { id: "invokeResponse" }; - } - - if (!serviceUrl) { - return { id: "unknown" }; - } - - const convId = (activity?.conversation as Record)?.id as - | string - | undefined; - if (!convId) { - return { id: "unknown" }; - } - - const apiClient = new sdk.Client(serviceUrl, { - token: async () => (await getToken()) || undefined, - headers: { "User-Agent": buildUserAgent() }, - } as Record); - - const botId = (activity?.recipient as Record)?.id as - | string - | undefined; - const botName = (activity?.recipient as Record)?.name as - | string - | undefined; - const convType = (activity?.conversation as Record) - ?.conversationType as string | undefined; - - // Preserve replyToId for threaded replies (replyStyle: "thread") - const inboundActivityId = (activity as Record)?.id as - | string - | undefined; - - return await apiClient.conversations.activities(convId).create({ - type: "message", - ...msg, - from: botId ? { id: botId, name: botName ?? "", role: "bot" } : undefined, - conversation: { id: convId, conversationType: convType ?? "personal" }, - ...(inboundActivityId && !msg.replyToId ? { replyToId: inboundActivityId } : {}), - } as Parameters< - typeof apiClient.conversations.activities extends (id: string) => { - create: (a: infer T) => unknown; - } - ? never - : never - >[0]); - }, - async sendActivities( - activities: Array<{ type: string } & Record>, - ): Promise { - const results = []; - for (const act of activities) { - results.push(await context.sendActivity(act)); - } - return results; - }, - async updateActivity( - activityUpdate: { id: string } & Record, - ): Promise { - const activityId = activityUpdate.id; - if (!activityId || !serviceUrl) { - return { id: "unknown" }; - } - const convId = (activity?.conversation as Record)?.id as - | string - | undefined; - if (!convId) { - return { id: "unknown" }; - } - return await updateActivityViaRest({ - serviceUrl, - conversationId: convId, - activityId, - activity: activityUpdate, - token: await getToken(), - }); - }, - }; + getToken: createBotTokenGetter(app), + }); // For invoke activities, send HTTP 200 immediately before running // handler logic so slow operations (file uploads, reflections) don't