Autogen latency v2 #30
Conversation
…AgentOps-AI#479) This feature adds the Entelligence pop-up chat inside the agentops repository.
* make start_session non blocking
* fix build issue
* bump version
* callback for start session now that it is async
* fix callback
* unpin dependencies (AgentOps-AI#434) Co-authored-by: Howard Gil <[email protected]>
* bump version number
* wip
* change autogen getting run every time
* remove prints
* remove more prints
* suppress warnings
* exponential retry to close data
* removed event counter
* fix requests mock
* remove print
* fixed more tests
* removed inits from test_agent; does not require a client
* create requests fixture
* Scope fixtures
* black run
* remove bad files
* add spaces back
* revert session threading changes
* remove callback
* revert session changes
* revert session
* fix test
* update tests
* update tox to install local build instead of pypi
* replace http client with requests so requests_mock works properly
* fixed multiple sessions
* fix tool recorder
* removed test logs, fixed request count tests
* set fixture scopes
* Fixed missing async tests failing in tox, updated tox
* fixed missing pass in tests
* fixed timing
* created rc branch

---------

Co-authored-by: Shawn Qiu <[email protected]>
Co-authored-by: Howard Gil <[email protected]>
* Updated README and assets with new v2 screenshots
* updated font size on dashboard banner
Add Hover Badges and Update Badge Style for Social Links
…#382)

* add initial files for support
* working sync client
* stream not working
* updated examples notebook for current testing
* fix for `delta.content` and cleanup
* cleanup again
* cleanup and add tool event
* structure examples notebook
* add contextual answers tracking
* cleanup example notebook
* create testing file
* clean example notebook again
* rename examples directory
* updated docs page
* wrap `chunk.choices[0].delta.content` in `str(...)`
* update doc

---------

Co-authored-by: reibs <[email protected]>
…Ops-AI#374)

* add mistral support
* linting
* fix typo
* add tests
* add examples notebook
* linting
* fix langchain typo in pyproject.toml (updated to 0.2.14)
* fix mistralai import and `undo_override` function
* add mistral to readme
* fix typo
* modified self.llm_event to llm_event
* refactoring
* black
* rename examples directory
* fix merge
* init merge
* updated model name so that tokencost will recognize this as a mistral model
* black lint

---------

Co-authored-by: reibs <[email protected]>
Walkthrough: This update enhances the AgentOps project by integrating AI21 and Mistral providers, adding new examples and documentation for these integrations. The README has been updated with badge-style links for better visual appeal. Several new images have been added to the documentation, and existing ones have been updated. The codebase has been refactored to improve session management, error handling, and logging. The update also includes new test cases and adjustments to existing ones to ensure compatibility with the latest changes. The version has been incremented to 0.3.15rc1.
Files selected (24)
Files ignored (18)
Attributes:
init_timestamp (float): The timestamp for when the session started, represented as seconds since the epoch.
end_timestamp (float, optional): The timestamp for when the session ended, represented as seconds since the epoch. This is only set after end_session is called.
- end_state (str, optional): The final state of the session. Suggested: "Success", "Fail", "Indeterminate"
+ end_state (str, optional): The final state of the session. Suggested: "Success", "Fail", "Indeterminate". Defaults to "Indeterminate".
end_state_reason (str, optional): The reason for ending the session.

"""
💻 Code Consistency
Ensure Consistent Use of Default end_state
The docstring now states that `end_state` defaults to "Indeterminate". Ensure that this change is applied consistently across the codebase, especially in places where `end_state` is checked or used, to prevent logic discrepancies.
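As a rough illustration of the kind of check that could drift, here is a hypothetical helper (not part of the AgentOps codebase) that previously might have relied on `end_state is None`; with the new default it would need to key off `end_timestamp`, which the diff below still initializes to `None` until `end_session` is called.

```python
# Hypothetical consumer check, for illustration only.
def is_session_open(session) -> bool:
    # Before this change, `end_state is None` doubled as "session still running".
    # With end_state defaulting to "Indeterminate", that test is always False,
    # so rely on end_timestamp, which stays None until end_session() runs.
    return session.end_timestamp is None
```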
host_env: Optional[dict] = None,
):
self.end_timestamp = None
- self.end_state: Optional[str] = None
+ self.end_state: Optional[str] = "Indeterminate"
self.session_id = session_id
self.init_timestamp = get_ISO_time()
self.tags: List[str] = tags or []
ℹ️ Logic Error
Review Default Value for end_state
The default value for `end_state` is set to "Indeterminate" in the constructor. Ensure that this default aligns with the intended logic of the application. If the session's end state should be set explicitly by application logic, consider initializing it to `None` and setting it only when the session ends.
Commitable Code Suggestion:

- self.end_state: Optional[str] = "Indeterminate"
+ self.end_state: Optional[str] = None
return response

def override(self):
self._override_completion()
self._override_completion_async()
self._override_answer()
self._override_answer_async()

def _override_completion(self):
from ai21.clients.studio.resources.chat import ChatCompletions

global original_create
original_create = ChatCompletions.create

def patched_function(*args, **kwargs):
# Call the original function with its original arguments
init_timestamp = get_ISO_time()
session = kwargs.get("session", None)
if "session" in kwargs.keys():
del kwargs["session"]
result = original_create(*args, **kwargs)
return self.handle_response(result, kwargs, init_timestamp, session=session)

# Override the original method with the patched one
ChatCompletions.create = patched_function

def _override_completion_async(self):
from ai21.clients.studio.resources.chat import AsyncChatCompletions

global original_create_async
original_create_async = AsyncChatCompletions.create

async def patched_function(*args, **kwargs):
# Call the original function with its original arguments
init_timestamp = get_ISO_time()
session = kwargs.get("session", None)
if "session" in kwargs.keys():
del kwargs["session"]
result = await original_create_async(*args, **kwargs)
return self.handle_response(result, kwargs, init_timestamp, session=session)

# Override the original method with the patched one
AsyncChatCompletions.create = patched_function

def _override_answer(self):
from ai21.clients.studio.resources.studio_answer import StudioAnswer

global original_answer
original_answer = StudioAnswer.create

def patched_function(*args, **kwargs):
# Call the original function with its original arguments
init_timestamp = get_ISO_time()

session = kwargs.get("session", None)
if "session" in kwargs.keys():
del kwargs["session"]
result = original_answer(*args, **kwargs)
return self.handle_response(result, kwargs, init_timestamp, session=session)

StudioAnswer.create = patched_function

def _override_answer_async(self):
from ai21.clients.studio.resources.studio_answer import AsyncStudioAnswer

global original_answer_async
original_answer_async = AsyncStudioAnswer.create

async def patched_function(*args, **kwargs):
# Call the original function with its original arguments
init_timestamp = get_ISO_time()

session = kwargs.get("session", None)
if "session" in kwargs.keys():
del kwargs["session"]
result = await original_answer_async(*args, **kwargs)
return self.handle_response(result, kwargs, init_timestamp, session=session)

AsyncStudioAnswer.create = patched_function

def undo_override(self):
if (
self.original_create is not None
and self.original_create_async is not None
and self.original_answer is not None
and self.original_answer_async is not None
):
from ai21.clients.studio.resources.chat import (
ChatCompletions,
AsyncChatCompletions,
)
from ai21.clients.studio.resources.studio_answer import (
StudioAnswer,
AsyncStudioAnswer,
)

ChatCompletions.create = self.original_create
AsyncChatCompletions.create = self.original_create_async
StudioAnswer.create = self.original_answer
AsyncStudioAnswer.create = self.original_answer_async
ℹ️ Logic Error
Incorrect Use of `getattr` for Tool Calls
The `getattr` function is incorrectly used to access `choice.delta.tool_calls`. The current implementation will always return `None` because the first argument should be an object, not a string. This could lead to missing tool-call events in the accumulated delta.
- if getattr("choice.delta", "tool_calls", None):
+ if getattr(choice.delta, "tool_calls", None):
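A quick demonstration of why the original expression can never be truthy: `getattr` inspects its first argument, so passing the string `"choice.delta"` checks the string object itself rather than the delta.

```python
# getattr looks up the attribute on its first argument. A string literal has
# no "tool_calls" attribute, so the default (None) is always returned.
print(getattr("choice.delta", "tool_calls", None))  # -> None, for every chunk

# Passing the delta object itself inspects the actual response:
# getattr(choice.delta, "tool_calls", None)  # -> the tool calls, or None
```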
🔒 Security Suggestion
Sanitize Sensitive Data Before Logging
The current implementation logs potentially sensitive information, such as `kwargs` and `chunk`, in the event of an error. Consider sanitizing or redacting sensitive data before logging it.
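A minimal sketch of one way to do this, assuming a simple key-name denylist; the helper and key names are illustrative, not existing AgentOps code.

```python
import pprint

SENSITIVE_KEYS = {"api_key", "authorization", "token", "password"}

def redact_kwargs(kwargs: dict) -> str:
    """Return a pretty-printed copy of kwargs with sensitive values masked."""
    safe = {
        k: ("***REDACTED***" if k.lower() in SENSITIVE_KEYS else v)
        for k, v in kwargs.items()
    }
    return pprint.pformat(safe)

# logger.warning(f"Unable to parse a chunk for LLM call.\nkwargs:\n {redact_kwargs(kwargs)}")
```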
header=None,
) -> Response:
result = Response()
try:
# Create request session with retries configured
request_session = requests.Session()
request_session.mount(url, HTTPAdapter(max_retries=retry_config))

if api_key is not None:
JSON_HEADER["X-Agentops-Api-Key"] = api_key
session = cls.get_session()

if parent_key is not None:
JSON_HEADER["X-Agentops-Parent-Key"] = parent_key

if jwt is not None:
JSON_HEADER["Authorization"] = f"Bearer {jwt}"

res = request_session.post(
url, data=payload, headers=JSON_HEADER, timeout=20
)
# Update headers for this request
headers = dict(session.headers)
if api_key:
headers["X-Agentops-Api-Key"] = api_key
if parent_key:
headers["X-Agentops-Parent-Key"] = parent_key
if jwt:
headers["Authorization"] = f"Bearer {jwt}"

try:
res = session.post(url, data=payload, headers=headers, timeout=20)
result.parse(res)

except requests.exceptions.Timeout:
result.code = 408
result.status = HttpStatus.TIMEOUT
raise ApiServerException(
"Could not reach API server - connection timed out"
)

except requests.exceptions.HTTPError as e:
try:
result.parse(e.response)
def get_installed_packages():

try:
return {
# TODO: test
# TODO: add to opt out
"Installed Packages": {
- dist.metadata["Name"]: dist.version
+ dist.metadata.get("Name"): dist.metadata.get("Version")
for dist in importlib.metadata.distributions()
}
}
ℹ️ Logic Error
Handle None Values in Metadata
The change from `dist.metadata["Name"]` to `dist.metadata.get("Name")`, and similarly for `Version`, is good practice to avoid a `KeyError` when metadata is missing. However, it introduces a potential issue: `None` values could end up in the dictionary when metadata is absent, which could lead to unexpected behavior when processing the installed-packages list.
def get_installed_packages():
try:
return {
"Installed Packages": {
- dist.metadata.get("Name"): dist.metadata.get("Version")
+ name: version
+ for dist in importlib.metadata.distributions()
+ if (name := dist.metadata.get("Name")) is not None
+ and (version := dist.metadata.get("Version")) is not None
}
}
Streaming

```python python
import os

from mistralai import Mistral
import agentops

# Beginning of program's code (i.e. main.py, __init__.py)
agentops.init(<INSERT YOUR API KEY HERE>)

client = Mistral(
    # This is the default and can be omitted
    api_key=os.environ.get("MISTRAL_API_KEY"),
)

message = client.chat.stream(
    messages=[
        {
            "role": "user",
            "content": "Tell me something cool about streaming agents",
        }
    ],
    model="open-mistral-nemo",
)

response = ""
for event in message:
    if event.data.choices[0].finish_reason == "stop":
        print("\n")
        print(response)
        print("\n")
    else:
        response += event.text

agentops.end_session('Success')
```

Async

```python python
import asyncio
import os

from mistralai import Mistral

client = Mistral(
    # This is the default and can be omitted
    api_key=os.environ.get("MISTRAL_API_KEY"),
)


async def main() -> None:
    message = await client.chat.complete_async(
        messages=[
            {
                "role": "user",
                "content": "Tell me something interesting about async agents",
            }
        ],
        model="open-mistral-nemo",
    )
    print(message.choices[0].message.content)


asyncio.run(main())
```

Async Streaming

```python python
import asyncio
import os

from mistralai import Mistral

client = Mistral(
    # This is the default and can be omitted
    api_key=os.environ.get("MISTRAL_API_KEY"),
)


async def main() -> None:
    message = await client.chat.stream_async(
        messages=[
            {
                "role": "user",
                "content": "Tell me something interesting about async streaming agents",
            }
        ],
        model="open-mistral-nemo",
    )

    response = ""
    async for event in message:
        if event.data.choices[0].finish_reason == "stop":
            print("\n")
            print(response)
            print("\n")
        else:
            response += event.text


asyncio.run(main())
```
ℹ️ Performance Improvement
Optimize HTTP Requests with Class-Level Session
The introduction of a class-level session object for HTTP requests in `agentops/http_client.py` is a significant improvement. This change reduces the overhead of creating a new session for each request, enhancing performance and reliability. Ensure that the session is properly closed when no longer needed to prevent resource leaks.
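One possible shape for that cleanup, sketched under the assumption that the shared session only needs to be released at process exit; `close_session` is a suggested addition, not an existing AgentOps method.

```python
import atexit
from typing import Optional

import requests

class HttpClient:
    _session: Optional[requests.Session] = None

    @classmethod
    def close_session(cls) -> None:
        # Release pooled connections held by the shared session, if any.
        if cls._session is not None:
            cls._session.close()
            cls._session = None

# Run the cleanup automatically when the interpreter shuts down.
atexit.register(HttpClient.close_session)
```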
🔒 Security Suggestion
Secure Handling of Sensitive Information
In `agentops/init.py` and `agentops/client.py`, ensure that sensitive information such as API keys is not logged. Consider using environment variables or secure vaults to manage sensitive data securely.
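For example, a minimal pattern (environment-variable name assumed) that keeps the key out of source and out of log output:

```python
import os

import agentops

# Read the key from the environment rather than hard-coding it.
api_key = os.environ.get("AGENTOPS_API_KEY")
agentops.init(api_key=api_key)

# Log presence, never the value itself:
# logger.debug("AgentOps initialized (API key %s)", "set" if api_key else "missing")
```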
ℹ️ Logic Error
Clarify Default Session End State
In `agentops/session.py`, setting the default session end state to "Indeterminate" could lead to confusion if not properly documented or handled. Ensure that the logic for determining the session end state is robust and clearly documented to avoid misinterpretation.
result.status = Response.get_status(e.response.status_code)
result.body = {"error": str(e)}
raise ApiServerException(f"HTTPError: {e}")

except requests.exceptions.RequestException as e:
result.body = {"error": str(e)}
raise ApiServerException(f"RequestException: {e}")

# Handle error status codes
if result.code == 401:
raise ApiServerException(
f"API server: invalid API key: {api_key}. Find your API key at https://app.agentops.ai/settings/projects"
ℹ️ Error Handling
Comprehensive Error Handling for HTTP Requests
The refactored error handling in the `post` method now includes a catch-all for `requests.exceptions.RequestException`, which is a good practice for handling unexpected request errors. Ensure that the error messages logged are informative but do not expose sensitive information.
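For instance, the 401 branch above interpolates the full `api_key` into the exception text; a small masking helper (illustrative only, not existing code) keeps the message actionable without disclosing the key:

```python
def mask_key(api_key: str) -> str:
    """Show just enough of the key to identify it in logs."""
    if not api_key or len(api_key) < 8:
        return "****"
    return f"{api_key[:4]}...{api_key[-4:]}"

# raise ApiServerException(
#     f"API server: invalid API key: {mask_key(api_key)}. "
#     "Find your API key at https://app.agentops.ai/settings/projects"
# )
```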
class HttpClient:
_session = None  # Class-level session object

@classmethod
def get_session(cls) -> requests.Session:
if cls._session is None:
cls._session = requests.Session()
# Configure session defaults
adapter = HTTPAdapter(
max_retries=retry_config,
pool_connections=1,  # Assuming api.agentops.ai is the only host
pool_maxsize=100,  # Maximum number of connections to save in the pool
)
cls._session.mount("http://", adapter)
cls._session.mount("https://", adapter)
cls._session.headers.update(
{
"Content-Type": "application/json; charset=UTF-8",
"Accept": "*/*",
"User-Agent": "AgentOps-Client",
}
)
return cls._session

@staticmethod
@classmethod
def post(
cls,
url: str,
payload: bytes,
api_key: Optional[str] = None,
ℹ️ Performance Improvement
Class-level Session Object for Improved Performance
The introduction of a class-level session object in `HttpClient` improves performance by reusing the same session for multiple requests, reducing the overhead of creating a new session each time. This change is beneficial for applications making frequent HTTP requests.
return result

@staticmethod
@classmethod
def get(
cls,
url: str,
api_key: Optional[str] = None,
jwt: Optional[str] = None,
header=None,
) -> Response:
result = Response()
try:
# Create request session with retries configured
request_session = requests.Session()
request_session.mount(url, HTTPAdapter(max_retries=retry_config))

if api_key is not None:
JSON_HEADER["X-Agentops-Api-Key"] = api_key
session = cls.get_session()

if jwt is not None:
JSON_HEADER["Authorization"] = f"Bearer {jwt}"

res = request_session.get(url, headers=JSON_HEADER, timeout=20)
# Update headers for this request
headers = dict(session.headers)
if api_key:
headers["X-Agentops-Api-Key"] = api_key
if jwt:
headers["Authorization"] = f"Bearer {jwt}"

try:
res = session.get(url, headers=headers, timeout=20)
result.parse(res)

except requests.exceptions.Timeout:
result.code = 408
result.status = HttpStatus.TIMEOUT
raise ApiServerException(
"Could not reach API server - connection timed out"
)

except requests.exceptions.HTTPError as e:
try:
result.parse(e.response)
ℹ️ Logic Error
Ensure Correct Header Management in Class-level Session
The `get` method now uses a class-level session object, which is a positive change. However, ensure that the session headers are correctly updated for each request to avoid unintended header persistence across different requests.
📥 Pull Request
📘 Description
Briefly describe the changes you've made.
🧪 Testing
Describe the tests you performed to validate your changes.