FlinkSql 窗口函数

Windowing TVF

以前用的是Grouped Window Functions(分组窗口函数),但是分组窗口函数只支持窗口聚合

现在FlinkSql统一都是用的是Windowing TVFs(窗口表值函数),Windowing TVFs更符合 SQL 标准且更加强大,支持window join、Window aggregations、Window Top-N、Window Deduplication 

Windowing TVFs是 Flink 定义的多态表函数(Polymorphic Table Function,缩写PTF),PTF 是 SQL 2016 标准中的一种特殊的表函数,它可以把表作为一个参数

窗口函数

Flink 认为窗口把流分割为有限大小的 “桶”,这样就可以在其之上进行计算

有以下几种用法

  • 滚动窗口
  • 滑动窗口
  • 累积窗口
  • 会话窗口 (即将支持)

滚动窗口(TUMBLE)

TUMBLE 函数指定每个元素到一个指定大小的窗口中。滚动窗口的大小固定且不重复。

例如:假设指定了一个 5 分钟的滚动窗口。Flink 将每 5 分钟生成一个新的窗口,如下图所示:

TUMBLE 函数通过时间属性字段为每行数据分配一个窗口。 在流计算模式,时间属性字段必须被指定为事件或处理时间属性。在批计算模式,这个窗口表函数的时间属性字段必须是 TIMESTAMP 或 TIMESTAMP_LTZ 的类型

--TUMBLE(TABLE data, DESCRIPTOR(timecol), size [, offset ])
data :拥有时间属性列的表。
timecol :列描述符,决定数据的哪个时间属性列应该映射到窗口。
size :窗口的大小(时长)。
offset :窗口的偏移量 [非必填]。

SELECT * FROM TABLE(
   TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES));

SELECT window_start, window_end, SUM(price)
  FROM TABLE(
    TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end;

滑动窗口(HOP)

滑动窗口函数指定元素到一个定长的窗口中。和滚动窗口很像,有窗口大小参数,另外增加了一个窗口滑动步长参数。如果滑动步长小于窗口大小,就能产生数据重叠的效果。在这个例子里,数据可以被分配在多个窗口。

例如:可以定义一个每5分钟滑动一次。大小为10分钟的窗口。每5分钟获得最近10分钟到达的数据的窗口,如下图所示:

HOP 函数通过时间属性字段为每一行数据分配了一个窗口。 在流计算模式,这个时间属性字段必须被指定为事件或处理时间属性。在批计算模式,这个窗口表函数的时间属性字段必须是 TIMESTAMP 或 TIMESTAMP_LTZ 的类型

-- HOP(TABLE data, DESCRIPTOR(timecol), slide, size [, offset ])
data:拥有时间属性列的表。
timecol:列描述符,决定数据的哪个时间属性列应该映射到窗口。
slide:窗口的滑动步长。
size:窗口的大小(时长)。
offset:窗口的偏移量 [非必填]。

SELECT * FROM TABLE(
    HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES));

SELECT window_start, window_end, SUM(price)
  FROM TABLE(
    HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end;

累积窗口(CUMULATE)

CUMULATE 函数指定元素到多个窗口,从初始的窗口开始,直到达到最大的窗口大小的窗口,所有的窗口都包含其区间内的元素,另外,窗口的开始时间是固定的。 你可以将 CUMULATE 函数视为首先应用具有最大窗口大小的 TUMBLE 窗口,然后将每个滚动窗口拆分为具有相同窗口开始但窗口结束步长不同的几个窗口。 所以累积窗口会产生重叠并且没有固定大小。

例如:1小时步长,24小时大小的累计窗口,每天可以获得如下这些窗口:[00:00, 01:00)[00:00, 02:00)[00:00, 03:00), …, [00:00, 24:00)

-- CUMULATE(TABLE data, DESCRIPTOR(timecol), step, size)
data:拥有时间属性列的表。
timecol:列描述符,决定数据的哪个时间属性列应该映射到窗口。
step:指定连续的累积窗口之间增加的窗口大小。
size:指定累积窗口的最大宽度的窗口时间。size必须是step的整数倍。
offset:窗口的偏移量 [非必填]。


SELECT * FROM TABLE(
    CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES));

SELECT window_start, window_end, SUM(price)
  FROM TABLE(
    CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end;

窗口偏移

上诉窗口都有一个 offset 参数,默认值就是 0,所以窗口默认都是整点启动的

比如10分钟的滚动窗口:TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES),只会生成[2021-06-29 23:40:00, 2021-06-29 00:50:00),[2021-06-29 23:50:00, 2021-06-30 00:00:00),window_start 和 window_end 和数据的时间无关

offset 就是用来调整窗口偏移的,当 offset 为 -16 MINUTE,时间戳为 2021-06-30 00:00:04 的数据会分配到窗口 [2021-06-29 23:54:00, 2021-06-30 00:04:00)。

窗口函数进阶用法

flink开窗需要写上windowend,否则只是带了一个windowstart的时间而已,并没有真正开启窗口

Window Aggregation

窗口聚合是通过 GROUP BY 子句定义的,其特征是包含 窗口表值函数 产生的 “window_start” 和 “window_end” 列。和普通的 GROUP BY 子句一样,窗口聚合对于每个组会计算出一行数据。

SELECT window_start, window_end, SUM(price)
  FROM TABLE(
    TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end;
+------------------+------------------+-------+
|     window_start |       window_end | price |
+------------------+------------------+-------+
| 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 |
| 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 |
+------------------+------------------+-------+

并且支持多级窗口聚合 

-- tumbling 5 minutes for each supplier_id
CREATE VIEW window1 AS
-- Note: The window start and window end fields of inner Window TVF are optional in the select clause. However, if they appear in the clause, they need to be aliased to prevent name conflicting with the window start and window end of the outer Window TVF.
SELECT window_start as window_5mintumble_start, window_end as window_5mintumble_end, window_time as rowtime, SUM(price) as partial_price
  FROM TABLE(
    TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
  GROUP BY supplier_id, window_start, window_end, window_time;

-- tumbling 10 minutes on the first window
SELECT window_start, window_end, SUM(partial_price) as total_price
  FROM TABLE(
      TUMBLE(TABLE window1, DESCRIPTOR(rowtime), INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end;

下面是分组窗口聚合的写法,分组窗口聚合已经过时,官网不推荐使用了

SELECT
  user,
  TUMBLE_START(order_time, INTERVAL '1' DAY) AS wStart,
  SUM(amount) FROM Orders
GROUP BY
  TUMBLE(order_time, INTERVAL '1' DAY),
  user

Window Join

在流式查询中,与其他连续表上的关联不同,窗口关联不产生中间结果,只在窗口结束产生一个最终的结果。另外,窗口关联会清除不需要的中间状态

目前使用时有一些限制:

目前,窗口关联需要在 join on 条件中包含两个输入表的 window_start 等值条件和 window_end 等值条件

目前,关联的左右两边必须使用相同的窗口表值函数。这个规则在未来可以扩展,比如:滚动和滑动窗口在窗口大小相同的情况下 join。

语法上支持 INNER、LEFT、RIGHT、FULL OUTER、ANTI、SEMI JOIN。而且,窗口关联可以在其他基于 窗口表值函数 的操作后使用,例如 窗口聚合,窗口 Top-N 和 窗口关联

SELECT l.id as l_id,r.id as r_id,l.window_start,l.window_end
FROM (
     SELECT * FROM TABLE(TUMBLE(TABLE t_left, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
     ) l
INNER JOIN (
     SELECT * FROM TABLE(TUMBLE(TABLE t_right, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
      ) r
ON l.id = r.id 
AND l.window_start = r.window_start 
AND l.window_end = r.window_end;

Window TopN

与普通Top-N不同,窗口Top-N只在窗口最后返回汇总的Top-N数据,不会产生中间结果。窗口 Top-N 会在窗口结束后清除不需要的中间状态

窗口 Top-N 适用于用户不需要每条数据都更新Top-N结果的场景,相对普通Top-N来说性能更好

SELECT *
  FROM (
    SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY price DESC) as rownum
    FROM TABLE(
               TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
  ) WHERE rownum <= 3;

还可以在窗口聚合后在进行窗口 Top-N

SELECT *
  FROM (
    SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY price DESC) as rownum
    FROM (
      SELECT window_start, window_end, supplier_id, SUM(price) as price, COUNT(*) as cnt
      FROM TABLE(
        TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
      GROUP BY window_start, window_end, supplier_id
    )
  ) WHERE rownum <= 3;

Window Deduplication

窗口去重是一种特殊的 去重,它根据指定的多个列来删除重复的行,保留每个窗口和分区键的第一个或最后一个数据

对于流式查询,与普通去重不同,窗口去重只在窗口的最后返回结果数据,不会产生中间结果。它会清除不需要的中间状态。 因此,窗口去重查询在用户不需要更新结果时,性能较好

Window Deduplication是一种特殊的窗口 Top-N:N是1并且是根据处理时间或事件时间排序的(目前只支持根据事件时间属性进行排序),支持在其他窗口操作上进行去重操作,比如 窗口聚合,窗口TopN 和 窗口关联

SELECT *
  FROM (
    SELECT bidtime, price, item, supplier_id, window_start, window_end, 
      ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY bidtime DESC) AS rownum
    FROM TABLE(
               TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
  ) WHERE rownum <= 1;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/376765.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

uniapp 本地存储的方式

1. uniapp 本地存储的方式 在uniapp开发中&#xff0c;本地存储是一个常见的需求。本地存储可以帮助我们在客户端保存和管理数据&#xff0c;以便在应用程序中进行持久化存储。本文将介绍uniapp中本地存储的几种方式&#xff0c;以及相关的代码示例。 1.1. 介绍 在移动应用开发…

浅谈bypass Etw

文章目录 c#ExecuteAssemblybypass etw c# loader 一种是通过反射找到指定空间的类中method进行Invoke 另一种是通过EntryPoint.Invoke加载 反射加载 Assembly.Load()是从String或AssemblyName类型加载程序集&#xff0c;可以读取字符串形式的程序集 Assembly.LoadFrom()从指定…

elk之倒排索引

写在前面 本文看下es的倒排索引相关内容。 1&#xff1a;正排索引和倒排索引 正排索引就是通过文档id找文档内容&#xff0c;而倒排索引就是通过文档内容找文档id&#xff0c;如下图&#xff1a; 2&#xff1a;倒排索引原理 假定我们有如下的数据&#xff1a; 为了建立倒…

第21讲:动态内存管理

1.为什么要有动态内存分配 2.malloc和free 3.calloc 4.realloc 5.笔试题 6.总结c/c中程序内存区域划分 1.为什么要有动态内存分配 为了调整申请的空间大小&#xff0c;使程序员可以申请和释放空间&#xff0c;提高程序的灵活性 2.malloc和free 作用&#xff1a;分配一块…

安装Pytorch中的torchtext之CUDA版的正确方式

安装Pytorch和torchtext&#xff1a; Previous PyTorch Versions | PyTorch Installing previous versions of PyTorchhttps://pytorch.org/get-started/previous-versions/ 上面的命令如下&#xff1a; pip install torch2.1.2 torchvision0.16.2 torchaudio2.1.2 --index-…

单片机学习笔记---串口通信(2)

目录 串口内部结构 串口相关寄存器 串口控制寄存器SCON SM0和SM1 SM2 REN TB8和RB8 TI和RI 电源控制寄存器PCON SMOD 串口工作方式 方式0 方式0输出&#xff1a; 方式0输入 方式1 方式1输出。 方式1输入 方式2和方式3 方式2和方式3输出&#xff1a; 方式2和…

Nacos(2)

Nacos部署 服务器端docker部署&#xff08;需要服务器安装好docker&#xff09; 导入sql文件到服务器编写nacos配置文件custom.env&#xff08;示例如下&#xff0c;改为自己服务器nacos相关信息&#xff09; PREFER_HOST_MODEhostname MODEstandalone SPRING_DATASOURCE_PL…

CentOS7如何安装宝塔面板并实现固定公网地址远程访问

文章目录 一、使用官网一键安装命令安装宝塔二、简单配置宝塔&#xff0c;内网穿透三、使用固定公网地址访问宝塔 宝塔面板作为建站运维工具&#xff0c;适合新手&#xff0c;简单好用。当我们在家里/公司搭建了宝塔&#xff0c;没有公网IP&#xff0c;但是想要在外也可以访问内…

代码随想录算法训练营第12天—二叉树01 | ● 理论基础 ● *递归遍历 ● *迭代遍历

理论基础 文章讲解&#xff1a;https://programmercarl.com/%E4%BA%8C%E5%8F%89%E6%A0%91%E7%90%86%E8%AE%BA%E5%9F%BA%E7%A1%80.html 二叉树是一种数据结构&#xff0c;常用于递归场景二叉树&#xff1a;binary tree&#xff0c;每个节点最多有两个子节点&#xff08;分支&a…

获取旁站 / C 段:第三方网站(附链接)

一、介绍 1.1 旁段 在网络安全的上下文中&#xff0c;"旁段"&#xff08;Pivot&#xff09;是指攻击者通过入侵一个网络中的一台计算机&#xff0c;然后利用该计算机作为跳板&#xff08;或者称之为“旁道”&#xff09;来访问其他计算机或网络资源的行为。 攻击者…

伦敦金交易平台:了解交易背后的世界

伦敦金交易平台是全球金融市场中备受关注的重要平台之一。作为国际金融中心&#xff0c;伦敦汇聚了众多金融机构和投资者&#xff0c;其金交所成为全球最大的现货黄金市场。在这个繁荣蓬勃的市场中&#xff0c;交易活跃&#xff0c;投资机会多样&#xff0c;吸引了众多投资者前…

DDoS攻击激增,分享高效可靠的DDoS防御方案

当下DDoS攻击规模不断突破上限&#xff0c;形成了 "网络威胁格局中令人不安的趋势"。专业数据显示&#xff0c;对比2022年上半年与2023年上半年&#xff0c;所有行业的DDoS攻击频率增加了314%。其中零售、电信和媒体公司遭受的攻击规模最大&#xff0c;三个垂直行业的…

手把手教你激活FL Studio 21.2.2.3914中文破解版2024年图文激活教程以及如何设置中文language

FL Studio 21.2.2.3914软件简介 fl studio 21.2.2.3914中文破解版作为一款极具创意性的音乐软件工作站软件&#xff0c;FL Studio已经成为了许多音乐制作人和音乐爱好者的首选。最新的FL Studio 21.2.2.3914中文破解版的发布&#xff0c;无疑将会引起更多人的关注。 ​ FL St…

NC6X单点登录设计文档说明

前言 因为业务场景需要&#xff0c;第三方系统有些工作需要经常到NC系统里做&#xff0c;如果每次去NC系统做业务单据&#xff0c;都需要反复登录&#xff0c;导致客户使用体验不是很好&#xff0c;所以需要开发实现从第三方系统单点登录到NC系统&#xff0c;提高客户满意度。 …

多维时序 | Matlab实现CNN-RVM卷积神经网络结合相关向量机多变量时间序列预测

多维时序 | Matlab实现CNN-RVM卷积神经网络结合相关向量机多变量时间序列预测 目录 多维时序 | Matlab实现CNN-RVM卷积神经网络结合相关向量机多变量时间序列预测效果一览基本介绍程序设计参考资料 效果一览 基本介绍 Matlab实现CNN-RVM卷积神经网络结合相关向量机多变量时间序…

快准狠!在3D Slicer中,使用TotalSegmentator扩展可在1分钟内自动分割全身117个器官

本系列涵盖从 3D Slicer 医学图像查看器的基础使用到高级自动分割扩展程序的内容(从入门到高阶!),具体包括软件安装、基础使用教程,自动分割扩展(totalsegmentator, monai label)快速标注数据。 Tina姐:强烈建议做图像分割的宝宝们好好学习,跟着Tina姐涨姿势!本教程…

开关电源学习之Boost电路

如果我们需要给一个输入电压为5V的芯片供电&#xff0c;而我们只有一个3.3V的电源&#xff0c;那怎么办&#xff1f; 我们能不能把3.3V的电压升到5V&#xff1f; 一、电感的简介 而在升压的电路设计方案中&#xff0c;使用到一个重要的元器件&#xff1a;电感。 电感的特性…

44、WEB攻防——通用漏洞RCE代码执行多层面检测利用

文章目录 RCE分类&#xff1a; REC代码执行&#xff1a;引用脚本代码解析执行。例如&#xff0c;eval(phpinfo();)以php脚本解析phpinfo();。RCE命令执行&#xff1a;脚本调用操作系统命令。例如&#xff0c;system(ver)&#xff0c;命令执行能执行系统命令。 RCE漏洞对象&am…

C#中实现串口通讯和网口通讯(使用SerialPort和Socket类)

仅作自己学习使用 1 准备部份 串口通讯需要两个调试软件commix和Virtual Serial Port Driver&#xff0c;分别用于监视串口和创造虚拟串口。网口通讯需要一个网口调试助手&#xff0c;网络上有很多资源&#xff0c;我在这里采用的是微软商店中的TCP/UDP网络调试助手&#xff0…

ubuntu下修改hosts读写权限

ubuntu下修改hosts文件的操作&#xff1a; 由于需要在hosts文件下添加ip地址信息&#xff0c;但是初始情况下系统该文件为只读权限无法修改&#xff0c;具体操作如下所示&#xff1b; 1.cd到系统etc目录下&#xff0c;执行如下命令,此时会提示输入密码&#xff0c;直接输入回…
最新文章