Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

handles 429s #1

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
240 changes: 148 additions & 92 deletions server.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import sys
import websockets
import ssl
import requests


def sts_connect():
Expand All @@ -19,123 +20,178 @@ async def twilio_handler(twilio_ws):
audio_queue = asyncio.Queue()
streamsid_queue = asyncio.Queue()

async with sts_connect() as sts_ws:
config_message = {
"type": "SettingsConfiguration",
"audio": {
"input": {
"encoding": "mulaw",
"sample_rate": 8000,
},
"output": {
"encoding": "mulaw",
"sample_rate": 8000,
"container": "none",
"buffer_size": 250,
try:
async with sts_connect() as sts_ws:
config_message = {
"type": "SettingsConfiguration",
"audio": {
"input": {
"encoding": "mulaw",
"sample_rate": 8000,
},
"output": {
"encoding": "mulaw",
"sample_rate": 8000,
"container": "none",
"buffer_size": 250,
},
},
},
"agent": {
"listen": {"model": "nova-2"},
"think": {
"provider": {
"type": "anthropic", # examples are anthropic, open_ai, groq, ollama
"agent": {
"listen": {"model": "nova-2"},
"think": {
"provider": {
"type": "anthropic", # examples are anthropic, open_ai, groq, ollama
},
"model": "claude-3-haiku-20240307", # examples are claude-3-haiku-20240307, gpt-3.5-turbo, mixtral-8x7b-32768, mistral
"instructions": "You are a helpful car seller.",
},
"model": "claude-3-haiku-20240307", # examples are claude-3-haiku-20240307, gpt-3.5-turbo, mixtral-8x7b-32768, mistral
"instructions": "You are a helpful car seller.",
"speak": {"model": "aura-asteria-en"},
},
"speak": {"model": "aura-asteria-en"},
},
}

await sts_ws.send(json.dumps(config_message))

async def sts_sender(sts_ws):
print("sts_sender started")
while True:
chunk = await audio_queue.get()
await sts_ws.send(chunk)

async def sts_receiver(sts_ws):
print("sts_receiver started")
# we will wait until the twilio ws connection figures out the streamsid
streamsid = await streamsid_queue.get()
# for each sts result received, forward it on to the call
async for message in sts_ws:
if type(message) is str:
print(message)
# handle barge-in
decoded = json.loads(message)
if decoded['type'] == 'UserStartedSpeaking':
clear_message = {
"event": "clear",
"streamSid": streamsid
}
await twilio_ws.send(json.dumps(clear_message))

continue

print(type(message))
raw_mulaw = message
}

await sts_ws.send(json.dumps(config_message))

async def sts_sender(sts_ws):
print("sts_sender started")
while True:
chunk = await audio_queue.get()
await sts_ws.send(chunk)

async def sts_receiver(sts_ws):
print("sts_receiver started")
# we will wait until the twilio ws connection figures out the streamsid
streamsid = await streamsid_queue.get()
# for each sts result received, forward it on to the call
async for message in sts_ws:
if type(message) is str:
print(message)
# handle barge-in
decoded = json.loads(message)
if decoded['type'] == 'UserStartedSpeaking':
clear_message = {
"event": "clear",
"streamSid": streamsid
}
await twilio_ws.send(json.dumps(clear_message))

# construct a Twilio media message with the raw mulaw (see https://www.twilio.com/docs/voice/twiml/stream#websocket-messages---to-twilio)
media_message = {
"event": "media",
"streamSid": streamsid,
"media": {"payload": base64.b64encode(raw_mulaw).decode("ascii")},
}
continue

# send the TTS audio to the attached phonecall
await twilio_ws.send(json.dumps(media_message))
print(type(message))
raw_mulaw = message

# construct a Twilio media message with the raw mulaw (see https://www.twilio.com/docs/voice/twiml/stream#websocket-messages---to-twilio)
media_message = {
"event": "media",
"streamSid": streamsid,
"media": {"payload": base64.b64encode(raw_mulaw).decode("ascii")},
}

# send the TTS audio to the attached phonecall
await twilio_ws.send(json.dumps(media_message))

async def twilio_receiver(twilio_ws):
print("twilio_receiver started")
# twilio sends audio data as 160 byte messages containing 20ms of audio each
# we will buffer 20 twilio messages corresponding to 0.4 seconds of audio to improve throughput performance
BUFFER_SIZE = 20 * 160

inbuffer = bytearray(b"")
async for message in twilio_ws:
try:
data = json.loads(message)
if data["event"] == "start":
print("got our streamsid")
start = data["start"]
streamsid = start["streamSid"]
streamsid_queue.put_nowait(streamsid)
if data["event"] == "connected":
continue
if data["event"] == "media":
media = data["media"]
chunk = base64.b64decode(media["payload"])
if media["track"] == "inbound":
inbuffer.extend(chunk)
if data["event"] == "stop":
break

# check if our buffer is ready to send to our audio_queue (and, thus, then to sts)
while len(inbuffer) >= BUFFER_SIZE:
chunk = inbuffer[:BUFFER_SIZE]
audio_queue.put_nowait(chunk)
inbuffer = inbuffer[BUFFER_SIZE:]
except:
break

# the async for loop will end if the ws connection from twilio dies
# and if this happens, we should forward an some kind of message to sts
# to signal sts to send back remaining messages before closing(?)
# audio_queue.put_nowait(b'')

await asyncio.wait(
[
asyncio.ensure_future(sts_sender(sts_ws)),
asyncio.ensure_future(sts_receiver(sts_ws)),
asyncio.ensure_future(twilio_receiver(twilio_ws)),
]
)

await twilio_ws.close()
except websockets.exceptions.WebSocketException as e:
print("failed to connect to deepgram - will convey an error message on the phone")

# we got an error connecting to deepgram, and we want to convey this to the caller
# to do this, we need to wait to receive the streamsid from twilio
async def twilio_receiver(twilio_ws):
print("twilio_receiver started")
# twilio sends audio data as 160 byte messages containing 20ms of audio each
# we will buffer 20 twilio messages corresponding to 0.4 seconds of audio to improve throughput performance
BUFFER_SIZE = 20 * 160

inbuffer = bytearray(b"")
async for message in twilio_ws:
try:
data = json.loads(message)

if data["event"] == "start":
print("got our streamsid")
start = data["start"]
streamsid = start["streamSid"]
streamsid_queue.put_nowait(streamsid)
if data["event"] == "connected":
continue
if data["event"] == "media":
media = data["media"]
chunk = base64.b64decode(media["payload"])
if media["track"] == "inbound":
inbuffer.extend(chunk)
if data["event"] == "stop":
break

# check if our buffer is ready to send to our audio_queue (and, thus, then to sts)
while len(inbuffer) >= BUFFER_SIZE:
chunk = inbuffer[:BUFFER_SIZE]
audio_queue.put_nowait(chunk)
inbuffer = inbuffer[BUFFER_SIZE:]
reply = "An error occured connecting to deepgram."
if "429" in str(e):
reply += " Too many requests are being handled currently, please try again later."

# make a Deepgram Aura TTS request specifying that we want raw mulaw audio as the output
url = "https://api.deepgram.com/v1/speak?model=aura-asteria-en&encoding=mulaw&sample_rate=8000&container=none"
headers = {
"Authorization": "Token INSERT_DEEPGRAM_API_KEY",
"Content-Type": "application/json",
}
payload = {"text": reply}
tts_response = requests.post(url, headers=headers, json=payload)

if tts_response.status_code == 200:
raw_mulaw = tts_response.content

# construct a Twilio media message with the raw mulaw (see https://www.twilio.com/docs/voice/twiml/stream#websocket-messages---to-twilio)
media_message = {
"event": "media",
"streamSid": streamsid,
"media": {"payload": base64.b64encode(raw_mulaw).decode("ascii")},
}

# send the TTS audio to the attached phonecall
await twilio_ws.send(json.dumps(media_message))

# sleep so that this message gets played in full before the connection gets killed
await asyncio.sleep(8)

# break, which will end the phone call
break
except:
break

# the async for loop will end if the ws connection from twilio dies
# and if this happens, we should forward an some kind of message to sts
# to signal sts to send back remaining messages before closing(?)
# audio_queue.put_nowait(b'')

await asyncio.wait(
[
asyncio.ensure_future(sts_sender(sts_ws)),
asyncio.ensure_future(sts_receiver(sts_ws)),
asyncio.ensure_future(twilio_receiver(twilio_ws)),
]
)

await twilio_ws.close()


async def router(websocket, path):
if path == "/twilio":
print("twilio connection incoming")
Expand Down