RocketMQ重复消费的解决方案::分布式锁直击面试!

文章目录

  • 场景分析
    • 方法的幂等
    • 分布式锁
      • Redis实现分布式锁
        • 抢锁的设计思路
    • 分布式锁案例
  • 直击面试
    • rocketmq什么时候重复消费
    • 消息丢失的问题
      • 消息在哪里丢失
      • 发送端确保发送成功并且配合失败的业务处理
      • 消费端确保消息不丢失
      • rocketmq 主从+同步刷盘

场景分析

在这里插入图片描述
分布式系统架构中,队列是分布式的,生产端是分布式集群,消费端也是分布式集群.

相当于有多个消费端同时监听队列,同时减库存,写入订单.

面试题:如何处理消息重复消费的问题. 重复消费大部分场景,需要解决的.

引入2个概念来解决: 幂等的业务方法,和消息的分布式锁.

方法的幂等

结论: 一个方法的一次业务逻辑调用和N次调用的结果是一致的,我们称这种方法就是幂等.

一旦重复消费,一定要把消费的业务逻辑方法(orderAdd)设计成幂等的.

  • 幂等的方法
    • GET方法: 查询方法,天生幂等.
    • DELETE方法: 删除方法,天生幂等.
    • PUT方法: 修改,并不是天生幂等,需要设计
      • 减少库存:
        • update stock_tbl set stock=stock-#{stock} where id=#{id}(不是幂等)
        • select * from stock_log where order_id=#{orderId}(查询日志,判断是否已经见过库存了),没有数据 update stock_tbl set stock=stock-#{stock} where id=#{id},insert into stock_log (字段) values (订单id,商品减库存信息) (这样设计就幂等了,依然有问题)
    • POST方法: 新增,并不是天生幂等,需要设计
      • 新增订单: insert into order_tbl (order_id,order_item_id,count,user_id) values (各种属性);如果使用唯一属性校验,作用在order_id order_sn(编号).同一张订单,这个字段值是相同(幂等满足,没做幂等不满足)
  • 当前orderAdd方法设计幂等的解决思路(之一)
    • 使用订单id或者订单编号,**userId+商品id(**这个只满足当前我们的案例特点,不满足实际场景.)查询订单,如果已经存在了,库存不减少,订单不增了,购物车不用删除了
@Override
public void orderAdd(OrderAddDTO orderAddDTO) {
    //幂等设计思路: 利用userId和commodityCode 查询,如果已经存在了订单,方法直接执行结束
    //如果结果不存在,减库存,生单,删除购物车
    int count=orderMapper.selectExists(orderAddDTO);
    if (count>0){
        log.debug("订单已经新增了");
        return;
    }
    StockReduceCountDTO countDTO=new StockReduceCountDTO();
    countDTO.setCommodityCode(orderAddDTO.getCommodityCode());
    countDTO.setReduceCount(orderAddDTO.getCount());
    // 利用Dubbo调用stock模块减少库存的业务逻辑层方法实现功能
    stockService.reduceCommodityCount(countDTO);
    // 2.从购物车中删除用户选中的商品(调用Cart模块删除购物车中商品的方法)
    // 利用dubbo调用cart模块删除购物车中商品的方法实现功能
    Order order=new Order();
    BeanUtils.copyProperties(orderAddDTO,order);
    // 下面执行新增 假设insert是幂等的.
    orderMapper.insertOrder(order);
    log.info("新增订单信息为:{}",order);
    cartService.cartDelete(orderAddDTO);
}

@Select("select count(id) from " +
"order_tbl where user_id=#{userId} and commodity_code=#{commodityCode}")
int selectExists(OrderAddDTO orderAddDTO);

分布式锁

当前分布式消费架构
在这里插入图片描述
即使,将方法设计成幂等,这个架构中,消息重复消费

,满足线程安全问题的所有因素

  • 并发/多线程
  • 写操作
  • 共享数据

只要解决其中一点,线程安全问题就消失了.

并发多线程–>串行

写操作–> 避免写(不能满足当前案例,必须写)

共享数据–>个体数据(不能满足,重复消费,重复订单是前提)

分布式线程安全问题的解决方案—分布式锁

错误思路: 引入synchronized同步锁,不能解决分布式场景下,多个进程的并发线程安全问题.

概念: 分布式场景下,多进程,多线程并发的抢锁机制. 抢到资源锁,执行业务逻辑,抢不到等待或者放弃执行.能够避免对同一个资源出现并发多线程操作的解决方案.

和synchronized的区别在于 synchronizeds本地锁.管理一个进程中的多线程,分布式锁是管理多个进程中的多线程.

分布式锁当前落地方案: redis setnx命令

Redis实现分布式锁

抢锁的设计思路

  • 目标: 多线程执行业务之前,先判断执行权限,抢锁,抢到锁的才能执行业务,抢不到的不执行.(当前案例中,抢锁,然后执行的业务逻辑是:orderAdd)
    在这里插入图片描述
    抢锁如何执行?: setnx key “”

  • key值如何设计?: 需要结合业务,设计key值(redis中最主要的功能,都关系到key值的设计),抢锁的逻辑中,满足是业务数据,满足重复消费的重复数据.就可以实现这个key值的设计. 消息Id是重复的.

  • 当前业务流程设计缺陷: 如果有一个消费者抢到锁了,执行了业务方法.执行完成后,没有释放锁的机制.如果引入等待重抢的机制,由于抢到锁的没有释放,会导致死锁.
    在这里插入图片描述
    释放锁的逻辑引入
    在这里插入图片描述
    上述整改的流程中避免了死锁问题,但是存在删除失败导致死锁的问题.
    在这里插入图片描述
    所以,要保证del释放没有成功,在redis也一定不会长期保存.
    在这里插入图片描述
    兜底的解决死锁问题.基本不会出现死锁了.
    在这里插入图片描述为了解决误删除的问题,抢锁的时候setnx key value值设计成一个随机数.
    在这里插入图片描述
    随机数两个消费,多个消费者生成相同的可能性极低.

分布式锁案例

@Component
@RocketMQMessageListener(
        topic = "business-order-topic",
        consumerGroup = "${rocketmq.consumer.group}",
        selectorExpression = "orderAdd")
@Slf4j
public class OrderAddConsumerListener implements RocketMQListener<MessageExt> {
    @Autowired
    private IOrderService orderService;
    @Autowired
    private StringRedisTemplate redisTemplate;
    @Override
    public void onMessage(MessageExt msg) {
        //拿到底层消息对象的body
        byte[] body = msg.getBody();
        //尝试先解析成string
        String orderJson=new String(body, StandardCharsets.UTF_8);
        System.out.println(orderJson);
        OrderAddDTO orderAddDTO=
            JSON.toJavaObject(JSON.parseObject(orderJson),OrderAddDTO.class);
        System.out.println(orderAddDTO);
        //1.生成锁的key值,生成当前这把锁的随机数
        //准备锁key
        String msgKeyLock="msg:order:add:"+msg.getMsgId();
        //准备随机数 4 6 8位
        String randCode=new Random().nextInt(9000)+1000+"";
        ValueOperations<String, String> stringOps = redisTemplate.opsForValue();
        try{
            //补充消息消费的抢锁机制
            //2.抢锁 setnx msgKeyLock randCode expire 10s
            Boolean tryLockSuccess = stringOps
                    .setIfAbsent(msgKeyLock, randCode, 10, TimeUnit.SECONDS);
            //3.判断 抢锁成功还是失败
            if(!tryLockSuccess){
                //3.2 失败了 可以等待5秒重新抢锁,也可以直接结束
                //尝试这里使用while编写等待5秒重新抢的逻辑
                log.info("有别人抢锁了,msgKey:{},value:{}",msgKeyLock,randCode);
                return;
            }
            //3.1 成功了 执行orderAdd
            orderService.orderAdd(orderAddDTO);
        }catch (CoolSharkServiceException e){
            //业务异常,说明订单新增业务性失败,比如库存没了
            log.error("库存减少失败,库存触底了:{},异常信息:{}",orderAddDTO,e.getMessage());
        }finally {
            //释放锁 读以下锁的value值,等于当前生成value才释放
            String s = stringOps.get(msgKeyLock);
            if (s!=null && s.equals(randCode)){
                //del msgKeyLock
                redisTemplate.delete(msgKeyLock);
            }
        }
    }
}

直击面试

rocketmq什么时候重复消费

  • 在broker做扩展的时候,消息队列的消息,做扩展的时候,原本存储在原队列的消息,会进行rebalance重平衡.
  • 消费开始阶段
    在这里插入图片描述消费者consumer1 所在group1 绑定队列,push消费模式,使得消费者接受到了queue1 queue2的6条消息.消费过程,成功执行,即将返回确认.
    在这里插入图片描述
    总结:
  • 消费者并发消费的逻辑,同一组消费者绑定分布式队列,推送批量的消息
  • 在某个消费者还没有来得及消费,或者没来得及返回确认给rocketmq,队列发生了扩容缩容
  • rocketmq会对队列中所有的消息做rebalance重平衡(消息重新分配给不同队列),消费者绑定也充平衡
  • 导致已经推送的但是未返回确认的消息,被发送给不同消费者多次.

消息丢失的问题

rocketmq kafka rabbitmq activemq都是队列.只要谈到其中一个.

  1. 重复消费的问题(方法必须设计成幂等,一旦设计成幂等,可能造成线程安全隐患,所以引入分布式锁)
  2. 消息丢失如何处理.

面试题:消息丢失如何处理.

消息在哪里丢失

  • 发送没成功,没有解决不成功的业务逻辑
  • rocketmq保存的时候,断电,宕机,丢失消息(运行的时候,消息存储在内存)
  • 消费端丢失消息(没有成功处理消息,就直接返回success,并不是所有的消费逻辑都是先消费,再确认的,如果关注的是消费速度,不关注成功或者是否丢失,就可以这样处理)

发送端确保发送成功并且配合失败的业务处理

同步发送,接收发送结果,SEND_OK才结束.

客户端代码底层都有默认重试(retry 3 times).发送重试都失败了.

处理发送失败的逻辑.

  1. 发送到备用/失败的队列
  2. 记录日志,将消息来源,目标和消息内容,详细记录,等待监控系统,维护人员来直接处理

消费端确保消息不丢失

一定是先消息费,在确认,消费失败,返回失败(rocketmq消费点位保持原有位置不变,同一个消费者组,会重新拿到消息)

rocketmq 主从+同步刷盘

在这里插入图片描述
同步刷盘(消息数据可靠性保证): 如果持久化内存消息数据到磁盘失败,发送结果没有成功.

异步刷盘: 只要内存接收到了生产端的消息数据,数据是否持久化到磁盘,都会给生产端发送成功接收信息.

主从的双机热备: broker可以配置主从,考虑数据可靠性,和性能,一般主master做同步刷盘,slave做异步刷盘.(都同步刷盘,100%保证消息只要到达rocketmq就不会丢失,但是性能不能保证.)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/44467.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

7.python设计模式【桥结模式】

内容&#xff1a;将一个事物的两个维度分离&#xff0c;使其都可以独立变化角色&#xff1a; 抽象&#xff08;Abstraction&#xff09;细化抽象&#xff08;RefinedAbstraction&#xff09;实现者&#xff08;Implementor&#xff09;具体实现者&#xff08;ConcreteImplement…

vue3 +ts 报错 index.vue 不是模块

那是因为index.vue中创建了一个空的script标签&#xff0c;而且语法使用的是ts语法。vue-cli会用ts语法解析和校验 如果是无状态组件&#xff0c;删掉 如果是有状态组件&#xff0c;导出该组件的实例 去掉null的script后&#xff1a;

实验三 贪心算法

实验三 贪心算法 迪杰斯特拉的贪心算法实现 优先队列等 1.实验目的 1、掌握贪心算法的基本要素 &#xff1a;最优子结构性质和贪心选择性质 2、应用优先队列求单源顶点的最短路径Dijkstra算法&#xff0c;掌握贪心算法。 2.实验环境 Java 3.问题描述 给定带权有向图G (V…

单Bank OTA升级:STM32G071 APP (二)

接上一篇文章&#xff1a;单Bank OTA升级&#xff1a;STM32G071 BootLoader (一)&#xff1a;跳转链接 什么是单Bank升级&#xff1a;将Flash划分为以下3个区域。 BootLoader区&#xff1a;程序进行升级的引导程序&#xff0c;根据Upade_Flag来判断跳转Bank区运行程序或是接收…

C# 存在重复元素

217 存在重复元素 给你一个整数数组 nums 。如果任一值在数组中出现 至少两次 &#xff0c;返回 true &#xff1b;如果数组中每个元素互不相同&#xff0c;返回 false 。 示例 1&#xff1a; 输入&#xff1a;nums [1,2,3,1] 输出&#xff1a;true 示例 2&#xff1a; 输…

【压测指南|压力测试核心性能指标及行业标准】

文章目录 压力测试核心性能指标及行业标准指标1&#xff1a;响应时间指标2&#xff1a;吞吐量&#xff08;TPS)指标3&#xff1a;失败率总结&#xff1a; 压力测试核心性能指标及行业标准 在做压力测试时&#xff0c;新手测试人员常常在看报告时倍感压力&#xff1a;这么多性能…

网工内推 | 网络安全工程师,有安全相关证书优先

01 航天四创科技有限责任公司 招聘岗位&#xff1a;网络安全工程师 职责描述&#xff1a; 1、根据项目的投标技术方案、适配测试方案等&#xff0c;制定网络系统、安全系统、主机系统、存储系统等的深化设计方案和实施方案&#xff1b; 2、安装、配置和搭建基于软硬件设备的网…

连锁反应开始了!Linux 发行版迎新变化!

任何企业都有合法权利捍卫其模型和产品。撇开大量不真正了解开源许可证如何工作的人不谈&#xff0c;我们的印象是&#xff0c;有很多人觉得仅仅因为这是Linux&#xff0c;他们就有某种权利免费获得它。但事实上&#xff0c;他们没有。这不是自由软件中的“自由”的意思&#x…

微信小游戏个人开发者上架:从注册到上线的详细步骤

微信小游戏个人开发者上架&#xff1a;从注册到上线的详细步骤 一&#xff0c;注册小程序账号1.1 微信公众平台1.2 填写信息1.3 绑定管理 二&#xff0c;打包步骤2.1 工具准备2.2 关于Unity版本2.3 打包详解 三&#xff0c;提包步骤3.1 填写用户隐私3.2 完善开发者自查3.3 游戏…

SpringCloudAlibaba微服务实战系列(二)Nacos配置中心

SpringCloudAlibaba Nacos配置中心 在java代码中或者在配置文件中写配置&#xff0c;是最不雅的&#xff0c;意味着每次修改配置都需要重新打包或者替换class文件。若放在远程的配置文件中&#xff0c;每次修改了配置后只需要重启一次服务即可。话不多说&#xff0c;直接干货拉…

zookeeper的应用

Zookeeper的配置文件解析: Zookeeper内部原理: 选举机制 半数机制:在集群环境中半数以上的机器存活,这个集群可用,所以在设计Zookeeper集群系统时&#xff0c;通常会选择 奇数台服务器来搭建Zookeeper的集群 虽然在配置文件中并没有指定Master和Slave。但是&#xff0c;Zookeep…

家政服务小程序制作攻略揭秘

想要打造一个家政服务小程序&#xff0c;但是又不懂编程和设计&#xff1f;不用担心&#xff01;下面将为你详细介绍如何利用第三方平台&#xff0c;从零开始打造一个家政服务小程序。 首先&#xff0c;你需要找到一个适合的第三方平台&#xff0c;例如乔拓云网。在乔拓云网的【…

LiveGBS流媒体平台GB/T28181常见问题-token有效期是多久如何设置token有效期有效时间接口调用token的有效时长

LiveGBS常见问题如何设置TOKEN有效时间接口调用token的有效时长 1、TOKEN有效期2、默认token有效期3、配置token_key4、如何配置一直有效的token5、动态有效期6、搭建GB28181视频直播平台 1、TOKEN有效期 调用登陆接口后&#xff0c;会获得一个token&#xff0c;默认的有效期是…

【NLP】BERT,BART和T5等LLM模型的比较

一、介绍 在这篇博文中&#xff0c;我将讨论像BERT&#xff0c;BART和T5这样的大型语言模型。到2020年&#xff0c;LLM领域取得的主要进展包括这些模型的开发。BERT和T5由Google开发&#xff0c;BART由Meta开发。我将根据这些模型的发布日期依次介绍这些模型的详细信息。在之前…

AlSD 系列智能安全配电装置是安科瑞电气有限公司专门为低压配电侧开发的一款智能安全用电产 品-安科瑞黄安南

一、应用背景 电力作为一种清洁能源&#xff0c;给人们带来了舒适、便捷的电气化生活。与此同时&#xff0c;由于使用不当&#xff0c;维护 不及时等原因引发的漏电触电和电气火灾事故&#xff0c;也给人们的生命和财产带来了巨大的威胁和损失。 为了防止低压配电系统发生漏…

Yarn与Zookeeper学习

YARN学习 1.YARN是什么&#xff1f; yarn 分配运行资源 mapReduce的运行平台 2.YARN运行过程&#xff1a; 客户端与ResourceManager交互&#xff0c;生成临时配置文件(Application)ResourceManager根据Application信息生成Task然后生成MapReduceApplicationMaster(简称AM)AM…

STN:Spatial Transformer Networks

1.Abstract 卷积神经网络缺乏对输入数据保持空间不变的能力&#xff0c;导致模型性能下降。作者提出了一种新的可学习模块&#xff0c;STN。这个可微模块可以插入现有的卷积结构中&#xff0c;使神经网络能够根据特征图像本身&#xff0c;主动地对特征图像进行空间变换&#x…

前端图标解决方案

1. 前言 随着 Web 技术的发展与日益丰富的界面需求&#xff0c;图标逐渐成为前端开发中不可或缺的一部分&#xff0c;为此也诞生了各种各样的解决方案。文章总结及分析了目前常见的一些图标解决方案。 2. CSS 背景图片 2.1 background-image 图标本质上也是图片&#xff0c…

人才公寓水电表改造解决方案

随着社会经济的不断发展&#xff0c;人才公寓作为吸引和留住人才的重要配套设施&#xff0c;其水电表改造问题越来越受到人们的关注。本文将从以下几个方面探讨人才公寓水电表改造解决方案。 一、现状分析 目前&#xff0c;人才公寓的水电表普遍存在以下几个问题&#xff1a; …

科技资讯|苹果计划本月推出Vision Pro头显开发套件,电池有重大更新

根据消息源 aaronp613 分享的信息&#xff0c;苹果计划本月底面向开发者&#xff0c;发布 Vision Pro 头显开发套件。消息源还指出苹果更新了 Vision Pro 头显电池组的代号&#xff0c;共有 A2781&#xff0c;A2988 和 A2697 三种不同的型号&#xff0c;目前尚不清楚三者之间的…
最新文章