flink入门程序(一)

Flink中提供了3个组件,包括DataSource、Transformation和DataSink
DataSource:表示数据源组件,主要用来接收数据,目前官网提
供了readTextFile、socketTextStream、fromCollection以及一些第三方的Source。
Transformation:表示算子,主要用来对数据进行处理,比如Map、FlatMap、Filter、Reduce、Aggregation等。
DataSink:表示输出组件,主要用来把计算的结果输出到其他存
储介质中,比如writeAsText以及Kafka、Redis、Elasticsearch
等第三方Sink组件。
因此,想要组装一个Flink Job,至少需要这3个组件。
以下来看一个flink的入门程序
首先项目里面引入依赖

        <!--flink集成-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.13.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.13.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.13.1</version>
        </dependency>

在这里我使用的是1.13.1版本
开发 Flink 程序有固定的流程
(1)获得 一个执行环境
(2)加载/创建初始化数据
(3)指定操作数据的 Transaction算子
(4)指定计算好的数据的存放位置
(5)调用 execute()触发执行程序

入门案例:flink从文件中读取数据,并统计word的个数,具体代码如下

    public static void main(String[] args) throws Exception{
        //获得执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //加载或创建具体数据源
        DataStream<Tuple2<String, Integer>> dataStream =
                 env.readTextFile("C:\\d_disk\\zq_project\\interesting\\src\\main\\resources\\22.txt")
                .flatMap(new Splitter())
                .keyBy(value -> value.f0) //按照元组里面的第一个元素分组
                //.window(TumblingProcessingTimeWindows.of(Time.seconds(5))) //时间窗口
                .sum(1);//按照元组里面的第二个元素求和
        dataStream.print();
        env.execute("wordCount");
    }

    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            //根据空格切割行文本
            for (String word: sentence.split(" ")) {
                //把每个切割后的word放到一个二维元组里面,并计数为1
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }

txt文本内容如下

apple iphone
orange text
apple orange
dog dog cat pig

控制台打印效果如下

1> (cat,1)
5> (orange,1)
7> (apple,1)
2> (text,1)
3> (dog,1)
3> (dog,2)
3> (pig,1)
5> (orange,2)
6> (iphone,1)
7> (apple,2)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/569194.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

如何提取单片机片内程序的值进行拷贝?

对于许多单片机&#xff0c;其固件是由制造商保护的&#xff0c;并且未经授权的访问、拷贝或修改可能侵犯法律。我这里有一套嵌入式入门教程&#xff0c;不仅包含了详细的视频 讲解&#xff0c;项目实战。如果你渴望学习嵌入式&#xff0c;不妨点个关注&#xff0c;给个评论222…

跨部门协作中的沟通困境与平台建设策略——以软硬件研发为例

一、背景 在科技行业&#xff0c;跨部门合作的重要性不言而喻&#xff0c;然而实际工作中&#xff0c;经常会遭遇沟通不畅的现象。以软件与硬件研发部门为例&#xff0c;两者在产品研发过程中经常需要紧密协作&#xff0c;但却时常出现信息传递障碍。当你试图阐述观点时&#…

LangSmith帮助测试大模型系统

LangSmith是评估大模型能力好坏的评估工具,能够量化评估基于大模型的系统的效果。LangSmith通过记录langchain构建的大模型应用的中间过程,从而能够更好的调整提示词等中间过程做优化。想要使用LangSmith首先进入他的设置页面,https://smith.langchain.com/settings注册一个…

多商家AI智能名片商城系统(开源版)——构建高效数字化商业新生态

一、项目概述 1、项目背景 1&#xff09;起源 随着数字化时代的快速发展&#xff0c;传统名片和商城系统已经难以满足企业日益增长的需求。商家需要更高效、更智能的方式来展示自己的产品和服务&#xff0c;与消费者进行互动和交易。同时&#xff0c;开源技术的普及也为开发…

安卓玩机工具推荐----MTK芯片 简单制作线刷包 备份分区 备份基带 去除锁类 推荐工具操作解析

工具说明 在前面几期mtk芯片类玩机工具中解析过如何无官方固件从手机抽包 制作线刷包的步骤&#xff0c;类似的工具与操作有很多种。演示的只是本人片面的理解与一些步骤解析。mtk芯片机型抽包关键点在于..mt*****txt的分区地址段引导和 perloader临时分区引导。前面几期都是需…

在控制台实现贪吃蛇

在控制台实现贪吃蛇 前备知识Win32APICOORD这个结构体的声明如下&#xff1a;GetStdHandle 函数GetConsoleCursorInfo 函数SetConsoleCursorInfo 函数 SetConsoleCursorPosition 函数getAsyncKeyState 函数 控制台窗口的大小以及字符打印介绍控制台中的坐标宽字符及本地化介绍s…

多线程情况下IBMMQ报文丢失原因分析

背景 最近工作中&#xff0c;使用IBMMQ&#xff0c;重启服务时有偶发性的报文丢失情况&#xff0c;应用从队列中获取到了消息&#xff0c;但是线程停止没有处理。 分析 消息处理线程流程&#xff1a; 判断线程状态是否可用&#xff0c;如果不可用直接返回。使用MQQueue.get…

Seurat -- Introduction to scRNA-seq integration 跟随学习记录

文章目录 数据是如何转换的原始ifnb数据对象Splits object后的数据对象数据对象构建完成后的标准流程Normalization后的数据对象scale 后的数据对象 不同的样本进行整合JoinLayers干了什么 数据是如何转换的 seurat object 中assays R N A l a y e r s RNAlayers RNAlayersco…

卡尔曼滤波器(一):卡尔曼滤波器简介

观看MATLAB技术讲座笔记&#xff0c;该技术讲座视频来自bilibili账号&#xff1a;MATLAB中国。 一、什么是卡尔曼滤波器 卡尔曼滤波器是一种优化估计算法&#xff0c;是一种设计最优状态观测器的方法&#xff0c;其功能为&#xff1a; 估算只能被间接测量的变量&#xff1b;通…

​漏电继电器JHOK-ZBLφ150mm 0.03-3A 0.2-2S导轨安装JOSEF约瑟

系列型号&#xff1a; JHOK-ZBL多档切换式漏电&#xff08;剩余&#xff09;继电器&#xff08;导轨&#xff09; JHOK-ZBL1多档切换式漏电&#xff08;剩余&#xff09;继电器 JHOK-ZBL2多档切换式漏电&#xff08;剩余&#xff09;继电器 JHOK-ZBM多档切换式漏电&#xff08;…

深入理解分布式事务① ---->分布式事务基础(四大特性、五大类型、本地事务、MySQL并发事务问题、MySQL事务隔离级别命令设置)详解

目录 深入理解分布式事务① ---->分布式事务基础&#xff08;四大特性、五大类型、本地事务、MySQL并发事务问题、MySQL事务隔离级别命令设置&#xff09;详解事务的基本概念1、什么是事务&#xff1f;2、事务的四大特性2-1&#xff1a;原子性&#xff08;Atomic&#xff09…

STM32点灯大师(中断法)

一、使用CubeMX配置 新增加了RCC进行配置 二、代码 需要重写虚函数&#xff0c;给自己引用

Python打怪升级(4)

在计算机领域常常有说"合法"和"非法"指的是:是否合理&#xff0c;是否有效&#xff0c;并不是指触犯了法律。 random.randint(begin,end) 详细讲解一下这个random是指模板&#xff0c;也就是别人写好的代码直接来用&#xff0c;在Python当中&#xff0c;…

《R语言与农业数据统计分析及建模》学习——ggplot2绘图基础

一、农业科研数据可视化常用图形及用途 1、数据可视化的重要性 通过可视化&#xff0c;我们可以更直观地理解和分析数据的特征和趋势。 2、常用图表类型及其概述 散点图&#xff1a;用于展示两个变量之间的关系&#xff0c;可用于观察数据的分布、趋势和异常值。 折线图&…

网络安全之CSRFSSRF漏洞(上篇)(技术进阶)

目录 一&#xff0c;CSRF篇 二&#xff0c;认识什么是CSRF 三&#xff0c;实现CSRF攻击的前提 四&#xff0c;实战演练 【1】案例1 【2】案例2 【3】案例3 【4】案例4&#xff08;metinfo&#xff09; 一&#xff0c;CSRF篇 二&#xff0c;认识什么是CSRF CSRF&#x…

YesPMP众包平台最新项目

YesPMP一站式互联网众包平台&#xff0c;最新外包项目&#xff0c;有感兴趣的用户可进入平台参与竞标。 &#xff08;竞标后由项目方直接与服务商联系&#xff0c;双方直接对接&#xff09; 1.查看项目&#xff1a;个人技术-YesPMP平台 2.查看项目&#xff1…

【003_音频开发_基础篇_Linux进程通信(20种你了解几种?)】

003_音频开发_基础篇_Linux进程通信&#xff08;20种你了解几种&#xff1f;) 文章目录 003_音频开发_基础篇_Linux进程通信&#xff08;20种你了解几种&#xff1f;)创作背景Linux 进程通信类型fork() 函数fork() 输出 2 次fork() 输出 8 次fork() 返回值fork() 创建子进程 方…

zkVM选型要点

1. 引言 当选择ZK工具&#xff0c;来做可验证链下计算来扩容区块链时&#xff0c;需考虑&#xff1a; 1&#xff09;为何应选择zkVM&#xff1f;2&#xff09;zkVM有哪些基本功能&#xff1f;3&#xff09;哪些zkVM可提供这些基本功能&#xff1f; 2. 为何应选择zkVM&#x…

OpenCV——图像分块局部阈值二值化

目录 一、算法原理1、算法概述2、参考文献 二、代码实现三、结果展示 OpenCV——图像分块局部阈值二值化由CSDN点云侠原创&#xff0c;爬虫自重。如果你不是在点云侠的博客中看到该文章&#xff0c;那么此处便是不要脸的爬虫。 一、算法原理 1、算法概述 针对目前局部阈值二值…

消息队列 Kafka 入门篇(二) -- 安装启动与可视化工具

一、Windows 10 环境安装 1、下载与解压 首先&#xff0c;访问Apache Kafka的官方下载地址&#xff1a; https://kafka.apache.org/downloads 在本教程中&#xff0c;我们将使用kafka_2.13-2.8.1版本作为示例。下载完成后&#xff0c;解压到您的工作目录的合适位置&#xff…
最新文章