RabbitMQ可靠性传输
工作队列(Work Queues又名:任务队列)

工作队列(又名:任务队列)
对于 任务过重或任务较多情况使用工作队列可以提高任务处理的速度。
轮循分发(Round-robin dispatching)
任务队列的一个好处就是能很简单的并行工作。如果我们有积压的工作,我们只需要添加更多的工作者,很容易扩展。三个消费者顺序读取队列中的消息






1 |
|
1 |
|
消息应答(ACK消费者可靠性):
主要解决消息应答手动ack确保(消费者维度) - 解决”消费者挂了怎么办” 解决了‘处没处理’的问题
完成一个任务需要花费一些时间,你可以想象一个需要较长时间完成的任务在执行的中途中挂了会发生什么。我们当前的代码,一旦RabbitMQ将消息交付给客户,它将立即从内存中被移除。在这个例子中,如果你在执行过程中杀死一个工作者我们将丢失这个消息。我们也会丢失所有分发给指定的这个工作者但是还没有处理的消息。
但是我们不想丢失任何任务。如果一个工作者挂了,我们希望这个任务能交付给另一个工作者。
为了确保消息永远不会丢失,RabbitMQ提供了消息确认机制,消费者向RabbitMQ中发送一个确认表示消息已经接收、处理并且RabbitMQ可以自由的删除它了。
如果一个消费者挂了(通道关闭,连接关闭或者TCP连接丢失)没有发送确认,RabbitMQ将会理解成消息没有被完全处理并将消息发回队列。如果这个时候有其他消费者在线,它将被快速的交付给另一个消费者。通过这种方式能确保没有消息丢失,即使工作者偶尔挂掉。
这里没有消息超时,当消费者挂了RabbitMQ将会重新交付消息。如果一个消息的处理过程花费了很长很长的时间,这个是允许的。
简单版
为什么需要消息确认?
- 工作者(消费者)在处理消息过程中崩溃
- RabbitMQ立即从内存移除已交付的消息
- 导致消息永久丢失
解决方案:
RabbitMQ的消息确认(Acknowledgement)机制确保消息不会丢失。
1 | 生产者 → RabbitMQ → 消息交付给消费者 → 消费者处理消息 → 发送确认 → RabbitMQ删除消息 |
确认模式对比
| 模式 | 机制 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|---|
| 自动应答 | 消息一送达就自动确认 | 高吞吐量 | 消息可能丢失 | 可快速处理、允许丢失的场景 |
| 手动应答 | 处理完成后手动发送确认 | 数据安全,不会丢失 | 吞吐量较低 | 重要任务,不允许丢失 |
手动应答的三种方法
java
1 | // 1. 肯定确认(成功处理) |
Multiple参数详解
场景:假设channel上有未确认的消息tag=5,6,7,8,当前处理的是tag=8
java
1 | // 情况1:multiple = true |

消息自动重新入队
如果消费者因为某种原因断开连接,导致消息并未发送ACK确认,RabbitMq将了解的消息么没有完全删除,对其进行重新排队。如果有其他的消费者 ,会重新分配给下一个消费者。这样可以在确保不会丢失任何消息

我们开启了两个消费者,一个正常消费,一个缓慢消费,当消费者2消费完成后,停止消费者一 发现 消费者一的消息再消费者2中进行消费





Spring版本:

1 | /** |
1 | /** |
①basicAck()方法
- 方法功能:给Broker返回ACK确认信息,表示消息已经在消费端成功消费,这样Broker就可以把消息删除了
- 参数列表:
| 参数名称 | 含义 |
|---|---|
| long deliveryTag | Broker给每一条进入队列的消息都设定一个唯一标识 |
| boolean multiple | 取值为true:为小于、等于deliveryTag的消息批量返回ACK信息 取值为false:仅为指定的deliveryTag返回ACK信息 |
②basicNack()方法
- 方法功能:给Broker返回NACK信息,表示消息在消费端消费失败,此时Broker的后续操作取决于参数requeue的值
- 参数列表:
| 参数名称 | 含义 |
|---|---|
| long deliveryTag | Broker给每一条进入队列的消息都设定一个唯一标识 |
| boolean multiple | 取值为true:为小于、等于deliveryTag的消息批量返回ACK信息 取值为false:仅为指定的deliveryTag返回ACK信息 |
| boolean requeue | 取值为true:Broker将消息重新放回队列,接下来会重新投递给消费端 取值为false:Broker将消息标记为已消费,不会放回队列 |
③basicReject()方法
- 方法功能:根据指定的deliveryTag,对该消息表示拒绝
- 参数列表:
| 参数名称 | 含义 |
|---|---|
| long deliveryTag | Broker给每一条进入队列的消息都设定一个唯一标识 |
| boolean requeue | 取值为true:Broker将消息重新放回队列,接下来会重新投递给消费端 取值为false:Broker将消息标记为已消费,不会放回队列 |
- basicNack()和basicReject()有啥区别?
- basicNack()有批量操作
- basicReject()没有批量操作
- 要点1:把消息确认模式改为手动确认
- 要点2:调用Channel对象的方法返回信息
- ACK:Acknowledgement,表示消息处理成功
- NACK:Negative Acknowledgement,表示消息处理失败
- Reject:拒绝,同样表示消息处理失败
- 要点3:后续操作
- requeue为true:重新放回队列,重新投递,再次尝试
- requeue为false:不放回队列,不重新投递
- 要点4:deliveryTag 消息的唯一标识,查找具体某一条消息的依据

小结
在RabbitMQ中,我使用手动应答机制来保证消息的可靠性传输。具体来说,当消费者成功处理完消息后,会调用channel.basicAck()发送确认。如果消费者崩溃,RabbitMQ会将未确认的消息重新投递给其他消费者。通过multiple参数,我可以选择批量确认来提高效率,或者单独确认来精确控制。这样既能保证消息不丢失,又能根据业务需求平衡性能。”
RabbitMQ持久化机制详解
持久化(Broker维度) - 解决”RabbitMQ挂了怎么办” ‘存没存住’的问题
一、问题背景
默认情况下的风险:
当RabbitMQ服务重启或崩溃时:
- 队列和消息都会丢失
- 生产者发送的消息无法恢复
二、持久化双重保障
要确保消息不丢失,需要同时做两件事:
1. 队列持久化
1 | // 非持久化队列(默认) |
队列持久化特点:
- 队列元数据保存到磁盘
- RabbitMQ重启后队列依然存在
- 但仅队列存在还不够,消息本身还需要持久化

2. 消息持久化(生产者发送)
java
1 | // 非持久化消息(默认) |
- MINIMAL_BASIC
最基本的空属性,所有字段都为null 适用于不需要任何特殊属性的简单消息
- MINIMAL_PERSISTENT_BASIC
只设置了deliveryMode=2(持久化)的空属性适用于需要持久化但不需要其他属性的消息
- BASIC
设置了contentType=”application/octet-stream”(二进制流)
deliveryMode=1(非持久化)
priority=0(最低优先级)
适用于普通的二进制数据消息
- PERSISTENT_BASIC
与BASIC类似,但deliveryMode=2(持久化) 适用于需要持久化的二进制数据消息
- TEXT_PLAIN
设置了contentType=”text/plain”(纯文本)
deliveryMode=1(非持久化)
priority=0
适用于普通的文本消息
- PERSISTENT_TEXT_PLAIN
与TEXT_PLAIN类似,但deliveryMode=2(持久化)
适用于需要持久化的文本消息
三、持久化机制图解
text
1 | 生产者发送消息 |
重要提醒:
- 持久化会增加磁盘IO,降低吞吐量
- 不是100%保证不丢失(极端情况如磁盘损坏)
- 是”at-least-once”的基础保障
四、不公平分发 vs 轮询分发
默认情况:轮询分发(Round-robin)
text
1 | 消费者A:消息1、3、5... |
问题: 处理快的消费者要等处理慢的消费者
解决方案:不公平分发
java
1 | // 设置预取值为1 |


效果:
- 每个消费者一次只处理一个消息
- 谁处理完谁再拿新的
- 实现”能者多劳”
五、预取值(Prefetch Count)
1 | // 设置预取值大小 |

| 预取值 | 效果 | 适用场景 |
|---|---|---|
| 0 | 无限制,有多少给多少 | 高速处理场景 |
| 1 | 不公平分发 | 处理时间不均的任务 |
| N | 批量处理 | 提高吞吐量 |
小结:
为了确保RabbitMQ重启时消息不丢失,我采用双重持久化策略。首先在声明队列时设置durable=true使队列持久化,然后在发布消息时使用MessageProperties.PERSISTENT_TEXT_PLAIN标记消息持久化。这样即使RabbitMQ重启,消息也能恢复。同时,为了优化消费者性能,我会通过channel.basicQos()设置预取值,实现不公平分发,让处理快的消费者可以处理更多消息,避免消息堆积。”
注意事项
- 持久化队列不能修改为临时队列,反之亦然
- 已存在的队列修改durable参数会报错
- 持久化不是实时写入,有微小的时间窗口风险
- 建议搭配生产者确认机制(publisher confirms)使用
RabbitMQ发布确认
发布确认(生产者维度) - 解决”消息有没有到Broker” ‘发没发到’的问题
一、核心概念
发布确认(Publisher Confirms) 是RabbitMQ提供的生产者可靠性保障机制,确保消息成功到达Broker。
生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的
消息都将会被指派一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队列之后,broker
就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队
列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传
给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外 broker 也可以设置
basic.ack 的 multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。
confirm 模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信
道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调
方法来处理该确认消息,如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消
息,生产者应用程序同样可以在回调方法中处理该 nack 消息。
二、三种确认模式对比
| 模式 | 工作原理 | 性能 | 可靠性 | 适用场景 |
|---|---|---|---|---|
| 单个确认 | 发一条等一条确认 | 最低 | 最高 | 对可靠性要求极高的场景 |
| 批量确认 | 累积N条后统一确认 | 中等 | 中等 | 批量处理,可容忍部分重发 |
| 异步确认 | 异步回调,边发边等 | 最高 | 灵活 | 高性能高可靠场景 |
graph TD
A[生产者发送消息] –> B{选择确认模式}
B –> C[单个确认]
B –> D[批量确认]
B –> E[异步确认]
1 | graph TD |
四、详细实现说明
发布确认默认是没有开启的,如果要开启需要调用方法 confirmSelect,每当你要想使用发布确认,都需要在 channel 上调用

1. 单个确认模式(同步)
java
1 | // 关键代码 |
特点:
- 完全同步,吞吐量低(~数百条/秒)
- 每条消息都有明确确认结果
- 适合小规模关键业务
2. 批量确认模式(同步批量)
java
1 | int batchSize = 100; |
特点:
- 性能比单个确认高
- 故障时不知道具体哪条消息失败
- 需要内存保存未确认批次
3. 异步确认模式(最优方案)
消息生产者进行发送,不用等消费进行确认,而直接使用回调,消息需要存储在map
最好的解决的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列,
比如说用 ConcurrentLinkedQueue 这个队列在 confirm callbacks 与发布线程之间进行消息的传
递。
java
1 | // 使用线程安全数据结构存储未确认消息 |
1 | public static void publishMessageAsync() throws Exception { |
SpringAMOP
- 在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。
· confirm 确认模式
· return 退回模式
- rabbitmq 整个消息投递的路径为:
producer—>rabbitmq broker—>exchange—>queue—>consumer
· 消息从 producer 到 exchange 则会返回一个 confirmCallback 。
· 消息从 exchange–>queue 投递失败则会返回一个 returnCallback 。
我们将利用这两个 callback 控制消息的可靠性投递
1 | # 配置rabbitmq |
在这里我们为什么要创建这个配置类呢?首先,我们需要声明回调函数来接收RabbitMQ服务器返回的确认信息:
| 方法名 | 方法功能 | 所属接口 | 接口所属类 |
|---|---|---|---|
| confirm() | 确认消息是否发送到交换机 | ConfirmCallback | RabbitTemplate |
| returnedMessage() | 确认消息是否发送到队列 | ReturnsCallback | RabbitTemplate |
然后,就是对RabbitTemplate的功能进行增强,因为回调函数所在对象必须设置到RabbitTemplate对象中才能生效。
原本RabbitTemplate对象并没有生产者端消息确认的功能,要给它设置对应的组件才可以。
而设置对应的组件,需要调用RabbitTemplate对象下面两个方法:
| 设置组件调用的方法 | 所需对象类型 |
|---|---|
| setConfirmCallback() | ConfirmCallback接口类型 |
| setReturnCallback() | ReturnCallback接口类型 |
2、API说明
①ConfirmCallback接口
这是RabbitTemplate内部的一个接口,源代码如下:
1 | /** |
生产者端发送消息之后,回调confirm()方法
- ack参数值为true:表示消息成功发送到了交换机
- ack参数值为false:表示消息没有发送到交换机
②ReturnCallback接口
同样也RabbitTemplate内部的一个接口,源代码如下:
1 | /** |
注意:接口中的returnedMessage()方法仅在消息没有发送到队列时调用
ReturnedMessage类中主要属性含义如下:
| 属性名 | 类型 | 含义 |
|---|---|---|
| message | org.springframework.amqp.core.Message | 消息以及消息相关数据 |
| replyCode | int | 应答码,类似于HTTP响应状态码 |
| replyText | String | 应答码说明 |
| exchange | String | 交换机名称 |
| routingKey | String | 路由键名称 |
1 |
|
1、面试表达角度
问题: “如何保证生产者发送的消息可靠到达RabbitMQ?”
回答框架:
- 引出问题:”在分布式系统中,网络故障可能导致消息丢失”
- 解决方案:”RabbitMQ提供了发布确认机制,三种模式各有优劣”
- 详细说明:”我会根据业务场景选择:单个确认最可靠但性能差,批量确认折中,异步确认性能最好”
- 实践经验:”实际项目中,我通常使用异步确认+ConcurrentSkipListMap跟踪未确认消息”
示例回答:
“为了保证消息可靠投递,我会启用RabbitMQ的发布确认机制。具体来说,首先调用channel.confirmSelect()开启确认模式。根据业务需求选择模式:对于转账等关键业务使用单个确认,确保每条消息都有反馈;对于日志收集等场景使用批量确认提高吞吐量;对于高并发场景使用异步确认,通过回调函数处理确认结果,并用ConcurrentSkipListMap管理未确认消息,实现高性能和高可靠的平衡。”
实际应用建议
配置选择:
- 追求可靠性:单个确认 + 持久化
- 追求性能:异步确认 + 合适的prefetch
- 平衡方案:批量确认 + 适中的batchSize
谈谈RabbitMQ如何保证消息不丢失?你会从哪些层面来设计?
我的回答框架:
“RabbitMQ的消息可靠性是一个分层递进的保障体系,我通常从生产者 → Broker → 消费者三个维度来构建完整闭环:
- 消息应答手动ack确保(消费者维度) - 解决”消费者挂了怎么办” 解决了‘处没处理’的问题
- 持久化(Broker维度) - 解决”RabbitMQ挂了怎么办” ‘存没存住’的问题
- 发布确认(生产者维度) - 解决”消息有没有到Broker” ‘发没发到’的问题
这三者形成一个完整的可靠性链条,缺一不可
1 | graph LR |
第一,消息应答是消费者对Broker的承诺
“我告诉RabbitMQ:这个消息我已经处理完了,你可以放心删除了。如果我没来得及确认就挂了,请你把消息重新给别人处理。”
第二,持久化是Broker对自己的保护
“RabbitMQ对自己说:重要消息我要写到磁盘上,就算我重启了,这些消息也能从硬盘恢复,不会让生产者的努力白费。”
第三,发布确认是生产者对发送结果的知情权
“生产者需要知道:我的消息到底到没到RabbitMQ?到了我就放心了,没到我要考虑重发。不能像寄信一样,扔进邮筒就不管了。”
典型故障案例:
场景: 电商订单支付后发送库存扣减消息
text
1 | 1. 生产者发送扣减库存消息 ❌(无发布确认) |
1 消息百分百成功投递
谈到消息的可靠性投递,无法避免的,在实际的工作中会经常碰到,比如一些核心业务需要保障消息不丢失,接下来我们看一个可靠性投递的流程图,说明可靠性投递的概念:
Step 1: 首先把消息信息(业务数据)存储到数据库中,紧接着,我们再把这个消息记录也存储到一张消息记录表里(或者另外一个同源数据库的消息记录表)
Step 2:发送消息到MQ Broker节点(采用confirm方式发送,会有异步的返回结果)
Step 3、4:生产者端接受MQ Broker节点返回的Confirm确认消息结果,然后进行更新消息记录表里的消息状态。比如默认Status = 0 当收到消息确认成功后,更新为1即可!
Step 5:但是在消息确认这个过程中可能由于网络闪断、MQ Broker端异常等原因导致 回送消息失败或者异常。这个时候就需要发送方(生产者)对消息进行可靠性投递了,保障消息不丢失,100%的投递成功!(有一种极限情况是闪断,Broker返回的成功确认消息,但是生产端由于网络闪断没收到,这个时候重新投递可能会造成消息重复,需要消费端去做幂等处理)所以我们需要有一个定时任务,(比如每5分钟拉取一下处于中间状态的消息,当然这个消息可以设置一个超时时间,比如超过1分钟 Status = 0 ,也就说明了1分钟这个时间窗口内,我们的消息没有被确认,那么会被定时任务拉取出来)
Step 6:接下来我们把中间状态的消息进行重新投递 retry send,继续发送消息到MQ ,当然也可能有多种原因导致发送失败
Step 7:我们可以采用设置最大努力尝试次数,比如投递了3次,还是失败,那么我们可以将最终状态设置为Status = 2 ,最后 交由人工解决处理此类问题(或者把消息转储到失败表中)。
2 消息幂等性保障
幂等性指一次和多次请求某一个资源,对于资源本身应该具有同样的结果。也就是说,其任意多次执行对资源本身所产生的影响均与一次执行的影响相同。
消费端的幂等性保障
消费者在消费 MQ 中的消息时,MQ 已把消息发送给消费者,消费者在给 MQ 返回 ack 时网络中断,
故 MQ 未收到确认信息,该条消息会重新发给其他的消费者,或者在网络重连后再次发送给该消费者,但
实际上该消费者已成功消费了该条消息,造成消费者消费了重复的消息。
MQ 消费者的幂等性的解决一般使用全局 ID 或者写个唯一标识比如时间戳 或者 UUID 或者订单消费
者消费 MQ 中的消息也可利用 MQ 的该 id 来判断,或者可按自己的规则生成一个全局唯一 id,每次消费消
息时用该 id 先判断该消息是否已消费过。
9.1.5. 唯一 ID+指纹码机制
指纹码:我们的一些规则或者时间戳加别的服务给到的唯一信息码,它并不一定是我们系统生成的,基
本都是由我们的业务规则拼接而来,但是一定要保证唯一性,然后就利用查询语句进行判断这个 id 是否存
在数据库中,优势就是实现简单就一个拼接,然后查询判断是否重复;劣势就是在高并发时,如果是单个数
据库就会有写入性能瓶颈当然也可以采用分库分表提升性能,但也不是我们最推荐的方式。
9.1.6. Redis 原子性
利用 redis 执行 setnx 命令,天然具有幂等性。从而实现不重复消费
在MQ中指,消费多条相同的消息,得到与消费该消息一次相同的结果。
消息幂等性保障 乐观锁机制
生产者发送消息:
1 | id=1,money=500,version=1 |
消费者接收消息
1 | id=1,money=500,version=1 |
消费者需要保证幂等性:第一次执行SQL语句
第一次执行:version=1
1 | update account set money = money - 500 , version = version + 1 |
消费者需要保证幂等性:第二次执行SQL语句
第二次执行:version=2
1 | update account set money = money - 500 , version = version + 1 |






