Flink 物理执行图

文章目录

  • 物理执行图
  • 一、Task
  • 二、ResultPartition
  • 三、ResultSubpartition
  • 四、InputGate
  • 五、InputChannel


物理执行图

JobManager根据ExecutionGraph对作业进行调度,并在各个TaskManager上部署任务。这些任务在TaskManager上的实际执行过程就形成了物理执行图。物理执行图并不是一个具体的数据结构,而是描述了流处理任务在集群中的实际执行情况。
它包含的主要抽象概念有:Task、ResultPartition、ResultSubpartition、InputGate、InputChannel。
在这里插入图片描述


一、Task

Execution被调度后在分配的 TaskManager 中启动对应的 Task。Task 包裹了具有用户执行逻辑的 operator。
一个作业可以被划分为多个Task,并在不同的Task上并行执行。每个Task由一个或多个子任务(Subtask)组成,每个子任务在一个TaskSlot中运行。Task主要负责接收输入数据,执行数据转换和计算,并将结果发送到下游的算子中。

在Flink中,Task的执行由TaskExecutor来负责。Task.doRun()方法是引导Task初始化并执行其相关代码的核心方法。它会构造并实例化Task的可执行对象,即AbstractInvokable。AbstractInvokable.invoke()方法的执行过程中,如果正常执行完毕,会输出ResultPartition缓冲区数据,关闭缓冲区,并标记Task为Finished;如果因为取消操作导致退出,会标记Task为CANCELED,并关闭用户代码;如果执行过程中抛出异常,会标记Task为FAILED,关闭用户代码,并记录异常;如果执行过程中JVM抛出错误,会强制终止虚拟机,并退出当前进程。

二、ResultPartition

ResultPartition代表由一个Task生成的数据,并与ExecutionGraph中的IntermediateResultPartition一一对应。它实际上是一个缓存池,里面保存的是经过序列化之后的节点计算结果。每个ResultPartition包含多个ResultSubPartition,其数目由下游消费Task的数量和DistributionPattern来决定。ResultSubPartition是ResultPartition的一个子分区,真正持有缓冲区Buffer。

写入ResultPartition的操作由ResultPartition的add方法实现。此外,在shuffle阶段,ResultPartition的选择由ChannelSelector负责,它决定了序列化后的record应该写入哪个ResultSubPartition。

ResultPartition在Flink的物理执行图中扮演着重要角色,它确保了数据在Task之间的正确流动和传输,是构建高效、可靠数据流处理应用的关键组件之一。

三、ResultSubpartition

ResultSubpartition是ResultPartition的一个子分区,用于存储和传输数据。每个ResultPartition包含多个ResultSubpartition,其数量由下游消费Task的数量和DistributionPattern决定。这种设计有助于并行处理数据,提高处理效率。

ResultSubpartition负责接收上游Task生成的数据,并将其缓存起来,以便下游Task消费。同时,ResultSubpartition还负责数据的序列化、反序列化和传输,确保数据在不同Task之间的正确流动。

根据数据类型和传输需求,Flink提供了不同类型的ResultSubpartition实现。例如,PipelinedSubpartition是基于内存的管道模式的结果子分区,适用于低延迟的数据传输场景;BoundedBlockingSubpartition中是以阻塞的方式传输的,即数据先被写入,然后再被消费。这种机制确保了数据的有序性和一致性,避免了数据在传输过程中的丢失或乱序问题。

在Flink的物理执行图中,ResultSubpartition与InputGate和InputChannel紧密相关。每个InputGate消费一个或多个ResultPartition,而每个InputGate又包含一个或多个InputChannel。InputChannel与ResultSubpartition一对一地相连,即一个InputChannel接收一个ResultSubpartition的输出。这种设计使得数据能够按照预定的路径在Task之间流动,实现分布式数据流处理。

总的来说,ResultSubpartition是Flink数据流处理中的关键组件,它负责数据的存储、传输和消费,确保数据在不同Task之间的正确流动和高效处理。

创建ResultPartition、ResultSubpartition的相关源码

    public ResultPartition create(
            String taskNameWithSubtaskAndId,
            int partitionIndex,
            ResultPartitionID id,
            ResultPartitionType type,
            int numberOfSubpartitions,
            int maxParallelism,
            SupplierWithException<BufferPool, IOException> bufferPoolFactory) {
        BufferCompressor bufferCompressor = null;
        if (type.supportCompression() && batchShuffleCompressionEnabled) {
            bufferCompressor = new BufferCompressor(networkBufferSize, compressionCodec);
        }

        ResultSubpartition[] subpartitions = new ResultSubpartition[numberOfSubpartitions];

        final ResultPartition partition;
        if (type == ResultPartitionType.PIPELINED
                || type == ResultPartitionType.PIPELINED_BOUNDED
                || type == ResultPartitionType.PIPELINED_APPROXIMATE) {
            final PipelinedResultPartition pipelinedPartition =
                    new PipelinedResultPartition(
                            taskNameWithSubtaskAndId,
                            partitionIndex,
                            id,
                            type,
                            subpartitions,
                            maxParallelism,
                            partitionManager,
                            bufferCompressor,
                            bufferPoolFactory);

            for (int i = 0; i < subpartitions.length; i++) {
                if (type == ResultPartitionType.PIPELINED_APPROXIMATE) {
                    subpartitions[i] =
                            new PipelinedApproximateSubpartition(
                                    i, configuredNetworkBuffersPerChannel, pipelinedPartition);
                } else {
                    subpartitions[i] =
                            new PipelinedSubpartition(
                                    i, configuredNetworkBuffersPerChannel, pipelinedPartition);
                }
            }

            partition = pipelinedPartition;
        } else if (type == ResultPartitionType.BLOCKING
                || type == ResultPartitionType.BLOCKING_PERSISTENT) {
            if (numberOfSubpartitions >= sortShuffleMinParallelism) {
                partition =
                        new SortMergeResultPartition(
                                taskNameWithSubtaskAndId,
                                partitionIndex,
                                id,
                                type,
                                subpartitions.length,
                                maxParallelism,
                                batchShuffleReadBufferPool,
                                batchShuffleReadIOExecutor,
                                partitionManager,
                                channelManager.createChannel().getPath(),
                                bufferCompressor,
                                bufferPoolFactory);
            } else {
                final BoundedBlockingResultPartition blockingPartition =
                        new BoundedBlockingResultPartition(
                                taskNameWithSubtaskAndId,
                                partitionIndex,
                                id,
                                type,
                                subpartitions,
                                maxParallelism,
                                partitionManager,
                                bufferCompressor,
                                bufferPoolFactory);

                initializeBoundedBlockingPartitions(
                        subpartitions,
                        blockingPartition,
                        blockingSubpartitionType,
                        networkBufferSize,
                        channelManager,
                        sslEnabled);

                partition = blockingPartition;
            }
        } else if (type == ResultPartitionType.HYBRID_FULL
                || type == ResultPartitionType.HYBRID_SELECTIVE) {
            partition =
                    new HsResultPartition(
                            taskNameWithSubtaskAndId,
                            partitionIndex,
                            id,
                            type,
                            subpartitions.length,
                            maxParallelism,
                            batchShuffleReadBufferPool,
                            batchShuffleReadIOExecutor,
                            partitionManager,
                            channelManager.createChannel().getPath(),
                            networkBufferSize,
                            HybridShuffleConfiguration.builder(
                                            numberOfSubpartitions,
                                            batchShuffleReadBufferPool.getNumBuffersPerRequest())
                                    .setSpillingStrategyType(
                                            type == ResultPartitionType.HYBRID_FULL
                                                    ? HybridShuffleConfiguration
                                                            .SpillingStrategyType.FULL
                                                    : HybridShuffleConfiguration
                                                            .SpillingStrategyType.SELECTIVE)
                                    .build(),
                            bufferCompressor,
                            bufferPoolFactory);
        } 
        return partition;
    }

四、InputGate

InputGate是对数据输入的封装,与JobGraph中的JobEdge一一对应。每个InputGate消费一个或多个ResultPartition,这些ResultPartition代表上游Task生成的数据。InputGate的主要作用是管理和控制数据的流入,确保数据能够按照正确的顺序和方式被Task所消费。

InputGate由多个InputChannel构成,每个InputChannel与ExecutionGraph中的ExecutionEdge以及ResultSubpartition一一对应。这意味着每个InputChannel负责接收一个ResultSubpartition的输出,从而实现了数据的精确传递和接收。

在Flink的物理执行过程中,InputGate和InputChannel起着至关重要的作用。它们不仅负责数据的接收和传递,还参与了数据的序列化和反序列化过程,确保数据在不同Task之间的正确流动。此外,InputGate和InputChannel还提供了对数据传输的控制和优化功能,可以根据实际需求调整数据传输的策略和方式。

总的来说,Flink的InputGate通过对数据输入的封装和管理,实现了数据的精确传递和高效处理。

五、InputChannel

InputChannel是数据输入通道的关键组件,它位于InputGate之下,与ExecutionGraph中的ExecutionEdge以及ResultSubpartition一对一地相连。每个InputChannel负责接收一个ResultSubpartition的输出,确保数据从上游Task正确地流向下游Task。

根据消费的ResultPartition的位置,InputChannel有两种不同的实现:LocalInputChannel和RemoteInputChannel。LocalInputChannel用于处理本地数据交换,即数据在同一TaskManager的不同Task之间传输;而RemoteInputChannel则负责远程数据交换,即数据在不同TaskManager的Task之间传输。这种设计使得Flink能够灵活地处理分布式环境中的数据流动。

此外,还有一个名为UnknownInputChannel的实现类,它作为尚未确定ResultPartition位置的情况下的占位符。在实际执行过程中,UnknownInputChannel最终会被更新为LocalInputChannel或RemoteInputChannel,以反映实际的数据传输路径。

InputChannel在Flink的数据流处理中扮演着重要角色。它不仅是数据传输的通道,还参与数据的序列化和反序列化过程,确保数据在传输过程中的完整性和一致性。同时,InputChannel与InputGate和ResultSubpartition的紧密协作,使得Flink能够高效地处理大规模、高吞吐量的数据流。

总结来说,Flink InputChannel负责数据的接收、传输和序列化,确保数据在不同Task之间的正确流动。通过LocalInputChannel和RemoteInputChannel的不同实现,Flink能够处理各种分布式场景下的数据交换需求。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/440824.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Leetcode - 周赛387

目录 一&#xff0c;3069. 将元素分配到两个数组中 I 二&#xff0c;3070. 元素和小于等于 k 的子矩阵的数目 三&#xff0c;3071. 在矩阵上写出字母 Y 所需的最少操作次数 四&#xff0c;3072. 将元素分配到两个数组中 II 一&#xff0c;3069. 将元素分配到两个数组中 I 本…

[递归、搜索、回溯]----递归

前言 作者&#xff1a;小蜗牛向前冲 专栏&#xff1a;小蜗牛算法之路 专栏介绍&#xff1a;"蜗牛之道&#xff0c;攀登大厂高峰&#xff0c;让我们携手学习算法。在这个专栏中&#xff0c;将涵盖动态规划、贪心算法、回溯等高阶技巧&#xff0c;不定期为你奉上基础数据结构…

freeRTOS20240308

1.总结任务的调度算法&#xff0c;把实现代码再写一下 2.总结任务的状态以及是怎么样进行转换的

音视频学习笔记——c++多线程(一)

✊✊✊&#x1f308;大家好&#xff01;本篇文章主要整理了部分多线程相关的内容重点&#x1f607;。首先讲解了多进程和多线程并发的区别以及各自优缺点&#xff0c;之后讲解了Thead线程库的基本使用。 本专栏知识点是通过<零声教育>的音视频流媒体高级开发课程进行系统…

react的diff源码

react 的 render 阶段&#xff0c;其中 begin 时会调用 reconcileChildren 函数&#xff0c; reconcileChildren 中做的事情就是 react 知名的 diff 过程 diff 算法介绍 react 的每次更新&#xff0c;都会将新的 ReactElement 内容与旧的 fiber 树作对比&#xff0c;比较出它们…

消息队列-kafka-消息发送流程(源码跟踪) 与消息可靠性

官方网址 源码&#xff1a;https://kafka.apache.org/downloads 快速开始&#xff1a;https://kafka.apache.org/documentation/#gettingStarted springcloud整合 发送消息流程 主线程&#xff1a;主线程只负责组织消息&#xff0c;如果是同步发送会阻塞&#xff0c;如果是异…

【CSP试题回顾】202104-2-邻域均值

CSP-202104-2-邻域均值 关键点&#xff1a;二维差分数组 详见&#xff1a;【CSP考点回顾】差分数组 解题思路 初始化矩阵和参数&#xff1a;首先&#xff0c;代码接收矩阵的大小&#xff08;n x n&#xff09;&#xff0c;每个元素的亮度值&#xff08;位于[0, L]区间&…

基于Vue的体育汇App设计与实现

目 录 摘 要 I Abstract II 引 言 1 1 核心技术的理论与分析 3 1.1 客户端技术 3 1.1.1 Vue.js框架 3 1.1.2 Vue.js路由管理 3 1.1.3 Vuex状态管理 3 1.1.4 MVVM开发模式 4 1.1.5 Vant组件库 5 1.2 服务端技术 5 1.2.1 Node.js 5 1.2.2 Egg.js框架 5 1.3 数据库技术 6 1.4 本章…

webUI自动化测试框架

&#x1f525; 交流讨论&#xff1a;欢迎加入我们一起学习&#xff01; &#x1f525; 资源分享&#xff1a;耗时200小时精选的「软件测试」资料包 &#x1f525; 教程推荐&#xff1a;火遍全网的《软件测试》教程 &#x1f4e2;欢迎点赞 &#x1f44d; 收藏 ⭐留言 &#x1…

【LeetCode】升级打怪之路 Day 16:二叉树题型 —— 二叉树的构造

今日题目&#xff1a; 654. 最大二叉树105. 从前序与中序遍历序列构造二叉树106. 从中序与后序遍历序列构造二叉树889. 根据前序和后序遍历构造二叉树 目录 LC 654. 最大二叉树 【easy】 Problem&#xff1a;根据遍历序列来还原二叉树 【classic】 ⭐⭐⭐⭐⭐LC 105. 从前序与中…

数据库原理实验课(1)

目录 实验内容 安装头歌中的相关内容 具体过程 完结撒花~ 我也是第一次接触oracle的相关软件和操作&#xff0c;所以是一次傻瓜式教学记录 实验内容 安装头歌中的相关内容 具体过程 这是我在百度网盘中下载解压出来的oracle文件夹内的全部内容&#xff08;可能有因为安装完…

使用Portainer让测试环境搭建飞起来

Docker的用处不多加赘述&#xff0c;Docker目前有以下应用场景&#xff1a; 测试&#xff1a;Docker很适合用于测试发布&#xff0c;将 Docker 封装后可以直接提供给测试人员进行运行&#xff0c;不再需要测试人员与运维、开发进行配合&#xff0c;进行环境搭建与部署。 测试…

【技术】基于Github Pages搭建个人博客静态网页

写在前面&#xff1a; 如果文章对你有帮助&#xff0c;记得点赞关注加收藏一波&#xff0c;利于以后需要的时候复习&#xff0c;多谢支持&#xff01; 文章目录 一、技术基础二、新建特殊仓库三、上传网页文件四、Github Pages设置 个人网页作为仅服务个人的网页&#xff0c;一…

Grafana变量默认全选

注&#xff1a;本文基于Grafana v9.2.8编写 1 问题 EKS集群里的node按照不同label被分为几类&#xff0c;我需要对这几类的node做一些统计。我希望当我使用lable选择时&#xff0c;node的值自动设置为该lable的所有node集合&#xff0c;而不需要再手动全选。 2 解决方案 变…

微信小程序(五十四)腾讯位置服务示范(2024/3/8更新)

教程如下&#xff1a; 上一篇 1.先在官网注册一下账号&#xff08;该绑定的都绑定一下&#xff09; 腾讯位置服务官网 2.进入控制台 3.创建应用 3. 额度分配 4.下载微信小程序SDK 微信小程序SDK下载渠道 5.解压将俩js文件放在项目合适的地方 6.加入安全域名or设置不验证合…

扩展CArray类,增加Contain函数

CArray不包含查找类的函数&#xff0c;使用不便。考虑扩展CArray类&#xff0c;增加Contain函数&#xff0c;通过回调函数暴露数组元素的比较方法&#xff0c;由外部定义。该方法相对重载数组元素的“”符号更加灵活&#xff0c;可以根据需要配置不同的回调函数进行比较 //类型…

AD20中关于“failed to add class member”的解决方法

目录 问题描述&#xff1a; 解决方法&#xff1a; 1.切换至PCB界面-选择工具栏的设计-类 2.把Component classes中的所有的类全部按照图中删除&#xff0c;保存 3.重新返回原理图界面转换PCB即可成功 问题描述&#xff1a; failed to add class member&#xff1a;未能添加…

解答关于:水牛社软件是做什么的?水牛社软件靠谱么?

很多对我们软件感兴趣但是没有入手的观望者都会有这样的疑问&#xff1a;水牛社软件具体是做什么的&#xff1f;水牛社软件靠谱么&#xff1f; 其实软件的简介已经讲的很清楚了&#xff0c;但是软件不是功能性软件&#xff0c;所以不能给大家免费试用&#xff0c;短期任务版块…

智能驾驶规划控制理论学习08-自动驾驶控制模块(轨迹跟踪)

目录 一、基于几何的轨迹跟踪方法 1、基本思想 2、纯追踪 3、Stanly Method 二、PID控制器 三、LQR&#xff08;Linear Quadratic Regulator&#xff09; 1、基本思想 2、LQR解法 3、案例学习 基于LQR的路径跟踪 基于LQR的速度跟踪 4、MPC&#xff08;Mode…

Python通过SFTP实现网络设备配置备份

一、背景 为了防止网络设备意外损坏&#xff0c;导致配置文件无法恢复&#xff0c;可以通过将网络设备的配置文件备份到本地电脑上。 一般情况下&#xff0c;设备支持通过FTP、TFTP、FTPS、SFTP和SCP备份配置文件。其中使用FTP和TFTP备份配置文件比较简单&#xff0c;但是存在…
最新文章