大数据Zookeeper--案例

文章目录

  • 服务器动态上下线监听案例
    • 需求
    • 需求分析
    • 具体实现
    • 测试
  • Zookeeper分布式锁案例
    • 原生Zookeeper实现分布式锁
    • Curator框架实现分布式锁
  • Zookeeper面试重点
    • 选举机制
    • 生产集群安装多少zk合适
    • zk常用命令

服务器动态上下线监听案例

需求

某分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知
到主节点服务器的上下线。

需求分析

服务器动态上下线

具体实现

1)先在集群上创建/servers节点

[zk: localhost:2181(CONNECTED) 10] create /servers "servers" 
Created /servers

2)在Idea中创建包名:com.yudan.case1

3)服务器端向Zookeeper注册代码

import org.apache.zookeeper.*;

import java.io.IOException;

public class DistributeServer {

    private String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
    private int sessionTime = 100000;
    private ZooKeeper zk;

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {

        DistributeServer server = new DistributeServer();

        // 1、获取zk连接
        server.getConnect();

        // 2、注册服务器到zk集群
        server.regist(args[0]);

        // 3、启动 业务逻辑(睡觉)
        server.business();
    }

	// 创建到 zk 的客户端连接
    private void getConnect() throws IOException {

        zk = new ZooKeeper(connectString, sessionTime, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {

            }
        });
    }

	// 注册到服务器
    private void regist(String hostname) throws InterruptedException, KeeperException {
        String create = zk.create("/servers/"+hostname, hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

        System.out.println(hostname + " " + "is online");
    }
	
	// 业务功能
    private void business() throws InterruptedException {
        Thread.sleep(Long.MAX_VALUE);
    }
}

4)客户端代码

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class DistributeClient {

    private String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
    private int sessionTime = 100000;
    private ZooKeeper zk;

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        DistributeClient client = new DistributeClient();

        // 1、获取zk连接
        client.getConnect();

        // 2、监听/servers下面子节点的增加和删除
        client.getServersList();

        // 3、业务逻辑(睡觉)
        client.business();
    }

	// 创建到 zk 的客户端连接
    private void getConnect() throws IOException {
        zk = new ZooKeeper(connectString, sessionTime, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
            	// 再次启动监听
                try {
                    getServersList();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                } catch (KeeperException e) {
                    throw new RuntimeException(e);
                }
            }
        });
    }

	// 获取服务器列表信息
    private void getServersList() throws InterruptedException, KeeperException {
        // 获取服务器子节点信息,并且对父节点进行监听
        List<String> children = zk.getChildren("/servers", true);
		
		// 存储服务器信息列表 
        ArrayList<String> servers = new ArrayList<>();
        
		// 遍历所有节点,获取节点中的主机名称信息 
        for (String child : children) {
            byte[] data = zk.getData("/servers/" + child, false, null);

            servers.add(new String(data));
        }

        // 打印
        System.out.println(servers);
    }

    private void business() throws InterruptedException {
        Thread.sleep(Long.MAX_VALUE);
    }
}

测试

1)在Linux命令行上操作增加减少服务器

(1)启动DistributeClient 客户端

(2)在hadoop102上zk的客户端/servers目录上创建临时带序号节点

[zk: localhost:2181(CONNECTED) 1]  create -e -s /servers/hadoop102 "hadoop102" 
[zk: localhost:2181(CONNECTED) 2]  create -e -s /servers/hadoop103 "hadoop103"

(3)观察Idea控制台变化

[hadoop102, hadoop103]

(4)执行删除操作

[zk: localhost:2181(CONNECTED) 8]  delete /servers/hadoop1020000000000 

(5)观察Idea控制台变化

[hadoop103] 

2)在Idea上操作增加减少服务器

(1)启动DistributeClient 客户端(如果已经启动过,不需要重启)

(2)启动DistributeServer 服务

  • 点击Edit Configurations…
    在这里插入图片描述
  • 在弹出的窗口中(Program arguments)输入想启动的主机,例如,hadoop102
    在这里插入图片描述
  • 回到DistributeServer的main方法,右键,在弹出的窗口中点击Run “DistributeServer.main()”
  • 观察DistributeServer控制台,提示hadoop102 is online
  • 观察DistributeClient控制台,提示hadoop102已经上线

Zookeeper分布式锁案例

什么叫做分布式锁呢?

比如说"进程1"在使用该资源的时候,会先去获得锁,"进程1"获得锁以后会对该资源保持独占,这样其他进程就无法访问该资源,"进程1"用完该资源以后就将锁释放掉,让其他进程来获得锁,那么通过这个锁机制,我们就能保证了分布式系统中多个进程能够有序的访问该临界资源。那么我们把这个分布式环境下的这个锁叫作分布式锁。
分布式锁

原生Zookeeper实现分布式锁

1)分布式锁实现

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class DistributeLock {

    private final String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
    private final int sessionTime = 100000;
    private final ZooKeeper zk;
    // 当前client等待的子节点
    private String waitPath;
    // zookeeper节点等待
    private CountDownLatch waitLatch = new CountDownLatch(1);
    // zookeeper连接
    private CountDownLatch connectLatch = new CountDownLatch(1);
    // 当前client创建的子节点
    private String currentMode;
	
	// 和 zk 服务建立连接,并创建根节点
    public DistributeLock() throws IOException, InterruptedException, KeeperException {

        // 1、获取连接
        zk = new ZooKeeper(connectString, sessionTime, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                // connectLatch 如果连接上zk 可以释放
                // 连接建立时, 打开latch, 唤醒wait在该latch上的线程
                if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
                    connectLatch.countDown();
                }

                // waitLatch 需要释放
                // 发生了waitPath的删除事件
                if (watchedEvent.getType() == Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)) {
                    waitLatch.countDown();
                }
            }
        });

        // 等待 zookeeper正常连接后,往下走程序
        connectLatch.await();

        // 2、判断根节点/locks是否存在
        Stat stat = zk.exists("/locks", false);

        if (stat == null) {
            // 创建一下根节点
            zk.create("/locks","locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
        }
    }

    // 对zk加锁
    public void zkLock() {
        // 创建对应的临时带序号节点
        try {
            currentMode = zk.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

            // 判断创建的节点是否是最小的序号节点,如果是获取到锁;如果不是, 监听他序号的前一个节点
            List<String> children = zk.getChildren("/locks", false);

            // 如果children 只有一个值,那就直接获取锁;如果有多个节点,需要判断,哪个节点最小
            if (children.size() == 1) {
                return;
            } else {
                // 对children集合内的节点进行排序
                Collections.sort(children);

                // 获取节点名称 seq-
                String thisNode = currentMode.substring("/locks/".length());

                // 通过seq- 获取到该节点在children集合中的位置
                int index = children.indexOf(thisNode);

                // 判断
                if (index == -1) {
                    System.out.println("数据异常");
                } else if (index == 0) {
                    // 就一个节点,可以获取锁了
                    return;
                }else {
                    // 需要监听前一个节点
                    waitPath = "/locks/" + children.get(index-1);
                    zk.getData(waitPath,true,null);

                    // 等待监听
                    waitLatch.await();

                    return;
                }
            }

        } catch (KeeperException e) {
            throw new RuntimeException(e);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    // 对zk解锁
    public void unzkLock() {
        // 删除节点
        try {
            zk.delete(currentMode,-1);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } catch (KeeperException e) {
            throw new RuntimeException(e);
        }
    }
}

2)分布式锁测试

(1)创建两个线程

import org.apache.zookeeper.KeeperException;

import java.io.IOException;

public class DistributeLockTest {

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
	
		// 创建分布式锁1
        final DistributeLock lock1 = new DistributeLock();
		// 创建分布式锁2
        final DistributeLock lock2 = new DistributeLock();

        new Thread(new Runnable() {
            @Override
            public void run() {
            	// 获取锁对象
                try {
                    lock1.zkLock();
                    System.out.println("线程1 启动,获取到锁");
                    Thread.sleep(5 * 1000);

                    lock1.unzkLock();
                    System.out.println("线程1 释放锁");
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
            	// 获取锁对象
                try {
                    lock2.zkLock();
                    System.out.println("线程2 启动,获取到锁");
                    Thread.sleep(5 * 1000);

                    lock2.unzkLock();
                    System.out.println("线程2 释放锁");
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        }).start();

    }
}

(2)观察控制台变化

线程1获取锁 
线程1释放锁 
线程2获取锁 
线程2释放锁

Curator框架实现分布式锁

1)原生的Java API开发存在的问题

(1)会话连接是异步的,需要自己去处理。比如使用CountDownLatch

(2)Watch需要重复注册,不然就不能生效

(3)开发的复杂性还是比较高的

(4)不支持多节点删除和创建。需要自己去递归

2)Curator是一个专门解决分布式锁的框架,解决了原生Java API开发分布式遇到的问题。

详情请查看官方文档:https://curator.apache.org/index.html

3)Curator 案例实操

(1)添加依赖

<dependency> 
	<groupId>org.apache.curator</groupId> 
	<artifactId>curator-framework</artifactId> 
	<version>4.3.0</version> 
</dependency> 

<dependency> 
	<groupId>org.apache.curator</groupId> 
	<artifactId>curator-recipes</artifactId> 
	<version>4.3.0</version> 
</dependency> 

<dependency> 
	<groupId>org.apache.curator</groupId> 
	<artifactId>curator-client</artifactId> 
	<version>4.3.0</version> 
</dependency> 

(2)代码实现

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class CuratorLockTest {

    public static void main(String[] args) {

        // 创建分布式锁1
        InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), "/locks");

        // 创建分布式锁2
        InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), "/locks");

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    lock1.acquire();
                    System.out.println("线程1 获取到锁");

                    lock1.acquire();
                    System.out.println("线程1 再次获取到锁");

                    Thread.sleep(5 * 1000);

                    lock1.release();
                    System.out.println("线程1 释放锁");

                    lock1.release();
                    System.out.println("线程1 再次释放锁");

                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    lock2.acquire();
                    System.out.println("线程2 获取到锁");

                    lock2.acquire();
                    System.out.println("线程2 再次获取到锁");

                    Thread.sleep(5 * 1000);

                    lock2.release();
                    System.out.println("线程2 释放锁");

                    lock2.release();
                    System.out.println("线程2 再次释放锁");

                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        }).start();
    }

	// 分布式锁初始化
    private static CuratorFramework getCuratorFramework() {
		
		// 重试策略,初始时间3秒,重试3次
        ExponentialBackoffRetry policy = new ExponentialBackoffRetry(3000, 3);

        CuratorFramework client = CuratorFrameworkFactory.builder().connectString("hadoop102:2181,hadoop103:2181,hadoop104:2181")
                .connectionTimeoutMs(100000)
                .sessionTimeoutMs(100000)
                .retryPolicy(policy).build();

        // 启动客户端
        client.start();

        System.out.println("zookeeper 启动成功!");
        return client;
    }
}

(2)观察控制台变化:

线程1获取锁 
线程1再次获取锁 
线程1释放锁 
线程1再次释放锁 
线程2获取锁 
线程2再次获取锁 
线程2释放锁 
线程2再次释放锁

Zookeeper面试重点

选举机制

半数机制,超过半数的投票通过,即通过。

(1)第一次启动选举规则:

投票过半数时,服务器id大的胜出

(2)第二次启动选举规则:

①EPOCH大的直接胜出

②EPOCH相同,事务id大的胜出

③事务id相同,服务器id大的胜出

生产集群安装多少zk合适

安装奇数台。

生产经验:

  • 10台服务器:3台zk;
  • 20台服务器:5台zk;
  • 100台服务器:11台zk;
  • 200台服务器:11台zk

zk常用命令

ls、get、create、delete

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/373613.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

unity 增加系统时间显示、FPS帧率、ms延迟

代码 using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks;using UnityEngine;public class Frame : MonoBehaviour {// 记录帧数private int _frame;// 上一次计算帧率的时间private float _lastTime;// 平…

Docker 一小时从入门到实战 —— Docker commands | Create your own image | vs VM ... 基本概念扫盲

Docker crash course 文章目录 Docker crash course1. What and Why of Docker?2.1 What2.2 What problem does it solve?2.2.1 before containers2.1.2 with containers 2. Docker vs Virtual Machines2.1 Difference2.2 Benefits 3. Install docker locally4. Images vs Co…

微信小程序(三十四)搜索框-带历史记录

注释很详细&#xff0c;直接上代码 新增内容&#xff1a; 1.搜索框基本模板 2.历史记录基本模板 3.细节处理 源码&#xff1a; index.wxml <!-- 1.点击搜索按钮a.非空判断b.历史记录&#xff08;去重&#xff09;c.清空搜索框d.去除前后多余空格2.删除搜索 3.无搜索记录不…

springboot158基于springboot的医院资源管理系统

简介 【毕设源码推荐 javaweb 项目】基于springbootvue 的 适用于计算机类毕业设计&#xff0c;课程设计参考与学习用途。仅供学习参考&#xff0c; 不得用于商业或者非法用途&#xff0c;否则&#xff0c;一切后果请用户自负。 看运行截图看 第五章 第四章 获取资料方式 **项…

HTTP相关问题

目录 1.从输入URL到页面展示到底发生了什么&#xff1f; 2.HTTP状态码有哪些&#xff1f; 2.1 2XX(成功状态码) 2.2 3XX(重定向状态码) 2.3 4XX(客户端错误状态码) 2.4 5XX(服务端错误状态码) 3.HTTP 请求头中常见的字段有哪些&#xff1f; 4.HTTP和HTTPS有什么区别&…

[C++] 如何使用Visual Studio 2022 + QT6创建桌面应用

安装Visual Studio 2022和C环境 [Visual Studio] 基础教程 - Window10下如何安装VS 2022社区版_visual studio 2022 社区版-CSDN博客 安装QT6开源版 下载开源版本QT Try Qt | 开发应用程序和嵌入式系统 | Qt Open Source Development | Open Source License | Qt 下载完成&…

AI人工智能怎么自动抠图去除背景

抠图在许多场合中都发挥着重要的作用&#xff0c;它可以帮助我们去除图片的背景&#xff0c;从而将图片转换成透明底色&#xff0c;方便我们为图片更换其他的背景。为了实现照片去除背景只提取人物&#xff0c;我们需要使用专业的图片处理工具&#xff0c;并进行一系列的操作。…

算法效率的度量-时间空间复杂度

常对幂指阶 1.时间复杂度 事前预估 算法 时间开销 T(n) 与 问题规模 n 的关系&#xff08; T 表示 “ time ”&#xff09; 一般默认问题规模为n。 1.单循环 2.嵌套两层循环都为n 3.单层循环指数递增型 4.搜索型 链接 &#xff1a;第七章查找算法&#xff01;&#xff01…

elementUI 表格中如何合并动态数据的单元格

elementUI 表格中如何合并动态数据的单元格 ui中提供的案例是固定写法无法满足 实际开发需求 下面进行改造如下 准备数据如下 //在表格中 设置单元格的方法 :span-method"spanMethodFun" <el-table :data"tableData" border :span-method"spa…

2024第八届生物饲料高质量发展论坛会议通知

饲料工业发展空间大&#xff0c;产量持续增长&#xff0c;品质与质量也在不断提高&#xff0c;饲料工业是支撑现代畜牧水产养殖业发展的基础产业&#xff0c;是关系到城乡居民动物性食品供应的民生产业。“十四五”时期是我国由全面建设小康社会向基本实现社会主义现代化迈进的…

LeetCode、790. 多米诺和托米诺平铺【中等,二维DP,可转一维】

文章目录 前言LeetCode、790. 多米诺和托米诺平铺【中等&#xff0c;二维DP&#xff0c;可转一维】题目与分类思路二维解法二维转一维 资料获取 前言 博主介绍&#xff1a;✌目前全网粉丝2W&#xff0c;csdn博客专家、Java领域优质创作者&#xff0c;博客之星、阿里云平台优质…

性能测试工具LoadRunner与登录性能测试分析

1. LoadRunner与Jmeter Jmeter是开源免费的&#xff0c;LoadRunner是商业收费的。 但是LoadRunner具有非常强大的录制功能&#xff0c;具有丰富且灵活的场景&#xff0c;具备丰富的报告性能。 1&#xff09;Jmeter没有录制功能 2&#xff09;LoadRunner可以设计非常丰富的测试…

构造回文数组

目录 原题描述&#xff1a; 题目描述 时间&#xff1a;1s 空间&#xff1a;256M 题目描述&#xff1a; 输入格式&#xff1a; 输出格式&#xff1a; 样例1输入&#xff1a; 样例1输出&#xff1a; 样例2输入&#xff1a; 样例2输出&#xff1a; 约定&#xff1a; 作…

JVM 性能调优 - JVM 参数基础(2)

查看 JDK 版本 $ java -version java version "1.8.0_151" Java(TM) SE Runtime Environment (build 1.8.0_151-b12) Java HotSpot(TM) 64-Bit Server VM (build 25.151-b12, mixed mode) 查看 Java 帮助文档 $ java -help 用法: java [-options] class [args...] …

CSS:三列布局

三列布局是指左右两列定宽&#xff0c;中间自适应。最终效果如下&#xff1a; HTML&#xff1a; <div class"container"><div class"left"></div><div class"center"></div><div class"right">…

javaEE - 23( 21000 字 Servlet 入门 -1 )

一&#xff1a;Servlet 1.1 Servlet 是什么 Servlet 是一种实现动态页面的技术. 是一组 Tomcat 提供给程序猿的 API, 帮助程序猿简单高效的开发一个 web app. 构建动态页面的技术有很多, 每种语言都有一些相关的库/框架来做这件事&#xff0c;Servlet 就是 Tomcat 这个 HTTP…

404. Sum of Left Leaves(左叶子之和)

问题描述 给定二叉树的根节点 root &#xff0c;返回所有左叶子之和。 问题分析 我们可以查看如果一个叶子是左叶子就加上其值然后返回&#xff0c;如果是右叶子则不用关。 代码 int sumOfLeftLeaves(struct TreeNode* root) {int sum 0;if(root!NULL){if(root->left!…

板块零 IDEA编译器基础:第二节 创建JAVA WEB项目与IDEA基本设置 来自【汤米尼克的JAVAEE全套教程专栏】

板块零 IDEA编译器基础&#xff1a;第二节 创建JAVA WEB项目与IDEA基本设置 一、创建JAVA WEB项目&#xff08;1&#xff09;普通项目升级成WEB项目&#xff08;2&#xff09;创建JAVA包 二、IDEA 开荒基本设置&#xff08;1&#xff09;设置字体字号自动缩放 &#xff08;2&am…

安全SCDN有什么作用

当前网络安全形势日益严峻&#xff0c;网络攻击事件频发&#xff0c;攻击手段不断升级&#xff0c;给企业和个人带来了严重的安全威胁。在这种背景下&#xff0c;安全SCDN作为一种网络安全解决方案&#xff0c;受到了广泛的关注。那么&#xff0c;安全SCDN真的可以应对网络攻击…

为后端做准备

这里写目录标题 flask 文件上传与接收flask应答&#xff08;接收请求&#xff08;文件、数据&#xff09;flask请求&#xff08;上传文件&#xff09;传递参数和文件 argparse 不从命令行调用参数1、设置default值2、"从命令行传入的参数".split()3、[--input,内容] …
最新文章