diff --git a/src/acp/control-plane/manager.core.ts b/src/acp/control-plane/manager.core.ts index 8c69585fe31..9ce3ce610fe 100644 --- a/src/acp/control-plane/manager.core.ts +++ b/src/acp/control-plane/manager.core.ts @@ -1887,7 +1887,8 @@ export class AcpSessionManager { createRunningTaskRun({ runtime: "acp", sourceId: context.runId, - requesterSessionKey: context.requesterSessionKey, + ownerKey: context.requesterSessionKey, + scopeKind: "session", requesterOrigin: context.requesterOrigin, childSessionKey: context.childSessionKey, runId: context.runId, diff --git a/src/cron/service/ops.ts b/src/cron/service/ops.ts index 5a9bb2bbf29..94da7193eb2 100644 --- a/src/cron/service/ops.ts +++ b/src/cron/service/ops.ts @@ -400,7 +400,8 @@ function tryCreateManualTaskRun(params: { createRunningTaskRun({ runtime: "cron", sourceId: params.job.id, - requesterSessionKey: "", + ownerKey: "", + scopeKind: "system", childSessionKey: params.job.sessionKey, agentId: params.job.agentId, runId, diff --git a/src/cron/service/timer.ts b/src/cron/service/timer.ts index 56fcc0acaae..178feb2ba8a 100644 --- a/src/cron/service/timer.ts +++ b/src/cron/service/timer.ts @@ -138,7 +138,8 @@ function tryCreateCronTaskRun(params: { createRunningTaskRun({ runtime: "cron", sourceId: params.job.id, - requesterSessionKey: "", + ownerKey: "", + scopeKind: "system", childSessionKey: params.job.sessionKey, agentId: params.job.agentId, runId, diff --git a/src/tasks/task-executor.test.ts b/src/tasks/task-executor.test.ts index c4251a62921..f88cc014517 100644 --- a/src/tasks/task-executor.test.ts +++ b/src/tasks/task-executor.test.ts @@ -67,7 +67,8 @@ describe("task-executor", () => { await withTaskExecutorStateDir(async () => { const created = createQueuedTaskRun({ runtime: "acp", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", childSessionKey: "agent:codex:acp:child", runId: "run-executor-queued", task: "Investigate issue", @@ -103,7 +104,8 @@ describe("task-executor", () => { await withTaskExecutorStateDir(async () => { const created = createRunningTaskRun({ runtime: "subagent", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", childSessionKey: "agent:codex:subagent:child", runId: "run-executor-fail", task: "Write summary", @@ -143,7 +145,8 @@ describe("task-executor", () => { await withTaskExecutorStateDir(async () => { const created = createRunningTaskRun({ runtime: "acp", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", requesterOrigin: { channel: "telegram", to: "telegram:123", @@ -179,7 +182,8 @@ describe("task-executor", () => { const child = createRunningTaskRun({ runtime: "acp", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", childSessionKey: "agent:codex:acp:child", runId: "run-linear-cancel", task: "Inspect a PR", @@ -217,7 +221,8 @@ describe("task-executor", () => { const child = createRunningTaskRun({ runtime: "subagent", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", childSessionKey: "agent:codex:subagent:child", runId: "run-subagent-cancel", task: "Inspect a PR", @@ -249,7 +254,8 @@ describe("task-executor", () => { await withTaskExecutorStateDir(async () => { const victim = createRunningTaskRun({ runtime: "acp", - requesterSessionKey: "agent:victim:main", + ownerKey: "agent:victim:main", + scopeKind: "session", childSessionKey: "agent:victim:acp:child", runId: "run-shared-executor-scope", task: "Victim ACP task", @@ -257,7 +263,8 @@ describe("task-executor", () => { }); const attacker = createRunningTaskRun({ runtime: "cli", - requesterSessionKey: "agent:attacker:main", + ownerKey: "agent:attacker:main", + scopeKind: "session", childSessionKey: "agent:attacker:main", runId: "run-shared-executor-scope", task: "Attacker CLI task", diff --git a/src/tasks/task-registry.test.ts b/src/tasks/task-registry.test.ts index e0a9fffd866..840e9072673 100644 --- a/src/tasks/task-registry.test.ts +++ b/src/tasks/task-registry.test.ts @@ -13,11 +13,12 @@ import { peekSystemEvents, resetSystemEventsForTest } from "../infra/system-even import { withTempDir } from "../test-helpers/temp-dir.js"; import { createTaskRecord, - findLatestTaskForSessionKey, + findLatestTaskForOwnerKey, + findLatestTaskForRelatedSessionKey, findTaskByRunId, getTaskById, getTaskRegistrySummary, - listTasksForSessionKey, + listTasksForOwnerKey, listTaskRecords, maybeDeliverTaskStateChangeUpdate, maybeDeliverTaskTerminalUpdate, @@ -139,7 +140,8 @@ describe("task-registry", () => { createTaskRecord({ runtime: "acp", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", childSessionKey: "agent:main:acp:child", runId: "run-1", task: "Do the thing", @@ -179,7 +181,8 @@ describe("task-registry", () => { createTaskRecord({ runtime: "acp", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", runId: "run-summary-acp", task: "Investigate issue", status: "queued", @@ -187,7 +190,8 @@ describe("task-registry", () => { }); createTaskRecord({ runtime: "cron", - requesterSessionKey: "", + ownerKey: "", + scopeKind: "system", runId: "run-summary-cron", task: "Daily digest", status: "running", @@ -195,7 +199,8 @@ describe("task-registry", () => { }); createTaskRecord({ runtime: "subagent", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", runId: "run-summary-subagent", task: "Write patch", status: "timed_out", @@ -238,7 +243,8 @@ describe("task-registry", () => { createTaskRecord({ runtime: "acp", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", requesterOrigin: { channel: "telegram", to: "telegram:123", @@ -292,7 +298,8 @@ describe("task-registry", () => { createTaskRecord({ runtime: "acp", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", requesterOrigin: { channel: "telegram", to: "telegram:123", @@ -338,7 +345,8 @@ describe("task-registry", () => { createTaskRecord({ runtime: "acp", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", requesterOrigin: { channel: "telegram", to: "telegram:123", @@ -374,7 +382,8 @@ describe("task-registry", () => { createTaskRecord({ runtime: "acp", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", childSessionKey: "agent:main:acp:child", runId: "run-session-queued", task: "Investigate issue", @@ -412,7 +421,8 @@ describe("task-registry", () => { createTaskRecord({ runtime: "acp", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", childSessionKey: "agent:main:acp:child", runId: "run-session-blocked", task: "Port the repo changes", @@ -449,7 +459,8 @@ describe("task-registry", () => { createTaskRecord({ runtime: "acp", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", requesterOrigin: { channel: "telegram", to: "telegram:123", @@ -500,7 +511,8 @@ describe("task-registry", () => { createTaskRecord({ runtime: "acp", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", requesterOrigin: { channel: "telegram", to: "telegram:123", @@ -541,7 +553,8 @@ describe("task-registry", () => { createTaskRecord({ runtime: "acp", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", requesterOrigin: { channel: "telegram", to: "telegram:123", @@ -575,7 +588,8 @@ describe("task-registry", () => { createTaskRecord({ runtime: "cli", - requesterSessionKey: "agent:codex:acp:child", + ownerKey: "agent:codex:acp:child", + scopeKind: "session", childSessionKey: "agent:codex:acp:child", runId: "run-shared", task: "Child ACP execution", @@ -585,7 +599,8 @@ describe("task-registry", () => { createTaskRecord({ runtime: "acp", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", childSessionKey: "agent:codex:acp:child", runId: "run-shared", task: "Spawn ACP child", @@ -608,7 +623,8 @@ describe("task-registry", () => { const victimTask = createTaskRecord({ runtime: "acp", - requesterSessionKey: "agent:victim:main", + ownerKey: "agent:victim:main", + scopeKind: "session", childSessionKey: "agent:victim:acp:child", runId: "run-shared-scope", task: "Victim ACP task", @@ -618,7 +634,8 @@ describe("task-registry", () => { const attackerTask = createTaskRecord({ runtime: "cli", - requesterSessionKey: "agent:attacker:main", + ownerKey: "agent:attacker:main", + scopeKind: "session", childSessionKey: "agent:attacker:main", runId: "run-shared-scope", task: "Attacker CLI task", @@ -662,7 +679,8 @@ describe("task-registry", () => { const directTask = createTaskRecord({ runtime: "acp", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", requesterOrigin: { channel: "telegram", to: "telegram:123", @@ -675,7 +693,8 @@ describe("task-registry", () => { }); const spawnedTask = createTaskRecord({ runtime: "acp", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", requesterOrigin: { channel: "telegram", to: "telegram:123", @@ -710,7 +729,8 @@ describe("task-registry", () => { const victimTask = createTaskRecord({ runtime: "acp", - requesterSessionKey: "agent:victim:main", + ownerKey: "agent:victim:main", + scopeKind: "session", childSessionKey: "agent:victim:acp:child", runId: "run-cross-requester-delivery", task: "Victim ACP task", @@ -719,7 +739,8 @@ describe("task-registry", () => { }); const attackerTask = createTaskRecord({ runtime: "acp", - requesterSessionKey: "agent:attacker:main", + ownerKey: "agent:attacker:main", + scopeKind: "session", childSessionKey: "agent:attacker:acp:child", runId: "run-cross-requester-delivery", task: "Attacker ACP task", @@ -760,7 +781,8 @@ describe("task-registry", () => { const directTask = createTaskRecord({ runtime: "acp", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", requesterOrigin: { channel: "telegram", to: "telegram:123", @@ -774,7 +796,8 @@ describe("task-registry", () => { const spawnedTask = createTaskRecord({ runtime: "acp", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", requesterOrigin: { channel: "telegram", to: "telegram:123", @@ -804,7 +827,8 @@ describe("task-registry", () => { const spawnedTask = createTaskRecord({ runtime: "acp", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", requesterOrigin: { channel: "telegram", to: "telegram:123", @@ -818,7 +842,8 @@ describe("task-registry", () => { const directTask = createTaskRecord({ runtime: "acp", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", requesterOrigin: { channel: "telegram", to: "telegram:123", @@ -849,7 +874,8 @@ describe("task-registry", () => { const task = createTaskRecord({ runtime: "acp", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", requesterOrigin: { channel: "telegram", to: "telegram:123", @@ -889,7 +915,8 @@ describe("task-registry", () => { const task = createTaskRecord({ runtime: "subagent", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", childSessionKey: "agent:main:subagent:child", runId: "run-restore", task: "Restore me", @@ -918,26 +945,30 @@ describe("task-registry", () => { const older = createTaskRecord({ runtime: "acp", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", childSessionKey: "agent:main:subagent:child-1", runId: "run-session-lookup-1", task: "Older task", }); const latest = createTaskRecord({ runtime: "subagent", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", childSessionKey: "agent:main:subagent:child-2", runId: "run-session-lookup-2", task: "Latest task", }); nowSpy.mockRestore(); - expect(findLatestTaskForSessionKey("agent:main:main")?.taskId).toBe(latest.taskId); - expect(listTasksForSessionKey("agent:main:main").map((task) => task.taskId)).toEqual([ + expect(findLatestTaskForOwnerKey("agent:main:main")?.taskId).toBe(latest.taskId); + expect(listTasksForOwnerKey("agent:main:main").map((task) => task.taskId)).toEqual([ latest.taskId, older.taskId, ]); - expect(findLatestTaskForSessionKey("agent:main:subagent:child-1")?.taskId).toBe(older.taskId); + expect(findLatestTaskForRelatedSessionKey("agent:main:subagent:child-1")?.taskId).toBe( + older.taskId, + ); }); }); @@ -948,7 +979,8 @@ describe("task-registry", () => { const task = createTaskRecord({ runtime: "acp", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", childSessionKey: "agent:main:acp:missing", runId: "run-lost", task: "Missing child", @@ -981,7 +1013,8 @@ describe("task-registry", () => { const task = createTaskRecord({ runtime: "acp", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", childSessionKey: "agent:main:acp:missing", runId: "run-lost-maintenance", task: "Missing child", @@ -1013,7 +1046,8 @@ describe("task-registry", () => { const task = createTaskRecord({ runtime: "cli", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", childSessionKey: "agent:main:main", runId: "run-prune", task: "Old completed task", @@ -1147,7 +1181,8 @@ describe("task-registry", () => { const task = createTaskRecord({ runtime: "acp", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", requesterOrigin: { channel: "discord", to: "discord:123", @@ -1204,7 +1239,8 @@ describe("task-registry", () => { createTaskRecord({ runtime: "acp", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", requesterOrigin: { channel: "discord", to: "discord:123", @@ -1276,7 +1312,8 @@ describe("task-registry", () => { createTaskRecord({ runtime: "acp", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", requesterOrigin: { channel: "discord", to: "discord:123", @@ -1327,7 +1364,8 @@ describe("task-registry", () => { createTaskRecord({ runtime: "acp", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", requesterOrigin: { channel: "discord", to: "discord:123", @@ -1385,7 +1423,8 @@ describe("task-registry", () => { const task = registry.createTaskRecord({ runtime: "acp", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", requesterOrigin: { channel: "telegram", to: "telegram:123", @@ -1446,7 +1485,8 @@ describe("task-registry", () => { const task = registry.createTaskRecord({ runtime: "subagent", - requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", requesterOrigin: { channel: "telegram", to: "telegram:123", diff --git a/src/tasks/task-registry.ts b/src/tasks/task-registry.ts index 217cb404f7c..d1fc51c3181 100644 --- a/src/tasks/task-registry.ts +++ b/src/tasks/task-registry.ts @@ -46,7 +46,8 @@ const DEFAULT_TASK_RETENTION_MS = 7 * 24 * 60 * 60_000; const tasks = new Map(); const taskDeliveryStates = new Map(); const taskIdsByRunId = new Map>(); -const taskIdsBySessionKey = new Map>(); +const taskIdsByOwnerKey = new Map>(); +const taskIdsByRelatedSessionKey = new Map>(); const tasksWithPendingDelivery = new Set(); let listenerStarted = false; let listenerStop: (() => void) | null = null; @@ -55,10 +56,17 @@ let deliveryRuntimePromise: Promise, -) { +function addIndexedKey(index: Map>, key: string, taskId: string) { + let ids = index.get(key); + if (!ids) { + ids = new Set(); + index.set(key, ids); + } + ids.add(taskId); +} + +function deleteIndexedKey(index: Map>, key: string, taskId: string) { + const ids = index.get(key); + if (!ids) { + return; + } + ids.delete(taskId); + if (ids.size === 0) { + index.delete(key); + } +} + +function getTaskRelatedSessionIndexKeys(task: Pick) { return [ ...new Set( [ - normalizeSessionIndexKey(task.requesterSessionKey), + normalizeSessionIndexKey(task.ownerKey), normalizeSessionIndexKey(task.childSessionKey), ].filter(Boolean) as string[], ), ]; } -function addSessionKeyIndex( +function addOwnerKeyIndex(taskId: string, task: Pick) { + const key = normalizeSessionIndexKey(task.ownerKey); + if (!key) { + return; + } + addIndexedKey(taskIdsByOwnerKey, key, taskId); +} + +function deleteOwnerKeyIndex(taskId: string, task: Pick) { + const key = normalizeSessionIndexKey(task.ownerKey); + if (!key) { + return; + } + deleteIndexedKey(taskIdsByOwnerKey, key, taskId); +} + +function addRelatedSessionKeyIndex( taskId: string, - task: Pick, + task: Pick, ) { - for (const sessionKey of getTaskSessionIndexKeys(task)) { - let ids = taskIdsBySessionKey.get(sessionKey); - if (!ids) { - ids = new Set(); - taskIdsBySessionKey.set(sessionKey, ids); - } - ids.add(taskId); + for (const sessionKey of getTaskRelatedSessionIndexKeys(task)) { + addIndexedKey(taskIdsByRelatedSessionKey, sessionKey, taskId); } } -function deleteSessionKeyIndex( +function deleteRelatedSessionKeyIndex( taskId: string, - task: Pick, + task: Pick, ) { - for (const sessionKey of getTaskSessionIndexKeys(task)) { - const ids = taskIdsBySessionKey.get(sessionKey); - if (!ids) { - continue; - } - ids.delete(taskId); - if (ids.size === 0) { - taskIdsBySessionKey.delete(sessionKey); - } + for (const sessionKey of getTaskRelatedSessionIndexKeys(task)) { + deleteIndexedKey(taskIdsByRelatedSessionKey, sessionKey, taskId); } } @@ -321,10 +363,17 @@ function rebuildRunIdIndex() { } } -function rebuildSessionKeyIndex() { - taskIdsBySessionKey.clear(); +function rebuildOwnerKeyIndex() { + taskIdsByOwnerKey.clear(); for (const [taskId, task] of tasks.entries()) { - addSessionKeyIndex(taskId, task); + addOwnerKeyIndex(taskId, task); + } +} + +function rebuildRelatedSessionKeyIndex() { + taskIdsByRelatedSessionKey.clear(); + for (const [taskId, task] of tasks.entries()) { + addRelatedSessionKeyIndex(taskId, task); } } @@ -338,12 +387,13 @@ function getTasksByRunId(runId: string): TaskRecord[] { .filter((task): task is TaskRecord => Boolean(task)); } -function taskRunOwnerKey( - task: Pick, +function taskRunScopeKey( + task: Pick, ): string { return [ task.runtime, - normalizeComparableText(task.requesterSessionKey), + task.scopeKind, + normalizeComparableText(task.ownerKey), normalizeComparableText(task.childSessionKey), ].join("\u0000"); } @@ -364,12 +414,14 @@ function getTasksByRunScope(params: { if (childMatches.length > 0) { return childMatches; } - return matches.filter( - (task) => normalizeSessionIndexKey(task.requesterSessionKey) === sessionKey, + const ownerMatches = matches.filter( + (task) => + task.scopeKind === "session" && normalizeSessionIndexKey(task.ownerKey) === sessionKey, ); + return ownerMatches; } - const ownerKeys = new Set(matches.map((task) => taskRunOwnerKey(task))); - return ownerKeys.size <= 1 ? matches : []; + const scopeKeys = new Set(matches.map((task) => taskRunScopeKey(task))); + return scopeKeys.size <= 1 ? matches : []; } function getPeerTasksForDelivery(task: TaskRecord): TaskRecord[] { @@ -379,8 +431,8 @@ function getPeerTasksForDelivery(task: TaskRecord): TaskRecord[] { return getTasksByRunId(task.runId).filter( (candidate) => candidate.runtime === task.runtime && - normalizeComparableText(candidate.requesterSessionKey) === - normalizeComparableText(task.requesterSessionKey) && + candidate.scopeKind === task.scopeKind && + normalizeComparableText(candidate.ownerKey) === normalizeComparableText(task.ownerKey) && normalizeComparableText(candidate.childSessionKey) === normalizeComparableText(task.childSessionKey), ); @@ -418,7 +470,8 @@ function compareTasksNewestFirst( function findExistingTaskForCreate(params: { runtime: TaskRuntime; - requesterSessionKey: string; + ownerKey: string; + scopeKind: TaskScopeKind; childSessionKey?: string; runId?: string; label?: string; @@ -429,8 +482,8 @@ function findExistingTaskForCreate(params: { ? getTasksByRunId(runId).filter( (task) => task.runtime === params.runtime && - normalizeComparableText(task.requesterSessionKey) === - normalizeComparableText(params.requesterSessionKey) && + task.scopeKind === params.scopeKind && + normalizeComparableText(task.ownerKey) === normalizeComparableText(params.ownerKey) && normalizeComparableText(task.childSessionKey) === normalizeComparableText(params.childSessionKey), ) @@ -505,7 +558,8 @@ function mergeExistingTaskForCreate( const notifyPolicy = ensureNotifyPolicy({ notifyPolicy: params.notifyPolicy, deliveryStatus: params.deliveryStatus, - requesterSessionKey: existing.requesterSessionKey, + ownerKey: existing.ownerKey, + scopeKind: existing.scopeKind, }); if (notifyPolicy !== existing.notifyPolicy && existing.notifyPolicy === "silent") { patch.notifyPolicy = notifyPolicy; @@ -534,8 +588,11 @@ function resolveTaskTerminalIdempotencyKey(task: TaskRecord): string { } function resolveTaskDeliveryOwner(task: TaskRecord): TaskDeliveryOwner { + if (task.scopeKind !== "session") { + return {}; + } return { - sessionKey: task.requesterSessionKey.trim(), + sessionKey: task.ownerKey.trim(), requesterOrigin: normalizeDeliveryContext(taskDeliveryStates.get(task.taskId)?.requesterOrigin), }; } @@ -557,7 +614,8 @@ function restoreTaskRegistryOnce() { taskDeliveryStates.set(taskId, state); } rebuildRunIdIndex(); - rebuildSessionKeyIndex(); + rebuildOwnerKeyIndex(); + rebuildRelatedSessionKeyIndex(); emitTaskRegistryHookEvent(() => ({ kind: "restored", tasks: snapshotTaskRecords(tasks), @@ -583,8 +641,7 @@ function updateTask(taskId: string, patch: Partial): TaskRecord | nu next.cleanupAfter = terminalAt + DEFAULT_TASK_RETENTION_MS; } const sessionIndexChanged = - normalizeSessionIndexKey(current.requesterSessionKey) !== - normalizeSessionIndexKey(next.requesterSessionKey) || + normalizeSessionIndexKey(current.ownerKey) !== normalizeSessionIndexKey(next.ownerKey) || normalizeSessionIndexKey(current.childSessionKey) !== normalizeSessionIndexKey(next.childSessionKey); tasks.set(taskId, next); @@ -592,8 +649,10 @@ function updateTask(taskId: string, patch: Partial): TaskRecord | nu rebuildRunIdIndex(); } if (sessionIndexChanged) { - deleteSessionKeyIndex(taskId, current); - addSessionKeyIndex(taskId, next); + deleteOwnerKeyIndex(taskId, current); + addOwnerKeyIndex(taskId, next); + deleteRelatedSessionKeyIndex(taskId, current); + addRelatedSessionKeyIndex(taskId, next); } persistTaskUpsert(next); emitTaskRegistryHookEvent(() => ({ @@ -635,20 +694,24 @@ function canDeliverTaskToRequesterOrigin(task: TaskRecord): boolean { return Boolean(channel && to && isDeliverableMessageChannel(channel)); } +function resolveMissingOwnerDeliveryStatus(task: TaskRecord): TaskDeliveryStatus { + return task.scopeKind === "system" ? "not_applicable" : "parent_missing"; +} + function queueTaskSystemEvent(task: TaskRecord, text: string) { const owner = resolveTaskDeliveryOwner(task); - const requesterSessionKey = owner.sessionKey.trim(); - if (!requesterSessionKey) { + const ownerKey = owner.sessionKey?.trim(); + if (!ownerKey) { return false; } enqueueSystemEvent(text, { - sessionKey: requesterSessionKey, + sessionKey: ownerKey, contextKey: `task:${task.taskId}`, deliveryContext: owner.requesterOrigin, }); requestHeartbeatNow({ reason: "background-task", - sessionKey: requesterSessionKey, + sessionKey: ownerKey, }); return true; } @@ -659,18 +722,18 @@ function queueBlockedTaskFollowup(task: TaskRecord) { return false; } const owner = resolveTaskDeliveryOwner(task); - const requesterSessionKey = owner.sessionKey.trim(); - if (!requesterSessionKey) { + const ownerKey = owner.sessionKey?.trim(); + if (!ownerKey) { return false; } enqueueSystemEvent(followupText, { - sessionKey: requesterSessionKey, + sessionKey: ownerKey, contextKey: `task:${task.taskId}:blocked-followup`, deliveryContext: owner.requesterOrigin, }); requestHeartbeatNow({ reason: "background-task-blocked", - sessionKey: requesterSessionKey, + sessionKey: ownerKey, }); return true; } @@ -702,9 +765,10 @@ export async function maybeDeliverTaskTerminalUpdate(taskId: string): Promise ({ kind: "upserted", @@ -1429,26 +1501,8 @@ export function findTaskByRunId(runId: string): TaskRecord | undefined { return task ? cloneTaskRecord(task) : undefined; } -export function findLatestTaskForSessionKey(sessionKey: string): TaskRecord | undefined { - const task = listTasksForSessionKey(sessionKey)[0]; - return task ? cloneTaskRecord(task) : undefined; -} - -export function findLatestTaskForOwnerKey(ownerKey: string): TaskRecord | undefined { - return findLatestTaskForSessionKey(ownerKey); -} - -export function findLatestTaskForRelatedSessionKey(sessionKey: string): TaskRecord | undefined { - return findLatestTaskForSessionKey(sessionKey); -} - -export function listTasksForSessionKey(sessionKey: string): TaskRecord[] { - ensureTaskRegistryReady(); - const key = normalizeSessionIndexKey(sessionKey); - if (!key) { - return []; - } - const ids = taskIdsBySessionKey.get(key); +function listTasksFromIndex(index: Map>, key: string): TaskRecord[] { + const ids = index.get(key); if (!ids || ids.size === 0) { return []; } @@ -1468,12 +1522,46 @@ export function listTasksForSessionKey(sessionKey: string): TaskRecord[] { .map(({ insertionIndex: _, ...task }) => task); } +export function findLatestTaskForSessionKey(sessionKey: string): TaskRecord | undefined { + const task = listTasksForSessionKey(sessionKey)[0]; + return task ? cloneTaskRecord(task) : undefined; +} + +export function listTasksForSessionKey(sessionKey: string): TaskRecord[] { + ensureTaskRegistryReady(); + const key = normalizeSessionIndexKey(sessionKey); + if (!key) { + return []; + } + return listTasksFromIndex(taskIdsByRelatedSessionKey, key); +} + +export function findLatestTaskForOwnerKey(ownerKey: string): TaskRecord | undefined { + const task = listTasksForOwnerKey(ownerKey)[0]; + return task ? cloneTaskRecord(task) : undefined; +} + export function listTasksForOwnerKey(ownerKey: string): TaskRecord[] { - return listTasksForSessionKey(ownerKey); + ensureTaskRegistryReady(); + const key = normalizeSessionIndexKey(ownerKey); + if (!key) { + return []; + } + return listTasksFromIndex(taskIdsByOwnerKey, key); +} + +export function findLatestTaskForRelatedSessionKey(sessionKey: string): TaskRecord | undefined { + const task = listTasksForRelatedSessionKey(sessionKey)[0]; + return task ? cloneTaskRecord(task) : undefined; } export function listTasksForRelatedSessionKey(sessionKey: string): TaskRecord[] { - return listTasksForSessionKey(sessionKey); + ensureTaskRegistryReady(); + const key = normalizeSessionIndexKey(sessionKey); + if (!key) { + return []; + } + return listTasksFromIndex(taskIdsByRelatedSessionKey, key); } export function resolveTaskForLookupToken(token: string): TaskRecord | undefined { @@ -1481,7 +1569,9 @@ export function resolveTaskForLookupToken(token: string): TaskRecord | undefined if (!lookup) { return undefined; } - return getTaskById(lookup) ?? findTaskByRunId(lookup) ?? findLatestTaskForSessionKey(lookup); + return ( + getTaskById(lookup) ?? findTaskByRunId(lookup) ?? findLatestTaskForRelatedSessionKey(lookup) + ); } export function deleteTaskRecordById(taskId: string): boolean { @@ -1490,7 +1580,8 @@ export function deleteTaskRecordById(taskId: string): boolean { if (!current) { return false; } - deleteSessionKeyIndex(taskId, current); + deleteOwnerKeyIndex(taskId, current); + deleteRelatedSessionKeyIndex(taskId, current); tasks.delete(taskId); taskDeliveryStates.delete(taskId); rebuildRunIdIndex(); @@ -1508,7 +1599,8 @@ export function resetTaskRegistryForTests(opts?: { persist?: boolean }) { tasks.clear(); taskDeliveryStates.clear(); taskIdsByRunId.clear(); - taskIdsBySessionKey.clear(); + taskIdsByOwnerKey.clear(); + taskIdsByRelatedSessionKey.clear(); tasksWithPendingDelivery.clear(); restoreAttempted = false; resetTaskRegistryRuntimeForTests();