Flink学习——基本概述

目录

一、简介

1.1 flink是什么

1.2 flink主要特点

核心特性: 

分层API:

1.3 flink vs spark

1.3.1 数据处理框架

 1.3.2 数据模型

1.3.3 运行时架构

二、wordcount实例

2.1 项目依赖

2.2 添加框架支持

2.3 批处理 - DataSet API

2.4 有界流处理wordcount

2.5 无界流处理wordcount

执行API

主机192.168.136.20

配置端口

三、Flink部署

3.1 flink集群中的主要组件

1. 启动

2. 访问Web UI

3.3 集群启动

2. masters和worker

3. 分发安装目录

4. 启动集群

3.4  向集群提交作业

1. webUI提交作业

2. 命令行提交作业

3.5 部署模式

3.6 独立模式

3.7 yarn模式

 启动集群


一、简介

1.1 flink是什么

apache flink是一个框架分布式处理引擎,用于对无界和有界数据流进行状态计算。

1.2 flink主要特点

        类似于一个管道,数据处理完了之后能够及时输出,flink主要应用场景就是处理大规模的数据流。

核心特性: 

1、高吞吐、低延迟,每秒处理百万个事件,毫秒级延迟

2、 结果的准确性。flink提供了事件时间(event-time)和处理时间(processing-time)。对于乱序事件流,事件事件语义仍然能提供一致且准确的结果。

3、 精确一次(ecatly-once)的状态一致性保证

4、 可以与常用存储系统连接。

5、 高可用,支持动态扩展

分层API:

 约顶层越抽象,表达含义越简明,使用越方便

 约底层越具体,表达含义越丰富,使用越灵活

1.3 flink vs spark

1.3.1 数据处理框架

        Spark 以批处理为根本,并尝试在批处理之上支持流计算;在 Spark 的世界观中,万物皆批次,离线数据是一个大批次,而实时数据则是由一个一个无限的小批次组成的。所以对于流处理框架 Spark Streaming 而言,其实并不是真正意义上的“流”处理,而是“微批次” (micro-batching )处理。
        Flink 则认为,流处理才是最基本的操作,批处理也可以统一为流处理。在 Flink 的世界观中,万物皆流,实时数据是标准的、没有界限的流,而离线数据则是有界限的流。

 1.3.2 数据模型

        Spark采用RDD模型,spark streaming的DStream实际上也是一组组小批数据RDD的集合

        flink基本数据模型是数据流,以及事件event序列

1.3.3 运行时架构

        spark是批计算,将DAG划分为不同的stage,一个完成后才可以计算下一个

        flink是标准的流执行模式,一个事件在一个节点处理完后可以直接发往下一个节点进行处理

二、wordcount实例

2.1 项目依赖

<properties>
    <flink.version>1.13.0</flink.version>
    <target.java.version>1.8</target.java.version>
    <scala.binary.version>2.12</scala.binary.version>
</properties>

<dependencies>
    <!-- 引入 Flink 相关依赖-->
     <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>

2.2 添加框架支持

2.3 批处理 - DataSet API

package org.example.cp2

import org.apache.flink.api.scala._

object BatchWordCount {
  def main(args: Array[String]): Unit = {
    // 1. 创建一个执行环境
    val env = ExecutionEnvironment.getExecutionEnvironment

    // 2. 读取文本文件数据
    val lineDataSet: DataSet[String] = env.readTextFile("input/word.txt")

    // 3. 对数据集进行转换处理
    val wordAndOne: DataSet[(String, Int)] = lineDataSet.flatMap(_.split(" ")).map(x=>(x,1))

    // 4. 按照单词进行转换处理
    val wordAndOneGroup: GroupedDataSet[(String, Int)] = wordAndOne.groupBy(0)

    // 5. 对分组数据进行sum聚合统计
    val sum: AggregateDataSet[(String, Int)] = wordAndOneGroup.sum(1)

    // 6. 打印输出
    sum.print()
  }
}

这里需要导入 import org.apache.flink.api.scala._,否则会报下面的异常。

        需要注意的是,这种代码的实现方式是基于 DataSet API 的。也就是说,是把对数据的处理转换看作数据集来进行操作的。flink是一种流、批一体的处理架构,对于数据批量处理的时候底层也是数据流,所以没有必要使用 DataSet API 去进行特别的处理。

        所以官方推荐的用法是直接使用 DataStream API ,在提交任务的时候通过将执行模式设为 BATCH 来进行批处理。

2.4 有界流处理wordcount

        和批处理不一样的地方在于,流处理的执行环境使用StreamExecutionEnvironment批处理使用ExecutionEnvironment

package org.example.cp2

import org.apache.flink.api.scala.{AggregateDataSet, DataSet, GroupedDataSet}
import org.apache.flink.streaming.api.scala._

object BoundedStreamWordCount {
  def main(args: Array[String]): Unit = {
    // 1. 创建一个流式执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 2. 读取文本文件数据
    val lineDataStream: DataStream[String] = env.readTextFile("input/word.txt")

    // 3. 对数据集进行转换处理
    val wordAndOne = lineDataStream.flatMap(_.split(" ")).map(x=>(x,1))

    // 4. 按照单词进行转换处理
    val wordAndOneGroup = wordAndOne.keyBy(_._1)

    // 5. 对分组数据进行sum聚合统计
    val sum = wordAndOneGroup.sum(1)

    // 6. 打印输出
    sum.print()

    // 执行任务
    env.execute()
  }
}

2.5 无界流处理wordcount

执行API

package org.example.cp2

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object StreamWordCount {
  def main(args: Array[String]): Unit = {
    // 1. 创建一个执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 2. 读取文本文件数据
    val lineDataSet = env.socketTextStream("192.168.136.20",7777)

    // 3. 对数据集进行转换处理
    val wordAndOne = lineDataSet.flatMap(_.split(" ")).map(x=>(x,1))

    // 4. 按照单词进行转换处理
    val wordAndOneGroup = wordAndOne.keyBy(_._1)

    // 5. 对分组数据进行sum聚合统计
    val sum = wordAndOneGroup.sum(1)

    // 6. 打印输出
    sum.print()

    // 执行任务
    env.execute()
  }
}

         程序启动之后没有任何输出, 也不会退出。这是由于flink的流处理是事件驱动的,当前程序会一直处于监听状态。只有接收到数据才会执行任务、输出统计结果。

主机192.168.136.20

[root@Hadoop20 hadoop]# nc -lk 7777

        输出的结果和读取文件的流处理十分相似。每输入一条数据,就有一次对应的输出。这里的数字表示的是线程数,默认为CPU的核数。当输入的数据足够多的时候,从1-12所有的核数都会占据。

监听的端口不会写死, 可以将主机名和端口号配置在外面。在flink代码中有 parameterTool 工具用来读取。

配置端口

--host 192.168.136.20 --port 7777

// 固定端口
    val lineDataSet = env.socketTextStream("192.168.136.20",7777)

// 配置端口
    val parameterTool = ParameterTool.fromArgs(args)
    val hostname = parameterTool.get("host")
    val port = parameterTool.getInt("port")
    val lineDataStream = env.socketTextStream(hostname,port)

三、Flink部署

3.1 flink集群中的主要组件

        包括客户端(Client)、作业管理器(JobManager)和任务管理器(TaskManager)。代码编写完之后,由客户端获取并做转换,发送给JobManager,也就是Flink集群的管理者,对作业进行中央调度管理。对作业进行转换后,将任务分发给众多的TaskManager,由TaskManager对数据进行实际的处理。

详细流程:

在这里插入图片描述 

1. 启动

[root@Hadoop20 flink113]# ./bin/start-cluster.sh 
Starting cluster.
Starting standalonesession daemon on host Hadoop20.
Starting taskexecutor daemon on host Hadoop20.
[root@Hadoop20 flink113]# jps
18096 Jps
2948 NodeManager
17700 StandaloneSessionClusterEntrypoint
2117 NameNode
2773 ResourceManager
3445 RunJar
3319 RunJar
17975 TaskManagerRunner
2284 DataNode

2. 访问Web UI

启动成功后,访问192.168.136.20:8081,可以对flink集群和任务进行监控管理。

[root@Hadoop20 flink113]# ./bin/stop-cluster.sh 
Stopping taskexecutor daemon (pid: 17975) on host Hadoop20.
Stopping standalonesession daemon (pid: 17700) on host Hadoop20.

3.3 集群启动

        启动的命令和配置没有变化,需要对主从关系 masters 和 workers 进行配置。

[root@Hadoop20 flink113]# cd ./conf/
[root@Hadoop20 conf]# vim flink-conf.yaml

// 修改本机地址
jobmanager.rpc.address: 192.168.136.20

2. masters和worker

[root@Hadoop20 conf]# vim masters 
192.168.136.20:8081

[root@Hadoop20 conf]# vim workers
192.168.136.21
192.168.136.22

3. 分发安装目录

$ scp -r ./flink-1.13.0 root@xsqone21:/opt/module
$ scp -r ./flink-1.13.0 root@xsqone22:/opt/module

4. 启动集群

[root@Hadoop20 flink113]# ./bin/start-cluster.sh 

3.4  向集群提交作业

        IDEA中的默认的打包对scala代码打包效果不是很好,所以会引入打包插件。

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.0.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

1. webUI提交作业

2. 命令行提交作业

上传jar包

[root@Hadoop20 flink113]# ./bin/flink run -m 192.168.136.20:8081 -c org.example.cp2.StreamWordCount -p 2 ./FlinkTutorial-1.0-SNAPSHOT.jar --host 192.168.136.20 --port 7777
1. 查看当前运行的作业
[root@Hadoop20 flink113]# ./bin/flink list
Waiting for response...
No running jobs.
No scheduled jobs.

2. 查看所有的运行作业
[root@Hadoop20 flink113]# ./bin/flink list -a
Waiting for response...
No running jobs.
No scheduled jobs.

3. 取消当前作业
[root@Hadoop20 flink113]# ./bin/flink cancel [jobID]

3.5 部署模式

        flink主要有三种部署模式:会话模式、单作业模式、应用模式。

会话模式:

        先启动一个集群,保持一个会话。在这个会话中通过客户端提交作业。集群启动时所有资源都已经确定,所有提交的作业会竞争集群中的资源。

        会话模式适合单个规模小、执行时间短的大量作业。

单作业模式:

        会话模式因为资源共享会导致很多问题。为了隔离资源,为每个提交的作业启动一个集群,就是单作业模式。

        单作业模式无法直接启动,需要借助一些资源管理平台来启动集群,如yarn。

应用模式

        不管是会话模式还是单作业模式,应用代码都是在客户端执行,由客户端提交给 jobManager 。这种方式客户端需要占用大量网络带宽,用来下载依赖和向jobManager上发送数据。而往往我们提交作业用的是同一个客户端,就会加重客户端所在节点的资源消耗。

        于是应用模式直接由JobManager执行应用程序,而不是通过客户端。这意味着,我们需要为每一个提交的单独应用启动一个JobManager,也就是创建一个集群。这个JobManager只为执行这一个应用而存在,执行结束之后JobManager就会关闭。

3.6 独立模式

        独立运行,不依赖任何外部的资源管理平台,是部署flink最基本的方式。但是出现资源不足或者出现故障时,没有自动扩展或者重分配资源的保证,所以只能在开发测试等非常少的场景。

3.7 yarn模式

        yarn上的部署过程就是:客户端把flink应用提交给yarn的ResourceManager,Yarn的ResourceManager会向NodeManager申请容器。在容器上部署flink的JobManager和TaskManager实例,从而启动集群。

 启动集群

$ bin/yarn-session.sh -nm test

 可用参数解读:

-d:分离模式。即使关掉当前对话窗口,YARN session也可以在后台运行

-jm(--jobManagerMemory):配置jobManager所需内存,默认单位为MB

-nm(--name):配置在YARN UI界面上显示的任务名

-qu(--queue):指定YARN队列名

-tm(--taskManager):配置每个TaskManager所使用内存

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/20829.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

DBCO-COOH分子量:305.3,CAS:1353016-70-2,二苯基环辛炔-羧基;类似有DBCO-NH2、SH、MAL、NHS等等

中文名称&#xff1a;二苯基环辛炔-羧基 英文名称&#xff1a;DBCO-acid 英文别称&#xff1a;DBCO-COOH cas: 1353016-70-2 分子式&#xff1a;C19H15NO3 分子量&#xff1a;305.3 DBCO-COOH是DBCO 衍生化的常用构件&#xff0c;在EDC、DCC和HATU等活化剂存在下&#xf…

汇编学习教程:灵活寻址(四)

引言 在上篇博文中&#xff0c;我们学习了 [bxsi] 的灵活寻址形式&#xff0c;由此讲解了汇编中的多重循环实现。那么本篇博文中&#xff0c;我们将继续学习灵活寻址其他实现形式。 本次学习从一道编程案例开始学起。 编程示例如下&#xff1a; assume cs:code,ds:datadata…

【jvm系列-12】jvm性能调优篇---GUI工具的基本使用

JVM系列整体栏目 内容链接地址【一】初识虚拟机与java虚拟机https://blog.csdn.net/zhenghuishengq/article/details/129544460【二】jvm的类加载子系统以及jclasslib的基本使用https://blog.csdn.net/zhenghuishengq/article/details/129610963【三】运行时私有区域之虚拟机栈…

openEuler 成功适配 LeapFive InFive Poros 开发板

近日&#xff0c;openEuler RISC-V 23.03 创新版本在跃昉科技的 Poros 开发板上成功运行。 openEuler 在 Poros 上适配成功&#xff0c;XFCE 桌面启动正常&#xff0c;文件系统、终端模拟器和输入法等相关 GUI 应用也运行流畅&#xff0c;Chromium 浏览器和 LibreOffice 等应用…

大屏只用来做汇报?知道这6个应用场景,直接升职加薪!

五一假几个朋友小聚了一下&#xff0c;好久没联系了&#xff0c;现在才知道大家从事行业五花八门的。知道我从事IT行业好几年&#xff0c;他们非要让我讲讲现在异常火爆的大屏&#xff0c;说是所在企业单位都在研究这玩意儿&#xff0c;有的业务人员焦虑不已不知道如何下手&…

SD-如何训练自己的Lora模型

官方地址&#xff1a;GitHub - bmaltais/kohya_ss 尝试过mac和Ubuntu&#xff0c;装上后都会有问题 Windows按照官方步骤安装即可 第一步 git clone https://github.com/bmaltais/kohya_ss.git cd kohya_sspython -m venv venv .\venv\Scripts\activatepip install torch1.…

SpringCloud Alibaba详解

目录 微服务架构概念 服务治理 服务调用 服务网关 服务容错 链路追踪 SpringcloudAlibaba组件 Nacos 负载均衡 Ribbon Fegin Sentinel 高并发测试 容错方案 Sentinel入门 Feign整合Sentinel 微服务架构概念 服务治理 服务治理就是进行服务的自动化管理&#xf…

pod的基本介绍| harbor仓库的搭建 tomcat镜像拉取

pod的基本介绍| harbor仓库的搭建 tomcat镜像拉取 一 Pod基础概念&#xff1a;二 通常把Pod分为两类&#xff1a;三 Pod容器的分类&#xff1a;四 应用容器&#xff08;Maincontainer&#xff09;五 镜像拉取策略&#xff08;image PullPolicy&#xff09;六 部署 harbor 创建私…

SpringMVC高手进阶

&#x1f648;作者简介&#xff1a;练习时长两年半的Java up主 &#x1f649;个人主页&#xff1a;程序员老茶 &#x1f64a; ps:点赞&#x1f44d;是免费的&#xff0c;却可以让写博客的作者开兴好久好久&#x1f60e; &#x1f4da;系列专栏&#xff1a;Java全栈&#xff0c;…

解密Netty中的Reactor模式

文章目录 单线程Reactor模式多线程Reactor模式Reactor模式中IO事件的处理流程Netty中的通道ChannelNetty中的反应器ReactorNetty中的处理器HandlerNetty中的通道Channel和处理器Handler的协作组件Pipeline Reactor(反应器)模式是高性能网络编程在设计和架构方面的基础模式.Doug…

Science文章复现(Python):图1 - Aircraft obs(机载的观测 CO2)

之前有写过science文章后处理的复现Science文章复现&#xff08;Python&#xff09;&#xff1a;在机载观测中明显的强烈南大洋碳吸收 在这里是针对图细节的理解&#xff1a; 首先需要下载这个项目 https://github.com/NCAR/so-co2-airborne-obs 这里的环境配置会比较麻烦 con…

00后卷起来,真没我们老油条什么事了···

都说00后躺平了&#xff0c;但是有一说一&#xff0c;该卷的还是卷。 这不&#xff0c;前段时间我们公司来了个00后&#xff0c;工作没两年&#xff0c;跳槽到我们公司起薪20K&#xff0c;都快接近我了。后来才知道人家是个卷王&#xff0c;从早干到晚就差搬张床到工位睡觉了。…

【刷题之路】LeetCode 232. 用栈实现队列

【刷题之路】LeetCode 232. 用栈实现队列 一、题目描述二、解题1、图解主要思路2、先实现栈3、实现各个接口3.1、初始化接口3.2、入队接口3.3、出队接口3.4、取队头接口3.5、判空接口3.6、释放接口 一、题目描述 原题连接&#xff1a; 232. 用栈实现队列 题目描述&#xff1a;…

网站测试的主要方法

网站测试的主要方法 网站测试是保证网站质量的重要手段&#xff0c;通过对网站进行测试可以及时发现问题并修复&#xff0c;提高用户体验和网站的可靠性。本文将介绍网站测试的主要方法。 1.功能测试&#xff1a;测试网站的所有功能是否正常。通过模拟用户的操作&#xff0c;确…

在外包干了三年,我废了……不吹不黑!

没错&#xff0c;我也干过外包&#xff0c;一干就是三年&#xff0c;三年后&#xff0c;我废了…… 虽说废的不是很彻底&#xff0c;但那三年我几乎是出差了三年、玩了三年、荒废了三年&#xff0c;那三年&#xff0c;我的技术能力几乎是零成长的。 说起这段三年的外包经历&a…

文档管理-gitlab+markdown网页插件

特点 使用git进行版本管理&#xff0c;本地编辑使用Typora。使用gitlab进行权限管理可以在线阅读通过Markdown在线阅读插件实现&#xff0c;可显示目录显示与链接跳转&#xff0c;界面优于自带的wiki。 与其他方式对比 gitlab的wiki&#xff1a;显示界面效果不好&#xff0c…

【数据结构】栈及其实现

目录 &#x1f920;前言 什么是栈&#xff1f; 栈的定义及初始化 栈的定义 栈的初始化 栈的判空 栈顶压栈 栈顶出栈 栈的数据个数 栈的销毁 完整代码 总结 &#x1f920;前言 学了相当长一段时间的链表&#xff0c;总算是跨过了一个阶段。从今天开始我们将进入栈和…

[IOT物联网]Python快速上手开发物联网上位机程序——前言

一、什么是Python Python是一种简单易学、高级、通用的编程语言。它是一种解释型语言&#xff0c;不需要编译即可运行&#xff0c;因此可以快速地进行开发和测试。Python具有简洁优美的语法&#xff0c;使用它可以提高生产力和代码可读性。Python拥有强大的标准库和第三方库&am…

Linux Shell 实现一键部署virtualbox

VirtualBox 前言 VirtualBox 是一款开源虚拟机软件。VirtualBox 是由德国 Innotek 公司开发&#xff0c;由Sun Microsystems公司出品的软件&#xff0c;使用Qt编写&#xff0c;在 Sun 被 Oracle 收购后正式更名成 Oracle VM VirtualBox。Innotek 以 GNU General Public Licens…

孙鑫VC++第四章 1.简单绘图-MFC消息映射机制

1. MFC消息映射机制 接下来将剖析MFC消息映射机制&#xff0c;探讨发送给窗口的消息是如何被MFC框架通过窗口句柄映射表和消息映射表来用窗口类的处理函数进行响应的。另外&#xff0c;还将讲述“类向导”这一工具的运用&#xff0c;讨论设备描述表及其封装类CDC的使用&#x…