1.1. MQ 的相关概念

1.1.1. 什么是 MQ

MQ(message queue),从字面意思上看,本质是个队列,FIFO 先入先出,只不过队列中存放的内容是

message 而已,还是一种跨进程的通信机制,用于上下游传递消息。在互联网架构中,MQ 是一种非常常

见的上下游“逻辑解耦+物理解耦”的消息通信服务。使用了 MQ 之后,消息发送上游只需要依赖 MQ,不

用依赖其他服务。

1.1.2. 为什么要用MQ

主要是异步处理、应用解耦和流量削峰

应用解耦

  • 以下单功能为例,如下图,存在功能耦合度高的问题。

    调用库存系统、物流系统、支付系统,任何一个子系统出了故障,都会造成下单操作异常。当转变成基于

    消息队列的方式后,系统间调用的问题会减少很多,比如物流系统因为发生故障,需要几分钟来修复。在

    这几分钟的时间里,物流系统要处理的内存被缓存在消息队列中,用户的下单操作可以正常完成。当物流

    系统恢复后,继续处理订单信息即可,中单用户感受不到物流系统的故障,提升系统的可用性。

image.png

流量削峰

流量削峰一般在秒杀活动中应用广泛

场景: 秒杀活动,一般会因为流量过大,导致应用挂掉,为了解决这个问题,一般在应用前端加入消息队列。

传统模式

如订单系统,在下单的时候就会往数据库写数据。但是数据库只能支撑每秒1000左右的并发写入,并发量再高就容易宕机。低峰期的时候并发也就100多个,但是在高峰期时候,并发量会突然激增到5000以上,这个时候数据库肯定卡死了。

image-20251208225357612

传统模式的缺点:

· 并发量大的时候,所有的请求直接怼到数据库,造成数据库连接异常

中间件模式:

消息被MQ保存起来了,然后系统就可以按照自己的消费能力来消费,比如每秒1000个数据,这样慢慢写入数据库,这样就不会卡死数据库了。虽然延时了但是但是比不能下单的体 验要好。

image-20251208225444276

中间件模式的的优点:

系统A慢慢按照数据库能处理的并发量,从消息队列中拉取消息。在生产中,这个短暂的高峰期积压是允许的。

流量削峰也叫做削峰填谷

使用了MQ之后,限制消费消息的速度为1000,但是这样一来,高峰期产生的数据势必会被积压在MQ中,高峰就被“削”掉了。但是因为消息积压,在高峰期过后的一段时间内,消费消息的速度还是会维持在 3消费完积压的消息,这就叫做“填谷”

3.异步处理

有些服务间调用是异步的,例如 A 调用 B,B 需要花费很长时间执行,但是 A 需要知道 B 什么时候可

以执行完,以前一般有两种方式,A 过一段时间去调用 B 的查询 api 查询。或者 A 提供一个 callback api,

B 执行完之后调用 api 通知 A 服务。这两种方式都不是很优雅,使用消息总线,可以很方便解决这个问题,

A 调用 B 服务后,只需要监听 B 处理完成的消息,当 B 处理完成后,会发送一条消息给 MQ,MQ 会将此

消息转发给 A 服务。这样 A 服务既不用循环调用 B 的查询 api,也不用提供 callback api。同样 B 服务也不

用做这些操作。A 服务还能及时的得到异步处理成功的消息。

中间件模式的的优点:

· 将消息写入消息队列,非必要的业务逻辑以异步的方式运行,加快响应速度

结合商城例如 商城在在下完单后需要自动上架店铺和发送短信通知,这种就是串行接口就超时,改成异步队列发送消息进行处理

在分批次上架完成后然后再进行 短信分批次发送 。

1.1.3 AMQP 和 JMS

MQ是消息通信的模型;实现MQ的大致有两种主流方式:AMQP、JMS。

AMQP

AMQP是一种高级消息队列协议(Advanced Message Queuing Protocol),更准确的说是一种binary wire-level protocol(链接协议)。这是其和JMS的本质差别,AMQP不从API层进行限定,而是直接定义网络交换的数据格式。

JMS

JMS即Java消息服务(JavaMessage Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。

AMQP 与 JMS 区别

· JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式

· JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言的。

· JMS规定了两种消息模式;而AMQP的消息模式更加丰富

主要核心区别就是 就像是“USB协议”和“USB接口说明书”的区别

在Java里,操作消息队列的代码应该怎么写

比如创建连接叫ConnectionFactory,发消息用MessageProducer,收消息用MessageConsumer

JMS 就是个《Java专用说明书》:

  • 它就规定了一件事:在Java里,操作消息队列的代码应该怎么写
  • 比如创建连接叫ConnectionFactory,发消息用MessageProducer,收消息用MessageConsumer
  • 好处是:Java程序员只用学这一套,就能操作不同的消息中间件(比如ActiveMQ、IBM MQ),换产品时改改配置就行,代码不用大改。
  • 缺点是:只适用于Java,而且功能比较基础。

AMQP 就是个《全球通用USB协议》:

  • 它不关心你用什么语言,它规定的是消息在网络上传输的格式和规则
  • 就像USB协议规定了电压、数据怎么传输,让任何设备插上都能用。
  • 好处是:真正跨语言,用Java写的服务可以给Go写的服务发消息,Python写的也能收。
  • 而且功能更强大,支持灵活的消息路由(比如像微信订阅号那样,可以按标签推给特定用户)。

1.1.4 消息队列产品

1.ActiveMQ

优点:单机吞吐量万级,时效性 ms 级,可用性高,基于主从架构实现高可用性,消息可靠性较

低的概率丢失数据

缺点:官方社区现在对 ActiveMQ 5.x 维护越来越少,高吞吐量场景较少使用

2. Kafka

大数据的杀手锏,谈到大数据领域内的消息传输,则绕不开 Kafka,这款为大数据而生的消息中间件,

以其百万级 TPS 的吞吐量名声大噪,迅速成为大数据领域的宠儿,在数据采集、传输、存储的过程中发挥

着举足轻重的作用。目前已经被 LinkedIn,Uber, Twitter, Netflix 等大公司所采纳。

优点:

  • 性能卓越,单机写入 TPS 约在百万条/秒,最大的优点,就是吞吐量高

  • 时效性 ms 级可用性非常高,kafka 是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用,消费者采

用 Pull 方式获取消息, 消息有序, 通过控制能够保证所有消息被消费且仅被消费一次;

  • 有优秀的第三方Kafka Web 管理界面 Kafka-Manager;

  • 在日志领域比较成熟,被多家公司和多个开源项目使用;

功能支持:

功能较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用

缺点:

  • Kafka 单机超过 64 个队列/分区,Load 会发生明显的飙高现象,队列越多,load 越高,发送消

息响应时间变长

  • 使用短轮询方式,实时性取决于轮询间隔时间,消费失败不支持重试;

  • 支持消息顺序, 但是一台代理宕机后,就会产生消息乱序,社区更新较慢

    做日志收集、用户行为跟踪、监控数据、流式处理,选Kafka准没错。它就是个超级数据高速公路

3、RocketMQ

优点:

  1. 阿里出身,久经考验:双十一万亿级流量验证,在金融、电商交易场景的可靠性和一致性上做得最好。
  2. 性能均衡:吞吐量接近Kafka(单机十万级),但延迟更低,达到毫秒甚至亚毫秒级
  3. 功能丰富:支持顺序消息、事务消息(分布式事务解决方案)、延迟消息、消息回溯等高级特性,非常适合业务复杂的核心系统。
  4. 架构优雅:Java开发,源码易懂,NameServer代替Zookeeper,依赖更少,运维相对Kafka简单。

缺点:

  1. 生态不如Kafka:虽然在国内是霸主,但在国际开源社区和大数据生态的融合度上不如Kafka。
  2. 客户端语言支持:官方主推Java客户端,对其他语言(如Go、Python)的支持社区版较弱(云商业版支持好)。

面试一句话: “业务复杂、要求高可靠和高性能的国内Java项目,特别是电商、金融等核心交易链路,RocketMQ是首选。”

4、RabbitMQ

优点:

  1. 协议之王,灵活路由:原生支持AMQP,并提供多种交换机类型(Direct, Topic, Fanout, Headers),消息路由能力最强最灵活
  2. 管理界面优秀:自带Web管理界面非常强大,方便查看状态、管理队列和监控。
  3. 可靠性强:支持生产者确认、消费者ACK、持久化等,保证消息不丢失。
  4. 社区活跃,语言支持广:Erlang开发,性能不错,社区非常活跃,几乎支持所有主流编程语言。
  5. 消息模型丰富:对复杂业务场景支持友好,如死信队列、优先级队列等。

缺点:

  1. 吞吐量一般:基于Erlang,吞吐量在四者中偏中下(万级到十万级),不如Kafka和RocketMQ。
  2. 集群扩展稍麻烦:镜像队列模式虽然能保证高可用,但扩展性和数据一致性方案没有Kafka/RocketMQ的分布式分区设计那么优雅。
  3. Erlang技术栈:国内精通Erlang的开发者相对少,二次开发和深度定制有门槛。
  4. 消息堆积能力弱:大量消息堆积时性能下降明显,因为它主要设计是“处理”消息,而不是“存储”消息。

面试一句话: “对消息路由有复杂要求、需要灵活可靠通信的微服务系统,并且数据量不是天文数字,RabbitMQ是最佳选择。”

  • 吞吐和生态,找Kafka
  • 业务可靠和功能,找RocketMQ
  • 灵活路由和易用,找RabbitMQ

综合并发量

image-20240805160953981

1.2. RabbitMQ

1.2.1. RabbitMQ 的概念

RabbitMQ 是一个消息中间件:它接受并转发消息。你可以把它当做一个快递站点,当你要发送一个包

裹时,你把你的包裹放到快递站,快递员最终会把你的快递送到收件人那里,按照这种逻辑 RabbitMQ 是

一个快递站,一个快递员帮你传递快件。RabbitMQ 与快递站的主要区别在于,它不处理快件而是接收,

存储和转发

消息数据。

1.2.2 四大核心概念

生产者

产生数据发送消息的程序是生产者

交换机

交换机是 RabbitMQ 非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息

推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是

送到多个队列,亦或者是把消息丢弃,这个得有交换机类型决定

队列

队列是 RabbitMQ 内部使用的一种数据结构,尽管消息流经 RabbitMQ 和应用程序,但它们只能存

储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可

以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式

消费者

消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序。请注意生产者,消费

者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者。

1.2.3. RabbitMQ 核心部分

image-20240806084732986

image-20240806084732986

1.2.4 RabbitMQ基础架构

  • 基础架构图

image-20240806102134889

  • RabbitMQ相关概念

Broker:接收和分发消息的应用,RabbitMQ 服务节点,或者 RabbitMQ服务实例。大多数情况下也可以将一个 RabbitMQ Broker 看作一台 RabbitMQ 服务器。

image-20220506105926977

生产者将消息存入 RabbitMQ Broker,以及消费者从Broker中消费数据的整个流程

Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等

Connection:publisher/consumer 和 broker 之间的 TCP 连接

Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的 channel 进行通讯,AMQP method 包含了channel id 帮助客户端和message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销

Exchange:Exchange(交换器) 用来接收生产者发送的消息并将这些消息路由给服务器中的队列中,如果路由不到,或许会返回给 Producer(生产者) 。根据分发规则,匹配查询表中的 RoutingKey,找到交换机与消息队列的绑定,分发消息到 queue 中去。

RabbitMQ 的 Exchange(交换器) 有4种类型,不同的类型对应着不同的路由策略:

direct(默认), fanout, topic, 和 headers,不同类型的Exchange转发消息的策略有所区别。

生产者将消息发给交换器的时候,一般会指定一个 RoutingKey(路由键)**,用来指定这个**消息的路由规则**,而这个 RoutingKey 需要与交换器类型和绑定键(BindingKey)联合使用**才能最终生效。

image-20220506105255236

生产者将消息发送给交换器时,需要一个RoutingKey,当 BindingKey 和 RoutingKey 相匹配时,消息会被路由到对应的队列中。在绑定多个队列到同一个交换器的时候,这些绑定允许使用相同的 BindingKey。BindingKey 并不是在所有的情况下都生效,它依赖于交换器类型,比如fanout类型的交换器就会无视,而是将消息路由到所有绑定到该交换器的队列中

Queue(消息队列) 用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。

多个消费者可以订阅同一个队列,这时队列中的消息会被平均分摊(Round-Robin,即轮询)给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理,这样避免的消息被重复消费。

Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据

image-20220506102911250

1.3 安装

1.3.1 虚机部署

1.官网地址

https://www.rabbitmq.com/download.html

2.文件上传

上传到/usr/local/software 目录下(如果没有 software 需要自己创建)

img

3.安装文件(分别按照以下顺序安装)

rpm -ivh erlang-21.3-1.el7.x86_64.rpm

yum install socat -y

rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm

3.常用命令(按照以下顺序执行)

添加开机启动 RabbitMQ 服务

chkconfig rabbitmq-server on

启动服务

/sbin/service rabbitmq-server start

查看服务状态

/sbin/service rabbitmq-server status

img

停止服务(选择执行)

/sbin/service rabbitmq-server stop

开启 web 管理插件

rabbitmq-plugins enable rabbitmq_management

用默认账号密码(guest)访问地址 http://47.115.185.244:15672/出现权限问题

img

4.添加一个新的用户

创建账号

rabbitmqctl add_user admin 123

设置用户角色

rabbitmqctl set_user_tags admin administrator

设置用户权限

set_permissions [-p ]

rabbitmqctl set_permissions -p “/“ admin “.“ “.“ “.*”

用户 user_admin 具有/vhost1 这个 virtual host 中所有资源的配置、写、读权限

当前用户和角色

rabbitmqctl list_users

5.再次利用 admin 用户登录

img

\6. 重置命令

关闭应用的命令为

rabbitmqctl stop_app

清除的命令为

rabbitmqctl reset

重新启动命令为

rabbitmqctl start_app

1.3.2 镜像部署

1 安装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 拉取镜像
docker pull rabbitmq:3.12-management

# -d 参数:后台运行 Docker 容器
# --name 参数:设置容器名称
# -p 参数:映射端口号,格式是“宿主机端口号:容器内端口号”。5672供客户端程序访问,15672供后台管理界面访问
# -v 参数:卷映射目录
# -e 参数:设置容器内的环境变量,这里我们设置了登录RabbitMQ管理后台的默认用户和密码
docker run -d \
--name rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
-v rabbitmq-plugin:/plugins \
-e RABBITMQ_DEFAULT_USER=guest \
-e RABBITMQ_DEFAULT_PASS=123456 \
rabbitmq:3.12.0-management

2 验证

访问后台管理界面:http://192.168.200.100:15672

image-20231102194452610

使用上面创建Docker容器时指定的默认用户名、密码登录:

image-20231102194633997

image-20231102194746743

3 可能的问题1:Docker升级

3.1 问题现象

在使用Docker拉取RabbitMQ镜像的时候,如果遇到提示:missing signature key,那就说明Docker版本太低了,需要升级

比如我目前的Docker版本如下图所示:

image-20231105151245299

3.2 解决办法

基于CentOS7

①卸载当前Docker

更好的办法是安装Docker前曾经给服务器拍摄了快照,此时恢复快照;

如果不曾拍摄快照,那只能执行卸载操作了

1
2
3
4
5
6
7
8
9
10
11
yum erase -y docker \
docker-client \
docker-client-latest \
docker-common \
docker-latest \
docker-latest-logrotate \
docker-logrotate \
docker-selinux \
docker-engine-selinux \
docker-engine \
docker-ce
②升级yum库
1
yum update -y
③安装Docker最新版
1
yum install -y docker-ce docker-ce-cli containerd.io docker-buildx-plugin docker-compose-plugin

如果这一步看到提示:没有可用软件包 docker-ce,那就添加Docker的yum源:

1
2
yum install -y yum-utils
yum-config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo
④设置Docker服务
1
2
systemctl start docker
systemctl enable docker
3.3 验证

上述操作执行完成后,再次查看Docker版本:

image-20240321113218105

4 可能的问题:拉取镜像失败

1、问题现象

image-20240724113003339

2、解决办法
①daemon.json
1
2
# 新建或修改 docker 守护进程配置文件:daemon.json
vim /etc/docker/daemon.json
②修改镜像源
1
2
3
{
"registry-mirrors": ["https://registry.dockermirror.com"]
}
③重启docker服务
1
systemctl restart docker
④查看修改后的镜像源
1
docker info

部分内容举例如下:

……

Docker Root Dir: /var/lib/docker
Debug Mode: false
Experimental: false
Insecure Registries:
127.0.0.0/8
Registry Mirrors:
https://registry.dockermirror.com/
Live Restore Enabled: fals

最后再尝试重新拉取所需镜像