【极数系列】Flink集成DataSource读取文件数据(08)

文章目录

  • 01 引言
  • 02 简介概述
  • 03 基于文件读取数据
    • 3.1 readTextFile(path)
    • 3.2 readFile(fileInputFormat, path)
    • 3.3 readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)
    • 3.4 实现原理
    • 3.5 注意事项
    • 3.6 支持读取的文件形式
  • 04 源码实战demo
    • 4.1 pom.xml依赖
    • 4.2 创建文件数据流作业
    • 4.3 运行程序查看日志

01 引言

源码地址,一键下载可用:https://gitee.com/shawsongyue/aurora.git
模块:aurora_flink
主类:FlinkFileSourceJob(文件)

02 简介概述

1.Source 是Flink程序从中读取其输入数据的地方。你可以用 StreamExecutionEnvironment.addSource(sourceFunction) 将一个 source 关联到你的程序。

2.Flink 自带了许多预先实现的 source functions,不过你仍然可以通过实现 SourceFunction 接口编写自定义的非并行 source。

3.也可以通过实现 ParallelSourceFunction 接口或者继承 RichParallelSourceFunction 类编写自定义的并行 sources。

03 基于文件读取数据

3.1 readTextFile(path)

读取文本文件,例如遵守 TextInputFormat 规范的文件,逐行读取并将它们作为字符串返回。

3.2 readFile(fileInputFormat, path)

按照指定的文件输入格式读取(一次)文件。

3.3 readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)

这是前两个方法内部调用的方法。它基于给定的 fileInputFormat 读取路径 path 上的文件。根据提供的 watchType 的不同,source 可能定期(每 interval 毫秒)监控路径上的新数据(watchType 为 FileProcessingMode.PROCESS_CONTINUOUSLY),或者处理一次当前路径中的数据然后退出(watchType 为 FileProcessingMode.PROCESS_ONCE)。使用 pathFilter,用户可以进一步排除正在处理的文件。

3.4 实现原理

底层Flink 将文件读取过程拆分为两个子任务,即 目录监控数据读取。每个子任务都由一个单独的实体实现。监控由单个非并行(并行度 = 1)任务实现,而读取由多个并行运行的任务执行。后者的并行度和作业的并行度相等。单个监控任务的作用是扫描目录(定期或仅扫描一次,取决于 watchType),找到要处理的文件,将它们划分为 分片,并将这些分片分配给下游 reader。Reader 是将实际获取数据的角色。每个分片只能被一个 reader 读取,而一个 reader 可以一个一个地读取多个分片。

3.5 注意事项

(1)如果 watchType 设置为 FileProcessingMode.PROCESS_CONTINUOUSLY,当一个文件被修改时,它的内容会被完全重新处理。这可能会打破 “精确一次” 的语义,因为在文件末尾追加数据将导致重新处理文件的所有内容。

(2)如果 watchType 设置为 FileProcessingMode.PROCESS_ONCE,source 扫描一次路径然后退出,无需等待 reader 读完文件内容。当然,reader 会继续读取数据,直到所有文件内容都读完。关闭 source 会导致在那之后不再有检查点。这可能会导致节点故障后恢复速度变慢,因为作业将从最后一个检查点恢复读取。

3.6 支持读取的文件形式

1.本地文件

2.HDFS文件

3.文件夹

4.压缩文件

04 源码实战demo

4.1 pom.xml依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.xsy</groupId>
    <artifactId>aurora_flink</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!--属性设置-->
    <properties>
        <!--java_JDK版本-->
        <java.version>11</java.version>
        <!--maven打包插件-->
        <maven.plugin.version>3.8.1</maven.plugin.version>
        <!--编译编码UTF-8-->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!--输出报告编码UTF-8-->
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <!--json数据格式处理工具-->
        <fastjson.version>1.2.75</fastjson.version>
        <!--log4j版本-->
        <log4j.version>2.17.1</log4j.version>
        <!--flink版本-->
        <flink.version>1.18.0</flink.version>
        <!--scala版本-->
        <scala.binary.version>2.11</scala.binary.version>
        <!--log4j依赖-->
        <log4j.version>2.17.1</log4j.version>
    </properties>

    <!--通用依赖-->
    <dependencies>

        <!-- json -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>


        <!--================================集成外部依赖==========================================-->
        <!--集成日志框架 start-->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
        </dependency>

        <!--集成日志框架 end-->
    </dependencies>

    <!--编译打包-->
    <build>
        <finalName>${project.name}</finalName>
        <!--资源文件打包-->
        <resources>
            <resource>
                <directory>src/main/resources</directory>
            </resource>
            <resource>
                <directory>src/main/java</directory>
                <includes>
                    <include>**/*.xml</include>
                </includes>
            </resource>
        </resources>

        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>org.google.code.flindbugs:jar305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <excluder>org.apache.logging.log4j:*</excluder>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>org.xsy.sevenhee.flink.TestStreamJob</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>

        <!--插件统一管理-->
        <pluginManagement>
            <plugins>
                <!--maven打包插件-->
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                    <version>${spring.boot.version}</version>
                    <configuration>
                        <fork>true</fork>
                        <finalName>${project.build.finalName}</finalName>
                    </configuration>
                    <executions>
                        <execution>
                            <goals>
                                <goal>repackage</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>

                <!--编译打包插件-->
                <plugin>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>${maven.plugin.version}</version>
                    <configuration>
                        <source>${java.version}</source>
                        <target>${java.version}</target>
                        <encoding>UTF-8</encoding>
                        <compilerArgs>
                            <arg>-parameters</arg>
                        </compilerArgs>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>

    <!--配置Maven项目中需要使用的远程仓库-->
    <repositories>
        <repository>
            <id>aliyun-repos</id>
            <url>https://maven.aliyun.com/nexus/content/groups/public/</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>

    <!--用来配置maven插件的远程仓库-->
    <pluginRepositories>
        <pluginRepository>
            <id>aliyun-plugin</id>
            <url>https://maven.aliyun.com/nexus/content/groups/public/</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </pluginRepository>
    </pluginRepositories>

</project>

4.2 创建文件数据流作业

package com.aurora.source;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
 * @description flink的文件source应用
 * @author 浅夏的猫
 * @datetime 23:03 2024/1/28
*/
public class FlinkFileSourceJob {

    private static final Logger logger = LoggerFactory.getLogger(FlinkFileSourceJob.class);

    public static void main(String[] args) throws Exception {

        //1.创建Flink运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.设置Flink运行模式:
        //STREAMING-流模式,BATCH-批模式,AUTOMATIC-自动模式(根据数据源的边界性来决定使用哪种模式)
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //3.基于文件的source使用(本地/HDFS文件/文件夹/压缩文件)
        //3.1本地文件
        DataStreamSource<String> dataStreamSourceFile = env.readTextFile("E:\\project\\aurora_dev\\aurora_flink\\src\\main\\resources\\application.properties");

        //3.2 HDFS文件,前提你已经搭建环境
//        DataStreamSource<String> dataStreamSourceHdfs = env.readTextFile("hdfs://localhost:8020//source/application.txt");

        //3.3文件夹
        DataStreamSource<String> dataStreamSourceDir = env.readTextFile("E:\\project\\aurora_dev\\aurora_flink\\src\\main\\resources");


        //3.4压缩文件
        DataStreamSource<String> dataStreamSourceRar = env.readTextFile("E:\\project\\aurora_dev\\aurora_flink\\src\\main\\resources\\test.rar");

        //4.输出打印
        dataStreamSourceFile.print();
//        dataStreamSourceHdfs.print();
        dataStreamSourceDir.print();
        dataStreamSourceRar.print();

        //5.启动运行
        env.execute();
    }

}

4.3 运行程序查看日志

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/357916.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

ssl证书更换步骤及更换后有效期没有更新问题

因公司ssl证书到期&#xff0c;在阿里云申请免费证书更换后&#xff0c;查看证书有效期&#xff0c;发现有效期没有更新。 ssl证书更换步骤&#xff1a; 1.下载nginx证书文件 2.服务器上替换原有ssl证书&#xff08;操作前记得备份&#xff09; 3.更改nginx.conf文件中证书路径…

CES 2024:AI赋能机器人,国产机器人更亮眼

原创 | 文 BFT机器人 一年一度的国际消费电子展(CES)又与我们见面了。作为全球消费电子和科技创新的盛会&#xff0c;CES每年都吸引着无数目光。今年&#xff0c;AI赋能机器人成为展会的一大亮点&#xff0c;而国产机器人更是凭借其创新技术和实用功能&#xff0c;成为全场焦点…

ES Serverless让日志检索更加便捷

前言 在项目中,或者开发过程中,出现bug或者其他线上问题,开发人员可以通过查看日志记录来定位问题。通过日志定位 bug 是一种常见的软件开发和运维技巧,只有观察日志才能追踪到具体代码。在软件开发过程中,开发人员会在代码中添加日志记录,以记录程序的运行情况和异常信…

1.Mybatis入门

目录 前言 1入门 1.1 入门程序实现 1.2 数据准备 ​编辑 1.3 配置Mybatis 1.4 编写SQL语句 1.5 单元测试 1.6 解决SQL警告与提示 2. JDBC介绍(了解) 2.1 介绍 2.2 代码 2.3 问题分析 2.4 技术对比 3. 数据库连接池 3.1 介绍 3.2 产品 4. lombok 4.1 介绍 4.…

07-Nacos-接入Mysql实现持久化

1、默认内嵌的数据库 Derby 存于/data目录 2、扩展仅支持Mysql 5.6.5 执行Nacos中的SQL脚本&#xff0c;该脚本是Nacos-server文件夹中的nacos-mysql.sql 详见 01-Nacos源码打包、部署-CSDN博客 3、修改配置文件 Nacos-server中的conf目录下&#xff0c;application.proper…

ChatGPT更新了Mention功能,集结若干GPTs作战,AI智能体的心智入口;向量数据库的挑战和未来

&#x1f989; AI新闻 &#x1f680; ChatGPT更新了Mention功能&#xff0c;集结若干GPTs作战&#xff0c;AI智能体的心智入口 摘要&#xff1a;OpenAI在ChatGPT中引入了一个新功能&#xff0c;允许用户在聊天时任意一个GPTs&#xff08;即ChatGPT最新推出的AI Agent 智能应用…

2023量子科技十大用例 | 光子盒年度系列

随着量子科技的不断突破&#xff0c;量子计算、量子通信、量子测量等应用场景逐渐向纵深拓展&#xff0c;量子产业呈现出较好的发展势头。 量子计算的发展比以往任何时候都更加迅速&#xff0c;这提醒我们&#xff0c;这项看似‘高冷’的前沿科技&#xff0c;已悄然应用于不少领…

vue3 + antd 封装动态表单组件(三)

传送带&#xff1a; vue3 antd 封装动态表单组件&#xff08;一&#xff09; vue3 antd 封装动态表单组件&#xff08;二&#xff09; 前置条件&#xff1a; vue版本 v3.3.11 ant-design-vue版本 v4.1.1 我们发现ant-design-vue Input组件和FormItem组件某些属性支持slot插…

【RT-DETR有效改进】2024.1最新MFDS-DETR的HS-FPN改进特征融合层(降低100W参数,全网独家首发)

👑欢迎大家订阅本专栏,一起学习RT-DETR👑 一、本文介绍 本文给大家带来的改进机制是最近这几天最新发布的改进机制MFDS-DETR提出的一种HS-FPN结构,其是一种为白细胞检测设计的网络结构,主要用于解决白细胞数据集中的多尺度挑战。它的基本原理包括两个关键部分:特征…

[Linux]:软硬连接(什么是软硬链接,怎么创建软硬链接,以及对应的例子)

目录 软连接&#xff1a; 什么是软连接&#xff1a; 怎么创建软连接&#xff1a; 例子&#xff1a; 硬链接&#xff1a; 什么是硬链接&#xff1a; 怎么创建硬链接&#xff1a; 例子&#xff1a; 软连接&#xff1a; 什么是软连接&#xff1a; 软连接文件是一个独立的…

如何在群晖NAS部署office服务实现多人远程协同办公编辑文档

文章目录 本教程解决的问题是&#xff1a;1. 本地环境配置2. 制作本地分享链接3. 制作公网访问链接4. 公网ip地址访问您的分享相册5. 制作固定公网访问链接 本教程解决的问题是&#xff1a; 1.Word&#xff0c;PPT&#xff0c;Excel等重要文件存在本地环境&#xff0c;如何在编…

Adobe Photoshop 2024 v25.4.0 - 专业的图片设计软件

Adobe Photoshop 2024 v25.4.0更新了&#xff0c;从照片编辑和合成到数字绘画、动画和图形设计&#xff0c;任何您能想象到的内容都能通过PS2024轻松实现。 利用人工智能技术进行快速编辑。学习新技能并与社区分享您的工作。借助我们的最新版本&#xff0c;做令人惊叹的事情从未…

2023年全国职业院校技能大赛(高职组)“云计算应用”赛项赛卷9

某企业根据自身业务需求&#xff0c;实施数字化转型&#xff0c;规划和建设数字化平台&#xff0c;平台聚焦“DevOps开发运维一体化”和“数据驱动产品开发”&#xff0c;拟采用开源OpenStack搭建企业内部私有云平台&#xff0c;开源Kubernetes搭建云原生服务平台&#xff0c;选…

生物信息学高质量刊物

个人觉得生信期刊的水平排名&#xff08;只谈计算方法&#xff09; 1&#xff0c;Nature Biotechnology (Article) 生信人心中的梦之刊&#xff0c;很少有纯计算能上的&#xff0c;一般都需要一定的湿实验验证&#xff0c;在计算领域某些场合认可度甚至高于正刊。 2&#xf…

【pdf密码】怎么打印加密的PDF文件?

PDF文件是可以打开查看的&#xff0c;但是现在不能编辑、不能打印&#xff0c;功能栏中的功能都是灰色的&#xff0c;这种设置了加密的PDF文件该如何加密&#xff1f; 如果PDF中的大多数功能按钮以及打印按钮都是灰色的状态&#xff0c;那就证明是文件的问题导致不能打印的。 …

FC-135 / FC-135 TYPE 贴片晶振

描述 FC135是一种被广泛采用的32.768 kHz晶体单元&#xff0c;自2002年发布以来已在全球范围内使用。 理想的单片机子时钟和模块&#xff0c;从消费设备到工业设备的应用。如果温度范围至105.C&#xff0c;请与我们联系。 爱普生是千赫波段晶体单元的领先供应商&#xff0c;…

力扣hot100 柱状图中最大的矩形 单调栈

Problem: 84. 柱状图中最大的矩形 文章目录 思路复杂度Code 思路 &#x1f468;‍&#x1f3eb; 参考地址 复杂度 时间复杂度: O ( n ) O(n) O(n) 空间复杂度: O ( n ) O(n) O(n) Code class Solution {public static int largestRectangleArea(int[] height){Stack&l…

Python入门到精通(六)——Python函数进阶

Python函数进阶 一、函数的多返回值 二、函数多种传参方式 1、位置参数 2、关键字参数 3、缺省参数 4、不定长参数 &#xff08;1&#xff09;位置传递 &#xff08;2&#xff09;关键字传递 三、匿名函数 &#xff08;1&#xff09;函数作为参数传递 &#xff08;2&…

Java面试架构篇【一览众山小】

文章目录 &#x1f6a1; 简介☀️ Spring&#x1f425; 体系结构&#x1f420; 生命周期 &#x1f341; SpringMVC&#x1f330; 执行流程 &#x1f31c; SpringBoot&#x1f30d; 核心组件&#x1f38d; 自动装配&#x1f391; 3.0升级 &#x1f505; spring Cloud Alibaba&am…

docker+jekins+maven+ssh 持续集成交付部署 jar包

一. docker环境搭建&#xff0c;此处略过。 二. docker部署jekins 2.1 拉取镜像&#xff0c;挂载工作目录,xxxx为宿主机指定工作目录 docker pull jenkins/jenkins docker run -d -p 8080:8080 -p 50000:50000 --name jenkins --privilegedtrue -v xxxxxxxxxx:/var/jenkins…
最新文章