diff --git a/documentation/python-readme.md b/documentation/python-readme.md index 5f29119..b949320 100644 --- a/documentation/python-readme.md +++ b/documentation/python-readme.md @@ -29,7 +29,7 @@ audio_url = "http://stream.live.vc.bbcmedia.co.uk/bbc_world_service" # Define the WebSocket functions on_open, on_message, on_close, and on_error def on_open(ws): print("WebSocket connection established.") - + # Start audio streaming thread audio_thread = threading.Thread(target=stream_audio, args=(ws,)) audio_thread.daemon = True @@ -367,3 +367,378 @@ print("File saved successfully at:", output_file_path) ``` +## Voice Agent Conversion using Deepgram API + +**Title:** Take audio from an input source and play the resulting agent audio + +**Code Sample:** voice-agent/voice-agent-play-audio/main.py + +**Description:** This Python script uses the Deepgram API to take audio from an input source and play the resulting agent audio on the selected output device. + +### voice-agent/voice-agent-play-audio/main.py + +```python + +import pyaudio +import asyncio +import sys +import os +import json +import inspect +import queue + +import threading +from typing import Optional, Callable, Union + +from websockets.sync.client import ClientConnection as SyncClientConnection + +from websockets.sync.client import connect + +TIMEOUT = 0.050 +FORMAT = pyaudio.paInt16 +CHANNELS = 1 +RATE = 16000 +CHUNK = 8000 + +def main(): + try: + dg_api_key = os.environ.get("DEEPGRAM_API_KEY") + if dg_api_key is None: + print("DEEPGRAM_API_KEY env var not present") + return + + print("\n\n\nPress Enter to stop...\n\n\n") + + _socket = connect( + "wss://agent.deepgram.com/agent", + additional_headers={"Authorization": f"Token {dg_api_key}"}, + ) + + _config_message = { + "type": "SettingsConfiguration", + "audio": { + "input": { + "encoding": "linear16", + "sample_rate": RATE, + }, + "output": { + "encoding": "linear16", + "sample_rate": RATE, + "container": "none", + }, + }, + "agent": { + "listen": {"model": "nova-2"}, + "think": { + "provider": { + "type": "open_ai", # examples are anthropic, open_ai, groq, ollama + }, + "model": "gpt-4o-mini", # examples are claude-3-haiku-20240307, gpt-3.5-turbo, mixtral-8x7b-32768, mistral + "instructions": "You are a helpful AI assistant.", + }, + "speak": {"model": "aura-athena-en"}, + }, + } + + _socket.send(json.dumps(_config_message)) + + speaker = Speaker() + speaker.start(_socket) + + microphone = Microphone(push_callback=_socket.send) + microphone.start() + + input() + + print("Stopping microphone...") + microphone.stop() + + print("Stopping speaker...") + speaker.stop() + + print("Closing socket...") + _socket.close() + _socket = None + + except Exception as e: + print(f"main: {e}") + +class Microphone: + _audio: pyaudio.PyAudio + _chunk: int + _rate: int + _format: int + _channels: int + _input_device_index: Optional[int] + _is_muted: bool + + _stream: pyaudio.Stream + _asyncio_loop: asyncio.AbstractEventLoop + _asyncio_thread: threading.Thread + _exit: threading.Event + + _push_callback_org: object + _push_callback: object + + def __init__( + self, + push_callback: Optional[Callable] = None, + rate: Optional[int] = RATE, + chunk: Optional[int] = CHUNK, + channels: Optional[int] = CHANNELS, + input_device_index: Optional[int] = None, + ): + self._exit = threading.Event() + + self._audio = pyaudio.PyAudio() + self._chunk = chunk + self._rate = rate + self._format = FORMAT + self._channels = channels + self._is_muted = False + + self._input_device_index = input_device_index + self._push_callback_org = push_callback + + def _start_asyncio_loop(self) -> None: + self._asyncio_loop = asyncio.new_event_loop() + self._asyncio_loop.run_forever() + + def is_active(self) -> bool: + if self._stream is None: + return False + + val = self._stream.is_active() + return val + + def set_callback(self, push_callback: Callable) -> None: + self._push_callback_org = push_callback + + def start(self) -> bool: + if self._push_callback_org is None: + return False + + if inspect.iscoroutinefunction(self._push_callback_org): + self._asyncio_thread = threading.Thread(target=self._start_asyncio_loop) + self._asyncio_thread.start() + + self._push_callback = lambda data: asyncio.run_coroutine_threadsafe( + self._push_callback_org(data), self._asyncio_loop + ).result() + else: + self._push_callback = self._push_callback_org + + self._stream = self._audio.open( + format=self._format, + channels=self._channels, + rate=self._rate, + input=True, + output=False, + frames_per_buffer=self._chunk, + input_device_index=self._input_device_index, + stream_callback=self._callback, + ) + + self._exit.clear() + self._stream.start_stream() + + return True + + def _callback(self, input_data, frame_count, time_info, status_flags): + if self._exit.is_set(): + return None, pyaudio.paAbort + + if input_data is None: + return None, pyaudio.paContinue + + try: + if self._is_muted: + size = len(input_data) + input_data = b"\x00" * size + + self._push_callback(input_data) + except Exception as e: + raise + + return input_data, pyaudio.paContinue + + def mute(self) -> bool: + if self._stream is None: + return False + + self._is_muted = True + + return True + + def unmute(self) -> bool: + if self._stream is None: + return False + + self._is_muted = False + + return True + + def stop(self) -> bool: + self._exit.set() + + if self._stream is not None: + self._stream.stop_stream() + self._stream.close() + self._stream = None + + if ( + inspect.iscoroutinefunction(self._push_callback_org) + and self._asyncio_thread is not None + ): + self._asyncio_loop.call_soon_threadsafe(self._asyncio_loop.stop) + self._asyncio_thread.join() + self._asyncio_thread = None + + return True + +class Speaker: + _audio: pyaudio.PyAudio + _chunk: int + _rate: int + _format: int + _channels: int + _output_device_index: Optional[int] + + _queue: queue.Queue + _exit: threading.Event + + _stream: pyaudio.Stream + _thread: threading.Thread + _asyncio_loop: asyncio.AbstractEventLoop + _receiver_thread: threading.Thread = None + + _socket: SyncClientConnection + _push_callback_org: Callable = None + _push_callback: Callable = None + _loop: asyncio.AbstractEventLoop + + def __init__( + self, + push_callback: Optional[Callable] = None, + rate: int = RATE, + chunk: int = CHUNK, + channels: int = CHANNELS, + output_device_index: Optional[int] = None, + ): + self._exit = threading.Event() + self._queue = queue.Queue() + + self._audio = pyaudio.PyAudio() + self._chunk = chunk + self._rate = rate + self._format = FORMAT + self._channels = channels + self._output_device_index = output_device_index + + self._socket = None + self._push_callback_org = push_callback + + def set_callback(self, push_callback: Callable) -> None: + self._push_callback_org = push_callback + + def start(self, socket: SyncClientConnection) -> bool: + # Automatically get the current running event loop + if inspect.iscoroutinefunction(socket.send): + self._loop = asyncio.get_running_loop() + + self._exit.clear() + self._socket = socket + + self._stream = self._audio.open( + format=self._format, + channels=self._channels, + rate=self._rate, + input=False, + output=True, + frames_per_buffer=self._chunk, + output_device_index=self._output_device_index, + ) + + # determine if the push_callback is a coroutine + if inspect.iscoroutinefunction(self._push_callback_org): + self._push_callback = lambda data: asyncio.run_coroutine_threadsafe( + self._push_callback_org(data), self._asyncio_loop + ).result() + else: + self._push_callback = self._push_callback_org + + # start the play thread + self._thread = threading.Thread( + target=self._play, args=(self._queue, self._stream, self._exit), daemon=True + ) + self._thread.start() + + # Start the stream + self._stream.start_stream() + + # Start the receiver thread within the start function + if self._socket is not None: + print("Starting receiver thread...") + self._receiver_thread = threading.Thread( + target=self._start_receiver, args=(self._socket,) + ) + self._receiver_thread.start() + + return True + + def _start_receiver(self, socket: SyncClientConnection): + print("Starting threaded receiver...") + self._start_threaded_receiver(socket) + + def _start_threaded_receiver(self, socket: SyncClientConnection): + try: + while True: + if socket is None or self._exit.is_set(): + break + + message = socket.recv() + if message is None: + continue + + if isinstance(message, str): + print(message) + elif isinstance(message, bytes): + self.add_audio_to_queue(message) + except Exception as e: + print(f"threaded receiver: {e}") + + def add_audio_to_queue(self, data): + self._queue.put(data) + + def stop(self): + self._exit.set() + + if self._stream is not None: + self._stream.stop_stream() + self._stream.close() + self._stream = None + + self._thread.join() + self._thread = None + + if self._receiver_thread is not None: + self._receiver_thread.join() + self._receiver_thread = None + + self._socket = None + self._queue = None + + def _play(self, audio_out, stream, stop): + while not stop.is_set(): + try: + data = audio_out.get(True, TIMEOUT) + stream.write(data) + except queue.Empty: + pass + except Exception as e: + print(f"_play: {e}") + +if __name__ == "__main__": + sys.exit(main() or 0) + +```