diff --git a/src/gateway/gateway-acp-bind.live.test.ts b/src/gateway/gateway-acp-bind.live.test.ts index e9ff122933d..c1864fdb4dd 100644 --- a/src/gateway/gateway-acp-bind.live.test.ts +++ b/src/gateway/gateway-acp-bind.live.test.ts @@ -99,9 +99,40 @@ async function waitForGatewayPort(params: { } async function connectClient(params: { url: string; token: string; timeoutMs?: number }) { + const timeoutMs = params.timeoutMs ?? CONNECT_TIMEOUT_MS; + const startedAt = Date.now(); + let attempt = 0; + let lastError: Error | null = null; + + while (Date.now() - startedAt < timeoutMs) { + attempt += 1; + const remainingMs = timeoutMs - (Date.now() - startedAt); + if (remainingMs <= 0) { + break; + } + try { + return await connectClientOnce({ + ...params, + timeoutMs: Math.min(remainingMs, 35_000), + }); + } catch (error) { + lastError = error instanceof Error ? error : new Error(String(error)); + if (!isRetryableGatewayConnectError(lastError) || remainingMs <= 5_000) { + throw lastError; + } + logLiveStep(`gateway connect warmup retry ${attempt}: ${lastError.message}`); + await sleep(Math.min(1_000 * attempt, 5_000)); + } + } + + throw lastError ?? new Error("gateway connect timeout"); +} + +async function connectClientOnce(params: { url: string; token: string; timeoutMs?: number }) { const timeoutMs = params.timeoutMs ?? CONNECT_TIMEOUT_MS; return await new Promise((resolve, reject) => { let done = false; + let client: GatewayClient | undefined; const finish = (result: { client?: GatewayClient; error?: Error }) => { if (done) { return; @@ -109,13 +140,16 @@ async function connectClient(params: { url: string; token: string; timeoutMs?: n done = true; clearTimeout(connectTimeout); if (result.error) { + if (client) { + void client.stopAndWait({ timeoutMs: 1_000 }).catch(() => {}); + } reject(result.error); return; } resolve(result.client as GatewayClient); }; - const client = new GatewayClient({ + client = new GatewayClient({ url: params.url, token: params.token, clientName: GATEWAY_CLIENT_NAMES.TEST, @@ -138,25 +172,94 @@ async function connectClient(params: { url: string; token: string; timeoutMs?: n }); } -async function waitForAcpBackendHealthy(timeoutMs = 60_000): Promise { +function isRetryableGatewayConnectError(error: Error): boolean { + const message = error.message.toLowerCase(); + return ( + message.includes("gateway closed during connect (1000)") || + message.includes("gateway connect timeout") || + message.includes("gateway connect challenge timeout") + ); +} + +function isRetryableAcpBindWarmupText(texts: string[]): boolean { + const combined = texts.join("\n\n").toLowerCase(); + return ( + combined.includes("acp runtime backend is currently unavailable") || + combined.includes("try again in a moment") || + combined.includes("acp runtime backend is not configured") || + combined.includes("acp dispatch is disabled") + ); +} + +function formatAssistantTextPreview(texts: string[], maxChars = 600): string { + const combined = texts.join("\n\n").trim(); + if (!combined) { + return ""; + } + if (combined.length <= maxChars) { + return combined; + } + return combined.slice(-maxChars); +} + +async function bindConversationAndWait(params: { + client: GatewayClient; + sessionKey: string; + liveAgent: "claude" | "codex"; + originatingChannel: string; + originatingTo: string; + originatingAccountId: string; + timeoutMs?: number; +}): Promise<{ mainAssistantTexts: string[]; spawnedSessionKey: string }> { + const timeoutMs = params.timeoutMs ?? 90_000; const startedAt = Date.now(); + let attempt = 0; + while (Date.now() - startedAt < timeoutMs) { + attempt += 1; const backend = getAcpRuntimeBackend("acpx"); - if (backend?.healthy?.() ?? false) { - return; - } const runtime = backend?.runtime as { probeAvailability?: () => Promise } | undefined; if (runtime?.probeAvailability) { await runtime.probeAvailability().catch(() => {}); - if (backend?.healthy?.() ?? false) { - return; - } - } else if (backend && !backend.healthy) { - return; } - await sleep(250); + if (!(backend?.healthy?.() ?? false)) { + logLiveStep(`acpx backend still unhealthy before bind attempt ${attempt}`); + await sleep(5_000); + continue; + } + + await sendChatAndWait({ + client: params.client, + sessionKey: params.sessionKey, + idempotencyKey: `idem-bind-${randomUUID()}`, + message: `/acp spawn ${params.liveAgent} --bind here`, + originatingChannel: params.originatingChannel, + originatingTo: params.originatingTo, + originatingAccountId: params.originatingAccountId, + }); + + const mainHistory = await params.client.request<{ messages?: unknown[] }>("chat.history", { + sessionKey: params.sessionKey, + limit: 16, + }); + const mainAssistantTexts = extractAssistantTexts(mainHistory.messages ?? []); + const spawnedSessionKey = extractSpawnedAcpSessionKey(mainAssistantTexts); + if ( + mainAssistantTexts.join("\n\n").includes("Bound this conversation to") && + spawnedSessionKey + ) { + return { mainAssistantTexts, spawnedSessionKey }; + } + if (!isRetryableAcpBindWarmupText(mainAssistantTexts)) { + throw new Error( + `bind command did not produce an ACP session: ${formatAssistantTextPreview(mainAssistantTexts)}`, + ); + } + logLiveStep(`acpx backend still warming up; retrying bind (${attempt})`); + await sleep(5_000); } - throw new Error("timed out waiting for the acpx runtime backend to become healthy"); + + throw new Error("timed out waiting for the ACP bind command to succeed"); } async function waitForAgentRunOk( @@ -301,32 +404,17 @@ describeLive("gateway live (ACP bind)", () => { logLiveStep("gateway websocket connected"); try { - logLiveStep("waiting for acpx backend health"); - await waitForAcpBackendHealthy(); - logLiveStep("acpx backend healthy"); - - await sendChatAndWait({ + const { mainAssistantTexts, spawnedSessionKey } = await bindConversationAndWait({ client, sessionKey: originalSessionKey, - idempotencyKey: `idem-bind-${randomUUID()}`, - message: `/acp spawn ${liveAgent} --bind here`, + liveAgent, originatingChannel: "slack", originatingTo: conversationId, originatingAccountId: accountId, }); logLiveStep("bind command completed"); - - const mainHistory = await client.request<{ messages?: unknown[] }>("chat.history", { - sessionKey: originalSessionKey, - limit: 12, - }); - const mainAssistantTexts = extractAssistantTexts(mainHistory.messages ?? []); - const spawnedSessionKey = extractSpawnedAcpSessionKey(mainAssistantTexts); expect(mainAssistantTexts.join("\n\n")).toContain("Bound this conversation to"); expect(spawnedSessionKey).toMatch(new RegExp(`^agent:${liveAgent}:acp:`)); - if (!spawnedSessionKey) { - throw new Error("bind response did not expose the spawned ACP session key"); - } logLiveStep(`binding announced for session ${spawnedSessionKey ?? "missing"}`); await sendChatAndWait({