-
Notifications
You must be signed in to change notification settings - Fork 8.4k
RocketMQ
RocketMQ 是一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时的、高可靠的消息发布与订阅服务。同时,广泛应用于多个领域,包括异步通信解耦、企业解决方案、金融支付、电信、电子商务、快递物流、广告营销、社交、即时通信、移动应用、手游、视频、物联网、车联网等。
具有以下特点:
-
能够保证严格的消息顺序
-
提供丰富的消息拉取模式
-
高效的订阅者水平扩展能力
-
实时的消息订阅机制
-
亿级消息堆积能力
-
下载 RocketMQ
下载 RocketMQ最新的二进制文件,并解压
解压后的目录结构如下:
apache-rocketmq
├── LICENSE
├── NOTICE
├── README.md
├── benchmark
├── bin
├── conf
└── lib
-
启动 NameServer
nohup sh bin/mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log
-
启动 Broker
nohup sh bin/mqbroker -n localhost:9876 &
tail -f ~/logs/rocketmqlogs/broker.log
-
发送、接收消息
发送消息:
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
发送成功后显示:SendResult [sendStatus=SEND_OK, msgId= …
接收消息:
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
接收成功后显示:ConsumeMessageThread_%d Receive New Messages: [MessageExt…
-
关闭 Server
sh bin/mqshutdown broker
sh bin/mqshutdown namesrv
Spring Cloud Stream 是一个用于构建基于消息的微服务应用框架。它基于 SpringBoot 来创建具有生产级别的单机 Spring 应用,并且使用 Spring Integration
与 Broker 进行连接。
Spring Cloud Stream 提供了消息中间件配置的统一抽象,推出了 publish-subscribe、consumer groups、partition 这些统一的概念。
Spring Cloud Stream 内部有两个概念:Binder 和 Binding。
-
Binder: 跟外部消息中间件集成的组件,用来创建 Binding,各消息中间件都有自己的 Binder 实现。
比如 Kafka
的实现 KafkaMessageChannelBinder
,RabbitMQ
的实现 RabbitMessageChannelBinder
以及 RocketMQ
的实现 RocketMQMessageChannelBinder
。
-
Binding: 包括 Input Binding 和 Output Binding。
Binding 在消息中间件与应用程序提供的 Provider 和 Consumer 之间提供了一个桥梁,实现了开发者只需使用应用程序的 Provider 或 Consumer 生产或消费数据即可,屏蔽了开发者与底层消息中间件的接触。
使用 Spring Cloud Stream 完成一段简单的消息发送和消息接收代码:
MessageChannel messageChannel = new DirectChannel();
// 消息订阅
((SubscribableChannel) messageChannel).subscribe(new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println("receive msg: " + message.getPayload());
}
});
// 消息发送
messageChannel.send(MessageBuilder.withPayload("simple msg").build());
这段代码所有的消息类都是 spring-messaging
模块里提供的。屏蔽具体消息中间件的底层实现,如果想用更换消息中间件,在配置文件里配置相关消息中间件信息以及修改 binder 依赖即可。
Spring Cloud Stream 底层也是基于这段代码去做了各种抽象。
RocketMQ Binder 的核心主要就是这3个类:RocketMQMessageChannelBinder
,RocketMQInboundChannelAdapter
和 RocketMQMessageHandler
。
RocketMQMessageChannelBinder
是个标准的 Binder 实现,其内部构建 RocketMQInboundChannelAdapter
和 RocketMQMessageHandler
。
RocketMQMessageHandler
用于 RocketMQ Producer
的启动以及消息的发送,其内部会根据 spring-messaging
模块内 org.springframework.messaging.Message
消息类,去创建 RocketMQ 的消息类 org.apache.rocketmq.common.message.Message
。
在构造 org.apache.rocketmq.common.message.Message
的过程中会根据 org.springframework.messaging.Message
的 Header 构造成 RocketMQMessageHeaderAccessor
。然后再根据 RocketMQMessageHeaderAccessor
中的一些属性,比如 tags、keys、flag等属性设置到 RocketMQ 的消息类 org.apache.rocketmq.common.message.Message
中。
RocketMQInboundChannelAdapter
用于 RocketMQ Consumer
的启动以及消息的接收。其内部还支持 spring-retry 的使用。
在消费消息的时候可以从 Header 中获取 Acknowledgement
并进行一些设置。
比如使用 MessageListenerConcurrently
进行异步消费的时候,可以设置延迟消费:
@StreamListener("input")
public void receive(Message message) {
RocketMQMessageHeaderAccessor headerAccessor = new RocketMQMessageHeaderAccessor(message);
Acknowledgement acknowledgement = headerAccessor.getAcknowledgement(message);
acknowledgement.setConsumeConcurrentlyStatus(ConsumeConcurrentlyStatus.RECONSUME_LATER);
acknowledgement.setConsumeConcurrentlyDelayLevel(1);
}
比如使用 MessageListenerOrderly
进行顺序消费的时候,可以设置延迟消费:
@StreamListener("input")
public void receive(Message message) {
RocketMQMessageHeaderAccessor headerAccessor = new RocketMQMessageHeaderAccessor(message);
Acknowledgement acknowledgement = headerAccessor.getAcknowledgement(message);
acknowledgement.setConsumeOrderlyStatus(ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT);
acknowledgement.setConsumeOrderlySuspendCurrentQueueTimeMill(5000);
}
Provider端支持的配置:
配置项 | 含义 | 默认值 |
---|---|---|
|
是否启用producer |
true |
|
消息发送的最大字节数 |
0(大于0才会生效,RocketMQ 默认值为4M = 1024 * 1024 * 4) |
Consumer端支持的配置:
配置项 | 含义 | 默认值 |
---|---|---|
|
是否启用consumer |
true |
|
Consumer订阅只有包括这些tags的topic消息。多个标签之间使用 "||" 分割 |
|
|
Consumer订阅满足sql要求的topic消息 |
|
|
Consumer是否是广播模式 |
false |
|
顺序消费 or 异步消费 |
false |
在使用Endpoint特性之前需要在 Maven 中添加 spring-boot-starter-actuator
依赖,并在配置中允许 Endpoints 的访问。
-
Spring Boot 1.x 中添加配置
management.security.enabled=false
。暴露的 endpoint 路径为/rocketmq_binder
-
Spring Boot 2.x 中添加配置
management.endpoints.web.exposure.include=*
。暴露的 endpoint 路径为/actuator/rocketmq-binder
Endpoint 会统计消息最后一次发送的数据,消息发送成功或失败的次数,消息消费成功或失败的次数等数据。
{
"runtime": {
"lastSend.timestamp": 1542786623915
},
"metrics": {
"scs-rocketmq.consumer.test-topic.totalConsumed": {
"count": 11
},
"scs-rocketmq.consumer.test-topic.totalConsumedFailures": {
"count": 0
},
"scs-rocketmq.producer.test-topic.totalSentFailures": {
"count": 0
},
"scs-rocketmq.consumer.test-topic.consumedPerSecond": {
"count": 11,
"fifteenMinuteRate": 0.012163847780107841,
"fiveMinuteRate": 0.03614605351360527,
"meanRate": 0.3493213353657594,
"oneMinuteRate": 0.17099243039490175
},
"scs-rocketmq.producer.test-topic.totalSent": {
"count": 5
},
"scs-rocketmq.producer.test-topic.sentPerSecond": {
"count": 5,
"fifteenMinuteRate": 0.005540151995103271,
"fiveMinuteRate": 0.01652854617838251,
"meanRate": 0.10697493212602836,
"oneMinuteRate": 0.07995558537067671
},
"scs-rocketmq.producer.test-topic.sentFailuresPerSecond": {
"count": 0,
"fifteenMinuteRate": 0.0,
"fiveMinuteRate": 0.0,
"meanRate": 0.0,
"oneMinuteRate": 0.0
},
"scs-rocketmq.consumer.test-topic.consumedFailuresPerSecond": {
"count": 0,
"fifteenMinuteRate": 0.0,
"fiveMinuteRate": 0.0,
"meanRate": 0.0,
"oneMinuteRate": 0.0
}
}
}
注意:要想查看统计数据需要在pom里加上 metrics-core依赖。如若不加,endpoint 将会显示 warning 信息而不会显示统计信息:
{
"warning": "please add metrics-core dependency, we use it for metrics"
}
- 文档
- Documents
- Open Source components
- Commercial components
- Example
- awesome spring cloud alibaba