RocketMQ—RocketMQ集成SpringBoot

RocketMQ—RocketMQ集成SpringBoot

新建生产者的boot项目和消费者的boot项目,pom文件重点如下:

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.2.2</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>2.0.25</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
            <configuration>
                <excludes>
                    <exclude>
                        <groupId>org.projectlombok</groupId>
                        <artifactId>lombok</artifactId>
                    </exclude>
                </excludes>
            </configuration>
        </plugin>
    </plugins>
</build>

02-boot-producer和03-boot-consumer分别对应生产者和消费者。

项目结构

生产者

生产者yml文件如下:

rocketmq:
    name-server: 地址:端口
    producer:
        group: boot-producer-group

同步发送消息

生产者同步发送消息的代码如下:

@SpringBootTest
class Rocketmq02BootProducerApplicationTests {
    //注入rocketMQTemplate

   	@Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Test
    public void producer(){
        rocketMQTemplate.syncSend("bootTestTopic","这是boot的一个消息");
    }

}

运行完毕看面板如下:

面板

发送异步消息

// 异步
rocketMQTemplate.asyncSend("bootAsyncTestTopic", "我是boot的一个异步消息", new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
        System.out.println("成功");
    }

    @Override
    public void onException(Throwable throwable) {
        System.out.println("失败" + throwable.getMessage());
    }
});

发送单向消息

rocketMQTemplate.sendOneWay("bootOnewayTopic", "单向消息");

延迟消息

// 延迟消息
Message<String> msg = MessageBuilder.withPayload("我是一个延迟消息").build();
rocketMQTemplate.syncSend("bootMsTopic", msg, 3000, 3); //第三个参数表示连接消息队列的超时时间,第四个参数表示延时等级

顺序消息

MSGModel类如下

@Data
@AllArgsConstructor
@NoArgsConstructor
public class MsgModel {

    private String orderSn;
    private Integer userId;
    private String desc; // 下单 短信 物流

}

发送顺序消息的生产者如下:

//发送者放 需要将一组消息 都发在同一个队列中去  消费者 需要单线程消费
List<MsgModel> msgModels = Arrays.asList(
    new MsgModel("qwer", 1, "下单"),
    new MsgModel("qwer", 1, "短信"),
    new MsgModel("qwer", 1, "物流"),
    new MsgModel("zxcv", 2, "下单"),
    new MsgModel("zxcv", 2, "短信"),
    new MsgModel("zxcv", 2, "物流")
);
msgModels.forEach(msgModel -> {
    // 发送  一般都是以json的方式进行处理
    rocketMQTemplate.syncSendOrderly("bootOrderlyTopic", JSON.toJSONString(msgModel), msgModel.getOrderSn());
    //第二个参数表示消息内容	第三个参数表示hashKey
});

带标签的消息

@Test
void tagKeyTest() throws Exception {
    rocketMQTemplate.syncSend("bootTagTopic:tagA", "我是一个带tag的消息");
}

带key的消息

@Test
void tagKeyTest() throws Exception {
   // key是写带在消息头的
    Message<String> message = MessageBuilder.withPayload("我是一个带key的消息")
        .setHeader(RocketMQHeaders.KEYS, "key-id-1")
        .build();
    rocketMQTemplate.syncSend("bootKeyTopic", message);

}

消费者

yml配置文件如下:

server:
  port: 8890
rocketmq:
  name-server: 地址:端口

简单消费者

消费者代码如下

@Component
@RocketMQMessageListener(topic = "bootTestTopic",consumerGroup = "boot-test-consumer-group")
public class ASimpleMsgListener implements RocketMQListener<MessageExt> {
    //如果泛型指定固定类型,消息体就是我们的参数
    //MessageExt 是消息所有内容,可以拿到所有内容

    /**
     * 这个方法就是消费消息的方法
     * 只要没有报错,就签收了
     * 如果报错了,就是拒收,就会重试
     * @param message 是消息内容
     */
    @Override
    public void onMessage(MessageExt message) {
        System.out.println(new String(message.getBody()));
    }
}

运行结果

顺序消息的消费者

@Component
@RocketMQMessageListener(topic = "bootOrderlyTopic",
        consumerGroup = "boot-orderly-consumer-group",
        consumeMode = ConsumeMode.ORDERLY, // 顺序消费模式 单线程
        maxReconsumeTimes = 5 // 消费重试的次数
)
public class BOrderlyMsgListener implements RocketMQListener<MessageExt> {

    @Override
    public void onMessage(MessageExt message) {
        MsgModel msgModel = JSON.parseObject(new String(message.getBody()), MsgModel.class);
        System.out.println(msgModel);
    }
}

带tag的消费者

@Component
@RocketMQMessageListener(topic = "bootTagTopic",
        consumerGroup = "boot-tag-consumer-group",
        selectorType = SelectorType.TAG,// tag过滤模式
        selectorExpression = "tagA || tagB"
//        selectorType = SelectorType.SQL92,// sql92过滤模式  这种一般不用,这种默认没有开启,需要在sql92 
                         //需要在broker.conf配置文件中开启enbalePropertyFilter=true
//        selectorExpression = "a in (3,5,7)" // broker.conf中开启enbalePropertyFilter=true
)
public class CTagMsgListener implements RocketMQListener<MessageExt> {

    @Override
    public void onMessage(MessageExt message) {
        System.out.println(new String(message.getBody()));
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/418413.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

[RoarCTF 2019]Easy Calc

这题考查的是: 字符串解析特性目录读取文件内容读取 字符串解析特性详解&#xff1a;PHP字符串解析特性 &#xff08;$GET/$POST参数绕过&#xff09;&#xff08;含例题 buuctf easycalc&#xff09;_参数解析 绕过-CSDN博客 ascii码查询表&#xff1a;ASCII 表 | 菜鸟工具 …

ubuntu 20.04下查找pycharm-vscode-qtcreator安装路径-查快捷方式路径-pycharm要以root权限运行脚本

环境&#xff1a;ubuntu20.04 两个用户&#xff1a;root ips3000 qtcreator和pycharm是用户ips3000从软件商店中安装的。1.快捷键和启动方式注意关键词&#xff0c;名字不一样。 桌面快捷方式 启动方式 Pycharm Pycharm-community.desktop Pycharm.sh Qtcreator **qtcrea…

程序媛的mac修炼手册-- Node.js入门篇

最近因为参与一个微信小程序的开发&#xff0c;开始摸索JavaScript。期间&#xff0c;需要基于Node.js安装微信开发工具的依赖项&#xff0c;所以又顺带学习了Node.js的包管理工具npm&#xff08;Node Package Manager&#xff09;。不过&#xff0c;之前看到国外的全栈大佬​​…

GEE入门篇|图像处理(二):在Earth Engine中进行波段计算

目录 波段计算 1.NDVI的计算 2.NDVI 归一化差值的单次运算计算 3.使用 NDWI 的归一化差值 波段计算 许多指数可以使用 Earth Engine 中的波段运算来计算。 波段运算是对图像中两个或多个波段进行加、减、乘或除的过程。 在这里&#xff0c;我们将首先手动执行此操作&#x…

政安晨:【掌握AI的深度学习工具Keras API】(二)—— 【使用内置的训练循环和评估循环】

渐进式呈现复杂性&#xff0c;是指采用一系列从简单到灵活的工作流程&#xff0c;并逐步提高复杂性。这个原则也适用于模型训练。Keras提供了训练模型的多种工作流程。这些工作流程可以很简单&#xff0c;比如在数据上调用fit()&#xff0c;也可以很高级&#xff0c;比如从头开…

ShardingSphere inline表达式线程安全问题定位

ShardingSphere inline表达式线程安全问题定位 问题背景 春节期间发现 ShardingSphere 事务 E2E 偶发执行失败问题&#xff0c;并且每次执行失败需要执行很久&#xff0c;直到超时。最终定位发现 inline 表达式存在线程安全问题。本文记录定位并解决 inline 表达式线程安全问…

实验笔记之——Ubuntu20.04配置nvidia以及cuda并测试3DGS与SIBR_viewers

之前博文测试3DGS的时候一直用服务器进行开发&#xff0c;没有用过笔记本&#xff0c;本博文记录下用笔记本ubuntu20.04配置过程&#xff5e; 学习笔记之——3D Gaussian Splatting源码解读_3dgs运行代码-CSDN博客文章浏览阅读3.2k次&#xff0c;点赞34次&#xff0c;收藏62次…

编写科技项目验收测试报告需要注意什么?第三方验收测试多少钱?

科技项目验收测试是一个非常重要的环节&#xff0c;它对于确保科技项目的质量和可用性起着至关重要的作用。在项目完成后&#xff0c;进行科技项目验收测试可以评估项目的功能、性能和可靠性等方面&#xff0c;并生成科技项目验收测试报告&#xff0c;以提供给项目的相关方参考…

keil uv5 map文件解析

map参考博客&#xff1a;https://www.csdn.net/tags/MtjaYgwsMTY2NzUtYmxvZwO0O0OO0O0O.html 配置外部flash存储代码&#xff1a;https://strongerhuang.blog.csdn.net/article/details/51485903?spm1001.2101.3001.6650.4&utm_mediumdistribute.pc_relevant.none-task-bl…

使用 Helm 安装 极狐GitLab

本篇作者 徐晓伟 使用 Helm 简便快捷的部署与管理 极狐GitLab 前提条件 k8s 完成 helm 的配置 k8s 完成 ingress 的配置 内存至少 10G 演示环境是 龙蜥 Anolis 8.4&#xff08;即&#xff1a;CentOS 8.4&#xff09;最小化安装k8s 版本 1.28.2calico 版本 3.26.1nginx ingre…

Dockerfile(5) - CMD 指令详解

CMD 指定容器默认执行的命令 # exec 形式&#xff0c;推荐 CMD ["executable","param1","param2"] CMD ["可执行命令", "参数1", "参数2"...]# 作为ENTRYPOINT的默认参数 CMD ["param1","param…

高瓴张磊入籍新加坡,这代表了什么?

文&#xff5c;新熔财经 作者&#xff5c;显洋 这两天&#xff0c;海外媒体报道了中国投资大佬与企业家拿到新加坡永居的事儿。本来乏善可陈的文章&#xff0c;却因为一个人名的出现变得有趣起来——高瓴创始人张磊&#xff0c;一位曾经在国内如日中天&#xff0c;但今天鲜少…

论文阅读:2020GhostNet华为轻量化网络

创新&#xff1a;&#xff08;1&#xff09;对卷积进行改进&#xff08;2&#xff09;加残差连接 1、Ghost Module 1、利用1x1卷积获得输入特征的必要特征浓缩。利用1x1卷积对我们输入进来的特征图进行跨通道的特征提取&#xff0c;进行通道的压缩&#xff0c;获得一个特征浓…

解放设计师的创造力:免版的图片素材

title: 解放设计师的创造力&#xff1a;免版的图片素材 date: 2024/2/29 15:10:19 updated: 2024/2/29 15:10:19 tags: 版权无忧创意自由设计效率视觉提升广告设计UI/UX素材移动应用 在设计领域&#xff0c;设计师常常需要使用图片素材来增加作品的视觉效果。然而&#xff0c;…

Docker技术概论(1):Docker与虚拟化技术比较

Docker技术概论&#xff08;1&#xff09; Docker与虚拟化技术比较 - 文章信息 - Author: 李俊才 (jcLee95) Visit me at: https://jclee95.blog.csdn.netMy WebSite&#xff1a;http://thispage.tech/Email: 291148484163.com. Shenzhen ChinaAddress of this article:https:…

从 Flask 切到 FastAPI 后,起飞了!

我这几天上手体验 FastAPI&#xff0c;感受到这个框架易用和方便。之前也使用过 Python 中的 Django 和 Flask 作为项目的框架。Django 说实话上手也方便&#xff0c;但是学习起来有点重量级框架的感觉&#xff0c;FastAPI 带给我的直观体验还是很轻便的&#xff0c;本文就会着…

LeetCode34.在排序数组中查找元素的第一个和最后一个位置

题目 给你一个按照非递减顺序排列的整数数组 nums&#xff0c;和一个目标值 target。请你找出给定目标值在数组中的开始位置和结束位置。 如果数组中不存在目标值 target&#xff0c;返回 [-1, -1]。 你必须设计并实现时间复杂度为 O(log n) 的算法解决此问题。 示例 输入…

尚硅谷Java数据结构--希尔排序

插入排序的问题&#x1f388;&#xff1a; arr{2,3,4,5,6,0,9,7,8}; 当0作为插入元素的时候&#xff0c;其待插入下标与原下标相差很远&#xff0c;需要进行多次比较和移动。 希尔排序则是先将下标相差一定距离gap的元素分为一组&#xff0c;进行插入排序&#xff1b;再逐渐将距…

Flutter(四):SingleChildScrollView、GridView

SingleChildScrollView、GridView 遇到的问题 以下代码会报错: class GridViewPage extends StatefulWidget {const GridViewPage({super.key});overrideState<GridViewPage> createState() > _GridViewPage(); }class _GridViewPage extends State<GridViewPage&g…

Maven下载、安装、配置教程

maven是一个项目管理的工具&#xff0c;maven自身是纯java开发的&#xff0c;可以使用maven对java项目进行构建、依赖管理。 通常我们靠手动下载jar包引入项目中是非常浪费时间的&#xff0c;我们可以通过maven工具帮我们导入jar包提高开发效率。 第一步&#xff1a;下载Mave…
最新文章