Skip to content

Commit

Permalink
[ISSUE mxsm#2113]💫Implement BrokerOuterAPI#pullMessageFromSpecificBro…
Browse files Browse the repository at this point in the history
…kerAsync🥅 (mxsm#2114)
  • Loading branch information
mxsm authored Jan 6, 2025
1 parent 85041f1 commit 6605048
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 9 deletions.
142 changes: 133 additions & 9 deletions rocketmq-broker/src/out_api/broker_outer_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,26 @@ use std::sync::Weak;
use cheetah_string::CheetahString;
use dns_lookup::lookup_host;
use rocketmq_client_rust::consumer::pull_result::PullResult;
use rocketmq_client_rust::consumer::pull_status::PullStatus;
use rocketmq_client_rust::producer::send_result::SendResult;
use rocketmq_client_rust::producer::send_status::SendStatus;
use rocketmq_client_rust::PullResultExt;
use rocketmq_common::common::broker::broker_config::BrokerIdentity;
use rocketmq_common::common::config::TopicConfig;
use rocketmq_common::common::filter::expression_type::ExpressionType;
use rocketmq_common::common::message::message_client_id_setter::MessageClientIDSetter;
use rocketmq_common::common::message::message_ext::MessageExt;
use rocketmq_common::common::message::message_queue::MessageQueue;
use rocketmq_common::common::message::MessageConst;
use rocketmq_common::common::message::MessageTrait;
use rocketmq_common::common::mix_all;
use rocketmq_common::common::sys_flag::pull_sys_flag::PullSysFlag;
use rocketmq_common::common::topic::TopicValidator;
use rocketmq_common::utils::crc32_utils;
use rocketmq_common::utils::serde_json_utils::SerdeJsonUtils;
use rocketmq_common::MessageAccessor::MessageAccessor;
use rocketmq_common::MessageDecoder;
use rocketmq_common::TimeUtils::get_current_millis;
use rocketmq_remoting::clients::rocketmq_default_impl::RocketmqDefaultClient;
use rocketmq_remoting::clients::RemotingClient;
use rocketmq_remoting::code::request_code::RequestCode;
Expand All @@ -51,7 +57,11 @@ use rocketmq_remoting::protocol::header::message_operation_header::send_message_
use rocketmq_remoting::protocol::header::namesrv::register_broker_header::RegisterBrokerRequestHeader;
use rocketmq_remoting::protocol::header::namesrv::register_broker_header::RegisterBrokerResponseHeader;
use rocketmq_remoting::protocol::header::namesrv::topic_operation_header::RegisterTopicRequestHeader;
use rocketmq_remoting::protocol::header::namesrv::topic_operation_header::TopicRequestHeader;
use rocketmq_remoting::protocol::header::pull_message_request_header::PullMessageRequestHeader;
use rocketmq_remoting::protocol::header::pull_message_response_header::PullMessageResponseHeader;
use rocketmq_remoting::protocol::header::unlock_batch_mq_request_header::UnlockBatchMqRequestHeader;
use rocketmq_remoting::protocol::heartbeat::subscription_data::SubscriptionData;
use rocketmq_remoting::protocol::namesrv::RegisterBrokerResult;
use rocketmq_remoting::protocol::remoting_command::RemotingCommand;
use rocketmq_remoting::protocol::route::route_data_view::QueueData;
Expand All @@ -62,6 +72,7 @@ use rocketmq_remoting::remoting::RemotingService;
use rocketmq_remoting::request_processor::default_request_processor::DefaultRemotingRequestProcessor;
use rocketmq_remoting::rpc::client_metadata::ClientMetadata;
use rocketmq_remoting::rpc::rpc_client_impl::RpcClientImpl;
use rocketmq_remoting::rpc::rpc_request_header::RpcRequestHeader;
use rocketmq_remoting::runtime::config::client_config::TokioClientConfig;
use rocketmq_remoting::runtime::RPCHook;
use rocketmq_rust::ArcMut;
Expand Down Expand Up @@ -494,19 +505,132 @@ impl BrokerOuterAPI {

pub async fn pull_message_from_specific_broker_async(
&self,
_broker_name: &CheetahString,
_broker_addr: &CheetahString,
_consumer_group: &CheetahString,
_topic: &CheetahString,
_queue_id: i32,
_offset: i64,
_max_nums: i32,
_timeout_millis: u64,
broker_name: &CheetahString,
broker_addr: &CheetahString,
consumer_group: &CheetahString,
topic: &CheetahString,
queue_id: i32,
offset: i64,
max_nums: i32,
timeout_millis: u64,
) -> Result<(Option<PullResult>, String, bool)> {
unimplemented!("pull_message_from_specific_broker_async")
let request_header = PullMessageRequestHeader {
consumer_group: consumer_group.clone(),
topic: topic.clone(),
queue_id,
queue_offset: offset,
max_msg_nums: max_nums,
sys_flag: PullSysFlag::build_sys_flag(false, false, true, false) as i32,
commit_offset: 0,
suspend_timeout_millis: 0,
subscription: Some(CheetahString::from_static_str(SubscriptionData::SUB_ALL)),
sub_version: get_current_millis() as i64,
expression_type: Some(CheetahString::from_static_str(ExpressionType::TAG)),
max_msg_bytes: Some(i32::MAX),
topic_request: Some(TopicRequestHeader {
lo: None,
rpc: Some(RpcRequestHeader {
broker_name: Some(broker_name.clone()),
..Default::default()
}),
}),
..Default::default()
};
let request_command =
RemotingCommand::create_request_command(RequestCode::PullMessage, request_header);
match self
.remoting_client
.invoke_async(Some(broker_addr), request_command, timeout_millis)
.await
{
Ok(response) => {
let code = response.code();
let mut pull_result_ext = match process_pull_response(response, broker_addr) {
Ok(value) => value,
Err(_) => return Ok((None, format!("Response Code:{}", code), true)),
};
let name = pull_result_ext.pull_result.pull_status.to_string();
process_pull_result(&mut pull_result_ext, broker_name, queue_id);
Ok((Some(pull_result_ext.pull_result), name, false))
}
Err(e) => Ok((None, e.to_string(), true)),
}
}
}

fn process_pull_result(
pull_result: &mut PullResultExt,
broker_name: &CheetahString,
queue_id: i32,
) {
if pull_result.pull_result.pull_status == PullStatus::Found {
let mut bytes = pull_result.message_binary.take().unwrap_or_default();
let mut message_list = MessageDecoder::decodes_batch(&mut bytes, true, true);
for message in message_list.iter_mut() {
let tra_flag = message.get_property(&CheetahString::from_static_str(
MessageConst::PROPERTY_TRANSACTION_PREPARED,
));
if tra_flag.is_some() && tra_flag.unwrap() == "true" {
if let Some(id) = message.get_property(&CheetahString::from_static_str(
MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,
)) {
message.set_transaction_id(id);
}
}
MessageAccessor::put_property(
message,
CheetahString::from_static_str(MessageConst::PROPERTY_MIN_OFFSET),
pull_result.pull_result.min_offset.to_string().into(),
);
MessageAccessor::put_property(
message,
CheetahString::from_static_str(MessageConst::PROPERTY_MAX_OFFSET),
pull_result.pull_result.max_offset.to_string().into(),
);
message.set_broker_name(broker_name.clone());
message.set_queue_id(queue_id);
if let Some(offset_delta) = pull_result.offset_delta {
message.set_queue_offset(message.queue_offset + offset_delta);
}
}
}
}

fn process_pull_response(
mut response: RemotingCommand,
addr: &CheetahString,
) -> Result<PullResultExt> {
let pull_status = match ResponseCode::from(response.code()) {
ResponseCode::Success => PullStatus::Found,
ResponseCode::PullNotFound => PullStatus::NoNewMsg,
ResponseCode::PullRetryImmediately => PullStatus::NoMatchedMsg,
ResponseCode::PullOffsetMoved => PullStatus::OffsetIllegal,
_ => {
return Err(BrokerError::MQBrokerError(
response.code(),
response.remark().map_or("".to_string(), |s| s.to_string()),
addr.to_string(),
))
}
};
let response_header = response
.decode_command_custom_header::<PullMessageResponseHeader>()
.map_err(BrokerRemotingError)?;
let pull_result = PullResultExt {
pull_result: PullResult {
pull_status,
next_begin_offset: response_header.next_begin_offset as u64,
min_offset: response_header.min_offset as u64,
max_offset: response_header.max_offset as u64,
msg_found_list: Some(vec![]),
},
suggest_which_broker_id: response_header.suggest_which_broker_id,
message_binary: response.take_body(),
offset_delta: response_header.offset_delta,
};
Ok(pull_result)
}

fn dns_lookup_address_by_domain(domain: &str) -> Vec<CheetahString> {
let mut address_list = Vec::new();
// Ensure logging is initialized
Expand Down
2 changes: 2 additions & 0 deletions rocketmq-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,6 @@ pub mod producer;
mod trace;
pub mod utils;

pub use crate::consumer::consumer_impl::pull_request_ext::PullResultExt;

pub type Result<T> = std::result::Result<T, MQClientError>;

0 comments on commit 6605048

Please sign in to comment.