diff --git a/src/config/sessions.test.ts b/src/config/sessions.test.ts index 221654659d9..a9ecbf37143 100644 --- a/src/config/sessions.test.ts +++ b/src/config/sessions.test.ts @@ -591,4 +591,52 @@ describe("sessions", () => { expect(store[mainSessionKey]?.thinkingLevel).toBe("high"); await expect(fs.stat(`${storePath}.lock`)).rejects.toThrow(); }); + + it("updateSessionStoreEntry re-reads disk inside lock instead of using stale cache", async () => { + const mainSessionKey = "agent:main:main"; + const dir = await createCaseDir("updateSessionStoreEntry-cache-bypass"); + const storePath = path.join(dir, "sessions.json"); + await fs.writeFile( + storePath, + JSON.stringify( + { + [mainSessionKey]: { + sessionId: "sess-1", + updatedAt: 123, + thinkingLevel: "low", + }, + }, + null, + 2, + ), + "utf-8", + ); + + // Prime the in-process cache with the original entry. + expect(loadSessionStore(storePath)[mainSessionKey]?.thinkingLevel).toBe("low"); + const originalStat = await fs.stat(storePath); + + // Simulate an external writer that updates the store but preserves mtime. + const externalStore = JSON.parse(await fs.readFile(storePath, "utf-8")) as Record< + string, + Record + >; + externalStore[mainSessionKey] = { + ...externalStore[mainSessionKey], + providerOverride: "anthropic", + updatedAt: 124, + }; + await fs.writeFile(storePath, JSON.stringify(externalStore, null, 2), "utf-8"); + await fs.utimes(storePath, originalStat.atime, originalStat.mtime); + + await updateSessionStoreEntry({ + storePath, + sessionKey: mainSessionKey, + update: async () => ({ thinkingLevel: "high" }), + }); + + const store = loadSessionStore(storePath); + expect(store[mainSessionKey]?.providerOverride).toBe("anthropic"); + expect(store[mainSessionKey]?.thinkingLevel).toBe("high"); + }); }); diff --git a/src/config/sessions/store.ts b/src/config/sessions/store.ts index 9ad45976b1f..d224f368299 100644 --- a/src/config/sessions/store.ts +++ b/src/config/sessions/store.ts @@ -806,7 +806,7 @@ export async function updateSessionStoreEntry(params: { }): Promise { const { storePath, sessionKey, update } = params; return await withSessionStoreLock(storePath, async () => { - const store = loadSessionStore(storePath); + const store = loadSessionStore(storePath, { skipCache: true }); const existing = store[sessionKey]; if (!existing) { return null; diff --git a/src/cron/isolated-agent/run.ts b/src/cron/isolated-agent/run.ts index 5a66e121281..4de81a3db62 100644 --- a/src/cron/isolated-agent/run.ts +++ b/src/cron/isolated-agent/run.ts @@ -154,6 +154,7 @@ export async function runCronIsolatedAgentTurn(params: { deps: CliDeps; job: CronJob; message: string; + abortSignal?: AbortSignal; sessionKey: string; agentId?: string; lane?: string; @@ -454,6 +455,9 @@ export async function runCronIsolatedAgentTurn(params: { agentDir, fallbacksOverride: resolveAgentModelFallbacksOverride(params.cfg, agentId), run: (providerOverride, modelOverride) => { + if (params.abortSignal?.aborted) { + throw new Error("cron: isolated run aborted"); + } if (isCliProvider(providerOverride, cfgWithAgentDefaults)) { const cliSessionId = getCliSessionId(cronSession.sessionEntry, providerOverride); return runCliAgent({ @@ -492,6 +496,7 @@ export async function runCronIsolatedAgentTurn(params: { runId: cronSession.sessionEntry.sessionId, requireExplicitMessageTarget: true, disableMessageTool: deliveryRequested, + abortSignal: params.abortSignal, }); }, }); diff --git a/src/cron/service.issue-regressions.test.ts b/src/cron/service.issue-regressions.test.ts index ac122840750..4e8c9d6f1e7 100644 --- a/src/cron/service.issue-regressions.test.ts +++ b/src/cron/service.issue-regressions.test.ts @@ -683,6 +683,55 @@ describe("Cron issue regressions", () => { expect(job?.state.lastStatus).toBe("ok"); }); + it("aborts isolated runs when cron timeout fires", async () => { + vi.useRealTimers(); + const store = await makeStorePath(); + const scheduledAt = Date.parse("2026-02-15T13:00:00.000Z"); + const cronJob = createIsolatedRegressionJob({ + id: "abort-on-timeout", + name: "abort timeout", + scheduledAt, + schedule: { kind: "at", at: new Date(scheduledAt).toISOString() }, + payload: { kind: "agentTurn", message: "work", timeoutSeconds: 0.01 }, + state: { nextRunAtMs: scheduledAt }, + }); + await writeCronJobs(store.storePath, [cronJob]); + + let now = scheduledAt; + let observedAbortSignal: AbortSignal | undefined; + const state = createCronServiceState({ + cronEnabled: true, + storePath: store.storePath, + log: noopLogger, + nowMs: () => now, + enqueueSystemEvent: vi.fn(), + requestHeartbeatNow: vi.fn(), + runIsolatedAgentJob: vi.fn(async ({ abortSignal }) => { + observedAbortSignal = abortSignal; + await new Promise((resolve) => { + if (!abortSignal) { + return; + } + if (abortSignal.aborted) { + resolve(); + return; + } + abortSignal.addEventListener("abort", () => resolve(), { once: true }); + }); + now += 5; + return { status: "ok" as const, summary: "late" }; + }), + }); + + await onTimer(state); + + expect(observedAbortSignal).toBeDefined(); + expect(observedAbortSignal?.aborted).toBe(true); + const job = state.store?.jobs.find((entry) => entry.id === "abort-on-timeout"); + expect(job?.state.lastStatus).toBe("error"); + expect(job?.state.lastError).toContain("timed out"); + }); + it("retries cron schedule computation from the next second when the first attempt returns undefined (#17821)", () => { const scheduledAt = Date.parse("2026-02-15T13:00:00.000Z"); const cronJob = createIsolatedRegressionJob({ diff --git a/src/cron/service/state.ts b/src/cron/service/state.ts index c331fa1290b..b366da7abc3 100644 --- a/src/cron/service/state.ts +++ b/src/cron/service/state.ts @@ -62,7 +62,11 @@ export type CronServiceDeps = { wakeNowHeartbeatBusyMaxWaitMs?: number; /** WakeMode=now: delay between runHeartbeatOnce retries while busy. */ wakeNowHeartbeatBusyRetryDelayMs?: number; - runIsolatedAgentJob: (params: { job: CronJob; message: string }) => Promise< + runIsolatedAgentJob: (params: { + job: CronJob; + message: string; + abortSignal?: AbortSignal; + }) => Promise< { summary?: string; /** Last non-empty agent text output (not truncated). */ diff --git a/src/cron/service/timer.ts b/src/cron/service/timer.ts index 1b6b108dab1..206c82d439f 100644 --- a/src/cron/service/timer.ts +++ b/src/cron/service/timer.ts @@ -267,18 +267,20 @@ export async function onTimer(state: CronServiceState) { : DEFAULT_JOB_TIMEOUT_MS; try { + const runAbortController = + typeof jobTimeoutMs === "number" ? new AbortController() : undefined; const result = typeof jobTimeoutMs === "number" ? await (async () => { let timeoutId: NodeJS.Timeout | undefined; try { return await Promise.race([ - executeJobCore(state, job), + executeJobCore(state, job, runAbortController?.signal), new Promise((_, reject) => { - timeoutId = setTimeout( - () => reject(new Error("cron: job execution timed out")), - jobTimeoutMs, - ); + timeoutId = setTimeout(() => { + runAbortController?.abort(new Error("cron: job execution timed out")); + reject(new Error("cron: job execution timed out")); + }, jobTimeoutMs); }), ]); } finally { @@ -565,6 +567,7 @@ export async function runDueJobs(state: CronServiceState) { async function executeJobCore( state: CronServiceState, job: CronJob, + abortSignal?: AbortSignal, ): Promise { if (job.sessionTarget === "main") { const text = resolveJobPayloadTextForMain(job); @@ -634,10 +637,14 @@ async function executeJobCore( if (job.payload.kind !== "agentTurn") { return { status: "skipped", error: "isolated job requires payload.kind=agentTurn" }; } + if (abortSignal?.aborted) { + return { status: "error", error: "cron: job execution aborted" }; + } const res = await state.deps.runIsolatedAgentJob({ job, message: job.payload.message, + abortSignal, }); // Post a short summary back to the main session — but only when the diff --git a/src/gateway/server-cron.ts b/src/gateway/server-cron.ts index b681377b13c..b0b2de28cac 100644 --- a/src/gateway/server-cron.ts +++ b/src/gateway/server-cron.ts @@ -185,13 +185,14 @@ export function buildGatewayCronService(params: { deps: { ...params.deps, runtime: defaultRuntime }, }); }, - runIsolatedAgentJob: async ({ job, message }) => { + runIsolatedAgentJob: async ({ job, message, abortSignal }) => { const { agentId, cfg: runtimeConfig } = resolveCronAgent(job.agentId); return await runCronIsolatedAgentTurn({ cfg: runtimeConfig, deps: params.deps, job, message, + abortSignal, agentId, sessionKey: `cron:${job.id}`, lane: "cron", diff --git a/src/infra/provider-usage.fetch.claude.test.ts b/src/infra/provider-usage.fetch.claude.test.ts index b8fbaffb71c..7650a8e8c87 100644 --- a/src/infra/provider-usage.fetch.claude.test.ts +++ b/src/infra/provider-usage.fetch.claude.test.ts @@ -107,7 +107,7 @@ describe("fetchClaudeUsage", () => { expect(result.windows).toEqual([{ label: "5h", usedPercent: 12, resetAt: undefined }]); }); - it("keeps oauth error when cookie header cannot be parsed into a session key", async () => { + it("parses sessionKey from CLAUDE_WEB_COOKIE for web fallback", async () => { vi.stubEnv("CLAUDE_WEB_COOKIE", "sessionKey=sk-ant-cookie-session"); const mockFetch = createScopeFallbackFetch(async (url) => { @@ -120,7 +120,10 @@ describe("fetchClaudeUsage", () => { return makeResponse(404, "not found"); }); - await expectMissingScopeWithoutFallback(mockFetch); + const result = await fetchClaudeUsage("token", 5000, mockFetch); + expect(result.error).toBeUndefined(); + expect(result.windows).toEqual([{ label: "Opus", usedPercent: 44 }]); + expect(mockFetch).toHaveBeenCalledTimes(3); }); it("keeps oauth error when fallback session key is unavailable", async () => { diff --git a/src/infra/provider-usage.fetch.claude.ts b/src/infra/provider-usage.fetch.claude.ts index 927c76e4c0b..41ffcb37b20 100644 --- a/src/infra/provider-usage.fetch.claude.ts +++ b/src/infra/provider-usage.fetch.claude.ts @@ -57,8 +57,8 @@ function resolveClaudeWebSessionKey(): string | undefined { if (!cookieHeader) { return undefined; } - const stripped = cookieHeader.replace(/^cookie:\\s*/i, ""); - const match = stripped.match(/(?:^|;\\s*)sessionKey=([^;\\s]+)/i); + const stripped = cookieHeader.replace(/^cookie:\s*/i, ""); + const match = stripped.match(/(?:^|;\s*)sessionKey=([^;\s]+)/i); const value = match?.[1]?.trim(); return value?.startsWith("sk-ant-") ? value : undefined; } diff --git a/src/signal/daemon.ts b/src/signal/daemon.ts index cc99f6ca37a..e85eb0021fa 100644 --- a/src/signal/daemon.ts +++ b/src/signal/daemon.ts @@ -16,6 +16,8 @@ export type SignalDaemonOpts = { export type SignalDaemonHandle = { pid?: number; stop: () => void; + exited: Promise<{ code: number | null; signal: NodeJS.Signals | null }>; + isExited: () => boolean; }; export function classifySignalCliLogLine(line: string): "log" | "error" | null { @@ -83,17 +85,51 @@ export function spawnSignalDaemon(opts: SignalDaemonOpts): SignalDaemonHandle { }); const log = opts.runtime?.log ?? (() => {}); const error = opts.runtime?.error ?? (() => {}); + let exited = false; + let settledExit = false; + let resolveExit!: (value: { code: number | null; signal: NodeJS.Signals | null }) => void; + const exitedPromise = new Promise<{ code: number | null; signal: NodeJS.Signals | null }>( + (resolve) => { + resolveExit = resolve; + }, + ); + const settleExit = (value: { code: number | null; signal: NodeJS.Signals | null }) => { + if (settledExit) { + return; + } + settledExit = true; + exited = true; + resolveExit(value); + }; bindSignalCliOutput({ stream: child.stdout, log, error }); bindSignalCliOutput({ stream: child.stderr, log, error }); + child.once("exit", (code, signal) => { + settleExit({ + code: typeof code === "number" ? code : null, + signal: signal ?? null, + }); + error( + `signal-cli daemon exited (code=${String(code ?? "null")} signal=${String(signal ?? "null")})`, + ); + }); + child.once("close", (code, signal) => { + settleExit({ + code: typeof code === "number" ? code : null, + signal: signal ?? null, + }); + }); child.on("error", (err) => { error(`signal-cli spawn error: ${String(err)}`); + settleExit({ code: null, signal: null }); }); return { pid: child.pid ?? undefined, + exited: exitedPromise, + isExited: () => exited, stop: () => { - if (!child.killed) { + if (!child.killed && !exited) { child.kill("SIGTERM"); } }, diff --git a/src/signal/monitor.tool-result.sends-tool-summaries-responseprefix.test.ts b/src/signal/monitor.tool-result.sends-tool-summaries-responseprefix.test.ts index f21d2230324..6cbaf96623b 100644 --- a/src/signal/monitor.tool-result.sends-tool-summaries-responseprefix.test.ts +++ b/src/signal/monitor.tool-result.sends-tool-summaries-responseprefix.test.ts @@ -23,6 +23,7 @@ const { updateLastRouteMock, upsertPairingRequestMock, waitForTransportReadyMock, + spawnSignalDaemonMock, } = getSignalToolResultTestMocks(); const SIGNAL_BASE_URL = "http://127.0.0.1:8080"; @@ -176,7 +177,7 @@ describe("monitorSignalProvider tool results", () => { logIntervalMs: 10_000, pollIntervalMs: 150, runtime, - abortSignal: abortController.signal, + abortSignal: expect.any(AbortSignal), }), ); }); @@ -212,6 +213,39 @@ describe("monitorSignalProvider tool results", () => { expectWaitForTransportReadyTimeout(120_000); }); + it("fails fast when auto-started signal daemon exits during startup", async () => { + const runtime = createMonitorRuntime(); + setSignalAutoStartConfig(); + spawnSignalDaemonMock.mockReturnValueOnce({ + stop: vi.fn(), + exited: Promise.resolve({ code: 1, signal: null }), + isExited: () => true, + }); + waitForTransportReadyMock.mockImplementationOnce( + async (params: { abortSignal?: AbortSignal | null }) => { + await new Promise((_resolve, reject) => { + if (params.abortSignal?.aborted) { + reject(params.abortSignal.reason); + return; + } + params.abortSignal?.addEventListener( + "abort", + () => reject(params.abortSignal?.reason ?? new Error("aborted")), + { once: true }, + ); + }); + }, + ); + + await expect( + runMonitorWithMocks({ + autoStart: true, + baseUrl: SIGNAL_BASE_URL, + runtime, + }), + ).rejects.toThrow(/signal daemon exited/i); + }); + it("skips tool summaries with responsePrefix", async () => { replyMock.mockResolvedValue({ text: "final reply" }); diff --git a/src/signal/monitor.tool-result.test-harness.ts b/src/signal/monitor.tool-result.test-harness.ts index 7d1919c5bb4..e05ebe94f5f 100644 --- a/src/signal/monitor.tool-result.test-harness.ts +++ b/src/signal/monitor.tool-result.test-harness.ts @@ -13,6 +13,7 @@ type SignalToolResultTestMocks = { streamMock: MockFn; signalCheckMock: MockFn; signalRpcRequestMock: MockFn; + spawnSignalDaemonMock: MockFn; }; const waitForTransportReadyMock = vi.hoisted(() => vi.fn()) as unknown as MockFn; @@ -24,6 +25,7 @@ const upsertPairingRequestMock = vi.hoisted(() => vi.fn()) as unknown as MockFn; const streamMock = vi.hoisted(() => vi.fn()) as unknown as MockFn; const signalCheckMock = vi.hoisted(() => vi.fn()) as unknown as MockFn; const signalRpcRequestMock = vi.hoisted(() => vi.fn()) as unknown as MockFn; +const spawnSignalDaemonMock = vi.hoisted(() => vi.fn()) as unknown as MockFn; export function getSignalToolResultTestMocks(): SignalToolResultTestMocks { return { @@ -36,6 +38,7 @@ export function getSignalToolResultTestMocks(): SignalToolResultTestMocks { streamMock, signalCheckMock, signalRpcRequestMock, + spawnSignalDaemonMock, }; } @@ -84,7 +87,7 @@ vi.mock("./client.js", () => ({ })); vi.mock("./daemon.js", () => ({ - spawnSignalDaemon: vi.fn(() => ({ stop: vi.fn() })), + spawnSignalDaemon: (...args: unknown[]) => spawnSignalDaemonMock(...args), })); vi.mock("../infra/transport-ready.js", () => ({ @@ -107,6 +110,11 @@ export function installSignalToolResultTestHooks() { streamMock.mockReset(); signalCheckMock.mockReset().mockResolvedValue({}); signalRpcRequestMock.mockReset().mockResolvedValue({}); + spawnSignalDaemonMock.mockReset().mockReturnValue({ + stop: vi.fn(), + exited: new Promise(() => {}), + isExited: () => false, + }); readAllowFromStoreMock.mockReset().mockResolvedValue([]); upsertPairingRequestMock.mockReset().mockResolvedValue({ code: "PAIRCODE", created: true }); waitForTransportReadyMock.mockReset().mockResolvedValue(undefined); diff --git a/src/signal/monitor.ts b/src/signal/monitor.ts index baf45795c19..0bcff74b795 100644 --- a/src/signal/monitor.ts +++ b/src/signal/monitor.ts @@ -47,6 +47,46 @@ function resolveRuntime(opts: MonitorSignalOpts): RuntimeEnv { return opts.runtime ?? createNonExitingRuntime(); } +function mergeAbortSignals( + a?: AbortSignal, + b?: AbortSignal, +): { signal?: AbortSignal; dispose: () => void } { + if (!a && !b) { + return { signal: undefined, dispose: () => {} }; + } + if (!a) { + return { signal: b, dispose: () => {} }; + } + if (!b) { + return { signal: a, dispose: () => {} }; + } + const controller = new AbortController(); + const abortFrom = (source: AbortSignal) => { + if (!controller.signal.aborted) { + controller.abort(source.reason); + } + }; + if (a.aborted) { + abortFrom(a); + return { signal: controller.signal, dispose: () => {} }; + } + if (b.aborted) { + abortFrom(b); + return { signal: controller.signal, dispose: () => {} }; + } + const onAbortA = () => abortFrom(a); + const onAbortB = () => abortFrom(b); + a.addEventListener("abort", onAbortA, { once: true }); + b.addEventListener("abort", onAbortB, { once: true }); + return { + signal: controller.signal, + dispose: () => { + a.removeEventListener("abort", onAbortA); + b.removeEventListener("abort", onAbortB); + }, + }; +} + function normalizeAllowList(raw?: Array): string[] { return normalizeStringEntries(raw); } @@ -286,6 +326,9 @@ export async function monitorSignalProvider(opts: MonitorSignalOpts = {}): Promi Math.max(1_000, opts.startupTimeoutMs ?? accountInfo.config.startupTimeoutMs ?? 30_000), ); const readReceiptsViaDaemon = Boolean(autoStart && sendReadReceipts); + let daemonExitError: Error | undefined; + const daemonAbortController = new AbortController(); + const mergedAbort = mergeAbortSignals(opts.abortSignal, daemonAbortController.signal); let daemonHandle: ReturnType | null = null; if (autoStart) { @@ -303,6 +346,14 @@ export async function monitorSignalProvider(opts: MonitorSignalOpts = {}): Promi sendReadReceipts, runtime, }); + void daemonHandle.exited.then((exit) => { + daemonExitError = new Error( + `signal daemon exited (code=${String(exit.code ?? "null")} signal=${String(exit.signal ?? "null")})`, + ); + if (!daemonAbortController.signal.aborted) { + daemonAbortController.abort(daemonExitError); + } + }); } const onAbort = () => { @@ -314,12 +365,15 @@ export async function monitorSignalProvider(opts: MonitorSignalOpts = {}): Promi if (daemonHandle) { await waitForSignalDaemonReady({ baseUrl, - abortSignal: opts.abortSignal, + abortSignal: mergedAbort.signal, timeoutMs: startupTimeoutMs, logAfterMs: 10_000, logIntervalMs: 10_000, runtime, }); + if (daemonExitError) { + throw daemonExitError; + } } const handleEvent = createSignalEventHandler({ @@ -353,7 +407,7 @@ export async function monitorSignalProvider(opts: MonitorSignalOpts = {}): Promi await runSignalSseLoop({ baseUrl, account, - abortSignal: opts.abortSignal, + abortSignal: mergedAbort.signal, runtime, onEvent: (event) => { void handleEvent(event).catch((err) => { @@ -361,12 +415,16 @@ export async function monitorSignalProvider(opts: MonitorSignalOpts = {}): Promi }); }, }); + if (daemonExitError) { + throw daemonExitError; + } } catch (err) { - if (opts.abortSignal?.aborted) { + if (opts.abortSignal?.aborted && !daemonExitError) { return; } throw err; } finally { + mergedAbort.dispose(); opts.abortSignal?.removeEventListener("abort", onAbort); daemonHandle?.stop(); } diff --git a/src/web/auto-reply/deliver-reply.test.ts b/src/web/auto-reply/deliver-reply.test.ts index ff5f7b6f100..385fcd65af7 100644 --- a/src/web/auto-reply/deliver-reply.test.ts +++ b/src/web/auto-reply/deliver-reply.test.ts @@ -98,6 +98,28 @@ describe("deliverWebReply", () => { expect(sleep).toHaveBeenCalledWith(500); }); + it("retries text send when error contains timed out", async () => { + const msg = makeMsg(); + (msg.reply as unknown as { mockRejectedValueOnce: (v: unknown) => void }).mockRejectedValueOnce( + new Error("operation timed out"), + ); + (msg.reply as unknown as { mockResolvedValueOnce: (v: unknown) => void }).mockResolvedValueOnce( + undefined, + ); + + await deliverWebReply({ + replyResult: { text: "hi" }, + msg, + maxMediaBytes: 1024 * 1024, + textLimit: 200, + replyLogger, + skipLog: true, + }); + + expect(msg.reply).toHaveBeenCalledTimes(2); + expect(sleep).toHaveBeenCalledWith(500); + }); + it("sends image media with caption and then remaining text", async () => { const msg = makeMsg(); const mediaLocalRoots = ["/tmp/workspace-work"]; diff --git a/src/web/auto-reply/deliver-reply.ts b/src/web/auto-reply/deliver-reply.ts index cb9b1e6ed44..664e8acee85 100644 --- a/src/web/auto-reply/deliver-reply.ts +++ b/src/web/auto-reply/deliver-reply.ts @@ -50,7 +50,7 @@ export async function deliverWebReply(params: { lastErr = err; const errText = formatError(err); const isLast = attempt === maxAttempts; - const shouldRetry = /closed|reset|timed\\s*out|disconnect/i.test(errText); + const shouldRetry = /closed|reset|timed\s*out|disconnect/i.test(errText); if (!shouldRetry || isLast) { throw err; }