RabbitMQ入门指南(九):消费者可靠性

专栏导航

RabbitMQ入门指南

从零开始了解大数据


目录

专栏导航

前言

一、消费者确认机制

二、失败重试机制

三、失败处理策略

四、业务幂等性

1.通过唯一标识符保证操作的幂等性

2.通过业务判断保证操作的幂等性

总结


前言

RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了消费者确认机制、失败重试机制、失败处理策略、业务幂等性等内容。


当RabbitMQ向消费者投递消息后,了解消费者的处理状态是非常重要的。因为消息的投递并不代表消费者一定能够正确地消费这些消息,可能会出现各种故障:

  • 网络故障:在消息投递过程中,如果RabbitMQ和消费者之间的网络连接出现故障,可能会导致消息无法正确投递给消费者。
  • 消费者宕机:如果消费者在接收消息后突然宕机,那么消息可能无法被正确处理。
  • 消费者处理异常:消费者在接收到消息后,由于处理不当或者出现异常情况,可能会导致消息处理失败。

以上情况都可能导致消息丢失,因此RabbitMQ需要知道消费者的处理状态,以便在消息处理失败时重新投递消息。

一、消费者确认机制

RabbitMQ的消费者确认机制Consumer Acknowledgement)是一种确保消息被成功处理的机制。当消费者处理消息结束后,需要向RabbitMQ发送一个回执,以告知消息的处理状态。这个机制对于确保消息的可靠传递非常重要,因为它可以防止消息在消费者端处理失败而没有被正确处理的情况。

回执有三种可选值:

  • ACK(确认):表示消费者成功处理了消息,RabbitMQ会从队列中删除该消息。
  • NACK(否定确认):表示消息处理失败,RabbitMQ需要再次投递该消息。
  • REJECT(拒绝):表示消息处理失败并且被拒绝,RabbitMQ会从队列中删除该消息。

在实际应用中,一般使用ACK或NACK两种方式。REJECT方式的使用相对较少,通常只在消息格式存在问题,即存在开发错误的情况下使用。因此大多数情况下需要将消息处理的代码通过try catch机制捕获,消息处理成功时返回ACK,处理失败时返回NACK。

在consumer服务的application.yml文件中添加配置修改Spring AMQP的ACK处理方式 :

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto

RabbitMQ 支持三种不同的确认模式,这些模式通过acknowledge-mode属性进行配置:

  • none:关闭ACK。消费者接收到消息后不需要发送任何确认给发送者,发送者将继续发送下一条消息。在这种模式下,如果消费者处理消息失败,消息将会丢失,无法保证消息的可靠性。
  • manual:手动ACK。消费者接收到消息后需要手动发送确认给发送者,发送者才会继续发送下一条消息。在这种模式下,如果消费者处理消息失败,可以手动发送NACK给发送者,告诉发送者这条消息处理失败,以便发送者重新发送消息。这种模式可以保证消息的可靠性,但需要消费者手动处理确认和NACK。
  • auto:自动ACK。Spring AMQP提供了一种自动的消息确认机制。它利用AOP(面向切面编程)对消息处理逻辑做了环绕增强。当业务正常执行时,Spring AMQP会自动返回ACK。当业务出现异常时,根据异常判断返回不同结果:业务异常,自动返回NACK;消息处理或校验异常,自动返回REJECT。

二、失败重试机制

当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者。如果消费者再次执行依然出错,消息会再次requeue到队列,再次投递,直到消息处理成功为止。 如果消费者持续出现异常,消息会不断地在队列中重新排队并重新发送,这可能会导致消息处理延迟和队列持续增长,给系统带来不必要的压力。

为了解决这个问题,Spring框架提供了消费者失败重试机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到MQ队列 。

在consumer服务的application.yml文件中添加配置:

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true
          initial-interval: 1000ms
          multiplier: 1
          max-attempts: 3
          stateless: true
enabled: true开启消费者失败重试
initial-interval: 1000ms初始的等待时长
multiplier: 1每次重试的等待时长是上次等待时长的倍数
max-attempts: 3最大重试次数
stateless: truetrue表示重试是无状态的,即每次重试都是独立的,不会考虑之前的重试状态。如果业务中包含事务,需要改为false。

通过这样的配置,当消费者出现异常时,消息会在本地进行重试,而不是无限期地重新排队发送。在达到最大重试次数后,SpringAMQP会抛出AmqpRejectAndDontRequeueException异常,并将消息从队列中删除。这意味着最后一次处理消息的结果是失败的,并且消息不会被重新排队发送给消费者。

这种失败重试机制可以有效地减少消息处理的延迟和队列的增长,提高系统的稳定性和可用性。当然,在使用失败重试机制时,也需要考虑到业务逻辑和异常处理的合理性,避免因过度重试而导致的问题。

三、失败处理策略

在失败重试机制中,当达到最大重试次数后,消息会被直接丢弃。尽管这在某些场景中可能是可接受的,但对于那些对消息可靠性要求极高的业务来说,这显然是一个潜在的风险点。

Spring AMQP为此提供了强大的支持,允许开发人员自定义重试次数耗尽后的消息处理策略。这个策略是由MessageRecovery接口来定义的,它有三种不同的实现方式:

  • RejectAndDontRequeueRecoverer:当重试次数耗尽后,直接拒绝消息,并丢弃该消息。这是默认的处理方式。
  • ImmediateRequeueMessageRecoverer:当重试次数耗尽后,返回一个NACK给生产者,使消息重新入队,以便再次发送。
  • RepublishMessageRecoverer:当重试次数耗尽后,可以将失败的消息投递到一个指定的交换机和队列中,这个交换机和队列专门用来存放异常的消息。

在处理策略中,一种比较合适的方式是使用RepublishMessageRecoverer。当消息失败后,它会被投递到一个特定的、专门用于存放异常消息的队列中。这个队列可以由人工进行集中处理,使得开发人员可以更精细地处理和诊断问题。

在consumer服务中定义处理失败消息的交换机和队列:

    @Bean
    public DirectExchange errorExchange(){
        return new DirectExchange("error.direct");
    }

    @Bean
    public Queue errorQueue(){
        return new Queue("error.queue");
    }

    @Bean
    public Binding errorBinding(Queue errorQueue, DirectExchange errorExchange){
        return BindingBuilder.bind(errorQueue).to(errorExchange).with("error");
    }

定义一个RepublishMessageRecoverer:

@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
    return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}

完整代码如下:

@Configuration
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
public class ErrorConfiguration {

    @Bean
    public DirectExchange errorExchange() {
        return new DirectExchange("error.direct");
    }

    @Bean
    public Queue errorQueue() {
        return new Queue("error.queue");
    }

    @Bean
    public Binding errorBinding(Queue errorQueue, DirectExchange errorExchange) {
        return BindingBuilder.bind(errorQueue).to(errorExchange).with("error");
    }

    @Bean
    public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    }
}

通过这样的配置,当消息在尝试多次重试后仍然失败时,它们会被自动投递到定义的异常消息队列中。这样就可以集中处理这些异常消息,进行进一步的诊断或处理。这种策略为开发人员在处理复杂分布式系统中的消息问题提供了一种更加专业和灵活的方式。

四、业务幂等性

在计算机科学和软件开发中,幂等性是一个重要的概念。简单来说,如果一个操作或函数不论执行一次还是多次,其结果都是相同的,那么称这个操作或函数是幂等的。在业务处理中,幂等性尤其关键。它可以保证系统的稳定性,确保在某些异常情况下,多次执行某个业务操作不会对业务状态产生不一致的结果。幂等性的重要性在于它能够避免因重复执行操作而产生的数据不一致、状态冲突等问题。在涉及金融交易、库存管理、用户认证等关键领域,幂等性是确保系统稳定和数据准确的重要前提。

1.通过唯一标识符保证操作的幂等性

为每个操作生成唯一的标识符(如ID),并在系统中跟踪这些标识符以检测重复操作。当接收到具有已知标识符的操作时,可以跳过重复的操作。Spring AMQP的MessageConverter自带了MessageID的功能,只要开启这个功能即可。

Jackson的消息转换器示例:

@Bean
public MessageConverter messageConverter(){
    // 定义消息转换器
    Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
    // 配置自动创建消息ID,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
    jjmc.setCreateMessageIds(true);
    return jjmc;
}

在publisher服务中编写测试类,并利用RabbitTemplate实现消息发送:

    @Test
    void testSendMessage2Queue() {
        String queueName = "demo.queue";
        String msg = "Idempotent Test";
        rabbitTemplate.convertAndSend(queueName, msg);
    }

运行测试用例,查看结果:

2.通过业务判断保证操作的幂等性

业务判断,是一种基于业务逻辑和状态的检查,以确定是否对重复的请求或消息进行处理。在多种业务场景中,这一策略的思路各有不同。

比如在支付订单的案例中,业务逻辑主要是支付并将订单状态从“未支付”修改为“已支付”(需要防止重复支付)。因此,在执行这一业务时,可以判断订单的状态是否为“未支付”。若状态不是“未支付”,则说明该订单已经被处理过,无需重复处理。与基于唯一标识符的方案相比,业务判断方案无需对原有数据库进行改造,因此更为推荐。

以支付修改订单的业务为例:

    @Override
    public void markOrderPaySuccess(Long orderId) {
        // 查询订单
        Order order = getById(orderId);
        // 判断订单状态,订单不存在或者订单状态不是1,放弃处理
        if (order == null || order.getStatus() != 1) {
            return;
        }
        // 尝试更新订单
        order.setStatus(2);
        order.setPayTime(LocalDateTime.now());
        orderService.updateById(order);
    }

以上代码示例判断和更新是两步动作 ,极小概率下可能存在线程安全问题,所以可以进行以下修改:

    @Override
    public void markOrderPaySuccess(Long orderId) {
        // UPDATE `order` SET status = ? , pay_time = ? WHERE id = ? AND status = 1
        orderService.lambdaUpdate()
                .set(Order::getStatus, 2)
                .set(Order::getPayTime, LocalDateTime.now())
                .eq(Order::getId, orderId)
                .eq(Order::getStatus, 1)
                .update();
    }
 

总结

RabbitMQ是一个开源的消息队列软件,旨在提供可靠的消息传递和消息队列功能。本文主要介绍了消费者确认机制、失败重试机制、失败处理策略、业务幂等性等内容,希望对大家有所帮助。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/273123.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

EasyExcel实现动态表头(注解实现)

要实现上述动态头,按每日统计,每月统计,每年统计。而时间是一直变化,所以我们需要表头也一直动态生成。 首先,我们需要定义所需要实体类 public class CountDayData {ExcelProperty(value "业务员姓名")p…

test-03-java 单元测试框架 testNG 入门介绍 junit/junit5/testNG 详细对比

拓展阅读 test-01-java 单元测试框架 junit 入门介绍 test-02-java 单元测试框架 junit5 入门介绍 test-03-java 单元测试框架 testNG 入门介绍 junit/junit5/testNG 详细对比 test assert-01-Google Truth 断言 test 系统学习-03-TestNG Spock testng 入门使用教程 开源…

第四周:机器学习知识点回顾

前言: 讲真,复习这块我是比较头大的,之前的线代、高数、概率论、西瓜书、樱花书、NG的系列课程、李宏毅李沐等等等等…那可是花了三年学习佳实践下来的,现在一想脑子里就剩下几个名词就觉得废柴一个了,朋友们有没有同感…

【华为OD机试真题2023CD卷 JAVAJS】开源项目热榜

华为OD2023(C&D卷)机试题库全覆盖,刷题指南点这里 开源项目热榜 时间限制:1s 空间限制:256MB 限定语言:不限 题目描述: 某个开源社区希望将最近热度比较高的开源项目出一个榜单,推荐给社区里面的开发者。对于每个开源项目,开发者可以进行关注(watch)、收藏(star)、…

IDEA 控制台中文出现乱码问题解决

一、问题概述 请看下图 二、问题分析 IDEA控制台输出乱码一般会有三种来源: ① IDEA本身编码错误 ② Tomcat日志输出编码错误 ③ 项目本身原因。 终极原因:IDEA编码和Tomcat编码不一致,统一设置为UTF-8即可。 三、解决思路 修改…

敏捷开发 - 知识普及

敏捷开发- Scrum 前言 知乎有一篇文章描写Scrum,我觉得比较好:https://zhuanlan.zhihu.com/p/631459977 简单科普下PM和PMO 原文来源:https://zhuanlan.zhihu.com/p/546820914 PM - 项目经理(Project Manager) ​ 需要具备以下能力 ​ 1.号召力 2.影响力 3.交流能力 4.应…

万用表测接地电阻方法

万用表测接地电阻方法 用万用表在不同土质的土壤对接地电阻进行了实验,并将万用表所测数据和专用接地电阻测试仪所测数据进行了比较,两者十分接近。具体测量方法如下: 找两根8mm、1m长的圆钢,将其一端磨尖作为辅助测试棒&#x…

熊猫目标检测数据集VOC格式200张

熊猫,又名大熊猫,是中国珍稀特有的保护动物,被誉为“国宝”,具有极高的观赏价值。它们生活在中国中部的山区,包括四川、甘肃和陕西等地。熊猫是一种大型的食草动物,主要以竹子为食,也偶尔进食其…

调用delay_ms函数进入hardfault_handler处理硬件错误中断

一、大多是情况下hardfault_handler处理硬件错误中断的解决办法 1.检查代码中是否有指针未初始化或者越界访问的情况。 2.检查是否有堆栈溢出的情况,可以通过增加堆栈大小或者减少函数调用深度来解决。 3.检查是否有中断优先级设置不当的情况,可以通过…

数据治理之主数据管理

文章目录 一、主数据管理概述什么是主数据什么是主数据管理主数据管理的意义打破孤岛, 提升数据质量统一认知, 提升业务效率集中管控, 提升管理效能数据驱动, 提升决策水平 二、主数据管理方法摸家底建体系接数据数据接入数据清洗…

Maven之插件入门

官方文档&#xff1a;https://maven.apache.org/guides/plugin/guide-java-plugin-development.html 命名规范 <yourplugin>-maven-plugin 创建项目 生成项目 方式一、IDEA 2023 方式二、命令行 mvn archetype:generate -DgroupIdcn.lsj -DartifactIdhello-maven-pl…

Redis Streams在Spring Boot中的应用:构建可靠的消息队列解决方案【redis实战 二】

欢迎来到我的博客&#xff0c;代码的世界里&#xff0c;每一行都是一个故事 Redis Streams在Spring Boot中的应用&#xff1a;构建可靠的消息队列解决方案 引言前言Redis Streams的基本概念和特性1. 日志数据结构2. 消息和字段3. 消费者组4. 消息ID5. 实时和历史数据处理6. 性能…

DVWA靶场中的xss-反射型xss、存储型xss的low、medium、high的详细通关方法

目录 1.DVWA反射型xss &#xff08;1&#xff09;Low&#xff1a; &#xff08;2&#xff09;Medium&#xff1a; &#xff08;3&#xff09;Heigh 2.xss存储型 &#xff08;1&#xff09;Low&#xff1a; &#xff08;2&#xff09;Medium &#xff08;3&#xff09;He…

词法语法语义分析程序设计及实现,包含出错提示和错误恢复

词法说明 (1)关键字 main, int, char, if, else, for, while, void (2)运算符 - * / < < > > ! (3)界符 ; ( ) { } (4)标识符 ID letter(letter|digit)* (5)整型常数 NUM digit digit* (6)空格 ‘ ‘ ‘\n’ ‘\r’ ‘\t’ 空格用来分隔ID,NUM,运算符,界…

idea自动注释

前言 保存一下自己的自动注释代码 idea自动注释 前言1 创建类时&#xff0c;自动生成注释2 在方法上使用快捷键生成注释3 使用方法4 效果图 1 创建类时&#xff0c;自动生成注释 如下&#xff1a; #if (${PACKAGE_NAME} && ${PACKAGE_NAME} ! "")package …

亚马逊美国站ASTM F2613儿童折叠椅和凳子强制性安全标准

ASTM F2613折叠椅和凳子安全标准 美国消费品安全委员会&#xff08;CPSC&#xff09;发布的ASTM F2613儿童折叠椅和凳子的强制性安全标准&#xff0c;已于2020年7月6日生效&#xff0c;并被纳入联邦法规《16 CFR 1232儿童折叠椅和凳子安全标准》。 亚马逊要求在美国站上架的儿…

数据库基础面试第三弹

1. mysql数据库四种常见数据库引擎 1. MyISAM&#xff1a; MyISAM是MySQL最早的数据库引擎之一。它被设计成处理大量的插入和查询操作。MyISAM表格的数据存储在三个文件上&#xff1a;.frm文件存储表结构&#xff0c;.MYD文件存储数据&#xff0c;.MYI文件存储索引。MyISAM表…

【2023年12月18日-12月25日】一周AI咨询更新

上周&#xff0c;关于Google的Bard和Midjourney v6的讨论异常火热。 接下来&#xff0c;让我们回顾一下上周那些引人注目的AI新闻。 ① 已近乎真实拍摄&#xff1a;Midjourney v6的画质令人惊叹 由Midjourney v6制作的图片&#xff0c;质量之高&#xff0c;媲美电影级别&…

Spring高手之路-SpringBean的生命周期

目录 SpringBean的生命周期 整体介绍 详细介绍 1.实例化Bean 2.设置属性值 3.检查Aware 4.调用BeanPostProcessor的前置处理方法 5.调用InitializingBean的afterPropertiesSet方法 6.调用自定义init-method方法 7.调用BeanPostProcessor的后置处理方法 8.注册Destru…

【小黑嵌入式系统第十三课】PSoC 5LP第二个实验——中断控制实验

上一课&#xff1a; 【小黑嵌入式系统第十二课】μC/OS-III程序设计基础&#xff08;二&#xff09;——系统函数使用场合、时间管理、临界区管理、使用规则、互斥信号量 文章目录 1 实验目的2 实验要求3 实验设备4 实验原理4.1 中断(1) 中断机制概述(2) 中断源(3) 中断系统的功…
最新文章