Skip to content

Commit

Permalink
[ISSUE mxsm#2242]🚀Optimize BrokerRuntime shutdown💥 (mxsm#2267)
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm authored Jan 15, 2025
1 parent ede63cc commit a6f9764
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 6 deletions.
140 changes: 136 additions & 4 deletions rocketmq-broker/src/broker_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,20 @@ use tracing::info;
use tracing::warn;

use crate::broker::broker_hook::BrokerShutdownHook;
use crate::client::consumer_ids_change_listener::ConsumerIdsChangeListener;
use crate::client::default_consumer_ids_change_listener::DefaultConsumerIdsChangeListener;
use crate::client::manager::consumer_manager::ConsumerManager;
use crate::client::manager::producer_manager::ProducerManager;
use crate::client::net::broker_to_client::Broker2Client;
use crate::client::rebalance::rebalance_lock_manager::RebalanceLockManager;
use crate::coldctr::cold_data_cg_ctr_service::ColdDataCgCtrService;
use crate::coldctr::cold_data_pull_request_hold_service::ColdDataPullRequestHoldService;
use crate::controller::replicas_manager::ReplicasManager;
use crate::failover::escape_bridge::EscapeBridge;
use crate::filter::manager::consumer_filter_manager::ConsumerFilterManager;
use crate::hook::batch_check_before_put_message::BatchCheckBeforePutMessageHook;
use crate::hook::check_before_put_message::CheckBeforePutMessageHook;
use crate::latency::broker_fast_failure::BrokerFastFailure;
use crate::long_polling::long_polling_service::pull_request_hold_service::PullRequestHoldService;
use crate::long_polling::notify_message_arriving_listener::NotifyMessageArrivingListener;
use crate::offset::manager::broadcast_offset_manager::BroadcastOffsetManager;
Expand All @@ -74,6 +79,7 @@ use crate::processor::client_manage_processor::ClientManageProcessor;
use crate::processor::consumer_manage_processor::ConsumerManageProcessor;
use crate::processor::default_pull_message_result_handler::DefaultPullMessageResultHandler;
use crate::processor::end_transaction_processor::EndTransactionProcessor;
use crate::processor::notification_processor::NotificationProcessor;
use crate::processor::pop_inflight_message_counter::PopInflightMessageCounter;
use crate::processor::pop_message_processor::PopMessageProcessor;
use crate::processor::pull_message_processor::PullMessageProcessor;
Expand Down Expand Up @@ -150,6 +156,8 @@ pub(crate) struct BrokerRuntime {
Option<ArcMut<DefaultTransactionalMessageService<DefaultMessageStore>>>,
broker_runtime: Option<RocketMQRuntime>,
shutdown_hook: Option<BrokerShutdownHook>,
consumer_ids_change_listener: Arc<Box<dyn ConsumerIdsChangeListener + Send + Sync + 'static>>,
topic_queue_mapping_clean_service: TopicQueueMappingCleanService,
// receiver for shutdown signal
pub(crate) shutdown_rx: Option<tokio::sync::broadcast::Receiver<()>>,
}
Expand Down Expand Up @@ -178,8 +186,11 @@ impl BrokerRuntime {
broker_config.get_broker_addr().into(),
);
let producer_manager = ProducerManager::new();
let consumer_ids_change_listener: Arc<
Box<dyn ConsumerIdsChangeListener + Send + Sync + 'static>,
> = Arc::new(Box::new(DefaultConsumerIdsChangeListener {}));
let consumer_manager = ConsumerManager::new_with_broker_stats(
Arc::new(Box::new(DefaultConsumerIdsChangeListener {})),
consumer_ids_change_listener.clone(),
Arc::new(broker_config.clone()),
);

Expand Down Expand Up @@ -222,6 +233,13 @@ impl BrokerRuntime {
topic_route_info_manager: None,
escape_bridge: None,
pop_inflight_message_counter,
replicas_manager: None,
broker_fast_failure: BrokerFastFailure,
cold_data_pull_request_hold_service: None,
cold_data_cg_ctr_service: None,
pop_message_processor: None,
ack_message_processor: None,
notification_processor: None,
});
let mut stats_manager = BrokerStatsManager::new(Arc::new(inner.broker_config.clone()));
stats_manager.set_producer_state_getter(Arc::new(ProducerStateGetter {
Expand All @@ -243,6 +261,8 @@ impl BrokerRuntime {
transactional_message_service: None,
broker_runtime: Some(runtime),
shutdown_hook: None,
consumer_ids_change_listener,
topic_queue_mapping_clean_service: TopicQueueMappingCleanService,
shutdown_rx: None,
}
}
Expand All @@ -259,7 +279,11 @@ impl BrokerRuntime {
self.shutdown_basic_service().await;

self.inner.broker_outer_api.shutdown();
if let Some(message_store) = &mut self.inner.message_store {

if let Some(runtime) = self.broker_runtime.take() {
runtime.shutdown();
}
/* if let Some(message_store) = &mut self.inner.message_store {
message_store.shutdown()
}
Expand All @@ -273,7 +297,7 @@ impl BrokerRuntime {
if let Some(runtime) = self.broker_runtime.take() {
runtime.shutdown();
}
}*/
}

async fn unregister_broker(&mut self) {
Expand All @@ -296,6 +320,100 @@ impl BrokerRuntime {
if let Some(hook) = self.shutdown_hook.as_ref() {
hook.before_shutdown();
}

if let Some(broker_stats_manager) = self.inner.broker_stats_manager.as_ref() {
broker_stats_manager.shutdown();
}

if let Some(pull_request_hold_service) = self.inner.pull_request_hold_service.as_mut() {
pull_request_hold_service.shutdown();
}

if let Some(pop_message_processor) = self.inner.pop_message_processor.as_mut() {
pop_message_processor.shutdown();
}

if let Some(ack_message_processor) = self.inner.ack_message_processor.as_mut() {
ack_message_processor.shutdown();
}

if let Some(transactional_message_service) = self.transactional_message_service.as_mut() {
transactional_message_service.shutdown();
}

if let Some(notification_processor) = self.inner.notification_processor.as_mut() {
notification_processor.shutdown();
}
self.consumer_ids_change_listener.shutdown();
self.topic_queue_mapping_clean_service.shutdown();
if let Some(timer_message_store) = self.inner.timer_message_store.as_mut() {
timer_message_store.shutdown();
}

self.inner.broadcast_offset_manager.shutdown();

if let Some(message_store) = self.inner.message_store.as_mut() {
message_store.shutdown();
}

if let Some(replicas_manager) = self.inner.replicas_manager.as_mut() {
replicas_manager.shutdown();
}

self.inner.broker_fast_failure.shutdown();

if let Some(consumer_filter_manager) = self.inner.consumer_filter_manager.as_ref() {
consumer_filter_manager.persist();
}
if let Some(consumer_order_info_manager) = self.inner.consumer_order_info_manager.as_ref() {
consumer_order_info_manager.persist();
}

self.inner.schedule_message_service.persist();
self.inner.schedule_message_service.shutdown();
if let Some(transactional_message_check_service) =
self.inner.transactional_message_check_service.as_mut()
{
transactional_message_check_service.shutdown();
}
if let Some(transaction_metrics_flush_service) =
self.inner.transaction_metrics_flush_service.as_mut()
{
transaction_metrics_flush_service.shutdown();
}
if let Some(escape_bridge) = self.inner.escape_bridge.as_mut() {
escape_bridge.shutdown();
}
if let Some(topic_route_info_manager) = self.inner.topic_route_info_manager.as_mut() {
topic_route_info_manager.shutdown();
}

if let Some(topic_route_info_manager) = self.inner.topic_route_info_manager.as_mut() {
topic_route_info_manager.shutdown();
}

if let Some(cold_data_pull_request_hold_service) =
self.inner.cold_data_pull_request_hold_service.as_mut()
{
cold_data_pull_request_hold_service.shutdown();
}

if let Some(cold_data_cg_ctr_service) = self.inner.cold_data_cg_ctr_service.as_mut() {
cold_data_cg_ctr_service.shutdown();
}

if let Some(topic_config_manager) = self.inner.topic_config_manager.as_mut() {
topic_config_manager.persist();
topic_config_manager.stop();
}

if let Some(subscription_group_manager) = self.inner.subscription_group_manager.as_mut() {
subscription_group_manager.persist();
subscription_group_manager.stop();
}

self.inner.consumer_offset_manager.persist();
self.inner.consumer_offset_manager.stop();
}
}

Expand Down Expand Up @@ -557,6 +675,7 @@ impl BrokerRuntime {
self.escape_bridge.clone(),*/
self.inner.clone(),
));
self.inner.pop_message_processor = Some(pop_message_processor.clone());
let ack_message_processor = ArcMut::new(AckMessageProcessor::new(
/*self.topic_config_manager.clone(),
self.message_store.as_ref().unwrap().clone(),
Expand All @@ -570,6 +689,10 @@ impl BrokerRuntime {
self.inner.clone(),
pop_message_processor.clone(),
));
self.inner.ack_message_processor = Some(ack_message_processor.clone());

let notification_processor = ArcMut::new(NotificationProcessor::default());
self.inner.notification_processor = Some(notification_processor.clone());
BrokerRequestProcessor {
send_message_processor: ArcMut::new(send_message_processor),
pull_message_processor,
Expand All @@ -588,7 +711,7 @@ impl BrokerRuntime {
pop_message_processor,
self.inner.clone(),
)),
notification_processor: Default::default(),
notification_processor,
polling_info_processor: Default::default(),
reply_message_processor: ArcMut::new(reply_message_processor),
admin_broker_processor: ArcMut::new(admin_broker_processor),
Expand Down Expand Up @@ -1232,6 +1355,15 @@ pub(crate) struct BrokerRuntimeInner<MS> {
topic_route_info_manager: Option<TopicRouteInfoManager<MS>>,
escape_bridge: Option<EscapeBridge<MS>>,
pop_inflight_message_counter: PopInflightMessageCounter,
replicas_manager: Option<ReplicasManager>,
broker_fast_failure: BrokerFastFailure,
cold_data_pull_request_hold_service: Option<ColdDataPullRequestHoldService>,
cold_data_cg_ctr_service: Option<ColdDataCgCtrService>,

//Processor
pop_message_processor: Option<ArcMut<PopMessageProcessor<MS>>>,
ack_message_processor: Option<ArcMut<AckMessageProcessor<MS>>>,
notification_processor: Option<ArcMut<NotificationProcessor>>,
}

impl<MS: MessageStore> BrokerRuntimeInner<MS> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
use std::any::Any;

use tracing::warn;

use crate::client::consumer_group_event::ConsumerGroupEvent;
use crate::client::consumer_ids_change_listener::ConsumerIdsChangeListener;

Expand All @@ -26,6 +28,6 @@ impl ConsumerIdsChangeListener for DefaultConsumerIdsChangeListener {
fn handle(&self, _event: ConsumerGroupEvent, _group: &str, _args: &[&dyn Any]) {}

fn shutdown(&self) {
todo!()
warn!("DefaultConsumerIdsChangeListener shutdown not implemented");
}
}
3 changes: 2 additions & 1 deletion rocketmq-broker/src/schedule/schedule_message_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ impl ConfigManager for ScheduleMessageService {
}

fn encode_pretty(&self, _pretty_format: bool) -> String {
todo!()
warn!("ScheduleMessageService encode_pretty not implemented");
"".to_string()
}

fn decode(&self, _json_string: &str) {}
Expand Down

0 comments on commit a6f9764

Please sign in to comment.