From 05dffc705402b204f79269dadbfe0d17f34a17b6 Mon Sep 17 00:00:00 2001 From: avalonche Date: Wed, 22 Jan 2025 07:09:28 +1100 Subject: [PATCH] Separate metrics server from rpc server --- src/main.rs | 69 ++++++++++++++++++++++++++++++++++++++++++++++------ src/proxy.rs | 33 ++++++------------------- 2 files changed, 69 insertions(+), 33 deletions(-) diff --git a/src/main.rs b/src/main.rs index b305a70..a05efcd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,13 +1,16 @@ use clap::{arg, ArgGroup, Parser}; use dotenv::dotenv; use error::Error; -use http::Uri; +use http::{StatusCode, Uri}; +use hyper::service::service_fn; +use hyper::{server::conn::http1, Request, Response}; +use hyper_util::rt::TokioIo; use jsonrpsee::http_client::transport::HttpBackend; -use jsonrpsee::http_client::{HttpClient, HttpClientBuilder}; +use jsonrpsee::http_client::{HttpBody, HttpClient, HttpClientBuilder}; use jsonrpsee::server::Server; use jsonrpsee::RpcModule; use metrics::ServerMetrics; -use metrics_exporter_prometheus::PrometheusBuilder; +use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle}; use metrics_util::layers::{PrefixLayer, Stack}; use opentelemetry::global; use opentelemetry_otlp::WithExportConfig; @@ -17,9 +20,11 @@ use opentelemetry_sdk::Resource; use proxy::ProxyLayer; use reth_rpc_layer::{AuthClientLayer, AuthClientService, JwtSecret}; use server::{EngineApiServer, EthEngineApi, HttpClientWrapper}; +use std::net::AddrParseError; use std::sync::Arc; use std::time::Duration; use std::{net::SocketAddr, path::PathBuf}; +use tokio::net::TcpListener; use tracing::error; use tracing::{info, Level}; use tracing_subscriber::EnvFilter; @@ -77,6 +82,14 @@ struct Args { #[arg(long, env, default_value = "false")] metrics: bool, + /// Host to run the metrics server on + #[arg(long, env, default_value = "0.0.0.0")] + metrics_host: String, + + /// Port to run the metrics server on + #[arg(long, env, default_value = "9090")] + metrics_port: u16, + /// OTLP endpoint #[arg(long, env, default_value = "http://localhost:4317")] otlp_endpoint: String, @@ -108,7 +121,7 @@ async fn main() -> Result<()> { .with_ansi(false) // Disable colored logging .init(); - let (metrics, handler) = if args.metrics { + let metrics = if args.metrics { let recorder = PrometheusBuilder::new().build_recorder(); let handle = recorder.handle(); @@ -118,9 +131,16 @@ async fn main() -> Result<()> { .install() .map_err(|e| Error::InitMetrics(e.to_string()))?; - (Some(Arc::new(ServerMetrics::default())), Some(handle)) + // Start the metrics server + let metrics_addr = format!("{}:{}", args.metrics_host, args.metrics_port); + let addr: SocketAddr = metrics_addr + .parse() + .map_err(|e: AddrParseError| Error::InitMetrics(e.to_string()))?; + tokio::spawn(init_metrics_server(addr, handle)); // Run the metrics server in a separate task + + Some(Arc::new(ServerMetrics::default())) } else { - (None, None) + None }; // telemetry setup @@ -180,7 +200,6 @@ async fn main() -> Result<()> { args.l2_url .parse::() .map_err(|e| Error::InvalidArgs(e.to_string()))?, - handler, )); let server = Server::builder() .set_http_middleware(service_builder) @@ -237,6 +256,42 @@ fn init_tracing(endpoint: &str) { } } +async fn init_metrics_server(addr: SocketAddr, handle: PrometheusHandle) -> Result<()> { + let listener = TcpListener::bind(addr) + .await + .map_err(|e| Error::InitMetrics(e.to_string()))?; + info!("Metrics server running on {}", addr); + + loop { + match listener.accept().await { + Ok((stream, _)) => { + let handle = handle.clone(); // Clone the handle for each connection + tokio::task::spawn(async move { + let service = service_fn(move |_req: Request| { + let response = match _req.uri().path() { + "/metrics" => Response::new(HttpBody::from(handle.render())), + _ => Response::builder() + .status(StatusCode::NOT_FOUND) + .body(HttpBody::empty()) + .unwrap(), + }; + async { Ok::<_, hyper::Error>(response) } + }); + + let io = TokioIo::new(stream); + + if let Err(err) = http1::Builder::new().serve_connection(io, service).await { + error!(message = "Error serving metrics connection", error = %err); + } + }); + } + Err(e) => { + error!(message = "Error accepting connection", error = %e); + } + } + } +} + #[cfg(test)] mod tests { use assert_cmd::Command; diff --git a/src/proxy.rs b/src/proxy.rs index aa9fb9d..350e7fb 100644 --- a/src/proxy.rs +++ b/src/proxy.rs @@ -1,11 +1,9 @@ -use http::header::CONTENT_TYPE; -use http::{HeaderValue, Uri}; +use http::Uri; use hyper_util::client::legacy::connect::HttpConnector; use hyper_util::client::legacy::Client; use hyper_util::rt::TokioExecutor; use jsonrpsee::core::{http_helpers, BoxError}; use jsonrpsee::http_client::{HttpBody, HttpRequest, HttpResponse}; -use metrics_exporter_prometheus::PrometheusHandle; use std::task::{Context, Poll}; use std::{future::Future, pin::Pin}; use tower::{Layer, Service}; @@ -16,12 +14,11 @@ const MULTIPLEX_METHODS: [&str; 2] = ["engine_", "eth_sendRawTransaction"]; #[derive(Debug, Clone)] pub struct ProxyLayer { target_url: Uri, - handle: Option, } impl ProxyLayer { - pub fn new(target_url: Uri, handle: Option) -> Self { - ProxyLayer { target_url, handle } + pub fn new(target_url: Uri) -> Self { + ProxyLayer { target_url } } } @@ -33,7 +30,6 @@ impl Layer for ProxyLayer { inner, client: Client::builder(TokioExecutor::new()).build_http(), target_url: self.target_url.clone(), - handle: self.handle.clone(), } } } @@ -43,7 +39,6 @@ pub struct ProxyService { inner: S, client: Client, target_url: Uri, - handle: Option, } impl Service> for ProxyService @@ -63,22 +58,9 @@ where } fn call(&mut self, req: HttpRequest) -> Self::Future { - match req.uri().path() { - "/healthz" => return Box::pin(async { Ok(Self::Response::new(HttpBody::from("OK"))) }), - "/metrics" => { - if let Some(handle) = self.handle.as_ref() { - let metrics = handle.render(); - return Box::pin(async { - let mut response = Self::Response::new(HttpBody::from(metrics)); - response - .headers_mut() - .insert(CONTENT_TYPE, HeaderValue::from_static("text/plain")); - Ok::(response) - }); - } - } - _ => {} - }; + if req.uri().path() == "/healthz" { + return Box::pin(async { Ok(Self::Response::new(HttpBody::from("OK"))) }); + } let target_url = self.target_url.clone(); let client = self.client.clone(); @@ -254,8 +236,7 @@ mod tests { /// Spawn a new RPC server with a proxy layer. async fn spawn_proxy_server() -> ServerHandle { let addr = format!("{ADDR}:{PORT}"); - let proxy_layer = - ProxyLayer::new(format!("http://{ADDR}:{PROXY_PORT}").parse().unwrap(), None); + let proxy_layer = ProxyLayer::new(format!("http://{ADDR}:{PROXY_PORT}").parse().unwrap()); // Create a layered server let server = ServerBuilder::default() .set_http_middleware(tower::ServiceBuilder::new().layer(proxy_layer))