canal调度控制器CanalController源码分析

canal调度控制器初始化:

public CanalController(final Properties properties)

1.  初始化instance公共全局配置: canal.instance.global.(mode、lazy、manager.address以及spring.xml,并放入内存 InstanceConfig globalInstanceConfig;

canal.instance.global.mode: 用于确定canal instance的全局配置加载方式,主要有两种方式一种是spring本地模式,一种是admin管理端配置, 如果指定了manager地址(canal.admin.manager),则强制使用manager模式,spring模式的是SpringCanalInstanceGenerator,通过参数化将instance名称带入路径classpath:${canal.instance.destination:}/instance.properties,获取本地resource目录下配置文件

 private InstanceConfig initGlobalConfig(Properties properties) {
   .......
}

2. 初始化每个instance独有的配置,通过destinations配置多个instance,并放入内存Map<String, InstanceConfig> instanceConfigs;

    private void initInstanceConfig(Properties properties) {
        // 对instance进行逗号分割 存在多个instance的情况下
        String destinationStr = getDestinations(properties);
        String[] destinations = StringUtils.split(destinationStr, CanalConstants.CANAL_DESTINATION_SPLIT);

        for (String destination : destinations) {
            // 可针对instance进行单独配置 mode 、Spring、lazy
            InstanceConfig config = parseInstanceConfig(properties, destination);
            InstanceConfig oldConfig = instanceConfigs.put(destination, config);

            if (oldConfig != null) {
                logger.warn("destination:{} old config:{} has replace by new config:{}", destination, oldConfig, config);
            }
        }
    }

3. 初始化canal server,server主要有两种方式,官方描述如下:

我们这里使用的是独立部署,会使用两种,均是单利模式,CanalServerWithNetty会将客户端请求派给CanalServerWithEmbeded 进行真正的处理,CanalServerWithNetty接收来自canal client的请求。

        // 嵌入式服务实例化
        embeddedCanalServer = CanalServerWithEmbedded.instance();
        embeddedCanalServer.setCanalInstanceGenerator(instanceGenerator);// 设置自定义的instanceGenerator
        .....此处代码省略
        // canal.withoutNetty 配置
        String canalWithoutNetty = getProperty(properties, CanalConstants.CANAL_WITHOUT_NETTY);
        if (canalWithoutNetty == null || "false".equals(canalWithoutNetty)) {
            // server 初始化
            canalServer = CanalServerWithNetty.instance();
            canalServer.setIp(ip);
            canalServer.setPort(port);
        }

4. 初始化canal server集群模式,根据canal.zkServers配置的zk地址,是否走HA模式, 并进行初始化目录,创建永久节点(集群节点以及instance节点)

        // zk 集群地址
        final String zkServers = getProperty(properties, CanalConstants.CANAL_ZKSERVERS);
        if (StringUtils.isNotEmpty(zkServers)) {
            zkclientx = ZkClientx.getZkClient(zkServers);
            // 初始化系统目录 /otter/canal/destinations
            zkclientx.createPersistent(ZookeeperPathUtils.DESTINATION_ROOT_NODE, true);
            //  /otter/canal/cluster  整个集群信息
            zkclientx.createPersistent(ZookeeperPathUtils.CANAL_CLUSTER_ROOT_NODE, true);
        }

每个canal-server对每个instance的管理是交给ServerRunningMonitor类,监控运行状态,有变更的时候会进行相应的变更处理。只有当后面开启自动化扫描,才会进行初始化,每个instance对应一个ServerRunningMonitor。

 
//ServerRunningMonitors.getRunningMonitor被调用的时候,先对map进行查找,没有的话进行以下
//代码生成 ServerRunningMonitors.setRunningMonitors(MigrateMap.makeComputingMap((Function<String, ServerRunningMonitor>) destination -> {
            ServerRunningMonitor runningMonitor = new ServerRunningMonitor(serverData);
            runningMonitor.setDestination(destination);
            // 回调所做的事情 启动前以及启动后
            runningMonitor.setListener(new ServerRunningListener() {

                public void processActiveEnter() {...}

                public void processActiveExit() {...}

                public void processStart() {...}

                public void processStop() {...}

            });
            if (zkclientx != null) {
                runningMonitor.setZkClient(zkclientx);
            }
            // 触发创建一下cid节点
            runningMonitor.init();
            return runningMonitor;
        }));

5.  初始化instance自动化扫描,通过配置canal.auto.scan=true 进行开启自动化扫描。

defaultAction:其作用是如果配置发生了变更,默认应该采取什么样的操作。实现了InstanceAction接口定义的三个抽象方法:start、stop和reload。当新增一个destination配置时,需要调用start方法来启动一个新instance,当移除一个destination配置时,需要调用stop方法来停止当前instance;当某个destination配置发生变更时,需要调用reload方法来进行重启。instanceConfigMonitors: 监听配置变更,Spring模式定时扫描默认是user.dir + conf目录组成,manager模式通过PlainCanalConfigClient http 方式获取admin端管理的配置。

//当instance发现变更的时候,默认应该采取什么样的操作        
autoScan = BooleanUtils.toBoolean(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN));
        if (autoScan) {
            defaultAction = new InstanceAction() {
                public void start(String destination) {...}

                public void stop(String destination) {...}

                public void reload(String destination) {...}

                @Overrid
                public void release(String destination) {....}   
              }                               
            };
            // 监听配置
            instanceConfigMonitors = MigrateMap.makeComputingMap(mode -> {
//扫描间隔时间                
int scanInterval = Integer.valueOf(getProperty(properties,
                    CanalConstants.CANAL_AUTO_SCAN_INTERVAL,
                    "5"));

                     // spring 模式
                     if (mode.isSpring()) {
                    SpringInstanceConfigMonitor monitor = new SpringInstanceConfigMonitor();
                    monitor.setScanIntervalInSecond(scanInterval);
                    monitor.setDefaultAction(defaultAction);
                    // 设置conf目录,默认是user.dir + conf目录组成
                    String rootDir = getProperty(properties, CanalConstants.CANAL_CONF_DIR);
                    if (StringUtils.isEmpty(rootDir)) {
                        rootDir = "../conf";
                    }

                    if (StringUtils.equals("otter-canal", System.getProperty("appName"))) {
                        monitor.setRootConf(rootDir);
                    } else {
                        // eclipse debug模式
                        monitor.setRootConf("src/main/resources/");
                    }
                    return monitor;
                 // admin 模式
                } else if (mode.isManager()) {
                    ManagerInstanceConfigMonitor monitor = new ManagerInstanceConfigMonitor();
                    monitor.setScanIntervalInSecond(scanInterval);
                    monitor.setDefaultAction(defaultAction);
                    String managerAddress = getProperty(properties, CanalConstants.CANAL_ADMIN_MANAGER);
                    monitor.setConfigClient(getManagerClient(managerAddress));
                    return monitor;
                } else {
                    throw new UnsupportedOperationException("unknow mode :" + mode + " for monitor");
                }
                }
            });
        }

canal调度控制器start():

  public void start() throws Throwable {
        logger.info("## start the canal server[{}({}):{}]", ip, registerIp, port);
        // 创建整个canal的工作节点
        // /otter/canal/cluster/ip:port
        final String path = ZookeeperPathUtils.getCanalClusterNode(registerIp + ":" + port);
        // 创建临时节点
        initCid(path);
        if (zkclientx != null) {
            this.zkclientx.subscribeStateChanges(new IZkStateListener() {

                public void handleStateChanged(KeeperState state) throws Exception { }
                // 新建立连接
                public void handleNewSession() throws Exception {
                    initCid(path);
                }

                @Override
                public void handleSessionEstablishmentError(Throwable error) throws Exception {
                    logger.error("failed to connect to zookeeper", error);
                }
            });
        }
        // 优先启动embedded服务
        embeddedCanalServer.start();
        // 尝试启动一下非lazy状态的通道
        for (Map.Entry<String, InstanceConfig> entry : instanceConfigs.entrySet()) {
            final String destination = entry.getKey();
            InstanceConfig config = entry.getValue();
            // 创建destination的工作节点
            if (!embeddedCanalServer.isStart(destination)) {
                // HA机制启动  创建监听,有就获取,没有就新增
                ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);
                if (!config.getLazy() && !runningMonitor.isStart()) {
                    runningMonitor.start();
                }
            }

            if (autoScan) {
                instanceConfigMonitors.get(config.getMode()).register(destination, defaultAction);
            }
        }
        //启动配置文件自动检测机制
        if (autoScan) {
            instanceConfigMonitors.get(globalInstanceConfig.getMode()).start();
            for (InstanceConfigMonitor monitor : instanceConfigMonitors.values()) {
                if (!monitor.isStart()) {
                    // 启动monitor
                    monitor.start();
                }
            }
        }

        // 启动网络接口
        if (canalServer != null) {
            // 启动网络接口,监听客户端请求
            canalServer.start();
        }
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/332864.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

看完买,开放式耳机质量榜单:南卡夺冠、韶音第5、Cleer排第7

​作为一名拥有丰富经验的开放式耳机用户&#xff0c;我想在此提醒大家&#xff0c;选择耳机时&#xff0c;千万不要盲目跟风或过于信赖所谓的“网红”或“大牌产品”。毕竟&#xff0c;每个人的需求和使用环境都是独一无二的&#xff0c;因此&#xff0c;适合自己的耳机才是最…

AP5191DC-DC宽电压LED降压恒流驱动器

产品描述 AP5191是一款PWM工作模式,高效率、外围 简 单、外置功率MOS管&#xff0c;适用于4.5-150V输入的高 精度降压LED恒流驱动芯片。输出最大功率150W&#xff0c; 最大电流6A。 AP5191可实现线性调光和PWM调光&#xff0c;线性 调 光脚有效电压范围0.55-2.6V. AP519…

centos7 arm服务器编译安装PaddlePaddle

前言 随着国产服务器发展&#xff0c;部署项目需要用在国产服务器上&#xff0c;官方教程里面很多没有讲解到&#xff0c;安装过程中出现了各种各样的问题&#xff0c;以下是对官方教程的补充&#xff0c;有什么问题&#xff0c;欢迎指正&#xff01; 一、环境准备 gcc: 8.2版…

(工具变量)各地区-距沿海港口最短距离汇总数据集

全国各省距沿海港口最短距离数据提供了中国各省份与最近海港之间的最短地理距离信息。这些数据对于理解和分析中国各省的地理优势、物流效率以及对外贸易潜力有一定帮助。沿海港口作为国际贸易的重要节点&#xff0c;其距离对于省份的出口入口物流成本、运输时间以及总体贸易便…

vue $attrs和$listenners

Vue2.x 中的a t t r s 和 attrs和attrs和listeners 或许很多Vue小白跟我一样, 在之前不太了解a t t r s 和 attrs和attrs和listenners这两个API是干嘛的, 甚至没有听过或者使用过。下面我来浅述一下我对这两个API的理解。 下文将基于下面这张图片来进行解释&#xff0c;现在我…

《Python数据分析技术栈》第01章 02 Jupyter入门(Getting started with Jupyter notebooks)

02 Jupyter入门&#xff08;Getting started with Jupyter notebooks&#xff09; 《Python数据分析技术栈》第01章 02 Jupyter入门&#xff08;Getting started with Jupyter notebooks&#xff09; Before we discuss the essentials of Jupyter notebooks, let us discuss…

VSCode使用Makefile Tools插件开发C/C++程序

提起Makefile&#xff0c;可能有人会觉得它已经过时了&#xff0c;毕竟现在有比它更好的工具&#xff0c;比如CMake&#xff0c;XMake&#xff0c;Meson等等&#xff0c;但是在Linux下很多C/C源码都是直接或者间接使用Makefile文件来编译项目的&#xff0c;可以说Makefile是基石…

RT Thread Stdio生成STM32L431RCT6工程后如何修改外部时钟

一、简介 RT Thread Stdio生成STM32L431RCT6工程后默认为内部时钟&#xff0c;如何修改为外部时钟呢&#xff1f; 二、修改时钟步骤 本方案修改外部时钟为直接修改代码&#xff0c;不通过STM32CubeMX 进行配置&#xff08;使用这个软件会编译出错&#xff09; &#xff08;…

AEB滤镜再破碎,安全焦虑「解不开」?

不久前&#xff0c;理想L7重大交通事故&#xff0c;再次引发了公众对AEB的热议。 根据理想汽车公布的事故视频显示&#xff0c;碰撞发生前3秒&#xff0c;车速在178km/h时驾驶员采取了制动措施&#xff0c;但车速大幅超出AEB&#xff08;自动紧急刹车系统&#xff09;的工作范…

为什么 Golang Fasthttp 选择使用 slice 而非 map 存储请求数据

文章目录 Slice vs Map&#xff1a;基本概念内存分配和性能Fasthttp 中的 SliceMap性能优化的深层原因HTTP Headers 的特性CPU 预加载特性 结论 Fasthttp 是一个高性能的 Golang HTTP 框架&#xff0c;它在设计上做了许多优化以提高性能。其中一个显著的设计选择是使用 slice 而…

用sdkman在linux上管理多个java版本

概述&#xff1a; SDKMAN 是一个用于管理软件开发工具的工具&#xff0c;允许您轻松地安装、升级和切换不同版本的 JDK、Maven、Gradle 等工具。以下是在 Linux 上安装 SDKMAN! 的基本步骤&#xff1a; 安装SdkMan 使用 curl 安装 SDKMAN!: 打开终端&#xff0c;并运行以下命…

SpringCloud之Nacos

一、微服务介绍 1. 什么是微服务 2014年,Martin Fowler(马丁福勒 ) 提出了微服务的概念,定义了微服务是由以单一应用程序构成的小服务,自己拥有自己的进程与轻量化处理,服务依业务功能设计,以全自动的方式部署,与其他服务使用 HTTP API 通信。同时服务会使用最小的规模…

eNSP学习——配置通过Telnet登陆系统

实验内容&#xff1a; 模拟公司网络场景。R1是机房的设备&#xff0c;办公区与机房不在同一楼层&#xff0c;R2和R3模拟员工主机&#xff0c; 通过交换机S1与R1相连。 为了方便用户的管理&#xff0c;需要在R1上配置Telnet使员工可以在办公区远程管理机房设备。 为…

德施曼智能锁×去哪儿跨界联名,送你一场说走就走的新年旅行~

2024年农历新年即将来临&#xff0c;智能锁行业领军企业德施曼携手中国领先在线旅游平台去哪儿&#xff0c;紧扣“旅游过年”的新年趋势&#xff0c;推出“新年去哪儿&#xff0c;德施曼替你看家”跨界联名活动&#xff0c;为广大用户带来一场说走就走的旅行。 德施曼X去哪儿 …

学习笔记之——3D Gaussian SLAM,SplaTAM配置(Linux)与源码解读

SplaTAM全称是《SplaTAM: Splat, Track & Map 3D Gaussians for Dense RGB-D SLAM》&#xff0c;是第一个&#xff08;也是目前唯一一个&#xff09;开源的用3D Gaussian Splatting&#xff08;3DGS&#xff09;来做SLAM的工作。 在下面博客中&#xff0c;已经对3DGS进行了…

让CHAT对springSecurity原理进行简述

CHAT回复&#xff1a;Spring Security是Spring框架中用于实现认证和授权功能的安全框架。其主要原理基于Filter机制&#xff0c;可以实现基于角色或者资源URL的访问控制。 具体来说&#xff0c;Spring Security通过一系列的Filter对Web请求进行拦截&#xff0c;然后根据用户提供…

短视频代运营抖音项目规划管理计划模板

【干货资料持续更新&#xff0c;以防走丢】 短视频代运营抖音项目规划管理计划模板 部分资料预览 资料部分是网络整理&#xff0c;仅供学习参考。 短视频代运营模板&#xff08;完整资料包含以下内容&#xff09; 目录 具体的表格设计和内容可能因不同的情况和需求而有所变…

基于YOLOv8深度学习的葡萄簇目标检测系统【python源码+Pyqt5界面+数据集+训练代码】目标检测、深度学习实战

《博主简介》 小伙伴们好&#xff0c;我是阿旭。专注于人工智能、AIGC、python、计算机视觉相关分享研究。 ✌更多学习资源&#xff0c;可关注公-仲-hao:【阿旭算法与机器学习】&#xff0c;共同学习交流~ &#x1f44d;感谢小伙伴们点赞、关注&#xff01; 《------往期经典推…

k8s------Pod、Label、NameSpace

一、Pod: Kubernetes中的最小调度对象 1.1 说明 Pod(容器组)是k8s创建和调度的最小单元。一个Pod封装多个容器(container)、存储资源(volume)、一个独立网络ip和管理控制容器运行方式。 Pod可以单独运行一个容器&#xff0c;也可以兼容多个容器运行&#xff0c;多个容器共享…

CentOS 7.9 安装图解

特特特别的说明 CentOS发行版已经不再适合应用于生产环境&#xff0c;客观条件不得不用的话&#xff0c;优选7.9版本&#xff0c;8.5版本次之&#xff0c;最次6.10版本&#xff08;比如说Oracle 11GR2就建议在6版本上部署&#xff09;&#xff01; 引导和开始安装 选择倒计时结…
最新文章