数据同步MySQL -> Elasticsearch

大家好我是苏麟,今天聊聊数据同步 .

数据同步

一般情况下,如果做查询搜索功能,使用 ES 来模糊搜索,但是数据是存放在数据库 MySQL 里的,所以说我们需要把 MySQL 中的数据和 ES 进行同步,保证数据一致(以 MySQL 为主)。

MySQL =>ES(单向)

同步方式

首次安装完 ES,把 MySQL 数据全量同步到 ES 里,写一个单次脚本 4 种方式 , 全量同步(首次)+增量同步(新数据)

1.定时任务 (推荐 : 简单)

定时任务 : 比如1分钟1次,找到 MySQL 中过去几分钟内(至少是定时周期的2 倍)发生改变的数据,然后更新到 ES.

  • 优点:简单易懂、占用资源少、不用引入第三方中间件
  • 缺点:有时间差
  • 应用场景:数据短时间内不同步影响不大、或者数据几乎不发生修改
2.双写 

双写 : 写数据的时候,必须也去写 ES;更新删除数据库同理。(事务:建议先保证 MySQL写成功,如果ES 写失败了,可以通过定时任务 + 日志 +告警进行检测和修复(补偿))

  • 优点 : 不知道
  • 缺点 : 繁琐
3. Logstash

用 Logstash 数据同步管道 (一般要配合 kafka 消息队列 + beats 采集器)

  • 优点 : 用起来方便,插件多
  • 缺点 : 成本更大 : 一般要配合其他组件使用 (比如 kafka) , 维护成本 : 多维护一个组件 , 学习成本 : 学习使用
4.Canal (推荐 : 简单 , 实时性非常强)

Canal 监听 MySQL Binlog,实时同步

  • 优点 : 实时同步 , 实时性非常强
  • 缺点 :  忽略不计   (MySQL8版本可能连接失败)

定时任务

找到 MySQL 中过去几分钟内发生改变的数据,然后更新到 ES.

双向写入

写数据的时候,也去写 ES .

这个不推荐!

Logstash

传输 处理 数据的管道

文章 : Getting Started with Logstash | Logstash Reference [7.17] | Elastic

下载 : https://artifacts.elastic.co/downloads/logstash/logstash-7.17.9-windows-x86_64.zip

快速开始 : Running Logstash on Windows | Logstash Reference [7.17] | Elastic 

这里需要学习成本 , 有兴趣的小伙伴自己了解 , 这里不过多赘述 .

订阅数据库流水的同步方式 Canal

地址 : GitHub - alibaba/canal: 阿里巴巴 MySQL binlog 增量订阅&消费组件

原理 : 数据库每次修改时,会修改 binlog 文件,只要监听该文件的修改,就能第一时间得到消息并处理canal : 帮你监听 binlog,并解析 binlog 为你可以理解的内容。 

它伪装成了 MySQL 的从节点,获取主节点给的 binlog,如图:

快速开始 : QuickStart · alibaba/canal Wiki · GitHub

windows 系统,找到你本地的 mysql 安装目录,在根目录下新建 my.ini 文件:

Linux 系统,找到你本地的 mysql 安装目录,在根目录下新建 my.cnf 文件:

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

授权 canal 链接 MySQL 账号具有作为 MSQL slave 的权限, 如果已有账户可直接 grant

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

这里有个报错 java 找不到,修改 startup.bat 脚本为你自己的 java home

set JAVA_HOME=C:\Users\59278\.jdks\corretto-1.8.0_302 (自己MySQL路径)

set PATH=%JAVA_HOME%\bin;%PATH%

Java 中引入依赖

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.0</version>
</dependency>

Demo  

import java.net.InetSocketAddress;
import java.util.List;


import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;


public class SimpleCanalClientExample {


public static void main(String args[]) {
    // 创建链接
    CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
                                                                                        11111), "example", "", "");
    int batchSize = 1000;
    int emptyCount = 0;
    try {
        connector.connect();
        connector.subscribe(".*\\..*");
        connector.rollback();
        int totalEmptyCount = 120;
        while (emptyCount < totalEmptyCount) {
            Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
            long batchId = message.getId();
            int size = message.getEntries().size();
            if (batchId == -1 || size == 0) {
                emptyCount++;
                System.out.println("empty count : " + emptyCount);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                }
            } else {
                emptyCount = 0;
                // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                printEntry(message.getEntries());
            }

            connector.ack(batchId); // 提交确认
            // connector.rollback(batchId); // 处理失败, 回滚数据
        }

        System.out.println("empty too many times, exit");
    } finally {
        connector.disconnect();
    }
}

private static void printEntry(List<Entry> entrys) {
    for (Entry entry : entrys) {
        if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
            continue;
        }

        RowChange rowChage = null;
        try {
            rowChage = RowChange.parseFrom(entry.getStoreValue());
        } catch (Exception e) {
            throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                                       e);
        }

        EventType eventType = rowChage.getEventType();
        System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                                         entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                                         entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                                         eventType));

        for (RowData rowData : rowChage.getRowDatasList()) {
            if (eventType == EventType.DELETE) {
                printColumn(rowData.getBeforeColumnsList());
            } else if (eventType == EventType.INSERT) {
                printColumn(rowData.getAfterColumnsList());
            } else {
                System.out.println("-------&gt; before");
                printColumn(rowData.getBeforeColumnsList());
                System.out.println("-------&gt; after");
                printColumn(rowData.getAfterColumnsList());
            }
        }
    }
}

private static void printColumn(List<Column> columns) {
    for (Column column : columns) {
        System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
    }
}

    
  

}

在启动之后 修改MySQL中数据 , java控制台就能实时输出 .

这期就到这里 , 下期见 !

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/406592.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

在 Jupyter Notebook 中查看所使用的 Python 版本和 Python 解释器路径

&#x1f349; CSDN 叶庭云&#xff1a;https://yetingyun.blog.csdn.net/ 我们在做 Python 开发时&#xff0c;有时在我们的服务器上可能安装了多个 Python 版本。 使用 conda info --envs 可以列出所有的 conda 环境。当在 Linux 服务器上使用 which python 命令时&#xff0…

ES通用查询页面使用说明

前言:ES语法比较复杂,需要专门的学习,而且查询工具不太友好, 对公司运维人员使用有点困难,所以花了个时间做了一个页面,方便运维人员使用,如下。 也不难,有兴趣的朋友可以私聊发源码。 开发帮助-ES数据查询 搜索 输入要查看的文档索引,文档类型后点【查询】即可 搜…

Python之Matplotlib

Matplotlib 是一个综合库&#xff0c;用于创建静态、动画、 和交互式可视化 安装 pip install matplotlib 功能 示例 import matplotlib.pyplot as plt plt.figure() plt.plot([1,2,3],[10,20,30]) plt.show() 官方文档&#xff1a; Matplotlib — Visualization with Pyt…

Covalent Network(CQT)发展新里程碑:SOC 2 数据安全认证通过,进一步加强了其人工智能支持

Covalent Network&#xff08;CQT&#xff09;现已完成并通过了严格的 Service Organization Control&#xff08;SOC) 2 Type II 的合规性审计&#xff0c;通过由备受行业认可的机构执行&#xff0c;进一步证明了 Covalent Network&#xff08;CQT&#xff09;团队坚定不移地致…

【区块链】智能交易模式下的数据安全流通模型

【区块链】智能交易模式下的数据安全流通模型 写在最前面**区块链智能交易模式概述****数据安全流通的挑战****数据安全流通模型的核心要素****实现数据安全流通的区块链技术****区块链智能交易模式下数据安全流通模型的设计原则****数据安全流通模型的应用案例分析****面临的挑…

OpenAI划时代大模型——文本生成视频模型Sora作品欣赏(五)

Sora介绍 Sora是一个能以文本描述生成视频的人工智能模型&#xff0c;由美国人工智能研究机构OpenAI开发。 Sora这一名称源于日文“空”&#xff08;そら sora&#xff09;&#xff0c;即天空之意&#xff0c;以示其无限的创造潜力。其背后的技术是在OpenAI的文本到图像生成模…

全面解析企业财务报表系列之四:财务报表的真实性和可靠性

全面解析企业财务报表系列之四&#xff1a;财务报表的真实性和可靠性 一、什么是会计方法二、选择会计方法三、会计方法的重要性四、会计报表常用的造假手段五、财务报表经常被遗漏的重要事件六、财务报告造假的资信敏感性七、财务报告审计的重要性八、审计报告 一、什么是会计…

HL祭记汇

一.写在前面 如果说廿四10天集训&#xff0c;对于我&#xff0c;是完成了从入门&#xff08;虽然可能我比别人入门更早&#xff1f;&#xff09;到准OIer的蜕变&#xff0c;那么&#xff0c;HL7天&#xff0c;可以说是真正成为了OIer&#xff0c;虽然是被小学生、初中生&#…

【时事篇-05-04】20240224 27笔货币基金中有3笔250元的具体数目测算( itertools)

结果展示 背景需求&#xff1a; 前文测算了27只货币基金&#xff0c;如果存145、146、147、148、149、150元分别需要存几笔。结果是4、4、4、5、5、5 【时事篇-05-03】20240222 金额145-150元填充27笔货币基金的具体数目测算&#xff08; itertools&#xff09;-CSDN博客文章…

1.30主成分分析,因子分析

主成分分析 主成分分析&#xff08;Principal Component Analysis&#xff0c;简称PCA&#xff09;是一种常用的多变量数据分析方法。它用于降低数据维度&#xff0c;以便更好地理解和解释数据集中的变化。PCA通过将原始数据投影到新的坐标轴上&#xff0c;使得新的坐标轴上的…

Raspbian命令行RTSP/RTP服务

Raspbian命令行RTSP/RTP服务 1. 源由2. Raspbian摄像头2.1 命令行启动RTP摄像头2.2 命令行启动RTSP摄像头 3. 示例3.1 测试RTP摄像头3.2 测试RTSP摄像头3.3 QGroundControl测试3.3.1 RTSP配置3.3.2 RTP配置 4. 总结5. 参考资料 1. 源由 鉴于实际测试发现RTP协议下&#xff0c;…

【MATLAB】CEEMD_ MFE_SVM_LSTM 神经网络时序预测算法

有意向获取代码&#xff0c;请转文末观看代码获取方式~也可转原文链接获取~ 1 基本定义 CEEMD_MFE_SVM_LSTM神经网络时序预测算法是一种结合了多种先进技术的复杂预测方法&#xff0c;旨在提高时序预测的准确性和稳定性。下面是对该算法的详细介绍&#xff1a; CEEMD&#xff…

常用的函数式接口(Supplier、Consumer、Predicate、Function)

目录 一.函数式接口作为方法的参数 二.函数式接口作为方法的返回值 三.常用的函数式接口 3.1生产型Supplier接口 3.2消费型Consumer接口 抽象方法&#xff1a;accept 默认方法&#xff1a;andThen 3.3判断型Predicate接口 抽象方法&#xff1a;test 默认方法&#xf…

【vue】provide/inject

provide/ inject这对选项需要一起使用&#xff0c;以允许一个祖先组件向其所有子孙后代注入一个依赖&#xff0c;不论组件层次有多深&#xff0c;并在起上下游关系成立的时间里始终生效。 通途点来讲可以用来实现隔代传值&#xff0c;传统的props只能父传子&#xff0c;而 prov…

电影《热辣滚烫》观后感

过完年&#xff0c;回来的时候&#xff0c;上周看了这部电影《热辣滚烫》&#xff0c;是贾玲自导自演的一部电影&#xff0c;个人感觉还是非常好的&#xff0c;偏向小人物&#xff0c;写实风格。 &#xff08;1&#xff09;电影相关评价 但是你说这部电影有多立志&#xff0c…

SQL Server——建表时为字段添加注释

在 MySQL 中&#xff0c;新建数据库表为字段添加注释可以使用 comment 属性来实现。SQL Server 没有 comment 属性&#xff0c;但是可以通过执行 sys.sp_addextendedproperty 这个存储过程添加扩展属性来实现相同的功能。 这个存储过程的参数定义如下&#xff1a; exec sys.s…

【微服务生态】Elasticsearch

文章目录 一、概述二、下载和部署2.1 单机部署2.2 集群部署2.2.1 环境配置2.2.2 安装及部署 三、基本操作3.1 概述3.2 HTTP 操作3.2.1 索引操作3.2.2 文档操作3.2.3 关系映射3.2.4 高级查询 3.3 Java API 操作 四、Elasticsearch 进阶4.1 核心概念4.2 系统架构4.3 分布式集群4.…

C++之Easyx——图形库的基本功能(3):形状绘制(上)

目录 目录 目录 一、bar 函数定义 使用说明 示例程序 二、circle 函数定义 使用说明 示例程序 三、rectangle 函数定义 使用说明 示例程序 四、arc 函数定义 使用说明 参考线 示例程序 一、bar 函数定义 void EGEAPI bar(int left, int top, int right, int bottom, PIMAG…

【前端素材】推荐优质后台管理系统Sneat平台模板(附源码)

一、需求分析 后台管理系统是一种用于管理网站、应用程序或系统的工具&#xff0c;它通常作为一个独立的后台界面存在&#xff0c;供管理员或特定用户使用。下面详细分析后台管理系统的定义和功能&#xff1a; 1. 定义 后台管理系统是一个用于管理和控制网站、应用程序或系统…

snmp协议开通教程

目录 一、什么是snmp协议&#xff1f; 二、snmp协议可以用来干什么&#xff1f; 三、snmp协议的开通 1、snmpv2协议开通 2、snmpv3协议开通 一、什么是snmp协议&#xff1f; SNMP&#xff08;Simple Network Management Protocol&#xff09;是一种用于网络管理的标准协议&a…
最新文章