第十四章-Broker-消息分发(cq和indexFile)

消息存放到commitlog文件后,要给消费者使用,就还得把消息分发到 cq(ConsumeQueue)和 indexFile,那么怎么分发,由谁发起呢?这一章主要就把这个讲明白。Java相关的源代码调用链路都比较长,不好整条链接每行代码跟进去,只能挑重点讲,这点跟C/C++代码不同,所以我们就挑重点讲,从消息分发开始入手,在这之前先把调用链列出来,有兴趣的读者可以去看下整条链路的源码。

BrokerController.start() // Broker的启动

->DefaultMessageStore.start() // 存储消息服务启动

​ ->ReputMessageService.start() // 消息分发服务,也继承ServiceThread(实现Runnable)

ReputMessageService 既然是线程服务,那start()执行后,就直接调用 run()方法了

public void run() {
    DefaultMessageStore.log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            Thread.sleep(1);
            this.doReput(); // 直接看这个方法
        } catch (Exception e) {
            DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }

    DefaultMessageStore.log.info(this.getServiceName() + " service end");
}


private void doReput() {
    // reputFromOffset 表示从哪个偏移值开始分发数据,肯定不能小于commitlog最小偏移
    if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
        log.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
                 this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
        // 如果小于,就从commitlog最小偏移开始
        this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
    }
    // 只要reputFromOffset 值小于 commitlog 的最大偏移则持续遍历
    for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
		// 默认false,忽略
        if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()
            && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
            break;
        }
		// 取出当前 reputFromOffset 偏移到 commitlog 最大偏移期间的数据,getData就是从commitlog文件中的映射缓冲区MappedBuffer读取数据,有兴趣的可以自己进去看看
        SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
        if (result != null) {
            try {
                // 起始偏移
                this.reputFromOffset = result.getStartOffset();
				// size 就是要分发的数据的大于
                for (int readSize = 0; readSize < result.getSize() && doNext; ) {
                    // 从commitlog将消息读取出来后,再拆解出来做校验,校验通过后,再组装成一个分发请求 DispatchRequest 对象,checkMessageAndReturnSize 方法里面对消息的拆解,实际就是写入消息时(看`章节11.3.4`中 doAppend方法)的反向操作
                    DispatchRequest dispatchRequest =
                        DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
                    int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();

                    if (dispatchRequest.isSuccess()) { // 成功,证明消息校验通过,没有问题
                        if (size > 0) {
                            // 开始分发消息,这里分别包含了对cq和indexFile内容的分发,分别看`章节14.1`和`章节14.2`
                            DefaultMessageStore.this.doDispatch(dispatchRequest);
							// 如果是master,还需要通过消息者拉取消息等待的线程,可以开始消费了,这块内容,具体讲到消费者时再写
                            if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
                                && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
                                DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
                                                                                          dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
                                                                                          dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
                                                                                          dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
                            }
						 // 分发 size 字节内容,偏移值增加
                            this.reputFromOffset += size;
                            readSize += size;
                            // slave节点
                            if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
                                // 统计该topic成功分发消息次数
                                DefaultMessageStore.this.storeStatsService
                                    .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();
							// 统计该topic成功分发消息大小(字节)
                                DefaultMessageStore.this.storeStatsService
                                    .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
                                    .addAndGet(dispatchRequest.getMsgSize());
                            }
                        } else if (size == 0) {
                            // 当前commitlog文件,没有可以分发的,再滚动到下一个commitlog文件
                            this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
                            readSize = result.getSize();
                        }
                    } else if (!dispatchRequest.isSuccess()) {// 不成功,就是消息校验出了问题或者消息总大小对不上

                        if (size > 0) { // 消息大小对不上
                            log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);
                            this.reputFromOffset += size;
                        } else { // size <= 0 表示消息检验不通过,可以认为是非法消息,不能分发给消费者
                            doNext = false;
                            // If user open the dledger pattern or the broker is master node,
                            // it will not ignore the exception and fix the reputFromOffset variable
                            if (DefaultMessageStore.this.getMessageStoreConfig().isEnableDLegerCommitLog() ||
                                DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
                                log.error("[BUG]dispatch message to consume queue error, COMMITLOG OFFSET: {}",
                                          this.reputFromOffset);
                                // 由于不能分发给消费者,所以这里直接跳过
                                this.reputFromOffset += result.getSize() - readSize;
                            }
                        }
                    }
                }
            } finally {
                // 资源释放
                result.release();
            }
        } else {
            doNext = false;
        }
    }
}

正式分发前,还是再解释一下,分发请求 DispatchRequest 用在哪里,由谁来用,话不多数,直接看调用链:

BrokerController.initialize()

->DefaultMessageStore构建函数

public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
        final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
        // 省略
		
    	// 其他代码都省略了,直接看这3行代码就行,一看就懂,不解释。
        this.dispatcherList = new LinkedList<>();
    	// CommitLogDispatcherBuildConsumeQueue 对应分发到消息队列cq
        this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue());
    	// CommitLogDispatcherBuildIndex 对应分发到 indexFile 
        this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex());

        // 省略
    }

使用分发请求 DispatchRequest,这段源码也在 DefaultMessageStore 类中

public void doDispatch(DispatchRequest req) {
    // 遍历上面addLast添加的内容,有点像责任链模式,但又不完全是
    for (CommitLogDispatcher dispatcher : this.dispatcherList) {
        // 接下来的两节内容就是分别讲述 CommitLogDispatcherBuildConsumeQueue 和 CommitLogDispatcherBuildIndex 类的 dispatch 方法的调用了
        dispatcher.dispatch(req);
    }
}

14.1 消息队列存储

直接进 CommitLogDispatcherBuildConsumeQueue 类,这个类比较简短,就几行代码

class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {

    @Override
    public void dispatch(DispatchRequest request) {
        final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
        switch (tranType) {
            case MessageSysFlag.TRANSACTION_NOT_TYPE: // 普通非事务消息
            case MessageSysFlag.TRANSACTION_COMMIT_TYPE: // 事务消息的最后提交类型
                DefaultMessageStore.this.putMessagePositionInfo(request);// 继续分发
                break;
            case MessageSysFlag.TRANSACTION_PREPARED_TYPE:// 事务消息的预提交
            case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: // 回滚消息
                break;
        }
    }
}

又绕到 DefaultMessageStore 类了

public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
    // 终于看到 ConsumeQueue(后面都简称 cq) 字样了,见名思意,就知道这跟消息队列有关了
    ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());
    // 将分发消息存放到cq文件
    cq.putMessagePositionInfoWrapper(dispatchRequest);
}

public ConsumeQueue findConsumeQueue(String topic, int queueId) {
    // 先从缓存中根据 topic 获取,默认是没有的
    ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
    if (null == map) {
        // 创建一个Map,存放cq,Integer表示队列Id queueId
        ConcurrentMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);
        // 先将newMap 根据topic存放到本地缓存的map中
        ConcurrentMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);
        if (oldMap != null) {
            map = oldMap;
        } else {
            map = newMap;
        }
    }
	// 取出 queueId 对应的cq,默认也是null
    ConsumeQueue logic = map.get(queueId);
    if (null == logic) {
        // 创建 cq对象
        ConsumeQueue newLogic = new ConsumeQueue(
            topic, // topic
            queueId, // 队列id
            StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),// cq文件存放路径
            this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(),// cq文件能存放的最大字节(300000 * 20),30w个元素,每个元素占20字节
            this);
        // put 进 map
        ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
        if (oldLogic != null) {
            logic = oldLogic;
        } else {
            logic = newLogic;
        }
    }
	// 返回
    return logic;
}

再进入到 ConsumeQueue 类,执行分发的消息存入文件

public void putMessagePositionInfoWrapper(DispatchRequest request) {
    final int maxRetries = 30; // 最大重试次数
    // 是否可写标志
    boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();
    // 开始写入
    for (int i = 0; i < maxRetries && canWrite; i++) {
        long tagsCode = request.getTagsCode();
        // 对cq扩展的标志,默认为false,先忽略吧,以后有机会再单独拎出来讲
        if (isExtWriteEnable()) {
            ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
            cqExtUnit.setFilterBitMap(request.getBitMap());
            cqExtUnit.setMsgStoreTime(request.getStoreTimestamp());
            cqExtUnit.setTagsCode(request.getTagsCode());

            long extAddr = this.consumeQueueExt.put(cqExtUnit);
            if (isExtAddr(extAddr)) {
                tagsCode = extAddr;
            } else {
                log.warn("Save consume queue extend fail, So just save tagsCode! {}, topic:{}, queueId:{}, offset:{}", cqExtUnit,
                         topic, queueId, request.getCommitLogOffset());
            }
        }
        // 真正写文件的操作,四个参数分别告诉要从哪个偏移开始写,写多少,tags,写到cq的哪里
        boolean result = this.putMessagePositionInfo(request.getCommitLogOffset(),
                                                     request.getMsgSize(), tagsCode, request.getConsumeQueueOffset());
        if (result) {
            // 写入成功,记录时间
            this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());
            return;
        } else {
            // XXX: warn and notify me
            log.warn("[BUG]put commit log position info to " + topic + ":" + queueId + " " + request.getCommitLogOffset()
                     + " failed, retry " + i + " times");

            try {
                // 失败了,过1秒再试下
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                log.warn("", e);
            }
        }
    }

    // XXX: warn and notify me
    log.error("[BUG]consume queue can not write, {} {}", this.topic, this.queueId);
    this.defaultMessageStore.getRunningFlags().makeLogicsQueueError();
}

private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
                                       final long cqOffset) {

    if (offset + size <= this.maxPhysicOffset) {
        log.warn("Maybe try to build consume queue repeatedly maxPhysicOffset={} phyOffset={}", maxPhysicOffset, offset);
        return true;
    }
	// 下面是对cq元素内容的存储,可以结合`章节第10章`的存储文件来阅读,会更轻松
    this.byteBufferIndex.flip();
    this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE); // 设置buffer最大20
    this.byteBufferIndex.putLong(offset); // commitlog 的偏移值
    this.byteBufferIndex.putInt(size); // 消息的大小
    this.byteBufferIndex.putLong(tagsCode); // 过滤tags的hash code值
	// cqOffset 表示在cq队列中的位置,每个元素 CQ_STORE_UNIT_SIZE 字节,这里就肯定要乘以 CQ_STORE_UNIT_SIZE
    final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;
	// 取出合适的cq文件,参考commitlog文件的获取
    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
    if (mappedFile != null) {
		// 判断是否初始创建cq,并且cqOffset不为0,但是cq文件可写的又是0,这种情况就表明cq不想从0开始写,那么前面就得补齐0
        if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) {
            this.minLogicOffset = expectLogicOffset;
            // 设置cq已经刷新的位置
            this.mappedFileQueue.setFlushedWhere(expectLogicOffset);
            // 设置cq已经提交完成的位置
            this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
            // 将前面的补齐0
            this.fillPreBlank(mappedFile, expectLogicOffset);
            log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "
                     + mappedFile.getWrotePosition());
        }

        if (cqOffset != 0) {
            // 当前cq的可以开始写入偏移值
            long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();
			// 当前期望写入的位置 不能小于 currentLogicOffset,否则非法
            if (expectLogicOffset < currentLogicOffset) {
                log.warn("Build  consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
                         expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset);
                return true;
            }
			// 两者值不同,那就是有bug了,给予日志提醒
            if (expectLogicOffset != currentLogicOffset) {
                LOG_ERROR.warn(
                    "[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
                    expectLogicOffset,
                    currentLogicOffset,
                    this.topic,
                    this.queueId,
                    expectLogicOffset - currentLogicOffset
                );
            }
        }
        // 记录分发后的commitlog的物理偏移
        this.maxPhysicOffset = offset + size;
        // 往cq文件中写入一个元素大小的数据
        return mappedFile.appendMessage(this.byteBufferIndex.array());
    }
    return false;
}

MappedFile.appendMessage

public boolean appendMessage(final byte[] data) {
    int currentPos = this.wrotePosition.get();
	// 当前位置+要写入数据的长度 不能大于 文件可容纳最大大小
    if ((currentPos + data.length) <= this.fileSize) {
        try {
            // 文件同步写入
            this.fileChannel.position(currentPos);
            this.fileChannel.write(ByteBuffer.wrap(data));
        } catch (Throwable e) {
            log.error("Error occurred when append message to mappedFile.", e);
        }
        // 增长写入的位置值
        this.wrotePosition.addAndGet(data.length);
        return true;
    }

    return false;
}

14.2 IndexFile存储

直接进 CommitLogDispatcherBuildIndex 类,这个类更简短,就几行代码

class CommitLogDispatcherBuildIndex implements CommitLogDispatcher {

    @Override
    public void dispatch(DispatchRequest request) {
        // 写入indexFile标志,默认开启的
        if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) {
            // 构建index,并写入
            DefaultMessageStore.this.indexService.buildIndex(request);
        }
    }
}

进入IndexService类

public void buildIndex(DispatchRequest req) {
    // 多次尝试获取或创建indexFile文件
    IndexFile indexFile = retryGetAndCreateIndexFile();
    if (indexFile != null) {
        long endPhyOffset = indexFile.getEndPhyOffset();
        DispatchRequest msg = req;
        String topic = msg.getTopic();
        String keys = msg.getKeys();
        // 检验:commitlog的提交偏移值,肯定不能写于上一次的最大偏移值,肯定是往大的写,不可能往小了写
        if (msg.getCommitLogOffset() < endPhyOffset) {
            return;
        }

        final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
        switch (tranType) {
            case MessageSysFlag.TRANSACTION_NOT_TYPE:
            case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
            case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                break;
            case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                return;
        }
		// 该消息只有一个key
        if (req.getUniqKey() != null) {
            // 存入IndexFile
            indexFile = putKey(indexFile, msg, buildKey(topic, req.getUniqKey()));
            if (indexFile == null) {
                log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
                return;
            }
        }
		// 该消息有多个key
        if (keys != null && keys.length() > 0) {
            String[] keyset = keys.split(MessageConst.KEY_SEPARATOR);
            for (int i = 0; i < keyset.length; i++) {
                String key = keyset[i];
                if (key.length() > 0) {
                    // 存入IndexFile
                    indexFile = putKey(indexFile, msg, buildKey(topic, key));
                    if (indexFile == null) {
                        log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
                        return;
                    }
                }
            }
        }
    } else {
        log.error("build index error, stop building index");
    }
}

多次尝试获取并创建IndexFile文件

public IndexFile retryGetAndCreateIndexFile() {
    IndexFile indexFile = null;
	// 遍历3次
    for (int times = 0; null == indexFile && times < MAX_TRY_IDX_CREATE; times++) {
        // 获取并创建文件
        indexFile = this.getAndCreateLastIndexFile();
        if (null != indexFile) // 取到了文件,直接退出for循环
            break;

        try {
            log.info("Tried to create index file " + times + " times");
            // 没有取到文件,等待1秒重试
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            log.error("Interrupted", e);
        }
    }

    if (null == indexFile) {// 最终还是失败,打印失败日志
        this.defaultMessageStore.getAccessRights().makeIndexFileError();
        log.error("Mark index file cannot build flag");
    }

    return indexFile;
}

public IndexFile getAndCreateLastIndexFile() {
    // 定义一些局部变量
    IndexFile indexFile = null;
    IndexFile prevIndexFile = null;
    long lastUpdateEndPhyOffset = 0;
    long lastUpdateIndexTimestamp = 0;

    {
        // 读锁
        this.readWriteLock.readLock().lock();
        // 缓存列表中有indexFile
        if (!this.indexFileList.isEmpty()) {
            // 取出最新的IndexFile
            IndexFile tmp = this.indexFileList.get(this.indexFileList.size() - 1);
            // 判断该文件是否已经写满了,通过 IndexCount 是否达到最大限度 5000000 * 4(500w个slot,每个slot占4字节)
            if (!tmp.isWriteFull()) {
                indexFile = tmp; // 没有写满,就继续用这个文件写
            } else {
                // 写满了,记录上次更新的物理偏移和时间戳
                lastUpdateEndPhyOffset = tmp.getEndPhyOffset();
                lastUpdateIndexTimestamp = tmp.getEndTimestamp();
                // 写满了,得用一个新文件了,所以当前文件tmp就是下一个文件的前一个文件了,哈哈,有点绕
                prevIndexFile = tmp;
            }
        }
		// 读锁释放
        this.readWriteLock.readLock().unlock();
    }
	// 初次创建
    if (indexFile == null) {
        try {
            // IndexFile文件命名是用当前时间戳转换成人类可读的年月日时分秒这种格式
            String fileName =
                this.storePath + File.separator
                + UtilAll.timeMillisToHumanString(System.currentTimeMillis());
            indexFile =
                new IndexFile(fileName, this.hashSlotNum, this.indexNum, lastUpdateEndPhyOffset,
                              lastUpdateIndexTimestamp);
            this.readWriteLock.writeLock().lock(); // 写锁
            this.indexFileList.add(indexFile); // 添加到缓存列表
        } catch (Exception e) {
            log.error("getLastIndexFile exception ", e);
        } finally {
            this.readWriteLock.writeLock().unlock(); // 释放写锁
        }

        if (indexFile != null) {
            // 写新的文件之前要把上一个文件的内存从缓冲区强制刷新到磁盘
            final IndexFile flushThisFile = prevIndexFile;
            Thread flushThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    // 通过线程强制刷新上一个文件的内容到磁盘
                    IndexService.this.flush(flushThisFile);
                }
            }, "FlushIndexFileThread");
			// 将刷新线程设置成后台线程
            flushThread.setDaemon(true);
            // 启动刷新线程
            flushThread.start();
        }
    }

    return indexFile;
}

存入IndexFile

private IndexFile putKey(IndexFile indexFile, DispatchRequest msg, String idxKey) {
    // 持续遍历写入IndexFile
    for (boolean ok = indexFile.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp()); !ok; ) {
        log.warn("Index file [" + indexFile.getFileName() + "] is full, trying to create another one");

        indexFile = retryGetAndCreateIndexFile();
        if (null == indexFile) {
            return null;
        }

        ok = indexFile.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp());
    }

    return indexFile;
}

// 这个方法最好结合`第十章-存储文件`来看,内部就是对IndexHeader、Slot、Indexes的存储,理解了`图10-1`,这个方法的代码就不用看了
public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
    if (this.indexHeader.getIndexCount() < this.indexNum) {
        // key的hash值
        int keyHash = indexKeyHashMethod(key);
        // 求模得出slot的槽位
        int slotPos = keyHash % this.hashSlotNum;
        // IndexHeader的大小(40字节)+槽位*每个槽位所占大小(4字节),得出要写入数据的槽位偏移
        int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;

        FileLock fileLock = null;

        try {

            // 先看看该槽位,原先有没有值
            int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
            // 检验slotValue的合法性
            if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
                slotValue = invalidIndex;
            }
			// 求出当前存储的时间与IndexFile存入的第一个消息的时间戳的比较
            long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();

            timeDiff = timeDiff / 1000;
			// 修正 timeDiff 的值
            if (this.indexHeader.getBeginTimestamp() <= 0) {
                timeDiff = 0;
            } else if (timeDiff > Integer.MAX_VALUE) {
                timeDiff = Integer.MAX_VALUE;
            } else if (timeDiff < 0) {
                timeDiff = 0;
            }
			// 计算开始写Index的位置
            int absIndexPos =
                IndexHeader.INDEX_HEADER_SIZE  // IndexHeader 头部所占空间
                + this.hashSlotNum * hashSlotSize // 500w个slot所占空间
                + this.indexHeader.getIndexCount() * indexSize; // IndexCount表示已经占用的数量
			// 存入keyhash
            this.mappedByteBuffer.putInt(absIndexPos, keyHash);
            // 存入commitlog物理偏移
            this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
            // 存入时差
            this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
            // 同一个slot下的前一个index的索引值,为解决slot槽位冲突,可以理解成HashMap中hashcode冲突时的链表
            this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
			// 往slot中存入当前存放index的索引值
            this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
			// 初次存入,设置一下第一个元素时间戳和commitlog物理偏移
            if (this.indexHeader.getIndexCount() <= 1) {
                this.indexHeader.setBeginPhyOffset(phyOffset);
                this.indexHeader.setBeginTimestamp(storeTimestamp);
            }
			// 统计
            this.indexHeader.incHashSlotCount();
            this.indexHeader.incIndexCount();
            // 设置一下最后一个元素的时间戳和commitlog物理偏移
            this.indexHeader.setEndPhyOffset(phyOffset);
            this.indexHeader.setEndTimestamp(storeTimestamp);
			// 结束,返回true
            return true;
        } catch (Exception e) {
            log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);
        } finally {
            if (fileLock != null) {
                try {
                    fileLock.release();
                } catch (IOException e) {
                    log.error("Failed to release the lock", e);
                }
            }
        }
    } else {
        log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount()
                 + "; index max num = " + this.indexNum);
    }

    return false;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/555285.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

5GNR刷题

5G帧结构 5G NR帧结构的基本时间单位是( C ) A) subframe B) slot C) Tc D) symbol 5G无线帧长是多少ms&#xff08;B&#xff09; A) 5 B) 10 C) 20 D) 40 下面哪种子载波间隔是中国移动白皮书中规定必选(B ) A) 15KHz B) 30KHz C) 60KHz D) 120KHz 5G参数集包含哪…

【学习笔记二十】EWM TU运输单元业务概述及后台配置

一、EWM TU运输单元业务流程概述 TU是指车辆和运输单元在货场中,移动车辆或运输单元。 车辆是特定运输方式的专用化工具,车辆可以包含一个或多个运输单元,并代表运输车辆的实际实体。 运输单元是用于运输货物的车辆的最小可装载单位,运输单元可以是车辆的固定部分。 …

游游的you矩阵

题目&#xff1a; 游游拿到了一个字符矩阵&#xff0c;她想知道有多少个三角形满足以下条件&#xff1a; 三角形的三个顶点分别是 y、o、u 字符。三角形为直角三角形&#xff0c;且两个直角边一个为水平、另一个为垂直。 输入描述&#xff1a; 第一行输入两个正整数n,m&#…

数字营销:细分-目标-定位(STP)模式——如何实现精准营销

细分-目标-定位&#xff08;STP&#xff09;模型是最广为人知的营销策略之一。作为营销人员&#xff0c;我们倾向于追逐新鲜事物&#xff0c;总是追求最新、最闪亮的营销技术&#xff0c;并为自己领先于趋势而感到自豪。与内容营销相结合&#xff0c;STP模式仍然是简化营销运作…

【详细的Kylin使用心得】

&#x1f308;个人主页: 程序员不想敲代码啊 &#x1f3c6;CSDN优质创作者&#xff0c;CSDN实力新星&#xff0c;CSDN博客专家 &#x1f44d;点赞⭐评论⭐收藏 &#x1f91d;希望本文对您有所裨益&#xff0c;如有不足之处&#xff0c;欢迎在评论区提出指正&#xff0c;让我们共…

【JavaWeb】异步请求——AJAX

目录 Ajax&#xff08;Asynchronous JavaScript and XML&#xff09;优点传统Web与Ajax的差异Ajax工作流程Ajax 经典应用场景XMLHttpRequest常用方法事件常用属性 ajax: GET请求和POST请求的区别 传统Ajax实现传统方式实现Ajax的不足 $.ajax()语法常用属性参数常用函数参数 Aja…

【LeetCode题解】2007. 从双倍数组中还原原数组

文章目录 [2007. 从双倍数组中还原原数组](https://leetcode.cn/problems/find-original-array-from-doubled-array/)思路&#xff1a;代码&#xff1a; 2007. 从双倍数组中还原原数组 思路&#xff1a; 首先&#xff0c;对输入的 changed 数组进行排序&#xff0c;以便后续操…

隐式/动态游标的创建与使用

目录 将 emp 数据表中部门 10 的员工工资增加 100 元&#xff0c;然后使用隐式游标的 %ROWCOUNT 属性输出涉及的员工数量 动态游标的定义 声明游标变量 打开游标变量 检索游标变量 关闭游标变量 定义动态游标&#xff0c;输出 emp 中部门 10 的所有员工的工号和姓名 Orac…

LeetCode-热题100:102. 二叉树的层序遍历

题目描述 给你二叉树的根节点 root &#xff0c;返回其节点值的 层序遍历 。 &#xff08;即逐层地&#xff0c;从左到右访问所有节点&#xff09;。 示例 1&#xff1a; 输入&#xff1a; root [3,9,20,null,null,15,7] 输出&#xff1a; [[3],[9,20],[15,7]] 示例 2&am…

数据结构之排序了如指掌(二)

目录 题外话 正题 选择排序 选择排序思路 选择排序代码详解 选择排序复杂度 双向选择排序 双向选择排序思路 双向选择排序代码详解 堆排序 堆排序思路 堆排序代码详解 堆排序复杂度 冒泡排序 冒泡排序思路 冒泡排序代码详解 冒泡排序复杂度 小结 题外话 今天…

供应链投毒预警 | 开源供应链投毒202403月报发布啦!(含投毒案例分析)

悬镜供应链安全情报中心通过持续监测全网主流开源软件仓库&#xff0c;结合程序动静态分析方式对潜在风险的开源组件包进行动态跟踪和捕获&#xff0c;能够第一时间捕获开源组件仓库中的恶意投毒攻击。在2024年3月份&#xff0c;悬镜供应链安全情报中心在NPM官方仓库&#xff0…

2024华中杯A题完整1-3问py代码+完整思路16页+后续参考论文

A题太阳能路灯光伏板朝向问题 &#xff08;完整版资料文末获取&#xff09; 第1小问&#xff1a;计算每月15日的太阳直射强度和总能量 1. 理解太阳直射辐射和光伏板的关系**&#xff1a;光伏板接收太阳辐射并转化为电能&#xff0c;直射辐射对光伏板的效率影响最大。 2. 收集…

MES给制造业带来看得见的效益

作为连接生产控制系统和企业管理系统的纽带&#xff0c;MES为企业提供实时生产数据&#xff0c;帮助企业进行更加明智的决策&#xff0c;并实时调整生产管理&#xff0c;最终降低运营成本、提高运营利润和资产利用率、保证生产安全与合规。 MES主要功能包括工艺技术管理、生产…

面试题:一个 URL 在浏览器被输入到页面展现的过程中发生了什么

文章目录 前言一、回答二、深入追问 前言 这是一段~ 经典的旋律 ~&#xff0c;不好意思串台了&#xff0c;哈哈&#xff0c;这是一个经典的面试题&#xff1a;一个URL从浏览器到页面的过程中发生了什么&#xff0c;那么今天就带大家九浅一深来研究一下 觉得不错的同学可以加我…

波士顿动力抛弃液压机器人Atlas,推出全新电动化机器人,动作超灵活

本周&#xff0c;机器人科技巨头波士顿动力宣布液压Atlas退役&#xff0c;并推出了下一代产品——专为实际应用而设计的全电动Atlas机器人&#xff0c;这也意味着人形机器人迈出了商业化的第一步。 Atlas——人形机器人鼻祖 Atlas&#xff08;阿特拉斯&#xff09;这个名字最…

CTFHUB-技能树-Web前置技能-文件上传(前端验证—文件头检查)

CTFHUB-技能树-Web前置技能-文件上传&#xff08;前端验证—文件头检查&#xff09; 文章目录 CTFHUB-技能树-Web前置技能-文件上传&#xff08;前端验证—文件头检查&#xff09;前端验证—文件头检查题目解析 各种文件头标志 前端验证—文件头检查 题目考的是&#xff1a;pn…

【笔试强训】DFS、优先队列、滑动窗口笔试题目!

文章目录 1. 单词搜索2. 除 2 操作3. dd 爱框框 1. 单词搜索 题目链接 解题思路&#xff1a; DFS (深度优先遍历)&#xff0c;用一个 pos 记录要匹配单词 word 的位置&#xff0c;每次与 pos 进行匹配判断&#xff08;这样做的好处是不用把答案存下来&#xff09; 注意细节…

深入解析Nacos配置中心的动态配置更新技术

码到三十五 &#xff1a; 个人主页 心中有诗画&#xff0c;指尖舞代码&#xff0c;目光览世界&#xff0c;步履越千山&#xff0c;人间尽值得 ! 在微服务架构中&#xff0c;配置管理变得尤为关键。Nacos&#xff0c;作为一个开源的、易于使用的、功能丰富的平台&#xff0c;为…

electron的webview和内嵌网页如何通信

在 Electron 的世界里&#xff0c;webview 标签相当于一个小盒子&#xff0c;里面可以装一个完整的网页&#xff0c;就像一个迷你浏览器。当你想和这个小盒子里的内容说话时&#xff08;也就是进行通信&#xff09;&#xff0c;这里有几个方法可以帮你做到&#xff1a; 这里只写…

轻量化模块整理,即插即用

轻量化模块整理&#xff0c;即插即用&#xff08;持续更新&#xff09; 整理一些轻量化的结构&#xff0c;作为知识储备&#xff0c;可以用到后续的项目和研究中 Mobilenetv3 深度可分离卷积 MobileNetV3 是一个轻量级的深度学习模型&#xff0c;专为移动和边缘设备上的高效…