From f07033ed3fb9006b35be6d8e2a64464ab40f3f3b Mon Sep 17 00:00:00 2001 From: Frank Yang Date: Fri, 13 Mar 2026 16:18:01 +0800 Subject: [PATCH] fix: address delivery dedupe review follow-ups (#44666) Merged via squash. Prepared head SHA: 8e6d254cc4781df66ee02b683c4ad72b5a633502 Co-authored-by: frankekn <4488090+frankekn@users.noreply.github.com> Co-authored-by: frankekn <4488090+frankekn@users.noreply.github.com> Reviewed-by: @frankekn --- CHANGELOG.md | 1 + .../subagent-registry.persistence.test.ts | 29 +++++ src/agents/subagent-registry.ts | 16 ++- src/config/sessions/sessions.test.ts | 82 ++++++++++++++ src/config/sessions/transcript.ts | 34 ++++++ .../delivery-dispatch.double-announce.test.ts | 47 +++++++- src/cron/isolated-agent/delivery-dispatch.ts | 103 +++++++++++++++++- src/gateway/server-methods/send.test.ts | 1 + src/gateway/server-methods/send.ts | 2 + src/infra/outbound/deliver.test.ts | 6 +- src/infra/outbound/deliver.ts | 13 +-- src/infra/outbound/delivery-queue.ts | 10 +- src/infra/outbound/message.test.ts | 33 ++++++ src/infra/outbound/message.ts | 9 +- src/infra/outbound/mirror.ts | 14 +++ .../outbound/outbound-send-service.test.ts | 41 +++++++ src/infra/outbound/outbound-send-service.ts | 9 +- 17 files changed, 413 insertions(+), 37 deletions(-) create mode 100644 src/infra/outbound/mirror.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 56143615b29..b3d4d0398ca 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -93,6 +93,7 @@ Docs: https://docs.openclaw.ai - Cron/doctor: stop flagging canonical `agentTurn` and `systemEvent` payload kinds as legacy cron storage, while still normalizing whitespace-padded and non-canonical variants. (#44012) Thanks @shuicici. - ACP/client final-message delivery: preserve terminal assistant text snapshots before resolving `end_turn`, so ACP clients no longer drop the last visible reply when the gateway sends the final message body on the terminal chat event. (#17615) Thanks @pjeby. - Telegram/Discord status reactions: show a temporary compacting reaction during auto-compaction pauses and restore thinking afterward so the bot no longer appears frozen while context is being compacted. (#35474) thanks @Cypherm. +- Delivery/dedupe: trim completed direct-cron delivery cache correctly and keep mirrored transcript dedupe active even when transcript files contain malformed lines. (#44666) thanks @frankekn. ## 2026.3.11 diff --git a/src/agents/subagent-registry.persistence.test.ts b/src/agents/subagent-registry.persistence.test.ts index 468de55953c..32f2e06311e 100644 --- a/src/agents/subagent-registry.persistence.test.ts +++ b/src/agents/subagent-registry.persistence.test.ts @@ -343,6 +343,35 @@ describe("subagent registry persistence", () => { expect(afterSecond.runs["run-3"].cleanupCompletedAt).toBeDefined(); }); + it("retries cleanup announce after announce flow rejects", async () => { + const persisted = createPersistedEndedRun({ + runId: "run-reject", + childSessionKey: "agent:main:subagent:reject", + task: "reject announce", + cleanup: "keep", + }); + const registryPath = await writePersistedRegistry(persisted); + + announceSpy.mockRejectedValueOnce(new Error("announce boom")); + await restartRegistryAndFlush(); + + expect(announceSpy).toHaveBeenCalledTimes(1); + const afterFirst = JSON.parse(await fs.readFile(registryPath, "utf8")) as { + runs: Record; + }; + expect(afterFirst.runs["run-reject"].cleanupHandled).toBe(false); + expect(afterFirst.runs["run-reject"].cleanupCompletedAt).toBeUndefined(); + + announceSpy.mockResolvedValueOnce(true); + await restartRegistryAndFlush(); + + expect(announceSpy).toHaveBeenCalledTimes(2); + const afterSecond = JSON.parse(await fs.readFile(registryPath, "utf8")) as { + runs: Record; + }; + expect(afterSecond.runs["run-reject"].cleanupCompletedAt).toBeDefined(); + }); + it("keeps delete-mode runs retryable when announce is deferred", async () => { const persisted = createPersistedEndedRun({ runId: "run-4", diff --git a/src/agents/subagent-registry.ts b/src/agents/subagent-registry.ts index 477544bdd3d..d9c593c3e84 100644 --- a/src/agents/subagent-registry.ts +++ b/src/agents/subagent-registry.ts @@ -534,6 +534,18 @@ function startSubagentAnnounceCleanupFlow(runId: string, entry: SubagentRunRecor return false; } const requesterOrigin = normalizeDeliveryContext(entry.requesterOrigin); + const finalizeAnnounceCleanup = (didAnnounce: boolean) => { + void finalizeSubagentCleanup(runId, entry.cleanup, didAnnounce).catch((err) => { + defaultRuntime.log(`[warn] subagent cleanup finalize failed (${runId}): ${String(err)}`); + const current = subagentRuns.get(runId); + if (!current || current.cleanupCompletedAt) { + return; + } + current.cleanupHandled = false; + persistSubagentRuns(); + }); + }; + void runSubagentAnnounceFlow({ childSessionKey: entry.childSessionKey, childRunId: entry.runId, @@ -555,13 +567,13 @@ function startSubagentAnnounceCleanupFlow(runId: string, entry: SubagentRunRecor wakeOnDescendantSettle: entry.wakeOnDescendantSettle === true, }) .then((didAnnounce) => { - void finalizeSubagentCleanup(runId, entry.cleanup, didAnnounce); + finalizeAnnounceCleanup(didAnnounce); }) .catch((error) => { defaultRuntime.log( `[warn] Subagent announce flow failed during cleanup for run ${runId}: ${String(error)}`, ); - void finalizeSubagentCleanup(runId, entry.cleanup, false); + finalizeAnnounceCleanup(false); }); return true; } diff --git a/src/config/sessions/sessions.test.ts b/src/config/sessions/sessions.test.ts index dfe4b74e9b2..6866d6c10c1 100644 --- a/src/config/sessions/sessions.test.ts +++ b/src/config/sessions/sessions.test.ts @@ -324,6 +324,88 @@ describe("appendAssistantMessageToSessionTranscript", () => { expect(messageLine.message.content[0].text).toBe("Hello from delivery mirror!"); } }); + + it("does not append a duplicate delivery mirror for the same idempotency key", async () => { + const sessionId = "test-session-id"; + const sessionKey = "test-session"; + const store = { + [sessionKey]: { + sessionId, + chatType: "direct", + channel: "discord", + }, + }; + fs.writeFileSync(fixture.storePath(), JSON.stringify(store), "utf-8"); + + await appendAssistantMessageToSessionTranscript({ + sessionKey, + text: "Hello from delivery mirror!", + idempotencyKey: "mirror:test-source-message", + storePath: fixture.storePath(), + }); + await appendAssistantMessageToSessionTranscript({ + sessionKey, + text: "Hello from delivery mirror!", + idempotencyKey: "mirror:test-source-message", + storePath: fixture.storePath(), + }); + + const sessionFile = resolveSessionTranscriptPathInDir(sessionId, fixture.sessionsDir()); + const lines = fs.readFileSync(sessionFile, "utf-8").trim().split("\n"); + expect(lines.length).toBe(2); + + const messageLine = JSON.parse(lines[1]); + expect(messageLine.message.idempotencyKey).toBe("mirror:test-source-message"); + expect(messageLine.message.content[0].text).toBe("Hello from delivery mirror!"); + }); + + it("ignores malformed transcript lines when checking mirror idempotency", async () => { + const sessionId = "test-session-id"; + const sessionKey = "test-session"; + const store = { + [sessionKey]: { + sessionId, + chatType: "direct", + channel: "discord", + }, + }; + fs.writeFileSync(fixture.storePath(), JSON.stringify(store), "utf-8"); + + const sessionFile = resolveSessionTranscriptPathInDir(sessionId, fixture.sessionsDir()); + fs.writeFileSync( + sessionFile, + [ + JSON.stringify({ + type: "session", + version: 1, + id: sessionId, + timestamp: new Date().toISOString(), + cwd: process.cwd(), + }), + "{not-json", + JSON.stringify({ + type: "message", + message: { + role: "assistant", + idempotencyKey: "mirror:test-source-message", + content: [{ type: "text", text: "Hello from delivery mirror!" }], + }, + }), + ].join("\n") + "\n", + "utf-8", + ); + + const result = await appendAssistantMessageToSessionTranscript({ + sessionKey, + text: "Hello from delivery mirror!", + idempotencyKey: "mirror:test-source-message", + storePath: fixture.storePath(), + }); + + expect(result.ok).toBe(true); + const lines = fs.readFileSync(sessionFile, "utf-8").trim().split("\n"); + expect(lines.length).toBe(3); + }); }); describe("resolveAndPersistSessionFile", () => { diff --git a/src/config/sessions/transcript.ts b/src/config/sessions/transcript.ts index e6a8044f5c6..aa1890de953 100644 --- a/src/config/sessions/transcript.ts +++ b/src/config/sessions/transcript.ts @@ -135,6 +135,7 @@ export async function appendAssistantMessageToSessionTranscript(params: { sessionKey: string; text?: string; mediaUrls?: string[]; + idempotencyKey?: string; /** Optional override for store path (mostly for tests). */ storePath?: string; }): Promise<{ ok: true; sessionFile: string } | { ok: false; reason: string }> { @@ -179,6 +180,13 @@ export async function appendAssistantMessageToSessionTranscript(params: { await ensureSessionHeader({ sessionFile, sessionId: entry.sessionId }); + if ( + params.idempotencyKey && + (await transcriptHasIdempotencyKey(sessionFile, params.idempotencyKey)) + ) { + return { ok: true, sessionFile }; + } + const sessionManager = SessionManager.open(sessionFile); sessionManager.appendMessage({ role: "assistant", @@ -202,8 +210,34 @@ export async function appendAssistantMessageToSessionTranscript(params: { }, stopReason: "stop", timestamp: Date.now(), + ...(params.idempotencyKey ? { idempotencyKey: params.idempotencyKey } : {}), }); emitSessionTranscriptUpdate(sessionFile); return { ok: true, sessionFile }; } + +async function transcriptHasIdempotencyKey( + transcriptPath: string, + idempotencyKey: string, +): Promise { + try { + const raw = await fs.promises.readFile(transcriptPath, "utf-8"); + for (const line of raw.split(/\r?\n/)) { + if (!line.trim()) { + continue; + } + try { + const parsed = JSON.parse(line) as { message?: { idempotencyKey?: unknown } }; + if (parsed.message?.idempotencyKey === idempotencyKey) { + return true; + } + } catch { + continue; + } + } + } catch { + return false; + } + return false; +} diff --git a/src/cron/isolated-agent/delivery-dispatch.double-announce.test.ts b/src/cron/isolated-agent/delivery-dispatch.double-announce.test.ts index 2c7eb20a3c6..b245b4b9c94 100644 --- a/src/cron/isolated-agent/delivery-dispatch.double-announce.test.ts +++ b/src/cron/isolated-agent/delivery-dispatch.double-announce.test.ts @@ -49,7 +49,11 @@ vi.mock("./subagent-followup.js", () => ({ import { countActiveDescendantRuns } from "../../agents/subagent-registry.js"; import { deliverOutboundPayloads } from "../../infra/outbound/deliver.js"; import { shouldEnqueueCronMainSummary } from "../heartbeat-policy.js"; -import { dispatchCronDelivery } from "./delivery-dispatch.js"; +import { + dispatchCronDelivery, + getCompletedDirectCronDeliveriesCountForTests, + resetCompletedDirectCronDeliveriesForTests, +} from "./delivery-dispatch.js"; import type { DeliveryTargetResolution } from "./delivery-target.js"; import type { RunCronAgentTurnResult } from "./run.js"; import { @@ -84,7 +88,11 @@ function makeWithRunSession() { }); } -function makeBaseParams(overrides: { synthesizedText?: string; deliveryRequested?: boolean }) { +function makeBaseParams(overrides: { + synthesizedText?: string; + deliveryRequested?: boolean; + runSessionId?: string; +}) { const resolvedDelivery = makeResolvedDelivery(); return { cfg: {} as never, @@ -98,7 +106,7 @@ function makeBaseParams(overrides: { synthesizedText?: string; deliveryRequested } as never, agentId: "main", agentSessionKey: "agent:main", - runSessionId: "run-123", + runSessionId: overrides.runSessionId ?? "run-123", runStartedAt: Date.now(), runEndedAt: Date.now(), timeoutMs: 30_000, @@ -126,6 +134,7 @@ function makeBaseParams(overrides: { synthesizedText?: string; deliveryRequested describe("dispatchCronDelivery — double-announce guard", () => { beforeEach(() => { vi.clearAllMocks(); + resetCompletedDirectCronDeliveriesForTests(); vi.mocked(countActiveDescendantRuns).mockReturnValue(0); vi.mocked(expectsSubagentFollowup).mockReturnValue(false); vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false); @@ -278,6 +287,38 @@ describe("dispatchCronDelivery — double-announce guard", () => { expect(deliverOutboundPayloads).toHaveBeenCalledTimes(2); }); + it("keeps direct announce delivery idempotent across replay for the same run session", async () => { + vi.mocked(countActiveDescendantRuns).mockReturnValue(0); + vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false); + vi.mocked(deliverOutboundPayloads).mockResolvedValue([{ ok: true } as never]); + + const params = makeBaseParams({ synthesizedText: "Replay-safe cron update." }); + const first = await dispatchCronDelivery(params); + const second = await dispatchCronDelivery(params); + + expect(first.delivered).toBe(true); + expect(second.delivered).toBe(true); + expect(second.deliveryAttempted).toBe(true); + expect(deliverOutboundPayloads).toHaveBeenCalledTimes(1); + }); + + it("prunes the completed-delivery cache back to the entry cap", async () => { + vi.mocked(countActiveDescendantRuns).mockReturnValue(0); + vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false); + vi.mocked(deliverOutboundPayloads).mockResolvedValue([{ ok: true } as never]); + + for (let i = 0; i < 2003; i += 1) { + const params = makeBaseParams({ + synthesizedText: `Replay-safe cron update ${i}.`, + runSessionId: `run-${i}`, + }); + const state = await dispatchCronDelivery(params); + expect(state.delivered).toBe(true); + } + + expect(getCompletedDirectCronDeliveriesCountForTests()).toBe(2000); + }); + it("does not retry permanent direct announce failures", async () => { vi.stubEnv("OPENCLAW_TEST_FAST", "1"); vi.mocked(countActiveDescendantRuns).mockReturnValue(0); diff --git a/src/cron/isolated-agent/delivery-dispatch.ts b/src/cron/isolated-agent/delivery-dispatch.ts index a5dc0190b72..6ddddf20669 100644 --- a/src/cron/isolated-agent/delivery-dispatch.ts +++ b/src/cron/isolated-agent/delivery-dispatch.ts @@ -5,7 +5,10 @@ import { createOutboundSendDeps, type CliDeps } from "../../cli/outbound-send-de import type { OpenClawConfig } from "../../config/config.js"; import { callGateway } from "../../gateway/call.js"; import { sleepWithAbort } from "../../infra/backoff.js"; -import { deliverOutboundPayloads } from "../../infra/outbound/deliver.js"; +import { + deliverOutboundPayloads, + type OutboundDeliveryResult, +} from "../../infra/outbound/deliver.js"; import { resolveAgentOutboundIdentity } from "../../infra/outbound/identity.js"; import { buildOutboundSessionContext } from "../../infra/outbound/session-context.js"; import { logWarn } from "../../logger.js"; @@ -131,6 +134,91 @@ const PERMANENT_DIRECT_CRON_DELIVERY_ERROR_PATTERNS: readonly RegExp[] = [ /outbound not configured for channel/i, ]; +type CompletedDirectCronDelivery = { + ts: number; + results: OutboundDeliveryResult[]; +}; + +const COMPLETED_DIRECT_CRON_DELIVERIES = new Map(); + +function cloneDeliveryResults( + results: readonly OutboundDeliveryResult[], +): OutboundDeliveryResult[] { + return results.map((result) => ({ + ...result, + ...(result.meta ? { meta: { ...result.meta } } : {}), + })); +} + +function pruneCompletedDirectCronDeliveries(now: number) { + const ttlMs = process.env.OPENCLAW_TEST_FAST === "1" ? 60_000 : 24 * 60 * 60 * 1000; + for (const [key, entry] of COMPLETED_DIRECT_CRON_DELIVERIES) { + if (now - entry.ts >= ttlMs) { + COMPLETED_DIRECT_CRON_DELIVERIES.delete(key); + } + } + const maxEntries = 2000; + if (COMPLETED_DIRECT_CRON_DELIVERIES.size <= maxEntries) { + return; + } + const entries = [...COMPLETED_DIRECT_CRON_DELIVERIES.entries()].toSorted( + (a, b) => a[1].ts - b[1].ts, + ); + const toDelete = COMPLETED_DIRECT_CRON_DELIVERIES.size - maxEntries; + for (let i = 0; i < toDelete; i += 1) { + const oldest = entries[i]; + if (!oldest) { + break; + } + COMPLETED_DIRECT_CRON_DELIVERIES.delete(oldest[0]); + } +} + +function rememberCompletedDirectCronDelivery( + idempotencyKey: string, + results: readonly OutboundDeliveryResult[], +) { + const now = Date.now(); + COMPLETED_DIRECT_CRON_DELIVERIES.set(idempotencyKey, { + ts: now, + results: cloneDeliveryResults(results), + }); + pruneCompletedDirectCronDeliveries(now); +} + +function getCompletedDirectCronDelivery( + idempotencyKey: string, +): OutboundDeliveryResult[] | undefined { + const now = Date.now(); + pruneCompletedDirectCronDeliveries(now); + const cached = COMPLETED_DIRECT_CRON_DELIVERIES.get(idempotencyKey); + if (!cached) { + return undefined; + } + return cloneDeliveryResults(cached.results); +} + +function buildDirectCronDeliveryIdempotencyKey(params: { + runSessionId: string; + delivery: SuccessfulDeliveryTarget; +}): string { + const threadId = + params.delivery.threadId == null || params.delivery.threadId === "" + ? "" + : String(params.delivery.threadId); + const accountId = params.delivery.accountId?.trim() ?? ""; + const normalizedTo = normalizeDeliveryTarget(params.delivery.channel, params.delivery.to); + return `cron-direct-delivery:v1:${params.runSessionId}:${params.delivery.channel}:${accountId}:${normalizedTo}:${threadId}`; +} + +export function resetCompletedDirectCronDeliveriesForTests() { + COMPLETED_DIRECT_CRON_DELIVERIES.clear(); +} + +export function getCompletedDirectCronDeliveriesCountForTests(): number { + return COMPLETED_DIRECT_CRON_DELIVERIES.size; +} + function summarizeDirectCronDeliveryError(error: unknown): string { if (error instanceof Error) { return error.message || "error"; @@ -221,6 +309,10 @@ export async function dispatchCronDelivery( options?: { retryTransient?: boolean }, ): Promise => { const identity = resolveAgentOutboundIdentity(params.cfgWithAgentDefaults, params.agentId); + const deliveryIdempotencyKey = buildDirectCronDeliveryIdempotencyKey({ + runSessionId: params.runSessionId, + delivery, + }); try { const payloadsForDelivery = deliveryPayloads.length > 0 @@ -240,6 +332,12 @@ export async function dispatchCronDelivery( }); } deliveryAttempted = true; + const cachedResults = getCompletedDirectCronDelivery(deliveryIdempotencyKey); + if (cachedResults) { + // Cached entries are only recorded after a successful non-empty delivery. + delivered = true; + return null; + } const deliverySession = buildOutboundSessionContext({ cfg: params.cfgWithAgentDefaults, agentId: params.agentId, @@ -273,6 +371,9 @@ export async function dispatchCronDelivery( }) : await runDelivery(); delivered = deliveryResults.length > 0; + if (delivered) { + rememberCompletedDirectCronDelivery(deliveryIdempotencyKey, deliveryResults); + } return null; } catch (err) { if (!params.deliveryBestEffort) { diff --git a/src/gateway/server-methods/send.test.ts b/src/gateway/server-methods/send.test.ts index 0220a4d6895..22cf527a46b 100644 --- a/src/gateway/server-methods/send.test.ts +++ b/src/gateway/server-methods/send.test.ts @@ -334,6 +334,7 @@ describe("gateway send mirroring", () => { sessionKey: "agent:main:main", text: "caption", mediaUrls: ["https://example.com/files/report.pdf?sig=1"], + idempotencyKey: "idem-2", }), }), ); diff --git a/src/gateway/server-methods/send.ts b/src/gateway/server-methods/send.ts index 1ec5d23c133..4dcdd1f61f9 100644 --- a/src/gateway/server-methods/send.ts +++ b/src/gateway/server-methods/send.ts @@ -268,6 +268,7 @@ export const sendHandlers: GatewayRequestHandlers = { agentId: effectiveAgentId, text: mirrorText || message, mediaUrls: mirrorMediaUrls.length > 0 ? mirrorMediaUrls : undefined, + idempotencyKey: idem, } : derivedRoute ? { @@ -275,6 +276,7 @@ export const sendHandlers: GatewayRequestHandlers = { agentId: effectiveAgentId, text: mirrorText || message, mediaUrls: mirrorMediaUrls.length > 0 ? mirrorMediaUrls : undefined, + idempotencyKey: idem, } : undefined, }); diff --git a/src/infra/outbound/deliver.test.ts b/src/infra/outbound/deliver.test.ts index e5b24c06a8c..8e5383ea055 100644 --- a/src/infra/outbound/deliver.test.ts +++ b/src/infra/outbound/deliver.test.ts @@ -869,11 +869,15 @@ describe("deliverOutboundPayloads", () => { sessionKey: "agent:main:main", text: "caption", mediaUrls: ["https://example.com/files/report.pdf?sig=1"], + idempotencyKey: "idem-deliver-1", }, }); expect(mocks.appendAssistantMessageToSessionTranscript).toHaveBeenCalledWith( - expect.objectContaining({ text: "report.pdf" }), + expect.objectContaining({ + text: "report.pdf", + idempotencyKey: "idem-deliver-1", + }), ); }); diff --git a/src/infra/outbound/deliver.ts b/src/infra/outbound/deliver.ts index caca4985370..79bbbc17179 100644 --- a/src/infra/outbound/deliver.ts +++ b/src/infra/outbound/deliver.ts @@ -38,6 +38,7 @@ import type { sendMessageWhatsApp } from "../../web/outbound.js"; import { throwIfAborted } from "./abort.js"; import { ackDelivery, enqueueDelivery, failDelivery } from "./delivery-queue.js"; import type { OutboundIdentity } from "./identity.js"; +import type { DeliveryMirror } from "./mirror.js"; import type { NormalizedOutboundPayload } from "./payloads.js"; import { normalizeReplyPayloadsForDelivery } from "./payloads.js"; import { isPlainTextSurface, sanitizeForPlainText } from "./sanitize-text.js"; @@ -237,16 +238,7 @@ type DeliverOutboundPayloadsCoreParams = { onPayload?: (payload: NormalizedOutboundPayload) => void; /** Session/agent context used for hooks and media local-root scoping. */ session?: OutboundSessionContext; - mirror?: { - sessionKey: string; - agentId?: string; - text?: string; - mediaUrls?: string[]; - /** Whether this message is being sent in a group/channel context */ - isGroup?: boolean; - /** Group or channel identifier for correlation with received events */ - groupId?: string; - }; + mirror?: DeliveryMirror; silent?: boolean; }; @@ -820,6 +812,7 @@ async function deliverOutboundPayloadsCore( agentId: params.mirror.agentId, sessionKey: params.mirror.sessionKey, text: mirrorText, + idempotencyKey: params.mirror.idempotencyKey, }); } } diff --git a/src/infra/outbound/delivery-queue.ts b/src/infra/outbound/delivery-queue.ts index 1cbab613bc4..97c37f911e4 100644 --- a/src/infra/outbound/delivery-queue.ts +++ b/src/infra/outbound/delivery-queue.ts @@ -4,6 +4,7 @@ import type { ReplyPayload } from "../../auto-reply/types.js"; import type { OpenClawConfig } from "../../config/config.js"; import { resolveStateDir } from "../../config/paths.js"; import { generateSecureUuid } from "../secure-random.js"; +import type { OutboundMirror } from "./mirror.js"; import type { OutboundChannel } from "./targets.js"; const QUEUE_DIRNAME = "delivery-queue"; @@ -18,13 +19,6 @@ const BACKOFF_MS: readonly number[] = [ 600_000, // retry 4: 10m ]; -type DeliveryMirrorPayload = { - sessionKey: string; - agentId?: string; - text?: string; - mediaUrls?: string[]; -}; - type QueuedDeliveryPayload = { channel: Exclude; to: string; @@ -40,7 +34,7 @@ type QueuedDeliveryPayload = { bestEffort?: boolean; gifPlayback?: boolean; silent?: boolean; - mirror?: DeliveryMirrorPayload; + mirror?: OutboundMirror; }; export interface QueuedDelivery extends QueuedDeliveryPayload { diff --git a/src/infra/outbound/message.test.ts b/src/infra/outbound/message.test.ts index 7cebff01d90..200d4d587e1 100644 --- a/src/infra/outbound/message.test.ts +++ b/src/infra/outbound/message.test.ts @@ -15,6 +15,16 @@ vi.mock("../../channels/plugins/index.js", () => ({ vi.mock("../../agents/agent-scope.js", () => ({ resolveDefaultAgentId: () => "main", + resolveSessionAgentId: ({ + sessionKey, + }: { + sessionKey?: string; + config?: unknown; + agentId?: string; + }) => { + const match = sessionKey?.match(/^agent:([^:]+)/i); + return match?.[1] ?? "main"; + }, resolveAgentWorkspaceDir: () => "/tmp/openclaw-test-workspace", })); @@ -71,6 +81,29 @@ describe("sendMessage", () => { ); }); + it("propagates the send idempotency key into mirrored transcript delivery", async () => { + await sendMessage({ + cfg: {}, + channel: "telegram", + to: "123456", + content: "hi", + idempotencyKey: "idem-send-1", + mirror: { + sessionKey: "agent:main:telegram:dm:123456", + }, + }); + + expect(mocks.deliverOutboundPayloads).toHaveBeenCalledWith( + expect.objectContaining({ + mirror: expect.objectContaining({ + sessionKey: "agent:main:telegram:dm:123456", + text: "hi", + idempotencyKey: "idem-send-1", + }), + }), + ); + }); + it("recovers telegram plugin resolution so message/send does not fail with Unknown channel: telegram", async () => { const telegramPlugin = { outbound: { deliveryMode: "direct" }, diff --git a/src/infra/outbound/message.ts b/src/infra/outbound/message.ts index f8c09538f75..8bfd6b104b5 100644 --- a/src/infra/outbound/message.ts +++ b/src/infra/outbound/message.ts @@ -16,6 +16,7 @@ import { type OutboundDeliveryResult, type OutboundSendDeps, } from "./deliver.js"; +import type { OutboundMirror } from "./mirror.js"; import { normalizeReplyPayloadsForDelivery } from "./payloads.js"; import { buildOutboundSessionContext } from "./session-context.js"; import { resolveOutboundTarget } from "./targets.js"; @@ -47,12 +48,7 @@ type MessageSendParams = { cfg?: OpenClawConfig; gateway?: MessageGatewayOptions; idempotencyKey?: string; - mirror?: { - sessionKey: string; - agentId?: string; - text?: string; - mediaUrls?: string[]; - }; + mirror?: OutboundMirror; abortSignal?: AbortSignal; silent?: boolean; }; @@ -232,6 +228,7 @@ export async function sendMessage(params: MessageSendParams): Promise ({ sendMessage: vi.fn(), sendPoll: vi.fn(), getAgentScopedMediaLocalRoots: vi.fn(() => ["/tmp/agent-roots"]), + appendAssistantMessageToSessionTranscript: vi.fn(async () => ({ ok: true, sessionFile: "x" })), })); vi.mock("../../channels/plugins/message-actions.js", () => ({ @@ -26,6 +27,10 @@ vi.mock("../../media/local-roots.js", async (importOriginal) => { }; }); +vi.mock("../../config/sessions.js", () => ({ + appendAssistantMessageToSessionTranscript: mocks.appendAssistantMessageToSessionTranscript, +})); + import { executePollAction, executeSendAction } from "./outbound-send-service.js"; describe("executeSendAction", () => { @@ -35,6 +40,7 @@ describe("executeSendAction", () => { mocks.sendPoll.mockClear(); mocks.getDefaultMediaLocalRoots.mockClear(); mocks.getAgentScopedMediaLocalRoots.mockClear(); + mocks.appendAssistantMessageToSessionTranscript.mockClear(); }); it("forwards ctx.agentId to sendMessage on core outbound path", async () => { @@ -127,6 +133,41 @@ describe("executeSendAction", () => { ); }); + it("passes mirror idempotency keys through plugin-handled sends", async () => { + mocks.dispatchChannelMessageAction.mockResolvedValue({ + ok: true, + value: { messageId: "msg-plugin" }, + continuePrompt: "", + output: "", + sessionId: "s1", + model: "gpt-5.2", + usage: {}, + }); + + await executeSendAction({ + ctx: { + cfg: {}, + channel: "discord", + params: { to: "channel:123", message: "hello" }, + dryRun: false, + mirror: { + sessionKey: "agent:main:discord:channel:123", + idempotencyKey: "idem-plugin-send-1", + }, + }, + to: "channel:123", + message: "hello", + }); + + expect(mocks.appendAssistantMessageToSessionTranscript).toHaveBeenCalledWith( + expect.objectContaining({ + sessionKey: "agent:main:discord:channel:123", + text: "hello", + idempotencyKey: "idem-plugin-send-1", + }), + ); + }); + it("forwards poll args to sendPoll on core outbound path", async () => { mocks.dispatchChannelMessageAction.mockResolvedValue(null); mocks.sendPoll.mockResolvedValue({ diff --git a/src/infra/outbound/outbound-send-service.ts b/src/infra/outbound/outbound-send-service.ts index 0661d5ddafb..a6b27a40b4c 100644 --- a/src/infra/outbound/outbound-send-service.ts +++ b/src/infra/outbound/outbound-send-service.ts @@ -9,6 +9,7 @@ import { throwIfAborted } from "./abort.js"; import type { OutboundSendDeps } from "./deliver.js"; import type { MessagePollResult, MessageSendResult } from "./message.js"; import { sendMessage, sendPoll } from "./message.js"; +import type { OutboundMirror } from "./mirror.js"; import { extractToolPayload } from "./tool-payload.js"; export type OutboundGatewayContext = { @@ -31,12 +32,7 @@ export type OutboundSendContext = { toolContext?: ChannelThreadingToolContext; deps?: OutboundSendDeps; dryRun: boolean; - mirror?: { - sessionKey: string; - agentId?: string; - text?: string; - mediaUrls?: string[]; - }; + mirror?: OutboundMirror; abortSignal?: AbortSignal; silent?: boolean; }; @@ -115,6 +111,7 @@ export async function executeSendAction(params: { sessionKey: params.ctx.mirror.sessionKey, text: mirrorText, mediaUrls: mirrorMediaUrls, + idempotencyKey: params.ctx.mirror.idempotencyKey, }); }, });