Kafka 基础整理、 Springboot 简单整合

定义:

  • Kafka 是一个分布式的基于发布/订阅默认的消息队列
  • 是一个开源的分布式事件流平台,被常用用于数据管道、流分析、数据集成、关键任务应用

消费模式:

  • 点对点模式 (少用)
    消费者主动拉取数据,消息收到后清除消息
    在这里插入图片描述
  • 发布/订阅模式
    生产者推送消息到队列,都消费者订阅各自所需的消息
    在这里插入图片描述

基本概念:

  • Producer: 消息生产者
  • Consumer: 消费者
  • Consumer: Group 消费者组,消费者组id相同得消费者为一个消费者组;一个消费者也为一个消费者组去消费
  • Broker: kafka服务器
  • Topic :消息主题, 数据分类
  • Partition :分区,一个Tpoic 有多个分区组成
  • Replica : 副本,每个分区对应多个副本
  • Leader:副本里包含leader、follower ;生产以及消费只针对 leader

生产者发送流程:

  • producer -> send(producerRecord) -> interceprots 拦截器 -> Serializer 序列化器 -> Partitioner 分区器
  • 当数据累积到 batch.size之后,sender才会发送数据;默认16k
  • 如果数据迟迟未达到batch.size , sender等待linger.ms设置的时间,到了之后就会发送数据。单位ms.默认值 0ms,标识没有延迟
  • compression.type 数据压缩方式
  • RecordAccumulator 缓冲区大小,默认32m
  • 应答模式ack
    • 0: 生产者发送数据后,不需要等待数据应答
    • 1:生产者发送过来的数据,Leader收到数据后应答
    • 1:all leader与其它所有节点收齐数据后应答在这里插入图片描述

消费大概逻辑:

  • 在这里插入图片描述

消费者组(Consumer Group (CG)):

  • groupid相同的消费者形成一个消费者组
  • 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内的一个消费者消费
  • 消费者组之间互不影响
  • 当消费者组的数量,大于分区数,则会有闲置
  • coordinator:辅助实现消费者组的初始化分区的分配
    • 每个节点有个coordinator, 通过 groupid % 50,选择出 coordinator 节点 50为 _consumer_offset 的分区数
    • 1%50 = 1 , _consumer_offset 的 号分区上的 coordinator 则为 leader
    • coordinator 再消费者组中随机选择一个 consumer 成为leader,由leader 制定消费计划,让后返回给 coordinator ,再由coordinator 来把消费技化 分配给其它消费者
    • coordinator 与消费者的心跳保持时间 3秒45秒 超时 - 会移除消费者,触发再平衡
    • 消费者消费时间过长,默认 5分钟 - 会移除消费者触发再平衡

消费流程:

  • 创建消费者网络连接客户端 ConsumerNetworkClient,与kafka交互
  • 消费请求初始化:每批次最小抓取大小、数据未达到超时 时间 500ms 、抓取数据大小上限
  • 发送消费请求 -》onSuccess() 回调, 拉取数据 -》 按批次放入消息队列
  • 消费者从 消息队列每批次消费数据 (500条) -》反序列化 -》拦截器 -》 处理数据
  • 1

消费计划(分区分配策略)默认 Range + CooperativeSticky:

  • Range:针对每一个topic,对topic分区排序、消息者排序,通过分区数 / 消费者数,决定每个消息者消费几个分区,除不尽的前面的消费者多消费。 容易产生数据倾斜
    在这里插入图片描述
  • RoundRobin:轮询分区策略,针对所有topic ,把所有topic的分区和消费者列出来,按照hashcode进行排序,通过轮询算法把分区分配给消费者
  • Sticky :黏性 (执行新的分配的时,尽量靠近上次的分配结果),首先回尽量的均匀,且随机分配分区到消费者
  • CooperativeSticky:协作者黏性,Sticky 的策略相同,但支持合作式再平衡,消费者可以继续从没有被重新分配的分区消费

offset 位移: 是标记消费消费位置

  • <0.9 : 是维护在 zookeeper中
  • 0.9 之后:offset 维护在一个内置的 topic :_consumer_offsets 中
  • 采用 key - value 方式存储数据,key:groupid +topic + 分区号
  • offset 自动提交:默认每5秒自动提交offset ,默认就是 true
  • offset 手动提交:消费的时候,手动提交offset
    • 同步:等待提交offset成功,再消费下一条
    • 异步:不等待,直接消费,失败后没有重试机制
  • 指定offset 消费:
    • earliest: 自动将偏移量重置为最早的偏移量 --from-beginning
    • latest (默认): 自动将偏离量充值为最新偏移量
    • nono: 如果未找到消费者组的先前偏移量,则向消费者抛出异常。
//设置自动提交offset
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
//自动提交时间 5s
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"5000");
//offset 手动提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);

KafkaConsumer kafkaConsumer = new KafkaConsumer<String,String>(properties);
//定义主题
ArrayList<String> topics = new ArrayList<>();
topics.add("first");
//订阅
kafkaConsumer.subscribe(topics);
while (true){
    ConsumerRecords<String,String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
    if (CollectionUtil.isNotEmpty(consumerRecords)){
        for (ConsumerRecord<String, String> record : consumerRecords) {
            System.out.println(record);
        }
    }
    //手动提交offset
    kafkaConsumer.commitAsync();
}

指定时间消费:

 //查询对应分区
 Set<TopicPartition> partitions = kafkaConsumer.assignment();

  //保证分区分配方案定制完毕
  while (partitions.size()==0){
      kafkaConsumer.poll(Duration.ofSeconds(1));
      partitions=kafkaConsumer.assignment();
  }
  //把时间转换成对应的 offset
  Map<TopicPartition,Long> map = new HashMap<>(6);
  Map<TopicPartition,Long> offsetmap = kafkaConsumer.offsetsForTimes(map);
  for (TopicPartition topicPartition : partitions) {
      //一天前
      offsetmap.put(topicPartition,System.currentTimeMillis() - 1 * 24 * 3600 * 1000);
  }
  Map<TopicPartition, OffsetAndTimestamp> offsetsForTimeMap = kafkaConsumer.offsetsForTimes(offsetmap);
  for (TopicPartition partition : partitions) {
      OffsetAndTimestamp timestamp = offsetsForTimeMap.get(partition);
      kafkaConsumer.seek(partition,timestamp.offset());
  }

kafka 文件存储机制 :

  • Topic 是逻辑上的概念,partition是物理上的概念, 每个partition对应一个log文件。该log文件中存储的就是 producer生产的数据
  • Producer生产的数据,会被不断追加到该log文件末端,为防止log文件过大导致数据定位效率低,kafka采取了分片和索引机制
  • 每个partition分为多个 segment,每个segment包含, .index .log .timeindex .snapshot 文件
  • 这些文件位于一个文件夹下,该文件夹命名规则为:topic名称+分区序号 first-0
    1
    2
  • 稀疏索引:大约每往log文件写入 4kb数据,会往index文件写入一条索引。
    • index文件中保存的 odffset是相对offset,这样能确保 offset得值所占空间不会过大,因此能将offset得值控制在固定大小

文件清除、压缩策略:

  • kafka 默认日志保存时间为 7 天
  • 压缩策略:compact,对应相同key的value,只保留最新的一个版本。

kafka 高效读写:

  • Kafka 本身是分布式集群,可以采用分区技术,并行度高
  • 读数据采用稀疏索引,可以快速定位要消费得数据
  • 顺序写磁盘,kafka得producer生产数据,要写入log文件中,写得过程是一直追加到文件末端,为顺序写
  • 零拷贝: Kaka的数据加工处理操作交由Kaka生产者和Kaka消费者处理。Kaka Broker应用层不关心存储的数据,所以就不用走应用层,传输效率高。
  • 页缓存: Kaka重度依赖底层操作系统提供的PageCache功能。当上层有写操作时,操作系统只是将数据写PageCache。当读操作发生时,先从PageCache中查找,如果找不到,再去磁盘中读取。实际上PageCache是把尽可能多的空闲内存都当做了磁盘缓存来使用。

常用脚本命名:

  • topic 相关命令
  • 查询topic列表 :sh kafka-topics.sh --bootstrap-server localhost:9092 --list
  • 创建topic (名称:first 分区:1个 副本 3个)副本数量不能超过集群数量
    • sh kafka-topics.sh --bootstrap-server localhost:9092 --topic first --create --partitions 1 --replication-factor 3
  • topic 信息
    • sh kafka-topics.sh --bootstrap-server localhost:9092 --topic first --describe
  • 修改topic 分区数(只能增加)
    • sh kafka-topics.sh --bootstrap-server localhost:9092 --topic first --describe --partitions 3
  • 生产消息:
    • sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic first
  • 消费消费:
    • sh kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first
    • sh kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first --from-beginning

Spring boot 简单整合:

<dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
</dependency>
spring:
  mvc:
    pathmatch:
      matching-strategy: ant_path_matcher
  application:
    name: @artifactId@
  kafka:
    bootstrap-servers: 192.168.2.207:9092,192.168.2.208:9092
    # 生产配置
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    #消费配置
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      group-id: xiaoshu
      enable-auto-commit: false
    listener:
      # 手动调用Acknowledgment.acknowledge()后立即提交
      ack-mode: manual

生产:

	@Resource
    private KafkaTemplate<String,String> kafkaTemplate;

    @ApiOperation(value = "推送消息到kafak")
    @GetMapping("/sendMsg")
    public String sendMsg(String msg){
        String topic="first";
        kafkaTemplate.send(topic,msg);
        return "ok";
    }

消费:

@Configuration
public class KafkaConsumer {

    @KafkaListener(topics = {"first"})
    public void consumerTopic(ConsumerRecord<?, ?> record, Acknowledgment ack){
        String topic = record.topic();
        long offset = record.offset();
        int partition = record.partition();
        Object value = record.value();
        try {
            System.out.println("收到消息:"+value);
            ack.acknowledge();
        }catch (Exception e){
            e.printStackTrace();
        }
    }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/3981.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【蓝桥杯嵌入式】第十四届蓝桥杯嵌入式[模拟赛2]客观题及详细题解

题1 解析 USART_CR1:控制寄存器1&#xff0c;其中的M位定义了数据字的长度&#xff0c;由软件对其设置和清零。USART_CR2:控制寄存器2。USART_BRR:波特率寄存器。USART_DR:数据寄存器。 (如果现场不记得&#xff0c;可以查阅芯片手册) 答案: A 题2 解析 在STM32微控制器中&a…

每日学术速递3.29

CV - 计算机视觉 | ML - 机器学习 | RL - 强化学习 | NLP 自然语言处理 Subjects: cs.CV 1.CC3D: Layout-Conditioned Generation of Compositional 3D Scenes 标题&#xff1a;CC3D&#xff1a;合成 3D 场景的布局条件生成 作者&#xff1a;Sherwin Bahmani, Jeong Joon …

五、MyBatis各种查询功能

MyBatis的各种查询功能 如果查询出的数据只有一条&#xff0c;可以通过 实体类对象接收List集合接收Map集合接收 如果查询出的数据有多条&#xff0c;一定不能用实体对象接收&#xff0c;会抛TooManyResultsException&#xff0c;可以通过 实体类类型的List集合接收Map类型的L…

学习系统编程No.11【重定向的本质】

引言&#xff1a; 北京时间&#xff1a;2023/3/27/7:05&#xff0c;哈哈哈&#xff0c;首先是开心&#xff0c;因为上篇博客热榜目前第15&#xff0c;让我初步掌握了上热榜的小妙招&#xff0c;不单单只是要日更&#xff0c;还有非常多的上榜小技巧&#xff0c;但是首先连续更…

【备战蓝桥杯】----01背包问题(动态规划)

&#x1f339;作者:云小逸 &#x1f4dd;个人主页:云小逸的主页 &#x1f4dd;Github:云小逸的Github &#x1f91f;motto:要敢于一个人默默的面对自己&#xff0c;强大自己才是核心。不要等到什么都没有了&#xff0c;才下定决心去做。种一颗树&#xff0c;最好的时间是十年前…

【数据结构】堆(堆的实现 堆向下调整算法 堆的创建 堆的插入 堆的删除 堆的代码实现 堆的应用)

文章目录堆的实现堆向下调整算法堆的创建堆的插入堆的删除堆的代码实现堆的应用堆的实现 堆是属于操作系统进程地址空间内存区域的划分。 我们下面实现数据结构中的堆。 堆是一个完全二叉树&#xff1a;分为小根堆和大根堆。 小根堆&#xff1a;任何一个节点的值都<孩子的…

YOLOv8原理解析:重新定义实时目标检测的速度和精度

文章目录0.前言1.YOLOv51.1 YOLOv5网络回顾1.2 YOLOv5网络结构图2.YOLOv82.1 YOLOv8概述2.2 YOLOv8整体结构图2.3 YOLOv8yaml 文件与 YOLOv5yaml 文件对比2.3.1 参数部分2.3.2 主干部分2.3.3 Neck部分2.3.4 Head部分2.4 正负样本分配策略2.4.1 静态分配策略和动态分配策略有什么…

【嵌入式烧录/刷写文件】-1.1-详解Motorola S-record(S19/SREC/mot/SX)格式文件

目录 1 什么是Motorola S-record 2 Motorola S-record的格式 2.1 Motorola S-record的结构 2.1.1 “Record type记录类型”的说明 2.1.2 “Record length记录长度”的说明 2.1.3 如何计算“Checksum校验和” 2.2 Record order记录顺序 2.3 Text line terminator文本行终…

【C语言】柔性数组

柔性数组1. 柔性数组介绍2. 柔性数组特点3. 用例3.1 代码一&#xff1a;3.2 代码二&#xff1a;4. 柔性数组优势&#xff1a;1. 柔性数组介绍 也许你从来没有听说过柔性数组&#xff08;flexible array&#xff09;这个概念&#xff0c;但是它确实是存在的。 C99 中&#xff0c…

#详细介绍!!! 线程池的拒绝策略(经典面试题)

本篇单独讲解线程池的拒绝策略&#xff0c;介绍了当线程池任务满了之后&#xff0c;线程池会以什么样的方式来响应添加进来的任务 目录 一&#xff1a;理解线程池拒绝策略的触发情况代码理解 二&#xff1a;线程池的四种常见的拒绝策略 1.ThreadPoolExecutor.AbortPolicy 2…

【Spring6】| GoF之代理模式(静态代理和动态代理)

目录 一&#xff1a;GoF之代理模式 1. 对代理模式的理解 2. 静态代理 3. 动态代理 3.1 JDK动态代理 3.2 CGLIB动态代理 一&#xff1a;GoF之代理模式 1. 对代理模式的理解 生活场景1&#xff1a;牛村的牛二看上了隔壁村小花&#xff0c;牛二不好意思直接找小花&#xff…

内网渗透之某后渗透利用【网络安全】

0x01 工具介绍 工具原理 Mimikatz 的主要原理是在 Windows 系统中用户登录后系统会将身份凭证存储于lsass.exe进程的内存当中&#xff0c;Mimikatz 通过注入lsass.exe进程读取进程内存&#xff0c;从中获取对应的明文密码。 常见问题 在 Windows Vista 系统之后不再存储 LM…

0203优先级下的调度问题_环_拓扑排序-有向图-数据结构和算法(Java)

1 概述 在和有向图相关的实际应用中&#xff0c;有向环特别的重要。在实际应用中&#xff0c;一般只会重点关注其中的一小部分&#xff0c;或者只想知道它们是否存在。 2 调度问题 一种应用广泛的模型是给定一组任务并安排它们的执行顺序&#xff0c;限制条件是这些任务的执…

机器学习:逻辑回归模型算法原理(附案例实战)

机器学习&#xff1a;逻辑回归模型算法原理 作者&#xff1a;i阿极 作者简介&#xff1a;Python领域新星作者、多项比赛获奖者&#xff1a;博主个人首页 &#x1f60a;&#x1f60a;&#x1f60a;如果觉得文章不错或能帮助到你学习&#xff0c;可以点赞&#x1f44d;收藏&#…

Github学生包申请秒过经验并使用Copilot

写在前面 前提是在校学生&#xff0c;且有学校邮箱&#xff0c;当然你也得有Github账户前往学信网下载 教育部学籍在线验证报告将报告转换成英文版本&#xff0c;我用的是手机夸克自带的拍照翻译功能 具体流程 设置Github个人信息 来到 https://github.com/settings/profil…

如何用Postman做接口自动化测试?没有比这个更详细的了

目录 前言 什么是自动化测试 自动化测试有哪些分类 为什么需要自动化测试 Postman自动化测试演示 1.新建集合 2.新建接口 3.填写自动化测试脚本 4.录入所有接口 5.执行自动化测试 前言 什么是自动化测试 把人对软件的测试行为转化为由机器执行测试行为的一种实践。 …

腾讯Coding平台学习笔记二:自定义团队插件的使用方法

目录一、前言二、系统环境三、工作目标四、流水线设置五、开发工具5.1 教程地址5.2 开发工具程序结构5.3 qciplugin.yml文件5.4 main.py文件六、插件的安装6.1 打包成zip6.2 上传zip包6.3 构建新插件6.4 质量门禁7、流水线设置7.1 添加质量管理阶段节点7.2 添加其它动作八、流水…

cookie和session的原理以及在Servlet中的应用

文章目录简介cookiecookie的实质及实现原理cookie在Servlet的应用sessionsession的实质及实现原理session在Servlet中的应用HttpServletRequest&#xff0c;Session&#xff0c;ServletContext简介 cookie保存在客户端&#xff0c;session保存在服务器端。二者均用于描述会话的…

【第十一届“泰迪杯”数据挖掘挑战赛】B题产品订单的数据分析与需求预测“解题思路“”以及“代码分享”

【第十一届泰迪杯B题产品订单的数据分析与需求预测产品订单的数据分析与需求预测 】第一大问代码分享&#xff08;后续更新LSTMinformer多元预测多变量模型&#xff09; PS: 代码全写有注释&#xff0c;通俗易懂&#xff0c;包看懂&#xff01;&#xff01;&#xff01;&…

RK3568平台开发系列讲解(驱动基础篇)IO 模型的分类

🚀返回专栏总目录 文章目录 一、阻塞 IO二、非阻塞 IO三、IO 多路复用四、信号驱动五、异步 IO沉淀、分享、成长,让自己和他人都能有所收获!😄 📢本篇将针对IO模型进行分类。 假设有这样一个场景,从磁盘中循环读取 100M 的数据并处理,磁盘读取 100M 需要花费 20 秒的…
最新文章