【Flink-cdc-Mysql-To-Kafka】使用 Flinksql 利用集成的 connector 实现 Mysql 数据写入 Kafka

【Flink-cdc-Mysql-To-Kafka】使用 Flinksql 利用集成的 connector 实现 Mysql 数据写入 Kafka

  • 1)环境准备
  • 2)准备相关 jar 包
  • 3)实现场景
  • 4)准备工作
    • 4.1.Mysql
    • 4.2.Kafka
  • 5)Flink-Sql
  • 6)验证

1)环境准备

Linux 或者 Windows 端需要安装:Mysql,Kafka,Flink 等。(略)

2)准备相关 jar 包

  • flink-connector-jdbc_2.11-1.12.0.jar
  • mysql-connector-java-5.1.49.jar

下载地址:JDBC-Sql-Connector

在这里插入图片描述

在这里插入图片描述

  • flink-format-changelog-json-1.2.0.jar
  • flink-sql-connector-mysql-cdc-1.2.0.jar
  • flink-sql-connector-postgres-cdc-1.2.0.jar

下载地址:ververica/flink-cdc-connectors

在这里插入图片描述

备用下载地址:gitee地址(github上不去就下载源码,改好version自己打包)

  • flink-sql-connector-kafka_2.11-1.12.0.jar

下载地址:flink-sql-connector-kafka

  • 将下载好的包放在 Flink 的 lib 目录下

3)实现场景

1、首先确认MySQL是否开启binlog机制,log_bin = ON 为开启 (如下图)

在这里插入图片描述

2、如果是本地环境的 Mysql 按照下面方式开启 binlog

在 C:\ProgramData\MySQL\MySQL Server 5.7\my.ini 下添加

log_bin = mysql-bin
binlog_format = ROW
expire_logs_days = 30

3、重启 Mysql 服务

4)准备工作

4.1.Mysql

1、在 Mysql 中创建 source 表:

CREATE TABLE `mysql2kafka_cdc_test` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `eventId` varchar(255) DEFAULT NULL,
  `eventStDt` varchar(255) DEFAULT NULL,
  `bak6` varchar(255) DEFAULT NULL,
  `bak7` varchar(255) DEFAULT NULL,
  `businessId` varchar(255) DEFAULT NULL,
  `phone` varchar(255) DEFAULT NULL,
  `bak1` varchar(255) DEFAULT NULL,
  `bak2` varchar(255) DEFAULT NULL,
  `bak13` varchar(255) DEFAULT NULL,
  `bak14` varchar(255) DEFAULT NULL,
  `bak11` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8

2、写入数据的语句准备就绪

INSERT INTO mysql2kafka_cdc_test(
eventId,
eventStDt,
bak6,
bak7,
businessId,
phone,
bak1,
bak2,
bak13,
bak14,
bak11
) VALUES(
'111',
'2022-11-3023:37:49',
'测试',
'https://test?user',
'1727980911111111111111111111',
'12345678910',
'1234',
'2021-12-0100:00:00',
'1727980911111111111111111111',
'APP',
'TEST1'
);

4.2.Kafka

创建 Topic

5)Flink-Sql

  • source
set table.dynamic-table-options.enabled=true;
set table.exec.source.cdc-events-duplicate=true;

CREATE TABLE source_mysql_test(
  id INT,
  eventId STRING,
  eventStDt STRING,
  bak6 STRING,
  bak7 STRING,
  businessId STRING,
  phone STRING,
  bak1 STRING,
  bak2 STRING,
  bak13 STRING,
  bak14 STRING,
  bak11 STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH(
    'connector' = 'mysql-cdc',
    'hostname' = '${ip}',
    'port' = '${port}',
    'database-name' = 'test',
    'table-name' = 'mysql2kafka_cdc_test',
    'username' = '${username}',
    'password' = '${password}',
    'scan.startup.mode'='timestamp',
    'scan.startup.timestamp-millis' = '1692115200000'
);
  • sink
CREATE TABLE sink_kafka_test (
  id INT,
  eventId STRING,
  eventStDt STRING,
  bak6 STRING,
  bak7 STRING,
  businessId STRING,
  phone STRING,
  bak1 STRING,
  bak2 STRING,
  bak13 STRING,
  bak14 STRING,
  bak11 STRING,
  PRIMARY KEY (id) NOT ENFORCED
 ) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'test',
    'sink.parallelism' = '3',
    'key.format' = 'json',
    'value.format' = 'json',
    'properties.bootstrap.servers' = '${kafka-bootstrap-server}',
    'properties.security.protocol' = 'SASL_PLAINTEXT',
    'properties.sasl.kerberos.service.name' = 'kafka',
    'metadata.max.age.ms' = '300000'
);
  • insert
insert into sink_kafka_test select * from source_mysql_test;

6)验证

Mysql 中写入测试数据,Kafka-Topic 中观察是否有数据生成。

INSERT INTO mysql2kafka_cdc_test(
eventId,
eventStDt,
bak6,
bak7,
businessId,
phone,
bak1,
bak2,
bak13,
bak14,
bak11
) VALUES(
'111',
'2022-11-3023:37:49',
'测试',
'https://test?user',
'1727980911111111111111111111',
'12345678910',
'1234',
'2021-12-0100:00:00',
'1727980911111111111111111111',
'APP',
'TEST1'
);

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/251411.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

毅速:3D打印随形水路 提高良品率和生产效率的新利器

随着科技的不断发展,3D打印技术已经成为模具制造领域的一种重要技术。其中,模具随形水路的设计和制造是提高注塑产品良品率和生产效率的关键环节。 模具随形水路是一种根据产品形状设计的水路,可以更靠近产品,并在模具内热点集中区…

这一篇就够了!全套SpringBoot教程02

SpringBoot运维实用篇 基础篇发布以后,看到了很多小伙伴在网上的留言,也帮助超过100位小伙伴解决了一些遇到的问题,并且已经发现了部分问题具有典型性,预计将有些问题在后面篇章的合适位置添加到本套课程中,作为解决方…

【docker 】Compose 使用介绍

Docker Compose Docker Compose文档 Docker Compose GitHub地址 Docker Compose 是用于定义和运行多容器 Docker 应用程序的工具。通过 Compose,您可以使用 YML 文件来配置应用程序需要的所有服务。然后,使用一个命令,就可以从 YML 文件配…

【图的应用一:最小生成树】- 用 C 语言实现普里姆算法

目录 一、最小生成树 二、普里姆算法的构造过程 三、普里姆算法的实现 一、最小生成树 假设要在 n 个城市之间建立通信联络网,则连通 n 个城市只需要 n - 1 条线路。这时,自然会考虑这样一个问题,如何在最节省经费的前提下建立这个通信…

针对网页html中插入动图gif不能循环播放只播放一次的解决方案

针对网页html中插入动图gif不能循环播放只播放一次的解决方案 原因分析解决方案 原因分析 使用图片编辑软件制作的过程中未启用“循环播放”功能,这里以Photoshop为例,演示设置GIF图片循环播放的操作流程:所需材料:PS。第一步&am…

使用Audition录制电脑内部声音

在电脑上播放的媒体文件,包括视频和声音,很多是可以播放却无法保存的。例如一些网页播放的视频,或者在线播放的音乐。 视频的话,可以使用工具来截图,抓取GIF或录屏。 声音的话,也可以使用工具进行录制。这里…

【算法刷题】Day17

文章目录 1. 不同路径 II题干:算法原理:代码: 2. 在排序数组中查找元素的第一个和最后一个位置题干:算法原理:解法一:暴力解法 O(n)解法二:朴素二分解法三:查找区间左右端点 代码&am…

redis未授权漏洞复现

什么是redis redis就是个数据库,跟mysql不同的地方在于redis主要将数据存在内存中,读写速度非常快 redis未授权 其原因很简单,就是redis服务器在默认安装好不配置的情况下可以直接免密码登录,登录后在web目录写入一句话木马&am…

为什么选择国产WordPress:HelpLook的优势解析

如今网站建设可以说已经是企业必备。而在众多的网站建设工具中,WordPress无疑是其中的佼佼者。作为一款开源的CMS(内容管理系统),WordPress拥有丰富的插件和主题,以及强大的功能,使得用户可以轻松地构建出符…

web前端之若依二次开发经验、使用IDEA启动若依项目、sysConfigController报错提示的解决办法、环境搭建

MENU 前言前端创建路由的细节 后端启动后端项目细节 前言 1、官网地址 2、在线文档 3、演示地址 4、代码下载 5、野生版的若依开发文档 此文章包括前端和后端,记录开发中遇到的一些问题。 前端 创建路由的细节 1、从系统管理进入菜单管理页面创建菜单,菜…

最强Pose模型RTMO开源 | 基于YOLO架构再设计,9MB+9ms性能完爆YOLO-Pose

实时多人在图像中的姿态估计面临着在速度和精度之间实现平衡的重大挑战。尽管两阶段的上下文方法在图像中人数增加时会减慢速度,但现有的单阶段方法往往无法同时实现高精度和实时性能。 本文介绍了RTMO,这是一个单阶段姿态估计框架,通过在YOL…

03进程基础-学习笔记

Process 进程 进程为操作系统的基本调度单位,占用系统资源(cpu,内存)完成特定任务,所有说进程是操作系统的标准执行单元 进程与程序的差别 程序是静态资源,存储与电脑磁盘中(disk磁盘资源)程序执行后会创建进程,负责完成功能&a…

第十五章总结

一.输入/输出流 1.输入流 InputStrema类是字节输入流的抽象类,它是所有字节输入流的父类。 该类中所有方法遇到错误都会引发IOException异常。 read()方法:从输入流中读取数据的下一个字节。返回0~255的int字节值。如果因为已经到达流末尾而没有可用的…

完美解决labelimg xml转可视化中文乱码问题,不用matplotlib

背景简述 我们有一批标注项目要转可视化,因为之前没有做过,然后网上随意找了一段代码测试完美(并没有)搞定,开始疯狂标注,当真正要转的时候傻眼了,因为测试的时候用的是英文标签,实…

重生奇迹mu再生原石介绍

再生原石的作用: 可以通过坎特鲁提炼之塔的NPC艾尔菲丝提炼成功就可以可获得再生宝石。 再生原石的用法: 1、打怪获得再生原石去提炼之塔(进入坎特鲁遗址的141188位置的传送台)。 2、找到(艾儿菲丝)把原…

【程序】STM32 读取光栅_编码器_光栅传感器_7针OLED

文章目录 源代码工程编码器基础程序参考资料 源代码工程 源代码工程打开获取: http://dt2.8tupian.net/2/28880a55b6666.pg3这里做了四倍细分,在屏幕上显示 速度、路程、方向。 接线方法: 单片机--------------串口模块 单片机的5V-------…

【JAVA基础(对象和封装以及构造方法)】----第四天

对象和封装以及构造方法 面向对象和面向过程面向过程面向对象 类与对象及其使用定义类创建一个对象,操作类补充(成员变量和局部变量) private 修饰类 封装练习编写类编写测试输出结果 面向对象和面向过程 面向过程 在了解面向对象之前先来了…

C语言刷题每日一题——求1到100中包含数字9的整数的个数

思路分析 创建一个变量count记录个数使用一个for循环完成从1到100的循环每次循环判断该数字是否包含数字9——第一种情况 :个位包含9,即求模10的结果为9 ;第二种情况:十位包含9,即除以10的结果为9(两种情况…

【Vulnhub 靶场】【VulnCMS: 1】【简单】【20210613】

1、环境介绍 靶场介绍:https://www.vulnhub.com/entry/vulncms-1,710/ 靶场下载:https://download.vulnhub.com/vulncms/VulnCMS.ova 靶场难度:简单 发布日期:2021年06月13日 文件大小:1.4 GB 靶场作者:to…

Stable Diffusion - High-Resolution Image Synthesis with Latent Diffusion Models

Paper name High-Resolution Image Synthesis with Latent Diffusion Models Paper Reading Note Paper URL: https://arxiv.org/abs/2112.10752 Code URL: https://github.com/CompVis/latent-diffusion TL;DR 2021 年 runway 和慕尼黑路德维希马克西米利安大学出品的文…