mirror of https://github.com/openclaw/openclaw.git
fix: wrap heartbeat run() in try/finally to guarantee timer re-arm
The heartbeat scheduler silently dies after the first batch because run() has multiple exit paths that skip scheduleNext(). Once any path returns without re-arming the timer, heartbeats stop permanently. The most common trigger is the requests-in-flight early return, but any unhandled rejection in the async chain between runOnce() and the manual scheduleNext() calls can also kill the timer. Fix: wrap the run() body in try/finally so scheduleNext() is called on every exit path. All manual scheduleNext() calls inside are removed — the finally block is the single re-arm point. The requests-in-flight case is excluded via a flag because the wake layer (heartbeat-wake.ts) already handles retry scheduling for this case with DEFAULT_RETRY_MS. Calling scheduleNext() here would register a 0ms timer that races with the wake layer's 1s retry. Fixes #31139 Fixes #45772 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
cfbef8035d
commit
b83efd4a8a
|
|
@ -1090,81 +1090,87 @@ export function startHeartbeatRunner(opts: {
|
|||
const startedAt = Date.now();
|
||||
const now = startedAt;
|
||||
let ran = false;
|
||||
// Track requests-in-flight so we can skip re-arm in finally — the wake
|
||||
// layer handles retry for this case (DEFAULT_RETRY_MS = 1 s).
|
||||
let requestsInFlight = false;
|
||||
|
||||
if (requestedSessionKey || requestedAgentId) {
|
||||
const targetAgentId = requestedAgentId ?? resolveAgentIdFromSessionKey(requestedSessionKey);
|
||||
const targetAgent = state.agents.get(targetAgentId);
|
||||
if (!targetAgent) {
|
||||
scheduleNext();
|
||||
return { status: "skipped", reason: "disabled" };
|
||||
}
|
||||
try {
|
||||
const res = await runOnce({
|
||||
cfg: state.cfg,
|
||||
agentId: targetAgent.agentId,
|
||||
heartbeat: targetAgent.heartbeat,
|
||||
reason,
|
||||
sessionKey: requestedSessionKey,
|
||||
deps: { runtime: state.runtime },
|
||||
});
|
||||
if (res.status !== "skipped" || res.reason !== "disabled") {
|
||||
advanceAgentSchedule(targetAgent, now);
|
||||
try {
|
||||
if (requestedSessionKey || requestedAgentId) {
|
||||
const targetAgentId = requestedAgentId ?? resolveAgentIdFromSessionKey(requestedSessionKey);
|
||||
const targetAgent = state.agents.get(targetAgentId);
|
||||
if (!targetAgent) {
|
||||
return { status: "skipped", reason: "disabled" };
|
||||
}
|
||||
try {
|
||||
const res = await runOnce({
|
||||
cfg: state.cfg,
|
||||
agentId: targetAgent.agentId,
|
||||
heartbeat: targetAgent.heartbeat,
|
||||
reason,
|
||||
sessionKey: requestedSessionKey,
|
||||
deps: { runtime: state.runtime },
|
||||
});
|
||||
if (res.status !== "skipped" || res.reason !== "disabled") {
|
||||
advanceAgentSchedule(targetAgent, now);
|
||||
}
|
||||
return res.status === "ran" ? { status: "ran", durationMs: Date.now() - startedAt } : res;
|
||||
} catch (err) {
|
||||
const errMsg = formatErrorMessage(err);
|
||||
log.error(`heartbeat runner: targeted runOnce threw unexpectedly: ${errMsg}`, {
|
||||
error: errMsg,
|
||||
});
|
||||
advanceAgentSchedule(targetAgent, now);
|
||||
return { status: "failed", reason: errMsg };
|
||||
}
|
||||
}
|
||||
|
||||
for (const agent of state.agents.values()) {
|
||||
if (isInterval && now < agent.nextDueMs) {
|
||||
continue;
|
||||
}
|
||||
|
||||
let res: HeartbeatRunResult;
|
||||
try {
|
||||
res = await runOnce({
|
||||
cfg: state.cfg,
|
||||
agentId: agent.agentId,
|
||||
heartbeat: agent.heartbeat,
|
||||
reason,
|
||||
deps: { runtime: state.runtime },
|
||||
});
|
||||
} catch (err) {
|
||||
const errMsg = formatErrorMessage(err);
|
||||
log.error(`heartbeat runner: runOnce threw unexpectedly: ${errMsg}`, { error: errMsg });
|
||||
advanceAgentSchedule(agent, now);
|
||||
continue;
|
||||
}
|
||||
if (res.status === "skipped" && res.reason === "requests-in-flight") {
|
||||
// Do not advance the schedule — the main lane is busy and the wake
|
||||
// layer will retry shortly (DEFAULT_RETRY_MS = 1 s). Calling
|
||||
// scheduleNext() here would register a 0 ms timer that races with
|
||||
// the wake layer's 1 s retry and wins, bypassing the cooldown.
|
||||
requestsInFlight = true;
|
||||
return res;
|
||||
}
|
||||
if (res.status !== "skipped" || res.reason !== "disabled") {
|
||||
advanceAgentSchedule(agent, now);
|
||||
}
|
||||
if (res.status === "ran") {
|
||||
ran = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (ran) {
|
||||
return { status: "ran", durationMs: Date.now() - startedAt };
|
||||
}
|
||||
return { status: "skipped", reason: isInterval ? "not-due" : "disabled" };
|
||||
} finally {
|
||||
// Always re-arm the timer — except for requests-in-flight, where the
|
||||
// wake layer (heartbeat-wake.ts) handles retry via schedule(DEFAULT_RETRY_MS).
|
||||
if (!requestsInFlight) {
|
||||
scheduleNext();
|
||||
return res.status === "ran" ? { status: "ran", durationMs: Date.now() - startedAt } : res;
|
||||
} catch (err) {
|
||||
const errMsg = formatErrorMessage(err);
|
||||
log.error(`heartbeat runner: targeted runOnce threw unexpectedly: ${errMsg}`, {
|
||||
error: errMsg,
|
||||
});
|
||||
advanceAgentSchedule(targetAgent, now);
|
||||
scheduleNext();
|
||||
return { status: "failed", reason: errMsg };
|
||||
}
|
||||
}
|
||||
|
||||
for (const agent of state.agents.values()) {
|
||||
if (isInterval && now < agent.nextDueMs) {
|
||||
continue;
|
||||
}
|
||||
|
||||
let res: HeartbeatRunResult;
|
||||
try {
|
||||
res = await runOnce({
|
||||
cfg: state.cfg,
|
||||
agentId: agent.agentId,
|
||||
heartbeat: agent.heartbeat,
|
||||
reason,
|
||||
deps: { runtime: state.runtime },
|
||||
});
|
||||
} catch (err) {
|
||||
// If runOnce throws (e.g. during session compaction), we must still
|
||||
// advance the timer and call scheduleNext so heartbeats keep firing.
|
||||
const errMsg = formatErrorMessage(err);
|
||||
log.error(`heartbeat runner: runOnce threw unexpectedly: ${errMsg}`, { error: errMsg });
|
||||
advanceAgentSchedule(agent, now);
|
||||
continue;
|
||||
}
|
||||
if (res.status === "skipped" && res.reason === "requests-in-flight") {
|
||||
// Do not advance the schedule — the main lane is busy and the wake
|
||||
// layer will retry shortly (DEFAULT_RETRY_MS = 1 s). Calling
|
||||
// scheduleNext() here would register a 0 ms timer that races with
|
||||
// the wake layer's 1 s retry and wins, bypassing the cooldown.
|
||||
return res;
|
||||
}
|
||||
if (res.status !== "skipped" || res.reason !== "disabled") {
|
||||
advanceAgentSchedule(agent, now);
|
||||
}
|
||||
if (res.status === "ran") {
|
||||
ran = true;
|
||||
}
|
||||
}
|
||||
|
||||
scheduleNext();
|
||||
if (ran) {
|
||||
return { status: "ran", durationMs: Date.now() - startedAt };
|
||||
}
|
||||
return { status: "skipped", reason: isInterval ? "not-due" : "disabled" };
|
||||
};
|
||||
|
||||
const wakeHandler: HeartbeatWakeHandler = async (params) =>
|
||||
|
|
|
|||
Loading…
Reference in New Issue