From 0e6a7c4c54916ef2ab2ad33c5c0fb4d0ae3eebec Mon Sep 17 00:00:00 2001 From: Naman Garg <0708ng@gmail.com> Date: Thu, 23 Jan 2025 00:47:05 +0530 Subject: [PATCH] chore: graceful shutdown --- src/main.rs | 53 ++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 38 insertions(+), 15 deletions(-) diff --git a/src/main.rs b/src/main.rs index 9741efb..943f775 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,3 +1,5 @@ +use std::{net::SocketAddr, path::PathBuf, sync::Arc, time::Duration}; + use clap::{arg, ArgGroup, Parser}; use dotenv::dotenv; use error::Error; @@ -5,28 +7,24 @@ use http::{StatusCode, Uri}; use hyper::service::service_fn; use hyper::{server::conn::http1, Request, Response}; use hyper_util::rt::TokioIo; -use jsonrpsee::http_client::transport::HttpBackend; -use jsonrpsee::http_client::{HttpBody, HttpClient, HttpClientBuilder}; -use jsonrpsee::server::Server; -use jsonrpsee::RpcModule; +use jsonrpsee::{ + http_client::{transport::HttpBackend, HttpBody, HttpClient, HttpClientBuilder}, + server::Server, + RpcModule, +}; use metrics::ServerMetrics; use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle}; use metrics_util::layers::{PrefixLayer, Stack}; use opentelemetry::global; use opentelemetry_otlp::WithExportConfig; -use opentelemetry_sdk::propagation::TraceContextPropagator; -use opentelemetry_sdk::trace::Config; -use opentelemetry_sdk::Resource; +use opentelemetry_sdk::{propagation::TraceContextPropagator, trace::Config, Resource}; use proxy::ProxyLayer; use reth_rpc_layer::{AuthClientLayer, AuthClientService, JwtSecret}; use server::{EngineApiServer, EthEngineApi, HttpClientWrapper}; use std::net::AddrParseError; -use std::sync::Arc; -use std::time::Duration; -use std::{net::SocketAddr, path::PathBuf}; use tokio::net::TcpListener; -use tracing::error; -use tracing::{info, Level}; +use tokio::signal::unix::{signal as unix_signal, SignalKind}; +use tracing::{error, info, Level}; use tracing_subscriber::EnvFilter; mod error; @@ -159,7 +157,7 @@ async fn main() -> Result<()> { None }; - // telemetry setup + // Telemetry setup if args.tracing { init_tracing(&args.otlp_endpoint); } @@ -199,6 +197,7 @@ async fn main() -> Result<()> { let builder_client = create_client(&args.builder_url, builder_jwt_secret, args.builder_timeout)?; + // Construct the RPC module let eth_engine_api = EthEngineApi::new( Arc::new(l2_client), Arc::new(builder_client), @@ -210,13 +209,14 @@ async fn main() -> Result<()> { .merge(eth_engine_api.into_rpc()) .map_err(|e| Error::InitRPCServer(e.to_string()))?; - // server setup + // Build and start the server info!("Starting server on :{}", args.rpc_port); let service_builder = tower::ServiceBuilder::new().layer(ProxyLayer::new( args.l2_url .parse::() .map_err(|e| Error::InvalidArgs(e.to_string()))?, )); + let server = Server::builder() .set_http_middleware(service_builder) .build( @@ -226,8 +226,30 @@ async fn main() -> Result<()> { ) .await .map_err(|e| Error::InitRPCServer(e.to_string()))?; + let handle = server.start(module); - handle.stopped().await; + let stop_handle = handle.clone(); + + // Capture SIGINT and SIGTERM + let mut sigint = + unix_signal(SignalKind::interrupt()).map_err(|e| Error::InitRPCServer(e.to_string()))?; + let mut sigterm = + unix_signal(SignalKind::terminate()).map_err(|e| Error::InitRPCServer(e.to_string()))?; + + tokio::select! { + _ = handle.stopped() => { + // The server has already shut down by itself + info!("Server stopped"); + } + _ = sigint.recv() => { + info!("Received SIGINT, shutting down gracefully..."); + let _ = stop_handle.stop(); + } + _ = sigterm.recv() => { + info!("Received SIGTERM, shutting down gracefully..."); + let _ = stop_handle.stop(); + } + } Ok(()) } @@ -246,6 +268,7 @@ fn create_client( .request_timeout(Duration::from_millis(timeout)) .build(url) .map_err(|e| Error::InitRPCClient(e.to_string()))?; + Ok(HttpClientWrapper::new(client, url.to_string())) }