Publish/Subscribe发布订阅模式 为什么需要交换机? 问题背景 直接队列通信的局限性:
生产者必须知道队列名称
消息只能发送到一个队列
无法实现”一对多”的广播模式
消息路由策略单一
解决方案: 引入Exchange(交换机)作为消息路由中心
发布订阅模式: 1、每个消费者监听自己的队列。 2、生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息
EXcahnges概念 生产者生产的消息从不会直接发送到队列 。实际上,通常生产者甚至都不知道这些消息传递传递到了哪些队列中。 相反,生产者只能将消息发送到交换机 **(exchange)**,交换机工作的内容非常简单,一方面它接收来 自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消 息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。这就的由交换机的类型来决定。
· P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
· C:消费者,消息的接受者,会一直等待消息到来。
· Queue:消息队列,接收消息、缓存消息。
· Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。
Exchange有常见以下3种类型 :
o Fanout:广播,将消息交给所有绑定到交换机的队列
o Direct:定向,把消息交给符合指定routing key 的队列
o Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
Exchange(交换机)只负责转发消息,不具备存储消息的能力 ,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 graph TB A[生产者] --> B{选择交换机模式} B --> C[Fanout - 广播] B --> D[Direct - 路由] B --> E[Topic - 主题] C --> F[所有绑定队列都收到] D --> G[匹配RoutingKey的队列收到] E --> H[模式匹配的队列收到] subgraph "适用场景演进" F --> I[日志广播/通知] G --> J[选择性处理] H --> K[复杂路由规则] end
三种模式详细对比
模式
匹配规则
使用场景
代码示例
特点
Fanout
无规则,全部发送
日志广播、通知推送
channel.queueBind(queue, exchange, "")
最简单,性能好
Direct
精确匹配RoutingKey
选择性处理(如:错误级别)
channel.queueBind(queue, exchange, "error")
灵活但规则有限
Topic
通配符匹配
复杂路由、多维度筛选
channel.queueBind(queue, exchange, "*.orange.*")
最灵活,规则复杂
Fanout 广播
Logs 和临时队列的绑定关系如下图
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public class ReceiveLogs01 { private static final String EXCHANGE_NAME = "logs" ; public static void main (String[] argv) throws Exception { Channel channel = RabbitUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout" ); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, "" ); System.out.println("等待接收消息,把接收到的消息打印在屏幕....." ); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8" ); System.out.println("控制台打印接收到的消息" +message); }; channel.basicConsume(queueName, true , deliverCallback, consumerTag -> { }); } }
ReceiveLogs02 将接收到的消息存储在磁盘
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public class ReceiveLogs02 { private static final String EXCHANGE_NAME = "logs" ; public static void main (String[] argv) throws Exception { Channel channel = RabbitUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout" ); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, "" ); System.out.println("等待接收消息,把接收到的消息写到文件....." ); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8" ); File file = new File("C:\\work\\rabbitmq_info.txt" ); FileUtils.writeStringToFile(file,message,"UTF-8" ); System.out.println("数据写入文件成功" ); }; channel.basicConsume(queueName, true , deliverCallback, consumerTag -> { }); } }
EmitLog 发送消息给两个消费者接收
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public class EmitLog { private static final String EXCHANGE_NAME = "logs" ; public static void main (String[] argv) throws Exception { try (Channel channel = RabbitUtils.getChannel()) { channel.exchangeDeclare(EXCHANGE_NAME, "fanout" ); Scanner sc = new Scanner(System.in); System.out.println("请输入信息" ); while (sc.hasNext()) { String message = sc.nextLine(); channel.basicPublish(EXCHANGE_NAME, "" , null , message.getBytes("UTF-8" )); System.out.println("生产者发出消息" + message); } } } }
核心特点:
一条消息,所有绑定的队列都能收到
适合系统日志收集、实时通知
routingKey为空字符串
Direct exchange
在上面这张图中,我们可以看到 X 绑定了两个队列,绑定类型是 direct。队列 Q1 绑定键为 orange,
队列 Q2 绑定键有两个:一个绑定键为 black,另一个绑定键为 green.
在这种绑定情况下,生产者发布消息到 exchange 上,绑定键为 orange 的消息会被发布到队列
Q1。绑定键为 blackgreen 和的消息会被发布到队列 Q2,其他消息类型的消息将被丢弃。
创建绑定的使用代码 channel.queueBind(queueName, EXCHANGE_NAME, “routingKey”)
多重绑定
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 package mq.direct;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import mq.utils.RabbitMqUtils;import java.util.HashMap;import java.util.Map;public class EmitLogDirect { private static final String EXCHANGE_NAME = "direct_logs" ; public static void main (String[] argv) throws Exception { try (Channel channel = RabbitMqUtils.getChannel()) { channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); Map<String, String> bindingKeyMap = new HashMap<>(); bindingKeyMap.put("info" , "普通 info 信息" ); bindingKeyMap.put("warning" , "警告 warning 信息" ); bindingKeyMap.put("error" , "错误 error 信息" ); bindingKeyMap.put("debug" , "调试 debug 信息" ); for (Map.Entry<String, String> bindingKeyEntry : bindingKeyMap.entrySet()) { String bindingKey = bindingKeyEntry.getKey(); String message = bindingKeyEntry.getValue(); channel.basicPublish(EXCHANGE_NAME, bindingKey, null , message.getBytes("UTF-8" )); System.out.println("生产者发出消息:" + message); } } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 package com.example.Direct;import com.example.RabbitMqUtils;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;public class ReceiveLogsDirect01 { private static final String EXCHANGE_NAME = "direct_logs" ; public static void main (String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); String queueName = "disk" ; channel.queueDeclare(queueName, false , false , false , null ); channel.queueBind(queueName, EXCHANGE_NAME, "error" ); System.out.println("等待接受的消息..." ); DeliverCallback deliverCallback = ((consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8" ); message = "接收绑定键:" + delivery.getEnvelope().getRoutingKey() + ",消息:" + message; System.out.println("错误日志已经接收" ); }); channel.basicConsume(queueName, true , deliverCallback, consumerTag -> { }); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 package com.example.Direct;import com.example.RabbitMqUtils;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;public class ReceiveLogsDirect02 { private static final String EXCHANGE_NAME = "direct_logs" ; public static void main (String[] argv) throws Exception { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); String queueName = "console" ; channel.queueDeclare(queueName, false , false , false , null ); channel.queueBind(queueName, EXCHANGE_NAME, "info" ); channel.queueBind(queueName, EXCHANGE_NAME, "warning" ); System.out.println("等待接收消息....." ); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8" ); System.out.println(" 接收绑定键 :" +delivery.getEnvelope().getRoutingKey()+", 消 息:" +message); }; channel.basicConsume(queueName, true , deliverCallback, consumerTag -> { }); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 package com.example.Topic;import com.example.RabbitMqUtils;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;public class ReceiveLogsTopic02 { private static final String EXCHANGE_NAME = "topic_logs" ; public static void main (String[] argv) throws Exception { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic" ); String queueName="Q2" ; channel.queueDeclare(queueName, false , false , false , null ); channel.queueBind(queueName, EXCHANGE_NAME, "*.*.rabbit" ); channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#" ); System.out.println("等待接收消息....." ); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8" ); System.out.println(" 接收队列 :" +queueName+" 绑定键:" +delivery.getEnvelope().getRoutingKey()+",消息:" +message); }; channel.basicConsume(queueName, true , deliverCallback, consumerTag -> { }); } }
Topic
Q1–>绑定的是
中间带 orange 带 3 个单词的字符串(.orange. )
Q2–>绑定的是
最后一个单词是 rabbit 的 3 个单词(. .rabbit)
第一个单词是 lazy 的多个单词(lazy.#)
上图是一个队列绑定关系图,我们来看看他们之间数据接收情况是怎么样的
quick.orange.rabbit 被队列 Q1Q2 接收到
lazy.orange.elephant 被队列 Q1Q2 接收到
quick.orange.fox 被队列 Q1 接收到
lazy.brown.fox 被队列 Q2 接收到
lazy.pink.rabbit 虽然满足两个绑定但只被队列 Q2 接收一次
quick.brown.fox 不匹配任何绑定不会被任何队列接收到会被丢弃
quick.orange.male.rabbit 是四个单词不匹配任何绑定会被丢弃
lazy.orange.male.rabbit 是四个单词但匹配 Q2
当队列绑定关系是下列这种情况时需要引起注意
**当一个队列绑定键是#,那么这个队列将接收所有数据,就有点像 **fanout 了
*如果队列绑定键当中没有#和 出现,那么该队列绑定类型就是 direct **
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 package com.example.Topic;import com.example.RabbitMqUtils;import com.rabbitmq.client.Channel;import java.util.HashMap;import java.util.Map;public class EmitLogTopic { private static final String EXCHANGE_NAME = "topic_logs" ; public static void main (String[] argv) throws Exception { try (Channel channel = RabbitMqUtils.getChannel()) { channel.exchangeDeclare(EXCHANGE_NAME, "topic" ); Map<String, String> bindingKeyMap = new HashMap<>(); bindingKeyMap.put("quick.orange.rabbit" ,"被队列 Q1Q2 接收到" ); bindingKeyMap.put("lazy.orange.elephant" ,"被队列 Q1Q2 接收到" ); bindingKeyMap.put("quick.orange.fox" ,"被队列 Q1 接收到" ); bindingKeyMap.put("lazy.brown.fox" ,"被队列 Q2 接收到" ); bindingKeyMap.put("lazy.pink.rabbit" ,"虽然满足两个绑定但只被队列 Q2 接收一次" ); bindingKeyMap.put("quick.brown.fox" ,"不匹配任何绑定不会被任何队列接收到会被丢弃" ); bindingKeyMap.put("quick.orange.male.rabbit" ,"是四个单词不匹配任何绑定会被丢弃" ); bindingKeyMap.put("lazy.orange.male.rabbit" ,"是四个单词但匹配 Q2" ); for (Map.Entry<String, String> bindingKeyEntry: bindingKeyMap.entrySet()){ String bindingKey = bindingKeyEntry.getKey(); String message = bindingKeyEntry.getValue(); channel.basicPublish(EXCHANGE_NAME,bindingKey, null , message.getBytes("UTF-8" )); System.out.println("生产者发出消息" + message); } } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 package com.example.Topic;import com.example.RabbitMqUtils;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;public class ReceiveLogsTopic01 { private static final String EXCHANGE_NAME = "topic_logs" ; public static void main (String[] argv) throws Exception { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic" ); String queueName="Q1" ; channel.queueDeclare(queueName, false , false , false , null ); channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*" ); System.out.println("等待接收消息....." ); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8" ); System.out.println(" 接收队列 :" +queueName+" 绑定键:" +delivery.getEnvelope().getRoutingKey()+",消息:" +message); }; channel.basicConsume(queueName, true , deliverCallback, consumerTag -> { }); } }
请解释RabbitMQ的三种交换机模式 回答框架:
“RabbitMQ的交换机模式是为了解决不同场景下的消息路由需求而设计的,它们的演进体现了从简单到复杂的路由需求:”
第一层:Fanout(广播模式)
“想象成公司的大喇叭广播,不管谁在办公室,都能听到通知。”
核心特点:
一条消息,所有绑定的队列都能收到
适合系统日志收集、实时通知
routingKey为空字符串
java
1 2 3 4 // 使用场景:日志收集系统 // 多个服务同时记录日志到不同地方(文件、数据库、监控) channel.queueBind(logFileQueue, "logs_exchange", ""); channel.queueBind(logDBQueue, "logs_exchange", "");
第二层:Direct(路由模式)
“升级为智能分拣,比如快递员根据收件人地址送到不同楼栋。”
解决什么问题:
Fanout只能全部发送,浪费资源
需要选择性发送特定消息
核心特点:
精确匹配routingKey
一个队列可以绑定多个routingKey
多个队列可以绑定同一个routingKey(实现有限广播)
java
1 2 3 4 5 // 使用场景:错误级别分类 // error级别:文件+数据库+报警 channel.queueBind(queue1, "log_exchange", "error"); // info级别:只存文件 channel.queueBind(queue2, "log_exchange", "info");
第三层:Topic(主题模式)
“再升级为智能搜索,比如根据关键词标签推送相关内容。”
解决什么问题:
Direct只能精确匹配,无法处理模式匹配
需要更灵活的路由规则(如:user.login、order.paid)
核心特点:
支持通配符:*(一个单词)、#(多个单词)
实现基于主题的消息路由
适合微服务间的消息通信
java
1 2 3 4 5 channel.queueBind(queue, "order_exchange" , "order.*" ); channel.queueBind(queue, "order_exchange" , "*.payment.success" );
1 2 3 4 5 6 7 8 9 graph LR P[订单服务] --> E[订单交换机 Topic] subgraph "消费者服务" E -->|order.created| Q1[库存服务] E -->|order.paid.success| Q2[物流服务] E -->|order.*.failed| Q3[通知服务] E -->|*.payment.*| Q4[支付监控] end
1 2 3 4 5 6 7 8 9 10 channel.basicPublish("order_exchange" , "order.created" , null , orderData); channel.basicPublish("order_exchange" , "order.paid.success" , null , paymentData);