Voice Call: enforce spoken-output contract and fix stream TTS silence regression (#51500)

* voice-call: harden streaming startup and fallback behavior

* voice-call: suppress barge-in during intro

* voice-call: skip first-turn auto-response while greeting plays

* voice-call: improve telephony audio fidelity and pacing

* voice-call: enforce spoken JSON and first-message barge skip

* voice-call: fix silent stream TTS regression

* voice-call: remove TTS timing diagnostics and document stream behavior

* voice-call: fail stream playback when stream sends are dropped

* voice-call: harden spoken contract and initial stream replay

* voice-call: suppress barge transcripts during initial greeting

* voice-call: harden stream fallback and media safety
Josh Avant 2026-03-21 04:15:16 -05:00 committed by GitHub
parent e9f715f27b
commit 3f7f2c8dc9
20 changed files with 1558 additions and 146 deletions

View File

@ -157,6 +157,7 @@ Docs: https://docs.openclaw.ai
- Gateway: harden OpenResponses file-context escaping (#50782) Thanks @YLChen-007 and @joshavant.
- LINE: harden Express webhook parsing to verified raw body (#51202) Thanks @gladiator9797 and @joshavant.
- Exec: harden host env override handling across gateway and node (#51207) Thanks @gladiator9797 and @joshavant.
- Voice Call: enforce spoken-output contract and fix stream TTS silence regression (#51500) Thanks @joshavant.
- xAI/models: rename the bundled Grok 4.20 catalog entries to the GA IDs and normalize saved deprecated beta IDs at runtime so existing configs and sessions keep resolving. (#50772) Thanks @Jaaneek.
### Fixes

View File

@ -224,6 +224,7 @@ Notes:
- **Microsoft speech is ignored for voice calls** (telephony audio needs PCM; the current Microsoft transport does not expose telephony PCM output).
- Core TTS is used when Twilio media streaming is enabled; otherwise calls fall back to provider native voices.
- If a Twilio media stream is already active, Voice Call does not fall back to TwiML `<Say>`. If telephony TTS is unavailable in that state, the playback request fails instead of mixing two playback paths.
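The no-mixing rule above can be sketched as a small decision helper. This is an illustrative sketch, not the shipped API; `choosePlaybackPath` and its inputs are assumptions made for clarity:

```typescript
// Illustrative playback-path rule: while a media stream is active,
// never fall back to TwiML <Say>; fail the playback request instead.
type PlaybackPath = "stream-tts" | "twiml-say";

function choosePlaybackPath(
  streamActive: boolean,
  telephonyTtsAvailable: boolean,
): PlaybackPath {
  if (!streamActive) {
    // No live stream: TwiML <Say> is the only playback path.
    return "twiml-say";
  }
  if (!telephonyTtsAvailable) {
    // Matches the documented behavior: refuse to mix playback paths.
    throw new Error("refusing TwiML fallback while media stream is active");
  }
  return "stream-tts";
}
```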
### More examples
@ -308,6 +309,35 @@ Auto-responses use the agent system. Tune with:
- `responseSystemPrompt`
- `responseTimeoutMs`
### Spoken output contract
For auto-responses, Voice Call appends a strict spoken-output contract to the system prompt:
- `{"spoken":"..."}`
Voice Call then extracts speech text defensively:
- Ignores payloads marked as reasoning/error content.
- Parses direct JSON, fenced JSON, or inline `"spoken"` keys.
- Falls back to plain text and removes likely planning/meta lead-in paragraphs.
This keeps spoken playback focused on caller-facing text and avoids leaking planning text into audio.
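The extraction order above can be sketched as a single function. This is a minimal sketch of the documented fallback chain; `extractSpoken` and its regexes are assumptions for illustration, not the shipped implementation:

```typescript
// Hedged sketch: extract caller-facing speech from a model payload,
// trying direct JSON, fenced JSON, then an inline "spoken" key,
// before falling back to plain text.
function extractSpoken(raw: string): string {
  const text = raw.trim();
  // 1. Direct JSON payload: {"spoken":"..."}
  try {
    const parsed = JSON.parse(text) as { spoken?: unknown };
    if (typeof parsed.spoken === "string") return parsed.spoken;
  } catch {
    // Not direct JSON; keep trying.
  }
  // 2. Fenced JSON block (```json ... ```).
  const fenced = text.match(/`{3}(?:json)?\s*([\s\S]*?)`{3}/);
  if (fenced) {
    try {
      const parsed = JSON.parse(fenced[1]) as { spoken?: unknown };
      if (typeof parsed.spoken === "string") return parsed.spoken;
    } catch {
      // Fall through to the inline key.
    }
  }
  // 3. Inline "spoken" key anywhere in the text.
  const inline = text.match(/"spoken"\s*:\s*"((?:[^"\\]|\\.)*)"/);
  if (inline) return JSON.parse(`"${inline[1]}"`) as string;
  // 4. Plain-text fallback.
  return text;
}
```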
### Conversation startup behavior
For outbound `conversation` calls, first-message handling is tied to live playback state:
- Barge-in queue clearing and auto-responses are suppressed only while the initial greeting is actively speaking.
- If initial playback fails, the call returns to `listening` and the initial message remains queued for retry.
- Initial playback for Twilio streaming starts on stream connect without extra delay.
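The retry behavior above can be sketched as a small state transition. The `Call` shape and `speakInitial` helper here are assumptions for illustration, not the real manager code:

```typescript
// Hedged sketch: speak the queued initial message once; clear it only
// after successful playback so a transient failure can retry later.
type CallState = "speaking" | "listening";

interface Call {
  state: CallState;
  metadata: { initialMessage?: string };
}

async function speakInitial(
  call: Call,
  play: (text: string) => Promise<boolean>,
): Promise<void> {
  const text = call.metadata.initialMessage;
  if (!text) return; // nothing queued, or already spoken
  call.state = "speaking";
  const ok = await play(text);
  if (ok) {
    // Clear only after success so the message cannot replay.
    delete call.metadata.initialMessage;
  } else {
    // Failed playback: return to listening with the message still queued.
    call.state = "listening";
  }
}
```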
### Twilio stream disconnect grace
When a Twilio media stream disconnects, Voice Call waits `2000ms` before auto-ending the call:
- If the stream reconnects during that window, auto-end is canceled.
- If no stream is re-registered after the grace period, the call is ended to prevent stuck active calls.
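The grace-window logic above amounts to a cancellable per-call timer. A minimal sketch, assuming a timer map keyed by call ID; `GraceWindow` and its method names are illustrative:

```typescript
// Hedged sketch of the disconnect grace window: schedule auto-end on
// disconnect, cancel it if the stream reconnects within the window.
const STREAM_DISCONNECT_GRACE_MS = 2000;

class GraceWindow {
  private timers = new Map<string, ReturnType<typeof setTimeout>>();

  constructor(
    private endCall: (callId: string) => void,
    private graceMs: number = STREAM_DISCONNECT_GRACE_MS,
  ) {}

  // Stream disconnected: schedule auto-end after the grace period.
  onStreamDisconnect(callId: string): void {
    this.cancel(callId);
    this.timers.set(
      callId,
      setTimeout(() => {
        this.timers.delete(callId);
        this.endCall(callId); // no reconnect arrived in time
      }, this.graceMs),
    );
  }

  // Stream re-registered: cancel the pending auto-end.
  onStreamReconnect(callId: string): void {
    this.cancel(callId);
  }

  private cancel(callId: string): void {
    const timer = this.timers.get(callId);
    if (timer) {
      clearTimeout(timer);
      this.timers.delete(callId);
    }
  }
}
```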
## CLI
```bash

View File

@ -140,4 +140,8 @@ Actions:
- Adds replay protection for Twilio and Plivo webhooks (valid duplicate callbacks are ignored safely).
- Twilio speech turns include a per-turn token so stale/replayed callbacks cannot complete a newer turn.
- `responseModel` / `responseSystemPrompt` control AI auto-responses.
- Voice-call auto-responses enforce a spoken JSON contract (`{"spoken":"..."}`) and filter reasoning/meta output before playback.
- While a Twilio stream is active, playback does not fall back to TwiML `<Say>`; stream-TTS failures fail the playback request.
- Outbound conversation calls suppress barge-in only while the initial greeting is actively speaking, then re-enable normal interruption.
- Twilio stream disconnect auto-end uses a short grace window so quick reconnects do not end the call.
- Media streaming requires `ws` and OpenAI Realtime API key.

View File

@ -1,6 +1,36 @@
import { describe, expect, it } from "vitest";
import { describe, expect, it, vi } from "vitest";
import { createManagerHarness, FakeProvider } from "./manager.test-harness.js";
class FailFirstPlayTtsProvider extends FakeProvider {
private failed = false;
override async playTts(input: Parameters<FakeProvider["playTts"]>[0]): Promise<void> {
this.playTtsCalls.push(input);
if (!this.failed) {
this.failed = true;
throw new Error("synthetic tts failure");
}
}
}
class DelayedPlayTtsProvider extends FakeProvider {
private releasePlayTts: (() => void) | null = null;
readonly playTtsStarted = vi.fn();
override async playTts(input: Parameters<FakeProvider["playTts"]>[0]): Promise<void> {
this.playTtsCalls.push(input);
this.playTtsStarted();
await new Promise<void>((resolve) => {
this.releasePlayTts = resolve;
});
}
releaseCurrentPlayback(): void {
this.releasePlayTts?.();
this.releasePlayTts = null;
}
}
describe("CallManager notify and mapping", () => {
it("upgrades providerCallId mapping when provider ID changes", async () => {
const { manager } = await createManagerHarness();
@ -50,4 +80,211 @@ describe("CallManager notify and mapping", () => {
expect(provider.playTtsCalls[0]?.text).toBe("Hello there");
},
);
it("speaks initial message on answered for conversation mode with non-stream provider", async () => {
const { manager, provider } = await createManagerHarness({}, new FakeProvider("plivo"));
const { callId, success } = await manager.initiateCall("+15550000003", undefined, {
message: "Hello from conversation",
mode: "conversation",
});
expect(success).toBe(true);
manager.processEvent({
id: "evt-conversation-plivo",
type: "call.answered",
callId,
providerCallId: "call-uuid",
timestamp: Date.now(),
});
await new Promise((resolve) => setTimeout(resolve, 0));
expect(provider.playTtsCalls).toHaveLength(1);
expect(provider.playTtsCalls[0]?.text).toBe("Hello from conversation");
});
it("speaks initial message on answered for conversation mode when Twilio streaming is disabled", async () => {
const { manager, provider } = await createManagerHarness(
{ streaming: { enabled: false } },
new FakeProvider("twilio"),
);
const { callId, success } = await manager.initiateCall("+15550000004", undefined, {
message: "Twilio non-stream",
mode: "conversation",
});
expect(success).toBe(true);
manager.processEvent({
id: "evt-conversation-twilio-no-stream",
type: "call.answered",
callId,
providerCallId: "call-uuid",
timestamp: Date.now(),
});
await new Promise((resolve) => setTimeout(resolve, 0));
expect(provider.playTtsCalls).toHaveLength(1);
expect(provider.playTtsCalls[0]?.text).toBe("Twilio non-stream");
});
it("waits for stream connect in conversation mode when Twilio streaming is enabled", async () => {
const { manager, provider } = await createManagerHarness(
{ streaming: { enabled: true } },
new FakeProvider("twilio"),
);
const { callId, success } = await manager.initiateCall("+15550000005", undefined, {
message: "Twilio stream",
mode: "conversation",
});
expect(success).toBe(true);
manager.processEvent({
id: "evt-conversation-twilio-stream",
type: "call.answered",
callId,
providerCallId: "call-uuid",
timestamp: Date.now(),
});
await new Promise((resolve) => setTimeout(resolve, 0));
expect(provider.playTtsCalls).toHaveLength(0);
});
it("speaks on answered when Twilio streaming is enabled but stream-connect path is unavailable", async () => {
const twilioProvider = new FakeProvider("twilio");
twilioProvider.twilioStreamConnectEnabled = false;
const { manager, provider } = await createManagerHarness(
{ streaming: { enabled: true } },
twilioProvider,
);
const { callId, success } = await manager.initiateCall("+15550000009", undefined, {
message: "Twilio stream unavailable",
mode: "conversation",
});
expect(success).toBe(true);
manager.processEvent({
id: "evt-conversation-twilio-stream-unavailable",
type: "call.answered",
callId,
providerCallId: "call-uuid",
timestamp: Date.now(),
});
await new Promise((resolve) => setTimeout(resolve, 0));
expect(provider.playTtsCalls).toHaveLength(1);
expect(provider.playTtsCalls[0]?.text).toBe("Twilio stream unavailable");
});
it("preserves initialMessage after a failed first playback and retries on next trigger", async () => {
const provider = new FailFirstPlayTtsProvider("plivo");
const { manager } = await createManagerHarness({}, provider);
const { callId, success } = await manager.initiateCall("+15550000006", undefined, {
message: "Retry me",
mode: "notify",
});
expect(success).toBe(true);
manager.processEvent({
id: "evt-retry-1",
type: "call.answered",
callId,
providerCallId: "call-uuid",
timestamp: Date.now(),
});
await new Promise((resolve) => setTimeout(resolve, 0));
const afterFailure = manager.getCall(callId);
expect(provider.playTtsCalls).toHaveLength(1);
expect(afterFailure?.metadata?.initialMessage).toBe("Retry me");
expect(afterFailure?.state).toBe("listening");
manager.processEvent({
id: "evt-retry-2",
type: "call.answered",
callId,
providerCallId: "call-uuid",
timestamp: Date.now(),
});
await new Promise((resolve) => setTimeout(resolve, 0));
const afterSuccess = manager.getCall(callId);
expect(provider.playTtsCalls).toHaveLength(2);
expect(afterSuccess?.metadata?.initialMessage).toBeUndefined();
});
it("speaks initial message only once on repeated stream-connect triggers", async () => {
const { manager, provider } = await createManagerHarness(
{ streaming: { enabled: true } },
new FakeProvider("twilio"),
);
const { callId, success } = await manager.initiateCall("+15550000007", undefined, {
message: "Stream hello",
mode: "conversation",
});
expect(success).toBe(true);
manager.processEvent({
id: "evt-stream-answered",
type: "call.answered",
callId,
providerCallId: "call-uuid",
timestamp: Date.now(),
});
await new Promise((resolve) => setTimeout(resolve, 0));
expect(provider.playTtsCalls).toHaveLength(0);
await manager.speakInitialMessage("call-uuid");
await manager.speakInitialMessage("call-uuid");
expect(provider.playTtsCalls).toHaveLength(1);
expect(provider.playTtsCalls[0]?.text).toBe("Stream hello");
});
it("prevents concurrent initial-message replays while first playback is in flight", async () => {
const provider = new DelayedPlayTtsProvider("twilio");
const { manager } = await createManagerHarness({ streaming: { enabled: true } }, provider);
const { callId, success } = await manager.initiateCall("+15550000008", undefined, {
message: "In-flight hello",
mode: "conversation",
});
expect(success).toBe(true);
manager.processEvent({
id: "evt-stream-answered-concurrent",
type: "call.answered",
callId,
providerCallId: "call-uuid",
timestamp: Date.now(),
});
await new Promise((resolve) => setTimeout(resolve, 0));
expect(provider.playTtsCalls).toHaveLength(0);
const first = manager.speakInitialMessage("call-uuid");
await vi.waitFor(() => {
expect(provider.playTtsStarted).toHaveBeenCalledTimes(1);
});
const second = manager.speakInitialMessage("call-uuid");
await new Promise((resolve) => setTimeout(resolve, 0));
expect(provider.playTtsCalls).toHaveLength(1);
provider.releaseCurrentPlayback();
await Promise.all([first, second]);
const call = manager.getCall(callId);
expect(call?.metadata?.initialMessage).toBeUndefined();
expect(provider.playTtsCalls).toHaveLength(1);
expect(provider.playTtsCalls[0]?.text).toBe("In-flight hello");
});
});

View File

@ -20,6 +20,7 @@ import type {
export class FakeProvider implements VoiceCallProvider {
readonly name: "plivo" | "twilio";
twilioStreamConnectEnabled = true;
readonly playTtsCalls: PlayTtsInput[] = [];
readonly hangupCalls: HangupCallInput[] = [];
readonly startListeningCalls: StartListeningInput[] = [];
@ -61,6 +62,10 @@ export class FakeProvider implements VoiceCallProvider {
async getCallStatus(_input: GetCallStatusInput): Promise<GetCallStatusResult> {
return this.getCallStatusResult;
}
isConversationStreamConnectEnabled(): boolean {
return this.name === "twilio" && this.twilioStreamConnectEnabled;
}
}
let storeSeq = 0;

View File

@ -64,6 +64,7 @@ export class CallManager {
}
>();
private maxDurationTimers = new Map<CallId, NodeJS.Timeout>();
private initialMessageInFlight = new Set<CallId>();
constructor(config: VoiceCallConfig, storePath?: string) {
this.config = config;
@ -256,6 +257,7 @@ export class CallManager {
activeTurnCalls: this.activeTurnCalls,
transcriptWaiters: this.transcriptWaiters,
maxDurationTimers: this.maxDurationTimers,
initialMessageInFlight: this.initialMessageInFlight,
onCallAnswered: (call) => {
this.maybeSpeakInitialMessageOnAnswered(call);
},
@ -269,6 +271,21 @@ export class CallManager {
processManagerEvent(this.getContext(), event);
}
private shouldDeferConversationInitialMessageUntilStreamConnect(): boolean {
if (!this.provider || this.provider.name !== "twilio" || !this.config.streaming.enabled) {
return false;
}
const streamAwareProvider = this.provider as VoiceCallProvider & {
isConversationStreamConnectEnabled?: () => boolean;
};
if (typeof streamAwareProvider.isConversationStreamConnectEnabled !== "function") {
return false;
}
return streamAwareProvider.isConversationStreamConnectEnabled();
}
private maybeSpeakInitialMessageOnAnswered(call: CallRecord): void {
const initialMessage =
typeof call.metadata?.initialMessage === "string" ? call.metadata.initialMessage.trim() : "";
@ -277,6 +294,20 @@ export class CallManager {
return;
}
// Notify mode should speak as soon as the provider reports "answered".
// Conversation mode should defer only when the Twilio stream-connect path
// is actually available; otherwise speak immediately on answered.
const mode = (call.metadata?.mode as string | undefined) ?? "conversation";
if (mode === "conversation") {
const shouldWaitForStreamConnect =
this.shouldDeferConversationInitialMessageUntilStreamConnect();
if (shouldWaitForStreamConnect) {
return;
}
} else if (mode !== "notify") {
return;
}
if (!this.provider || !call.providerCallId) {
return;
}

View File

@ -28,6 +28,7 @@ export type CallManagerTransientState = {
activeTurnCalls: Set<CallId>;
transcriptWaiters: Map<CallId, TranscriptWaiter>;
maxDurationTimers: Map<CallId, NodeJS.Timeout>;
initialMessageInFlight: Set<CallId>;
};
export type CallManagerHooks = {

View File

@ -27,6 +27,7 @@ function createContext(overrides: Partial<CallManagerContext> = {}): CallManager
activeTurnCalls: new Set(),
transcriptWaiters: new Map(),
maxDurationTimers: new Map(),
initialMessageInFlight: new Set(),
...overrides,
};
}

View File

@ -39,6 +39,7 @@ type ConversationContext = Pick<
| "activeTurnCalls"
| "transcriptWaiters"
| "maxDurationTimers"
| "initialMessageInFlight"
>;
type EndCallContext = Pick<
@ -210,8 +211,6 @@ export async function speak(
transitionState(call, "speaking");
persistCallRecord(ctx.storePath, call);
addTranscriptEntry(call, "bot", text);
const voice = provider.name === "twilio" ? ctx.config.tts?.openai?.voice : undefined;
await provider.playTts({
callId,
@ -220,8 +219,14 @@ export async function speak(
voice,
});
addTranscriptEntry(call, "bot", text);
persistCallRecord(ctx.storePath, call);
return { success: true };
} catch (err) {
// A failed playback should not leave the call stuck in speaking state.
transitionState(call, "listening");
persistCallRecord(ctx.storePath, call);
return { success: false, error: err instanceof Error ? err.message : String(err) };
}
}
@ -248,29 +253,41 @@ export async function speakInitialMessage(
return;
}
// Clear so we don't speak it again if the provider reconnects.
if (call.metadata) {
delete call.metadata.initialMessage;
persistCallRecord(ctx.storePath, call);
}
console.log(`[voice-call] Speaking initial message for call ${call.callId} (mode: ${mode})`);
const result = await speak(ctx, call.callId, initialMessage);
if (!result.success) {
console.warn(`[voice-call] Failed to speak initial message: ${result.error}`);
if (ctx.initialMessageInFlight.has(call.callId)) {
console.log(
`[voice-call] speakInitialMessage: initial message already in flight for ${call.callId}`,
);
return;
}
ctx.initialMessageInFlight.add(call.callId);
if (mode === "notify") {
const delaySec = ctx.config.outbound.notifyHangupDelaySec;
console.log(`[voice-call] Notify mode: auto-hangup in ${delaySec}s for call ${call.callId}`);
setTimeout(async () => {
const currentCall = ctx.activeCalls.get(call.callId);
if (currentCall && !TerminalStates.has(currentCall.state)) {
console.log(`[voice-call] Notify mode: hanging up call ${call.callId}`);
await endCall(ctx, call.callId);
}
}, delaySec * 1000);
try {
console.log(`[voice-call] Speaking initial message for call ${call.callId} (mode: ${mode})`);
const result = await speak(ctx, call.callId, initialMessage);
if (!result.success) {
console.warn(`[voice-call] Failed to speak initial message: ${result.error}`);
return;
}
// Clear only after successful playback so transient provider failures can retry.
if (call.metadata) {
delete call.metadata.initialMessage;
persistCallRecord(ctx.storePath, call);
}
if (mode === "notify") {
const delaySec = ctx.config.outbound.notifyHangupDelaySec;
console.log(`[voice-call] Notify mode: auto-hangup in ${delaySec}s for call ${call.callId}`);
setTimeout(async () => {
const currentCall = ctx.activeCalls.get(call.callId);
if (currentCall && !TerminalStates.has(currentCall.state)) {
console.log(`[voice-call] Notify mode: hanging up call ${call.callId}`);
await endCall(ctx, call.callId);
}
}, delaySec * 1000);
}
} finally {
ctx.initialMessageInFlight.delete(call.callId);
}
}

View File

@ -1,6 +1,6 @@
import { once } from "node:events";
import http from "node:http";
import { describe, expect, it } from "vitest";
import { describe, expect, it, vi } from "vitest";
import { WebSocket } from "ws";
import { MediaStreamHandler } from "./media-stream.js";
import type {
@ -163,6 +163,105 @@ describe("MediaStreamHandler TTS queue", () => {
});
describe("MediaStreamHandler security hardening", () => {
it("fails sends and closes stream when buffered bytes already exceed the cap", () => {
const handler = new MediaStreamHandler({
sttProvider: createStubSttProvider(),
});
const ws = {
readyState: WebSocket.OPEN,
bufferedAmount: 2 * 1024 * 1024,
send: vi.fn(),
close: vi.fn(),
} as unknown as WebSocket;
(
handler as unknown as {
sessions: Map<
string,
{ callId: string; streamSid: string; ws: WebSocket; sttSession: RealtimeSTTSession }
>;
}
).sessions.set("MZ-backpressure", {
callId: "CA-backpressure",
streamSid: "MZ-backpressure",
ws,
sttSession: createStubSession(),
});
const result = handler.sendAudio("MZ-backpressure", Buffer.alloc(160, 0xff));
expect(result.sent).toBe(false);
expect(ws.send).not.toHaveBeenCalled();
expect(ws.close).toHaveBeenCalledWith(1013, "Backpressure: send buffer exceeded");
});
it("fails sends when buffered bytes exceed cap after enqueueing a frame", () => {
const handler = new MediaStreamHandler({
sttProvider: createStubSttProvider(),
});
const ws = {
readyState: WebSocket.OPEN,
bufferedAmount: 0,
send: vi.fn(() => {
(
ws as unknown as {
bufferedAmount: number;
}
).bufferedAmount = 2 * 1024 * 1024;
}),
close: vi.fn(),
} as unknown as WebSocket;
(
handler as unknown as {
sessions: Map<
string,
{ callId: string; streamSid: string; ws: WebSocket; sttSession: RealtimeSTTSession }
>;
}
).sessions.set("MZ-overflow", {
callId: "CA-overflow",
streamSid: "MZ-overflow",
ws,
sttSession: createStubSession(),
});
const result = handler.sendMark("MZ-overflow", "mark-1");
expect(ws.send).toHaveBeenCalledTimes(1);
expect(result.sent).toBe(false);
expect(ws.close).toHaveBeenCalledWith(1013, "Backpressure: send buffer exceeded");
});
it("sanitizes websocket close reason before logging", async () => {
const logSpy = vi.spyOn(console, "log").mockImplementation(() => {});
const handler = new MediaStreamHandler({
sttProvider: createStubSttProvider(),
preStartTimeoutMs: 5_000,
shouldAcceptStream: () => true,
});
const server = await startWsServer(handler);
try {
const ws = await connectWs(server.url);
ws.close(1000, "forged\nline\r\tentry");
await waitForClose(ws);
const closeLog = logSpy.mock.calls
.map((call) => call[0])
.find(
(value): value is string =>
typeof value === "string" && value.includes("[MediaStream] WebSocket closed"),
);
expect(closeLog).toBeDefined();
expect(closeLog).not.toContain("\n");
expect(closeLog).not.toContain("\r");
expect(closeLog).not.toContain("\t");
expect(closeLog).toContain("forged line entry");
} finally {
logSpy.mockRestore();
await server.close();
}
});
it("closes idle pre-start connections after timeout", async () => {
const shouldAcceptStreamCalls: Array<{ callId: string; streamSid: string; token?: string }> =
[];

View File

@ -40,7 +40,7 @@ export interface MediaStreamConfig {
/** Callback when speech starts (barge-in) */
onSpeechStart?: (callId: string) => void;
/** Callback when stream disconnects */
onDisconnect?: (callId: string) => void;
onDisconnect?: (callId: string, streamSid: string) => void;
}
/**
@ -60,6 +60,13 @@ type TtsQueueEntry = {
reject: (error: unknown) => void;
};
type StreamSendResult = {
sent: boolean;
readyState?: number;
bufferedBeforeBytes: number;
bufferedAfterBytes: number;
};
type PendingConnection = {
ip: string;
timeout: ReturnType<typeof setTimeout>;
@ -69,6 +76,19 @@ const DEFAULT_PRE_START_TIMEOUT_MS = 5000;
const DEFAULT_MAX_PENDING_CONNECTIONS = 32;
const DEFAULT_MAX_PENDING_CONNECTIONS_PER_IP = 4;
const DEFAULT_MAX_CONNECTIONS = 128;
const MAX_WS_BUFFERED_BYTES = 1024 * 1024;
const CLOSE_REASON_LOG_MAX_CHARS = 120;
function sanitizeLogText(value: string, maxChars: number): string {
const sanitized = value
.replace(/[\u0000-\u001f\u007f]/g, " ")
.replace(/\s+/g, " ")
.trim();
if (sanitized.length <= maxChars) {
return sanitized;
}
return `${sanitized.slice(0, maxChars)}...`;
}
/**
* Manages WebSocket connections for Twilio media streams.
@ -170,7 +190,12 @@ export class MediaStreamHandler {
}
});
ws.on("close", () => {
ws.on("close", (code, reason) => {
const rawReason = Buffer.isBuffer(reason) ? reason.toString("utf8") : String(reason || "");
const reasonText = sanitizeLogText(rawReason, CLOSE_REASON_LOG_MAX_CHARS);
console.log(
`[MediaStream] WebSocket closed (code: ${code}, reason: ${reasonText || "none"})`,
);
this.clearPendingConnection(ws);
if (session) {
this.handleStop(session);
@ -258,7 +283,7 @@ export class MediaStreamHandler {
this.clearTtsState(session.streamSid);
session.sttSession.close();
this.sessions.delete(session.streamSid);
this.config.onDisconnect?.(session.callId);
this.config.onDisconnect?.(session.callId, session.streamSid);
}
private getStreamToken(request: IncomingMessage): string | undefined {
@ -347,17 +372,78 @@ export class MediaStreamHandler {
/**
* Send a message to a stream's WebSocket if available.
*/
private sendToStream(streamSid: string, message: unknown): void {
const session = this.getOpenSession(streamSid);
session?.ws.send(JSON.stringify(message));
private sendToStream(streamSid: string, message: unknown): StreamSendResult {
const session = this.sessions.get(streamSid);
if (!session) {
return {
sent: false,
bufferedBeforeBytes: 0,
bufferedAfterBytes: 0,
};
}
const readyState = session.ws.readyState;
const bufferedBeforeBytes = session.ws.bufferedAmount;
if (readyState !== WebSocket.OPEN) {
return {
sent: false,
readyState,
bufferedBeforeBytes,
bufferedAfterBytes: session.ws.bufferedAmount,
};
}
if (bufferedBeforeBytes > MAX_WS_BUFFERED_BYTES) {
try {
session.ws.close(1013, "Backpressure: send buffer exceeded");
} catch {
// Best-effort close; caller still receives sent:false.
}
return {
sent: false,
readyState,
bufferedBeforeBytes,
bufferedAfterBytes: session.ws.bufferedAmount,
};
}
try {
session.ws.send(JSON.stringify(message));
const bufferedAfterBytes = session.ws.bufferedAmount;
if (bufferedAfterBytes > MAX_WS_BUFFERED_BYTES) {
try {
session.ws.close(1013, "Backpressure: send buffer exceeded");
} catch {
// Best-effort close; caller still receives sent:false.
}
return {
sent: false,
readyState,
bufferedBeforeBytes,
bufferedAfterBytes,
};
}
return {
sent: true,
readyState,
bufferedBeforeBytes,
bufferedAfterBytes,
};
} catch {
return {
sent: false,
readyState,
bufferedBeforeBytes,
bufferedAfterBytes: session.ws.bufferedAmount,
};
}
}
/**
* Send audio to a specific stream (for TTS playback).
* Audio should be mu-law encoded at 8kHz mono.
*/
sendAudio(streamSid: string, muLawAudio: Buffer): void {
this.sendToStream(streamSid, {
sendAudio(streamSid: string, muLawAudio: Buffer): StreamSendResult {
return this.sendToStream(streamSid, {
event: "media",
streamSid,
media: { payload: muLawAudio.toString("base64") },
@ -367,8 +453,8 @@ export class MediaStreamHandler {
/**
* Send a mark event to track audio playback position.
*/
sendMark(streamSid: string, name: string): void {
this.sendToStream(streamSid, {
sendMark(streamSid: string, name: string): StreamSendResult {
return this.sendToStream(streamSid, {
event: "mark",
streamSid,
mark: { name },
@ -378,8 +464,8 @@ export class MediaStreamHandler {
/**
* Clear audio buffer (interrupt playback).
*/
clearAudio(streamSid: string): void {
this.sendToStream(streamSid, { event: "clear", streamSid });
clearAudio(streamSid: string): StreamSendResult {
return this.sendToStream(streamSid, { event: "clear", streamSid });
}
/**
@ -412,7 +498,7 @@ export class MediaStreamHandler {
/**
* Clear TTS queue and interrupt current playback (barge-in).
*/
clearTtsQueue(streamSid: string): void {
clearTtsQueue(streamSid: string, _reason = "unspecified"): void {
const queue = this.getTtsQueue(streamSid);
queue.length = 0;
this.ttsActiveControllers.get(streamSid)?.abort();

View File

@ -1,5 +1,5 @@
import { resolveOpenAITtsInstructions } from "../../api.js";
import { pcmToMulaw } from "../telephony-audio.js";
import { convertPcmToMulaw8k } from "../telephony-audio.js";
/**
* OpenAI TTS Provider
@ -145,51 +145,11 @@ export class OpenAITTSProvider {
// Get raw PCM from OpenAI (24kHz, 16-bit signed LE, mono)
const pcm24k = await this.synthesize(text);
// Resample from 24kHz to 8kHz
const pcm8k = resample24kTo8k(pcm24k);
// Encode to mu-law
return pcmToMulaw(pcm8k);
// Convert from 24kHz PCM to Twilio-compatible 8kHz mu-law
return convertPcmToMulaw8k(pcm24k, 24000);
}
}
/**
* Resample 24kHz PCM to 8kHz using linear interpolation.
* Input/output: 16-bit signed little-endian mono.
*/
function resample24kTo8k(input: Buffer): Buffer {
const inputSamples = input.length / 2;
const outputSamples = Math.floor(inputSamples / 3);
const output = Buffer.alloc(outputSamples * 2);
for (let i = 0; i < outputSamples; i++) {
// Calculate position in input (3:1 ratio)
const srcPos = i * 3;
const srcIdx = srcPos * 2;
if (srcIdx + 3 < input.length) {
// Linear interpolation between samples
const s0 = input.readInt16LE(srcIdx);
const s1 = input.readInt16LE(srcIdx + 2);
const frac = srcPos % 1 || 0;
const sample = Math.round(s0 + frac * (s1 - s0));
output.writeInt16LE(clamp16(sample), i * 2);
} else {
// Last sample
output.writeInt16LE(input.readInt16LE(srcIdx), i * 2);
}
}
return output;
}
/**
* Clamp value to 16-bit signed integer range.
*/
function clamp16(value: number): number {
return Math.max(-32768, Math.min(32767, value));
}
/**
* Convert 8-bit mu-law to 16-bit linear PCM.
* Useful for decoding incoming audio.

View File

@ -1,4 +1,4 @@
import { describe, expect, it } from "vitest";
import { describe, expect, it, vi } from "vitest";
import type { WebhookContext } from "../types.js";
import { TwilioProvider } from "./twilio.js";
@ -188,4 +188,162 @@ describe("TwilioProvider", () => {
expect(event?.type).toBe("call.speech");
expect(event?.turnToken).toBe("turn-xyz");
});
it("fails when an active stream exists but telephony TTS is unavailable", async () => {
const provider = createProvider();
const apiRequest = vi.fn<
(
endpoint: string,
params: Record<string, string | string[]>,
options?: { allowNotFound?: boolean },
) => Promise<unknown>
>(async () => ({}));
(
provider as unknown as {
apiRequest: (
endpoint: string,
params: Record<string, string | string[]>,
options?: { allowNotFound?: boolean },
) => Promise<unknown>;
}
).apiRequest = apiRequest;
(
provider as unknown as {
callWebhookUrls: Map<string, string>;
}
).callWebhookUrls.set("CA-stream", "https://example.ngrok.app/voice/twilio");
provider.registerCallStream("CA-stream", "MZ-stream");
await expect(
provider.playTts({
callId: "call-stream",
providerCallId: "CA-stream",
text: "Hello stream",
}),
).rejects.toThrow("refusing TwiML fallback");
expect(apiRequest).not.toHaveBeenCalled();
});
it("falls back to TwiML when no active stream exists and telephony TTS is unavailable", async () => {
const provider = createProvider();
const apiRequest = vi.fn<
(
endpoint: string,
params: Record<string, string | string[]>,
options?: { allowNotFound?: boolean },
) => Promise<unknown>
>(async () => ({}));
(
provider as unknown as {
apiRequest: (
endpoint: string,
params: Record<string, string | string[]>,
options?: { allowNotFound?: boolean },
) => Promise<unknown>;
}
).apiRequest = apiRequest;
(
provider as unknown as {
callWebhookUrls: Map<string, string>;
}
).callWebhookUrls.set("CA-nostream", "https://example.ngrok.app/voice/twilio");
await expect(
provider.playTts({
callId: "call-nostream",
providerCallId: "CA-nostream",
text: "Hello TwiML",
}),
).resolves.toBeUndefined();
expect(apiRequest).toHaveBeenCalledTimes(1);
const call = apiRequest.mock.calls[0]!;
const endpoint = call[0];
const params = call[1] as { Twiml?: string };
expect(endpoint).toBe("/Calls/CA-nostream.json");
expect(params.Twiml).toContain("<Say");
});
it("ignores stale stream unregister requests that do not match current stream SID", () => {
const provider = createProvider();
provider.registerCallStream("CA-reconnect", "MZ-new");
provider.unregisterCallStream("CA-reconnect", "MZ-old");
expect(provider.hasRegisteredStream("CA-reconnect")).toBe(true);
provider.unregisterCallStream("CA-reconnect", "MZ-new");
expect(provider.hasRegisteredStream("CA-reconnect")).toBe(false);
});
it("times out telephony synthesis in stream mode and does not send completion mark", async () => {
vi.useFakeTimers();
try {
const provider = createProvider();
provider.registerCallStream("CA-timeout", "MZ-timeout");
const sendAudio = vi.fn();
const sendMark = vi.fn();
const mediaStreamHandler = {
queueTts: async (
_streamSid: string,
playFn: (signal: AbortSignal) => Promise<void>,
): Promise<void> => {
await playFn(new AbortController().signal);
},
sendAudio,
sendMark,
};
provider.setMediaStreamHandler(mediaStreamHandler as never);
provider.setTTSProvider({
synthesizeForTelephony: async () => await new Promise<Buffer>(() => {}),
});
const playExpectation = expect(
provider.playTts({
callId: "call-timeout",
providerCallId: "CA-timeout",
text: "Timeout me",
}),
).rejects.toThrow("Telephony TTS synthesis timed out");
await vi.advanceTimersByTimeAsync(8_100);
await playExpectation;
expect(sendAudio).toHaveBeenCalled();
expect(sendMark).not.toHaveBeenCalled();
} finally {
vi.useRealTimers();
}
});
it("fails stream playback when all audio sends and completion mark are dropped", async () => {
const provider = createProvider();
provider.registerCallStream("CA-dropped", "MZ-dropped");
const sendAudio = vi.fn(() => ({ sent: false }));
const sendMark = vi.fn(() => ({ sent: false }));
const mediaStreamHandler = {
queueTts: async (
_streamSid: string,
playFn: (signal: AbortSignal) => Promise<void>,
): Promise<void> => {
await playFn(new AbortController().signal);
},
sendAudio,
sendMark,
};
provider.setMediaStreamHandler(mediaStreamHandler as never);
provider.setTTSProvider({
synthesizeForTelephony: async () => Buffer.alloc(320),
});
await expect(
provider.playTts({
callId: "call-dropped",
providerCallId: "CA-dropped",
text: "Dropped audio",
}),
).rejects.toThrow("Telephony stream playback failed");
expect(sendAudio).toHaveBeenCalled();
expect(sendMark).toHaveBeenCalledTimes(1);
});
});
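The synthesis-timeout behavior exercised above follows a standard race-with-cleanup pattern. A minimal sketch, assuming nothing beyond standard timers (`withTimeout` is an illustrative name, not an export of this codebase):

```typescript
// Race an async operation against a timeout, always clearing the timer
// afterwards so a completed operation does not leave a dangling timeout.
async function withTimeout<T>(op: Promise<T>, ms: number, label: string): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | null = null;
  const timeout = new Promise<T>((_, reject) => {
    timer = setTimeout(() => reject(new Error(`${label} timed out after ${ms}ms`)), ms);
  });
  try {
    return await Promise.race([op, timeout]);
  } finally {
    if (timer) {
      clearTimeout(timer);
    }
  }
}
```

Clearing the timer in `finally` matters in long-lived processes; the provider code below does the same with `synthTimeout` before stopping the keep-alive interval.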

View File

@ -31,6 +31,10 @@ import { twilioApiRequest } from "./twilio/api.js";
import { decideTwimlResponse, readTwimlRequestView } from "./twilio/twiml-policy.js";
import { verifyTwilioProviderWebhook } from "./twilio/webhook.js";
type StreamSendResult = {
sent: boolean;
};
function createTwilioRequestDedupeKey(ctx: WebhookContext, verifiedRequestKey?: string): string {
if (verifiedRequestKey) {
return verifiedRequestKey;
@ -76,6 +80,7 @@ export interface TwilioProviderOptions {
export class TwilioProvider implements VoiceCallProvider {
readonly name = "twilio" as const;
private static readonly TTS_SYNTH_TIMEOUT_MS = 8000;
private readonly accountSid: string;
private readonly authToken: string;
@ -172,11 +177,29 @@ export class TwilioProvider implements VoiceCallProvider {
this.callStreamMap.set(callSid, streamSid);
}
hasRegisteredStream(callSid: string): boolean {
return this.callStreamMap.has(callSid);
}
unregisterCallStream(callSid: string, streamSid?: string): void {
const currentStreamSid = this.callStreamMap.get(callSid);
if (!currentStreamSid) {
if (!streamSid) {
this.activeStreamCalls.delete(callSid);
}
return;
}
if (streamSid && currentStreamSid !== streamSid) {
return;
}
this.callStreamMap.delete(callSid);
this.activeStreamCalls.delete(callSid);
}
isConversationStreamConnectEnabled(): boolean {
return Boolean(this.mediaStreamHandler && this.getStreamUrl());
}
isValidStreamToken(callSid: string, token?: string): boolean {
const expected = this.streamAuthTokens.get(callSid);
if (!expected || !token) {
@ -194,11 +217,12 @@ export class TwilioProvider implements VoiceCallProvider {
* Clear TTS queue for a call (barge-in).
* Used when user starts speaking to interrupt current TTS playback.
*/
clearTtsQueue(callSid: string, reason = "unspecified"): void {
const streamSid = this.callStreamMap.get(callSid);
if (!streamSid || !this.mediaStreamHandler) {
return;
}
this.mediaStreamHandler.clearTtsQueue(streamSid, reason);
}
/**
@ -550,28 +574,32 @@ export class TwilioProvider implements VoiceCallProvider {
* Play TTS audio via Twilio.
*
* Two modes:
* 1. Core TTS + Media Streams: when an active stream exists, stream playback is required.
* If telephony TTS is unavailable in that state, playback fails rather than mixing paths.
* 2. TwiML <Say>: fallback only when there is no active stream for the call.
*/
async playTts(input: PlayTtsInput): Promise<void> {
// Stream playback is required whenever an active media stream exists.
const streamSid = this.callStreamMap.get(input.providerCallId);
if (streamSid) {
if (!this.ttsProvider || !this.mediaStreamHandler) {
throw new Error(
"Telephony TTS unavailable while media stream is active; refusing TwiML fallback",
);
}
try {
await this.playTtsViaStream(input.text, streamSid);
return;
} catch (err) {
console.warn(
`[voice-call] Telephony TTS failed:`,
err instanceof Error ? err.message : err,
);
throw err instanceof Error ? err : new Error(String(err));
}
}
// Fall back to TwiML <Say> only when no active stream exists.
const webhookUrl = this.callWebhookUrls.get(input.providerCallId);
if (!webhookUrl) {
throw new Error("Missing webhook URL for this call (provider state not initialized)");
@ -608,28 +636,111 @@ export class TwilioProvider implements VoiceCallProvider {
// Stream audio in 20ms chunks (160 bytes at 8kHz mu-law)
const CHUNK_SIZE = 160;
const CHUNK_DELAY_MS = 20;
const SILENCE_CHUNK = Buffer.alloc(CHUNK_SIZE, 0xff);
const handler = this.mediaStreamHandler;
const ttsProvider = this.ttsProvider;
const normalizeSendResult = (raw: unknown): StreamSendResult => {
if (!raw || typeof raw !== "object") {
return { sent: true };
}
const typed = raw as {
sent?: unknown;
};
return {
sent: typed.sent === undefined ? true : Boolean(typed.sent),
};
};
const sendAudioChunk = (audio: Buffer): StreamSendResult => {
const raw = (handler as { sendAudio: (sid: string, chunk: Buffer) => unknown }).sendAudio(
streamSid,
audio,
);
return normalizeSendResult(raw);
};
const sendPlaybackMark = (name: string): StreamSendResult => {
const raw = (handler as { sendMark: (sid: string, markName: string) => unknown }).sendMark(
streamSid,
name,
);
return normalizeSendResult(raw);
};
await handler.queueTts(streamSid, async (signal) => {
const sendKeepAlive = () => {
sendAudioChunk(SILENCE_CHUNK);
};
sendKeepAlive();
const keepAlive = setInterval(() => {
if (!signal.aborted) {
sendKeepAlive();
}
}, CHUNK_DELAY_MS);
// Generate audio with core TTS (returns mu-law at 8kHz)
let muLawAudio: Buffer;
let synthTimeout: ReturnType<typeof setTimeout> | null = null;
try {
const synthPromise = ttsProvider.synthesizeForTelephony(text);
const timeoutPromise = new Promise<Buffer>((_, reject) => {
synthTimeout = setTimeout(() => {
reject(
new Error(
`Telephony TTS synthesis timed out after ${TwilioProvider.TTS_SYNTH_TIMEOUT_MS}ms`,
),
);
}, TwilioProvider.TTS_SYNTH_TIMEOUT_MS);
});
muLawAudio = await Promise.race([synthPromise, timeoutPromise]);
} finally {
if (synthTimeout) {
clearTimeout(synthTimeout);
}
clearInterval(keepAlive);
}
let chunkAttempts = 0;
let chunkDelivered = 0;
let nextChunkDueAt = Date.now() + CHUNK_DELAY_MS;
for (const chunk of chunkAudio(muLawAudio, CHUNK_SIZE)) {
if (signal.aborted) {
break;
}
chunkAttempts += 1;
const chunkResult = sendAudioChunk(chunk);
if (chunkResult.sent) {
chunkDelivered += 1;
}
// Drift-corrected pacing: schedule against an absolute clock to avoid cumulative delay.
const waitMs = nextChunkDueAt - Date.now();
if (waitMs > 0) {
await new Promise((resolve) => setTimeout(resolve, Math.ceil(waitMs)));
}
nextChunkDueAt += CHUNK_DELAY_MS;
if (signal.aborted) {
break;
}
}
let markSent = true;
if (!signal.aborted) {
// Send a mark to track when audio finishes
markSent = sendPlaybackMark(`tts-${Date.now()}`).sent;
}
if (!signal.aborted && chunkAttempts > 0 && (chunkDelivered === 0 || !markSent)) {
const failures: string[] = [];
if (chunkDelivered === 0) {
failures.push("no audio chunks delivered");
}
if (!markSent) {
failures.push("completion mark not delivered");
}
throw new Error(`Telephony stream playback failed: ${failures.join("; ")}`);
}
});
}
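The drift-corrected pacing loop above can be isolated into a small sketch. `paceChunks` and its callback are hypothetical names; the point is that each iteration targets an absolute deadline, so timer overshoot in one iteration is not added to every later chunk:

```typescript
const CHUNK_DELAY_MS = 20;

// Pace items at a fixed cadence against an absolute clock. Each wait is
// computed from a running deadline, so per-iteration setTimeout overshoot
// does not accumulate into cumulative playback delay.
async function paceChunks<T>(
  chunks: T[],
  sendChunk: (chunk: T) => void,
): Promise<void> {
  let nextDueAt = Date.now() + CHUNK_DELAY_MS;
  for (const chunk of chunks) {
    sendChunk(chunk);
    const waitMs = nextDueAt - Date.now();
    if (waitMs > 0) {
      await new Promise((resolve) => setTimeout(resolve, waitMs));
    }
    // Advance by a fixed step from the previous deadline, not from "now".
    nextDueAt += CHUNK_DELAY_MS;
  }
}
```

With a naive fixed `setTimeout(CHUNK_DELAY_MS)` per chunk, each timer's few milliseconds of overshoot compound across hundreds of 20ms frames, and playback gradually falls behind real time.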

View File

@ -0,0 +1,111 @@
import { describe, expect, it, vi } from "vitest";
import { VoiceCallConfigSchema } from "./config.js";
import type { CoreAgentDeps, CoreConfig } from "./core-bridge.js";
import { generateVoiceResponse } from "./response-generator.js";
function createAgentRuntime(payloads: Array<Record<string, unknown>>) {
const runEmbeddedPiAgent = vi.fn(async () => ({
payloads,
meta: { durationMs: 12, aborted: false },
}));
const runtime = {
defaults: {
provider: "together",
model: "Qwen/Qwen2.5-7B-Instruct-Turbo",
},
resolveAgentDir: () => "/tmp/openclaw/agents/main",
resolveAgentWorkspaceDir: () => "/tmp/openclaw/workspace/main",
resolveAgentIdentity: () => ({ name: "tester" }),
resolveThinkingDefault: () => "off",
resolveAgentTimeoutMs: () => 30_000,
ensureAgentWorkspace: async () => {},
runEmbeddedPiAgent,
session: {
resolveStorePath: () => "/tmp/openclaw/sessions.json",
loadSessionStore: () => ({}),
saveSessionStore: async () => {},
resolveSessionFilePath: () => "/tmp/openclaw/sessions/session.jsonl",
},
} as unknown as CoreAgentDeps;
return { runtime, runEmbeddedPiAgent };
}
async function runGenerateVoiceResponse(
payloads: Array<Record<string, unknown>>,
overrides?: {
runtime?: CoreAgentDeps;
transcript?: Array<{ speaker: "user" | "bot"; text: string }>;
},
) {
const voiceConfig = VoiceCallConfigSchema.parse({
responseTimeoutMs: 5000,
});
const coreConfig = {} as CoreConfig;
const runtime = overrides?.runtime ?? createAgentRuntime(payloads).runtime;
const result = await generateVoiceResponse({
voiceConfig,
coreConfig,
agentRuntime: runtime,
callId: "call-123",
from: "+15550001111",
transcript: overrides?.transcript ?? [{ speaker: "user", text: "hello there" }],
userMessage: "hello there",
});
return { result };
}
describe("generateVoiceResponse", () => {
it("suppresses reasoning payloads and reads structured spoken output", async () => {
const { runtime, runEmbeddedPiAgent } = createAgentRuntime([
{ text: "Reasoning: hidden", isReasoning: true },
{ text: '{"spoken":"Hello from JSON."}' },
]);
const { result } = await runGenerateVoiceResponse([], { runtime });
expect(result.text).toBe("Hello from JSON.");
expect(runEmbeddedPiAgent).toHaveBeenCalledTimes(1);
const calls = runEmbeddedPiAgent.mock.calls as unknown[][];
const firstCall = calls[0];
expect(firstCall).toBeDefined();
const args = firstCall?.[0] as { extraSystemPrompt?: string } | undefined;
expect(args?.extraSystemPrompt).toContain('{"spoken":"..."}');
});
it("extracts spoken text from fenced JSON", async () => {
const { result } = await runGenerateVoiceResponse([
{ text: '```json\n{"spoken":"Fenced JSON works."}\n```' },
]);
expect(result.text).toBe("Fenced JSON works.");
});
it("returns silence for an explicit empty spoken contract response", async () => {
const { result } = await runGenerateVoiceResponse([{ text: '{"spoken":""}' }]);
expect(result.text).toBeNull();
});
it("strips leading planning text when model returns plain text", async () => {
const { result } = await runGenerateVoiceResponse([
{
text:
"The user responded with short text. I should keep the response concise.\n\n" +
"Sounds good. I can help with the next step whenever you are ready.",
},
]);
expect(result.text).toBe("Sounds good. I can help with the next step whenever you are ready.");
});
it("keeps plain conversational output when no JSON contract is followed", async () => {
const { result } = await runGenerateVoiceResponse([
{ text: "Absolutely. Tell me what you want to do next." },
]);
expect(result.text).toBe("Absolutely. Tell me what you want to do next.");
});
});

View File

@ -30,6 +30,145 @@ export type VoiceResponseResult = {
error?: string;
};
type VoiceResponsePayload = {
text?: string;
isError?: boolean;
isReasoning?: boolean;
};
const VOICE_SPOKEN_OUTPUT_CONTRACT = [
"Output format requirements:",
'- Return only valid JSON in this exact shape: {"spoken":"..."}',
"- Do not include markdown, code fences, planning text, or extra keys.",
'- Put exactly what should be spoken to the caller into "spoken".',
'- If there is nothing to say, return {"spoken":""}.',
].join("\n");
function normalizeSpokenText(value: string): string | null {
const normalized = value.replace(/\s+/g, " ").trim();
return normalized.length > 0 ? normalized : null;
}
function tryParseSpokenJson(text: string): string | null {
const candidates: string[] = [];
const trimmed = text.trim();
if (!trimmed) {
return null;
}
candidates.push(trimmed);
const fenced = trimmed.match(/^```(?:json)?\s*([\s\S]*?)\s*```$/i);
if (fenced?.[1]) {
candidates.push(fenced[1]);
}
const firstBrace = trimmed.indexOf("{");
const lastBrace = trimmed.lastIndexOf("}");
if (firstBrace >= 0 && lastBrace > firstBrace) {
candidates.push(trimmed.slice(firstBrace, lastBrace + 1));
}
for (const candidate of candidates) {
try {
const parsed = JSON.parse(candidate) as { spoken?: unknown };
if (typeof parsed?.spoken !== "string") {
continue;
}
return normalizeSpokenText(parsed.spoken) ?? "";
} catch {
// Continue trying other candidates.
}
}
const inlineSpokenMatch = trimmed.match(/"spoken"\s*:\s*"((?:[^"\\]|\\.)*)"/i);
if (!inlineSpokenMatch) {
return null;
}
try {
const decoded = JSON.parse(`"${inlineSpokenMatch[1] ?? ""}"`) as string;
return normalizeSpokenText(decoded) ?? "";
} catch {
return null;
}
}
function isLikelyMetaReasoningParagraph(paragraph: string): boolean {
const lower = paragraph.toLowerCase();
if (!lower) {
return false;
}
if (lower.startsWith("thinking process")) {
return true;
}
if (lower.startsWith("reasoning:") || lower.startsWith("analysis:")) {
return true;
}
if (
lower.startsWith("the user ") &&
(lower.includes("i should") || lower.includes("i need to") || lower.includes("i will"))
) {
return true;
}
if (
lower.includes("this is a natural continuation of the conversation") ||
lower.includes("keep the conversation flowing")
) {
return true;
}
return false;
}
function sanitizePlainSpokenText(text: string): string | null {
const withoutCodeFences = text.replace(/```[\s\S]*?```/g, " ").trim();
if (!withoutCodeFences) {
return null;
}
const paragraphs = withoutCodeFences
.split(/\n\s*\n+/)
.map((paragraph) => paragraph.trim())
.filter(Boolean);
while (paragraphs.length > 1 && isLikelyMetaReasoningParagraph(paragraphs[0])) {
paragraphs.shift();
}
return normalizeSpokenText(paragraphs.join(" "));
}
function extractSpokenTextFromPayloads(payloads: VoiceResponsePayload[]): string | null {
const spokenSegments: string[] = [];
for (const payload of payloads) {
if (payload.isError || payload.isReasoning) {
continue;
}
const rawText = payload.text?.trim() ?? "";
if (!rawText) {
continue;
}
const structured = tryParseSpokenJson(rawText);
if (structured !== null) {
if (structured.length > 0) {
spokenSegments.push(structured);
}
continue;
}
const plain = sanitizePlainSpokenText(rawText);
if (plain) {
spokenSegments.push(plain);
}
}
return spokenSegments.length > 0 ? spokenSegments.join(" ").trim() : null;
}
/**
* Generate a voice response using the embedded Pi agent with full tool support.
* Uses the same agent infrastructure as messaging for consistent behavior.
@ -103,6 +242,7 @@ export async function generateVoiceResponse(
.join("\n");
extraSystemPrompt = `${basePrompt}\n\nConversation so far:\n${history}`;
}
extraSystemPrompt = `${extraSystemPrompt}\n\n${VOICE_SPOKEN_OUTPUT_CONTRACT}`;
// Resolve timeout
const timeoutMs = voiceConfig.responseTimeoutMs ?? agentRuntime.resolveAgentTimeoutMs({ cfg });
@ -128,13 +268,7 @@ export async function generateVoiceResponse(
agentDir,
});
// Extract text from payloads
const texts = (result.payloads ?? [])
.filter((p) => p.text && !p.isError)
.map((p) => p.text?.trim())
.filter(Boolean);
const text = texts.join(" ") || null;
const text = extractSpokenTextFromPayloads((result.payloads ?? []) as VoiceResponsePayload[]);
if (!text && result.meta?.aborted) {
return { text: null, error: "Response generation was aborted" };
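The extraction logic can be summarized in a compact sketch, a simplified stand-in for `tryParseSpokenJson` that keeps the candidate order (raw text, fenced body, outermost brace span) but omits the inline-regex fallback and whitespace normalization:

```typescript
// Try progressively looser views of the model output until one parses as
// JSON with a string "spoken" field. Returns null when no candidate parses.
function extractSpoken(text: string): string | null {
  const trimmed = text.trim();
  const candidates = [trimmed];
  const fenced = trimmed.match(/^```(?:json)?\s*([\s\S]*?)\s*```$/i);
  if (fenced?.[1]) {
    candidates.push(fenced[1]);
  }
  const open = trimmed.indexOf("{");
  const close = trimmed.lastIndexOf("}");
  if (open >= 0 && close > open) {
    candidates.push(trimmed.slice(open, close + 1));
  }
  for (const candidate of candidates) {
    try {
      const parsed = JSON.parse(candidate) as { spoken?: unknown };
      if (typeof parsed.spoken === "string") {
        return parsed.spoken.trim();
      }
    } catch {
      // Not valid JSON; try the next candidate.
    }
  }
  return null;
}
```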

View File

@ -0,0 +1,61 @@
import { describe, expect, it } from "vitest";
import { convertPcmToMulaw8k, resamplePcmTo8k } from "./telephony-audio.js";
function makeSinePcm(
sampleRate: number,
frequencyHz: number,
durationSeconds: number,
amplitude = 12_000,
): Buffer {
const samples = Math.floor(sampleRate * durationSeconds);
const output = Buffer.alloc(samples * 2);
for (let i = 0; i < samples; i++) {
const value = Math.round(Math.sin((2 * Math.PI * frequencyHz * i) / sampleRate) * amplitude);
output.writeInt16LE(value, i * 2);
}
return output;
}
function rmsPcm(buffer: Buffer): number {
const samples = Math.floor(buffer.length / 2);
if (samples === 0) {
return 0;
}
let sum = 0;
for (let i = 0; i < samples; i++) {
const sample = buffer.readInt16LE(i * 2);
sum += sample * sample;
}
return Math.sqrt(sum / samples);
}
describe("telephony-audio resamplePcmTo8k", () => {
it("returns identical buffer for 8k input", () => {
const pcm8k = makeSinePcm(8_000, 1_000, 0.2);
const resampled = resamplePcmTo8k(pcm8k, 8_000);
expect(resampled).toBe(pcm8k);
});
it("preserves low-frequency speech-band energy when downsampling", () => {
const input = makeSinePcm(48_000, 1_000, 0.6);
const output = resamplePcmTo8k(input, 48_000);
expect(output.length).toBe(9_600);
expect(rmsPcm(output)).toBeGreaterThan(7_500);
});
it("attenuates out-of-band high frequencies before 8k telephony conversion", () => {
const lowTone = resamplePcmTo8k(makeSinePcm(48_000, 1_000, 0.6), 48_000);
const highTone = resamplePcmTo8k(makeSinePcm(48_000, 6_000, 0.6), 48_000);
const ratio = rmsPcm(highTone) / rmsPcm(lowTone);
expect(ratio).toBeLessThan(0.1);
});
});
describe("telephony-audio convertPcmToMulaw8k", () => {
it("converts to 8k mu-law frame length", () => {
const input = makeSinePcm(24_000, 1_000, 0.5);
const mulaw = convertPcmToMulaw8k(input, 24_000);
// 0.5s @ 8kHz => 4000 8-bit samples
expect(mulaw.length).toBe(4_000);
});
});

View File

@ -1,11 +1,58 @@
const TELEPHONY_SAMPLE_RATE = 8000;
const RESAMPLE_FILTER_TAPS = 31;
const RESAMPLE_CUTOFF_GUARD = 0.94;
function clamp16(value: number): number {
return Math.max(-32768, Math.min(32767, value));
}
function sinc(x: number): number {
if (x === 0) {
return 1;
}
return Math.sin(Math.PI * x) / (Math.PI * x);
}
/**
* Build a finite low-pass kernel centered on `srcPos`.
* The kernel is windowed (Hann) to reduce ringing artifacts.
*/
function sampleBandlimited(
input: Buffer,
inputSamples: number,
srcPos: number,
cutoffCyclesPerSample: number,
): number {
const half = Math.floor(RESAMPLE_FILTER_TAPS / 2);
const center = Math.floor(srcPos);
let weighted = 0;
let weightSum = 0;
for (let tap = -half; tap <= half; tap++) {
const sampleIndex = center + tap;
if (sampleIndex < 0 || sampleIndex >= inputSamples) {
continue;
}
const distance = sampleIndex - srcPos;
const lowPass = 2 * cutoffCyclesPerSample * sinc(2 * cutoffCyclesPerSample * distance);
const tapIndex = tap + half;
const window = 0.5 - 0.5 * Math.cos((2 * Math.PI * tapIndex) / (RESAMPLE_FILTER_TAPS - 1));
const coeff = lowPass * window;
weighted += input.readInt16LE(sampleIndex * 2) * coeff;
weightSum += coeff;
}
if (weightSum === 0) {
const nearest = Math.max(0, Math.min(inputSamples - 1, Math.round(srcPos)));
return input.readInt16LE(nearest * 2);
}
return weighted / weightSum;
}
/**
* Resample 16-bit PCM (little-endian mono) to 8kHz using a windowed low-pass kernel.
*/
export function resamplePcmTo8k(input: Buffer, inputSampleRate: number): Buffer {
if (inputSampleRate === TELEPHONY_SAMPLE_RATE) {
@ -19,17 +66,15 @@ export function resamplePcmTo8k(input: Buffer, inputSampleRate: number): Buffer
const ratio = inputSampleRate / TELEPHONY_SAMPLE_RATE;
const outputSamples = Math.floor(inputSamples / ratio);
const output = Buffer.alloc(outputSamples * 2);
const maxCutoff = 0.5;
const downsampleCutoff = ratio > 1 ? maxCutoff / ratio : maxCutoff;
const cutoffCyclesPerSample = Math.max(0.01, downsampleCutoff * RESAMPLE_CUTOFF_GUARD);
for (let i = 0; i < outputSamples; i++) {
const srcPos = i * ratio;
const sample = Math.round(
sampleBandlimited(input, inputSamples, srcPos, cutoffCyclesPerSample),
);
output.writeInt16LE(clamp16(sample), i * 2);
}
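The coefficient construction inside `sampleBandlimited` can be sketched standalone for an integer-aligned source position (`kernel` is an illustrative helper, not part of the module). Each tap is an ideal low-pass sinc scaled by a Hann window:

```typescript
const TAPS = 31;

function sincFn(x: number): number {
  return x === 0 ? 1 : Math.sin(Math.PI * x) / (Math.PI * x);
}

// Hann-windowed sinc coefficients for a source position aligned to an
// integer sample, so each tap's distance is just its offset from center.
function kernel(cutoffCyclesPerSample: number): number[] {
  const half = Math.floor(TAPS / 2);
  const coeffs: number[] = [];
  for (let tap = -half; tap <= half; tap++) {
    const lowPass = 2 * cutoffCyclesPerSample * sincFn(2 * cutoffCyclesPerSample * tap);
    const window = 0.5 - 0.5 * Math.cos((2 * Math.PI * (tap + half)) / (TAPS - 1));
    coeffs.push(lowPass * window);
  }
  return coeffs;
}
```

Dividing by the coefficient sum, as the `weighted / weightSum` step above does, keeps DC gain at unity even when the kernel is truncated at the buffer edges.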

View File

@ -2,7 +2,7 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { VoiceCallConfigSchema, type VoiceCallConfig } from "./config.js";
import type { CallManager } from "./manager.js";
import type { VoiceCallProvider } from "./providers/base.js";
import type { CallRecord, NormalizedEvent } from "./types.js";
import { VoiceCallWebhookServer } from "./webhook.js";
const provider: VoiceCallProvider = {
@ -350,3 +350,246 @@ describe("VoiceCallWebhookServer start idempotency", () => {
await server.stop();
});
});
describe("VoiceCallWebhookServer stream disconnect grace", () => {
beforeEach(() => {
vi.useFakeTimers();
});
afterEach(() => {
vi.useRealTimers();
});
it("ignores stale stream disconnects after reconnect and only hangs up on current stream disconnect", async () => {
const call = createCall(Date.now() - 1_000);
call.providerCallId = "CA-stream-1";
const endCall = vi.fn(async () => ({ success: true }));
const speakInitialMessage = vi.fn(async () => {});
const getCallByProviderCallId = vi.fn((providerCallId: string) =>
providerCallId === "CA-stream-1" ? call : undefined,
);
const manager = {
getActiveCalls: () => [call],
getCallByProviderCallId,
endCall,
speakInitialMessage,
processEvent: vi.fn(),
} as unknown as CallManager;
let currentStreamSid: string | null = "MZ-new";
const twilioProvider = {
name: "twilio" as const,
verifyWebhook: () => ({ ok: true, verifiedRequestKey: "twilio:req:test" }),
parseWebhookEvent: () => ({ events: [] }),
initiateCall: async () => ({ providerCallId: "provider-call", status: "initiated" as const }),
hangupCall: async () => {},
playTts: async () => {},
startListening: async () => {},
stopListening: async () => {},
getCallStatus: async () => ({ status: "in-progress", isTerminal: false }),
isValidStreamToken: () => true,
registerCallStream: (_callSid: string, streamSid: string) => {
currentStreamSid = streamSid;
},
unregisterCallStream: (_callSid: string, streamSid?: string) => {
if (!currentStreamSid) {
return;
}
if (streamSid && currentStreamSid !== streamSid) {
return;
}
currentStreamSid = null;
},
hasRegisteredStream: () => currentStreamSid !== null,
clearTtsQueue: () => {},
};
const config = createConfig({
provider: "twilio",
streaming: {
...createConfig().streaming,
enabled: true,
openaiApiKey: "test-key",
},
});
const server = new VoiceCallWebhookServer(
config,
manager,
twilioProvider as unknown as VoiceCallProvider,
);
const mediaHandler = server.getMediaStreamHandler() as unknown as {
config: {
onDisconnect?: (providerCallId: string, streamSid: string) => void;
onConnect?: (providerCallId: string, streamSid: string) => void;
};
};
expect(mediaHandler).toBeTruthy();
mediaHandler.config.onConnect?.("CA-stream-1", "MZ-new");
mediaHandler.config.onDisconnect?.("CA-stream-1", "MZ-old");
await vi.advanceTimersByTimeAsync(2_100);
expect(endCall).not.toHaveBeenCalled();
mediaHandler.config.onDisconnect?.("CA-stream-1", "MZ-new");
await vi.advanceTimersByTimeAsync(2_100);
expect(endCall).toHaveBeenCalledTimes(1);
expect(endCall).toHaveBeenCalledWith(call.callId);
await server.stop();
});
});
describe("VoiceCallWebhookServer barge-in suppression during initial message", () => {
const createTwilioProvider = (clearTtsQueue: ReturnType<typeof vi.fn>) => ({
name: "twilio" as const,
verifyWebhook: () => ({ ok: true, verifiedRequestKey: "twilio:req:test" }),
parseWebhookEvent: () => ({ events: [] }),
initiateCall: async () => ({ providerCallId: "provider-call", status: "initiated" as const }),
hangupCall: async () => {},
playTts: async () => {},
startListening: async () => {},
stopListening: async () => {},
getCallStatus: async () => ({ status: "in-progress", isTerminal: false }),
isValidStreamToken: () => true,
registerCallStream: () => {},
unregisterCallStream: () => {},
hasRegisteredStream: () => true,
clearTtsQueue,
});
const getMediaCallbacks = (server: VoiceCallWebhookServer) =>
server.getMediaStreamHandler() as unknown as {
config: {
onSpeechStart?: (providerCallId: string) => void;
onTranscript?: (providerCallId: string, transcript: string) => void;
};
};
it("suppresses barge-in clear while outbound conversation initial message is pending", async () => {
const call = createCall(Date.now() - 1_000);
call.callId = "call-barge";
call.providerCallId = "CA-barge";
call.direction = "outbound";
call.state = "speaking";
call.metadata = {
mode: "conversation",
initialMessage: "Hi, this is OpenClaw.",
};
const clearTtsQueue = vi.fn();
const processEvent = vi.fn((event: NormalizedEvent) => {
if (event.type === "call.speech") {
// Mirrors manager behavior: call.speech transitions to listening.
call.state = "listening";
}
});
const manager = {
getActiveCalls: () => [call],
getCallByProviderCallId: (providerCallId: string) =>
providerCallId === call.providerCallId ? call : undefined,
getCall: (callId: string) => (callId === call.callId ? call : undefined),
endCall: vi.fn(async () => ({ success: true })),
speakInitialMessage: vi.fn(async () => {}),
processEvent,
} as unknown as CallManager;
const config = createConfig({
provider: "twilio",
streaming: {
...createConfig().streaming,
enabled: true,
openaiApiKey: "test-key",
},
});
const server = new VoiceCallWebhookServer(
config,
manager,
createTwilioProvider(clearTtsQueue) as unknown as VoiceCallProvider,
);
const handleInboundResponse = vi.fn(async () => {});
(
server as unknown as {
handleInboundResponse: (
callId: string,
transcript: string,
timing?: unknown,
) => Promise<void>;
}
).handleInboundResponse = handleInboundResponse;
try {
const media = getMediaCallbacks(server);
media.config.onSpeechStart?.("CA-barge");
media.config.onTranscript?.("CA-barge", "hello");
media.config.onSpeechStart?.("CA-barge");
media.config.onTranscript?.("CA-barge", "hello again");
expect(clearTtsQueue).not.toHaveBeenCalled();
expect(handleInboundResponse).not.toHaveBeenCalled();
expect(processEvent).not.toHaveBeenCalled();
if (call.metadata) {
delete call.metadata.initialMessage;
}
call.state = "listening";
media.config.onSpeechStart?.("CA-barge");
media.config.onTranscript?.("CA-barge", "hello after greeting");
expect(clearTtsQueue).toHaveBeenCalledTimes(2);
expect(handleInboundResponse).toHaveBeenCalledTimes(1);
expect(processEvent).toHaveBeenCalledTimes(1);
const [calledCallId, calledTranscript] = (handleInboundResponse.mock.calls[0] ??
[]) as unknown as [string | undefined, string | undefined];
expect(calledCallId).toBe(call.callId);
expect(calledTranscript).toBe("hello after greeting");
} finally {
await server.stop();
}
});
it("keeps barge-in clear enabled for inbound calls", async () => {
const call = createCall(Date.now() - 1_000);
call.callId = "call-inbound";
call.providerCallId = "CA-inbound";
call.direction = "inbound";
call.metadata = {
initialMessage: "Hello from inbound greeting.",
};
const clearTtsQueue = vi.fn();
const manager = {
getActiveCalls: () => [call],
getCallByProviderCallId: (providerCallId: string) =>
providerCallId === call.providerCallId ? call : undefined,
getCall: (callId: string) => (callId === call.callId ? call : undefined),
endCall: vi.fn(async () => ({ success: true })),
speakInitialMessage: vi.fn(async () => {}),
processEvent: vi.fn(),
} as unknown as CallManager;
const config = createConfig({
provider: "twilio",
streaming: {
...createConfig().streaming,
enabled: true,
openaiApiKey: "test-key",
},
});
const server = new VoiceCallWebhookServer(
config,
manager,
createTwilioProvider(clearTtsQueue) as unknown as VoiceCallProvider,
);
try {
const media = getMediaCallbacks(server);
media.config.onSpeechStart?.("CA-inbound");
media.config.onTranscript?.("CA-inbound", "hello");
expect(clearTtsQueue).toHaveBeenCalledTimes(2);
} finally {
await server.stop();
}
});
});
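The rule these tests pin down reduces to a small predicate. This sketch mirrors `shouldSuppressBargeInForInitialMessage` over a simplified `CallLike` shape (illustrative, not the real `CallRecord` type):

```typescript
type CallLike = {
  direction: "inbound" | "outbound";
  state: string;
  metadata?: { mode?: string; initialMessage?: string };
};

// Suppress barge-in only for outbound conversation calls that are still
// actively speaking a non-empty initial greeting; everything else (inbound
// calls, other modes, calls already listening) keeps normal barge-in.
function suppressBargeIn(call: CallLike | undefined): boolean {
  if (!call || call.direction !== "outbound") return false;
  if (call.state !== "speaking") return false;
  const mode = call.metadata?.mode ?? "conversation";
  if (mode !== "conversation") return false;
  return (call.metadata?.initialMessage ?? "").trim().length > 0;
}
```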

View File

@ -13,10 +13,23 @@ import { MediaStreamHandler } from "./media-stream.js";
import type { VoiceCallProvider } from "./providers/base.js";
import { OpenAIRealtimeSTTProvider } from "./providers/stt-openai-realtime.js";
import type { TwilioProvider } from "./providers/twilio.js";
import type { CallRecord, NormalizedEvent, WebhookContext } from "./types.js";
import { startStaleCallReaper } from "./webhook/stale-call-reaper.js";
const MAX_WEBHOOK_BODY_BYTES = 1024 * 1024;
const STREAM_DISCONNECT_HANGUP_GRACE_MS = 2000;
const TRANSCRIPT_LOG_MAX_CHARS = 200;
function sanitizeTranscriptForLog(value: string): string {
const sanitized = value
.replace(/[\u0000-\u001f\u007f]/g, " ")
.replace(/\s+/g, " ")
.trim();
if (sanitized.length <= TRANSCRIPT_LOG_MAX_CHARS) {
return sanitized;
}
return `${sanitized.slice(0, TRANSCRIPT_LOG_MAX_CHARS)}...`;
}
type WebhookResponsePayload = {
statusCode: number;
@ -60,6 +73,8 @@ export class VoiceCallWebhookServer {
/** Media stream handler for bidirectional audio (when streaming enabled) */
private mediaStreamHandler: MediaStreamHandler | null = null;
/** Delayed auto-hangup timers keyed by provider call ID after stream disconnect. */
private pendingDisconnectHangups = new Map<string, ReturnType<typeof setTimeout>>();
constructor(
config: VoiceCallConfig,
@ -87,6 +102,36 @@ export class VoiceCallWebhookServer {
return this.mediaStreamHandler;
}
private clearPendingDisconnectHangup(providerCallId: string): void {
const existing = this.pendingDisconnectHangups.get(providerCallId);
if (!existing) {
return;
}
clearTimeout(existing);
this.pendingDisconnectHangups.delete(providerCallId);
}
private shouldSuppressBargeInForInitialMessage(call: CallRecord | undefined): boolean {
if (!call || call.direction !== "outbound") {
return false;
}
// Suppress only while the initial greeting is actively being played.
// If playback fails and the call leaves "speaking", do not block auto-response.
if (call.state !== "speaking") {
return false;
}
const mode = (call.metadata?.mode as string | undefined) ?? "conversation";
if (mode !== "conversation") {
return false;
}
const initialMessage =
typeof call.metadata?.initialMessage === "string" ? call.metadata.initialMessage.trim() : "";
return initialMessage.length > 0;
}
/**
* Initialize media streaming with OpenAI Realtime STT.
*/
@ -127,19 +172,27 @@ export class VoiceCallWebhookServer {
return true;
},
onTranscript: (providerCallId, transcript) => {
const safeTranscript = sanitizeTranscriptForLog(transcript);
console.log(
`[voice-call] Transcript for ${providerCallId}: ${safeTranscript} (chars=${transcript.length})`,
);
const call = this.manager.getCallByProviderCallId(providerCallId);
if (!call) {
console.warn(`[voice-call] No active call found for provider ID: ${providerCallId}`);
return;
}
const suppressBargeIn = this.shouldSuppressBargeInForInitialMessage(call);
if (suppressBargeIn) {
console.log(
`[voice-call] Ignoring barge transcript while initial message is still playing (${providerCallId})`,
);
return;
}
// Clear TTS queue on barge-in (user started speaking, interrupt current playback)
if (this.provider.name === "twilio") {
(this.provider as TwilioProvider).clearTtsQueue(providerCallId);
}
// Create a speech event and process it through the manager
const event: NormalizedEvent = {
@ -163,44 +216,63 @@ export class VoiceCallWebhookServer {
}
},
onSpeechStart: (providerCallId) => {
if (this.provider.name !== "twilio") {
return;
}
const call = this.manager.getCallByProviderCallId(providerCallId);
if (this.shouldSuppressBargeInForInitialMessage(call)) {
return;
}
(this.provider as TwilioProvider).clearTtsQueue(providerCallId);
},
onPartialTranscript: (callId, partial) => {
const safePartial = sanitizeTranscriptForLog(partial);
console.log(`[voice-call] Partial for ${callId}: ${safePartial} (chars=${partial.length})`);
},
onConnect: (callId, streamSid) => {
console.log(`[voice-call] Media stream connected: ${callId} -> ${streamSid}`);
this.clearPendingDisconnectHangup(callId);
// Register stream with provider for TTS routing
if (this.provider.name === "twilio") {
(this.provider as TwilioProvider).registerCallStream(callId, streamSid);
}
// Speak initial message immediately (no delay) to avoid stream timeout
this.manager.speakInitialMessage(callId).catch((err) => {
console.warn(`[voice-call] Failed to speak initial message:`, err);
});
},
onDisconnect: (callId, streamSid) => {
console.log(`[voice-call] Media stream disconnected: ${callId} (${streamSid})`);
if (this.provider.name === "twilio") {
(this.provider as TwilioProvider).unregisterCallStream(callId, streamSid);
}
this.clearPendingDisconnectHangup(callId);
const timer = setTimeout(() => {
this.pendingDisconnectHangups.delete(callId);
const disconnectedCall = this.manager.getCallByProviderCallId(callId);
if (!disconnectedCall) {
return;
}
if (this.provider.name === "twilio") {
const twilio = this.provider as TwilioProvider;
if (twilio.hasRegisteredStream(callId)) {
return;
}
}
console.log(
`[voice-call] Auto-ending call ${disconnectedCall.callId} after stream disconnect grace`,
);
void this.manager.endCall(disconnectedCall.callId).catch((err) => {
console.warn(`[voice-call] Failed to auto-end call ${disconnectedCall.callId}:`, err);
});
}, STREAM_DISCONNECT_HANGUP_GRACE_MS);
timer.unref?.();
this.pendingDisconnectHangups.set(callId, timer);
},
};
@@ -274,6 +346,11 @@ export class VoiceCallWebhookServer {
* Stop the webhook server.
*/
async stop(): Promise<void> {
for (const timer of this.pendingDisconnectHangups.values()) {
clearTimeout(timer);
}
this.pendingDisconnectHangups.clear();
if (this.stopStaleCallReaper) {
this.stopStaleCallReaper();
this.stopStaleCallReaper = null;