如何确保消息不会丢失

本篇文章大家还可以通过浏览我的博客阅读。如何确保消息不会丢失 - 胤凯 (oyto.github.io)


很多人刚开始接触消息队列的时候,最经常遇到的一个问题就是丢消息了。<!--more-->对于大部分业务来说,丢消息意味着丢数据,是完全无法接受的。

现在很多主流的消息队列都实现了完善的消息可靠性保证机制,即使发生网络中断或者硬件故障,也能确保消息的可靠传递,不丢消息。

所以,绝大部分丢消息的原因都是开发者不熟悉消息队列,没有正确地使用和配置造成的。下面我们一起来了解下消息队列是如何保证消息可靠传递的,只要熟知原理,就能很快知道如何配置消息队列,写出可靠的代码,避免消息丢失。

检查消息队列的方法

用消息队列最尴尬的情况不是丢消息,而是丢了消息还不知道。对于一个刚刚上线的系统,各方面肯定都不是很稳定,这个时候就特别许需要监控系统中是否有消息丢失的情况。

如果是对于一些基础设施比较完善的公司,可以使用分布式链路追踪系统,很方便地追踪每一条消息。如果没有的话,下面提供一种简答的方法,来检查是否有消息丢失的情况。

我们可以使用消息队列的有序性来验证是否有消息丢失。原理很简单,在 Producer 端,我们给每个发出的消息附加一个连续递增的序列号,在 Consumer 端来检查这个序列号的连续性。

如果没有消息队列,Consumer 端收到消息的序列号必然是递增的,或者说收到的消息,其中的序号必然是上一条消息的序号 + 1.如果检测到序号不连续,那就是丢消息了。还可以通过缺失的序号确定丢失的是哪条消息,方便进一步排查原因和补救。

大多数消息队列的客户端都支持拦截器,可以利用这个拦截器机制,在 Producer 发送消息之前的拦截器中将序号注入到消息中,在 Consumer 收到消息的拦截器中检测序号的连续性。这样实现的好处是:消息检测的代码不会入侵到业务代码中,待系统稳定后,也方便将这部分代码检测的逻辑关闭或者删除。

如果在一个分布式系统中实现了这个检测机制,有以下几个问题需要注意:

  1. 像 Kafka 和 RocketMQ 这样的消息队列,它是不保证在 Topic 上的严格顺序的 ,只能保证队列或分区上的消息是有序的,所以我们在发送消息的时候要指定分区,并且每个分区单独检测消息序号的连续性。

  2. 如果系统中 Producer 是多实例的,由于多个 Producer 并不好协调彼此之间的发送顺序,所以每个 Producer 分别生成各自的消息序号,并且需要附加 Producer 标识,在 Consumer 端按照每个 Producer 分别来检测序号的连续性。

  3. Consumer 实例的数量最好是和分区数量一致,做到 Consumer 和分区一一对应,这样会比较方便在 Consumer 内检测消息序号的连续性。

确保信息可靠传递

上面讲述了如何检测消息丢失,下面再来看看什么时候会发生消息丢失,该如何避免。

消息从生产到消费完成整个过程,可以分为下面三个阶段:

  • 生产阶段:在这个阶段,消息在 Producer 创建出来,经过网络传输发送到 Broker 端。

  • 存储阶段:在这个阶段,消息在 Broker 端存储,如果是集群,消息会在这个阶段被复制到其他的副本上。

  • 消费阶段:在这个阶段,Consumer 从 Broker 上拉取消息,经过网络传输发送到 Consumer 上。

1、生产阶段

在生产阶段,消息队列通过最常用的请求确认机制来保证消息的可靠传递:代码中调用发送消息的方法时,消息队列的客户端会把消息发送到 Broker,Broker 收到消息后,会给客户端返回一个确认响应,表明消息已经收到。客户端收到响应后,一次正常的消息发送就完成了。

只要 Producer 收到了 Broker 的确认响应,就可以保证消息在生产阶段不会丢失。有些消息队列在长时间没收到确认响应后,会自动重试,如果还是失败,就会以返回值或者异常的方式告知调用方。

在编写发送消息代码时,通过正确处理返回值或者捕获异常,就可以保证这个阶段的消息不会丢失了。以 Kafka 为例,我们看一下如何可靠地发送消息:

同步发送时,只要注意捕获异常即可。

partition, offset, err := producer.SendMessage(message)
if err != nil {
    fmt.Println("消息发送失败:", err)
} else {
    fmt.Printf("消息发送成功,分区:%d, 偏移:%d\n", partition, offset)
}

异步发送时,需要去异步检查返回值,并进行处理:

producer.Input() <- message
​
select {
case <-producer.Successes():
    fmt.Println("消息发送成功")
case err := <-producer.Errors():
    fmt.Println("消息发送失败:", err.Err)
}
2、存储阶段

在存储阶段,只要 Borker 不出现故障,比如进程死掉了或者服务器宕机了,就不会出现丢失消息的问题。但如果出现了的话, 还是可能会丢失消息的。

如果对于消息的可靠性要求非常高,可以通过配置 Broker 参数来避免因为宕机丢失消息

对于单个节点的 Broker,需要配置 Broker 参数,在收到消息后将消息写入磁盘,再给 Producer 返回确认响应。这样即使发生宕机,由于消息已经写入磁盘,恢复后还可以继续消费。例如,在 RocketMQ 中,需要将刷盘方式 flushDiskType 配置为 SYNC_FLUSH 同步刷盘。

如果 Borker 是由多个节点组成的集群,需要将 Borker 集群配置成:至少消息发送到 2 个以上的节点,再给客户端回复发送确认响应。这样当某个 Broker 宕机时,其他的 Broker 可以替代宕机的 Broker,也不会造成消息的丢失。

3、消费阶段

消费阶段采用和生产阶段类似的确认机制来保证消息的可靠传递。客户端从 Broker 拉取消息后,执行用户的消费业务逻辑,成功后,才会给 Broker 发送消费确认响应。如果 Broker 没有收到消息的消费确认响应,下次拉消息的时候还是会返回同一条消息,以此来确保消息不会在网络传输过程中丢失,也不会因为客户端执行消费逻辑中出错导致丢失。

在编写代码的过程中,需要注意,不要在收到消息后立马返回消息确认,而是应该在执行完所有消费业务逻辑之后,再发送消费确认。

同样,我们使用 golang 语言消费 RabbitMQ 消息为例,看看如何实现一段可靠的消费代码:

forever := make(chan bool)
​
go func() {
    for d := range msgs {
        body := d.Body
        fmt.Printf(" [x] 收到消息 %s\n", body)
​
        // 在这里处理收到的消息
        // 你可以在这里调用 database.save(body) 来保存消息
​
        fmt.Println(" [x] 消费完成")
​
        // 完成消费业务逻辑后发送消费确认响应
        d.Ack(false)
    }
}()
​
log.Printf("等待消息。要退出,请按 CTRL+C")
<-forever

正确的顺序时,先把消息保存到数据库中,然后再发送消费确认。这样如果保存消息失败了,就不会执行消费代码,下次拉取的还是这条消息,直到消费成功。

小结

这一篇文章,先讲述了在系统中,如果检查消息队列消息丢失的情况,然后分析了一条消息从发送到消费成功的整个过程,以及消息队列是如何确保消息的可靠性,不会丢失的。这个过程可以分为分三个阶段,每个阶段都需要正确的编写代码并且设置正确的配置项,才能配合消息队列的可靠性机制,确保消息不会丢失。

  • 在生产阶段,你需要捕获消息发送的错误,并重发消息。

  • 在存储阶段,你可以通过配置刷盘和复制相关的参数,让消息写入到多个副本的磁盘上,来确保消息不会因为某个 Broker 宕机或者磁盘损坏而丢失。

  • 在消费阶段,你需要在处理完全部消费业务逻辑之后,再发送消费确认。

知道这几个阶段的原理后,如果再出现丢消息的情况,可以通过在代码中加一些日志的方式,很快定位到是哪个阶段出了问题,然后再进一步分析,快速找到问题的原因。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/158342.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

C语言--给定一行字符串,获取其中最长单词【图文详解】

一.问题描述 给定一行字符串,获取其中最长单词。 比如&#xff1a;给定一行字符串&#xff1a; hello wo shi xiao xiao su 输出&#xff1a;hello 二.题目分析 “打擂台算法”&#xff0c;具体内容小伙伴们可以参考前面的内容。 三.代码实现 char* MaxWord(const char* str)…

CMakeLists.txt基础指令与cmake-gui生成VS项目的步骤

简介 本博客主要介绍cmake的基本指令&#xff0c;同时&#xff0c;很多使用Visual Studio小白从Gitbub下载项目源码后&#xff0c;看到CMakeLists.txt&#xff0c;不知道如何使用Visual Studio编译源码&#xff1b;针对以上问题&#xff0c;做一下简单操作与解释&#xff0c;方…

c语言-数据结构-堆

目录 一、二叉树 1、二叉树的概念 2、完全二叉树和满二叉树 3、完全二叉树的顺序存储 二、堆 2、堆的概念与结构 3、堆的创建及初始化 4、堆的插入&#xff08;小堆&#xff09; 5、堆的删除 6、显示堆顶元素 7、显示堆里的元素个数 8、测试堆的各个功能 9、 实现堆…

零代码编程:用ChatGPT批量转换多个视频文件夹到音频并自动移动文件夹

有很多个视频文件夹&#xff1a; 要全部转成音频&#xff0c;然后复制到另一个文件夹。 在ChatGPT中输入如下提示词&#xff1a; 你是一个Python编程专家&#xff0c;要完成一个批量将Mp4视频转为Mp3音频的任务&#xff0c;具体步骤如下&#xff1a; 打开文件夹&#xff1a;…

机器学习 天气识别

>- **&#x1f368; 本文为[&#x1f517;365天深度学习训练营](https://mp.weixin.qq.com/s/Nb93582M_5usednAKp_Jtw) 中的学习记录博客** >- **&#x1f356; 原作者&#xff1a;[K同学啊 | 接辅导、项目定制](https://mtyjkh.blog.csdn.net/)** >- **&#x1f680;…

matlab层次分析法模型及相关语言基础

发现更多计算机知识&#xff0c;欢迎访问Cr不是铬的个人网站 代码放在最后面! 这篇文章是学习层次分析法模型的笔记。 1.什么时候用层次分析法 层次分析法是建模比赛中最基础的模型之一&#xff0c;其主要用于解决评价类问题&#xff08;例如&#xff1a;选择哪种方案最好、…

Mysql数据库 16.SQL语言 数据库事务

一、数据库事务 数据库事务介绍——要么全部成功要么全部失败 我们把完成特定的业务的多个数据库DML操作步骤称之为一个事务 事务——就是完成同一个业务的多个DML操作 例&#xff1a; 数据库事务四大特性 原子性&#xff08;A&#xff09;&#xff1a;一个事务中的多个D…

ZYNQ7000---FLASH读写

提示&#xff1a;写完文章后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、Flash是什么&#xff1f;二、Flash的分类1、内部结构&#xff08;接口&#xff09;区分&#xff1a;2、外部接口区分&#xff1a;SPIQPSI Flash: QSPI 控制…

如何做好性能压测 —— 压测环境设计和搭建!

简介&#xff1a;一般来说&#xff0c;保证执行性能压测的环境和生产环境高度一致是执行一次有效性能压测的首要原则。有时候&#xff0c;即便是压测环境和生产环境有很细微的差别&#xff0c;都有可能导致整个压测活动评测出来的结果不准确。 1. 性能环境要考虑的要素 1.1 系…

SMART PLC星三角延时启动功能块(梯形图FC)

这里我们介绍SMART PLC星三角延时启动功能块,SMART PLC的周期定时器功能块请参考下面文章链接: 周期定时器FB_Cycle_time(SCL+梯形图代码)-CSDN博客文章浏览阅读80次。博途PLC定时器指令使用详细介绍请参考下面文章链接:博途PLC IEC定时器编程应用(SCL语言)_scl定时器-CS…

python环境安装教程

1.python解释器安装 python解释器&#xff1a;将书写的代码转换为二进制。 1.打开官网&#xff1a;Welcome to Python.org&#xff0c;点击下载&#xff0c;选择对应的系统和想要下载的python版本进行下载&#xff1a; 2.双击打开下载好的python解释器进行安装&#xff0c;可…

链表(一)----关于单链表的一切细节这里都有

一.链表 1 链表的概念及结构 概念&#xff1a;链表是一种物理存储结构上非连续、非顺序的存储结构&#xff0c;数据元素的逻辑顺序是通过链表中的指针链接次序实现的 。 现实中的链表结构 数据结构中的链表结构 1.链式结构在逻辑上是连续的&#xff0c;但在物理上不一定是…

开启数据库审计 db,extended级别或os级别)并将审计文件存放到/opt/oracle/audit/下

文章目录 1、登录到数据库2、查看审计状态3、创建审计目录4、启用审计5、设置审计文件路径5、再次查看结果 1、登录到数据库 使用SQL*Plus或者其他Oracle数据库客户端登录到数据库。 sqlplus / as sysdba;2、查看审计状态 show parameter audit;目前是DB状态&#xff0c;并且…

matplotlib 绘制双纵坐标轴图像

效果图&#xff1a; 代码&#xff1a; 由于使用了两组y axis&#xff0c;如果直接使用ax.legend绘制图例&#xff0c;会得到两个图例。而下面的代码将两个图例合并显示。 import matplotlib.pyplot as plt import numpy as npdata np.random.randint(low0,high5,size(3,4)) …

2023年最新十大地推拉新接单平台,都是一手单 官签渠道

2023年做拉新推广的地推人员&#xff0c;一定不要错过这十个接单平台&#xff0c;助你轻松找到一手单&#xff0c;这10个平台分别是 1. 聚量推客&#xff1a; “聚量推客”汇聚了众多市场上有的和没有的地推网推拉新接单项目&#xff0c;目前比较火热&#xff0c;我们做地推和…

【leaflet】学习笔记5 自定义控制层、多图层及其控制 重构

▒ 目录 ▒ &#x1f6eb; 导读开发环境 1️⃣ 重构data.js 数据抽取MyMap 面向对象编程继承MyMap类 2️⃣ d5. 自定义控制层、多图层及其控制示例效果自定义控制层多图层及其控制 &#x1f6ec; 文章小结&#x1f4d6; 参考资料 &#x1f6eb; 导读 开发环境 版本号描述文章…

电子病历编辑器源码(Springboot+原生HTML)

一、系统简介 本系统主要面向医院医生、护士&#xff0c;提供对住院病人的电子病历书写、保存、修改、打印等功能。本系统基于云端SaaS服务方式&#xff0c;通过浏览器方式访问和使用系统功能&#xff0c;提供电子病历在线制作、管理和使用的一体化电子病历解决方案&#xff0c…

CTFhub-RCE-过滤cat

查看当前目录&#xff1a;输入:127.0.0.1|ls 127.0.0.1|cat flag_42211411527984.php 无输出内容 使用单引号绕过 127.0.0.1|cat flag_42211411527984.php|base 64 使用双引号绕过 127.0.0.1|c""at flag_42211411527984.php|base64 使用特殊变量绕过 127.0.0.…

计算机毕业设计基于java+springboot+vue的实验室管理系统

项目介绍 系统中的功能模块主要是实现管理员&#xff1b;首页、个人中心、实验室管理、用户管理、实验室申请管理、设备管理、设备报备管理、设备申请管理、消耗品管理、消耗品领取管理、论坛管理、系统管理&#xff0c;用户前台&#xff1b;首页、实验室、设备、消耗品、论坛…

无需公众号实现微信JSSDK分享卡片!Safari浏览器分享到微信自动成卡片!

摘要 要在微信分享卡片&#xff0c;需要接入微信自家的JSSDK&#xff0c;比较麻烦&#xff0c;还需要认证公众号&#xff0c;但是如果你没有这样的条件&#xff0c;那么你也可以试试使用iOS的Safari浏览器轻松实现&#xff0c;只需要在html中加入3个meta即可。 代码 <!DO…
最新文章