大数据-玩转数据-Flink 水印

一、Flink 中的水印

在Flink的流式操作中, 会涉及不同的时间概念:

1.1 处理时间

是指的执行操作的各个设备的时间,对于运行在处理时间上的流程序, 所有的基于时间的操作(比如时间窗口)都是使用的设备时钟。比如, 一个长度为1个小时的窗口将会包含设备时钟表示的1个小时内所有的数据。 假设应用程序在 9:15am分启动, 第1个小时窗口将会包含9:15am到10:00am所有的数据,然后下个窗口是10:00am-11:00am, 等等。处理时间是最简单时间语义, 数据流和设备之间不需要做任何的协调。他提供了最好的性能和最低的延迟。 但是, 在分布式和异步的环境下,处理时间没有办法保证确定性,容易受到数据传递速度的影响: 事件的延迟和乱序。在使用窗口的时候, 如果使用处理时间, 就指定时间分配器为处理时间分配器。

1.2 事件时间

是指的这个事件发生的时间。在event进入Flink之前, 通常被嵌入到了event中, 一般作为这个event的时间戳存在。在事件时间体系中, 时间的进度依赖于数据本身,和任何设备的时间无关。事件时间程序必须制定如何产生Event Time Watermarks(水印) 。假设所有数据都已到达,事件时间操作将按预期方式运行,即使在处理无序或迟到的事件或重新处理历史数据时,也会产生正确且一致的结果。例如,每小时事件时间窗口将包含带有事件时间戳的所有记录,这些记录落入该小时。在使用窗口的时候, 如果使用事件时间, 就指定时间分配器为事件时间分配器。从1.12开始, Flink内部已经把默认的语义改成了事件时间。

1.3 Flink中的WaterMark

支持event time的流式处理框架需要一种能够测量event time 进度的方式。 比如, 一个窗口算子创建了一个长度为1小时的窗口,那么这个算子需要知道事件时间已经到达了这个窗口的关闭时间,从而在程序中去关闭这个窗口。事件时间可以不依赖处理时间来表示时间的进度。例如,在程序中, 即使处理时间和事件时间有相同的速度, 事件时间可能会轻微的落后处理时间。另外一方面使用事件时间可以在几秒内处理已经缓存在Kafka中多周的数据,这些数据可以照样被正确处理, 就像实时发生的一样能够进入正确的窗口。这种在Flink中去测量事件时间的进度的机制就是watermark(水印)。

1.4 Flink中如何产生水印

在这里插入图片描述

二、代码集成

package com.lyh.flink08;

import com.lyh.bean.WaterSensor;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import javax.naming.Context;
import java.time.Duration;

public class WatorMark_01 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<WaterSensor> stream = env.socketTextStream("hadoop100", 9999)
                .map(new MapFunction<String, WaterSensor>() {
                    @Override
                    public WaterSensor map(String value) throws Exception {
                        String[] datas = value.split(",");
                        return new WaterSensor(datas[0],
                                Long.valueOf(datas[1]),
                                Integer.valueOf(datas[2]))
                                ;
                    }
                });
        WatermarkStrategy<WaterSensor> wms = WatermarkStrategy
                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() { // 指定时间戳
            @Override
            public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                return element.getTs() * 1000;
            }
        });

        stream
                .assignTimestampsAndWatermarks(wms) // 指定水印和时间戳
                .keyBy(WaterSensor::getId)
          .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
                             @Override
                             public void process(String key,
                                                 Context ctx,
                                                 Iterable<WaterSensor> elements,
                                                 Collector<String> out) throws Exception {
                                 String msg = "当前key: " + key
                                         + "窗口: [" + ctx.window().getStart() / 1000 + "," + ctx.window().getEnd()/1000 + ") 一共有 "
                                         + elements.spliterator().estimateSize() + "条数据 ";
                                 out.collect(msg);
                             }
                         }
                ).print();
        env.execute();
    }
}

三、测试结果

在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/99980.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Uncaught ReferenceError: process is not defined

最近在搞老项目升级,将Vue2.6.11里的vuecli5.0.8升级到vite最新版本4.4.9&#xff0c;中间遇到不少问题&#xff0c;有机会以后做记录。 遇到问题 把所有的工作就搞好项目也成功的跑起来&#xff0c;页面一片空白。打开控制台 Uncaught ReferenceError: process is not defi…

hive部署

下载hive安装包&#xff1a;https://dlcdn.apache.org/hive/hive-2.3.9/解压及环境部署 tar -zxvf apache-hive-2.3.9-bin.tar.gz mv apache-hive-2.3.9-bin hivevim /etc/profile添加至环境变量 export HIVE_HOME/usr/local/hive export PATH$PATH:$HIVE_HOME/binsource /etc…

单片机电子元器件-按键

电子元器件 按键上有 四个引脚 1 2 、 3 4 按下之后 导通 1 3 、 2 4 初始导通 通常按键开关为机械弹性开关&#xff0c;开关在闭合不会马上稳定的接通&#xff0c;会有一连串的抖动 抖动时间的长短有机械特性来决定的&#xff0c;一般为5ms 到10 ms 。 消抖的分类 硬件消…

python爬取bilibili,下载视频

一. 内容简介 python爬取bilibili&#xff0c;下载视频 二. 软件环境 2.1vsCode 2.2Anaconda version: conda 22.9.0 2.3代码 链接&#xff1a;https://pan.baidu.com/s/1WuXTso_iltLlnrLffi1kYQ?pwd1234 三.主要流程 3.1 下载单个视频 代码 import requests impor…

如何使用ArcGIS Earth制作地图动画视频

通常情况下&#xff0c;我们所看到的地图都是静态展示&#xff0c;对于信息的传递&#xff0c;视频比图片肯定会更加丰富&#xff0c;所以制作地图动画视频更加有利于信息的传递&#xff0c;这里我们讲解一下ArcGIS Earth 2.0如何制作地图动画视频&#xff0c;希望能对你有所帮…

pytest---添加自定义命令行参数(pytest_addoption )

前言 在目前互联网公司中&#xff0c;都会存在多个测试环境&#xff0c;那么当我们编写的自动化想要在多套测试环境下进行运行时&#xff0c;如何使用&#xff1f;大多数人想到的可能是通过将我们自动化代码中的地址修改成不同环境&#xff0c;但是这时候就会增加一些工作量&am…

MySQL以及版本介绍

一、MySQL的介绍 MySQL数据库管理系统由瑞典的DataKonsultAB公司研发&#xff0c;该公司被Sun公司收购&#xff0c;现在Sun公司又被Oracle公司收购&#xff0c;因此MySQL目前属于 Oracle 旗下产品。 MySQL所使用的 SQL 语言是用于访问数据库的最常用标准化语言。MySQL 软件采用…

DEAP库文档教程五----计算统计

本小结将重点围绕模型在计算统计方面的问题&#xff0c;进行详细的论述 1、Computing Statistics 通常情况下&#xff0c;我们想要在优化过程中编辑数据。Statistic模块可以在任何设计好的目标上改变一些本不可改变的数据。为了达到这个目的&#xff0c;需要使用与工具箱中完…

企业数字化转型的关键技术有哪些?_光点科技

随着科技的不断进步和信息技术的快速发展&#xff0c;企业数字化转型已经成为保持竞争力和适应市场变化的关键举措。在这个数字化时代&#xff0c;企业需要借助先进的技术来优化业务流程、提升效率&#xff0c;以及更好地满足客户需求。以下是企业数字化转型过程中的关键技术。…

Modbus转Profinet网关在大型自动化仓储项目应用案例

在自动化仓储项目中&#xff0c;Modbus是一种常见的通信协议&#xff0c;用于连接各种设备&#xff0c;例如传感器、PLC和人机界面。然而&#xff0c;Modbus协议只支持串行通信&#xff0c;并且数据传输速度较慢。为了提高通信效率和整体系统性能&#xff0c;许多大型仓储项目选…

Docker环境搭建Prometheus实验环境

环境&#xff1a; OS&#xff1a;Centos7 Docker: 20.10.9 - Community Centos部署Docker 【Kubernetes】Centos中安装Docker和Minikube_云服务器安装docker和minikube_DivingKitten的博客-CSDN博客 一、拉取Prometheus镜像 ## 拉取镜像 docker pull prom/prometheus ## 启动p…

02_块元素和行内元素的使用

一、HTML块元素和行内元素的使用 1、块元素: div标签 定义和用法&#xff1a; 标签块元素,表示一块内容,div标签可以把文档分割为独立的、不同的部分可以使用css设置宽高默认是占用一整快 例如: <html><body><!-- 块元素:div标签 --><div style"he…

C++面试题(陆)-数据库(一)

目录 数据库 1.1SQL 1.1.1 介绍一下数据库分页 1.1.2 介绍一下SQL中的聚合函数 1.1.3 表跟表是怎么关联的&#xff1f; 1.1.4 说一说你对外连接的了解 1.1.6 SQL中怎么将行转成列&#xff1f; 1.1.7 谈谈你对SQL注入的理解 1.1.8 将一张表的部分数据更新到另一张表&am…

STM32 RTC实验

RTC时钟简介 STM32F103的实时时钟&#xff08;RTC&#xff09;是一个独立的定时器。 STM32的RTC模块拥有一组连续计数的计数器&#xff0c;在相对应的软件配置下&#xff0c;可提供时钟日历的功能。 修改计数器的值可以重新设置系统的当前时间和日期。 RTC模块和时钟配置系统…

uniapp项目实战系列(4):服务的异步请求,请求服务的二次封装

目录 系列往期文章&#xff08;点击跳转&#xff09;uniapp项目实战系列(1)&#xff1a;导入数据库&#xff0c;启动后端服务&#xff0c;开启代码托管&#xff08;点击跳转&#xff09;uniapp项目实战系列(2)&#xff1a;新建项目&#xff0c;项目搭建&#xff0c;微信开发工具…

七、高并发内存池--Page Cache

七、高并发内存池–Page Cache 7.1 PageCache的工作原理 PageCache是以span的大小(以页为单位)和下标一一对应为映射关系的哈希桶&#xff0c;下标是几就说明这个哈希桶下挂的span的大小就是几页的&#xff0c;是绝对映射的关系。因为PageCache也是全局只有唯一一个的&#x…

线上批量查询物流导出到表格的操作指南

现在的生活中&#xff0c;我们经常需要查询包裹物流信息。如果一次性需要查询多个快递单号的物流信息&#xff0c;手动一个一个查询会非常麻烦。今天&#xff0c;我将向大家分享一个简单实用的方法&#xff0c;可以批量查询物流并导出到表格&#xff0c;方便随时查看。 首先&am…

js 正则表达式 验证 :页面中一个输入框,可输入1个或多个vid/pid,使用英文逗号隔开...

就是意思一个输入框里面&#xff0c;按VID/PID格式输入,VID和PID最大长度是4,最多50组 1、页面代码 <el-form ref"ruleForm" :model"tempSet" :rules"rules" label-position"right"> <!-- 最多 50组&#xff0c;每组9个字符…

【USRP】集成化仪器系列1 :信号源,基于labview实现

USRP 信号源 1、设备IP地址&#xff1a;默认为192.168.10.2&#xff0c;请勿 修改&#xff0c;运行阶段无法修改。 2、天线输出端口是TX1&#xff0c;请勿修改。 3、通道&#xff1a;0 对应RF A、1 对应 RF B&#xff0c;运行 阶段无法修改。 4、中心频率&#xff1a;当需要…

LinuxUbuntu安装OpenWAF

Linux&Ubuntu安装OpenWAF 官方GitHub地址 介绍 OpenWAF&#xff08;Web Application Firewall&#xff09;是一个开源的Web应用防火墙&#xff0c;用于保护Web应用程序免受各种网络攻击。它通过与Web服务器集成&#xff0c;监控和过滤对Web应用程序的流量&#xff0c;识…