工作队列(Work Queues又名:任务队列)

img

工作队列(又名:任务队列)

对于 任务过重或任务较多情况使用工作队列可以提高任务处理的速度。

轮循分发(Round-robin dispatching)

任务队列的一个好处就是能很简单的并行工作。如果我们有积压的工作,我们只需要添加更多的工作者,很容易扩展。三个消费者顺序读取队列中的消息

image-20251209222309179

image-20251209222225870

image-20251209222209603

image-20251209222113479

image-20251209222149759

image-20251209222158146

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Component
public class Consumer1 {

/**
* 消息监听方法
* queuesToDeclare: 声明队列
* durable: 是否为持久化队列
*/
@RabbitListener(queuesToDeclare = @Queue(name = "work-queue", durable = "true"))
public void handlerMessage(String msg) {
System.out.println("=====消费者1: " + msg);
try {
Thread.sleep(5000);
} catch (Exception e) {
e.printStackTrace();
}
//模拟消费失败,触发重试
// int i = 1 / 0;
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

/**
* <p>
* work工作队列模式的消费者-只有一个消费者能接收到消息(竞争的消费者模式)
* </p>
*
* @author kim
**/
@Component
public class Consumer2 {

/**
* 消息监听方法
* queuesToDeclare: 声明队列
* durable: 是否为持久化队列
*/
@RabbitListener(queuesToDeclare = @Queue(name = "work-queue", durable = "true"))
public void handlerMessage(String msg) {
System.out.println("=====消费者2: " + msg);
try {
Thread.sleep(5000);
} catch (Exception e) {
e.printStackTrace();
}
//模拟消费失败,触发重试
// int i = 1 / 0;
}

}

消息应答(ACK消费者可靠性):

主要解决消息应答手动ack确保(消费者维度) - 解决”消费者挂了怎么办” 解决了‘处没处理’的问题

完成一个任务需要花费一些时间,你可以想象一个需要较长时间完成的任务在执行的中途中挂了会发生什么。我们当前的代码,一旦RabbitMQ将消息交付给客户,它将立即从内存中被移除。在这个例子中,如果你在执行过程中杀死一个工作者我们将丢失这个消息。我们也会丢失所有分发给指定的这个工作者但是还没有处理的消息。

但是我们不想丢失任何任务。如果一个工作者挂了,我们希望这个任务能交付给另一个工作者

为了确保消息永远不会丢失,RabbitMQ提供了消息确认机制,消费者向RabbitMQ中发送一个确认表示消息已经接收、处理并且RabbitMQ可以自由的删除它了。

如果一个消费者挂了(通道关闭,连接关闭或者TCP连接丢失)没有发送确认,RabbitMQ将会理解成消息没有被完全处理并将消息发回队列。如果这个时候有其他消费者在线,它将被快速的交付给另一个消费者。通过这种方式能确保没有消息丢失,即使工作者偶尔挂掉。

这里没有消息超时,当消费者挂了RabbitMQ将会重新交付消息。如果一个消息的处理过程花费了很长很长的时间,这个是允许的。

简单版

为什么需要消息确认?

  • 工作者(消费者)在处理消息过程中崩溃
  • RabbitMQ立即从内存移除已交付的消息
  • 导致消息永久丢失

解决方案:
RabbitMQ的消息确认(Acknowledgement)机制确保消息不会丢失。

1
2
3
4
5
6
7
生产者 → RabbitMQ → 消息交付给消费者 → 消费者处理消息 → 发送确认 → RabbitMQ删除消息

(如果崩溃)

RabbitMQ重新投递


确认模式对比

模式 机制 优点 缺点 适用场景
自动应答 消息一送达就自动确认 高吞吐量 消息可能丢失 可快速处理、允许丢失的场景
手动应答 处理完成后手动发送确认 数据安全,不会丢失 吞吐量较低 重要任务,不允许丢失

手动应答的三种方法

java

1
2
3
4
5
6
7
8
9
10
11
12
// 1. 肯定确认(成功处理)
channel.basicAck(deliveryTag, multiple);
// deliveryTag: 消息标识
// multiple: 是否批量确认

// 2. 否定确认(RabbitMQ 2.0.0+)
channel.basicNack(deliveryTag, multiple, requeue);
// requeue: true-重新入队,false-丢弃/进入死信队列

// 3. 拒绝确认(早期版本)
channel.basicReject(deliveryTag, requeue);
// 不支持批量操作

Multiple参数详解

场景:假设channel上有未确认的消息tag=5,6,7,8,当前处理的是tag=8

java

1
2
3
4
5
6
7
8
9
// 情况1:multiple = true
channel.basicAck(8, true);
// 结果:一次性确认5,6,7,8所有消息
// ✅ 减少网络拥堵,提高效率

// 情况2:multiple = false
channel.basicAck(8, false);
// 结果:只确认tag=8的消息
// ✅ 更精确控制,避免误确认

image-20220510004753600

消息自动重新入队

如果消费者因为某种原因断开连接,导致消息并未发送ACK确认,RabbitMq将了解的消息么没有完全删除,对其进行重新排队。如果有其他的消费者 ,会重新分配给下一个消费者。这样可以在确保不会丢失任何消息

image-20220510004005131

我们开启了两个消费者,一个正常消费,一个缓慢消费,当消费者2消费完成后,停止消费者一 发现 消费者一的消息再消费者2中进行消费

image-20251209224740208

image-20251209224017093

image-20251209224033505

image-20251209224100765

image-20251209224712300

Spring版本:

image-20251211001621017

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* 主题模式-测试消费端的ack
* 消费端需要配置 spring.rabbitmq.listener.acknowledge-mode=manual 消费者开启手动ack消息确认,所有队列都会生效
* 消费端需要配置 spring.rabbitmq.listener.default-requeue-rejected=false 设置为false,会重发消息到死信队列,所有队列都会生效
*/
@Test
public void sendTopicByConsumerAck() {
//消费端最大消息堆积是3,这里发10条会有几条瞬间被扔到死信队列中,剩下的消费失败被拒绝确认后,才会扔到死信队列中
//当然,也可以设置为3,测试消费失败被拒绝确认,扔到死信队列中的情景
for (int i = 0; i < 10; i++) {
String routingKey = i % 2 == 0 ? "topicAck.log.info" : "topicAck.log.error";
rabbitMQService.sendMessageByExchange("topicAck-exchange", routingKey, "主题模式-测试消费端的ack,路由routingKey=" + routingKey + " i=" + i);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
    /**
* 消息监听方法
* bindings: 完成队列与交换机的绑定
* Queue: 队列属性,超过最大值,超时未被消费,消费失败超过重试次数,都会被仍到信息队列中
* exchange:交换机属性
* key:路由key,通配符
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "${topicAck.ack.queue}", durable = "true", arguments = {
@Argument(name = "x-max-length", value = "3", type = "java.lang.Long"), // 队列的最大存储界限,这里示例设为3
@Argument(name = "x-message-ttl", value = "5000000", type = "java.lang.Long"), // 消息过期时间,多久没有被消费
@Argument(name = "x-dead-letter-exchange", value = "${dead.exchange}"), // 死信队列交换机-dead-exchange
@Argument(name = "x-dead-letter-routing-key", value = "xxx")}), // 死信队列路由key
exchange = @Exchange(name = "${topicAck.exchange}", type = ExchangeTypes.TOPIC),// 交换机
key = {"#"}// 路由key,通配符
))
public void handlerMessage(String msg, Channel channel, Message message) throws IOException {
try {
System.out.printf("================AckConsumer接受到消息,准备消费,msg=%s,================", msg);
System.out.println();
try {
Thread.sleep(5000);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("AckConsumer--->接受到的消息是:" + msg);

// 业务处理
//int i = 10 / 0;

// 手动ack确认
//参数1:deliveryTag:消息唯一传输ID
//参数2:multiple:true: 手动批量处理,false: 手动单条处理
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
System.out.printf("================AckConsumer消费成功,msg=%s,================", msg);
System.out.println();
} catch (Exception ex) {
// 如果真得出现了异常,我们采用消息重投,获取redelivered,判断是否为重投: false没有重投,true重投
Boolean redelivered = message.getMessageProperties().getRedelivered();
System.out.println("redelivered = " + redelivered);

try {
// (已重投)拒绝确认
if (redelivered) {
/**
* 拒绝确认,从队列中删除该消息,防止队列阻塞(消息堆积)
* boolean requeue: false不重新入队列(丢弃消息)
*/
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
System.out.printf("================AckConsumer消费消息已投入死信队列,msg=%s,================", msg);
System.out.println();
} else { // (没有重投) 消息重投
/**
* 消息重投,重新把消息放回队列中
* boolean multiple: 单条或批量
* boolean requeue: true重回队列
*/
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
System.out.println("=========消息重投了=======");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}

①basicAck()方法

  • 方法功能:给Broker返回ACK确认信息,表示消息已经在消费端成功消费,这样Broker就可以把消息删除了
  • 参数列表:
参数名称 含义
long deliveryTag Broker给每一条进入队列的消息都设定一个唯一标识
boolean multiple 取值为true:为小于、等于deliveryTag的消息批量返回ACK信息
取值为false:仅为指定的deliveryTag返回ACK信息

②basicNack()方法

  • 方法功能:给Broker返回NACK信息,表示消息在消费端消费失败,此时Broker的后续操作取决于参数requeue的值
  • 参数列表:
参数名称 含义
long deliveryTag Broker给每一条进入队列的消息都设定一个唯一标识
boolean multiple 取值为true:为小于、等于deliveryTag的消息批量返回ACK信息
取值为false:仅为指定的deliveryTag返回ACK信息
boolean requeue 取值为true:Broker将消息重新放回队列,接下来会重新投递给消费端
取值为false:Broker将消息标记为已消费,不会放回队列

③basicReject()方法

  • 方法功能:根据指定的deliveryTag,对该消息表示拒绝
  • 参数列表:
参数名称 含义
long deliveryTag Broker给每一条进入队列的消息都设定一个唯一标识
boolean requeue 取值为true:Broker将消息重新放回队列,接下来会重新投递给消费端
取值为false:Broker将消息标记为已消费,不会放回队列
  • basicNack()和basicReject()有啥区别?
    • basicNack()有批量操作
    • basicReject()没有批量操作
  • 要点1:把消息确认模式改为手动确认
  • 要点2:调用Channel对象的方法返回信息
    • ACK:Acknowledgement,表示消息处理成功
    • NACK:Negative Acknowledgement,表示消息处理失败
    • Reject:拒绝,同样表示消息处理失败
  • 要点3:后续操作
    • requeue为true:重新放回队列,重新投递,再次尝试
    • requeue为false:不放回队列,不重新投递
  • 要点4:deliveryTag 消息的唯一标识,查找具体某一条消息的依据

未命名文件

小结

在RabbitMQ中,我使用手动应答机制来保证消息的可靠性传输。具体来说,当消费者成功处理完消息后,会调用channel.basicAck()发送确认。如果消费者崩溃,RabbitMQ会将未确认的消息重新投递给其他消费者。通过multiple参数,我可以选择批量确认来提高效率,或者单独确认来精确控制。这样既能保证消息不丢失,又能根据业务需求平衡性能。”

RabbitMQ持久化机制详解

持久化(Broker维度) - 解决”RabbitMQ挂了怎么办” ‘存没存住’的问题

一、问题背景

默认情况下的风险:

当RabbitMQ服务重启或崩溃时:

  • 队列和消息都会丢失
  • 生产者发送的消息无法恢复

二、持久化双重保障

要确保消息不丢失,需要同时做两件事:

1. 队列持久化

1
2
3
4
5
6
// 非持久化队列(默认)
channel.queueDeclare("queue_name", false, false, false, null);

// 持久化队列
channel.queueDeclare("queue_name", true, false, false, null);
// ↑ 第二个参数durable=true

队列持久化特点:

  • 队列元数据保存到磁盘
  • RabbitMQ重启后队列依然存在
  • 但仅队列存在还不够,消息本身还需要持久化

image-20251209230011761

2. 消息持久化(生产者发送)

java

1
2
3
4
5
6
7
// 非持久化消息(默认)
channel.basicPublish("", "queue_name", null, message.getBytes());

// 持久化消息
channel.basicPublish("", "queue_name",
MessageProperties.PERSISTENT_TEXT_PLAIN, // 关键参数
message.getBytes());
  • MINIMAL_BASIC

最基本的空属性,所有字段都为null 适用于不需要任何特殊属性的简单消息

  • MINIMAL_PERSISTENT_BASIC

只设置了deliveryMode=2(持久化)的空属性适用于需要持久化但不需要其他属性的消息

  • BASIC

设置了contentType=”application/octet-stream”(二进制流)
deliveryMode=1(非持久化)
priority=0(最低优先级)
适用于普通的二进制数据消息

  • PERSISTENT_BASIC

与BASIC类似,但deliveryMode=2(持久化) 适用于需要持久化的二进制数据消息

  • TEXT_PLAIN

设置了contentType=”text/plain”(纯文本)
deliveryMode=1(非持久化)
priority=0
适用于普通的文本消息

  • PERSISTENT_TEXT_PLAIN

与TEXT_PLAIN类似,但deliveryMode=2(持久化)
适用于需要持久化的文本消息

三、持久化机制图解

text

1
2
3
4
5
6
7
8
9
10
11
生产者发送消息

消息标记为持久化 (PERSISTENT_TEXT_PLAIN)

RabbitMQ接收消息

写入磁盘(异步刷盘)

持久化队列保存消息

消费者消费后确认

重要提醒:

  • 持久化会增加磁盘IO,降低吞吐量
  • 不是100%保证不丢失(极端情况如磁盘损坏)
  • 是”at-least-once”的基础保障

四、不公平分发 vs 轮询分发

默认情况:轮询分发(Round-robin)

text

1
2
消费者A:消息1、3、5...
消费者B:消息2、4、6...

问题: 处理快的消费者要等处理慢的消费者

解决方案:不公平分发

java

1
2
// 设置预取值为1
channel.basicQos(1);

image-20251209232648200

image-20251209232838249

效果:

  • 每个消费者一次只处理一个消息
  • 谁处理完谁再拿新的
  • 实现”能者多劳”

五、预取值(Prefetch Count)

1
2
// 设置预取值大小
channel.basicQos(prefetchCount);

image-20220510171254790

预取值 效果 适用场景
0 无限制,有多少给多少 高速处理场景
1 不公平分发 处理时间不均的任务
N 批量处理 提高吞吐量

小结:

为了确保RabbitMQ重启时消息不丢失,我采用双重持久化策略。首先在声明队列时设置durable=true使队列持久化,然后在发布消息时使用MessageProperties.PERSISTENT_TEXT_PLAIN标记消息持久化。这样即使RabbitMQ重启,消息也能恢复。同时,为了优化消费者性能,我会通过channel.basicQos()设置预取值,实现不公平分发,让处理快的消费者可以处理更多消息,避免消息堆积。”

注意事项

  • 持久化队列不能修改为临时队列,反之亦然
  • 已存在的队列修改durable参数会报错
  • 持久化不是实时写入,有微小的时间窗口风险
  • 建议搭配生产者确认机制(publisher confirms)使用

RabbitMQ发布确认

发布确认(生产者维度) - 解决”消息有没有到Broker” ‘发没发到’的问题

一、核心概念

发布确认(Publisher Confirms) 是RabbitMQ提供的生产者可靠性保障机制,确保消息成功到达Broker。

生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的

消息都将会被指派一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队列之后,broker

就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队

列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传

给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外 broker 也可以设置

basic.ack 的 multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。

confirm 模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信

道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调

方法来处理该确认消息,如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消

息,生产者应用程序同样可以在回调方法中处理该 nack 消息。

二、三种确认模式对比

模式 工作原理 性能 可靠性 适用场景
单个确认 发一条等一条确认 最低 最高 对可靠性要求极高的场景
批量确认 累积N条后统一确认 中等 中等 批量处理,可容忍部分重发
异步确认 异步回调,边发边等 最高 灵活 高性能高可靠场景

graph TD
A[生产者发送消息] –> B{选择确认模式}
B –> C[单个确认]
B –> D[批量确认]
B –> E[异步确认]

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
graph TD
A[生产者发送消息] --> B{选择确认模式}
B --> C[单个确认]
B --> D[批量确认]
B --> E[异步确认]

C --> F[阻塞等待确认]
F --> G[收到确认]
G --> H[发送下一条]

D --> I[累积到batchSize]
I --> J[批量等待确认]
J --> K[发送下一批]

E --> L[消息放入未确认Map]
L --> M[异步继续发送]
M --> N[回调处理确认结果]
N --> O{确认类型}
O --> P[Ack-从Map移除]
O --> Q[Nack-记录重发]

四、详细实现说明

发布确认默认是没有开启的,如果要开启需要调用方法 confirmSelect,每当你要想使用发布确认,都需要在 channel 上调用

image-20220523233725329

1. 单个确认模式(同步)

java

1
2
3
4
// 关键代码
channel.confirmSelect(); // 开启确认模式
channel.basicPublish(...); // 发送消息
boolean flag = channel.waitForConfirms(); // 阻塞等待确认

特点:

  • 完全同步,吞吐量低(~数百条/秒)
  • 每条消息都有明确确认结果
  • 适合小规模关键业务

2. 批量确认模式(同步批量)

java

1
2
3
4
5
6
7
8
9
10
11
12
int batchSize = 100;
int outstandingCount = 0;

for (int i = 0; i < total; i++) {
channel.basicPublish(...);
outstandingCount++;

if (outstandingCount == batchSize) {
channel.waitForConfirms(); // 批量确认
outstandingCount = 0;
}
}

特点:

  • 性能比单个确认高
  • 故障时不知道具体哪条消息失败
  • 需要内存保存未确认批次

3. 异步确认模式(最优方案)

消息生产者进行发送,不用等消费进行确认,而直接使用回调,消息需要存储在map

最好的解决的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列,

比如说用 ConcurrentLinkedQueue 这个队列在 confirm callbacks 与发布线程之间进行消息的传

递。

java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 使用线程安全数据结构存储未确认消息
ConcurrentSkipListMap<Long, String> outstandingConfirms =
new ConcurrentSkipListMap<>();

// 成功回调
ConfirmCallback ackCallback = (sequenceNumber, multiple) -> {
if (multiple) {
// 批量确认:清除≤当前序列号的所有消息
outstandingConfirms.headMap(sequenceNumber, true).clear();
} else {
// 单个确认:只清除当前消息
outstandingConfirms.remove(sequenceNumber);
}
};

// 失败回调
ConfirmCallback nackCallback = (sequenceNumber, multiple) -> {
String failedMsg = outstandingConfirms.get(sequenceNumber);
// 记录日志或重发机制
};

// 添加监听器
channel.addConfirmListener(ackCallback, nackCallback);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
    public static void publishMessageAsync() throws Exception {
try (Channel channel = RabbitMqUtils.getChannel()) {
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, false, false, false, null);
//开启发布确认
channel.confirmSelect();
/**
* 线程安全有序的一个哈希表,适用于高并发的情况
* 1.轻松的将序号与消息进行关联
* 2.轻松批量删除条目 只要给到序列号
* 3.支持并发访问
*/
ConcurrentSkipListMap<Long, String> outstandingConfirms = new
ConcurrentSkipListMap<>();
/**
// Broker可能返回的确认模式:
// 情况A:逐条确认(multiple=false)
// 收到10次回调:handleAck(1,false), handleAck(2,false)...
// 情况B:批量确认(multiple=true)
// 可能只收到1次回调:handleAck(10, true)
// 一次性确认了1-10所有消息
ConcurrentSkipListMap<Long, String> map = new ConcurrentSkipListMap<>();
map.put(1L, "A");
map.put(2L, "B");
map.put(3L, "C");
map.put(4L, "D");
map.put(5L, "E");

// headMap(3, true) 返回:{1=A, 2=B, 3=C}
// headMap(3, false) 返回:{1=A, 2=B}
*/
ConfirmCallback ackCallback = (sequenceNumber, multiple) -> {
if (multiple) {
//返回的是小于等于当前序列号的未确认消息 是一个 map 用于批量确认
ConcurrentNavigableMap<Long, String> confirmed =
outstandingConfirms.headMap(sequenceNumber, true);
//清除该部分未确认消息
confirmed.clear(); //把已确认的从数组中清除,未确认的进行删除
}else{
//只清除当前序列号的消息 当条确认
outstandingConfirms.remove(sequenceNumber);
}
};
ConfirmCallback nackCallback = (sequenceNumber, multiple) -> {
// 重试等操作
String message = outstandingConfirms.get(sequenceNumber);

System.out.println("发布的消息"+message+"未被确认,序列号"+sequenceNumber);
};
/**
* 添加一个异步确认的监听器
* 1.确认收到消息的回调
* 2.未收到消息的回调
*/
channel.addConfirmListener(ackCallback, null);
long begin = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = "消息" + i;
/**
* channel.getNextPublishSeqNo()获取下一个消息的序列号
* 通过序列号与消息体进行一个关联
* 全部都是未确认的消息体
*/
outstandingConfirms.put(channel.getNextPublishSeqNo(), message);
channel.basicPublish("", queueName, null, message.getBytes());
}
long end = System.currentTimeMillis();
System.out.println("发布" + MESSAGE_COUNT + "个异步确认消息,耗时" + (end - begin) +
"ms");
} }

SpringAMOP

  • 在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式

· confirm 确认模式

· return 退回模式

  • rabbitmq 整个消息投递的路径为:

producer—>rabbitmq broker—>exchange—>queue—>consumer

· 消息从 producer 到 exchange 则会返回一个 confirmCallback 。

· 消息从 exchange–>queue 投递失败则会返回一个 returnCallback 。

我们将利用这两个 callback 控制消息的可靠性投递

1
2
3
4
5
6
7
8
9
10
11
12
# 配置rabbitmq
spring:
rabbitmq:
host: 192.168.10.100 #mq地址
port: 5672 #端口
username: guest #登录账号
password: jin159357@ #登录密码
virtual-host: / #虚拟机
#publisher-confirms: true # 开启消息发送确认机制,低版本
publisher-confirm-type: correlated # 开启消息发送确认机制
publisher-returns: true # 队列的确认

在这里我们为什么要创建这个配置类呢?首先,我们需要声明回调函数来接收RabbitMQ服务器返回的确认信息:

方法名 方法功能 所属接口 接口所属类
confirm() 确认消息是否发送到交换机 ConfirmCallback RabbitTemplate
returnedMessage() 确认消息是否发送到队列 ReturnsCallback RabbitTemplate

然后,就是对RabbitTemplate的功能进行增强,因为回调函数所在对象必须设置到RabbitTemplate对象中才能生效。

原本RabbitTemplate对象并没有生产者端消息确认的功能,要给它设置对应的组件才可以。

而设置对应的组件,需要调用RabbitTemplate对象下面两个方法:

设置组件调用的方法 所需对象类型
setConfirmCallback() ConfirmCallback接口类型
setReturnCallback() ReturnCallback接口类型

2、API说明

①ConfirmCallback接口

这是RabbitTemplate内部的一个接口,源代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* A callback for publisher confirmations.
*/
@FunctionalInterface
public interface ConfirmCallback {

/**
* Confirmation callback.
* @param correlationData correlation data for the callback.
* @param ack true for ack, false for nack
* @param cause An optional cause, for nack, when available, otherwise null.
*/
void confirm(@Nullable CorrelationData correlationData, boolean ack, @Nullable String cause);

}

生产者端发送消息之后,回调confirm()方法

  • ack参数值为true:表示消息成功发送到了交换机
  • ack参数值为false:表示消息没有发送到交换机

②ReturnCallback接口

同样也RabbitTemplate内部的一个接口,源代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* A callback for returned messages.
* @since 2.3
*/
@FunctionalInterface
public interface ReturnsCallback {

/**
* Returned message callback.
* @param returned the returned message and metadata.
*/
void returnedMessage(ReturnedMessage returned);

}

注意:接口中的returnedMessage()方法在消息没有发送到队列时调用

ReturnedMessage类中主要属性含义如下:

属性名 类型 含义
message org.springframework.amqp.core.Message 消息以及消息相关数据
replyCode int 应答码,类似于HTTP响应状态码
replyText String 应答码说明
exchange String 交换机名称
routingKey String 路由键名称
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63

/**
* <p>
* 定义生产者确认回调对象
* 需要配置spring.rabbitmq.publisher-confirms=true 低版本
* 或spring.rabbitmq.publisher-confirm-type=correlated 高版本
* 每个RabbitTemplate只支持一个ConfirmCallback
* </p>
*
* @author kim
**/
@Slf4j
@Component
public class ProducerAckConfirmCallback implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {

@Autowired
private RabbitTemplate rabbitTemplate;

@PostConstruct
private void init() {
//设置生产者确认回调对象
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
}


/**
* 发送ack确认回调
*
* @param correlationData 这里获取唯一id
* @param ack 是否确认收到(true已确认收到,false未确认收到)
* @param cause 失败原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
//有些没有设置发送应答ack的,不需要走后续的逻辑
if (correlationData == null) {
return;
}
// 确认方法
log.info("是否确认发送成功ack = {} 失败原因cause={}", ack, cause);
// 如果为true,代表mq已成功接收消息
if (ack) {
// 从Redis数据中删除消息
log.info("消息确认发送成功:correlationDataId = {}", correlationData.getId());
} else {
// 如果为false,代表mq没有接收到消息(消息生产失败)
// 业务处理
log.error("消息确认发送失败:correlationDataId = {}", correlationData.getId());
}
}

//当消息无法路由的时候的回调方法
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String
exchange, String routingKey) {
log.error(" 消 息 {}, 被交换机 {} 退回,退回原因 :{}, 路 由 key:{}",new
String(message.getBody()),exchange,replyText,routingKey);
}



}

1、面试表达角度

问题: “如何保证生产者发送的消息可靠到达RabbitMQ?”

回答框架:

  1. 引出问题:”在分布式系统中,网络故障可能导致消息丢失”
  2. 解决方案:”RabbitMQ提供了发布确认机制,三种模式各有优劣”
  3. 详细说明:”我会根据业务场景选择:单个确认最可靠但性能差,批量确认折中,异步确认性能最好”
  4. 实践经验:”实际项目中,我通常使用异步确认+ConcurrentSkipListMap跟踪未确认消息”

示例回答:
“为了保证消息可靠投递,我会启用RabbitMQ的发布确认机制。具体来说,首先调用channel.confirmSelect()开启确认模式。根据业务需求选择模式:对于转账等关键业务使用单个确认,确保每条消息都有反馈;对于日志收集等场景使用批量确认提高吞吐量;对于高并发场景使用异步确认,通过回调函数处理确认结果,并用ConcurrentSkipListMap管理未确认消息,实现高性能和高可靠的平衡。”

实际应用建议

配置选择:

  1. 追求可靠性:单个确认 + 持久化
  2. 追求性能:异步确认 + 合适的prefetch
  3. 平衡方案:批量确认 + 适中的batchSize

谈谈RabbitMQ如何保证消息不丢失?你会从哪些层面来设计?

我的回答框架:

“RabbitMQ的消息可靠性是一个分层递进的保障体系,我通常从生产者 → Broker → 消费者三个维度来构建完整闭环:

  1. 消息应答手动ack确保(消费者维度) - 解决”消费者挂了怎么办” 解决了‘处没处理’的问题
  2. 持久化(Broker维度) - 解决”RabbitMQ挂了怎么办” ‘存没存住’的问题
  3. 发布确认(生产者维度) - 解决”消息有没有到Broker” ‘发没发到’的问题

这三者形成一个完整的可靠性链条,缺一不可

1
2
3
4
5
6
7
8
9
10
11
graph LR
A[生产者] -- 发布确认 --> B[Broker RabbitMQ]
B -- 持久化存储 --> C[磁盘]
B -- 消息投递 --> D[消费者]
D -- 消息应答 --> B

subgraph "可靠性保障体系"
A -->|1. 确保发送成功| B
B -->|2. 确保消息不丢| C
D -->|3. 确保处理完成| B
end

第一,消息应答是消费者对Broker的承诺

“我告诉RabbitMQ:这个消息我已经处理完了,你可以放心删除了。如果我没来得及确认就挂了,请你把消息重新给别人处理。”

第二,持久化是Broker对自己的保护

“RabbitMQ对自己说:重要消息我要写到磁盘上,就算我重启了,这些消息也能从硬盘恢复,不会让生产者的努力白费。”

第三,发布确认是生产者对发送结果的知情权

“生产者需要知道:我的消息到底到没到RabbitMQ?到了我就放心了,没到我要考虑重发。不能像寄信一样,扔进邮筒就不管了。”

典型故障案例:

场景: 电商订单支付后发送库存扣减消息

text

1
2
3
4
5
6
7
1. 生产者发送扣减库存消息 ❌(无发布确认)
↓ 网络抖动,消息未到达Broker
2. Broker接收消息后持久化 ✅
↓ 系统重启,消息还在
3. 消费者处理消息时崩溃 ❌(无消息应答)
↓ 消息被重新投递,但实际库存已扣
4. 结果:库存被重复扣减!

1 消息百分百成功投递

谈到消息的可靠性投递,无法避免的,在实际的工作中会经常碰到,比如一些核心业务需要保障消息不丢失,接下来我们看一个可靠性投递的流程图,说明可靠性投递的概念:

img

Step 1: 首先把消息信息(业务数据)存储到数据库中,紧接着,我们再把这个消息记录也存储到一张消息记录表里(或者另外一个同源数据库的消息记录表)

Step 2:发送消息到MQ Broker节点(采用confirm方式发送,会有异步的返回结果)

Step 3、4:生产者端接受MQ Broker节点返回的Confirm确认消息结果,然后进行更新消息记录表里的消息状态。比如默认Status = 0 当收到消息确认成功后,更新为1即可!

Step 5:但是在消息确认这个过程中可能由于网络闪断、MQ Broker端异常等原因导致 回送消息失败或者异常。这个时候就需要发送方(生产者)对消息进行可靠性投递了,保障消息不丢失,100%的投递成功!(有一种极限情况是闪断,Broker返回的成功确认消息,但是生产端由于网络闪断没收到,这个时候重新投递可能会造成消息重复,需要消费端去做幂等处理)所以我们需要有一个定时任务,(比如每5分钟拉取一下处于中间状态的消息,当然这个消息可以设置一个超时时间,比如超过1分钟 Status = 0 ,也就说明了1分钟这个时间窗口内,我们的消息没有被确认,那么会被定时任务拉取出来)

Step 6:接下来我们把中间状态的消息进行重新投递 retry send,继续发送消息到MQ ,当然也可能有多种原因导致发送失败

Step 7:我们可以采用设置最大努力尝试次数,比如投递了3次,还是失败,那么我们可以将最终状态设置为Status = 2 ,最后 交由人工解决处理此类问题(或者把消息转储到失败表中)。

2 消息幂等性保障

幂等性指一次和多次请求某一个资源,对于资源本身应该具有同样的结果。也就是说,其任意多次执行对资源本身所产生的影响均与一次执行的影响相同。

img

消费端的幂等性保障

消费者在消费 MQ 中的消息时,MQ 已把消息发送给消费者,消费者在给 MQ 返回 ack 时网络中断,

故 MQ 未收到确认信息,该条消息会重新发给其他的消费者,或者在网络重连后再次发送给该消费者,但

实际上该消费者已成功消费了该条消息,造成消费者消费了重复的消息。

MQ 消费者的幂等性的解决一般使用全局 ID 或者写个唯一标识比如时间戳 或者 UUID 或者订单消费

者消费 MQ 中的消息也可利用 MQ 的该 id 来判断,或者可按自己的规则生成一个全局唯一 id,每次消费消

息时用该 id 先判断该消息是否已消费过。

9.1.5. 唯一 ID+指纹码机制

指纹码:我们的一些规则或者时间戳加别的服务给到的唯一信息码,它并不一定是我们系统生成的,基

本都是由我们的业务规则拼接而来,但是一定要保证唯一性,然后就利用查询语句进行判断这个 id 是否存

在数据库中,优势就是实现简单就一个拼接,然后查询判断是否重复;劣势就是在高并发时,如果是单个数

据库就会有写入性能瓶颈当然也可以采用分库分表提升性能,但也不是我们最推荐的方式。

9.1.6. Redis 原子性

利用 redis 执行 setnx 命令,天然具有幂等性。从而实现不重复消费

在MQ中指,消费多条相同的消息,得到与消费该消息一次相同的结果。

消息幂等性保障 乐观锁机制

生产者发送消息:

1
id=1,money=500,version=1

消费者接收消息

1
2
3
id=1,money=500,version=1

id=1,money=500,version=1

消费者需要保证幂等性:第一次执行SQL语句

第一次执行:version=1

1
2
update account set money = money - 500 , version = version + 1
where id = 1 and version = 1

消费者需要保证幂等性:第二次执行SQL语句

第二次执行:version=2

1
2
update account set money = money - 500 , version = version + 1
where id = 1 and version = 1