From 8a5ad5e320beb0b5d50fb8ab8a07ee246a754b01 Mon Sep 17 00:00:00 2001 From: Joey Krug Date: Sat, 14 Mar 2026 21:54:35 -0400 Subject: [PATCH] Reply: preserve deferred queued media context --- src/auto-reply/reply/followup-media.ts | 241 +++++++++++++++++++ src/auto-reply/reply/followup-runner.test.ts | 225 ++++++++++++++++- src/auto-reply/reply/followup-runner.ts | 180 +------------- src/auto-reply/reply/get-reply-run.ts | 2 + src/auto-reply/reply/queue/drain.ts | 71 +++++- src/auto-reply/reply/queue/enqueue.ts | 37 ++- src/auto-reply/reply/queue/state.ts | 3 + src/auto-reply/reply/queue/types.ts | 3 + 8 files changed, 572 insertions(+), 190 deletions(-) create mode 100644 src/auto-reply/reply/followup-media.ts diff --git a/src/auto-reply/reply/followup-media.ts b/src/auto-reply/reply/followup-media.ts new file mode 100644 index 00000000000..5a014e63f9b --- /dev/null +++ b/src/auto-reply/reply/followup-media.ts @@ -0,0 +1,241 @@ +import { logVerbose } from "../../globals.js"; +import { applyMediaUnderstanding } from "../../media-understanding/apply.js"; +import { + normalizeAttachments, + resolveAttachmentKind, +} from "../../media-understanding/attachments.js"; +import { buildInboundMediaNote } from "../media-note.js"; +import type { MsgContext } from "../templating.js"; +import { parseInlineDirectives } from "./directive-handling.js"; +import type { FollowupMediaContext, FollowupRun } from "./queue/types.js"; + +const MEDIA_ONLY_PLACEHOLDER = "[User sent media without caption]"; +const MEDIA_REPLY_HINT_PREFIX = "To send an image back, prefer the message tool"; +const LEADING_MEDIA_ATTACHED_LINE_RE = /^\[media attached(?: \d+\/\d+)?: [^\r\n]*\]$/; +const FILE_BLOCK_RE = / Boolean(value) && list.indexOf(value) === index, + ); + + let rebuilt = stripped; + for (const target of replacementTargets) { + const replaced = replaceLastOccurrence(rebuilt, target, updatedBody); + if (replaced !== undefined) { + rebuilt = replaced; + return [params.mediaNote?.trim(), rebuilt.trim()].filter(Boolean).join("\n").trim(); + } + } + + rebuilt = [rebuilt, updatedBody].filter(Boolean).join("\n\n"); + return [params.mediaNote?.trim(), rebuilt.trim()].filter(Boolean).join("\n").trim(); +} + +function hasMediaAttachments(mediaContext: FollowupMediaContext): boolean { + return Boolean( + mediaContext.MediaPath?.trim() || + mediaContext.MediaUrl?.trim() || + (Array.isArray(mediaContext.MediaPaths) && mediaContext.MediaPaths.length > 0) || + (Array.isArray(mediaContext.MediaUrls) && mediaContext.MediaUrls.length > 0), + ); +} + +function hasOnlyFileLikeAttachments(mediaContext: FollowupMediaContext): boolean { + const attachments = normalizeAttachments(mediaContext as MsgContext); + return ( + attachments.length > 0 && + attachments.every((attachment) => { + const kind = resolveAttachmentKind(attachment); + return kind !== "audio" && kind !== "image" && kind !== "video"; + }) + ); +} + +function snapshotUpdatedMediaContext(params: { + original: FollowupMediaContext; + mediaCtx: MsgContext; + updatedBody?: string; +}): FollowupMediaContext { + return { + ...params.original, + Body: params.updatedBody ?? params.original.Body, + Transcript: + typeof params.mediaCtx.Transcript === "string" + ? params.mediaCtx.Transcript + : params.original.Transcript, + MediaUnderstanding: Array.isArray(params.mediaCtx.MediaUnderstanding) + ? [...params.mediaCtx.MediaUnderstanding] + : params.original.MediaUnderstanding, + MediaUnderstandingDecisions: Array.isArray(params.mediaCtx.MediaUnderstandingDecisions) + ? 
[...params.mediaCtx.MediaUnderstandingDecisions]
+      : params.original.MediaUnderstandingDecisions,
+    DeferredMediaApplied: true,
+  };
+}
+
+export async function applyDeferredMediaUnderstandingToQueuedRun(
+  queued: FollowupRun,
+  params: { logLabel?: string } = {},
+): Promise<void> {
+  const mediaContext = queued.mediaContext;
+  if (!mediaContext || mediaContext.DeferredMediaApplied) {
+    return;
+  }
+  if (mediaContext.MediaUnderstanding?.length) {
+    mediaContext.DeferredMediaApplied = true;
+    return;
+  }
+  if (!hasMediaAttachments(mediaContext)) {
+    mediaContext.DeferredMediaApplied = true;
+    return;
+  }
+
+  const resolvedOriginalBody =
+    mediaContext.CommandBody ?? mediaContext.RawBody ?? mediaContext.Body;
+  const bodyAlreadyHasFileBlock =
+    FILE_BLOCK_RE.test(resolvedOriginalBody ?? "") || FILE_BLOCK_RE.test(mediaContext.Body ?? "");
+
+  if (bodyAlreadyHasFileBlock && hasOnlyFileLikeAttachments(mediaContext)) {
+    mediaContext.DeferredMediaApplied = true;
+    return;
+  }
+
+  try {
+    const mediaCtx = {
+      ...mediaContext,
+      Body: resolvedOriginalBody,
+      Provider:
+        mediaContext.Provider ??
+        queued.run.messageProvider ??
+        (typeof mediaContext.OriginatingChannel === "string"
+          ? mediaContext.OriginatingChannel
+          : undefined),
+      Surface: mediaContext.Surface,
+    } as MsgContext;
+
+    const muResult = await applyMediaUnderstanding({
+      ctx: mediaCtx,
+      cfg: queued.run.config,
+      agentDir: queued.run.agentDir,
+      activeModel: {
+        provider: queued.run.provider,
+        model: queued.run.model,
+      },
+    });
+
+    const shouldRebuildPrompt =
+      muResult.outputs.length > 0 ||
+      muResult.appliedAudio ||
+      muResult.appliedImage ||
+      muResult.appliedVideo ||
+      (muResult.appliedFile && !bodyAlreadyHasFileBlock);
+
+    if (shouldRebuildPrompt) {
+      const newMediaNote = buildInboundMediaNote(mediaCtx);
+      queued.prompt = rebuildQueuedPromptWithMediaUnderstanding({
+        prompt: queued.prompt,
+        originalBody: resolvedOriginalBody,
+        updatedBody: mediaCtx.Body,
+        mediaNote: newMediaNote,
+      });
+      logVerbose(
+        `${params.logLabel ?? "followup"}: applied media understanding (audio=${muResult.appliedAudio}, image=${muResult.appliedImage}, video=${muResult.appliedVideo}, file=${muResult.appliedFile})`,
+      );
+    }
+
+    queued.mediaContext = snapshotUpdatedMediaContext({
+      original: mediaContext,
+      mediaCtx,
+      updatedBody: shouldRebuildPrompt ? mediaCtx.Body : undefined,
+    });
+  } catch (err) {
+    logVerbose(
+      `${params.logLabel ?? "followup"}: media understanding failed, proceeding with raw content: ${err instanceof Error ?
err.message : String(err)}`, + ); + } +} diff --git a/src/auto-reply/reply/followup-runner.test.ts b/src/auto-reply/reply/followup-runner.test.ts index a8e31f998de..ea7448b74e5 100644 --- a/src/auto-reply/reply/followup-runner.test.ts +++ b/src/auto-reply/reply/followup-runner.test.ts @@ -3,7 +3,8 @@ import { tmpdir } from "node:os"; import path from "node:path"; import { beforeEach, describe, expect, it, vi } from "vitest"; import { loadSessionStore, type SessionEntry, saveSessionStore } from "../../config/sessions.js"; -import type { FollowupRun } from "./queue.js"; +import { buildCollectPrompt } from "../../utils/queue-helpers.js"; +import type { FollowupRun } from "./queue/types.js"; import { createMockFollowupRun, createMockTypingController } from "./test-helpers.js"; const runEmbeddedPiAgentMock = vi.fn(); @@ -34,6 +35,10 @@ vi.mock("./route-reply.js", async (importOriginal) => { }); import { createFollowupRunner } from "./followup-runner.js"; +import { + applyDeferredMediaToQueuedRuns, + buildMediaAwareQueueSummaryPrompt, +} from "./queue/drain.js"; const ROUTABLE_TEST_CHANNELS = new Set([ "telegram", @@ -757,6 +762,69 @@ describe("createFollowupRunner media understanding", () => { expect(onBlockReply).toHaveBeenCalledWith(expect.objectContaining({ text: "Got it!" })); }); + it("propagates the queued message provider into deferred media context", async () => { + const transcriptText = "Provider-aware transcript"; + applyMediaUnderstandingMock.mockImplementationOnce( + async (params: { ctx: Record }) => { + expect(params.ctx.Provider).toBe("telegram"); + params.ctx.MediaUnderstanding = [ + { + kind: "audio.transcription", + text: transcriptText, + attachmentIndex: 0, + provider: "whisper", + }, + ]; + params.ctx.Transcript = transcriptText; + params.ctx.Body = `[Audio]\nTranscript:\n${transcriptText}`; + return { + outputs: [ + { + kind: "audio.transcription", + text: transcriptText, + attachmentIndex: 0, + provider: "whisper", + }, + ], + decisions: [], + appliedImage: false, + appliedAudio: true, + appliedVideo: false, + appliedFile: false, + }; + }, + ); + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "done" }], + meta: {}, + }); + + const runner = createFollowupRunner({ + opts: { onBlockReply: vi.fn(async () => {}) }, + typing: createMockTypingController(), + typingMode: "instant", + defaultModel: "anthropic/claude-opus-4-5", + }); + + await runner( + createQueuedRun({ + prompt: "[User sent media without caption]", + run: { messageProvider: "telegram" }, + mediaContext: { + Body: "", + MediaPaths: ["/tmp/voice.ogg"], + MediaTypes: ["audio/ogg"], + }, + }), + ); + + expect(applyMediaUnderstandingMock).toHaveBeenCalledTimes(1); + const agentCall = runEmbeddedPiAgentMock.mock.calls.at(-1)?.[0] as { + prompt?: string; + }; + expect(agentCall?.prompt).toContain(transcriptText); + }); + it("applies media understanding for URL-only attachments", async () => { const transcriptText = "URL-only transcript"; applyMediaUnderstandingMock.mockImplementationOnce( @@ -1678,4 +1746,159 @@ describe("createFollowupRunner media understanding", () => { const matches = agentCall?.prompt?.match(/summarize this/g); expect(matches?.length).toBe(1); }); + + it("does not re-apply file extraction when the stored media body already has a file block", async () => { + const fileBlock = '\nreport content\n'; + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "processed" }], + meta: {}, + }); + + const runner = createFollowupRunner({ + opts: { onBlockReply: vi.fn(async () 
=> {}) }, + typing: createMockTypingController(), + typingMode: "instant", + defaultModel: "anthropic/claude-opus-4-5", + }); + + await runner( + createQueuedRun({ + prompt: `[media attached: /tmp/report.pdf]\n${MEDIA_REPLY_HINT}\nsummarize this\n\n${fileBlock}`, + mediaContext: { + Body: `summarize this\n\n${fileBlock}`, + CommandBody: "summarize this", + RawBody: "summarize this", + MediaPaths: ["/tmp/report.pdf"], + MediaTypes: ["application/pdf"], + }, + }), + ); + + expect(applyMediaUnderstandingMock).not.toHaveBeenCalled(); + const agentCall = runEmbeddedPiAgentMock.mock.calls.at(-1)?.[0] as { + prompt?: string; + }; + expect(agentCall?.prompt?.match(/ { + it("preprocesses collect batches before synthesizing the followup prompt", async () => { + applyMediaUnderstandingMock.mockImplementationOnce( + async (params: { ctx: Record }) => { + params.ctx.MediaUnderstanding = [ + { + kind: "audio.transcription", + text: "collect transcript", + attachmentIndex: 0, + provider: "whisper", + }, + ]; + params.ctx.Transcript = "collect transcript"; + params.ctx.Body = "[Audio]\nTranscript:\ncollect transcript"; + return { + outputs: [ + { + kind: "audio.transcription", + text: "collect transcript", + attachmentIndex: 0, + provider: "whisper", + }, + ], + decisions: [], + appliedImage: false, + appliedAudio: true, + appliedVideo: false, + appliedFile: false, + }; + }, + ); + const items: FollowupRun[] = [ + createQueuedRun({ + prompt: "[media attached: /tmp/voice.ogg (audio/ogg)]\nsome text", + summaryLine: "some text", + originatingChannel: "telegram", + originatingTo: "chat:1", + run: { messageProvider: "telegram" }, + mediaContext: { + Body: "some text", + MediaPaths: ["/tmp/voice.ogg"], + MediaTypes: ["audio/ogg"], + }, + }), + createQueuedRun({ + prompt: "second text", + summaryLine: "second text", + originatingChannel: "telegram", + originatingTo: "chat:1", + run: { messageProvider: "telegram" }, + }), + ]; + + await applyDeferredMediaToQueuedRuns(items); + + const prompt = buildCollectPrompt({ + title: "[Queued messages while agent was busy]", + items, + renderItem: (item, idx) => `---\nQueued #${idx + 1}\n${item.prompt}`.trim(), + }); + + expect(prompt).toContain("collect transcript"); + expect(prompt).toContain("Queued #2\nsecond text"); + expect(prompt).not.toContain("[media attached: /tmp/voice.ogg"); + }); + + it("preprocesses dropped media items before building overflow summaries", async () => { + applyMediaUnderstandingMock.mockImplementationOnce( + async (params: { ctx: Record }) => { + params.ctx.MediaUnderstanding = [ + { + kind: "audio.transcription", + text: "overflow transcript", + attachmentIndex: 0, + provider: "whisper", + }, + ]; + params.ctx.Transcript = "overflow transcript"; + params.ctx.Body = "[Audio]\nTranscript:\noverflow transcript"; + return { + outputs: [ + { + kind: "audio.transcription", + text: "overflow transcript", + attachmentIndex: 0, + provider: "whisper", + }, + ], + decisions: [], + appliedImage: false, + appliedAudio: true, + appliedVideo: false, + appliedFile: false, + }; + }, + ); + const summaryPrompt = await buildMediaAwareQueueSummaryPrompt({ + dropPolicy: "summarize", + droppedCount: 1, + summaryLines: ["[media attached: /tmp/voice.ogg (audio/ogg)]"], + summaryItems: [ + createQueuedRun({ + prompt: "[media attached: /tmp/voice.ogg (audio/ogg)]", + summaryLine: "", + run: { messageProvider: "telegram" }, + mediaContext: { + Body: "", + MediaPaths: ["/tmp/voice.ogg"], + MediaTypes: ["audio/ogg"], + }, + }), + ], + noun: "message", + }); + + 
expect(summaryPrompt).toContain("[Queue overflow] Dropped 1 message due to cap."); + expect(summaryPrompt).toContain("overflow transcript"); + expect(summaryPrompt).not.toContain("[media attached: /tmp/voice.ogg"); + }); }); diff --git a/src/auto-reply/reply/followup-runner.ts b/src/auto-reply/reply/followup-runner.ts index d10d63d6389..50925cd1269 100644 --- a/src/auto-reply/reply/followup-runner.ts +++ b/src/auto-reply/reply/followup-runner.ts @@ -9,16 +9,14 @@ import type { SessionEntry } from "../../config/sessions.js"; import type { TypingMode } from "../../config/types.js"; import { logVerbose } from "../../globals.js"; import { registerAgentRunContext } from "../../infra/agent-events.js"; -import { applyMediaUnderstanding } from "../../media-understanding/apply.js"; import { defaultRuntime } from "../../runtime.js"; import { isInternalMessageChannel } from "../../utils/message-channel.js"; import { stripHeartbeatToken } from "../heartbeat.js"; -import { buildInboundMediaNote } from "../media-note.js"; -import type { MsgContext, OriginatingChannelType } from "../templating.js"; +import type { OriginatingChannelType } from "../templating.js"; import { isSilentReplyText, SILENT_REPLY_TOKEN } from "../tokens.js"; import type { GetReplyOptions, ReplyPayload } from "../types.js"; import { resolveRunAuthProfile } from "./agent-runner-utils.js"; -import { parseInlineDirectives } from "./directive-handling.js"; +import { applyDeferredMediaUnderstandingToQueuedRun } from "./followup-media.js"; import { resolveOriginAccountId, resolveOriginMessageProvider, @@ -37,113 +35,6 @@ import { incrementRunCompactionCount, persistRunSessionUsage } from "./session-r import { createTypingSignaler } from "./typing-mode.js"; import type { TypingController } from "./typing.js"; -const MEDIA_ONLY_PLACEHOLDER = "[User sent media without caption]"; -const MEDIA_REPLY_HINT_PREFIX = "To send an image back, prefer the message tool"; -const LEADING_MEDIA_ATTACHED_LINE_RE = /^\[media attached(?: \d+\/\d+)?: [^\r\n]*\]$/; -const FILE_BLOCK_RE = / Boolean(value) && list.indexOf(value) === index, - ); - - let rebuilt = stripped; - for (const target of replacementTargets) { - const replaced = replaceLastOccurrence(rebuilt, target, updatedBody); - if (replaced !== undefined) { - rebuilt = replaced; - return [params.mediaNote?.trim(), rebuilt.trim()].filter(Boolean).join("\n").trim(); - } - } - - rebuilt = [rebuilt, updatedBody].filter(Boolean).join("\n\n"); - return [params.mediaNote?.trim(), rebuilt.trim()].filter(Boolean).join("\n").trim(); -} - export function createFollowupRunner(params: { opts?: GetReplyOptions; typing: TypingController; @@ -264,72 +155,7 @@ export function createFollowupRunner(params: { let bootstrapPromptWarningSignaturesSeen = resolveBootstrapWarningSignaturesSeen( activeSessionEntry?.systemPromptReport, ); - - // Apply media understanding for followup-queued messages when it was - // not applied (or failed) in the primary path. This ensures voice - // notes that arrived while the agent was mid-turn still get transcribed. 
- if (queued.mediaContext && !queued.mediaContext.MediaUnderstanding?.length) { - const hasMedia = Boolean( - queued.mediaContext.MediaPath?.trim() || - queued.mediaContext.MediaUrl?.trim() || - (Array.isArray(queued.mediaContext.MediaPaths) && - queued.mediaContext.MediaPaths.length > 0) || - (Array.isArray(queued.mediaContext.MediaUrls) && - queued.mediaContext.MediaUrls.length > 0), - ); - if (hasMedia) { - try { - const resolvedOriginalBody = - queued.mediaContext.CommandBody ?? - queued.mediaContext.RawBody ?? - queued.mediaContext.Body; - const mediaCtx = { - ...queued.mediaContext, - Body: resolvedOriginalBody, - } as MsgContext; - const originalBody = resolvedOriginalBody; - // Capture whether the resolved body already contains a file block - // BEFORE applyMediaUnderstanding mutates it — this detects prior - // extraction so we avoid double-inserting. Checking the body - // (not the full queued.prompt) avoids false positives from user - // messages that happen to contain literal " 0 || - muResult.appliedAudio || - muResult.appliedImage || - muResult.appliedVideo || - (muResult.appliedFile && !bodyAlreadyHasFileBlock); - if (shouldRebuildPrompt) { - // Rebuild the queued prompt from the mutated media context so the - // deferred path matches the primary path's prompt shape. - const newMediaNote = buildInboundMediaNote(mediaCtx); - queued.prompt = rebuildQueuedPromptWithMediaUnderstanding({ - prompt: queued.prompt, - originalBody, - updatedBody: mediaCtx.Body, - mediaNote: newMediaNote, - }); - logVerbose( - `followup: applied media understanding (audio=${muResult.appliedAudio}, image=${muResult.appliedImage}, video=${muResult.appliedVideo}, file=${muResult.appliedFile})`, - ); - } - } catch (err) { - logVerbose( - `followup: media understanding failed, proceeding with raw content: ${err instanceof Error ? err.message : String(err)}`, - ); - } - } - } + await applyDeferredMediaUnderstandingToQueuedRun(queued, { logLabel: "followup" }); try { const fallbackResult = await runWithModelFallback({ diff --git a/src/auto-reply/reply/get-reply-run.ts b/src/auto-reply/reply/get-reply-run.ts index 32999a66238..92ca1be7513 100644 --- a/src/auto-reply/reply/get-reply-run.ts +++ b/src/auto-reply/reply/get-reply-run.ts @@ -490,6 +490,8 @@ export async function runPreparedReply( Body: ctx.Body, CommandBody: ctx.CommandBody, RawBody: ctx.RawBody, + Provider: ctx.Provider ?? sessionCtx.Provider, + Surface: ctx.Surface ?? 
sessionCtx.Surface,
       MediaPath: ctx.MediaPath,
       MediaUrl: ctx.MediaUrl,
       MediaType: ctx.MediaType,
diff --git a/src/auto-reply/reply/queue/drain.ts b/src/auto-reply/reply/queue/drain.ts
index 1e2fb33e4e0..68c660fe2b8 100644
--- a/src/auto-reply/reply/queue/drain.ts
+++ b/src/auto-reply/reply/queue/drain.ts
@@ -3,15 +3,17 @@ import { resolveGlobalMap } from "../../../shared/global-singleton.js";
 import {
   buildCollectPrompt,
   beginQueueDrain,
+  buildQueueSummaryLine,
+  buildQueueSummaryPrompt,
   clearQueueSummaryState,
   drainCollectQueueStep,
   drainNextQueueItem,
   hasCrossChannelItems,
-  previewQueueSummaryPrompt,
   waitForQueueDebounce,
 } from "../../../utils/queue-helpers.js";
+import { applyDeferredMediaUnderstandingToQueuedRun } from "../followup-media.js";
 import { isRoutableChannel } from "../route-reply.js";
-import { FOLLOWUP_QUEUES } from "./state.js";
+import { FOLLOWUP_QUEUES, type FollowupQueueState } from "./state.js";
 import type { FollowupRun } from "./types.js";
 
 // Persists the most recent runFollowup callback per queue key so that
@@ -68,6 +70,50 @@ function resolveCrossChannelKey(item: FollowupRun): { cross?: true; key?: string
   };
 }
 
+function clearFollowupQueueSummaryState(queue: FollowupQueueState): void {
+  clearQueueSummaryState(queue);
+  queue.summaryItems = [];
+}
+
+export async function applyDeferredMediaToQueuedRuns(items: FollowupRun[]): Promise<void> {
+  for (const item of items) {
+    await applyDeferredMediaUnderstandingToQueuedRun(item, { logLabel: "followup queue" });
+  }
+}
+
+async function resolveSummaryLines(items: FollowupRun[]): Promise<string[]> {
+  const summaryLines: string[] = [];
+  for (const item of items) {
+    await applyDeferredMediaUnderstandingToQueuedRun(item, { logLabel: "followup queue" });
+    summaryLines.push(buildQueueSummaryLine(item.summaryLine?.trim() || item.prompt.trim()));
+  }
+  return summaryLines;
+}
+
+export async function buildMediaAwareQueueSummaryPrompt(params: {
+  dropPolicy: FollowupQueueState["dropPolicy"];
+  droppedCount: number;
+  summaryLines: string[];
+  summaryItems: FollowupRun[];
+  noun: string;
+}): Promise<string | undefined> {
+  if (params.dropPolicy !== "summarize" || params.droppedCount <= 0) {
+    return undefined;
+  }
+  const summaryLines =
+    params.summaryItems.length > 0
+      ? await resolveSummaryLines(params.summaryItems)
+      : params.summaryLines;
+  return buildQueueSummaryPrompt({
+    state: {
+      dropPolicy: params.dropPolicy,
+      droppedCount: params.droppedCount,
+      summaryLines: [...summaryLines],
+    },
+    noun: params.noun,
+  });
+}
+
 export function scheduleFollowupDrain(
   key: string,
   runFollowup: (run: FollowupRun) => Promise<void>,
@@ -107,7 +153,14 @@ export function scheduleFollowupDrain(
     }
 
     const items = queue.items.slice();
-    const summary = previewQueueSummaryPrompt({ state: queue, noun: "message" });
+    await applyDeferredMediaToQueuedRuns(items);
+    const summary = await buildMediaAwareQueueSummaryPrompt({
+      dropPolicy: queue.dropPolicy,
+      droppedCount: queue.droppedCount,
+      summaryLines: queue.summaryLines,
+      summaryItems: queue.summaryItems,
+      noun: "message",
+    });
     const run = items.at(-1)?.run ??
queue.lastRun; if (!run) { break; @@ -129,12 +182,18 @@ export function scheduleFollowupDrain( }); queue.items.splice(0, items.length); if (summary) { - clearQueueSummaryState(queue); + clearFollowupQueueSummaryState(queue); } continue; } - const summaryPrompt = previewQueueSummaryPrompt({ state: queue, noun: "message" }); + const summaryPrompt = await buildMediaAwareQueueSummaryPrompt({ + dropPolicy: queue.dropPolicy, + droppedCount: queue.droppedCount, + summaryLines: queue.summaryLines, + summaryItems: queue.summaryItems, + noun: "message", + }); if (summaryPrompt) { const run = queue.lastRun; if (!run) { @@ -155,7 +214,7 @@ export function scheduleFollowupDrain( ) { break; } - clearQueueSummaryState(queue); + clearFollowupQueueSummaryState(queue); continue; } diff --git a/src/auto-reply/reply/queue/enqueue.ts b/src/auto-reply/reply/queue/enqueue.ts index 11da0db98fc..e58cc5ffac5 100644 --- a/src/auto-reply/reply/queue/enqueue.ts +++ b/src/auto-reply/reply/queue/enqueue.ts @@ -1,8 +1,8 @@ import { createDedupeCache } from "../../../infra/dedupe.js"; import { resolveGlobalSingleton } from "../../../shared/global-singleton.js"; -import { applyQueueDropPolicy, shouldSkipQueueItem } from "../../../utils/queue-helpers.js"; +import { buildQueueSummaryLine, shouldSkipQueueItem } from "../../../utils/queue-helpers.js"; import { kickFollowupDrainIfIdle } from "./drain.js"; -import { getExistingFollowupQueue, getFollowupQueue } from "./state.js"; +import { getExistingFollowupQueue, getFollowupQueue, type FollowupQueueState } from "./state.js"; import type { FollowupRun, QueueDedupeMode, QueueSettings } from "./types.js"; /** @@ -57,6 +57,34 @@ function isRunAlreadyQueued( return items.some((item) => item.prompt === run.prompt && hasSameRouting(item)); } +function applyFollowupQueueDropPolicy(queue: FollowupQueueState): boolean { + const cap = queue.cap; + if (cap <= 0 || queue.items.length < cap) { + return true; + } + if (queue.dropPolicy === "new") { + return false; + } + + const dropCount = queue.items.length - cap + 1; + const dropped = queue.items.splice(0, dropCount); + if (queue.dropPolicy === "summarize") { + for (const item of dropped) { + queue.droppedCount += 1; + queue.summaryItems.push(item); + queue.summaryLines.push( + buildQueueSummaryLine(item.summaryLine?.trim() || item.prompt.trim()), + ); + } + const limit = Math.max(0, cap); + while (queue.summaryLines.length > limit) { + queue.summaryLines.shift(); + queue.summaryItems.shift(); + } + } + return true; +} + export function enqueueFollowupRun( key: string, run: FollowupRun, @@ -83,10 +111,7 @@ export function enqueueFollowupRun( queue.lastEnqueuedAt = Date.now(); queue.lastRun = run.run; - const shouldEnqueue = applyQueueDropPolicy({ - queue, - summarize: (item) => item.summaryLine?.trim() || item.prompt.trim(), - }); + const shouldEnqueue = applyFollowupQueueDropPolicy(queue); if (!shouldEnqueue) { return false; } diff --git a/src/auto-reply/reply/queue/state.ts b/src/auto-reply/reply/queue/state.ts index 44208e727dd..94021dd0c4c 100644 --- a/src/auto-reply/reply/queue/state.ts +++ b/src/auto-reply/reply/queue/state.ts @@ -4,6 +4,7 @@ import type { FollowupRun, QueueDropPolicy, QueueMode, QueueSettings } from "./t export type FollowupQueueState = { items: FollowupRun[]; + summaryItems: FollowupRun[]; draining: boolean; lastEnqueuedAt: number; mode: QueueMode; @@ -47,6 +48,7 @@ export function getFollowupQueue(key: string, settings: QueueSettings): Followup const created: FollowupQueueState = { items: [], + summaryItems: [], 
draining: false, lastEnqueuedAt: 0, mode: settings.mode, @@ -78,6 +80,7 @@ export function clearFollowupQueue(key: string): number { } const cleared = queue.items.length + queue.droppedCount; queue.items.length = 0; + queue.summaryItems.length = 0; queue.droppedCount = 0; queue.summaryLines = []; queue.lastRun = undefined; diff --git a/src/auto-reply/reply/queue/types.ts b/src/auto-reply/reply/queue/types.ts index 637ceb0a149..291059a28d7 100644 --- a/src/auto-reply/reply/queue/types.ts +++ b/src/auto-reply/reply/queue/types.ts @@ -32,6 +32,8 @@ export type FollowupMediaContext = { Body?: string; CommandBody?: string; RawBody?: string; + Provider?: string; + Surface?: string; MediaPath?: string; MediaUrl?: string; MediaType?: string; @@ -47,6 +49,7 @@ export type FollowupMediaContext = { OriginatingTo?: string; AccountId?: string; MessageThreadId?: string | number; + DeferredMediaApplied?: boolean; }; export type FollowupRun = {