Flink多流处理之connect拼接流

Flink中的拼接流connect的使用其实非常简单,就是leftStream.connect(rightStream)的方式,但是有一点我们需要清楚,使用connect后并不是将两个流给串联起来了,而是将左流和右流建立一个联系,作为一个大的流,并且这个大的流可以使用相同的逻辑处理leftStreamrightStream,也可以使用不同的逻辑处理leftStreamrightStream.
如下图:
在这里插入图片描述

下面的演示代码也可以通过这个图结合来看,其实connect算子最主要的作用就是共享状态,如常用的广播状态.

  • 代码
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

import java.util.Arrays;

/**
 * @Author: J
 * @Version: 1.0
 * @CreateTime: 2023/8/7
 * @Description: 多流操作-流连接
 **/
public class FlinkConnect {
    public static void main(String[] args) throws Exception {
        // 构建流环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置并行度
        env.setParallelism(3);
        // 添加数据源1
        DataStreamSource<String> sourceStream1 = env.fromCollection(Arrays.asList("a", "b", "c", "d"));
        // 添加数据源2
        DataStreamSource<Double> sourceStream2 = env.fromCollection(Arrays.asList(22.2, 11.0, 6.0, 98.0, 100.0));

        // 拼接数据流
        ConnectedStreams<String, Double> connectedStream = sourceStream1.connect(sourceStream2);

        // 这里使用map算子作为演示
        SingleOutputStreamOperator<String> resultStream = connectedStream.map(new CoMapFunction<String, Double, String>() {
            /**
             * map1作为左流
             **/
            @Override
            public String map1(String value) throws Exception {
                return "字符串: " + value;
            }

            /**
             * map2作为右流
             **/
            @Override
            public String map2(Double value) throws Exception {
                return "数字: " + (value * 100);
            }
        });

        // 打印结果
        resultStream.print();

        env.execute("Connect Operator");
    }
}

  • 结果
3> 字符串: b
1> 数字: 600.0
2> 字符串: a
3> 数字: 1100.0
2> 数字: 2220.0
2> 字符串: d
2> 数字: 9800.0
3> 数字: 10000.0
1> 字符串: c

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/65222.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

学习pytorch

学习pytorch 1. 环境安装配置镜像源conda命令记录遇到的问题1. torch.cuda.is_available() False 1. 环境安装 B站小土堆视频 配置镜像源 conda config --show channels conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/mainhttp://www.m…

canvas实现代码雨

学习抖音&#xff1a; 渡一前端必修课 效果图&#xff1a; 全部代码&#xff1a; <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta http-equiv"X-UA-Compatible" content"IEedge">&…

idea使用protobuf

本文参考&#xff1a;https://blog.csdn.net/m0_37695902/article/details/129438549 再次感谢分享 什么是 protobuf &#xff1f; Protocal Buffers(简称protobuf)是谷歌的一项技术&#xff0c;用于结构化的数据序列化、反序列化。 由于protobuf是跨语言的&#xff0c;所以用…

【Linux命令行与Shell脚本编程】第十六章 Shell函数

Linux命令行与Shell脚本编程 第一章 文章目录 Linux命令行与Shell脚本编程六.函数6.1.脚本函数基础6.1.1.创建函数6.1.2.使用函数 6.2.函数返回值6.2.1.默认的退出状态码6.2.2.使用return命令6.2.3.使用函数输出 6.3.函数中使用变量6.3.1.向函数传递参数6.3.2.在函数中处理变量…

Spring 是如何解决循环依赖问题的?

项目场景&#xff1a; 提示&#xff1a;这里简述项目相关背景&#xff1a; 例如&#xff1a;项目场景&#xff1a;示例:通过蓝牙芯片(HC-05)与手机 APP 通信&#xff0c;每隔 5s 传输一批传感器数据(不是很大) 问题描述 我们都知道&#xff0c;如果在代码中&#xff0c;将两个…

机器学习深度学习——循环神经网络RNN

&#x1f468;‍&#x1f393;作者简介&#xff1a;一位即将上大四&#xff0c;正专攻机器学习的保研er &#x1f30c;上期文章&#xff1a;机器学习&&深度学习—语言模型和数据集 &#x1f4da;订阅专栏&#xff1a;机器学习&&深度学习 希望文章对你们有所帮助…

c++ 运算符重载

为什么要有运算符重载&#xff1f; 观察下列代码&#xff0c;当我们要比较两个日期类(自定义类型)的大小的时候&#xff0c;我们没法使用编译器自带的小于<符号来比较&#xff0c;就像这样的形式&#xff1a;d1 < d2 我们需要自己写一个函数来进行比较&#xff0c;这是很…

YOLOv5源码中的参数超详细解析(2)— 配置文件yolov5s.yaml

前言&#xff1a;Hello大家好&#xff0c;我是小哥谈。YOLOv5配置了5种不同大小的网络模型&#xff0c;分别是YOLOv5n、YOLOv5s、YOLOv5m、YOLOv5l、YOLOv5x&#xff0c;其中YOLOv5n是网络深度和宽度最小但检测速度最快的模型&#xff0c;其他4种模型都是在YOLOv5n的基础上不断…

深度补全算法-CompletionFormer-已开源效果最好

《CompletionFormer: Depth Completion with Convolutions and Vision Transformers 》 摘要 给定稀疏深度和相应的 RGB 图像&#xff0c;深度补全旨在整个图像中空间传播稀疏测量值&#xff0c;以获得密集的深度预测。尽管基于深度学习的深度补全方法取得了巨大进步&#xff0…

分清性能测试,负载测试,压力测试这三个的区别

做测试一年多来&#xff0c;虽然平时的工作都能很好的完成&#xff0c;但最近突然发现自己在关于测试的整体知识体系上面的了解很是欠缺&#xff0c;所以&#xff0c;在工作之余也做了一些测试方面的知识的补充。不足之处&#xff0c;还请大家多多交流&#xff0c;互相学习。 …

从 GPT4All 体验 LLM

推荐&#xff1a;使用 NSDT场景编辑器 助你快速搭建可编辑的3D应用场景 什么是 GPT4All&#xff1f; 术语“GPT”源自 Radford 等人 2018 年论文的标题“通过生成预训练提高语言理解”。本文描述了如何证明变压器模型能够理解人类语言。 从那时起&#xff0c;许多人尝试使用转…

UNIX 入门

与 UNIX 建立连接启动会话登录命令提示符修改口令退出系统 简单的 UNIX 命令命令格式ls 命令who 命令虚拟终端 tty伪终端 ptywho am i 命令 cal 命令help 命令man 命令 shell 概述shell 命令更换 shell临时更改 shell永久更改 shell 登录过程 与 UNIX 建立连接 启动会话 要启…

爬虫010_列表高级_添加_append_extend_修改_查询_in_not int_删除_del_pop_remove---python工作笔记029

然后再来看列表操作 首先添加append方法 然后插入,坐标是要插入的下标,右边是插入的内容 看结果 1,2,3,4,5,6 然后这个extend,是逐个插入,放到后边 然后是修改,直接对下标赋值 看结果</

Matlab之利用MarkerFaceColor来填充marker

matlab画图在加一些marker的时候, 有实心的圆圈, 比如: plot(x,y,.r,MarkerSize,20)但是如果想要一个很大的marker, 就需要把这个markersize调得很大, 比如MarkerSize20 但是也可以用空心的圆圈然后把中间涂上颜色, 这样调整起来更方便. 比如: plot(x,y,or,MarkerSize,5,Mar…

【LeetCode每日一题】——219.存在重复元素II

文章目录 一【题目类别】二【题目难度】三【题目编号】四【题目描述】五【题目示例】六【题目提示】七【解题思路】八【时间频度】九【代码实现】十【提交结果】 一【题目类别】 哈希表 二【题目难度】 简单 三【题目编号】 219.存在重复元素II 四【题目描述】 给你一个…

MySQL多实例下安装不同的版本

MySQL多版本安装 主要步骤&#xff1a; 1. 在/etc/my.cnf 配置中&#xff0c;更改对应配置。相对于同一版本多实例需要配置的参数&#xff0c;不同版本多实例需要多配置basedir参数&#xff0c;指向mysql的解压目录。 2. 初始化数据目录。进入对应解压的MySQL目录&#xff…

如何使用Kafka构建事件驱动的架构

事件驱动的架构(EDA)是一种软件设计模式&#xff0c;它关注事件的生成、检测和使用&#xff0c;以支持高效和可扩展的系统。在EDA中&#xff0c;事件是组件之间通信的主要手段&#xff0c;允许它们实时交互和响应更改。这种架构促进了松散耦合、可扩展性和响应性&#xff0c;使…

【JAVA】有关时间的操作在编程中如何实现?

个人主页&#xff1a;【&#x1f60a;个人主页】 系列专栏&#xff1a;【❤️初识JAVA】 文章目录 前言Date 类Date 类方法Data的缺陷实例获取当前日期时间日期比较java中设置date数据的显示格式 前言 在许多应用程序中&#xff0c;日期和时间的处理是必不可少的。Java提供了一…

C语言数组第十课---------------三子棋-------数组经典练手题

作者前言 &#x1f382; ✨✨✨✨✨✨&#x1f367;&#x1f367;&#x1f367;&#x1f367;&#x1f367;&#x1f367;&#x1f367;&#x1f382; &#x1f382; 作者介绍&#xff1a; &#x1f382;&#x1f382; &#x1f382;…

【数据结构】带你图文结合深入栈和队列,并具体分步实现

君兮_的个人主页 勤时当勉励 岁月不待人 C/C 游戏开发 Hello,米娜桑们&#xff0c;这里是君兮_&#xff0c;我们继续来学习初阶数据结构的内容&#xff0c;今天我们要讲的是栈与队列部分的内容&#xff0c;这篇博客先讲栈&#xff0c;队列我们放到下次再讲 好了&#xff0c;废…
最新文章