【源码解析】Apache RocketMQ发送消息源码

send message源码解析

引入

send message方法作为我们经常使用的方法,平时我们很难去关注他底层到底做了什么。大部分人只知道通过send message方法可以将消息发送到broker,然后供消费者进行消费。其实不然,消息从客户端发送到broker,需要中间需要经过很多步骤,比如:首先客户端需要向nameserver拿路由,拿到路由后才能将消息发送到对应的broker。消息到了broker,需要先进行校验,校验无误后,再写到commitLog,写完commitLog后,再根据具体的策略判断是否需要同步到slave节点,同步完slave节点完后才response给客户端。

源码阅读入口

// 客户端入口
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl

// NameServer入口
org.apache.rocketmq.namesrv.processor.ClientRequestProcessor#getRouteInfoByTopic

// Broker端入口
org.apache.rocketmq.broker.processor.SendMessageProcessor#processRequest

源码解析

mid

  • RocketMQ中的客户端send方法提供了单条发送发送也批量发送的API,不管是单条发送还是批量发送本质都是一样的,批量发送会把消息集合包装一下了,具体可以看batch里面的实现,将消息集合封装了MessageBatch对象,当然MessageBatch继承Message。然后再尝试去topicPublishInfoTable中拿路由,如果没有就请求NameServer(忽略经过Proxy层),需要注意的是,请求NameServer获取路由的这个过程是同步的,同一时间只有一个线程可以请求NameServer,需要等到NameServer返回之后才会执行后续的操作。拿到路由后再根据轮询策略选中其中一个broker进行发送。这就是发送消息客户端大致的逻辑,总体来说是还是比较简单的。
  • CONSUMER_SEND_MSG_BACK是消费者发过来的RETRY消息,本次重点不在这里,后续单独讲下这里。当消息到达Broker’端,先根据请求头构建出一个MappingContext对象,再把request对象封装成sendMessageContext;执行注册到sendMessageProcessor里面的钩子方法sendMessageBefore;之后根据是否是batch消息,如果是batch消息,执行sendBatchMessage,不是执行sendMessage方法,其实本质上还是一样的,只是sendBatchMessage中间构建的是messageExtBatch对象,而sendMessage构建的是messageExtBrokerInner对象。MessageExtBatch是MessageExtBrokerInner的子类,所以两者后续还是共用一套逻辑;然后根据是否开启异步写入执行asyncPutMessage或者putMessage,同步的putMessage实际上还是调用的asynPutMessage,只是要等到asyncPutMessage有返回值之后才执行后续的逻辑。我们这里以asyncPutMessage为主,还是先执行注册到SendMessageProcessor里面的钩子方法SendMessageAfter,然后再先判断时候是否是HA(高可用,高可用是需要等到消息写入slave节点成功之后才说明消息发送成功,一般使用在一些金融场景,对消息可靠性要求较高。),然后再然后分配offset(这个offset是由consumeQueue分配的),分配完offset之后,分配完了之后,再将消息体append到commitLog的分配的buf中,返回的状态码PUT_OK执行handleDiskFlush方法,如果是配置的是同步刷盘就等到刷盘成功后返回,如果是异步刷盘,wakeup对应的FlushManager就算写入完成。
  • 上述执行成功后,执行handleHA方法,如果是不是HA模式执行response PUT_OK,否则,构建一个GroupCommitRequest对象put到haService里面,对应slave节点写完最终才算发送成功。

参考:
· https://rocketmq.apache.org/
· 基于Apache Rocket 5.1.0
· https://github.com/apache/rocketmq

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/299028.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

GPU的硬件架构

SM: streaming Multiprocessor 流多处理器 sm里面有多个(sp)cuda core 32个线程称为一个warp,一个warp是一个基本执行单元 抽象概念:grid 网格 block 块 thread 线程 块中的线程大小是有讲究的,关乎到资源的调度,一般是128&#x…

SSD固态硬盘的黄金原则:抱最高的希望,做最坏的打算-1

随着SSD固态硬盘日益普及,在个人电脑中已成为基本的配置选项。在体验SSD固态硬盘带来的性能优势的同时,你有没有想过一个问题,SSD的数据如果误删除或发生故障丢失,还有没有可能找回来呢?这也许是固态硬盘飞入寻常百姓家…

如何在 Windows 电脑上恢复硬盘数据

虽然硬盘偶尔发出安静的咔哒声无需担心,但响亮、持续的咔哒声(有时称为“死亡咔哒声”)应该认真对待。您应该尽快从发出咔嗒声的硬盘驱动器中恢复数据,因为它会比您想象的更快失效。我们下面的指南将探讨从点击硬盘驱动器获取数据…

【读书】《白帽子讲web安全》个人笔记Ⅱ-1

目录 第二篇 客户端脚本安全 第2章 浏览器安全 2.1同源策略 2.2浏览器沙箱 2.3恶意网址拦截 2.4高速发展的浏览器安全 第二篇 客户端脚本安全 第2章 浏览器安全 近年来随着互联网的发展,人们发现浏览器才是互联网最大的入口,绝大多数用户使用互联…

【python学习】-用matplotlib实现将二维数据绘制为三维图形(三维多线图)并实战(三维散点图)

文章目录 绘制一幅三维线图结合for循环绘制多幅三维线图(在一幅图上)美化图形 绘制一幅三维线图 #将二维数据绘制三维图(三维多线图) import matplotlib.pyplot as plt from mpl_toolkits.mplot3d import axes3d import numpy as…

STM32F4xx之库函数

一、库函数介绍 库函数与寄存器的区别 库函数:不需要自己写很多代码,可以利用软件生成代码。使用的时候必须添加库文件。库文件是芯片厂商写好了。占用空间大。 寄存器:自己写的代码量大,没有软件生成代码。使用的时候不需要库文件…

QT c++和qml交互实例

文章目录 一、demo效果图二、c和qml交互的基本方式1、qml访问C类对象 三、关键代码1、工程结构图2、c代码MainWindow.cppMainQuickView.cppStudentInfoView.cppStudentInfoModel.cpp 3、qml代码main.qmlMainQuickTopRect.qmlMainQuickMiddleRect.qmlMainQuickMiddleTableRect.q…

@Async正确使用姿势

Async注解可以使被修饰的方法成为异步方法,简单且方便,这篇文章将教你如何正确的使用它 先谈谈大多数人对Aysnc的认识: 如果直接使用Async,未指定线程池 并且 容器内也没有beanName为taskExecutor的bean,则会使…

im6ull学习总结(三-3)freetype

1、Freetype简介 FreeType是一个开源的字体渲染引擎,主要用于将字体文件转换为位图或矢量图形,并在屏幕上渲染出高质量的字体。它提供了一组API,使开发者能够在自己的应用程序中使用和呈现字体。 FreeType最初是作为一个独立项目开发的&…

欢乐钓鱼^^

欢迎来到程序小院 欢乐钓鱼 玩法&#xff1a;点击鼠标左键左右晃动的鱼钩&#xff0c;下方左右移动的鱼对准鱼的方向即可进行钓鱼&#xff0c; 不同的鱼不同的分数&#xff0c;快去钓鱼吧^^开始游戏https://www.ormcc.com/play/gameStart/241 html <div id"gamediv&qu…

(leetcode)替换所有的问号 -- 模拟算法

个人主页&#xff1a;Lei宝啊 愿所有美好如期而遇 本题链接 力扣&#xff08;LeetCode&#xff09; 输入描述 string modifyString(string s) 输入一个字符串&#xff0c;字符串中仅包含小写字母和 ‘?’ 字符。 输出描述 将问号替换为小写字母&#xff0c;且这个替…

数据结构期末复习

章节知识点分析 第一章绪论 基本概念 数据 数据元素&#xff08;记录、表目&#xff0c;是数据集合中一个个体&#xff09; 数据项&#xff1a;一个数据元素可由若干数据项组成 数据对象&#xff1a;性质相同的数据元素的集合&#xff0c;是数据的一个子集 数据结构&…

LLM漫谈(二)| QAnything支持任意格式文件或数据库的本地知识库问答系统

一、QAnything介绍 QAnything (Question and Answer based on Anything) 是致力于支持任意格式文件或数据库的本地知识库问答系统&#xff0c;可断网安装使用。 您的任何格式的本地文件都可以往里扔&#xff0c;即可获得准确、快速、靠谱的问答体验。 目前已支持格式: PDF&…

MiniCom串口调试工具使用

一、程序安装 执行下面代码&#xff0c;安装minicom。 sudo apt-get install minicom 二、查看串口设备名称 先拔掉串口运行下面指令&#xff0c;获得所有设备名称,插上串口再运行一次&#xff0c;新增的就是串口设备名称&#xff0c;记住串口设备名称&#xff0c;以串口设备名…

LeetCode-整数反转(7)

题目描述&#xff1a; 给你一个 32 位的有符号整数 x &#xff0c;返回将 x 中的数字部分反转后的结果。 如果反转后整数超过 32 位的有符号整数的范围 [−231&#xff0c;231− 1] &#xff0c;就返回 0。 假设环境不允许存储 64 位整数&#xff08;有符号或无符号&#xff0…

[4K80 AI ISP IPC芯片]

4K80 AI ISP IPC芯片 Hi3403V100是一颗面向监控市场推出的专业 Ultra-HD Smart IP Camera SOC&#xff0c;该芯片最高支持四路sensor输入&#xff0c;支持最高4K60的ISP图像处理能力&#xff0c;支持3F WDR加粗样式、多级降噪、六轴防抖、硬件拼接等多种图像增强和处理算法&am…

C++多态性——(5)运算符重载(第二节)

归纳编程学习的感悟&#xff0c; 记录奋斗路上的点滴&#xff0c; 希望能帮到一样刻苦的你&#xff01; 如有不足欢迎指正&#xff01; 共同学习交流&#xff01; &#x1f30e;欢迎各位→点赞 &#x1f44d; 收藏⭐ 留言​&#x1f4dd; 身先才能率人&#xff0c;律己才能服人…

【SpringBoot】公共字段自动填充功能实现(枚举、自定义注解、AOP、反射)

1. 自定义注解 使用interface语法来定义注解&#xff08;Annotation&#xff09;。 注解的参数类似无参数方法&#xff0c;可以用default设定一个默认值&#xff0c;比如String value() default "";。 元注解&#xff1a;有一些注解可以修饰其他注解&#xff0c;这…

基础面试题整理2

1.抽象类与接口区别 语法&#xff1a; 抽象类用abstract定义&#xff1b;接口用interface定义抽象类被子类继承extends&#xff08;不可用final修饰&#xff09;&#xff1b;接口被类实现implements抽象类的属性访问无限制,方法不可用private修饰&#xff1b;接口中的方法只能…

【STM32】STM32学习笔记-DMA数据转运+AD多通道(24)

00. 目录 文章目录 00. 目录01. DMA简介02. DMA相关API2.1 DMA_Init2.2 DMA_InitTypeDef2.3 DMA_Cmd2.4 DMA_SetCurrDataCounter2.5 DMA_GetFlagStatus2.6 DMA_ClearFlag 03. DMA数据单通道接线图04. DMA数据单通道示例05. DMA数据多通道接线图06. DMA数据多通道示例一07. DMA数…