消息存放到commitlog文件后,要给消费者使用,就还得把消息分发到 cq(ConsumeQueue)和 indexFile,那么怎么分发,由谁发起呢?这一章主要就把这个讲明白。Java相关的源代码调用链路都比较长,不好整条链接每行代码跟进去,只能挑重点讲,这点跟C/C++代码不同,所以我们就挑重点讲,从消息分发开始入手,在这之前先把调用链列出来,有兴趣的读者可以去看下整条链路的源码。
BrokerController.start() // Broker的启动
->DefaultMessageStore.start() // 存储消息服务启动
->ReputMessageService.start() // 消息分发服务,也继承ServiceThread(实现Runnable)
ReputMessageService 既然是线程服务,那start()执行后,就直接调用 run()方法了
public void run() {
DefaultMessageStore.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
Thread.sleep(1);
this.doReput(); // 直接看这个方法
} catch (Exception e) {
DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
}
}
DefaultMessageStore.log.info(this.getServiceName() + " service end");
}
private void doReput() {
// reputFromOffset 表示从哪个偏移值开始分发数据,肯定不能小于commitlog最小偏移
if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
log.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
// 如果小于,就从commitlog最小偏移开始
this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
}
// 只要reputFromOffset 值小于 commitlog 的最大偏移则持续遍历
for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
// 默认false,忽略
if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()
&& this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
break;
}
// 取出当前 reputFromOffset 偏移到 commitlog 最大偏移期间的数据,getData就是从commitlog文件中的映射缓冲区MappedBuffer读取数据,有兴趣的可以自己进去看看
SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
if (result != null) {
try {
// 起始偏移
this.reputFromOffset = result.getStartOffset();
// size 就是要分发的数据的大于
for (int readSize = 0; readSize < result.getSize() && doNext; ) {
// 从commitlog将消息读取出来后,再拆解出来做校验,校验通过后,再组装成一个分发请求 DispatchRequest 对象,checkMessageAndReturnSize 方法里面对消息的拆解,实际就是写入消息时(看`章节11.3.4`中 doAppend方法)的反向操作
DispatchRequest dispatchRequest =
DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();
if (dispatchRequest.isSuccess()) { // 成功,证明消息校验通过,没有问题
if (size > 0) {
// 开始分发消息,这里分别包含了对cq和indexFile内容的分发,分别看`章节14.1`和`章节14.2`
DefaultMessageStore.this.doDispatch(dispatchRequest);
// 如果是master,还需要通过消息者拉取消息等待的线程,可以开始消费了,这块内容,具体讲到消费者时再写
if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
&& DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
}
// 分发 size 字节内容,偏移值增加
this.reputFromOffset += size;
readSize += size;
// slave节点
if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
// 统计该topic成功分发消息次数
DefaultMessageStore.this.storeStatsService
.getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();
// 统计该topic成功分发消息大小(字节)
DefaultMessageStore.this.storeStatsService
.getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
.addAndGet(dispatchRequest.getMsgSize());
}
} else if (size == 0) {
// 当前commitlog文件,没有可以分发的,再滚动到下一个commitlog文件
this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
readSize = result.getSize();
}
} else if (!dispatchRequest.isSuccess()) {// 不成功,就是消息校验出了问题或者消息总大小对不上
if (size > 0) { // 消息大小对不上
log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);
this.reputFromOffset += size;
} else { // size <= 0 表示消息检验不通过,可以认为是非法消息,不能分发给消费者
doNext = false;
// If user open the dledger pattern or the broker is master node,
// it will not ignore the exception and fix the reputFromOffset variable
if (DefaultMessageStore.this.getMessageStoreConfig().isEnableDLegerCommitLog() ||
DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
log.error("[BUG]dispatch message to consume queue error, COMMITLOG OFFSET: {}",
this.reputFromOffset);
// 由于不能分发给消费者,所以这里直接跳过
this.reputFromOffset += result.getSize() - readSize;
}
}
}
}
} finally {
// 资源释放
result.release();
}
} else {
doNext = false;
}
}
}
正式分发前,还是再解释一下,分发请求 DispatchRequest 用在哪里,由谁来用,话不多数,直接看调用链:
BrokerController.initialize()
->DefaultMessageStore构建函数
public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
// 省略
// 其他代码都省略了,直接看这3行代码就行,一看就懂,不解释。
this.dispatcherList = new LinkedList<>();
// CommitLogDispatcherBuildConsumeQueue 对应分发到消息队列cq
this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue());
// CommitLogDispatcherBuildIndex 对应分发到 indexFile
this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex());
// 省略
}
使用分发请求 DispatchRequest,这段源码也在 DefaultMessageStore 类中
public void doDispatch(DispatchRequest req) {
// 遍历上面addLast添加的内容,有点像责任链模式,但又不完全是
for (CommitLogDispatcher dispatcher : this.dispatcherList) {
// 接下来的两节内容就是分别讲述 CommitLogDispatcherBuildConsumeQueue 和 CommitLogDispatcherBuildIndex 类的 dispatch 方法的调用了
dispatcher.dispatch(req);
}
}
14.1 消息队列存储
直接进 CommitLogDispatcherBuildConsumeQueue 类,这个类比较简短,就几行代码
class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {
@Override
public void dispatch(DispatchRequest request) {
final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
switch (tranType) {
case MessageSysFlag.TRANSACTION_NOT_TYPE: // 普通非事务消息
case MessageSysFlag.TRANSACTION_COMMIT_TYPE: // 事务消息的最后提交类型
DefaultMessageStore.this.putMessagePositionInfo(request);// 继续分发
break;
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:// 事务消息的预提交
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: // 回滚消息
break;
}
}
}
又绕到 DefaultMessageStore 类了
public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
// 终于看到 ConsumeQueue(后面都简称 cq) 字样了,见名思意,就知道这跟消息队列有关了
ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());
// 将分发消息存放到cq文件
cq.putMessagePositionInfoWrapper(dispatchRequest);
}
public ConsumeQueue findConsumeQueue(String topic, int queueId) {
// 先从缓存中根据 topic 获取,默认是没有的
ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
if (null == map) {
// 创建一个Map,存放cq,Integer表示队列Id queueId
ConcurrentMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);
// 先将newMap 根据topic存放到本地缓存的map中
ConcurrentMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);
if (oldMap != null) {
map = oldMap;
} else {
map = newMap;
}
}
// 取出 queueId 对应的cq,默认也是null
ConsumeQueue logic = map.get(queueId);
if (null == logic) {
// 创建 cq对象
ConsumeQueue newLogic = new ConsumeQueue(
topic, // topic
queueId, // 队列id
StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),// cq文件存放路径
this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(),// cq文件能存放的最大字节(300000 * 20),30w个元素,每个元素占20字节
this);
// put 进 map
ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
if (oldLogic != null) {
logic = oldLogic;
} else {
logic = newLogic;
}
}
// 返回
return logic;
}
再进入到 ConsumeQueue 类,执行分发的消息存入文件
public void putMessagePositionInfoWrapper(DispatchRequest request) {
final int maxRetries = 30; // 最大重试次数
// 是否可写标志
boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();
// 开始写入
for (int i = 0; i < maxRetries && canWrite; i++) {
long tagsCode = request.getTagsCode();
// 对cq扩展的标志,默认为false,先忽略吧,以后有机会再单独拎出来讲
if (isExtWriteEnable()) {
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
cqExtUnit.setFilterBitMap(request.getBitMap());
cqExtUnit.setMsgStoreTime(request.getStoreTimestamp());
cqExtUnit.setTagsCode(request.getTagsCode());
long extAddr = this.consumeQueueExt.put(cqExtUnit);
if (isExtAddr(extAddr)) {
tagsCode = extAddr;
} else {
log.warn("Save consume queue extend fail, So just save tagsCode! {}, topic:{}, queueId:{}, offset:{}", cqExtUnit,
topic, queueId, request.getCommitLogOffset());
}
}
// 真正写文件的操作,四个参数分别告诉要从哪个偏移开始写,写多少,tags,写到cq的哪里
boolean result = this.putMessagePositionInfo(request.getCommitLogOffset(),
request.getMsgSize(), tagsCode, request.getConsumeQueueOffset());
if (result) {
// 写入成功,记录时间
this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());
return;
} else {
// XXX: warn and notify me
log.warn("[BUG]put commit log position info to " + topic + ":" + queueId + " " + request.getCommitLogOffset()
+ " failed, retry " + i + " times");
try {
// 失败了,过1秒再试下
Thread.sleep(1000);
} catch (InterruptedException e) {
log.warn("", e);
}
}
}
// XXX: warn and notify me
log.error("[BUG]consume queue can not write, {} {}", this.topic, this.queueId);
this.defaultMessageStore.getRunningFlags().makeLogicsQueueError();
}
private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
final long cqOffset) {
if (offset + size <= this.maxPhysicOffset) {
log.warn("Maybe try to build consume queue repeatedly maxPhysicOffset={} phyOffset={}", maxPhysicOffset, offset);
return true;
}
// 下面是对cq元素内容的存储,可以结合`章节第10章`的存储文件来阅读,会更轻松
this.byteBufferIndex.flip();
this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE); // 设置buffer最大20
this.byteBufferIndex.putLong(offset); // commitlog 的偏移值
this.byteBufferIndex.putInt(size); // 消息的大小
this.byteBufferIndex.putLong(tagsCode); // 过滤tags的hash code值
// cqOffset 表示在cq队列中的位置,每个元素 CQ_STORE_UNIT_SIZE 字节,这里就肯定要乘以 CQ_STORE_UNIT_SIZE
final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;
// 取出合适的cq文件,参考commitlog文件的获取
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
if (mappedFile != null) {
// 判断是否初始创建cq,并且cqOffset不为0,但是cq文件可写的又是0,这种情况就表明cq不想从0开始写,那么前面就得补齐0
if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) {
this.minLogicOffset = expectLogicOffset;
// 设置cq已经刷新的位置
this.mappedFileQueue.setFlushedWhere(expectLogicOffset);
// 设置cq已经提交完成的位置
this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
// 将前面的补齐0
this.fillPreBlank(mappedFile, expectLogicOffset);
log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "
+ mappedFile.getWrotePosition());
}
if (cqOffset != 0) {
// 当前cq的可以开始写入偏移值
long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();
// 当前期望写入的位置 不能小于 currentLogicOffset,否则非法
if (expectLogicOffset < currentLogicOffset) {
log.warn("Build consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset);
return true;
}
// 两者值不同,那就是有bug了,给予日志提醒
if (expectLogicOffset != currentLogicOffset) {
LOG_ERROR.warn(
"[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
expectLogicOffset,
currentLogicOffset,
this.topic,
this.queueId,
expectLogicOffset - currentLogicOffset
);
}
}
// 记录分发后的commitlog的物理偏移
this.maxPhysicOffset = offset + size;
// 往cq文件中写入一个元素大小的数据
return mappedFile.appendMessage(this.byteBufferIndex.array());
}
return false;
}
MappedFile.appendMessage
public boolean appendMessage(final byte[] data) {
int currentPos = this.wrotePosition.get();
// 当前位置+要写入数据的长度 不能大于 文件可容纳最大大小
if ((currentPos + data.length) <= this.fileSize) {
try {
// 文件同步写入
this.fileChannel.position(currentPos);
this.fileChannel.write(ByteBuffer.wrap(data));
} catch (Throwable e) {
log.error("Error occurred when append message to mappedFile.", e);
}
// 增长写入的位置值
this.wrotePosition.addAndGet(data.length);
return true;
}
return false;
}
14.2 IndexFile存储
直接进 CommitLogDispatcherBuildIndex 类,这个类更简短,就几行代码
class CommitLogDispatcherBuildIndex implements CommitLogDispatcher {
@Override
public void dispatch(DispatchRequest request) {
// 写入indexFile标志,默认开启的
if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) {
// 构建index,并写入
DefaultMessageStore.this.indexService.buildIndex(request);
}
}
}
进入IndexService类
public void buildIndex(DispatchRequest req) {
// 多次尝试获取或创建indexFile文件
IndexFile indexFile = retryGetAndCreateIndexFile();
if (indexFile != null) {
long endPhyOffset = indexFile.getEndPhyOffset();
DispatchRequest msg = req;
String topic = msg.getTopic();
String keys = msg.getKeys();
// 检验:commitlog的提交偏移值,肯定不能写于上一次的最大偏移值,肯定是往大的写,不可能往小了写
if (msg.getCommitLogOffset() < endPhyOffset) {
return;
}
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
switch (tranType) {
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
break;
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
return;
}
// 该消息只有一个key
if (req.getUniqKey() != null) {
// 存入IndexFile
indexFile = putKey(indexFile, msg, buildKey(topic, req.getUniqKey()));
if (indexFile == null) {
log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
return;
}
}
// 该消息有多个key
if (keys != null && keys.length() > 0) {
String[] keyset = keys.split(MessageConst.KEY_SEPARATOR);
for (int i = 0; i < keyset.length; i++) {
String key = keyset[i];
if (key.length() > 0) {
// 存入IndexFile
indexFile = putKey(indexFile, msg, buildKey(topic, key));
if (indexFile == null) {
log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
return;
}
}
}
}
} else {
log.error("build index error, stop building index");
}
}
多次尝试获取并创建IndexFile文件
public IndexFile retryGetAndCreateIndexFile() {
IndexFile indexFile = null;
// 遍历3次
for (int times = 0; null == indexFile && times < MAX_TRY_IDX_CREATE; times++) {
// 获取并创建文件
indexFile = this.getAndCreateLastIndexFile();
if (null != indexFile) // 取到了文件,直接退出for循环
break;
try {
log.info("Tried to create index file " + times + " times");
// 没有取到文件,等待1秒重试
Thread.sleep(1000);
} catch (InterruptedException e) {
log.error("Interrupted", e);
}
}
if (null == indexFile) {// 最终还是失败,打印失败日志
this.defaultMessageStore.getAccessRights().makeIndexFileError();
log.error("Mark index file cannot build flag");
}
return indexFile;
}
public IndexFile getAndCreateLastIndexFile() {
// 定义一些局部变量
IndexFile indexFile = null;
IndexFile prevIndexFile = null;
long lastUpdateEndPhyOffset = 0;
long lastUpdateIndexTimestamp = 0;
{
// 读锁
this.readWriteLock.readLock().lock();
// 缓存列表中有indexFile
if (!this.indexFileList.isEmpty()) {
// 取出最新的IndexFile
IndexFile tmp = this.indexFileList.get(this.indexFileList.size() - 1);
// 判断该文件是否已经写满了,通过 IndexCount 是否达到最大限度 5000000 * 4(500w个slot,每个slot占4字节)
if (!tmp.isWriteFull()) {
indexFile = tmp; // 没有写满,就继续用这个文件写
} else {
// 写满了,记录上次更新的物理偏移和时间戳
lastUpdateEndPhyOffset = tmp.getEndPhyOffset();
lastUpdateIndexTimestamp = tmp.getEndTimestamp();
// 写满了,得用一个新文件了,所以当前文件tmp就是下一个文件的前一个文件了,哈哈,有点绕
prevIndexFile = tmp;
}
}
// 读锁释放
this.readWriteLock.readLock().unlock();
}
// 初次创建
if (indexFile == null) {
try {
// IndexFile文件命名是用当前时间戳转换成人类可读的年月日时分秒这种格式
String fileName =
this.storePath + File.separator
+ UtilAll.timeMillisToHumanString(System.currentTimeMillis());
indexFile =
new IndexFile(fileName, this.hashSlotNum, this.indexNum, lastUpdateEndPhyOffset,
lastUpdateIndexTimestamp);
this.readWriteLock.writeLock().lock(); // 写锁
this.indexFileList.add(indexFile); // 添加到缓存列表
} catch (Exception e) {
log.error("getLastIndexFile exception ", e);
} finally {
this.readWriteLock.writeLock().unlock(); // 释放写锁
}
if (indexFile != null) {
// 写新的文件之前要把上一个文件的内存从缓冲区强制刷新到磁盘
final IndexFile flushThisFile = prevIndexFile;
Thread flushThread = new Thread(new Runnable() {
@Override
public void run() {
// 通过线程强制刷新上一个文件的内容到磁盘
IndexService.this.flush(flushThisFile);
}
}, "FlushIndexFileThread");
// 将刷新线程设置成后台线程
flushThread.setDaemon(true);
// 启动刷新线程
flushThread.start();
}
}
return indexFile;
}
存入IndexFile
private IndexFile putKey(IndexFile indexFile, DispatchRequest msg, String idxKey) {
// 持续遍历写入IndexFile
for (boolean ok = indexFile.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp()); !ok; ) {
log.warn("Index file [" + indexFile.getFileName() + "] is full, trying to create another one");
indexFile = retryGetAndCreateIndexFile();
if (null == indexFile) {
return null;
}
ok = indexFile.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp());
}
return indexFile;
}
// 这个方法最好结合`第十章-存储文件`来看,内部就是对IndexHeader、Slot、Indexes的存储,理解了`图10-1`,这个方法的代码就不用看了
public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
if (this.indexHeader.getIndexCount() < this.indexNum) {
// key的hash值
int keyHash = indexKeyHashMethod(key);
// 求模得出slot的槽位
int slotPos = keyHash % this.hashSlotNum;
// IndexHeader的大小(40字节)+槽位*每个槽位所占大小(4字节),得出要写入数据的槽位偏移
int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
FileLock fileLock = null;
try {
// 先看看该槽位,原先有没有值
int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
// 检验slotValue的合法性
if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
slotValue = invalidIndex;
}
// 求出当前存储的时间与IndexFile存入的第一个消息的时间戳的比较
long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();
timeDiff = timeDiff / 1000;
// 修正 timeDiff 的值
if (this.indexHeader.getBeginTimestamp() <= 0) {
timeDiff = 0;
} else if (timeDiff > Integer.MAX_VALUE) {
timeDiff = Integer.MAX_VALUE;
} else if (timeDiff < 0) {
timeDiff = 0;
}
// 计算开始写Index的位置
int absIndexPos =
IndexHeader.INDEX_HEADER_SIZE // IndexHeader 头部所占空间
+ this.hashSlotNum * hashSlotSize // 500w个slot所占空间
+ this.indexHeader.getIndexCount() * indexSize; // IndexCount表示已经占用的数量
// 存入keyhash
this.mappedByteBuffer.putInt(absIndexPos, keyHash);
// 存入commitlog物理偏移
this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
// 存入时差
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
// 同一个slot下的前一个index的索引值,为解决slot槽位冲突,可以理解成HashMap中hashcode冲突时的链表
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
// 往slot中存入当前存放index的索引值
this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
// 初次存入,设置一下第一个元素时间戳和commitlog物理偏移
if (this.indexHeader.getIndexCount() <= 1) {
this.indexHeader.setBeginPhyOffset(phyOffset);
this.indexHeader.setBeginTimestamp(storeTimestamp);
}
// 统计
this.indexHeader.incHashSlotCount();
this.indexHeader.incIndexCount();
// 设置一下最后一个元素的时间戳和commitlog物理偏移
this.indexHeader.setEndPhyOffset(phyOffset);
this.indexHeader.setEndTimestamp(storeTimestamp);
// 结束,返回true
return true;
} catch (Exception e) {
log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);
} finally {
if (fileLock != null) {
try {
fileLock.release();
} catch (IOException e) {
log.error("Failed to release the lock", e);
}
}
}
} else {
log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount()
+ "; index max num = " + this.indexNum);
}
return false;
}