mirror of https://github.com/openclaw/openclaw.git
fix(subagents): harden task-registry lifecycle writes (#58869)
* fix(subagents): harden task-registry lifecycle writes * chore(changelog): note subagent task-registry hardening * fix(subagents): align lifecycle terminal timestamps * fix(subagents): sanitize lifecycle warning metadata
This commit is contained in:
parent
1ce410a7be
commit
8fce663861
|
|
@ -28,6 +28,7 @@ Docs: https://docs.openclaw.ai
|
|||
- Sessions/model switching: keep `/model` changes queued behind busy runs instead of interrupting the active turn, and retarget queued followups so later work picks up the new model as soon as the current turn finishes.
|
||||
- Plugins/bundled runtimes: restore externalized bundled plugin runtime dependency staging across packed installs, Docker builds, and local runtime staging so bundled plugins keep their declared runtime deps after the 2026.3.31 externalization change. (#58782)
|
||||
- LINE/runtime: resolve the packaged runtime contract from the built `dist/plugins/runtime` layout so LINE channels start correctly again after global npm installs on `2026.3.31`. (#58799) Thanks @vincentkoc.
|
||||
- Subagents/tasks: keep subagent completion and cleanup from crashing when task-registry writes fail, so a corrupt or missing task row no longer takes down the gateway during lifecycle finalization. Thanks @vincentkoc.
|
||||
- Sandbox/browser: compare browser runtime inspection against `agents.defaults.sandbox.browser.image` so `openclaw sandbox list --browser` stops reporting healthy browser containers as image mismatches. (#58759) Thanks @sandpile.
|
||||
|
||||
## 2026.3.31
|
||||
|
|
|
|||
|
|
@ -0,0 +1,167 @@
|
|||
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { SUBAGENT_ENDED_REASON_COMPLETE } from "./subagent-lifecycle-events.js";
|
||||
import type { SubagentRunRecord } from "./subagent-registry.types.js";
|
||||
|
||||
const taskExecutorMocks = vi.hoisted(() => ({
|
||||
completeTaskRunByRunId: vi.fn(),
|
||||
failTaskRunByRunId: vi.fn(),
|
||||
setDetachedTaskDeliveryStatusByRunId: vi.fn(),
|
||||
}));
|
||||
|
||||
const helperMocks = vi.hoisted(() => ({
|
||||
persistSubagentSessionTiming: vi.fn(async () => {}),
|
||||
safeRemoveAttachmentsDir: vi.fn(async () => {}),
|
||||
}));
|
||||
|
||||
const lifecycleEventMocks = vi.hoisted(() => ({
|
||||
emitSessionLifecycleEvent: vi.fn(),
|
||||
}));
|
||||
|
||||
vi.mock("../tasks/task-executor.js", () => ({
|
||||
completeTaskRunByRunId: taskExecutorMocks.completeTaskRunByRunId,
|
||||
failTaskRunByRunId: taskExecutorMocks.failTaskRunByRunId,
|
||||
setDetachedTaskDeliveryStatusByRunId: taskExecutorMocks.setDetachedTaskDeliveryStatusByRunId,
|
||||
}));
|
||||
|
||||
vi.mock("../sessions/session-lifecycle-events.js", () => ({
|
||||
emitSessionLifecycleEvent: lifecycleEventMocks.emitSessionLifecycleEvent,
|
||||
}));
|
||||
|
||||
vi.mock("./subagent-registry-helpers.js", async () => {
|
||||
const actual = await vi.importActual<typeof import("./subagent-registry-helpers.js")>(
|
||||
"./subagent-registry-helpers.js",
|
||||
);
|
||||
return {
|
||||
...actual,
|
||||
persistSubagentSessionTiming: helperMocks.persistSubagentSessionTiming,
|
||||
safeRemoveAttachmentsDir: helperMocks.safeRemoveAttachmentsDir,
|
||||
};
|
||||
});
|
||||
|
||||
function createRunEntry(overrides: Partial<SubagentRunRecord> = {}): SubagentRunRecord {
|
||||
return {
|
||||
runId: "run-1",
|
||||
childSessionKey: "agent:main:subagent:child",
|
||||
requesterSessionKey: "agent:main:main",
|
||||
requesterDisplayKey: "main",
|
||||
task: "finish the task",
|
||||
cleanup: "keep",
|
||||
createdAt: 1_000,
|
||||
startedAt: 2_000,
|
||||
...overrides,
|
||||
};
|
||||
}
|
||||
|
||||
describe("subagent registry lifecycle hardening", () => {
|
||||
let mod: typeof import("./subagent-registry-lifecycle.js");
|
||||
|
||||
beforeEach(async () => {
|
||||
vi.resetModules();
|
||||
vi.clearAllMocks();
|
||||
mod = await import("./subagent-registry-lifecycle.js");
|
||||
});
|
||||
|
||||
it("does not reject completion when task finalization throws", async () => {
|
||||
const persist = vi.fn();
|
||||
const warn = vi.fn();
|
||||
const entry = createRunEntry();
|
||||
const runs = new Map([[entry.runId, entry]]);
|
||||
taskExecutorMocks.completeTaskRunByRunId.mockImplementation(() => {
|
||||
throw new Error("task store boom");
|
||||
});
|
||||
|
||||
const controller = mod.createSubagentRegistryLifecycleController({
|
||||
runs,
|
||||
resumedRuns: new Set(),
|
||||
subagentAnnounceTimeoutMs: 1_000,
|
||||
persist,
|
||||
clearPendingLifecycleError: vi.fn(),
|
||||
countPendingDescendantRuns: () => 0,
|
||||
suppressAnnounceForSteerRestart: () => false,
|
||||
shouldEmitEndedHookForRun: () => false,
|
||||
emitSubagentEndedHookForRun: vi.fn(async () => {}),
|
||||
notifyContextEngineSubagentEnded: vi.fn(async () => {}),
|
||||
resumeSubagentRun: vi.fn(),
|
||||
captureSubagentCompletionReply: vi.fn(async () => "final completion reply"),
|
||||
runSubagentAnnounceFlow: vi.fn(async () => true),
|
||||
warn,
|
||||
});
|
||||
|
||||
await expect(
|
||||
controller.completeSubagentRun({
|
||||
runId: entry.runId,
|
||||
endedAt: 4_000,
|
||||
outcome: { status: "ok" },
|
||||
reason: SUBAGENT_ENDED_REASON_COMPLETE,
|
||||
triggerCleanup: false,
|
||||
}),
|
||||
).resolves.toBeUndefined();
|
||||
|
||||
expect(warn).toHaveBeenCalledWith(
|
||||
"failed to finalize subagent background task state",
|
||||
expect.objectContaining({
|
||||
error: { name: "Error", message: "task store boom" },
|
||||
runId: "***",
|
||||
childSessionKey: "agent:main:…",
|
||||
outcomeStatus: "ok",
|
||||
}),
|
||||
);
|
||||
expect(helperMocks.persistSubagentSessionTiming).toHaveBeenCalledTimes(1);
|
||||
expect(lifecycleEventMocks.emitSessionLifecycleEvent).toHaveBeenCalledWith({
|
||||
sessionKey: "agent:main:subagent:child",
|
||||
reason: "subagent-status",
|
||||
parentSessionKey: "agent:main:main",
|
||||
label: undefined,
|
||||
});
|
||||
});
|
||||
|
||||
it("does not reject cleanup give-up when task delivery status update throws", async () => {
|
||||
const persist = vi.fn();
|
||||
const warn = vi.fn();
|
||||
const entry = createRunEntry({
|
||||
endedAt: 4_000,
|
||||
expectsCompletionMessage: false,
|
||||
retainAttachmentsOnKeep: true,
|
||||
});
|
||||
taskExecutorMocks.setDetachedTaskDeliveryStatusByRunId.mockImplementation(() => {
|
||||
throw new Error("delivery state boom");
|
||||
});
|
||||
|
||||
const controller = mod.createSubagentRegistryLifecycleController({
|
||||
runs: new Map([[entry.runId, entry]]),
|
||||
resumedRuns: new Set(),
|
||||
subagentAnnounceTimeoutMs: 1_000,
|
||||
persist,
|
||||
clearPendingLifecycleError: vi.fn(),
|
||||
countPendingDescendantRuns: () => 0,
|
||||
suppressAnnounceForSteerRestart: () => false,
|
||||
shouldEmitEndedHookForRun: () => false,
|
||||
emitSubagentEndedHookForRun: vi.fn(async () => {}),
|
||||
notifyContextEngineSubagentEnded: vi.fn(async () => {}),
|
||||
resumeSubagentRun: vi.fn(),
|
||||
captureSubagentCompletionReply: vi.fn(async () => undefined),
|
||||
runSubagentAnnounceFlow: vi.fn(async () => true),
|
||||
warn,
|
||||
});
|
||||
|
||||
await expect(
|
||||
controller.finalizeResumedAnnounceGiveUp({
|
||||
runId: entry.runId,
|
||||
entry,
|
||||
reason: "retry-limit",
|
||||
}),
|
||||
).resolves.toBeUndefined();
|
||||
|
||||
expect(warn).toHaveBeenCalledWith(
|
||||
"failed to update subagent background task delivery state",
|
||||
expect.objectContaining({
|
||||
error: { name: "Error", message: "delivery state boom" },
|
||||
runId: "***",
|
||||
childSessionKey: "agent:main:…",
|
||||
deliveryStatus: "failed",
|
||||
}),
|
||||
);
|
||||
expect(entry.cleanupCompletedAt).toBeTypeOf("number");
|
||||
expect(persist).toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
|
@ -1,4 +1,5 @@
|
|||
import { isSilentReplyText, SILENT_REPLY_TOKEN } from "../auto-reply/tokens.js";
|
||||
import { formatErrorMessage, readErrorName } from "../infra/errors.js";
|
||||
import { defaultRuntime } from "../runtime.js";
|
||||
import { emitSessionLifecycleEvent } from "../sessions/session-lifecycle-events.js";
|
||||
import {
|
||||
|
|
@ -62,6 +63,94 @@ export function createSubagentRegistryLifecycleController(params: {
|
|||
runSubagentAnnounceFlow: typeof runSubagentAnnounceFlow;
|
||||
warn(message: string, meta?: Record<string, unknown>): void;
|
||||
}) {
|
||||
const maskRunId = (runId: string): string => {
|
||||
const trimmed = runId.trim();
|
||||
if (!trimmed) {
|
||||
return "unknown";
|
||||
}
|
||||
if (trimmed.length <= 8) {
|
||||
return "***";
|
||||
}
|
||||
return `${trimmed.slice(0, 4)}…${trimmed.slice(-4)}`;
|
||||
};
|
||||
|
||||
const maskSessionKey = (sessionKey: string): string => {
|
||||
const trimmed = sessionKey.trim();
|
||||
if (!trimmed) {
|
||||
return "unknown";
|
||||
}
|
||||
const prefix = trimmed.split(":").slice(0, 2).join(":") || "session";
|
||||
return `${prefix}:…`;
|
||||
};
|
||||
|
||||
const buildSafeLifecycleErrorMeta = (err: unknown): Record<string, string> => {
|
||||
const message = formatErrorMessage(err);
|
||||
const name = readErrorName(err);
|
||||
return name ? { name, message } : { message };
|
||||
};
|
||||
|
||||
const safeSetSubagentTaskDeliveryStatus = (args: {
|
||||
runId: string;
|
||||
childSessionKey: string;
|
||||
deliveryStatus: "failed";
|
||||
}) => {
|
||||
try {
|
||||
setDetachedTaskDeliveryStatusByRunId({
|
||||
runId: args.runId,
|
||||
runtime: "subagent",
|
||||
sessionKey: args.childSessionKey,
|
||||
deliveryStatus: args.deliveryStatus,
|
||||
});
|
||||
} catch (err) {
|
||||
params.warn("failed to update subagent background task delivery state", {
|
||||
error: buildSafeLifecycleErrorMeta(err),
|
||||
runId: maskRunId(args.runId),
|
||||
childSessionKey: maskSessionKey(args.childSessionKey),
|
||||
deliveryStatus: args.deliveryStatus,
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
const safeFinalizeSubagentTaskRun = (args: {
|
||||
entry: SubagentRunRecord;
|
||||
outcome: SubagentRunOutcome;
|
||||
}) => {
|
||||
const endedAt = args.entry.endedAt ?? Date.now();
|
||||
const lastEventAt = endedAt;
|
||||
try {
|
||||
if (args.outcome.status === "ok") {
|
||||
completeTaskRunByRunId({
|
||||
runId: args.entry.runId,
|
||||
runtime: "subagent",
|
||||
sessionKey: args.entry.childSessionKey,
|
||||
endedAt,
|
||||
lastEventAt,
|
||||
progressSummary: args.entry.frozenResultText ?? undefined,
|
||||
terminalSummary: null,
|
||||
});
|
||||
return;
|
||||
}
|
||||
failTaskRunByRunId({
|
||||
runId: args.entry.runId,
|
||||
runtime: "subagent",
|
||||
sessionKey: args.entry.childSessionKey,
|
||||
status: args.outcome.status === "timeout" ? "timed_out" : "failed",
|
||||
endedAt,
|
||||
lastEventAt,
|
||||
error: args.outcome.status === "error" ? args.outcome.error : undefined,
|
||||
progressSummary: args.entry.frozenResultText ?? undefined,
|
||||
terminalSummary: null,
|
||||
});
|
||||
} catch (err) {
|
||||
params.warn("failed to finalize subagent background task state", {
|
||||
error: buildSafeLifecycleErrorMeta(err),
|
||||
runId: maskRunId(args.entry.runId),
|
||||
childSessionKey: maskSessionKey(args.entry.childSessionKey),
|
||||
outcomeStatus: args.outcome.status,
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
const freezeRunResultAtCompletion = async (entry: SubagentRunRecord): Promise<boolean> => {
|
||||
if (entry.frozenResultText !== undefined) {
|
||||
return false;
|
||||
|
|
@ -158,10 +247,9 @@ export function createSubagentRegistryLifecycleController(params: {
|
|||
entry: SubagentRunRecord;
|
||||
reason: "retry-limit" | "expiry";
|
||||
}) => {
|
||||
setDetachedTaskDeliveryStatusByRunId({
|
||||
safeSetSubagentTaskDeliveryStatus({
|
||||
runId: giveUpParams.runId,
|
||||
runtime: "subagent",
|
||||
sessionKey: giveUpParams.entry.childSessionKey,
|
||||
childSessionKey: giveUpParams.entry.childSessionKey,
|
||||
deliveryStatus: "failed",
|
||||
});
|
||||
giveUpParams.entry.wakeOnDescendantSettle = undefined;
|
||||
|
|
@ -469,29 +557,10 @@ export function createSubagentRegistryLifecycleController(params: {
|
|||
if (mutated) {
|
||||
params.persist();
|
||||
}
|
||||
if (completeParams.outcome.status === "ok") {
|
||||
completeTaskRunByRunId({
|
||||
runId: entry.runId,
|
||||
runtime: "subagent",
|
||||
sessionKey: entry.childSessionKey,
|
||||
endedAt: entry.endedAt,
|
||||
lastEventAt: entry.endedAt ?? Date.now(),
|
||||
progressSummary: entry.frozenResultText ?? undefined,
|
||||
terminalSummary: null,
|
||||
});
|
||||
} else {
|
||||
failTaskRunByRunId({
|
||||
runId: entry.runId,
|
||||
runtime: "subagent",
|
||||
sessionKey: entry.childSessionKey,
|
||||
status: completeParams.outcome.status === "timeout" ? "timed_out" : "failed",
|
||||
endedAt: entry.endedAt,
|
||||
lastEventAt: entry.endedAt ?? Date.now(),
|
||||
error: completeParams.outcome.status === "error" ? completeParams.outcome.error : undefined,
|
||||
progressSummary: entry.frozenResultText ?? undefined,
|
||||
terminalSummary: null,
|
||||
});
|
||||
}
|
||||
safeFinalizeSubagentTaskRun({
|
||||
entry,
|
||||
outcome: completeParams.outcome,
|
||||
});
|
||||
|
||||
try {
|
||||
await persistSubagentSessionTiming(entry);
|
||||
|
|
|
|||
Loading…
Reference in New Issue