refactor(msteams): split reply and reflection helpers

This commit is contained in:
Peter Steinberger 2026-03-24 10:02:42 -07:00
parent 9f47892bef
commit 27448c3113
No known key found for this signature in database
6 changed files with 661 additions and 584 deletions

View File

@ -0,0 +1,115 @@
/** Max chars of the thumbed-down response to include in the reflection prompt. */
const MAX_RESPONSE_CHARS = 500;
export type ParsedReflectionResponse = {
learning: string;
followUp: boolean;
userMessage?: string;
};
export function buildReflectionPrompt(params: {
thumbedDownResponse?: string;
userComment?: string;
}): string {
const parts: string[] = ["A user indicated your previous response wasn't helpful."];
if (params.thumbedDownResponse) {
const truncated =
params.thumbedDownResponse.length > MAX_RESPONSE_CHARS
? `${params.thumbedDownResponse.slice(0, MAX_RESPONSE_CHARS)}...`
: params.thumbedDownResponse;
parts.push(`\nYour response was:\n> ${truncated}`);
}
if (params.userComment) {
parts.push(`\nUser's comment: "${params.userComment}"`);
}
parts.push(
"\nBriefly reflect: what could you improve? Consider tone, length, " +
"accuracy, relevance, and specificity. Reply with a single JSON object " +
'only, no markdown or prose, using this exact shape:\n{"learning":"...",' +
'"followUp":false,"userMessage":""}\n' +
"- learning: a short internal adjustment note (1-2 sentences) for your " +
"future behavior in this conversation.\n" +
"- followUp: true only if the user needs a direct follow-up message.\n" +
"- userMessage: only the exact user-facing message to send; empty string " +
"when followUp is false.",
);
return parts.join("\n");
}
function parseBooleanLike(value: unknown): boolean | undefined {
if (typeof value === "boolean") {
return value;
}
if (typeof value === "string") {
const normalized = value.trim().toLowerCase();
if (normalized === "true" || normalized === "yes") {
return true;
}
if (normalized === "false" || normalized === "no") {
return false;
}
}
return undefined;
}
function parseStructuredReflectionValue(value: unknown): ParsedReflectionResponse | null {
if (value == null || typeof value !== "object" || Array.isArray(value)) {
return null;
}
const candidate = value as {
learning?: unknown;
followUp?: unknown;
userMessage?: unknown;
};
const learning = typeof candidate.learning === "string" ? candidate.learning.trim() : undefined;
if (!learning) {
return null;
}
return {
learning,
followUp: parseBooleanLike(candidate.followUp) ?? false,
userMessage:
typeof candidate.userMessage === "string" && candidate.userMessage.trim()
? candidate.userMessage.trim()
: undefined,
};
}
export function parseReflectionResponse(text: string): ParsedReflectionResponse | null {
const trimmed = text.trim();
if (!trimmed) {
return null;
}
const candidates = [
trimmed,
...(trimmed.match(/```(?:json)?\s*([\s\S]*?)```/i)?.slice(1, 2) ?? []),
];
for (const candidateText of candidates) {
const candidate = candidateText.trim();
if (!candidate) {
continue;
}
try {
const parsed = parseStructuredReflectionValue(JSON.parse(candidate));
if (parsed) {
return parsed;
}
} catch {
// Fall through to the next parse strategy.
}
}
// Safe fallback: keep the internal learning, but never auto-message the user.
return {
learning: trimmed,
followUp: false,
};
}

View File

@ -0,0 +1,99 @@
/** Default cooldown between reflections per session (5 minutes). */
export const DEFAULT_COOLDOWN_MS = 300_000;
/** Tracks last reflection time per session to enforce cooldown. */
const lastReflectionBySession = new Map<string, number>();
/** Maximum cooldown entries before pruning expired ones. */
const MAX_COOLDOWN_ENTRIES = 500;
function sanitizeSessionKey(sessionKey: string): string {
return sessionKey.replace(/[^a-zA-Z0-9_-]/g, "_");
}
/** Prune expired cooldown entries to prevent unbounded memory growth. */
function pruneExpiredCooldowns(cooldownMs: number): void {
if (lastReflectionBySession.size <= MAX_COOLDOWN_ENTRIES) {
return;
}
const now = Date.now();
for (const [key, time] of lastReflectionBySession) {
if (now - time >= cooldownMs) {
lastReflectionBySession.delete(key);
}
}
}
/** Check if a reflection is allowed (cooldown not active). */
export function isReflectionAllowed(sessionKey: string, cooldownMs?: number): boolean {
const cooldown = cooldownMs ?? DEFAULT_COOLDOWN_MS;
const lastTime = lastReflectionBySession.get(sessionKey);
if (lastTime == null) {
return true;
}
return Date.now() - lastTime >= cooldown;
}
/** Record that a reflection was run for a session. */
export function recordReflectionTime(sessionKey: string, cooldownMs?: number): void {
lastReflectionBySession.set(sessionKey, Date.now());
pruneExpiredCooldowns(cooldownMs ?? DEFAULT_COOLDOWN_MS);
}
/** Clear reflection cooldown tracking (for tests). */
export function clearReflectionCooldowns(): void {
lastReflectionBySession.clear();
}
/** Store a learning derived from feedback reflection in a session companion file. */
export async function storeSessionLearning(params: {
storePath: string;
sessionKey: string;
learning: string;
}): Promise<void> {
const fs = await import("node:fs/promises");
const path = await import("node:path");
const learningsFile = path.join(
params.storePath,
`${sanitizeSessionKey(params.sessionKey)}.learnings.json`,
);
let learnings: string[] = [];
try {
const existing = await fs.readFile(learningsFile, "utf-8");
const parsed = JSON.parse(existing);
if (Array.isArray(parsed)) {
learnings = parsed;
}
} catch {
// File doesn't exist yet — start fresh.
}
learnings.push(params.learning);
if (learnings.length > 10) {
learnings = learnings.slice(-10);
}
await fs.mkdir(path.dirname(learningsFile), { recursive: true });
await fs.writeFile(learningsFile, JSON.stringify(learnings, null, 2), "utf-8");
}
/** Load session learnings for injection into extraSystemPrompt. */
export async function loadSessionLearnings(
storePath: string,
sessionKey: string,
): Promise<string[]> {
const fs = await import("node:fs/promises");
const path = await import("node:path");
const learningsFile = path.join(storePath, `${sanitizeSessionKey(sessionKey)}.learnings.json`);
try {
const content = await fs.readFile(learningsFile, "utf-8");
const parsed = JSON.parse(content);
return Array.isArray(parsed) ? parsed : [];
} catch {
return [];
}
}

View File

@ -2,7 +2,7 @@
* Background reflection triggered by negative user feedback (thumbs-down).
*
* Flow:
* 1. User thumbs-down invoke handler acks immediately
* 1. User thumbs-down -> invoke handler acks immediately
* 2. This module runs in the background (fire-and-forget)
* 3. Reads recent session context
* 4. Sends a synthetic reflection prompt to the agent
@ -15,36 +15,20 @@ import {
type OpenClawConfig,
} from "../runtime-api.js";
import type { StoredConversationReference } from "./conversation-store.js";
import { buildReflectionPrompt, parseReflectionResponse } from "./feedback-reflection-prompt.js";
import {
DEFAULT_COOLDOWN_MS,
clearReflectionCooldowns,
isReflectionAllowed,
loadSessionLearnings,
recordReflectionTime,
storeSessionLearning,
} from "./feedback-reflection-store.js";
import type { MSTeamsAdapter } from "./messenger.js";
import { buildConversationReference, sendMSTeamsMessages } from "./messenger.js";
import { buildConversationReference } from "./messenger.js";
import type { MSTeamsMonitorLogger } from "./monitor-types.js";
import { getMSTeamsRuntime } from "./runtime.js";
/** Default cooldown between reflections per session (5 minutes). */
const DEFAULT_COOLDOWN_MS = 300_000;
/** Max chars of the thumbed-down response to include in the reflection prompt. */
const MAX_RESPONSE_CHARS = 500;
/** Tracks last reflection time per session to enforce cooldown. */
const lastReflectionBySession = new Map<string, number>();
/** Maximum cooldown entries before pruning expired ones. */
const MAX_COOLDOWN_ENTRIES = 500;
/** Prune expired cooldown entries to prevent unbounded memory growth. */
function pruneExpiredCooldowns(cooldownMs: number): void {
if (lastReflectionBySession.size <= MAX_COOLDOWN_ENTRIES) {
return;
}
const now = Date.now();
for (const [key, time] of lastReflectionBySession) {
if (now - time >= cooldownMs) {
lastReflectionBySession.delete(key);
}
}
}
export type FeedbackEvent = {
type: "custom";
event: "feedback";
@ -79,146 +63,6 @@ export function buildFeedbackEvent(params: {
};
}
export type ParsedReflectionResponse = {
learning: string;
followUp: boolean;
userMessage?: string;
};
export function buildReflectionPrompt(params: {
thumbedDownResponse?: string;
userComment?: string;
}): string {
const parts: string[] = ["A user indicated your previous response wasn't helpful."];
if (params.thumbedDownResponse) {
const truncated =
params.thumbedDownResponse.length > MAX_RESPONSE_CHARS
? `${params.thumbedDownResponse.slice(0, MAX_RESPONSE_CHARS)}...`
: params.thumbedDownResponse;
parts.push(`\nYour response was:\n> ${truncated}`);
}
if (params.userComment) {
parts.push(`\nUser's comment: "${params.userComment}"`);
}
parts.push(
"\nBriefly reflect: what could you improve? Consider tone, length, " +
"accuracy, relevance, and specificity. Reply with a single JSON object " +
'only, no markdown or prose, using this exact shape:\n{"learning":"...",' +
'"followUp":false,"userMessage":""}\n' +
"- learning: a short internal adjustment note (1-2 sentences) for your " +
"future behavior in this conversation.\n" +
"- followUp: true only if the user needs a direct follow-up message.\n" +
"- userMessage: only the exact user-facing message to send; empty string " +
"when followUp is false.",
);
return parts.join("\n");
}
function parseBooleanLike(value: unknown): boolean | undefined {
if (typeof value === "boolean") {
return value;
}
if (typeof value === "string") {
const normalized = value.trim().toLowerCase();
if (normalized === "true" || normalized === "yes") {
return true;
}
if (normalized === "false" || normalized === "no") {
return false;
}
}
return undefined;
}
function parseStructuredReflectionValue(value: unknown): ParsedReflectionResponse | null {
if (value == null || typeof value !== "object" || Array.isArray(value)) {
return null;
}
const candidate = value as {
learning?: unknown;
followUp?: unknown;
userMessage?: unknown;
};
const learning = typeof candidate.learning === "string" ? candidate.learning.trim() : undefined;
if (!learning) {
return null;
}
return {
learning,
followUp: parseBooleanLike(candidate.followUp) ?? false,
userMessage:
typeof candidate.userMessage === "string" && candidate.userMessage.trim()
? candidate.userMessage.trim()
: undefined,
};
}
export function parseReflectionResponse(text: string): ParsedReflectionResponse | null {
const trimmed = text.trim();
if (!trimmed) {
return null;
}
const candidates = [
trimmed,
...(trimmed.match(/```(?:json)?\s*([\s\S]*?)```/i)?.slice(1, 2) ?? []),
];
for (const candidateText of candidates) {
const candidate = candidateText.trim();
if (!candidate) {
continue;
}
try {
const parsed = parseStructuredReflectionValue(JSON.parse(candidate));
if (parsed) {
return parsed;
}
} catch {
// Fall through to the next parse strategy.
}
}
// Safe fallback: keep the internal learning, but never auto-message the user.
return {
learning: trimmed,
followUp: false,
};
}
/**
* Check if a reflection is allowed (cooldown not active).
*/
export function isReflectionAllowed(sessionKey: string, cooldownMs?: number): boolean {
const cooldown = cooldownMs ?? DEFAULT_COOLDOWN_MS;
const lastTime = lastReflectionBySession.get(sessionKey);
if (lastTime == null) {
return true;
}
return Date.now() - lastTime >= cooldown;
}
/**
* Record that a reflection was run for a session.
*/
export function recordReflectionTime(sessionKey: string, cooldownMs?: number): void {
lastReflectionBySession.set(sessionKey, Date.now());
pruneExpiredCooldowns(cooldownMs ?? DEFAULT_COOLDOWN_MS);
}
/**
* Clear reflection cooldown tracking (for tests).
*/
export function clearReflectionCooldowns(): void {
lastReflectionBySession.clear();
}
export type RunFeedbackReflectionParams = {
cfg: OpenClawConfig;
adapter: MSTeamsAdapter;
@ -233,68 +77,51 @@ export type RunFeedbackReflectionParams = {
log: MSTeamsMonitorLogger;
};
/**
* Run a background reflection after negative feedback.
* This is designed to be called fire-and-forget (don't await in the invoke handler).
*/
export async function runFeedbackReflection(params: RunFeedbackReflectionParams): Promise<void> {
const { cfg, log, sessionKey } = params;
const msteamsCfg = cfg.channels?.msteams;
// Check cooldown
const cooldownMs = msteamsCfg?.feedbackReflectionCooldownMs ?? DEFAULT_COOLDOWN_MS;
if (!isReflectionAllowed(sessionKey, cooldownMs)) {
log.debug?.("skipping reflection (cooldown active)", { sessionKey });
return;
}
// Record cooldown after successful dispatch (not before) so transient
// failures don't suppress future reflection attempts.
function buildReflectionContext(params: {
cfg: OpenClawConfig;
conversationId: string;
sessionKey: string;
reflectionPrompt: string;
}) {
const core = getMSTeamsRuntime();
const reflectionPrompt = buildReflectionPrompt({
thumbedDownResponse: params.thumbedDownResponse,
userComment: params.userComment,
});
// Use the agentId from the feedback handler (already resolved with correct routing)
// instead of re-resolving, which could yield a different agent in peer-specific setups.
const storePath = core.channel.session.resolveStorePath(cfg.session?.store, {
agentId: params.agentId,
});
const envelopeOptions = core.channel.reply.resolveEnvelopeFormatOptions(cfg);
const envelopeOptions = core.channel.reply.resolveEnvelopeFormatOptions(params.cfg);
const body = core.channel.reply.formatAgentEnvelope({
channel: "Teams",
from: "system",
body: reflectionPrompt,
body: params.reflectionPrompt,
envelope: envelopeOptions,
});
const ctxPayload = core.channel.reply.finalizeInboundContext({
Body: body,
BodyForAgent: reflectionPrompt,
RawBody: reflectionPrompt,
CommandBody: reflectionPrompt,
From: `msteams:system:${params.conversationId}`,
To: `conversation:${params.conversationId}`,
SessionKey: params.sessionKey,
ChatType: "direct" as const,
SenderName: "system",
SenderId: "system",
Provider: "msteams" as const,
Surface: "msteams" as const,
Timestamp: Date.now(),
WasMentioned: true,
CommandAuthorized: false,
OriginatingChannel: "msteams" as const,
OriginatingTo: `conversation:${params.conversationId}`,
});
// Capture the reflection response instead of sending it directly.
// We only want to proactively message if the agent decides to follow up.
let reflectionResponse = "";
return {
ctxPayload: core.channel.reply.finalizeInboundContext({
Body: body,
BodyForAgent: params.reflectionPrompt,
RawBody: params.reflectionPrompt,
CommandBody: params.reflectionPrompt,
From: `msteams:system:${params.conversationId}`,
To: `conversation:${params.conversationId}`,
SessionKey: params.sessionKey,
ChatType: "direct" as const,
SenderName: "system",
SenderId: "system",
Provider: "msteams" as const,
Surface: "msteams" as const,
Timestamp: Date.now(),
WasMentioned: true,
CommandAuthorized: false,
OriginatingChannel: "msteams" as const,
OriginatingTo: `conversation:${params.conversationId}`,
}),
};
}
function createReflectionCaptureDispatcher(params: {
cfg: OpenClawConfig;
agentId: string;
log: MSTeamsMonitorLogger;
}) {
const core = getMSTeamsRuntime();
let response = "";
const noopTypingCallbacks = {
onReplyStart: async () => {},
onIdle: () => {},
@ -304,31 +131,88 @@ export async function runFeedbackReflection(params: RunFeedbackReflectionParams)
const { dispatcher, replyOptions } = core.channel.reply.createReplyDispatcherWithTyping({
deliver: async (payload) => {
if (payload.text) {
reflectionResponse += (reflectionResponse ? "\n" : "") + payload.text;
response += (response ? "\n" : "") + payload.text;
}
},
typingCallbacks: noopTypingCallbacks,
humanDelay: core.channel.reply.resolveHumanDelayConfig(cfg, params.agentId),
humanDelay: core.channel.reply.resolveHumanDelayConfig(params.cfg, params.agentId),
onError: (err) => {
log.debug?.("reflection reply error", { error: String(err) });
params.log.debug?.("reflection reply error", { error: String(err) });
},
});
return {
dispatcher,
replyOptions,
readResponse: () => response,
};
}
async function sendReflectionFollowUp(params: {
adapter: MSTeamsAdapter;
appId: string;
conversationRef: StoredConversationReference;
userMessage: string;
}): Promise<void> {
const baseRef = buildConversationReference(params.conversationRef);
const proactiveRef = { ...baseRef, activityId: undefined };
await params.adapter.continueConversation(params.appId, proactiveRef, async (ctx) => {
await ctx.sendActivity({
type: "message",
text: params.userMessage,
});
});
}
/**
* Run a background reflection after negative feedback.
* This is designed to be called fire-and-forget (don't await in the invoke handler).
*/
export async function runFeedbackReflection(params: RunFeedbackReflectionParams): Promise<void> {
const { cfg, log, sessionKey } = params;
const cooldownMs = cfg.channels?.msteams?.feedbackReflectionCooldownMs ?? DEFAULT_COOLDOWN_MS;
if (!isReflectionAllowed(sessionKey, cooldownMs)) {
log.debug?.("skipping reflection (cooldown active)", { sessionKey });
return;
}
const reflectionPrompt = buildReflectionPrompt({
thumbedDownResponse: params.thumbedDownResponse,
userComment: params.userComment,
});
const runtime = getMSTeamsRuntime();
const storePath = runtime.channel.session.resolveStorePath(cfg.session?.store, {
agentId: params.agentId,
});
const { ctxPayload } = buildReflectionContext({
cfg,
conversationId: params.conversationId,
sessionKey: params.sessionKey,
reflectionPrompt,
});
const capture = createReflectionCaptureDispatcher({
cfg,
agentId: params.agentId,
log,
});
try {
await dispatchReplyFromConfigWithSettledDispatcher({
ctxPayload,
cfg,
dispatcher,
dispatcher: capture.dispatcher,
onSettled: () => {},
replyOptions,
replyOptions: capture.replyOptions,
});
} catch (err) {
log.error("reflection dispatch failed", { error: String(err) });
// Don't record cooldown — allow retry on next feedback
return;
}
if (!reflectionResponse.trim()) {
const reflectionResponse = capture.readResponse().trim();
if (!reflectionResponse) {
log.debug?.("reflection produced no output");
return;
}
@ -339,16 +223,13 @@ export async function runFeedbackReflection(params: RunFeedbackReflectionParams)
return;
}
// Reflection succeeded — record cooldown now
recordReflectionTime(sessionKey, cooldownMs);
log.info("reflection complete", {
sessionKey,
responseLength: reflectionResponse.length,
followUp: parsedReflection.followUp,
});
// Store the learning in the session
try {
await storeSessionLearning({
storePath,
@ -360,87 +241,39 @@ export async function runFeedbackReflection(params: RunFeedbackReflectionParams)
}
const conversationType = params.conversationRef.conversation?.conversationType?.toLowerCase();
const isDirectMessage = conversationType === "personal";
const shouldNotify =
isDirectMessage && parsedReflection.followUp && Boolean(parsedReflection.userMessage);
conversationType === "personal" &&
parsedReflection.followUp &&
Boolean(parsedReflection.userMessage);
if (shouldNotify) {
try {
const baseRef = buildConversationReference(params.conversationRef);
const proactiveRef = { ...baseRef, activityId: undefined };
await params.adapter.continueConversation(params.appId, proactiveRef, async (ctx) => {
await ctx.sendActivity({
type: "message",
text: parsedReflection.userMessage!,
});
if (!shouldNotify) {
if (parsedReflection.followUp && conversationType !== "personal") {
log.debug?.("skipping reflection follow-up outside direct message", {
sessionKey,
conversationType,
});
log.info("sent reflection follow-up", { sessionKey });
} catch (err) {
log.debug?.("failed to send reflection follow-up", { error: String(err) });
}
} else if (parsedReflection.followUp && !isDirectMessage) {
log.debug?.("skipping reflection follow-up outside direct message", {
sessionKey,
conversationType,
return;
}
try {
await sendReflectionFollowUp({
adapter: params.adapter,
appId: params.appId,
conversationRef: params.conversationRef,
userMessage: parsedReflection.userMessage!,
});
log.info("sent reflection follow-up", { sessionKey });
} catch (err) {
log.debug?.("failed to send reflection follow-up", { error: String(err) });
}
}
/**
* Store a learning derived from feedback reflection in a session companion file.
*/
async function storeSessionLearning(params: {
storePath: string;
sessionKey: string;
learning: string;
}): Promise<void> {
const fs = await import("node:fs/promises");
const path = await import("node:path");
const safeKey = params.sessionKey.replace(/[^a-zA-Z0-9_-]/g, "_");
const learningsFile = path.join(params.storePath, `${safeKey}.learnings.json`);
let learnings: string[] = [];
try {
const existing = await fs.readFile(learningsFile, "utf-8");
const parsed = JSON.parse(existing);
if (Array.isArray(parsed)) {
learnings = parsed;
}
} catch {
// File doesn't exist yet — start fresh.
}
learnings.push(params.learning);
// Cap at 10 learnings to avoid unbounded growth
if (learnings.length > 10) {
learnings = learnings.slice(-10);
}
await fs.mkdir(path.dirname(learningsFile), { recursive: true });
await fs.writeFile(learningsFile, JSON.stringify(learnings, null, 2), "utf-8");
}
/**
* Load session learnings for injection into extraSystemPrompt.
*/
export async function loadSessionLearnings(
storePath: string,
sessionKey: string,
): Promise<string[]> {
const fs = await import("node:fs/promises");
const path = await import("node:path");
const safeKey = sessionKey.replace(/[^a-zA-Z0-9_-]/g, "_");
const learningsFile = path.join(storePath, `${safeKey}.learnings.json`);
try {
const content = await fs.readFile(learningsFile, "utf-8");
const parsed = JSON.parse(content);
return Array.isArray(parsed) ? parsed : [];
} catch {
return [];
}
}
export {
buildReflectionPrompt,
clearReflectionCooldowns,
isReflectionAllowed,
loadSessionLearnings,
parseReflectionResponse,
recordReflectionTime,
};

View File

@ -21,22 +21,12 @@ import {
sendMSTeamsMessages,
} from "./messenger.js";
import type { MSTeamsMonitorLogger } from "./monitor-types.js";
import { createTeamsReplyStreamController } from "./reply-stream-controller.js";
import { withRevokedProxyFallback } from "./revoked-context.js";
import { getMSTeamsRuntime } from "./runtime.js";
import type { MSTeamsTurnContext } from "./sdk-types.js";
import { TeamsHttpStream } from "./streaming-message.js";
const INFORMATIVE_STATUS_TEXTS = [
"Thinking...",
"Working on that...",
"Checking the details...",
"Putting an answer together...",
];
export function pickInformativeStatusText(random = Math.random): string {
const index = Math.floor(random() * INFORMATIVE_STATUS_TEXTS.length);
return INFORMATIVE_STATUS_TEXTS[index] ?? INFORMATIVE_STATUS_TEXTS[0]!;
}
export { pickInformativeStatusText } from "./reply-stream-controller.js";
export function createMSTeamsReplyDispatcher(params: {
cfg: OpenClawConfig;
@ -51,53 +41,35 @@ export function createMSTeamsReplyDispatcher(params: {
replyStyle: MSTeamsReplyStyle;
textLimit: number;
onSentMessageIds?: (ids: string[]) => void;
/** Token provider for OneDrive/SharePoint uploads in group chats/channels */
tokenProvider?: MSTeamsAccessTokenProvider;
/** SharePoint site ID for file uploads in group chats/channels */
sharePointSiteId?: string;
}) {
const core = getMSTeamsRuntime();
// Determine conversation type to decide typing vs streaming behavior:
// - personal (1:1): typing bubble + streaming (typing shows immediately,
// streaming takes over once tokens arrive)
// - groupChat: typing bubble only, no streaming
// - channel: neither (Teams doesn't support typing or streaming in channels)
const conversationType = params.conversationRef.conversation?.conversationType?.toLowerCase();
const isPersonal = conversationType === "personal";
const isGroupChat = conversationType === "groupchat";
const isChannel = conversationType === "channel";
const isTypingSupported = conversationType === "personal" || conversationType === "groupchat";
/**
* Send a typing indicator.
* Sent for personal and group chats so users see immediate feedback.
* Channels don't support typing indicators.
*/
const sendTypingIndicator =
isPersonal || isGroupChat
? async () => {
await withRevokedProxyFallback({
run: async () => {
await params.context.sendActivity({ type: "typing" });
},
onRevoked: async () => {
const baseRef = buildConversationReference(params.conversationRef);
await params.adapter.continueConversation(
params.appId,
{ ...baseRef, activityId: undefined },
async (ctx) => {
await ctx.sendActivity({ type: "typing" });
},
);
},
onRevokedLog: () => {
params.log.debug?.("turn context revoked, sending typing via proactive messaging");
},
});
}
: async () => {
// No-op for channels (not supported)
};
const sendTypingIndicator = isTypingSupported
? async () => {
await withRevokedProxyFallback({
run: async () => {
await params.context.sendActivity({ type: "typing" });
},
onRevoked: async () => {
const baseRef = buildConversationReference(params.conversationRef);
await params.adapter.continueConversation(
params.appId,
{ ...baseRef, activityId: undefined },
async (ctx) => {
await ctx.sendActivity({ type: "typing" });
},
);
},
onRevokedLog: () => {
params.log.debug?.("turn context revoked, sending typing via proactive messaging");
},
});
}
: async () => {};
const { onModelSelected, typingCallbacks, ...replyPipeline } = createChannelReplyPipeline({
cfg: params.cfg,
@ -116,6 +88,7 @@ export function createMSTeamsReplyDispatcher(params: {
},
},
});
const chunkMode = core.channel.text.resolveChunkMode(params.cfg, "msteams");
const tableMode = core.channel.text.resolveMarkdownTableMode({
cfg: params.cfg,
@ -126,26 +99,13 @@ export function createMSTeamsReplyDispatcher(params: {
resolveChannelLimitMb: ({ cfg }) => cfg.channels?.msteams?.mediaMaxMb,
});
const feedbackLoopEnabled = params.cfg.channels?.msteams?.feedbackEnabled !== false;
const streamController = createTeamsReplyStreamController({
conversationType,
context: params.context,
feedbackLoopEnabled,
log: params.log,
});
// Streaming for personal (1:1) chats using the Teams streaminfo protocol.
let stream: TeamsHttpStream | undefined;
// Track whether onPartialReply was ever called — if so, the stream
// owns the text delivery and deliver should skip text payloads.
let streamReceivedTokens = false;
let informativeUpdateSent = false;
if (isPersonal) {
stream = new TeamsHttpStream({
sendActivity: (activity) => params.context.sendActivity(activity),
feedbackLoopEnabled,
onError: (err) => {
params.log.debug?.(`stream error: ${err instanceof Error ? err.message : String(err)}`);
},
});
}
// Accumulate rendered messages from all deliver() calls so the entire turn's
// reply is sent in a single sendMSTeamsMessages() call. (#29379)
const pendingMessages: MSTeamsRenderedMessage[] = [];
const sendMessages = async (messages: MSTeamsRenderedMessage[]): Promise<string[]> => {
@ -211,30 +171,17 @@ export function createMSTeamsReplyDispatcher(params: {
...replyPipeline,
humanDelay: core.channel.reply.resolveHumanDelayConfig(params.cfg, params.agentId),
onReplyStart: async () => {
if (stream && !informativeUpdateSent) {
informativeUpdateSent = true;
await stream.sendInformativeUpdate(pickInformativeStatusText());
}
await streamController.onReplyStart();
await typingCallbacks?.onReplyStart?.();
},
typingCallbacks,
deliver: async (payload) => {
// When streaming received tokens AND hasn't failed, skip text delivery —
// finalize() handles the final message. If streaming failed (>4000 chars),
// fall through so deliver sends the complete response.
// For payloads with media, strip the text and send media only.
if (stream && streamReceivedTokens && stream.hasContent) {
const hasMedia = Boolean(payload.mediaUrl || payload.mediaUrls?.length);
if (!hasMedia) {
return;
}
payload = { ...payload, text: undefined };
const preparedPayload = streamController.preparePayload(payload);
if (!preparedPayload) {
return;
}
// Render the payload to messages and accumulate them. All messages from
// this turn are flushed together in markDispatchIdle() so they go out
// in a single continueConversation() call.
const messages = renderReplyPayloadsToMessages([payload], {
const messages = renderReplyPayloadsToMessages([preparedPayload], {
textChunkLimit: params.textLimit,
chunkText: true,
mediaMode: "split",
@ -259,7 +206,6 @@ export function createMSTeamsReplyDispatcher(params: {
},
});
// Wrap markDispatchIdle to flush accumulated messages and finalize stream.
const markDispatchIdle = (): Promise<void> => {
return flushPendingMessages()
.catch((err) => {
@ -274,32 +220,27 @@ export function createMSTeamsReplyDispatcher(params: {
});
})
.then(() => {
if (stream) {
return stream.finalize().catch((err) => {
params.log.debug?.("stream finalize failed", { error: String(err) });
});
}
return streamController.finalize().catch((err) => {
params.log.debug?.("stream finalize failed", { error: String(err) });
});
})
.finally(() => {
baseMarkDispatchIdle();
});
};
// Build reply options with onPartialReply for streaming.
const streamingReplyOptions = stream
? {
onPartialReply: (payload: { text?: string }) => {
if (payload.text) {
streamReceivedTokens = true;
stream!.update(payload.text);
}
},
}
: {};
return {
dispatcher,
replyOptions: { ...replyOptions, ...streamingReplyOptions, onModelSelected },
replyOptions: {
...replyOptions,
...(streamController.hasStream()
? {
onPartialReply: (payload: { text?: string }) =>
streamController.onPartialReply(payload),
}
: {}),
onModelSelected,
},
markDispatchIdle,
};
}

View File

@ -0,0 +1,76 @@
import type { ReplyPayload } from "../runtime-api.js";
import type { MSTeamsMonitorLogger } from "./monitor-types.js";
import type { MSTeamsTurnContext } from "./sdk-types.js";
import { TeamsHttpStream } from "./streaming-message.js";
const INFORMATIVE_STATUS_TEXTS = [
"Thinking...",
"Working on that...",
"Checking the details...",
"Putting an answer together...",
];
export function pickInformativeStatusText(random = Math.random): string {
const index = Math.floor(random() * INFORMATIVE_STATUS_TEXTS.length);
return INFORMATIVE_STATUS_TEXTS[index] ?? INFORMATIVE_STATUS_TEXTS[0]!;
}
export function createTeamsReplyStreamController(params: {
conversationType?: string;
context: MSTeamsTurnContext;
feedbackLoopEnabled: boolean;
log: MSTeamsMonitorLogger;
random?: () => number;
}) {
const isPersonal = params.conversationType?.toLowerCase() === "personal";
const stream = isPersonal
? new TeamsHttpStream({
sendActivity: (activity) => params.context.sendActivity(activity),
feedbackLoopEnabled: params.feedbackLoopEnabled,
onError: (err) => {
params.log.debug?.(`stream error: ${err instanceof Error ? err.message : String(err)}`);
},
})
: undefined;
let streamReceivedTokens = false;
let informativeUpdateSent = false;
return {
async onReplyStart(): Promise<void> {
if (!stream || informativeUpdateSent) {
return;
}
informativeUpdateSent = true;
await stream.sendInformativeUpdate(pickInformativeStatusText(params.random));
},
onPartialReply(payload: { text?: string }): void {
if (!stream || !payload.text) {
return;
}
streamReceivedTokens = true;
stream.update(payload.text);
},
preparePayload(payload: ReplyPayload): ReplyPayload | undefined {
if (!stream || !streamReceivedTokens || !stream.hasContent) {
return payload;
}
const hasMedia = Boolean(payload.mediaUrl || payload.mediaUrls?.length);
if (!hasMedia) {
return undefined;
}
return { ...payload, text: undefined };
},
async finalize(): Promise<void> {
await stream?.finalize();
},
hasStream(): boolean {
return Boolean(stream);
},
};
}

View File

@ -24,6 +24,24 @@ export type MSTeamsTokenProvider = {
getAccessToken: (scope: string) => Promise<string>;
};
type MSTeamsBotIdentity = {
id?: string;
name?: string;
};
type MSTeamsSendContext = {
sendActivity: (textOrActivity: string | object) => Promise<unknown>;
updateActivity: (activityUpdate: object) => Promise<{ id?: string } | void>;
deleteActivity: (activityId: string) => Promise<void>;
};
type MSTeamsProcessContext = MSTeamsSendContext & {
activity: Record<string, unknown> | undefined;
sendActivities: (
activities: Array<{ type: string } & Record<string, unknown>>,
) => Promise<unknown[]>;
};
export async function loadMSTeamsSdk(): Promise<MSTeamsTeamsSdk> {
const [appsModule, apiModule] = await Promise.all([
import("@microsoft/teams.apps"),
@ -70,6 +88,157 @@ export function createMSTeamsTokenProvider(app: MSTeamsApp): MSTeamsTokenProvide
};
}
function createBotTokenGetter(app: MSTeamsApp): () => Promise<string | undefined> {
return async () => {
const token = await (
app as unknown as { getBotToken(): Promise<{ toString(): string } | null> }
).getBotToken();
return token ? String(token) : undefined;
};
}
function createApiClient(
sdk: MSTeamsTeamsSdk,
serviceUrl: string,
getToken: () => Promise<string | undefined>,
) {
return new sdk.Client(serviceUrl, {
token: async () => (await getToken()) || undefined,
headers: { "User-Agent": buildUserAgent() },
} as Record<string, unknown>);
}
function normalizeOutboundActivity(textOrActivity: string | object): Record<string, unknown> {
return typeof textOrActivity === "string"
? ({ type: "message", text: textOrActivity } as Record<string, unknown>)
: (textOrActivity as Record<string, unknown>);
}
function createSendContext(params: {
sdk: MSTeamsTeamsSdk;
serviceUrl?: string;
conversationId?: string;
conversationType?: string;
bot?: MSTeamsBotIdentity;
replyToActivityId?: string;
getToken: () => Promise<string | undefined>;
treatInvokeResponseAsNoop?: boolean;
}): MSTeamsSendContext {
const apiClient =
params.serviceUrl && params.conversationId
? createApiClient(params.sdk, params.serviceUrl, params.getToken)
: undefined;
return {
async sendActivity(textOrActivity: string | object): Promise<unknown> {
const msg = normalizeOutboundActivity(textOrActivity);
if (params.treatInvokeResponseAsNoop && msg.type === "invokeResponse") {
return { id: "invokeResponse" };
}
if (!apiClient || !params.conversationId) {
return { id: "unknown" };
}
return await apiClient.conversations.activities(params.conversationId).create({
type: "message",
...msg,
from: params.bot?.id
? { id: params.bot.id, name: params.bot.name ?? "", role: "bot" }
: undefined,
conversation: {
id: params.conversationId,
conversationType: params.conversationType ?? "personal",
},
...(params.replyToActivityId && !msg.replyToId
? { replyToId: params.replyToActivityId }
: {}),
} as Parameters<
typeof apiClient.conversations.activities extends (id: string) => {
create: (a: infer T) => unknown;
}
? never
: never
>[0]);
},
async updateActivity(activityUpdate: object): Promise<{ id?: string } | void> {
const nextActivity = activityUpdate as { id?: string } & Record<string, unknown>;
const activityId = nextActivity.id;
if (!activityId) {
throw new Error("updateActivity requires an activity id");
}
if (!params.serviceUrl || !params.conversationId) {
return { id: "unknown" };
}
return await updateActivityViaRest({
serviceUrl: params.serviceUrl,
conversationId: params.conversationId,
activityId,
activity: nextActivity,
token: await params.getToken(),
});
},
async deleteActivity(activityId: string): Promise<void> {
if (!activityId) {
throw new Error("deleteActivity requires an activity id");
}
if (!params.serviceUrl || !params.conversationId) {
return;
}
await deleteActivityViaRest({
serviceUrl: params.serviceUrl,
conversationId: params.conversationId,
activityId,
token: await params.getToken(),
});
},
};
}
function createProcessContext(params: {
sdk: MSTeamsTeamsSdk;
activity: Record<string, unknown> | undefined;
getToken: () => Promise<string | undefined>;
}): MSTeamsProcessContext {
const serviceUrl = params.activity?.serviceUrl as string | undefined;
const conversationId = (params.activity?.conversation as Record<string, unknown>)?.id as
| string
| undefined;
const conversationType = (params.activity?.conversation as Record<string, unknown>)
?.conversationType as string | undefined;
const replyToActivityId = params.activity?.id as string | undefined;
const bot: MSTeamsBotIdentity | undefined =
params.activity?.recipient && typeof params.activity.recipient === "object"
? {
id: (params.activity.recipient as Record<string, unknown>).id as string | undefined,
name: (params.activity.recipient as Record<string, unknown>).name as string | undefined,
}
: undefined;
const sendContext = createSendContext({
sdk: params.sdk,
serviceUrl,
conversationId,
conversationType,
bot,
replyToActivityId,
getToken: params.getToken,
treatInvokeResponseAsNoop: true,
});
return {
activity: params.activity,
...sendContext,
async sendActivities(activities: Array<{ type: string } & Record<string, unknown>>) {
const results = [];
for (const activity of activities) {
results.push(await sendContext.sendActivity(activity));
}
return results;
},
};
}
/**
* Update an existing activity via the Bot Framework REST API.
* PUT /v3/conversations/{conversationId}/activities/{activityId}
@ -168,76 +337,14 @@ export function createMSTeamsAdapter(app: MSTeamsApp, sdk: MSTeamsTeamsSdk): MST
throw new Error("Missing conversation.id in conversation reference");
}
// Fetch a fresh token for each call via a token factory.
// The SDK's App manages token caching/refresh internally.
const getToken = async () => {
const token = await (
app as unknown as { getBotToken(): Promise<{ toString(): string } | null> }
).getBotToken();
return token ? String(token) : undefined;
};
// Build a send context that uses the Bot Framework REST API.
// Pass a token factory (not a cached value) so each request gets a fresh token.
const apiClient = new sdk.Client(serviceUrl, {
token: async () => (await getToken()) || undefined,
headers: { "User-Agent": buildUserAgent() },
} as Record<string, unknown>);
const sendContext = {
async sendActivity(textOrActivity: string | object): Promise<unknown> {
const activity =
typeof textOrActivity === "string"
? ({ type: "message", text: textOrActivity } as Record<string, unknown>)
: (textOrActivity as Record<string, unknown>);
const response = await apiClient.conversations.activities(conversationId).create({
type: "message",
...activity,
from: reference.agent
? { id: reference.agent.id, name: reference.agent.name ?? "", role: "bot" }
: undefined,
conversation: {
id: conversationId,
conversationType: reference.conversation?.conversationType ?? "personal",
},
} as Parameters<
typeof apiClient.conversations.activities extends (id: string) => {
create: (a: infer T) => unknown;
}
? never
: never
>[0]);
return response;
},
async updateActivity(activityUpdate: object): Promise<{ id?: string } | void> {
const nextActivity = activityUpdate as { id?: string } & Record<string, unknown>;
const activityId = nextActivity.id;
if (!activityId) {
throw new Error("updateActivity requires an activity id");
}
// Bot Framework REST API: PUT /v3/conversations/{conversationId}/activities/{activityId}
return await updateActivityViaRest({
serviceUrl,
conversationId,
activityId,
activity: nextActivity,
token: await getToken(),
});
},
async deleteActivity(activityId: string): Promise<void> {
if (!activityId) {
throw new Error("deleteActivity requires an activity id");
}
await deleteActivityViaRest({
serviceUrl,
conversationId,
activityId,
token: await getToken(),
});
},
};
const sendContext = createSendContext({
sdk,
serviceUrl,
conversationId,
conversationType: reference.conversation?.conversationType,
bot: reference.agent ?? undefined,
getToken: createBotTokenGetter(app),
});
await logic(sendContext);
},
@ -252,105 +359,11 @@ export function createMSTeamsAdapter(app: MSTeamsApp, sdk: MSTeamsTeamsSdk): MST
const isInvoke = (activity as Record<string, unknown>)?.type === "invoke";
try {
const serviceUrl = activity?.serviceUrl as string | undefined;
// Token factory — fetches a fresh token for each API call.
const getToken = async () => {
const token = await (
app as unknown as { getBotToken(): Promise<{ toString(): string } | null> }
).getBotToken();
return token ? String(token) : undefined;
};
const context = {
const context = createProcessContext({
sdk,
activity,
async sendActivity(textOrActivity: string | object): Promise<unknown> {
const msg =
typeof textOrActivity === "string"
? ({ type: "message", text: textOrActivity } as Record<string, unknown>)
: (textOrActivity as Record<string, unknown>);
// invokeResponse is handled by the HTTP response from process(),
// not by posting a new activity to Bot Framework.
if (msg.type === "invokeResponse") {
return { id: "invokeResponse" };
}
if (!serviceUrl) {
return { id: "unknown" };
}
const convId = (activity?.conversation as Record<string, unknown>)?.id as
| string
| undefined;
if (!convId) {
return { id: "unknown" };
}
const apiClient = new sdk.Client(serviceUrl, {
token: async () => (await getToken()) || undefined,
headers: { "User-Agent": buildUserAgent() },
} as Record<string, unknown>);
const botId = (activity?.recipient as Record<string, unknown>)?.id as
| string
| undefined;
const botName = (activity?.recipient as Record<string, unknown>)?.name as
| string
| undefined;
const convType = (activity?.conversation as Record<string, unknown>)
?.conversationType as string | undefined;
// Preserve replyToId for threaded replies (replyStyle: "thread")
const inboundActivityId = (activity as Record<string, unknown>)?.id as
| string
| undefined;
return await apiClient.conversations.activities(convId).create({
type: "message",
...msg,
from: botId ? { id: botId, name: botName ?? "", role: "bot" } : undefined,
conversation: { id: convId, conversationType: convType ?? "personal" },
...(inboundActivityId && !msg.replyToId ? { replyToId: inboundActivityId } : {}),
} as Parameters<
typeof apiClient.conversations.activities extends (id: string) => {
create: (a: infer T) => unknown;
}
? never
: never
>[0]);
},
async sendActivities(
activities: Array<{ type: string } & Record<string, unknown>>,
): Promise<unknown> {
const results = [];
for (const act of activities) {
results.push(await context.sendActivity(act));
}
return results;
},
async updateActivity(
activityUpdate: { id: string } & Record<string, unknown>,
): Promise<unknown> {
const activityId = activityUpdate.id;
if (!activityId || !serviceUrl) {
return { id: "unknown" };
}
const convId = (activity?.conversation as Record<string, unknown>)?.id as
| string
| undefined;
if (!convId) {
return { id: "unknown" };
}
return await updateActivityViaRest({
serviceUrl,
conversationId: convId,
activityId,
activity: activityUpdate,
token: await getToken(),
});
},
};
getToken: createBotTokenGetter(app),
});
// For invoke activities, send HTTP 200 immediately before running
// handler logic so slow operations (file uploads, reflections) don't