mirror of https://github.com/openclaw/openclaw.git
refactor: share gateway session store migration
This commit is contained in:
parent
6464149031
commit
198c2482ee
|
|
@ -50,8 +50,7 @@ import { performGatewaySessionReset } from "../session-reset-service.js";
|
|||
import {
|
||||
canonicalizeSpawnedByForAgent,
|
||||
loadSessionEntry,
|
||||
pruneLegacyStoreKeys,
|
||||
resolveGatewaySessionStoreTarget,
|
||||
migrateAndPruneGatewaySessionStoreKey,
|
||||
} from "../session-utils.js";
|
||||
import { formatForLog } from "../ws-log.js";
|
||||
import { waitForAgentJob } from "./agent-job.js";
|
||||
|
|
@ -425,18 +424,13 @@ export const agentHandlers: GatewayRequestHandlers = {
|
|||
const mainSessionKey = resolveAgentMainSessionKey({ cfg, agentId });
|
||||
if (storePath) {
|
||||
const persisted = await updateSessionStore(storePath, (store) => {
|
||||
const target = resolveGatewaySessionStoreTarget({
|
||||
const { primaryKey } = migrateAndPruneGatewaySessionStoreKey({
|
||||
cfg,
|
||||
key: requestedSessionKey,
|
||||
store,
|
||||
});
|
||||
pruneLegacyStoreKeys({
|
||||
store,
|
||||
canonicalKey: target.canonicalKey,
|
||||
candidates: target.storeKeys,
|
||||
});
|
||||
const merged = mergeSessionEntry(store[canonicalSessionKey], nextEntryPatch);
|
||||
store[canonicalSessionKey] = merged;
|
||||
const merged = mergeSessionEntry(store[primaryKey], nextEntryPatch);
|
||||
store[primaryKey] = merged;
|
||||
return merged;
|
||||
});
|
||||
sessionEntry = persisted;
|
||||
|
|
|
|||
|
|
@ -31,7 +31,7 @@ import {
|
|||
listSessionsFromStore,
|
||||
loadCombinedSessionStoreForGateway,
|
||||
loadSessionEntry,
|
||||
pruneLegacyStoreKeys,
|
||||
migrateAndPruneGatewaySessionStoreKey,
|
||||
readSessionPreviewItemsFromTranscript,
|
||||
resolveGatewaySessionStoreTarget,
|
||||
resolveSessionModelRef,
|
||||
|
|
@ -92,31 +92,6 @@ function rejectWebchatSessionMutation(params: {
|
|||
return true;
|
||||
}
|
||||
|
||||
function migrateAndPruneSessionStoreKey(params: {
|
||||
cfg: ReturnType<typeof loadConfig>;
|
||||
key: string;
|
||||
store: Record<string, SessionEntry>;
|
||||
}) {
|
||||
const target = resolveGatewaySessionStoreTarget({
|
||||
cfg: params.cfg,
|
||||
key: params.key,
|
||||
store: params.store,
|
||||
});
|
||||
const primaryKey = target.canonicalKey;
|
||||
if (!params.store[primaryKey]) {
|
||||
const existingKey = target.storeKeys.find((candidate) => Boolean(params.store[candidate]));
|
||||
if (existingKey) {
|
||||
params.store[primaryKey] = params.store[existingKey];
|
||||
}
|
||||
}
|
||||
pruneLegacyStoreKeys({
|
||||
store: params.store,
|
||||
canonicalKey: primaryKey,
|
||||
candidates: target.storeKeys,
|
||||
});
|
||||
return { target, primaryKey, entry: params.store[primaryKey] };
|
||||
}
|
||||
|
||||
export const sessionsHandlers: GatewayRequestHandlers = {
|
||||
"sessions.list": ({ params, respond }) => {
|
||||
if (!assertValidParams(params, validateSessionsListParams, "sessions.list", respond)) {
|
||||
|
|
@ -224,7 +199,7 @@ export const sessionsHandlers: GatewayRequestHandlers = {
|
|||
|
||||
const { cfg, target, storePath } = resolveGatewaySessionTargetFromKey(key);
|
||||
const applied = await updateSessionStore(storePath, async (store) => {
|
||||
const { primaryKey } = migrateAndPruneSessionStoreKey({ cfg, key, store });
|
||||
const { primaryKey } = migrateAndPruneGatewaySessionStoreKey({ cfg, key, store });
|
||||
return await applySessionsPatchToStore({
|
||||
cfg,
|
||||
store,
|
||||
|
|
@ -316,7 +291,7 @@ export const sessionsHandlers: GatewayRequestHandlers = {
|
|||
}
|
||||
const sessionId = entry?.sessionId;
|
||||
const deleted = await updateSessionStore(storePath, (store) => {
|
||||
const { primaryKey } = migrateAndPruneSessionStoreKey({ cfg, key, store });
|
||||
const { primaryKey } = migrateAndPruneGatewaySessionStoreKey({ cfg, key, store });
|
||||
const hadEntry = Boolean(store[primaryKey]);
|
||||
if (hadEntry) {
|
||||
delete store[primaryKey];
|
||||
|
|
@ -385,7 +360,7 @@ export const sessionsHandlers: GatewayRequestHandlers = {
|
|||
const { cfg, target, storePath } = resolveGatewaySessionTargetFromKey(key);
|
||||
// Lock + read in a short critical section; transcript work happens outside.
|
||||
const compactTarget = await updateSessionStore(storePath, (store) => {
|
||||
const { entry, primaryKey } = migrateAndPruneSessionStoreKey({ cfg, key, store });
|
||||
const { entry, primaryKey } = migrateAndPruneGatewaySessionStoreKey({ cfg, key, store });
|
||||
return { entry, primaryKey };
|
||||
});
|
||||
const entry = compactTarget.entry;
|
||||
|
|
|
|||
|
|
@ -16,11 +16,7 @@ import { defaultRuntime } from "../runtime.js";
|
|||
import { parseMessageWithAttachments } from "./chat-attachments.js";
|
||||
import { normalizeRpcAttachmentsToChatAttachments } from "./server-methods/attachment-normalize.js";
|
||||
import type { NodeEvent, NodeEventContext } from "./server-node-events-types.js";
|
||||
import {
|
||||
loadSessionEntry,
|
||||
pruneLegacyStoreKeys,
|
||||
resolveGatewaySessionStoreTarget,
|
||||
} from "./session-utils.js";
|
||||
import { loadSessionEntry, migrateAndPruneGatewaySessionStoreKey } from "./session-utils.js";
|
||||
import { formatForLog } from "./ws-log.js";
|
||||
|
||||
const MAX_EXEC_EVENT_OUTPUT_CHARS = 180;
|
||||
|
|
@ -152,17 +148,12 @@ async function touchSessionStore(params: {
|
|||
return;
|
||||
}
|
||||
await updateSessionStore(storePath, (store) => {
|
||||
const target = resolveGatewaySessionStoreTarget({
|
||||
const { primaryKey } = migrateAndPruneGatewaySessionStoreKey({
|
||||
cfg: params.cfg,
|
||||
key: params.sessionKey,
|
||||
store,
|
||||
});
|
||||
pruneLegacyStoreKeys({
|
||||
store,
|
||||
canonicalKey: target.canonicalKey,
|
||||
candidates: target.storeKeys,
|
||||
});
|
||||
store[params.canonicalKey] = {
|
||||
store[primaryKey] = {
|
||||
sessionId: params.sessionId,
|
||||
updatedAt: params.now,
|
||||
thinkingLevel: params.entry?.thinkingLevel,
|
||||
|
|
|
|||
|
|
@ -25,38 +25,13 @@ import { ErrorCodes, errorShape } from "./protocol/index.js";
|
|||
import {
|
||||
archiveSessionTranscripts,
|
||||
loadSessionEntry,
|
||||
pruneLegacyStoreKeys,
|
||||
migrateAndPruneGatewaySessionStoreKey,
|
||||
resolveGatewaySessionStoreTarget,
|
||||
resolveSessionModelRef,
|
||||
} from "./session-utils.js";
|
||||
|
||||
const ACP_RUNTIME_CLEANUP_TIMEOUT_MS = 15_000;
|
||||
|
||||
function migrateAndPruneSessionStoreKey(params: {
|
||||
cfg: ReturnType<typeof loadConfig>;
|
||||
key: string;
|
||||
store: Record<string, SessionEntry>;
|
||||
}) {
|
||||
const target = resolveGatewaySessionStoreTarget({
|
||||
cfg: params.cfg,
|
||||
key: params.key,
|
||||
store: params.store,
|
||||
});
|
||||
const primaryKey = target.canonicalKey;
|
||||
if (!params.store[primaryKey]) {
|
||||
const existingKey = target.storeKeys.find((candidate) => Boolean(params.store[candidate]));
|
||||
if (existingKey) {
|
||||
params.store[primaryKey] = params.store[existingKey];
|
||||
}
|
||||
}
|
||||
pruneLegacyStoreKeys({
|
||||
store: params.store,
|
||||
canonicalKey: primaryKey,
|
||||
candidates: target.storeKeys,
|
||||
});
|
||||
return { target, primaryKey, entry: params.store[primaryKey] };
|
||||
}
|
||||
|
||||
function stripRuntimeModelState(entry?: SessionEntry): SessionEntry | undefined {
|
||||
if (!entry) {
|
||||
return entry;
|
||||
|
|
@ -311,7 +286,11 @@ export async function performGatewaySessionReset(params: {
|
|||
let oldSessionId: string | undefined;
|
||||
let oldSessionFile: string | undefined;
|
||||
const next = await updateSessionStore(storePath, (store) => {
|
||||
const { primaryKey } = migrateAndPruneSessionStoreKey({ cfg, key: params.key, store });
|
||||
const { primaryKey } = migrateAndPruneGatewaySessionStoreKey({
|
||||
cfg,
|
||||
key: params.key,
|
||||
store,
|
||||
});
|
||||
const currentEntry = store[primaryKey];
|
||||
const resetEntry = stripRuntimeModelState(currentEntry);
|
||||
const parsed = parseAgentSessionKey(primaryKey);
|
||||
|
|
|
|||
|
|
@ -263,6 +263,31 @@ export function pruneLegacyStoreKeys(params: {
|
|||
}
|
||||
}
|
||||
|
||||
export function migrateAndPruneGatewaySessionStoreKey(params: {
|
||||
cfg: ReturnType<typeof loadConfig>;
|
||||
key: string;
|
||||
store: Record<string, SessionEntry>;
|
||||
}) {
|
||||
const target = resolveGatewaySessionStoreTarget({
|
||||
cfg: params.cfg,
|
||||
key: params.key,
|
||||
store: params.store,
|
||||
});
|
||||
const primaryKey = target.canonicalKey;
|
||||
if (!params.store[primaryKey]) {
|
||||
const existingKey = target.storeKeys.find((candidate) => Boolean(params.store[candidate]));
|
||||
if (existingKey) {
|
||||
params.store[primaryKey] = params.store[existingKey];
|
||||
}
|
||||
}
|
||||
pruneLegacyStoreKeys({
|
||||
store: params.store,
|
||||
canonicalKey: primaryKey,
|
||||
candidates: target.storeKeys,
|
||||
});
|
||||
return { target, primaryKey, entry: params.store[primaryKey] };
|
||||
}
|
||||
|
||||
export function classifySessionKey(key: string, entry?: SessionEntry): GatewaySessionRow["kind"] {
|
||||
if (key === "global") {
|
||||
return "global";
|
||||
|
|
|
|||
|
|
@ -10,7 +10,7 @@ import {
|
|||
import {
|
||||
listSessionsFromStore,
|
||||
loadCombinedSessionStoreForGateway,
|
||||
pruneLegacyStoreKeys,
|
||||
migrateAndPruneGatewaySessionStoreKey,
|
||||
resolveGatewaySessionStoreTarget,
|
||||
} from "./session-utils.js";
|
||||
|
||||
|
|
@ -58,13 +58,10 @@ export async function resolveSessionKeyFromResolveParams(params: {
|
|||
};
|
||||
}
|
||||
await updateSessionStore(target.storePath, (s) => {
|
||||
const liveTarget = resolveGatewaySessionStoreTarget({ cfg, key, store: s });
|
||||
const canonicalKey = liveTarget.canonicalKey;
|
||||
// Migrate the first legacy entry to the canonical key.
|
||||
if (!s[canonicalKey] && s[legacyKey]) {
|
||||
s[canonicalKey] = s[legacyKey];
|
||||
const { primaryKey } = migrateAndPruneGatewaySessionStoreKey({ cfg, key, store: s });
|
||||
if (!s[primaryKey] && s[legacyKey]) {
|
||||
s[primaryKey] = s[legacyKey];
|
||||
}
|
||||
pruneLegacyStoreKeys({ store: s, canonicalKey, candidates: liveTarget.storeKeys });
|
||||
});
|
||||
return { ok: true, key: target.canonicalKey };
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue