From 7d1575b5df79bc34e91a0c017a8e8d4e0bcc009a Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sat, 4 Apr 2026 16:56:16 +0900 Subject: [PATCH] fix: reconcile stale cron and chat-backed tasks (#60310) (thanks @lml2468) --- CHANGELOG.md | 1 + src/cron/active-jobs.ts | 38 +++++++++++ src/cron/service/timer.ts | 5 ++ ...k-registry.maintenance.issue-60299.test.ts | 63 +++++++++++++------ src/tasks/task-registry.maintenance.ts | 44 +++++++------ src/tasks/task-registry.test.ts | 2 + 6 files changed, 114 insertions(+), 39 deletions(-) create mode 100644 src/cron/active-jobs.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 5ef47da26a8..0d5477a2e88 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -60,6 +60,7 @@ Docs: https://docs.openclaw.ai - Update/npm: prefer the npm binary that owns the installed global OpenClaw prefix so mixed Homebrew-plus-nvm setups update the right install. (#60153) Thanks @jayeshp19. - Gateway/plugin routes: keep gateway-auth plugin runtime routes on write-only fallback scopes unless a trusted-proxy caller explicitly declares narrower `x-openclaw-scopes`, so plugin HTTP handlers no longer mint admin-level runtime scopes on missing or untrusted HTTP scope headers. (#59815) Thanks @pgondhi987. - Agents/exec approvals: let `exec-approvals.json` agent security override stricter gateway tool defaults so approved subagents can use `security: "full"` without falling back to allowlist enforcement again. (#60310) Thanks @lml2468. +- Tasks/maintenance: reconcile stale cron and chat-backed CLI task rows against live cron-job and agent-run ownership instead of treating any persisted session key as proof that the task is still running. (#60310) Thanks @lml2468. ## 2026.4.2 diff --git a/src/cron/active-jobs.ts b/src/cron/active-jobs.ts new file mode 100644 index 00000000000..c3dcdf6db3d --- /dev/null +++ b/src/cron/active-jobs.ts @@ -0,0 +1,38 @@ +import { resolveGlobalSingleton } from "../shared/global-singleton.js"; + +type CronActiveJobState = { + activeJobIds: Set; +}; + +const CRON_ACTIVE_JOB_STATE_KEY = Symbol.for("openclaw.cron.activeJobs"); + +function getCronActiveJobState(): CronActiveJobState { + return resolveGlobalSingleton(CRON_ACTIVE_JOB_STATE_KEY, () => ({ + activeJobIds: new Set(), + })); +} + +export function markCronJobActive(jobId: string) { + if (!jobId) { + return; + } + getCronActiveJobState().activeJobIds.add(jobId); +} + +export function clearCronJobActive(jobId: string) { + if (!jobId) { + return; + } + getCronActiveJobState().activeJobIds.delete(jobId); +} + +export function isCronJobActive(jobId: string) { + if (!jobId) { + return false; + } + return getCronActiveJobState().activeJobIds.has(jobId); +} + +export function resetCronActiveJobsForTests() { + getCronActiveJobState().activeJobIds.clear(); +} diff --git a/src/cron/service/timer.ts b/src/cron/service/timer.ts index edd9f93c2b6..e3aa39264a0 100644 --- a/src/cron/service/timer.ts +++ b/src/cron/service/timer.ts @@ -7,6 +7,7 @@ import { createRunningTaskRun, failTaskRunByRunId, } from "../../tasks/task-executor.js"; +import { clearCronJobActive, markCronJobActive } from "../active-jobs.js"; import { resolveCronDeliveryPlan } from "../delivery-plan.js"; import { sweepCronRunSessions } from "../session-reaper.js"; import type { @@ -561,6 +562,7 @@ export function applyJobResult( } function applyOutcomeToStoredJob(state: CronServiceState, result: TimedCronRunOutcome): void { + clearCronJobActive(result.jobId); tryFinishCronTaskRun(state, result); const store = state.store; if (!store) { @@ -716,6 +718,7 @@ export async function onTimer(state: CronServiceState) { const { id, job } = params; const startedAt = state.deps.nowMs(); job.state.runningAtMs = startedAt; + markCronJobActive(job.id); emit(state, { jobId: job.id, action: "started", runAtMs: startedAt }); const jobTimeoutMs = resolveCronJobTimeoutMs(job); const taskRunId = tryCreateCronTaskRun({ state, job, startedAt }); @@ -1299,6 +1302,7 @@ export async function executeJob( const startedAt = state.deps.nowMs(); job.state.runningAtMs = startedAt; job.state.lastError = undefined; + markCronJobActive(job.id); emit(state, { jobId: job.id, action: "started", runAtMs: startedAt }); let coreResult: { @@ -1327,6 +1331,7 @@ export async function executeJob( state.store.jobs = state.store.jobs.filter((j) => j.id !== job.id); emit(state, { jobId: job.id, action: "removed" }); } + clearCronJobActive(job.id); } function emitJobFinished( diff --git a/src/tasks/task-registry.maintenance.issue-60299.test.ts b/src/tasks/task-registry.maintenance.issue-60299.test.ts index 187c5a19d3a..e931466ae1a 100644 --- a/src/tasks/task-registry.maintenance.issue-60299.test.ts +++ b/src/tasks/task-registry.maintenance.issue-60299.test.ts @@ -26,11 +26,15 @@ async function loadMaintenanceModule(params: { tasks: TaskRecord[]; sessionStore?: Record; acpEntry?: unknown; + activeCronJobIds?: string[]; + activeRunIds?: string[]; }) { vi.resetModules(); const sessionStore = params.sessionStore ?? {}; const acpEntry = params.acpEntry; + const activeCronJobIds = new Set(params.activeCronJobIds ?? []); + const activeRunIds = new Set(params.activeRunIds ?? []); const currentTasks = new Map(params.tasks.map((task) => [task.taskId, { ...task }])); vi.doMock("../acp/runtime/session-meta.js", () => ({ @@ -45,6 +49,15 @@ async function loadMaintenanceModule(params: { resolveStorePath: () => "", })); + vi.doMock("../cron/active-jobs.js", () => ({ + isCronJobActive: (jobId: string) => activeCronJobIds.has(jobId), + })); + + vi.doMock("../infra/agent-events.js", () => ({ + getAgentRunContext: (runId: string) => + activeRunIds.has(runId) ? { sessionKey: "main" } : undefined, + })); + vi.doMock("./runtime-internal.js", () => ({ deleteTaskRecordById: (taskId: string) => currentTasks.delete(taskId), ensureTaskRegistryReady: () => {}, @@ -90,22 +103,11 @@ async function loadMaintenanceModule(params: { } describe("task-registry maintenance issue #60299", () => { - it("marks cron tasks with no child session key lost after the grace period", async () => { - const task = makeStaleTask({ - runtime: "cron", - childSessionKey: undefined, - }); - - const { mod, currentTasks } = await loadMaintenanceModule({ tasks: [task] }); - - expect(await mod.runTaskRegistryMaintenance()).toMatchObject({ reconciled: 1 }); - expect(currentTasks.get(task.taskId)).toMatchObject({ status: "lost" }); - }); - - it("marks cron tasks lost even if their transient child key still exists in the session store", async () => { + it("marks stale cron tasks lost once the runtime no longer tracks the job as active", async () => { const childSessionKey = "agent:main:slack:channel:test-channel"; const task = makeStaleTask({ runtime: "cron", + sourceId: "cron-job-1", childSessionKey, }); @@ -118,10 +120,28 @@ describe("task-registry maintenance issue #60299", () => { expect(currentTasks.get(task.taskId)).toMatchObject({ status: "lost" }); }); - it("treats cli tasks backed only by a persistent chat session as stale", async () => { + it("keeps active cron tasks live while the cron runtime still owns the job", async () => { + const task = makeStaleTask({ + runtime: "cron", + sourceId: "cron-job-2", + childSessionKey: undefined, + }); + + const { mod, currentTasks } = await loadMaintenanceModule({ + tasks: [task], + activeCronJobIds: ["cron-job-2"], + }); + + expect(await mod.runTaskRegistryMaintenance()).toMatchObject({ reconciled: 0 }); + expect(currentTasks.get(task.taskId)).toMatchObject({ status: "running" }); + }); + + it("marks chat-backed cli tasks lost after the owning run context disappears", async () => { const channelKey = "agent:main:slack:channel:C1234567890"; const task = makeStaleTask({ runtime: "cli", + sourceId: "run-chat-cli-stale", + runId: "run-chat-cli-stale", ownerKey: "agent:main:main", requesterSessionKey: channelKey, childSessionKey: channelKey, @@ -136,18 +156,21 @@ describe("task-registry maintenance issue #60299", () => { expect(currentTasks.get(task.taskId)).toMatchObject({ status: "lost" }); }); - it("keeps subagent tasks live while their child session still exists", async () => { - const childKey = "agent:main:subagent:abc123"; + it("keeps chat-backed cli tasks live while the owning run context is still active", async () => { + const channelKey = "agent:main:slack:channel:C1234567890"; const task = makeStaleTask({ - runtime: "subagent", + runtime: "cli", + sourceId: "run-chat-cli-live", + runId: "run-chat-cli-live", ownerKey: "agent:main:main", - requesterSessionKey: "agent:main:main", - childSessionKey: childKey, + requesterSessionKey: channelKey, + childSessionKey: channelKey, }); const { mod, currentTasks } = await loadMaintenanceModule({ tasks: [task], - sessionStore: { [childKey]: { updatedAt: Date.now() } }, + sessionStore: { [channelKey]: { updatedAt: Date.now() } }, + activeRunIds: ["run-chat-cli-live"], }); expect(await mod.runTaskRegistryMaintenance()).toMatchObject({ reconciled: 0 }); diff --git a/src/tasks/task-registry.maintenance.ts b/src/tasks/task-registry.maintenance.ts index 030d9ddfb26..230ad84a685 100644 --- a/src/tasks/task-registry.maintenance.ts +++ b/src/tasks/task-registry.maintenance.ts @@ -1,5 +1,7 @@ import { readAcpSessionEntry } from "../acp/runtime/session-meta.js"; import { loadSessionStore, resolveStorePath } from "../config/sessions.js"; +import { isCronJobActive } from "../cron/active-jobs.js"; +import { getAgentRunContext } from "../infra/agent-events.js"; import { parseAgentSessionKey } from "../routing/session-key.js"; import { deriveSessionChatType } from "../sessions/session-chat-type.js"; import { @@ -64,16 +66,25 @@ function hasLostGraceExpired(task: TaskRecord, now: number): boolean { return now - referenceAt >= TASK_RECONCILE_GRACE_MS; } -/** - * Returns false if the task's runtime is cron, since cron tasks do not maintain - * a persistent child session after the job exits. - * - * For cli tasks, long-lived channel/group/direct session-store entries do not - * imply task liveness, so only agent-scoped non-chat child sessions count. - */ +function hasActiveCliRun(task: TaskRecord): boolean { + const candidateRunIds = [task.sourceId, task.runId]; + for (const candidate of candidateRunIds) { + const runId = candidate?.trim(); + if (runId && getAgentRunContext(runId)) { + return true; + } + } + return false; +} + function hasBackingSession(task: TaskRecord): boolean { if (task.runtime === "cron") { - return false; + const jobId = task.sourceId?.trim(); + return jobId ? isCronJobActive(jobId) : false; + } + + if (task.runtime === "cli" && hasActiveCliRun(task)) { + return true; } const childSessionKey = task.childSessionKey?.trim(); @@ -89,17 +100,12 @@ function hasBackingSession(task: TaskRecord): boolean { } return Boolean(acpEntry.entry); } - if (task.runtime === "subagent") { - const agentId = parseAgentSessionKey(childSessionKey)?.agentId; - const storePath = resolveStorePath(undefined, { agentId }); - const store = loadSessionStore(storePath); - return Boolean(findSessionEntryByKey(store, childSessionKey)); - } - - if (task.runtime === "cli") { - const chatType = deriveSessionChatType(childSessionKey); - if (chatType === "channel" || chatType === "group" || chatType === "direct") { - return false; + if (task.runtime === "subagent" || task.runtime === "cli") { + if (task.runtime === "cli") { + const chatType = deriveSessionChatType(childSessionKey); + if (chatType === "channel" || chatType === "group" || chatType === "direct") { + return false; + } } const agentId = parseAgentSessionKey(childSessionKey)?.agentId; const storePath = resolveStorePath(undefined, { agentId }); diff --git a/src/tasks/task-registry.test.ts b/src/tasks/task-registry.test.ts index 5abc1236225..9fbe49b0ab5 100644 --- a/src/tasks/task-registry.test.ts +++ b/src/tasks/task-registry.test.ts @@ -1,5 +1,6 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { startAcpSpawnParentStreamRelay } from "../agents/acp-spawn-parent-stream.js"; +import { resetCronActiveJobsForTests } from "../cron/active-jobs.js"; import { emitAgentEvent, registerAgentRunContext, @@ -226,6 +227,7 @@ describe("task-registry", () => { resetSystemEventsForTest(); resetHeartbeatWakeStateForTests(); resetAgentRunContextForTest(); + resetCronActiveJobsForTests(); resetTaskRegistryDeliveryRuntimeForTests(); resetTaskRegistryForTests({ persist: false }); resetTaskFlowRegistryForTests({ persist: false });