spring boot 集成rocketMq + 基本使用

1. RocketMq基本概念

1. NameServer
每个NameServer结点之间是相互独立,彼此没有任何信息交互
启动NameServer。NameServer启动后监听端口,等待Broker、Producer、Consumer连接,
相当于一个路由控制中心。主要是用来保存topic路由信息,管理Broker
2. Broker
消息存储和中转角色,负责存储和转发消息
在启动时会向NameServer进行注册并且定时发送心跳包。心跳包中包含当前 Broker 信息
以及存储所有 Topic 信息。注册成功后,NameServer 集群中就有 Topic跟Broker 的映射关系。
3. topic : 一个消息的集合的名字
创建 Topic 时需要指定该 Topic 要存储在哪些 Broker 上,也可以在发送消息时自动创建Topic。
4. 生产者
生产者发送消息。启动时先从 NameServer 集群中的其中一台拉取到路由表,缓存到本地,
并从 NameServer 中获取当前发送的 Topic存在于哪些 Broker 上,
轮询从队列列表中选择一个队列(默认轮询)
5. 消费者
消费者跟其中一台NameServer建立连接,获取当前订阅Topic存在哪些Broker上,
然后直接跟Broker建立连接通道,然后开始消费消息

2. maven 引入starter

<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>

3.yml配置

3.1 生产者yml 配置

rocketmq:
  name-server: 127.0.0.1:9876
  producer:
    group: my-group
    # 发送消息超时时间
    send-message-timeout: 5000
    # 发送消息失败重试次数
    retry-times-when-send-failed: 2
    retry-times-when-send-async-failed: 3 # 异步发送消息失败重试次数,默认2

3.2 消费者yml 配置

rocketmq:
  name-server: 127.0.0.1:9876
  consumer:
    topic: topic_test
    group: consumer_my-group

4.生产者发送消息

4.1 一般消息

@Resource
    private RocketMQTemplate rocketMQTemplate;

    /**
     *  一般消息
     * Topic 与 Tag 都是业务上用来归类的标识,区别在于 Topic 是一级分类,而 Tag 可以理解为是二级分类。
     * 使用 Tag 可以实现对 Topic 中的消息进行过滤。
     * **/
    @GetMapping("/send")
    public String send(){
        rocketMQTemplate.convertAndSend("topic_test", "Hello, World!");
        rocketMQTemplate.convertAndSend("topic_test:tagB","Hello, World222--tagB");
        return "rocketMq普通消息发送完成";
    }

4.2 顺序消息

/** 支持消费者按照发送消息的先后顺序获取消息 */
    @GetMapping("/send/orderly")
    public String sendOrder(){
        //发送顺序消息,参数:topic,消息,hashkey,相同hashkey发送至同一个队列
        rocketMQTemplate.syncSendOrderly("topic_test:tagA", MessageBuilder.withPayload("消息编号" + 1).build(),"queue");
        rocketMQTemplate.syncSendOrderly("topic_test:tagA", MessageBuilder.withPayload("消息编号" + 2).build(),"queue");
        return "rocketMq顺序-消息发送成功";
    }

4.3 同步消息

@GetMapping("/send/sync")
    public String sendMsg() {
        String message = "我是同步消息:" + LocalDateTime.now();
        SendResult result = rocketMQTemplate.syncSend("topic_test:tagA", MessageBuilder.withPayload(message).build());
        log.info("同步-消息发送成功:" + LocalDateTime.now());
        return "rocketMq 同步-消息发送成功:" + result.getSendStatus();
    }

4.4 异步消息

/** 发送异步消息 */
    @GetMapping("/send/async")
    public String asyncSendMsg(){
        String message = "我是异步消息:" + LocalDateTime.now();
        rocketMQTemplate.asyncSend("topic_test:tagA",message,new SendCallback() {

            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("发送成功 (后执行),SendStatus = {}",sendResult.getSendStatus());
            }

            @Override
            public void onException(Throwable throwable) {
                log.info("发送失败 (后执行)");
            }
        });
        return "rocketMq 异步-消息发送成功:" + LocalDateTime.now();
    }

 4.5 单向消息:一般用来发送日志等不重要的消息

@GetMapping("/send/oneWay")
    public String sendOneWayMessage() {
        String message =  "我是单向消息:"+LocalDateTime.now();
        this.rocketMQTemplate.sendOneWay("topic_test:tagA", message);
        log.info("单向发送消息完成:message = {}", message);
        return "rocketMq 单向-消息发送成功:" + LocalDateTime.now();
    }

 

4.6 延时消息

/** 延时消息 */
    @GetMapping("/sendDelay")
    public String sendDelay(){
        String message = "我是延时消息:" + LocalDateTime.now();
        // 第四个参数为延时级别,分为1-18:1、5、10、30、1m、2m、3m、...10m、20m、30m、1h、2h
        rocketMQTemplate.syncSend("topic_test:tagC", MessageBuilder.withPayload(message).build(), 3000, 2);
        return "rocketMq延时-消息发送成功";
    }

4.7 事务消息

4.7.1 事务消息发送代码

/** 事务消息 */
    @GetMapping("/send/transaction/{id}")
    public void sendTransactionMessage(@PathVariable("id") Integer id){
        //发送事务消息:采用的是sendMessageInTransaction方法,返回结果为TransactionSendResult对象,该对象中包含了事务发送的状态、本地事务执行的状态等
        //参数一:topic;参数二:消息
        // 事务id
        String[] tags = {"tagA", "tagB", "tagC"};
        int i = id%3;

        String transactionId = UUID.randomUUID().toString();
        String message = "我是事务消息:" + LocalDateTime.now();
        TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction("topic_test:" + tags[i]
                , MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.TRANSACTION_ID,transactionId).build(),
                // 给本地事务的参数
                2);
        //发送状态
        String sendStatus = result.getSendStatus().name();
        //本地事务执行状态
        String localState = result.getLocalTransactionState().name();
        log.info("发送状态:"+sendStatus+";本地事务执行状态"+localState);


    }

4.7.2 继承 RocketMQLocalTransactionListener

@Slf4j
@RocketMQTransactionListener
public class MyTransactionListener implements RocketMQLocalTransactionListener {


    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(org.springframework.messaging.Message message, Object o) {

        MessageHeaders headers = message.getHeaders();
        //获取事务ID
        String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
        log.info("执行本地事务 ,transactionId is {}, orderId is {}",transactionId, message.getHeaders().get("rocketmq_TAGS"));
        try{
            //模拟网络波动
            Thread.sleep(3000);
            /***
             * 首先发送一个半消息(half message),这个消息不会立即投递给消费者;然后执行本地事务(比如数据库操作)。
             * 根据本地事务的执行结果,决定是提交(commit)还是回滚(rollback)这个消息。
             * 如果本地事务成功,消息会被提交并发送给消费者;
             * 如果失败,消息会被回滚,消费者不会接收到这个消息
             */

        }catch (Exception e){
            return RocketMQLocalTransactionState.ROLLBACK;
        }

        // 执行本地事务
        String tag = String.valueOf(message.getHeaders().get("rocketmq_TAGS"));
        if (StringUtils.equals("tagA", tag)){
            //这里只讲TAGA消息提交,状态为可执行
            return RocketMQLocalTransactionState.COMMIT;
        }else if (StringUtils.equals("tagB", tag)) {
            return RocketMQLocalTransactionState.ROLLBACK;
        } else if (StringUtils.equals("tagC",tag)) {
            return RocketMQLocalTransactionState.UNKNOWN;
        }

        log.info("事务提交,消息正常处理: " + LocalDateTime.now());
        //执行成功,可以提交事务
        return RocketMQLocalTransactionState.COMMIT;
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(org.springframework.messaging.Message message) {
        MessageHeaders headers = message.getHeaders();
        //获取事务ID
        String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
        log.info(transactionId + ",消息回查"+ LocalDateTime.now());
        return RocketMQLocalTransactionState.ROLLBACK;
    }
}

tagA、tagB、tagC 三种事务消息,只有Commit的才能发送到broker 

 

 5. 消费端

/**
 * topic指定消费的主题,consumerGroup指定消费组,
 * 一个主题可以有多个消费者组,一个消息可以被多个不同的组的消费者都消费
 *  2.实现RocketMQListener接口
 *  如果想拿到消息的其他参数可以写成MessageExt
 *  selectorExpression = "tagA || tagB" 指定tag 的消费
 */
@Service
@Slf4j
@RocketMQMessageListener(topic = "${rocketmq.consumer.topic}", consumerGroup = "${rocketmq.consumer.group}")
public class RocketMqConsumer implements RocketMQListener<String>{

    @Override
    public void onMessage(String s) {
        log.info("topic_test: 所有的收到消息:"+s);
    }

}

6.广播消费模式

生产端是一样的,但是消费端需要增加一个参数

messageModel:设置消费模式,取值范围CLUSTERING(集群消费)、BROADCASTING(广播消费)
@Service
@Slf4j
@RocketMQMessageListener(topic = "${rocketmq.consumer.topic}", consumerGroup = "${rocketmq.consumer.group}", messageModel = MessageModel.BROADCASTING)
public class RocketMqConsumer implements RocketMQListener<String>{

    @Override
    public void onMessage(String s) {
        log.info("consumer2---topic_test: 所有的收到消息:"+s);
    }

}

// 第2个消费者类,他们都是一样的代码,
//为了表示广播,就是一个消息,会被这两个消费者消费

@Service
@Slf4j
@RocketMQMessageListener(topic = "${rocketmq.consumer.topic}", consumerGroup = "${rocketmq.consumer.group}", messageModel = MessageModel.BROADCASTING)
public class RocketMqConsumer implements RocketMQListener<String>{

    @Override
    public void onMessage(String s) {
        log.info("consumer1--topic_test: 所有的收到消息:"+s);
    }

}

7.其他

RocketMQ 通过消费者组(Consumer Group)来维护不同消费者的消费进度。每个消费者组都有一个消费进度(offset),用于标记该组下的消费者在某个主题(Topic)和队列(Queue)上已经消费到的位置。所以:不同的消费者组会被视为不同的消费者

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/536197.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

知识图谱与人工智能:携手共进

知识图谱与人工智能&#xff1a;携手共进 一、引言&#xff1a;知识图谱与人工智能的融合 在这个数据驱动的时代&#xff0c;知识图谱与人工智能&#xff08;AI&#xff09;之间的融合不仅是技术发展的必然趋势&#xff0c;也是推动各行各业创新的关键。知识图谱&#xff0c;作…

windows下pycharm中配置conda虚拟环境

目录 一&#xff1a;背景 二&#xff1a;安装conda环境 三&#xff1a;pycharm配置环境 四&#xff1a;注意问题 一&#xff1a;背景 在使用python的过程中&#xff0c;我们可能需要在一个windows环境中创建多个版本的python和安装不同的库去做一些开发任务。 使用conda&a…

TQ15EG开发板教程:在MPSOC上运行ADRV9371

首先需要在github上下载两个文件&#xff0c;本例程用到的文件以及最终文件我都会放在网盘里面&#xff0c; 地址放在本文最后。首先在github搜索hdl选择第一个&#xff0c;如下图所示 GitHub网址&#xff1a;https://github.com/analogdevicesinc/hdl/releases 点击releases…

Docker入门实战教程

文章目录 Docker引擎的安装Docker比vm虚拟机快 Docker常用命令帮助启动类命令镜像命令docker imagesdocker searchdocker pulldocker system dfdocker rmi 容器命令redis前台交互式启动redis后台守护式启动Nginx容器运行ubuntu交互式运行tomcat交互式运行对外暴露访问端口 Dock…

Java基础07--多线程-网络编程-Java高级

一、多线程 1.认识多线程 ①线程 ②多线程 2.创建线程方式 ①方式一&#xff1a;继承Thread类 1.让子类继承Thread线程类 2.重写run方法&#xff0c;就是这个线程执行会执行的操作。 3.创建继承Thread的子类对象就代表一个线程 4.启动线程:.start()-自动执行run方法 注意&am…

绝地求生:PUBG×杜卡迪联名上线!参与投稿评论赢取精美好礼

PUBG杜卡迪联名活动游戏内现已正式上线&#xff01;我们诚邀与您一起在开拓未知战场和书写新历史的过程中&#xff0c;与杜卡迪一同实现您的极速梦想&#xff01; 在本次的杜卡迪工坊中&#xff0c;更是包含了具备标志性红色在内的6种颜色供您自由选择&#xff0c;一起自由驰骋…

创作一首音乐需要多长时间?网易云音乐内测AI音乐生成工具『网易天音』

大家好&#xff0c;我是木易&#xff0c;一个持续关注AI领域的互联网技术产品经理&#xff0c;国内Top2本科&#xff0c;美国Top10 CS研究生&#xff0c;MBA。我坚信AI是普通人变强的“外挂”&#xff0c;所以创建了“AI信息Gap”这个公众号&#xff0c;专注于分享AI全维度知识…

视频基础学习五——视频编码基础二(编码参数帧、GOP、码率等)

系列文章目录 视频基础学习一——色立体、三原色以及像素 视频基础学习二——图像深度与格式&#xff08;RGB与YUV&#xff09; 视频基础学习三——视频帧率、码率与分辨率 视频基础学习四——视频编码基础一&#xff08;冗余信息&#xff09; 视频基础学习五——视频编码基础…

KKVIEW远程远程访问家里电脑

远程访问家里电脑&#xff1a;简易指南与价值所在 在数字化时代&#xff0c;电脑已成为我们日常生活和工作中不可或缺的工具。有时&#xff0c;我们可能在外出时急需访问家中电脑里的某个文件或应用&#xff0c;这时&#xff0c;远程访问家里电脑就显得尤为重要。本文将简要介…

SQLite---调试提示(十九)

返回&#xff1a;SQLite—系列文章目录 上一篇:SQLite Android 绑定&#xff08;十八&#xff09; 下一篇&#xff1a;从 SQLite 3.4.2 迁移到 3.5.0&#xff08;二十&#xff09; ​ 以下是 SQLite 开发人员跟踪、检查和了解 核心 SQLite 库。 这些技术旨在帮助理解 核…

从 SQLite 3.5.9 迁移到 3.6.0(二十一)

返回&#xff1a;SQLite—系列文章目录 上一篇&#xff1a;从 SQLite 3.4.2 迁移到 3.5.0&#xff08;二十&#xff09; 下一篇&#xff1a;SQLite—系列文章目录 ​SQLite 版本 3.6.0 &#xff08;2008-07-16&#xff09; 包含许多更改。按照惯例 SQLite项目&#xff…

中移物联网 OneOS 操作系统环境搭建和工程创建

一、官网 OneOS Lite是中国移动针对物联网领域推出的轻量级操作系统&#xff0c;具有可裁剪、跨平台、低功耗、高安全等特点&#xff0c;支持ARM Cortex-A和 Cortex-M、MIPS、RISC-V等主流芯片架构&#xff0c;兼容POSIX、CMSIS等标准接口&#xff0c;支持Javascript、MicroPyt…

Ubuntu下配置Android NDK环境

Android-NDK的下载 下载Android-NDK wget -c http://dl.google.com/android/ndk/android-ndk-r10e-linux-x86_64.bin 执行bin文件&#xff08;即解压&#xff09; ./android-ndk-r10c-linux-x86_64.bin Android-NDK的配置 要想使用Android-NDK&#xff0c;还需要进行环境变量…

C++ stl容器string的底层模拟实现

目录 前言&#xff1a; 1.成员变量 2.构造函数与拷贝构造函数 3.析构函数 4.赋值重载 5.[]重载 6.比较关系重载 7.reserve 8.resize 9.push_back,append和重载 10.insert 11.erase 12.find 14.迭代器 15.流插入&#xff0c;流提取重载 16.swap 17.c_str 18.完…

Testng测试框架(6)--@Factory动态地创建测试类的实例

工厂允许您动态地创建测试。例如&#xff0c;假设您想创建一个测试方法&#xff0c;该方法将多次访问网站上的某个页面&#xff0c;并且您希望使用不同的值来调用它。 public class TestWebServer {Test(parameters { "number-of-times" })public void accessPage(…

【C++题解】1605. 求一个两位数的个位和十位的和

问题&#xff1a;1605. 求一个两位数的个位和十位的和 类型&#xff1a;基本运算、拆位求解。 题目描述&#xff1a; 从键盘读入一个两位的整数 n &#xff0c;请求出这个两位整数个位和十位的和是多少&#xff1f; 输入&#xff1a; 一个两位的整数 n 。 输出&#xff1a…

hbase基础shell用法

HBase中用create命令创建表&#xff0c;具体如下&#xff1a; create student,Sname,Ssex,Sage,Sdept,course 此时&#xff0c;即创建了一个“student”表&#xff0c;属性有&#xff1a;Sname,Ssex,Sage,Sdept,course。因为HBase的表中会有一个系统默认的属性作为行键&#x…

【MySQL】事务篇

SueWakeup 个人主页&#xff1a;SueWakeup 系列专栏&#xff1a;学习技术栈 个性签名&#xff1a;保留赤子之心也许是种幸运吧 目录 本系列专栏 1. 什么是事务 2. 事务的特征 原子性&#xff08;Atomicity&#xff09; 一致性&#xff08;Consistency&#xff09; 隔离性&…

255Mesh 无线lora模块详细配置和测试

一、型号介绍 字符含义&#xff1a; E&#xff1a;终端 N&#xff1a;节点&#xff08;node&#xff09; G&#xff1a;网关 &#xff08;gateway&#xff09; 官网淘宝介绍 注意&#xff1a;组网必须配网关。 二、功能配置 软件界面 1.网络参数 网络参数包括网络 ID&a…

华为OD技术面试-有序数组第K最小值

背景 2024-03-15华为od 二面&#xff0c;记录结题过程 有序矩阵中第 K 小的元素 - 力扣&#xff08;LeetCode&#xff09; https://leetcode.cn/problems/kth-smallest-element-in-a-sorted-matrix/submissions/512483717/ 题目 给你一个 n x n 矩阵 matrix &#xff0c;其…
最新文章