refactor(cron): split main and detached dispatch (#57482)

* refactor(tasks): add executor facade

* refactor(tasks): extract delivery policy

* refactor(tasks): route acp through executor

* refactor(tasks): route subagents through executor

* refactor(cron): split main and detached dispatch
This commit is contained in:
Vincent Koc 2026-03-29 21:59:55 -07:00 committed by GitHub
parent 4be290c15f
commit 1c9053802a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 166 additions and 124 deletions

View File

@ -1,7 +1,7 @@
import fs from "node:fs/promises";
import path from "node:path";
import { describe, expect, it, vi } from "vitest";
import * as taskRegistry from "../../tasks/task-registry.js";
import * as taskExecutor from "../../tasks/task-executor.js";
import { findTaskByRunId, resetTaskRegistryForTests } from "../../tasks/task-registry.js";
import { setupCronServiceSuite, writeCronStoreSnapshot } from "../service.test-harness.js";
import type { CronJob } from "../types.js";
@ -166,7 +166,7 @@ describe("cron service ops seam coverage", () => {
});
const createTaskRecordSpy = vi
.spyOn(taskRegistry, "createTaskRecord")
.spyOn(taskExecutor, "createRunningTaskRun")
.mockImplementation(() => {
throw new Error("disk full");
});
@ -210,7 +210,7 @@ describe("cron service ops seam coverage", () => {
});
const updateTaskRecordSpy = vi
.spyOn(taskRegistry, "markTaskTerminalById")
.spyOn(taskExecutor, "completeTaskRunByRunId")
.mockImplementation(() => {
throw new Error("disk full");
});

View File

@ -1,6 +1,10 @@
import { enqueueCommandInLane } from "../../process/command-queue.js";
import { CommandLane } from "../../process/lanes.js";
import { createTaskRecord, markTaskTerminalById } from "../../tasks/task-registry.js";
import {
completeTaskRunByRunId,
createRunningTaskRun,
failTaskRunByRunId,
} from "../../tasks/task-executor.js";
import type { CronJob, CronJobCreate, CronJobPatch } from "../types.js";
import { normalizeCronCreateDeliveryInput } from "./initial-delivery.js";
import {
@ -360,7 +364,7 @@ type PreparedManualRun =
ok: true;
ran: true;
jobId: string;
taskId?: string;
taskRunId?: string;
startedAt: number;
executionJob: CronJob;
}
@ -382,27 +386,32 @@ type ManualRunPreflightResult =
let nextManualRunId = 1;
function tryCreateManualTaskRecord(params: {
/**
 * Builds the run id string for one execution of a cron job.
 *
 * The result has the shape `cron:<jobId>:<startedAt>`.
 *
 * @param jobId - Id of the cron job being run.
 * @param startedAt - Start timestamp of the run, embedded verbatim.
 * @returns The colon-separated run id.
 */
function createCronTaskRunId(jobId: string, startedAt: number): string {
  const segments: ReadonlyArray<string | number> = ["cron", jobId, startedAt];
  return segments.join(":");
}
function tryCreateManualTaskRun(params: {
state: CronServiceState;
job: CronJob;
startedAt: number;
}): string | undefined {
const runId = createCronTaskRunId(params.job.id, params.startedAt);
try {
return createTaskRecord({
createRunningTaskRun({
runtime: "cron",
sourceId: params.job.id,
requesterSessionKey: "",
childSessionKey: params.job.sessionKey,
agentId: params.job.agentId,
runId: `cron:${params.job.id}:${params.startedAt}`,
runId,
label: params.job.name,
task: params.job.name || params.job.id,
status: "running",
deliveryStatus: "not_applicable",
notifyPolicy: "silent",
startedAt: params.startedAt,
lastEventAt: params.startedAt,
}).taskId;
});
return runId;
} catch (error) {
params.state.deps.log.warn(
{ jobId: params.job.id, error },
@ -412,26 +421,33 @@ function tryCreateManualTaskRecord(params: {
}
}
function tryUpdateManualTaskRecord(
function tryFinishManualTaskRun(
state: CronServiceState,
params: {
taskId?: string;
taskRunId?: string;
coreResult: Awaited<ReturnType<typeof executeJobCoreWithTimeout>>;
endedAt: number;
},
): void {
if (!params.taskId) {
if (!params.taskRunId) {
return;
}
try {
markTaskTerminalById({
taskId: params.taskId,
if (params.coreResult.status === "ok" || params.coreResult.status === "skipped") {
completeTaskRunByRunId({
runId: params.taskRunId,
endedAt: params.endedAt,
lastEventAt: params.endedAt,
terminalSummary: params.coreResult.summary ?? undefined,
});
return;
}
failTaskRunByRunId({
runId: params.taskRunId,
status:
params.coreResult.status === "ok" || params.coreResult.status === "skipped"
? "succeeded"
: normalizeCronRunErrorText(params.coreResult.error) === "cron: job execution timed out"
? "timed_out"
: "failed",
normalizeCronRunErrorText(params.coreResult.error) === "cron: job execution timed out"
? "timed_out"
: "failed",
endedAt: params.endedAt,
lastEventAt: params.endedAt,
error:
@ -442,7 +458,7 @@ function tryUpdateManualTaskRecord(
});
} catch (error) {
state.deps.log.warn(
{ taskId: params.taskId, jobStatus: params.coreResult.status, error },
{ runId: params.taskRunId, jobStatus: params.coreResult.status, error },
"cron: failed to update task ledger record",
);
}
@ -517,7 +533,7 @@ async function prepareManualRun(
// force-reload from disk cannot start the same job concurrently.
await persist(state);
emit(state, { jobId: job.id, action: "started", runAtMs: preflight.now });
const taskId = tryCreateManualTaskRecord({
const taskRunId = tryCreateManualTaskRun({
state,
job,
startedAt: preflight.now,
@ -527,7 +543,7 @@ async function prepareManualRun(
ok: true,
ran: true,
jobId: job.id,
taskId,
taskRunId,
startedAt: preflight.now,
executionJob,
} as const;
@ -542,7 +558,7 @@ async function finishPreparedManualRun(
const executionJob = prepared.executionJob;
const startedAt = prepared.startedAt;
const jobId = prepared.jobId;
const taskId = prepared.taskId;
const taskRunId = prepared.taskRunId;
let coreResult: Awaited<ReturnType<typeof executeJobCoreWithTimeout>>;
try {
@ -551,8 +567,8 @@ async function finishPreparedManualRun(
coreResult = { status: "error", error: normalizeCronRunErrorText(err) };
}
const endedAt = state.deps.nowMs();
tryUpdateManualTaskRecord(state, {
taskId,
tryFinishManualTaskRun(state, {
taskRunId,
coreResult,
endedAt,
});

View File

@ -4,7 +4,7 @@ import { setupCronServiceSuite, writeCronStoreSnapshot } from "../../cron/servic
import { createCronServiceState } from "../../cron/service/state.js";
import { onTimer } from "../../cron/service/timer.js";
import type { CronJob } from "../../cron/types.js";
import * as taskRegistry from "../../tasks/task-registry.js";
import * as taskExecutor from "../../tasks/task-executor.js";
import { resetTaskRegistryForTests } from "../../tasks/task-registry.js";
const { logger, makeStorePath } = setupCronServiceSuite({
@ -96,7 +96,7 @@ describe("cron service timer seam coverage", () => {
});
const createTaskRecordSpy = vi
.spyOn(taskRegistry, "createTaskRecord")
.spyOn(taskExecutor, "createRunningTaskRun")
.mockImplementation(() => {
throw new Error("disk full");
});

View File

@ -2,7 +2,11 @@ import { resolveFailoverReasonFromError } from "../../agents/failover-error.js";
import type { CronConfig, CronRetryOn } from "../../config/types.cron.js";
import type { HeartbeatRunResult } from "../../infra/heartbeat-wake.js";
import { DEFAULT_AGENT_ID } from "../../routing/session-key.js";
import { createTaskRecord, markTaskTerminalById } from "../../tasks/task-registry.js";
import {
completeTaskRunByRunId,
createRunningTaskRun,
failTaskRunByRunId,
} from "../../tasks/task-executor.js";
import { resolveCronDeliveryPlan } from "../delivery.js";
import { sweepCronRunSessions } from "../session-reaper.js";
import type {
@ -47,7 +51,7 @@ const DEFAULT_FAILURE_ALERT_COOLDOWN_MS = 60 * 60_000; // 1 hour
type TimedCronRunOutcome = CronRunOutcome &
CronRunTelemetry & {
jobId: string;
taskId?: string;
taskRunId?: string;
delivered?: boolean;
deliveryAttempted?: boolean;
startedAt: number;
@ -120,27 +124,32 @@ export function normalizeCronRunErrorText(err: unknown): string {
return String(err);
}
function tryCreateCronTaskRecord(params: {
/**
 * Builds the run id string for one execution of a cron job.
 *
 * The result has the shape `cron:<jobId>:<startedAt>`.
 *
 * @param jobId - Id of the cron job being run.
 * @param startedAt - Start timestamp of the run, embedded verbatim.
 * @returns The colon-separated run id.
 */
function createCronTaskRunId(jobId: string, startedAt: number): string {
  const segments: ReadonlyArray<string | number> = ["cron", jobId, startedAt];
  return segments.join(":");
}
function tryCreateCronTaskRun(params: {
state: CronServiceState;
job: CronJob;
startedAt: number;
}): string | undefined {
const runId = createCronTaskRunId(params.job.id, params.startedAt);
try {
return createTaskRecord({
createRunningTaskRun({
runtime: "cron",
sourceId: params.job.id,
requesterSessionKey: "",
childSessionKey: params.job.sessionKey,
agentId: params.job.agentId,
runId: `cron:${params.job.id}:${params.startedAt}`,
runId,
label: params.job.name,
task: params.job.name || params.job.id,
status: "running",
deliveryStatus: "not_applicable",
notifyPolicy: "silent",
startedAt: params.startedAt,
lastEventAt: params.startedAt,
}).taskId;
});
return runId;
} catch (error) {
params.state.deps.log.warn(
{ jobId: params.job.id, error },
@ -150,22 +159,27 @@ function tryCreateCronTaskRecord(params: {
}
}
function tryUpdateCronTaskRecord(
function tryFinishCronTaskRun(
state: CronServiceState,
result: Pick<TimedCronRunOutcome, "taskId" | "status" | "error" | "endedAt" | "summary">,
result: Pick<TimedCronRunOutcome, "taskRunId" | "status" | "error" | "endedAt" | "summary">,
): void {
if (!result.taskId) {
if (!result.taskRunId) {
return;
}
try {
markTaskTerminalById({
taskId: result.taskId,
if (result.status === "ok" || result.status === "skipped") {
completeTaskRunByRunId({
runId: result.taskRunId,
endedAt: result.endedAt,
lastEventAt: result.endedAt,
terminalSummary: result.summary ?? undefined,
});
return;
}
failTaskRunByRunId({
runId: result.taskRunId,
status:
result.status === "ok" || result.status === "skipped"
? "succeeded"
: normalizeCronRunErrorText(result.error) === timeoutErrorMessage()
? "timed_out"
: "failed",
normalizeCronRunErrorText(result.error) === timeoutErrorMessage() ? "timed_out" : "failed",
endedAt: result.endedAt,
lastEventAt: result.endedAt,
error: result.status === "error" ? normalizeCronRunErrorText(result.error) : undefined,
@ -173,7 +187,7 @@ function tryUpdateCronTaskRecord(
});
} catch (error) {
state.deps.log.warn(
{ taskId: result.taskId, jobStatus: result.status, error },
{ runId: result.taskRunId, jobStatus: result.status, error },
"cron: failed to update task ledger record",
);
}
@ -545,7 +559,7 @@ export function applyJobResult(
}
function applyOutcomeToStoredJob(state: CronServiceState, result: TimedCronRunOutcome): void {
tryUpdateCronTaskRecord(state, result);
tryFinishCronTaskRun(state, result);
const store = state.store;
if (!store) {
return;
@ -702,13 +716,13 @@ export async function onTimer(state: CronServiceState) {
job.state.runningAtMs = startedAt;
emit(state, { jobId: job.id, action: "started", runAtMs: startedAt });
const jobTimeoutMs = resolveCronJobTimeoutMs(job);
const taskId = tryCreateCronTaskRecord({ state, job, startedAt });
const taskRunId = tryCreateCronTaskRun({ state, job, startedAt });
try {
const result = await executeJobCoreWithTimeout(state, job);
return {
jobId: id,
taskId,
taskRunId,
...result,
startedAt,
endedAt: state.deps.nowMs(),
@ -721,7 +735,7 @@ export async function onTimer(state: CronServiceState) {
);
return {
jobId: id,
taskId,
taskRunId,
status: "error",
error: errorText,
startedAt,
@ -1006,7 +1020,7 @@ async function runStartupCatchupCandidate(
candidate: StartupCatchupCandidate,
): Promise<TimedCronRunOutcome> {
const startedAt = state.deps.nowMs();
const taskId = tryCreateCronTaskRecord({
const taskRunId = tryCreateCronTaskRun({
state,
job: candidate.job,
startedAt,
@ -1016,7 +1030,7 @@ async function runStartupCatchupCandidate(
const result = await executeJobCoreWithTimeout(state, candidate.job);
return {
jobId: candidate.jobId,
taskId,
taskRunId,
status: result.status,
error: result.error,
summary: result.summary,
@ -1032,7 +1046,7 @@ async function runStartupCatchupCandidate(
} catch (err) {
return {
jobId: candidate.jobId,
taskId,
taskRunId,
status: "error",
error: normalizeCronRunErrorText(err),
startedAt,
@ -1128,90 +1142,102 @@ export async function executeJobCore(
return resolveAbortError();
}
if (job.sessionTarget === "main") {
const text = resolveJobPayloadTextForMain(job);
if (!text) {
const kind = job.payload.kind;
return {
status: "skipped",
error:
kind === "systemEvent"
? "main job requires non-empty systemEvent text"
: 'main job requires payload.kind="systemEvent"',
};
}
// Preserve the job session namespace for main-target reminders so heartbeat
// routing can deliver follow-through in the originating channel/thread.
// Downstream gateway wiring canonicalizes/guards this key per agent.
const targetMainSessionKey = job.sessionKey;
state.deps.enqueueSystemEvent(text, {
agentId: job.agentId,
sessionKey: targetMainSessionKey,
contextKey: `cron:${job.id}`,
});
if (job.wakeMode === "now" && state.deps.runHeartbeatOnce) {
const reason = `cron:${job.id}`;
const maxWaitMs = state.deps.wakeNowHeartbeatBusyMaxWaitMs ?? 2 * 60_000;
const retryDelayMs = state.deps.wakeNowHeartbeatBusyRetryDelayMs ?? 250;
const waitStartedAt = state.deps.nowMs();
return await executeMainSessionCronJob(state, job, abortSignal, waitWithAbort);
}
let heartbeatResult: HeartbeatRunResult;
for (;;) {
return await executeDetachedCronJob(state, job, abortSignal, resolveAbortError);
}
async function executeMainSessionCronJob(
state: CronServiceState,
job: CronJob,
abortSignal: AbortSignal | undefined,
waitWithAbort: (ms: number) => Promise<void>,
): Promise<
CronRunOutcome & CronRunTelemetry & { delivered?: boolean; deliveryAttempted?: boolean }
> {
const text = resolveJobPayloadTextForMain(job);
if (!text) {
const kind = job.payload.kind;
return {
status: "skipped",
error:
kind === "systemEvent"
? "main job requires non-empty systemEvent text"
: 'main job requires payload.kind="systemEvent"',
};
}
const targetMainSessionKey = job.sessionKey;
state.deps.enqueueSystemEvent(text, {
agentId: job.agentId,
sessionKey: targetMainSessionKey,
contextKey: `cron:${job.id}`,
});
if (job.wakeMode === "now" && state.deps.runHeartbeatOnce) {
const reason = `cron:${job.id}`;
const maxWaitMs = state.deps.wakeNowHeartbeatBusyMaxWaitMs ?? 2 * 60_000;
const retryDelayMs = state.deps.wakeNowHeartbeatBusyRetryDelayMs ?? 250;
const waitStartedAt = state.deps.nowMs();
let heartbeatResult: HeartbeatRunResult;
for (;;) {
if (abortSignal?.aborted) {
return { status: "error", error: timeoutErrorMessage() };
}
heartbeatResult = await state.deps.runHeartbeatOnce({
reason,
agentId: job.agentId,
sessionKey: targetMainSessionKey,
heartbeat: { target: "last" },
});
if (heartbeatResult.status !== "skipped" || heartbeatResult.reason !== "requests-in-flight") {
break;
}
if (abortSignal?.aborted) {
return { status: "error", error: timeoutErrorMessage() };
}
if (state.deps.nowMs() - waitStartedAt > maxWaitMs) {
if (abortSignal?.aborted) {
return resolveAbortError();
return { status: "error", error: timeoutErrorMessage() };
}
heartbeatResult = await state.deps.runHeartbeatOnce({
state.deps.requestHeartbeatNow({
reason,
agentId: job.agentId,
sessionKey: targetMainSessionKey,
// Cron-triggered heartbeats should deliver to the last active channel.
// Without this override, heartbeat target defaults to "none" (since
// e2362d35) and cron main-session responses are silently swallowed.
// See: https://github.com/openclaw/openclaw/issues/28508
heartbeat: { target: "last" },
});
if (
heartbeatResult.status !== "skipped" ||
heartbeatResult.reason !== "requests-in-flight"
) {
break;
}
if (abortSignal?.aborted) {
return resolveAbortError();
}
if (state.deps.nowMs() - waitStartedAt > maxWaitMs) {
if (abortSignal?.aborted) {
return resolveAbortError();
}
state.deps.requestHeartbeatNow({
reason,
agentId: job.agentId,
sessionKey: targetMainSessionKey,
});
return { status: "ok", summary: text };
}
await waitWithAbort(retryDelayMs);
}
if (heartbeatResult.status === "ran") {
return { status: "ok", summary: text };
} else if (heartbeatResult.status === "skipped") {
return { status: "skipped", error: heartbeatResult.reason, summary: text };
} else {
return { status: "error", error: heartbeatResult.reason, summary: text };
}
} else {
if (abortSignal?.aborted) {
return resolveAbortError();
}
state.deps.requestHeartbeatNow({
reason: `cron:${job.id}`,
agentId: job.agentId,
sessionKey: targetMainSessionKey,
});
await waitWithAbort(retryDelayMs);
}
if (heartbeatResult.status === "ran") {
return { status: "ok", summary: text };
}
if (heartbeatResult.status === "skipped") {
return { status: "skipped", error: heartbeatResult.reason, summary: text };
}
return { status: "error", error: heartbeatResult.reason, summary: text };
}
if (abortSignal?.aborted) {
return { status: "error", error: timeoutErrorMessage() };
}
state.deps.requestHeartbeatNow({
reason: `cron:${job.id}`,
agentId: job.agentId,
sessionKey: targetMainSessionKey,
});
return { status: "ok", summary: text };
}
async function executeDetachedCronJob(
state: CronServiceState,
job: CronJob,
abortSignal: AbortSignal | undefined,
resolveAbortError: () => { status: "error"; error: string },
): Promise<
CronRunOutcome & CronRunTelemetry & { delivered?: boolean; deliveryAttempted?: boolean }
> {
if (job.payload.kind !== "agentTurn") {
return { status: "skipped", error: "isolated job requires payload.kind=agentTurn" };
}