Skip to content

Commit

Permalink
feat(starknet_sequencer_infra): add max concurrency limit to local co…
Browse files Browse the repository at this point in the history
…ncurrent servers (#3923)

commit-id:aa42f2c0
  • Loading branch information
lev-starkware authored Feb 5, 2025
1 parent e8edce1 commit a53875d
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 10 deletions.
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use std::fmt::Debug;
use std::marker::PhantomData;
use std::sync::Arc;

use async_trait::async_trait;
use starknet_infra_utils::type_name::short_type_name;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::Semaphore;
use tracing::{debug, error, info, warn};

use crate::component_definitions::{
Expand Down Expand Up @@ -90,7 +92,8 @@ use crate::errors::{ComponentServerError, ReplaceComponentError};
/// let component = MyComponent {};
///
/// // Instantiate the server.
/// let mut server = LocalComponentServer::new(component, rx);
/// let max_concurrency = 1;
/// let mut server = LocalComponentServer::new(component, rx, max_concurrency);
///
/// // Start the server in a new task.
/// task::spawn(async move {
Expand Down Expand Up @@ -150,7 +153,8 @@ where
async fn start(&mut self) -> Result<(), ComponentServerError> {
info!("Starting ConcurrentLocalComponentServer for {}.", short_type_name::<Component>());
self.component.start().await?;
concurrent_request_response_loop(&mut self.rx, &mut self.component).await;
concurrent_request_response_loop(&mut self.rx, &mut self.component, self.max_concurrency)
.await;
error!("Finished ConcurrentLocalComponentServer for {}.", short_type_name::<Component>());
Err(ComponentServerError::ServerUnexpectedlyStopped)
}
Expand All @@ -164,6 +168,8 @@ where
{
component: Component,
rx: Receiver<ComponentRequestAndResponseSender<Request, Response>>,
// TODO(Itay, Lev): find the way to provide max_concurrency only for non-blocking server.
max_concurrency: usize,
_local_server_type: PhantomData<LocalServerType>,
}

Expand All @@ -177,8 +183,9 @@ where
pub fn new(
component: Component,
rx: Receiver<ComponentRequestAndResponseSender<Request, Response>>,
max_concurrency: usize,
) -> Self {
Self { component, rx, _local_server_type: PhantomData }
Self { component, rx, max_concurrency, _local_server_type: PhantomData }
}
}

Expand Down Expand Up @@ -232,20 +239,29 @@ async fn request_response_loop<Request, Response, Component>(
async fn concurrent_request_response_loop<Request, Response, Component>(
rx: &mut Receiver<ComponentRequestAndResponseSender<Request, Response>>,
component: &mut Component,
max_concurrency: usize,
) where
Component: ComponentRequestHandler<Request, Response> + Clone + Send + 'static,
Request: Send + Debug + 'static,
Response: Send + Debug + 'static,
{
info!("Starting concurrent server for component {}", short_type_name::<Component>());
let task_limiter = Arc::new(Semaphore::new(max_concurrency));

while let Some(request_and_res_tx) = rx.recv().await {
let request = request_and_res_tx.request;
let tx = request_and_res_tx.tx;
debug!("Component {} received request {:?}", short_type_name::<Component>(), request);

// Acquire a permit to run the task.
let permit = task_limiter.clone().acquire_owned().await.unwrap();

let mut cloned_component = component.clone();
tokio::spawn(async move { process_request(&mut cloned_component, request, tx).await });
tokio::spawn(async move {
process_request(&mut cloned_component, request, tx).await;
// Drop the permit to allow more tasks to be created.
drop(permit);
});
}

error!("Stopping concurrent server for component {}", short_type_name::<Component>());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,9 @@ async fn setup_concurrent_local_test() -> LocalConcurrentComponentClient {

let local_client = LocalConcurrentComponentClient::new(tx_a);

let mut concurrent_local_server = ConcurrentLocalComponentServer::new(component, rx_a);
let max_concurrency = 10;
let mut concurrent_local_server =
ConcurrentLocalComponentServer::new(component, rx_a, max_concurrency);
task::spawn(async move {
let _ = concurrent_local_server.start().await;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,9 @@ async fn test_setup() {
let component_a = ComponentA::new(Box::new(b_client.clone()));
let component_b = ComponentB::new(setup_value, Box::new(a_client.clone()));

let mut component_a_server = LocalComponentServer::new(component_a, rx_a);
let mut component_b_server = LocalComponentServer::new(component_b, rx_b);
let max_concurrency = 1;
let mut component_a_server = LocalComponentServer::new(component_a, rx_a, max_concurrency);
let mut component_b_server = LocalComponentServer::new(component_b, rx_b, max_concurrency);

task::spawn(async move {
let _ = component_a_server.start().await;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,11 @@ async fn setup_for_tests(setup_value: ValueB, a_socket: SocketAddr, b_socket: So
let a_local_client = LocalComponentClient::<ComponentARequest, ComponentAResponse>::new(tx_a);
let b_local_client = LocalComponentClient::<ComponentBRequest, ComponentBResponse>::new(tx_b);

let mut component_a_local_server = LocalComponentServer::new(component_a, rx_a);
let mut component_b_local_server = LocalComponentServer::new(component_b, rx_b);
let max_concurrency = 1;
let mut component_a_local_server =
LocalComponentServer::new(component_a, rx_a, max_concurrency);
let mut component_b_local_server =
LocalComponentServer::new(component_b, rx_b, max_concurrency);

let mut component_a_remote_server =
RemoteComponentServer::new(a_local_client, a_socket.ip(), a_socket.port());
Expand Down
13 changes: 12 additions & 1 deletion crates/starknet_sequencer_node/src/servers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ macro_rules! create_remote_server {
/// * $component - The component that will be taken to initialize the server if the execution mode
/// is enabled(LocalExecutionWithRemoteDisabled / LocalExecutionWithRemoteEnabled).
/// * $receiver - receiver side for the server.
/// * $max_concurrency - the maximum number of concurrent requests the server will handle.
/// * $server_type - the type of the server, one of string literals REGULAR_LOCAL_SERVER or
/// CONCURRENT_LOCAL_SERVER.
///
Expand All @@ -164,6 +165,7 @@ macro_rules! create_remote_server {
/// &config.components.batcher.execution_mode,
/// components.batcher,
/// communication.take_batcher_rx(),
/// config.components.batcher.max_concurrency,
/// REGULAR_LOCAL_SERVER,
/// );
/// match batcher_server {
Expand All @@ -172,7 +174,7 @@ macro_rules! create_remote_server {
/// }
/// ```
macro_rules! create_local_server {
($execution_mode:expr, $component:expr, $receiver:expr, $server_type:tt) => {
($execution_mode:expr, $component:expr, $receiver:expr, $max_concurrency:expr, $server_type:tt) => {
match *$execution_mode {
ReactiveComponentExecutionMode::LocalExecutionWithRemoteDisabled
| ReactiveComponentExecutionMode::LocalExecutionWithRemoteEnabled => {
Expand All @@ -181,6 +183,7 @@ macro_rules! create_local_server {
.take()
.expect(concat!(stringify!($component), " is not initialized.")),
$receiver,
$max_concurrency,
)))
}
ReactiveComponentExecutionMode::Disabled | ReactiveComponentExecutionMode::Remote => {
Expand Down Expand Up @@ -247,48 +250,56 @@ fn create_local_servers(
&config.components.batcher.execution_mode,
&mut components.batcher,
communication.take_batcher_rx(),
config.components.batcher.max_concurrency,
REGULAR_LOCAL_SERVER
);
let class_manager_server = create_local_server!(
&config.components.class_manager.execution_mode,
&mut components.class_manager,
communication.take_class_manager_rx(),
config.components.class_manager.max_concurrency,
REGULAR_LOCAL_SERVER
);
let gateway_server = create_local_server!(
&config.components.gateway.execution_mode,
&mut components.gateway,
communication.take_gateway_rx(),
config.components.gateway.max_concurrency,
REGULAR_LOCAL_SERVER
);
let l1_provider_server = create_local_server!(
&config.components.l1_provider.execution_mode,
&mut components.l1_provider,
communication.take_l1_provider_rx(),
config.components.l1_provider.max_concurrency,
REGULAR_LOCAL_SERVER
);
let mempool_server = create_local_server!(
&config.components.mempool.execution_mode,
&mut components.mempool,
communication.take_mempool_rx(),
config.components.mempool.max_concurrency,
REGULAR_LOCAL_SERVER
);
let mempool_p2p_propagator_server = create_local_server!(
&config.components.mempool_p2p.execution_mode,
&mut components.mempool_p2p_propagator,
communication.take_mempool_p2p_propagator_rx(),
config.components.mempool_p2p.max_concurrency,
REGULAR_LOCAL_SERVER
);
let sierra_compiler_server = create_local_server!(
&config.components.sierra_compiler.execution_mode,
&mut components.sierra_compiler,
communication.take_sierra_compiler_rx(),
config.components.sierra_compiler.max_concurrency,
CONCURRENT_LOCAL_SERVER
);
let state_sync_server = create_local_server!(
&config.components.state_sync.execution_mode,
&mut components.state_sync,
communication.take_state_sync_rx(),
config.components.state_sync.max_concurrency,
REGULAR_LOCAL_SERVER
);

Expand Down

0 comments on commit a53875d

Please sign in to comment.