canal 数据同步组件

canal 数据异构组件

为啥要使用这个组件?
在更新DB的时候不同步更新到redis,es等数据库中,时间太久,而且可能会存在同步失败的问题,因此引入canal去拉取DB的数据,再去更新到redis,es等数据库中,有失败重试和回滚等功能。
canal原理?
canal 伪装成salve向mysql发送dump协议,拿到备份数据binlog,去更新数据到redis,es等数据库中或者通过组装数据之后更新。canal可以拿到更新前的所有数据,更新后的所有数据,更新了哪些数据

canal 组件的使用

1.下载canal组件

下载地址canal组件下载地址
在我的资源中也有canal组件包
在这里插入图片描述
解压启动(我是windows版,双击startup.bat)

在这里插入图片描述

2.数据库配置

1.开启MySQL , 需要先开启 Binlog 写入功能

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

2.授权 canal 作为mysql 的slave 的权限

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
3.项目引入jar包
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.4</version>
</dependency>
4.写canal监听数据工具类
package com.next.canal;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;

import java.net.InetSocketAddress;
import java.util.List;

public class SimpleCanalClientExample {
    public static void main(String args[]) {
        // 创建链接
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
                11111), "example", "", "");
        int batchSize = 1000;
        int emptyCount = 0;
        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            connector.rollback();
            int totalEmptyCount = 120;
            while (emptyCount < totalEmptyCount) {
                Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                    }
                } else {
                    emptyCount = 0;
                    // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                    printEntry(message.getEntries());
                }

                connector.ack(batchId); // 提交确认
                // connector.rollback(batchId); // 处理失败, 回滚数据
            }

            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry(List<CanalEntry.Entry> entrys) {
        for (CanalEntry.Entry entry : entrys) {
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }

            CanalEntry.RowChange rowChage = null;
            try {
                rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }

            CanalEntry.EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == CanalEntry.EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------&gt; before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------&gt; after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<CanalEntry.Column> columns) {
        for (CanalEntry.Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }
}

5.简单例子使用测试

1.数据库更改user_id从0改为1,再从1改为0
2.查看canal监测的数据(canal可以拿到更新前的所有数据,更新后的所有数据,更新了哪些数据)

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

6.进一步完善canal监听数据工具类,用于应用例子

1.加入监听器,项目启动时启动
2.使用线程去监听数据
3.替换掉system.out.print(),里面有锁,会阻塞,使用日志打印
4.处理canal监测到的数据

package com.next.canal;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.next.dao.TrainNumberDetailMapper;
import com.next.service.TrainNumberService;
import com.next.service.TrainSeatService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.net.InetSocketAddress;
import java.util.List;

/**
 * @desc 不要用system.out.print()里面有锁,会阻塞,用日志打印
 */
@Service
@Slf4j
public class CanalSubscribe implements ApplicationListener<ContextRefreshedEvent> {

    @Resource
    private TrainSeatService trainSeatService;

    @Resource
    private TrainNumberService trainNumberService;

    //监听,启动的时候就开始调用此监听方法
    @Override
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        canalSubscribe();
    }

    private void canalSubscribe() {
        // 创建链接
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
                11111), "example", "", "");
        int batchSize = 1000;
        //使用线程
        new Thread(() -> {
            try {
                log.info("canal subscribe");
                connector.connect();
                connector.subscribe(".*\\..*");
                connector.rollback();
                while (true) {
                    Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                    long batchId = message.getId();
                    int size = message.getEntries().size();
                    if (batchId == -1 || size == 0) {
                        //没有取到数据继续
                        safeSleep(100);
                        continue;
                    }
                    try {
                        log.info("new message,batchIds:{},size:{}", batchId, batchSize);
                        //打印日志
                        printEntry(message.getEntries());
                        // 提交确认
                        connector.ack(batchId);
                    } catch (Exception e2) {
                        log.error("canal data exception,batchIds:{}", batchId, e2);
                        // 处理失败, 回滚数据
                        connector.rollback(batchId);
                    }
                }
            } catch (Exception e3) {
                log.error("canal subscribe exception", e3);
                safeSleep(1000);
                canalSubscribe();
            }
        }).start();
    }

    private void printEntry(List<CanalEntry.Entry> entrys) throws Exception{
        for (CanalEntry.Entry entry : entrys) {
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }

            CanalEntry.RowChange rowChage = null;
            try {
                rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("RowChange.parse Exception , data:" + entry, e);
            }

            //更新类型-更新,删除,新增
            CanalEntry.EventType eventType = rowChage.getEventType();
            //数据库名
            String schemaName = entry.getHeader().getSchemaName();
            //表名
            String tableName = entry.getHeader().getTableName();
            log.info("name:[{},{}],eventType:{}",schemaName,tableName,eventType);
            for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.DELETE) {
                    handleColumn(rowData.getBeforeColumnsList(), eventType, schemaName, tableName);
                } else {
                    handleColumn(rowData.getAfterColumnsList(), eventType, schemaName, tableName);
                }
            }
        }
    }

    //处理canal监测到的数据
    private void handleColumn(List<CanalEntry.Column> columnsList, CanalEntry.EventType eventType, String schemaName, String tableName) throws Exception{
        if(schemaName.contains("12306_seat_")){
            //处理座位变更
            trainSeatService.handle(columnsList,eventType);
        }else if(tableName.equals("train_number")){
            //车次详情处理(实际上是车次信息变更之后才批量处理车次详情)
            trainNumberService.handle(columnsList,eventType);
        }else{
            log.info("drop data,no need care");
        }


    }



    private void safeSleep(int millis) {
        try {
            Thread.sleep(100);
        } catch (Exception e1) {

        }
    }

}

处理canal监测到的数据(拿到改变的数据,放到实体类中,存到redis中)

package com.next.service;


import com.alibaba.otter.canal.protocol.CanalEntry;
import com.next.dao.TrainNumberMapper;
import com.next.model.TrainNumber;
import com.next.model.TrainSeat;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.util.List;

@Service
@Slf4j
public class TrainSeatService {

    @Resource
    private TrainNumberMapper trainNumberMapper;

    @Resource
    private TrainCacheService trainCacheService;

    //处理座位,canal通过监听座位库,拿到改变的数据,放到实体类中
    public void handle(List<CanalEntry.Column> columns, CanalEntry.EventType eventType) {
        if (eventType != CanalEntry.EventType.UPDATE) {
            log.info("not update,no need care");
            return;
        }
        TrainSeat trainSeat = new TrainSeat();
        boolean isStatusUpdated = false;
        for (CanalEntry.Column column : columns) {
            //票的状态改变了才做下面的操作
            if (column.getName().equals("status")) {
                trainSeat.setStatus(Integer.parseInt(column.getValue()));
                if (column.getUpdated()) {
                    isStatusUpdated = true;
                } else {
                    break;
                }
            } else if (column.getName().equals("id")) {
                trainSeat.setId(Long.parseLong(column.getValue()));
            } else if (column.getName().equals("carriage_number")) {
                trainSeat.setCarriageNumber(Integer.parseInt(column.getValue()));
            } else if (column.getName().equals("row_number")) {
                trainSeat.setRowNumber(Integer.parseInt(column.getValue()));
            } else if (column.getName().equals("seat_number")) {
                trainSeat.setSeatNumber(Integer.parseInt(column.getValue()));
            } else if (column.getName().equals("train_number_id")) {
                trainSeat.setTrainNumberId(Integer.parseInt(column.getValue()));
            } else if (column.getName().equals("ticket")) {
                trainSeat.setTicket(column.getValue());
            } else if (column.getName().equals("from_station_id")) {
                trainSeat.setFromStationId(Integer.parseInt(column.getValue()));
            } else if (column.getName().equals("to_station_id")) {
                trainSeat.setToStationId(Integer.parseInt(column.getValue()));
            }
        }
        if (!isStatusUpdated) {
            log.info("status not update,no need care");
        }
        log.info("train seat update,trainSeat:{}", trainSeat);

        /**
         * 数据存到redis
         * 1.指定座位被占:hash
         * cacheKey:车次_日期  D386_20231001
         * field: carriage_row_seat_fromStationId_toStationId
         * value: 0-空闲 1-占座
         *
         * 2.每个座位详情剩余的座位数
         * cacheKey: 车次_日期_count D386_20231001_count
         * field: fromStationId_toStationId
         * value: 实际座位数
         *
         */

        TrainNumber trainNumber = trainNumberMapper.selectByPrimaryKey(trainSeat.getTrainNumberId());
        //放票
        if (trainSeat.getStatus() == 1) {
            trainCacheService.hset(trainNumber.getName() + "_" + trainSeat.getTicket(),
                    trainSeat.getCarriageNumber() + "_" + trainSeat.getRowNumber() + "_" + trainSeat.getSeatNumber()
                            + "_" + trainSeat.getFromStationId() + "_" + trainSeat.getToStationId(),
                    "0");

            trainCacheService.hincr(trainNumber.getName() + "_" + trainSeat.getTicket() + "_count",
                    trainSeat.getFromStationId() + "_" + trainSeat.getToStationId(),
                    1l);
            log.info("seat+1,trainNumber:{},trainSeat:{}", trainNumber, trainSeat);

            //占票
        } else if (trainSeat.getStatus() == 2) {
            trainCacheService.hset(trainNumber.getName() + "_" + trainSeat.getTicket(),
                    trainSeat.getCarriageNumber() + "_" + trainSeat.getRowNumber() + "_" + trainSeat.getSeatNumber()
                            + "_" + trainSeat.getFromStationId() + "_" + trainSeat.getToStationId(),
                    "1");
            trainCacheService.hincr(trainNumber.getName() + "_" + trainSeat.getTicket() + "_count",
                    trainSeat.getFromStationId() + "_" + trainSeat.getToStationId(),
                    -1l);
            log.info("seat-1,trainNumber:{},trainSeat:{}", trainNumber, trainSeat);
        } else {
            log.info("status update not 1 or 2,no need care");
        }
    }


}

在这里插入图片描述

参考文档:canal使用说明文档

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/275907.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

LED驱动升降压芯片的多种应用方案,实现产品多样化需求-FP7195

目录 FP7195LED驱动降压恒流型 FP7195驱动升压恒流型 FP7195-升降压恒流型驱动方式 FP7195-升降压恒流型驱动方式-高压版 FP7195LED驱动是一种广泛应用于LED照明产品中的驱动器&#xff0c;为了满足不同客户对于产品性能和功能的要求&#xff0c;该驱动器提供了四种不同的方…

Go 中有效并发的模式

设计高效可靠的并发系统 在现代软件开发领域中&#xff0c;利用并发的能力已经变得至关重要。随着应用程序的复杂性增加和数据处理需求的增长&#xff0c;编写既高效又可靠的并发代码成为了一个重要的关注点。为了解决这个挑战&#xff0c;开发者们已经制定了一些模式和最佳实…

java freemarker 动态生成excel文件

好久木有更新啦 抓住2023的小尾巴 浅浅更新一下吧~ 最近做了一个动态生成excel的功能&#xff0c;这里记录下部分功能&#xff0c;主要用到的是freemarker框架&#xff0c;spring就有带&#xff0c;我起的demo载入了一下freemarker的jar包 一、创建模板 首先可以创建一个e…

百度每天20%新增代码由AI生成,Comate SaaS服务8000家客户 采纳率超40%

12月28日&#xff0c;由深度学习技术及应用国家工程研究中心主办的WAVE SUMMIT深度学习开发者大会2023在北京召开。百度首席技术官、深度学习技术及应用国家工程研究中心主任王海峰现场公布了飞桨文心五载十届最新生态成果&#xff0c;文心一言最新用户规模破1亿&#xff0c;截…

idea中切换JDK8、JDK11、JDK17

有时候&#xff0c;我们可能需要在不同的Java版本中去测试或者查看源码&#xff0c;idea可以让我们修改Java的版本。 前提&#xff1a;你必须下载安装好对应的Java版本&#xff0c;可参考文章【windows下切换JDK8、JDK11、JDK17】&#xff08;https://blog.csdn.net/xijinno1/a…

九九乘法表c 语言 用于打印九九乘法表

以下是一个简单的C语言程序&#xff0c;用于打印九九乘法表&#xff1a; #include <stdio.h>int main() {int i, j;for (i 1; i < 9; i) {for (j 1; j < i; j) {printf("%d*%d%-2d ", j, i, i*j);}printf("\n");}return 0; }解释&#xff1…

快速上手makefile自动化构建工具

makefile自动化构建工具 文章目录 makefile自动化构建工具 makefile背景 简单认识makefile 依赖关系与依赖方法 生成项目 清理项目 ACM时间 语法补充 .PHONY修饰 特殊符号替换 Makefile的推导过程 总结 前言&#xff1a; 在windows下&#xff0c;很多东西都是编译器直接帮你做…

im6ull学习总结(二)Framebuffer 应用编程

1 LCD操作原理 linux中通过framebuffer驱动程序来控制LCD。framebuffer中包含LCD的参数&#xff0c;大小为LCD分辨率xbpp。framebuffer 是一块内存 内存中保存了一帧图像。 关于图像的帧指的是在图像处理中&#xff0c;一帧&#xff08;Frame&#xff09;是指图像序列中的单个…

一篇文章带你轻松入门Python

Python基础 1. Hello World! Python命令行 假设你已经安装好了Python, 那么在命令提示符输入: python 将直接进入python。然后在命令行提示符>>>后面输入: >>>print(Hello World!) 可以看到&#xff0c;随后在屏幕上输出: print是一个常用函数&#xf…

python学习14

前言&#xff1a;相信看到这篇文章的小伙伴都或多或少有一些编程基础&#xff0c;懂得一些linux的基本命令了吧&#xff0c;本篇文章将带领大家服务器如何部署一个使用django框架开发的一个网站进行云服务器端的部署。 文章使用到的的工具 Python&#xff1a;一种编程语言&…

Analytify Pro Google Analytics Goals Addon谷歌分析目标插件

Analytify Pro Google Analytics Goals Addon谷歌分析目标插件是一款极其巧妙且具有开创性的工具&#xff0c;它赋予用户细致跟踪和全面分析其网站性能的卓越能力。有了这个非凡的插件&#xff0c;个人可以毫不费力地建立并认真监控他们的Google Analytics目标&#xff0c;从而…

du和df

du 和df 不一致的问题&#xff1a; 情况如下&#xff1a; innode 没有满 同事求助&#xff0c; 他在删掉一个很大的文件后&#xff0c; 磁盘空间依旧没释放。上去一看&#xff0c; 果然 df 看到磁盘空间占用依旧是100%&#xff0c;等等 du 看了一把&#xff0c;磁盘空间剩余很…

低延时视频技术的应用场景和挑战

编者按 无线网络对人们的生活产生了巨大的影响&#xff0c;而5G技术的引入将彻底改变我们与世界互联互通的方式。在5G时代&#xff0c;实现万物互联离不开低延时技术的应用。 LiveVideoStackCon 2023 深圳站邀请到秒点科技的CEO扶凯&#xff0c;为大家分享低延时技术在物联网、…

Impala大数据框架学习网站,大数据技能提升必备利器!

介绍&#xff1a;Impala是Cloudera开发的新型查询系统&#xff0c;它能够对存储在HDFS、HBaseImpala是Cloudera开发的新型查询系统&#xff0c;它能够对存储在HDFS、HBase以及S3上的数据进行快速的交互式SQL查询。此外&#xff0c;Impala与Hive使用了统一的存储系统、同样的元数…

什么是https证书?

HTTPS证书&#xff0c;也称为SSL&#xff08;Secure Sockets Layer&#xff09;证书或TLS&#xff08;Transport Layer Security&#xff09;证书&#xff0c;是一种数字证书&#xff0c;用于在网络上建立安全的加密连接。它的主要目的是确保在互联网上进行的数据传输的安全性和…

提升设计效率:全面了解如何使用Figma插件

Figma组件库包括颜色、字体、图标、按钮、阴影、圆角、间距等。当Figma组件库的样式和Figma组件达到一定数量时&#xff0c;将难以维护&#xff0c;设计和开发的对接成本将大大提高。Figma可以在同一母版下单独设置样式&#xff0c;而不影响与母版之前的关系&#xff0c;这是Sk…

w4操作系统之windows上创建隐藏用户

隐藏用户–在windows上创建隐藏用户 1.首先查看现有哪些用户。&#xff08;通过net user 命令&#xff09; 2.然后创建隐藏用户&#xff08;net user client$ 123 /add&#xff09; 此时出现报错信息。原因是登录用户没权限。需要用管理员的权限 3.用管理员身份运行cmd&am…

AspectJWeaver之Gadget分析

前言&#xff1a; 今天看了下ysoserial的AspectJWeaver方法&#xff0c;分析了下其是如何通过调用SimpleCache$StorableCachingMap来实现写文件&#xff0c;这里把分析的流程写下来&#xff1a; 首先我们要看下其所需要的jar包&#xff1a; <dependencies><dependen…

drf知识-08

Django之了解DRF框架 # 介绍&#xff1a;DRF全称 django rest framework # 背景&#xff1a; 在序列化与反序列化时&#xff0c;虽然操作的数据不尽相同&#xff0c;但是执行的过程却是相似的&#xff0c;也就是说这部分代码是可以复用简化编写的 增&#xff1a;校验请…

基于SSM实现的电动汽车充电网点管理系统

一、系统架构 前端&#xff1a;jsp | jquery | bootstrap | css 后端&#xff1a;spring | springmvc | jdbc 环境&#xff1a;jdk1.8 | mysql 二、代码及数据库 三、功能介绍 01. web端-首页 02. web端-登录 03. web端-注册 04. web端-我要充电 05. web端-个人中心-消…