Windows系统安装Flink及实现MySQL之间数据同步

        Apache Flink是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。Flink的设计目标是在所有常见的集群环境中运行,并以内存执行速度和任意规模来执行计算。它支持高吞吐、低延迟、高性能的流处理,并且是一个面向流处理和批处理的分布式计算框架,将批处理看作一种特殊的有界流。

Flink的主要特点包括:

  1. 事件驱动型:Flink是一个事件驱动型的应用,可以从一个或多个事件流提取数据,并根据到来的事件触发计算、状态更新或其他外部动作。
  2. 支持有状态计算:Flink提供了Extactor-once语义及checkpoint机制,支持带有事件操作的流处理和窗口处理,以及灵活的窗口处理(如时间窗口、大小窗口等)。
  3. 轻量级容错处理:Flink使用savepoint进行错误恢复,可以在出现故障时快速恢复任务。
  4. 高吞吐、低延迟、高性能:Flink的设计目标是在保证数据处理稳定性的同时,实现高吞吐、低延迟、高性能的流处理。
  5. 支持大规模集群模式:Flink支持在yarn、Mesos、k8s等大规模集群环境中运行。
  6. 支持多种编程语言:Flink对java、scala、python都提供支持,但最适合使用java进行开发。

        Flink的应用场景非常广泛,可以用于实时流数据的分析计算、实时数据与维表数据关联计算、实时数仓建设、ETL(提取-转换-加载)多存储系统之间进行数据转化和迁移等场景。同时,Flink也适用于事件驱动型应用场景,如以kafka为代表的消息队列等。

1.Winows系统安装Flink

下载地址:Downloads | Apache Flink

选择 Apache Flink 1.16.0 - 2022-10-28 (Binaries)

下载 flink-1.16.0-bin-scala_2.12.tgz

使用CMD窗口,在Flink安装路径/bin目录下启动start-cluster.bat

访问http://localhost:8081,界面如下:

2.使用Flink实现MySQL数据库之间数据同步(JAVA)

<flink.version>1.16.0</flink.version>
<flink-cdc.version>2.3.0</flink-cdc.version>

1.创建Flink流处理运行环境。

2.设置流处理并发数。

3.设置Flink存档间隔时间,单位为ms,当同步发生异常时会恢复最近的checkpoint继续同步。

4.在Flink中创建中间同步数据库。

5.在Flink中创建中间表flink_source,来源于MySQL表source,(注意connector为mysql-cdc)。

6.在Flink中创建中间表flink_sink,来源于MySQL表sink。

7.将Flink中间表来源表数据写入flink_sink表,Flink会根据MySQL binlog中source表变化,动态更新flink_sink表,同时会将flink_sink表数据写入MySQL sink表,实现MySQL数据持续同步。

package com.demo.flink;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkCdcMySql {
    public static void main(String[] args) {
        System.out.println("==========start run FlinkCdcMySql#main.");

        // 创建Flink流处理运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

//        StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("127.0.0.1", 8081);
        // 设置流处理并发数
        env.setParallelism(3);
        // 设置Flink存档间隔时间,单位为ms,当同步发生异常时会恢复最近的checkpoint继续同步
        env.enableCheckpointing(5000);

        final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // 在Flink中创建中间同步数据库
        tEnv.executeSql("CREATE DATABASE IF NOT EXISTS flink_test");

        // 在Flink中创建中间表flink_source,来源于MySQL表source
        // 注意connector为mysql-cdc
        tEnv.executeSql("CREATE TABLE flink_test.flink_source (\n" +
                "    id int,\n" +
                "    name varchar(255),\n" +
                "    create_time TIMESTAMP\n," + // Flink不支持datetime格式
                "    PRIMARY KEY (id) NOT ENFORCED" + //主键必须标明NOT ENFORCED
                ") WITH (\n" +
                "  'connector'  = 'mysql-cdc',\n" +
                "  'hostname'   = '127.0.0.1',\n" +
                "  'database-name'   = 'flink-source',\n" +
                "  'table-name' = 'source',\n" +
                "  'username'   = 'root',\n" +
                "  'password'   = 'root'\n" +
                ")");

        // 在Flink中创建中间表flink_sink,来源于MySQL表sink
        tEnv.executeSql("CREATE TABLE flink_test.flink_sink (\n" +
                "    id int,\n" +
                "    name varchar(255),\n" +
                "    create_time TIMESTAMP\n," +
                "    PRIMARY KEY (id) NOT ENFORCED" +
                ") WITH (\n" +
                "  'connector'  = 'jdbc',\n" +
                "  'url'        = 'jdbc:mysql://127.0.0.1:3306/flink-sink',\n" +
                "  'table-name' = 'sink',\n" +
                "  'driver'     = 'com.mysql.jdbc.Driver',\n" +
                "  'username'   = 'root',\n" +
                "  'password'   = 'root'\n" +
                ")");

//        Table transactions = tEnv.from("flink_source");
//        transactions.executeInsert("flink_sink");

        System.out.println("==========begin Mysql data cdc.");

        // 将Flink中间表来源表数据写入flink_sink表
        // Flink会根据MySQL binlog中source表变化,动态更新flink_sink表,同时会将flink_sink表数据写入MySQL sink表,实现MySQL数据持续同步
        tEnv.executeSql("INSERT INTO flink_test.flink_sink(id, name, create_time)\n" +
                "select id, name, create_time\n" +
                "from flink_test.flink_source\n");

        System.out.println("==========continue Mysql data cdc.");
    }

}

git代码地址:

flink-cdc-MySQL: FlinkCDC实现MySQL之间数据同步

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/375393.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

03 动力云客项目之登录功能后端实现

创建项目 使用Spring initializr初始化项目 老师讲的是3.2.0, 但小版本之间问题应该不大.

14.0 Zookeeper环球锁实现原理

全局锁是控制全局系统之间同步访问共享资源的一种方式。 下面介绍zookeeper如何实现全民锁&#xff0c;讲解他锁和共享锁两类全民锁。 排他锁 排他锁&#xff08;Exclusive Locks&#xff09;&#xff0c;又被称为写锁或独占锁&#xff0c;如果事务T1对数据对象O1加上排他锁…

2023年09月CCF-GESP编程能力等级认证C++编程一级真题解析

一、单选题(共15题,共30分) 第1题 我们通常说的“内存”属于计算机中的( )。 A:输出设备 B:输入设备 C:存储设备 D:打印设备 答案:C 第2题 以下C++不可以作为变量的名称的是( )。 A:redStar B:RedStar C:red_star D:red star 答案:D 第3题 C++表达式…

Linux C/C++ 原始套接字:打造链路层ping实现

在C/C中&#xff0c;我们可以使用socket函数来创建套接字。我们需要指定地址族为AF_PACKET&#xff0c;协议为htons(ETH_P_ALL)来捕获所有传入和传出的数据包。 可以使用sendto和recvfrom函数来发送和接收数据包。我们需要构建一个合法的链路层数据包&#xff0c;在数据包的头…

C++ //练习 4.23 因为运算符的优先级问题,下面这条表达式无法通过编译。根据4.12节中的表(第147页)指出它的问题在哪里?应该如何修改?

C Primer&#xff08;第5版&#xff09; 练习 4.23 练习 4.23 因为运算符的优先级问题&#xff0c;下面这条表达式无法通过编译。根据4.12节中的表&#xff08;第147页&#xff09;指出它的问题在哪里&#xff1f;应该如何修改&#xff1f; string s "word"; stri…

树莓派4b连接WQ9201外置无线网卡命令行配置详解

树莓派4B连接WQ9201无线网卡 接线方式 蓝色的线来连接树莓派和WQ9201demo板&#xff0c;USB接树莓派的USB接口&#xff0c;microUSB一端接demo板靠近天线部分的microUSB口。 驱动和固件准备 驱动直接放在树莓派系统的任意目录&#xff0c;目前配置则是将驱动放在树莓派的主目…

【漏洞复现】电信网关配置管理系统SQL注入漏洞

Nx01 产品简介 电信网关配置管理系统是一个用于管理和配置电信网络中网关设备的软件系统。它可以帮助网络管理员实现对网关设备的远程监控、配置、升级和故障排除等功能&#xff0c;从而确保网络的正常运行和高效性能。 Nx02 漏洞描述 电信网关配置管理系统存在SQL注入漏洞,攻…

【华为云】云上两地三中心实践实操

写在前面 应用上云之后&#xff0c;如何进行数据可靠性以及业务连续性的保障是非常关键的&#xff0c;通过华为云云上两地三中心方案了解相关方案认证地址&#xff1a;https://connect.huaweicloud.com/courses/learn/course-v1:HuaweiXCBUCNXI057Self-paced/about当前内容为华…

《动手学深度学习(PyTorch版)》笔记7.4

注&#xff1a;书中对代码的讲解并不详细&#xff0c;本文对很多细节做了详细注释。另外&#xff0c;书上的源代码是在Jupyter Notebook上运行的&#xff0c;较为分散&#xff0c;本文将代码集中起来&#xff0c;并加以完善&#xff0c;全部用vscode在python 3.9.18下测试通过&…

苹果macbook电脑删除数据恢复该怎么做?Mac电脑误删文件的恢复方法

苹果电脑删除数据恢复该怎么做&#xff1f;Mac电脑误删文件的恢复方法 如何在Mac上恢复误删除的文件&#xff1f;在日常使用Mac电脑时&#xff0c;无论是工作还是娱乐&#xff0c;我们都会创建和处理大量的文件。然而&#xff0c;有时候可能会不小心删除一些重要的文件&#x…

Stata学习(1)

一、五大窗口 Command窗口&#xff1a;实现人机交互 来导入一个自带数据&#xff1a; sysuse是导入系统自带的数据&#xff0c;auto导入该数据的名称&#xff0c;后面的clear是清除之前的数据 结果窗口&#xff1a;展示计算结果、查找功能 在Edit的find可以实现查找功能&#…

企业飞书应用机器人,使用python发送图文信息到群

企业飞书应用的自动化&#xff0c;需要创建企业应用&#xff0c;应用开通机器人能力&#xff0c;并获取机器人所需的app_id与app_secret&#xff08;这一部分大家可以在飞书的控制台获取&#xff1a;https://open.feishu.cn/api-explorer/&#xff09; 文章目录 步骤1&#xff…

第 383 场 LeetCode 周赛题解

A 边界上的蚂蚁 模拟 class Solution { public:int returnToBoundaryCount(vector<int> &nums) {int s 0;int res 0;for (auto x: nums) {s x;if (s 0)res;}return res;} };B 将单词恢复初始状态所需的最短时间 I 枚举&#xff1a;若经过 i i i 秒后 w o r d w…

c语言--指针运算

目录 一、指针-整数二、指针-指针2.1条件2.2两个指针指向同一块空间代码2.2.1运行结果 2.3两个指针指向不同块空间代码2.3.1运行结果 2.4总结 三、指针的关系运算3.1代码3.1.1运行结果3.1.2分析 一、指针整数 用数组举例&#xff1a; 因为数组在内存中是连续存放的&#xff0c…

证券公司vip快速交易通道是什么?是免费的吗?

券商VIP快速交易通道&#xff0c;又称为“快速通道”&#xff0c;是券商向交易所申请后提供给客户的一种特别服务。这是一种独立的交易单元&#xff0c;具有更快的传输速度和更高的优先级。在快速交易通道中&#xff0c;客户的订单数据会通过极速交易柜台发起&#xff0c;走的是…

vscode的ssh忽然连不上服务器:远程主机可能不符合glibc和libstdc++ VS Code服务器的先决条件

vscode自动更新了一下就发现连不上服务器了&#xff0c;我寻思估计一大堆人都寄了&#xff0c;一搜&#xff0c;果然哈哈哈哈 然后我直接搜一天内新发布的博客&#xff0c;还真给我搜到了这个问题&#xff0c;按照这个问题里面的回答&#xff08;vscode1.86无法远程连接waitin…

ES6扩展运算符——三个点(...)用法详解

目录 1 含义 2 替代数组的 apply 方法 3 扩展运算符的应用 &#xff08; 1 &#xff09;合并数组 &#xff08; 2 &#xff09;与解构赋值结合 &#xff08; 3 &#xff09;函数的返回值 &#xff08; 4 &#xff09;字符串 &#xff08; 5 &#xff09;实现了 Iter…

Git基础命令,分支,标签的使用【快速入门Git】

Git基础命令&#xff0c;分支&#xff0c;标签的使用【快速入门Git】 Git基础常用命令Git工作流程工作区&#xff0c;暂存区和版本库文件状态获取Git仓库 git init | git clone查看文件状态 git status暂存已修改的文件 git add 查看已暂存和未暂存的修改 git diff提交文件更改…

数据结构——D/二叉树

&#x1f308;个人主页&#xff1a;慢了半拍 &#x1f525; 创作专栏&#xff1a;《史上最强算法分析》 | 《无味生》 |《史上最强C语言讲解》 | 《史上最强C练习解析》 &#x1f3c6;我的格言&#xff1a;一切只是时间问题。 ​ 1.树概念及结构 1.1树的概念 树是一种非线性的…

excel 导出 The maximum length of cell contents (text) is 32767 characters

导出excel报错。错误日志提示&#xff1a;:The maximum length of cell contents (text) is 32767 characters 排查后&#xff0c;发现poi有单元格最大长度校验&#xff0c;超过32767会报错。 解决方案&#xff1a; 通过java反射机制&#xff0c;设置单元格最大校验限制为Int…