消息中间件篇之Kafka-消息不丢失

一、 正常工作流程

        生产者发送消息到kafka集群,然后由集群发送到消费者。

        但是可能中途会出现消息的丢失。下面是解决方案。

二、 生产者发送消息到Brocker丢失

1. 设置异步发送

    //同步发送
    RecordMetadata recordMetadata = kafkaProducer.send(record).get();
    //异步发送
    kafkaProducer.send(record,new Callback() {
        @Override public void onCompletion (RecordMetadata recordMetadata, Exception e){
            if (e != null) {
                System.out.println("消息发送失败 | 记录日志");
            }
            long offset = recordMetadata.offset();
            int partition = recordMetadata.partition();
            String topic = recordMetadata.topic();
        }
    });

2.消息重试

//设置重试次数
prop.put(ProducerConfig.RETRIES_CONFIG,10);

三、消息在Brocker中存储丢失

        发送确认机制acks。消息首先Topic是key,到达Topic以后才选择分区Partition(默认就一个分区,0号分区),默认连接的就是分区的Leader节点,由leader分区同步到follower区中。

四、消费者从Brocker接收消息丢失

1.分区机制

        1. Kafka 中的分区机制指的是将每个主题划分成多个分区(Partition)。

        2. topic分区中消息只能由消费者组中的唯一一个消费者处理,不同的分区分配给不同的消费者(同一个消费者组)。

2.消费方式

        消费者默认是自动按期提交已经消费的偏移量,默认是每隔5s提交一次,如果出现重平衡的情况,可能会重复消费或丢失数据

3.那如何解决重复消费

        禁用自动提交偏移量,改为手动: 1. 同步提交。  2. 异步提交。 3. 同步+异步组合提交。

       

五、面试题

面试官:Kafka是如何保证消息不丢失?

候选人:嗯,这个保证机制很多,在发送消息到消费者接收消息,在每个阶段都有可能会丢失消息,所以我们解决的话也是从多个方面考虑:

第一个是生产者发送消息的时候,可以使用异步回调发送,如果消息发送失败,我们可以通过回调获取失败后的消息信息,可以考虑重试或记录日志,后边再做补偿都是可以的。同时在生产者这边还可以设置消息重试,有的时候是由于网络抖动的原因导致发送不成功,就可以使用重试机制来解决。

第二个在broker中消息有可能会丢失,我们可以通过kafka的复制机制来确保消息不丢失,在生产者发送消息的时候,可以设置一个acks,就是确认机制。我们可以设置参数为all,这样的话,当生产者发送消息到了分区之后,不仅仅只在leader分区保存确认,在follwer分区也会保存确认,只有当所有的副本都保存确认以后才算是成功发送了消息,所以,这样设置就很大程度了保证了消息不会在broker丢失。

第三个有可能是在消费者端丢失消息,kafka消费消息都是按照offset进行标记消费的,消费者默认是自动按期提交已经消费的偏移量,默认是每隔5s提交一次,如果出现重平衡的情况,可能会重复消费或丢失数据。我们一般都会禁用掉自动提价偏移量,改为手动提交,当消费成功以后再报告给broker消费的位置,这样就可以避免消息丢失和重复消费了。

面试官:Kafka中消息的重复消费问题如何解决的?

候选人:kafka消费消息都是按照offset进行标记消费的,消费者默认是自动按期提交已经消费的偏移量,默认是每隔5s提交一次,如果出现重平衡的情况,可能会重复消费或丢失数据。我们一般都会禁用掉自动提价偏移量,改为手动提交,当消费成功以后再报告给broker消费的位置,这样就可以避免消息丢失和重复消费了。

为了消息的幂等,我们也可以设置唯一主键来进行区分,或者是加锁,数据库的锁,或者是redis分布式锁,都能解决幂等的问题。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/412637.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

python Matplotlib Tkinter-->tab切换1

环境 python:python-3.12.0-amd64 包: matplotlib 3.8.2 pillow 10.1.0 import matplotlib.pyplot as plt from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg, NavigationToolbar2Tk import tkinter as tk import tkinter.messagebox as messagebox import …

Unity发布webgl获取浏览器的URL

Unity发布webgl获取浏览器的URL Unity发布webgl之后获取浏览器的url 在unity中创建文件夹Plugins,然后添加添加文件UnityGetBrowserURL.jslib var GetUrlFunc {//获取地址栏的URLStringReturnValueFunction: function () {var returnStr window.top.location.hre…

软件无线电SDR加人工智能算法实现无人机频谱探测

通用软件无线电接收机作为传感器实时接收探测无线电信号,加上深度学习算法实现频谱识别,(https://img-blog.csdnimg.cn/5a6c4d89a047453a94f763f4e67aeb17.png)

折腾Chrome插件,让内容脚本与文本交互~

我们要用内容脚本(content scripts)来给插件装上一双慧眼,让它能在你浏览的页面上跳来跳去,和文字做游戏。这就像给插件喂了一颗智慧豆,让它变得聪明起来,能够直接和网页内容打招呼啦! 在本章&…

学成在线_nacos配置_无法连接到nacos上的配置文件

问题 nacos配置完成后启动程序控制台提示无法连接到数据库 问题原因 无法连接到数据库实际上是没能成功找到nacos上的配置。因为自己的bootstrap文件的拓展名为.yml而不是.yaml。 解决方案 确保nacos上、配置文件中以及配置文件本身的拓展名都是yaml nacos, 如果不是重新创…

2024程序员容器化上云之旅-第2集-Ubuntu-WSL2-Windows11版:接近深洞

故事梗概 Java程序员马意浓在互联网公司维护老旧电商后台系统。 渴望学习新技术的他在工作中无缘Docker。 他开始自学Vue3并使用SpringBoot3完成了一个前后端分离的Web应用系统,并打算将其用Docker容器化后用K8s上云。 3 挑选工具 马意浓画好架构图后&#xff…

动态规划-最长公共子串(c)

动态规划 动态规划(dynamic programming)是一种算法设计方法。基本思想是在对一个问题的多阶段决策中,按照某一顺序,根据每一步所选决策的不同,会引起状态的转移,最后会在变化的状态中获取到一个决策序列。…

JAVA学习笔记11

1.标识符 1.1 标识符的命名规则和规范 1.1.1 标识符概念 ​ 1.Java对各种变量、方法和类等命名时使用的字符序列称为标识符 ​ 2.凡是自己可以起名字的地方都叫标识符 int num1 90。 1.1.2 标识符的命名规则(必须遵守) ​ 1.由26个英文字母、数字…

golang学习6,glang的web的restful接口传参

1.get传参 //get请求 返回json 接口传参r.GET("/getJson/:id", controller.GetUserInfo) 1.2.接收处理 package controllerimport "github.com/gin-gonic/gin"func GetUserInfo(c *gin.Context) {_ c.Param("id")ReturnSucess(c, 200, &quo…

redis八股

文章目录 数据类型字符串实现使用场景 List 列表实现使用场景 Hash 哈希实现使用场景 Set 集合实现使用场景 ZSet 有序集合实现使用场景 BitMap实现使用场景 Stream使用场景pubsub为什么不能作为消息队列 数据结构机制SDS 简单动态字符串压缩列表哈希表整数集合跳表quicklistli…

Vue3的8大生命周期

查看本专栏目录 关于作者 还是大剑师兰特:曾是美国某知名大学计算机专业研究生,现为航空航海领域高级前端工程师;CSDN知名博主,GIS领域优质创作者,深耕openlayers、leaflet、mapbox、cesium,canvas&#x…

主从复制实现Redis集群

主从复制实现Redis集群实验 (一主二从): 实验环境: 使用Docker 搭建 Redis 版本 5.0.5 打开一个终端窗口,在其中运行如下命令创建一个名为redis-master的Redis容器。注意,它的端口是6379 (本地的端口:映射到容器的端口) docker run -itd--name redis-m…

Shopify使用元字段对产品和产品变体进行自定义配置

引言 在Shopify上运营电子商务店铺时,有时您可能需要对产品和产品变体进行自定义配置。这可以包括添加额外的属性、规格或其他自定义字段,以满足特定的业务需求。Shopify提供了元字段的功能,使您能够灵活地自定义产品和产品变体的配置。在本…

[Docker 教学] 常用的Docker 命令

Docker是一种流行的容器化技术。使用Docker可以将数据科学应用程序连同代码和所需的依赖关系打包成一个名为镜像的便携式工件。因此,Docker可以简化开发环境的复制,并使本地开发变得轻松。 以下是一些必备的Docker命令列表,这些命令将在你下一…

基于YOLOv8/YOLOv7/YOLOv6/YOLOv5的动物识别系统(Python+PySide6界面+训练代码)

摘要:本博客文章深入解析了基于深度学习的动物识别系统的完整代码,并展示了采用领先的YOLOv8算法的实现代码。该系统与YOLOv7、YOLOv6、YOLOv5等早期版本的性能进行了比较,可以从静态图像到实时视频流的各种媒介中识别动物的高效性和准确性。…

html中的meta 元信息

html中的meta 元信息 1. 配置字符编码 <meta charset"utf-8">2. 针对 IE 浏览器的兼容性配置。 <meta http-equiv"X-UA-Compatible" content"IEedge">3. 针对移动端的配置 <meta name"viewport" content"widt…

【Git教程】(四)版本库 —— 存储系统,存储目录,提交对象及其命名、移动与复制~

Git教程 版本库 1️⃣ 一种简单而高效的存储系统2️⃣ 存储目录&#xff1a;Blob 与 Tree3️⃣ 相同数据只存储一次4️⃣ 压缩相似内容5️⃣ 不同文件的散列值相同6️⃣ 提交对象7️⃣ 提交历史中的对象重用8️⃣ 重命名、移动与复制&#x1f33e; 总结 事实上&#xff0c;我们…

Flutter Version Manager (FVM): Flutter的版本管理终极指南

Flutter笔记 Flutter Version Manager (FVM) - 文章信息 - Author: 李俊才 (jcLee95) Visit me at: https://jclee95.blog.csdn.netEmail: 291148484163.com. Shenzhen ChinaAddress of this article:https://blog.csdn.net/qq_28550263/article/details/136300307 my-websit…

git push 总是需要输入密码或者个人访问令牌personal access token解决方案

文章目录 遇到问题解决方法 遇到问题 git push的时候总是需要输入密码或者个人访问令牌personal access token 解决方法 ChatGPT给出的解决方案&#xff0c;解决了我的问题。 如果在使用 git push 命令时总是需要输入个人访问令牌&#xff0c;这可能是因为您的 GitHub 账号…

CUDA编程 - 用向量化访存优化 - Cuda elementwise - Add(逐点相加)- 学习记录

Cuda elementwise - Add 一、简介1.1、ElementWise Add1.2、 float4 - 向量化访存 二、实践2.1、如何使用向量化访存2.1、简单的逐点相加核函数2.2、ElementWise Add float4&#xff08;向量化访存&#xff09;2.3、完整代码 一、简介 1.1、ElementWise Add Element-wise 操作…
最新文章