diff --git a/crates/madara/client/rpc/src/versions/admin/v0_1_0/methods/services.rs b/crates/madara/client/rpc/src/versions/admin/v0_1_0/methods/services.rs index a4a63017e..5eb0321b1 100644 --- a/crates/madara/client/rpc/src/versions/admin/v0_1_0/methods/services.rs +++ b/crates/madara/client/rpc/src/versions/admin/v0_1_0/methods/services.rs @@ -21,13 +21,13 @@ impl MadaraServicesRpcApiV0_1_0Server for Starknet { )) } else { match status { - ServiceRequest::Start => Ok(svcs.iter().map(|svc| self.ctx.service_add(*svc)).collect()), - ServiceRequest::Stop => Ok(svcs.iter().map(|svc| self.ctx.service_remove(*svc)).collect()), + ServiceRequest::Start => Ok(svcs.iter().map(|svc| self.ctx.service_activate(*svc)).collect()), + ServiceRequest::Stop => Ok(svcs.iter().map(|svc| self.ctx.service_deactivate(*svc)).collect()), ServiceRequest::Restart => { - let res = Ok(svcs.iter().map(|svc| self.ctx.service_remove(*svc)).collect()); + let res = Ok(svcs.iter().map(|svc| self.ctx.service_deactivate(*svc)).collect()); tokio::time::sleep(RESTART_INTERVAL).await; svcs.iter().for_each(|svc| { - self.ctx.service_add(*svc); + self.ctx.service_activate(*svc); }); res diff --git a/crates/madara/client/telemetry/src/lib.rs b/crates/madara/client/telemetry/src/lib.rs index 7ea05e376..43d961a8d 100644 --- a/crates/madara/client/telemetry/src/lib.rs +++ b/crates/madara/client/telemetry/src/lib.rs @@ -84,9 +84,7 @@ impl Service for TelemetryService { let rx = self.telemetry_handle.0.subscribe(); let clients = start_clients(&self.telemetry_endpoints).await; - runner.service_loop(move |ctx| start_telemetry(rx, ctx, clients)); - - anyhow::Ok(()) + runner.service_loop(move |ctx| start_telemetry(rx, ctx, clients)) } } diff --git a/crates/madara/node/src/cli/mod.rs b/crates/madara/node/src/cli/mod.rs index 9f6b56825..0c69dc57b 100644 --- a/crates/madara/node/src/cli/mod.rs +++ b/crates/madara/node/src/cli/mod.rs @@ -306,6 +306,34 @@ impl RunCmd { pub fn is_devnet(&self) -> bool { self.devnet } + + pub fn is_active_l1_sync(&self) -> bool { + !self.l1_sync_params.l1_sync_disabled && (self.l1_sync_params.l1_endpoint.is_some() || !self.devnet) + } + + pub fn is_active_l2_sync(&self) -> bool { + self.args_preset.warp_update_receiver || !self.l2_sync_params.l2_sync_disabled + } + + pub fn is_active_p2p(&self) -> bool { + self.p2p_params.p2p + } + + pub fn is_active_rpc_user(&self) -> bool { + !self.rpc_params.rpc_disable && !self.args_preset.warp_update_receiver + } + + pub fn is_active_rpc_admin(&self) -> bool { + self.rpc_params.rpc_admin && !self.args_preset.warp_update_receiver + } + + pub fn is_active_fgw(&self) -> bool { + self.gateway_params.feeder_gateway_enable && !self.args_preset.warp_update_receiver + } + + pub fn is_active_telemetry(&self) -> bool { + self.telemetry_params.telemetry && !self.args_preset.warp_update_receiver + } } /// Starknet network types. diff --git a/crates/madara/node/src/main.rs b/crates/madara/node/src/main.rs index 1d3a009d1..204928e7b 100644 --- a/crates/madara/node/src/main.rs +++ b/crates/madara/node/src/main.rs @@ -16,7 +16,7 @@ use mc_mempool::{GasPriceProvider, L1DataProvider, Mempool, MempoolLimits}; use mc_rpc::providers::{AddTransactionProvider, ForwardToProvider, MempoolAddTxProvider}; use mc_telemetry::{SysInfo, TelemetryService}; use mp_oracle::pragma::PragmaOracleBuilder; -use mp_utils::service::{MadaraServiceId, ServiceMonitor}; +use mp_utils::service::{MadaraServiceId, ServiceMonitorBuilder}; use service::{BlockProductionService, GatewayService, L1SyncService, P2pService, RpcService, SyncService}; use starknet_api::core::ChainId; use std::sync::Arc; @@ -294,59 +294,28 @@ async fn main() -> anyhow::Result<()> { service_block_production.setup_devnet().await?; } - let mut app = ServiceMonitor::new() + // Since the database is not implemented as a proper service, we do not + // active it, as it would never be marked as stopped by the existing logic + ServiceMonitorBuilder::new() .with(service_db)? .with(service_l1_sync)? - .with(service_p2p)? .with(service_l2_sync)? + .with(service_p2p)? .with(service_block_production)? .with(service_rpc_user)? .with(service_rpc_admin)? .with(service_gateway)? - .with(service_telemetry)?; - - // Since the database is not implemented as a proper service, we do not - // active it, as it would never be marked as stopped by the existing logic - // - // app.activate(MadaraService::Database); - - let l1_sync_enabled = !run_cmd.l1_sync_params.l1_sync_disabled; - let l1_endpoint_some = run_cmd.l1_sync_params.l1_endpoint.is_some(); - let warp_update_receiver = run_cmd.args_preset.warp_update_receiver; - - if l1_sync_enabled && (l1_endpoint_some || !run_cmd.devnet) { - app.activate(MadaraServiceId::L1Sync); - } - - if run_cmd.p2p_params.p2p { - app.activate(MadaraServiceId::P2P); - } - - if warp_update_receiver { - app.activate(MadaraServiceId::L2Sync); - } else if run_cmd.is_sequencer() { - app.activate(MadaraServiceId::BlockProduction); - } else if !run_cmd.l2_sync_params.l2_sync_disabled { - app.activate(MadaraServiceId::L2Sync); - } - - if !run_cmd.rpc_params.rpc_disable && !warp_update_receiver { - app.activate(MadaraServiceId::RpcUser); - } - - if run_cmd.rpc_params.rpc_admin && !warp_update_receiver { - app.activate(MadaraServiceId::RpcAdmin); - } - - if run_cmd.gateway_params.feeder_gateway_enable && !warp_update_receiver { - app.activate(MadaraServiceId::Gateway); - } - - if run_cmd.telemetry_params.telemetry && !warp_update_receiver { - app.activate(MadaraServiceId::Telemetry); - } - - app.start().await?; + .with(service_telemetry)? + .activate_if(MadaraServiceId::L1Sync, || run_cmd.is_active_l1_sync())? + .activate_if(MadaraServiceId::L2Sync, || run_cmd.is_active_l2_sync())? + .activate_if(MadaraServiceId::P2P, || run_cmd.is_active_p2p())? + .activate_if(MadaraServiceId::BlockProduction, || run_cmd.is_sequencer())? + .activate_if(MadaraServiceId::RpcUser, || run_cmd.is_active_rpc_user())? + .activate_if(MadaraServiceId::RpcAdmin, || run_cmd.is_active_rpc_admin())? + .activate_if(MadaraServiceId::Gateway, || run_cmd.is_active_fgw())? + .activate_if(MadaraServiceId::Telemetry, || run_cmd.is_active_telemetry())? + .start() + .await?; let _ = analytics.shutdown(); diff --git a/crates/madara/node/src/service/block_production.rs b/crates/madara/node/src/service/block_production.rs index 42c7d864e..1e3904c76 100644 --- a/crates/madara/node/src/service/block_production.rs +++ b/crates/madara/node/src/service/block_production.rs @@ -49,9 +49,7 @@ impl Service for BlockProductionService { Arc::clone(l1_data_provider), )?; - runner.service_loop(move |ctx| block_production_task.block_production_task(ctx)); - - Ok(()) + runner.service_loop(move |ctx| block_production_task.block_production_task(ctx)) } } diff --git a/crates/madara/node/src/service/gateway.rs b/crates/madara/node/src/service/gateway.rs index f5f2aec25..73370ede4 100644 --- a/crates/madara/node/src/service/gateway.rs +++ b/crates/madara/node/src/service/gateway.rs @@ -44,8 +44,7 @@ impl Service for GatewayService { config.gateway_port, ctx, ) - }); - Ok(()) + }) } } diff --git a/crates/madara/node/src/service/l1.rs b/crates/madara/node/src/service/l1.rs index 1fc4ab08e..eb9e7b52b 100644 --- a/crates/madara/node/src/service/l1.rs +++ b/crates/madara/node/src/service/l1.rs @@ -114,12 +114,11 @@ impl Service for L1SyncService { l1_head_snd, ctx, ) - }); + }) } else { tracing::error!("❗ Tried to start L1 Sync but no l1 endpoint was provided to the node on startup"); + anyhow::bail!("Failed to start L1 Sync") } - - Ok(()) } } diff --git a/crates/madara/node/src/service/l2.rs b/crates/madara/node/src/service/l2.rs index 1102b67e7..002373714 100644 --- a/crates/madara/node/src/service/l2.rs +++ b/crates/madara/node/src/service/l2.rs @@ -84,9 +84,7 @@ impl Service for SyncService { .run(ctx) .await } - }); - - Ok(()) + }) } } diff --git a/crates/madara/node/src/service/p2p.rs b/crates/madara/node/src/service/p2p.rs index b13b0e8ae..2ecb3d999 100644 --- a/crates/madara/node/src/service/p2p.rs +++ b/crates/madara/node/src/service/p2p.rs @@ -6,7 +6,6 @@ use mp_utils::service::{MadaraServiceId, Service, ServiceId, ServiceIdProvider, use std::time::Duration; pub struct P2pService { - enabled: bool, // add_transaction_provider: Arc, p2p: Option, } @@ -32,7 +31,7 @@ impl P2pService { None }; - Ok(Self { p2p, enabled: config.p2p }) + Ok(Self { p2p }) } pub fn commands(&mut self) -> Option { @@ -43,11 +42,9 @@ impl P2pService { #[async_trait::async_trait] impl Service for P2pService { async fn start<'a>(&mut self, runner: ServiceRunner<'a>) -> anyhow::Result<()> { - if self.enabled { - let p2p = self.p2p.take().expect("Service already started"); - runner.service_loop(move |ctx| async move { p2p.build().context("Building p2p service")?.run(ctx).await }); - } - Ok(()) + // TODO: we need to be able to restart this! + let p2p = self.p2p.take().expect("Service already started"); + runner.service_loop(move |ctx| async move { p2p.build().context("Building p2p service")?.run(ctx).await }) } } diff --git a/crates/madara/node/src/service/rpc/mod.rs b/crates/madara/node/src/service/rpc/mod.rs index 3a6dc015d..127d0e139 100644 --- a/crates/madara/node/src/service/rpc/mod.rs +++ b/crates/madara/node/src/service/rpc/mod.rs @@ -125,12 +125,8 @@ impl Service for RpcService { } }; - start_server(server_config, ctx.clone(), stop_handle).await?; - - anyhow::Ok(()) - }); - - anyhow::Ok(()) + start_server(server_config, ctx.clone(), stop_handle).await + }) } } diff --git a/crates/madara/primitives/utils/src/service.rs b/crates/madara/primitives/utils/src/service.rs index f9f4884c2..7d11dbc9c 100644 --- a/crates/madara/primitives/utils/src/service.rs +++ b/crates/madara/primitives/utils/src/service.rs @@ -1016,8 +1016,8 @@ pub struct ServiceTransport { /// # use mp_utils::service::ServiceIdProvider; /// # use mp_utils::service::ServiceRunner; /// # use mp_utils::service::ServiceMonitorBuilder; -/// -/// // Step 1: implementing the `ServiceId` trait. We use this to identify our services. +/// // Step 1: implementing the `ServiceId` trait. We use this to identify our +/// // services. /// pub enum MyServiceId { /// MyServiceA, /// MyServiceB @@ -1038,8 +1038,8 @@ pub struct ServiceTransport { /// Closed /// } /// -/// // Step 2: implementing the `Service` trait. An example service, sends over 4 integers to -/// // `ServiceB` and the exits +/// // Step 2: implementing the `Service` trait. An example service, sends over +/// // 4 integers to `ServiceB` and the exits /// struct MyServiceA(tokio::sync::broadcast::Sender>); /// /// #[async_trait::async_trait] @@ -1073,7 +1073,8 @@ pub struct ServiceTransport { /// } /// } /// -/// // Step 3: implementing the `ServiceIdProvider` trait. This re-uses the logic from step 1. +/// // Step 3: implementing the `ServiceIdProvider` trait. This re-uses the +/// // logic from step 1. /// impl ServiceIdProvider for MyServiceA { /// fn id_provider(&self) -> impl ServiceId { /// MyServiceId::MyServiceA @@ -1093,8 +1094,9 @@ pub struct ServiceTransport { /// let i = tokio::select! { /// res = rx.recv() => { /// // As mentioned above, `res` will never receive an -/// // `Err(RecvError::Closed)` since we always keep a sender alive in A for -/// // restarts, so we manually check if the channel was closed. +/// // `Err(RecvError::Closed)` since we always keep a +/// // sender alive in A for restarts, so we manually +/// // check if the channel was closed. /// match res? { /// Channel::Open(i) => i, /// Channel::Closed => break, @@ -1128,11 +1130,12 @@ pub struct ServiceTransport { /// let service_a = MyServiceA(sx); /// let service_b = MyServiceB(rx); /// -/// // Step 4: we add our service to a `ServiceMonitor` (using a type-safe builder pattern)... +/// // Step 4: we add our service to a `ServiceMonitor` (using a type-safe +/// // builder pattern)... /// ServiceMonitorBuilder::new() /// .with_active(service_a)? /// .with_active(service_b)? -/// .start() // ,,and start them +/// .start() // ...and start them /// .await /// } /// ``` @@ -1343,6 +1346,29 @@ impl ServiceMonitorBuilder { ) -> Result, ServiceMonitorError> { self.activate_impl(&id.svc_id()) } + + /// Marks a [`Service`] as [`Active`], if a condition is met, in which case it will be started + /// automatically when calling [`start`]. + /// + /// Note that this is not entirely type safe as we cannot encode runtime assertions into the + /// type system. + /// + /// # Errors + /// + /// Returns [`UnregisteredService`] if trying to activate a service that has not already been + /// added by using [`with`]. + /// + /// [`Active`]: ServiceStatus + /// [`start`]: Self::start + /// [`UnregisteredService`]: ServiceMonitorError::UnregisteredService + /// [`with`]: Self::with + pub fn activate_if( + self, + id: impl ServiceId, + f: impl Fn() -> bool, + ) -> Result, ServiceMonitorError> { + self.activate_if_impl(id, f) + } } impl ServiceMonitorBuilder { @@ -1386,6 +1412,29 @@ impl ServiceMonitorBuilder { self.activate_impl(&id.svc_id()) } + /// Marks a [`Service`] as [`Active`], if a condition is met, in which case it will be started + /// automatically when calling [`start`]. + /// + /// Note that this is not entirely type safe as we cannot encode runtime assertions into the + /// type system. + /// + /// # Errors + /// + /// Returns [`UnregisteredService`] if trying to activate a service that has not already been + /// added by using [`with`]. + /// + /// [`Active`]: ServiceStatus + /// [`start`]: Self::start + /// [`UnregisteredService`]: ServiceMonitorError::UnregisteredService + /// [`with`]: Self::with + pub fn activate_if( + self, + id: impl ServiceId, + f: impl Fn() -> bool, + ) -> Result, ServiceMonitorError> { + self.activate_if_impl(id, f) + } + /// Consumes this builder and returns a [ServiceMonitor] pub fn build(self) -> ServiceMonitor { let Self { services, status_monitored, monitored, ctx, .. } = self; @@ -1484,6 +1533,19 @@ impl ServiceMonitorBuilder { Ok(ServiceMonitorBuilder { services, status_monitored, monitored, ctx, _state: std::marker::PhantomData }) } } + + fn activate_if_impl( + self, + id: impl ServiceId, + f: impl Fn() -> bool, + ) -> Result, ServiceMonitorError> { + if f() { + self.activate_impl(&id.svc_id()) + } else { + let Self { services, status_monitored, monitored, ctx, .. } = self; + Ok(ServiceMonitorBuilder { services, status_monitored, monitored, ctx, _state: std::marker::PhantomData }) + } + } } /// Orchestrates the execution of various [`Service`]s.