Spring Boot+RocketMQ 实现多实例分布式环境下的事件驱动

为什么要使用MQ?

在Spring Boot Event这篇文章中已经通过Guava或者SpringBoot自身的Listener实现了事件驱动,已经做到了对业务的解耦。为什么还要用到MQ来进行业务解耦呢?

首先无论是通过Guava还是Spring Boot自身提供的监听注解来实现的事件驱动他都是处于同一进程中的,意思就是当前事件推送后只有当前的进程可以进行消费。通过MQ可以实现将事件推送到进程外的Broker中,在多实例/分布式环境下,其他的服务在订阅同一事件(Topic)时,可以在各自的服务中进行消费,最大化空闲服务的利用。

image-20231227120405997

源码地址:Gitee

整合RocketMQ

依赖版本

  • JDK 17
  • Spring Boot 3.2.0
  • RocketMQ-Client 5.0.4
  • RocketMQ-Starter 2.2.0

可以参考这篇进行RocketMQ安装

Spring Boot 3.0+ 取消了对spring.factories的支持。所以在导入时需要手动引入RocketMQ的配置类。

引入RocketMQ依赖

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client-java</artifactId>
    <version>5.0.4</version>
</dependency>
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.0</version>
</dependency>

解决Spring Boot3+不兼容 spring.factories

rocketmq-spring-boot-starter:2.2.2版本中:
image-20231227062105302

参考配置文件

# RocketMQ 配置
rocketmq:
  name-server: 127.0.0.1:9876
  consumer:
    group: event-mq-group
    # 一次拉取消息最大值,注意是拉取消息的最大值而非消费最大值
    pull-batch-size: 1
  producer:
    # 发送同一类消息的设置为同一个group,保证唯一
    group: event-mq-group
    # 发送消息超时时间,默认3000
    sendMessageTimeout: 10000
    # 发送消息失败重试次数,默认2
    retryTimesWhenSendFailed: 2
    # 异步消息重试此处,默认2
    retryTimesWhenSendAsyncFailed: 2
    # 消息最大长度,默认1024 * 1024 * 4(默认4M)
    maxMessageSize: 4096
    # 压缩消息阈值,默认4k(1024 * 4)
    compressMessageBodyThreshold: 4096
    # 是否在内部发送失败时重试另一个broker,默认false
    retryNextServer: false

参考Issue

  • 方法一 :通过@Import(RocketMQAutoConfiguration.class)在配置类中引入

  • 方法二:在resources资源目录下创建文件夹及文件META-INF/springorg.springframework.boot.autoconfigure.AutoConfiguration.imports
    文件内容为RocketMQ自动配置类路径:org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration

RocketMQ 使用

解决Spring Boot3+不支持spring.factories的问题

import org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Import;

/**
 * 启动类
 */
@Import(RocketMQAutoConfiguration.class)
@SpringBootApplication
public class MQEventApplication {
    public static void main(String[] args) {
        SpringApplication.run(MQEventApplication.class, args);
    }
}

RocketMQ操作工具

RocketMQ Message实体

import cn.hutool.core.util.IdUtil;
import jakarta.validation.constraints.NotBlank;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

import java.io.Serializable;
import java.util.List;

/**
 * RocketMQ 消息
 */
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class RocketMQMessage<T> implements Serializable {

    /**
     * 消息队列主题
     */
    @NotBlank(message = "MQ Topic 不能为空")
    private String topic;

    /**
     * 延迟级别
     */
    @Builder.Default
    private DelayLevel delayLevel = DelayLevel.OFF;

    /**
     * 消息体
     */
    private T message;

    /**
     * 消息体
     */
    private List<T> messages;

    /**
     * 使用有序消息发送时,指定发送到队列
     */
    private String hashKey;

    /**
     * 任务Id,用于日志打印相关信息
     */
    @Builder.Default
    private String taskId = IdUtil.fastSimpleUUID();
}

RocketMQTemplate 二次封装

import com.yiyan.study.domain.RocketMQMessage;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * RocketMQ 消息工具类
 */
@Slf4j
@Component
public class RocketMQService {

    @Resource
    private RocketMQTemplate rocketMQTemplate;

    @Value("${rocketmq.producer.sendMessageTimeout}")
    private int sendMessageTimeout;

    /**
     * 异步发送消息回调
     *
     * @param taskId 任务Id
     * @param topic  消息主题
     * @return the send callback
     */
    private static SendCallback asyncSendCallback(String taskId, String topic) {
        return new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("ROCKETMQ 异步消息发送成功 : [TaskId:{}] - [Topic:{}] - [SendStatus:{}]", taskId, topic, sendResult.getSendStatus());
            }

            @Override
            public void onException(Throwable throwable) {
                log.error("ROCKETMQ 异步消息发送失败 : [TaskId:{}] - [Topic:{}] - [ErrorMessage:{}]", taskId, topic, throwable.getMessage());
            }
        };
    }

    /**
     * 发送同步消息,使用有序发送请设置HashKey
     *
     * @param message 消息参数
     */
    public <T> void syncSend(RocketMQMessage<T> message) {
        log.info("ROCKETMQ 同步消息发送 : [TaskId:{}] - [Topic:{}]", message.getTaskId(), message.getTopic());
        SendResult sendResult;
        if (StringUtils.isNotBlank(message.getHashKey())) {
            sendResult = rocketMQTemplate.syncSendOrderly(message.getTopic(), message.getMessage(), message.getHashKey());
        } else {
            sendResult = rocketMQTemplate.syncSend(message.getTopic(), message.getMessage(), sendMessageTimeout, message.getDelayLevel().getLevel());
        }
        log.info("ROCKETMQ 同步消息发送结果 : [TaskId:{}] - [Topic:{}] - [MessageId:{}] - [SendStatus:{}]",
                message.getTaskId(), message.getTopic(), sendResult.getMsgId(), sendResult.getSendStatus());
    }

    /**
     * 批量发送同步消息
     *
     * @param message 消息参数
     */
    public <T> void syncSendBatch(RocketMQMessage<T> message) {
        log.info("ROCKETMQ 同步消息-批量发送 : [TaskId:{}] - [Topic:{}] - [MessageCount:{}]",
                message.getTaskId(), message.getTopic(), message.getMessages().size());
        SendResult sendResult;
        if (StringUtils.isNotBlank(message.getHashKey())) {
            sendResult = rocketMQTemplate.syncSendOrderly(message.getTopic(), message.getMessages(), message.getHashKey());
        } else {
            sendResult = rocketMQTemplate.syncSend(message.getTopic(), message.getMessages());
        }
        log.info("ROCKETMQ 同步消息-批量发送结果 : [TaskId:{}] - [Topic:{}] - [MessageId:{}] - [SendStatus:{}]",
                message.getTaskId(), message.getTopic(), sendResult.getMsgId(), sendResult.getSendStatus());
    }

    /**
     * 异步发送消息,异步返回消息结果
     *
     * @param message 消息参数
     */
    public <T> void asyncSend(RocketMQMessage<T> message) {
        log.info("ROCKETMQ 异步消息发送 : [TaskId:{}] - [Topic:{}]", message.getTaskId(), message.getTopic());
        if (StringUtils.isNotBlank(message.getHashKey())) {
            rocketMQTemplate.asyncSendOrderly(message.getTopic(), message.getMessage(), message.getHashKey(),
                    asyncSendCallback(message.getTaskId(), message.getTopic()));
        } else {
            rocketMQTemplate.asyncSend(message.getTopic(), message.getMessage(),
                    asyncSendCallback(message.getTaskId(), message.getTopic()), sendMessageTimeout, message.getDelayLevel().getLevel());
        }
    }

    /**
     * 批量异步发送消息
     *
     * @param message 消息参数
     */
    public <T> void asyncSendBatch(RocketMQMessage<T> message) {
        log.info("ROCKETMQ 异步消息-批量发送 : [TaskId:{}] - [Topic:{}] - [MessageCount:{}]",
                message.getTaskId(), message.getTopic(), message.getMessages().size());
        if (StringUtils.isNotBlank(message.getHashKey())) {
            rocketMQTemplate.asyncSendOrderly(message.getTopic(), message.getMessages(), message.getHashKey(),
                    asyncSendCallback(message.getTaskId(), message.getTopic()));
        } else {
            rocketMQTemplate.asyncSend(message.getTopic(), message.getMessages(),
                    asyncSendCallback(message.getTaskId(), message.getTopic()));
        }
    }

    /**
     * 单向发送消息,不关心返回结果,容易消息丢失,适合日志收集、不精确统计等消息发送;
     *
     * @param message 消息参数
     */
    public <T> void sendOneWay(RocketMQMessage<T> message) {
        sendOneWay(message, false);
    }

    /**
     * 单向消息 - 批量发送
     *
     * @param message 消息体
     * @param batch   是否为批量操作
     */
    public <T> void sendOneWay(RocketMQMessage<T> message, boolean batch) {
        log.info((batch ? "ROCKETMQ 单向消息发送 : [TaskId:{}] - [Topic:{}]"
                        : "ROCKETMQ 单向消息-批量发送 : [TaskId:{}] - [Topic:{}] - [MessageCount{}]"),
                message.getTaskId(), message.getTopic(), message.getMessages().size());
        if (StringUtils.isNotBlank(message.getHashKey())) {
            if (batch) {
                message.getMessages().
                        forEach(msg -> rocketMQTemplate.sendOneWayOrderly(message.getTopic(), msg, message.getHashKey()));
            } else {
                rocketMQTemplate.sendOneWayOrderly(message.getTopic(), message.getMessage(), message.getHashKey());
            }
        } else {
            if (batch) {
                message.getMessages().forEach(msg -> rocketMQTemplate.sendOneWay(message.getTopic(), msg));
            } else {
                rocketMQTemplate.sendOneWay(message.getTopic(), message.getMessage());
            }
        }
    }
}

定义RocketMQ消费者

import com.yiyan.study.constants.MQConfig;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

/**
 * MQ消息监听
 */
@Component
@Slf4j
@RocketMQMessageListener(topic = MQConfig.EVENT_TOPIC,
        consumerGroup = MQConfig.EVENT_CONSUMER_GROUP)
public class MQListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        log.info("MQListener 接收消息 : {}", message);
    }
}

定义测试类发送消息

import cn.hutool.core.thread.ThreadUtil;
import com.yiyan.study.constants.MQConfig;
import com.yiyan.study.domain.RocketMQMessage;
import com.yiyan.study.utils.RocketMQService;
import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

/**
 * MQ测试
 */
@SpringBootTest
public class MQTest {

    @Resource
    private RocketMQService rocketMQService;

    @Test
    public void sendMessage() {
        int count = 1;
        while (count <= 50) {
            rocketMQService.syncSend(RocketMQMessage.builder()
                    .topic(MQConfig.EVENT_TOPIC)
                    .message(count++)
                    .build());
        }
        // 休眠等待消费消息
        ThreadUtil.sleep(2000L);
    }
}

测试

springboot3-RocketMQ

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/274490.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【动画视频生成】

转自&#xff1a;机器之心 动画视频生成这几天火了&#xff0c;这次 NUS、字节的新框架不仅效果自然流畅&#xff0c;还在视频保真度方面比其他方法强了一大截。 最近&#xff0c;阿里研究团队构建了一种名为 Animate Anyone 的方法&#xff0c;只需要一张人物照片&#xff0…

观察者模式概述

观察者模式,它用于建立一种对象与对象之间的依赖关系&#xff0c; 一个对象发生改变将自动通知其他对象&#xff0c; 其他对象将相应做出反应。在观察者模式种&#xff0c;发生改变的对象称为观察目标&#xff0c; 而被通知的对象称为观察者&#xff0c;一个观察目标可以对应多…

【SD】IP-Adapter 进阶 - 画风融合【3】

生成图片大小&#xff1a;1024x576 将图片一的画风转到图片2中。 测试一&#xff1a; control-1:IP-Adapter 导入图片1 仅调整大小&#xff08;拉伸&#xff09; control-2:canny 导入图片2 仅调整大小&#xff08;拉伸&#xff09; best qualit…

FPGA高端项目:SDI 视频+音频编解码,提供工程源码和技术支持

目录 1、前言免责声明 2、相关方案推荐我这里已有的 GT 高速接口解决方案我目前已有的SDI编解码方案 3、设计思路和框架设计框图GV8601A均衡EQGTX 时钟要求GTX 调用与控制SMPTE SD/HD/3G-SDISMPTE SD/HD/3G-SDI 接收SMPTE SD/HD/3G-SDI 发送 SDI 视频接收数据处理SDI 音频接收-…

Mysql实时数据同步工具Alibaba Canal 使用

目录 Mysql实时数据同步工具Alibaba Canal 使用Canal是什么&#xff1f;工作原理重要版本更新说明 环境准备安装Canalwindow Java : Canal Client 集成依赖编码 工作流程其他学习canal资料 个人主页: 【⭐️个人主页】 需要您的【&#x1f496; 点赞关注】支持 &#x1f4af; M…

2024美赛数学建模思路A题B题C题D题E题F题思路汇总 选题分析

文章目录 1 赛题思路2 美赛比赛日期和时间3 赛题类型4 美赛常见数模问题5 建模资料 1 赛题思路 (赛题出来以后第一时间在CSDN分享) https://blog.csdn.net/dc_sinor?typeblog 2 美赛比赛日期和时间 比赛开始时间&#xff1a;北京时间2024年2月2日&#xff08;周五&#xff…

excel 函数技巧

1&#xff1a;模糊查询 LOOKUP(1,0/FIND(F1062,Sheet1!C$2:Sheet1!C$9135),Sheet1!B$2:Sheet1!B$9135) 函数含义&#xff1a;寻找F列1062行和sheet1中的C2行到C9135行进行模糊查询&#xff0c;返回该行对应的B2行到B9135行的结果。未查到返回结果0 函数公式&#xff1a; LO…

leetcode贪心算法题总结(一)

此系列分三章来记录leetcode的有关贪心算法题解&#xff0c;题目我都会给出具体实现代码&#xff0c;如果看不懂的可以后台私信我。 本章目录 1.柠檬水找零2.将数组和减半的最少操作次数3.最大数4.摆动序列5.最长递增子序列6.递增的三元子序列7.最长连续递增序列8.买卖股票的最…

设计模式-过滤器模式

设计模式专栏 模式介绍模式特点应用场景Java中的过滤器介绍代码示例Java实现过滤器模式Python实现过滤器模式 过滤器模式在spring中的应用 模式介绍 过滤器模式是一种设计模式&#xff0c;它允许开发人员使用不同的标准来过滤一组对象。这种模式是通过运算逻辑以解耦的方式将它…

XIAO ESP32S3之物体检测加入视频流

一、前言 由于XIAO ESP32S3开发套件没有显示屏配件&#xff0c;因此加入http视频流功能&#xff0c;可通过浏览器请求ESP32S3上的视频流。 二、思路 1、XIAO ESP32S3启动后通过wifi连接到AP&#xff1b; 2、启动http服务器&#xff0c;注册get_mjpeg处理函数&#xff1b; 3…

2023年中职“网络安全”——B-5:网络安全事件响应(Server2216)

B-5&#xff1a;网络安全事件响应 任务环境说明&#xff1a; 服务器场景&#xff1a;Server2216&#xff08;开放链接&#xff09; 用户名:root密码&#xff1a;123456 1、黑客通过网络攻入本地服务器&#xff0c;通过特殊手段在系统中建立了多个异常进程&#xff0c;找出启…

【Pytorch】学习记录分享8——PyTorch自然语言处理基础-词向量模型Word2Vec

【Pytorch】学习记录分享7——PyTorch自然语言处理基础-词向量模型Word2Vec 1. 词向量模型Word2Vec)1. 如何度量这个单词的&#xff1f;2.词向量是什么样子&#xff1f;3.词向量对应的热力图&#xff1a;4.词向量模型的输入与输出![在这里插入图片描述](https://img-blog.csdni…

Java面试题及答案汇总来啦!快来领取

Java面试题及答案汇总来啦&#xff01;快来领取 还有不到两个月就要过年了&#xff0c;过完年紧接着“金三银四”招聘热季就要到了&#xff0c;在过年期间只想着吃吃喝喝玩玩&#xff0c;这习是学不了一点。那就趁着过年前这段时间开始恶补Java面试题&#xff0c;实现弯道超车吧…

ArkTS基本概念装饰器

目录 ArkTS基本概念 装饰器汇总 ArkTS基本概念 ArkTS是HarmonyOS的主力应用开发语言。 它在TypeScript&#xff08;简称TS&#xff09;的基础上&#xff0c;匹配ArkUI框架&#xff0c;扩展了声明式UI、状态管理等相应的能力&#xff0c;让开发者以更简洁、更自然的方式开发跨…

FTP简介FTP服务器的搭建【虚拟机版】以及计算机端口的介绍

目录 一. FTP简介 二. FTP服务器的搭建【虚拟机Windows2012版】 1. 启用防火墙 2. 打开服务器管理器➡工具➡计算机管理 3. 选择本地用户与组➡新建组 4. 给组命名&#xff0c;输入描述&#xff0c;点击创建 5. 新建用户&#xff0c;设置用户名称&#xff0c;添加描述&a…

立体匹配算法(Stereo correspondence)SGM

SGM(Semi-Global Matching)原理&#xff1a; SGM的原理在wiki百科和matlab官网上有比较详细的解释&#xff1a; wiki matlab 如果想完全了解原理还是建议看原论文 paper&#xff08;我就不看了&#xff0c;懒癌犯了。&#xff09; 优质论文解读和代码实现 一位大神自己用c实现…

IntelliJ IDEA [插件 MybatisX] mapper和xml间跳转

文章目录 1. 安装插件2. 如何使用3. 主要功能总结 MybatisX 是一款为 IntelliJ IDEA 提供支持的 MyBatis 开发插件 它通过提供丰富的功能集&#xff0c;大大简化了 MyBatis XML 文件的编写、映射关系的可视化查看以及 SQL 语句的调试等操作。本文将介绍如何安装、配置和使用 In…

redis 三主六从高可用docker(不固定ip)

redis集群(cluster)笔记 redis 三主三从高可用集群docker swarm redis 三主六从高可用docker(不固定ip) 此博客解决&#xff0c;redis加入集群后&#xff0c;是用于停掉后重启&#xff0c;将nodes.conf中的旧的Ip替换为新的IP&#xff0c;从而达到不会因为IP变化导致集群无法…

StackOverflowError的JVM处理方式

背景&#xff1a; 事情来源于生产的一个异常日志 Caused by: java.lang.StackOverflowError: null at java.util.stream.Collectors.lambda$groupingBy$45(Collectors.java:908) at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169) at java.util.ArrayL…

阿里云 ACK 云上大规模 Kubernetes 集群高可靠性保障实战

作者&#xff1a;贤维 马建波 古九 五花 刘佳旭 引言 2023 年 7 月&#xff0c;阿里云容器服务 ACK 成为首批通过中国信通院“云服务稳定运行能力-容器集群稳定性”评估的产品&#xff0c; 并荣获“先进级”认证。随着 ACK 在生产环境中的采用率越来越高&#xff0c;稳定性保…
最新文章