Reply: preserve deferred queued media context

This commit is contained in:
Joey Krug 2026-03-14 21:54:35 -04:00
parent 9859e86379
commit 8a5ad5e320
8 changed files with 572 additions and 190 deletions

View File

@ -0,0 +1,241 @@
import { logVerbose } from "../../globals.js";
import { applyMediaUnderstanding } from "../../media-understanding/apply.js";
import {
normalizeAttachments,
resolveAttachmentKind,
} from "../../media-understanding/attachments.js";
import { buildInboundMediaNote } from "../media-note.js";
import type { MsgContext } from "../templating.js";
import { parseInlineDirectives } from "./directive-handling.js";
import type { FollowupMediaContext, FollowupRun } from "./queue/types.js";
// Placeholder body stored when an inbound message carried media but no caption.
const MEDIA_ONLY_PLACEHOLDER = "[User sent media without caption]";
// First-line hint that precedes prompts carrying outbound-image guidance.
const MEDIA_REPLY_HINT_PREFIX = "To send an image back, prefer the message tool";
// Matches one "[media attached: ...]" header line, optionally numbered "N/M".
const LEADING_MEDIA_ATTACHED_LINE_RE = /^\[media attached(?: \d+\/\d+)?: [^\r\n]*\]$/;
// Detects an already-inlined <file name="..."> extraction block.
const FILE_BLOCK_RE = /<file\s+name="/i;

/**
 * Remove every leading "[media attached ...]" header line from a queued
 * prompt and return the trimmed remainder.
 */
function stripLeadingMediaAttachedLines(prompt: string): string {
  const rows = prompt.split("\n");
  let firstKept = 0;
  for (; firstKept < rows.length; firstKept += 1) {
    const candidate = (rows[firstKept] ?? "").trim();
    if (!LEADING_MEDIA_ATTACHED_LINE_RE.test(candidate)) {
      break;
    }
  }
  return rows.slice(firstKept).join("\n").trim();
}

/**
 * Drop the outbound-media hint when it is the very first line of the prompt;
 * otherwise return the prompt trimmed.
 */
function stripLeadingMediaReplyHint(prompt: string): string {
  const [head = "", ...rest] = prompt.split("\n");
  return head.startsWith(MEDIA_REPLY_HINT_PREFIX)
    ? rest.join("\n").trim()
    : prompt.trim();
}
/**
 * Replace the final occurrence of `search` within `value`.
 *
 * @returns The rewritten string, or `undefined` when `search` is empty or
 *   never occurs in `value` (so callers can distinguish "no-op" from "done").
 */
function replaceLastOccurrence(
  value: string,
  search: string,
  replacement: string,
): string | undefined {
  if (search === "") {
    return undefined;
  }
  const at = value.lastIndexOf(search);
  if (at === -1) {
    return undefined;
  }
  const before = value.slice(0, at);
  const after = value.slice(at + search.length);
  return before + replacement + after;
}
/** Strip inline directives from `text` and return the trimmed cleaned body. */
function stripInlineDirectives(text: string | undefined): string {
  const { cleaned } = parseInlineDirectives(text ?? "");
  return cleaned.trim();
}
/**
 * Reconcile the media-understanding-updated body with the original body so
 * inline directives the user typed do not leak into the rebuilt prompt.
 *
 * Returns "" when there is no updated body; otherwise returns the updated
 * body with the original body's directive-bearing text swapped (at its last
 * occurrence) for its directive-stripped form.
 */
function normalizeUpdatedBody(params: { originalBody?: string; updatedBody?: string }): string {
  const updated = params.updatedBody?.trim() ?? "";
  if (updated === "") {
    return "";
  }
  const original = params.originalBody?.trim() ?? "";
  if (original === "") {
    return updated;
  }
  const cleanedOriginal = stripInlineDirectives(original);
  if (cleanedOriginal === "") {
    return updated;
  }
  if (updated === original) {
    return cleanedOriginal;
  }
  const swapped = replaceLastOccurrence(updated, original, cleanedOriginal);
  return (swapped === undefined ? updated : swapped).trim();
}
/**
 * Rebuild a queued prompt after media understanding ran, so the deferred path
 * matches the primary path's prompt shape: leading media markers (and, when
 * no fresh media note exists, the reply hint) are stripped, the original body
 * (or the media-only placeholder) is replaced by the updated body, and the
 * new media note is prepended.
 */
function rebuildQueuedPromptWithMediaUnderstanding(params: {
  prompt: string;
  originalBody?: string;
  updatedBody?: string;
  mediaNote?: string;
}): string {
  // Join an optional media note above the body, skipping empty parts.
  const joinWithNote = (body: string): string =>
    [params.mediaNote?.trim(), body].filter(Boolean).join("\n").trim();
  let remainder = stripLeadingMediaAttachedLines(params.prompt);
  if (!params.mediaNote) {
    remainder = stripLeadingMediaReplyHint(remainder);
  }
  const updatedBody = normalizeUpdatedBody({
    originalBody: params.originalBody,
    updatedBody: params.updatedBody,
  });
  if (!updatedBody) {
    return joinWithNote(remainder);
  }
  // Candidate strings to swap for the updated body, deduped, truthy only,
  // in priority order: raw original, directive-stripped original, placeholder.
  const seen = new Set<string>();
  const targets: string[] = [];
  for (const candidate of [
    params.originalBody?.trim(),
    stripInlineDirectives(params.originalBody),
    MEDIA_ONLY_PLACEHOLDER,
  ]) {
    if (candidate && !seen.has(candidate)) {
      seen.add(candidate);
      targets.push(candidate);
    }
  }
  for (const target of targets) {
    const swapped = replaceLastOccurrence(remainder, target, updatedBody);
    if (swapped !== undefined) {
      // First successful replacement wins.
      return joinWithNote(swapped.trim());
    }
  }
  // No target found in the prompt: append the updated body instead.
  const appended = [remainder, updatedBody].filter(Boolean).join("\n\n");
  return joinWithNote(appended.trim());
}
/**
 * True when the stored followup media context still references at least one
 * attachment: a non-blank single path/URL, or a non-empty plural list.
 */
function hasMediaAttachments(mediaContext: FollowupMediaContext): boolean {
  if (mediaContext.MediaPath?.trim() || mediaContext.MediaUrl?.trim()) {
    return true;
  }
  const paths = mediaContext.MediaPaths;
  if (Array.isArray(paths) && paths.length > 0) {
    return true;
  }
  const urls = mediaContext.MediaUrls;
  return Array.isArray(urls) && urls.length > 0;
}
/**
 * True when the context has at least one attachment and none of them resolve
 * to audio, image, or video — i.e. everything is file-like (documents etc.).
 */
function hasOnlyFileLikeAttachments(mediaContext: FollowupMediaContext): boolean {
  const attachments = normalizeAttachments(mediaContext as MsgContext);
  if (attachments.length === 0) {
    return false;
  }
  for (const attachment of attachments) {
    const kind = resolveAttachmentKind(attachment);
    if (kind === "audio" || kind === "image" || kind === "video") {
      return false;
    }
  }
  return true;
}
/**
 * Produce a fresh media-context snapshot after a media-understanding pass:
 * copies the original, overlays the mutated ctx's Transcript / understanding
 * arrays (cloned) when present, optionally replaces Body, and marks the
 * context as applied so later deferred passes become no-ops.
 */
function snapshotUpdatedMediaContext(params: {
  original: FollowupMediaContext;
  mediaCtx: MsgContext;
  updatedBody?: string;
}): FollowupMediaContext {
  const { original, mediaCtx, updatedBody } = params;
  const transcript =
    typeof mediaCtx.Transcript === "string" ? mediaCtx.Transcript : original.Transcript;
  const understanding = Array.isArray(mediaCtx.MediaUnderstanding)
    ? [...mediaCtx.MediaUnderstanding]
    : original.MediaUnderstanding;
  const decisions = Array.isArray(mediaCtx.MediaUnderstandingDecisions)
    ? [...mediaCtx.MediaUnderstandingDecisions]
    : original.MediaUnderstandingDecisions;
  return {
    ...original,
    Body: updatedBody ?? original.Body,
    Transcript: transcript,
    MediaUnderstanding: understanding,
    MediaUnderstandingDecisions: decisions,
    DeferredMediaApplied: true,
  };
}
/**
 * Apply deferred media understanding to a queued followup run.
 *
 * When a message with attachments arrives while the agent is mid-turn, media
 * understanding may not have run (or may have failed) in the primary path;
 * this retries it before the queued run is used. On success the queued prompt
 * is rebuilt to match the primary path's prompt shape and the stored media
 * context is snapshotted with `DeferredMediaApplied` set, making repeat calls
 * no-ops.
 *
 * @param queued - The queued run; `queued.prompt` and `queued.mediaContext`
 *   may be mutated/replaced in place.
 * @param params.logLabel - Label used in verbose log lines (default "followup").
 */
export async function applyDeferredMediaUnderstandingToQueuedRun(
  queued: FollowupRun,
  params: { logLabel?: string } = {},
): Promise<void> {
  const mediaContext = queued.mediaContext;
  // No stored context, or a previous call already handled this run.
  if (!mediaContext || mediaContext.DeferredMediaApplied) {
    return;
  }
  // Understanding output already present (primary path succeeded): mark done.
  if (mediaContext.MediaUnderstanding?.length) {
    mediaContext.DeferredMediaApplied = true;
    return;
  }
  // Nothing attached: nothing to process.
  if (!hasMediaAttachments(mediaContext)) {
    mediaContext.DeferredMediaApplied = true;
    return;
  }
  const resolvedOriginalBody =
    mediaContext.CommandBody ?? mediaContext.RawBody ?? mediaContext.Body;
  // Detect prior file extraction BEFORE applyMediaUnderstanding can mutate
  // the body, so <file> blocks are not inserted twice.
  const bodyAlreadyHasFileBlock =
    FILE_BLOCK_RE.test(resolvedOriginalBody ?? "") || FILE_BLOCK_RE.test(mediaContext.Body ?? "");
  if (bodyAlreadyHasFileBlock && hasOnlyFileLikeAttachments(mediaContext)) {
    mediaContext.DeferredMediaApplied = true;
    return;
  }
  try {
    // Rehydrate a MsgContext for the media pipeline. Provider falls back to
    // the run's message provider, then the originating channel when it is a
    // plain string.
    const mediaCtx = {
      ...mediaContext,
      Body: resolvedOriginalBody,
      Provider:
        mediaContext.Provider ??
        queued.run.messageProvider ??
        (typeof mediaContext.OriginatingChannel === "string"
          ? mediaContext.OriginatingChannel
          : undefined),
      Surface: mediaContext.Surface,
    } as MsgContext;
    const muResult = await applyMediaUnderstanding({
      ctx: mediaCtx,
      cfg: queued.run.config,
      agentDir: queued.run.agentDir,
      activeModel: {
        provider: queued.run.provider,
        model: queued.run.model,
      },
    });
    // Only rebuild when something was actually applied; a file result does
    // not count if the body already carried a file block.
    const shouldRebuildPrompt =
      muResult.outputs.length > 0 ||
      muResult.appliedAudio ||
      muResult.appliedImage ||
      muResult.appliedVideo ||
      (muResult.appliedFile && !bodyAlreadyHasFileBlock);
    if (shouldRebuildPrompt) {
      const newMediaNote = buildInboundMediaNote(mediaCtx);
      queued.prompt = rebuildQueuedPromptWithMediaUnderstanding({
        prompt: queued.prompt,
        originalBody: resolvedOriginalBody,
        updatedBody: mediaCtx.Body,
        mediaNote: newMediaNote,
      });
      logVerbose(
        `${params.logLabel ?? "followup"}: applied media understanding (audio=${muResult.appliedAudio}, image=${muResult.appliedImage}, video=${muResult.appliedVideo}, file=${muResult.appliedFile})`,
      );
    }
    // Snapshot the mutated context so later passes see the applied result.
    queued.mediaContext = snapshotUpdatedMediaContext({
      original: mediaContext,
      mediaCtx,
      updatedBody: shouldRebuildPrompt ? mediaCtx.Body : undefined,
    });
  } catch (err) {
    // Best-effort: on failure keep the raw prompt/context. Note that
    // DeferredMediaApplied stays false here, so a later pass may retry.
    logVerbose(
      `${params.logLabel ?? "followup"}: media understanding failed, proceeding with raw content: ${err instanceof Error ? err.message : String(err)}`,
    );
  }
}

View File

@ -3,7 +3,8 @@ import { tmpdir } from "node:os";
import path from "node:path";
import { beforeEach, describe, expect, it, vi } from "vitest";
import { loadSessionStore, type SessionEntry, saveSessionStore } from "../../config/sessions.js";
import type { FollowupRun } from "./queue.js";
import { buildCollectPrompt } from "../../utils/queue-helpers.js";
import type { FollowupRun } from "./queue/types.js";
import { createMockFollowupRun, createMockTypingController } from "./test-helpers.js";
const runEmbeddedPiAgentMock = vi.fn();
@ -34,6 +35,10 @@ vi.mock("./route-reply.js", async (importOriginal) => {
});
import { createFollowupRunner } from "./followup-runner.js";
import {
applyDeferredMediaToQueuedRuns,
buildMediaAwareQueueSummaryPrompt,
} from "./queue/drain.js";
const ROUTABLE_TEST_CHANNELS = new Set([
"telegram",
@ -757,6 +762,69 @@ describe("createFollowupRunner media understanding", () => {
expect(onBlockReply).toHaveBeenCalledWith(expect.objectContaining({ text: "Got it!" }));
});
// Regression test: a followup queued with only run-level provider info must
// surface run.messageProvider onto the ctx handed to applyMediaUnderstanding.
it("propagates the queued message provider into deferred media context", async () => {
  const transcriptText = "Provider-aware transcript";
  applyMediaUnderstandingMock.mockImplementationOnce(
    async (params: { ctx: Record<string, unknown> }) => {
      // The deferred path should have copied run.messageProvider onto ctx.
      expect(params.ctx.Provider).toBe("telegram");
      // Mutate ctx the way the real pipeline would after transcription.
      params.ctx.MediaUnderstanding = [
        {
          kind: "audio.transcription",
          text: transcriptText,
          attachmentIndex: 0,
          provider: "whisper",
        },
      ];
      params.ctx.Transcript = transcriptText;
      params.ctx.Body = `[Audio]\nTranscript:\n${transcriptText}`;
      return {
        outputs: [
          {
            kind: "audio.transcription",
            text: transcriptText,
            attachmentIndex: 0,
            provider: "whisper",
          },
        ],
        decisions: [],
        appliedImage: false,
        appliedAudio: true,
        appliedVideo: false,
        appliedFile: false,
      };
    },
  );
  runEmbeddedPiAgentMock.mockResolvedValueOnce({
    payloads: [{ text: "done" }],
    meta: {},
  });
  const runner = createFollowupRunner({
    opts: { onBlockReply: vi.fn(async () => {}) },
    typing: createMockTypingController(),
    typingMode: "instant",
    defaultModel: "anthropic/claude-opus-4-5",
  });
  // Media-only message: placeholder prompt, provider only on the run.
  await runner(
    createQueuedRun({
      prompt: "[User sent media without caption]",
      run: { messageProvider: "telegram" },
      mediaContext: {
        Body: "",
        MediaPaths: ["/tmp/voice.ogg"],
        MediaTypes: ["audio/ogg"],
      },
    }),
  );
  expect(applyMediaUnderstandingMock).toHaveBeenCalledTimes(1);
  // The transcript must have been spliced into the agent prompt.
  const agentCall = runEmbeddedPiAgentMock.mock.calls.at(-1)?.[0] as {
    prompt?: string;
  };
  expect(agentCall?.prompt).toContain(transcriptText);
});
it("applies media understanding for URL-only attachments", async () => {
const transcriptText = "URL-only transcript";
applyMediaUnderstandingMock.mockImplementationOnce(
@ -1678,4 +1746,159 @@ describe("createFollowupRunner media understanding", () => {
const matches = agentCall?.prompt?.match(/summarize this/g);
expect(matches?.length).toBe(1);
});
// Regression test: when the stored media body already contains a <file> block
// (extraction happened in the primary path), the deferred pass must skip
// applyMediaUnderstanding entirely and must not duplicate the file block.
it("does not re-apply file extraction when the stored media body already has a file block", async () => {
  const fileBlock = '<file name="report.pdf" mime="application/pdf">\nreport content\n</file>';
  runEmbeddedPiAgentMock.mockResolvedValueOnce({
    payloads: [{ text: "processed" }],
    meta: {},
  });
  const runner = createFollowupRunner({
    opts: { onBlockReply: vi.fn(async () => {}) },
    typing: createMockTypingController(),
    typingMode: "instant",
    defaultModel: "anthropic/claude-opus-4-5",
  });
  // Prompt and context body both carry the already-extracted file block.
  await runner(
    createQueuedRun({
      prompt: `[media attached: /tmp/report.pdf]\n${MEDIA_REPLY_HINT}\nsummarize this\n\n${fileBlock}`,
      mediaContext: {
        Body: `summarize this\n\n${fileBlock}`,
        CommandBody: "summarize this",
        RawBody: "summarize this",
        MediaPaths: ["/tmp/report.pdf"],
        MediaTypes: ["application/pdf"],
      },
    }),
  );
  expect(applyMediaUnderstandingMock).not.toHaveBeenCalled();
  const agentCall = runEmbeddedPiAgentMock.mock.calls.at(-1)?.[0] as {
    prompt?: string;
  };
  // Exactly one file block in the final prompt — no duplication.
  expect(agentCall?.prompt?.match(/<file\s+name="report\.pdf"/g)).toHaveLength(1);
});
});
// Covers the queue-drain entry points that preprocess deferred media before
// prompts are synthesized: collect batches and overflow summaries.
describe("followup queue drain deferred media understanding", () => {
  it("preprocesses collect batches before synthesizing the followup prompt", async () => {
    // Mock transcription: mutate ctx the way applyMediaUnderstanding would.
    applyMediaUnderstandingMock.mockImplementationOnce(
      async (params: { ctx: Record<string, unknown> }) => {
        params.ctx.MediaUnderstanding = [
          {
            kind: "audio.transcription",
            text: "collect transcript",
            attachmentIndex: 0,
            provider: "whisper",
          },
        ];
        params.ctx.Transcript = "collect transcript";
        params.ctx.Body = "[Audio]\nTranscript:\ncollect transcript";
        return {
          outputs: [
            {
              kind: "audio.transcription",
              text: "collect transcript",
              attachmentIndex: 0,
              provider: "whisper",
            },
          ],
          decisions: [],
          appliedImage: false,
          appliedAudio: true,
          appliedVideo: false,
          appliedFile: false,
        };
      },
    );
    // Two queued items: one with pending media, one plain text.
    const items: FollowupRun[] = [
      createQueuedRun({
        prompt: "[media attached: /tmp/voice.ogg (audio/ogg)]\nsome text",
        summaryLine: "some text",
        originatingChannel: "telegram",
        originatingTo: "chat:1",
        run: { messageProvider: "telegram" },
        mediaContext: {
          Body: "some text",
          MediaPaths: ["/tmp/voice.ogg"],
          MediaTypes: ["audio/ogg"],
        },
      }),
      createQueuedRun({
        prompt: "second text",
        summaryLine: "second text",
        originatingChannel: "telegram",
        originatingTo: "chat:1",
        run: { messageProvider: "telegram" },
      }),
    ];
    await applyDeferredMediaToQueuedRuns(items);
    const prompt = buildCollectPrompt({
      title: "[Queued messages while agent was busy]",
      items,
      renderItem: (item, idx) => `---\nQueued #${idx + 1}\n${item.prompt}`.trim(),
    });
    // Transcript replaces the raw media marker; the text item is untouched.
    expect(prompt).toContain("collect transcript");
    expect(prompt).toContain("Queued #2\nsecond text");
    expect(prompt).not.toContain("[media attached: /tmp/voice.ogg");
  });
  it("preprocesses dropped media items before building overflow summaries", async () => {
    // Mock transcription for the single dropped (overflowed) item.
    applyMediaUnderstandingMock.mockImplementationOnce(
      async (params: { ctx: Record<string, unknown> }) => {
        params.ctx.MediaUnderstanding = [
          {
            kind: "audio.transcription",
            text: "overflow transcript",
            attachmentIndex: 0,
            provider: "whisper",
          },
        ];
        params.ctx.Transcript = "overflow transcript";
        params.ctx.Body = "[Audio]\nTranscript:\noverflow transcript";
        return {
          outputs: [
            {
              kind: "audio.transcription",
              text: "overflow transcript",
              attachmentIndex: 0,
              provider: "whisper",
            },
          ],
          decisions: [],
          appliedImage: false,
          appliedAudio: true,
          appliedVideo: false,
          appliedFile: false,
        };
      },
    );
    // summaryItems take precedence over the raw summaryLines fallback.
    const summaryPrompt = await buildMediaAwareQueueSummaryPrompt({
      dropPolicy: "summarize",
      droppedCount: 1,
      summaryLines: ["[media attached: /tmp/voice.ogg (audio/ogg)]"],
      summaryItems: [
        createQueuedRun({
          prompt: "[media attached: /tmp/voice.ogg (audio/ogg)]",
          summaryLine: "",
          run: { messageProvider: "telegram" },
          mediaContext: {
            Body: "",
            MediaPaths: ["/tmp/voice.ogg"],
            MediaTypes: ["audio/ogg"],
          },
        }),
      ],
      noun: "message",
    });
    expect(summaryPrompt).toContain("[Queue overflow] Dropped 1 message due to cap.");
    expect(summaryPrompt).toContain("overflow transcript");
    expect(summaryPrompt).not.toContain("[media attached: /tmp/voice.ogg");
  });
});

View File

@ -9,16 +9,14 @@ import type { SessionEntry } from "../../config/sessions.js";
import type { TypingMode } from "../../config/types.js";
import { logVerbose } from "../../globals.js";
import { registerAgentRunContext } from "../../infra/agent-events.js";
import { applyMediaUnderstanding } from "../../media-understanding/apply.js";
import { defaultRuntime } from "../../runtime.js";
import { isInternalMessageChannel } from "../../utils/message-channel.js";
import { stripHeartbeatToken } from "../heartbeat.js";
import { buildInboundMediaNote } from "../media-note.js";
import type { MsgContext, OriginatingChannelType } from "../templating.js";
import type { OriginatingChannelType } from "../templating.js";
import { isSilentReplyText, SILENT_REPLY_TOKEN } from "../tokens.js";
import type { GetReplyOptions, ReplyPayload } from "../types.js";
import { resolveRunAuthProfile } from "./agent-runner-utils.js";
import { parseInlineDirectives } from "./directive-handling.js";
import { applyDeferredMediaUnderstandingToQueuedRun } from "./followup-media.js";
import {
resolveOriginAccountId,
resolveOriginMessageProvider,
@ -37,113 +35,6 @@ import { incrementRunCompactionCount, persistRunSessionUsage } from "./session-r
import { createTypingSignaler } from "./typing-mode.js";
import type { TypingController } from "./typing.js";
const MEDIA_ONLY_PLACEHOLDER = "[User sent media without caption]";
const MEDIA_REPLY_HINT_PREFIX = "To send an image back, prefer the message tool";
const LEADING_MEDIA_ATTACHED_LINE_RE = /^\[media attached(?: \d+\/\d+)?: [^\r\n]*\]$/;
const FILE_BLOCK_RE = /<file\s+name="/i;
function stripLeadingMediaAttachedLines(prompt: string): string {
const lines = prompt.split("\n");
let index = 0;
while (index < lines.length) {
const trimmed = lines[index]?.trim() ?? "";
if (!LEADING_MEDIA_ATTACHED_LINE_RE.test(trimmed)) {
break;
}
index += 1;
}
return lines.slice(index).join("\n").trim();
}
function stripLeadingMediaReplyHint(prompt: string): string {
const lines = prompt.split("\n");
if ((lines[0] ?? "").startsWith(MEDIA_REPLY_HINT_PREFIX)) {
return lines.slice(1).join("\n").trim();
}
return prompt.trim();
}
function replaceLastOccurrence(
value: string,
search: string,
replacement: string,
): string | undefined {
if (!search) {
return undefined;
}
const index = value.lastIndexOf(search);
if (index < 0) {
return undefined;
}
return `${value.slice(0, index)}${replacement}${value.slice(index + search.length)}`;
}
function stripInlineDirectives(text: string | undefined): string {
return parseInlineDirectives(text ?? "").cleaned.trim();
}
function normalizeUpdatedBody(params: { originalBody?: string; updatedBody?: string }): string {
const updatedBody = params.updatedBody?.trim();
if (!updatedBody) {
return "";
}
const originalBody = params.originalBody?.trim();
if (!originalBody) {
return updatedBody;
}
const cleanedOriginalBody = stripInlineDirectives(originalBody);
if (!cleanedOriginalBody) {
return updatedBody;
}
if (updatedBody === originalBody) {
return cleanedOriginalBody;
}
return (
replaceLastOccurrence(updatedBody, originalBody, cleanedOriginalBody) ?? updatedBody
).trim();
}
function rebuildQueuedPromptWithMediaUnderstanding(params: {
prompt: string;
originalBody?: string;
updatedBody?: string;
mediaNote?: string;
}): string {
let stripped = stripLeadingMediaAttachedLines(params.prompt);
if (!params.mediaNote) {
stripped = stripLeadingMediaReplyHint(stripped);
}
const updatedBody = normalizeUpdatedBody({
originalBody: params.originalBody,
updatedBody: params.updatedBody,
});
if (!updatedBody) {
return [params.mediaNote?.trim(), stripped].filter(Boolean).join("\n").trim();
}
const replacementTargets = [
params.originalBody?.trim(),
stripInlineDirectives(params.originalBody),
MEDIA_ONLY_PLACEHOLDER,
].filter(
(value, index, list): value is string => Boolean(value) && list.indexOf(value) === index,
);
let rebuilt = stripped;
for (const target of replacementTargets) {
const replaced = replaceLastOccurrence(rebuilt, target, updatedBody);
if (replaced !== undefined) {
rebuilt = replaced;
return [params.mediaNote?.trim(), rebuilt.trim()].filter(Boolean).join("\n").trim();
}
}
rebuilt = [rebuilt, updatedBody].filter(Boolean).join("\n\n");
return [params.mediaNote?.trim(), rebuilt.trim()].filter(Boolean).join("\n").trim();
}
export function createFollowupRunner(params: {
opts?: GetReplyOptions;
typing: TypingController;
@ -264,72 +155,7 @@ export function createFollowupRunner(params: {
let bootstrapPromptWarningSignaturesSeen = resolveBootstrapWarningSignaturesSeen(
activeSessionEntry?.systemPromptReport,
);
// Apply media understanding for followup-queued messages when it was
// not applied (or failed) in the primary path. This ensures voice
// notes that arrived while the agent was mid-turn still get transcribed.
if (queued.mediaContext && !queued.mediaContext.MediaUnderstanding?.length) {
const hasMedia = Boolean(
queued.mediaContext.MediaPath?.trim() ||
queued.mediaContext.MediaUrl?.trim() ||
(Array.isArray(queued.mediaContext.MediaPaths) &&
queued.mediaContext.MediaPaths.length > 0) ||
(Array.isArray(queued.mediaContext.MediaUrls) &&
queued.mediaContext.MediaUrls.length > 0),
);
if (hasMedia) {
try {
const resolvedOriginalBody =
queued.mediaContext.CommandBody ??
queued.mediaContext.RawBody ??
queued.mediaContext.Body;
const mediaCtx = {
...queued.mediaContext,
Body: resolvedOriginalBody,
} as MsgContext;
const originalBody = resolvedOriginalBody;
// Capture whether the resolved body already contains a file block
// BEFORE applyMediaUnderstanding mutates it — this detects prior
// extraction so we avoid double-inserting. Checking the body
// (not the full queued.prompt) avoids false positives from user
// messages that happen to contain literal "<file path=" text.
const bodyAlreadyHasFileBlock = FILE_BLOCK_RE.test(resolvedOriginalBody ?? "");
const muResult = await applyMediaUnderstanding({
ctx: mediaCtx,
cfg: queued.run.config,
agentDir: queued.run.agentDir,
activeModel: {
provider: queued.run.provider,
model: queued.run.model,
},
});
const shouldRebuildPrompt =
muResult.outputs.length > 0 ||
muResult.appliedAudio ||
muResult.appliedImage ||
muResult.appliedVideo ||
(muResult.appliedFile && !bodyAlreadyHasFileBlock);
if (shouldRebuildPrompt) {
// Rebuild the queued prompt from the mutated media context so the
// deferred path matches the primary path's prompt shape.
const newMediaNote = buildInboundMediaNote(mediaCtx);
queued.prompt = rebuildQueuedPromptWithMediaUnderstanding({
prompt: queued.prompt,
originalBody,
updatedBody: mediaCtx.Body,
mediaNote: newMediaNote,
});
logVerbose(
`followup: applied media understanding (audio=${muResult.appliedAudio}, image=${muResult.appliedImage}, video=${muResult.appliedVideo}, file=${muResult.appliedFile})`,
);
}
} catch (err) {
logVerbose(
`followup: media understanding failed, proceeding with raw content: ${err instanceof Error ? err.message : String(err)}`,
);
}
}
}
await applyDeferredMediaUnderstandingToQueuedRun(queued, { logLabel: "followup" });
try {
const fallbackResult = await runWithModelFallback({

View File

@ -490,6 +490,8 @@ export async function runPreparedReply(
Body: ctx.Body,
CommandBody: ctx.CommandBody,
RawBody: ctx.RawBody,
Provider: ctx.Provider ?? sessionCtx.Provider,
Surface: ctx.Surface ?? sessionCtx.Surface,
MediaPath: ctx.MediaPath,
MediaUrl: ctx.MediaUrl,
MediaType: ctx.MediaType,

View File

@ -3,15 +3,17 @@ import { resolveGlobalMap } from "../../../shared/global-singleton.js";
import {
buildCollectPrompt,
beginQueueDrain,
buildQueueSummaryLine,
buildQueueSummaryPrompt,
clearQueueSummaryState,
drainCollectQueueStep,
drainNextQueueItem,
hasCrossChannelItems,
previewQueueSummaryPrompt,
waitForQueueDebounce,
} from "../../../utils/queue-helpers.js";
import { applyDeferredMediaUnderstandingToQueuedRun } from "../followup-media.js";
import { isRoutableChannel } from "../route-reply.js";
import { FOLLOWUP_QUEUES } from "./state.js";
import { FOLLOWUP_QUEUES, type FollowupQueueState } from "./state.js";
import type { FollowupRun } from "./types.js";
// Persists the most recent runFollowup callback per queue key so that
@ -68,6 +70,50 @@ function resolveCrossChannelKey(item: FollowupRun): { cross?: true; key?: string
};
}
// Reset the shared queue summary state, then also drop the retained summary
// items so a delivered overflow summary is not rebuilt on the next drain pass.
function clearFollowupQueueSummaryState(queue: FollowupQueueState): void {
  clearQueueSummaryState(queue);
  queue.summaryItems = [];
}
/**
 * Run deferred media understanding over every queued followup item, strictly
 * one at a time (each await completes before the next item starts).
 */
export async function applyDeferredMediaToQueuedRuns(items: FollowupRun[]): Promise<void> {
  for (let i = 0; i < items.length; i += 1) {
    const item = items[i];
    await applyDeferredMediaUnderstandingToQueuedRun(item, { logLabel: "followup queue" });
  }
}
/**
 * Preprocess dropped queue items with deferred media understanding, then
 * render one summary line per item (preferring the stored summaryLine over
 * the full prompt text).
 */
async function resolveSummaryLines(items: FollowupRun[]): Promise<string[]> {
  const lines: string[] = [];
  for (const item of items) {
    await applyDeferredMediaUnderstandingToQueuedRun(item, { logLabel: "followup queue" });
    const source = item.summaryLine?.trim() || item.prompt.trim();
    lines.push(buildQueueSummaryLine(source));
  }
  return lines;
}
/**
 * Build the queue-overflow summary prompt, preprocessing any retained dropped
 * items with deferred media understanding first.
 *
 * Returns undefined unless the policy is "summarize" and something was
 * actually dropped. When summaryItems are available they are re-rendered
 * (media-aware); otherwise the precomputed summaryLines are used as-is.
 */
export async function buildMediaAwareQueueSummaryPrompt(params: {
  dropPolicy: FollowupQueueState["dropPolicy"];
  droppedCount: number;
  summaryLines: string[];
  summaryItems: FollowupRun[];
  noun: string;
}): Promise<string | undefined> {
  const { dropPolicy, droppedCount, noun } = params;
  if (dropPolicy !== "summarize" || droppedCount <= 0) {
    return undefined;
  }
  const lines =
    params.summaryItems.length > 0
      ? await resolveSummaryLines(params.summaryItems)
      : params.summaryLines;
  return buildQueueSummaryPrompt({
    state: {
      dropPolicy,
      droppedCount,
      summaryLines: [...lines],
    },
    noun,
  });
}
export function scheduleFollowupDrain(
key: string,
runFollowup: (run: FollowupRun) => Promise<void>,
@ -107,7 +153,14 @@ export function scheduleFollowupDrain(
}
const items = queue.items.slice();
const summary = previewQueueSummaryPrompt({ state: queue, noun: "message" });
await applyDeferredMediaToQueuedRuns(items);
const summary = await buildMediaAwareQueueSummaryPrompt({
dropPolicy: queue.dropPolicy,
droppedCount: queue.droppedCount,
summaryLines: queue.summaryLines,
summaryItems: queue.summaryItems,
noun: "message",
});
const run = items.at(-1)?.run ?? queue.lastRun;
if (!run) {
break;
@ -129,12 +182,18 @@ export function scheduleFollowupDrain(
});
queue.items.splice(0, items.length);
if (summary) {
clearQueueSummaryState(queue);
clearFollowupQueueSummaryState(queue);
}
continue;
}
const summaryPrompt = previewQueueSummaryPrompt({ state: queue, noun: "message" });
const summaryPrompt = await buildMediaAwareQueueSummaryPrompt({
dropPolicy: queue.dropPolicy,
droppedCount: queue.droppedCount,
summaryLines: queue.summaryLines,
summaryItems: queue.summaryItems,
noun: "message",
});
if (summaryPrompt) {
const run = queue.lastRun;
if (!run) {
@ -155,7 +214,7 @@ export function scheduleFollowupDrain(
) {
break;
}
clearQueueSummaryState(queue);
clearFollowupQueueSummaryState(queue);
continue;
}

View File

@ -1,8 +1,8 @@
import { createDedupeCache } from "../../../infra/dedupe.js";
import { resolveGlobalSingleton } from "../../../shared/global-singleton.js";
import { applyQueueDropPolicy, shouldSkipQueueItem } from "../../../utils/queue-helpers.js";
import { buildQueueSummaryLine, shouldSkipQueueItem } from "../../../utils/queue-helpers.js";
import { kickFollowupDrainIfIdle } from "./drain.js";
import { getExistingFollowupQueue, getFollowupQueue } from "./state.js";
import { getExistingFollowupQueue, getFollowupQueue, type FollowupQueueState } from "./state.js";
import type { FollowupRun, QueueDedupeMode, QueueSettings } from "./types.js";
/**
@ -57,6 +57,34 @@ function isRunAlreadyQueued(
return items.some((item) => item.prompt === run.prompt && hasSameRouting(item));
}
/**
 * Enforce the queue cap before a new item is enqueued.
 *
 * Returns true when the caller may push the incoming item (evicting the
 * oldest entries first if needed), false when the "new" drop policy rejects
 * it. Under the "summarize" policy evicted items are retained in
 * summaryItems/summaryLines (both trimmed in lockstep to at most `cap`
 * entries) and droppedCount is bumped per eviction.
 */
function applyFollowupQueueDropPolicy(queue: FollowupQueueState): boolean {
  const cap = queue.cap;
  // Unlimited queue, or still room: nothing to evict.
  if (cap <= 0 || queue.items.length < cap) {
    return true;
  }
  if (queue.dropPolicy === "new") {
    // Reject the incoming item; keep what is already queued.
    return false;
  }
  // Evict oldest items so the queue ends at cap - 1, leaving room for the
  // incoming item.
  const evicted = queue.items.splice(0, queue.items.length - cap + 1);
  if (queue.dropPolicy === "summarize") {
    for (const item of evicted) {
      queue.droppedCount += 1;
      queue.summaryItems.push(item);
      queue.summaryLines.push(
        buildQueueSummaryLine(item.summaryLine?.trim() || item.prompt.trim()),
      );
    }
    // Cap retained summary entries, discarding the oldest pairs first.
    const limit = Math.max(0, cap);
    while (queue.summaryLines.length > limit) {
      queue.summaryLines.shift();
      queue.summaryItems.shift();
    }
  }
  return true;
}
export function enqueueFollowupRun(
key: string,
run: FollowupRun,
@ -83,10 +111,7 @@ export function enqueueFollowupRun(
queue.lastEnqueuedAt = Date.now();
queue.lastRun = run.run;
const shouldEnqueue = applyQueueDropPolicy({
queue,
summarize: (item) => item.summaryLine?.trim() || item.prompt.trim(),
});
const shouldEnqueue = applyFollowupQueueDropPolicy(queue);
if (!shouldEnqueue) {
return false;
}

View File

@ -4,6 +4,7 @@ import type { FollowupRun, QueueDropPolicy, QueueMode, QueueSettings } from "./t
export type FollowupQueueState = {
items: FollowupRun[];
summaryItems: FollowupRun[];
draining: boolean;
lastEnqueuedAt: number;
mode: QueueMode;
@ -47,6 +48,7 @@ export function getFollowupQueue(key: string, settings: QueueSettings): Followup
const created: FollowupQueueState = {
items: [],
summaryItems: [],
draining: false,
lastEnqueuedAt: 0,
mode: settings.mode,
@ -78,6 +80,7 @@ export function clearFollowupQueue(key: string): number {
}
const cleared = queue.items.length + queue.droppedCount;
queue.items.length = 0;
queue.summaryItems.length = 0;
queue.droppedCount = 0;
queue.summaryLines = [];
queue.lastRun = undefined;

View File

@ -32,6 +32,8 @@ export type FollowupMediaContext = {
Body?: string;
CommandBody?: string;
RawBody?: string;
Provider?: string;
Surface?: string;
MediaPath?: string;
MediaUrl?: string;
MediaType?: string;
@ -47,6 +49,7 @@ export type FollowupMediaContext = {
OriginatingTo?: string;
AccountId?: string;
MessageThreadId?: string | number;
DeferredMediaApplied?: boolean;
};
export type FollowupRun = {