RabbitMQ如何保证消息的可靠投递?

Spring Boot整合RabbitMQ
github地址:
https://github.com/erlieStar/rabbitmq-examples
Spring有三种配置方式
基于XML 基于JavaConfig 基于注解当然现在已经很少使用XML来做配置了,何保只介绍一下用JavaConfig和注解的证消配置方式
RabbitMQ整合Spring Boot,我们只需要增加对应的可靠starter即可
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>基于注解
在application.yaml的配置如下
spring: rabbitmq: host: myhost port: 5672 username: guest password: guest virtual-host: / log: exchange: log.exchange info: queue: info.log.queue binding-key: info.log.key error: queue: error.log.queue binding-key: error.log.key all: queue: all.log.queue binding-key: *.log.key消费者代码如下
@Slf4j @Component public class LogReceiverListener { /** * 接收info级别的日志 */ @RabbitListener( bindings = @QueueBinding( value = @Queue(value = "${log.info.queue}", durable = "true"), exchange = @Exchange(value = "${log.exchange}", type = ExchangeTypes.TOPIC), key = "${log.info.binding-key}" ) ) public void infoLog(Message message) { String msg = new String(message.getBody()); log.info("infoLogQueue 收到的消息为: {}", msg); } /** * 接收所有的日志 */ @RabbitListener( bindings = @QueueBinding( value = @Queue(value = "${log.all.queue}", durable = "true"), exchange = @Exchange(value = "${log.exchange}", type = ExchangeTypes.TOPIC), key = "${log.all.binding-key}" ) ) public void allLog(Message message) { String msg = new String(message.getBody()); log.info("allLogQueue 收到的消息为: {}", msg); } }生产者如下
@RunWith(SpringRunner.class) @SpringBootTest public class MsgProducerTest { @Autowired private AmqpTemplate amqpTemplate; @Value("${log.exchange}") private String exchange; @Value("${log.info.binding-key}") private String routingKey; @SneakyThrows @Test public void sendMsg() { for (int i = 0; i < 5; i++) { String message = "this is info message " + i; amqpTemplate.convertAndSend(exchange, routingKey, message); } System.in.read(); } }Spring Boot针对消息ack的方式和原生api针对消息ack的方式有点不同
原生api消息ack的方式
消息的确认方式有2种
自动确认(autoAck=true)
手动确认(autoAck=false)
消费者在消费消息的时候,可以指定autoAck参数
String basicConsume(String queue,投递 boolean autoAck, Consumer callback)
autoAck=false: RabbitMQ会等待消费者显示回复确认消息后才从内存(或者磁盘)中移出消息
autoAck=true: RabbitMQ会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,何保而不管消费者是证消否真正的消费了这些消息
手动确认的方法如下,有2个参数
basicAck(long deliveryTag,可靠 boolean multiple)
deliveryTag: 用来标识信道中投递的消息。RabbitMQ 推送消息给Consumer时,投递会附带一个deliveryTag,何保以便Consumer可以在消息确认时告诉RabbitMQ到底是证消哪条消息被确认了。
RabbitMQ保证在每个信道中,可靠每条消息的投递deliveryTag从1开始递增
multiple=true: 消息id<=deliveryTag的消息,都会被确认
myltiple=false: 消息id=deliveryTag的何保消息,源码下载都会被确认
消息一直不确认会发生啥?证消
如果队列中的消息发送到消费者后,消费者不对消息进行确认,可靠那么消息会一直留在队列中,直到确认才会删除。
如果发送到A消费者的消息一直不确认,只有等到A消费者与rabbitmq的连接中断,rabbitmq才会考虑将A消费者未确认的消息重新投递给另一个消费者
Spring Boot中针对消息ack的方式
有三种方式,定义在AcknowledgeMode枚举类中
方式 解释 NONE 没有ack,等价于原生api中的autoAck=true MANUAL 用户需要手动发送ack或者nack AUTO 方法正常结束,spring boot 框架返回ack,发生异常spring boot框架返回nackspring boot针对消息默认的ack的方式为AUTO。
在实际场景中,我们一般都是手动ack。
application.yaml的配置改为如下
spring: rabbitmq: host: myhost port: 5672 username: guest password: guest virtual-host: / listener: simple: acknowledge-mode: manual # 手动ack,默认为auto相应的消费者代码改为
@Slf4j @Component public class LogListenerManual { /** * 接收info级别的日志 */ @RabbitListener( bindings = @QueueBinding( value = @Queue(value = "${log.info.queue}", durable = "true"), exchange = @Exchange(value = "${log.exchange}", type = ExchangeTypes.TOPIC), key = "${log.info.binding-key}" ) ) public void infoLog(Message message, Channel channel) throws Exception { String msg = new String(message.getBody()); log.info("infoLogQueue 收到的消息为: {}", msg); try { // 这里写各种业务逻辑 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); } } }我们上面用到的注解,作用如下
注解 作用 RabbitListener 消费消息,可以定义在类上,方法上,当定义在类上时需要和RabbitHandler配合使用 QueueBinding 定义绑定关系 Queue 定义队列 Exchange 定义交换机 RabbitHandler RabbitListener定义在类上时,云服务器需要用RabbitHandler指定处理的方法基于JavaConfig
既然用注解这么方便,为啥还需要JavaConfig的方式呢?
JavaConfig方便自定义各种属性,比如同时配置多个virtual host等
具体代码看GitHub把
RabbitMQ如何保证消息的可靠投递
一个消息往往会经历如下几个阶段

在这里插入图片描述
所以要保证消息的可靠投递,只需要保证这3个阶段的可靠投递即可
生产阶段
这个阶段的可靠投递主要靠ConfirmListener(发布者确认)和ReturnListener(失败通知)
前面已经介绍过了,一条消息在RabbitMQ中的流转过程为
producer -> rabbitmq broker cluster -> exchange -> queue -> consumer
ConfirmListener可以获取消息是否从producer发送到broker
ReturnListener可以获取从exchange路由不到queue的消息
我用Spring Boot Starter 的api来演示一下效果
application.yaml
spring: rabbitmq: host: myhost port: 5672 username: guest password: guest virtual-host: / listener: simple: acknowledge-mode: manual # 手动ack,默认为auto log: exchange: log.exchange info: queue: info.log.queue binding-key: info.log.key发布者确认回调
@Component public class ConfirmCallback implements RabbitTemplate.ConfirmCallback { @Autowired private MessageSender messageSender; @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { String msgId = correlationData.getId(); String msg = messageSender.dequeueUnAckMsg(msgId); if (ack) { System.out.println(String.format("消息 {%s} 成功发送给mq", msg)); } else { // 可以加一些重试的逻辑 System.out.println(String.format("消息 {%s} 发送mq失败", msg)); } } }失败通知回调
@Component public class ReturnCallback implements RabbitTemplate.ReturnCallback { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { String msg = new String(message.getBody()); System.out.println(String.format("消息 {%s} 不能被正确路由,routingKey为 {%s}", msg, routingKey)); } } @Configuration public class RabbitMqConfig { @Bean public ConnectionFactory connectionFactory( @Value("${spring.rabbitmq.host}") String host, @Value("${spring.rabbitmq.port}") int port, @Value("${spring.rabbitmq.username}") String username, @Value("${spring.rabbitmq.password}") String password, @Value("${spring.rabbitmq.virtual-host}") String vhost) { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host); connectionFactory.setPort(port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setVirtualHost(vhost); connectionFactory.setPublisherConfirms(true); connectionFactory.setPublisherReturns(true); return connectionFactory; } @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, ReturnCallback returnCallback, ConfirmCallback confirmCallback) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setReturnCallback(returnCallback); rabbitTemplate.setConfirmCallback(confirmCallback); // 要想使 returnCallback 生效,必须设置为true rabbitTemplate.setMandatory(true); return rabbitTemplate; } }这里我对RabbitTemplate做了一下包装,主要就是发送的时候增加消息id,并且保存消息id和消息的对应关系,因为RabbitTemplate.ConfirmCallback只能拿到消息id,并不能拿到消息内容,所以需要我们自己保存这种映射关系。在一些可靠性要求比较高的系统中,你可以将这种映射关系存到数据库中,成功发送删除映射关系,失败则一直发送
@Component public class MessageSender { @Autowired private RabbitTemplate rabbitTemplate; public final Map<String, String> unAckMsgQueue = new ConcurrentHashMap<>(); public void convertAndSend(String exchange, String routingKey, String message) { String msgId = UUID.randomUUID().toString(); CorrelationData correlationData = new CorrelationData(); correlationData.setId(msgId); rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData); unAckMsgQueue.put(msgId, message); } public String dequeueUnAckMsg(String msgId) { return unAckMsgQueue.remove(msgId); } }测试代码为
@RunWith(SpringRunner.class) @SpringBootTest public class MsgProducerTest { @Autowired private MessageSender messageSender; @Value("${log.exchange}") private String exchange; @Value("${log.info.binding-key}") private String routingKey; /** * 测试失败通知 */ @SneakyThrows @Test public void sendErrorMsg() { for (int i = 0; i < 3; i++) { String message = "this is error message " + i; messageSender.convertAndSend(exchange, "test", message); } System.in.read(); } /** * 测试发布者确认 */ @SneakyThrows @Test public void sendInfoMsg() { for (int i = 0; i < 3; i++) { String message = "this is info message " + i; messageSender.convertAndSend(exchange, routingKey, message); } System.in.read(); } }先来测试失败者通知
输出为
消息 {this is error message 0} 不能被正确路由,routingKey为 {test} 消息 {this is error message 0} 成功发送给mq 消息 {this is error message 2} 不能被正确路由,服务器租用routingKey为 {test} 消息 {this is error message 2} 成功发送给mq 消息 {this is error message 1} 不能被正确路由,routingKey为 {test} 消息 {this is error message 1} 成功发送给mq消息都成功发送到broker,但是并没有被路由到queue中
再来测试发布者确认
输出为
消息 {this is info message 0} 成功发送给mq infoLogQueue 收到的消息为: {this is info message 0} infoLogQueue 收到的消息为: {this is info message 1} 消息 {this is info message 1} 成功发送给mq infoLogQueue 收到的消息为: {this is info message 2} 消息 {this is info message 2} 成功发送给mq消息都成功发送到broker,也成功被路由到queue中
存储阶段
这个阶段的高可用还真没研究过,毕竟集群都是运维搭建的,后续有时间的话会把这快的内容补充一下
消费阶段
消费阶段的可靠投递主要靠ack来保证。
总而言之,在生产环境中,我们一般都是单条手动ack,消费失败后不会重新入队(因为很大概率还会再次失败),而是将消息重新投递到死信队列,方便以后排查问题
总结一下各种情况
ack后消息从broker中删除 nack或者reject后,分为如下2种情况(1) reque=true,则消息会被重新放入队列
(2) reque=fasle,消息会被直接丢弃,如果指定了死信队列的话,会被投递到死信队列
本文转载自微信公众号「Java识堂」,可以通过以下二维码关注。转载本文请联系Java识堂公众号。

相关文章
以建荣SD卡量产工具教程(关键步骤详解,让你快速掌握SD卡量产技巧)
摘要:在如今数字化时代,SD卡已成为存储设备的标配。然而,在一些特定的场景下,我们常常需要大批量制作相同规格的SD卡。为了提高效率和节省时间,以建荣SD卡量产工具应运而生。本文将详细介绍...2025-11-05- 摘要:电脑黑屏是一个常见的问题,而重装操作系统通常是解决这个问题的有效办法。本文将为大家详细介绍如何在电脑黑屏的情况下进行系统重装,帮助读者解决电脑黑屏的困扰。准备工作:检查硬件...2025-11-05
- 摘要:作为一款备受瞩目的智能手机,三星S8在摄影方面具备出色的表现。本文将深入探讨三星S8在摄影领域的实力,并分析其优秀的成像能力和创新的摄影技术。一:全新的镜头设计带来更清晰的...2025-11-05
- 摘要:作为一种新兴的机械键盘产品,GK87草木绿键盘以其独特的外观设计和强大的功能受到了广大玩家的喜爱。它不仅有着出色的性能,还凭借着草木绿的外观色彩给人带来了愉悦的视觉体验。本文将为您...2025-11-05
- 摘要:电脑主板硬盘的分区是一个重要的步骤,它可以帮助我们更好地管理电脑上的数据。不正确的分区操作可能会导致数据丢失或硬盘损坏,因此学习正确的分区技巧非常重要。本文将为您详细介绍以电脑主板...2025-11-05
大白菜装机教程(详解gho文件的使用方法,让电脑装机更快捷)
摘要:在电脑使用过程中,难免会遇到各种问题,例如系统崩溃、病毒感染等,而重装电脑则是解决这些问题的有效方法之一。本文将详细介绍利用gho文件进行电脑重装的步骤和注意事项,帮助读者更快捷地...2025-11-05

最新评论