From e11a0a333ad959e0a13d3550bf1fe8cd030fb191 Mon Sep 17 00:00:00 2001 From: Jonathan Chavez <153635462+jjxct@users.noreply.github.com> Date: Tue, 21 Jan 2025 18:19:55 -0500 Subject: [PATCH] fix(llmobs): encode llm objects in utf-8 before sending (#11961) This PR resolves an issue in the Python SDK where non-ascii/utf8 characters being annotated on spans resulted in span payloads being dropped due to encoding errors. In #11330 we previously added the `ensure_ascii=False` option to our `safe_json()` helper's use of `json.dumps(...)` in order to keep non-ascii characters from being encoded multiple times into nonsense (as we were calling `safe_json()` multiple nested times while building the span event from the span tags. However this resulted in issues where non-latin1 characters (which is a subset of utf-8 and apparently the encoding scheme HTTP library relies on, which we in turn rely on to submit payloads) broke the encoding at payload submission time. To fix this, we remove the `ensure_ascii=False` option at the final write time. Also note that after #11543 we mostly centralized all of the times a span event is encoded, which is at write time and when encoding the span's input/output value fields (which can be a json dictionary format). Since we need to provide valid json formatting for the IO fields (which leads to a prettier UI display), we still need to call `json.dumps(ensure_ascii=False)` to avoid the same problem as fixed by #11330, i.e. keep the non-ascii characters unencoded until at the very end (i.e. write time) This PR also adds minor test fixtures mocking out the LLMObs back end intake to make assertions on the payloads we should be submitting to LLMObs, since previous tests were all relying on the span events prior to encoding/submission and weren't able to cover this scenario. ## Checklist - [x] PR author has checked that all the criteria below are met - The PR description includes an overview of the change - The PR description articulates the motivation for the change - The change includes tests OR the PR description describes a testing strategy - The PR description notes risks associated with the change, if any - Newly-added code is easy to change - The change follows the [library release note guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html) - The change includes or references documentation updates if necessary - Backport labels are set (if [applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)) ## Reviewer Checklist - [ ] Reviewer has checked that all the criteria below are met - Title is accurate - All changes are related to the pull request's stated goal - Avoids breaking [API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces) changes - Testing strategy adequately addresses listed risks - Newly-added code is easy to change - Release note makes sense to a user of the library - If necessary, author has acknowledged and discussed the performance implications of this PR as reported in the benchmarks PR comment - Backport labels are set in a manner that is consistent with the [release branch maintenance policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting) --------- Co-authored-by: Kyle Verhoog Co-authored-by: Yun Kim Co-authored-by: Yun Kim <35776586+Yun-Kim@users.noreply.github.com> --- ddtrace/llmobs/_llmobs.py | 6 +- ddtrace/llmobs/_utils.py | 4 +- ddtrace/llmobs/_writer.py | 6 +- ...-json-encoding-ascii-f8da77867a910de1.yaml | 4 + tests/contrib/langgraph/conftest.py | 7 +- tests/llmobs/conftest.py | 79 +++++++++++++++++-- tests/llmobs/test_llmobs.py | 27 +++++++ tests/llmobs/test_llmobs_span_agent_writer.py | 2 +- .../test_llmobs_span_agentless_writer.py | 21 ++--- 9 files changed, 133 insertions(+), 23 deletions(-) create mode 100644 releasenotes/notes/fix-llmobs-json-encoding-ascii-f8da77867a910de1.yaml diff --git a/ddtrace/llmobs/_llmobs.py b/ddtrace/llmobs/_llmobs.py index 66339cbb751..50a8e9d4255 100644 --- a/ddtrace/llmobs/_llmobs.py +++ b/ddtrace/llmobs/_llmobs.py @@ -31,6 +31,7 @@ from ddtrace.internal.utils.formats import asbool from ddtrace.internal.utils.formats import parse_tags_str from ddtrace.llmobs import _constants as constants +from ddtrace.llmobs._constants import AGENTLESS_BASE_URL from ddtrace.llmobs._constants import ANNOTATIONS_CONTEXT_ID from ddtrace.llmobs._constants import INPUT_DOCUMENTS from ddtrace.llmobs._constants import INPUT_MESSAGES @@ -91,6 +92,7 @@ def __init__(self, tracer=None): self.tracer = tracer or ddtrace.tracer self._llmobs_span_writer = LLMObsSpanWriter( is_agentless=config._llmobs_agentless_enabled, + agentless_url="%s.%s" % (AGENTLESS_BASE_URL, config._dd_site), interval=float(os.getenv("_DD_LLMOBS_WRITER_INTERVAL", 1.0)), timeout=float(os.getenv("_DD_LLMOBS_WRITER_TIMEOUT", 5.0)), ) @@ -152,13 +154,13 @@ def _llmobs_span_event(cls, span: Span) -> Tuple[Dict[str, Any], bool]: if span_kind == "llm" and span._get_ctx_item(INPUT_MESSAGES) is not None: meta["input"]["messages"] = span._get_ctx_item(INPUT_MESSAGES) if span._get_ctx_item(INPUT_VALUE) is not None: - meta["input"]["value"] = safe_json(span._get_ctx_item(INPUT_VALUE)) + meta["input"]["value"] = safe_json(span._get_ctx_item(INPUT_VALUE), ensure_ascii=False) if span_kind == "llm" and span._get_ctx_item(OUTPUT_MESSAGES) is not None: meta["output"]["messages"] = span._get_ctx_item(OUTPUT_MESSAGES) if span_kind == "embedding" and span._get_ctx_item(INPUT_DOCUMENTS) is not None: meta["input"]["documents"] = span._get_ctx_item(INPUT_DOCUMENTS) if span._get_ctx_item(OUTPUT_VALUE) is not None: - meta["output"]["value"] = safe_json(span._get_ctx_item(OUTPUT_VALUE)) + meta["output"]["value"] = safe_json(span._get_ctx_item(OUTPUT_VALUE), ensure_ascii=False) if span_kind == "retrieval" and span._get_ctx_item(OUTPUT_DOCUMENTS) is not None: meta["output"]["documents"] = span._get_ctx_item(OUTPUT_DOCUMENTS) if span._get_ctx_item(INPUT_PROMPT) is not None: diff --git a/ddtrace/llmobs/_utils.py b/ddtrace/llmobs/_utils.py index 1799eb5548d..b30ef4c969e 100644 --- a/ddtrace/llmobs/_utils.py +++ b/ddtrace/llmobs/_utils.py @@ -185,10 +185,10 @@ def _unserializable_default_repr(obj): return default_repr -def safe_json(obj): +def safe_json(obj, ensure_ascii=True): if isinstance(obj, str): return obj try: - return json.dumps(obj, ensure_ascii=False, skipkeys=True, default=_unserializable_default_repr) + return json.dumps(obj, ensure_ascii=ensure_ascii, skipkeys=True, default=_unserializable_default_repr) except Exception: log.error("Failed to serialize object to JSON.", exc_info=True) diff --git a/ddtrace/llmobs/_writer.py b/ddtrace/llmobs/_writer.py index 5880019d67f..c172c9adba9 100644 --- a/ddtrace/llmobs/_writer.py +++ b/ddtrace/llmobs/_writer.py @@ -22,7 +22,6 @@ from ddtrace.internal.periodic import PeriodicService from ddtrace.internal.writer import HTTPWriter from ddtrace.internal.writer import WriterClientBase -from ddtrace.llmobs._constants import AGENTLESS_BASE_URL from ddtrace.llmobs._constants import AGENTLESS_ENDPOINT from ddtrace.llmobs._constants import DROPPED_IO_COLLECTION_ERROR from ddtrace.llmobs._constants import DROPPED_VALUE_TEXT @@ -243,6 +242,7 @@ def __init__( interval: float, timeout: float, is_agentless: bool = True, + agentless_url: str = "", dogstatsd=None, sync_mode=False, reuse_connections=None, @@ -250,8 +250,10 @@ def __init__( headers = {} clients = [] # type: List[WriterClientBase] if is_agentless: + if not agentless_url: + raise ValueError("agentless_url is required for agentless mode") clients.append(LLMObsAgentlessEventClient()) - intake_url = "%s.%s" % (AGENTLESS_BASE_URL, config._dd_site) + intake_url = agentless_url headers["DD-API-KEY"] = config._dd_api_key else: clients.append(LLMObsProxiedEventClient()) diff --git a/releasenotes/notes/fix-llmobs-json-encoding-ascii-f8da77867a910de1.yaml b/releasenotes/notes/fix-llmobs-json-encoding-ascii-f8da77867a910de1.yaml new file mode 100644 index 00000000000..352ffefc369 --- /dev/null +++ b/releasenotes/notes/fix-llmobs-json-encoding-ascii-f8da77867a910de1.yaml @@ -0,0 +1,4 @@ +--- +fixes: + - | + LLM Observability: This fix resolves an issue where annotating a span with non latin-1 (but valid utf-8) input/output values resulted in encoding errors. diff --git a/tests/contrib/langgraph/conftest.py b/tests/contrib/langgraph/conftest.py index 19d01c018aa..a521ff367fb 100644 --- a/tests/contrib/langgraph/conftest.py +++ b/tests/contrib/langgraph/conftest.py @@ -11,11 +11,15 @@ from ddtrace.contrib.internal.langgraph.patch import patch from ddtrace.contrib.internal.langgraph.patch import unpatch from ddtrace.llmobs import LLMObs as llmobs_service +from ddtrace.llmobs._constants import AGENTLESS_BASE_URL from ddtrace.llmobs._writer import LLMObsSpanWriter from tests.utils import DummyTracer from tests.utils import override_global_config +DATADOG_SITE = "datad0g.com" + + @pytest.fixture def mock_tracer(): yield DummyTracer() @@ -48,7 +52,8 @@ def enqueue(self, event): @pytest.fixture def llmobs_span_writer(): - yield TestLLMObsSpanWriter(interval=1.0, timeout=1.0) + agentless_url = "{}.{}".format(AGENTLESS_BASE_URL, DATADOG_SITE) + yield TestLLMObsSpanWriter(is_agentless=True, agentless_url=agentless_url, interval=1.0, timeout=1.0) @pytest.fixture diff --git a/tests/llmobs/conftest.py b/tests/llmobs/conftest.py index 61a028e5caf..e47595f0973 100644 --- a/tests/llmobs/conftest.py +++ b/tests/llmobs/conftest.py @@ -1,4 +1,9 @@ +from http.server import BaseHTTPRequestHandler +from http.server import HTTPServer +import json import os +import threading +import time import mock import pytest @@ -195,15 +200,79 @@ def llmobs_env(): class TestLLMObsSpanWriter(LLMObsSpanWriter): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - self.events = [] + self._events = [] def enqueue(self, event): - self.events.append(event) + self._events.append(event) + super().enqueue(event) + + def events(self): + return self._events @pytest.fixture -def llmobs_span_writer(): - yield TestLLMObsSpanWriter(interval=1.0, timeout=1.0) +def llmobs_span_writer(_llmobs_backend): + url, _ = _llmobs_backend + sw = TestLLMObsSpanWriter(interval=1.0, timeout=1.0, agentless_url=url) + sw._headers["DD-API-KEY"] = "" + yield sw + + +class LLMObsServer(BaseHTTPRequestHandler): + """A mock server for the LLMObs backend used to capture the requests made by the client. + + Python's HTTPRequestHandler is a bit weird and uses a class rather than an instance + for running an HTTP server so the requests are stored in a class variable and reset in the pytest fixture. + """ + + requests = [] + + def __init__(self, *args, **kwargs) -> None: + super().__init__(*args, **kwargs) + + def do_POST(self) -> None: + content_length = int(self.headers["Content-Length"]) + body = self.rfile.read(content_length).decode("utf-8") + self.requests.append({"path": self.path, "headers": dict(self.headers), "body": body}) + self.send_response(200) + self.end_headers() + self.wfile.write(b"OK") + + +@pytest.fixture +def _llmobs_backend(): + LLMObsServer.requests = [] + # Create and start the HTTP server + server = HTTPServer(("localhost", 0), LLMObsServer) + server_thread = threading.Thread(target=server.serve_forever) + server_thread.daemon = True + server_thread.start() + + # Provide the server details to the test + server_address = f"http://{server.server_address[0]}:{server.server_address[1]}" + + yield server_address, LLMObsServer.requests + + # Stop the server after the test + server.shutdown() + server.server_close() + + +@pytest.fixture +def llmobs_backend(_llmobs_backend): + _, reqs = _llmobs_backend + + class _LLMObsBackend: + def wait_for_num_events(self, num, attempts=1000): + for _ in range(attempts): + if len(reqs) == num: + return [json.loads(r["body"]) for r in reqs] + # time.sleep will yield the GIL so the server can process the request + time.sleep(0.001) + else: + raise TimeoutError(f"Expected {num} events, got {len(reqs)}") + + return _LLMObsBackend() @pytest.fixture @@ -231,4 +300,4 @@ def llmobs( @pytest.fixture def llmobs_events(llmobs, llmobs_span_writer): - return llmobs_span_writer.events + return llmobs_span_writer.events() diff --git a/tests/llmobs/test_llmobs.py b/tests/llmobs/test_llmobs.py index 6cf19fc3e2c..004b77b5764 100644 --- a/tests/llmobs/test_llmobs.py +++ b/tests/llmobs/test_llmobs.py @@ -245,3 +245,30 @@ def test_only_generate_span_events_from_llmobs_spans(tracer, llmobs_events): assert len(llmobs_events) == 2 assert llmobs_events[1] == _expected_llmobs_llm_span_event(root_span, "llm") assert llmobs_events[0] == expected_grandchild_llmobs_span + + +def test_utf_non_ascii_io(llmobs, llmobs_backend): + with llmobs.workflow() as workflow_span: + with llmobs.llm(model_name="gpt-3.5-turbo-0125") as llm_span: + llmobs.annotate(llm_span, input_data="안녕, 지금 몇 시야?") + llmobs.annotate(workflow_span, input_data="안녕, 지금 몇 시야?") + events = llmobs_backend.wait_for_num_events(num=1) + assert len(events) == 1 + assert events[0]["spans"][0]["meta"]["input"]["messages"][0]["content"] == "안녕, 지금 몇 시야?" + assert events[0]["spans"][1]["meta"]["input"]["value"] == "안녕, 지금 몇 시야?" + + +def test_non_utf8_inputs_outputs(llmobs, llmobs_backend): + """Test that latin1 encoded inputs and outputs are correctly decoded.""" + with llmobs.llm(model_name="gpt-3.5-turbo-0125") as span: + llmobs.annotate( + span, + input_data="The first Super Bowl (aka First AFL–NFL World Championship Game), was played in 1967.", + ) + + events = llmobs_backend.wait_for_num_events(num=1) + assert len(events) == 1 + assert ( + events[0]["spans"][0]["meta"]["input"]["messages"][0]["content"] + == "The first Super Bowl (aka First AFL–NFL World Championship Game), was played in 1967." + ) diff --git a/tests/llmobs/test_llmobs_span_agent_writer.py b/tests/llmobs/test_llmobs_span_agent_writer.py index d16bb9f0e2c..55f0a56e4d5 100644 --- a/tests/llmobs/test_llmobs_span_agent_writer.py +++ b/tests/llmobs/test_llmobs_span_agent_writer.py @@ -50,7 +50,7 @@ def test_flush_queue_when_event_cause_queue_to_exceed_payload_limit( def test_truncating_oversized_events(mock_writer_logs, mock_http_writer_send_payload_response): - llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, interval=1000, timeout=1) + llmobs_span_writer = LLMObsSpanWriter(is_agentless=False, interval=1000, timeout=1) llmobs_span_writer.enqueue(_oversized_llm_event()) llmobs_span_writer.enqueue(_oversized_retrieval_event()) llmobs_span_writer.enqueue(_oversized_workflow_event()) diff --git a/tests/llmobs/test_llmobs_span_agentless_writer.py b/tests/llmobs/test_llmobs_span_agentless_writer.py index cac0d926a74..8a1a0697752 100644 --- a/tests/llmobs/test_llmobs_span_agentless_writer.py +++ b/tests/llmobs/test_llmobs_span_agentless_writer.py @@ -20,14 +20,14 @@ def test_writer_start(mock_writer_logs): with override_global_config(dict(_dd_api_key="foobar.baz", _dd_site=DATADOG_SITE)): - llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, interval=1000, timeout=1) + llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, agentless_url=INTAKE_URL, interval=1000, timeout=1) llmobs_span_writer.start() mock_writer_logs.debug.assert_has_calls([mock.call("started %r to %r", "LLMObsSpanWriter", INTAKE_URL)]) def test_buffer_limit(mock_writer_logs, mock_http_writer_send_payload_response): with override_global_config(dict(_dd_api_key="foobar.baz", _dd_site=DATADOG_SITE)): - llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, interval=1000, timeout=1) + llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, agentless_url=INTAKE_URL, interval=1000, timeout=1) for _ in range(1001): llmobs_span_writer.enqueue({}) mock_writer_logs.warning.assert_called_with( @@ -39,7 +39,7 @@ def test_flush_queue_when_event_cause_queue_to_exceed_payload_limit( mock_writer_logs, mock_http_writer_send_payload_response ): with override_global_config(dict(_dd_api_key="foobar.baz", _dd_site=DATADOG_SITE)): - llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, interval=1000, timeout=1) + llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, agentless_url=INTAKE_URL, interval=1000, timeout=1) llmobs_span_writer.enqueue(_large_event()) llmobs_span_writer.enqueue(_large_event()) llmobs_span_writer.enqueue(_large_event()) @@ -56,7 +56,7 @@ def test_flush_queue_when_event_cause_queue_to_exceed_payload_limit( def test_truncating_oversized_events(mock_writer_logs, mock_http_writer_send_payload_response): with override_global_config(dict(_dd_api_key="foobar.baz", _dd_site=DATADOG_SITE)): - llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, interval=1000, timeout=1) + llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, agentless_url=INTAKE_URL, interval=1000, timeout=1) llmobs_span_writer.enqueue(_oversized_llm_event()) llmobs_span_writer.enqueue(_oversized_retrieval_event()) llmobs_span_writer.enqueue(_oversized_workflow_event()) @@ -77,7 +77,7 @@ def test_truncating_oversized_events(mock_writer_logs, mock_http_writer_send_pay def test_send_completion_event(mock_writer_logs, mock_http_writer_send_payload_response): with override_global_config(dict(_dd_site=DATADOG_SITE, _dd_api_key="foobar.baz")): - llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, interval=1, timeout=1) + llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, agentless_url=INTAKE_URL, interval=1, timeout=1) llmobs_span_writer.start() llmobs_span_writer.enqueue(_completion_event()) llmobs_span_writer.periodic() @@ -86,7 +86,7 @@ def test_send_completion_event(mock_writer_logs, mock_http_writer_send_payload_r def test_send_chat_completion_event(mock_writer_logs, mock_http_writer_send_payload_response): with override_global_config(dict(_dd_site=DATADOG_SITE, _dd_api_key="foobar.baz")): - llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, interval=1, timeout=1) + llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, agentless_url=INTAKE_URL, interval=1, timeout=1) llmobs_span_writer.start() llmobs_span_writer.enqueue(_chat_completion_event()) llmobs_span_writer.periodic() @@ -96,7 +96,7 @@ def test_send_chat_completion_event(mock_writer_logs, mock_http_writer_send_payl @mock.patch("ddtrace.internal.writer.writer.log") def test_send_completion_bad_api_key(mock_http_writer_logs, mock_http_writer_put_response_forbidden): with override_global_config(dict(_dd_site=DATADOG_SITE, _dd_api_key="")): - llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, interval=1, timeout=1) + llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, agentless_url=INTAKE_URL, interval=1, timeout=1) llmobs_span_writer.start() llmobs_span_writer.enqueue(_completion_event()) llmobs_span_writer.periodic() @@ -110,7 +110,7 @@ def test_send_completion_bad_api_key(mock_http_writer_logs, mock_http_writer_put def test_send_timed_events(mock_writer_logs, mock_http_writer_send_payload_response): with override_global_config(dict(_dd_site=DATADOG_SITE, _dd_api_key="foobar.baz")): - llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, interval=0.01, timeout=1) + llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, agentless_url=INTAKE_URL, interval=0.01, timeout=1) llmobs_span_writer.start() mock_writer_logs.reset_mock() @@ -125,7 +125,7 @@ def test_send_timed_events(mock_writer_logs, mock_http_writer_send_payload_respo def test_send_multiple_events(mock_writer_logs, mock_http_writer_send_payload_response): with override_global_config(dict(_dd_site=DATADOG_SITE, _dd_api_key="foobar.baz")): - llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, interval=0.01, timeout=1) + llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, agentless_url=INTAKE_URL, interval=0.01, timeout=1) llmobs_span_writer.start() mock_writer_logs.reset_mock() @@ -156,6 +156,7 @@ def test_send_on_exit(run_python_code_in_subprocess): from ddtrace.internal.utils.http import Response from ddtrace.llmobs._writer import LLMObsSpanWriter +from tests.llmobs.test_llmobs_span_agentless_writer import INTAKE_URL from tests.llmobs.test_llmobs_span_agentless_writer import _completion_event with mock.patch( @@ -165,7 +166,7 @@ def test_send_on_exit(run_python_code_in_subprocess): body="{}", ), ): - llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, interval=0.01, timeout=1) + llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, agentless_url=INTAKE_URL, interval=0.01, timeout=1) llmobs_span_writer.start() llmobs_span_writer.enqueue(_completion_event()) """,