课程地址:0.第三章介绍_哔哩哔哩_bilibili (视频89~148,RocketMQ高级功能&源码分析)
第三部分 高级功能
第1节 消息存储
消息存储的介质有两种:关系型数据库DB,文件系统。
ActiveMQ默认采用的数据库KahaDB做消息存储,数据量达到一定量级后会有性能瓶颈。
RocketMQ、Kafka、RabbitMQ均采用文件系统进行持久化(即消息刷盘)。消息刷盘为消息存储提供了一种高效率、高可靠性和高性能的数据持久化方式。除非部署MQ机器本身或是本地磁盘挂了,否则一般是不会出现无法持久化的故障问题。刷盘一般可以分为异步刷盘和同步刷盘两种模式。
1.1 消息的存储和发送
总结一句话:RocketMQ利用磁盘顺序写来提高消息存储(写)的速度,利用零拷贝技术提高消息发送(读)的速度。
消息存储
RocketMQ的消息用顺序写,保证了消息存储的速度。目前的高性能磁盘,顺序写速度可以达到600MB/s,随机写的速度只有大概100KB/s,和顺序写的性能相差6000倍!
消息发送
Linux操作系统分为【用户态】和【内核态】,文件操作、网络操作需要涉及这两种形态的切换,免不了进行数据复制。
1.2 消息存储结构(文件系统)
RocketMQ消息的存储是由ConsumeQueue和CommitLog配合完成的,消息真正的物理存储文件是CommitLog,ConsumeQueue是消息的逻辑队列,类似数据库的索引文件,存储的是指向物理存储的地址。每个Topic下的每个Message Queue都有一个对应的ConsumeQueue文件。
-
CommitLog:存储消息的元数据
-
ConsumerQueue:存储消息在CommitLog的索引
-
IndexFile:为了消息查询提供了一种通过key或时间区间来查询消息的方法,这种通过IndexFile来查找消息的方法不影响发送与消费消息的主流程
1.3 刷盘机制
RocketMQ的消息是存储到磁盘上的,这样既能保证断电后恢复, 又可以让存储的消息量超出内存的限制。RocketMQ为了提高性能,会尽可能地保证磁盘的顺序写。消息在通过Producer写入RocketMQ的时候,有两种写磁盘方式,分布式同步刷盘和异步刷盘。通过flushDiskType参数进行配置。
- 同步刷盘:可靠性高。
- 异步刷盘:高吞吐量,提高了效率。
具体在Broker集群的配置文件中进行配置:
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
第2节 高可用性机制
首先从各组件说起,RocketMQ分为四个组件——NameServer、Broker、Producer、Consumer。我们搭建双主双从(2m-2s-sync)集群时,就搭建了NameServer集群以保证NameServer的高可用。NameServer是无状态的,也就是说如果想增加节点时,无需重启服务。
RocketMQ分布式集群是通过Master和Slave的配合达到高可用性的。
Master和Slave的区别:在Broker的配置文件中,参数 brokerId的值为0表明这个Broker是Master,大于0表明这个Broker是 Slave,同时brokerRole参数也会说明这个Broker是Master还是Slave。
Master角色的Broker支持读和写,Slave角色的Broker仅支持读,也就是Producer只能和Master角色的Broker连接写入消息;Consumer可以连接Master角色的Broker,也可以连接Slave角色的Broker来读取消息。
2.1 消息消费高可用
Consumer默认从Master读,当Master不可用或者繁忙时,会自动切换从Slave读,从而保证了消费的高可用。
2.2 消息发送高可用
如何保证Producer一定能够发送消息成功呢?双主双从集群可以保证。因为如果不是双主双从,而是一主多从的话,一旦Master节点的Broker挂了之后,消息便不再能写入Broker了(Slave节点仅支持读)。只有双主双从时,一台Master节点的Broker挂了之后,会自动切换到另外一个Master节点进行发送(写入)消息,从而保证了消息发送的高可用。
面试题:RocketMQ如何保证高可用性?
从四个角度(组件)去说。
1. NameServer 的高可用:搭建NameServer集群。
2. Broker 的高可用:搭建Broker集群,通过Master和Slave的配合来达到高可用。
3. Producer 的高可用:见上面2.2
4. Consumer 的高可用:见上面2.1
2.3 消息主从复制
如果一个Broker组有Master和Slave,消息需要从Master复制到Slave 上,有同步和异步两种复制方式。通过brokerRole参数进行配置,该参数可以被配成ASYNC_MASTER、SYNC_MASTER、SLAVE三个值中的一个。
同步复制:
- 等Master和Slave均写成功后才反馈给客户端Producer写成功状态。
- 会增大数据写入 延迟,降低系统吞吐量。
- 优点是数据安全性高。
异步复制:
- 只要Master写成功即可反馈给客户端写成功状态。
- 系统拥有较低的延迟和较高的吞吐量,但是如果Master出了故障,有些数据因为没有被写入Slave,有可能会丢失。
具体在Broker集群的配置文件中进行配置:
#Broker的角色是Master节点时,配置SYNC_MASTER或者ASYNC_MASTER
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER
#Broker的角色是Master节点时,只能配置SLAVE
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
辨析:
同步复制or异步复制:是针对不同Broker节点来说的,即Master节点的数据复制到Slave节点的方式。
同步刷盘or异步刷盘:是针对同一个Broker来说的,即消息数据从内存持久化到磁盘的方式。
注意:
1)复制方式的配置
- 如果是Master节点时,若数据安全性是第一要求,选择同步复制SYNC_MASTER;若较低的延迟和较高的吞吐量为第一要求,则选择异步复制ASYNC_MASTER。
- 如果是Slave节点时,只能选择SLAVE。
2)刷盘机制的配置
- 如果是Master节点时,若对安全性要求高,选择SYNC_FLUSH同步刷盘;若要求高吞吐量,则选择ASYNC_FLUSH 异步刷盘。
- 如果是Slave节点时,通常选择异步刷盘ASYNC_FLUSH。
实际开发中,刷盘的配置建议主从Broker都配置为异步的ASYNC_FLUSH,因为SYNC_FLUSH方式会频繁地触发磁盘写动作,会明显降低性能;主从之间复制建议配置为同步的SYNC_MASTER,这样可以最大限度保证数据不丢失。
第3节 负载均衡
3.1 Producer负载均衡
Producer端,每个实例在发消息的时候,默认会轮询所有的message queue发送,以达到让消息平均落在不同的queue上。而由于queue可以散落在不同的broker,所以消息就发送到不同的broker下。
3.2 Consumer负载均衡
消费模式有两种:集群模式和广播模式。
消费者的负载均衡指的是集群模式。Consumer Group同名的多个消费者同步负担消费同一主题Topic下的不同队列Queue中的消息,每条消息只被消费一次。
第4节 消息重试
4.1 顺序消息的重试
对于顺序消息,当消费者消费消息失败后,消息队列 RocketMQ 会自动不断进行消息重试(每次间隔时间为 1 秒),这时,应用会出现消息消费被阻塞的情况。因此,在使用顺序消息时,务必保证应用能够及时监控并处理消费失败的情况,避免阻塞现象的发生。
4.2 无序消息的重试
对于无序消息(普通、定时、延时、事务消息),当消费者消费消息失败时,您可以通过设置返回状态达到消息重试的结果。
无序消息的重试只针对集群消费方式生效;广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息。
1)重试次数
消息队列 RocketMQ 默认允许每条消息最多重试16次。如果消息重试16次(4小时46分钟)后仍然失败,消息将不再投递,进入死信队列。
注意: 一条消息无论重试多少次,这些重试消息的 Message ID 不会改变。
2)是否重试的配置方式
①消费失败后,进行重试的配置方式
集群消费方式下,消息消费失败后期望消息重试,需要在消息监听器接口的实现中明确进行配置(三种方式任选一种):
-
返回 Action.ReconsumeLater (推荐)
-
返回 Null
-
抛出异常
public class MessageListenerImpl implements MessageListener {
@Override
public Action consume(Message message, ConsumeContext context) {
//处理消息
doConsumeMessage(message);
//方式1:返回 Action.ReconsumeLater,消息将重试
return Action.ReconsumeLater;
//方式2:返回 null,消息将重试
return null;
//方式3:直接抛出异常, 消息将重试
throw new RuntimeException("Consumer Message exceotion");
}
}
②消费失败后,不重试配置方式
集群消费方式下,消息失败后期望消息不重试,需要捕获消费逻辑中可能抛出的异常,最终返回 Action.CommitMessage,此后这条消息将不会再重试。
public class MessageListenerImpl implements MessageListener {
@Override
public Action consume(Message message, ConsumeContext context) {
try {
doConsumeMessage(message);
} catch (Throwable e) {
//捕获消费逻辑中的所有异常,并返回 Action.CommitMessage;
return Action.CommitMessage;
}
//消息处理正常,直接返回 Action.CommitMessage;
return Action.CommitMessage;
}
}
③自定义消息最大重试次数
消息队列 RocketMQ 允许 Consumer 启动的时候设置最大重试次数,重试时间间隔将按照如下策略:
-
最大重试次数小于等于 16 次,则重试时间间隔同上表描述。
-
最大重试次数大于 16 次,超过 16 次的重试时间间隔均为每次 2 小时。
Properties properties = new Properties();
//配置对应 Group ID 的最大消息重试次数为 20 次
properties.put(PropertyKeyConst.MaxReconsumeTimes,"20");
Consumer consumer =ONSFactory.createConsumer(properties);
注意:
消息最大重试次数的设置对相同 Group ID 下的所有 Consumer 实例有效。
如果只对相同 Group ID 下两个 Consumer 实例中的其中一个设置了 MaxReconsumeTimes,那么该配置对两个 Consumer 实例均生效。
配置采用覆盖的方式生效,即最后启动的 Consumer 实例会覆盖之前的启动实例的配置
④获取消息重试次数
消费者收到消息后,可按照如下方式获取消息的重试次数:
public class MessageListenerImpl implements MessageListener {
@Override
public Action consume(Message message, ConsumeContext context) {
//获取消息的重试次数
System.out.println(message.getReconsumeTimes());
return Action.CommitMessage;
}
}
第5节 死信队列
1)死信队列是如何产生的?
2)死信消息的特征?
不会再被消费者正常消费。
有效期与正常消息相同,均为 3 天,3 天后会被自动删除。因此,请在死信消息产生后的 3 天内及时处理。
3)死信队列的特征?
一个死信队列对应一个 Group ID, 而不是对应单个消费者实例。
如果一个 Group ID 未产生死信消息,消息队列 RocketMQ 不会为其创建相应的死信队列。
一个死信队列包含了对应 Group ID 产生的所有死信消息,不论该消息属于哪个 Topic。
4)死信消息是否可以重新发送?
可以。一条消息进入死信队列,意味着某些因素导致消费者无法正常消费该消息,因此,通常需要您对其进行特殊处理。排查可疑因素并解决问题后,可以在消息队列 RocketMQ 控制台重新发送该消息,让消费者重新消费一次。
第6节 消费幂等
消息队列 RocketMQ 消费者在接收到消息以后,有必要根据业务上的唯一 Key 对消息做幂等处理的必要性。
6.1 消费幂等的必要性
在互联网应用中,尤其在网络不稳定的情况下,消息队列 RocketMQ 的消息有可能会出现重复,这个重复简单可以概括为以下情况:
-
发送时消息重复
当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,导致服务端(Broker)对客户端(Producer)应答失败。 如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。
-
投递时消息重复
消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端(Consumer)给服务端(Broker)反馈应答的时候网络闪断。 为了保证消息至少被消费一次,消息队列 RocketMQ 的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。
-
负载均衡时消息重复(包括但不限于网络抖动、Broker 重启以及订阅方应用重启)
当消息队列 RocketMQ 的 Broker 或客户端重启、扩容或缩容时,会触发 Rebalance,此时消费者可能会收到重复消息。
6.2 处理方式
因为 Message ID 有可能出现冲突(重复)的情况,所以真正安全的幂等处理,不建议以 Message ID 作为处理依据。 最好的方式是以业务唯一标识作为幂等处理的关键依据,而业务的唯一标识可以通过消息 Key 进行设置:
Message message = new Message();
message.setKey("ORDERID_100");
SendResult sendResult = producer.send(message);
订阅方收到消息时可以根据消息的 Key 进行幂等处理:
consumer.subscribe("ons_test", "*", new MessageListener() {
public Action consume(Message message, ConsumeContext context) {
String key = message.getKey()
// 根据业务唯一标识的 key 做幂等处理
}
});
另外,可以参考上一篇文章项目中的具体使用。
第四部分 源码分析
第1节 环境搭建
源码目录结构:
broker: broker 模块(broke 启动进程)
client :消息客户端,包含消息生产者、消息消费者相关类
common :公共包
dev :开发者信息(非源代码)
distribution :部署实例文件夹(非源代码)
example: RocketMQ 例代码
filter :消息过滤相关基础类
filtersrv:消息过滤服务器实现相关类(Filter启动进程)
logappender:日志实现相关类
namesrv:NameServer实现相关类(NameServer启动进程)
openmessageing:消息开放标准
remoting:远程通信模块,给予Netty
srcutil:服务工具类
store:消息存储实现相关类
style:checkstyle相关实现
test:测试相关类
tools:工具类,监控命令相关实现类
下载好源码后,导入idea,执行命令进行安装:
clean install -Dmaven.test.skip=true
安装成功后,进行调试。
调试
1)启动NameServer
直接启动时,控制台会报错:Please set the ROCKETMQ_HOME variable in your environment to match the location of the RocketMQ installation。故应该首先配置ROCKETMQ_HOME环境变量再启动。该环境变量的value值应该写到conf配置文件夹所在位置才行。
2)单节点启动Broker
源码分析时,没必要启动集群。配置文件位置:D:\JavaTools\rocketmq_projects\rocketmq-all-4.8.0-source-release\distribution\conf\broker.conf
第一步,修改配置文件broker.conf内容如下:
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
# namesrvAddr地址
namesrvAddr=127.0.0.1:9876
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
autoCreateTopicEnable=true
# 存储路径
storePathRootDir=D:\\JavaTools\\rocketmq_projects\\data
# commitLog路径
storePathCommitLog=D:\\JavaTools\\rocketmq_projects\\data\\commitlog
# 消息队列存储路径
storePathConsumeQueue=D:\\JavaTools\\rocketmq_projects\\data\\consumequeue
# 消息索引存储路径
storePathIndex=D:\\JavaTools\\rocketmq_projects\\data\\index
# checkpoint文件路径
storeCheckpoint=D:\\JavaTools\\rocketmq_projects\\data\\checkpoint
# abort文件存储路径
abortFile=D:\\JavaTools\\rocketmq_projects\\data\\abort
之所以配置存储路径,是为了查看那些文件时更方便,否则会存储到默认位置。
第二步,数据文件夹D:\\JavaTools\\rocketmq_projects\\data要创建出来。
第三步,配置broker.conf
和ROCKETMQ_HOME
后(如下图所示),直接启动Broker。
启动成功后,控制台打印:
The broker[broker-a, 192.168.6.1:10911] boot success. serializeType=JSON and name server is 127.0.0.1:9876
3)发送&接收消息