From 1c9053802a98fbcd42bda03c3d5fa301b0e65318 Mon Sep 17 00:00:00 2001 From: Vincent Koc Date: Sun, 29 Mar 2026 21:59:55 -0700 Subject: [PATCH] refactor(cron): split main and detached dispatch (#57482) * refactor(tasks): add executor facade * refactor(tasks): extract delivery policy * refactor(tasks): route acp through executor * refactor(tasks): route subagents through executor * refactor(cron): split main and detached dispatch --- src/cron/service/ops.test.ts | 6 +- src/cron/service/ops.ts | 62 ++++++---- src/cron/service/timer.test.ts | 4 +- src/cron/service/timer.ts | 218 ++++++++++++++++++--------------- 4 files changed, 166 insertions(+), 124 deletions(-) diff --git a/src/cron/service/ops.test.ts b/src/cron/service/ops.test.ts index b2a1f143cd2..108beffcfc8 100644 --- a/src/cron/service/ops.test.ts +++ b/src/cron/service/ops.test.ts @@ -1,7 +1,7 @@ import fs from "node:fs/promises"; import path from "node:path"; import { describe, expect, it, vi } from "vitest"; -import * as taskRegistry from "../../tasks/task-registry.js"; +import * as taskExecutor from "../../tasks/task-executor.js"; import { findTaskByRunId, resetTaskRegistryForTests } from "../../tasks/task-registry.js"; import { setupCronServiceSuite, writeCronStoreSnapshot } from "../service.test-harness.js"; import type { CronJob } from "../types.js"; @@ -166,7 +166,7 @@ describe("cron service ops seam coverage", () => { }); const createTaskRecordSpy = vi - .spyOn(taskRegistry, "createTaskRecord") + .spyOn(taskExecutor, "createRunningTaskRun") .mockImplementation(() => { throw new Error("disk full"); }); @@ -210,7 +210,7 @@ describe("cron service ops seam coverage", () => { }); const updateTaskRecordSpy = vi - .spyOn(taskRegistry, "markTaskTerminalById") + .spyOn(taskExecutor, "completeTaskRunByRunId") .mockImplementation(() => { throw new Error("disk full"); }); diff --git a/src/cron/service/ops.ts b/src/cron/service/ops.ts index 97dea355a2d..1daeeccdf80 100644 --- a/src/cron/service/ops.ts +++ b/src/cron/service/ops.ts @@ -1,6 +1,10 @@ import { enqueueCommandInLane } from "../../process/command-queue.js"; import { CommandLane } from "../../process/lanes.js"; -import { createTaskRecord, markTaskTerminalById } from "../../tasks/task-registry.js"; +import { + completeTaskRunByRunId, + createRunningTaskRun, + failTaskRunByRunId, +} from "../../tasks/task-executor.js"; import type { CronJob, CronJobCreate, CronJobPatch } from "../types.js"; import { normalizeCronCreateDeliveryInput } from "./initial-delivery.js"; import { @@ -360,7 +364,7 @@ type PreparedManualRun = ok: true; ran: true; jobId: string; - taskId?: string; + taskRunId?: string; startedAt: number; executionJob: CronJob; } @@ -382,27 +386,32 @@ type ManualRunPreflightResult = let nextManualRunId = 1; -function tryCreateManualTaskRecord(params: { +function createCronTaskRunId(jobId: string, startedAt: number): string { + return `cron:${jobId}:${startedAt}`; +} + +function tryCreateManualTaskRun(params: { state: CronServiceState; job: CronJob; startedAt: number; }): string | undefined { + const runId = createCronTaskRunId(params.job.id, params.startedAt); try { - return createTaskRecord({ + createRunningTaskRun({ runtime: "cron", sourceId: params.job.id, requesterSessionKey: "", childSessionKey: params.job.sessionKey, agentId: params.job.agentId, - runId: `cron:${params.job.id}:${params.startedAt}`, + runId, label: params.job.name, task: params.job.name || params.job.id, - status: "running", deliveryStatus: "not_applicable", notifyPolicy: "silent", startedAt: params.startedAt, lastEventAt: params.startedAt, - }).taskId; + }); + return runId; } catch (error) { params.state.deps.log.warn( { jobId: params.job.id, error }, @@ -412,26 +421,33 @@ function tryCreateManualTaskRecord(params: { } } -function tryUpdateManualTaskRecord( +function tryFinishManualTaskRun( state: CronServiceState, params: { - taskId?: string; + taskRunId?: string; coreResult: Awaited>; endedAt: number; }, ): void { - if (!params.taskId) { + if (!params.taskRunId) { return; } try { - markTaskTerminalById({ - taskId: params.taskId, + if (params.coreResult.status === "ok" || params.coreResult.status === "skipped") { + completeTaskRunByRunId({ + runId: params.taskRunId, + endedAt: params.endedAt, + lastEventAt: params.endedAt, + terminalSummary: params.coreResult.summary ?? undefined, + }); + return; + } + failTaskRunByRunId({ + runId: params.taskRunId, status: - params.coreResult.status === "ok" || params.coreResult.status === "skipped" - ? "succeeded" - : normalizeCronRunErrorText(params.coreResult.error) === "cron: job execution timed out" - ? "timed_out" - : "failed", + normalizeCronRunErrorText(params.coreResult.error) === "cron: job execution timed out" + ? "timed_out" + : "failed", endedAt: params.endedAt, lastEventAt: params.endedAt, error: @@ -442,7 +458,7 @@ function tryUpdateManualTaskRecord( }); } catch (error) { state.deps.log.warn( - { taskId: params.taskId, jobStatus: params.coreResult.status, error }, + { runId: params.taskRunId, jobStatus: params.coreResult.status, error }, "cron: failed to update task ledger record", ); } @@ -517,7 +533,7 @@ async function prepareManualRun( // force-reload from disk cannot start the same job concurrently. await persist(state); emit(state, { jobId: job.id, action: "started", runAtMs: preflight.now }); - const taskId = tryCreateManualTaskRecord({ + const taskRunId = tryCreateManualTaskRun({ state, job, startedAt: preflight.now, @@ -527,7 +543,7 @@ async function prepareManualRun( ok: true, ran: true, jobId: job.id, - taskId, + taskRunId, startedAt: preflight.now, executionJob, } as const; @@ -542,7 +558,7 @@ async function finishPreparedManualRun( const executionJob = prepared.executionJob; const startedAt = prepared.startedAt; const jobId = prepared.jobId; - const taskId = prepared.taskId; + const taskRunId = prepared.taskRunId; let coreResult: Awaited>; try { @@ -551,8 +567,8 @@ async function finishPreparedManualRun( coreResult = { status: "error", error: normalizeCronRunErrorText(err) }; } const endedAt = state.deps.nowMs(); - tryUpdateManualTaskRecord(state, { - taskId, + tryFinishManualTaskRun(state, { + taskRunId, coreResult, endedAt, }); diff --git a/src/cron/service/timer.test.ts b/src/cron/service/timer.test.ts index 357a8422515..e6009c42fdc 100644 --- a/src/cron/service/timer.test.ts +++ b/src/cron/service/timer.test.ts @@ -4,7 +4,7 @@ import { setupCronServiceSuite, writeCronStoreSnapshot } from "../../cron/servic import { createCronServiceState } from "../../cron/service/state.js"; import { onTimer } from "../../cron/service/timer.js"; import type { CronJob } from "../../cron/types.js"; -import * as taskRegistry from "../../tasks/task-registry.js"; +import * as taskExecutor from "../../tasks/task-executor.js"; import { resetTaskRegistryForTests } from "../../tasks/task-registry.js"; const { logger, makeStorePath } = setupCronServiceSuite({ @@ -96,7 +96,7 @@ describe("cron service timer seam coverage", () => { }); const createTaskRecordSpy = vi - .spyOn(taskRegistry, "createTaskRecord") + .spyOn(taskExecutor, "createRunningTaskRun") .mockImplementation(() => { throw new Error("disk full"); }); diff --git a/src/cron/service/timer.ts b/src/cron/service/timer.ts index d145fa8f842..2e167cf6d72 100644 --- a/src/cron/service/timer.ts +++ b/src/cron/service/timer.ts @@ -2,7 +2,11 @@ import { resolveFailoverReasonFromError } from "../../agents/failover-error.js"; import type { CronConfig, CronRetryOn } from "../../config/types.cron.js"; import type { HeartbeatRunResult } from "../../infra/heartbeat-wake.js"; import { DEFAULT_AGENT_ID } from "../../routing/session-key.js"; -import { createTaskRecord, markTaskTerminalById } from "../../tasks/task-registry.js"; +import { + completeTaskRunByRunId, + createRunningTaskRun, + failTaskRunByRunId, +} from "../../tasks/task-executor.js"; import { resolveCronDeliveryPlan } from "../delivery.js"; import { sweepCronRunSessions } from "../session-reaper.js"; import type { @@ -47,7 +51,7 @@ const DEFAULT_FAILURE_ALERT_COOLDOWN_MS = 60 * 60_000; // 1 hour type TimedCronRunOutcome = CronRunOutcome & CronRunTelemetry & { jobId: string; - taskId?: string; + taskRunId?: string; delivered?: boolean; deliveryAttempted?: boolean; startedAt: number; @@ -120,27 +124,32 @@ export function normalizeCronRunErrorText(err: unknown): string { return String(err); } -function tryCreateCronTaskRecord(params: { +function createCronTaskRunId(jobId: string, startedAt: number): string { + return `cron:${jobId}:${startedAt}`; +} + +function tryCreateCronTaskRun(params: { state: CronServiceState; job: CronJob; startedAt: number; }): string | undefined { + const runId = createCronTaskRunId(params.job.id, params.startedAt); try { - return createTaskRecord({ + createRunningTaskRun({ runtime: "cron", sourceId: params.job.id, requesterSessionKey: "", childSessionKey: params.job.sessionKey, agentId: params.job.agentId, - runId: `cron:${params.job.id}:${params.startedAt}`, + runId, label: params.job.name, task: params.job.name || params.job.id, - status: "running", deliveryStatus: "not_applicable", notifyPolicy: "silent", startedAt: params.startedAt, lastEventAt: params.startedAt, - }).taskId; + }); + return runId; } catch (error) { params.state.deps.log.warn( { jobId: params.job.id, error }, @@ -150,22 +159,27 @@ function tryCreateCronTaskRecord(params: { } } -function tryUpdateCronTaskRecord( +function tryFinishCronTaskRun( state: CronServiceState, - result: Pick, + result: Pick, ): void { - if (!result.taskId) { + if (!result.taskRunId) { return; } try { - markTaskTerminalById({ - taskId: result.taskId, + if (result.status === "ok" || result.status === "skipped") { + completeTaskRunByRunId({ + runId: result.taskRunId, + endedAt: result.endedAt, + lastEventAt: result.endedAt, + terminalSummary: result.summary ?? undefined, + }); + return; + } + failTaskRunByRunId({ + runId: result.taskRunId, status: - result.status === "ok" || result.status === "skipped" - ? "succeeded" - : normalizeCronRunErrorText(result.error) === timeoutErrorMessage() - ? "timed_out" - : "failed", + normalizeCronRunErrorText(result.error) === timeoutErrorMessage() ? "timed_out" : "failed", endedAt: result.endedAt, lastEventAt: result.endedAt, error: result.status === "error" ? normalizeCronRunErrorText(result.error) : undefined, @@ -173,7 +187,7 @@ function tryUpdateCronTaskRecord( }); } catch (error) { state.deps.log.warn( - { taskId: result.taskId, jobStatus: result.status, error }, + { runId: result.taskRunId, jobStatus: result.status, error }, "cron: failed to update task ledger record", ); } @@ -545,7 +559,7 @@ export function applyJobResult( } function applyOutcomeToStoredJob(state: CronServiceState, result: TimedCronRunOutcome): void { - tryUpdateCronTaskRecord(state, result); + tryFinishCronTaskRun(state, result); const store = state.store; if (!store) { return; @@ -702,13 +716,13 @@ export async function onTimer(state: CronServiceState) { job.state.runningAtMs = startedAt; emit(state, { jobId: job.id, action: "started", runAtMs: startedAt }); const jobTimeoutMs = resolveCronJobTimeoutMs(job); - const taskId = tryCreateCronTaskRecord({ state, job, startedAt }); + const taskRunId = tryCreateCronTaskRun({ state, job, startedAt }); try { const result = await executeJobCoreWithTimeout(state, job); return { jobId: id, - taskId, + taskRunId, ...result, startedAt, endedAt: state.deps.nowMs(), @@ -721,7 +735,7 @@ export async function onTimer(state: CronServiceState) { ); return { jobId: id, - taskId, + taskRunId, status: "error", error: errorText, startedAt, @@ -1006,7 +1020,7 @@ async function runStartupCatchupCandidate( candidate: StartupCatchupCandidate, ): Promise { const startedAt = state.deps.nowMs(); - const taskId = tryCreateCronTaskRecord({ + const taskRunId = tryCreateCronTaskRun({ state, job: candidate.job, startedAt, @@ -1016,7 +1030,7 @@ async function runStartupCatchupCandidate( const result = await executeJobCoreWithTimeout(state, candidate.job); return { jobId: candidate.jobId, - taskId, + taskRunId, status: result.status, error: result.error, summary: result.summary, @@ -1032,7 +1046,7 @@ async function runStartupCatchupCandidate( } catch (err) { return { jobId: candidate.jobId, - taskId, + taskRunId, status: "error", error: normalizeCronRunErrorText(err), startedAt, @@ -1128,90 +1142,102 @@ export async function executeJobCore( return resolveAbortError(); } if (job.sessionTarget === "main") { - const text = resolveJobPayloadTextForMain(job); - if (!text) { - const kind = job.payload.kind; - return { - status: "skipped", - error: - kind === "systemEvent" - ? "main job requires non-empty systemEvent text" - : 'main job requires payload.kind="systemEvent"', - }; - } - // Preserve the job session namespace for main-target reminders so heartbeat - // routing can deliver follow-through in the originating channel/thread. - // Downstream gateway wiring canonicalizes/guards this key per agent. - const targetMainSessionKey = job.sessionKey; - state.deps.enqueueSystemEvent(text, { - agentId: job.agentId, - sessionKey: targetMainSessionKey, - contextKey: `cron:${job.id}`, - }); - if (job.wakeMode === "now" && state.deps.runHeartbeatOnce) { - const reason = `cron:${job.id}`; - const maxWaitMs = state.deps.wakeNowHeartbeatBusyMaxWaitMs ?? 2 * 60_000; - const retryDelayMs = state.deps.wakeNowHeartbeatBusyRetryDelayMs ?? 250; - const waitStartedAt = state.deps.nowMs(); + return await executeMainSessionCronJob(state, job, abortSignal, waitWithAbort); + } - let heartbeatResult: HeartbeatRunResult; - for (;;) { + return await executeDetachedCronJob(state, job, abortSignal, resolveAbortError); +} + +async function executeMainSessionCronJob( + state: CronServiceState, + job: CronJob, + abortSignal: AbortSignal | undefined, + waitWithAbort: (ms: number) => Promise, +): Promise< + CronRunOutcome & CronRunTelemetry & { delivered?: boolean; deliveryAttempted?: boolean } +> { + const text = resolveJobPayloadTextForMain(job); + if (!text) { + const kind = job.payload.kind; + return { + status: "skipped", + error: + kind === "systemEvent" + ? "main job requires non-empty systemEvent text" + : 'main job requires payload.kind="systemEvent"', + }; + } + const targetMainSessionKey = job.sessionKey; + state.deps.enqueueSystemEvent(text, { + agentId: job.agentId, + sessionKey: targetMainSessionKey, + contextKey: `cron:${job.id}`, + }); + if (job.wakeMode === "now" && state.deps.runHeartbeatOnce) { + const reason = `cron:${job.id}`; + const maxWaitMs = state.deps.wakeNowHeartbeatBusyMaxWaitMs ?? 2 * 60_000; + const retryDelayMs = state.deps.wakeNowHeartbeatBusyRetryDelayMs ?? 250; + const waitStartedAt = state.deps.nowMs(); + + let heartbeatResult: HeartbeatRunResult; + for (;;) { + if (abortSignal?.aborted) { + return { status: "error", error: timeoutErrorMessage() }; + } + heartbeatResult = await state.deps.runHeartbeatOnce({ + reason, + agentId: job.agentId, + sessionKey: targetMainSessionKey, + heartbeat: { target: "last" }, + }); + if (heartbeatResult.status !== "skipped" || heartbeatResult.reason !== "requests-in-flight") { + break; + } + if (abortSignal?.aborted) { + return { status: "error", error: timeoutErrorMessage() }; + } + if (state.deps.nowMs() - waitStartedAt > maxWaitMs) { if (abortSignal?.aborted) { - return resolveAbortError(); + return { status: "error", error: timeoutErrorMessage() }; } - heartbeatResult = await state.deps.runHeartbeatOnce({ + state.deps.requestHeartbeatNow({ reason, agentId: job.agentId, sessionKey: targetMainSessionKey, - // Cron-triggered heartbeats should deliver to the last active channel. - // Without this override, heartbeat target defaults to "none" (since - // e2362d35) and cron main-session responses are silently swallowed. - // See: https://github.com/openclaw/openclaw/issues/28508 - heartbeat: { target: "last" }, }); - if ( - heartbeatResult.status !== "skipped" || - heartbeatResult.reason !== "requests-in-flight" - ) { - break; - } - if (abortSignal?.aborted) { - return resolveAbortError(); - } - if (state.deps.nowMs() - waitStartedAt > maxWaitMs) { - if (abortSignal?.aborted) { - return resolveAbortError(); - } - state.deps.requestHeartbeatNow({ - reason, - agentId: job.agentId, - sessionKey: targetMainSessionKey, - }); - return { status: "ok", summary: text }; - } - await waitWithAbort(retryDelayMs); - } - - if (heartbeatResult.status === "ran") { return { status: "ok", summary: text }; - } else if (heartbeatResult.status === "skipped") { - return { status: "skipped", error: heartbeatResult.reason, summary: text }; - } else { - return { status: "error", error: heartbeatResult.reason, summary: text }; } - } else { - if (abortSignal?.aborted) { - return resolveAbortError(); - } - state.deps.requestHeartbeatNow({ - reason: `cron:${job.id}`, - agentId: job.agentId, - sessionKey: targetMainSessionKey, - }); + await waitWithAbort(retryDelayMs); + } + + if (heartbeatResult.status === "ran") { return { status: "ok", summary: text }; } + if (heartbeatResult.status === "skipped") { + return { status: "skipped", error: heartbeatResult.reason, summary: text }; + } + return { status: "error", error: heartbeatResult.reason, summary: text }; } + if (abortSignal?.aborted) { + return { status: "error", error: timeoutErrorMessage() }; + } + state.deps.requestHeartbeatNow({ + reason: `cron:${job.id}`, + agentId: job.agentId, + sessionKey: targetMainSessionKey, + }); + return { status: "ok", summary: text }; +} + +async function executeDetachedCronJob( + state: CronServiceState, + job: CronJob, + abortSignal: AbortSignal | undefined, + resolveAbortError: () => { status: "error"; error: string }, +): Promise< + CronRunOutcome & CronRunTelemetry & { delivered?: boolean; deliveryAttempted?: boolean } +> { if (job.payload.kind !== "agentTurn") { return { status: "skipped", error: "isolated job requires payload.kind=agentTurn" }; }