Skip to content

Commit

Permalink
chore(deps): opentelemetry 0.20 (#91)
Browse files Browse the repository at this point in the history
* chore(deps): opentelemetry 0.20

* fix(otel): missing SocketAddr

* fix: clippy

* fix: clippy

* fix(deps): missing prometheus

* fix(otel): prometheus

* fix(docs): example
  • Loading branch information
fundon authored Aug 3, 2023
1 parent b551f45 commit 8b01494
Show file tree
Hide file tree
Showing 16 changed files with 200 additions and 164 deletions.
7 changes: 4 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,12 @@ hex = "0.4.3"
rust-embed = "6.8.1"

# OpenTelemetry
opentelemetry = { version = "0.19.0", default-features = false }
opentelemetry-prometheus = { version = "0.12.0", features = [
opentelemetry = { version = "0.20.0", default-features = false }
opentelemetry-prometheus = { version = "0.13.0", features = [
"prometheus-encoding",
] }
opentelemetry-semantic-conventions = { version = "0.11.0" }
opentelemetry-semantic-conventions = { version = "0.12.0" }
prometheus = "0.13"

# Tracing
tracing = "0.1.37"
Expand Down
50 changes: 37 additions & 13 deletions examples/otel/metrics/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,17 @@ use tokio::net::TcpListener;
use opentelemetry::{
global,
sdk::{
export::metrics::aggregation,
metrics::{controllers, processors, selectors},
metrics::{self, Aggregation, Instrument, MeterProvider, Stream},
Resource,
},
KeyValue,
};

use viz::{
handlers::prometheus::{ExporterBuilder, Prometheus},
handlers::prometheus::{ExporterBuilder, Prometheus, Registry},
middleware::otel,
server::conn::http1,
Io, Request, Responder, Result, Router, Tree,
Error, Io, Request, Responder, Result, Router, Tree,
};

async fn index(_: Request) -> Result<&'static str> {
Expand All @@ -29,20 +30,39 @@ async fn main() -> Result<()> {
let listener = TcpListener::bind(addr).await?;
println!("listening on {addr}");

let exporter = {
let controller = controllers::basic(processors::factory(
selectors::simple::histogram([1.0, 2.0, 5.0, 10.0, 20.0, 50.0]),
aggregation::cumulative_temporality_selector(),
))
.build();
ExporterBuilder::new(controller).init()
let registry = Registry::new();
let (exporter, controller) = {
(
ExporterBuilder::default()
.with_registry(registry.clone())
.build()
.map_err(Error::normal)?,
metrics::new_view(
Instrument::new().name("http.server.duration"),
Stream::new().aggregation(Aggregation::ExplicitBucketHistogram {
boundaries: vec![
0.0, 0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0,
7.5, 10.0,
],
record_min_max: true,
}),
)
.unwrap(),
)
};
let provider = MeterProvider::builder()
.with_reader(exporter)
.with_resource(Resource::new([KeyValue::new("service.name", "viz")]))
.with_view(controller)
.build();

global::set_meter_provider(provider.clone());

let app = Router::new()
.get("/", index)
.get("/:username", index)
.get("/metrics", Prometheus::new(exporter))
.with(otel::metrics::Config::new(&global::meter("viz")));
.get("/metrics", Prometheus::new(registry))
.with(otel::metrics::Config::new(&global::meter("otel")));
let tree = Arc::new(Tree::from(app));

loop {
Expand All @@ -57,4 +77,8 @@ async fn main() -> Result<()> {
}
});
}

// Ensure all spans have been reported
// global::shutdown_tracer_provider();
// provider.shutdown();
}
2 changes: 1 addition & 1 deletion examples/otel/tracing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@ viz = { workspace = true, features = ["otel-tracing"] }

tokio = { workspace = true, features = [ "rt-multi-thread", "macros" ] }
opentelemetry.workspace = true
opentelemetry-jaeger = { version = "0.18.0", features = ["rt-tokio-current-thread"]}
opentelemetry-jaeger = { version = "0.19.0", features = ["rt-tokio-current-thread"]}
2 changes: 0 additions & 2 deletions viz-core/src/body.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
#![allow(clippy::module_name_repetitions)]

use std::{
pin::Pin,
task::{Context, Poll},
Expand Down
2 changes: 0 additions & 2 deletions viz-core/src/handler/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
//! Traits and types for handling an HTTP.

#![allow(clippy::module_name_repetitions)]

use crate::{async_trait, Future};

mod after;
Expand Down
1 change: 1 addition & 0 deletions viz-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#![doc(html_logo_url = "https://viz.rs/logo.svg")]
#![doc(html_favicon_url = "https://viz.rs/logo.svg")]
#![allow(clippy::module_name_repetitions)]
// #![forbid(unsafe_code)]
#![warn(
missing_debug_implementations,
Expand Down
2 changes: 0 additions & 2 deletions viz-core/src/middleware/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
//! Built-in Middleware.

#![allow(clippy::module_name_repetitions)]

#[cfg(feature = "cookie")]
pub mod cookie;
#[cfg(feature = "cors")]
Expand Down
135 changes: 69 additions & 66 deletions viz-core/src/middleware/otel/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,42 +2,35 @@
//!
//! [`OpenTelemetry`]: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/semantic_conventions/http-metrics.md

use std::time::SystemTime;
use std::{net::SocketAddr, time::SystemTime};

use http::uri::Scheme;
use opentelemetry::{
metrics::{Histogram, Meter, Unit, UpDownCounter},
Context, KeyValue,
KeyValue,
};
use opentelemetry_semantic_conventions::trace::{
HTTP_CLIENT_IP,
HTTP_FLAVOR,
HTTP_METHOD,
// HTTP_RESPONSE_CONTENT_LENGTH,
HTTP_ROUTE,
HTTP_SCHEME, // , HTTP_SERVER_NAME
HTTP_STATUS_CODE,
HTTP_TARGET,
HTTP_USER_AGENT,
NET_HOST_PORT,
NET_SOCK_PEER_ADDR,
CLIENT_ADDRESS, CLIENT_SOCKET_ADDRESS, HTTP_REQUEST_METHOD, HTTP_RESPONSE_STATUS_CODE,
HTTP_ROUTE, NETWORK_PROTOCOL_VERSION, SERVER_ADDRESS, SERVER_PORT, URL_SCHEME,
};

use super::HTTP_HOST;

use crate::{
async_trait, headers::UserAgent, Handler, IntoResponse, Request, RequestExt, Response, Result,
async_trait, Handler, IntoResponse, Request, RequestExt, Response, ResponseExt, Result,
Transform,
};

const HTTP_SERVER_ACTIVE_REQUESTS: &str = "http.server.active_requests";
const HTTP_SERVER_DURATION: &str = "http.server.duration";
const HTTP_SERVER_REQUEST_SIZE: &str = "http.server.request.size";
const HTTP_SERVER_RESPONSE_SIZE: &str = "http.server.response.size";

/// Request metrics middleware config.
#[derive(Clone, Debug)]
pub struct Config {
active_requests: UpDownCounter<i64>,
duration: Histogram<f64>,
request_size: Histogram<u64>,
response_size: Histogram<u64>,
}

impl Config {
Expand All @@ -46,18 +39,35 @@ impl Config {
pub fn new(meter: &Meter) -> Self {
let active_requests = meter
.i64_up_down_counter(HTTP_SERVER_ACTIVE_REQUESTS)
.with_description("HTTP concurrent in-flight requests per route")
.with_description(
"Measures the number of concurrent HTTP requests that are currently in-flight.",
)
.with_unit(Unit::new("{request}"))
.init();

let duration = meter
.f64_histogram(HTTP_SERVER_DURATION)
.with_description("HTTP inbound request duration per route")
.with_unit(Unit::new("ms"))
.with_description("Measures the duration of inbound HTTP requests.")
.with_unit(Unit::new("s"))
.init();

let request_size = meter
.u64_histogram(HTTP_SERVER_REQUEST_SIZE)
.with_description("Measures the size of HTTP request messages (compressed).")
.with_unit(Unit::new("By"))
.init();

let response_size = meter
.u64_histogram(HTTP_SERVER_RESPONSE_SIZE)
.with_description("Measures the size of HTTP request messages (compressed).")
.with_unit(Unit::new("By"))
.init();

Config {
active_requests,
duration,
request_size,
response_size,
}
}
}
Expand All @@ -70,6 +80,8 @@ impl<H> Transform<H> for Config {
h,
active_requests: self.active_requests.clone(),
duration: self.duration.clone(),
request_size: self.request_size.clone(),
response_size: self.response_size.clone(),
}
}
}
Expand All @@ -80,6 +92,8 @@ pub struct MetricsMiddleware<H> {
h: H,
active_requests: UpDownCounter<i64>,
duration: Histogram<f64>,
request_size: Histogram<u64>,
response_size: Histogram<u64>,
}

#[async_trait]
Expand All @@ -92,30 +106,31 @@ where

async fn call(&self, req: Request) -> Self::Output {
let timer = SystemTime::now();
let cx = Context::current();
let mut attributes = build_attributes(&req, req.route_info().pattern.as_str());

self.active_requests.add(&cx, 1, &attributes);
self.active_requests.add(1, &attributes);

self.request_size
.record(req.content_length().unwrap_or(0), &attributes);

let resp = self
.h
.call(req)
.await
.map(IntoResponse::into_response)
.map(|resp| {
self.active_requests.add(&cx, -1, &attributes);
self.active_requests.add(-1, &attributes);

attributes.push(HTTP_STATUS_CODE.i64(i64::from(resp.status().as_u16())));
attributes.push(HTTP_RESPONSE_STATUS_CODE.i64(i64::from(resp.status().as_u16())));

self.response_size
.record(resp.content_length().unwrap_or(0), &attributes);

resp
});

self.duration.record(
&cx,
timer
.elapsed()
.map(|t| t.as_secs_f64() * 1000.0)
.unwrap_or_default(),
timer.elapsed().map(|t| t.as_secs_f64()).unwrap_or_default(),
&attributes,
);

Expand All @@ -124,51 +139,39 @@ where
}

fn build_attributes(req: &Request, http_route: &str) -> Vec<KeyValue> {
let mut attributes = Vec::with_capacity(10);
attributes.push(
HTTP_SCHEME.string(
req.schema()
.or(Some(&Scheme::HTTP))
.map(ToString::to_string)
.unwrap(),
),
);
attributes.push(HTTP_FLAVOR.string(format!("{:?}", req.version())));
attributes.push(HTTP_METHOD.string(req.method().to_string()));
let mut attributes = Vec::with_capacity(5);
// <https://github.com/open-telemetry/semantic-conventions/blob/v1.21.0/docs/http/http-spans.md#http-server>
attributes.push(HTTP_ROUTE.string(http_route.to_string()));
if let Some(path_and_query) = req.uri().path_and_query() {
attributes.push(HTTP_TARGET.string(path_and_query.as_str().to_string()));
}
if let Some(host) = req.uri().host() {
attributes.push(HTTP_HOST.string(host.to_string()));
}
if let Some(user_agent) = req
.header_typed::<UserAgent>()
.as_ref()
.map(UserAgent::as_str)
{
attributes.push(HTTP_USER_AGENT.string(user_agent.to_string()));

// <https://github.com/open-telemetry/semantic-conventions/blob/v1.21.0/docs/http/http-spans.md#common-attributes>
attributes.push(HTTP_REQUEST_METHOD.string(req.method().to_string()));
attributes.push(NETWORK_PROTOCOL_VERSION.string(format!("{:?}", req.version())));

let remote_addr = req.remote_addr();
if let Some(remote_addr) = remote_addr {
attributes.push(CLIENT_ADDRESS.string(remote_addr.to_string()));
}
let realip = req.realip();
if let Some(realip) = realip {
attributes.push(HTTP_CLIENT_IP.string(realip.0.to_string()));
if let Some(realip) = req.realip().map(|value| value.0).filter(|realip| {
remote_addr
.map(SocketAddr::ip)
.map_or(true, |remoteip| &remoteip != realip)
}) {
attributes.push(CLIENT_SOCKET_ADDRESS.string(realip.to_string()));
}
// if server_name != host {
// attributes.insert(HTTP_SERVER_NAME, server_name.to_string().into());
// }
if let Some(remote_ip) = req.remote_addr().map(std::net::SocketAddr::ip) {
if realip.map_or(true, |realip| realip.0 != remote_ip) {
// Client is going through a proxy
attributes.push(NET_SOCK_PEER_ADDR.string(remote_ip.to_string()));
}

let uri = req.uri();
if let Some(host) = uri.host() {
attributes.push(SERVER_ADDRESS.string(host.to_string()));
}
if let Some(port) = req
.uri()
if let Some(port) = uri
.port_u16()
.filter(|port| *port != 80 || *port != 443)
.and_then(|port| i64::try_from(port).ok())
.filter(|port| *port != 80 && *port != 443)
{
attributes.push(NET_HOST_PORT.i64(i64::from(port)));
attributes.push(SERVER_PORT.i64(port));
}

attributes.push(URL_SCHEME.string(uri.scheme().unwrap_or(&Scheme::HTTP).to_string()));

attributes
}
3 changes: 0 additions & 3 deletions viz-core/src/middleware/otel/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
//! Opentelemetry request tracking & metrics middleware.

/// `http.host` key
pub const HTTP_HOST: opentelemetry::Key = opentelemetry::Key::from_static_str("http.host");

#[cfg(feature = "otel-metrics")]
pub mod metrics;
#[cfg(feature = "otel-tracing")]
Expand Down
Loading

0 comments on commit 8b01494

Please sign in to comment.