This commit is contained in:
Forrest Blount 2026-03-15 22:30:30 +00:00 committed by GitHub
commit 83d0274f0d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 2027 additions and 4 deletions

View File

@ -88,7 +88,7 @@ RUN pnpm canvas:a2ui:bundle || \
echo "/* A2UI bundle unavailable in this build */" > src/canvas-host/a2ui/a2ui.bundle.js && \
echo "stub" > src/canvas-host/a2ui/.bundle.hash && \
rm -rf vendor/a2ui apps/shared/OpenClawKit/Tools/CanvasA2UI)
RUN pnpm build:docker
RUN NODE_OPTIONS=--max-old-space-size=2048 pnpm build:docker
# Force pnpm for UI build (Bun may fail on ARM/Synology architectures)
ENV OPENCLAW_PREFER_PNPM=1
RUN pnpm ui:build

View File

@ -24,6 +24,9 @@ services:
ports:
- "${OPENCLAW_GATEWAY_PORT:-18789}:18789"
- "${OPENCLAW_BRIDGE_PORT:-18790}:18790"
## Uncomment to expose the voice webhook port (required when voice-call plugin
## serve.bind is set to 0.0.0.0 for direct inbound webhook access):
# - "${OPENCLAW_VOICE_WEBHOOK_PORT:-3334}:3334"
init: true
restart: unless-stopped
command:

View File

@ -134,6 +134,63 @@ Actions:
- `voicecall.end` (callId)
- `voicecall.status` (callId)
## Realtime voice mode (OpenAI Realtime API)
Realtime mode routes inbound calls directly to the [OpenAI Realtime API](https://platform.openai.com/docs/guides/realtime) for voice-to-voice conversation (~200–400 ms latency vs ~2–3 s for the STT/TTS pipeline). It is disabled by default and mutually exclusive with `streaming.enabled`.
### Requirements
- `OPENAI_API_KEY` set in your environment (or `streaming.openaiApiKey` in config).
- A **publicly reachable HTTPS endpoint with WebSocket support** — the webhook server must accept both POST requests (Twilio webhook) and WebSocket upgrades (Twilio Media Stream). A plain HTTP tunnel is not sufficient; Twilio requires WSS.
- `inboundPolicy` set to `"open"` or `"allowlist"` (not `"disabled"`) so the plugin accepts inbound calls.
### Config
```json5
{
inboundPolicy: "open", // required: realtime needs inbound calls enabled
realtime: {
enabled: true,
voice: "alloy", // Realtime API voices: alloy, ash, ballad, cedar, coral,
// echo, marin, sage, shimmer, verse
instructions: "You are a helpful assistant.",
model: "gpt-4o-mini-realtime-preview", // optional, this is the default
temperature: 0.8, // 0–2, optional
vadThreshold: 0.5, // voice activity detection sensitivity, 0–1, optional
silenceDurationMs: 500, // ms of silence before end-of-turn, optional
},
}
```
### Environment variable overrides
All `realtime.*` fields can be set via environment variables; config values take precedence (see the precedence sketch after the table):
| Env var | Config field |
| ----------------------------- | ---------------------------- |
| `REALTIME_VOICE_ENABLED=true` | `realtime.enabled` |
| `REALTIME_VOICE_MODEL` | `realtime.model` |
| `REALTIME_VOICE_VOICE` | `realtime.voice` |
| `REALTIME_VOICE_INSTRUCTIONS` | `realtime.instructions` |
| `REALTIME_VOICE_TEMPERATURE` | `realtime.temperature` |
| `VAD_THRESHOLD` | `realtime.vadThreshold` |
| `SILENCE_DURATION_MS` | `realtime.silenceDurationMs` |
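Config always wins over the environment: with `REALTIME_VOICE_VOICE=ash` exported and `realtime.voice: "coral"` in config, the resolved voice is `coral`. A minimal sketch of the precedence rule, assuming `config` is your parsed plugin config (the real logic lives in `resolveVoiceCallConfig`, further down in this commit):

```typescript
// Sketch only: mirrors the nullish-coalescing fallback in resolveVoiceCallConfig.
const voice =
  config.realtime.voice ?? // explicit config value wins
  (process.env.REALTIME_VOICE_VOICE as VoiceCallRealtimeConfig["voice"]); // env fallback
```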
### How it works
1. Twilio sends a POST webhook to `serve.path` (default `/voice/webhook`).
2. The plugin responds with TwiML `<Connect><Stream>` pointing to `wss://<host>/voice/stream/realtime` (see the sketch after this list).
3. Twilio opens a WebSocket to that path carrying the caller's audio in μ-law format.
4. The plugin bridges the WebSocket to the OpenAI Realtime API — audio flows in both directions in real time.
5. The call is registered with CallManager and appears in `openclaw voice status` / `openclaw voice history`.
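For reference, the TwiML returned in step 2 has roughly this shape (a sketch inferred from the plugin's tests; the real payload is built by `RealtimeCallHandler.buildTwiMLPayload`, and the trailing path segment is a one-time stream token):

```typescript
// Illustrative only: the plugin generates this internally.
function buildRealtimeTwiML(host: string, token: string): string {
  return [
    '<?xml version="1.0" encoding="UTF-8"?>',
    "<Response>",
    "  <Connect>",
    `    <Stream url="wss://${host}/voice/stream/realtime/${token}" />`,
    "  </Connect>",
    "</Response>",
  ].join("\n");
}
```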
### Networking notes
- `serve.bind` defaults to `127.0.0.1`. If running inside Docker with an external port mapping, set `serve.bind: "0.0.0.0"` so the container's port is reachable from the host.
- The WebSocket upgrade path (`/voice/stream/realtime`) must be reachable on the same host and port as the webhook. Reverse proxies must pass `Upgrade: websocket` headers through; a quick reachability check is sketched below.
- When using Tailscale Funnel on the host (outside Docker), configure Funnel to route `/voice/` to the plugin's local port. The gateway itself does not need to be exposed via Tailscale Funnel.
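To sanity-check that upgrades make it through your proxy, open a WebSocket against the realtime path with a placeholder token (a sketch using the `ws` package; `gateway.example.ts.net` is a stand-in for your host). A reachable endpoint answers `401 Unauthorized` because the token is fake; a misconfigured proxy typically hangs or resets instead:

```typescript
import WebSocket from "ws";

// "test-token" is a placeholder; real tokens are minted per call via TwiML.
const ws = new WebSocket("wss://gateway.example.ts.net/voice/stream/realtime/test-token");
ws.on("unexpected-response", (_req, res) => {
  // A 401 here means routing works: the upgrade reached the realtime handler.
  console.log(`Upgrade path reachable, server answered ${res.statusCode}`);
});
ws.on("error", (err) => console.error("Upgrade path unreachable:", err.message));
```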
## Notes
- Uses webhook signature verification for Twilio/Telnyx/Plivo.

View File

@ -134,6 +134,36 @@
"label": "ElevenLabs Base URL",
"advanced": true
},
"realtime.enabled": {
"label": "Enable Realtime Voice (OpenAI)",
"help": "Full voice-to-voice mode via OpenAI Realtime API. Requires inboundPolicy open or allowlist."
},
"realtime.model": {
"label": "Realtime Model",
"advanced": true
},
"realtime.voice": {
"label": "Realtime Voice"
},
"realtime.instructions": {
"label": "Realtime Instructions"
},
"realtime.temperature": {
"label": "Realtime Temperature",
"advanced": true
},
"realtime.vadThreshold": {
"label": "VAD Sensitivity",
"advanced": true
},
"realtime.silenceDurationMs": {
"label": "Silence Timeout (ms)",
"advanced": true
},
"realtime.prefixPaddingMs": {
"label": "Prefix Padding (ms)",
"advanced": true
},
"publicUrl": {
"label": "Public Webhook URL",
"advanced": true
@ -385,6 +415,60 @@
}
}
},
"realtime": {
"type": "object",
"additionalProperties": false,
"properties": {
"enabled": {
"type": "boolean"
},
"model": {
"type": "string"
},
"voice": {
"type": "string",
"enum": [
"alloy",
"ash",
"ballad",
"cedar",
"coral",
"echo",
"marin",
"sage",
"shimmer",
"verse"
]
},
"instructions": {
"type": "string"
},
"temperature": {
"type": "number",
"minimum": 0,
"maximum": 2
},
"vadThreshold": {
"type": "number",
"minimum": 0,
"maximum": 1
},
"silenceDurationMs": {
"type": "integer",
"minimum": 1
},
"prefixPaddingMs": {
"type": "integer",
"minimum": 0
},
"tools": {
"type": "array",
"items": {
"type": "object"
}
}
}
},
"publicUrl": {
"type": "string"
},

View File

@ -3,6 +3,7 @@ import {
validateProviderConfig,
normalizeVoiceCallConfig,
resolveVoiceCallConfig,
VoiceCallRealtimeConfigSchema,
type VoiceCallConfig,
} from "./config.js";
import { createVoiceCallBaseConfig } from "./test-fixtures.js";
@ -216,3 +217,126 @@ describe("normalizeVoiceCallConfig", () => {
expect(normalized.tts?.elevenlabs?.voiceSettings).toEqual({ speed: 1.1 });
});
});
describe("VoiceCallRealtimeConfigSchema", () => {
it("defaults to disabled with empty tools array", () => {
const config = VoiceCallRealtimeConfigSchema.parse({});
expect(config.enabled).toBe(false);
expect(config.tools).toEqual([]);
});
it("accepts all valid Realtime API voice names", () => {
const voices = [
"alloy",
"ash",
"ballad",
"cedar",
"coral",
"echo",
"marin",
"sage",
"shimmer",
"verse",
];
for (const voice of voices) {
expect(() => VoiceCallRealtimeConfigSchema.parse({ voice })).not.toThrow();
}
});
it("rejects voice names that are not in the Realtime API (e.g. nova, fable, onyx)", () => {
for (const voice of ["nova", "fable", "onyx"]) {
expect(() => VoiceCallRealtimeConfigSchema.parse({ voice })).toThrow();
}
});
it("normalizeVoiceCallConfig propagates realtime sub-config", () => {
const normalized = normalizeVoiceCallConfig({
enabled: true,
provider: "mock",
realtime: { enabled: true, voice: "marin", instructions: "Be helpful." },
});
expect(normalized.realtime.enabled).toBe(true);
expect(normalized.realtime.voice).toBe("marin");
expect(normalized.realtime.instructions).toBe("Be helpful.");
expect(normalized.realtime.tools).toEqual([]);
});
});
describe("resolveVoiceCallConfig — realtime env vars", () => {
const originalEnv = { ...process.env };
afterEach(() => {
process.env = { ...originalEnv };
});
it("auto-enables realtime from REALTIME_VOICE_ENABLED=true", () => {
process.env.REALTIME_VOICE_ENABLED = "true";
const resolved = resolveVoiceCallConfig(createVoiceCallBaseConfig());
expect(resolved.realtime.enabled).toBe(true);
});
it("does not auto-enable when REALTIME_VOICE_ENABLED is absent or not 'true'", () => {
delete process.env.REALTIME_VOICE_ENABLED;
expect(resolveVoiceCallConfig(createVoiceCallBaseConfig()).realtime.enabled).toBe(false);
process.env.REALTIME_VOICE_ENABLED = "false";
expect(resolveVoiceCallConfig(createVoiceCallBaseConfig()).realtime.enabled).toBe(false);
});
it("resolves model, voice, instructions, temperature from env vars", () => {
process.env.REALTIME_VOICE_MODEL = "gpt-4o-realtime-preview";
process.env.REALTIME_VOICE_VOICE = "ash";
process.env.REALTIME_VOICE_INSTRUCTIONS = "You are helpful.";
process.env.REALTIME_VOICE_TEMPERATURE = "0.8";
const resolved = resolveVoiceCallConfig(createVoiceCallBaseConfig());
expect(resolved.realtime.model).toBe("gpt-4o-realtime-preview");
expect(resolved.realtime.voice).toBe("ash");
expect(resolved.realtime.instructions).toBe("You are helpful.");
expect(resolved.realtime.temperature).toBeCloseTo(0.8);
});
it("resolves vadThreshold and silenceDurationMs from env vars", () => {
process.env.VAD_THRESHOLD = "0.7";
process.env.SILENCE_DURATION_MS = "1200";
const resolved = resolveVoiceCallConfig(createVoiceCallBaseConfig());
expect(resolved.realtime.vadThreshold).toBeCloseTo(0.7);
expect(resolved.realtime.silenceDurationMs).toBe(1200);
});
it("config values take precedence over env vars", () => {
process.env.REALTIME_VOICE_VOICE = "ash";
const base = createVoiceCallBaseConfig();
base.realtime = { enabled: false, voice: "coral", tools: [] };
const resolved = resolveVoiceCallConfig(base);
expect(resolved.realtime.voice).toBe("coral");
});
});
describe("validateProviderConfig — realtime mode", () => {
it("rejects realtime.enabled when inboundPolicy is 'disabled'", () => {
const config = createVoiceCallBaseConfig({ provider: "mock" });
config.realtime = { enabled: true, tools: [] };
// inboundPolicy defaults to "disabled" in createVoiceCallBaseConfig
const result = validateProviderConfig(config);
expect(result.valid).toBe(false);
expect(result.errors.some((e) => e.includes("inboundPolicy"))).toBe(true);
});
it("passes when realtime.enabled with inboundPolicy 'open'", () => {
const config = createVoiceCallBaseConfig({ provider: "mock" });
config.inboundPolicy = "open";
config.realtime = { enabled: true, tools: [] };
const result = validateProviderConfig(config);
expect(result.errors.some((e) => e.includes("inboundPolicy"))).toBe(false);
});
it("rejects when both realtime.enabled and streaming.enabled are true", () => {
const config = createVoiceCallBaseConfig({ provider: "mock" });
config.inboundPolicy = "open";
config.realtime = { enabled: true, tools: [] };
config.streaming = { ...config.streaming, enabled: true };
const result = validateProviderConfig(config);
expect(result.valid).toBe(false);
expect(result.errors.some((e) => e.includes("streaming"))).toBe(true);
});
});

View File

@ -200,6 +200,66 @@ export const OutboundConfigSchema = z
.default({ defaultMode: "notify", notifyHangupDelaySec: 3 });
export type OutboundConfig = z.infer<typeof OutboundConfigSchema>;
// -----------------------------------------------------------------------------
// Realtime Voice Configuration (OpenAI Realtime API — voice-to-voice)
// -----------------------------------------------------------------------------
/**
* Zod schema for a single OpenAI Realtime tool (mirrors the RealtimeTool interface
* in providers/openai-realtime-voice.ts, expressed as a schema for config parsing).
*/
export const RealtimeToolSchema = z
.object({
type: z.literal("function"),
name: z.string(),
description: z.string(),
parameters: z.object({
type: z.literal("object"),
properties: z.record(z.string(), z.unknown()),
required: z.array(z.string()).optional(),
}),
})
.strict();
export type RealtimeToolConfig = z.infer<typeof RealtimeToolSchema>;
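// Example of a config entry that satisfies RealtimeToolSchema (illustrative;
// the tool name and parameters are hypothetical):
//
//   {
//     type: "function",
//     name: "get_weather",
//     description: "Look up current weather for a city.",
//     parameters: {
//       type: "object",
//       properties: { city: { type: "string" } },
//       required: ["city"],
//     },
//   }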
export const VoiceCallRealtimeConfigSchema = z
.object({
/** Enable realtime voice-to-voice mode (OpenAI Realtime API). Default: false. */
enabled: z.boolean().default(false),
/** Realtime model (env: REALTIME_VOICE_MODEL, default: "gpt-4o-mini-realtime-preview") */
model: z.string().optional(),
/** Voice for AI speech output (env: REALTIME_VOICE_VOICE) */
voice: z
.enum([
"alloy",
"ash",
"ballad",
"cedar",
"coral",
"echo",
"marin",
"sage",
"shimmer",
"verse",
])
.optional(),
/** System instructions / persona (env: REALTIME_VOICE_INSTRUCTIONS) */
instructions: z.string().optional(),
/** Temperature 0–2 (env: REALTIME_VOICE_TEMPERATURE) */
temperature: z.number().min(0).max(2).optional(),
/** VAD threshold 0–1 (env: VAD_THRESHOLD) */
vadThreshold: z.number().min(0).max(1).optional(),
/** Silence duration in ms before turn ends (env: SILENCE_DURATION_MS) */
silenceDurationMs: z.number().int().positive().optional(),
/** Audio padding before speech in ms */
prefixPaddingMs: z.number().int().nonnegative().optional(),
/** Tool definitions (OpenAI function-call schema); execution wired via registerToolHandler */
tools: z.array(RealtimeToolSchema).default([]),
})
.strict()
.default({ enabled: false, tools: [] });
export type VoiceCallRealtimeConfig = z.infer<typeof VoiceCallRealtimeConfigSchema>;
// -----------------------------------------------------------------------------
// Streaming Configuration (OpenAI Realtime STT)
// -----------------------------------------------------------------------------
@ -324,6 +384,9 @@ export const VoiceCallConfigSchema = z
/** Real-time audio streaming configuration */
streaming: VoiceCallStreamingConfigSchema,
/** Realtime voice-to-voice configuration (OpenAI Realtime API) */
realtime: VoiceCallRealtimeConfigSchema,
/** Public webhook URL override (if set, bypasses tunnel auto-detection) */
publicUrl: z.string().url().optional(),
@ -398,6 +461,14 @@ export function normalizeVoiceCallConfig(config: VoiceCallConfigInput): VoiceCal
config.webhookSecurity?.trustedProxyIPs ?? defaults.webhookSecurity.trustedProxyIPs,
},
streaming: { ...defaults.streaming, ...config.streaming },
realtime: {
...defaults.realtime,
...config.realtime,
// Cast: DeepPartial makes tool fields appear optional in the input type,
// but Zod validates the full shape before it reaches here.
tools:
(config.realtime?.tools as RealtimeToolConfig[] | undefined) ?? defaults.realtime.tools,
},
stt: { ...defaults.stt, ...config.stt },
tts: normalizeVoiceCallTtsConfig(defaults.tts, config.tts),
};
@ -453,6 +524,29 @@ export function resolveVoiceCallConfig(config: VoiceCallConfigInput): VoiceCallC
resolved.webhookSecurity.trustForwardingHeaders ?? false;
resolved.webhookSecurity.trustedProxyIPs = resolved.webhookSecurity.trustedProxyIPs ?? [];
// Realtime voice — resolve env var fallbacks
resolved.realtime = { ...resolved.realtime };
// REALTIME_VOICE_ENABLED=true auto-enables realtime mode (backward compat)
if (!resolved.realtime.enabled && process.env.REALTIME_VOICE_ENABLED === "true") {
resolved.realtime.enabled = true;
}
resolved.realtime.model = resolved.realtime.model ?? process.env.REALTIME_VOICE_MODEL;
resolved.realtime.voice =
(resolved.realtime.voice ??
(process.env.REALTIME_VOICE_VOICE as VoiceCallRealtimeConfig["voice"])) ||
undefined;
resolved.realtime.instructions =
resolved.realtime.instructions ?? process.env.REALTIME_VOICE_INSTRUCTIONS;
if (resolved.realtime.temperature == null && process.env.REALTIME_VOICE_TEMPERATURE) {
resolved.realtime.temperature = parseFloat(process.env.REALTIME_VOICE_TEMPERATURE);
}
if (resolved.realtime.vadThreshold == null && process.env.VAD_THRESHOLD) {
resolved.realtime.vadThreshold = parseFloat(process.env.VAD_THRESHOLD);
}
if (resolved.realtime.silenceDurationMs == null && process.env.SILENCE_DURATION_MS) {
resolved.realtime.silenceDurationMs = parseInt(process.env.SILENCE_DURATION_MS, 10);
}
return normalizeVoiceCallConfig(resolved);
}
@ -521,5 +615,24 @@ export function validateProviderConfig(config: VoiceCallConfig): {
}
}
// Realtime mode requires inbound calls to be accepted — policy "disabled"
// means the manager will reject every call before it can be tracked.
// "open" or "allowlist" are the correct choices when realtime.enabled = true.
if (config.realtime?.enabled && config.inboundPolicy === "disabled") {
errors.push(
'plugins.entries.voice-call.config.inboundPolicy must not be "disabled" when realtime.enabled is true ' +
'(use "open" or "allowlist" — realtime calls are answered before policy can reject them)',
);
}
// Both streaming and realtime cannot be enabled simultaneously — they use
// incompatible WebSocket paths and audio routing.
if (config.realtime?.enabled && config.streaming?.enabled) {
errors.push(
"plugins.entries.voice-call.config: realtime.enabled and streaming.enabled cannot both be true " +
"(they use incompatible audio paths — choose one mode)",
);
}
return { valid: errors.length === 0, errors };
}

View File

@ -204,6 +204,9 @@ export function processEvent(ctx: EventContext, event: NormalizedEvent): void {
break;
case "call.speaking":
if (event.text) {
addTranscriptEntry(call, "bot", event.text);
}
transitionState(call, "speaking");
break;

View File

@ -0,0 +1,874 @@
/**
* OpenAI Realtime Voice Bridge
*
* Implements a bidirectional voice-to-voice bridge between Twilio Media Streams
* and the OpenAI Realtime API. Replaces the STT → LLM → TTS pipeline with a
* single WebSocket session that handles everything natively.
*
* Key benefits over the STT-only approach:
* - Latency: ~200–400 ms TTFB vs ~1–3.5 s in the pipeline mode
* - Audio format: g711_ulaw (mulaw) is natively supported — zero conversion
* - Barge-in: server VAD handles interruptions automatically
* - No separate LLM or TTS call required
*
* Usage:
* const bridge = new OpenAIRealtimeVoiceBridge({
* apiKey: process.env.OPENAI_API_KEY!,
* instructions: "You are Gracie, a helpful AI assistant...",
* voice: "alloy",
* onAudio: (muLaw) => mediaStreamHandler.sendAudio(streamSid, muLaw),
* onClearAudio: () => mediaStreamHandler.clearAudio(streamSid),
* onTranscript: (role, text) => console.log(`[${role}]: ${text}`),
* });
* await bridge.connect();
* bridge.sendAudio(twilioPayloadBuffer);
* bridge.close();
*
* Integration with media-stream.ts:
* Replace `sttSession` with this bridge in `MediaStreamHandler.handleStart()`.
* Wire audio in/out through the existing sendAudio / clearAudio methods.
*
* @see https://platform.openai.com/docs/guides/realtime
* @see https://www.twilio.com/docs/voice/media-streams
*/
import WebSocket from "ws";
// ---------------------------------------------------------------------------
// Types
// ---------------------------------------------------------------------------
/** OpenAI Realtime API voice options.
* NOTE: These differ from the TTS-1 voices — nova/fable/onyx are NOT supported here.
* Source: live API error "Supported values are: alloy, ash, ballad, cedar, coral, echo, marin, sage, shimmer, verse"
*/
export type RealtimeVoice =
| "alloy"
| "ash"
| "ballad"
| "cedar"
| "coral"
| "echo"
| "marin"
| "sage"
| "shimmer"
| "verse";
/** Realtime tool definition (mirrors OpenAI function calling schema) */
export interface RealtimeTool {
type: "function";
name: string;
description: string;
parameters: {
type: "object";
properties: Record<string, unknown>;
required?: string[];
};
}
/** Tool call event emitted when OpenAI invokes a function */
export interface ToolCallEvent {
/** Conversation item ID for submitting the result */
itemId: string;
/** Call ID for matching request/response */
callId: string;
/** Function name */
name: string;
/** Parsed JSON arguments */
args: unknown;
}
/**
* Configuration for the Realtime Voice Bridge.
*/
export interface RealtimeVoiceConfig {
// ---- Required ----
/** OpenAI API key */
apiKey: string;
// ---- Voice/personality ----
/** System instructions (persona / behaviour) */
instructions?: string;
/** Voice to use for AI speech output (default: "alloy") */
voice?: RealtimeVoice;
/** Response temperature 0–2 (default: 0.8) */
temperature?: number;
// ---- Model ----
/** Realtime model (default: "gpt-4o-mini-realtime-preview") */
model?: string;
// ---- VAD ----
/** VAD speech detection threshold 0–1 (default: 0.5) */
vadThreshold?: number;
/** Silence duration in ms before turn ends (default: 500) */
silenceDurationMs?: number;
/** Padding before speech in ms (default: 300) */
prefixPaddingMs?: number;
// ---- Tools ----
/** Optional function tools the model can call */
tools?: RealtimeTool[];
// ---- Audio callbacks ----
/**
* Called for each audio delta chunk from OpenAI.
* @param muLaw - Raw mulaw Buffer ready to send to Twilio
*/
onAudio: (muLaw: Buffer) => void;
/**
* Called on barge-in (user speech detected mid-response).
* Clear Twilio audio buffer here.
*/
onClearAudio: () => void;
// ---- Event callbacks (optional) ----
/**
* Transcript event (partial or final). Role is "user" or "assistant".
*/
onTranscript?: (role: "user" | "assistant", text: string, isFinal: boolean) => void;
/**
* Called when the model invokes a tool/function.
* Your handler should call `bridge.submitToolResult(event.callId, result)`.
*/
onToolCall?: (event: ToolCallEvent) => void;
/**
* Called when the session is fully connected and configured.
*/
onReady?: () => void;
/**
* Called on irrecoverable error or max reconnects exceeded.
*/
onError?: (error: Error) => void;
/**
* Called when the bridge closes (intentionally or after max retries).
*/
onClose?: () => void;
}
// ---------------------------------------------------------------------------
// Internal helpers
// ---------------------------------------------------------------------------
function base64ToBuffer(b64: string): Buffer {
return Buffer.from(b64, "base64");
}
// ---------------------------------------------------------------------------
// Main class
// ---------------------------------------------------------------------------
/**
* Bidirectional voice bridge between Twilio Media Streams and the OpenAI Realtime API.
*
* Lifecycle:
* new OpenAIRealtimeVoiceBridge(config)
* → connect() — opens WebSocket, configures session
* → sendAudio() — called for each Twilio media chunk
* → [callbacks fire as OpenAI responds]
* → close() — graceful shutdown
*/
export class OpenAIRealtimeVoiceBridge {
private static readonly DEFAULT_MODEL = "gpt-4o-mini-realtime-preview";
private static readonly MAX_RECONNECT_ATTEMPTS = 5;
private static readonly BASE_RECONNECT_DELAY_MS = 1000;
private static readonly CONNECT_TIMEOUT_MS = 10_000;
private readonly config: RealtimeVoiceConfig;
private readonly model: string;
private ws: WebSocket | null = null;
private connected = false;
private intentionallyClosed = false;
private reconnectAttempts = 0;
/** Pending audio buffers queued while reconnecting */
private pendingAudio: Buffer[] = [];
/** Track mark queue for barge-in timing (mirrors reference impl) */
private markQueue: string[] = [];
private responseStartTimestamp: number | null = null;
private latestMediaTimestamp = 0;
private lastAssistantItemId: string | null = null;
/** Accumulate tool call arguments (streamed as deltas) */
private toolCallBuffers = new Map<string, { name: string; callId: string; args: string }>();
/** Guards onReady/greeting so it fires only on the first session, not reconnects */
private sessionReadyFired = false;
constructor(config: RealtimeVoiceConfig) {
if (!config.apiKey) {
throw new Error("[RealtimeVoice] OpenAI API key is required");
}
this.config = config;
this.model = config.model ?? OpenAIRealtimeVoiceBridge.DEFAULT_MODEL;
}
// -------------------------------------------------------------------------
// Public API
// -------------------------------------------------------------------------
/**
* Connect to the OpenAI Realtime API.
* Resolves when the WebSocket is open and session.update has been sent.
* Throws if connection times out.
*/
async connect(): Promise<void> {
this.intentionallyClosed = false;
this.reconnectAttempts = 0;
return this.doConnect();
}
/**
* Send a mulaw audio chunk from Twilio to OpenAI.
* Buffers chunks if not yet connected; drains on reconnect.
*
* @param audio - Raw mulaw Buffer (Twilio media.payload decoded from base64)
*/
sendAudio(audio: Buffer): void {
if (!this.connected || this.ws?.readyState !== WebSocket.OPEN) {
// Buffer up to ~6.4 s of audio (320 chunks × 20 ms) while reconnecting
if (this.pendingAudio.length < 320) {
this.pendingAudio.push(audio);
}
return;
}
this.sendEvent({
type: "input_audio_buffer.append",
audio: audio.toString("base64"),
});
}
/**
* Update the media timestamp (used for barge-in truncation calculations).
* Call this with data.media.timestamp from each Twilio media event.
*/
setMediaTimestamp(ts: number): void {
this.latestMediaTimestamp = ts;
}
/**
* Inject a user text message into the conversation (optional).
* Useful for seeding context or simulating a greeting trigger.
*/
sendUserMessage(text: string): void {
this.sendEvent({
type: "conversation.item.create",
item: {
type: "message",
role: "user",
content: [{ type: "input_text", text }],
},
});
this.sendEvent({ type: "response.create" });
}
/**
* Submit a tool/function result back to the model.
* Must be called in response to an `onToolCall` event.
*
* @param callId - The call_id from the ToolCallEvent
* @param result - JSON-serializable result value
*/
submitToolResult(callId: string, result: unknown): void {
this.sendEvent({
type: "conversation.item.create",
item: {
type: "function_call_output",
call_id: callId,
output: JSON.stringify(result),
},
});
// Trigger AI to respond now that the tool result is available
this.sendEvent({ type: "response.create" });
}
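// Typical wiring (sketch; `runTool` is a hypothetical dispatcher, not part of this file):
//
//   const bridge = new OpenAIRealtimeVoiceBridge({
//     ...callbacks,
//     onToolCall: async (ev) => {
//       const result = await runTool(ev.name, ev.args);
//       bridge.submitToolResult(ev.callId, result);
//     },
//   });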
/**
* Gracefully close the bridge.
*/
close(): void {
this.intentionallyClosed = true;
this.connected = false;
if (this.ws) {
this.ws.close(1000, "Bridge closed");
this.ws = null;
}
// onClose fires from the ws "close" event handler (intentionallyClosed branch)
// to avoid double-firing on explicit close().
}
/** True if the WebSocket is open and the session is configured. */
isConnected(): boolean {
return this.connected;
}
// -------------------------------------------------------------------------
// Connection management
// -------------------------------------------------------------------------
private async doConnect(): Promise<void> {
return new Promise((resolve, reject) => {
const url = `wss://api.openai.com/v1/realtime?model=${encodeURIComponent(this.model)}`;
console.log(`[RealtimeVoice] Connecting to ${url}`);
this.ws = new WebSocket(url, {
headers: {
Authorization: `Bearer ${this.config.apiKey}`,
"OpenAI-Beta": "realtime=v1",
},
});
const connectTimeout = setTimeout(() => {
if (!this.connected) {
this.ws?.terminate();
reject(new Error("[RealtimeVoice] Connection timeout"));
}
}, OpenAIRealtimeVoiceBridge.CONNECT_TIMEOUT_MS);
this.ws.on("open", () => {
clearTimeout(connectTimeout);
console.log("[RealtimeVoice] WebSocket connected");
this.connected = true;
this.reconnectAttempts = 0;
// Send session config immediately — no need to wait; the server
// confirms receipt via session.created which triggers drain + onReady.
this.sendSessionUpdate();
resolve();
});
this.ws.on("message", (data: Buffer) => {
try {
const event = JSON.parse(data.toString()) as RealtimeEvent;
this.handleEvent(event);
} catch (err) {
console.error("[RealtimeVoice] Failed to parse event:", err);
}
});
this.ws.on("error", (err) => {
console.error("[RealtimeVoice] WebSocket error:", err);
if (!this.connected) {
clearTimeout(connectTimeout);
reject(err);
} else {
this.config.onError?.(err instanceof Error ? err : new Error(String(err)));
}
});
this.ws.on("close", (code, reason) => {
console.log(
`[RealtimeVoice] WebSocket closed (code: ${code}, reason: ${reason?.toString() || "none"})`,
);
this.connected = false;
this.ws = null;
if (!this.intentionallyClosed) {
void this.attemptReconnect();
} else {
this.config.onClose?.();
}
});
});
}
/**
* Trigger a greeting response from the AI.
* Typically called once the session is ready (e.g. from onReady) so the assistant speaks first.
*/
public triggerGreeting(): void {
if (!this.connected || !this.ws) {
console.warn("[RealtimeVoice] Cannot trigger greeting: not connected");
return;
}
const greetingEvent = {
type: "response.create",
response: {
instructions: this.config.instructions,
},
};
this.sendEvent(greetingEvent);
console.log("[RealtimeVoice] Greeting triggered");
}
private sendSessionUpdate(): void {
const cfg = this.config;
const sessionUpdate: RealtimeSessionUpdate = {
type: "session.update",
session: {
modalities: ["text", "audio"],
instructions: cfg.instructions,
voice: cfg.voice ?? "alloy",
input_audio_format: "g711_ulaw",
output_audio_format: "g711_ulaw",
// whisper-1 is the only model currently supported by the Realtime API for
// inline user-speech transcription. This is distinct from the streaming
// STT path (streaming.sttModel) which uses gpt-4o-transcribe.
input_audio_transcription: {
model: "whisper-1",
},
turn_detection: {
type: "server_vad",
threshold: cfg.vadThreshold ?? 0.5,
prefix_padding_ms: cfg.prefixPaddingMs ?? 300,
silence_duration_ms: cfg.silenceDurationMs ?? 500,
create_response: true,
},
temperature: cfg.temperature ?? 0.8,
...(cfg.tools && cfg.tools.length > 0
? {
tools: cfg.tools,
tool_choice: "auto",
}
: {}),
},
};
console.log("[RealtimeVoice] Sending session.update");
this.sendEvent(sessionUpdate);
}
private drainPendingAudio(): void {
if (this.pendingAudio.length === 0) return;
console.log(`[RealtimeVoice] Draining ${this.pendingAudio.length} buffered audio chunks`);
for (const buf of this.pendingAudio) {
this.sendEvent({
type: "input_audio_buffer.append",
audio: buf.toString("base64"),
});
}
this.pendingAudio = [];
}
private async attemptReconnect(): Promise<void> {
if (this.intentionallyClosed) return;
if (this.reconnectAttempts >= OpenAIRealtimeVoiceBridge.MAX_RECONNECT_ATTEMPTS) {
const err = new Error(
`[RealtimeVoice] Max reconnect attempts (${OpenAIRealtimeVoiceBridge.MAX_RECONNECT_ATTEMPTS}) exceeded`,
);
console.error(err.message);
this.config.onError?.(err);
this.config.onClose?.();
return;
}
this.reconnectAttempts++;
const delay =
OpenAIRealtimeVoiceBridge.BASE_RECONNECT_DELAY_MS * 2 ** (this.reconnectAttempts - 1);
console.log(
`[RealtimeVoice] Reconnecting (${this.reconnectAttempts}/${OpenAIRealtimeVoiceBridge.MAX_RECONNECT_ATTEMPTS}) in ${delay}ms...`,
);
await new Promise<void>((resolve) => setTimeout(resolve, delay));
if (this.intentionallyClosed) return;
try {
await this.doConnect();
console.log("[RealtimeVoice] Reconnected successfully");
} catch (err) {
console.error("[RealtimeVoice] Reconnect failed:", err);
// doConnect's close handler will call attemptReconnect again
}
}
// -------------------------------------------------------------------------
// Event handling
// -------------------------------------------------------------------------
private handleEvent(event: RealtimeEvent): void {
switch (event.type) {
// ---- Session lifecycle ----
case "session.created":
console.log("[RealtimeVoice] Session created");
this.drainPendingAudio();
// Fire onReady exactly once — not on reconnects (greeting already played)
if (!this.sessionReadyFired) {
this.sessionReadyFired = true;
this.config.onReady?.();
}
break;
case "session.updated":
console.log("[RealtimeVoice] Session updated");
break;
// ---- Audio output: stream audio back to Twilio ----
case "response.audio.delta": {
if (!event.delta) break;
const audioBuffer = base64ToBuffer(event.delta);
this.config.onAudio(audioBuffer);
// Track response start timestamp for barge-in truncation
if (this.responseStartTimestamp === null) {
this.responseStartTimestamp = this.latestMediaTimestamp;
}
// Track the most recent assistant item ID
if (event.item_id) {
this.lastAssistantItemId = event.item_id;
}
// Send mark to track playback position
this.sendMark();
break;
}
case "response.audio.done":
console.log("[RealtimeVoice] Audio response complete");
break;
// ---- Barge-in: user started speaking, interrupt AI response ----
case "input_audio_buffer.speech_started":
console.log("[RealtimeVoice] Barge-in detected — clearing audio");
this.handleBargein();
break;
case "input_audio_buffer.speech_stopped":
console.log("[RealtimeVoice] Speech stopped");
break;
case "input_audio_buffer.committed":
console.log("[RealtimeVoice] Audio buffer committed");
break;
// ---- Assistant speech transcript ----
case "response.audio_transcript.delta":
// AI speech transcript streaming (not an event we send to Twilio)
if (event.delta) {
this.config.onTranscript?.("assistant", event.delta, false);
}
break;
case "response.audio_transcript.done":
if (event.transcript) {
console.log(`[RealtimeVoice] Assistant: ${event.transcript}`);
this.config.onTranscript?.("assistant", event.transcript, true);
}
break;
// ---- User speech transcription (if text modality enabled) ----
case "conversation.item.input_audio_transcription.completed":
if (event.transcript) {
console.log(`[RealtimeVoice] User: ${event.transcript}`);
this.config.onTranscript?.("user", event.transcript, true);
}
break;
case "conversation.item.input_audio_transcription.delta":
if (event.delta) {
this.config.onTranscript?.("user", event.delta, false);
}
break;
// ---- Tool calling ----
case "response.function_call_arguments.delta": {
const key = event.item_id ?? "unknown";
const existing = this.toolCallBuffers.get(key);
if (existing && event.delta) {
existing.args += event.delta;
} else if (event.item_id) {
this.toolCallBuffers.set(event.item_id, {
name: event.name ?? "",
callId: event.call_id ?? "",
args: event.delta ?? "",
});
}
break;
}
case "response.function_call_arguments.done": {
const key = event.item_id ?? "unknown";
const buf = this.toolCallBuffers.get(key);
if (buf && this.config.onToolCall) {
let args: unknown;
try {
args = JSON.parse(buf.args || "{}");
} catch {
args = {};
}
this.config.onToolCall({
itemId: key,
callId: buf.callId || event.call_id || "",
name: buf.name || event.name || "",
args,
});
}
this.toolCallBuffers.delete(key);
break;
}
// ---- Response lifecycle ----
case "response.created":
console.log("[RealtimeVoice] Response started");
break;
case "response.done":
console.log("[RealtimeVoice] Response done");
// Reset mark queue and timestamps for next turn
this.markQueue = [];
this.responseStartTimestamp = null;
this.lastAssistantItemId = null;
break;
case "response.content.done":
// Individual content part done
break;
case "rate_limits.updated":
// Log rate limit info if needed
break;
// ---- Errors ----
case "error": {
const errMsg = event.error
? `${(event.error as { message?: string }).message ?? JSON.stringify(event.error)}`
: "Unknown error";
console.error(`[RealtimeVoice] Error event: ${errMsg}`);
this.config.onError?.(new Error(errMsg));
break;
}
default:
// Uncomment for debugging:
// console.log(`[RealtimeVoice] Unhandled event: ${event.type}`);
break;
}
}
/**
* Handle barge-in: truncate the current assistant response at the
* elapsed audio point, clear the Twilio buffer, and reset state.
* Mirrors the reference implementation's handleSpeechStartedEvent().
*/
private handleBargein(): void {
if (this.markQueue.length > 0 && this.responseStartTimestamp !== null) {
const elapsedMs = this.latestMediaTimestamp - this.responseStartTimestamp;
if (this.lastAssistantItemId) {
// Tell OpenAI to truncate the response at the point where the user
// interrupted — this ensures the AI's context matches what was heard
const truncateEvent = {
type: "conversation.item.truncate",
item_id: this.lastAssistantItemId,
content_index: 0,
audio_end_ms: Math.max(0, elapsedMs),
};
console.log(`[RealtimeVoice] Truncating at ${elapsedMs}ms`);
this.sendEvent(truncateEvent);
}
// Clear the audio already queued in Twilio's buffer
this.config.onClearAudio();
// Reset state
this.markQueue = [];
this.lastAssistantItemId = null;
this.responseStartTimestamp = null;
} else {
// Even if we have no mark queue, still clear audio to be safe
this.config.onClearAudio();
}
}
/**
* Record a mark locally to track audio playback position. Marks are not sent
* to Twilio directly (the caller does that); the queue coordinates barge-in truncation.
*/
private sendMark(): void {
// We don't send marks directly — the caller does via onAudio
// We track mark queue internally for truncation calculations
this.markQueue.push(`audio-${Date.now()}`);
}
/**
* Handle Twilio mark acknowledgment (when a mark event comes back from Twilio).
* Call this method when you receive a "mark" event from the Twilio WebSocket.
*/
acknowledgeMark(): void {
if (this.markQueue.length > 0) {
this.markQueue.shift();
}
}
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
private sendEvent(event: unknown): void {
if (this.ws?.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify(event));
} else {
console.warn("[RealtimeVoice] Attempted to send event while disconnected");
}
}
}
// ---------------------------------------------------------------------------
// Provider factory (matches pattern of OpenAIRealtimeSTTProvider)
// ---------------------------------------------------------------------------
/**
* Configuration for the provider factory.
* Holds shared/default settings; per-call config is passed to createSession().
* @internal Not used by the plugin's built-in realtime handler; exposed for external consumers.
*/
export interface RealtimeVoiceProviderConfig {
/** OpenAI API key */
apiKey: string;
/** Default model (default: "gpt-4o-mini-realtime-preview") */
model?: string;
/** Default voice (default: "alloy") */
voice?: RealtimeVoice;
/** Default system instructions */
instructions?: string;
/** Default temperature (default: 0.8) */
temperature?: number;
/** Default VAD threshold (default: 0.5) */
vadThreshold?: number;
/** Default silence duration ms (default: 500) */
silenceDurationMs?: number;
/** Default tools */
tools?: RealtimeTool[];
}
/**
* Factory for creating RealtimeVoiceBridge instances.
* Follows the same pattern as OpenAIRealtimeSTTProvider for easy swapping.
*/
export class OpenAIRealtimeVoiceProvider {
readonly name = "openai-realtime-voice" as const;
constructor(private readonly defaults: RealtimeVoiceProviderConfig) {
if (!defaults.apiKey) {
throw new Error("[RealtimeVoiceProvider] OpenAI API key is required");
}
}
/**
* Create a new voice bridge for a single call session.
* Merges provided config with provider defaults.
*/
createBridge(
callConfig: Omit<RealtimeVoiceConfig, "apiKey"> & Partial<Pick<RealtimeVoiceConfig, "apiKey">>,
): OpenAIRealtimeVoiceBridge {
const merged: RealtimeVoiceConfig = {
apiKey: this.defaults.apiKey,
model: callConfig.model ?? this.defaults.model,
voice: callConfig.voice ?? this.defaults.voice,
instructions: callConfig.instructions ?? this.defaults.instructions,
temperature: callConfig.temperature ?? this.defaults.temperature,
vadThreshold: callConfig.vadThreshold ?? this.defaults.vadThreshold,
silenceDurationMs: callConfig.silenceDurationMs ?? this.defaults.silenceDurationMs,
tools: callConfig.tools ?? this.defaults.tools,
onAudio: callConfig.onAudio,
onClearAudio: callConfig.onClearAudio,
onTranscript: callConfig.onTranscript,
onToolCall: callConfig.onToolCall,
onReady: callConfig.onReady,
onError: callConfig.onError,
onClose: callConfig.onClose,
};
return new OpenAIRealtimeVoiceBridge(merged);
}
}
// ---------------------------------------------------------------------------
// MediaStreamHandler integration helper
// ---------------------------------------------------------------------------
/**
* Minimal interface that the bridge integration needs from MediaStreamHandler.
* This matches the actual MediaStreamHandler's method signatures.
* @internal Not used by the plugin's built-in realtime handler; exposed for external consumers.
*/
export interface MediaStreamHandlerLike {
sendAudio(streamSid: string, muLaw: Buffer): void;
clearAudio(streamSid: string): void;
sendMark(streamSid: string, name: string): void;
}
/**
* Create a RealtimeVoiceBridge wired to an existing MediaStreamHandler session.
* @internal Not used by the plugin's built-in realtime handler; exposed for external consumers.
*
* Drop-in helper for use inside media-stream.ts handleStart():
*
* ```typescript
* // In handleStart(), instead of creating an STT session:
* const bridge = createBridgeForStream({
* streamSid,
* handler: this, // MediaStreamHandler instance
* config: {
* apiKey: "...",
* instructions: "You are Gracie...",
* voice: "alloy",
* onTranscript: (role, text, final) => {
* if (final && role === "user") config.onTranscript?.(callId, text);
* },
* },
* });
* await bridge.connect();
* ```
*/
export function createBridgeForStream(opts: {
streamSid: string;
handler: MediaStreamHandlerLike;
config: Omit<RealtimeVoiceConfig, "onAudio" | "onClearAudio">;
}): OpenAIRealtimeVoiceBridge {
return new OpenAIRealtimeVoiceBridge({
...opts.config,
onAudio: (muLaw) => {
opts.handler.sendAudio(opts.streamSid, muLaw);
},
onClearAudio: () => {
opts.handler.clearAudio(opts.streamSid);
},
});
}
// ---------------------------------------------------------------------------
// Internal event shape types (partial — only fields we use)
// ---------------------------------------------------------------------------
interface RealtimeEvent {
type: string;
delta?: string;
transcript?: string;
item_id?: string;
call_id?: string;
name?: string;
error?: unknown;
}
interface RealtimeSessionUpdate {
type: "session.update";
session: {
modalities: string[];
instructions?: string;
voice: RealtimeVoice;
input_audio_format: string;
output_audio_format: string;
turn_detection: {
type: "server_vad";
threshold: number;
prefix_padding_ms: number;
silence_duration_ms: number;
create_response: boolean;
};
temperature: number;
input_audio_transcription?: { model: string };
tools?: RealtimeTool[];
tool_choice?: string;
};
}

View File

@ -11,6 +11,7 @@ import type { TelephonyTtsRuntime } from "./telephony-tts.js";
import { createTelephonyTtsProvider } from "./telephony-tts.js";
import { startTunnel, type TunnelResult } from "./tunnel.js";
import { VoiceCallWebhookServer } from "./webhook.js";
import { RealtimeCallHandler } from "./webhook/realtime-handler.js";
import { cleanupTailscaleExposure, setupTailscaleExposure } from "./webhook/tailscale.js";
export type VoiceCallRuntime = {
@ -20,6 +21,8 @@ export type VoiceCallRuntime = {
webhookServer: VoiceCallWebhookServer;
webhookUrl: string;
publicUrl: string | null;
/** Realtime voice handler — present when config.realtime.enabled is true */
realtimeHandler?: RealtimeCallHandler;
stop: () => Promise<void>;
};
@ -168,6 +171,20 @@ export async function createVoiceCallRuntime(params: {
const webhookServer = new VoiceCallWebhookServer(config, manager, provider, coreConfig);
const lifecycle = createRuntimeResourceLifecycle({ config, webhookServer });
// Wire realtime handler before the server starts so it's ready for upgrades
let realtimeHandler: RealtimeCallHandler | undefined;
if (config.realtime.enabled) {
realtimeHandler = new RealtimeCallHandler(
config.realtime,
manager,
provider,
coreConfig,
config.streaming.openaiApiKey,
);
webhookServer.setRealtimeHandler(realtimeHandler);
log.info("[voice-call] Realtime voice handler initialized");
}
const localUrl = await webhookServer.start();
// Wrap remaining initialization in try/catch so the webhook server is
@ -252,6 +269,7 @@ export async function createVoiceCallRuntime(params: {
webhookServer,
webhookUrl,
publicUrl,
realtimeHandler,
stop,
};
} catch (err) {

View File

@ -40,6 +40,7 @@ export function createVoiceCallBaseConfig(params?: {
maxPendingConnectionsPerIp: 4,
maxConnections: 128,
},
realtime: { enabled: false, tools: [] },
skipSignatureVerification: false,
stt: { provider: "openai", model: "whisper-1" },
tts: {

View File

@ -14,11 +14,12 @@ import type { VoiceCallProvider } from "./providers/base.js";
import { OpenAIRealtimeSTTProvider } from "./providers/stt-openai-realtime.js";
import type { TwilioProvider } from "./providers/twilio.js";
import type { NormalizedEvent, WebhookContext } from "./types.js";
import type { RealtimeCallHandler } from "./webhook/realtime-handler.js";
import { startStaleCallReaper } from "./webhook/stale-call-reaper.js";
const MAX_WEBHOOK_BODY_BYTES = 1024 * 1024;
type WebhookResponsePayload = {
export type WebhookResponsePayload = {
statusCode: number;
body: string;
headers?: Record<string, string>;
@ -60,6 +61,9 @@ export class VoiceCallWebhookServer {
/** Media stream handler for bidirectional audio (when streaming enabled) */
private mediaStreamHandler: MediaStreamHandler | null = null;
/** Realtime voice handler — present when config.realtime.enabled is true */
private realtimeHandler: RealtimeCallHandler | null = null;
constructor(
config: VoiceCallConfig,
manager: CallManager,
@ -84,6 +88,13 @@ export class VoiceCallWebhookServer {
return this.mediaStreamHandler;
}
/**
* Wire the realtime call handler (called from runtime.ts before server starts).
*/
setRealtimeHandler(handler: RealtimeCallHandler): void {
this.realtimeHandler = handler;
}
/**
* Initialize media streaming with OpenAI Realtime STT.
*/
@ -229,9 +240,15 @@ export class VoiceCallWebhookServer {
});
});
// Handle WebSocket upgrades for media streams
if (this.mediaStreamHandler) {
// Handle WebSocket upgrades for realtime voice and media streams
if (this.realtimeHandler || this.mediaStreamHandler) {
this.server.on("upgrade", (request, socket, head) => {
// Realtime voice takes precedence when the path matches
if (this.realtimeHandler && this.isRealtimeWebSocketUpgrade(request)) {
console.log("[voice-call] WebSocket upgrade for realtime voice");
this.realtimeHandler.handleWebSocketUpgrade(request, socket, head);
return;
}
const path = this.getUpgradePathname(request);
if (path === streamPath) {
console.log("[voice-call] WebSocket upgrade for media stream");
@ -338,6 +355,14 @@ export class VoiceCallWebhookServer {
this.writeWebhookResponse(res, payload);
}
/**
* Returns true for WebSocket upgrade paths that belong to the realtime handler.
* Used only for upgrade routing — not for the inbound HTTP webhook POST.
*/
private isRealtimeWebSocketUpgrade(req: http.IncomingMessage): boolean {
return (req.url ?? "/").includes("/realtime");
}
private async runWebhookPipeline(
req: http.IncomingMessage,
webhookPath: string,
@ -396,6 +421,18 @@ export class VoiceCallWebhookServer {
return { statusCode: 401, body: "Unauthorized" };
}
// Realtime mode: return TwiML <Connect><Stream> for inbound ringing calls only.
// Status callbacks (CallStatus=completed, outbound calls, etc.) must fall
// through to the normal webhook pipeline so call state is updated correctly.
if (this.realtimeHandler) {
const params = new URLSearchParams(ctx.rawBody);
const callStatus = params.get("CallStatus");
const direction = params.get("Direction");
if (callStatus === "ringing" && (!direction || direction === "inbound")) {
return this.realtimeHandler.buildTwiMLPayload(req, params);
}
}
const parsed = this.provider.parseWebhookEvent(ctx, {
verifiedRequestKey: verification.verifiedRequestKey,
});

View File

@ -0,0 +1,326 @@
import http from "node:http";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import type { CallManager } from "../manager.js";
import type { VoiceCallProvider } from "../providers/base.js";
import type { CallRecord } from "../types.js";
import { RealtimeCallHandler } from "./realtime-handler.js";
/** Extract the stream token from a TwiML body string. */
function extractStreamToken(twiml: string): string | null {
const match = twiml.match(/\/voice\/stream\/realtime\/([^"&\s]+)/);
return match?.[1] ?? null;
}
// Minimal realtime config used across tests
const baseRealtimeConfig = {
enabled: true,
voice: "ash" as const,
tools: [] as never[],
};
// Fake CallRecord for manager stubs
function makeCallRecord(overrides: Partial<CallRecord> = {}): CallRecord {
return {
callId: "call-rt-1",
providerCallId: "CA_test",
provider: "twilio",
direction: "inbound",
state: "answered",
from: "+15550001234",
to: "+15550005678",
startedAt: Date.now(),
transcript: [],
processedEventIds: [],
metadata: {
initialMessage: "Hello! How can I help you today?",
},
...overrides,
};
}
function makeManager(record?: CallRecord): CallManager {
const storedRecord = record ?? makeCallRecord();
return {
processEvent: vi.fn(),
getCallByProviderCallId: vi.fn(() => storedRecord),
getCall: vi.fn(() => storedRecord),
} as unknown as CallManager;
}
function makeProvider(): VoiceCallProvider {
return {
name: "twilio",
verifyWebhook: vi.fn(() => ({ ok: true, verifiedRequestKey: "mock:key" })),
parseWebhookEvent: vi.fn(() => ({ events: [] })),
initiateCall: vi.fn(async () => ({ providerCallId: "CA_test", status: "initiated" as const })),
hangupCall: vi.fn(async () => {}),
playTts: vi.fn(async () => {}),
startListening: vi.fn(async () => {}),
stopListening: vi.fn(async () => {}),
getCallStatus: vi.fn(async () => ({ status: "in-progress" as const, isTerminal: false })),
};
}
function makeRequest(url: string, host = "example.ts.net"): http.IncomingMessage {
const req = new http.IncomingMessage(null as never);
req.url = url;
req.method = "POST";
req.headers = { host };
return req;
}
describe("RealtimeCallHandler", () => {
let originalEnv: NodeJS.ProcessEnv;
beforeEach(() => {
originalEnv = { ...process.env };
});
afterEach(() => {
process.env = { ...originalEnv };
});
// ---------------------------------------------------------------------------
// buildTwiMLPayload
// ---------------------------------------------------------------------------
describe("buildTwiMLPayload", () => {
it("returns TwiML <Connect><Stream> with wss URL derived from request host", () => {
const handler = new RealtimeCallHandler(
baseRealtimeConfig,
makeManager(),
makeProvider(),
null,
);
const req = makeRequest("/voice/webhook", "gateway.ts.net");
const payload = handler.buildTwiMLPayload(req);
expect(payload.statusCode).toBe(200);
expect(payload.headers?.["Content-Type"]).toBe("text/xml");
expect(payload.body).toContain("<Connect>");
expect(payload.body).toContain("<Stream");
expect(payload.body).toMatch(
/wss:\/\/gateway\.ts\.net\/voice\/stream\/realtime\/[0-9a-f-]{36}/,
);
});
it("falls back to localhost when no host header is present", () => {
const handler = new RealtimeCallHandler(
baseRealtimeConfig,
makeManager(),
makeProvider(),
null,
);
const req = makeRequest("/voice/webhook", "");
const payload = handler.buildTwiMLPayload(req);
expect(payload.body).toMatch(
/wss:\/\/localhost:8443\/voice\/stream\/realtime\/[0-9a-f-]{36}/,
);
});
it("embeds a unique token on each call", () => {
const handler = new RealtimeCallHandler(
baseRealtimeConfig,
makeManager(),
makeProvider(),
null,
);
const req = makeRequest("/voice/webhook", "host.example.com");
const token1 = extractStreamToken(handler.buildTwiMLPayload(req).body);
const token2 = extractStreamToken(handler.buildTwiMLPayload(req).body);
expect(token1).toBeTruthy();
expect(token2).toBeTruthy();
expect(token1).not.toBe(token2);
});
});
// ---------------------------------------------------------------------------
// Stream token (nonce) validation
// ---------------------------------------------------------------------------
describe("stream token (nonce)", () => {
it("issueStreamToken + consumeStreamToken: valid token accepted once then rejected", () => {
const handler = new RealtimeCallHandler(
baseRealtimeConfig,
makeManager(),
makeProvider(),
null,
);
const issue = (handler as unknown as { issueStreamToken: () => string }).issueStreamToken;
const consume = (
handler as unknown as {
consumeStreamToken: (t: string) => { from?: string; to?: string } | null;
}
).consumeStreamToken;
const token = issue.call(handler);
expect(consume.call(handler, token)).not.toBeNull();
expect(consume.call(handler, token)).toBeNull();
});
it("rejects unknown tokens", () => {
const handler = new RealtimeCallHandler(
baseRealtimeConfig,
makeManager(),
makeProvider(),
null,
);
const consume = (
handler as unknown as {
consumeStreamToken: (t: string) => { from?: string; to?: string } | null;
}
).consumeStreamToken;
expect(consume.call(handler, "not-a-real-token")).toBeNull();
});
});
// ---------------------------------------------------------------------------
// registerCallInManager — greeting suppression
// ---------------------------------------------------------------------------
describe("registerCallInManager (via handleCall)", () => {
it("clears metadata.initialMessage so the inboundGreeting TTS path is skipped", () => {
const callRecord = makeCallRecord({
metadata: { initialMessage: "Hello from config!" },
});
const manager = makeManager(callRecord);
const handler = new RealtimeCallHandler(baseRealtimeConfig, manager, makeProvider(), null);
// Access private method via type assertion for unit testing
(
handler as unknown as { registerCallInManager: (sid: string) => string }
).registerCallInManager("CA_test");
// call.initiated + call.answered should both have been emitted
expect(vi.mocked(manager.processEvent)).toHaveBeenCalledTimes(2);
const eventTypes = vi
.mocked(manager.processEvent)
.mock.calls.map(([e]) => (e as { type: string }).type);
expect(eventTypes).toEqual(["call.initiated", "call.answered"]);
// initialMessage must be cleared before call.answered fires
expect(callRecord.metadata?.initialMessage).toBeUndefined();
});
it("returns callId from the manager-created call record", () => {
const callRecord = makeCallRecord({ callId: "manager-gen-id" });
const manager = makeManager(callRecord);
const handler = new RealtimeCallHandler(baseRealtimeConfig, manager, makeProvider(), null);
const result = (
handler as unknown as { registerCallInManager: (sid: string) => string }
).registerCallInManager("CA_test");
expect(result).toBe("manager-gen-id");
});
it("falls back to providerCallId when manager has no record", () => {
const manager = {
processEvent: vi.fn(),
getCallByProviderCallId: vi.fn(() => undefined),
} as unknown as CallManager;
const handler = new RealtimeCallHandler(baseRealtimeConfig, manager, makeProvider(), null);
const result = (
handler as unknown as { registerCallInManager: (sid: string) => string }
).registerCallInManager("CA_fallback");
expect(result).toBe("CA_fallback");
});
});
// ---------------------------------------------------------------------------
// Tool handler framework
// ---------------------------------------------------------------------------
describe("registerToolHandler", () => {
it("routes tool calls to registered handlers and returns their result", async () => {
const handler = new RealtimeCallHandler(
baseRealtimeConfig,
makeManager(),
makeProvider(),
null,
);
handler.registerToolHandler("get_time", async () => ({ utc: "2026-03-10T00:00:00Z" }));
const fakeSubmit = vi.fn();
const fakeBridge = { submitToolResult: fakeSubmit } as never;
await (
handler as unknown as {
executeToolCall: (
bridge: never,
callId: string,
bridgeCallId: string,
name: string,
args: unknown,
) => Promise<void>;
}
).executeToolCall(fakeBridge, "call-1", "bridge-call-1", "get_time", {});
expect(fakeSubmit).toHaveBeenCalledWith("bridge-call-1", { utc: "2026-03-10T00:00:00Z" });
});
it("returns an error result for unregistered tool names", async () => {
const handler = new RealtimeCallHandler(
baseRealtimeConfig,
makeManager(),
makeProvider(),
null,
);
const fakeSubmit = vi.fn();
const fakeBridge = { submitToolResult: fakeSubmit } as never;
await (
handler as unknown as {
executeToolCall: (
bridge: never,
callId: string,
bridgeCallId: string,
name: string,
args: unknown,
) => Promise<void>;
}
).executeToolCall(fakeBridge, "call-1", "bridge-call-1", "unknown_tool", {});
expect(fakeSubmit).toHaveBeenCalledWith("bridge-call-1", {
error: 'Tool "unknown_tool" not available',
});
});
it("returns an error result when a handler throws", async () => {
const handler = new RealtimeCallHandler(
baseRealtimeConfig,
makeManager(),
makeProvider(),
null,
);
handler.registerToolHandler("boom", async () => {
throw new Error("handler blew up");
});
const fakeSubmit = vi.fn();
const fakeBridge = { submitToolResult: fakeSubmit } as never;
await (
handler as unknown as {
executeToolCall: (
bridge: never,
callId: string,
bridgeCallId: string,
name: string,
args: unknown,
) => Promise<void>;
}
).executeToolCall(fakeBridge, "call-1", "bridge-call-1", "boom", {});
expect(fakeSubmit).toHaveBeenCalledWith("bridge-call-1", { error: "handler blew up" });
});
});
});

View File

@ -0,0 +1,383 @@
import { randomUUID } from "node:crypto";
import http from "node:http";
import type { Duplex } from "node:stream";
import WebSocket, { WebSocketServer } from "ws";
import type { VoiceCallRealtimeConfig } from "../config.js";
import type { CoreConfig } from "../core-bridge.js";
import type { CallManager } from "../manager.js";
import type { VoiceCallProvider } from "../providers/base.js";
import {
OpenAIRealtimeVoiceBridge,
type RealtimeTool,
} from "../providers/openai-realtime-voice.js";
import type { NormalizedEvent } from "../types.js";
import type { WebhookResponsePayload } from "../webhook.js";
export type ToolHandlerFn = (args: unknown, callId: string) => Promise<unknown>;
/** How long (ms) a stream token remains valid after TwiML is issued. */
const STREAM_TOKEN_TTL_MS = 30_000;
/**
* Handles inbound voice calls bridged directly to the OpenAI Realtime API.
*
* Responsibilities:
* - Accept WebSocket upgrades from Twilio Media Streams at the /realtime path
* - Return TwiML <Connect><Stream> payload for the initial HTTP webhook
* - Register each call with CallManager (appears in voice status/history)
* - Route tool calls to registered handlers (Phase 5 tool framework)
*
* Provider coupling: this class currently speaks the Twilio Media Streams
* WebSocket protocol directly (μ-law audio, streamSid/callSid, start/media/
* mark/stop events) and emits Twilio TwiML. The OpenAI bridge itself is
* provider-agnostic. If a second provider needs realtime support, the right
* refactor is to extract a RealtimeMediaAdapter interface (buildStreamPayload +
* handleWebSocketUpgrade + MediaStreamCallbacks) so the bridge and call-manager
* wiring can be reused without duplicating the OpenAI session logic; a
* commented sketch appears at the end of this file.
*/
export class RealtimeCallHandler {
private toolHandlers = new Map<string, ToolHandlerFn>();
/** One-time tokens issued per TwiML response; consumed on WS upgrade.
* Stores expiry + caller metadata so registerCallInManager can include From/To. */
private pendingStreamTokens = new Map<string, { expiry: number; from?: string; to?: string }>();
constructor(
private config: VoiceCallRealtimeConfig,
private manager: CallManager,
private provider: VoiceCallProvider,
private coreConfig: CoreConfig | null,
/** Pre-resolved OpenAI API key (falls back to OPENAI_API_KEY env at call time) */
private openaiApiKey?: string,
) {}
/**
* Handle a WebSocket upgrade request from Twilio for a realtime media stream.
* Called from VoiceCallWebhookServer's upgrade handler when isRealtimeWebSocketUpgrade() is true.
*
* Validates the one-time stream token embedded in the URL by buildTwiMLPayload before
* accepting the upgrade. This ensures the WS connection was preceded by a properly
* Twilio-signed POST webhook; the token is only issued after verifyWebhook passes.
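*
* @example
* // Hypothetical wiring inside the webhook server's HTTP upgrade listener
* // (`server`, `realtimeHandler`, and the isRealtimeWebSocketUpgrade check are assumed in scope):
* server.on("upgrade", (req, socket, head) => {
*   if (isRealtimeWebSocketUpgrade(req)) {
*     realtimeHandler.handleWebSocketUpgrade(req, socket, head);
*   } else {
*     socket.destroy();
*   }
* });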
*/
handleWebSocketUpgrade(request: http.IncomingMessage, socket: Duplex, head: Buffer): void {
const url = new URL(request.url ?? "/", "wss://localhost");
// Token is embedded as the last path segment (e.g. /voice/stream/realtime/<uuid>)
// to survive reverse proxies that strip query parameters (e.g. Tailscale Funnel).
const token = url.pathname.split("/").pop() ?? null;
const callerMeta = token ? this.consumeStreamToken(token) : null;
if (!callerMeta) {
console.warn("[voice-call] Rejecting WS upgrade: missing or invalid stream token");
socket.write("HTTP/1.1 401 Unauthorized\r\n\r\n");
socket.destroy();
return;
}
const wss = new WebSocketServer({ noServer: true });
wss.handleUpgrade(request, socket, head, (ws) => {
let bridge: OpenAIRealtimeVoiceBridge | null = null;
let initialized = false;
ws.on("message", (data: Buffer) => {
try {
const msg = JSON.parse(data.toString()) as Record<string, unknown>;
if (!initialized && msg.event === "start") {
initialized = true;
const startData = msg.start as Record<string, string> | undefined;
const streamSid = startData?.streamSid || "unknown";
const callSid = startData?.callSid || "unknown";
bridge = this.handleCall(streamSid, callSid, ws, callerMeta);
} else if (bridge) {
const mediaData = msg.media as Record<string, unknown> | undefined;
if (msg.event === "media" && mediaData?.payload) {
bridge.sendAudio(Buffer.from(mediaData.payload as string, "base64"));
if (mediaData.timestamp) {
bridge.setMediaTimestamp(Number(mediaData.timestamp));
}
} else if (msg.event === "mark") {
bridge.acknowledgeMark();
} else if (msg.event === "stop") {
bridge.close();
}
}
} catch (err) {
console.error("[voice-call] Error parsing WS message:", err);
}
});
ws.on("close", () => {
bridge?.close();
});
});
}
/**
* Build the TwiML <Connect><Stream> response payload for a realtime call.
* The WebSocket URL is derived from the incoming request host so no hostname
* is hardcoded. A one-time stream token is embedded in the URL and validated
* by handleWebSocketUpgrade to prevent unauthenticated WS connections.
*
* @param params - Parsed Twilio webhook body params (From/To stored with nonce
* so registerCallInManager can populate caller fields).
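*
* @example
* // Hypothetical use inside the POST webhook route, after Twilio signature
* // verification (`req`, `res`, and the raw form body `rawBody` are assumed in scope):
* const payload = handler.buildTwiMLPayload(req, new URLSearchParams(rawBody));
* res.writeHead(payload.statusCode, payload.headers).end(payload.body);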
*/
buildTwiMLPayload(req: http.IncomingMessage, params?: URLSearchParams): WebhookResponsePayload {
const host = req.headers.host || "localhost:8443";
const token = this.issueStreamToken({
from: params?.get("From") ?? undefined,
to: params?.get("To") ?? undefined,
});
const wsUrl = `wss://${host}/voice/stream/realtime/${token}`;
// Note: the logged URL omits the one-time token so it is not captured in logs.
console.log(
`[voice-call] Returning realtime TwiML with WebSocket: wss://${host}/voice/stream/realtime`,
);
const twiml = `<?xml version="1.0" encoding="UTF-8"?>
<Response>
<Connect>
<Stream url="${wsUrl}" />
</Connect>
</Response>`;
return {
statusCode: 200,
headers: { "Content-Type": "text/xml" },
body: twiml,
};
}
/**
* Register a named tool handler to be called when the model invokes a function.
* Must be called before calls begin.
*
* @param name - Function name as declared in config.realtime.tools
* @param fn - Async handler receiving (parsedArgs, internalCallId); return value
* is submitted back to the model as the tool result.
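*
* @example
* // Mirrors the unit test: the resolved value becomes the tool result.
* handler.registerToolHandler("get_time", async () => ({ utc: new Date().toISOString() }));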
*/
registerToolHandler(name: string, fn: ToolHandlerFn): void {
this.toolHandlers.set(name, fn);
}
// ---------------------------------------------------------------------------
// Private
// ---------------------------------------------------------------------------
/** Generate a single-use stream token valid for STREAM_TOKEN_TTL_MS. */
private issueStreamToken(meta: { from?: string; to?: string } = {}): string {
const token = randomUUID();
this.pendingStreamTokens.set(token, { expiry: Date.now() + STREAM_TOKEN_TTL_MS, ...meta });
// Evict expired tokens to prevent unbounded growth if calls are abandoned
for (const [t, entry] of this.pendingStreamTokens) {
if (Date.now() > entry.expiry) this.pendingStreamTokens.delete(t);
}
return token;
}
/** Consume a stream token. Returns caller metadata if valid, null if not. */
private consumeStreamToken(token: string): { from?: string; to?: string } | null {
const entry = this.pendingStreamTokens.get(token);
if (!entry) return null;
this.pendingStreamTokens.delete(token);
return Date.now() <= entry.expiry ? { from: entry.from, to: entry.to } : null;
}
/**
* Create and start the OpenAI Realtime bridge for a single call session.
* Registers the call with CallManager so it appears in status/history.
* Returns the bridge (or null on fatal config error).
*/
private handleCall(
streamSid: string,
callSid: string,
ws: WebSocket,
callerMeta: { from?: string; to?: string },
): OpenAIRealtimeVoiceBridge | null {
const apiKey = this.openaiApiKey ?? process.env.OPENAI_API_KEY;
if (!apiKey) {
console.error(
"[voice-call] No OpenAI API key for realtime call (set streaming.openaiApiKey or OPENAI_API_KEY)",
);
ws.close(1011, "No API key");
return null;
}
const callId = this.registerCallInManager(callSid, callerMeta);
console.log(
`[voice-call] Realtime call: streamSid=${streamSid}, callSid=${callSid}, callId=${callId}`,
);
// Declare as null first so closures can capture the reference before bridge is created.
// By the time any callback fires, bridge will be fully assigned.
let bridge: OpenAIRealtimeVoiceBridge | null = null;
bridge = new OpenAIRealtimeVoiceBridge({
apiKey,
model: this.config.model,
voice: this.config.voice,
instructions: this.config.instructions,
temperature: this.config.temperature,
vadThreshold: this.config.vadThreshold,
silenceDurationMs: this.config.silenceDurationMs,
prefixPaddingMs: this.config.prefixPaddingMs,
tools: this.config.tools as RealtimeTool[],
onAudio: (muLaw) => {
ws.send(
JSON.stringify({
event: "media",
streamSid,
media: { payload: muLaw.toString("base64") },
}),
);
},
onClearAudio: () => {
ws.send(JSON.stringify({ event: "clear", streamSid }));
},
onTranscript: (role, text, isFinal) => {
if (isFinal) {
if (role === "user") {
const event: NormalizedEvent = {
id: `realtime-speech-${callSid}-${Date.now()}`,
type: "call.speech",
callId,
providerCallId: callSid,
timestamp: Date.now(),
transcript: text,
isFinal: true,
};
this.manager.processEvent(event);
} else if (role === "assistant") {
// Log assistant turns via call.speaking so they appear as speaker:"bot"
// in the transcript (same mechanism Telnyx uses for TTS speech).
this.manager.processEvent({
id: `realtime-bot-${callSid}-${Date.now()}`,
type: "call.speaking",
callId,
providerCallId: callSid,
timestamp: Date.now(),
text,
});
}
}
},
onToolCall: (toolEvent) => {
if (bridge) {
void this.executeToolCall(
bridge,
callId,
toolEvent.callId,
toolEvent.name,
toolEvent.args,
);
}
},
onReady: () => {
bridge?.triggerGreeting();
},
onError: (err) => {
console.error("[voice-call] Realtime error:", err.message);
},
onClose: () => {
this.endCallInManager(callSid, callId);
},
});
bridge.connect().catch((err: Error) => {
console.error("[voice-call] Failed to connect realtime bridge:", err);
ws.close(1011, "Failed to connect");
});
// Acknowledge the stream connection (mirrors Twilio Media Streams protocol)
ws.send(JSON.stringify({ event: "connected", protocol: "Call", version: "1.0.0" }));
return bridge;
}
/**
* Emit synthetic NormalizedEvents to register the call with CallManager.
* Returns the internal callId generated by the manager.
*
* Tested directly via an `as unknown as` cast; the logic is non-trivial
* enough to warrant unit testing without promoting it to a public method.
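*
* @example
* // From the unit tests: with no matching manager record, the provider SID is
* // returned as a fallback.
* const id = (handler as unknown as { registerCallInManager: (sid: string) => string })
*   .registerCallInManager("CA_fallback"); // => "CA_fallback"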
*/
private registerCallInManager(
callSid: string,
callerMeta: { from?: string; to?: string } = {},
): string {
const now = Date.now();
const baseFields = {
providerCallId: callSid,
timestamp: now,
direction: "inbound" as const,
...(callerMeta.from ? { from: callerMeta.from } : {}),
...(callerMeta.to ? { to: callerMeta.to } : {}),
};
// call.initiated causes the manager to auto-create the call record
// (see manager/events.ts createWebhookCall path)
this.manager.processEvent({
id: `realtime-initiated-${callSid}`,
callId: callSid,
type: "call.initiated",
...baseFields,
});
// Clear the configured inboundGreeting (stored on the call record as
// metadata.initialMessage) before call.answered fires. The realtime bridge
// owns all voice output; the TTS greeting path would fail anyway because
// provider state is never initialized for realtime calls.
const callRecord = this.manager.getCallByProviderCallId(callSid);
if (callRecord?.metadata) {
delete callRecord.metadata.initialMessage;
}
this.manager.processEvent({
id: `realtime-answered-${callSid}`,
callId: callSid,
type: "call.answered",
...baseFields,
});
return callRecord?.callId ?? callSid;
}
private endCallInManager(callSid: string, callId: string): void {
this.manager.processEvent({
id: `realtime-ended-${callSid}-${Date.now()}`,
type: "call.ended",
callId,
providerCallId: callSid,
timestamp: Date.now(),
reason: "completed",
});
}
/**
* Dispatch a tool call from the Realtime API to the registered handler.
* Submits the result (or an error object) back to the bridge.
*
* Tested directly via an `as unknown as` cast; the routing/error logic is
* worth unit testing without exposing the method publicly.
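*
* @example
* // Unregistered tool names produce an error result instead of throwing:
* // submitToolResult(bridgeCallId, { error: 'Tool "unknown_tool" not available' })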
*/
private async executeToolCall(
bridge: OpenAIRealtimeVoiceBridge,
callId: string,
bridgeCallId: string,
name: string,
args: unknown,
): Promise<void> {
const handler = this.toolHandlers.get(name);
let result: unknown;
if (handler) {
try {
result = await handler(args, callId);
} catch (err) {
result = { error: err instanceof Error ? err.message : String(err) };
}
} else {
result = { error: `Tool "${name}" not available` };
}
bridge.submitToolResult(bridgeCallId, result);
}
}
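// ---------------------------------------------------------------------------
// RealtimeMediaAdapter sketch (referenced from the class doc comment)
// ---------------------------------------------------------------------------
// A minimal sketch of the provider-agnostic adapter extraction, assuming
// hypothetical member shapes; nothing below is implemented or wired up. A
// second provider would implement this interface so the OpenAI bridge and
// CallManager wiring above could be reused unchanged.
//
// interface MediaStreamCallbacks {
//   /** Caller audio arrived in the provider's native codec (μ-law for Twilio). */
//   onAudio(chunk: Buffer): void;
//   /** The provider signalled end of the media stream. */
//   onStop(): void;
// }
//
// interface RealtimeMediaAdapter {
//   /** Provider-specific webhook response that starts a media stream (TwiML for Twilio). */
//   buildStreamPayload(req: http.IncomingMessage, params?: URLSearchParams): WebhookResponsePayload;
//   /** Accept the provider's WebSocket upgrade and attach media callbacks. */
//   handleWebSocketUpgrade(
//     request: http.IncomingMessage,
//     socket: Duplex,
//     head: Buffer,
//     callbacks: MediaStreamCallbacks,
//   ): void;
// }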