diff --git a/Dockerfile b/Dockerfile index b2af00c3b40..aa7816a5c8e 100644 --- a/Dockerfile +++ b/Dockerfile @@ -88,7 +88,7 @@ RUN pnpm canvas:a2ui:bundle || \ echo "/* A2UI bundle unavailable in this build */" > src/canvas-host/a2ui/a2ui.bundle.js && \ echo "stub" > src/canvas-host/a2ui/.bundle.hash && \ rm -rf vendor/a2ui apps/shared/OpenClawKit/Tools/CanvasA2UI) -RUN pnpm build:docker +RUN NODE_OPTIONS=--max-old-space-size=2048 pnpm build:docker # Force pnpm for UI build (Bun may fail on ARM/Synology architectures) ENV OPENCLAW_PREFER_PNPM=1 RUN pnpm ui:build diff --git a/docker-compose.yml b/docker-compose.yml index c0bffc64458..6859e10c49d 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -24,6 +24,9 @@ services: ports: - "${OPENCLAW_GATEWAY_PORT:-18789}:18789" - "${OPENCLAW_BRIDGE_PORT:-18790}:18790" + ## Uncomment to expose the voice webhook port (required when voice-call plugin + ## serve.bind is set to 0.0.0.0 for direct inbound webhook access): + # - "${OPENCLAW_VOICE_WEBHOOK_PORT:-3334}:3334" init: true restart: unless-stopped command: diff --git a/extensions/voice-call/README.md b/extensions/voice-call/README.md index fe228537ee8..8a0e6310b17 100644 --- a/extensions/voice-call/README.md +++ b/extensions/voice-call/README.md @@ -134,6 +134,63 @@ Actions: - `voicecall.end` (callId) - `voicecall.status` (callId) +## Realtime voice mode (OpenAI Realtime API) + +Realtime mode routes inbound calls directly to the [OpenAI Realtime API](https://platform.openai.com/docs/guides/realtime) for voice-to-voice conversation (~200–400 ms latency vs ~2–3 s for the STT/TTS pipeline). It is disabled by default and mutually exclusive with `streaming.enabled`. + +### Requirements + +- `OPENAI_API_KEY` set in your environment (or `streaming.openaiApiKey` in config). +- A **publicly reachable HTTPS endpoint with WebSocket support** — the webhook server must accept both POST requests (Twilio webhook) and WebSocket upgrades (Twilio Media Stream). 
A plain HTTP tunnel is not sufficient; Twilio requires WSS. +- `inboundPolicy` set to `"open"` or `"allowlist"` (not `"disabled"`) so the plugin accepts inbound calls. + +### Config + +```json5 +{ + inboundPolicy: "open", // required: realtime needs inbound calls enabled + + realtime: { + enabled: true, + voice: "alloy", // Realtime API voices: alloy, ash, ballad, cedar, coral, + // echo, marin, sage, shimmer, verse + instructions: "You are a helpful assistant.", + model: "gpt-4o-mini-realtime-preview", // optional, this is the default + temperature: 0.8, // 0–2, optional + vadThreshold: 0.5, // voice activity detection sensitivity, 0–1, optional + silenceDurationMs: 500, // ms of silence before end-of-turn, optional + }, +} +``` + +### Environment variable overrides + +These `realtime.*` fields fall back to environment variables when not set in config (config takes precedence; `prefixPaddingMs` and `tools` are config-only): + +| Env var | Config field | +| ----------------------------- | ---------------------------- | +| `REALTIME_VOICE_ENABLED=true` | `realtime.enabled` | +| `REALTIME_VOICE_MODEL` | `realtime.model` | +| `REALTIME_VOICE_VOICE` | `realtime.voice` | +| `REALTIME_VOICE_INSTRUCTIONS` | `realtime.instructions` | +| `REALTIME_VOICE_TEMPERATURE` | `realtime.temperature` | +| `VAD_THRESHOLD` | `realtime.vadThreshold` | +| `SILENCE_DURATION_MS` | `realtime.silenceDurationMs` | + +### How it works + +1. Twilio sends a POST webhook to `serve.path` (default `/voice/webhook`). +2. The plugin responds with TwiML `<Connect><Stream>` pointing to `wss://<your-host>/voice/stream/realtime`. +3. Twilio opens a WebSocket to that path carrying the caller's audio in μ-law format. +4. The plugin bridges the WebSocket to the OpenAI Realtime API — audio flows in both directions in real time. +5. The call is registered with CallManager and appears in `openclaw voice status` / `openclaw voice history`. + +### Networking notes + +- `serve.bind` defaults to `127.0.0.1`. 
If running inside Docker with an external port mapping, set `serve.bind: "0.0.0.0"` so the container's port is reachable from the host. +- The WebSocket upgrade path (`/voice/stream/realtime`) must be reachable on the same host and port as the webhook. Reverse proxies must pass `Upgrade: websocket` headers through. +- When using Tailscale Funnel on the host (outside Docker), configure Funnel to route `/voice/` to the plugin's local port. The gateway itself does not need to be exposed via Tailscale Funnel. + ## Notes - Uses webhook signature verification for Twilio/Telnyx/Plivo. diff --git a/extensions/voice-call/openclaw.plugin.json b/extensions/voice-call/openclaw.plugin.json index fef3ccc6ad9..4a2a5f94433 100644 --- a/extensions/voice-call/openclaw.plugin.json +++ b/extensions/voice-call/openclaw.plugin.json @@ -134,6 +134,36 @@ "label": "ElevenLabs Base URL", "advanced": true }, + "realtime.enabled": { + "label": "Enable Realtime Voice (OpenAI)", + "help": "Full voice-to-voice mode via OpenAI Realtime API. Requires inboundPolicy open or allowlist." 
+ }, + "realtime.model": { + "label": "Realtime Model", + "advanced": true + }, + "realtime.voice": { + "label": "Realtime Voice" + }, + "realtime.instructions": { + "label": "Realtime Instructions" + }, + "realtime.temperature": { + "label": "Realtime Temperature", + "advanced": true + }, + "realtime.vadThreshold": { + "label": "VAD Sensitivity", + "advanced": true + }, + "realtime.silenceDurationMs": { + "label": "Silence Timeout (ms)", + "advanced": true + }, + "realtime.prefixPaddingMs": { + "label": "Prefix Padding (ms)", + "advanced": true + }, "publicUrl": { "label": "Public Webhook URL", "advanced": true @@ -385,6 +415,60 @@ } } }, + "realtime": { + "type": "object", + "additionalProperties": false, + "properties": { + "enabled": { + "type": "boolean" + }, + "model": { + "type": "string" + }, + "voice": { + "type": "string", + "enum": [ + "alloy", + "ash", + "ballad", + "cedar", + "coral", + "echo", + "marin", + "sage", + "shimmer", + "verse" + ] + }, + "instructions": { + "type": "string" + }, + "temperature": { + "type": "number", + "minimum": 0, + "maximum": 2 + }, + "vadThreshold": { + "type": "number", + "minimum": 0, + "maximum": 1 + }, + "silenceDurationMs": { + "type": "integer", + "minimum": 1 + }, + "prefixPaddingMs": { + "type": "integer", + "minimum": 0 + }, + "tools": { + "type": "array", + "items": { + "type": "object" + } + } + } + }, "publicUrl": { "type": "string" }, diff --git a/extensions/voice-call/src/config.test.ts b/extensions/voice-call/src/config.test.ts index 1b12e9e84c5..ceef8d57d9a 100644 --- a/extensions/voice-call/src/config.test.ts +++ b/extensions/voice-call/src/config.test.ts @@ -3,6 +3,7 @@ import { validateProviderConfig, normalizeVoiceCallConfig, resolveVoiceCallConfig, + VoiceCallRealtimeConfigSchema, type VoiceCallConfig, } from "./config.js"; import { createVoiceCallBaseConfig } from "./test-fixtures.js"; @@ -216,3 +217,126 @@ describe("normalizeVoiceCallConfig", () => { 
expect(normalized.tts?.elevenlabs?.voiceSettings).toEqual({ speed: 1.1 }); }); }); + +describe("VoiceCallRealtimeConfigSchema", () => { + it("defaults to disabled with empty tools array", () => { + const config = VoiceCallRealtimeConfigSchema.parse({}); + expect(config.enabled).toBe(false); + expect(config.tools).toEqual([]); + }); + + it("accepts all valid Realtime API voice names", () => { + const voices = [ + "alloy", + "ash", + "ballad", + "cedar", + "coral", + "echo", + "marin", + "sage", + "shimmer", + "verse", + ]; + for (const voice of voices) { + expect(() => VoiceCallRealtimeConfigSchema.parse({ voice })).not.toThrow(); + } + }); + + it("rejects voice names that are not in the Realtime API (e.g. nova, fable, onyx)", () => { + for (const voice of ["nova", "fable", "onyx"]) { + expect(() => VoiceCallRealtimeConfigSchema.parse({ voice })).toThrow(); + } + }); + + it("normalizeVoiceCallConfig propagates realtime sub-config", () => { + const normalized = normalizeVoiceCallConfig({ + enabled: true, + provider: "mock", + realtime: { enabled: true, voice: "marin", instructions: "Be helpful." 
}, + }); + expect(normalized.realtime.enabled).toBe(true); + expect(normalized.realtime.voice).toBe("marin"); + expect(normalized.realtime.instructions).toBe("Be helpful."); + expect(normalized.realtime.tools).toEqual([]); + }); +}); + +describe("resolveVoiceCallConfig — realtime env vars", () => { + const originalEnv = { ...process.env }; + + afterEach(() => { + process.env = { ...originalEnv }; + }); + + it("auto-enables realtime from REALTIME_VOICE_ENABLED=true", () => { + process.env.REALTIME_VOICE_ENABLED = "true"; + const resolved = resolveVoiceCallConfig(createVoiceCallBaseConfig()); + expect(resolved.realtime.enabled).toBe(true); + }); + + it("does not auto-enable when REALTIME_VOICE_ENABLED is absent or not 'true'", () => { + delete process.env.REALTIME_VOICE_ENABLED; + expect(resolveVoiceCallConfig(createVoiceCallBaseConfig()).realtime.enabled).toBe(false); + + process.env.REALTIME_VOICE_ENABLED = "false"; + expect(resolveVoiceCallConfig(createVoiceCallBaseConfig()).realtime.enabled).toBe(false); + }); + + it("resolves model, voice, instructions, temperature from env vars", () => { + process.env.REALTIME_VOICE_MODEL = "gpt-4o-realtime-preview"; + process.env.REALTIME_VOICE_VOICE = "ash"; + process.env.REALTIME_VOICE_INSTRUCTIONS = "You are helpful."; + process.env.REALTIME_VOICE_TEMPERATURE = "0.8"; + const resolved = resolveVoiceCallConfig(createVoiceCallBaseConfig()); + expect(resolved.realtime.model).toBe("gpt-4o-realtime-preview"); + expect(resolved.realtime.voice).toBe("ash"); + expect(resolved.realtime.instructions).toBe("You are helpful."); + expect(resolved.realtime.temperature).toBeCloseTo(0.8); + }); + + it("resolves vadThreshold and silenceDurationMs from env vars", () => { + process.env.VAD_THRESHOLD = "0.7"; + process.env.SILENCE_DURATION_MS = "1200"; + const resolved = resolveVoiceCallConfig(createVoiceCallBaseConfig()); + expect(resolved.realtime.vadThreshold).toBeCloseTo(0.7); + expect(resolved.realtime.silenceDurationMs).toBe(1200); + }); 
+ + it("config values take precedence over env vars", () => { + process.env.REALTIME_VOICE_VOICE = "ash"; + const base = createVoiceCallBaseConfig(); + base.realtime = { enabled: false, voice: "coral", tools: [] }; + const resolved = resolveVoiceCallConfig(base); + expect(resolved.realtime.voice).toBe("coral"); + }); +}); + +describe("validateProviderConfig — realtime mode", () => { + it("rejects realtime.enabled when inboundPolicy is 'disabled'", () => { + const config = createVoiceCallBaseConfig({ provider: "mock" }); + config.realtime = { enabled: true, tools: [] }; + // inboundPolicy defaults to "disabled" in createVoiceCallBaseConfig + const result = validateProviderConfig(config); + expect(result.valid).toBe(false); + expect(result.errors.some((e) => e.includes("inboundPolicy"))).toBe(true); + }); + + it("passes when realtime.enabled with inboundPolicy 'open'", () => { + const config = createVoiceCallBaseConfig({ provider: "mock" }); + config.inboundPolicy = "open"; + config.realtime = { enabled: true, tools: [] }; + const result = validateProviderConfig(config); + expect(result.errors.some((e) => e.includes("inboundPolicy"))).toBe(false); + }); + + it("rejects when both realtime.enabled and streaming.enabled are true", () => { + const config = createVoiceCallBaseConfig({ provider: "mock" }); + config.inboundPolicy = "open"; + config.realtime = { enabled: true, tools: [] }; + config.streaming = { ...config.streaming, enabled: true }; + const result = validateProviderConfig(config); + expect(result.valid).toBe(false); + expect(result.errors.some((e) => e.includes("streaming"))).toBe(true); + }); +}); diff --git a/extensions/voice-call/src/config.ts b/extensions/voice-call/src/config.ts index 2d1494c7876..2ace2283608 100644 --- a/extensions/voice-call/src/config.ts +++ b/extensions/voice-call/src/config.ts @@ -200,6 +200,66 @@ export const OutboundConfigSchema = z .default({ defaultMode: "notify", notifyHangupDelaySec: 3 }); export type OutboundConfig = 
z.infer; +// ----------------------------------------------------------------------------- +// Realtime Voice Configuration (OpenAI Realtime API — voice-to-voice) +// ----------------------------------------------------------------------------- + +/** + * Zod schema for a single OpenAI Realtime tool (mirrors the RealtimeTool interface + * in providers/openai-realtime-voice.ts, expressed as a schema for config parsing). + */ +export const RealtimeToolSchema = z + .object({ + type: z.literal("function"), + name: z.string(), + description: z.string(), + parameters: z.object({ + type: z.literal("object"), + properties: z.record(z.string(), z.unknown()), + required: z.array(z.string()).optional(), + }), + }) + .strict(); +export type RealtimeToolConfig = z.infer; + +export const VoiceCallRealtimeConfigSchema = z + .object({ + /** Enable realtime voice-to-voice mode (OpenAI Realtime API). Default: false. */ + enabled: z.boolean().default(false), + /** Realtime model (env: REALTIME_VOICE_MODEL, default: "gpt-4o-mini-realtime-preview") */ + model: z.string().optional(), + /** Voice for AI speech output (env: REALTIME_VOICE_VOICE) */ + voice: z + .enum([ + "alloy", + "ash", + "ballad", + "cedar", + "coral", + "echo", + "marin", + "sage", + "shimmer", + "verse", + ]) + .optional(), + /** System instructions / persona (env: REALTIME_VOICE_INSTRUCTIONS) */ + instructions: z.string().optional(), + /** Temperature 0–2 (env: REALTIME_VOICE_TEMPERATURE) */ + temperature: z.number().min(0).max(2).optional(), + /** VAD threshold 0–1 (env: VAD_THRESHOLD) */ + vadThreshold: z.number().min(0).max(1).optional(), + /** Silence duration in ms before turn ends (env: SILENCE_DURATION_MS) */ + silenceDurationMs: z.number().int().positive().optional(), + /** Audio padding before speech in ms */ + prefixPaddingMs: z.number().int().nonnegative().optional(), + /** Tool definitions (OpenAI function-call schema); execution wired via registerToolHandler */ + tools: 
z.array(RealtimeToolSchema).default([]), + }) + .strict() + .default({ enabled: false, tools: [] }); +export type VoiceCallRealtimeConfig = z.infer; + // ----------------------------------------------------------------------------- // Streaming Configuration (OpenAI Realtime STT) // ----------------------------------------------------------------------------- @@ -324,6 +384,9 @@ export const VoiceCallConfigSchema = z /** Real-time audio streaming configuration */ streaming: VoiceCallStreamingConfigSchema, + /** Realtime voice-to-voice configuration (OpenAI Realtime API) */ + realtime: VoiceCallRealtimeConfigSchema, + /** Public webhook URL override (if set, bypasses tunnel auto-detection) */ publicUrl: z.string().url().optional(), @@ -398,6 +461,14 @@ export function normalizeVoiceCallConfig(config: VoiceCallConfigInput): VoiceCal config.webhookSecurity?.trustedProxyIPs ?? defaults.webhookSecurity.trustedProxyIPs, }, streaming: { ...defaults.streaming, ...config.streaming }, + realtime: { + ...defaults.realtime, + ...config.realtime, + // Cast: DeepPartial makes tool fields appear optional in the input type, + // but Zod validates the full shape before it reaches here. + tools: + (config.realtime?.tools as RealtimeToolConfig[] | undefined) ?? defaults.realtime.tools, + }, stt: { ...defaults.stt, ...config.stt }, tts: normalizeVoiceCallTtsConfig(defaults.tts, config.tts), }; @@ -453,6 +524,29 @@ export function resolveVoiceCallConfig(config: VoiceCallConfigInput): VoiceCallC resolved.webhookSecurity.trustForwardingHeaders ?? false; resolved.webhookSecurity.trustedProxyIPs = resolved.webhookSecurity.trustedProxyIPs ?? 
[]; + // Realtime voice — resolve env var fallbacks + resolved.realtime = { ...resolved.realtime }; + // REALTIME_VOICE_ENABLED=true auto-enables realtime mode (backward compat) + if (!resolved.realtime.enabled && process.env.REALTIME_VOICE_ENABLED === "true") { + resolved.realtime.enabled = true; + } + resolved.realtime.model = resolved.realtime.model ?? process.env.REALTIME_VOICE_MODEL; + resolved.realtime.voice = + (resolved.realtime.voice ?? + (process.env.REALTIME_VOICE_VOICE as VoiceCallRealtimeConfig["voice"])) || + undefined; + resolved.realtime.instructions = + resolved.realtime.instructions ?? process.env.REALTIME_VOICE_INSTRUCTIONS; + if (resolved.realtime.temperature == null && process.env.REALTIME_VOICE_TEMPERATURE) { + resolved.realtime.temperature = parseFloat(process.env.REALTIME_VOICE_TEMPERATURE); + } + if (resolved.realtime.vadThreshold == null && process.env.VAD_THRESHOLD) { + resolved.realtime.vadThreshold = parseFloat(process.env.VAD_THRESHOLD); + } + if (resolved.realtime.silenceDurationMs == null && process.env.SILENCE_DURATION_MS) { + resolved.realtime.silenceDurationMs = parseInt(process.env.SILENCE_DURATION_MS, 10); + } + return normalizeVoiceCallConfig(resolved); } @@ -521,5 +615,24 @@ export function validateProviderConfig(config: VoiceCallConfig): { } } + // Realtime mode requires inbound calls to be accepted — policy "disabled" + // means the manager will reject every call before it can be tracked. + // "open" or "allowlist" are the correct choices when realtime.enabled = true. + if (config.realtime?.enabled && config.inboundPolicy === "disabled") { + errors.push( + 'plugins.entries.voice-call.config.inboundPolicy must not be "disabled" when realtime.enabled is true ' + + '(use "open" or "allowlist" — realtime calls are answered before policy can reject them)', + ); + } + + // Both streaming and realtime cannot be enabled simultaneously — they use + // incompatible WebSocket paths and audio routing. 
+ if (config.realtime?.enabled && config.streaming?.enabled) { + errors.push( + "plugins.entries.voice-call.config: realtime.enabled and streaming.enabled cannot both be true " + + "(they use incompatible audio paths — choose one mode)", + ); + } + return { valid: errors.length === 0, errors }; } diff --git a/extensions/voice-call/src/manager/events.ts b/extensions/voice-call/src/manager/events.ts index 668369e0c35..2ce9de599aa 100644 --- a/extensions/voice-call/src/manager/events.ts +++ b/extensions/voice-call/src/manager/events.ts @@ -204,6 +204,9 @@ export function processEvent(ctx: EventContext, event: NormalizedEvent): void { break; case "call.speaking": + if (event.text) { + addTranscriptEntry(call, "bot", event.text); + } transitionState(call, "speaking"); break; diff --git a/extensions/voice-call/src/providers/openai-realtime-voice.ts b/extensions/voice-call/src/providers/openai-realtime-voice.ts new file mode 100755 index 00000000000..42f7b7949f1 --- /dev/null +++ b/extensions/voice-call/src/providers/openai-realtime-voice.ts @@ -0,0 +1,874 @@ +/** + * OpenAI Realtime Voice Bridge + * + * Implements a bidirectional voice-to-voice bridge between Twilio Media Streams + * and the OpenAI Realtime API. Replaces the STT → LLM → TTS pipeline with a + * single WebSocket session that handles everything natively. 
+ * + * Key benefits over the STT-only approach: + * - Latency: ~200–400 ms TTFB vs ~1–3.5 s in the pipeline mode + * - Audio format: g711_ulaw (mulaw) is natively supported — zero conversion + * - Barge-in: server VAD handles interruptions automatically + * - No separate LLM or TTS call required + * + * Usage: + * const bridge = new OpenAIRealtimeVoiceBridge({ + * apiKey: process.env.OPENAI_API_KEY!, + * instructions: "You are Gracie, a helpful AI assistant...", + * voice: "alloy", + * onAudio: (muLaw) => mediaStreamHandler.sendAudio(streamSid, muLaw), + * onClearAudio: () => mediaStreamHandler.clearAudio(streamSid), + * onTranscript: (role, text) => console.log(`[${role}]: ${text}`), + * }); + * await bridge.connect(); + * bridge.sendAudio(twilioPayloadBuffer); + * bridge.close(); + * + * Integration with media-stream.ts: + * Replace `sttSession` with this bridge in `MediaStreamHandler.handleStart()`. + * Wire audio in/out through the existing sendAudio / clearAudio methods. + * + * @see https://platform.openai.com/docs/guides/realtime + * @see https://www.twilio.com/docs/voice/media-streams + */ + +import WebSocket from "ws"; + +// --------------------------------------------------------------------------- +// Types +// --------------------------------------------------------------------------- + +/** OpenAI Realtime API voice options. + * NOTE: These differ from the TTS-1 voices — nova/fable/onyx are NOT supported here. 
+ * Source: live API error "Supported values are: alloy, ash, ballad, cedar, coral, echo, marin, sage, shimmer, verse" + */ +export type RealtimeVoice = + | "alloy" + | "ash" + | "ballad" + | "cedar" + | "coral" + | "echo" + | "marin" + | "sage" + | "shimmer" + | "verse"; + +/** Realtime tool definition (mirrors OpenAI function calling schema) */ +export interface RealtimeTool { + type: "function"; + name: string; + description: string; + parameters: { + type: "object"; + properties: Record; + required?: string[]; + }; +} + +/** Tool call event emitted when OpenAI invokes a function */ +export interface ToolCallEvent { + /** Conversation item ID of the function call */ + itemId: string; + /** Call ID — pass this to submitToolResult() when returning the result */ + callId: string; + /** Function name */ + name: string; + /** Parsed JSON arguments */ + args: unknown; +} + +/** + * Configuration for the Realtime Voice Bridge. + */ +export interface RealtimeVoiceConfig { + // ---- Required ---- + /** OpenAI API key */ + apiKey: string; + + // ---- Voice/personality ---- + /** System instructions (persona / behaviour) */ + instructions?: string; + /** Voice to use for AI speech output (default: "alloy") */ + voice?: RealtimeVoice; + /** Response temperature (default: 0.8). Plugin config schema accepts 0–2; NOTE(review): confirm the range the Realtime API itself accepts */ + temperature?: number; + + // ---- Model ---- + /** Realtime model (default: "gpt-4o-mini-realtime-preview") */ + model?: string; + + // ---- VAD ---- + /** VAD speech detection threshold 0–1 (default: 0.5) */ + vadThreshold?: number; + /** Silence duration in ms before turn ends (default: 500) */ + silenceDurationMs?: number; + /** Padding before speech in ms (default: 300) */ + prefixPaddingMs?: number; + + // ---- Tools ---- + /** Optional function tools the model can call */ + tools?: RealtimeTool[]; + + // ---- Audio callbacks ---- + /** + * Called for each audio delta chunk from OpenAI. 
+ * @param muLaw - Raw mulaw Buffer ready to send to Twilio + */ + onAudio: (muLaw: Buffer) => void; + + /** + * Called on barge-in (user speech detected mid-response). + * Clear Twilio audio buffer here. + */ + onClearAudio: () => void; + + // ---- Event callbacks (optional) ---- + /** + * Transcript event (partial or final). Role is "user" or "assistant". + */ + onTranscript?: (role: "user" | "assistant", text: string, isFinal: boolean) => void; + + /** + * Called when the model invokes a tool/function. + * Your handler should call `bridge.submitToolResult(event.callId, result)`. + */ + onToolCall?: (event: ToolCallEvent) => void; + + /** + * Called once, when the first session is created; not fired again on reconnects. + */ + onReady?: () => void; + + /** + * Called on irrecoverable error or max reconnects exceeded. + */ + onError?: (error: Error) => void; + + /** + * Called when the bridge closes (intentionally or after max retries). + */ + onClose?: () => void; +} + +// --------------------------------------------------------------------------- +// Internal helpers +// --------------------------------------------------------------------------- + +function base64ToBuffer(b64: string): Buffer { + return Buffer.from(b64, "base64"); +} + +// --------------------------------------------------------------------------- +// Main class +// --------------------------------------------------------------------------- + +/** + * Bidirectional voice bridge between Twilio Media Streams and the OpenAI Realtime API. 
+ * + * Lifecycle: + * new OpenAIRealtimeVoiceBridge(config) + * → connect() — opens WebSocket, configures session + * → sendAudio() — called for each Twilio media chunk + * → [callbacks fire as OpenAI responds] + * → close() — graceful shutdown + */ +export class OpenAIRealtimeVoiceBridge { + private static readonly DEFAULT_MODEL = "gpt-4o-mini-realtime-preview"; + private static readonly MAX_RECONNECT_ATTEMPTS = 5; + private static readonly BASE_RECONNECT_DELAY_MS = 1000; + private static readonly CONNECT_TIMEOUT_MS = 10_000; + + private readonly config: RealtimeVoiceConfig; + private readonly model: string; + + private ws: WebSocket | null = null; + private connected = false; + private intentionallyClosed = false; + private reconnectAttempts = 0; + + /** Pending audio buffers queued while reconnecting */ + private pendingAudio: Buffer[] = []; + + /** Track mark queue for barge-in timing (mirrors reference impl) */ + private markQueue: string[] = []; + private responseStartTimestamp: number | null = null; + private latestMediaTimestamp = 0; + private lastAssistantItemId: string | null = null; + + /** Accumulate tool call arguments (streamed as deltas) */ + private toolCallBuffers = new Map(); + + /** Guards onReady/greeting so it fires only on the first session, not reconnects */ + private sessionReadyFired = false; + + constructor(config: RealtimeVoiceConfig) { + if (!config.apiKey) { + throw new Error("[RealtimeVoice] OpenAI API key is required"); + } + this.config = config; + this.model = config.model ?? OpenAIRealtimeVoiceBridge.DEFAULT_MODEL; + } + + // ------------------------------------------------------------------------- + // Public API + // ------------------------------------------------------------------------- + + /** + * Connect to the OpenAI Realtime API. + * Resolves when the WebSocket is open and session.update has been sent. + * Throws if connection times out. 
+ */ + async connect(): Promise { + this.intentionallyClosed = false; + this.reconnectAttempts = 0; + return this.doConnect(); + } + + /** + * Send a mulaw audio chunk from Twilio to OpenAI. + * Buffers chunks if not yet connected; drains on reconnect. + * + * @param audio - Raw mulaw Buffer (Twilio media.payload decoded from base64) + */ + sendAudio(audio: Buffer): void { + if (!this.connected || this.ws?.readyState !== WebSocket.OPEN) { + // Cap the backlog at 320 chunks (~6.4 s, assuming 20 ms per chunk) while not connected; further chunks are dropped + if (this.pendingAudio.length < 320) { + this.pendingAudio.push(audio); + } + return; + } + this.sendEvent({ + type: "input_audio_buffer.append", + audio: audio.toString("base64"), + }); + } + + /** + * Update the media timestamp (used for barge-in truncation calculations). + * Call this with data.media.timestamp from each Twilio media event. + */ + setMediaTimestamp(ts: number): void { + this.latestMediaTimestamp = ts; + } + + /** + * Inject a user text message into the conversation (optional). + * Useful for seeding context or simulating a greeting trigger. + */ + sendUserMessage(text: string): void { + this.sendEvent({ + type: "conversation.item.create", + item: { + type: "message", + role: "user", + content: [{ type: "input_text", text }], + }, + }); + this.sendEvent({ type: "response.create" }); + } + + /** + * Submit a tool/function result back to the model. + * Must be called in response to an `onToolCall` event. + * + * @param callId - The call_id from the ToolCallEvent + * @param result - JSON-serializable result value + */ + submitToolResult(callId: string, result: unknown): void { + this.sendEvent({ + type: "conversation.item.create", + item: { + type: "function_call_output", + call_id: callId, + output: JSON.stringify(result), + }, + }); + // Trigger AI to respond now that the tool result is available + this.sendEvent({ type: "response.create" }); + } + + /** + * Gracefully close the bridge. 
+ */ + close(): void { + this.intentionallyClosed = true; + this.connected = false; + if (this.ws) { + this.ws.close(1000, "Bridge closed"); + this.ws = null; + } + // onClose fires from the ws "close" event handler (intentionallyClosed branch) + // to avoid double-firing on explicit close(). + } + + /** True if the WebSocket is open and the session is configured. */ + isConnected(): boolean { + return this.connected; + } + + // ------------------------------------------------------------------------- + // Connection management + // ------------------------------------------------------------------------- + + private async doConnect(): Promise { + return new Promise((resolve, reject) => { + const url = `wss://api.openai.com/v1/realtime?model=${encodeURIComponent(this.model)}`; + + console.log(`[RealtimeVoice] Connecting to ${url}`); + + this.ws = new WebSocket(url, { + headers: { + Authorization: `Bearer ${this.config.apiKey}`, + "OpenAI-Beta": "realtime=v1", + }, + }); + + const connectTimeout = setTimeout(() => { + if (!this.connected) { + this.ws?.terminate(); + reject(new Error("[RealtimeVoice] Connection timeout")); + } + }, OpenAIRealtimeVoiceBridge.CONNECT_TIMEOUT_MS); + + this.ws.on("open", () => { + clearTimeout(connectTimeout); + console.log("[RealtimeVoice] WebSocket connected"); + this.connected = true; + this.reconnectAttempts = 0; + // Send session config immediately — no need to wait; the server + // confirms receipt via session.created which triggers drain + onReady. 
+ this.sendSessionUpdate(); + resolve(); + }); + + this.ws.on("message", (data: Buffer) => { + try { + const event = JSON.parse(data.toString()) as RealtimeEvent; + this.handleEvent(event); + } catch (err) { + console.error("[RealtimeVoice] Failed to parse event:", err); + } + }); + + this.ws.on("error", (err) => { + console.error("[RealtimeVoice] WebSocket error:", err); + if (!this.connected) { + clearTimeout(connectTimeout); + reject(err); + } else { + this.config.onError?.(err instanceof Error ? err : new Error(String(err))); + } + }); + + this.ws.on("close", (code, reason) => { + console.log( + `[RealtimeVoice] WebSocket closed (code: ${code}, reason: ${reason?.toString() || "none"})`, + ); + this.connected = false; + this.ws = null; + + if (!this.intentionallyClosed) { + void this.attemptReconnect(); + } else { + this.config.onClose?.(); + } + }); + }); + } + + /** + * Trigger a greeting response from the AI. + * Sends response.create with the configured instructions; logs a warning and does nothing if not connected. + */ + public triggerGreeting(): void { + if (!this.connected || !this.ws) { + console.warn("[RealtimeVoice] Cannot trigger greeting: not connected"); + return; + } + const greetingEvent = { + type: "response.create", + response: { + instructions: this.config.instructions, + }, + }; + this.sendEvent(greetingEvent); + console.log("[RealtimeVoice] Greeting triggered"); + } + + private sendSessionUpdate(): void { + const cfg = this.config; + + const sessionUpdate: RealtimeSessionUpdate = { + type: "session.update", + session: { + modalities: ["text", "audio"], + instructions: cfg.instructions, + voice: cfg.voice ?? "alloy", + input_audio_format: "g711_ulaw", + output_audio_format: "g711_ulaw", + // whisper-1 is the only model currently supported by the Realtime API for + // inline user-speech transcription. This is distinct from the streaming + // STT path (streaming.sttModel) which uses gpt-4o-transcribe. 
+ input_audio_transcription: { + model: "whisper-1", + }, + turn_detection: { + type: "server_vad", + threshold: cfg.vadThreshold ?? 0.5, + prefix_padding_ms: cfg.prefixPaddingMs ?? 300, + silence_duration_ms: cfg.silenceDurationMs ?? 500, + create_response: true, + }, + temperature: cfg.temperature ?? 0.8, + ...(cfg.tools && cfg.tools.length > 0 + ? { + tools: cfg.tools, + tool_choice: "auto", + } + : {}), + }, + }; + + console.log("[RealtimeVoice] Sending session.update"); + this.sendEvent(sessionUpdate); + } + + private drainPendingAudio(): void { + if (this.pendingAudio.length === 0) return; + console.log(`[RealtimeVoice] Draining ${this.pendingAudio.length} buffered audio chunks`); + for (const buf of this.pendingAudio) { + this.sendEvent({ + type: "input_audio_buffer.append", + audio: buf.toString("base64"), + }); + } + this.pendingAudio = []; + } + + private async attemptReconnect(): Promise { + if (this.intentionallyClosed) return; + + if (this.reconnectAttempts >= OpenAIRealtimeVoiceBridge.MAX_RECONNECT_ATTEMPTS) { + const err = new Error( + `[RealtimeVoice] Max reconnect attempts (${OpenAIRealtimeVoiceBridge.MAX_RECONNECT_ATTEMPTS}) exceeded`, + ); + console.error(err.message); + this.config.onError?.(err); + this.config.onClose?.(); + return; + } + + this.reconnectAttempts++; + const delay = + OpenAIRealtimeVoiceBridge.BASE_RECONNECT_DELAY_MS * 2 ** (this.reconnectAttempts - 1); + + console.log( + `[RealtimeVoice] Reconnecting (${this.reconnectAttempts}/${OpenAIRealtimeVoiceBridge.MAX_RECONNECT_ATTEMPTS}) in ${delay}ms...`, + ); + + await new Promise((resolve) => setTimeout(resolve, delay)); + + if (this.intentionallyClosed) return; + + try { + await this.doConnect(); + console.log("[RealtimeVoice] Reconnected successfully"); + } catch (err) { + console.error("[RealtimeVoice] Reconnect failed:", err); + // doConnect's close handler will call attemptReconnect again + } + } + + // ------------------------------------------------------------------------- + 
// Event handling
  // -------------------------------------------------------------------------

  /**
   * Dispatch a single server event received from the realtime socket.
   *
   * Audio deltas are decoded and handed to `config.onAudio`, transcript events
   * go to `config.onTranscript`, completed function calls go to
   * `config.onToolCall`, and `error` events go to `config.onError`.
   * Event types without a case are ignored.
   *
   * @param event - Parsed server event; only the fields declared on
   *   RealtimeEvent are read.
   */
  private handleEvent(event: RealtimeEvent): void {
    switch (event.type) {
      // ---- Session lifecycle ----
      case "session.created":
        console.log("[RealtimeVoice] Session created");
        this.drainPendingAudio();
        // Fire onReady exactly once — not on reconnects (greeting already played)
        if (!this.sessionReadyFired) {
          this.sessionReadyFired = true;
          this.config.onReady?.();
        }
        break;

      case "session.updated":
        console.log("[RealtimeVoice] Session updated");
        break;

      // ---- Audio output: forward assistant audio to the caller ----
      case "response.audio.delta": {
        if (!event.delta) break;

        const audioBuffer = base64ToBuffer(event.delta);
        this.config.onAudio(audioBuffer);

        // Remember where (in media-stream time) this response started so a
        // later barge-in can compute how much audio had elapsed.
        if (this.responseStartTimestamp === null) {
          this.responseStartTimestamp = this.latestMediaTimestamp;
        }

        // Track the most recent assistant item ID (used for truncation)
        if (event.item_id) {
          this.lastAssistantItemId = event.item_id;
        }

        // Record a mark for this chunk in the internal queue
        this.sendMark();
        break;
      }

      case "response.audio.done":
        console.log("[RealtimeVoice] Audio response complete");
        break;

      // ---- Barge-in: user started speaking, interrupt AI response ----
      case "input_audio_buffer.speech_started":
        console.log("[RealtimeVoice] Barge-in detected — clearing audio");
        this.handleBargein();
        break;

      case "input_audio_buffer.speech_stopped":
        console.log("[RealtimeVoice] Speech stopped");
        break;

      case "input_audio_buffer.committed":
        console.log("[RealtimeVoice] Audio buffer committed");
        break;

      // ---- Assistant speech transcript (partial, then final) ----
      case "response.audio_transcript.delta":
        if (event.delta) {
          this.config.onTranscript?.("assistant", event.delta, false);
        }
        break;

      case "response.audio_transcript.done":
        if (event.transcript) {
          console.log(`[RealtimeVoice] Assistant: ${event.transcript}`);
          this.config.onTranscript?.("assistant", event.transcript, true);
        }
        break;

      // ---- User speech transcription (final, then partial) ----
      case "conversation.item.input_audio_transcription.completed":
        if (event.transcript) {
          console.log(`[RealtimeVoice] User: ${event.transcript}`);
          this.config.onTranscript?.("user", event.transcript, true);
        }
        break;

      case "conversation.item.input_audio_transcription.delta":
        if (event.delta) {
          this.config.onTranscript?.("user", event.delta, false);
        }
        break;

      // ---- Tool calling ----
      case "response.function_call_arguments.delta": {
        const itemId = event.item_id;
        // Buffers are keyed by item id; without one there is nothing to track.
        if (!itemId) break;

        const chunk = event.delta ?? "";
        const pending = this.toolCallBuffers.get(itemId);
        if (pending) {
          // Always append to an existing buffer. Previously an empty or
          // missing delta fell through to the "create" branch and replaced
          // the arguments accumulated so far with an empty string.
          pending.args += chunk;
        } else {
          this.toolCallBuffers.set(itemId, {
            name: event.name ?? "",
            callId: event.call_id ?? "",
            args: chunk,
          });
        }
        break;
      }

      case "response.function_call_arguments.done": {
        const key = event.item_id ?? "unknown";
        const buf = this.toolCallBuffers.get(key);
        if (buf && this.config.onToolCall) {
          let args: unknown;
          try {
            args = JSON.parse(buf.args || "{}");
          } catch {
            // Best effort: still deliver the call, but make the bad payload visible.
            console.warn(
              `[RealtimeVoice] Could not parse tool-call arguments for item ${key}; using {}`,
            );
            args = {};
          }
          this.config.onToolCall({
            itemId: key,
            callId: buf.callId || event.call_id || "",
            name: buf.name || event.name || "",
            args,
          });
        }
        // Drop the buffer whether or not a handler consumed it.
        this.toolCallBuffers.delete(key);
        break;
      }

      // ---- Response lifecycle ----
      case "response.created":
        console.log("[RealtimeVoice] Response started");
        break;

      case "response.done":
        console.log("[RealtimeVoice] Response done");
        // Reset mark queue and timestamps for next turn
        this.markQueue = [];
        this.responseStartTimestamp = null;
        this.lastAssistantItemId = null;
        break;

      case "response.content.done":
        // Individual content part done
        break;

      case "rate_limits.updated":
        // Log rate limit info if needed
        break;

      // ---- Errors ----
      case "error": {
        // Prefer the server-supplied message; otherwise serialise the payload.
        const errMsg = event.error
          ? `${(event.error as { message?: string }).message ?? JSON.stringify(event.error)}`
          : "Unknown error";
        console.error(`[RealtimeVoice] Error event: ${errMsg}`);
        this.config.onError?.(new Error(errMsg));
        break;
      }

      default:
        // Uncomment for debugging:
        // console.log(`[RealtimeVoice] Unhandled event: ${event.type}`);
        break;
    }
  }

  /**
   * Handle barge-in: truncate the current assistant response at the
   * elapsed audio point, clear the Twilio buffer, and reset state.
   * Mirrors the reference implementation's handleSpeechStartedEvent().
*/
  private handleBargein(): void {
    const responseStartedAt = this.responseStartTimestamp;

    // Nothing tracked as in flight: just flush the outbound audio.
    if (this.markQueue.length === 0 || responseStartedAt === null) {
      this.config.onClearAudio();
      return;
    }

    const elapsedMs = this.latestMediaTimestamp - responseStartedAt;
    const interruptedItemId = this.lastAssistantItemId;

    if (interruptedItemId) {
      // Ask the model to cut the item at the interruption point so its
      // context reflects only the audio that had elapsed.
      console.log(`[RealtimeVoice] Truncating at ${elapsedMs}ms`);
      this.sendEvent({
        type: "conversation.item.truncate",
        item_id: interruptedItemId,
        content_index: 0,
        audio_end_ms: Math.max(0, elapsedMs),
      });
    }

    // Flush audio that is already queued downstream.
    this.config.onClearAudio();

    // Forget the interrupted response.
    this.markQueue = [];
    this.lastAssistantItemId = null;
    this.responseStartTimestamp = null;
  }

  /**
   * Record a playback mark for the audio chunk that was just forwarded.
   * Nothing is sent over the wire here; a timestamp-based name is appended
   * to the internal mark queue, which handleBargein() consults.
   */
  private sendMark(): void {
    const markName = `audio-${Date.now()}`;
    this.markQueue.push(markName);
  }

  /**
   * Handle Twilio mark acknowledgment (when a mark event comes back from Twilio).
   * Call this method when you receive a "mark" event from the Twilio WebSocket.
*/
  acknowledgeMark(): void {
    // shift() on an empty array is a no-op, so no length guard is needed.
    this.markQueue.shift();
  }

  // -------------------------------------------------------------------------
  // Utilities
  // -------------------------------------------------------------------------

  /**
   * Serialise an event as JSON and send it over the realtime socket.
   * If the socket is missing or not open, the event is dropped and a
   * warning is logged.
   */
  private sendEvent(event: unknown): void {
    const socket = this.ws;
    if (!socket || socket.readyState !== WebSocket.OPEN) {
      console.warn("[RealtimeVoice] Attempted to send event while disconnected");
      return;
    }
    socket.send(JSON.stringify(event));
  }
}

// ---------------------------------------------------------------------------
// Provider factory (matches pattern of OpenAIRealtimeSTTProvider)
// ---------------------------------------------------------------------------

/**
 * Shared defaults held by the provider factory. Per-call settings are
 * supplied when an individual bridge is created and override these values.
 * @internal Not used by the plugin's built-in realtime handler; exposed for external consumers.
 */
export interface RealtimeVoiceProviderConfig {
  /** OpenAI API key (required) */
  apiKey: string;
  /** Model used when a call does not specify one (default: "gpt-4o-mini-realtime-preview") */
  model?: string;
  /** Voice used when a call does not specify one (default: "alloy") */
  voice?: RealtimeVoice;
  /** System instructions used when a call does not specify any */
  instructions?: string;
  /** Sampling temperature fallback (default: 0.8) */
  temperature?: number;
  /** Voice-activity-detection threshold fallback (default: 0.5) */
  vadThreshold?: number;
  /** Silence duration in ms fallback (default: 500) */
  silenceDurationMs?: number;
  /** Tool definitions used when a call does not specify any */
  tools?: RealtimeTool[];
}

/**
 * Factory for creating RealtimeVoiceBridge instances.
 * Follows the same pattern as OpenAIRealtimeSTTProvider for easy swapping.
+ */ +export class OpenAIRealtimeVoiceProvider { + readonly name = "openai-realtime-voice" as const; + + constructor(private readonly defaults: RealtimeVoiceProviderConfig) { + if (!defaults.apiKey) { + throw new Error("[RealtimeVoiceProvider] OpenAI API key is required"); + } + } + + /** + * Create a new voice bridge for a single call session. + * Merges provided config with provider defaults. + */ + createBridge( + callConfig: Omit & Partial>, + ): OpenAIRealtimeVoiceBridge { + const merged: RealtimeVoiceConfig = { + apiKey: this.defaults.apiKey, + model: callConfig.model ?? this.defaults.model, + voice: callConfig.voice ?? this.defaults.voice, + instructions: callConfig.instructions ?? this.defaults.instructions, + temperature: callConfig.temperature ?? this.defaults.temperature, + vadThreshold: callConfig.vadThreshold ?? this.defaults.vadThreshold, + silenceDurationMs: callConfig.silenceDurationMs ?? this.defaults.silenceDurationMs, + tools: callConfig.tools ?? this.defaults.tools, + onAudio: callConfig.onAudio, + onClearAudio: callConfig.onClearAudio, + onTranscript: callConfig.onTranscript, + onToolCall: callConfig.onToolCall, + onReady: callConfig.onReady, + onError: callConfig.onError, + onClose: callConfig.onClose, + }; + return new OpenAIRealtimeVoiceBridge(merged); + } +} + +// --------------------------------------------------------------------------- +// MediaStreamHandler integration helper +// --------------------------------------------------------------------------- + +/** + * Minimal interface that the bridge integration needs from MediaStreamHandler. + * This matches the actual MediaStreamHandler's method signatures. + * @internal Not used by the plugin's built-in realtime handler; exposed for external consumers. 
+ */ +export interface MediaStreamHandlerLike { + sendAudio(streamSid: string, muLaw: Buffer): void; + clearAudio(streamSid: string): void; + sendMark(streamSid: string, name: string): void; +} + +/** + * Create a RealtimeVoiceBridge wired to an existing MediaStreamHandler session. + * @internal Not used by the plugin's built-in realtime handler; exposed for external consumers. + * + * Drop-in helper for use inside media-stream.ts handleStart(): + * + * ```typescript + * // In handleStart(), instead of creating an STT session: + * const bridge = createBridgeForStream({ + * streamSid, + * handler: this, // MediaStreamHandler instance + * config: { + * apiKey: "...", + * instructions: "You are Gracie...", + * voice: "alloy", + * onTranscript: (role, text, final) => { + * if (final && role === "user") config.onTranscript?.(callId, text); + * }, + * }, + * }); + * await bridge.connect(); + * ``` + */ +export function createBridgeForStream(opts: { + streamSid: string; + handler: MediaStreamHandlerLike; + config: Omit; +}): OpenAIRealtimeVoiceBridge { + return new OpenAIRealtimeVoiceBridge({ + ...opts.config, + onAudio: (muLaw) => { + opts.handler.sendAudio(opts.streamSid, muLaw); + }, + onClearAudio: () => { + opts.handler.clearAudio(opts.streamSid); + }, + }); +} + +// --------------------------------------------------------------------------- +// Internal event shape types (partial — only fields we use) +// --------------------------------------------------------------------------- + +interface RealtimeEvent { + type: string; + delta?: string; + transcript?: string; + item_id?: string; + call_id?: string; + name?: string; + error?: unknown; +} + +interface RealtimeSessionUpdate { + type: "session.update"; + session: { + modalities: string[]; + instructions?: string; + voice: RealtimeVoice; + input_audio_format: string; + output_audio_format: string; + turn_detection: { + type: "server_vad"; + threshold: number; + prefix_padding_ms: number; + silence_duration_ms: 
number; + create_response: boolean; + }; + temperature: number; + input_audio_transcription?: { model: string }; + tools?: RealtimeTool[]; + tool_choice?: string; + }; +} diff --git a/extensions/voice-call/src/runtime.ts b/extensions/voice-call/src/runtime.ts index d725e44bf06..7da90634774 100644 --- a/extensions/voice-call/src/runtime.ts +++ b/extensions/voice-call/src/runtime.ts @@ -11,6 +11,7 @@ import type { TelephonyTtsRuntime } from "./telephony-tts.js"; import { createTelephonyTtsProvider } from "./telephony-tts.js"; import { startTunnel, type TunnelResult } from "./tunnel.js"; import { VoiceCallWebhookServer } from "./webhook.js"; +import { RealtimeCallHandler } from "./webhook/realtime-handler.js"; import { cleanupTailscaleExposure, setupTailscaleExposure } from "./webhook/tailscale.js"; export type VoiceCallRuntime = { @@ -20,6 +21,8 @@ export type VoiceCallRuntime = { webhookServer: VoiceCallWebhookServer; webhookUrl: string; publicUrl: string | null; + /** Realtime voice handler — present when config.realtime.enabled is true */ + realtimeHandler?: RealtimeCallHandler; stop: () => Promise; }; @@ -168,6 +171,20 @@ export async function createVoiceCallRuntime(params: { const webhookServer = new VoiceCallWebhookServer(config, manager, provider, coreConfig); const lifecycle = createRuntimeResourceLifecycle({ config, webhookServer }); + // Wire realtime handler before the server starts so it's ready for upgrades + let realtimeHandler: RealtimeCallHandler | undefined; + if (config.realtime.enabled) { + realtimeHandler = new RealtimeCallHandler( + config.realtime, + manager, + provider, + coreConfig, + config.streaming.openaiApiKey, + ); + webhookServer.setRealtimeHandler(realtimeHandler); + log.info("[voice-call] Realtime voice handler initialized"); + } + const localUrl = await webhookServer.start(); // Wrap remaining initialization in try/catch so the webhook server is @@ -252,6 +269,7 @@ export async function createVoiceCallRuntime(params: { webhookServer, 
webhookUrl, publicUrl, + realtimeHandler, stop, }; } catch (err) { diff --git a/extensions/voice-call/src/test-fixtures.ts b/extensions/voice-call/src/test-fixtures.ts index 594aa064ba5..f38126f858c 100644 --- a/extensions/voice-call/src/test-fixtures.ts +++ b/extensions/voice-call/src/test-fixtures.ts @@ -40,6 +40,7 @@ export function createVoiceCallBaseConfig(params?: { maxPendingConnectionsPerIp: 4, maxConnections: 128, }, + realtime: { enabled: false, tools: [] }, skipSignatureVerification: false, stt: { provider: "openai", model: "whisper-1" }, tts: { diff --git a/extensions/voice-call/src/webhook.ts b/extensions/voice-call/src/webhook.ts index 1258229735e..be3cce5f399 100644 --- a/extensions/voice-call/src/webhook.ts +++ b/extensions/voice-call/src/webhook.ts @@ -14,11 +14,12 @@ import type { VoiceCallProvider } from "./providers/base.js"; import { OpenAIRealtimeSTTProvider } from "./providers/stt-openai-realtime.js"; import type { TwilioProvider } from "./providers/twilio.js"; import type { NormalizedEvent, WebhookContext } from "./types.js"; +import type { RealtimeCallHandler } from "./webhook/realtime-handler.js"; import { startStaleCallReaper } from "./webhook/stale-call-reaper.js"; const MAX_WEBHOOK_BODY_BYTES = 1024 * 1024; -type WebhookResponsePayload = { +export type WebhookResponsePayload = { statusCode: number; body: string; headers?: Record; @@ -60,6 +61,9 @@ export class VoiceCallWebhookServer { /** Media stream handler for bidirectional audio (when streaming enabled) */ private mediaStreamHandler: MediaStreamHandler | null = null; + /** Realtime voice handler — present when config.realtime.enabled is true */ + private realtimeHandler: RealtimeCallHandler | null = null; + constructor( config: VoiceCallConfig, manager: CallManager, @@ -84,6 +88,13 @@ export class VoiceCallWebhookServer { return this.mediaStreamHandler; } + /** + * Wire the realtime call handler (called from runtime.ts before server starts). 
+ */ + setRealtimeHandler(handler: RealtimeCallHandler): void { + this.realtimeHandler = handler; + } + /** * Initialize media streaming with OpenAI Realtime STT. */ @@ -229,9 +240,15 @@ export class VoiceCallWebhookServer { }); }); - // Handle WebSocket upgrades for media streams - if (this.mediaStreamHandler) { + // Handle WebSocket upgrades for realtime voice and media streams + if (this.realtimeHandler || this.mediaStreamHandler) { this.server.on("upgrade", (request, socket, head) => { + // Realtime voice takes precedence when the path matches + if (this.realtimeHandler && this.isRealtimeWebSocketUpgrade(request)) { + console.log("[voice-call] WebSocket upgrade for realtime voice"); + this.realtimeHandler.handleWebSocketUpgrade(request, socket, head); + return; + } const path = this.getUpgradePathname(request); if (path === streamPath) { console.log("[voice-call] WebSocket upgrade for media stream"); @@ -338,6 +355,14 @@ export class VoiceCallWebhookServer { this.writeWebhookResponse(res, payload); } + /** + * Returns true for WebSocket upgrade paths that belong to the realtime handler. + * Used only for upgrade routing — not for the inbound HTTP webhook POST. + */ + private isRealtimeWebSocketUpgrade(req: http.IncomingMessage): boolean { + return (req.url ?? "/").includes("/realtime"); + } + private async runWebhookPipeline( req: http.IncomingMessage, webhookPath: string, @@ -396,6 +421,18 @@ export class VoiceCallWebhookServer { return { statusCode: 401, body: "Unauthorized" }; } + // Realtime mode: return TwiML for inbound ringing calls only. + // Status callbacks (CallStatus=completed, outbound calls, etc.) must fall + // through to the normal webhook pipeline so call state is updated correctly. 
+ if (this.realtimeHandler) { + const params = new URLSearchParams(ctx.rawBody); + const callStatus = params.get("CallStatus"); + const direction = params.get("Direction"); + if (callStatus === "ringing" && (!direction || direction === "inbound")) { + return this.realtimeHandler.buildTwiMLPayload(req, params); + } + } + const parsed = this.provider.parseWebhookEvent(ctx, { verifiedRequestKey: verification.verifiedRequestKey, }); diff --git a/extensions/voice-call/src/webhook/realtime-handler.test.ts b/extensions/voice-call/src/webhook/realtime-handler.test.ts new file mode 100644 index 00000000000..921fe757a6e --- /dev/null +++ b/extensions/voice-call/src/webhook/realtime-handler.test.ts @@ -0,0 +1,326 @@ +import http from "node:http"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import type { CallManager } from "../manager.js"; +import type { VoiceCallProvider } from "../providers/base.js"; +import type { CallRecord } from "../types.js"; +import { RealtimeCallHandler } from "./realtime-handler.js"; + +/** Extract the stream token from a TwiML body string. */ +function extractStreamToken(twiml: string): string | null { + const match = twiml.match(/\/voice\/stream\/realtime\/([^"&\s]+)/); + return match?.[1] ?? null; +} + +// Minimal realtime config used across tests +const baseRealtimeConfig = { + enabled: true, + voice: "ash" as const, + tools: [] as never[], +}; + +// Fake CallRecord for manager stubs +function makeCallRecord(overrides: Partial = {}): CallRecord { + return { + callId: "call-rt-1", + providerCallId: "CA_test", + provider: "twilio", + direction: "inbound", + state: "answered", + from: "+15550001234", + to: "+15550005678", + startedAt: Date.now(), + transcript: [], + processedEventIds: [], + metadata: { + initialMessage: "Hello! How can I help you today?", + }, + ...overrides, + }; +} + +function makeManager(record?: CallRecord): CallManager { + const storedRecord = record ?? 
makeCallRecord(); + return { + processEvent: vi.fn(), + getCallByProviderCallId: vi.fn(() => storedRecord), + getCall: vi.fn(() => storedRecord), + } as unknown as CallManager; +} + +function makeProvider(): VoiceCallProvider { + return { + name: "twilio", + verifyWebhook: vi.fn(() => ({ ok: true, verifiedRequestKey: "mock:key" })), + parseWebhookEvent: vi.fn(() => ({ events: [] })), + initiateCall: vi.fn(async () => ({ providerCallId: "CA_test", status: "initiated" as const })), + hangupCall: vi.fn(async () => {}), + playTts: vi.fn(async () => {}), + startListening: vi.fn(async () => {}), + stopListening: vi.fn(async () => {}), + getCallStatus: vi.fn(async () => ({ status: "in-progress" as const, isTerminal: false })), + }; +} + +function makeRequest(url: string, host = "example.ts.net"): http.IncomingMessage { + const req = new http.IncomingMessage(null as never); + req.url = url; + req.method = "POST"; + req.headers = { host }; + return req; +} + +describe("RealtimeCallHandler", () => { + let originalEnv: NodeJS.ProcessEnv; + + beforeEach(() => { + originalEnv = { ...process.env }; + }); + + afterEach(() => { + process.env = { ...originalEnv }; + }); + + // --------------------------------------------------------------------------- + // buildTwiMLPayload + // --------------------------------------------------------------------------- + + describe("buildTwiMLPayload", () => { + it("returns TwiML with wss URL derived from request host", () => { + const handler = new RealtimeCallHandler( + baseRealtimeConfig, + makeManager(), + makeProvider(), + null, + ); + const req = makeRequest("/voice/webhook", "gateway.ts.net"); + const payload = handler.buildTwiMLPayload(req); + + expect(payload.statusCode).toBe(200); + expect(payload.headers?.["Content-Type"]).toBe("text/xml"); + expect(payload.body).toContain(""); + expect(payload.body).toContain(" { + const handler = new RealtimeCallHandler( + baseRealtimeConfig, + makeManager(), + makeProvider(), + null, + ); + const req 
= makeRequest("/voice/webhook", ""); + const payload = handler.buildTwiMLPayload(req); + + expect(payload.body).toMatch( + /wss:\/\/localhost:8443\/voice\/stream\/realtime\/[0-9a-f-]{36}/, + ); + }); + + it("embeds a unique token on each call", () => { + const handler = new RealtimeCallHandler( + baseRealtimeConfig, + makeManager(), + makeProvider(), + null, + ); + const req = makeRequest("/voice/webhook", "host.example.com"); + const token1 = extractStreamToken(handler.buildTwiMLPayload(req).body); + const token2 = extractStreamToken(handler.buildTwiMLPayload(req).body); + expect(token1).toBeTruthy(); + expect(token2).toBeTruthy(); + expect(token1).not.toBe(token2); + }); + }); + + // --------------------------------------------------------------------------- + // Stream token (nonce) validation + // --------------------------------------------------------------------------- + + describe("stream token (nonce)", () => { + it("issueStreamToken + consumeStreamToken: valid token accepted once then rejected", () => { + const handler = new RealtimeCallHandler( + baseRealtimeConfig, + makeManager(), + makeProvider(), + null, + ); + const issue = (handler as unknown as { issueStreamToken: () => string }).issueStreamToken; + const consume = ( + handler as unknown as { + consumeStreamToken: (t: string) => { from?: string; to?: string } | null; + } + ).consumeStreamToken; + const token = issue.call(handler); + expect(consume.call(handler, token)).not.toBeNull(); + expect(consume.call(handler, token)).toBeNull(); + }); + + it("rejects unknown tokens", () => { + const handler = new RealtimeCallHandler( + baseRealtimeConfig, + makeManager(), + makeProvider(), + null, + ); + const consume = ( + handler as unknown as { + consumeStreamToken: (t: string) => { from?: string; to?: string } | null; + } + ).consumeStreamToken; + expect(consume.call(handler, "not-a-real-token")).toBeNull(); + }); + }); + + // --------------------------------------------------------------------------- + 
// registerCallInManager — greeting suppression + // --------------------------------------------------------------------------- + + describe("registerCallInManager (via handleCall)", () => { + it("clears metadata.initialMessage so the inboundGreeting TTS path is skipped", () => { + const callRecord = makeCallRecord({ + metadata: { initialMessage: "Hello from config!" }, + }); + const manager = makeManager(callRecord); + + const handler = new RealtimeCallHandler(baseRealtimeConfig, manager, makeProvider(), null); + + // Access private method via type assertion for unit testing + ( + handler as unknown as { registerCallInManager: (sid: string) => string } + ).registerCallInManager("CA_test"); + + // call.initiated + call.answered should both have been emitted + expect(vi.mocked(manager.processEvent)).toHaveBeenCalledTimes(2); + const eventTypes = vi + .mocked(manager.processEvent) + .mock.calls.map(([e]) => (e as { type: string }).type); + expect(eventTypes).toEqual(["call.initiated", "call.answered"]); + + // initialMessage must be cleared before call.answered fires + expect(callRecord.metadata?.initialMessage).toBeUndefined(); + }); + + it("returns callId from the manager-created call record", () => { + const callRecord = makeCallRecord({ callId: "manager-gen-id" }); + const manager = makeManager(callRecord); + + const handler = new RealtimeCallHandler(baseRealtimeConfig, manager, makeProvider(), null); + + const result = ( + handler as unknown as { registerCallInManager: (sid: string) => string } + ).registerCallInManager("CA_test"); + + expect(result).toBe("manager-gen-id"); + }); + + it("falls back to providerCallId when manager has no record", () => { + const manager = { + processEvent: vi.fn(), + getCallByProviderCallId: vi.fn(() => undefined), + } as unknown as CallManager; + + const handler = new RealtimeCallHandler(baseRealtimeConfig, manager, makeProvider(), null); + + const result = ( + handler as unknown as { registerCallInManager: (sid: string) => 
string } + ).registerCallInManager("CA_fallback"); + + expect(result).toBe("CA_fallback"); + }); + }); + + // --------------------------------------------------------------------------- + // Tool handler framework + // --------------------------------------------------------------------------- + + describe("registerToolHandler", () => { + it("routes tool calls to registered handlers and returns their result", async () => { + const handler = new RealtimeCallHandler( + baseRealtimeConfig, + makeManager(), + makeProvider(), + null, + ); + + handler.registerToolHandler("get_time", async () => ({ utc: "2026-03-10T00:00:00Z" })); + + const fakeSubmit = vi.fn(); + const fakeBridge = { submitToolResult: fakeSubmit } as never; + + await ( + handler as unknown as { + executeToolCall: ( + bridge: never, + callId: string, + bridgeCallId: string, + name: string, + args: unknown, + ) => Promise; + } + ).executeToolCall(fakeBridge, "call-1", "bridge-call-1", "get_time", {}); + + expect(fakeSubmit).toHaveBeenCalledWith("bridge-call-1", { utc: "2026-03-10T00:00:00Z" }); + }); + + it("returns an error result for unregistered tool names", async () => { + const handler = new RealtimeCallHandler( + baseRealtimeConfig, + makeManager(), + makeProvider(), + null, + ); + + const fakeSubmit = vi.fn(); + const fakeBridge = { submitToolResult: fakeSubmit } as never; + + await ( + handler as unknown as { + executeToolCall: ( + bridge: never, + callId: string, + bridgeCallId: string, + name: string, + args: unknown, + ) => Promise; + } + ).executeToolCall(fakeBridge, "call-1", "bridge-call-1", "unknown_tool", {}); + + expect(fakeSubmit).toHaveBeenCalledWith("bridge-call-1", { + error: 'Tool "unknown_tool" not available', + }); + }); + + it("returns an error result when a handler throws", async () => { + const handler = new RealtimeCallHandler( + baseRealtimeConfig, + makeManager(), + makeProvider(), + null, + ); + + handler.registerToolHandler("boom", async () => { + throw new Error("handler 
blew up"); + }); + + const fakeSubmit = vi.fn(); + const fakeBridge = { submitToolResult: fakeSubmit } as never; + + await ( + handler as unknown as { + executeToolCall: ( + bridge: never, + callId: string, + bridgeCallId: string, + name: string, + args: unknown, + ) => Promise; + } + ).executeToolCall(fakeBridge, "call-1", "bridge-call-1", "boom", {}); + + expect(fakeSubmit).toHaveBeenCalledWith("bridge-call-1", { error: "handler blew up" }); + }); + }); +}); diff --git a/extensions/voice-call/src/webhook/realtime-handler.ts b/extensions/voice-call/src/webhook/realtime-handler.ts new file mode 100644 index 00000000000..fc911bb2132 --- /dev/null +++ b/extensions/voice-call/src/webhook/realtime-handler.ts @@ -0,0 +1,383 @@ +import { randomUUID } from "node:crypto"; +import http from "node:http"; +import type { Duplex } from "node:stream"; +import WebSocket, { WebSocketServer } from "ws"; +import type { VoiceCallRealtimeConfig } from "../config.js"; +import type { CoreConfig } from "../core-bridge.js"; +import type { CallManager } from "../manager.js"; +import type { VoiceCallProvider } from "../providers/base.js"; +import { + OpenAIRealtimeVoiceBridge, + type RealtimeTool, +} from "../providers/openai-realtime-voice.js"; +import type { NormalizedEvent } from "../types.js"; +import type { WebhookResponsePayload } from "../webhook.js"; + +export type ToolHandlerFn = (args: unknown, callId: string) => Promise; + +/** + * Handles inbound voice calls bridged directly to the OpenAI Realtime API. 
/**
 * Responsibilities:
 * - Accept WebSocket upgrades from Twilio Media Streams at the /realtime path
 * - Return TwiML payload for the initial HTTP webhook
 * - Register each call with CallManager (appears in voice status/history)
 * - Route tool calls to registered handlers (Phase 5 tool framework)
 *
 * Provider coupling: this class currently speaks the Twilio Media Streams
 * WebSocket protocol directly (μ-law audio, streamSid/callSid, start/media/
 * mark/stop events) and emits Twilio TwiML. The OpenAI bridge itself is
 * provider-agnostic. If a second provider needs realtime support, the right
 * refactor is to extract a RealtimeMediaAdapter interface (buildStreamPayload +
 * handleWebSocketUpgrade + MediaStreamCallbacks) so the bridge and call-manager
 * wiring can be reused without duplicating the OpenAI session logic.
 */

/** How long (ms) a stream token remains valid after TwiML is issued. */
const STREAM_TOKEN_TTL_MS = 30_000;

export class RealtimeCallHandler {
  /** Tool handlers keyed by the function name the model invokes. */
  private toolHandlers = new Map<string, ToolHandlerFn>();

  /**
   * One-time tokens issued per TwiML response; consumed on WS upgrade.
   * Stores expiry + caller metadata so registerCallInManager can include From/To.
   */
  private pendingStreamTokens = new Map<
    string,
    { expiry: number; from?: string; to?: string }
  >();

  /**
   * @param config - Realtime settings forwarded to the bridge (model, voice, VAD, tools, ...).
   * @param manager - Receives the synthetic call events emitted by this handler.
   * @param provider - Telephony provider (not referenced inside this class).
   * @param coreConfig - Core configuration (not referenced inside this class).
   * @param openaiApiKey - Pre-resolved OpenAI API key (falls back to OPENAI_API_KEY env at call time).
   */
  constructor(
    private config: VoiceCallRealtimeConfig,
    private manager: CallManager,
    private provider: VoiceCallProvider,
    private coreConfig: CoreConfig | null,
    private openaiApiKey?: string,
  ) {}

  /**
   * Handle a WebSocket upgrade request from Twilio for a realtime media stream.
   * Called from VoiceCallWebhookServer's upgrade handler when isRealtimeWebSocketUpgrade() is true.
   *
   * Validates the one-time stream token embedded in the URL by buildTwiMLPayload before
   * accepting the upgrade. This ensures the WS connection was preceded by a properly
   * Twilio-signed POST webhook — the token is only issued after verifyWebhook passes.
   *
   * A missing, unknown, expired or unparseable token is answered with a raw
   * `401 Unauthorized` response and the socket is destroyed.
   */
  handleWebSocketUpgrade(request: http.IncomingMessage, socket: Duplex, head: Buffer): void {
    const token = RealtimeCallHandler.extractStreamToken(request.url);
    const callerMeta = token ? this.consumeStreamToken(token) : null;
    if (!callerMeta) {
      console.warn("[voice-call] Rejecting WS upgrade: missing or invalid stream token");
      socket.write("HTTP/1.1 401 Unauthorized\r\n\r\n");
      socket.destroy();
      return;
    }

    const wss = new WebSocketServer({ noServer: true });
    wss.handleUpgrade(request, socket, head, (ws: WebSocket) => {
      let bridge: OpenAIRealtimeVoiceBridge | null = null;
      let initialized = false;

      ws.on("message", (data: Buffer) => {
        try {
          const msg = RealtimeCallHandler.asRecord(JSON.parse(data.toString()));
          if (!msg) {
            // Valid JSON but not an object (e.g. `null`, a number): nothing to dispatch.
            return;
          }

          if (!initialized && msg.event === "start") {
            // Only the first "start" frame creates a bridge; later ones are ignored.
            initialized = true;
            const startData = RealtimeCallHandler.asRecord(msg.start);
            bridge = this.handleCall(
              RealtimeCallHandler.stringOr(startData?.streamSid, "unknown"),
              RealtimeCallHandler.stringOr(startData?.callSid, "unknown"),
              ws,
              callerMeta,
            );
            return;
          }

          const active = bridge;
          if (!active) {
            return;
          }

          if (msg.event === "media") {
            const media = RealtimeCallHandler.asRecord(msg.media);
            const payload = media?.payload;
            if (media && typeof payload === "string" && payload.length > 0) {
              active.sendAudio(Buffer.from(payload, "base64"));
              const timestamp = Number(media.timestamp);
              if (media.timestamp && Number.isFinite(timestamp)) {
                active.setMediaTimestamp(timestamp);
              }
            }
          } else if (msg.event === "mark") {
            active.acknowledgeMark();
          } else if (msg.event === "stop") {
            active.close();
            // Drop the reference so the socket "close" event that follows
            // does not close the same bridge a second time.
            bridge = null;
          }
        } catch (err) {
          console.error("[voice-call] Error parsing WS message:", err);
        }
      });

      ws.on("close", () => {
        bridge?.close();
        bridge = null;
      });
    });
  }

  /**
   * Build the TwiML response payload for a realtime call.
   * The WebSocket URL is derived from the incoming request host so no hostname
   * is hardcoded. A one-time stream token is embedded in the URL and validated
   * by handleWebSocketUpgrade to prevent unauthenticated WS connections.
   *
   * The URL is XML-escaped before being placed in the `url` attribute because
   * the host comes from a request header.
   *
   * @param req - Incoming webhook request; only `headers.host` is read.
   * @param params - Parsed Twilio webhook body params (From/To stored with nonce
   *   so registerCallInManager can populate caller fields).
   * @returns A 200 `text/xml` payload whose body is the TwiML document.
   */
  buildTwiMLPayload(req: http.IncomingMessage, params?: URLSearchParams): WebhookResponsePayload {
    const host = req.headers.host || "localhost:8443";
    const token = this.issueStreamToken({
      from: params?.get("From") ?? undefined,
      to: params?.get("To") ?? undefined,
    });
    const wsUrl = `wss://${host}/voice/stream/realtime/${token}`;
    // The token is deliberately left out of the log line.
    console.log(
      `[voice-call] Returning realtime TwiML with WebSocket: wss://${host}/voice/stream/realtime`,
    );
    const twiml = `<?xml version="1.0" encoding="UTF-8"?>
<Response>
  <Connect>
    <Stream url="${RealtimeCallHandler.escapeXmlAttribute(wsUrl)}" />
  </Connect>
</Response>`;
    return {
      statusCode: 200,
      headers: { "Content-Type": "text/xml" },
      body: twiml,
    };
  }

  /**
   * Register a named tool handler to be called when the model invokes a function.
   * Must be called before calls begin.
   *
   * @param name - Function name as declared in config.realtime.tools
   * @param fn - Async handler receiving (parsedArgs, internalCallId); return value
   *   is submitted back to the model as the tool result.
   */
  registerToolHandler(name: string, fn: ToolHandlerFn): void {
    this.toolHandlers.set(name, fn);
  }

  // ---------------------------------------------------------------------------
  // Private
  // ---------------------------------------------------------------------------

  /**
   * Extract the stream token from a request URL.
   * The token is the last path segment (e.g. /voice/stream/realtime/<token>)
   * to survive reverse proxies that strip query parameters (e.g. Tailscale Funnel).
   *
   * @returns The last path segment, or null when it is empty or the URL cannot be parsed.
   */
  private static extractStreamToken(rawUrl: string | undefined): string | null {
    let pathname: string;
    try {
      pathname = new URL(rawUrl ?? "/", "wss://localhost").pathname;
    } catch {
      // `new URL` throws on malformed request targets; treat as "no token".
      return null;
    }
    const lastSegment = pathname.split("/").pop();
    return lastSegment ? lastSegment : null;
  }

  /** Escape the five XML special characters so a value is safe inside an attribute. */
  private static escapeXmlAttribute(value: string): string {
    return value
      .replace(/&/g, "&amp;")
      .replace(/</g, "&lt;")
      .replace(/>/g, "&gt;")
      .replace(/"/g, "&quot;")
      .replace(/'/g, "&apos;");
  }

  /** Return `value` as a plain record when it is a non-null object, otherwise undefined. */
  private static asRecord(value: unknown): Record<string, unknown> | undefined {
    return typeof value === "object" && value !== null
      ? (value as Record<string, unknown>)
      : undefined;
  }

  /** Return `value` when it is a non-empty string, otherwise `fallback`. */
  private static stringOr(value: unknown, fallback: string): string {
    return typeof value === "string" && value.length > 0 ? value : fallback;
  }

  /** Generate a single-use stream token valid for STREAM_TOKEN_TTL_MS. */
  private issueStreamToken(meta: { from?: string; to?: string } = {}): string {
    const now = Date.now();
    const token = randomUUID();
    this.pendingStreamTokens.set(token, { expiry: now + STREAM_TOKEN_TTL_MS, ...meta });
    // Evict expired tokens to prevent unbounded growth if calls are abandoned
    for (const [candidate, entry] of this.pendingStreamTokens) {
      if (now > entry.expiry) {
        this.pendingStreamTokens.delete(candidate);
      }
    }
    return token;
  }

  /**
   * Consume a stream token. Returns caller metadata if valid, null if not.
   * The token is removed on first use whether or not it has expired.
   */
  private consumeStreamToken(token: string): { from?: string; to?: string } | null {
    const entry = this.pendingStreamTokens.get(token);
    if (!entry) {
      return null;
    }
    this.pendingStreamTokens.delete(token);
    return Date.now() <= entry.expiry ? { from: entry.from, to: entry.to } : null;
  }

  /**
   * Create and start the OpenAI Realtime bridge for a single call session.
   * Registers the call with CallManager so it appears in status/history.
   *
   * @returns The bridge, or null when no API key is available (the socket is
   *   closed with code 1011 in that case and no call is registered).
   */
  private handleCall(
    streamSid: string,
    callSid: string,
    ws: WebSocket,
    callerMeta: { from?: string; to?: string },
  ): OpenAIRealtimeVoiceBridge | null {
    const apiKey = this.openaiApiKey ?? process.env.OPENAI_API_KEY;
    if (!apiKey) {
      console.error(
        "[voice-call] No OpenAI API key for realtime call (set streaming.openaiApiKey or OPENAI_API_KEY)",
      );
      ws.close(1011, "No API key");
      return null;
    }

    const callId = this.registerCallInManager(callSid, callerMeta);
    console.log(
      `[voice-call] Realtime call: streamSid=${streamSid}, callSid=${callSid}, callId=${callId}`,
    );

    // call.ended must be emitted at most once per call, however many times the
    // bridge reports a close (explicit "stop", socket close, connect failure).
    let ended = false;
    const endCallOnce = (): void => {
      if (ended) {
        return;
      }
      ended = true;
      this.endCallInManager(callSid, callId);
    };

    // Declared before construction so the callbacks below can capture the
    // binding; it is assigned as soon as the constructor returns.
    let bridge: OpenAIRealtimeVoiceBridge | null = null;

    bridge = new OpenAIRealtimeVoiceBridge({
      apiKey,
      model: this.config.model,
      voice: this.config.voice,
      instructions: this.config.instructions,
      temperature: this.config.temperature,
      vadThreshold: this.config.vadThreshold,
      silenceDurationMs: this.config.silenceDurationMs,
      prefixPaddingMs: this.config.prefixPaddingMs,
      tools: this.config.tools as RealtimeTool[],

      onAudio: (muLaw) => {
        ws.send(
          JSON.stringify({
            event: "media",
            streamSid,
            media: { payload: muLaw.toString("base64") },
          }),
        );
      },

      onClearAudio: () => {
        ws.send(JSON.stringify({ event: "clear", streamSid }));
      },

      onTranscript: (role, text, isFinal) => {
        if (!isFinal) {
          return;
        }
        if (role === "user") {
          const event: NormalizedEvent = {
            id: `realtime-speech-${callSid}-${Date.now()}`,
            type: "call.speech",
            callId,
            providerCallId: callSid,
            timestamp: Date.now(),
            transcript: text,
            isFinal: true,
          };
          this.manager.processEvent(event);
        } else if (role === "assistant") {
          // Log assistant turns via call.speaking so they appear as speaker:"bot"
          // in the transcript (same mechanism Telnyx uses for TTS speech).
          this.manager.processEvent({
            id: `realtime-bot-${callSid}-${Date.now()}`,
            type: "call.speaking",
            callId,
            providerCallId: callSid,
            timestamp: Date.now(),
            text,
          });
        }
      },

      onToolCall: (toolEvent) => {
        if (!bridge) {
          return;
        }
        // executeToolCall converts handler failures into an error result, but
        // submitting the result can still throw; never leave the promise unhandled.
        void this.executeToolCall(
          bridge,
          callId,
          toolEvent.callId,
          toolEvent.name,
          toolEvent.args,
        ).catch((err: unknown) => {
          console.error("[voice-call] Realtime tool call failed:", err);
        });
      },

      onReady: () => {
        bridge?.triggerGreeting();
      },

      onError: (err) => {
        console.error("[voice-call] Realtime error:", err.message);
      },

      onClose: () => {
        endCallOnce();
      },
    });

    bridge.connect().catch((err: unknown) => {
      console.error("[voice-call] Failed to connect realtime bridge:", err);
      // The call was already registered as answered; end it so the record
      // does not stay active when the bridge never came up.
      endCallOnce();
      ws.close(1011, "Failed to connect");
    });

    // Acknowledge the stream connection (mirrors Twilio Media Streams protocol)
    ws.send(JSON.stringify({ event: "connected", protocol: "Call", version: "1.0.0" }));

    return bridge;
  }

  /**
   * Emit synthetic NormalizedEvents to register the call with CallManager.
   *
   * Tested directly via `as unknown as` cast — the logic is non-trivial
   * enough to warrant unit testing without promoting to a public method.
   *
   * @returns The callId of the record the manager holds for `callSid`, or
   *   `callSid` itself when no record is found.
   */
  private registerCallInManager(
    callSid: string,
    callerMeta: { from?: string; to?: string } = {},
  ): string {
    const baseFields = {
      providerCallId: callSid,
      timestamp: Date.now(),
      direction: "inbound" as const,
      ...(callerMeta.from ? { from: callerMeta.from } : {}),
      ...(callerMeta.to ? { to: callerMeta.to } : {}),
    };

    // call.initiated causes the manager to auto-create the call record
    // (see manager/events.ts createWebhookCall path)
    this.manager.processEvent({
      id: `realtime-initiated-${callSid}`,
      callId: callSid,
      type: "call.initiated",
      ...baseFields,
    });

    // Remove metadata.initialMessage from the call record before call.answered fires.
    // The realtime bridge owns all voice output; the TTS greeting path would
    // fail anyway because provider state is never initialized for realtime calls.
    const callRecord = this.manager.getCallByProviderCallId(callSid);
    if (callRecord?.metadata) {
      delete callRecord.metadata.initialMessage;
    }

    this.manager.processEvent({
      id: `realtime-answered-${callSid}`,
      callId: callSid,
      type: "call.answered",
      ...baseFields,
    });

    return callRecord?.callId ?? callSid;
  }

  /** Emit a `call.ended` event (reason "completed") for the given call. */
  private endCallInManager(callSid: string, callId: string): void {
    this.manager.processEvent({
      id: `realtime-ended-${callSid}-${Date.now()}`,
      type: "call.ended",
      callId,
      providerCallId: callSid,
      timestamp: Date.now(),
      reason: "completed",
    });
  }

  /**
   * Dispatch a tool call from the Realtime API to the registered handler.
   * Submits the result (or an error object) back to the bridge.
   *
   * Tested directly via `as unknown as` cast — the routing/error logic is
   * worth unit testing without exposing the method publicly.
   *
   * @param bridge - Bridge that receives the tool result.
   * @param callId - Internal call id passed to the handler.
   * @param bridgeCallId - Tool-call id used when submitting the result.
   * @param name - Name of the tool the model invoked.
   * @param args - Arguments forwarded to the handler unchanged.
   */
  private async executeToolCall(
    bridge: OpenAIRealtimeVoiceBridge,
    callId: string,
    bridgeCallId: string,
    name: string,
    args: unknown,
  ): Promise<void> {
    const handler = this.toolHandlers.get(name);
    let result: unknown;
    if (!handler) {
      result = { error: `Tool "${name}" not available` };
    } else {
      try {
        result = await handler(args, callId);
      } catch (err) {
        result = { error: err instanceof Error ? err.message : String(err) };
      }
    }
    bridge.submitToolResult(bridgeCallId, result);
  }
}