mirror of https://github.com/openclaw/openclaw.git
fix(tasks): make task-store writes atomic (#58521)
This commit is contained in:
parent
0abd143d37
commit
66413487c8
|
|
@ -403,7 +403,7 @@ function openTaskRegistryDatabase(): TaskRegistryDatabase {
|
|||
const { DatabaseSync } = requireNodeSqlite();
|
||||
const db = new DatabaseSync(pathname);
|
||||
db.exec(`PRAGMA journal_mode = WAL;`);
|
||||
db.exec(`PRAGMA synchronous = NORMAL;`);
|
||||
db.exec(`PRAGMA synchronous = FULL;`);
|
||||
db.exec(`PRAGMA busy_timeout = 5000;`);
|
||||
ensureSchema(db);
|
||||
ensureTaskRegistryPermissions(pathname);
|
||||
|
|
@ -457,6 +457,20 @@ export function upsertTaskRegistryRecordToSqlite(task: TaskRecord) {
|
|||
ensureTaskRegistryPermissions(store.path);
|
||||
}
|
||||
|
||||
export function upsertTaskWithDeliveryStateToSqlite(params: {
|
||||
task: TaskRecord;
|
||||
deliveryState?: TaskDeliveryState;
|
||||
}) {
|
||||
withWriteTransaction((statements) => {
|
||||
statements.upsertRow.run(bindTaskRecord(params.task));
|
||||
if (params.deliveryState) {
|
||||
statements.replaceDeliveryState.run(bindTaskDeliveryState(params.deliveryState));
|
||||
} else {
|
||||
statements.deleteDeliveryState.run(params.task.taskId);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
export function deleteTaskRegistryRecordFromSqlite(taskId: string) {
|
||||
const store = openTaskRegistryDatabase();
|
||||
store.statements.deleteRow.run(taskId);
|
||||
|
|
@ -464,6 +478,13 @@ export function deleteTaskRegistryRecordFromSqlite(taskId: string) {
|
|||
ensureTaskRegistryPermissions(store.path);
|
||||
}
|
||||
|
||||
export function deleteTaskAndDeliveryStateFromSqlite(taskId: string) {
|
||||
withWriteTransaction((statements) => {
|
||||
statements.deleteRow.run(taskId);
|
||||
statements.deleteDeliveryState.run(taskId);
|
||||
});
|
||||
}
|
||||
|
||||
export function upsertTaskDeliveryStateToSqlite(state: TaskDeliveryState) {
|
||||
const store = openTaskRegistryDatabase();
|
||||
store.statements.replaceDeliveryState.run(bindTaskDeliveryState(state));
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@ import {
|
|||
createTaskRecord,
|
||||
deleteTaskRecordById,
|
||||
findTaskByRunId,
|
||||
maybeDeliverTaskStateChangeUpdate,
|
||||
resetTaskRegistryForTests,
|
||||
} from "./task-registry.js";
|
||||
import { resolveTaskRegistryDir, resolveTaskRegistrySqlitePath } from "./task-registry.paths.js";
|
||||
|
|
@ -121,6 +122,55 @@ describe("task-registry store runtime", () => {
|
|||
});
|
||||
});
|
||||
|
||||
it("uses atomic task-plus-delivery store methods when available", async () => {
|
||||
const upsertTaskWithDeliveryState = vi.fn();
|
||||
const deleteTaskWithDeliveryState = vi.fn();
|
||||
configureTaskRegistryRuntime({
|
||||
store: {
|
||||
loadSnapshot: () => ({
|
||||
tasks: new Map(),
|
||||
deliveryStates: new Map(),
|
||||
}),
|
||||
saveSnapshot: vi.fn(),
|
||||
upsertTaskWithDeliveryState,
|
||||
deleteTaskWithDeliveryState,
|
||||
},
|
||||
});
|
||||
|
||||
const created = createTaskRecord({
|
||||
runtime: "acp",
|
||||
ownerKey: "agent:main:main",
|
||||
scopeKind: "session",
|
||||
childSessionKey: "agent:codex:acp:new",
|
||||
runId: "run-atomic",
|
||||
task: "Atomic task",
|
||||
status: "running",
|
||||
notifyPolicy: "state_changes",
|
||||
deliveryStatus: "pending",
|
||||
});
|
||||
|
||||
await maybeDeliverTaskStateChangeUpdate(created.taskId, {
|
||||
at: 200,
|
||||
kind: "progress",
|
||||
summary: "working",
|
||||
});
|
||||
expect(deleteTaskRecordById(created.taskId)).toBe(true);
|
||||
|
||||
expect(upsertTaskWithDeliveryState).toHaveBeenCalled();
|
||||
expect(upsertTaskWithDeliveryState.mock.calls[0]?.[0]).toMatchObject({
|
||||
task: expect.objectContaining({
|
||||
taskId: created.taskId,
|
||||
}),
|
||||
});
|
||||
expect(
|
||||
upsertTaskWithDeliveryState.mock.calls.some((call) => {
|
||||
const params = call[0] as { deliveryState?: { lastNotifiedEventAt?: number } };
|
||||
return params.deliveryState?.lastNotifiedEventAt === 200;
|
||||
}),
|
||||
).toBe(true);
|
||||
expect(deleteTaskWithDeliveryState).toHaveBeenCalledWith(created.taskId);
|
||||
});
|
||||
|
||||
it("restores persisted tasks from the default sqlite store", () => {
|
||||
const created = createTaskRecord({
|
||||
runtime: "cron",
|
||||
|
|
|
|||
|
|
@ -1,9 +1,11 @@
|
|||
import {
|
||||
closeTaskRegistrySqliteStore,
|
||||
deleteTaskAndDeliveryStateFromSqlite,
|
||||
deleteTaskDeliveryStateFromSqlite,
|
||||
deleteTaskRegistryRecordFromSqlite,
|
||||
loadTaskRegistryStateFromSqlite,
|
||||
saveTaskRegistryStateToSqlite,
|
||||
upsertTaskWithDeliveryStateToSqlite,
|
||||
upsertTaskDeliveryStateToSqlite,
|
||||
upsertTaskRegistryRecordToSqlite,
|
||||
} from "./task-registry.store.sqlite.js";
|
||||
|
|
@ -17,7 +19,12 @@ export type TaskRegistryStoreSnapshot = {
|
|||
export type TaskRegistryStore = {
|
||||
loadSnapshot: () => TaskRegistryStoreSnapshot;
|
||||
saveSnapshot: (snapshot: TaskRegistryStoreSnapshot) => void;
|
||||
upsertTaskWithDeliveryState?: (params: {
|
||||
task: TaskRecord;
|
||||
deliveryState?: TaskDeliveryState;
|
||||
}) => void;
|
||||
upsertTask?: (task: TaskRecord) => void;
|
||||
deleteTaskWithDeliveryState?: (taskId: string) => void;
|
||||
deleteTask?: (taskId: string) => void;
|
||||
upsertDeliveryState?: (state: TaskDeliveryState) => void;
|
||||
deleteDeliveryState?: (taskId: string) => void;
|
||||
|
|
@ -48,7 +55,9 @@ export type TaskRegistryHooks = {
|
|||
const defaultTaskRegistryStore: TaskRegistryStore = {
|
||||
loadSnapshot: loadTaskRegistryStateFromSqlite,
|
||||
saveSnapshot: saveTaskRegistryStateToSqlite,
|
||||
upsertTaskWithDeliveryState: upsertTaskWithDeliveryStateToSqlite,
|
||||
upsertTask: upsertTaskRegistryRecordToSqlite,
|
||||
deleteTaskWithDeliveryState: deleteTaskAndDeliveryStateFromSqlite,
|
||||
deleteTask: deleteTaskRegistryRecordFromSqlite,
|
||||
upsertDeliveryState: upsertTaskDeliveryStateToSqlite,
|
||||
deleteDeliveryState: deleteTaskDeliveryStateFromSqlite,
|
||||
|
|
|
|||
|
|
@ -78,6 +78,24 @@ function cloneTaskDeliveryState(state: TaskDeliveryState): TaskDeliveryState {
|
|||
};
|
||||
}
|
||||
|
||||
function setTaskDeliveryStateInMemory(state: TaskDeliveryState): TaskDeliveryState {
|
||||
const current = taskDeliveryStates.get(state.taskId);
|
||||
const next: TaskDeliveryState = {
|
||||
taskId: state.taskId,
|
||||
...(state.requesterOrigin
|
||||
? { requesterOrigin: normalizeDeliveryContext(state.requesterOrigin) }
|
||||
: {}),
|
||||
...(state.lastNotifiedEventAt != null
|
||||
? { lastNotifiedEventAt: state.lastNotifiedEventAt }
|
||||
: {}),
|
||||
};
|
||||
if (!next.requesterOrigin && typeof next.lastNotifiedEventAt !== "number" && !current) {
|
||||
return cloneTaskDeliveryState({ taskId: state.taskId });
|
||||
}
|
||||
taskDeliveryStates.set(state.taskId, next);
|
||||
return cloneTaskDeliveryState(next);
|
||||
}
|
||||
|
||||
function snapshotTaskRecords(source: ReadonlyMap<string, TaskRecord>): TaskRecord[] {
|
||||
return [...source.values()].map((record) => cloneTaskRecord(record));
|
||||
}
|
||||
|
|
@ -106,6 +124,14 @@ function persistTaskRegistry() {
|
|||
|
||||
function persistTaskUpsert(task: TaskRecord) {
|
||||
const store = getTaskRegistryStore();
|
||||
const deliveryState = taskDeliveryStates.get(task.taskId);
|
||||
if (store.upsertTaskWithDeliveryState) {
|
||||
store.upsertTaskWithDeliveryState({
|
||||
task,
|
||||
...(deliveryState ? { deliveryState } : {}),
|
||||
});
|
||||
return;
|
||||
}
|
||||
if (store.upsertTask) {
|
||||
store.upsertTask(task);
|
||||
return;
|
||||
|
|
@ -118,6 +144,10 @@ function persistTaskUpsert(task: TaskRecord) {
|
|||
|
||||
function persistTaskDelete(taskId: string) {
|
||||
const store = getTaskRegistryStore();
|
||||
if (store.deleteTaskWithDeliveryState) {
|
||||
store.deleteTaskWithDeliveryState(taskId);
|
||||
return;
|
||||
}
|
||||
if (store.deleteTask) {
|
||||
store.deleteTask(taskId);
|
||||
return;
|
||||
|
|
@ -128,30 +158,6 @@ function persistTaskDelete(taskId: string) {
|
|||
});
|
||||
}
|
||||
|
||||
function persistTaskDeliveryStateUpsert(state: TaskDeliveryState) {
|
||||
const store = getTaskRegistryStore();
|
||||
if (store.upsertDeliveryState) {
|
||||
store.upsertDeliveryState(state);
|
||||
return;
|
||||
}
|
||||
store.saveSnapshot({
|
||||
tasks,
|
||||
deliveryStates: taskDeliveryStates,
|
||||
});
|
||||
}
|
||||
|
||||
function persistTaskDeliveryStateDelete(taskId: string) {
|
||||
const store = getTaskRegistryStore();
|
||||
if (store.deleteDeliveryState) {
|
||||
store.deleteDeliveryState(taskId);
|
||||
return;
|
||||
}
|
||||
store.saveSnapshot({
|
||||
tasks,
|
||||
deliveryStates: taskDeliveryStates,
|
||||
});
|
||||
}
|
||||
|
||||
function ensureDeliveryStatus(params: {
|
||||
ownerKey: string;
|
||||
scopeKind: TaskScopeKind;
|
||||
|
|
@ -435,12 +441,14 @@ function mergeExistingTaskForCreate(
|
|||
const patch: Partial<TaskRecord> = {};
|
||||
const requesterOrigin = normalizeDeliveryContext(params.requesterOrigin);
|
||||
const currentDeliveryState = taskDeliveryStates.get(existing.taskId);
|
||||
let deliveryStateChanged = false;
|
||||
if (requesterOrigin && !currentDeliveryState?.requesterOrigin) {
|
||||
upsertTaskDeliveryState({
|
||||
setTaskDeliveryStateInMemory({
|
||||
taskId: existing.taskId,
|
||||
requesterOrigin,
|
||||
lastNotifiedEventAt: currentDeliveryState?.lastNotifiedEventAt,
|
||||
});
|
||||
deliveryStateChanged = true;
|
||||
}
|
||||
if (params.sourceId?.trim() && !existing.sourceId?.trim()) {
|
||||
patch.sourceId = params.sourceId.trim();
|
||||
|
|
@ -476,6 +484,9 @@ function mergeExistingTaskForCreate(
|
|||
patch.notifyPolicy = notifyPolicy;
|
||||
}
|
||||
if (Object.keys(patch).length === 0) {
|
||||
if (deliveryStateChanged) {
|
||||
persistTaskUpsert(existing);
|
||||
}
|
||||
return cloneTaskRecord(existing);
|
||||
}
|
||||
return updateTask(existing.taskId, patch) ?? cloneTaskRecord(existing);
|
||||
|
|
@ -574,25 +585,6 @@ function updateTask(taskId: string, patch: Partial<TaskRecord>): TaskRecord | nu
|
|||
return cloneTaskRecord(next);
|
||||
}
|
||||
|
||||
function upsertTaskDeliveryState(state: TaskDeliveryState): TaskDeliveryState {
|
||||
const current = taskDeliveryStates.get(state.taskId);
|
||||
const next: TaskDeliveryState = {
|
||||
taskId: state.taskId,
|
||||
...(state.requesterOrigin
|
||||
? { requesterOrigin: normalizeDeliveryContext(state.requesterOrigin) }
|
||||
: {}),
|
||||
...(state.lastNotifiedEventAt != null
|
||||
? { lastNotifiedEventAt: state.lastNotifiedEventAt }
|
||||
: {}),
|
||||
};
|
||||
if (!next.requesterOrigin && typeof next.lastNotifiedEventAt !== "number" && !current) {
|
||||
return cloneTaskDeliveryState({ taskId: state.taskId });
|
||||
}
|
||||
taskDeliveryStates.set(state.taskId, next);
|
||||
persistTaskDeliveryStateUpsert(next);
|
||||
return cloneTaskDeliveryState(next);
|
||||
}
|
||||
|
||||
function getTaskDeliveryState(taskId: string): TaskDeliveryState | undefined {
|
||||
const state = taskDeliveryStates.get(taskId);
|
||||
return state ? cloneTaskDeliveryState(state) : undefined;
|
||||
|
|
@ -788,7 +780,7 @@ export async function maybeDeliverTaskStateChangeUpdate(
|
|||
}
|
||||
if (!canDeliverTaskToRequesterOrigin(current)) {
|
||||
queueTaskSystemEvent(current, eventText);
|
||||
upsertTaskDeliveryState({
|
||||
setTaskDeliveryStateInMemory({
|
||||
taskId,
|
||||
requesterOrigin: deliveryState?.requesterOrigin,
|
||||
lastNotifiedEventAt: latestEvent.at,
|
||||
|
|
@ -818,7 +810,7 @@ export async function maybeDeliverTaskStateChangeUpdate(
|
|||
idempotencyKey,
|
||||
},
|
||||
});
|
||||
upsertTaskDeliveryState({
|
||||
setTaskDeliveryStateInMemory({
|
||||
taskId,
|
||||
requesterOrigin: deliveryState?.requesterOrigin,
|
||||
lastNotifiedEventAt: latestEvent.at,
|
||||
|
|
@ -1086,7 +1078,7 @@ export function createTaskRecord(params: {
|
|||
(record.endedAt ?? record.lastEventAt ?? record.createdAt) + DEFAULT_TASK_RETENTION_MS;
|
||||
}
|
||||
tasks.set(taskId, record);
|
||||
upsertTaskDeliveryState({
|
||||
setTaskDeliveryStateInMemory({
|
||||
taskId,
|
||||
requesterOrigin: normalizeDeliveryContext(params.requesterOrigin),
|
||||
});
|
||||
|
|
@ -1454,7 +1446,6 @@ export function deleteTaskRecordById(taskId: string): boolean {
|
|||
taskDeliveryStates.delete(taskId);
|
||||
rebuildRunIdIndex();
|
||||
persistTaskDelete(taskId);
|
||||
persistTaskDeliveryStateDelete(taskId);
|
||||
emitTaskRegistryHookEvent(() => ({
|
||||
kind: "deleted",
|
||||
taskId: current.taskId,
|
||||
|
|
|
|||
Loading…
Reference in New Issue