kafka如何保证消息不丢?

概述

我们知道Kafka架构如下,主要由 Producer、Broker、Consumer 三部分组成。一条消息从生产到消费完成这个过程,可以划分三个阶段,生产阶段、存储阶段、消费阶段。

图片

  • 产阶段: 在这个阶段,从消息在 Producer 创建出来,经过网络传输发送到 Broker 端。

  • 存储阶段: 在这个阶段,消息在 Broker 端存储,如果是集群,消息会在这个阶段被复制到其他的副本上。

  • 消费阶段: 在这个阶段,Consumer 从 Broker 上拉取消息,经过网络传输发送到Consumer上。

那么如何保证消息不丢我们可以从这三部分来分析。

消息传递语义

在深度剖析消息丢失场景之前,我们先来聊聊「消息传递语义」到底是个什么玩意?

所谓的消息传递语义是 Kafka 提供的 Producer 和 Consumer 之间的消息传递过程中消息传递的保证性。主要分为三种, 如下图所示:

  1. 1. 首先当 Producer 向 Broker 发送数据后,会进行 commit,如果 commit 成功,由于 Replica 副本机制的存在,则意味着消息不会丢失,但是 Producer 发送数据给 Broker 后,遇到网络问题而造成通信中断,那么 Producer 就无法准确判断该消息是否已经被提交(commit),这就可能造成 at least once 语义。

  2. 2. 在 Kafka 0.11.0.0 之前, 如果 Producer 没有收到消息 commit 的响应结果,它只能重新发送消息,确保消息已经被正确的传输到 Broker,重新发送的时候会将消息再次写入日志中;而在 0.11.0.0 版本之后, Producer 支持幂等传递选项,保证重新发送不会导致消息在日志出现重复。为了实现这个, Broker 为 Producer 分配了一个ID,并通过每条消息的序列号进行去重。也支持了类似事务语义来保证将消息发送到多个 Topic 分区中,保证所有消息要么都写入成功,要么都失败,这个主要用在 Topic 之间的 exactly once 语义。 其中启用幂等传递的方法配置enable.idempotence = true。 启用事务支持的方法配置:设置属性 transcational.id = "指定值"

  3. 3. 从 Consumer 角度来剖析, 我们知道 Offset 是由 Consumer 自己来维护的, 如果 Consumer 收到消息后更新 Offset, 这时 Consumer 异常 crash 掉, 那么新的 Consumer 接管后再次重启消费,就会造成 at most once 语义(消息会丢,但不重复)。

  4. 4. 如果 Consumer 消费消息完成后, 再更新 Offset,如果这时 Consumer crash 掉,那么新的 Consumer 接管后重新用这个 Offset 拉取消息, 这时就会造成 at least once 语义(消息不丢,但被多次重复处理)。

总结: 默认 Kafka 提供「at least once」语义的消息传递,允许用户通过在处理消息之前保存 Offset的方式提供 「at mostonce」 语义。如果我们可以自己实现消费幂等,理想情况下这个系统的消息传递就是严格的「exactly once」, 也就是保证不丢失、且只会被精确的处理一次,但是这样是很难做到的。

接下来我们从生产阶段、存储阶段、消费阶段这几方面看下kafka如何保证消息不丢失。

生产阶段

通过深入解析Kafka消息发送过程我们知道Kafka生产者异步发送消息并返回一个Future,代表发送结果。首先需要我们获取返回结果判断是否发送成功。

// 异步发送消息,并设置回调函数 
producer.send(record, new Callback() { 
    @Override 
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) { 
            System.err.println("消息发送失败: " + exception.getMessage()); 
        } else { 
            System.out.println("消息发送成功到主题: " + metadata.topic() + ", 分区: " + metadata.partition() + ", 偏移量: " + metadata.offset()); 
        } 
    } 
});

消息队列通过最常用的请求确认机制,来保证消息的可靠传递:当你的代码调用发消息方法时,消息队列的客户端会把消息发送到 Broker,Broker 收到消息后,会给客户端返回一个确认响应,表明消息已经收到了。客户端收到响应后,完成了一次正常消息的发送。

Producer(生产者)保证消息不丢失的方法:

  1. 1. 发送确认机制:Producer可以使用Kafka的acks参数来配置发送确认机制。通过设置合适的acks参数值,Producer可以在消息发送后等待Broker的确认。确认机制提供了不同级别的可靠性保证,包括:

    • • acks=0:Producer在发送消息后不会等待Broker的确认,这可能导致消息丢失风险。

    • • acks=1:Producer在发送消息后等待Broker的确认,确保至少将消息写入到Leader副本中。

    • • acks=all或acks=-1:Producer在发送消息后等待Broker的确认,确保将消息写入到所有ISR(In-Sync Replicas)副本中。这提供了最高的可靠性保证。

  2. 2. 消息重试机制:Producer可以实现消息的重试机制来应对发送失败或异常情况。如果发送失败,Producer可以重新发送消息,直到成功或达到最大重试次数。重试机制可以保证消息不会因为临时的网络问题或Broker故障而丢失。

Broker存储阶段

正常情况下,只要 Broker 在正常运行,就不会出现丢失消息的问题,但是如果 Broker 出现了故障,比如进程死掉了或者服务器宕机了,还是可能会丢失消息的。

在kafka高性能设计原理中我们了解到kafka为了提高性能用到了 Page Cache 技术.在读写磁盘日志文件时,其实操作的都是内存,然后由操作系统决定什么时候将 Page Cache 里的数据真正刷入磁盘。如果内存中数据还未刷入磁盘服务宕机了,这个时候还是会丢消息的。

为了最大程度地降低数据丢失的可能性,我们可以考虑以下方法:

  1.  持久化配置优化:可以通过调整 Kafka 的持久化配置参数来控制数据刷盘的频率,从而减少数据丢失的可能性。例如,可以降低 flush.messages 和 flush.ms 参数的值,以更频繁地刷写数据到磁盘。

  2.  副本因子增加:在 Kafka 中,可以为每个分区设置多个副本,以提高数据的可靠性。当某个 broker 发生故障时,其他副本仍然可用,可以避免数据丢失。

  3. 使用acks=all:在生产者配置中,设置 acks=all 可以确保消息在所有ISR(In-Sync Replicas)中都得到确认后才被视为发送成功。这样可以确保消息被复制到多个副本中,降低数据丢失的风险。

  4. 备份数据:定期备份 Kafka 的数据,以便在发生灾难性故障时可以进行数据恢复。

消费阶段

消费阶段采用和生产阶段类似的确认机制来保证消息的可靠传递,客户端从 Broker 拉取消息后,执行用户的消费业务逻辑,成功后,才会给 Broker 发送消费确认响应。如果 Broker 没有收到消费确认响应,下次拉消息的时候还会返回同一条消息,确保消息不会在网络传输过程中丢失,也不会因为客户端在执行消费逻辑中出错导致丢失。

  1. 自动提交位移:Consumer可以选择启用自动提交位移的功能。当Consumer成功处理一批消息后,它会自动提交当前位移,标记为已消费。这样即使Consumer发生故障,它可以使用已提交的位移来恢复并继续消费之前未处理的消息。

  2. 手动提交位移:Consumer还可以选择手动提交位移的方式。在消费一批消息后,Consumer可以显式地提交位移,以确保处理的消息被正确记录。这样可以避免重复消费和位移丢失的问题。

下面是手动提交位移的例子:

// 创建消费者实例
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

// 订阅主题
consumer.subscribe(Collections.singletonList(topic));

try {
    while (true) {
        // 消费消息
        ConsumerRecords<String, String> records = consumer.poll(100);

        for (ConsumerRecord<String, String> record : records) {
            // 处理消息逻辑
            System.out.println("消费消息:Topic = " + record.topic() +
                    ", Partition = " + record.partition() +
                    ", Offset = " + record.offset() +
                    ", Key = " + record.key() +
                    ", Value = " + record.value());

            // 手动提交位移
            TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
            OffsetAndMetadata offsetMetadata = new OffsetAndMetadata(record.offset() + 1);
            consumer.commitSync(Collections.singletonMap(topicPartition, offsetMetadata));
        }
    }
} catch (Exception e) {
    e.printStackTrace();
} finally {
    consumer.close();
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/389595.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【汇总】解决IndexedDB报Failed to execute ‘transaction‘ on ‘IDBDatabase‘

问题发现 再学习HTML5中&#xff0c;有介绍到 Web 存储&#xff0c;当代码编写完成后&#xff0c;运行报错 Failed to execute ‘transaction’ on ‘IDBDatabase’: One of the specified object stores was not found. 示例代码如下&#xff1a; <!DOCTYPE html> <…

【后端高频面试题--Nginx篇】

&#x1f680; 作者 &#xff1a;“码上有前” &#x1f680; 文章简介 &#xff1a;后端高频面试题 &#x1f680; 欢迎小伙伴们 点赞&#x1f44d;、收藏⭐、留言&#x1f4ac; 后端高频面试题--Nginx篇 往期精彩内容什么是Nginx&#xff1f;为什么要用Nginx&#xff1f;为…

HC05蓝牙模块

简介 HC-05 蓝牙串口通信模块&#xff0c;是基于 Bluetooth Specification V2.0 带 EDR 蓝牙协议的 数传模块。无线工作频段为 2.4GHz ISM&#xff0c;调制方式是 GFSK。模块最大发射功率为 4dBm&#xff0c;接收灵敏度-85dBm&#xff0c;板载 PCB 天线&#xff0c;可以实现 1…

HTTP协议-响应报文详解(Respond)

目录 前言&#xff1a; 1.Respond报文 1.1报文格式 1.2格式图解 2.状态行&#xff08;首行&#xff09; 2.1状态码/状态码解释 &#xff08;1&#xff09;200 OK &#xff08;2&#xff09;404 Not Found &#xff08;3&#xff09;403 Forbidden &#xff08;4&#…

【机器学习笔记】 6 机器学习库Scikit-learn

Scikit-learn概述 Scikit-learn是基于NumPy、 SciPy和 Matplotlib的开源Python机器学习包,它封装了一系列数据预处理、机器学习算法、模型选择等工具,是数据分析师首选的机器学习工具包。 自2007年发布以来&#xff0c;scikit-learn已经成为Python重要的机器学习库了&#xff…

C++中类的6个默认成员函数 【拷贝构造函数】

文章目录 拷贝构造函数的使用拷贝构造对于自定义类型【浅拷贝】深拷贝拷贝构造函数典型调用场景 拷贝构造函数的使用 在前几章学习对象的时候&#xff0c;我们有的时候需要一个与已存在对象一某一样的新对象 那在创建对象时&#xff0c;可否创建一个与已存在对象一某一样的新对…

数值类型的运算方式总结

提纲1&#xff1a;常见的位运算使用场景 提纲2&#xff1a;整数类型运算时的类型溢出问题&#xff0c;产生原因以及解决办法 提纲3&#xff1a;浮点类型运算时的精度丢失问题&#xff0c;产生原因以及解决办法 数值类型&#xff08;6种&#xff09;分为&#xff1a; 整型&…

简易绘图软件(水一期)

哈哈&#xff01; 1、编写代码&#xff1a; 代码&#xff1a; main: #include <graphics.h> #include <music.h> #include <heker.h> #pragma comment( linker, "/subsystem:\"windows\" /entry:\"mainCRTStartup\"" )using…

【python】python入门(输出)

本篇文章将会介绍关于python的常见输出&#xff0c;希望对您有帮助&#xff01; 输出 用到print函数 print(oh mygod)##或者 print("oh mygod")##或者 print("oh"" ""mygod") 输出结果&#xff1a; 用单引号、双引号都可以 ,引号中可…

单片机学习笔记---LCD1602

LCD1602介绍 LCD1602&#xff08;Liquid Crystal Display&#xff09;液晶显示屏是一种字符型液晶显示模块&#xff0c;可以显示ASCII码的标准字符和其它的一些内置特殊字符&#xff08;比如日文的片假名&#xff09;&#xff0c;还可以有8个自定义字符 显示容量&#xff1a;…

Linux 幻兽帕鲁服务器怎么上传存档文件?

通过控制台远程连接到 Linux 服务器后&#xff0c;你可以打开文件树&#xff0c;然后找到幻兽帕鲁存档位置&#xff0c;将存档压缩包上传到 Pal 目录中。 记得替换存档前要先停止服务。 2. 然后将 Saved.tar 文件解压&#xff0c;并完全替换新服务器上的 Saved 存档目录即可。 …

蓝桥杯:C++排序

排序 排序和排列是算法题目常见的基本算法。几乎每次蓝桥杯软件类大赛都有题目会用到排序或排列。常见的排序算法如下。 第(3)种排序算法不是基于比较的&#xff0c;而是对数值按位划分&#xff0c;按照以空间换取时间的思路来排序。看起来它们的复杂度更好&#xff0c;但实际…

真假难辨 - Sora(OpenAI)/世界模拟器的技术报告

目录 引言技术报告汉译版英文原版 引言 Sora是OpenAI在2024年2月15日发布的世界模拟器&#xff0c;功能是通过文本可以生成一分钟的高保真视频。由于较高的视频质量&#xff0c;引起了巨大关注。下面是三个示例&#xff0c;在示例之后给出了其技术报告&#xff1a; tokyo-wal…

博途PLC While指令编程应用(SCL代码)

FOR循环和While指令都可以实现循环控制。在循环体内部&#xff0c;你可以编写需要重复执行的代码。WhIile在每次循环开始之前&#xff0c;都会检查条件是否为真。如果条件为真&#xff0c;则执行循环体内的代码&#xff1b;如果条件为假&#xff0c;则跳出循环&#xff0c;继续…

Android Studio 实现图书借阅(管理)系统

&#x1f345;文章末尾有获取完整项目源码方式&#x1f345; 目录 前言 一、任务介绍 1.1 背景 1.2目的和意义 二、 实现介绍 视频演示 2.1 启动页实现 2.2 注册页面实现 2.3 登陆页面实现 2.4 图书列表的实现 2.5 当前借阅页面实现 2.6 我的页面实现…

你知道.NET的字符串在内存中是如何存储的吗?

一、字符串对象的内存布局 从“值类型”和“引用类型”来划分&#xff0c;字符串自然属于引用类型的范畴&#xff0c;所以一个字符串对象自然采用引用类型的内存布局。引用类型实例的内存布局总的来说整个内存布局分三块&#xff1a;ObjHeader TypeHandle Payload。对于一般…

如何在Windows中配置多个显示器?这里提供详细步骤

Windows可以通过多种方式使用多个显示器,扩展或复制主显示器。你甚至可以关闭主显示器。以下是如何使用简单的键盘快捷键更改辅助显示设置。 使用Windows+P投影菜单 要快速更改Windows 10处理多个显示器的方式,请按Windows+P。屏幕右侧会弹出一个名为“投影”的深灰色菜单。…

Codeforces Round 926 F. Sasha and the Wedding Binary Search Tree

F. Sasha and the Wedding Binary Search Tree 题意 给定一颗二叉搜索树&#xff0c;规定树上的所有点的点权都在范围 [ 1 , C ] [1, C] [1,C] 内&#xff0c;树上的某些节点点权已知&#xff0c;某些节点点权未知&#xff0c;求出合法的二叉搜索树的数量 思路 由于是二叉搜…

Web项目利用MybatisPlus进行分页查询

之前在写博客系统前台页面的时候&#xff0c;遇到了利用mp进行分页查询的情况&#xff0c;由于涉及到的知识点相对较为重要&#xff0c;固写一篇博客以此巩固。 一、功能需求 在首页和分类页面都需要查询文章列表。 首页&#xff1a;查询所有的文章分类页面&#xff1a;查询…

隐函数的求导【高数笔记】

1. 什么是隐函数&#xff1f; 2. 隐函数的做题步骤&#xff1f; 3. 隐函数中的复合函数求解法&#xff0c;与求导中复合函数求解法有什么不同&#xff1f; 4. 隐函数求导的过程中需要注意什么&#xff1f;
最新文章