Kafka-服务端-副本机制

Kafka从0.8版本开始引入副本(Replica)的机制,其目的是为了增加Kafka集群的高可用性。

Kafka实现副本机制之后,每个分区可以有多个副本,并且会从其副本集合(Assigned Replica,AR)中选出一个副本作为Leader副本,所有的读写请求都由选举出的Leader副本处理。

剩余的其他副本都作为Follower副本,Follower副本会从Leader副本处获取消息并更新到自己的Log中。

我们可以认为Follower副本是Leader副本的热备份。

一般情况下,同一分区的多个副本会被均匀地分配到集群中的不同Broker上,当Leader副本的所在的Broker出现故障后,可以重新选举新的Leader副本继续对外提供服务。

通过这种方式提高了Kafka集群的可用性。

副本

在一个分区的Leader副本中会维护自身以及所有Follower副本的相关状态,而Follower副本只维护自己的状态。

此外,还有“本地副本”和“远程副本”两个概念需要读者注意,“本地副本”是指副本对应的Log分配在当前的Broker上,“远程副本”则是指副本对应的Log分配在其他的Broker上,在当前Broker上仅仅维护了副本的LEO等信息。

一个副本是“本地副本”还是“远程副本”与它是Leader副本还是Follower副本没有直接联系,如图所示。

在这里插入图片描述

分区

Kafka服务端使用Replica表示副本以及Replica中维护的信息,其中的partition字段指向了副本所属的分区。

服务端使用Partition表示分区,Partition负责管理每个副本对应的Replica对象,进行Leader副本的切换,ISR集合的管理以及调用日志存储子系统完成写入消息,它还提供了一些其他的辅助方法。

Partition中的核心字段的含义如下所述。

  • topic和partitionld:此Partition对象代表的Topic名称和分区编号。
  • localBrokerld:当前Broker的id,可以与replicald比较,从而判断指定的Replica对是否表示本地副本。
  • logManager:当前Broker上的LogManager对象。
  • zkUtils:操作ZooKeeper的辅助类。
  • leaderEpoch:Leader副本的年代信息。
  • leaderReplicaldOpt:该分区的Leader副本的id。
  • inSyncReplicas:Set[Replica]类型,该集合维护了该分区的ISR集合,ISR集合是AR集合的子集。
  • assignedReplicaMap:Pool[Int,Replica]类型,维护了该分区的全部副本的集合(AR集合)的信息。

Partition中的方法按照功能可以划分为下列五类。

  • 获取(或创建)Replica:getOrCreateReplica方法。
  • 副本的Leader/Follower角色切换:makeLeader方法和makeFollower方法。
  • ISR集合管理:maybeExpandIsr方法和maybeShrinkIsr()方法。
  • 调用日志存储子系统完成消息写入:appendMessagesToLeader()方法。
  • 检测HW的位置:checkEnoughReplicasReachOffset方法

创建副本

getOrCreateReplica()方法主要负责在AR集合(assignedReplicaMap)中查找指定副本的Replica对象,如果查找不到则创建Replica对象并添加到AR集合中管理。

如果创建的是Local Replica,还会创建(或恢复)对应的Log并初始化(或恢复)HW。

HW与Log.recoveryPoint类似,也会需要记录到文件中保存,在每个lig目录下都有一个replication-offset-checkpoint文件记录了此目录下每个分区的HW。

在ReplicaManager启动时会读取此文件到highWatermarkCheckpoints这个Map中,之后会定时更新replication-offset-checkpoint文件。

ISR集合管理

Partition除了对副本的Leader/Follower角色进行管理,还需要管理ISR集合。

随着Follower副本不断与Leader副本进行消息同步,Follower副本的LEO会逐渐后移,并最终追赶上Leader副本的LEO,此时该Follower副本就有资格进入ISR集合。

Partition.maybeExpandIsr()方法实现了扩张ISR集合的功能,其调用栈如图所示,它是在updateFollowerLogReadResults()方法中被调用的,在前面介绍DelayedFetch的处理流程时提到过此方法的功能,该方法用于处理来自Follower的FetchRequest。

追加消息

在分区中,只有Leader副本能够处理读写请求。

Partition.appendMessagesToLeader方法提供了向Leader副本对应的Log中追加消息的功能。

在前面介绍的DelayedProduce处理流程中,ReplicaManager.appendToLocalLog()方法就是基于此方法实现的。

ReplicaManager

在一个Broker上可能分布着多个Partition的副本信息,ReplicaManager的主要功能是管理一个Broker范围内的Partition信息。

ReplicaManager的实现依赖于前面介绍的日志存储子系统、DelayedOperationPurgatory、KafkaScheduler等组件,底层依赖于Partition和Replica。

在这里插入图片描述

ReplicaManager中各个字段的含义和功能如下所述。

  • logManager:LogManager对象,对分区的读写操作都委托给底层的日志存储子系统。
  • scheduler:KafkaSchedule对象,用于执行ReplicaManager中的周期性定时任务。在ReplicaManager中总共有三个周期性任务,它们分别是highwatermark-checkpoint任务、isr-expiration任务、isr-change-propagation任务。
  • controllerEpoch:记录KafkaController的年代信息,当重新选举Controller Leader时该字段值会递增。之后,在ReplicaManager处理来自KafkaController的请求时,会先检测请求中携带的年代信息是否等于controllerEpoch字段的值,这就避免接收旧Controller Leader发送的请求。这种设计方式在分布式系统中比较常见。
  • localBrokerld:当前Broker的id,主要用于查找Local Replica。
  • allPartitions:Pool[(String,Int),Partition]类型,其中保存了当前Broker上分配的所有Partition信息。这里需要注意Pool的valueFactory,当从Pool查找不到指定key时,则使用valueFactory创建一个默认value值放入Pool并返回。
  • replicaFetcherManager:在ReplicaFetcherManager中管理了多个ReplicaFetcherThread线程,ReplicaFetcherThread线程会向Leader副本发送FetchRequest请求来获取消息,实现Follower副本与Leader副本同步。ReplicaFetcherManager对象在ReplicaManager初始化时被创建,后面会详细介绍ReplicaFetcherManager与ReplicaFetcherThread的功能。
  • highWatermarkCheckpoints:Map[String,OffsetCheckpoint]类型,用于缓存每个log目录与OffsetCheckpoint之间的对应关系,OffsetCheckpoint记录了对应log目录下的replication-offset-checkpoint文件,该文件中记录了data目录下每个Partition的HW。ReplicaManager中的highwatermark-checkpoint任务会定时更新replication-offset-checkpoint文件的内容。
  • isrChangeSet:Set[TopicAndPartition]类型,用于记录ISR集合发生变化的分区信息。
  • delayedProducePurgatory、delayedFetchPurgatory:用于管理DelayedProduce和DelayedFetch的DelayedOperationPurgatory对象。
  • zkUtils:操作ZooKeeper的辅助类。

副本角色切换

在Kafka集群中会选举一个Broker成为KafkaController的Leader,它负责管理整个Kafka集群。

Controller Leader根据Partition的Leader副本和Follower副本的状态向对应的Broker节点发送LeaderAndIsrRequest,这个请求主要用于副本的角色切换,即指导Broker将其上的哪些分区的副本切换成Leader角色,哪些分区的副本切换成Follower介绍。

LeaderAndIsrRequest首先由KafkaAPis.handleLeaderAndIsrRequest()方法进行处理,其核心逻辑是通过ReplicaManager提供的becomeLeaderOrFollower方法实现的,而becomeLeaderOrFollower又依赖于上一小节介绍的Partition.makeLeader方法和makeFollower方法,上述调用关系如图所示。

在这里插入图片描述

在开始分析becomeLeaderOrFollower方法前,先来介绍一下LeaderAndIsrRequest和LeaderAndIsrResponse的格式,如图所示。在LeaderAndIsrRequest中比较重要的是partition_states集合这个字段,其中包含了每个分区的Leader副本所在的Brokerld、ISR集合、AR集合以及zk_version等信息。在LeaderAndIsrResponse的partitions集合字段中记录了每个分区的副本在当前Broker上的切换结果。

在这里插入图片描述

ReplicaManager.becomeLeaderOrFollower方法的主要逻辑是:获取(或创建)指定的Partition对象,根据partitionStates的信息对其切换成Leader/Follower的副本进行分类,并分别调用makeLeader和makeFollowers方法完成切换。

之后会启动highwatermark-checkpoint任务,然后关闭空闲的Fetcher线程,调用onLeadershipChange回调函数。

追加/读取消息

当Local Replica切换为Leader副本之后,就可以处理生产者发送的ProducerRequest,将消息写入到Log中。

在前面分析DelayedProduce的处理流程时,简单介绍了ReplicaManager.appendMessages方法,当时着重关注了与DelayedProduce相关的处理以及sendResponseCallback回调方法的实现。

这里详细分析appendToLocalLog方法的实现,它首先会检测消息要写入的Topic是否为Kafka的内部Topic(目前Kafka只有OffsetsTopic一个内部Topic),如果是内部Topic则需要检测是否允许对内部Topic进行追加,最终调用Partition.appendMessagesToLeader()方法完成消息追加。

appendToLocalLog方法的第二个参数记录了每个分区需要追加的消息集合。

消息同步

Follower副本与Leader副本同步的功能由ReplicaFetcherManager组件实现。

ReplicaFetcherManager继承了AbstractFetcherManager。

AbstractFetcherManager的继承和依赖关系如图所示。

在这里插入图片描述
在AbstractFetchManager中使用fetcherThreadMap字段(HashMap[BrokerAndFetcherld,AbstractFetcherThread]类型)管理AbstractFetcherThread,该Map的key值是BrokerAndFetcherld类型对象,其中封装了Broker的网络位置信息(brokerld、host、port等)以及对应的Fetcher线程的id。

AbstractFetcherManager中还提供了addFetcherForPartitions方法、removeFetcherForPartitions方法和shutdownldleFetcherThreads方法对fetcherThreadMap集合进行管理。

AbstractFetcherManager.addFetcherForPartitions()方法会让Follower副本从指定的offset开始与Leader副本进行同步。

该方法的参数涉及BrokerAndInitialOffset类,它其中封装了Broker的网络位置信息以及同步的起始offset。

具体的同步逻辑交由ReplicaFetcherThread线程处理。

在Follower发送ListOffsetRequest期间,新Leader可能不断追加消息,新Leader的LEO落后于Follower的LEO的场景得到改变,此时就不再需要进行截断操作了,Follower可以继续从其LEO与Leader进行同步。

这样新Leader与Follower的消息可能存在不一致的情况,如图所示。
在这里插入图片描述

关闭副本

当Broker接收到来自KafkaController的StopReplicaRequest请求时,会关闭其指定的副本,并根据StopReplicaRequest中的字段决定是否删除副本对应的Log。

在分区的副本进行重新分配、关闭Broker等过程中都会使用到此请求,但是需要注意的是,StopReplicaRequest并不代表一定会删除副本对应的Log,例如shutdown的场景下就没有必要删除Log。

而在重新分配Partition副本的场景下,就需要将旧副本及其Log删除。

先来介绍StopReplicaRequest、StopReplicaResponse的格式,如图所示。
在这里插入图片描述

StopReplicaRequest中的delete_partitions字段是一个boolean类型的值,表示是否要删除副本及其Log,partitions集合字段中记录待关闭的分区信息。

StopReplicaResponse的partitions集合记录了每个分区对应的处理结果。

API层接收到StopReplicaRequest后直接调用了ReplicaManager.stopReplicas()方法进行处理。

stopReplicas方法首先检查请求中的controllerEpoch值,之后停止指定分区的同步操作,最后遍历partitions集合根据delete_partitions的值决定是否对Log进行删除。

ReplicaManager中的定时任务

在ReplicaManager中总 共 有highwatermark-checkpoint、isr-expiration、isr-change-propagation三个定时任务。

highwatermark-checkpoint任务会周期性地记录每个Replica的HW并保存到其log目录中的replication-offset-checkpoint文件中。

isr-expiration任务会周期性地调用maybeShrinkIsr()方法检测每个分区是否需要缩减其ISR集合。

isr-change-propagation任务会周期性地将ISR集合发生变化的分区记录到ZooKeeper中。

如果检测到highwatermarkcheckpoint任务未启动,会调用startHighWaterMarksCheckPointThread方法启动highwatermark-checkpoint任务。

MetadataCache

MetadataCache是Broker用来缓存整个集群中全部分区状态的组件。

KafkaController通过向集群中的Broker发送UpdateMetadataRequest来更新其MetadataCache中缓存的数据,每个Broker在收到该请求后会异步更新MetadataCache中的数据。

MetadataCache中各字段的含义和功能如下所述。

  • cache:Map[String,Map[Int,PartitionStateInfo]]类型,记录了每个分区的状态,其中使用PartitionStatelnfo记录Partition的状态。
  • aliveBrokers:Map[Int,Broker]类型,记录了当前可用的Broker信息,其中使用Broker类记录每个存活Broker的网络位置信息(host、ip、port等)。
  • aliveNodes:Map[Int,Map[SecurityProtocol,Node]]类型,记录了可用节点的信息。

在开始分析Kafka处理UpdateMetadataRequest请求更新MetadataCache的流程之前,先来看一下UpdateMetadataRequest和UpdateMetadataResponse的格式,如图所示。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/344230.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

计算机网络-AAA原理概述

对于任何网络,用户管理都是最基本的安全管理要求之一,在华为设备管理中通过AAA框架进行认证、授权、计费实现安全验证。 一、AAA概述 AAA(Authentication(认证), Authorization(授权), and Accounting(计费))是一种管理框架&#…

鸿蒙APP的应用场景

鸿蒙APP可以用于多种场合和设备类型,这是鸿蒙系统的分布式能力和多终端适配的优势。以下是一些鸿蒙APP的应用场景,希望对大家有所帮助。北京木奇移动技术有限公司,专业的软件外包开发公司,欢迎交流合作。 1.智能手机和平板电脑&am…

桌面型物联网智能机器人设计(预告)

相关资料 桌面级群控机器人CoCube探索-2022--CSDN博客 视频: 能!有!多!酷!CoCube桌面级群控机器人 让我看看谁在SJTU里划水… 简要介绍 设计一个桌面型物联网智能机器人,以ESP32芯片为核心,配…

uniapp+uview封装上传图片组件(单张/多张)

uniappuview封装上传图片组件(单张/多张) 先看效果 1.uploadImg.vue <template><view class"uploadImg flex-d flex-wrap"><!-- 多张图片上传 --><view class"imgList imgArr flex-d flex-wrap" v-for"(item,index) in file…

Leetcode2806. 取整购买后的账户余额

Every day a Leetcode 题目来源&#xff1a;2806. 取整购买后的账户余额 解法1&#xff1a;数学 题目要求为将 purchaseAmount 四舍五入到最近的 10 的倍数作为 roundedAmount&#xff0c;计算 100−roundedAmount 的值并返回。 分类讨论即可。 代码&#xff1a; /** lc…

Leetcode—24. 两两交换链表中的节点【中等】

2023每日刷题&#xff08;八十七&#xff09; Leetcode—24. 两两交换链表中的节点 实现代码 /*** Definition for singly-linked list.* struct ListNode {* int val;* ListNode *next;* ListNode() : val(0), next(nullptr) {}* ListNode(int x) : val(x),…

Java零基础学习19:集合

编写博客目的&#xff1a;本系列博客均根据B站黑马程序员系列视频学习和编写目的在于记录自己的学习点滴&#xff0c;方便后续回忆和查找相关知识点&#xff0c;不足之处恳请各位有缘的朋友指正。 一、集合和数组的对比 数组和集合很相似&#xff0c;但集合只能存储引用数据类…

MySQL两个表的亲密接触-连接查询的原理

MySQL对于被驱动表的关联字段没索引的关联查询&#xff0c;一般都会使用 BNL 算法。如果有索引一般选择 NLJ 算法&#xff0c;有 索引的情况下 NLJ 算法比 BNL算法性能更高。 关系型数据库还有一个重要的概念&#xff1a;Join&#xff08;连接&#xff09;。使用Join有好处&…

【操作系统】实验四 增加Linux系统调用

&#x1f57a;作者&#xff1a; 主页 我的专栏C语言从0到1探秘C数据结构从0到1探秘Linux &#x1f618;欢迎关注&#xff1a;&#x1f44d;点赞&#x1f64c;收藏✍️留言 &#x1f3c7;码字不易&#xff0c;你的&#x1f44d;点赞&#x1f64c;收藏❤️关注对我真的很重要&…

Zabbix 整合 Prometheus:案例分享与操作指南

一、简介 Zabbix 和 Prometheus 都是流行的开源监控工具&#xff0c;它们各自具有独特的优势。Zabbix 主要用于网络和系统监控&#xff0c;而 Prometheus 则专注于开源的分布式时间序列数据库。在某些场景下&#xff0c;将这两个工具整合在一起可以更好地发挥它们的优势&#…

C语言进阶——数据结构之链表

前言 hello&#xff0c;大家好呀&#xff0c;我是Humble 在之前的两篇博客&#xff0c;我们学完了数据结构中的顺序表&#xff0c;还对它进行了一个应用&#xff0c;做了一个通讯录的小项目 那今天我们再来学习一个新的数据结构——链表 引入 我们来回忆一下顺序表 对于顺…

Spring Boot 初始(快速搭建 Spring Boot 应用环境)

提示&#xff1a; ① 通过下面的简介可以快速的搭建一个可以运行的 Spring Boot 应用&#xff08;估计也就2分钟吧&#xff09;&#xff0c;可以简单的了解运行的过程。 ② 建议还是有一点 Spring 和 SpringMVC的基础&#xff08;其实搭建一个 Spring Boot 环境不需要也没有关系…

C++多线程_std::future与std::promise

目录 1. 引言 2. promise/future的含义 std::future std::promise std::packaged_task std::async 处理异常 std::shared_future 实战&#xff1a;多线程实现快速排序 时钟与限定等待时间 参考: 1. 引言 在并发编程中&#xff0c;我们通常会用到一组非阻塞的模型&a…

共享wifi项目到底能不能做?

如今&#xff0c;互联网已经渗透到我们生活的方方面面&#xff0c;人们对WiFi的需求越来越大&#xff0c;已经成为人们不可或缺的一部分。在这样的背景下&#xff0c;共享WiFi项目应运而生&#xff0c;作为近年来兴起的创业选择&#xff0c;成为了越来越多创业者追逐的热门项目…

Servlet 与 MVC

主要内容 Servlet 重点 MVC 重点 Filter 重点 章节目标 掌握 Servlet 的作用 掌握 Servlet 的生命周期 掌握 JSP 的本质 掌握 MVC 的设计思想 掌握 Filter 的作用及使用场景 第一节 Servlet 1. Servlet 概念 Servlet 是在服务器上运行的能够对客户端请求进行处理&a…

微信小程序(八)图片的设定

注释很详细&#xff0c;直接上代码 上一篇 新增内容&#xff1a; 1.图片的三种常见缩放形式 2.图片全屏预览 源码&#xff1a; testImg.wxml <!-- 默认状态&#xff0c;不保证缩放比&#xff0c;完全拉伸填满容器 --> <image class"pic" mode"scaleTo…

Backtrader 文档学习-Target Orders

Backtrader 文档学习-Target Orders 1. 概述 sizer不能决定操作是买还是卖&#xff0c;意味着需要一个新的概念&#xff0c;通过增加小智能层可以决定买卖&#xff0c;即通过持仓份额可以决定买卖操作。 这就是策略中order_target_xxx方法族的作用。受zipline的方法的启发&am…

商城系统中30分钟未付款自动取消订单怎么实现(简单几种方法)

实现以上功能 方法1&#xff1a;定时任务批量执行 写一个定时任务&#xff0c;每隔 30分钟执行一次&#xff0c;列出所有超出时间范围得订单id的列表 AsyncScheduled(cron "20 20 1 * * ?")public void cancelOrder(){log.info("【取消订单任务开始】"…

Excel导出警告:文件格式和拓展名不匹配

原因描述&#xff1a; Content-Type 原因&#xff1a;Content-Type&#xff0c;即内容类型&#xff0c;一般是指网页中存在的Content-Type&#xff0c;用于定义网络文件的类型和网页的编码&#xff0c;决定文件接收方将以什么形式、什么编码读取这个文件&#xff0c;这就是经常…

【算法专题】动态规划之路径问题

动态规划2.0 动态规划 - - - 路径问题1. 不同路径2. 不同路径Ⅱ3. 珠宝的最高价值4. 下降路径最小和5. 最小路径和6. 地下城游戏 动态规划 - - - 路径问题 1. 不同路径 题目链接 -> Leetcode -62.不同路径 Leetcode -62.不同路径 题目&#xff1a;一个机器人位于一个 m …
最新文章