详解Kafka分区机制原理|Kafka 系列 二

Kafka 系列第二篇,详解分区机制原理。为了不错过更新,请大家将本号“设为星标”。

点击上方“后端开发技术”,选择“设为星标” ,优质资源及时送达

上一篇文章介绍了 Kafka 的基本概念和术语,里面有个概念是 分区(Partition)。

kafka 将 一个Topic 中的消息分成多份,分别存储在不同的 Broker 里,这每一段消息被 kafka 称为分区,其中每条消息只会保存在一个分区中。

如果不太理解请回顾上一篇:

5b2ad62e786f5c79be317c3bd9772c58.jpeg

开始学习 Kafka,一文掌握基本概念|Kafka 系列 一

 

为什么有分区?

为什么要有分区呢?

Kafka 的分区机制的本质就是将一个大的 Topic 进行拆分,将一组很大的队列拆分成了多组队列。这样做有以下几个好处:

  1. 因为一个 Topic 中的消息可能非常多,多到一台Broker存不下,因此需要拆分成多段存储在不同的机器里,实现负载均衡。

  2. 拆分成多个队列,可以在多个生产者和消费者的情况下发挥多机性能,可以分流和并行处理消息,从而提高读写性能,提升系统的吞吐力。

  3. 有利于系统扩缩容,提高系统的可扩展性。不同分区在不同的broker上,可以通过增加新机器提高吞吐,并且增加新机器的时候可以通过调整分区的分布来调配负载。

0de38ad040a27e5ddabd8449fac6d55e.png

但是分区数不是越多越好,需要根据系统具体情况来设置。比如3个Broker就应该至少有3个分区,如果broker性能之间有差异,可以调大分区数进行调配。也可以通过broker的倍数来设置分区数,并且进行性能压测,测试集群的吞吐量。

分区数过多会带来资源管理上的消耗,清除日志时间变长,集群broker故障后分区leader重选时间变长,客户端消费端线程数需求增加,甚至导致连接所需的socket消耗增加。

分区策略

分区策略就是决定生产者将会把消息发送到具体哪个分区的算法,分区策略由 Partitioner 接口实现。

自定义分区策略

用于分区的 partition 方法定义如下:

/**
     * Compute the partition for the given record.
     *
     * @param topic topic名 The topic name
     * @param key 用于分区的key The key to partition on (or null if no key)
     * @param keyBytes 用于分区的序列号key The serialized key to partition on( or null if no key)
     * @param value 用于分区的值 The value to partition on or null
     * @param valueBytes 用于分区的序列号值 The serialized value to partition on or null
     * @param cluster 当前集群元数据 The current cluster metadata
     */
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

可以看出,这里提供了 Topic 和一些跟消息有关的key参数,cluster 是集群信息,包含Kafka 当前的Node 数据以及Topic、partition数据等。有了这些数据,具体拿到一条消息该发往哪个分区,我们就可以根据已有信息制定自己的分区策略。

# name of the partitioner class for partitioning events; default partition spreads data randomly
#partitioner.class=

我们实现了自定义的 Partition 类之后,就可以设置 partitioner.class 为目标策略类,Producer 就会按照我们的自定义策略来对消息进行分区。

默认分区策略

Kafka 提供了默认分区策略 DefaultPartitioner,策略内容如下:

  1. 如果在消息中指定了分区,优先使用指定的分区。

  2. 如果没有指定分区,但存在分区键,则根据序列化key使用murmur2哈希算法对分区数取模。

  3. 如果没有指定分区或分区键,则会使用粘性分区策略。(关于粘性分区策略后面讲解)

在实际生产中,我们一般都默认使用此策略,无需修改。

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    return partition(topic, key, keyBytes, value, valueBytes, cluster, cluster.partitionsForTopic(topic).size());
}
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster,
                         int numPartitions) {
    if (keyBytes == null) {
        return stickyPartitionCache.partition(topic, cluster);
    }
    // hash the keyBytes to choose a partition
    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}

注意,这里指的分区键是序列化后的key,也就是变量 keyBytes,其他key、value、valueBytes 并没用到。

byte[] keyBytes = keySerializer.serialize(topic, record.headers(), record.key());
default byte[] serialize(String topic, Headers headers, T data) {
  // data 变量
    return serialize(topic, data);
}

看到 key 等序列化方法我们可以明白,key 的序列号值只受到 record.key() 的影响,所以同样的key会被固定分配到同样的partition中。(注意这里的key是指用于分区的key,而不是topic)

e16317cdfef524b9779fd048bbc2e591.png

粘性分区策略

实现类为 UniformStickyPartitioner ,他与默认分区策略的区别是:

  • DefaultPartitionerd 默认分区策略:如果有分区键的话,会按照分区键来决定分区,这个时候并不会使用粘性分区策略。

  • UniformStickyPartitioner粘性分区策略:无论有没有分区键,都用粘性分区来分配。

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    return stickyPartitionCache.partition(topic, cluster);
}

什么是粘性分区策略?

我们需要知道,在Producer在发送消息的时候,会将消息放到一个ProducerBatch中, 然后多条消息批量发送。这样可以减少网络请求次数,提高消息的发送效率。

所以批量发送消息有两个条件:

  1. 一个batch满了,与 batch.size有关,一般大小是16k。

  2. linger.ms时间到了。

满足任意一个条件,都会触发sender线程的发送。如果生产的消息较少,batch没有满,就必须等到等待时间到了,这就导致了较长的延迟。

因为ProducerBatch是多个,为了让消息尽可能快的发送,就需要让其中一个ProducerBatch先变满。

private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;

注意:一个分区对应一个双端队列Deque<ProducerBatch>>

粘性分区策略就是在相同的分区中,优先填满一个ProducerBatch,发送,再去填充另一个ProducerBatch。参见下图,第一个分区会被优先塞满并发送。

fb4f66d75ce57e9231f4ea8ed6c56c88.png

在一个 ProducerBatch 发送结束,选择新分区的时候,是随机选择的,之后便会继续优先填满新的分区。

  • 可用分区<1 ,所有分区中随机选择。

  • 可用分区=1,选择这个分区。

  • 可用分区>1,所有可用分区中随机选择。

public int nextPartition(String topic, Cluster cluster, int prevPartition) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        Integer oldPart = indexCache.get(topic);
        Integer newPart = oldPart;
        // Check that the current sticky partition for the topic is either not set or that the partition that 
        // triggered the new batch matches the sticky partition that needs to be changed.
        if (oldPart == null || oldPart == prevPartition) {
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() < 1) {
                Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
                newPart = random % partitions.size();
            } else if (availablePartitions.size() == 1) {
                newPart = availablePartitions.get(0).partition();
            } else {
                while (newPart == null || newPart.equals(oldPart)) {
                    int random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
                    newPart = availablePartitions.get(random % availablePartitions.size()).partition();
                }
            }
            // Only change the sticky partition if it is null or prevPartition matches the current sticky partition.
            if (oldPart == null) {
                indexCache.putIfAbsent(topic, newPart);
            } else {
                indexCache.replace(topic, prevPartition, newPart);
            }
            return indexCache.get(topic);
        }
        return indexCache.get(topic);
    }

轮询分区策略

Kafka 中提供了轮训策略的实现 RoundRobinPartitioner。当用户希望将写操作均匀地分发到所有分区时,可以使用此分区策略。

举例,有三个分区,针对于同一个producer,第一条消息发送到partition1,第二条消息发送到partition2,第三条发送到partition3,以此类推。

7372d4c65e8afd01c61335a18caddd6b.png
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    // 分区数
    int numPartitions = partitions.size();
    // 下一个自增值
    int nextValue = nextValue(topic);
    // 获取此主题的可用分区列表
    List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
    if (!availablePartitions.isEmpty()) {
        // topic可用分区不为空,取余
        int part = Utils.toPositive(nextValue) % availablePartitions.size();
        return availablePartitions.get(part).partition();
    } else {
        // 没有可用的分区,给出一个不可用的分区
        // no partitions are available, give a non-available partition
        return Utils.toPositive(nextValue) % numPartitions;
    }
}

hash 键的值并不会影响到数据的分布,这应该是数据均匀度最好的策略,可以保证消息最大程度的平均分配到所有分区。

除了官方提供的策略,我们还可以实现自己的分区策略,比如随机策略,实现起来也很简单;比如按照业务键去分区的策略;比如按照ip分区的策略等。

最后,欢迎大家提问和交流。

加入讨论群是升职加薪第一步!

回复:加群

9cc714a9272b10f02503d017f1df6ee5.jpeg

点赞是一种美德,如对您有帮助,欢迎评论和分享,感谢阅读!

实战总结|记一次消息队列堆积的问题排查

2023-07-18

445cd86f0993eb1268b6ff21b1866d61.jpeg

从二叉查找树到B*树,一文搞懂搜索树的演进!|原创

2023-05-23

69792976b8a1929fd83b907b1d130d17.jpeg

CAP、BASE理论真的很重要!|分布式事务系列(一)

2023-05-06

7a8d17512f5d9cd2d3cde5fa8560fac0.jpeg

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/70517.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

计算机竞赛 opencv python 深度学习垃圾图像分类系统

0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; opencv python 深度学习垃圾分类系统 &#x1f947;学长这里给一个题目综合评分(每项满分5分) 难度系数&#xff1a;3分工作量&#xff1a;3分创新点&#xff1a;4分 这是一个较为新颖的竞…

pytest结合 allure 打标记之的详细使用

前言 前面我们提到使用allure 可以生成漂亮的测试报告&#xff0c;下面就Allure 标记我们做详细介绍。 allure 标记 包含&#xff1a;epic&#xff0c;feature, story, title, testcase, issue, description, step, serverity, link, attachment 常用的标记 allure.feature…

Mysql中如果建立了索引,索引所占的空间随着数据量增长而变大,这样无论写入还是查询,性能都会有所下降,怎么处理?

索引所占空间的增长确实会对MySQL数据库的写入性能和查询性能造成影响&#xff0c;这主要是由于索引数据过多时会导致磁盘I/O操作变得非常频繁&#xff0c;从而使性能下降。为此&#xff0c;可以采取以下几种方式来减缓这种影响&#xff1a; 1. 限制索引的大小&#xff1a;可以…

PHP8定义字符串的方法-PHP8知识详解

字符串&#xff0c;顾名思义&#xff0c;就是将一堆字符串联在一起。字符串简单的定义方法是使用英文单引号&#xff08; &#xff09;或英文双引号&#xff08;" "&#xff09;包含字符。另外&#xff0c;还可以使用定界符定义字符串。本文还介绍了字符串的连接符。…

数据结构和算法三(排序)

列表排序 排序类型&#xff1a; 一、冒泡排序&#xff1a; 屏幕录制2023-07-25 13.05.12 def bubble_sort(li):exchangeFalseif len(li)<1:return lifor i in range(len(li)-1):for j in range(len(li)-i-1):if li[j]>li[j1]:li[j],li[j1]li[j1],li[j]print(li)exchangeT…

mac harbor的安装

harbor的安装 为什么要整这个呢&#xff0c;因为我在学习k8s&#xff0c;但是需要一个自己的镜像仓库。于是&#xff0c;最开始想到的就是在本地直接部署一个&#xff0c;还比较安全、快速。 直接下载了官方的项目&#xff0c;运行脚本发现出了异常&#xff0c;这种异常我已经…

项目知识点记录

1.使用druid连接池 使用properties配置文件&#xff1a; driverClassName com.mysql.cj.jdbc.Driver url jdbc:mysql://localhost:3306/book?useSSLtrue&setUnicodetrue&charsetEncodingUTF-8&serverTimezoneGMT%2B8 username root password 123456 #初始化链接数…

LiveNVR监控流媒体Onvif/RTSP功能-视频流水印如何叠加视频水印叠加动态图片叠加视频流时间示例

LiveNVR视频流水印如何叠加视频水印叠加动态图片叠加视频流时间示例 1、介绍2、摄像头OSD设置水印3、前端页面叠加4、视频流水印4.1、图片水印示例4.2、时间戳水印示例 5、RTSP/HLS/FLV/RTMP拉流Onvif流媒体服务 1、介绍 监控视频平台播放视频监控的时候&#xff0c;除了满足正…

SpringMVC的架构有什么优势?——控制器(三)

前言 「作者主页」&#xff1a;雪碧有白泡泡 「个人网站」&#xff1a;雪碧的个人网站 「推荐专栏」&#xff1a; ★java一站式服务 ★ ★ React从入门到精通★ ★前端炫酷代码分享 ★ ★ 从0到英雄&#xff0c;vue成神之路★ ★ uniapp-从构建到提升★ ★ 从0到英雄&#xff…

CentOS 7中,配置了Oracle jdk,但是使用java -version验证时,出现的版本是OpenJDK,如何解决?

1.首先&#xff0c;检查已安装的jdk版本 sudo yum list installed | grep java2.移除、卸载圈红的系统自带的openjdk sudo yum remove java-1.7.0-openjdk.x86_64 sudo yum remove java-1.7.0-openjdk-headless.x86_64 sudo yum remove java-1.8.0-openjdk.x86_64 sudo yum r…

STM32单片机蓝牙APP宠物自动喂食器定时语音提醒喂食系统设计

实践制作DIY- GC00162---蓝牙APP宠物自动喂食器 一、功能说明&#xff1a; 基于STM32单片机设计---蓝牙APP宠物自动喂食器 二、功能说明&#xff1a; STM32F103C系列最小系统板LCD1602显示器DS1302时钟模块5个按键语音播报模块ULN2003步进电机模块LED灯板HC-05蓝牙模块&#x…

XML方式AOP快速入门XML方式AOP配置详解

目录 1.XML方式AOP快速入门 1&#xff1a;导入AOP相关坐标 2&#xff1a;准备目标类&#xff0c;准备增强类&#xff0c;并配置给Spring管理 3&#xff1a;配置切点表达式&#xff08;那些方法要被增强&#xff09; 4&#xff1a;配置织入&#xff08;切点被哪些方法增强&…

C++初阶语法——类和对象

前言&#xff1a;C语言中的结构体&#xff0c;在C有着更高位替代者——类。而类的实例化叫做对象。 本篇文章不定期更新扩展后续内容。 目录 一.面向过程和面向对象初步认识二.类1.C中的结构体2.类的定义类的两种定义方式 3.类的访问限定符及封装访问限定符说明 4.类的实例化对…

Python中的诡异事:不可见字符!

文章目录 前言1. 起因2. 调查3. 高能4. 释惑 前言 今天分享一件很诡异的事情&#xff0c;我写代码的时候遇到了不可见的字符&#xff01;&#xff01;&#xff01; 1. 起因 今天在使用pipreqs导出项目中所依赖的库时突然报错了&#xff1a; pipreqs . --encodingutf-8 --forc…

Spring项目整合过滤链模式~实战应用

代码下载 设计模式代码全部在gitee上,下载链接: https://gitee.com/xiaozheng2019/desgin_mode.git 日常写代码遇到的囧 1.新建一个类,不知道该放哪个包下 2.方法名称叫A,干得却是A+B+C几件事情,随时隐藏着惊喜 3.想复用一个方法,但是里面嵌套了多余的逻辑,只能自己拆出来…

百川智能发布首个530亿参数闭源大模型,今年追上GPT-3.5

4月官宣创业&#xff0c;6月15日发布第一款7B开源模型&#xff0c;7月11日发布第二款13B、130亿参数开源模型。 平均保持2个月一个版本发布速度&#xff0c;8月8日&#xff0c;百川智能发布了创业以来的首个530亿参数闭源大模型——Baichuan-53B&#xff08;以下简称“53B”&a…

Android Https

本质&#xff1a;在客户端和服务端使用非对称加密协商出一套对称密钥&#xff0c;每次发送数据前加密&#xff0c;收到后解密&#xff0c;达到加密传输 http ssl 在http之下增加了安全层&#xff0c;用于保障http的加密传输 HTTPS连接 TLS连接步骤 1.客户端发送 client h…

Ubuntu 20.04 安装 Stable Diffusionn

步骤 1&#xff1a;安装 wget、git、Python3 和 Python3虚拟环境&#xff08;如果已安装可忽略这步骤&#xff09; sudo apt install wget git python3 python3-venv步骤 2&#xff1a;克隆 SD 项目到本地 git clone https://github.com/AUTOMATIC1111/stable-diffusion-webu…

Could not resolve host: mirrorlist.centos.org; Unknown error解决方法

今天服务器安装完CentOS系统后&#xff0c;安装网络的时候&#xff0c;出现无法联网yum yum -y install net-tools 以上代码无法运行并报错&#xff0c;这里我要提醒大家&#xff0c;如果在初始安装的时候选中安装网络工具模块就不用在安装net-tools了&#xff0c;因为我选中…

C#在自动化领域的应用前景与潜力

人机界面&#xff08;HMI&#xff09;开发&#xff1a;使用C#开发人机界面软件&#xff0c;实现与自动化设备的交互和监控。C#的图形界面设计能力和丰富的控件库使得开发人员能够创建直观、易用的界面。 数据采集与处理&#xff1a;C#可以与各种传感器、设备进行数据通信和采集…