Flink多流处理之coGroup(协同分组)

这篇文章主要介绍协同分组coGroup的使用,先讲解API代码模板,后面会结图解介绍coGroup是如何将流中数据进行分组的.

1 API介绍

  • 数据源
    # 左流数据
    ➜  ~ nc -lk 6666
    101,Tom
    102,小明
    103,小黑
    104,张强
    105,Ken
    106,GG小日子
    107,小花
    108,赵宣艺
    109,明亮
    
    
    # 右流数据
    ➜  ~ nc -lk 7777
    101,,本科,程序员
    102,,本科,程序员
    103,,本科,会计
    104,,大专,安全工程师
    105,,硕士,律师
    106,未知,小本,挖粪使者
    108,,本科,人事
    110,,本科,算法工程师
    
    
  • 代码
    import org.apache.flink.api.common.functions.CoGroupFunction;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple4;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.util.Collector;
    
    /**
     * @Author: J
     * @Version: 1.0
     * @CreateTime: 2023/8/10
     * @Description: 协同分组
     **/
    public class FlinkCoGroup {
        public static void main(String[] args) throws Exception {
            // 构建流环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // 设置并行度
            env.setParallelism(2);
            // 数据源1(socket数据源),为了方便测试,根据实际情况自行选择
            DataStreamSource<String> sourceStream1 = env.socketTextStream("localhost", 6666);
            // 将数据进行切分返回Tuple2(id,name)
            SingleOutputStreamOperator<Tuple2<String, String>> mapStream1 = sourceStream1.map(value -> {
                String[] split = value.split(",");
                return Tuple2.of(split[0], split[1]);
            }).returns(new TypeHint<Tuple2<String, String>>() {
            });
            // 数据源2(socket数据源),为了方便测试,根据实际情况自行选择
            DataStreamSource<String> sourceStream2 = env.socketTextStream("localhost", 7777);
            // 将数据进行切分返回Tuple4(id,gender,education,job)
            SingleOutputStreamOperator<Tuple4<String, String, String, String>> mapStream2 = sourceStream2.map(value -> {
                String[] split = value.split(",");
                return Tuple4.of(split[0], split[1], split[2], split[3]);
            }).returns(new TypeHint<Tuple4<String, String, String, String>>() {});
            // 数据流协同
            DataStream<Tuple4<String, String, String, String>> coGrouped = mapStream1.coGroup(mapStream2)
                    .where(tup -> tup.f0) // 左流协同分组字段(mapStream1)
                    .equalTo(tup -> tup.f0) // 右流协同分组字段(mapStream2)
                    .window(TumblingProcessingTimeWindows.of(Time.seconds(20))) // 开窗口,以处理时间划分(每20秒一个窗口)
                    .apply(new CoGroupFunction<Tuple2<String, String>, Tuple4<String, String, String, String>, Tuple4<String, String, String, String>>() {
                        @Override
                        public void coGroup(Iterable<Tuple2<String, String>> first, Iterable<Tuple4<String, String, String, String>> second, Collector<Tuple4<String, String, String, String>> out) throws Exception {
                            /**
                             *first 代表左流的迭代器
                             * second 代表右流的迭代器
                             * out 则是返回的数据形式
                             * 具体方法中两个迭代器存数据的原理后续会通过图结合进行解析
                             **/
                            // 这里的逻辑模拟sql中left join
                            // 遍历左流数据(first)
                            for (Tuple2<String, String> left : first) {
                                // 定义右流是否为NULL判断标识
                                boolean flag = false;
                                // 遍历右流数据(second)
                                for (Tuple4<String, String, String, String> right : second) {
                                    // 返回left(id, name) + right(gender, education)
                                    Tuple4<String, String, String, String> tup4 = Tuple4.of(left.f0, left.f1, right.f1, right.f2);
                                    // 输出
                                    out.collect(tup4);
                                    // 修改判断标识
                                    flag = true;
                                }
                                // 如果右流为NULL,则输出左流的数据
                                if (!flag) {
                                    // 这里用字符串"NULL"代替null值,方便观察
                                    Tuple4<String, String, String, String> tup4 = Tuple4.of(left.f0, left.f1, "NULL", "NULL");
                                    // 输出
                                    out.collect(tup4);
                                }
                            }
                        }
                    });
            // 打印结果
            coGrouped.print();
    
            env.execute("Flink CoGroup");
    
        }
    }
    
  • 结果
    2> (102,小明,男,本科)
    1> (106,GG小日子,未知,小本)
    2> (109,明亮,NULL,NULL)
    1> (107,小花,NULL,NULL)
    2> (105,Ken,男,硕士)
    2> (103,小黑,女,本科)
    2> (101,Tom,男,本科)
    2> (108,赵宣艺,女,本科)
    2> (104,张强,男,大专)
    
    从数据源和结果数据可以看到和代码逻辑是完全吻合的.

2 原理解析

我这我们先看一下图解,如下

在这里插入图片描述

  • 无界转有界
    在代码中我们开启window,这也是使用coGroup的必要条件,开启window后实际上就是将我们原本的无界数据流转变成一个以20S为界限的有界数据流.
  • 迭代器分组
    将数据进入到窗口内后,就会根据经我们前面设定的条件也就是.where.equalTo中的内容将mapStream1mapStream2中的数据根据key进行分组存储到不同的iterator中.
  • 逻辑计算
    上面已经将数据根据key都存储到iterator中了,这里就会根据我们在new CoGroupFunction<...>(){...}中的写的逻辑将mapStream1mapStream2中具有相同keyiterator进行计算.
  • 输出
    当一个window结束后,就会将数据按照计算后的结果(在代码中就是Tuple4<String, String, String, String>)输出到下游.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/72950.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【C与C++的相互调用方法】

C与C的相互调用方法 C与C为什么相互调用的方式不同C中调用CC中调用C致谢 C与C为什么相互调用的方式不同 C 和 C 之间的相互调用方式存在区别&#xff0c;主要是由于 C 和 C 语言本身的设计和特性不同。 函数调用和参数传递方式不同&#xff1a;C 和 C 在函数调用和参数传递方面…

docker — 容器网络

一、概述 Docker容器每次重启后容器ip是会发生变化的。 这也意味着如果容器间使用ip地址来进行通信的话&#xff0c;一旦有容器重启&#xff0c;重启的容器将不再能被访问到。 而Docker 网络就能够解决这个问题。 Docker 网络主要有以下两个作用&#xff1a; 容器间的互联…

docker部署springboot

基础知识 什么是docker 官网&#xff1a; Docker Docs: How to build, share, and run applications | Docker Documentation Docker 是一个基于go语言开发的开源的应用容器引擎&#xff0c;让开发者可以打包他们的应用以及依赖包到一个可移植的容器中&#xff0c;然后发布到…

97. Interleaving String 72. Edit Distance 121. 122. 123

​​​​​​97. Interleaving String 72. Edit Distance 一个bottomup&#xff08;棋盘从右下角外围逼近[0,0]&#xff09;如果横轴是string1的index i&#xff0c;纵轴string2的index j&#xff0c;那么&#xff0c;很奇妙的是i和j一起&#xff08;从右下角的格子看&#xf…

11.Eclipse 注释模板的说明及设置

1.在eclipse中点击Window——>java——>Code Style——>CodeTemplates——>Comments 2.常用Variable 3. 我的注释模板 ①Files 文件 /** * Title: ${file_name}* Description: ${todo}* author Jeremy* date ${currentDate:date(yyyy-MM-dd hh:mm:ss)} */ ②Typ…

Kotlin入门:变量和函数——02

目录 一、Kotlin 基本数据类型 ​编辑 二、变量 val 关键字&#xff1a; var 关键字: 类型推断: 可空类型: 三、函数 基本函数语法&#xff1a; 单表达式函数&#xff1a; 默认参数值&#xff1a; 命名参数&#xff1a; 一、Kotlin 基本数据类型 Kotlin 的基本数…

树结构--介绍--二叉树遍历的递归实现

目录 树 树的学术名词 树的种类 二叉树的遍历 算法实现 遍历命名 二叉树的中序遍历 二叉树的后序遍历 二叉树的后序遍历迭代算法 二叉树的前序遍历 二叉树的前序遍历迭代算法 树 树是一种非线性的数据结构&#xff0c;它是由n(n≥0)个有限节点组成一个具有层次关系…

中电金信:ChatGPT一夜爆火,知识图谱何以应战?

随着ChatGPT的爆火出圈 人工智能再次迎来发展小高潮 那么作为此前搜索领域的主流技术 知识图谱前路又将如何呢&#xff1f; 事实上&#xff0c;ChatGPT也并非“万能”&#xff0c;作为黑箱模型&#xff0c;ChatGPT很难验证生成的知识是否准确。并且ChatGPT是通过概率模型执行推…

Django入门

Day1 django环境安装 创建虚拟环境 # step1 创建虚拟环境 python3 -m venv datawhale_django # step2 mac进入虚拟环境 source ./datawhale_django/bin/activate # step3 退出虚拟环境 deactivate安装包 pip3 install django ​pip3 install djangorestframework​​ pip3 …

Jenkins自动化打包脚本

一、背景 jenkins可以设置定时任务打包&#xff0c;也已手动点按钮打包&#xff0c;还可以通过执行http请求打包&#xff0c;今天我们就通过shell脚本&#xff0c;通过curl命令进行jenkins打包。 二、步骤 2.1 在jenkins上构建项目 设置触发器 2.2 通过shell脚本触发远程构…

电商财务新时代:轻松自动对账,财务效率倍增

电商领域频繁的多平台财务对账常常令企业头痛不已。然而&#xff0c;随着轻易云数据集成平台的崭新解决方案&#xff0c;财务对账的痛点迎刃而解。本文通过引人入胜的实例&#xff0c;深入探讨电商财务对账的现状&#xff0c;突出轻易云数据集成平台在自动对账中的强大作用&…

感受RFID服装门店系统的魅力

嘿&#xff0c;亲爱的时尚追随者们&#xff01;今天小编要给你们带来一股时尚新风潮&#xff0c;让你们感受一下什么叫做“RFID服装门店系统”&#xff0c;这个超酷的东西&#xff01; 别着急&#xff0c;先别翻白眼&#xff0c;小编来解释一下RFID是什么玩意儿。它是射频识别…

RFID技术助力半导体制造行业自动化生产

由于芯片短缺问题和近2年海运拥堵和成本上升等因素&#xff0c;致使全球资本对于芯片制造工厂的投入增大&#xff0c;而中兴、华为的例子已经凸显出国产半导体供应链的重要性&#xff0c;除去地缘政治上的意义&#xff0c;发展半导体其实是中国经济的转型的必走之路。 半导体生…

Programming abstractions in C阅读笔记:p107-p110

《Programming Abstractions In C》学习第46天&#xff0c;p107-p110&#xff0c;3.1小节——“The concept of interface”&#xff0c;总结如下&#xff1a; 一、技术总结 1.client p108&#xff0c;调用library的program称为client。 2.interface p108&#xff0c;“To do …

【刷题笔记8.13】【动态规划相关】LeetCode题目:斐波那契数列、爬楼梯

【动态规划相关】LeetCode题目&#xff1a;斐波那契数列、爬楼梯 &#xff08;一&#xff09;爬楼梯 题目描述 假设你正在爬楼梯。需要 n 阶你才能到达楼顶。 每次你可以爬 1 或 2 个台阶。你有多少种不同的方法可以爬到楼顶呢&#xff1f; 提示&#xff1a; 1 < n <…

第57步 深度学习图像识别:CNN可视化(Pytorch)

基于WIN10的64位系统演示 一、写在前面 由于不少模型使用的是Pytorch&#xff0c;因此这一期补上基于Pytorch实现CNN可视化的教程和代码&#xff0c;以SqueezeNet模型为例。 二、CNN可视化实战 继续使用胸片的数据集&#xff1a;肺结核病人和健康人的胸片的识别。其中&…

CSS变形与动画(一):transform变形 与 transition过渡动画 详解(用法 + 代码 + 例子 + 效果)

文章目录 变形与动画transform 变形translate 位移scale 缩放rotate 旋转skew 倾斜多种变形设置变形中心点 transition 过渡动画多种属性变化 变形与动画 transform 变形 包括&#xff1a;位移、旋转、缩放、倾斜。 下面的方法都是transform里的&#xff0c;记得加上。 展示效…

pconsc4 安装

Pconsc4 安装遇到的问题 Pconsc4-github 按照红框给的一行命令&#xff0c;一行毁所有。 1 gcc and g not found # 1 Start by updating the packages list:sudo apt update# 2 Install the build-essential package by typing:sudo apt install build-essential## The comm…

在 Windows 中恢复数据的 5 种方法

发生数据丢失的原因有多种。无论是因为文件被意外删除、文件系统或操作系统损坏&#xff0c;还是由于软件或硬件级别的存储故障&#xff0c;数据都会在您最意想不到的时候丢失。今天我们重点介绍五种数据恢复方法&#xff0c;以应对意外情况的发生。 1.从另一台机器启动硬盘 如…