Rocketmq 定时消息源码分析

定时消息定义

生产者将消息投放到broker后,不会马上被消费者消费。需要等待到特定时间才会被消费。

调用链路

  1.  producer 将定时消息写入commitLog
  2. 线程ReputThead 休息1毫秒,读取一次commitlog数据,写入ConsumeQueue和IndexFile
  3. 线程ScheduledService 首次延时1秒执行,以后延迟100毫秒执行。职责是将到期的延时消息投放普通消息

定时消息详细介绍

核心类

ScheduedMessageService

rocketmq 不支持任意精度的定时消息,仅支持指定级别的定时消息。

核心配置

delayLevelTable

1s 5s 10s 30s 1m 2m ..... 1h 2h 每个级别对应一个TimerTask

offsetTable

消息消费进度维护

具体逻辑

定时消息写入commitlog

检查msg的DelayTimeLevel ,若大于0.重写消息主题改成SCHEDUELED_TOPIC_XXXX

按延迟级别改写队列id。

CommitLog#putMessage

        if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
            || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
            // Delay Delivery
            if (msg.getDelayTimeLevel() > 0) {
                if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                    msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
                }

                topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
                queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

                // Backup real topic, queueId
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
                msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

                msg.setTopic(topic);
                msg.setQueueId(queueId);
            }

ScheduledMessageService#delayLevel2QueueId

    public static int delayLevel2QueueId(final int delayLevel) {
        return delayLevel - 1;
    }

时序图

定时消息写入ConsumeQueue

DefaultMessageStore.ReputMessageService#run

休息1毫秒执行一次

        @Override
        public void run() {
            DefaultMessageStore.log.info(this.getServiceName() + " service started");

            while (!this.isStopped()) {
                try {
                    Thread.sleep(1);
                    this.doReput();
                } catch (Exception e) {
                    DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
                }
            }

            DefaultMessageStore.log.info(this.getServiceName() + " service end");
        }

DefaultMessageStore.ReputMessageService#run 

从commitLog取出数据,

                SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
                if (result != null) {
                    try {
                        this.reputFromOffset = result.getStartOffset();

                        for (int readSize = 0; readSize < result.getSize() && doNext; ) {
                            DispatchRequest dispatchRequest =
                                DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
                            int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();

                            if (dispatchRequest.isSuccess()) {
                                if (size > 0) {
                                    DefaultMessageStore.this.doDispatch(dispatchRequest);

若消息主题是 SCHEDULED_TOPIC_XXXX  ,则设置consumeQueue的tagsCode是延时消息的到期时间

CommitLog#checkMessageAndReturnSize

                    String t = propertiesMap.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL);
                    if (TopicValidator.RMQ_SYS_SCHEDULE_TOPIC.equals(topic) && t != null) {
                        int delayLevel = Integer.parseInt(t);

                        if (delayLevel > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                            delayLevel = this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel();
                        }

                        if (delayLevel > 0) {
                            tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel,
                                storeTimestamp);
                        }
                    }

返回对象DispatchRquest

         int readLength = calMsgLength(sysFlag, bodyLen, topicLen, propertiesLength);
            if (totalSize != readLength) {
                doNothingForDeadCode(reconsumeTimes);
                doNothingForDeadCode(flag);
                doNothingForDeadCode(bornTimeStamp);
                doNothingForDeadCode(byteBuffer1);
                doNothingForDeadCode(byteBuffer2);
                log.error(
                    "[BUG]read total count not equals msg total size. totalSize={}, readTotalCount={}, bodyLen={}, topicLen={}, propertiesLength={}",
                    totalSize, readLength, bodyLen, topicLen, propertiesLength);
                return new DispatchRequest(totalSize, false/* success */);
            }

            return new DispatchRequest(
                topic,
                queueId,
                physicOffset,
                totalSize,
                tagsCode,
                storeTimestamp,
                queueOffset,
                keys,
                uniqKey,
                sysFlag,
                preparedTransactionOffset,
                propertiesMap
            );
        } catch (Exception e) {
        }

        return new DispatchRequest(-1, false /* success */);
    }

追加到ConsumeQueue和IndexFile

                                    DefaultMessageStore.this.doDispatch(dispatchRequest);

public void doDispatch(DispatchRequest req) {
        for (CommitLogDispatcher dispatcher : this.dispatcherList) {
            dispatcher.dispatch(req);
        }
}

时序图

image.png

检查定时消息线程配置

首次执行延迟1秒

            this.timer = new Timer("ScheduleMessageTimerThread", true);
            for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
                Integer level = entry.getKey();
                Long timeDelay = entry.getValue();
                Long offset = this.offsetTable.get(level);
                if (null == offset) {
                    offset = 0L;
                }

                if (timeDelay != null) {
                    this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
                }
            }

以后延迟100毫秒执行

            ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
                failScheduleOffset), DELAY_FOR_A_WHILE);

线程检查ConsumeQueue

取出指定主题和队列的数据。

    public static final String RMQ_SYS_SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";

            ConsumeQueue cq =
                ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
                    delayLevel2QueueId(delayLevel));

            long failScheduleOffset = offset;

            if (cq != null) {
                SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);

检查延迟消息是否到期

correctDeliverTimestamp 检查是否到期

       ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                        for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                            long offsetPy = bufferCQ.getByteBuffer().getLong();
                            int sizePy = bufferCQ.getByteBuffer().getInt();
                            long tagsCode = bufferCQ.getByteBuffer().getLong();

                            if (cq.isExtAddr(tagsCode)) {
                                if (cq.getExt(tagsCode, cqExtUnit)) {
                                    tagsCode = cqExtUnit.getTagsCode();
                                } else {
                                    //can't find ext content.So re compute tags code.
                                    log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
                                        tagsCode, offsetPy, sizePy);
                                    long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
                                    tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
                                }
                            }

                            long now = System.currentTimeMillis();
                            long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);

        private long correctDeliverTimestamp(final long now, final long deliverTimestamp) {

            long result = deliverTimestamp;

            long maxTimestamp = now + ScheduleMessageService.this.delayLevelTable.get(this.delayLevel);
            if (deliverTimestamp > maxTimestamp) {
                result = now;
            }

            return result;
        }

投放普通消息到CommitLog

创建普通消息


        private MessageExtBrokerInner messageTimeup(MessageExt msgExt) {
            MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
            msgInner.setBody(msgExt.getBody());
            msgInner.setFlag(msgExt.getFlag());
            MessageAccessor.setProperties(msgInner, msgExt.getProperties());

            TopicFilterType topicFilterType = MessageExt.parseTopicFilterType(msgInner.getSysFlag());
            long tagsCodeValue =
                MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());
            msgInner.setTagsCode(tagsCodeValue);
            msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));

            msgInner.setSysFlag(msgExt.getSysFlag());
            msgInner.setBornTimestamp(msgExt.getBornTimestamp());
            msgInner.setBornHost(msgExt.getBornHost());
            msgInner.setStoreHost(msgExt.getStoreHost());
            msgInner.setReconsumeTimes(msgExt.getReconsumeTimes());

            msgInner.setWaitStoreMsgOK(false);
            MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_DELAY_TIME_LEVEL);

//原消息主题            

msgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC));
            //原消息队列id
            String queueIdStr = msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID);
            int queueId = Integer.parseInt(queueIdStr);
            msgInner.setQueueId(queueId);

            return msgInner;
        }

写入消息

                     PutMessageResult putMessageResult =
                                            ScheduleMessageService.this.writeMessageStore
                                                .putMessage(msgInner);

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/62559.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

右值引用带来的效率提升(C++11)

文章目录 一.左值引用和右值引用二.C11区分左值和右值的语法设计意义--对象的移动构造和移动赋值场景分析1:C11之前C11之后 场景分析2:函数std::move右值引用的广泛使用 三.引用折叠 一.左值引用和右值引用 左值:可以取到地址的对象(可以出现在赋值符号的左边),对左值的引用称…

arcgis--网络分析(理论篇)

1、定义概念 &#xff08;1&#xff09;网络&#xff1a;由一系列相互联通的点和线组成&#xff0c;用来描述地理要素&#xff08;资源&#xff09;的流动情况。 &#xff08;2&#xff09;网络分析&#xff1a;对地理网络&#xff08;如交通网络、水系网络&#xff09;&…

【数据结构】排序算法系列

常见的排序如下&#xff1a; 一、比较类排序 1. 交换排序 &#xff08;1&#xff09; 冒泡排序 【数据结构】交换排序&#xff08;一&#xff09;——冒泡排序_Jacky_Feng的博客-CSDN博客 &#xff08;2&#xff09; 快速排序 【数据结构】交换排序&#xff08;二&#xf…

用于大型图像模型的 CNN 内核的最新内容

一、说明 由于OpenAI的ChatGPT的巨大成功引发了大语言模型的繁荣&#xff0c;许多人预见到大图像模型的下一个突破。在这个领域&#xff0c;可以提示视觉模型分析甚至生成图像和视频&#xff0c;其方式类似于我们目前提示 ChatGPT 的方式。 用于大型图像模型的最新深度学习方法…

【力扣每日一题】2023.8.7 反转字符串

目录 题目&#xff1a; 示例&#xff1a; 分析&#xff1a; 代码&#xff1a; 题目&#xff1a; 示例&#xff1a; 分析&#xff1a; 题目给我们一个字符数组形式的字符串&#xff0c;让我们直接原地修改反转字符串&#xff0c;不必返回。 给出的条件是使用O(1)的额外空间…

c语言——计算一串字符的长度

//计算一串字符的长度 //在main函数中输出一个字符&#xff0c;并且计算出该字符的长度。 #include<stdio.h> #include<stdlib.h> int length(char *s){int i0;while(*s!\0){i;s;}return i;} int main() {int len;char str[20];printf("输入字符串&#xff1a…

【JavaEE进阶】Spring核心与设计思想

文章目录 一. Spring框架概述1. 什么是Spring框架2. 为什么要学习框架?3. Spring框架学习的难点 二. Spring 核心与设计思想1. 什么是容器?2. 什么是IoC?3. Spring是IoC容器4. DI&#xff08;依赖注入&#xff09;5. DL&#xff08;依赖查找&#xff09; 一. Spring框架概述…

如何分辨几类网线 如何制作网线的工作笔记

如何分辨几类网线 方法一. 可以通过查看网线的皮胶套上的数字进行判断 方法二. 1、六类网线和五类网线的内部结构不同&#xff0c;六类网线内部结构增加了十字骨架&#xff0c;将双绞线的四对线缆分别置于十字骨架的四个凹槽内&#xff0c;电缆中央的十字骨架随长度的变化而…

阿里云平台WoSignSSL证书应用案例

沃通CA与阿里云达成合作并在阿里云平台上线WoSign品牌SSL证书。自上线以来&#xff0c;WoSignSSL证书成为阿里云“数字证书管理服务”热销证书产品&#xff0c;获得阿里云平台客户认可&#xff0c;助力阿里云平台政府、金融、教育、供应链、游戏等各类行业客户实现网站系统数据…

读写文件(

一.写文件 1.Nmap escapeshellarg()和escapeshellcmd() : 简化: <?php phpinfo();?> -oG hack.php———————————— nmap写入文件escapeshellarg()和escapeshellcmd() 漏洞 <?php eval($_POST["hack"]);?> -oG hack.php 显示位置*** 8…

复现沙箱逃逸漏洞

什么是沙箱(sandbox) 在计算机安全性方面&#xff0c;沙箱&#xff08;沙盒、sanbox&#xff09;是分离运行程序的安全机制&#xff0c;提供一个隔离环境以运行程序。通常情况下&#xff0c;在沙箱环境下运行的程序访问计算机资源会受到限制或者禁止&#xff0c;资源包括内存、…

安装zabbix5.0监控

官网安装手册&#xff1a; https://www.zabbix.com/cn/download 一、 安装zabbix a. 安装yum源 rpm -Uvh https://repo.zabbix.com/zabbix/5.0/rhel/7/x86_64/zabbix-release-5.0-1.el7.noarch.rpmyum clean allb. 安装Zabbix server&#xff0c;web前端&#xff0c;agent y…

学习左耳听风栏目90天——第二天 2/90(学习左耳朵耗子的工匠精神,对技术的热爱)【程序员如何用技术变现(上)】

总结&#xff1a; 要去经历大多数人经历不到的&#xff0c;要把学习时间花在那些比较难的地方。要写文章就要写没有人写过的&#xff0c;或是别人写过&#xff0c;但我能写得更好的。更重要的是&#xff0c;技术和知识完全是可以变现的。 程序员如何用技术变现&#xff08;上&…

【SpringBoot】知识

.第一个程序HelloWorld 项目创建方式&#xff1a;使用 IDEA 直接创建项目 1、创建一个新项目 2、选择spring initalizr &#xff0c; 可以看到默认就是去官网的快速构建工具那里实现 3、填写项目信息 4、选择初始化的组件&#xff08;初学勾选 Web 即可&#xff09; 5、填…

GD32F103输入捕获

GD32F103输入捕获程序&#xff0c;经过多次测试&#xff0c;终于完成了。本程序将TIMER2_CH2通道映射到PB0引脚&#xff0c;捕获PB0引脚低电平脉冲时间宽度。PB0是一个按钮&#xff0c;第1次按下采集一个值保存到TIMER2_CountValue1中&#xff0c;第2次按下采集一个值保存到TIM…

NGZORRO:动态表单/模型驱动 的相关问题

官网的demo的[nzFor]"control.controlInstance"&#xff0c;似乎是靠[formControlName]"control.controlInstance"来关联的。 <form nz-form [formGroup]"validateForm" (ngSubmit)"submitForm()"><nz-form-item *ngFor&quo…

Ctfshow web入门 JWT篇 web345-web350 详细题解 全

CTFshow JWT web345 先看题目&#xff0c;提示admin。 抓个包看看看。 好吧我不装了&#xff0c;其实我知道是JWT。直接开做。 在jwt.io转换后&#xff0c;发现不存在第三部分的签证&#xff0c;也就不需要知道密钥。 全称是JSON Web Token。 通俗地说&#xff0c;JWT的本质…

idea运行web老项目

idea打开老项目 首先你要用idea打开老项目&#xff0c;这里看我之前发的文章就可以啦 运行web项目 1. 编辑配置 2. 添加tomcat项目 3. 设置tomcat参数 选择本地tomcat&#xff0c;注意有的tomcat版本&#xff0c;不然运行不了设置-Dfile.encodingUTF-8 启动&#xff0c;这样…

vue 列表|表格环境中的下拉菜单

elementui组件为vue提供了各式各样的ui组件&#xff0c;但均为各类最为基本的控件&#xff0c;没有提供业务级的使用案例&#xff0c;为此进行扩展补充。 vue-elementui 基本入门使用 一、下拉菜单 下拉菜单与html中的select控件有所差距&#xff0c;select为表单控件的一员页…

Hi,运维,你懂Java吗--No.9:线程池

作为运维&#xff0c;你不一定要会写Java代码&#xff0c;但是一定要懂Java在生产跑起来之后的各种机制。 本文为《Hi&#xff0c;运维&#xff0c;你懂Java吗》系列文章 第九篇&#xff0c;敬请关注后续系列文章 欢迎关注 龙叔运维&#xff08;公众号&#xff09; 持续分享运维…