【flink番外篇】7、flink的State(Keyed State和operator state)介绍及示例 - 完整版

Flink 系列文章

一、Flink 专栏

Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。

  • 1、Flink 部署系列
    本部分介绍Flink的部署、配置相关基础内容。

  • 2、Flink基础系列
    本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。

  • 3、Flik Table API和SQL基础系列
    本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。

  • 4、Flik Table API和SQL提高与应用系列
    本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。

  • 5、Flink 监控系列
    本部分和实际的运维、监控工作相关。

二、Flink 示例专栏

Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。

两专栏的所有文章入口点击:Flink 系列文章汇总索引


文章目录

  • Flink 系列文章
  • 一、maven依赖
  • 二、Keyed State
    • 1、Keyed State 介绍及示例
    • 2、Keyed State状态有效期 (TTL)
      • 1)、过期数据的清理
      • 2)、全量快照时进行清理
      • 3)、增量数据清理
      • 4)、在 RocksDB 压缩时清理
    • 3、keyed state示例:实现地铁站哪个进站口人数最多
      • 1)、java bean
      • 2)、实现
      • 3)、验证
  • 三、Operator State
    • 1、CheckpointedFunction
    • 2、带状态的 Source Function
    • 3、operator state示例:实现程序异常时自动保存state,当超过重启次数时中断运行
      • 1)、实现
      • 2)、运行结果
      • 4)、hdfs上的checkpoint


本文介绍了Flink State中的keyed state 和 operator state基本功能及示例,其中包含详细的验证步骤与验证结果。

如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

本文除了maven依赖外,没有其他依赖。

本文需要hadoop环境,因为模拟checkpoint的时候使用了hdfs。

本专题分为以下几篇文章:
【flink番外篇】7、flink的State(Keyed State和operator state)介绍及示例(1) - Keyed State
【flink番外篇】7、flink的State(Keyed State和operator state)介绍及示例(2) - operator state
【flink番外篇】7、flink的State(Keyed State和operator state)介绍及示例 - 完整版

关于Flink state的更多介绍参考文章:

8、Flink四大基石之State概念、使用场景、持久化、批处理的详解与keyed state和operator state、broadcast state使用和详细示例

一、maven依赖

<properties>
    <encoding>UTF-8</encoding>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <java.version>1.8</java.version>
    <scala.version>2.12</scala.version>
    <flink.version>1.17.0</flink.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- 日志 -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.7</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
        <scope>runtime</scope>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.2</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
</dependencies>

二、Keyed State

1、Keyed State 介绍及示例

keyed state 接口提供不同类型状态的访问接口,这些状态都作用于当前输入数据的 key 下。换句话说,这些状态仅可在 KeyedStream 上使用,可以通过 stream.keyBy(…) 得到 KeyedStream.

所有支持的状态类型如下所示:

  • ValueState: 保存一个可以更新和检索的值(如上所述,每个值都对应到当前的输入数据的 key,因此算子接收到的每个 key 都可能对应一个值)。 这个值可以通过 update(T) 进行更新,通过 T value() 进行检索。

  • ListState: 保存一个元素的列表。可以往这个列表中追加数据,并在当前的列表上进行检索。可以通过 add(T) 或者 addAll(List) 进行添加元素,通过 Iterable get() 获得整个列表。还可以通过 update(List) 覆盖当前的列表。

  • ReducingState: 保存一个单值,表示添加到状态的所有值的聚合。接口与 ListState 类似,但使用 add(T) 增加元素,会使用提供的 ReduceFunction 进行聚合。

  • AggregatingState<IN, OUT>: 保留一个单值,表示添加到状态的所有值的聚合。和 ReducingState 相反的是, 聚合类型可能与 添加到状态的元素的类型不同。 接口与 ListState 类似,但使用 add(IN) 添加的元素会用指定的 AggregateFunction 进行聚合。

  • MapState<UK, UV>: 维护了一个映射列表。 你可以添加键值对到状态中,也可以获得反映当前所有映射的迭代器。使用 put(UK,UV) 或者 putAll(Map<UK,UV>) 添加映射。 使用 get(UK) 检索特定 key。 使用 entries(),keys() 和 values() 分别检索映射、键和值的可迭代视图。你还可以通过 isEmpty() 来判断是否包含任何键值对。

所有类型的状态还有一个clear() 方法,清除当前 key 下的状态数据,也就是当前输入元素的 key。

这些状态对象仅用于与状态交互。状态本身不一定存储在内存中,还可能在磁盘或其他位置。 另外从状态中获取的值取决于输入元素所代表的 key。 因此,在不同 key 上调用同一个接口,可能得到不同的值。

你必须创建一个 StateDescriptor,才能得到对应的状态句柄。 这保存了状态名称(可以创建多个状态,并且它们必须具有唯一的名称以便可以引用它们), 状态所持有值的类型,并且可能包含用户指定的函数,例如ReduceFunction。

根据不同的状态类型,可以创建ValueStateDescriptor,ListStateDescriptor, ReducingStateDescriptor 或 MapStateDescriptor。

状态通过 RuntimeContext 进行访问,因此只能在 rich functions 中使用。RichFunction 中 RuntimeContext 提供如下方法:

ValueState<T> getState(ValueStateDescriptor<T>)
ReducingState<T> getReducingState(ReducingStateDescriptor<T>)
ListState<T> getListState(ListStateDescriptor<T>)
AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)
MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)

下面是一个 FlatMapFunction 的例子,展示了如何将这些部分组合起来:

public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

    /**
     * The ValueState handle. The first field is the count, the second field a running sum.
     */
    private transient ValueState<Tuple2<Long, Long>> sum;

    @Override
    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {

        // access the state value
        Tuple2<Long, Long> currentSum = sum.value();

        // update the count
        currentSum.f0 += 1;

        // add the second field of the input value
        currentSum.f1 += input.f1;

        // update the state
        sum.update(currentSum);

        // if the count reaches 2, emit the average and clear the state
        if (currentSum.f0 >= 2) {
            out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
            sum.clear();
        }
    }

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                new ValueStateDescriptor<>(
                        "average", // the state name
                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
                        Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
        sum = getRuntimeContext().getState(descriptor);
    }
}

// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
        .keyBy(value -> value.f0)
        .flatMap(new CountWindowAverage())
        .print();

// the printed output will be (1,4) and (1,5)

2、Keyed State状态有效期 (TTL)

任何类型的 keyed state 都可以有 有效期 (TTL)。如果配置了 TTL 且状态值已过期,则会尽最大可能清除对应的值。
所有状态类型都支持单元素的 TTL, 这意味着列表元素和映射元素将独立到期。
在使用状态 TTL 前,需要先构建一个StateTtlConfig 配置对象。 然后把配置传递到 state descriptor 中启用 TTL 功能:

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();
    
ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);

TTL 配置有以下几个选项: newBuilder 的第一个参数表示数据的有效期,是必选项。

  • TTL 的更新策略(默认是 OnCreateAndWrite):
    StateTtlConfig.UpdateType.OnCreateAndWrite - 仅在创建和写入时更新
    StateTtlConfig.UpdateType.OnReadAndWrite - 读取时也更新
  • 数据在过期但还未被清理时的可见性配置如下(默认为 NeverReturnExpired):
    StateTtlConfig.StateVisibility.NeverReturnExpired - 不返回过期数据
    StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp - 会返回过期但未清理的数据
    NeverReturnExpired 情况下,过期数据就像不存在一样,不管是否被物理删除。这对于不能访问过期数据的场景下非常有用,比如敏感数据。 ReturnExpiredIfNotCleanedUp 在数据被物理删除前都会返回。

1)、过期数据的清理

默认情况下,过期数据会在读取的时候被删除,例如 ValueState#value,同时会有后台线程定期清理(如果 StateBackend 支持的话)。

可以通过 StateTtlConfig 配置关闭后台清理:

import org.apache.flink.api.common.state.StateTtlConfig;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .disableCleanupInBackground()
    .build();

可以按照如下所示配置更细粒度的后台清理策略。截至Flink 1.17版本的实现中 HeapStateBackend 依赖增量数据清理,RocksDBStateBackend 利用压缩过滤器进行后台清理。

2)、全量快照时进行清理

可以启用全量快照时进行清理的策略,这可以减少整个快照的大小。截至Flink 1.17版本实现中不会清理本地的状态,但从上次快照恢复时,不会恢复那些已经删除的过期数据。

该策略可以通过 StateTtlConfig 配置进行配置:

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupFullSnapshot()
    .build();

这种策略在 RocksDBStateBackend 的增量 checkpoint 模式下无效。

3)、增量数据清理

可以选择增量式清理状态数据,在状态访问或/和处理时进行。如果某个状态开启了该清理策略,则会在存储后端保留一个所有状态的惰性全局迭代器。 每次触发增量清理时,从迭代器中选择已经过期的数进行清理。

该特性可以通过 StateTtlConfig 进行配置:

import org.apache.flink.api.common.state.StateTtlConfig;

 StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupIncrementally(10, true)
    .build();

该策略有两个参数。

第一个是每次清理时检查状态的条目数,在每个状态访问时触发。

第二个参数表示是否在处理每条记录时触发清理。

Heap backend 默认会检查 5 条状态,并且关闭在每条记录时触发清理。

4)、在 RocksDB 压缩时清理

如果使用 RocksDB state backend,则会启用 Flink 为 RocksDB 定制的压缩过滤器。RocksDB 会周期性的对数据进行合并压缩从而减少存储空间。 Flink 提供的 RocksDB 压缩过滤器会在压缩时过滤掉已经过期的状态数据。

该特性可以通过 StateTtlConfig 进行配置:

import org.apache.flink.api.common.state.StateTtlConfig;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupInRocksdbCompactFilter(1000)
    .build();

Flink 处理一定条数的状态数据后,会使用当前时间戳来检测 RocksDB 中的状态是否已经过期, 可以通过StateTtlConfig.newBuilder(…).cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries) 方法指定处理状态的条数。 时间戳更新的越频繁,状态的清理越及时,但由于压缩会有调用 JNI 的开销,因此会影响整体的压缩性能。

RocksDB backend 的默认后台清理策略会每处理 1000 条数据进行一次。

你还可以通过配置开启 RocksDB 过滤器的 debug 日志: log4j.logger.org.rocksdb.FlinkCompactionFilter=DEBUG

3、keyed state示例:实现地铁站哪个进站口人数最多

实际生产中,一般不需要自己实现state,除非特殊情况。

本示例仅仅用于展示state的工作过程。

实现地铁站哪个进站口人数最多,可以统计最近一段时间内的,也可以统计某一时刻的,简单起见,本处示例模糊该概念,就以输入数据的进行分组,有兴趣的读者可以自己基于前一篇的watermaker进行实现,也比较的简单。

本示例是模拟maxBy的state实现。

1)、java bean

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author alanchan
 *
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Subway {
	private String sNo;
	private Integer userCount;
	private Long enterTime;

	public Subway(String sNo, Integer userCount) {
		this.sNo = sNo;
		this.userCount = userCount;
	}
}

2)、实现

import java.util.Random;

import org.apache.commons.lang.time.FastDateFormat;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.watermaker.Subway;

/**
 * @author alanchan
 *
 */
public class KeyedStateDemo {

	/**
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		FastDateFormat df = FastDateFormat.getInstance("HH:mm:ss");
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

		// source
		DataStreamSource<Subway> subwayDS = env.addSource(new SourceFunction<Subway>() {
			private boolean flag = true;

			@Override
			public void run(SourceContext<Subway> ctx) throws Exception {
				Random random = new Random();
				while (flag) {
					String sNo = "No" + random.nextInt(3);
					int userCount = random.nextInt(100);
					long eventTime = System.currentTimeMillis();
					Subway subway = new Subway(sNo, userCount, eventTime);
					System.err.println(subway + " ,格式化后时间 " + df.format(subway.getEnterTime()));

					ctx.collect(subway);
					Thread.sleep(1000);
				}
			}

			@Override
			public void cancel() {
				flag = false;
			}
		});

		// transformation
		// 实际中使用maxBy即可
		DataStream<Subway> maxByResult = subwayDS.keyBy(subway -> subway.getSNo()).maxBy("userCount");

		// 使用KeyState中的ValueState来实现maxBy的功能
		DataStream<Tuple3<String, Integer, Integer>> stateResult =
				// RichMapFunction<IN, OUT>
				subwayDS.keyBy(subway -> subway.getSNo()).map(new RichMapFunction<Subway, Tuple3<String, Integer, Integer>>() {
					// 定义一个状态用来存放最大值
					private ValueState<Integer> maxValueStateData;

					// 状态初始化
					@Override
					public void open(Configuration parameters) throws Exception {
						// 创建状态描述器
						ValueStateDescriptor stateDescriptor = new ValueStateDescriptor("maxValueState", Integer.class);
						// 根据状态描述器获取/初始化状态
						maxValueStateData = getRuntimeContext().getState(stateDescriptor);
					}

					@Override
					public Tuple3<String, Integer, Integer> map(Subway inValue) throws Exception {
						Integer currentValue = inValue.getUserCount();
						Tuple3<String, Integer, Integer> tuple3 = null;
						Integer historyValue = maxValueStateData.value();
						// 判断状态
						if (historyValue == null || currentValue > historyValue) {
							historyValue = currentValue;
							// 更新状态
							maxValueStateData.update(historyValue);

						}
						tuple3 = Tuple3.of(inValue.getSNo(), currentValue, historyValue);
						return tuple3;
					}
				});

		// sink
		maxByResult.print("maxBy");
		stateResult.print("stateResult");

		// execute
		env.execute();
	}

}

3)、验证

此处验证比较简单,比较一下maxby的运行结果与自己实现的maxby运行结果是否一致即可。

maxby采用的是subway输出,自己实现使用的tuple3。

Subway(sNo=No1, userCount=33, enterTime=1689227364582) ,格式化后时间 13:49:24
maxBy:10> Subway(sNo=No1, userCount=33, enterTime=1689227364582)
stateResult:10> (No1,33,33)
Subway(sNo=No1, userCount=10, enterTime=1689227365613) ,格式化后时间 13:49:25
stateResult:10> (No1,10,33)
maxBy:10> Subway(sNo=No1, userCount=33, enterTime=1689227364582)
Subway(sNo=No0, userCount=20, enterTime=1689227366627) ,格式化后时间 13:49:26
stateResult:10> (No0,20,20)
maxBy:10> Subway(sNo=No0, userCount=20, enterTime=1689227366627)
Subway(sNo=No0, userCount=66, enterTime=1689227367633) ,格式化后时间 13:49:27
maxBy:10> Subway(sNo=No0, userCount=66, enterTime=1689227367633)
stateResult:10> (No0,66,66)
Subway(sNo=No2, userCount=2, enterTime=1689227368649) ,格式化后时间 13:49:28
stateResult:3> (No2,2,2)
maxBy:3> Subway(sNo=No2, userCount=2, enterTime=1689227368649)
Subway(sNo=No1, userCount=87, enterTime=1689227369662) ,格式化后时间 13:49:29
stateResult:10> (No1,87,87)
maxBy:10> Subway(sNo=No1, userCount=87, enterTime=1689227369662)
Subway(sNo=No1, userCount=96, enterTime=1689227370675) ,格式化后时间 13:49:30
maxBy:10> Subway(sNo=No1, userCount=96, enterTime=1689227370675)
stateResult:10> (No1,96,96)
Subway(sNo=No1, userCount=58, enterTime=1689227371680) ,格式化后时间 13:49:31
maxBy:10> Subway(sNo=No1, userCount=96, enterTime=1689227370675)
stateResult:10> (No1,58,96)
Subway(sNo=No1, userCount=24, enterTime=1689227372681) ,格式化后时间 13:49:32
maxBy:10> Subway(sNo=No1, userCount=96, enterTime=1689227370675)
stateResult:10> (No1,24,96)
Subway(sNo=No2, userCount=20, enterTime=1689227373695) ,格式化后时间 13:49:33
stateResult:3> (No2,20,20)
maxBy:3> Subway(sNo=No2, userCount=20, enterTime=1689227373695)

三、Operator State

用户可以通过实现 CheckpointedFunction 接口来使用 operator state。

1、CheckpointedFunction

CheckpointedFunction 接口提供了访问 non-keyed state 的方法,需要实现如下两个方法:

void snapshotState(FunctionSnapshotContext context) throws Exception;
void initializeState(FunctionInitializationContext context) throws Exception;

进行 checkpoint 时会调用 snapshotState()。 用户自定义函数初始化时会调用 initializeState(),初始化包括第一次自定义函数初始化和从之前的 checkpoint 恢复。 因此 initializeState() 不仅是定义不同状态类型初始化的地方,也需要包括状态恢复的逻辑。

当前 operator state 以 list 的形式存在。这些状态是一个 可序列化 对象的集合 List,彼此独立,方便在改变并发后进行状态的重新分派。 换句话说,这些对象是重新分配 non-keyed state 的最细粒度。根据状态的不同访问方式,有如下几种重新分配的模式:

  • Even-split redistribution: 每个算子都保存一个列表形式的状态集合,整个状态由所有的列表拼接而成。当作业恢复或重新分配的时候,整个状态会按照算子的并发度进行均匀分配。 比如说,算子 A 的并发读为 1,包含两个元素 element1 和 element2,当并发读增加为 2 时,element1 会被分到并发 0 上,element2 则会被分到并发 1 上。

  • Union redistribution: 每个算子保存一个列表形式的状态集合。整个状态由所有的列表拼接而成。当作业恢复或重新分配时,每个算子都将获得所有的状态数据。 Do not use this feature if your list may have high cardinality. Checkpoint metadata will store an offset to each list entry, which could lead to RPC framesize or out-of-memory errors.

下面的例子中的 SinkFunction 在 CheckpointedFunction 中进行数据缓存,然后统一发送到下游,这个例子演示了列表状态数据的 event-split redistribution。

public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>, CheckpointedFunction {

    private final int threshold;

    private transient ListState<Tuple2<String, Integer>> checkpointedState;

    private List<Tuple2<String, Integer>> bufferedElements;

    public BufferingSink(int threshold) {
        this.threshold = threshold;
        this.bufferedElements = new ArrayList<>();
    }

    @Override
    public void invoke(Tuple2<String, Integer> value, Context contex) throws Exception {
        bufferedElements.add(value);
        if (bufferedElements.size() == threshold) {
            for (Tuple2<String, Integer> element: bufferedElements) {
                // send it to the sink
            }
            bufferedElements.clear();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedState.clear();
        for (Tuple2<String, Integer> element : bufferedElements) {
            checkpointedState.add(element);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Tuple2<String, Integer>> descriptor =
            new ListStateDescriptor<>(
                "buffered-elements",
                TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));

        checkpointedState = context.getOperatorStateStore().getListState(descriptor);

        if (context.isRestored()) {
            for (Tuple2<String, Integer> element : checkpointedState.get()) {
                bufferedElements.add(element);
            }
        }
    }
}

initializeState 方法接收一个 FunctionInitializationContext 参数,会用来初始化 non-keyed state 的 “容器”。这些容器是一个 ListState 用于在 checkpoint 时保存 non-keyed state 对象。

注意这些状态是如何初始化的,和 keyed state 类系,StateDescriptor 会包括状态名字、以及状态类型相关信息。

ListStateDescriptor<Tuple2<String, Integer>> descriptor =
    new ListStateDescriptor<>(
        "buffered-elements",
        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));

checkpointedState = context.getOperatorStateStore().getListState(descriptor);

调用不同的获取状态对象的接口,会使用不同的状态分配算法。比如 getUnionListState(descriptor) 会使用 union redistribution 算法, 而 getListState(descriptor) 则简单的使用 even-split redistribution 算法。

当初始化好状态对象后,我们通过 isRestored() 方法判断是否从之前的故障中恢复回来,如果该方法返回 true 则表示从故障中进行恢复,会执行接下来的恢复逻辑。

正如代码所示,BufferingSink 中初始化时,恢复回来的 ListState 的所有元素会添加到一个局部变量中,供下次 snapshotState() 时使用。 然后清空 ListState,再把当前局部变量中的所有元素写入到 checkpoint 中。

另外,我们同样可以在 initializeState() 方法中使用 FunctionInitializationContext 初始化 keyed state。

2、带状态的 Source Function

带状态的数据源比其他的算子需要注意更多东西。为了保证更新状态以及输出的原子性(用于支持 exactly-once 语义),用户需要在发送数据前获取数据源的全局锁。

public static class CounterSource extends RichParallelSourceFunction<Long>  implements CheckpointedFunction {

    /**  current offset for exactly once semantics */
    private Long offset = 0L;

    /** flag for job cancellation */
    private volatile boolean isRunning = true;
    
    /** 存储 state 的变量. */
    private ListState<Long> state;
     
    @Override
    public void run(SourceContext<Long> ctx) {
        final Object lock = ctx.getCheckpointLock();

        while (isRunning) {
            // output and state update are atomic
            synchronized (lock) {
                ctx.collect(offset);
                offset += 1;
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        state = context.getOperatorStateStore().getListState(new ListStateDescriptor<>(
            "state",
            LongSerializer.INSTANCE));
            
        // 从我们已保存的状态中恢复 offset 到内存中,在进行任务恢复的时候也会调用此初始化状态的方法
        for (Long l : state.get()) {
            offset = l;
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        state.clear();
        state.add(offset);
    }
}

希望订阅 checkpoint 成功消息的算子,可以参考 org.apache.flink.api.common.state.CheckpointListener 接口。

3、operator state示例:实现程序异常时自动保存state,当超过重启次数时中断运行

实际生产中,一般不需要自己实现state,除非特殊情况。

本示例仅仅用于展示state的工作过程。

本示例实现功能是当程序出现异常时能自动重启并保存当前的state信息,当超过2次异常后程序中断运行。

该示例肯定是画蛇添足,Flink已经实现了该类,并且在介绍operator state的时候也给出了示例,本示例仅仅是以极其简单的介绍一下operator state的实现。

1)、实现

  • AlanOperatorState.java
import java.util.Iterator;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

/**
 * @author alanchan 
 * 使用OperatorState中的ListState模拟KafkaSource进行offset维护
 */
public class AlanOperatorState extends RichParallelSourceFunction<String> implements CheckpointedFunction {
	private boolean flag = true;
	private ListState<Long> offsetState = null;
	private Long offset = 0L;

	// 创建ListState
	@Override
	public void initializeState(FunctionInitializationContext context) throws Exception {
		ListStateDescriptor<Long> stateDescriptor = new ListStateDescriptor<>("offsetState", Long.class);
		offsetState = context.getOperatorStateStore().getListState(stateDescriptor);
	}

	// 使用state
	@Override
	public void run(SourceContext<String> ctx) throws Exception {
		while (flag) {
			Iterator<Long> iterator = offsetState.get().iterator();
			// 由于是模拟,该迭代器仅有一条数据
			if (iterator.hasNext()) {
				offset = iterator.next();
			}
			offset += 1;
			int subTaskId = getRuntimeContext().getIndexOfThisSubtask();
			ctx.collect("subTaskId:" + subTaskId + ",当前的offset值为::" + offset);
			Thread.sleep(1000);

			// 模拟异常
			if (offset % 3 == 0) {
				throw new Exception("出现了异常.....");
			}

		}
	}

	// 持久化state, 该方法会定时执行将state状态从内存存入Checkpoint磁盘目录中
	@Override
	public void snapshotState(FunctionSnapshotContext context) throws Exception {
		offsetState.clear();// 清理内容数据并存入Checkpoint磁盘目录中
		offsetState.add(offset);
	}

	@Override
	public void cancel() {
		flag = false;
	}

}

  • TestOperatorStateDemo.java
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author alanchan
 *
 */
public class TestOperatorStateDemo {
	public static void main(String[] args) throws Exception {

		System.setProperty("HADOOP_USER_NAME", "alanchan");
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
		env.setParallelism(1);
		env.enableCheckpointing(1000);
		// 设置checkpoint点在hdfs上
		env.setStateBackend(new FsStateBackend("hdfs://server1:8020//flinktest/flinkckp"));
		env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
		env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

		// 重启策略:程序出现异常的时候,重启2次,每次延迟3秒钟重启,超过2次,程序退出
		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 3000));

		// source
		DataStreamSource<String> ds = env.addSource(new AlanOperatorState()).setParallelism(1);

		// transformation

		// sink
		ds.print();

		// execute
		env.execute();
	}
}

2)、运行结果

subTaskId:0,当前的offset值为::1
subTaskId:0,当前的offset值为::2
subTaskId:0,当前的offset值为::3
subTaskId:0,当前的offset值为::4
subTaskId:0,当前的offset值为::5
subTaskId:0,当前的offset值为::6
subTaskId:0,当前的offset值为::7
subTaskId:0,当前的offset值为::8
subTaskId:0,当前的offset值为::9
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
	at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
	at java.util.concurrent.CompletableFuture.uniApply(Unknown Source)
	at java.util.concurrent.CompletableFuture$UniApply.tryFire(Unknown Source)
	at java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
	at java.util.concurrent.CompletableFuture.complete(Unknown Source)
	at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:267)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)
	at java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
	at java.util.concurrent.CompletableFuture.complete(Unknown Source)
	at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1277)
	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)
	at java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
	at java.util.concurrent.CompletableFuture.complete(Unknown Source)
	at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47)
	at akka.dispatch.OnComplete.internal(Future.scala:300)
	at akka.dispatch.OnComplete.internal(Future.scala:297)
	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)
	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
	at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65)
	at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:72)
	at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:288)
	at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:288)
	
。。。。。。

	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
	at java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(Unknown Source)
	at java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
	at java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=2, backoffTimeMS=3000)
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139)
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:83)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:258)
	
。。。。。。

	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:579)
	at akka.actor.ActorCell.invoke(ActorCell.scala:547)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
	at akka.dispatch.Mailbox.run(Mailbox.scala:231)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
	... 4 more
Caused by: java.lang.Exception: 出现了异常.....
	at org.datastreamapi.state.AlanOperatorState.run(AlanOperatorState.java:46)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:333)


4)、hdfs上的checkpoint

在这里插入图片描述

以上,本文介绍了Flink State中的keyed state 和 operator state基本功能及示例,其中包含详细的验证步骤与验证结果。

如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

本专题分为以下几篇文章:

【flink番外篇】7、flink的State(Keyed State和operator state)介绍及示例(1) - Keyed State
【flink番外篇】7、flink的State(Keyed State和operator state)介绍及示例(2) - operator state
【flink番外篇】7、flink的State(Keyed State和operator state)介绍及示例 - 完整版

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/270004.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

dpdk原理概述及核心源码剖析

dpdk原理 1、操作系统、计算机网络诞生已经几十年了&#xff0c;部分功能不再能满足现在的业务需求。如果对操作系统做更改&#xff0c;成本非常高&#xff0c;所以部分问题是在应用层想办法解决的&#xff0c;比如前面介绍的协程、quic等&#xff0c;都是在应用层重新开发的框…

docker 私有仓库

Docker 私有仓库 一、私有仓库搭建 # 1、拉取私有仓库镜像 docker pull registry # 2、启动私有仓库容器 docker run -id --nameregistry -p 5000:5000 registry # 3、打开浏览器 输入地址http://私有仓库服务器ip:5000/v2/_catalog&#xff0c;看到{"repositories&quo…

Linux操作系统——进程(三) 进程优先级

进程优先级 首先呢&#xff0c;我们知道一个进程呢&#xff08;或者也可以叫做一个任务&#xff09;&#xff0c;它呢有时候要在CPU的运行队列中排队&#xff0c;要么有时候阻塞的时候呢又要在设备的等待队列中排队&#xff0c;其实我们排队的本质就是&#xff1a;确认优先级。…

【数据结构】什么是二叉树?

&#x1f984;个人主页:修修修也 &#x1f38f;所属专栏:数据结构 ⚙️操作环境:Visual Studio 2022 目录 &#x1f4cc;二叉树的定义 &#x1f4cc;二叉树的特点 &#x1f4cc;特殊二叉树 &#x1f4cc;二叉树的性质 &#x1f4cc;二叉树的存储结构 &#x1f4cc;二叉树…

大语言模型说明书

在浩瀚的信息宇宙中&#xff0c;大语言模型如同一颗璀璨的星星正在熠熠生辉。21世纪以来&#xff0c;人工智能可谓是飞速发展&#xff0c;从简单的神经网络到大语言模型、生成式AI&#xff0c;这并非仅仅是一种技术的进步&#xff0c;更是人类智慧的飞跃。大语言模型不仅仅是语…

opencv入门到精通——形态学转换

目录 目标 理论 1. 侵蚀 2. 扩张 3. 开运算 4. 闭运算 5. 形态学梯度 6. 顶帽 7. 黑帽 结构元素 目标 在这一章当中&#xff0c; 我们将学习不同的形态学操作&#xff0c;例如侵蚀&#xff0c;膨胀&#xff0c;开运算&#xff0c;闭运算等。我们将看到不同的功能&…

期末复习【计算机图像处理】

期末复习【计算机图像处理】 前言版权推荐期末复习期末考点内容期末考试题型一、填空 2分*10 概念二、简答 5分*2三、计算 10分*6四、绘图 10分*1 测验二测验三最后 前言 2023-12-25 15:09:07 昨天没有睡好 0点~3点看B站 Google模拟面试 3点~5点没睡着 5~6点睡了一会 6~12点终…

isp代理/双isp代理/数据中心代理的区别?如何选择?

本文我们来详细科普一下几种不同的代理类型&#xff1a;isp代理/双isp代理/数据中心代理&#xff0c;了解他们的区别&#xff0c;选择更适合自己的代理类型。 在讲述这几种代理类型之前&#xff0c;我们先复习一下代理大类有哪几种。 一、机房代理和非机房代理 在做代理ip选…

《试题与研究》期刊发表投稿方式

《试题与研究》杂志是面向全国公开发行的国家CN级权威教育期刊。创刊以来一直以服务教育服务学生为办刊宗旨&#xff0c;以优秀的内容质量和编校质量深受广大读者好评&#xff0c;其权威性、导向性、针对性、实用性在全国教育期刊中独树一帜。为推动教育科研事业的发展&#xf…

20231225使用亿佰特的蓝牙模块dongle协议分析仪E104-2G4U04A抓取BLE广播数据

20231225使用亿佰特的蓝牙模块dongle协议分析仪E104-2G4U04A抓取BLE广播数据 结论&#xff1a;硬件蓝牙分析仪 不一定比 手机端的APK的效果好&#xff01; 亿佰特E104-2G4U04A需要3片【单通道】&#xff0c;电脑端的UI为全英文的。 BLE-AnalyzerPro WCH升级版BLE-PRO蓝牙分析仪…

DRF视图组件

【1】两个视图基类 APIView APIView是 Django REST Framework 提供的一个基类&#xff0c;用于创建基于函数或基于类的视图。使用 APIView 可以根据需求自定义请求处理逻辑&#xff0c;对于简单的接口逻辑&#xff0c;可以直接继承APIView类。 GenericAPIView GenericAPIVi…

如何使用 YOLOv8 做对象检测

介绍 对象检测是一项计算机视觉任务&#xff0c;涉及识别和定位图像或视频中的对象。它是许多应用的重要组成部分&#xff0c;例如自动驾驶汽车、机器人和视频监控。 多年来&#xff0c;已经开发了许多方法和算法来查找图像中的对象及其位置。卷积神经网络对于此类任务有着非…

Dbeaver如何连接Oceanbase?

Dbeaver & Oceanbase 一、新增驱动二、连接数据库 一、新增驱动 1、新建驱动 点击数据库 -> 驱动管理器 -> 新建 2、设置驱动 驱动名称可随意填写注意驱动类型要是Generichost:port填写实际的host和port 库中新增下载的oceanbase驱动jar包 二、连接数据库 1、找…

linux 系统重启 Redis 服务

先 打开服务器 执行 sudo systemctl stop redis暂停Redis服务 然后 执行 sudo systemctl start redis启动 redis 服务 然后可以执行 sudo systemctl status redis查看 redis 状态

PyAV 使用浅谈

背景&#xff1a; PyAV是一个用于音频和视频处理的Python库&#xff0c;它提供了一个简单而强大的接口&#xff0c;用于解码、编码、处理和分析各种音频和视频格式。PyAV基于FFmpeg多媒体框架&#xff0c;它本质上是FFmpeg 的Python绑定&#xff0c;因此可以利用FFmpeg的功能来…

移动Web

文章目录 移动 Web一、平面转换1. 平移2. 旋转3. 渐变 二、空间转换1. 平移2. 旋转3. 动画 三、移动适配1. 谷歌模拟器2. rem3. less4. vw 四、响应式布局1. 媒体查询2. Bootstrap 移动 Web 一、平面转换 作用&#xff1a;为元素添加动态效果&#xff0c;一般与过渡配合使用 …

如何使用PatchaPalooza对微软每月的安全更新进行全面深入的分析

关于PatchaPalooza PatchaPalooza是一款针对微软每月安全更新的强大分析工具&#xff0c;广大研究人员可以直接使用该工具来对微软每月定期推送的安全更新代码进行详细、全面且深入的安全分析。 PatchaPalooza使用了微软MSRC CVRF API的强大功能来获取、存储和分析安全更新数…

MongoDB数据库本地部署并结合内网穿透实现navicat公网访问

文章目录 前言1. 安装数据库2. 内网穿透2.1 安装cpolar内网穿透2.2 创建隧道映射2.3 测试随机公网地址远程连接 3. 配置固定TCP端口地址3.1 保留一个固定的公网TCP端口地址3.2 配置固定公网TCP端口地址3.3 测试固定地址公网远程访问 前言 MongoDB是一个基于分布式文件存储的数…

.net6使用Sejil可视化日志

&#xff08;关注博主后&#xff0c;在“粉丝专栏”&#xff0c;可免费阅读此文&#xff09; 之前介绍了这篇.net 5使用LogDashboard_.net 5logdashboard rootpath-CSDN博客 这篇文章将会更加的简单&#xff0c;最终的效果都是可视化日志。 在程序非常庞大的时候&…

虚继承解决菱形继承的原理

菱形继承的问题&#xff0c;是由多重继承的父类祖先是同一个父类导致的。如下面的情况&#xff1a; 菱形继承&#xff0c;会导致同名成员的二义性问题和数据冗余问题&#xff0c;用下面的代码来测试&#xff1a; class A { public:int _a; }; // class B : public A class B :…
最新文章