Skip to content

Commit

Permalink
[ISSUE mxsm#2287]🚀Implement BrokerRuntime start method💫 (mxsm#2288)
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm authored Jan 16, 2025
1 parent 61bbbc5 commit e3be3f4
Showing 1 changed file with 53 additions and 5 deletions.
58 changes: 53 additions & 5 deletions rocketmq-broker/src/broker_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ use tracing::info;
use tracing::warn;

use crate::broker::broker_hook::BrokerShutdownHook;
use crate::broker::broker_pre_online_service::BrokerPreOnlineService;
use crate::client::consumer_ids_change_listener::ConsumerIdsChangeListener;
use crate::client::default_consumer_ids_change_listener::DefaultConsumerIdsChangeListener;
use crate::client::manager::consumer_manager::ConsumerManager;
Expand Down Expand Up @@ -158,6 +159,7 @@ pub(crate) struct BrokerRuntime {
shutdown_hook: Option<BrokerShutdownHook>,
consumer_ids_change_listener: Arc<Box<dyn ConsumerIdsChangeListener + Send + Sync + 'static>>,
topic_queue_mapping_clean_service: TopicQueueMappingCleanService,
broker_pre_online_service: BrokerPreOnlineService,
// receiver for shutdown signal
pub(crate) shutdown_rx: Option<tokio::sync::broadcast::Receiver<()>>,
}
Expand Down Expand Up @@ -263,6 +265,7 @@ impl BrokerRuntime {
shutdown_hook: None,
consumer_ids_change_listener,
topic_queue_mapping_clean_service: TopicQueueMappingCleanService,
broker_pre_online_service: BrokerPreOnlineService,
shutdown_rx: None,
}
}
Expand Down Expand Up @@ -921,8 +924,6 @@ impl BrokerRuntime {
fn initial_request_pipeline(&mut self) {}

fn start_basic_service(&mut self) {
let request_processor = self.init_processor();
let fast_request_processor = request_processor.clone();
if let Some(ref mut message_store) = self.inner.message_store {
message_store
.start()
Expand All @@ -931,6 +932,16 @@ impl BrokerRuntime {
panic!("Message store is not initialized");
}

if let Some(timer_message_store) = self.inner.timer_message_store.as_mut() {
timer_message_store.start();
}
if let Some(replicas_manager) = self.inner.replicas_manager.as_mut() {
replicas_manager.start();
}

let request_processor = self.init_processor();
let fast_request_processor = request_processor.clone();

let server = RocketMQServer::new(Arc::new(self.inner.server_config.clone()));
//start nomarl broker remoting_server
tokio::spawn(async move { server.run(request_processor).await });
Expand All @@ -940,15 +951,52 @@ impl BrokerRuntime {
let fast_server = RocketMQServer::new(Arc::new(fast_server_config));
tokio::spawn(async move { fast_server.run(fast_request_processor).await });

if let Some(pop_message_processor) = self.inner.pop_message_processor.as_mut() {
pop_message_processor.start();
}
if let Some(ack_message_processor) = self.inner.ack_message_processor.as_mut() {
ack_message_processor.start();
}

if let Some(notification_processor) = self.inner.notification_processor.as_mut() {
notification_processor.start();
}

if let Some(topic_queue_mapping_clean_service) =
self.inner.topic_queue_mapping_clean_service.as_mut()
{
topic_queue_mapping_clean_service.start();
}

let inner = self.inner.clone();
if let Some(pull_request_hold_service) = self.inner.pull_request_hold_service.as_mut() {
pull_request_hold_service.start(inner);
}

self.inner.topic_route_info_manager().start();
if let Some(broker_stats_manager) = self.inner.broker_stats_manager.as_mut() {
broker_stats_manager.start();
}

self.inner.broker_fast_failure.start();

//self.inner.escape_bridge().start(self.message_store.clone());
self.inner.escape_bridge_mut().start();
self.inner.broadcast_offset_manager.start();

if let Some(escape_bridge) = self.inner.escape_bridge.as_mut() {
escape_bridge.start();
}
if let Some(topic_route_info_manager) = self.inner.topic_route_info_manager.as_mut() {
topic_route_info_manager.start();
}
self.broker_pre_online_service.start();

if let Some(cold_data_pull_request_hold_service) =
self.inner.cold_data_pull_request_hold_service.as_mut()
{
cold_data_pull_request_hold_service.start();
}
if let Some(cold_data_cg_ctr_service) = self.inner.cold_data_cg_ctr_service.as_mut() {
cold_data_cg_ctr_service.start();
}
}

async fn update_namesrv_addr(&mut self) {
Expand Down

0 comments on commit e3be3f4

Please sign in to comment.