RocketMq源码分析(八)--消息消费流程

文章目录

  • 一、消息消费实现
  • 二、消息消费过程
    • 1、消息拉取
    • 2、消息消费
      • 1)提交消费请求
      • 2)消费消息

一、消息消费实现

  消息消费有2种实现,分别为:并发消费实现(ConsumeMessageConcurrentlyService)和顺序消费实现(ConsumeMessageOrderlyService)。本次以并发消费实现为切入进行探讨消息的消费流程。
在这里插入图片描述

二、消息消费过程

1、消息拉取

  1)在消息服务PullMessageService中完成将消息从远程服务器拉取到本地,具体实现由DefaultMQPushConsumerImpl#pullMessage方法完成

//org.apache.rocketmq.client.impl.consumer.PullMessageService#pullMessage
    private void pullMessage(final PullRequest pullRequest) {
        //从MQClientInstance中获取内部实现类MQConsumerInner
        final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
        if (consumer != null) {
            //强转换成PUSH消息消费服务,然后消费消息
            DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
            impl.pullMessage(pullRequest);
        } else {
            log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
        }
    }

  2)DefaultMQPushConsumerImpl#pullMessage方法中定义了回调实现,在成功拉取消息后,先将消息放到processQueue中,然后再提交消费请求(DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest)异步完成消息消费。

// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage 回调部分代码
       PullCallback pullCallback = new PullCallback() {
            @Override
            public void onSuccess(PullResult pullResult) {
               // ......

	             //从服务器拉取到消息后回调 PullCallBack 回调方法后,先将消息放入到 ProccessQueue中,
	               boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
	               // 然后把消息提交到消费线程池中执行,
	               // 也就是调用 ConsumeMessageService#submitConsumeRequest 开始进入到消息消费的事件中来
	               DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
	                   pullResult.getMsgFoundList(),
	                   processQueue,
	                   pullRequest.getMessageQueue(),
	                   dispatchToConsume);

  // ......
                    }
                }
            }

// ......
        };

2、消息消费

1)提交消费请求

  pullMessage方法中回调提交消息消费(submitConsumeRequest),进入消息并发消费实现(ConsumeMessageConcurrentlyService),其实现代码如下:

// org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#submitConsumeRequest方法
    @Override
    public void submitConsumeRequest(
        final List<MessageExt> msgs,
        final ProcessQueue processQueue,
        final MessageQueue messageQueue,
        final boolean dispatchToConsume) {
        final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
        if (msgs.size() <= consumeBatchSize) {
            ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
            try {
                //异步线程池中执行
                this.consumeExecutor.submit(consumeRequest);
            } catch (RejectedExecutionException e) {
                //提交异常,延迟5S再提交
                this.submitConsumeRequestLater(consumeRequest);
            }
        } else {
            //超过最大数量,分批
            for (int total = 0; total < msgs.size(); ) {
                List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
                for (int i = 0; i < consumeBatchSize; i++, total++) {
                    if (total < msgs.size()) {
                        msgThis.add(msgs.get(total));
                    } else {
                        break;
                    }
                }
                ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
                try {
                    this.consumeExecutor.submit(consumeRequest);
                } catch (RejectedExecutionException e) {
                    for (; total < msgs.size(); total++) {
                        msgThis.add(msgs.get(total));
                    }
                    this.submitConsumeRequestLater(consumeRequest);
                }
            }
        }
    }

  submitConsumeRequest方法中,

  • 先获取单次消费消息最大条数(consumeBatchSize,默认1条)
  • 如果本次提交消息条数小于等于单次消费消息最大条数,则直接创建ConsumeRequest并提交到线程池(consumeExecutor)中执行
  • 如果超过单次消费消息最大条数,则按consumeBatchSize分割分批提交

2)消费消息

  ConsumeMessageConcurrentlyService中创建消息消费请求线程ConsumeRequest,然后提交到线程池。

// org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest#run
        @Override
        public void run() {

            //在进行消息重新负载时如果该消息队列被分配给消费组内其他消费者,drop设置为true,阻止消费者消费不属于自己的消息队列
            if (this.processQueue.isDropped()) {
                log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
                return;
            }

            //类名.this:一般用于内部类需要使用其外部类的实例对象时候使用 ClassName.this 代表其外部类对象,直接写this则代表内部类本身对象
            MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
            ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
            ConsumeConcurrentlyStatus status = null;
            //恢复重试消息主题名
            // RocketMQ将消息存入 commitlog 文件时,如果发现消息的延时级别 delayTimeLevel 大于0会
            //首先将重试主题存人在消息的属性中,然后设置主题名称为 SCHEDULE TOPIC ,以便时间到后重新参与消息消费
            defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());

            ConsumeMessageContext consumeMessageContext = null;
            //执行钩子
            if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                consumeMessageContext = new ConsumeMessageContext();
                consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());
                consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
                consumeMessageContext.setProps(new HashMap<String, String>());
                consumeMessageContext.setMq(messageQueue);
                consumeMessageContext.setMsgList(msgs);
                consumeMessageContext.setSuccess(false);
                ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
            }

            long beginTimestamp = System.currentTimeMillis();
            boolean hasException = false;
            ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
            try {
                if (msgs != null && !msgs.isEmpty()) {
                    for (MessageExt msg : msgs) {
                        MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
                    }
                }
                //内部消息监听器消费消息
                status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
            } catch (Throwable e) {
                log.warn(String.format("consumeMessage exception: %s Group: %s Msgs: %s MQ: %s",
                    RemotingHelper.exceptionSimpleDesc(e),
                    ConsumeMessageConcurrentlyService.this.consumerGroup,
                    msgs,
                    messageQueue), e);
                hasException = true;
            }
            long consumeRT = System.currentTimeMillis() - beginTimestamp;
            if (null == status) {
                if (hasException) {
                    returnType = ConsumeReturnType.EXCEPTION;
                } else {
                    returnType = ConsumeReturnType.RETURNNULL;
                }
            } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
                returnType = ConsumeReturnType.TIME_OUT;
            } else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {
                returnType = ConsumeReturnType.FAILED;
            } else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {
                returnType = ConsumeReturnType.SUCCESS;
            }

            if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
            }

            if (null == status) {
                log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",
                    ConsumeMessageConcurrentlyService.this.consumerGroup,
                    msgs,
                    messageQueue);
                status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
            //后置钩子
            if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                consumeMessageContext.setStatus(status.toString());
                consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
                ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
            }
            
            ConsumeMessageConcurrentlyService.this.getConsumerStatsManager()
                .incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
            //同步消息消费状态和offset
            if (!processQueue.isDropped()) {
                ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
            } else {
                log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
            }
        }

  ConsumeRequest线程中,执行步骤如下

  • 判断消费的队列是否dropped,如果为true,则停止直接终止该消费请求
  • 恢复重试消息的topic和namespace
  • 如果存在钩子函数,则执行前置钩子函数 ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext)
  • 调用消息监听器消费消息listener.consumeMessage(io.openmessaging.rocketmq.consumer.PushConsumerImpl.MessageListenerImpl)
  • 如果存在后置钩子,则执行后置钩子函数
  • 消息消费结果处理

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/109216.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

pre-existing shared memory block

发生原因: 1.服务器cpu、内存进行扩容 2.非正常关闭,导致任在占用共享内存段 解决方案: 根据shmid进行关闭 ipcs -mipcrm -m xxx

Kotlin协程核心理解

一、协程是什么&#xff1f; 1.1 基本概念的理解 我们知道JVM中的线程的实现是依赖其运行的操作系统决定的&#xff0c;JVM只是在上层进行了API的封装&#xff0c;包含常见的有线程的启动方法&#xff0c;状态的管理&#xff0c;比如&#xff1a;Java中抽象出了6种状态&#x…

windows8080端口占用

查看端口占用 netstat -ano | findstr “8080”查看占用进程 tasklist | findstr “4664”关闭占用进程 taskkill /f /t /im httpd.exe

读图数据库实战笔记03_遍历

1. Gremlin Server只将数据存储在内存中 1.1. 如果停止Gremlin Server&#xff0c;将丢失数据库里的所有数据 2. 概念 2.1. 遍历&#xff08;动词&#xff09; 2.1.1. 当在图数据库中导航时&#xff0c;从顶点到边或从边到顶点的移动过程 2.1.2. 类似于在关系数据库中的查…

操作系统 --- 存储器管理

一、简答题 1.存储器管理的基本任务&#xff0c;是为多道程序的并发执行提供良好的存储器环境。请问好的存储器环境”应包含哪几个方面&#xff1f; 答&#xff1a; 2.内存保护是否可以完全由软件实现&#xff1f;为什么&#xff1f; 答&#xff1a;内存保护的主要任务是确保每…

C语言_断言assert详解

一、assert定义 assert() 的用法像是一种"契约式编程"&#xff0c;在我的理解中&#xff0c;其表达的意思就是&#xff0c;程序在我的假设条件下&#xff0c;能够正常良好的运作&#xff0c;其实就相当于一个 if 语句&#xff1a; if(假设成立) {程序正常运行&…

用户登录前后端开发(一个简单完整的小项目)——SpringBoot与session验证(带前后端源码)全方位全流程超详细教程

&#x1f9f8;注&#xff1a;不要看我的文件多&#xff0c;那是我的其他项目&#xff0c;这个项目所用的文件我会全部用红框框起来&#xff0c;没框的部分不用管&#xff0c;前端两个文件&#xff0c;后端一个文件 &#x1f4dc; 目录 首先&#xff0c;定义前后端交互接口 然…

强化学习中值函数应用示例

一、Gridworld Gridworld是一个用于教授强化学习概念的简化的电子游戏环境。它具有一个简单的二维网格&#xff0c;智能体可以在其中执行动作并获得奖励。这个环境是有限的&#xff0c;因为它有一个明确的开始和结束状态&#xff0c;以及一组确定的动作和奖励。 在Gridworld中&…

use renv with this project create a git repository

目录 1-create a git repository 2-Use renv with this project 今天在使用Rstudio过程中&#xff0c;发现有下面两个新选项&#xff08;1&#xff09;create a git repository (2) Use renv with this project. 选中这两个选项后&#xff0c;创建新项目&#xff0c;在项目目…

NEFU数字图像处理(三)图像分割

一、图像分割的基本概念 1.1专有名词 前景和背景 在图像分割中&#xff0c;我们通常需要将图像分为前景和背景两个部分。前景是指图像中我们感兴趣、要分割出来的部分&#xff0c;背景是指和前景不相关的部分。例如&#xff0c;对于一张人物照片&#xff0c;人物就是前景&…

目标检测算法改进系列之添加EIOU,SIOU,AlphaIOU,FocalEIOU等

YOLOv8添加EIoU,SIoU,AlphaIoU,FocalEIoU,Wise-IoU等 yolov8中box_iou其默认用的是CIoU&#xff0c;其中代码还带有GIoU&#xff0c;DIoU&#xff0c;文件路径&#xff1a;ultralytics/yolo/utils/metrics.py&#xff0c;函数名为&#xff1a;bbox_iou 原始代码 def bbox_i…

故障诊断模型 | Maltab实现LSTM长短期记忆神经网络故障诊断

文章目录 效果一览文章概述模型描述源码设计参考资料效果一览 文章概述 故障诊断模型 | Maltab实现LSTM长短期记忆神经网络故障诊断 模型描述 长短记忆神经网络——通常称作LSTM,是一种特殊的RNN,能够学习长的依赖关系。 他们由Hochreiter&Schmidhuber引入,并被许多人进行了…

2023 年值得关注的国外网络安全初创公司

网络安全初创公司试图解决的问题往往有点超前于主流。他们可以比大多数老牌公司更快地填补空白或新兴需求。初创公司通常可以更快地创新&#xff0c;因为它们不受安装基础的限制。 当然&#xff0c;缺点是初创公司往往缺乏资源和成熟度。公司致力于初创公司的产品或平台是有风…

基于单片机的空气质量检测系统

欢迎大家点赞、收藏、关注、评论啦 &#xff0c;由于篇幅有限&#xff0c;只展示了部分核心代码。 技术交流认准下方 CSDN 官方提供的联系方式 文章目录 概要 一、主要内容二、系统方案设计2.1 系统方案设计2.2 主控制器模块选择 三、 系统软件设计4.1 程序结构分析4.2系统程序…

基于SSM的会员卡管理系统设计与实现

末尾获取源码 开发语言&#xff1a;Java Java开发工具&#xff1a;JDK1.8 后端框架&#xff1a;SSM 前端&#xff1a;采用JSP技术开发 数据库&#xff1a;MySQL5.7和Navicat管理工具结合 服务器&#xff1a;Tomcat8.5 开发软件&#xff1a;IDEA / Eclipse 是否Maven项目&#x…

S32K144芯片焊接完成后使用S32DS初次下载无法下载解决方法

一、问题现象如下&#xff0c;S32DS Debug下报错 二、原因&#xff0c;原厂芯片出厂时的FLASH Memory的安全机制是激活的&#xff0c;仿真器是可以连上&#xff0c;但是没法读取Flash Memory的内容 三、解决方法 参考图示&#xff0c;解锁后即可正常Debug

Mac电脑配置Dart编程环境

1.安装Dart SDK 官网地址&#xff1a;https://dart.dev/get-dart $brew tap dart-lang/dart$brew install dart 安装后&#xff0c;用命令检测一下是否安装正常。 $brew info dart 2.VS Code配置Dart环境 1).安装VS Code 官网地址&#xff1a;https://code.visualstudio.c…

【技能树笔记】网络篇——练习题解析(十)

【技能树笔记】网络篇系列前九篇 【技能树笔记】网络篇——练习题解析&#xff08;一&#xff09;-CSDN博客 【技能树笔记】网络篇——练习题解析&#xff08;二&#xff09;-CSDN博客 【技能树笔记】网络篇——练习题解析&#xff08;三&#xff09;-CSDN博客 【技能树笔记】网…

DAY38 动态规划 + 509. 斐波那契数 + 70. 爬楼梯 + 746. 使用最小花费爬楼梯

动态规划理论 动态规划&#xff0c;Dynamic Programming&#xff0c; DP&#xff0c; 如果某一问题有很多重叠子问题&#xff0c;使用动态规划是最有效的。 所以动态规划中每一个状态一定是由上一个状态推导出来的&#xff0c;这一点就区分于贪心&#xff0c;贪心没有状态推导…

Cross Site Scripting (XSS)

攻击者会给网站发送可疑的脚本&#xff0c;可以获取浏览器保存的网站cookie&#xff0c; session tokens, 或者其他敏感的信息&#xff0c;甚至可以重写HTML页面的内容。 背景 XSS漏洞有不同类型&#xff0c;最开始发现的是存储型XSS和反射型XSS&#xff0c;2005&#xff0c;Am…