【天衍系列 01】深入理解Flink的 FileSource 组件:实现大规模数据文件处理

文章目录

  • 01 基本概念
  • 02 工作原理
  • 03 数据流实现
  • 04 项目实战
    • 4.1 项目结构
    • 4.2 maven依赖
    • 4.3 StreamFormat读取文件数据
    • 4.4 BulkFormat读取文件数据
    • 4.5 使用小结
  • 05 数据源比较
  • 06 总结

01 基本概念

Apache Flink 是一个流式处理框架,被广泛应用于大数据领域的实时数据处理和分析任务中。在 Flink 中,FileSource 是一个重要的组件,用于从文件系统中读取数据并将其转换为 Flink 的数据流。本文将深入探讨 FileSource 的工作原理、用法以及与其他数据源的比较。

02 工作原理

FileSource 是 Flink 提供的一种用于从文件系统中读取数据的源。它能够处理各种类型的文件,包括文本文件、压缩文件、序列文件等。FileSource 的工作原理可以概括为以下几个步骤:

1.文件分配(File Assignment)

在 Flink 集群中,每个任务都会负责读取文件的一个分片。FileSource 会根据文件的大小和数量将文件分配给不同的任务进行处理。

2.并行读取(Parallel Reading)

每个任务会并行地读取分配给它的文件分片。这意味着文件中的数据会被同时读取,从而提高了整体的读取速度和处理效率。

3.数据解析(Data Parsing)

读取的数据会经过解析器进行解析,将其转换为 Flink 中的数据结构,如 DataSet 或 DataStream。

4.数据分发(Data Distribution)

解析后的数据会被分发到后续的算子中进行进一步的处理和分析。

03 数据流实现

  • 有界流(Bounded Streams)

    有界流是指具有明确结束点的数据流,即数据流在某个时刻会结束,数据量是有限的。例如,从静态文件、数据库或有限数据集中读取的数据流就是有界流。有界流的特点包括:

    • 数据量是有限的,流的结束点是已知的。
    • 可以对整个数据流进行批处理式的分析和处理,因为所有数据都可用且有限。
    • 可以使用批处理算法和优化技术,例如排序、分组聚合等。
  • 无界流(Unbounded Streams)

    无界流是指没有明确结束点的数据流,即数据流会持续不断地产生,数据量可能是无限的。例如,实时传感器数据、日志流、消息队列中的数据等都是无界流。无界流的特点包括:

    • 数据源持续不断地产生数据,流没有明确的结束点。
    • 通常用于实时流式处理,要求系统能够实时处理数据并在流中进行持续的分析和计算。
    • 需要采用流式处理的技术和算法,例如窗口计算、流式聚合、事件时间处理等。
  • 不同数据流实现

    • 创建一个 File Source 时, 默认情况下,Source 为有界/批的模式;

      //创建一个FileSource数据源,并设置为批模式,读取完文件后结束
      final FileSource<String> source = FileSource.forRecordStreamFormat(...)
              .build();
      
    • 设置参数monitorContinuously(Duration.ofMillis(5)) 可以把 Source 设置为持续的流模式

      //创建一个FileSource数据源,并设置为流模式,每隔5分钟检查路径新文件,并读取
      final FileSource<String> source = FileSource.forRecordStreamFormat(...)
              .monitorContinuously(Duration.ofMillis(5))  
              .build();   
      

04 项目实战

1.FileSource支持多种数据格式数据读取与解析,本期以Text File文件为例展开。
2.jdk版本11
3.Flink版本1.18.0
4.下面是两个简单的示例代码,演示如何在 Flink 中使用 FileSource 读取文件数据

4.1 项目结构

在这里插入图片描述

4.2 maven依赖

<!-- flink读取Text File文件依赖 start-->
<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-connector-files</artifactId>
	<version>1.18.0</version>
</dependency>
<!-- flink读取Text File文件依赖 end-->

<!-- flink基础依赖 start -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.18.0</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.12</artifactId>
    <version>1.18.0</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>1.18.0</version>
</dependency>

<!-- flink基础依赖 end -->

4.3 StreamFormat读取文件数据

  • StreamFormat从文件流中读取文件内容。它是最简单的格式实现, 并且提供了许多拆箱即用的特性(如 Checkpoint 逻辑),但是限制了可应用的优化(例如对象重用,批处理等等)。
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.time.Duration;

/**
 * 描述:
 * flink集成FileSource & forRecordStreamFormat使用 & 流模式
 * StreamFormat:从文件流中读取文件内容。它是最简单的格式实现,
 * 并且提供了许多拆箱即用的特性(如 Checkpoint 逻辑),
 * 但是限制了可应用的优化(例如对象重用,批处理等等)。
 *
 * @author 浅夏的猫
 * @version 1.0.0
 * @date 2024-02-07 15:30:22
 */
public class FileSourceRecordStreamingJob {

    public static void main(String[] args) throws Exception {

        // 创建 需要读取的文件路径Path
        Path path = new Path("D:\\flink\\file_source.txt");

        // 创建 读取文件的格式函数
        TextLineInputFormat textLineInputFormat = new TextLineInputFormat();

        // 创建 FileSource
        FileSource<String> fileSource = FileSource.
                forRecordStreamFormat(textLineInputFormat, path)
                //放开注释则使用流模式,每隔5分钟检查是否有新文件否则默认使用批模式
//                .monitorContinuously(Duration.ofMillis(5))
                .build();

        // 创建 执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 添加 FileSource 到数据流
        env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "FileSource").print();

        // 执行任务
        env.execute("FileSourceRecordStreamingJob");
    }
}

4.4 BulkFormat读取文件数据

  • BulkFormat从文件中一次读取一批记录,虽然是最 “底层” 的格式实现,但是提供了优化实现的最大灵活性。
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.impl.StreamFormatAdapter;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.time.Duration;


/**
 * 描述:flink集成FileSource & forBulkFileFormat使用 & 流模式
 * BulkFormat:从文件中一次读取一批记录。 它虽然是最 “底层” 的格式实现,但是提供了优化实现的最大灵活性。
 *
 * @author 浅夏的猫
 * @version 1.0.0
 * @date 2024-02-07 15:30:22
 */
public class FileSourceBulkStreamingJob {

    public static void main(String[] args) throws Exception {

        //创建 批量读取文件的格式函数,其实底层还是通过对单行文件读取
        BulkFormat<String, FileSourceSplit> bulkFormat = new StreamFormatAdapter<>(new TextLineInputFormat());

        // 创建 FileSource
        FileSource<String> fileSource = FileSource.
                forBulkFileFormat(bulkFormat, new Path("D:\\flink\\file_source.txt"))
                //放开注释则使用流模式,每隔5分钟检查是否有新文件,否则默认使用批模式
//                .monitorContinuously(Duration.ofMillis(5))
                .build();

        // 创建 执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 添加 FileSource 到数据流
        env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "FileSource").print();

        // 执行任务
        env.execute("FileSourceBulkStreamingJob");
    }
}

4.5 使用小结

在上面的示例中,我们使用FileSource方法从指定路径读取文本文件,并将其转换为一个数据流,选择不同的输入格式和解析方式,然后我们调用 print 方法将数据流中的数据打印出来。

05 数据源比较

FileSource 是 Flink 中常用的数据源之一,与其他数据源相比,它具有一些优势和劣势,根据实际情况和需求,可以选择不同的数据源来满足任务的要求。

  • 优势

    • 支持读取大规模的文件数据,适用于大数据处理场景。
    • 支持并行读取和处理,能够充分利用集群资源,提高处理效率。
    • 支持多种文件格式和压缩方式,灵活性强。
  • 劣势

    • 对于小文件的处理效率较低,可能会导致资源浪费和性能下降。
    • 无法实时监控文件的变化,需要手动触发重新读取。

06 总结

FileSource 是 Apache Flink 中用于读取文件数据的重要组件,它能够高效地处理大规模的文件数据,并提供丰富的功能和灵活的用法。通过深入了解 FileSource 的工作原理和用法,可以更好地利用 Flink 来实现大规模数据文件的处理和分析任务。

通过以上详细介绍,可以对 Apache Flink 中的 FileSource 有一个全面的了解,从而更好地应用于实际的数据处理项目中

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/396464.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【VSCode】设置 一键生成vue模板 的快捷入口

问题 每次写一个组件的时候&#xff0c;都需要去手敲默认结构或者是复制粘贴&#xff0c;十分的麻烦&#xff01; 解决办法 文件 > 首选项 > 用户代码片段 > vue.json 配置vue模板 其中prefix是用来触发代码段的内容&#xff0c;即模版的快捷入口&#xff1b;body里…

【RT-DETR有效改进】可变形大核注意力 | Deformable-LKA适用于复杂背景或不同光照场景

👑欢迎大家订阅本专栏,一起学习RT-DETR👑 一、本文介绍 本文给大家带来的改进内容是Deformable-LKA(可变形大核注意力)。Deformable-LKA结合了大卷积核的广阔感受野和可变形卷积的灵活性,有效地处理复杂的视觉信息。这一机制通过动态调整卷积核的形状和大小来适…

Java实现Redis延时队列

“如何实现Redis延时队列”这个面试题应该也是比较常见的&#xff0c;解答如下&#xff1a; 使用sortedset&#xff08;有序集合&#xff09; &#xff0c;拿时间戳作为 score &#xff0c;消息内容作为key 调用 zadd 来生产消息&#xff0c;消费者用zrangebyscore 指令获取 N …

【Vuforia+Unity】01实现单张多张图片识别产生对应数字内容

1.官网注册 Home | Engine Developer Portal 2.下载插件SDK&#xff0c;导入Unity 3.官网创建数据库上传图片&#xff0c;官网处理成数据 下载好导入Unity&#xff01; 下载好导入Unity&#xff01; 下载好导入Unity&#xff01; 下载好导入Unity&#xff01; 4.在Unity设…

iconfont的使用(最详解)

目录 一、Iconfont是什么&#xff1f; 二、Iconfont如何使用 1.官网注册 2.新建项目 3.项目中使用 Unicode方式 Font class方式 Symbol方式 三、总结 一、Iconfont是什么&#xff1f; iconfont是阿里旗下的一套图标库&#xff0c;UI设计师设计号图标后&#xff0c;会…

FL Studio21.2注册激活码免费版安装包下载

FL Studio 21的音乐编辑功能强大而全面&#xff0c;能够满足音乐制作人在音乐创作过程中的各种需求。以下是一些主要特点&#xff1a; FL Studio 21 Win-安装包下载如下: https://wm.makeding.com/iclk/?zoneid55981 FL Studio 21 Mac-安装包下载如下: https://wm.makedin…

Java 基于微信小程序的考研论坛系统,附源码

博主介绍&#xff1a;✌程序员徐师兄、7年大厂程序员经历。全网粉丝12w、csdn博客专家、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精彩专栏推荐订阅&#x1f447;…

使用SiteScan合理信息收集

一、介绍 作者&#xff1a;kracer 定位&#xff1a;专注一站式解决渗透测试的信息收集任务。 语言&#xff1a;python3开发 功能&#xff1a;包括域名ip历史解析、nmap常见端口爆破、子域名信息收集、旁站信息收集、whois信息收集、网站架构分析、cms解析、备案号信息收集、…

JVM性能调优P69+

方法 线程转储的查看方式 火焰图 线程耗尽问题

压缩感知(Compressed Sensing,CS)的基础知识

压缩感知&#xff08;Compressed Sensing&#xff0c;CS&#xff09;是一种用于信号处理的技术&#xff0c;旨在以少于奈奎斯特采样定理所要求的样本频率来重构信号。该技术利用信号的稀疏性&#xff0c;即信号可以用较少的非零系数表示。压缩感知在图像获取中的应用使得在采集…

《Solidity 简易速速上手小册》第3章:Solidity 语法基础(2024 最新版)

文章目录 3.1 变量和类型3.1.1 基础知识解析详细解析变量类型深入数据类型理解变量可见性 3.1.2 重点案例&#xff1a;创建一个简单的存储合约案例 Demo&#xff1a;编写一个简单的数字存储合约案例代码&#xff1a;SimpleStorage.sol在 Remix 中进行交互&#xff1a;拓展操作&…

css3盒子

盒子模型 一.看透网页布局本质二.认识盒子三.盒子的边框&#xff08;border&#xff09;1.概念2.简写及分开写法3.合并问题&#xff08;会相加&#xff09;4.边框会影响盒子实际大小 四.盒子的内边距&#xff08;padding&#xff09;1.概念2.简写3.内边距会影响盒子实际大小4.特…

小迪安全2023最新版笔记集合--续更

小迪安全2023最新版笔记集合–续更 小迪安全2023最新笔记集合 章节一 ---- 基础入门&#xff1a; 知识点集合&#xff1a; 应用架构&#xff1a;Web/APP/云应用/三方服务/负载均衡等 安全产品&#xff1a;CDN/WAF/IDS/IPS/蜜罐/防火墙/杀毒等 渗透命令&#xff1a;文件上传下…

代码随想录刷题笔记-Day20

1. 二叉树的最近公共祖先 236. 二叉树的最近公共祖先https://leetcode.cn/problems/lowest-common-ancestor-of-a-binary-tree/ 给定一个二叉树, 找到该树中两个指定节点的最近公共祖先。 百度百科中最近公共祖先的定义为&#xff1a;“对于有根树 T 的两个节点 p、q&#x…

NS安装-CentOS服务器安装Nightscout CGM

NS CGM 安装必要条件 有自己的云服务器好像没有2&#xff0c;有云服务器就行了 安装顺序 先安装数据库&#xff0c;目前支持的是 MongoDB &#xff0c;官方推荐4&#xff0c;其实目前最新版本就行。可以用宝塔安装&#xff0c;比较简单克隆代码&#xff0c;我是放到 /opt/ns…

自动化上位机开发C#100例:如何用面向对象的方式封装雷赛运动控制卡EtherCAT总线卡(C#代码)

自动化上位机开发C#100例:雷赛运动控制卡EtherCAT总线卡C#封装类 文章目录 LTDMC.dll下载LTDMC.cs LTDMC.dll C#调用封装下载ICard.cs 运动控制卡接口Card.cs 运动控制卡抽象类CardLTDMC.cs 雷赛运动控制卡EtherCAT总线卡实现类CardList.cs 总线卡列表封装 LTDMC.dll下载 最新…

HarmonyOS4.0系列——08、整合UI常用组件

HarmonyOS4.0 系列——08、UI 组件 Blank Blank 组件在横竖屏占满空余空间效果 // xxx.ets Entry Component struct BlankExample {build() {Column() {Row() {Text(Button).fontSize(18)Blank()Toggle({type: ToggleType.Switch}).margin({top: 14,bottom: 14,left: 6,righ…

Cannot invoke “java.sql.Connection.prepareStatement(String)“ because “conn“

下载sqlite-jdbc&#xff0c;放在目录下&#xff0c;然后IDEA右键jar文件选择“加入库”即可解决 Central Repository: org/xerial/sqlite-jdbc/3.36.0.1

第一个 Angular 项目 - 动态页面

第一个 Angular 项目 - 动态页面 使用的所有技巧都在下面的笔记里&#xff1a; [Angular 基础] - 数据绑定(databinding) [Angular 基础] - 指令(directives) 以上为静态页面&#xff0c;即不涉及到跨组件交流的内容 以下涉及到组件内的沟通&#xff0c;从这开始数据就“活”…

板块一 Servlet编程:第四节 HttpServletResponse对象全解与重定向 来自【汤米尼克的JAVAEE全套教程专栏】

板块一 Servlet编程&#xff1a;第四节 HttpServletResponse对象全解与重定向 一、什么是HttpServletResponse二、响应数据的常用方法三、响应乱码问题字符流乱码字节流乱码 四、重定向&#xff1a;sendRedirect请求转发和重定向的区别 在上一节中&#xff0c;我们系统的学习了…
最新文章