Skip to content

Commit

Permalink
Merge pull request #6 from deepgram-devs/voice-agent-samples-1
Browse files Browse the repository at this point in the history
Docs: Voice Agent Code Sample  [Don't Merge]
  • Loading branch information
jpvajda authored Nov 7, 2024
2 parents b187da6 + 2ead461 commit 0b06af0
Showing 1 changed file with 376 additions and 1 deletion.
377 changes: 376 additions & 1 deletion documentation/python-readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ audio_url = "http://stream.live.vc.bbcmedia.co.uk/bbc_world_service"
# Define the WebSocket functions on_open, on_message, on_close, and on_error
def on_open(ws):
print("WebSocket connection established.")

# Start audio streaming thread
audio_thread = threading.Thread(target=stream_audio, args=(ws,))
audio_thread.daemon = True
Expand Down Expand Up @@ -367,3 +367,378 @@ print("File saved successfully at:", output_file_path)

```

## Voice Agent Conversion using Deepgram API

**Title:** Take audio from an input source and play the resulting agent audio

**Code Sample:** voice-agent/voice-agent-play-audio/main.py

**Description:** This Python script uses the Deepgram API to take audio from an input source and play the resulting agent audio on the selected output device.

### voice-agent/voice-agent-play-audio/main.py

```python

import pyaudio
import asyncio
import sys
import os
import json
import inspect
import queue

import threading
from typing import Optional, Callable, Union

from websockets.sync.client import ClientConnection as SyncClientConnection

from websockets.sync.client import connect

TIMEOUT = 0.050
FORMAT = pyaudio.paInt16
CHANNELS = 1
RATE = 16000
CHUNK = 8000

def main():
try:
dg_api_key = os.environ.get("DEEPGRAM_API_KEY")
if dg_api_key is None:
print("DEEPGRAM_API_KEY env var not present")
return

print("\n\n\nPress Enter to stop...\n\n\n")

_socket = connect(
"wss://agent.deepgram.com/agent",
additional_headers={"Authorization": f"Token {dg_api_key}"},
)

_config_message = {
"type": "SettingsConfiguration",
"audio": {
"input": {
"encoding": "linear16",
"sample_rate": RATE,
},
"output": {
"encoding": "linear16",
"sample_rate": RATE,
"container": "none",
},
},
"agent": {
"listen": {"model": "nova-2"},
"think": {
"provider": {
"type": "open_ai", # examples are anthropic, open_ai, groq, ollama
},
"model": "gpt-4o-mini", # examples are claude-3-haiku-20240307, gpt-3.5-turbo, mixtral-8x7b-32768, mistral
"instructions": "You are a helpful AI assistant.",
},
"speak": {"model": "aura-athena-en"},
},
}

_socket.send(json.dumps(_config_message))

speaker = Speaker()
speaker.start(_socket)

microphone = Microphone(push_callback=_socket.send)
microphone.start()

input()

print("Stopping microphone...")
microphone.stop()

print("Stopping speaker...")
speaker.stop()

print("Closing socket...")
_socket.close()
_socket = None

except Exception as e:
print(f"main: {e}")

class Microphone:
_audio: pyaudio.PyAudio
_chunk: int
_rate: int
_format: int
_channels: int
_input_device_index: Optional[int]
_is_muted: bool

_stream: pyaudio.Stream
_asyncio_loop: asyncio.AbstractEventLoop
_asyncio_thread: threading.Thread
_exit: threading.Event

_push_callback_org: object
_push_callback: object

def __init__(
self,
push_callback: Optional[Callable] = None,
rate: Optional[int] = RATE,
chunk: Optional[int] = CHUNK,
channels: Optional[int] = CHANNELS,
input_device_index: Optional[int] = None,
):
self._exit = threading.Event()

self._audio = pyaudio.PyAudio()
self._chunk = chunk
self._rate = rate
self._format = FORMAT
self._channels = channels
self._is_muted = False

self._input_device_index = input_device_index
self._push_callback_org = push_callback

def _start_asyncio_loop(self) -> None:
self._asyncio_loop = asyncio.new_event_loop()
self._asyncio_loop.run_forever()

def is_active(self) -> bool:
if self._stream is None:
return False

val = self._stream.is_active()
return val

def set_callback(self, push_callback: Callable) -> None:
self._push_callback_org = push_callback

def start(self) -> bool:
if self._push_callback_org is None:
return False

if inspect.iscoroutinefunction(self._push_callback_org):
self._asyncio_thread = threading.Thread(target=self._start_asyncio_loop)
self._asyncio_thread.start()

self._push_callback = lambda data: asyncio.run_coroutine_threadsafe(
self._push_callback_org(data), self._asyncio_loop
).result()
else:
self._push_callback = self._push_callback_org

self._stream = self._audio.open(
format=self._format,
channels=self._channels,
rate=self._rate,
input=True,
output=False,
frames_per_buffer=self._chunk,
input_device_index=self._input_device_index,
stream_callback=self._callback,
)

self._exit.clear()
self._stream.start_stream()

return True

def _callback(self, input_data, frame_count, time_info, status_flags):
if self._exit.is_set():
return None, pyaudio.paAbort

if input_data is None:
return None, pyaudio.paContinue

try:
if self._is_muted:
size = len(input_data)
input_data = b"\x00" * size

self._push_callback(input_data)
except Exception as e:
raise

return input_data, pyaudio.paContinue

def mute(self) -> bool:
if self._stream is None:
return False

self._is_muted = True

return True

def unmute(self) -> bool:
if self._stream is None:
return False

self._is_muted = False

return True

def stop(self) -> bool:
self._exit.set()

if self._stream is not None:
self._stream.stop_stream()
self._stream.close()
self._stream = None

if (
inspect.iscoroutinefunction(self._push_callback_org)
and self._asyncio_thread is not None
):
self._asyncio_loop.call_soon_threadsafe(self._asyncio_loop.stop)
self._asyncio_thread.join()
self._asyncio_thread = None

return True

class Speaker:
_audio: pyaudio.PyAudio
_chunk: int
_rate: int
_format: int
_channels: int
_output_device_index: Optional[int]

_queue: queue.Queue
_exit: threading.Event

_stream: pyaudio.Stream
_thread: threading.Thread
_asyncio_loop: asyncio.AbstractEventLoop
_receiver_thread: threading.Thread = None

_socket: SyncClientConnection
_push_callback_org: Callable = None
_push_callback: Callable = None
_loop: asyncio.AbstractEventLoop

def __init__(
self,
push_callback: Optional[Callable] = None,
rate: int = RATE,
chunk: int = CHUNK,
channels: int = CHANNELS,
output_device_index: Optional[int] = None,
):
self._exit = threading.Event()
self._queue = queue.Queue()

self._audio = pyaudio.PyAudio()
self._chunk = chunk
self._rate = rate
self._format = FORMAT
self._channels = channels
self._output_device_index = output_device_index

self._socket = None
self._push_callback_org = push_callback

def set_callback(self, push_callback: Callable) -> None:
self._push_callback_org = push_callback

def start(self, socket: SyncClientConnection) -> bool:
# Automatically get the current running event loop
if inspect.iscoroutinefunction(socket.send):
self._loop = asyncio.get_running_loop()

self._exit.clear()
self._socket = socket

self._stream = self._audio.open(
format=self._format,
channels=self._channels,
rate=self._rate,
input=False,
output=True,
frames_per_buffer=self._chunk,
output_device_index=self._output_device_index,
)

# determine if the push_callback is a coroutine
if inspect.iscoroutinefunction(self._push_callback_org):
self._push_callback = lambda data: asyncio.run_coroutine_threadsafe(
self._push_callback_org(data), self._asyncio_loop
).result()
else:
self._push_callback = self._push_callback_org

# start the play thread
self._thread = threading.Thread(
target=self._play, args=(self._queue, self._stream, self._exit), daemon=True
)
self._thread.start()

# Start the stream
self._stream.start_stream()

# Start the receiver thread within the start function
if self._socket is not None:
print("Starting receiver thread...")
self._receiver_thread = threading.Thread(
target=self._start_receiver, args=(self._socket,)
)
self._receiver_thread.start()

return True

def _start_receiver(self, socket: SyncClientConnection):
print("Starting threaded receiver...")
self._start_threaded_receiver(socket)

def _start_threaded_receiver(self, socket: SyncClientConnection):
try:
while True:
if socket is None or self._exit.is_set():
break

message = socket.recv()
if message is None:
continue

if isinstance(message, str):
print(message)
elif isinstance(message, bytes):
self.add_audio_to_queue(message)
except Exception as e:
print(f"threaded receiver: {e}")

def add_audio_to_queue(self, data):
self._queue.put(data)

def stop(self):
self._exit.set()

if self._stream is not None:
self._stream.stop_stream()
self._stream.close()
self._stream = None

self._thread.join()
self._thread = None

if self._receiver_thread is not None:
self._receiver_thread.join()
self._receiver_thread = None

self._socket = None
self._queue = None

def _play(self, audio_out, stream, stop):
while not stop.is_set():
try:
data = audio_out.get(True, TIMEOUT)
stream.write(data)
except queue.Empty:
pass
except Exception as e:
print(f"_play: {e}")

if __name__ == "__main__":
sys.exit(main() or 0)

```

0 comments on commit 0b06af0

Please sign in to comment.