10分钟了解Flink SQL使用

Flink 是一个流处理和批处理统一的大数据框架,专门为高吞吐量和低延迟而设计。开发者可以使用SQL进行流批统一处理,大大简化了数据处理的复杂性。本文将介绍Flink SQL的基本原理、使用方法、流批统一,并通过几个例子进行实践。

1、Flink SQL基本原理

Flink SQL建立在Apache Flink之上,利用Flink的强大处理能力,使得用户可以使用SQL语句进行流数据和批数据处理。Flink SQL既支持实时的流数据处理,也支持有界的批数据处理。

Flink SQL用SQL作为处理数据的接口语言,将SQL语句转换成数据流图(Dataflow Graph),再由Flink引擎执行。

2、Flink SQL固定编码套路

使用Flink SQL时,我们通常会遵循如下编码套路,这些套路和使用Flink API的套路是一样的:

  • 环境准备:初始化一个TableEnvironment对象,它是执行Flink SQL语句的核心。这个环境可以是流数据环境,也可以是批数据环境。

  • 数据源定义:通过CREATE TABLE语句定义输入数据源(source),可以是Kafka、CSV文件等。

  • 数据处理:编写SQL语句对数据进行处理,如查询、过滤、聚合等。

  • 数据输出:通过CREATE TABLE定义输出数据源(sink),并将处理结果输出。

3、Flink SQL代码示例

以下是一个从CSV文件读取数据,通过SQL查询,再将数据输出到CSV的完整例子。

  • 先准备input.csv文件内容,如下:

1,product_A,10.5
2,product_B,20.3
3,product_C,15.8
1,product_D,12.2
2,product_A,18.7

  • 编写demo代码

编写代码之前先在pom.xml中添加依赖:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-csv</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
</dependency>

示例代码如下:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkSqlDemo {
    public static void main(String[] args) throws Exception {
        // 设置环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1); //为了方便测试看效果,这里并行度设置为1
        // 使用EnvironmentSettings创建StreamTableEnvironment,明确设置为批处理模式
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .inBatchMode() // 设置为批处理模式,这样后续才能一次性的输出到csv中
                .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        // 定义输入数据源
        String createSourceTableDdl = "CREATE TABLE csv_source (" +
                " user_id INT," +
                " product STRING," +
                " order_amount DOUBLE" +
                ") WITH (" +
                " 'connector' = 'filesystem'," +
                " 'path' = 'file:///path/input.csv'," +
                " 'format' = 'csv'" +
                ")";
        tableEnv.executeSql(createSourceTableDdl);

//        // 编写 SQL 查询
//        String query = "SELECT user_id, SUM(order_amount) AS total_amount FROM csv_source GROUP BY user_id";
//        // 执行查询并打印
//        tableEnv.executeSql(query).print();
//        env.execute("Flink SQL Demo");

        // 定义输出数据源
        String createSinkTableDdl = "CREATE TABLE csv_sink (" +
                " user_id INT," +
                " total_amount DOUBLE" +
                ") WITH (" +
                " 'connector' = 'filesystem'," +
                " 'path' = 'file:///path/output.csv'," +
                " 'format' = 'csv'" +
                ")";
        tableEnv.executeSql(createSinkTableDdl);

        // 执行查询并将结果输出到csv_sink
        String query = "INSERT INTO csv_sink " +
                "SELECT user_id, SUM(order_amount) as total_amount " +
                "FROM csv_source " +
                "GROUP BY user_id";
        tableEnv.executeSql(query);
//        env.execute("Flink SQL Job");
    }
}

  • 执行结果如下:

4、Flink SQL做流批统一

什么是流批统一?

流批统一是大数据处理领域的一个概念,它指的是使用一套代码来同时处理流数据(Streaming)和批数据(Batching)。

流处理和批处理的区别如下:

  1. 批处理(Batch Processing):批处理是指在某一时间点处理大量数据的手段。它通常涉及到对大量静止的(不再变化的)数据集进行一次性的处理。批处理作业通常在数据集完整可用后开始执行,并且经常是在数据仓库中进行。例如,一个电商平台可能在一天结束时运行一个批处理作业来处理当天所有的交易记录。

  2. 流处理(Stream Processing):流处理是指对数据实时进行处理,通常是数据生成或接收的同时立即进行。流处理适用于连续的数据输入,这些数据一直在变化,需要立即响应。例如,社交媒体平台在接收到新的帖子时,可能会实时分析这些帖子的内容和流行趋势。

在早期,流处理和批处理通常需要不同的系统来执行。对于批处理,可能使用如Hadoop这样的框架;而对于流处理,可能使用如Apache Storm这样的框架。这就导致开发者要同时学习多种框架才能处理不同类型的数据作业。

流批统一的概念,就是将这两种数据处理方式合并到一个平台中,这样一个系统既可以处理静止的大批量数据集,也可以处理实时的数据流。这样做的优点是显而易见的:

  • 统一的API:开发人员只需要学习和使用一套工具和API,可以共享更多的代码和逻辑。

  • 维护简便:只需维护一个系统,可以减少学习成本,减轻运维压力,减少故障点。

  • 灵活的数据处理:可以根据不同的业务需求灵活选择数据处理方式。

Flink SQL流批一体的实现原理

Flink很好的实现了流批统一,可以让开发人员用相同的方式来编写批处理和流处理程序。不论是对有界(批处理)还是无界(流处理)的数据源,Flink都可以使用相同的API和处理逻辑来处理数据。

Flink 通过内置的表抽象来实现流批一体,这里的"表"可以是动态变化的(例如,来自实时数据流的表)或是静态的(例如,存储在文件或数据库中的批量数据表)。Flink SQL引擎会根据数据的实际来源自动优化执行计划。

Flink SQL的流批统一核心在于三点:

  • 统一的API和SQL语义:Flink SQL提供一致的查询构建块(如窗口、时间处理函数),这些在流处理和批处理中语义一致,确保不同模式下行为的统一性。

  • 透明的状态处理:无论是流处理还是批处理,Flink都能够保持和恢复状态,为开发者提供一致的高容错性体验。

  • 多模态存储和处理能力:Flink SQL能够访问不同存储介质的数据,这意味着相同的SQL语句可以无缝在流数据和存储的批量数据上执行。

Flink SQL流批统一的代码示例

以下是一个完整的代码示例,用Flink来实现流批统一处理。Flink同时从Kafka 和 CSV读取数据,然后合并查询再输出结果:

  • 代码示例

代码中,先配置了Flink的流处理环境和表环境,然后用DDL语句在Flink中注册了Kafka和文件系统数据源。接着执行了一个SQL查询来合并来自这两种数据源的数据,并计算总金额。最后,打印出查询结果并开始执行Flink作业。

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class StreamBatchUnifiedDemo {
    public static void main(String[] args) throws Exception {
        // 设置流处理的环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        // Kafka 流处理表
        String createKafkaSourceDDL = "CREATE TABLE kafka_stream_orders (" +
                "order_id STRING," +
                "amount DOUBLE)" +
                "WITH (" +
                "'connector' = 'kafka'," +
                "'topic' = 'topic_test'," +
                "'properties.bootstrap.servers' = '10.20.1.26:9092'," +
                "'format' = 'json'," +
                "'scan.startup.mode' = 'latest-offset'" +
                ")";
        tableEnv.executeSql(createKafkaSourceDDL);

        // 文件系统批处理表
        String createFilesystemSourceDDL = "CREATE TABLE file_batch_orders (" +
                "order_id STRING," +
                "amount DOUBLE)" +
                "WITH (" +
                "'connector' = 'filesystem'," +
                "'path' = 'file:///Users/yclxiao/Project/bigdata/flink-blog/doc/input_order.csv'," +
                "'format' = 'csv'" +
                ")";
        tableEnv.executeSql(createFilesystemSourceDDL);

        // 执行统一查询,计算总金额
        Table resultTable = tableEnv.sqlQuery("SELECT SUM(amount) FROM (" +
                "SELECT amount FROM kafka_stream_orders " +
                "UNION ALL " +
                "SELECT amount FROM file_batch_orders)");

        // 打印结果
        tableEnv.toRetractStream(resultTable, Row.class).print();

        // 开始执行程序
        env.execute("Stream-Batch Unified Job");
    }
}

  • 执行效果

通过以上示例代码,可以看出Flink SQL的流批一体设计:相同的SQL语句可以用在流处理和批处理中,而不需要做任何修改。Flink背后的执行引擎会自动根据数据的特性(流或者批)来进行相应的优化执行。

这就是Flink SQL非常强大的地方,它减少了开发者需要写不同代码逻辑的需求,简化了复杂的数据处理流程。

5、总结

Flink SQL是一个非常强大的数据处理工具,可以应对多种复杂的数据处理场景。

本文主要介绍了Flink SQL的基本原理、编码套路、流批统一,再结合正确的代码示例进行实践。希望对你有帮助。

文章转载自:不焦躁的程序员

原文链接:https://www.cnblogs.com/mangod/p/18187474

体验地址:引迈 - JNPF快速开发平台_低代码开发平台_零代码开发平台_流程设计器_表单引擎_工作流引擎_软件架构

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/620507.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

AI论文写作生成器-AI自动生成论文-10分钟/万字论文

在当今这个高速发展的信息时代&#xff0c;科技的进步无疑给我们的工作、学习乃至生活带来了翻天覆地的变化。学术研究领域也不例外&#xff0c;近期一款新型的论文自动写作神器诞生了&#xff0c;它凭借人工智能技术的优势&#xff0c;为学者和研究人员撰写学术论文提供了极大…

c++opencv Project3 - License Plate Detector

俄罗斯车牌识别案例&#xff1a;实时识别车牌&#xff0c;并且读取到指定文件夹中。 惯例先展示结果图&#xff1a; 对于摄像头读取图片进行车牌匹配&#xff0c;原理和人脸识别其实是一致的。 利用训练好的模型进行匹配即可。可参考&#xff1a; 对视频实现人脸识别-CSDN博…

综合模型及应用(图论学习总结部分内容)

文章目录 前言六、综合模型及应用(以题目总结为主)分层图思想(包括拆点建图) e g 1 : 通信线路 eg1:通信线路 eg1:通信线路​​​[A-Telephone Lines](https://ac.nowcoder.com/acm/contest/1055/A)(蓝书例题) e g 2 : 小雨坐地铁 eg2:小雨坐地铁 eg2:小雨坐地铁​ [1012-小雨坐…

六甲基二硅烷用途广泛 可以工业副产物为原料进行生产

六甲基二硅烷用途广泛 可以工业副产物为原料进行生产 六甲基二硅烷&#xff0c;化学式为C6H18Si2&#xff0c;外观为无色透明液体状&#xff0c;不溶于水&#xff0c;可溶于乙醚、乙二醇、丙酮、苯、二甲苯、二甲醚等多种有机溶剂&#xff0c;有刺激性&#xff0c;有高度易燃性…

Agilent MSO9404A、Keysight MSO9404A示波器,4 GHz,4 通道,20 GSa/s

Agilent MSO9404A、Keysight MSO9404A、HP MSO9404A 示波器&#xff0c;4 GHz&#xff0c;4 通道&#xff0c;20 GSa/s Keysight MSO9404A 示波器配备 15 英寸 XGA 显示屏&#xff0c;封装深度仅为 9 英寸&#xff08;23 厘米&#xff09;&#xff0c;重量仅为 26 磅&#xff…

传统鞋业如何转型?3D数字化技术让鞋业品牌焕发新机!

数字经济时代&#xff0c;3D数字化技术在各行业都得到广泛应用&#xff0c;这其中&#xff0c;传统的鞋服行业的发展也受到了3D数字化技术的影响&#xff0c;产生了深刻的变化&#xff0c;越来越多的鞋企品牌开始尝试3D数字化营销。 比如&#xff0c;时尚运动品牌VANS就在官网上…

【Obsidian】视频笔记插件Media Extended的强大功能

我将开设一个专栏&#xff0c;介绍当下最好用的笔记软件Obsidian的使用经验和技巧。欢迎持续关注。 摘要&#xff1a;本文将首先向您介绍一款功能强大的笔记软件Obsidian&#xff0c;然后为您详细解析Obsidian的一款实用插件——Media Extended&#xff0c;帮助您更好地利用Obs…

【2024年5月备考新增】】 考前篇(1)《官方平台 - 考生模拟练习平台操作指南》

1 登录 登录中国计算机技术职业资格网(https://www.ruankao.org.cn),点击服务园地的【模拟练习】。 温馨提示:实名认证通过且注册成功的考生方可登录模拟练习。 2 下载模拟作答系统 温馨提示: 点击“下载”按钮,下载对应的模拟作答系统。未报名成功的考生不允许下载…

JPA@Entry报错Could not determine recommended JdbcType for Java type

问题很明显&#xff0c;无法自动决定类型&#xff0c;那就手动告诉该字段。 一、直接上解决方案 如果是一对一的关系用 OneToOne 如果是一对多的关系用 OneToMany 如果是多对一的关系用 ManyToOne 二、另一个无空构造函数的问题 使用注解后&#xff0c;注解报错找不到空的…

mac定时任务、自启动任务

https://quail.ink/mynotes/p/mac-startup-configuration-detailed-explanation <?xml version"1.0" encoding"UTF-8"?> <!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.d…

Visual Studio 中.net8.0(以前叫NET Core)框架和.net framewok 框架有什么区别?

更新vs到2022版本后&#xff0c;新建项目时就多出不少选项&#xff0c;这里来给大家分享下.net8.0&#xff08;以前叫NET Core&#xff09;框架和.net framewok的区别 如下图&#xff0c;不带后缀的就是使用.net8.0。 .net framewok框架选项&#xff1a; 正文开始&#xff1a;…

Ci24R1 (SOP8)2.4GHz无线收发一体、双向系统的智能家居芯片

Ci24R1 &#xff08;SOP8&#xff09;工作范围在2.4GHzISM频段&#xff0c;专为低系统应用成本的无线场合设计&#xff0c;集成嵌入式ARQ基带协议引擎的无线收发器芯片。它的工作频率范围为2400MHz-2525MHz&#xff0c;共有126个1MHz带宽的信道。 Ci24R1 &#xff08;SOP8&…

ThreadLocal 源码详解

概述 ThreadLocal是一个java提供的本地线程副本变量工具类。主要用于将私有线程和该线程存放的副本对象做一个映射&#xff0c;各个线程之间的变量互不干扰&#xff0c;在高并发场景下&#xff0c;可以实现无状态的调用&#xff0c;特别适用于各个线程依赖不通的变量值完成操作…

外卖点餐小程序平台源码系统 自由切换 轻松管理 附带源码代码包以及系统搭建教程

系统概述 外卖点餐小程序平台源码系统是一款集点餐、支付、配送、评价等功能于一体的综合性平台。该系统采用先进的互联网技术&#xff0c;支持多种支付方式&#xff0c;包括微信支付、支付宝等&#xff0c;同时支持多平台使用&#xff0c;包括微信小程序、支付宝小程序等。商…

游戏找不到steam_api64.dll如何解决,介绍5种简单有效的方法

面对“找不到steam_api64.dll&#xff0c;无法继续执行代码”的问题&#xff0c;许多游戏玩家或软件使用者可能会感到手足无措。这个错误提示意味着你的计算机系统在尝试运行某个游戏或应用程序时&#xff0c;无法定位到一个至关重要的动态链接库文件——steam_api64.dll&#…

大模型与AIGC应用相关问题 模型大型

最近经常被问&#xff0c;你看“万亿的模型都出来了&#xff0c;你们训练的千亿模型是不是落伍了&#xff1f;”我想说&#xff1a;“虽然都叫超大模型&#xff0c;但是类型是不一样的&#xff0c;虽说每一类模型训出来都不容易&#xff0c;不过澄清一下概念还是必要的”。 大…

C# WinForm —— 17 MaskedTextBox 介绍

1. 简介 本质是文本框&#xff0c;但它可以通过掩码来区分输入的正确与否&#xff0c;可以控制输入的格式、长度 主要应用场景是&#xff1a;需要格式化输入信息的情况 2. 常用属性 属性解释(Name)控件ID&#xff0c;在代码里引用的时候会用到,一般以 mtxt 开头AsciiOnly是否…

LNMP 环境下 Nginx 1.26.0 开启 HTTP/3 QUIC 支持

前几天 Nginx 1.26.0 主线版发布了&#xff0c;明月总算抽出时间更新了&#xff0c;那么自然的也要尝试一下开启 HTTP/3 QUIC 支持了&#xff0c;今天就给大家分享一下。对于我们的网站来说开启 HTTP/3 QUIC 最大的好处是页面载入速度的提升&#xff0c;尤其是在支持 HTTP/3 QU…

怎么批量下载视频?DY视频爬虫在线提取采集工具

短视频批量下载工具&#xff0c;具有多种模块和功能&#xff0c;方便用户快速批量下载短视频。该软件的详细介绍&#xff1a; 功能模块介绍&#xff1a; 一. 搜索词批量搜索下载 视频关键词添加&#xff1a;支持添加多个视频关键词进行全平台视频搜索。历史去重&#xff1a;…

以目录创建的conda环境添加到jupyter的kernel中

场景&#xff1a;由于某些原因&#xff0c;服务器上的conda环境不能通过--name的方式创建&#xff0c;只能通过指定目录即-p的方式&#xff0c;在这种情况下该环境在conda env list中没有显示&#xff0c;无法在jupyter kernel中搜到&#xff0c;只能手动添加。 1.进入环境 # …