Update streaming processing python #194

Open
wants to merge 19 commits into main
167 changes: 167 additions & 0 deletions examples/python-demo/main-stream.py
@@ -0,0 +1,167 @@

import uuid
import asyncio
from typing import Optional, List, Dict, Any
import json
import sys

from tools import weather_tool

from multi_agent_orchestrator.orchestrator import MultiAgentOrchestrator, OrchestratorConfig
from multi_agent_orchestrator.agents import (BedrockLLMAgent,
BedrockLLMAgentOptions,
AgentResponse,
AgentStreamResponse,
AgentCallbacks)
from multi_agent_orchestrator.types import ConversationMessage, ParticipantRole
from multi_agent_orchestrator.utils import AgentTools

class LLMAgentCallbacks(AgentCallbacks):
def on_llm_new_token(self, token: str) -> None:
print(token, end='', flush=True)


async def handle_request(_orchestrator: MultiAgentOrchestrator, _user_input: str, _user_id: str, _session_id: str):
    stream_response = True
    response: AgentResponse = await _orchestrator.route_request(_user_input, _user_id, _session_id, {}, stream_response)

    # Print metadata
    print("\nMetadata:")
    print(f"Selected Agent: {response.metadata.agent_name}")

    if stream_response and response.streaming:
        # Streaming mode: print each token chunk as it arrives
        async for chunk in response.output:
            if isinstance(chunk, AgentStreamResponse):
                print(chunk.text, end='', flush=True)
    else:
        # Non-streaming mode: print the complete response
        if isinstance(response.output, ConversationMessage):
            print(response.output.content[0]['text'])
        else:
            print(response.output)

def custom_input_payload_encoder(input_text: str,
chat_history: List[Any],
user_id: str,
session_id: str,
additional_params: Optional[Dict[str, str]] = None) -> str:
return json.dumps({
'hello':'world'
})

def custom_output_payload_decoder(response: Dict[str, Any]) -> Any:
decoded_response = json.loads(
json.loads(
response['Payload'].read().decode('utf-8')
)['body'])['response']
return ConversationMessage(
role=ParticipantRole.ASSISTANT.value,
content=[{'text': decoded_response}]
)

if __name__ == "__main__":

# Initialize the orchestrator with some options
orchestrator = MultiAgentOrchestrator(options=OrchestratorConfig(
LOG_AGENT_CHAT=True,
LOG_CLASSIFIER_CHAT=True,
LOG_CLASSIFIER_RAW_OUTPUT=True,
LOG_CLASSIFIER_OUTPUT=True,
LOG_EXECUTION_TIMES=True,
MAX_RETRIES=3,
USE_DEFAULT_AGENT_IF_NONE_IDENTIFIED=True,
MAX_MESSAGE_PAIRS_PER_AGENT=10,
))

    # Add a tech agent
tech_agent = BedrockLLMAgent(BedrockLLMAgentOptions(
name="Tech Agent",
streaming=True,
description="Specializes in technology areas including software development, hardware, AI, \
cybersecurity, blockchain, cloud computing, emerging tech innovations, and pricing/costs \
related to technology products and services.",
model_id="anthropic.claude-3-sonnet-20240229-v1:0",
# callbacks=LLMAgentCallbacks()
))
orchestrator.add_agent(tech_agent)

    # Add a health agent
    health_agent = BedrockLLMAgent(BedrockLLMAgentOptions(
        name="Health Agent",
        streaming=False,
        description="Specializes in health and well-being.",
        model_id="anthropic.claude-3-sonnet-20240229-v1:0",
    ))
    orchestrator.add_agent(health_agent)

    # Add an Anthropic weather agent with a tool in Anthropic's tool format
# weather_agent = AnthropicAgent(AnthropicAgentOptions(
# api_key='api-key',
# name="Weather Agent",
# streaming=False,
    # description="Specialized agent for providing weather conditions for a given city.",
# tool_config={
# 'tool': [tool.to_claude_format() for tool in weather_tool.weather_tools.tools],
# 'toolMaxRecursions': 5,
# 'useToolHandler': weather_tool.anthropic_weather_tool_handler
# },
# callbacks=LLMAgentCallbacks()
# ))

# Add an Anthropic weather agent with Tools class
# weather_agent = AnthropicAgent(AnthropicAgentOptions(
# api_key='api-key',
# name="Weather Agent",
# streaming=True,
    # description="Specialized agent for providing weather conditions for a given city.",
# tool_config={
# 'tool': weather_tool.weather_tools,
# 'toolMaxRecursions': 5,
# },
# callbacks=LLMAgentCallbacks()
# ))

# Add a Bedrock weather agent with Tools class
# weather_agent = BedrockLLMAgent(BedrockLLMAgentOptions(
# name="Weather Agent",
# streaming=False,
    # description="Specialized agent for providing weather conditions for a given city.",
# tool_config={
# 'tool': weather_tool.weather_tools,
# 'toolMaxRecursions': 5,
# },
# callbacks=LLMAgentCallbacks(),
# ))

    # Add a Bedrock weather agent with a custom handler and Bedrock's tool format
weather_agent = BedrockLLMAgent(BedrockLLMAgentOptions(
name="Weather Agent",
streaming=False,
description="Specialized agent for giving weather condition from a city.",
tool_config={
'tool': [tool.to_bedrock_format() for tool in weather_tool.weather_tools.tools],
'toolMaxRecursions': 5,
'useToolHandler': weather_tool.bedrock_weather_tool_handler
}
))


weather_agent.set_system_prompt(weather_tool.weather_tool_prompt)
orchestrator.add_agent(weather_agent)

USER_ID = "user123"
SESSION_ID = str(uuid.uuid4())

print("Welcome to the interactive Multi-Agent system. Type 'quit' to exit.")

while True:
# Get user input
user_input = input("\nYou: ").strip()

if user_input.lower() == 'quit':
print("Exiting the program. Goodbye!")
sys.exit()

# Run the async function
asyncio.run(handle_request(orchestrator, user_input, USER_ID, SESSION_ID))
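
A side note on the API this example exercises: when an agent streams, the async iterator yields AgentStreamResponse chunks whose text holds each token, and the closing chunk carries the complete assistant message in final_message (see the agent.py changes below). A minimal sketch of a handler variant that also returns that final message (the helper name is invented for illustration):

async def handle_request_with_final(_orchestrator: MultiAgentOrchestrator, _user_input: str,
                                    _user_id: str, _session_id: str) -> Optional[ConversationMessage]:
    response: AgentResponse = await _orchestrator.route_request(_user_input, _user_id, _session_id, {}, True)
    final_message: Optional[ConversationMessage] = None
    if response.streaming:
        async for chunk in response.output:
            if isinstance(chunk, AgentStreamResponse):
                if chunk.text:
                    print(chunk.text, end='', flush=True)
                if chunk.final_message:
                    # The closing chunk carries the assembled ConversationMessage
                    final_message = chunk.final_message
    elif isinstance(response.output, ConversationMessage):
        final_message = response.output
    return final_message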
6 changes: 3 additions & 3 deletions python/src/multi_agent_orchestrator/agents/__init__.py
@@ -1,7 +1,7 @@
"""
Code for Agents.
"""
-from .agent import Agent, AgentOptions, AgentCallbacks, AgentProcessingResult, AgentResponse
+from .agent import Agent, AgentOptions, AgentCallbacks, AgentProcessingResult, AgentResponse, AgentStreamResponse


try:
@@ -32,16 +32,16 @@

from .supervisor_agent import SupervisorAgent, SupervisorAgentOptions


__all__ = [
'Agent',
'AgentOptions',
'AgentCallbacks',
'AgentProcessingResult',
'AgentResponse',
+    'AgentStreamResponse',
'SupervisorAgent',
'SupervisorAgentOptions'
-]
+]


if _AWS_AVAILABLE :
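
With AgentStreamResponse exported here, downstream code can import it alongside the existing response types, for example:

from multi_agent_orchestrator.agents import AgentResponse, AgentStreamResponse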
17 changes: 12 additions & 5 deletions python/src/multi_agent_orchestrator/agents/agent.py
@@ -1,4 +1,4 @@
-from typing import Dict, List, Union, AsyncIterable, Optional, Any
+from typing import Union, AsyncIterable, Optional, Any
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from multi_agent_orchestrator.types import ConversationMessage
@@ -12,13 +12,19 @@ class AgentProcessingResult:
agent_name: str
user_id: str
session_id: str
-    additional_params: Dict[str, Any] = field(default_factory=dict)
+    additional_params: dict[str, Any] = field(default_factory=dict)


+class AgentStreamResponse:
+    def __init__(self, text: str = '', final_message: Optional[ConversationMessage] = None):
+        self.text = text
+        self.final_message = final_message


@dataclass
class AgentResponse:
metadata: AgentProcessingResult
-    output: Union[Any, str]
+    output: Union[Any, str, AgentStreamResponse]
streaming: bool


@@ -72,9 +78,10 @@ async def process_request(
input_text: str,
user_id: str,
session_id: str,
-        chat_history: List[ConversationMessage],
-        additional_params: Optional[Dict[str, str]] = None,
+        chat_history: list[ConversationMessage],
+        additional_params: Optional[dict[str, str]] = None,
) -> Union[ConversationMessage, AsyncIterable[Any]]:

pass

def log_debug(self, class_name, message, data=None):
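
To make the contract these changes establish concrete: a streaming agent's process_request returns an AsyncIterable of AgentStreamResponse chunks, with text set on token chunks and final_message set on the closing chunk. A minimal sketch of a custom agent following that contract (the echo behavior is invented purely for illustration):

from multi_agent_orchestrator.agents import Agent, AgentStreamResponse
from multi_agent_orchestrator.types import ConversationMessage, ParticipantRole

class EchoStreamAgent(Agent):
    # Hypothetical agent that streams the user's input back word by word.
    async def process_request(self, input_text, user_id, session_id,
                              chat_history, additional_params=None):
        async def generate_chunks():
            completion = ''
            for word in input_text.split():
                token = word + ' '
                self.callbacks.on_llm_new_token(token)  # same callback hook the built-in agents use
                completion += token
                yield AgentStreamResponse(text=token)
            # Close the stream with the fully assembled message
            yield AgentStreamResponse(final_message=ConversationMessage(
                role=ParticipantRole.ASSISTANT.value,
                content=[{'text': completion.strip()}]))
        return generate_chunks()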
69 changes: 38 additions & 31 deletions python/src/multi_agent_orchestrator/agents/amazon_bedrock_agent.py
@@ -6,12 +6,12 @@
AWS Bedrock's agent runtime capabilities.
"""

-from typing import Dict, List, Optional, Any
+from typing import Any, Optional
from dataclasses import dataclass
import os
import boto3
from botocore.exceptions import BotoCoreError, ClientError
-from multi_agent_orchestrator.agents import Agent, AgentOptions
+from multi_agent_orchestrator.agents import Agent, AgentOptions, AgentStreamResponse
from multi_agent_orchestrator.types import ConversationMessage, ParticipantRole
from multi_agent_orchestrator.utils import Logger

@@ -31,9 +31,9 @@ class AmazonBedrockAgentOptions(AgentOptions):
region: Optional[str] = None
agent_id: str = None
agent_alias_id: str = None
-    client: Optional[Any] = None
-    streaming: Optional[bool] = False
-    enableTrace: Optional[bool] = False
+    client: Any | None = None
+    streaming: bool | None = False
+    enableTrace: bool | None = False


class AmazonBedrockAgent(Agent):
@@ -88,8 +88,8 @@ async def process_request(
input_text: str,
user_id: str,
session_id: str,
-        chat_history: List[ConversationMessage],
-        additional_params: Optional[Dict[str, str]] = None
+        chat_history: list[ConversationMessage],
+        additional_params: dict[str, str] | None = None
) -> ConversationMessage:
"""
Process a user request through the Bedrock agent runtime.
@@ -129,32 +129,39 @@ async def process_request(
streamingConfigurations=streamingConfigurations if self.streaming else {}
)

-            # Process response, handling both streaming and non-streaming modes
            completion = ""
-            for event in response['completion']:
-                if 'chunk' in event:
-                    # Process streaming chunk
-                    chunk = event['chunk']
-                    decoded_response = chunk['bytes'].decode('utf-8')
-
-                    # Trigger callback for each token (useful for real-time updates)
-                    self.callbacks.on_llm_new_token(decoded_response)
-                    completion += decoded_response
-
-                elif 'trace' in event:
-                    # Log trace events if tracing is enabled
-                    Logger.info(f"Received event: {event}") if self.enableTrace else None
-
-                else:
-                    # Ignore unrecognized event types
-                    pass
-
-            # Construct and return the conversation message
-            return ConversationMessage(
-                role=ParticipantRole.ASSISTANT.value,
-                content=[{"text": completion}]
-            )

+            if self.streaming:
+                async def generate_chunks():
+                    nonlocal completion
+                    for event in response['completion']:
+                        if 'chunk' in event:
+                            chunk = event['chunk']
+                            decoded_response = chunk['bytes'].decode('utf-8')
+                            self.callbacks.on_llm_new_token(decoded_response)
+                            completion += decoded_response
+                            yield AgentStreamResponse(text=decoded_response)
+                        elif 'trace' in event and self.enableTrace:
+                            Logger.info(f"Received event: {event}")
+                    yield AgentStreamResponse(
+                        final_message=ConversationMessage(
+                            role=ParticipantRole.ASSISTANT.value,
+                            content=[{'text': completion}]))
+                return generate_chunks()
+            else:
+                for event in response['completion']:
+                    if 'chunk' in event:
+                        chunk = event['chunk']
+                        decoded_response = chunk['bytes'].decode('utf-8')
+                        self.callbacks.on_llm_new_token(decoded_response)
+                        completion += decoded_response
+                    elif 'trace' in event and self.enableTrace:
+                        Logger.info(f"Received event: {event}")
+
+                return ConversationMessage(
+                    role=ParticipantRole.ASSISTANT.value,
+                    content=[{"text": completion}]
+                )
except (BotoCoreError, ClientError) as error:
# Comprehensive error logging and propagation
Logger.error(f"Error processing request: {str(error)}")
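
Taken together, the updated agent returns either a plain ConversationMessage (non-streaming) or an async generator of AgentStreamResponse chunks. A sketch of a direct caller consuming the streaming path, assuming valid Bedrock credentials (the agent IDs are placeholders):

import asyncio

async def main():
    agent = AmazonBedrockAgent(AmazonBedrockAgentOptions(
        name="Demo Agent",
        description="Wraps a Bedrock agent runtime for demo purposes.",
        agent_id="YOUR_AGENT_ID",        # placeholder
        agent_alias_id="YOUR_ALIAS_ID",  # placeholder
        streaming=True,
    ))
    result = await agent.process_request("Hello!", "user123", "session-1", [])
    async for chunk in result:
        if chunk.text:
            print(chunk.text, end='', flush=True)
        if chunk.final_message:
            print(f"\n\nFinal message: {chunk.final_message.content[0]['text']}")

asyncio.run(main())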