Commit
refactor: LLM actions as a dynamic, separate file
clemlesne committed Feb 20, 2024
1 parent 1709167 commit 6959665
Showing 7 changed files with 980 additions and 699 deletions.
215 changes: 215 additions & 0 deletions helpers/call.py
@@ -0,0 +1,215 @@
from enum import Enum
from helpers.config import CONFIG
from helpers.logging import build_logger
from models.call import CallModel
from typing import Generator, List, Optional
from azure.communication.callautomation import (
FileSource,
PhoneNumberIdentifier,
RecognitionChoice,
RecognizeInputType,
SsmlSource,
CallConnectionClient,
)
from azure.core.exceptions import ResourceNotFoundError, HttpResponseError
from models.message import (
MessageModel,
PersonaEnum as MessagePersonaEnum,
StyleEnum as MessageStyleEnum,
)
import re
from xml.sax.saxutils import escape


_logger = build_logger(__name__)
# Matches sentence-ending punctuation: any character outside the allowed set of word, space and inline-punctuation characters
SENTENCE_R = r"[^\w\s+\-–—’/'\",:;()@=]"


class ContextEnum(str, Enum):
CONNECT_AGENT = "connect_agent"
GOODBYE = "goodbye"
TRANSFER_FAILED = "transfer_failed"


def sentence_split(text: str) -> Generator[str, None, None]:
    """
    Split a text into sentences.
    """
    separators = re.findall(SENTENCE_R, text)
    splits = re.split(SENTENCE_R, text)
    for i, separator in enumerate(separators):
        yield splits[i] + separator
    # Yield the trailing text after the last separator, otherwise it is lost
    if splits and splits[-1]:
        yield splits[-1]
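
# For example, list(sentence_split("Hello there. How are you? Fine")) yields
# ["Hello there.", " How are you?", " Fine"]: each sentence keeps its
# terminating separator, and the trailing fragment is preserved.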


# TODO: Disable or lower the profanity filter. The filter seems to be enabled by default; it replaces words like "holes in my roof" with "*** in my roof". This is not acceptable for a call center.
async def _handle_recognize_media(
client: CallConnectionClient,
call: CallModel,
sound_url: str,
) -> None:
"""
Play a media to a call participant and start recognizing the response.
"""
_logger.debug(f"Recognizing media ({call.call_id})")
try:
client.start_recognizing_media(
end_silence_timeout=3, # Sometimes user includes breaks in their speech
input_type=RecognizeInputType.SPEECH,
play_prompt=FileSource(url=sound_url),
speech_language=call.lang.short_code,
target_participant=PhoneNumberIdentifier(call.phone_number), # type: ignore
)
except ResourceNotFoundError:
_logger.debug(f"Call hung up before recognizing ({call.call_id})")
except HttpResponseError as e:
if "call already terminated" in e.message.lower():
_logger.debug(f"Call hung up before playing ({call.call_id})")
else:
            raise


async def handle_media(
client: CallConnectionClient,
call: CallModel,
sound_url: str,
context: Optional[str] = None,
) -> None:
    """
    Play a media file to a call participant.
    """
try:
client.play_media(
operation_context=context,
play_source=FileSource(url=sound_url),
)
except ResourceNotFoundError:
_logger.debug(f"Call hung up before playing ({call.call_id})")
except HttpResponseError as e:
if "call already terminated" in e.message.lower():
_logger.debug(f"Call hung up before playing ({call.call_id})")
else:
            raise


async def handle_recognize_text(
client: CallConnectionClient,
call: CallModel,
style: MessageStyleEnum = MessageStyleEnum.NONE,
text: Optional[str] = None,
store: bool = True,
) -> None:
"""
Play a text to a call participant and start recognizing the response.
    If `store` is `True`, the text is stored in the call messages. Plays the text first, then the "ready" sound, and finally starts recognizing the response.
"""
if text:
await handle_play(
call=call,
client=client,
store=store,
style=style,
text=text,
)

await _handle_recognize_media(
call=call,
client=client,
sound_url=CONFIG.prompts.sounds.ready(),
)


async def handle_play(
client: CallConnectionClient,
call: CallModel,
text: str,
style: MessageStyleEnum = MessageStyleEnum.NONE,
context: Optional[str] = None,
store: bool = True,
) -> None:
"""
Play a text to a call participant.
If store is True, the text will be stored in the call messages. Compatible with text larger than 400 characters, in that case the text will be split in chunks and played sequentially.
See: https://learn.microsoft.com/en-us/azure/ai-services/speech-service/language-support?tabs=tts
"""
if store:
call.messages.append(
MessageModel(
content=text,
persona=MessagePersonaEnum.ASSISTANT,
style=style,
)
)

    # Split text into chunks of max 400 characters, separated by sentence
    chunks = []
    chunk = ""
    for to_add in sentence_split(text):
        if len(chunk) + len(to_add) >= 400:
            chunks.append(chunk.strip())  # Remove trailing space
            chunk = ""
        chunk += to_add
    if chunk:
        chunks.append(chunk.strip())

try:
for chunk in chunks:
_logger.info(f"Playing text ({call.call_id}): {text} ({style})")
client.play_media(
operation_context=context,
play_source=_audio_from_text(chunk, style, call),
)
except ResourceNotFoundError:
_logger.debug(f"Call hung up before playing ({call.call_id})")
except HttpResponseError as e:
if "call already terminated" in e.message.lower():
_logger.debug(f"Call hung up before playing ({call.call_id})")
else:
            raise
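
# A minimal usage sketch, assuming an established call whose CallModel is in
# `call` and a connected CallConnectionClient in `client` (both hypothetical
# variable names):
#
#     await handle_play(
#         client=client,
#         call=call,
#         text="Your claim has been updated.",
#     )
#
# The text is appended to `call.messages`, split into chunks of at most 400
# characters, converted to SSML, and played to the caller sequentially.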


def _audio_from_text(text: str, style: MessageStyleEnum, call: CallModel) -> SsmlSource:
    """
    Generate an audio source that can be read by the Azure Communication Services SDK.
    Text is XML-escaped before being embedded in the SSML, and SSML tags are used to control the voice. Plus, speech is slowed down by 5% to make it more understandable for elderly people. Text is also truncated to 400 characters, as this is the limit of Azure Communication Services TTS, but a warning is logged.
    """
    # Azure Speech Service TTS limit is 400 characters
    if len(text) > 400:
        _logger.warning(
            f"Text is too long to be processed by TTS, truncating to 400 characters, fix this! ({call.call_id})"
        )
        text = text[:400]
    text = escape(text)  # Escape XML special characters (&, <, >) for SSML
    ssml = f"""
    <speak version="1.0" xmlns="http://www.w3.org/2001/10/synthesis" xmlns:mstts="https://www.w3.org/2001/mstts" xml:lang="{call.lang.short_code}">
        <voice name="{call.lang.voice}" effect="eq_telecomhp8k">
            <lexicon uri="{CONFIG.resources.public_url}/lexicon.xml" />
            <mstts:express-as style="{style.value}" styledegree="0.5">
                <prosody rate="0.95">{text}</prosody>
            </mstts:express-as>
        </voice>
    </speak>
    """
    return SsmlSource(ssml_text=ssml.strip())


async def handle_recognize_ivr(
    client: CallConnectionClient,
    call: CallModel,
    text: str,
    choices: List[RecognitionChoice],
) -> None:
    """
    Play a text prompt to a call participant and start recognizing an IVR choice, by speech or DTMF tone.
    """
    _logger.info(f"Playing text before IVR ({call.call_id}): {text}")
    _logger.debug(f"Recognizing IVR ({call.call_id})")
try:
client.start_recognizing_media(
choices=choices,
end_silence_timeout=20,
input_type=RecognizeInputType.CHOICES,
interrupt_prompt=True,
play_prompt=_audio_from_text(text, MessageStyleEnum.NONE, call),
speech_language=call.lang.short_code,
target_participant=PhoneNumberIdentifier(call.phone_number), # type: ignore
)
except ResourceNotFoundError:
_logger.debug(f"Call hung up before recognizing ({call.call_id})")
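
# A minimal IVR usage sketch, with hypothetical labels and phrases; DtmfTone
# ships in the same azure.communication.callautomation package:
#
#     from azure.communication.callautomation import DtmfTone
#
#     await handle_recognize_ivr(
#         client=client,
#         call=call,
#         text="For a new claim, press 1. To talk to an agent, press 2.",
#         choices=[
#             RecognitionChoice(label="new_claim", phrases=["new claim"], tone=DtmfTone.ONE),
#             RecognitionChoice(label="agent", phrases=["agent"], tone=DtmfTone.TWO),
#         ],
#     )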
180 changes: 180 additions & 0 deletions helpers/llm_plugins.py
@@ -0,0 +1,180 @@
from azure.communication.callautomation import CallConnectionClient
from fastapi import BackgroundTasks
from helpers.call import ContextEnum as CallContextEnum, handle_play
from helpers.config import CONFIG
from helpers.llm_tools import function_schema
from inspect import getmembers, isfunction
from models.call import CallModel
from models.claim import ClaimModel
from models.message import StyleEnum as MessageStyleEnum
from models.reminder import ReminderModel
from openai.types.chat import ChatCompletionToolParam
from pydantic import ValidationError
from typing import Awaitable, Callable, Annotated, List


class LlmPlugins:
background_tasks: BackgroundTasks
call: CallModel
cancellation_callback: Callable[[], Awaitable]
client: CallConnectionClient
post_call_next: Callable[[CallModel], Awaitable]
post_call_synthesis: Callable[[CallModel], Awaitable]
style: MessageStyleEnum
user_callback: Callable[[str, MessageStyleEnum], Awaitable]

def __init__(
self,
background_tasks: BackgroundTasks,
call: CallModel,
cancellation_callback: Callable[[], Awaitable],
client: CallConnectionClient,
post_call_next: Callable[[CallModel], Awaitable],
post_call_synthesis: Callable[[CallModel], Awaitable],
style: MessageStyleEnum,
user_callback: Callable[[str, MessageStyleEnum], Awaitable],
):
self.background_tasks = background_tasks
self.call = call
self.cancellation_callback = cancellation_callback
self.client = client
self.post_call_next = post_call_next
self.post_call_synthesis = post_call_synthesis
self.style = style
self.user_callback = user_callback

async def end_call(self) -> str:
"""
        Use this if the user wants to end the call, or if the user said goodbye in their last message. Be warned that the call will be ended immediately. Never use this action directly after a recall. Example: 'I want to hang up', 'Goodbye, see you soon', 'We are done here', 'We will talk again later'.
"""
await self.cancellation_callback()
await handle_play(
call=self.call,
client=self.client,
context=CallContextEnum.GOODBYE,
text=await CONFIG.prompts.tts.goodbye(self.call),
)
return "Call ended"

async def new_claim(
self,
customer_response: Annotated[
str,
"The text to be read to the customer to confirm the update. Only speak about this action. Use an imperative sentence. Example: 'I am updating the involved parties to Marie-Jeanne and Jean-Pierre', 'I am updating the contact contact info to 123 rue de la paix 75000 Paris, +33735119775, only call after 6pm'.",
],
) -> str:
"""
        Use this if the user wants to create a new claim for a totally different subject. This will reset the claim and reminder data. The old claim is stored but no longer accessible. Approval from the customer must be explicitly given. Example: 'I want to create a new claim'.
"""
await self.user_callback(customer_response, self.style)

self.background_tasks.add_task(self.post_call_next, self.call)
self.background_tasks.add_task(self.post_call_synthesis, self.call)

        # Keep the last message and reset the rest of the call history
        last_message = self.call.messages[-1]
        self.call = CallModel(phone_number=self.call.phone_number)
        self.call.messages.append(last_message)

return "Claim, reminders and messages reset"

async def new_or_updated_reminder(
self,
        customer_response: Annotated[
            str,
            "The text to be read to the customer to confirm the reminder. Only speak about this action. Use an imperative sentence. Example: 'I am creating a reminder for next week to call back the customer', 'I am creating a reminder for next week to send the report'.",
        ],
        description: Annotated[
            str,
            "Contextual description of the reminder. Should be detailed enough to be understood by anyone. Example: 'Watch model is Rolex Submariner 116610LN', 'User said the witness's car was red but the police report says it was blue. Double check with the involved parties'.",
        ],
due_date_time: Annotated[
str,
"Datetime when the reminder should be triggered. Should be in the future, in the ISO format.",
],
owner: Annotated[
str,
"The owner of the reminder. Can be 'customer', 'assistant', or a third party from the claim. Try to be as specific as possible, with a name. Example: 'customer', 'assistant', 'contact', 'witness', 'police'.",
],
title: Annotated[
str,
"Short title of the reminder. Should be short and concise, in the format 'Verb + Subject'. Title is unique and allows the reminder to be updated. Example: 'Call back customer', 'Send analysis report', 'Study replacement estimates for the stolen watch'.",
],
) -> str:
"""
        Use this if you think there is something important to do in the future, and you want to be reminded about it. If it already exists, it will be updated with the new values. Example: 'Remind Assistant Tuesday at 10am to call back the customer', 'Remind Assistant next week to send the report', 'Remind the customer next week to send the documents by the end of the month'.
"""
await self.user_callback(customer_response, self.style)

for reminder in self.call.reminders:
if reminder.title == title:
try:
reminder.description = description
reminder.due_date_time = due_date_time # type: ignore
reminder.owner = owner
return f'Reminder "{title}" updated.'
except ValidationError as e: # Catch error
return f'Failed to edit reminder "{title}": {e.json()}'

try:
reminder = ReminderModel(
description=description,
due_date_time=due_date_time, # type: ignore
owner=owner,
title=title,
)
self.call.reminders.append(reminder)
return f'Reminder "{title}" created.'
except ValidationError as e: # Catch error
return f'Failed to create reminder "{title}": {e.json()}'

async def updated_claim(
self,
customer_response: Annotated[
str,
"The text to be read to the customer to confirm the update. Only speak about this action. Use an imperative sentence. Example: 'I am updating the involved parties to Marie-Jeanne and Jean-Pierre', 'I am updating the contact contact info to 123 rue de la paix 75000 Paris, +33735119775, only call after 6pm'.",
],
field: Annotated[
str, f"The claim field to update: {list(ClaimModel.editable_fields())}"
],
value: Annotated[
str,
"The claim field value to update. For dates, use YYYY-MM-DD HH:MM format (e.g. 2024-02-01 18:58). For phone numbers, use E164 format (e.g. +33612345678).",
],
) -> str:
"""
        Use this if the user wants to update a claim field with a new value. Example: 'Update claim explanation to: I was driving on the highway when a car hit me from behind', 'Update contact info to: 123 rue de la paix 75000 Paris, +33735119775, only call after 6pm'.
"""
await self.user_callback(customer_response, self.style)

        if field not in ClaimModel.editable_fields():
return f'Failed to update a non-editable field "{field}".'

try:
# Define the field and force to trigger validation
copy = self.call.claim.model_dump()
copy[field] = value
self.call.claim = ClaimModel.model_validate(copy)
return f'Updated claim field "{field}" with value "{value}".'
except ValidationError as e: # Catch error to inform LLM
return f'Failed to edit field "{field}": {e.json()}'

async def talk_to_human(self) -> str:
"""
        Use this if the user wants to talk to a human and Assistant is unable to help. This will transfer the customer to a human agent. Approval from the customer must be explicitly given. Never use this action directly after a recall. Example: 'I want to talk to a human', 'I want to talk to a real person'.
"""
await self.cancellation_callback()
await handle_play(
call=self.call,
client=self.client,
context=CallContextEnum.CONNECT_AGENT,
text=await CONFIG.prompts.tts.end_call_to_connect_agent(self.call),
)
return "Transferring to human agent"

    @staticmethod
    def to_openai() -> List[ChatCompletionToolParam]:
        return [
            function_schema(func[1])
            for func in getmembers(LlmPlugins, isfunction)
            # Exclude private helpers and this schema builder itself
            if not func[0].startswith("_") and func[0] != "to_openai"
        ]
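
# A sketch of how the generated schemas could be passed to a chat completion,
# assuming an AsyncAzureOpenAI client named `openai_client` and an illustrative
# deployment name (both hypothetical):
#
#     completion = await openai_client.chat.completions.create(
#         messages=[...],  # Conversation history built from call.messages
#         model="gpt-4",  # Illustrative deployment name
#         tools=LlmPlugins.to_openai(),
#     )
#
# Tool calls returned by the model can then be dispatched by name to the
# matching LlmPlugins method.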