kafka日志存储

前言

    kafka的主题(topic)可以对应多个分区(partition),而每个分区(partition)可以有多个副本(replica),我们提生产工单创建topic的时候也是要预设这些参数的。但是它究竟是如何存储的呢?

    我们在使用kafka发送消息时,实际表现是提交日志,日志记录会一个接一个地追加到日志的末尾,同时为了避免单一日志文件过大无线膨胀,kafka采用了日志分段(LogSegment)的形式进行存储。所谓日志分段,就是当一个日志文件大小到达一定条件之后,就新建一个新的日志分段,然后在新的日志分段写入数据。每个日志段对象会在磁盘上创建一组文件,包括消息日志文件(.log)、位移索引文件(.index)、时间戳索引文件 (.timeindex),也就是说日志段才是kafka真正的日志文件存储基础单元。

    整个主题、分区、副本、日志关系如下:

在这里插入图片描述

    以__consumer_offsets这个topic为例,每一个目录对应一个分区,说明dev环境下这个topic有50个分区,每个子目录下存在多组日志段,也就是多组.log、.index、.timeindex 文件组合。

    进入/tmp/kafka-logs/__consumer_offset-49目录,下图中文件名的一串数字0是该日志段的起始位移值(Base Offset),也就是该日志段中所存的第一条消息的位移值,由此也可以推测出0000000000000000000.log中共有20条日志记录

在这里插入图片描述

配置

    前面是从生产配置和kafka目录的文件直观看到消息相关的内容,下面引入几个kafka消息相关的配置。

日志清理策略

  kafka log的清理策略有两种:delete,compact,默认是delete
  • delete:一般是使用按照时间保留的策略,当不活跃的segment的时间戳是大于设置的时间的时候,当前segment就会被删除
  • compact: 日志不会被删除,会被去重清理,这种模式要求每个record都必须有key,然后kafka会按照一定的时机清理segment中的key,对于同一个key只保留最新的那个key。同样的,compact也只针对不活跃的segment
    对应的配置是log.cleanup.policy: delete,对应topic级别的配置是cleanup.policy

消息保存时长

    Kafka 支持服务器级保留策略,我们可以通过配置三个基于时间的配置属性之一来调整该策略:
  • log.retention.hours

  • log.retention.minutes

  • log.retention.ms
    其默认配置是log.retention.hours=168,即默认保留7天, Kafka自身会用较高精度值覆盖较低精度值。因此,如果在配置中新增log.retention.minutes=10,消息的保留时间将会变更位10分钟

     上面这个配置是服务器级别的,配置在server.properties中,每次新增创建topic时,如果不指定topic的日志保留时间,以上述配置为例,消息的保留时长就是7天,如果配置retention.ms=600000,这是从log.retention.minutes派生而来的,这个参数是topic级别的,配置了这个值,就会以这个值为准,创建topic之后,仍然可以单独调整retention.ms,来调整topic的保留时间
    

segment相关配置

    segment有两个很重要的配置
  • log.segment.bytes

  • log.roll.hours
    目前风控kafka上述两个配置都是默认值,见下图,log.segment.bytes是1G,log.roll.hours是7d,这两个配置在后续分析segment相关原理时还会再详细介绍
    在这里插入图片描述

     segment扫描频率的配置,日志片段文件检查的周期时间,目前生产配置为5min
    
  • log.retention.check.interval.ms
    在这里插入图片描述

原理

    为什么要看源码?因为检索了kafka的官方文档,关于segment的单独说明极少,网上检索到的资料又没有足够的说服力,所以最终决定还是从源码中寻找相关问题的答案

kafka源码搭建
当前风控系统使用的kafka是1.1版本,随机下载了相关源码,由于kafka是用scala编写的,并用gradle进行打包处理,也进行了相关程序的下载,对应的版本见下述列出,感兴趣的同事可以按如下版本下载,解压编译kafka源码,然后就可以在idea中查看了

  • kafka源码版本:1.1
  • scala版本:2.12.7
  • gradle版本:4.7

LogConfig

    该scala定义了Defaults object,scala中的Object可以看成java中的util类,存放了很多常量
  • log.segment.bytes: 1GB
  • log.roll.ms: 168hour
  • log.retention.bytes: -1
  • log.retention.ms: 168hour

LogSegment

     正如前文中介绍的,segment才是kafka存储单元的基础部分,随之找到了相关类LogSegment.scala

变量声明

     它的参数定义如下,可以查看它的注释,明确的支出了segment是由log和index组成的,这也与我们前面查看kafka目录中对应的日志文件呼应了
/**
 * A segment of the log. Each segment has two components: a log and an index. The log is a FileMessageSet containing
 * the actual messages. The index is an OffsetIndex that maps from logical offsets to physical file positions. Each
 * segment has a base offset which is an offset <= the least offset of any message in this segment and > any offset in
 * any previous segment.
 *
 * A segment with a base offset of [base_offset] would be stored in two files, a [base_offset].index and a [base_offset].log file.
 *
 * @param log The message set containing log entries
 * @param offsetIndex The offset index
 * @param timeIndex The timestamp index
 * @param baseOffset A lower bound on the offsets in this segment
 * @param indexIntervalBytes The approximate number of bytes between entries in the index
 * @param time The time instance
 */
@nonthreadsafe
@nonthreadsafe
class LogSegment private[log] (val log: FileRecords,
                               val offsetIndex: OffsetIndex,
                               val timeIndex: TimeIndex,
                               val txnIndex: TransactionIndex,
                               val baseOffset: Long,
                               val indexIntervalBytes: Int,
                               val rollJitterMs: Long,
                               val maxSegmentMs: Long,
                               val maxSegmentBytes: Int,
                               val time: Time) extends Logging {
	.......
}
      针对segment的成员变量,重点看以下几个
  • baseOffset:消息偏移量,即文件名,对于一组sgement,它都是固定的,它就是该日志段中第一条消息的位移值,一共20位,不足的话前面补0,每个日志段对象保存自己的起始位移 baseOffset,这是非常重要的属性,在源码中经常看到它的使用!事实上,你在磁盘上看到的文件名就是 baseOffset 的值。每个 LogSegment 对象实例一旦被创建,它的起始位移就是固定的了,不能再被更改。
  • maxSegmentBytes:每段最大字节数,该参数越大,日志被切成的segment就越少,控制粒度也就变小了,通过代码debug发现这个参数取决于配置【log.segment.bytes】,目前风控系统中配置为1G
  • maxSegmentMs:每段保留有效毫秒数,每个segment在写入一段时间的日志后,即使log还没有达到maxSegmentBytes最大值,kafka也会强制日志滚动,以确保可以删除或者压缩旧数据,该参数取决于【log.roll.ms】或【log.roll.hours】,当前者不存在时,取后者,后者目前在生产环境中配置为168h
  • rollJitterMs:是日志段对象新增倒计时的“扰动值”。因为目前 Broker 端日志段新增倒计时是全局设置,这就是说,在未来的某个时刻可能同时创建多个日志段对象,这将极大地增加物理磁盘 I/O 压力。有了 rollJitterMs 值的干扰,每个新增日志段在创建时会彼此岔开一小段时间,这样可以缓解物理磁盘的 I/O 负载瓶颈。这个变量给我的感觉有点像设置缓存时间加的随机值,避免缓存同时过期。

shouldRoll方法

    segment是否应该进行切分(roll)
def shouldRoll(messagesSize: Int, maxTimestampInMessages: Long, maxOffsetInMessages: Long, now: Long): Boolean = {
  
  val reachedRollMs = timeWaitedForRoll(now, maxTimestampInMessages) > maxSegmentMs - rollJitterMs
  size > maxSegmentBytes - messagesSize ||
    (size > 0 && reachedRollMs) ||
    offsetIndex.isFull || timeIndex.isFull || !canConvertToRelativeOffset(maxOffsetInMessages)
}
  • timeWaitedForRoll(now, maxTimestampInMessages):1. 如果此segment的第一个消息的时间戳存在,就用当前的新的batch的时间戳,减去此segment第一条消息的的时间戳判断是否已经超过segments.ms,2. 如果此segments的第一个消息的时间戳不存在,就用当前系统时间与此segment创建的时间差做判断。
  • reachedRollMs就表示,是否超过上述日志写入事件差值是否超过【log.roll.hours】
  • size > maxSegmentBytes - messagesSize:当前 activeSegment 在追加本次消息之后,长度超过 LogSegment 允许的最大值【log.segment.bytes】
  • offsetIndex.isFull || timeIndex.isFull:索引文件是否满了
  • !canConvertToRelativeOffset(maxOffsetInMessages):这个变量涉及到offset的相对位移概念,后面再介绍

append方法

// Update the in memory max timestamp and corresponding offset.
if (largestTimestamp > maxTimestampSoFar) {
   maxTimestampSoFar = largestTimestamp
   offsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp
}
     我这次分享的重点都是跟segment的写入和删除相关的,所以只重点介绍与之相关的内容,下面的源码解析也是这个思路

     在append方法中,即往segment写入消息时,也会同步更新segment的最大时间戳以及最大时间戳所属消息的位移值属性。每个日志段都要保存当前最大时间戳信息和所属消息的位移信息。在 Broker 端的提供定期删除日志功能中,比如我只想保留最近 7 天的日志,此处的当前最大时间戳这个值就是判断的依据;

Log

    Log 对象是 Kafka 源码(特别是 Broker 端)最核心的部分,没有之一。

    日志是日志段的容器,里面定义了很多管理日志段的操作。

object Log

“.deleted”
    .deleted 是删除日志段操作创建的文件。目前删除日志段文件是异步操作,Broker 端把日志段文件从.log 后缀修改为.deleted 后缀。如果你看到一大堆.deleted 后缀的文件名,别慌,这是 Kafka 在执行日志段文件删除。
filenamePrefixFromOffset
def filenamePrefixFromOffset(offset: Long): String = {
  val nf = NumberFormat.getInstance()
  nf.setMinimumIntegerDigits(20)
  nf.setMaximumFractionDigits(0)
  nf.setGroupingUsed(false)
  nf.format(offset)
}
    这个方法的作用是通过给定的位移值计算出对应的日志段文件名。Kafka 日志文件固定是20 位的长度,filenamePrefixFromOffset 方法就是用前面补 0 的方式,把给定位移值扩充成一个固定 20 位长度的字符串。

Log Class

/* the actual segments of the log */
private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
    为什么说Log是管理segment的容器,从这个字段就可以看出来,这是 Log 类中最重要的属性之一。它保存了分区日志下所有的日志段信息,只不过是用 Map 的数据结构来保存的。Map 的 Key 值是日志段的起始位移值,Value 则是日志段对象本身。Kafka 源码使用 ConcurrentNavigableMap 数据结构来保存日志段对象,就可以很轻松地利用该类提供的线程安全和各种支持排序的方法,来管理所有日志段对象。

    它是键值(Key)可排序的 Map。Kafka 将每个日志段的起始位移值作为 Key,这样一来,我们就能够很方便地根据所有日志段的起始位移值对它们进行排序和比较,同时还能快速地找到与给定位移值相近的前后两个日志段。

Log中删除Segment操作

三个留存策略
/**
 * Delete any log segments that have either expired due to time based retention
 * or because the log size is > retentionSize
 */
def deleteOldSegments(): Int = {
  if (!config.delete) return 0
  deleteRetentionMsBreachedSegments() + deleteRetentionSizeBreachedSegments() + deleteLogStartOffsetBreachedSegments()
}
    config.delete:配置的删除策略,要配置delete才会进行真实的删除操作

    deleteRetentionMsBreachedSegments:其中的核心条件是(segment, _) => startMs - segment.largestTimestamp > config.retentionMs,这是一个匿名函数,startMs是当前时间,largestTimestamp 正是上文提到的LogSegment在每次写入日志时都会修改的最大时间戳,config.retentionMs也是上文中提到过的配置【retention.ms】,即topic的保留时间

    deleteRetentionSizeBreachedSegments:这个删除是受segment驱使,但是有个条件retention.bytes>0,但是目前生产环境这个配置是-1,所以生产环境的kafka实际是不会触发超过指定大小后的删除策略的

    deleteLogStartOffsetBreachedSegments:下一个日志分段的起始偏移量 baseOffset 是否小于等于 logStartOffset,若是,则可以删除此日志分段
消息量很少的情况
    知道了上述删除策略后,重点分析一下当某个topic的消息数量很少的情况,即在segment在7天内,都没有因为超过segment的文件上线1G时,该segment则一直是active segment,该topic也只有这唯一一个segment

    假设这个topic的过期时间是15天,第1天产生了1条消息,第7天产生了1条消息,此时不满足shouldRoll条件,不会切分segment,第13天又产生了一条消息,此时还是不满足shouldRoll条件,那什么时候会进行日志切分呢,当下一条消息产生的时间跟上一条消息产生的时间相差超过了7天,此时才会进行日志切分,所以存在一种可能性,对于这种消息产生量很少的日志可能永远不会过期,有点像缓存的续时,一直给续上了。

asyncDeleteSegment

private def asyncDeleteSegment(segment: LogSegment) {
  segment.changeFileSuffixes("", Log.DeletedFileSuffix)
  def deleteSeg() {
    info(s"Deleting segment ${segment.baseOffset}")
    maybeHandleIOException(s"Error while deleting segments for $topicPartition in dir ${dir.getParent}") {
      segment.deleteIfExists()
    }
  }
  scheduler.schedule("delete-file", deleteSeg _, delay = config.fileDeleteDelayMs)
}
    执行segment删除上一个异步操作,首先是同步在.log文件后面加上.deleted的后缀,然后通过定时器scheduler,1分钟延迟后异步删除

.log文件和.index文件

查看文件详情

查看log文件,以__consumer_offset topic为例
/data/app/kafka_2.12-2.6.0/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/__consumer_offsets-49/00000000000000000000.log --print-data-log
在这里插入图片描述

    可以看到00000000000000000000.log的起始offset是19,最后一条offset是20,然后他的创建时间是1658900424154

在这里插入图片描述

    又获取了00000000000000000021.log的文件详情,起始offset是21,创建时间是1659505428096,它与上一个segment的最后一条消息的时间差值是605003942,是大于7天的,所以是符合上述源码分析的,即超过了segment切分时间配置【log.roll.hours】

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/595306.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Flutter分模块开发、模块可单独启动、包含Provider

前言 目前Flutter都是在一个项目中&#xff0c;创建不同目录进行模块开发&#xff0c;我进行Android原生开发时&#xff0c;发现原生端&#xff0c;是可以将每个模块独立运行起来的&#xff0c;灵感来自这&#xff1b; 折腾了几天&#xff0c;终于给整出来了。 1、创建根目录…

Go语言基本语法(三)指针

什么是指针 在Go语言中&#xff0c;"指针是一种存储变量内存地址的数据类型"&#xff0c;意味着指针本身是一个特殊的变量&#xff0c;它的值不是数据本身&#xff0c;而是另一个变量在计算机内存中的位置&#xff08;地址&#xff09;。形象地说&#xff0c;就像存…

multipass和multipassd命令的区别

multipassd通常是multipass服务的后台守护进程&#xff0c;它负责管理和控制虚拟机实例。 命令区别 例&#xff1a; multipass restart my-vm 这个命令用于重启Multipass中的虚拟机实例。例如有一个名为my-vm的虚拟机实例。 multipassd restart 这会重新启动Multipass后台…

一文学会最强大的 node.js 后端框架 nest.js

文章目录 nest cli项目基本结构IOC & DI基础注册值注册时 key 的管理动态注册类工厂函数方式注册设置别名导出 provider 模块功能模块模块的导入导出模块类中使用注入全局模块动态模块 中间件定义中间件注册中间件MiddlewareConsumer 类全局中间件 异常过滤器抛出异常自定义…

华三配置DHCP(基础)

华三交换机配置DHCP&#xff08;基础&#xff09; 1.组网拓扑图&#xff08;交换机-PC&#xff09; 2.通过交换机开启DHCP功能&#xff0c;使PC自动获取192.168.10.0&#xff08;vlan10&#xff09;网段地址 2.使用命令 <H3C>system-view [H3C]vlan 10&#xff08;建立…

Python_4-远程连接Linux

文章目录 使用Python通过SSH自动化Linux主机管理代码执行ls结果&#xff1a;文件传输&#xff1a; 使用Python通过SSH自动化Linux主机管理 在系统管理与自动化运维中&#xff0c;SSH&#xff08;Secure Shell&#xff09;是一个常用的协议&#xff0c;用于安全地访问远程计算机…

【0day】湖南建研工程质量检测系统InstrumentUsageRecordExport接口处存在任意文件读取漏洞

免责声明&#xff1a;文章来源互联网收集整理&#xff0c;请勿利用文章内的相关技术从事非法测试&#xff0c;由于传播、利用此文所提供的信息或者工具而造成的任何直接或者间接的后果及损失&#xff0c;均由使用者本人负责&#xff0c;所产生的一切不良后果与文章作者无关。该…

发那科Fanuc数控网络IP配置设定教程

1.在主面板如图按system键&#xff0c;进入系统界面 2.按右翻页切换键&#xff0c;切换到内嵌选项&#xff0c;按内嵌按钮跳转至设置IP界面&#xff0c;设置ip 3.按Focas2按钮&#xff0c;跳转至设置端口号和超时时间界面。设置端口号和时间之后&#xff0c;重启设备。注意&…

MES生产系统与数字孪生双重结合:智慧制造工厂的新引擎

随着数字化浪潮的推动&#xff0c;制造行业正在经历着前所未有的变革。在这个变革的浪潮中&#xff0c;MES生产制造系统与数字孪生技术的深度融合成为了制造工厂未来发展的核心驱动力。这种结合不仅提升了生产效率&#xff0c;优化了资源配置&#xff0c;降低了运营成本&#x…

2024年 Java 面试八股文——SpringCloud篇

目录 1.Spring Cloud Alibaba 中的 Nacos 是如何进行服务注册和发现的&#xff1f; 2.Spring Cloud Alibaba Sentinel 的流量控制规则有哪些&#xff1f; 3.Spring Cloud Alibaba 中如何实现分布式配置管理&#xff1f; 4.Spring Cloud Alibaba RocketMQ 的主要特点有哪些&…

HCIP的学习(12)

OSPF优化 ​ OSPF的优化主要目的是为了减少LSA的更新量。 路由汇总-----可以减少骨干区域的LSA数量特殊区域-----可以减少非骨干区域的LSA数量 OSPF路由汇总 域间路由汇总-----在ABR设备上进行操作 [GS-R2-ospf-1-area-0.0.0.1]abr-summary 192.168.0.0 255.255.224.0 [GS-…

什么是抖音橱窗?它和抖音小店有什么区别?普通人更适合做哪个?

大家好&#xff0c;我是电商糖果 相信有很多想在抖音卖货的朋友&#xff0c;都会搞不清抖音橱窗是什么&#xff1f; 甚至会把它和抖音小店当成一个项目&#xff0c;也不知道哪个更适合自己。 自己越了解发现越迷糊&#xff0c;有的说不需要直播&#xff0c;粉丝&#xff0c;…

汇智知了堂鸿蒙课程全新升级,权威师资引领AI新纪元

在人工智能飞速发展的今天&#xff0c;汇智知了堂紧跟时代步伐&#xff0c;全面升级鸿蒙课程&#xff0c;以权威师资、实战导向、互动教学、资源支持为核心&#xff0c;为广大学员带来前所未有的学习体验&#xff01; 首先&#xff0c;汇智知了堂鸿蒙课程汇聚了业内知名专家&…

软件设计师-应用技术-数据流图题1

基础知识及技巧&#xff1a; 0. 概念&#xff1a; 在结构化分析中&#xff0c;数据流图用来记录系统中的数据和数据在特定的过程中的流动&#xff0c;即数据如何被采集、处理、保存和使用的(围绕信息系统的功能)。 1. 元素实例&#xff1a; 补充知识&#xff1a;** 外部实体…

sqlx执行案例

SQLx简介 SQLx是Rust语言中的一个异步SQL数据库连接库&#xff0c;它支持多种数据库&#xff0c;如PostgreSQL、MySQL和SQLite。SQLx提供了简单的API和异步执行查询的能力&#xff0c;使得Rust程序员可以轻松地与数据库交互1。 本章节以PostgreSQL为例。 目录结构 cargo.tom…

区块链开发用的是哪种编程语言?

区块链技术作为近年来备受瞩目的新兴技术之一&#xff0c;其核心的特性之一就是去中心化、安全性高、透明度高和可扩展性强。而区块链的开发语言则是实现这一技术的关键因素之一。那么&#xff0c;区块链开发语言是哪一种编程语言呢&#xff1f; 一、区块链开发语言的特点和选…

五一热度最大,销量最高的十大随身WiFi!某东、某宝倾力推荐!2024随身wifi靠谱品牌推荐!随身wifi怎么选?

还在争论谁才是最强的随身WiFi&#xff1f;要我说别再争了&#xff01;直接用事实说话&#xff01;看看五一小长假期间&#xff0c;消费者购买最多、评价最好的十款随身WiFi&#xff01;数据综合了某宝、某东、某多多&#xff0c;绝对真实可靠&#xff01; 第一名&#xff1a;格…

今日详解,教你如何不直播在视频号卖货

大家好&#xff0c;我是电商笨笨熊 视频号作为背靠微信的平台&#xff0c;从不需要考虑自身的流量问题&#xff0c; 因此在视频号推出之后就有大批的主播从其他平台转入视频号&#xff1b; 而这时候很多普通人应该也发现了新的机会&#xff0c;不再去内卷抖音、快手直播&…

Java Jackson-jr 库使用介绍

介绍 Jackson-jr 是一个轻量级的Java JSON 处理库。这个库被设计用来替代 Jackson 的复杂性。对比 Jackson 的复杂 API&#xff0c;Jackson-jr 的启动速度更快&#xff0c;包大小更小。 虽然Jackson databind&#xff08;如ObjectMapper&#xff09;是通用数据绑定的良好选择…

[SaaS]建筑领域的sd应用

AirchiDesignhttp://www.aiarchi.art/#/建筑学长——千万建筑师的资源库和AI绘图创作平台建筑学长官网,为青年设计师建立的线上资源共享及AI绘图创作渲染平台,免费提供海量设计案例、CAD图纸、SU模型、PS素材、软件插件下载,提供丰富的设计软件教学与灵感参考素材图库。https:/…
最新文章