13、Flink 的 Operator State 详解

1.算子状态 (Operator State)

算子状态(或者非 keyed 状态)是绑定到一个并行算子实例的状态,Kafka consumer 每个并行实例维护了 topic partitions 和偏移量的 map 作为它的算子状态。

当并行度改变的时候,算子状态支持将状态重新分发给各并行算子实例,处理重分发过程有多种不同的方案。

算子状态作为一种特殊类型的状态使用,用于实现 source/sink,以及无法对 state 进行分区而没有主键的这类场景中。

注意: Python DataStream API 仍无法支持算子状态。

2.使用 Operator State

用户可以通过实现 CheckpointedFunction 接口来使用 operator state。

a)CheckpointedFunction 概述

CheckpointedFunction 接口提供了访问 non-keyed state 的方法,需要实现如下两个方法:

void snapshotState(FunctionSnapshotContext context) throws Exception;

void initializeState(FunctionInitializationContext context) throws Exception;

进行 checkpoint 时会调用 snapshotState(),自定义函数初始化时会调用 initializeState(),初始化包括第一次自定义函数初始化和从之前的 checkpoint 恢复;因此 initializeState() 不仅是定义不同状态类型初始化的地方,也需要包括状态恢复的逻辑。

当前 operator state 以 list 的形式存在;这些状态是一个 可序列化 对象的集合 List,彼此独立,方便在改变并发后进行状态的重新分派,根据状态的不同访问方式,有如下几种重新分配的模式

  • Even-split redistribution: 每个算子都保存一个列表形式的状态集合,整个状态由所有的列表拼接而成;当作业恢复或重新分配的时候,整个状态会按照算子的并发度进行均匀分配。;比如说,算子 A 的并发度为 1,包含两个元素 element1element2,当并发度增加为 2 时,element1 会被分到并发 0 上,element2 则会被分到并发 1 上。
  • Union redistribution: 每个算子保存一个列表形式的状态集合;整个状态由所有的列表拼接而成;当作业恢复或重新分配时,每个算子都将获得所有的状态数据【不建议使用】
b)带缓冲的 SinkFunction

案例SinkFunctionCheckpointedFunction 中进行数据缓存,然后统一发送到下游,演示了列表状态数据的 event-split redistribution。

public class BufferingSink
        implements SinkFunction<Tuple2<String, Integer>>,
                   CheckpointedFunction {

    private final int threshold;

    private transient ListState<Tuple2<String, Integer>> checkpointedState;

    private List<Tuple2<String, Integer>> bufferedElements;

    public BufferingSink(int threshold) {
        this.threshold = threshold;
        this.bufferedElements = new ArrayList<>();
    }

    @Override
    public void invoke(Tuple2<String, Integer> value, Context contex) throws Exception {
        bufferedElements.add(value);
        if (bufferedElements.size() >= threshold) {
            for (Tuple2<String, Integer> element: bufferedElements) {
                // send it to the sink
            }
            bufferedElements.clear();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedState.update(bufferedElements);
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Tuple2<String, Integer>> descriptor =
            new ListStateDescriptor<>(
                "buffered-elements",
                TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));

        checkpointedState = context.getOperatorStateStore().getListState(descriptor);

        if (context.isRestored()) {
            for (Tuple2<String, Integer> element : checkpointedState.get()) {
                bufferedElements.add(element);
            }
        }
    }
}

initializeState 方法接收一个 FunctionInitializationContext 参数,用来初始化 non-keyed state 的 “容器” 即 ListState 用于在 checkpoint 时保存 non-keyed state 对象,和 keyed state 类似,StateDescriptor 会包括状态名字、以及状态类型相关信息。

ListStateDescriptor<Tuple2<String, Integer>> descriptor =
    new ListStateDescriptor<>(
        "buffered-elements",
        TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));

checkpointedState = context.getOperatorStateStore().getListState(descriptor);

调用不同的获取状态对象的接口,会使用不同的状态分配算法,比如 getUnionListState(descriptor) 会使用 union redistribution 算法, 而 getListState(descriptor) 则使用 even-split redistribution 算法。

当初始化状态对象后,通过 isRestored() 方法判断是否从之前的故障中恢复,如果该方法返回 true 则表示从故障中进行恢复,会执行接下来的恢复逻辑,BufferingSink 中初始化时,恢复回来的 ListState 的所有元素会添加到一个局部变量中,供下次 snapshotState() 时使用,然后清空 ListState,再把当前局部变量中的所有元素写入到 checkpoint 中。

同样可以在 initializeState() 方法中使用 FunctionInitializationContext 初始化 keyed state。

c)带状态的 Source Function

需要保证更新状态以及输出的原子性(用于支持 exactly-once 语义),需要在发送数据前获取数据源的全局锁。

public static class CounterSource
        extends RichParallelSourceFunction<Long>
        implements CheckpointedFunction {

    /**  current offset for exactly once semantics */
    private Long offset = 0L;

    /** flag for job cancellation */
    private volatile boolean isRunning = true;
    
    /** 存储 state 的变量. */
    private ListState<Long> state;
     
    @Override
    public void run(SourceContext<Long> ctx) {
        final Object lock = ctx.getCheckpointLock();

        while (isRunning) {
            // output and state update are atomic
            synchronized (lock) {
                ctx.collect(offset);
                offset += 1;
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        state = context.getOperatorStateStore().getListState(new ListStateDescriptor<>(
            "state",
            LongSerializer.INSTANCE));
            
        // 从已保存的状态中恢复 offset 到内存中,在进行任务恢复的时候也会调用此初始化状态的方法
        for (Long l : state.get()) {
            offset = l;
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        state.update(Collections.singletonList(offset));
    }
}

要获取 checkpoint 成功消息的算子,可以参考 org.apache.flink.api.common.state.CheckpointListener 接口

【当算子完成 checkpoint 后会回调 notifyCheckpointComplete() 方法】。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/589624.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Django数据库创建存储及管理

一、什么是ORM Django的ORM(Object-Relational Mapping)是Django框架中一个非常重要的组件。ORM可以让开发者以面向对象的方式操作数据库,而不需要直接编写SQL语句。 具体来说,Django ORM提供了以下功能: 模型定义:开发者可以在Django应用中定义Python类来表示数据库表,这些…

基于寄存器的STM32操作流程

寄存器点灯 寄存器操作STM32的好处是不需要依靠外部文件&#xff0c;自由度更高&#xff0c;更为底层&#xff0c;但也更加繁杂。 通过寄存器点灯&#xff0c;需要按照电路结构与手册配置寄存器&#xff1a; 电路结构如下&#xff1a;可知需配置的GPIO为GPIOB5与GPIOE5。 在…

Docker构建LNMP部署WordPress

前言 使用容器化技术如 Docker 可以极大地简化应用程序的部署和管理过程&#xff0c;本文将介绍如何利用 Docker 构建 LNMP 环境&#xff0c;并通过部署 WordPress 来展示这一过程。 目录 一、环境准备 1. 项目需求 2. 安装包下载 3. 服务器环境 4. 规划工作目录 5. 创…

excel怎么删除条件格式规则但保留格式?

这个问题的意思就是要将设置的条件格式&#xff0c;转换成单元格格式。除了使用VBA代码将格式转换外&#xff0c;还可以用excel自己的功能来完成这个任务。 一、将条件格式“留下来” 1.设置条件格式 选中数据&#xff0c;点击开始选项卡&#xff0c;设置条件格式&#xff0…

2024年 Java 面试八股文——SpringMVC篇

目录 1.简单介绍下你对springMVC的理解? 2.说一说SpringMVC的重要组件及其作用 3.SpringMVC的工作原理或流程 4.SpringMVC的优点 5.SpringMVC常用注解 6.SpringMVC和struts2的区别 7.怎么实现SpringMVC拦截器 8.SpringMvc的控制器是不是单例模式&#xff1f;如果是&am…

XUbuntu24.04之更换国内高速源(二百二十八)

简介&#xff1a; CSDN博客专家&#xff0c;专注Android/Linux系统&#xff0c;分享多mic语音方案、音视频、编解码等技术&#xff0c;与大家一起成长&#xff01; 优质专栏&#xff1a;Audio工程师进阶系列【原创干货持续更新中……】&#x1f680; 优质专栏&#xff1a;多媒…

无人机+飞行汽车:低空经济新引擎,有望爆发式增长

无人机和飞行汽车作为低空经济的新引擎&#xff0c;正在引领一场全新的交通革命。随着技术的不断进步和政策的支持&#xff0c;低空经济有望成为未来经济发展的重要领域&#xff0c;实现爆发式增长。 首先&#xff0c;无人机和飞行汽车具有独特的优势和应用场景。无人机可以在…

confluence 设置https代理

使用nginx反待confluence并开启https后&#xff0c;登录confluence会一直提示&#xff1a;scheme、proxyName、proxyPort设置错误。 解决办法&#xff1a; find / -name server.xmlvi /opt/atlassian/confluence/conf/server.xml HTTP反代配置 HTTPS反代配置

ue引擎游戏开发笔记(28)——实现第三人称越肩视角

1.需求分析 实现一个第三人称越肩视角 2.操作实现 1.思路&#xff1a;建立一个弹簧臂和摄像机&#xff0c;调整两者位置达到越肩效果。 2.直接在蓝图操作&#xff1a;添加摄像机和弹簧臂&#xff1a; 3.对弹簧臂勾选使用pawn控制旋转&#xff0c;并适当调整摄像机和弹簧臂位置…

[NSSCTF]prize_p1

前言 之前做了p5 才知道还有p1到p4 遂来做一下 顺便复习一下反序列化 prize_p1 <META http-equiv"Content-Type" content"text/html; charsetutf-8" /><?phphighlight_file(__FILE__);class getflag{function __destruct(){echo getenv(&qu…

Vue 组件的三大组成部分

Vue 组件通常由三大组成部分构成&#xff1a;模板&#xff08;Template&#xff09;、脚本&#xff08;Script&#xff09;、样式&#xff08;Style&#xff09; 模板部分是组件的 HTML 结构&#xff0c;它定义了组件的外观和布局。Vue 使用基于 HTML 的模板语法来声明组件的模…

【算法入门教育赛1E】最长公共前缀 - 字符串哈希 | 二分 | C++题解与代码

题目链接&#xff1a;https://www.starrycoding.com/problem/163 题目描述 牢 e e e在 S t a r r y C o d i n g StarryCoding StarryCoding的入门教育赛报名单上遇到了许多名字 s 1 , s 2 , . . . , s n s_1, s_2,...,s_n s1​,s2​,...,sn​&#xff0c;他想知道由这些人的…

网络安全风险里的威胁建模

文章目录 前言一、威胁建模的必要性二、威胁建模的过程三、威胁建模框架及方法1、NIST威胁模型框架2、STRIDE Model框架3、DREAD框架4、PASTA流程5、LINDDUN框架6、TRIKE知识库7、安全决策树四、威胁建模应用实践前言 网络安全的本质是攻防双方的对抗与博弈。然而,由于多种攻…

python学习笔记B-20:序列实战--处理千年虫

将2位数表达的年份&#xff0c;转换为用4位数表达&#xff1a; print("将列表中的2位数年份转换为4位数年份") lst[88,89,90,00,99] print(lst) for index in range(len(lst)):if len(str(lst[index]))2:lst[index] 1900int(lst[index])elif len(str(lst[index]))1…

微信小程序demo-----制作文章专栏

前言&#xff1a;不管我们要做什么种类的小程序都涉及到宣传或者扩展其他业务&#xff0c;我们就可以制作一个文章专栏的页面&#xff0c;实现点击一个专栏跳转到相应的页面&#xff0c;页面可以有科普类的知识或者其他&#xff0c;然后页面下方可以自由发挥&#xff0c;添加联…

网盘——分享文件——逻辑设计

本文主要讲解关于网盘文件操作部分的分享文件的逻辑设计部分&#xff0c;主要步骤如下&#xff1a; 目录 1、实施步骤&#xff1a; 2、代码实现 2.1、添加分享文件协议 2.2、添加取消槽函数 2.3、关联取消选择的槽函数 2.4、添加取消槽函数的定义 2.5、添加全选函数槽函…

小程序地理位置接口权限直接抄作业

小程序地理位置接口有什么功能&#xff1f; 随着小程序生态的发展&#xff0c;越来越多的小程序开发者会通过官方提供的自带接口来给用户提供便捷的服务。但是当涉及到地理位置接口时&#xff0c;却经常遇到申请驳回的问题&#xff0c;反复修改也无法通过&#xff0c;给的理由也…

rabbitMq 0 到1

前言 工作中MQ的使用场景是数不胜数&#xff0c;每个公司的技术选型又不太一样&#xff0c;用的哪个MQ&#xff0c;我们必须要先玩起来&#xff0c;RabbitMQ在windows安装遇到很多问题&#xff0c;博客也是五花八门&#xff0c;算了还是自己搞吧&#xff0c;记录一下&#xff…

C#描述-计算机视觉OpenCV(3):重映射

C#描述-计算机视觉OpenCV&#xff08;3&#xff09;&#xff1a;重映射 前言色彩波形图像重映射 前言 C#描述-计算机视觉OpenCV&#xff08;1&#xff09;&#xff1a;基础操作 C#描述-计算机视觉OpenCV&#xff08;2&#xff09;&#xff1a;图像处理 在前文中&#xff0c;描…

2.2 Java全栈开发前端+后端(全栈工程师进阶之路)-前端框架VUE3-基础-Vue基本语法

文本渲染指令 文本渲染指令-v-html与v-text Vue使用了基于HTML的模板语法&#xff0c;允许开发者声明式地将DOM绑定至底层Vue实例的数据。所有Vue的模板都是 合法的HTML&#xff0c;所以能被遵循规范的浏览器和HTML解析器解析。 在前面&#xff0c;我们一直使用的是字符串插…
最新文章