refactor(tasks): extract delivery policy (#57475)

* refactor(tasks): add executor facade

* refactor(tasks): extract delivery policy
This commit is contained in:
Vincent Koc 2026-03-29 21:44:59 -07:00 committed by GitHub
parent 8623c28f1d
commit 817ac551b6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 242 additions and 119 deletions

View File

@ -0,0 +1,110 @@
import { describe, expect, it } from "vitest";
import {
formatTaskBlockedFollowupMessage,
formatTaskStateChangeMessage,
formatTaskTerminalMessage,
isTerminalTaskStatus,
shouldAutoDeliverTaskStateChange,
shouldAutoDeliverTaskTerminalUpdate,
shouldSuppressDuplicateTerminalDelivery,
} from "./task-executor-policy.js";
import type { TaskEventRecord, TaskRecord } from "./task-registry.types.js";
function createTask(partial: Partial<TaskRecord>): TaskRecord {
return {
taskId: partial.taskId ?? "task-1",
runtime: partial.runtime ?? "acp",
requesterSessionKey: partial.requesterSessionKey ?? "agent:main:main",
task: partial.task ?? "Investigate issue",
status: partial.status ?? "running",
deliveryStatus: partial.deliveryStatus ?? "pending",
notifyPolicy: partial.notifyPolicy ?? "done_only",
createdAt: partial.createdAt ?? 1,
...partial,
};
}
describe("task-executor-policy", () => {
it("identifies terminal statuses", () => {
expect(isTerminalTaskStatus("queued")).toBe(false);
expect(isTerminalTaskStatus("running")).toBe(false);
expect(isTerminalTaskStatus("succeeded")).toBe(true);
expect(isTerminalTaskStatus("failed")).toBe(true);
expect(isTerminalTaskStatus("timed_out")).toBe(true);
expect(isTerminalTaskStatus("cancelled")).toBe(true);
expect(isTerminalTaskStatus("lost")).toBe(true);
});
it("formats terminal, followup, and progress messages", () => {
const blockedTask = createTask({
status: "succeeded",
terminalOutcome: "blocked",
terminalSummary: "Needs login.",
runId: "run-1234567890",
label: "ACP import",
});
const progressEvent: TaskEventRecord = {
at: 10,
kind: "progress",
summary: "No output for 60s.",
};
expect(formatTaskTerminalMessage(blockedTask)).toBe(
"Background task blocked: ACP import (run run-1234). Needs login.",
);
expect(formatTaskBlockedFollowupMessage(blockedTask)).toBe(
"Task needs follow-up: ACP import (run run-1234). Needs login.",
);
expect(formatTaskStateChangeMessage(blockedTask, progressEvent)).toBe(
"Background task update: ACP import. No output for 60s.",
);
});
it("keeps delivery policy decisions explicit", () => {
expect(
shouldAutoDeliverTaskTerminalUpdate(
createTask({
status: "succeeded",
deliveryStatus: "pending",
notifyPolicy: "done_only",
}),
),
).toBe(true);
expect(
shouldAutoDeliverTaskTerminalUpdate(
createTask({
runtime: "subagent",
status: "succeeded",
deliveryStatus: "pending",
}),
),
).toBe(false);
expect(
shouldAutoDeliverTaskStateChange(
createTask({
status: "running",
notifyPolicy: "state_changes",
deliveryStatus: "pending",
}),
),
).toBe(true);
expect(
shouldAutoDeliverTaskStateChange(
createTask({
status: "failed",
notifyPolicy: "state_changes",
deliveryStatus: "pending",
}),
),
).toBe(false);
expect(
shouldSuppressDuplicateTerminalDelivery({
task: createTask({
runtime: "acp",
runId: "run-duplicate",
}),
preferredTaskId: "task-2",
}),
).toBe(true);
});
});

View File

@ -0,0 +1,110 @@
import type { TaskEventRecord, TaskRecord, TaskStatus } from "./task-registry.types.js";
export function isTerminalTaskStatus(status: TaskStatus): boolean {
return (
status === "succeeded" ||
status === "failed" ||
status === "timed_out" ||
status === "cancelled" ||
status === "lost"
);
}
function resolveTaskDisplayTitle(task: TaskRecord): string {
return (
task.label?.trim() ||
(task.runtime === "acp"
? "ACP background task"
: task.runtime === "subagent"
? "Subagent task"
: task.task.trim() || "Background task")
);
}
function resolveTaskRunLabel(task: TaskRecord): string {
return task.runId ? ` (run ${task.runId.slice(0, 8)})` : "";
}
export function formatTaskTerminalMessage(task: TaskRecord): string {
const title = resolveTaskDisplayTitle(task);
const runLabel = resolveTaskRunLabel(task);
const summary = task.terminalSummary?.trim();
if (task.status === "succeeded") {
if (task.terminalOutcome === "blocked") {
return summary
? `Background task blocked: ${title}${runLabel}. ${summary}`
: `Background task blocked: ${title}${runLabel}.`;
}
return summary
? `Background task done: ${title}${runLabel}. ${summary}`
: `Background task done: ${title}${runLabel}.`;
}
if (task.status === "timed_out") {
return `Background task timed out: ${title}${runLabel}.`;
}
if (task.status === "lost") {
return `Background task lost: ${title}${runLabel}. ${task.error ?? "Backing session disappeared."}`;
}
if (task.status === "cancelled") {
return `Background task cancelled: ${title}${runLabel}.`;
}
const error = task.error?.trim();
return error
? `Background task failed: ${title}${runLabel}. ${error}`
: `Background task failed: ${title}${runLabel}.`;
}
export function formatTaskBlockedFollowupMessage(task: TaskRecord): string | null {
if (task.status !== "succeeded" || task.terminalOutcome !== "blocked") {
return null;
}
const title = resolveTaskDisplayTitle(task);
const runLabel = resolveTaskRunLabel(task);
const summary = task.terminalSummary?.trim() || "Task is blocked and needs follow-up.";
return `Task needs follow-up: ${title}${runLabel}. ${summary}`;
}
export function formatTaskStateChangeMessage(
task: TaskRecord,
event: TaskEventRecord,
): string | null {
const title = resolveTaskDisplayTitle(task);
if (event.kind === "running") {
return `Background task started: ${title}.`;
}
if (event.kind === "progress") {
return event.summary ? `Background task update: ${title}. ${event.summary}` : null;
}
return null;
}
export function shouldAutoDeliverTaskTerminalUpdate(task: TaskRecord): boolean {
if (task.notifyPolicy === "silent") {
return false;
}
if (task.runtime === "subagent" && task.status !== "cancelled") {
return false;
}
if (!isTerminalTaskStatus(task.status)) {
return false;
}
return task.deliveryStatus === "pending";
}
export function shouldAutoDeliverTaskStateChange(task: TaskRecord): boolean {
return (
task.notifyPolicy === "state_changes" &&
task.deliveryStatus === "pending" &&
!isTerminalTaskStatus(task.status)
);
}
export function shouldSuppressDuplicateTerminalDelivery(params: {
task: TaskRecord;
preferredTaskId?: string;
}): boolean {
if (params.task.runtime !== "acp" || !params.task.runId?.trim()) {
return false;
}
return Boolean(params.preferredTaskId && params.preferredTaskId !== params.task.taskId);
}

View File

@ -9,6 +9,15 @@ import { createSubsystemLogger } from "../logging/subsystem.js";
import { parseAgentSessionKey } from "../routing/session-key.js";
import { normalizeDeliveryContext } from "../utils/delivery-context.js";
import { isDeliverableMessageChannel } from "../utils/message-channel.js";
import {
formatTaskBlockedFollowupMessage,
formatTaskStateChangeMessage,
formatTaskTerminalMessage,
isTerminalTaskStatus,
shouldAutoDeliverTaskStateChange,
shouldAutoDeliverTaskTerminalUpdate,
shouldSuppressDuplicateTerminalDelivery,
} from "./task-executor-policy.js";
import {
getTaskRegistryHooks,
getTaskRegistryStore,
@ -384,16 +393,6 @@ export function ensureTaskRegistryReady() {
ensureListener();
}
function isTerminalTaskStatus(status: TaskStatus): boolean {
return (
status === "succeeded" ||
status === "failed" ||
status === "timed_out" ||
status === "cancelled" ||
status === "lost"
);
}
function updateTask(taskId: string, patch: Partial<TaskRecord>): TaskRecord | null {
const current = tasks.get(taskId);
if (!current) {
@ -441,43 +440,6 @@ function getTaskDeliveryState(taskId: string): TaskDeliveryState | undefined {
return state ? cloneTaskDeliveryState(state) : undefined;
}
function formatTaskTerminalEvent(task: TaskRecord): string {
// User-facing task notifications stay intentionally terse. Detailed runtime chatter lives
// in task metadata for inspection, not in the default channel ping.
const title =
task.label?.trim() ||
(task.runtime === "acp"
? "ACP background task"
: task.runtime === "subagent"
? "Subagent task"
: task.task.trim() || "Background task");
const runLabel = task.runId ? ` (run ${task.runId.slice(0, 8)})` : "";
const summary = task.terminalSummary?.trim();
if (task.status === "succeeded") {
if (task.terminalOutcome === "blocked") {
return summary
? `Background task blocked: ${title}${runLabel}. ${summary}`
: `Background task blocked: ${title}${runLabel}.`;
}
return summary
? `Background task done: ${title}${runLabel}. ${summary}`
: `Background task done: ${title}${runLabel}.`;
}
if (task.status === "timed_out") {
return `Background task timed out: ${title}${runLabel}.`;
}
if (task.status === "lost") {
return `Background task lost: ${title}${runLabel}. ${task.error ?? "Backing session disappeared."}`;
}
if (task.status === "cancelled") {
return `Background task cancelled: ${title}${runLabel}.`;
}
const error = task.error?.trim();
return error
? `Background task failed: ${title}${runLabel}. ${error}`
: `Background task failed: ${title}${runLabel}.`;
}
function canDeliverTaskToRequesterOrigin(task: TaskRecord): boolean {
const origin = normalizeDeliveryContext(taskDeliveryStates.get(task.taskId)?.requesterOrigin);
const channel = origin?.channel?.trim();
@ -503,23 +465,15 @@ function queueTaskSystemEvent(task: TaskRecord, text: string) {
}
function queueBlockedTaskFollowup(task: TaskRecord) {
if (task.status !== "succeeded" || task.terminalOutcome !== "blocked") {
const followupText = formatTaskBlockedFollowupMessage(task);
if (!followupText) {
return false;
}
const requesterSessionKey = task.requesterSessionKey.trim();
if (!requesterSessionKey) {
return false;
}
const title =
task.label?.trim() ||
(task.runtime === "acp"
? "ACP background task"
: task.runtime === "subagent"
? "Subagent task"
: task.task.trim() || "Background task");
const runLabel = task.runId ? ` (run ${task.runId.slice(0, 8)})` : "";
const summary = task.terminalSummary?.trim() || "Task is blocked and needs follow-up.";
enqueueSystemEvent(`Task needs follow-up: ${title}${runLabel}. ${summary}`, {
enqueueSystemEvent(followupText, {
sessionKey: requesterSessionKey,
contextKey: `task:${task.taskId}:blocked-followup`,
deliveryContext: taskDeliveryStates.get(task.taskId)?.requesterOrigin,
@ -531,66 +485,10 @@ function queueBlockedTaskFollowup(task: TaskRecord) {
return true;
}
function formatTaskStateChangeEvent(task: TaskRecord, event: TaskEventRecord): string | null {
const title =
task.label?.trim() ||
(task.runtime === "acp"
? "ACP background task"
: task.runtime === "subagent"
? "Subagent task"
: task.task.trim() || "Background task");
if (event.kind === "running") {
return `Background task started: ${title}.`;
}
if (event.kind === "progress") {
return event.summary ? `Background task update: ${title}. ${event.summary}` : null;
}
return null;
}
function shouldAutoDeliverTaskUpdate(task: TaskRecord): boolean {
if (task.notifyPolicy === "silent") {
return false;
}
if (task.runtime === "subagent" && task.status !== "cancelled") {
return false;
}
if (
task.status !== "succeeded" &&
task.status !== "failed" &&
task.status !== "timed_out" &&
task.status !== "lost" &&
task.status !== "cancelled"
) {
return false;
}
return task.deliveryStatus === "pending";
}
function shouldAutoDeliverTaskStateChange(task: TaskRecord): boolean {
return (
task.notifyPolicy === "state_changes" &&
task.deliveryStatus === "pending" &&
task.status !== "succeeded" &&
task.status !== "failed" &&
task.status !== "timed_out" &&
task.status !== "lost" &&
task.status !== "cancelled"
);
}
function shouldSuppressDuplicateTerminalDelivery(task: TaskRecord): boolean {
if (task.runtime !== "acp" || !task.runId?.trim()) {
return false;
}
const preferred = pickPreferredRunIdTask(getTasksByRunId(task.runId));
return Boolean(preferred && preferred.taskId !== task.taskId);
}
export async function maybeDeliverTaskTerminalUpdate(taskId: string): Promise<TaskRecord | null> {
ensureTaskRegistryReady();
const current = tasks.get(taskId);
if (!current || !shouldAutoDeliverTaskUpdate(current)) {
if (!current || !shouldAutoDeliverTaskTerminalUpdate(current)) {
return current ? cloneTaskRecord(current) : null;
}
if (tasksWithPendingDelivery.has(taskId)) {
@ -599,10 +497,15 @@ export async function maybeDeliverTaskTerminalUpdate(taskId: string): Promise<Ta
tasksWithPendingDelivery.add(taskId);
try {
const latest = tasks.get(taskId);
if (!latest || !shouldAutoDeliverTaskUpdate(latest)) {
if (!latest || !shouldAutoDeliverTaskTerminalUpdate(latest)) {
return latest ? cloneTaskRecord(latest) : null;
}
if (shouldSuppressDuplicateTerminalDelivery(latest)) {
const preferred = latest.runId
? pickPreferredRunIdTask(getTasksByRunId(latest.runId))
: undefined;
if (
shouldSuppressDuplicateTerminalDelivery({ task: latest, preferredTaskId: preferred?.taskId })
) {
return updateTask(taskId, {
deliveryStatus: "not_applicable",
lastEventAt: Date.now(),
@ -614,7 +517,7 @@ export async function maybeDeliverTaskTerminalUpdate(taskId: string): Promise<Ta
lastEventAt: Date.now(),
});
}
const eventText = formatTaskTerminalEvent(latest);
const eventText = formatTaskTerminalMessage(latest);
if (!canDeliverTaskToRequesterOrigin(latest)) {
try {
queueTaskSystemEvent(latest, eventText);
@ -704,7 +607,7 @@ export async function maybeDeliverTaskStateChangeUpdate(
if (!latestEvent || (deliveryState?.lastNotifiedEventAt ?? 0) >= latestEvent.at) {
return cloneTaskRecord(current);
}
const eventText = formatTaskStateChangeEvent(current, latestEvent);
const eventText = formatTaskStateChangeMessage(current, latestEvent);
if (!eventText) {
return cloneTaskRecord(current);
}