架构师系列-消息中间件(八)- RocketMQ 进阶(二)-生产端消息保障

5. RocketMQ消息保障

下面我们详细说下如何保障消息不丢失以及消息幂等性问题

5.1 生产端保障

生产端保障需要从一下几个方面来保障

  1. 使用可靠的消息发送方式
  2. 注意生产端重试
  3. 生产禁止自动创建topic
5.1.1 消息发送保障
5.1.1.1 同步发送

发送者向MQ执行发送消息API时,同步等待,直到消息服务器返回发送结果,会在收到接收方发回响应之后才发下一个数据包的通讯方式,这种方式只有在消息完全发送完成之后才返回结果,此方式存在需要同步等待发送结果的时间代价。

简单来说,同步发送就是指 producer 发送消息后,会在接收到 broker 响应后才继续发下一条消息的通信方式。

使用场景

由于这种同步发送的方式确保了消息的可靠性,同时也能及时得到消息发送的结果,故而适合一些发送比较重要的消息场景,比如说重要的通知邮件、营销短信等等,在实际应用中,这种同步发送的方式还是用得比较多的。

注意事项

这种方式具有内部重试机制,即在主动声明本次消息发送失败之前,内部实现将重试一定次数,默认为2次(DefaultMQProducer#getRetryTimesWhenSendFailed),发送的结果存在同一个消息可能被多次发送给broker,这里需要应用的开发者自己在消费端处理幂等性问题。

5.1.1.2 异步发送

异步发送是指发送方发出数据后,不等接收方发回响应,接着发送下个数据包的通讯方式。 MQ 的异步发送,需要用户实现异步发送回调接口(SendCallback

 

异步发送是指 producer 发出一条消息后,不需要等待 broker 响应,就接着发送下一条消息的通信方式,需要注意的是,不等待 broker 响应,并不意味着 broker 不响应,而是通过回调接口来接收 broker 的响应,所以要记住一点,异步发送同样可以对消息的响应结果进行处理。

使用场景

由于异步发送不需要等待 broker 的响应,故在一些比较注重 RT(响应时间)的场景就会比较适用,比如,在一些视频上传的场景,我们知道视频上传之后需要进行转码,如果使用同步发送的方式来通知启动转码服务,那么就需要等待转码完成才能发回转码结果的响应,由于转码时间往往较长,很容易造成响应超时,此时,如果使用的是异步发送通知转码服务,那么就可以等转码完成后,再通过回调接口来接收转码结果的响应了。

5.1.2 消息发送总结
5.1.2.1 发送方式对比
发送方式发送 TPS发送结果反馈可靠性适用场景
同步发送一般不丢失重要的通知场景
异步发送不丢失比较注重 RT(响应时间)的场景
单向发送最快可能丢失可靠性要求并不高的场景
5.1.2.2 使用场景对比

在实际使用场景中,利用何种发送方式,可以总结如下:

  • 当发送的消息不重要时,采用one-way方式,以提高吞吐量;
  • 当发送的消息很重要是,且对响应时间不敏感的时候采用sync方式;
  • 当发送的消息很重要,且对响应时间非常敏感的时候采用async方式;
5.1.3 发送状态

发送消息时,将获得包含SendStatus的SendResult,首先,我们假设Message的isWaitStoreMsgOK = true(默认为true),如果没有抛出异常,我们将始终获得SEND_OK,以下是每个状态的说明列表:

5.1.3.1 FLUSH_DISK_TIMEOUT

如果设置了 FlushDiskType=SYNC_FLUSH (默认是 ASYNC_FLUSH),并且 Broker 没有在 syncFlushTimeout (默认是 5 秒)设置的时间内完成刷盘,就会收到此状态码。

5.1.3.2 FLUSH_SLAVE_TIMEOUT

如果设置为 SYNC_MASTER,并且 slave Broker 没有在 syncFlushTimeout 设定时间内完成同步,就会收到此状态码。

5.1.3.3 SLAVE_NOT_AVAILABLE

如果设置为 SYNC_MASTER,并没有配置 slave Broker,就会收到此状态码。

5.1.3.4 SEND_OK

这个状态可以简单理解为,没有发生上面列出的三个问题状态就是SEND_OK,需要注意的是,SEND_OK 并不意味着可靠,如果想严格确保没有消息丢失,需要开启 SYNC_MASTER or SYNC_FLUSH

5.1.3.5 注意事项

如果收到了 FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT,意味着消息会丢失,有2个选择,一是无所谓,适用于消息不关紧要的场景,二是重发,但可能产生消息重复,这就需要consumer进行去重控制,如果收到了 SLAVE_NOT_AVAILABLE 就要赶紧通知管理员了。

5.1.4 MQ发送端重试保障

如果由于网络抖动等原因,Producer程序向Broker发送消息时没有成功,即发送端没有收到Broker的ACK,导致最终Consumer无法消费消息,此时RocketMQ会自动进行重试。

DefaultMQProducer可以设置消息发送失败的最大重试次数,并可以结合发送的超时时间来进行重试的处理,具体API如下:

//设置消息发送失败时的最大重试次数
public void setRetryTimesWhenSendFailed(int retryTimesWhenSendFailed) {
   this.retryTimesWhenSendFailed = retryTimesWhenSendFailed;
}
 
//同步发送消息,并指定超时时间
public SendResult send(Message msg,
                      long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
   return this.defaultMQProducerImpl.send(msg, timeout);
5.1.4.1 重试解惑

超时重试针对网上说的超时异常会重试的说法都是错误的

是因为下面测试代码的超时时间设置为5毫秒 ,按照正常肯定会报超时异常,但设置1次重试和3000次的重试,虽然最终都会报下面异常,但输出错误时间报显然不应该是一个级别,但测试发现无论设置的多少次的重试次数,报异常的时间都差不多。

测试代码

public class RetryProducer {
    public static void main(String[] args) throws UnsupportedEncodingException, InterruptedException, RemotingException, MQClientException, MQBrokerException {
        //创建一个消息生产者,并设置一个消息生产者组
        DefaultMQProducer producer = new DefaultMQProducer("rocket_test_consumer_group");

        //指定 NameServer 地址
        producer.setNamesrvAddr("127.0.0.1:9876");
        //设置重试次数(默认2次)
        producer.setRetryTimesWhenSendFailed(300000);
        //初始化 Producer,整个应用生命周期内只需要初始化一次
        producer.start();
        Message msg = new Message(
                /* 消息主题名 */
                "topicTest",
                /* 消息标签 */
                "TagA",
                /* 消息内容 */
                ("Hello Java demo RocketMQ ").getBytes(RemotingHelper.DEFAULT_CHARSET));
        //发送消息并返回结果,设置超时时间 5ms 所以每次都会发送失败
        SendResult sendResult = producer.send(msg, 5);

        System.out.printf("%s%n", sendResult);
        // 一旦生产者实例不再被使用则将其关闭,包括清理资源,关闭网络连接等
        producer.shutdown();
    }
}
揭晓答案

针对这个疑惑,需要查看源码,发现同步发送的时候,超时是不重试的

/**
  * 说明 抽取部分代码
  */
private SendResult sendDefaultImpl(Message msg, final CommunicationMode communicationMode, final SendCallback sendCallback, final long timeout) {

    //1、获取当前时间
    long beginTimestampFirst = System.currentTimeMillis();
    long beginTimestampPrev ;
    //2、去服务器看下有没有主题消息
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        boolean callTimeout = false;
        //3、通过这里可以很明显看出 如果不是同步发送消息 那么消息重试只有1次
        int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
        //4、根据设置的重试次数,循环再去获取服务器主题消息
        for (times = 0; times < timesTotal; times++) {
            MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
            beginTimestampPrev = System.currentTimeMillis();
            long costTime = beginTimestampPrev - beginTimestampFirst;
            //5、前后时间对比 如果前后时间差 大于 设置的等待时间 那么直接跳出for循环了 这就说明连接超时是不进行多次连接重试的
            if (timeout < costTime) {
                callTimeout = true;
                break;

            }
            //6、如果超时直接报错
            if (callTimeout) {
                throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
            }
        }
    }

异步重试

可以通过以下代码设置异步从重试次数,默认两次

producer.setRetryTimesWhenSendAsyncFailed(2);

 异步重试是通过递归的方式来进行重试的

MQClientAPIImpl#sendMessageAsync

private void sendMessageAsync(... ) throws InterruptedException, RemotingException {
        final long beginStartTime = System.currentTimeMillis();
       .....
  producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
} catch (Exception e) {
    producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
    //发送重试请求
    onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
                    retryTimesWhenSendFailed, times, e, context, true, producer);
}
.....

MQClientAPIImpl#onExceptionImpl

private void onExceptionImpl(...){
    //获取到当前重试次数
    int tmp = curTimes.incrementAndGet();
    //是否需要重试,并且重试次数小于timesTotal
     if (needRetry && tmp <= timesTotal) {
         ...
         try {
             request.setOpaque(RemotingCommand.createNewRequestId());
             sendMessageAsync(addr, retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance,
                              timesTotal, curTimes, context, producer);
         } catch (InterruptedException e1) {
             onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
                             context, false, producer);
             ...
     }
    
}
5.1.4.2 重试总结

通过这段源码很明显可以看出以下几点

  1. 如果是异步发送默认重试次数是两次,通过递归的方式进行重试
  2. 对于同步而言,超时异常也是不会再去重试
  3. 同步发送重试是在一个for 循环里去重试,所以它是立即重试而不是隔一段时间去重试。
5.1.5 禁止自动创建topic
5.1.5.1 自动创建TOPIC流程

autoCreateTopicEnable设置为true 标识开启自动创建topic

  1. 消息发送时如果根据topic没有获取到 路由信息,则会根据默认的topic去获取,获取到路由信息后选择一个队列进行发送,发送时报文会带上默认的topic以及默认的队列数量。
  2. 消息到达broker后,broker检测没有topic的路由信息,则查找默认topic的路由信息,查到表示开启了自动创建topic,则会根据消息内容中的默认的队列数量在本broker上创建topic,然后进行消息存储。
  3. broker创建topic后并不会马上同步给namesrv,而是每30进行汇报一次,更新namesrv上的topic路由信息,producer会每30s进行拉取一次topic的路由信息,更新完成后就可以正常发送消息,更新之前一直都是按照默认的topic查找路由信息。
5.1.5.2 为什么不能开启自动创建

上述 broker 中流程会有一个问题,就是在producer更新路由信息之前的这段时间,如果消息只发送到了broker-a,则broker-b上不会创建这个topic的路由信息,broker互相之间不通信,当producer更新之后,获取到的broker列表只有broker-a,就永远不会轮询到broker-b的队列(因为没有路由信息),所以我们生产通常关闭自动创建broker,而是采用手动创建的方式。

5.1.6 发端端规避

注意了,这里我们发现,有可能在实际的生产过程中,我们的 RocketMQ 有几台服务器构成的集群

其中有可能是一个主题 TopicA 中的 4 个队列分散在 Broker1、Broker2、Broker3 服务器上。

如果这个时候 Broker2 挂了,我们知道,但是生产者不知道(因为生产者客户端每隔 30S 更新一次路由,但是 NamServer 与 Broker 之间的心跳检测间隔是 10S,所以生产者最快也需要 30S 才能感知 Broker2 挂了),所以发送到 queue2 的消息会失败,RocketMQ 发现这次消息发送失败后,就会将 Broker2排除在消息的选择范围,下次再次发送消息时就不会发送到 Broker2,这样做的目的就是为了提高发送消息的成功率。

5.1.6.1 问题梳理

例如在发送之前 sendWhichQueue 该值为 broker-a 的 q1,如果由于此时 broker-a 的突发流量异常大导致消息发送失败,会触发重试,按照轮循机制,下一个选择的队列为 broker-a 的 q2 队列,此次消息发送大概率还是会失败,即尽管会重试 2 次,但都是发送给同一个 Broker 处理,此过程会显得不那么靠谱,即大概率还是会失败,那这样重试的意义将大打折扣。

故 RocketMQ 为了解决该问题,引入了故障规避机制,在消息重试的时候,会尽量规避上一次发送的 Broker,回到上述示例,当消息发往 broker-a q1 队列时返回发送失败,那重试的时候,会先排除 broker-a 中所有队列,即这次会选择 broker-b q1 队列,增大消息发送的成功率。

上述规避思路是默认生效的,即无需干预。

5.1.6.2 规则策略

但 RocketMQ 提供了两种规避策略,该参数由 sendLatencyFaultEnable 控制,用户可干预,表示是否开启延迟规避机制,默认为不开启。(DefaultMQProducer中设置这两个参数)

  • sendLatencyFaultEnable 设置为 false:默认值,不开启,延迟规避策略只在重试时生效,例如在一次消息发送过程中如果遇到消息发送失败,规避 broekr-a,但是在下一次消息发送时,即再次调用 DefaultMQProducer 的 send 方法发送消息时,还是会选择 broker-a 的消息进行发送,只要继续发送失败后,重试时再次规避 broker-a。
  • sendLatencyFaultEnable 设置为 true:开启延迟规避机制,一旦消息发送失败会将 broker-a “悲观”地认为在接下来的一段时间内该 Broker 不可用,在为未来某一段时间内所有的客户端不会向该 Broker 发送消息,这个延迟时间就是通过 notAvailableDuration、latencyMax 共同计算的,就首先先计算本次消息发送失败所耗的时延,然后对应 latencyMax 中哪个区间,即计算在 latencyMax 的下标,然后返回 notAvailableDuration 同一个下标对应的延迟值。
5.1.6.3 注意事项

如果所有的 Broker 都触发了故障规避,并且 Broker 只是那一瞬间压力大,那岂不是明明存在可用的 Broker,但经过你这样规避,反倒是没有 Broker 可用来,那岂不是更糟糕了?针对这个问题,会退化到队列轮循机制,即不考虑故障规避这个因素,按自然顺序进行选择进行兜底。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/573626.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

springBoot集成flowable

前言 Flowable可以十分灵活地加入你的应用/服务/构架。可以将JAR形式发布的Flowable库加入应用或服务&#xff0c;来嵌入引擎。 以JAR形式发布使Flowable可以轻易加入任何Java环境&#xff1a;Java SE&#xff1b;Tomcat、Jetty或Spring之类的servlet容器&#xff1b; JBoss…

网工内推 | 上市公司网络运维,大专可投,NA/NP认证优先

01 珠海世纪鼎利科技股份有限公司 招聘岗位&#xff1a;网络运维工程师 职责描述&#xff1a; 1、负责服务器安装、维护和设备管理&#xff1b; 2、负责应用系统的部署&#xff0c;升级&#xff0c;维护&#xff1b; 3、分析网络数据&#xff0c;排查网络故障及事务的应急响应…

百度网盘svip白嫖永久手机2024最新教程

百度网盘&#xff08;原名百度云&#xff09;是百度推出的一项云存储服务&#xff0c;已覆盖主流PC和手机操作系统&#xff0c;包含Web版、Windows版、Mac版、Android版、iPhone版和Windows Phone版。用户将可以轻松将自己的文件上传到网盘上&#xff0c;并可跨终端随时随地查看…

【C语言】红黑树详解以及C语言模拟

一、红黑树的性质二、红黑树的旋转操作三、红黑树的插入操作四、红黑树的删除操作五、红黑树的应用六、C语言模拟红黑树七、总结 红黑树是一种自平衡二叉查找树&#xff0c;它能够保持树的平衡&#xff0c;从而确保查找、插入和删除的最坏情况时间复杂度为O( l o g n log_n log…

软考-系统集成项目管理中级--项目人力资源管理(输入输出很重要!!!本章可能包含案例题)

本章历年考题分值统计 本章重点常考知识点汇总清单(学握部分可直接理解记忆) 10、项目沟通管理计划一般应包括以下内容:(掌握)12上59&#xff0c;10下59&#xff0c;13上53&#xff0c;13上57,14下58&#xff0c;15下58&#xff0c;16上59 考题 (1)干系人的沟通需求。 (2)针对沟…

爬虫抓取网站数据

Fiddler 配置fiddler工具结合浏览器插件 配置fiddler Tools--Options 抓包技巧 谷歌浏览器开启无痕浏览,使用SwitchyOmega配置好代理端口 Ctrl x 清理所有请求记录,可以删除指定不需要日志方便观察 设置按请求顺序 观察cookie,观察请求hesder cookie和row返回结果 Swit…

C++/QT + Mysql + Tcp 企业协作管理系统

目录 一、项目介绍 二、项目展示 三、源码获取 一、项目介绍 1、项目概要&#xff1a;C/S架构、数据库Mysql、C、QT&#xff1b;支持实时通信、局域网内通信&#xff0c;可多个客户端同时登录&#xff1b; 2、&#xff08;Server&#xff09;管理端&#xff1a;用户管理、…

科技云报道:AIGC掀算力需求革命,边缘计算将不再“边缘”

科技云报道原创。 随着以大模型为代表的AIGC时代拉开序幕&#xff0c;算力需求持续爆发&#xff0c;AI与边缘深度融合已是大势所趋&#xff0c;越来越多的企业开始积极布局GenAI。 GenAI技术的商用化部署和应用成为企业竞逐的新阵地&#xff0c;勾勒出大模型从“技术力”转向…

数组模拟几种基本的数据结构

文章目录 数组模拟单链表数组模拟双链表数组实现栈数组模拟队列总结 数组模拟单链表 首先类比结构体存储单链表&#xff0c;我们需要一个存放下一个节点下标的数组&#xff0c;还需要一个存储当前节点的值的数组&#xff0c;其次就是一个int类型的索引&#xff0c;这个索引指向…

挑战一周完成Vue3实战项目硅谷甄选Day1:项目初始化、项目配置、项目集成

一、项目初始化 node v16.4.0以上&#xff08;查看node版本 : node -v&#xff09; pnpm 8.0.0&#xff08;npm i -g pnpm8.0.0&#xff09; 在想创建的位置新建文件夹自己命名 在此文件夹下cmd:pnpm create vite 选择如下配置 Project name&#xff08;项目名称&#xff0…

Java设计模式 _创建者模式_工厂模式(普通工厂和抽象工厂)

一、工厂模式 属于Java设计模式创建者模式的一种。在创建对象时不会对客户端暴露创建逻辑&#xff0c;并且是通过使用一个共同的接口来指向新创建的对象。 二、代码示例 场景&#xff1a;花店有不同的花&#xff0c;通过工厂模式来获取花。 1、普通工厂模式 逻辑步骤&#…

Spring - 5 ( 8000 字 Spring 入门级教程 )

一&#xff1a;Spring IoC&DI 1.1 方法注解 Bean 类注解是添加到某个类上的&#xff0c; 但是存在两个问题: 使用外部包里的类, 没办法添加类注解⼀个类, 需要多个对象, ⽐如多个数据源 这种场景, 我们就需要使用方法注解 Bean 我们先来看方法注解如何使用: public c…

AI-数学-高中-42导数的概念与意义

原作者视频&#xff1a;【导数】【一数辞典】1导数的概念与意义_哔哩哔哩_bilibili .a是加速度&#xff1b;

OpenAI 笔记:获取embedding

1 输入openai的api key from openai import OpenAIclient OpenAI(api_key**) 2 举例 response client.embeddings.create(input"Hello",model"text-embedding-3-small" )print(response.data[0].embedding) 默认情况下&#xff0c;text-embedding-3-s…

配置Linux【虚拟机】与 windows【宿主机】网络互通 (面向小白,简单操作)

1. 启动虚拟机&#xff0c;运行Linux系统 这里我使用 VMware Workstation Pro 来运行Linux系统&#xff08;cent-os7&#xff09;2. 鼠标右键打开终端 3. 输入 cd /etc/sysconfig/network-scripts , 然后输入ls &#xff0c;查看当前目录下的网卡 一般来说&#xff0c;虚拟机的…

计算机网络基础认识

本篇文章是我在B站上看到关于计算机网络的介绍视频收到的启发。本篇文章的内容来自【网络】半小时看懂<计算机网络>_哔哩哔哩_bilibili 一、物理层 从常理来说&#xff0c;进行连个设备之间的通讯&#xff0c;首先最容易想到的就是使用一根线连接两个设备进行通讯。但是…

Docker的数据管理、网络通信和dockerfile

目录 一、Docker的数据管理 1. 数据卷 1.1 数据卷定义 1.2 数据卷配置 2. 数据卷容器 2.1 创建数据卷容器 2.2 使用--volume-from来挂载test1 二、端口映射 三、容器互联 1. 创建容器互联 ​编辑2. 进入test2测试&#xff08;ping 容器名/别名&#xff09; 四、Dock…

【弱监督点云分割】All Points Matter:用于弱监督三维分割的熵细化分布对齐

All Points Matter: Entropy-Regularized Distribution Alignment for Weakly-supervised 3D Segmentation 摘要&#xff1a; 伪标签被广泛应用于弱监督三维分割任务中&#xff0c;在这种任务中&#xff0c;只有稀疏的地面真实标签可供学习使用。现有方法通常依赖经验标签选择…

用立创EDA实现一个小项目

项目介绍 名称&#xff1a;蓝牙音响 功能&#xff1a;按键切换 蓝牙控制 语音控制 项目流程 市场调研产品立项----老板--经理硬件&#xff08;外观、尺寸、大小、使用环境&#xff09;软件&#xff08;代码开发环节&#xff09;产品测试 以管理员身份运行 新建文件夹&…
最新文章