用RocketMQ这么久,才知道消息可以这样玩

今天我们就来带大家如何玩转MQ的用R样玩消息。
消息中间件,才知英文Message Queue,道消简称MQ。用R样玩它没有标准定义,才知一般认为:消息中间件属于分布式系统中一个子系统,道消关注于数据的用R样玩发送和接收,利用高效可靠的才知异步消息传递机制对分布式系统中的其余各个子系统进行集成。
高效: 对于消息的道消处理处理速度快,RocketMQ可以达到单机10万+的用R样玩并发。
可靠: 一般消息中间件都会有消息持久化机制和其他的才知机制确保消息不丢失。
异步: 指发送完一个请求,道消不需要等待返回,用R样玩随时可以再发送下一个请求,才知既不需要等待。道消
消息中间件不生产消息,只是消息的搬运工。

首先Message包含的内容主要有几个方面组成:id(MQ自动生成)、Topic、tag、proerties、内容。
消息的发送分为:
普通消息顺序消息延时消息批量消息分布式消息普通消息
普通消息的企商汇发送方式主要有三种:发送同步消息、发送异步消息、单向发送。
我们可以先使用 RocketMQ 提供的原生客户端的API,在 SpringBoot、SpringCloudStream 也进行了集成,但本质上这些也是基于原生API的封装,所以我们只需要掌握原生API的时候,其他的也就无师自通了。
想要使用 RocketMQ中的API,就需要先导入对应的客户端依赖。
rocketmq-client
发送同步消息
发送同步消息是说消息发送方发出数据后,同步等待,一直等收到接收方发回响应之后才发下一个请求。这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。
流程如下所示:

package org.apache.rocketmq.example.quickstart;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
/
*** 同步发送
*/
public class SyncProducer {
public static void main(String[] args) throws Exception{
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("group_test");
// 设置NameServer的地址
producer.setNamesrvAddr("127.0.0.1:9876");
//producer.setSendLatencyFaultEnable(true);
// 启动Producer实例
producer.start();
for (int i = 0; i < 10; i++) {
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// 发送消息到一个Broker
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
//如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
}响应结果如下所示:


在上面代表的是四个queue,而maxOffset代表我们发送消息的数量,之前发送过消息,所以大家现在看到的数量是17、18...这种,当你在运行一次发送消息时,就会看到十条消息会分布在不同机器上。

发送异步消息
异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。消息发送方在发送了一条消息后,不等接收方发回响应,接着进行第二条消息发送。发送方通过回调接口的方式接收服务器响应,并对响应结果进行处理。
流程如下:

package com.muxiaonong.normal;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
/
*** 异步发送--生产者
*/
public class AsyncProducer {
public static void main(String[] args) throws Exception{
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("group_test");
// 设置NameServer的地址
producer.setNamesrvAddr("127.0.0.1:9876");
// 启动Producer实例
producer.start();
for (int i = 0; i < 10; i++) {
final int index = i;
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("TopicTest", "TagA", "OrderID888",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
// SendCallback接收异步返回结果的回调
producer.send(msg, new SendCallback() {
//发送成功
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%s%n", sendResult);
}
//发送异常
@Override
public void onException(Throwable e) {
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
}
Thread.sleep(10000);
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
}发送成功报文:

我们在dashbord下看到已经成功拿到消息了。

单向发送
这种方式不需要我们特别关心发送结果的场景,比如日志发送、单向发送特点是发送方只需要负责发送消息,不需要等待服务器回应且没有回调函数触发,发送请求不需要等待应答,只管发,这种放松方式过程耗时很短,一般在微妙级别。
流程如下:

package com.muxiaonong.normal;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
/
*** 单向发送
*/
public class OnewayProducer {
public static void main(String[] args) throws Exception{
// 实例化消息生产者Producer对象
DefaultMQProducer producer = new DefaultMQProducer("group_test");
// 设置NameServer的地址
producer.setNamesrvAddr("127.0.0.1:9876");
// 启动Producer实例
producer.start();
for (int i = 0; i < 10; i++) {
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// 发送单向消息,没有任何返回结果
producer.sendOneway(msg);
}
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
}返回报文:

这种发送方式,我们客户端不会感受到发送结果,发送完成之后,我们并不知道到底有没有发送成功,我们只能在 top status 中去查看。

发送方式
发送TPS
可靠性
结果反馈
使用场景
同步消息发送
快
不丢失
有
重要通知(邮件、短信通知、)等
异步消息发送
快
不丢失
有
用户文件上传自动解析服务,完成后通知其结果
单向发送
超快
可能丢失
无
适用于 耗时非常短,但是对于可靠性要求不高的场景,比如日志收集
消息的消费方式
普通消息的消费方式主要有三种:集群消费、广播消费。
一、集群消费模式集群消费方式下,一个分组(Group) 下的多个消费者共同消费队列消息,每一个消费者出来处理的消息不一样,一个Consumer Group 中的各个Consumer 实例分摊去消费消息,一条消息只会投递到一个Consumer Group 下的一个实例,如果一个Topic有三个队列,其中一个 Consumer Group 有三个实例,那么每个实例只会消费其中一个队列,集群消费模式是消费者默认的消费方式。

实例代码:
package com.muxiaonong.normal.consumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import java.util.List;
/
*** 集群消费模式
*/
public class BalanceConsumer {
public static void main(String[] args) throws Exception {
// 实例化消费者,指定组名: TopicTest 10条消息 group_consumer , lijin 8(2)
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_consumer");
// 指定Namesrv地址信息.
consumer.setNamesrvAddr("127.0.0.1:9876");
// 订阅Topic
consumer.subscribe("TopicTest", "*"); //tag tagA|TagB|TagC
//consumer.setConsumeFromWhere();
//集群模式消费
consumer.setMessageModel(MessageModel.CLUSTERING);
//取消
consumer.unsubscribe("TopicTest");
//再次订阅Topic即可
consumer.subscribe("TopicTest", "*"); //tag tagA|TagB|TagC
// 注册回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List
ConsumeConcurrentlyContext context) {
try {
for(MessageExt msg : msgs) {
String topic = msg.getTopic();
String msgBody = new String(msg.getBody(), "utf-8");
String tags = msg.getTags();
Thread.sleep(1000);
System.out.println("收到消息:" + " topic :" + topic + " ,tags : " + tags + " ,msg : " + msgBody);
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动消息者
consumer.start();
//注销Consumer
//consumer.shutdown();
System.out.printf("Consumer Started.%n");
}
}我们启动两个实例对象,分别为BalanceConsumer2和BalanceConsumer,我们再去生产者生产十条消息后,我们再去看consumer,分别均摊了这十条消息。

广播消费模式中消息将对一个Consumer Group下的各个Consumer实例都投递一遍。即使这些 Consumer属于同一个Consumer Group,消息也会被Consumer Group 中的每个Consumer都消费一次。因为一个消费组下的每个消费者实例都获取到了topic下面的每个Message Queue去拉取消费。所以消息会投递到每个消费者实例。每一个消费者下面的消费实例,都会去拿到我们Topic下的每一条消息,但是这种消费进度的保存,不会放在broker里面,而是持久化到我们的本地实例。
流程图如下:

具体代码:
package com.muxiaonong.normal.consumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import java.util.List;
/
*** 广播消费模式
*/
public class BroadcastConsumer {
public static void main(String[] args) throws Exception {
// 实例化消费者,指定组名: TopicTest 10条消息 group_consumer , lijin 8(2)
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_consumer");
// 指定Namesrv地址信息.
consumer.setNamesrvAddr("127.0.0.1:9876");
// 订阅Topic
consumer.subscribe("TopicTest", "*"); //tag tagA|TagB|TagC
//consumer.setConsumeFromWhere();
//广播模式消费
consumer.setMessageModel(MessageModel.BROADCASTING);
//取消
consumer.unsubscribe("TopicTest");
//再次订阅Topic即可
consumer.subscribe("TopicTest", "*"); //tag tagA|TagB|TagC
// 注册回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List
try {
for(MessageExt msg : msgs) {
String topic = msg.getTopic();
String msgBody = new String(msg.getBody(), "utf-8");
String tags = msg.getTags();
Thread.sleep(1000);
System.out.println("收到消息:" + " topic :" + topic + " ,tags : " + tags + " ,msg : " + msgBody);
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动消息者
consumer.start();
//注销Consumer
//consumer.shutdown();
System.out.printf("Consumer Started.%n");
}
}我们先启动 BroadcastConsumer和BroadcastConsumer2,生产十条消息以后,我们会看到不管是哪个消费者,都会接收到十条消息,这个就是广播消费模式。

消息消费的权衡。
负载均衡模式: 消费端集群化部署,每条消息只需要被处理一次,由于消费进度在服务端维护,可靠性更高。
集群消费模式下,不能保证每一次失败重投的消息路由到同一台机器上,因此处理消息时不应该做任何确定性假设。每一条消息都只会被分发到一台机器上处理,如果需要被集群下的每一台机器都处理,只能使用广播模式。
广播模式: 每条消息都需要被相同逻辑的多台机器处理,消费进度在客户端维护,出现重复的概率稍大于集群模式。
广播模式下,消息队列 RocketMQ 保证每条消息至少被每台客户端消费一次,但是并不会对消费失败的消息进行失败重投,因此需要关注消费失败的情况,客户端每一次重启都会从最新消息消费。客户端在被停止期间发送至服务端的消息会被自动跳过,这一点是需要注意的地方
每条消息都会被大量的客户端重复处理,因此推荐尽可能使用集群模式。目前仅 Java 客户端支持广播模式,不支持顺序消息且服务端不维护消费进度,所以消息队列 RocketMQ 控制台不支持消息堆积查询、消息堆积报警和订阅关系查询功能。
顺序消息
顺序消息指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ 可以严格的保证消息有序,可以分为 分区有序 或者 全局有序。
生产消息时在默认的情况下消息发送会采取 Round Robin 轮询方式把消息发送到不同的 queue ( 分区队列);而消费消息的时候从多个 queue 上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个 queue 中,消费的时候只从这个 queue 上依次拉取,则就保证了顺序。当发送和消费参与的 queue 只有一个,则是全局有序;如果多个 queue 参与,则为分区有序,即相对每个 queue ,消息都是有序的。
全局有序
全局有序主要控制在于创建Topic指定只有一个队列,同步确保生产者与消费者都只有一个实例进行即可
分区有序
在电商业务场景中,订单的流程是:创建、付款、推送、完成。 在加入 RocketMQ 后,一个订单会分别产生对于这个订单的创建、付款、推送、完成等消息,如果我们把所有消息全部送入到 RocketMQ 中的一个主题中,如何实现针对一个订单的消息顺序性呢!如下图:

要完成分区有序性,在生产者环节使用自定义的消息队列选择策略,确保订单号尾数相同的消息会被先后发送到同一个队列中(案例中主题有3个队列,生产环境中可设定成10个满足全部尾数的需求),然后再消费端开启负载均衡模式,最终确保一个消费者拿到的消息对于一个订单来说是有序的。
/** @Author 牧小农
* @Description // 订单消息生产
* @Date 16:47 2022/8/20
* @Param
* @return
**/
public class OrderProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("OrderProducer");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
// 订单列表
List
for (int i = 0; i < orderList.size(); i++) {
String body = orderList.get(i).toString();
Message msg = new Message("PartOrder", null, "KEY" + i, body.getBytes());
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List
Long id = (Long) arg; //根据订单id选择发送queue
long index = id % mqs.size();
return mqs.get((int) index);
}
}, orderList.get(i).getOrderId());//订单id
System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
sendResult.getSendStatus(),
sendResult.getMessageQueue().getQueueId(),
body));
}
producer.shutdown();
}
/
*** 订单
*/
private static class Order {
private long orderId;
private String desc;
.....
}
/
*** 生成模拟订单数据 3个订单 每个订单4个状态
* 每个订单 创建->付款->推送->完成
*/
private List
List
Order orderDemo = new Order();
orderDemo.setOrderId(001);
orderDemo.setDesc("创建");
orderList.add(orderDemo);
//...............
return orderList;
}
}订单消费者:
/** @Author 牧小农
* @Description // 订单消息消费
* @Date 16:46 2022/8/20
* @Param
* @return
**/
public class OrderConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("OrderConsumer2");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.subscribe("PartOrder", "*");
consumer.registerMessageListener(new MessageListenerOrderly() {
Random random = new Random();
@Override
public ConsumeOrderlyStatus consumeMessage(List
context.setAutoCommit(true);
for (MessageExt msg : msgs) {
// 可以看到每个queue有唯一的consume线程来消费, 订单对每个queue(分区)有序
System.out.println("consumeThread=" + Thread.currentThread().getName()
+ ",queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody()));
}
try {
//模拟业务逻辑处理中...
TimeUnit.MILLISECONDS.sleep(random.nextInt(300));
} catch (Exception e) {
e.printStackTrace();
//一会再处理这批消息,而不是放到重试队列里
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started.");
}
}消息生产者:

消息消费者:

我们可以看到消息按照顺序进行了消费。使用顺序消息:首先要保证消息是有序进入MQ的,消息放入MQ之前,对id等关键字进行取模,放入指定 messageQueue ,同时 consumer 消费消息失败时,不能返回 reconsume——later ,这样会导致乱序,所以应该返回 suspend_current_queue_a_moment ,意思是先等一会,一会儿再处理这批消息,而不是放到重试队列里。
延时消息
Producer 将消息发送到消息队列 RocketMQ 服务端,但并不期望这条消息立马投递(被消费者消费),而是延迟一定时间后才投递到 Consumer 进行消费,该消息即延时消息。

消息生产和消费有时间窗口要求的场景下,比如在电商交易中超时未支付关闭订单的场景,在订单创建时向 RocketMQ 发送一条延时消息。这条消息将会在30分钟以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。如支付未完成,则取消订单、释放库存。如已完成支付则忽略。
Apache RocketMQ 目前只支持固定精度(MQ自己规定的时间段)的定时消息,因为如果要支持任意的时间精度,在 Broker 层面,必须要做消息排序,如果再涉及到持久化,消息排序不可避免的产生巨大性能开销。(RocketMQ 的商业版本 Aliware MQ 提供了任意时刻的定时消息功能,Apache的 RocketMQ 并没有,阿里并没有开源)
Apache RocketMQ 发送延时消息是设置在每一个消息体上的,在创建消息时设定一个延时时间长度,消息将从当前发送时间点开始延迟固定时间之后才开始投递。
RocketMQ 延时消息的延迟时长不支持随意时长的延迟,是通过特定的延迟等级来指定的。默认支持18个等级的延迟消息,延时等级定义在 RocketMQ 服务端的 MessageStoreConfig 类中。
具体如下所示:
Level
延迟时间
Level
延迟时间
1
1S
10
6m
2
5S
11
7m
3
10S
12
8m
4
30S
13
9m
5
1m
14
10m
6
2m
15
20m
7
3m
16
30m
8
4m
17
1h
9
5m
18
2h
延时消息生产者:
/** @Author 牧小农
* @Description // 延时消息-生产者
* @Date 10:00 2022/8/21
* @Param
* @return
**/
public class ScheduledProducer {
public static void main(String[] args) throws Exception {
// 实例化一个生产者来产生延时消息
DefaultMQProducer producer = new DefaultMQProducer("ScheduledProducer");
// 设置NameServer的地址
producer.setNamesrvAddr("127.0.0.1:9876");
// 启动Producer实例
producer.start();
int totalMessagesToSend = 10;
for (int i = 0; i < totalMessagesToSend; i++) {
Message message = new Message("ScheduledTopic", ("Hello scheduled message " + i).getBytes());
// 设置延时等级4,这个消息将在10s之后投递给消费者(详看delayTimeLevel)
// delayTimeLevel:(1~18个等级)"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
message.setDelayTimeLevel(3);
// 发送消息
producer.send(message);
}
// 关闭生产者
producer.shutdown();
}
}延时消息消费者:
/** @Author 牧小农
* @Description // 延时消息-消费者
* @Date 10:00 2022/8/21
* @Param
* @return
**/
public class ScheduledConsumer {
public static void main(String[] args) throws Exception {
// 实例化消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ScheduledConsumer");
// 指定Namesrv地址信息.
consumer.setNamesrvAddr("127.0.0.1:9876");
// 订阅Topics
consumer.subscribe("ScheduledTopic", "*");
// 注册消息监听者
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List
for (MessageExt message : messages) {
System.out.println("Receive message[msgId=" + message.getMsgId() + "] "
+ (message.getStoreTimestamp()-message.getBornTimestamp()) + "ms later");
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者
consumer.start();
}
}当我们生产消息后,查看消费者信息,延时10秒后,消息才发送完成后,之后进行了消息的消费。

批量消息
批量消息发送: 能显著提高传递小消息的性能。限制是这些批量消息有相同的 topic,相同的 waitStoreMsgOK,而且不能是延时消息。此外,这一批消息的总大小不应超过 4MB。批量消息是一个 Collection集合,所以送入消息只要是集合就行。
批量接收消息: 能提高传递小消息的性能,同时与顺序消息配合的情况下,还能根据业务主键对顺序消息进行去重(是否可去重,需要业务来决定),减少消费者对消息的处理。
如果我们需要发送10万元素的数组,怎么快速发送完?这里可以使用批量发送,同时每一批控制在1M左右确保不超过消息大小限制。批量切分发送.
批量消息生产者:
/** @Author 牧小农
* @Description // 批量消息-生产者 list不要超过4m
* @Date 10:38 2022/8/21
* @Param
* @return
**/
public class BatchProducer {
public static void main(String[] args) throws Exception {
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("BatchProducer");
// 设置NameServer的地址
producer.setNamesrvAddr("127.0.0.1:9876");
// 启动Producer实例
producer.start();
String topic = "BatchTest";
List
messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 1".getBytes()));
messages.add(new Message(topic, "Tag", "OrderID002", "Hello world 2".getBytes()));
messages.add(new Message(topic, "Tag", "OrderID003", "Hello world 3".getBytes()));
messages.add(new Message(topic, "Tag", "OrderID004", "Hello world 4".getBytes()));
messages.add(new Message(topic, "Tag", "OrderID005", "Hello world 5".getBytes()));
messages.add(new Message(topic, "Tag", "OrderID006", "Hello world 6".getBytes()));
try {
producer.send(messages);
} catch (Exception e) {
producer.shutdown();
e.printStackTrace();
}
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
}批量消息消费者:
/** @Author 牧小农
* @Description // 批量消息-消费者
* @Date 10:38 2022/8/21
* @Param
* @return
**/
public class BatchComuser {
public static void main(String[] args) throws Exception {
// 实例化消息生产者,指定组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("BatchComsuer");
// 指定Namesrv地址信息.
consumer.setNamesrvAddr("127.0.0.1:9876");
// 订阅Topic
consumer.subscribe("BatchTest", "*");
//负载均衡模式消费
consumer.setMessageModel(MessageModel.CLUSTERING);
// 注册回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n",
Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动消息者
consumer.start();
System.out.printf("Consumer Started.%n");
}
}这样我们就实现了批量消息的发送,如果我们消息超过了,4M的时候,这个时候可以考虑消息的分割,具体代码如下:
public class ListSplitter implements Iterator> {
private int sizeLimit = 1000 * 1000;//1M
private final List
private int currIndex;
public ListSplitter(List
@Override
public boolean hasNext() { return currIndex < messages.size(); }
@Override
public List
int nextIndex = currIndex;
int totalSize = 0;
for (; nextIndex < messages.size(); nextIndex++) {
Message message = messages.get(nextIndex);
int tmpSize = message.getTopic().length() + message.getBody().length;
Map
for (Map.Entry
tmpSize += entry.getKey().length() + entry.getValue().length();
}
tmpSize = tmpSize + 20; // 增加日志的开销20字节
if (tmpSize > sizeLimit) {
if (nextIndex - currIndex == 0) {//单个消息超过了最大的限制(1M),否则会阻塞进程
nextIndex++; //假如下一个子列表没有元素,则添加这个子列表然后退出循环,否则退出循环
}
break;
}
if (tmpSize + totalSize > sizeLimit) { break; }
else { totalSize += tmpSize; }
}
List
currIndex = nextIndex;
return subList;
}
}消息的过滤
效率的过滤主要分为两种:Tag过滤和SQL语法过滤。
在实际的开发应用中,对于一类消息尽可能使用一个Topic进行存储,但在消费时需要选择想要的消息,这时可以使用 RocketMQ 的消息过滤功能,具体实现是利用消息的Tag和Key。
Key :一般用于消息在业务层面的唯一标识。对发送的消息设置好 Key,根据这个 Key 来查找消息。比如消息异常,消息丢失,进行查找会很方便。RocketMQ 会创建专门的索引文件,用来存储 Key与消息的映射,由于底层实现是 Hash 索引,应尽量使 Key唯一,避免潜在的哈希冲突。
Tag: 可以理解为是二级分类。以电商交易平台为例,订单消息和支付消息属于不同业务类型的消息,分别创建 OrderTopic 和PayTopic ,其中订单消息根据不同的商品品类以不同的 Tag 再进行细分,如手机类、家电类、男装类、女装类、化妆品类,最后它们都被各个不同的系统所接收。通过合理的使用 Topic 和 Tag,可以让业务结构清晰,更可以提高效率。
Key和Tag的主要差别是使用场景不同,Key主要用于通过命令行命令查询消息,而Tag用于在消息端的代码中,用来进行服务端消息过滤。
Tag过滤
使用Tag过滤的方式是在消息生产时传入感兴趣的Tag标签,然后在消费端就可以根据Tag来选择您想要的消息。具体的操作是在创建Message的时候添加,一个Message只能有一个Tag。
使用案例:
/** @Author 牧小农
* @Description // tag过滤-生产者
* @Date 10:51 2022/8/21
* @Param
* @return
**/
public class TagFilterProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("TagFilterProducer");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
// 设定三种标签
String[] tags = new String[] {"TagA", "TagB", "TagC"};
for (int i = 0; i < 3; i++) {
Message msg = new Message("TagFilterTest",
tags[i % tags.length],
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
}
}消费者:
/** @Author 牧小农
* @Description // tag过滤-消费者
* @Date 10:51 2022/8/21
* @Param
* @return
**/
public class TagFilterConsumer {
public static void main(String[] args) throws InterruptedException, MQClientException, IOException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TagFilterComsumer");
//指定Namesrv地址信息.
consumer.setNamesrvAddr("127.0.0.1:9876");
//只有TagA 或者TagB 的消息
consumer.subscribe("TagFilterTest", "TagA || TagB");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List
ConsumeConcurrentlyContext context) {
try {
for(MessageExt msg : msgs) {
String topic = msg.getTopic();
String msgBody = new String(msg.getBody(), "utf-8");
String msgPro = msg.getProperty("a");
String tags = msg.getTags();
System.out.println("收到消息:" + " topic :" + topic + " ,tags : " + tags + " ,a : "
+ msgPro +" ,msg : " + msgBody);
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}我们生成了 TagA\b\c 三条消息,但是消费者只想接收 TagA或B, 那么我们可以在消费者端进行消息过滤。

Tag过滤的形式非常简单。
|| 代表或 * 代表所有。
因此Tag过滤对于复杂的场景可能不能进行覆盖。在这种情况下,可以使用SQL表达式筛选消息。
SQL语法
SQL基本语法:
数值比较:>,>=,<,<=,BETWEEN,=。字符比较:=,<>,IN。非空比较:IS NULL 或者 IS NOT NULL。逻辑符号:AND,OR,NOT。常量支持类型为:数值(123,3.1415)、字符(abc)单引号包裹起来、NULL、布尔值(TRUE 或 FALSE)。Sql过滤需要 Broker 开启这项功能,需要修改Broker.conf配置文件。加入enablePropertyFilter=true 然后重启Broker服务。
消息生产者,发送消息时加入消息属性,通过 putUserProperty 来设置消息的属性,生产者发送10条消息,
生产者:
/** @Author 牧小农
* @Description // sql过滤 -消息生产者(加入消息属性)
* @Date 11:04 2022/8/21
* @Param
* @return
**/
public class SqlFilterProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("SqlFilterProducer");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC"};
for (int i = 0; i < 10; i++) {
Message msg = new Message("SqlFilterTest",
tags[i % tags.length],
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// 设置SQL过滤的属性
msg.putUserProperty("a", String.valueOf(i));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
}
}消费者:
/** @Author 牧小农
* @Description // sql过滤-消费者
* @Date 11:04 2022/8/21
* @Param
* @return
**/
public class SqlFilterConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("SqlFilterConsumer");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("SqlFilterTest",
//bySql:通过 SQL过滤
// 1. TAGS不为空且TAGS 在(TagA, TagB)
// 2. 同时 a 不等于空并且a在0-3之间
MessageSelector.bySql("(TAGS is not null and TAGS in (TagA, TagB))" +
"and (a is not null and a between 0 and 3)"));
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List
ConsumeConcurrentlyContext context) {
try {
for(MessageExt msg : msgs) {
String topic = msg.getTopic();
String msgBody = new String(msg.getBody(), "utf-8");
String msgPro = msg.getProperty("a");
String tags = msg.getTags();
System.out.println("收到消息:" + " topic :" + topic + " ,tags : " + tags + " ,a : " + msgPro +" ,msg : " + msgBody);
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}消费结果:按照Tag和SQL过滤消费3条消息。
第一个消息是TagA ,消息的属性(a)是3。
第一个消息是TagB ,消息的属性(a)是1。
第一个消息是TagA ,消息的属性(a)是0。

相关文章
手机信号满格却无法连接网络的解决方法(遇到手机信号满格但无法上网的情况?试试这些方法!)
摘要:在现代社会中,手机已经成为人们生活中必不可少的工具之一。然而,有时候我们可能会遇到手机信号满格却无法连接网络的问题,这不仅会给我们的生活和工作带来困扰,还会影响我们的沟通和信息获取...2025-11-05- 摘要:显卡作为电脑重要的组件之一,在电脑使用中扮演着不可或缺的角色。然而,由于错误的使用或其他原因,显卡容易受损,进而影响电脑的正常运行。本文将深入探讨显卡损坏的原因,并提供一些预防方法...2025-11-05
- 摘要:在现代社会中,充电器已经成为人们生活中必不可少的电子设备。作为一家知名的电子周边品牌,Baseus推出了一系列高性能的充电器产品。本文将主要介绍Baseus充电器的优势与特点,以及...2025-11-05
乐享大屏幕,打造震撼视听体验(大屏幕视听盛宴,让你畅享电影、游戏和娱乐)
摘要:在现代社会,人们对娱乐体验的追求已经不再局限于传统的电视、电影院等设备。大屏幕成为了人们追求震撼视听体验的新标配。以乐享大屏幕为主题的文章将为大家详细介绍乐享大屏幕的各个方面,并为...2025-11-05- 摘要:玉雕作为我国传统工艺的瑰宝之一,如今在电脑技术的辅助下,玉雕电脑雕刻成为了一门精细而高效的艺术形式。本文将以连做教程为主题,介绍玉雕连做到电脑雕刻的关键步骤与技巧,帮助读者掌握这一...2025-11-05
手机共享Wi-Fi给电脑,轻松畅游互联网(方便快捷的无线网络共享,提升工作和娱乐体验)
摘要:在现代社会中,我们对于互联网的需求越来越大,尤其是在工作和娱乐方面。而我们通常会在手机上连接Wi-Fi来满足这些需求。但是,有时我们需要在电脑上进行一些更复杂的任务,这时就可以考虑...2025-11-05

最新评论