diff --git a/event_sidecar/Cargo.toml b/event_sidecar/Cargo.toml index 1c8fc9c1..3cfa6b24 100644 --- a/event_sidecar/Cargo.toml +++ b/event_sidecar/Cargo.toml @@ -11,7 +11,7 @@ homepage = "https://github.com/casper-network/casper-sidecar/" repository = "https://github.com/casper-network/casper-sidecar/" [features] -additional-metrics = ["casper-event-types/additional-metrics"] +additional-metrics = ["casper-event-types/additional-metrics", "metrics/additional-metrics"] testing = [] [dependencies] diff --git a/event_sidecar/src/database/writer_generator.rs b/event_sidecar/src/database/writer_generator.rs index bcdccb45..eab89356 100644 --- a/event_sidecar/src/database/writer_generator.rs +++ b/event_sidecar/src/database/writer_generator.rs @@ -9,7 +9,7 @@ use anyhow::Context; use async_trait::async_trait; use casper_types::AsymmetricType; #[cfg(feature = "additional-metrics")] -use casper_event_types::metrics; +use metrics::db::DB_OPERATION_TIMES; use itertools::Itertools; use tokio::sync::Mutex; use $crate::{ @@ -469,7 +469,7 @@ async fn save_event_log( #[cfg(feature = "additional-metrics")] fn observe_db_operation_time(operation_name: &str, start: Instant) { let duration = start.elapsed(); - metrics::DB_OPERATION_TIMES + DB_OPERATION_TIMES .with_label_values(&[operation_name]) .observe(duration.as_nanos() as f64); } diff --git a/event_sidecar/src/event_stream_server/sse_server.rs b/event_sidecar/src/event_stream_server/sse_server.rs index d5014944..ce8b03f1 100644 --- a/event_sidecar/src/event_stream_server/sse_server.rs +++ b/event_sidecar/src/event_stream_server/sse_server.rs @@ -604,6 +604,8 @@ fn stream_to_client( stream_filter, event_filter, is_legacy_filter, + #[cfg(feature = "additional-metrics")] + metrics_sender, ) } @@ -617,6 +619,7 @@ fn build_combined_events_stream( stream_filter: &'static Endpoint, event_filter: &'static [EventFilter], is_legacy_filter: bool, + #[cfg(feature = "additional-metrics")] metrics_sender: Sender<()>, ) -> impl Stream> + 'static { UnboundedReceiverStream::new(initial_events) .map(move |event| { @@ -642,7 +645,7 @@ fn build_combined_events_stream( ) .await; #[cfg(feature = "additional-metrics")] - if let Some(_) = fitlered_data { + if fitlered_data.is_some() { let _ = sender.clone().send(()).await; } #[allow(clippy::let_and_return)] @@ -973,7 +976,7 @@ mod tests { let stream_filter = path_to_filter(path_filter, true).unwrap(); #[cfg(feature = "additional-metrics")] - let (tx, rx) = channel(1000); + let (tx, _rx) = channel(1000); let (filter, is_legacy_filter) = get_filter(path_filter, true).unwrap(); // Collect the events emitted by `stream_to_client()` - should not contain duplicates. let received_events: Vec> = stream_to_client( diff --git a/event_sidecar/src/lib.rs b/event_sidecar/src/lib.rs index a5840924..b804a688 100644 --- a/event_sidecar/src/lib.rs +++ b/event_sidecar/src/lib.rs @@ -19,7 +19,7 @@ mod utils; use std::collections::HashMap; use std::process::ExitCode; use std::sync::Arc; -use std::{net::IpAddr, path::PathBuf, str::FromStr, time::Duration}; +use std::{path::PathBuf, time::Duration}; use crate::types::config::LegacySseApiTag; use crate::{ @@ -256,7 +256,7 @@ fn builder( inbound_sse_data_sender: Sender, ) -> Result { let node_interface = NodeConnectionInterface { - ip_address: IpAddr::from_str(&connection.ip_address)?, + ip_address: connection.ip_address, sse_port: connection.sse_port, rest_port: connection.rest_port, }; diff --git a/event_sidecar/src/testing/testing_config.rs b/event_sidecar/src/testing/testing_config.rs index 8621f4d7..9ed3d579 100644 --- a/event_sidecar/src/testing/testing_config.rs +++ b/event_sidecar/src/testing/testing_config.rs @@ -1,6 +1,9 @@ #[cfg(test)] use portpicker::Port; -use std::sync::{Arc, Mutex}; +use std::{ + net::{IpAddr, Ipv4Addr}, + sync::{Arc, Mutex}, +}; use tempfile::TempDir; use crate::types::config::{Connection, RestApiServerConfig, SseEventServerConfig, StorageConfig}; @@ -83,14 +86,14 @@ impl TestingConfig { pub(crate) fn add_connection( &mut self, - ip_address: Option, + ip_address: Option, sse_port: Option, rest_port: Option, ) -> Port { let random_port_for_sse = get_port(); let random_port_for_rest = get_port(); let connection = Connection { - ip_address: ip_address.unwrap_or_else(|| "127.0.0.1".to_string()), + ip_address: ip_address.unwrap_or_else(|| IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))), sse_port: sse_port.unwrap_or(random_port_for_sse), rest_port: rest_port.unwrap_or(random_port_for_rest), max_attempts: 2, diff --git a/event_sidecar/src/types/config.rs b/event_sidecar/src/types/config.rs index 1bb19b24..be42df64 100644 --- a/event_sidecar/src/types/config.rs +++ b/event_sidecar/src/types/config.rs @@ -1,4 +1,5 @@ use serde::Deserialize; +use std::net::IpAddr; use std::string::ToString; use std::vec; use std::{ @@ -71,7 +72,7 @@ impl SseEventServerConfig { #[derive(Clone, Debug, Deserialize, PartialEq, Eq)] pub struct Connection { - pub ip_address: String, + pub ip_address: IpAddr, pub sse_port: u16, pub rest_port: u16, pub max_attempts: usize, @@ -347,12 +348,14 @@ impl Default for AdminApiServerConfig { #[cfg(any(feature = "testing", test))] mod tests { + use std::net::Ipv4Addr; + use super::*; impl Connection { pub fn example_connection_1() -> Connection { Connection { - ip_address: "127.0.0.1".to_string(), + ip_address: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), sse_port: 18101, rest_port: 14101, max_attempts: 10, @@ -367,7 +370,7 @@ mod tests { pub fn example_connection_2() -> Connection { Connection { - ip_address: "127.0.0.1".to_string(), + ip_address: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), sse_port: 18102, rest_port: 14102, max_attempts: 10, @@ -382,7 +385,7 @@ mod tests { pub fn example_connection_3() -> Connection { Connection { - ip_address: "127.0.0.1".to_string(), + ip_address: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), sse_port: 18103, rest_port: 14103, max_attempts: 10, @@ -399,7 +402,7 @@ mod tests { impl Default for Connection { fn default() -> Self { Self { - ip_address: "127.0.0.1".to_string(), + ip_address: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), sse_port: 18101, rest_port: 14101, allow_partial_connection: false, diff --git a/event_sidecar/src/utils.rs b/event_sidecar/src/utils.rs index 052c4c40..5d847f82 100644 --- a/event_sidecar/src/utils.rs +++ b/event_sidecar/src/utils.rs @@ -1,5 +1,5 @@ #[cfg(feature = "additional-metrics")] -use crate::metrics::EVENTS_PROCESSED_PER_SECOND; +use metrics::db::EVENTS_PROCESSED_PER_SECOND; #[cfg(feature = "additional-metrics")] use std::sync::Arc; #[cfg(feature = "additional-metrics")] @@ -136,7 +136,7 @@ pub fn start_metrics_thread(module_name: String) -> Sender<()> { let metrics_data_for_thread = metrics_data.clone(); tokio::spawn(async move { let metrics_data = metrics_data_for_thread; - while let Some(_) = metrics_queue_rx.recv().await { + while metrics_queue_rx.recv().await.is_some() { let mut guard = metrics_data.lock().await; guard.observed_events += 1; drop(guard); diff --git a/metrics/Cargo.toml b/metrics/Cargo.toml index 9914e35f..f89c5af8 100644 --- a/metrics/Cargo.toml +++ b/metrics/Cargo.toml @@ -12,4 +12,7 @@ repository = "https://github.com/casper-network/casper-sidecar/" [dependencies] once_cell = { workspace = true } -prometheus = { version = "0.13.3", features = ["process"] } \ No newline at end of file +prometheus = { version = "0.13.3", features = ["process"] } + +[features] +additional-metrics = [] diff --git a/metrics/src/db.rs b/metrics/src/db.rs index 649c1583..8fc9177e 100644 --- a/metrics/src/db.rs +++ b/metrics/src/db.rs @@ -1,5 +1,7 @@ use super::REGISTRY; use once_cell::sync::Lazy; +#[cfg(feature = "additional-metrics")] +use prometheus::GaugeVec; use prometheus::{HistogramOpts, HistogramVec, Opts}; const RAW_DATA_SIZE_BUCKETS: &[f64; 8] = &[ diff --git a/rpc_sidecar/src/config.rs b/rpc_sidecar/src/config.rs index 482df230..5cd3d192 100644 --- a/rpc_sidecar/src/config.rs +++ b/rpc_sidecar/src/config.rs @@ -12,7 +12,7 @@ use crate::SpeculativeExecConfig; /// Default binding address for the JSON-RPC HTTP server. /// /// Uses a fixed port per node, but binds on any interface. -const DEFAULT_ADDRESS: &str = "0.0.0.0:0"; +const DEFAULT_ADDRESS: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0); /// Default rate limit in qps. const DEFAULT_QPS_LIMIT: u64 = 100; /// Default max body bytes. This is 2.5MB which should be able to accommodate the largest valid @@ -74,7 +74,7 @@ pub struct RpcConfig { /// Setting to enable the HTTP server. pub enable_server: bool, /// Address to bind JSON-RPC HTTP server to. - pub address: String, + pub address: SocketAddr, /// Maximum rate limit in queries per second. pub qps_limit: u64, /// Maximum number of bytes to accept in a single request body. @@ -88,7 +88,7 @@ impl RpcConfig { pub fn new() -> Self { RpcConfig { enable_server: true, - address: DEFAULT_ADDRESS.to_string(), + address: DEFAULT_ADDRESS, qps_limit: DEFAULT_QPS_LIMIT, max_body_bytes: DEFAULT_MAX_BODY_BYTES, cors_origin: DEFAULT_CORS_ORIGIN.to_string(), diff --git a/rpc_sidecar/src/lib.rs b/rpc_sidecar/src/lib.rs index 870d1625..02e18c57 100644 --- a/rpc_sidecar/src/lib.rs +++ b/rpc_sidecar/src/lib.rs @@ -24,11 +24,7 @@ use node_client::FramedNodeClient; pub use node_client::{Error as ClientError, NodeClient}; pub use speculative_exec_config::Config as SpeculativeExecConfig; pub use speculative_exec_server::run as run_speculative_exec_server; -use std::process::ExitCode; -use std::{ - net::{SocketAddr, ToSocketAddrs}, - sync::Arc, -}; +use std::{net::SocketAddr, process::ExitCode, sync::Arc}; use tracing::warn; /// Minimal casper protocol version supported by this sidecar. @@ -98,26 +94,13 @@ async fn run_speculative_exec( Ok(()) } -fn start_listening(address: &str) -> anyhow::Result> { - let address = resolve_address(address).map_err(|error| { - warn!(%error, %address, "failed to start HTTP server, cannot parse address"); - error - })?; - - Server::try_bind(&address).map_err(|error| { +fn start_listening(address: &SocketAddr) -> anyhow::Result> { + Server::try_bind(address).map_err(|error| { warn!(%error, %address, "failed to start HTTP server"); error.into() }) } -/// Parses a network address from a string, with DNS resolution. -fn resolve_address(address: &str) -> anyhow::Result { - address - .to_socket_addrs()? - .next() - .ok_or_else(|| anyhow::anyhow!("failed to resolve address")) -} - fn encode_request(req: &BinaryRequest, id: u16) -> Result, bytesrepr::Error> { let header = BinaryRequestHeader::new(SUPPORTED_PROTOCOL_VERSION, req.tag(), id); let mut bytes = Vec::with_capacity(header.serialized_length() + req.serialized_length()); diff --git a/rpc_sidecar/src/node_client.rs b/rpc_sidecar/src/node_client.rs index 125809e6..0228438c 100644 --- a/rpc_sidecar/src/node_client.rs +++ b/rpc_sidecar/src/node_client.rs @@ -966,7 +966,7 @@ impl FramedNodeClient { current_attempt - 1 ); } - warn!(%err, "failed to connect to the node, waiting {wait}ms before retrying"); + warn!(%err, "failed to connect to node {}, waiting {wait}ms before retrying", config.address); tokio::time::sleep(Duration::from_millis(wait)).await; wait = (wait * config.exponential_backoff.coefficient) .min(config.exponential_backoff.max_delay_ms); diff --git a/rpc_sidecar/src/speculative_exec_config.rs b/rpc_sidecar/src/speculative_exec_config.rs index dea42d0c..0bf6b5e5 100644 --- a/rpc_sidecar/src/speculative_exec_config.rs +++ b/rpc_sidecar/src/speculative_exec_config.rs @@ -1,10 +1,12 @@ +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; + use datasize::DataSize; use serde::Deserialize; /// Default binding address for the speculative execution RPC HTTP server. /// /// Uses a fixed port per node, but binds on any interface. -const DEFAULT_ADDRESS: &str = "0.0.0.0:1"; +const DEFAULT_ADDRESS: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 1); /// Default rate limit in qps. const DEFAULT_QPS_LIMIT: u64 = 1; /// Default max body bytes (2.5MB). @@ -20,7 +22,7 @@ pub struct Config { /// Setting to enable the HTTP server. pub enable_server: bool, /// Address to bind JSON-RPC speculative execution server to. - pub address: String, + pub address: SocketAddr, /// Maximum rate limit in queries per second. pub qps_limit: u64, /// Maximum number of bytes to accept in a single request body. @@ -34,7 +36,7 @@ impl Config { pub fn new() -> Self { Config { enable_server: false, - address: DEFAULT_ADDRESS.to_string(), + address: DEFAULT_ADDRESS, qps_limit: DEFAULT_QPS_LIMIT, max_body_bytes: DEFAULT_MAX_BODY_BYTES, cors_origin: DEFAULT_CORS_ORIGIN.to_string(), diff --git a/sidecar/src/component.rs b/sidecar/src/component.rs index c0e2d8db..afb9c9fb 100644 --- a/sidecar/src/component.rs +++ b/sidecar/src/component.rs @@ -244,7 +244,10 @@ impl Component for RpcApiComponent { #[cfg(test)] mod tests { - use std::sync::Arc; + use std::{ + net::{IpAddr, Ipv4Addr, SocketAddr}, + sync::Arc, + }; use super::*; use crate::config::SidecarConfig; @@ -379,7 +382,8 @@ mod tests { let mut config = all_components_all_enabled(); config.rpc_server.as_mut().unwrap().node_client = NodeClientConfig::new_with_port_and_retries(port, 1); - config.rpc_server.as_mut().unwrap().main_server.address = format!("0.0.0.0:{}", port); + config.rpc_server.as_mut().unwrap().main_server.address = + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), port); config .rpc_server .as_mut() @@ -387,7 +391,7 @@ mod tests { .speculative_exec_server .as_mut() .unwrap() - .address = format!("0.0.0.0:{}", port); + .address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), port); let res = component.prepare_component_task(&config).await; assert!(res.is_ok()); assert!(res.unwrap().is_some()); diff --git a/types/Cargo.toml b/types/Cargo.toml index 19a710d6..8d5bf75f 100644 --- a/types/Cargo.toml +++ b/types/Cargo.toml @@ -5,7 +5,7 @@ edition = "2021" description = "Types for casper-event-listener library" license-file = "../LICENSE" documentation = "README.md" -homepage = "https://github.com/casper-network/casper-sidecar/" +homepage = "https://github.com/casper-network/casper-sidecar/" repository = "https://github.com/casper-network/casper-sidecar/" [dependencies]