fix(tasks): restore owner-key task scope

This commit is contained in:
Vincent Koc 2026-04-01 03:51:12 +09:00
parent 5fdde9b237
commit 80ed55332d
6 changed files with 290 additions and 148 deletions

View File

@ -1887,7 +1887,8 @@ export class AcpSessionManager {
createRunningTaskRun({
runtime: "acp",
sourceId: context.runId,
requesterSessionKey: context.requesterSessionKey,
ownerKey: context.requesterSessionKey,
scopeKind: "session",
requesterOrigin: context.requesterOrigin,
childSessionKey: context.childSessionKey,
runId: context.runId,

View File

@ -400,7 +400,8 @@ function tryCreateManualTaskRun(params: {
createRunningTaskRun({
runtime: "cron",
sourceId: params.job.id,
requesterSessionKey: "",
ownerKey: "",
scopeKind: "system",
childSessionKey: params.job.sessionKey,
agentId: params.job.agentId,
runId,

View File

@ -138,7 +138,8 @@ function tryCreateCronTaskRun(params: {
createRunningTaskRun({
runtime: "cron",
sourceId: params.job.id,
requesterSessionKey: "",
ownerKey: "",
scopeKind: "system",
childSessionKey: params.job.sessionKey,
agentId: params.job.agentId,
runId,

View File

@ -67,7 +67,8 @@ describe("task-executor", () => {
await withTaskExecutorStateDir(async () => {
const created = createQueuedTaskRun({
runtime: "acp",
requesterSessionKey: "agent:main:main",
ownerKey: "agent:main:main",
scopeKind: "session",
childSessionKey: "agent:codex:acp:child",
runId: "run-executor-queued",
task: "Investigate issue",
@ -103,7 +104,8 @@ describe("task-executor", () => {
await withTaskExecutorStateDir(async () => {
const created = createRunningTaskRun({
runtime: "subagent",
requesterSessionKey: "agent:main:main",
ownerKey: "agent:main:main",
scopeKind: "session",
childSessionKey: "agent:codex:subagent:child",
runId: "run-executor-fail",
task: "Write summary",
@ -143,7 +145,8 @@ describe("task-executor", () => {
await withTaskExecutorStateDir(async () => {
const created = createRunningTaskRun({
runtime: "acp",
requesterSessionKey: "agent:main:main",
ownerKey: "agent:main:main",
scopeKind: "session",
requesterOrigin: {
channel: "telegram",
to: "telegram:123",
@ -179,7 +182,8 @@ describe("task-executor", () => {
const child = createRunningTaskRun({
runtime: "acp",
requesterSessionKey: "agent:main:main",
ownerKey: "agent:main:main",
scopeKind: "session",
childSessionKey: "agent:codex:acp:child",
runId: "run-linear-cancel",
task: "Inspect a PR",
@ -217,7 +221,8 @@ describe("task-executor", () => {
const child = createRunningTaskRun({
runtime: "subagent",
requesterSessionKey: "agent:main:main",
ownerKey: "agent:main:main",
scopeKind: "session",
childSessionKey: "agent:codex:subagent:child",
runId: "run-subagent-cancel",
task: "Inspect a PR",
@ -249,7 +254,8 @@ describe("task-executor", () => {
await withTaskExecutorStateDir(async () => {
const victim = createRunningTaskRun({
runtime: "acp",
requesterSessionKey: "agent:victim:main",
ownerKey: "agent:victim:main",
scopeKind: "session",
childSessionKey: "agent:victim:acp:child",
runId: "run-shared-executor-scope",
task: "Victim ACP task",
@ -257,7 +263,8 @@ describe("task-executor", () => {
});
const attacker = createRunningTaskRun({
runtime: "cli",
requesterSessionKey: "agent:attacker:main",
ownerKey: "agent:attacker:main",
scopeKind: "session",
childSessionKey: "agent:attacker:main",
runId: "run-shared-executor-scope",
task: "Attacker CLI task",

View File

@ -13,11 +13,12 @@ import { peekSystemEvents, resetSystemEventsForTest } from "../infra/system-even
import { withTempDir } from "../test-helpers/temp-dir.js";
import {
createTaskRecord,
findLatestTaskForSessionKey,
findLatestTaskForOwnerKey,
findLatestTaskForRelatedSessionKey,
findTaskByRunId,
getTaskById,
getTaskRegistrySummary,
listTasksForSessionKey,
listTasksForOwnerKey,
listTaskRecords,
maybeDeliverTaskStateChangeUpdate,
maybeDeliverTaskTerminalUpdate,
@ -139,7 +140,8 @@ describe("task-registry", () => {
createTaskRecord({
runtime: "acp",
requesterSessionKey: "agent:main:main",
ownerKey: "agent:main:main",
scopeKind: "session",
childSessionKey: "agent:main:acp:child",
runId: "run-1",
task: "Do the thing",
@ -179,7 +181,8 @@ describe("task-registry", () => {
createTaskRecord({
runtime: "acp",
requesterSessionKey: "agent:main:main",
ownerKey: "agent:main:main",
scopeKind: "session",
runId: "run-summary-acp",
task: "Investigate issue",
status: "queued",
@ -187,7 +190,8 @@ describe("task-registry", () => {
});
createTaskRecord({
runtime: "cron",
requesterSessionKey: "",
ownerKey: "",
scopeKind: "system",
runId: "run-summary-cron",
task: "Daily digest",
status: "running",
@ -195,7 +199,8 @@ describe("task-registry", () => {
});
createTaskRecord({
runtime: "subagent",
requesterSessionKey: "agent:main:main",
ownerKey: "agent:main:main",
scopeKind: "session",
runId: "run-summary-subagent",
task: "Write patch",
status: "timed_out",
@ -238,7 +243,8 @@ describe("task-registry", () => {
createTaskRecord({
runtime: "acp",
requesterSessionKey: "agent:main:main",
ownerKey: "agent:main:main",
scopeKind: "session",
requesterOrigin: {
channel: "telegram",
to: "telegram:123",
@ -292,7 +298,8 @@ describe("task-registry", () => {
createTaskRecord({
runtime: "acp",
requesterSessionKey: "agent:main:main",
ownerKey: "agent:main:main",
scopeKind: "session",
requesterOrigin: {
channel: "telegram",
to: "telegram:123",
@ -338,7 +345,8 @@ describe("task-registry", () => {
createTaskRecord({
runtime: "acp",
requesterSessionKey: "agent:main:main",
ownerKey: "agent:main:main",
scopeKind: "session",
requesterOrigin: {
channel: "telegram",
to: "telegram:123",
@ -374,7 +382,8 @@ describe("task-registry", () => {
createTaskRecord({
runtime: "acp",
requesterSessionKey: "agent:main:main",
ownerKey: "agent:main:main",
scopeKind: "session",
childSessionKey: "agent:main:acp:child",
runId: "run-session-queued",
task: "Investigate issue",
@ -412,7 +421,8 @@ describe("task-registry", () => {
createTaskRecord({
runtime: "acp",
requesterSessionKey: "agent:main:main",
ownerKey: "agent:main:main",
scopeKind: "session",
childSessionKey: "agent:main:acp:child",
runId: "run-session-blocked",
task: "Port the repo changes",
@ -449,7 +459,8 @@ describe("task-registry", () => {
createTaskRecord({
runtime: "acp",
requesterSessionKey: "agent:main:main",
ownerKey: "agent:main:main",
scopeKind: "session",
requesterOrigin: {
channel: "telegram",
to: "telegram:123",
@ -500,7 +511,8 @@ describe("task-registry", () => {
createTaskRecord({
runtime: "acp",
requesterSessionKey: "agent:main:main",
ownerKey: "agent:main:main",
scopeKind: "session",
requesterOrigin: {
channel: "telegram",
to: "telegram:123",
@ -541,7 +553,8 @@ describe("task-registry", () => {
createTaskRecord({
runtime: "acp",
requesterSessionKey: "agent:main:main",
ownerKey: "agent:main:main",
scopeKind: "session",
requesterOrigin: {
channel: "telegram",
to: "telegram:123",
@ -575,7 +588,8 @@ describe("task-registry", () => {
createTaskRecord({
runtime: "cli",
requesterSessionKey: "agent:codex:acp:child",
ownerKey: "agent:codex:acp:child",
scopeKind: "session",
childSessionKey: "agent:codex:acp:child",
runId: "run-shared",
task: "Child ACP execution",
@ -585,7 +599,8 @@ describe("task-registry", () => {
createTaskRecord({
runtime: "acp",
requesterSessionKey: "agent:main:main",
ownerKey: "agent:main:main",
scopeKind: "session",
childSessionKey: "agent:codex:acp:child",
runId: "run-shared",
task: "Spawn ACP child",
@ -608,7 +623,8 @@ describe("task-registry", () => {
const victimTask = createTaskRecord({
runtime: "acp",
requesterSessionKey: "agent:victim:main",
ownerKey: "agent:victim:main",
scopeKind: "session",
childSessionKey: "agent:victim:acp:child",
runId: "run-shared-scope",
task: "Victim ACP task",
@ -618,7 +634,8 @@ describe("task-registry", () => {
const attackerTask = createTaskRecord({
runtime: "cli",
requesterSessionKey: "agent:attacker:main",
ownerKey: "agent:attacker:main",
scopeKind: "session",
childSessionKey: "agent:attacker:main",
runId: "run-shared-scope",
task: "Attacker CLI task",
@ -662,7 +679,8 @@ describe("task-registry", () => {
const directTask = createTaskRecord({
runtime: "acp",
requesterSessionKey: "agent:main:main",
ownerKey: "agent:main:main",
scopeKind: "session",
requesterOrigin: {
channel: "telegram",
to: "telegram:123",
@ -675,7 +693,8 @@ describe("task-registry", () => {
});
const spawnedTask = createTaskRecord({
runtime: "acp",
requesterSessionKey: "agent:main:main",
ownerKey: "agent:main:main",
scopeKind: "session",
requesterOrigin: {
channel: "telegram",
to: "telegram:123",
@ -710,7 +729,8 @@ describe("task-registry", () => {
const victimTask = createTaskRecord({
runtime: "acp",
requesterSessionKey: "agent:victim:main",
ownerKey: "agent:victim:main",
scopeKind: "session",
childSessionKey: "agent:victim:acp:child",
runId: "run-cross-requester-delivery",
task: "Victim ACP task",
@ -719,7 +739,8 @@ describe("task-registry", () => {
});
const attackerTask = createTaskRecord({
runtime: "acp",
requesterSessionKey: "agent:attacker:main",
ownerKey: "agent:attacker:main",
scopeKind: "session",
childSessionKey: "agent:attacker:acp:child",
runId: "run-cross-requester-delivery",
task: "Attacker ACP task",
@ -760,7 +781,8 @@ describe("task-registry", () => {
const directTask = createTaskRecord({
runtime: "acp",
requesterSessionKey: "agent:main:main",
ownerKey: "agent:main:main",
scopeKind: "session",
requesterOrigin: {
channel: "telegram",
to: "telegram:123",
@ -774,7 +796,8 @@ describe("task-registry", () => {
const spawnedTask = createTaskRecord({
runtime: "acp",
requesterSessionKey: "agent:main:main",
ownerKey: "agent:main:main",
scopeKind: "session",
requesterOrigin: {
channel: "telegram",
to: "telegram:123",
@ -804,7 +827,8 @@ describe("task-registry", () => {
const spawnedTask = createTaskRecord({
runtime: "acp",
requesterSessionKey: "agent:main:main",
ownerKey: "agent:main:main",
scopeKind: "session",
requesterOrigin: {
channel: "telegram",
to: "telegram:123",
@ -818,7 +842,8 @@ describe("task-registry", () => {
const directTask = createTaskRecord({
runtime: "acp",
requesterSessionKey: "agent:main:main",
ownerKey: "agent:main:main",
scopeKind: "session",
requesterOrigin: {
channel: "telegram",
to: "telegram:123",
@ -849,7 +874,8 @@ describe("task-registry", () => {
const task = createTaskRecord({
runtime: "acp",
requesterSessionKey: "agent:main:main",
ownerKey: "agent:main:main",
scopeKind: "session",
requesterOrigin: {
channel: "telegram",
to: "telegram:123",
@ -889,7 +915,8 @@ describe("task-registry", () => {
const task = createTaskRecord({
runtime: "subagent",
requesterSessionKey: "agent:main:main",
ownerKey: "agent:main:main",
scopeKind: "session",
childSessionKey: "agent:main:subagent:child",
runId: "run-restore",
task: "Restore me",
@ -918,26 +945,30 @@ describe("task-registry", () => {
const older = createTaskRecord({
runtime: "acp",
requesterSessionKey: "agent:main:main",
ownerKey: "agent:main:main",
scopeKind: "session",
childSessionKey: "agent:main:subagent:child-1",
runId: "run-session-lookup-1",
task: "Older task",
});
const latest = createTaskRecord({
runtime: "subagent",
requesterSessionKey: "agent:main:main",
ownerKey: "agent:main:main",
scopeKind: "session",
childSessionKey: "agent:main:subagent:child-2",
runId: "run-session-lookup-2",
task: "Latest task",
});
nowSpy.mockRestore();
expect(findLatestTaskForSessionKey("agent:main:main")?.taskId).toBe(latest.taskId);
expect(listTasksForSessionKey("agent:main:main").map((task) => task.taskId)).toEqual([
expect(findLatestTaskForOwnerKey("agent:main:main")?.taskId).toBe(latest.taskId);
expect(listTasksForOwnerKey("agent:main:main").map((task) => task.taskId)).toEqual([
latest.taskId,
older.taskId,
]);
expect(findLatestTaskForSessionKey("agent:main:subagent:child-1")?.taskId).toBe(older.taskId);
expect(findLatestTaskForRelatedSessionKey("agent:main:subagent:child-1")?.taskId).toBe(
older.taskId,
);
});
});
@ -948,7 +979,8 @@ describe("task-registry", () => {
const task = createTaskRecord({
runtime: "acp",
requesterSessionKey: "agent:main:main",
ownerKey: "agent:main:main",
scopeKind: "session",
childSessionKey: "agent:main:acp:missing",
runId: "run-lost",
task: "Missing child",
@ -981,7 +1013,8 @@ describe("task-registry", () => {
const task = createTaskRecord({
runtime: "acp",
requesterSessionKey: "agent:main:main",
ownerKey: "agent:main:main",
scopeKind: "session",
childSessionKey: "agent:main:acp:missing",
runId: "run-lost-maintenance",
task: "Missing child",
@ -1013,7 +1046,8 @@ describe("task-registry", () => {
const task = createTaskRecord({
runtime: "cli",
requesterSessionKey: "agent:main:main",
ownerKey: "agent:main:main",
scopeKind: "session",
childSessionKey: "agent:main:main",
runId: "run-prune",
task: "Old completed task",
@ -1147,7 +1181,8 @@ describe("task-registry", () => {
const task = createTaskRecord({
runtime: "acp",
requesterSessionKey: "agent:main:main",
ownerKey: "agent:main:main",
scopeKind: "session",
requesterOrigin: {
channel: "discord",
to: "discord:123",
@ -1204,7 +1239,8 @@ describe("task-registry", () => {
createTaskRecord({
runtime: "acp",
requesterSessionKey: "agent:main:main",
ownerKey: "agent:main:main",
scopeKind: "session",
requesterOrigin: {
channel: "discord",
to: "discord:123",
@ -1276,7 +1312,8 @@ describe("task-registry", () => {
createTaskRecord({
runtime: "acp",
requesterSessionKey: "agent:main:main",
ownerKey: "agent:main:main",
scopeKind: "session",
requesterOrigin: {
channel: "discord",
to: "discord:123",
@ -1327,7 +1364,8 @@ describe("task-registry", () => {
createTaskRecord({
runtime: "acp",
requesterSessionKey: "agent:main:main",
ownerKey: "agent:main:main",
scopeKind: "session",
requesterOrigin: {
channel: "discord",
to: "discord:123",
@ -1385,7 +1423,8 @@ describe("task-registry", () => {
const task = registry.createTaskRecord({
runtime: "acp",
requesterSessionKey: "agent:main:main",
ownerKey: "agent:main:main",
scopeKind: "session",
requesterOrigin: {
channel: "telegram",
to: "telegram:123",
@ -1446,7 +1485,8 @@ describe("task-registry", () => {
const task = registry.createTaskRecord({
runtime: "subagent",
requesterSessionKey: "agent:main:main",
ownerKey: "agent:main:main",
scopeKind: "session",
requesterOrigin: {
channel: "telegram",
to: "telegram:123",

View File

@ -46,7 +46,8 @@ const DEFAULT_TASK_RETENTION_MS = 7 * 24 * 60 * 60_000;
const tasks = new Map<string, TaskRecord>();
const taskDeliveryStates = new Map<string, TaskDeliveryState>();
const taskIdsByRunId = new Map<string, Set<string>>();
const taskIdsBySessionKey = new Map<string, Set<string>>();
const taskIdsByOwnerKey = new Map<string, Set<string>>();
const taskIdsByRelatedSessionKey = new Map<string, Set<string>>();
const tasksWithPendingDelivery = new Set<string>();
let listenerStarted = false;
let listenerStop: (() => void) | null = null;
@ -55,10 +56,17 @@ let deliveryRuntimePromise: Promise<typeof import("./task-registry-delivery-runt
null;
type TaskDeliveryOwner = {
sessionKey: string;
sessionKey?: string;
requesterOrigin?: TaskDeliveryState["requesterOrigin"];
};
function assertTaskOwner(params: { ownerKey: string; scopeKind: TaskScopeKind }) {
const ownerKey = params.ownerKey.trim();
if (!ownerKey && params.scopeKind !== "system") {
throw new Error("Task ownerKey is required.");
}
}
function cloneTaskRecord(record: TaskRecord): TaskRecord {
return { ...record };
}
@ -156,19 +164,31 @@ function persistTaskDeliveryStateDelete(taskId: string) {
});
}
function ensureDeliveryStatus(requesterSessionKey: string): TaskDeliveryStatus {
return requesterSessionKey.trim() ? "pending" : "parent_missing";
function ensureDeliveryStatus(params: {
ownerKey: string;
scopeKind: TaskScopeKind;
}): TaskDeliveryStatus {
if (params.scopeKind === "system") {
return "not_applicable";
}
return params.ownerKey.trim() ? "pending" : "parent_missing";
}
function ensureNotifyPolicy(params: {
notifyPolicy?: TaskNotifyPolicy;
deliveryStatus?: TaskDeliveryStatus;
requesterSessionKey: string;
ownerKey: string;
scopeKind: TaskScopeKind;
}): TaskNotifyPolicy {
if (params.notifyPolicy) {
return params.notifyPolicy;
}
const deliveryStatus = params.deliveryStatus ?? ensureDeliveryStatus(params.requesterSessionKey);
const deliveryStatus =
params.deliveryStatus ??
ensureDeliveryStatus({
ownerKey: params.ownerKey,
scopeKind: params.scopeKind,
});
return deliveryStatus === "not_applicable" ? "silent" : "done_only";
}
@ -271,46 +291,68 @@ function normalizeSessionIndexKey(sessionKey?: string): string | undefined {
return trimmed ? trimmed : undefined;
}
function getTaskSessionIndexKeys(
task: Pick<TaskRecord, "requesterSessionKey" | "childSessionKey">,
) {
function addIndexedKey(index: Map<string, Set<string>>, key: string, taskId: string) {
let ids = index.get(key);
if (!ids) {
ids = new Set<string>();
index.set(key, ids);
}
ids.add(taskId);
}
function deleteIndexedKey(index: Map<string, Set<string>>, key: string, taskId: string) {
const ids = index.get(key);
if (!ids) {
return;
}
ids.delete(taskId);
if (ids.size === 0) {
index.delete(key);
}
}
function getTaskRelatedSessionIndexKeys(task: Pick<TaskRecord, "ownerKey" | "childSessionKey">) {
return [
...new Set(
[
normalizeSessionIndexKey(task.requesterSessionKey),
normalizeSessionIndexKey(task.ownerKey),
normalizeSessionIndexKey(task.childSessionKey),
].filter(Boolean) as string[],
),
];
}
function addSessionKeyIndex(
function addOwnerKeyIndex(taskId: string, task: Pick<TaskRecord, "ownerKey">) {
const key = normalizeSessionIndexKey(task.ownerKey);
if (!key) {
return;
}
addIndexedKey(taskIdsByOwnerKey, key, taskId);
}
function deleteOwnerKeyIndex(taskId: string, task: Pick<TaskRecord, "ownerKey">) {
const key = normalizeSessionIndexKey(task.ownerKey);
if (!key) {
return;
}
deleteIndexedKey(taskIdsByOwnerKey, key, taskId);
}
function addRelatedSessionKeyIndex(
taskId: string,
task: Pick<TaskRecord, "requesterSessionKey" | "childSessionKey">,
task: Pick<TaskRecord, "ownerKey" | "childSessionKey">,
) {
for (const sessionKey of getTaskSessionIndexKeys(task)) {
let ids = taskIdsBySessionKey.get(sessionKey);
if (!ids) {
ids = new Set<string>();
taskIdsBySessionKey.set(sessionKey, ids);
}
ids.add(taskId);
for (const sessionKey of getTaskRelatedSessionIndexKeys(task)) {
addIndexedKey(taskIdsByRelatedSessionKey, sessionKey, taskId);
}
}
function deleteSessionKeyIndex(
function deleteRelatedSessionKeyIndex(
taskId: string,
task: Pick<TaskRecord, "requesterSessionKey" | "childSessionKey">,
task: Pick<TaskRecord, "ownerKey" | "childSessionKey">,
) {
for (const sessionKey of getTaskSessionIndexKeys(task)) {
const ids = taskIdsBySessionKey.get(sessionKey);
if (!ids) {
continue;
}
ids.delete(taskId);
if (ids.size === 0) {
taskIdsBySessionKey.delete(sessionKey);
}
for (const sessionKey of getTaskRelatedSessionIndexKeys(task)) {
deleteIndexedKey(taskIdsByRelatedSessionKey, sessionKey, taskId);
}
}
@ -321,10 +363,17 @@ function rebuildRunIdIndex() {
}
}
function rebuildSessionKeyIndex() {
taskIdsBySessionKey.clear();
function rebuildOwnerKeyIndex() {
taskIdsByOwnerKey.clear();
for (const [taskId, task] of tasks.entries()) {
addSessionKeyIndex(taskId, task);
addOwnerKeyIndex(taskId, task);
}
}
function rebuildRelatedSessionKeyIndex() {
taskIdsByRelatedSessionKey.clear();
for (const [taskId, task] of tasks.entries()) {
addRelatedSessionKeyIndex(taskId, task);
}
}
@ -338,12 +387,13 @@ function getTasksByRunId(runId: string): TaskRecord[] {
.filter((task): task is TaskRecord => Boolean(task));
}
function taskRunOwnerKey(
task: Pick<TaskRecord, "runtime" | "requesterSessionKey" | "childSessionKey">,
function taskRunScopeKey(
task: Pick<TaskRecord, "runtime" | "scopeKind" | "ownerKey" | "childSessionKey">,
): string {
return [
task.runtime,
normalizeComparableText(task.requesterSessionKey),
task.scopeKind,
normalizeComparableText(task.ownerKey),
normalizeComparableText(task.childSessionKey),
].join("\u0000");
}
@ -364,12 +414,14 @@ function getTasksByRunScope(params: {
if (childMatches.length > 0) {
return childMatches;
}
return matches.filter(
(task) => normalizeSessionIndexKey(task.requesterSessionKey) === sessionKey,
const ownerMatches = matches.filter(
(task) =>
task.scopeKind === "session" && normalizeSessionIndexKey(task.ownerKey) === sessionKey,
);
return ownerMatches;
}
const ownerKeys = new Set(matches.map((task) => taskRunOwnerKey(task)));
return ownerKeys.size <= 1 ? matches : [];
const scopeKeys = new Set(matches.map((task) => taskRunScopeKey(task)));
return scopeKeys.size <= 1 ? matches : [];
}
function getPeerTasksForDelivery(task: TaskRecord): TaskRecord[] {
@ -379,8 +431,8 @@ function getPeerTasksForDelivery(task: TaskRecord): TaskRecord[] {
return getTasksByRunId(task.runId).filter(
(candidate) =>
candidate.runtime === task.runtime &&
normalizeComparableText(candidate.requesterSessionKey) ===
normalizeComparableText(task.requesterSessionKey) &&
candidate.scopeKind === task.scopeKind &&
normalizeComparableText(candidate.ownerKey) === normalizeComparableText(task.ownerKey) &&
normalizeComparableText(candidate.childSessionKey) ===
normalizeComparableText(task.childSessionKey),
);
@ -418,7 +470,8 @@ function compareTasksNewestFirst(
function findExistingTaskForCreate(params: {
runtime: TaskRuntime;
requesterSessionKey: string;
ownerKey: string;
scopeKind: TaskScopeKind;
childSessionKey?: string;
runId?: string;
label?: string;
@ -429,8 +482,8 @@ function findExistingTaskForCreate(params: {
? getTasksByRunId(runId).filter(
(task) =>
task.runtime === params.runtime &&
normalizeComparableText(task.requesterSessionKey) ===
normalizeComparableText(params.requesterSessionKey) &&
task.scopeKind === params.scopeKind &&
normalizeComparableText(task.ownerKey) === normalizeComparableText(params.ownerKey) &&
normalizeComparableText(task.childSessionKey) ===
normalizeComparableText(params.childSessionKey),
)
@ -505,7 +558,8 @@ function mergeExistingTaskForCreate(
const notifyPolicy = ensureNotifyPolicy({
notifyPolicy: params.notifyPolicy,
deliveryStatus: params.deliveryStatus,
requesterSessionKey: existing.requesterSessionKey,
ownerKey: existing.ownerKey,
scopeKind: existing.scopeKind,
});
if (notifyPolicy !== existing.notifyPolicy && existing.notifyPolicy === "silent") {
patch.notifyPolicy = notifyPolicy;
@ -534,8 +588,11 @@ function resolveTaskTerminalIdempotencyKey(task: TaskRecord): string {
}
function resolveTaskDeliveryOwner(task: TaskRecord): TaskDeliveryOwner {
if (task.scopeKind !== "session") {
return {};
}
return {
sessionKey: task.requesterSessionKey.trim(),
sessionKey: task.ownerKey.trim(),
requesterOrigin: normalizeDeliveryContext(taskDeliveryStates.get(task.taskId)?.requesterOrigin),
};
}
@ -557,7 +614,8 @@ function restoreTaskRegistryOnce() {
taskDeliveryStates.set(taskId, state);
}
rebuildRunIdIndex();
rebuildSessionKeyIndex();
rebuildOwnerKeyIndex();
rebuildRelatedSessionKeyIndex();
emitTaskRegistryHookEvent(() => ({
kind: "restored",
tasks: snapshotTaskRecords(tasks),
@ -583,8 +641,7 @@ function updateTask(taskId: string, patch: Partial<TaskRecord>): TaskRecord | nu
next.cleanupAfter = terminalAt + DEFAULT_TASK_RETENTION_MS;
}
const sessionIndexChanged =
normalizeSessionIndexKey(current.requesterSessionKey) !==
normalizeSessionIndexKey(next.requesterSessionKey) ||
normalizeSessionIndexKey(current.ownerKey) !== normalizeSessionIndexKey(next.ownerKey) ||
normalizeSessionIndexKey(current.childSessionKey) !==
normalizeSessionIndexKey(next.childSessionKey);
tasks.set(taskId, next);
@ -592,8 +649,10 @@ function updateTask(taskId: string, patch: Partial<TaskRecord>): TaskRecord | nu
rebuildRunIdIndex();
}
if (sessionIndexChanged) {
deleteSessionKeyIndex(taskId, current);
addSessionKeyIndex(taskId, next);
deleteOwnerKeyIndex(taskId, current);
addOwnerKeyIndex(taskId, next);
deleteRelatedSessionKeyIndex(taskId, current);
addRelatedSessionKeyIndex(taskId, next);
}
persistTaskUpsert(next);
emitTaskRegistryHookEvent(() => ({
@ -635,20 +694,24 @@ function canDeliverTaskToRequesterOrigin(task: TaskRecord): boolean {
return Boolean(channel && to && isDeliverableMessageChannel(channel));
}
function resolveMissingOwnerDeliveryStatus(task: TaskRecord): TaskDeliveryStatus {
return task.scopeKind === "system" ? "not_applicable" : "parent_missing";
}
function queueTaskSystemEvent(task: TaskRecord, text: string) {
const owner = resolveTaskDeliveryOwner(task);
const requesterSessionKey = owner.sessionKey.trim();
if (!requesterSessionKey) {
const ownerKey = owner.sessionKey?.trim();
if (!ownerKey) {
return false;
}
enqueueSystemEvent(text, {
sessionKey: requesterSessionKey,
sessionKey: ownerKey,
contextKey: `task:${task.taskId}`,
deliveryContext: owner.requesterOrigin,
});
requestHeartbeatNow({
reason: "background-task",
sessionKey: requesterSessionKey,
sessionKey: ownerKey,
});
return true;
}
@ -659,18 +722,18 @@ function queueBlockedTaskFollowup(task: TaskRecord) {
return false;
}
const owner = resolveTaskDeliveryOwner(task);
const requesterSessionKey = owner.sessionKey.trim();
if (!requesterSessionKey) {
const ownerKey = owner.sessionKey?.trim();
if (!ownerKey) {
return false;
}
enqueueSystemEvent(followupText, {
sessionKey: requesterSessionKey,
sessionKey: ownerKey,
contextKey: `task:${task.taskId}:blocked-followup`,
deliveryContext: owner.requesterOrigin,
});
requestHeartbeatNow({
reason: "background-task-blocked",
sessionKey: requesterSessionKey,
sessionKey: ownerKey,
});
return true;
}
@ -702,9 +765,10 @@ export async function maybeDeliverTaskTerminalUpdate(taskId: string): Promise<Ta
});
}
const owner = resolveTaskDeliveryOwner(latest);
if (!owner.sessionKey.trim()) {
const ownerSessionKey = owner.sessionKey?.trim();
if (!ownerSessionKey) {
return updateTask(taskId, {
deliveryStatus: "parent_missing",
deliveryStatus: resolveMissingOwnerDeliveryStatus(latest),
lastEventAt: Date.now(),
});
}
@ -722,7 +786,7 @@ export async function maybeDeliverTaskTerminalUpdate(taskId: string): Promise<Ta
} catch (error) {
log.warn("Failed to queue background task session delivery", {
taskId,
requesterSessionKey: latest.requesterSessionKey,
ownerKey: latest.ownerKey,
error,
});
return updateTask(taskId, {
@ -733,7 +797,7 @@ export async function maybeDeliverTaskTerminalUpdate(taskId: string): Promise<Ta
}
try {
const { sendMessage } = await loadTaskRegistryDeliveryRuntime();
const requesterAgentId = parseAgentSessionKey(owner.sessionKey)?.agentId;
const requesterAgentId = parseAgentSessionKey(ownerSessionKey)?.agentId;
const idempotencyKey = resolveTaskTerminalIdempotencyKey(latest);
await sendMessage({
channel: owner.requesterOrigin?.channel,
@ -744,7 +808,7 @@ export async function maybeDeliverTaskTerminalUpdate(taskId: string): Promise<Ta
agentId: requesterAgentId,
idempotencyKey,
mirror: {
sessionKey: owner.sessionKey,
sessionKey: ownerSessionKey,
agentId: requesterAgentId,
idempotencyKey,
},
@ -759,7 +823,7 @@ export async function maybeDeliverTaskTerminalUpdate(taskId: string): Promise<Ta
} catch (error) {
log.warn("Failed to deliver background task update", {
taskId,
requesterSessionKey: owner.sessionKey,
ownerKey: ownerSessionKey,
requesterOrigin: owner.requesterOrigin,
error,
});
@ -771,7 +835,7 @@ export async function maybeDeliverTaskTerminalUpdate(taskId: string): Promise<Ta
} catch (fallbackError) {
log.warn("Failed to queue background task fallback event", {
taskId,
requesterSessionKey: latest.requesterSessionKey,
ownerKey: latest.ownerKey,
error: fallbackError,
});
}
@ -804,6 +868,13 @@ export async function maybeDeliverTaskStateChangeUpdate(
}
try {
const owner = resolveTaskDeliveryOwner(current);
const ownerSessionKey = owner.sessionKey?.trim();
if (!ownerSessionKey) {
return updateTask(taskId, {
deliveryStatus: resolveMissingOwnerDeliveryStatus(current),
lastEventAt: Date.now(),
});
}
if (!canDeliverTaskToRequesterOrigin(current)) {
queueTaskSystemEvent(current, eventText);
upsertTaskDeliveryState({
@ -816,7 +887,7 @@ export async function maybeDeliverTaskStateChangeUpdate(
});
}
const { sendMessage } = await loadTaskRegistryDeliveryRuntime();
const requesterAgentId = parseAgentSessionKey(owner.sessionKey)?.agentId;
const requesterAgentId = parseAgentSessionKey(ownerSessionKey)?.agentId;
const idempotencyKey = resolveTaskStateChangeIdempotencyKey({
task: current,
latestEvent,
@ -831,7 +902,7 @@ export async function maybeDeliverTaskStateChangeUpdate(
agentId: requesterAgentId,
idempotencyKey,
mirror: {
sessionKey: owner.sessionKey,
sessionKey: ownerSessionKey,
agentId: requesterAgentId,
idempotencyKey,
},
@ -847,7 +918,7 @@ export async function maybeDeliverTaskStateChangeUpdate(
} catch (error) {
log.warn("Failed to deliver background task state change", {
taskId,
requesterSessionKey: current.requesterSessionKey,
ownerKey: current.ownerKey,
error,
});
return cloneTaskRecord(current);
@ -1116,7 +1187,8 @@ export function createTaskRecord(params: {
requesterOrigin: normalizeDeliveryContext(params.requesterOrigin),
});
addRunIdIndex(taskId, record.runId);
addSessionKeyIndex(taskId, record);
addOwnerKeyIndex(taskId, record);
addRelatedSessionKeyIndex(taskId, record);
persistTaskUpsert(record);
emitTaskRegistryHookEvent(() => ({
kind: "upserted",
@ -1429,26 +1501,8 @@ export function findTaskByRunId(runId: string): TaskRecord | undefined {
return task ? cloneTaskRecord(task) : undefined;
}
export function findLatestTaskForSessionKey(sessionKey: string): TaskRecord | undefined {
const task = listTasksForSessionKey(sessionKey)[0];
return task ? cloneTaskRecord(task) : undefined;
}
export function findLatestTaskForOwnerKey(ownerKey: string): TaskRecord | undefined {
return findLatestTaskForSessionKey(ownerKey);
}
export function findLatestTaskForRelatedSessionKey(sessionKey: string): TaskRecord | undefined {
return findLatestTaskForSessionKey(sessionKey);
}
export function listTasksForSessionKey(sessionKey: string): TaskRecord[] {
ensureTaskRegistryReady();
const key = normalizeSessionIndexKey(sessionKey);
if (!key) {
return [];
}
const ids = taskIdsBySessionKey.get(key);
function listTasksFromIndex(index: Map<string, Set<string>>, key: string): TaskRecord[] {
const ids = index.get(key);
if (!ids || ids.size === 0) {
return [];
}
@ -1468,12 +1522,46 @@ export function listTasksForSessionKey(sessionKey: string): TaskRecord[] {
.map(({ insertionIndex: _, ...task }) => task);
}
export function findLatestTaskForSessionKey(sessionKey: string): TaskRecord | undefined {
const task = listTasksForSessionKey(sessionKey)[0];
return task ? cloneTaskRecord(task) : undefined;
}
export function listTasksForSessionKey(sessionKey: string): TaskRecord[] {
ensureTaskRegistryReady();
const key = normalizeSessionIndexKey(sessionKey);
if (!key) {
return [];
}
return listTasksFromIndex(taskIdsByRelatedSessionKey, key);
}
export function findLatestTaskForOwnerKey(ownerKey: string): TaskRecord | undefined {
const task = listTasksForOwnerKey(ownerKey)[0];
return task ? cloneTaskRecord(task) : undefined;
}
export function listTasksForOwnerKey(ownerKey: string): TaskRecord[] {
return listTasksForSessionKey(ownerKey);
ensureTaskRegistryReady();
const key = normalizeSessionIndexKey(ownerKey);
if (!key) {
return [];
}
return listTasksFromIndex(taskIdsByOwnerKey, key);
}
export function findLatestTaskForRelatedSessionKey(sessionKey: string): TaskRecord | undefined {
const task = listTasksForRelatedSessionKey(sessionKey)[0];
return task ? cloneTaskRecord(task) : undefined;
}
export function listTasksForRelatedSessionKey(sessionKey: string): TaskRecord[] {
return listTasksForSessionKey(sessionKey);
ensureTaskRegistryReady();
const key = normalizeSessionIndexKey(sessionKey);
if (!key) {
return [];
}
return listTasksFromIndex(taskIdsByRelatedSessionKey, key);
}
export function resolveTaskForLookupToken(token: string): TaskRecord | undefined {
@ -1481,7 +1569,9 @@ export function resolveTaskForLookupToken(token: string): TaskRecord | undefined
if (!lookup) {
return undefined;
}
return getTaskById(lookup) ?? findTaskByRunId(lookup) ?? findLatestTaskForSessionKey(lookup);
return (
getTaskById(lookup) ?? findTaskByRunId(lookup) ?? findLatestTaskForRelatedSessionKey(lookup)
);
}
export function deleteTaskRecordById(taskId: string): boolean {
@ -1490,7 +1580,8 @@ export function deleteTaskRecordById(taskId: string): boolean {
if (!current) {
return false;
}
deleteSessionKeyIndex(taskId, current);
deleteOwnerKeyIndex(taskId, current);
deleteRelatedSessionKeyIndex(taskId, current);
tasks.delete(taskId);
taskDeliveryStates.delete(taskId);
rebuildRunIdIndex();
@ -1508,7 +1599,8 @@ export function resetTaskRegistryForTests(opts?: { persist?: boolean }) {
tasks.clear();
taskDeliveryStates.clear();
taskIdsByRunId.clear();
taskIdsBySessionKey.clear();
taskIdsByOwnerKey.clear();
taskIdsByRelatedSessionKey.clear();
tasksWithPendingDelivery.clear();
restoreAttempted = false;
resetTaskRegistryRuntimeForTests();