RabbitMq:RabbitMq消息中的相关处理 ③

一、解耦思想

        在 RabbitMQ 在设计的时候,特意让生产者和消费者分离,也就是消息的发布和消息的消费之间是解耦的。

        生产者与消费者之间的直连,少了很多的灵活性和策略的制定。还有一种解耦的思想存在。

  二、消息的可靠性保证与性能关系

        消息的可靠性保证:

                                1.靠事务机制(存在事务的几阶段过程,性能下降严重)

                                2.确定机制--异步回复机制(轻量级处理机制)

        相比之下,发送方确认机制最大的好处在于它提供异步回复机制,一旦发布一条消息生产者程序可以在等待信道返回确认的同时继续发布下一条消息,当消息得到最终确认之后,生产者程序可以通过回调 方法来处理该确认消息,

confirm的三种实现方式

        方式一:channel.waitForConfirms()普通发送方确认模式;消息到达交换器,就会返回 true。

        方式二:channel.waitForConfirmsOrDie()批量confirm模式,每发送一批消息之后,调用waitForConfirms()方法,等待服务端confirm,这种批量确认的模式极大的提高了confirm效率,但只要有一个消息未到达交换器就会抛出 IOException 异常。客户端往往需要将这一批次的消息全部重发,这会带来明显的重复消息,如果这种情况频繁发生的话(网络不稳定),效率也会不升反降;

        方式三:channel.addConfirmListener()异步监听发送方确认模式; 通过监听器的返回效果和批量confirm模式类似,都是每隔一段时间确认一批消息,区别就是不会造成生产者阻塞,但依旧会导致重复消息

        在 RabbitMQ 中,有不同的投递机制(生产者),但是每一种机制都对性能有一定的影响。一般来讲速度快的可靠性低,可靠性好的性能差,具体怎么使用需要根据你的应用程序来定,所以说没有最好的方式,只有最合适的方式。只有把你的项目和技术相结合,才能找到适合你的平衡。

三、消息推拉模式

        Consumer消费消息,并向服务器进行应答。表明这个消息已经消费完了还是可以继续让别人消费。主要收集了两种消费方式

推模式(push)

        1:推模式接收消息是最有效的一种消息处理方式。channel.basicConsume(queneName,consumer)方法将信道(channel)设置成投递模式,直到取消队列的订阅为止;在投递模式期间,当消息到达RabbitMQ时,RabbitMQ会自动地、不断地投递消息给匹配的消费者,而不需要消费端手动来拉取,当然投递消息的个数还是会受到channel.basicQos的限制。

        2:推模式将消息提前推送给消费者,消费者必须设置一个缓冲区缓存这些消息。优点是消费者总是有一堆在内存中待处理的消息,所以当真正去消费消息时效率很高。缺点就是缓冲区可能会溢出。

        3:由于推模式是信息到达RabbitMQ后,就会立即被投递给匹配的消费者,所以实时性非常好,消费者能及时得到最新的消息。

        4:Consumer等待rabbitMQ 服务器将message推送过来再消费。一般是启一个一直挂起的线程来等待。

拉模式(pull)

        1:如果只想从队列中获取单条消息而不是持续订阅,则可以使用channel.basicGet方法来进行消费消息。

        2:拉模式在消费者需要时才去消息中间件拉取消息,这段网络开销会明显增加消息延迟,降低系统吞吐量。 

        3:由于拉模式需要消费者手动去RabbitMQ中拉取消息,所以实时性较差;消费者难以获取实时消息,具体什么时候能拿到新消息完全取决于消费者什么时候去拉取消息。

结论

1:不能在循环中使用拉模式来模拟推模式,因为拉模式每次都需要去消息中间件中拉取消息来消费,所以会严重影响RabbitMQ性能。

2:要想实现高吞吐量,消费者需要使用推模式。

  • 其次,官方特意提到的message的持久性。关键的message不能因为服务出现问题而被忽略。还要注意,官方特意提到,所有的queue是不能被多次定义的。如果一个queue在开始时被声明为durable,那在后面再次声明这个queue时,即使声明为 not durable,那这个queue的结果也还是durable的。
  • 然后,是中间件最为关键的分发方式。这里,RabbitMQ默认是采用的fair dispatch,也叫round-robin模式,就是把消息轮询,在所有consumer中轮流发送。这种方式,没有考虑消息处理的复杂度以及consumer的处理能力。而他们改进后的方案,是consumer可以向服务器声明一个prefetchCount,我把他叫做预处理能力值。channel.basicQos(prefetchCount);表示当前这个consumer可以同时处理几个message。这样服务器在进行消息发送前,会检查这个consumer当前正在处理中的message(message已经发送,但是未收到consumer的basicAck)有几个,如果超过了这个consumer节点的能力值,就不再往这个consumer发布。
    这种模式,官方也指出还是有问题的,消息有可能全部阻塞,所有consumer节点都超过了能力值,那消息就阻塞在服务器上,这时需要自己及时发现这个问题,采取措施,比如增加consumer节点或者其他策略
    *Note about queue size
    If all the workers are busy, your queue can fill up. You will want to keep an eye on that, and maybe add more workers, or have some other strategy.*
    另外 官网上没有深入提到的,就是还是没有考虑到message处理的复杂程度。有的message处理可能很简单,有的可能很复杂,现在还是将所有message的处理程度当成一样的。还是有缺陷的,但是目前也只看到dubbo里对单个服务有权重值的概念,涉及到了这个问题。

四、消息的丢失      

1、producer生产者丢失消息
        原因:生产者发送消息由于网络等原因并没有发送到RabbitMq
解决方案:
        1.1、开启RabbitMq事务机制
        生产者发送数据之前开启 RabbitMQ 事务channel.txSelect,然后发送消 息,如果消息没有成功被 RabbitMQ 接收到,那么生产者会收到异常报错,此时就可以回滚事务channel.txRollback,然后重试发送消息;如果收到了消息,那么可以提交事务channel.txCommit,类似我们数据库数据库事务机制。

        1.2、开启 confirm 模式
        在生产者端设置开启 confirm 模式之后,你每次写的消息都会分配一个唯一的 ID,然后如果写入了 RabbitMQ 中,RabbitMQ 会给你回传一个 ack 消息,告诉你说这个消息已经收到。如果 RabbitMQ 没能处理这个消息,会回调你的一个 nack 接口,告诉你这个消息接收失败,你可以重试。而且可以结合这个机制在自己业务里维护每个消息 ID 的状态,如果超过一定时间还没接收到这个消息的回调,那么可以业务主动重发。

        事务机制和 confirm 机制优劣:
        事务机制是同步的,提交一个事务之后会阻塞,吞吐量会下来,耗性能。
        confirm 机制是异步的,流程不会阻塞,吞吐量较高,性能较好。

2、broker消息中间件自身丢失消息
        原因:RabbitMq收到生产者的消息后还没有来得及持久化到磁盘,又或者创建队列没有持久化以及消息并没有设置为持久化,在Mq故障宕机后都会有消息丢失的情况。
        解决方案:
        2.1、创建队列queue的时候设置队列持久化
        2.2、mq配置deliveryMode == 2 消息持久化

        重点:必须同时设置队列持久化和消息持久化,再结合生产者的confrim模式,才能保证消息准确投递到broker并保证进入磁盘。

3、consumer消费者丢失消息
        原因:消费者自动ack配置情况下,业务代码异常或者其他故障消息并没有处理完成也会自动ack。RabbitMq消息ack后就会丢弃,这就导致异常情况下的消息丢失了。
        解决方案:
        3.1 关闭RabbitMq自动ack,业务代码成功消费了消息手动调用Mq ack,让Mq丢弃消息;如果业务代码异常则直接nack,让Mq重新推送消息进行处理。当然,在要求比较高的情况下也可以异常数据进入死信队列,保证数据的完整性。

        

五、消息的重复消费/消费顺序

        什么情况下会造成消息重复呢?

        假设我们的消费者在执行完任务后,准备显式调用 basicAck给RabbitMQ删除消息时。此时消费者宕机了。

        此时我们的RabbitMQ检测到自己队列所对应的消费者掉线了,就会将这个消息重新入队并等待下一个消费者连接进来后继续投递消费。那么问题就很明显了,上一个消费者已经消费了消息,而新连接的消费者再次消费了该消息,导致了重复消费。

        无论如何,手动提交的方式也已经解决了最大的麻烦(消息丢失的问题)。

QoS 预取模式(重复消息)
        在确认消息被接收之前,消费者可以预先要求接收一定数量的消息,在处理完一定数量的消息后,批量进行确认。如果消费者应用程序在确认消息之前崩溃,则所有在RabbitMQ中未确认的消息将被重新发送给其他消费者。很明显,这样也会导致重复消息的产生。

        我们不难发现,无论是生产者还是消费者,但凡是批量操作,总会让重复消息的概率大大提升。奈何批量操作的速度快啊,因此还是要用。

        我们发现在一个消息从生产者到消费者的过程中,至少有三种情况导致重复消息的产生。那么重复消息该如何处理呢?

消费者的事务
        当然,我们的消费者端也可以采用事务的方式来确保消息的准确性。但是依旧是效率问题导致这个方式非常鸡肋,不再赘述了。

消息幂等性处理重复消息
        让每个消息携带一个全局的唯一ID,即可保证消息的幂等性,具体消费过程为:

        消费者获取到消息后先根据id去查询redis/db是否存在该消息(或者放到内存容器中)。
        如果不存在,则正常消费,消费完毕后写入redis/db
        如果存在,则证明消息被消费过,直接丢弃。
原来说的那么高大上,无非就是交给我们消费者自己去做去重校验。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/410694.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Nacos简易示例

目录 步骤: 1. 下载并启动 Nacos Server 2. 创建用户订单微服务 2.1 创建 Spring Boot 项目 2.2 添加依赖 2.3 配置 Nacos 2.4 编写业务逻辑 3. 注册服务到 Nacos 4. 测试服务 Nacos 是一个开源的服务发现和配置管理系统,可以用于微服务架构中的…

QT数据库

八、数据库 准备工作 Qt本身并没有数据库功能,但是Qt支持调用其他主流的数据库产品。并且这些数据库产品指定了统一的Qt接口,实际上是一种数据库的中间件。 Qt支持以下数据库类型: 嵌入式常用的数据库式SQLite3,本体只有几兆大小&…

Linux进程 ----- 信号处理

前言 从信号产生到信号保存,中间经历了很多,当操作系统准备对信号进行处理时,还需要判断时机是否 “合适”,在绝大多数情况下,只有在 “合适” 的时机才能处理信号,即调用信号的执行动作。 一、信号的处理…

什么是代码签名证书中的“硬证书”?

代码签名证书是用于验证和签名软件程序的一种数字证书。使用代码签名证书,可以保护代码完整性、防止非法篡改,标识软件发行商的身份并确保软件来源可信。按不同验证级别,代码签名证书分为扩展验证型EV代码签名证书、企业验证型OV代码签名证书…

babylonjs入门模

基于babylonjs封装的一些功能和插件 ,希望有更多的小伙伴一起玩babylonjs; 欢迎加群:464146715 官方文档 中文文档 最小模版 ​ 代码如下: 在react中使用 import React, { FC, useCallback, useEffect, useRef, useState } f…

LINE封号全面解析:原因、判断方法与申诉渠道

在LINE中被封锁有两个方面:一是你被好友屏蔽,另一是遭到平台官方的封锁。通常用户会用“停权”来代表LINE的官方封锁,在实际操作上,所谓的停权并不意味着你的账户完全无法使用,只是没办法与好友发送消息,更…

聊一聊EGO-Planner膨胀系数的大小对无人机避障飞行的影响

EGO-Planner简介 EGO-Planner作为业界知名的无人机轨迹规划算法,其优势在于能够在复杂环境中快速规划出安全、平滑且动态可行的飞行轨迹。在这个算法中,膨胀系数发挥着关键作用。它通过扩大障碍物的感知范围,提供额外的安全边距,…

如何使用Lychee+cpolar搭建本地私人图床并实现远程访问存储图片

文章目录 1.前言2. Lychee网站搭建2.1. Lychee下载和安装2.2 Lychee网页测试2.3 cpolar的安装和注册 3.本地网页发布3.1 Cpolar云端设置3.2 Cpolar本地设置 4.公网访问测试5.结语 1.前言 图床作为图片集中存放的服务网站,可以看做是云存储的一部分,既可…

c++ Qt 网络连接

1、基础概念 1.1 TCP/UDP TCP 是一种面向连接的传输层协议,它能提供高可靠性通信(即数据无误、数据无丢失、 数据无失序、数据无重复到达的通信) 适用情况: 1.SN/QQ等即时通讯软件的用户登录账户管理相关的功能通常采用TCP协议 2、适合于对传输质量要求较…

网站开发--详解Servlet

💕"Echo"💕 作者:Mylvzi 文章主要内容:网站开发–详解Servlet 一.基本介绍 tomcat是Java中开发服务器的重要的一个工具,任何开发的服务器都要部署在tomcat之上,可以说tomcat是所有服务器的底座,为了更好的操作http,to…

python 进程笔记一 (概念+示例代码)

1. 进程的概念 进程是资源分配的最小单位,也是线程的容器,线程(python 线程 (概念示例代码))是CPU调度的基本单位,一个进程包括多个线程。 程序:例如xxx.py是一个程序 进程&#xf…

C++初阶 | [八] (下) vector 模拟实现

摘要:vector 模拟实现讲解(附代码示例),隐藏的浅拷贝,迭代器失效 在进行 vector 的模拟实现之前,我们先粗略浏览一下 stl_vector.h 文件中的源码来确定模拟实现的大体框架。 这里提供一些粗略浏览源码的技巧…

如何使用GAP-Burp-Extension扫描潜在的参数和节点

关于GAP-Burp-Extension GAP-Burp-Extension是一款功能强大的Burp扩展,该工具在getAllParams扩展的基础上进行了升级,该工具不仅可以帮助广大研究人员在安全审计过程中扫描潜在的参数,而且还可以搜索潜在的链接并使用这些参数进行测试&#…

HarmonyOS—代码Code Linter检查

Code Linter代码检查 Code-Linter针对ArkTS/TS代码进行最佳实践、编程规范方面的检查,目前还会检查ArkTS语法规则。开发者可根据扫描结果中告警提示手工修复代码缺陷,或者执行一键式自动修复,在代码开发阶段,确保代码质量。 检查…

Linux之项目部署与发布

目录 一、Nginx配置安装(自启动) 1.一键安装4个依赖 2. 下载并解压安装包 3. 安装Nginx 4. 启动 nginx 服务 5. 对外开放端口 6. 配置开机自启动 7.修改/etc/rc.d/rc.local的权限 二、后端部署tomcat负载均衡 1. 准备2个tomcat 2. 修改端口 3…

【Vue3】学习watch监视:深入了解Vue3响应式系统的核心功能(上)

💗💗💗欢迎来到我的博客,你将找到有关如何使用技术解决问题的文章,也会找到某个技术的学习路线。无论你是何种职业,我都希望我的博客对你有所帮助。最后不要忘记订阅我的博客以获取最新文章,也欢…

基于Java的艺培管理解决方案

✍✍计算机毕业编程指导师 ⭐⭐个人介绍:自己非常喜欢研究技术问题!专业做Java、Python、微信小程序、安卓、大数据、爬虫、Golang、大屏等实战项目。 ⛽⛽实战项目:有源码或者技术上的问题欢迎在评论区一起讨论交流! ⚡⚡ Java、…

抖音小程序获取手机号

1、* 手机号获取和登录需要分开 &#xff08;规定&#xff09; 2、 抖音小程序首先得先通过试运营 没有通过试运营的 会提示没有权限 getPhoneNumber:fail auth deny 3、上代码 <button class"phone toutiaoSq" v-if"!userInfo.phone && isLogin&…

[AutoSar]BSW_Com03 DBC详解 (一)

目录 关键词平台说明一、DBC 定义1.1 相关工具 二、主要组成部分介绍2.1 Networks2.2 ECUs2.3 Network nodes2.4 messages2.5 signal2.6 Value Tables 三、主要组成部分关系图 关键词 嵌入式、C语言、autosar、OS、BSW 平台说明 项目ValueOSautosar OSautosar厂商vector &am…

本地项目如何上传到gitee

文章目录 一、在gitee上新建远程仓库二、初始化本地仓库三、执行git命令上传代码 一、在gitee上新建远程仓库 仓库名称必填&#xff0c;路径自动跟仓库名称保持一致 解释说明&#xff1a; 仓库名称&#xff1a;必填&#xff0c;每个仓库都需要有一个名称&#xff0c;同一个码…
最新文章