Checkpoint 执行机制原理解析

在介绍Checkpoint的执行机制前,我们需要了解一下state的存储,因为stateCheckpoint进行持久化备份的主要角色。Checkpoint作为Flink最基础也是最关键的容错机制,Checkpoint快照机制很好地保证了Flink应用从异常状态恢复后的数据准确性。同时 Checkpoint相关的metrics(指标)也是诊断Flink应用健康状态最为重要的指标,成功且耗时较短的Checkpoint表明作业运行状况良好,没有异常或反压。然而,由于Checkpoint与反压的耦合,反压反过来也会作用于Checkpoint,导致Checkpoint的种种问题。Flink1.11引入Unaligned(未对齐)Checkpoint来解耦Checkpoint机制与反压机制,优化高反压情况下的Checkpoint表现。

Statebackend 的分类

下图阐释了目前Flink内置的三类state backend,其中MemoryStateBackendFsStateBackend在运行时都是存储在java heap中的,只有在执行Checkpoint时,FsStateBackend才会将数据以文件格式持久化到远程存储上。 而RocksDBStateBackend则借用了 RocksDB(内存磁盘混合的LSM DB)对state进行存储。
[点击并拖拽以移动] ​
对于在这里插入图片描述
HeapKeyedStateBackend,有两种实现:
【1】支持异步Checkpoint(默认): 存储格式CopyOnWriteStateMap
【2】仅支持同步Checkpoint 存储格式NestedStateMap

特别在MemoryStateBackend内使用HeapKeyedStateBackend时,Checkpoint序列化数据阶段默认有最大5 MB数据的限制。对于 RocksDBKeyedStateBackend,每个state都存储在一个单独的column family内,其中keyGroupKeyNamespace进行序列化存储在 DB作为key
[点击并拖拽以移动] ​

Checkpoint 执行机制详解

Checkpoint的执行流程逐步拆解进行讲解,下图左侧是Checkpoint Coordinator,是整个Checkpoint的发起者,中间是由两个 source,一个sink组成的Flink作业,最右侧的是持久化存储,在大部分用户场景中对应HDFS
[点击并拖拽以移动] ​

【1】Checkpoint Coordinator向所有source节点触发trigger Checkpoint
【2】source节点向下游广播barrier(分界线),这个barrier就是实现Chandy-Lamport分布式快照算法的核心,下游的task只有收到所有inputbarrier才会执行相应的Checkpoint

Chandy-Lamport算法将分布式系统抽象成DAG(暂时不考虑有闭环的图),节点表示进程,边表示两个进程间通信的管道。分布式快照的目的是记录下整个系统的状态,即可以分为节点的状态(进程的状态)和边的状态(信道的状态,即传输中的数据)。因为系统状态是由输入的消息序列驱动变化的,我们可以将输入的消息序列分为多个较短的子序列,图的每个节点或边先后处理完某个子序列后,都会进入同一个稳定的全局统状态。利用这个特性,系统的进程和信道在子序列的边界点分别进行本地快照,即使各部分的快照时间点不同,最终也可以组合成一个有意义的全局快照。
[点击并拖拽以移动] ​

从实现上看,Flink通过在DAG数据源定时向数据流注入名为Barrier的特殊元素,将连续的数据流切分为多个有限序列,对应多个 Checkpoint周期。每当接收到Barrier,算子进行本地的Checkpoint快照,并在完成后异步上传本地快照,同时将Barrier以广播方式发送至下游。当某个Checkpoint的所有Barrier到达DAG末端且所有算子完成快照,则标志着全局快照的成功。

[点击并拖拽以移动] ​

【3】当task完成state备份后,会将备份数据的地址state handle通知给Checkpoint coordinator
[点击并拖拽以移动] ​

【4】下游的sink节点收集齐上游两个inputbarrier之后,会执行本地快照,这里特地展示了RocksDB incremental(增量) Checkpoint的流程,首先RocksDB会全量刷数据到磁盘上(红色大三角表示),然后Flink框架会从中选择没有上传的文件进行持久化备份(紫色小三角)。
[点击并拖拽以移动] ​

【5】同样的,sink节点在完成自己的Checkpoint之后,会将state handle返回通知Checkpoint Coordinator
[点击并拖拽以移动] ​

【6】最后,当Checkpoint coordinator收集齐所有taskstate handle,就认为这一次的Checkpoint全局完成了,向持久化存储中再备份一个Checkpoint meta文件。
[点击并拖拽以移动] ​

Checkpoint 的 EXACTLY_ONCE 语义

EXACTLY ONCE语义: 在有多个输入Channel的时候,为了数据准确性,算子会等待所有流的Barrier都到达之后才会开始本地的快照,这种机制被称为Barrier对齐。在对齐的过程中,算子只会继续处理的来自未出现Barrier Channel的数据,而其余Channel的数据会被写入输入队列(Flink通过一个input buffer将在对齐阶段收到的数据缓存起来),直至在队列满后被阻塞。当所有Barrier到达后(对齐),算子进行本地快照,输出 Barrier 到下游并恢复正常处理。
比起其他分布式快照,该算法的优势在于辅以Copy-On-Write技术的情况下不需要Stop The World影响应用吞吐量,同时基本不用持久化处理中的数据,只用保存进程的状态信息,大大减小了快照的大小。

AT LEAST ONCE语义: 无需缓存收集到的数据,会对后续直接处理,所以导致restore(恢复)时,数据可能会被多次处理。下图是官网文档里面就Checkpoint align的示意图:
[点击并拖拽以移动] ​

需要特别注意的是,FlinkCheckpoint机制只能保证Flink的计算过程可以做到EXACTLY ONCE,端到端的EXACTLY ONCE需要 sourcesink支持。

Checkpoint 与反压的耦合

目前的Checkpoint算法在大多数情况下运行良好,然而当作业出现反压时,阻塞式的Barrier对齐反而会加剧作业的反压,甚至导致作业的不稳定。

首先, Chandy-Lamport分布式快照的结束依赖于Marker的流动,而反压则会限制Marker的流动,导致快照的完成时间变长甚至超时。无论是哪种情况,都会导致Checkpoint的时间点落后于实际数据流较多。这时作业的计算进度是没有被持久化的,处于一个比较脆弱的状态,如果作业出于异常被动重启或者被用户主动重启,作业会回滚丢失一定的进度。如果Checkpoint连续超时且没有很好的监控,回滚丢失的进度可能高达一天以上,对于实时业务这通常是不可接受的。更糟糕的是,回滚后的作业落后的Lag更大,通常带来更大的反压,形成一个恶性循环。

其次,Barrier对齐本身可能成为一个反压的源头,影响上游算子的效率,而这在某些情况下是不必要的。比如典型的情况是一个的作业读取多个Source,分别进行不同的聚合计算,然后将计算完的结果分别写入不同的Sink。通常来说,这些不同的Sink会复用公共的算子以减少重复计算,但并不希望不同Source间相互影响。
[点击并拖拽以移动] ​

假设一个作业要分别统计AB两个业务线的以天为粒度指标,同时还需要统计所有业务线以周为单位的指标,拓扑如上图所示。如果B业务线某天的业务量突涨,使得Checkpoint Barrier有延迟,那么会导致公用的Window Aggregate进行Barrier对齐,进而阻塞业务AFlatMap,最终令业务A的计算也出现延迟。
当然这种情况可以通过拆分作业等方式优化,但难免引入更多开发维护成本,而且更重要的是这本来就符合Flink用户常规的开发思路,应该在框架内尽量减小出现用户意料之外的行为的可能性。

Unaligned Checkpoint

为了解决这个问题,Flink1.11版本引入了Unaligned Checkpoint的特性。要理解Unaligned Checkpoint的原理,首先需要了解 Chandy-Lamport论文中对于Marker处理规则的描述:自行百度翻译

Marker-Sending Rule for a Process p. For each channel c, incident on, and
directed away from p:
p sends one marker along c after p records its state and before p sends further messages
along c.
    Marker-Receiving Rule for a Process q. On receiving a marker along a channel
C:
if q has not recorded its state then
    begin q records its state;
          q records the state c as the empty sequence
    end
else q records the state of c as the sequence of messages received along c after q’s state
was recorded and before q received the marker along c.

其中关键是if q has not recorded its state,也就是接收到Marker时算子是否已经进行过本地快照。一直以来FlinkAligned Checkpoint通过Barrier对齐,将本地快照延迟至所有Barrier到达,因而这个条件是永真的,从而巧妙地避免了对算子输入队列的状态进行快照,但代价是比较不可控的 Checkpoint时长和吞吐量的降低 。实际上这和Chandy-Lamport算法是有一定出入的。举个例子,假设我们对两个数据流进行equal-join,输出匹配上的元素。按照Flink Aligned Checkpoint的方式,系统的状态变化如下(图中不同颜色的元素代表属于不同的Checkpoint周期):
[点击并拖拽以移动] ​

图 a: 输入Channel 1存在3个元素,其中2Barrier前面;Channel 2存在4个元素,其中297Barrier前面。
图 b: 算子分别读取Channel一个元素,输出2。随后接收到Channel 1Barrier,停止处理Channel 1后续的数据,只处理 Channel 2的数据。
图 c: 算子再消费2个自Channel 2的元素,接收到Barrier,开始本地快照并输出Barrier

对于相同的情况,Chandy-Lamport算法的状态变化如下:
[点击并拖拽以移动] ​

图 a: 输入Channel 1存在3个元素,其中2Barrier前面;Channel 2存在4个元素,其中297Barrier前面。
图 b: 算子分别处理两个Channel一个元素,输出结果2。此后接收到Channel 1Barrier,算子开始本地快照记录自己的状态,并输出Barrier
图 c: 算子继续正常处理两个Channel的输入,输出9。特别的地方是Channel 2后续元素会被保存下来,直到Channel 2Barrier出现(即Channel 297)。保存的数据会作为Channel的状态成为快照的一部分。

两者的差异主要可以总结为两点:
快照的触发是在接收到第一个Barrier时还是在接收到最后一个Barrier时。
是否需要阻塞已经接收到BarrierChannel的计算。

从这两点来看,新的 Unaligned Checkpoint将快照的触发改为第一个Barrier且取消阻塞Channel的计算 ,算法上与Chandy-Lamport基本一致,同时在实现细节方面结合Flink的定位做了几个改进。
首先,不同于 Chandy-Lamport模型的只需要考虑算子输入Channel的状态,Flink的算子有输入和输出两种Channel ,在快照时两者的状态都需要被考虑。其次,无论在Chandy-Lamport还是Flink Aligned Checkpoint算法中,Barrier都必须遵循其在数据流中的位置,算子需要等待Barrier被实际处理才开始快照。而Unaligned Checkpoint改变了这个设定,允许算子优先摄入并优先输出Barrier。如此一来,第一个到达Barrier会在算子的缓存数据队列(包括输入Channel和输出Channel)中往前跳跃一段距离,而被”插队”的数据和其他输入Channel在其Barrier之前的数据会被写入快照中。
[点击并拖拽以移动] ​

这样的主要好处是,如果本身算子的处理就是瓶颈Chandy-LamportBarrier仍会被阻塞,但Unaligned Checkpoint则可以在 Barrier进入输入Channel就马上开始快照。这可以从很大程度上加快Barrier流经整个DAG的速度,从而降低Checkpoint整体时长。回到之前的例子,用Unaligned Checkpoint来实现,状态变化如下:
[点击并拖拽以移动] ​

图 a: 输入Channel 1存在3个元素,其中2Barrier前面;Channel 2存在4个元素,其中297Barrier前面。输出 Channel已存在结果数据1
图 b: 算子优先处理输入Channel 1Barrier,开始本地快照记录自己的状态,并将Barrier插到输出Channel末端。
图 c: 算子继续正常处理两个Channel的输入,输出29。同时算子会将Barrier越过的数据(即输入Channel 12和输出 Channel1)写入Checkpoint,并将输入Channel 2后续早于Barrier的数据(即 297)持续写入Checkpoint

比起Aligned Checkpoint中不同Checkpoint周期的数据以算子快照为界限分隔得很清晰,Unaligned Checkpoint进行快照和输出Barrier时,部分本属于当前Checkpoint的输入数据还未计算(因此未反映到当前算子状态中),而部分属于当前Checkpoint的输出数据却落到Barrier之后(因此未反映到下游算子的状态中)。

这也正是 Unaligned的含义: 不同Checkpoint周期的数据没有对齐,包括不同输入Channel之间的不对齐,以及输入和输出间的不对齐。而这部分不对齐的数据会被快照记录下来,以在恢复状态时重放。换句话说,从Checkpoint恢复时,不对齐的数据并不能由Source端重放的数据计算得出,同时也没有反映到算子状态中,但因为它们会被Checkpoint恢复到对应Channel中,所以依然能提供只计算一次的准确结果。

当然,Unaligned Checkpoint并不是百分百优于Aligned Checkpoint,它会带来的已知问题就有:
【1】由于要持久化缓存数据,State Size会有比较大的增长,磁盘负载会加重。
【2】随着State Size增长,作业恢复时间可能增长,运维管理难度增加。

目前看来,Unaligned Checkpoint更适合容易产生高反压同时又比较重要的复杂作业。对于像数据ETL同步等简单作业,更轻量级的 Aligned Checkpoint显然是更好的选择。

总结:Flink 1.11Unaligned Checkpoint主要解决在高反压情况下作业难以完成Checkpoint的问题,同时它以磁盘资源为代价,避免了Checkpoint可能带来的阻塞,有利于提升Flink的资源利用率。随着流计算的普及,未来的Flink应用大概会越来越复杂,在未来经过实战打磨完善后Unaligned Checkpoint很有可能会取代Aligned Checkpoint成为Flink的默认Checkpoint策略。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/272584.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

搞懂SkyWalking(40张图)

前言 在微服务架构中,一次请求往往涉及到多个模块,多个中间件,多台机器的相互协作才能完成。这一系列调用请求中,有些是串行的,有些是并行的,那么如何确定这个请求背后调用了哪些应用,哪些模块…

Python 高级(三):多线程 threading

大家好,我是水滴~~ 在Python中,threading模块提供了一种简单而强大的方式来进行多线程编程。多线程可以同时执行多个任务,使程序能够更有效地利用计算资源。本教程将介绍threading模块的基本概念、用法和一些常见的多线程编程模式。 文章中…

【JAVA】黑马MybatisPlus 学习笔记【终】【插件功能】

4.插件功能 MybatisPlus提供了很多的插件功能,进一步拓展其功能。目前已有的插件有: PaginationInnerInterceptor:自动分页TenantLineInnerInterceptor:多租户DynamicTableNameInnerInterceptor:动态表名OptimisticL…

Python 新规范 pyproject.toml 完全解析

多谢:thank Python从PEP 518开始引入的使用pyproject.toml管理项目元数据的方案。 该规范目前已经在很多开源项目中得以支持: Django 这个 Python 生态的顶级项目在 5 个月之前开始使用 pyproject.tomlPytest 这个 Python 生态测试框架的领头羊在 4 个…

HarmonyOS4.0系统性深入开发04UIAbility组件详解(下)

UIAbility组件间交互(设备内) UIAbility是系统调度的最小单元。在设备内的功能模块之间跳转时,会涉及到启动特定的UIAbility,该UIAbility可以是应用内的其他UIAbility,也可以是其他应用的UIAbility(例如启…

java练习题之接口interface练习

1:关于接口和抽象类,下列说法正确的是(ACD) A.抽象类可以有构造方法,接口没有构造方法 B.抽象类可以有属性,接口没有属性 C.抽象类可以有非抽象方法,接口中都是抽象方法 1.8之后 D.抽象类和接…

比亚迪重磅来袭,汽车圈又要大动干戈?

12月15日,我盼望已久的新车————宋L正式登场! 作为一直关注比亚迪的车主,这款新SUV一直处于我的观测范围内。终于在前几日,比亚迪宣布它将于12月15日上市,这对我来说无疑是个好消息。当我了解到宋L将推出后驱和四驱…

【Unity6.0+AI】Unity版的Pytorch之Sentis-把大模型植入Unity

本教程详细讲解什么Sentis。以及恶补一些人工智能神经网络的基础概念,概述了基本流程,加载模型、输入内容到模型、使用GPU让模型推理数据、输出数据。 官方文档 Unity Sentis: Use AI models in Unity Runtime | Unity 主页介绍 官方文档链接:Sentis overview | Sentis | 1…

挑战Python100题(6)

100+ Python challenging programming exercises 6 Question 51 Define a class named American and its subclass NewYorker. Hints: Use class Subclass(ParentClass) to define a subclass. 定义一个名为American的类及其子类NewYorker。 提示:使用class Subclass(Paren…

Scala安装

Scala安装使用 windows安装,配置环境变量 以下载Scala2.11为例,操作在Windows中安装Scala。 官网下载scala2.11:All Available Versions | The Scala Programming Language下载好后安装。双击msi包安装,记住安装的路径。配置环境变量(和配…

YOLOv7训练数据报错

YOLOv7训练数据报错 错误提示1解决方案问题2解决方案成功运行 错误提示1 fatal: not a git repository (or any of the parent directories): .git Traceback (most recent call last):File "/home/ubuntu/code/yolov7-main/utils/google_utils.py", line 26, in att…

一图梳理多模态领域发展简史

本文采用关系图的方式整理了:CLiP、ViLT、FLiP、ALBEF、CoCa、BLiP、VLMo、BEiT 和BEiT v3等近几年来经典的多模态大模型,以及这些工作的相互依托关系。灵感来自:跟李沐读论文系列——多模态串讲 1 模型关系图: ’ 2 各模型的架…

Power BI 学习

数据获取 数据清洗 对导入的数据进行数据整理的过程一般称为「数据清洗」,之所以称之为清洗,是因为在数据分析师眼中,杂乱的数据就是脏数据,只有被清洗成干净的数据后才可以进行分析使用。 数据丰富 操作 1.复制列 点击列名选…

Spring之提前编译:AOT

学习的最大理由是想摆脱平庸,早一天就多一份人生的精彩;迟一天就多一天平庸的困扰。各位小伙伴,如果您: 想系统/深入学习某技术知识点… 一个人摸索学习很难坚持,想组团高效学习… 想写博客但无从下手,急需…

讯飞星火认知大模型智能语音交互调用

随着国内外大模型热度的兴起,依托于大模型的智能化,传统的人机交互已经不能满足人们交互的需求。而结合语音和大模型的交互拜托传统互联网获取知识的文字限制,用语音也可以轻松获取想要的知识和思路。 一、大模型智能语音交互调用实现思路 …

k8s集群通过helm部署skywalking

1、安装helm 下载脚本安装 ~# curl -fsSL -o get_helm.sh https://raw.githubusercontent.com/helm/helm/master/scripts/get-helm-3 ~# chmod 700 get_helm.sh ~# ./get_helm.sh或者下载包进行安装 ~# wget https://get.helm.sh/helm-canary-linux-amd64.tar.gz ~# mv helm …

智能监测/检测系统EasyCVR国标接入无法播放是什么原因?该如何解决?

安防视频监控/视频集中存储/云存储/磁盘阵列EasyCVR平台可拓展性强、视频能力灵活、部署轻快,可支持的主流标准协议有国标GB28181、RTSP/Onvif、RTMP等,以及支持厂家私有协议与SDK接入,包括海康Ehome、海大宇等设备的SDK等。平台既具备传统安…

acwing linux docker教程

终章 听着名字还挺伤感的哈哈哈其实是Linux的终章,感谢大家这段时间的阅读,这段使时间我为了给大家清楚的讲解一下Linux自己也是很认真的学习了一遍,自己提升了也不少。其实最近学校里面是讲了Linux和windows server 2019搭载DNS、web、ftp服…

YOLOv5改进 | 主干篇 | ShuffleNetV2轻量化网络助力FPS提高(附代码+修改教程)

一、本文内容 本文给大家带来的改进内容是ShuffleNetV2,这是一种为移动设备设计的高效CNN架构。其在ShuffleNetV1的基础上强调除了FLOPs之外,还应考虑速度、内存访问成本和平台特性。(我在YOLOv5n上修改该主干降低了GFLOPs,但是参数量还是有一定上涨&am…

『JavaScript』全面掌握JavaScript数组的操作、方法与高级技巧

📣读完这篇文章里你能收获到 学习JavaScript中数组的基本操作掌握JavaScript数组的多种内置方法了解JavaScript中的数组扩展运算符、Array.from()和Array.of()等实用技巧熟悉如何在JavaScript中使用数组方法进行数据处理 文章目录 一、基本操作1. 创建数组2. 访问和…