fix(gateway): emit before_reset on session reset (#53872)

Merged via squash.

Prepared head SHA: a47894ef16
Co-authored-by: VACInc <3279061+VACInc@users.noreply.github.com>
Co-authored-by: jalehman <550978+jalehman@users.noreply.github.com>
Reviewed-by: @jalehman
This commit is contained in:
VACInc 2026-04-01 15:46:53 -04:00 committed by GitHub
parent 1f99c87a44
commit 711c9e7249
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 373 additions and 22 deletions

View File

@ -14,6 +14,7 @@ Docs: https://docs.openclaw.ai
- Config/Discord: coerce safe integer numeric Discord IDs to strings during config validation, keep unsafe or precision-losing numeric snowflakes rejected, and align `openclaw doctor` repair guidance with the same fail-closed behavior. (#45125) Thanks @moliendocode.
- Gateway/sessions: scope bare `sessions.create` aliases like `main` to the requested agent while preserving the canonical `global` and `unknown` sentinel keys. (#58207) thanks @jalehman.
- `/context detail` now compares the tracked prompt estimate with cached context usage and surfaces untracked provider/runtime overhead when present. (#28391) thanks @ImLukeF.
- Gateway/session reset: emit the typed `before_reset` hook for gateway `/new` and `/reset`, preserving reset-hook behavior even when the previous transcript has already been archived. (#53872) thanks @VACInc
## 2026.4.2
@ -597,6 +598,7 @@ Docs: https://docs.openclaw.ai
- Security/path resolution: prefer non-user-writable absolute helper binaries for OpenClaw CLI, ffmpeg, and OpenSSL resolution so PATH hijacks cannot replace trusted helpers with attacker-controlled executables.
- Security/gateway command scopes: require `operator.admin` before Telegram target writeback and Talk Voice `/voice set` config writes persist through gateway message flows.
- Security/OpenShell mirror: exclude workspace `hooks/` from mirror sync so untrusted sandbox files cannot become trusted host hooks on gateway startup.
- Exec approvals/channels: unify Discord and Telegram exec approval runtime handling, move approval buttons onto the shared interactive reply model, and fix Telegram approval buttons and typed `/approve` commands so configured approvers can resolve requests reliably again. (#57516) Thanks @scoootscooob.
## 2026.3.24-beta.2

View File

@ -2,11 +2,23 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import type { HookRunner } from "../../plugins/hooks.js";
import type { HandleCommandsParams } from "./commands-types.js";
const fsMocks = vi.hoisted(() => ({
readFile: vi.fn(),
readdir: vi.fn(),
}));
const hookRunnerMocks = vi.hoisted(() => ({
hasHooks: vi.fn<HookRunner["hasHooks"]>(),
runBeforeReset: vi.fn<HookRunner["runBeforeReset"]>(),
}));
vi.mock("node:fs/promises", () => ({
default: {
readFile: fsMocks.readFile,
readdir: fsMocks.readdir,
},
}));
vi.mock("../../plugins/hook-runner-global.js", () => ({
getGlobalHookRunner: () =>
({
@ -46,10 +58,14 @@ describe("emitResetCommandHooks", () => {
}
beforeEach(() => {
fsMocks.readFile.mockReset();
fsMocks.readdir.mockReset();
hookRunnerMocks.hasHooks.mockReset();
hookRunnerMocks.runBeforeReset.mockReset();
hookRunnerMocks.hasHooks.mockImplementation((hookName) => hookName === "before_reset");
hookRunnerMocks.runBeforeReset.mockResolvedValue(undefined);
fsMocks.readFile.mockResolvedValue("");
fsMocks.readdir.mockResolvedValue([]);
});
afterEach(() => {
@ -85,4 +101,49 @@ describe("emitResetCommandHooks", () => {
workspaceDir: "/tmp/openclaw-workspace",
});
});
it("recovers the archived transcript when the original reset transcript path is gone", async () => {
fsMocks.readFile.mockRejectedValueOnce(Object.assign(new Error("ENOENT"), { code: "ENOENT" }));
fsMocks.readdir.mockResolvedValueOnce(["prev-session.jsonl.reset.2026-02-16T22-26-33.000Z"]);
fsMocks.readFile.mockResolvedValueOnce(
`${JSON.stringify({
type: "message",
id: "m1",
message: { role: "user", content: "Recovered from archive" },
})}\n`,
);
const command = {
surface: "telegram",
senderId: "vac",
channel: "telegram",
from: "telegram:vac",
to: "telegram:bot",
resetHookTriggered: false,
} as HandleCommandsParams["command"];
await emitResetCommandHooks({
action: "new",
ctx: {} as HandleCommandsParams["ctx"],
cfg: {} as HandleCommandsParams["cfg"],
command,
sessionKey: "agent:main:telegram:group:-1003826723328:topic:8428",
previousSessionEntry: {
sessionId: "prev-session",
sessionFile: "/tmp/prev-session.jsonl",
} as HandleCommandsParams["previousSessionEntry"],
workspaceDir: "/tmp/openclaw-workspace",
});
await vi.waitFor(() => expect(hookRunnerMocks.runBeforeReset).toHaveBeenCalledTimes(1));
expect(hookRunnerMocks.runBeforeReset).toHaveBeenCalledWith(
expect.objectContaining({
sessionFile: "/tmp/prev-session.jsonl.reset.2026-02-16T22-26-33.000Z",
messages: [{ role: "user", content: "Recovered from archive" }],
reason: "new",
}),
expect.objectContaining({
sessionId: "prev-session",
}),
);
});
});

View File

@ -1,4 +1,5 @@
import fs from "node:fs/promises";
import path from "node:path";
import { resetConfiguredBindingTargetInPlace } from "../../channels/plugins/binding-targets.js";
import { logVerbose } from "../../globals.js";
import { createInternalHookEvent, triggerInternalHook } from "../../hooks/internal-hooks.js";
@ -31,6 +32,86 @@ let HANDLERS: CommandHandler[] | null = null;
export type ResetCommandAction = "new" | "reset";
// Reset hooks only need the transcript message payloads, not session headers or metadata rows.
function parseTranscriptMessages(content: string): unknown[] {
const messages: unknown[] = [];
for (const line of content.split("\n")) {
if (!line.trim()) {
continue;
}
try {
const entry = JSON.parse(line);
if (entry.type === "message" && entry.message) {
messages.push(entry.message);
}
} catch {
// Skip malformed lines from partially-written transcripts.
}
}
return messages;
}
// Once /reset rotates a transcript, the newest archived sibling is the best fallback source.
async function findLatestArchivedTranscript(sessionFile: string): Promise<string | undefined> {
try {
const dir = path.dirname(sessionFile);
const base = path.basename(sessionFile);
const resetPrefix = `${base}.reset.`;
const archived = (await fs.readdir(dir))
.filter((name) => name.startsWith(resetPrefix))
.toSorted();
const latest = archived[archived.length - 1];
return latest ? path.join(dir, latest) : undefined;
} catch {
return undefined;
}
}
// Prefer the live transcript path, but fall back to the archived reset transcript when rotation won the race.
async function loadBeforeResetTranscript(params: {
sessionFile?: string;
}): Promise<{ sessionFile?: string; messages: unknown[] }> {
const sessionFile = params.sessionFile;
if (!sessionFile) {
logVerbose("before_reset: no session file available, firing hook with empty messages");
return { sessionFile, messages: [] };
}
try {
return {
sessionFile,
messages: parseTranscriptMessages(await fs.readFile(sessionFile, "utf-8")),
};
} catch (err: unknown) {
if ((err as { code?: unknown })?.code !== "ENOENT") {
logVerbose(
`before_reset: failed to read session file ${sessionFile}; firing hook with empty messages (${String(err)})`,
);
return { sessionFile, messages: [] };
}
}
const archivedSessionFile = await findLatestArchivedTranscript(sessionFile);
if (!archivedSessionFile) {
logVerbose(
`before_reset: failed to find archived transcript for ${sessionFile}; firing hook with empty messages`,
);
return { sessionFile, messages: [] };
}
try {
return {
sessionFile: archivedSessionFile,
messages: parseTranscriptMessages(await fs.readFile(archivedSessionFile, "utf-8")),
};
} catch (err: unknown) {
logVerbose(
`before_reset: failed to read archived session file ${archivedSessionFile}; firing hook with empty messages (${String(err)})`,
);
return { sessionFile: archivedSessionFile, messages: [] };
}
}
export async function emitResetCommandHooks(params: {
action: ResetCommandAction;
ctx: HandleCommandsParams["ctx"];
@ -82,29 +163,13 @@ export async function emitResetCommandHooks(params: {
const hookRunner = getGlobalHookRunner();
if (hookRunner?.hasHooks("before_reset")) {
const prevEntry = params.previousSessionEntry;
const sessionFile = prevEntry?.sessionFile;
// Fire-and-forget: read old session messages and run hook
void (async () => {
const { sessionFile, messages } = await loadBeforeResetTranscript({
sessionFile: prevEntry?.sessionFile,
});
try {
const messages: unknown[] = [];
if (sessionFile) {
const content = await fs.readFile(sessionFile, "utf-8");
for (const line of content.split("\n")) {
if (!line.trim()) {
continue;
}
try {
const entry = JSON.parse(line);
if (entry.type === "message" && entry.message) {
messages.push(entry.message);
}
} catch {
// skip malformed lines
}
}
} else {
logVerbose("before_reset: no session file available, firing hook with empty messages");
}
await hookRunner.runBeforeReset(
{ sessionFile, messages, reason: params.action },
{

View File

@ -5,10 +5,14 @@ import { afterAll, beforeAll, beforeEach, describe, expect, test, vi } from "vit
import { WebSocket } from "ws";
import { DEFAULT_PROVIDER } from "../agents/defaults.js";
import { clearConfigCache, clearRuntimeConfigSnapshot } from "../config/config.js";
import { loadConfig } from "../config/config.js";
import { withSessionStoreLockForTest } from "../config/sessions/store.js";
import { isSessionPatchEvent, type InternalHookEvent } from "../hooks/internal-hooks.js";
import { GATEWAY_CLIENT_IDS, GATEWAY_CLIENT_MODES } from "./protocol/client-info.js";
import { startGatewayServerHarness, type GatewayServerHarness } from "./server.e2e-ws-harness.js";
import { createToolSummaryPreviewTranscriptLines } from "./session-preview.test-helpers.js";
import { performGatewaySessionReset } from "./session-reset-service.js";
import { resolveGatewaySessionStoreTarget } from "./session-utils.js";
import {
connectOk,
embeddedRunMock,
@ -38,10 +42,18 @@ const sessionHookMocks = vi.hoisted(() => ({
triggerInternalHook: vi.fn(async (_event: unknown) => {}),
}));
const beforeResetHookMocks = vi.hoisted(() => ({
runBeforeReset: vi.fn(async () => {}),
}));
const subagentLifecycleHookMocks = vi.hoisted(() => ({
runSubagentEnded: vi.fn(async () => {}),
}));
const beforeResetHookState = vi.hoisted(() => ({
hasBeforeResetHook: false,
}));
const subagentLifecycleHookState = vi.hoisted(() => ({
hasSubagentEndedHook: true,
}));
@ -108,7 +120,9 @@ vi.mock("../plugins/hook-runner-global.js", async (importOriginal) => {
...actual,
getGlobalHookRunner: vi.fn(() => ({
hasHooks: (hookName: string) =>
hookName === "subagent_ended" && subagentLifecycleHookState.hasSubagentEndedHook,
(hookName === "subagent_ended" && subagentLifecycleHookState.hasSubagentEndedHook) ||
(hookName === "before_reset" && beforeResetHookState.hasBeforeResetHook),
runBeforeReset: beforeResetHookMocks.runBeforeReset,
runSubagentEnded: subagentLifecycleHookMocks.runSubagentEnded,
})),
};
@ -262,6 +276,8 @@ describe("gateway server sessions", () => {
sessionHookMocks.hasInternalHookListeners.mockReset();
sessionHookMocks.hasInternalHookListeners.mockReturnValue(true);
sessionHookMocks.triggerInternalHook.mockClear();
beforeResetHookMocks.runBeforeReset.mockClear();
beforeResetHookState.hasBeforeResetHook = false;
subagentLifecycleHookMocks.runSubagentEnded.mockClear();
subagentLifecycleHookState.hasSubagentEndedHook = true;
threadBindingMocks.unbindThreadBindingsBySessionKey.mockClear();
@ -2292,6 +2308,59 @@ describe("gateway server sessions", () => {
ws.close();
});
test("sessions.reset emits before_reset hook with transcript context", async () => {
const { dir } = await createSessionStoreDir();
const transcriptPath = path.join(dir, "sess-main.jsonl");
await fs.writeFile(
transcriptPath,
`${JSON.stringify({
type: "message",
id: "m1",
message: { role: "user", content: "hello from transcript" },
})}\n`,
"utf-8",
);
await writeSessionStore({
entries: {
main: {
sessionId: "sess-main",
sessionFile: transcriptPath,
updatedAt: Date.now(),
},
},
});
beforeResetHookState.hasBeforeResetHook = true;
const { ws } = await openClient();
const reset = await rpcReq<{ ok: true; key: string }>(ws, "sessions.reset", {
key: "main",
reason: "new",
});
expect(reset.ok).toBe(true);
expect(beforeResetHookMocks.runBeforeReset).toHaveBeenCalledTimes(1);
const [event, context] = (
beforeResetHookMocks.runBeforeReset.mock.calls as unknown as Array<[unknown, unknown]>
)[0] ?? [undefined, undefined];
expect(event).toMatchObject({
sessionFile: transcriptPath,
reason: "new",
messages: [
{
role: "user",
content: "hello from transcript",
},
],
});
expect(context).toMatchObject({
agentId: "main",
sessionKey: "agent:main:main",
sessionId: "sess-main",
});
ws.close();
});
test("sessions.reset returns unavailable when active run does not stop", async () => {
const { dir, storePath } = await seedActiveMainSession();
const waitCallCountAtSnapshotClear: number[] = [];
@ -2299,6 +2368,7 @@ describe("gateway server sessions", () => {
waitCallCountAtSnapshotClear.push(embeddedRunMock.waitCalls.length);
});
beforeResetHookState.hasBeforeResetHook = true;
embeddedRunMock.activeIds.add("sess-main");
embeddedRunMock.waitResults.set("sess-main", false);
@ -2315,6 +2385,7 @@ describe("gateway server sessions", () => {
["main", "agent:main:main", "sess-main"],
"sess-main",
);
expect(beforeResetHookMocks.runBeforeReset).not.toHaveBeenCalled();
expect(waitCallCountAtSnapshotClear).toEqual([1]);
expect(browserSessionTabMocks.closeTrackedBrowserTabsForSessions).not.toHaveBeenCalled();
@ -2329,6 +2400,99 @@ describe("gateway server sessions", () => {
ws.close();
});
test("sessions.reset emits before_reset for the entry actually reset under the store lock", async () => {
const { dir } = await createSessionStoreDir();
const oldTranscriptPath = path.join(dir, "sess-old.jsonl");
const newTranscriptPath = path.join(dir, "sess-new.jsonl");
await fs.writeFile(
oldTranscriptPath,
`${JSON.stringify({
type: "message",
id: "m-old",
message: { role: "user", content: "old transcript" },
})}\n`,
"utf-8",
);
await fs.writeFile(
newTranscriptPath,
`${JSON.stringify({
type: "message",
id: "m-new",
message: { role: "user", content: "new transcript" },
})}\n`,
"utf-8",
);
await writeSessionStore({
entries: {
main: {
sessionId: "sess-old",
sessionFile: oldTranscriptPath,
updatedAt: Date.now(),
},
},
});
beforeResetHookState.hasBeforeResetHook = true;
const gatewayStorePath = resolveGatewaySessionStoreTarget({
cfg: loadConfig(),
key: "main",
}).storePath;
let pendingReset: ReturnType<typeof performGatewaySessionReset> | undefined;
await withSessionStoreLockForTest(gatewayStorePath, async () => {
pendingReset = performGatewaySessionReset({
key: "main",
reason: "new",
commandSource: "gateway:sessions.reset",
});
await vi.waitFor(() => {
expect(sessionHookMocks.triggerInternalHook).toHaveBeenCalledTimes(1);
});
await fs.writeFile(
gatewayStorePath,
JSON.stringify(
{
"agent:main:main": {
sessionId: "sess-new",
sessionFile: newTranscriptPath,
updatedAt: Date.now(),
},
},
null,
2,
),
"utf-8",
);
});
const reset = await pendingReset!;
expect(reset.ok).toBe(true);
const internalEvent = (
sessionHookMocks.triggerInternalHook.mock.calls as unknown as Array<[unknown]>
)[0]?.[0] as { context?: { previousSessionEntry?: { sessionId?: string } } } | undefined;
expect(internalEvent?.context?.previousSessionEntry?.sessionId).toBe("sess-old");
expect(beforeResetHookMocks.runBeforeReset).toHaveBeenCalledTimes(1);
const [event, context] = (
beforeResetHookMocks.runBeforeReset.mock.calls as unknown as Array<[unknown, unknown]>
)[0] ?? [undefined, undefined];
expect(event).toMatchObject({
sessionFile: newTranscriptPath,
reason: "new",
messages: [
{
role: "user",
content: "new transcript",
},
],
});
expect(context).toMatchObject({
agentId: "main",
sessionKey: "agent:main:main",
sessionId: "sess-new",
});
});
test("sessions.delete returns unavailable when active run does not stop", async () => {
const { dir, storePath } = await createSessionStoreDir();
await writeSingleLineSession(dir, "sess-active", "active");

View File

@ -3,7 +3,7 @@ import fs from "node:fs";
import path from "node:path";
import { CURRENT_SESSION_VERSION } from "@mariozechner/pi-coding-agent";
import { getAcpSessionManager } from "../acp/control-plane/manager.js";
import { resolveDefaultAgentId } from "../agents/agent-scope.js";
import { resolveAgentWorkspaceDir, resolveDefaultAgentId } from "../agents/agent-scope.js";
import { clearBootstrapSnapshot } from "../agents/bootstrap-cache.js";
import { abortEmbeddedPiRun, waitForEmbeddedPiRunEnd } from "../agents/pi-embedded.js";
import { stopSubagentsForRequester } from "../auto-reply/reply/abort.js";
@ -30,6 +30,7 @@ import {
archiveSessionTranscripts,
loadSessionEntry,
migrateAndPruneGatewaySessionStoreKey,
readSessionMessages,
resolveGatewaySessionStoreTarget,
resolveSessionModelRef,
} from "./session-utils.js";
@ -254,6 +255,54 @@ export async function cleanupSessionBeforeMutation(params: {
});
}
function emitGatewayBeforeResetPluginHook(params: {
cfg: ReturnType<typeof loadConfig>;
key: string;
target: ReturnType<typeof resolveGatewaySessionStoreTarget>;
storePath: string;
entry?: SessionEntry;
reason: "new" | "reset";
}): void {
const hookRunner = getGlobalHookRunner();
if (!hookRunner?.hasHooks("before_reset")) {
return;
}
const sessionKey = params.target.canonicalKey ?? params.key;
const sessionId = params.entry?.sessionId;
const sessionFile = params.entry?.sessionFile;
const agentId = normalizeAgentId(params.target.agentId ?? resolveDefaultAgentId(params.cfg));
const workspaceDir = resolveAgentWorkspaceDir(params.cfg, agentId);
let messages: unknown[] = [];
try {
if (typeof sessionId === "string" && sessionId.trim().length > 0) {
messages = readSessionMessages(sessionId, params.storePath, sessionFile);
}
} catch (err) {
logVerbose(
`before_reset: failed to read session messages for ${sessionId ?? "(none)"}; firing hook with empty messages (${String(err)})`,
);
}
void hookRunner
.runBeforeReset(
{
sessionFile,
messages,
reason: params.reason,
},
{
agentId,
sessionKey,
sessionId,
workspaceDir,
},
)
.catch((err) => {
logVerbose(`before_reset hook failed: ${String(err)}`);
});
}
export async function performGatewaySessionReset(params: {
key: string;
reason: "new" | "reset";
@ -296,6 +345,7 @@ export async function performGatewaySessionReset(params: {
let oldSessionId: string | undefined;
let oldSessionFile: string | undefined;
let resetSourceEntry: SessionEntry | undefined;
const next = await updateSessionStore(storePath, (store) => {
const { primaryKey } = migrateAndPruneGatewaySessionStoreKey({
cfg,
@ -303,6 +353,7 @@ export async function performGatewaySessionReset(params: {
store,
});
const currentEntry = store[primaryKey];
resetSourceEntry = currentEntry ? { ...currentEntry } : undefined;
const resetEntry = stripRuntimeModelState(currentEntry);
const parsed = parseAgentSessionKey(primaryKey);
const sessionAgentId = normalizeAgentId(parsed?.agentId ?? resolveDefaultAgentId(cfg));
@ -385,6 +436,14 @@ export async function performGatewaySessionReset(params: {
store[primaryKey] = nextEntry;
return nextEntry;
});
emitGatewayBeforeResetPluginHook({
cfg,
key: params.key,
target,
storePath,
entry: resetSourceEntry,
reason: params.reason,
});
archiveSessionTranscriptsForSession({
sessionId: oldSessionId,