RocetMQ笔记

📅 2026/7/2 14:59:16 👁️ 阅读次数 📝 编程学习
RocetMQ笔记

RocetMQ笔记

  • 概念对齐
  • 消息发送
    • 普通消息
    • 顺序消息
    • 延时消息
    • 批量消息
    • 过滤消息
    • 事务消息
    • 消息发送方式
    • 消息发送(DefaultMQProducer)的重要属性
    • 消息消费(DefaultMQPushConsumer——大多都是用推模式)的重要属性
    • 消费模式
  • 高可用认知
    • 单master模式
    • 多master模式
    • 多master模式多slave异步复制模式
    • 多master模式多slave同步复制模式 + 异步刷盘

概念对齐

  • NameServer
    RocketMQ的服务注册中心。启动RocketMQ时,先启动NameServer,再启动Broker
  • Broker
    用于暂存和传输消息
    启动时向所有的NameServer注册,并与NameServer保持长连接,默认每30s检查一次broker是否还存活。
    如果Broker 宕机,NameServer会将其从路由注册表中移除——这样可实现高可用
  • Producer 生产者
    只认topic,不认broker。
  • Customer 消费者
    消息订阅者,负责接收消息并消费
  • 消费组
    负载均衡:同一条消息,在一个消费组内只会被一个消费者消费。
    消费位点:多个消费组可以都订阅一个topic,每个组维护自己的消费进度
    死信队列&重试队列:每个消费组可以自动绑定自己专属的内部队列,
    死信队列:消息达到最大重试次数(默认16),进入本消费组的死信队列
    重试队列:消费抛异常或返回失败时,进入本消费组的重试队列
    消费模式:集群模式(组内多实例分摊消息,一个消息只会被一个实例消费),广播模式(组内的每个实例都消费一次)
  • 生产组
    无特别用于,主要用于事务消息
  • Topic
    一个生产者可以将消息推给一个或多个topic,一个消费者可以订阅一个或多个topic
  • tag
    发送时可以绑定一个tag,消费者订阅时可以指定消费消息的tag,那么其他tag的消息不会被投递
    Message Queue消息队列

消息发送

普通消息

引入依赖

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.8.0</version></dependency>

配置生产者

@BeanpublicDefaultMQProducermsgMQProducer()throwsMQClientException{DefaultMQProducerproducer=new(producerGroup);producer.setNamesrvAddr(nameServerAddress);producer.setSendMsgTimeout(10000);producer.start();returnproducer;}

生产者发送消息

publicclassMsgProducer{@AutowiredDefaultMQProducermsgMQProducer;publicbooleansend(Stringtopic,Stringtag,Stringmsg){try{byte[]body=msg.getBytes(RemotingHelper.DEFAULT_CHARSET);Messagemessage=newMessage(topic,tag,body);SendResultsendResult=msgMQProducer.send(message);// 同步发送消息到一个brokerif(SendStatus.SEND_OK.equals(sendResult.getSendStatus())){log.info("消息发送成功,消息id: {}",sendResult.getMsgId());returntrue;}else{log.warn("消息发送失败,状态: {}, 消息id: {}",sendResult.getSendStatus(),sendResult.getMsgId());returnfalse;}}catch(Exceptione){log.error("e",e);}returnfalse;}}

消费者配置

@BeanpublicDefaultMQPushConsumerMsgConsumer()throwsMQClientException{DefaultMQPushConsumerconsumer=newDefaultMQPushConsumer(consumerGroup);consumer.setNamesrvAddr(nameServerAddress);//订阅一个或多个topic,并指定tag过滤条件,这里指定*表示接收所有tag的消息consumer.subscribe(myTopic,"*");consumer.registerMessageListener(msgListener);// 注入消费处理器(回调函数),MessageListenerConcurrently的实现consumer.setConsumeThreadMin(10);// 最小消费者线程数returnconsumer;}@EventListener(ApplicationReadyEvent.class)publicvoidstartMsgConsumer()throwsMQClientException{DefaultMQPushConsumerconsumer=MsgConsumer();consumer.start();}

消息的发送者步骤
1.创建消息生产者,并指定生产者组名;
2. 指定NameServer地址
3. 启动producer
4. 创建消息对象,指定topic、tag和消息体
5. 发送消息

消息的消费者步骤
1.创建消息消费者,并指定消费者组名;
2. 指定NameServer地址
3. 订阅Topic和Tag
4. 设置回调函数。消费消息
5. 启动消费者

顺序消息

RocketMQ的顺序是指Queue级别的有序
使用顺序消费,首先要保证消息有序进入到MQ,消息依次发送到同一个队列,消费时从该Queue中依次拉取,就保证了有序
全程只有一个Queue参与

还有一种是分区有序。当设置了多个消费队列的场景时,假设一个订单有创建和付款流程,但都是同一个orderId,则可以根据orderId取模运算使同一个订单路由到相同队列中,实现生产者有序推送。
同样,消费者也要保证顺序消费,使用MessageListenerOrderly做回调消费逻辑(上一条没处理完则不会拉取下一条消息,MessageListenerConcurrently则是多线程并发消费,不保证消息顺序,不会阻塞整个队列),此时消费者若消费失败,则不能将消息放入重试队列——这样会导致乱序。此时会死循环重试消费该消息,若没有死信兜底,可能导致消息队列卡死

延时消息

开源版的RocketMQ只支持固定经度的延时消息,不能指定任意时间。比如生产者发送时指定DelayTimeLevel参数为16,则含义为30min后将消息投递给消费者

批量消息

TODO 待定
不支持延时,若消息体积大于4MB,最好切分后分批次发送

过滤消息

绑定Tag

事务消息

待定
不支持延时和批量消息

消息发送方式

发送方式发送TPS发送结果可靠性解释
同步可靠发送不丢失发出消息后等待接收方返回响应后,发送下一条消息
异步可靠发送不丢失发送消息不等待接收方,直接发送下一条消息。发送方通过回调接口的方式处理接收方的响应
单向发送(sendOneway)最快可能丢失发送方只负责发,无回调函数。耗时一般在微秒级别

消息发送(DefaultMQProducer)的重要属性

producerGroup:生产者所属组
defaultTopicQueueNums:默认主题在每一个Broker队列数量
sendMsgTimeout:发送消息默认超时时间,默认3s
compressMsgBodyOverHowmuch:消息体超过该值则启用压缩,默认4k
retryTimesWhenSendFailed:同步方式发送消息重试次数,默认为2,总共执行3次
retryTimesWhenSendAsyncFailed:异步方法发送消息重试次数,默认为2
retryAnotherBrokerWhenNotStoreOK:消息重试时选择另外一个Broker时,是否不等待存储结果就返回,默认为false
maxMessageSize:允许发送的最大消息长度,默认为4M

消息消费(DefaultMQPushConsumer——大多都是用推模式)的重要属性

messageModel:默认集群模式
consumeFromWhere:指定消费开始偏移量(最大偏移量、最小偏移量、启动时间戳)开始消费
consumeThreadMin:消费者最小线程数量
consumeThreadMax:消费者最大线程数量
pullInterval:推模式下任务间隔时间
pullBatchSize:推模式下任务拉取的条数,默认32条
maxReconsumeTimes:消息重试次数,-1代表16次
consumeTimeout:消息消费超时时间
subscribe:订阅消息,并指定队列选择器
unsubscribe:取消消息订阅
registerMessageListener:注册监听器

消费模式

集群模式:默认。消费组内的各个实例分摊消费。假设一个topic绑定了三个Queue,一个消费组内有三个消费实例,那么每个实例只消费其中一个Queue,消息会平均散落到不同的Queue上。可以认为是平均消费消息,并且该模式下消费进度会维护到broker中。
不保证失败重投的消息路由到同一台机器上

广播消费:积消费组内的每个实例都会被投递一遍消息,因此消费进度会持久化到实例的本地(所以重复消费的概率稍大些),并且不会失败重投,且不支持顺序消费,不支持重置消费位点

高可用认知

broker的配置文件中,brokerId设置为0——master,大于0——slave

brokerName=broker-a brokerId=0# 异步masterbrokerRole=ASYNC_MASTER namesrvAddr=ns1:9876;ns2:9876
brokerName=broker-a brokerId=1 brokerRole=SLAVE namesrvAddr=ns1:9876;ns2:9876

单master模式

不算集群,节点宕机服务就不可用了

多master模式

多个master节点组成集群,单个 master 节点的宕机或重启对应用没有影响
该体系中,topic对应的队列会分布到多个master上,比如topic对应4个队列,双master则会对应8个队列,每台master对应4个队列,方便横向扩展
但若master宕机,未被消费的消息在这期间是无法被消费的——影响消息的实时性,所以引入了多master多slave模式

多master模式多slave异步复制模式

每个master(读写)至少对应一个slave(读)
即便master宕机,由于主从之间做数据同步,所以依然可以在slave中消费存量消息,不会影响实时性
但需斟酌考虑宕机导致数据没有同步完,出现的消息丢失问题

多master模式多slave同步复制模式 + 异步刷盘

主从同步保证消息不丢失,异步刷盘(flushDiskType=ASYNC_FLUSH)保证高吞吐量
同步刷盘:生产者发送每一条消息都同步保存到磁盘
异步刷盘(默认):生产者发送的消息先缓存起来,然后(定期或达到某一设定值)异步将数据刷盘