4、Rocketmq之存储原理

CommitLog ~ MappedFileQueue ~ MappedFile集合
在这里插入图片描述

正常情况下,RocketMQ支持消息体字节数最多为1个G。注意该消息体并不单单是消息体body。如果生产的消息其字节数超过1个G则该消息是无法被落盘处理的。因为没有一个MapperFile文件可以承载该消息所有的字节数。

1.AllocateMappedFileService

参考文章

异步初始化CommitLog文件时优先初始化nextFilePath、nextNextFilePath两个文件。

同时创建nextFilePath【/data/rocketmq/commitlog/00000000000000000000】、nextNextFilePath【/data/rocketmq/commitlog/00000000000000000050】两个文件是如何使用的呢?

  1. 优先返回nextFilePath,并添加到MappedFileQueue集合属性mappedFiles中。此时队列requestQueue为空。requestTable集合元素为nextNextFilePath【/data/rocketmq/commitlog/00000000000000000050】。
  2. 如果消息体的长度没有达到当前MapperFile中字节缓冲区capacity的大小,则不会创建新的MapperFile文件。
  3. 如果步骤2不成立,则创建新的nextFilePath【/data/rocketmq/commitlog/00000000000000000050】、nextNextFilePath【/data/rocketmq/commitlog/00000000000000000100】对应的MapperFile文件。但是由于requestTable集合不为空即存在nextFilePath对应的MapperFile文件【/data/rocketmq/commitlog/00000000000000000050】则删除并返回当前集合元素。
  4. 此时requestTable集合元素为nextNextFilePath【/data/rocketmq/commitlog/00000000000000000100】。MappedFileQueue中集合属性mappedFiles中存在00000000000000000000、00000000000000000050两个MappedFile文件。

如果真实发送的消息字节数没有超过当前字节缓冲区剩余空间则优先当前MapperFile文件处理。否则创建新的MapperFile文件。

public class AllocateMappedFileService extends ServiceThread {
    private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
    private static int waitTimeOut = 1000 * 5;
    private ConcurrentMap<String, AllocateRequest> requestTable = new ConcurrentHashMap<>();
    private PriorityBlockingQueue<AllocateRequest> requestQueue = new PriorityBlockingQueue<>();
    private volatile boolean hasException = false;
    private DefaultMessageStore messageStore;

    public AllocateMappedFileService(DefaultMessageStore messageStore) {
        this.messageStore = messageStore;
    }
	// nextFilePath:CommitLog文件路径 /data/rocketmq/commitlog/00000000000000000000
    public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) {
        int canSubmitRequests = 2;
        ...
        AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize);
        boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null;
        ...
		boolean offerOK = this.requestQueue.offer(nextReq);
		
        AllocateRequest nextNextReq = new AllocateRequest(nextNextFilePath, fileSize);
        boolean nextNextPutOK = this.requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null;
        ...
        boolean offerOK = this.requestQueue.offer(nextNextReq);
        ...
        // 每次只是返回nextFilePath对应的MappedFile。此时 requestQueue 队列为空,requestTable集合中只是存在 nextNextFilePath 对应的MappedFile文件
        // 如果
        AllocateRequest result = this.requestTable.get(nextFilePath);
       	messageStore.getPerfCounter().startTick("WAIT_MAPFILE_TIME_MS");
       	// 阻塞等待 线程AllocateMappedFileService 初始化MapperFile文件。默认时间为5秒
        boolean waitOK = result.getCountDownLatch().await(waitTimeOut, TimeUnit.MILLISECONDS);
        messageStore.getPerfCounter().endTick("WAIT_MAPFILE_TIME_MS");
        if (!waitOK) {
            log.warn("create mmap timeout " + result.getFilePath() + " " + result.getFileSize());
            return null;
        } else {
            this.requestTable.remove(nextFilePath);
            // 返回nextFilePath对应的MappedFile文件,并添加到MappedFileQueue中集合属性mappedFiles中
            return result.getMappedFile();
        }
    }
    
	...
	
    public void run() {// 初始化 MapperFile文件 任务
        while (!this.isStopped() && this.mmapOperation()) {
        }
    }

    /**
     * Only interrupted by the external thread, will return false
     */
    private boolean mmapOperation() {
        boolean isSuccess = false;
        AllocateRequest req = null;
        try {
            req = this.requestQueue.take();//移除并返回元素,否则阻塞等待
            AllocateRequest expectedRequest = this.requestTable.get(req.getFilePath());
            ...
            if (req.getMappedFile() == null) {
                long beginTime = System.currentTimeMillis();
                //创建对应对应大小、对应磁盘地址的本地文件。并且建立磁盘 & 内核映射关系
                MappedFile mappedFile = new DefaultMappedFile(req.getFilePath(), req.getFileSize());;
                ...
                // pre write mappedFile 预热处理
                if (mappedFile.getFileSize() >= mappedFileSizeCommitLog && warmMapedFileEnable) {
                    FlushDiskType flushDiskType = this.messageStore.getMessageStoreConfig().getFlushDiskType();
                    MessageStoreConfig messageStoreConfig = this.messageStore.getMessageStoreConfig();
                    int flushLeastPagesWhenWarmMapedFile = messageStoreConfig.getFlushLeastPagesWhenWarmMapedFile();
                    mappedFile.warmMappedFile(flushDiskType,flushLeastPagesWhenWarmMapedFile);
                }

                req.setMappedFile(mappedFile);
                this.hasException = false;
                isSuccess = true;
            }
        } finally {
            if (req != null && isSuccess)
                req.getCountDownLatch().countDown();//初始化完毕释放锁
        }
        return true;
    }

    static class AllocateRequest implements Comparable<AllocateRequest> {
        // Full file path
        private String filePath;
        private int fileSize;
        private CountDownLatch countDownLatch = new CountDownLatch(1);
        private volatile MappedFile mappedFile = null;

        public AllocateRequest(String filePath, int fileSize) {
            this.filePath = filePath;
            this.fileSize = fileSize;
        }
		...
        public CountDownLatch getCountDownLatch() {
            return countDownLatch;
        }

        public void setCountDownLatch(CountDownLatch countDownLatch) {
            this.countDownLatch = countDownLatch;
        }

        public MappedFile getMappedFile() {
            return mappedFile;
        }

        public void setMappedFile(MappedFile mappedFile) {
            this.mappedFile = mappedFile;
        }
    }
}

2.DefaultMappedFile

public class DefaultMappedFile extends AbstractMappedFile {

	public DefaultMappedFile(final String fileName, final int mappedFileSizeCommitLog) throws IOException {
        init(fileName, mappedFileSizeCommitLog);
    }
	
	private void init(final String fileName, final int mappedFileSizeCommitLog) throws IOException {
        this.fileName = fileName;
        this.mappedFileSizeCommitLog = mappedFileSizeCommitLog;
        this.file = new File(fileName);
        this.fileFromOffset = Long.parseLong(this.file.getName());
        boolean ok = false;

        UtilAll.ensureDirOK(this.file.getParent());

        try {
            this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
            this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, mappedFileSizeCommitLog);
            TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(mappedFileSizeCommitLog);
            TOTAL_MAPPED_FILES.incrementAndGet();
            ok = true;
        }finally {
            if (!ok && this.fileChannel != null) {
                this.fileChannel.close();
            }
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/67611.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

智慧工地源码 智慧工地云平台源码 智慧工地APP源码

智慧工地的核心是数字化&#xff0c;它通过传感器、监控设备、智能终端等技术手段&#xff0c;实现对工地各个环节的实时数据采集和传输&#xff0c;如环境温度、湿度、噪音等数据信息&#xff0c;将数据汇集到云端进行处理和分析&#xff0c;生成各种报表、图表和预警信息&…

windows10开启远程连接

目录 开启远程连接远程连接 开启远程连接 右击电脑图标->属性 点击 远程设置 远程连接 找到 远程桌面连接 点击 远程桌面连接 输入远程ip 10.0.8.5 然后点击连接 4.输入默认用户名new的密码&#xff0c;然后确定&#xff0c;搞定。

网络加速与文件传输软件:如何通过优化网络提升文件传输速度

在信息化社会&#xff0c;文件传输是人们生活和工作中必不可少的一个环节。但是&#xff0c;由于网络环境的多样性和传输过程中可能出现的各种问题&#xff0c;文件传输速度经常受到影响。因此&#xff0c;如何优化网络以提高文件传输速度成为了一个重要的课题。本文将探讨网络…

SQL SERVER 异地备份到远程共享文件夹异常处理

SQL SERVER 异地备份到远程共享文件夹异常处理 SQL Server 异地备份到远程共享文件夹异常处理 - 灰信网&#xff08;软件开发博客聚合&#xff09; -- 允许配置高级选项 EXEC sp_configure show advanced options, 1 GO -- 重新配置 RECONFIGURE GO -- 启用xp_cmdshell EXEC sp…

正则表达式的使用

1、正则表达式-教程 正则表达式&#xff1a;文本模式&#xff0c;包括普通字符&#xff08;例如&#xff0c;a到z之间的字母&#xff09;和特殊字符&#xff08;称为元字符&#xff09;。 正则表达式使用单个字符串来描述&#xff0c;匹配一系列匹配某个句法规则的字符串。 2、…

如何卸载SOLIDWORKS软件?

本文将为您提供一份简易指南&#xff0c;介绍如何正确卸载SOLIDWORKS软件&#xff0c;并分享一些注意事项&#xff0c;确保您的卸载过程顺利进行。 SOLIDWORKS软件作为一款强大的三维设计和工程分析工具&#xff0c;为许多工程师提供了优良的创作平台。然而&#xff0c;有时候我…

Redis-简单动态字符串(SDS)

文章目录 文章概要SDS数据结构定义SDS和C字符串的区别总结参考 文章概要 本篇文章&#xff0c;我们来学习Redis字符串的编码格式SDS编码&#xff0c;文章将将从以下几个方面介绍SDS&#xff1a; SDS的底层数据结构定义Redis是C写的&#xff0c;那SDS和C中的字符串的区别是什么…

OpenMV 自适应颜色阈值

目录 演示视频 思路讲解 OprnMV代码 演示视频 备战2023电赛~openmv自适应颜色阈值&#xff08;附源代码网盘链接&#xff09; 思路讲解 1. 参考openmv官方例程讲解10-Color-Tracking->image_statistics_info图像统计信息https://book.openmv.cc/example/10-Color-Trackin…

【Linux】gcc编译器的使用和介绍

目录 一&#xff0c;GCC简介 二&#xff0c;GCC的主要组件 三&#xff0c;GCC的工作流程 四&#xff0c;GCC的一些重要特性和功能 五&#xff0c;GCC常用的编译选项 六&#xff0c;GCC的输入输出选项的具体用法 七&#xff0c;GCC的参考文档 一&#xff0c;GCC简介 GCC&…

AI语音工牌在通讯行业营业大厅场景应用

在运营商营业大厅中&#xff0c;每天都有大量的客户来访咨询、办理业务。同时也会经常产生大量的客诉纠纷和服务差评。但因为缺乏有效的管理工具&#xff0c;加上线下沟通场景的数据采集难度高&#xff0c;数字化程度低&#xff0c;管理一直处于盲区。如何有效的管控营业厅人员…

2023最新Windows编译ffmpeg详细教程,附msys2详细安装配置教程

安装MSYS2 msys2是一款跨平台编译套件&#xff0c;它模拟linux编译环境&#xff0c;支持整合mingw32和mingw64&#xff0c;能很方便的在windows上对一些开源的linux工程进行编译运行。 类似的跨平台编译套件有&#xff1a;msys&#xff0c;cygwin&#xff0c;mingw 优势&…

【密码学】六、公钥密码

公钥密码 1、概述1.1设计要求1.2单向函数和单向陷门函数 2、RSA公钥密码体制2.1加解密2.2安全性分析 3、ElGamal公钥密码体制3.1加解密算法3.2安全性分析 4、椭圆曲线4.1椭圆曲线上的运算4.2ECC 5、SM2公钥密码体制5.1参数选取5.2密钥派生函数5.3加解密过程5.3.1初始化5.3.2加密…

Maven引入本地jar包

maven做为一种强大的依赖管理工具&#xff0c;可以帮助我们更方便的管理项目中的依赖&#xff1b;而在使用过程中我们难免会有需要引入本地jar包的需求&#xff0c;这里踩过坑之后我分享俩种引入方式&#xff1b; 1.上传jar到本地maven仓库&#xff0c;再引入 使用此方法后可…

最强自动化测试框架Playwright-操作指南(3)-PO模式

playwright支持PO模式 创建页面对象 class SearchPage:def __init__(self, page):self.page pageself.search_term_input page.get_by_role("searchbox", name"输入搜索词")def navigate(self):self.page.goto("https://bing.com")def searc…

探索远程访问内网群晖NAS 6.X(使用独立域名)【内网穿透】

使用自己的域名远程访问内网群晖NAS 6.X【内网穿透】 文章目录 使用自己的域名远程访问内网群晖NAS 6.X【内网穿透】 在之前的文章中&#xff0c;我们向大家演示了如何使用cpolar&#xff0c;创建一条固定的、能够在公共互联网登录内网群晖NAS的数据隧道。这条隧道已经能够应对…

ASEMI快恢复二极管APT80DQ20BG怎么检查好坏

编辑-Z 二极管APT80DQ20BG是一种高压快恢复二极管&#xff0c;常用于电源和电能质量控制等领域。如果您的二极管出现故障或需要进行维修&#xff0c;以下是一些可能的解决方案。 首先&#xff0c;确保您已经断开了电源&#xff0c;并且具备基本的电子维修知识和技能。如果您不…

Linux:shell脚本:基础使用(3)

for循环语句 语句格式 for for变量 in 取值列表&#xff08;可以是变量或者自己定义&#xff09; do 循环内容 done 工作方式就是通过取值列表去判断循环的次数&#xff0c;每次循环的同时把列表一行的值赋予到for变量。取值方式如果是数字&#xff0c;那就通过数字去…

neo4j查询语言Cypher详解(二)--Pattern和类型

Patterns 图形模式匹配是Cypher的核心。它是一种用于通过应用声明性模式从图中导航、描述和提取数据的机制。在MATCH子句中&#xff0c;可以使用图模式定义要搜索的数据和要返回的数据。图模式匹配也可以在不使用MATCH子句的情况下在EXISTS、COUNT和COLLECT子查询中使用。 图…

【计算机网络笔记】第一章

1、计算机网络定义 计算机网络主要是由一些通用的、可编程的硬件&#xff08;包含CPU、计算机、手机、智能电器…&#xff09;互连而成的&#xff0c;而这些硬件并非专门用来实现某一特定目的&#xff08;例如&#xff0c;传送数据或视频信号&#xff09;。这些可编程的硬件能…

机器学习深度学习——池化层

&#x1f468;‍&#x1f393;作者简介&#xff1a;一位即将上大四&#xff0c;正专攻机器学习的保研er &#x1f30c;上期文章&#xff1a;机器学习&&深度学习——卷积的多输入多输出通道 &#x1f4da;订阅专栏&#xff1a;机器学习&&深度学习 希望文章对你们…
最新文章