使用Logstash将MySQL中的数据同步至Elasticsearch

目录

1 使用docker安装ELK

1.1 安装Elasticsearch

1.2 安装Kibana

1.3 安装Logstash

2 数据同步

2.1 准备MySQL表和数据

2.2 运行Logstash

2.3 测试

3 Logstash报错(踩坑)记录

3.1 记录一

3.1.1 报错信息

3.1.2 报错原因

3.1.3 解决方案

3.2 记录二

3.2.1 报错信息

3.2.2 报错原因

3.3.3 解决方案


1 使用docker安装ELK

        ELK是指Elasticsearch、Logstash、Kibana。

1.1 安装Elasticsearch

# 拉取es镜像
docker pull elasticsearch:7.4.2

mkdir -p /root/docker/elasticsearch/config
mkdir -p /root/docker/elasticsearch/data

# 任何ip都能访问
echo "http.host: 0.0.0.0" >> /root/docker/elasticsearch/config/elasticsearch.yml

# 运行elasticsearch REST API端口9200 集群端口9300
docker run --name elasticsearch -p 9200:9200 -p 9300:9300 \
--restart=always \
--privileged=true \
-e "discovery.type=single-node" \
-e ES_JAVA_OPTS="-Xms64m -Xmx512m" \
-v /root/docker/elasticsearch/config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml \
-v /root/docker/elasticsearch/data:/usr/share/elasticsearch/data \
-v /root/docker/elasticsearch/plugins:/usr/share/elasticsearch/plugins \
-d elasticsearch:7.4.2

# 保证权限 任何人任何组都可以读写操作执行,可以进入elasticsearch使用ll命令查看权限
chmod -R 777 /root/docker/elasticsearch/ 

 测试是否安装成功:

# 查看elasticsearch是否运行
docker ps -a

        在浏览器输入虚拟机的ip和elasticsearch的REST API端口http://172.1.11.10:9200/ ,如果出现以下内容,说明安装成功。

{
    "name": "7876d2859af8",
    "cluster_name": "elasticsearch",
    "cluster_uuid": "i46io2YkTY6pXr8IQ9qmXA",
    "version": {
        "number": "7.4.2",
        "build_flavor": "default",
        "build_type": "docker",
        "build_hash": "2f90bbf7b93631e52bafb59b3b049cb44ec25e96",
        "build_date": "2019-10-28T20:40:44.881551Z",
        "build_snapshot": false,
        "lucene_version": "8.2.0",
        "minimum_wire_compatibility_version": "6.8.0",
        "minimum_index_compatibility_version": "6.0.0-beta1"
    },
    "tagline": "You Know, for Search"
}

1.2 安装Kibana

# 拉取镜像,可视化检索数据
docker pull kibana:7.4.2

# 运行Kibana
docker run --name kibana --restart=always --privileged=true \
-e ELASTICSEARCH_HOSTS=http://172.xx.xx.xx:9200 \
-p 5601:5601 -d kibana:7.4.2

说明:

(1)-e ELASTICSEARCH_HOSTS=http://172.xx.xx.xx:9200 :Elasticsearch地址。

(2)-d:后端运行。

(3)--restart=always:开机启动。

(4)--name kibana :容器名称。

(6)privileged=true :权限。

1.3 安装Logstash

  • Logstash是具有实时流水线能力的开源的数据收集引擎。Logstash可以动态统一不同来源的数据,并将数据标准化到您选择的目标输出。它提供了大量插件,可帮助我们解析,丰富,转换和缓冲任何类型的数据。 
  • 管道(Logstash Pipeline)是Logstash中独立的运行单元,每个管道都包含两个必须的元素输入(input)和输出(output),和一个可选的元素过滤器(filter),事件处理管道负责协调它们的执行。 输入和输出支持编解码器,使您可以在数据进入或退出管道时对其进行编码或解码,而不必使用单独的过滤器。
  • Logstash官方插件 logstash-input-jdbc集成在Logstash(5.x之后)的版本,可以通过配置实现mysql和es全量与增量数据的定时同步。
# 拉取logstash
docker pull logstash:7.4.2

2 数据同步

2.1 准备MySQL表和数据

create table pms_spu_info
(
   id                   bigint not null auto_increment comment '商品id',
   spu_name             varchar(200) comment '商品名称',
   spu_description      varchar(1000) comment '商品描述',
   catalog_id           bigint comment '所属分类id',
   brand_id             bigint comment '品牌id',
   weight               decimal(18,4),
   publish_status       tinyint comment '上架状态[0 - 下架,1 - 上架]',
   create_time          datetime,
   update_time          datetime,
   primary key (id)
);

2.2 运行Logstash

# 运行logstash
docker run -d --name logstash logstash:7.4.2

mkdir -p /root/docker/logstash/config
mkdir -p /root/docker/logstash/data
mkdir -p /root/docker/logstash/pipeline
mkdir -p /root/docker/logstash/jars

# 上传mysql驱动mysql-connector-java-5.1.47.jar到/root/docker/logstash/jars

#拷贝已启动的容器中的文件到宿主机,用于重启挂载
docker cp logstash2:/usr/share/logstash/config /root/docker/logstash/
docker cp logstash2:/usr/share/logstash/data /root/docker/logstash/
docker cp logstash2:/usr/share/logstash/pipeline /root/docker/logstash/

# 保证权限 任何人任何组都可以读写操作执行
chmod -R 777 /root/docker/logstash

# 删除logstash容器
docker rm -f logstash

# 配置连接es
cd /root/docker/logstash/config
vi logstash.yml
  • logstash.yml
http.host: "0.0.0.0"
xpack.monitoring.elasticsearch.hosts: [ "http://172.xx.xx.6:9200" ]
  • 创建mysql.conf,编写mysql数据同步至es相关配置
# 创建mysql.conf
cd /root/docker/logstash2/pipeline/
vi mysql.conf

        1)mysql.conf内容如下:

input {
  jdbc {
    type => "jdbc"
    # 数据库连接地址
    jdbc_connection_string => "jdbc:mysql://172.xx.xx.xx:9906/gulimall_pms?useUnicode=true&characterEncoding=UTF-8&useSSL=false"
    # 数据库连接账号和密码
    jdbc_user => "root"
    jdbc_password => "root"
    # MySQL驱动架包
    jdbc_driver_library => "/usr/share/logstash/mysql/mysql-connector-java-8.0.17.jar"
    # MySQL驱动
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    # 数据库重连尝试次数
    connection_retry_attempts => "3"
    # 判断数据库连接是否可用,默认是false不开启
    jdbc_validate_connection => "true"
    # 数据库连接可用校验超时时间,默认3600秒
    jdbc_validation_timeout => "3600"
    # 开启分页查询,默认false不开启
    jdbc_paging_enabled => "true"
    # 单次分页查询条数(默认100000,若字段较多且更新频率较高,建议调低此值)
    jdbc_page_size => "500"
    # 查询数据sql,如果sql较复杂,建议配通过statement_filepath配置sql文件的存放路径
    statement => "SELECT id,spu_name spuName,spu_description spuDescription,catalog_id catalogId,brand_id brandId,weight,publish_status publishStatus,DATE_FORMAT(create_time,'%Y-%m-%d %H:%i:%s') createTime,DATE_FORMAT(update_time,'%Y-%m-%d %H:%i:%s') updateTime FROM pms_spu_info WHERE update_time > :sql_last_value"
    # 是否将字段名转换为小写,默认true(如果有数据序列化、反序列化需求,建议改为false)
    lowercase_column_names => false
    # 是否记录上次执行结果,true表示会将上次执行结果的tracking_column字段的值保存到last_run_metadata_path指定的文件中
    record_last_run => true
    # 需要记录查询结果某字段的值时,此字段为true,否则默认tracking_column为timestamp的值
    use_column_value => true
    # 需要记录的字段,用于增量同步,需是数据库字段
    tracking_column => "updateTime"
    # 轨迹字段类型Value can be any of: numeric,timestamp,Default value is "numeric"
    tracking_column_type => timestamp
    # record_last_run上次数据存放位置
    last_run_metadata_path => "/usr/share/logstash/config/logstash_metadata"
    # 是否清除last_run_metadata_path的记录,需要增量同步时此字段必须为false
    clean_run => false
    # 同步频率(分 时 天 月 年),默认每分钟同步一次
    schedule => "* * * * *"
     
  }
}

output {
  elasticsearch {
    # host => "192.168.1.1"
		# port => "9200"
		# 配置ES集群地址
    hosts => ["172.xx.xx.xx:9200"]
    # 索引名字,必须小写
    index => "spu"
    # 文档id,数据唯一索引(建议使用表的主键)
    document_id => "%{id}"
  }
  stdout {
    codec => json_lines
  }
}

        2)查询sql如下:

SELECT 
id,spu_name spuName,spu_description spuDescription,catalog_id catalogId,
brand_id brandId,weight,publish_status publishStatus,
DATE_FORMAT(create_time,'%Y-%m-%d %H:%i:%s') createTime,
DATE_FORMAT(update_time,'%Y-%m-%d %H:%i:%s') updateTime 
FROM pms_spu_info WHERE update_time > :sql_last_value

日期通过DATE_FORMAT(date,"输出格式")进行格式化,数据库与es日期格式保持一致。

  •  重新运行logstash容器
docker run  --name logstash --restart=always -d -p 5044:5044 -p 9600:9600   \
--privileged=true \
-v /root/docker/logstash/config:/usr/share/logstash/config   \
-v /root/docker/logstash/jars/mysql-connector-java-5.1.47.jar:/usr/share/logstash/logstash-core/lib/jars/mysql-connector-java-5.1.47.jar \
-v /root/docker/logstash/pipeline:/usr/share/logstash/pipeline \
logstash:7.4.2 -f /usr/share/logstash/pipeline/mysql.conf

说明:

(1)-f 是一个非常有用的选项,可以使用户使用指定的文件来指定一些Docker镜像的构建和配置信息。

(2)-f 也可以用于强制删除容器。

2.3 测试

  • mysql表中数据,如下

  • 通过Kibana进行查询,如下:

3 Logstash报错(踩坑)记录

3.1 记录一

3.1.1 报错信息

LogStash::PluginLoadingError Unable to find driver class via URLClassLoader in given driver jars : com.mysql.jdbc.Driver and com.mysql.jdbc.Driver

3.1.2 报错原因

        Logstashd的logstash-input-jdbc插件在调用数据库驱动jar包时,默认会去logstash/logstash-core/lib/jars/目录下去找。

3.1.3 解决方案

        将数据库驱动(例如:mysql-connector-java-5.1.47.jar)放到/usr/share/logstash/logstash-core/lib/jars/下面。

3.2 记录二

3.2.1 报错信息

javax.net.ssl.SSLException: closing inbound before receiving peer's close _notify

3.2.2 报错原因

        安装的是mysql8.x的版本,远程连接发现需要做ssl身份验证,本机连接不需要,取消掉其ssl身份验证需要调整配置。        

3.3.3 解决方案

        数据库连接地址上添加useSSL=false,如下:

"jdbc:mysql://172.xx.xx.xx:9906/gulimall_pms?useUnicode=true&characterEncoding=UTF-8&useSSL=false"

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/363063.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

榜单!高阶智驾冲刺10%搭载率,哪些玩家占据自研感知「高地」

得「感知」者,是智能化尤其是智能驾驶技术变革快速演进期的受益者。尤其是对于车企来说,规控自研易,感知自研难。 尤其是过去几年时间,基于机器学习和深度学习,TransformerBEV技术进一步提高对异常行为的预测准确性&am…

证券开户怎么联系专属客户经理?新手必看!

证券开户联系专属客户经理的方式有很多,可以通过手机网上找客户经理,现在这种方式是最多的,比如咱们网站都是各大券商专业的客户经理,在线联系就可以帮您安排。您自己也可以挑选自己觉得好的券商和客户经理,然后再沟通…

文本生成高清、连贯视频,谷歌推出时空扩散模型

谷歌研究人员推出了创新性文本生成视频模型——Lumiere。 与传统模型不同的是,Lumiere采用了一种时空扩散(Space-time)U-Net架构,可以在单次推理中生成整个视频的所有时间段,能明显增强生成视频的动作连贯性&#xff…

基于MongoDB实现聊天记录的存储

一、mongodb简介 1.1 mongodb简介 MongoDB是一个基于分布式文件存储的数据库,使用C语言编写。它旨在为WEB应用提供可扩展的高性能数据存储解决方案。MongoDB介于关系数据库和非关系数据库之间,是非关系数据库当中功能最丰富、最像关系数据库的。 Mong…

oracle 结果集操作符(求交集、并集、差集)

结果集的操作符 求并集:将两个结果集合并成一个结果集返回 union是求并集去重 union all是求并集不去重 select 1 as A from dual union select 1 as B from dual; select 1 as A from dual union all select 1 as B from dual;求交集:将两个结果集中公…

Unity 访问者模式(实例详解)

文章目录 实例1:简单的形状与统计访客实例2:游戏对象组件访问者实例4:Unity场景对象遍历与清理访客实例5:角色行为树访问者 访问者模式(Visitor Pattern)在Unity中主要用于封装对一个对象结构中各个元素的操…

【开发实践】python使用 moviepy实现mp4转gif(含可视化界面)

一、分析介绍 MoviePy是一个用于视频编辑和处理的Python库。它提供了一种简单而直观的方式来创建、编辑和合成视频,同时也支持添加音频和图像。 以下是MoviePy的一些主要功能和用法示例: 创建视频剪辑: from moviepy.editor import VideoFi…

批量将本地图片转为webp

-I 后是当前图片的路径目录 -O 后是 输出的webp目录 npx webp-batch-convert -I images -O images_webp npx webp-batch-convert -I icon -O icon_webp 在 assets 目录内 执行上面的命令 直接转换

怎么对现在的现货黄金行情进行分析?

进行现货黄金行情分析,很多人一上来就讨论使用什么技术指标,什么基本面的工具等等,其实这些东西并不是首要的。要做现货黄金行情分析,首先我们要掌握一些核心的问题,下面我们就来讨论一下这些核心问题是什么&#xff1…

JNPF低代码平台存在的价值

低代码开发平台是近年来兴起的一种软件开发体例,它为非技术背景的人员提供了编写应用程序的能力,而无需编写任何代码,这种开发体例正在逐渐改变传统的软件开发模式,为企业带来了更高的效率和矫捷性,在低代码开发平台中…

CentOS部署Docker Registry镜像仓库并结合内网穿透实现远程访问

文章目录 1. 部署Docker Registry2. 本地测试推送镜像3. Linux 安装cpolar4. 配置Docker Registry公网访问地址5. 公网远程推送Docker Registry6. 固定Docker Registry公网地址 Docker Registry 本地镜像仓库,简单几步结合cpolar内网穿透工具实现远程pull or push (拉取和推送)…

RT-DETR算法优化改进:上采样算子 | 超轻量高效动态上采样DySample,效果秒杀CAFFE,助力小目标检测

💡💡💡本文独家改进:一种超轻量高效动态上采样DySample, 具有更少的参数、FLOPs,效果秒杀CAFFE和YOLOv8网络中的nn.Upsample 💡💡💡在多个数据集下验证能够涨点,尤其在小目标检测领域涨点显著。 RT-DETR魔术师专栏介绍: https://blog.csdn.net/m0_63774211/…

【目标跟踪】3D点云跟踪

文章目录 一、前言二、代码目录三、代码解读3.1、文件描述3.2、代码框架 四、关联矩阵计算4.1、ComputeLocationDistance4.2、ComputeDirectionDistance4.3、ComputeBboxSizeDistance4.4、ComputePointNumDistance4.5、ComputePointNumDistance4.6、result_distance 五、结果 一…

系统分析师-21年-下午答案

系统分析师-21年-下午答案 更多软考知识请访问 https://ruankao.blog.csdn.net/ 试题一必答,二、三、四、五题中任选两题作答 试题一 (25分) 说明 某软件企业拟开发一套基于移动互联网的在线运动器材销售系统,项目组决定采用FAST 开发方法进行系统分…

SumGNN: 多类型药物相互作用预测 - 通过高效知识图谱概括

SumGNN: 多类型药物相互作用预测 - 通过高效知识图谱概括 在医学领域,准确预测药物之间的相互作用对于药物研发和治疗方案设计至关重要。为了解决这一挑战,我们提出了一种名为"SumGNN"的新方法,旨在通过高效的知识图谱概括实现多类…

PostgreSql和Oracle的事务机制区别以及对程序的影响

前言 几年前IT信息产业的一些核心技术包括架构、产品以及生态都是国外制定,然而自从“遥遥领先”公司被制裁后,国家开始大力支持信息产业“新基建”,自2020年开始市场上涌现出了大量的国产化软件,就国产化数据库而言我所在的公司…

一种轻量分表方案-MyBatis拦截器分表实践|京东零售技术实践

背景 部门内有一些亿级别核心业务表增速非常快,增量日均100W,但线上业务只依赖近一周的数据。随着数据量的迅速增长,慢SQL频发,数据库性能下降,系统稳定性受到严重影响。本篇文章,将分享如何使用MyBatis拦…

企业微信获客助手怎么实现抖音数据回传?

在数字化时代,企业微信获客助手和数灵通外链已经成为企业获取潜在客户的得力助手。通过在微信站外的各个渠道上捕获潜在客户,企业能够更好地满足客户需求并提高业务增长。而抖音作为当前最热门的短视频平台,也成为了企业营销的重要战场。 企业…

OTG -- ULPI接口芯片USB3318讲解(二)

目录 前沿 1 初识USB PHY芯片 2 ULPI接口与USB PHY芯片 3 USB3318简介 3.1 USB3318引脚定义 3.2 USB3318与ULPI接口时序 3.3 STM32F407 OTGHS如何驱动USB3318 3.4 USB3318原理图设计 4 总结 前沿 前面对STM32F407 OTG模块进行了简单的讲解,如果使用OTG_FS模…

期末成绩群发给家长

每当学期结束,老师们的邮箱和手机便会被成绩报告单填满。那么,如何高效地将成绩群发给家长呢? 一、邮件还是短信? 首先,选择一个合适的通讯方式是关键。邮件正式且便于附件,但短信更快捷。考虑到大多数家长…