From 10fec1bc08eb6a7d714d69ead4ee2e152b6ca93d Mon Sep 17 00:00:00 2001 From: Chris Branch Date: Fri, 1 Nov 2024 22:15:42 +0000 Subject: [PATCH] Expose active traces through telemetry server Tracks currently-active spans in a low-overhead manner (relative to the general overhead of tracing, at least). This is particularly useful because even unsampled traces are available to be viewed here. The model for this functionality is https://pkg.go.dev/golang.org/x/net/trace although there is no built-in viewer here, we expect you to use Chrome's about:tracing or anything that supports the same JSON log format. --- Cargo.toml | 3 + foundations/Cargo.toml | 1 + foundations/src/telemetry/server.rs | 26 ++++ foundations/src/telemetry/tracing/init.rs | 9 ++ foundations/src/telemetry/tracing/internal.rs | 28 +++- .../telemetry/tracing/live/event_output.rs | 94 ++++++++++++ .../tracing/live/live_reference_set.rs | 144 ++++++++++++++++++ foundations/src/telemetry/tracing/live/mod.rs | 34 +++++ foundations/src/telemetry/tracing/mod.rs | 4 +- foundations/tests/telemetry_server.rs | 51 ++++++- 10 files changed, 390 insertions(+), 4 deletions(-) create mode 100644 foundations/src/telemetry/tracing/live/event_output.rs create mode 100644 foundations/src/telemetry/tracing/live/live_reference_set.rs create mode 100644 foundations/src/telemetry/tracing/live/mod.rs diff --git a/Cargo.toml b/Cargo.toml index eb6d9cb..647a79b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -79,3 +79,6 @@ neli = "0.6.4" neli-proc-macros = "0.1.3" parking_lot_core = "0.9.9" thiserror = "1.0.56" + +[patch.crates-io] +cf-rustracing = { git = "https://github.com/cbranch/rustracing.git", rev = "c81f98643547e31d920c73a440c382690bc2cde1" } diff --git a/foundations/Cargo.toml b/foundations/Cargo.toml index 179f0cf..a734524 100644 --- a/foundations/Cargo.toml +++ b/foundations/Cargo.toml @@ -116,6 +116,7 @@ tracing = [ "dep:futures-util", "dep:tokio", "dep:serde", + "dep:slab", ] # Enables metrics functionality. diff --git a/foundations/src/telemetry/server.rs b/foundations/src/telemetry/server.rs index e98380e..2edc8e4 100644 --- a/foundations/src/telemetry/server.rs +++ b/foundations/src/telemetry/server.rs @@ -109,6 +109,21 @@ impl RouteMap { .boxed() }), }); + + #[cfg(feature = "tracing")] + self.set(TelemetryServerRoute { + path: "/debug/traces".into(), + methods: vec![Method::GET], + handler: Box::new(|_, settings| { + async move { + into_response( + "application/json; charset=utf-8", + tracing::output_traces(settings).await, + ) + } + .boxed() + }), + }); } fn set(&mut self, route: TelemetryServerRoute) { @@ -280,3 +295,14 @@ mod memory_profiling { profiler(settings)?.heap_stats() } } + +#[cfg(feature = "tracing")] +mod tracing { + use super::*; + use crate::telemetry::tracing::init::TracingHarness; + use crate::Result; + + pub(super) async fn output_traces(_settings: Arc) -> Result { + Ok(TracingHarness::get().active_roots.get_active_traces()) + } +} diff --git a/foundations/src/telemetry/tracing/init.rs b/foundations/src/telemetry/tracing/init.rs index 014fb89..486d64d 100644 --- a/foundations/src/telemetry/tracing/init.rs +++ b/foundations/src/telemetry/tracing/init.rs @@ -27,6 +27,9 @@ static NOOP_HARNESS: Lazy = Lazy::new(|| { #[cfg(feature = "testing")] test_tracer_scope_stack: Default::default(), + + #[cfg(feature = "telemetry-server")] + active_roots: Default::default(), } }); @@ -37,6 +40,9 @@ pub(crate) struct TracingHarness { #[cfg(feature = "testing")] pub(crate) test_tracer_scope_stack: ScopeStack, + + #[cfg(feature = "telemetry-server")] + pub(crate) active_roots: crate::telemetry::tracing::live::ActiveRoots, } impl TracingHarness { @@ -95,6 +101,9 @@ pub(crate) fn init( #[cfg(feature = "testing")] test_tracer_scope_stack: Default::default(), + + #[cfg(feature = "telemetry-server")] + active_roots: Default::default(), }; let _ = HARNESS.set(harness); diff --git a/foundations/src/telemetry/tracing/internal.rs b/foundations/src/telemetry/tracing/internal.rs index 84c5891..8d5d6f5 100644 --- a/foundations/src/telemetry/tracing/internal.rs +++ b/foundations/src/telemetry/tracing/internal.rs @@ -11,11 +11,29 @@ use std::sync::Arc; pub(crate) type Tracer = cf_rustracing::Tracer, SpanContextState>; +#[cfg(not(feature = "telemetry-server"))] +pub(crate) type SharedSpanInner = Arc>; + +#[cfg(feature = "telemetry-server")] +pub(crate) type SharedSpanInner = super::live::SharedSpanHandle; + +#[cfg(not(feature = "telemetry-server"))] +fn track_span(span: Span) -> SharedSpanInner { + Arc::new(parking_lot::RwLock::new(span)) +} + +#[cfg(feature = "telemetry-server")] +fn track_span(span: Span) -> SharedSpanInner { + TracingHarness::get() + .active_roots + .track(Arc::new(parking_lot::RwLock::new(span))) +} + #[derive(Debug, Clone)] pub(crate) struct SharedSpan { // NOTE: we intentionally use a lock without poisoning here to not // panic the threads if they just share telemetry with failed thread. - pub(crate) inner: Arc>, + pub(crate) inner: SharedSpanInner, // NOTE: store sampling flag separately, so we don't need to acquire lock // every time we need to check the flag. is_sampled: bool, @@ -26,12 +44,18 @@ impl From for SharedSpan { let is_sampled = inner.is_sampled(); Self { - inner: Arc::new(parking_lot::RwLock::new(inner)), + inner: track_span(inner), is_sampled, } } } +impl SharedSpan { + pub(crate) fn into_span(self) -> Arc> { + Arc::clone(&self.inner) + } +} + pub fn write_current_span(write_fn: impl FnOnce(&mut Span)) { if let Some(span) = current_span() { if span.is_sampled { diff --git a/foundations/src/telemetry/tracing/live/event_output.rs b/foundations/src/telemetry/tracing/live/event_output.rs new file mode 100644 index 0000000..5ce9851 --- /dev/null +++ b/foundations/src/telemetry/tracing/live/event_output.rs @@ -0,0 +1,94 @@ +//! Outputs telemetry spans in Chrome JSON trace format (i.e. the format used by about:tracing). +use super::SharedSpanHandle; +use std::time::SystemTime; + +/// Outputs a slice of shared spans as a Chrome JSON trace log. +pub(crate) fn spans_to_trace_events(epoch: SystemTime, spans: &[SharedSpanHandle]) -> String { + use cf_rustracing::span::InspectableSpan; + + let mut log_builder = TraceLogBuilder::new(); + + let end_timestamp = epoch + .elapsed() + .ok() + .and_then(|x| u64::try_from(x.as_nanos()).ok()) + .unwrap_or(u64::MAX); + + for span in spans { + let span_ref = span.read(); + let span_state = span_ref.context().unwrap().state(); + let trace_id = span_state.trace_id().to_string(); + let name = span_ref.operation_name(); + + let start_ts = span_ref + .start_time() + .duration_since(epoch) + .ok() + .and_then(|x| u64::try_from(x.as_nanos()).ok()) + .unwrap_or_default(); + + let end_ts = span_ref + .finish_time() + .and_then(|x| x.duration_since(epoch).ok()) + .and_then(|x| u64::try_from(x.as_nanos()).ok()) + .unwrap_or(end_timestamp); + + log_builder.write_event(&trace_id, name, "", TraceEventType::Begin, start_ts); + log_builder.write_event(&trace_id, name, "", TraceEventType::End, end_ts); + } + + log_builder.finalize(end_timestamp) +} + +#[derive(Copy, Clone)] +enum TraceEventType { + Begin, + End, +} + +fn escape(s: &str) -> String { + s.escape_default().to_string() +} + +struct TraceLogBuilder { + out: String, +} + +impl TraceLogBuilder { + fn new() -> Self { + TraceLogBuilder { + out: "[".to_string(), + } + } + + fn write_event( + &mut self, + trace_id: &str, + name: &str, + category: &str, + event_type: TraceEventType, + timestamp_us: u64, + ) { + self.out.push_str(&format!( + "{{\"pid\":1,\"name\":\"{}\",\"cat\":\"{}\",\"ph\":\"{}\",\"ts\":{},\"id\":\"{}\"}},", + escape(name), + escape(category), + match event_type { + TraceEventType::Begin => "B", + TraceEventType::End => "E", + }, + timestamp_us, + trace_id, + )); + } + + fn finalize(mut self, end_timestamp: u64) -> String { + self.out.push_str(&format!( + "{{\"pid\":1,\"name\":\"Trace dump requested\",\"ph\":\"i\",\"ts\":{},\"s\":\"g\"}}", + end_timestamp, + )); + + self.out.push(']'); + self.out + } +} diff --git a/foundations/src/telemetry/tracing/live/live_reference_set.rs b/foundations/src/telemetry/tracing/live/live_reference_set.rs new file mode 100644 index 0000000..a4f42e8 --- /dev/null +++ b/foundations/src/telemetry/tracing/live/live_reference_set.rs @@ -0,0 +1,144 @@ +use slab::Slab; +use std::sync::{Arc, Mutex, Weak}; + +/// Stores a set of reference-counted objects and allows iteration over them +/// when requested. When the objects are dropped, they will be removed from +/// the set. +/// +/// The objects are wrapped by the `LiveReferenceSet` through its `track` +/// method. +pub(crate) struct LiveReferenceSet(Arc>); + +/// No default bound on T is required +impl Default for LiveReferenceSet { + fn default() -> Self { + LiveReferenceSet(Arc::new(LiveReferenceSetInner { + active_set: Default::default(), + })) + } +} + +struct LiveReferenceSetInner { + active_set: Mutex>>>, +} + +impl LiveReferenceSet { + /// Wrap `value` in an `Arc` and track the lifetime of the object. + /// + /// While the object has strong references, it is possible to obtain a + /// reference to it through the `get_live_references` method. + pub(crate) fn track(&self, value: T) -> Arc> { + let set_ref = Arc::clone(&self.0); + Arc::new_cyclic(|weak| { + let slot = self.0.active_set.lock().unwrap().insert(Weak::clone(weak)); + LiveReferenceHandle { + set_ref, + slot, + value, + } + }) + } + + /// Get references to all live objects tracked by the `LiveReferenceSet`. + /// + /// Because this object is internally locked, the references are cloned and + /// collected into a `Vec`. The assumption is that any operation using + /// the references is expensive enough that it should happen outside the critical + /// section. + pub(crate) fn get_live_references(&self) -> Vec>> { + self.0 + .active_set + .lock() + .unwrap() + .iter() + .filter_map(|(_, span)| span.upgrade()) + .collect() + } +} + +/// Wrapper around an object whose lifetime is tracked by `LiveReferenceSet`. +/// Access to the object is possible via the `Deref` implementation. +pub(crate) struct LiveReferenceHandle { + value: T, + set_ref: Arc>, + slot: usize, +} + +impl Drop for LiveReferenceHandle { + fn drop(&mut self) { + self.set_ref.active_set.lock().unwrap().remove(self.slot); + } +} + +impl std::ops::Deref for LiveReferenceHandle { + type Target = T; + + fn deref(&self) -> &Self::Target { + &self.value + } +} + +impl std::fmt::Debug for LiveReferenceHandle { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.value.fmt(f) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + struct NotifyOnDrop { + target: Arc>>, + inner_val: usize, + } + + impl Drop for NotifyOnDrop { + fn drop(&mut self) { + self.target.lock().unwrap().push(self.inner_val); + } + } + + #[test] + fn test_live_references() { + let notify_vec = Arc::new(Mutex::new(vec![])); + let ref_set = Arc::new(LiveReferenceSet::default()); + + // Dropping a returned reference should immediately drop the inner object + drop(ref_set.track(NotifyOnDrop { + target: Arc::clone(¬ify_vec), + inner_val: 1, + })); + + assert_eq!(&*notify_vec.lock().unwrap(), &[1]); + assert_eq!(ref_set.get_live_references().len(), 0); + + // Holding a reference should allow us to get it through the reference set + let r1 = ref_set.track(NotifyOnDrop { + target: Arc::clone(¬ify_vec), + inner_val: 2, + }); + + assert_eq!(&*notify_vec.lock().unwrap(), &[1]); + assert_eq!(ref_set.get_live_references()[0].inner_val, 2); + + // Holding a second reference... + let r2 = ref_set.track(NotifyOnDrop { + target: Arc::clone(¬ify_vec), + inner_val: 3, + }); + + assert_eq!(&*notify_vec.lock().unwrap(), &[1]); + assert_eq!(ref_set.get_live_references()[0].inner_val, 2); + assert_eq!(ref_set.get_live_references()[1].inner_val, 3); + + // then dropping the first + drop(r1); + assert_eq!(&*notify_vec.lock().unwrap(), &[1, 2]); + assert_eq!(ref_set.get_live_references()[0].inner_val, 3); + + drop(r2); + assert_eq!(&*notify_vec.lock().unwrap(), &[1, 2, 3]); + assert_eq!(ref_set.get_live_references().len(), 0); + } +} diff --git a/foundations/src/telemetry/tracing/live/mod.rs b/foundations/src/telemetry/tracing/live/mod.rs new file mode 100644 index 0000000..0be9122 --- /dev/null +++ b/foundations/src/telemetry/tracing/live/mod.rs @@ -0,0 +1,34 @@ +mod event_output; +mod live_reference_set; + +use cf_rustracing_jaeger::span::Span; +use live_reference_set::{LiveReferenceHandle, LiveReferenceSet}; +use std::sync::Arc; +use std::time::SystemTime; + +type SharedSpanInner = Arc>; +pub(crate) type SharedSpanHandle = Arc>; + +pub(crate) struct ActiveRoots { + roots: Arc>, + start: SystemTime, +} + +impl Default for ActiveRoots { + fn default() -> Self { + Self { + roots: Default::default(), + start: SystemTime::now(), + } + } +} + +impl ActiveRoots { + pub(crate) fn get_active_traces(&self) -> String { + event_output::spans_to_trace_events(self.start, &self.roots.get_live_references()) + } + + pub(crate) fn track(&self, value: SharedSpanInner) -> SharedSpanHandle { + self.roots.track(value) + } +} diff --git a/foundations/src/telemetry/tracing/mod.rs b/foundations/src/telemetry/tracing/mod.rs index 71ba202..559b708 100644 --- a/foundations/src/telemetry/tracing/mod.rs +++ b/foundations/src/telemetry/tracing/mod.rs @@ -7,6 +7,8 @@ pub(crate) mod init; #[cfg(any(test, feature = "testing"))] pub(crate) mod testing; +#[cfg(feature = "telemetry-server")] +mod live; mod output_jaeger_thrift_udp; mod rate_limit; @@ -385,7 +387,7 @@ pub fn start_trace( /// /// [rustracing]: https://crates.io/crates/rustracing pub fn rustracing_span() -> Option>> { - current_span().map(|span| span.inner) + current_span().map(|span| span.into_span()) } // NOTE: `#[doc(hidden)]` + `#[doc(inline)]` for `pub use` trick is used to prevent these macros diff --git a/foundations/tests/telemetry_server.rs b/foundations/tests/telemetry_server.rs index c18c2ec..5525c45 100644 --- a/foundations/tests/telemetry_server.rs +++ b/foundations/tests/telemetry_server.rs @@ -1,5 +1,5 @@ use foundations::telemetry::settings::{TelemetryServerSettings, TelemetrySettings}; -use foundations::telemetry::{TelemetryConfig, TelemetryServerRoute}; +use foundations::telemetry::{TelemetryConfig, TelemetryContext, TelemetryServerRoute}; use futures_util::FutureExt; use hyper::{Method, Response}; use std::net::{Ipv4Addr, SocketAddr}; @@ -97,4 +97,53 @@ async fn telemetry_server() { .unwrap() .contains("Allocated") ); + + let telemetry_ctx = TelemetryContext::current(); + let _scope = telemetry_ctx.scope(); + + // Create a broadcast channel used to keep tasks active until we fetch traces. + let (keep_trace_active, mut trace_waiter) = tokio::sync::broadcast::channel(2); + + // Create a span with a detached child. + // The parent span will end before the child does. + let mut trace_waiter1 = keep_trace_active.subscribe(); + #[allow(clippy::async_yields_async)] + let child_span_handle = foundations::telemetry::tracing::span("parent_span") + .into_context() + .apply(async move { + // return the JoinHandle for this task + tokio::spawn( + foundations::telemetry::tracing::span("child_span_outliving_parent") + .into_context() + .apply(async move { + let _ = trace_waiter1.recv().await; + }), + ) + }) + .await; + + // Create a span that stays active + let traced_task = { + let _scope = telemetry_ctx.scope(); + let _root = foundations::telemetry::tracing::span("my_root_span"); + + tokio::spawn(TelemetryContext::current().apply(async move { + let _ = trace_waiter.recv().await; + })) + }; + + let trace_output = reqwest::get(format!("http://{server_addr}/debug/traces")) + .await + .unwrap() + .text() + .await + .unwrap(); + + keep_trace_active.send(()).unwrap(); + let _ = traced_task.await; + let _ = child_span_handle.await; + + assert!(!trace_output.contains("parent_span")); + assert!(trace_output.contains("child_span_outliving_parent")); + assert!(trace_output.contains("my_root_span")); }