bot.py
import asyncio
import os

import aiohttp

from dyte_sdk.transport import DyteTransport
from pipecat.frames.frames import (
    LLMMessagesFrame,
    TranscriptionFrame,
    UserStartedSpeakingFrame,
    UserStoppedSpeakingFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.aggregators.llm_response import (
    LLMAssistantResponseAggregator,
    LLMUserResponseAggregator,
)
from pipecat.services.ai_services import AIService
from pipecat.services.deepgram import DeepgramSTTService, LiveOptions
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.openai import OpenAILLMService
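
# Required environment variables (all read via os.environ below):
#   DYTE_AUTH_TOKEN     - auth token for the Dyte meeting
#   DEEPGRAM_API_KEY    - Deepgram speech-to-text
#   OPENAI_API_KEY      - OpenAI LLM
#   ELEVENLABS_API_KEY  - ElevenLabs text-to-speech
#   ELEVENLABS_VOICE_ID - ElevenLabs voice used for synthesis
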
# Wrap each transcript frame with User{Started,Stopped}Speaking frames so
# that downstream aggregators see user turn boundaries even though no real
# voice activity detection is running.
class TranscriptFakeVAD(AIService):
    async def process_frame(self, frame, direction):
        await super().process_frame(frame, direction)
        if isinstance(frame, TranscriptionFrame):
            # Synthesize a complete speaking turn around the transcript.
            await self.push_frame(UserStartedSpeakingFrame(), direction)
            await self.push_frame(frame, direction)
            await self.push_frame(UserStoppedSpeakingFrame(), direction)
        else:
            await self.push_frame(frame, direction)
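
# Note: the Deepgram service below is configured with interim_results=False,
# so each TranscriptionFrame reaching this processor carries a final
# transcript and can reasonably be treated as one complete user turn.
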

async def main():
    async with aiohttp.ClientSession() as session:
        token = os.environ["DYTE_AUTH_TOKEN"]
        transport = DyteTransport("preprod.dyte.io", token)
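
        # Frames flow top to bottom: transport audio in -> Deepgram STT ->
        # fake VAD -> user-side aggregator -> OpenAI LLM -> ElevenLabs TTS ->
        # transport audio out, with the assistant-side aggregator last so the
        # bot's replies are aggregated after they have been spoken.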
        pipeline = Pipeline(
            [
                transport.input(),
                DeepgramSTTService(
                    api_key=os.environ["DEEPGRAM_API_KEY"],
                    live_options=LiveOptions(
                        encoding="linear16",
                        language="en-US",
                        model="nova-2-conversationalai",
                        sample_rate=24000,
                        channels=2,
                        interim_results=False,
                        smart_format=True,
                    ),
                ),
                TranscriptFakeVAD(),
                LLMUserResponseAggregator(),
                OpenAILLMService(
                    api_key=os.environ["OPENAI_API_KEY"], model="gpt-4o"
                ),
                ElevenLabsTTSService(
                    aiohttp_session=session,
                    api_key=os.environ["ELEVENLABS_API_KEY"],
                    voice_id=os.environ["ELEVENLABS_VOICE_ID"],
                ),
                transport.output(),
                LLMAssistantResponseAggregator(),
            ]
        )

        runner = PipelineRunner()
        task = PipelineTask(pipeline)

        # Send the intro prompt only once, when the first participant joins.
        sent_intro = False

        @transport.event_handler("on_join")
        async def on_join(transport, participant):
            nonlocal sent_intro
            if sent_intro:
                return
            sent_intro = True
            messages = [
                {
                    "role": "system",
                    "content": "You are Chatbot, a friendly, helpful robot. "
                    "Your goal is to demonstrate your capabilities in a "
                    "succinct way. Your output will be converted to audio so "
                    "don't include special characters in your answers. "
                    "Respond to what the user said in a creative and helpful "
                    "way, but keep your responses brief. Start by introducing "
                    "yourself.",
                }
            ]
            await task.queue_frames([LLMMessagesFrame(messages)])

        await runner.run(task)


if __name__ == "__main__":
    asyncio.run(main())
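
# To run (a sketch; assumes this file is saved as bot.py and the environment
# variables listed at the top are exported):
#
#   export DYTE_AUTH_TOKEN=... DEEPGRAM_API_KEY=... OPENAI_API_KEY=... \
#          ELEVENLABS_API_KEY=... ELEVENLABS_VOICE_ID=...
#   python bot.py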