From 0a014ca63a8dc733dde91b4f2b8de3830325354a Mon Sep 17 00:00:00 2001 From: Vincent Koc Date: Mon, 30 Mar 2026 14:34:48 +0900 Subject: [PATCH] perf(tasks): optimize session lookups and sqlite upserts --- .../openclaw-tools.session-status.test.ts | 40 ++++++++ src/agents/tools/cron-tool.ts | 2 + src/agents/tools/session-status-tool.ts | 36 +++++++- src/auto-reply/reply/commands-acp.test.ts | 14 +++ .../reply/commands-acp/runtime-options.ts | 8 ++ .../reply/commands-subagents/action-info.ts | 4 + src/auto-reply/reply/commands.test.ts | 15 +++ src/tasks/task-registry.store.sqlite.ts | 33 ++++++- src/tasks/task-registry.test.ts | 31 +++++++ src/tasks/task-registry.ts | 91 ++++++++++++++++++- 10 files changed, 261 insertions(+), 13 deletions(-) diff --git a/src/agents/openclaw-tools.session-status.test.ts b/src/agents/openclaw-tools.session-status.test.ts index 9a11fdbe032..e652fcd3219 100644 --- a/src/agents/openclaw-tools.session-status.test.ts +++ b/src/agents/openclaw-tools.session-status.test.ts @@ -8,6 +8,9 @@ const callGatewayMock = vi.fn(); const loadCombinedSessionStoreForGatewayMock = vi.fn(); const buildStatusMessageMock = vi.hoisted(() => vi.fn(() => "OpenClaw\n๐Ÿง  Model: GPT-5.4")); const resolveQueueSettingsMock = vi.hoisted(() => vi.fn(() => ({ mode: "interrupt" }))); +const listTasksForSessionKeyMock = vi.hoisted(() => + vi.fn((_: string) => [] as Array>), +); const createMockConfig = () => ({ session: { mainKey: "main", scope: "per-sender" }, @@ -189,6 +192,9 @@ async function loadFreshOpenClawToolsForSessionStatusTest() { vi.doMock("../auto-reply/status.js", () => ({ buildStatusMessage: buildStatusMessageMock, })); + vi.doMock("../tasks/task-registry.js", () => ({ + listTasksForSessionKey: (sessionKey: string) => listTasksForSessionKeyMock(sessionKey), + })); ({ createSessionStatusTool } = await import("./tools/session-status-tool.js")); } @@ -200,6 +206,8 @@ function resetSessionStore(store: Record) { updateSessionStoreMock.mockClear(); callGatewayMock.mockClear(); loadCombinedSessionStoreForGatewayMock.mockClear(); + listTasksForSessionKeyMock.mockClear(); + listTasksForSessionKeyMock.mockReturnValue([]); loadSessionStoreMock.mockReturnValue(store); loadCombinedSessionStoreForGatewayMock.mockReturnValue({ storePath: "(multiple)", @@ -375,6 +383,38 @@ describe("session_status tool", () => { expect(details.sessionKey).toBe("agent:main:current"); }); + it("includes background task context in session_status output", async () => { + resetSessionStore({ + "agent:main:main": { + sessionId: "sess-main", + updatedAt: Date.now(), + }, + }); + listTasksForSessionKeyMock.mockReturnValue([ + { + taskId: "task-1", + runtime: "acp", + requesterSessionKey: "agent:main:main", + task: "Summarize inbox backlog", + status: "running", + deliveryStatus: "pending", + notifyPolicy: "done_only", + createdAt: Date.now() - 5_000, + progressSummary: "Indexing the latest threads", + }, + ]); + + const tool = createSessionStatusTool({ agentSessionKey: "agent:main:main" }); + const result = await tool.execute("tc-1", { sessionKey: "agent:main:main" }); + const firstContent = result.content?.[0]; + const text = (firstContent as { text: string } | undefined)?.text ?? ""; + + expect(text).toContain("๐Ÿ“Œ Tasks: 1 active"); + expect(text).toContain("acp"); + expect(text).toContain("Summarize inbox backlog"); + expect(text).toContain("Indexing the latest threads"); + }); + it("resolves a literal current sessionId in session_status", async () => { resetSessionStore({ main: { diff --git a/src/agents/tools/cron-tool.ts b/src/agents/tools/cron-tool.ts index 58451934eda..1a2a24bba86 100644 --- a/src/agents/tools/cron-tool.ts +++ b/src/agents/tools/cron-tool.ts @@ -216,6 +216,8 @@ export function createCronTool(opts?: CronToolOptions, deps?: CronToolDeps): Any displaySummary: "Schedule and manage cron jobs and wake events.", description: `Manage Gateway cron jobs (status/list/add/update/remove/run/runs) and send wake events. +Main-session cron jobs enqueue system events for heartbeat handling. Isolated cron jobs create background task runs that appear in \`openclaw tasks\`. + ACTIONS: - status: Check cron scheduler status - list: List jobs (use includeDisabled:true to include disabled) diff --git a/src/agents/tools/session-status-tool.ts b/src/agents/tools/session-status-tool.ts index e6be3fb103d..732ad21b6a1 100644 --- a/src/agents/tools/session-status-tool.ts +++ b/src/agents/tools/session-status-tool.ts @@ -23,6 +23,7 @@ import { resolveAgentIdFromSessionKey, } from "../../routing/session-key.js"; import { applyModelOverrideToSessionEntry } from "../../sessions/model-overrides.js"; +import { listTasksForSessionKey } from "../../tasks/task-registry.js"; import { resolveAgentConfig, resolveAgentDir } from "../agent-scope.js"; import { formatUserTime, resolveUserTimeFormat, resolveUserTimezone } from "../date-time.js"; import { resolveModelAuthLabel } from "../model-auth-label.js"; @@ -118,6 +119,33 @@ function resolveStoreScopedRequesterKey(params: { return parsed.rest === params.mainKey ? params.mainKey : params.requesterKey; } +function formatSessionTaskLine(sessionKey: string): string | undefined { + const tasks = listTasksForSessionKey(sessionKey); + if (tasks.length === 0) { + return undefined; + } + const latest = tasks[0]; + const active = tasks.filter( + (task) => task.status === "queued" || task.status === "running", + ).length; + const failed = tasks.filter( + (task) => task.status === "failed" || task.status === "timed_out" || task.status === "lost", + ).length; + const headline = + active > 0 + ? `${active} active` + : failed > 0 + ? `${failed} recent failure${failed === 1 ? "" : "s"}` + : `latest ${latest.status.replaceAll("_", " ")}`; + const title = latest.label?.trim() || latest.task.trim(); + const detail = + latest.status === "running" || latest.status === "queued" + ? latest.progressSummary?.trim() + : latest.error?.trim() || latest.terminalSummary?.trim(); + const parts = [headline, latest.runtime, title, detail].filter(Boolean); + return parts.length ? `๐Ÿ“Œ Tasks: ${parts.join(" ยท ")}` : undefined; +} + async function resolveModelOverride(params: { cfg: OpenClawConfig; raw: string; @@ -191,7 +219,7 @@ export function createSessionStatusTool(opts?: { label: "Session Status", name: "session_status", description: - "Show a /status-equivalent session status card (usage + time + cost when available). Use for model-use questions (๐Ÿ“Š session_status). Optional: set per-session model override (model=default resets overrides).", + "Show a /status-equivalent session status card (usage + time + cost when available), including linked background task context when present. Use for model-use questions (๐Ÿ“Š session_status). Optional: set per-session model override (model=default resets overrides).", parameters: SessionStatusToolSchema, execute: async (_toolCallId, args) => { const params = args as Record; @@ -537,14 +565,16 @@ export function createSessionStatusTool(opts?: { }, includeTranscriptUsage: true, }); + const taskLine = formatSessionTaskLine(resolved.key); + const fullStatusText = taskLine ? `${statusText}\n${taskLine}` : statusText; return { - content: [{ type: "text", text: statusText }], + content: [{ type: "text", text: fullStatusText }], details: { ok: true, sessionKey: resolved.key, changedModel, - statusText, + statusText: fullStatusText, }, }; }, diff --git a/src/auto-reply/reply/commands-acp.test.ts b/src/auto-reply/reply/commands-acp.test.ts index c096c63f06a..1245fbfd478 100644 --- a/src/auto-reply/reply/commands-acp.test.ts +++ b/src/auto-reply/reply/commands-acp.test.ts @@ -111,6 +111,8 @@ const { handleAcpCommand } = await import("./commands-acp.js"); const { buildCommandTestParams } = await import("./commands-spawn.test-harness.js"); const { __testing: acpManagerTesting } = await import("../../acp/control-plane/manager.js"); const { __testing: acpResetTargetTesting } = await import("./acp-reset-target.js"); +const { createTaskRecord, resetTaskRegistryForTests } = + await import("../../tasks/task-registry.js"); function parseTelegramChatIdForTest(raw?: string | null): string | undefined { const trimmed = raw?.trim().replace(/^telegram:/i, ""); @@ -675,6 +677,7 @@ describe("/acp command", () => { beforeEach(() => { setMinimalAcpCommandRegistryForTests(); acpManagerTesting.resetAcpSessionManagerForTests(); + resetTaskRegistryForTests(); acpResetTargetTesting.setDepsForTest({ getSessionBindingService: () => createAcpCommandSessionBindingService() as never, }); @@ -1454,12 +1457,23 @@ describe("/acp command", () => { lastUpdatedAt: Date.now(), }, }); + createTaskRecord({ + runtime: "acp", + requesterSessionKey: "agent:main:main", + childSessionKey: defaultAcpSessionKey, + runId: "acp-run-1", + task: "Inspect ACP backlog", + status: "running", + progressSummary: "Fetching the latest runtime state", + }); const result = await runThreadAcpCommand("/acp status", baseCfg); expect(result?.reply?.text).toContain("ACP status:"); expect(result?.reply?.text).toContain(`session: ${defaultAcpSessionKey}`); expect(result?.reply?.text).toContain("agent session id: codex-sid-1"); expect(result?.reply?.text).toContain("acpx session id: acpx-sid-1"); + expect(result?.reply?.text).toContain("taskStatus: running"); + expect(result?.reply?.text).toContain("taskProgress: Fetching the latest runtime state"); expect(result?.reply?.text).toContain("capabilities:"); expect(hoisted.getStatusMock).toHaveBeenCalledTimes(1); }); diff --git a/src/auto-reply/reply/commands-acp/runtime-options.ts b/src/auto-reply/reply/commands-acp/runtime-options.ts index 6d0dc7da7a4..f88c88caed1 100644 --- a/src/auto-reply/reply/commands-acp/runtime-options.ts +++ b/src/auto-reply/reply/commands-acp/runtime-options.ts @@ -142,6 +142,14 @@ export async function handleAcpStatusAction( `taskId: ${linkedTask.taskId}`, `taskStatus: ${linkedTask.status}`, `delivery: ${linkedTask.deliveryStatus}`, + ...(linkedTask.progressSummary + ? [`taskProgress: ${linkedTask.progressSummary}`] + : []), + ...(linkedTask.terminalSummary ? [`taskSummary: ${linkedTask.terminalSummary}`] : []), + ...(linkedTask.error ? [`taskError: ${linkedTask.error}`] : []), + ...(typeof linkedTask.lastEventAt === "number" + ? [`taskUpdatedAt: ${new Date(linkedTask.lastEventAt).toISOString()}`] + : []), ] : []), `runtimeOptions: ${formatRuntimeOptionsText(status.runtimeOptions)}`, diff --git a/src/auto-reply/reply/commands-subagents/action-info.ts b/src/auto-reply/reply/commands-subagents/action-info.ts index aa97b891b01..69f39f9cb26 100644 --- a/src/auto-reply/reply/commands-subagents/action-info.ts +++ b/src/auto-reply/reply/commands-subagents/action-info.ts @@ -46,6 +46,7 @@ export function handleSubagentsInfoAction(ctx: SubagentsCommandContext): Command `Task: ${run.task}`, `Run: ${run.runId}`, linkedTask ? `TaskId: ${linkedTask.taskId}` : undefined, + linkedTask ? `TaskStatus: ${linkedTask.status}` : undefined, `Session: ${run.childSessionKey}`, `SessionId: ${sessionEntry?.sessionId ?? "n/a"}`, `Transcript: ${sessionEntry?.sessionFile ?? "n/a"}`, @@ -57,6 +58,9 @@ export function handleSubagentsInfoAction(ctx: SubagentsCommandContext): Command run.archiveAtMs ? `Archive: ${formatTimestampWithAge(run.archiveAtMs)}` : undefined, run.cleanupHandled ? "Cleanup handled: yes" : undefined, `Outcome: ${outcome}`, + linkedTask?.progressSummary ? `Progress: ${linkedTask.progressSummary}` : undefined, + linkedTask?.terminalSummary ? `Task summary: ${linkedTask.terminalSummary}` : undefined, + linkedTask?.error ? `Task error: ${linkedTask.error}` : undefined, linkedTask ? `Delivery: ${linkedTask.deliveryStatus}` : undefined, ].filter(Boolean); diff --git a/src/auto-reply/reply/commands.test.ts b/src/auto-reply/reply/commands.test.ts index a27d2a0eb95..44a9cffd86b 100644 --- a/src/auto-reply/reply/commands.test.ts +++ b/src/auto-reply/reply/commands.test.ts @@ -130,6 +130,8 @@ const { parseConfigCommand } = await import("./config-commands.js"); const { parseDebugCommand } = await import("./debug-commands.js"); const { parseInlineDirectives } = await import("./directive-handling.js"); const { buildCommandContext, handleCommands } = await import("./commands.js"); +const { createTaskRecord, resetTaskRegistryForTests } = + await import("../../tasks/task-registry.js"); let testWorkspaceDir = os.tmpdir(); @@ -150,6 +152,7 @@ afterAll(async () => { beforeEach(() => { vi.useRealTimers(); vi.clearAllTimers(); + resetTaskRegistryForTests(); setDefaultChannelPluginRegistryForTests(); readConfigFileSnapshotMock.mockImplementation(async () => { const configPath = process.env.OPENCLAW_CONFIG_PATH; @@ -2743,6 +2746,16 @@ describe("handleCommands subagents", () => { endedAt: now - 1_000, outcome: { status: "ok" }, }); + createTaskRecord({ + runtime: "subagent", + requesterSessionKey: "agent:main:main", + childSessionKey: "agent:main:subagent:abc", + runId: "run-1", + task: "do thing", + status: "succeeded", + terminalSummary: "Completed the requested task", + deliveryStatus: "delivered", + }); const cfg = { commands: { text: true }, channels: { whatsapp: { allowFrom: ["*"] } }, @@ -2754,6 +2767,8 @@ describe("handleCommands subagents", () => { expect(result.reply?.text).toContain("Subagent info"); expect(result.reply?.text).toContain("Run: run-1"); expect(result.reply?.text).toContain("Status: done"); + expect(result.reply?.text).toContain("TaskStatus: succeeded"); + expect(result.reply?.text).toContain("Task summary: Completed the requested task"); }); it("does not resolve moved child rows from a stale older parent", async () => { diff --git a/src/tasks/task-registry.store.sqlite.ts b/src/tasks/task-registry.store.sqlite.ts index 0def1fc78a8..0e507bc7d6d 100644 --- a/src/tasks/task-registry.store.sqlite.ts +++ b/src/tasks/task-registry.store.sqlite.ts @@ -40,7 +40,7 @@ type TaskDeliveryStateRow = { type TaskRegistryStatements = { selectAll: StatementSync; selectAllDeliveryStates: StatementSync; - replaceRow: StatementSync; + upsertRow: StatementSync; replaceDeliveryState: StatementSync; deleteRow: StatementSync; deleteDeliveryState: StatementSync; @@ -186,6 +186,7 @@ function createStatements(db: DatabaseSync): TaskRegistryStatements { FROM task_runs ORDER BY created_at ASC, task_id ASC `), +<<<<<<< HEAD selectAllDeliveryStates: db.prepare(` SELECT task_id, @@ -194,8 +195,8 @@ function createStatements(db: DatabaseSync): TaskRegistryStatements { FROM task_delivery_state ORDER BY task_id ASC `), - replaceRow: db.prepare(` - INSERT OR REPLACE INTO task_runs ( + upsertRow: db.prepare(` + INSERT INTO task_runs ( task_id, runtime, source_id, @@ -242,6 +243,28 @@ function createStatements(db: DatabaseSync): TaskRegistryStatements { @terminal_summary, @terminal_outcome ) + ON CONFLICT(task_id) DO UPDATE SET + runtime = excluded.runtime, + source_id = excluded.source_id, + requester_session_key = excluded.requester_session_key, + child_session_key = excluded.child_session_key, + parent_task_id = excluded.parent_task_id, + agent_id = excluded.agent_id, + run_id = excluded.run_id, + label = excluded.label, + task = excluded.task, + status = excluded.status, + delivery_status = excluded.delivery_status, + notify_policy = excluded.notify_policy, + created_at = excluded.created_at, + started_at = excluded.started_at, + ended_at = excluded.ended_at, + last_event_at = excluded.last_event_at, + cleanup_after = excluded.cleanup_after, + error = excluded.error, + progress_summary = excluded.progress_summary, + terminal_summary = excluded.terminal_summary, + terminal_outcome = excluded.terminal_outcome `), replaceDeliveryState: db.prepare(` INSERT OR REPLACE INTO task_delivery_state ( @@ -371,7 +394,7 @@ export function saveTaskRegistryStateToSqlite(snapshot: TaskRegistryStoreSnapsho statements.clearDeliveryStates.run(); statements.clearRows.run(); for (const task of snapshot.tasks.values()) { - statements.replaceRow.run(bindTaskRecord(task)); + statements.upsertRow.run(bindTaskRecord(task)); } for (const state of snapshot.deliveryStates.values()) { statements.replaceDeliveryState.run(bindTaskDeliveryState(state)); @@ -381,7 +404,7 @@ export function saveTaskRegistryStateToSqlite(snapshot: TaskRegistryStoreSnapsho export function upsertTaskRegistryRecordToSqlite(task: TaskRecord) { const store = openTaskRegistryDatabase(); - store.statements.replaceRow.run(bindTaskRecord(task)); + store.statements.upsertRow.run(bindTaskRecord(task)); ensureTaskRegistryPermissions(store.path); } diff --git a/src/tasks/task-registry.test.ts b/src/tasks/task-registry.test.ts index d225e4596af..46fed30cb11 100644 --- a/src/tasks/task-registry.test.ts +++ b/src/tasks/task-registry.test.ts @@ -9,9 +9,11 @@ import { peekSystemEvents, resetSystemEventsForTest } from "../infra/system-even import { withTempDir } from "../test-helpers/temp-dir.js"; import { createTaskRecord, + findLatestTaskForSessionKey, findTaskByRunId, getTaskById, getTaskRegistrySummary, + listTasksForSessionKey, listTaskRecords, maybeDeliverTaskStateChangeUpdate, maybeDeliverTaskTerminalUpdate, @@ -789,6 +791,35 @@ describe("task-registry", () => { }); }); + it("indexes tasks by session key for latest and list lookups", async () => { + await withTempDir({ prefix: "openclaw-task-registry-" }, async (root) => { + process.env.OPENCLAW_STATE_DIR = root; + resetTaskRegistryForTests({ persist: false }); + + const older = createTaskRecord({ + runtime: "acp", + requesterSessionKey: "agent:main:main", + childSessionKey: "agent:main:subagent:child-1", + runId: "run-session-lookup-1", + task: "Older task", + }); + const latest = createTaskRecord({ + runtime: "subagent", + requesterSessionKey: "agent:main:main", + childSessionKey: "agent:main:subagent:child-2", + runId: "run-session-lookup-2", + task: "Latest task", + }); + + expect(findLatestTaskForSessionKey("agent:main:main")?.taskId).toBe(latest.taskId); + expect(listTasksForSessionKey("agent:main:main").map((task) => task.taskId)).toEqual([ + latest.taskId, + older.taskId, + ]); + expect(findLatestTaskForSessionKey("agent:main:subagent:child-1")?.taskId).toBe(older.taskId); + }); + }); + it("projects inspection-time orphaned tasks as lost without mutating the registry", async () => { await withTempDir({ prefix: "openclaw-task-registry-" }, async (root) => { process.env.OPENCLAW_STATE_DIR = root; diff --git a/src/tasks/task-registry.ts b/src/tasks/task-registry.ts index 6158f21eeb0..238610a2e6f 100644 --- a/src/tasks/task-registry.ts +++ b/src/tasks/task-registry.ts @@ -45,6 +45,7 @@ const DEFAULT_TASK_RETENTION_MS = 7 * 24 * 60 * 60_000; const tasks = new Map(); const taskDeliveryStates = new Map(); const taskIdsByRunId = new Map>(); +const taskIdsBySessionKey = new Map>(); const tasksWithPendingDelivery = new Set(); let listenerStarted = false; let listenerStop: (() => void) | null = null; @@ -218,6 +219,54 @@ function addRunIdIndex(taskId: string, runId?: string) { ids.add(taskId); } +function normalizeSessionIndexKey(sessionKey?: string): string | undefined { + const trimmed = sessionKey?.trim(); + return trimmed ? trimmed : undefined; +} + +function getTaskSessionIndexKeys( + task: Pick, +) { + return [ + ...new Set( + [ + normalizeSessionIndexKey(task.requesterSessionKey), + normalizeSessionIndexKey(task.childSessionKey), + ].filter(Boolean) as string[], + ), + ]; +} + +function addSessionKeyIndex( + taskId: string, + task: Pick, +) { + for (const sessionKey of getTaskSessionIndexKeys(task)) { + let ids = taskIdsBySessionKey.get(sessionKey); + if (!ids) { + ids = new Set(); + taskIdsBySessionKey.set(sessionKey, ids); + } + ids.add(taskId); + } +} + +function deleteSessionKeyIndex( + taskId: string, + task: Pick, +) { + for (const sessionKey of getTaskSessionIndexKeys(task)) { + const ids = taskIdsBySessionKey.get(sessionKey); + if (!ids) { + continue; + } + ids.delete(taskId); + if (ids.size === 0) { + taskIdsBySessionKey.delete(sessionKey); + } + } +} + function rebuildRunIdIndex() { taskIdsByRunId.clear(); for (const [taskId, task] of tasks.entries()) { @@ -225,6 +274,13 @@ function rebuildRunIdIndex() { } } +function rebuildSessionKeyIndex() { + taskIdsBySessionKey.clear(); + for (const [taskId, task] of tasks.entries()) { + addSessionKeyIndex(taskId, task); + } +} + function getTasksByRunId(runId: string): TaskRecord[] { const ids = taskIdsByRunId.get(runId.trim()); if (!ids || ids.size === 0) { @@ -379,6 +435,7 @@ function restoreTaskRegistryOnce() { taskDeliveryStates.set(taskId, state); } rebuildRunIdIndex(); + rebuildSessionKeyIndex(); emitTaskRegistryHookEvent(() => ({ kind: "restored", tasks: snapshotTaskRecords(tasks), @@ -403,10 +460,19 @@ function updateTask(taskId: string, patch: Partial): TaskRecord | nu const terminalAt = next.endedAt ?? next.lastEventAt ?? Date.now(); next.cleanupAfter = terminalAt + DEFAULT_TASK_RETENTION_MS; } + const sessionIndexChanged = + normalizeSessionIndexKey(current.requesterSessionKey) !== + normalizeSessionIndexKey(next.requesterSessionKey) || + normalizeSessionIndexKey(current.childSessionKey) !== + normalizeSessionIndexKey(next.childSessionKey); tasks.set(taskId, next); if (patch.runId && patch.runId !== current.runId) { rebuildRunIdIndex(); } + if (sessionIndexChanged) { + deleteSessionKeyIndex(taskId, current); + addSessionKeyIndex(taskId, next); + } persistTaskUpsert(next); emitTaskRegistryHookEvent(() => ({ kind: "upserted", @@ -900,6 +966,7 @@ export function createTaskRecord(params: { requesterOrigin: normalizeDeliveryContext(params.requesterOrigin), }); addRunIdIndex(taskId, record.runId); + addSessionKeyIndex(taskId, record); persistTaskUpsert(record); emitTaskRegistryHookEvent(() => ({ kind: "upserted", @@ -1190,13 +1257,25 @@ export function findTaskByRunId(runId: string): TaskRecord | undefined { } export function findLatestTaskForSessionKey(sessionKey: string): TaskRecord | undefined { - const key = sessionKey.trim(); + const task = listTasksForSessionKey(sessionKey)[0]; + return task ? cloneTaskRecord(task) : undefined; +} + +export function listTasksForSessionKey(sessionKey: string): TaskRecord[] { + ensureTaskRegistryReady(); + const key = normalizeSessionIndexKey(sessionKey); if (!key) { - return undefined; + return []; } - return listTaskRecords().find( - (task) => task.childSessionKey === key || task.requesterSessionKey === key, - ); + const ids = taskIdsBySessionKey.get(key); + if (!ids || ids.size === 0) { + return []; + } + return [...ids] + .map((taskId) => tasks.get(taskId)) + .filter((task): task is TaskRecord => Boolean(task)) + .toSorted((left, right) => right.createdAt - left.createdAt) + .map((task) => cloneTaskRecord(task)); } export function resolveTaskForLookupToken(token: string): TaskRecord | undefined { @@ -1213,6 +1292,7 @@ export function deleteTaskRecordById(taskId: string): boolean { if (!current) { return false; } + deleteSessionKeyIndex(taskId, current); tasks.delete(taskId); taskDeliveryStates.delete(taskId); rebuildRunIdIndex(); @@ -1230,6 +1310,7 @@ export function resetTaskRegistryForTests(opts?: { persist?: boolean }) { tasks.clear(); taskDeliveryStates.clear(); taskIdsByRunId.clear(); + taskIdsBySessionKey.clear(); tasksWithPendingDelivery.clear(); restoreAttempted = false; resetTaskRegistryRuntimeForTests();