fix: address delivery dedupe review follow-ups (#44666)

Merged via squash.

Prepared head SHA: 8e6d254cc4
Co-authored-by: frankekn <4488090+frankekn@users.noreply.github.com>
Co-authored-by: frankekn <4488090+frankekn@users.noreply.github.com>
Reviewed-by: @frankekn
This commit is contained in:
Frank Yang 2026-03-13 16:18:01 +08:00 committed by GitHub
parent 5b06619c67
commit f07033ed3f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
17 changed files with 413 additions and 37 deletions

View File

@ -93,6 +93,7 @@ Docs: https://docs.openclaw.ai
- Cron/doctor: stop flagging canonical `agentTurn` and `systemEvent` payload kinds as legacy cron storage, while still normalizing whitespace-padded and non-canonical variants. (#44012) Thanks @shuicici.
- ACP/client final-message delivery: preserve terminal assistant text snapshots before resolving `end_turn`, so ACP clients no longer drop the last visible reply when the gateway sends the final message body on the terminal chat event. (#17615) Thanks @pjeby.
- Telegram/Discord status reactions: show a temporary compacting reaction during auto-compaction pauses and restore thinking afterward so the bot no longer appears frozen while context is being compacted. (#35474) thanks @Cypherm.
- Delivery/dedupe: trim completed direct-cron delivery cache correctly and keep mirrored transcript dedupe active even when transcript files contain malformed lines. (#44666) thanks @frankekn.
## 2026.3.11

View File

@ -343,6 +343,35 @@ describe("subagent registry persistence", () => {
expect(afterSecond.runs["run-3"].cleanupCompletedAt).toBeDefined();
});
it("retries cleanup announce after announce flow rejects", async () => {
const persisted = createPersistedEndedRun({
runId: "run-reject",
childSessionKey: "agent:main:subagent:reject",
task: "reject announce",
cleanup: "keep",
});
const registryPath = await writePersistedRegistry(persisted);
announceSpy.mockRejectedValueOnce(new Error("announce boom"));
await restartRegistryAndFlush();
expect(announceSpy).toHaveBeenCalledTimes(1);
const afterFirst = JSON.parse(await fs.readFile(registryPath, "utf8")) as {
runs: Record<string, { cleanupHandled?: boolean; cleanupCompletedAt?: number }>;
};
expect(afterFirst.runs["run-reject"].cleanupHandled).toBe(false);
expect(afterFirst.runs["run-reject"].cleanupCompletedAt).toBeUndefined();
announceSpy.mockResolvedValueOnce(true);
await restartRegistryAndFlush();
expect(announceSpy).toHaveBeenCalledTimes(2);
const afterSecond = JSON.parse(await fs.readFile(registryPath, "utf8")) as {
runs: Record<string, { cleanupCompletedAt?: number }>;
};
expect(afterSecond.runs["run-reject"].cleanupCompletedAt).toBeDefined();
});
it("keeps delete-mode runs retryable when announce is deferred", async () => {
const persisted = createPersistedEndedRun({
runId: "run-4",

View File

@ -534,6 +534,18 @@ function startSubagentAnnounceCleanupFlow(runId: string, entry: SubagentRunRecor
return false;
}
const requesterOrigin = normalizeDeliveryContext(entry.requesterOrigin);
const finalizeAnnounceCleanup = (didAnnounce: boolean) => {
void finalizeSubagentCleanup(runId, entry.cleanup, didAnnounce).catch((err) => {
defaultRuntime.log(`[warn] subagent cleanup finalize failed (${runId}): ${String(err)}`);
const current = subagentRuns.get(runId);
if (!current || current.cleanupCompletedAt) {
return;
}
current.cleanupHandled = false;
persistSubagentRuns();
});
};
void runSubagentAnnounceFlow({
childSessionKey: entry.childSessionKey,
childRunId: entry.runId,
@ -555,13 +567,13 @@ function startSubagentAnnounceCleanupFlow(runId: string, entry: SubagentRunRecor
wakeOnDescendantSettle: entry.wakeOnDescendantSettle === true,
})
.then((didAnnounce) => {
void finalizeSubagentCleanup(runId, entry.cleanup, didAnnounce);
finalizeAnnounceCleanup(didAnnounce);
})
.catch((error) => {
defaultRuntime.log(
`[warn] Subagent announce flow failed during cleanup for run ${runId}: ${String(error)}`,
);
void finalizeSubagentCleanup(runId, entry.cleanup, false);
finalizeAnnounceCleanup(false);
});
return true;
}

View File

@ -324,6 +324,88 @@ describe("appendAssistantMessageToSessionTranscript", () => {
expect(messageLine.message.content[0].text).toBe("Hello from delivery mirror!");
}
});
it("does not append a duplicate delivery mirror for the same idempotency key", async () => {
const sessionId = "test-session-id";
const sessionKey = "test-session";
const store = {
[sessionKey]: {
sessionId,
chatType: "direct",
channel: "discord",
},
};
fs.writeFileSync(fixture.storePath(), JSON.stringify(store), "utf-8");
await appendAssistantMessageToSessionTranscript({
sessionKey,
text: "Hello from delivery mirror!",
idempotencyKey: "mirror:test-source-message",
storePath: fixture.storePath(),
});
await appendAssistantMessageToSessionTranscript({
sessionKey,
text: "Hello from delivery mirror!",
idempotencyKey: "mirror:test-source-message",
storePath: fixture.storePath(),
});
const sessionFile = resolveSessionTranscriptPathInDir(sessionId, fixture.sessionsDir());
const lines = fs.readFileSync(sessionFile, "utf-8").trim().split("\n");
expect(lines.length).toBe(2);
const messageLine = JSON.parse(lines[1]);
expect(messageLine.message.idempotencyKey).toBe("mirror:test-source-message");
expect(messageLine.message.content[0].text).toBe("Hello from delivery mirror!");
});
it("ignores malformed transcript lines when checking mirror idempotency", async () => {
const sessionId = "test-session-id";
const sessionKey = "test-session";
const store = {
[sessionKey]: {
sessionId,
chatType: "direct",
channel: "discord",
},
};
fs.writeFileSync(fixture.storePath(), JSON.stringify(store), "utf-8");
const sessionFile = resolveSessionTranscriptPathInDir(sessionId, fixture.sessionsDir());
fs.writeFileSync(
sessionFile,
[
JSON.stringify({
type: "session",
version: 1,
id: sessionId,
timestamp: new Date().toISOString(),
cwd: process.cwd(),
}),
"{not-json",
JSON.stringify({
type: "message",
message: {
role: "assistant",
idempotencyKey: "mirror:test-source-message",
content: [{ type: "text", text: "Hello from delivery mirror!" }],
},
}),
].join("\n") + "\n",
"utf-8",
);
const result = await appendAssistantMessageToSessionTranscript({
sessionKey,
text: "Hello from delivery mirror!",
idempotencyKey: "mirror:test-source-message",
storePath: fixture.storePath(),
});
expect(result.ok).toBe(true);
const lines = fs.readFileSync(sessionFile, "utf-8").trim().split("\n");
expect(lines.length).toBe(3);
});
});
describe("resolveAndPersistSessionFile", () => {

View File

@ -135,6 +135,7 @@ export async function appendAssistantMessageToSessionTranscript(params: {
sessionKey: string;
text?: string;
mediaUrls?: string[];
idempotencyKey?: string;
/** Optional override for store path (mostly for tests). */
storePath?: string;
}): Promise<{ ok: true; sessionFile: string } | { ok: false; reason: string }> {
@ -179,6 +180,13 @@ export async function appendAssistantMessageToSessionTranscript(params: {
await ensureSessionHeader({ sessionFile, sessionId: entry.sessionId });
if (
params.idempotencyKey &&
(await transcriptHasIdempotencyKey(sessionFile, params.idempotencyKey))
) {
return { ok: true, sessionFile };
}
const sessionManager = SessionManager.open(sessionFile);
sessionManager.appendMessage({
role: "assistant",
@ -202,8 +210,34 @@ export async function appendAssistantMessageToSessionTranscript(params: {
},
stopReason: "stop",
timestamp: Date.now(),
...(params.idempotencyKey ? { idempotencyKey: params.idempotencyKey } : {}),
});
emitSessionTranscriptUpdate(sessionFile);
return { ok: true, sessionFile };
}
async function transcriptHasIdempotencyKey(
transcriptPath: string,
idempotencyKey: string,
): Promise<boolean> {
try {
const raw = await fs.promises.readFile(transcriptPath, "utf-8");
for (const line of raw.split(/\r?\n/)) {
if (!line.trim()) {
continue;
}
try {
const parsed = JSON.parse(line) as { message?: { idempotencyKey?: unknown } };
if (parsed.message?.idempotencyKey === idempotencyKey) {
return true;
}
} catch {
continue;
}
}
} catch {
return false;
}
return false;
}

View File

@ -49,7 +49,11 @@ vi.mock("./subagent-followup.js", () => ({
import { countActiveDescendantRuns } from "../../agents/subagent-registry.js";
import { deliverOutboundPayloads } from "../../infra/outbound/deliver.js";
import { shouldEnqueueCronMainSummary } from "../heartbeat-policy.js";
import { dispatchCronDelivery } from "./delivery-dispatch.js";
import {
dispatchCronDelivery,
getCompletedDirectCronDeliveriesCountForTests,
resetCompletedDirectCronDeliveriesForTests,
} from "./delivery-dispatch.js";
import type { DeliveryTargetResolution } from "./delivery-target.js";
import type { RunCronAgentTurnResult } from "./run.js";
import {
@ -84,7 +88,11 @@ function makeWithRunSession() {
});
}
function makeBaseParams(overrides: { synthesizedText?: string; deliveryRequested?: boolean }) {
function makeBaseParams(overrides: {
synthesizedText?: string;
deliveryRequested?: boolean;
runSessionId?: string;
}) {
const resolvedDelivery = makeResolvedDelivery();
return {
cfg: {} as never,
@ -98,7 +106,7 @@ function makeBaseParams(overrides: { synthesizedText?: string; deliveryRequested
} as never,
agentId: "main",
agentSessionKey: "agent:main",
runSessionId: "run-123",
runSessionId: overrides.runSessionId ?? "run-123",
runStartedAt: Date.now(),
runEndedAt: Date.now(),
timeoutMs: 30_000,
@ -126,6 +134,7 @@ function makeBaseParams(overrides: { synthesizedText?: string; deliveryRequested
describe("dispatchCronDelivery — double-announce guard", () => {
beforeEach(() => {
vi.clearAllMocks();
resetCompletedDirectCronDeliveriesForTests();
vi.mocked(countActiveDescendantRuns).mockReturnValue(0);
vi.mocked(expectsSubagentFollowup).mockReturnValue(false);
vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false);
@ -278,6 +287,38 @@ describe("dispatchCronDelivery — double-announce guard", () => {
expect(deliverOutboundPayloads).toHaveBeenCalledTimes(2);
});
it("keeps direct announce delivery idempotent across replay for the same run session", async () => {
vi.mocked(countActiveDescendantRuns).mockReturnValue(0);
vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false);
vi.mocked(deliverOutboundPayloads).mockResolvedValue([{ ok: true } as never]);
const params = makeBaseParams({ synthesizedText: "Replay-safe cron update." });
const first = await dispatchCronDelivery(params);
const second = await dispatchCronDelivery(params);
expect(first.delivered).toBe(true);
expect(second.delivered).toBe(true);
expect(second.deliveryAttempted).toBe(true);
expect(deliverOutboundPayloads).toHaveBeenCalledTimes(1);
});
it("prunes the completed-delivery cache back to the entry cap", async () => {
vi.mocked(countActiveDescendantRuns).mockReturnValue(0);
vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false);
vi.mocked(deliverOutboundPayloads).mockResolvedValue([{ ok: true } as never]);
for (let i = 0; i < 2003; i += 1) {
const params = makeBaseParams({
synthesizedText: `Replay-safe cron update ${i}.`,
runSessionId: `run-${i}`,
});
const state = await dispatchCronDelivery(params);
expect(state.delivered).toBe(true);
}
expect(getCompletedDirectCronDeliveriesCountForTests()).toBe(2000);
});
it("does not retry permanent direct announce failures", async () => {
vi.stubEnv("OPENCLAW_TEST_FAST", "1");
vi.mocked(countActiveDescendantRuns).mockReturnValue(0);

View File

@ -5,7 +5,10 @@ import { createOutboundSendDeps, type CliDeps } from "../../cli/outbound-send-de
import type { OpenClawConfig } from "../../config/config.js";
import { callGateway } from "../../gateway/call.js";
import { sleepWithAbort } from "../../infra/backoff.js";
import { deliverOutboundPayloads } from "../../infra/outbound/deliver.js";
import {
deliverOutboundPayloads,
type OutboundDeliveryResult,
} from "../../infra/outbound/deliver.js";
import { resolveAgentOutboundIdentity } from "../../infra/outbound/identity.js";
import { buildOutboundSessionContext } from "../../infra/outbound/session-context.js";
import { logWarn } from "../../logger.js";
@ -131,6 +134,91 @@ const PERMANENT_DIRECT_CRON_DELIVERY_ERROR_PATTERNS: readonly RegExp[] = [
/outbound not configured for channel/i,
];
type CompletedDirectCronDelivery = {
ts: number;
results: OutboundDeliveryResult[];
};
const COMPLETED_DIRECT_CRON_DELIVERIES = new Map<string, CompletedDirectCronDelivery>();
function cloneDeliveryResults(
results: readonly OutboundDeliveryResult[],
): OutboundDeliveryResult[] {
return results.map((result) => ({
...result,
...(result.meta ? { meta: { ...result.meta } } : {}),
}));
}
function pruneCompletedDirectCronDeliveries(now: number) {
const ttlMs = process.env.OPENCLAW_TEST_FAST === "1" ? 60_000 : 24 * 60 * 60 * 1000;
for (const [key, entry] of COMPLETED_DIRECT_CRON_DELIVERIES) {
if (now - entry.ts >= ttlMs) {
COMPLETED_DIRECT_CRON_DELIVERIES.delete(key);
}
}
const maxEntries = 2000;
if (COMPLETED_DIRECT_CRON_DELIVERIES.size <= maxEntries) {
return;
}
const entries = [...COMPLETED_DIRECT_CRON_DELIVERIES.entries()].toSorted(
(a, b) => a[1].ts - b[1].ts,
);
const toDelete = COMPLETED_DIRECT_CRON_DELIVERIES.size - maxEntries;
for (let i = 0; i < toDelete; i += 1) {
const oldest = entries[i];
if (!oldest) {
break;
}
COMPLETED_DIRECT_CRON_DELIVERIES.delete(oldest[0]);
}
}
function rememberCompletedDirectCronDelivery(
idempotencyKey: string,
results: readonly OutboundDeliveryResult[],
) {
const now = Date.now();
COMPLETED_DIRECT_CRON_DELIVERIES.set(idempotencyKey, {
ts: now,
results: cloneDeliveryResults(results),
});
pruneCompletedDirectCronDeliveries(now);
}
function getCompletedDirectCronDelivery(
idempotencyKey: string,
): OutboundDeliveryResult[] | undefined {
const now = Date.now();
pruneCompletedDirectCronDeliveries(now);
const cached = COMPLETED_DIRECT_CRON_DELIVERIES.get(idempotencyKey);
if (!cached) {
return undefined;
}
return cloneDeliveryResults(cached.results);
}
function buildDirectCronDeliveryIdempotencyKey(params: {
runSessionId: string;
delivery: SuccessfulDeliveryTarget;
}): string {
const threadId =
params.delivery.threadId == null || params.delivery.threadId === ""
? ""
: String(params.delivery.threadId);
const accountId = params.delivery.accountId?.trim() ?? "";
const normalizedTo = normalizeDeliveryTarget(params.delivery.channel, params.delivery.to);
return `cron-direct-delivery:v1:${params.runSessionId}:${params.delivery.channel}:${accountId}:${normalizedTo}:${threadId}`;
}
export function resetCompletedDirectCronDeliveriesForTests() {
COMPLETED_DIRECT_CRON_DELIVERIES.clear();
}
export function getCompletedDirectCronDeliveriesCountForTests(): number {
return COMPLETED_DIRECT_CRON_DELIVERIES.size;
}
function summarizeDirectCronDeliveryError(error: unknown): string {
if (error instanceof Error) {
return error.message || "error";
@ -221,6 +309,10 @@ export async function dispatchCronDelivery(
options?: { retryTransient?: boolean },
): Promise<RunCronAgentTurnResult | null> => {
const identity = resolveAgentOutboundIdentity(params.cfgWithAgentDefaults, params.agentId);
const deliveryIdempotencyKey = buildDirectCronDeliveryIdempotencyKey({
runSessionId: params.runSessionId,
delivery,
});
try {
const payloadsForDelivery =
deliveryPayloads.length > 0
@ -240,6 +332,12 @@ export async function dispatchCronDelivery(
});
}
deliveryAttempted = true;
const cachedResults = getCompletedDirectCronDelivery(deliveryIdempotencyKey);
if (cachedResults) {
// Cached entries are only recorded after a successful non-empty delivery.
delivered = true;
return null;
}
const deliverySession = buildOutboundSessionContext({
cfg: params.cfgWithAgentDefaults,
agentId: params.agentId,
@ -273,6 +371,9 @@ export async function dispatchCronDelivery(
})
: await runDelivery();
delivered = deliveryResults.length > 0;
if (delivered) {
rememberCompletedDirectCronDelivery(deliveryIdempotencyKey, deliveryResults);
}
return null;
} catch (err) {
if (!params.deliveryBestEffort) {

View File

@ -334,6 +334,7 @@ describe("gateway send mirroring", () => {
sessionKey: "agent:main:main",
text: "caption",
mediaUrls: ["https://example.com/files/report.pdf?sig=1"],
idempotencyKey: "idem-2",
}),
}),
);

View File

@ -268,6 +268,7 @@ export const sendHandlers: GatewayRequestHandlers = {
agentId: effectiveAgentId,
text: mirrorText || message,
mediaUrls: mirrorMediaUrls.length > 0 ? mirrorMediaUrls : undefined,
idempotencyKey: idem,
}
: derivedRoute
? {
@ -275,6 +276,7 @@ export const sendHandlers: GatewayRequestHandlers = {
agentId: effectiveAgentId,
text: mirrorText || message,
mediaUrls: mirrorMediaUrls.length > 0 ? mirrorMediaUrls : undefined,
idempotencyKey: idem,
}
: undefined,
});

View File

@ -869,11 +869,15 @@ describe("deliverOutboundPayloads", () => {
sessionKey: "agent:main:main",
text: "caption",
mediaUrls: ["https://example.com/files/report.pdf?sig=1"],
idempotencyKey: "idem-deliver-1",
},
});
expect(mocks.appendAssistantMessageToSessionTranscript).toHaveBeenCalledWith(
expect.objectContaining({ text: "report.pdf" }),
expect.objectContaining({
text: "report.pdf",
idempotencyKey: "idem-deliver-1",
}),
);
});

View File

@ -38,6 +38,7 @@ import type { sendMessageWhatsApp } from "../../web/outbound.js";
import { throwIfAborted } from "./abort.js";
import { ackDelivery, enqueueDelivery, failDelivery } from "./delivery-queue.js";
import type { OutboundIdentity } from "./identity.js";
import type { DeliveryMirror } from "./mirror.js";
import type { NormalizedOutboundPayload } from "./payloads.js";
import { normalizeReplyPayloadsForDelivery } from "./payloads.js";
import { isPlainTextSurface, sanitizeForPlainText } from "./sanitize-text.js";
@ -237,16 +238,7 @@ type DeliverOutboundPayloadsCoreParams = {
onPayload?: (payload: NormalizedOutboundPayload) => void;
/** Session/agent context used for hooks and media local-root scoping. */
session?: OutboundSessionContext;
mirror?: {
sessionKey: string;
agentId?: string;
text?: string;
mediaUrls?: string[];
/** Whether this message is being sent in a group/channel context */
isGroup?: boolean;
/** Group or channel identifier for correlation with received events */
groupId?: string;
};
mirror?: DeliveryMirror;
silent?: boolean;
};
@ -820,6 +812,7 @@ async function deliverOutboundPayloadsCore(
agentId: params.mirror.agentId,
sessionKey: params.mirror.sessionKey,
text: mirrorText,
idempotencyKey: params.mirror.idempotencyKey,
});
}
}

View File

@ -4,6 +4,7 @@ import type { ReplyPayload } from "../../auto-reply/types.js";
import type { OpenClawConfig } from "../../config/config.js";
import { resolveStateDir } from "../../config/paths.js";
import { generateSecureUuid } from "../secure-random.js";
import type { OutboundMirror } from "./mirror.js";
import type { OutboundChannel } from "./targets.js";
const QUEUE_DIRNAME = "delivery-queue";
@ -18,13 +19,6 @@ const BACKOFF_MS: readonly number[] = [
600_000, // retry 4: 10m
];
type DeliveryMirrorPayload = {
sessionKey: string;
agentId?: string;
text?: string;
mediaUrls?: string[];
};
type QueuedDeliveryPayload = {
channel: Exclude<OutboundChannel, "none">;
to: string;
@ -40,7 +34,7 @@ type QueuedDeliveryPayload = {
bestEffort?: boolean;
gifPlayback?: boolean;
silent?: boolean;
mirror?: DeliveryMirrorPayload;
mirror?: OutboundMirror;
};
export interface QueuedDelivery extends QueuedDeliveryPayload {

View File

@ -15,6 +15,16 @@ vi.mock("../../channels/plugins/index.js", () => ({
vi.mock("../../agents/agent-scope.js", () => ({
resolveDefaultAgentId: () => "main",
resolveSessionAgentId: ({
sessionKey,
}: {
sessionKey?: string;
config?: unknown;
agentId?: string;
}) => {
const match = sessionKey?.match(/^agent:([^:]+)/i);
return match?.[1] ?? "main";
},
resolveAgentWorkspaceDir: () => "/tmp/openclaw-test-workspace",
}));
@ -71,6 +81,29 @@ describe("sendMessage", () => {
);
});
it("propagates the send idempotency key into mirrored transcript delivery", async () => {
await sendMessage({
cfg: {},
channel: "telegram",
to: "123456",
content: "hi",
idempotencyKey: "idem-send-1",
mirror: {
sessionKey: "agent:main:telegram:dm:123456",
},
});
expect(mocks.deliverOutboundPayloads).toHaveBeenCalledWith(
expect.objectContaining({
mirror: expect.objectContaining({
sessionKey: "agent:main:telegram:dm:123456",
text: "hi",
idempotencyKey: "idem-send-1",
}),
}),
);
});
it("recovers telegram plugin resolution so message/send does not fail with Unknown channel: telegram", async () => {
const telegramPlugin = {
outbound: { deliveryMode: "direct" },

View File

@ -16,6 +16,7 @@ import {
type OutboundDeliveryResult,
type OutboundSendDeps,
} from "./deliver.js";
import type { OutboundMirror } from "./mirror.js";
import { normalizeReplyPayloadsForDelivery } from "./payloads.js";
import { buildOutboundSessionContext } from "./session-context.js";
import { resolveOutboundTarget } from "./targets.js";
@ -47,12 +48,7 @@ type MessageSendParams = {
cfg?: OpenClawConfig;
gateway?: MessageGatewayOptions;
idempotencyKey?: string;
mirror?: {
sessionKey: string;
agentId?: string;
text?: string;
mediaUrls?: string[];
};
mirror?: OutboundMirror;
abortSignal?: AbortSignal;
silent?: boolean;
};
@ -232,6 +228,7 @@ export async function sendMessage(params: MessageSendParams): Promise<MessageSen
...params.mirror,
text: mirrorText || params.content,
mediaUrls: mirrorMediaUrls.length ? mirrorMediaUrls : undefined,
idempotencyKey: params.mirror.idempotencyKey ?? params.idempotencyKey,
}
: undefined,
});

View File

@ -0,0 +1,14 @@
export type OutboundMirror = {
sessionKey: string;
agentId?: string;
text?: string;
mediaUrls?: string[];
idempotencyKey?: string;
};
export type DeliveryMirror = OutboundMirror & {
/** Whether this message is being sent in a group/channel context */
isGroup?: boolean;
/** Group or channel identifier for correlation with received events */
groupId?: string;
};

View File

@ -6,6 +6,7 @@ const mocks = vi.hoisted(() => ({
sendMessage: vi.fn(),
sendPoll: vi.fn(),
getAgentScopedMediaLocalRoots: vi.fn(() => ["/tmp/agent-roots"]),
appendAssistantMessageToSessionTranscript: vi.fn(async () => ({ ok: true, sessionFile: "x" })),
}));
vi.mock("../../channels/plugins/message-actions.js", () => ({
@ -26,6 +27,10 @@ vi.mock("../../media/local-roots.js", async (importOriginal) => {
};
});
vi.mock("../../config/sessions.js", () => ({
appendAssistantMessageToSessionTranscript: mocks.appendAssistantMessageToSessionTranscript,
}));
import { executePollAction, executeSendAction } from "./outbound-send-service.js";
describe("executeSendAction", () => {
@ -35,6 +40,7 @@ describe("executeSendAction", () => {
mocks.sendPoll.mockClear();
mocks.getDefaultMediaLocalRoots.mockClear();
mocks.getAgentScopedMediaLocalRoots.mockClear();
mocks.appendAssistantMessageToSessionTranscript.mockClear();
});
it("forwards ctx.agentId to sendMessage on core outbound path", async () => {
@ -127,6 +133,41 @@ describe("executeSendAction", () => {
);
});
it("passes mirror idempotency keys through plugin-handled sends", async () => {
mocks.dispatchChannelMessageAction.mockResolvedValue({
ok: true,
value: { messageId: "msg-plugin" },
continuePrompt: "",
output: "",
sessionId: "s1",
model: "gpt-5.2",
usage: {},
});
await executeSendAction({
ctx: {
cfg: {},
channel: "discord",
params: { to: "channel:123", message: "hello" },
dryRun: false,
mirror: {
sessionKey: "agent:main:discord:channel:123",
idempotencyKey: "idem-plugin-send-1",
},
},
to: "channel:123",
message: "hello",
});
expect(mocks.appendAssistantMessageToSessionTranscript).toHaveBeenCalledWith(
expect.objectContaining({
sessionKey: "agent:main:discord:channel:123",
text: "hello",
idempotencyKey: "idem-plugin-send-1",
}),
);
});
it("forwards poll args to sendPoll on core outbound path", async () => {
mocks.dispatchChannelMessageAction.mockResolvedValue(null);
mocks.sendPoll.mockResolvedValue({

View File

@ -9,6 +9,7 @@ import { throwIfAborted } from "./abort.js";
import type { OutboundSendDeps } from "./deliver.js";
import type { MessagePollResult, MessageSendResult } from "./message.js";
import { sendMessage, sendPoll } from "./message.js";
import type { OutboundMirror } from "./mirror.js";
import { extractToolPayload } from "./tool-payload.js";
export type OutboundGatewayContext = {
@ -31,12 +32,7 @@ export type OutboundSendContext = {
toolContext?: ChannelThreadingToolContext;
deps?: OutboundSendDeps;
dryRun: boolean;
mirror?: {
sessionKey: string;
agentId?: string;
text?: string;
mediaUrls?: string[];
};
mirror?: OutboundMirror;
abortSignal?: AbortSignal;
silent?: boolean;
};
@ -115,6 +111,7 @@ export async function executeSendAction(params: {
sessionKey: params.ctx.mirror.sessionKey,
text: mirrorText,
mediaUrls: mirrorMediaUrls,
idempotencyKey: params.ctx.mirror.idempotencyKey,
});
},
});