Skip to content

RocketMQ

format edited this page Nov 30, 2018 · 18 revisions

Spring Cloud Alibaba Rocket Binder

RocketMQ 介绍

RocketMQ 是一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时的、高可靠的消息发布与订阅服务。同时,广泛应用于多个领域,包括异步通信解耦、企业解决方案、金融支付、电信、电子商务、快递物流、广告营销、社交、即时通信、移动应用、手游、视频、物联网、车联网等。

具有以下特点:

  • 能够保证严格的消息顺序

  • 提供丰富的消息拉取模式

  • 高效的订阅者水平扩展能力

  • 实时的消息订阅机制

  • 亿级消息堆积能力

RocketMQ 基本使用

  • 下载 RocketMQ

解压后的目录结构如下:

apache-rocketmq
├── LICENSE
├── NOTICE
├── README.md
├── benchmark
├── bin
├── conf
└── lib
  • 启动 NameServer

nohup sh bin/mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log
  • 启动 Broker

nohup sh bin/mqbroker -n localhost:9876 &
tail -f ~/logs/rocketmqlogs/broker.log
  • 发送、接收消息

发送消息:

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

发送成功后显示:SendResult [sendStatus=SEND_OK, msgId= …​

接收消息:

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

接收成功后显示:ConsumeMessageThread_%d Receive New Messages: [MessageExt…​

  • 关闭 Server

sh bin/mqshutdown broker
sh bin/mqshutdown namesrv

Spring Cloud Stream 介绍

Spring Cloud Stream 是一个用于构建基于消息的微服务应用框架。它基于 SpringBoot 来创建具有生产级别的单机 Spring 应用,并且使用 Spring Integration 与 Broker 进行连接。

Spring Cloud Stream 提供了消息中间件配置的统一抽象,推出了 publish-subscribe、consumer groups、partition 这些统一的概念。

Spring Cloud Stream 内部有两个概念:Binder 和 Binding。

  • Binder: 跟外部消息中间件集成的组件,用来创建 Binding,各消息中间件都有自己的 Binder 实现。

比如 Kafka 的实现 KafkaMessageChannelBinderRabbitMQ 的实现 RabbitMessageChannelBinder 以及 RocketMQ 的实现 RocketMQMessageChannelBinder

  • Binding: 包括 Input Binding 和 Output Binding。

Binding 在消息中间件与应用程序提供的 Provider 和 Consumer 之间提供了一个桥梁,实现了开发者只需使用应用程序的 Provider 或 Consumer 生产或消费数据即可,屏蔽了开发者与底层消息中间件的接触。

SCSt overview
Figure 1. Spring Cloud Stream

使用 Spring Cloud Stream 完成一段简单的消息发送和消息接收代码:

MessageChannel messageChannel = new DirectChannel();

// 消息订阅
((SubscribableChannel) messageChannel).subscribe(new MessageHandler() {
    @Override
    public void handleMessage(Message<?> message) throws MessagingException {
        System.out.println("receive msg: " + message.getPayload());
    }
});

// 消息发送
messageChannel.send(MessageBuilder.withPayload("simple msg").build());

这段代码所有的消息类都是 spring-messaging 模块里提供的。屏蔽具体消息中间件的底层实现,如果想用更换消息中间件,在配置文件里配置相关消息中间件信息以及修改 binder 依赖即可。

Spring Cloud Stream 底层也是基于这段代码去做了各种抽象。

Spring Cloud Alibaba RocketMQ Binder 实现原理

1543560843558 24525bf4 1d0e 4e10 be5f bdde7127f6e6
Figure 2. RocketMQ Binder处理流程

RocketMQ Binder 的核心主要就是这3个类:RocketMQMessageChannelBinderRocketMQInboundChannelAdapterRocketMQMessageHandler

RocketMQMessageChannelBinder 是个标准的 Binder 实现,其内部构建 RocketMQInboundChannelAdapterRocketMQMessageHandler

RocketMQMessageHandler 用于 RocketMQ Producer 的启动以及消息的发送,其内部会根据 spring-messaging 模块内 org.springframework.messaging.Message 消息类,去创建 RocketMQ 的消息类 org.apache.rocketmq.common.message.Message

在构造 org.apache.rocketmq.common.message.Message 的过程中会根据 org.springframework.messaging.Message 的 Header 构造成 RocketMQMessageHeaderAccessor。然后再根据 RocketMQMessageHeaderAccessor 中的一些属性,比如 tags、keys、flag等属性设置到 RocketMQ 的消息类 org.apache.rocketmq.common.message.Message 中。

RocketMQInboundChannelAdapter 用于 RocketMQ Consumer 的启动以及消息的接收。其内部还支持 spring-retry 的使用。

在消费消息的时候可以从 Header 中获取 Acknowledgement 并进行一些设置。

比如使用 MessageListenerConcurrently 进行异步消费的时候,可以设置延迟消费:

@StreamListener("input")
public void receive(Message message) {
    RocketMQMessageHeaderAccessor headerAccessor = new RocketMQMessageHeaderAccessor(message);
    Acknowledgement acknowledgement = headerAccessor.getAcknowledgement(message);
    acknowledgement.setConsumeConcurrentlyStatus(ConsumeConcurrentlyStatus.RECONSUME_LATER);
    acknowledgement.setConsumeConcurrentlyDelayLevel(1);
}

比如使用 MessageListenerOrderly 进行顺序消费的时候,可以设置延迟消费:

@StreamListener("input")
public void receive(Message message) {
    RocketMQMessageHeaderAccessor headerAccessor = new RocketMQMessageHeaderAccessor(message);
    Acknowledgement acknowledgement = headerAccessor.getAcknowledgement(message);
    acknowledgement.setConsumeOrderlyStatus(ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT);
    acknowledgement.setConsumeOrderlySuspendCurrentQueueTimeMill(5000);
}

Provider端支持的配置:

配置项 含义 默认值

spring.cloud.stream.rocketmq.bindings.your-output-binding.producer.enabled

是否启用producer

true

spring.cloud.stream.rocketmq.bindings.your-output-binding.producer.max-message-size

消息发送的最大字节数

0(大于0才会生效,RocketMQ 默认值为4M = 1024 * 1024 * 4)

Consumer端支持的配置:

配置项 含义 默认值

spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.enabled

是否启用consumer

true

spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.tags

Consumer订阅只有包括这些tags的topic消息。多个标签之间使用 "||" 分割

spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.sql

Consumer订阅满足sql要求的topic消息

spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.broadcasting

Consumer是否是广播模式

false

spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.orderly

顺序消费 or 异步消费

false

Endpoint支持

在使用Endpoint特性之前需要在 Maven 中添加 spring-boot-starter-actuator 依赖,并在配置中允许 Endpoints 的访问。

  • Spring Boot 1.x 中添加配置 management.security.enabled=false。暴露的 endpoint 路径为 /rocketmq_binder

  • Spring Boot 2.x 中添加配置 management.endpoints.web.exposure.include=*。暴露的 endpoint 路径为 /actuator/rocketmq-binder

Endpoint 会统计消息最后一次发送的数据,消息发送成功或失败的次数,消息消费成功或失败的次数等数据。

{
    "runtime": {
        "lastSend.timestamp": 1542786623915
    },
    "metrics": {
        "scs-rocketmq.consumer.test-topic.totalConsumed": {
            "count": 11
        },
        "scs-rocketmq.consumer.test-topic.totalConsumedFailures": {
            "count": 0
        },
        "scs-rocketmq.producer.test-topic.totalSentFailures": {
            "count": 0
        },
        "scs-rocketmq.consumer.test-topic.consumedPerSecond": {
            "count": 11,
            "fifteenMinuteRate": 0.012163847780107841,
            "fiveMinuteRate": 0.03614605351360527,
            "meanRate": 0.3493213353657594,
            "oneMinuteRate": 0.17099243039490175
        },
        "scs-rocketmq.producer.test-topic.totalSent": {
            "count": 5
        },
        "scs-rocketmq.producer.test-topic.sentPerSecond": {
            "count": 5,
            "fifteenMinuteRate": 0.005540151995103271,
            "fiveMinuteRate": 0.01652854617838251,
            "meanRate": 0.10697493212602836,
            "oneMinuteRate": 0.07995558537067671
        },
        "scs-rocketmq.producer.test-topic.sentFailuresPerSecond": {
            "count": 0,
            "fifteenMinuteRate": 0.0,
            "fiveMinuteRate": 0.0,
            "meanRate": 0.0,
            "oneMinuteRate": 0.0
        },
        "scs-rocketmq.consumer.test-topic.consumedFailuresPerSecond": {
            "count": 0,
            "fifteenMinuteRate": 0.0,
            "fiveMinuteRate": 0.0,
            "meanRate": 0.0,
            "oneMinuteRate": 0.0
        }
    }
}

注意:要想查看统计数据需要在pom里加上 metrics-core依赖。如若不加,endpoint 将会显示 warning 信息而不会显示统计信息:

{
    "warning": "please add metrics-core dependency, we use it for metrics"
}
Clone this wiki locally