使用maxwell实时同步mysql数据到kafka

一、软件环境:

操作系统:CentOS release 6.5 (Final)
java版本: jdk1.8
zookeeper版本: zookeeper-3.4.11
kafka 版本: kafka_2.11-1.1.0.tgz
maxwell版本:maxwell-1.16.0.tar.gz
注意 : 关闭所有机器的防火墙,同时注意启动可以相互telnet ip 端口

二、环境部署

1、安装jdk

export JAVA_HOME=/usr/java/jdk1.8.0_181
export PATH=$JAVA_HOME/bin:$JAVA_HOME/jre/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib:$JAVA_HOME/jre/lib:$CLASSPATH

2、安装maven

参考:https://www.cnblogs.com/wcwen1990/p/7227278.html

3、安装zookeeper

1)下载软件:

wget http://101.96.8.157/archive.apache.org/dist/zookeeper/zookeeper-3.4.11/zookeeper-3.4.11.tar.gz 
tar zxvf zookeeper-3.4.11.tar.gz  
mv zookeeper-3.4.11 /usr/local/zookeeper

2)修改环境变量

编辑 /etc/profile 文件, 在文件末尾添加以下环境变量配置:

## ZooKeeper Env

export ZOOKEEPER_HOME=/usr/local/zookeeper
export PATH=$PATH:$ZOOKEEPER_HOME/bin

运行以下命令使环境变量生效: source /etc/profile

3)重命名配置文件

初次使用 ZooKeeper 时,需要将$ZOOKEEPER_HOME/conf 目录下的 zoo_sample.cfg 重命名为 zoo.cfg

mv  $ZOOKEEPER_HOME/conf/zoo_sample.cfg $ZOOKEEPER_HOME/conf/zoo.cfg

4)单机模式–修改配置文件
创建目录/usr/local/zookeeper/data 和/usr/local/zookeeper/logs 修改配置文件

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper/data
dataLogDir=/usr/local/zookeeper/logs
clientPort=2181

5)启动zookeeper

bin/zkServer.sh start

ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

6)验证zukeeper服务

telnet chavin.king 2181

Trying 192.168.72.130...
Connected to chavin.king.
Escape character is '^]'.
stat
Zookeeper version: 3.4.11-37e277162d567b55a07d1755f0b31c32e93c01a0, built on 11/01/2017 18:06 GMT
Clients:
 /192.168.72.130:44054[0](queued=0,recved=1,sent=0)

Latency min/avg/max: 0/0/0
Received: 1
Sent: 0
Connections: 1
Outstanding: 0
Zxid: 0x1a4
Mode: standalone
Node count: 147
Connection closed by foreign host.

4、安装zkui

git clone https://github.com/DeemOpen/zkui.git
cd zkui 
mvn clean install

修改配置文件默认值

#vim config.cfg
    serverPort=9090     #指定端口
    zkServer=192.168.1.110:2181
    sessionTimeout=300000

启动程序至后台
2.0-SNAPSHOT 会随软件的更新版本不同而不同,执行时请查看target 目录中真正生成的版本

nohup java -jar target/zkui-2.0-SNAPSHOT-jar-with-dependencies.jar & 

用浏览器访问:

http://chavin.king:9090/

5、安装kafka

wget http://archive.apache.org/dist/kafka/1.1.0/kafka_2.11-1.1.0.tgz
tar -zxvf kafka_2.11-1.1.0.tgz -C /usr/local/kafka

mkdir -p /usr/local/kafka/data-logs

修改配置文件

vim server.properties

log.dirs=/usr/local/kafka/data-logs
zookeeper.connect=chavin.king:2181

启动kafka

bin/kafka-server-start.sh -daemon config/server.properties &

创建topic

bin/kafka-topics.sh --create --zookeeper chavin.king:2181 --replication-factor 1 --partitions 1 --topic maxwell 

查看所有topic

bin/kafka-topics.sh --list --zookeeper chavin.king:2181

启动producer

bin/kafka-console-producer.sh --broker-list chavin.king:9092 --topic maxwell

启动consumer

bin/kafka-console-consumer.sh --zookeeper chavin.king:2181 --topic maxwell --from-beginning

或者

bin/kafka-console-consumer.sh --bootstrap-server chavin.king:9092  --from-beginning --topic maxwell

6、开启mysql binlog

 more /etc/my.cnf
[client]
default_character_set = utf8

[mysqld]
basedir = /usr/local/mysql-5.6.24
datadir = /usr/local/mysql-5.6.24/data
port = 3306
#skip-grant-tables
character_set_server = utf8
log_error = /usr/local/mysql-5.6.24/data/mysql.err

binlog_format = row
log-bin = /usr/local/mysql-5.6.24/logs/mysql-bin
sync_binlog = 2
max_binlog_size = 16M
expire_logs_days = 10

server_id = 1
sql_mode=NO_ENGINE_SUBSTITUTION,STRICT_TRANS_TABLES

7、安装maxwell

wget https://github.com/zendesk/maxwell/releases/download/v1.16.0/maxwell-1.16.0.tar.gz
tar -zxvf maxwell-1.16.0.tar.gz -C /usr/local/maxwell 

启动maxwell

nohup bin/maxwell --user='canal' --password='canal' --host='chavin.king' --producer=kafka --kafka.bootstrap.servers=chavin.king:9092 > maxwell.log &

8、开发kafka消费程序

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class KafkaTest {

	public static void main(String[] args){
	
		String topicName = "maxwell";
		String groupID = "example-group";
	
		Properties props = new Properties();
		props.put("bootstrap.servers","192.168.72.130:9092");
		props.put("group.id",groupID);
		props.put("auto.offset.reset","earliest");
		props.put("serializer.encoding","utf-8");
		props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
		props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
	
		KafkaConsumer<String,String > consumer = new KafkaConsumer<String, String>(props);
		
		consumer.subscribe(Arrays.asList(topicName));
	
		try{
			while(true){
				ConsumerRecords<String,String> records = consumer.poll(1000);
			for (ConsumerRecord<String, String> record : records)
				System.out.printf("offset = %d, key = %s, value = %s\n",
						record.offset(), record.key(), record.value());
			}
		}finally{
			consumer.close();
		}
	}

}

ideal启动以上消费程序

9、测试

offset = 3428, key = {"database":"chavin","table":"dept","_uuid":"0b195622-e7c7-4cf6-8203-5576752f9024"}, value = {"database":"chavin","table":"dept","type":"insert","ts":1499944326,"xid":121,"xoffset":2276,"data":{"deptno":10,"dname":"ACCOUNTING","loc":"NEW YORK"}}
offset = 3429, key = {"database":"chavin","table":"dept","_uuid":"333b98e3-a597-47fc-95ad-6e59ee0dadf6"}, value = {"database":"chavin","table":"dept","type":"insert","ts":1499944326,"xid":121,"xoffset":2277,"data":{"deptno":20,"dname":"RESEARCH","loc":"DALLAS"}}
offset = 3430, key = {"database":"chavin","table":"dept","_uuid":"cf9fa656-ed13-4cb0-b909-d1218e402e96"}, value = {"database":"chavin","table":"dept","type":"insert","ts":1499944326,"xid":121,"xoffset":2278,"data":{"deptno":30,"dname":"SALES","loc":"CHICAGO"}}
offset = 3431, key = {"database":"chavin","table":"dept","_uuid":"7f2f683a-39bc-498b-9a4e-920697b3da18"}, value = {"database":"chavin","table":"dept","type":"insert","ts":1499944326,"xid":121,"xoffset":2279,"data":{"deptno":40,"dname":"OPERATIONS","loc":"BOSTON"}}
offset = 3432, key = {"database":"chavin","table":"dept","_uuid":"ef639cd1-9206-4145-8608-372bbaaaa14a"}, value = {"database":"chavin","table":"dept","type":"insert","ts":1499944326,"xid":121,"xoffset":2280,"data":{"deptno":10,"dname":"ACCOUNTING","loc":"NEW YORK"}}
offset = 3433, key = {"database":"chavin","table":"dept","_uuid":"ebdf15ad-7149-4ac4-b567-627dd910182c"}, value = {"database":"chavin","table":"dept","type":"insert","ts":1499944326,"xid":121,"xoffset":2281,"data":{"deptno":20,"dname":"RESEARCH","loc":"DALLAS"}}
offset = 3434, key = {"database":"chavin","table":"dept","_uuid":"1bc667f4-15f0-438c-8139-6f1cbe8b4db3"}, value = {"database":"chavin","table":"dept","type":"insert","ts":1499944326,"xid":121,"xoffset":2282,"data":{"deptno":30,"dname":"SALES","loc":"CHICAGO"}}
offset = 3435, key = {"database":"chavin","table":"dept","_uuid":"1613b695-284a-49e3-9793-74fb2cf8dc5b"}, value = {"database":"chavin","table":"dept","type":"insert","ts":1499944326,"xid":121,"xoffset":2283,"data":{"deptno":40,"dname":"OPERATIONS","loc":"BOSTON"}}
offset = 3436, key = {"database":"chavin","table":"dept","_uuid":"f72a800c-92cc-4494-9438-bc61c58b5cb9"}, value = {"database":"chavin","table":"dept","type":"insert","ts":1499944326,"xid":121,"xoffset":2284,"data":{"deptno":10,"dname":"ACCOUNTING","loc":"NEW YORK"}}
offset = 3437, key = {"database":"chavin","table":"dept","_uuid":"9887d144-d75d-46f8-96ba-ad7c3adf45fd"}, value = {"database":"chavin","table":"dept","type":"insert","ts":1499944326,"xid":121,"xoffset":2285,"data":{"deptno":20,"dname":"RESEARCH","loc":"DALLAS"}}

至此数据同步已经可以正常进行了,是不是很简单。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/498042.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

ES学习日记(二)-------集群设置

上一节写了elasticsearch单节点安装和配置,现在说集群,简单地说就是在多台服务器上搭建单节点,在配置文件里面增加多个ip地址即可,过程同单节点部署,主要说集群配置 注意:不建议在之前单节点es上修改配置为集群,据说运行之后会生成很多文件,在单点基础上修改容易出现未知问题,…

zedboard+AD9361 运行 open WiFi

先到github上下载img&#xff0c;网页链接如下&#xff1a; https://github.com/open-sdr/openwifi?tabreadme-ov-file 打开网页后下载 openwifi img 用win32 Disk lmager 把文件写入到SD卡中&#xff0c;这一步操作会把SD卡重新清空&#xff0c;注意保存数据。这个软件我会…

最小可行产品需要最小可行架构——可持续架构(三)

前言 最小可行产品&#xff08;MVP&#xff09;的概念可以帮助团队专注于尽快交付他们认为对客户最有价值的东西&#xff0c;以便在投入大量时间和资源之前迅速、廉价地评估产品的市场规模。MVP不仅需要考虑产品的市场可行性&#xff0c;还需要考虑其技术可行性&#xff0c;以…

【JavaWeb】Day24.Web入门——HTTP协议(一)

HTTP协议——概述 1.介绍 HTTP&#xff1a;Hyper Text Transfer Protocol(超文本传输协议)&#xff0c;规定了浏览器与服务器之间数据传输的规则。 http是互联网上应用最为广泛的一种网络协议http协议要求&#xff1a;浏览器在向服务器发送请求数据时&#xff0c;或是服务器在…

Oracle存数字精度问题number、binary_double、binary_float类型

--表1 score是number(10,5)类型 create table TEST1 (score number(10,5) ); --表2 score是binary_double类型 create table TEST2 (score binary_double ); --表3 score是binary_float类型 create table TEST3 (score binary_float );实验一&#xff1a;分别往三张表插入 小数…

Redis开源协议变更!Garnet:微软开源代替方案?

Garnet&#xff1a;微软开源的高性能替代方案&#xff0c;秉承兼容 RESP 协议的同时&#xff0c;以卓越性能和无缝迁移能力重新定义分布式缓存存储&#xff01; - 精选真开源&#xff0c;释放新价值。 概览 最近&#xff0c;Redis修改了开源协议&#xff0c;从BSD变成了 SSPLv…

青龙脚本 猫猫看看

话不多说开图 https://raw.githubusercontent.com/Huansheng1/my-qinglong-js/main/%E7%8C%AB%E7%8C%AB%E7%9C%8B%E7%9C%8B.py

探索Python人工智能在气象监测中的创新应用

Python是功能强大、免费、开源&#xff0c;实现面向对象的编程语言&#xff0c;在数据处理、科学计算、数学建模、数据挖掘和数据可视化方面具备优异的性能&#xff0c;这些优势使得Python在气象、海洋、地理、气候、水文和生态等地学领域的科研和工程项目中得到广泛应用。可以…

Jupyter安装教程(Windows 版)

这几年AI人工智能这么火&#xff0c;陆陆续续诞生了很多新的产品&#xff0c;新的商业模式&#xff0c;随着Open-sora 1.0开源之后&#xff0c;让我更加地相信GPT5也即将要到来了&#xff0c;看来不学机器学习和深度学习&#xff0c;恐怕是要跟不上时代了。于是就想着今年开始接…

【管理咨询宝藏59】某大型汽车物流战略咨询报告

本报告首发于公号“管理咨询宝藏”&#xff0c;如需阅读完整版报告内容&#xff0c;请查阅公号“管理咨询宝藏”。 【管理咨询宝藏59】某大型汽车物流战略咨询报告 【格式】PDF 【关键词】HR调研、商业分析、管理咨询 【核心观点】 - 重新评估和调整商业模式&#xff0c;开拓…

智能设备配网保姆级教程

设备配网 简单来说&#xff0c;配网就是将物联网&#xff08;IoT&#xff09;设备连接并注册到云端&#xff0c;使其拥有与云端远程通信的能力。配网后&#xff0c;智能设备才能被手机应用或者项目管理后台控制&#xff0c;依托于智能场景创造价值。本文介绍了配网的相关知识&…

【分享】CMMI V3.0版本做了哪些改变?哪些企业适合申请CMMI3.0

​ CMM是由美国卡内基梅隆大学软件工程研究所1987年开发成功的&#xff0c;它基于过去所有软件工程过程改进的成果&#xff0c;吸取了以往软件工程的经验教训&#xff0c;提供了一个基于过程改进的框架&#xff1b;CMMI(Capability Maturity Model Integration能力成熟度模型集…

代码随想录算法训练营第三十六天|435. 无重叠区间,763. 划分字母区间

435. 无重叠区间 题目 给定一个区间的集合 intervals &#xff0c;其中 intervals[i] [starti, endi] 。返回 需要移除区间的最小数量&#xff0c;使剩余区间互不重叠 。 示例 1: 输入: intervals [[1,2],[2,3],[3,4],[1,3]] 输出: 1 解释: 移除 [1,3] 后&#xff0c;剩下…

文献学习(自备)

收官大作&#xff0c;多组学融合的新套路发NC&#xff01;&#xff01; - 知乎 (zhihu.com) Hofbauer cell function in the term placenta associates with adult cardiovascular and depressive outcomes | Nature Communications 病理性胎盘炎症会增加几种成人疾病的风险&a…

Linux——信号的保存与处理

目录 前言 一、信号的常见概念 1.信号递达 2.信号未决 3.信号阻塞 二、Linux中的递达未决阻塞 三、信号集 四、信号集的处理 1.sig相关函数 2.sigprocmask()函数 3.sigpending()函数 五、信号的处理时机 六、信号处理函数 前言 在之前&#xff0c;我们学习了信号…

Codeforces Round 937 (Div. 4) A - F 题解

A. Stair, Peak, or Neither? 题解&#xff1a;直接比较输出即可。 代码&#xff1a; #include<bits/stdc.h> using namespace std ; typedef long long ll ; const int maxn 2e5 7 ; const int mod 1e9 7 ; inline ll read() {ll x 0, f 1 ;char c getchar()…

IntelliJ IDEA中遇到的“cannot access java.lang.String“错误及其解决方案(day8)

intelliJ 今天遇到使用intelliJ遇到了一个新错误&#xff0c;有问题就解决问题是一个程序员最基本的修养&#xff0c;如下&#xff1a; 在上面的代码中&#xff0c;我使用了this.这个关键字&#xff0c;发现出现了以上问题&#xff0c;找了一些资料&#xff0c;不是很明白&am…

Untiy 布局控制器Aspect Ratio Fitter

Aspect Ratio Fitter是Unity中的一种布局控制器组件&#xff0c;用于根据指定的宽高比来调整包含它的UI元素的大小。实际开发中&#xff0c;它可以确保UI元素保持特定的宽高比&#xff0c;无论UI元素的内容或父容器的大小如何变化。 如图为Aspect Ratio Fitter组件的基本属性&…

阿里云服务器价格表(2024年最新阿里云服务器租用优惠价格表)

2024年阿里云服务器优惠价格表&#xff0c;一张表整理阿里云服务器最新报价&#xff0c;阿里云服务器网aliyunfuwuqi.com整理云服务器ECS和轻量应用服务器详细CPU内存、公网带宽和系统盘详细配置报价单&#xff0c;大家也可以直接移步到阿里云CLUB中心查看 aliyun.club 当前最新…

为什么我的微信小程序 窗口背景色backgroundColor设置参数 无效的问题处理记录!

当我们在微信小程序 json 中设置 backgroundColor 时&#xff0c;实际在电脑的模拟器中根本看不到效果。 这是因为 backgroundColor 指的窗体背景颜色&#xff0c;而不是页面的背景颜色&#xff0c;即窗体下拉刷新或上拉加载时露出的背景。在电脑的模拟器中是看不到这个动作的…
最新文章