Flink checkpoint 源码分析- Checkpoint snapshot 处理流程

背景

在上一篇博客中我们分析了代码中barrier的是如何流动改的。Flink checkpoint 源码分析- Checkpoint barrier 传递源码分析-CSDN博客

最后跟踪到了代码org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate#handleEvent 

现在我们接着跟踪相应代码,观察是如何算子接受到了barrier是如何进行下一步代码处理的。以及了解flink应对不同的消费语义(At least once, exactly once)对于checkpoint的影响是怎样的。

代码分析

org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate#handleEvent 中我们主要关注对于checkpointBarrier的处理流程。

processBarrier方法实现上就可以看出,flink barrier的处理分成两种。

在这里我们需要跟踪一下barrierHandler 是如何生成的才能知道后面所要走的流程是哪一步。

通过往上追溯barrierHandler的生成,我们跟踪到方法:org.apache.flink.streaming.runtime.io.checkpointing.InputProcessorUtil#createCheckpointBarrierHandler 从代码中我们可以看到 如果是 EXACTLY_ONCE 那么生成的就SingleCheckpointBarrierHandler, 如果checkpoint 模式是AT_LEAST_ONCE, 生成对应的handler就是CheckpointBarrierTracker。 但是从代码中,EXACTLY_ONCE似乎不是简单的new 一个SingleCheckpointBarrierHandler, 而是通过一个方法来生成。因此需要进一步的观察这个方法是如何实现的。

org.apache.flink.streaming.runtime.io.checkpointing.InputProcessorUtil#createBarrierHandler

这里针对checkpoint类型做了区分,主要是分为aligned checkpoint 和 unaliged checkpoint的差异。这里可以进一步观察一下这两类checkpoint之前的差异。

对比这两个方法参数的差异,发现主要就是两处处参数有差异。subTaskCheckpointCoordinator、barrierHandlerState。这两个的差异主要体现在flink 在aligned checkpoint超时时,会切换为unaligned checkpoint。这里可以先按下不表,回到最开始的处理历程。

总结一下就是如果是flink 设置了at least once是使用的是CheckpointBarrierTracker, 当flink模式为exactly once时是SingleCheckpointBarrierHandler。 当为exactly once时checkpoint 类型又可以分为是aligned checkpoint还是unaligned checkpoint。

At least once 下 barrier是如何处理的

at least once 下对于barrier的处理是在以下的方法中实现的。

org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierTracker#processBarrier

public void processBarrier(CheckpointBarrier receivedBarrier, InputChannelInfo channelInfo) throws IOException {
		final long barrierId = receivedBarrier.getId();

		// fast path for single channel trackers
		if (totalNumberOfInputChannels == 1) {
			markAlignmentStartAndEnd(receivedBarrier.getTimestamp());
			notifyCheckpoint(receivedBarrier);
			return;
		}

		// general path for multiple input channels
		if (LOG.isDebugEnabled()) {
			LOG.debug("Received barrier for checkpoint {} from channel {}", barrierId, channelInfo);
		}

		// find the checkpoint barrier in the queue of pending barriers
		CheckpointBarrierCount barrierCount = null;
		int pos = 0;

		for (CheckpointBarrierCount next : pendingCheckpoints) {
			if (next.checkpointId == barrierId) {
				barrierCount = next;
				break;
			}
			pos++;
		}

		if (barrierCount != null) {
			// add one to the count to that barrier and check for completion
			int numBarriersNew = barrierCount.incrementBarrierCount();
			if (numBarriersNew == totalNumberOfInputChannels) {
				// checkpoint can be triggered (or is aborted and all barriers have been seen)
				// first, remove this checkpoint and all all prior pending
				// checkpoints (which are now subsumed)
				for (int i = 0; i <= pos; i++) {
					pendingCheckpoints.pollFirst();
				}

				// notify the listener
				if (!barrierCount.isAborted()) {
					if (LOG.isDebugEnabled()) {
						LOG.debug("Received all barriers for checkpoint {}", barrierId);
					}
					markAlignmentEnd();
					notifyCheckpoint(receivedBarrier);
				}
			}
		}
		else {
			// first barrier for that checkpoint ID
			// add it only if it is newer than the latest checkpoint.
			// if it is not newer than the latest checkpoint ID, then there cannot be a
			// successful checkpoint for that ID anyways
			if (barrierId > latestPendingCheckpointID) {
				markAlignmentStart(receivedBarrier.getTimestamp());
				latestPendingCheckpointID = barrierId;
				pendingCheckpoints.addLast(new CheckpointBarrierCount(barrierId));

				// make sure we do not track too many checkpoints
				if (pendingCheckpoints.size() > MAX_CHECKPOINTS_TO_TRACK) {
					pendingCheckpoints.pollFirst();
				}
			}
		}
	}

如果只有一个inputchannel的情况下,在收到这一个barrier的时候,就可以做snapshot.

在这个中间会经过triggerCheckpointOnBarrier 等方法, 最后实际还是调到了org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl#checkpointState ,看到这里其实这很长的链路实际是一个循环,下一个算子会生成barrier,接着传递这个barrier。

实际情况是作业并行度不唯一,一个subtask往往是有多个inputchannel. 可以继续看看是如何处理的。

这里面当收取到第一个barrier,会将这个barrier信息存在个队列中。

当收取到时非第一个barrier的时候会进行计数,当收取到的是最后一个barrier的时候就会将barrier队列中在这个barrier之前的barrier全部清除,之后就可以通知做checkpoint snapshot, 这个流程就和之前的一个信道的checkpoint流程是一致的。

总结而言:at least 类型的checkpoint是在收到最后一个barrier的时候开始做snapshot的。

Exactly once checkpoint是如何处理的

首先看这一段的代码

@Override
	public void processBarrier(CheckpointBarrier barrier, InputChannelInfo channelInfo) throws IOException {
		long barrierId = barrier.getId();
		LOG.debug("{}: Received barrier from channel {} @ {}.", taskName, channelInfo, barrierId);

        if (currentCheckpointId > barrierId
                || (currentCheckpointId == barrierId && !isCheckpointPending())) {
            if (!barrier.getCheckpointOptions().isUnalignedCheckpoint()) {
                inputs[channelInfo.getGateIdx()].resumeConsumption(channelInfo);
            }
			return;
		}

        checkNewCheckpoint(barrier);
        checkState(currentCheckpointId == barrierId);

        if (numBarriersReceived++ == 0) {
            if (getNumOpenChannels() == 1) {
                markAlignmentStartAndEnd(barrier.getTimestamp());
            } else {
                markAlignmentStart(barrier.getTimestamp());
            }
		}

        // we must mark alignment end before calling currentState.barrierReceived which might
        // trigger a checkpoint with unfinished future for alignment duration
        if (numBarriersReceived == numOpenChannels) {
            if (getNumOpenChannels() > 1) {
                markAlignmentEnd();
            }
        }

        try {
            currentState = currentState.barrierReceived(context, channelInfo, barrier);
        } catch (CheckpointException e) {
            abortInternal(barrier.getId(), e);
        } catch (Exception e) {
            ExceptionUtils.rethrowIOException(e);
        }

        if (numBarriersReceived == numOpenChannels) {
			numBarriersReceived = 0;
			lastCancelledOrCompletedCheckpointId = currentCheckpointId;
			LOG.debug(
				"{}: Received all barriers for checkpoint {}.", taskName, currentCheckpointId);
			resetAlignmentTimer();
			allBarriersReceivedFuture.complete(null);
			}
		}

这里需要关注一下currentState, 在最开始我们看了他的构造函数AlternatingWaitingForFirstBarrier, 因此可以可以看这个方法具体是现实。

这里可以看到这里会block 住收到barrier的信道,如果barrier 都收齐了,之后会检查是不是unaligned的checkpoint, 如果不是可以直接做一次checkpoint。这个checkpoint和之前的流程是一致的。

这里的下一个分支是超时转化,比如设置为30s,前30s是做aligned checkpoint, 如果30s还没有完成,就会转化为unaligned checkpoint。 当然,你如果不想有超时时间,可以直接设置为0.

如果是unaligned checkpoint, 会将channel 里面的数据也写会到远端。

这个中间会有一些状态转化,每次barrier的到达都会触发不同的状态变化。其中我们看到对于uc来说,uc的第一个barrier到达了,就会触发一次global checkpoint。org.apache.flink.streaming.runtime.io.checkpointing.AlternatingWaitingForFirstBarrierUnaligned#barrierReceived

org.apache.flink.streaming.runtime.io.checkpointing.AlternatingCollectingBarriersUnaligned#barrierReceived

最后如果收到所有的barrier之后会finished checkpoint。状态恢复到原位。

总结一下:在exactly once的语义下,aligned checkpoint的做法是,收到一个barrier的时候会将对应的channel block住。当收到最后一个barrier的时候再做一次checkpoint。

unaligned的做法是,收到barrier的时候,第一步就会触发一次checkpoint, 之后会不断上传channel state, 当收到最后一个barrier则表示checkpoint结束。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/603141.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

投资线上黄金是否属于外汇交易?探究黄金与外汇市场的关系

在金融市场中&#xff0c;线上黄金投资和外汇交易都是备受关注的领域。许多人可能会混淆这两者&#xff0c;认为投资线上黄金也是一种外汇交易。但实际上&#xff0c;尽管线上黄金和外汇交易有一些相似之处&#xff0c;但它们在本质上是不同的投资领域。本文将探讨投资线上黄金…

前端 Android App 上架详细流程 (Android App)

1、准备上架所需要的材料 先在需要上架的官方网站注册账号。提前把手机号&#xff0c;名字&#xff0c;身份证等等材料准备好&#xff0c;完成开发者实名认证&#xff1b;软著是必要的&#xff0c;提前准备好&#xff0c;软著申请时间比较长大概需要1-2周时间才能下来&#xf…

流畅的python-学习笔记_序列修改+散列+切片

vector第一版 reprlib.repr用于选取有限长度较长变量 vector第二版切片 注意切片还有indices属性&#xff0c;它可以入参一个序列长度&#xff0c;根据此序列长度&#xff0c;转化不规矩的start stop stride&#xff0c; vector第三版动态存取属性 obj.attra时&#xff0c;先…

构建 imx6ull sd 卡启动

1. 硬件环境 imx6ull 256MB tf 卡 512 MB 的 ddr&#xff1b; ubuntu 20.04&#xff1b; 芯片默认的启动方式是通过 LCD_DATA0 ~ LCD_DATA23&#xff1b;上下拉方式来确认的&#xff1b; 需要注意的上下拉是 BOOT_CFG1[7] BOOT_CFG1[6] BOOT_CFG1[5] 启动选择 和 BOOT_CF…

leetcode-矩阵最长递增路径-102

题目要求 思路 1.通过双循环去把每一个结点作为起始点进行统计&#xff0c;将返回的路径长度存放在res中&#xff0c;取最大的res的长度。 2.递归中需要的几个值&#xff0c;x和y当前结点的坐标&#xff0c;pre用于存储上一个结点的元素值&#xff0c;因为要求是路径上的元素是…

3D 交互展示该怎么做?

在博维数孪&#xff08;Bowell&#xff09;平台制作3D交互展示的流程相对简单&#xff0c;主要分为以下几个步骤&#xff1a; 1、准备3D模型&#xff1a;首先&#xff0c;你需要有一个3D模型。如果你有3D建模的经验&#xff0c;可以使用3ds Max或Blender等软件自行创建。如果没…

前后端分离项目中的一些疑惑

1、前后端分离项目&#xff0c;浏览器发起请求后&#xff0c;请求的是前端服务器还是后端服务器&#xff1f; 在前后端分离的项目中&#xff0c;当浏览器发起请求时&#xff0c;它首先会请求的是前端服务器。 前后端分离的工作流程大致如下&#xff1a; 用户在浏览器中输入网…

Echarts散点图的29个配置项,散点图可以随心所欲啦。

1-9 当使用 ECharts 绘制散点图时&#xff0c;可以配置以下一些常用的选项&#xff1a; 1. tooltip&#xff1a;配置提示框组件&#xff0c;用于显示鼠标悬停在散点上时的提示信息。 2. legend&#xff1a;配置图例组件&#xff0c;用于展示不同散点的标识和名称。 3. xAxis…

图数据库 之 Neo4j 与 AI 大模型的结合绘制知识图谱

引言 随着信息时代的到来&#xff0c;海量的文本数据成为了我们获取知识的重要来源。然而&#xff0c;如何从这些文本数据中提取出有用的信息&#xff0c;并将其以可视化的方式展示出来&#xff0c;一直是一个具有挑战性的问题。近年来&#xff0c;随着人工智能技术的发展&…

rust使用serde_json转换Value为rust中的数据类型

为了方便转换未知json数据&#xff0c;我们可以使用serde提供的value类型来进行转换&#xff0c;将json字符串转化为Value值&#xff0c;然后可以快速使用get方法来获取值&#xff1a; let json_str r#"{"name": "John","age": 30,"c…

科技控必看!让你轻松成为机器人领域达人

科技控们注意了&#xff01;你是不是经常对机器人技术充满无限的好奇&#xff0c;却又因为缺乏合适的渠道而难以深入了解和亲身体验呢&#xff1f;别担心&#xff0c;BFT机器人&#xff0c;正是你探索机器人世界的绝佳之地&#xff01; 在这里&#xff0c;你将发现一个充满惊喜…

政安晨:【Keras机器学习示例演绎】(三十九)—— 使用 FNet 进行文本分类

目录 简介 模型 设置 加载数据集 对数据进行标记 格式化数据集 建立模型 训练我们的模型 与变换器模型比较 政安晨的个人主页&#xff1a;政安晨 欢迎 &#x1f44d;点赞✍评论⭐收藏 收录专栏: TensorFlow与Keras机器学习实战 希望政安晨的博客能够对您有所裨益&…

Linux学习之高级IO

之前的内容我们基本掌握了基础IO&#xff0c;如套接字&#xff0c;文件描述符&#xff0c;重定向&#xff0c;缓冲区等知识都是文的基本认识&#xff0c;而高级IO则是指更加高效的IO。 对于应用层&#xff0c;在读写的时候&#xff0c;本质就是把数据写给OS&#xff0c;若一方…

1W 3KVDC 隔离 单输出 DC/DC 电源模块 ——TPF 系列

TPF系列提供输出稳压&#xff0c;精度高&#xff0c;对于输出电压有要求的场合特别适合&#xff0c;工业级环境温度&#xff0c;用于PCB安装的国际标准结构。此系列产品小巧&#xff0c;效率高&#xff0c;低输出纹波及提供3000V以上的直流电压隔离&#xff0c;封装有SIP和DIP可…

实测幻方新出的超强AI大模型,中文能力对比GPT4.0不落下风

目前从网上的消息来看&#xff0c;DeepSeek中文综合能力&#xff08;AlignBench&#xff09;开源模型中最强&#xff0c;与GPT-4-Turbo&#xff0c;文心4.0等闭源模型在评测中处于同一梯队。 话不多说&#xff0c;我们开测&#xff01; 1.首先我们来让他直接来一段逻辑推理【并…

Jetpack Compose三:主题和基础控件的使用

设置主题 与Android View的主题定义方式不同&#xff0c;Jetpack Compose中的主题由许多较低级别的结构体和相关API组成&#xff0c;它们包括颜色、排版和形状属性。 Theme.kt控制工程的主题&#xff0c;它是一个可组合的Compose函数 最后主题函数ComposeStudyTheme的相关设置…

安装Nox夜神模拟器关闭了HyperV后Docker运行不了怎么办?

1.背景 为了模拟真机&#xff0c;尝试安装了Nox夜神模拟器&#xff0c; 安装过程要求关闭Hyper-V。当时只是在程序安装卸载中关闭了系统服务。以为到时勾选上就好了。操作路径&#xff1a;控制面板\所有控制面板项\程序和功能\启用或关闭Windows功能\Hyper-V。 后来卸载掉了夜神…

D盘被格式化了能找回吗 d盘格式化了数据可以找回来吗

D盘作为电脑中重要的磁盘之一&#xff0c;很多用户都会将一些重要的数据保存在D盘。但在磁盘空间不足的情况下&#xff0c;或许有些用户会将其进行格式化&#xff0c;D盘被格式化了如何恢复数据&#xff1f; 如果是比较重要的数据&#xff0c;建议用户立即进行数据恢复操作&am…

Java-异常处理-定义三角形类Triangle和异常三角形IllegalTriangleException类 (1/2)

任意一个三角形&#xff0c;其任意两边之和大于第三边。当三角形的三条边不满足前述条件时&#xff0c;就表示发生了异常&#xff0c;将这种异常情况定义为IllegalTriangleException类。 自定义异常类IllegalTriangleException&#xff1a; 当三角形的三条边不满足条件&#x…

数据丢失不慌张,手机数据恢复一键解决!

如今手机已经成为我们生活中不可或缺的一部分。无论是工作、学习还是娱乐&#xff0c;手机都扮演着重要的角色。随着使用时间的增加&#xff0c;手机数据丢失的问题也时常发生。那么手机数据恢复有哪些方法呢&#xff1f;面对这种情况&#xff0c;先不要慌张&#xff0c;本文将…
最新文章