RocketMQ教程-(5)-功能特性-顺序消息

顺序消息为 Apache RocketMQ 中的高级特性消息,本文为您介绍顺序消息的应用场景、功能原理、使用限制、使用方法和使用建议。

应用场景​

在有序事件处理、撮合交易、数据实时增量同步等场景下,异构系统间需要维持强一致的状态同步,上游的事件变更需要按照顺序传递到下游进行处理。在这类场景下使用 Apache RocketMQ 的顺序消息可以有效保证数据传输的顺序性。

 

功能原理​

什么是顺序消息

顺序消息是 Apache RocketMQ 提供的一种高级消息类型,支持消费者按照发送消息的先后顺序获取消息,从而实现业务场景中的顺序处理。 相比其他类型消息,顺序消息在发送、存储和投递的处理过程中,更多强调多条消息间的先后顺序关系。

Apache RocketMQ 顺序消息的顺序关系通过消息组(MessageGroup)判定和识别,发送顺序消息时需要为每条消息设置归属的消息组,相同消息组的多条消息之间遵循先进先出的顺序关系,不同消息组、无消息组的消息之间不涉及顺序性。

基于消息组的顺序判定逻辑,支持按照业务逻辑做细粒度拆分,可以在满足业务局部顺序的前提下提高系统的并行度和吞吐能力。

如何保证消息的顺序性

Apache RocketMQ 的消息的顺序性分为两部分,生产顺序性和消费顺序性。

  • 生产顺序性 :

    Apache RocketMQ 通过生产者和服务端的协议保障单个生产者串行地发送消息,并按序存储和持久化。

    如需保证消息生产的顺序性,则必须满足以下条件:

    • 单一生产者:消息生产的顺序性仅支持单一生产者,不同生产者分布在不同的系统,即使设置相同的消息组,不同生产者之间产生的消息也无法判定其先后顺序。

    • 串行发送:Apache RocketMQ 生产者客户端支持多线程安全访问,但如果生产者使用多线程并行发送,则不同线程间产生的消息将无法判定其先后顺序。

    满足以上条件的生产者,将顺序消息发送至 Apache RocketMQ 后,会保证设置了同一消息组的消息,按照发送顺序存储在同一队列中。服务端顺序存储逻辑如下:

    • 相同消息组的消息按照先后顺序被存储在同一个队列。

    • 不同消息组的消息可以混合在同一个队列中,且不保证连续。

如上图所示,消息组1和消息组4的消息混合存储在队列1中, Apache RocketMQ 保证消息组1中的消息G1-M1、G1-M2、G1-M3是按发送顺序存储,且消息组4的消息G4-M1、G4-M2也是按顺序存储,但消息组1和消息组4中的消息不涉及顺序关系。

  • 消费顺序性 :

    Apache RocketMQ 通过消费者和服务端的协议保障消息消费严格按照存储的先后顺序来处理。

    如需保证消息消费的顺序性,则必须满足以下条件:

    • 投递顺序

      Apache RocketMQ 通过客户端SDK和服务端通信协议保障消息按照服务端存储顺序投递,但业务方消费消息时需要严格按照接收---处理---应答的语义处理消息,避免因异步处理导致消息乱序。

      备注

      消费者类型为PushConsumer时, Apache RocketMQ 保证消息按照存储顺序一条一条投递给消费者,若消费者类型为SimpleConsumer,则消费者有可能一次拉取多条消息。此时,消息消费的顺序性需要由业务方自行保证。消费者类型的具体信息,请参见消费者分类。

    • 有限重试

      Apache RocketMQ 顺序消息投递仅在重试次数限定范围内,即一条消息如果一直重试失败,超过最大重试次数后将不再重试,跳过这条消息消费,不会一直阻塞后续消息处理。

      对于需要严格保证消费顺序的场景,请务设置合理的重试次数,避免参数不合理导致消息乱序。

生产顺序性和消费顺序性组合

如果消息需要严格按照先进先出(FIFO)的原则处理,即先发送的先消费、后发送的后消费,则必须要同时满足生产顺序性和消费顺序性。

一般业务场景下,同一个生产者可能对接多个下游消费者,不一定所有的消费者业务都需要顺序消费,您可以将生产顺序性和消费顺序性进行差异化组合,应用于不同的业务场景。例如发送顺序消息,但使用非顺序的并发消费方式来提高吞吐能力。更多组合方式如下表所示:

使用限制​

顺序消息仅支持使用MessageType为FIFO的主题,即顺序消息只能发送至类型为顺序消息的主题中,发送的消息的类型必须和主题的类型一致。

使用示例​

创建主题

Apache RocketMQ 5.0版本下创建主题操作,推荐使用mqadmin工具,需要注意的是,对于消息类型需要通过属性参数添加。示例如下:

sh mqadmin updateTopic -n <nameserver_address> -t <topic_name> -c <cluster_name> -a +message.type=FIFO

发送消息

和普通消息发送相比,顺序消息发送必须要设置消息组。消息组的粒度建议按照业务场景,尽可能细粒度设计,以便实现业务拆分和并发扩展。

创建FIFO主题

./bin/mqadmin updateTopic -c DefaultCluster -t FIFOTopic -o true -n 127.0.0.1:9876

  • -c 集群名称
  • -t Topic名称
  • -n nameserver地址
  • -o 创建顺序消息

以Java语言为例,收发顺序消息的示例代码如下:

使用建议

串行消费,避免批量消费导致乱序

消息消费建议串行处理,避免一次消费多条消费,否则可能出现乱序情况。

例如:发送顺序为1->2->3->4,消费时批量消费,消费顺序为1->23(批量处理,失败)->23(重试处理)->4,此时可能由于消息3的失败导致消息2被重复处理,最后导致消息消费乱序。

消息组尽可能打散,避免集中导致热点

Apache RocketMQ 保证相同消息组的消息存储在同一个队列中,如果不同业务场景的消息都集中在少量或一个消息组中,则这些消息存储压力都会集中到服务端的少量队列或一个队列中。容易导致性能热点,且不利于扩展。一般建议的消息组设计会采用订单ID、用户ID作为顺序参考,即同一个终端用户的消息保证顺序,不同用户的消息无需保证顺序。

因此建议将业务以消息组粒度进行拆分,例如,将订单ID、用户ID作为消息组关键字,可实现同一终端用户的消息按照顺序处理,不同用户的消息无需保证顺序。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/45061.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

JavaWeb银行项目

主要功能 实现了贷款、存款、理财、提现、充值、开户、绑卡、转账等功能。 介绍 1、这个是一个类似有支付宝一样的web项目。 2、登录和注册&#xff0c;都是通过手机号来进行的。 3、注册的新用户需要先进行开户操作&#xff0c;然后进行绑卡操作。 4、在开户的时候回给你…

Linux 学习记录57(ARM篇)

Linux 学习记录57(ARM篇) 本文目录 Linux 学习记录57(ARM篇)一、外部中断1. 概念2. 流程图框 二、相关寄存器1. GIC CPU Interface (GICC)2. GIC distributor (GICD)3. EXTI registers 三、EXTI 寄存器1. 概述2. 内部框图3. 寄存器功能描述4. EXTI选择框图5. EXTI_EXTICR1 &…

金融中的数学:贝叶斯公式

1.贝叶斯定理 贝叶斯定理是概率论中的一项重要定理&#xff0c;用于在已知某一事件的条件下&#xff0c;求另一事件发生的概率。它是根据条件概率推导出来的&#xff0c;得名于英国数学家托马斯贝叶斯。 贝叶斯定理可以表示为&#xff1a; 这个式子就是贝叶斯公式&#xff0c…

Hadoop 之 Spark 配置与使用(五)

Hadoop 之 Spark 配置与使用 一.Spark 配置1.Spark 下载2.单机测试环境配置3.集群配置 二.Java 访问 Spark1.Pom 依赖2.测试代码1.计算 π 三.Spark 配置 Hadoop1.配置 Hadoop2.测试代码1.统计字符数 一.Spark 配置 环境说明环境版本AnolisAnolis OS release 8.6Jdkjava versi…

Docker系列 1 - 镜像和容器

Docker系列 1 - 镜像和容器 1、关于 Docker2、镜像 image3、容器 container 1、关于 Docker docker官网&#xff1a;http://www.docker.com docker中文网站&#xff1a;https://www.docker-cn.com/ Docker Hub 仓库官网: https://hub.docker.com/ Docker 的基本组成&#…

Okhttp-LoggingInterceptor的简单使用

概述 Okhttp除了提供强大的get,post网络请求外&#xff0c;还包含请求日志的拦截器&#xff0c;可以监视&#xff0c;重写&#xff0c;重试调用请求。 简单使用 我们在构造OkHttpClient时&#xff0c;通过addInterceptor()方法添加我们需要的过滤器。 object OkhttpUtils{……

十二、数据结构——二叉树基本概念及特点

数据结构中的二叉树 目录 一、二叉树的基本概念 二、二叉树的特点 三、二叉树的分类 四、二叉树的存储结构 (一)、顺序存储 (二)、链式存储 一、二叉树的基本概念 二叉树是一种重要的数据结构&#xff0c;它是每个节点最多有两个子节点的树结构。在二叉树中&#xff0c;每个…

【iOS】自定义字体

文章目录 前言一、下载字体二、添加字体三、检查字体四、使用字体 前言 在设计App的过程中我们常常会想办法去让我们的界面变得美观&#xff0c;使用好看的字体是我们美化界面的一个方法。接下来笔者将会讲解App中添加自定义字体 一、下载字体 我们要使用自定义字体&#x…

Docker 安装 Nacos

简介 Nacos 是一个轻量级的服务发现、配置管理和服务管理平台&#xff0c;它支持多种语言&#xff08;Java、Go、Node.js 等&#xff09;和多种协议&#xff08;HTTP、gRPC、DNS 等&#xff09;&#xff0c;能够帮助开发者构建微服务体系结构&#xff0c;简化了应用程序在不同…

RunnerGo相比较JMeter有哪些优势

当谈到性能测试需求时&#xff0c;JMeter和RunnerGo都提供了丰富的功能&#xff0c;包括测试场景设置、执行性能测试和性能测试结果分析。然而&#xff0c;这两工具在结构方面存在一些区别。以下是对它们进行比较的另一种角度&#xff1a; 模块化设计&#xff1a; JMeter采用…

计算机网络模型

计算机网络模型 网络模型网络模型中各层对应的协议封装与分用TCP/IP协议簇的组成 网络模型 OSI 七层模型 应用层、表示层、会话层、传输层、网络层、数据链路层、物理层 TCP/IP四层模型 应用层、传输层、网络层、网络接口层 TCP/IP五层模型 应用层、传输层、网络层、数据链路…

Linux 学习记录55(ARM篇)

Linux 学习记录55(ARM篇) 本文目录 Linux 学习记录55(ARM篇)一、使用C语言封装GPIO函数1. 封装GPIO组寄存器2. 封装GPIO模式以及相关配置3. 封装GPIO初始化结构体4. 使用自己的封装配置GPIO 一、使用C语言封装GPIO函数 1. 封装GPIO组寄存器 #define GPIOA ((GP…

基于STM32设计的智能奶瓶

一、项目背景 随着我国计划生育政策的放开,婴幼儿数量持续上涨,国民收入逐年提高,家庭在婴幼儿产品方面的消费日益扩大。奶瓶是母婴市场的刚需。目前婴儿哺育的问题引起新爸新妈的高度重视。一方面,人们使用的传统奶瓶已经不能很好地满足现代人对于智能化生活的需求。另一…

钉钉和金蝶云星空接口打通对接实战

钉钉和金蝶云星空接口打通对接实战 对接系统&#xff1a;钉钉 钉钉是阿里巴巴集团打造的企业级智能移动办公平台&#xff0c;是数字经济时代的企业组织协同办公和应用开发平台。钉钉将IM即时沟通、钉钉文档、钉闪会、钉盘、Teambition、OA审批、智能人事、钉工牌、工作台深度整…

无线投屏手机(安卓)屏幕到 Linux(ubuntu 22.04)桌面

1.安装 scrcpy 安装 scrcpy会自动安装 adb. 这个版本的adb功能不是最全的&#xff0c;需要删掉&#xff0c;然后从链接 https://dl.google.com/android/repository/platform-tools-latest-darwin.zip 下载&#xff0c;解压安装即可。 2. 在手机上 打开开发者模式和 USB调试…

【1++的C++初阶】之list

&#x1f44d;作者主页&#xff1a;进击的1 &#x1f929; 专栏链接&#xff1a;【1的C初阶】 文章目录 一&#xff0c;什么是list二&#xff0c;构造与析构2.1 结点结构2.2 链表结构2.3 迭代器结构 三&#xff0c;部分重要接口的作用及其实现3.1 迭代器相关的接口3.2 list相关…

【网络安全】DVWA靶场实战BurpSuite内网渗透

BurpSuite 内网渗透 一、 攻击模式介绍1.1 Sniper&#xff08;狙击手&#xff09;1.2 Battering ram&#xff08;攻城锤&#xff09;1.3 Pitchfork&#xff08;草叉&#xff09;1.4 Cluster bomb&#xff08;榴霰弹&#xff09; 二、 DVWA靶场搭建2.1 下载DVWA工程2.2 添加网站…

redis中使用bloomfilter的白名单功能解决缓存预热问题

一 缓存预热 1.1 缓存预热 将需要的数据提前缓存到缓存redis中&#xff0c;可以在服务启动时候&#xff0c;或者在使用前一天完成数据的同步等操作。保证后续能够正常使用。 1.2 解决办法PostConstruct注解初始化

WebRTC Simulcast介绍

原文地址&#x1f447; https://blog.livekit.io/an-introduction-to-webrtc-simulcast-6c5f1f6402eb/ 你想知道的关于Simulcast的一切 Simulcast是WebRTC中最酷的功能之一,它允许WebRTC会议在参与者网络连接不可预测的情况下进行扩展。在这篇文章中,我们将深入探讨Simulcas…

Databend 开源周报第 103 期

Databend 是一款现代云数仓。专为弹性和高效设计&#xff0c;为您的大规模分析需求保驾护航。自由且开源。即刻体验云服务&#xff1a;https://app.databend.cn 。 Whats On In Databend 探索 Databend 本周新进展&#xff0c;遇到更贴近你心意的 Databend 。 创建网络策略 …