Flink之迟到的数据

迟到数据的处理

  1. 推迟水位线推进: WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
  2. 设置窗口延迟关闭:.allowedLateness(Time.seconds(3))
  3. 使用侧流接收迟到的数据: .sideOutputLateData(lateData)
public class Flink12_LateDataCorrect {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<Event> ds = env.socketTextStream("hadoop102", 8888)
                .map(
                        line -> {
                            String[] fields = line.split(",");
                            return new Event(fields[0].trim(), fields[1].trim(), Long.valueOf(fields[2].trim()));
                        }
                ).assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2)) // 水位线延迟2秒
                                .withTimestampAssigner(
                                        (event, ts) -> event.getTs()
                                )
                );

        ds.print("input");

        OutputTag<WordCountWithTs> lateOutputTag = new OutputTag<>("late", Types.POJO(WordCountWithTs.class));
        //new OutputTag<WordCount>("late"){}

        SingleOutputStreamOperator<UrlViewCount> urlViewCountDs = ds.map(
                event -> new WordCountWithTs(event.getUrl(), 1 , event.getTs())
        ).keyBy(
                WordCountWithTs::getWord
        ).window(
                TumblingEventTimeWindows.of(Time.seconds(10))
        ).allowedLateness(Time.seconds(5))  // 窗口延迟5秒关闭
         .sideOutputLateData(lateOutputTag) // 捕获到侧输出流
        .aggregate(
                new AggregateFunction<WordCountWithTs, UrlViewCount, UrlViewCount>() {
                    @Override
                    public UrlViewCount createAccumulator() {
                        return new UrlViewCount();
                    }

                    @Override
                    public UrlViewCount add(WordCountWithTs value, UrlViewCount accumulator) {
                        accumulator.setCount((accumulator.getCount() == null ? 0L : accumulator.getCount()) + value.getCount());
                        return accumulator;
                    }

                    @Override
                    public UrlViewCount getResult(UrlViewCount accumulator) {
                        return accumulator;
                    }

                    @Override
                    public UrlViewCount merge(UrlViewCount a, UrlViewCount b) {
                        return null;
                    }
                }
                ,
                new ProcessWindowFunction<UrlViewCount, UrlViewCount, String, TimeWindow>() {
                    @Override
                    public void process(String key, ProcessWindowFunction<UrlViewCount, UrlViewCount, String, TimeWindow>.Context context, Iterable<UrlViewCount> elements, Collector<UrlViewCount> out) throws Exception {
                        UrlViewCount urlViewCount = elements.iterator().next();
                        //补充url
                        urlViewCount.setUrl(key);
                        //补充窗口信息
                        urlViewCount.setWindowStart(context.window().getStart());
                        urlViewCount.setWindowEnd(context.window().getEnd());

                        // 写出
                        out.collect(urlViewCount);
                    }
                }
        );
        urlViewCountDs.print("window") ;

        //TODO 将窗口的计算结果写出到Mysql的表中, 有则更新,无则插入
        /*
            窗口触发计算输出的结果,该部分数据写出到mysql表中执行插入操作,
            后续迟到的数据,如果窗口进行了延迟, 窗口还能正常对数据进行计算, 该部分数据写出到mysql执行更新操作。

            建表语句:
            CREATE TABLE `url_view_count` (
              `url` VARCHAR(100) NOT NULL  ,
              `cnt` BIGINT NOT NULL,
              `window_start` BIGINT NOT NULL,
              `window_end` BIGINT NOT NULL,
              PRIMARY KEY (url, window_start, window_end )  -- 联合主键
            ) ENGINE=INNODB DEFAULT CHARSET=utf8
         */

        SinkFunction<UrlViewCount> jdbcSink = JdbcSink.<UrlViewCount>sink(
                "replace into url_view_count(url, cnt ,window_start ,window_end) value (?,?,?,?)",
                new JdbcStatementBuilder<UrlViewCount>() {
                    @Override
                    public void accept(PreparedStatement preparedStatement, UrlViewCount urlViewCount) throws SQLException {
                        preparedStatement.setString(1, urlViewCount.getUrl());
                        preparedStatement.setLong(2, urlViewCount.getCount());
                        preparedStatement.setLong(3, urlViewCount.getWindowStart());
                        preparedStatement.setLong(4, urlViewCount.getWindowEnd());
                    }
                },
                JdbcExecutionOptions.builder()
                        .withBatchSize(2)
                        .withMaxRetries(3)
                        .withBatchIntervalMs(1000L)
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withDriverName("com.mysql.cj.jdbc.Driver")
                        .withUrl("jdbc:mysql://hadoop102:3306/test")
                        .withUsername("root")
                        .withPassword("000000")
                        .build()

        );

        urlViewCountDs.addSink(jdbcSink) ;


        //捕获侧输出流
        SideOutputDataStream<WordCountWithTs> lateData = urlViewCountDs.getSideOutput(lateOutputTag);
        lateData.print("late");
        //TODO 将侧输出流中的数据,写出到mysql中的表中,需要对mysql中已经存在的数据进行修正
        //转换结构  WordCountWithTs => UrlViewCount
        //调用flink计算窗口的方式, 基于当前数据的时间计算对应的窗口
        SingleOutputStreamOperator<UrlViewCount> mapDs = lateData.map(
                wordCountWithTs -> {
                    Long windowStart = TimeWindow.getWindowStartWithOffset(wordCountWithTs.getTs()/*数据时间*/, 0L/*偏移*/, 10000L/*窗口大小*/);
                    Long windowEnd = windowStart + 10000L;
                    return new UrlViewCount(wordCountWithTs.getWord(), 1L, windowStart, windowEnd);
                }
        );
        // 写出到mysql中
        SinkFunction<UrlViewCount> lateJdbcSink = JdbcSink.<UrlViewCount>sink(
                "insert into url_view_count (url ,cnt , window_start ,window_end) values(?,?,?,?) on duplicate key update cnt = VALUES(cnt) + cnt  ",
                new JdbcStatementBuilder<UrlViewCount>() {
                    @Override
                    public void accept(PreparedStatement preparedStatement, UrlViewCount urlViewCount) throws SQLException {
                        preparedStatement.setString(1, urlViewCount.getUrl());
                        preparedStatement.setLong(2, urlViewCount.getCount());
                        preparedStatement.setLong(3, urlViewCount.getWindowStart());
                        preparedStatement.setLong(4, urlViewCount.getWindowEnd());
                    }
                },
                JdbcExecutionOptions.builder()
                        .withBatchSize(2)
                        .withMaxRetries(3)
                        .withBatchIntervalMs(1000L)
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withDriverName("com.mysql.cj.jdbc.Driver")
                        .withUrl("jdbc:mysql://hadoop102:3306/test")
                        .withUsername("root")
                        .withPassword("000000")
                        .build()

        );

        mapDs.addSink(lateJdbcSink) ;

        try {
            env.execute();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

withIdleness关键字

解决某条流长时间没有数据,不能推进水位线,导致下游窗口的窗口无法正常计算。

public class Flink12_WithIdleness {
    public static void main(String[] args) {
        //1.创建运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //默认是最大并行度
        env.setParallelism(1);

        SingleOutputStreamOperator<Event> ds1 = env.socketTextStream("hadoop102", 8888)
                        .map(
                                line -> {
                                    String[] words = line.split(" ");
                                    return new Event(words[0].trim(), words[1].trim(), Long.valueOf(words[2].trim()));
                                }
                        ).assignTimestampsAndWatermarks(
                                WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                                        .withTimestampAssigner(
                                                (event, ts) -> event.getTs()
                                        )
                                        //如果超过10秒钟不发送数据,就不等待该数据源的水位线
                                        .withIdleness(Duration.ofSeconds(10))
                        );
        ds1.print("input1");

        SingleOutputStreamOperator<Event> ds2 = env.socketTextStream("hadoop102", 9999)
                .map(
                        line -> {
                            String[] words = line.split(" ");
                            return new Event(words[0].trim(), words[1].trim(), Long.valueOf(words[2].trim()));
                        }
                ).assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                                .withTimestampAssigner(
                                        (event, ts) -> event.getTs()
                                )
                                //如果超过10秒钟不发送数据,就不等待该数据源的水位线
//                                .withIdleness(Duration.ofSeconds(10))
                );
        ds2.print("input2");

        ds1.union(ds2)
                .map(event->new WordCount(event.getUrl(),1))
                .keyBy(WordCount::getWord)
                .window(
                        TumblingEventTimeWindows.of(Time.seconds(10))
                ).sum("count")
                .print("window");

        try {
            env.execute();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

基于时间的合流

窗口联结Window Join

WindowJoin: 在同一个窗口内的相同key的数据才能join成功。

orderDs.join( detailDs )
	  .where( OrderEvent::getOrderId )  // 第一条流用于join的key
	  .equalTo( OrderDetailEvent::getOrderId) // 第二条流用于join的key
	  .window(TumblingEventTimeWindows.of(Time.seconds(10)))
	  .apply(
	          new JoinFunction<OrderEvent, OrderDetailEvent, String>() {
	              @Override
	              public String join(OrderEvent first, OrderDetailEvent second) throws Exception {
	                  // 处理join成功的数据
	                  return  first + " -- " + second ;
	              }
	          }
	  ).print("windowJoin");

时间联结intervalJoin

在这里插入图片描述

IntervalJoin : 以一条流中数据的时间为基准, 设定上界和下界, 形成一个时间范围, 另外一条流中相同key的数据如果能落到对应的时间范围内, 即可join成功。

核心代码:

 orderDs.keyBy(
               OrderEvent::getOrderId
       ).intervalJoin(
               detailDs.keyBy( OrderDetailEvent::getOrderId)
       ).between(
               Time.seconds(-2) , Time.seconds(2)
       )
       //.upperBoundExclusive()  排除上边界值
       //.lowerBoundExclusive()  排除下边界值
       .process(
               new ProcessJoinFunction<OrderEvent, OrderDetailEvent, String>() {
                   @Override
                   public void processElement(OrderEvent left, OrderDetailEvent right, ProcessJoinFunction<OrderEvent, OrderDetailEvent, String>.Context ctx, Collector<String> out) throws Exception {
                       //处理join成功的数据
                       out.collect( left + " -- " + right );
                   }
               }
       ).print("IntervalJoin");

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/237402.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

前端框架(Front-end Framework)和库(Library)的区别

聚沙成塔每天进步一点点 ⭐ 专栏简介 前端入门之旅&#xff1a;探索Web开发的奇妙世界 欢迎来到前端入门之旅&#xff01;感兴趣的可以订阅本专栏哦&#xff01;这个专栏是为那些对Web开发感兴趣、刚刚踏入前端领域的朋友们量身打造的。无论你是完全的新手还是有一些基础的开发…

WPF实现更加灵活绑定复杂Command(使用Microsoft XAML Behaviors 库)

1、安装NuGet 2、在XAML的命名空间引入&#xff1a; xmlns:i"http://schemas.microsoft.com/xaml/behaviors" 3、使用&#xff1a; <Canvas Background"Aqua"><Rectangle Stroke"Red" Width"{Binding RectModel.RectangleWidth}…

Docker基础概念解析:镜像、容器、仓库

当谈到容器化技术时&#xff0c;Docker往往是第一个被提及的工具。Docker的基础概念涵盖了镜像、容器和仓库&#xff0c;它们是理解和使用Docker的关键要素。在这篇文章中&#xff0c;将深入探讨这些概念&#xff0c;并提供更丰富的示例代码&#xff0c;帮助大家更好地理解和应…

【AI绘图】 学习 prompt 画图,收集网站

文章目录 在线画图网站Prompt模型下载AI 工具箱 在线画图网站 【强推】搜图&#xff1f;也可以在线画图&#xff0c;质量很高&#xff01;&#xff1a;https://lexica.art/ Lexica 是一个搜索 AI 生成图片的网站&#xff0c;可以根据图片本身关联性或描述文本&#xff08;prom…

基于OpenCV+CNN+IOT+微信小程序智能果实采摘指导系统——深度学习算法应用(含python、JS工程源码)+数据集+模型(五)

目录 前言总体设计系统整体结构图系统流程图 运行环境Python环境TensorFlow 环境Jupyter Notebook环境Pycharm 环境微信开发者工具OneNET云平台 模块实现1. 数据预处理2. 创建模型并编译3. 模型训练及保存4. 上传结果5. 小程序开发1&#xff09;查询图片2&#xff09;查询识别结…

计算机网络——期末考试复习资料

什么是计算机网络 将地理位置不同的具有独立功能的多台计算机及其外部设备通过通信线路和通信设备连接起来&#xff1b;实现资源共享和数据传递的计算机的系统。 三种交换方式 报文交换&#xff1a;路由器转发报文&#xff1b; 电路交换&#xff1a;建立一对一电路 分组交换&a…

大数据驱动下的人口普查:新时代下的新变革

人口普查数据大屏&#xff0c;是指一种通过大屏幕显示人口普查数据的设备&#xff0c;可以将人口普查数据以可视化的形式呈现出来&#xff0c;为决策者提供直观、准确的人口数据。这种大屏幕的出现&#xff0c;让人口普查数据的利用变得更加高效、便捷。 如果您需要制作一张直观…

手写VUE后台管理系统10 - 封装Axios实现异常统一处理

目录 前后端交互约定安装创建Axios实例拦截器封装请求方法业务异常处理 axios 是一个易用、简洁且高效的http库 axios 中文文档&#xff1a;http://www.axios-js.com/zh-cn/docs/ 前后端交互约定 在本项目中&#xff0c;前后端交互统一使用 application/json;charsetUTF-8 的请…

appium安卓app自动化,遇到搜索框无搜索按钮元素时无法搜索的解决方案

如XX头条&#xff0c;搜索框后面有“搜索”按钮&#xff0c;这样实现搜索操作较为方便。 但有些app没有设置该搜索按钮&#xff0c;初学者就要花点时间去学习怎么实现该功能了&#xff0c;如下图。 这时候如果定位搜索框&#xff0c;再点击操作&#xff0c;再输入文本后&#x…

【QT入门】基础知识

一.认识Qt qt是一套应用程序开发库&#xff0c;与MFC不同是跨平台的开发类库&#xff0c;主要用来开发图形界面。完全面向对象容易扩展。 优点&#xff1a;1.封装性强&#xff0c;简单易学 2.跨平台 3.独立编译为本地代码 二.qt工程 1.常见的工程文件有这两种…

2024 年 SEO 现状

搜索引擎优化&#xff08;SEO&#xff09;一直以来都是网络知名度和成功的基石。随着我们踏上 2024 年的征程&#xff0c;SEO领域正在经历重大变革&#xff0c;有些变革已经开始&#xff0c;这对企业、创作者和营销人员来说既是挑战也是机遇。 语音搜索 语音搜索曾是一个未来…

HeartBeat监控Mysql状态

目录 一、概述 二、 安装部署 三、配置 四、启动服务 五、查看数据 一、概述 使用heartbeat可以实现在kibana界面对 Mysql 服务存活状态进行观察&#xff0c;如有必要&#xff0c;也可在服务宕机后立即向相关人员发送邮件通知 二、 安装部署 参照章节&#xff1a;监控组件…

【小白专用】MySQL查询数据库所有表名及表结构其注释

一、先了解下INFORMATION_SCHEMA 1、在MySQL中&#xff0c;把INFORMATION_SCHEMA看作是一个数据库&#xff0c;确切说是信息数据库。其中保存着关于MySQL服务器所维护的所有其他数据库的信息。如数据库名&#xff0c;数据库的表&#xff0c;表栏的数据类型与访问权 限等。在INF…

HarmonyOS编译开源native库(OpenSSL实例)

前言 近期项目要开始做鸿蒙版本&#xff0c;有一部分依赖native的代码也需要迁移&#xff0c;某个native模块依赖openssl&#xff0c;需要在鸿蒙下重新编译openssl才行。一开始找了很多相关文档都没有得到方法&#xff0c;无奈只能自己凭经验慢慢试&#xff0c;最后还是成功了…

.net 安装Postgresql驱动程序ngpsql

.net 安装Postgresql驱动程序ngpsql 最近搞一个物联网项目&#xff0c;需要采集fanuc数控机床的数据&#xff0c;厂家提供的API只支持windows&#xff0c;所以就决定C#开发&#xff0c;数据库用postgresql&#xff0c; 安装数据库驱动一波三折。 作为一个讨厌微软的老程序猿&…

Postman高级应用——变量、流程控制、调试、公共函数、外部数据文件

Postman 提供了四种类型的变量 环境变量&#xff08;Environment Variable&#xff09; 不同的环境&#xff0c;使用不同的环境变量&#xff0c;例如&#xff1a;测试过程中经常会用到 测试环境&#xff0c;外网环境等 全局变量&#xff08;Global Variable&#xff09; 所有的…

使用PyTorch II的新特性加快LLM推理速度

Pytorch团队提出了一种纯粹通过PyTorch新特性在的自下而上的优化LLM方法&#xff0c;包括: Torch.compile: PyTorch模型的编译器 GPU量化:通过降低精度操作来加速模型 推测解码:使用一个小的“草稿”模型来加速llm来预测一个大的“目标”模型的输出 张量并行:通过在多个设备…

认识lambda架构(架构师考试复习)

Lambda架构主要分为三层&#xff0c;批处理层、加速层和服务层。 如下图所示&#xff1a; &#xff08;1&#xff09;批处理层&#xff08;Batch Layer&#xff09;&#xff1a;存储数据集&#xff0c;在数据集上预先计算查询函数&#xff0c;并构建查询对应的view。Batch Lay…

Unity-小工具-LookAt

Unity-小工具-LookAt &#x1f959;介绍 &#x1f959;介绍 &#x1f4a1;通过扩展方法调用 gameObject.LookAtTarget&#xff0c;让物体转向目标位置 &#x1f4a1;gameObject.StopLookat 停止更新 &#x1f4a1;可以在调用时传入自动停止标记&#xff0c;等转向目标位置后自…

C语言学习----指针和数组

&#x1f308;这篇blog记录一下指针学习~ 主要是关于指针和数组之间的关系&#xff0c;还有指针的使用等~ &#x1f34e;指针变量是一个变量 其本身也有一个地址 也需要存放&#xff0c;就和int char等类型一样的&#xff0c;也需要有一个地址来存放它 &#x1f34c;而指针变量…