参考视频:
【尚硅谷】大数据Debezium开源工具(安装/部署/使用)_哔哩哔哩_bilibili
介绍:
- Debezuim是用于捕获变更数据的开源工具,可以响应数据库的所有插入,修改,删除操作。
- 它是一种CDC工具,工作原理和大家所熟悉的Canal,Maxwell一样,均是抽取 数据库日志 来变更的
- Debezuim是构建在 Apach Kafka之上,并提供Kafka连接器来监视特定的数据库管理
- 官网: https://debezuim.io/
架构
1 基于Kafka connector 部署Debezuim
1.1 source connector : 负责把数据库中的记录发送到Kafka(Debezuim)
1.2 sink connector : 负责把kafka topic中的数据发送到其他系统,如flume,es等,这个就不是debezuim做的了
2 使用Debezuim Server 部署 Debezuim(用的不多)
Debezuim服务器是一个可配置的,随时可用的应用程序。它将变化数据从源数据库流式传输到其他系统(目前只支持sink到kinesis)
3 嵌入式引擎(最大的优点)
Debezuim 作为嵌入式引擎,内置到其他应用中,然后就可以实时读取数据库的变化数据
目前比较火的Flink的CDC,就是Debezuim
安装
准备:
Mysql,Kafka,Zookeeper
1 数据库准备
1.1 mysql开启binlog
sudo vim /etc/my.cnf
[mysqld]
server-id=1
log-bin=mysql-bin
binlog_format=row
注意:
不可修改my.cnf的权限,否则mysql就会忽略这个配置文件
修改之后重启mysql: sudo systemctl restart mysqld
检查binlog是否开启成功: mysql -uroot -paaaaaa -e "show variables like '%log_bin%'"
查看到 log_bin 为 on 状态
2 准备测试的数据库和表
3 安装Mysql Connector
3.1 下载
mysql connector : debezuim-connector-mysq-1.7.1.Final-plugin.tar.gz
3.2 解压
mkdir -p /opt/.../debezuim/connector # 创建一个目录,用于解压
tar -zxvf debezuim-connector-mysq-1.7.1.Final-plugin.tar.gz -C /opt/.../debezuim/connector
4 Kafka里配置
cd ../kafka-2.4.1/config
vim connect-distributed.properties
[distributed.properties]
#kafka地址
bootstrap.servers=ip:9092,ip2:9092,ip3:9092
group.id= #自己配置
# 默认都是json格式,不做更改
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# 是否存schema格式,格式会更复杂,根据需要可以开启或关闭
key.converter.schemas.enable=false
value.converter.schemas.enable=false
#offset的副本数,安全点设置成2
offset.storage.partitions=2
#放开插件的参数,配置到这一步即可,再往下配置会识别不出来
plugin.path=/opt/.../debezuim/connector
分发配置文件到其他集群,重启Kafka集群
5 启动各个组件
启动 mysql connector
/opt/.../kafka-2.4.1/bin/connect-distributed.sh -daemon /opt/.../kafka-2.4.1/config/connect-distributed.properties
jps
查看ConnectDistributed是否存在
如果报错没有起来,则查看日志
cd /opt/.../kafka-2.4.1/log
cat connectDistributed.out
6 检测Kafka Connector是否正常工作
# 检测kafka连接器的服务状态
curl -H 'Accept:application/json' ip:8083
# 检查像kafka connect 注册的 连接器列表
curl -H 'Accept:application/json' ip:8083/connectors
7 部署 Debezuim Mysql Connector
配置信息
{
# 连接器的名字
“name”:"mydebezuim-mysql-connector",
"config":{
"connector.class": "io.debezuim.connector.mysql.MySqlConnector", #写死的
"database.host":"ip", # 数据库的所在地址
"database.port":"3306",
"database.user":"root",
"database.password":"123456",
"database.server.id":"184054", # 连接器作为slave来工作,不和主机id一样即可
"database.server.name":"bigdata", # 服务器名,会成为topic的前缀
"database.include.list":"database1,database2", #要监控的数据库列表,不列则监控所有
"database.history.kafka.bootstrap.servers":"hadoop2:9092", # 存一些历史数据
"database.history.kafka.topic ":"schema-changes.inventory", # schema的一些配置
}
}
生成的topic : [database.server.name].[database.include.list].[表名] ,
eg : bigdata.database1.student
注册连接信息
curl -i -X POST -H "Appect:application/json" -H "Connect-Type:application/json" ip/8083/connectors/ -d '上面的配置信息json'
快照:
debezuim第一次启动,默认情况会把mysql的数据全都采集过来(以后只监控变化数据,再次重启也不会有快照),称为快照(canal没有此功能)
注意:
oracle数据库不推荐,可能会出现一些问题
问答:
1 Debezuim能不能同步历史数据?
答: 可以。使用参数: snapshot.mode=initial 来控制,当第一次监控某个表的时候会先同步所有历史数据,然后再通过mysql的binlog监控所有变更的数据
2 Debezuim同步历史数据(做快照)的时候,这个时候如果有向Mysql的表写数据怎么办?
答:给表加锁: snapshot.locking.mode=minimal
3 默认是一个表一个topic,是否支持某个库下所有的表进入同一个topic?
答:支持,使用topic.routing.但有前提条件,这些表需要有相同的schema,也就是表结构
可参考文档:
Debezium是什么-CSDN博客实时数据同步利器debezium教程_debezium server-CSDN博客Debezium是什么-CSDN博客
CDC工具之Debezium-CSDN博客
spring 结合debezium :Debezium的基本使用(以MySQL为例)-CSDN博客