diff --git a/src/agents/announce-idempotency.ts b/src/agents/announce-idempotency.ts new file mode 100644 index 00000000000..e792b262704 --- /dev/null +++ b/src/agents/announce-idempotency.ts @@ -0,0 +1,25 @@ +export type AnnounceIdFromChildRunParams = { + childSessionKey: string; + childRunId: string; +}; + +export function buildAnnounceIdFromChildRun(params: AnnounceIdFromChildRunParams): string { + return `v1:${params.childSessionKey}:${params.childRunId}`; +} + +export function buildAnnounceIdempotencyKey(announceId: string): string { + return `announce:${announceId}`; +} + +export function resolveQueueAnnounceId(params: { + announceId?: string; + sessionKey: string; + enqueuedAt: number; +}): string { + const announceId = params.announceId?.trim(); + if (announceId) { + return announceId; + } + // Backward-compatible fallback for queue items that predate announceId. + return `legacy:${params.sessionKey}:${params.enqueuedAt}`; +} diff --git a/src/agents/subagent-announce.ts b/src/agents/subagent-announce.ts index 7b8aa418d2e..5293d9c4524 100644 --- a/src/agents/subagent-announce.ts +++ b/src/agents/subagent-announce.ts @@ -15,6 +15,11 @@ import { mergeDeliveryContext, normalizeDeliveryContext, } from "../utils/delivery-context.js"; +import { + buildAnnounceIdFromChildRun, + buildAnnounceIdempotencyKey, + resolveQueueAnnounceId, +} from "./announce-idempotency.js"; import { isEmbeddedPiRunActive, queueEmbeddedPiMessage, @@ -106,26 +111,6 @@ function resolveAnnounceOrigin( return mergeDeliveryContext(requesterOrigin, deliveryContextFromSession(entry)); } -function buildAnnounceIdFromChildRun(params: { - childSessionKey: string; - childRunId: string; -}): string { - return `v1:${params.childSessionKey}:${params.childRunId}`; -} - -function buildAnnounceIdempotencyKey(announceId: string): string { - return `announce:${announceId}`; -} - -function resolveQueueAnnounceId(item: AnnounceQueueItem): string { - const announceId = item.announceId?.trim(); - if (announceId) { - return announceId; - } - // Backward-compatible fallback for queue items that predate announceId. - return `legacy:${item.sessionKey}:${item.enqueuedAt}`; -} - async function sendAnnounce(item: AnnounceQueueItem) { const requesterDepth = getSubagentDepthFromSessionStore(item.sessionKey); const requesterIsSubagent = requesterDepth >= 1; @@ -134,7 +119,13 @@ async function sendAnnounce(item: AnnounceQueueItem) { origin?.threadId != null && origin.threadId !== "" ? String(origin.threadId) : undefined; // Share one announce identity across direct and queued delivery paths so // gateway dedupe suppresses true retries without collapsing distinct events. - const idempotencyKey = buildAnnounceIdempotencyKey(resolveQueueAnnounceId(item)); + const idempotencyKey = buildAnnounceIdempotencyKey( + resolveQueueAnnounceId({ + announceId: item.announceId, + sessionKey: item.sessionKey, + enqueuedAt: item.enqueuedAt, + }), + ); await callGateway({ method: "agent", params: {