RocketMQ-RocketMQ高性能核心原理与源码剖析(下)

融汇贯通阶段

​ 开始梳理一些比较完整,比较复杂的完整业务线。

8、消息持久化设计

1、RocketMQ的持久化文件结构

​ 消息持久化也就是将内存中的消息写入到本地磁盘的过程。而磁盘IO操作通常是一个很耗性能,很慢的操作,所以,对消息持久化机制的设计,是一个MQ产品提升性能的关键,甚至可以说是最为重要的核心也不为过。这部分我们就先来梳理RocketMQ是如何在本地磁盘中保存消息的。

​ 在进入源码之前,我们首先需要看一下RocketMQ在磁盘上存了哪些文件。RocketMQ消息直接采用磁盘文件保存消息,默认路径在${user_home}/store目录。这些存储目录可以在broker.conf中自行指定。

image.png

  • 存储文件主要分为三个部分

    • CommitLog:存储消息的元数据。所有消息都会顺序存入到CommitLog文件当中。CommitLog由多个文件组成,每个文件固定大小1G。以第一条消息的偏移量为文件名。
    • ConsumerQueue:存储消息在CommitLog的索引。一个MessageQueue一个文件,记录当前MessageQueue被哪些消费者组消费到了哪一条CommitLog。
    • IndexFile:为了消息查询提供了一种通过key或时间区间来查询消息的方法,这种通过IndexFile来查找消息的方法不影响发送与消费消息的主流程

    ​ 另外,还有几个辅助的存储文件,主要记录一些描述消息的元数据:

    • checkpoint:数据存盘检查点。里面主要记录commitlog文件、ConsumeQueue文件以及IndexFile文件最后一次刷盘的时间戳。
    • config/*.json:这些文件是将RocketMQ的一些关键配置信息进行存盘保存。例如Topic配置、消费者组配置、消费者组消息偏移量Offset 等等一些信息。
    • abort:这个文件是RocketMQ用来判断程序是否正常关闭的一个标识文件。正常情况下,会在启动时创建,而关闭服务时删除。但是如果遇到一些服务器宕机,或者kill -9这样一些非正常关闭服务的情况,这个abort文件就不会删除,因此RocketMQ就可以判断上一次服务是非正常关闭的,后续就会做一些数据恢复的操作。

​ 整体的消息存储结构,官方做了个图进行描述:

image.png

​ 简单来说,Producer发过来的所有消息,不管是属于那个Topic,Broker都统一存在CommitLog文件当中,然后分别构建ConsumeQueue文件和IndexFile两个索引文件,用来辅助消费者进行消息检索。这种设计最直接的好处是可以较少查找目标文件的时间,让消息以最快的速度落盘。对比Kafka存文件时,需要寻找消息所属的Partition文件,再完成写入。当Topic比较多时,这样的Partition寻址就会浪费非常多的时间。所以Kafka不太适合多Topic的场景。而RocketMQ的这种快速落盘的方式,在多Topic的场景下,优势就比较明显了。

​ 然后在文件形式上:

​ CommitLog文件的大小是固定的。文件名就是当前CommitLog文件当中存储的第一条消息的Offset。

​ ConsumeQueue文件主要是加速消费者进行消息索引。每个文件夹对应RocketMQ中的一个MessageQueue,文件夹下的文件记录了每个MessageQueue中的消息在CommitLog文件当中的偏移量。这样,消费者通过ConsumeQueue文件,就可以快速找到CommitLog文件中感兴趣的消息记录。而消费者在ConsumeQueue文件中的消费进度,会保存在config/consumerOffset.json文件当中。

​ IndexFile文件主要是辅助消费者进行消息索引。消费者进行消息消费时,通过ConsumeQueue文件就足够完成消息检索了,但是如果消费者指定时间戳进行消费,或者要按照MeessageId或者MessageKey来检索文件,比如RocketMQ管理控制台的消息轨迹功能,ConsumeQueue文件就不够用了。IndexFile文件就是用来辅助这类消息检索的。他的文件名比较特殊,不是以消息偏移量命名,而是用的时间命名。但是其实,他也是一个固定大小的文件。

​ 这是对RocketMQ存盘文件最基础的了解,但是只有这样的设计,是不足以支撑RocketMQ的三高性能的。RocketMQ如何保证ConsumeQueue、IndexFile两个索引文件与CommitLog中的消息对齐?如何保证消息断电不丢失?如何保证文件高效的写入磁盘?等等。如果你想要去抓住RocketMQ这些三高问题的核心设计,那么还是需要到源码当中去深究。

​ 以下几个部分非常重要,所以有必要单独拉出章节来详细讲解。

2、commitLog写入

​ 消息存储的入口在: DefaultMessageStore.asyncPutMessage方法

怎么找到这个方法的?这个大家可以自行往上溯源。其实还是可以追溯到Broker处理Producer发送消息的请求的SendMessageProcessor中。

​ CommitLog的asyncPutMessage方法中会给写入线程加锁,保证一次只会允许一个线程写入。写入消息的过程是串行的,一次只会允许一个线程写入。

​ 最终进入CommitLog中的DefaultAppendMessageCallback#doAppend方法,这里就是Broker写入消息的实际入口。这个方法最终会把消息追加到MappedFile映射的一块内存里,并没有直接写入磁盘。而是在随后调用ComitLog#submitFlushRequest方法,提交刷盘申请。刷盘完成之后,内存中的文件才真正写入到磁盘当中。

​ 在提交刷盘申请之后,就会立即调用CommitLog#submitReplicaRequest方法,发起主从同步申请。

3、文件同步刷盘与异步刷盘

​ 入口:CommitLog.submitFlushRequest

​ 这里涉及到了对于同步刷盘与异步刷盘的不同处理机制。这里有很多极致提高性能的设计,对于我们理解和设计高并发应用场景有非常大的借鉴意义。

​ 同步刷盘和异步刷盘是通过不同的FlushCommitLogService的子服务实现的。

  //org.apache.rocketmq.store.CommitLog的构造方法
  if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
            this.flushCommitLogService = new GroupCommitService();
        } else {
            this.flushCommitLogService = new FlushRealTimeService();
        }

        this.commitLogService = new CommitRealTimeService();

​ 同步刷盘采用的是GroupCommitService子线程。虽然是叫做同步刷盘,但是从源码中能看到,他实际上并不是来一条消息就刷一次盘。而是这个子线程每10毫秒执行一次doCommit方法,扫描文件的缓存。只要缓存当中有消息,就执行一次Flush操作。

​ 而异步刷盘采用的是FlushRealTimeService子线程。这个子线程最终也是执行Flush操作,只不过他的执行时机会根据配置进行灵活调整。所以可以看到,这里异步刷盘和同步刷盘的最本质区别,实际上是进行Flush操作的频率不同。

我们经常说使用RocketMQ的同步刷盘,可以保证Broker断电时,消息不会丢失。但是可以看到,RocketMQ并不可能真正来一条消息就进行一次刷盘,这样在海量数据下,操作系统是承受不了的。而只要不是来一次消息刷一次盘,那么在Broker直接断电的情况接下,就总是会有内存中的消息没有刷入磁盘的情况,这就会造成消息丢失。所以,对于消息安全性的设计,其实是重在取舍,无法做到绝对。

​ 同步刷盘异步刷盘最终落地到FileChannel的force方法。这个force方法就会最终调用一次操作系统的fsync系统调用,完成文件写入。关于force操作的详细演示,可以参考后面的零拷贝部分。

//org.apache.rocketmq.store
 public int flush(final int flushLeastPages) {
        if (this.isAbleToFlush(flushLeastPages)) {
            if (this.hold()) {
                int value = getReadPosition();

                try {
                    //We only append data to fileChannel or mappedByteBuffer, never both.
                    if (writeBuffer != null || this.fileChannel.position() != 0) {
                        this.fileChannel.force(false);
                    } else {
                        this.mappedByteBuffer.force();
                    }
                } catch (Throwable e) {
                    log.error("Error occurred when force data to disk.", e);
                }

                this.flushedPosition.set(value);
                this.release();
            } else {
                log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
                this.flushedPosition.set(getReadPosition());
            }
        }
        return this.getFlushedPosition();
    }

​ 而另外一个CommitRealTimeService这个子线程则是用来写入堆外内存的。应用可以通过配置TransientStorePoolEnable参数开启对外内存,如果开启了堆外内存,会在启动时申请一个跟CommitLog文件大小一致的堆外内存,这部分内存就可以确保不会被交换到虚拟内存中。而CommitRealTimeService处理消息的方式则只是调用mappedFileQueue的commit方法。这个方法只是往操作系统的PagedCache里写入消息,并不主动进行刷盘操作。会由操作系统通过Dirty Page机制,在某一个时刻进行统一刷盘。例如我们在正常关闭操作系统时,经常会等待很长时间。这里面大部分的时间其实就是在做PageCache的刷盘。

    public boolean commit(final int commitLeastPages) {
        boolean result = true;
        MappedFile mappedFile = this.findMappedFileByOffset(this.committedWhere, this.committedWhere == 0);
        if (mappedFile != null) {
            int offset = mappedFile.commit(commitLeastPages);
            long where = mappedFile.getFileFromOffset() + offset;
            result = where == this.committedWhere;
            this.committedWhere = where;
        }

        return result;
    }

​ 然后,在梳理同步刷盘与异步刷盘的具体实现时,可以看到一个小点,RocketMQ是如何让两个刷盘服务间隔执行的?RocketMQ提供了一个自己实现的CountDownLatch2工具类来提供线程阻塞功能,使用CAS驱动CountDownLatch2的countDown操作。每来一个消息就启动一次CAS,成功后,调用一次countDown。而这个CountDonwLatch2在Java.util.concurrent.CountDownLatch的基础上,实现了reset功能,这样可以进行对象重用。如果你对JUC并发编程感兴趣,那么这也是一个不错的学习点。

​ 到这里,我们只是把同步刷盘和异步刷盘的机制梳理清楚了。但是关于force操作跟刷盘有什么关系?如果你对底层IO操作不是很理解,那么很容易产生困惑。没关系,保留你的疑问,下一部分我们会一起梳理。

4、CommigLog主从复制

​ 入口:CommitLog.submitReplicaRequest

​ 主从同步时,也体现到了RocketMQ对于性能的极致追求。最为明显的,RocketMQ整体是基于Netty实现的网络请求,而在主从复制这一块,却放弃了Netty框架,转而使用更轻量级的Java的NIO来构建。

​ 在主要的HAService中,会在启动过程中启动三个守护进程。

 //HAService#start
 public void start() throws Exception {
        this.acceptSocketService.beginAccept();
        this.acceptSocketService.start();
        this.groupTransferService.start();
        this.haClient.start();
    }

​ 这其中与Master相关的是acceptSocketService和groupTransferService。其中acceptSocketService主要负责维护Master与Slave之间的TCP连接。groupTransferService主要与主从同步复制有关。而slave相关的则是haClient。

​ 至于其中关于主从的同步复制与异步复制的实现流程,还是比较复杂的,有兴趣的同学可以深入去研究一下。

推荐一篇可供参考的博客 RocketMQ源码分析之主从数据复制-CSDN博客

5、分发ConsumeQueue和IndexFile

​ 当CommitLog写入一条消息后,在DefaultMessageStore的start方法中,会启动一个后台线程reputMessageService。源码就定义在DefaultMessageStore中。这个后台线程每隔1毫秒就会去拉取CommitLog中最新更新的一批消息。如果发现CommitLog中有新的消息写入,就会触发一次doDispatch。

 //org.apache.rocketmq.store.DefaultMessageStore中的ReputMessageService线程类
 public void doDispatch(DispatchRequest req) {
        for (CommitLogDispatcher dispatcher : this.dispatcherList) {
            dispatcher.dispatch(req);
        }
    }

​ dispatchList中包含两个关键的实现类CommitLogDispatcherBuildConsumeQueue和CommitLogDispatcherBuildIndex。源码就定义在DefaultMessageStore中。他们分别用来构建ConsumeQueue索引和IndexFile索引。

具体的构建逻辑比较复杂,在下面章节了解ConsumeQueue文件和IndexFile文件的具体构造后,会比较容易看懂一点。

​ 并且,如果服务异常宕机,会造成CommitLog和ConsumeQueue、IndexFile文件不一致,有消息写入CommitLog后,没有分发到索引文件,这样消息就丢失了。DefaultMappedStore的load方法提供了恢复索引文件的方法,入口在load方法。

6、过期文件删除机制

​ 入口: DefaultMessageStore.addScheduleTask -> DefaultMessageStore.this.cleanFilesPeriodically()

​ 在这个方法中会启动两个线程,cleanCommitLogService用来删除过期的CommitLog文件,cleanConsumeQueueService用来删除过期的ConsumeQueue和IndexFile文件。

​ 在删除CommitLog文件时,Broker会启动后台线程,每60秒,检查CommitLog、ConsumeQueue文件。然后对超过72小时的数据进行删除。也就是说,默认情况下, RocketMQ只会保存3天内的数据。这个时间可以通过fileReservedTime来配置。

​ 触发过期文件删除时,有两个检查的纬度,一个是,是否到了触发删除的时间,也就是broker.conf里配置的deleteWhen属性。另外还会检查磁盘利用率,达到阈值也会触发过期文件删除。这个阈值默认是72%,可以在broker.conf文件当中定制。但是最大值为95,最小值为10。

​ 然后在删除ConsumeQueue和IndexFile文件时,会去检查CommitLog当前的最小Offset,然后在删除时进行对齐。

​ 需要注意的是,RocketMQ在删除过期CommitLog文件时,并不检查消息是否被消费过。 所以如果有消息长期没有被消费,是有可能直接被删除掉,造成消息丢失的。

​ RocketMQ整个文件管理的核心入口在DefaultMessageStore的start方法中,整体流程总结如下:

image.png

7、文件索引结构

​ 了解了大部分的文件写入机制之后,最后我们来理解一下RocketMQ的索引构建方式。

​ 1、CommitLog文件的大小是固定的,但是其中存储的每个消息单元长度是不固定的,具体格式可以参考org.apache.rocketmq.store.CommitLog中计算消息长度的方法

    protected static int calMsgLength(int sysFlag, int bodyLength, int topicLength, int propertiesLength) {
        int bornhostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 8 : 20;
        int storehostAddressLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 8 : 20;
        final int msgLen = 4 //TOTALSIZE
            + 4 //MAGICCODE
            + 4 //BODYCRC
            + 4 //QUEUEID
            + 4 //FLAG
            + 8 //QUEUEOFFSET
            + 8 //PHYSICALOFFSET
            + 4 //SYSFLAG
            + 8 //BORNTIMESTAMP
            + bornhostLength //BORNHOST
            + 8 //STORETIMESTAMP
            + storehostAddressLength //STOREHOSTADDRESS
            + 4 //RECONSUMETIMES
            + 8 //Prepared Transaction Offset
            + 4 + (bodyLength > 0 ? bodyLength : 0) //BODY
            + 1 + topicLength //TOPIC
            + 2 + (propertiesLength > 0 ? propertiesLength : 0) //propertiesLength
            + 0;
        return msgLen;
    }

​ 正因为消息的记录大小不固定,所以RocketMQ在每次存CommitLog文件时,都会去检查当前CommitLog文件空间是否足够,如果不够的话,就重新创建一个CommitLog文件。文件名为当前消息的偏移量。

​ 2、ConsumeQueue文件主要是加速消费者的消息索引。他的每个文件夹对应RocketMQ中的一个MessageQueue,文件夹下的文件记录了每个MessageQueue中的消息在CommitLog文件当中的偏移量。这样,消费者通过ComsumeQueue文件,就可以快速找到CommitLog文件中感兴趣的消息记录。而消费者在ConsumeQueue文件当中的消费进度,会保存在config/consumerOffset.json文件当中。

​ 文件结构: 每个ConsumeQueue文件固定由30万个固定大小20byte的数据块组成,数据块的内容包括:msgPhyOffset(8byte,消息在文件中的起始位置)+msgSize(4byte,消息在文件中占用的长度)+msgTagCode(8byte,消息的tag的Hash值)。

msgTag是和消息索引放在一起的,所以,消费者根据Tag过滤消息的性能是非常高的。

​ 在ConsumeQueue.java当中有一个常量CQ_STORE_UNIT_SIZE=20,这个常量就表示一个数据块的大小。

​ 例如,在ConsumeQueue.java当中构建一条ConsumeQueue索引的方法 中,就是这样记录一个单元块的数据的。

    private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
        final long cqOffset) {

        if (offset + size <= this.maxPhysicOffset) {
            log.warn("Maybe try to build consume queue repeatedly maxPhysicOffset={} phyOffset={}", maxPhysicOffset, offset);
            return true;
        }

        this.byteBufferIndex.flip();
        this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
        this.byteBufferIndex.putLong(offset);
        this.byteBufferIndex.putInt(size);
        this.byteBufferIndex.putLong(tagsCode);
        //.......
    }

​ 3、IndexFile文件主要是辅助消息检索。他的作用主要是用来支持根据key和timestamp检索消息。他的文件名比较特殊,不是以消息偏移量命名,而是用的时间命名。但是其实,他也是一个固定大小的文件。

​ 文件结构: 他的文件结构由 indexHeader(固定40byte)+ slot(固定500W个,每个固定20byte) + index(最多500W*4个,每个固定20byte) 三个部分组成。

indexFile的详细结构有大厂之前面试过,可以参考一下我的博文: RocketMQ之底层IndexFile存储协议_rocketmq index-CSDN博客

​ 然后,了解这些文件结构有什么用呢?下面的延迟消息机制就是一个例子。

9、延迟消息机制

1、关注重点

​ 延迟消息是RocketMQ非常有特色的一个功能,其他MQ产品中,往往需要开发者使用一些特殊方法来变相实现延迟消息功能。而RocketMQ直接在产品中实现了这个功能,开发者只需要设定一个属性就可以快速实现。

​ 延迟消息的核心使用方法就是在Message中设定一个MessageDelayLevel参数,对应18个延迟级别。然后Broker中会创建一个默认的Schedule_Topic主题,这个主题下有18个队列,对应18个延迟级别。消息发过来之后,会先把消息存入Schedule_Topic主题中对应的队列。然后等延迟时间到了,再转发到目标队列,推送给消费者进行消费。

2、源码重点

​ 延迟消息的处理入口在scheduleMessageService这个组件中。 他会在broker启动时也一起加载。

1、消息写入到系统内置的Topic中

​ 代码见CommitLog.putMessage方法。

​ 在CommitLog写入消息时,会判断消息的延迟级别,然后修改Message的Topic和Queue,将消息转储到系统内部的Topic中,这样消息就对消费者不可见了。而原始的目标信息,会作为消息的属性,保存到消息当中。

if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
                || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
            // Delay Delivery
            //K1 延迟消息转到系统Topic
            if (msg.getDelayTimeLevel() > 0) {
                if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                    msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
                }

                topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
                int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

                // Backup real topic, queueId
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
                msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
    //修改消息的Topic和Queue,转储到系统的Topic中。
                msg.setTopic(topic);
                msg.setQueueId(queueId);
            }
        }

​ 十八个队列对应了十八个延迟级别,这也说明了为什么这种机制下不支持自定义时间戳。

2、消息转储到目标Topic

​ 接下来就是需要过一点时间,再将消息转回到Producer提交的Topic和Queue中,这样就可以正常往消费者推送了。

​ 这个转储的核心服务是scheduleMessageService,他也是Broker启动过程中的一个功能组件。随DefaultMessageStore组件一起构建。这个服务只在master节点上启动,而在slave节点上会主动关闭这个服务。

 //org.apache.rocketmq.store.DefaultMessageStore
 @Override
    public void handleScheduleMessageService(final BrokerRole brokerRole) {
        if (this.scheduleMessageService != null) {
            if (brokerRole == BrokerRole.SLAVE) {
                this.scheduleMessageService.shutdown();
            } else {
                this.scheduleMessageService.start();
            }
        }

    }

​ 由于RocketMQ的主从节点支持切换,所以就需要考虑这个服务的幂等性。在节点切换为slave时就要关闭服务,切换为master时就要启动服务。并且,即便节点多次切换为master,服务也只启动一次。所以在ScheduleMessageService的start方法中,就通过一个CAS操作来保证服务的启动状态。

if (started.compareAndSet(false, true)) {

​ 这个CAS操作还保证了在后面,同一时间只有一个DeliverDelayedMessageTimerTask执行。这种方式,给整个延迟消息服务提供了一个基础保证。

​ ScheduleMessageService会每隔1秒钟执行一个executeOnTimeup任务,将消息从延迟队列中写入正常Topic中。 代码见ScheduleMessageService中的DeliverDelayedMessageTimerTask.executeOnTimeup方法。

​ 在executeOnTimeup方法中,就会去扫描SCHEDULE_TOPIC_XXXX这个Topic下的所有messageQueue,然后扫描这些MessageQueue对应的ConsumeQueue文件,找到没有处理过的消息,计算他们的延迟时间。如果延迟时间没有到,就等下一秒再重新扫描。如果延迟时间到了,就进行消息转储。将消息转回到原来的目标Topic下。

​ 整个延迟消息的实现方式是这样的:

image.png

​ 而ScheduleMessageService中扫描延迟消息的主要逻辑是这样的:

 //ScheduleMessageService.DeliverDelayedMessageTimerTask#executeOnTimeup
 public void executeOnTimeup() {
         //找到延迟队列对应的ConsumeQueue文件
            ConsumeQueue cq = ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
                    delayLevel2QueueId(delayLevel));
            //...
            //通过计算,找到这一次扫描需要处理的的ConsumeQueue文件
            SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
            //...
            try {
               //...
                //循环过滤ConsumeQueue文件当中的每一条消息索引
                for (; i < bufferCQ.getSize() && isStarted(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                    //解析每一条ConsumeQueue记录
                    long offsetPy = bufferCQ.getByteBuffer().getLong();
                    int sizePy = bufferCQ.getByteBuffer().getInt();
                    long tagsCode = bufferCQ.getByteBuffer().getLong();
                    //...
                    //计算延迟时间
                    long now = System.currentTimeMillis();
                    long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
                    nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
                    //延迟时间没到就等下一次扫描。
                    long countdown = deliverTimestamp - now;
                    if (countdown > 0) {
                        this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
                        return;
                    }
                   //...
                    //时间到了就进行转储。
                    boolean deliverSuc;
                    if (ScheduleMessageService.this.enableAsyncDeliver) {
                        deliverSuc = this.asyncDeliver(msgInner, msgExt.getMsgId(), nextOffset, offsetPy, sizePy);
                    } else {
                        deliverSuc = this.syncDeliver(msgInner, msgExt.getMsgId(), nextOffset, offsetPy, sizePy);
                    }
//...
                }
    //计算下一次扫描时的Offset起点。
                nextOffset = this.offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
            } catch (Exception e) {
                log.error("ScheduleMessageService, messageTimeup execute error, offset = {}", nextOffset, e);
            } finally {
                bufferCQ.release();
            }
   //部署下一次扫描任务
            this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
        }

​ 你看。这段代码,如果你不懂ConsumeQueue文件的结构,大概率是看不懂他是在干什么的。但是如果清楚了ConsumeQueue文件的结构,就可以很清晰的感受到RocketMQ其实就是在Broker端,像一个普通消费者一样去进行消费,然后扩展出了延迟消息的整个扩展功能。而这,其实也是很多互联网大厂对RocketMQ进行自定义功能扩展的很好的参考。

​ 当然,如果你有心深入分析下去的话,可以针对扫描的效率做更多的梳理以及总结。因为只要是延迟类任务,都需要不断进行扫描。但是如何提升扫描的效率其实是一个非常核心的问题。各种框架都有不同的设计思路,而RocketMQ其实就是给出了一个很高效的参考。

​ 例如下面的长轮询机制,就是在普通消息流转过程中加入一些小逻辑,扩展出来的一种很好的优化机制。在花联网大厂中,会有很多类似这样的自定义优化机制。比如对于延迟消息,只支持十八个固定的延迟级别,但是在很多互联网大厂,其实早在官方提出5.0版本之前,就已经定制形成了支持任意延迟时间的扩展功能。

10、长轮询机制

1、功能回顾

​ RocketMQ对消息消费者提供了Push推模式和Pull拉模式两种消费模式。但是这两种消费模式的本质其实都是Pull拉模式,Push模式可以认为是一种定时的Pull机制。但是这时有一个问题,当使用Push模式时,如果RocketMQ中没有对应的数据,那难道一直进行空轮询吗?如果是这样的话,那显然会极大的浪费网络带宽以及服务器的性能,并且,当有新的消息进来时,RocketMQ也没有办法尽快通知客户端,而只能等客户端下一次来拉取消息了。针对这个问题,RocketMQ实现了一种长轮询机制 long polling。

​ 长轮询机制简单来说,就是当Broker接收到Consumer的Pull请求时,判断如果没有对应的消息,不用直接给Consumer响应(给响应也是个空的,没意义),而是就将这个Pull请求给缓存起来。当Producer发送消息过来时,增加一个步骤去检查是否有对应的已缓存的Pull请求,如果有,就及时将请求从缓存中拉取出来,并将消息通知给Consumer。

2、源码重点

​ Consumer请求缓存,代码入口PullMessageProcessor#processRequest方法

​ PullRequestHoldService服务会随着BrokerController一起启动。

​ 生产者线:从DefaultMessageStore.doReput进入

​ 整个流程以及源码重点如下图所示:

image.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/232625.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

MyBatis--07--启动过程分析、SqlSession安全问题、拦截器

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 谈谈MyBatis的启动过程具体的操作过程如下&#xff1a;实现测试类,并测试SqlSessionFactorySqlSession SqlSession有数据安全问题?在MyBatis中&#xff0c;SqlSess…

App备案、ios备案Bundle ID查询、公钥信息、SHA-1值

App备案、ios备案Bundle ID查询、公钥信息、SHA-1值 Bundle ID这个就不说了&#xff0c;都知道是啥&#xff0c;主要说公钥信息和SHA-1值的获取 打开钥匙串访问&#xff0c;找到当前需要备案App的dis证书&#xff0c;如下&#xff1a; #####右键点击显示简介 #####可以看…

ThinkPHP生活用品商城系统

有需要请加文章底部Q哦 可远程调试 ThinkPHP生活用品商城系统 一 介绍 此生活用品商城系统基于ThinkPHP框架开发&#xff0c;数据库mysql&#xff0c;前端bootstrap。系统分为用户和管理员。(附带配套设计文档) 技术栈&#xff1a;ThinkPHPmysqlbootstrapphpstudyvscode 二 …

小米手机锁屏时间设置为永不休眠_手机不息屏_保持亮屏

环境&#xff1a;打开手机自带的锁屏时间设置发现没有 永不息屏的选项 原因&#xff1a;采用了三星OLED屏幕&#xff0c;所以根据OLED屏幕特性&#xff0c;这个是为了防止烧屏而特意设计的。非OLED机型支持设置“永不” 解决方案1&#xff1a;原生系统是支持永不锁屏的&#…

【自定义Source、Sink】Flink自定义Source、Sink对redis进行读写操作

使用ParameterTool读取配置文件 Flink读取参数的对象 Commons-cli&#xff1a; Apache提供的&#xff0c;需要引入依赖ParameterTool&#xff1a;Flink内置 ParameterTool 比 Commons-cli 使用上简便&#xff1b; ParameterTool能避免Jar包的依赖冲突 建议使用第二种 使用Par…

STL(七)(map篇)

### 这里重点学习map ### 在实际做题过程中,multimap几乎用不到### unordered_map拥有极好的平均时间复杂度和极差的最坏时间复杂度,所以他的时间复杂度是不稳定的,unordered_map一般用不到,要做一个了解 1.map map是一种关联容器,用于存储一组键值对(key-value pairs),其中每…

鸿蒙开发组件之Slider

一、Slider控件是鸿蒙开发中的滑动条组建&#xff0c;初始化方式 Slider({min:0, //最小值max:100,//最大值value:30,//默认值step:10,//步长&#xff0c;每次滑动的差值style:SliderStyle.OutSet, //滑块的样式&#xff0c;默认outsetdirection:Axis.Horizontal, //水平方式的…

Transformer 简介

Transformer 是 Google 在 2017 年底发表的论文 Attention Is All You Need 中所提出的 seq2seq 模型。Transformer 模型的核心是 Self-Attention 机制&#xff0c;能够处理输入序列中的每个元素&#xff0c;并能计算其与序列中其他元素的交互关系的方法&#xff0c;从而能够更…

【自定义Source、Sink】Flink自定义Source、Sink对ClickHouse进行读和批量写操作

ClickHouse官网文档 Flink 读取 ClickHouse 数据两种驱动 ClickHouse 官方提供Clickhouse JDBC.【建议使用】第3方提供的Clickhouse JDBC. ru.yandex.clickhouse.ClickHouseDriver ru.yandex.clickhouse.ClickHouseDriver.现在是没有维护 ClickHouse 官方提供Clickhouse JDBC…

【小沐学Python】Python实现语音识别(SpeechRecognition)

文章目录 1、简介2、安装和测试2.1 安装python2.2 安装SpeechRecognition2.3 安装pyaudio2.4 安装pocketsphinx&#xff08;offline&#xff09;2.5 安装Vosk &#xff08;offline&#xff09;2.6 安装Whisper&#xff08;offline&#xff09; 3 测试3.1 命令3.2 fastapi3.3 go…

【数据结构】——排序篇(上)

前言&#xff1a;前面我们已经学过了许许多多的排序方法&#xff0c;如冒泡排序&#xff0c;选择排序&#xff0c;堆排序等等&#xff0c;那么我们就来将排序的方法总结一下。 我们的排序方法包括以下几种&#xff0c;而快速排序和归并排序我们后面进行详细的讲解。 直接插入…

C#注册表技术及操作

目录 一、注册表基础 1.Registry和RegistryKey类 &#xff08;1&#xff09;Registry类 &#xff08;2&#xff09;RegistryKey类 二、在C#中操作注册表 1.读取注册表中的信息 &#xff08;1&#xff09;OpenSubKey()方法 &#xff08;2&#xff09;GetSubKeyNames()…

2-Spring

2-Spring 文章目录 2-Spring项目源码地址Spring概述Spring特点&#xff08;优点&#xff09;Spring相关学习网站基于Maven的Spring框架导入Spring的组成及拓展 Spring-IOC--原型理解IOC-原型--示例开发示例-常规开发示例-Set函数&#xff08;IOC原型&#xff09;开发示例-对比思…

Python-pdf工具自制(合并、拆分、删除)

pdf工具&#xff0c;之前写的合并工具有点麻烦&#xff0c;使用PyQt5库重写合并拆分和删除指定页面的程序 实现如图&#xff1a; 代码&#xff1a; import sysimport osfrom PyQt5.QtWidgets import QApplication, QMainWindow, QPushButton, QVBoxLayout, QWidget, QFileDia…

新版Android Studio 正则表达式匹配代码注释,删除注释,删除全部注释,IntelliJ IDEA 正则表达式匹配代码注释

正则表达式匹配代码注释 完整表达式拼接Android Studio 搜索匹配【IntelliJ IDEA 也是一样的】 完整表达式拼接 (/*{1,2}[\s\S]?*/)|(//[\x{4e00}-\x{9fa5}].)|(<!-[\s\S]?–>)|(^\s\n)|(System.out.println.*) 表达式拆解&#xff0c;可以根据自己需求自由组合&#x…

【Dubbo3云原生微服务开发实战】「Dubbo前奏导学」 RPC服务的底层原理和实现

RPC服务 RPC服务介绍RPC通信模式RPC架构组成RPC技术要点RPC通信技术选项分析RPC实战开发6大基础组件基础组件之Guava基础组件之Hutools基础组件之ReflectionASM基础组件之FastJSON/FastJSON2基础组件之FST相比FastJSON的优势 基础组件之Commons-Codec RPC框架层面选项分析RPC组…

Cocos Creator:创建棋盘

Cocos Creator&#xff1a;创建棋盘 创建地图三部曲&#xff1a;1. 创建layout组件2. 创建预制体Prefab&#xff0c;做好精灵贴图&#xff1a;3. 创建脚本LayoutSprite.ts收尾工作&#xff1a; 创建地图三部曲&#xff1a; 1. 创建layout组件 使用layout进行布局&#xff0c;…

sensitive word 敏感词(脏词) 如何忽略无意义的字符?达到更好的过滤效果?

忽略字符 说明 我们的敏感词一般都是比较连续的&#xff0c;比如 傻帽 那就有大聪明发现&#xff0c;可以在中间加一些字符&#xff0c;比如【傻!#$帽】跳过检测&#xff0c;但是骂人等攻击力不减。 那么&#xff0c;如何应对这些类似的场景呢&#xff1f; 我们可以指定特…

【论文精读】REACT: SYNERGIZING REASONING AND ACTING IN LANGUAGE MODELS

REACT: SYNERGIZING REASONING AND ACTING IN LANGUAGE MODELS 前言ABSTRACT1 INTRODUCTION2 REACT: SYNERGIZING REASONING ACTING3 KNOWLEDGE-INTENSIVE REASONING TASKS3.1 SETUP3.2 METHODS3.3 RESULTS AND OBSERVATIONS 4 DECISION MAKING TASKS5 RELATED WORK6 CONCLUSI…

Ubuntu20.04使用cephadm部署ceph集群

文章目录 Requirements环境安装Cephadm部署Ceph单机集群引导&#xff08;bootstrap&#xff09;建立新集群 管理OSD列出可用的OSD设备部署OSD删除OSD 管理主机列出主机信息添加主机到集群从集群中删除主机 部署Ceph集群 Cephadm通过在单个主机上创建一个Ceph单机集群&#xff0…