Flink CDC系列之:TiDB CDC 导入 Elasticsearch

Flink CDC系列之:TiDB CDC 导入 Elasticsearch

  • 一、通过docker 来启动 TiDB 集群
  • 二、下载 Flink 和所需要的依赖包
  • 三、在TiDB数据库中创建表和准备数据
  • 四、启动Flink 集群,再启动 SQL CLI
  • 五、在 Flink SQL CLI 中使用 Flink DDL 创建表
  • 六、Kibana查看ElasticSearch数据
  • 七、在 TiDB增删改数据,观察 ElasticSearch 中的结果

一、通过docker 来启动 TiDB 集群

git clone https://github.com/pingcap/tidb-docker-compose.git

替换目录 tidb-docker-compose 里面的 docker-compose.yml 文件,内容如下所示:

version: "2.1"

services:
  pd:
    image: pingcap/pd:v5.3.1
    ports:
      - "2379:2379"
    volumes:
      - ./config/pd.toml:/pd.toml
      - ./logs:/logs
    command:
      - --client-urls=http://0.0.0.0:2379
      - --peer-urls=http://0.0.0.0:2380
      - --advertise-client-urls=http://pd:2379
      - --advertise-peer-urls=http://pd:2380
      - --initial-cluster=pd=http://pd:2380
      - --data-dir=/data/pd
      - --config=/pd.toml
      - --log-file=/logs/pd.log
    restart: on-failure

  tikv:
    image: pingcap/tikv:v5.3.1
    ports:
      - "20160:20160"
    volumes:
      - ./config/tikv.toml:/tikv.toml 
      - ./logs:/logs           
    command:
      - --addr=0.0.0.0:20160
      - --advertise-addr=tikv:20160
      - --data-dir=/data/tikv
      - --pd=pd:2379
      - --config=/tikv.toml
      - --log-file=/logs/tikv.log
    depends_on:
      - "pd"
    restart: on-failure

  tidb:
    image: pingcap/tidb:v5.3.1
    ports:
      - "4000:4000"
    volumes:
      - ./config/tidb.toml:/tidb.toml
      - ./logs:/logs
    command:
      - --store=tikv
      - --path=pd:2379
      - --config=/tidb.toml
      - --log-file=/logs/tidb.log
      - --advertise-address=tidb
    depends_on:
      - "tikv"
    restart: on-failure
    
  elasticsearch:
     image: elastic/elasticsearch:7.6.0
     container_name: elasticsearch
     environment:
       - cluster.name=docker-cluster
       - bootstrap.memory_lock=true
       - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
       - discovery.type=single-node
     ports:
       - "9200:9200"
       - "9300:9300"
     ulimits:
       memlock:
         soft: -1
         hard: -1
       nofile:
         soft: 65536
         hard: 65536
         
  kibana:
     image: elastic/kibana:7.6.0
     container_name: kibana
     ports:
       - "5601:5601"
     volumes:
       - /var/run/docker.sock:/var/run/docker.sock

该 Docker Compose 中包含的容器有:

  • TiDB 集群: tikv、pd、tidb。
  • Elasticsearch:orders 表将和 products 表进行 join,join 的结果写入 Elasticsearch 中。
  • Kibana:可视化 Elasticsearch 中的数据。

本机添加 host 映射 pd 和 tikv 映射 127.0.0.1。 在 docker-compose.yml 所在目录下运行如下命令以启动所有容器:

docker-compose up -d
mysql -h 127.0.0.1 -P 4000 -u root # Just test tidb cluster is ready,if you have install mysql local.

该命令会以 detached 模式自动启动 Docker Compose 配置中定义的所有容器。 你可以通过 docker ps 来观察上述的容器是否正常启动了。 也可以访问 http://localhost:5601/ 来查看 Kibana 是否运行正常。

另外可以通过如下命令停止并删除所有的容器:

docker-compose down

二、下载 Flink 和所需要的依赖包

下载 Flink 1.17.1 并将其解压至目录 flink-1.17.1

https://archive.apache.org/dist/flink/flink-1.17.1/flink-1.17.1-bin-scala_2.12.tgz

下载下面列出的依赖包,并将它们放到目录 flink-1.17.1/lib/ 下:

  • flink-connector-tidb-cdc-2.4.1.jar
  • flink-sql-connector-elasticsearch7-3.0.1-1.17.jar

三、在TiDB数据库中创建表和准备数据

创建数据库和表 products,orders,并插入数据:

-- TiDB
CREATE DATABASE mydb;
USE mydb;
CREATE TABLE products (
                         id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
                         name VARCHAR(255) NOT NULL,
                         description VARCHAR(512)
) AUTO_INCREMENT = 101;

INSERT INTO products
VALUES (default,"scooter","Small 2-wheel scooter"),
      (default,"car battery","12V car battery"),
      (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
      (default,"hammer","12oz carpenter's hammer"),
      (default,"hammer","14oz carpenter's hammer"),
      (default,"hammer","16oz carpenter's hammer"),
      (default,"rocks","box of assorted rocks"),
      (default,"jacket","water resistent black wind breaker"),
      (default,"spare tire","24 inch spare tire");

CREATE TABLE orders (
                       order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
                       order_date DATETIME NOT NULL,
                       customer_name VARCHAR(255) NOT NULL,
                       price DECIMAL(10, 5) NOT NULL,
                       product_id INTEGER NOT NULL,
                       order_status BOOLEAN NOT NULL -- Whether order has been placed
) AUTO_INCREMENT = 10001;

INSERT INTO orders
VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),
      (default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),
      (default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);

四、启动Flink 集群,再启动 SQL CLI

使用下面的命令跳转至 Flink 目录下

cd flink-1.17.1

使用下面的命令启动 Flink 集群

./bin/start-cluster.sh

启动成功的话,可以在 http://localhost:8081/ 访问到 Flink Web UI,如下所示:
在这里插入图片描述
使用下面的命令启动 Flink SQL CLI

./bin/sql-client.sh

启动成功后,可以看到如下的页面:

在这里插入图片描述

五、在 Flink SQL CLI 中使用 Flink DDL 创建表

首先,开启 checkpoint,每隔3秒做一次 checkpoint

-- Flink SQL                   
Flink SQL> SET execution.checkpointing.interval = 3s;

使用 Flink SQL CLI 创建对应的表,用于同步这些底层数据库表的数据

Flink SQL> CREATE TABLE products (
    id INT,
    name STRING,
    description STRING,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'connector' = 'tidb-cdc',
    'tikv.grpc.timeout_in_ms' = '20000',
    'pd-addresses' = '127.0.0.1:2379',
    'database-name' = 'mydb',
    'table-name' = 'products'
  );

Flink SQL> CREATE TABLE orders (
   order_id INT,
   order_date TIMESTAMP(3),
   customer_name STRING,
   price DECIMAL(10, 5),
   product_id INT,
   order_status BOOLEAN,
   PRIMARY KEY (order_id) NOT ENFORCED
 ) WITH (
    'connector' = 'tidb-cdc',
    'tikv.grpc.timeout_in_ms' = '20000',
    'pd-addresses' = '127.0.0.1:2379',
    'database-name' = 'mydb',
    'table-name' = 'orders'
);

Flink SQL> CREATE TABLE enriched_orders (
   order_id INT,
   order_date DATE,
   customer_name STRING,
   order_status BOOLEAN,
   product_name STRING,
   product_description STRING,
   PRIMARY KEY (order_id) NOT ENFORCED
 ) WITH (
     'connector' = 'elasticsearch-7',
     'hosts' = 'http://localhost:9200',
     'index' = 'enriched_orders_1'
 );

将关联后的数据插入到ElasticSearch

Flink SQL> INSERT INTO enriched_orders
  SELECT o.order_id, o.order_date, o.customer_name, o.order_status, p.name, p.description
  FROM orders AS o
  LEFT JOIN products AS p ON o.product_id = p.id;

六、Kibana查看ElasticSearch数据

检查最终的结果是否写入 ElasticSearch 中,可以在 Kibana 看到 ElasticSearch 中的数据。

首先访问 http://localhost:5601/app/kibana#/management/kibana/index_pattern 创建 index pattern enriched_orders.

在这里插入图片描述
然后就可以在 http://localhost:5601/app/kibana#/discover 看到写入的数据了.
在这里插入图片描述

七、在 TiDB增删改数据,观察 ElasticSearch 中的结果

通过如下的 SQL 语句对 TiDB 数据库进行一些修改,然后就可以看到每执行一条 SQL 语句,Elasticsearch 中的数据都会实时更新。

INSERT INTO orders
VALUES (default, '2020-07-30 15:22:00', 'Jark', 29.71, 104, false);

UPDATE orders SET order_status = true WHERE order_id = 10004;

DELETE FROM orders WHERE order_id = 10004;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/76504.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Matplotlib绘图知识小结--Python数据分析学习

一、Pyplot子库绘制2D图表 1、Matplotlib Pyplot Pyplot 是 Matplotlib 的子库,提供了和 MATLAB 类似的绘图 API。 Pyplot 是常用的绘图模块,能很方便让用户绘制 2D 图表。 Pyplot 包含一系列绘图函数的相关函数,每个函数会对当前的图像进行…

经典人体模型SMPL介绍(一)

SMPL是马普所提出的经典人体模型,目前已成为姿态估计、人体重建等领域必不可少的基础先验。SMPL基于蒙皮和BlendShape实现,从数千个三维人体扫描结果得来,后通过PCA统计学习得来。 论文:SMPL: A Skinned Multi-Person Linear Mode…

OpenStack监控工具

OpenStack是一个开源的云计算管理平台项目,是一系列软件开源项目的组合。由NASA和Rackspace合作研发并发起,以Apache许可证(Apache软件基金会发布的一个自由软件许可证)授权。 OpenStack为私有云和公有云提供可扩展的弹性的云计算…

二.net core 自动化发布到docker (Jenkins安装之后向导)

目录 ​​​​​​​​​​​​​​ 参考资料:https://www.jenkins.io/doc/book/installing/docker/#setup-wizard Post-installation setup wizard.(安装后安装向导) 基于上一篇文章安装,在安装并运行Jenkins(不包括使用Jenkins Opera…

学习pytorch 3 tensorboard的使用

tensorboard的使用 1. 安装2. add_scalar 查看函数图形3. 查看结果4. add_image() 查看训练步骤中间结果的图片 1. 安装 pytorch conda环境 pip install tensorboard pip install opencv-python2. add_scalar 查看函数图形 常用来查看 train val loss等函数图形 from torch…

TCP 协议十大相关特性总结

目录 一、TCP特性 二、报文格式 TCP十大核心特性 1. 确认应答 2. 超时重传 3. 连接管理(三次握手,四次挥手) 三次握手 四次挥手 4. 滑动窗口 情况一:接收方的ACK丢失 情况二:发送方的数据包丢失 5. 流量控制 6. 拥塞控制 7. 延迟应答 8. 捎带应答 9. 字节流粘包问题 10. TCP的…

WebStorm修改默认打开的浏览器

有两种方式第一种修改系统默认浏览器 我采用的是下面这种,在webstorm中修改 将浏览器设置为默认的浏览器即可

【学会动态规划】乘积为正数的最长子数组长度(21)

目录 动态规划怎么学? 1. 题目解析 2. 算法原理 1. 状态表示 2. 状态转移方程 3. 初始化 4. 填表顺序 5. 返回值 3. 代码编写 写在最后: 动态规划怎么学? 学习一个算法没有捷径,更何况是学习动态规划, 跟我…

实现文件的拖放功能

文章目录 实现文件的拖放功能1 拖放文件至QT窗口1.1 实现方法1.2 效果演示 2 拖放文件至python脚本2.1 实现方法2.2 效果演示 实现文件的拖放功能 试想一下,我们希望将一个python项目文件夹或者脚本在IDE中打开,无论是去IDE中选择文件夹路径,…

vuex学习总结

一、vuex工作原理 工作流程:需求:改变组件count的sun变量的值,先调用dispatch函数传入jia函数和要改变的值给actions(这个actions里面必须有jia这个函数);actions收到后调用commit函数将jia方法和值传给mut…

【Visual Studio Code】--- Win11 C盘爆满 修改 Code 插件数据和缓存的保存路径

Win11 C盘爆满 修改 Code 插件数据和缓存的保存路径 一、概述二、修改 Code 插件数据和缓存的保存路径 一、概述 一个好的文章能够帮助开发者完成更便捷、更快速的开发。书山有路勤为径,学海无涯苦作舟。我是秋知叶i、期望每一个阅读了我的文章的开发者都能够有所成…

28 | Boss直聘数据分析

针对boss直聘网的招聘信息,然后分析互联网发展排名前十的城市在互联网方面职位的薪水,学历要求,经验要求,等等信息。 准备从以下几个方面进行分析: (1)各个城市的平均工资 (2)各个学历的平均工资 (3)各个岗位的平均工资 (4)不同工作经验要求的工资 (5)各个经验…

冉冉升起的星火,再度升级迎来2.0时代!

文章目录 前言权威性评测结果 星火大模型多模态功能插件功能简历生成文档问答PPT生成 代码能力 福利 前言 前几天从技术群里看到大家都在谈论《人工智能大模型体验报告2.0》里边的内容,抱着好奇和学习的态度把报告看了一遍。看完之后瞬间被里边提到的科大讯飞的星火…

(leecode)错误的集合

最近听到的,还可以,试试吧~ 题目: 示例: 提示: 题解: 思路: 将数字大小的位置,然后遍历每个位置,大小为0的是缺失数字,大小为2的是重复数字 int* findErro…

Gitlab-第四天-CD到k8s集群的坑

一、.gitlab-ci.yml #CD到k8s集群的 stages: - deploy-test build-image-deploy-test: stage: deploy-test image: bitnami/kubectl:latest # 使用一个包含 kubectl 工具的镜像 tags: - k8s script: - ls -al - kubectl apply -f deployment.yaml # 根据实际情况替换…

Linux学习之firewallD

systemctl status firewalld.service查看一下firewalld服务的状态,发现状态是inactive (dead)。 systemctl start firewalld.service启动firewalld,systemctl status firewalld.service查看一下firewalld服务的状态,发现状态是active (runni…

【Apollo】阿波罗自动驾驶:塑造自动驾驶技术的未来

前言 Apollo (阿波罗)是一个开放的、完整的、安全的平台,将帮助汽车行业及自动驾驶领域的合作伙伴结合车辆和硬件系统,快速搭建一套属于自己的自动驾驶系统。 开放能力、共享资源、加速创新、持续共赢是 Apollo 开放平台的口号。百度把自己所拥有的强大、…

【Unity每日一记】方位辨别—向量的叉乘点乘结合

👨‍💻个人主页:元宇宙-秩沅 👨‍💻 hallo 欢迎 点赞👍 收藏⭐ 留言📝 加关注✅! 👨‍💻 本文由 秩沅 原创 👨‍💻 收录于专栏:uni…

Go 语言并发编程 及 进阶与依赖管理

1.0 从并发编程本质了解Go高性能的本质 1.1 Goroutine 协程可以理解为轻量级线程; Go更适合高并发场景原因之一:Go语言一次可以创建上万协成; “快速”:开多个协成 打印。 go func(): 在函数前加 go 代表 创建协程; time.Sleep():…

阿里云OSS对象存储的核心概念与购买应用

文章目录 1.OSS对象存储基本介绍1.1.OSS对象存储概念1.2.NAS与OSS存储的不同1.3.OSS的应用场景1.4.OSS术语对应表 2.购买OSS存储资源包3.KodCloud云盘接入OSS对象存储3.1.创建Bucket存储空间3.2.创建子用户用于管理Bucket3.3.获取用户的AccessKey3.3.为用户设置权限3.4.将Bucke…