diff --git a/Cargo.lock b/Cargo.lock index d2ed78e3e..17e1f6f31 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -742,6 +742,7 @@ dependencies = [ "tokio-tungstenite", "toml", "tonic 0.12.1", + "tower", "tracing", "tracing-opentelemetry", "tracing-subscriber", diff --git a/fpx/Cargo.toml b/fpx/Cargo.toml index 4e9dd757a..1d0e98d6a 100644 --- a/fpx/Cargo.toml +++ b/fpx/Cargo.toml @@ -61,6 +61,7 @@ tokio-tungstenite = { version = "0.21", features = [ ] } # This should be kept the same as whatever Axum has toml = { version = "0.8" } tonic = { version = "0.12" } +tower = { version = "0.4" } tracing = { version = "0.1" } tracing-opentelemetry = { version = "0.25" } tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/fpx/src/api.rs b/fpx/src/api.rs index 98791127d..ad52d901b 100644 --- a/fpx/src/api.rs +++ b/fpx/src/api.rs @@ -116,4 +116,5 @@ fn api_router() -> axum::Router { get(handlers::spans::span_list_handler), ) .fallback(StatusCode::NOT_FOUND) + .layer(crate::otel_util::OtelTraceLayer::default()) } diff --git a/fpx/src/api/client.rs b/fpx/src/api/client.rs index 3ac4fd96a..25f65d8ce 100644 --- a/fpx/src/api/client.rs +++ b/fpx/src/api/client.rs @@ -8,9 +8,13 @@ use super::errors::ApiClientError; use super::handlers::spans::SpanGetError; use super::handlers::RequestGetError; use crate::api::models; +use crate::otel_util::HeaderMapInjector; use anyhow::Result; -use http::Method; +use http::{HeaderMap, Method}; +use opentelemetry::propagation::TextMapPropagator; +use opentelemetry_sdk::propagation::TraceContextPropagator; use tracing::trace; +use tracing_opentelemetry::OpenTelemetrySpanExt; use url::Url; pub struct ApiClient { @@ -57,6 +61,22 @@ impl ApiClient { let req = self.client.request(method, u); + // Take the current otel context, and inject those details into the + // Request using the TraceContext format. + let req = { + let mut headers = HeaderMap::new(); + let propagator = TraceContextPropagator::new(); + + let context = tracing::Span::current().context(); + let mut header_injector = HeaderMapInjector(&mut headers); + propagator.inject_context(&context, &mut header_injector); + + req.headers(headers) + }; + + // TODO: Create new otel span (SpanKind::Client) and add relevant client + // attributes to it. + // Make request let response = req.send().await?; @@ -67,6 +87,9 @@ impl ApiClient { // Read the entire response into a local buffer. let body = response.bytes().await?; + // TODO: Mark the span status as Err if we are unable to parse the + // response. + // Try to parse the result as T. match serde_json::from_slice::(&body) { Ok(result) => Ok(result), diff --git a/fpx/src/canned_requests.rs b/fpx/src/canned_requests.rs index 24d32c919..45ddf60e2 100644 --- a/fpx/src/canned_requests.rs +++ b/fpx/src/canned_requests.rs @@ -115,7 +115,7 @@ impl CannedRequest { .read() .await .get(name) - .ok_or_else(|| CannedRequestListError::NotFound)? + .ok_or(CannedRequestListError::NotFound)? .clone(); request.name = name.to_string(); diff --git a/fpx/src/lib.rs b/fpx/src/lib.rs index 7221b2600..6bd9c9910 100644 --- a/fpx/src/lib.rs +++ b/fpx/src/lib.rs @@ -4,4 +4,5 @@ pub mod data; pub mod events; pub mod grpc; pub mod inspector; +mod otel_util; mod service; diff --git a/fpx/src/main.rs b/fpx/src/main.rs index 1ad0edae4..28db2a26c 100644 --- a/fpx/src/main.rs +++ b/fpx/src/main.rs @@ -8,6 +8,7 @@ use opentelemetry_sdk::trace::Config; use opentelemetry_sdk::Resource; use std::env; use std::path::Path; +use tracing::trace; use tracing_opentelemetry::OpenTelemetryLayer; use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::util::SubscriberInitExt; @@ -20,15 +21,24 @@ pub mod data; mod events; mod grpc; mod inspector; +mod otel_util; mod service; #[tokio::main] async fn main() -> Result<()> { let args = commands::Args::parse(); + let should_shutdown_tracing = args.enable_tracing; setup_tracing(&args)?; - commands::handle_command(args).await + let result = commands::handle_command(args).await; + + if should_shutdown_tracing { + trace!("Shutting down tracers"); + shutdown_tracing(); + } + + result } fn setup_tracing(args: &commands::Args) -> Result<()> { @@ -41,7 +51,7 @@ fn setup_tracing(args: &commands::Args) -> Result<()> { let trace_layer = if args.enable_tracing { // This tracer is responsible for sending the actual traces. - let tracer = opentelemetry_otlp::new_pipeline() + let tracer_provider = opentelemetry_otlp::new_pipeline() .tracing() .with_exporter( opentelemetry_otlp::new_exporter() @@ -53,8 +63,11 @@ fn setup_tracing(args: &commands::Args) -> Result<()> { .with_resource(Resource::new(vec![KeyValue::new("service.name", "fpx")])), ) .install_batch(runtime::Tokio) - .context("unable to install tracer")? - .tracer("fpx"); + .context("unable to install tracer")?; + + opentelemetry::global::set_tracer_provider(tracer_provider.clone()); + + let tracer = tracer_provider.tracer("fpx"); // This layer will take the traces from the `tracing` crate and send // them to the tracer specified above. @@ -73,6 +86,10 @@ fn setup_tracing(args: &commands::Args) -> Result<()> { Ok(()) } +fn shutdown_tracing() { + opentelemetry::global::shutdown_tracer_provider(); +} + /// Ensure that all the necessary directories are created for fpx. /// /// This includes the top level fpx working directory and many of the diff --git a/fpx/src/otel_util.rs b/fpx/src/otel_util.rs new file mode 100644 index 000000000..f43cb9de8 --- /dev/null +++ b/fpx/src/otel_util.rs @@ -0,0 +1,145 @@ +use axum::extract::{MatchedPath, Request}; +use axum::response::Response; +use futures_util::future::BoxFuture; +use http::{HeaderMap, HeaderName, HeaderValue}; +use opentelemetry::propagation::{Extractor, Injector, TextMapPropagator}; +use opentelemetry_sdk::propagation::TraceContextPropagator; +use std::str::FromStr; +use tower::Layer; +use tracing::{field, info_span, Instrument, Span}; +use tracing_opentelemetry::OpenTelemetrySpanExt; + +/// The [`HeaderMapInjector`] provides a implementation for the otel +/// [`Injector`] trait on the [`HeaderMap`] type. +/// +/// This allows a otel propagator to inject the span context into a +/// [`HeaderMap`], ie into a outgoing http response. Invalid keys or values, +/// according to http header standards, are silently ignored. +pub struct HeaderMapInjector<'a>(pub &'a mut HeaderMap); + +impl<'a> Injector for HeaderMapInjector<'a> { + fn set(&mut self, key: &str, val: String) { + if let Ok(key) = HeaderName::from_str(key) { + if let Ok(val) = HeaderValue::from_str(&val) { + self.0.insert(key, val); + } + } + } +} + +/// The [`HeaderMapExtractor`] provides a implementation for the otel +/// [`Extractor`] trait on the [`HeaderMap`] type. +/// +/// This allows a otel propagator to extract a span context from a [`HeaderMap`], +/// ie from a incoming http request. +pub struct HeaderMapExtractor<'a>(pub &'a HeaderMap); + +impl<'a> Extractor for HeaderMapExtractor<'a> { + fn get(&self, key: &str) -> Option<&str> { + self.0.get(key).and_then(|val| val.to_str().ok()) + } + + fn keys(&self) -> Vec<&str> { + self.0 + .keys() + .map(|header_name| header_name.as_str()) + .collect::>() + } +} + +/// [`tower_layer::Layer`] will add OTEL specific tracing to the request using +/// the [`OtelTraceService`]. +#[derive(Clone, Default)] +pub struct OtelTraceLayer {} + +impl Layer for OtelTraceLayer { + type Service = OtelTraceService; + + fn layer(&self, inner: S) -> Self::Service { + OtelTraceService { inner } + } +} + +/// The [`OtelTraceService`] will create a new otel span for each request. +/// +/// On any request this will create a new span with some of the http/url +/// attributes as defined by the OTEL spec. Note this is a minimal +/// implementation at the moment. This span will use any context from the +/// incoming request (as defined by the tracecontext spec) as the parent span. +/// +/// It will also encode the span context into the response headers. +#[derive(Clone)] +pub struct OtelTraceService { + inner: S, +} + +impl tower::Service for OtelTraceService +where + S: tower::Service + Send + 'static, + S::Future: Send + 'static, +{ + type Response = S::Response; + type Error = S::Error; + type Future = BoxFuture<'static, Result>; + + fn poll_ready( + &mut self, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, req: Request) -> Self::Future { + let propagator = TraceContextPropagator::new(); + + let span = create_span_from_req(&req); + let headers = req.headers(); + let extractor = HeaderMapExtractor(headers); + + let context = propagator.extract(&extractor); + span.set_parent(context); + + let future = self.inner.call(req); + Box::pin( + async move { + let mut response: Response = future.await?; + + let headers = response.headers_mut(); + + let context = tracing::Span::current().context(); + let mut header_injector = HeaderMapInjector(headers); + propagator.inject_context(&context, &mut header_injector); + + Ok(response) + } + .instrument(span), + ) + } +} + +/// Create a new span for the incoming request. This uses conventions from the +/// point of view of a server (ie. the SpanKind is server and will use the +/// MatchedPath). +/// +/// Note that this will only set a minimal set of attributes on the span. +fn create_span_from_req(req: &Request) -> Span { + // Try to get the matched path from the request. This ia a extenion from + // Axum, so it might not be present. Fallback on the actual path if it + // isn't present. + let path = if let Some(path) = req.extensions().get::() { + path.as_str() + } else { + req.uri().path() + }; + + info_span!( + "HTTP request", + http.request.method = req.method().as_str(), + url.path = req.uri().path(), + url.query = req.uri().query(), + url.scheme = ?req.uri().scheme(), + otel.kind = "Server", + otel.name = format!("{} {}", req.method().as_str(), path), + otel.status_code = field::Empty, // Should be set on the response + ) +}