Skip to content

Commit

Permalink
Merge pull request #54 from flashbots/metrics-server
Browse files Browse the repository at this point in the history
Separate metrics server from rpc server
  • Loading branch information
avalonche authored Jan 22, 2025
2 parents feef2c2 + 05dffc7 commit fff953d
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 33 deletions.
69 changes: 62 additions & 7 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
use clap::{arg, ArgGroup, Parser};
use dotenv::dotenv;
use error::Error;
use http::Uri;
use http::{StatusCode, Uri};
use hyper::service::service_fn;
use hyper::{server::conn::http1, Request, Response};
use hyper_util::rt::TokioIo;
use jsonrpsee::http_client::transport::HttpBackend;
use jsonrpsee::http_client::{HttpClient, HttpClientBuilder};
use jsonrpsee::http_client::{HttpBody, HttpClient, HttpClientBuilder};
use jsonrpsee::server::Server;
use jsonrpsee::RpcModule;
use metrics::ServerMetrics;
use metrics_exporter_prometheus::PrometheusBuilder;
use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle};
use metrics_util::layers::{PrefixLayer, Stack};
use opentelemetry::global;
use opentelemetry_otlp::WithExportConfig;
Expand All @@ -17,9 +20,11 @@ use opentelemetry_sdk::Resource;
use proxy::ProxyLayer;
use reth_rpc_layer::{AuthClientLayer, AuthClientService, JwtSecret};
use server::{EngineApiServer, EthEngineApi, HttpClientWrapper};
use std::net::AddrParseError;
use std::sync::Arc;
use std::time::Duration;
use std::{net::SocketAddr, path::PathBuf};
use tokio::net::TcpListener;
use tracing::error;
use tracing::{info, Level};
use tracing_subscriber::EnvFilter;
Expand Down Expand Up @@ -77,6 +82,14 @@ struct Args {
#[arg(long, env, default_value = "false")]
metrics: bool,

/// Host to run the metrics server on
#[arg(long, env, default_value = "0.0.0.0")]
metrics_host: String,

/// Port to run the metrics server on
#[arg(long, env, default_value = "9090")]
metrics_port: u16,

/// OTLP endpoint
#[arg(long, env, default_value = "http://localhost:4317")]
otlp_endpoint: String,
Expand Down Expand Up @@ -108,7 +121,7 @@ async fn main() -> Result<()> {
.with_ansi(false) // Disable colored logging
.init();

let (metrics, handler) = if args.metrics {
let metrics = if args.metrics {
let recorder = PrometheusBuilder::new().build_recorder();
let handle = recorder.handle();

Expand All @@ -118,9 +131,16 @@ async fn main() -> Result<()> {
.install()
.map_err(|e| Error::InitMetrics(e.to_string()))?;

(Some(Arc::new(ServerMetrics::default())), Some(handle))
// Start the metrics server
let metrics_addr = format!("{}:{}", args.metrics_host, args.metrics_port);
let addr: SocketAddr = metrics_addr
.parse()
.map_err(|e: AddrParseError| Error::InitMetrics(e.to_string()))?;
tokio::spawn(init_metrics_server(addr, handle)); // Run the metrics server in a separate task

Some(Arc::new(ServerMetrics::default()))
} else {
(None, None)
None
};

// telemetry setup
Expand Down Expand Up @@ -180,7 +200,6 @@ async fn main() -> Result<()> {
args.l2_url
.parse::<Uri>()
.map_err(|e| Error::InvalidArgs(e.to_string()))?,
handler,
));
let server = Server::builder()
.set_http_middleware(service_builder)
Expand Down Expand Up @@ -237,6 +256,42 @@ fn init_tracing(endpoint: &str) {
}
}

async fn init_metrics_server(addr: SocketAddr, handle: PrometheusHandle) -> Result<()> {
let listener = TcpListener::bind(addr)
.await
.map_err(|e| Error::InitMetrics(e.to_string()))?;
info!("Metrics server running on {}", addr);

loop {
match listener.accept().await {
Ok((stream, _)) => {
let handle = handle.clone(); // Clone the handle for each connection
tokio::task::spawn(async move {
let service = service_fn(move |_req: Request<hyper::body::Incoming>| {
let response = match _req.uri().path() {
"/metrics" => Response::new(HttpBody::from(handle.render())),
_ => Response::builder()
.status(StatusCode::NOT_FOUND)
.body(HttpBody::empty())
.unwrap(),
};
async { Ok::<_, hyper::Error>(response) }
});

let io = TokioIo::new(stream);

if let Err(err) = http1::Builder::new().serve_connection(io, service).await {
error!(message = "Error serving metrics connection", error = %err);
}
});
}
Err(e) => {
error!(message = "Error accepting connection", error = %e);
}
}
}
}

#[cfg(test)]
mod tests {
use assert_cmd::Command;
Expand Down
33 changes: 7 additions & 26 deletions src/proxy.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
use http::header::CONTENT_TYPE;
use http::{HeaderValue, Uri};
use http::Uri;
use hyper_util::client::legacy::connect::HttpConnector;
use hyper_util::client::legacy::Client;
use hyper_util::rt::TokioExecutor;
use jsonrpsee::core::{http_helpers, BoxError};
use jsonrpsee::http_client::{HttpBody, HttpRequest, HttpResponse};
use metrics_exporter_prometheus::PrometheusHandle;
use std::task::{Context, Poll};
use std::{future::Future, pin::Pin};
use tower::{Layer, Service};
Expand All @@ -16,12 +14,11 @@ const MULTIPLEX_METHODS: [&str; 2] = ["engine_", "eth_sendRawTransaction"];
#[derive(Debug, Clone)]
pub struct ProxyLayer {
target_url: Uri,
handle: Option<PrometheusHandle>,
}

impl ProxyLayer {
pub fn new(target_url: Uri, handle: Option<PrometheusHandle>) -> Self {
ProxyLayer { target_url, handle }
pub fn new(target_url: Uri) -> Self {
ProxyLayer { target_url }
}
}

Expand All @@ -33,7 +30,6 @@ impl<S> Layer<S> for ProxyLayer {
inner,
client: Client::builder(TokioExecutor::new()).build_http(),
target_url: self.target_url.clone(),
handle: self.handle.clone(),
}
}
}
Expand All @@ -43,7 +39,6 @@ pub struct ProxyService<S> {
inner: S,
client: Client<HttpConnector, HttpBody>,
target_url: Uri,
handle: Option<PrometheusHandle>,
}

impl<S> Service<HttpRequest<HttpBody>> for ProxyService<S>
Expand All @@ -63,22 +58,9 @@ where
}

fn call(&mut self, req: HttpRequest<HttpBody>) -> Self::Future {
match req.uri().path() {
"/healthz" => return Box::pin(async { Ok(Self::Response::new(HttpBody::from("OK"))) }),
"/metrics" => {
if let Some(handle) = self.handle.as_ref() {
let metrics = handle.render();
return Box::pin(async {
let mut response = Self::Response::new(HttpBody::from(metrics));
response
.headers_mut()
.insert(CONTENT_TYPE, HeaderValue::from_static("text/plain"));
Ok::<HttpResponse, BoxError>(response)
});
}
}
_ => {}
};
if req.uri().path() == "/healthz" {
return Box::pin(async { Ok(Self::Response::new(HttpBody::from("OK"))) });
}

let target_url = self.target_url.clone();
let client = self.client.clone();
Expand Down Expand Up @@ -254,8 +236,7 @@ mod tests {
/// Spawn a new RPC server with a proxy layer.
async fn spawn_proxy_server() -> ServerHandle {
let addr = format!("{ADDR}:{PORT}");
let proxy_layer =
ProxyLayer::new(format!("http://{ADDR}:{PROXY_PORT}").parse().unwrap(), None);
let proxy_layer = ProxyLayer::new(format!("http://{ADDR}:{PROXY_PORT}").parse().unwrap());
// Create a layered server
let server = ServerBuilder::default()
.set_http_middleware(tower::ServiceBuilder::new().layer(proxy_layer))
Expand Down

0 comments on commit fff953d

Please sign in to comment.