Voice AI Engine Development
Overview
This skill guides you through building production-ready voice AI engines with real-time conversation capabilities. Voice AI engines enable natural, bidirectional conversations between users and AI agents through streaming audio processing, speech-to-text transcription, LLM-powered responses, and text-to-speech synthesis.
The core architecture uses an async queue-based worker pipeline where each component runs independently and communicates via asyncio.Queue objects, enabling concurrent processing, interrupt handling, and real-time streaming at every stage.
When to Use This Skill
Use this skill when:
- Building real-time voice conversation systems
- Implementing voice assistants or chatbots
- Creating voice-enabled customer service agents
- Developing voice AI applications with interrupt capabilities
- Integrating multiple transcription, LLM, or TTS providers
- Working with streaming audio processing pipelines
- The user mentions Vocode, voice engines, or conversational AI
Core Architecture Principles
The Worker Pipeline Pattern
Every voice AI engine follows this pipeline:
Audio In → Transcriber → Agent → Synthesizer → Audio Out
(Worker 1) (Worker 2) (Worker 3)
Key Benefits:
- Decoupling: Workers only know about their input/output queues
- Concurrency: All workers run simultaneously via asyncio
- Backpressure: Queues automatically handle rate differences
- Interruptibility: Everything can be stopped mid-stream
Base Worker Pattern
Every worker follows this pattern:
class BaseWorker:
def __init__(self, input_queue, output_queue):
self.input_queue = input_queue # asyncio.Queue to consume from
self.output_queue = output_queue # asyncio.Queue to produce to
self.active = False
def start(self):
"""Start the worker's processing loop"""
self.active = True
asyncio.create_task(self._run_loop())
async def _run_loop(self):
"""Main processing loop - runs forever until terminated"""
while self.active:
item = await self.input_queue.get() # Block until item arrives
await self.process(item) # Process the item
async def process(self, item):
"""Override this - does the actual work"""
raise NotImplementedError
def terminate(self):
"""Stop the worker"""
self.active = False
Component Implementation Guide
1. Transcriber (Audio → Text)
Purpose: Converts incoming audio chunks to text transcriptions
Interface Requirements:
class BaseTranscriber:
def __init__(self, transcriber_config):
self.input_queue = asyncio.Queue() # Audio chunks (bytes)
self.output_queue = asyncio.Queue() # Transcriptions
self.is_muted = False
def send_audio(self, chunk: bytes):
"""Client calls this to send audio"""
if not self.is_muted:
self.input_queue.put_nowait(chunk)
else:
# Send silence instead (prevents echo during bot speech)
self.input_queue.put_nowait(self.create_silent_chunk(len(chunk)))
def mute(self):
"""Called when bot starts speaking (prevents echo)"""
self.is_muted = True
def unmute(self):
"""Called when bot stops speaking"""
self.is_muted = False
Output Format:
class Transcription:
message: str # "Hello, how are you?"
confidence: float # 0.95
is_final: bool # True = complete sentence, False = partial
is_interrupt: bool # Set by TranscriptionsWorker
Supported Providers:
- Deepgram - Fast, accurate, streaming
- AssemblyAI - High accuracy, good for accents
- Azure Speech - Enterprise-grade
- Google Cloud Speech - Multi-language support
Critical Implementation Details:
- Use WebSocket for bidirectional streaming
- Run sender and receiver tasks concurrently with
asyncio.gather() - Mute transcriber when bot speaks to prevent echo/feedback loops
- Handle both final and partial transcriptions
2. Agent (Text → Response)
Purpose: Processes user input and generates conversational responses
Interface Requirements:
class BaseAgent:
def __init__(self, agent_config):
self.input_queue = asyncio.Queue() # TranscriptionAgentInput
self.output_queue = asyncio.Queue() # AgentResponse
self.transcript = None # Conversation history
async def generate_response(self, human_input, is_interrupt, conversation_id):
"""Override this - returns AsyncGenerator of responses"""
raise NotImplementedError
Why Streaming Responses?
- Lower latency: Start speaking as soon as first sentence is ready
- Better interrupts: Can stop mid-response
- Sentence-by-sentence: More natural conversation flow
Supported Providers:
- OpenAI (GPT-4, GPT-3.5) - High quality, fast
- Google Gemini - Multimodal, cost-effective
- Anthropic Claude - Long context, nuanced responses
Critical Implementation Details:
- Maintain conversation history in
Transcriptobject - Stream responses using
AsyncGenerator - IMPORTANT: Buffer entire LLM response before yielding to synthesizer (prevents audio jumping)
- Handle interrupts by canceling current generation task
- Update conversation history with partial messages on interrupt
3. Synthesizer (Text → Audio)
Purpose: Converts agent text responses to speech audio
Interface Requirements:
class BaseSynthesizer:
async def create_speech(self, message: BaseMessage, chunk_size: int) -> SynthesisResult:
"""
Returns a SynthesisResult containing:
- chunk_generator: AsyncGenerator that yields audio chunks
- get_message_up_to: Function to get partial text (for interrupts)
"""
raise NotImplementedError
SynthesisResult Structure:
class SynthesisResult:
chunk_generator: AsyncGenerator[ChunkResult, None]
get_message_up_to: Callable[[float], str] # seconds → partial text
class ChunkResult:
chunk: bytes # Raw PCM audio
is_last_chunk: bool
Supported Providers:
- ElevenLabs - Most natural voices, streaming
- Azure TTS - Enterprise-grade, many languages
- Google Cloud TTS - Cost-effective, good quality
- Amazon Polly - AWS integration
- Play.ht - Voice cloning
Critical Implementation Details:
- Stream audio chunks as they're generated
- Convert audio to LINEAR16 PCM format (16kHz sample rate)
- Implement
get_message_up_to()for interrupt handling - Handle audio format conversion (MP3 → PCM)
4. Output Device (Audio → Client)
Purpose: Sends synthesized audio back to the client
CRITICAL: Rate Limiting for Interrupts
async def send_speech_to_output(self, message, synthesis_result,
stop_event, seconds_per_chunk):
chunk_idx = 0
async for chunk_result in synthesis_result.chunk_generator:
# Check for interrupt
if stop_event.is_set():
logger.debug(f"Interrupted after {chunk_idx} chunks")
message_sent = synthesis_result.get_message_up_to(
chunk_idx * seconds_per_chunk
)
return message_sent, True # cut_off = True
start_time = time.time()
# Send chunk to output device
self.output_device.consume_nonblocking(chunk_result.chunk)
# CRITICAL: Wait for chunk to play before sending next one
# This is what makes interrupts work!
speech_length = seconds_per_chunk
processing_time = time.time() - start_time
await asyncio.sleep(max(speech_length - processing_time, 0))
chunk_idx += 1
return message, False # cut_off = Fals