SpringBoot 整合RabbitMq 自定义消息监听容器来实现消息批量处理

SpringBoot 整合RabbitMq 自定义消息监听容器来实现消息批量处理

  • 前言
  • 添加依赖
  • 配置文件
  • 编写监听器
  • 创建SimpleRabbitListenerContainerFactory
  • 发送消息

前言

RabbitMQ是一种常用的消息队列,Spring Boot对其进行了深度的整合,可以快速地实现消息的发送和接收。在RabbitMQ中,消息的发送和接收都是异步的,因此需要使用监听器来监听消息的到来。Spring Boot中提供了默认的监听器容器,但是有时候我们需要自定义监听器容器,来满足一些特殊的需求,比如批量获取数据。
在这里插入图片描述

在本文中,我们将使用Spring Boot来整合RabbitMQ,并自定义一个监听器容器,实现批量获取数据的功能。
前置条件:
在开始之前,您需要具备以下条件:

  • 已经安装好RabbitMQ服务器并启动。
  • 已经创建好要使用的队列。
  • 已经熟悉了Spring Boot和RabbitMQ的基本知识。

环境准备:
在开始之前,我们需要准备好以下环境:

  • JDK 1.8或以上版本
  • Spring Boot 2.5.0或以上版本
  • RabbitMQ 3.8.0或以上版本

添加依赖

首先,在pom.xml文件中添加以下依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置文件

接下来,在application.properties文件中添加以下配置:

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/

# 队列名称
spring.rabbitmq.listener.simple.queue-name=myQueue

# 最大并发消费者数量
spring.rabbitmq.listener.simple.concurrency=5

# 最小数量
spring.rabbitmq.listener.simple.min-concurrency=1

# 最大数量
spring.rabbitmq.listener.simple.max-concurrency=10

# 批量处理消息的大小
spring.rabbitmq.listener.simple.batch-size=50

spring:
  rabbitmq:
    host: localhost
    listener:
      simple:
        batch-size: 50
        concurrency: 5
        max-concurrency: 10
        min-concurrency: 1
        queue-name: myQueue
    password: guest
    port: 5672
    username: guest
    virtual-host: /

编写监听器

然后,我们需要创建一个监听器类,以便处理从队列中接收到的消息。以下是一个简单的示例:

@Component
public class MyListener {
    
    @RabbitListener(queues = "myQueue", containerFactory = "myFactory")
    public void handleMessage(List<MyMessage> messages, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag)
            throws IOException {
        try {
            // 处理消息
            System.out.println("Received " + messages.size() + " messages");
            for (Message message : messages) {
           		// 处理消息
            	System.out.println("Received message: " + new String(message.getBody()));
        	}
        	channel.basicAck(messages.get(messages.size() - 1).getMessageProperties().getDeliveryTag(), true);
        } finally {
            // 手动确认消息
            channel.basicAck(deliveryTag, true);
        }
    }
}

在上面的代码中,我们使用了@RabbitListener注解来指定要监听的队列名称,同时也指定了使用myFactory工厂来创建监听容器。在这个监听器中,我们简单地打印了接收到的消息。

创建SimpleRabbitListenerContainerFactory

接下来,我们需要创建一个SimpleRabbitListenerContainerFactory工厂,以便能够自定义监听容器的行为。以下是一个简单的示例:

@Configuration
public class RabbitMQConfig {

//    @Bean
//    public SimpleRabbitListenerContainerFactory myFactory(ConnectionFactory connectionFactory) {
//        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
//        factory.setConnectionFactory(connectionFactory);
//        factory.setConcurrentConsumers(1);
//        factory.setMaxConcurrentConsumers(10);
//        factory.setBatchListener(true);
//        factory.setBatchSize(50);
//        return factory;
//    }

	@Bean
    public SimpleRabbitListenerContainerFactory myFactory(
            ConnectionFactory connectionFactory,
            PlatformTransactionManager transactionManager,
            MessageConverter messageConverter) {
        
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        
        // 并发消费者数,默认为 1
        factory.setConcurrentConsumers(5);
        
        // 最大并发消费者数,默认为 1
        factory.setMaxConcurrentConsumers(10);
        
        // 拒绝未确认的消息并重新将它们放回队列,默认为 true
        factory.setDefaultRequeueRejected(false);
        
        // 容器启动时是否自动启动,默认为 true
        factory.setAutoStartup(true);
        
        // 消息确认模式,默认为 AUTO
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        
        // 每个消费者在一次请求中预获取的消息数,默认为 1
        factory.setPrefetchCount(5);
        
        // 从队列中接收消息的超时时间,默认为 0,表示没有超时限制
        factory.setReceiveTimeout(1000);
        
        // 与容器一起使用的事务管理器。默认情况下,容器不会使用事务
        factory.setTransactionManager(transactionManager);
        
        // 消息转换器,用于将接收到的消息转换为 Java 对象或将 Java 对象转换为消息
        factory.setMessageConverter(messageConverter);
        
        // 用于异步消息处理的线程池。默认情况下,容器使用一个简单的 SimpleAsyncTaskExecutor
        factory.setTaskExecutor(new SimpleAsyncTaskExecutor());
        
        // 在关闭容器时等待活动线程终止的时间,默认为 5000 毫秒
        factory.setShutdownTimeout(10000);
        
        // 重试失败的消息之前等待的时间,默认为 5000 毫秒
        factory.setRecoveryInterval(5000);
        
        // 如果消息处理器尝试监听不存在的队列,是否抛出异常。默认为 true
        factory.setMissingQueuesFatal(false);
        
        // 监听器容器连接工厂
        factory.setConnectionFactory(connectionFactory);

        return factory;
    }
}

这些属性中的大多数都是可选的,可以根据需要进行设置。根据应用程序的需求,我们可以自由地调整这些属性,以提高应用程序的性能和可靠性。

发送消息

最后,我们可以编写一个简单的发送消息的代码来向队列中发送一些消息。以下是一个简单的示例:

@Component
public class MySender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMessage(String message) {
        for (int i = 0; i < 100; i++) {
            rabbitTemplate.convertAndSend("myQueue", "message:" + i);
        }
    }
}

总之, SimpleRabbitListenerContainerFactory 提供了一个灵活而强大的机制,用于自定义和配置 RabbitMQ 消息监听容器。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/5909.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

PCB模块化设计16——RS232,RS485接口模块PCB布局布线设计规范

目录PCB模块化设计16——RS232&#xff0c;RS485接口模块PCB布局布线设计规范RS232接口模块1、接口概述2、接口电路 原理图的EMC设计3、连接器设计4、线缆设计5、RS-232常规管脚定义&#xff1a;6、RS-232知识要点RS485接口模块1、原理图设计方案1、RS485接口6KV防雷电路设计方…

c语言程序笔记(1)

C语言笔记&#xff08;1&#xff09;——B站翁恺视频 程序框架 #include <stdio.h> int main() {//printf("hello world!\n");return 0; }1、变量与常量。 例子1&#xff1a; #include <stdio.h> int main() {printf("1234%d",1234);return …

图解LeetCode——合并两个有序链表

如果你喜欢这篇文章的话&#xff0c;请给作者点赞关注哟&#xff0c;你的支持是我不断前进的动力&#xff01; 目录 题目描述&#xff1a; 解法&#xff1a; 完整代码&#xff1a; 结果 题目链接&#xff1a;力扣 题目描述&#xff1a; 将两个升序链表合并为一个新的 升序…

2017世界互联网领先成果来了 光量子计算机

演讲者&#xff1a;陆朝阳中国科学技术大学教授 发布了世界上首台超越早期经典计算机的光量子计算机 陆朝阳&#xff1a;很高兴向大家报告中国科学院在量子计算这个领域取得的基础性的研究成果。 我们知道50多年以来摩尔定律一直见证着计算机的更新换代&#xff0c;之前每过18个…

【新2023Q2模拟题JAVA】华为OD机试 - 绘图机器

最近更新的博客 华为od 2023 | 什么是华为od,od 薪资待遇,od机试题清单华为OD机试真题大全,用 Python 解华为机试题 | 机试宝典【华为OD机试】全流程解析+经验分享,题型分享,防作弊指南华为od机试,独家整理 已参加机试人员的实战技巧本篇题解:绘图机器 题目 绘图机器的绘…

读书笔记-纳瓦尔宝典-2023.04.01

重点 财富 如何构造高价值信息 判断力 何为幸福 启发 最近看了这本书的大部分内容&#xff0c;感悟颇多&#xff0c;及时记录下来。 因为是快速阅读&#xff0c;还未做深入思考和实践&#xff0c;但对总体的内容有一个大致把握&#xff0c;未来会结合行动反复阅读和思考&…

python画爱心代码

前几天在网上看到了一个画爱心的教程&#xff0c;就是在 Python里面画一个爱心&#xff0c;但是我在网上找到的代码不是很好用&#xff0c;所以我就自己写了一遍。 首先我们先创建一个新的 python文件。新建一个 python文件夹&#xff0c;将我们之前的那个 python文件夹复制到这…

蓝桥杯·3月份刷题集训Day03

本篇博客旨在记录自已打卡蓝桥杯3月份刷题集训&#xff0c;同时会有自己的思路及代码解答希望可以给小伙伴一些帮助。本人也是算法小白&#xff0c;水平有限&#xff0c;如果文章中有什么错误之处&#xff0c;希望小伙伴们可以在评论区指出来&#xff0c;共勉&#x1f4aa;。 文…

2021年第十二届蓝桥杯省赛Java B组真题及详细题解

A试题 : ASC【填空题】 本题总分&#xff1a; 5 分 【1、问题描述】 已知大写字母 A 的 ASCII 码为 65&#xff0c;请问大写字母 L 的 ASCII 码是多少&#xff1f; 【2、答案提交】 这是一道结果填空的题&#xff0c;你只需要算出结果后提交即可。本题的结果为一个整数&#…

二十、Javascript API(一)

1. Atomics和SharedArrayBuffer 多个上下文访问 SharedArrayBuffer时&#xff0c;如果同时对缓冲区执行操作&#xff0c;就可能出现资源争用问题。Atomics API 通过强制同一时刻只能对缓冲区执行一个操作&#xff0c;可以让多个上下文安全地读写一个SharedArrayBuffer。 1.1 …

Android HTTP请求方式

1.HttpClient使用流程 基本流程&#xff1a; 2.HttpClient使用示例 1&#xff09;使用HttpClient发送GET请求 直接贴下简单的发送Get请求的代码&#xff1a; public class MainActivity extends Activity implements OnClickListener { private Button btnGet; private WebV…

STM-32:GPIO 输出-点亮LED-流水灯-蜂鸣器

目录一、GPIO1.1GPIO简介1.2GPIO 硬件解析1.2.1保护二极管1.2.2 P-MOS、N-MOS 管1.2.3数据输入输出寄存器1.2.4复用功能输出1.2.5模拟输入输出1.3GPIO 的工作模式1.3.1 输入模式 (模拟/浮空/上拉/下拉)1.3.2 输出模式 (推挽/开漏)1.3.3 复用功能 (推挽/开漏)1.3.4 小结二、GPIO…

ChatGPT将引发大量而普遍的网络安全隐患

ChatGPT是一个基于人工智能的语言生成模型&#xff0c;它可以在任何给定的时间&#xff0c;使用自然语言生成技术&#xff0c;生成文本、对话和文章。它不仅可以被用来编写文本&#xff0c;还可以用来编写语言、生成图像和视频。目前&#xff0c; ChatGPT已广泛应用于语言翻译、…

【数据结构篇】-树(共计两万字,你真的搞懂了它吗)

友情链接&#xff1a;【数据结构与算法】首篇 - 思维导图 - 各部分内容目录 文章目录&#x1f680;树&#x1f6a2;一、树的原理精讲&#xff08;一&#xff09;树的定义&#xff08;二&#xff09;基本术语&#xff08;三&#xff09;树的性质&#x1f6a2;二、树的存储结构&a…

C++ STL:stack和queue的使用和底层实现

目录 一. 什么是stack和deque 二. stack和queue的使用方法 2.1 stack的常用接口 2.2 queue的常用接口 三. stack和queue的底层实现原理 3.1 容器适配器 3.2 deque&#xff08;双端队列&#xff09;的概念及抽象结构 3.3 deque的底层实现结构 3.4 deque的优缺点 —— 为…

try... excpet BaseException(异常处理捕获)

try ...except 是最常见的捕获处理异常的结构&#xff0c;其主要作用是将可能出现问题的代码块用try &#xff1a;包裹起来&#xff0c;不至于出现错误让程序崩溃&#xff0c;无法执行下去常见的try ...excpet 的结构有三种try&#xff1a;pass except BaseException as e &…

Azure SQL基础到实战(2)-部署

目录Azure 上的数据库服务的演变Azure SQL 部署选项Azure 虚拟机上的 SQL ServerIaaS 与PaaS无版本数据库服务SQL 托管实例SQL 数据库弹性数据库池Azure 上的数据库服务的演变 Azure SQL 是 Microsoft 作为 Azure 云计算平台的一部分提供的云数据库产品/服务。 与其他版本的 S…

含光热电站、有机有机朗肯循环、P2G的综合能源优化调度(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

算法---扫雷游戏

题目 让我们一起来玩扫雷游戏&#xff01; 给你一个大小为 m x n 二维字符矩阵 board &#xff0c;表示扫雷游戏的盘面&#xff0c;其中&#xff1a; ‘M’ 代表一个 未挖出的 地雷&#xff0c; ‘E’ 代表一个 未挖出的 空方块&#xff0c; ‘B’ 代表没有相邻&#xff08;…

服务器部署前后端分离项目

服务器部署前后端分离项目 文章目录服务器部署前后端分离项目一、安装环境安装jdk1、在/usr/local目录下创建jdk文件夹&#xff0c;并将jdk安装包放到/usr/local/jdk包下并解压2、配置jdk的环境变量3、进行编译&#xff0c;4、检测是否安装成功安装tomcat1、将tomcat放到/usr/l…
最新文章