8、Broker进一步了解

1、Broker消息分发服务以及构建ConsumeQueue和IndexFile与消息清除

前面分析如何进行刷盘,本章分析Broker的消息分发以及构建ConsumerQueue和IndexFile,两者构建是为了能够提高效率,减少消息查找时间以及减少网络带宽与存储空间。

ConsumeQueue 可以快速定位消息,IndexFile 可以快速定位 ConsumeQueue 中的位置,并随时实时更新,以保证消息的消费速度。

ConsumeQueue 和 IndexFile 采用了压缩存储和分片存储的方式,消费者可以仅仅消费需要的消息,从而节省存储空间。

ConsumeQueue 可以快速定位消息,减少消息传递所消耗的网络带宽,提高消息传递效率。

消息分发

在执行Broker的启动方法会去启动消息分发服务,简单理解就是为消息建立一种索引更加快速有效的消费查询消息。
broker处理消息分发的类是ReputMessageService,启动一个线程不断地将commitLong分到到对应的consumerQueue与indexFile,消息分发流程处理完毕;


public void run() {
    DefaultMessageStore.log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            // 休眠1s,进行分发ConsumeQueue和IndexFile
            Thread.sleep(1);
            this.doReput();
        } catch (Exception e) {
            DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }

    DefaultMessageStore.log.info(this.getServiceName() + " service end");
}

消息每隔1s进行消息的分发,尝试去构建ConsumerQueue索引与

IndexFile索引。
private void doReput() {
    if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
        log.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
            this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
        this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
    }
    // 如果分发偏移量小于commitlog的最大物理偏移量,那么循环分发
    for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {

        if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()
            && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
            break;
        }
      // 从commitLog中获取需要分发的消息
        SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
        if (result != null) {
            try {
                this.reputFromOffset = result.getStartOffset();

                for (int readSize = 0; readSize < result.getSize() && doNext; ) {
                    DispatchRequest dispatchRequest =
                        DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
                    int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();

                    if (dispatchRequest.isSuccess()) {
                        if (size > 0) {
                            // CommitLog日志分发到ConsumeQueue和IndexFile
                            DefaultMessageStore.this.doDispatch(dispatchRequest);
                            // 长轮询:如果有消息到了主节点,并且开启了长轮询。(消息拉取逻辑后面单独讲)
                            if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
                                    && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()
                                    && DefaultMessageStore.this.messageArrivingListener != null) {
                                //messageArrivingListener实例是NotifyMessageArrivingListener
                                //通知被hold住的消费者拉取消息的请求
                                DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
                                    dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
                                    dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
                                    dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
                                notifyMessageArrive4MultiQueue(dispatchRequest);
                            }

                            this.reputFromOffset += size;
                            readSize += size;
                            
                            if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
                                DefaultMessageStore.this.storeStatsService
                                    .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).add(1);
                                DefaultMessageStore.this.storeStatsService
                                    .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
                                    .add(dispatchRequest.getMsgSize());
                            }
                        } else if (size == 0) {
                             // 如果等于0,表示读取到MappedFile文件尾
                            // 获取下一个文件的起始索引
                            this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
                            readSize = result.getSize();
                        }
                    } else if (!dispatchRequest.isSuccess()) {

                        if (size > 0) {
                            log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);
                            this.reputFromOffset += size;
                        } else {
                            doNext = false;
                            // If user open the dledger pattern or the broker is master node,
                            // it will not ignore the exception and fix the reputFromOffset variable
                            if (DefaultMessageStore.this.getMessageStoreConfig().isEnableDLegerCommitLog() ||
                                DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
                                log.error("[BUG]dispatch message to consume queue error, COMMITLOG OFFSET: {}",
                                    this.reputFromOffset);
                                this.reputFromOffset += result.getSize() - readSize;
                            }
                        }
                    }
                }
            } finally {
                result.release();
            }
        } else {
            doNext = false;
        }
    }
}

通过getData()方法来获取需要进行分发的消息,根据reputFromOffset的物理偏移量找到mappedFileQueue中对应的CommitLog文件的MappedFile,然后从该MappedFile中截取一段自reputFromOffset偏移量开始的ByteBuffer。
依次进行消息的分发调用doDispatch方法,把消息写入ConsumerQueue与IndexFile中。
如果当前节点是主节点且开启长轮询,则调用messageArrivingListener.arriving方法进行消息的投递,具体长轮询的方案后期介绍。
构建ConsumerQueue与IndexFile


public void doDispatch(DispatchRequest req) {
    //dispatchList中包含CommitLogDispatcherBuildConsumeQueue和CommitLogDispatcherBuildIndex,
    // 分别用来构建ConsumeQueue文件和IndexFile文件。
    for (CommitLogDispatcher dispatcher : this.dispatcherList) {
        dispatcher.dispatch(req);
    }
}

调用dispatch进行消息的分发,其中包括三个实现类。

CommitLogDispatcherBuildConsumeQueue类:写ConsumeQueue文件,构建ConsumeQueue索引。

CommitLogDispatcherBuildIndex类:写IndexFile文件,构建IndexFile索引。

CommitLogDispatcherCalcBitMap类:构建布隆过滤器,加速SQL92过滤效率,避免每次都解析sql。

消息清理

当消息到达一定程度就会被RocketMQ丢弃,而在Broker启动会去添加定时删除文件的任务,60s之后执行第一个后面每隔10s执行一次。

// 过期文件删除
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        // 清除方法
        DefaultMessageStore.this.cleanFilesPeriodically();
    }
}

private void cleanFilesPeriodically() {
    // 删除CommitLog文件
    this.cleanCommitLogService.run();
    // 清除ConsumerQueue文件和IndexFile文件
    this.cleanConsumeQueueService.run();
}

清除的主要逻辑:首先清除CommitLog文件,然后在清除ConsumerQueue文件和IndexFile文件。

CommitLog文件的清除
// 定期删除CommitLog文件的地方
public void run() {
    try {
        // 删除文件
        this.deleteExpiredFiles();
        // 删除挂起文件
        this.redeleteHangedFile();
    } catch (Throwable e) {
        DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
    }
}

private void deleteExpiredFiles() {
    int deleteCount = 0;
    long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();
    int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();
    int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();
    //K2 Broker触发文件删除的三个条件
    boolean timeup = this.isTimeToDelete(); // deleteWhen是否达到,配置项
    boolean spacefull = this.isSpaceToDelete(); // 判断磁盘是否达到删除,diskmax配置项
    boolean manualDelete = this.manualDeleteFileSeveralTimes > 0; // 手动删除次数
    // 并未判断消息是否已经被消费,所以当长时间积压的消息,触发上面三个条件之一就会导致消息丢失未消费的情况,所以我们得避免消息积压。
    if (timeup || spacefull || manualDelete) {

        if (manualDelete)
            this.manualDeleteFileSeveralTimes--;

        boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately;

        log.info("begin to delete before {} hours file. timeup: {} spacefull: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {}",
            fileReservedTime,
            timeup,
            spacefull,
            manualDeleteFileSeveralTimes,
            cleanAtOnce);

        fileReservedTime *= 60 * 60 * 1000;

        deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval,
            destroyMapedFileIntervalForcibly, cleanAtOnce);
        if (deleteCount > 0) {
        } else if (spacefull) {
            log.warn("disk space will be full soon, but delete file failed.");
        }
    }
}

首先删除文件需要满足的要求是其一deleteWhen是否达到默认是04(可配置),其二判断磁盘是否达到删除默认75(可配置),其三手动删除。

触发过期⽂件删除时,有两个检查的纬度,⼀个是,是否到了触发删除的时间,就是broker.conf⾥配置的deleteWhen属性。另外还会检查磁盘利⽤率,达到阈值也会触发过期⽂件删除。这个阈值默认是75%,可以在broker.conf⽂件当中定制。但是最⼤值为95,最⼩值为10。

注意:观察源码可得 并未判断消息是否已经被消费,所以当长时间积压的消息,触发上面三个条件之一就会导致消息丢失未消费的情况,所以我们得避免消息积压。

如果删除的时候我们的文件正在被使用这就导致删除不会生效此时就需要redeleteHangedFile方法区删除挂起文件。

ConsumerQueueIndexFile清理
public void run() {
    try {
        // 删除过期的file
        this.deleteExpiredFiles();
    } catch (Throwable e) {
        DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
    }
}

private void deleteExpiredFiles() {
    // 间隔100
    int deleteLogicsFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteConsumeQueueFilesInterval();

    long minOffset = DefaultMessageStore.this.commitLog.getMinOffset();
    if (minOffset > this.lastPhysicalMinOffset) {
        this.lastPhysicalMinOffset = minOffset;

        ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable;

        for (ConcurrentMap<Integer, ConsumeQueue> maps : tables.values()) {
            for (ConsumeQueue logic : maps.values()) {
                // 进行consumerQueue删除
                int deleteCount = logic.deleteExpiredFile(minOffset);

                if (deleteCount > 0 && deleteLogicsFilesInterval > 0) {
                    try {
                        Thread.sleep(deleteLogicsFilesInterval);
                    } catch (InterruptedException ignored) {
                    }
                }
            }
        }
    // 进行IndexFile删除
        DefaultMessageStore.this.indexService.deleteExpiredFile(minOffset);
    }
}

在删除ConsumeQueue和IndexFile⽂件时,会去检查CommitLog当前的最⼩Offset,然后在删除时进⾏对⻬。

2、Broker端整体流程以及文件索引结构

至此我们的Producer发送消息以及Broker如何处理存储消息以及介绍完了,首先我们需要去整理一下整个流程,然后后面再来分析Consumer端的消费,以及各种消息类型的案例源码等。

发送端流程总结

生产者发送消息写入内存推送给Broker进行刷盘入CommitLog文件,同时处理分发将消息索引存放到ConsumerQueue与IndexFile文件当中。

如果是主从架构就会去进行主从之间的同步。

接下来当消息达到一定条件就会触发删除机制,程序注册了一个定时任务首次60s执行后面每隔10s去执行一次清理工作。注意:删除文件并没有判断是否被消费过了只要达到要求就会删除继而导致消息的丢失,所以我们得去避免消息的积压。
在这里插入图片描述
索引结构

CommitLog⽂件的⼤⼩是固定的,但是其中每个存储的每个消息单元长度是不固定的,在源码CommitLog类的calMsgLength方法有计算消息长度的方法。

protected static int calMsgLength(int sysFlag, int bodyLength, int topicLength, int propertiesLength) {
    int bornhostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 8 : 20;
    int storehostAddressLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 8 : 20;
    final int msgLen = 4 //TOTALSIZE
        + 4 //MAGICCODE
        + 4 //BODYCRC
        + 4 //QUEUEID
        + 4 //FLAG
        + 8 //QUEUEOFFSET
        + 8 //PHYSICALOFFSET
        + 4 //SYSFLAG
        + 8 //BORNTIMESTAMP
        + bornhostLength //BORNHOST
        + 8 //STORETIMESTAMP
        + storehostAddressLength //STOREHOSTADDRESS
        + 4 //RECONSUMETIMES
        + 8 //Prepared Transaction Offset
        + 4 + (bodyLength > 0 ? bodyLength : 0) //BODY
        + 1 + topicLength //TOPIC
        + 2 + (propertiesLength > 0 ? propertiesLength : 0) //propertiesLength
        + 0;
    return msgLen;
}

正因为消息的记录⼤⼩不固定,所以RocketMQ在每次存CommitLog⽂件时,都会去检查当前CommitLog⽂件空间是否⾜够,如果不够的话,就重新创建⼀个CommitLog⽂件,⽂件名为当前消息的偏移量。

ConsumeQueue⽂件主要是加速消费者的消息索引。他的每个⽂件夹对应RocketMQ中的⼀个MessageQueue,⽂件夹下的⽂件记录了每个MessageQueue中的消息在CommitLog⽂件当中的偏移量。这样,消费者通过ComsumeQueue⽂件,就可以快速找到CommitLog⽂件中感兴趣的消息记录。⽽消费者在ConsumeQueue⽂件当中的消费进度会保存在config/consumerOffset.json⽂件当中。

ConsumerQueue=30w固定大小20byte的数据块组成(8字节msgPhyOffset表示起始位置+4字节msgSize文件占用长度+8字节magTagCode消息的tag的Hash值)。

IndexFile⽂件主要是辅助消息检索。他的作⽤主要是⽤来⽀持根据key和timestamp检索消息。它的⽂件名⽐较特殊不以消息偏移量命名⽽是⽤的时间命名,但是其实它也是⼀个固定⼤⼩的⽂件。

IndexFile=固定40字节indexHeader+固定20字节的个数500w的slot+固定20字节最多个数500W*4的index。

indexHeader表示文件头,slots表示一系列的槽位,index表示真正的索引,槽位里面存放index,槽位紧挨着indexHeader而每个槽位里面最多存放4个index。

在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/223335.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

逆向爬虫进阶实战:突破反爬虫机制,实现数据抓取

文章目录 一、引言二、逆向爬虫进阶技巧三、逆向爬虫进阶实战代码片段四、总结与展望好书推荐内容简介作者简介前言节选 一、引言 随着网络技术的发展&#xff0c;网站为了保护自己的数据和资源&#xff0c;纷纷采用了各种反爬虫机制。然而&#xff0c;逆向爬虫技术的出现&…

农业装备行业分析:中国市场规模增长到4500多亿元

农业装备是指用于农业生产过程的先进农业机械、设备和设施。主要包括&#xff1a;农业田间作业机械、设施农业装备、农产品加工装备、农业生物质利用装备、农田设施与装备、农业信息化装备等。 农业装备服务于大农业&#xff0c;包括种植业、养殖业、加工业、服务业等&#xff…

详解python 面向对象三大特征

文章目录 一、面向对象三大特征介绍1、封装&#xff08;隐藏&#xff09;2、继承3、多态 二、继承1、语法格式2、类成员的继承和重写3、super()获得父类定义4、设计模式\_工厂模式实现 5、设计模式\_单例模式实现关于Python技术储备一、Python所有方向的学习路线二、Python基础…

Jupyter Notebook工具

Jupyter Notebook 是一个交互式的笔记本环境&#xff0c;允许用户以网页形式编写和分享代码、文本、图像以及其它多媒体内容。它支持超过 40 种编程语言&#xff0c;最常用的是 Python。 以下是 Jupyter Notebook 工具的一些特点和用法&#xff1a; 1. 特点&#xff1a; 交互式…

HTML程序大全(2):通用注册模版

一、正常情况效果 二、某项没有填写的效果 三、没有勾选同意项的效果 四、代码 <!DOCTYPE html> <html> <head><meta charset"UTF-8"><title>注册</title><style>body {font-family: Arial, sans-serif;background-color…

STM32基础教程 p16 窗口看门狗(WWDG)

1 窗口看门狗工作原理 1.1 简介 WWDG简介 窗口看门狗通常被用来监测&#xff0c;由外部干扰或不可预见的逻辑条件造成的应用程序背离正常的运 行序列而产生的软件故障。除非递减计数器的值在T6位变成0前被刷新&#xff0c;看门狗电路在达到预置 的时间周期时&#xff0c;会产…

Win系统缺少xinput1_3.dll有什么处理办法?看看这几个快速解决方法

一、xinput1_3.dll文件介绍&#xff1a; 文件类型&#xff1a;xinput1_3.dll是一种动态链接库&#xff08;DLL&#xff09;文件&#xff0c;用于提供系统功能。 文件大小&#xff1a;xinput1_3.dll文件大小通常在几KB到几十KB之间&#xff0c;具体取决于操作系统版本和编译时…

全息图着色器插件:Hologram Shaders Pro for URP, HDRP Built-in

8个新的Unity全息图着色器,具有故障效果,扫描线,网格线,和更多其他效果!与所有渲染管线兼容。 软件包添加了一系列的全息图着色器到Unity。从基本的全息图与菲涅耳亮点,先进的全息图与两种故障效应,扫描线,文体点阵和网格线全息图! 特色全息效果 Basic-支持菲涅耳发光照…

AWS CodeWhisperer:基于机器学习的代码建议工具

#AWS CodeWhisperer&#xff1a;基于机器学习的代码建议工具 AWS CodeWhisper概述 Amazon CodeWhisperer 是一种基于机器学习&#xff08;ML&#xff09;的服务&#xff0c;它可以根据Amazon CodeWhisperer 是一种基于机器学习&#xff08;ML&#xff09;的服务&#xff0c;它…

单片机开发常用的软件构架

对于单片机程序来说&#xff0c;大家都不陌生&#xff0c;但是真正使用架构&#xff0c;考虑架构的恐怕并不多&#xff0c;随着程序开发的不断增多&#xff0c;架构是非常必要的。 一、时间片轮询法 介于前后台顺序执行法和操作系统之间的一种程序架构设计方案。该设计方案需能…

软件测试外包干了2个月,技术进步2年。。。

先说一下自己的情况&#xff0c;本科生&#xff0c;18年通过校招进入北京某软件公司&#xff0c;干了接近2年的功能测试&#xff0c;今年国庆&#xff0c;感觉自己不能够在这样下去了&#xff0c;长时间呆在一个舒适的环境会让一个人堕落!而我已经在一个企业干了2年的功能测试&…

学习UnitTest框架,轻松打造无懈可击的代码!

一、什么是UnitTest&#xff1f; 1、介绍 unittest是Python自带的一个单元测试框架&#xff0c;它可以做单元测试&#xff0c;也能用于编写和运行重复的测试工作。 它给自动化测试用例开发和执行提供了丰富的断言方法&#xff0c;判断测试用例是否通过&#xff0c;并最终生成…

C++基础 -46- 类的静态函数成员

类的静态函数成员可以不创建类直接访问 #include "iostream"using namespace std;class base {public:static void show(){cout << "hello world" << endl;} };int main() {base::show(); }类的静态函数成员不能访问非静态成员 class base…

一维卡尔曼滤波(C++)

文章目录 头文件Cppexample 头文件 /*Copyright(C) 2019- White Noise TeamOne dimensional Kalman Filter implementation foruse with BNO055 IMU to filter noisy gyro andaccelerometer measurements.This software is distributed under the MIT LicenseThis file includ…

Elasticsearch:向量搜索的优势 — 以及 IT 领导者需要它来改善搜索体验的 5 个原因

作者&#xff1a;Evan Castle 与谷歌和亚马逊等高质量搜索引擎的频繁互动提高了客户对快速且相关搜索的期望。 向量搜索&#xff08;也称为语义向量搜索&#xff09;利用深度学习和机器学习来捕获数据的含义和上下文。 向量搜索的好处 向量搜索可以增强公司的搜索体验并带来广…

RobotFramework编写用例,在Jenkins上如何实现用例的并发运行?

我们了解RobotFramework编写自动化测试用例的方法&#xff0c;了解如何将用例在Jenkins上运行。 但是&#xff0c;随着用例的增多&#xff0c;传统的pybot/robot命令运行测试用例会耗费大量的时间&#xff0c;这就慢慢成为了一个苦恼的问题。 那么&#xff0c;在Jenkins上如何…

基于深度学习CRNN的水表读数识别系统

1.研究背景与意义 项目参考AAAI Association for the Advancement of Artificial Intelligence 研究背景与意义 随着科技的不断发展&#xff0c;深度学习技术在各个领域都取得了显著的成果。其中&#xff0c;基于深度学习的图像识别技术在计算机视觉领域具有重要的应用价值。…

249:vue+openlayers 坐标转地址,点击后在弹窗显示

第249个 点击查看专栏目录 本示例是演示如何在vue+openlayers项目中点击某点,转化经纬度坐标为地址信息,弹窗显示。 通过点击地图,获取到经纬度坐标,然后通过调取mapbox的地址转换API,将经纬度坐标转化为地址信息,通过overlay的方式,在弹窗中展示出来。 直接复制下面的…

11.机器人系统仿真搭建gazebo环境、仿真深度相机、雷达、RGB相机

目录 1 gazebo仿真环境搭建 1.1 直接添加内置组件创建仿真环境 1.2 urdf、gazebo、rviz的综合应用 2 ROS_control 2.1 运动控制实现流程(Gazebo) 2.1.1 已经创建完毕的机器人模型&#xff0c;编写一个单独的 xacro 文件&#xff0c;为机器人模型添加传动装置以及控制器 …

阿里大佬讲解的接口自动化测试框架pytest系列——pluggy插件源码解读:hook钩子函数调用执行过程分析

经过pluggy源码解读系列1-4的分析&#xff0c;已经完成插件定义、spec定义&#xff0c;插件注册等环节&#xff0c;下面就到了调用插件执行了&#xff0c;即hook钩子函数是如何被调用执行的&#xff0c;下面还是先把pluggy使用的代码放下面&#xff1a; import pluggy# Hooksp…
最新文章