首页 > 编程学习 > flink 入门(一)

flink 入门(一)

发布时间:2022/10/1 4:32:45

flink 入门(一)

image-20220930110928773

简介

​ 阅读目标:

  • 本文为入门级别文章,即阅读完下文你需要简单的知道 flink 是做什么用的,他的主要特点是什么。工欲善其事必先利其器更深入的了解,待熟练后再回头看看。

  • 简而言之flink就是一个框架,你在框架里面编写代码(接收从某处来的数据->数据处理/转换->将处理好的数据输出到某地),将编写好的代码交给flink集群,由集群取调度任务去处理

  • 阅读并实践本文可能会存在某些问题,你还需要阅读其他文章/博客加深对flink的理解(如下文中提到的某些概念:有界、无界等等

    实际是因为我懒得写了。。。

​ Flink 起源于一个叫作 Stratosphere 的项目,它是由 3 所地处柏林的大学和欧洲其他一些大 学在 2010~2014 年共同进行的研究项目,由柏林理工大学的教授沃克尔·马尔科(Volker Markl) 领衔开发。2014 年 4 月,Stratosphere 的代码被复制并捐赠给了 Apache 软件基金会,Flink 就 是在此基础上被重新设计出来的。 在德语中,“flink”一词表示“快速、灵巧”。项目的 logo 是一只彩色的松鼠,当然了, 这不仅是因为 Apache 大数据项目对动物的喜好(是否联想到了 Hadoop、Hive?),更是因为 松鼠这种小动物完美地体现了“快速、灵巧”的特点。关于 logo 的颜色,还一个有趣的缘由: 柏林当地的松鼠非常漂亮,颜色是迷人的红棕色;而 Apache 软件基金会的 logo,刚好也是一 根以红棕色为主的渐变色羽毛。于是,Flink 的松鼠 Logo 就设计成了红棕色,而且拥有一个漂 亮的渐变色尾巴,尾巴的配色与 Apache 软件基金会的 logo 一致。这只松鼠色彩炫目,既呼应 了 Apache 的风格,似乎也预示着 Flink 未来将要大放异彩。

​ Flink 的官网主页地址:https://flink.apache.org/ 在 Flink 官网主页的顶部可以看到,项目的核心目标,是“数据流上的有状态计算”(Stateful Computations over Data Streams)。

​ 很多专业词汇,我们从中至少可以提炼出一些容易理解的信息:Flink 是一个“框 架”,是一个数据处理的“引擎”;既然是“分布式”,当然是为了应付大规模数据的应用场景 了;另外,Flink 处理的是数据流。所以,Flink 是一个流式大数据处理引擎。 而“内存执行速度”和“任意规模”,突出了 Flink 的两个特点:速度快、可扩展性强— —这说的自然就是小松鼠的“快速”和“灵巧”了。

java 开发案例

以下案例为环境jdk1.8,且以下案例均为展示使用,目的是为了明白这两种方式的区别以及基本使用

  • jdk 1.8
  • maven
  • win10
  • flink 1.15.2

以下示例代码仅做入门级别使用,非生产可用。

  • pom文件

    <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <java.version>1.8</java.version>
    
            <flink.version>1.15.2</flink.version>
            <target.java.version>1.8</target.java.version>
            <scala.binary.version>2.12</scala.binary.version>
            <log4j.version>2.17.1</log4j.version>
    </properties>
    
    <dependencies>
            <!-- Apache Flink dependencies -->
            <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-clients</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
    
            <!-- connector kafka-->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka</artifactId>
                <version>${flink.version}</version>
            </dependency>
    
            <!--  在此处 我添加了分词  -->
            <dependency>
                <groupId>org.ansj</groupId>
                <artifactId>ansj_seg</artifactId>
                <version>5.1.6</version>
            </dependency>
    
    
            <!-- Add logging framework, to produce console output when running in the IDE. -->
            <!-- These dependencies are excluded from the application JAR by default. -->
            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-slf4j-impl</artifactId>
                <version>${log4j.version}</version>
                <scope>runtime</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-api</artifactId>
                <version>${log4j.version}</version>
                <scope>runtime</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-core</artifactId>
                <version>${log4j.version}</version>
                <scope>runtime</scope>
            </dependency>
        </dependencies>
    

Streaming(无界)

这里可以简单的理解为源源不断的数据,需要不断监听某个消息队列(kafka)或者其他来源。

    public static final String HOST = "192.168.20.127";
    public static final Integer PORT = 8888;

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> source = environment.socketTextStream(HOST, PORT);

        SingleOutputStreamOperator<Tuple2<String, Long>> wordsCollector = source.flatMap((String line, Collector<Tuple2<String, Long>> collector) -> {
            String[] words = line.split(" ");
            for (String word : words) {
                collector.collect(new Tuple2<String, Long>(word, 1L));
            }

        }).returns(Types.TUPLE(Types.STRING, Types.LONG));


        SingleOutputStreamOperator<Tuple2<String, Long>> sum = wordsCollector.keyBy(0).sum(1);

        sum.print();

        environment.execute();
    }

Batch(有界)

这里可以简单的理解为批量数据处理。

kafka

  • 运行类

    public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    
            //new 一个实例!
            Properties properties = new Properties();
    
            //告诉程序我们要接收那台机器上生产的数据
            properties.setProperty("bootstrap.servers", "master:9092");
    
            //告诉程序开启分区,已经分区名称
            properties.setProperty("group.id", "temp-1");
    
            //属性key.serializer和value.serializer就是key和value指定的序列化方式。
            properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    
            //读取kafka数据的时候需要指定消费策略,如果不指定会使用auto.offset.reset设置
            //earliest当各分区下有已提交的offset时,从提交的offset开始消费;
            //无提交的offset时,从头开始消费;
            //latest,当各分区下有已提交的offset时,从提交的offset开始消费;
            //无提交的offset时,消费新产生的该分区下的数据;
            //none,topic各分区都存在已提交的offset时,从offset后开始消费;
            //只要有一个分区不存在已提交的offset,则抛出异常
            properties.setProperty("auto.offset.reset", "earliest");
    
            //enable.auto.commit 的默认值是 true;就是默认采用自动提交的机制。
            properties.setProperty("enable.auto.commit", "false");
    
            //如果FlinkKafkaConsumer没有开启checkpoint功能,为了不重复读取
            //这种方式无法实现Exactly-Once(只执行一次)
            FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer("test_topic", new SimpleStringSchema(), properties);
    
            DataStreamSource<String> lines = environment.addSource(flinkKafkaConsumer);
    
            SingleOutputStreamOperator<Tuple2<String, Long>> sum = lines.flatMap((String line, Collector<Tuple2<String, Long>> collector) -> {
    
                List<Term> terms = ToAnalysis.parse(line).getTerms();
    
                terms.forEach(item -> {
                    collector.collect(new Tuple2<>(item.getName(), 1L));
                });
    
    
            }).returns(Types.TUPLE(Types.STRING, Types.LONG)).keyBy(0).sum(1);
    
    
            sum.print();
    
            environment.execute("word-coun-kafka");
        }
    

任务提交

提交有两种方式

  • web-ui界面

    1. 访问部署服务器 ip:8081

    2. 点击 Submit new Job

      image-20220930110400414

    3. 点击Add new

    4. 编辑Entry class与Parallelism等

      1. Entry class 为入口类 即为上文中的运行main()函数的类的全限定名

      image-20220930110446236

    5. 点击Submit

    6. 点击Jobs -> Running Jobs 查看

      image-20220930110504840

  • 命令行

flink 安装与部署

  • Flink的安装和部署主要分为本地模式和集群模式,其中本地模式只需直接解压就可以使用,不以修改任何参数,一般在做一些简单测试的时候使用。

  • 集群模式包含Standalone、Flink on Yarn等模式,适合在生产环境下面使用,且需要修改对应的配置 参数。

flink 下载

## 官方版本(可能下载速度慢)
curl -O https://dlcdn.apache.org/flink/flink-1.15.2/flink-1.15.2-bin-scala_2.12.tgz
## 腾讯云镜像(推荐,国内速度快)
curl -O http://mirrors.cloud.tencent.com/apache/flink/flink-1.15.2/flink-1.15.2-bin-scala_2.12.tgz

下载完成解压,解压后目录如下

image-20220929104102078

CentOS/Kernel环境

系统环境

以下均基于 Kernel

  • CentOS Linux release 7.9.2009 (Core)
  • Linux version 3.10.0-1160.el7.x86_64
  • gcc version 4.8.5 20150623 (Red Hat 4.8.5-44) (GCC)
  • open-jdk 11
  • 大部分过程中使用root用户。请在生产环境或特殊环境注意用户切换。本文不在linux用户做过多赘述。

本地模式

自己是jobmanager也是taskmanager

  1. 配置文件详解

    1. 修改conf/flink-conf.yaml
    cd conf
    vim flink-conf.yaml
    
    # 此处修改集群时需要修改
    jobmanager.rpc.address: localhost
    # 默认1623
    jobmanager.rpc.port: 6123
    # 任务管理默认
    jobmanager.memory.process.size: 1600m
    taskmanager.memory.process.size: 1728m
    # 任务槽 资源(并行执行 相当于 组)
    taskmanager.numberOfTaskSlots: 1
    # 默认并行度
    parallelism.default: 1
    # web界面默认端口 需要修改时 解开注释
    #rest.port: 8081
    
    1. master

      当前jobmanager(默认localhost)以及webui端口(默认8081)

    2. works

      单节点启动默认这里面没有东西

  2. 启动脚本

    # 进入flink bin目录
    cd bin
    # 单节点集群启动
    ./start-cluster.sh
    
  3. 访问服务器ip加8081(默认)

  4. 停止服务

    # 进入flink bin目录
    cd bin
    # 单节点集群启动
    ./stop-cluster.sh
    

集群

至少需要三台服务器。一台jobmanager,两台taskmanager,三台服务器之间需要配置免密登录,这里为了方便,我修改了hosts文件,三台服务器分别为

masterslave0slave1

  1. 修改hosts(ip地址 主机名/域名 (主机别名))

    自己的服务器IP-1 master
    自己的服务器IP-2 slave0
    自己的服务器IP-3 slave1
    

    使配置文件生效请参考 CentOS修改hosts

  2. 服务器之间免密登录

    请自行百度/google

  3. 修改配置文件

    • master 服务器

      • flink-conf.yaml

        # 用于节点间通信
        jobmanager.rpc.address: 0.0.0.0
        
      • master

        master:8081
        
      • works

        # 另外两台机器
        slave0
        slave1
        
    • slave0 服务器

      • flink-conf.yaml

        jobmanager.rpc.address: master
        # 不改此处 集群运行后 solt为0
        jobmanager.bind-host: 0.0.0.0
        
      • master

        master:8081
        
      • works

        slave0
        slave1
        
    • slave1 服务器

      • flink-conf.yaml

        jobmanager.rpc.address: master
        # 不改此处 集群运行后 solt为0
        jobmanager.bind-host: 0.0.0.0
        
      • master

        master:8081
        
      • works

        slave0
        slave1
        
  4. 运行集群

    master bin目录下执行,看到以下几截图后集群启动成功,即可访问webUI界面

    ./start-cluster.sh
    

    image-20220929151931120

    且执行jps命令后

    image-20220929152033006

    slave0slave1执行jps

    image-20220929152141082

    web ui 界面

    image-20220929152941340

jdk安装(多版本切换)

## 下载openjdkjdk
curl -O https://download.java.net/openjdk/jdk11/ri/openjdk-11+28_linux-x64_bin.tar.gz

## 解压
tar zxf openjdk-11+28_linux-x64_bin.tar.gz

## 添加jdk11  /opt/openjdk11/jdk-11/ 应为压缩包实际解压路径
sudo update-alternatives --install /usr/bin/java java /home/flink/opt/jdk-11/bin/java 1

## 添加jdk11  /opt/openjdk11/jdk-11/ 应为压缩包实际解压路径
sudo update-alternatives --install /usr/bin/javac javac /home/flink/opt/jdk-11/bin/javac 1

## 切换
sudo update-alternatives --config java
sudo update-alternatives --config javac

docker for windows

windows 10 专业版 21H2 WSL2

下载docker-desktop docker 历史版本

运行Docker Desktop Installer.exe

参考链接

用户提权

su
chmod -v u+w /etc/sudoers
vim /etc/sudoers

root       ALL=(ALL)           ALL


chmod -v u-w /etc/sudoers

exit

centos镜像

阿里云centos镜像

北京外国语大学开源镜像

Vmware

VMWARE

VMWARE 秘钥以及安装

VMWARE TOOLS

其他参考

Storm入门 3

Flink从入门到入土(详细教程)

JDK11下载界面

flink下载界面

flink官方安装教程

flink-streaming-platform-web

flink国内镜像 腾讯云

flink-kafka

Copyright © 2010-2022 mfbz.cn 版权所有 |关于我们| 联系方式|豫ICP备15888888号