From d1a4363783d11412c853cbfbc69ea1aa5d53f846 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sat, 4 Apr 2026 09:18:28 +0100 Subject: [PATCH] fix(runtime): restore gateway watch on legacy state --- src/plugins/loader.ts | 34 +--------- src/tasks/task-registry.store.sqlite.ts | 37 +++++++++-- src/tasks/task-registry.store.test.ts | 85 +++++++++++++++++++++++++ 3 files changed, 117 insertions(+), 39 deletions(-) diff --git a/src/plugins/loader.ts b/src/plugins/loader.ts index 5a6f33f60da..c89b0ea0558 100644 --- a/src/plugins/loader.ts +++ b/src/plugins/loader.ts @@ -3,6 +3,7 @@ import fs from "node:fs"; import path from "node:path"; import { createJiti } from "jiti"; import type { ChannelPlugin } from "../channels/plugins/types.js"; +import { isChannelConfigured } from "../config/channel-configured.js"; import type { OpenClawConfig } from "../config/config.js"; import type { PluginInstallRecord } from "../config/types.plugins.js"; import type { GatewayRequestHandler } from "../gateway/server-methods/types.js"; @@ -162,38 +163,6 @@ export function clearPluginLoaderCache(): void { const defaultLogger = () => createSubsystemLogger("plugins"); -type ChannelConfiguredModule = { - isChannelConfigured?: ( - cfg: OpenClawConfig, - channelId: string, - env?: NodeJS.ProcessEnv, - ) => boolean; -}; - -let channelConfiguredLoader: ReturnType | undefined; -let cachedChannelConfiguredModule: ChannelConfiguredModule | undefined; - -function resolveChannelConfiguredModule(): ChannelConfiguredModule { - if (cachedChannelConfiguredModule) { - return cachedChannelConfiguredModule; - } - channelConfiguredLoader ??= createJiti(import.meta.url, { interopDefault: true }); - const loaded = channelConfiguredLoader("../config/channel-configured.js"); - if (!loaded || typeof loaded !== "object") { - throw new Error("failed to load config/channel-configured runtime"); - } - cachedChannelConfiguredModule = loaded as ChannelConfiguredModule; - return cachedChannelConfiguredModule; -} - -function resolveIsChannelConfigured() { - const fn = resolveChannelConfiguredModule().isChannelConfigured; - if (typeof fn !== "function") { - throw new Error("config/channel-configured runtime missing isChannelConfigured()"); - } - return fn; -} - function createPluginJitiLoader(options: Pick) { const jitiLoaders = new Map>(); return (modulePath: string) => { @@ -576,7 +545,6 @@ function shouldLoadChannelPluginInSetupRuntime(params: { ) { return true; } - const isChannelConfigured = resolveIsChannelConfigured(); return !params.manifestChannels.some((channelId) => isChannelConfigured(params.cfg, channelId, params.env), ); diff --git a/src/tasks/task-registry.store.sqlite.ts b/src/tasks/task-registry.store.sqlite.ts index dbd0ea729ba..d4f8be6507d 100644 --- a/src/tasks/task-registry.store.sqlite.ts +++ b/src/tasks/task-registry.store.sqlite.ts @@ -44,6 +44,7 @@ type TableInfoRow = { }; type TaskRegistryStatements = { + legacyRequesterSessionColumn: boolean; selectAll: StatementSync; selectAllDeliveryStates: StatementSync; upsertRow: StatementSync; @@ -131,7 +132,7 @@ function rowToTaskDeliveryState(row: TaskDeliveryStateRow): TaskDeliveryState { }; } -function bindTaskRecord(record: TaskRecord) { +function bindTaskRecordBase(record: TaskRecord) { return { task_id: record.taskId, runtime: record.runtime, @@ -160,6 +161,16 @@ function bindTaskRecord(record: TaskRecord) { }; } +function bindTaskRecord(record: TaskRecord, legacyRequesterSessionColumn: boolean) { + if (!legacyRequesterSessionColumn) { + return bindTaskRecordBase(record); + } + return { + ...bindTaskRecordBase(record), + requester_session_key: record.scopeKind === "system" ? "" : record.requesterSessionKey, + }; +} + function bindTaskDeliveryState(state: TaskDeliveryState) { return { task_id: state.taskId, @@ -169,7 +180,19 @@ function bindTaskDeliveryState(state: TaskDeliveryState) { } function createStatements(db: DatabaseSync): TaskRegistryStatements { + const legacyRequesterSessionColumn = hasTaskRunsColumn(db, "requester_session_key"); + const upsertLegacyRequesterColumns = legacyRequesterSessionColumn + ? ` + requester_session_key, +` + : ""; + const upsertLegacyRequesterValues = legacyRequesterSessionColumn + ? ` + @requester_session_key, +` + : ""; return { + legacyRequesterSessionColumn, selectAll: db.prepare(` SELECT task_id, @@ -212,7 +235,7 @@ function createStatements(db: DatabaseSync): TaskRegistryStatements { task_id, runtime, source_id, - owner_key, +${upsertLegacyRequesterColumns} owner_key, scope_kind, child_session_key, parent_flow_id, @@ -237,7 +260,7 @@ function createStatements(db: DatabaseSync): TaskRegistryStatements { @task_id, @runtime, @source_id, - @owner_key, +${upsertLegacyRequesterValues} @owner_key, @scope_kind, @child_session_key, @parent_flow_id, @@ -456,7 +479,7 @@ export function saveTaskRegistryStateToSqlite(snapshot: TaskRegistryStoreSnapsho statements.clearDeliveryStates.run(); statements.clearRows.run(); for (const task of snapshot.tasks.values()) { - statements.upsertRow.run(bindTaskRecord(task)); + statements.upsertRow.run(bindTaskRecord(task, statements.legacyRequesterSessionColumn)); } for (const state of snapshot.deliveryStates.values()) { statements.replaceDeliveryState.run(bindTaskDeliveryState(state)); @@ -466,7 +489,9 @@ export function saveTaskRegistryStateToSqlite(snapshot: TaskRegistryStoreSnapsho export function upsertTaskRegistryRecordToSqlite(task: TaskRecord) { const store = openTaskRegistryDatabase(); - store.statements.upsertRow.run(bindTaskRecord(task)); + store.statements.upsertRow.run( + bindTaskRecord(task, store.statements.legacyRequesterSessionColumn), + ); } export function upsertTaskWithDeliveryStateToSqlite(params: { @@ -474,7 +499,7 @@ export function upsertTaskWithDeliveryStateToSqlite(params: { deliveryState?: TaskDeliveryState; }) { withWriteTransaction((statements) => { - statements.upsertRow.run(bindTaskRecord(params.task)); + statements.upsertRow.run(bindTaskRecord(params.task, statements.legacyRequesterSessionColumn)); if (params.deliveryState) { statements.replaceDeliveryState.run(bindTaskDeliveryState(params.deliveryState)); } else { diff --git a/src/tasks/task-registry.store.test.ts b/src/tasks/task-registry.store.test.ts index 7304cd871f4..f6e6e9cd8ef 100644 --- a/src/tasks/task-registry.store.test.ts +++ b/src/tasks/task-registry.store.test.ts @@ -8,6 +8,7 @@ import { createTaskRecord, deleteTaskRecordById, findTaskByRunId, + markTaskLostById, maybeDeliverTaskStateChangeUpdate, resetTaskRegistryForTests, } from "./task-registry.js"; @@ -334,4 +335,88 @@ describe("task-registry store runtime", () => { notifyPolicy: "silent", }); }); + + it("keeps legacy requester_session_key rows writable after restore", () => { + const stateDir = mkdtempSync(path.join(os.tmpdir(), "openclaw-task-store-legacy-write-")); + process.env.OPENCLAW_STATE_DIR = stateDir; + const sqlitePath = resolveTaskRegistrySqlitePath(process.env); + mkdirSync(path.dirname(sqlitePath), { recursive: true }); + const { DatabaseSync } = requireNodeSqlite(); + const db = new DatabaseSync(sqlitePath); + db.exec(` + CREATE TABLE task_runs ( + task_id TEXT PRIMARY KEY, + runtime TEXT NOT NULL, + source_id TEXT, + requester_session_key TEXT NOT NULL, + child_session_key TEXT, + parent_task_id TEXT, + agent_id TEXT, + run_id TEXT, + label TEXT, + task TEXT NOT NULL, + status TEXT NOT NULL, + delivery_status TEXT NOT NULL, + notify_policy TEXT NOT NULL, + created_at INTEGER NOT NULL, + started_at INTEGER, + ended_at INTEGER, + last_event_at INTEGER, + cleanup_after INTEGER, + error TEXT, + progress_summary TEXT, + terminal_summary TEXT, + terminal_outcome TEXT + ); + `); + db.exec(` + CREATE TABLE task_delivery_state ( + task_id TEXT PRIMARY KEY, + requester_origin_json TEXT, + last_notified_event_at INTEGER + ); + `); + db.prepare(` + INSERT INTO task_runs ( + task_id, + runtime, + requester_session_key, + run_id, + task, + status, + delivery_status, + notify_policy, + created_at, + last_event_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + `).run( + "legacy-session-task", + "acp", + "agent:main:main", + "legacy-session-run", + "Legacy session task", + "running", + "pending", + "done_only", + 100, + 100, + ); + db.close(); + + resetTaskRegistryForTests({ persist: false }); + + expect(() => + markTaskLostById({ + taskId: "legacy-session-task", + endedAt: 200, + lastEventAt: 200, + error: "session missing", + }), + ).not.toThrow(); + expect(findTaskByRunId("legacy-session-run")).toMatchObject({ + taskId: "legacy-session-task", + status: "lost", + error: "session missing", + }); + }); });