探秘Kafka背后的幕后机关,揭示消息不丢失或重复的原理与实践经验

背景

相信大家在工作中都用过消息队列,特别是 Kafka 使用得更是普遍,业务工程师在使用 Kafka 的时候除了担忧 kafka 服务端宕机外,其实最怕如下这样两件事。

  • 消息丢失。下游系统没收到上游系统发送的消息,造成系统间数据不一致。比如,订单系统没有把成功状态的订单消息成功发送到消息队列里,造成下游的统计系统没有收到下单成功订单的消息,于是造成系统间数据的不一致,从而引起用户查看个人订单列表时跟实际不相符的问题。
  • 消息重复。相同的消息重复发送会造成消费者消费两次同样的消息,同样会造成系统间数据的不一致。比如,订单支付成功后会通过消息队列给支付系统发送需要扣款的金额,如果发送两次一样的扣款消息,而订单只支付了一次,而且还造成给用户余额多扣款的问题。

总结来说,这两个问题直接影响到业务系统间的数据一致性。那到底该如何避免这两个问题的发生呢?Kafka 针对这两个问题有系统的解决方案,需要服务端、客户端做相应的配置以及采取一些补偿方案。

因此,我会从生产端、服务端、消费端三个角度讲解 Kafka 是如何做到消息不丢或消息不重复的。当然,在这个过程中,为了有利于你更好的理解,在介绍的过程中我也会简单介绍一些 Kafka 的工作原理。

三种消息语义及场景

首先我要介绍一下“消息语义”的概念,这是理论基础,会有利于你更好地抓住下面解决方案的要点。

消息语义有三种,分别是:消息最多传递一次、消息最少传递一次、消息有且仅有一次传递,这三种语义分别对应:消息不重复、消息不丢、消息既不丢失也不重复。

这里的“消息传递一次”是指生产者生产消息成功,Broker 接收和保存消息成功,消费者消费消息成功。对一个消息来说,这三个要同时满足才算是“消息传递一次”。上面所说的那三种消息语义可梳理为如下。

  • 最多一次(At most once):对应消息不重复。消息最多传递一次,消息有可能会丢,但不会重复。一般运用于高并发量、高吞吐,但是对于消息的丢失不是很敏感的场景。
  • 最少一次(At least once):对应消息不丢。消息最少传递一次,消息不会丢,但有可能重复。一般用于并发量一般,对于消息重复传递不敏感的场景。
  • 有且仅有一次(Exactly once):每条消息只会被传递一次,消息不会丢,也不会重复。 用于对消息可靠性要求高,且对吞吐量要求不高的场景。

为便于你更好地对比理解和记忆,我汇总了如下一张表格:

三种消息语义各项对比表

3778f386e418f4b2c5a7411d9316c41b.jpeg


到这里,三种消息语义的定义和相关特点就介绍完了,接下来我们正式开始分析 Kafka 是如何做到消息不丢或消息不重的。

Kafka 如何做到消息不丢失?

我们先来讨论一下 Kafka 是如何做到消息不丢失的,也就是:生产者不少生产消息,服务端不丢消息,消费者也不能少消费消息

那怎么实现这kafka不丢失消息呢?就需要生产端、服务端和消费端做好以下配置

生产端:不少生产消息

以下是为了保证消息不丢失,生产端需要配置的参数和相关使用方法。

第一个,要使用带回调方法的 API,具体 API 方法如下:

Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback)


使用带有回调方法的 API 时,我们可以根据回调函数得知消息是否发送成功,如果发送失败了,我们要进行异常处理,比如把失败消息存储到本地硬盘或远程数据库,等应用正常了再发送,这样才能保证消息不丢失。

第二个,设置参数 acks=-1。acks 这个参数是指有多少分区副本收到消息后,生产者才认为消息发送成功了,可选的参数值是 0、1 和 -1。

  • acks=0,表示生产者不等待任何服务器节点的响应,只要发送消息就认为成功。
  • acks=1,表示生产者收到 leader 分区的响应就认为发送成功。
  • acks=-1 ,表示只有当 ISR(ISR的含义文章下面会给大家介绍) 中的副本全部收到消息时,生产者才会认为消息生产成功了。这种配置是最安全的,因为如果leader 副本挂了,当follower副本被选为leader副本时,消息也不会丢失。但是系统吞吐量会降低,因为生产者要等待所有副本都收到消息才能再次发送消息。


第三个,设置参数 retries=3。参数 retries 表示生产者生产消息的重试次数。 retries=3是一个建议值,一般情况下能满足足够的重试次数以至于能重试成功。但是如果重试失败了,对异常处理时就可以把消息保存到其他可靠的地方,如磁盘、数据库、远程缓存等,然后等到服务正常了再继续发送消息。


第四个,设置参数 retry.backoff.ms=300。retry.backoff.ms 指消息生产超时或失败后重试的间隔时间,单位是毫秒。如果重试时间太短,会出现系统还没恢复就开始重试的情况,进而导致再次失败,如果时间太长会造成。结合我个人经验来说,300 毫秒还是比较合适的。


只要上面四个要点配置对了,就基本可以保证生产端的生产者不少生产消息了


服务端:不丢消息

以下是为了保证服务端不丢消息,服务端需要配置的参数。

第一个,设置 replication.factor >1。replication.factor 这个参数表示分区副本的个数,这里我们要将其设置为大于 1 的数,这样当 leader 副本挂了,follower 副本还能被选为 leader 副本继续接收消息。


第二个,设置 min.insync.replicas >1

min.insync.replicas指ISR 最少的副本数量,原理同上,也需要大于 1 的副本数量保证消息不丢。

这里我们简单介绍下 ISR。ISR 集合一个分区的副本的集合,每个分区都有自己的一个 ISR 集合。但不是所有的副本都会在这个集合里。首先 leader 副本是在 ISR 集合里的,以及如果一个follower副本的消息没落后leader副本太长时间,这个follower副本也在ISR集合里,同时如果有一个follower副本落后leader副本太长时间就从 ISR 集合里淘汰出去。也就是说ISR里的副本数量是小于等于分区的副本数量的。


第三个,设置 unclean.leader.election.enable = false。unclean.leader.election.enable 指是否能把非 ISR 集合中的副本选举为 leader 副本。unclean.leader.election.enable = true,也就是说允许非 ISR 集合中的 follower 副本成为 leader 副本。如果设置成这样会有什么问题呢?下面我结合几个示意图来为你详细分析下这个问题。


假设 ISR 集合内的 follower1 副本和 ISR 集合外的 follower2 副本向 leader 副本拉取消息(如下图 1),也就是说这时 ISR 集合中就有两个副本,一个是 leader 副本,另一个是 follower1 副本,而follower2 副本由于网络或自身机器的原因已经落后 leader 副本很长时间,已经被踢出 ISR 集合。


332488d45cc1299c57aa732f70c668ae.jpeg

图 1:ISR 集合内外两个follower 副本拉取消息


突然 leader 和 follower1 这两个节点挂了(如图 2),会导致什么样的结果出现呢?


7d6580d0a60afa555db897471f6130f1.jpeg

图 2:ISR 集合内两个副本挂掉了


由于 unclean.leader.election.enable = true,而现在分区的副本能正常工作的仅仅剩下 follower2 副本,所以 follower2 最终会被选为新的 leader 副本并继续接收生产者发送的消息,如下图我们可以看到它接收了一个新的消息 5。


22fc851fcaaa1131fcbb18566ac8ac58.jpeg

图 3:follower2 副本被选为新的 leader 副本


如果这时 follower1 副本的服务恢复,又会发生什么情况呢?由于 follower 副本要拉取 leader 副本同步数据,首先要获取 leader 副本的信息,并感知到现在的 leader 副本的 LEO 比自己的还小,于是做了截断操作,这时 4 这个消息就丢了,这就造成了消息的丢失。


aa38fa4c17e41ccff38a69ea6a199e46.jpeg

图 4:follower1 副本服务恢复,消息 4 丢失


因此,我们一定要把 unclean.leader.election.enable 设置为 false,只有这样非 ISR 集合的副本才不会被选为分区的 leader 副本。但是这样做也降低了可用性,因为这个分区的副本没有 leader,就无法收发消息了,但是消息会发送到别的分区 leader 副本,也就是说分区的数量实际上减少了。


消费端:不能少消费消息

为了保证不丢失消息,消费者就不能少消费消息,该如何去实现呢?消费端需要做好如下的配置。


第一个,设置 enable.auto.commit=false。enable.auto.commit 这个参数表示是否自动提交,如果是自动提交会导致什么问题出现呢?


消费者消费消息是有两个步骤的,首先拉取消息,然后再处理消息。向服务端提交消息偏移量可以手动提交也可以自动提交。如果把参数enable.auto.commit设置为true指消息偏移量是由消费端的自动提交,由异步线程去完成的,业务线程无法控制。如果刚拉取了消息之后,业务处理还没进行完,这时提交了消息偏移量但是这时消费者挂了,这就造成还没进行完业务处理的消息的位移被提交了,下次再消费就消费不到这些消息,造成消息的丢失。因此,一定要设置 enable.auto.commit=false。 也就是手动提交消息偏移量。

第二步,手动提交偏移量的正确步骤。但是enable.auto.commit=false还不能完全满足消费端消息不丢的条件,还要有正确的手动提交偏移量的过程。具体如何操作呢?这里我们同样结合一个示意图来讲解,如下所示:


a6210ed9b7a1b0079135cd8b4b3ac38f.jpeg

避免少消费消息的偏移量提交方案


表示业务逻辑先对消息进行处理,再提交 offset。这种模式如果消费者在处理完消息后,提交 offset 前出现宕机,待消费者再上线时,还会处理未提交的那部分消息(这里是 2~7 这部分消息),但是这部分已经被消费者处理过了,也就是说这样做虽然避免了丢消息,但是会有重复消费的情况。

具体代码需要这么写:


List<String> messages = consumer.poll();
processMsg(messages);
consumer.commitOffset();


Kafka 如何做到消息不重复?

接下来我们讨论 Kafka 又是如何做到消息不重复的,同样也是生产端重复生产消息,服务端重复存储消息,消费者也不能重复消费消息


生产端:

生产端发送消息后,假如遇到网络问题,无法获得响应,生产端就无法判断该消息是否成功提交到了 Kafka,而我们一般会配置重试次数,但这样会引发生产端重新发送同一条消息,从而造成消息重复的发送。


对于这个问题,Kafka 0.11.0 的版本之前并没有什么解决方案,不过从 0.11.0 的版本开始,Kafka 给每个 producer 一个唯一 ID,并且在每条消息中生成一个 sequence num,sequence num是递增且唯一的这样就能对消息去重,达到 一个生产端不重复发送一条消息。但是这个方法是有局限性的,只在一个 生产端 内生产的消息有效,如果一个消息分别在两个 producer 发送就不行了,还是会造成消息的重复发送。但是这种可能性比较小,因为消息的重试一般会在一个生产端内进行。当然,对应一个消息分别在两个 producer 发送的请求我们也有方案,只是多做一些补偿的工作,我们可以为每一个消息分配一个全局id,并把全局id存放在远程缓存或关系型数据库里。这样在发送前判断一下是否已经发送过。


服务端:

服务端不会重复的存储消息,如果有重复消息也应该是生产端重复发送造成的,所以无需特别的配置。


消费端:

第一步,enable.auto.commit=false:

同样要避免自动提交偏移量,大家可以想象一种情况,消费端拉取消息和处理消息都完成了,但是自动提交偏移量还没提交这时消费端挂了,这时候kakfa消费组开始重平衡并把分区分给了另一个消费者,由于偏移量没提交新的消费者会重复拉取消息,最终造成重复消费消息。

第二步,单纯配成手动提交同样不能避免重复消费,需要消费端用正确姿势消费。先看下图这种情况:


587173d04a5f0941f9a735e3783d34c8.jpeg


避免重复消费的偏移量提交方案


消费者拉取消息后,先提交 offset 后再处理消息。在提交 offset 之后,业务逻辑处理消息之前出现宕机,待消费者重新上线时,就无法读到刚刚已经提交而未处理的这部分消息(这里对应图中 5~8 这部分消息),这就对应了不重复消费消息但是会有丢失消息的情况。


具体代码如下:

List<String> messages = consumer.poll();consumer.commitOffset();processMsg(messages);


总结

最后这里我也简单总结下这一讲分享的主要内容。首先我们介绍了消息的三个语义及其场景,接下来我们从 Kafka 生产端、服务端和消费端三个方面具体讲解了我们到底该如何配置才能实现消息不丢失以及消息不重复。在这个过程中,我们也同步解释了一些 Kafka 的原理知识,这样能你才能知其然并知其所以然。

Kafka 中消息不丢失、不重复很重要,就我个人经验来讲,我是公司专门负责消息队列的架构师,业务的同学除了消息队列服务端宕机外,对消息的丢失和重复消息非常敏感,因为直接影响到了业务。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/107145.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

PyTorch中grid_sample的使用方法

官方文档首先Pytorch中grid_sample函数的接口声明如下&#xff1a; torch.nn.functional.grid_sample(input, grid, modebilinear, padding_modezeros, align_cornersNone)input : 输入tensor&#xff0c; shape为 [N, C, H_in, W_in]grid: 一个field flow&#xff0c; shape为…

JAVA实现校园失物招领管理系统 开源

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、研究内容2.1 招领管理模块2.2 寻物管理模块2.3 系统公告模块2.4 感谢留言模块 三、界面展示3.1 登录注册3.2 招领模块3.3 寻物模块3.4 公告模块3.5 感谢留言模块3.6 系统基础模块 四、免责说明 一、摘要 1.1 项目介绍 基于VueSpri…

深入剖析SQL与NoSQL的优劣势,帮你决定最佳数据存储方案

你是否在为系统的数据库来一波大流量就几乎打满 CPU&#xff0c;日常 CPU 居高不下烦恼?你是否在各种 NoSQL 间纠结不定&#xff0c;到底该选用哪种最好?今天的你就是昨天的我&#xff0c;这也是我写这篇文章的初衷。 作为互联网从业人员&#xff0c;我们要知道关系型数据库…

蓝桥杯 第 2 场算法双周赛 第4题 通关【算法赛】c++ 优先队列 + 小根堆 详解注释版

题目 通关【算法赛】https://www.lanqiao.cn/problems/5889/learning/?contest_id145 问题描述 小蓝最近迷上了一款电玩游戏“蓝桥争霸”。这款游戏由很多关卡和副本组成&#xff0c;每一关可以抽象为一个节点&#xff0c;整个游戏的关卡可以抽象为一棵树形图&#xff0c;每…

群晖上搭建teamspeak3语音服务器

什么是 TeamSpeak &#xff1f; TeamSpeak &#xff08;简称 TS&#xff09;是一款团队语音通讯工具&#xff0c;但比一般的通讯工具具有更多的功能而且使用方便。它由服务器端程序和客户端程序两部分组成&#xff0c;如果不是想自己架设 TS 服务器&#xff0c;只需下载客户端程…

SQL Server Management Studio (SSMS)的安装教程

文章目录 SQL Server Management Studio (SSMS)的安装教程从Microsoft官网下载SQL Server Management Studio安装程序。选中安装程序右键并选择“以管理员的身份运行”选项选择安装目录&#xff0c;单击“安装”按钮开始安装过程安装成功界面安装完成后&#xff0c;您可以启动S…

LaTeX:在标题section中添加脚注footnote

命令讲解 先导包&#xff1a; \usepackage{footmisc} 设原标题为&#xff1a; \section{标题内容} 更改为&#xff1a; \section[标题内容]{标题内容\protect\footnote{脚注内容}} 语法讲解&#xff1a; \section[]{} []内为短标题&#xff0c;作为目录和页眉中的标题。…

Java面向对象(进阶)-- this关键字的使用

文章目录 一、引子&#xff08;1&#xff09; this是什么&#xff1f;&#xff08;2&#xff09;什么时候使用this1.实例方法或构造器中使用当前对象的成员2. 同一个类中构造器互相调用 二、探讨&#xff08;1&#xff09;问题&#xff08;2&#xff09;解决 三、this关键字&am…

Android framework服务命令行工具框架 - Android13

Android framework服务命令行工具框架 - Android13 1、framework服务命令行工具简介2、cmd 执行程序2.1 目录和Android.bp2.2 cmdMain 执行入口2.3 cmd命令 3、am命令工具&#xff0c;实质脚本执行cmd activity3.1 sh脚本3.2 activity服务注册3.3 onShellCommand执行 4、简易时…

Linux 系统调用IO口,利用光标偏移实现文件复制

用系统调用IO函数实现从一个文件读取最后2KB数据并复制到另一个文件中&#xff0c;源文件以只读方式打开&#xff0c;目标文件以只写的方式打开&#xff0c;若目标文件不存在&#xff0c;可以创建并设置初始值为0664&#xff0c;写出相应代码&#xff0c;要对出错情况有一定的处…

Peter算法小课堂—归并排序

位运算 << 这个符号相当于将一个数二进制往左移动几位&#xff0c;如(100110)2<<1(001100)2。相当于乘以2的k次方 >> 这个符号相当于将一个数二进制往右移动几位&#xff0c;如(100110)2<<1(0100110)2。相当于除以2的k次方 归并排序 先看一个视频…

macOS Sonoma 14.1正式版(23B74)发布(可下载黑白苹果镜像)

系统介绍 黑果魏叔苹果今天为 macOS Sonoma 推出了 14.1 版本更新&#xff0c;魏叔发现&#xff0c;本更新主要改善了 Apple Music 界面&#xff0c;设置中新增保修状态&#xff0c;并修复了多项错误内容。 根据苹果的新说明&#xff0c;这次的 Mac 更新不仅提供了一系列的改善…

asp.net教务管理信息系统VS开发sqlserver数据库web结构c#编程Microsoft Visual Studio计算机毕业设计

一、源码特点 asp.net 教务管理信息系统是一套完善的web设计管理系统&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主要采用B/S模式开发。开发环境为vs2010&#xff0c;数据库为sqlserver2008&#xff0c;使用c#语言 开发 asp.net教务管理系统 应用技术&a…

数据链路层和DNS之间的那些事~

数据链路层&#xff0c;考虑的是两个节点之间的传输。这里面的典型协议也很多&#xff0c;最知名的就是“以太网”。我们本篇主要介绍的就是以太网协议。这个协议规定了数据链路层&#xff0c;也规定了物理层的内容。 目录 以太网帧格式 帧头 载荷 帧尾 DNS 从输入URL到…

(c语言进阶)字符串函数、字符分类函数和字符转换函数

一.求字符串长度 1.strlen() (1)基本概念 头文件&#xff1a;<string.h> (2)易错点&#xff1a;strlen()的返回值为无符号整形 #include<stdio.h> #include<string.h> int main() {const char* str1 "abcdef";const char* str2 "bbb&q…

Linux常见问题解决操作(yum被占用、lsb无此命令、Linux开机进入命令界面等)

Linux常见问题解决操作&#xff08;yum被占用、lsb无此命令、Linux开机进入命令界面等&#xff09; 问题一、新安装的Linux使用命令lsb_release提示无此命令&#xff0c;需先安装再使用 Linux安装lsb命令 lsb是Linux Standard Base的缩写&#xff08;Linux基本标准&#xff…

Centos7 安装和配置 Redis 5 教程

在Centos上安装Redis 5&#xff0c;如果是 Centos8&#xff0c;那么 yum 仓库中默认的 redis 版本就是 5&#xff0c;直接 yum install 即可。但如果是 Centos7&#xff0c;yum 仓库中默认的 redis 版本是 3 系列&#xff0c;比较老&#xff1a; 通过 yum list | grep redis 命…

Constellation 介绍:Chainlink 黑客马拉松

在 2020 年&#xff0c;Chainlink 举办了其第一次线上黑客马拉松。当时&#xff0c;DeFi 作为一个类别刚刚开始蓬勃发展&#xff0c;而 NFT 也只是刚刚起步。这次黑客马拉松吸引了来自 45 个国家的 1,000 多名注册参与者&#xff0c;并收到了来自 70 个项目提交。 从那时起&am…

【C++初探:简单易懂的入门指南】一

【C初探&#xff1a;简单易懂的入门指南】一 1. 命名空间1.1 命名空间的定义1.2 命名空间的使用方法 2. C的输入、输出2.1 为什么使用输入、输出要引用一个<iostream>的头文件&#xff1f;2.2 为什么代码里面开放了一个叫std的命名空间2.3 代码中出现的<<和>>…

基于SSM的航班订票管理系统的设计与实现

末尾获取源码 开发语言&#xff1a;Java Java开发工具&#xff1a;JDK1.8 后端框架&#xff1a;SSM 前端&#xff1a;采用JSP技术开发 数据库&#xff1a;MySQL5.7和Navicat管理工具结合 服务器&#xff1a;Tomcat8.5 开发软件&#xff1a;IDEA / Eclipse 是否Maven项目&#x…
最新文章