Publish/Subscribe发布订阅模式

为什么需要交换机?

问题背景

直接队列通信的局限性:

  • 生产者必须知道队列名称
  • 消息只能发送到一个队列
  • 无法实现”一对多”的广播模式
  • 消息路由策略单一

解决方案: 引入Exchange(交换机)作为消息路由中心

发布订阅模式:

1、每个消费者监听自己的队列。
2、生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息

EXcahnges概念

生产者生产的消息从不会直接发送到队列。实际上,通常生产者甚至都不知道这些消息传递传递到了哪些队列中。 相反,生产者只能将消息发送到交换机**(exchange)**,交换机工作的内容非常简单,一方面它接收来 自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消 息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。这就的由交换机的类型来决定。

image-20240806085107277

· P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)

· C:消费者,消息的接受者,会一直等待消息到来。

· Queue:消息队列,接收消息、缓存消息。

· Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。

Exchange有常见以下3种类型

o Fanout:广播,将消息交给所有绑定到交换机的队列

o Direct:定向,把消息交给符合指定routing key 的队列

o Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
graph TB
A[生产者] --> B{选择交换机模式}
B --> C[Fanout - 广播]
B --> D[Direct - 路由]
B --> E[Topic - 主题]

C --> F[所有绑定队列都收到]
D --> G[匹配RoutingKey的队列收到]
E --> H[模式匹配的队列收到]

subgraph "适用场景演进"
F --> I[日志广播/通知]
G --> J[选择性处理]
H --> K[复杂路由规则]
end

三种模式详细对比

模式 匹配规则 使用场景 代码示例 特点
Fanout 无规则,全部发送 日志广播、通知推送 channel.queueBind(queue, exchange, "") 最简单,性能好
Direct 精确匹配RoutingKey 选择性处理(如:错误级别) channel.queueBind(queue, exchange, "error") 灵活但规则有限
Topic 通配符匹配 复杂路由、多维度筛选 channel.queueBind(queue, exchange, "*.orange.*") 最灵活,规则复杂

Fanout 广播

image.png

Logs 和临时队列的绑定关系如下图

image.png

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class ReceiveLogs01 {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitUtils.getChannel();

channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
/**
* 生成一个临时的队列 队列的名称是随机的
* 当消费者断开和该队列的连接时 队列自动删除
*/
String queueName = channel.queueDeclare().getQueue();
//把该临时队列绑定我们的 exchange 其中 routingkey(也称之为 binding key)为空字符串
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println("等待接收消息,把接收到的消息打印在屏幕.....");

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("控制台打印接收到的消息"+message);
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}

ReceiveLogs02 将接收到的消息存储在磁盘

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class ReceiveLogs02 {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
/**
* 生成一个临时的队列 队列的名称是随机的
* 当消费者断开和该队列的连接时 队列自动删除
*/
String queueName = channel.queueDeclare().getQueue();
//把该临时队列绑定我们的 exchange 其中 routingkey(也称之为 binding key)为空字符串
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println("等待接收消息,把接收到的消息写到文件.....");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
File file = new File("C:\\work\\rabbitmq_info.txt");
FileUtils.writeStringToFile(file,message,"UTF-8");
System.out.println("数据写入文件成功");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}

EmitLog 发送消息给两个消费者接收

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class EmitLog {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
try (Channel channel = RabbitUtils.getChannel()) {
/**
* 声明一个 exchange
* 1.exchange 的名称
* 2.exchange 的类型
*/
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
Scanner sc = new Scanner(System.in);
System.out.println("请输入信息");
while (sc.hasNext()) {
String message = sc.nextLine();
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
System.out.println("生产者发出消息" + message);
}
}
}
}

核心特点:

  • 一条消息,所有绑定的队列都能收到
  • 适合系统日志收集、实时通知
  • routingKey为空字符串

Direct exchange

image-20220524095729319

在上面这张图中,我们可以看到 X 绑定了两个队列,绑定类型是 direct。队列 Q1 绑定键为 orange,

队列 Q2 绑定键有两个:一个绑定键为 black,另一个绑定键为 green.

在这种绑定情况下,生产者发布消息到 exchange 上,绑定键为 orange 的消息会被发布到队列

Q1。绑定键为 blackgreen 和的消息会被发布到队列 Q2,其他消息类型的消息将被丢弃。

创建绑定的使用代码 channel.queueBind(queueName, EXCHANGE_NAME, “routingKey”)

多重绑定

image.png

image.png

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package mq.direct;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import mq.utils.RabbitMqUtils;

import java.util.HashMap;
import java.util.Map;

public class EmitLogDirect {
private static final String EXCHANGE_NAME = "direct_logs";

public static void main(String[] argv) throws Exception {
try (Channel channel = RabbitMqUtils.getChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//创建多个 bindingKey
Map<String, String> bindingKeyMap = new HashMap<>();
bindingKeyMap.put("info", "普通 info 信息");
bindingKeyMap.put("warning", "警告 warning 信息");
bindingKeyMap.put("error", "错误 error 信息");
//debug 没有消费这接收这个消息 所有就丢失了
bindingKeyMap.put("debug", "调试 debug 信息");
for (Map.Entry<String, String> bindingKeyEntry : bindingKeyMap.entrySet()) {
String bindingKey = bindingKeyEntry.getKey();
String message = bindingKeyEntry.getValue();
channel.basicPublish(EXCHANGE_NAME, bindingKey, null,
message.getBytes("UTF-8"));
System.out.println("生产者发出消息:" + message);
}
}
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package com.example.Direct;

import com.example.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

public class ReceiveLogsDirect01 {
private static final String EXCHANGE_NAME = "direct_logs";

public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
String queueName = "disk";
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, EXCHANGE_NAME, "error");
System.out.println("等待接受的消息...");
DeliverCallback deliverCallback = ((consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
message = "接收绑定键:" + delivery.getEnvelope().getRoutingKey() + ",消息:" + message;
System.out.println("错误日志已经接收");
});
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
});
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package com.example.Direct;

import com.example.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

/**
* 使用多个绑定键
*/
public class ReceiveLogsDirect02 {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
String queueName = "console";
channel.queueDeclare(queueName, false, false, false, null);
// 一个交换机可以绑定多个key
channel.queueBind(queueName, EXCHANGE_NAME, "info");
channel.queueBind(queueName, EXCHANGE_NAME, "warning");
System.out.println("等待接收消息.....");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" 接收绑定键 :"+delivery.getEnvelope().getRoutingKey()+", 消 息:"+message);
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
});
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package com.example.Topic;


import com.example.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

public class ReceiveLogsTopic02 {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
//声明 Q2 队列与绑定关系
String queueName="Q2";
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, EXCHANGE_NAME, "*.*.rabbit");
channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#");
System.out.println("等待接收消息.....");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" 接收队列 :"+queueName+" 绑定键:"+delivery.getEnvelope().getRoutingKey()+",消息:"+message);
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
});
}

}

Topic

image-20220524104714649

Q1–>绑定的是

中间带 orange 带 3 个单词的字符串(.orange.)

Q2–>绑定的是

最后一个单词是 rabbit 的 3 个单词(..rabbit)

第一个单词是 lazy 的多个单词(lazy.#)

上图是一个队列绑定关系图,我们来看看他们之间数据接收情况是怎么样的

quick.orange.rabbit 被队列 Q1Q2 接收到

lazy.orange.elephant 被队列 Q1Q2 接收到

quick.orange.fox 被队列 Q1 接收到

lazy.brown.fox 被队列 Q2 接收到

lazy.pink.rabbit 虽然满足两个绑定但只被队列 Q2 接收一次

quick.brown.fox 不匹配任何绑定不会被任何队列接收到会被丢弃

quick.orange.male.rabbit 是四个单词不匹配任何绑定会被丢弃

lazy.orange.male.rabbit 是四个单词但匹配 Q2

当队列绑定关系是下列这种情况时需要引起注意

**当一个队列绑定键是#,那么这个队列将接收所有数据,就有点像 **fanout

*如果队列绑定键当中没有#和出现,那么该队列绑定类型就是 direct **

image-20220524104621414

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package com.example.Topic;

import com.example.RabbitMqUtils;
import com.rabbitmq.client.Channel;

import java.util.HashMap;
import java.util.Map;

public class EmitLogTopic {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {
try (Channel channel = RabbitMqUtils.getChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
/**
* Q1-->绑定的是
* 中间带 orange 带 3 个单词的字符串(*.orange.*)
* Q2-->绑定的是
* 最后一个单词是 rabbit 的 3 个单词(*.*.rabbit)
* 第一个单词是 lazy 的多个单词(lazy.#)
*
*/
Map<String, String> bindingKeyMap = new HashMap<>();
bindingKeyMap.put("quick.orange.rabbit","被队列 Q1Q2 接收到");
bindingKeyMap.put("lazy.orange.elephant","被队列 Q1Q2 接收到");
bindingKeyMap.put("quick.orange.fox","被队列 Q1 接收到");
bindingKeyMap.put("lazy.brown.fox","被队列 Q2 接收到");
bindingKeyMap.put("lazy.pink.rabbit","虽然满足两个绑定但只被队列 Q2 接收一次");
bindingKeyMap.put("quick.brown.fox","不匹配任何绑定不会被任何队列接收到会被丢弃");
bindingKeyMap.put("quick.orange.male.rabbit","是四个单词不匹配任何绑定会被丢弃");
bindingKeyMap.put("lazy.orange.male.rabbit","是四个单词但匹配 Q2");
for (Map.Entry<String, String> bindingKeyEntry: bindingKeyMap.entrySet()){
String bindingKey = bindingKeyEntry.getKey();
String message = bindingKeyEntry.getValue();
channel.basicPublish(EXCHANGE_NAME,bindingKey, null,
message.getBytes("UTF-8"));
System.out.println("生产者发出消息" + message);
}
}
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package com.example.Topic;

import com.example.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

public class ReceiveLogsTopic01 {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
//声明 Q1 队列与绑定关系
String queueName="Q1";
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*");
System.out.println("等待接收消息.....");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" 接收队列 :"+queueName+" 绑定键:"+delivery.getEnvelope().getRoutingKey()+",消息:"+message);
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
});
}
}

请解释RabbitMQ的三种交换机模式

回答框架:

“RabbitMQ的交换机模式是为了解决不同场景下的消息路由需求而设计的,它们的演进体现了从简单到复杂的路由需求:”

第一层:Fanout(广播模式)

“想象成公司的大喇叭广播,不管谁在办公室,都能听到通知。”

核心特点:

  • 一条消息,所有绑定的队列都能收到
  • 适合系统日志收集、实时通知
  • routingKey为空字符串

java

1
2
3
4
// 使用场景:日志收集系统
// 多个服务同时记录日志到不同地方(文件、数据库、监控)
channel.queueBind(logFileQueue, "logs_exchange", "");
channel.queueBind(logDBQueue, "logs_exchange", "");

第二层:Direct(路由模式)

“升级为智能分拣,比如快递员根据收件人地址送到不同楼栋。”

解决什么问题:

  • Fanout只能全部发送,浪费资源
  • 需要选择性发送特定消息

核心特点:

  • 精确匹配routingKey
  • 一个队列可以绑定多个routingKey
  • 多个队列可以绑定同一个routingKey(实现有限广播)

java

1
2
3
4
5
// 使用场景:错误级别分类
// error级别:文件+数据库+报警
channel.queueBind(queue1, "log_exchange", "error");
// info级别:只存文件
channel.queueBind(queue2, "log_exchange", "info");

第三层:Topic(主题模式)

“再升级为智能搜索,比如根据关键词标签推送相关内容。”

解决什么问题:

  • Direct只能精确匹配,无法处理模式匹配
  • 需要更灵活的路由规则(如:user.loginorder.paid

核心特点:

  • 支持通配符:*(一个单词)、#(多个单词)
  • 实现基于主题的消息路由
  • 适合微服务间的消息通信

java

1
2
3
4
5
// 使用场景:订单系统
// 监听所有订单相关消息
channel.queueBind(queue, "order_exchange", "order.*");
// 监听支付成功消息
channel.queueBind(queue, "order_exchange", "*.payment.success");
1
2
3
4
5
6
7
8
9
graph LR
P[订单服务] --> E[订单交换机 Topic]

subgraph "消费者服务"
E -->|order.created| Q1[库存服务]
E -->|order.paid.success| Q2[物流服务]
E -->|order.*.failed| Q3[通知服务]
E -->|*.payment.*| Q4[支付监控]
end
1
2
3
4
5
6
7
8
9
10
// 订单创建时
channel.basicPublish("order_exchange", "order.created", null, orderData);

// 支付成功时
channel.basicPublish("order_exchange", "order.paid.success", null, paymentData);

// 不同的服务按需订阅
// 库存服务订阅:order.created
// 物流服务订阅:order.paid.success
// 通知服务订阅:order.*.failed(所有失败情况)