消息中间件之RocketMQ源码分析(三)

RocketMQ中的Consumer启动流程

RocketMQ客户端中有两个独立的消费者实现类分别为DefaultMQPullConsumer和DefaultMQPushConsumer,

DefaultMQPullConsumer

DefaultMQPullConsumer,该消费者使用时需要用户主动从Broker中Pull消息和消费消息,提交消费位点

继承关系图

在这里插入图片描述

核心属性

  • namesrvAddr:继承自ClientConfig,表示RocketMQ集群的Namesrv地址,如果是多个,则用逗号分开
    如:127.0.0.1:9876,127.0.0.2:9876
  • clientIP:使用客户端的程序所在机器的IP地址,目前支持IPV4和IPV6,同时排除了本地环会地址(127.0.xxx.xxx)
    和私有内网地址(192.168.xxx.xxx),如果在Docket中运行,获取的IP地址是容器所在的IP地址,而非宿主主机的IP地址
  • instanceName:实例名,顾名思义每个实例都需要取不一样的名字,加入要在多个机器上部署
    多个程序进程,那么每个进程的实例名必须不相同,否则程序会启动失败,因为在创建MQClient时,
    会用到IP和instancename名称来
    在这里插入图片描述

在这里插入图片描述

  • vipChannelEnabled:这是一个boolean值,表示是否开启VIP通道。VIP通道和非VIP通道的区别是使用不同的端口号进行通信
  • clientCallbackExecutorThreads:客户端回调线程数。该线程数等于Netty通信层回调线程的个数,默认值为
    Runtime.getRuntime().availableProcessors();表示当前有效的CPU个数
  • pollNameServerInterval:获取Topic路由信息间隔,单位为ms,默认为30000ms(30s)
  • heartbeatBrokerInterval:客户端和Broker心跳间隔,单位为ms,默认30000ms(30s)
  • persistCOnsumerOffsetInterval:持久化消费位点时间间隔,单位为ms,默认为5000ms(5s)
  • defaultMQPullConsumer:默认pull消费者的具体实现
  • consumerGroup:消费者组名字
  • brokerSuspendMaxTimeMills:在长轮询模式下,Broker的最大挂起请求时间,建议不要修改此值
  • consumerTimeoutMillsWhenSuspend:在长轮询模式下,消费者的最大请求超时时间,必须比brokerSuspendMaxTimeMills大,不建议修改
  • messageModel:消费模式,现在支持集群模式消费和广播模式消费
  • messageQueueListener:消息路由信息变化时回调处理监听器,一般在重新平衡时被调用
  • offsetStore:位点存储模块。集群模式位点会持久化到Broker中,广播模式持久化到本地文件中(某个实例消费失败,生产者也不会重发),位点存储模块有两个实现类,分别为RemoteBrokerOffsetStore和LocalFileOffsetStore
  • allocateMessageQUeueStrategy:消费Queue分配策略管理器,默认是平均分配策略
    private AllocateMessageQueueStrategy allocateMessageQueueStrategy = new AllocateMessageQueueAveragely();
  • maxReconsumeTimes:最大重试次数,可以配置

核心方法

  • registerMessageQueueListener():注册队列变化监听器,当队列发生变化是会被监听到
    在这里插入图片描述

  • pull():从Broker中Pull消息,如果有PullCallback参数,则表示异步拉取
    在这里插入图片描述

  • pullBlockIfNotFound():长轮询方式拉取,如果没有拉取到消息,那么Broker会讲请求Hold住一段时间,
    当有消息来临时再发送pull请求
    在这里插入图片描述

  • updateConsumeOffset():更新某一个Queue的消费位点
    在这里插入图片描述

  • fetchConsumeOffset():查找某个Queue的消费位点
    在这里插入图片描述

  • sendMessageBack():如果消费发送失败,则可以讲消息重新发回Broker,这个消费者组延迟一段时间后可以再消费(也就是重试)
    在这里插入图片描述

  • fetchSubscribeMessageQueues():获取一个Topic的全部Queue信息

在这里插入图片描述

Pull启动流程

在这里插入图片描述

  • 1.最初创建defaultMQPullConsumerImpl时的状态为ServiceState.CREATE_JSUT,然后设置消费者的默认启动状态为失败

在这里插入图片描述
在这里插入图片描述

  • 2.检查消费者的配置比,如消费者组名、消费类型、Queue的分配策略等参数是否符合规范,将订阅关系数据发给Rebalance服务对象
    在这里插入图片描述
    在这里插入图片描述

  • 3.校验消费者实例名,如果时默认的名字,则更改为当前的程序进程id
    在这里插入图片描述

  • 4.获取一个MQClientInstance,如果MQClientInstance已经初始化,则直接返回初始化的实例。这是核心对象,每个ClientID缓存一个实例
    在这里插入图片描述

  • 5.设置Rebalance对象消费组、消费类型、Queue分配策略、MQClientInstance等参数
    在这里插入图片描述

  • 6.对BrokerAPI的封装类pullAPIWrapper进行初始化,同时注册消息,过滤filter
    在这里插入图片描述

  • 7.初始化位点管理器并加载位点信息,位点管理器分为本地管理和远程管理,集群消费时
    消费位点保存在Broker中,由远程管理器管理,广播消息时位点在本地,由本地管理其管理
    在这里插入图片描述

  • 8.本地注册消费者实例,如果注册成功,则表示消费者启动成功
    在这里插入图片描述

DefaultMQPushConsumer

大部分属性、方法和DefaultMQPullConsumer是一样的

核心属性和方法

  • defaultMQPushConsumerImpl:默认的Push消费者具体实现类
  • consumeFromWhere:一个枚举,表示从什么位点开始消费,
    CONSUME_FROM_LAST_OFFSET:默认从上次消费的位点开始消费,相当于断点继续
    CONSUME_FROM_TIMESTAMP:从指定时间开始消费
    CONSUME_FROM_FIRST_OFFSET:从ConsumeQueue的最小位点开始消费
  • consumeTimestamp:表示从哪一时刻开始消费,时间格式为yyyyMMDDHHmmss,默认半小时前,当consumeFromWhere=CONSUME_FROM_TIMESTAMP时,consumeTimestamp设置的值才生效
  • allocateMessageQueueStrategy:消费者订阅topic-queue策略
  • subscription:订阅关系,表示当前消费者订阅了哪些Topic的哪些Tag
  • messageListener:消息Push回调监听器
  • consumeThreadMin:最小消费线程数,必须小于consumeThreadMax
    consumeThreadMax:最大线程数,必须大于consumeThreadMin
  • adjustThreadPoolNumsThreshold:动态调整消费线程池的线程数大小,开源版本不支持
  • consumeConcurrentlyMaxSpan:并发消息的最大位点差,,如果Pull消息的位点差超过该值,拉取变慢
  • pullThresholdForQueue:一个Queue能缓存的最大消息数,超过该值则采取拉取流控措施,默认是1000
  • pullThresholdSizeForQueue:一个Queue最大能缓存的消息字节数,单位是MB,默认是10MB
  • pullThresholdForTopic:一个Topic最大能缓存的消息数。超过该值则采取拉取流控措施,该字段值默认是-1,该值根据pullThresholdForQueue的配置决定是否生效,pullThresholdForTopic的优先级低于pullThresholdForQueue
  • pullThreasholdSizeForTopic:一个Topic最大能缓存的消息字节数,单位是MB,默认为-1,结合pullThresholdSizeForQueue配置项生效,该配置项的优先级低于pullThresholdSizeForQueue
  • pullInterval:拉取间隔,单位为ms
  • consumeMessageBatchMaxSize:消费者每次批量消费时,最多消费多少条消息,默认是1
  • pullBatchSize:一次最大拉取多少条消息,默认32条
  • postSubscriptionWhenPull:每次拉取消息时是否更新订阅关系,默认false
  • maxReconsumeTimes:最大重试次数,默认-1,表示最大重试次数为16次
  • suspendCurrentQueueTimeMillis为段轮询场景设置的挂起时间,比如顺序消息场景
  • consumeTimeout:消费超时时间,单位为min,默认是15

Push启动流程

在这里插入图片描述

  • 1-7和Pull模式类似
  • 8.初始化消费服务并启动,之所以用户"感觉"消息是Broker主动推送给自己的,
    是因为DefaultMQPushConsumer通过Pull服务将消息
    拉取到本地,再通过Callbakc的形式,将本地消息Push给用户的消费代码,
    DefaultMQPushConsumer和DefaultMQPullConsumer
    获取消息的方式一样,本质上都是拉取。

消费服务分为两种,即并行消费服务和顺序消费服务,对应的实现类分别是
ConsumeMessageConcurrentlyService和ConsumeMessageOrderlyService
根据用户监听器继承的不同接口初始化不同的消费服务程序

在这里插入图片描述

  • 9.启动MQClientInstance实例
    在这里插入图片描述
  • 10.更新本地订阅关系和路由信息,通过Broker检查是否支持消费者的过滤类型;
    向集群中的所有Broker发送消费者组的心跳信息
    在这里插入图片描述
  • 11.立即执行一次Rebalance
    this.mQClientFactory.rebalanceImmediately();

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/361320.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Springboot-SpringCloud学习

文章目录 web项目开发历史 SpringBootSpring以及Springboot是什么微服务第一个Springboot项目配置如何编写 yaml自动装配原理集成 web开发(业务核心)集成 数据库 Druid分布式开发:Dubbo(RPC) zookeeperswagger&#x…

掌握使用 React 和 Ant Design 的个人博客艺术之美

文章目录 前言在React的海洋中起航安装 Create React App安装Ant Design 打造个性化的博客风格通过路由实现多页面美化与样式定制部署与分享总结 前言 在当今数字时代,个人博客成为表达观点、分享经验和展示技能的独特平台。在这个互联网浪潮中,选择使用…

新火种AI|脑洞照进现实!马斯克正式官宣,已将芯片连入大脑...

作者:小岩 编辑:彩云 2024年1月30日,马斯克在社交媒体上宣布,他的公司Neuralink已经完成了首例人类大脑芯片植入手术,并且目前手术恢复状况良好。这一突破性进展意味着人类距离实现大脑与电脑的直接连接更近了一步。…

【mysql】InnoDB引擎的索引

目录 1、B树索引 1.1 二叉树 1.1.1 二分查找(对半查找) 1.1.2 树(Tree) 1.1.2.1 树的定义 1.1.2.2 树的特点 1.1.2.3 二叉树 1.1.2.4 二叉查找(搜索)树 1.2 B树 1.2.1 聚簇索引(clust…

AI-数学-高中-14-函数零点存在定理和运用

原作者视频:【函数综合】【考点精华】1零点存在性定理的运用(基础)_哔哩哔哩_bilibili 1.定义: 2.零点存在定义: 2.函数零点与图像焦点的转化 零点如果不好求,将函数化成两个函数再画图,看函数…

防抖和节流?有什么区别?如何实现?

#一、是什么 本质上是优化高频率执行代码的一种手段 如:浏览器的 resize、scroll、keypress、mousemove 等事件在触发时,会不断地调用绑定在事件上的回调函数,极大地浪费资源,降低前端性能 为了优化体验,需要对这类…

计算机网络-物理层传输介质(导向传输介质-双绞线 同轴电缆 光纤和非导向性传输介质-无线波 微波 红外线 激光)

文章目录 传输介质及分类导向传输介质-双绞线导向传输介质-同轴电缆导向传输介质-光纤非导向性传输介质小结 传输介质及分类 物理层规定电气特性:规定电气信号对应的数据 导向传输介质-双绞线 双绞线的主要作用是传输数据和语音信息。它通过将两根导线以特定的方…

python爬虫2

1.table 是表格&#xff0c;tr是行&#xff0c;td是列 ul li是无序列标签用的较多&#xff0c;ol li是有序列标签 最基本的结构 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><title> Title </title>…

【JavaEE】UDP协议与TCP协议

作者主页&#xff1a;paper jie_博客 本文作者&#xff1a;大家好&#xff0c;我是paper jie&#xff0c;感谢你阅读本文&#xff0c;欢迎一建三连哦。 本文于《JavaEE》专栏&#xff0c;本专栏是针对于大学生&#xff0c;编程小白精心打造的。笔者用重金(时间和精力)打造&…

CSS复合选择器

目录 1.什么是复合选择器 2.后代选择器 3.子选择器 4.并集选择器 5.伪类选择器 5.1链接伪类选择器 5.2 :focus 伪类选择器 6.总结 7.补充 7.1相邻兄弟选择器&#xff08;也叫加号选择器&#xff09; 7.2通用兄弟选择器&#xff08;也叫波浪号选择器&#xff09; 1. 什…

力扣hot100 划分字母区间 贪心 思维 满注释版

Problem: 763. 划分字母区间 文章目录 思路复杂度Code 思路 &#x1f468;‍&#x1f3eb; 代码随想录 复杂度 时间复杂度: O ( n ) O(n) O(n) 空间复杂度: O ( n ) O(n) O(n) Code class Solution {public List<Integer> partitionLabels(String s){// 创建哈希…

AI工具【OCR 01】Java可使用的OCR工具Tess4J使用举例(身份证信息识别核心代码及信息提取方法分享)

Java可使用的OCR工具Tess4J使用举例 1.简介1.1 简单介绍1.2 官方说明 2.使用举例2.1 依赖及语言数据包2.2 核心代码2.3 识别身份证信息2.3.1 核心代码2.3.2 截取指定字符2.3.3 去掉字符串里的非中文字符2.3.4 提取出生日期&#xff08;待优化&#xff09;2.3.5 实测 3.总结 1.简…

闭包的理解?闭包使用场景

说说你对闭包的理解&#xff1f;闭包使用场景 #一、是什么 一个函数和对其周围状态&#xff08;lexical environment&#xff0c;词法环境&#xff09;的引用捆绑在一起&#xff08;或者说函数被引用包围&#xff09;&#xff0c;这样的组合就是闭包&#xff08;closure&#…

FFmpeg和Monibuka拉取rtsp(大华摄像头)视频流时未进行URLCode编码导致提示404等报错

场景 Monibucav4(开源流媒体服务器)在Windows上搭建rtmp服务器并实现拉取rtsp视频流以及转换flv播放&#xff1a; Monibucav4(开源流媒体服务器)在Windows上搭建rtmp服务器并实现拉取rtsp视频流以及转换flv播放_monibuca 搭建流媒体服务-CSDN博客 Nginx搭建RTMP服务器FFmpeg…

Origin 2022下载安装教程,操作简单,小白也能轻松搞定,附安装包,带软件使用教程

前言 Origin是一个科学绘图、数据分析软件&#xff0c;支持各种各样的2D/3D图形&#xff0c;包括统计&#xff0c;信号处理&#xff0c;曲线拟合以及峰值分析&#xff0c;Origin具有强大的数据导入功能和多样的图形输出格式。 准备工作 1、Win7及以上系统 2、提前准备好 Or…

十分钟快速上手Spring Boot与微信小程序API接口的调用,快速开发小程序后端服务

1.1 微信小程序API接口介绍 微信小程序API接口是连接小程序前端与后端服务器的桥梁&#xff0c;它提供了丰富的功能接口&#xff0c;包括用户信息、支付、模板消息、数据存储等。这些API接口能够满足开发者在小程序中实现各种复杂业务逻辑的需求。 用户信息接口 用户信息接口…

Windows PC版微信内置浏览器调试(更新版)

前言 在日常的开发中&#xff0c;尤其是在微信公众号的相关开发中&#xff0c;我们需要进行微信端的调试&#xff0c;如果是后端开发&#xff0c;频率会更高。早期的微信版本&#xff0c;还支持查看网页元素以及接口请求&#xff0c;近年来&#xff0c;微信将这个功能频闭掉了…

链动2+1:打造企业级社交电商新篇章

随着互联网的迅猛发展&#xff0c;社交电商已成为商业领域的新宠。而在众多社交电商模式中&#xff0c;链动21模式以其独特的优势和魅力&#xff0c;正逐渐成为企业级用户的首选。本文将深入探讨链动21模式的优势&#xff0c;以及如何助力企业级用户实现商业成功。 一、链动21模…

vue环境安装 nodejs和vue

npm 是 NodeJS 下的包管理器,vue-cli脚手架模板就是基于 node 下的 npm 来完成安装的。 webpack: 它的主要用途是通过CommonJS的语法把所有浏览器端需要发布的静态资源做相应的准备,比如资源的合并和打包。 vue-cli:官方提供的一个脚手架,用于快速生成一个 vue 的项目模板。…

精通Python第16篇—深入解析Pyecharts极坐标系参数与实战

文章目录 Pyecharts绘制多种炫酷极坐标系参数说明与方向的技术博客1. 导入必要的库2. 极坐标系基础3. 定制化极坐标系4. 方向性的极坐标系5. 极坐标系的动画效果6. 自定义极坐标轴标签7. 添加极坐标系的背景图8. 极坐标系的雷达图总结 Pyecharts绘制多种炫酷极坐标系参数说明与…
最新文章