From 3f7f2c8dc96e353173eb5572b46775cff5b80147 Mon Sep 17 00:00:00 2001 From: Josh Avant <830519+joshavant@users.noreply.github.com> Date: Sat, 21 Mar 2026 04:15:16 -0500 Subject: [PATCH] Voice Call: enforce spoken-output contract and fix stream TTS silence regression (#51500) * voice-call: harden streaming startup and fallback behavior * voice-call: suppress barge-in during intro * voice-call: skip first-turn auto-response while greeting plays * Voice-call: improve telephony audio fidelity and pacing * voice-call: enforce spoken JSON and first-message barge skip * voice-call: fix silent stream TTS regression * voice-call: remove TTS timing diagnostics and document stream behavior * voice-call: fail stream playback when stream sends are dropped * voice-call: harden spoken contract and initial stream replay * voice-call: suppress barge transcripts during initial greeting * voice-call: harden stream fallback and media safety --- CHANGELOG.md | 1 + docs/plugins/voice-call.md | 30 +++ extensions/voice-call/README.md | 4 + .../voice-call/src/manager.notify.test.ts | 239 ++++++++++++++++- .../voice-call/src/manager.test-harness.ts | 5 + extensions/voice-call/src/manager.ts | 31 +++ extensions/voice-call/src/manager/context.ts | 1 + .../voice-call/src/manager/events.test.ts | 1 + extensions/voice-call/src/manager/outbound.ts | 61 +++-- .../voice-call/src/media-stream.test.ts | 101 +++++++- extensions/voice-call/src/media-stream.ts | 112 +++++++- .../voice-call/src/providers/tts-openai.ts | 46 +--- .../voice-call/src/providers/twilio.test.ts | 160 +++++++++++- extensions/voice-call/src/providers/twilio.ts | 147 +++++++++-- .../voice-call/src/response-generator.test.ts | 111 ++++++++ .../voice-call/src/response-generator.ts | 148 ++++++++++- .../voice-call/src/telephony-audio.test.ts | 61 +++++ extensions/voice-call/src/telephony-audio.ts | 63 ++++- extensions/voice-call/src/webhook.test.ts | 245 +++++++++++++++++- extensions/voice-call/src/webhook.ts | 137 +++++++--- 20 files changed, 1558 insertions(+), 146 deletions(-) create mode 100644 extensions/voice-call/src/response-generator.test.ts create mode 100644 extensions/voice-call/src/telephony-audio.test.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index c2e2f7521ac..e178df8b273 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -157,6 +157,7 @@ Docs: https://docs.openclaw.ai - Gateway: harden OpenResponses file-context escaping (#50782) Thanks @YLChen-007 and @joshavant. - LINE: harden Express webhook parsing to verified raw body (#51202) Thanks @gladiator9797 and @joshavant. - Exec: harden host env override handling across gateway and node (#51207) Thanks @gladiator9797 and @joshavant. +- Voice Call: enforce spoken-output contract and fix stream TTS silence regression (#51500) Thanks @joshavant. - xAI/models: rename the bundled Grok 4.20 catalog entries to the GA IDs and normalize saved deprecated beta IDs at runtime so existing configs and sessions keep resolving. (#50772) thanks @Jaaneek ### Fixes diff --git a/docs/plugins/voice-call.md b/docs/plugins/voice-call.md index 51c0f1efccd..1a9af8e3e41 100644 --- a/docs/plugins/voice-call.md +++ b/docs/plugins/voice-call.md @@ -224,6 +224,7 @@ Notes: - **Microsoft speech is ignored for voice calls** (telephony audio needs PCM; the current Microsoft transport does not expose telephony PCM output). - Core TTS is used when Twilio media streaming is enabled; otherwise calls fall back to provider native voices. +- If a Twilio media stream is already active, Voice Call does not fall back to TwiML ``. If telephony TTS is unavailable in that state, the playback request fails instead of mixing two playback paths. ### More examples @@ -308,6 +309,35 @@ Auto-responses use the agent system. Tune with: - `responseSystemPrompt` - `responseTimeoutMs` +### Spoken output contract + +For auto-responses, Voice Call appends a strict spoken-output contract to the system prompt: + +- `{"spoken":"..."}` + +Voice Call then extracts speech text defensively: + +- Ignores payloads marked as reasoning/error content. +- Parses direct JSON, fenced JSON, or inline `"spoken"` keys. +- Falls back to plain text and removes likely planning/meta lead-in paragraphs. + +This keeps spoken playback focused on caller-facing text and avoids leaking planning text into audio. + +### Conversation startup behavior + +For outbound `conversation` calls, first-message handling is tied to live playback state: + +- Barge-in queue clear and auto-response are suppressed only while the initial greeting is actively speaking. +- If initial playback fails, the call returns to `listening` and the initial message remains queued for retry. +- Initial playback for Twilio streaming starts on stream connect without extra delay. + +### Twilio stream disconnect grace + +When a Twilio media stream disconnects, Voice Call waits `2000ms` before auto-ending the call: + +- If the stream reconnects during that window, auto-end is canceled. +- If no stream is re-registered after the grace period, the call is ended to prevent stuck active calls. + ## CLI ```bash diff --git a/extensions/voice-call/README.md b/extensions/voice-call/README.md index 36ab127875e..d885daca5f3 100644 --- a/extensions/voice-call/README.md +++ b/extensions/voice-call/README.md @@ -140,4 +140,8 @@ Actions: - Adds replay protection for Twilio and Plivo webhooks (valid duplicate callbacks are ignored safely). - Twilio speech turns include a per-turn token so stale/replayed callbacks cannot complete a newer turn. - `responseModel` / `responseSystemPrompt` control AI auto-responses. +- Voice-call auto-responses enforce a spoken JSON contract (`{"spoken":"..."}`) and filter reasoning/meta output before playback. +- While a Twilio stream is active, playback does not fall back to TwiML ``; stream-TTS failures fail the playback request. +- Outbound conversation calls suppress barge-in only while the initial greeting is actively speaking, then re-enable normal interruption. +- Twilio stream disconnect auto-end uses a short grace window so quick reconnects do not end the call. - Media streaming requires `ws` and OpenAI Realtime API key. diff --git a/extensions/voice-call/src/manager.notify.test.ts b/extensions/voice-call/src/manager.notify.test.ts index 3252ae027b6..c49a1cafb76 100644 --- a/extensions/voice-call/src/manager.notify.test.ts +++ b/extensions/voice-call/src/manager.notify.test.ts @@ -1,6 +1,36 @@ -import { describe, expect, it } from "vitest"; +import { describe, expect, it, vi } from "vitest"; import { createManagerHarness, FakeProvider } from "./manager.test-harness.js"; +class FailFirstPlayTtsProvider extends FakeProvider { + private failed = false; + + override async playTts(input: Parameters[0]): Promise { + this.playTtsCalls.push(input); + if (!this.failed) { + this.failed = true; + throw new Error("synthetic tts failure"); + } + } +} + +class DelayedPlayTtsProvider extends FakeProvider { + private releasePlayTts: (() => void) | null = null; + readonly playTtsStarted = vi.fn(); + + override async playTts(input: Parameters[0]): Promise { + this.playTtsCalls.push(input); + this.playTtsStarted(); + await new Promise((resolve) => { + this.releasePlayTts = resolve; + }); + } + + releaseCurrentPlayback(): void { + this.releasePlayTts?.(); + this.releasePlayTts = null; + } +} + describe("CallManager notify and mapping", () => { it("upgrades providerCallId mapping when provider ID changes", async () => { const { manager } = await createManagerHarness(); @@ -50,4 +80,211 @@ describe("CallManager notify and mapping", () => { expect(provider.playTtsCalls[0]?.text).toBe("Hello there"); }, ); + + it("speaks initial message on answered for conversation mode with non-stream provider", async () => { + const { manager, provider } = await createManagerHarness({}, new FakeProvider("plivo")); + + const { callId, success } = await manager.initiateCall("+15550000003", undefined, { + message: "Hello from conversation", + mode: "conversation", + }); + expect(success).toBe(true); + + manager.processEvent({ + id: "evt-conversation-plivo", + type: "call.answered", + callId, + providerCallId: "call-uuid", + timestamp: Date.now(), + }); + + await new Promise((resolve) => setTimeout(resolve, 0)); + + expect(provider.playTtsCalls).toHaveLength(1); + expect(provider.playTtsCalls[0]?.text).toBe("Hello from conversation"); + }); + + it("speaks initial message on answered for conversation mode when Twilio streaming is disabled", async () => { + const { manager, provider } = await createManagerHarness( + { streaming: { enabled: false } }, + new FakeProvider("twilio"), + ); + + const { callId, success } = await manager.initiateCall("+15550000004", undefined, { + message: "Twilio non-stream", + mode: "conversation", + }); + expect(success).toBe(true); + + manager.processEvent({ + id: "evt-conversation-twilio-no-stream", + type: "call.answered", + callId, + providerCallId: "call-uuid", + timestamp: Date.now(), + }); + + await new Promise((resolve) => setTimeout(resolve, 0)); + + expect(provider.playTtsCalls).toHaveLength(1); + expect(provider.playTtsCalls[0]?.text).toBe("Twilio non-stream"); + }); + + it("waits for stream connect in conversation mode when Twilio streaming is enabled", async () => { + const { manager, provider } = await createManagerHarness( + { streaming: { enabled: true } }, + new FakeProvider("twilio"), + ); + + const { callId, success } = await manager.initiateCall("+15550000005", undefined, { + message: "Twilio stream", + mode: "conversation", + }); + expect(success).toBe(true); + + manager.processEvent({ + id: "evt-conversation-twilio-stream", + type: "call.answered", + callId, + providerCallId: "call-uuid", + timestamp: Date.now(), + }); + + await new Promise((resolve) => setTimeout(resolve, 0)); + + expect(provider.playTtsCalls).toHaveLength(0); + }); + + it("speaks on answered when Twilio streaming is enabled but stream-connect path is unavailable", async () => { + const twilioProvider = new FakeProvider("twilio"); + twilioProvider.twilioStreamConnectEnabled = false; + const { manager, provider } = await createManagerHarness( + { streaming: { enabled: true } }, + twilioProvider, + ); + + const { callId, success } = await manager.initiateCall("+15550000009", undefined, { + message: "Twilio stream unavailable", + mode: "conversation", + }); + expect(success).toBe(true); + + manager.processEvent({ + id: "evt-conversation-twilio-stream-unavailable", + type: "call.answered", + callId, + providerCallId: "call-uuid", + timestamp: Date.now(), + }); + + await new Promise((resolve) => setTimeout(resolve, 0)); + + expect(provider.playTtsCalls).toHaveLength(1); + expect(provider.playTtsCalls[0]?.text).toBe("Twilio stream unavailable"); + }); + + it("preserves initialMessage after a failed first playback and retries on next trigger", async () => { + const provider = new FailFirstPlayTtsProvider("plivo"); + const { manager } = await createManagerHarness({}, provider); + + const { callId, success } = await manager.initiateCall("+15550000006", undefined, { + message: "Retry me", + mode: "notify", + }); + expect(success).toBe(true); + + manager.processEvent({ + id: "evt-retry-1", + type: "call.answered", + callId, + providerCallId: "call-uuid", + timestamp: Date.now(), + }); + await new Promise((resolve) => setTimeout(resolve, 0)); + + const afterFailure = manager.getCall(callId); + expect(provider.playTtsCalls).toHaveLength(1); + expect(afterFailure?.metadata?.initialMessage).toBe("Retry me"); + expect(afterFailure?.state).toBe("listening"); + + manager.processEvent({ + id: "evt-retry-2", + type: "call.answered", + callId, + providerCallId: "call-uuid", + timestamp: Date.now(), + }); + await new Promise((resolve) => setTimeout(resolve, 0)); + + const afterSuccess = manager.getCall(callId); + expect(provider.playTtsCalls).toHaveLength(2); + expect(afterSuccess?.metadata?.initialMessage).toBeUndefined(); + }); + + it("speaks initial message only once on repeated stream-connect triggers", async () => { + const { manager, provider } = await createManagerHarness( + { streaming: { enabled: true } }, + new FakeProvider("twilio"), + ); + + const { callId, success } = await manager.initiateCall("+15550000007", undefined, { + message: "Stream hello", + mode: "conversation", + }); + expect(success).toBe(true); + + manager.processEvent({ + id: "evt-stream-answered", + type: "call.answered", + callId, + providerCallId: "call-uuid", + timestamp: Date.now(), + }); + await new Promise((resolve) => setTimeout(resolve, 0)); + expect(provider.playTtsCalls).toHaveLength(0); + + await manager.speakInitialMessage("call-uuid"); + await manager.speakInitialMessage("call-uuid"); + + expect(provider.playTtsCalls).toHaveLength(1); + expect(provider.playTtsCalls[0]?.text).toBe("Stream hello"); + }); + + it("prevents concurrent initial-message replays while first playback is in flight", async () => { + const provider = new DelayedPlayTtsProvider("twilio"); + const { manager } = await createManagerHarness({ streaming: { enabled: true } }, provider); + + const { callId, success } = await manager.initiateCall("+15550000008", undefined, { + message: "In-flight hello", + mode: "conversation", + }); + expect(success).toBe(true); + + manager.processEvent({ + id: "evt-stream-answered-concurrent", + type: "call.answered", + callId, + providerCallId: "call-uuid", + timestamp: Date.now(), + }); + await new Promise((resolve) => setTimeout(resolve, 0)); + expect(provider.playTtsCalls).toHaveLength(0); + + const first = manager.speakInitialMessage("call-uuid"); + await vi.waitFor(() => { + expect(provider.playTtsStarted).toHaveBeenCalledTimes(1); + }); + + const second = manager.speakInitialMessage("call-uuid"); + await new Promise((resolve) => setTimeout(resolve, 0)); + expect(provider.playTtsCalls).toHaveLength(1); + + provider.releaseCurrentPlayback(); + await Promise.all([first, second]); + + const call = manager.getCall(callId); + expect(call?.metadata?.initialMessage).toBeUndefined(); + expect(provider.playTtsCalls).toHaveLength(1); + expect(provider.playTtsCalls[0]?.text).toBe("In-flight hello"); + }); }); diff --git a/extensions/voice-call/src/manager.test-harness.ts b/extensions/voice-call/src/manager.test-harness.ts index 957007f3e0a..d10ad098720 100644 --- a/extensions/voice-call/src/manager.test-harness.ts +++ b/extensions/voice-call/src/manager.test-harness.ts @@ -20,6 +20,7 @@ import type { export class FakeProvider implements VoiceCallProvider { readonly name: "plivo" | "twilio"; + twilioStreamConnectEnabled = true; readonly playTtsCalls: PlayTtsInput[] = []; readonly hangupCalls: HangupCallInput[] = []; readonly startListeningCalls: StartListeningInput[] = []; @@ -61,6 +62,10 @@ export class FakeProvider implements VoiceCallProvider { async getCallStatus(_input: GetCallStatusInput): Promise { return this.getCallStatusResult; } + + isConversationStreamConnectEnabled(): boolean { + return this.name === "twilio" && this.twilioStreamConnectEnabled; + } } let storeSeq = 0; diff --git a/extensions/voice-call/src/manager.ts b/extensions/voice-call/src/manager.ts index bf4aad2df23..880d344bcd9 100644 --- a/extensions/voice-call/src/manager.ts +++ b/extensions/voice-call/src/manager.ts @@ -64,6 +64,7 @@ export class CallManager { } >(); private maxDurationTimers = new Map(); + private initialMessageInFlight = new Set(); constructor(config: VoiceCallConfig, storePath?: string) { this.config = config; @@ -256,6 +257,7 @@ export class CallManager { activeTurnCalls: this.activeTurnCalls, transcriptWaiters: this.transcriptWaiters, maxDurationTimers: this.maxDurationTimers, + initialMessageInFlight: this.initialMessageInFlight, onCallAnswered: (call) => { this.maybeSpeakInitialMessageOnAnswered(call); }, @@ -269,6 +271,21 @@ export class CallManager { processManagerEvent(this.getContext(), event); } + private shouldDeferConversationInitialMessageUntilStreamConnect(): boolean { + if (!this.provider || this.provider.name !== "twilio" || !this.config.streaming.enabled) { + return false; + } + + const streamAwareProvider = this.provider as VoiceCallProvider & { + isConversationStreamConnectEnabled?: () => boolean; + }; + if (typeof streamAwareProvider.isConversationStreamConnectEnabled !== "function") { + return false; + } + + return streamAwareProvider.isConversationStreamConnectEnabled(); + } + private maybeSpeakInitialMessageOnAnswered(call: CallRecord): void { const initialMessage = typeof call.metadata?.initialMessage === "string" ? call.metadata.initialMessage.trim() : ""; @@ -277,6 +294,20 @@ export class CallManager { return; } + // Notify mode should speak as soon as the provider reports "answered". + // Conversation mode should defer only when the Twilio stream-connect path + // is actually available; otherwise speak immediately on answered. + const mode = (call.metadata?.mode as string | undefined) ?? "conversation"; + if (mode === "conversation") { + const shouldWaitForStreamConnect = + this.shouldDeferConversationInitialMessageUntilStreamConnect(); + if (shouldWaitForStreamConnect) { + return; + } + } else if (mode !== "notify") { + return; + } + if (!this.provider || !call.providerCallId) { return; } diff --git a/extensions/voice-call/src/manager/context.ts b/extensions/voice-call/src/manager/context.ts index ed14a167e12..b271f1f132e 100644 --- a/extensions/voice-call/src/manager/context.ts +++ b/extensions/voice-call/src/manager/context.ts @@ -28,6 +28,7 @@ export type CallManagerTransientState = { activeTurnCalls: Set; transcriptWaiters: Map; maxDurationTimers: Map; + initialMessageInFlight: Set; }; export type CallManagerHooks = { diff --git a/extensions/voice-call/src/manager/events.test.ts b/extensions/voice-call/src/manager/events.test.ts index 4c91f9ddd26..63a811213e3 100644 --- a/extensions/voice-call/src/manager/events.test.ts +++ b/extensions/voice-call/src/manager/events.test.ts @@ -27,6 +27,7 @@ function createContext(overrides: Partial = {}): CallManager activeTurnCalls: new Set(), transcriptWaiters: new Map(), maxDurationTimers: new Map(), + initialMessageInFlight: new Set(), ...overrides, }; } diff --git a/extensions/voice-call/src/manager/outbound.ts b/extensions/voice-call/src/manager/outbound.ts index 494d7a10b5d..fccf46cf55e 100644 --- a/extensions/voice-call/src/manager/outbound.ts +++ b/extensions/voice-call/src/manager/outbound.ts @@ -39,6 +39,7 @@ type ConversationContext = Pick< | "activeTurnCalls" | "transcriptWaiters" | "maxDurationTimers" + | "initialMessageInFlight" >; type EndCallContext = Pick< @@ -210,8 +211,6 @@ export async function speak( transitionState(call, "speaking"); persistCallRecord(ctx.storePath, call); - addTranscriptEntry(call, "bot", text); - const voice = provider.name === "twilio" ? ctx.config.tts?.openai?.voice : undefined; await provider.playTts({ callId, @@ -220,8 +219,14 @@ export async function speak( voice, }); + addTranscriptEntry(call, "bot", text); + persistCallRecord(ctx.storePath, call); + return { success: true }; } catch (err) { + // A failed playback should not leave the call stuck in speaking state. + transitionState(call, "listening"); + persistCallRecord(ctx.storePath, call); return { success: false, error: err instanceof Error ? err.message : String(err) }; } } @@ -248,29 +253,41 @@ export async function speakInitialMessage( return; } - // Clear so we don't speak it again if the provider reconnects. - if (call.metadata) { - delete call.metadata.initialMessage; - persistCallRecord(ctx.storePath, call); - } - - console.log(`[voice-call] Speaking initial message for call ${call.callId} (mode: ${mode})`); - const result = await speak(ctx, call.callId, initialMessage); - if (!result.success) { - console.warn(`[voice-call] Failed to speak initial message: ${result.error}`); + if (ctx.initialMessageInFlight.has(call.callId)) { + console.log( + `[voice-call] speakInitialMessage: initial message already in flight for ${call.callId}`, + ); return; } + ctx.initialMessageInFlight.add(call.callId); - if (mode === "notify") { - const delaySec = ctx.config.outbound.notifyHangupDelaySec; - console.log(`[voice-call] Notify mode: auto-hangup in ${delaySec}s for call ${call.callId}`); - setTimeout(async () => { - const currentCall = ctx.activeCalls.get(call.callId); - if (currentCall && !TerminalStates.has(currentCall.state)) { - console.log(`[voice-call] Notify mode: hanging up call ${call.callId}`); - await endCall(ctx, call.callId); - } - }, delaySec * 1000); + try { + console.log(`[voice-call] Speaking initial message for call ${call.callId} (mode: ${mode})`); + const result = await speak(ctx, call.callId, initialMessage); + if (!result.success) { + console.warn(`[voice-call] Failed to speak initial message: ${result.error}`); + return; + } + + // Clear only after successful playback so transient provider failures can retry. + if (call.metadata) { + delete call.metadata.initialMessage; + persistCallRecord(ctx.storePath, call); + } + + if (mode === "notify") { + const delaySec = ctx.config.outbound.notifyHangupDelaySec; + console.log(`[voice-call] Notify mode: auto-hangup in ${delaySec}s for call ${call.callId}`); + setTimeout(async () => { + const currentCall = ctx.activeCalls.get(call.callId); + if (currentCall && !TerminalStates.has(currentCall.state)) { + console.log(`[voice-call] Notify mode: hanging up call ${call.callId}`); + await endCall(ctx, call.callId); + } + }, delaySec * 1000); + } + } finally { + ctx.initialMessageInFlight.delete(call.callId); } } diff --git a/extensions/voice-call/src/media-stream.test.ts b/extensions/voice-call/src/media-stream.test.ts index ecd4727318c..80c9f2d727f 100644 --- a/extensions/voice-call/src/media-stream.test.ts +++ b/extensions/voice-call/src/media-stream.test.ts @@ -1,6 +1,6 @@ import { once } from "node:events"; import http from "node:http"; -import { describe, expect, it } from "vitest"; +import { describe, expect, it, vi } from "vitest"; import { WebSocket } from "ws"; import { MediaStreamHandler } from "./media-stream.js"; import type { @@ -163,6 +163,105 @@ describe("MediaStreamHandler TTS queue", () => { }); describe("MediaStreamHandler security hardening", () => { + it("fails sends and closes stream when buffered bytes already exceed the cap", () => { + const handler = new MediaStreamHandler({ + sttProvider: createStubSttProvider(), + }); + const ws = { + readyState: WebSocket.OPEN, + bufferedAmount: 2 * 1024 * 1024, + send: vi.fn(), + close: vi.fn(), + } as unknown as WebSocket; + ( + handler as unknown as { + sessions: Map< + string, + { callId: string; streamSid: string; ws: WebSocket; sttSession: RealtimeSTTSession } + >; + } + ).sessions.set("MZ-backpressure", { + callId: "CA-backpressure", + streamSid: "MZ-backpressure", + ws, + sttSession: createStubSession(), + }); + + const result = handler.sendAudio("MZ-backpressure", Buffer.alloc(160, 0xff)); + + expect(result.sent).toBe(false); + expect(ws.send).not.toHaveBeenCalled(); + expect(ws.close).toHaveBeenCalledWith(1013, "Backpressure: send buffer exceeded"); + }); + + it("fails sends when buffered bytes exceed cap after enqueueing a frame", () => { + const handler = new MediaStreamHandler({ + sttProvider: createStubSttProvider(), + }); + const ws = { + readyState: WebSocket.OPEN, + bufferedAmount: 0, + send: vi.fn(() => { + ( + ws as unknown as { + bufferedAmount: number; + } + ).bufferedAmount = 2 * 1024 * 1024; + }), + close: vi.fn(), + } as unknown as WebSocket; + ( + handler as unknown as { + sessions: Map< + string, + { callId: string; streamSid: string; ws: WebSocket; sttSession: RealtimeSTTSession } + >; + } + ).sessions.set("MZ-overflow", { + callId: "CA-overflow", + streamSid: "MZ-overflow", + ws, + sttSession: createStubSession(), + }); + + const result = handler.sendMark("MZ-overflow", "mark-1"); + + expect(ws.send).toHaveBeenCalledTimes(1); + expect(result.sent).toBe(false); + expect(ws.close).toHaveBeenCalledWith(1013, "Backpressure: send buffer exceeded"); + }); + + it("sanitizes websocket close reason before logging", async () => { + const logSpy = vi.spyOn(console, "log").mockImplementation(() => {}); + const handler = new MediaStreamHandler({ + sttProvider: createStubSttProvider(), + preStartTimeoutMs: 5_000, + shouldAcceptStream: () => true, + }); + const server = await startWsServer(handler); + + try { + const ws = await connectWs(server.url); + ws.close(1000, "forged\nline\r\tentry"); + await waitForClose(ws); + + const closeLog = logSpy.mock.calls + .map((call) => call[0]) + .find( + (value): value is string => + typeof value === "string" && value.includes("[MediaStream] WebSocket closed"), + ); + expect(closeLog).toBeDefined(); + expect(closeLog).not.toContain("\n"); + expect(closeLog).not.toContain("\r"); + expect(closeLog).not.toContain("\t"); + expect(closeLog).toContain("forged line entry"); + } finally { + logSpy.mockRestore(); + await server.close(); + } + }); + it("closes idle pre-start connections after timeout", async () => { const shouldAcceptStreamCalls: Array<{ callId: string; streamSid: string; token?: string }> = []; diff --git a/extensions/voice-call/src/media-stream.ts b/extensions/voice-call/src/media-stream.ts index 11fa0109c12..c6cd5477e8e 100644 --- a/extensions/voice-call/src/media-stream.ts +++ b/extensions/voice-call/src/media-stream.ts @@ -40,7 +40,7 @@ export interface MediaStreamConfig { /** Callback when speech starts (barge-in) */ onSpeechStart?: (callId: string) => void; /** Callback when stream disconnects */ - onDisconnect?: (callId: string) => void; + onDisconnect?: (callId: string, streamSid: string) => void; } /** @@ -60,6 +60,13 @@ type TtsQueueEntry = { reject: (error: unknown) => void; }; +type StreamSendResult = { + sent: boolean; + readyState?: number; + bufferedBeforeBytes: number; + bufferedAfterBytes: number; +}; + type PendingConnection = { ip: string; timeout: ReturnType; @@ -69,6 +76,19 @@ const DEFAULT_PRE_START_TIMEOUT_MS = 5000; const DEFAULT_MAX_PENDING_CONNECTIONS = 32; const DEFAULT_MAX_PENDING_CONNECTIONS_PER_IP = 4; const DEFAULT_MAX_CONNECTIONS = 128; +const MAX_WS_BUFFERED_BYTES = 1024 * 1024; +const CLOSE_REASON_LOG_MAX_CHARS = 120; + +function sanitizeLogText(value: string, maxChars: number): string { + const sanitized = value + .replace(/[\u0000-\u001f\u007f]/g, " ") + .replace(/\s+/g, " ") + .trim(); + if (sanitized.length <= maxChars) { + return sanitized; + } + return `${sanitized.slice(0, maxChars)}...`; +} /** * Manages WebSocket connections for Twilio media streams. @@ -170,7 +190,12 @@ export class MediaStreamHandler { } }); - ws.on("close", () => { + ws.on("close", (code, reason) => { + const rawReason = Buffer.isBuffer(reason) ? reason.toString("utf8") : String(reason || ""); + const reasonText = sanitizeLogText(rawReason, CLOSE_REASON_LOG_MAX_CHARS); + console.log( + `[MediaStream] WebSocket closed (code: ${code}, reason: ${reasonText || "none"})`, + ); this.clearPendingConnection(ws); if (session) { this.handleStop(session); @@ -258,7 +283,7 @@ export class MediaStreamHandler { this.clearTtsState(session.streamSid); session.sttSession.close(); this.sessions.delete(session.streamSid); - this.config.onDisconnect?.(session.callId); + this.config.onDisconnect?.(session.callId, session.streamSid); } private getStreamToken(request: IncomingMessage): string | undefined { @@ -347,17 +372,78 @@ export class MediaStreamHandler { /** * Send a message to a stream's WebSocket if available. */ - private sendToStream(streamSid: string, message: unknown): void { - const session = this.getOpenSession(streamSid); - session?.ws.send(JSON.stringify(message)); + private sendToStream(streamSid: string, message: unknown): StreamSendResult { + const session = this.sessions.get(streamSid); + if (!session) { + return { + sent: false, + bufferedBeforeBytes: 0, + bufferedAfterBytes: 0, + }; + } + + const readyState = session.ws.readyState; + const bufferedBeforeBytes = session.ws.bufferedAmount; + if (readyState !== WebSocket.OPEN) { + return { + sent: false, + readyState, + bufferedBeforeBytes, + bufferedAfterBytes: session.ws.bufferedAmount, + }; + } + if (bufferedBeforeBytes > MAX_WS_BUFFERED_BYTES) { + try { + session.ws.close(1013, "Backpressure: send buffer exceeded"); + } catch { + // Best-effort close; caller still receives sent:false. + } + return { + sent: false, + readyState, + bufferedBeforeBytes, + bufferedAfterBytes: session.ws.bufferedAmount, + }; + } + + try { + session.ws.send(JSON.stringify(message)); + const bufferedAfterBytes = session.ws.bufferedAmount; + if (bufferedAfterBytes > MAX_WS_BUFFERED_BYTES) { + try { + session.ws.close(1013, "Backpressure: send buffer exceeded"); + } catch { + // Best-effort close; caller still receives sent:false. + } + return { + sent: false, + readyState, + bufferedBeforeBytes, + bufferedAfterBytes, + }; + } + return { + sent: true, + readyState, + bufferedBeforeBytes, + bufferedAfterBytes, + }; + } catch { + return { + sent: false, + readyState, + bufferedBeforeBytes, + bufferedAfterBytes: session.ws.bufferedAmount, + }; + } } /** * Send audio to a specific stream (for TTS playback). * Audio should be mu-law encoded at 8kHz mono. */ - sendAudio(streamSid: string, muLawAudio: Buffer): void { - this.sendToStream(streamSid, { + sendAudio(streamSid: string, muLawAudio: Buffer): StreamSendResult { + return this.sendToStream(streamSid, { event: "media", streamSid, media: { payload: muLawAudio.toString("base64") }, @@ -367,8 +453,8 @@ export class MediaStreamHandler { /** * Send a mark event to track audio playback position. */ - sendMark(streamSid: string, name: string): void { - this.sendToStream(streamSid, { + sendMark(streamSid: string, name: string): StreamSendResult { + return this.sendToStream(streamSid, { event: "mark", streamSid, mark: { name }, @@ -378,8 +464,8 @@ export class MediaStreamHandler { /** * Clear audio buffer (interrupt playback). */ - clearAudio(streamSid: string): void { - this.sendToStream(streamSid, { event: "clear", streamSid }); + clearAudio(streamSid: string): StreamSendResult { + return this.sendToStream(streamSid, { event: "clear", streamSid }); } /** @@ -412,7 +498,7 @@ export class MediaStreamHandler { /** * Clear TTS queue and interrupt current playback (barge-in). */ - clearTtsQueue(streamSid: string): void { + clearTtsQueue(streamSid: string, _reason = "unspecified"): void { const queue = this.getTtsQueue(streamSid); queue.length = 0; this.ttsActiveControllers.get(streamSid)?.abort(); diff --git a/extensions/voice-call/src/providers/tts-openai.ts b/extensions/voice-call/src/providers/tts-openai.ts index c16b20c0a66..9ed4e92e796 100644 --- a/extensions/voice-call/src/providers/tts-openai.ts +++ b/extensions/voice-call/src/providers/tts-openai.ts @@ -1,5 +1,5 @@ import { resolveOpenAITtsInstructions } from "../../api.js"; -import { pcmToMulaw } from "../telephony-audio.js"; +import { convertPcmToMulaw8k } from "../telephony-audio.js"; /** * OpenAI TTS Provider @@ -145,51 +145,11 @@ export class OpenAITTSProvider { // Get raw PCM from OpenAI (24kHz, 16-bit signed LE, mono) const pcm24k = await this.synthesize(text); - // Resample from 24kHz to 8kHz - const pcm8k = resample24kTo8k(pcm24k); - - // Encode to mu-law - return pcmToMulaw(pcm8k); + // Convert from 24kHz PCM to Twilio-compatible 8kHz mu-law + return convertPcmToMulaw8k(pcm24k, 24000); } } -/** - * Resample 24kHz PCM to 8kHz using linear interpolation. - * Input/output: 16-bit signed little-endian mono. - */ -function resample24kTo8k(input: Buffer): Buffer { - const inputSamples = input.length / 2; - const outputSamples = Math.floor(inputSamples / 3); - const output = Buffer.alloc(outputSamples * 2); - - for (let i = 0; i < outputSamples; i++) { - // Calculate position in input (3:1 ratio) - const srcPos = i * 3; - const srcIdx = srcPos * 2; - - if (srcIdx + 3 < input.length) { - // Linear interpolation between samples - const s0 = input.readInt16LE(srcIdx); - const s1 = input.readInt16LE(srcIdx + 2); - const frac = srcPos % 1 || 0; - const sample = Math.round(s0 + frac * (s1 - s0)); - output.writeInt16LE(clamp16(sample), i * 2); - } else { - // Last sample - output.writeInt16LE(input.readInt16LE(srcIdx), i * 2); - } - } - - return output; -} - -/** - * Clamp value to 16-bit signed integer range. - */ -function clamp16(value: number): number { - return Math.max(-32768, Math.min(32767, value)); -} - /** * Convert 8-bit mu-law to 16-bit linear PCM. * Useful for decoding incoming audio. diff --git a/extensions/voice-call/src/providers/twilio.test.ts b/extensions/voice-call/src/providers/twilio.test.ts index 4e23783b93a..076b5c20222 100644 --- a/extensions/voice-call/src/providers/twilio.test.ts +++ b/extensions/voice-call/src/providers/twilio.test.ts @@ -1,4 +1,4 @@ -import { describe, expect, it } from "vitest"; +import { describe, expect, it, vi } from "vitest"; import type { WebhookContext } from "../types.js"; import { TwilioProvider } from "./twilio.js"; @@ -188,4 +188,162 @@ describe("TwilioProvider", () => { expect(event?.type).toBe("call.speech"); expect(event?.turnToken).toBe("turn-xyz"); }); + + it("fails when an active stream exists but telephony TTS is unavailable", async () => { + const provider = createProvider(); + const apiRequest = vi.fn< + ( + endpoint: string, + params: Record, + options?: { allowNotFound?: boolean }, + ) => Promise + >(async () => ({})); + ( + provider as unknown as { + apiRequest: ( + endpoint: string, + params: Record, + options?: { allowNotFound?: boolean }, + ) => Promise; + } + ).apiRequest = apiRequest; + ( + provider as unknown as { + callWebhookUrls: Map; + } + ).callWebhookUrls.set("CA-stream", "https://example.ngrok.app/voice/twilio"); + provider.registerCallStream("CA-stream", "MZ-stream"); + + await expect( + provider.playTts({ + callId: "call-stream", + providerCallId: "CA-stream", + text: "Hello stream", + }), + ).rejects.toThrow("refusing TwiML fallback"); + expect(apiRequest).not.toHaveBeenCalled(); + }); + + it("falls back to TwiML when no active stream exists and telephony TTS is unavailable", async () => { + const provider = createProvider(); + const apiRequest = vi.fn< + ( + endpoint: string, + params: Record, + options?: { allowNotFound?: boolean }, + ) => Promise + >(async () => ({})); + ( + provider as unknown as { + apiRequest: ( + endpoint: string, + params: Record, + options?: { allowNotFound?: boolean }, + ) => Promise; + } + ).apiRequest = apiRequest; + ( + provider as unknown as { + callWebhookUrls: Map; + } + ).callWebhookUrls.set("CA-nostream", "https://example.ngrok.app/voice/twilio"); + + await expect( + provider.playTts({ + callId: "call-nostream", + providerCallId: "CA-nostream", + text: "Hello TwiML", + }), + ).resolves.toBeUndefined(); + expect(apiRequest).toHaveBeenCalledTimes(1); + const call = apiRequest.mock.calls[0]!; + const endpoint = call[0]; + const params = call[1] as { Twiml?: string }; + expect(endpoint).toBe("/Calls/CA-nostream.json"); + expect(params.Twiml).toContain(" { + const provider = createProvider(); + provider.registerCallStream("CA-reconnect", "MZ-new"); + + provider.unregisterCallStream("CA-reconnect", "MZ-old"); + expect(provider.hasRegisteredStream("CA-reconnect")).toBe(true); + + provider.unregisterCallStream("CA-reconnect", "MZ-new"); + expect(provider.hasRegisteredStream("CA-reconnect")).toBe(false); + }); + + it("times out telephony synthesis in stream mode and does not send completion mark", async () => { + vi.useFakeTimers(); + try { + const provider = createProvider(); + provider.registerCallStream("CA-timeout", "MZ-timeout"); + + const sendAudio = vi.fn(); + const sendMark = vi.fn(); + const mediaStreamHandler = { + queueTts: async ( + _streamSid: string, + playFn: (signal: AbortSignal) => Promise, + ): Promise => { + await playFn(new AbortController().signal); + }, + sendAudio, + sendMark, + }; + + provider.setMediaStreamHandler(mediaStreamHandler as never); + provider.setTTSProvider({ + synthesizeForTelephony: async () => await new Promise(() => {}), + }); + + const playExpectation = expect( + provider.playTts({ + callId: "call-timeout", + providerCallId: "CA-timeout", + text: "Timeout me", + }), + ).rejects.toThrow("Telephony TTS synthesis timed out"); + await vi.advanceTimersByTimeAsync(8_100); + await playExpectation; + expect(sendAudio).toHaveBeenCalled(); + expect(sendMark).not.toHaveBeenCalled(); + } finally { + vi.useRealTimers(); + } + }); + + it("fails stream playback when all audio sends and completion mark are dropped", async () => { + const provider = createProvider(); + provider.registerCallStream("CA-dropped", "MZ-dropped"); + + const sendAudio = vi.fn(() => ({ sent: false })); + const sendMark = vi.fn(() => ({ sent: false })); + const mediaStreamHandler = { + queueTts: async ( + _streamSid: string, + playFn: (signal: AbortSignal) => Promise, + ): Promise => { + await playFn(new AbortController().signal); + }, + sendAudio, + sendMark, + }; + + provider.setMediaStreamHandler(mediaStreamHandler as never); + provider.setTTSProvider({ + synthesizeForTelephony: async () => Buffer.alloc(320), + }); + + await expect( + provider.playTts({ + callId: "call-dropped", + providerCallId: "CA-dropped", + text: "Dropped audio", + }), + ).rejects.toThrow("Telephony stream playback failed"); + expect(sendAudio).toHaveBeenCalled(); + expect(sendMark).toHaveBeenCalledTimes(1); + }); }); diff --git a/extensions/voice-call/src/providers/twilio.ts b/extensions/voice-call/src/providers/twilio.ts index e09367eb3fa..231ce3150d0 100644 --- a/extensions/voice-call/src/providers/twilio.ts +++ b/extensions/voice-call/src/providers/twilio.ts @@ -31,6 +31,10 @@ import { twilioApiRequest } from "./twilio/api.js"; import { decideTwimlResponse, readTwimlRequestView } from "./twilio/twiml-policy.js"; import { verifyTwilioProviderWebhook } from "./twilio/webhook.js"; +type StreamSendResult = { + sent: boolean; +}; + function createTwilioRequestDedupeKey(ctx: WebhookContext, verifiedRequestKey?: string): string { if (verifiedRequestKey) { return verifiedRequestKey; @@ -76,6 +80,7 @@ export interface TwilioProviderOptions { export class TwilioProvider implements VoiceCallProvider { readonly name = "twilio" as const; + private static readonly TTS_SYNTH_TIMEOUT_MS = 8000; private readonly accountSid: string; private readonly authToken: string; @@ -172,11 +177,29 @@ export class TwilioProvider implements VoiceCallProvider { this.callStreamMap.set(callSid, streamSid); } - unregisterCallStream(callSid: string): void { + hasRegisteredStream(callSid: string): boolean { + return this.callStreamMap.has(callSid); + } + + unregisterCallStream(callSid: string, streamSid?: string): void { + const currentStreamSid = this.callStreamMap.get(callSid); + if (!currentStreamSid) { + if (!streamSid) { + this.activeStreamCalls.delete(callSid); + } + return; + } + if (streamSid && currentStreamSid !== streamSid) { + return; + } this.callStreamMap.delete(callSid); this.activeStreamCalls.delete(callSid); } + isConversationStreamConnectEnabled(): boolean { + return Boolean(this.mediaStreamHandler && this.getStreamUrl()); + } + isValidStreamToken(callSid: string, token?: string): boolean { const expected = this.streamAuthTokens.get(callSid); if (!expected || !token) { @@ -194,11 +217,12 @@ export class TwilioProvider implements VoiceCallProvider { * Clear TTS queue for a call (barge-in). * Used when user starts speaking to interrupt current TTS playback. */ - clearTtsQueue(callSid: string): void { + clearTtsQueue(callSid: string, reason = "unspecified"): void { const streamSid = this.callStreamMap.get(callSid); - if (streamSid && this.mediaStreamHandler) { - this.mediaStreamHandler.clearTtsQueue(streamSid); + if (!streamSid || !this.mediaStreamHandler) { + return; } + this.mediaStreamHandler.clearTtsQueue(streamSid, reason); } /** @@ -550,28 +574,32 @@ export class TwilioProvider implements VoiceCallProvider { * Play TTS audio via Twilio. * * Two modes: - * 1. Core TTS + Media Streams: If TTS provider and media stream are available, - * generates audio via core TTS and streams it through WebSocket (preferred). - * 2. TwiML : Falls back to Twilio's native TTS with Polly voices. - * Note: This may not work on all Twilio accounts. + * 1. Core TTS + Media Streams: when an active stream exists, stream playback is required. + * If telephony TTS is unavailable in that state, playback fails rather than mixing paths. + * 2. TwiML : fallback only when there is no active stream for the call. */ async playTts(input: PlayTtsInput): Promise { - // Try telephony TTS via media stream first (if configured) const streamSid = this.callStreamMap.get(input.providerCallId); - if (this.ttsProvider && this.mediaStreamHandler && streamSid) { + if (streamSid) { + if (!this.ttsProvider || !this.mediaStreamHandler) { + throw new Error( + "Telephony TTS unavailable while media stream is active; refusing TwiML fallback", + ); + } + try { await this.playTtsViaStream(input.text, streamSid); return; } catch (err) { console.warn( - `[voice-call] Telephony TTS failed, falling back to Twilio :`, + `[voice-call] Telephony TTS failed:`, err instanceof Error ? err.message : err, ); - // Fall through to TwiML fallback + throw err instanceof Error ? err : new Error(String(err)); } } - // Fall back to TwiML (may not work on all accounts) + // Fall back to TwiML only when no active stream exists. const webhookUrl = this.callWebhookUrls.get(input.providerCallId); if (!webhookUrl) { throw new Error("Missing webhook URL for this call (provider state not initialized)"); @@ -608,28 +636,111 @@ export class TwilioProvider implements VoiceCallProvider { // Stream audio in 20ms chunks (160 bytes at 8kHz mu-law) const CHUNK_SIZE = 160; const CHUNK_DELAY_MS = 20; + const SILENCE_CHUNK = Buffer.alloc(CHUNK_SIZE, 0xff); const handler = this.mediaStreamHandler; const ttsProvider = this.ttsProvider; + + const normalizeSendResult = (raw: unknown): StreamSendResult => { + if (!raw || typeof raw !== "object") { + return { sent: true }; + } + const typed = raw as { + sent?: unknown; + }; + return { + sent: typed.sent === undefined ? true : Boolean(typed.sent), + }; + }; + + const sendAudioChunk = (audio: Buffer): StreamSendResult => { + const raw = (handler as { sendAudio: (sid: string, chunk: Buffer) => unknown }).sendAudio( + streamSid, + audio, + ); + return normalizeSendResult(raw); + }; + + const sendPlaybackMark = (name: string): StreamSendResult => { + const raw = (handler as { sendMark: (sid: string, markName: string) => unknown }).sendMark( + streamSid, + name, + ); + return normalizeSendResult(raw); + }; + await handler.queueTts(streamSid, async (signal) => { + const sendKeepAlive = () => { + sendAudioChunk(SILENCE_CHUNK); + }; + sendKeepAlive(); + const keepAlive = setInterval(() => { + if (!signal.aborted) { + sendKeepAlive(); + } + }, CHUNK_DELAY_MS); + // Generate audio with core TTS (returns mu-law at 8kHz) - const muLawAudio = await ttsProvider.synthesizeForTelephony(text); + let muLawAudio: Buffer; + let synthTimeout: ReturnType | null = null; + try { + const synthPromise = ttsProvider.synthesizeForTelephony(text); + const timeoutPromise = new Promise((_, reject) => { + synthTimeout = setTimeout(() => { + reject( + new Error( + `Telephony TTS synthesis timed out after ${TwilioProvider.TTS_SYNTH_TIMEOUT_MS}ms`, + ), + ); + }, TwilioProvider.TTS_SYNTH_TIMEOUT_MS); + }); + muLawAudio = await Promise.race([synthPromise, timeoutPromise]); + } finally { + if (synthTimeout) { + clearTimeout(synthTimeout); + } + clearInterval(keepAlive); + } + + let chunkAttempts = 0; + let chunkDelivered = 0; + let nextChunkDueAt = Date.now() + CHUNK_DELAY_MS; for (const chunk of chunkAudio(muLawAudio, CHUNK_SIZE)) { if (signal.aborted) { break; } - handler.sendAudio(streamSid, chunk); + chunkAttempts += 1; + const chunkResult = sendAudioChunk(chunk); + if (chunkResult.sent) { + chunkDelivered += 1; + } - // Pace the audio to match real-time playback - await new Promise((resolve) => setTimeout(resolve, CHUNK_DELAY_MS)); + // Drift-corrected pacing: schedule against an absolute clock to avoid cumulative delay. + const waitMs = nextChunkDueAt - Date.now(); + if (waitMs > 0) { + await new Promise((resolve) => setTimeout(resolve, Math.ceil(waitMs))); + } + nextChunkDueAt += CHUNK_DELAY_MS; if (signal.aborted) { break; } } + let markSent = true; if (!signal.aborted) { // Send a mark to track when audio finishes - handler.sendMark(streamSid, `tts-${Date.now()}`); + markSent = sendPlaybackMark(`tts-${Date.now()}`).sent; + } + + if (!signal.aborted && chunkAttempts > 0 && (chunkDelivered === 0 || !markSent)) { + const failures: string[] = []; + if (chunkDelivered === 0) { + failures.push("no audio chunks delivered"); + } + if (!markSent) { + failures.push("completion mark not delivered"); + } + throw new Error(`Telephony stream playback failed: ${failures.join("; ")}`); } }); } diff --git a/extensions/voice-call/src/response-generator.test.ts b/extensions/voice-call/src/response-generator.test.ts new file mode 100644 index 00000000000..f43814a9b40 --- /dev/null +++ b/extensions/voice-call/src/response-generator.test.ts @@ -0,0 +1,111 @@ +import { describe, expect, it, vi } from "vitest"; +import { VoiceCallConfigSchema } from "./config.js"; +import type { CoreAgentDeps, CoreConfig } from "./core-bridge.js"; +import { generateVoiceResponse } from "./response-generator.js"; + +function createAgentRuntime(payloads: Array>) { + const runEmbeddedPiAgent = vi.fn(async () => ({ + payloads, + meta: { durationMs: 12, aborted: false }, + })); + + const runtime = { + defaults: { + provider: "together", + model: "Qwen/Qwen2.5-7B-Instruct-Turbo", + }, + resolveAgentDir: () => "/tmp/openclaw/agents/main", + resolveAgentWorkspaceDir: () => "/tmp/openclaw/workspace/main", + resolveAgentIdentity: () => ({ name: "tester" }), + resolveThinkingDefault: () => "off", + resolveAgentTimeoutMs: () => 30_000, + ensureAgentWorkspace: async () => {}, + runEmbeddedPiAgent, + session: { + resolveStorePath: () => "/tmp/openclaw/sessions.json", + loadSessionStore: () => ({}), + saveSessionStore: async () => {}, + resolveSessionFilePath: () => "/tmp/openclaw/sessions/session.jsonl", + }, + } as unknown as CoreAgentDeps; + + return { runtime, runEmbeddedPiAgent }; +} + +async function runGenerateVoiceResponse( + payloads: Array>, + overrides?: { + runtime?: CoreAgentDeps; + transcript?: Array<{ speaker: "user" | "bot"; text: string }>; + }, +) { + const voiceConfig = VoiceCallConfigSchema.parse({ + responseTimeoutMs: 5000, + }); + const coreConfig = {} as CoreConfig; + const runtime = overrides?.runtime ?? createAgentRuntime(payloads).runtime; + + const result = await generateVoiceResponse({ + voiceConfig, + coreConfig, + agentRuntime: runtime, + callId: "call-123", + from: "+15550001111", + transcript: overrides?.transcript ?? [{ speaker: "user", text: "hello there" }], + userMessage: "hello there", + }); + + return { result }; +} + +describe("generateVoiceResponse", () => { + it("suppresses reasoning payloads and reads structured spoken output", async () => { + const { runtime, runEmbeddedPiAgent } = createAgentRuntime([ + { text: "Reasoning: hidden", isReasoning: true }, + { text: '{"spoken":"Hello from JSON."}' }, + ]); + const { result } = await runGenerateVoiceResponse([], { runtime }); + + expect(result.text).toBe("Hello from JSON."); + expect(runEmbeddedPiAgent).toHaveBeenCalledTimes(1); + const calls = runEmbeddedPiAgent.mock.calls as unknown[][]; + const firstCall = calls[0]; + expect(firstCall).toBeDefined(); + const args = firstCall?.[0] as { extraSystemPrompt?: string } | undefined; + expect(args?.extraSystemPrompt).toContain('{"spoken":"..."}'); + }); + + it("extracts spoken text from fenced JSON", async () => { + const { result } = await runGenerateVoiceResponse([ + { text: '```json\n{"spoken":"Fenced JSON works."}\n```' }, + ]); + + expect(result.text).toBe("Fenced JSON works."); + }); + + it("returns silence for an explicit empty spoken contract response", async () => { + const { result } = await runGenerateVoiceResponse([{ text: '{"spoken":""}' }]); + + expect(result.text).toBeNull(); + }); + + it("strips leading planning text when model returns plain text", async () => { + const { result } = await runGenerateVoiceResponse([ + { + text: + "The user responded with short text. I should keep the response concise.\n\n" + + "Sounds good. I can help with the next step whenever you are ready.", + }, + ]); + + expect(result.text).toBe("Sounds good. I can help with the next step whenever you are ready."); + }); + + it("keeps plain conversational output when no JSON contract is followed", async () => { + const { result } = await runGenerateVoiceResponse([ + { text: "Absolutely. Tell me what you want to do next." }, + ]); + + expect(result.text).toBe("Absolutely. Tell me what you want to do next."); + }); +}); diff --git a/extensions/voice-call/src/response-generator.ts b/extensions/voice-call/src/response-generator.ts index d1903410f86..9fa62c0ee98 100644 --- a/extensions/voice-call/src/response-generator.ts +++ b/extensions/voice-call/src/response-generator.ts @@ -30,6 +30,145 @@ export type VoiceResponseResult = { error?: string; }; +type VoiceResponsePayload = { + text?: string; + isError?: boolean; + isReasoning?: boolean; +}; + +const VOICE_SPOKEN_OUTPUT_CONTRACT = [ + "Output format requirements:", + '- Return only valid JSON in this exact shape: {"spoken":"..."}', + "- Do not include markdown, code fences, planning text, or extra keys.", + '- Put exactly what should be spoken to the caller into "spoken".', + '- If there is nothing to say, return {"spoken":""}.', +].join("\n"); + +function normalizeSpokenText(value: string): string | null { + const normalized = value.replace(/\s+/g, " ").trim(); + return normalized.length > 0 ? normalized : null; +} + +function tryParseSpokenJson(text: string): string | null { + const candidates: string[] = []; + const trimmed = text.trim(); + if (!trimmed) { + return null; + } + candidates.push(trimmed); + + const fenced = trimmed.match(/^```(?:json)?\s*([\s\S]*?)\s*```$/i); + if (fenced?.[1]) { + candidates.push(fenced[1]); + } + + const firstBrace = trimmed.indexOf("{"); + const lastBrace = trimmed.lastIndexOf("}"); + if (firstBrace >= 0 && lastBrace > firstBrace) { + candidates.push(trimmed.slice(firstBrace, lastBrace + 1)); + } + + for (const candidate of candidates) { + try { + const parsed = JSON.parse(candidate) as { spoken?: unknown }; + if (typeof parsed?.spoken !== "string") { + continue; + } + return normalizeSpokenText(parsed.spoken) ?? ""; + } catch { + // Continue trying other candidates. + } + } + + const inlineSpokenMatch = trimmed.match(/"spoken"\s*:\s*"((?:[^"\\]|\\.)*)"/i); + if (!inlineSpokenMatch) { + return null; + } + + try { + const decoded = JSON.parse(`"${inlineSpokenMatch[1] ?? ""}"`) as string; + return normalizeSpokenText(decoded) ?? ""; + } catch { + return null; + } +} + +function isLikelyMetaReasoningParagraph(paragraph: string): boolean { + const lower = paragraph.toLowerCase(); + if (!lower) { + return false; + } + + if (lower.startsWith("thinking process")) { + return true; + } + if (lower.startsWith("reasoning:") || lower.startsWith("analysis:")) { + return true; + } + if ( + lower.startsWith("the user ") && + (lower.includes("i should") || lower.includes("i need to") || lower.includes("i will")) + ) { + return true; + } + if ( + lower.includes("this is a natural continuation of the conversation") || + lower.includes("keep the conversation flowing") + ) { + return true; + } + + return false; +} + +function sanitizePlainSpokenText(text: string): string | null { + const withoutCodeFences = text.replace(/```[\s\S]*?```/g, " ").trim(); + if (!withoutCodeFences) { + return null; + } + + const paragraphs = withoutCodeFences + .split(/\n\s*\n+/) + .map((paragraph) => paragraph.trim()) + .filter(Boolean); + + while (paragraphs.length > 1 && isLikelyMetaReasoningParagraph(paragraphs[0])) { + paragraphs.shift(); + } + + return normalizeSpokenText(paragraphs.join(" ")); +} + +function extractSpokenTextFromPayloads(payloads: VoiceResponsePayload[]): string | null { + const spokenSegments: string[] = []; + + for (const payload of payloads) { + if (payload.isError || payload.isReasoning) { + continue; + } + + const rawText = payload.text?.trim() ?? ""; + if (!rawText) { + continue; + } + + const structured = tryParseSpokenJson(rawText); + if (structured !== null) { + if (structured.length > 0) { + spokenSegments.push(structured); + } + continue; + } + + const plain = sanitizePlainSpokenText(rawText); + if (plain) { + spokenSegments.push(plain); + } + } + + return spokenSegments.length > 0 ? spokenSegments.join(" ").trim() : null; +} + /** * Generate a voice response using the embedded Pi agent with full tool support. * Uses the same agent infrastructure as messaging for consistent behavior. @@ -103,6 +242,7 @@ export async function generateVoiceResponse( .join("\n"); extraSystemPrompt = `${basePrompt}\n\nConversation so far:\n${history}`; } + extraSystemPrompt = `${extraSystemPrompt}\n\n${VOICE_SPOKEN_OUTPUT_CONTRACT}`; // Resolve timeout const timeoutMs = voiceConfig.responseTimeoutMs ?? agentRuntime.resolveAgentTimeoutMs({ cfg }); @@ -128,13 +268,7 @@ export async function generateVoiceResponse( agentDir, }); - // Extract text from payloads - const texts = (result.payloads ?? []) - .filter((p) => p.text && !p.isError) - .map((p) => p.text?.trim()) - .filter(Boolean); - - const text = texts.join(" ") || null; + const text = extractSpokenTextFromPayloads((result.payloads ?? []) as VoiceResponsePayload[]); if (!text && result.meta?.aborted) { return { text: null, error: "Response generation was aborted" }; diff --git a/extensions/voice-call/src/telephony-audio.test.ts b/extensions/voice-call/src/telephony-audio.test.ts new file mode 100644 index 00000000000..ec86deb2551 --- /dev/null +++ b/extensions/voice-call/src/telephony-audio.test.ts @@ -0,0 +1,61 @@ +import { describe, expect, it } from "vitest"; +import { convertPcmToMulaw8k, resamplePcmTo8k } from "./telephony-audio.js"; + +function makeSinePcm( + sampleRate: number, + frequencyHz: number, + durationSeconds: number, + amplitude = 12_000, +): Buffer { + const samples = Math.floor(sampleRate * durationSeconds); + const output = Buffer.alloc(samples * 2); + for (let i = 0; i < samples; i++) { + const value = Math.round(Math.sin((2 * Math.PI * frequencyHz * i) / sampleRate) * amplitude); + output.writeInt16LE(value, i * 2); + } + return output; +} + +function rmsPcm(buffer: Buffer): number { + const samples = Math.floor(buffer.length / 2); + if (samples === 0) { + return 0; + } + let sum = 0; + for (let i = 0; i < samples; i++) { + const sample = buffer.readInt16LE(i * 2); + sum += sample * sample; + } + return Math.sqrt(sum / samples); +} + +describe("telephony-audio resamplePcmTo8k", () => { + it("returns identical buffer for 8k input", () => { + const pcm8k = makeSinePcm(8_000, 1_000, 0.2); + const resampled = resamplePcmTo8k(pcm8k, 8_000); + expect(resampled).toBe(pcm8k); + }); + + it("preserves low-frequency speech-band energy when downsampling", () => { + const input = makeSinePcm(48_000, 1_000, 0.6); + const output = resamplePcmTo8k(input, 48_000); + expect(output.length).toBe(9_600); + expect(rmsPcm(output)).toBeGreaterThan(7_500); + }); + + it("attenuates out-of-band high frequencies before 8k telephony conversion", () => { + const lowTone = resamplePcmTo8k(makeSinePcm(48_000, 1_000, 0.6), 48_000); + const highTone = resamplePcmTo8k(makeSinePcm(48_000, 6_000, 0.6), 48_000); + const ratio = rmsPcm(highTone) / rmsPcm(lowTone); + expect(ratio).toBeLessThan(0.1); + }); +}); + +describe("telephony-audio convertPcmToMulaw8k", () => { + it("converts to 8k mu-law frame length", () => { + const input = makeSinePcm(24_000, 1_000, 0.5); + const mulaw = convertPcmToMulaw8k(input, 24_000); + // 0.5s @ 8kHz => 4000 8-bit samples + expect(mulaw.length).toBe(4_000); + }); +}); diff --git a/extensions/voice-call/src/telephony-audio.ts b/extensions/voice-call/src/telephony-audio.ts index 6b1ef1ec3e6..e87111c3880 100644 --- a/extensions/voice-call/src/telephony-audio.ts +++ b/extensions/voice-call/src/telephony-audio.ts @@ -1,11 +1,58 @@ const TELEPHONY_SAMPLE_RATE = 8000; +const RESAMPLE_FILTER_TAPS = 31; +const RESAMPLE_CUTOFF_GUARD = 0.94; function clamp16(value: number): number { return Math.max(-32768, Math.min(32767, value)); } +function sinc(x: number): number { + if (x === 0) { + return 1; + } + return Math.sin(Math.PI * x) / (Math.PI * x); +} + /** - * Resample 16-bit PCM (little-endian mono) to 8kHz using linear interpolation. + * Build a finite low-pass kernel centered on `srcPos`. + * The kernel is windowed (Hann) to reduce ringing artifacts. + */ +function sampleBandlimited( + input: Buffer, + inputSamples: number, + srcPos: number, + cutoffCyclesPerSample: number, +): number { + const half = Math.floor(RESAMPLE_FILTER_TAPS / 2); + const center = Math.floor(srcPos); + let weighted = 0; + let weightSum = 0; + + for (let tap = -half; tap <= half; tap++) { + const sampleIndex = center + tap; + if (sampleIndex < 0 || sampleIndex >= inputSamples) { + continue; + } + + const distance = sampleIndex - srcPos; + const lowPass = 2 * cutoffCyclesPerSample * sinc(2 * cutoffCyclesPerSample * distance); + const tapIndex = tap + half; + const window = 0.5 - 0.5 * Math.cos((2 * Math.PI * tapIndex) / (RESAMPLE_FILTER_TAPS - 1)); + const coeff = lowPass * window; + weighted += input.readInt16LE(sampleIndex * 2) * coeff; + weightSum += coeff; + } + + if (weightSum === 0) { + const nearest = Math.max(0, Math.min(inputSamples - 1, Math.round(srcPos))); + return input.readInt16LE(nearest * 2); + } + + return weighted / weightSum; +} + +/** + * Resample 16-bit PCM (little-endian mono) to 8kHz using a windowed low-pass kernel. */ export function resamplePcmTo8k(input: Buffer, inputSampleRate: number): Buffer { if (inputSampleRate === TELEPHONY_SAMPLE_RATE) { @@ -19,17 +66,15 @@ export function resamplePcmTo8k(input: Buffer, inputSampleRate: number): Buffer const ratio = inputSampleRate / TELEPHONY_SAMPLE_RATE; const outputSamples = Math.floor(inputSamples / ratio); const output = Buffer.alloc(outputSamples * 2); + const maxCutoff = 0.5; + const downsampleCutoff = ratio > 1 ? maxCutoff / ratio : maxCutoff; + const cutoffCyclesPerSample = Math.max(0.01, downsampleCutoff * RESAMPLE_CUTOFF_GUARD); for (let i = 0; i < outputSamples; i++) { const srcPos = i * ratio; - const srcIndex = Math.floor(srcPos); - const frac = srcPos - srcIndex; - - const s0 = input.readInt16LE(srcIndex * 2); - const s1Index = Math.min(srcIndex + 1, inputSamples - 1); - const s1 = input.readInt16LE(s1Index * 2); - - const sample = Math.round(s0 + frac * (s1 - s0)); + const sample = Math.round( + sampleBandlimited(input, inputSamples, srcPos, cutoffCyclesPerSample), + ); output.writeInt16LE(clamp16(sample), i * 2); } diff --git a/extensions/voice-call/src/webhook.test.ts b/extensions/voice-call/src/webhook.test.ts index 6297a69f14a..4ea4f1a65f5 100644 --- a/extensions/voice-call/src/webhook.test.ts +++ b/extensions/voice-call/src/webhook.test.ts @@ -2,7 +2,7 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { VoiceCallConfigSchema, type VoiceCallConfig } from "./config.js"; import type { CallManager } from "./manager.js"; import type { VoiceCallProvider } from "./providers/base.js"; -import type { CallRecord } from "./types.js"; +import type { CallRecord, NormalizedEvent } from "./types.js"; import { VoiceCallWebhookServer } from "./webhook.js"; const provider: VoiceCallProvider = { @@ -350,3 +350,246 @@ describe("VoiceCallWebhookServer start idempotency", () => { await server.stop(); }); }); + +describe("VoiceCallWebhookServer stream disconnect grace", () => { + beforeEach(() => { + vi.useFakeTimers(); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + it("ignores stale stream disconnects after reconnect and only hangs up on current stream disconnect", async () => { + const call = createCall(Date.now() - 1_000); + call.providerCallId = "CA-stream-1"; + + const endCall = vi.fn(async () => ({ success: true })); + const speakInitialMessage = vi.fn(async () => {}); + const getCallByProviderCallId = vi.fn((providerCallId: string) => + providerCallId === "CA-stream-1" ? call : undefined, + ); + + const manager = { + getActiveCalls: () => [call], + getCallByProviderCallId, + endCall, + speakInitialMessage, + processEvent: vi.fn(), + } as unknown as CallManager; + + let currentStreamSid: string | null = "MZ-new"; + const twilioProvider = { + name: "twilio" as const, + verifyWebhook: () => ({ ok: true, verifiedRequestKey: "twilio:req:test" }), + parseWebhookEvent: () => ({ events: [] }), + initiateCall: async () => ({ providerCallId: "provider-call", status: "initiated" as const }), + hangupCall: async () => {}, + playTts: async () => {}, + startListening: async () => {}, + stopListening: async () => {}, + getCallStatus: async () => ({ status: "in-progress", isTerminal: false }), + isValidStreamToken: () => true, + registerCallStream: (_callSid: string, streamSid: string) => { + currentStreamSid = streamSid; + }, + unregisterCallStream: (_callSid: string, streamSid?: string) => { + if (!currentStreamSid) { + return; + } + if (streamSid && currentStreamSid !== streamSid) { + return; + } + currentStreamSid = null; + }, + hasRegisteredStream: () => currentStreamSid !== null, + clearTtsQueue: () => {}, + }; + + const config = createConfig({ + provider: "twilio", + streaming: { + ...createConfig().streaming, + enabled: true, + openaiApiKey: "test-key", + }, + }); + const server = new VoiceCallWebhookServer( + config, + manager, + twilioProvider as unknown as VoiceCallProvider, + ); + + const mediaHandler = server.getMediaStreamHandler() as unknown as { + config: { + onDisconnect?: (providerCallId: string, streamSid: string) => void; + onConnect?: (providerCallId: string, streamSid: string) => void; + }; + }; + expect(mediaHandler).toBeTruthy(); + + mediaHandler.config.onConnect?.("CA-stream-1", "MZ-new"); + mediaHandler.config.onDisconnect?.("CA-stream-1", "MZ-old"); + await vi.advanceTimersByTimeAsync(2_100); + expect(endCall).not.toHaveBeenCalled(); + + mediaHandler.config.onDisconnect?.("CA-stream-1", "MZ-new"); + await vi.advanceTimersByTimeAsync(2_100); + expect(endCall).toHaveBeenCalledTimes(1); + expect(endCall).toHaveBeenCalledWith(call.callId); + + await server.stop(); + }); +}); + +describe("VoiceCallWebhookServer barge-in suppression during initial message", () => { + const createTwilioProvider = (clearTtsQueue: ReturnType) => ({ + name: "twilio" as const, + verifyWebhook: () => ({ ok: true, verifiedRequestKey: "twilio:req:test" }), + parseWebhookEvent: () => ({ events: [] }), + initiateCall: async () => ({ providerCallId: "provider-call", status: "initiated" as const }), + hangupCall: async () => {}, + playTts: async () => {}, + startListening: async () => {}, + stopListening: async () => {}, + getCallStatus: async () => ({ status: "in-progress", isTerminal: false }), + isValidStreamToken: () => true, + registerCallStream: () => {}, + unregisterCallStream: () => {}, + hasRegisteredStream: () => true, + clearTtsQueue, + }); + + const getMediaCallbacks = (server: VoiceCallWebhookServer) => + server.getMediaStreamHandler() as unknown as { + config: { + onSpeechStart?: (providerCallId: string) => void; + onTranscript?: (providerCallId: string, transcript: string) => void; + }; + }; + + it("suppresses barge-in clear while outbound conversation initial message is pending", async () => { + const call = createCall(Date.now() - 1_000); + call.callId = "call-barge"; + call.providerCallId = "CA-barge"; + call.direction = "outbound"; + call.state = "speaking"; + call.metadata = { + mode: "conversation", + initialMessage: "Hi, this is OpenClaw.", + }; + + const clearTtsQueue = vi.fn(); + const processEvent = vi.fn((event: NormalizedEvent) => { + if (event.type === "call.speech") { + // Mirrors manager behavior: call.speech transitions to listening. + call.state = "listening"; + } + }); + const manager = { + getActiveCalls: () => [call], + getCallByProviderCallId: (providerCallId: string) => + providerCallId === call.providerCallId ? call : undefined, + getCall: (callId: string) => (callId === call.callId ? call : undefined), + endCall: vi.fn(async () => ({ success: true })), + speakInitialMessage: vi.fn(async () => {}), + processEvent, + } as unknown as CallManager; + + const config = createConfig({ + provider: "twilio", + streaming: { + ...createConfig().streaming, + enabled: true, + openaiApiKey: "test-key", + }, + }); + const server = new VoiceCallWebhookServer( + config, + manager, + createTwilioProvider(clearTtsQueue) as unknown as VoiceCallProvider, + ); + const handleInboundResponse = vi.fn(async () => {}); + ( + server as unknown as { + handleInboundResponse: ( + callId: string, + transcript: string, + timing?: unknown, + ) => Promise; + } + ).handleInboundResponse = handleInboundResponse; + + try { + const media = getMediaCallbacks(server); + media.config.onSpeechStart?.("CA-barge"); + media.config.onTranscript?.("CA-barge", "hello"); + media.config.onSpeechStart?.("CA-barge"); + media.config.onTranscript?.("CA-barge", "hello again"); + expect(clearTtsQueue).not.toHaveBeenCalled(); + expect(handleInboundResponse).not.toHaveBeenCalled(); + expect(processEvent).not.toHaveBeenCalled(); + + if (call.metadata) { + delete call.metadata.initialMessage; + } + call.state = "listening"; + + media.config.onSpeechStart?.("CA-barge"); + media.config.onTranscript?.("CA-barge", "hello after greeting"); + expect(clearTtsQueue).toHaveBeenCalledTimes(2); + expect(handleInboundResponse).toHaveBeenCalledTimes(1); + expect(processEvent).toHaveBeenCalledTimes(1); + const [calledCallId, calledTranscript] = (handleInboundResponse.mock.calls[0] ?? + []) as unknown as [string | undefined, string | undefined]; + expect(calledCallId).toBe(call.callId); + expect(calledTranscript).toBe("hello after greeting"); + } finally { + await server.stop(); + } + }); + + it("keeps barge-in clear enabled for inbound calls", async () => { + const call = createCall(Date.now() - 1_000); + call.callId = "call-inbound"; + call.providerCallId = "CA-inbound"; + call.direction = "inbound"; + call.metadata = { + initialMessage: "Hello from inbound greeting.", + }; + + const clearTtsQueue = vi.fn(); + const manager = { + getActiveCalls: () => [call], + getCallByProviderCallId: (providerCallId: string) => + providerCallId === call.providerCallId ? call : undefined, + getCall: (callId: string) => (callId === call.callId ? call : undefined), + endCall: vi.fn(async () => ({ success: true })), + speakInitialMessage: vi.fn(async () => {}), + processEvent: vi.fn(), + } as unknown as CallManager; + + const config = createConfig({ + provider: "twilio", + streaming: { + ...createConfig().streaming, + enabled: true, + openaiApiKey: "test-key", + }, + }); + const server = new VoiceCallWebhookServer( + config, + manager, + createTwilioProvider(clearTtsQueue) as unknown as VoiceCallProvider, + ); + + try { + const media = getMediaCallbacks(server); + media.config.onSpeechStart?.("CA-inbound"); + media.config.onTranscript?.("CA-inbound", "hello"); + expect(clearTtsQueue).toHaveBeenCalledTimes(2); + } finally { + await server.stop(); + } + }); +}); diff --git a/extensions/voice-call/src/webhook.ts b/extensions/voice-call/src/webhook.ts index fe015727e73..9855d810a07 100644 --- a/extensions/voice-call/src/webhook.ts +++ b/extensions/voice-call/src/webhook.ts @@ -13,10 +13,23 @@ import { MediaStreamHandler } from "./media-stream.js"; import type { VoiceCallProvider } from "./providers/base.js"; import { OpenAIRealtimeSTTProvider } from "./providers/stt-openai-realtime.js"; import type { TwilioProvider } from "./providers/twilio.js"; -import type { NormalizedEvent, WebhookContext } from "./types.js"; +import type { CallRecord, NormalizedEvent, WebhookContext } from "./types.js"; import { startStaleCallReaper } from "./webhook/stale-call-reaper.js"; const MAX_WEBHOOK_BODY_BYTES = 1024 * 1024; +const STREAM_DISCONNECT_HANGUP_GRACE_MS = 2000; +const TRANSCRIPT_LOG_MAX_CHARS = 200; + +function sanitizeTranscriptForLog(value: string): string { + const sanitized = value + .replace(/[\u0000-\u001f\u007f]/g, " ") + .replace(/\s+/g, " ") + .trim(); + if (sanitized.length <= TRANSCRIPT_LOG_MAX_CHARS) { + return sanitized; + } + return `${sanitized.slice(0, TRANSCRIPT_LOG_MAX_CHARS)}...`; +} type WebhookResponsePayload = { statusCode: number; @@ -60,6 +73,8 @@ export class VoiceCallWebhookServer { /** Media stream handler for bidirectional audio (when streaming enabled) */ private mediaStreamHandler: MediaStreamHandler | null = null; + /** Delayed auto-hangup timers keyed by provider call ID after stream disconnect. */ + private pendingDisconnectHangups = new Map>(); constructor( config: VoiceCallConfig, @@ -87,6 +102,36 @@ export class VoiceCallWebhookServer { return this.mediaStreamHandler; } + private clearPendingDisconnectHangup(providerCallId: string): void { + const existing = this.pendingDisconnectHangups.get(providerCallId); + if (!existing) { + return; + } + clearTimeout(existing); + this.pendingDisconnectHangups.delete(providerCallId); + } + + private shouldSuppressBargeInForInitialMessage(call: CallRecord | undefined): boolean { + if (!call || call.direction !== "outbound") { + return false; + } + + // Suppress only while the initial greeting is actively being played. + // If playback fails and the call leaves "speaking", do not block auto-response. + if (call.state !== "speaking") { + return false; + } + + const mode = (call.metadata?.mode as string | undefined) ?? "conversation"; + if (mode !== "conversation") { + return false; + } + + const initialMessage = + typeof call.metadata?.initialMessage === "string" ? call.metadata.initialMessage.trim() : ""; + return initialMessage.length > 0; + } + /** * Initialize media streaming with OpenAI Realtime STT. */ @@ -127,19 +172,27 @@ export class VoiceCallWebhookServer { return true; }, onTranscript: (providerCallId, transcript) => { - console.log(`[voice-call] Transcript for ${providerCallId}: ${transcript}`); - - // Clear TTS queue on barge-in (user started speaking, interrupt current playback) - if (this.provider.name === "twilio") { - (this.provider as TwilioProvider).clearTtsQueue(providerCallId); - } - - // Look up our internal call ID from the provider call ID + const safeTranscript = sanitizeTranscriptForLog(transcript); + console.log( + `[voice-call] Transcript for ${providerCallId}: ${safeTranscript} (chars=${transcript.length})`, + ); const call = this.manager.getCallByProviderCallId(providerCallId); if (!call) { console.warn(`[voice-call] No active call found for provider ID: ${providerCallId}`); return; } + const suppressBargeIn = this.shouldSuppressBargeInForInitialMessage(call); + if (suppressBargeIn) { + console.log( + `[voice-call] Ignoring barge transcript while initial message is still playing (${providerCallId})`, + ); + return; + } + + // Clear TTS queue on barge-in (user started speaking, interrupt current playback) + if (this.provider.name === "twilio") { + (this.provider as TwilioProvider).clearTtsQueue(providerCallId); + } // Create a speech event and process it through the manager const event: NormalizedEvent = { @@ -163,44 +216,63 @@ export class VoiceCallWebhookServer { } }, onSpeechStart: (providerCallId) => { - if (this.provider.name === "twilio") { - (this.provider as TwilioProvider).clearTtsQueue(providerCallId); + if (this.provider.name !== "twilio") { + return; } + const call = this.manager.getCallByProviderCallId(providerCallId); + if (this.shouldSuppressBargeInForInitialMessage(call)) { + return; + } + (this.provider as TwilioProvider).clearTtsQueue(providerCallId); }, onPartialTranscript: (callId, partial) => { - console.log(`[voice-call] Partial for ${callId}: ${partial}`); + const safePartial = sanitizeTranscriptForLog(partial); + console.log(`[voice-call] Partial for ${callId}: ${safePartial} (chars=${partial.length})`); }, onConnect: (callId, streamSid) => { console.log(`[voice-call] Media stream connected: ${callId} -> ${streamSid}`); + this.clearPendingDisconnectHangup(callId); + // Register stream with provider for TTS routing if (this.provider.name === "twilio") { (this.provider as TwilioProvider).registerCallStream(callId, streamSid); } - // Speak initial message if one was provided when call was initiated - // Use setTimeout to allow stream setup to complete - setTimeout(() => { - this.manager.speakInitialMessage(callId).catch((err) => { - console.warn(`[voice-call] Failed to speak initial message:`, err); - }); - }, 500); + // Speak initial message immediately (no delay) to avoid stream timeout + this.manager.speakInitialMessage(callId).catch((err) => { + console.warn(`[voice-call] Failed to speak initial message:`, err); + }); }, - onDisconnect: (callId) => { - console.log(`[voice-call] Media stream disconnected: ${callId}`); - // Auto-end call when media stream disconnects to prevent stuck calls. - // Without this, calls can remain active indefinitely after the stream closes. - const disconnectedCall = this.manager.getCallByProviderCallId(callId); - if (disconnectedCall) { + onDisconnect: (callId, streamSid) => { + console.log(`[voice-call] Media stream disconnected: ${callId} (${streamSid})`); + if (this.provider.name === "twilio") { + (this.provider as TwilioProvider).unregisterCallStream(callId, streamSid); + } + + this.clearPendingDisconnectHangup(callId); + const timer = setTimeout(() => { + this.pendingDisconnectHangups.delete(callId); + const disconnectedCall = this.manager.getCallByProviderCallId(callId); + if (!disconnectedCall) { + return; + } + + if (this.provider.name === "twilio") { + const twilio = this.provider as TwilioProvider; + if (twilio.hasRegisteredStream(callId)) { + return; + } + } + console.log( - `[voice-call] Auto-ending call ${disconnectedCall.callId} on stream disconnect`, + `[voice-call] Auto-ending call ${disconnectedCall.callId} after stream disconnect grace`, ); void this.manager.endCall(disconnectedCall.callId).catch((err) => { console.warn(`[voice-call] Failed to auto-end call ${disconnectedCall.callId}:`, err); }); - } - if (this.provider.name === "twilio") { - (this.provider as TwilioProvider).unregisterCallStream(callId); - } + }, STREAM_DISCONNECT_HANGUP_GRACE_MS); + timer.unref?.(); + this.pendingDisconnectHangups.set(callId, timer); }, }; @@ -274,6 +346,11 @@ export class VoiceCallWebhookServer { * Stop the webhook server. */ async stop(): Promise { + for (const timer of this.pendingDisconnectHangups.values()) { + clearTimeout(timer); + } + this.pendingDisconnectHangups.clear(); + if (this.stopStaleCallReaper) { this.stopStaleCallReaper(); this.stopStaleCallReaper = null;