0基础学习PyFlink——时间滚动窗口(Tumbling Time Windows)

大纲

  • map
  • reduce
  • 完整代码
  • 参考资料

在《0基础学习PyFlink——个数滚动窗口(Tumbling Count Windows)》一文中,我们发现如果窗口内元素个数没有达到窗口大小时,计算个数的函数是不会被调用的。如下图中红色部分
在这里插入图片描述
那么有没有办法让上图中(B,2)和(D,5)也会被计算呢?
这就可以使用本节介绍的时间滚动窗口。它不依赖于窗口中元素的个数,而是窗口的时间,即窗口时间到了,计算就会进行。
我们稍微修改下《0基础学习PyFlink——个数滚动窗口(Tumbling Count Windows)》的例子,让元素集中在“A”上。

map

class SumWindowFunction(WindowFunction[tuple, tuple, str, TimeWindow]):
    def apply(self, key: str, window: TimeWindow, inputs: Iterable[tuple]):
        print(*inputs, window)
        return [(key,  len([e for e in inputs]))]


word_count_data = [("A",2),("A",1),("A",4),("A",3),("A",6),("A",5),("A",7),("A",8),("A",9),("A",10),
                   ("A",11),("A",12),("A",13),("A",14),("A",15),("A",16),("A",17),("A",18),("A",19),("A",20)]

def word_count():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
    # write all the data to one file
    env.set_parallelism(1)

    source_type_info = Types.TUPLE([Types.STRING(), Types.INT()])
    # define the source
    # mappging
    source = env.from_collection(word_count_data, source_type_info)
    # source.print()

    # keying
    keyed=source.key_by(lambda i: i[0]) 

reduce

    # reducing
    reduced=keyed.window(TumblingProcessingTimeWindows.of(Time.milliseconds(2))) \
                    .apply(SumWindowFunction(),
                        Types.TUPLE([Types.STRING(), Types.INT()]))
        
    # # define the sink
    reduced.print()

    # submit for execution
    env.execute()

这儿我们的Window使用的是滚动时间窗口,其中参数Time.milliseconds(2)是指窗口时长,即2毫秒一个窗口。
我们运行多次代码可以得到不同的结果

(‘A’, 2) (‘A’, 1) (‘A’, 4) (‘A’, 3) (‘A’, 6) (‘A’, 5) (‘A’, 7) (‘A’, 8) (‘A’, 9) (‘A’, 10) (‘A’, 11) (‘A’, 12) TimeWindow(start=1698771761164, end=1698771761166)
(A,12)
(‘A’, 13) (‘A’, 14) (‘A’, 15) (‘A’, 16) (‘A’, 17) (‘A’, 18) (‘A’, 19) (‘A’, 20) TimeWindow(start=1698771761166, end=1698771761168)
(A,8)

在这里插入图片描述

或者

(‘A’, 2) (‘A’, 1) (‘A’, 4) (‘A’, 3) (‘A’, 6) (‘A’, 5) (‘A’, 7) (‘A’, 8) (‘A’, 9) (‘A’, 10) (‘A’, 11) (‘A’, 12) (‘A’, 13) (‘A’, 14) (‘A’, 15) (‘A’, 16) TimeWindow(start=1698771731386, end=1698771731388)
(A,16)
(‘A’, 17) (‘A’, 18) (‘A’, 19) (‘A’, 20) TimeWindow(start=1698771731388, end=1698771731390)
(A,4)

在这里插入图片描述

或者

(‘A’, 2) (‘A’, 1) (‘A’, 4) (‘A’, 3) (‘A’, 6) (‘A’, 5) (‘A’, 7) (‘A’, 8) (‘A’, 9) (‘A’, 10) (‘A’, 11) (‘A’, 12) (‘A’, 13) (‘A’, 14) (‘A’, 15) (‘A’, 16) (‘A’, 17) (‘A’, 18) (‘A’, 19) (‘A’, 20) TimeWindow(start=1698771714992, end=1698771714994)
(A,20)

在这里插入图片描述

可以发现结果并不稳定。但是可以发现,每个元素都参与了计算,而不像个数滚动窗口那样部分数据没有被触发计算。

完整代码

from typing import Iterable
import time
from pyflink.common import Types, Time
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode, WindowFunction
from pyflink.datastream.window import TimeWindow, TumblingProcessingTimeWindows
   
class SumWindowFunction(WindowFunction[tuple, tuple, str, TimeWindow]):
    def apply(self, key: str, window: TimeWindow, inputs: Iterable[tuple]):
        print(*inputs, window)
        return [(key,  len([e for e in inputs]))]


word_count_data = [("A",2),("A",1),("A",4),("A",3),("A",6),("A",5),("A",7),("A",8),("A",9),("A",10),
                   ("A",11),("A",12),("A",13),("A",14),("A",15),("A",16),("A",17),("A",18),("A",19),("A",20)]

def word_count():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
    # write all the data to one file
    env.set_parallelism(1)

    source_type_info = Types.TUPLE([Types.STRING(), Types.INT()])
    # define the source
    # mappging
    source = env.from_collection(word_count_data, source_type_info)
    # source.print()

    # keying
    keyed=source.key_by(lambda i: i[0]) 
    
    # reducing
    reduced=keyed.window(TumblingProcessingTimeWindows.of(Time.milliseconds(2))) \
                    .apply(SumWindowFunction(),
                        Types.TUPLE([Types.STRING(), Types.INT()]))
        
    # # define the sink
    reduced.print()

    # submit for execution
    env.execute()

if __name__ == '__main__':
    word_count()

参考资料

  • https://nightlies.apache.org/flink/flink-docs-master/api/python/reference/pyflink.datastream/api/pyflink.datastream.window.TumblingProcessingTimeWindows.html#pyflink.datastream.window.TumblingProcessingTimeWindows

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/116592.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

CleanMyMac X2024登录激活码

本篇将为各位小伙伴们集中讲解一下,Mac清理工具CleanMyMac X的下载、安装与激活是如何进行的。 系统:macOS 10.14(在10.15以及Big Sur中的安装激活教程相同) 下载CleanMyMac X 登录CleanMyMac X下载页面,然后点击【…

R语言 复习 习题图片

这是日天土申哥不知道从哪淘来的R语言复习知识点图片,大部分内容都是课后习题的答案 加油吧,骚年,考个好分数

MyBatis-Plus复习总结(一)

文章目录 一、环境搭键二、基本CRUD2.1 BaseMapper2.2 插入2.3 删除2.4 修改2.5 查询 三、通用Service四、常用注解4.1 雪花算法4.2 注解TableLogic 五、条件构造器和常用接口5.1 Wrapper介绍5.2 QueryWrapper5.3 UpdateWrapper5.4 condition5.5 LambdaQueryWrapper5.6 LambdaU…

五:Day11_SpringMVC03

一、拦截器 SpringMVC给出了拦截器来实现单元方法的拦截,拦截器的执行是在DispatcherServlet之后和单元方法之前的。 注意:只有URL匹配到了控制单元,拦截器才能生效。 2. 使用拦截器 2.1 创建拦截器类 public class MyInterceptor implem…

工地现场智慧管理信息化解决方案 智慧工地源码

智慧工地系统充分利用计算机技术、互联网、物联网、云计算、大数据等新一代信息技术,以PC端,移动端,设备端三位一体的管控方式为企业现场工程管理提供了先进的技术手段。让劳务、设备、物料、安全、环境、能源、资料、计划、质量、视频监控等…

图解系列--防火墙

05.01 防火墙是怎样的网络硬件 构建安全网络体系而需要遵循的 CIA 基本理念。CIA 是机密性 (Confidentiality) 、 完整性(Integrity) 、 可用性(Availability)。 防火墙硬件作为防范装置能够同时实现CIA 中3个条目的相应对策。在20世纪90年代中期,普通企业一般都…

【深度学习】pytorch——线性回归

笔记为自我总结整理的学习笔记,若有错误欢迎指出哟~ 深度学习专栏链接: http://t.csdnimg.cn/dscW7 pytorch——线性回归 线性回归简介公式说明完整代码代码解释 线性回归简介 线性回归是一种用于建立特征和目标变量之间线性关系的统计学习方法。它假设…

JavaScript处理字符串

字符串(String)是不可变的、有限数量的字符序列,字符包括可见字符、不可见字符和转义字符。在程序设计中,经常需要处理字符串,如复制、替换、连接、比较、查找、截取、分割等。在JavaScript中,字符串是一类简单值,直接…

NLP之Bert多分类实现案例(数据获取与处理)

文章目录 1. 代码解读1.1 代码展示1.2 流程介绍1.3 debug的方式逐行介绍 3. 知识点 1. 代码解读 1.1 代码展示 import json import numpy as np from tqdm import tqdmbert_model "bert-base-chinese"from transformers import AutoTokenizertokenizer AutoToken…

AI:57-基于机器学习的番茄叶部病害图像识别

🚀 本文选自专栏:AI领域专栏 从基础到实践,深入了解算法、案例和最新趋势。无论你是初学者还是经验丰富的数据科学家,通过案例和项目实践,掌握核心概念和实用技能。每篇案例都包含代码实例,详细讲解供大家学习。 📌📌📌在这个漫长的过程,中途遇到了不少问题,但是…

体验SOLIDWORKS钣金切口工具增强 硕迪科技

在工业生产制造中,钣金加工是一种常用的加工方式,在SOLIDWORKS2024新版本中,钣金切口工具再次增强了,从SOLIDWORKS 2024 开始, 您可以使用切口工具在空心或薄壁圆柱体和圆锥体中生成切口。 只需在现有空心或薄壁圆柱体…

每天五分钟计算机视觉:搭建手写字体识别的卷积神经网络

本文重点 我们学习了卷积神经网络中的卷积层和池化层,这二者都是卷积神经网络中不可缺少的元素,本例中我们将搭建一个卷积神经网络完成手写字体识别。 卷积和池化的直观体现 手写字体识别 手写字体的图片大小是32*32*3的,它是一张 RGB 模式的图片,现在我们想识别它是从 …

Leetcode刷题详解——求根节点到叶节点数字之和

1. 题目链接:129. 求根节点到叶节点数字之和 2. 题目描述: 给你一个二叉树的根节点 root ,树中每个节点都存放有一个 0 到 9 之间的数字。 每条从根节点到叶节点的路径都代表一个数字: 例如,从根节点到叶节点的路径 1…

软通杯算法竞赛--周赛题目(一)

目录 一、S属性大爆发 二、日期杯 三、 三人行必由我师 四、集合之差 五、咱们计算机不懂烷烃 六、适度跑步健康长寿 一、S属性大爆发 测试用例 5 esS qwert codeforces PoSgju LkkJKkO 输出案例 二、日期杯 输入案例: 3 2022 2022 11 1900 2100 15 1989 20…

Java继承:抽取相同共性,实现代码复用

👑专栏内容:Java⛪个人主页:子夜的星的主页💕座右铭:前路未远,步履不停 目录 一、继承的概念二、继承的语法三、父类成员访问1、子类中访问父类成员变量Ⅰ、子类和父类不存在同名成员变量Ⅱ、子类和父类成员…

Zabbix监控联想服务器的配置方法

简介 图片 随着科技的发展,对于数据的敏感和安全大部分取决于对硬件性能、故障预判的监测,由此可见实时监测保障硬件的安全很重要,从而衍生了很多对硬件的监测软件,Zabbix就一个不错的选择。开源 开源 开源! zabbix是…

树结构及其算法-二叉运算树

目录 树结构及其算法-二叉运算树 C代码 树结构及其算法-二叉运算树 二叉树的应用实际上相当广泛,例如表达式之间的转换。可以把中序表达式按运算符优先级的顺序建成一棵二叉运算树(Binary Expression Tree,或称为二叉表达式树)…

【文生图】Stable Diffusion XL 1.0模型Full Fine-tuning指南(U-Net全参微调)

文章目录 前言重要教程链接以海报生成微调为例总体流程数据获取POSTER-TEXTAutoPosterCGL-DatasetPKU PosterLayoutPosterT80KMovie & TV Series & Anime Posters 数据清洗与标注模型训练模型评估生成图片样例宠物包商品海报护肤精华商品海报 一些TipsMata:…

UUNet训练自己写的网络

记录贴写的很乱仅供参考。 自己写的Unet网络不带深度监督,但是NNUNet默认的训练方法是深度监督训练的,对应的模型也是带有深度监督的。但是NNUNetV2也贴心的提供了非深度监督的训练方法在该目录下: 也或者说我们想要自己去定义一个nnUNWtTra…

使用自定义函数拟合辨识HPPC工况下的电池数据(适用于一阶RC、二阶RC等电池模型)

该程序可以离线辨识HPPC工况下的电池数据,只需要批量导入不同SOC所对应的脉冲电流电压数据,就可以瞬间获得SOC为[100% 90% 80% 70% 60% 50% 40% 30% 20% 10% 0%]的所有电池参数,迅速得到参数辨识的结果并具有更高的精度,可以很大程度上降低参…