refactor(agents): centralize announce idempotency helpers

This commit is contained in:
Gustavo Madeira Santana 2026-02-15 10:30:49 -05:00
parent 4cd7965a28
commit 54bba3cea1
2 changed files with 37 additions and 21 deletions

View File

@ -0,0 +1,25 @@
export type AnnounceIdFromChildRunParams = {
childSessionKey: string;
childRunId: string;
};
export function buildAnnounceIdFromChildRun(params: AnnounceIdFromChildRunParams): string {
return `v1:${params.childSessionKey}:${params.childRunId}`;
}
export function buildAnnounceIdempotencyKey(announceId: string): string {
return `announce:${announceId}`;
}
export function resolveQueueAnnounceId(params: {
announceId?: string;
sessionKey: string;
enqueuedAt: number;
}): string {
const announceId = params.announceId?.trim();
if (announceId) {
return announceId;
}
// Backward-compatible fallback for queue items that predate announceId.
return `legacy:${params.sessionKey}:${params.enqueuedAt}`;
}

View File

@ -15,6 +15,11 @@ import {
mergeDeliveryContext,
normalizeDeliveryContext,
} from "../utils/delivery-context.js";
import {
buildAnnounceIdFromChildRun,
buildAnnounceIdempotencyKey,
resolveQueueAnnounceId,
} from "./announce-idempotency.js";
import {
isEmbeddedPiRunActive,
queueEmbeddedPiMessage,
@ -106,26 +111,6 @@ function resolveAnnounceOrigin(
return mergeDeliveryContext(requesterOrigin, deliveryContextFromSession(entry));
}
function buildAnnounceIdFromChildRun(params: {
childSessionKey: string;
childRunId: string;
}): string {
return `v1:${params.childSessionKey}:${params.childRunId}`;
}
function buildAnnounceIdempotencyKey(announceId: string): string {
return `announce:${announceId}`;
}
function resolveQueueAnnounceId(item: AnnounceQueueItem): string {
const announceId = item.announceId?.trim();
if (announceId) {
return announceId;
}
// Backward-compatible fallback for queue items that predate announceId.
return `legacy:${item.sessionKey}:${item.enqueuedAt}`;
}
async function sendAnnounce(item: AnnounceQueueItem) {
const requesterDepth = getSubagentDepthFromSessionStore(item.sessionKey);
const requesterIsSubagent = requesterDepth >= 1;
@ -134,7 +119,13 @@ async function sendAnnounce(item: AnnounceQueueItem) {
origin?.threadId != null && origin.threadId !== "" ? String(origin.threadId) : undefined;
// Share one announce identity across direct and queued delivery paths so
// gateway dedupe suppresses true retries without collapsing distinct events.
const idempotencyKey = buildAnnounceIdempotencyKey(resolveQueueAnnounceId(item));
const idempotencyKey = buildAnnounceIdempotencyKey(
resolveQueueAnnounceId({
announceId: item.announceId,
sessionKey: item.sessionKey,
enqueuedAt: item.enqueuedAt,
}),
);
await callGateway({
method: "agent",
params: {