-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Copy of PR #435 - make start_session non blocking #23
Conversation
Co-authored-by: Howard Gil <[email protected]>
🔍 Code Review Summary❗ Attention Required: This push has potential issues. 🚨 Overview
🚨 Critical Issuessecurity (2 issues)1. Potential exposure of sensitive API keys.📁 File: agentops/client.py 💡 Solution: Current Code: if self._config.api_key is None: Suggested Code: if not self._config.api_key or not isinstance(self._config.api_key, str): 2. Unnecessary thread creation in session management.📁 File: agentops/session.py 💡 Solution: Current Code: self.thread = threading.Thread(target=self._run, args=(callback,)) Suggested Code: self.thread = concurrent.futures.ThreadPoolExecutor().submit(self._run, callback)
Useful Commands
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider implementing the following changes to improve the code.
@@ -87,16 +87,13 @@ def initialize(self) -> Union[Session, None]: | |||
return | |||
|
|||
self.unsuppress_logs() | |||
|
|||
if self._config.api_key is None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Comment: Potential exposure of sensitive API keys.
Solution: Implement validation for API keys to ensure they are in the expected format and not empty before use.
!! Make sure the following suggestion is correct before committing it !!
if self._config.api_key is None: | |
if not self._config.api_key or not isinstance(self._config.api_key, str): |
self.stop_flag = threading.Event() | ||
self.thread = threading.Thread(target=self._run) | ||
self.thread = threading.Thread(target=self._run, args=(callback,)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Comment: Unnecessary thread creation in session management.
Solution: Consider using a thread pool or asynchronous programming to manage sessions more efficiently.
!! Make sure the following suggestion is correct before committing it !!
self.thread = threading.Thread(target=self._run, args=(callback,)) | |
self.thread = concurrent.futures.ThreadPoolExecutor().submit(self._run, callback) |
SummaryPurpose: Key Changes:
Impact: |
WalkthroughThis update enhances the AgentOps library by refining session management and logging mechanisms. Key changes include the introduction of a callback function for session initialization, improved error handling during session start, and the removal of redundant code. Additionally, the test suite has been updated to ensure more robust testing with automatic fixture application and improved request history assertions. These modifications aim to streamline the session lifecycle, enhance logging capabilities, and ensure more reliable testing outcomes. Changes
|
Files selected (5)
Files ignored (8)
InstructionsEmoji Descriptions:
Interact with the Bot:
Available Commands:
Tips for Using @Entelligence.AI Effectively:
Need More Help?
|
|
||
if "autogen" in sys.modules: | ||
Client().configure(instrument_llm_calls=False) | ||
Client()._initialize_autogen_logger() | ||
Client().add_default_tags(["autogen"]) | ||
|
||
if "crewai" in sys.modules: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ℹ️ Performance Improvement
Initialize Autogen Logger for Enhanced Logging
The addition of _initialize_autogen_logger()
ensures that the logging mechanism is set up correctly when the autogen
module is present. This change enhances the logging capabilities, allowing for better tracking and debugging of operations related to the autogen
module.
Commitable Code Suggestion:
if "autogen" in sys.modules: | |
Client().configure(instrument_llm_calls=False) | |
Client()._initialize_autogen_logger() | |
Client().add_default_tags(["autogen"]) | |
if "crewai" in sys.modules: | |
+ Client()._initialize_autogen_logger() |
) | ||
|
||
self._handle_unclean_exits() | ||
self._initialize_partner_framework() | ||
|
||
self._initialized = True | ||
|
||
if self._config.instrument_llm_calls: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ℹ️ Logic Error
Potential Missing Initialization Logic
The removal of _initialize_partner_framework()
without a replacement or alternative initialization logic could lead to missing setup steps for partner frameworks. Ensure that any necessary initialization previously handled by this method is now covered elsewhere in the code.
Commitable Code Suggestion:
) | |
self._handle_unclean_exits() | |
self._initialize_partner_framework() | |
self._initialized = True | |
if self._config.instrument_llm_calls: | |
def initialize(self) -> Union[Session, None]: | |
) | |
self._handle_unclean_exits() | |
- self._initialize_partner_framework() | |
- | |
self._initialized = True | |
if self._config.instrument_llm_calls: | |
<br> | |
</details> | |
"errors": 0, | ||
"apis": 0, | ||
} | ||
|
||
self.is_running = False | ||
active_sessions.append(self) | ||
self.stop_flag = threading.Event() | ||
self.thread = threading.Thread(target=self._run) | ||
self.thread = threading.Thread(target=self._run, args=(callback,)) | ||
self.thread.daemon = True | ||
self.thread.start() | ||
|
||
self.is_running = self._start_session() | ||
if self.is_running == False: | ||
self.stop_flag.set() | ||
self.thread.join(timeout=1) | ||
|
||
def set_video(self, video: str) -> None: | ||
""" | ||
Sets a url to the video recording of the session. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ℹ️ Logic Error
Ensure Session Stops if Initialization Fails
The removal of the initial session start check and immediate stop flag setting could lead to the session thread running even if the session fails to start. This could result in unnecessary resource usage and potential errors.
+ self.is_running = self._start_session()
+ if self.is_running == False:
+ self.stop_flag.set()
+ self.thread.join(timeout=1)
Commitable Code Suggestion:
"errors": 0, | |
"apis": 0, | |
} | |
self.is_running = False | |
active_sessions.append(self) | |
self.stop_flag = threading.Event() | |
self.thread = threading.Thread(target=self._run) | |
self.thread = threading.Thread(target=self._run, args=(callback,)) | |
self.thread.daemon = True | |
self.thread.start() | |
self.is_running = self._start_session() | |
if self.is_running == False: | |
self.stop_flag.set() | |
self.thread.join(timeout=1) | |
def set_video(self, video: str) -> None: | |
""" | |
Sets a url to the video recording of the session. | |
self.is_running = self._start_session() | |
if self.is_running == False: | |
self.stop_flag.set() | |
self.thread.join(timeout=1) |
self._pre_init_queue["agents"].append( | ||
{"name": name, "agent_id": agent_id} | ||
) | ||
session.create_agent(name=name, agent_id=agent_id) | ||
else: | ||
session.create_agent(name=name, agent_id=agent_id) | ||
|
||
return agent_id | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ℹ️ Logic Error
Conditional Agent Creation Logic
The addition of an else
block for session.create_agent()
ensures that agents are only created when the session is running. Verify that this logic does not inadvertently skip agent creation in scenarios where it is required.
Commitable Code Suggestion:
@@ -305,7 +321,8 @@ def create_agent(
self._pre_init_queue["agents"].append(
{"name": name, "agent_id": agent_id}
)
-
session.create_agent(name=name, agent_id=agent_id)
-
else:
-
session.create_agent(name=name, agent_id=agent_id) return agent_id
|
||
|
||
def get_installed_packages(): | ||
|
||
try: | ||
return { | ||
# TODO: test | ||
# TODO: add to opt out | ||
"Installed Packages": { | ||
dist.metadata["Name"]: dist.version | ||
dist.metadata.get("Name"): dist.metadata.get("Version") | ||
for dist in importlib.metadata.distributions() | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💻 Code Improvement
Use of get
Method for Safe Metadata Access
The change from using direct dictionary access to using the get
method for retrieving metadata values is a good practice. It prevents potential KeyError
exceptions if the keys 'Name' or 'Version' are not present in the metadata. However, consider adding a default value to the get
method to handle cases where the metadata might be missing these keys.
+ dist.metadata.get("Name", "Unknown"): dist.metadata.get("Version", "Unknown")
Commitable Code Suggestion:
def get_installed_packages(): | |
try: | |
return { | |
# TODO: test | |
# TODO: add to opt out | |
"Installed Packages": { | |
dist.metadata["Name"]: dist.version | |
dist.metadata.get("Name"): dist.metadata.get("Version") | |
for dist in importlib.metadata.distributions() | |
} | |
} | |
dist.metadata.get("Name", "Unknown"): dist.metadata.get("Version", "Unknown") |
|
||
if jwt is not None: | ||
JSON_HEADER["Authorization"] = f"Bearer {jwt}" | ||
|
||
res = request_session.post( | ||
url, data=payload, headers=JSON_HEADER, timeout=20 | ||
) | ||
|
||
result.parse(res) | ||
except requests.exceptions.Timeout: | ||
result.code = 408 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ℹ️ Error Handling
Add Logging for Timeout Exceptions
The current implementation catches a requests.exceptions.Timeout
exception and sets the result code to 408. Consider adding logging for this exception to provide more context when a timeout occurs, which can aid in debugging and monitoring.
except requests.exceptions.Timeout:
+ logging.error(f"Request to {url} timed out.")
result.code = 408
Commitable Code Suggestion:
if jwt is not None: | |
JSON_HEADER["Authorization"] = f"Bearer {jwt}" | |
res = request_session.post( | |
url, data=payload, headers=JSON_HEADER, timeout=20 | |
) | |
result.parse(res) | |
except requests.exceptions.Timeout: | |
result.code = 408 | |
except requests.exceptions.Timeout: | |
logging.error(f"Request to {url} timed out.") | |
result.code = 408 |
elif event_type == "apis": | ||
self.event_counts["apis"] += 1 | ||
|
||
def _run(self) -> None: | ||
def _run(self, callback: Optional[Callable[["Session"], None]] = None) -> None: | ||
self.is_running = self._start_session() | ||
|
||
if callback: | ||
callback(self) | ||
|
||
if self.is_running == False: | ||
self.stop_flag.set() | ||
self.thread.join(timeout=1) | ||
|
||
while not self.stop_flag.is_set(): | ||
time.sleep(self.config.max_wait_time / 1000) | ||
if self.queue: | ||
if self.queue and self.jwt is not None: | ||
self._flush_queue() | ||
|
||
def create_agent(self, name, agent_id): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if tags is not None: | ||
session_tags.update(tags) | ||
|
||
def _start_session_callback(session: Session): | ||
if session.is_running: | ||
if len(self._pre_init_queue["agents"]) > 0: | ||
for agent_args in self._pre_init_queue["agents"]: | ||
session.create_agent( | ||
name=agent_args["name"], agent_id=agent_args["agent_id"] | ||
) | ||
self._pre_init_queue["agents"] = [] | ||
|
||
logger.info( | ||
colored( | ||
f"\x1b[34mSession Replay: https://app.agentops.ai/drilldown?session_id={session.session_id}\x1b[0m", | ||
"blue", | ||
) | ||
) | ||
else: | ||
self._sessions.remove(session) | ||
|
||
session = Session( | ||
session_id=session_id, | ||
tags=list(session_tags), | ||
host_env=get_host_env(self._config.env_data_opt_out), | ||
config=self._config, | ||
callback=_start_session_callback, | ||
) | ||
|
||
if not session.is_running: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ℹ️ Performance Improvement
Optimize Callback Usage for Session Initialization
The introduction of _start_session_callback
improves session management by handling pre-initialized agents and logging session replay links. However, ensure that the callback is only set when necessary to avoid unnecessary overhead in session creation.
Commitable Code Suggestion:
@@ -235,11 +232,30 @@ def start_session( if tags is not None: session_tags.update(tags) + def _start_session_callback(session: Session): + if session.is_running: + if len(self._pre_init_queue["agents"]) > 0: + for agent_args in self._pre_init_queue["agents"]: + session.create_agent( + name=agent_args["name"], agent_id=agent_args["agent_id"] + ) + self._pre_init_queue["agents"] = [] + + logger.info( + colored( + f"\x1b[34mSession Replay: https://app.agentops.ai/drilldown?session_id={session.session_id}\x1b[0m", + "blue", + ) + ) + else: + self._sessions.remove(session) + session = Session( session_id=session_id, tags=list(session_tags), host_env=get_host_env(self._config.env_data_opt_out), config=self._config, + callback=_start_session_callback, ) if not session.is_running:
self.config.parent_key, | ||
) | ||
except ApiServerException as e: | ||
return logger.error(f"Could not start session - {e}") | ||
logger.error(f"Could not start session - {e}") | ||
return False | ||
|
||
logger.debug(res.body) | ||
|
||
if res.code != 200: | ||
logger.error(f"Could not start session - server error") | ||
return False | ||
|
||
jwt = res.body.get("jwt", None) | ||
self.jwt = jwt | ||
if jwt is None: | ||
logger.error( | ||
f"Could not start session - server could not authenticate your API Key" | ||
) | ||
return False | ||
|
||
session_url = res.body.get( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🔒 Security Suggestion
Sanitize Error Messages in Logs
The error handling for session start has been improved, but ensure that sensitive information is not logged. Consider sanitizing error messages to prevent potential exposure of sensitive data.
Commitable Code Suggestion:
@@ -274,16 +271,21 @@ def _start_session(self):
self.config.parent_key,
)
except ApiServerException as e:
-
return logger.error(f"Could not start session - {e}")
-
logger.error(f"Could not start session - {e}")
-
return False logger.debug(res.body) if res.code != 200:
-
logger.error(f"Could not start session - server error") return False jwt = res.body.get("jwt", None) self.jwt = jwt if jwt is None:
-
logger.error(
-
f"Could not start session - server could not authenticate your API Key"
-
) return False session_url = res.body.get(
Enhance AgentOps Initialization and Session Management
Improve the initialization process and session management in the AgentOps client.
_initialize_autogen_logger
method to configure logging for autogen.Session
class to handle post-initialization actions.These changes enhance the reliability and maintainability of the AgentOps client, particularly in session handling and logging.
Original Description
## 📥 Pull Request📘 Description
Make start session non-blocking
(This PR was copied from AgentOps-AI/agentops PR AgentOps-AI#435)