flink1.17 eventWindow不要配置processTrigger

理论上可以eventtime processtime混用,但是下面代码测试发现bug,输入一条数据会一直输出.

flink github无法提bug/问题. apache jira账户新建后竟然flink又需要一个账户,放弃

bug复现操作

idea运行代码后 往source kafka发送一条数据  

a,1,1690304400000

可以看到无限输出:

理论上时间语义不建议混用,但是在rich函数中的确可以做到混用且正常使用

问题复现代码

package com.yy.flinkWindowAndTrigger

import com.yy.flinkWindow.M1
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.configuration.{Configuration, RestOptions}
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.time.Time.seconds
import org.apache.flink.streaming.api.windowing.triggers.{ContinuousProcessingTimeTrigger, CountTrigger, ProcessingTimeTrigger}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.joda.time.Seconds


object flinkEventWindowAndProcessTriggerBUGLearn {
  def main(args: Array[String]): Unit = {


    // flink 启动本地webui
    val conf = new Configuration
    conf.setInteger(RestOptions.PORT, 28080)

    //    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
    //    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.configure(conf)


    /*
    kafka输入:
        a,1,1690304400000        //对应 2023-07-26 01:00:00 (无限输出)       //如果传入 a,1,1693037756000 对应:2023-08-26 16:15:56 (1条/s)
        a,1,7200000               // 1970-01-1 10:00:00
     */
    val brokers = "172.18.105.147:9092"
    val source = KafkaSource.builder[String].setBootstrapServers(brokers)
      .setTopics("t1")
      .setGroupId("my-group-23asdf46")
      .setStartingOffsets(OffsetsInitializer.latest())
      // .setDeserializer() // 参数: KafkaRecordDeserializationSchema
      .setDeserializer(new M1())
      .build()


    val ds1 = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")


    val s1 = ds1
      .map(_.split(","))
      .map(x => C1(x(0), x(1).toInt, x(2).toLong)) // key number 时间戳
      .assignTimestampsAndWatermarks(new OTAWatermarks(Time.seconds(0)))
      .keyBy(_.f1)
      .window(TumblingEventTimeWindows.of(seconds(10)))
      .trigger(ContinuousProcessingTimeTrigger.of[TimeWindow](seconds(10L)))
      .reduce((x, y) => C1(x.f1, x.f2 + y.f2, 100L))


    s1.print()


    env.execute("KafkaNewSourceAPi")
  }

  // 乱序流
  class OTAWatermarks(time: Time) extends BoundedOutOfOrdernessTimestampExtractor[C1](time) {
    override def extractTimestamp(element: C1): Long = {
      element.f3
    }
  }


  // key num timestamp
  case class C1(f1: String, f2: Int, f3: Long)
}

-

-

maven pom

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>FlinkLocalDemo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>FlinkLocalDemo</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.17.1</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
        <scala.version>2.12.8</scala.version>
    </properties>



    <dependencies>
        <!-- https://mvnrepository.com/artifact/joda-time/joda-time -->
        <dependency>
            <groupId>joda-time</groupId>
            <artifactId>joda-time</artifactId>
            <version>2.12.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-avro</artifactId>
            <version>${flink.version}</version>
        </dependency>


        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/com.alibaba.fastjson2/fastjson2 -->
        <dependency>
            <groupId>com.alibaba.fastjson2</groupId>
            <artifactId>fastjson2</artifactId>
            <version>2.0.33</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
<!--            <version>1.2.17</version>-->
        </dependency>


        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-common -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- 引入flink1.13.0 scala2.12.12   -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Either... -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- or... -->

<!--        下面几个是代码中写sql需要的包 四个中一个都不能少 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner-loader -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-loader</artifactId>
            <version>${flink.version}</version>
<!--            <scope>test</scope>-->
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-runtime -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <!--  https://mvnrepository.com/artifact/org.apache.flink/flink-connector-files -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
        </dependency>


<!--        注意: flink-table-planner-loader 不能和 flink-table-planner_${scala.binary.version} 共存-->
        <!--        <dependency>-->
        <!--            <groupId>org.apache.flink</groupId>-->
        <!--            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>-->
        <!--            <version>${flink.version}</version>-->
        <!--            <scope>provided</scope>-->
        <!--        </dependency>-->


        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc</artifactId>
            <version>3.1.0-1.17</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.11</version>
        </dependency>


    </dependencies>
    <build>
            <plugins>
                <!-- 打jar插件 -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <version>2.4.3</version>
                    <executions>
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>shade</goal>
                            </goals>
                            <configuration>
                                <filters>
                                    <filter>
                                        <artifact>*:*</artifact>
                                        <excludes>
                                            <exclude>META-INF/*.SF</exclude>
                                            <exclude>META-INF/*.DSA</exclude>
                                            <exclude>META-INF/*.RSA</exclude>
                                        </excludes>
                                    </filter>
                                </filters>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
                <plugin>
                    <groupId>org.scala-tools</groupId>
                    <artifactId>maven-scala-plugin</artifactId>
                    <version>2.15.2</version>
                    <executions>
                        <execution>
                            <goals>
                                <goal>compile</goal>
                                <goal>testCompile</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <scalaVersion>${scala.version}</scalaVersion>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.5.5</version>
                <configuration>
                    <!--这部分可有可无,加上的话则直接生成可运行jar包-->
                    <!--<archive>-->
                    <!--<manifest>-->
                    <!--<mainClass>${exec.mainClass}</mainClass>-->
                    <!--</manifest>-->
                    <!--</archive>-->
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.1</version>
                    <configuration>
                        <source>11</source>
                        <target>11</target>
                    </configuration>
                </plugin>
        </plugins>
    </build>
</project>















 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/60562.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

etcd

文章目录 etcd单机安装设置键值对watch操作读取键过往版本的值压缩修订版本lease租约&#xff08;过期机制&#xff09;授予租约撤销租约keepAlive续约获取租约信息 事务基于etcd实现分布式锁原生实现官方 concurrency 包实现 服务注册与发现Go 操作 Etcd 参考 etcd etcd 是一…

计算机网络-三种交换方式

计算机网络-三种交换方式 电路交换(Circuit Switching) 电话交换机接通电话线的方式称为电路交换从通信资源分配的角度来看&#xff0c;交换(Switching)就是按照某种方式动态的分配传输线路的资源 电话交换机 为了解决电话之间通信两两之间连线过多&#xff0c;所以产生了电话…

原型模式(Prototype)

原型模式是一种创建型设计模式&#xff0c;使调用方能够复制已有对象&#xff0c;而又无需使代码依赖它们所属的类。当有一个类的实例&#xff08;原型&#xff09;&#xff0c;并且想通过复制原型来创建新对象时&#xff0c;通常会使用原型模式。 The Prototype pattern is g…

解决Pycharm下opencv代码提示问题

解决Pycharm下opencv代码提示问题 新安装了opencv-python4.7.0.72版本&#xff0c;在pycharm中导入cv2后没有代码提示&#xff0c;解决方法如下&#xff1a; 解决方案 我使用的是miniconda环境&#xff0c;opencv安装的路径为&#xff1a;C:\Env\miniconda3\envs\compute\Li…

Vue3和TypeScript项目-移动端兼容

1 全局安装typescript 2 检测安装成功 3 写的是ts代码&#xff0c;但是最后一定要变成js代码&#xff0c;才能在浏览器使用 这样就会多一个js文件 3 ts语法 数组语法 对象语法 安装vue3项目 成功后进入app。安装依赖。因为我们用的是脚手架&#xff0c;要引入东西的时候不需要…

python的下载和安装步骤,python下载安装教程3.10.0

大家好&#xff0c;给大家分享一下python下载安装教程3.10.0&#xff0c;很多人还不知道这一点。下面详细解释一下。现在让我们来看看&#xff01; 第一步&#xff1a;下载Python安装包 在Python的官网 www.python.org 中找到最新版本的Python安装包&#xff0c;点击进行下载&a…

【Java】UWB高精度工业人员安全定位系统源码

基于VueSpring boot前后端分离架构开发的一套UWB技术高精度定位系统源码。 UWB高精度人员定位系统提供实时定位、电子围栏、轨迹回放等基础功能以及各种拓展功能,用户可根据实际需要任意选择搭配拓展功能。该系统简易部署&#xff0c;方便使用&#xff0c;实时响应。UWB高精度定…

【小沐学前端】GitBook制作在线电子书、技术文档(gitbook + Markdown + node)

文章目录 1、简介1.1 工具简介1.2 使用费用 2、安装2.1 安装node2.2 安装gitbook 3、测试3.1 编辑文档3.2 编译工程3.3 预览工程 结语 1、简介 官网地址&#xff1a; https://www.gitbook.com/1.1 工具简介 什么是 GitBook&#xff1f; GitBook 是一个现代文档平台&#xff…

黑客技术(网络安全)自学

一、黑客是什么 原是指热心于计算机技术&#xff0c;水平高超的电脑专家&#xff0c;尤其是程序设计人员。但后来&#xff0c;黑客一词已被用于泛指那些专门利用电脑网络搞破坏或者恶作剧的家伙。 二、学习黑客技术的原因 其实&#xff0c;网络信息空间安全已经成为海陆空之…

Stable Diffusion教程(6) - 图片高清放大

放大后细节 修复图片损坏 显存占用 速度 批量放大 文生图放大 好 是 高 慢 否 附加功能放大 一般 否 中 快 是 图生图放大 好 是 低 慢 是 tile模型放大 非常好 是 高 快 是 通过文生图页面的高清修复 优点&#xff1a;放大时能添加更多细节&am…

DAY3,C高级(shell中的变量、数组、算术运算、分支结构)

1.今日思维导图&#xff1b; 2.判断家目录下&#xff0c;普通文件的个数和目录文件的个数&#xff1b; 1 #!/bin/bash2 arr1(ls -la ~/ | cut -d r -f 1 | grep -w -)3 arr2(ls -la ~/ | cut -d r -f 1 | grep -w d)4 echo "普通文件个数&#xff1a;${#arr1[*]}"5 e…

Qt Creator中designer使用QWebEngine异常排查

Qt Creator中designer使用QWebEngine异常排查 1、前提背景 最近由于版权的原因&#xff0c;我们采取了自编译的Qt Creator。编译完成之后启动Qt Creator刚开始一切都是很顺利。 但是在Creator中打开designer&#xff0c;使用QWebEngine控件就发生了异常&#xff0c;Qt Creat…

单通道 6GSPS 16位采样DAC子卡模块--【资料下载】

FMC147是一款单通道6.4GSPS&#xff08;或者配置成2通道3.2GSPS&#xff09;采样率的12位AD采集、单通道6GSPS&#xff08;或配置成2通道3GSPS&#xff09;采样率16位DA输出子卡模块&#xff0c;该板卡为FMC标准&#xff0c;符合VITA57.4规范&#xff0c;该模块可以作为一个理想…

python数据容器

目录 数据容器 反向索引 list列表 语法 案例 列表的特点 列表的下表索引 list的常用操作 list列表的遍历 while循环遍历 for循环遍历 tuple元组 前言 元组定义 元组特点 获取元组元素 元组的相关操作 元组的遍历 while循环遍历 for循环遍历 字符串 前言…

MySQL事务篇:ACID原则、事务隔离级别及事务机制原理剖析

引言 众所周知&#xff0c;MySQL数据库的核心功能就是存储数据&#xff0c;通常是整个业务系统中最重要的一层&#xff0c;可谓是整个系统的“大本营”&#xff0c;因此只要MySQL存在些许隐患问题&#xff0c;对于整个系统而言都是致命的。那此刻不妨思考一个问题&#xff1a; …

HTML5+CSS3小实例:带标题的3D多米诺人物卡片

实例:带标题的3D多米诺人物卡片 技术栈:HTML+CSS 效果: 源码: 【html】 <!DOCTYPE html> <html><head><meta http-equiv="content-type" content="text/html; charset=utf-8"><meta name="viewport" content…

Unity 引擎做残影效果——2、屏幕后处理方式

Unity实现残影效果 大家好&#xff0c;我是阿赵。 这里继续介绍Unity里面做残影的方法。之前介绍了BakeMesh的方法做残影&#xff0c;这一期介绍的是用屏幕后处理的方法做残影。 一、原理 之前的BakeMesh方法&#xff0c;是真的生成了很多个网格模型在场景里面。如果用后处理做…

前端如何打开钉钉(如何唤起注册表中路径与软件路径不关联的软件)

在前端唤起本地应用时&#xff0c;我查询了资料&#xff0c;在注册表中找到腾讯视频会议的注册表情况&#xff0c;如下&#xff1a; 在前端代码中加入 window.location.href"wemeet:"; 就可以直接唤起腾讯视频会议&#xff0c;但是我无法唤起钉钉 之所以会这样&…

使用手机相机检测电脑屏幕刷新率Hz

使用手机相机检测电脑屏幕刷新率Hz 1、电脑打开https://www.testufo.com/frameskipping 2、相机专业模式&#xff1a;快门1/10、ISO自动&#xff0c;拍摄一张照片。120Hz至少要有12个亮块&#xff0c;50Hz至少有6个亮块。 更改刷新速率 1、选择 “开始>设置>系统>显示…

Nodejs 第八章(npm搭建私服)

构建npm私服 构建私服有什么收益吗&#xff1f; 可以离线使用&#xff0c;你可以将npm私服部署到内网集群&#xff0c;这样离线也可以访问私有的包。提高包的安全性&#xff0c;使用私有的npm仓库可以更好的管理你的包&#xff0c;避免在使用公共的npm包的时候出现漏洞。提高…