Opengauss到Oracle增量同步, 使用debezium

一、概述

PGOracle的同步方案使用debezium + kafka + kafka-connect-jdbcdebezium是一款开源的变更捕获软件,它以kafkaconnector形式运行,可以捕获PostgreSQLMySQLOracle中的变更数据,保存到kafkakafka-connect-jdbcconfluent公司的一款开源软件,以connector形式运行,可以从kafka读取变更数据,转换为PostgreSQLMySQLOracleSQLServerDB2等数据库的SQL语句,通过JDBC连接到数据库。

本方案用到的软件都是从开源代码编译而来,编译过程在第6章节。

二、kafkakafka connect

概述

kafka是一个分布式消息队列服务器。从逻辑角度讲,消息存储在称为topic的对象里(可以理解为一个topic就是一个消息队)kafka的客户端读/写消息时,需要指定topic名称。
可以用命令或代码创建topic、配置/查看属性、消息数。topic中的消息被读取后并没有删除,其它客户端仍可读取。客户端读取topic时,除了名称还需指定group id对象,它用于保存此客户端已经读到的消息的位置(offset)kafka是通过group id中的offset,来管理不同客户端对同一topic消费的不同进度。kafka把所有group idoffset保存在名为__consumer_offsettopic下。可以通过命令修改某个group idoffset。消息可以设置保存时间,超时自动删除,但无法用命令删除。topic中的消息只能以先进先出(FIFO)方式读取。

kafka的启动和配置

kafka的运行依赖zookeeper,集群中需要有zookeeper服务器,zookeeper是一个分布式数据库,一般用来存储集群中需要各节点共享的信息,保证信息的一致性,也可以用来实现分布式锁。

我下载的二进制kafka中包含了zookeeper服务器,在配置文件config/zookeeper.properties中,设置一下dataDir,作为实验其它参数默认即可:

注意,zookeeper服务器默认监听端口是2181

zookeeper和kafka都是java程序,注意配置一下java运行环境,我使用的是Java HotSpot 1.8,只要配置好JAVA_HOME和PATH两个环境变量即可,JAVA_HOME指向JDK解压后的目录,启动脚本会优先到JAVA_HOME里去找Java运行环境。

启动zookeeper,在kafka根目录下执行:

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

kafka的配置文件是config/server.properties,设置log.dirs,这是kafka存消息数据的目录,会占用较大磁盘,作为实验其它参数默认:

启动kafka,在kafka根目录下执行:

bin/kafka-server-start.sh -daemon config/server.properties

注意,kafka服务器默认监听端口是9092,kafka和zookeeper,客户端和kafka直接都是通过网络读写数据的。

我们用到的关于zookeeper、kafka、connect的脚本都在kafka的bin目录下:

消息的读写:topicgroup id 相关命令

创建topic

bin/kafka-topics.sh --create --topic my-test-topic --bootstrap-server 127.0.0.1:9092 --partitions 1 --replication-factor 1

查看指定topic属性:

bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 -describe --topic my-test-topic

修改topic属性:

bin/kafka-configs.sh --bootstrap-server 127.0.0.1:9092 --entity-type topics --alter --entity-name my-test-topic --add-config retention.ms=1000

bin/kafka-configs.sh --bootstrap-server 127.0.0.1:9092 --entity-type topics --alter --entity-name my-test-topic --delete-config retention.ms

列出所有topic

bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list

删除topic

bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --delete --topic my-test-topic

查看topic中有几条消息:

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 127.0.0.1:9092 --topic my-test-topic

查看有哪些消费者组:

./bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list

查看消费者组的偏移信息:

./bin/kafka-consumer-groups.sh --describe --bootstrap-server 127.0.0.1:9092 --group connect-sink-oracle

可以看到connect-sink-oracle中记录了多个topic的偏移量信息,LOG-END-OFFSET表示这个topic中总共有多少消息,CURRENT_OFFSET表示消费者当前已读取的消息偏移量,LAG就是还剩多少条消息没读,这个命令可以查看同步的进度。 

设置某个topic的CURRENT_OFFSET:

./bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group connect-sink-oracle  --topic my-test-topic --execute --reset-offsets --to-earliest

可以使用bin目录下的kafka-console-producer.sh和kafka-console-consumer.sh,向kafka收发消息:

./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic <topic-name>

./bin/kafka-console-consumer.sh --from-beginning --bootstrap-server localhost:9092 --topic <topic-name>

--from-beginning表示从topic的第一条消息开始读,否则只读最后一条消息之后的。

常用于调试,例如查看debezium在PG上做snapshot或捕获数据的状态:

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic connect-offsets

kafka connect

kafka connectkafka的一个组件,是一个独立于kafka服务器的Java程序,它可以统一管理多个connector注意connectconnector不同),connector用于连接不同数据库,读取数据库写到kafka,或将kafka中的消息写到数据库。一个connectorkafka的生产者或消费者客户端,是jar形式的java程序,放在kafkalibs目录下。从编程角度讲connector实现了kafka connectsinksource接口,kafka connect启动时会加载它们。用户可以通过kafka connectREST API接口配置connector的参数。

实现source接口的connector,是kafka的生产者,连接源数据库,从源数据库获取数据写到kafka

实现sink接口的connector,是kafka的消费者,连接目标数据库,从kafka读取数据,写入目的数据库。

connector写到kafka前,数据可以序列化(压缩),从kafka读出后再反序列化(解压),以降低网络和存储开销,这个工作由称为converter的Java程序来做,它以jar形式被connect加载

一个kafka connect进程称为一个workersource connectorsink connector都运行在这个进程中。

配置和启动kafka connect

kafka connect的配置文件在config/connect-distributed.properties,作为实验参数使用默认值就行。

注意:

  1. bootstrap.serverskafka服务器的IP:PORT
  2. group.idconnect worker作为消费者所使用的group id不能与sink connectorgroup id相同。
  3. plugin.path指定了connect到哪些目录下寻找jar。jdbcconnectorconverter都是jar形式的程序。为了实验简单,只指定kafka根目录下的libs,把所有用到的jar都复制到这里,建议使用绝对路径。

启动connect

./bin/connect-distributed.sh -daemon config/connect-distributed.properties

​​​​​​​​​​​​​​配置和启动connector

kafka connect启动时connctorconverterjar已被加载,还需要通过REST API配置和启动connector

下面的命令创建/启动了一个connector

curl -H "Content-Type: application/json" -X POST http://127.0.0.1:8083/connectors -d '@sink-oracle-156.json'

connect启动以后在8083端口监听REST API命令,向这个端口以http post发送json格式的配置数据,'@sink-oracle-156.json'指定了文件路径为当前目录下sink-oracle-156.json文件,是json格式文本,内容如下:

是source类型还是sink类型,取决于connector.class指向的类是实现source接口还是sink接口。这个connector的名称是sink-oracle,实现了sink接口(类io.confluent.connect.jdbc.JdbcSinkConnector实现了sink类),key.converter和value.converter设定所使用的的converter。

删除/停止名为sink-oracle的connector:

curl -X DELETE http://127.0.0.1:8083/connectors/sink-oracle

查看所有connector:

curl -X GET http://127.0.0.1:8083/connectors

查看某个connector状态:

curl -X GET http://127.0.0.1:8083/connectors/<connector name>/status|jq

配置和部署avro-converterschema-registry

源数据库的每一条变更记录,默认被转换成了json字符串,包含了元数据和数据,然后发送到kafka,下面就是update emp set ename = 'TOMAS', job = 'ENGINEER' where empno=2 and ename='SMITH' 对应的json字符串:

这样有很大冗余,有两个办法可以降低冗余:
一、使用压缩的序列化格式;
二、使用元数据服务器共享元数据,用key值获得某个表的元数据,消息只携带数据和key值,消费者使用key值查询完整的元数据来解析消息。
avro-converterschema-registry共同完成这个工作。本方案的使用的avro-converterschema-registry来自confluentinc公司的开源代码,编译过程见后面章节。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/151154.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

第一型曲面积分的第二型曲面积分的区别与联系【从几何知识的角度思考】

此处为曲线积分------>【问题思考总结】第一型曲线积分和第二型曲线积分的区别与联系【从几何知识的角度思考】 一二型曲面积分有什么区别&#xff1f;&#xff08;了解&#xff09; 一型曲面积分&#xff1a; 由dS进行表示。可以想像&#xff0c;dS是一个面积微元&#x…

MySQL数据库清理Relay_Log_File日志

背景 “Relay_Log_File” 是 MySQL 中用于复制的参数之一。在 MySQL 复制中&#xff0c;当一个服务器作为主服务器&#xff08;master&#xff09;时&#xff0c;它会将其更改写入二进制日志文件&#xff08;binary log file&#xff09;。而另一个服务器作为从服务器&#xf…

阿里云99元VS腾讯云88元,双11云服务器价格战,谁胜谁负?

在2023年的双十一优惠活动中&#xff0c;阿里云推出了一系列令人惊喜的优惠活动&#xff0c;其中包括99元一年的超值云服务器。本文将带您了解这些优惠活动的具体内容&#xff0c;以及与竞争对手腾讯云的价格对比&#xff0c;助您轻松选择最适合的云服务器。 99元一年服务器优…

每日汇评:积极的数据可能会推动澳元/美元的上涨

继 9 月份增加 6700 个就业岗位之后&#xff0c;澳大利亚 10 月份预计将增加 18000 个就业岗位&#xff1b; 失业率预计将从 3.6% 升至 3.7%&#xff0c;维持在历史低点附近&#xff1b; 澳元/美元在美元疲软的支撑下维持看涨基调&#xff0c; 其面临关键阻力位0.6520&#xff…

【Linux】初识网络

目录 背景 协议 什么是协议 协议分层 OSI七层模型 TCP/IP模型 网络协议栈与 OS 的关系 网络传输 局域网中直接通信 数据的封装与分用 局域网通信原理 数据碰撞 跨路由器进行远端通信 IP的介绍 传输演示 背景 &#x1f9ca;一开始&#xff0c;计算机都是一台台独…

Java实现拼图游戏

拼图游戏是一种智力类游戏&#xff0c;玩家需要将零散的拼图块按照一定的规律组合起来&#xff0c;最终拼成完整的图案。拼图游戏的难度可以根据拼图块数量、拼图的形状、图案的复杂程度等因素来调整。这种游戏适合各个年龄层的玩家&#xff0c;能够提高大脑的观察力、空间感知…

WPF xaml Command用法介绍

WPF (Windows Presentation Foundation) 中的命令设计模式是一种用于分离用户界面逻辑和业务逻辑的方法。在WPF中&#xff0c;这种模式通过命令接口&#xff08;如 ICommand&#xff09;实现&#xff0c;使得用户界面组件&#xff08;如按钮、菜单项等&#xff09;可以触发不直…

工厂自动化中DCS软件

概述 Monitor.Analog是新一代运行监控系统&#xff0c;是物联网时代数据驱动的智能工厂的神经中枢。通过连接到阿自倍尔专有的在线故障预测系统&#xff08;该系统利用 AI&#xff08;人工智能&#xff09;&#xff09;以及利用来自各个智能设备的监控和诊断数据的系统&#x…

Unity Meta Quest 一体机开发(六):HandGrabInteractor 和 HandGrabInteractable 知识点

文章目录 &#x1f4d5;教程说明&#x1f4d5;HandGrabInteractor⭐HandGrabAPI⭐HandWristPoint⭐GripPoint⭐PinchPoint⭐PinchArea⭐HandGrabVisual⭐HandGrabGlow &#x1f4d5;HandGrabInteractable⭐Support Grab Type⭐Pinch Grab Rules 和 Palm Grab Rules⭐Unselect M…

2023版Idea创建JavaWeb时,右键new没有Servlet快捷键选项

问题&#xff1a;右键时&#xff0c;没有创建servlet的快捷键&#xff0c;如下图&#xff1a; 解决方法&#xff1a; 1.打开idea&#xff0c;点击File>settings(设置)&#xff0c;进入settings页面&#xff0c;如下 从上图中的Files选项中没看到有servlet选项&#xff0c;…

拿到信创天翼云电脑账号后,我又傻眼了...

在《面向国产系统的 App 发布&#xff0c;含泪总结》中&#xff0c;我就吐槽过信创产品的不靠谱。用户购买一台终端&#xff0c;都没法用&#xff0c;得经历复杂的账号申请。 紧催慢催&#xff0c;等待了半个月之后&#xff0c;今天终于拿到了账号。然而&#xff0c;满怀期待登…

人工智能+游戏 会带来什么

“人工智能游戏” 写在前面 随着人类生活水平的日益提高&#xff0c;游戏正在为越来越多的人们带去欢乐。同时&#xff0c;作为21世纪新兴科学技术的人工智能&#xff0c;也正在研究人员的努力下不断向前突破。那么&#xff0c;这两列高速前进的“火车”能否接轨并行呢&#…

【数据结构】线段树(点修区查)

数据结构-线段树&#xff08;点修区查&#xff09; 前置知识 分治递归二叉树 思路 我们需要维护一个支持单点修改&#xff0c;区间查询的数据结构&#xff0c;并且要求在线&#xff0c;一般使用线段树解决。 线段树是一个二叉树形的数据结构。 线段树的思想很简单&#xff0c…

算法学习打卡day45|动态规划:股票问题总结

Leetcode股票问题总结篇 动态规划的股票问题一共六道题&#xff0c;买卖股票最佳时机和买卖股票手续费都是一个类型的问题&#xff0c;维护好买入和卖出两个状态即可&#xff0c;方法一摸一样。而冷冻期也差不多就是状态多了点&#xff0c;买入、保持卖出、当日卖出、以及冷冻期…

OpenGL_Learn12(光照)

续OpenGL_Learn11&#xff08;光照&#xff09;-CSDN博客 1. 镜面高光 和漫反射光照一样&#xff0c;镜面光照也决定于光的方向向量和物体的法向量&#xff0c;但是它也决定于观察方向&#xff0c;例如玩家是从什么方向看向这个片段的。镜面光照决定于表面的反射特性。 我们通…

IDEA没有Add Framework Support解决办法

点击File—>Settings 点击第一个设置快捷键 点击apply和ok即可 我们要点击一下项目&#xff0c;再按快捷键ctrlk 即可

LeetCode(15)分发糖果【数组/字符串】【困难】

目录 1.题目2.答案3.提交结果截图 链接&#xff1a; 135. 分发糖果 1.题目 n 个孩子站成一排。给你一个整数数组 ratings 表示每个孩子的评分。 你需要按照以下要求&#xff0c;给这些孩子分发糖果&#xff1a; 每个孩子至少分配到 1 个糖果。相邻两个孩子评分更高的孩子会获…

Unity反编译:IL2CPP 打包输出的cpp文件和dll(程序集)位置、Mono打包输出的dll(程序集)位置

目录 如题&#xff1a;IL2CPP 打包输出的cpp文件和dll位置(并不会出现在APK里) 如题&#xff1a;Mono打包输出的dll位置 校验平台&#xff1a;Android 如题&#xff1a;IL2CPP 打包输出的cpp文件和dll位置(并不会出现在APK里) Unity Assets同级目录下 Temp/StagingArea/Il2…

Django视图层

视图层 django视图层&#xff1a;Django项目下的views.py文件&#xff0c;它的内部是一系列的函数或者是类,用来处理客户端的请求后处理并返回相应的数据 三板斧 HttpResponse # 返回字符串 render # 返回html页面&#xff0c;并且在返回浏览器之前还可以给html文件…

PCA降维Python demo

读这篇15年CVPR的文章&#x1f923;&#x1f923;&#x1f923;&#x1f923;&#x1f923; inproceedings{liu2015sparse,title{Sparse convolutional neural networks},author{Liu, Baoyuan and Wang, Min and Foroosh, Hassan and Tappen, Marshall and Pensky, Marianna},…
最新文章