Skip to content

Commit

Permalink
Expose active traces through telemetry server
Browse files Browse the repository at this point in the history
Tracks currently-active spans in a low-overhead manner (relative to the
general overhead of tracing, at least). This is particularly useful because
even unsampled traces are available to be viewed here.

The model for this functionality is https://pkg.go.dev/golang.org/x/net/trace
although there is no built-in viewer here, we expect you to use Chrome's
about:tracing or anything that supports the same JSON log format.
  • Loading branch information
cbranch committed Nov 25, 2024
1 parent d7dc2d1 commit 10fec1b
Show file tree
Hide file tree
Showing 10 changed files with 390 additions and 4 deletions.
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,6 @@ neli = "0.6.4"
neli-proc-macros = "0.1.3"
parking_lot_core = "0.9.9"
thiserror = "1.0.56"

[patch.crates-io]
cf-rustracing = { git = "https://github.com/cbranch/rustracing.git", rev = "c81f98643547e31d920c73a440c382690bc2cde1" }
1 change: 1 addition & 0 deletions foundations/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ tracing = [
"dep:futures-util",
"dep:tokio",
"dep:serde",
"dep:slab",
]

# Enables metrics functionality.
Expand Down
26 changes: 26 additions & 0 deletions foundations/src/telemetry/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,21 @@ impl RouteMap {
.boxed()
}),
});

#[cfg(feature = "tracing")]
self.set(TelemetryServerRoute {
path: "/debug/traces".into(),
methods: vec![Method::GET],
handler: Box::new(|_, settings| {
async move {
into_response(
"application/json; charset=utf-8",
tracing::output_traces(settings).await,
)
}
.boxed()
}),
});
}

fn set(&mut self, route: TelemetryServerRoute) {
Expand Down Expand Up @@ -280,3 +295,14 @@ mod memory_profiling {
profiler(settings)?.heap_stats()
}
}

#[cfg(feature = "tracing")]
mod tracing {
use super::*;
use crate::telemetry::tracing::init::TracingHarness;
use crate::Result;

pub(super) async fn output_traces(_settings: Arc<TelemetrySettings>) -> Result<String> {
Ok(TracingHarness::get().active_roots.get_active_traces())
}
}
9 changes: 9 additions & 0 deletions foundations/src/telemetry/tracing/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ static NOOP_HARNESS: Lazy<TracingHarness> = Lazy::new(|| {

#[cfg(feature = "testing")]
test_tracer_scope_stack: Default::default(),

#[cfg(feature = "telemetry-server")]
active_roots: Default::default(),
}
});

Expand All @@ -37,6 +40,9 @@ pub(crate) struct TracingHarness {

#[cfg(feature = "testing")]
pub(crate) test_tracer_scope_stack: ScopeStack<Tracer>,

#[cfg(feature = "telemetry-server")]
pub(crate) active_roots: crate::telemetry::tracing::live::ActiveRoots,
}

impl TracingHarness {
Expand Down Expand Up @@ -95,6 +101,9 @@ pub(crate) fn init(

#[cfg(feature = "testing")]
test_tracer_scope_stack: Default::default(),

#[cfg(feature = "telemetry-server")]
active_roots: Default::default(),
};

let _ = HARNESS.set(harness);
Expand Down
28 changes: 26 additions & 2 deletions foundations/src/telemetry/tracing/internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,29 @@ use std::sync::Arc;

pub(crate) type Tracer = cf_rustracing::Tracer<BoxSampler<SpanContextState>, SpanContextState>;

#[cfg(not(feature = "telemetry-server"))]
pub(crate) type SharedSpanInner = Arc<parking_lot::RwLock<Span>>;

#[cfg(feature = "telemetry-server")]
pub(crate) type SharedSpanInner = super::live::SharedSpanHandle;

#[cfg(not(feature = "telemetry-server"))]
fn track_span(span: Span) -> SharedSpanInner {
Arc::new(parking_lot::RwLock::new(span))
}

#[cfg(feature = "telemetry-server")]
fn track_span(span: Span) -> SharedSpanInner {
TracingHarness::get()
.active_roots
.track(Arc::new(parking_lot::RwLock::new(span)))
}

#[derive(Debug, Clone)]
pub(crate) struct SharedSpan {
// NOTE: we intentionally use a lock without poisoning here to not
// panic the threads if they just share telemetry with failed thread.
pub(crate) inner: Arc<parking_lot::RwLock<Span>>,
pub(crate) inner: SharedSpanInner,
// NOTE: store sampling flag separately, so we don't need to acquire lock
// every time we need to check the flag.
is_sampled: bool,
Expand All @@ -26,12 +44,18 @@ impl From<Span> for SharedSpan {
let is_sampled = inner.is_sampled();

Self {
inner: Arc::new(parking_lot::RwLock::new(inner)),
inner: track_span(inner),
is_sampled,
}
}
}

impl SharedSpan {
pub(crate) fn into_span(self) -> Arc<parking_lot::RwLock<Span>> {
Arc::clone(&self.inner)
}
}

pub fn write_current_span(write_fn: impl FnOnce(&mut Span)) {
if let Some(span) = current_span() {
if span.is_sampled {
Expand Down
94 changes: 94 additions & 0 deletions foundations/src/telemetry/tracing/live/event_output.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
//! Outputs telemetry spans in Chrome JSON trace format (i.e. the format used by about:tracing).
use super::SharedSpanHandle;
use std::time::SystemTime;

/// Outputs a slice of shared spans as a Chrome JSON trace log.
pub(crate) fn spans_to_trace_events(epoch: SystemTime, spans: &[SharedSpanHandle]) -> String {
use cf_rustracing::span::InspectableSpan;

let mut log_builder = TraceLogBuilder::new();

let end_timestamp = epoch
.elapsed()
.ok()
.and_then(|x| u64::try_from(x.as_nanos()).ok())
.unwrap_or(u64::MAX);

for span in spans {
let span_ref = span.read();
let span_state = span_ref.context().unwrap().state();
let trace_id = span_state.trace_id().to_string();
let name = span_ref.operation_name();

let start_ts = span_ref
.start_time()
.duration_since(epoch)
.ok()
.and_then(|x| u64::try_from(x.as_nanos()).ok())
.unwrap_or_default();

let end_ts = span_ref
.finish_time()
.and_then(|x| x.duration_since(epoch).ok())
.and_then(|x| u64::try_from(x.as_nanos()).ok())
.unwrap_or(end_timestamp);

log_builder.write_event(&trace_id, name, "", TraceEventType::Begin, start_ts);
log_builder.write_event(&trace_id, name, "", TraceEventType::End, end_ts);
}

log_builder.finalize(end_timestamp)
}

#[derive(Copy, Clone)]
enum TraceEventType {
Begin,
End,
}

fn escape(s: &str) -> String {
s.escape_default().to_string()
}

struct TraceLogBuilder {
out: String,
}

impl TraceLogBuilder {
fn new() -> Self {
TraceLogBuilder {
out: "[".to_string(),
}
}

fn write_event(
&mut self,
trace_id: &str,
name: &str,
category: &str,
event_type: TraceEventType,
timestamp_us: u64,
) {
self.out.push_str(&format!(
"{{\"pid\":1,\"name\":\"{}\",\"cat\":\"{}\",\"ph\":\"{}\",\"ts\":{},\"id\":\"{}\"}},",
escape(name),
escape(category),
match event_type {
TraceEventType::Begin => "B",
TraceEventType::End => "E",
},
timestamp_us,
trace_id,
));
}

fn finalize(mut self, end_timestamp: u64) -> String {
self.out.push_str(&format!(
"{{\"pid\":1,\"name\":\"Trace dump requested\",\"ph\":\"i\",\"ts\":{},\"s\":\"g\"}}",
end_timestamp,
));

self.out.push(']');
self.out
}
}
144 changes: 144 additions & 0 deletions foundations/src/telemetry/tracing/live/live_reference_set.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
use slab::Slab;
use std::sync::{Arc, Mutex, Weak};

/// Stores a set of reference-counted objects and allows iteration over them
/// when requested. When the objects are dropped, they will be removed from
/// the set.
///
/// The objects are wrapped by the `LiveReferenceSet` through its `track`
/// method.
pub(crate) struct LiveReferenceSet<T>(Arc<LiveReferenceSetInner<T>>);

/// No default bound on T is required
impl<T> Default for LiveReferenceSet<T> {
fn default() -> Self {
LiveReferenceSet(Arc::new(LiveReferenceSetInner {
active_set: Default::default(),
}))
}
}

struct LiveReferenceSetInner<T> {
active_set: Mutex<Slab<Weak<LiveReferenceHandle<T>>>>,
}

impl<T> LiveReferenceSet<T> {
/// Wrap `value` in an `Arc` and track the lifetime of the object.
///
/// While the object has strong references, it is possible to obtain a
/// reference to it through the `get_live_references` method.
pub(crate) fn track(&self, value: T) -> Arc<LiveReferenceHandle<T>> {
let set_ref = Arc::clone(&self.0);
Arc::new_cyclic(|weak| {
let slot = self.0.active_set.lock().unwrap().insert(Weak::clone(weak));
LiveReferenceHandle {
set_ref,
slot,
value,
}
})
}

/// Get references to all live objects tracked by the `LiveReferenceSet`.
///
/// Because this object is internally locked, the references are cloned and
/// collected into a `Vec`. The assumption is that any operation using
/// the references is expensive enough that it should happen outside the critical
/// section.
pub(crate) fn get_live_references(&self) -> Vec<Arc<LiveReferenceHandle<T>>> {
self.0
.active_set
.lock()
.unwrap()
.iter()
.filter_map(|(_, span)| span.upgrade())
.collect()
}
}

/// Wrapper around an object whose lifetime is tracked by `LiveReferenceSet`.
/// Access to the object is possible via the `Deref` implementation.
pub(crate) struct LiveReferenceHandle<T> {
value: T,
set_ref: Arc<LiveReferenceSetInner<T>>,
slot: usize,
}

impl<T> Drop for LiveReferenceHandle<T> {
fn drop(&mut self) {
self.set_ref.active_set.lock().unwrap().remove(self.slot);
}
}

impl<T> std::ops::Deref for LiveReferenceHandle<T> {
type Target = T;

fn deref(&self) -> &Self::Target {
&self.value
}
}

impl<T: std::fmt::Debug> std::fmt::Debug for LiveReferenceHandle<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.value.fmt(f)
}
}

#[cfg(test)]
mod tests {
use super::*;

struct NotifyOnDrop {
target: Arc<Mutex<Vec<usize>>>,
inner_val: usize,
}

impl Drop for NotifyOnDrop {
fn drop(&mut self) {
self.target.lock().unwrap().push(self.inner_val);
}
}

#[test]
fn test_live_references() {
let notify_vec = Arc::new(Mutex::new(vec![]));
let ref_set = Arc::new(LiveReferenceSet::default());

// Dropping a returned reference should immediately drop the inner object
drop(ref_set.track(NotifyOnDrop {
target: Arc::clone(&notify_vec),
inner_val: 1,
}));

assert_eq!(&*notify_vec.lock().unwrap(), &[1]);
assert_eq!(ref_set.get_live_references().len(), 0);

// Holding a reference should allow us to get it through the reference set
let r1 = ref_set.track(NotifyOnDrop {
target: Arc::clone(&notify_vec),
inner_val: 2,
});

assert_eq!(&*notify_vec.lock().unwrap(), &[1]);
assert_eq!(ref_set.get_live_references()[0].inner_val, 2);

// Holding a second reference...
let r2 = ref_set.track(NotifyOnDrop {
target: Arc::clone(&notify_vec),
inner_val: 3,
});

assert_eq!(&*notify_vec.lock().unwrap(), &[1]);
assert_eq!(ref_set.get_live_references()[0].inner_val, 2);
assert_eq!(ref_set.get_live_references()[1].inner_val, 3);

// then dropping the first
drop(r1);
assert_eq!(&*notify_vec.lock().unwrap(), &[1, 2]);
assert_eq!(ref_set.get_live_references()[0].inner_val, 3);

drop(r2);
assert_eq!(&*notify_vec.lock().unwrap(), &[1, 2, 3]);
assert_eq!(ref_set.get_live_references().len(), 0);
}
}
Loading

0 comments on commit 10fec1b

Please sign in to comment.