mirror of https://github.com/openclaw/openclaw.git
Tasks: route one-task emergence through parent flows (#57874)
This commit is contained in:
parent
7590c22db7
commit
9d9cf0d8ff
|
|
@ -23,6 +23,7 @@ Docs: https://docs.openclaw.ai
|
|||
- Matrix/network: add explicit `channels.matrix.proxy` config for routing Matrix traffic through an HTTP(S) proxy, including account-level overrides and matching probe/runtime behavior. (#56931) thanks @patrick-yingxi-pan.
|
||||
- Background tasks: turn tasks into a real shared background-run control plane instead of ACP-only bookkeeping by unifying ACP, subagent, cron, and background CLI execution under one SQLite-backed ledger, routing detached lifecycle updates through the executor seam, adding audit/maintenance/status visibility, tightening auto-cleanup and lost-run recovery, improving task awareness in internal status/tool surfaces, and clarifying the split between heartbeat/main-session automation and detached scheduled runs. Thanks @vincentkoc and @mbelinky.
|
||||
- Flows/tasks: add a minimal SQLite-backed flow registry plus task-to-flow linkage scaffolding, so orchestrated work can start gaining a first-class parent record without changing current task delivery behavior.
|
||||
- Flows/tasks: route one-task ACP and subagent updates through a parent flow owner context, so detached work can emerge back through the intended parent thread/session instead of speaking only as a raw child task.
|
||||
|
||||
### Fixes
|
||||
|
||||
|
|
|
|||
|
|
@ -1,7 +1,7 @@
|
|||
import crypto from "node:crypto";
|
||||
import { getFlowRegistryStore, resetFlowRegistryRuntimeForTests } from "./flow-registry.store.js";
|
||||
import type { FlowRecord, FlowStatus } from "./flow-registry.types.js";
|
||||
import type { TaskNotifyPolicy } from "./task-registry.types.js";
|
||||
import type { TaskNotifyPolicy, TaskRecord } from "./task-registry.types.js";
|
||||
|
||||
const flows = new Map<string, FlowRecord>();
|
||||
let restoreAttempted = false;
|
||||
|
|
@ -21,6 +21,31 @@ function ensureNotifyPolicy(notifyPolicy?: TaskNotifyPolicy): TaskNotifyPolicy {
|
|||
return notifyPolicy ?? "done_only";
|
||||
}
|
||||
|
||||
function resolveFlowGoal(task: Pick<TaskRecord, "label" | "task">): string {
|
||||
return task.label?.trim() || task.task.trim() || "Background task";
|
||||
}
|
||||
|
||||
export function deriveFlowStatusFromTask(
|
||||
task: Pick<TaskRecord, "status" | "terminalOutcome">,
|
||||
): FlowStatus {
|
||||
if (task.status === "queued") {
|
||||
return "queued";
|
||||
}
|
||||
if (task.status === "running") {
|
||||
return "running";
|
||||
}
|
||||
if (task.status === "succeeded") {
|
||||
return task.terminalOutcome === "blocked" ? "blocked" : "succeeded";
|
||||
}
|
||||
if (task.status === "cancelled") {
|
||||
return "cancelled";
|
||||
}
|
||||
if (task.status === "lost") {
|
||||
return "lost";
|
||||
}
|
||||
return "failed";
|
||||
}
|
||||
|
||||
function ensureFlowRegistryReady() {
|
||||
if (restoreAttempted) {
|
||||
return;
|
||||
|
|
@ -87,6 +112,43 @@ export function createFlowRecord(params: {
|
|||
return cloneFlowRecord(record);
|
||||
}
|
||||
|
||||
export function createFlowForTask(params: {
|
||||
task: Pick<
|
||||
TaskRecord,
|
||||
| "requesterSessionKey"
|
||||
| "notifyPolicy"
|
||||
| "status"
|
||||
| "terminalOutcome"
|
||||
| "label"
|
||||
| "task"
|
||||
| "createdAt"
|
||||
| "lastEventAt"
|
||||
| "endedAt"
|
||||
>;
|
||||
requesterOrigin?: FlowRecord["requesterOrigin"];
|
||||
}): FlowRecord {
|
||||
const terminalFlowStatus = deriveFlowStatusFromTask(params.task);
|
||||
const isTerminal =
|
||||
terminalFlowStatus === "succeeded" ||
|
||||
terminalFlowStatus === "blocked" ||
|
||||
terminalFlowStatus === "failed" ||
|
||||
terminalFlowStatus === "cancelled" ||
|
||||
terminalFlowStatus === "lost";
|
||||
const endedAt = isTerminal
|
||||
? (params.task.endedAt ?? params.task.lastEventAt ?? params.task.createdAt)
|
||||
: undefined;
|
||||
return createFlowRecord({
|
||||
ownerSessionKey: params.task.requesterSessionKey,
|
||||
requesterOrigin: params.requesterOrigin,
|
||||
status: terminalFlowStatus,
|
||||
notifyPolicy: params.task.notifyPolicy,
|
||||
goal: resolveFlowGoal(params.task),
|
||||
createdAt: params.task.createdAt,
|
||||
updatedAt: params.task.lastEventAt ?? params.task.createdAt,
|
||||
...(endedAt !== undefined ? { endedAt } : {}),
|
||||
});
|
||||
}
|
||||
|
||||
export function updateFlowRecordById(
|
||||
flowId: string,
|
||||
patch: Partial<
|
||||
|
|
@ -113,6 +175,43 @@ export function updateFlowRecordById(
|
|||
return cloneFlowRecord(next);
|
||||
}
|
||||
|
||||
export function syncFlowFromTask(
|
||||
task: Pick<
|
||||
TaskRecord,
|
||||
| "parentFlowId"
|
||||
| "status"
|
||||
| "terminalOutcome"
|
||||
| "notifyPolicy"
|
||||
| "label"
|
||||
| "task"
|
||||
| "lastEventAt"
|
||||
| "endedAt"
|
||||
>,
|
||||
): FlowRecord | null {
|
||||
const flowId = task.parentFlowId?.trim();
|
||||
if (!flowId) {
|
||||
return null;
|
||||
}
|
||||
const terminalFlowStatus = deriveFlowStatusFromTask(task);
|
||||
const isTerminal =
|
||||
terminalFlowStatus === "succeeded" ||
|
||||
terminalFlowStatus === "blocked" ||
|
||||
terminalFlowStatus === "failed" ||
|
||||
terminalFlowStatus === "cancelled" ||
|
||||
terminalFlowStatus === "lost";
|
||||
return updateFlowRecordById(flowId, {
|
||||
status: terminalFlowStatus,
|
||||
notifyPolicy: task.notifyPolicy,
|
||||
goal: resolveFlowGoal(task),
|
||||
updatedAt: task.lastEventAt ?? Date.now(),
|
||||
...(isTerminal
|
||||
? {
|
||||
endedAt: task.endedAt ?? task.lastEventAt ?? Date.now(),
|
||||
}
|
||||
: {}),
|
||||
});
|
||||
}
|
||||
|
||||
export function getFlowById(flowId: string): FlowRecord | undefined {
|
||||
ensureFlowRegistryReady();
|
||||
const flow = flows.get(flowId);
|
||||
|
|
|
|||
|
|
@ -1,5 +1,6 @@
|
|||
import { afterEach, describe, expect, it } from "vitest";
|
||||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
import { withTempDir } from "../test-helpers/temp-dir.js";
|
||||
import { getFlowById, listFlowRecords, resetFlowRegistryForTests } from "./flow-registry.js";
|
||||
import {
|
||||
completeTaskRunByRunId,
|
||||
createQueuedTaskRun,
|
||||
|
|
@ -12,15 +13,41 @@ import {
|
|||
import { findTaskByRunId, resetTaskRegistryForTests } from "./task-registry.js";
|
||||
|
||||
const ORIGINAL_STATE_DIR = process.env.OPENCLAW_STATE_DIR;
|
||||
const hoisted = vi.hoisted(() => {
|
||||
const sendMessageMock = vi.fn();
|
||||
const cancelSessionMock = vi.fn();
|
||||
const killSubagentRunAdminMock = vi.fn();
|
||||
return {
|
||||
sendMessageMock,
|
||||
cancelSessionMock,
|
||||
killSubagentRunAdminMock,
|
||||
};
|
||||
});
|
||||
|
||||
vi.mock("./task-registry-delivery-runtime.js", () => ({
|
||||
sendMessage: hoisted.sendMessageMock,
|
||||
}));
|
||||
|
||||
vi.mock("../acp/control-plane/manager.js", () => ({
|
||||
getAcpSessionManager: () => ({
|
||||
cancelSession: hoisted.cancelSessionMock,
|
||||
}),
|
||||
}));
|
||||
|
||||
vi.mock("../agents/subagent-control.js", () => ({
|
||||
killSubagentRunAdmin: (params: unknown) => hoisted.killSubagentRunAdminMock(params),
|
||||
}));
|
||||
|
||||
async function withTaskExecutorStateDir(run: (root: string) => Promise<void>): Promise<void> {
|
||||
await withTempDir({ prefix: "openclaw-task-executor-" }, async (root) => {
|
||||
process.env.OPENCLAW_STATE_DIR = root;
|
||||
resetTaskRegistryForTests();
|
||||
resetFlowRegistryForTests();
|
||||
try {
|
||||
await run(root);
|
||||
} finally {
|
||||
resetTaskRegistryForTests();
|
||||
resetFlowRegistryForTests();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
@ -33,6 +60,10 @@ describe("task-executor", () => {
|
|||
process.env.OPENCLAW_STATE_DIR = ORIGINAL_STATE_DIR;
|
||||
}
|
||||
resetTaskRegistryForTests();
|
||||
resetFlowRegistryForTests();
|
||||
hoisted.sendMessageMock.mockReset();
|
||||
hoisted.cancelSessionMock.mockReset();
|
||||
hoisted.killSubagentRunAdminMock.mockReset();
|
||||
});
|
||||
|
||||
it("advances a queued run through start and completion", async () => {
|
||||
|
|
@ -110,4 +141,59 @@ describe("task-executor", () => {
|
|||
});
|
||||
});
|
||||
});
|
||||
|
||||
it("auto-creates a one-task flow and keeps it synced with task status", async () => {
|
||||
await withTaskExecutorStateDir(async () => {
|
||||
const created = createRunningTaskRun({
|
||||
runtime: "subagent",
|
||||
requesterSessionKey: "agent:main:main",
|
||||
childSessionKey: "agent:codex:subagent:child",
|
||||
runId: "run-executor-flow",
|
||||
task: "Write summary",
|
||||
startedAt: 10,
|
||||
deliveryStatus: "pending",
|
||||
});
|
||||
|
||||
expect(created.parentFlowId).toEqual(expect.any(String));
|
||||
expect(getFlowById(created.parentFlowId!)).toMatchObject({
|
||||
flowId: created.parentFlowId,
|
||||
ownerSessionKey: "agent:main:main",
|
||||
status: "running",
|
||||
goal: "Write summary",
|
||||
notifyPolicy: "done_only",
|
||||
});
|
||||
|
||||
completeTaskRunByRunId({
|
||||
runId: "run-executor-flow",
|
||||
endedAt: 40,
|
||||
lastEventAt: 40,
|
||||
terminalSummary: "Done.",
|
||||
});
|
||||
|
||||
expect(getFlowById(created.parentFlowId!)).toMatchObject({
|
||||
flowId: created.parentFlowId,
|
||||
status: "succeeded",
|
||||
endedAt: 40,
|
||||
goal: "Write summary",
|
||||
notifyPolicy: "done_only",
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
it("does not auto-create one-task flows for non-returning bookkeeping runs", async () => {
|
||||
await withTaskExecutorStateDir(async () => {
|
||||
const created = createRunningTaskRun({
|
||||
runtime: "cli",
|
||||
requesterSessionKey: "agent:main:main",
|
||||
childSessionKey: "agent:main:main",
|
||||
runId: "run-executor-cli",
|
||||
task: "Foreground gateway run",
|
||||
deliveryStatus: "not_applicable",
|
||||
startedAt: 10,
|
||||
});
|
||||
|
||||
expect(created.parentFlowId).toBeUndefined();
|
||||
expect(listFlowRecords()).toEqual([]);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -1,7 +1,10 @@
|
|||
import type { OpenClawConfig } from "../config/config.js";
|
||||
import { createSubsystemLogger } from "../logging/subsystem.js";
|
||||
import { createFlowForTask, deleteFlowRecordById } from "./flow-registry.js";
|
||||
import {
|
||||
cancelTaskById,
|
||||
createTaskRecord,
|
||||
linkTaskToFlowById,
|
||||
markTaskLostById,
|
||||
markTaskRunningByRunId,
|
||||
markTaskTerminalByRunId,
|
||||
|
|
@ -18,6 +21,53 @@ import type {
|
|||
TaskTerminalOutcome,
|
||||
} from "./task-registry.types.js";
|
||||
|
||||
const log = createSubsystemLogger("tasks/executor");
|
||||
|
||||
function isOneTaskFlowEligible(task: TaskRecord): boolean {
|
||||
if (task.parentFlowId?.trim() || !task.requesterSessionKey.trim()) {
|
||||
return false;
|
||||
}
|
||||
if (task.deliveryStatus === "not_applicable") {
|
||||
return false;
|
||||
}
|
||||
return task.runtime === "acp" || task.runtime === "subagent";
|
||||
}
|
||||
|
||||
function ensureSingleTaskFlow(params: {
|
||||
task: TaskRecord;
|
||||
requesterOrigin?: TaskDeliveryState["requesterOrigin"];
|
||||
}): TaskRecord {
|
||||
if (!isOneTaskFlowEligible(params.task)) {
|
||||
return params.task;
|
||||
}
|
||||
try {
|
||||
const flow = createFlowForTask({
|
||||
task: params.task,
|
||||
requesterOrigin: params.requesterOrigin,
|
||||
});
|
||||
const linked = linkTaskToFlowById({
|
||||
taskId: params.task.taskId,
|
||||
flowId: flow.flowId,
|
||||
});
|
||||
if (!linked) {
|
||||
deleteFlowRecordById(flow.flowId);
|
||||
return params.task;
|
||||
}
|
||||
if (linked.parentFlowId !== flow.flowId) {
|
||||
deleteFlowRecordById(flow.flowId);
|
||||
return linked;
|
||||
}
|
||||
return linked;
|
||||
} catch (error) {
|
||||
log.warn("Failed to create one-task flow for detached run", {
|
||||
taskId: params.task.taskId,
|
||||
runId: params.task.runId,
|
||||
error,
|
||||
});
|
||||
return params.task;
|
||||
}
|
||||
}
|
||||
|
||||
export function createQueuedTaskRun(params: {
|
||||
runtime: TaskRuntime;
|
||||
sourceId?: string;
|
||||
|
|
@ -34,10 +84,14 @@ export function createQueuedTaskRun(params: {
|
|||
notifyPolicy?: TaskNotifyPolicy;
|
||||
deliveryStatus?: TaskDeliveryStatus;
|
||||
}): TaskRecord {
|
||||
return createTaskRecord({
|
||||
const task = createTaskRecord({
|
||||
...params,
|
||||
status: "queued",
|
||||
});
|
||||
return ensureSingleTaskFlow({
|
||||
task,
|
||||
requesterOrigin: params.requesterOrigin,
|
||||
});
|
||||
}
|
||||
|
||||
export function createRunningTaskRun(params: {
|
||||
|
|
@ -59,10 +113,14 @@ export function createRunningTaskRun(params: {
|
|||
lastEventAt?: number;
|
||||
progressSummary?: string | null;
|
||||
}): TaskRecord {
|
||||
return createTaskRecord({
|
||||
const task = createTaskRecord({
|
||||
...params,
|
||||
status: "running",
|
||||
});
|
||||
return ensureSingleTaskFlow({
|
||||
task,
|
||||
requesterOrigin: params.requesterOrigin,
|
||||
});
|
||||
}
|
||||
|
||||
export function startTaskRunByRunId(params: {
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@ import {
|
|||
} from "../infra/heartbeat-wake.js";
|
||||
import { peekSystemEvents, resetSystemEventsForTest } from "../infra/system-events.js";
|
||||
import { withTempDir } from "../test-helpers/temp-dir.js";
|
||||
import { createFlowRecord, getFlowById, resetFlowRegistryForTests } from "./flow-registry.js";
|
||||
import {
|
||||
createTaskRecord,
|
||||
findLatestTaskForSessionKey,
|
||||
|
|
@ -108,6 +109,7 @@ describe("task-registry", () => {
|
|||
resetSystemEventsForTest();
|
||||
resetHeartbeatWakeStateForTests();
|
||||
resetTaskRegistryForTests();
|
||||
resetFlowRegistryForTests();
|
||||
hoisted.sendMessageMock.mockReset();
|
||||
hoisted.cancelSessionMock.mockReset();
|
||||
hoisted.killSubagentRunAdminMock.mockReset();
|
||||
|
|
@ -1112,6 +1114,70 @@ describe("task-registry", () => {
|
|||
});
|
||||
});
|
||||
|
||||
it("routes state-change updates through the parent flow owner when a task is flow-linked", async () => {
|
||||
await withTempDir({ prefix: "openclaw-task-registry-" }, async (root) => {
|
||||
process.env.OPENCLAW_STATE_DIR = root;
|
||||
resetTaskRegistryForTests();
|
||||
resetFlowRegistryForTests();
|
||||
hoisted.sendMessageMock.mockResolvedValue({
|
||||
channel: "discord",
|
||||
to: "discord:flow",
|
||||
via: "direct",
|
||||
});
|
||||
|
||||
const flow = createFlowRecord({
|
||||
ownerSessionKey: "agent:flow:owner",
|
||||
requesterOrigin: {
|
||||
channel: "discord",
|
||||
to: "discord:flow",
|
||||
threadId: "444",
|
||||
},
|
||||
status: "queued",
|
||||
notifyPolicy: "state_changes",
|
||||
goal: "Investigate issue",
|
||||
});
|
||||
|
||||
const task = createTaskRecord({
|
||||
runtime: "acp",
|
||||
requesterSessionKey: "agent:main:main",
|
||||
requesterOrigin: {
|
||||
channel: "telegram",
|
||||
to: "telegram:123",
|
||||
},
|
||||
parentFlowId: flow.flowId,
|
||||
childSessionKey: "agent:codex:acp:child",
|
||||
runId: "run-flow-state",
|
||||
task: "Investigate issue",
|
||||
status: "queued",
|
||||
notifyPolicy: "state_changes",
|
||||
});
|
||||
|
||||
markTaskRunningByRunId({
|
||||
runId: "run-flow-state",
|
||||
eventSummary: "Started.",
|
||||
});
|
||||
|
||||
await waitForAssertion(() =>
|
||||
expect(hoisted.sendMessageMock).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
channel: "discord",
|
||||
to: "discord:flow",
|
||||
threadId: "444",
|
||||
idempotencyKey: expect.stringContaining(`flow-event:${flow.flowId}:${task.taskId}:`),
|
||||
mirror: expect.objectContaining({
|
||||
sessionKey: "agent:flow:owner",
|
||||
idempotencyKey: expect.stringContaining(`flow-event:${flow.flowId}:${task.taskId}:`),
|
||||
}),
|
||||
}),
|
||||
),
|
||||
);
|
||||
expect(getFlowById(flow.flowId)).toMatchObject({
|
||||
flowId: flow.flowId,
|
||||
status: "running",
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
it("keeps background ACP progress off the foreground lane and only sends a terminal notify", async () => {
|
||||
await withTempDir({ prefix: "openclaw-task-registry-" }, async (root) => {
|
||||
process.env.OPENCLAW_STATE_DIR = root;
|
||||
|
|
@ -1235,6 +1301,122 @@ describe("task-registry", () => {
|
|||
});
|
||||
});
|
||||
|
||||
it("routes terminal delivery through the parent flow owner when a task is flow-linked", async () => {
|
||||
await withTempDir({ prefix: "openclaw-task-registry-" }, async (root) => {
|
||||
process.env.OPENCLAW_STATE_DIR = root;
|
||||
resetTaskRegistryForTests();
|
||||
resetFlowRegistryForTests();
|
||||
resetSystemEventsForTest();
|
||||
hoisted.sendMessageMock.mockResolvedValue({
|
||||
channel: "discord",
|
||||
to: "discord:flow",
|
||||
via: "direct",
|
||||
});
|
||||
|
||||
const flow = createFlowRecord({
|
||||
ownerSessionKey: "agent:flow:owner",
|
||||
requesterOrigin: {
|
||||
channel: "discord",
|
||||
to: "discord:flow",
|
||||
threadId: "444",
|
||||
},
|
||||
status: "running",
|
||||
goal: "Investigate issue",
|
||||
});
|
||||
|
||||
createTaskRecord({
|
||||
runtime: "acp",
|
||||
requesterSessionKey: "agent:main:main",
|
||||
requesterOrigin: {
|
||||
channel: "telegram",
|
||||
to: "telegram:123",
|
||||
},
|
||||
parentFlowId: flow.flowId,
|
||||
childSessionKey: "agent:codex:acp:child",
|
||||
runId: "run-flow-terminal",
|
||||
task: "Investigate issue",
|
||||
status: "running",
|
||||
deliveryStatus: "pending",
|
||||
});
|
||||
|
||||
emitAgentEvent({
|
||||
runId: "run-flow-terminal",
|
||||
stream: "lifecycle",
|
||||
data: {
|
||||
phase: "end",
|
||||
endedAt: 250,
|
||||
},
|
||||
});
|
||||
await flushAsyncWork();
|
||||
|
||||
await waitForAssertion(() =>
|
||||
expect(hoisted.sendMessageMock).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
channel: "discord",
|
||||
to: "discord:flow",
|
||||
threadId: "444",
|
||||
idempotencyKey: expect.stringContaining(`flow-terminal:${flow.flowId}:`),
|
||||
mirror: expect.objectContaining({
|
||||
sessionKey: "agent:flow:owner",
|
||||
}),
|
||||
}),
|
||||
),
|
||||
);
|
||||
expect(getFlowById(flow.flowId)).toMatchObject({
|
||||
flowId: flow.flowId,
|
||||
status: "succeeded",
|
||||
endedAt: 250,
|
||||
});
|
||||
expect(peekSystemEvents("agent:main:main")).toEqual([]);
|
||||
});
|
||||
});
|
||||
|
||||
it("queues fallback terminal delivery on the parent flow owner session when a task is flow-linked", async () => {
|
||||
await withTempDir({ prefix: "openclaw-task-registry-" }, async (root) => {
|
||||
process.env.OPENCLAW_STATE_DIR = root;
|
||||
resetTaskRegistryForTests();
|
||||
resetFlowRegistryForTests();
|
||||
resetSystemEventsForTest();
|
||||
|
||||
const flow = createFlowRecord({
|
||||
ownerSessionKey: "agent:flow:owner",
|
||||
status: "running",
|
||||
goal: "Investigate issue",
|
||||
});
|
||||
|
||||
createTaskRecord({
|
||||
runtime: "acp",
|
||||
requesterSessionKey: "agent:main:main",
|
||||
parentFlowId: flow.flowId,
|
||||
childSessionKey: "agent:codex:acp:child",
|
||||
runId: "run-flow-fallback",
|
||||
task: "Investigate issue",
|
||||
status: "running",
|
||||
deliveryStatus: "pending",
|
||||
});
|
||||
|
||||
emitAgentEvent({
|
||||
runId: "run-flow-fallback",
|
||||
stream: "lifecycle",
|
||||
data: {
|
||||
phase: "end",
|
||||
endedAt: 250,
|
||||
},
|
||||
});
|
||||
await flushAsyncWork();
|
||||
|
||||
await waitForAssertion(() =>
|
||||
expect(peekSystemEvents("agent:flow:owner")).toEqual([
|
||||
"Background task done: ACP background task (run run-flow).",
|
||||
]),
|
||||
);
|
||||
expect(peekSystemEvents("agent:main:main")).toEqual([]);
|
||||
expect(findTaskByRunId("run-flow-fallback")).toMatchObject({
|
||||
deliveryStatus: "session_queued",
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
it("emits concise state-change updates without surfacing raw ACP chatter", async () => {
|
||||
await withTempDir({ prefix: "openclaw-task-registry-" }, async (root) => {
|
||||
process.env.OPENCLAW_STATE_DIR = root;
|
||||
|
|
|
|||
|
|
@ -9,6 +9,7 @@ import { createSubsystemLogger } from "../logging/subsystem.js";
|
|||
import { parseAgentSessionKey } from "../routing/session-key.js";
|
||||
import { normalizeDeliveryContext } from "../utils/delivery-context.js";
|
||||
import { isDeliverableMessageChannel } from "../utils/message-channel.js";
|
||||
import { getFlowById, syncFlowFromTask } from "./flow-registry.js";
|
||||
import {
|
||||
formatTaskBlockedFollowupMessage,
|
||||
formatTaskStateChangeMessage,
|
||||
|
|
@ -53,6 +54,12 @@ let restoreAttempted = false;
|
|||
let deliveryRuntimePromise: Promise<typeof import("./task-registry-delivery-runtime.js")> | null =
|
||||
null;
|
||||
|
||||
type TaskDeliveryOwner = {
|
||||
sessionKey: string;
|
||||
requesterOrigin?: TaskDeliveryState["requesterOrigin"];
|
||||
flowId?: string;
|
||||
};
|
||||
|
||||
function cloneTaskRecord(record: TaskRecord): TaskRecord {
|
||||
return { ...record };
|
||||
}
|
||||
|
|
@ -433,6 +440,39 @@ function taskTerminalDeliveryIdempotencyKey(task: TaskRecord): string {
|
|||
return `task-terminal:${task.taskId}:${task.status}:${outcome}`;
|
||||
}
|
||||
|
||||
function flowTerminalDeliveryIdempotencyKey(flowId: string, task: TaskRecord): string {
|
||||
const outcome = task.status === "succeeded" ? (task.terminalOutcome ?? "default") : "default";
|
||||
return `flow-terminal:${flowId}:${task.taskId}:${task.status}:${outcome}`;
|
||||
}
|
||||
|
||||
function resolveTaskStateChangeIdempotencyKey(params: {
|
||||
task: TaskRecord;
|
||||
latestEvent: TaskEventRecord;
|
||||
owner: TaskDeliveryOwner;
|
||||
}): string {
|
||||
if (params.owner.flowId) {
|
||||
return `flow-event:${params.owner.flowId}:${params.task.taskId}:${params.latestEvent.at}:${params.latestEvent.kind}`;
|
||||
}
|
||||
return `task-event:${params.task.taskId}:${params.latestEvent.at}:${params.latestEvent.kind}`;
|
||||
}
|
||||
|
||||
function resolveTaskTerminalIdempotencyKey(task: TaskRecord, owner: TaskDeliveryOwner): string {
|
||||
return owner.flowId
|
||||
? flowTerminalDeliveryIdempotencyKey(owner.flowId, task)
|
||||
: taskTerminalDeliveryIdempotencyKey(task);
|
||||
}
|
||||
|
||||
function resolveTaskDeliveryOwner(task: TaskRecord): TaskDeliveryOwner {
|
||||
const flow = task.parentFlowId?.trim() ? getFlowById(task.parentFlowId) : undefined;
|
||||
return {
|
||||
sessionKey: flow?.ownerSessionKey?.trim() || task.requesterSessionKey.trim(),
|
||||
requesterOrigin: normalizeDeliveryContext(
|
||||
flow?.requesterOrigin ?? taskDeliveryStates.get(task.taskId)?.requesterOrigin,
|
||||
),
|
||||
...(flow ? { flowId: flow.flowId } : {}),
|
||||
};
|
||||
}
|
||||
|
||||
function restoreTaskRegistryOnce() {
|
||||
if (restoreAttempted) {
|
||||
return;
|
||||
|
|
@ -489,6 +529,15 @@ function updateTask(taskId: string, patch: Partial<TaskRecord>): TaskRecord | nu
|
|||
addSessionKeyIndex(taskId, next);
|
||||
}
|
||||
persistTaskUpsert(next);
|
||||
try {
|
||||
syncFlowFromTask(next);
|
||||
} catch (error) {
|
||||
log.warn("Failed to sync parent flow from task update", {
|
||||
taskId,
|
||||
flowId: next.parentFlowId,
|
||||
error,
|
||||
});
|
||||
}
|
||||
emitTaskRegistryHookEvent(() => ({
|
||||
kind: "upserted",
|
||||
task: cloneTaskRecord(next),
|
||||
|
|
@ -522,21 +571,22 @@ function getTaskDeliveryState(taskId: string): TaskDeliveryState | undefined {
|
|||
}
|
||||
|
||||
function canDeliverTaskToRequesterOrigin(task: TaskRecord): boolean {
|
||||
const origin = normalizeDeliveryContext(taskDeliveryStates.get(task.taskId)?.requesterOrigin);
|
||||
const origin = resolveTaskDeliveryOwner(task).requesterOrigin;
|
||||
const channel = origin?.channel?.trim();
|
||||
const to = origin?.to?.trim();
|
||||
return Boolean(channel && to && isDeliverableMessageChannel(channel));
|
||||
}
|
||||
|
||||
function queueTaskSystemEvent(task: TaskRecord, text: string) {
|
||||
const requesterSessionKey = task.requesterSessionKey.trim();
|
||||
const owner = resolveTaskDeliveryOwner(task);
|
||||
const requesterSessionKey = owner.sessionKey.trim();
|
||||
if (!requesterSessionKey) {
|
||||
return false;
|
||||
}
|
||||
enqueueSystemEvent(text, {
|
||||
sessionKey: requesterSessionKey,
|
||||
contextKey: `task:${task.taskId}`,
|
||||
deliveryContext: taskDeliveryStates.get(task.taskId)?.requesterOrigin,
|
||||
contextKey: owner.flowId ? `flow:${owner.flowId}` : `task:${task.taskId}`,
|
||||
deliveryContext: owner.requesterOrigin,
|
||||
});
|
||||
requestHeartbeatNow({
|
||||
reason: "background-task",
|
||||
|
|
@ -550,14 +600,17 @@ function queueBlockedTaskFollowup(task: TaskRecord) {
|
|||
if (!followupText) {
|
||||
return false;
|
||||
}
|
||||
const requesterSessionKey = task.requesterSessionKey.trim();
|
||||
const owner = resolveTaskDeliveryOwner(task);
|
||||
const requesterSessionKey = owner.sessionKey.trim();
|
||||
if (!requesterSessionKey) {
|
||||
return false;
|
||||
}
|
||||
enqueueSystemEvent(followupText, {
|
||||
sessionKey: requesterSessionKey,
|
||||
contextKey: `task:${task.taskId}:blocked-followup`,
|
||||
deliveryContext: taskDeliveryStates.get(task.taskId)?.requesterOrigin,
|
||||
contextKey: owner.flowId
|
||||
? `flow:${owner.flowId}:blocked-followup`
|
||||
: `task:${task.taskId}:blocked-followup`,
|
||||
deliveryContext: owner.requesterOrigin,
|
||||
});
|
||||
requestHeartbeatNow({
|
||||
reason: "background-task-blocked",
|
||||
|
|
@ -592,7 +645,8 @@ export async function maybeDeliverTaskTerminalUpdate(taskId: string): Promise<Ta
|
|||
lastEventAt: Date.now(),
|
||||
});
|
||||
}
|
||||
if (!latest.requesterSessionKey.trim()) {
|
||||
const owner = resolveTaskDeliveryOwner(latest);
|
||||
if (!owner.sessionKey.trim()) {
|
||||
return updateTask(taskId, {
|
||||
deliveryStatus: "parent_missing",
|
||||
lastEventAt: Date.now(),
|
||||
|
|
@ -623,20 +677,20 @@ export async function maybeDeliverTaskTerminalUpdate(taskId: string): Promise<Ta
|
|||
}
|
||||
try {
|
||||
const { sendMessage } = await loadTaskRegistryDeliveryRuntime();
|
||||
const origin = normalizeDeliveryContext(taskDeliveryStates.get(taskId)?.requesterOrigin);
|
||||
const requesterAgentId = parseAgentSessionKey(latest.requesterSessionKey)?.agentId;
|
||||
const requesterAgentId = parseAgentSessionKey(owner.sessionKey)?.agentId;
|
||||
const idempotencyKey = resolveTaskTerminalIdempotencyKey(latest, owner);
|
||||
await sendMessage({
|
||||
channel: origin?.channel,
|
||||
to: origin?.to ?? "",
|
||||
accountId: origin?.accountId,
|
||||
threadId: origin?.threadId,
|
||||
channel: owner.requesterOrigin?.channel,
|
||||
to: owner.requesterOrigin?.to ?? "",
|
||||
accountId: owner.requesterOrigin?.accountId,
|
||||
threadId: owner.requesterOrigin?.threadId,
|
||||
content: eventText,
|
||||
agentId: requesterAgentId,
|
||||
idempotencyKey: taskTerminalDeliveryIdempotencyKey(latest),
|
||||
idempotencyKey,
|
||||
mirror: {
|
||||
sessionKey: latest.requesterSessionKey,
|
||||
sessionKey: owner.sessionKey,
|
||||
agentId: requesterAgentId,
|
||||
idempotencyKey: taskTerminalDeliveryIdempotencyKey(latest),
|
||||
idempotencyKey,
|
||||
},
|
||||
});
|
||||
if (latest.terminalOutcome === "blocked") {
|
||||
|
|
@ -649,8 +703,8 @@ export async function maybeDeliverTaskTerminalUpdate(taskId: string): Promise<Ta
|
|||
} catch (error) {
|
||||
log.warn("Failed to deliver background task update", {
|
||||
taskId,
|
||||
requesterSessionKey: latest.requesterSessionKey,
|
||||
requesterOrigin: taskDeliveryStates.get(taskId)?.requesterOrigin,
|
||||
requesterSessionKey: owner.sessionKey,
|
||||
requesterOrigin: owner.requesterOrigin,
|
||||
error,
|
||||
});
|
||||
try {
|
||||
|
|
@ -693,6 +747,7 @@ export async function maybeDeliverTaskStateChangeUpdate(
|
|||
return cloneTaskRecord(current);
|
||||
}
|
||||
try {
|
||||
const owner = resolveTaskDeliveryOwner(current);
|
||||
if (!canDeliverTaskToRequesterOrigin(current)) {
|
||||
queueTaskSystemEvent(current, eventText);
|
||||
upsertTaskDeliveryState({
|
||||
|
|
@ -705,20 +760,24 @@ export async function maybeDeliverTaskStateChangeUpdate(
|
|||
});
|
||||
}
|
||||
const { sendMessage } = await loadTaskRegistryDeliveryRuntime();
|
||||
const origin = normalizeDeliveryContext(deliveryState?.requesterOrigin);
|
||||
const requesterAgentId = parseAgentSessionKey(current.requesterSessionKey)?.agentId;
|
||||
const requesterAgentId = parseAgentSessionKey(owner.sessionKey)?.agentId;
|
||||
const idempotencyKey = resolveTaskStateChangeIdempotencyKey({
|
||||
task: current,
|
||||
latestEvent,
|
||||
owner,
|
||||
});
|
||||
await sendMessage({
|
||||
channel: origin?.channel,
|
||||
to: origin?.to ?? "",
|
||||
accountId: origin?.accountId,
|
||||
threadId: origin?.threadId,
|
||||
channel: owner.requesterOrigin?.channel,
|
||||
to: owner.requesterOrigin?.to ?? "",
|
||||
accountId: owner.requesterOrigin?.accountId,
|
||||
threadId: owner.requesterOrigin?.threadId,
|
||||
content: eventText,
|
||||
agentId: requesterAgentId,
|
||||
idempotencyKey: `task-event:${current.taskId}:${latestEvent.at}:${latestEvent.kind}`,
|
||||
idempotencyKey,
|
||||
mirror: {
|
||||
sessionKey: current.requesterSessionKey,
|
||||
sessionKey: owner.sessionKey,
|
||||
agentId: requesterAgentId,
|
||||
idempotencyKey: `task-event:${current.taskId}:${latestEvent.at}:${latestEvent.kind}`,
|
||||
idempotencyKey,
|
||||
},
|
||||
});
|
||||
upsertTaskDeliveryState({
|
||||
|
|
@ -1158,6 +1217,24 @@ export function updateTaskNotifyPolicyById(params: {
|
|||
});
|
||||
}
|
||||
|
||||
export function linkTaskToFlowById(params: { taskId: string; flowId: string }): TaskRecord | null {
|
||||
ensureTaskRegistryReady();
|
||||
const flowId = params.flowId.trim();
|
||||
if (!flowId) {
|
||||
return null;
|
||||
}
|
||||
const current = tasks.get(params.taskId);
|
||||
if (!current) {
|
||||
return null;
|
||||
}
|
||||
if (current.parentFlowId?.trim()) {
|
||||
return cloneTaskRecord(current);
|
||||
}
|
||||
return updateTask(params.taskId, {
|
||||
parentFlowId: flowId,
|
||||
});
|
||||
}
|
||||
|
||||
export async function cancelTaskById(params: {
|
||||
cfg: OpenClawConfig;
|
||||
taskId: string;
|
||||
|
|
|
|||
Loading…
Reference in New Issue