Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement tower layer to propagate trace context #100

Merged
merged 5 commits into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions fpx/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ tokio-tungstenite = { version = "0.21", features = [
] } # This should be kept the same as whatever Axum has
toml = { version = "0.8" }
tonic = { version = "0.12" }
tower = { version = "0.4" }
tracing = { version = "0.1" }
tracing-opentelemetry = { version = "0.25" }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
Expand Down
1 change: 1 addition & 0 deletions fpx/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,4 +116,5 @@ fn api_router() -> axum::Router<ApiState> {
get(handlers::spans::span_list_handler),
)
.fallback(StatusCode::NOT_FOUND)
.layer(crate::otel_util::OtelTraceLayer::default())
}
25 changes: 24 additions & 1 deletion fpx/src/api/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,13 @@ use super::errors::ApiClientError;
use super::handlers::spans::SpanGetError;
use super::handlers::RequestGetError;
use crate::api::models;
use crate::otel_util::HeaderMapInjector;
use anyhow::Result;
use http::Method;
use http::{HeaderMap, Method};
use opentelemetry::propagation::TextMapPropagator;
use opentelemetry_sdk::propagation::TraceContextPropagator;
use tracing::trace;
use tracing_opentelemetry::OpenTelemetrySpanExt;
use url::Url;

pub struct ApiClient {
Expand Down Expand Up @@ -57,6 +61,22 @@ impl ApiClient {

let req = self.client.request(method, u);

// Take the current otel context, and inject those details into the
// Request using the TraceContext format.
let req = {
let mut headers = HeaderMap::new();
let propagator = TraceContextPropagator::new();

let context = tracing::Span::current().context();
let mut header_injector = HeaderMapInjector(&mut headers);
propagator.inject_context(&context, &mut header_injector);

req.headers(headers)
};

// TODO: Create new otel span (SpanKind::Client) and add relevant client
// attributes to it.

// Make request
let response = req.send().await?;

Expand All @@ -67,6 +87,9 @@ impl ApiClient {
// Read the entire response into a local buffer.
let body = response.bytes().await?;

// TODO: Mark the span status as Err if we are unable to parse the
// response.

// Try to parse the result as T.
match serde_json::from_slice::<T>(&body) {
Ok(result) => Ok(result),
Expand Down
2 changes: 1 addition & 1 deletion fpx/src/canned_requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ impl CannedRequest {
.read()
.await
.get(name)
.ok_or_else(|| CannedRequestListError::NotFound)?
.ok_or(CannedRequestListError::NotFound)?
.clone();

request.name = name.to_string();
Expand Down
1 change: 1 addition & 0 deletions fpx/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ pub mod data;
pub mod events;
pub mod grpc;
pub mod inspector;
mod otel_util;
mod service;
25 changes: 21 additions & 4 deletions fpx/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use opentelemetry_sdk::trace::Config;
use opentelemetry_sdk::Resource;
use std::env;
use std::path::Path;
use tracing::trace;
use tracing_opentelemetry::OpenTelemetryLayer;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
Expand All @@ -20,15 +21,24 @@ pub mod data;
mod events;
mod grpc;
mod inspector;
mod otel_util;
mod service;

#[tokio::main]
async fn main() -> Result<()> {
let args = commands::Args::parse();
let should_shutdown_tracing = args.enable_tracing;

setup_tracing(&args)?;

commands::handle_command(args).await
let result = commands::handle_command(args).await;

if should_shutdown_tracing {
trace!("Shutting down tracers");
shutdown_tracing();
}

result
}

fn setup_tracing(args: &commands::Args) -> Result<()> {
Expand All @@ -41,7 +51,7 @@ fn setup_tracing(args: &commands::Args) -> Result<()> {

let trace_layer = if args.enable_tracing {
// This tracer is responsible for sending the actual traces.
let tracer = opentelemetry_otlp::new_pipeline()
let tracer_provider = opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(
opentelemetry_otlp::new_exporter()
Expand All @@ -53,8 +63,11 @@ fn setup_tracing(args: &commands::Args) -> Result<()> {
.with_resource(Resource::new(vec![KeyValue::new("service.name", "fpx")])),
)
.install_batch(runtime::Tokio)
.context("unable to install tracer")?
.tracer("fpx");
.context("unable to install tracer")?;

opentelemetry::global::set_tracer_provider(tracer_provider.clone());

let tracer = tracer_provider.tracer("fpx");

// This layer will take the traces from the `tracing` crate and send
// them to the tracer specified above.
Expand All @@ -73,6 +86,10 @@ fn setup_tracing(args: &commands::Args) -> Result<()> {
Ok(())
}

fn shutdown_tracing() {
opentelemetry::global::shutdown_tracer_provider();
}

/// Ensure that all the necessary directories are created for fpx.
///
/// This includes the top level fpx working directory and many of the
Expand Down
145 changes: 145 additions & 0 deletions fpx/src/otel_util.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
use axum::extract::{MatchedPath, Request};
use axum::response::Response;
use futures_util::future::BoxFuture;
use http::{HeaderMap, HeaderName, HeaderValue};
use opentelemetry::propagation::{Extractor, Injector, TextMapPropagator};
use opentelemetry_sdk::propagation::TraceContextPropagator;
use std::str::FromStr;
use tower::Layer;
use tracing::{field, info_span, Instrument, Span};
use tracing_opentelemetry::OpenTelemetrySpanExt;

/// The [`HeaderMapInjector`] provides a implementation for the otel
/// [`Injector`] trait on the [`HeaderMap`] type.
///
/// This allows a otel propagator to inject the span context into a
/// [`HeaderMap`], ie into a outgoing http response. Invalid keys or values,
/// according to http header standards, are silently ignored.
pub struct HeaderMapInjector<'a>(pub &'a mut HeaderMap);

impl<'a> Injector for HeaderMapInjector<'a> {
fn set(&mut self, key: &str, val: String) {
if let Ok(key) = HeaderName::from_str(key) {
if let Ok(val) = HeaderValue::from_str(&val) {
self.0.insert(key, val);
}
}
}
}

/// The [`HeaderMapExtractor`] provides a implementation for the otel
/// [`Extractor`] trait on the [`HeaderMap`] type.
///
/// This allows a otel propagator to extract a span context from a [`HeaderMap`],
/// ie from a incoming http request.
pub struct HeaderMapExtractor<'a>(pub &'a HeaderMap);

impl<'a> Extractor for HeaderMapExtractor<'a> {
fn get(&self, key: &str) -> Option<&str> {
self.0.get(key).and_then(|val| val.to_str().ok())
}

fn keys(&self) -> Vec<&str> {
self.0
.keys()
.map(|header_name| header_name.as_str())
.collect::<Vec<_>>()
}
}

/// [`tower_layer::Layer`] will add OTEL specific tracing to the request using
/// the [`OtelTraceService`].
#[derive(Clone, Default)]
pub struct OtelTraceLayer {}

impl<S> Layer<S> for OtelTraceLayer {
type Service = OtelTraceService<S>;

fn layer(&self, inner: S) -> Self::Service {
OtelTraceService { inner }
}
}

/// The [`OtelTraceService`] will create a new otel span for each request.
///
/// On any request this will create a new span with some of the http/url
/// attributes as defined by the OTEL spec. Note this is a minimal
/// implementation at the moment. This span will use any context from the
/// incoming request (as defined by the tracecontext spec) as the parent span.
///
/// It will also encode the span context into the response headers.
#[derive(Clone)]
pub struct OtelTraceService<S> {
inner: S,
}

impl<S> tower::Service<Request> for OtelTraceService<S>
where
S: tower::Service<Request, Response = Response> + Send + 'static,
S::Future: Send + 'static,
{
type Response = S::Response;
type Error = S::Error;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;

fn poll_ready(
&mut self,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}

fn call(&mut self, req: Request) -> Self::Future {
let propagator = TraceContextPropagator::new();

let span = create_span_from_req(&req);
let headers = req.headers();
let extractor = HeaderMapExtractor(headers);

let context = propagator.extract(&extractor);
span.set_parent(context);

let future = self.inner.call(req);
Box::pin(
async move {
let mut response: Response = future.await?;

let headers = response.headers_mut();

let context = tracing::Span::current().context();
let mut header_injector = HeaderMapInjector(headers);
propagator.inject_context(&context, &mut header_injector);

Ok(response)
}
.instrument(span),
)
}
}

/// Create a new span for the incoming request. This uses conventions from the
/// point of view of a server (ie. the SpanKind is server and will use the
/// MatchedPath).
///
/// Note that this will only set a minimal set of attributes on the span.
fn create_span_from_req(req: &Request) -> Span {
// Try to get the matched path from the request. This ia a extenion from
// Axum, so it might not be present. Fallback on the actual path if it
// isn't present.
let path = if let Some(path) = req.extensions().get::<MatchedPath>() {
path.as_str()
} else {
req.uri().path()
};

info_span!(
"HTTP request",
http.request.method = req.method().as_str(),
url.path = req.uri().path(),
url.query = req.uri().query(),
url.scheme = ?req.uri().scheme(),
otel.kind = "Server",
otel.name = format!("{} {}", req.method().as_str(), path),
otel.status_code = field::Empty, // Should be set on the response
)
}
Loading