Skip to content

Commit

Permalink
refactor: block watcher committer merger (#83)
Browse files Browse the repository at this point in the history
  • Loading branch information
segfault-magnet authored May 28, 2024
1 parent 5943d62 commit 40ad238
Show file tree
Hide file tree
Showing 21 changed files with 504 additions and 622 deletions.
29 changes: 14 additions & 15 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ serde = { version = "1.0", default-features = false }
serde_json = { version = "1.0", default-features = false }
sqlx = { version = "0.7.4", default-features = false }
tai64 = { version = "4.0.0", default-features = false }
testcontainers = { version = "0.16", default-features = false }
humantime = { version = "2.1", default-features = false }
testcontainers = { version = "0.17", default-features = false }
thiserror = { version = "1.0", default-features = false }
tokio = { version = "1.37", default-features = false }
tokio-util = { version = "0.7", default-features = false }
Expand Down
1 change: 1 addition & 0 deletions committer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ clap = { workspace = true, features = ["derive"] }
config = { workspace = true, features = ["toml", "async"] }
eth = { workspace = true }
fuel = { workspace = true }
humantime = { workspace = true }
metrics = { workspace = true }
ports = { workspace = true }
serde = { workspace = true }
Expand Down
32 changes: 22 additions & 10 deletions committer/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ use url::Url;

#[derive(Debug, Clone, Deserialize)]
pub struct Config {
pub eth: EthConfig,
pub fuel: FuelConfig,
pub app: AppConfig,
pub eth: Eth,
pub fuel: Fuel,
pub app: App,
}

#[derive(Debug, Clone, Deserialize)]
pub struct FuelConfig {
pub struct Fuel {
/// URL to a fuel-core graphql endpoint.
#[serde(deserialize_with = "parse_url")]
pub graphql_endpoint: Url,
Expand All @@ -23,7 +23,7 @@ pub struct FuelConfig {
}

#[derive(Debug, Clone, Deserialize)]
pub struct EthConfig {
pub struct Eth {
/// The secret key authorized by the L1 bridging contracts to post block commitments.
pub wallet_key: String,
/// URL to a Ethereum RPC endpoint.
Expand Down Expand Up @@ -59,28 +59,40 @@ where
}

#[derive(Debug, Clone, Deserialize)]
pub struct AppConfig {
pub struct App {
/// Port used by the started server
pub port: u16,
/// IPv4 address on which the server will listen for connections
pub host: Ipv4Addr,
/// Postgres database configuration
pub db: DbConfig,
/// How often to check the latest fuel block
#[serde(deserialize_with = "human_readable_duration")]
pub block_check_interval: Duration,
}

fn human_readable_duration<'de, D>(deserializer: D) -> Result<Duration, D::Error>
where
D: serde::Deserializer<'de>,
{
let duration_str: String = Deserialize::deserialize(deserializer).unwrap();
humantime::parse_duration(&duration_str).map_err(|e| {
let msg = format!("Failed to parse duration '{duration_str}': {e};");
serde::de::Error::custom(msg)
})
}

#[derive(Debug, Clone)]
pub struct InternalConfig {
pub fuel_polling_interval: Duration,
pub struct Internal {
pub fuel_errors_before_unhealthy: usize,
pub between_eth_event_stream_restablishing_attempts: Duration,
pub eth_errors_before_unhealthy: usize,
pub balance_update_interval: Duration,
}

impl Default for InternalConfig {
impl Default for Internal {
fn default() -> Self {
Self {
fuel_polling_interval: Duration::from_secs(3),
fuel_errors_before_unhealthy: 3,
between_eth_event_stream_restablishing_attempts: Duration::from_secs(3),
eth_errors_before_unhealthy: 3,
Expand Down
5 changes: 2 additions & 3 deletions committer/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ impl From<ports::l1::Error> for Error {
fn from(error: ports::l1::Error) -> Self {
match error {
ports::l1::Error::Network(e) => Self::Network(e),
_ => Self::Other(error.to_string()),
ports::l1::Error::Other(e) => Self::Other(e),
}
}
}
Expand All @@ -57,8 +57,7 @@ impl From<services::Error> for Error {
match error {
services::Error::Network(e) => Self::Network(e),
services::Error::Storage(e) => Self::Storage(e),
services::Error::BlockValidation(e) => Self::Other(e),
services::Error::Other(e) => Self::Other(e),
services::Error::BlockValidation(e) | services::Error::Other(e) => Self::Other(e),
}
}
}
Expand Down
35 changes: 16 additions & 19 deletions committer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,8 @@ mod errors;
mod setup;

use api::launch_api_server;
use config::InternalConfig;
use errors::Result;
use metrics::prometheus::Registry;
use setup::{
create_l1_adapter, setup_logger, setup_storage, spawn_block_watcher,
spawn_l1_committer_and_listener, spawn_wallet_balance_tracker,
};
use tokio_util::sync::CancellationToken;

use crate::setup::shut_down;
Expand All @@ -24,41 +19,44 @@ pub type Validator = validator::BlockValidator;

#[tokio::main]
async fn main() -> Result<()> {
setup_logger();
setup::logger();

let config = config::parse()?;

let storage = setup_storage(&config).await?;
let storage = setup::storage(&config).await?;

let internal_config = InternalConfig::default();
let internal_config = config::Internal::default();
let cancel_token = CancellationToken::new();

let metrics_registry = Registry::default();

let (fuel_adapter, fuel_health_check) =
setup::fuel_adapter(&config, &internal_config, &metrics_registry);

let (ethereum_rpc, eth_health_check) =
create_l1_adapter(&config, &internal_config, &metrics_registry).await?;
setup::l1_adapter(&config, &internal_config, &metrics_registry).await?;

let commit_interval = ethereum_rpc.commit_interval();

let (rx_fuel_block, block_watcher_handle, fuel_health_check) = spawn_block_watcher(
commit_interval,
&config,
let wallet_balance_tracker_handle = setup::wallet_balance_tracker(
&internal_config,
storage.clone(),
&metrics_registry,
ethereum_rpc.clone(),
cancel_token.clone(),
);

let wallet_balance_tracker_handle = spawn_wallet_balance_tracker(
&internal_config,
&metrics_registry,
let committer_handle = setup::block_committer(
commit_interval,
ethereum_rpc.clone(),
storage.clone(),
fuel_adapter,
&config,
&metrics_registry,
cancel_token.clone(),
);

let (committer_handle, listener_handle) = spawn_l1_committer_and_listener(
let listener_handle = setup::l1_event_listener(
&internal_config,
rx_fuel_block,
ethereum_rpc,
storage.clone(),
&metrics_registry,
Expand All @@ -76,7 +74,6 @@ async fn main() -> Result<()> {

shut_down(
cancel_token,
block_watcher_handle,
wallet_balance_tracker_handle,
committer_handle,
listener_handle,
Expand Down
Loading

0 comments on commit 40ad238

Please sign in to comment.