fix(runtime): restore gateway watch on legacy state

This commit is contained in:
Peter Steinberger 2026-04-04 09:18:28 +01:00
parent 0051a86b8f
commit d1a4363783
No known key found for this signature in database
3 changed files with 117 additions and 39 deletions

View File

@ -3,6 +3,7 @@ import fs from "node:fs";
import path from "node:path";
import { createJiti } from "jiti";
import type { ChannelPlugin } from "../channels/plugins/types.js";
import { isChannelConfigured } from "../config/channel-configured.js";
import type { OpenClawConfig } from "../config/config.js";
import type { PluginInstallRecord } from "../config/types.plugins.js";
import type { GatewayRequestHandler } from "../gateway/server-methods/types.js";
@ -162,38 +163,6 @@ export function clearPluginLoaderCache(): void {
const defaultLogger = () => createSubsystemLogger("plugins");
type ChannelConfiguredModule = {
isChannelConfigured?: (
cfg: OpenClawConfig,
channelId: string,
env?: NodeJS.ProcessEnv,
) => boolean;
};
let channelConfiguredLoader: ReturnType<typeof createJiti> | undefined;
let cachedChannelConfiguredModule: ChannelConfiguredModule | undefined;
function resolveChannelConfiguredModule(): ChannelConfiguredModule {
if (cachedChannelConfiguredModule) {
return cachedChannelConfiguredModule;
}
channelConfiguredLoader ??= createJiti(import.meta.url, { interopDefault: true });
const loaded = channelConfiguredLoader("../config/channel-configured.js");
if (!loaded || typeof loaded !== "object") {
throw new Error("failed to load config/channel-configured runtime");
}
cachedChannelConfiguredModule = loaded as ChannelConfiguredModule;
return cachedChannelConfiguredModule;
}
function resolveIsChannelConfigured() {
const fn = resolveChannelConfiguredModule().isChannelConfigured;
if (typeof fn !== "function") {
throw new Error("config/channel-configured runtime missing isChannelConfigured()");
}
return fn;
}
function createPluginJitiLoader(options: Pick<PluginLoadOptions, "pluginSdkResolution">) {
const jitiLoaders = new Map<string, ReturnType<typeof createJiti>>();
return (modulePath: string) => {
@ -576,7 +545,6 @@ function shouldLoadChannelPluginInSetupRuntime(params: {
) {
return true;
}
const isChannelConfigured = resolveIsChannelConfigured();
return !params.manifestChannels.some((channelId) =>
isChannelConfigured(params.cfg, channelId, params.env),
);

View File

@ -44,6 +44,7 @@ type TableInfoRow = {
};
type TaskRegistryStatements = {
legacyRequesterSessionColumn: boolean;
selectAll: StatementSync;
selectAllDeliveryStates: StatementSync;
upsertRow: StatementSync;
@ -131,7 +132,7 @@ function rowToTaskDeliveryState(row: TaskDeliveryStateRow): TaskDeliveryState {
};
}
function bindTaskRecord(record: TaskRecord) {
function bindTaskRecordBase(record: TaskRecord) {
return {
task_id: record.taskId,
runtime: record.runtime,
@ -160,6 +161,16 @@ function bindTaskRecord(record: TaskRecord) {
};
}
function bindTaskRecord(record: TaskRecord, legacyRequesterSessionColumn: boolean) {
if (!legacyRequesterSessionColumn) {
return bindTaskRecordBase(record);
}
return {
...bindTaskRecordBase(record),
requester_session_key: record.scopeKind === "system" ? "" : record.requesterSessionKey,
};
}
function bindTaskDeliveryState(state: TaskDeliveryState) {
return {
task_id: state.taskId,
@ -169,7 +180,19 @@ function bindTaskDeliveryState(state: TaskDeliveryState) {
}
function createStatements(db: DatabaseSync): TaskRegistryStatements {
const legacyRequesterSessionColumn = hasTaskRunsColumn(db, "requester_session_key");
const upsertLegacyRequesterColumns = legacyRequesterSessionColumn
? `
requester_session_key,
`
: "";
const upsertLegacyRequesterValues = legacyRequesterSessionColumn
? `
@requester_session_key,
`
: "";
return {
legacyRequesterSessionColumn,
selectAll: db.prepare(`
SELECT
task_id,
@ -212,7 +235,7 @@ function createStatements(db: DatabaseSync): TaskRegistryStatements {
task_id,
runtime,
source_id,
owner_key,
${upsertLegacyRequesterColumns} owner_key,
scope_kind,
child_session_key,
parent_flow_id,
@ -237,7 +260,7 @@ function createStatements(db: DatabaseSync): TaskRegistryStatements {
@task_id,
@runtime,
@source_id,
@owner_key,
${upsertLegacyRequesterValues} @owner_key,
@scope_kind,
@child_session_key,
@parent_flow_id,
@ -456,7 +479,7 @@ export function saveTaskRegistryStateToSqlite(snapshot: TaskRegistryStoreSnapsho
statements.clearDeliveryStates.run();
statements.clearRows.run();
for (const task of snapshot.tasks.values()) {
statements.upsertRow.run(bindTaskRecord(task));
statements.upsertRow.run(bindTaskRecord(task, statements.legacyRequesterSessionColumn));
}
for (const state of snapshot.deliveryStates.values()) {
statements.replaceDeliveryState.run(bindTaskDeliveryState(state));
@ -466,7 +489,9 @@ export function saveTaskRegistryStateToSqlite(snapshot: TaskRegistryStoreSnapsho
export function upsertTaskRegistryRecordToSqlite(task: TaskRecord) {
const store = openTaskRegistryDatabase();
store.statements.upsertRow.run(bindTaskRecord(task));
store.statements.upsertRow.run(
bindTaskRecord(task, store.statements.legacyRequesterSessionColumn),
);
}
export function upsertTaskWithDeliveryStateToSqlite(params: {
@ -474,7 +499,7 @@ export function upsertTaskWithDeliveryStateToSqlite(params: {
deliveryState?: TaskDeliveryState;
}) {
withWriteTransaction((statements) => {
statements.upsertRow.run(bindTaskRecord(params.task));
statements.upsertRow.run(bindTaskRecord(params.task, statements.legacyRequesterSessionColumn));
if (params.deliveryState) {
statements.replaceDeliveryState.run(bindTaskDeliveryState(params.deliveryState));
} else {

View File

@ -8,6 +8,7 @@ import {
createTaskRecord,
deleteTaskRecordById,
findTaskByRunId,
markTaskLostById,
maybeDeliverTaskStateChangeUpdate,
resetTaskRegistryForTests,
} from "./task-registry.js";
@ -334,4 +335,88 @@ describe("task-registry store runtime", () => {
notifyPolicy: "silent",
});
});
it("keeps legacy requester_session_key rows writable after restore", () => {
const stateDir = mkdtempSync(path.join(os.tmpdir(), "openclaw-task-store-legacy-write-"));
process.env.OPENCLAW_STATE_DIR = stateDir;
const sqlitePath = resolveTaskRegistrySqlitePath(process.env);
mkdirSync(path.dirname(sqlitePath), { recursive: true });
const { DatabaseSync } = requireNodeSqlite();
const db = new DatabaseSync(sqlitePath);
db.exec(`
CREATE TABLE task_runs (
task_id TEXT PRIMARY KEY,
runtime TEXT NOT NULL,
source_id TEXT,
requester_session_key TEXT NOT NULL,
child_session_key TEXT,
parent_task_id TEXT,
agent_id TEXT,
run_id TEXT,
label TEXT,
task TEXT NOT NULL,
status TEXT NOT NULL,
delivery_status TEXT NOT NULL,
notify_policy TEXT NOT NULL,
created_at INTEGER NOT NULL,
started_at INTEGER,
ended_at INTEGER,
last_event_at INTEGER,
cleanup_after INTEGER,
error TEXT,
progress_summary TEXT,
terminal_summary TEXT,
terminal_outcome TEXT
);
`);
db.exec(`
CREATE TABLE task_delivery_state (
task_id TEXT PRIMARY KEY,
requester_origin_json TEXT,
last_notified_event_at INTEGER
);
`);
db.prepare(`
INSERT INTO task_runs (
task_id,
runtime,
requester_session_key,
run_id,
task,
status,
delivery_status,
notify_policy,
created_at,
last_event_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`).run(
"legacy-session-task",
"acp",
"agent:main:main",
"legacy-session-run",
"Legacy session task",
"running",
"pending",
"done_only",
100,
100,
);
db.close();
resetTaskRegistryForTests({ persist: false });
expect(() =>
markTaskLostById({
taskId: "legacy-session-task",
endedAt: 200,
lastEventAt: 200,
error: "session missing",
}),
).not.toThrow();
expect(findTaskByRunId("legacy-session-run")).toMatchObject({
taskId: "legacy-session-task",
status: "lost",
error: "session missing",
});
});
});