Springboot整合RabbitMq,详细使用步骤

Springboot整合RabbitMq,详细使用步骤

    • 1 添加springboot-starter依赖
    • 2 添加连接配置
    • 3 在启动类上添加开启注解`@EnableRabbit`
    • 4 创建RabbitMq的配置类,用于创建交换机,队列,绑定关系等基础信息。
    • 5 生产者推送消息
    • 6 消费者接收消息
    • 7 生产者的消息回调机制
    • 8 消费者的确认机制

消息队列(Message Queue)是一种应用间的通信方式。顾名思义,将消息放到队列中,排队发出。消息发布者只管把消息发布到MQ中而不用管谁来取,消息使用者只管从 MQ 中取消息而不管是谁发布的。这样发布者和使用者都不用知道对方的存在。

而且消息队列一般有完整的接收确认,发布消息回调等一系列机制,可以确保接收方一定能接受。

用到的场景如:异步处理,应用解耦,流量削锋和消息通讯等。

以下先详细介绍下springboot项目怎么使用RabbitMq

1 添加springboot-starter依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2 添加连接配置

以下几项是最基础的配置,其他配置下面用到时额外添加

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest #默认用户名和密码
    password: guest
    virtual-host: /  # 虚拟主机

3 在启动类上添加开启注解@EnableRabbit

4 创建RabbitMq的配置类,用于创建交换机,队列,绑定关系等基础信息。

可以直接在java代码中通过注入实体类的方式创建交换机及队列等设备。但此方式添加的’设备‘是懒加载的形式,只要当使用到识别到监听注解或调用发送消息的方法时,才会真在rabbitmq中创建。

可以定位到amqp依赖的源码看到在程序启动的时候并不创建连接,只有在添加了监听注解启动程序或要发送消息时,才会走创建连接的方法。

配置类的示例代码如下:

@Configuration
public class RabbitConfig {
    /**
     * 队列
     */
    @Bean
    Queue createDirectQueue(){
        /**
         * durable:是否持久化,默认是false。true为持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在;false为暂存队列:当前连接有效。
         * exclusive:默认也是false。true是只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable。
         * autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
         * 一般设置一下队列的持久化就好,其余两个就是默认false
         */
        //两种创建方式
        //QueueBuilder.durable("queue.test1").build();
        return new Queue("queue.test1",true,false,false);
    }

    /**
     *  交换机
     */
    @Bean
    DirectExchange createDirectExchange(){
        /**
         * durable、autoDelete参数性质和上面队列的一致
         */
        return new DirectExchange("direct.test1",true,false);
    }

    /**
     * 将队列和交换机绑定, 并设置用于匹配键
     */
    @Bean
    Binding binding(){
        return BindingBuilder.bind(createDirectQueue()).to(createDirectExchange()).with("testRoute");
    }

}

以上是以直连交换机为例,创建其他交换机写法一样,具体对应哪个实体类可以在Exchange接口 —>AbstractExchange实现类下看到。

在这里插入图片描述

可以通过客户端看到队列、交换机、路由关系已经创建成功

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

5 生产者推送消息

@Autowired
RabbitTemplate rabbitTemplate;

@PostMapping("/sendMessage")
public AjaxResult sendMessage(@RequestBody Map params) {
    String id = UUID.randomUUID().toString();
    String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
    params.put("messageId",id);
    params.put("createTime",createTime);
    /**
     * 发给交换机,在发给路由绑定的队列
     */
    rabbitTemplate.convertAndSend("direct.test1","testRoute",params);
    return AjaxResult.success("成功");
}

可以看到,rabbitmq成功接收到消息。

在这里插入图片描述
在这里插入图片描述

6 消费者接收消息

@Component
@RabbitListener(queues = "queue.test1")
public class Receiver {
    @RabbitHandler
    public void process(Map message){
        System.out.printf("消费者接收到消息:" + message.toString());
    }
}

可以看到消息成功被消费,监听处理方法也成功被执行

在这里插入图片描述

​ 如果多个监听器监听同一个队列,是轮询的方式进行消费,不会出现重复消费的情况;如果多个队列同时以相同的路由绑定同一个交换机,消息会以复制的形式发送至每个队列。

7 生产者的消息回调机制

在实际运用中,作为消息的生产者,很多时候我们需要确认消息是否成功发送到了mq中。同时我们还需要知道,假如消息出现异常时的异常情况。为了满足这个业务场景,我们就需要配置消息回调。

  • 增加配置项

    spring:
     rabbitmq:
      publisher-confirm-type: correlated #消息发送成功交互
      publisher-returns: true
    

    可能之前老的版本是publisher-confirm:true,但现在写的话会发现变红了,说明过时了。因为在springboot的自动配置依赖里该配置级别已经为error

在这里插入图片描述

  • 目前回调包含发送成功回调ConfirmCallback和失败回调ReturnsCallback。一些老版本的可能有ReturnCallback。下面先自定义两个回调的回调方法

    ConfirmCallback的回调

    /**
     * 消息发送成功回调
     */
    public class RabbitConfirmCallBack implements RabbitTemplate.ConfirmCallback {
    
        /**
         * 消息成功到达exchange,ack=true
         * @param correlationData
         * @param ack
         * @param s
         */
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String s) {
            System.out.println("相关数据:" + correlationData);
            System.out.println("确认状态:" + ack);
            System.out.println("造成原因:" + s);
        }
    }
    

    ReturnsCallback的回调

    /**
     * 发生异常时的消息返回提醒
     */
    public class RabbitReturnsCallback implements RabbitTemplate.ReturnsCallback {
        @Override
        public void returnedMessage(ReturnedMessage returnedMessage) {
            System.out.println("失败回调:" + returnedMessage);
        }
    }
    

    将自定义回调配置到模板中

    在Rabbit配置类中添加RabbitTemplate并配置两个回调

    @Configuration
    public class RabbitConfig {
    
        @Bean
        public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){
            RabbitTemplate rabbitTemplate = new RabbitTemplate();
            rabbitTemplate.setConnectionFactory(connectionFactory);
            //设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
            rabbitTemplate.setMandatory(true);
    
            rabbitTemplate.setConfirmCallback(new RabbitConfirmCallBack());
            rabbitTemplate.setReturnsCallback(new RabbitReturnsCallback());
            return rabbitTemplate;
        }
    }
    

    那以上两种回调函数什么时候回执行呢?

    1. 消息发送到exchange,且传播到队列,则只有ConfirmCallback回调,ack=true
    2. 消息发送不到exchange,则只有ConfirmCallback回调,ack=false
    3. 消息发送到exchange,没传播到队列(或找不到路由),则ConfirmCallback回调,ack=true、ReturnsCallback回调

由此可见ConfirmCallback回调是exchange的一种反馈,是发生在生产者和交换机之间的,无论能不能发到都会回调。消息发送出去如果收到交换机的确认反馈则回调为成功,如果没有收到确认反馈,则回调为失败。

ReturnsCallback回调是队列的一种反馈,是发生在交换机和队列之间的。只有消息先到达交换机,且发送到队列失败才会执行此回调。

下面是对以上三种情况的测试

  • 消息完全成功发送到队列

    模拟:交换机和路由都存在

    @PostMapping("/sendMessage")
    public AjaxResult sendMessage(@RequestBody Map params) {
        String id = UUID.randomUUID().toString();
        String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        params.put("messageId",id);
        params.put("createTime",createTime);
        //direct.test1和testRoute都存在
        rabbitTemplate.convertAndSend("direct.test1","testRoute",params);
        return AjaxResult.success("成功");
    }
    

    消费者监听且ConfirmCallback回调为true
    在这里插入图片描述

  • 消息没有发送到exchange

    模拟:交换机不存在

    @PostMapping("/sendMessageFailByNoExchange")
        public AjaxResult sendMessageFailByNoExchange(@RequestBody Map params) {
            String id = UUID.randomUUID().toString();
            String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
            params.put("messageId",id);
            params.put("createTime",createTime);
            //该交换机不存在
            rabbitTemplate.convertAndSend("direct.exchange不存在","testRoute",params);
            return AjaxResult.success("成功");
        }
    

    ConfirmCallback回调为false
    在这里插入图片描述

  • 消息发送到exchange,但没发送到队列

    模拟:该交换机存在但该路由不存在

    @PostMapping("/sendMessageFailByNoRoute")
    public AjaxResult sendMessageFailByNoRoute(@RequestBody Map params) {
        String id = UUID.randomUUID().toString();
        String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        params.put("messageId",id);
        params.put("createTime",createTime);
    	//交换机存在但该路由不存在
        rabbitTemplate.convertAndSend("direct.test1","failRoute",params);
        return AjaxResult.success("成功");
    }
    

    ConfirmCallback回调为true,ReturnsCallback失败回调执行
    在这里插入图片描述

可以通过两个回调确定哪些消息没有成功发送到队列,记录下来再次发送,保证消息不丢失。

8 消费者的确认机制

消费者和生产者不同,消费者本身就是凭自己喜好,符合条件才会消费。

所有消费者的确认机制有三种模式:

  1. 自动确认

    是默认的消息确认模式,即mq成功将消息发出,消费者成功接收到,就反馈确认。不管消费者是不是已经成功处理。

    所以如果处理逻辑抛出异常,就相当于丢失了消息。

    一般这种情况我们都是使用try catch捕捉异常后,打印日志用于追踪数据,这样找出对应数据再做后续处理。

  2. 手动确认

  • 自动确认

    自动确认没什么好说的,消费者确认机制的默认模式就是auto,自动反馈确认,所以可以看到只要消息被消费了队列中就不存在了。

  • 手动确认

    消费者收到消息后,手动调用basic.ack/basic.nack/basic.reject后,RabbitMQ收到这些消息后,才认为本次投递成功。

    • basic.ack:确认正确
    • basic.nack:拒绝确认,可以选择是否重新发回队列、是否批处理
    • basic.reject:拒绝确认,可以选择是否重新发回队列

    后两者对应的方法为channel.basicNackchannel.basicReject两者都表示消息没有被正常处理。其中有个参数requeue,选择是否重新入队,开启此项可以避免消息丢失。

    但开启要慎重,如果使用不当会导致一些每次都被你重入列的消息一直消费-入列-消费-入列这样循环,导致消息积压。

    两者有略微的区别channel.basicNack可以拒绝多个消息,channel.basicReject只能拒绝一个

    下面看下代码怎么实现

    如果使用的是RabbitListener注解,需要将ackMode设置为手动模式ackMode="MANUAL"

    三个种情况分别对应下面 【1、2、3】三个方法

    @RabbitHandler
        @RabbitListener(queues = "queue.test1",ackMode = "MANUAL")
        public void processQueueTest1(Map param, Message message, Channel channel) throws IOException {
            /**
             * 【1 确认】
             * deliveryTag:消息的标识符
             * multiple:
             *     false:仅确认当前消息
             *     true:确认所有消息
             */
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
             /**
             *  【2 拒绝】
             *  第一个参数是消息的唯一ID
             *  第二个参数表示是否批量处理
             *  第三个参数表示是否将消息重发回队列
             */
            //channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
            /**
             * 【3 拒绝】
             * 第一个参数deliveryTag表示消息ID
             * 第二个参数为true表示是否重新入列,如果是true则重新丢回队列里等待再次消费,否则数据只是被消费,不会丢回队列里
             */
    		//channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
            System.out.println("queue.test1消费者接收到消息:" + param.toString());
            System.out.println("message:" + message);
            System.out.println("channel:" + channel);
        }
    

    channel.basicAck确认

    以上代码中channel.basicAck是消费者向rabbitmq发送确认消息。向queue.test1队列发送消息,此时开启了手动确认,如果不写此行,队列中会一直存在一条Unacked(未确认)的消息。
    在这里插入图片描述
    执行了channel.basicAck消息才会被消费,如下图已经无滞留消息。
    在这里插入图片描述
    channel.basicNack、channel.basicReject否认
    可以看到拒绝消息之后,因为requeue参数为true,消息会被重新入队,入队后再次等待被消费者消费。如果requeue设为false的话则队列中该消息就是已经被消费。一般情况可以单独记录下,在轮询发送到队列。
    在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/72756.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Win11中使用pip或者Cython报错 —— error: Microsoft Visual C++ 14.0 is required.

第一步&#xff1a;下载Visual Studio 2019 下载地址&#xff1a; https://learn.microsoft.com/zh-cn/visualstudio/releases/2019/release-notes 第二步&#xff1a;安装组件 选择单个组件&#xff0c;勾选以下两个组件 其他错误&#xff1a; 无法打开文件“python37.li…

网络编程(TFTP协议实验)

#include <stdio.h> #include <string.h> #include <stdlib.h> #include <head.h> #include <sys/types.h> #include <sys/socket.h> #include <arpa/inet.h> #include <netinet/in.h>#define PORT 69 //端口号&#xf…

16通道AD采集FMC子卡推荐哪些?

FMC149是一款16通道65MHz采样率14位直流耦合AD采集FMC子卡&#xff0c;符合VITA57.1规范&#xff0c;可以作为一个理想的IO模块耦合至FPGA前端&#xff0c;16通道AD通过FMC连接器&#xff08;HPC&#xff09;连接至FPGA从而大大降低了系统信号延迟。 该板卡支持板上可编程采样…

难解的bug

android.app.RemoteServiceException: Context.startForegroundService() did not then call Service.startForeground(): ServiceRecord 【Android TimeCat】 解决 context.startforegroundservice() did not then call service.startforeground() | XiChens Blog http://www…

Vue+ElementUI实现选择指定行导出Excel

这里记录一下&#xff0c;今天写项目时 的一个需求&#xff0c;就是通过复选框选中指定行然后导出表格中选中行的Excel表格 然后这里介绍一个工具箱(模板)&#xff1a;vue-element-admin 将它拉取后&#xff0c;运行就可以看到如下界面&#xff1a; 这里面的很多功能都已经实现…

【TypeScript】进阶之路语法细节,类型和函数

进阶之路 类型别名(type)的使用接口(interface)的声明的使用二者区别&#xff1a; 联合类型和交叉类型联合类型交叉类型 类型断言获取DOM元素 非空类型断言字面量类型的使用类型缩小&#xff08;类型收窄&#xff09;TypeScript 函数类型函数类型表达式内部规则检测函数的调用签…

线性规划模型-应用篇

文章目录 模型特点使用技巧工具包和求解器模型线性化 应用实例经验总结 模型特点 上一篇中&#xff0c;详细阐述了线性规划问题和单纯形法的算法原理&#xff0c;本文将着重介绍线性模型在工业场景中的应用。 首先需要说清楚的是&#xff0c;为什么线性模型深受研发人员青睐。…

【dnf5文档】新一代RedHat自动化包管理器

前言 HI,CSDN的码友们&#xff0c;距离上一次我发文章已经过去了半年的时间&#xff0c;现在我又来介绍自己新发现和探究的开源技术了。计算机的发展总是飞速的&#xff0c;当我在写这篇文章的时候&#xff0c;Fedora rawhide已经进入了40版本、默认采用的自动化包管理器为dnf…

iPhone苹果手机触屏失灵无法关机,如何强制重启

参考:https://zhuanlan.zhihu.com/p/615223121 1&#xff0c;只轻按一下音量上键后快速松开 2&#xff0c;只轻按一下音量下键后快速松开 3&#xff0c;只按住右侧电源键长按不松手&#xff0c;直到手机关机。

什么是CSS中的渐变(gradient)?如何使用CSS创建线性渐变和径向渐变?

聚沙成塔每天进步一点点 ⭐ 专栏简介⭐ 渐变&#xff08;Gradient&#xff09;在CSS中的应用⭐ 线性渐变&#xff08;Linear Gradient&#xff09;语法&#xff1a;示例&#xff1a; ⭐ 径向渐变&#xff08;Radial Gradient&#xff09;语法&#xff1a;示例&#xff1a; ⭐ 写…

CentOS安装Postgresql

PG基本安装步骤 安装postgresql&#xff1a; sudo yum install postgresql-server初始化数据库&#xff1a;安装完毕后&#xff0c;需要初始化数据库并创建初始用户&#xff1a; sudo postgresql-setup initdb启动和停止服务&#xff1a; sudo systemctl start postgresql sudo…

FreeRTOS源码分析-12 低功耗管理

目录 1 STM32低功耗管理概念及应用 1.1睡眠模式 1.2 停止模式 1.3 待机模式 2 Tickless低功耗管理 2.1 Tickless低功耗模式介绍 2.2 FreeRTOS低功耗模式配置 2.3 FreeRTOS低功耗模式应用 3 低功耗管理实际项目开发 3.1 低功耗设计必须要掌握的硬件知识 …

什么是BFC?它有什么作用?如何创建BFC?

聚沙成塔每天进步一点点 ⭐ 专栏简介⭐ 什么是BFC⭐ BFC的作用⭐ 创建BFC的方法⭐ 写在最后 ⭐ 专栏简介 前端入门之旅&#xff1a;探索Web开发的奇妙世界 记得点击上方或者右侧链接订阅本专栏哦 几何带你启航前端之旅 欢迎来到前端入门之旅&#xff01;这个专栏是为那些对Web…

TENNECO EDI 项目——X12与XML之间的转换

近期为了帮助广大用户更好地使用 EDI 系统&#xff0c;我们根据以往的项目实施经验&#xff0c;将成熟的 EDI 项目进行开源。用户安装好知行之桥EDI系统之后&#xff0c;只需要下载我们整理好的示例代码&#xff0c;并放置在知行之桥指定的工作区中&#xff0c;即可开始使用。 …

在Java中操作Redis(详细-->从环境配置到代码实现)

在Java中操作Redis 文章目录 在Java中操作Redis1、介绍2、Jedis3、Spring Data Redis3.1、对String的操作3.2、对哈希类型数据的操作3.3、对list的操作3.4、对set类型的操作3.5、对 ZSet类型的数据&#xff08;有序集合&#xff09;3.6、通用类型的操作 1、介绍 Redis 的Java客…

C语言笔试训练【第六天】

大家好&#xff0c;我是纪宁。今天是C语言笔试训练的第6天&#xff0c;加油&#xff01; 往期回顾&#xff1a; C语言笔试训练【第五天】 C语言笔试训练【第四天】 C语言笔试训练【第三天】 C语言笔试训练【第二天】 C语言笔试训练【第一天】 1、以下叙述中正确的是&…

client-go实战之十二:选主(leader-election)

欢迎访问我的GitHub 这里分类和汇总了欣宸的全部原创(含配套源码)&#xff1a;https://github.com/zq2599/blog_demos 本篇概览 本文是《client-go实战》系列的第十二篇&#xff0c;又有一个精彩的知识点在本章呈现&#xff1a;选主(leader-election)在解释什么是选主之前&…

【2023 华数杯全国大学生数学建模竞赛】 A题 隔热材料的结构优化控制研究 问题分析及完整论文

【2023 华数杯全国大学生数学建模竞赛】 A题 隔热材料的结构优化控制研究 问题分析及完整论文 1 题目 A 题 隔热材料的结构优化控制研究 新型隔热材料 A 具有优良的隔热特性&#xff0c;在航天、军工、石化、建筑、交通等高科技领域中有着广泛的应用。 目前&#xff0c;由单…

微服务Ribbon-负载均衡策略和饥饿加载

目录 一、负载均衡策略 1.1 负载均衡策略介绍 1.2 自定义负载均衡策略 二、饥饿加载 &#xff08;笔记整理自bilibili黑马程序员课程&#xff09; 一、负载均衡策略 1.1 负载均衡策略介绍 负载均衡的规则都定义在IRule接口中&#xff0c;而IRule有很多不同的实现类&…

使用巴特沃兹滤波器的1D零相位频率滤波研究(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…
最新文章