Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move start session to thread #27

Open
wants to merge 26 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
a745cd5
make start_session non blocking
siyangqiu Sep 30, 2024
a8aeffb
fix build issue
siyangqiu Sep 30, 2024
7a81291
bump version
siyangqiu Sep 30, 2024
56fda75
callback for start session now that it is async
siyangqiu Sep 30, 2024
5f94950
fix callback
siyangqiu Sep 30, 2024
06a9b68
unpin dependencies (#434)
areibman Oct 1, 2024
1c8eab6
bump version number
siyangqiu Oct 2, 2024
10c0f1e
wip
siyangqiu Oct 4, 2024
960c44c
Merge branch 'main' into move-start-session-to-thread
areibman Oct 9, 2024
6c66d0b
change autogen getting run every time
areibman Oct 11, 2024
c28f16a
remove prints
areibman Oct 11, 2024
2fcb643
remove more prints
areibman Oct 11, 2024
277b483
suppress warnings
areibman Oct 11, 2024
da4ee87
exponential retry to close data
areibman Oct 11, 2024
9a9f8ef
removed event counter
areibman Oct 11, 2024
1aee480
fix requests mock
areibman Oct 14, 2024
6b4e50c
remove print
areibman Oct 14, 2024
7f98d23
fixed more tests
areibman Oct 14, 2024
379df1d
removed inits from test_agent; does not require a client
areibman Oct 16, 2024
180edc7
create requests fixture
areibman Oct 16, 2024
1fefac3
Scope fixtures
areibman Oct 16, 2024
4c591ff
Merge branch 'main' into move-start-session-to-thread
areibman Oct 16, 2024
d0524a6
black run
areibman Oct 16, 2024
0a673cd
remove bad files
areibman Oct 16, 2024
3183f42
Updated callback parameter type to Callable[["Session"], None] in the…
Nov 6, 2024
88e1e6d
Added condition to verify session state before removal to prevent uni…
Nov 6, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions agentops/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

if "autogen" in sys.modules:
Client().configure(instrument_llm_calls=False)
Client()._initialize_autogen_logger()
Client().add_default_tags(["autogen"])

if "crewai" in sys.modules:
Expand Down Expand Up @@ -72,7 +73,6 @@ def init(
Client().unsuppress_logs()
t = threading.Thread(target=check_agentops_update)
t.start()

if Client().is_initialized:
return logger.warning(
"AgentOps has already been initialized. If you are trying to start a session, call agentops.start_session() instead."
Expand Down Expand Up @@ -101,7 +101,6 @@ def init(
"auto_start_session is set to False - inherited_session_id will not be used to automatically start a session"
)
return Client().initialize()

Client().configure(auto_start_session=False)
Client().initialize()
return Client().start_session(inherited_session_id=inherited_session_id)
Expand Down
30 changes: 23 additions & 7 deletions agentops/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,16 +87,13 @@ def initialize(self) -> Union[Session, None]:
return

self.unsuppress_logs()

if self._config.api_key is None:
return logger.error(
"Could not initialize AgentOps client - API Key is missing."
+ "\n\t Find your API key at https://app.agentops.ai/settings/projects"
)

self._handle_unclean_exits()
self._initialize_partner_framework()

self._initialized = True

if self._config.instrument_llm_calls:
Expand All @@ -116,7 +113,7 @@ def initialize(self) -> Union[Session, None]:

return session

def _initialize_partner_framework(self) -> None:
def _initialize_autogen_logger(self) -> None:
try:
import autogen
from .partners.autogen_logger import AutogenLogger
Expand Down Expand Up @@ -235,15 +232,33 @@ def start_session(
if tags is not None:
session_tags.update(tags)

def _start_session_callback(session: Session):
if session.is_running:
if len(self._pre_init_queue["agents"]) > 0:
for agent_args in self._pre_init_queue["agents"]:
session.create_agent(
name=agent_args["name"], agent_id=agent_args["agent_id"]
)
self._pre_init_queue["agents"] = []

logger.info(
colored(
f"\x1b[34mSession Replay: https://app.agentops.ai/drilldown?session_id={session.session_id}\x1b[0m",
"blue",
)
)
elif not session.is_running:
self._sessions.remove(session)

session = Session(
Comment on lines +247 to 253

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ℹ️ Logic Error

Ensure Sessions are Only Removed if Not Running

The change in the condition from else to elif not session.is_running ensures that only non-running sessions are removed. This is a critical fix to prevent the removal of active sessions, which could lead to unexpected behavior or errors in session management.

Commitable Code Suggestion:
Suggested change
"blue",
)
)
elif not session.is_running:
self._sessions.remove(session)
session = Session(
elif not session.is_running:
self._sessions.remove(session)

session_id=session_id,
tags=list(session_tags),
host_env=get_host_env(self._config.env_data_opt_out),
config=self._config,
callback=_start_session_callback,
Comment on lines +251 to +258

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ℹ️ Logic Error

Verify Session State Before Removal

The logic for handling sessions has been altered to remove a session if it is not running. Ensure that this change does not inadvertently remove sessions that are intended to be retained. Consider adding a condition to verify the session's state before removal.

+            elif not session.is_running:
+                self._sessions.remove(session)
Commitable Code Suggestion:
Suggested change
self._sessions.remove(session)
session = Session(
session_id=session_id,
tags=list(session_tags),
host_env=get_host_env(self._config.env_data_opt_out),
config=self._config,
callback=_start_session_callback,
elif not session.is_running:
self._sessions.remove(session)

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Entelligence.AI /updateCommit

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Entelligence AI Bot Icon Entelligence AI Bot v4> /updateCommit

I have successfully committed the changes!

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Entelligence AI Bot Icon Entelligence AI Bot v4

@Entelligence.AI /updateCommit

Error committing file content: 409 - {'message': 'agentops/client.py does not match 0942f1e25f7d0290d2face529c8fc96cd660ee44', 'documentation_url': 'https://docs.github.com/rest/repos/contents#create-or-update-file-contents', 'status': '409'}

)

if not session.is_running:
return logger.error("Failed to start session")
if not session.is_running: return logger.error("Failed to start session")

if self._pre_init_queue["agents"] and len(self._pre_init_queue["agents"]) > 0:
for agent_args in self._pre_init_queue["agents"]:
Comment on lines +258 to 264

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💻 Code Readability

Consolidate Error Logging and Return Statement

The consolidation of the return statement with the error logging improves code readability. However, ensure that the logging message is clear and provides enough context for debugging purposes.

+        if not session.is_running:
+            return logger.error("Failed to start session: Session is not running")
Commitable Code Suggestion:
Suggested change
callback=_start_session_callback,
)
if not session.is_running:
return logger.error("Failed to start session")
if not session.is_running: return logger.error("Failed to start session")
if self._pre_init_queue["agents"] and len(self._pre_init_queue["agents"]) > 0:
for agent_args in self._pre_init_queue["agents"]:
if not session.is_running:
return logger.error("Failed to start session: Session is not running")

Expand Down Expand Up @@ -305,7 +320,8 @@ def create_agent(
self._pre_init_queue["agents"].append(
{"name": name, "agent_id": agent_id}
)
session.create_agent(name=name, agent_id=agent_id)
else:
session.create_agent(name=name, agent_id=agent_id)

return agent_id

Expand Down
4 changes: 1 addition & 3 deletions agentops/host_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,11 @@ def get_sys_packages():


def get_installed_packages():

try:
return {
# TODO: test
# TODO: add to opt out
"Installed Packages": {
dist.metadata["Name"]: dist.version
dist.metadata.get("Name"): dist.metadata.get("Version")
for dist in importlib.metadata.distributions()
}
}
Expand Down
2 changes: 0 additions & 2 deletions agentops/http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,9 @@ def post(

if jwt is not None:
JSON_HEADER["Authorization"] = f"Bearer {jwt}"

res = request_session.post(
url, data=payload, headers=JSON_HEADER, timeout=20
)

result.parse(res)
except requests.exceptions.Timeout:
result.code = 408
Expand Down
36 changes: 23 additions & 13 deletions agentops/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import time
from decimal import ROUND_HALF_UP, Decimal
from termcolor import colored
from typing import Optional, List, Union
from typing import Optional, List, Union, Callable
from uuid import UUID, uuid4
from datetime import datetime

Expand Down Expand Up @@ -40,10 +40,10 @@ def __init__(
config: Configuration,
tags: Optional[List[str]] = None,
host_env: Optional[dict] = None,
callback: Optional[Callable[["Session"], None]] = None,
):
self.end_timestamp = None
self.end_state: Optional[str] = None
self.session_id = session_id
self.end_state: Optional[str] = None self.session_id = session_id
self.init_timestamp = get_ISO_time()
self.tags: List[str] = tags or []
self.video: Optional[str] = None
Comment on lines +43 to 49

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ℹ️ Logic Error

Fix Syntax Error in Attribute Initialization

The initialization of session_id is missing a line break, which could lead to a syntax error. Ensure each attribute is initialized on a separate line to maintain code readability and prevent potential issues.

 def __init__(
         callback: Optional[Callable[["Session"], None]] = None,
     ):
         self.end_timestamp = None
         self.end_state: Optional[str] = None
+        self.session_id = session_id
         self.init_timestamp = get_ISO_time()
         self.tags: List[str] = tags or []
         self.video: Optional[str] = None
Commitable Code Suggestion:
Suggested change
callback: Optional[Callable[["Session"], None]] = None,
):
self.end_timestamp = None
self.end_state: Optional[str] = None
self.session_id = session_id
self.end_state: Optional[str] = None self.session_id = session_id
self.init_timestamp = get_ISO_time()
self.tags: List[str] = tags or []
self.video: Optional[str] = None
def __init__(
callback: Optional[Callable[["Session"], None]] = None,
):
self.end_timestamp = None
self.end_state: Optional[str] = None
self.session_id = session_id
self.init_timestamp = get_ISO_time()
self.tags: List[str] = tags or []
self.video: Optional[str] = None

Expand All @@ -60,17 +60,13 @@ def __init__(
"errors": 0,
"apis": 0,
}

self.is_running = False
active_sessions.append(self)
self.stop_flag = threading.Event()
self.thread = threading.Thread(target=self._run)
self.thread = threading.Thread(target=self._run, args=(callback,))
self.thread.daemon = True
self.thread.start()

self.is_running = self._start_session()
if self.is_running == False:
self.stop_flag.set()
self.thread.join(timeout=1)

def set_video(self, video: str) -> None:
Comment on lines 65 to 70

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ℹ️ Logic Error

Potential Delay in Session Start

The removal of the session start logic from the __init__ method to the _run method could lead to a situation where the session is not started immediately upon object creation. This might cause unexpected behavior if other methods are called before _run is executed.

Commitable Code Suggestion:
Suggested change
self.stop_flag = threading.Event()
self.thread = threading.Thread(target=self._run)
self.thread = threading.Thread(target=self._run, args=(callback,))
self.thread.daemon = True
self.thread.start()
self.is_running = self._start_session()
if self.is_running == False:
self.stop_flag.set()
self.thread.join(timeout=1)
def set_video(self, video: str) -> None:
def __init__(self):
self.thread.daemon = True
self.thread.start()
self.is_running = self._start_session()
if not self.is_running:
self.stop_flag.set()
self.thread.join(timeout=1)

"""
Sets a url to the video recording of the session.
Expand Down Expand Up @@ -274,16 +270,21 @@ def _start_session(self):
self.config.parent_key,
)
except ApiServerException as e:
return logger.error(f"Could not start session - {e}")
logger.error(f"Could not start session - {e}")
return False

logger.debug(res.body)

if res.code != 200:
logger.error(f"Could not start session - server error")
return False

jwt = res.body.get("jwt", None)
self.jwt = jwt
if jwt is None:
logger.error(
f"Could not start session - server could not authenticate your API Key"
)
Comment on lines +286 to +287
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment: Potential exposure of sensitive information in logs.

Solution: Ensure that sensitive information is masked or omitted from logs.
!! Make sure the following suggestion is correct before committing it !!

Suggested change
f"Could not start session - server could not authenticate your API Key"
)
logger.error("Could not start session - server could not authenticate your API Key")

return False

session_url = res.body.get(
Expand Down Expand Up @@ -359,10 +360,19 @@ def _flush_queue(self) -> None:
elif event_type == "apis":
self.event_counts["apis"] += 1

def _run(self) -> None:
def _run(self, callback: Optional[Callable[["Session"], None]] = None) -> None:
self.is_running = self._start_session()

if callback:
callback(self)

if self.is_running == False:
self.stop_flag.set()
self.thread.join(timeout=1)

while not self.stop_flag.is_set():
time.sleep(self.config.max_wait_time / 1000)
if self.queue:
if self.queue and self.jwt is not None:
self._flush_queue()

def create_agent(self, name, agent_id):
Expand Down
6 changes: 0 additions & 6 deletions tests/test_agent.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
from unittest import TestCase

from agentops import track_agent
import agentops


class TrackAgentTests(TestCase):
def test_track_agent_with_class(self):
agentops.init()

@track_agent(name="agent_name")
class TestAgentClass:
t = "a"
Expand All @@ -19,8 +15,6 @@ class TestAgentClass:
self.assertIsNotNone(getattr(obj, "agent_ops_agent_id"))

def test_track_agent_with_class_name(self):
agentops.init()

@track_agent(name="agent_name")
class TestAgentClass:
t = "a"
Expand Down
2 changes: 1 addition & 1 deletion tests/test_canary.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ def setup_teardown():
agentops.end_all_sessions() # teardown part


@pytest.fixture
@pytest.fixture(autouse=True, scope="function")
def mock_req():
with requests_mock.Mocker() as m:
url = "https://api.agentops.ai"
Expand Down
2 changes: 1 addition & 1 deletion tests/test_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ def setup_teardown():
agentops.end_all_sessions() # teardown part


@pytest.fixture
@pytest.fixture(scope="function")
def mock_req():
with requests_mock.Mocker() as m:
url = "https://api.agentops.ai"
Expand Down
18 changes: 14 additions & 4 deletions tests/test_pre_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def setup_teardown():


@contextlib.contextmanager
@pytest.fixture(autouse=True)
@pytest.fixture(autouse=True, scope="function")
def mock_req():
with requests_mock.Mocker() as m:
url = "https://api.agentops.ai"
Expand Down Expand Up @@ -48,10 +48,20 @@ def test_track_agent(self, mock_req):
assert len(mock_req.request_history) == 0

agentops.init(api_key=self.api_key)
time.sleep(1)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment: Unnecessary sleep calls in tests can lead to longer test execution times.

Solution: Use mocking to simulate time delays instead of using sleep.
!! Make sure the following suggestion is correct before committing it !!

Suggested change
time.sleep(1)
# Mock time.sleep or use a testing framework that supports time manipulation.


# Assert
# start session and create agent
assert len(mock_req.request_history) == 2
assert mock_req.last_request.headers["X-Agentops-Api-Key"] == self.api_key

agentops.end_session(end_state="Success")

# Wait for flush
time.sleep(1.5)

# 3 requests: create session, create agent, update session
assert len(mock_req.request_history) == 3

assert (
mock_req.request_history[-2].headers["X-Agentops-Api-Key"] == self.api_key
)

mock_req.reset()
2 changes: 1 addition & 1 deletion tests/test_record_action.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def setup_teardown():


@contextlib.contextmanager
@pytest.fixture(autouse=True)
@pytest.fixture(autouse=True, scope="function")
def mock_req():
with requests_mock.Mocker() as m:
url = "https://api.agentops.ai"
Expand Down
2 changes: 1 addition & 1 deletion tests/test_record_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def setup_teardown():


@contextlib.contextmanager
@pytest.fixture(autouse=True)
@pytest.fixture(autouse=True, scope="function")
def mock_req():
with requests_mock.Mocker() as m:
url = "https://api.agentops.ai"
Expand Down
2 changes: 1 addition & 1 deletion tests/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ def setup_teardown():
agentops.end_all_sessions() # teardown part


@pytest.fixture
@pytest.fixture(scope="function")
def mock_req():
with requests_mock.Mocker() as m:
url = "https://api.agentops.ai"
Expand Down
2 changes: 1 addition & 1 deletion tests/test_teardown.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from agentops import Client


@pytest.fixture
@pytest.fixture(scope="function")
def mock_req():
with requests_mock.Mocker() as m:
url = "https://api.agentops.ai"
Expand Down
Loading