Skip to content

Commit

Permalink
Merge pull request #149 from zxiong/pub-otel-pub-task
Browse files Browse the repository at this point in the history
Add instrument tracing
  • Loading branch information
zxiong authored Dec 6, 2023
2 parents e5709dd + ea0eeee commit 3ba55b8
Show file tree
Hide file tree
Showing 9 changed files with 492 additions and 0 deletions.
1 change: 1 addition & 0 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ Publishing tools project family.
projects
devguide
hooks
tracing

Introduction
------------
Expand Down
144 changes: 144 additions & 0 deletions docs/tracing.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
.. _tracing:

Tracing
=======

.. contents::
:depth: 3


Overview
--------

It provides an instrument tracing wrapper function to use to instrument functions manually in pubtools-* projects.


Usage
.....

Set environment variables
~~~~~~~~~~~~~~~~~~~~~~~~~

Following environment variables are used in the module:

- ``OTEL_TRACING``: set ``true`` to enable instrument tracing, otherwise tracing is disabled.
- ``OTEL_SERVICE_NAME``: required, set the value of the service.name resource attribute. It's
expected to be unique within the same namespace.


OTEL Exporter
~~~~~~~~~~~~~

In order to visualize and analyze telemetry, an exporter is required to export tracing data to
a backend, e.g: `jaeger <https://www.jaegertracing.io/>`_. As part of OpenTelemetry Python you
will find many exporters being available. Which exporter should be used depends on usage scenarios, e.g:
`ConsoleSpanExporter <https://opentelemetry.io/docs/instrumentation/python/exporters/#console-exporter/>`_
is useful for development and debugging tasks, while
`OTLPSpanExporter <https://opentelemetry.io/docs/instrumentation/python/exporters/#otlp-endpoint-or-collector/>`_
can be more suitable on production environment. So choosing different exporters for different scenarios is expected.

In order to have a exporter you expected, the hook :meth:`otel_exporter` is needed to be implemented,
otherwise `ConsoleSpanExporter <https://opentelemetry.io/docs/instrumentation/python/exporters/#console-exporter/>`_
will be used.

Instrument tracing for functions
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

**Basic instrument tracing for a function**

.. code-block:: python
from pubtools.tracing import get_trace_wrapper
# Initialize a trace provider, batch span processor and exporter.
tw = get_trace_wrapper()
# Create a span for the function foo.
@tw.instrument_func(span_name="foo_span", args_to_attr=True)
def foo(p1="p1"):
pass
...
The function foo will be instrumented, the span name is ``foo_span``, and the input parameters
``p1="p1"`` are added in the span attributes.

The input parameter ``span_name`` and ``args_to_attr`` are optional, if ``span_name`` is not
specified, the span name will use the function's name.

**Instrument a function with carrier**

Imaging the case that trace context is propagated cross application systems. A carrier which carries
trace context along with a call is passed to from upstream service to downstream service, then
downstream service will extract trace context from the carrier.

The wrapper function is able to extract trace context from the carrier if it's provided. For example:

.. code-block:: python
from pubtools.tracing import get_trace_wrapper
# carrier={'traceparent': '00-355989206d66228f21ff34634b77ae1a-97efa33ebed5d06c-01',...}
tw = get_trace_wrapper()
@tw.instrument_func(carrier=carrier):
def foo():
pass
...
The span "foo" will appear in the trace extracted from ``carrier`` and be as a child span of
the caller span.

**Instrument functions with environment variables**

Trace context can be extracted from environment variables.

.. code-block:: python
from pubtools.tracing import get_trace_wrapper
# 'traceparent' environment variable is set.
# os.environ["traceparent"] = "00-355989206d66228f21ff34634b77ae1a-97efa33ebed5d06c-01"
tw = get_trace_wrapper()
@tw.instrument_func():
def foo():
pass
...
It's similar as extracting trace context from the function parameter carrier.

It's useful in multiple threads scenario. As opentelemetry-python library uses
`contextvars <https://docs.python.org/3/library/contextvars.html>`_ under the hood, trace context
can not be passed across threads. It provides a solution to implement context pass in this case,
for example:

.. code-block:: python
from pubtools.tracing import get_trace_wrapper
tw = get_trace_wrapper()
@tw.instrument_func(span_name="sub_thread_span")
def sub_thread():
return 1
@tw.instrument_func(span_name="main_thread_span")
def main_thread(param1, param2):
with ThreadPoolExecutor(max_workers=2) as executor:
future_res = [executor.submit(sub_thread) for i in range(1, 3)]
as_completed(future_res)
...
The span ``sub_thread_span`` and ``main_thread_span`` are in the same trace and the ``sub_thread_span``
is the child of ``main_thread_span`` span.

API reference
-------------

.. autofunction:: pubtools.tracing.get_trace_wrapper

.. autofunction:: pubtools._impl.tracing.TracingWrapper.instrument_func

.. autofunction:: pubtools._impl.tracing.TracingWrapper.force_flush
12 changes: 12 additions & 0 deletions pubtools/_impl/pluggy.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,4 +112,16 @@ def get_cert_key_paths(server_url):
"""


@hookspec(firstresult=True)
def otel_exporter():
"""Return an OTEL exporter, used by OTEL instrumentation.
If OTEL tracing is enabled and this hook is not implemented, a default
`ConsoleSpanExporter` will be used.
:return: Instance of SpanExporter.
:rtype: opentelemetry.sdk.trace.export.SpanExporter
"""


pm.add_hookspecs(sys.modules[__name__])
144 changes: 144 additions & 0 deletions pubtools/_impl/tracing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
"""Instrument functions.
Usage:
@instrument_func()
def func():
pass
"""
import functools
import logging
import os
import threading

from opentelemetry import baggage, context, trace
from opentelemetry.baggage.propagation import W3CBaggagePropagator
from opentelemetry.propagate import set_global_textmap
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
from opentelemetry.trace import Status, StatusCode
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator

from pubtools.pluggy import pm

propagator = TraceContextTextMapPropagator()
baggage_propagator = W3CBaggagePropagator()
TRACE_WRAPPER = None
log = logging.getLogger(__name__)


def get_trace_wrapper():
"""return a global trace wrapper instance"""
global TRACE_WRAPPER
if TRACE_WRAPPER is None:
TRACE_WRAPPER = TracingWrapper()
return TRACE_WRAPPER


class TracingWrapper:
"""Wrapper class to initialize opentelemetry instrumentation and provide a helper function
for instrumenting a function"""

def __init__(self):
self._processor = None
self._provider = None
self._enabled_trace = os.getenv("OTEL_TRACING", "").lower() == "true"
if self._enabled_trace:
log.info("Creating TracingWrapper instance")
exporter = pm.hook.otel_exporter() or ConsoleSpanExporter()
self._processor = BatchSpanProcessor(exporter)
self._provider = TracerProvider(
resource=Resource.create({SERVICE_NAME: os.getenv("OTEL_SERVICE_NAME")})
)
self._provider.add_span_processor(self._processor)
trace.set_tracer_provider(self._provider)
set_global_textmap(propagator)

def instrument_func(self, span_name=None, carrier=None, args_to_attr=False):
"""Instrument tracing for a function.
Args:
span_name: str
Span name. It's assigned with the function's name by default if it's omitted.
carrier: dict
A dictionary which holds trace context. Trace context will be extracted from it if
if it's provided.
args_to_attr: boolean
Add function parameters into span attributes or not.
Returns:
The decorated function
"""
tracer = trace.get_tracer(__name__)

def _instrument_func(func):
@functools.wraps(func)
def wrap(*args, **kwargs):
attributes = {
"function_name": func.__qualname__,
}
if args_to_attr:
attributes["args"] = ", ".join(map(str, args))
attributes["kwargs"] = ", ".join(
"{}={}".format(k, v) for k, v in kwargs.items()
)

if not self._enabled_trace:
return func(*args, **kwargs)

trace_ctx = None
token = None
if not context.get_current():
# Extract trace context from carrier.
if carrier:
trace_ctx = propagator.extract(carrier=carrier)
trace_ctx = baggage_propagator.extract(
carrier=carrier, context=trace_ctx
)
else:
# Try to extract trace context from environment variables.
trace_ctx = propagator.extract(carrier=os.environ)
trace_ctx = baggage_propagator.extract(
carrier=os.environ, context=trace_ctx
)

if trace_ctx:
token = context.attach(trace_ctx)

with tracer.start_as_current_span(
name=span_name or func.__qualname__,
attributes=attributes,
) as span:
try:
# Put trace context in environment variables in the main thread.
if threading.current_thread() is threading.main_thread():
propagator.inject(os.environ)
baggage_propagator.inject(os.environ)

result = func(*args, **kwargs)
except Exception as exc:
span.set_status(Status(StatusCode.ERROR))
span.record_exception(exc)
raise
finally:
# Add baggage data into span attributes
span.set_attributes(baggage.get_all())
if token:
context.detach(token)
return result

return wrap

return _instrument_func

def force_flush(self):
"""Flush trace data into OTEL collectors"""
if self._processor:
self._processor.force_flush()
log.info("Flush trace data into OTEL collectors")

@property
def provider(self):
"""Trace provider"""
return self._provider
3 changes: 3 additions & 0 deletions pubtools/tracing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from pubtools._impl.tracing import get_trace_wrapper

__all__ = ["get_trace_wrapper"]
2 changes: 2 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
pluggy
setuptools
opentelemetry-api
opentelemetry-sdk
31 changes: 31 additions & 0 deletions tests/tracing/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import pytest
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult

from pubtools.pluggy import hookimpl, pm


class FakeSpanExporterImp(SpanExporter):
def __init__(self):
self._spans = None

def export(self, spans) -> SpanExportResult:
self._spans = spans
return SpanExportResult.SUCCESS

def get_spans(self):
return self._spans


class FakeSpanExporter:
@hookimpl
def otel_exporter(self):
return FakeSpanExporterImp()


@pytest.fixture(scope="function")
def fake_span_exporter():
"""Installs a hookimpl for span exporter."""
span_exporter = FakeSpanExporter()
pm.register(span_exporter)
yield
pm.unregister(span_exporter)
Loading

0 comments on commit 3ba55b8

Please sign in to comment.