【Flink-1.17-教程】-【四】Flink DataStream API(7)输出算子(Sink)

【Flink-1.17-教程】-【四】Flink DataStream API(7)输出算子(Sink)

  • 1)连接到外部系统
  • 2)输出到文件
  • 3)输出到 Kafka
  • 4)输出到 MySQL(JDBC)
  • 5)自定义 Sink 输出

Flink 作为数据处理框架,最终还是要把计算处理的结果写入外部存储,为外部应用提供支持。

在这里插入图片描述

1)连接到外部系统

Flink 的 DataStream API 专门提供了向外部写入数据的方法:addSink。与 addSource 类似,addSink 方法对应着一个“Sink”算子,主要就是用来实现与外部系统连接、并将数据提交写入的;Flink 程序中所有对外的输出操作,一般都是利用 Sink 算子完成的。

Flink1.12 以前,Sink 算子的创建是通过调用 DataStream 的.addSink()方法实现的。

stream.addSink(new SinkFunction());

addSink 方法同样需要传入一个参数,实现的是 SinkFunction 接口。在这个接口中只需要重写一个方法 invoke(),用来将指定的值写入到外部系统中。这个方法在每条数据记录到来时都会调用。

Flink1.12 开始,同样重构了 Sink 架构,

stream.sinkTo()

当然,Sink 多数情况下同样并不需要我们自己实现。之前我们一直在使用的 print 方法其实就是一种 Sink,它表示将数据流写入标准控制台打印输出。Flink 官方为我们提供了一部分的框架的 Sink 连接器。如下图所示,列出了 Flink 官方目前支持的第三方系统连接器:

在这里插入图片描述

我们可以看到,像 Kafka 之类流式系统,Flink 提供了完美对接,source / sink 两端都能连接,可读可写;而对于 Elasticsearch、JDBC 等数据存储系统,则只提供了输出写入的 sink 连接器。

除 Flink 官方之外,Apache Bahir 框架,也实现了一些其他第三方系统与 Flink 的连接器。

在这里插入图片描述

除此以外,就需要用户自定义实现 sink 连接器了。

2)输出到文件

Flink 专门提供了一个流式文件系统的连接器:FileSink,为批处理和流处理提供了一个统一的 Sink,它可以将分区文件写入 Flink 支持的文件系统。

FileSink 支持行编码(Row-encoded)和批量编码(Bulk-encoded)格式。这两种不同的方式都有各自的构建器(builder),可以直接调用 FileSink 的静态方法:

  • 行编码: FileSink.forRowFormat(basePath,rowEncoder)。
  • 批量编码: FileSink.forBulkFormat(basePath,bulkWriterFactory)。

示例:

public class SinkFile {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // TODO 每个目录中,都有 并行度个数的 文件在写入
        env.setParallelism(2);

        // 必须开启checkpoint,否则一直都是 .inprogress
        env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);


        DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(
                new GeneratorFunction<Long, String>() {
                    @Override
                    public String map(Long value) throws Exception {
                        return "Number:" + value;
                    }
                },
                Long.MAX_VALUE,
                RateLimiterStrategy.perSecond(1000),
                Types.STRING
        );

        DataStreamSource<String> dataGen = env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "data-generator");

        // TODO 输出到文件系统
        FileSink<String> fieSink = FileSink
                // 输出行式存储的文件,指定路径、指定编码
                .<String>forRowFormat(new Path("f:/tmp"), new SimpleStringEncoder<>("UTF-8"))
                // 输出文件的一些配置: 文件名的前缀、后缀
                .withOutputFileConfig(
                        OutputFileConfig.builder()
                                .withPartPrefix("atguigu-")
                                .withPartSuffix(".log")
                                .build()
                )
                // 按照目录分桶:如下,就是每个小时一个目录
                .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd HH", ZoneId.systemDefault()))
                // 文件滚动策略:  1分钟 或 1m
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(Duration.ofMinutes(1))
                                .withMaxPartSize(new MemorySize(1024*1024))
                                .build()
                )
                .build();


        dataGen.sinkTo(fieSink);

        env.execute();
    }
}

3)输出到 Kafka

1、添加 Kafka 连接器依赖。

由于我们已经测试过从 Kafka 数据源读取数据,连接器相关依赖已经引入,这里就不重复介绍了。

2、启动 Kafka 集群。

3、编写输出到 Kafka 的示例代码。

(1)输出无 key 的 record:

public class SinkKafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 如果是精准一次,必须开启checkpoint(后续章节介绍)
        env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);

        SingleOutputStreamOperator<String> sensorDS = env
                .socketTextStream("hadoop102", 7777);

        /**
         * Kafka Sink:
         * TODO 注意:如果要使用 精准一次 写入Kafka,需要满足以下条件,缺一不可
         * 1、开启checkpoint(后续介绍)
         * 2、设置事务前缀
         * 3、设置事务超时时间:   checkpoint间隔 <  事务超时时间  < max的15分钟
         */
        KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
                // 指定 kafka 的地址和端口
                .setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092")
                // 指定序列化器:指定Topic名称、具体的序列化
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.<String>builder()
                                .setTopic("ws")
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build()
                )
                // 写到kafka的一致性级别: 精准一次、至少一次
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                // 如果是精准一次,必须设置 事务的前缀
                .setTransactionalIdPrefix("atguigu-")
                // 如果是精准一次,必须设置 事务超时时间: 大于checkpoint间隔,小于 max 15分钟
                .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10*60*1000+"")
                .build();

        sensorDS.sinkTo(kafkaSink);

        env.execute();
    }
}

(2)自定义序列化器,实现带 key 的 record:

public class SinkKafkaWithKey {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
        env.setRestartStrategy(RestartStrategies.noRestart());

        SingleOutputStreamOperator<String> sensorDS = env
                .socketTextStream("hadoop102", 7777);

        /**
         * 如果要指定写入kafka的key
         * 可以自定义序列器:
         * 1、实现 一个接口,重写 序列化 方法
         * 2、指定key,转成 字节数组
         * 3、指定value,转成 字节数组
         * 4、返回一个 ProducerRecord对象,把key、value放进去
         *
         */
        KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
                .setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092")
                .setRecordSerializer(

                        new KafkaRecordSerializationSchema<String>() {

                            @Nullable
                            @Override
                            public ProducerRecord<byte[], byte[]> serialize(String element, KafkaSinkContext context, Long timestamp) {
                                String[] datas = element.split(",");
                                byte[] key = datas[0].getBytes(StandardCharsets.UTF_8);
                                byte[] value = element.getBytes(StandardCharsets.UTF_8);
                                return new ProducerRecord<>("ws", key, value);
                            }
                        }
                )
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setTransactionalIdPrefix("atguigu-")
                .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + "")
                .build();

        sensorDS.sinkTo(kafkaSink);

        env.execute();
    }
}

4、运行代码,在 Linux 主机启动一个消费者,查看是否收到数据。

[hadoop102 ~]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic ws

4)输出到 MySQL(JDBC)

写入数据的 MySQL 的测试步骤如下。

1、添加依赖。

添加 MySQL 驱动:

<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.27</version>
</dependency>

官方还未提供 flink-connector-jdbc 的 1.17.0 的正式依赖,暂时从 apache snapshot 仓库下载,pom 文件中指定仓库路径:

<repositories>
<repository>
<id>apache-snapshots</id>
<name>apache snapshots</name>
<url>https://repository.apache.org/content/repositories/snapshots/</url>
</repository>
</repositories>

添加依赖:

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
<version>1.17-SNAPSHOT</version>
</dependency>

如果不生效,还需要修改本地 maven 的配置文件,mirrorOf 中添加如下标红内容:

<mirror>
<id>aliyunmaven</id>
<mirrorOf>*,!apache-snapshots</mirrorOf>
<name>阿里云公共仓库</name>
<url>https://maven.aliyun.com/repository/public</url>
</mirror>

2、启动 MySQL,在 test 库下建表 ws。

mysql>
CREATE TABLE `ws` (
`id` varchar(100) NOT NULL,
`ts` bigint(20) DEFAULT NULL,
`vc` int(11) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

3、编写输出到 MySQL 的示例代码。

public class SinkMySQL {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("hadoop102", 7777)
                .map(new WaterSensorMapFunction());

        /**
         * TODO 写入mysql
         * 1、只能用老的sink写法: addsink
         * 2、JDBCSink的4个参数:
         *    第一个参数: 执行的sql,一般就是 insert into
         *    第二个参数: 预编译sql, 对占位符填充值
         *    第三个参数: 执行选项 ---》 攒批、重试
         *    第四个参数: 连接选项 ---》 url、用户名、密码
         */
        SinkFunction<WaterSensor> jdbcSink = JdbcSink.sink(
                "insert into ws values(?,?,?)",
                new JdbcStatementBuilder<WaterSensor>() {
                    @Override
                    public void accept(PreparedStatement preparedStatement, WaterSensor waterSensor) throws SQLException {
                        //每收到一条WaterSensor,如何去填充占位符
                        preparedStatement.setString(1, waterSensor.getId());
                        preparedStatement.setLong(2, waterSensor.getTs());
                        preparedStatement.setInt(3, waterSensor.getVc());
                    }
                },
                JdbcExecutionOptions.builder()
                        .withMaxRetries(3) // 重试次数
                        .withBatchSize(100) // 批次的大小:条数
                        .withBatchIntervalMs(3000) // 批次的时间
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://hadoop102:3306/test?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8")
                        .withUsername("root")
                        .withPassword("000000")
                        .withConnectionCheckTimeoutSeconds(60) // 重试的超时时间
                        .build()
        );

        sensorDS.addSink(jdbcSink);

        env.execute();
    }
}

4、运行代码,用客户端连接 MySQL,查看是否成功写入数据。

5)自定义 Sink 输出

如果我们想将数据存储到我们自己的存储设备中,而 Flink 并没有提供可以直接使用的连接器,就只能自定义 Sink 进行输出了。与 Source 类似,Flink 为我们提供了通用的 SinkFunction 接口和对应的 RichSinkDunction 抽象类,只要实现它,通过简单地调用 DataStream 的 .addSink() 方法就可以自定义写入任何外部存储。

stream.addSink(new MySinkFunction<String>());

在实现 SinkFunction 的时候,需要重写的一个关键方法 invoke(),在这个方法中我们就可以实现将流里的数据发送出去的逻辑。

这种方式比较通用,对于任何外部存储系统都有效;不过自定义 Sink 想要实现状态一致性并不容易,所以一般只在没有其它选择时使用。实际项目中用到的外部连接器 Flink 官方基本都已实现,而且在不断地扩充,因此自定义的场景并不常见。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/344284.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

2024年第十二届亚洲机械与材料工程国际会议(ACMME 2024)即将召开!

时间&#xff1a;2024年6月14-17日 地点&#xff1a;日本京都先端科学大学太秦校区 会议官网&#xff1a;第11届ACMME |日本京都 2024年第十二届亚洲机械与材料工程会议 &#xff08;ACMME 2024&#xff09;将于2024年6月14日-17日在日本京都先端科学大学召开。亚洲机械与材料…

STL第二讲

第二讲 视频标准库源码版本&#xff1a;gnu c 2.9.1/4.9/Visual C OOP vs GP GP是将datas与methods分开&#xff0c;OOP相反&#xff1b; 为什么list不能使用全局的sort&#xff1f; 因为sort源代码&#xff1a; *(first (last - first)/2) // 此迭代器只能是随机访问迭代…

Jedis(一)与Redis的关系

一、Jedis介绍&#xff1a; 1、背景&#xff1a; Jedis是基于Java语言的Redis的客户端&#xff0c;Jedis Java Redis。Redis不仅可以使用命令来操作&#xff0c;现在基本上主流的语言都有API支持&#xff0c;比如Java、C#、C、PHP、Node.js、Go等。在官方网站里有一些Java的…

多维时序 | Matlab实现GWO-TCN-Multihead-Attention灰狼算法优化时间卷积网络结合多头注意力机制多变量时间序列预测

多维时序 | Matlab实现GWO-TCN-Multihead-Attention灰狼算法优化时间卷积网络结合多头注意力机制多变量时间序列预测 目录 多维时序 | Matlab实现GWO-TCN-Multihead-Attention灰狼算法优化时间卷积网络结合多头注意力机制多变量时间序列预测效果一览基本介绍程序设计参考资料 效…

如何自己制作一个属于自己的小程序?

在这个数字化时代&#xff0c;小程序已经成为了我们生活中不可或缺的一部分。它们方便快捷&#xff0c;无需下载安装&#xff0c;扫一扫就能使用。如果你想拥有一个属于自己的小程序&#xff0c;不论是为了个人兴趣&#xff0c;还是商业用途&#xff0c;都可以通过编程或者使用…

VisualODX——ODX数据自动转换工具 加快开发进度

在创建ODX数据库的过程中&#xff0c;我们需要录入大量的数据以及应对多种数据格式。这不仅费时费力&#xff0c;而且还需很高的人力成本&#xff0c;且其错误率也非常高&#xff0c;从而导致开发速度缓慢、效率低下。基于多年的汽车行业诊断经验&#xff0c;我们开发了VisualO…

SpringBoot+Vue充电桩管理系统 附带详细运行指导视频

文章目录 一、项目演示二、项目介绍三、运行截图四、主要代码1. 分页获取预约数据代码2.保存预约信息代码3.修改订单状态代码 一、项目演示 项目演示地址&#xff1a; 视频地址 二、项目介绍 项目描述&#xff1a;这是一个基于SpringBootVue框架开发的充电桩管理系统。首先&…

【面试突击】微信亿级朋友圈的社交系统设计

微信亿级朋友圈的社交系统设计 先来说一下业务需求吧&#xff1a; 每个用户可以发朋友圈&#xff0c;可以点赞&#xff0c;评论可以设置权限&#xff0c;不看某些人朋友圈、不让某些人看你的朋友圈可以刷朋友圈中其他人的动态 对于这样的系统设计&#xff0c;主要从业务来考虑…

行测-判断:3.定义判断

行测-判断&#xff1a;3.定义判断 每道题先给出一个概念的定义&#xff0c;然后分别列出四种情况&#xff0c;要求报考者严格依据定义选出一个最符合或者最不符合该定义的答案。 A 1. 读得准 1.1 关键词&#xff08;主体&#xff0c;客体&#xff09; A B C&#xff0c;C选项…

2024年护眼台灯选购指南▏好视力、书客、欧普值得购买吗?

最近&#xff0c;护眼台灯备受关注&#xff0c;许多博主纷纷推崇。考虑到孩子即将放寒假&#xff0c;市场上的产品琳琅满目&#xff0c;因此我决定认真研究一番&#xff0c;辨别其中的劣质和精品。我选择了市场口碑较好的三款产品&#xff0c;进行了深入评估&#xff0c;主要从…

Tuya MiniApp 设计指南

一. 简介 小程序以其轻量、便捷的特性&#xff0c;在移动端 App 中被越来越广泛地使用。Tuya 作为物联网生态的头部 App 企业之一&#xff0c;开放 Tuya MiniApp 开发能力&#xff0c;以帮助开发者更好地服务用户。 对于开发者&#xff0c;Tuya MiniApp 以全新的开放模式&…

【Linux】文件周边001之系统文件IO

&#x1f440;樊梓慕&#xff1a;个人主页 &#x1f3a5;个人专栏&#xff1a;《C语言》《数据结构》《蓝桥杯试题》《LeetCode刷题笔记》《实训项目》《C》《Linux》《算法》 &#x1f31d;每一个不曾起舞的日子&#xff0c;都是对生命的辜负 目录 前言 1.C语言文件IO 1.1…

二维码登录实现流程

二维码登录实现 如何实现登录&#xff1f; 二维码登录本质上也是一种登录认证方式。既然是登录认证&#xff0c;要做的也就两件事情&#xff01; 告诉系统我是谁向系统证明我是谁 比如账号密码登录&#xff0c;账号就是告诉系统我是谁&#xff0c; 密码就是向系统证明我是谁…

2023年第十六届中国系统架构师大会(SACC2023):核心内容与学习收获(附大会核心PPT下载)

大会以“数字转型 架构演进”为主题&#xff0c;聚焦系统架构在数字化转型中的演进和应用。 与往届相比&#xff0c;本届大会最大的变化是从原来的大会演讲模式变革为专题研讨会模式。专题研讨会主题内容紧扣行业落地实践痛点与难点&#xff0c;多角度聚焦行业的架构演进之路。…

RabbitMQ进阶篇【理解➕应用】

&#x1f973;&#x1f973;Welcome 的Huihuis Code World ! !&#x1f973;&#x1f973; 接下来看看由辉辉所写的关于RabbitMQ的相关操作吧 目录 &#x1f973;&#x1f973;Welcome 的Huihuis Code World ! !&#x1f973;&#x1f973; 一.什么是交换机 1.概念释义 2.例…

揭秘亚信安慧AntDB15年平稳运行的升级改造经验

亚信安慧AntDB是一款备受认可的国产化数据库系统&#xff0c;它在国内市场中积累了丰富的升级改造经验。光是大的版本升级就已经实现了8次&#xff0c;让AntDB持续保持了15年的平稳运行。本文将深入探索AntDB的升级改造之路&#xff0c;揭示背后的细节。 AntDB作为一款自主研发…

k8s图形化管理工具rancher

Rancher和K8s的关系&#xff0c;Rancher和K8s区别对比。简单来说&#xff0c;K8s&#xff08;Kubernetes&#xff09;为企业提供了一种一致的方式来管理任何计算基础架构&#xff0c;Rancher则是用于管理位于任何位置的Kubernetes集群的完整平台。如果用户是自己手动部署K8s集群…

java web mvc-08-Grails 入门介绍

拓展阅读 Spring Web MVC-00-重学 mvc mvc-01-Model-View-Controller 概览 web mvc-03-JFinal web mvc-04-Apache Wicket web mvc-05-JSF JavaServer Faces web mvc-06-play framework intro web mvc-07-Vaadin web mvc-08-Grails 开源 The jdbc pool for java.(java …

2024年,AIGC赛道专利文献和软著大全

一、周红伟-深度学习国际发明专利 深度学习国际发明专利 基于深度学习的图像检索方法及装置&#xff0c;专利公开公告号&#xff1a;CN107368614A。专利类型&#xff1a;发明公布。发明人&#xff1a;周红伟;李凯;任伟;李庆;郭奇杰;周杨;刘川郁 二、机器学习算法发表文献 Simul…

多窗口大小和Ticker分组的Pandas滚动平均值

最近一个学弟在在进行数据分析时&#xff0c;经常需要计算不同时间窗口的滚动平均线。当数据是多维度的&#xff0c;比如包含多个股票或商品的每日价格时&#xff0c;我们可能需要为每个维度计算滚动平均线。然而&#xff0c;如果我们使用传统的groupby和apply方法&#xff0c;…
最新文章