Skip to content

Commit

Permalink
refactor(service): updated project to use new service capabilities
Browse files Browse the repository at this point in the history
  • Loading branch information
Trantorian1 committed Feb 27, 2025
1 parent bebc4fd commit caaf4d4
Show file tree
Hide file tree
Showing 11 changed files with 131 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@ impl MadaraServicesRpcApiV0_1_0Server for Starknet {
))
} else {
match status {
ServiceRequest::Start => Ok(svcs.iter().map(|svc| self.ctx.service_add(*svc)).collect()),
ServiceRequest::Stop => Ok(svcs.iter().map(|svc| self.ctx.service_remove(*svc)).collect()),
ServiceRequest::Start => Ok(svcs.iter().map(|svc| self.ctx.service_activate(*svc)).collect()),
ServiceRequest::Stop => Ok(svcs.iter().map(|svc| self.ctx.service_deactivate(*svc)).collect()),
ServiceRequest::Restart => {
let res = Ok(svcs.iter().map(|svc| self.ctx.service_remove(*svc)).collect());
let res = Ok(svcs.iter().map(|svc| self.ctx.service_deactivate(*svc)).collect());
tokio::time::sleep(RESTART_INTERVAL).await;
svcs.iter().for_each(|svc| {
self.ctx.service_add(*svc);
self.ctx.service_activate(*svc);
});

res
Expand Down
4 changes: 1 addition & 3 deletions crates/madara/client/telemetry/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,7 @@ impl Service for TelemetryService {
let rx = self.telemetry_handle.0.subscribe();
let clients = start_clients(&self.telemetry_endpoints).await;

runner.service_loop(move |ctx| start_telemetry(rx, ctx, clients));

anyhow::Ok(())
runner.service_loop(move |ctx| start_telemetry(rx, ctx, clients))
}
}

Expand Down
28 changes: 28 additions & 0 deletions crates/madara/node/src/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,34 @@ impl RunCmd {
pub fn is_devnet(&self) -> bool {
self.devnet
}

pub fn is_active_l1_sync(&self) -> bool {
!self.l1_sync_params.l1_sync_disabled && (self.l1_sync_params.l1_endpoint.is_some() || !self.devnet)
}

pub fn is_active_l2_sync(&self) -> bool {
self.args_preset.warp_update_receiver || !self.l2_sync_params.l2_sync_disabled
}

pub fn is_active_p2p(&self) -> bool {
self.p2p_params.p2p
}

pub fn is_active_rpc_user(&self) -> bool {
!self.rpc_params.rpc_disable && !self.args_preset.warp_update_receiver
}

pub fn is_active_rpc_admin(&self) -> bool {
self.rpc_params.rpc_admin && !self.args_preset.warp_update_receiver
}

pub fn is_active_fgw(&self) -> bool {
self.gateway_params.feeder_gateway_enable && !self.args_preset.warp_update_receiver
}

pub fn is_active_telemetry(&self) -> bool {
self.telemetry_params.telemetry && !self.args_preset.warp_update_receiver
}
}

/// Starknet network types.
Expand Down
63 changes: 16 additions & 47 deletions crates/madara/node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use mc_mempool::{GasPriceProvider, L1DataProvider, Mempool, MempoolLimits};
use mc_rpc::providers::{AddTransactionProvider, ForwardToProvider, MempoolAddTxProvider};
use mc_telemetry::{SysInfo, TelemetryService};
use mp_oracle::pragma::PragmaOracleBuilder;
use mp_utils::service::{MadaraServiceId, ServiceMonitor};
use mp_utils::service::{MadaraServiceId, ServiceMonitorBuilder};
use service::{BlockProductionService, GatewayService, L1SyncService, P2pService, RpcService, SyncService};
use starknet_api::core::ChainId;
use std::sync::Arc;
Expand Down Expand Up @@ -294,59 +294,28 @@ async fn main() -> anyhow::Result<()> {
service_block_production.setup_devnet().await?;
}

let mut app = ServiceMonitor::new()
// Since the database is not implemented as a proper service, we do not
// active it, as it would never be marked as stopped by the existing logic
ServiceMonitorBuilder::new()
.with(service_db)?
.with(service_l1_sync)?
.with(service_p2p)?
.with(service_l2_sync)?
.with(service_p2p)?
.with(service_block_production)?
.with(service_rpc_user)?
.with(service_rpc_admin)?
.with(service_gateway)?
.with(service_telemetry)?;

// Since the database is not implemented as a proper service, we do not
// active it, as it would never be marked as stopped by the existing logic
//
// app.activate(MadaraService::Database);

let l1_sync_enabled = !run_cmd.l1_sync_params.l1_sync_disabled;
let l1_endpoint_some = run_cmd.l1_sync_params.l1_endpoint.is_some();
let warp_update_receiver = run_cmd.args_preset.warp_update_receiver;

if l1_sync_enabled && (l1_endpoint_some || !run_cmd.devnet) {
app.activate(MadaraServiceId::L1Sync);
}

if run_cmd.p2p_params.p2p {
app.activate(MadaraServiceId::P2P);
}

if warp_update_receiver {
app.activate(MadaraServiceId::L2Sync);
} else if run_cmd.is_sequencer() {
app.activate(MadaraServiceId::BlockProduction);
} else if !run_cmd.l2_sync_params.l2_sync_disabled {
app.activate(MadaraServiceId::L2Sync);
}

if !run_cmd.rpc_params.rpc_disable && !warp_update_receiver {
app.activate(MadaraServiceId::RpcUser);
}

if run_cmd.rpc_params.rpc_admin && !warp_update_receiver {
app.activate(MadaraServiceId::RpcAdmin);
}

if run_cmd.gateway_params.feeder_gateway_enable && !warp_update_receiver {
app.activate(MadaraServiceId::Gateway);
}

if run_cmd.telemetry_params.telemetry && !warp_update_receiver {
app.activate(MadaraServiceId::Telemetry);
}

app.start().await?;
.with(service_telemetry)?
.activate_if(MadaraServiceId::L1Sync, || run_cmd.is_active_l1_sync())?
.activate_if(MadaraServiceId::L2Sync, || run_cmd.is_active_l2_sync())?
.activate_if(MadaraServiceId::P2P, || run_cmd.is_active_p2p())?
.activate_if(MadaraServiceId::BlockProduction, || run_cmd.is_sequencer())?
.activate_if(MadaraServiceId::RpcUser, || run_cmd.is_active_rpc_user())?
.activate_if(MadaraServiceId::RpcAdmin, || run_cmd.is_active_rpc_admin())?
.activate_if(MadaraServiceId::Gateway, || run_cmd.is_active_fgw())?
.activate_if(MadaraServiceId::Telemetry, || run_cmd.is_active_telemetry())?
.start()
.await?;

let _ = analytics.shutdown();

Expand Down
4 changes: 1 addition & 3 deletions crates/madara/node/src/service/block_production.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,7 @@ impl Service for BlockProductionService {
Arc::clone(l1_data_provider),
)?;

runner.service_loop(move |ctx| block_production_task.block_production_task(ctx));

Ok(())
runner.service_loop(move |ctx| block_production_task.block_production_task(ctx))
}
}

Expand Down
3 changes: 1 addition & 2 deletions crates/madara/node/src/service/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ impl Service for GatewayService {
config.gateway_port,
ctx,
)
});
Ok(())
})
}
}

Expand Down
5 changes: 2 additions & 3 deletions crates/madara/node/src/service/l1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,11 @@ impl Service for L1SyncService {
l1_head_snd,
ctx,
)
});
})
} else {
tracing::error!("❗ Tried to start L1 Sync but no l1 endpoint was provided to the node on startup");
anyhow::bail!("Failed to start L1 Sync")
}

Ok(())
}
}

Expand Down
4 changes: 1 addition & 3 deletions crates/madara/node/src/service/l2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,7 @@ impl Service for SyncService {
.run(ctx)
.await
}
});

Ok(())
})
}
}

Expand Down
11 changes: 4 additions & 7 deletions crates/madara/node/src/service/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use mp_utils::service::{MadaraServiceId, Service, ServiceId, ServiceIdProvider,
use std::time::Duration;

pub struct P2pService {
enabled: bool,
// add_transaction_provider: Arc<dyn AddTransactionProvider>,
p2p: Option<mc_p2p::MadaraP2pBuilder>,
}
Expand All @@ -32,7 +31,7 @@ impl P2pService {
None
};

Ok(Self { p2p, enabled: config.p2p })
Ok(Self { p2p })
}

pub fn commands(&mut self) -> Option<P2pCommands> {
Expand All @@ -43,11 +42,9 @@ impl P2pService {
#[async_trait::async_trait]
impl Service for P2pService {
async fn start<'a>(&mut self, runner: ServiceRunner<'a>) -> anyhow::Result<()> {
if self.enabled {
let p2p = self.p2p.take().expect("Service already started");
runner.service_loop(move |ctx| async move { p2p.build().context("Building p2p service")?.run(ctx).await });
}
Ok(())
// TODO: we need to be able to restart this!
let p2p = self.p2p.take().expect("Service already started");
runner.service_loop(move |ctx| async move { p2p.build().context("Building p2p service")?.run(ctx).await })
}
}

Expand Down
8 changes: 2 additions & 6 deletions crates/madara/node/src/service/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,8 @@ impl Service for RpcService {
}
};

start_server(server_config, ctx.clone(), stop_handle).await?;

anyhow::Ok(())
});

anyhow::Ok(())
start_server(server_config, ctx.clone(), stop_handle).await
})
}
}

Expand Down
80 changes: 71 additions & 9 deletions crates/madara/primitives/utils/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1016,8 +1016,8 @@ pub struct ServiceTransport {
/// # use mp_utils::service::ServiceIdProvider;
/// # use mp_utils::service::ServiceRunner;
/// # use mp_utils::service::ServiceMonitorBuilder;
///
/// // Step 1: implementing the `ServiceId` trait. We use this to identify our services.
/// // Step 1: implementing the `ServiceId` trait. We use this to identify our
/// // services.
/// pub enum MyServiceId {
/// MyServiceA,
/// MyServiceB
Expand All @@ -1038,8 +1038,8 @@ pub struct ServiceTransport {
/// Closed
/// }
///
/// // Step 2: implementing the `Service` trait. An example service, sends over 4 integers to
/// // `ServiceB` and the exits
/// // Step 2: implementing the `Service` trait. An example service, sends over
/// // 4 integers to `ServiceB` and the exits
/// struct MyServiceA(tokio::sync::broadcast::Sender<Channel<usize>>);
///
/// #[async_trait::async_trait]
Expand Down Expand Up @@ -1073,7 +1073,8 @@ pub struct ServiceTransport {
/// }
/// }
///
/// // Step 3: implementing the `ServiceIdProvider` trait. This re-uses the logic from step 1.
/// // Step 3: implementing the `ServiceIdProvider` trait. This re-uses the
/// // logic from step 1.
/// impl ServiceIdProvider for MyServiceA {
/// fn id_provider(&self) -> impl ServiceId {
/// MyServiceId::MyServiceA
Expand All @@ -1093,8 +1094,9 @@ pub struct ServiceTransport {
/// let i = tokio::select! {
/// res = rx.recv() => {
/// // As mentioned above, `res` will never receive an
/// // `Err(RecvError::Closed)` since we always keep a sender alive in A for
/// // restarts, so we manually check if the channel was closed.
/// // `Err(RecvError::Closed)` since we always keep a
/// // sender alive in A for restarts, so we manually
/// // check if the channel was closed.
/// match res? {
/// Channel::Open(i) => i,
/// Channel::Closed => break,
Expand Down Expand Up @@ -1128,11 +1130,12 @@ pub struct ServiceTransport {
/// let service_a = MyServiceA(sx);
/// let service_b = MyServiceB(rx);
///
/// // Step 4: we add our service to a `ServiceMonitor` (using a type-safe builder pattern)...
/// // Step 4: we add our service to a `ServiceMonitor` (using a type-safe
/// // builder pattern)...
/// ServiceMonitorBuilder::new()
/// .with_active(service_a)?
/// .with_active(service_b)?
/// .start() // ,,and start them
/// .start() // ...and start them
/// .await
/// }
/// ```
Expand Down Expand Up @@ -1343,6 +1346,29 @@ impl ServiceMonitorBuilder<ServiceMonitorBuilderStateSome> {
) -> Result<ServiceMonitorBuilder<ServiceMonitorBuilderStateSomeActive>, ServiceMonitorError> {
self.activate_impl(&id.svc_id())
}

/// Marks a [`Service`] as [`Active`], if a condition is met, in which case it will be started
/// automatically when calling [`start`].
///
/// Note that this is not entirely type safe as we cannot encode runtime assertions into the
/// type system.
///
/// # Errors
///
/// Returns [`UnregisteredService`] if trying to activate a service that has not already been
/// added by using [`with`].
///
/// [`Active`]: ServiceStatus
/// [`start`]: Self::start
/// [`UnregisteredService`]: ServiceMonitorError::UnregisteredService
/// [`with`]: Self::with
pub fn activate_if(
self,
id: impl ServiceId,
f: impl Fn() -> bool,
) -> Result<ServiceMonitorBuilder<ServiceMonitorBuilderStateSomeActive>, ServiceMonitorError> {
self.activate_if_impl(id, f)
}
}

impl ServiceMonitorBuilder<ServiceMonitorBuilderStateSomeActive> {
Expand Down Expand Up @@ -1386,6 +1412,29 @@ impl ServiceMonitorBuilder<ServiceMonitorBuilderStateSomeActive> {
self.activate_impl(&id.svc_id())
}

/// Marks a [`Service`] as [`Active`], if a condition is met, in which case it will be started
/// automatically when calling [`start`].
///
/// Note that this is not entirely type safe as we cannot encode runtime assertions into the
/// type system.
///
/// # Errors
///
/// Returns [`UnregisteredService`] if trying to activate a service that has not already been
/// added by using [`with`].
///
/// [`Active`]: ServiceStatus
/// [`start`]: Self::start
/// [`UnregisteredService`]: ServiceMonitorError::UnregisteredService
/// [`with`]: Self::with
pub fn activate_if(
self,
id: impl ServiceId,
f: impl Fn() -> bool,
) -> Result<ServiceMonitorBuilder<ServiceMonitorBuilderStateSomeActive>, ServiceMonitorError> {
self.activate_if_impl(id, f)
}

/// Consumes this builder and returns a [ServiceMonitor]
pub fn build(self) -> ServiceMonitor {
let Self { services, status_monitored, monitored, ctx, .. } = self;
Expand Down Expand Up @@ -1484,6 +1533,19 @@ impl<S> ServiceMonitorBuilder<S> {
Ok(ServiceMonitorBuilder { services, status_monitored, monitored, ctx, _state: std::marker::PhantomData })
}
}

fn activate_if_impl<S2>(
self,
id: impl ServiceId,
f: impl Fn() -> bool,
) -> Result<ServiceMonitorBuilder<S2>, ServiceMonitorError> {
if f() {
self.activate_impl(&id.svc_id())
} else {
let Self { services, status_monitored, monitored, ctx, .. } = self;
Ok(ServiceMonitorBuilder { services, status_monitored, monitored, ctx, _state: std::marker::PhantomData })
}
}
}

/// Orchestrates the execution of various [`Service`]s.
Expand Down

0 comments on commit caaf4d4

Please sign in to comment.