Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v0.4 #642

Draft
wants to merge 66 commits into
base: main
Choose a base branch
from
Draft

v0.4 #642

Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
66 commits
Select commit Hold shift + click to select a range
eb072db
refactor: reorganize session management into dedicated components
teocns Jan 9, 2025
88cdd43
Merge branch 'feat/session-refactoring' into feat/v0.4
teocns Jan 9, 2025
f9b141f
feat(session): add README for session package documentation
teocns Jan 9, 2025
9b6ed9e
Import `agentops/_telemetry` from `otel/v4` branch
teocns Jan 9, 2025
0fc7701
add `tests/_telemetry` from `otel/v4` branch
teocns Jan 9, 2025
051a85b
test: add `__init__` to make `tests/` a package
teocns Jan 10, 2025
2d97251
test: add llm_event_spy fixture for tests
teocns Jan 10, 2025
4a19dab
test: add VCR.py fixture for HTTP interaction recording
teocns Jan 10, 2025
51e2da2
deps: group integration-testing
teocns Jan 10, 2025
0180e2c
test: add fixture to mock package availability in tests
teocns Jan 10, 2025
73a0110
test: Add integration tests for OpenAI provider and features
teocns Jan 10, 2025
538bf98
test: add tests for concurrent API requests handling
teocns Jan 10, 2025
d679b93
Improve vcr.py configuration
teocns Jan 10, 2025
93744ce
ruff
teocns Jan 10, 2025
512c95d
chore(pyproject): update pytest options and loop scope
teocns Jan 10, 2025
e29d2b2
chore(tests): update vcr.py ignore_hosts and options
teocns Jan 11, 2025
8f02961
pyproject.toml
teocns Jan 11, 2025
a012a0f
centralize teardown in conftest.py (clear singletons, end all sessions)
teocns Jan 11, 2025
f51850e
change vcr_config scope to session
teocns Jan 11, 2025
e22513b
integration: auto start agentops session
teocns Jan 11, 2025
cb014b2
Move unit tests to dedicated folder (tests/unit)
teocns Jan 11, 2025
2c3b19d
Isolate vcr_config import into tests/integration
teocns Jan 12, 2025
6dbe54b
configure pytest to run only unit tests by default, and include integ…
teocns Jan 12, 2025
fb2be21
ci(python-tests): separate job between unit-integration tests
teocns Jan 12, 2025
caa08df
set python-tests timeout to 5 minutes
teocns Jan 12, 2025
120c455
ruff
teocns Jan 12, 2025
37edbc0
Implement jwt fixture, centralized reusable mock_req into conftest.py
teocns Jan 12, 2025
6be254a
ci(python-tests): simplify env management, remove cov from integratio…
teocns Jan 12, 2025
4d0d5a2
ruff
teocns Jan 12, 2025
2a860c8
fix: cassette for test_concurrent_api_requests
teocns Jan 12, 2025
e65f646
Cleanup vcr.py comments
teocns Jan 13, 2025
50d92c8
migrate(telemetry): config.py, encoders.py, attributes.py as-is
teocns Jan 9, 2025
adecb40
remove _telemetry
teocns Jan 9, 2025
3980a15
import to telemetry
teocns Jan 9, 2025
3a6dfa7
Add telemetry tests
teocns Jan 9, 2025
1684f51
merge exporters tests
teocns Jan 10, 2025
e4d9530
fixes
teocns Jan 10, 2025
eb99891
save x3
teocns Jan 10, 2025
bcb5ead
Introduce API Layer
teocns Jan 10, 2025
9666f20
v2
teocns Jan 10, 2025
97f0ec1
base api add post
teocns Jan 10, 2025
ab059ed
session tests passing
teocns Jan 10, 2025
3e84459
SessionExporter: make correct use of SessionApiClient
teocns Jan 10, 2025
069d7e7
Merge branch 'feat/session-refactoring' into feat/v0.4
teocns Jan 10, 2025
02c9b64
Merge branch 'main' into feat/v0.4
the-praxs Jan 10, 2025
1025af0
fix for autogen
the-praxs Jan 10, 2025
88ce5d5
Merge branch 'feat/optimal-test-suite' into feat/v0.4
teocns Jan 12, 2025
ca84cd5
move `tests/telemetry` to `tests/unit/telemetry/` to align with #637
teocns Jan 12, 2025
b10deca
Merge branch 'feat/optimal-test-suite' into feat/v0.4
teocns Jan 13, 2025
692e73f
telemetry: add DESIGN.mermaid.md
teocns Jan 13, 2025
e676993
OTEL Log Capture: allow capturing logging and prints from terminal
teocns Jan 14, 2025
bcbe850
Merge: main (Test suite v0.4)
teocns Jan 15, 2025
36c8cbf
design
teocns Jan 15, 2025
65c04fc
Merge branch 'main' into feat/v0.4
teocns Jan 15, 2025
aa09623
ruff
teocns Jan 15, 2025
9ecc8f7
style(pyproject.toml): allow F403 (import *)
teocns Jan 15, 2025
e94a7fb
tests: rename test_event_converter to test_encoders
teocns Jan 15, 2025
ed606e7
proposla
teocns Jan 15, 2025
7bb0162
update readme.md
teocns Jan 15, 2025
04ddbc1
flow.md
teocns Jan 15, 2025
b3e053d
ruff
the-praxs Jan 16, 2025
45814cb
docs: add LogCapture details to README.md
teocns Jan 16, 2025
d9fc35f
agentops.llms.README.md
teocns Jan 16, 2025
ddf8f4d
Add docstrings to LLMTracker
teocns Jan 16, 2025
4b61c56
agentops.Client.record: use Event type hint, since ErrorEvent inherit…
teocns Jan 16, 2025
f9ceabf
refactor(instrumented_provider): improve type hints and accessors
teocns Jan 16, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions agentops/CURRENT_FLOW.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
## 1. Current Architecture Flow

```mermaid
flowchart TB
subgraph Client["Client Singleton"]
Config["Configuration"]
direction TB
Client_API["Client API Layer"]
LLMTracker["LLM Tracker"]
end

subgraph Sessions["Session Management"]
Session["Session Class"]
SessionManager["SessionManager"]
LogCapture["LogCapture"]
SessionAPI["SessionApiClient"]
end

subgraph Events["Event System"]
Event["Base Event"]
LLMEvent["LLMEvent"]
ActionEvent["ActionEvent"]
ToolEvent["ToolEvent"]
ErrorEvent["ErrorEvent"]
end

subgraph Telemetry["Current Telemetry"]
SessionTelemetry["SessionTelemetry"]
OTELTracer["OTEL Tracer"]
SessionExporter["SessionExporter"]
BatchProcessor["BatchSpanProcessor"]
end

subgraph Providers["LLM Providers"]
InstrumentedProvider["InstrumentedProvider"]
OpenAIProvider["OpenAIProvider"]
AnthropicProvider["AnthropicProvider"]
end

%% Client Relationships
Client_API -->|initializes| Session
Client_API -->|configures| LLMTracker
LLMTracker -->|instruments| Providers

%% Session Direct Dependencies
Session -->|creates| SessionManager
Session -->|creates| SessionTelemetry
Session -->|creates| LogCapture
Session -->|owns| SessionAPI

%% Event Flow
InstrumentedProvider -->|creates| LLMEvent
InstrumentedProvider -->|requires| Session
Session -->|records| Event
SessionManager -->|processes| Event
SessionTelemetry -->|converts to spans| Event

%% Telemetry Flow
SessionTelemetry -->|uses| OTELTracer
OTELTracer -->|sends to| BatchProcessor
BatchProcessor -->|exports via| SessionExporter
SessionExporter -->|uses| SessionAPI

%% Problem Areas
style Session fill:#f77,stroke:#333
style InstrumentedProvider fill:#f77,stroke:#333
style SessionTelemetry fill:#f77,stroke:#333
92 changes: 92 additions & 0 deletions agentops/PROPOSED_FLOW.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
```mermaid
flowchart TB
subgraph Client["Client Singleton"]
Config["Configuration"]
direction TB
Client_API["Client API Layer"]
InstrumentationManager["Instrumentation Manager"]
end

subgraph Sessions["Session Management"]
Session["Session Class"]
SessionManager["SessionManager"]
LogCapture["LogCapture"]
SessionAPI["SessionApiClient"]
end

subgraph Events["Event System"]
Event["Base Event"]
LLMEvent["LLMEvent"]
ActionEvent["ActionEvent"]
ToolEvent["ToolEvent"]
ErrorEvent["ErrorEvent"]
end

subgraph Telemetry["Enhanced Telemetry"]
TelemetryManager["TelemetryManager"]
OTELTracer["OTEL Tracer"]

subgraph Exporters["Exporters"]
SessionExporter["SessionExporter"]
OTLPExporter["OTLP Exporter"]
end

subgraph Processors["Processors"]
BatchProcessor["BatchProcessor"]
SamplingProcessor["SamplingProcessor"]
end
end

subgraph Providers["LLM Providers"]
BaseInstrumentation["BaseInstrumentation"]
InstrumentedProvider["InstrumentedProvider"]
OpenAIProvider["OpenAIProvider"]
AnthropicProvider["AnthropicProvider"]
end

subgraph Context["Context Management"]
TraceContext["TraceContext"]
ContextPropagation["ContextPropagation"]
end

%% Client Relationships
Client_API -->|initializes| Session
Client_API -->|configures| InstrumentationManager
InstrumentationManager -->|manages| BaseInstrumentation

%% Session Dependencies
Session -->|creates| SessionManager
Session -->|uses| TelemetryManager
Session -->|creates| LogCapture
Session -->|owns| SessionAPI

%% Event Flow
InstrumentedProvider -->|creates| LLMEvent
InstrumentedProvider -->|requires| Session
Session -->|records| Event
SessionManager -->|processes| Event

%% Telemetry Flow
TelemetryManager -->|manages| OTELTracer
TelemetryManager -->|uses| TraceContext
OTELTracer -->|uses| Processors
Processors -->|send to| Exporters

%% Provider Structure
BaseInstrumentation -->|extends| InstrumentedProvider
InstrumentedProvider -->|implements| OpenAIProvider
InstrumentedProvider -->|implements| AnthropicProvider

%% Context Flow
ContextPropagation -->|enriches| Event
TraceContext -->|propagates to| SessionAPI

%% Highlight New/Changed Components
style InstrumentationManager fill:#90EE90,stroke:#333
style TelemetryManager fill:#90EE90,stroke:#333
style BaseInstrumentation fill:#90EE90,stroke:#333
style TraceContext fill:#90EE90,stroke:#333
style ContextPropagation fill:#90EE90,stroke:#333
style OTLPExporter fill:#90EE90,stroke:#333
style SamplingProcessor fill:#90EE90,stroke:#333
```
77 changes: 77 additions & 0 deletions agentops/api/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
from typing import Optional, Dict, Any
import requests
from requests.adapters import HTTPAdapter
from urllib3.util import Retry

from ..exceptions import ApiServerException


class ApiClient:
"""Base class for API communication with connection pooling"""

_session: Optional[requests.Session] = None

@classmethod
def get_session(cls) -> requests.Session:
"""Get or create the global session with optimized connection pooling"""
if cls._session is None:
cls._session = requests.Session()

# Configure connection pooling
adapter = HTTPAdapter(
pool_connections=15,
pool_maxsize=256,
max_retries=Retry(total=3, backoff_factor=0.1, status_forcelist=[500, 502, 503, 504]),
)

# Mount adapter for both HTTP and HTTPS
cls._session.mount("http://", adapter)
cls._session.mount("https://", adapter)

# Set default headers
cls._session.headers.update(
{
"Connection": "keep-alive",
"Keep-Alive": "timeout=10, max=1000",
"Content-Type": "application/json",
}
)

return cls._session

def __init__(self, endpoint: str):
self.endpoint = endpoint

def _prepare_headers(
self,
api_key: Optional[str] = None,
parent_key: Optional[str] = None,
jwt: Optional[str] = None,
custom_headers: Optional[Dict[str, str]] = None,
) -> Dict[str, str]:
"""Prepare headers for the request"""
headers = {"Content-Type": "application/json; charset=UTF-8", "Accept": "*/*"}

if api_key:
headers["X-Agentops-Api-Key"] = api_key

if parent_key:
headers["X-Agentops-Parent-Key"] = parent_key

if jwt:
headers["Authorization"] = f"Bearer {jwt}"

if custom_headers:
# Don't let custom headers override critical headers
safe_headers = custom_headers.copy()
for protected in ["Authorization", "X-Agentops-Api-Key", "X-Agentops-Parent-Key"]:
safe_headers.pop(protected, None)
headers.update(safe_headers)

return headers

def post(self, path: str, data: Dict[str, Any], headers: Dict[str, str]) -> requests.Response:
"""Make POST request"""
url = f"{self.endpoint}{path}"
session = self.get_session()
return session.post(url, json=data, headers=headers)
84 changes: 84 additions & 0 deletions agentops/api/session.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
from typing import Dict, List, Optional, Tuple, Union, Any
from uuid import UUID
import requests

from .base import ApiClient
from ..exceptions import ApiServerException
from ..helpers import safe_serialize
from ..log_config import logger
from ..event import Event


class SessionApiClient(ApiClient):
"""Handles API communication for sessions"""

def __init__(self, endpoint: str, session_id: UUID, api_key: str, jwt: Optional[str] = None):
super().__init__(endpoint)
self.session_id = session_id
self.api_key = api_key
self.jwt = jwt

def create_session(
self, session_data: Dict[str, Any], parent_key: Optional[str] = None
) -> Tuple[bool, Optional[str]]:
"""Create a new session"""
try:
headers = self._prepare_headers(
api_key=self.api_key, parent_key=parent_key, custom_headers={"X-Session-ID": str(self.session_id)}
)

res = self.post("/v2/create_session", {"session": session_data}, headers)
jwt = res.json().get("jwt")
return bool(jwt), jwt

except ApiServerException as e:
logger.error(f"Could not create session - {e}")
return False, None

def update_session(self, session_data: Optional[Dict[str, Any]] = None) -> Optional[Dict[str, Any]]:
"""Update session state"""
try:
headers = self._prepare_headers(
api_key=self.api_key, jwt=self.jwt, custom_headers={"X-Session-ID": str(self.session_id)}
)

res = self.post("/v2/update_session", {"session": session_data or {}}, headers)
return res.json()

except ApiServerException as e:
logger.error(f"Could not update session - {e}")
return None

def create_events(self, events: List[Dict[str, Any]]) -> bool:
"""Send events to API"""
try:
headers = self._prepare_headers(
api_key=self.api_key, jwt=self.jwt, custom_headers={"X-Session-ID": str(self.session_id)}
)

res = self.post("/v2/create_events", {"events": events}, headers)
return res.status_code == 200

except ApiServerException as e:
logger.error(f"Could not create events - {e}")
return False

def create_agent(self, name: str, agent_id: str) -> bool:
"""Create a new agent"""
try:
headers = self._prepare_headers(
api_key=self.api_key, jwt=self.jwt, custom_headers={"X-Session-ID": str(self.session_id)}
)

res = self.post("/v2/create_agent", {"id": agent_id, "name": name}, headers)
return res.status_code == 200

except ApiServerException as e:
logger.error(f"Could not create agent - {e}")
return False

def _post(self, path: str, data: Dict[str, Any], headers: Dict[str, str]) -> requests.Response:
"""Make POST request"""
url = f"{self.endpoint}{path}"
session = self.get_session()
return session.post(url, json=data, headers=headers)
50 changes: 17 additions & 33 deletions agentops/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ def get_default_tags(self) -> List[str]:
"""
return list(self._config.default_tags)

def record(self, event: Union[Event, ErrorEvent]) -> None:
def record(self, event: Event) -> None:
"""
Record an event with the AgentOps service.

Expand All @@ -198,47 +198,31 @@ def start_session(
self,
tags: Optional[List[str]] = None,
inherited_session_id: Optional[str] = None,
) -> Union[Session, None]:
"""
Start a new session for recording events.

Args:
tags (List[str], optional): Tags that can be used for grouping or sorting later.
e.g. ["test_run"].
config: (Configuration, optional): Client configuration object
inherited_session_id (optional, str): assign session id to match existing Session
"""
) -> Optional[Session]:
"""Start a new session"""
if not self.is_initialized:
return

if inherited_session_id is not None:
try:
session_id = UUID(inherited_session_id)
except ValueError:
return logger.warning(f"Invalid session id: {inherited_session_id}")
else:
session_id = uuid4()
return None

session_tags = self._config.default_tags.copy()
if tags is not None:
session_tags.update(tags)
try:
session_id = UUID(inherited_session_id) if inherited_session_id else uuid4()
except ValueError:
return logger.warning(f"Invalid session id: {inherited_session_id}")

default_tags = list(self._config.default_tags) if self._config.default_tags else []
session = Session(
session_id=session_id,
tags=list(session_tags),
host_env=self.host_env,
config=self._config,
tags=tags or default_tags,
host_env=self.host_env,
)

if not session.is_running:
return logger.error("Failed to start session")

if self._pre_init_queue["agents"] and len(self._pre_init_queue["agents"]) > 0:
for agent_args in self._pre_init_queue["agents"]:
session.create_agent(name=agent_args["name"], agent_id=agent_args["agent_id"])
self._pre_init_queue["agents"] = []
if session.is_running:
# Process any queued agents
if self._pre_init_queue["agents"]:
for agent_args in self._pre_init_queue["agents"]:
session.create_agent(name=agent_args["name"], agent_id=agent_args["agent_id"])
self._pre_init_queue["agents"].clear()

self._sessions.append(session)
return session

def end_session(
Expand Down
Loading
Loading