From 7cd0ff2d888f41c40b8eac11cb2cd5181100500d Mon Sep 17 00:00:00 2001 From: Vincent Koc Date: Wed, 1 Apr 2026 03:12:33 +0900 Subject: [PATCH] refactor(tasks): add owner-key task access boundaries (#58516) * refactor(tasks): add owner-key task access boundaries * test(acp): update task owner-key assertion * fix(tasks): align owner key checks and migration scope --- src/acp/control-plane/manager.core.ts | 3 +- src/acp/control-plane/manager.test.ts | 3 +- src/agents/acp-spawn.ts | 6 +- .../openclaw-tools.session-status.test.ts | 20 +- src/agents/subagent-registry-run-manager.ts | 3 +- src/agents/tools/session-status-tool.ts | 17 +- src/auto-reply/reply/commands-acp.test.ts | 3 +- .../reply/commands-acp/runtime-options.ts | 7 +- .../reply/commands-subagents/action-info.ts | 7 +- src/auto-reply/reply/commands.test.ts | 3 +- src/commands/tasks.ts | 8 +- src/cron/service/ops.ts | 3 +- src/cron/service/timer.ts | 3 +- src/gateway/server-methods/agent.test.ts | 3 +- src/gateway/server-methods/agent.ts | 3 +- src/tasks/runtime-internal.ts | 28 ++ src/tasks/task-executor-policy.test.ts | 3 +- src/tasks/task-executor.test.ts | 15 +- src/tasks/task-executor.ts | 9 +- src/tasks/task-owner-access.test.ts | 114 ++++++++ src/tasks/task-owner-access.ts | 80 ++++++ .../task-registry-import-boundary.test.ts | 11 +- src/tasks/task-registry.audit.test.ts | 3 +- src/tasks/task-registry.maintenance.ts | 6 +- src/tasks/task-registry.store.sqlite.ts | 66 ++++- src/tasks/task-registry.store.test.ts | 100 ++++++- src/tasks/task-registry.test.ts | 118 +++++--- src/tasks/task-registry.ts | 271 ++++++++++++------ src/tasks/task-registry.types.ts | 4 +- 29 files changed, 732 insertions(+), 188 deletions(-) create mode 100644 src/tasks/runtime-internal.ts create mode 100644 src/tasks/task-owner-access.test.ts create mode 100644 src/tasks/task-owner-access.ts diff --git a/src/acp/control-plane/manager.core.ts b/src/acp/control-plane/manager.core.ts index 526b324d168..84c5d617b36 100644 --- a/src/acp/control-plane/manager.core.ts +++ b/src/acp/control-plane/manager.core.ts @@ -1884,7 +1884,8 @@ export class AcpSessionManager { createRunningTaskRun({ runtime: "acp", sourceId: context.runId, - requesterSessionKey: context.requesterSessionKey, + ownerKey: context.requesterSessionKey, + scopeKind: "session", requesterOrigin: context.requesterOrigin, childSessionKey: context.childSessionKey, runId: context.runId, diff --git a/src/acp/control-plane/manager.test.ts b/src/acp/control-plane/manager.test.ts index ef034c742c1..06be5ff7e71 100644 --- a/src/acp/control-plane/manager.test.ts +++ b/src/acp/control-plane/manager.test.ts @@ -327,7 +327,8 @@ describe("AcpSessionManager", () => { expect(findTaskByRunId("direct-parented-run")).toMatchObject({ runtime: "acp", - requesterSessionKey: "agent:quant:telegram:quant:direct:822430204", + ownerKey: "agent:quant:telegram:quant:direct:822430204", + scopeKind: "session", childSessionKey: "agent:codex:acp:child-1", label: "Quant patch", task: "Implement the feature and report back", diff --git a/src/agents/acp-spawn.ts b/src/agents/acp-spawn.ts index 841293965d4..6e1328ce1a8 100644 --- a/src/agents/acp-spawn.ts +++ b/src/agents/acp-spawn.ts @@ -997,7 +997,8 @@ export async function spawnAcpDirect( createRunningTaskRun({ runtime: "acp", sourceId: childRunId, - requesterSessionKey: requesterInternalKey, + ownerKey: requesterInternalKey, + scopeKind: "session", requesterOrigin: requesterState.origin, childSessionKey: sessionKey, runId: childRunId, @@ -1028,7 +1029,8 @@ export async function spawnAcpDirect( createRunningTaskRun({ runtime: "acp", sourceId: childRunId, - requesterSessionKey: requesterInternalKey, + ownerKey: requesterInternalKey, + scopeKind: "session", requesterOrigin: requesterState.origin, childSessionKey: sessionKey, runId: childRunId, diff --git a/src/agents/openclaw-tools.session-status.test.ts b/src/agents/openclaw-tools.session-status.test.ts index e41ae65a5c3..e419f5dfaaf 100644 --- a/src/agents/openclaw-tools.session-status.test.ts +++ b/src/agents/openclaw-tools.session-status.test.ts @@ -8,8 +8,11 @@ const callGatewayMock = vi.fn(); const loadCombinedSessionStoreForGatewayMock = vi.fn(); const buildStatusMessageMock = vi.hoisted(() => vi.fn(() => "OpenClaw\n🧠 Model: GPT-5.4")); const resolveQueueSettingsMock = vi.hoisted(() => vi.fn(() => ({ mode: "interrupt" }))); -const listTasksForSessionKeyMock = vi.hoisted(() => - vi.fn((_: string) => [] as Array>), +const listTasksForRelatedSessionKeyForOwnerMock = vi.hoisted(() => + vi.fn( + (_: { relatedSessionKey: string; callerOwnerKey: string }) => + [] as Array>, + ), ); const createMockConfig = () => ({ @@ -192,8 +195,11 @@ async function loadFreshOpenClawToolsForSessionStatusTest() { vi.doMock("../auto-reply/status.js", () => ({ buildStatusMessage: buildStatusMessageMock, })); - vi.doMock("../tasks/task-registry.js", () => ({ - listTasksForSessionKey: (sessionKey: string) => listTasksForSessionKeyMock(sessionKey), + vi.doMock("../tasks/task-owner-access.js", () => ({ + listTasksForRelatedSessionKeyForOwner: (params: { + relatedSessionKey: string; + callerOwnerKey: string; + }) => listTasksForRelatedSessionKeyForOwnerMock(params), })); ({ createSessionStatusTool } = await import("./tools/session-status-tool.js")); } @@ -206,8 +212,8 @@ function resetSessionStore(store: Record) { updateSessionStoreMock.mockClear(); callGatewayMock.mockClear(); loadCombinedSessionStoreForGatewayMock.mockClear(); - listTasksForSessionKeyMock.mockClear(); - listTasksForSessionKeyMock.mockReturnValue([]); + listTasksForRelatedSessionKeyForOwnerMock.mockClear(); + listTasksForRelatedSessionKeyForOwnerMock.mockReturnValue([]); loadSessionStoreMock.mockReturnValue(store); loadCombinedSessionStoreForGatewayMock.mockReturnValue({ storePath: "(multiple)", @@ -390,7 +396,7 @@ describe("session_status tool", () => { updatedAt: Date.now(), }, }); - listTasksForSessionKeyMock.mockReturnValue([ + listTasksForRelatedSessionKeyForOwnerMock.mockReturnValue([ { taskId: "task-1", runtime: "acp", diff --git a/src/agents/subagent-registry-run-manager.ts b/src/agents/subagent-registry-run-manager.ts index 33e70961de0..cc6c6c722fa 100644 --- a/src/agents/subagent-registry-run-manager.ts +++ b/src/agents/subagent-registry-run-manager.ts @@ -323,7 +323,8 @@ export function createSubagentRunManager(params: { createRunningTaskRun({ runtime: "subagent", sourceId: registerParams.runId, - requesterSessionKey: registerParams.requesterSessionKey, + ownerKey: registerParams.requesterSessionKey, + scopeKind: "session", requesterOrigin, childSessionKey: registerParams.childSessionKey, runId: registerParams.runId, diff --git a/src/agents/tools/session-status-tool.ts b/src/agents/tools/session-status-tool.ts index d5338de4e3e..ad2ff405a22 100644 --- a/src/agents/tools/session-status-tool.ts +++ b/src/agents/tools/session-status-tool.ts @@ -23,7 +23,7 @@ import { resolveAgentIdFromSessionKey, } from "../../routing/session-key.js"; import { applyModelOverrideToSessionEntry } from "../../sessions/model-overrides.js"; -import { listTasksForSessionKey } from "../../tasks/task-registry.js"; +import { listTasksForRelatedSessionKeyForOwner } from "../../tasks/task-owner-access.js"; import { resolveAgentConfig, resolveAgentDir } from "../agent-scope.js"; import { formatUserTime, resolveUserTimeFormat, resolveUserTimezone } from "../date-time.js"; import { resolveModelAuthLabel } from "../model-auth-label.js"; @@ -119,8 +119,14 @@ function resolveStoreScopedRequesterKey(params: { return parsed.rest === params.mainKey ? params.mainKey : params.requesterKey; } -function formatSessionTaskLine(sessionKey: string): string | undefined { - const tasks = listTasksForSessionKey(sessionKey); +function formatSessionTaskLine(params: { + relatedSessionKey: string; + callerOwnerKey: string; +}): string | undefined { + const tasks = listTasksForRelatedSessionKeyForOwner({ + relatedSessionKey: params.relatedSessionKey, + callerOwnerKey: params.callerOwnerKey, + }); if (tasks.length === 0) { return undefined; } @@ -568,7 +574,10 @@ export function createSessionStatusTool(opts?: { }, includeTranscriptUsage: true, }); - const taskLine = formatSessionTaskLine(resolved.key); + const taskLine = formatSessionTaskLine({ + relatedSessionKey: resolved.key, + callerOwnerKey: visibilityRequesterKey, + }); const fullStatusText = taskLine ? `${statusText}\n${taskLine}` : statusText; return { diff --git a/src/auto-reply/reply/commands-acp.test.ts b/src/auto-reply/reply/commands-acp.test.ts index e41fe6d9ae9..0de8a5505f9 100644 --- a/src/auto-reply/reply/commands-acp.test.ts +++ b/src/auto-reply/reply/commands-acp.test.ts @@ -1464,7 +1464,8 @@ describe("/acp command", () => { }); createTaskRecord({ runtime: "acp", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", childSessionKey: defaultAcpSessionKey, runId: "acp-run-1", task: "Inspect ACP backlog", diff --git a/src/auto-reply/reply/commands-acp/runtime-options.ts b/src/auto-reply/reply/commands-acp/runtime-options.ts index f88c88caed1..41f1bcbdee9 100644 --- a/src/auto-reply/reply/commands-acp/runtime-options.ts +++ b/src/auto-reply/reply/commands-acp/runtime-options.ts @@ -8,7 +8,7 @@ import { validateRuntimePermissionProfileInput, } from "../../../acp/control-plane/runtime-options.js"; import { resolveAcpSessionIdentifierLinesFromIdentity } from "../../../acp/runtime/session-identifiers.js"; -import { findLatestTaskForSessionKey } from "../../../tasks/task-registry.js"; +import { findLatestTaskForRelatedSessionKeyForOwner } from "../../../tasks/task-owner-access.js"; import type { CommandHandlerResult, HandleCommandsParams } from "../commands-types.js"; import { ACP_CWD_USAGE, @@ -123,7 +123,10 @@ export async function handleAcpStatusAction( fallbackCode: "ACP_TURN_FAILED", fallbackMessage: "Could not read ACP session status.", onSuccess: (status) => { - const linkedTask = findLatestTaskForSessionKey(status.sessionKey); + const linkedTask = findLatestTaskForRelatedSessionKeyForOwner({ + relatedSessionKey: status.sessionKey, + callerOwnerKey: params.sessionKey, + }); const sessionIdentifierLines = resolveAcpSessionIdentifierLinesFromIdentity({ backend: status.backend, identity: status.identity, diff --git a/src/auto-reply/reply/commands-subagents/action-info.ts b/src/auto-reply/reply/commands-subagents/action-info.ts index 69f39f9cb26..cb90256c149 100644 --- a/src/auto-reply/reply/commands-subagents/action-info.ts +++ b/src/auto-reply/reply/commands-subagents/action-info.ts @@ -1,7 +1,7 @@ import { countPendingDescendantRuns } from "../../../agents/subagent-registry.js"; import { loadSessionStore, resolveStorePath } from "../../../config/sessions.js"; import { formatDurationCompact } from "../../../shared/subagents-format.js"; -import { findTaskByRunId } from "../../../tasks/task-registry.js"; +import { findTaskByRunIdForOwner } from "../../../tasks/task-owner-access.js"; import type { CommandHandlerResult } from "../commands-types.js"; import { formatRunLabel } from "../subagents-utils.js"; import { @@ -37,7 +37,10 @@ export function handleSubagentsInfoAction(ctx: SubagentsCommandContext): Command const outcome = run.outcome ? `${run.outcome.status}${run.outcome.error ? ` (${run.outcome.error})` : ""}` : "n/a"; - const linkedTask = findTaskByRunId(run.runId); + const linkedTask = findTaskByRunIdForOwner({ + runId: run.runId, + callerOwnerKey: params.sessionKey, + }); const lines = [ "ℹ️ Subagent info", diff --git a/src/auto-reply/reply/commands.test.ts b/src/auto-reply/reply/commands.test.ts index 555dc26a6f7..3d35a26a034 100644 --- a/src/auto-reply/reply/commands.test.ts +++ b/src/auto-reply/reply/commands.test.ts @@ -2633,7 +2633,8 @@ describe("handleCommands subagents", () => { }); createTaskRecord({ runtime: "subagent", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", childSessionKey: "agent:main:subagent:abc", runId: "run-1", task: "do thing", diff --git a/src/commands/tasks.ts b/src/commands/tasks.ts index 8d263b5e037..1fa3bcfabf5 100644 --- a/src/commands/tasks.ts +++ b/src/commands/tasks.ts @@ -1,6 +1,11 @@ import { loadConfig } from "../config/config.js"; import { info } from "../globals.js"; import type { RuntimeEnv } from "../runtime.js"; +import { + cancelTaskById, + getTaskById, + updateTaskNotifyPolicyById, +} from "../tasks/runtime-internal.js"; import { listTaskAuditFindings, summarizeTaskAuditFindings, @@ -8,7 +13,6 @@ import { type TaskAuditFinding, type TaskAuditSeverity, } from "../tasks/task-registry.audit.js"; -import { cancelTaskById, getTaskById, updateTaskNotifyPolicyById } from "../tasks/task-registry.js"; import { getInspectableTaskAuditSummary, getInspectableTaskRegistrySummary, @@ -231,7 +235,7 @@ export async function tasksShowCommand( `result: ${task.terminalOutcome ?? "n/a"}`, `delivery: ${task.deliveryStatus}`, `notify: ${task.notifyPolicy}`, - `requesterSessionKey: ${task.requesterSessionKey}`, + `ownerKey: ${task.ownerKey}`, `childSessionKey: ${task.childSessionKey ?? "n/a"}`, `parentTaskId: ${task.parentTaskId ?? "n/a"}`, `agentId: ${task.agentId ?? "n/a"}`, diff --git a/src/cron/service/ops.ts b/src/cron/service/ops.ts index 1daeeccdf80..d9a4a78b2c3 100644 --- a/src/cron/service/ops.ts +++ b/src/cron/service/ops.ts @@ -400,7 +400,8 @@ function tryCreateManualTaskRun(params: { createRunningTaskRun({ runtime: "cron", sourceId: params.job.id, - requesterSessionKey: "", + ownerKey: `system:cron:${params.job.id}`, + scopeKind: "system", childSessionKey: params.job.sessionKey, agentId: params.job.agentId, runId, diff --git a/src/cron/service/timer.ts b/src/cron/service/timer.ts index 2e167cf6d72..51995acebe0 100644 --- a/src/cron/service/timer.ts +++ b/src/cron/service/timer.ts @@ -138,7 +138,8 @@ function tryCreateCronTaskRun(params: { createRunningTaskRun({ runtime: "cron", sourceId: params.job.id, - requesterSessionKey: "", + ownerKey: `system:cron:${params.job.id}`, + scopeKind: "system", childSessionKey: params.job.sessionKey, agentId: params.job.agentId, runId, diff --git a/src/gateway/server-methods/agent.test.ts b/src/gateway/server-methods/agent.test.ts index 5ee015a63f9..b8cd4a22562 100644 --- a/src/gateway/server-methods/agent.test.ts +++ b/src/gateway/server-methods/agent.test.ts @@ -473,7 +473,8 @@ describe("gateway agent handler", () => { runId: "run-old", childSessionKey, controllerSessionKey: "agent:main:main", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", requesterDisplayKey: "main", task: "initial task", cleanup: "keep" as const, diff --git a/src/gateway/server-methods/agent.ts b/src/gateway/server-methods/agent.ts index 7ce897b7018..4f244f1c7cb 100644 --- a/src/gateway/server-methods/agent.ts +++ b/src/gateway/server-methods/agent.ts @@ -197,7 +197,8 @@ function dispatchAgentRunFromGateway(params: { createRunningTaskRun({ runtime: "cli", sourceId: params.runId, - requesterSessionKey: params.ingressOpts.sessionKey, + ownerKey: params.ingressOpts.sessionKey, + scopeKind: "session", requesterOrigin: normalizeDeliveryContext({ channel: params.ingressOpts.channel, to: params.ingressOpts.to, diff --git a/src/tasks/runtime-internal.ts b/src/tasks/runtime-internal.ts new file mode 100644 index 00000000000..bbd3353c33c --- /dev/null +++ b/src/tasks/runtime-internal.ts @@ -0,0 +1,28 @@ +export { + cancelTaskById, + createTaskRecord, + deleteTaskRecordById, + ensureTaskRegistryReady, + findLatestTaskForOwnerKey, + findLatestTaskForRelatedSessionKey, + findTaskByRunId, + getTaskById, + getTaskRegistrySnapshot, + getTaskRegistrySummary, + listTaskRecords, + listTasksForOwnerKey, + listTasksForRelatedSessionKey, + markTaskLostById, + markTaskRunningByRunId, + markTaskTerminalById, + markTaskTerminalByRunId, + maybeDeliverTaskTerminalUpdate, + recordTaskProgressByRunId, + resolveTaskForLookupToken, + resetTaskRegistryForTests, + setTaskCleanupAfterById, + setTaskProgressById, + setTaskRunDeliveryStatusByRunId, + setTaskTimingById, + updateTaskNotifyPolicyById, +} from "./task-registry.js"; diff --git a/src/tasks/task-executor-policy.test.ts b/src/tasks/task-executor-policy.test.ts index d6316d9cdc5..19469002a8b 100644 --- a/src/tasks/task-executor-policy.test.ts +++ b/src/tasks/task-executor-policy.test.ts @@ -14,7 +14,8 @@ function createTask(partial: Partial): TaskRecord { return { taskId: partial.taskId ?? "task-1", runtime: partial.runtime ?? "acp", - requesterSessionKey: partial.requesterSessionKey ?? "agent:main:main", + ownerKey: partial.ownerKey ?? "agent:main:main", + scopeKind: "session", task: partial.task ?? "Investigate issue", status: partial.status ?? "running", deliveryStatus: partial.deliveryStatus ?? "pending", diff --git a/src/tasks/task-executor.test.ts b/src/tasks/task-executor.test.ts index 46883aff569..be6b23613e3 100644 --- a/src/tasks/task-executor.test.ts +++ b/src/tasks/task-executor.test.ts @@ -67,7 +67,8 @@ describe("task-executor", () => { await withTaskExecutorStateDir(async () => { const created = createQueuedTaskRun({ runtime: "acp", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", childSessionKey: "agent:codex:acp:child", runId: "run-executor-queued", task: "Investigate issue", @@ -103,7 +104,8 @@ describe("task-executor", () => { await withTaskExecutorStateDir(async () => { const created = createRunningTaskRun({ runtime: "subagent", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", childSessionKey: "agent:codex:subagent:child", runId: "run-executor-fail", task: "Write summary", @@ -143,7 +145,8 @@ describe("task-executor", () => { await withTaskExecutorStateDir(async () => { const created = createRunningTaskRun({ runtime: "acp", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", requesterOrigin: { channel: "telegram", to: "telegram:123", @@ -179,7 +182,8 @@ describe("task-executor", () => { const child = createRunningTaskRun({ runtime: "acp", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", childSessionKey: "agent:codex:acp:child", runId: "run-linear-cancel", task: "Inspect a PR", @@ -217,7 +221,8 @@ describe("task-executor", () => { const child = createRunningTaskRun({ runtime: "subagent", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", childSessionKey: "agent:codex:subagent:child", runId: "run-subagent-cancel", task: "Inspect a PR", diff --git a/src/tasks/task-executor.ts b/src/tasks/task-executor.ts index 8bfde5b8c07..7ac1b245e52 100644 --- a/src/tasks/task-executor.ts +++ b/src/tasks/task-executor.ts @@ -7,13 +7,14 @@ import { markTaskTerminalByRunId, recordTaskProgressByRunId, setTaskRunDeliveryStatusByRunId, -} from "./task-registry.js"; +} from "./runtime-internal.js"; import type { TaskDeliveryState, TaskDeliveryStatus, TaskNotifyPolicy, TaskRecord, TaskRuntime, + TaskScopeKind, TaskStatus, TaskTerminalOutcome, } from "./task-registry.types.js"; @@ -21,7 +22,8 @@ import type { export function createQueuedTaskRun(params: { runtime: TaskRuntime; sourceId?: string; - requesterSessionKey: string; + ownerKey: string; + scopeKind: TaskScopeKind; requesterOrigin?: TaskDeliveryState["requesterOrigin"]; childSessionKey?: string; parentTaskId?: string; @@ -42,7 +44,8 @@ export function createQueuedTaskRun(params: { export function createRunningTaskRun(params: { runtime: TaskRuntime; sourceId?: string; - requesterSessionKey: string; + ownerKey: string; + scopeKind: TaskScopeKind; requesterOrigin?: TaskDeliveryState["requesterOrigin"]; childSessionKey?: string; parentTaskId?: string; diff --git a/src/tasks/task-owner-access.test.ts b/src/tasks/task-owner-access.test.ts new file mode 100644 index 00000000000..6acc84afea4 --- /dev/null +++ b/src/tasks/task-owner-access.test.ts @@ -0,0 +1,114 @@ +import { afterEach, describe, expect, it } from "vitest"; +import { + findLatestTaskForRelatedSessionKeyForOwner, + findTaskByRunIdForOwner, + getTaskByIdForOwner, + resolveTaskForLookupTokenForOwner, +} from "./task-owner-access.js"; +import { createTaskRecord, resetTaskRegistryForTests } from "./task-registry.js"; + +afterEach(() => { + resetTaskRegistryForTests({ persist: false }); +}); + +describe("task owner access", () => { + it("returns owner-scoped tasks for owner and child-session lookups", () => { + const task = createTaskRecord({ + runtime: "subagent", + ownerKey: "agent:main:main", + scopeKind: "session", + childSessionKey: "agent:main:subagent:child-1", + runId: "owner-visible-run", + task: "Owner visible task", + status: "running", + }); + + expect( + findLatestTaskForRelatedSessionKeyForOwner({ + relatedSessionKey: "agent:main:subagent:child-1", + callerOwnerKey: "agent:main:main", + })?.taskId, + ).toBe(task.taskId); + expect( + findTaskByRunIdForOwner({ + runId: "owner-visible-run", + callerOwnerKey: "agent:main:main", + })?.taskId, + ).toBe(task.taskId); + }); + + it("denies cross-owner task reads", () => { + const task = createTaskRecord({ + runtime: "acp", + ownerKey: "agent:main:main", + scopeKind: "session", + childSessionKey: "agent:main:acp:child-1", + runId: "owner-hidden-run", + task: "Hidden task", + status: "queued", + }); + + expect( + getTaskByIdForOwner({ + taskId: task.taskId, + callerOwnerKey: "agent:main:subagent:other-parent", + }), + ).toBeUndefined(); + expect( + findTaskByRunIdForOwner({ + runId: "owner-hidden-run", + callerOwnerKey: "agent:main:subagent:other-parent", + }), + ).toBeUndefined(); + expect( + resolveTaskForLookupTokenForOwner({ + token: "agent:main:acp:child-1", + callerOwnerKey: "agent:main:subagent:other-parent", + }), + ).toBeUndefined(); + }); + + it("requires an exact owner-key match", () => { + const task = createTaskRecord({ + runtime: "acp", + ownerKey: "agent:main:MixedCase", + scopeKind: "session", + runId: "case-sensitive-owner-run", + task: "Case-sensitive owner", + status: "queued", + }); + + expect( + getTaskByIdForOwner({ + taskId: task.taskId, + callerOwnerKey: "agent:main:mixedcase", + }), + ).toBeUndefined(); + }); + + it("does not expose system-owned tasks through owner-scoped readers", () => { + const task = createTaskRecord({ + runtime: "cron", + ownerKey: "system:cron:nightly", + scopeKind: "system", + childSessionKey: "agent:main:cron:nightly", + runId: "system-task-run", + task: "Nightly cron", + status: "running", + deliveryStatus: "not_applicable", + }); + + expect( + getTaskByIdForOwner({ + taskId: task.taskId, + callerOwnerKey: "agent:main:main", + }), + ).toBeUndefined(); + expect( + resolveTaskForLookupTokenForOwner({ + token: "system-task-run", + callerOwnerKey: "agent:main:main", + }), + ).toBeUndefined(); + }); +}); diff --git a/src/tasks/task-owner-access.ts b/src/tasks/task-owner-access.ts new file mode 100644 index 00000000000..eaf51126037 --- /dev/null +++ b/src/tasks/task-owner-access.ts @@ -0,0 +1,80 @@ +import { + findTaskByRunId, + getTaskById, + listTasksForRelatedSessionKey, + resolveTaskForLookupToken, +} from "./task-registry.js"; +import type { TaskRecord } from "./task-registry.types.js"; + +function normalizeOwnerKey(ownerKey?: string): string | undefined { + const trimmed = ownerKey?.trim(); + return trimmed ? trimmed : undefined; +} + +function canOwnerAccessTask(task: TaskRecord, callerOwnerKey: string): boolean { + return ( + task.scopeKind === "session" && + normalizeOwnerKey(task.ownerKey) === normalizeOwnerKey(callerOwnerKey) + ); +} + +export function getTaskByIdForOwner(params: { + taskId: string; + callerOwnerKey: string; +}): TaskRecord | undefined { + const task = getTaskById(params.taskId); + return task && canOwnerAccessTask(task, params.callerOwnerKey) ? task : undefined; +} + +export function findTaskByRunIdForOwner(params: { + runId: string; + callerOwnerKey: string; +}): TaskRecord | undefined { + const task = findTaskByRunId(params.runId); + return task && canOwnerAccessTask(task, params.callerOwnerKey) ? task : undefined; +} + +export function listTasksForRelatedSessionKeyForOwner(params: { + relatedSessionKey: string; + callerOwnerKey: string; +}): TaskRecord[] { + return listTasksForRelatedSessionKey(params.relatedSessionKey).filter((task) => + canOwnerAccessTask(task, params.callerOwnerKey), + ); +} + +export function findLatestTaskForRelatedSessionKeyForOwner(params: { + relatedSessionKey: string; + callerOwnerKey: string; +}): TaskRecord | undefined { + return listTasksForRelatedSessionKeyForOwner(params)[0]; +} + +export function resolveTaskForLookupTokenForOwner(params: { + token: string; + callerOwnerKey: string; +}): TaskRecord | undefined { + const direct = getTaskByIdForOwner({ + taskId: params.token, + callerOwnerKey: params.callerOwnerKey, + }); + if (direct) { + return direct; + } + const byRun = findTaskByRunIdForOwner({ + runId: params.token, + callerOwnerKey: params.callerOwnerKey, + }); + if (byRun) { + return byRun; + } + const related = findLatestTaskForRelatedSessionKeyForOwner({ + relatedSessionKey: params.token, + callerOwnerKey: params.callerOwnerKey, + }); + if (related) { + return related; + } + const raw = resolveTaskForLookupToken(params.token); + return raw && canOwnerAccessTask(raw, params.callerOwnerKey) ? raw : undefined; +} diff --git a/src/tasks/task-registry-import-boundary.test.ts b/src/tasks/task-registry-import-boundary.test.ts index d99ef7775bd..4b5b6e05034 100644 --- a/src/tasks/task-registry-import-boundary.test.ts +++ b/src/tasks/task-registry-import-boundary.test.ts @@ -5,14 +5,7 @@ import { describe, expect, it } from "vitest"; const TASK_ROOT = path.resolve(import.meta.dirname); const SRC_ROOT = path.resolve(TASK_ROOT, ".."); -const ALLOWED_IMPORTERS = new Set([ - "agents/tools/session-status-tool.ts", - "auto-reply/reply/commands-acp/runtime-options.ts", - "auto-reply/reply/commands-subagents/action-info.ts", - "commands/tasks.ts", - "tasks/task-executor.ts", - "tasks/task-registry.maintenance.ts", -]); +const ALLOWED_IMPORTERS = new Set(["tasks/runtime-internal.ts", "tasks/task-owner-access.ts"]); async function listSourceFiles(root: string): Promise { const entries = await fs.readdir(root, { withFileTypes: true }); @@ -32,7 +25,7 @@ async function listSourceFiles(root: string): Promise { } describe("task registry import boundary", () => { - it("keeps direct task-registry imports on the approved read-model seam", async () => { + it("keeps direct task-registry imports behind the approved task access seams", async () => { const importers: string[] = []; for (const file of await listSourceFiles(SRC_ROOT)) { const relative = path.relative(SRC_ROOT, file).replaceAll(path.sep, "/"); diff --git a/src/tasks/task-registry.audit.test.ts b/src/tasks/task-registry.audit.test.ts index e2d8ddb41ad..583d5d21a46 100644 --- a/src/tasks/task-registry.audit.test.ts +++ b/src/tasks/task-registry.audit.test.ts @@ -6,7 +6,8 @@ function createTask(partial: Partial): TaskRecord { return { taskId: partial.taskId ?? "task-1", runtime: partial.runtime ?? "acp", - requesterSessionKey: partial.requesterSessionKey ?? "agent:main:main", + ownerKey: partial.ownerKey ?? "agent:main:main", + scopeKind: "session", task: partial.task ?? "Background task", status: partial.status ?? "queued", deliveryStatus: partial.deliveryStatus ?? "pending", diff --git a/src/tasks/task-registry.maintenance.ts b/src/tasks/task-registry.maintenance.ts index f639e122172..82897d85c5e 100644 --- a/src/tasks/task-registry.maintenance.ts +++ b/src/tasks/task-registry.maintenance.ts @@ -1,8 +1,6 @@ import { readAcpSessionEntry } from "../acp/runtime/session-meta.js"; import { loadSessionStore, resolveStorePath } from "../config/sessions.js"; import { parseAgentSessionKey } from "../routing/session-key.js"; -import { listTaskAuditFindings, summarizeTaskAuditFindings } from "./task-registry.audit.js"; -import type { TaskAuditSummary } from "./task-registry.audit.js"; import { deleteTaskRecordById, ensureTaskRegistryReady, @@ -12,7 +10,9 @@ import { maybeDeliverTaskTerminalUpdate, resolveTaskForLookupToken, setTaskCleanupAfterById, -} from "./task-registry.js"; +} from "./runtime-internal.js"; +import { listTaskAuditFindings, summarizeTaskAuditFindings } from "./task-registry.audit.js"; +import type { TaskAuditSummary } from "./task-registry.audit.js"; import { summarizeTaskRecords } from "./task-registry.summary.js"; import type { TaskRecord, TaskRegistrySummary } from "./task-registry.types.js"; diff --git a/src/tasks/task-registry.store.sqlite.ts b/src/tasks/task-registry.store.sqlite.ts index 9cf0746bbb0..e6bdf12b0f0 100644 --- a/src/tasks/task-registry.store.sqlite.ts +++ b/src/tasks/task-registry.store.sqlite.ts @@ -10,7 +10,8 @@ type TaskRegistryRow = { task_id: string; runtime: TaskRecord["runtime"]; source_id: string | null; - requester_session_key: string; + owner_key: string; + scope_kind: TaskRecord["scopeKind"]; child_session_key: string | null; parent_task_id: string | null; agent_id: string | null; @@ -37,6 +38,10 @@ type TaskDeliveryStateRow = { last_notified_event_at: number | bigint | null; }; +type TableInfoRow = { + name: string; +}; + type TaskRegistryStatements = { selectAll: StatementSync; selectAllDeliveryStates: StatementSync; @@ -90,7 +95,8 @@ function rowToTaskRecord(row: TaskRegistryRow): TaskRecord { taskId: row.task_id, runtime: row.runtime, ...(row.source_id ? { sourceId: row.source_id } : {}), - requesterSessionKey: row.requester_session_key, + ownerKey: row.owner_key, + scopeKind: row.scope_kind, ...(row.child_session_key ? { childSessionKey: row.child_session_key } : {}), ...(row.parent_task_id ? { parentTaskId: row.parent_task_id } : {}), ...(row.agent_id ? { agentId: row.agent_id } : {}), @@ -127,7 +133,8 @@ function bindTaskRecord(record: TaskRecord) { task_id: record.taskId, runtime: record.runtime, source_id: record.sourceId ?? null, - requester_session_key: record.requesterSessionKey, + owner_key: record.ownerKey, + scope_kind: record.scopeKind, child_session_key: record.childSessionKey ?? null, parent_task_id: record.parentTaskId ?? null, agent_id: record.agentId ?? null, @@ -164,7 +171,8 @@ function createStatements(db: DatabaseSync): TaskRegistryStatements { task_id, runtime, source_id, - requester_session_key, + owner_key, + scope_kind, child_session_key, parent_task_id, agent_id, @@ -199,7 +207,8 @@ function createStatements(db: DatabaseSync): TaskRegistryStatements { task_id, runtime, source_id, - requester_session_key, + owner_key, + scope_kind, child_session_key, parent_task_id, agent_id, @@ -222,7 +231,8 @@ function createStatements(db: DatabaseSync): TaskRegistryStatements { @task_id, @runtime, @source_id, - @requester_session_key, + @owner_key, + @scope_kind, @child_session_key, @parent_task_id, @agent_id, @@ -245,7 +255,8 @@ function createStatements(db: DatabaseSync): TaskRegistryStatements { ON CONFLICT(task_id) DO UPDATE SET runtime = excluded.runtime, source_id = excluded.source_id, - requester_session_key = excluded.requester_session_key, + owner_key = excluded.owner_key, + scope_kind = excluded.scope_kind, child_session_key = excluded.child_session_key, parent_task_id = excluded.parent_task_id, agent_id = excluded.agent_id, @@ -283,13 +294,50 @@ function createStatements(db: DatabaseSync): TaskRegistryStatements { }; } +function hasTaskRunsColumn(db: DatabaseSync, columnName: string): boolean { + const rows = db.prepare(`PRAGMA table_info(task_runs)`).all() as TableInfoRow[]; + return rows.some((row) => row.name === columnName); +} + +function migrateLegacyOwnerColumns(db: DatabaseSync) { + if (!hasTaskRunsColumn(db, "owner_key")) { + db.exec(`ALTER TABLE task_runs ADD COLUMN owner_key TEXT;`); + } + if (!hasTaskRunsColumn(db, "scope_kind")) { + db.exec(`ALTER TABLE task_runs ADD COLUMN scope_kind TEXT NOT NULL DEFAULT 'session';`); + } + if (hasTaskRunsColumn(db, "requester_session_key")) { + db.exec(` + UPDATE task_runs + SET owner_key = requester_session_key + WHERE owner_key IS NULL + `); + } + db.exec(` + UPDATE task_runs + SET owner_key = CASE + WHEN trim(COALESCE(owner_key, '')) <> '' THEN trim(owner_key) + ELSE 'system:' || runtime || ':' || COALESCE(NULLIF(source_id, ''), task_id) + END + `); + db.exec(` + UPDATE task_runs + SET scope_kind = CASE + WHEN scope_kind = 'system' THEN 'system' + WHEN owner_key LIKE 'system:%' THEN 'system' + ELSE 'session' + END + `); +} + function ensureSchema(db: DatabaseSync) { db.exec(` CREATE TABLE IF NOT EXISTS task_runs ( task_id TEXT PRIMARY KEY, runtime TEXT NOT NULL, source_id TEXT, - requester_session_key TEXT NOT NULL, + owner_key TEXT NOT NULL, + scope_kind TEXT NOT NULL, child_session_key TEXT, parent_task_id TEXT, agent_id TEXT, @@ -310,6 +358,7 @@ function ensureSchema(db: DatabaseSync) { terminal_outcome TEXT ); `); + migrateLegacyOwnerColumns(db); db.exec(` CREATE TABLE IF NOT EXISTS task_delivery_state ( task_id TEXT PRIMARY KEY, @@ -322,6 +371,7 @@ function ensureSchema(db: DatabaseSync) { db.exec(`CREATE INDEX IF NOT EXISTS idx_task_runs_runtime_status ON task_runs(runtime, status);`); db.exec(`CREATE INDEX IF NOT EXISTS idx_task_runs_cleanup_after ON task_runs(cleanup_after);`); db.exec(`CREATE INDEX IF NOT EXISTS idx_task_runs_last_event_at ON task_runs(last_event_at);`); + db.exec(`CREATE INDEX IF NOT EXISTS idx_task_runs_owner_key ON task_runs(owner_key);`); db.exec( `CREATE INDEX IF NOT EXISTS idx_task_runs_child_session_key ON task_runs(child_session_key);`, ); diff --git a/src/tasks/task-registry.store.test.ts b/src/tasks/task-registry.store.test.ts index b4b3b6aca3c..09de084ddaa 100644 --- a/src/tasks/task-registry.store.test.ts +++ b/src/tasks/task-registry.store.test.ts @@ -1,7 +1,8 @@ -import { mkdtempSync, rmSync, statSync } from "node:fs"; +import { mkdirSync, mkdtempSync, rmSync, statSync } from "node:fs"; import os from "node:os"; import path from "node:path"; import { afterEach, describe, expect, it, vi } from "vitest"; +import { requireNodeSqlite } from "../infra/node-sqlite.js"; import { createTaskRecord, deleteTaskRecordById, @@ -17,7 +18,8 @@ function createStoredTask(): TaskRecord { taskId: "task-restored", runtime: "acp", sourceId: "run-restored", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", childSessionKey: "agent:codex:acp:restored", runId: "run-restored", task: "Restored task", @@ -57,7 +59,8 @@ describe("task-registry store runtime", () => { createTaskRecord({ runtime: "acp", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", childSessionKey: "agent:codex:acp:new", runId: "run-new", task: "New task", @@ -93,7 +96,8 @@ describe("task-registry store runtime", () => { expect(findTaskByRunId("run-restored")).toBeTruthy(); const created = createTaskRecord({ runtime: "acp", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", childSessionKey: "agent:codex:acp:new", runId: "run-new", task: "New task", @@ -120,7 +124,8 @@ describe("task-registry store runtime", () => { it("restores persisted tasks from the default sqlite store", () => { const created = createTaskRecord({ runtime: "cron", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", sourceId: "job-123", runId: "run-sqlite", task: "Run nightly cron", @@ -147,7 +152,8 @@ describe("task-registry store runtime", () => { createTaskRecord({ runtime: "cron", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", sourceId: "job-456", runId: "run-perms", task: "Run secured cron", @@ -164,4 +170,86 @@ describe("task-registry store runtime", () => { resetTaskRegistryForTests(); rmSync(stateDir, { recursive: true, force: true }); }); + + it("migrates legacy ownerless cron rows to system scope", () => { + const stateDir = mkdtempSync(path.join(os.tmpdir(), "openclaw-task-store-legacy-")); + process.env.OPENCLAW_STATE_DIR = stateDir; + const sqlitePath = resolveTaskRegistrySqlitePath(process.env); + mkdirSync(path.dirname(sqlitePath), { recursive: true }); + const { DatabaseSync } = requireNodeSqlite(); + const db = new DatabaseSync(sqlitePath); + db.exec(` + CREATE TABLE task_runs ( + task_id TEXT PRIMARY KEY, + runtime TEXT NOT NULL, + source_id TEXT, + requester_session_key TEXT NOT NULL, + child_session_key TEXT, + parent_task_id TEXT, + agent_id TEXT, + run_id TEXT, + label TEXT, + task TEXT NOT NULL, + status TEXT NOT NULL, + delivery_status TEXT NOT NULL, + notify_policy TEXT NOT NULL, + created_at INTEGER NOT NULL, + started_at INTEGER, + ended_at INTEGER, + last_event_at INTEGER, + cleanup_after INTEGER, + error TEXT, + progress_summary TEXT, + terminal_summary TEXT, + terminal_outcome TEXT + ); + `); + db.exec(` + CREATE TABLE task_delivery_state ( + task_id TEXT PRIMARY KEY, + requester_origin_json TEXT, + last_notified_event_at INTEGER + ); + `); + db.prepare(` + INSERT INTO task_runs ( + task_id, + runtime, + source_id, + requester_session_key, + child_session_key, + run_id, + task, + status, + delivery_status, + notify_policy, + created_at, + last_event_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + `).run( + "legacy-cron-task", + "cron", + "nightly-digest", + "", + "agent:main:cron:nightly-digest", + "legacy-cron-run", + "Nightly digest", + "running", + "not_applicable", + "silent", + 100, + 100, + ); + db.close(); + + resetTaskRegistryForTests({ persist: false }); + + expect(findTaskByRunId("legacy-cron-run")).toMatchObject({ + taskId: "legacy-cron-task", + ownerKey: "system:cron:nightly-digest", + scopeKind: "system", + deliveryStatus: "not_applicable", + notifyPolicy: "silent", + }); + }); }); diff --git a/src/tasks/task-registry.test.ts b/src/tasks/task-registry.test.ts index b69cc7af650..8ad5b1621ff 100644 --- a/src/tasks/task-registry.test.ts +++ b/src/tasks/task-registry.test.ts @@ -9,11 +9,12 @@ import { peekSystemEvents, resetSystemEventsForTest } from "../infra/system-even import { withTempDir } from "../test-helpers/temp-dir.js"; import { createTaskRecord, - findLatestTaskForSessionKey, + findLatestTaskForOwnerKey, + findLatestTaskForRelatedSessionKey, findTaskByRunId, getTaskById, getTaskRegistrySummary, - listTasksForSessionKey, + listTasksForOwnerKey, listTaskRecords, maybeDeliverTaskStateChangeUpdate, maybeDeliverTaskTerminalUpdate, @@ -133,7 +134,8 @@ describe("task-registry", () => { createTaskRecord({ runtime: "acp", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", childSessionKey: "agent:main:acp:child", runId: "run-1", task: "Do the thing", @@ -173,7 +175,8 @@ describe("task-registry", () => { createTaskRecord({ runtime: "acp", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", runId: "run-summary-acp", task: "Investigate issue", status: "queued", @@ -181,7 +184,8 @@ describe("task-registry", () => { }); createTaskRecord({ runtime: "cron", - requesterSessionKey: "", + ownerKey: "system:cron:run-summary-cron", + scopeKind: "system", runId: "run-summary-cron", task: "Daily digest", status: "running", @@ -189,7 +193,8 @@ describe("task-registry", () => { }); createTaskRecord({ runtime: "subagent", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", runId: "run-summary-subagent", task: "Write patch", status: "timed_out", @@ -232,7 +237,8 @@ describe("task-registry", () => { createTaskRecord({ runtime: "acp", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", requesterOrigin: { channel: "telegram", to: "telegram:123", @@ -286,7 +292,8 @@ describe("task-registry", () => { createTaskRecord({ runtime: "acp", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", requesterOrigin: { channel: "telegram", to: "telegram:123", @@ -332,7 +339,8 @@ describe("task-registry", () => { createTaskRecord({ runtime: "acp", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", requesterOrigin: { channel: "telegram", to: "telegram:123", @@ -368,7 +376,8 @@ describe("task-registry", () => { createTaskRecord({ runtime: "acp", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", childSessionKey: "agent:main:acp:child", runId: "run-session-queued", task: "Investigate issue", @@ -406,7 +415,8 @@ describe("task-registry", () => { createTaskRecord({ runtime: "acp", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", childSessionKey: "agent:main:acp:child", runId: "run-session-blocked", task: "Port the repo changes", @@ -443,7 +453,8 @@ describe("task-registry", () => { createTaskRecord({ runtime: "acp", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", requesterOrigin: { channel: "telegram", to: "telegram:123", @@ -494,7 +505,8 @@ describe("task-registry", () => { createTaskRecord({ runtime: "acp", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", requesterOrigin: { channel: "telegram", to: "telegram:123", @@ -535,7 +547,8 @@ describe("task-registry", () => { createTaskRecord({ runtime: "acp", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", requesterOrigin: { channel: "telegram", to: "telegram:123", @@ -569,7 +582,8 @@ describe("task-registry", () => { createTaskRecord({ runtime: "cli", - requesterSessionKey: "agent:codex:acp:child", + ownerKey: "agent:codex:acp:child", + scopeKind: "session", childSessionKey: "agent:codex:acp:child", runId: "run-shared", task: "Child ACP execution", @@ -579,7 +593,8 @@ describe("task-registry", () => { createTaskRecord({ runtime: "acp", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", childSessionKey: "agent:codex:acp:child", runId: "run-shared", task: "Spawn ACP child", @@ -607,7 +622,8 @@ describe("task-registry", () => { const directTask = createTaskRecord({ runtime: "acp", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", requesterOrigin: { channel: "telegram", to: "telegram:123", @@ -620,7 +636,8 @@ describe("task-registry", () => { }); const spawnedTask = createTaskRecord({ runtime: "acp", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", requesterOrigin: { channel: "telegram", to: "telegram:123", @@ -655,7 +672,8 @@ describe("task-registry", () => { const directTask = createTaskRecord({ runtime: "acp", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", requesterOrigin: { channel: "telegram", to: "telegram:123", @@ -669,7 +687,8 @@ describe("task-registry", () => { const spawnedTask = createTaskRecord({ runtime: "acp", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", requesterOrigin: { channel: "telegram", to: "telegram:123", @@ -699,7 +718,8 @@ describe("task-registry", () => { const spawnedTask = createTaskRecord({ runtime: "acp", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", requesterOrigin: { channel: "telegram", to: "telegram:123", @@ -713,7 +733,8 @@ describe("task-registry", () => { const directTask = createTaskRecord({ runtime: "acp", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", requesterOrigin: { channel: "telegram", to: "telegram:123", @@ -744,7 +765,8 @@ describe("task-registry", () => { const task = createTaskRecord({ runtime: "acp", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", requesterOrigin: { channel: "telegram", to: "telegram:123", @@ -784,7 +806,8 @@ describe("task-registry", () => { const task = createTaskRecord({ runtime: "subagent", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", childSessionKey: "agent:main:subagent:child", runId: "run-restore", task: "Restore me", @@ -813,26 +836,30 @@ describe("task-registry", () => { const older = createTaskRecord({ runtime: "acp", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", childSessionKey: "agent:main:subagent:child-1", runId: "run-session-lookup-1", task: "Older task", }); const latest = createTaskRecord({ runtime: "subagent", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", childSessionKey: "agent:main:subagent:child-2", runId: "run-session-lookup-2", task: "Latest task", }); nowSpy.mockRestore(); - expect(findLatestTaskForSessionKey("agent:main:main")?.taskId).toBe(latest.taskId); - expect(listTasksForSessionKey("agent:main:main").map((task) => task.taskId)).toEqual([ + expect(findLatestTaskForOwnerKey("agent:main:main")?.taskId).toBe(latest.taskId); + expect(listTasksForOwnerKey("agent:main:main").map((task) => task.taskId)).toEqual([ latest.taskId, older.taskId, ]); - expect(findLatestTaskForSessionKey("agent:main:subagent:child-1")?.taskId).toBe(older.taskId); + expect(findLatestTaskForRelatedSessionKey("agent:main:subagent:child-1")?.taskId).toBe( + older.taskId, + ); }); }); @@ -843,7 +870,8 @@ describe("task-registry", () => { const task = createTaskRecord({ runtime: "acp", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", childSessionKey: "agent:main:acp:missing", runId: "run-lost", task: "Missing child", @@ -876,7 +904,8 @@ describe("task-registry", () => { const task = createTaskRecord({ runtime: "acp", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", childSessionKey: "agent:main:acp:missing", runId: "run-lost-maintenance", task: "Missing child", @@ -908,7 +937,8 @@ describe("task-registry", () => { const task = createTaskRecord({ runtime: "cli", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", childSessionKey: "agent:main:main", runId: "run-prune", task: "Old completed task", @@ -945,7 +975,8 @@ describe("task-registry", () => { { taskId: "task-missing-cleanup", runtime: "cron", - requesterSessionKey: "", + ownerKey: "system:cron:run-maintenance-cleanup", + scopeKind: "system", runId: "run-maintenance-cleanup", task: "Finished cron", status: "failed", @@ -992,7 +1023,8 @@ describe("task-registry", () => { { taskId: "task-audit-summary", runtime: "acp", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", runId: "run-audit-summary", task: "Hung task", status: "running", @@ -1038,7 +1070,8 @@ describe("task-registry", () => { const task = createTaskRecord({ runtime: "acp", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", requesterOrigin: { channel: "discord", to: "discord:123", @@ -1095,7 +1128,8 @@ describe("task-registry", () => { createTaskRecord({ runtime: "acp", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", requesterOrigin: { channel: "discord", to: "discord:123", @@ -1167,7 +1201,8 @@ describe("task-registry", () => { createTaskRecord({ runtime: "acp", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", requesterOrigin: { channel: "discord", to: "discord:123", @@ -1218,7 +1253,8 @@ describe("task-registry", () => { createTaskRecord({ runtime: "acp", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", requesterOrigin: { channel: "discord", to: "discord:123", @@ -1276,7 +1312,8 @@ describe("task-registry", () => { const task = registry.createTaskRecord({ runtime: "acp", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", requesterOrigin: { channel: "telegram", to: "telegram:123", @@ -1337,7 +1374,8 @@ describe("task-registry", () => { const task = registry.createTaskRecord({ runtime: "subagent", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", requesterOrigin: { channel: "telegram", to: "telegram:123", diff --git a/src/tasks/task-registry.ts b/src/tasks/task-registry.ts index e279cc577e6..d97a69e6962 100644 --- a/src/tasks/task-registry.ts +++ b/src/tasks/task-registry.ts @@ -35,6 +35,7 @@ import type { TaskRegistrySummary, TaskRegistrySnapshot, TaskRuntime, + TaskScopeKind, TaskStatus, TaskTerminalOutcome, } from "./task-registry.types.js"; @@ -45,7 +46,8 @@ const DEFAULT_TASK_RETENTION_MS = 7 * 24 * 60 * 60_000; const tasks = new Map(); const taskDeliveryStates = new Map(); const taskIdsByRunId = new Map>(); -const taskIdsBySessionKey = new Map>(); +const taskIdsByOwnerKey = new Map>(); +const taskIdsByRelatedSessionKey = new Map>(); const tasksWithPendingDelivery = new Set(); let listenerStarted = false; let listenerStop: (() => void) | null = null; @@ -54,10 +56,17 @@ let deliveryRuntimePromise: Promise, -) { +function addIndexedKey(index: Map>, key: string, taskId: string) { + let ids = index.get(key); + if (!ids) { + ids = new Set(); + index.set(key, ids); + } + ids.add(taskId); +} + +function deleteIndexedKey(index: Map>, key: string, taskId: string) { + const ids = index.get(key); + if (!ids) { + return; + } + ids.delete(taskId); + if (ids.size === 0) { + index.delete(key); + } +} + +function getTaskRelatedSessionIndexKeys(task: Pick) { return [ ...new Set( [ - normalizeSessionIndexKey(task.requesterSessionKey), + normalizeSessionIndexKey(task.ownerKey), normalizeSessionIndexKey(task.childSessionKey), ].filter(Boolean) as string[], ), ]; } -function addSessionKeyIndex( +function addOwnerKeyIndex(taskId: string, task: Pick) { + const key = normalizeSessionIndexKey(task.ownerKey); + if (!key) { + return; + } + addIndexedKey(taskIdsByOwnerKey, key, taskId); +} + +function deleteOwnerKeyIndex(taskId: string, task: Pick) { + const key = normalizeSessionIndexKey(task.ownerKey); + if (!key) { + return; + } + deleteIndexedKey(taskIdsByOwnerKey, key, taskId); +} + +function addRelatedSessionKeyIndex( taskId: string, - task: Pick, + task: Pick, ) { - for (const sessionKey of getTaskSessionIndexKeys(task)) { - let ids = taskIdsBySessionKey.get(sessionKey); - if (!ids) { - ids = new Set(); - taskIdsBySessionKey.set(sessionKey, ids); - } - ids.add(taskId); + for (const sessionKey of getTaskRelatedSessionIndexKeys(task)) { + addIndexedKey(taskIdsByRelatedSessionKey, sessionKey, taskId); } } -function deleteSessionKeyIndex( +function deleteRelatedSessionKeyIndex( taskId: string, - task: Pick, + task: Pick, ) { - for (const sessionKey of getTaskSessionIndexKeys(task)) { - const ids = taskIdsBySessionKey.get(sessionKey); - if (!ids) { - continue; - } - ids.delete(taskId); - if (ids.size === 0) { - taskIdsBySessionKey.delete(sessionKey); - } + for (const sessionKey of getTaskRelatedSessionIndexKeys(task)) { + deleteIndexedKey(taskIdsByRelatedSessionKey, sessionKey, taskId); } } @@ -279,10 +322,17 @@ function rebuildRunIdIndex() { } } -function rebuildSessionKeyIndex() { - taskIdsBySessionKey.clear(); +function rebuildOwnerKeyIndex() { + taskIdsByOwnerKey.clear(); for (const [taskId, task] of tasks.entries()) { - addSessionKeyIndex(taskId, task); + addOwnerKeyIndex(taskId, task); + } +} + +function rebuildRelatedSessionKeyIndex() { + taskIdsByRelatedSessionKey.clear(); + for (const [taskId, task] of tasks.entries()) { + addRelatedSessionKeyIndex(taskId, task); } } @@ -328,7 +378,8 @@ function compareTasksNewestFirst( function findExistingTaskForCreate(params: { runtime: TaskRuntime; - requesterSessionKey: string; + ownerKey: string; + scopeKind: TaskScopeKind; childSessionKey?: string; runId?: string; label?: string; @@ -339,8 +390,8 @@ function findExistingTaskForCreate(params: { ? getTasksByRunId(runId).find( (task) => task.runtime === params.runtime && - normalizeComparableText(task.requesterSessionKey) === - normalizeComparableText(params.requesterSessionKey) && + task.scopeKind === params.scopeKind && + normalizeComparableText(task.ownerKey) === normalizeComparableText(params.ownerKey) && normalizeComparableText(task.childSessionKey) === normalizeComparableText(params.childSessionKey) && normalizeComparableText(task.label) === normalizeComparableText(params.label) && @@ -356,8 +407,8 @@ function findExistingTaskForCreate(params: { const siblingMatches = getTasksByRunId(runId).filter( (task) => task.runtime === params.runtime && - normalizeComparableText(task.requesterSessionKey) === - normalizeComparableText(params.requesterSessionKey) && + task.scopeKind === params.scopeKind && + normalizeComparableText(task.ownerKey) === normalizeComparableText(params.ownerKey) && normalizeComparableText(task.childSessionKey) === normalizeComparableText(params.childSessionKey), ); @@ -418,7 +469,8 @@ function mergeExistingTaskForCreate( const notifyPolicy = ensureNotifyPolicy({ notifyPolicy: params.notifyPolicy, deliveryStatus: params.deliveryStatus, - requesterSessionKey: existing.requesterSessionKey, + ownerKey: existing.ownerKey, + scopeKind: existing.scopeKind, }); if (notifyPolicy !== existing.notifyPolicy && existing.notifyPolicy === "silent") { patch.notifyPolicy = notifyPolicy; @@ -447,8 +499,11 @@ function resolveTaskTerminalIdempotencyKey(task: TaskRecord): string { } function resolveTaskDeliveryOwner(task: TaskRecord): TaskDeliveryOwner { + if (task.scopeKind !== "session") { + return {}; + } return { - sessionKey: task.requesterSessionKey.trim(), + sessionKey: task.ownerKey.trim(), requesterOrigin: normalizeDeliveryContext(taskDeliveryStates.get(task.taskId)?.requesterOrigin), }; } @@ -470,7 +525,8 @@ function restoreTaskRegistryOnce() { taskDeliveryStates.set(taskId, state); } rebuildRunIdIndex(); - rebuildSessionKeyIndex(); + rebuildOwnerKeyIndex(); + rebuildRelatedSessionKeyIndex(); emitTaskRegistryHookEvent(() => ({ kind: "restored", tasks: snapshotTaskRecords(tasks), @@ -496,8 +552,7 @@ function updateTask(taskId: string, patch: Partial): TaskRecord | nu next.cleanupAfter = terminalAt + DEFAULT_TASK_RETENTION_MS; } const sessionIndexChanged = - normalizeSessionIndexKey(current.requesterSessionKey) !== - normalizeSessionIndexKey(next.requesterSessionKey) || + normalizeSessionIndexKey(current.ownerKey) !== normalizeSessionIndexKey(next.ownerKey) || normalizeSessionIndexKey(current.childSessionKey) !== normalizeSessionIndexKey(next.childSessionKey); tasks.set(taskId, next); @@ -505,8 +560,10 @@ function updateTask(taskId: string, patch: Partial): TaskRecord | nu rebuildRunIdIndex(); } if (sessionIndexChanged) { - deleteSessionKeyIndex(taskId, current); - addSessionKeyIndex(taskId, next); + deleteOwnerKeyIndex(taskId, current); + addOwnerKeyIndex(taskId, next); + deleteRelatedSessionKeyIndex(taskId, current); + addRelatedSessionKeyIndex(taskId, next); } persistTaskUpsert(next); emitTaskRegistryHookEvent(() => ({ @@ -548,20 +605,24 @@ function canDeliverTaskToRequesterOrigin(task: TaskRecord): boolean { return Boolean(channel && to && isDeliverableMessageChannel(channel)); } +function resolveMissingOwnerDeliveryStatus(task: TaskRecord): TaskDeliveryStatus { + return task.scopeKind === "system" ? "not_applicable" : "parent_missing"; +} + function queueTaskSystemEvent(task: TaskRecord, text: string) { const owner = resolveTaskDeliveryOwner(task); - const requesterSessionKey = owner.sessionKey.trim(); - if (!requesterSessionKey) { + const ownerKey = owner.sessionKey?.trim(); + if (!ownerKey) { return false; } enqueueSystemEvent(text, { - sessionKey: requesterSessionKey, + sessionKey: ownerKey, contextKey: `task:${task.taskId}`, deliveryContext: owner.requesterOrigin, }); requestHeartbeatNow({ reason: "background-task", - sessionKey: requesterSessionKey, + sessionKey: ownerKey, }); return true; } @@ -572,18 +633,18 @@ function queueBlockedTaskFollowup(task: TaskRecord) { return false; } const owner = resolveTaskDeliveryOwner(task); - const requesterSessionKey = owner.sessionKey.trim(); - if (!requesterSessionKey) { + const ownerKey = owner.sessionKey?.trim(); + if (!ownerKey) { return false; } enqueueSystemEvent(followupText, { - sessionKey: requesterSessionKey, + sessionKey: ownerKey, contextKey: `task:${task.taskId}:blocked-followup`, deliveryContext: owner.requesterOrigin, }); requestHeartbeatNow({ reason: "background-task-blocked", - sessionKey: requesterSessionKey, + sessionKey: ownerKey, }); return true; } @@ -615,9 +676,10 @@ export async function maybeDeliverTaskTerminalUpdate(taskId: string): Promise ({ kind: "upserted", @@ -1300,18 +1383,8 @@ export function findTaskByRunId(runId: string): TaskRecord | undefined { return task ? cloneTaskRecord(task) : undefined; } -export function findLatestTaskForSessionKey(sessionKey: string): TaskRecord | undefined { - const task = listTasksForSessionKey(sessionKey)[0]; - return task ? cloneTaskRecord(task) : undefined; -} - -export function listTasksForSessionKey(sessionKey: string): TaskRecord[] { - ensureTaskRegistryReady(); - const key = normalizeSessionIndexKey(sessionKey); - if (!key) { - return []; - } - const ids = taskIdsBySessionKey.get(key); +function listTasksFromIndex(index: Map>, key: string): TaskRecord[] { + const ids = index.get(key); if (!ids || ids.size === 0) { return []; } @@ -1331,12 +1404,42 @@ export function listTasksForSessionKey(sessionKey: string): TaskRecord[] { .map(({ insertionIndex: _, ...task }) => task); } +export function findLatestTaskForOwnerKey(ownerKey: string): TaskRecord | undefined { + const task = listTasksForOwnerKey(ownerKey)[0]; + return task ? cloneTaskRecord(task) : undefined; +} + +export function listTasksForOwnerKey(ownerKey: string): TaskRecord[] { + ensureTaskRegistryReady(); + const key = normalizeSessionIndexKey(ownerKey); + if (!key) { + return []; + } + return listTasksFromIndex(taskIdsByOwnerKey, key); +} + +export function findLatestTaskForRelatedSessionKey(sessionKey: string): TaskRecord | undefined { + const task = listTasksForRelatedSessionKey(sessionKey)[0]; + return task ? cloneTaskRecord(task) : undefined; +} + +export function listTasksForRelatedSessionKey(sessionKey: string): TaskRecord[] { + ensureTaskRegistryReady(); + const key = normalizeSessionIndexKey(sessionKey); + if (!key) { + return []; + } + return listTasksFromIndex(taskIdsByRelatedSessionKey, key); +} + export function resolveTaskForLookupToken(token: string): TaskRecord | undefined { const lookup = token.trim(); if (!lookup) { return undefined; } - return getTaskById(lookup) ?? findTaskByRunId(lookup) ?? findLatestTaskForSessionKey(lookup); + return ( + getTaskById(lookup) ?? findTaskByRunId(lookup) ?? findLatestTaskForRelatedSessionKey(lookup) + ); } export function deleteTaskRecordById(taskId: string): boolean { @@ -1345,7 +1448,8 @@ export function deleteTaskRecordById(taskId: string): boolean { if (!current) { return false; } - deleteSessionKeyIndex(taskId, current); + deleteOwnerKeyIndex(taskId, current); + deleteRelatedSessionKeyIndex(taskId, current); tasks.delete(taskId); taskDeliveryStates.delete(taskId); rebuildRunIdIndex(); @@ -1363,7 +1467,8 @@ export function resetTaskRegistryForTests(opts?: { persist?: boolean }) { tasks.clear(); taskDeliveryStates.clear(); taskIdsByRunId.clear(); - taskIdsBySessionKey.clear(); + taskIdsByOwnerKey.clear(); + taskIdsByRelatedSessionKey.clear(); tasksWithPendingDelivery.clear(); restoreAttempted = false; resetTaskRegistryRuntimeForTests(); diff --git a/src/tasks/task-registry.types.ts b/src/tasks/task-registry.types.ts index 2228f16d998..677a25f4371 100644 --- a/src/tasks/task-registry.types.ts +++ b/src/tasks/task-registry.types.ts @@ -22,6 +22,7 @@ export type TaskDeliveryStatus = export type TaskNotifyPolicy = "done_only" | "state_changes" | "silent"; export type TaskTerminalOutcome = "succeeded" | "blocked"; +export type TaskScopeKind = "session" | "system"; export type TaskStatusCounts = Record; export type TaskRuntimeCounts = Record; @@ -53,7 +54,8 @@ export type TaskRecord = { taskId: string; runtime: TaskRuntime; sourceId?: string; - requesterSessionKey: string; + ownerKey: string; + scopeKind: TaskScopeKind; childSessionKey?: string; parentTaskId?: string; agentId?: string;