消息可靠性保证

回顾RabbitMQ的消息传递过程

image.png
如图所示,发生消息丢失的可能阶段也就是生产者发送消息,时rabbitmq存储消息时,消费者消费消息时。
项目源码:gitee

生产者发送消息阶段

  1. 生产者发送消息时把交换机名写错
  2. 生产者发送消息时把routingKey写错

RabbitMQ存储消息阶段

默认情况下rabbitmq会把消息存储到内存中,如果在消费者消费消息之前,rabbitmq服务器宕机了,内存就会被释放,消息就会丢失

消费者消息消息阶段

消费者在获取到消息以后,就会自动给rabbitmq服务端返回一个ack标志,rabbitmq服务端就会把这个消息从队列中删除。但当消费者获取到消息以后,准备进行业务逻辑处理时消费者宕机了,相当于该消息没有被消费成功,即消息丢失。

因此,我们就针对以上3个阶段,分别解决

生产者保证消息不丢失

  1. 生产者确认机制:可以让生产者感知到消息是否正常发送给交换机
  2. 生产者回退机制:可以让生产者感知到消息是否正常发送给队列

生产者确认机制

image.png

  1. 首先准备好环境,交换机,队列,绑定信息。
package com.example.rabbitmqreliable.demos;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {
    public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";
    public static final String CONFIRM_QUEUE_NAME = "confirm_queue";
    public static final String CONFIRM_ROUTING_KEY = "key1";

    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange(CONFIRM_EXCHANGE_NAME);
    }
    @Bean
    public Queue confirmQueue() {
        return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
    }
    @Bean
    public Binding confirmBind(@Qualifier("directExchange") DirectExchange confirmExchange,
                               @Qualifier("confirmQueue") Queue confirmQueue) {
        return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);

    }
}

  1. 配置文件
spring.rabbitmq.host=101.133.141.75
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=ConFirm
# 开启生产者确认机制,当消费者成功处理这个消息时,会向生产者发送一个确认信号,
# 告诉生产者这个消息已经被成功消费了。
# 如果生产者在一定时间内没有收到确认信号,就会重新发送这个消息。
spring.rabbitmq.publisher-confirm-type=correlated
  1. 通过测试类,创建生产者发送消息
package com.example.rabbitmqreliable;

import com.example.rabbitmqreliable.demos.RabbitMQConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
public class ProviderTests {
    /**
     * 目标:让生产者获取到rabbitmq服务返回的ack或nack
     * 做法:rabbitTemplate需要绑定对应的回调函数
     * 分析:目前的rabbitTemplate是spring托管的,并没有对应的回调函数,需要自定义
     * 实施:需要自定义rabbitTemplate,并注入到spring容器中
     * 一旦我们在spring容器中配置了一个rabbitTemplate,
     * 那么spring boot就不会对rabbitTemplate进行自动化配置
     */
    @Autowired
    private RabbitTemplate rabbitTemplate;
    public void test1() {
        rabbitTemplate.convertAndSend(RabbitMQConfig.CONFIRM_EXCHANGE_NAME, RabbitMQConfig.CONFIRM_ROUTING_KEY, "HELLO CONFIRM");
    }

}
  1. 自定义rabbitTemplate,实现确认机制的回调方法,需要在RabbitMQConfig文件中添加以下内容:
/**
     * ConnectFactory由spring boot根据配置文件中的连接信息实现自动化配置
     * 即在spring容器中直接存在了ConnectionFactory对象
     * @param connectionFactory
     * @return
     */
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        // 设置回调函数
        // 而ConfirmCallBack是一个接口,需要一个类去实现他
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             * 当rabbitmq服务端给生产者放回ack/nack时会执行该方法
             * @param correlationData 消息的id,内容
             * @param ack 消息是否发送成功
             * @param cause 原因
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                if(ack) {
                    System.out.println("消息正常发送给交换机");
                }else {
                    System.out.println("消息没有正常发送给交换机,cause" + cause);
                    // TODO 处理方案:再次发送消息给rabbitmq,需要获取消息内容
                }
            }
        });

      return rabbitTemplate;
    }
  1. 实现当消息发送失败时,再次重新发送部分。
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             * 当rabbitmq服务端给生产者放回ack/nack时会执行该方法
             * @param correlationData 消息的id,内容
             * @param ack 消息是否发送成功
             * @param cause 原因
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                if(ack) {
                    System.out.println("消息正常发送给交换机");
                }else {
                    System.out.println("消息没有正常发送给交换机,cause" + cause);
                    // 处理方案:再次发送消息给rabbitmq,需要获取消息内容
                    //方案1: 立马拿着id去数据库查消息
                    // 方案2:通过定时任务重新发送
                    String msgId = correlationData.getId();
                    System.out.println("msgId" + msgId);
                    // 规定消息的最大发送次数3次,发送消息前判断实际发送次数是否大于最大发送次数,如果大于就不进行重新发送,并设置status=2

                }
            }
        });
 @Test
    public void test1() {
        // 发送消息前把消息写入数据库,并分配唯一id(如果发送失败,可以拿这个id去查数据库,重新发送)
        // 并且还要记录消息实际发送次数,以及消息状态。当超过发送次数超过了规定值,就设置消息的status为2,发送成功设置消息状态为1
        String msgId = UUID.randomUUID().toString().replace("-","");
        CorrelationData correlationData =new CorrelationData(msgId);
        rabbitTemplate.convertAndSend(RabbitMQConfig.CONFIRM_EXCHANGE_NAME, RabbitMQConfig.CONFIRM_ROUTING_KEY + "error", "HELLO CONFIRM", correlationData);
    }

image.png

生产者回退机制

image.png

  1. 需要在配置文件中开启生产者回退机制
# 开启生产者确认机制,
spring.rabbitmq.publisher-returns=true
  1. 给rabbitTemplate绑定生产者回退机制的回调函数
/**
         * 给rabbitTemplate绑定回退机制的回调函数
         * ReturnCallback是一个接口,使用匿名内部类实现
         * 该方法被调用的概率极低,因为从交换机到队列的过程是rabbitmq内部实现的
         * 如果会出错,咱们也不会用他
         */
        rabbitTemplate.setMandatory(true);//让rabbitmq服务把失败信息回传给生产者
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            // 当消息没有正常转发给队列的时候被调用
            @Override
            public void returnedMessage(ReturnedMessage returnedMessage) {
                byte[] body = returnedMessage.getMessage().getBody();
                String msg = new String(body);
                System.out.println("msg:" + msg);
            }
        });
  1. 执行测试方法
package com.example.rabbitmqreliable;

import com.example.rabbitmqreliable.demos.RabbitMQConfig;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.UUID;

@SpringBootTest
public class ProviderTests {
    /**
     * 目标:让生产者获取到rabbitmq服务返回的ack或nack
     * 做法:rabbitTemplate需要绑定对应的回调函数
     * 分析:目前的rabbitTemplate是spring托管的,并没有对应的回调函数,需要自定义
     * 实施:需要自定义rabbitTemplate,并注入到spring容器中
     * 一旦我们在spring容器中配置了一个rabbitTemplate,
     * 那么spring boot就不会对rabbitTemplate进行自动化配置
     */
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void test1() {
        // 发送消息前把消息写入数据库,并分配唯一id(如果发送失败,可以拿这个id去查数据库,重新发送)
        // 并且还要记录消息实际发送次数,以及消息状态。当超过发送次数超过了规定值,就设置消息的status为2,发送成功设置消息状态为1
        String msgId = UUID.randomUUID().toString().replace("-","");
        CorrelationData correlationData =new CorrelationData(msgId);
        // 把routingKey写错
        rabbitTemplate.convertAndSend(RabbitMQConfig.CONFIRM_EXCHANGE_NAME, RabbitMQConfig.CONFIRM_ROUTING_KEY + "404", "HELLO CONFIRM", correlationData);
    }

}

image.png

RabbitMQ保证消息不丢失

  1. 对交换机进行持久化
  2. 对队列进行持久化
  3. 对消息进行持久化

消费者保证消息不丢失

Spring Boot整合RabbitMQ消费者的应答模式:

  • none:自动应答,消费者获取到消息后直接给rabbitmq返回ack
  • auto(默认值):由spring boot框架根据业务执行特点决定给rabbitmq返回ack还nack,业务正常执行完毕返回ack,业务执行过程产生异常,返回nack
  • manual:手动应答,由程序员自己根据业务执行特点给rabbitmq返回对应的ack或nack

使用none模式

  1. 在配置文件中设置消费者应答模式
# 消费者的应答模式
spring.rabbitmq.listener.simple.acknowledge-mode=none
  1. 设置消费者
@Component
public class Consumer {
	@RabbitListener(queues = RabbitMQConfig.CONFIRM_QUEUE_NAME) // 监听的队列
	public void consumerListener(Message message) {
		byte[] body = message.getBody();;
		String msg = new String(body);
		// 进行业务处理
		int a = 1 / 0; //产生异常
		System.out.println("[consumerListener],msg:" + msg);

	}
}
  1. 启动项目,再运行测试类

image.png
image.png
由于我们使用none自动应答模式,消费者给rabbitmq返回ack,rabbitmq直接把消息从队列中删除,导致消息丢失

使用auto模式

修改配置文件中的应答方式为auto,以及引伸出来的其他配置项

# 消费者的应答模式
spring.rabbitmq.listener.simple.acknowledge-mode=auto
# 开启重试机制
spring.rabbitmq.listener.simple.retry.enabled=true
# 最大重试次数,否则会无限重试下去
spring.rabbitmq.listener.simple.retry.max-attempts=3
# 初始化的重试时间间隔
spring.rabbitmq.listener.simple.retry.initial-interval=1000
# 最大的重试时间间隔
spring.rabbitmq.listener.simple.retry.max-interval=5000
# 乘子(计算每一次时间间隔):1s->2s->4s->5s
spring.rabbitmq.listener.simple.retry.multiplier=2

此时控制台报了3次错误,队列没有消息,消息还是丢失了。

auto模式需要设置最大重试次数,否则会死循环,但是又无法判断最大重试次数是多少

使用manual模式

  1. 修改配置文件中的应答方式为manual,将auto模式中延申出来的配置项注释掉
spring.rabbitmq.listener.simple.acknowledge-mode=manual
  1. 消费者代码
package com.example.rabbitmqreliable.demos;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;


@Component
public class Consumer {
    @RabbitListener(queues = RabbitMQConfig.CONFIRM_QUEUE_NAME) // 监听的队列
    public void consumerListener(Message message, Channel channel) {
        byte[] body = message.getBody();;
        String msg = new String(body);
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            // 进行业务处理
            int a = 1 / 0; //产生异常
            System.out.println("[consumerListener],msg:" + msg);

            // 没有产生异常,给服务端返回ack
            // 第一个参数:表示消息的标签,保证消息唯一性
            // 第二个数:表示是否需要进行批量应答
            channel.basicAck(deliveryTag, true);
        } catch (Exception e) {
			e.printStackTrace();
            // 产生异常
            // 给rabbitmq返回nack
            // 第三个参数:表示是否将消息重新放入队列中
            try {
               channel.basicNack(deliveryTag, true, true);
            } catch (IOException ex) {
                throw new RuntimeException(ex);
            }
        }
    }
}

启动项目后执行测试代码,控制台会不断报红,死循环。因为没有设置最大重试次数,因此我们需要统计消息的实际消费次数,可以借助redis计算。一旦消息的实际消费次数大于最大消费次数,那么此时需要给rabbitmq返回ack删除该消息,返回之前要将该消息记录数据库中,后期人工处理

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/244915.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

推荐一款好用的包含表格识别的OCR网站

在当今数字化的时代,文字和表格识别已经成为了许多行业的关键技术。无论是处理大量的纸质文档,还是从网络上收集数据,OCR(光学字符识别)技术都扮演着重要的角色。然而,对于许多用户来说,OCR软件…

Infobright列存数据库原理介绍

简介 Infobright 是一个面向 OLAP 场景的开源列存数据库。比较容易找到代码的版本是 Infobright Community Edition 4.0.7,大概是 2006 年前后的代码。2016 年6 月,Infobright 决定停止开源1。由于它同时提供企业版和社区版,开源版本的功能相…

斑马zebra目标检测数据集VOC+YOLO格式2300张

斑马是由四百万年前的原马进化出来的,最早出现的斑马可能是细纹斑马。有关史前马科动物的化石现存于美国爱达荷州克文的克文化石床国家博物馆。斑马的史前马为“克文马”(美洲斑马或者克文斑马),学名为“Equussimplicidens”&…

WordPress VIP收费下载插件Erphpdown v17.0.1 开心版

会员推广下载专业版 WordPress插件(erphpdown)是模板兔开发的一款针对虚拟资源收费下载/付费下载/付费视频/收费查看/付费阅读/付费查看/VIP下载查看的插件,经过完美测试运行于wordpress 3.x-5.x版本。后续模板兔会增加更多实用的功能。 模板…

docker部署go gin框架 Linux环境

目录 文章目的是什么 环境介绍 Linux 环境下 docker 部署 go gin 详细步骤 部署 gin 文章目的是什么 假设我们学习了 go 语言,在 Linux 上安装了 go 相关的程序,也能直接运行,使用以下命令: go run main.go 假如代码是这样的…

跟着我学Python基础篇:08.集合和字典

往期文章 跟着我学Python基础篇:01.初露端倪 跟着我学Python基础篇:02.数字与字符串编程 跟着我学Python基础篇:03.选择结构 跟着我学Python基础篇:04.循环 跟着我学Python基础篇:05.函数 跟着我学Python基础篇&#…

java设计模式学习之【代理模式】

文章目录 引言代理模式简介定义与用途实现方式 使用场景优势与劣势在Spring框架中的应用图片加载示例代码地址 引言 在现实生活中,我们经常使用代理来处理我们不想直接参与或无法直接参与的事务,例如,使用律师来代表法庭上的案件。在软件开发…

计算机网络——网络层——OSPF协议的介绍

什么是 OSPF ? OSPF 是一个基于链路状态的自治系统内部路由协议,在 TCP/IP 的网络层中进行路由选择,常用于构建大型企业网络或者服务上的骨干网络。在互联网核心路由器之间也可以使用。 OSPF 概述 OSPF 使用的是 Dijkstra(最短…

Vue 实现一个弹出框,允许用户输入信息,并在确认时将输入的信息进行输出到控制台

父组件用来点击按钮弹出弹出框 <!--ParentComponent.vue--> <template><div><button click"showPopupV">点我会有个弹出框&#xff01;&#xff01;&#xff01;</button><PopupComponent v-if"showPopup" :data"p…

【退订】阿里云产品

之前因为学习需要使用了阿里云上的产品服务&#xff0c;项目结束后给忘记了&#xff0c;直到最近阿里云发短信我才知道&#xff1a; 我使用的是datawork的服务&#xff0c;现在先登录阿里云官网&#xff1a; 阿里云-计算&#xff0c;为了无法计算的价值 (aliyun.com) 之后点…

【毕业设计】基于STM32的智能衣柜设计

1、功能说明 功能如下: 1、用stm32控制ds18b20采集温度 2、然后按键可以设置上下限温度&#xff0c; 3、采集的温度低于下限温度时候 打开加热片开始加热&#xff0c; 4、加热到上限温度关闭加热片停止加热&#xff0c; 5、采集的温度可以在oled显示&#xff0c; 6、然后弄个按…

MySQL增量备份与恢复

实验环境 某学校近期在进行期中考试&#xff0c;要求数据库管理员负责一班&#xff0c;二班学生的考试成绩录入&#xff0c;为保证数据的可靠性&#xff0c;数据库管理员在录入学生成绩后均要做数据库备份&#xff0c;并且为了测试备份数据是否可 用&#xff0c;模拟数据丢失故…

MySQL数据库,视图、存储过程与存储函数

数据库对象&#xff1a; 常见的数据库对象&#xff1a; 视图&#xff1a; 视图是一种虚拟表&#xff0c;本身是不具有数据的占用很少的内存空间。 视图建立在已有表的基础上&#xff0c;视图赖以建立的这些表称为基表。 视图的创建和删除只影响视图本身&#xff0c;不影响对…

多云网络互通问题怎么解决——SD-WAN

随着业务的扩张&#xff0c;企业对云资源的用量也越来越大&#xff0c;逐渐形成了混合云架构。要解决多云网络互通的问题&#xff0c;其中一种常见的组网方案是云专线。然而&#xff0c;这种方式也带来了一系列问题&#xff0c;包括&#xff1a; 1、受服务商约束&#xff0c;需…

Docker真的好难用啊,为什么说它移植性好啊?

看起来你对Docker有点困惑和挑战呀。Docker刚开始确实有点难以入门&#xff0c;但是一旦掌握了它的核心概念和操作&#xff0c;你会发现它其实非常强大和便利。 接下来我会根据你提出的问题和场景&#xff0c;详细地解答。 关于你的实际问题&#xff1a; 刚接触时的困难是正。…

【机器学习 | 假设检验系列】假设检验系列—卡方检验(详细案例,数学公式原理推导),最常被忽视得假设检验确定不来看看?

&#x1f935;‍♂️ 个人主页: AI_magician &#x1f4e1;主页地址&#xff1a; 作者简介&#xff1a;CSDN内容合伙人&#xff0c;全栈领域优质创作者。 &#x1f468;‍&#x1f4bb;景愿&#xff1a;旨在于能和更多的热爱计算机的伙伴一起成长&#xff01;&#xff01;&…

【开源软件】最好的开源软件-2023-第17名 Gravite

自我介绍 做一个简单介绍&#xff0c;酒架年近48 &#xff0c;有20多年IT工作经历&#xff0c;目前在一家500强做企业架构&#xff0e;因为工作需要&#xff0c;另外也因为兴趣涉猎比较广&#xff0c;为了自己学习建立了三个博客&#xff0c;分别是【全球IT瞭望】&#xff0c;【…

骨灰级程序员那些年曾经告诉我们的高效学习的态度

一、背景 以前阅读陈皓老师的左耳听风专栏中关于如何高效学习的总结让我收货颇丰&#xff0c;今天总结了一下&#xff0c;分享给大家 老师说&#xff1a; 学习是一件“逆人性”的事&#xff0c;就像锻炼身体一样&#xff0c;需要人持续付出&#xff0c;会让人感到痛苦&#…

【Jenkins】节点 node、凭据 credentials、任务 job

一、节点 node Jenkins在安装并初始化完成后&#xff0c;会有一个主节点&#xff08;Master Node&#xff09;&#xff0c;默认情况下主节点可以同时运行的任务数是2&#xff0c;可以在节点配置中修改&#xff08;系统管理/节点和云管理&#xff09;。 Jenkins中的节点&#…

第十二章 React 路由配置,路由参数获取

一、专栏介绍 &#x1f436;&#x1f436; 欢迎加入本专栏&#xff01;本专栏将引领您快速上手React&#xff0c;让我们一起放弃放弃的念头&#xff0c;开始学习之旅吧&#xff01;我们将从搭建React项目开始&#xff0c;逐步深入讲解最核心的hooks&#xff0c;以及React路由、…
最新文章