0基础学习PyFlink——时间滑动窗口(Sliding Time Windows)

在《0基础学习PyFlink——时间滚动窗口(Tumbling Time Windows)》我们介绍了不会有重复数据的时间滚动窗口。本节我们将介绍存在重复计算数据的时间滑动窗口。
关于滑动窗口,可以先看下《0基础学习PyFlink——个数滑动窗口(Sliding Count Windows)》。下图就是个数滑动窗口示意图。
在这里插入图片描述
我们看到个数滑动窗口也会因为窗口内数据不够而不被触发。但是时间滑动窗口则可以解决这个问题,我们只要把窗口改成时间类型即可。
相应的代码我们参考《0基础学习PyFlink——时间滚动窗口(Tumbling Time Windows)》,只要把TumblingProcessingTimeWindows改成SlidingProcessingTimeWindows,并增加一个偏移参数(Time.milliseconds(1))即可。这意味着我们将运行一个时间长度为2毫秒,每次递进1毫秒的窗口。

完整代码

from typing import Iterable
import time
from pyflink.common import Types, Time
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode, WindowFunction
from pyflink.datastream.window import  TimeWindow, SlidingProcessingTimeWindows
   
class SumWindowFunction(WindowFunction[tuple, tuple, str, TimeWindow]):
    def apply(self, key: str, window: TimeWindow, inputs: Iterable[tuple]):
        print(*inputs, window)
        return [(key,  len([e for e in inputs]))]


word_count_data = [("A",2),("A",1),("A",4),("A",3),("A",6),("A",5),("A",7),("A",8),("A",9),("A",10),
                   ("A",11),("A",12),("A",13),("A",14),("A",15),("A",16),("A",17),("A",18),("A",19),("A",20)]

def word_count():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
    # write all the data to one file
    env.set_parallelism(1)

    source_type_info = Types.TUPLE([Types.STRING(), Types.INT()])
    # define the source
    # mappging
    source = env.from_collection(word_count_data, source_type_info)
    # source.print()

    # keying
    keyed=source.key_by(lambda i: i[0]) 
    
    # reducing
    reduced=keyed.window(SlidingProcessingTimeWindows.of(Time.milliseconds(2), Time.milliseconds(1))) \
                    .apply(SumWindowFunction(),
                        Types.TUPLE([Types.STRING(), Types.INT()]))
        
    # # define the sink
    reduced.print()

    # submit for execution
    env.execute()

if __name__ == '__main__':
    word_count()

运行结果

运行两次上述代码,我们发现每次都不同,而且有重叠计算的元素。

(‘A’, 2) (‘A’, 1) (‘A’, 4) TimeWindow(start=1698773292650, end=1698773292652)
(‘A’, 2) (‘A’, 1) (‘A’, 4) (‘A’, 3) (‘A’, 6) (‘A’, 5) (‘A’, 7) (‘A’, 8) (‘A’, 9) (‘A’, 10) (‘A’, 11) TimeWindow(start=1698773292651, end=1698773292653)
(A,3)
(A,11)
(‘A’, 3) (‘A’, 6) (‘A’, 5) (‘A’, 7) (‘A’, 8) (‘A’, 9) (‘A’, 10) (‘A’, 11) (‘A’, 12) (‘A’, 13) (‘A’, 14) (‘A’, 15) (‘A’, 16) (‘A’, 17) (‘A’, 18) (‘A’, 19) (‘A’, 20) TimeWindow(start=1698773292652, end=1698773292654)
(A,17)

在这里插入图片描述

(‘A’, 2) (‘A’, 1) (‘A’, 4) TimeWindow(start=1698773319933, end=1698773319935)
(‘A’, 2) (‘A’, 1) (‘A’, 4) (‘A’, 3) (‘A’, 6) (‘A’, 5) (‘A’, 7) (‘A’, 8) (‘A’, 9) (‘A’, 10) (‘A’, 11) (‘A’, 12) TimeWindow(start=1698773319934, end=1698773319936)
(A,3)
(A,12)
(‘A’, 3) (‘A’, 6) (‘A’, 5) (‘A’, 7) (‘A’, 8) (‘A’, 9) (‘A’, 10) (‘A’, 11) (‘A’, 12) (‘A’, 13) (‘A’, 14) (‘A’, 15) (‘A’, 16) (‘A’, 17) (‘A’, 18) (‘A’, 19) (‘A’, 20) TimeWindow(start=1698773319935, end=1698773319937)
(A,17)

在这里插入图片描述

参考资料

  • https://nightlies.apache.org/flink/flink-docs-master/api/python/reference/pyflink.datastream/api/pyflink.datastream.window.SlidingProcessingTimeWindows.html#pyflink.datastream.window.SlidingProcessingTimeWindows

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/116409.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【Midjourney入门教程2】Midjourney的基础操作和设置

文章目录 Midjourney的常用命令和基础设置1、 /imagine2、 /blend3、 /info4、 /subscribe5、 /settings(Midjourney的基础设置)6、 /shorten 有部分同学说我不想要英文界面的,不要慌: 点击左下角个人信息的设置按钮,找…

CoT: 思路链提示促进大语言模型的多步推理

CoT 总览摘要1 引言2 Chain-of-Thought Prompting3 算术推理 (Arithmetic Reasoning)3.1 实验设置3.2 结果3.3 消融实验3.4 CoT的鲁棒性 4 常识推理 (Commonsense Reasoning)5 符号推理 (Symbolic Reasoning&#xff0…

SystemC入门之测试平台编写完整示例:全加器

导读: 本文将完整演示基于systemC编写一个全加器的测试平台。具体内容包括:激励平台,监控平台,待测单元的编写,波形文件读取。 1,main函数模块 搭建一个测试平台主要由:Driver, Monitor, DUT(design under …

Nginx编译安装和配置

官网:http://nginx.org/ 这里以1.20.2为例 Nginx是C语言写的 如果Linux系统上没有安装C编译环境 先执行下面命令 yum install -y gcc automake autoconf libtool make 如果不确定 可以使用命令查看 命令格式 rpm -q xxx 例如 说明有C编译环境 安装前需要安装4个依赖包…

基因组WGD的鉴定与分化时间

1. WGD 简介 全基因组复制(Whole genome duplications, WGD)是生物进化的重要因素之一(导致基因组扩增的因素包括全基因组复制和转座子TEs), 所以WGD分析也是基因组分析经常用到的一种分析方法。 古 WGD 检测有两种方法,一种是共线性分析&a…

Spring Boot创建多模块项目

创建一个普通的Spring Boot项目, 然后只留下 pom.xml 剩下的都删掉 删除多余标签 标识当前为父模块 创建子模块 删除子模块中多余标签 声明父模块 在父模块中声明子模块

体验SOLIDWORKS旋转反侧切除增强 硕迪科技

大家在设计中经常使用的旋转切除命令在solidworks2024版本中迎来了新的增强,添加了旋转反侧切除选项。在设计过程中不必修改复杂的草图即可切除掉我们不需要的部分。使设计工作更加方便快捷。 打开零部件后,点击键盘上的S键并输入旋转切除以搜索该命令&a…

[极客大挑战 2019]Knife 1(两种解法)

题目环境: 这道题主要考察中国菜刀和中国蚁剑的使用方法 以及对PHP一句话木马的理解 咱们先了解一下PHP一句话木马,好吗? **eval($_POST["Syc"]);** **eval是PHP代码执行函数,**把字符串按照 PHP 代码来执行。 $_POST P…

使用HttpClient库的爬虫程序

使用HttpClient库的爬虫程序,该爬虫使用C#来抓取内容。 using System; using System.Net.Http; using System.Threading.Tasks; ​ namespace CrawlerProgram {class Program{static void Main(string[] args){// 创建HttpClient对象using (HttpClient client new…

【蓝桥杯基础题】门牌制作

👑专栏内容:蓝桥杯刷题⛪个人主页:子夜的星的主页💕座右铭:前路未远,步履不停目录 一、题目描述二、题目分析三、代码汇总1、C++代码2、Java 代码四、总结1、枚举思想2、取余判断每位数字一、题目描述 题目链接:门牌制作 小蓝要为一条街的住户制作门牌号。这条街一共…

Powercli批量修改分布式交换机端口组

背景 需求&#xff1a; 批量修改虚拟机的分布式端口组 解决方式一&#xff1a; 三条命令解决&#xff1a;先获取目标虚拟机、获取目标端口组、修改虚拟机端口组、检查虚拟机状态。 $vm Get-VM -Name <虚拟机名称> $portGroup Get-VirtualPortGroup -Name <端口…

视频编码转换技巧:视频批量转码H264转H265,高效且顺畅

随着数字媒体的广泛应用&#xff0c;视频编码转换已成为一种普遍的需求。不同的视频格式和编码标准使得在不同设备上播放视频成为可能&#xff0c;同时也带来了兼容性和传输效率的问题。本文讲解引用云炫AI智剪使视频编码转换技巧&#xff0c;即批量将H264编码转换为H265编码&a…

【CIO人物展】黄淮学院副CIO周鹏:构建数智化平台赋能学校高质量发展

周鹏 本文由黄淮学院副CIO周鹏投递并参与《2023中国数智化转型升级优秀CIO》榜单/奖项评选。丨推荐企业—锐捷网络 大数据产业创新服务媒体 ——聚焦数据 改变商业 黄淮学院是2004年经教育部批准成立的一所省属全日制普通本科高校。学校位于素有“豫州之腹地、天下之最中”之美…

关于ROS的网络通讯方式TCP/UDP

一、TCP与UDP TCP/IP协议族为传输层指明了两个协议&#xff1a;TCP和UDP&#xff0c;它们都是作为应同程序和网络操作的中介物。 **TCP&#xff08;Transmission Control Protocol&#xff09;协议全称是传输控制协议&#xff0c;是一种面向连接的、可靠的、基于字节流的传输…

蓝鹏测控测宽仪系列又添一员大将——双目测宽仪

轧钢过程中钢板的宽度是一个重要的参数&#xff0c;它直接决定了成材率。同时&#xff0c;随着高新科技越来越广泛的应用到工程实际中&#xff0c;许多控制系统需要钢板实时宽度值作为模型参数。 当前&#xff0c;相当一部分宽厚板厂还在采用人工检测的方法&#xff0c;检测环境…

LangChain+LLM实战---ChatGPT的工作原理

一个词一个词的输出 ChatGPT能够自动生成类似于人类书写的文本&#xff0c;这是非常了不起和出乎意料的。但它是如何做到的&#xff1f;为什么会有效果呢&#xff1f;我的目的在于大致概述ChatGPT内部发生了什么&#xff0c;然后探讨它为什么能够很好地生成我们认为有意义的文…

CSS必学:元素之间的空白与行内块的幽灵空白问题

作者:WangMin 格言:努力做好自己喜欢的每一件事 CSDN原创文章 博客地址 &#x1f449; WangMin 我们在开发的过程中&#xff0c;难免会出现一些难以预料的问题。那么其中&#xff0c;CSS空白现象就是非常常见的问题之一。虽然它已经被发现很久&#xff0c;但仍然有许多新手和经…

RabbitMQ 消息应答与发布

目录 一、消息应答 1、自动应答&#xff08;默认&#xff09; 2、手动消息应答的方法 ​编辑 3、消息重新入队 4、手动应答案列与效果演示 二、RabbitMQ持久化 1、队列持久化 2、消息持久化 三、不公平分发&#xff08;能者多劳&#xff0c;弱者少劳&#xff09; 1、…

Trajectory-guided Control Prediction for End-to-end Autonomous Driving论文学习

1. 解决了什么问题&#xff1f; 端到端自动驾驶方法直接将原始传感器数据映射为规划轨迹或控制信号&#xff0c;范式非常简洁&#xff0c;从理论上避免了多模块设计的错误叠加问题和繁琐的人为规则设计。当前的端到端自动驾驶方法主要有两条独立的研究路线&#xff0c;要么基于…

【leetcode】88. 合并两个有序数组(图解)

目录 1. 思路&#xff08;图解&#xff09;2. 代码 题目链接&#xff1a;leetcode 88. 合并两个有序数组 题目描述&#xff1a; 1. 思路&#xff08;图解&#xff09; 思路一&#xff1a;&#xff08;不满足题目要求&#xff09; 1. 创建一个大小为nums1和nums2长度之和的…