diff --git a/CHANGELOG.md b/CHANGELOG.md index 34e3708c970..379dee74f25 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -30,6 +30,7 @@ Docs: https://docs.openclaw.ai ### Fixes - Agents/exec approvals: let `exec-approvals.json` agent security override stricter gateway tool defaults so approved subagents can use `security: "full"` without falling back to allowlist enforcement again. (#60310) Thanks @lml2468. +- Tasks/maintenance: mark stale cron runs and CLI tasks backed only by long-lived chat sessions as lost again so task cleanup does not keep dead work alive indefinitely. (#60310) Thanks @lml2468. - Providers/OpenAI: preserve native `reasoning.effort: "none"` and strict tool schemas on direct OpenAI-family endpoints, keep compat routes on compat shaping, fix Responses WebSocket warm-up behavior, keep stable session and turn metadata, and fall back more gracefully after early WebSocket failures. - Providers/OpenAI Codex: split native `contextWindow` from runtime `contextTokens`, keep the default effective cap at `272000`, and expose a per-model `contextTokens` override on `models.providers.*.models[]`. - Providers/compat: stop forcing OpenAI-only defaults on proxy and custom OpenAI-compatible routes, preserve native vendor-specific reasoning/tool/streaming behavior across Anthropic-compatible, Moonshot, Mistral, ModelStudio, OpenRouter, xAI, and Z.ai endpoints, and route GitHub Copilot Claude models through Anthropic Messages instead of OpenAI Responses. diff --git a/src/tasks/task-registry.maintenance.issue-60299.test.ts b/src/tasks/task-registry.maintenance.issue-60299.test.ts new file mode 100644 index 00000000000..187c5a19d3a --- /dev/null +++ b/src/tasks/task-registry.maintenance.issue-60299.test.ts @@ -0,0 +1,156 @@ +import { describe, expect, it, vi } from "vitest"; +import type { TaskRecord } from "./task-registry.types.js"; + +const GRACE_EXPIRED_MS = 10 * 60_000; + +function makeStaleTask(overrides: Partial): TaskRecord { + const now = Date.now(); + return { + taskId: "task-test-" + Math.random().toString(36).slice(2), + runtime: "cron", + requesterSessionKey: "agent:main:main", + ownerKey: "system:cron:test", + scopeKind: "system", + task: "test task", + status: "running", + deliveryStatus: "not_applicable", + notifyPolicy: "silent", + createdAt: now - GRACE_EXPIRED_MS, + startedAt: now - GRACE_EXPIRED_MS, + lastEventAt: now - GRACE_EXPIRED_MS, + ...overrides, + }; +} + +async function loadMaintenanceModule(params: { + tasks: TaskRecord[]; + sessionStore?: Record; + acpEntry?: unknown; +}) { + vi.resetModules(); + + const sessionStore = params.sessionStore ?? {}; + const acpEntry = params.acpEntry; + const currentTasks = new Map(params.tasks.map((task) => [task.taskId, { ...task }])); + + vi.doMock("../acp/runtime/session-meta.js", () => ({ + readAcpSessionEntry: () => + acpEntry !== undefined + ? { entry: acpEntry, storeReadFailed: false } + : { entry: undefined, storeReadFailed: false }, + })); + + vi.doMock("../config/sessions.js", () => ({ + loadSessionStore: () => sessionStore, + resolveStorePath: () => "", + })); + + vi.doMock("./runtime-internal.js", () => ({ + deleteTaskRecordById: (taskId: string) => currentTasks.delete(taskId), + ensureTaskRegistryReady: () => {}, + getTaskById: (taskId: string) => currentTasks.get(taskId), + listTaskRecords: () => params.tasks, + markTaskLostById: (patch: { + taskId: string; + endedAt: number; + lastEventAt?: number; + error?: string; + cleanupAfter?: number; + }) => { + const current = currentTasks.get(patch.taskId); + if (!current) { + return null; + } + const next = { + ...current, + status: "lost" as const, + endedAt: patch.endedAt, + lastEventAt: patch.lastEventAt ?? patch.endedAt, + ...(patch.error !== undefined ? { error: patch.error } : {}), + ...(patch.cleanupAfter !== undefined ? { cleanupAfter: patch.cleanupAfter } : {}), + }; + currentTasks.set(patch.taskId, next); + return next; + }, + maybeDeliverTaskTerminalUpdate: () => false, + resolveTaskForLookupToken: () => undefined, + setTaskCleanupAfterById: (patch: { taskId: string; cleanupAfter: number }) => { + const current = currentTasks.get(patch.taskId); + if (!current) { + return null; + } + const next = { ...current, cleanupAfter: patch.cleanupAfter }; + currentTasks.set(patch.taskId, next); + return next; + }, + })); + + const mod = await import("./task-registry.maintenance.js"); + return { mod, currentTasks }; +} + +describe("task-registry maintenance issue #60299", () => { + it("marks cron tasks with no child session key lost after the grace period", async () => { + const task = makeStaleTask({ + runtime: "cron", + childSessionKey: undefined, + }); + + const { mod, currentTasks } = await loadMaintenanceModule({ tasks: [task] }); + + expect(await mod.runTaskRegistryMaintenance()).toMatchObject({ reconciled: 1 }); + expect(currentTasks.get(task.taskId)).toMatchObject({ status: "lost" }); + }); + + it("marks cron tasks lost even if their transient child key still exists in the session store", async () => { + const childSessionKey = "agent:main:slack:channel:test-channel"; + const task = makeStaleTask({ + runtime: "cron", + childSessionKey, + }); + + const { mod, currentTasks } = await loadMaintenanceModule({ + tasks: [task], + sessionStore: { [childSessionKey]: { updatedAt: Date.now() } }, + }); + + expect(await mod.runTaskRegistryMaintenance()).toMatchObject({ reconciled: 1 }); + expect(currentTasks.get(task.taskId)).toMatchObject({ status: "lost" }); + }); + + it("treats cli tasks backed only by a persistent chat session as stale", async () => { + const channelKey = "agent:main:slack:channel:C1234567890"; + const task = makeStaleTask({ + runtime: "cli", + ownerKey: "agent:main:main", + requesterSessionKey: channelKey, + childSessionKey: channelKey, + }); + + const { mod, currentTasks } = await loadMaintenanceModule({ + tasks: [task], + sessionStore: { [channelKey]: { updatedAt: Date.now() } }, + }); + + expect(await mod.runTaskRegistryMaintenance()).toMatchObject({ reconciled: 1 }); + expect(currentTasks.get(task.taskId)).toMatchObject({ status: "lost" }); + }); + + it("keeps subagent tasks live while their child session still exists", async () => { + const childKey = "agent:main:subagent:abc123"; + const task = makeStaleTask({ + runtime: "subagent", + ownerKey: "agent:main:main", + requesterSessionKey: "agent:main:main", + childSessionKey: childKey, + }); + + const { mod, currentTasks } = await loadMaintenanceModule({ + tasks: [task], + sessionStore: { [childKey]: { updatedAt: Date.now() } }, + }); + + expect(await mod.runTaskRegistryMaintenance()).toMatchObject({ reconciled: 0 }); + expect(currentTasks.get(task.taskId)).toMatchObject({ status: "running" }); + }); +}); diff --git a/src/tasks/task-registry.maintenance.ts b/src/tasks/task-registry.maintenance.ts index c7d7bec3114..030d9ddfb26 100644 --- a/src/tasks/task-registry.maintenance.ts +++ b/src/tasks/task-registry.maintenance.ts @@ -1,6 +1,7 @@ import { readAcpSessionEntry } from "../acp/runtime/session-meta.js"; import { loadSessionStore, resolveStorePath } from "../config/sessions.js"; import { parseAgentSessionKey } from "../routing/session-key.js"; +import { deriveSessionChatType } from "../sessions/session-chat-type.js"; import { deleteTaskRecordById, ensureTaskRegistryReady, @@ -63,7 +64,18 @@ function hasLostGraceExpired(task: TaskRecord, now: number): boolean { return now - referenceAt >= TASK_RECONCILE_GRACE_MS; } +/** + * Returns false if the task's runtime is cron, since cron tasks do not maintain + * a persistent child session after the job exits. + * + * For cli tasks, long-lived channel/group/direct session-store entries do not + * imply task liveness, so only agent-scoped non-chat child sessions count. + */ function hasBackingSession(task: TaskRecord): boolean { + if (task.runtime === "cron") { + return false; + } + const childSessionKey = task.childSessionKey?.trim(); if (!childSessionKey) { return true; @@ -77,12 +89,24 @@ function hasBackingSession(task: TaskRecord): boolean { } return Boolean(acpEntry.entry); } - if (task.runtime === "subagent" || task.runtime === "cli") { + if (task.runtime === "subagent") { const agentId = parseAgentSessionKey(childSessionKey)?.agentId; const storePath = resolveStorePath(undefined, { agentId }); const store = loadSessionStore(storePath); return Boolean(findSessionEntryByKey(store, childSessionKey)); } + + if (task.runtime === "cli") { + const chatType = deriveSessionChatType(childSessionKey); + if (chatType === "channel" || chatType === "group" || chatType === "direct") { + return false; + } + const agentId = parseAgentSessionKey(childSessionKey)?.agentId; + const storePath = resolveStorePath(undefined, { agentId }); + const store = loadSessionStore(storePath); + return Boolean(findSessionEntryByKey(store, childSessionKey)); + } + return true; }