14、Kafka 请求是怎么被处理的

Kafka 请求是怎么被处理的

  • 1、处理请求的 2 种常见方案
    • 1.1、顺序处理请求
    • 1.2、每个请求使用单独线程处理
  • 2、Kafka 是如何处理请求的?
  • 3、控制类请求和数据类请求分离

无论是 Kafka 客户端还是 Broker 端,它们之间的交互都是通过 “请求 / 响应” 的方式完成的。比如,客户端会通过网络发送消息生产请求给 Broker,而 Broker 处理完成后,会发送对应的响应给到客户端。

Apache Kafka 自己定义了一组请求协议,用于实现各种各样的交互操作。比如常见的 PRODUCE 请求是用于生产消息的,FETCH 请求是用于消费消息的,METADATA 请求是用于请求 Kafka 集群元数据信息的。所有的请求都是通过 TCP 网络以 Socket 的方式进行通讯的。

下面会详细讨论一下 Kafka Broker 端处理请求的全流程。

1、处理请求的 2 种常见方案

1.1、顺序处理请求

缺陷是,吞吐量太差。由于只能顺序处理每个请求,因此,每个请求都必须等待前一个请求处理完毕才能得到处理。这种方式只适用于请求发送非常不频繁的系统。

1.2、每个请求使用单独线程处理

好处是,它是完全异步的,每个请求的处理都不会阻塞下一个请求。
缺陷是,为每个请求都创建线程的做法开销极大,在某些场景下甚至会压垮整个服务。还是那句话,这个方法只适用于请求发送频率很低的业务场景。

2、Kafka 是如何处理请求的?

kafka 处理请求使用 Reactor 模式。Reactor 模式是事件驱动架构的一种实现方式,特别适合应用于处理多个客户端并发向服务器端发送请求的场景

Reactor 模式的架构如下图所示:
在这里插入图片描述
从这张图中,我们可以发现,多个客户端会发送请求给到 Reactor。Reactor 有个请求分发线程 Dispatcher,也就是图中的 Acceptor,它会将不同的请求下发到多个工作线程中处理。

在这个架构中,Acceptor 线程只是用于请求分发,不涉及具体的逻辑处理,非常得轻量级,因此有很高的吞吐量表现。而这些工作线程可以根据实际业务处理需要任意增减,从而动态调节系统负载能力。

为 Kafka 画一张类似的图的话,那它应该是这个样子的:
在这里插入图片描述
这两张图长得差不多。Kafka 的 Broker 端有个 SocketServer 组件,类似于 Reactor 模式中的 Dispatcher,它也有对应的 Acceptor 线程和一个工作线程池,只不过在 Kafka 中,这个工作线程池有个专属的名字,叫网络线程池。Kafka 提供了 Broker 端参数 num.network.threads,用于调整该网络线程池的线程数。其默认值是 3,表示每台 Broker 启动时会创建 3 个网络线程,专门处理客户端发送的请求。

Acceptor 线程采用轮询的方式将入站请求公平地发到所有网络线程中,因此,在实际使用过程中,这些线程通常都有相同的几率被分配到待处理请求。这种轮询策略编写简单,同时也避免了请求处理的倾斜,有利于实现较为公平的请求处理调度。

好了,你现在了解了客户端发来的请求会被 Broker 端的 Acceptor 线程分发到任意一个网络线程中,由它们来进行处理。那么,当网络线程接收到请求后,它是怎么处理的呢?你可能会认为,它顺序处理不就好了吗?实际上,Kafka 在这个环节又做了一层异步线程池的处理,我们一起来看一看下面这张图。

在这里插入图片描述

当网络线程拿到请求后,它不是自己处理,而是将请求放入到一个共享请求队列中。Broker 端还有个 IO 线程池,负责从该队列中取出请求,执行真正的处理。如果是 PRODUCE 生产请求,则将消息写入到底层的磁盘日志中;如果是 FETCH 请求,则从磁盘或页缓存中读取消息。

IO 线程池处中的线程才是执行请求逻辑的线程。Broker 端参数 num.io.threads 控制了这个线程池中的线程数。目前该参数默认值是 8,表示每台 Broker 启动后自动创建 8 个 IO 线程处理请求。你可以根据实际硬件条件设置此线程池的个数。

比如,如果你的机器上 CPU 资源非常充裕,你完全可以调大该参数,允许更多的并发请求被同时处理。当 IO 线程处理完请求后,会将生成的响应发送到网络线程池的响应队列中,然后由对应的网络线程负责将 Response 返还给客户端。

细心的你一定发现了请求队列和响应队列的差别:请求队列是所有网络线程共享的,而响应队列则是每个网络线程专属的。这么设计的原因就在于,Dispatcher 只是用于请求分发而不负责响应回传,因此只能让每个网络线程自己发送 Response 给客户端,所以这些 Response 也就没必要放在一个公共的地方。

我们再来看看刚刚的那张图,图中有一个叫 Purgatory 的组件,这是 Kafka 中著名的 “炼狱” 组件。它是用来缓存延时请求(Delayed Request)的。所谓延时请求,就是那些一时未满足条件不能立刻处理的请求。比如设置了 acks=all 的 PRODUCE 请求,一旦设置了 acks=all,那么该请求就必须等待 ISR 中所有副本都接收了消息后才能返回,此时处理该请求的 IO 线程就必须等待其他 Broker 的写入结果。当请求不能立刻处理时,它就会暂存在 Purgatory 中。稍后一旦满足了完成条件,IO 线程会继续处理该请求,并将 Response 放入对应网络线程的响应队列中。

到这里,Kafka 请求流程解析的故事就已经讲完了

到目前为止,这里提及的请求处理流程对于所有请求都是适用的,也就是说,Kafka Broker 对所有请求是一视同仁的。

3、控制类请求和数据类请求分离

在 Kafka 内部,除了客户端发送的 PRODUCE 请求和 FETCH 请求之外,还有很多执行其他操作的请求类型,比如负责更新 Leader 副本、Follower 副本以及 ISR 集合的 LeaderAndIsr 请求,负责勒令副本下线的 StopReplica 请求等。与 PRODUCE 和 FETCH 请求相比,这些请求有个明显的不同:它们不是数据类的请求,而是控制类的请求。也就是说,它们并不是操作消息数据的,而是用来执行特定的 Kafka 内部动作的。

我来举个例子说明一下。假设我们有个主题只有 1 个分区,该分区配置了两个副本,其中 Leader 副本保存在 Broker 0 上,Follower 副本保存在 Broker 1 上。假设 Broker 0 这台机器积压了很多的 PRODUCE 请求,此时你如果使用 Kafka 命令强制将该主题分区的 Leader、Follower 角色互换,那么 Kafka 内部的控制器组件(Controller)会发送 LeaderAndIsr 请求给 Broker 0,显式地告诉它,当前它不再是 Leader,而是 Follower 了,而 Broker 1 上的 Follower 副本因为被选为新的 Leader,因此停止向 Broker 0 拉取消息。

这时,一个尴尬的场面就出现了:如果刚才积压的 PRODUCE 请求都设置了 acks=all,那么这些在 LeaderAndIsr 发送之前的请求就都无法正常完成了。就像前面说的,它们会被暂存在 Purgatory 中不断重试,直到最终请求超时返回给客户端。

设想一下,如果 Kafka 能够优先处理 LeaderAndIsr 请求,Broker 0 就会立刻抛出 NOT_LEADER_FOR_PARTITION 异常,快速地标识这些积压 PRODUCE 请求已失败,这样客户端不用等到 Purgatory 中的请求超时就能立刻感知,从而降低了请求的处理时间。即使 acks 不是 all,积压的 PRODUCE 请求能够成功写入 Leader 副本的日志,但处理 LeaderAndIsr 之后,Broker 0 上的 Leader 变为了 Follower 副本,也要执行显式的日志截断(Log Truncation,即原 Leader 副本成为 Follower 后,会将之前写入但未提交的消息全部删除),依然做了很多无用功。

再举一个例子,同样是在积压大量数据类请求的 Broker 上,当你删除主题的时候,Kafka 控制器(我会在专栏后面的内容中专门介绍它)向该 Broker 发送 StopReplica 请求。如果该请求不能及时处理,主题删除操作会一直 hang 住,从而增加了删除主题的延时。

那么,社区是如何解决的呢?社区实现了两类请求的分离。也就是说,Kafka Broker 启动后,会在后台分别创建两套网络线程池和 IO 线程池的组合,它们分别处理数据类请求和控制类请求。至于所用的 Socket 端口,自然是使用不同的端口了,你需要提供不同的 listeners 配置,显式地指定哪套端口用于处理哪类请求。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/257009.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Hypervisor Display架构

Hypervisor Display架构部分 1,所有LA侧的APP与显示相关的调用最终都会交由SurfaceFlinger处理 2,SurfaceFlinger会最终调用android.hardware.graphics.composer2.4-service服务 3,android.hardware.graphics.composer2.4-service服务会调用G…

针对企业的泄密,天锐绿盾提出十大解决方案

01 防止公司内部数据泄密 通过动态加解密技术,有效防止公司内部数据泄密。即员工在创建、编辑文档时会被自动加密存放在硬盘上,防止员工故意或由于疏忽而造成泄密或对文件恶意破坏。 管理思路>> ● 不改变员工使用习惯、不改变文件格式、不改变…

APM固件编译和仿真

事情起因 主要想对无人机APM固件进行仿真的算法验证,因实际飞行的过程实际验证太浪费飞机了,所以就先试用仿真对算法进行仿真开发。 一,环境搭建 环境搭建我建议参考官方英文教程,英文教程写的比较全,不懂可以自己使…

科聪控制系统典型应用车型 —— 料箱机器人

料箱机器人即料箱AGV是一种智能化物流搬运设备,它可以代替人力完成出库入库和搬运工作,可根据出入库生产出货需求,将货物从起点运送到终点,自动柔性完成货到人货到点的操作。 提升仓储和物流效率的自动化利器 料箱机器人的投用能…

yolov5单目测距+速度测量+目标跟踪(算法介绍和代码)

要在YOLOv5中添加测距和测速功能,您需要了解以下两个部分的原理: 单目测距算法 单目测距是使用单个摄像头来估计场景中物体的距离。常见的单目测距算法包括基于视差的方法(如立体匹配)和基于深度学习的方法(如神经网…

C语言--字符函数与字符串函数

大家好,我是残念,希望在你看完之后,能对你有所帮助,有什么不足请指正!共同学习交流 本文由残念ing 原创CSDN首发,如需要转载请通知 个人主页:残念ing-CSDN博客,欢迎各位→点赞&#…

leetcode---76. 最小覆盖子串 [C++/滑动窗口+哈希表]

原题:76. 最小覆盖子串 - 力扣(LeetCode) 题目解析: 此题在这道题的基础上进行理解会更简单 leetcode --- 30. 串联所有单词的子串[C 滑动窗口/双指针]-CSDN博客 本题要求在s字符串中找到含有t字符串所有字符的最短子串。 也就是…

【lesson17】MySQL表的基本操作--表去重、聚合函数和group by

文章目录 MySQL表的基本操作介绍插入结果查询(表去重)建表插入数据操作 聚合函数建表插入数据操作 group by(分组)建表插入数据操作 MySQL表的基本操作介绍 CRUD : Create(创建), Retrieve(读取),Update(更新)&#x…

XAgent的部署及运行

源代码clone git clone config 文件的修改 在XAgent源码目录,运行 vi .env, 修改以下配置条目 CONFIG_FILEassets/gpt-3.5-turbo_config.ymlpython环境 python >3.10 安装conda,通过conda激活python3.10的环境 wget https://repo.a…

josef约瑟 跳合位、电源监视继电器 HRTH-Y-2H2D DC220V

系列型号: HRTH-Y-2H2D-X-T跳位监视、合位监视、电源监控继电器; HRTH-Y-2Z-X-T跳位监视、合位监视、电源监控继电器; HRTH-Y-2H-X-T跳位监视、合位监视、电源监控继电器; HRTH-J-2H2D-X-T跳位监视、合位监视、电源监控继电器…

Axure中继器的基本使用

介绍中继器 在 Axure 中,中继器是一种交互设计元素,用于在不同页面之间传递数据或触发特定的事件。它可以帮助模拟真实的用户交互流程和页面之间的传递逻辑,继承关系用于描述两个元件之间的父子关系。通过使用继承关系,您可以创建…

Linux的SSH(远程登录)

SSH定义: SSH(Secure Shell 的缩写)是一种网络协议,用于加密两台计算机之间的通信,并且支持各种身份验证机制。 实务中,它主要用于保证远程登录和远程通信的安全,任何网络服务都可以用这个协议…

深度学习笔记_7经典网络模型LSTM解决FashionMNIST分类问题

1、 调用模型库,定义参数,做数据预处理 import numpy as np import torch from torchvision.datasets import FashionMNIST import torchvision.transforms as transforms from torch.utils.data import DataLoader import torch.nn.functional as F im…

自定义 spring-boot组件自动注入starter

1&#xff1a;创建maven项目 2&#xff1a;pom文件 <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0" xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocati…

stm32与Freertos入门(二)移植FreeRTOS到STM32中

简介 注意&#xff1a;FreeRTOS并不是实时操作系统&#xff0c;而是分时复用的&#xff0c;只不过切换频率很快&#xff0c;感觉上是同时在工作。本次使用的单片机型号为STM32F103C8T6,通过CubeMX快速移植。 一、CubeMX快速移植 1、选择芯片 打开CubeMX软件&#xff0c;进行…

轻量封装WebGPU渲染系统示例<53>- 多盏灯灯光照在地面的效果

WebGPU实时渲染实现模拟多盏灯的灯光照在地面的效果灯光效果 。 当前示例源码github地址: https://github.com/vilyLei/voxwebgpu/blob/feature/material/src/voxgpu/sample/MultiLightsTest.ts 当前示例运行效果: 此示例基于此渲染系统实现&#xff0c;当前示例TypeScript源…

Ribbon负载均衡原理、策略、饥饿加载

Ribbon负载均衡原理、策略、饥饿加载 MapperScan("cn.itcast.order.mapper") SpringBootApplication public class OrderApplication {public static void main(String[] args) {SpringApplication.run(OrderApplication.class, args);}/*** 完成创建RestTemplate&am…

虾皮 选品:如何在虾皮平台上进行有效的选品?

在虾皮&#xff08;Shopee&#xff09;这个跨境电商平台上&#xff0c;选品对于卖家来说至关重要。选品决定了店铺的销售额和竞争力。为了帮助卖家进行选品&#xff0c;虾皮平台提供了一些免费的选品工具&#xff0c;如知虾。同时&#xff0c;还有一些第三方选品工具&#xff0…

支持可视化提取变量,Apipost配置变量不要太简单

在调试接口时我们需要将响应结果中的某个字段配置为环境变量在其他接口中引用&#xff0c;之前在Apipost中需要配置脚本而在最近Apipost后执行操作中可以进行可视化的断言和变量提取&#xff0c;无需配置繁琐脚本。 这里我们在登录接口下配置一条Token环境变量&#xff0c;在后…
最新文章