Canal入门使用

说明:canal [kə’næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费(官方介绍)。一言以蔽之,Canal是一款实现数据同步的组件。可以实现数据库之间、数据库与Redis、ES之间的数据同步。本文介绍Canal的入门使用。

Canal介绍

Canal实现原理是伪装成MySQL主节点的从节点,接收主节点的binlog日志,解析、提取数据库操作,将对数据库的操作通过代码更新到其他组件中,如其他数据库、ES、Redis等,官方解释如下:

在这里插入图片描述

官方提供的结构图如下:

在这里插入图片描述

Canal安装

首先,从官网上下载Canal服务器,地址:https://github.com/alibaba/canal/releases

在这里插入图片描述

下载下来,解压,如下:

在这里插入图片描述

canal配置文件暂时不用管,先修改一下example文件中监测的目前节点配置,修改成自己需要监测的MySQL配置,如下:

在这里插入图片描述

修改完,启动canal服务,双击startup.bat文件,如下:

在这里插入图片描述

Canal使用

只要你的MySQL服务器的IP、账号密码没输错,且测试过能用Navicat或其他数据库连接工具成功连接数据库,那么就可以进行下面的编码工作了。

首先,创建一个Maven项目,pom.xml如下,导个canal依赖就行了

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.hezy</groupId>
    <artifactId>canal_demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!--canal客户端-->
        <dependency>
            <groupId>top.javatool</groupId>
            <artifactId>canal-spring-boot-starter</artifactId>
            <version>1.2.1-RELEASE</version>
        </dependency>
    </dependencies>

</project>

测试代码如下,用来连接canal服务器,打印canal监测到的数据内容;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;

import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;

/**
 * Canal处理器
 * 作用:打印canal服务器监测到的数据
 */
public class CanalHandler {
    public static void main(String[] args) throws InvalidProtocolBufferException {

        // 1.创建连接
        CanalConnector canalConnector = CanalConnectors
                .newSingleConnector(new InetSocketAddress("localhost", 11111), "example", "", "");

        // 2.抓取数据
        while (true) {

            // 3.开始连接
            canalConnector.connect();

            // 4.订阅数据,所有的库和表
            canalConnector.subscribe(".*\\..*");

            // 5.抓取数据,每次抓取100条
            Message message = canalConnector.get(100);

            // 6.获取entry集合
            List<CanalEntry.Entry> entries = message.getEntries();

            // 7.判断是否有数据
            if (entries.size() == 0) {
                System.out.println(">>>暂无数据<<<");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } else {
                // 8.解析数据
                for (CanalEntry.Entry entry : entries) {
                    // 获取表名
                    String tableName = entry.getHeader().getTableName();
                    // 获取操作类型
                    CanalEntry.EntryType entryType = entry.getEntryType();

                    // 判断entryType是否为ROWDATA
                    if (CanalEntry.EntryType.ROWDATA.equals(entryType)) {
                        // 序列化数据
                        ByteString storeValue = entry.getStoreValue();
                        // 反序列化数据
                        CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);

                        // 获取事件类型
                        CanalEntry.EventType eventType = rowChange.getEventType();
                        // 获取具体的数据
                        List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
                        // 遍历打印
                        for (CanalEntry.RowData rowData : rowDatasList) {
                            // 获取拉取前后的数据
                            List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
                            List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();

                            // 用Map存储每条数据
                            HashMap<String, Object> beforeMap = new HashMap<>();
                            HashMap<String, Object> afterMap = new HashMap<>();

                            // 获取不同操作的数据
                            if (CanalEntry.EventType.INSERT.equals(eventType)) {
                                System.out.println("【" + tableName + "】表插入数据");
                                for (CanalEntry.Column column : afterColumnsList) {
                                    afterMap.put(column.getName(), column.getValue());
                                }
                                System.out.println("新增数据:" + afterMap);
                            } else if (CanalEntry.EventType.UPDATE.equals(eventType)) {
                                System.out.println("【" + tableName + "】表更新数据");
                                for (CanalEntry.Column column : beforeColumnsList) {
                                    beforeMap.put(column.getName(), column.getValue());
                                }
                                System.out.println("更新前:" + beforeMap);
                                System.out.println("----");
                                for (CanalEntry.Column column : afterColumnsList) {
                                    afterMap.put(column.getName(), column.getValue());
                                }
                                System.out.println("更新后:" + afterMap);
                            } else if (CanalEntry.EventType.DELETE.equals(eventType)) {
                                System.out.println("【" + tableName + "】表删除数据");
                                for (CanalEntry.Column column : beforeColumnsList) {
                                    beforeMap.put(column.getName(), column.getValue());
                                }
                                System.out.println("被删除的数据:" + beforeMap);
                            }
                        }
                    }
                }
            }
        }
    }
}

启动程序,查看控制台,检测中……

在这里插入图片描述

使用Navicat连接数据库,查看数据库test库,i_user表内容;

在这里插入图片描述

此时,我们新增一条数据,看控制台,canal成功接收到了这次修改;

在这里插入图片描述

更新数据;

在这里插入图片描述

删除数据;

在这里插入图片描述

头能过身体就能过,接下来不就好办了。将Canal接收到的数据转为对象,根据不同的操作类型分发给自己想要同步的组件,同步给从MySQL,就调用对应的Mapper;同步给Redis,就调用Redis对应的方法,ES同样。

总结

本文介绍了Canal入门使用,参考B站视频:

  • Canal极简入门:一小时让你快速上手Canal数据同步神技~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/579670.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【氮化镓】p-GaN HEMTs空穴陷阱低温冻结效应

这篇文章是关于低温条件下p-GaN高电子迁移率晶体管&#xff08;HEMTs&#xff09;栅极漏电的研究。文章通过电容深能级瞬态谱&#xff08;C-DLTS&#xff09;测试和理论模型分析&#xff0c;探讨了空穴陷阱对栅极漏电电流的影响。以下是对文章的总结&#xff1a; 摘要&#xf…

前端JS加密库CryptoJS的常用方法

CryptoJS是前端常用的一个加密库&#xff0c;如MD5、SHA256、AES等加密算法。 官方文档&#xff1a;https://www.npmjs.com/package/crypto-js 安装方法 方法一&#xff1a;直接在html文件中引入 <script type"text/javascript" src"path-to/bower_componen…

(六)几何平均数计算 补充案例 #统计学 #CDA学习打卡

一. 两个案例 1&#xff09;几何平均数计算&#xff1a;基金年平均增长率计算 在财务、投资和银行业的问题中&#xff0c;几何平均数的应用尤为常见&#xff0c;当你任何时候想确定过去几个连续时期的平均变化率时&#xff0c;都能应用几何平均数。其他通常的应用包括物种总体…

[Linux][网络][网络编程套接字][一][预备知识][套接字地址结构]详细讲解

目录 0.预备知识1.理解源IP地址和目的IP地址2.理解源MAC地址和目的MAC地址3.端口号4.理解端口号和进程ID5.理解源端口号和目的端口号6.通过IP地址、端口号、协议号进行通信识别7.认识TCP协议和UDP协议8.网络字节序 1.套接字地址结构(sockaddr) 0.预备知识 1.理解源IP地址和目的…

安装配置Maven(idea里面配置)

放在这个路径下&#xff08;如果需要可以免费发给你&#xff0c;dd我就好了&#xff09; D:\IearnSoftware\maven\apache-maven-3.6.1-bin.zip&#xff08;我自己的路径下面&#xff0c;防止忘记&#xff09; 1.首先测试maven在不在&#xff0c;配置对不对 mvn -v 这样就是成…

SpringMVC进阶(数据格式化以及数据校验)

文章目录 1.数据格式化1.基本介绍1.基本说明2.环境搭建 2.基本数据类型和字符串转换1.需求分析2.环境搭建1.data_valid.jsp首页面2.Monster.java封装请求信息3.MonsterHandler.java处理请求信息4.monster_addUI.jsp添加妖怪界面5.单元测试 3.保存妖怪信息1.MonsterHandler.java…

【软件开发规范篇】Git分支使用规范

作者介绍&#xff1a;本人笔名姑苏老陈&#xff0c;从事JAVA开发工作十多年了&#xff0c;带过刚毕业的实习生&#xff0c;也带过技术团队。最近有个朋友的表弟&#xff0c;马上要大学毕业了&#xff0c;想从事JAVA开发工作&#xff0c;但不知道从何处入手。于是&#xff0c;产…

windows驱动开发-中断(一)

中断是windows中最难的一部分&#xff0c;这是因为中断本身属于操作系统的一部分&#xff0c;理解了中断和内存&#xff0c;对整个系统也就了解了。 中断部分会先从中断优先级、中断处理、中断服务例程入手&#xff0c;大概讲述一下中断的概念&#xff1b;接着从中断的一般实现…

springboot 集成 activemq

文章目录 一&#xff1a;说明二&#xff1a;e-car项目配置1 引入activemq依赖2 application启动类配置消息监听3 application.yml配置4 MQConfig.java 配置类5 ecar 项目中的监听6 junit 发送消息 三&#xff1a;tcm-chatgpt项目配置5 MQListener.java 监听消息 三 测试启动act…

Docker② —— Cgroups详解

1. 概述 Cgroups 的全称是control groups&#xff0c;cgroups为每种可以控制的资源定义了一个子系统。Cgroups分为三个部分&#xff1a; cgroup 本身&#xff1a;对进程进行分组hierarchy&#xff1a;将 cgroup 形成树形结构subsystem&#xff1a;真正起到限制作用的部组件 cp…

【vscode】2024最新!vscode云端配置同步方案:code settings sync

小tian最近对电脑进行了系统重装&#xff0c;结果vscode相关配置和插件都没有保存记录&#xff0c;还好公司电脑里还有。痛定思痛&#xff0c;决定写一篇vscode云端同步配置方案&#xff0c;以作记录和分享~ 步骤一&#xff1a;安装vscode插件&#xff1a;code settings sync …

云贝餐饮连锁V2-2.9.9源码

云贝餐饮连锁V2独立版、版本更新至2.9.9&#xff0c;小程序、公众号版本&#xff0c;全插件&#xff0c;公众号小程序端&#xff0c;独立版&#xff1b; 带商家端&#xff0c;修复收银台、排队点餐、堂食点餐&#xff1b;最新版更新 搭建环境教程: 系统环境&#xff1a;CentO…

【04】JAVASE-循环语句【从零开始学JAVA】

Java零基础系列课程-JavaSE基础篇 Lecture&#xff1a;波哥 Java 是第一大编程语言和开发平台。它有助于企业降低成本、缩短开发周期、推动创新以及改善应用服务。如今全球有数百万开发人员运行着超过 51 亿个 Java 虚拟机&#xff0c;Java 仍是企业和开发人员的首选开发平台。…

算法学习笔记Day9——动态规划基础篇

一、介绍 本文解决几个问题&#xff1a;动态规划是什么&#xff1f;解决动态规划问题有什么技巧&#xff1f;如何学习动态规划&#xff1f; 1. 动态规划问题的一般形式就是求最值。动态规划其实是运筹学的一种最优化方法&#xff0c;只不过在计算机问题上应用比较多&#xff…

微信小程序小游戏开发,微信开发者工具提示该目录下的项目(wxapp2)已在工具中创建,怎么办

微信小程序小游戏开发&#xff0c;微信开发者工具提示该目录下的项目&#xff08;wxapp2&#xff09;已在工具中创建&#xff0c;怎么办 情况描述&#xff0c; 导入一个项目的时候&#xff0c;导入成了小游戏项目了 想换成小游戏项目&#xff0c;变不了了&#xff0c;提示 “…

未来已来:解锁AGI的无限潜能与挑战

未来已来&#xff1a;解锁AGI的无限潜能与挑战 引言 假设你有一天醒来&#xff0c;发现你的智能手机不仅提醒你今天的日程&#xff0c;还把你昨晚做的那个奇怪的梦解释了一番&#xff0c;并建议你可能需要减少咖啡摄入量——这不是科幻电影的情节&#xff0c;而是人工通用智能…

解决Milvus官网提供的单机版docker容器无法启动,以及其它容器进程与Milvus容器通信实现方案【Milvus】【pymilvus】【Docker】

文章目录 问题预备知识方案获取pymilvus获取milvus 实例多容器通信 问题 我的需求是做混合检索单机版可以满足&#xff0c;要走Docker容器部署&#xff0c;还需要和另一个容器中的程序做通信。官方文档提供的Milvus安装启动Milvus方案&#xff0c;见文档&#xff1a;传送门 我…

wlan二层直连组网实验(ensp)

目录 1. VLAN 端口类型及参数设计2. IP 地址规划3. WLAN数据规划(1) DHCP服务器配置(2) AC 源接口地址、认证方式配置(3) AP 组的创建(4) 创建域管理模板、国家码认证(5) 创建安全模板(6) 创建SSID模板(7) 创建VAP模板(8) AP组绑定模板(9) 查看&#xff1a; 1. VLAN 端口类型及…

以太网LAN双向透明传输CH9120透传芯片实现以太网转232串口转485转TTL串口

网络串口透传芯片 CH9120 1、概述 CH9120 是一款网络串口透传芯片。CH9120 内部集成 TCP/IP 协议栈&#xff0c;可实现网络数据包和串口数据的双向透明传输&#xff0c;具有 TCP CLIENT、TCP SERVER、UDP CLIENT 、UDP SERVER 4 种工作模式&#xff0c;串口波特率最高可支持到…

03 Docker入门Dockerfile详解及镜像创建

1.1 使用 Dockerfile 构建镜像 新建一个 Dockerfile 文件vi Dockerfile 将下面的内容复制粘贴进去:## Base Images ## 从天池基础镜像构建(from的base img 根据自己的需要更换,建议使用天池open list镜像链接:https://tianchi.aliyun.com/forum/postDetail?postId=67720) F…
最新文章