RocketMQ(四):重复消费、消息重试、死信消息的解决方案

RocketMQ系列文章

RocketMQ(一):基本概念和环境搭建

RocketMQ(二):原生API快速入门

RocketMQ(三):集成SpringBoot

RocketMQ(四):重复消费、消息重试、死信消息的解决方案


目录

  • 一、重复消费
    • 1、消息重复的情况
    • 2、MySql唯一索引
    • 3、redis分布式锁
  • 二、消息重试
    • 1、生产者重试
    • 2、消费者重试
  • 三、死信消息
  • 四、消费堆积

一、重复消费

1、消息重复的情况

  • 发送时消息重复
    • 当一条消息已被成功发送到服务端并完成持久化
    • 此时出现了网络闪断或者客户端宕机,导致服务端对客户端应答失败
    • 如果此时生产者意识到消息发送失败并尝试再次发送消息
    • 消费者后续会收到两条内容相同并且 Message ID 也相同的消息
  • 投递时消息重复
    • 消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断
    • 为了保证消息至少被消费一次
    • 消息队列 RocketMQ 的服务端将在网络恢复后再次尝试投递之前已被处理过的消息
    • 消费者后续会收到两条内容相同并且 Message ID 也相同的消息
  • 负载均衡时消息重复(包括但不限于网络抖动、Broker 重启以及订阅方应用重启)
    • 当消息队列RocketMQ的Broker 或客户端重启、扩容或缩容
    • 会触发 Rebalance,此时消费者可能会收到重复消息

2、MySql唯一索引

  • 因为 Message ID 有可能出现冲突(重复)的情况
  • 所以用业务唯一标识作为幂等处理的关键依据

生产者

  • 相同的唯一业务编号,发送两次
@Test
void test1() {
    // 业务唯一编号
    String key = "1300";
    Message<String> message = MessageBuilder
            .withPayload("我是一个带key的消息")
            .setHeader(RocketMQHeaders.KEYS, key)
            .build();
    // 相同的key发送两次
    rocketMQTemplate.syncSend("repeatedTopic", message);
    rocketMQTemplate.syncSend("repeatedTopic", message);
    System.out.println("发送完成");
}

消费者

  • 创建user表结构,num_no字段设置为唯一索引
  • 当唯一的业务id插入唯一索引的num_no字段
  • 只能插入一次,第二次会报唯一索引重复
  • 当获取到重复数据,直接返回即可,就不在执行业务代码
@Component
@RocketMQMessageListener(topic = "repeatedTopic", consumerGroup = "repeated-consumer-group")
public class RepeatMysqlListener implements RocketMQListener<MessageExt> {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Override
    public void onMessage(MessageExt message) {
        // 唯一的业务id(如果是相同的两次请求,则keys值一定相同)
        String messageKey = message.getKeys();
        try {
            jdbcTemplate.execute("INSERT INTO `user` (`num_no`,`name`) VALUES('" + messageKey + "','名称')");
        } catch (DataAccessException e) {
            // 该message可能是重复的
            if (e instanceof DuplicateKeyException) {
                System.out.println(messageKey+"的业务编号数据重复了,直接return,就算消费了此重复数据");
                return;
            }
        }
        // 获取消息执行业务
        System.out.println("获取消息内容:【" + new String(message.getBody()) + "】执行业务");
    }
}

执行结果:

发送完成
获取消息内容:【我是一个带key的消息】执行业务
1300的业务编号数据重复了,直接return,就算消费了此重复数据

3、redis分布式锁

Redisson分布式锁配置

@Configuration
public class RedissonConfig {
    @Bean
    public Redisson redisson() {
        Config config = new Config();
        config.useSingleServer()
                .setAddress("redis://localhost:6390")
                .setPassword("xc@1234")
                .setDatabase(0);
        return (Redisson) Redisson.create(config);
    }
}

生产者

@Test
void test1() {
    // 业务唯一编号
    String key = "1400";
    Message<String> message = MessageBuilder
            .withPayload("我是一个带key的消息")
            .setHeader(RocketMQHeaders.KEYS, key)
            .build();
    // 相同的key发送两次
    rocketMQTemplate.syncSend("repeatedTopic", message);
    rocketMQTemplate.syncSend("repeatedTopic", message);
    System.out.println("发送完成");
}

消费者

  • 因为消费者是多线程并发消费
  • 如果遇到相同的唯一业务id,则上锁依次执行
  • 将执行过的唯一业务id放入redis
  • 下次相同业务id进入与redis集合对比,存在则证明已经执行过了
@Component
@RocketMQMessageListener(topic = "repeatedTopic", consumerGroup = "repeated-consumer-group")
public class RepeatRedisListener implements RocketMQListener<MessageExt> {

    @Autowired
    private Redisson redisson;

    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    @Override
    public void onMessage(MessageExt message) {
        // 唯一的业务id(如果是相同的两次请求,则keys值一定相同)
        String messageKey = message.getKeys();
        RLock redissonLock = redisson.getLock(messageKey);
        try {
            // 添加redisson锁并实现锁续命功能
            // 默认过期时间是30s,每10s触发一次锁续命功能
            redissonLock.lock();
            List<String> topicBusinessKeyList = stringRedisTemplate.opsForList().range("topicBusinessKey",0,-1);
            if ( ObjectUtils.isNotEmpty(topicBusinessKeyList) && topicBusinessKeyList.contains(messageKey)) {
                System.out.println(messageKey + "的业务编号数据重复了,直接return,就算消费了此重复数据");
                return;
            }
            // 获取消息执行业务
            System.out.println("获取消息内容:【" + new String(message.getBody()) + "】执行业务");
            // 讲businessKey存入redis
            stringRedisTemplate.opsForList().rightPush("topicBusinessKey", messageKey);
        } finally {
            redissonLock.unlock();
        }
    }
}

执行结果:

发送完成
获取消息内容:【我是一个带key的消息】执行业务
1400的业务编号数据重复了,直接return,就算消费了此重复数据

二、消息重试

1、生产者重试

  • 可以分别设置同步消息和异步消息发送的重试次数
  • 广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息
  • 默认重试间隔时间为 1 秒,次数为2次
  • 发送消息超时时间默认3000毫秒,如果因为超时,那么便不再尝试重试

application.yml配置文件设置

在这里插入图片描述

2、消费者重试

  • 默认的重试间隔:10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
  • 默认多线程模式下,重试16次,设置超过 16 次的重试时间间隔均为每次 2 小时
  • 某条消息在一直消费失败的前提下,将会在接下来的 4 小时 46 分钟之内进行 16 次重试,超过这个时间范围消息将不再重试投递
  • 在单线程的顺序模式下,重试Integer.MAX_VALUE次,间隔1秒

消费者配置

  • 实现RocketMQPushConsumerLifecycleListener接口,从prepareStart方法中获取消费者并设置它
  • 消息最大重试次数的设置对相同GroupID下的所有Consumer实例有效
@Component
@RocketMQMessageListener(topic = "retryTopic",
        consumerGroup = "retry-consumer-group"
)
public class RetryListener implements RocketMQListener<MessageExt>, RocketMQPushConsumerLifecycleListener {
    @Override
    public void onMessage(MessageExt message) {
        //获取消息的重试次数
        System.out.println(message.getReconsumeTimes());
        System.out.println("消息内容:"+new String(message.getBody()));
    }

    @Override
    public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
        // 设置消费者重试次数
        defaultMQPushConsumer.setMaxReconsumeTimes(2);
        // 实例名称-控制面板可以看到
        defaultMQPushConsumer.setInstanceName("消费者1号");
    }
}

设置重试二次的执行结果:

在这里插入图片描述

三、死信消息

  • 当消费重试到达阈值以后,消息不会被投递给消费者了,而是进入了死信队列
  • 死信队列是死信Topic下分区数唯一的单独队列
  • 死信Topic名称为%DLQ%原消费者组名,死信队列的消息将不会再被消费

上一节的消费者重试两次后,就会将消息放入死信队列

在这里插入图片描述

处理死信消息方式一:

  • 监听死信队列处理消息
@Component
@RocketMQMessageListener(
        topic = "%DLQ%retry-consumer-group",
        consumerGroup = "retry-dead-consumer-group"
)
public class RetryDeadConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        // 处理消息 签收了
        System.out.println("记录到特别的位置 文件 mysql 通知人工处理");
    }
}

处理死信消息方式二:

  • 控制重试次数,重试几次后,直接记录到数据库等等
@Component
@RocketMQMessageListener(
        topic = "%DLQ%retry-consumer-group",
        consumerGroup = "retry-dead-consumer-group"
)
public class RetryDeadConsumer2 implements RocketMQListener<MessageExt> {
    @Override
    public void onMessage(MessageExt messageExt) {
        // 业务处理
        try {
            int i = 1 / 0;
        } catch (Exception e) {
            // 重试
            int reconsumeTimes = messageExt.getReconsumeTimes();
            if (reconsumeTimes >= 3) {
                // 不要重试了
                System.out.println("记录到特别的位置 文件 mysql 通知人工处理");
            }else {
                throw new RuntimeException("异常");
            }
        }
    }
}

四、消费堆积

一般认为单条队列消息差值>=10w时 算堆积问题

什么情况下会出现堆积

  • 生产太快
    • 生产方可以做业务限流
    • 增加消费者数量,但是消费者数量<=队列数量,适当的设置最大的消费线程数量(根据IO(2n)/CPU(n+1))
  • 消费者消费出现问题

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/208230.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Hexo | 支持书写数学公式

为了能够让 Hexo 支持书写数学公式&#xff0c;遇到了好多个坑。虽然以下方法我亲测有效&#xff0c;但并不能保证每个人都能成功。最差的情况就是 hexo s 启动失败&#xff0c;不过还可以重新 hexo init 哈哈笑不出来。 提醒&#xff1a;本文主要针对 fluid 主题&#xff0c;…

视频合并方法:掌握视频批量嵌套合并技巧,成为剪辑高手

在视频剪辑的过程中&#xff0c;我们经常需要将多个视频片段合并在一起。传统的视频合并方法往往需要大量的时间和精力&#xff0c;通过掌握批量嵌套合并技巧&#xff0c;可以更高效地完成这项任务&#xff0c;成为剪辑高手。本文讲解一种简单易学的视频合并方法&#xff0c;轻…

【爬虫】Java 爬虫组件 Jsoup

【爬虫】Java 爬虫组件 Jsoup 写在前面实现思路和步骤步骤一&#xff1a;引入 Jsoup步骤二&#xff1a;获取页面组件内容步骤三&#xff1a;分析页面构成获取需要的组件 代码案例 写在前面 爬虫是通过编程的方式&#xff0c;从网站上获取数据的一种方式。很多语言都提供的有爬…

机器学习---EM算法

1. 极大似然估计与EM算法 极大似然估计是一种常用的参数估计方法&#xff0c;它是以观测值出现的概率最大作为准则。关于极 大似然估计&#xff0c;假设现在已经取到样本值了&#xff0c;这表明取到这一样本的概率L(θ) 比较 大。我们自然不会考虑那些不能使样本出现的θ作为…

高校智慧用电管理平台

高校智慧用电管理平台是一种基于物联网、云计算、大数据等技术的智能化用电管理系统&#xff0c;旨在实现高校用电的实时监测、智能控制、数据分析和管理决策。 具体来说&#xff0c;该平台通常包括以下功能和特点&#xff1a; 实时监测&#xff1a;通过安装传感器、智能终端等…

ZeroTier外网访问实验室Linux服务器

ZeroTier外网访问实验室Linux服务器 1、在ZeroTier上创建一个自己的Network 进入ZeroTier的官网https://www.zerotier.com/注册一个账号 注册完之后登录进去&#xff0c;创建自己的Network 创建完之后来到IPv4的分配管理&#xff0c;选择主机位只有后8位的IP&#xff0c;才能…

img[src=““] img无路径情况下,页面出现边框

在开发过程中遇到一个问题就是当img标签的src为空时&#xff0c;会出现边框&#xff0c;影响美观 其实我们可以直接加上这个就可以解决了 img[src""],img:not([src]){opacity:0; }

金融系统中容易踩坑的问题

1、产品类型指的是大类还是小类 有的产品比如员工贷既是指员工贷小类&#xff0c;也是指员工贷系列的产品&#xff0c;这时候需要关注需求描述的员工贷覆盖范围是产品大类还是小类。 2、未带参数时是否有默认处理 前端传输的某个值为空时&#xff0c;后端是否需要设默认值&a…

夯实c基础

夯实c基础 区别&#xff1a; 图一的交换&#xff0c;&#xff08;交换的是地址而不是两数&#xff09;无法实现两数的交换。 题干以下程序的输出结果为&#xff08; c  &#xff09;。 void fun(int a, int b, int c){ ca*b; } void main( ){ int…

模型层(回顾补充)

1.1基本使用 orm框架---》对象关系映射 数据库中&#xff1a;一个个表 &#xff1a;user表&#xff0c;book表&#xff0c;一条条的记录 程序中&#xff1a;一个个类&#xff0c;一个个对象 以后数据库中一张表---》对应程序中一个类 以后数据库中一条记录--》对应…

ThinkPHP 2.x任意代码执行漏洞

任务一&#xff1a; 复现环境中的代码漏洞 任务二&#xff1a; 尝试利用代码执行漏洞读取服务器web目录下的文件列表。 任务一&#xff1a; 1.搭建环境&#xff1a; 2.在php环境下直接输入{${phpinfo}}测试代码片段 2.写入一句话木马&#xff0c;用antsword连接&#xff0…

C++基础 -24- 覆盖

覆盖的三个条件 -1- 基类和派生类存在同名的函数 -2- 基类的函数为虚函数 -3- 必须使用基类引用或指针指向派生类 #include "iostream"using namespace std;class base {public:base(){}virtual void show(){cout << "base show" << endl;} };…

【LeetCode】栈和队列OJ题---C语言版

栈和队列OJ题 1.括号匹配问题&#xff08;1&#xff09;题目描述&#xff1a;&#xff08;2&#xff09;思路表述&#xff1a;&#xff08;3&#xff09;代码实现&#xff1a; 2.用队列实现栈&#xff08;1&#xff09;题目描述&#xff1a;&#xff08;2&#xff09;思路表述&…

OSI七层模型与TCP/IP四层模型的区别(计算机网络)

一、OSI七层网络模型 OSI 网络模型共有 7 层&#xff0c;分别是应用层、表示层、会话层、传输层、网络层、数据链路层和物理层。 应用层&#xff0c;负责给应用程序提供统一的接口&#xff1b;表示层&#xff0c;负责把数据转换成兼容另一个系统能识别的格式&#xff1b;会话…

NX二次开发UF_MTX2_copy 函数介绍

文章作者&#xff1a;里海 来源网站&#xff1a;https://blog.csdn.net/WangPaiFeiXingYuan UF_MTX2_copy Defined in: uf_mtx.h void UF_MTX2_copy(const double mtx_src [ 4 ] , double mtx_dst [ 4 ] ) overview 概述 Copies the 2x2 matrix elements from the source m…

对外汉语教师简历(精选12篇)

以对外汉语老师招聘需求为背景&#xff0c;我们制作了1份全面、专业且具有参考价值的简历案例&#xff0c;大家可以灵活借鉴&#xff0c;希望能帮助大家在众多候选人中脱颖而出。 对外汉语教师简历下载&#xff08;在线制作&#xff09;&#xff1a;百度幻主简历或huanzhucv.c…

多线程原理和常用方法以及Thread和Runnable的区别

文章目录 &#x1f366;多线程原理&#x1f367;随机性打印&#x1f368;多线程内存图解 &#x1f369;Thread类的常用方法&#x1f36a;获取线程名称 getName()&#x1f382;设置线程名称 setName() 或者 new Thread("线程名字")&#x1f370;使当前正在执行的线程以…

数据挖掘实战:基于 Python 的个人信贷违约预测

本次分享我们 Python 觅圈的一个练手实战项目&#xff1a;个人信贷违约预测&#xff0c;此项目对于想要学习信贷风控模型的同学非常有帮助。 技术交流 技术要学会交流、分享&#xff0c;不建议闭门造车。一个人可以走的很快、一堆人可以走的更远。 好的文章离不开粉丝的分享、…

ssm+java车辆售后维护系统 springboot汽车保养养护管理系统+jsp

以前汽车维修人员只是在汽车运输行业中从事后勤保障工作,随着我国经济的发展,汽车维修行业已经从原来的从属部门发展成了如今的功能齐备的独立企业。这种结构的转变,给私营汽修企业和个体汽修企业的发展带来了契机,私营企业和个体维修企业的加入也带动了整个汽修行业的整体水平…

Python中进行特征重要性分析的8个常用方法

更多资料获取 &#x1f4da; 个人网站&#xff1a;ipengtao.com 在机器学习和数据科学领域&#xff0c;理解特征在模型中的重要性对于构建准确且可靠的预测模型至关重要。Python提供了多种强大的工具和技术&#xff0c;能够探索特征重要性的各个方面。 本文将详细介绍8种常用…
最新文章