mirror of https://github.com/openclaw/openclaw.git
test: retry gateway acp bind warmup
This commit is contained in:
parent
d771f7dcb7
commit
78d1120a41
|
|
@ -99,9 +99,40 @@ async function waitForGatewayPort(params: {
|
|||
}
|
||||
|
||||
async function connectClient(params: { url: string; token: string; timeoutMs?: number }) {
|
||||
const timeoutMs = params.timeoutMs ?? CONNECT_TIMEOUT_MS;
|
||||
const startedAt = Date.now();
|
||||
let attempt = 0;
|
||||
let lastError: Error | null = null;
|
||||
|
||||
while (Date.now() - startedAt < timeoutMs) {
|
||||
attempt += 1;
|
||||
const remainingMs = timeoutMs - (Date.now() - startedAt);
|
||||
if (remainingMs <= 0) {
|
||||
break;
|
||||
}
|
||||
try {
|
||||
return await connectClientOnce({
|
||||
...params,
|
||||
timeoutMs: Math.min(remainingMs, 35_000),
|
||||
});
|
||||
} catch (error) {
|
||||
lastError = error instanceof Error ? error : new Error(String(error));
|
||||
if (!isRetryableGatewayConnectError(lastError) || remainingMs <= 5_000) {
|
||||
throw lastError;
|
||||
}
|
||||
logLiveStep(`gateway connect warmup retry ${attempt}: ${lastError.message}`);
|
||||
await sleep(Math.min(1_000 * attempt, 5_000));
|
||||
}
|
||||
}
|
||||
|
||||
throw lastError ?? new Error("gateway connect timeout");
|
||||
}
|
||||
|
||||
async function connectClientOnce(params: { url: string; token: string; timeoutMs?: number }) {
|
||||
const timeoutMs = params.timeoutMs ?? CONNECT_TIMEOUT_MS;
|
||||
return await new Promise<GatewayClient>((resolve, reject) => {
|
||||
let done = false;
|
||||
let client: GatewayClient | undefined;
|
||||
const finish = (result: { client?: GatewayClient; error?: Error }) => {
|
||||
if (done) {
|
||||
return;
|
||||
|
|
@ -109,13 +140,16 @@ async function connectClient(params: { url: string; token: string; timeoutMs?: n
|
|||
done = true;
|
||||
clearTimeout(connectTimeout);
|
||||
if (result.error) {
|
||||
if (client) {
|
||||
void client.stopAndWait({ timeoutMs: 1_000 }).catch(() => {});
|
||||
}
|
||||
reject(result.error);
|
||||
return;
|
||||
}
|
||||
resolve(result.client as GatewayClient);
|
||||
};
|
||||
|
||||
const client = new GatewayClient({
|
||||
client = new GatewayClient({
|
||||
url: params.url,
|
||||
token: params.token,
|
||||
clientName: GATEWAY_CLIENT_NAMES.TEST,
|
||||
|
|
@ -138,25 +172,94 @@ async function connectClient(params: { url: string; token: string; timeoutMs?: n
|
|||
});
|
||||
}
|
||||
|
||||
async function waitForAcpBackendHealthy(timeoutMs = 60_000): Promise<void> {
|
||||
function isRetryableGatewayConnectError(error: Error): boolean {
|
||||
const message = error.message.toLowerCase();
|
||||
return (
|
||||
message.includes("gateway closed during connect (1000)") ||
|
||||
message.includes("gateway connect timeout") ||
|
||||
message.includes("gateway connect challenge timeout")
|
||||
);
|
||||
}
|
||||
|
||||
function isRetryableAcpBindWarmupText(texts: string[]): boolean {
|
||||
const combined = texts.join("\n\n").toLowerCase();
|
||||
return (
|
||||
combined.includes("acp runtime backend is currently unavailable") ||
|
||||
combined.includes("try again in a moment") ||
|
||||
combined.includes("acp runtime backend is not configured") ||
|
||||
combined.includes("acp dispatch is disabled")
|
||||
);
|
||||
}
|
||||
|
||||
function formatAssistantTextPreview(texts: string[], maxChars = 600): string {
|
||||
const combined = texts.join("\n\n").trim();
|
||||
if (!combined) {
|
||||
return "<empty>";
|
||||
}
|
||||
if (combined.length <= maxChars) {
|
||||
return combined;
|
||||
}
|
||||
return combined.slice(-maxChars);
|
||||
}
|
||||
|
||||
async function bindConversationAndWait(params: {
|
||||
client: GatewayClient;
|
||||
sessionKey: string;
|
||||
liveAgent: "claude" | "codex";
|
||||
originatingChannel: string;
|
||||
originatingTo: string;
|
||||
originatingAccountId: string;
|
||||
timeoutMs?: number;
|
||||
}): Promise<{ mainAssistantTexts: string[]; spawnedSessionKey: string }> {
|
||||
const timeoutMs = params.timeoutMs ?? 90_000;
|
||||
const startedAt = Date.now();
|
||||
let attempt = 0;
|
||||
|
||||
while (Date.now() - startedAt < timeoutMs) {
|
||||
attempt += 1;
|
||||
const backend = getAcpRuntimeBackend("acpx");
|
||||
if (backend?.healthy?.() ?? false) {
|
||||
return;
|
||||
}
|
||||
const runtime = backend?.runtime as { probeAvailability?: () => Promise<void> } | undefined;
|
||||
if (runtime?.probeAvailability) {
|
||||
await runtime.probeAvailability().catch(() => {});
|
||||
if (backend?.healthy?.() ?? false) {
|
||||
return;
|
||||
}
|
||||
} else if (backend && !backend.healthy) {
|
||||
return;
|
||||
}
|
||||
await sleep(250);
|
||||
if (!(backend?.healthy?.() ?? false)) {
|
||||
logLiveStep(`acpx backend still unhealthy before bind attempt ${attempt}`);
|
||||
await sleep(5_000);
|
||||
continue;
|
||||
}
|
||||
|
||||
await sendChatAndWait({
|
||||
client: params.client,
|
||||
sessionKey: params.sessionKey,
|
||||
idempotencyKey: `idem-bind-${randomUUID()}`,
|
||||
message: `/acp spawn ${params.liveAgent} --bind here`,
|
||||
originatingChannel: params.originatingChannel,
|
||||
originatingTo: params.originatingTo,
|
||||
originatingAccountId: params.originatingAccountId,
|
||||
});
|
||||
|
||||
const mainHistory = await params.client.request<{ messages?: unknown[] }>("chat.history", {
|
||||
sessionKey: params.sessionKey,
|
||||
limit: 16,
|
||||
});
|
||||
const mainAssistantTexts = extractAssistantTexts(mainHistory.messages ?? []);
|
||||
const spawnedSessionKey = extractSpawnedAcpSessionKey(mainAssistantTexts);
|
||||
if (
|
||||
mainAssistantTexts.join("\n\n").includes("Bound this conversation to") &&
|
||||
spawnedSessionKey
|
||||
) {
|
||||
return { mainAssistantTexts, spawnedSessionKey };
|
||||
}
|
||||
if (!isRetryableAcpBindWarmupText(mainAssistantTexts)) {
|
||||
throw new Error(
|
||||
`bind command did not produce an ACP session: ${formatAssistantTextPreview(mainAssistantTexts)}`,
|
||||
);
|
||||
}
|
||||
logLiveStep(`acpx backend still warming up; retrying bind (${attempt})`);
|
||||
await sleep(5_000);
|
||||
}
|
||||
throw new Error("timed out waiting for the acpx runtime backend to become healthy");
|
||||
|
||||
throw new Error("timed out waiting for the ACP bind command to succeed");
|
||||
}
|
||||
|
||||
async function waitForAgentRunOk(
|
||||
|
|
@ -301,32 +404,17 @@ describeLive("gateway live (ACP bind)", () => {
|
|||
logLiveStep("gateway websocket connected");
|
||||
|
||||
try {
|
||||
logLiveStep("waiting for acpx backend health");
|
||||
await waitForAcpBackendHealthy();
|
||||
logLiveStep("acpx backend healthy");
|
||||
|
||||
await sendChatAndWait({
|
||||
const { mainAssistantTexts, spawnedSessionKey } = await bindConversationAndWait({
|
||||
client,
|
||||
sessionKey: originalSessionKey,
|
||||
idempotencyKey: `idem-bind-${randomUUID()}`,
|
||||
message: `/acp spawn ${liveAgent} --bind here`,
|
||||
liveAgent,
|
||||
originatingChannel: "slack",
|
||||
originatingTo: conversationId,
|
||||
originatingAccountId: accountId,
|
||||
});
|
||||
logLiveStep("bind command completed");
|
||||
|
||||
const mainHistory = await client.request<{ messages?: unknown[] }>("chat.history", {
|
||||
sessionKey: originalSessionKey,
|
||||
limit: 12,
|
||||
});
|
||||
const mainAssistantTexts = extractAssistantTexts(mainHistory.messages ?? []);
|
||||
const spawnedSessionKey = extractSpawnedAcpSessionKey(mainAssistantTexts);
|
||||
expect(mainAssistantTexts.join("\n\n")).toContain("Bound this conversation to");
|
||||
expect(spawnedSessionKey).toMatch(new RegExp(`^agent:${liveAgent}:acp:`));
|
||||
if (!spawnedSessionKey) {
|
||||
throw new Error("bind response did not expose the spawned ACP session key");
|
||||
}
|
||||
logLiveStep(`binding announced for session ${spawnedSessionKey ?? "missing"}`);
|
||||
|
||||
await sendChatAndWait({
|
||||
|
|
|
|||
Loading…
Reference in New Issue