Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose active traces through telemetry server #87

Merged
merged 5 commits into from
Dec 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ foundations = { version = "4.0.0", path = "./foundations" }
foundations-macros = { version = "4.0.0", path = "./foundations-macros" }
bindgen = { version = "0.68.1", default-features = false }
cc = "1.0"
cf-rustracing = "1.0.1"
cf-rustracing = "1.1"
cf-rustracing-jaeger = "1.1"
clap = "4.4"
darling = "0.20.10"
Expand Down
1 change: 1 addition & 0 deletions foundations/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ tracing = [
"dep:futures-util",
"dep:tokio",
"dep:serde",
"dep:slab",
]

# Enables metrics functionality.
Expand Down
2 changes: 1 addition & 1 deletion foundations/src/telemetry/log/field_filtering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ macro_rules! filter {
}};
}

impl<'s, F: Filter> Serializer for FieldFilteringSerializer<'s, F> {
impl<F: Filter> Serializer for FieldFilteringSerializer<'_, F> {
fn emit_arguments(&mut self, key: Key, val: &Arguments) -> slog::Result {
filter!(self.emit_arguments(key, val))
}
Expand Down
26 changes: 26 additions & 0 deletions foundations/src/telemetry/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,21 @@ impl RouteMap {
.boxed()
}),
});

#[cfg(feature = "tracing")]
self.set(TelemetryServerRoute {
path: "/debug/traces".into(),
methods: vec![Method::GET],
handler: Box::new(|_, settings| {
async move {
into_response(
"application/json; charset=utf-8",
tracing::output_traces(settings).await,
)
}
.boxed()
}),
});
}

fn set(&mut self, route: TelemetryServerRoute) {
Expand Down Expand Up @@ -280,3 +295,14 @@ mod memory_profiling {
profiler(settings)?.heap_stats()
}
}

#[cfg(feature = "tracing")]
mod tracing {
    use super::*;
    use crate::telemetry::tracing::init::TracingHarness;
    use crate::Result;

    /// Produces the body served by the `/debug/traces` route.
    ///
    /// The telemetry settings are accepted for parity with the route handler
    /// signature but are not consulted. The returned string is whatever the
    /// global harness' active-root tracker reports as the currently active
    /// traces; this function itself never returns an error.
    pub(super) async fn output_traces(_settings: Arc<TelemetrySettings>) -> Result<String> {
        let harness = TracingHarness::get();
        let dump = harness.active_roots.get_active_traces();

        Ok(dump)
    }
}
4 changes: 2 additions & 2 deletions foundations/src/telemetry/telemetry_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub struct WithTelemetryContext<'f, T> {
ctx: TelemetryContext,
}

impl<'f, T> Future for WithTelemetryContext<'f, T> {
impl<T> Future for WithTelemetryContext<'_, T> {
type Output = T;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Expand All @@ -51,7 +51,7 @@ pub struct WithTelemetryContextLocal<'f, T> {
ctx: TelemetryContext,
}

impl<'f, T> Future for WithTelemetryContextLocal<'f, T> {
impl<T> Future for WithTelemetryContextLocal<'_, T> {
type Output = T;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Expand Down
11 changes: 10 additions & 1 deletion foundations/src/telemetry/tracing/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ static NOOP_HARNESS: Lazy<TracingHarness> = Lazy::new(|| {

#[cfg(feature = "testing")]
test_tracer_scope_stack: Default::default(),

#[cfg(feature = "telemetry-server")]
active_roots: Default::default(),
}
});

Expand All @@ -37,6 +40,9 @@ pub(crate) struct TracingHarness {

#[cfg(feature = "testing")]
pub(crate) test_tracer_scope_stack: ScopeStack<Tracer>,

#[cfg(feature = "telemetry-server")]
pub(crate) active_roots: crate::telemetry::tracing::live::ActiveRoots,
}

impl TracingHarness {
Expand All @@ -53,7 +59,7 @@ impl TracingHarness {
}

#[cfg(not(feature = "testing"))]
pub(crate) fn tracer(&'static self) -> &Tracer {
pub(crate) fn tracer(&'static self) -> &'static Tracer {
&self.tracer
}
}
Expand Down Expand Up @@ -95,6 +101,9 @@ pub(crate) fn init(

#[cfg(feature = "testing")]
test_tracer_scope_stack: Default::default(),

#[cfg(feature = "telemetry-server")]
active_roots: Default::default(),
};

let _ = HARNESS.set(harness);
Expand Down
70 changes: 68 additions & 2 deletions foundations/src/telemetry/tracing/internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,77 @@ use std::sync::Arc;

pub(crate) type Tracer = cf_rustracing::Tracer<BoxSampler<SpanContextState>, SpanContextState>;

#[cfg(not(feature = "telemetry-server"))]
mod span_inner {
    use super::*;

    /// Cloneable handle to a span guarded by a read-write lock.
    ///
    /// This is the variant compiled when the `telemetry-server` feature is
    /// disabled: it is a plain newtype over the reference-counted lock.
    #[derive(Clone, Debug)]
    pub(crate) struct SharedSpanInner(Arc<parking_lot::RwLock<Span>>);

    impl SharedSpanInner {
        /// Wraps `span` in a `parking_lot::RwLock` behind an `Arc`.
        pub(crate) fn new(span: Span) -> Self {
            let locked = parking_lot::RwLock::new(span);

            Self(Arc::new(locked))
        }
    }

    impl From<SharedSpanInner> for Arc<parking_lot::RwLock<Span>> {
        /// Unwraps the newtype, handing back the underlying `Arc`.
        fn from(SharedSpanInner(shared): SharedSpanInner) -> Self {
            shared
        }
    }

    impl std::ops::Deref for SharedSpanInner {
        type Target = Arc<parking_lot::RwLock<Span>>;

        /// Exposes the wrapped `Arc` so lock methods can be called directly.
        fn deref(&self) -> &Self::Target {
            let Self(shared) = self;
            shared
        }
    }
}

#[cfg(feature = "telemetry-server")]
mod span_inner {
    use super::*;
    use crate::telemetry::tracing::live::SharedSpanHandle;

    /// Cloneable handle to a lock-guarded span that is also registered with
    /// the global harness' `active_roots` tracker, so it can be inspected
    /// ad hoc.
    #[derive(Clone, Debug)]
    pub(crate) struct SharedSpanInner(SharedSpanHandle);

    impl SharedSpanInner {
        /// Wraps `span` in an `Arc<RwLock<_>>` and registers it with the
        /// harness' active-root tracker, keeping the handle it returns.
        pub(crate) fn new(span: Span) -> Self {
            let roots = &TracingHarness::get().active_roots;
            let shared = Arc::new(parking_lot::RwLock::new(span));

            Self(roots.track(shared))
        }
    }

    impl From<SharedSpanInner> for Arc<parking_lot::RwLock<Span>> {
        /// Yields a new reference to the underlying lock.
        fn from(inner: SharedSpanInner) -> Self {
            // The `Arc` is cloned out of the handle instead of being moved;
            // NOTE(review): presumably the handle does bookkeeping when it is
            // dropped — confirm against the `live` module.
            Arc::clone(&inner.0)
        }
    }

    impl std::ops::Deref for SharedSpanInner {
        type Target = SharedSpanHandle;

        /// Exposes the tracked handle.
        fn deref(&self) -> &Self::Target {
            let Self(handle) = self;
            handle
        }
    }
}

use self::span_inner::SharedSpanInner;

#[derive(Debug, Clone)]
pub(crate) struct SharedSpan {
// NOTE: we intentionally use a lock without poisoning here to not
// panic the threads if they just share telemetry with failed thread.
pub(crate) inner: Arc<parking_lot::RwLock<Span>>,
pub(crate) inner: SharedSpanInner,
// NOTE: store sampling flag separately, so we don't need to acquire lock
// every time we need to check the flag.
is_sampled: bool,
Expand All @@ -26,7 +92,7 @@ impl From<Span> for SharedSpan {
let is_sampled = inner.is_sampled();

Self {
inner: Arc::new(parking_lot::RwLock::new(inner)),
inner: SharedSpanInner::new(inner),
is_sampled,
}
}
Expand Down
94 changes: 94 additions & 0 deletions foundations/src/telemetry/tracing/live/event_output.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
//! Outputs telemetry spans in Chrome JSON trace format (i.e. the format used by about:tracing).
use super::SharedSpanHandle;
use std::time::SystemTime;

/// Outputs a slice of shared spans as a Chrome JSON trace log.
///
/// Every span contributes a begin/end event pair keyed by its trace id.
/// Timestamps are microseconds relative to `epoch`:
///
/// * a start time earlier than `epoch` (or one that does not fit in `u64`)
///   is reported as `0`;
/// * a span that has not finished yet (or whose finish time cannot be
///   expressed relative to `epoch`) ends at the time of the dump.
///
/// Spans that carry no context are skipped: they have no trace id to report,
/// and a single such span must not panic the telemetry server handler.
pub(crate) fn spans_to_trace_events(epoch: SystemTime, spans: &[SharedSpanHandle]) -> String {
    use cf_rustracing::span::InspectableSpan;

    let mut log_builder = TraceLogBuilder::new();

    // Time of the dump itself; `u64::MAX` if `epoch` lies in the future or
    // the elapsed time overflows `u64` microseconds.
    let end_timestamp = epoch
        .elapsed()
        .ok()
        .and_then(|x| u64::try_from(x.as_micros()).ok())
        .unwrap_or(u64::MAX);

    for span in spans {
        let span_ref = span.read();

        let span_state = match span_ref.context() {
            Some(context) => context.state(),
            // Nothing to identify this span by; skip it instead of panicking.
            None => continue,
        };

        let trace_id = span_state.trace_id().to_string();
        let name = span_ref.operation_name();

        let start_ts = span_ref
            .start_time()
            .duration_since(epoch)
            .ok()
            .and_then(|x| u64::try_from(x.as_micros()).ok())
            .unwrap_or_default();

        let end_ts = span_ref
            .finish_time()
            .and_then(|x| x.duration_since(epoch).ok())
            .and_then(|x| u64::try_from(x.as_micros()).ok())
            .unwrap_or(end_timestamp);

        log_builder.write_event(&trace_id, name, "", TraceEventType::Begin, start_ts);
        log_builder.write_event(&trace_id, name, "", TraceEventType::End, end_ts);
    }

    log_builder.finalize(end_timestamp)
}

/// Kind of trace event written by `TraceLogBuilder::write_event`; selects the
/// value of the event's `"ph"` field.
#[derive(Copy, Clone)]
enum TraceEventType {
/// Emitted as `"ph":"B"`.
Begin,
/// Emitted as `"ph":"E"`.
End,
}

/// Escapes `s` so it can be embedded between double quotes in a JSON document.
///
/// `"` and `\` are backslash-escaped, newline, carriage return and tab use
/// their short forms, and every other control character below U+0020 is
/// written as `\u00XX`. All remaining characters, including non-ASCII ones,
/// are copied through unchanged.
///
/// `str::escape_default` is deliberately not used: it produces Rust literal
/// escapes such as `\'` and `\u{e9}`, neither of which is a valid JSON escape
/// sequence, so names containing apostrophes or non-ASCII text would corrupt
/// the trace output.
fn escape(s: &str) -> String {
    let mut out = String::with_capacity(s.len());

    for c in s.chars() {
        match c {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            '\n' => out.push_str("\\n"),
            '\r' => out.push_str("\\r"),
            '\t' => out.push_str("\\t"),
            c if u32::from(c) < 0x20 => out.push_str(&format!("\\u{:04x}", u32::from(c))),
            c => out.push(c),
        }
    }

    out
}

/// Incrementally assembles a JSON array of trace events as text.
///
/// Each event appended by `write_event` ends with a trailing comma;
/// `finalize` appends one last event without a comma and closes the array.
struct TraceLogBuilder {
/// Output buffer holding the JSON text written so far.
out: String,
}

impl TraceLogBuilder {
fn new() -> Self {
TraceLogBuilder {
out: "[".to_string(),
}
}

fn write_event(
&mut self,
trace_id: &str,
name: &str,
category: &str,
event_type: TraceEventType,
timestamp_us: u64,
) {
self.out.push_str(&format!(
"{{\"pid\":1,\"name\":\"{}\",\"cat\":\"{}\",\"ph\":\"{}\",\"ts\":{},\"id\":\"{}\"}},",
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice to also emit the trace tags as args, but I understand if you don't have time to implement this right now.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tags are typed, so we'd need to generate JSON objects with serde or something rather than this quick and dirty approach.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, let's leave that for a future iteration

escape(name),
escape(category),
match event_type {
TraceEventType::Begin => "B",
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Despite the name, async events don't have the right semantics for what we want to express here. Regular events are enough for the visualisation.

TraceEventType::End => "E",
},
timestamp_us,
trace_id,
));
}

/// Consumes the builder and returns the finished JSON array.
///
/// A closing event named "Trace dump requested" (with `"ph":"i"` and
/// `"s":"g"`) is appended at `end_timestamp`. It has no trailing comma, so
/// it terminates the comma-separated list before the final `]`.
fn finalize(mut self, end_timestamp: u64) -> String {
    let dump_marker = format!(
        "{{\"pid\":1,\"name\":\"Trace dump requested\",\"ph\":\"i\",\"ts\":{},\"s\":\"g\"}}",
        end_timestamp,
    );

    self.out += &dump_marker;
    self.out += "]";

    self.out
}
}
Loading
Loading