【入门Flink】- 09Flink水位线Watermark

窗口的处理过程中,基于数据的时间戳,自定义一个“逻辑时钟”。这个时钟的时间不会自动流逝;它的时间进展,就是靠着新到数据的时间戳来推动的。

什么是水位线

用来衡量事件时间进展的标记,就被称作“水位线”(Watermark)

具体实现上,水位线可以看作一条特殊的数据记录,它是插入到数据流中的一个标记点,主要内容就是一个时间戳,用来指示当前的事件时间。而它插入流中的位置,就应该是在某个数据到来之后;这样就可以从这个数据中提取时间戳,作为当前水位线的时间戳了。

有序流中水位线

(1)理想状态(数据量小),数据应该按照生成的先后顺序进入流中,每条数据产生一个水位线

(2)实际应用中,如果当前数据量非常大,且同时涌来的数据时间差会非常小(比如几毫秒),往
往对处理计算也没什么影响。所以为了提高效率,会每隔一段时间生成一个水位线

乱序流中水位线

分布式系统中,数据在节点间传输,会因为网络传输延迟的不确定性,导致顺序发生改变,这就是
所谓的“乱序数据”。

(1)乱序+数据量小:还是靠数据来驱动,每来一个数据就提取它的时间戳、插入一个水位线。乱序数据,插入新的水位线时,要先判断一下时间戳是否比之前的大,否则就不再生成新的水位线。也就是说,只有数据的时间戳比当前时钟大,才能推动时钟前进,这时才插入水位线。

(2)乱序+数据量大:考虑到大量数据同时到来的处理效率,可以周期性地生成水位线。这时
只需要保存一下之前所有数据中的最大时间截,需要插入水位线时,就直接以它作为时间戳生成新的水位线。

(3)乱序+迟到数据:无法正确处理“迟到”的数据。为了让窗口能够正确收集到迟到的数据设置迟到时间,比如2秒:也就是用当前已有数据的最大时间戳减去2秒,就是要插入的水位线的时间戳。9秒的数据到来之后,事件时钟不会直接推进到9秒,而是进展到了7秒;必须等到11秒的数据到来之后,事件时钟才会进展到9秒,此时迟到2秒的数据也会被正确收集处理。【迟到时间不能设置过长,否则会对实时性会有所影响】

水位线的特性

  • 水位线是插入到数据流中的一个标记,可以认为是一个特殊的数据
  • 水位线主要的内容是一个时间戳,用来表示当前事件时间的进展
  • 水位线是基于数据的时间戳生成的
  • 水位线的时间戳必须单调递增,以确保任务的事件时间时钟一直向前推进
  • 水位线可以通过设置延迟,来保证正确处理乱序数据
  • 一个水位线Watermark(t),表示在当前流中事件时间已经达到了时间戳t,代表t之前的所 有数据都到齐了,之后流中不会出现时间t'≤t的数据

水位线与窗口配合,完成对乱序数据的正确处理

水位线是流处理中对低延迟和结果正确性的一个权衡机制。

水位线生成策略

生成水位线的方法:.assignTimestampsAndWatermarks(),主要用来为流中的数据分配时间戳,并生成水位线来指示事件时间。【指定水位线生成策略】

stream.assignTimestampsAndWatermarks(<watermark strategy>);

WatermarkStrategy 水位线策略是一个接口,里面内置一些生成策略:

image-20231111224149038

有序流中内置水位线设置

时间戳单调增长,所以永远不会出现迟到数据的问题。WatermarkStrategy.forMonotonousTimestamps()

        WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy.
                <WaterSensor>forMonotonousTimestamps()
                // 指定时间戳分配器,从数据中提取 单位毫秒
                .withTimestampAssigner((SerializableTimestampAssigner<WaterSensor>) (element, recordTimestamp) -> {
                    System.out.println(" 数据 =" + element + ",recordTs=" + recordTimestamp);
                    return element.getTs() * 1000L;
                });

乱序流中内置水位线设置

由于乱序流中需要等待迟到数据到齐,必须设置一个固定量的延迟时间。WatermarkStrategy. forBoundedOutOfOrderness()

        WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy
                // 乱序数据,等待3s
                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                // 指定时间戳分配器,从数据中提取 单位毫秒
                .withTimestampAssigner((SerializableTimestampAssigner<WaterSensor>) (element, recordTimestamp) -> {
                    System.out.println(" 数据 =" + element + ",recordTs=" + recordTimestamp);
                    return element.getTs() * 1000L;
                });

自定义水位线生成器

(1)周期性水位线生成器(Periodic Generator)

周期性生成器一般是通过 onEvent()观察判断输入的事件,而在onPeriodicEmit()里发出水位线。
模仿该类BoundedOutOfOrdernessWatermarks

public class CustomBoundedOutOfOrdernessGenerator<T> implements WatermarkGenerator<T> {

    private Long delayTime = 5000L; // 延迟时间
    private Long maxTs = Long.MIN_VALUE + delayTime + 1L; // 观察到的最大时间戳

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        // 每来一条数据就调用一次
        maxTs = Math.max(eventTimestamp, maxTs); // 更新最大时间戳
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // 发射水位线,默认 200ms 调用一次
        output.emitWatermark(new Watermark(maxTs - delayTime - 1L));
    }
}

在 onPeriodicEmit()里调用 output.emitWatermark(),就可以发出水位线了;方法由系统框架周期性地调用,默认 200ms 一次。【不建议修改】

 env.getConfig().setAutoWatermarkInterval(400L);

(2)断点式水位线生成器(Punctuated Generator)

断点式生成器会不停地检测 onEvent()中的事件,当发现带有水位线信息的事件时,就立即发出水位线。

如下:只要有数据来就直接发射水位线

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        // 每来一条数据就调用一次
        maxTs = Math.max(eventTimestamp, maxTs); // 更新最大时间戳
        output.emitWatermark(new Watermark(maxTs - delayTime - 1L));
    }

(3)在数据源中发送水位线

可以在自定义的数据源中抽取事件时间,然后发送水位线。

自定义数据源中生成水位线和在程序中使用assignTimestampsAndWatermarks 方法生成水位线二者只能取其一

env.fromSource(
kafkaSource,
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), 
// WatermarkStrategy.noWatermarks() 或者不发送水位线
"kafkasource"
)

水位线的传递(空闲等待withIdleness)

一个任务接收到多个上游并行任务传递来的水位线时,应该以最小的作为当前任务的事件时钟

如下案例:当程序并行度设置为2时,自定义分区器导致一个分区一直拿不到数据(最小时钟一直为null),此时如不加以干预,事件时钟将永远不会推进,存在问题。设置空闲时间,当超过空闲时间一直收不到该分区数据,直接忽略该分区,还是会依旧推进时间时钟

        env.setParallelism(2);
		// 自定义分区器:数据%分区数,只输入奇数,都只会去往map 的一个子任务
        SingleOutputStreamOperator<Integer> socketDS = env.socketTextStream("xxxx", 7777)
                .partitionCustom(new MyPartitioner(), r -> r).map(Integer::parseInt)
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Integer>forMonotonousTimestamps().withTimestampAssigner((r, ts) -> r * 1000L)
                                .withIdleness(Duration.ofSeconds(5)) // 空闲等待 5s
                );

	    // 分成两组:奇数一组,偶数一组,开 10s 的事件时间滚动窗口socketDS
        .keyBy(r -> r % 2)
        .window(TumblingEventTimeWindows.of(Time.seconds(10)))
        ...

迟到数据的处理

1)推迟水印推进(设置延迟时间)

水印产生时,设置一个乱序容忍度,推迟系统时间的推进,保证窗口计算被延迟执行,为乱序的数据争取更多的时间进入窗口。

WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10));

2)设置窗口延迟关闭

Flink 的窗口,也允许迟到数据。当触发了窗口计算后,会先计算当前的结果,但是此时并不会关闭窗口。当达到设置延迟关闭时间之后,才会真正关闭窗口,关闭窗口后再迟到的数据就不会再处理。

.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.allowedLateness(Time.seconds(3))

3)使用侧流接收迟到的数据

最后兜底,窗口关闭之后的迟到数据,使用侧输出流输出。

完整方案:

public class WatermarkLateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("124.222.253.33", 7777)
                .map(new WaterSensorMapFunction());
        WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy
          		 // 1.设置迟到时间 3s
                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                .withTimestampAssigner((element, recordTimestamp) -> element.getTs() * 1000L)
                // .withIdleness(Duration.ofSeconds(5)); // 空闲等待 5s;

        SingleOutputStreamOperator<WaterSensor>
                sensorDSWithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);
        OutputTag<WaterSensor> lateTag = new OutputTag<>("latedata", Types.POJO(WaterSensor.class));
        SingleOutputStreamOperator<String> process = sensorDSWithWatermark.keyBy(WaterSensor::getId)
                .window(TumblingEventTimeWindows.of(Time.seconds(10))) 
                .allowedLateness(Time.seconds(2)) // 2.推迟2s关窗
                .sideOutputLateData(lateTag) // 3.关窗后的迟到数据,放入侧输出流
                .process(
                        new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
                            @Override
                            public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                                long startTs = context.window().getStart();
                                long endTs = context.window().getEnd();
                                String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");
                                String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");
                                long count = elements.spliterator().estimateSize();
                                out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ") 包含 " + count + " 条数据===>" + elements);
                            }
                        }
                );
        process.print();
        // 从主流获取侧输出流,打印
        process.getSideOutput(lateTag).printToErr("关窗后的迟到数据");
        env.execute();
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/134197.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

AIGC ChatGPT 4 轻松实现小游戏开发制作

贪吃蛇的小游戏相信大家都玩儿过,我们让ChatGPT4来帮我们制作一个贪吃蛇的小游戏。 在ChatGPT中发送Prompt如下图: 完整代码如下: <!DOCTYPE html> <html> <head> <title>贪吃蛇游戏</title> <style type="text/css"> #can…

UPLAOD-LABS2

less7 任务 拿到一个shell服务器 提示 禁止上传所有可以解析的后缀 发现所有可以解析的后缀都被禁了 查看一下源代码 $is_upload false; $msg null; if (isset($_POST[submit])) {if (file_exists($UPLOAD_ADDR)) {$deny_ext array(".php",".php5&quo…

第一百六十九回 如何修改NavigationBar的形状

文章目录 1. 概念介绍2. 使用方法3. 代码与效果3.1 示例代码3.2 运行效果 4. 内容总结 我们在上一章回中介绍了"如何修改按钮的形状"相关的内容&#xff0c;本章回中将介绍NavigationBar组件.闲话休提&#xff0c;让我们一起Talk Flutter吧。 1. 概念介绍 我们在本章…

javaEE案例,前后端交互,计算机和用户登录

加法计算机,前端的代码如下 : 浏览器访问的效果如图 : 后端的代码如下 再在浏览器进行输入点击相加,就能获得结果 开发中程序报错,如何定位问题 1.先定位前端还是后端(通过日志分析) 1)前端 : F12 看控制台 2)后端 : 接口,控制台日志 举个例子: 如果出现了错误,我们就在后端…

WebSocket是什么以及其与HTTP的区别

新钛云服已累计为您分享774篇技术干货 HTTP协议 HTTP是单向的&#xff0c;客户端发送请求&#xff0c;服务器发送响应。举个例子&#xff0c;当用户向服务器发送请求时&#xff0c;该请求采用HTTP或HTTPS的形式&#xff0c;在接收到请求后&#xff0c;服务器将响应发送给客户端…

Linux技能篇-yum源搭建(本地源和公网源)

文章目录 前言一、yum源是什么&#xff1f;二、使用镜像搭建本地yum源1.搭建临时仓库第一步&#xff1a;挂载系统ios镜像到虚拟机第二步&#xff1a;在操作系统中挂载镜像第三步&#xff1a;修改yum源配置文件 2.搭建本地仓库第一步&#xff1a;搭建临时yum源来安装httpd并做文…

在Win11中使用docker安装Oracle19c

在Win11中使用docker安装Oracle19c 首先是去docker官网下 docker for windows安装oracle19c首先下载image运行镜像在工具中登录可能遇到的问题 首先是去docker官网下 docker for windows 官网&#xff1a; https://www.docker.com/get-started/ 如果Windows是专业版&#xff0…

【Git】Git使用Gui图形化界面,Git中SSH协议,Idea集成Git

一&#xff0c;Git使用Gui图形化界面 1.1 Gui的简介 Gui &#xff08;Graphical User Interface&#xff09;指的是图形用户界面&#xff0c;也就是指使用图形化方式来协同人和计算机进行交互的一类程序。它与传统的命令行界面相比&#xff0c;更加直观、易用&#xff0c;用户…

智慧城市数据中台建设方案:PPT全文51页,附下载

关键词&#xff1a;智慧城市解决方案&#xff0c;数据中台解决方案&#xff0c;智慧城市建设&#xff0c;数据中台技术架构&#xff0c;数据中台建设 一、智慧城市数据中台建设背景 智慧城市数据中台是在城市数字化转型和智能化升级的背景下提出的&#xff0c;旨在实现城市数…

ABAQUS分析步笔记

定义原则&#xff1a; 每个step的所有边界条件&#xff0c;载荷条件累加构成本step的仿真效果&#xff1b; step2需要在step1的状态基础上进行载荷运动等限定时&#xff0c;需要确保在step2中传递了step1的想要保留的特征&#xff0c;如&#xff1a; 1、BC-1 这里的BC-1的固…

积极应对云网络安全

以下是 IT 领导者需要了解的内容&#xff0c;才能在云网络安全方面占据上风。 如果您的组织尚未主动解决云网络安全问题&#xff0c;则将面临灾难的风险。等待攻击发生根本没有意义。 主动云安全会采取积极措施来发现潜在威胁并在网络攻击发生之前阻止网络攻击。 这是通过持…

java 继承和多态 (图文搭配,万字详解!!)

目录 1.继承 1.1 为什么需要继承 1.2 继承概念 1.3 继承的语法 1.4 父类成员访问 1.4.1 子类中访问父类的成员变量 1.4.2 子类中访问父类的成员方法 1.5 super关键字 1.6 子类构造方法 1.7 super和this 1.8 再谈初始化 1.9 protected 关键字 1.10 继承方式 1.11 f…

OpenCV C++ 图像处理实战 ——《多二维码识别》

OpenCV C++ 图像处理实战 ——《多二维码识别》 一、结果演示二、zxing库配置2.1下载编译三、多二维码识别3.1 Method one3.1.1 源码3.2 Method two3.2.1 源码四、源码测试图像下载总结一、结果演示 </

第 117 场 LeetCode 双周赛题解

A 给小朋友们分糖果 I 动态规划&#xff1a;设 p [ k ] [ i ] p[k][i] p[k][i] 为将 i i i 个糖果分给 k k k 个小朋友的方案数&#xff0c;先求 p [ 2 ] [ i ] p[2][i] p[2][i] &#xff0c;再求 p [ 3 ] [ n ] p[3][n] p[3][n] class Solution { public:using ll long …

基于Python+OpenCV+SVM车牌识别系统-车牌预处理系统

欢迎大家点赞、收藏、关注、评论啦 &#xff0c;由于篇幅有限&#xff0c;只展示了部分核心代码。 文章目录 一项目简介简介系统流程系统优势 二、功能三、系统四. 总结 一项目简介 ## PythonOpenCVSVM车牌识别系统介绍 简介 PythonOpenCVSVM车牌识别系统是一种基于计算机视…

【C++】this指针讲解超详细!!!

&#x1f490; &#x1f338; &#x1f337; &#x1f340; &#x1f339; &#x1f33b; &#x1f33a; &#x1f341; &#x1f343; &#x1f342; &#x1f33f; &#x1f344;&#x1f35d; &#x1f35b; &#x1f364; &#x1f4c3;个人主页 &#xff1a;阿然成长日记 …

postman接口自动化测试

Postman除了前面介绍的一些功能&#xff0c;还有其他一些小功能在日常接口测试或许用得上。今天&#xff0c;我们就来盘点一下&#xff0c;如下所示&#xff1a; 1.数据驱动 想要批量执行接口用例&#xff0c;我们一般会将对应的接口用例放在同一个Collection中&#xff0c;然…

Dubbo从入门到上天系列第五篇:Dubbo3与JDK17不兼容问题展示

文章目录 一&#xff1a;JDK 与 Dubbo版本对应问题说明 1&#xff1a;问题1 2&#xff1a;问题2 二&#xff1a;Spring与JDK版本对应关系 1&#xff1a;对应关系详图 2&#xff1a;JDK与Major对应关系图 大神链接&#xff1a;作者有幸结识技术大神孙哥为好友&#xff0c…

asp.net学生宿舍管理系统VS开发sqlserver数据库web结构c#编程Microsoft Visual Studio

一、源码特点 asp.net 学生宿舍管理系统是一套完善的web设计管理系统&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主要采用B/S模式开发。开发环境为vs2010&#xff0c;数据库为sqlserver2008&#xff0c;使用c#语言 开发 asp.net学生宿舍管理系统1 应用技…

C语言达到什么水平才能从事单片机工作

C语言达到什么水平才能从事单片机工作 从事单片机工作需要具备一定的C语言编程水平。以下是几个关键要点&#xff1a;基本C语言知识&#xff1a; 掌握C语言的基本语法、数据类型、运算符、流控制语句和函数等基本概念。最近很多小伙伴找我&#xff0c;说想要一些C语言学习资料&…
最新文章