From 0693bfd8bed117e14d47998344ef4869be9e317b Mon Sep 17 00:00:00 2001 From: Nikola Whallon Date: Fri, 13 Sep 2024 11:53:18 -0700 Subject: [PATCH] handles 429s --- server.py | 240 +++++++++++++++++++++++++++++++++--------------------- 1 file changed, 148 insertions(+), 92 deletions(-) diff --git a/server.py b/server.py index 683e8a2..416c44d 100644 --- a/server.py +++ b/server.py @@ -4,6 +4,7 @@ import sys import websockets import ssl +import requests def sts_connect(): @@ -19,123 +20,178 @@ async def twilio_handler(twilio_ws): audio_queue = asyncio.Queue() streamsid_queue = asyncio.Queue() - async with sts_connect() as sts_ws: - config_message = { - "type": "SettingsConfiguration", - "audio": { - "input": { - "encoding": "mulaw", - "sample_rate": 8000, - }, - "output": { - "encoding": "mulaw", - "sample_rate": 8000, - "container": "none", - "buffer_size": 250, + try: + async with sts_connect() as sts_ws: + config_message = { + "type": "SettingsConfiguration", + "audio": { + "input": { + "encoding": "mulaw", + "sample_rate": 8000, + }, + "output": { + "encoding": "mulaw", + "sample_rate": 8000, + "container": "none", + "buffer_size": 250, + }, }, - }, - "agent": { - "listen": {"model": "nova-2"}, - "think": { - "provider": { - "type": "anthropic", # examples are anthropic, open_ai, groq, ollama + "agent": { + "listen": {"model": "nova-2"}, + "think": { + "provider": { + "type": "anthropic", # examples are anthropic, open_ai, groq, ollama + }, + "model": "claude-3-haiku-20240307", # examples are claude-3-haiku-20240307, gpt-3.5-turbo, mixtral-8x7b-32768, mistral + "instructions": "You are a helpful car seller.", }, - "model": "claude-3-haiku-20240307", # examples are claude-3-haiku-20240307, gpt-3.5-turbo, mixtral-8x7b-32768, mistral - "instructions": "You are a helpful car seller.", + "speak": {"model": "aura-asteria-en"}, }, - "speak": {"model": "aura-asteria-en"}, - }, - } - - await sts_ws.send(json.dumps(config_message)) - - async def sts_sender(sts_ws): - print("sts_sender started") - while True: - chunk = await audio_queue.get() - await sts_ws.send(chunk) - - async def sts_receiver(sts_ws): - print("sts_receiver started") - # we will wait until the twilio ws connection figures out the streamsid - streamsid = await streamsid_queue.get() - # for each sts result received, forward it on to the call - async for message in sts_ws: - if type(message) is str: - print(message) - # handle barge-in - decoded = json.loads(message) - if decoded['type'] == 'UserStartedSpeaking': - clear_message = { - "event": "clear", - "streamSid": streamsid - } - await twilio_ws.send(json.dumps(clear_message)) - - continue - - print(type(message)) - raw_mulaw = message + } + + await sts_ws.send(json.dumps(config_message)) + + async def sts_sender(sts_ws): + print("sts_sender started") + while True: + chunk = await audio_queue.get() + await sts_ws.send(chunk) + + async def sts_receiver(sts_ws): + print("sts_receiver started") + # we will wait until the twilio ws connection figures out the streamsid + streamsid = await streamsid_queue.get() + # for each sts result received, forward it on to the call + async for message in sts_ws: + if type(message) is str: + print(message) + # handle barge-in + decoded = json.loads(message) + if decoded['type'] == 'UserStartedSpeaking': + clear_message = { + "event": "clear", + "streamSid": streamsid + } + await twilio_ws.send(json.dumps(clear_message)) - # construct a Twilio media message with the raw mulaw (see https://www.twilio.com/docs/voice/twiml/stream#websocket-messages---to-twilio) - media_message = { - "event": "media", - "streamSid": streamsid, - "media": {"payload": base64.b64encode(raw_mulaw).decode("ascii")}, - } + continue - # send the TTS audio to the attached phonecall - await twilio_ws.send(json.dumps(media_message)) + print(type(message)) + raw_mulaw = message + + # construct a Twilio media message with the raw mulaw (see https://www.twilio.com/docs/voice/twiml/stream#websocket-messages---to-twilio) + media_message = { + "event": "media", + "streamSid": streamsid, + "media": {"payload": base64.b64encode(raw_mulaw).decode("ascii")}, + } + + # send the TTS audio to the attached phonecall + await twilio_ws.send(json.dumps(media_message)) + + async def twilio_receiver(twilio_ws): + print("twilio_receiver started") + # twilio sends audio data as 160 byte messages containing 20ms of audio each + # we will buffer 20 twilio messages corresponding to 0.4 seconds of audio to improve throughput performance + BUFFER_SIZE = 20 * 160 + + inbuffer = bytearray(b"") + async for message in twilio_ws: + try: + data = json.loads(message) + if data["event"] == "start": + print("got our streamsid") + start = data["start"] + streamsid = start["streamSid"] + streamsid_queue.put_nowait(streamsid) + if data["event"] == "connected": + continue + if data["event"] == "media": + media = data["media"] + chunk = base64.b64decode(media["payload"]) + if media["track"] == "inbound": + inbuffer.extend(chunk) + if data["event"] == "stop": + break + + # check if our buffer is ready to send to our audio_queue (and, thus, then to sts) + while len(inbuffer) >= BUFFER_SIZE: + chunk = inbuffer[:BUFFER_SIZE] + audio_queue.put_nowait(chunk) + inbuffer = inbuffer[BUFFER_SIZE:] + except: + break + # the async for loop will end if the ws connection from twilio dies + # and if this happens, we should forward an some kind of message to sts + # to signal sts to send back remaining messages before closing(?) + # audio_queue.put_nowait(b'') + + await asyncio.wait( + [ + asyncio.ensure_future(sts_sender(sts_ws)), + asyncio.ensure_future(sts_receiver(sts_ws)), + asyncio.ensure_future(twilio_receiver(twilio_ws)), + ] + ) + + await twilio_ws.close() + except websockets.exceptions.WebSocketException as e: + print("failed to connect to deepgram - will convey an error message on the phone") + + # we got an error connecting to deepgram, and we want to convey this to the caller + # to do this, we need to wait to receive the streamsid from twilio async def twilio_receiver(twilio_ws): - print("twilio_receiver started") - # twilio sends audio data as 160 byte messages containing 20ms of audio each - # we will buffer 20 twilio messages corresponding to 0.4 seconds of audio to improve throughput performance - BUFFER_SIZE = 20 * 160 - - inbuffer = bytearray(b"") async for message in twilio_ws: try: data = json.loads(message) + if data["event"] == "start": - print("got our streamsid") start = data["start"] streamsid = start["streamSid"] - streamsid_queue.put_nowait(streamsid) - if data["event"] == "connected": - continue - if data["event"] == "media": - media = data["media"] - chunk = base64.b64decode(media["payload"]) - if media["track"] == "inbound": - inbuffer.extend(chunk) - if data["event"] == "stop": - break - # check if our buffer is ready to send to our audio_queue (and, thus, then to sts) - while len(inbuffer) >= BUFFER_SIZE: - chunk = inbuffer[:BUFFER_SIZE] - audio_queue.put_nowait(chunk) - inbuffer = inbuffer[BUFFER_SIZE:] + reply = "An error occured connecting to deepgram." + if "429" in str(e): + reply += " Too many requests are being handled currently, please try again later." + + # make a Deepgram Aura TTS request specifying that we want raw mulaw audio as the output + url = "https://api.deepgram.com/v1/speak?model=aura-asteria-en&encoding=mulaw&sample_rate=8000&container=none" + headers = { + "Authorization": "Token INSERT_DEEPGRAM_API_KEY", + "Content-Type": "application/json", + } + payload = {"text": reply} + tts_response = requests.post(url, headers=headers, json=payload) + + if tts_response.status_code == 200: + raw_mulaw = tts_response.content + + # construct a Twilio media message with the raw mulaw (see https://www.twilio.com/docs/voice/twiml/stream#websocket-messages---to-twilio) + media_message = { + "event": "media", + "streamSid": streamsid, + "media": {"payload": base64.b64encode(raw_mulaw).decode("ascii")}, + } + + # send the TTS audio to the attached phonecall + await twilio_ws.send(json.dumps(media_message)) + + # sleep so that this message gets played in full before the connection gets killed + await asyncio.sleep(8) + + # break, which will end the phone call + break except: break - # the async for loop will end if the ws connection from twilio dies - # and if this happens, we should forward an some kind of message to sts - # to signal sts to send back remaining messages before closing(?) - # audio_queue.put_nowait(b'') - await asyncio.wait( [ - asyncio.ensure_future(sts_sender(sts_ws)), - asyncio.ensure_future(sts_receiver(sts_ws)), asyncio.ensure_future(twilio_receiver(twilio_ws)), ] ) await twilio_ws.close() - async def router(websocket, path): if path == "/twilio": print("twilio connection incoming")