Copy of PR #385 - Dead letter queue #13
🔍 Code Review Summary
❗ Attention Required: This push has potential issues.
🚨 Overview
🚨 Critical Issues
best_practices (1 issue)
1. Use of environment variables for configuration.
📁 File: agentops/config.py
💡 Solution:
Current Code: self.api_key: Optional[str] = None
Suggested Code: self.api_key: Optional[str] = os.getenv('API_KEY')
Consider implementing the following changes to improve the code.
@singleton
class Configuration:
    def __init__(self):
Comment: Use of environment variables for configuration.
Solution: Consider using a library like python-dotenv to manage environment variables more securely.
!! Make sure the following suggestion is correct before committing it !!
    def __init__(self):
        self.api_key: Optional[str] = os.getenv('API_KEY')
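The suggestion above can be sketched roughly as follows. This is a minimal illustration, not the PR's code: the `load_api_key` helper and its parameters are hypothetical, and the variable name `API_KEY` is taken from the bot's suggestion rather than from the project's documented configuration.

```python
import os
from typing import Optional


def load_api_key(env_var: str = "API_KEY", explicit: Optional[str] = None) -> Optional[str]:
    """Return an explicitly passed key, else the environment value, else None."""
    if explicit:
        return explicit
    value = os.getenv(env_var)
    # Treat an empty or whitespace-only variable the same as an unset one.
    if value is None or not value.strip():
        return None
    return value.strip()
```

An explicit argument takes precedence, so code that already passes `api_key=...` keeps working when the environment variable is also set.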
Summary
Walkthrough
This update enhances the AgentOps project by introducing a dead letter queue (DLQ) mechanism to handle failed HTTP requests, ensuring they are retried. It refactors the HttpClient class to support JWT token reauthorization and improves error handling. The update also includes new tests for the DLQ functionality and modifies existing tests to accommodate the changes. Additionally, environment variables are set for testing, and the .gitignore file is updated to exclude new directories and files.
Files selected (7)
Files ignored (7)
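For readers unfamiliar with the pattern, the dead letter queue described in the walkthrough could look roughly like the sketch below. It is an assumption-laden illustration: the `DeadLetterQueue` class and its `retry_all` method are hypothetical, and only the `add` / `get_all` names and the `{"messages": [...]}` file layout come from the code quoted in this review.

```python
import json
import os
import tempfile
from typing import Callable, Dict, List


class DeadLetterQueue:
    """File-backed queue of failed requests, stored as {"messages": [...]}."""

    def __init__(self, file_path: str):
        self.file_path = file_path
        if not os.path.isfile(file_path):
            self._write([])

    def _write(self, messages: List[Dict]) -> None:
        with open(self.file_path, "w") as f:
            json.dump({"messages": messages}, f)

    def get_all(self) -> List[Dict]:
        with open(self.file_path) as f:
            return json.load(f)["messages"]

    def add(self, request: Dict) -> None:
        self._write(self.get_all() + [request])

    def retry_all(self, send: Callable[[Dict], bool]) -> None:
        """Re-send every queued request; keep only those that fail again."""
        still_failing = [req for req in self.get_all() if not send(req)]
        self._write(still_failing)


# Usage: the first request succeeds on retry, the second keeps failing.
path = os.path.join(tempfile.mkdtemp(), "dead_letter_queue.json")
dlq = DeadLetterQueue(path)
dlq.add({"url": "http://localhost:8000/v2/create_events", "error_type": "Timeout"})
dlq.add({"url": "http://localhost:8000/v2/update_session", "error_type": "HTTPError"})
dlq.retry_all(lambda req: req["error_type"] == "Timeout")
remaining = dlq.get_all()
```

The `send` callback returns `True` on success, so a request leaves the queue only once it has actually been delivered.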
     res = HttpClient.post(
         f"{self.config.endpoint}/v2/update_session",
         json.dumps(filter_unjsonable(payload)).encode("utf-8"),
-        jwt=self.jwt,
+        token=self.jwt,
     )
 except ApiServerException as e:
     return logger.error(f"Could not update session - {e}")
💻 Code maintainability 🔴
Ensure Consistency in Parameter Naming
The change from jwt to token should be applied consistently across all relevant methods. Update every part of the codebase that calls these methods, since a caller still passing the old keyword would fail at runtime.
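One way to make such a rename safer is a temporary compatibility shim. The sketch below is hypothetical and is not the PR's `HttpClient.post`: it is a stand-in function that only builds headers, shown to illustrate accepting the old `jwt=` keyword with a deprecation warning while rejecting unknown keywords.

```python
import warnings
from typing import Dict, Optional


def post(url: str, payload: bytes, token: Optional[str] = None, **legacy) -> Dict[str, str]:
    """Stand-in for an HTTP client call; returns the headers it would send."""
    legacy_jwt = legacy.pop("jwt", None)
    if legacy_jwt is not None:
        warnings.warn("`jwt` is deprecated; pass `token` instead",
                      DeprecationWarning, stacklevel=2)
        if token is None:
            token = legacy_jwt
    if legacy:
        # Any other unknown keyword is still an error, as with a normal signature.
        raise TypeError(f"unexpected keyword arguments: {sorted(legacy)}")
    headers: Dict[str, str] = {}
    if token is not None:
        headers["Authorization"] = f"Bearer {token}"
    return headers
```

Without a shim like this, a call site that still passes `jwt=...` raises `TypeError` as soon as the parameter is removed from the signature.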
 from uuid import UUID

 from .log_config import logger
+from .singleton import singleton


+@singleton
 class Configuration:
     def __init__(self):
         self.api_key: Optional[str] = None
💻 Code maintainability 🔴
Thread Safety of Singleton Implementation
The introduction of the singleton pattern to the Configuration class is a good design choice for ensuring a single instance. However, ensure that the singleton implementation is thread-safe to prevent issues in multi-threaded environments.
+@singleton
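The `.singleton` module is not shown in this diff, so its actual implementation is unknown. As an illustration of what "thread-safe" could mean here, the following hypothetical decorator uses double-checked locking with a `threading.Lock`; the `Configuration` class below is a reduced stand-in.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def singleton(cls):
    """Class decorator that returns the same instance on every call."""
    instances = {}
    lock = threading.Lock()

    def get_instance(*args, **kwargs):
        if cls not in instances:          # fast path: no lock once created
            with lock:
                if cls not in instances:  # re-check while holding the lock
                    instances[cls] = cls(*args, **kwargs)
        return instances[cls]

    return get_instance


@singleton
class Configuration:
    def __init__(self):
        self.api_key = None


# Construct the configuration from many threads and collect the object ids.
with ThreadPoolExecutor(max_workers=8) as pool:
    ids = set(pool.map(lambda _: id(Configuration()), range(100)))
```

The second check inside the lock is what prevents two threads that both passed the first check from each constructing an instance.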
     res = HttpClient.post(
         f"{self.config.endpoint}/v2/update_session",
         json.dumps(filter_unjsonable(payload)).encode("utf-8"),
-        jwt=self.jwt,
+        token=self.jwt,
     )
 except ApiServerException as e:
     return logger.error(f"Could not end session - {e}")
🔒 Security Suggestion 🔴
Clarify Token Usage
Changing the parameter name from jwt to token in the HTTP client calls may lead to confusion if the token is still a JWT. Ensure that the token being passed is indeed a JWT and that this change does not affect any existing functionality that relies on the jwt parameter.
     if parent_key is not None:
         JSON_HEADER["X-Agentops-Parent-Key"] = parent_key

-    if jwt is not None:
-        JSON_HEADER["Authorization"] = f"Bearer {jwt}"
+    if token is not None:
+        decoded_jwt = jwt.decode(
+            token,
+            algorithms=["HS256"],
+            options={"verify_signature": False},
+        )
+
+        # if token is expired, reauth
+        if datetime.fromtimestamp(decoded_jwt["exp"]) < datetime.now():
+            new_jwt = reauthorize_jwt(
+                token,
+                api_key,
+                decoded_jwt["session_id"],
+            )
+            token = new_jwt
+
+        JSON_HEADER["Authorization"] = f"Bearer {token}"

     res = request_session.post(
         url, data=payload, headers=JSON_HEADER, timeout=20
     )

     result.parse(res)
-except requests.exceptions.Timeout:
-    result.code = 408
-    result.status = HttpStatus.TIMEOUT
-    raise ApiServerException(
-        "Could not reach API server - connection timed out"
-    )
+
+    if result.code == 200:
+        HttpClient._retry_dlq_requests()
+
+except (requests.exceptions.Timeout, requests.exceptions.HTTPError) as e:
+    HttpClient._handle_failed_request(
+        url, payload, api_key, parent_key, token, type(e).__name__
+    )
-except requests.exceptions.HTTPError as e:
     try:
         result.parse(e.response)
     except Exception:
         result = Response()
         result.code = e.response.status_code
         result.status = Response.get_status(e.response.status_code)
         result.body = {"error": str(e)}
-        raise ApiServerException(f"HTTPError: {e}")
+        raise ApiServerException(f"{type(e).__name__}: {e}")
 except requests.exceptions.RequestException as e:
     result.body = {"error": str(e)}
+    HttpClient._handle_failed_request(
+        url, payload, api_key, parent_key, token, "RequestException"
+    )
     raise ApiServerException(f"RequestException: {e}")

 if result.code == 401:
     if result.body.get("message") == "Expired Token":
         raise ApiServerException(f"API server: jwt token expired.")
     raise ApiServerException(
         f"API server: invalid API key: {api_key}. Find your API key at https://app.agentops.ai/settings/projects"
     )
🔒 Security vulnerability 🔴
JWT Signature Verification Missing
The JWT token is decoded without verifying the signature, which poses a security risk. Always verify the JWT signature to ensure its authenticity before using the token.
+    decoded_jwt = jwt.decode(
+        token,
+        algorithms=["HS256"],
+        options={"verify_signature": True},
+    )
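A note of caution on the suggestion above: verifying an HS256 signature requires the shared secret, and the suggested call passes no key, so simply flipping the flag would likely fail on a client that does not hold the secret. In the diff, the unverified decode is only used to read `exp` before deciding whether to reauthorize; the server presumably still validates the token it receives. The standard-library sketch below illustrates that narrow use. The helper names are hypothetical, and the claims it returns must not be trusted for authorization decisions.

```python
import base64
import json
import time
from typing import Optional


def unverified_claims(token: str) -> dict:
    """Decode the payload segment of a JWT WITHOUT verifying its signature."""
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("not a JWT: expected header.payload.signature")
    payload_b64 = parts[1]
    padded = payload_b64 + "=" * (-len(payload_b64) % 4)  # restore stripped padding
    return json.loads(base64.urlsafe_b64decode(padded))


def is_expired(token: str, now: Optional[float] = None) -> bool:
    """True if the token's `exp` claim (seconds since the epoch) is in the past."""
    now = time.time() if now is None else now
    return unverified_claims(token)["exp"] < now


def _b64(obj: dict) -> str:
    raw = json.dumps(obj).encode("utf-8")
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


# Build a demo token; the signature segment is a placeholder, not a real signature.
demo = ".".join([
    _b64({"alg": "HS256", "typ": "JWT"}),
    _b64({"exp": 1000, "session_id": "s-1"}),
    "sig",
])
```

Passing `now` explicitly keeps the expiry check deterministic in tests.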
📜 Guidelines:
Always verify JWT signatures to ensure authenticity.
 session_1 = agentops.start_session(tags=["multi-session-test-1"])
 session_2 = agentops.start_session(tags=["multi-session-test-2"])

-print("session_id_1: {}".format(session_1))
-print("session_id_2: {}".format(session_2))
+print("session_id_1: {}".format(session_1.session_id))
+print("session_id_2: {}".format(session_2.session_id))

 messages = [{"role": "user", "content": "Hello"}]
💻 Code maintainability 🔴
Ensure session_id Attribute Presence
The print statements have been updated to access the session_id attribute directly from the session objects. Ensure that the session_id attribute is always present and correctly initialized in the session objects to avoid potential attribute errors.
-print("session_id_1: {}".format(session_1))
+print("session_id_1: {}".format(session_1.session_id))
 class HttpClient:

     @staticmethod
     def post(
         url: str,
         payload: bytes,
         api_key: Optional[str] = None,
         parent_key: Optional[str] = None,
-        jwt: Optional[str] = None,
         header=None,
+        token: Optional[str] = None,
     ) -> Response:
         result = Response()
         try:
💻 Code maintainability 🟡
Consolidate Exception Handling
The error handling in the post method could be improved by consolidating the exception handling for requests.exceptions.Timeout and requests.exceptions.HTTPError. This will reduce code duplication and improve readability.
- except (requests.exceptions.Timeout, requests.exceptions.HTTPError) as e:
- HttpClient._handle_failed_request(
- url, payload, api_key, parent_key, token, type(e).__name__
- )
+ except requests.exceptions.RequestException as e:
+ HttpClient._handle_failed_request(
+ url, payload, api_key, parent_key, token, type(e).__name__
+ )
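The consolidation works because, in the `requests` library, `Timeout` and `HTTPError` are both subclasses of `RequestException`. The sketch below mirrors that hierarchy with stand-in classes so it runs without `requests` installed; the `post` function and the `failed` list are hypothetical simplifications of the PR's client and dead letter queue.

```python
class RequestException(IOError):
    """Stand-in for requests.exceptions.RequestException."""


class Timeout(RequestException):
    """Stand-in for requests.exceptions.Timeout."""


class HTTPError(RequestException):
    """Stand-in for requests.exceptions.HTTPError."""


failed = []  # records (url, error_type) in place of the dead letter queue


def post(url, send):
    """Call send(url); on any request error, record it once and re-raise."""
    try:
        return send(url)
    except RequestException as e:
        # type(e).__name__ preserves the concrete error type even though
        # a single handler covers the whole hierarchy.
        failed.append((url, type(e).__name__))
        raise


def _raising(exc):
    def send(url):
        raise exc
    return send


for exc in (Timeout("slow"), HTTPError("500"), RequestException("other")):
    try:
        post("http://localhost:8000/v2/create_events", _raising(exc))
    except RequestException:
        pass
```

One trade-off of a single broad handler is that every request failure is treated the same way; if timeouts and HTTP errors ever need different handling, separate clauses are still required, ordered from most to least specific.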
📜 Guidelines:
Consolidate similar exception handling to improve code readability.
         return func(self, *args, **kwargs)

     return wrapper


+def ensure_dead_letter_queue():
+    # Define file path
+    file_path = os.path.join(".agentops", "dead_letter_queue.json")
+
+    # Check if directory exists
+    if not os.path.exists(".agentops"):
+        os.makedirs(".agentops")
+
+    # Check if file exists
+    if not os.path.isfile(file_path):
+        with open(file_path, "w") as f:
+            json.dump({"messages": []}, f)
+
+    return file_path
⚠️ Critical Issue 🔴
Lack of Error Handling in DLQ Creation
The ensure_dead_letter_queue function creates a directory and a file without handling potential exceptions. If the directory creation fails due to permission issues or other IO errors, it could lead to unhandled exceptions in the application. Consider adding error handling to manage these scenarios gracefully.
+    try:
+        if not os.path.exists('.agentops'):
+            os.makedirs('.agentops')
+    except OSError as e:
+        logger.error(f'Error creating directory: {e}')
+        raise
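A slightly fuller version of this idea is sketched below, under stated assumptions: the `base_dir` parameter is added here for testability (the PR hard-codes `.agentops`), and the logger is a plain `logging` logger rather than the project's. It also replaces the check-then-create pattern with `exist_ok=True` and the exclusive-create file mode `"x"`.

```python
import json
import logging
import os
import tempfile

logger = logging.getLogger(__name__)


def ensure_dead_letter_queue(base_dir: str = ".agentops") -> str:
    """Create the DLQ directory and file if missing and return the file path."""
    file_path = os.path.join(base_dir, "dead_letter_queue.json")
    try:
        # exist_ok=True avoids the race between os.path.exists and os.makedirs.
        os.makedirs(base_dir, exist_ok=True)
    except OSError as e:
        logger.error(f"Error creating directory {base_dir}: {e}")
        raise
    try:
        # Mode "x" fails if the file exists, so an existing queue is never truncated.
        with open(file_path, "x") as f:
            json.dump({"messages": []}, f)
    except FileExistsError:
        pass  # the queue file is already there; keep its contents
    except OSError as e:
        logger.error(f"Error creating queue file {file_path}: {e}")
        raise
    return file_path


demo_dir = os.path.join(tempfile.mkdtemp(), ".agentops")
path = ensure_dead_letter_queue(demo_dir)
```

Calling the function a second time is a no-op, which matters because it may run on every client start.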
🔒 Security Suggestion 🔴
Potential Security Risk in JSON File Creation
The function ensure_dead_letter_queue creates a JSON file without validating the contents. If the application is compromised, an attacker could potentially manipulate the file. Consider implementing validation for the contents of the JSON file before writing to it.
+        # Validate the content before writing
+        content = {'messages': []}
+        if not isinstance(content, dict):
+            raise ValueError('Invalid content for dead letter queue')
+        json.dump(content, f)
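Since the content written at creation time is a fixed literal, validation arguably matters more when the file is read back, because that is when a tampered or corrupted file would be consumed. The following sketch shows one possible read-side check; `load_dlq_messages` is a hypothetical helper, and the rule that each entry must be a dict with a string `url` is an assumption based on the request shape used in the PR's test script.

```python
import json
import os
import tempfile
from typing import Any, Dict, List


def load_dlq_messages(file_path: str) -> List[Dict[str, Any]]:
    """Read the queue file; fall back to an empty queue if the content is malformed."""
    try:
        with open(file_path) as f:
            data = json.load(f)
    except (OSError, json.JSONDecodeError):
        return []
    messages = data.get("messages") if isinstance(data, dict) else None
    if not isinstance(messages, list):
        return []
    # Keep only entries with the expected shape: a dict that names a URL.
    return [m for m in messages if isinstance(m, dict) and isinstance(m.get("url"), str)]
```

Silently dropping malformed entries is a design choice; logging them or moving the file aside may be preferable when lost requests need to be investigated.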
### Purpose
# test an edge case where a request is retried after the jwt has expired
import time
from datetime import datetime

### SETUP
# Run the API server locally
# In utils.py -> generate_jwt -> set the jwt expiration to 0.001
# Run this script

### Plan
# The first request should succeed and return a JWT
# We'll manually add a failed request to the DLQ with the expired JWT
# When reattempting, the http_client should identify the expired jwt and reauthorize it before sending again

import agentops
from agentops import ActionEvent
from agentops.helpers import safe_serialize, get_ISO_time
from agentops.http_client import dead_letter_queue, HttpClient

api_key = "492f0ee6-0b7d-40a6-af86-22d89c7c5eea"
agentops.init(
    endpoint="http://localhost:8000",
    api_key=api_key,
    auto_start_session=False,
    default_tags=["dead-letter-queue-test"],
)

# create session
session = agentops.start_session()

# add failed request to DLQ
event = ActionEvent()
event.end_timestamp = get_ISO_time()

failed_request = {
    "url": "http://localhost:8000/v2/create_events",
    "payload": {"events": [event.__dict__]},
    "api_key": str(api_key),
    "parent_key": None,
    "jwt": session.jwt,
    "error_type": "Timeout",
}
# failed_request = safe_serialize(failed_request).encode("utf-8")

dead_letter_queue.add(failed_request)
assert len(dead_letter_queue.get_all()) == 1

# wait for the JWT to expire
time.sleep(3)

# retry
HttpClient()._retry_dlq_requests()
session.end_session(end_state="Success")

# check if the failed request is still in the DLQ
assert dead_letter_queue.get_all() == []
🔒 Security vulnerability 🔴
Flaky JWT Expiration Simulation
The JWT expiration handling relies on a fixed sleep duration to simulate expiration. This approach can lead to flaky tests if the timing is not precise. Consider using a mock or a controlled environment to simulate JWT expiration more reliably.
- time.sleep(3)
+ # Consider using a mock to simulate JWT expiration instead of a fixed sleep.
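To make the mock suggestion concrete, here is a minimal sketch using `unittest.mock.patch` from the standard library. The `token_is_expired` function is a hypothetical stand-in for the client's expiry check; the actual check in the diff compares `datetime.fromtimestamp(...)` with `datetime.now()`, which would need the `datetime` class patched instead of `time.time`.

```python
import time
from unittest import mock


def token_is_expired(exp: float) -> bool:
    """Hypothetical expiry check: compares the `exp` claim with the current time."""
    return exp < time.time()


def test_expired_token_is_detected_without_sleeping():
    issued_at = 1_700_000_000.0
    exp = issued_at + 60  # token valid for one minute
    with mock.patch("time.time", return_value=issued_at):
        assert not token_is_expired(exp)
    # Jump the clock past the expiry instead of calling time.sleep().
    with mock.patch("time.time", return_value=exp + 1):
        assert token_is_expired(exp)


test_expired_token_is_detected_without_sleeping()
```

The test controls the clock directly, so it neither waits for real time to pass nor depends on the server's JWT lifetime setting.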
💻 Code maintainability 🟡
Improve Readability of Failed Request Construction
The failed_request dictionary is constructed inline, which can reduce readability. Consider defining it in a separate function or using a data class to improve clarity and maintainability.
+def create_failed_request(api_key, session):
+    event = ActionEvent()
+    event.end_timestamp = get_ISO_time()
+    return {
+        "url": "http://localhost:8000/v2/create_events",
+        "payload": {"events": [event.__dict__]},
+        "api_key": str(api_key),
+        "parent_key": None,
+        "jwt": session.jwt,
+        "error_type": "Timeout",
+    }
+
+failed_request = create_failed_request(api_key, session)
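The data class alternative mentioned in the comment might look like the sketch below. `FailedRequest` is a hypothetical name; its fields mirror the keys of the dictionary in the test script, and the placeholder strings stand in for a real API key and session JWT.

```python
from dataclasses import asdict, dataclass
from typing import Any, Dict, Optional


@dataclass
class FailedRequest:
    """Hypothetical record for one dead letter queue entry."""
    url: str
    payload: Dict[str, Any]
    api_key: str
    jwt: str
    error_type: str
    parent_key: Optional[str] = None


request = FailedRequest(
    url="http://localhost:8000/v2/create_events",
    payload={"events": []},
    api_key="<api-key>",
    jwt="<session-jwt>",
    error_type="Timeout",
)
entry = asdict(request)  # plain dict with the same keys as the inline version
```

Compared with an inline dictionary, a misspelled field name fails immediately at construction time instead of surfacing later when the entry is retried.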
Summary
Walkthrough
This update enhances the AgentOps project by introducing a Dead Letter Queue (DLQ) mechanism to handle failed HTTP requests, ensuring they are retried and logged appropriately.
Files selected (7)
Files ignored (7)
 from uuid import UUID

 from .log_config import logger
+from .singleton import singleton


+@singleton
 class Configuration:
     def __init__(self):
         self.api_key: Optional[str] = None
💻 Code maintainability 🟡
Singleton Implementation Safety
The addition of the @singleton decorator to the Configuration class is a good design choice to ensure a single instance is used throughout the application. However, ensure that the singleton implementation is thread-safe to avoid potential issues in a multi-threaded environment.
     res = HttpClient.post(
         f"{self.config.endpoint}/v2/update_session",
         json.dumps(filter_unjsonable(payload)).encode("utf-8"),
-        jwt=self.jwt,
+        token=self.jwt,
     )
 except ApiServerException as e:
     return logger.error(f"Could not update session - {e}")
💻 Code maintainability 🔴
Update Documentation for Token Usage
The change from jwt to token should be reflected in all related documentation and comments to avoid confusion for future developers. Ensure that any references to JWT in the code are updated to reflect the new token usage.
     res = HttpClient.post(
         f"{self.config.endpoint}/v2/update_session",
         json.dumps(filter_unjsonable(payload)).encode("utf-8"),
-        jwt=self.jwt,
+        token=self.jwt,
     )
 except ApiServerException as e:
     return logger.error(f"Could not end session - {e}")
🔒 Security Suggestion 🔴
Token Naming Consistency
Changing the parameter name from jwt to token may lead to confusion if the token is still a JWT. Ensure that the new naming convention is consistent throughout the codebase and that the token is validated properly to prevent unauthorized access.
- jwt=self.jwt,
+ token=self.jwt,
 session_1 = agentops.start_session(tags=["multi-session-test-1"])
 session_2 = agentops.start_session(tags=["multi-session-test-2"])

-print("session_id_1: {}".format(session_1))
-print("session_id_2: {}".format(session_2))
+print("session_id_1: {}".format(session_1.session_id))
+print("session_id_2: {}".format(session_2.session_id))

 messages = [{"role": "user", "content": "Hello"}]
💻 Code maintainability 🟡
Ensure session_id Availability
The print statements have been updated to access the session_id attribute directly from the session objects. This is a good improvement for clarity, but ensure that the session_id attribute is always available and correctly initialized in the start_session method to avoid potential attribute errors.
+print("session_id_1: {}".format(session_1.session_id))
+print("session_id_2: {}".format(session_2.session_id))
🔒 Security vulnerability 🔴
JWT Signature Verification Missing
The new implementation decodes the JWT token without verifying its signature, which poses a security risk. Ensure that the token is verified before using its claims to prevent potential attacks.
+    decoded_jwt = jwt.decode(
+        token,
+        algorithms=["HS256"],
+        options={"verify_signature": True},
+    )
Commitable Code Suggestion:
if parent_key is not None: | |
JSON_HEADER["X-Agentops-Parent-Key"] = parent_key | |
if jwt is not None: | |
JSON_HEADER["Authorization"] = f"Bearer {jwt}" | |
if token is not None: | |
decoded_jwt = jwt.decode( | |
token, | |
algorithms=["HS256"], | |
options={"verify_signature": False}, | |
) | |
# if token is expired, reauth | |
if datetime.fromtimestamp(decoded_jwt["exp"]) < datetime.now(): | |
new_jwt = reauthorize_jwt( | |
token, | |
api_key, | |
decoded_jwt["session_id"], | |
) | |
token = new_jwt | |
JSON_HEADER["Authorization"] = f"Bearer {token}" | |
res = request_session.post( | |
url, data=payload, headers=JSON_HEADER, timeout=20 | |
) | |
result.parse(res) | |
except requests.exceptions.Timeout: | |
result.code = 408 | |
result.status = HttpStatus.TIMEOUT | |
raise ApiServerException( | |
"Could not reach API server - connection timed out" | |
if result.code == 200: | |
HttpClient._retry_dlq_requests() | |
except (requests.exceptions.Timeout, requests.exceptions.HTTPError) as e: | |
HttpClient._handle_failed_request( | |
url, payload, api_key, parent_key, token, type(e).__name__ | |
) | |
except requests.exceptions.HTTPError as e: | |
try: | |
result.parse(e.response) | |
except Exception: | |
result = Response() | |
result.code = e.response.status_code | |
result.status = Response.get_status(e.response.status_code) | |
result.body = {"error": str(e)} | |
raise ApiServerException(f"HTTPError: {e}") | |
raise ApiServerException(f"{type(e).__name__}: {e}") | |
except requests.exceptions.RequestException as e: | |
result.body = {"error": str(e)} | |
HttpClient._handle_failed_request( | |
url, payload, api_key, parent_key, token, "RequestException" | |
) | |
raise ApiServerException(f"RequestException: {e}") | |
if result.code == 401: | |
if result.body.get("message") == "Expired Token": | |
raise ApiServerException(f"API server: jwt token expired.") | |
raise ApiServerException( | |
f"API server: invalid API key: {api_key}. Find your API key at https://app.agentops.ai/settings/projects" | |
) | |
decoded_jwt = jwt.decode( | |
token, | |
algorithms=["HS256"], | |
options={"verify_signature": True}, | |
)``` | |
<br> | |
</details> | |
<details> | |
<summary>📜 Guidelines:</summary> | |
<br> | |
Ensure that JWT tokens are verified before use to prevent security vulnerabilities. | |
<br><br> | |
</details> | |
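Signature verification needs the signing key. PyJWT's `jwt.decode` takes it as the second argument, and the suggestion above does not pass one. As a rough illustration of what an HS256 check involves, here is a standard-library sketch. The names `sign_hs256` and `decode_verified` are hypothetical, the secret is a demo value, and this is not how PyJWT or AgentOps implements it. With HS256 the secret normally lives on the server, so whether the client can verify at all is an open question for this PR.

```python
import base64
import hashlib
import hmac
import json


def _b64url_decode(segment: str) -> bytes:
    # JWT segments are base64url without padding; restore it before decoding.
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))


def _b64url_encode(raw: bytes) -> str:
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def sign_hs256(claims: dict, secret: bytes) -> str:
    """Build a compact HS256 token (helper for this demo only)."""
    header = _b64url_encode(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    body = _b64url_encode(json.dumps(claims).encode())
    signature = hmac.new(secret, f"{header}.{body}".encode(), hashlib.sha256).digest()
    return f"{header}.{body}.{_b64url_encode(signature)}"


def decode_verified(token: str, secret: bytes) -> dict:
    """Return the claims only if the HS256 signature matches the secret."""
    try:
        header_b64, body_b64, signature_b64 = token.split(".")
    except ValueError:
        raise ValueError("malformed token") from None
    header = json.loads(_b64url_decode(header_b64))
    if header.get("alg") != "HS256":
        raise ValueError("unexpected algorithm")
    expected = hmac.new(
        secret, f"{header_b64}.{body_b64}".encode(), hashlib.sha256
    ).digest()
    # compare_digest avoids leaking information through comparison timing.
    if not hmac.compare_digest(expected, _b64url_decode(signature_b64)):
        raise ValueError("signature mismatch")
    return json.loads(_b64url_decode(body_b64))
```

Claims such as `exp` and `session_id` should only be read from the dictionary returned after the check passes.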
### Purpose | ||
# test an edge case where a request is retried after the jwt has expired | ||
import time | ||
from datetime import datetime | ||
|
||
### SETUP | ||
# Run the API server locally | ||
# In utils.py -> generate_jwt -> set the jwt expiration to 0.001 | ||
# Run this script | ||
|
||
### Plan | ||
# The first request should succeed and return a JWT | ||
# We'll manually add a failed request to the DLQ with the expired JWT | ||
# When reattempting, the http_client should identify the expired jwt and reauthorize it before sending again | ||
|
||
import agentops | ||
from agentops import ActionEvent | ||
from agentops.helpers import safe_serialize, get_ISO_time | ||
from agentops.http_client import dead_letter_queue, HttpClient | ||
|
||
api_key = "492f0ee6-0b7d-40a6-af86-22d89c7c5eea" | ||
agentops.init( | ||
endpoint="http://localhost:8000", | ||
api_key=api_key, | ||
auto_start_session=False, | ||
default_tags=["dead-letter-queue-test"], | ||
) | ||
|
||
# create session | ||
session = agentops.start_session() | ||
|
||
# add failed request to DLQ | ||
event = ActionEvent() | ||
event.end_timestamp = get_ISO_time() | ||
|
||
failed_request = { | ||
"url": "http://localhost:8000/v2/create_events", | ||
"payload": {"events": [event.__dict__]}, | ||
"api_key": str(api_key), | ||
"parent_key": None, | ||
"jwt": session.jwt, | ||
"error_type": "Timeout", | ||
} | ||
# failed_request = safe_serialize(failed_request).encode("utf-8") | ||
|
||
dead_letter_queue.add(failed_request) | ||
assert len(dead_letter_queue.get_all()) == 1 | ||
|
||
# wait for the JWT to expire | ||
time.sleep(3) | ||
|
||
# retry | ||
HttpClient()._retry_dlq_requests() | ||
session.end_session(end_state="Success") | ||
|
||
# check if the failed request is still in the DLQ | ||
assert dead_letter_queue.get_all() == [] |
⚠️ Critical Issue 🔴
Flaky Test Risk Due to Sleep
The test script directly manipulates the JWT expiration and relies on a sleep function to wait for expiration. This approach can lead to flaky tests due to timing issues. Consider using a mock or a controlled environment to simulate JWT expiration without relying on sleep.
- time.sleep(3)
+ # Use a mock or controlled environment to simulate JWT expiration
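One way to avoid the sleep is to control the clock instead of waiting for it. The sketch below is an assumption about how that could look: `is_expired` is a hypothetical helper, not a function from the PR, and it reads the clock through `time.time()` so that `unittest.mock.patch` can replace it.

```python
import time
from unittest import mock


def is_expired(exp_timestamp: float) -> bool:
    """Hypothetical helper: True once the current time is past `exp`."""
    return exp_timestamp < time.time()


def check_expiry_without_sleeping() -> None:
    issued_at = 1_700_000_000.0
    exp = issued_at + 60  # token valid for one minute

    # Freeze the clock at issue time: the token is still valid.
    with mock.patch("time.time", return_value=issued_at):
        assert not is_expired(exp)

    # Jump the clock past the expiry: no real waiting involved.
    with mock.patch("time.time", return_value=exp + 1):
        assert is_expired(exp)
```

The production code in the PR compares `datetime.fromtimestamp(...)` with `datetime.now()`, so a real test would need to patch that clock (or inject one) rather than `time.time`.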
🔒 Security Suggestion 🔴
Hardcoded API Key
The API key is hardcoded in the test script, which poses a security risk if the code is ever exposed. Consider using environment variables or a secure vault to manage sensitive information.
- api_key = "492f0ee6-0b7d-40a6-af86-22d89c7c5eea"
+ api_key = os.getenv('API_KEY') # Load from environment variable
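A slightly fuller sketch of the environment-variable approach, failing fast when the key is missing so the script does not run with `None`. The variable name `AGENTOPS_API_KEY` and the helper `load_api_key` are assumptions for illustration, not names taken from the PR.

```python
import os


def load_api_key(var_name: str = "AGENTOPS_API_KEY") -> str:
    """Read the API key from the environment; raise if it is unset or empty."""
    api_key = os.environ.get(var_name)
    if not api_key:
        raise RuntimeError(f"Set the {var_name} environment variable before running")
    return api_key
```

The test script would then call `api_key = load_api_key()` instead of assigning a literal.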
return func(self, *args, **kwargs) | ||
|
||
return wrapper | ||
|
||
|
||
def ensure_dead_letter_queue(): | ||
# Define file path | ||
file_path = os.path.join(".agentops", "dead_letter_queue.json") | ||
|
||
# Check if directory exists | ||
if not os.path.exists(".agentops"): | ||
os.makedirs(".agentops") | ||
|
||
# Check if file exists | ||
if not os.path.isfile(file_path): | ||
with open(file_path, "w") as f: | ||
json.dump({"messages": []}, f) | ||
|
||
return file_path |
⚠️ Critical Issue 🔴
Lack of Error Handling in Dead Letter Queue Setup
The ensure_dead_letter_queue function creates a directory and a file without handling potential exceptions. If the directory creation fails (e.g., due to permission issues), the function will not handle this gracefully, which could lead to unhandled exceptions in the application. Consider adding error handling to manage these scenarios.
+ try:
+ if not os.path.exists('.agentops'):
+ os.makedirs('.agentops')
+ except OSError as e:
+ logger.error(f'Failed to create directory: {e}')
+ return None
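The suggestion above only guards the directory creation. A sketch that also covers the file write might look like the following. `ensure_queue_file` and its `base_dir` parameter are hypothetical, and returning `None` on failure is an assumption that callers would have to handle.

```python
import json
import logging
import os
from typing import Optional

logger = logging.getLogger(__name__)


def ensure_queue_file(base_dir: str = ".agentops") -> Optional[str]:
    """Create the DLQ directory and file if needed; return None on failure."""
    file_path = os.path.join(base_dir, "dead_letter_queue.json")
    try:
        # exist_ok=True avoids the check-then-create race of os.path.exists.
        os.makedirs(base_dir, exist_ok=True)
        if not os.path.isfile(file_path):
            with open(file_path, "w") as f:
                json.dump({"messages": []}, f)
    except OSError as e:
        logger.error("Could not prepare dead letter queue file: %s", e)
        return None
    return file_path
```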
🔒 Security Suggestion 🔴
Data Validation Before Writing to File
The ensure_dead_letter_queue function writes to a JSON file without validating the contents or ensuring that the data being written is safe. This could lead to potential security issues if the data is manipulated. Consider validating the data before writing it to the file.
+ # Validate data before writing
+ data_to_write = {'messages': []}
+ if isinstance(data_to_write, dict):
+ json.dump(data_to_write, f)
UNKNOWN = -1 | ||
|
||
|
||
class Response: | ||
class DeadLetterQueue: | ||
def __init__(self): | ||
self.queue: List[dict] = [] | ||
self.is_testing = os.environ.get("ENVIRONMENT") == "test" | ||
|
||
# if not self.is_testing: | ||
self.file_path = ensure_dead_letter_queue() | ||
|
||
def read_queue(self): | ||
if not self.is_testing: | ||
with open(self.file_path, "r") as f: | ||
return json.load(f)["messages"] | ||
else: | ||
return [] | ||
|
||
def write_queue(self): | ||
if not self.is_testing: | ||
with open(self.file_path, "w") as f: | ||
json.dump({"messages": safe_serialize(self.queue)}, f) | ||
|
||
def add(self, request_data: dict): | ||
if not self.is_testing: | ||
self.queue.append(request_data) | ||
self.write_queue() | ||
|
||
def get_all(self) -> List[dict]: | ||
return self.queue | ||
|
||
def remove(self, request_data: dict): | ||
if not self.is_testing: | ||
if request_data in self.queue: | ||
self.queue.remove(request_data) | ||
self.write_queue() | ||
|
||
def clear(self): | ||
self.queue.clear() | ||
self.write_queue() | ||
|
||
|
||
dead_letter_queue = DeadLetterQueue() | ||
|
||
|
||
class Response: | ||
def __init__( | ||
self, status: HttpStatus = HttpStatus.UNKNOWN, body: Optional[dict] = None | ||
): |
💻 Code maintainability 🔴
Add Error Handling for File Operations
The DeadLetterQueue class writes to a file without handling potential I/O exceptions. Consider adding error handling to manage cases where the file cannot be accessed or written to, which could lead to unhandled exceptions during runtime.
+ def write_queue(self):
+ try:
+ if not self.is_testing:
+ with open(self.file_path, "w") as f:
+ json.dump({"messages": safe_serialize(self.queue)}, f)
+ except IOError as e:
+ logger.error(f"Failed to write to dead letter queue: {e}")
📜 Guidelines:
Handle I/O exceptions when performing file operations to ensure robustness.
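The same guideline applies to reads: a missing or corrupted queue file should not crash the client. A minimal sketch, assuming the `{"messages": [...]}` layout written by `ensure_dead_letter_queue`; `read_messages` is a hypothetical standalone function, not a method from the PR.

```python
import json
import logging
from typing import List

logger = logging.getLogger(__name__)


def read_messages(file_path: str) -> List[dict]:
    """Load queued messages, falling back to an empty list on any problem."""
    try:
        with open(file_path, "r") as f:
            data = json.load(f)
    except (OSError, json.JSONDecodeError) as e:
        logger.error("Failed to read dead letter queue: %s", e)
        return []
    # Guard against a file that parses but does not have the expected shape.
    messages = data.get("messages") if isinstance(data, dict) else None
    return messages if isinstance(messages, list) else []
```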
Enhance HTTP Client with Dead Letter Queue Functionality
Introduce a dead letter queue (DLQ) mechanism to handle failed HTTP requests and improve JWT management.
DeadLetterQueue class to manage failed requests and retry logic.
HttpClient methods to utilize the DLQ for error handling.
Enhances resilience of HTTP requests by ensuring failed requests are retried and logged, improving overall system reliability.
Original Description
closes ENG-565
Failed requests are retried.
After 3 failures, they are stored in an in-memory queue.
When a later request succeeds, the queued requests are retried.
(This PR was copied from AgentOps-AI/agentops PR AgentOps-AI#385)
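The flow in the original description (retry, queue after 3 failures, drain on the next success) can be sketched as follows. This is a simplified model under stated assumptions, not the PR's `HttpClient`: `send_with_dlq` and `retry_queued` are hypothetical names, `send` stands in for the HTTP call and returns `True` on success, and the queue is a plain list.

```python
from typing import Callable, List

MAX_ATTEMPTS = 3  # the description says requests are queued after 3 failures


def retry_queued(send: Callable[[dict], bool], dlq: List[dict]) -> None:
    """Re-send every queued request; keep only those that fail again."""
    for queued in list(dlq):  # iterate over a copy while mutating dlq
        if send(queued):
            dlq.remove(queued)


def send_with_dlq(
    request: dict,
    send: Callable[[dict], bool],
    dlq: List[dict],
    max_attempts: int = MAX_ATTEMPTS,
) -> bool:
    """Try a request up to max_attempts times, then park it in the DLQ."""
    for _ in range(max_attempts):
        if send(request):
            # A success suggests the server is reachable again: drain the DLQ.
            retry_queued(send, dlq)
            return True
    dlq.append(request)
    return False
```

A real client would likely add a delay between attempts and cap how often the queue is drained; both are left out here to keep the control flow visible.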