Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add config typing for IP/socket addresses #353

Merged
merged 4 commits into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion event_sidecar/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ homepage = "https://github.com/casper-network/casper-sidecar/"
repository = "https://github.com/casper-network/casper-sidecar/"

[features]
additional-metrics = ["casper-event-types/additional-metrics"]
additional-metrics = ["casper-event-types/additional-metrics", "metrics/additional-metrics"]
testing = []

[dependencies]
Expand Down
4 changes: 2 additions & 2 deletions event_sidecar/src/database/writer_generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use anyhow::Context;
use async_trait::async_trait;
use casper_types::AsymmetricType;
#[cfg(feature = "additional-metrics")]
use casper_event_types::metrics;
use metrics::db::DB_OPERATION_TIMES;
use itertools::Itertools;
use tokio::sync::Mutex;
use $crate::{
Expand Down Expand Up @@ -469,7 +469,7 @@ async fn save_event_log(
#[cfg(feature = "additional-metrics")]
fn observe_db_operation_time(operation_name: &str, start: Instant) {
let duration = start.elapsed();
metrics::DB_OPERATION_TIMES
DB_OPERATION_TIMES
.with_label_values(&[operation_name])
.observe(duration.as_nanos() as f64);
}
Expand Down
7 changes: 5 additions & 2 deletions event_sidecar/src/event_stream_server/sse_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,8 @@ fn stream_to_client(
stream_filter,
event_filter,
is_legacy_filter,
#[cfg(feature = "additional-metrics")]
metrics_sender,
)
}

Expand All @@ -617,6 +619,7 @@ fn build_combined_events_stream(
stream_filter: &'static Endpoint,
event_filter: &'static [EventFilter],
is_legacy_filter: bool,
#[cfg(feature = "additional-metrics")] metrics_sender: Sender<()>,
) -> impl Stream<Item = Result<WarpServerSentEvent, RecvError>> + 'static {
UnboundedReceiverStream::new(initial_events)
.map(move |event| {
Expand All @@ -642,7 +645,7 @@ fn build_combined_events_stream(
)
.await;
#[cfg(feature = "additional-metrics")]
if let Some(_) = fitlered_data {
if fitlered_data.is_some() {
let _ = sender.clone().send(()).await;
}
#[allow(clippy::let_and_return)]
Expand Down Expand Up @@ -973,7 +976,7 @@ mod tests {

let stream_filter = path_to_filter(path_filter, true).unwrap();
#[cfg(feature = "additional-metrics")]
let (tx, rx) = channel(1000);
let (tx, _rx) = channel(1000);
let (filter, is_legacy_filter) = get_filter(path_filter, true).unwrap();
// Collect the events emitted by `stream_to_client()` - should not contain duplicates.
let received_events: Vec<Result<WarpServerSentEvent, RecvError>> = stream_to_client(
Expand Down
4 changes: 2 additions & 2 deletions event_sidecar/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ mod utils;
use std::collections::HashMap;
use std::process::ExitCode;
use std::sync::Arc;
use std::{net::IpAddr, path::PathBuf, str::FromStr, time::Duration};
use std::{path::PathBuf, time::Duration};

use crate::types::config::LegacySseApiTag;
use crate::{
Expand Down Expand Up @@ -256,7 +256,7 @@ fn builder(
inbound_sse_data_sender: Sender<SseEvent>,
) -> Result<EventListenerBuilder, Error> {
let node_interface = NodeConnectionInterface {
ip_address: IpAddr::from_str(&connection.ip_address)?,
ip_address: connection.ip_address,
sse_port: connection.sse_port,
rest_port: connection.rest_port,
};
Expand Down
9 changes: 6 additions & 3 deletions event_sidecar/src/testing/testing_config.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
#[cfg(test)]
use portpicker::Port;
use std::sync::{Arc, Mutex};
use std::{
net::{IpAddr, Ipv4Addr},
sync::{Arc, Mutex},
};
use tempfile::TempDir;

use crate::types::config::{Connection, RestApiServerConfig, SseEventServerConfig, StorageConfig};
Expand Down Expand Up @@ -83,14 +86,14 @@ impl TestingConfig {

pub(crate) fn add_connection(
&mut self,
ip_address: Option<String>,
ip_address: Option<IpAddr>,
sse_port: Option<u16>,
rest_port: Option<u16>,
) -> Port {
let random_port_for_sse = get_port();
let random_port_for_rest = get_port();
let connection = Connection {
ip_address: ip_address.unwrap_or_else(|| "127.0.0.1".to_string()),
ip_address: ip_address.unwrap_or_else(|| IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))),
sse_port: sse_port.unwrap_or(random_port_for_sse),
rest_port: rest_port.unwrap_or(random_port_for_rest),
max_attempts: 2,
Expand Down
13 changes: 8 additions & 5 deletions event_sidecar/src/types/config.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use serde::Deserialize;
use std::net::IpAddr;
use std::string::ToString;
use std::vec;
use std::{
Expand Down Expand Up @@ -71,7 +72,7 @@ impl SseEventServerConfig {

#[derive(Clone, Debug, Deserialize, PartialEq, Eq)]
pub struct Connection {
pub ip_address: String,
pub ip_address: IpAddr,
pub sse_port: u16,
pub rest_port: u16,
pub max_attempts: usize,
Expand Down Expand Up @@ -347,12 +348,14 @@ impl Default for AdminApiServerConfig {

#[cfg(any(feature = "testing", test))]
mod tests {
use std::net::Ipv4Addr;

use super::*;

impl Connection {
pub fn example_connection_1() -> Connection {
Connection {
ip_address: "127.0.0.1".to_string(),
ip_address: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
sse_port: 18101,
rest_port: 14101,
max_attempts: 10,
Expand All @@ -367,7 +370,7 @@ mod tests {

pub fn example_connection_2() -> Connection {
Connection {
ip_address: "127.0.0.1".to_string(),
ip_address: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
sse_port: 18102,
rest_port: 14102,
max_attempts: 10,
Expand All @@ -382,7 +385,7 @@ mod tests {

pub fn example_connection_3() -> Connection {
Connection {
ip_address: "127.0.0.1".to_string(),
ip_address: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
sse_port: 18103,
rest_port: 14103,
max_attempts: 10,
Expand All @@ -399,7 +402,7 @@ mod tests {
impl Default for Connection {
fn default() -> Self {
Self {
ip_address: "127.0.0.1".to_string(),
ip_address: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
sse_port: 18101,
rest_port: 14101,
allow_partial_connection: false,
Expand Down
4 changes: 2 additions & 2 deletions event_sidecar/src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#[cfg(feature = "additional-metrics")]
use crate::metrics::EVENTS_PROCESSED_PER_SECOND;
use metrics::db::EVENTS_PROCESSED_PER_SECOND;
#[cfg(feature = "additional-metrics")]
use std::sync::Arc;
#[cfg(feature = "additional-metrics")]
Expand Down Expand Up @@ -136,7 +136,7 @@ pub fn start_metrics_thread(module_name: String) -> Sender<()> {
let metrics_data_for_thread = metrics_data.clone();
tokio::spawn(async move {
let metrics_data = metrics_data_for_thread;
while let Some(_) = metrics_queue_rx.recv().await {
while metrics_queue_rx.recv().await.is_some() {
let mut guard = metrics_data.lock().await;
guard.observed_events += 1;
drop(guard);
Expand Down
5 changes: 4 additions & 1 deletion metrics/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,7 @@ repository = "https://github.com/casper-network/casper-sidecar/"

[dependencies]
once_cell = { workspace = true }
prometheus = { version = "0.13.3", features = ["process"] }
prometheus = { version = "0.13.3", features = ["process"] }

[features]
additional-metrics = []
2 changes: 2 additions & 0 deletions metrics/src/db.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use super::REGISTRY;
use once_cell::sync::Lazy;
#[cfg(feature = "additional-metrics")]
use prometheus::GaugeVec;
use prometheus::{HistogramOpts, HistogramVec, Opts};

const RAW_DATA_SIZE_BUCKETS: &[f64; 8] = &[
Expand Down
6 changes: 3 additions & 3 deletions rpc_sidecar/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::SpeculativeExecConfig;
/// Default binding address for the JSON-RPC HTTP server.
///
/// Uses a fixed port per node, but binds on any interface.
const DEFAULT_ADDRESS: &str = "0.0.0.0:0";
const DEFAULT_ADDRESS: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0);
/// Default rate limit in qps.
const DEFAULT_QPS_LIMIT: u64 = 100;
/// Default max body bytes. This is 2.5MB which should be able to accommodate the largest valid
Expand Down Expand Up @@ -74,7 +74,7 @@ pub struct RpcConfig {
/// Setting to enable the HTTP server.
pub enable_server: bool,
/// Address to bind JSON-RPC HTTP server to.
pub address: String,
pub address: SocketAddr,
/// Maximum rate limit in queries per second.
pub qps_limit: u64,
/// Maximum number of bytes to accept in a single request body.
Expand All @@ -88,7 +88,7 @@ impl RpcConfig {
pub fn new() -> Self {
RpcConfig {
enable_server: true,
address: DEFAULT_ADDRESS.to_string(),
address: DEFAULT_ADDRESS,
qps_limit: DEFAULT_QPS_LIMIT,
max_body_bytes: DEFAULT_MAX_BODY_BYTES,
cors_origin: DEFAULT_CORS_ORIGIN.to_string(),
Expand Down
23 changes: 3 additions & 20 deletions rpc_sidecar/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,7 @@ use node_client::FramedNodeClient;
pub use node_client::{Error as ClientError, NodeClient};
pub use speculative_exec_config::Config as SpeculativeExecConfig;
pub use speculative_exec_server::run as run_speculative_exec_server;
use std::process::ExitCode;
use std::{
net::{SocketAddr, ToSocketAddrs},
sync::Arc,
};
use std::{net::SocketAddr, process::ExitCode, sync::Arc};
use tracing::warn;

/// Minimal casper protocol version supported by this sidecar.
Expand Down Expand Up @@ -98,26 +94,13 @@ async fn run_speculative_exec(
Ok(())
}

fn start_listening(address: &str) -> anyhow::Result<ServerBuilder<AddrIncoming>> {
let address = resolve_address(address).map_err(|error| {
warn!(%error, %address, "failed to start HTTP server, cannot parse address");
error
})?;

Server::try_bind(&address).map_err(|error| {
fn start_listening(address: &SocketAddr) -> anyhow::Result<ServerBuilder<AddrIncoming>> {
Server::try_bind(address).map_err(|error| {
warn!(%error, %address, "failed to start HTTP server");
error.into()
})
}

/// Parses a network address from a string, with DNS resolution.
fn resolve_address(address: &str) -> anyhow::Result<SocketAddr> {
address
.to_socket_addrs()?
.next()
.ok_or_else(|| anyhow::anyhow!("failed to resolve address"))
}

fn encode_request(req: &BinaryRequest, id: u16) -> Result<Vec<u8>, bytesrepr::Error> {
let header = BinaryRequestHeader::new(SUPPORTED_PROTOCOL_VERSION, req.tag(), id);
let mut bytes = Vec::with_capacity(header.serialized_length() + req.serialized_length());
Expand Down
2 changes: 1 addition & 1 deletion rpc_sidecar/src/node_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -966,7 +966,7 @@ impl FramedNodeClient {
current_attempt - 1
);
}
warn!(%err, "failed to connect to the node, waiting {wait}ms before retrying");
warn!(%err, "failed to connect to node {}, waiting {wait}ms before retrying", config.address);
tokio::time::sleep(Duration::from_millis(wait)).await;
wait = (wait * config.exponential_backoff.coefficient)
.min(config.exponential_backoff.max_delay_ms);
Expand Down
8 changes: 5 additions & 3 deletions rpc_sidecar/src/speculative_exec_config.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use std::net::{IpAddr, Ipv4Addr, SocketAddr};

use datasize::DataSize;
use serde::Deserialize;

/// Default binding address for the speculative execution RPC HTTP server.
///
/// Uses a fixed port per node, but binds on any interface.
const DEFAULT_ADDRESS: &str = "0.0.0.0:1";
const DEFAULT_ADDRESS: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 1);
/// Default rate limit in qps.
const DEFAULT_QPS_LIMIT: u64 = 1;
/// Default max body bytes (2.5MB).
Expand All @@ -20,7 +22,7 @@ pub struct Config {
/// Setting to enable the HTTP server.
pub enable_server: bool,
/// Address to bind JSON-RPC speculative execution server to.
pub address: String,
pub address: SocketAddr,
/// Maximum rate limit in queries per second.
pub qps_limit: u64,
/// Maximum number of bytes to accept in a single request body.
Expand All @@ -34,7 +36,7 @@ impl Config {
pub fn new() -> Self {
Config {
enable_server: false,
address: DEFAULT_ADDRESS.to_string(),
address: DEFAULT_ADDRESS,
qps_limit: DEFAULT_QPS_LIMIT,
max_body_bytes: DEFAULT_MAX_BODY_BYTES,
cors_origin: DEFAULT_CORS_ORIGIN.to_string(),
Expand Down
10 changes: 7 additions & 3 deletions sidecar/src/component.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,10 @@ impl Component for RpcApiComponent {

#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::{
net::{IpAddr, Ipv4Addr, SocketAddr},
sync::Arc,
};

use super::*;
use crate::config::SidecarConfig;
Expand Down Expand Up @@ -379,15 +382,16 @@ mod tests {
let mut config = all_components_all_enabled();
config.rpc_server.as_mut().unwrap().node_client =
NodeClientConfig::new_with_port_and_retries(port, 1);
config.rpc_server.as_mut().unwrap().main_server.address = format!("0.0.0.0:{}", port);
config.rpc_server.as_mut().unwrap().main_server.address =
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), port);
config
.rpc_server
.as_mut()
.unwrap()
.speculative_exec_server
.as_mut()
.unwrap()
.address = format!("0.0.0.0:{}", port);
.address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), port);
let res = component.prepare_component_task(&config).await;
assert!(res.is_ok());
assert!(res.unwrap().is_some());
Expand Down
2 changes: 1 addition & 1 deletion types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ edition = "2021"
description = "Types for casper-event-listener library"
license-file = "../LICENSE"
documentation = "README.md"
homepage = "https://github.com/casper-network/casper-sidecar/"
homepage = "https://github.com/casper-network/casper-sidecar/"
repository = "https://github.com/casper-network/casper-sidecar/"

[dependencies]
Expand Down
Loading