Skip to content

Commit

Permalink
feat(recorder): make TapeRecorder copyable (#14)
Browse files Browse the repository at this point in the history
  • Loading branch information
soehrl authored Nov 14, 2024
1 parent a3afc05 commit e6a091b
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 99 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- This CHANGELOG file that will contain all notable changes to this project ([#2](https://github.com/soehrl/tracing-tape/pull/2/))
- Derive the `Debug`, `Clone`, and `Copy` traits for all structs in the `tracing-tape` crate ([#11](https://github.com/soehrl/tracing-tape/pull/11/))
- Store parent kind of a span explicitly (bumps format version to `0.1`) ([#13](https://github.com/soehrl/tracing-tape/pull/13/))
- Make `TapeRecorder` clonable ([#14](https://github.com/soehrl/tracing-tape/pull/14/))

### Fixed
- Parsing of *SpanExit* records ([#3](https://github.com/soehrl/tracing-tape/pull/3/))
Expand Down
213 changes: 114 additions & 99 deletions tracing-tape-recorder/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ use std::{
hint,
io::Write,
path::Path,
sync::atomic::{AtomicPtr, AtomicU64, Ordering},
sync::{
atomic::{AtomicPtr, AtomicU64, Ordering},
Arc,
},
time::Instant,
};

Expand All @@ -37,7 +40,9 @@ use tracing_subscriber::registry::LookupSpan;
use tracing_tape::{
intro::Intro,
record::{
field_type, parent_kind, CallsiteFieldRecord, CallsiteRecord, EventRecord, EventValueRecord, SpanCloseRecord, SpanEnterRecord, SpanExitRecord, SpanFollowsRecord, SpanOpenRecord, SpanOpenRecord2, SpanValueRecord
field_type, parent_kind, CallsiteFieldRecord, CallsiteRecord, EventRecord,
EventValueRecord, SpanCloseRecord, SpanEnterRecord, SpanExitRecord, SpanFollowsRecord,
SpanOpenRecord, SpanOpenRecord2, SpanValueRecord,
},
};
use zerocopy::AsBytes;
Expand Down Expand Up @@ -124,7 +129,7 @@ impl Drop for Chapter {
}

#[derive(Debug)]
pub struct TapeRecorder {
struct TapeRecorderInner {
file: File,
offset: AtomicU64,
init_instant: Instant,
Expand All @@ -136,35 +141,7 @@ pub struct TapeRecorder {
random_state: ahash::RandomState,
}

const INTRO_SIZE: usize = std::mem::size_of::<Intro>();

impl Default for TapeRecorder {
fn default() -> Self {
let exe = std::env::current_exe().ok();
let name = exe
.as_ref()
.and_then(|path| path.file_name())
.map(|name| name.to_string_lossy())
.unwrap_or(Cow::Borrowed("trace"));

let time = time::OffsetDateTime::now_local()
.ok()
.unwrap_or_else(time::OffsetDateTime::now_utc);
let format = time::macros::format_description!(
"[year]-[month]-[day]_[weekday repr:short]_[hour]-[minute]-[second]"
);

let time_format = time
.format(&format)
.ok()
.unwrap_or_else(|| time.unix_timestamp().to_string());

let file_path = format!("{}_{}.tape", name, time_format);
return Self::with_file(file_path).unwrap();
}
}

impl Drop for TapeRecorder {
impl Drop for TapeRecorderInner {
fn drop(&mut self) {
let offset = self.offset.load(Ordering::Relaxed);
let chapter_bytes = offset & self.chapter_offset_mask;
Expand All @@ -174,37 +151,7 @@ impl Drop for TapeRecorder {
}
}

impl TapeRecorder {
fn with_file<P: AsRef<Path>>(path: P) -> std::io::Result<Self> {
let mut file = File::create_new(path)?;

let now_system = time::OffsetDateTime::now_local()
.ok()
.unwrap_or_else(time::OffsetDateTime::now_utc);
let now_instant = Instant::now();

let chapter_size: u32 = 1024 * 1024;
let chapter_size_pot = chapter_size.ilog2() as u8;

let intro = Intro::new(chapter_size_pot, now_system.unix_timestamp_nanos());
file.write_all(intro.as_bytes())?;

Ok(Self {
file,
offset: AtomicU64::new(0),
init_instant: now_instant,

chapter_size,
chapter_size_pot: chapter_size.ilog2() as u8,
chapter_offset_mask: chapter_size as u64 - 1,
chapters: [
Chapter::new(chapter_size as usize, 0),
Chapter::new(chapter_size as usize, 1),
],
random_state: Default::default(),
})
}

impl TapeRecorderInner {
#[inline]
fn elapsed_nanos(&self) -> i64 {
self.init_instant.elapsed().as_nanos() as i64
Expand Down Expand Up @@ -265,8 +212,75 @@ impl TapeRecorder {
}
}

#[derive(Debug, Clone)]
pub struct TapeRecorder {
inner: Arc<TapeRecorderInner>,
}

const INTRO_SIZE: usize = std::mem::size_of::<Intro>();

impl Default for TapeRecorder {
fn default() -> Self {
let exe = std::env::current_exe().ok();
let name = exe
.as_ref()
.and_then(|path| path.file_name())
.map(|name| name.to_string_lossy())
.unwrap_or(Cow::Borrowed("trace"));

let time = time::OffsetDateTime::now_local()
.ok()
.unwrap_or_else(time::OffsetDateTime::now_utc);
let format = time::macros::format_description!(
"[year]-[month]-[day]_[weekday repr:short]_[hour]-[minute]-[second]"
);

let time_format = time
.format(&format)
.ok()
.unwrap_or_else(|| time.unix_timestamp().to_string());

let file_path = format!("{}_{}.tape", name, time_format);
return Self::with_file(file_path).unwrap();
}
}

impl TapeRecorder {
fn with_file<P: AsRef<Path>>(path: P) -> std::io::Result<Self> {
let mut file = File::create_new(path)?;

let now_system = time::OffsetDateTime::now_local()
.ok()
.unwrap_or_else(time::OffsetDateTime::now_utc);
let now_instant = Instant::now();

let chapter_size: u32 = 1024 * 1024;
let chapter_size_pot = chapter_size.ilog2() as u8;

let intro = Intro::new(chapter_size_pot, now_system.unix_timestamp_nanos());
file.write_all(intro.as_bytes())?;

Ok(Self {
inner: Arc::new(TapeRecorderInner {
file,
offset: AtomicU64::new(0),
init_instant: now_instant,

chapter_size,
chapter_size_pot: chapter_size.ilog2() as u8,
chapter_offset_mask: chapter_size as u64 - 1,
chapters: [
Chapter::new(chapter_size as usize, 0),
Chapter::new(chapter_size as usize, 1),
],
random_state: Default::default(),
}),
})
}
}

struct EventValueRecorder<'a> {
recorder: &'a TapeRecorder,
recorder: &'a TapeRecorderInner,
thread_id: u64,
}

Expand Down Expand Up @@ -328,7 +342,7 @@ impl tracing::field::Visit for EventValueRecorder<'_> {
}

struct SpanValueRecorder<'a> {
recorder: &'a TapeRecorder,
recorder: &'a TapeRecorderInner,
span_id: u64,
}

Expand Down Expand Up @@ -397,7 +411,7 @@ where
&self,
metadata: &'static tracing::Metadata<'static>,
) -> tracing::subscriber::Interest {
let id = self.random_state.hash_one(metadata.callsite());
let id = self.inner.random_state.hash_one(metadata.callsite());

let module_path = metadata.module_path().unwrap_or("");
let file = metadata.file().unwrap_or("");
Expand Down Expand Up @@ -430,7 +444,7 @@ where
id,
);

self.write(record_len, |slice| {
self.inner.write(record_len, |slice| {
let mut cursor = std::io::Cursor::new(slice);
cursor.write_all(callsite_record.as_bytes()).unwrap();
cursor.write_all(metadata.name().as_bytes()).unwrap();
Expand All @@ -443,13 +457,14 @@ where
let field_record = CallsiteFieldRecord::new(
field.name().len() as u16,
id,
self.random_state.hash_one(field.name()),
self.inner.random_state.hash_one(field.name()),
);
self.write(field_record.header.len.get() as usize, |slice| {
let mut cursor = std::io::Cursor::new(slice);
cursor.write_all(field_record.as_bytes()).unwrap();
cursor.write_all(field.name().as_bytes()).unwrap();
});
self.inner
.write(field_record.header.len.get() as usize, |slice| {
let mut cursor = std::io::Cursor::new(slice);
cursor.write_all(field_record.as_bytes()).unwrap();
cursor.write_all(field.name().as_bytes()).unwrap();
});
}

tracing::subscriber::Interest::sometimes()
Expand All @@ -460,21 +475,21 @@ where
event: &tracing::Event<'_>,
_ctx: tracing_subscriber::layer::Context<'_, S>,
) {
let timestamp = self.elapsed_nanos();
let callsite_id = self.random_state.hash_one(event.metadata().callsite());
let thread_id = self.random_state.hash_one(std::thread::current().id());
let timestamp = self.inner.elapsed_nanos();
let callsite_id = self.inner.random_state.hash_one(event.metadata().callsite());
let thread_id = self.inner.random_state.hash_one(std::thread::current().id());
let event_record = EventRecord::new(
event.metadata().fields().len() as u16,
timestamp,
callsite_id,
thread_id,
);

self.write(std::mem::size_of::<EventRecord>(), |slice| {
self.inner.write(std::mem::size_of::<EventRecord>(), |slice| {
slice.copy_from_slice(event_record.as_bytes());
});
let mut recorder = EventValueRecorder {
recorder: self,
recorder: &self.inner,
thread_id,
};
event.record(&mut recorder);
Expand All @@ -486,54 +501,54 @@ where
id: &Id,
_ctx: tracing_subscriber::layer::Context<'_, S>,
) {
let timestamp = self.elapsed_nanos();
let id = self.random_state.hash_one(id);
let callsite_id = self.random_state.hash_one(attrs.metadata().callsite());
let timestamp = self.inner.elapsed_nanos();
let id = self.inner.random_state.hash_one(id);
let callsite_id = self.inner.random_state.hash_one(attrs.metadata().callsite());
let (parent_kind, parent_id) = if let Some(parent) = attrs.parent() {
(parent_kind::EXPLICIT, self.random_state.hash_one(parent))
(parent_kind::EXPLICIT, self.inner.random_state.hash_one(parent))
} else if attrs.is_contextual() {
(parent_kind::CURRENT, 0)
} else {
(parent_kind::ROOT, 0)
};
let record = SpanOpenRecord2::new(id, parent_kind, parent_id, callsite_id, timestamp);
self.write(std::mem::size_of_val(&record), |slice| {
self.inner.write(std::mem::size_of_val(&record), |slice| {
slice.copy_from_slice(record.as_bytes());
});
let mut recorder = SpanValueRecorder {
recorder: self,
recorder: &self.inner,
span_id: id,
};
attrs.record(&mut recorder);
}

fn on_enter(&self, id: &Id, _ctx: tracing_subscriber::layer::Context<'_, S>) {
let timestamp = self.elapsed_nanos();
let id = self.random_state.hash_one(id);
let thread_id = self.random_state.hash_one(std::thread::current().id());
let timestamp = self.inner.elapsed_nanos();
let id = self.inner.random_state.hash_one(id);
let thread_id = self.inner.random_state.hash_one(std::thread::current().id());

let record = SpanEnterRecord::new(id, timestamp, thread_id);
self.write(std::mem::size_of_val(&record), |slice| {
self.inner.write(std::mem::size_of_val(&record), |slice| {
slice.copy_from_slice(record.as_bytes());
});
}

fn on_exit(&self, id: &Id, _ctx: tracing_subscriber::layer::Context<'_, S>) {
let timestamp = self.elapsed_nanos();
let id = self.random_state.hash_one(id);
let timestamp = self.inner.elapsed_nanos();
let id = self.inner.random_state.hash_one(id);

let record = SpanExitRecord::new(id, timestamp);
self.write(std::mem::size_of_val(&record), |slice| {
self.inner.write(std::mem::size_of_val(&record), |slice| {
slice.copy_from_slice(record.as_bytes());
});
}

fn on_close(&self, id: Id, _ctx: tracing_subscriber::layer::Context<'_, S>) {
let timestamp = self.elapsed_nanos();
let id = self.random_state.hash_one(id);
let timestamp = self.inner.elapsed_nanos();
let id = self.inner.random_state.hash_one(id);

let record = SpanCloseRecord::new(id, timestamp);
self.write(std::mem::size_of_val(&record), |slice| {
self.inner.write(std::mem::size_of_val(&record), |slice| {
slice.copy_from_slice(record.as_bytes());
});
}
Expand All @@ -544,9 +559,9 @@ where
values: &tracing::span::Record<'_>,
_ctx: tracing_subscriber::layer::Context<'_, S>,
) {
let id = self.random_state.hash_one(id);
let id = self.inner.random_state.hash_one(id);
let mut recorder = SpanValueRecorder {
recorder: self,
recorder: &self.inner,
span_id: id,
};
values.record(&mut recorder);
Expand All @@ -558,11 +573,11 @@ where
follows: &Id,
_ctx: tracing_subscriber::layer::Context<'_, S>,
) {
let id = self.random_state.hash_one(id);
let follows = self.random_state.hash_one(follows);
let id = self.inner.random_state.hash_one(id);
let follows = self.inner.random_state.hash_one(follows);

let record = SpanFollowsRecord::new(id, follows);
self.write(std::mem::size_of_val(&record), |slice| {
self.inner.write(std::mem::size_of_val(&record), |slice| {
slice.copy_from_slice(record.as_bytes());
});
}
Expand Down

0 comments on commit e6a091b

Please sign in to comment.