RocketMq 顺序消费、分区消息、延迟发送消息、Topic、tag分类 实战 (消费者) (三)

消费端配置
如下所示:是消费者的配置类,有以下几点需要注意的地方
1、是TargetMessageListener这个监听类(下文会把这个监听类的具体代码贴出来),需要把这个监听类订阅。
2、rocketMqDcProperties.getTargetProperties()这个方法里面有相关的配置信息(这里面有绑定消费者是那个组,因为一类Producer或Consumer,这类Producer或Consumer通常生产或消费同一类消息,且消息发布或订阅的逻辑一致),具体代码见下文
3、subscription.setTopic(rocketMqDcProperties.getOrderTopic()) 是绑定Topic,这个Topic跟上篇文章生产者的Topic是一致的,这样就能保证消费者能准确消费到生产者发送的消息
4、subscription.setExpression(MqTagEnum.target.name()); 这个是一个消息的过滤,也是一个对消息的具体分类。详细用法请参考链接:
https://help.aliyun.com/zh/apsaramq-for-rocketmq/cloud-message-queue-rocketmq-4-x-series/developer-reference/message-filtering?spm=a2c4g.11186623.0.i38#concept-2047069
综上所述:
这段代码主要是用于配置一个 OrderConsumerBean 对象,设置其属性和订阅关系

@Configuration
public class TargetConsumerClient {

    @Autowired
    private RocketMqDcProperties rocketMqDcProperties;

    @Autowired
    private TargetMessageListener messageListener;

    @ConditionalOnProperty(name = "rocket.mq.dc.enabled", havingValue = "true", matchIfMissing = true)
    @Bean(initMethod = "start", destroyMethod = "shutdown",name = {"buildTargetConsumer"})
    public OrderConsumerBean buildTargetConsumer() {
        OrderConsumerBean orderConsumerBean = new OrderConsumerBean();
        //配置文件
        orderConsumerBean.setProperties(rocketMqDcProperties.getTargetProperties());
        //订阅关系
        Map<Subscription, MessageOrderListener> subscriptionTable = new HashMap<Subscription, MessageOrderListener>();
        Subscription subscription = new Subscription();
        subscription.setTopic(rocketMqDcProperties.getOrderTopic());
        subscription.setExpression(MqTagEnum.target.name());
        subscriptionTable.put(subscription, messageListener);
        orderConsumerBean.setSubscriptionTable(subscriptionTable);
        return orderConsumerBean;
    }
}
	public Properties getTargetProperties() {
        return this.getProperties(this.targetGroupId);
    }
    private Properties getProperties(String groupId) {
        Properties properties = new Properties();
        properties.setProperty("AccessKey", this.accessKey);
        properties.setProperty("SecretKey", this.secretKey);
        properties.setProperty("NAMESRV_ADDR", this.nameSrvAddr);
        properties.setProperty("GROUP_ID", groupId);
        properties.setProperty("ConsumeThreadNums", this.getConsumeThreadNums().toString());
        properties.setProperty("maxReconsumeTimes", this.getMaxReconsumeTimes().toString());
        properties.setProperty("consumeTimeout", this.getConsumeTimeout().toString());
        properties.setProperty("suspendTimeMillis", this.getSuspendTimeMillis().toString());
        return properties;
    }    

消费端代码
如下所示:TargetMessageListener 实现了MessageOrderListener接口,并如上文所示,其和OrderConsumerBean也绑定了订阅关系。

@Slf4j
@Component
public class TargetMessageListener implements MessageOrderListener {

    @Autowired
    private MqMessageRecordDao mqMessageRecordDao;

    @Autowired
    private TargetService targetServiceImpl;

    @Override
    public OrderAction consume(final Message message, final ConsumeOrderContext context) {

        log.info("MQ消息消费者监听消息内容:{}", message);

        MqMessageRecordMo mqMessageRecordMo = new MqMessageRecordMo();
        try {
            String body = new String(message.getBody());
            String tag = message.getTag();
            mqMessageRecordMo.setMsgId(message.getMsgID());
            mqMessageRecordMo.setOrderTopic(message.getTopic());
            mqMessageRecordMo.setProducerIp(message.getBornHost());
            mqMessageRecordMo.setTag(tag);
            //mqMessageRecordMo.setMessageKey(message.getKey());
            mqMessageRecordMo.setShardingKey(message.getShardingKey());
            mqMessageRecordMo.setBodyJson(body)
            mqMessageRecordMo.setProducerTime(LocalDateLUtils.timestampToDatetime(message.getBornTimestamp()));
            mqMessageRecordMo.setCreateTime(LocalDateTime.now());
            mqMessageRecordMo.setPMsgId(message.getUserProperties("pMsgId"));

            log.info("MQ消费者消息消费成功,解析并处理相应的业务逻辑, tag = {},key = {},messageId = {}", tag, message.getShardingKey(),message.getMsgID());

            DataBaseEnum dataBase = DataBaseEnum.getEnum(message.getShardingKey());
            targetServiceImpl.process(message.getMsgID(),dataBase,body);

            log.info("MQ消息体消费监听解析结果:{}", body);
            mqMessageRecordMo.setIsSuccess(true);
            return OrderAction.Success;
        } catch (Exception e) {
            //消费失败,挂起当前队列
            // 存储错误消息,重试,记录日志
            /*log.error("target MQ消费者消息监听消息业务逻辑处理失败:",e);
            mqMessageRecordMo.setIsSuccess(false);
            mqMessageRecordMo.setErrorMsg(e.getMessage());
            return OrderAction.Success;
        } finally {
            mqMessageRecordDao.save(mqMessageRecordMo);
        }
    }
}            

在这里想讲下顺序消息
顺序消息
顺序消息可以保证消息的消费顺序和发送的顺序一致,即先发送的先消费,后发送的后消费,常用于金融证券、电商业务等对消息指令顺序有严格要求的场景。本文介绍云消息队列 RocketMQ 版顺序消息的概念、适用场景、实现原理以及使用过程中的注意事项。
什么是顺序消息
顺序消息是云消息队列 RocketMQ 版提供的一种对消息发送和消费顺序有严格要求的消息。对于一个指定的Topic,消息严格按照先进先出(FIFO)的原则进行消息发布和消费,即先发布的消息先消费,后发布的消息后消费。
顺序消息分为分区顺序消息和全局顺序消息。
分区顺序消息
对于指定的一个Topic,所有消息根据Sharding Key进行区块分区,同一个分区内的消息按照严格的先进先出(FIFO)原则进行发布和消费。同一分区内的消息保证顺序,不同分区之间的消息顺序不做要求。
适用场景
适用于性能要求高,以Sharding Key作为分区字段,在同一个区块中严格地按照先进先出(FIFO)原则进行消息发布和消费的场景。
示例
用户注册需要发送验证码,以用户ID作为Sharding Key,那么同一个用户发送的消息都会按照发布的先后顺序来消费。
电商的订单创建,以订单ID作为Sharding Key,那么同一个订单相关的创建订单消息、订单支付消息、订单退款消息、订单物流消息都会按照发布的先后顺序来消费。
阿里巴巴集团内部电商系统均使用分区顺序消息,既保证业务的顺序,同时又能保证业务的高性能。
全局顺序消息
对于指定的一个Topic,所有消息按照严格的先入先出(FIFO)的顺序来发布和消费。
适用场景
适用于性能要求不高,所有的消息严格按照FIFO原则来发布和消费的场景。
示例
在证券处理中,以人民币兑换美元为Topic,在价格相同的情况下,先出价者优先处理,则可以按照FIFO的方式发布和消费全局顺序消息。
说明
全局顺序消息实际上是一种特殊的分区顺序消息,即Topic中只有一个分区,因此全局顺序和分区顺序的实现原理相同。因为分区顺序消息有多个分区,所以分区顺序消息比全局顺序消息的并发度和性能更高
如何实现顺序消息
在这里插入图片描述
在云消息队列 RocketMQ 版中,消息的顺序需要由以下三个阶段保证:
消息发送
如上图所示,A1、B1、A2、A3、B2、B3是订单A和订单B的消息产生的顺序,业务上要求同一订单的消息保持顺序,例如订单A的消息发送和消费都按照A1、A2、A3的顺序。如果是普通消息,订单A的消息可能会被轮询发送到不同的队列中,不同队列的消息将无法保持顺序,而顺序消息发送时云消息队列 RocketMQ 版支持将Sharding Key相同(例如同一订单号)的消息序路由到一个队列中。
云消息队列 RocketMQ 版服务端判定消息产生的顺序性是参照同一生产者发送消息的时序。不同生产者、不同线程并发产生的消息,云消息队列 RocketMQ 版服务端无法判定消息的先后顺序。
消息存储
如上图所示,顺序消息的Topic中,每个逻辑队列对应一个物理队列,当消息按照顺序发送到Topic中的逻辑队列时,每个分区的消息将按照同样的顺序存储到对应的物理队列中。
消息消费
云消息队列 RocketMQ 版按照存储的顺序将消息投递给Consumer,Consumer收到消息后也不对消息顺序做任何处理,按照接收到的顺序进行消费。
Consumer消费消息时,同一Sharding Key的消息使用单线程消费,保证消息消费顺序和存储顺序一致,最终实现消费顺序和发布顺序的一致
注意事项
a、同一个Group ID只对应一种类型的Topic,即不同时用于顺序消息和无序消息的收发。
b、对于全局顺序消息,建议消息不要有阻塞。同时运行多个实例,是为了防止工作实例意外退出而导致业务中断。当工作实例退出时,其他实例可以立即接手工作,不会导致业务中断,实际工作的只会有一个实例。
c、云消息队列 RocketMQ 版服务端判定消息产生的顺序性是参照单一生产者、单一线程并发下消息发送的时序。如果发送方有多个生产者或者有多个线程并发发送消息,则此时只能以到达云消息队列 RocketMQ 版服务端的时序作为消息顺序的依据,和业务侧的发送顺序未必一致

顺序消息常见问题
a、同一条消息是否可以既是顺序消息,又是定时消息和事务消息?
不可以。顺序消息、定时消息、事务消息是不同的消息类型,三者是互斥关系,不能叠加在一起使用。

b、顺序消息支持哪些地域?
支持云消息队列 RocketMQ 版所有公共云地域和金融云地域。

c、为什么全局顺序消息性能一般?
全局顺序消息是严格按照FIFO的消息阻塞原则,即上一条消息没有被成功消费,那么下一条消息会一直被存储到Topic队列中。如果想提高全局顺序消息的TPS,可以升级实例配置,同时消息客户端应用尽量减少处理本地业务逻辑的耗时。

d、顺序消息支持哪种消息发送方式?
顺序消息只支持可靠同步发送方式,不支持异步发送方式,否则将无法严格保证顺序。

e、顺序消息是否支持集群消费和广播消费?
顺序消息暂时仅支持集群消费模式,不支持广播消费模式。
以上文档链接来源:
https://help.aliyun.com/zh/apsaramq-for-rocketmq/cloud-message-queue-rocketmq-4-x-series/developer-reference/ordered-messages?spm=a2c4g.11186623.0.0.34b428e5LL1Jlh

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/472156.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

MySQL 多表查询与事务的操作

一,多表联查 有些数据我们已经拆分成多个表,他们之间通过外键进行连接.当我们要查询两个表的数据,各取其中的一列或者多列. 这时候就需要使用多表联查. 数据准备: # 创建部门表 create table dept(id int primary key auto_increment,name varchar(20) ) insert into dept (n…

力扣---打家劫舍---动态规划

思路 1&#xff1a; 我将res[i]定义为&#xff1a;一定要取第 i 个房子的前提下&#xff0c;能获取的最大金额。那么直接用cnt从头记录到尾&#xff0c;每个房子的res最大值即是答案。那么递推公式是什么&#xff1f;res[i]max(res[i-2],res[i-1],...,res[0])nums[i]。数组初始…

cmake与交叉编译(x86 to arm)过程和问题全记录

一、背景 公司维护一批c动态库&#xff0c;由于生产需要&#xff0c;每次更新都要在windows、linux_x86、kylin_arm等多个环境中编译一遍&#xff0c;操作比较麻烦&#xff0c;所以想通过交叉编译的方式在一台机器上边编译多个环境的动态库&#xff0c;减少工作量。考虑到工作…

浅谈大模型“幻觉”问题

大模型的幻觉大概来源于算法对于数据处理的混乱&#xff0c;它不像人类一样可以by the book&#xff0c;它没有一个权威的对照数据源。 什么是大模型幻觉 大模型的幻觉&#xff08;Hallucination&#xff09;是指当人工智能模型生成的内容与提供的源内容不符或没有意义的现象。…

Linux——程序地址空间

我们先来看这样一段代码&#xff1a; #include <stdio.h> #include <unistd.h> #include <stdlib.h>int g_val 0;int main() {pid_t id fork();if(id < 0){perror("fork");return 0;}else if(id 0){ //child,子进程肯定先跑完&#xff0c;也…

提升Java编程安全性-代码加密混淆工具的重要性和应用

在Java编程领域中&#xff0c;保护代码安全性和知识产权至关重要。本文旨在探讨代码加密混淆工具在提升代码安全性和保护知识产权方面的重要性。我们将介绍几款流行的Java代码加密混淆工具&#xff0c;如ProGuard、DexGuard、Jscrambler、DashO和ipaguard&#xff0c;并分析它们…

多线程(剩余部分)

Day29 多线程(剩余部分) 十二、线程的礼让 Thread.yield(); 理解&#xff1a;此方法为静态方法&#xff0c;此方法写在哪个线程中&#xff0c;哪个线程就礼让 注意&#xff1a;所谓的礼让是指当前线程退出CPU资源&#xff0c;并转到就绪状态&#xff0c;接着再抢 需求&#x…

浅谈一下对于DDD模式的理解3

浅谈一下对于DDD模式的理解&#xff0c;相互学习交流&#xff0c;不对之处欢迎大家指正。 在说到DDD(Domain-Driven Design)设计模式之前&#xff0c;先要说下我们在对系统进行架构设时需要遵循的几个原则&#xff1a; 单一职责&#xff08;SRP&#xff09; "单一职责原则…

直播预约丨《袋鼠云大数据实操指南》No.1:从理论到实践,离线开发全流程解析

近年来&#xff0c;新质生产力、数据要素及数据资产入表等新兴概念犹如一股强劲的浪潮&#xff0c;持续冲击并革新着企业数字化转型的观念视野&#xff0c;昭示着一个以数据为核心驱动力的新时代正稳步启幕。 面对这些引领经济转型的新兴概念&#xff0c;为了更好地服务于客户…

文献速递:基于SAM的医学图像分割---阶梯式微调方法,用于整合补充网络的自适应矩估计(SAM)

Title 题目 Ladder Fine-tuning approach for SAM integrating complementary network 阶梯式微调方法&#xff0c;用于整合补充网络的自适应矩估计&#xff08;SAM&#xff09; 01 文献速递介绍 医学图像分割在医疗保健中扮演着至关重要的角色。它旨在使用各种医学成像方式…

MS2574/2574T/2574S高速、四通道差动线路驱动器

品简述 MS2574/MS2574T/MS2574S 是一款高速、低功耗的四通道 差动线路驱动芯片&#xff0c;用于平衡或非平衡的数字数据传输。可 以满足 ANSI TIA/EIA-422-B 和 ITU &#xff08;原 CCITT &#xff09;建议 V.11 的要求。 三态输出可提供用于驱动双绞线或平行双线传输线路等…

公司购买阿里云服务器多少钱一年?199元2核4G5M配置

阿里云服务器ECS u1实例&#xff0c;2核4G&#xff0c;5M固定带宽&#xff0c;80G ESSD Entry盘优惠价格199元一年&#xff0c;性能很不错&#xff0c;CPU采用Intel Xeon Platinum可扩展处理器&#xff0c;购买限制条件为企业客户专享&#xff0c;实名认证信息是企业用户即可&a…

基于机器视觉的太阳能电池片异物遮挡检测含数据集

分享链接见文末 近年来&#xff0c;随着太阳能发电技术的快速发展&#xff0c;太阳能电池片的应用越来越广泛。然而&#xff0c;太阳能电池片在实际运行过程中常常会受到各种异物的遮挡&#xff0c;如树叶、灰尘等&#xff0c;导致发电效率下降甚至损坏设备。因此&#xff0c;…

python 基于 websocket 的简单将视频推流到网页

本来有一台设备是要搞成无线的形式的&#xff0c;设备的摄像头的数据可以在一台局域网连接的平板上查看&#xff0c;因为试着使用 RTMP 推流&#xff0c;感觉延时太大了&#xff0c;而 Webrtc 感觉有太麻烦了&#xff0c;所以一开始看到这篇文章使用 UDP 协议进行推流&#xff…

竞赛 - 基于机器视觉的图像拼接算法

前言 图像拼接在实际的应用场景很广&#xff0c;比如无人机航拍&#xff0c;遥感图像等等&#xff0c;图像拼接是进一步做图像理解基础步骤&#xff0c;拼接效果的好坏直接影响接下来的工作&#xff0c;所以一个好的图像拼接算法非常重要。 再举一个身边的例子吧&#xff0c;…

“比特币跌至8900美元”?逢低买入信号闪现!亚洲投资者需求正持续增长!

3月19日&#xff0c;美股三大指数集体收涨&#xff0c;美联储正在召开为期两天的货币政策会议&#xff0c;周三公布结果&#xff0c;市场普遍预计美联储将按兵不动。 然而&#xff0c;比特币近几日却面临显著的价格回调&#xff0c;昨早再次从6.7万美元水平快速下滑&#xff0c…

学习vue3第九节(新加指令 v-pre/v-once/v-memo/v-cloak )

1、v-pre 作用&#xff1a;防止编译器解析某个特定的元素及其内容&#xff0c;即v-pre 会跳过当前元素以及其子元素的vue语法解析&#xff0c;并将其保持原样输出&#xff1b; 用于&#xff1a;vue 中一些没有指令和插值表达式的节点的元素&#xff0c;使用 v-pre 可以提高 Vu…

【Linux】shell命令运行原理---认识Linux基本指令

主页&#xff1a;醋溜马桶圈-CSDN博客 专栏&#xff1a;Linux_醋溜马桶圈的博客-CSDN博客 gitee&#xff1a;mnxcc (mnxcc) - Gitee.com 目录 1.shell命令以及运行原理 1.1 shell命令 1.2 Linux内核权限 1.3 图示Linux shell和bash的区别 2.认识Linux基本指令 2.1 指令的…

选马桶别再犯错,这7点要注意!福州中宅装饰,福州装修

在众多卫浴品牌中&#xff0c;各种型号尺寸和性能的马桶更是层出不穷&#xff0c;在选购的时候总是陷入难题&#xff0c;那么接下来就给大家讲讲马桶应该怎么选购&#xff1a; ①高效冲水系统&#xff1a;高效的冲水系统&#xff0c;不仅能确保每一次冲洗都干净彻底&#xff0c…

【RabbitMQ】【Docker】基于docker-compose构建rabbitmq容器

本文通过docker-compose构建一个单体的rabbtimq容器。 1&#xff0c;docker、docker-compose环境 首先需要有docker和docker-compose环境&#xff0c;docker安装[1]&#xff0c;docker-compose安装[2]。 通过下列命令确定docker、docker-compose是否安装成功。 [root192 ge…