RabbitMQ

学习笔记:02-什么是MQ_哔哩哔哩_bilibili

1. 消息队列

1.1 MQ 相关概念

1.1.1 什么是 MQ

  • MQ(message queue),从字面意思上看,本质是个队列,FIFO 先入先出,只不过队列中存放的内容是 message 而已

  • 还是一种跨进程的通信机制,用于上下游传递消息。在互联网架构中,MQ 是一种非常常见的上下游 “逻辑解耦+物理解耦” 的消息通信服务

    a 给 b 发消息,a 就是上游,b 就是下游

  • 使用了 MQ 之后,消息发送上游只需要依赖 MQ,不用依赖其他服务

1.1.2 为什么要用 MQ

MQ 有三大功能:

  • 流量削峰

    • 举例:
      • 如果订单系统最多能处理一万次订单,这个处理能力应付正常时段的下单时绰绰有余,正常时段我们下单一秒后就能返回结果
      • 但是在高峰期,如果有两万次下单操作系统是处理不了的,只能限制订单超过一万后不允许用户下单
      • 使用消息队列做缓冲,我们可以取消这个限制,把一秒内下的订单分散成一段时间来处理,这时有些用户可能在下单十几秒后才能收到下单成功的操作,但是比不能下单的体验要好
    • 图示说明:

    1696320156974

  • 应用解耦

    • 举例:
      • 以电商应用为例,应用中有订单系统、库存系统、物流系统、支付系统
      • 用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障,都会造成下单操作异常
      • 转变成基于消息队列的方式后,系统间调用的问题会减少很多,比如物流系统因为发生故障,需要几分钟来修复
      • 在这几分钟的时间里,物流系统要处理的内存被缓存在消息队列中,用户的下单操作可以正常完成
      • 当物流系统恢复后,继续处理订单信息即可,中单用户感受不到物流系统的故障,提升系统的可用性
    • 图示说明:

    1696320368051

  • 异步处理

    • 举例:
      • 有些服务间调用是异步的,例如 A 调用 B,B 需要花费很长时间执行,但是 A 需要知道 B 什么时候可以执行完
      • 以前一般有两种方式,A 过一段时间去调用 B 的查询 api 查询;或者 A 提供一个 callback api,B 执行完之后调用 api 通知 A 服务
      • 这两种方式都不是很优雅,使用消息总线,可以很方便解决这个问题,A 调用 B 服务后,只需要监听 B 处理完成的消息,当 B 处理完成后,会发送一条消息给 MQ,MQ 会将此消息转发给 A 服务
      • 这样 A 服务既不用循环调用 B 的查询 api,也不用提供 callback api。同样 B 服务也不用做这些操作。A 服务还能及时的得到异步处理成功的消息
    • 图示说明:

    1696320559421

1.1.3 MQ 的分类

  • ActiveMQ
    • 优点:单机吞吐量万级,时效性 ms 级,可用性高,基于主从架构实现高可用性,消息可靠性方面:较低的概率丢失数据
    • 缺点:官方社区现在对 ActiveMQ 5.x 维护越来越少,高吞吐量场景较少使用
  • Kafka
    • 大数据的杀手锏,谈到大数据领域内的消息传输,则绕不开 Kafka,这款为大数据而生的消息中间件,以其百万级 TPS 的吞吐量名声大噪,在数据采集、传输、存储的过程中发挥着举足轻重的作用,
    • 优点:
      • 性能卓越,单机写入 TPS 约在百万条/秒,最大的优点,就是吞吐量高。时效性 ms 级,可用性非常高
      • kafka 是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用
      • 消费者采用 Pull 方式获取消息,消息有序,通过控制能够保证所有消息被消费且仅被消费一次
      • 有优秀的第三方Kafka Web 管理界面 Kafka-Manager
      • 在日志领域比较成熟,被多家公司和多个开源项目使用
      • 功能支持:功能较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用
    • 缺点:Kafka 单机超过 64 个队列/分区,Load 会发生明显的飙高现象,队列越多,load 越高,发送消息响应时间变长,使用短轮询方式,实时性取决于轮询间隔时间,消费失败不支持重试;支持消息顺序,但是一台代理宕机后,就会产生消息乱序,社区更新较慢
  • RocketMQ
    • RocketMQ 出自阿里巴巴的开源产品,用 Java 语言实现,在设计时参考了 Kafka,并做出了自己的一些改进
    • 优点:单机吞吐量十万级,可用性非常高,分布式架构,消息可以做到 0 丢失,MQ 功能较为完善,还是分布式的,扩展性好,支持 10 亿级别的消息堆积,不会因为堆积导致性能下降,源码是 java 我们可以自己阅读源码,定制自己公司的 MQ
    • 缺点:支持的客户端语言不多,目前是 java 及 c++,其中 c++不成熟;社区活跃度一般,没有在 MQ 核心中去实现 JMS 等接口,有些系统要迁移需要修改大量代码
  • RabbitMQhttps://www.rabbitmq.com/news.html
    • 2007 年发布,是一个在 AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最主流的消息中间件之一
    • 优点:由于 erlang 语言的高并发特性,性能较好;吞吐量到万级,MQ 功能比较完备,健壮、稳定、易用、跨平台、支持多种语言 如:Python、Ruby、.NET、Java、JMS、C、PHPActionScript等,支持 AJAX 文档齐全;开源提供的管理界面非常棒,用起来很好用,社区活跃度高;更新频率相当高
    • 缺点:商业版需要收费,学习成本较高

1.1.4 MQ 的选择

  • Kafka

    • Kafka 主要特点是基于 Pull 的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输
    • 适合产生大量数据的互联网服务的数据收集业务。大型公司建议可以选用,如果有日志采集功能,肯定是首选 kafka 了
  • RocketMQ

    • 天生为金融互联网领域而生,对于可靠性要求很高的场景,尤其是电商里面的订单扣款,以及业务削峰,在大量交易涌入时,后端可能无法及时处理的情况
    • RocketMQ 在稳定性上可能更值得信赖,这些业务场景在阿里双 11 已经经历了多次考验,如果你的业务有上述并发场景,建议可以选择 RocketMQ。
  • RabbitMQ

    • 结合 erlang 语言本身的并发优势,性能好时效性微秒级,社区活跃度也比较高,管理界面用起来十分方便
    • 如果你的数据量没有那么大,中小型公司优先选择功能比较完备的 RabbitMQ

1.2 RabbitMQ

RabbitMQ 是一个消息中间件

它接受并转发消息,你可以把它当做一个快递站点,当你要发送一个包裹时,你把你的包裹放到快递站,快递员最终会把你的快递送到收件人那里,按照这种逻辑 RabbitMQ 是一个快递站,一个快递员帮你传递快件。

RabbitMQ 与快递站的主要区别在于,它不处理快件而是接收,存储和转发消息数据

1.2.1 四大核心概念

1696322466173

  • 生产者
    • 产生数据发送消息的程序是生产者
  • 交换机
    • 交换机是 RabbitMQ 非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息推送到队列
    • 交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推送到多个队列,亦或者是把消息丢弃,这个得由交换机类型决定
  • 队列
    • 队列是 RabbitMQ 内部使用的一种数据结构,尽管消息流经 RabbitMQ 和应用程序,但它们只能存储在队列中
    • 队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区
    • 许多生产者可以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式
  • 消费者
    • 消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序
    • 请注意生产者,消费者和消息中间件很多时候并不在同一机器上,同一个应用程序既可以是生产者又是可以是消费者。

1.2.3 RabbitMQ 核心部分

六大模式

1696322800780

1.2.3 各个名词介绍

1696322866484

  • Broker:接收和分发消息的应用,RabbitMQ Server就是Message Broker (Broker:中间人、经济人)
  • Virtual host
    • 出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念
    • 当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个 vhost,每个用户在自己的 vhost 创建 exchange/queue 等
  • Connection:producer/consumer 和 broker 之间的 TCP 连接
  • Channel
    • 如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCPConnection 的开销将是巨大的,效率也较低
    • Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个 thread 创建单独的 channel 进行通讯
    • AMQP method 包含了 channel id 帮助客户端和 message broker 识别 channel,所以 channel 之间是完全隔离的
    • Channel 作为轻量级的Connection极大减少了操作系统建立 TCP connection 的开销
  • Exchange
    • message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到 queue 中去
    • 常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
  • Queue:消息最终被送到这里等待 consumer 取走
  • Bindingexchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key,Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据

1.2.4 安装

RabbitMQ安装教程(最新RabbitMQ安装,通用教程)_rabbitmq 安装教程-CSDN博客

RabbitMQ超详细安装教程(Linux)_rabbitmq安装-CSDN博客

官网没看到 centos7 的,直接用课件里的下载吧,由于是基于 erlang 语言开发的,所以这个环境也得有

  • 官网地址https://www.rabbitmq.com/download.html

1696326762615

1696326916721

  • 安装文件(分别按照以下顺序安装)
1
2
3
rpm -ivh erlang-21.3-1.el7.x86_64.rpm  # 语言环境
yum install socat -y # 依赖包
rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm

1696327263436

1696327336516

1696327406332

  • 常用命令(按照以下顺序执行)

    • 添加开机启动 RabbitMQ 服务

      1
      chkconfig rabbitmq-server on
    • 启动服务

      1
      /sbin/service rabbitmq-server start 
    • 查看服务状态

      1
      /sbin/service rabbitmq-server status

    1696327594120

    • 停止服务(选择执行)
    1
    /sbin/service rabbitmq-server stop 
    • 开启 web 管理插件(先停止服务再安装插件,安装完后再开启服务)
    1
    rabbitmq-plugins enable rabbitmq_management
  • 访问 web 页面的步骤:(前提是已经开启服务)

    • 查看防火墙状态
    1
    systemctl status firewalld
    • 关闭防火墙
    1
    systemctl stop firewalld

    除了直接关闭之外,刚学了 nginx,当时把80端口开放了,同理这里也可以把需要的端口开放即可,比如:firewall-cmd --zone=public --add-port=15672/tcp --permanent

    然后重启防火墙:firewall-cmd --reload

    1696328294984

    云服务器的话,还需要去官网控制台开下防火墙端口即可

    • 访问页面:192.168.3.195:15672

    初始账号密码均为:guest,但是报错没有权限

    1696328423232

1.2.5 添加用户并设置权限

  • 添加一个新的用户

    • 创建账号:rabbitmqctl add_user admin 123

    用户名为 admin,密码为 123

    • 设置用户角色:rabbitmqctl set_user_tags admin administrator
    • 设置用户权限:rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"

    set_permissions [-p <vhostpath>] <user> <conf> <write> <read>

    用户 user_admin 具有/vhost1 这个 virtual host 中所有资源的配置、写、读权限
    - 查看当前用户和角色:rabbitmqctl list_users

1696329829860

  • 再次利用 admin 用户登录

1696329911633

可以看到当前的 vh 是 /

1696330008927

  • 重置命令
    • 关闭应用的命令为:rabbitmqctl stop_app
    • 清除的命令为:rabbitmqctl reset
    • 重新启动命令为:rabbitmqctl start_app

2. Hello World

这一部分中,我们将用 Java 编写两个程序:发送单个消息的生产者和接收消息并打印出来的消费者;

在下图中,“ P”是我们的生产者,“ C”是我们的消费者。中间的框是一个队列-RabbitMQ 代表使用者保留的消息缓冲区

1696330081432

1696331108425

2.1 创建工程并添加依赖

  • 创建一个新的空工程

1696330354476

1696330513210

  • 再创建一个新的maven模块

1696330605542

1696330703776

  • 依赖
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
<!--指定 jdk 编译版本-->
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<!--rabbitmq 依赖客户端-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.8.0</version>
</dependency>
<!--操作文件流的一个依赖-->
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
</dependency>
</dependencies>

1696330901573

2.2 消息生产者

连接超时的注意把防火墙关了!!或者再给防火墙开个端口:5672

firewall-cmd --zone=public --add-port=5672/tcp --permanent

然后重启防火墙:firewall-cmd --reload

  • 代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/**
* 生产者: 发消息(发给队列)
*/
public class Producer {
// 队列名称 (快捷键 psf, 大写转换: ctrl+shift+U)
public static final String QUEUE_NAME = "hello";

// 发消息
public static void main(String[] args) throws Exception {
// 1. 创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();

// 2. 设置参数
// 2.1 工厂 IP 连接 RabbitMQ 的队列
factory.setHost("192.168.3.196");
// 2.2 用户名
factory.setUsername("admin");
// 2.3 密码
factory.setPassword("123");

// 3. 创建连接
Connection connection = factory.newConnection();
// 4. 通过连接创建信道
Channel channel = connection.createChannel();

// 5. 声明一个队列 参数:
// (1)队列名称
// (2)队列里的消息是否持久化(存在磁盘上), 默认情况(false)消息存储在内存中
// (3)该队列是否只供一个消费者进行消费, 即是否进行消息的共享, true表示只供个消费者消费
// (4)是否自动删除, 最后一个消费者端开连接后, 该队列是否自动删除, true表示自动删除
// (5)其他参数
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// 6. 消息体
String message = "hello world";

// 7. 发消息 参数:
// (1)表示发送到哪个交换机 这里暂时不考虑, 所以给了个空串
// (2)路由的Key值是哪个 这里是队列名称
// (3)其他参数信息
// (4)发送消息的消息体 需要用二进制形式
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("消息发送完毕");
}
}
  • 结果

1696332830165

1696332809480

1696332790842

2.3 消息消费者

  • 代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/**
* 接收消息
*/
public class Consumer {
// 队列名称
public static final String QUEUE_NAME = "hello";

// 接收消息
public static void main(String[] args) throws Exception {
// 1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();

// 2. 设置参数
// 2.1 设置ip
factory.setHost("192.168.3.196");
// 2.2 设置用户名
factory.setUsername("admin");
// 2.3 设置密码
factory.setPassword("123");

// 3. 创建连接
Connection connection = factory.newConnection();
// 4. 通过连接创建信道
Channel channel = connection.createChannel();

// 5. 声明回调接口(这里用 lambda 表达式声明)
// 5.1 接收消息的回调:
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println(new String(message.getBody()));
};
// 5.2 取消消息的回调:
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("消费消息被中断");
};

// 6. 消费者消费消息 参数:
// (1) queue: 消费哪个队列
// (2) autoAck: 消费成功之后是否要自动应答 true代表自动应答 false代表手动应答
// (3) deliverCallback: 当一个消息发送过来后的回调接口(也就是消费者成功消费的回调)
// (4) cancelCallback: 消费者取消消费的回调
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
}
}
  • 结果

注意这两都在运行

1696334949148

1696334924600

1696334983106

3. Work Queues

工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成,相反我们安排任务在之后执行,我们把任务封装为消息并将其发送到队列

在后台运行的工作进程将弹出任务并最终执行作业,当有多个工作线程时,这些工作线程将一起处理这些任务

image-20240324193014618

3.1 轮询分发消息

案例:启动两个工作线程,一个消息发送线程,观察工作线程如何工作

3.1.1 抽取工具类

  • 将创建 连接工厂 和 创建信道 作为公共部分抽取出来
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* 连接工厂创建信道工具类
*/
public class RabbitMqUtils {

// 得到一个连接的 channel
public static Channel getChannel() throws Exception {
// 创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();

// 设置参数
factory.setHost("192.168.10.254");
factory.setUsername("admin");
factory.setPassword("123");

// 创建连接
Connection connection = factory.newConnection();

// 通过连接创建信道
Channel channel = connection.createChannel();

return channel;
}
}

3.1.2 启动两个工作进程

  • 先写消费者代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class Worker01 {
private static final String QUEUE_NAME="hello";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
DeliverCallback deliverCallback=(consumerTag,delivery)->{
String receivedMessage = new String(delivery.getBody());
System.out.println("接收到消息:"+receivedMessage);
};
CancelCallback cancelCallback=(consumerTag)->{
System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
};
System.out.println("C2 消费者启动等待消费......");
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
  • 利用 idea 设置,启动两次 代表 两个工作进程(消费者)

image-20240324195454613

image-20240324195905816

3.1.3 启动一个发送进程

  • 编写生产者代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class Task01 {
// 队列名称
private static final String QUEUE_NAME = "hello";

public static void main(String[] args) throws Exception {
// 发送大量消息
try (Channel channel = RabbitMqUtils.getChannel()) {
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// 从控制台当中接受信息
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String message = scanner.next();
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("发送消息完成:" + message);
}
}
}
}

3.1.4 结果

通过程序执行发现生产者总共发送 4 个消息,消费者 1 和消费者 2 分别分得两个消息,并且是按照有序的一个接收一次消息

谁接收到第一条消息都是正常的,只不过是必须是轮询的

C1 接收了 aa,那么 bb 必定是 C2 接收的

image-20240324200514301

3.2 消息应答

3.2.1 概念

消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成了部分突然它挂掉了,会发生什么情况?

RabbitMQ 一旦向消费者传递了一条消息,便立即将该消息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续发送给该消费这的消息,因为它无法接收到。

为了保证消息在发送过程中不丢失,rabbitmq 引入消息应答机制,消息应答就是:消费者在接收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了

消息应答

  • 自动应答 ==> 并不完善
  • 手动应答

3.2.2 自动应答

在自动应答模式下,一旦RabbitMQ将消息派发给消费者,它立即将该消息标记为确认

  • 消息发送后立即被认为已经传送成功,这种模式需要在 高吞吐量和数据传输安全性方面做平衡
    • 因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢失了
    • 另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制,当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终使得内存耗尽,最终这些消费者线程被操作系统杀死,
    • 所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用

3.2.3 消息应答的方法

  • Channel.basicAck(用于肯定确认)
    • RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了
    • 两个参数,deliveryTag(消息投递标签)和multiple(是否批量确认)
  • Channel.basicNack(用于否定确认)
    • 支持批量否定确认(通过multiple参数)
    • 除了deliveryTagmultiple参数,basicNack还有一个requeue参数
      • 如果requeuetrue,RabbitMQ会将消息重新放回队列中
      • 如果为false,则消息会被RabbitMQ丢弃或发送到死信队列(如果配置了的话)
  • Channel.basicReject(用于否定确认)
    • 与 Channel.basicNack 相比少一个参数 multiple
      • basicReject有两个参数,deliveryTagrequeue
    • 不处理该消息了直接拒绝,可以将其丢弃了

3.2.4 Multiple 的解释

手动应答的好处是可以批量应答并且减少网络拥堵

  • multiple 的 true 和 false:
    • true 代表批量应答 channel 上未应答的消息
      • 比如说 channel 上有传送 tag 的消息 5,6,7,8
      • 当前 tag 是 8 ,那么此时 5-8 的这些还未应答的消息都会被确认收到消息应答
    • false 同上面相比
      • 只会应答 tag=8 的消息 5,6,7 这三个消息依然不会被确认收到消息应答

image-20240324202504504

3.2.5 消息自动重新入队

  • 当消费者接收到消息但由于某种原因无法处理时,可以选择让消息自动重新入队,这意味着消息会被放回队列中,等待再次被消费
  • 自动入队的工作原理
    • 未确认的消息:在手动应答模式下,如果消费者接收了一条消息但还没有对其进行确认(ack、nack或reject),那么在消费者连接断开时,这条消息会被RabbitMQ视为未确认的消息,并将其自动重新入队(通常会被放回队列的尾部)
    • 否定确认与重新入队:当使用basicNackbasicReject方法进行否定确认时,消费者可以指定是否要将消息重新入队。如果选择重新入队,该消息将再次成为队列中的一部分,可以被相同的消费者或其他消费者再次接收和处理

RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。

这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。

3.2.6 消息手动应答代码

默认消息采用的是自动应答,所以我们要想实现消息消费过程中不丢失,需要把自动应答改为手动应答

消费者在之前代码的基础上增加修改代码:

  • 接收消息部分:改 autoAck 为 false 手动应答
  • 消费者消费成功的回调函数中:增加代码表示是肯定的应答

编写两个消费者代码:

  • C1 等待接收消息处理的时间为 1 秒
  • C2 等待接收消息处理的时间为 30 秒
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class Worker03 {

// 队列名称
private static final String TASK_QUEUE_NAME = "ack_queue";

public static void main(String[] args) throws Exception {
// 通过工具类获取信道
Channel channel = RabbitMqUtils.getChannel();
System.out.println("C1 等待接收消息处理时间较短......");

// 接收消息
// 消费者成功消费的回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
// 睡眠 1s
SleepUtils.sleep(1);
String receivedMessage = new String(delivery.getBody());
System.out.println("接收到消息: " + receivedMessage);

// 手动应答, 参数:
// (1) 消息的标签
// (2) 是否应该批量应答 ==> 不应该, 否则容易出现消息丢失, 应处理一个应答一个
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};

// 消费者取消消费的回调
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println(consumerTag + "消费者取消消费接口回调逻辑");
};

// 第二个参数 autoAck 为 true 为自动应答, 改为 false 后即为手动应答
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
}
}
  • 睡眠工具类
1
2
3
4
5
6
7
8
9
public class SleepUtils {
public static void sleep(int second){
try {
Thread.sleep(1000*second);
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}
}
}

3.2.7 手动应答效果演示

  • 测试目标:验证 消息在手动应答时不会丢失,而是放回队列中重新消费
    • 先运行生产者(因为目前还没创建信道),再运行两个消费者
  • 正常情况下消息发送方发送两个消息 C1 和 C2 分别接收到消息并进行处理

image-20240324211852173

  • 在发送者发送消息 dd,发出消息之后的把 C2 消费者停掉,按理说该 C2 来处理该消息,但是由于它处理时间较长,在还未处理完
  • 也就是说 C2 还没有执行 ack 代码的时候,C2 被停掉了,此时会看到消息被 C1 接收到了,说明消息 dd 被重新入队,然后分配给能处理消息的 C1 处理了

image-20240324211954960

3.3 RabbitMQ 持久化

3.3.1 概念

如何保障当 RabbitMQ 服务停掉以后消息生产者发送过来的消息不丢失。

默认情况下 RabbitMQ 退出或由于某种原因崩溃时,它忽视队列和消息。确保消息不会丢失需要做两件事:我们需要将队列和消息都标记为持久化

3.3.2 队列如何实现持久化

  • 之前我们创建的队列都是非持久化的,rabbitmq 如果重启,该队列就会被删除掉,如果要队列实现持久化 需要在声明队列的时候把 durable 参数设置为持久化 (这样即使重启 rabbitmq 队列也依然存在)
1
2
3
// 第二个参数 false => true
boolean durable = true; // 需要进行持久化
channel.queueDeclare(TASK_QUEUE_NAME, durable, false, false, null);
  • 注意:如果之前声明的队列不是持久化的,需要把原先队列先删除,或者重新创建一个持久化的队列,不然就会出现错误
    • 先删:

image-20240324213149448

  • 出现了持久化标识:注意这表示队列持久化,而不是消息持久化

image-20240324212904947

3.3.3 消息实现持久化

  • 要想让消息实现持久化需要在消息生产者修改代码,MessageProperties.PERSISTENT_TEXT_PLAIN 添加文本持久化属性
1
2
// 设置生产者发送消息为持久化消息(要求保存到磁盘上), 不写就默认保存在内存中(会丢失)
channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
  • 将消息标记为持久化并不能完全保证不会丢失消息

    • 尽管它告诉 RabbitMQ 将消息保存到磁盘,但是这里依然存在当消息刚准备存储在磁盘的时候 但是还没有存储完,消息还在缓存的一个间隔点
    • 此时并没有真正写入磁盘。持久性保证并不强,但是对于简单任务队列而言,这已经绰绰有余了。
  • 如果需要更强有力的持久化策略,参考后边 8. 发布确认 章节

3.3.4 不公平分发

  • 默认情况下,RabbitMQ 分发消息采用的轮询分发

如果所有消费者都能以相同的速率处理消息,轮询分发是一个公平且高效的策略

  • 但是,如果某些消费者能够处理消息更快,而其他消费者处理得更慢,那么更快的消费者在完成工作后会空闲等待,因为RabbitMQ 仍然试图平等地分配消息给所有消费者
  • 为了避免这种情况,我们可以通过设置较低的预取计数值来实现不公平分发,通常是1,意味着 RabbitMQ 一次只会向每个消费者发送一条消息,等待消费者处理完并发送ack之后,才会发送下一条消息。这样,更快的消费者能够更频繁地接收消息,而不是等待其他消费者处理完他们的消息
  • 在两个消费者端,可以通过basicQos方法来设置预取计数值:
1
2
3
// 设置不公平分发
int prefetchCount = 1; // 0 是公平分发, 1 是不公平分发
channel.basicQos(prefetchCount);
  • 意思就是如果这个任务我还没有处理完或者我还没有应答你,你先别分配给我,我目前只能处理一个任务
  • 然后 rabbitmq 就会把该任务分配给没有那么忙的那个空闲消费者,当然如果所有的消费者都没有完成手上任务,队列还在不停的添加新任务,队列有可能就会遇到队列被撑满的情况,这个时候就只能添加新的 worker 或者改变其他存储任务的策略

3.3.5 预取值

预取值控制着消息的流速以及如何基于消费者的能力来优化消息分发

主要目的是为了平衡各个消费者的负载,防止快速的消费者空闲,同时避免慢的消费者积累太多待处理的消息

告诉RabbitMQ不要同时给一个消费者发送超过设定数量的消息,除非这些消息已经被确认

  • 异步消息发送意味着消息可以快速地发送给消费者,而不需要等待每条消息都被确认
    • 但这也可能导致一个快速的消费者接收到过多的消息,而来不及处理
    • 通过设置预取值,RabbitMQ能够更智能地控制消息的流向,确保消费者能够以他们自己的速度处理消息。该值定义通道上允许的未确认消息的最大数量。一旦数量达到配置的数量,RabbitMQ 将停止在通道上传递更多消息,除非至少有一个未处理的消息被确认
    • 例如,假设在通道上有未确认的消息 5、6、7,8,并且通道的预取计数设置为 4,此时 RabbitMQ 将不会在该通道上再传递任何消息,除非至少有一个未应答的消息被 ack。比方说 tag=6 这个消息刚刚被确认 ACK,RabbitMQ 将会感知这个情况到并再发送一条消息

image-20240324215642902

  • 消息应答和 QoS 预取值对用户吞吐量有重大影响
    • 通常,增加预取将提高向消费者传递消息的速度。虽然自动应答传输消息速率是最佳的,但是,在这种情况下已传递但尚未处理的消息的数量也会增加,从而增加了消费者的 RAM 消耗(随机存取存储器)
    • 应该小心使用具有无限预处理的自动确认模式或手动确认模式,消费者消费了大量的消息如果没有确认的话,会导致消费者连接节点的内存消耗变大
    • 所以找到合适的预取值是一个反复试验的过程,不同的负载该值取值也不同,100 到 300 范围内的值通常可提供最佳的吞吐量,并且不会给消费者带来太大的风险
    • 预取值为 1 是最保守的。当然这将使吞吐量变得很低,特别是消费者连接延迟很严重的情况下,特别是在消费者连接等待时间较长的环境中。对于大多数应用来说,稍微高一点的值将是最佳的

4. 发布确认

4.1 发布确认原理

发布确认(Publisher Confirms)机制是一种确保消息从生产者发送到 RabbitMQ 服务器的可靠方式

这种机制允许生产者知道其消息是否已成功到达目标队列

工作原理

  • 启用发布确认模式:生产者在通道上启用发布确认,启用后,该通道会进入发布确认模式。
  • 发送消息:生产者通过已启用发布确认的通道发送消息。发送消息时,生产者可以为每条消息指定一个唯一的标识符(从 1 开始)。
  • 等待确认:发送消息后,生产者等待来自RabbitMQ服务器的确认。服务器确认消息的两种方式:
    • 确认(Ack):如果消息被队列接受,RabbitMQ服务器会发送一个确认给生产者(包含消息的唯一 ID),表明消息已被成功接收。
    • 未确认(Nack):如果消息由于某些原因未能被队列接受,服务器会发送一个未确认给生产者,指出消息没有被成功处理。
  • 处理确认:生产者根据收到的确认或未确认采取相应的行动。例如,如果收到未确认,生产者可能决定重新发送消息或记录错误。

confirm 模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息。

4.2 发布确认策略

三种:单个确认、批量确认和异步确认

4.2.1 开启发布确认的方法

  • 发布确认默认是没有开启的

  • 如果要开启需要调用方法 confirmSelect,每当你要想使用发布确认,都需要在 channel 上调用该方法(获取了信道后开启发布确认

1
2
Channel channel = RabbitMqUtils.getChannel();
channel.confirmSelect();

4.2.2 单个确认发布

  • 是一种同步确认发布的方式,也就是发布一个消息之后只有它被确认发布,后续的消息才能继续发布
  • waitForConfirmsOrDie(long) 这个方法只有在消息被确认的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。
  • 缺点发布速度特别的慢,因为如果没有确认发布的消息就会阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量(对于某些应用程序来说这可能已经足够了)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// 1. 单个确认
public static void publishMessageIndividually() throws Exception {
try (Channel channel = RabbitMqUtils.getChannel()) {
// 队列名称 这里用 UUID 随机生成名称
String queueName = UUID.randomUUID().toString();

// 声明队列
channel.queueDeclare(queueName, false, false, false, null);

// 开启发布确认
channel.confirmSelect();

// 开始时间
long begin = System.currentTimeMillis();

// 批量发消息
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + ""; // 消息内容
// 发消息
channel.basicPublish("", queueName, null, message.getBytes());
// 单个消息就马上进行发布确认
// 服务端返回 false 或超时时间内未返回, 生产者可以消息重发
boolean flag = channel.waitForConfirms();
if (flag) {
System.out.println("消息发送成功");
}
}
// 结束时间
long end = System.currentTimeMillis();
// 用时
System.out.println("发布" + MESSAGE_COUNT + "个单独确认消息,耗时" + (end - begin) +
"ms");
}
}

4.2.3 批量确认发布

  • 与单个等待确认消息相比,先发布一批消息然后一起确认可以极大地提高吞吐量

  • 缺点:当发生故障导致发布出现问题时,不知道是哪个消息出现问题了,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息

    • 当然这种方案仍然是同步的,也一样阻塞消息的发布
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public static void publishMessageBatch() throws Exception {
try (Channel channel = RabbitMqUtils.getChannel()) {
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, false, false, false, null);
// 开启发布确认
channel.confirmSelect();
// 批量确认消息大小
int batchSize = 100;
// 未确认消息个数
int outstandingMessageCount = 0;
// 开始时间
long begin = System.currentTimeMillis();
// 批量发送消息, 批量发布确认
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "";
// 发消息
channel.basicPublish("", queueName, null, message.getBytes());
outstandingMessageCount++;
// 判断达到 100 条消息后, 批量确认一次
if (outstandingMessageCount == batchSize) {
channel.waitForConfirms();
outstandingMessageCount = 0;
}
}
// 为了确保还有剩余没有确认消息 再次确认
if (outstandingMessageCount > 0) {
channel.waitForConfirms();
}
long end = System.currentTimeMillis();
System.out.println("发布" + MESSAGE_COUNT + "个批量确认消息,耗时" + (end - begin) +
"ms");
}
}

4.2.4 异步确认发布

  • 异步确认是一种高效的确认方式,无论是可靠性还是效率都很优秀,它是利用回调函数来达到消息可靠性传递

image-20240329173344260

  • 如何处理异步未确认消息 ==> 使用ConcurrentSkipListMap处理未确认消息
    • ConcurrentSkipListMap 是 Java 并发包中的一种线程安全且排序的映射表
      • 内部使用跳表(Skip List)作为数据结构
      • 跳表是一种概率性平衡的数据结构,通过多层索引来提高搜索、插入和删除操作的效率
      • ConcurrentSkipListMap 提供了一种高效的并发访问和排序的映射结构
    • 用作发布线程和确认回调之间的桥梁(两个线程),用于存储和传递未确认的消息
      • 存储未确认消息:当生产者发送消息时,将该消息的标识(如消息ID或唯一标识符)和相关信息(如发送时间戳、重试次数等)存入ConcurrentSkipListMap。键可以是消息ID,值可以是包含消息详细信息和发送时间的对象
      • 访问和处理(移除已经确认的):确认回调函数(即RabbitMQ向生产者确认消息时触发的回调)中,根据消息的标识(从确认通知中获得)查找ConcurrentSkipListMap中对应的条目,并将其移除。这表示消息已成功处理。对于识别出的超时未确认消息,可以执行重试逻辑,将消息重新发送
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
// 3. 异步批量确认
public static void publishMessageAsync() throws Exception {
try (Channel channel = RabbitMqUtils.getChannel()) {
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, false, false, false, null);
// 开启发布确认
channel.confirmSelect();

// 线程安全有序的一个map ConcurrentSkipListMap, 底层是跳表, 适用于高并发的情况
// 1. 轻松的将序号与消息进行关联
// 2. 轻松批量删除条目 只要给到序列号
// 3. 支持并发访问
ConcurrentSkipListMap<Long, String> outstandingConfirms = new
ConcurrentSkipListMap<>();

// 消息确认成功回调函数, 两个参数:
// 1. 消息序列号
// 2. true 可以确认小于等于当前序列号的消息
// false 确认当前序列号消息
ConfirmCallback ackCallback = (sequenceNumber, multiple) -> {
if (multiple) { // 如果是批量, 就批量移除
// 返回的是小于等于当前序列号的未确认消息 是一个 map
ConcurrentNavigableMap<Long, String> confirmed =
outstandingConfirms.headMap(sequenceNumber, true);
// 清除该部分未确认消息
confirmed.clear();
} else { // 如果是单个确认
// 只清除当前序列号的消息
outstandingConfirms.remove(sequenceNumber);
}
};
// 消息确认失败回调函数
ConfirmCallback nackCallback = (sequenceNumber, multiple) -> {
String message = outstandingConfirms.get(sequenceNumber);
System.out.println("发布的消息" + message + "未被确认,序列号" + sequenceNumber);
};

// 添加一个异步确认的监听器 监听哪些消息成功了, 哪些消息失败了 两个参数:
// 1. 确认收到消息的回调 ==> 监听成功
// 2. 未收到消息的回调 ==> 监听失败, 不需要就写 null
channel.addConfirmListener(ackCallback, nackCallback);
// 开始时间
long begin = System.currentTimeMillis();

// 批量发送消息
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = "消息" + i;
// channel.getNextPublishSeqNo() 从信道中获取下一个消息的序列号
// 通过序列号与消息体进行一个关联, 全部都是未确认的消息体
outstandingConfirms.put(channel.getNextPublishSeqNo(), message);
channel.basicPublish("", queueName, null, message.getBytes());
}

long end = System.currentTimeMillis();
System.out.println("发布" + MESSAGE_COUNT + "个异步确认消息,耗时" + (end - begin) +
"ms");
}
}

4.2.5 以上 3 种发布确认速度对比

  • 单独发布消息
    • 同步等待确认,简单,但吞吐量非常有限。
  • 批量发布消息
    • 批量同步等待确认,简单,合理的吞吐量,一旦出现问题但很难推断出是哪条消息出现了问题
  • 异步处理
    • 最佳性能和资源使用,在出现错误的情况下可以很好地控制,但是实现起来稍微难些

5. 交换机

  • 工作队列模式
    • 任务(消息)被分配给多个消费者中的一个。每个任务都恰好由一个消费者处理,这种模式适用于分布式任务处理场景,其中每个任务只需要被处理一次
  • 发布/订阅模式
    • 消息不是被发送给单个消费者,而是被广播给所有订阅了该消息的消费者。这意味着发布的每条消息可以被多个消费者同时接收和处理
    • 如,日志系统
      • 系统组成:示例中的日志系统由两部分组成:日志消息的生产者(发布者)和消费者(订阅者)。
        • 生产者:负责发出日志消息。
        • 消费者:接收并处理日志消息。示例中有两个消费者:一个将日志消息保存到磁盘,另一个将消息打印到屏幕上。
      • 消息广播:在这个系统中,生产者发出的日志消息会被广播给所有消费者。每个消费者都会接收到所有的日志消息,而不是像工作队列模式那样,一个消息只被一个消费者接收

5.1 Exchanges

5.1.1 概念

  • RabbitMQ 消息传递模型的核心思想生产者生产的消息从不会直接发送到队列
    • 实际上,通常生产者甚至都不知道这些消息传递传递到了哪些队列中
    • RabbitMQ中的消息传递模型通过引入交换机这一层,将生产者和队列解耦,提高了消息处理的灵活性和可扩展性
    • 生产者将消息发送到交换机,由交换机根据其类型和绑定规则决定消息的去向,这可以是一个队列、多个队列,或者在某些情况下,消息也可能被丢弃

image-20240330161012976

  • 交换机(Exchange)的角色和工作机制

    • 交换机作为中介:一方面它接收来自生产者的消息,另一方面将它们推入队列。交换机是生产者和队列之间的中介
    • 处理规则由交换机类型决定:交换机根据自己的类型(如直接交换机、主题交换机、扇出交换机和头交换机)来决定如何处理接收到的消息。这包括:
      • 放到特定的队列:根据规则,交换机可能将消息路由到一个或多个特定队列
      • 广播到多个队列:某些类型的交换机(如扇出交换机)会将消息发送到所有绑定到该交换机的队列
      • 丢弃消息:如果没有队列与交换机正确绑定,或者根据交换机的路由规则找不到合适的队列,消息可能会被丢弃
  • RabbitMQ 中常用的交换机类型

    • 直接(direct) ==> 一对一的消息分发
    • 扇出(fanout) ==> 发布订阅模式
    • 主题(topic) ==> 根据消息的路由键和队列的绑定键之间的模式匹配来路由消息
    • 标题(headers) ==> 不咋用,根据消息内容而非路由键进行路由
    • 默认/无名(default)
  • 默认/无名交换机

    • 之前我们使用的就是默认交换,通过空字符串 "" 进行标识
    • 第一个参数是交换机的名称,空字符串表示默认或无名交换机,直接将消息路由到与路由键同名的队列
1
channel.basicPublish("", queueName, null, message.getBytes());

5.2 临时队列

  • 没有持久化的队列是临时队列,一旦我们断开了消费者的连

    接,队列将被自动删除

  • 创建临时队列的方式如下:RabbitMQ)会自动为其生成一个唯一的名称,避免命名冲突

1
String queueName = channel.queueDeclare().getQueue();
  • 创建效果

image-20240330164500891

5.3 绑定

  • 绑定(Binding)是定义交换机和队列之间关系的桥梁
    • 绑定充当路由规则,指示交换机如何根据消息的属性(如路由键或其他属性)将消息路由到一个或多个特定的队列
  • 绑定可以包括一个路由键(Routing Key),交换机会使用这个路由键按照绑定的规则来路由消息。路由键的使用和重要性取决于交换机的类型。
    • 对于直接交换机,消息被发送到路由键完全匹配的队列。
    • 对于主题交换机,路由键可以包含通配符,实现更灵活的匹配。
    • 扇出交换机忽略路由键,广播消息到所有绑定的队列。
    • 头交换机根据消息头而非路由键进行匹配。

5.4 Fanout

  • 扇出交换机(Fanout Exchange)是RabbitMQ中实现发布-订阅模式的一种交换机类型,它能够将接收到的消息广播到所有绑定到该交换机的队列中
    • 将接收到的所有消息广播到它知道的所有队列中
    • 系统中默认自带一些交换机类型,咱们也可以自己自定义

image-20240330165644277

  • 代码实现:写两个消费者,一个生产者
    • 交换机的创建可以由生产者或消费者来完成
    • RabbitMQ 允许你重复创建相同配置的交换机,后续创建操作会被静默忽略,简化了管理逻辑
  • 消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class ReceiveLogs01 {
// 交换机名称
private static final String EXCHANGE_NAME = "logs";

public static void main(String[] argv) throws Exception {
Channel channel = RabbitMqUtils.getChannel(); // 建立信道
// 创建交换机, 参数为交换机名称和类型
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

// 声明一个临时队列 队列的名称是随机的
// 当消费者断开和该队列的连接时 队列自动删除
String queueName = channel.queueDeclare().getQueue();

// 绑定临时队列和交换机
// 其中 routingKey(也称之为 binding key)为空字符串
channel.queueBind(queueName, EXCHANGE_NAME, "");

System.out.println("等待接收消息,把接收到的消息打印在屏幕.....");

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("控制台打印接收到的消息" + message);
};

// 接收消息
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
}
}
  • 生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class EmitLog {
private static final String EXCHANGE_NAME = "logs"; // 交换机名称

public static void main(String[] argv) throws Exception {
try (Channel channel = RabbitMqUtils.getChannel()) {

// 声明一个 交换机
// 1. exchange 的名称
// 2. exchange 的类型
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

Scanner sc = new Scanner(System.in);
System.out.println("请输入信息");
while (sc.hasNext()) {
String message = sc.nextLine();
// 发送消息
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
System.out.println("生产者发出消息" + message);
}
}
}
}

CentOS 连不上了,测不了,试了下面这个,虽然能进去,但还是报关闭连接的错误:
centOS7 桥接模式设置静态Ip的方法步骤-腾讯云开发者社区-腾讯云 (tencent.com)

用云服务器重新配了下环境,测了下,可以的,没毛病

5.5 Direct exchange

  • 扇出交换机 VS 直接/路由交换机
    • 扇出交换机忽略路由键(routing key),当一条消息发送到扇出交换机时,它会将消息发送到所有与之绑定的队列
    • 直接交换机使用路由键(routing key)来路由消息,生产者在发送消息时会指定一个路由键,直接交换机根据这个路由键将消息路由到绑定时指定了相同路由键的队列
    • 如果需要将相同的消息广播给多个消费者,扇出交换机是合适的选择;如果需要根据某种标准(例如,消息类型或优先级等)将消息路由到特定的队列,直接交换机会更加适合
  • 直接交换机的使用场景举例
    • 构建日志系统时,如何根据日志的严重级别(例如错误、警告、信息)将消息分发到不同的队列
    • 期望实现的功能是能够根据日志的级别(例如,仅将“错误”日志消息写入磁盘),以避免将警告或信息日志存储到磁盘中,从而节省磁盘空间
  • 直接交换机工作原理
    • 直接交换机根据发送到交换机的消息的路由键(routing key)和队列的绑定键(binding key)进行匹配,只将消息路由到完全匹配的队列
    • 例如,假设有一个直接交换机X,它绑定了两个队列:队列Q1的绑定键为“orange”,而队列Q2的绑定键为“black”和“green” (一个队列可以绑多遍)
      • 发送到交换机X,路由键为“orange”的消息将被路由到队列Q1
      • 路由键为“black”或“green”的消息将被路由到队列Q2
      • 其他不匹配的消息将被丢弃

image-20240330213930427

  • 多重绑定(类似扇出交换机)
    • 当一个直接交换机绑定多个队列时,如果所有队列的绑定键都相同,那么尽管绑定类型是直接的,这种配置实际上会让交换机的行为类似于扇出交换机——相当于广播,因为所有绑定到该交换机的队列都会接收到相同的消息

image-20240330214053865

  • 实战

image-20240330214118580

image-20240331170351854

  • 消费者
    • 写两个,这里就一个示例下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class ReceiveLogsDirect02 {
private static final String EXCHANGE_NAME = "direct_logs";

public static void main(String[] argv) throws Exception {
// 获取信道
Channel channel = RabbitMqUtils.getChannel();
// 声明直接交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 声明队列
String queueName = "console";
channel.queueDeclare(queueName, false, false, false, null);
// 绑定两个键
channel.queueBind(queueName, EXCHANGE_NAME, "info");
channel.queueBind(queueName, EXCHANGE_NAME, "warning");

System.out.println("等待接收消息.....");
// 接收消息的回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" 接收绑定键 :" + delivery.getEnvelope().getRoutingKey() + ", 消息:" + message);
};

channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
});
}
}
  • 生产者
    • 生产者这边发送的消息是写死的,所以先运行消费者后再运行生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class EmitLogDirect {
private static final String EXCHANGE_NAME = "direct_logs";

public static void main(String[] argv) throws Exception {

try (Channel channel = RabbitMqUtils.getChannel()) {
// 声明一个直接交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

// 创建多个 bindingKey
Map<String, String> bindingKeyMap = new HashMap<>();
bindingKeyMap.put("info", "普通 info 信息");
bindingKeyMap.put("warning", "警告 warning 信息");
bindingKeyMap.put("error", "错误 error 信息");

// debug 没有消费这接收这个消息 所以就丢失了
bindingKeyMap.put("debug", "调试 debug 信息");

// 生产者测试消息的发送, 发送消息和对应的路由键
for (Map.Entry<String, String> bindingKeyEntry : bindingKeyMap.entrySet()) {
String bindingKey = bindingKeyEntry.getKey();
String message = bindingKeyEntry.getValue();
channel.basicPublish(EXCHANGE_NAME, bindingKey, null,
message.getBytes("UTF-8"));
System.out.println("生产者发出消息:" + message);
}
}
}
}

5.6 Topics

5.6.1 主题交换机的引入

  • 扇出交换机的局限
    • 扇出交换机广播所有接收到的消息到所有绑定的队列,没有路由逻辑,适用于简单的广播场景,但无法进行细粒度的消息路由
  • 直接交换机的改进
    • 直接交换机允许根据消息的路由键和队列的绑定键进行精确匹配,从而实现有选择性地接收消息。这比扇出交换机提供了更高的灵活性,但它仍然有局限性,特别是当需求变得更复杂,需要基于多个条件进行消息路由时
  • 主题交换机的引入
    • 主题交换机通过允许路由键和绑定键之间使用模式匹配来解决直接交换机的局限性。主题交换机使得可以根据多个标准灵活地路由消息,非常适合复杂的路由逻辑
    • 路由键的要求:发送到主题交换机的消息的路由键必须是由点号分隔的单词列表,例如:stock.usd.nyse。这提供了一种结构化的方式来指定消息的属性或类别
    • 替换符:允许进行更复杂的匹配逻辑
      • *(星号):匹配路由键中的一个单词。
      • #(井号):匹配路由键中的零个或多个单词。
  • 示例:如果某个队列只想接收到以 info.base 为主题的日志消息,而另一个队列想接收所有以 info 开头的日志消息(无论是”info.base”还是”info.advantage”)使用主题交换机可以轻松实现这一需求
    • 例如,第一个队列可以使用绑定键 "info.base"
    • 而第二个队列可以使用绑定键 "info.#"

5.6.2 Topic 匹配案例

  • 前提

    • 队列Q1:绑定键为*.orange.*

      • Q1 只接收包含三个单词,其中第二个单词是”orange”的消息
    • 队列Q2:有两个绑定键,*.*.rabbitlazy.#

      • Q2 将接收最后一个单词是”rabbit”的包含三个单词的消息,以及以”lazy”开头的任意长度的消息

image-20240331165905276

  • 案例
    • quick.orange.rabbit:匹配Q1和Q2的绑定(因为它满足*.orange.**.*.rabbit),所以被两个队列接收。
    • lazy.orange.elephant:同样匹配Q1和Q2的绑定(满足*.orange.*lazy.#),被两个队列接收。
    • quick.orange.fox:只匹配Q1的绑定(*.orange.*),只被Q1接收。
    • lazy.brown.fox:只匹配Q2的lazy.#绑定,只被Q2接收。
    • lazy.pink.rabbit:虽然同时满足Q2的两个绑定条件,但消息在队列中不会重复,所以它只被Q2接收一次。
    • quick.brown.fox:不匹配任何绑定条件,被丢弃。
    • quick.orange.male.rabbit:因为是四个单词,不满足任何队列的绑定条件,被丢弃。
    • lazy.orange.male.rabbit:尽管是四个单词,但它满足Q2的lazy.#绑定条件,因此被Q2接收。
  • 特殊情况
    • 全匹配(#:如果一个队列的绑定键是#,那么这个队列会接收所有的消息,类似于扇出(fanout)交换机的行为。这种绑定键的设置使得队列成为一个”全订阅”者。
    • 直接匹配:如果一个队列的绑定键中不包含#*,那么这个队列的绑定类型实际上就类似于直接(direct)交换机的行为。这意味着消息的路由键必须完全匹配绑定键,消息才会被路由到该队列。

image-20240331185153187

5.6.3 代码实战

还是两个消费者Q1和Q2,一个生产者

  • 消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class ReceiveLogsTopic02 {
private static final String EXCHANGE_NAME = "topic_logs";

public static void main(String[] argv) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");

// 声明 Q2 队列与绑定关系
String queueName = "Q2";
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, EXCHANGE_NAME, "*.*.rabbit");
channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#");

System.out.println("等待接收消息.....");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" 接收队列 :" + queueName + " 绑定键:" + delivery.getEnvelope().getRoutingKey()
+ ", 消息:" + message);
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
}
}
  • 生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class EmitLogTopic {
private static final String EXCHANGE_NAME = "topic_logs";

public static void main(String[] argv) throws Exception {
try (Channel channel = RabbitMqUtils.getChannel()) {
// 声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

// Q1-->绑定的是
// 中间带 orange 带 3 个单词的字符串(*.orange.*)
// Q2-->绑定的是
// 最后一个单词是 rabbit 的 3 个单词(*.*.rabbit)
// 第一个单词是 lazy 的多个单词(lazy.#)
Map<String, String> bindingKeyMap = new HashMap<>();
bindingKeyMap.put("quick.orange.rabbit", "被队列 Q1Q2 接收到");
bindingKeyMap.put("lazy.orange.elephant", "被队列 Q1Q2 接收到");
bindingKeyMap.put("quick.orange.fox", "被队列 Q1 接收到");
bindingKeyMap.put("lazy.brown.fox", "被队列 Q2 接收到");
bindingKeyMap.put("lazy.pink.rabbit", "虽然满足两个绑定但只被队列 Q2 接收一次");
bindingKeyMap.put("quick.brown.fox", "不匹配任何绑定不会被任何队列接收到会被丢弃");
bindingKeyMap.put("quick.orange.male.rabbit", "是四个单词不匹配任何绑定会被丢弃");
bindingKeyMap.put("lazy.orange.male.rabbit", "是四个单词但匹配 Q2");

for (Map.Entry<String, String> bindingKeyEntry : bindingKeyMap.entrySet()) {
String bindingKey = bindingKeyEntry.getKey();
String message = bindingKeyEntry.getValue();
channel.basicPublish(EXCHANGE_NAME, bindingKey, null,
message.getBytes("UTF-8"));
System.out.println("生产者发出消息" + message);
}
}
}
}

6. 死信队列

6.1 死信的概念和来源

  • 死信:无法被消费的消息,可能因为多种原因导致消息无法被正常处理,而死信队列就是用来收集这些无法处理的消息的特殊队列

  • 死信队列(DLQ, Dead Letter Queue):用于处理无法被正常消费的消息

  • 死信的常见来源

    • 消息TTL过期:TTL(Time-To-Live),即消息的存活时间。如果一个消息在队列中的存活时间超过了它的TTL,它就会变成死信
    • 队列达到最大长度:如果队列已经达到了它的最大长度,新的消息就无法再添加到队列中。在这种情况下,如果队列配置了死信交换机和死信路由键,超出长度的消息会被发送到死信队列
    • 消息被显式拒绝:当消费者通过basic.rejectbasic.nack命令拒绝消息,并且设置requeue=false(不重新入队列)时,这个消息就会变成死信
  • 应用场景

    • 保障业务的消息不丢失:比如订单处理系统,使用死信队列可以保证即使消费过程中发生异常,消息也不会丢失,而是被安全地移至死信队列中,后续可以对这些消息进行分析和再处理
    • 订单超时处理:在电商平台中,用户下单后可能存在未在规定时间内完成支付的情况。这时,可以通过设置消息的TTL来让未支付的订单消息过期,然后自动转移到死信队列中进行订单失效处理

6.2 死信实战

还是两个消费者,一个生产者

C1 代码需要声明两个交换机(包括死信交换机),两个队列(包括死信队列),还要涉及普通队列和死信交换机的捆绑问题

C2 只要接收死信队列的消息即可

image-20240331190302962

6.2.1 消息 TTL 过期

  • 消费者1(启动之后关闭该消费者 模拟其接收不到消息)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class Consumer01 {
// 普通交换机名称
private static final String NORMAL_EXCHANGE = "normal_exchange";
// 死信交换机名称
private static final String DEAD_EXCHANGE = "dead_exchange";
// 普通队列名称
private static final String NORMAL_QUEUE = "normal_queue";
// 死信队列名称
private static final String DEAD_QUEUE = "dead_queue";

public static void main(String[] argv) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// 声明死信和直接交换机 类型为 direct
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);

// 声明死信队列
channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
// 死信队列绑定死信交换机与 routingKey
channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");

// 普通队列绑定死信队列的参数
Map<String, Object> params = new HashMap<>();
// 可以由生产者指定过期时间 10s
// params.put("x-message-ttl", 10000);
// 参数1: 普通队列设置死信交换机 参数 key 是固定值
params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
// 参数2: 普通队列设置死信 routing-key 参数 key 是固定值
params.put("x-dead-letter-routing-key", "lisi");

// 声明普通队列
channel.queueDeclare(NORMAL_QUEUE, false, false, false, params);
// 绑定到直接交换机与 routingKey
channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");

System.out.println("等待接收消息.....");
// 接收消息的回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("Consumer01 接收到消息" + message);
};

// 消费
channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, consumerTag -> {});
}
}

普通交换机绑普通队列,死信交换机绑死信队列

image-20240331194218496

  • 生产者:设置过期时间 ttl
    • 在关闭了消费者后运行生产者,可以发现过了10秒后消息都到死信队列里了
    • 然后再写个消费者2把死信队列里的消费掉即可,消费者2只用写个信道,然后接收死信队列消息即可
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class Producer {
private static final String NORMAL_EXCHANGE = "normal_exchange";

public static void main(String[] argv) throws Exception {

try (Channel channel = RabbitMqUtils.getChannel()) {
// 声明直接交换机
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);

// 设置消息的 TTL 时间 10s
AMQP.BasicProperties properties = new AMQP.BasicProperties()
.builder().expiration("10000").build();

// 该信息是用作演示队列个数限制
for (int i = 1; i < 11; i++) {
String message = "info" + i;
// 设置 properties 参数
channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties,
message.getBytes());
System.out.println("生产者发送消息:" + message);
}
}
}
}

image-20240331194650547

6.2.2 队列达到最大长度

  • 队列达到最大长度后,超出长度的消息会被发送到死信队列

  • 修改代码:

    • 6.2.1 中,生产者代码中的 TTL 属性去掉,设置为 null
    • 修改消费者1的代码,添加参数3:
1
2
3
4
5
6
7
8
// 普通队列绑定死信队列的参数
Map<String, Object> params = new HashMap<>();
// 参数1: 普通队列设置死信交换机 参数 key 是固定值
params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
// 参数2: 普通队列设置死信 routing-key 参数 key 是固定值
params.put("x-dead-letter-routing-key", "lisi");
// 参数3: 普通队列设置长度限制
params.put("x-max-length", 6);

注意测试时需要把原先普通队列删除,因为参数改变了

  • 还是先运行消费者1,生成队列以及队列和死信交换机之间的关系,然后停止消费者1(为了看效果,让消息积压在队列中),再运行生产者==>可以看到普通队列6个,死信里有4个

image-20240331200010021

image-20240331200144519

  • 最后记得把消费者1和2都启动下,恢复初始状态方便后续测试

6.2.3 消息被拒

  • 修改代码:
    • 6.2.2 中消费者1添加的长度限制属性注释掉
    • 设置消费者1拒绝消息,在接收回调的函数中设置拒绝 info5 消息,并且不允许放回普通队列,则只能进入死信队列
    • 记得开启手动应答!因为自动应答只要队列发消息就算成功
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 接收消息的回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
if (message.equals("info5")) { // 拒绝 info5
System.out.println("Consumer01 接收到消息" + message + "并拒绝签收该消息");
// requeue 设置为 false 代表拒绝重新入队 该队列如果配置了死信交换机将发送到死信队列中
channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
} else {
System.out.println("Consumer01 接收到消息" + message);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};

// 消费
boolean autoAck = false;
channel.basicConsume(NORMAL_QUEUE, autoAck, deliverCallback, consumerTag -> {});
  • 在 web 界面删除普通队列后,重新运行消费者1,然后再运行生产者,可以看到死信队列1条(其他 9 条被普通队列消费了已经),可以看到那一条确实是 info5

image-20240331201928728

  • 最后运行下消费者2,即可消费掉这条 info5

7. 延迟队列

7.1 延迟队列的概念和使用场景

  • 延迟队列:核心特性是能够控制消息在队列中的停留时间,只有当指定的时间到达时,消息才会被消费者处理

    • 适合于需要在特定时间执行任务的场景
  • 使用场景

    • 订单支付超时处理:对于在线支付场景,订单如果在一定时间(如10分钟)内未支付,系统需要自动取消这些订单
    • 提醒功能:对于新注册用户或新开店铺,如果在一定时间内未进行某些操作(如三天内未登录、十天内未上传商品),系统可以自动发送提醒
    • 退款处理提醒:对用户发起的退款请求,如果在一定时间(如三天)内未处理,系统可以自动提醒运营人员进行处理
    • 会议通知:对于预定的会议,系统可以在会议开始前十分钟自动通知与会人员
  • 延迟队列与轮询的对比

    • 性能和效率:当数据量大时,使用定时任务不断轮询数据库检查哪些任务需要被执行是低效且对数据库压力大的。尤其是在高并发场景下,轮询可能无法在短时间内完成所有任务的检查,且对系统资源消耗大
    • 时效性:延迟队列可以精确控制任务的执行时间,确保任务能够在预定时间点准确执行,而轮询机制可能因为间隔时间设置不当导致执行时间的不精确
  • 实现延迟队列的方法:在RabbitMQ中,可以通过设置消息的TTL(Time-To-Live,生存时间)和使用死信交换机(DLX)组合来实现延迟队列。具体步骤如下:

    • 设置消息TTL:为消息设置一个TTL,即该消息在队列中可以存活的最长时间
    • 配置死信交换机:当消息因为TTL过期或其他原因变成死信后,它会被发送到配置的死信交换机,进而路由到一个或多个指定的队列
    • 消费死信队列中的消息设置消费者监听死信队列,一旦消息因TTL过期被路由到死信队列,消费者即可进行处理

7.2 整合 SpringBoot

7.2.1 创建项目

image-20240407190827899

image-20240407190945485

image-20240407191045720

7.2.2 RabbitMQ 依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
<dependencies>
<!--RabbitMQ 依赖-->
<dependency> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>

<!--RabbitMQ 测试依赖-->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>

<!--swagger-->
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.9.2</version>
</dependency>

<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>2.9.2</version>
</dependency>
</dependencies>

7.2.3 修改配置文件

1
2
3
4
5
6
spring:
rabbitmq:
host: xxx.xx.xx.xxx
port: 5672
username: admin
password: 123

7.2.4 添加 Swagger 配置类

  • config/SwaggerConfig.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class SwaggerConfig {
@Bean
public Docket webApiConfig() {
return new Docket(DocumentationType.SWAGGER_2)
.groupName("webApi")
.apiInfo(webApiInfo())
.select()
.build();
}

private ApiInfo webApiInfo() {
return new ApiInfoBuilder()
.title("rabbitmq 接口文档")
.description("本文档描述了 rabbitmq 微服务接口定义")
.version("1.0")
.contact(new Contact("thr", "http://thr.com",
"hhh@qq.com"))
.build();
}
}

7.3 RabbitMQ 中的 TTL

  • RabbitMQ中的TTL(Time-To-Live,生存时间)是指消息或队列中所有消息的最大存活时间,单位为毫秒

    • 这个特性允许消息在一定时间内未被消费则自动过期,过期的消息会变成死信(Dead Letter)。通过利用TTL和死信队列,我们可以在RabbitMQ中实现延迟队列的功能
  • 设置TTL的两种方式

    • 消息级别的TTL:可以为每条消息单独设置TTL。这种方式提供了灵活性,允许不同的消息有不同的生存时间。
    • 队列级别的TTL:在创建队列时,通过设置x-message-ttl属性为整个队列中的所有消息指定统一的TTL。队列级别的TTL适用于那些消息生存时间一致的场景
  • 两种方式的区别

    • 过期处理
      • 如果设置了队列级TTL,一旦消息过期,就会被队列自动丢弃(如果配置了死信交换机,则被发送到死信队列)
      • 而对于消息级别的TTL,消息是否过期是在消息即将被消费之前判定的。这意味着,如果队列中有大量积压的消息,即使某些消息已经过期,它们也可能会在队列中存活更长的时间,直到接近被消费时才被丢弃
    • 立即过期的消息:如果将TTL设置为0,表示这些消息除非能够立即被消费,否则将被立即丢弃
  • 实现延迟队列

    • 配置死信交换机:为需要实现延迟的队列配置死信交换机(DLX),并指定死信路由键。
    • 设置TTL:根据需要,为队列或消息设置TTL。当消息在队列中存活时间超过TTL后,它会变成死信。
    • 死信转移:变成死信后的消息会被自动发送到绑定到死信交换机的队列(即死信队列)中。
    • 消费死信队列:消费者监听死信队列,处理到期的延迟消息。这样,消息在延迟特定时间后才被消费,实现了延迟队列的功能。

7.4 队列 TTL

7.4.1 案例

创建两个队列 QA 和 QB,两者队列 TTL 分别设置为 10S 和 40S,然后在创建一个交换机 X 和死信交换机 Y,它们的类型都是 direct,创建一个死信队列 QD,它们的绑定关系如下:

image-20240407195336713

需要声明 2 个交换机 X 和 Y(死信),以及 3 个队列 QA、QB、QD(死信),绑定 QA 和 QB 到 X 和 Y 上

7.4.2 配置文件类代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
/**
* TTL 队列 配置文件类代码
* 需要声明 2 个交换机 X 和 Y(死信), 以及 3 个队列 QA、QB、QD(死信)
* 绑定 QA 和 QB 到 X 和 Y 上
*/
@Configuration
public class TtlQueueConfig {
// 普通交换机和普通队列
public static final String X_EXCHANGE = "X";
public static final String QUEUE_A = "QA";
public static final String QUEUE_B = "QB";

// 死信交换机和死信队列
public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
public static final String DEAD_LETTER_QUEUE = "QD";

// 注册两个直接交换机的 bean, 使得其他组件如消息生产者和消费者可以通过自动装配机制使用这些交换机
// 声明 X 交换机, bean 名称在注入的时候有用
@Bean("xExchange")
public DirectExchange xExchange() {
return new DirectExchange(X_EXCHANGE);
}

// 声明 xExchange
@Bean("yExchange")
public DirectExchange yExchange() {
return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
}

// 声明队列 A ttl 为 10s 并绑定到对应的死信交换机
@Bean("queueA")
public Queue queueA() {
Map<String, Object> args = new HashMap<>(3);
// 声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
// 声明当前队列的死信路由 key
args.put("x-dead-letter-routing-key", "YD");
// 声明队列的 TTL
args.put("x-message-ttl", 10000);
return QueueBuilder.durable(QUEUE_A).withArguments(args).build();
}

// 声明队列 A 绑定 X 交换机
@Bean
public Binding queueABindingX(@Qualifier("queueA") Queue queueA,
@Qualifier("xExchange") DirectExchange xExchange) {
return BindingBuilder.bind(queueA).to(xExchange).with("XA");
}

// 声明队列 B ttl 为 40s 并绑定到对应的死信交换机
@Bean("queueB")
public Queue queueB() {
Map<String, Object> args = new HashMap<>(3);
// 声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
// 声明当前队列的死信路由 key
args.put("x-dead-letter-routing-key", "YD");
// 声明队列的 TTL
args.put("x-message-ttl", 40000);
return QueueBuilder.durable(QUEUE_B).withArguments(args).build();
}

// 声明队列 B 绑定 X 交换机
@Bean
public Binding queueBBindingX(@Qualifier("queueB") Queue queue1B,
@Qualifier("xExchange") DirectExchange xExchange) {
return BindingBuilder.bind(queue1B).to(xExchange).with("XB");
}

// 声明死信队列 QD
@Bean("queueD")
public Queue queueD() {
return new Queue(DEAD_LETTER_QUEUE);
}

// 声明死信队列 QD 绑定关系
@Bean
public Binding deadLetterBindingQDY(@Qualifier("queueD") Queue queueD,
@Qualifier("yExchange") DirectExchange yExchange) {
return BindingBuilder.bind(queueD).to(yExchange).with("YD");
}
}

7.4.3 消息生产者代码

  • controller/SendMsgController
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* 发送延迟消息
*/
@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMsgController {
@Resource
private RabbitTemplate rabbitTemplate;

/**
* 发消息
* http://localhost:8080/ttl/sendMsg?msg=hhh
* @param msg 消息
*/
@GetMapping("/sendMsg")
public void sendMsg(@RequestParam String msg) {
log.info("当前时间: {}, 发送一条信息给两个 TTL 队列: {}",
new Date().toString(), msg);

// 发送消息, 指定交换机和绑定键以及消息内容
rabbitTemplate.convertAndSend("X", "XA", "消息来自 ttl 为 10s" + msg);
rabbitTemplate.convertAndSend("X", "XB", "消息来自 ttl 为 40s" + msg);
}
}

7.4.4 消息消费者代码

  • consumer/DeadLetterQueueConsumer
1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* 队列 TTL 消费者
*/
@Slf4j
@Component
public class DeadLetterQueueConsumer {
// 接受消息(监听)
@RabbitListener(queues = "QD")
public void receiveD(Message message, Channel channel) throws Exception {
String msg = new String(message.getBody()); // 获取消息体
log.info("当前时间: {}, 收到死信队列的消息: {}", new Date().toString(), msg);
}
}

7.4.5 启动测试

  • 报错:SpringBoot 项目集成了 springfox。将SpringBoot更新到2.6.0后,应用程序无法启动并抛出异常
1
org.springframework.context.ApplicationContextException: Failed to start bean 'documentationPluginsBootstrapper'; nested exception is java.lang.NullPointerException
  • 解决:除了将 SpringBoot 降级到 2.5.7 外,还可以在 yaml 文件里添加配置如下:
1
2
3
4
spring:
mvc:
pathmatch:
matching-strategy: ANT_PATH_MATCHER
  • 访问:http://localhost:8080/ttl/sendMsg?msg=hhh 即可,过一段时间就能从死信队列里获取到消息
    • 第一条消息在 10S 后变成了死信消息,然后被消费者消费掉,第二条消息在 40S 之后变成了死信消息,然后被消费掉,这样一个延时队列就打造完成了

image-20240407205712499

7.5 延迟队列优化

如果像 7.4 这样使用,则每增加一个新的时间需求,就要新增一个队列,这里只有 10S 和 40S 两个时间选项,如果需要一个小时后处理,那么就需要增加 TTL 为一个小时的队列,如果是预定会议室然后提前通知这样的场景,岂不是要增加无数个队列才能满足需求?

7.5.1 优化案例

允许生产者在发送每条消息时动态指定延迟时间。消息在延迟期满后才会被路由到绑定的队列中,消费者随后可以处理这些消息

  • 在 7.4.1 的基础上新增一个队列 QC,绑定关系如下,该队列不设置 TTL 时间

image-20240407210828320

7.5.2 配置文件类代码

  • 增加 QC
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Configuration
public class TtlQueueConfig {
// 新增普通队列 QC
public static final String QUEUE_C = "QC";

// 声明队列 C 并绑定到对应的死信交换机
@Bean("queueC")
public Queue queueC() {
Map<String, Object> args = new HashMap<>(3);
// 声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
// 声明当前队列的死信路由 key
args.put("x-dead-letter-routing-key", "YD");
return QueueBuilder.durable(QUEUE_A).withArguments(args).build();
}

// 声明队列 C 绑定 X 交换机
@Bean
public Binding queueCBindingX(@Qualifier("queueC") Queue queueC,
@Qualifier("xExchange") DirectExchange xExchange) {
return BindingBuilder.bind(queueC).to(xExchange).with("XC");
}
}

7.5.3 消息生产者代码

  • convertAndSend方法的最后一个参数是一个MessagePostProcessor,它允许在消息发送前修改消息
    • 在这个MessagePostProcessor内部,可以设置消息的TTL为ttlTime,消息就会带有一个TTL属性,并且如果消息在TTL时间内未被消费,它将成为死信(前提是配置了相应的死信交换机和队列)
1
2
3
4
5
6
7
8
9
10
11
12
// 发消息, 设置 TTL
@GetMapping("/sendExpirationMsg")
public void sendMsg(@RequestParam String msg, @RequestParam String ttlTime) {
log.info("当前时间: {}, 发送一条时长 {} 毫秒 TTL 信息给队列 QC: {}",
new Date().toString(), ttlTime, msg);

rabbitTemplate.convertAndSend("X", "XC", msg, message -> {
// 设置发送消息的延迟时长
message.getMessageProperties().setExpiration(ttlTime);
return message;
});
}

7.5.4 启动测试

  • http://localhost:8080/sendExpirationMsg?msg=hello&ttlTime=10000

image-20240407213653456

7.5.6 存在问题(基于死信的延迟队列)

  • 如果使用在消息属性上设置 TTL 的方式,消息可能并不会按时“死亡“,因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行
  • RabbitMQ的TTL检查机制:RabbitMQ 在处理带有 TTL 的消息时,只会检查队列头部(即第一个消息)是否已经过期。如果头部的消息还没有到达它的 TTL,那么 RabbitMQ 不会继续检查后面的消息。这意味着,只有当头部消息过期并被移除后,RabbitMQ 才会检查下一个消息是否过期

image-20240407222340941

  • 使用死信队列实现延迟队列时,需要通过消息TTL和死信交换机配置,消息到期后转入死信队列,存在检查顺序限制,可能导致消息延迟处理
    • 而使用延迟消息交换机插件,可直接在消息上设置延迟时间,插件负责延时后递送,操作更直接灵活,无需死信队列间接实现,解决了消息延迟处理的顺序问题

7.6 RabbitMQ 插件实现延迟队列

image-20240407222601585

7.6.1 安装延时队列插件

1
cp rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins

image-20240407220927691

  • 进入 RabbitMQ 的安装目录下的 plgins 目录,执行命令让该插件生效
1
2
3
4
cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins

# 插件生效
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

image-20240407221120860

  • 重启 RabbitMQ
1
systemctl restart rabbitmq-server.service
  • 验证:安装成功后,web 页面新增交换机,类型会多一个延迟消息的选项,由交换机来延迟

image-20240407221525820

7.6.2 案例

安装了延迟消息交换机插件后,就可以直接使用这个插件来实现消息的延迟发送,而不再需要通过死信队列的方式来实现延迟队列了

新增一个队列 delayed.queue,一个自定义交换机 delayed.exchange,绑定关系如下:

  • 生产者 -> 延迟交换机 -> 普通队列 -> 消费者

image-20240407222649600

7.6.3 配置文件类代码

  • 需要声明交换机、队列以及它们之间的绑定键
    • 声明交换机的时候,设置交换机是延迟类型的
    • @Qualifier注解用于指定自动装配(autowiring)时应注入哪个具体的bean,当有多个相同类型的bean可供选择时,@Qualifier("beanName")可以帮助指定注入哪一个
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@Configuration
public class DelayedQueueConfig {
public static final String DELAYED_QUEUE_NAME = "delayed.queue";
public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";

// 声明队列
@Bean
public Queue delayedQueue() {
return new Queue(DELAYED_QUEUE_NAME);
}

// 声明自定义交换机 这里定义的是一个延迟交换机
@Bean
public CustomExchange delayedExchange() {
Map<String, Object> args = new HashMap<>();
// 自定义交换机的类型
args.put("x-delayed-type", "direct");
// 参数: 名称、类型、持久化标识、自动删除标识、参数
return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message",
true, false, args);
}

// 绑定
@Bean
public Binding bindingDelayedQueue(
@Qualifier("delayedQueue") Queue queue,
@Qualifier("delayedExchange") CustomExchange
delayedExchange) { // 这里@Qualifier()中就是方法名(默认的自定义名), 指定自动装配时应注入哪个具体的 bean
return BindingBuilder.bind(queue).to(delayedExchange)
.with(DELAYED_ROUTING_KEY).noargs();
}
}

7.6.4 消息生产者代码

  • 延迟时间 delayTime:是指消息从被发送到实际被投递到目标队列之间的等待时间
  • 存活时间 TTL:是指消息一旦到达目标队列后能在队列中存活的最大时间如果消息在这段TTL时间内没有被消费,那么它会被移除或转移到死信队列

简而言之,延迟时间控制消息发送到队列的延迟,而TTL控制消息在队列中的存活时间。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* 发消息, 基于插件
* @param msg 消息
* @param delayTime 延迟时间
*/
@GetMapping("/sendDelayMsg")
public void sendMsg(@RequestParam String msg, @RequestParam Integer delayTime) {
log.info("当前时间: {}, 发送一条时长 {} 毫秒 信息给延迟队列 QC: {}",
new Date().toString(), delayTime, msg);

rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME,
DelayedQueueConfig.DELAYED_ROUTING_KEY, msg, message -> {
// 设置发送消息的延迟时长
message.getMessageProperties().setDelay(delayTime);
return message;
});
}

7.6.5 消息消费者代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* 消息消费者
* 基于插件的延迟队列
*/
@Slf4j
@Component
public class DelayQueueConsumer {
public static final String DELAYED_QUEUE_NAME = "delayed.queue";

// 编写 rabbit 监听类
@RabbitListener(queues = DELAYED_QUEUE_NAME)
public void receiveDelayedQueue(Message message) {
String msg = new String(message.getBody());
log.info("当前时间:{},收到延时队列的消息:{}", new Date().toString(), msg);
}
}

7.6.6 测试

  • http://localhost:8080/ttl/sendDelayMsg?msg=hhh&delayTime=20000
  • http://localhost:8080/ttl/sendDelayMsg?msg=eee&delayTime=2000

image-20240408222411992

7.7 总结

  • 延时队列在需要延时处理的场景下非常有用,使用 RabbitMQ 来实现延时队列可以很好的利用 RabbitMQ 的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。另外,通过 RabbitMQ 集群的特性,可以很好的解决单点故障问题,不会因为单个节点挂掉导致延时队列不可用或者消息丢失
  • 延时队列还有很多其它选择,比如利用 Java 的 DelayQueue,利用 Redis 的 zset,利用 Quartz 或者利用 kafka 的时间轮,这些方式各有特点,看需要适用的场景

延迟队列在RabbitMQ中可以通过两种主要方式实现:基于死信队列和基于插件。

  • 基于死信队列:通过设置消息的TTL或队列长度限制,使消息在条件触发时转入死信队列,从而实现延迟效果。这种方法依赖于消息到期或队列状态改变,需要额外配置死信交换机和队列。
  • 基于插件:使用RabbitMQ的延迟消息交换机插件,直接在消息属性上设置延迟时间,使消息在指定时间后才被路由到目标队列。这种方式更直接,不依赖TTL导致的间接转移,适合需要精确控制延迟时间的场景。

消息延迟的实现机制和处理顺序的影响:

  • 基于死信队列:RabbitMQ只检查队列头部的消息是否过期。如果头部消息没有过期,即使后续消息已经到了过期时间,也不会被立即处理,直到它们移动到队列头部。这就可能导致即使设置了不同的TTL,消息处理也会受到队列头部消息状态的影响。
  • 基于插件每条消息的延迟是独立处理的,当消息的延迟时间到了,它就会被立即投递,不受队列中其他消息状态的影响。这样,即使后续消息的延迟时间较短,也能按预期时间被处理,不会因为队列中其他消息的状态而延误。

8. 发布确认高级

从 3.3.3 处埋的坑,之前 4.发布确认 中,发送消息后,生产者等待来自RabbitMQ服务器的确认,但是如果服务器宕机了呢?

之前我们的前提是假设RabbitMQ服务器是可达的,能够接收消息并返回确认或否认,如果RabbitMQ服务器不可用,生产者尝试发送的消息将无法到达RabbitMQ,这时候就需要一个额外的机制来确保消息不会丢失

  • 在生产环境中由于一些不明原因,导致 RabbitMQ 重启,在 RabbitMQ 重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复

  • 如何才能进行 RabbitMQ 的消息可靠投递呢?特别是在这样比较极端的情况,RabbitMQ 集群不可用的时候,无法投递的消息该如何处理呢?

8.1 发布确认 SpringBoot 版本

考虑两种情况:

  • 交换机接收不到消息:这种情况通常发生在消息从生产者发送到 RabbitMQ 时,由于RabbitMQ服务器宕机、网络问题或其他原因导致消息无法到达交换机。
  • 队列接收不到消息:即使消息成功到达了交换机,但因为某些原因(如队列不存在,或者路由规则导致消息无法路由到任何队列)消息不能被正确路由到目标队列。

8.1.6 将分析第一种情况

8.2 将分析第二种情况

8.1.1 确认机制方案

  • 在生产者端实现一个消息缓存机制,来暂存那些因RabbitMQ不可用而无法发送的消息
  • 生产者发送消息到直接交换机,一旦 mq 宕机了,或者生产者接收不到消息了,需要对消息进行缓存处理

image-20240409213649226

8.1.2 示例

  • 用 SpringBoot 实现 4.发布确认 的代码

image-20240409214011083

8.1.3 配置文件和配置类

  • 配置文件添加一行:取值设置为 correlated
    • **none**:禁用发布确认机制。
    • **correlated**:启用发布确认,当需要明确知道每条消息是否成功到达RabbitMQ时使用。这种模式允许将每个消息的确认与其发送操作相关联,通过CorrelationData对象的使用,可以知道具体是哪条消息收到了确认或未被确认。
    • **simple**:启用发布确认,但不需要CorrelationData对象
1
2
3
spring:
rabbitmq:
publisher-confirm-type: correlated
  • 配置交换机和队列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Configuration
public class ConfirmConfig {
public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
public static final String CONFIRM_ROUTING_KEY = "key1";

// 声明交换机
@Bean("confirmExchange")
public DirectExchange confirmExchange() {
return new DirectExchange(CONFIRM_EXCHANGE_NAME);
}

// 声明确认队列
@Bean("confirmQueue")
public Queue confirmQueue() {
return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
}

// 声明确认队列绑定关系
@Bean
public Binding queueBinding(@Qualifier("confirmQueue") Queue queue,
@Qualifier("confirmExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(CONFIRM_ROUTING_KEY);
}
}

8.1.4 消息生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@RestController
@RequestMapping("/confirm")
public class ProducerController {
@Resource
RabbitTemplate rabbitTemplate;

@GetMapping("/sendMsg")
public void sendMsg(@RequestParam String msg) {
// 指定消息 id 为 1
CorrelationData correlationData1 = new CorrelationData("1");

rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
ConfirmConfig.CONFIRM_ROUTING_KEY, msg + "key1", correlationData1);
log.info("发消息内容: {}", msg + "key1");

// 指定消息 id 为 2 ==> 绑定键错误
CorrelationData correlationData2 = new CorrelationData("2");

rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
ConfirmConfig.CONFIRM_ROUTING_KEY + "2", msg + "key12", correlationData2);
log.info("发消息内容: {}", msg + "key12");

}
}

8.1.5 回调接口

  • 使用@PostConstruct进行初始化

    • 使用@PostConstruct注解的init方法确保了只有在MyCallBack类实例化并RabbitTemplate注入完成后,才设置确认回调。这样可以避免在RabbitTemplate完全配置前误调用回调方法的风险

    • setConfirmCallback(this)的调用设置了当前实例(MyCallBack)作为消息确认的回调处理器。这意味着每当消息被发送到RabbitMQ服务器,并且服务器对消息进行了处理(无论是接受还是拒绝),都会调用实例的confirm方法。

  • CorrelationData 对象是在发送消息时由消息生产者创建并传递的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback {
// 注入
@Resource
RabbitTemplate rabbitTemplate;

// 类内部接口, 容器内也要有内部接口的实例
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this);
}

/**
* 交换机确认回调方法
* 发消息 交换机接收到了 回调
*
* @param correlationData 保存回调消息的 ID 及相关信息
* @param ack 交换机收到消息 => true, 没收到 => false
* @param cause 成功没有原因 null, 失败就是失败的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String id = correlationData != null ? correlationData.getId() : "";
if (ack) { // 成功
log.info("交换机已经收到 id 为:{}的消息", id);
} else { // 失败
log.info("交换机还未收到 id 为:{}消息,由于原因:{}", id, cause);
}
}
}

8.1.5 消息消费者

1
2
3
4
5
6
7
8
@Component
public class Consumer {
@RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)
public void receiveConfirmMsg(Message message) {
String msg = new String(message.getBody());
log.info("接受到队列 confirm.queue 消息: {}", msg);
}
}

8.1.6 测试

  • http://localhost:8080/confirm/sendMsg?msg=hhh

image-20240412135800282

  • 考虑到交换机接收不到消息的情况,测试下
    • 在生产者代码里修改交换机的名字为不存在的交换机,然后发送消息
    • 可以看到会回显失败和失败的原因

image-20240412141332905

  • 考虑到队列接收不到消息的情况,测试下
    • 在生产者代码里增加一个绑定键为错误的消息进行发送
    • 可以看到接收不到失败的回调,消费者(队列)只收到了一个消息,另一个没收到,但是它也没有应答,也没有确认,因为底层默认把消息丢失
    • 具体解决看 8.2

image-20240412141703506

8.2 回退消息

在 RabbitMQ 中,当消息无法被路由到任何队列时(即没有匹配的绑定键),默认情况下,这条消息会被交换机丢弃,而生产者不会得到任何通知。这种情况可能会导致数据丢失,特别是在生产者假设消息已成功发送的场景下

8.2.1 Mandatory 参数

当设置了mandatory参数后,如果消息在到达交换机后无法路由到任何队列,交换机不会静默丢弃这条消息。相反,消息会被返回给生产者。这个过程通常称为消息回退

8.2.2 配置文件

  • 在配置文件 yaml 里开启发布退回的配置
    • publisher-returns: true 在Spring Boot配置中确实起到了类似AMQP的mandatory标志的作用
    • 确保配置了ReturnCallback回调接口来处理退回的消息
1
2
3
spring:
rabbitmq:
publisher-returns: true

8.2.3 回调接口

  • 在 8.1.5 的回调接口代码上添加回退接口的实现:RabbitTemplate.ReturnsCallback
    • 注意也要注入改接口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnsCallback(this);
}

/**
* 当消息无法路由到任何队列时调用
* @param returned 封装了退回消息的详细信息
*/
@Override
public void returnedMessage(ReturnedMessage returned) {
String message = new String(returned.getMessage().getBody());

log.error("消息 {} 被交换机 {} 退回, 退回原因: {}, 路由 key: {}",
message,
returned.getExchange(),
returned.getReplyText(),
returned.getRoutingKey());
}

8.2.4 测试

  • 可以看到消息被回退了,还有回退的原因,保障即使消息发不到队列,消息也不丢失

image-20240412143809373

8.3 备份交换机

前面在设置死信队列的部分提到可以为队列设置死信交换机来存储那些处理失败的消息,可是这些不可路由消息根本没有机会进入到队列,因此无法使用死信队列来保存消息

备份交换机可以理解为某个主交换机的“备胎”。当一条消息发送到主交换机,但找不到任何合适的队列匹配时(即消息无法路由),这条消息将会被发送到配置的备份交换机。这样的机制确保消息在无法正常路由时不会被丢弃,而是有另一条路径可以走。

  • 使用场景

    • 错误处理和日志记录:通过备份交换机,无法路由的消息可以被统一收集和记录,用于后续的错误处理或者生成警报。

    • 系统可靠性增强:在高可靠性需求的系统中,备份交换机提供了一个简单的方法来保证消息至少在某个地方被处理,即使它们无法被正常的业务逻辑处理。

  • 备份交换机通常设置为Fanout类型,这是因为Fanout交换机会将接收到的所有消息广播到所有绑定的队列,确保无法路由的消息不会丢失。下面是如何配置备份交换机的步骤:

    • 声明备份交换机:首先,需要声明一个Fanout类型的备份交换机。

    • 配置主交换机:在声明主交换机时,设置一个额外的参数alternate-exchange,指向备份交换机。

    • 绑定队列:至少需要为备份交换机绑定一个队列,以确保从备份交换机接收消息。

8.3.1 示例

  • 增加备份交换机、备份队列和报警队列
    • 备份交换机设置为扇出类型,会广播消息到所有绑定的队列,可以实现备份和报警的功能

image-20240412154643543

8.3.2 修改配置类

  • 设置原先的确认交换机的备份交换机
  • 增加扇出类型的备份交换机,并分别和备份队列以及报警队列进行绑定
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// 声明直接交换机, 并声明它的备份交换机
@Bean("confirmExchange")
public DirectExchange confirmExchange() {

return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME)
.durable(true)
.withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME)
.build();
}

// 备份交换机-扇出类型的
@Bean("backupExchange")
public FanoutExchange backupExchange() {
return new FanoutExchange(BACKUP_EXCHANGE_NAME);
}
// 备份队列
@Bean("backupQueue")
public Queue backupQueue() {
return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
}
// 备份队列
@Bean("warningQueue")
public Queue warningQueue() {
return QueueBuilder.durable(WARNING_QUEUE_NAME).build();
}
// 绑定
@Bean
public Binding backupQueueBindingBackExchange(@Qualifier("backupQueue") Queue backupQueue,
@Qualifier("backupExchange") FanoutExchange backupExchange) {
return BindingBuilder.bind(backupQueue).to(backupExchange);
}
@Bean
public Binding warningQueueBindingBackExchange(@Qualifier("warningQueue") Queue warningQueue,
@Qualifier("backupExchange") FanoutExchange backupExchange) {
return BindingBuilder.bind(warningQueue).to(backupExchange);
}

8.3.3 报警消费者

1
2
3
4
5
6
7
8
9
@Component
public class WarningConsumer {

@RabbitListener(queues = ConfirmConfig.WARNING_QUEUE_NAME)
public void receiveWarningMsg(Message message) {
String msg = new String(message.getBody());
log.error("报警发现不可路由消息:{}", msg);
}
}

8.3.4 测试

  • 先在 web 界面删除之前的确认交换机,因为我们修改了其绑定属性,不然会报错

image-20240412161545389

  • 从结果也可以说明,在我们同时设置了回退消息和备份交换机的两种机制下,备份交换机的优先级更高些

9. 其他知识点

9.1 幂等性

9.1.1 概念

幂等性意味着执行多次和执行一次的效果完全相同。在业务操作如支付中,用户可能因为网络延迟等问题多次点击支付按钮,导致多次扣款。确保操作的幂等性可以防止这种情况,即使操作多次执行,也只有一次有效果生效。

9.1.2 消息重复消费

消费者在消费 MQ 中的消息时,MQ 已把消息发送给消费者,消费者在给 MQ 返回 ack 时网络中断,故 MQ 未收到确认信息,该条消息会重新发给其他的消费者,或者在网络重连后再次发送给该消费者,但实际上该消费者已成功消费了该条消息,造成消费者消费了重复的消息。

9.1.3 解决思路

解决消息队列中的幂等性问题通常涉及到确保每条消息只被处理一次,即使它被多次接收。实现这一点的方法包括:

  • 使用全局唯一ID:每条消息都有一个全局唯一标识,消费者处理消息前先检查该消息ID是否已被处理。
  • 时间戳、UUID或业务ID:使用这些唯一标识来标记每个消息或事务。

9.1.4 消费端的幂等性保障

保障消息消费端的幂等性是通过确保每条消息只触发一次业务处理逻辑实现的。主流方法包括:

  • 唯一ID + 指纹码机制:结合消息内容或特定字段(如时间戳、用户ID等)生成一个指纹码,通过数据库主键去重或查询来防止重复处理。
    • 指纹码通常是根据消息内容或业务场景特征生成的一个唯一标识符。这个机制的实现通常依赖于数据库的能力来快速检查和插入这些唯一标识,确保操作的原子性和唯一性。这种方法的优点是实现简单,缺点是在高并发场景下可能会遇到数据库性能瓶颈。
  • 数据库事务和锁:利用数据库的事务和锁机制来确保操作只被成功执行一次。 ==> 推荐
    • Redis的SETNX命令(set if not exists)提供了一个天然的原子操作,可以用来实现幂等性。这个命令只有在键不存在时才设置键的值,如果键已存在,命令不做任何操作。这样,可以利用Redis来记录每个消息或操作的唯一标识,并确保重复的消息或操作不会被处理。

9.2 优先级队列

9.2.1 使用场景

  • 场景

    • 在电商平台中,例如天猫,订单支付提醒功能,即当用户在预定时间内未完成支付时,系统会自动发送提醒短信。

    • 对于不同级别的商家,如苹果或小米(被认为是大客户),他们的订单应当被优先处理,因为这些客户为平台带来较高的利润。

  • 问题:传统使用Redis实现的后端系统通过List结构支持的是基本的队列功能,无法处理复杂的优先级逻辑。当订单量大增时,需要一种机制来区分处理订单的优先级。

  • 解决方案:使用RabbitMQ的优先级队列功能来优化系统。这允许系统根据客户的重要性赋予订单不同的优先级,从而确保重要客户的订单可以得到更快的处理。

9.2.2 如何添加优先级

  • 设置队列的最大优先级 最大可以设置到255官网推荐 1-10 如果设置太高比较吃内存和 CPU

  • 要让队列实现优先级有如下步骤:

    • 队列需要设置为优先级队列,消息需要设置消息的优先级

    • 消费者需要等待消息已经发送到队列中才去消费,因为这样才有机会对消息进行排序

  • 两种方式添加队列的优先级

    • 控制台页面添加

    image-20240412184641755

    • 代码方式添加
    1
    2
    3
    Map<String, Object> params = new HashMap();
    params.put("x-max-priority", 10);
    channel.queueDeclare("hello", true, false, false, params);
  • 消息设置优先级

    • 注意消息的优先级数值需要在队列设置的优先级数值范围内,不能超过
1
2
AMQP.BasicProperties properties = new 
AMQP.BasicProperties().builder().priority(5).build();

9.2.3 实战

  • 消息生产者:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class Producer {
private static final String QUEUE_NAME="hello";
public static void main(String[] args) throws Exception {
try (Channel channel = RabbitMqUtils.getChannel();) {
// 给消息赋予一个 priority 属性
AMQP.BasicProperties properties = new
AMQP.BasicProperties().builder().priority(5).build();
for (int i = 1; i <11; i++) {
String message = "info"+i;
if(i == 5){
channel.basicPublish("", QUEUE_NAME, properties, message.getBytes());
}else{
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
}
System.out.println("发送消息完成:" + message);
}
}
}
}
  • 消息消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class Consumer {
private static final String QUEUE_NAME="hello";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
//设置队列的最大优先级 最大可以设置到 255 官网推荐 1-10 如果设置太高比较吃内存和 CPU
Map<String, Object> params = new HashMap();
params.put("x-max-priority", 10);
channel.queueDeclare(QUEUE_NAME, true, false, false, params);
System.out.println("消费者启动等待消费......");
DeliverCallback deliverCallback=(consumerTag, delivery)->{
String receivedMessage = new String(delivery.getBody());
System.out.println("接收到消息:"+receivedMessage);
};
channel.basicConsume(QUEUE_NAME,true,deliverCallback,(consumerTag)->{
System.out.println("消费者无法消费消息时调用,如队列被删除");
});
}
}

9.3 惰性队列

9.3.1 使用场景

默认情况 ==> 消息保存在内存

惰性队列 ==> 消息保存在磁盘

  • 惰性队列的设计旨在支持长队列的管理,允许存储大量消息而不对系统性能造成显著影响
    • 当消费者由于各种各样的原因(比如消费者下线、宕机亦或者是由于维护而关闭等)而致使长时间内不能消费消息造成堆积时,惰性队列就很有必要了
  • 问题解决
    • 在默认队列行为中,RabbitMQ尽可能将消息保留在内存中,以便快速传递给消费者。然而,这种模式在消息量大时会导致内存压力增大,系统可能需要执行耗时的内存到磁盘的数据交换操作,进而影响队列性能。
    • 惰性队列通过将消息直接存储到磁盘来解决这一问题,只有当消息需要被消费时,才加载到内存中。这极大地减少了内存的使用,使得RabbitMQ能够处理更长的队列

9.3.2 两种模式

  • **默认模式 default**:RabbitMQ的传统行为,消息尽可能保留在内存中,以便快速处理。适用于消息量不大,或需要极速处理的场景。

  • **惰性模式 lazy**:消息直接写入磁盘,仅在需要时加载到内存中,极大减轻了内存的压力,适用于处理大量或长队列的消息。

  • 设置方式

    • 可以在队列声明时通过 x-queue-mode 参数设置,如:

      1
      2
      3
      Map<String, Object> args = new HashMap<>();
      args.put("x-queue-mode", "lazy");
      channel.queueDeclare("myqueue", false, false, false, args);
    • 也可以通过Policy(策略)设置,策略设置具有比声明时更高的优先级

    • 已存在的队列需要先删除后再以新模式重新声明

image-20240412190534258

  • 惰性队列大幅减少了内存使用,从而降低了因内存不足而导致的性能问题。这使得RabbitMQ服务器能够在资源有限的环境下更加稳定地运行,尤其适合消息生产速度远大于消费速度的场景

10. RabbitMQ 集群

10.1 clustering

背景:单机RabbitMQ服务器在面对内存崩溃、电力故障或硬件故障等问题时容易出现服务中断,这在高可用性要求的生产环境中是不可接受的。

需求提升:随着业务规模的扩大,如需处理每秒达10万条消息的场景,单台服务器的处理能力明显不足,且高性能服务器成本过高。

解决方案:构建RabbitMQ集群,分散负载和增强冗余,以提高系统的可靠性和消息吞吐量。

10.2 镜像队列

问题:单个Broker节点的故障可以导致服务不可用和消息丢失,尽管消息持久化到磁盘可以减少数据丢失,但不足以处理节点故障引起的问题。

解决方案:通过使用镜像队列,将队列的副本存储在集群的其他节点上,确保主节点故障时可以无缝切换到镜像节点。