Skip to content

Commit

Permalink
metrics module, other rearrangements
Browse files Browse the repository at this point in the history
  • Loading branch information
blind-oracle committed Apr 27, 2024
1 parent b36ff00 commit b72a60b
Show file tree
Hide file tree
Showing 12 changed files with 409 additions and 66 deletions.
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ async-trait = "0.1"
axum = "0.7"
axum-extra = "0.9"
axum-server = { version = "0.6", features = ["tls-rustls"] }
bytes = "1.5"
candid = "0.10"
clap = { version = "4.5", features = ["derive", "string"] }
clap_derive = "4.5"
Expand All @@ -27,6 +28,8 @@ hickory-resolver = { version = "0.24", features = [
"dnssec-ring",
] }
http = "1.1"
http-body = "1.0"
http-body-util = "0.1"
humantime = "2.1"
hyper = "1.3"
hyper-util = "0.1"
Expand Down Expand Up @@ -78,6 +81,7 @@ tracing-core = "0.1"
tracing-serde = "0.1"
tracing-subscriber = "0.3"
url = "2.5"
uuid = { version = "1.8", features = ["v7"] }
x509-parser = "0.16"

[dev-dependencies]
Expand Down
1 change: 1 addition & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ mod cache;
mod cli;
mod core;
mod http;
mod metrics;
mod policy;
mod routing;
mod tls;
Expand Down
164 changes: 164 additions & 0 deletions src/metrics/body.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
use bytes::Buf;
use http_body::{Body, Frame, SizeHint};
use std::{
pin::{pin, Pin},
sync::atomic::AtomicBool,
task::{Context, Poll},
};

// Body that counts the bytes streamed
pub struct CountingBody<D, E> {
inner: Pin<Box<dyn Body<Data = D, Error = E> + Send + 'static>>,
callback: Box<dyn Fn(u64, Result<(), String>) + Send + 'static>,
callback_done: AtomicBool,
expected_size: Option<u64>,
bytes_sent: u64,
}

impl<D, E> CountingBody<D, E> {
pub fn new<B>(inner: B, callback: impl Fn(u64, Result<(), String>) + Send + 'static) -> Self
where
B: Body<Data = D, Error = E> + Send + 'static,
D: Buf,
{
let expected_size = inner.size_hint().exact();

let mut body = Self {
inner: Box::pin(inner),
callback: Box::new(callback),
callback_done: AtomicBool::new(false),
expected_size,
bytes_sent: 0,
};

// If the size is known and zero - just execute the callback now,
// otherwise it won't be called anywhere else
if expected_size == Some(0) {
body.do_callback(Ok(()));
}

body
}

// It seems that in certain cases the users of Body trait can cause us to run callbacks more than once.
// Use AtomicBool to prevent that and run it at most once.
pub fn do_callback(&mut self, res: Result<(), String>) {
// Make locking scope shorter
{
let done = self.callback_done.get_mut();
if *done {
return;
}
*done = true;
}

(self.callback)(self.bytes_sent, res);
}
}

impl<D, E> Body for CountingBody<D, E>
where
D: Buf,
E: std::string::ToString,
{
type Data = D;
type Error = E;

fn poll_frame(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
let poll = pin!(&mut self.inner).poll_frame(cx);

match &poll {
// There is still some data available
Poll::Ready(Some(v)) => match v {
Ok(buf) => {
// Ignore if it's not a data frame for now.
// It can also be trailers that are uncommon
if buf.is_data() {
self.bytes_sent += buf.data_ref().unwrap().remaining() as u64;

// Check if we already got what was expected
if Some(self.bytes_sent) >= self.expected_size {
self.do_callback(Ok(()));
}
}
}

// Error occured, execute callback
Err(e) => {
// Error is not Copy/Clone so use string instead
self.do_callback(Err(e.to_string()));
}
},

// Nothing left, execute callback
Poll::Ready(None) => {
self.do_callback(Ok(()));
}

// Do nothing
Poll::Pending => {}
}

poll
}

fn size_hint(&self) -> SizeHint {
self.inner.size_hint()
}
}

#[cfg(test)]
mod test {
use super::*;
use http_body_util::BodyExt;

#[tokio::test]
async fn test_body_stream() {
let data = b"foobarblahblahfoobarblahblahfoobarblahblahfoobarblahblahfoobarblahblahfoobarblahblahfoobarblahblahfoobarblahblahfoobarblahblahfoobarblahblahfoobarblahblahfoobarblahblahfoobarblahblahfoobarblahblahfoobarblahblahfoobarblahblahfoobarblahblahfoobarblahblahfoobarblahblahfoobarblahblahfoobarblahblahfoobarblahblahfoobarblahblahfoobarblahblahfoobarblahblah";
let mut stream = tokio_util::io::ReaderStream::new(&data[..]);
let body = axum::body::Body::from_stream(stream);

let (tx, rx) = std::sync::mpsc::channel();

let callback = move |response_size: u64, _body_result: Result<(), String>| {
let _ = tx.send(response_size);
};

let body = CountingBody::new(body, callback);

// Check that the body streams the same data back
let body = body.collect().await.unwrap().to_bytes().to_vec();
assert_eq!(body, data);

// Check that the counting body got right number
let count = rx.recv().unwrap();
assert_eq!(count, data.len() as u64);
}

#[tokio::test]
async fn test_body_full() {
let data = vec![0; 512];

let buf = bytes::Bytes::from_iter(data.clone());
let body = http_body_util::Full::new(buf);

let (tx, rx) = std::sync::mpsc::channel();

let callback = move |response_size: u64, _body_result: Result<(), String>| {
let _ = tx.send(response_size);
};

let body = CountingBody::new(body, callback);

// Check that the body streams the same data back
let body = body.collect().await.unwrap().to_bytes().to_vec();
assert_eq!(body, data);

// Check that the counting body got right number
let count = rx.recv().unwrap();
assert_eq!(count, data.len() as u64);
}
}
100 changes: 100 additions & 0 deletions src/metrics/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
pub mod body;

use std::{
net::SocketAddr,
pin::Pin,
sync::{atomic::AtomicBool, Arc},
time::Instant,
};

use axum::{
extract::{Extension, Request, State},
middleware::Next,
response::{IntoResponse, Response},
};
use prometheus::{
proto::MetricFamily, register_histogram_vec_with_registry,
register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry,
register_int_gauge_with_registry, Encoder, HistogramOpts, HistogramVec, IntCounterVec,
IntGauge, IntGaugeVec, Registry, TextEncoder,
};
use tracing::warn;

use crate::routing::middleware::request_id::RequestId;
use body::CountingBody;

const KB: f64 = 1024.0;

pub const HTTP_DURATION_BUCKETS: &[f64] = &[0.05, 0.2, 1.0, 2.0];
pub const HTTP_REQUEST_SIZE_BUCKETS: &[f64] = &[128.0, KB, 2.0 * KB, 4.0 * KB, 8.0 * KB];
pub const HTTP_RESPONSE_SIZE_BUCKETS: &[f64] = &[1.0 * KB, 8.0 * KB, 64.0 * KB, 256.0 * KB];

// https://prometheus.io/docs/instrumenting/exposition_formats/#basic-info
const PROMETHEUS_CONTENT_TYPE: &str = "text/plain; version=0.0.4";

#[derive(Clone)]
pub struct HttpMetricParams {
pub counter: IntCounterVec,
pub durationer: HistogramVec,
pub request_sizer: HistogramVec,
pub response_sizer: HistogramVec,
}

impl HttpMetricParams {
pub fn new(registry: &Registry) -> Self {
const LABELS_HTTP: &[&str] = &["domain", "status_code", "error_cause", "cache_status"];

Self {
counter: register_int_counter_vec_with_registry!(
format!("http_total"),
format!("Counts occurrences of requests"),
LABELS_HTTP,
registry
)
.unwrap(),

durationer: register_histogram_vec_with_registry!(
format!("http_duration_sec"),
format!("Records the duration of request processing in seconds"),
LABELS_HTTP,
HTTP_DURATION_BUCKETS.to_vec(),
registry
)
.unwrap(),

request_sizer: register_histogram_vec_with_registry!(
format!("http_request_size"),
format!("Records the size of requests"),
LABELS_HTTP,
HTTP_REQUEST_SIZE_BUCKETS.to_vec(),
registry
)
.unwrap(),

response_sizer: register_histogram_vec_with_registry!(
format!("http_response_size"),
format!("Records the size of responses"),
LABELS_HTTP,
HTTP_RESPONSE_SIZE_BUCKETS.to_vec(),
registry
)
.unwrap(),
}
}
}

pub async fn middleware(
State(state): State<HttpMetricParams>,
Extension(request_id): Extension<RequestId>,
request: Request,
next: Next,
) -> impl IntoResponse {
let response = next.run(request).await;
let (parts, body) = response.into_parts();

let record_metrics =
move |response_size: u64, _body_result: Result<(), String>| warn!("{}", response_size);

let body = CountingBody::new(body, record_metrics);
Response::from_parts(parts, body)
}
2 changes: 1 addition & 1 deletion src/routing/middleware/geoip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ impl GeoIp {
}
}

pub async fn geoip(
pub async fn middleware(
State(geoip): State<Arc<GeoIp>>,
Extension(conn_info): Extension<Arc<ConnInfo>>,
mut request: Request,
Expand Down
Loading

0 comments on commit b72a60b

Please sign in to comment.