Skip to content

Commit

Permalink
Implement tower layer to propagate trace context (#100)
Browse files Browse the repository at this point in the history
* Implement tower layer to propagate trace context

Also return the trace id as part of the response

Add trace context to outgoing request going through the API client

* Move OtelTraceLayer into otel_util

* Add comments to Client

* Replace eprintln! with trace!

Remove obsolete instrument macros

* Resolve clippy
  • Loading branch information
hatchan authored Jul 30, 2024
1 parent 4710902 commit afb36b1
Show file tree
Hide file tree
Showing 8 changed files with 195 additions and 6 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions fpx/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ tokio-tungstenite = { version = "0.21", features = [
] } # This should be kept the same as whatever Axum has
toml = { version = "0.8" }
tonic = { version = "0.12" }
tower = { version = "0.4" }
tracing = { version = "0.1" }
tracing-opentelemetry = { version = "0.25" }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
Expand Down
1 change: 1 addition & 0 deletions fpx/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,4 +116,5 @@ fn api_router() -> axum::Router<ApiState> {
get(handlers::spans::span_list_handler),
)
.fallback(StatusCode::NOT_FOUND)
.layer(crate::otel_util::OtelTraceLayer::default())
}
25 changes: 24 additions & 1 deletion fpx/src/api/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,13 @@ use super::errors::ApiClientError;
use super::handlers::spans::SpanGetError;
use super::handlers::RequestGetError;
use crate::api::models;
use crate::otel_util::HeaderMapInjector;
use anyhow::Result;
use http::Method;
use http::{HeaderMap, Method};
use opentelemetry::propagation::TextMapPropagator;
use opentelemetry_sdk::propagation::TraceContextPropagator;
use tracing::trace;
use tracing_opentelemetry::OpenTelemetrySpanExt;
use url::Url;

pub struct ApiClient {
Expand Down Expand Up @@ -57,6 +61,22 @@ impl ApiClient {

let req = self.client.request(method, u);

// Take the current otel context, and inject those details into the
// Request using the TraceContext format.
let req = {
let mut headers = HeaderMap::new();
let propagator = TraceContextPropagator::new();

let context = tracing::Span::current().context();
let mut header_injector = HeaderMapInjector(&mut headers);
propagator.inject_context(&context, &mut header_injector);

req.headers(headers)
};

// TODO: Create new otel span (SpanKind::Client) and add relevant client
// attributes to it.

// Make request
let response = req.send().await?;

Expand All @@ -67,6 +87,9 @@ impl ApiClient {
// Read the entire response into a local buffer.
let body = response.bytes().await?;

// TODO: Mark the span status as Err if we are unable to parse the
// response.

// Try to parse the result as T.
match serde_json::from_slice::<T>(&body) {
Ok(result) => Ok(result),
Expand Down
2 changes: 1 addition & 1 deletion fpx/src/canned_requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ impl CannedRequest {
.read()
.await
.get(name)
.ok_or_else(|| CannedRequestListError::NotFound)?
.ok_or(CannedRequestListError::NotFound)?
.clone();

request.name = name.to_string();
Expand Down
1 change: 1 addition & 0 deletions fpx/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ pub mod data;
pub mod events;
pub mod grpc;
pub mod inspector;
mod otel_util;
mod service;
25 changes: 21 additions & 4 deletions fpx/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use opentelemetry_sdk::trace::Config;
use opentelemetry_sdk::Resource;
use std::env;
use std::path::Path;
use tracing::trace;
use tracing_opentelemetry::OpenTelemetryLayer;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
Expand All @@ -20,15 +21,24 @@ pub mod data;
mod events;
mod grpc;
mod inspector;
mod otel_util;
mod service;

#[tokio::main]
async fn main() -> Result<()> {
let args = commands::Args::parse();
let should_shutdown_tracing = args.enable_tracing;

setup_tracing(&args)?;

commands::handle_command(args).await
let result = commands::handle_command(args).await;

if should_shutdown_tracing {
trace!("Shutting down tracers");
shutdown_tracing();
}

result
}

fn setup_tracing(args: &commands::Args) -> Result<()> {
Expand All @@ -41,7 +51,7 @@ fn setup_tracing(args: &commands::Args) -> Result<()> {

let trace_layer = if args.enable_tracing {
// This tracer is responsible for sending the actual traces.
let tracer = opentelemetry_otlp::new_pipeline()
let tracer_provider = opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(
opentelemetry_otlp::new_exporter()
Expand All @@ -53,8 +63,11 @@ fn setup_tracing(args: &commands::Args) -> Result<()> {
.with_resource(Resource::new(vec![KeyValue::new("service.name", "fpx")])),
)
.install_batch(runtime::Tokio)
.context("unable to install tracer")?
.tracer("fpx");
.context("unable to install tracer")?;

opentelemetry::global::set_tracer_provider(tracer_provider.clone());

let tracer = tracer_provider.tracer("fpx");

// This layer will take the traces from the `tracing` crate and send
// them to the tracer specified above.
Expand All @@ -73,6 +86,10 @@ fn setup_tracing(args: &commands::Args) -> Result<()> {
Ok(())
}

fn shutdown_tracing() {
opentelemetry::global::shutdown_tracer_provider();
}

/// Ensure that all the necessary directories are created for fpx.
///
/// This includes the top level fpx working directory and many of the
Expand Down
145 changes: 145 additions & 0 deletions fpx/src/otel_util.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
use axum::extract::{MatchedPath, Request};
use axum::response::Response;
use futures_util::future::BoxFuture;
use http::{HeaderMap, HeaderName, HeaderValue};
use opentelemetry::propagation::{Extractor, Injector, TextMapPropagator};
use opentelemetry_sdk::propagation::TraceContextPropagator;
use std::str::FromStr;
use tower::Layer;
use tracing::{field, info_span, Instrument, Span};
use tracing_opentelemetry::OpenTelemetrySpanExt;

/// The [`HeaderMapInjector`] provides a implementation for the otel
/// [`Injector`] trait on the [`HeaderMap`] type.
///
/// This allows a otel propagator to inject the span context into a
/// [`HeaderMap`], ie into a outgoing http response. Invalid keys or values,
/// according to http header standards, are silently ignored.
pub struct HeaderMapInjector<'a>(pub &'a mut HeaderMap);

impl<'a> Injector for HeaderMapInjector<'a> {
fn set(&mut self, key: &str, val: String) {
if let Ok(key) = HeaderName::from_str(key) {
if let Ok(val) = HeaderValue::from_str(&val) {
self.0.insert(key, val);
}
}
}
}

/// The [`HeaderMapExtractor`] provides a implementation for the otel
/// [`Extractor`] trait on the [`HeaderMap`] type.
///
/// This allows a otel propagator to extract a span context from a [`HeaderMap`],
/// ie from a incoming http request.
pub struct HeaderMapExtractor<'a>(pub &'a HeaderMap);

impl<'a> Extractor for HeaderMapExtractor<'a> {
fn get(&self, key: &str) -> Option<&str> {
self.0.get(key).and_then(|val| val.to_str().ok())
}

fn keys(&self) -> Vec<&str> {
self.0
.keys()
.map(|header_name| header_name.as_str())
.collect::<Vec<_>>()
}
}

/// [`tower_layer::Layer`] will add OTEL specific tracing to the request using
/// the [`OtelTraceService`].
#[derive(Clone, Default)]
pub struct OtelTraceLayer {}

impl<S> Layer<S> for OtelTraceLayer {
type Service = OtelTraceService<S>;

fn layer(&self, inner: S) -> Self::Service {
OtelTraceService { inner }
}
}

/// The [`OtelTraceService`] will create a new otel span for each request.
///
/// On any request this will create a new span with some of the http/url
/// attributes as defined by the OTEL spec. Note this is a minimal
/// implementation at the moment. This span will use any context from the
/// incoming request (as defined by the tracecontext spec) as the parent span.
///
/// It will also encode the span context into the response headers.
#[derive(Clone)]
pub struct OtelTraceService<S> {
inner: S,
}

impl<S> tower::Service<Request> for OtelTraceService<S>
where
S: tower::Service<Request, Response = Response> + Send + 'static,
S::Future: Send + 'static,
{
type Response = S::Response;
type Error = S::Error;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;

fn poll_ready(
&mut self,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}

fn call(&mut self, req: Request) -> Self::Future {
let propagator = TraceContextPropagator::new();

let span = create_span_from_req(&req);
let headers = req.headers();
let extractor = HeaderMapExtractor(headers);

let context = propagator.extract(&extractor);
span.set_parent(context);

let future = self.inner.call(req);
Box::pin(
async move {
let mut response: Response = future.await?;

let headers = response.headers_mut();

let context = tracing::Span::current().context();
let mut header_injector = HeaderMapInjector(headers);
propagator.inject_context(&context, &mut header_injector);

Ok(response)
}
.instrument(span),
)
}
}

/// Create a new span for the incoming request. This uses conventions from the
/// point of view of a server (ie. the SpanKind is server and will use the
/// MatchedPath).
///
/// Note that this will only set a minimal set of attributes on the span.
fn create_span_from_req(req: &Request) -> Span {
// Try to get the matched path from the request. This ia a extenion from
// Axum, so it might not be present. Fallback on the actual path if it
// isn't present.
let path = if let Some(path) = req.extensions().get::<MatchedPath>() {
path.as_str()
} else {
req.uri().path()
};

info_span!(
"HTTP request",
http.request.method = req.method().as_str(),
url.path = req.uri().path(),
url.query = req.uri().query(),
url.scheme = ?req.uri().scheme(),
otel.kind = "Server",
otel.name = format!("{} {}", req.method().as_str(), path),
otel.status_code = field::Empty, // Should be set on the response
)
}

0 comments on commit afb36b1

Please sign in to comment.