RabbitMQ-6.延迟消息

延迟消息

  • 6.延迟消息
    • 6.1.死信交换机和延迟消息
      • 6.1.1.死信交换机
      • 6.1.2.延迟消息
      • 6.1.3.总结
    • 6.2.DelayExchange插件
      • 6.2.1.下载
      • 6.2.2.安装
      • 6.2.3.声明延迟交换机
      • 6.2.4.发送延迟消息

6.延迟消息

在电商的支付业务中,对于一些库存有限的商品,为了更好的用户体验,通常都会在用户下单时立刻扣减商品库存。例如电影院购票、高铁购票,下单后就会锁定座位资源,其他人无法重复购买。

但是这样就存在一个问题,假如用户下单后一直不付款,就会一直占有库存资源,导致其他客户无法正常交易,最终导致商户利益受损!

因此,电商中通常的做法就是:对于超过一定时间未支付的订单,应该立刻取消订单并释放占用的库存

例如,订单支付超时时间为30分钟,则我们应该在用户下单后的第30分钟检查订单支付状态,如果发现未支付,应该立刻取消订单,释放库存。

但问题来了:如何才能准确的实现在下单后第30分钟去检查支付状态呢?

像这种在一段时间以后才执行的任务,我们称之为延迟任务,而要实现延迟任务,最简单的方案就是利用MQ的延迟消息了。

在RabbitMQ中实现延迟消息也有两种方案:

  • 死信交换机+TTL
  • 延迟消息插件

这一章我们就一起研究下这两种方案的实现方式,以及优缺点。

6.1.死信交换机和延迟消息

首先我们来学习一下基于死信交换机的延迟消息方案。

6.1.1.死信交换机

什么是死信?

当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):

  • 消费者使用basic.rejectbasic.nack声明消费失败,并且消息的requeue参数设置为false
  • 消息是一个过期消息,超时无人消费
  • 要投递的队列消息满了,无法投递

如果一个队列中的消息已经成为死信,并且这个队列通过**dead-letter-exchange**属性指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机就称为死信交换机(Dead Letter Exchange)。而此时加入有队列与死信交换机绑定,则最终死信就会被投递到这个队列中。

死信交换机有什么作用呢?

  1. 收集那些因处理失败而被拒绝的消息
  2. 收集那些因队列满了而被拒绝的消息
  3. 收集因TTL(有效期)到期的消息

6.1.2.延迟消息

前面两种作用场景可以看做是把死信交换机当做一种消息处理的最终兜底方案,与消费者重试时讲的RepublishMessageRecoverer作用类似。

而最后一种场景,大家设想一下这样的场景:
如图,有一组绑定的交换机(ttl.fanout)和队列(ttl.queue)。但是ttl.queue没有消费者监听,而是设定了死信交换机hmall.direct,而队列direct.queue1则与死信交换机绑定,RoutingKey是blue:
image.png

假如我们现在发送一条消息到ttl.fanout,RoutingKey为blue,并设置消息的有效期为5000毫秒:
image.png

注意:尽管这里的ttl.fanout不需要RoutingKey,但是当消息变为死信并投递到死信交换机时,会沿用之前的RoutingKey,这样hmall.direct才能正确路由消息。

消息肯定会被投递到ttl.queue之后,由于没有消费者,因此消息无人消费。5秒之后,消息的有效期到期,成为死信:
image.png
死信被再次投递到死信交换机hmall.direct,并沿用之前的RoutingKey,也就是blue
image.png
由于direct.queue1hmall.direct绑定的key是blue,因此最终消息被成功路由到direct.queue1,如果此时有消费者与direct.queue1绑定, 也就能成功消费消息了。但此时已经是5秒钟以后了:
image.png
也就是说,publisher发送了一条消息,但最终consumer在5秒后才收到消息。我们成功实现了延迟消息

6.1.3.总结

注意:
RabbitMQ的消息过期是基于追溯方式来实现的,也就是说当一个消息的TTL到期以后不一定会被移除或投递到死信交换机,而是在消息恰好处于队首时才会被处理。
当队列中消息堆积很多的时候,过期消息可能不会被按时处理,因此你设置的TTL时间不一定准确。

6.2.DelayExchange插件

基于死信队列虽然可以实现延迟消息,但是太麻烦了。因此RabbitMQ社区提供了一个延迟消息插件来实现相同的效果。
官方文档说明:
Scheduling Messages with RabbitMQ | RabbitMQ - Blog

6.2.1.下载

插件下载地址:
GitHub - rabbitmq/rabbitmq-delayed-message-exchange: Delayed Messaging for RabbitMQ
由于我们安装的MQ是3.8版本,因此这里下载3.8.17版本:
image.png

6.2.2.安装

因为我们是基于Docker安装,所以需要先查看RabbitMQ的插件目录对应的数据卷。

docker volume inspect mq-plugins

结果如下:

[
    {
        "CreatedAt": "2024-06-19T09:22:59+08:00",
        "Driver": "local",
        "Labels": null,
        "Mountpoint": "/var/lib/docker/volumes/mq-plugins/_data",
        "Name": "mq-plugins",
        "Options": null,
        "Scope": "local"
    }
]

插件目录被挂载到了/var/lib/docker/volumes/mq-plugins/_data这个目录,我们上传插件到该目录下。

接下来执行命令,安装插件:

docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange

运行结果如下:
image.png

6.2.3.声明延迟交换机

基于注解方式:

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "delay.queue", durable = "true"),
        exchange = @Exchange(name = "delay.direct", delayed = "true"),
        key = "delay"
))
public void listenDelayMessage(String msg){
    log.info("接收到delay.queue的延迟消息:{}", msg);
}

基于@Bean的方式:

@Slf4j
@Configuration
public class DelayExchangeConfig {

    @Bean
    public DirectExchange delayExchange(){
        return ExchangeBuilder
                .directExchange("delay.direct") // 指定交换机类型和名称
                .delayed() // 设置delay的属性为true
                .durable(true) // 持久化
                .build();
    }

    @Bean
    public Queue delayedQueue(){
        return new Queue("delay.queue");
    }
    
    @Bean
    public Binding delayQueueBinding(){
        return BindingBuilder.bind(delayedQueue()).to(delayExchange()).with("delay");
    }
}

6.2.4.发送延迟消息

发送消息时,必须通过x-delay属性设定延迟时间:

@Test
void testPublisherDelayMessage() {
    // 1.创建消息
    String message = "hello, delayed message";
    // 2.发送消息,利用消息后置处理器添加消息头
    rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            // 添加延迟消息属性
            message.getMessageProperties().setDelay(5000);
            return message;
        }
    });
}

注意:
延迟消息插件内部会维护一个本地数据库表,同时使用Elang Timers功能实现计时。如果消息的延迟时间设置较长,可能会导致堆积的延迟消息非常多,会带来较大的CPU开销,同时延迟消息的时间会存在误差。
因此,不建议设置延迟时间过长的延迟消息

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/376930.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

SpringBoot:web开发

web开发demo:点击查看 LearnSpringBoot05Web 点击查看更多的SpringBoot教程 技术摘要 webjarsBootstrap模板引擎thymeleaf嵌入式Servlet容器注册web三大组件 一、webjars webjars官网 简介 简介翻译 WebJars 是打包到 JAR(Java Archive)…

2024.02 国内认知大模型汇总

概述 大模型,又称为大规模机器学习模型,是一种基于大数据的人工智能技术。它通过深度学习和机器学习的方法,对大量数据进行训练,以实现对复杂问题的高效解决。大模型技术在语音识别、图像识别、自然语言处理等领域有着广泛的应用…

常用对象和常用成员函数

常量对象与常量成员函数来防止修改对象,实现最低权限原则。 在Obj被定义为常量对象的情况下,下面这条语句是错误的。 错误的原因是常量对象一旦初始化后,其值就再也不能改变。因此,不能通过常量对象调用普通成员函数,因…

ArcGIS学习(五)坐标系-2

3.不同基准面坐标系之间的转换 在上一关中,我们学习了ArcGIS中的投影(投影栅格)工具,并以"WGS1984地理坐标系与WGS1984的UTM投影坐标系的转换”为例进行讲解。 "WGS1984地理坐标系与WGS1984的UTM投影坐标系的转换”代表的是同一个基准面下的两个坐标的转换。 …

【力扣】查找总价格为目标值的两个商品,双指针法

查找总价格为目标值的两个商品原题地址 方法一:双指针 这道题和力扣第一题“两数之和”非常像,区别是这道题已经把数组排好序了,所以不考虑暴力枚举和哈希集合的方法,而是利用单调性,使用双指针求解。 考虑数组pric…

洛希极限

L1-3 洛希极限 分数 10 作者 陈越 单位 浙江大学 科幻电影《流浪地球》中一个重要的情节是地球距离木星太近时,大气开始被木星吸走,而随着不断接近地木“…

【数据结构】链表OJ面试题2《分割小于x并排序链表、回文结构、相交链表》+解析

1.前言 前五题在这http://t.csdnimg.cn/UeggB 休息一天,今天继续刷题! 2.OJ题目训练 1. 编写代码,以给定值x为基准将链表分割成两部分,所有小于x的结点排在大于或等于x的结点之前 。链表分割_牛客题霸_牛客网 思路 既然涉及…

春节回家坐飞机后助听器就不好用了?如何过安检、拖运?

春节即将来临,很多人都要乘坐飞机回家或者出游。如果你是一位助听器使用者,你可能会有一些疑问:坐飞机能戴助听器吗?助听器会不会受到安检设备的影响?直接将助听器放在传送带上可以吗?……别担心&#xff0…

Harbor介绍、整体架构和安装

Harbor介绍、整体架构和安装 文章目录 Harbor介绍、整体架构和安装1.Harbor介绍2.Harbor 整体架构3.安装Harbor3.1 主机初始化3.1.1 设置ip地址3.1.2 配置镜像源3.1.3 关闭防火墙3.1.4 禁用SELinux3.1.5 禁用swap3.1.6 设置时区 3.2 安装docker3.3 安装docker compose3.4 下载H…

【JS逆向一】逆向某站的 加密参数算法--仅供学习参考

逆向日期:2024.02.06 使用工具:Node.js 文章全程已做去敏处理!!! 【需要做的可联系我】 可使用AES进行解密处理(直接解密即可):在线AES加解密工具 1、打开某某网站(请使用文章开头的…

记录 | linux下切换python版本

查看系统中存在的 python 版本 ls /usr/bin/python* 查看系统默认的 python 版本 python --version

金融信贷风控特征计算详解

特征的含义? 特征可以说是风控系统中的最小单元,是风控工具的重要组成部分,我们也可以理解成变量。不过叫什么问题不大,团队内有相同的共识就行。 风控特征是我们做数字化线上风控中的重要组成部分,几乎可以说没有风…

【Flink入门修炼】1-2 Mac 搭建 Flink 源码阅读环境

在后面学习 Flink 相关知识时,会深入源码探究其实现机制。因此,需要现在本地配置好源码阅读环境。 本文搭建环境: Mac M1(Apple Silicon)Java 8IDEAFlink 官方源码 一、 下载 Flink 源码 github 地址:h…

[每周一更]-(第85期):NLP-实战操作-文本分类

NLP文本分类的应用场景 医疗领域 - 病历自动摘要: 应用: 利用NLP技术从医疗文档中自动生成病历摘要,以帮助医生更快速地了解患者的状况。 法律领域 - 法律文件分类: 应用: 使用文本分类技术自动分类法律文件&#xf…

Mysql的sql优化

一.查询优化 我们都知道,在建立索引的时候,要考虑where后面的查询条件字段、order by 排序后面的字段 、group by 分组排序后面的字段,对他们的字段建立合适的索引,但是我们需要思考怎么建立合适的索引,或者建立索引之…

计算机网络-华为无线网络配置

前面已经大致了解了无线通信的原理和无线组网的概念,今天来学习无线的配置过程与步骤。 一、无线组网配置流程 在开始配置前复习下前面讲过无线组网有涉及几个设备,AC无线控制器、AP无线接入点、POE交换机。无线组网与有线组网是相对独立的,不…

10:LED点阵显示汉字

LED点阵显示汉字 1、字模2、横向取模 1、字模 (1)如何记录组成字的LED点阵亮灭信息(16x16点阵一共有256点,显示一个特定的字需要其中有些点亮而另一些不亮,如何记录哪些点亮哪些点不亮?用字模)字模如何工作?256个点用…

机器学习 | 揭示EM算法和马尔可夫链的实际应用

目录 初识EM算法 马尔可夫链 HMM模型基础 HMM模型使用 初识EM算法 EM算法是一种求解含有隐变量的概率模型参数的迭代算法。该算法通过交替进行两个步骤:E步骤和M步骤,从而不断逼近模型的最优参数值。EM算法也称期望最大化算法,它是一个基…

负重20kg复合翼垂直起降无人机应用,复合翼无人机技术分析

主要任务应用 1.管线巡查 挂载可见光/红外二合一光电载荷和小型SAR设备,对既定线路进行昼夜巡视侦察,利用图像实时传回指挥控制中心,可用于石油管路、电力线路、舰艇航线及周围态势感知,利于依据现场实情进行战略决策和指令传达…
最新文章