diff --git a/CHANGELOG.md b/CHANGELOG.md index a2e976281e1..366c5af34e2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -58,6 +58,7 @@ Docs: https://docs.openclaw.ai - Telegram/topics: auto-rename DM forum topics on first message with LLM-generated labels, with per-account and per-DM `autoTopicLabel` overrides. (#51502) Thanks @Lukavyi. - Docs/plugins: add the community wecom plugin listing to the docs catalog. (#29905) Thanks @sliverp. - Models/GitHub Copilot: allow forward-compat dynamic model ids without code updates, while preserving configured provider and per-model overrides for those synthetic models. (#51325) Thanks @fuller-stack-dev. +- Agents/compaction: notify users when followup auto-compaction starts and finishes, keeping those notices out of TTS and preserving reply threading for the real assistant reply. (#38805) Thanks @zidongdesign. ### Fixes diff --git a/src/auto-reply/reply/agent-runner-execution.ts b/src/auto-reply/reply/agent-runner-execution.ts index c25342e4a28..7e6a4cfa6bc 100644 --- a/src/auto-reply/reply/agent-runner-execution.ts +++ b/src/auto-reply/reply/agent-runner-execution.ts @@ -199,6 +199,23 @@ export async function runAgentTurnWithFallback(params: { return text; }; const blockReplyPipeline = params.blockReplyPipeline; + // Build the delivery handler once so both onAgentEvent (compaction start + // notice) and the onBlockReply field share the same instance. This + // ensures replyToId threading (replyToMode=all|first) is applied to + // compaction notices just like every other block reply. + const blockReplyHandler = params.opts?.onBlockReply + ? createBlockReplyDeliveryHandler({ + onBlockReply: params.opts.onBlockReply, + currentMessageId: params.sessionCtx.MessageSidFull ?? params.sessionCtx.MessageSid, + normalizeStreamingText, + applyReplyToMode: params.applyReplyToMode, + normalizeMediaPaths: normalizeReplyMediaPaths, + typingSignals: params.typingSignals, + blockStreamingEnabled: params.blockStreamingEnabled, + blockReplyPipeline, + directlySentBlockKeys, + }) + : undefined; const onToolResult = params.opts?.onToolResult; const fallbackResult = await runWithModelFallback({ ...resolveModelFallbackOptions(params.followupRun.run), @@ -394,11 +411,34 @@ export async function runAgentTurnWithFallback(params: { await params.opts?.onToolStart?.({ name, phase }); } } - // Track auto-compaction completion and notify UI layer. + // Track auto-compaction and notify higher layers. if (evt.stream === "compaction") { const phase = typeof evt.data.phase === "string" ? evt.data.phase : ""; if (phase === "start") { - await params.opts?.onCompactionStart?.(); + if (params.opts?.onCompactionStart) { + await params.opts.onCompactionStart(); + } else if (params.opts?.onBlockReply) { + // Send directly via opts.onBlockReply (bypassing the + // pipeline) so the notice does not cause final payloads + // to be discarded on non-streaming model paths. + const currentMessageId = + params.sessionCtx.MessageSidFull ?? params.sessionCtx.MessageSid; + const noticePayload = params.applyReplyToMode({ + text: "๐Ÿงน Compacting context...", + replyToId: currentMessageId, + replyToCurrent: true, + isCompactionNotice: true, + }); + try { + await params.opts.onBlockReply(noticePayload); + } catch (err) { + // Non-critical notice delivery failure should not + // bubble out of the fire-and-forget event handler. + logVerbose( + `compaction start notice delivery failed (non-fatal): ${String(err)}`, + ); + } + } } const completed = evt.data?.completed === true; if (phase === "end" && completed) { @@ -410,20 +450,7 @@ export async function runAgentTurnWithFallback(params: { // Always pass onBlockReply so flushBlockReplyBuffer works before tool execution, // even when regular block streaming is disabled. The handler sends directly // via opts.onBlockReply when the pipeline isn't available. - onBlockReply: params.opts?.onBlockReply - ? createBlockReplyDeliveryHandler({ - onBlockReply: params.opts.onBlockReply, - currentMessageId: - params.sessionCtx.MessageSidFull ?? params.sessionCtx.MessageSid, - normalizeStreamingText, - applyReplyToMode: params.applyReplyToMode, - normalizeMediaPaths: normalizeReplyMediaPaths, - typingSignals: params.typingSignals, - blockStreamingEnabled: params.blockStreamingEnabled, - blockReplyPipeline, - directlySentBlockKeys, - }) - : undefined, + onBlockReply: blockReplyHandler, onBlockReplyFlush: params.blockStreamingEnabled && blockReplyPipeline ? async () => { diff --git a/src/auto-reply/reply/agent-runner.ts b/src/auto-reply/reply/agent-runner.ts index fbdad1be160..7499657d6d4 100644 --- a/src/auto-reply/reply/agent-runner.ts +++ b/src/auto-reply/reply/agent-runner.ts @@ -418,6 +418,11 @@ export async function runReplyAgent(params: { await blockReplyPipeline.flush({ force: true }); blockReplyPipeline.stop(); } + + // NOTE: The compaction completion notice for block-streaming mode is sent + // further below โ€” after incrementRunCompactionCount โ€” so it can include + // the `(count N)` suffix. Sending it here (before the count is known) + // would omit that information. if (pendingToolTasks.size > 0) { await Promise.allSettled(pendingToolTasks); } @@ -697,9 +702,48 @@ export async function runReplyAgent(params: { }); } - if (verboseEnabled) { - const suffix = typeof count === "number" ? ` (count ${count})` : ""; - verboseNotices.push({ text: `๐Ÿงน Auto-compaction complete${suffix}.` }); + // Always notify the user when compaction completes โ€” not just in verbose + // mode. The "๐Ÿงน Compacting context..." notice was already sent at start, + // so the completion message closes the loop for every user regardless of + // their verbose setting. + const suffix = typeof count === "number" ? ` (count ${count})` : ""; + const completionText = verboseEnabled + ? `๐Ÿงน Auto-compaction complete${suffix}.` + : `โœ… Context compacted${suffix}.`; + + if (blockReplyPipeline && opts?.onBlockReply) { + // In block-streaming mode, send the completion notice via + // fire-and-forget *after* the pipeline has flushed (so it does not set + // didStream()=true and cause buildReplyPayloads to discard the real + // assistant reply). Now that the count is known we can include it. + const currentMessageId = sessionCtx.MessageSidFull ?? sessionCtx.MessageSid; + const noticePayload = applyReplyToMode({ + text: completionText, + replyToId: currentMessageId, + replyToCurrent: true, + isCompactionNotice: true, + }); + void Promise.race([ + opts.onBlockReply(noticePayload), + new Promise((_, reject) => + setTimeout(() => reject(new Error("compaction notice timeout")), blockReplyTimeoutMs), + ), + ]).catch(() => { + // Intentionally swallowed โ€” the notice is informational only. + }); + } else { + // Non-streaming: push into verboseNotices with full compaction metadata + // so threading exemptions apply and replyToMode=first does not thread + // the notice instead of the real assistant reply. + const currentMessageId = sessionCtx.MessageSidFull ?? sessionCtx.MessageSid; + verboseNotices.push( + applyReplyToMode({ + text: completionText, + replyToId: currentMessageId, + replyToCurrent: true, + isCompactionNotice: true, + }), + ); } } if (verboseNotices.length > 0) { diff --git a/src/auto-reply/reply/dispatch-from-config.ts b/src/auto-reply/reply/dispatch-from-config.ts index 9df6ef2bc63..9f603b30863 100644 --- a/src/auto-reply/reply/dispatch-from-config.ts +++ b/src/auto-reply/reply/dispatch-from-config.ts @@ -582,8 +582,10 @@ export async function dispatchReplyFromConfig(params: { if (shouldSuppressReasoningPayload(payload)) { return; } - // Accumulate block text for TTS generation after streaming - if (payload.text) { + // Accumulate block text for TTS generation after streaming. + // Exclude compaction status notices โ€” they are informational UI + // signals and must not be synthesised into the spoken reply. + if (payload.text && !payload.isCompactionNotice) { if (accumulatedBlockText.length > 0) { accumulatedBlockText += "\n"; } diff --git a/src/auto-reply/reply/followup-runner.test.ts b/src/auto-reply/reply/followup-runner.test.ts index 0e93ab156a8..33fd8b9fb89 100644 --- a/src/auto-reply/reply/followup-runner.test.ts +++ b/src/auto-reply/reply/followup-runner.test.ts @@ -70,6 +70,10 @@ function mockCompactionRun(params: { async (args: { onAgentEvent?: (evt: { stream: string; data: Record }) => void; }) => { + args.onAgentEvent?.({ + stream: "compaction", + data: { phase: "start" }, + }); args.onAgentEvent?.({ stream: "compaction", data: { phase: "end", willRetry: params.willRetry, completed: true }, @@ -84,7 +88,7 @@ function createAsyncReplySpy() { } describe("createFollowupRunner compaction", () => { - it("adds verbose auto-compaction notice and tracks count", async () => { + it("adds compaction notices and tracks count in verbose mode", async () => { const storePath = path.join( await fs.mkdtemp(path.join(tmpdir(), "openclaw-compaction-")), "sessions.json", @@ -122,9 +126,15 @@ describe("createFollowupRunner compaction", () => { await runner(queued); - expect(onBlockReply).toHaveBeenCalled(); - const firstCall = (onBlockReply.mock.calls as unknown as Array>)[0]; - expect(firstCall?.[0]?.text).toContain("Auto-compaction complete"); + expect(onBlockReply).toHaveBeenCalledTimes(3); + const calls = onBlockReply.mock.calls as unknown as Array< + Array<{ text?: string; isCompactionNotice?: boolean }> + >; + expect(calls[0]?.[0]?.text).toBe("๐Ÿงน Compacting context..."); + expect(calls[0]?.[0]?.isCompactionNotice).toBe(true); + expect(calls[1]?.[0]?.text).toContain("Auto-compaction complete"); + expect(calls[1]?.[0]?.isCompactionNotice).toBe(true); + expect(calls[2]?.[0]?.text).toBe("final"); expect(sessionStore.main.compactionCount).toBe(1); }); @@ -171,12 +181,84 @@ describe("createFollowupRunner compaction", () => { await runner(queued); - expect(onBlockReply).toHaveBeenCalled(); - const firstCall = (onBlockReply.mock.calls as unknown as Array>)[0]; - expect(firstCall?.[0]?.text).toContain("Auto-compaction complete"); + expect(onBlockReply).toHaveBeenCalledTimes(2); + const calls = onBlockReply.mock.calls as unknown as Array< + Array<{ text?: string; isCompactionNotice?: boolean }> + >; + expect(calls[0]?.[0]?.text).toContain("Auto-compaction complete"); + expect(calls[0]?.[0]?.isCompactionNotice).toBe(true); + expect(calls[1]?.[0]?.text).toBe("final"); expect(sessionStore.main.compactionCount).toBe(2); }); + it("threads followup compaction notices without consuming the first reply slot", async () => { + const storePath = path.join( + await fs.mkdtemp(path.join(tmpdir(), "openclaw-compaction-threading-")), + "sessions.json", + ); + const sessionEntry: SessionEntry = { + sessionId: "session", + updatedAt: Date.now(), + }; + const sessionStore: Record = { + main: sessionEntry, + }; + const onBlockReply = vi.fn(async () => {}); + + mockCompactionRun({ + willRetry: true, + result: { payloads: [{ text: "final" }], meta: {} }, + }); + + const runner = createFollowupRunner({ + opts: { onBlockReply }, + typing: createMockTypingController(), + typingMode: "instant", + sessionEntry, + sessionStore, + sessionKey: "main", + storePath, + defaultModel: "anthropic/claude-opus-4-5", + }); + + const queued = createQueuedRun({ + messageId: "msg-42", + run: { + messageProvider: "discord", + config: { + channels: { + discord: { + replyToMode: "first", + }, + }, + }, + verboseLevel: "off", + }, + }); + + await runner(queued); + + expect(onBlockReply).toHaveBeenCalledTimes(3); + const calls = onBlockReply.mock.calls as unknown as Array< + Array<{ text?: string; replyToId?: string; isCompactionNotice?: boolean }> + >; + expect(calls[0]?.[0]).toMatchObject({ + text: "๐Ÿงน Compacting context...", + replyToId: "msg-42", + isCompactionNotice: true, + }); + expect(calls[1]?.[0]).toMatchObject({ + text: "โœ… Context compacted (count 1).", + replyToId: "msg-42", + isCompactionNotice: true, + }); + expect(calls[2]?.[0]).toMatchObject({ + text: "final", + replyToId: "msg-42", + }); + expect(calls[2]?.[0]?.isCompactionNotice).toBeUndefined(); + }); + it("does not count failed compaction end events in followup runs", async () => { const storePath = path.join( await fs.mkdtemp(path.join(tmpdir(), "openclaw-compaction-failed-")), diff --git a/src/auto-reply/reply/followup-runner.ts b/src/auto-reply/reply/followup-runner.ts index 2fd21607095..ee69187d9df 100644 --- a/src/auto-reply/reply/followup-runner.ts +++ b/src/auto-reply/reply/followup-runner.ts @@ -148,6 +148,43 @@ export function createFollowupRunner(params: { isControlUiVisible: shouldSurfaceToControlUi, }); } + const replyToChannel = resolveOriginMessageProvider({ + originatingChannel: queued.originatingChannel, + provider: queued.run.messageProvider, + }) as OriginatingChannelType | undefined; + const replyToMode = resolveReplyToMode( + queued.run.config, + replyToChannel, + queued.originatingAccountId, + queued.originatingChatType, + ); + const currentMessageId = queued.messageId?.trim() || undefined; + const applyFollowupReplyThreading = (payloads: ReplyPayload[]) => + applyReplyThreading({ + payloads, + replyToMode, + replyToChannel, + currentMessageId, + }); + const sendCompactionNotice = async (text: string) => { + const noticePayloads = applyFollowupReplyThreading([ + { + text, + replyToCurrent: true, + isCompactionNotice: true, + }, + ]); + if (noticePayloads.length === 0) { + return; + } + try { + await sendFollowupPayloads(noticePayloads, queued); + } catch (err) { + logVerbose( + `followup queue: compaction notice delivery failed (non-fatal): ${String(err)}`, + ); + } + }; let autoCompactionCount = 0; let runResult: Awaited>; let fallbackProvider = queued.run.provider; @@ -229,6 +266,9 @@ export function createFollowupRunner(params: { return; } const phase = typeof evt.data.phase === "string" ? evt.data.phase : ""; + if (phase === "start") { + void sendCompactionNotice("๐Ÿงน Compacting context..."); + } const completed = evt.data?.completed === true; if (phase === "end" && completed) { attemptCompactionCount += 1; @@ -284,9 +324,6 @@ export function createFollowupRunner(params: { } const payloadArray = runResult.payloads ?? []; - if (payloadArray.length === 0) { - return; - } const sanitizedPayloads = payloadArray.flatMap((payload) => { const text = payload.text; if (!text || !text.includes("HEARTBEAT_OK")) { @@ -299,22 +336,7 @@ export function createFollowupRunner(params: { } return [{ ...payload, text: stripped.text }]; }); - const replyToChannel = resolveOriginMessageProvider({ - originatingChannel: queued.originatingChannel, - provider: queued.run.messageProvider, - }) as OriginatingChannelType | undefined; - const replyToMode = resolveReplyToMode( - queued.run.config, - replyToChannel, - queued.originatingAccountId, - queued.originatingChatType, - ); - - const replyTaggedPayloads: ReplyPayload[] = applyReplyThreading({ - payloads: sanitizedPayloads, - replyToMode, - replyToChannel, - }); + const replyTaggedPayloads = applyFollowupReplyThreading(sanitizedPayloads); const dedupedPayloads = filterMessagingToolDuplicates({ payloads: replyTaggedPayloads, @@ -338,11 +360,7 @@ export function createFollowupRunner(params: { accountId: queued.run.agentAccountId, }), }); - const finalPayloads = suppressMessagingToolReplies ? [] : mediaFilteredPayloads; - - if (finalPayloads.length === 0) { - return; - } + let finalPayloads = suppressMessagingToolReplies ? [] : mediaFilteredPayloads; if (autoCompactionCount > 0) { const count = await incrementRunCompactionCount({ @@ -354,12 +372,25 @@ export function createFollowupRunner(params: { lastCallUsage: runResult.meta?.agentMeta?.lastCallUsage, contextTokensUsed, }); - if (queued.run.verboseLevel && queued.run.verboseLevel !== "off") { - const suffix = typeof count === "number" ? ` (count ${count})` : ""; - finalPayloads.unshift({ - text: `๐Ÿงน Auto-compaction complete${suffix}.`, - }); - } + const suffix = typeof count === "number" ? ` (count ${count})` : ""; + const completionText = + queued.run.verboseLevel && queued.run.verboseLevel !== "off" + ? `๐Ÿงน Auto-compaction complete${suffix}.` + : `โœ… Context compacted${suffix}.`; + finalPayloads = [ + ...applyFollowupReplyThreading([ + { + text: completionText, + replyToCurrent: true, + isCompactionNotice: true, + }, + ]), + ...finalPayloads, + ]; + } + + if (finalPayloads.length === 0) { + return; } await sendFollowupPayloads(finalPayloads, queued); diff --git a/src/auto-reply/reply/reply-threading.ts b/src/auto-reply/reply/reply-threading.ts index 66871f226b7..5c0e1e423bc 100644 --- a/src/auto-reply/reply/reply-threading.ts +++ b/src/auto-reply/reply/reply-threading.ts @@ -33,7 +33,12 @@ export function createReplyToModeFilter( } if (mode === "off") { const isExplicit = Boolean(payload.replyToTag) || Boolean(payload.replyToCurrent); - if (opts.allowExplicitReplyTagsWhenOff && isExplicit) { + // Compaction notices must never be threaded when replyToMode=off โ€” even + // if they carry explicit reply tags (replyToCurrent). Honouring the + // explicit tag here would make status notices appear in-thread while + // normal assistant replies stay off-thread, contradicting the off-mode + // expectation. Strip replyToId unconditionally for compaction payloads. + if (opts.allowExplicitReplyTagsWhenOff && isExplicit && !payload.isCompactionNotice) { return payload; } return { ...payload, replyToId: undefined }; @@ -42,9 +47,21 @@ export function createReplyToModeFilter( return payload; } if (hasThreaded) { + // Compaction notices are transient status messages that should always + // appear in-thread, even after the first assistant block has already + // consumed the "first" slot. Let them keep their replyToId. + if (payload.isCompactionNotice) { + return payload; + } return { ...payload, replyToId: undefined }; } - hasThreaded = true; + // Compaction notices are transient status messages โ€” they should be + // threaded (so they appear in-context), but they must not consume the + // "first" slot of the replyToMode=first filter. Skip advancing + // hasThreaded so the real assistant reply still gets replyToId. + if (!payload.isCompactionNotice) { + hasThreaded = true; + } return payload; }; } diff --git a/src/auto-reply/types.ts b/src/auto-reply/types.ts index c424f43ab92..638dda42d8f 100644 --- a/src/auto-reply/types.ts +++ b/src/auto-reply/types.ts @@ -91,6 +91,10 @@ export type ReplyPayload = { /** Marks this payload as a reasoning/thinking block. Channels that do not * have a dedicated reasoning lane (e.g. WhatsApp, web) should suppress it. */ isReasoning?: boolean; + /** Marks this payload as a compaction status notice (start/end). + * Should be excluded from TTS transcript accumulation so compaction + * status lines are not synthesised into the spoken assistant reply. */ + isCompactionNotice?: boolean; /** Channel-specific payload data (per-channel envelope). */ channelData?: Record; }; diff --git a/src/tts/tts.ts b/src/tts/tts.ts index 17a7c2fc981..348ff94be91 100644 --- a/src/tts/tts.ts +++ b/src/tts/tts.ts @@ -825,6 +825,10 @@ export async function maybeApplyTtsToPayload(params: { inboundAudio?: boolean; ttsAuto?: string; }): Promise { + // Compaction notices are informational UI signals โ€” never synthesise them as speech. + if (params.payload.isCompactionNotice) { + return params.payload; + } const config = resolveTtsConfig(params.cfg); const prefsPath = resolveTtsPrefsPath(config); const autoMode = resolveTtsAutoMode({