From 8fb247c5286bd080df56adab102afc84bc8793cf Mon Sep 17 00:00:00 2001 From: Vincent Koc Date: Sun, 29 Mar 2026 22:00:25 -0700 Subject: [PATCH] refactor(tasks): guard executor-only producer writes (#57486) * refactor(tasks): add executor facade * refactor(tasks): extract delivery policy * refactor(tasks): route acp through executor * refactor(tasks): route subagents through executor * refactor(cron): split main and detached dispatch * refactor(tasks): guard executor-only producer writes --- src/gateway/server-methods/agent.ts | 5 +-- src/tasks/task-executor-boundary.test.ts | 56 ++++++++++++++++++++++++ 2 files changed, 58 insertions(+), 3 deletions(-) create mode 100644 src/tasks/task-executor-boundary.test.ts diff --git a/src/gateway/server-methods/agent.ts b/src/gateway/server-methods/agent.ts index a12021080a9..43d1617f159 100644 --- a/src/gateway/server-methods/agent.ts +++ b/src/gateway/server-methods/agent.ts @@ -27,7 +27,7 @@ import { classifySessionKeyShape, normalizeAgentId } from "../../routing/session import { defaultRuntime } from "../../runtime.js"; import { normalizeInputProvenance, type InputProvenance } from "../../sessions/input-provenance.js"; import { resolveSendPolicy } from "../../sessions/send-policy.js"; -import { createTaskRecord } from "../../tasks/task-registry.js"; +import { createRunningTaskRun } from "../../tasks/task-executor.js"; import { normalizeDeliveryContext, normalizeSessionDeliveryFields, @@ -191,7 +191,7 @@ function dispatchAgentRunFromGateway(params: { }) { if (params.ingressOpts.sessionKey?.trim()) { try { - createTaskRecord({ + createRunningTaskRun({ runtime: "cli", sourceId: params.runId, requesterSessionKey: params.ingressOpts.sessionKey, @@ -204,7 +204,6 @@ function dispatchAgentRunFromGateway(params: { childSessionKey: params.ingressOpts.sessionKey, runId: params.runId, task: params.ingressOpts.message, - status: "running", deliveryStatus: "not_applicable", startedAt: Date.now(), }); diff --git a/src/tasks/task-executor-boundary.test.ts b/src/tasks/task-executor-boundary.test.ts new file mode 100644 index 00000000000..b0c140f0487 --- /dev/null +++ b/src/tasks/task-executor-boundary.test.ts @@ -0,0 +1,56 @@ +import fs from "node:fs/promises"; +import path from "node:path"; +import { describe, expect, it } from "vitest"; + +const TASK_ROOT = path.resolve(import.meta.dirname); +const SRC_ROOT = path.resolve(TASK_ROOT, ".."); + +const RAW_TASK_MUTATORS = [ + "createTaskRecord", + "markTaskRunningByRunId", + "markTaskTerminalByRunId", + "markTaskTerminalById", + "setTaskRunDeliveryStatusByRunId", +] as const; + +const ALLOWED_CALLERS = new Set([ + "tasks/task-executor.ts", + "tasks/task-registry.ts", + "tasks/task-registry.maintenance.ts", +]); + +async function listSourceFiles(root: string): Promise { + const entries = await fs.readdir(root, { withFileTypes: true }); + const files: string[] = []; + for (const entry of entries) { + const fullPath = path.join(root, entry.name); + if (entry.isDirectory()) { + files.push(...(await listSourceFiles(fullPath))); + continue; + } + if (!entry.isFile() || !entry.name.endsWith(".ts") || entry.name.endsWith(".test.ts")) { + continue; + } + files.push(fullPath); + } + return files; +} + +describe("task executor boundary", () => { + it("keeps raw task lifecycle mutators behind task internals", async () => { + const offenders: string[] = []; + for (const file of await listSourceFiles(SRC_ROOT)) { + const relative = path.relative(SRC_ROOT, file).replaceAll(path.sep, "/"); + if (ALLOWED_CALLERS.has(relative)) { + continue; + } + const source = await fs.readFile(file, "utf8"); + for (const symbol of RAW_TASK_MUTATORS) { + if (source.includes(`${symbol}(`)) { + offenders.push(`${relative}:${symbol}`); + } + } + } + expect(offenders).toEqual([]); + }); +});