RocketMq如何保证消息的顺序性

文章目录

  • 消息的有序性
  • 生产者保证消息的有序性
  • 消费者端保证消息消费的有序性

消息的有序性

消息有序指的是一类消息消费时,能按照发送的顺序来消费。例如:一个订单产生了三条消息分别是订单创建、订单付款、订单完成。消费时要按照这个顺序消费才能有意义,但是同时订单之间是可以并行消费的。RocketMQ可以严格的保证消息有序。

顺序消息分为全局顺序消息与部分顺序消息,全局顺序是指某个Topic下的所有消息都要保证顺序;部分顺序消息只要保证每一组消息被顺序消费即可。
我们不用管不同的订单ID的消息之间的总体消费顺序,只需要保证同样订单ID的消息能按照订单创建、订单付款、订单完成这个顺序消费就可以了。
顺序消费实际上有两个核心点,一个是生产者有序存储,另一个是消费者有序消费。

生产者保证消息的有序性

先看如何实现生产者有序存储。我们知道RocketMQ中生产者生产的消息会放置在某个队列中,基于队列先进先出的特性天然的可以保证存入队列的消息顺序和拉取的消息顺序是一致的,因此,我们只需要保证一组相同的消息按照给定的顺序存入同一个队列中,就能保证生产者有序存储。

普通发送消息的模式下,生产者会采用轮询的方式将消费均匀的分发到不同的队列中,然后被不同的消费者消费,因为一组消息在不同的队列,此时就无法使用 RocketMQ 带来的队列有序特性来保证消息有序性了。
这个问题很好解决,因为RocketMQ支持生产者在投放消息的时候自定义投放策略,我们实现一个MessageQueueSelector接口,使用Hash取模法来保证同一个订单在同一个队列中就行了,即通过订单ID%队列数量得到该ID的订单所投放的队列在队列列表中的索引,然后该订单的所有消息都会被投放到这个队列中。

生产者发送消息的方法中就有一些添加队列选择器的方法,保证消息发送顺序。比如只有两个队列,那么订单ID为1,2,3的三组消息中,1、3组消息存放于第一个队列,而2组消息存放于第二个队列

另外,顺序消息必须使用同步发送的方式才能保证生产者发送的消息有序。实际上,采用队列选择器的方法不能保证消息的严格顺序,我们的目的是将消息发送到同一个队列中,如果某个broker挂了,那么队列就会减少一部分,如果采用取余的方式投递,将可能导致同一个业务中的不同消息被发送到不同的队列中,导致同一个业务的不同消息被存入不同的队列中,短暂的造成部分消息无序。同样的,如果增加了服务器,那么也会造成短暂的造成部分消息无序。

消费者端保证消息消费的有序性

生产者有序存储实现了,那么该如何实现消费者有序消费呢?RockerMQ的MessageListener回调函数提供了两种消费模式,有序消费模式MessageListenerOrderly和并发消费模式MessageListenerConcurrently。

在消费的时候,还需要保证消费者注册MessageListenerOrderly类型的回调接口实现顺序消费,如果消费者采用Concurrently并行消费,则仍然不能保证消息消费顺序。

实际上,每一个消费者的的消费端都是采用线程池实现多线程消费的模式,即消费端是多线程消费。虽然MessageListenerOrderly被称为有序消费模式,但是仍然是使用的线程池去消费消息。

MessageListenerConcurrently是拉取到新消息之后就提交到线程池去消费,而MessageListenerOrderly则是通过加分布式锁和本地锁保证同时只有一条线程去消费一个队列上的数据。

即顺序消费模式使用3把锁来保证消费的顺序性:

broker端的分布式锁:
在负载均衡的处理新分配队列的updateProcessQueueTableInRebalance方法,以及ConsumeMessageOrderlyService服务启动时的start方法中,都会尝试向broker申请当前消费者客户端分配到的messageQueue的分布式锁。
broker端的分布式锁存储结构为ConcurrentMap<String/* group */, ConcurrentHashMap<MessageQueue, LockEntry>>,该分布式锁保证同一个consumerGroup下同一个messageQueue只会被分配给一个consumerClient。
获取到的broker端的分布式锁,在client端的表现形式为processQueue. locked属性为true,且该分布式锁在broker端默认60s过期,而在client端默认30s过期,因此ConsumeMessageOrderlyService#start会启动一个定时任务,每过20s向broker申请分布式锁,刷新过期时间。而负载均衡服务也是每20s进行一次负载均衡。
broker端的分布式锁最先被获取到,如果没有获取到,那么在负载均衡的时候就不会创建processQueue了也不会提交对应的消费请求了。
messageQueue的本地synchronized锁:
在执行消费任务的开头,便会获取该messageQueue的本地锁对象objLock,它是一个Object对象,然后通过synchronized实现锁定。
这个锁的锁对象存储在MessageQueueLock.mqLockTable属性中,结构为ConcurrentMap<MessageQueue, Object>,所以说,一个MessageQueue对应一个锁,不同的MessageQueue有不同的锁。
因为顺序消费也是通过线程池消费的,所以这个synchronized锁用来保证同一时刻对于同一个队列只有一个线程去消费它。
ProcessQueue的本地consumeLock:
在获取到broker端的分布式锁以及messageQueue的本地synchronized锁的之后,在执行真正的消息消费的逻辑messageListener#consumeMessage之前,会获取ProcessQueue的consumeLock,这个本地锁是一个ReentrantLock。
那么这把锁有什么作用呢?
在负载均衡时,如果某个队列C被分配给了新的消费者,那么当前客户端消费者需要对该队列进行释放,它会调用removeUnnecessaryMessageQueue方法对该队列C请求broker端分布式锁的解锁。
而在请求broker分布式锁解锁的时候,一个重要的操作就是首先尝试获取这个messageQueue对应的ProcessQueue的本地consumeLock。只有获取了这个锁,才能尝试请求broker端对该messageQueue的分布式锁解锁。
如果consumeLock加锁失败,表示当前消息队列正在消息,不能解锁。那么本次就放弃解锁了,移除消息队列失败,只有等待下次重新分配消费队列时,再进行移除。
如果没有这把锁,假设该消息队列因为负载均衡而被分配给其他客户端B,但是由于客户端A正在对于拉取的一批消费消息进行消费,还没有提交消费点位,如果此时客户端A能够直接请求broker对该messageQueue解锁,这将导致客户端B获取该messageQueue的分布式锁,进而消费消息,而这些没有commit的消息将会发送重复消费。
所以说这把锁的作用,就是防止在消费消息的过程中,该消息队列因为发生负载均衡而被分配给其他客户端,进而导致的两个客户端重复消费消息的行为。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/3353.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

若依框架 --- ruoyi 表格的设置

表格 字典值转换 (1) 方式1&#xff1a;使用字典枚举的方式 var isDownload [[${dict.getType(YES_OR_NO)}]];{field : isDownload,title : 是否允许下载,formatter: function(value, row, index) {return $.table.selectDictLabel(isDownload, value);} }, (2) 方式2&…

Java正则表达式及Pattern与Matcher使用详解

文章目录一、正则表达式详解1、符号定义&#xff08;1&#xff09;基本书写符号&#xff08;2&#xff09;限定符&#xff08;3&#xff09;匹配字符集&#xff08;4&#xff09;分组构造&#xff08;5&#xff09;字符转义2、常用正则表达式举例3、Java中RegularExpressionVal…

flutter 输入时插入分隔符

每四位插入一个分隔符import package:flutter/services.dart;class DividerInputFormatter extends TextInputFormatter {final int rear; //第一个分割位数,后面分割位,,数final String pattern; //分割符DividerInputFormatter({this.rear 4, this.pattern });overrideTex…

【Linux】虚拟地址空间

进程地址空间一、引入二、虚拟地址与物理内存的联系三、为什么要有虚拟地址空间一、引入 对于C/C程序&#xff0c;我们眼中的内存是这样的&#xff1a; 我们利用这种对于与内存的理解看一下下面这段代码&#xff1a; 运行结果&#xff1a; 观察父子进程中 val 变量的值&…

uniapp中使用百度地图(初学者保姆式教学,持续更新)

uniapp中使用百度地图(保姆式教学&#xff0c;从零开始) 最近在写一个移动端的地图项目&#xff0c;也是首次完整的去了解百度地图api&#xff0c;这篇博客会手把手的教你如何使用百度地图api和一些常见问题&#xff0c;后续我也会继续更新完善此博客 1、百度地图api&#xf…

实验九 TSP问题

《算法设计与分析》实验报告 所在院系 计算机与信息工程学院 学生学号 学生姓名 年级专业 2020级计算机科学与技术 授课教师 彭绪富 学 期 2022-2023学年第一学期 提交时间 2022年10月26日 目 录 实验九-1&#xff1a;TSP问题 一、实验目的与要求 二…

html+css制作

<!DOCTYPE html> <html><head><meta charset"utf-8"><title>校园官网</title><style type"text/css">*{padding: 0;margin: 0;}#logo{width:30%;float: left;}.nav{width: 100%;height: 100px;background-color…

mybatis如何解析常用的标签

通过这三行就解析好了一个mybatis配置文件&#xff0c;我们看看如何工作的&#xff1f; String resource "mybatis-config.xml"; Reader reader Resources.getResourceAsReader(resource); SqlSessionFactory sqlSessionFactory new SqlSessionFactoryBuilder().b…

【进阶C语言】qsort库函数(详解)

qsort库函数1. qsort到底是什么&#xff1f;2. qsort库函数的功能3. qosrt函数详解4. 冒泡排序的实现5. qsort库函数如何实现冒泡排序6. qsort库函数排序结构体数据7. 使用冒泡排序的思想来实现类似于qsort1. qsort到底是什么&#xff1f; qsort是C语言库函数里面的一种&#x…

【Flutter·学习实践·配置】认识配置文件pubspec.yaml

目录 简介 pubspec.yaml 添加Pub仓库 其他依赖方式 依赖本地包 依赖Git 简介 简单说就是包管理工具&#xff0c;类似于Android 提供了 Gradle 来管理依赖&#xff0c;iOS 用 Cocoapods 或 Carthage 来管理依赖&#xff0c;Node 中通过 npm 等。 让我们能很好的管理第三…

固定优先级仲裁器设计

前言仲裁器Arbiter是数字设计中非常常见的模块&#xff0c;应用也非常广泛。定义就是当有两个或两个以上的模块需要占用同一个资源的时候&#xff0c;我们需要由仲裁器arbiter来决定哪一个模块来占有这个资源。一般来说&#xff0c;提出占有资源的模块要产生一个请求(request)&…

电脑硬盘文件数据误删除/格式化为什么可以恢复? 怎么恢复?谈谈文件删除与恢复背后的原理

Hello 大家好&#xff0c; 我是元存储~ 主页&#xff1a;元存储的博客_CSDN博客 1. 硬盘数据丢失场景 我们在每天办公还是记录数据的时候&#xff0c;文件存储大多数都是通过硬盘进行存储的&#xff0c;因此&#xff0c;使用多了&#xff0c;各种问题就会出现&#xff0c;比如…

【C++初阶】五、内存管理

文章目录1. C/C内存分布2. C语言中动态内存管理3. C中动态内存管理方式new/delete操作内置类型new和delete操作自定义类型4.C和C在内存申请失败时处理方式的区别5. operator new与operator delete函数6. new和delete的实现原理内置类型自定义类型7. 定位new表达式(placement-ne…

【 Spark编程基础 】实验1

文章目录第1部分&#xff1a;虚拟机的准备工作1.1 下载安装虚拟机1.2 修改主机名1.3 主机ip映射安装SSH服务端SFTP连接&#xff0c;传输安装包安装Java环境第2部分 Hadoop安装2.1 安装Hadoop第3部分 配置集群环境第4部分 Spark安装第1部分&#xff1a;虚拟机的准备工作 1.1 下…

【设计模式-工厂方法】想象力和创造力:你考虑过自动化实现工厂吗?

无限思维-想象力和创造力&#xff1a;自动化实现工厂方法前言一、《大话设计模式》对应的Java版本工厂方法类图先行&#xff1a;代码实现&#xff1a;思考升华&#xff1a;二、想象力&#xff1a;创新型思维解决思路战略上&#xff1a;以无限思维的角度去想问题&#xff1a;部署…

SpringBoot整合数据可视化大屏使用

1 前言 DataV数据可视化是使用可视化应用的方式来分析并展示庞杂数据的产品。DataV旨让更多的人看到数据可视化的魅力,帮助非专业的工程师通过图形化的界面轻松搭建专业水准的可视化应用,满足您会议展览、业务监控、风险预警、地理信息分析等多种业务的展示需求, 访问地址:h…

文件上传的多种利用方式

文件上传的多种利用方式 文件上传漏洞除了可以通过绕过检测进行webshell的上传之外&#xff0c;还有多种其它的漏洞可以进行测试。 XSS漏洞 文件名造成的XSS 当上传任何文件时&#xff0c;文件名肯定是会反显示在网页上&#xff0c;可以使用 XSS Payload做文件名尝试将其上传到…

upload—labs(9-12)

pass9直接查看的源码&#xff0c;得知是黑名单过滤&#xff0c;而且过滤也都很全通过查看wp&#xff0c;得知我们可以使用. .(点空格点)进行绕过利用bp抓包进行更改trim删除文件名末尾的点&#xff0c;得到shell.php.空格&#xff0c;然后进行首尾去空得到shell.php.,黑名单过滤…

Java并发高频面试题

分享50道Java并发高频面试题。 线程池 线程池&#xff1a;一个管理线程的池子。 为什么平时都是使用线程池创建线程&#xff0c;直接new一个线程不好吗&#xff1f; 嗯&#xff0c;手动创建线程有两个缺点 不受控风险频繁创建开销大 为什么不受控&#xff1f; 系统资源有…

【机器学习基础 3】 sklearn库

目录 一、sklearn库简介 二、sklearn库安装 三、关于机器学习 四、sklearn库在机器学习中的应用 1、数据预处理 2、特征提取 3、模型选择与评估 五、常用的sklearn函数 1、数据集划分 2、特征选择 3、特征缩放 4、模型训练 5、模型预测 一、sklearn库简介 Scikit-l…