DataStream API(输出算子)

源算子

源算子

转换算子

转换算子

输出算子

1.连接到外部系统

        连接外部系统是计算机科学和信息技术领域中常见的一个任务,通常涉及到与外部数据源或服务进行交互。具体的方法和工具会根据不同的应用场景和需求而有所不同。以下是一些常见的连接外部系统的方法:

  1. 应用程序接口(API):许多外部系统都提供API,以便其他系统可以与其进行交互。通过API,可以使用各种编程语言和工具来获取或交换数据。常见的API包括REST API和SOAP API。
  2. 数据集成工具:这些工具可以帮助将数据从各种来源(包括数据库、文件、API等)集成到一个中央位置。一些流行的数据集成工具包括Talend、Apache NiFi和 Informatica PowerCenter。
  3. 连接器或适配器:这些是专用的软件组件,用于连接到特定的外部系统。例如,一些CRM系统提供连接器,使其他应用程序可以与CRM系统中的数据进行交互。
  4. 编程语言和框架:许多编程语言和框架可用于连接到外部系统。例如,Python是一种流行的语言,可用于通过API或Web scraping技术连接到外部系统。
  5. 第三方服务:许多第三方服务提供连接外部系统的功能,例如OAuth、SAML和OpenID Connect等身份验证协议。这些服务可以与许多不同的系统进行交互,并提供安全的身份验证和授权机制。

        连接外部系统时需要考虑许多因素,例如安全性、性能、可靠性和兼容性。因此,选择适当的工具和技术,并仔细规划和测试连接过程是非常重要的。

2.输出到文件

要将数据流输出到文件,您可以使用 Flink 的 File Sink。以下是一个示例代码片段

import org.apache.flink.api.common.io.OutputFormat;  
import org.apache.flink.streaming.api.datastream.DataStream;  
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;  
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;  
import org.apache.flink.util.IOUtils;  
  
import java.io.BufferedWriter;  
import java.io.FileWriter;  
import java.io.IOException;  
  
// 创建 StreamExecutionEnvironment  
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();  
  
// 创建数据流  
DataStream<MyData> dataStream = ... // 创建数据流  
  
// 创建自定义的输出格式,用于将数据写入文件  
class MyOutputFormat implements OutputFormat<MyData> {  
    private String filePrefix = "output";  
    private int count = 0;  
    private String suffix = ".txt";  
    private String fieldDelimiter = "\t";  
    private BufferedWriter writer;  
  
    @Override  
    public void open(Configuration parameters) throws IOException {  
        filePrefix = parameters.getString("filePrefix", filePrefix);  
        suffix = parameters.getString("suffix", suffix);  
        fieldDelimiter = parameters.getString("fieldDelimiter", fieldDelimiter);  
        count = 0;  
        writer = new BufferedWriter(new FileWriter(filePrefix + count + suffix));  
    }  
  
    @Override  
    public void writeRecord(MyData record) throws IOException {  
        writer.write(record.getField1() + fieldDelimiter + record.getField2());  
        writer.newLine();  
    }  
  
    @Override  
    public void close() throws IOException {  
        writer.close();  
    }  
}  
  
// 创建 File Sink 并将其添加到数据流中  
dataStream.addSink(new RichSinkFunction<MyData>() {  
    private MyOutputFormat outputFormat;  
  
    @Override  
    public void open(Configuration parameters) throws Exception {  
        outputFormat = new MyOutputFormat();  
    }  
  
    @Override  
    public void invoke(MyData value, Context context) throws Exception {  
        outputFormat.writeRecord(value);  
    }  
});  
  
// 执行 Flink 作业并等待结果  
env.execute("Flink File Sink Example");

        在上面的示例中,我们首先创建了一个 StreamExecutionEnvironment 和一个数据流。接下来,我们定义了一个自定义的 OutputFormat 类,用于将数据写入文件。在 MyOutputFormat 类中,我们实现了 open() 方法来打开文件并设置文件名、字段分隔符等参数,writeRecord() 方法来写入数据到文件,以及 close() 方法来关闭文件。然后,我们创建了一个 RichSinkFunction,并使用 MyOutputFormat 作为序列化器。最后,我们将 Sink 添加到数据流中并执行 Flink 作业。请注意,在打开 OutputFormat 时,我们传递了作业的参数,以便在打开文件时使用。 

3.输出到 Kafka

要将数据流输出到 Kafka,您可以使用 Flink 的 Kafka Producer Sink。以下是一个示例代码片段

 

import org.apache.flink.streaming.api.datastream.DataStream;  
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;  
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;  
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;  
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;  
  
import java.util.Properties;  
  
// 创建 StreamExecutionEnvironment  
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();  
  
// 创建数据流  
DataStream<MyData> dataStream = ... // 创建数据流  
  
// 配置 Kafka 连接选项  
Properties properties = new Properties();  
properties.setProperty("bootstrap.servers", "localhost:9092");  
properties.setProperty("group.id", "test");  
  
// 创建 Kafka Producer Sink 并将其添加到数据流中  
dataStream.addSink(new FlinkKafkaProducer010<MyData>("topic", new MyDataSerializationSchema(), properties));  
  
// 执行 Flink 作业  
env.execute("Flink Kafka Sink Example");

         在上面的示例中,我们首先创建了一个 StreamExecutionEnvironment 和一个数据流。接下来,我们配置了 Kafka 连接选项,包括 Kafka Broker 的地址和消费者组的 ID。然后,我们使用 FlinkKafkaProducer010 类创建一个 Kafka Producer Sink,并设置要写入的 Kafka 主题、序列化器和 Kafka 连接属性。在本例中,我们自定义了一个 MyDataSerializationSchema 类来实现 KafkaSerializationSchema 接口,用于将 MyData 对象序列化为字节数组。最后,我们将 Sink 添加到数据流中并执行 Flink 作业。

4.输出到 Redis

要将数据流输出到 Redis,您可以使用 Flink 的 Redis Sink。以下是一个示例代码片段

import org.apache.flink.streaming.api.datastream.DataStream;  
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;  
import org.apache.flink.streaming.connectors.redis.RedisSink;  
import org.apache.flink.streaming.connectors.redis.common.config.RedisConfig;  
import org.apache.flink.streaming.connectors.redis.common.config.RedisConfigBuilder;  
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;  
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;  
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;  
import org.apache.flink.streaming.connectors.redis.common.mapper.StringRedisMapper;  
  
// 创建 StreamExecutionEnvironment  
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();  
  
// 创建数据流  
DataStream<MyData> dataStream = ... // 创建数据流  
  
// 配置 Redis 连接选项  
RedisConfig redisConfig = new RedisConfigBuilder()  
    .setHost("localhost")  
    .setPort(6379)  
    build();  
  
// 创建 Redis Sink 并将其添加到数据流中  
dataStream.addSink(RedisSink.<String, MyData>builder()  
    // 定义 Redis Mapper,将 MyData 对象转换为 Redis 命令和参数  
    .setMapper(new StringRedisMapper<MyData>() {  
        @Override  
        public String getCommandDescription() {  
            return RedisCommandDescription.<MyData>builder()  
                // 设置 Redis 命令为 "SET"  
                .setCommand(RedisCommand.<MyData>set())  
                // 设置键名为 "key"  
                .setKey("key")  
                // 设置值为 MyData 对象的 col1 字段  
                .setValue(myData -> "value:" + myData.col1)  
                build();  
        }  
    })  
    // 配置 Redis 连接选项  
    .setRedisConfig(redisConfig)  
    build());  
  
// 执行 Flink 作业  
env.execute("Flink Redis Sink Example");

        在上面的示例中,我们首先创建了一个 StreamExecutionEnvironment,然后创建了一个数据流。接下来,我们配置了 Redis 连接选项,包括主机和端口。然后,我们使用 RedisSink 的 builder() 方法创建一个 Redis Sink,并设置 Redis Mapper。在 Redis Mapper 中,我们将 MyData 对象转换为 Redis 命令和参数。在本例中,我们使用了 StringRedisMapper,将 MyData 对象的 col1 字段作为值。最后,我们配置 Redis 连接选项并将 Sink 添加到数据流中。最后,我们执行 Flink 作业并等待结果。 

5.输出到 Elasticsearch

要将数据流输出到 Elasticsearch,您可以使用 Flink 的 Elasticsearch Sink。以下是一个示例代码片段

import org.apache.flink.streaming.api.datastream.DataStream;  
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;  
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;  
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink;  
import org.apache.flink.streaming.connectors.elasticsearch.common.config.ESClusterConfig;  
import org.apache.flink.streaming.connectors.elasticsearch.common.config.ESClusterConfigBuilder;  
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSinkFunctionImpl;  
  
// 创建 StreamExecutionEnvironment  
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();  
  
// 创建数据流  
DataStream<MyData> dataStream = ... // 创建数据流  
  
// 配置 Elasticsearch 连接选项  
ESClusterConfig esClusterConfig = new ESClusterConfigBuilder()  
    .setClusterName("my-cluster")  
    .setHost("localhost")  
    .setPort(9200)  
    .build();  
  
// 创建 Elasticsearch Sink 并将其添加到数据流中  
dataStream.addSink(ElasticsearchSink.sink(esClusterConfig, new ElasticsearchSinkFunctionImpl<MyData>() {  
    @Override  
    public void sink(MyData value, OutputCollector<Void> out) {  
        out.collect(new MapRecord<>(Map.<String, Object>of("field1", value.col1, "field2", value.col2)));  
    }  
}));  
  
// 执行 Flink 作业  
env.execute("Flink Elasticsearch Sink Example");

         在上面的示例中,我们首先创建了一个 StreamExecutionEnvironment,然后创建了一个数据流。接下来,我们配置了 Elasticsearch 连接选项,包括集群名称、主机和端口。然后,我们使用 ElasticsearchSink 将数据流写入 Elasticsearch。在 ElasticsearchSink 的构造函数中,我们传递了 Elasticsearch 连接选项和 ElasticsearchSinkFunction。在 ElasticsearchSinkFunction 中,我们将 MyData 对象转换为 MapRecord,其中包含要写入 Elasticsearch 的字段和值。最后,我们执行 Flink 作业并等待结果。

6.输出到 MySQLJDBC

要将数据流输出到 MySQL 数据库,您需要使用 JDBC(Java Database Connectivity)连接器。以下是一个示例代码片段

import org.apache.flink.api.common.functions.RuntimeContext;  
import org.apache.flink.streaming.api.datastream.DataStream;  
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;  
import org.apache.flink.streaming.connectors.jdbc.JdbcConnectionOptions;  
import org.apache.flink.streaming.connectors.jdbc.JdbcSink;  
import org.apache.flink.streaming.connectors.jdbc.JdbcStatementBuilder;  
import org.apache.flink.streaming.connectors.jdbc.JdbcStatementBuilderImpl;  
import org.apache.flink.streaming.connectors.jdbc.JdbcType;  
  
// 创建 StreamExecutionEnvironment  
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();  
  
// 创建数据流  
DataStream<MyData> dataStream = ... // 创建数据流  
  
// 配置 JDBC 连接选项  
JdbcConnectionOptions jdbcOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()  
    .withUrl("jdbc:mysql://localhost:3306/mydatabase")  
    .withDriverName("com.mysql.jdbc.Driver")  
    .withUsername("username")  
    .withPassword("password")  
    .build();  
  
// 创建 JDBC Sink 并将其添加到数据流中  
dataStream.addSink(JdbcSink.sink(  
    "INSERT INTO mytable (col1, col2) VALUES (?, ?)", // SQL 语句模板,使用占位符表示参数  
    (JdbcStatementBuilder<MyData>) (ps, value) -> { // 参数转换函数,将 MyData 对象转换为 JDBC 参数  
        ps.setString(1, value.col1);  
        ps.setInt(2, value.col2);  
    },  
    JdbcType.STRING, // JDBC 参数类型,用于类型转换和占位符替换  
    jdbcOptions));  
  
// 执行 Flink 作业  
env.execute("Flink JDBC Sink Example");

        在上面的示例中,我们首先创建了一个 StreamExecutionEnvironment,然后创建了一个数据流。接下来,我们配置了 JDBC 连接选项,包括 JDBC URL、驱动程序名称、用户名和密码。然后,我们使用 JdbcSink 将数据流写入 MySQL 数据库。在 JdbcSink 的构造函数中,我们提供了 SQL 语句模板和参数转换函数。在参数转换函数中,我们将 MyData 对象转换为 JDBC 参数。最后,我们执行 Flink 作业并等待结果。

7.自定义 Sink 输出

        Flink 是一个流处理框架,它提供了强大的功能来处理实时数据流。与 Source 类似,Flink 也提供了 SinkFunction 接口和 RichSinkFunction 抽象类,用于将数据写入外部存储。通过实现这些接口,开发人员可以自定义写入任何外部存储的方式,如数据库、文件系统、消息队列等。

        通过简单地调用 DataStream 的 addSink() 方法,并将自定义的 SinkFunction 传递给该方法,开发人员可以轻松地将数据写入外部存储。这样,开发人员可以更加灵活地处理数据流,并将处理后的结果存储在所需的位置。

        Flink 的 SinkFunction 接口定义了将数据写入外部存储的基本操作。开发人员需要实现该接口,并覆盖其中的一些方法,如 open()、invoke() 和 close()。RichSinkFunction 是 SinkFunction 的一个扩展,它提供了更多的功能和灵活性,如处理异常和延迟触发等。

        Flink 的 SinkFunction 和 RichSinkFunction 抽象类为开发人员提供了灵活的数据写入方式,使得开发人员可以轻松地将数据流写入到任何外部存储中。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/343359.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

什么是 Web3.0

什么是Web3.0 对于 Web3.0 的解释网上有很多&#xff0c;目前来说 Web3.0 是一个趋势&#xff0c;尚未有明确的定义。我们今天讨论下几个核心的点&#xff0c;就能很好的理解 Web3.0 要解决哪些问题 谁创造数据&#xff0c;这里的数据可以是一篇博客&#xff0c;一段视频&…

Linux的例行性工作(计划任务)

目录 一、单一执行的例行性任务--at&#xff08;一 次性&#xff09; 1、安装 2、启动服务 3、at命令详解 1&#xff09;格式 2&#xff09;参数 3&#xff09;时间格式 4、实例 二、循环执行的例行性任务-- crontab&#xff08;周期性&#xff09; 1、crontd服务 2…

【Go面试向】defer与time.sleep初探

【Go面试向】defer与time.sleep初探 大家好 我是寸铁&#x1f44a; 总结了一篇defer传参与time.sleep初探的文章✨ 喜欢的小伙伴可以点点关注 &#x1f49d; 请大家看下面这段代码&#xff0c;看运行结果会出现什么&#xff0c;为什么&#xff1f; 问题 demo package mainim…

基于SpringBoot Vue航空机票预订系统

大家好✌&#xff01;我是Dwzun。很高兴你能来阅读我&#xff0c;我会陆续更新Java后端、前端、数据库、项目案例等相关知识点总结&#xff0c;还为大家分享优质的实战项目&#xff0c;本人在Java项目开发领域有多年的经验&#xff0c;陆续会更新更多优质的Java实战项目&#x…

Qt-QFileDialog保存文件及获取带扩展名的文件名

正确用法 QFileDialog dialog(this, "Save File", QDir::currentPath(), "Text Files (.txt)"); dialog.setAcceptMode(QFileDialog::AcceptSave); dialog.setDefaultSuffix("txt"); // << if (!dialog.exec())return; QString fileName …

代码随想录算法训练营第28天 | 93.复原IP地址 + 78.子集 + 90.子集II

今日任务 93.复原IP地址 78.子集 90.子集II 93.复原IP地址 - Medium 题目链接&#xff1a;力扣&#xff08;LeetCode&#xff09;官网 - 全球极客挚爱的技术成长平台 有效 IP 地址 正好由四个整数&#xff08;每个整数位于 0 到 255 之间组成&#xff0c;且不能含有前导 0&a…

第一篇【传奇开心果系列】beeware的toga开发移动应用:轮盘抽奖移动应用

系列博文目录 beeware的toga开发移动应用示例系列博文目录一、项目目标二、开发传奇开心果轮盘抽奖安卓应用编程思路三、传奇开心果轮盘抽奖安卓应用示例代码四、补充抽奖逻辑实现五、开发传奇开心果轮盘抽奖苹果手机应用编程思路六、开发传奇开心果轮盘抽奖苹果手机应用示例代…

仓储管理系统——软件工程报告(总体设计)③

总体设计 一、需求规定 软件工程仓库存储管理系统的需求规定是确保系统能够满足用户期望、提高工作效率、确保数据安全性和系统可维护性的基石。其涵盖了功能性、性能、数据管理、用户界面和系统可维护性等多个方面。通过严格的验收标准&#xff0c;可以确保系统在实际应用中…

Linux: hardware: HP: DIMM

今天遇到一个问题是服务器上BIOS检查DIMM出现错误&#xff1a; 462-Uncorrectable Memory Error Threshold Exceeded(Processor 1, DIMM 14). The DIMM is mapped out and is currently not available. Action: Take corrective action for the failing DIMM. Re-map all DIMMs…

eNSP学习——配置通过STelnet登陆系统

目录 背景 实验内容 实验目的 实验步骤 实验拓扑 详细配置过程 基础配置 配置SSH server 配置SSH client 配置SFTP server 与client 背景 由于Telnet缺少安全的认证方式&#xff0c;而且传输过程采用的是TCP进行明文传输。单纯的提供Telnet服务容易招致主机IP地址欺骗、路…

机器学习之matplotlib学习

matplotlib库学习 matplotlib库的介绍折线图的绘制导入excel表数据绘制折线图 柱状图的绘制散点图的绘制扇形图的绘制总结 matplotlib库的介绍 折线图的绘制 绘制折线图使用plot函数进行绘制 第一个参数为x 横坐标&#xff0c;第二个参数为y纵坐标&#xff0c;第三个参数为线的…

linux的kali安装,换源,更新包

下载kali kali.org进入官网后点第二个 然后点第一个 解压kali 下载后获得.7z压缩包&#xff0c;建议移动到合适自己电脑的位置进行解压&#xff0c;我喜欢放在D盘 启动kali 双击进入解压出的文件夹&#xff0c;将唯一一个.vmx文件用vmware打开&#xff08;没装的自行提前装…

关于使用jdk8自带的日期类getDayOfWeek()的详细解释

问题引入 我们会发现getDayOfWeek()这个函数和其他自带函数不一样 直接写会报错 但是如果我们将他变成getDayOfWeek().getValue() 又能够正常运行,我们这次就来看看是为什么 解释 进入getDayOfWeek()源码查看 我们进入getDayOfWeek()的源码中查看 我们可以发现他给我们返回的…

Android 内存优化 内存泄漏

内存抖动 内存抖动是由于短时间内有大量对象进出新生区导致的&#xff0c;内存忽高忽低&#xff0c;有短时间内快速上升和下落的趋势&#xff0c;分析图呈锯齿状。 它伴随着频繁的GC&#xff0c;GC 会大量占用 UI 线程和CPU 资源&#xff0c;会导致APP 整体卡顿&#xff0c;甚…

Linux下用树莓派DS18B20温度传感器读取温度并上传至服务端

目录 一、DS18B20温度传感器 二、逻辑分析 三、实战操作 1、服务端 2、客户端 3、运行结果 一、DS18B20温度传感器 DS18B20是比较常用到的温度传感器&#xff0c;采用单总线控制。是美国DALLAS半导体公司继DS1820之后最新推出的一种改进型智能温度传感器。关于该温度传感…

Spring为啥不推荐使用@Autowired注解?

引言 使用IDEA开发时&#xff0c;同组小伙伴都喜欢用Autowired注入&#xff0c;代码一片warning&#xff0c;看着很不舒服&#xff0c;Autowired作为Spring的亲儿子&#xff0c;为啥在IDEA中提示了一个警告&#xff1a;Field injection is not recommended 想搞清楚这个问题之…

Unity3d C#实现三维场景中图标根据相机距离动态缩放功能

前言 如题的需求&#xff0c;其实可以通过使用UI替代场景中的图标来实现&#xff0c;不过这样UI的处理稍微麻烦&#xff0c;而且需要在图标上添加粒子特效使用SpriteRender更方便快捷。这里就根据相机离图标的位置来计算图标的缩放大小即可。这样基本保持了图标的大小&#xf…

【51单片机】IO 扩展(串转并)--74HC595

0、前言 参考&#xff1a; 普中 51 单片机开发攻略 第12章 【51单片机入门教程-2020版 程序全程纯手打 从零开始入门】 https://www.bilibili.com/video/BV1Mb411e7re/?p21&share_sourcecopy_web&vd_source77e36f24add8dc77c362748ffb980148 nop()是什么语句&#…

LLM:RoPE - 开源代码中的实现 (下)

本文着重学习一下开源代码中关于RoPE的实现:ChatGLM-6B、ChatGLM2-6B、LLAMA 回顾一下RoPE位置编码: 1:对于 token 序列中的每个词嵌入向量,首先计算其对应的 query 和 key 向量 2:然后对每个 token 位置都计算对应的旋转位置编码 3:接着对每个 token 位置的 query 和 …

防御保护---信息安全概述

文章目录 前言一、pandas是什么&#xff1f;二、使用步骤 1.引入库2.读入数据总结 本章要求 了解信息安全的基本内容了解信息安全的脆弱性及安全攻击了解信息安全要素及整体安全解决方案 一.信息安全概述 信息安全概述 信息安全是指保护信息免受未经授权的访问、使用、披露、…
最新文章