Flink之JDBC Sink

这里介绍一下Flink Sink中jdbc sink的使用方法,以mysql为例,这里代码分为两种,事务和非事务

  • 非事务代码
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.sql.PreparedStatement;
import java.sql.SQLException;

/**
 * @Author: J
 * @Version: 1.0
 * @CreateTime: 2023/8/2
 * @Description: 测试
 **/
public class FlinkJdbcSink {
    public static void main(String[] args) throws Exception {
        // 构建流环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 这里使用的是自定义数据源CustomizeBean(name,age,gender,hobbit),为了方便测试,换成任何数据源都可,只要和最后的要写入的表结构匹配即可
        DataStreamSource<CustomizeBean> customizeSource = env.addSource(new CustomizeSource());
        // 构建jdbc sink
        SinkFunction<CustomizeBean> jdbcSink = JdbcSink.sink(
                "insert into t_user(`name`, `age`, `gender`, `hobbit`) values(?, ?, ?, ?)", // 数据插入sql语句
                new JdbcStatementBuilder<CustomizeBean>() {
                    @Override
                    public void accept(PreparedStatement pStmt, CustomizeBean customizeBean) throws SQLException {
                        pStmt.setString(1, customizeBean.getName());
                        pStmt.setInt(2, customizeBean.getAge());
                        pStmt.setString(3, customizeBean.getGender());
                        pStmt.setString(4, customizeBean.getHobbit());
                    }
                }, // 字段映射配置,这部分就和常规的java api差不多了
                JdbcExecutionOptions.builder()
                        .withBatchSize(10) // 批次大小,条数
                        .withBatchIntervalMs(5000) // 批次最大等待时间
                        .withMaxRetries(1) // 重复次数
                        .build(), // 写入参数配置
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withDriverName("com.mysql.jdbc.Driver")
                        .withUrl("jdbc:mysql://lx01:3306/test_db?useSSL=false")
                        .withUsername("root")
                        .withPassword("password")
                        .build() // jdbc信息配置
        );
        // 添加jdbc sink
        customizeSource.addSink(jdbcSink);
        env.execute();
    }
}
  • 事务代码
import com.mysql.cj.jdbc.MysqlXADataSource;
import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.util.function.SerializableSupplier;

import javax.sql.XADataSource;

/**
 * @Author: J
 * @Version: 1.0
 * @CreateTime: 2023/8/2
 * @Description: 测试
 **/
public class FlinkJdbcSink {
    public static void main(String[] args) throws Exception {
        // 构建流环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 这里使用的是自定义数据源CustomizeBean(name,age,gender,hobbit),为了方便测试,换成任何数据源都可,只要和最后的要写入的表结构匹配即可
        DataStreamSource<CustomizeBean> customizeSource = env.addSource(new CustomizeSource());

        // 每20秒作为checkpoint的一个周期
        env.enableCheckpointing(20000);
        // 两次checkpoint间隔最少是10秒
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10000);
        // 程序取消或者停止时不删除checkpoint
        env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // checkpoint必须在60秒结束,否则将丢弃
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // 同一时间只能有一个checkpoint
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // 设置EXACTLY_ONCE语义,默认就是这个
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // checkpoint存储位置
        env.getCheckpointConfig().setCheckpointStorage("file:///Users/xxx/data/testData/checkpoint");
        // 构建ExactlyOne sink,要注意使用exactlyOnceSink需要开启checkpoint
        SinkFunction<CustomizeBean> exactlyOneJdbcSink = JdbcSink.exactlyOnceSink(
                "insert into t_user(`name`, `age`, `gender`, `hobbit`) values(?, ?, ?, ?)", // 数据插入sql语句
                (JdbcStatementBuilder<CustomizeBean>) (pStmt, customizeBean) -> {
                    pStmt.setString(1, customizeBean.getName());
                    pStmt.setInt(2, customizeBean.getAge());
                    pStmt.setString(3, customizeBean.getGender());
                    pStmt.setString(4, customizeBean.getHobbit());
                }, // 字段映射配置,这部分就和常规的java api差不多了
                JdbcExecutionOptions.builder()
                        .withMaxRetries(0) // 设置重复次数
                        .withBatchSize(25) // 设置批次大小,数据条数
                        .withBatchIntervalMs(1000) // 批次最大等待时间
                        .build(),
                JdbcExactlyOnceOptions.builder()
                        // 这里使用的mysql,所以要将这个参数设置为true,因为mysql不支持一个连接上开启多个事务,oracle是支持的
                        .withTransactionPerConnection(true)
                        .build(),
                (SerializableSupplier<XADataSource>) () -> {
                    // XADataSource 就是JDBC连接,不同的是它是支持分布式事务的连接
                    MysqlXADataSource mysqlXADataSource = new MysqlXADataSource();
                    mysqlXADataSource.setUrl("jdbc:mysql://lx01:3306/test_db?useSSL=false"); // 设置url
                    mysqlXADataSource.setUser("root"); // 设置用户
                    mysqlXADataSource.setPassword("password"); // 设置密码
                    return mysqlXADataSource;
                }
        );
        // 添加jdbc sink
        customizeSource.addSink(exactlyOneJdbcSink);
        env.execute();
    }
}
  • pom依赖
        <!-- 在原有的依赖中加入下面两个内容 -->
        
        <!-- JDBC connector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- mysql驱动 -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.28</version>
        </dependency>
  • 结果
    在这里插入图片描述
    jdbc sink的具体使用方式大概就这些内容,还是比较简单的,具体应用还要结合实际业务场景.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/56980.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

opencv 31-图像平滑处理-方框滤波cv2.boxFilter()

方框滤波&#xff08;Box Filtering&#xff09;是一种简单的图像平滑处理方法&#xff0c;它主要用于去除图像中的噪声和减少细节&#xff0c;同时保持图像的整体亮度分布。 方框滤波的原理很简单&#xff1a;对于图像中的每个像素&#xff0c;将其周围的一个固定大小的邻域内…

vue3和typescript_组件

1 components下新建myComponent.vue 2 页面中引入组件&#xff0c;传入值&#xff0c;并且绑定事件函数。 3

【Valgrind】如何使用Valgrind监控内存

&#x1f449;博__主&#x1f448;&#xff1a;米码收割机 &#x1f449;技__能&#x1f448;&#xff1a;C/Python语言 &#x1f449;公众号&#x1f448;&#xff1a;测试开发自动化【获取源码商业合作】 &#x1f449;荣__誉&#x1f448;&#xff1a;阿里云博客专家博主、5…

计算机视觉与图形学-神经渲染专题-ConsistentNeRF

摘要 Neural Radiance Fields (NeRF) 已通过密集视图图像展示了卓越的 3D 重建能力。然而&#xff0c;在稀疏视图设置下&#xff0c;其性能显着恶化。我们观察到&#xff0c;在这种情况下&#xff0c;学习不同视图之间像素的 3D 一致性对于提高重建质量至关重要。在本文中&…

【LeetCode每日一题】——766.托普利茨矩阵

文章目录 一【题目类别】二【题目难度】三【题目编号】四【题目描述】五【题目示例】六【题目提示】七【题目进阶】八【解题思路】九【时间频度】十【代码实现】十一【提交结果】 一【题目类别】 矩阵 二【题目难度】 简单 三【题目编号】 766.托普利茨矩阵 四【题目描述…

【测试开发】Mq消息重复如何测试?

本篇文章主要讲述重复消费的原因&#xff0c;以及如何去测试这个场景&#xff0c;最后也会告诉大家&#xff0c;目前互联网项目关于如何避免重复消费的解决方案。 Mq为什么会有重复消费的问题? Mq 常见的缺点之一就是消息重复消费问题&#xff0c;产生这种问题的原因是什么呢…

16、外部配置源与外部配置文件及JSON配置

外部配置源与外部配置文件及JSON配置 application.properties application.yml 这些是配置文件&#xff0c; 命令行配置、环境变量配置、系统属性配置源&#xff0c;这些属于配置源。 外部配置源的作用&#xff1a; Spring Boot相当于对Spring框架进行了封装&#xff0c;Spri…

webrtc的回声消除延迟时间估算

叫回声消除的延迟时间估算不太合理&#xff0c;这里核心就是估算调用webrtc的条件边界&#xff0c;都知道webrtc回声消除的生效的前提就是一定要拿到远端声音的信息&#xff0c;然后拿近端声音和远端声音对齐&#xff0c;从近端声音中&#xff0c;结合远端声音模拟出远端声音在…

Windows用户如何安装新版本cpolar内网穿透超详细教程

Windows用户如何安装新版本cpolar内网穿透 文章目录 Windows用户如何安装新版本cpolar内网穿透 在科学技术高度发达的今天&#xff0c;我们身边充斥着各种电子产品&#xff0c;这些电子产品不仅为我们的工作带来极大的便利&#xff0c;也让生活变得丰富多彩。我们可以使用便携的…

[Python] Pylance 插件打开 Python 的类型检查

安装 Python 插件 2.打开一个 Python 文件 可以看到右下角有一个花括号和 Python 字样&#xff0c;点击花括号&#xff08;不是 Python 字样&#xff09;打开类型检查即可&#xff1a;

酷开系统 | 酷开科技,让数据变得更有价值!

身处信息时代&#xff0c;我们每个人时刻都在生成、传递和应用数据&#xff0c;数据已经成为了现代社会中宝贵的资源之一&#xff0c;而在人工智能领域&#xff0c;数据更是被称为人工智能的“燃料”。 而在AI的发展中&#xff0c;只有拥有高质量、多样性且充分代表性的数据集…

java 定时任务不按照规定时间执行

这里写目录标题 使用异步启动可能出现的问题排查代码中添加的定时任务步骤是否正确排查是否任务阻塞&#xff0c;如果定时任务出现异常阻塞后&#xff0c;将不会在次执行java中多个Scheduled定时器不执行为了让Scheduled效率更高&#xff0c;我们可以通过两种方法将定时任务变成…

vxworks文件系统分析

参考https://www.freebuf.com/articles/endpoint/335030.html 测试固件 https://service.tp-link.com.cn/detail_download_7989.html 固件提取 binwalk解压固件&#xff0c;在第一部分即为要分析的二进制文件&#xff0c;可以拖进ida分析 设置为arm小端字节序&#xff0c;点…

【HarmonyOS】性能优化之低代码开发加载多张轮播图

【关键字】 HarmonyOS、低代码开发、Swiper组件、性能优化、分页加载 写在前面 目前使用DevEco Studio的低代码工具开发元服务时&#xff0c;通过实际测试发现&#xff0c;Swiper组件加载多张轮播图时加载显示耗时较长&#xff08;实际测试网络状态一般的情况下显示耗时达到8…

ER系列路由器多网段划分设置指南

ER系列路由器多网段划分设置指南 - TP-LINK 服务支持 TP-LINK ER系列路由器支持划分多网段&#xff0c;可以针对不同的LAN接口划分网段&#xff0c;即每一个或多个LAN接口对应一个网段&#xff1b;也可以通过一个LAN接口与支持划分802.1Q VLAN的交换机进行对接&#xff0c;实现…

Stable Diffusion:网页版 体验 / AI 绘图

一、官网地址 Stable Diffusion Online 二、Stable Diffusion AI 能做什么 Stable Diffusion AI绘图是一种基于Stable Diffusion模型的生成式AI技术&#xff0c;能够生成各种类型的图像&#xff0c;包括数字艺术、照片增强和图像修复等。以下是一些可能的应用&#xff1a; …

怎么设置文件夹密码?文件夹密码设置方法合集

为文件夹设置密码可以有效地保护文件夹的数据安全&#xff0c;那么该怎么设置文件夹密码呢&#xff1f;下面我们来一起了解一下。 文件夹保护3000 想要简单快捷的为文件夹设置密码&#xff0c;那么&#xff0c;文件夹保护3000就是最佳的选择。它提供了3种文件夹保护方式&#…

IDEA项目实践——创建Java项目以及创建Maven项目案例、使用数据库连接池创建项目简介

系列文章目录 IDEA上面书写wordcount的Scala文件具体操作 IDEA创建项目的操作步骤以及在虚拟机里面创建Scala的项目简单介绍 目录 系列文章目录 前言 一 准备工作 1.1 安装Maven 1.1.1 Maven安装配置步骤 1.1.2 解压相关的软件包 1.1.3 Maven 配置环境变量 1.1.4 配…

vue2 el-carousel轮播图和文字一起改变

vue项目的话 安装一下element依赖 npm i element-ui -S在main入口文件引入element包 我在app文件里边去写的 <template><div class"w"><el-carousel height"460px"><el-carousel-item v-for"item in items" :key"i…

C++之观察者模式(发布-订阅)

目录 模式简介 介绍 优点 缺点 代码实现 场景说明 实现代码 运行结果 模式简介 观察者模式&#xff08;Observer Pattern&#xff09;&#xff0c;也叫我们熟知的发布-订阅模式。 它是一种行为型模式。 介绍 观察者模式主要关注的是对象的一对多的关系&#xff0c; …