zookeeper案例

目录

案例一:服务器动态上下线

服务端:

(1)先获取zookeeper连接

(2)注册服务器到zookeeper集群:

(3)业务逻辑(睡眠):

服务端代码如下:

客户端:

(1)获取zookeeper的连接:

(2)监听/servers下边的子节点的增减:

客户端代码如下:

案例二:ZooKeeper 分布式锁

分布式锁是什么?

锁的实现:

构造函数:

加锁函数:

解锁函数:

整体代码:

测试类代码 :

Curator 框架实现分布式锁案例:

实现步骤:

代码如下:


该案例主要也是客户端监听原理,客户端监听服务器的上下线情况

先在集群上创建/servers 节点(用于存储连接的服务器的主机和该服务器的节点数)相当于zookeeper集群

案例一:服务器动态上下线

服务端:

(1)先获取zookeeper连接

        创建类对象

该类为我们创建的服务端类:

        DistributeServer server = new DistributeServer();

        获取zookeeper连接:

自己创建连接方法:

    private void getconnect() throws IOException {
       zk = new ZooKeeper(connectstring, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
            }
        });

    }

 让后server对象在main函数中调用

(2)注册服务器到zookeeper集群:

注册是需要注册到zookeeper集群的/servers路径下,需要指定参数进行创建

private void regestServer(String hostname) throws InterruptedException, KeeperException {
zk.create(parentNode+"/"+hostname,hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    //  需要创建有序的临时节点所以-e(暂时) -s(有序)
        System.out.println("服务器"+hostname+"已注册连接");
    }

(3)业务逻辑(睡眠):

    private void business() throws InterruptedException {
    Thread.sleep(Long.MAX_VALUE);
    }

服务端代码如下:

package com.tangxiaocong.case1;
import org.apache.zookeeper.*;
import java.io.IOException;
/**
 * @Date 2023/8/10 19:06
 * @Author 
 */
public class DistributeServer {
   private static String connectstring="hadoop102:2181,hadoop103:2181,hadoop104:2181";
    private static int sessionTimeout=2000;
    private ZooKeeper zk =null;
    private String parentNode = "/servers";
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        //获取zk连接
        //创建
        DistributeServer server = new DistributeServer();
        server.getconnect();
        //注册服务器到zk集群
        //注册是需要在/servers节点下创建所开启的服务器的路径
        server.regestServer(args[0]);
        //业务逻辑(实际是延时让它睡觉---不然会注册完成就关闭)
        server.business();
    }
    private void business() throws InterruptedException {
    Thread.sleep(Long.MAX_VALUE);
    }
    private void regestServer(String hostname) throws InterruptedException, KeeperException {
zk.create(parentNode+"/"+hostname,hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    //  需要创建有序的临时节点所以-e(暂时) -s(有序)
        System.out.println("服务器"+hostname+"已注册连接");
    }
    private void getconnect() throws IOException {
       zk = new ZooKeeper(connectstring, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
            }
        });
    }
}

客户端:

(1)获取zookeeper的连接:

        先创建客户端对象,在进行构建获取zookeeper连接的方法,本方法对process方法进行了重写,填写了再发生上下线的运行逻辑

 private void getConnect() throws IOException {
       zk= new ZooKeeper(connectString, sessionTimeout, new Watcher() {
        @Override
        public void process(WatchedEvent watchedEvent) {
            try {
                getServerList();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (KeeperException e) {
                e.printStackTrace();
            }
        }
    });
    }

(2)监听/servers下边的子节点的增减:

        构建方法client.getServerList()来进行监听:

代码逻辑就是通过getChildren()方法获取指定目录下的所有子目录并开启监听

再进行遍历,把遍历结果封装到一个集合中,最后进行输出

 private void getServerList() throws InterruptedException, KeeperException {
        List<String> children = zk.getChildren("/servers", true);
       //该方法会获取指定路径下的所有子节点
        //true 会走初始化中的watch 也可以自己创建watch
        //把所有的服务器都封装到一个集合
        ArrayList<String> list = new ArrayList<>();
        for (String child : children) {
            byte[] data = zk.getData("/servers" +"/"+ child, false, null);
            //上边已经便利到一个服务器对象,再进行添加
            list.add(new String(data));
        }
        System.out.println(list);
    }

(3)业务逻辑同服务端不在赘述。

客户端代码如下:

package com.tangxiaocong.case1;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
 * @Date 2023/8/10 21:27
 * @Author 
 * 客户端的监听功能
 */
public class DistributeClient {
private String connectString="hadoop102:2181,hadoop103:2181,hadoop104:2181";
   private int sessionTimeout=2000;
   private ZooKeeper zk=null;
   public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        //获取zk连接
        DistributeClient client = new DistributeClient();
        client.getConnect();
        //监听/servers下边的子节点的增减
        client.getServerList();
       //业务逻辑(睡眠)
       client.business();
    }
    private void business() throws InterruptedException {
   Thread.sleep(Long.MAX_VALUE);
   }
    private void getServerList() throws InterruptedException, KeeperException {
        List<String> children = zk.getChildren("/servers", true);
       //该方法会获取指定路径下的所有子节点
        //true 会走初始化中的watch 也可以自己创建watch
        //把所有的服务器都封装到一个集合
        ArrayList<String> list = new ArrayList<>();
        for (String child : children) {
            byte[] data = zk.getData("/servers" +"/"+ child, false, null);
            //上边已经便利到一个服务器对象,再进行添加
            list.add(new String(data));
        }
        System.out.println(list);
    }
    private void getConnect() throws IOException {
       zk= new ZooKeeper(connectString, sessionTimeout, new Watcher() {
        @Override
        public void process(WatchedEvent watchedEvent) {
            try {
                getServerList();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (KeeperException e) {
                e.printStackTrace();
            }
        }
    });
    }
}

案例二:ZooKeeper 分布式锁

分布式锁是什么?

日常使用计算机的时候,我们的电脑不会只开一个进程,但是当“进程1”在访问某些资源的时候,不能被其他进程所访问,它就会去获得锁,把她所访问的资源进行锁上,对该资源进行独占。"进程 1"用完该资源以后就将锁释放掉,让其 他进程来获得锁,那么通过这个锁机制,我们就能保证了分布式系统中多个进程能够有序的 访问该临界资源。那么我们把这个分布式环境下的这个锁叫作分布式锁。

锁的实现:

构造函数:

在该类中首先要实现构造方法,构造方法与类名相同,在该方法中需要获取连接,重写process方法,在该方法中实现释放CountDownLatch的类对象,有两种情况,正常连接释放一种,不是正常连接状态,则释放另一种。在构造方法中还要判断是否存在“/locks”路径,存在则正常退出,不存在则创建该路径。

加锁函数:

使用ZooKeeper对象进行创建节点(临时有序),让后获取“/locks”路径下的所有节点序号,对结果进行判断,如果返回的List集合只有一个节点,则直接返回,默认加锁,不用再做监听工作。如果不是只有一个节点,则对List集合进行排序,再获取他的节点名称,通过indexOf函数来获取该名称节点的下标。如果为-1,则数据异常,为0 则为最小节点,则直接退出,进行加锁不需要设置监听,结果为其他则需要设置监听,先设置监听字符串,当状态不发生改变会一致阻塞,只有上锁节点让位后会调用process方法进行释放。

解锁函数:

解锁就是直接删除节点即可

整体代码:

package com.tangxiaocong.case2;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
/**
 * @Date 2023/8/12 19:56
 * @Author 
 */
public class DistributedLock {
 final    private String connectString="hadoop102:2181,hadoop103:2181,hadoop104:2181";
    final  private int sessionTimeout=2000;
    final    private   ZooKeeper zk;
    private String waitPath;
    private String currentModu;
    //为了程序的健壮性,创建该对象   等待操作
    final   private CountDownLatch waitLach=new CountDownLatch(1);
    final   private CountDownLatch countDownLatch=new CountDownLatch(1);
  public DistributedLock() throws IOException, InterruptedException, KeeperException {
        //获取连接
      zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
            //  connectLatch  如果正常连接zk  可以释放
                if (watchedEvent.getState()==Event.KeeperState.SyncConnected){
                    countDownLatch.countDown();
                }
                //检测到删除节点并且是前一个节点则释放waitlatch
                if (watchedEvent.getType()==Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath))
                {
                waitLach.countDown();
                }
            }
        });
      //等待是否正常连接  正常(已)连接会释放  否则阻塞
      countDownLatch.await();
        // 判断是否存在lock锁
        Stat stat = zk.exists("/locks", false);
        if (stat==null)
        {
            //创建该节点
            String s = zk.create("/locks", "locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.CONTAINER);
        }
    }
    //对zk加锁
    public void zkLock()  {
        //创建临时的带序号的节点
        try {
            currentModu = zk.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            List<String> children = zk.getChildren("/locks", false);
             //如果只有一个节点   则直接获取
            if(children.size()==1)
            {
                return;
            }
            else {
                //排序
                Collections.sort(children);
                //直接从s后边开始   开始的下标就是length的长度
                String substring = currentModu.substring("/locks/".length());
                //通过substring来获取在List集合中的下标位置
                int index = children.indexOf(substring);
                if (index==-1)
                {
                    System.out.println("数据异常");
                }
                else if (index==0)
                {
                    return;
                }
                else {
                    //  需要监听上一个节点
                    waitPath="/locks/"+children.get(index-1);
                    zk.getData(waitPath,true,new Stat());
                    //等待监听
                    waitLach.await();
                    return;
                }
            }
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //判断创建的节点是否是最小序号的节点 如果是则获取锁  不是则监听他的前一个节点
    }
    //对zk解锁
    public void unzkLock()
    {
//删除节点
        try {
            //-1  是版本号
            zk.delete(this.currentModu,-1);
        } catch (InterruptedException  | KeeperException e) {
            e.printStackTrace();
        }
    }
}

测试类代码 :

package com.tangxiaocong.case2;
import org.apache.zookeeper.KeeperException;
import java.io.IOException;
/**
 * @Date 2023/8/12 22:31
 * @Author 唐晓聪
 */
public class DistributedLockTest
{
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        //创建两个客户端对象
        final    DistributedLock lock1 = new DistributedLock();
        final   DistributedLock lock2 = new DistributedLock();
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {  lock1.zkLock();
                System.out.println("线程1启动获得锁");
                    Thread.sleep(5*1000);
                    lock1.unzkLock();
                    System.out.println("线程1释放锁");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    lock2.zkLock();
                System.out.println("线程2启动获得锁");

                    Thread.sleep(5*1000);
                    lock2.unzkLock();
                    System.out.println("线程2释放锁");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}

Curator 框架实现分布式锁案例:

该案例是直接使用API进行实现分布式锁

实现步骤:

创建分布式锁对象,new InterProcessMutex(),参数1为所要连接的客户端,参数2为监听路径

参数1传入的为getCuratorFramework()自定义函数,

该函数通过工厂类的方式进行建立连接,返回创建好的客户端,让后start启动客户端

创建完分布式锁对象后创建两个线程,在线程中进行获得锁,释放锁的操作。

代码如下:

package com.tangxiaocong.case3;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
/**
 * @Date 2023/8/13 20:07
 * @Author 
 */
public class CuratorLockTest {
    public static void main(String[] args) {
        //创建分布式锁1
        //参数1   所连接的客户端 参数2 监听路径
        InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), "/locks");
        //创建分布式锁2
        InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), "/locks");
    //创建线程
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    lock1.acquire();
                    System.out.println("thread 1 acquire lock");
               lock1.acquire();
                    System.out.println("thread 1 again acquire lock");
                Thread.sleep(5*1000);
                lock1.release();
                    System.out.println("thread 1 relax lock");
                    lock1.release();
                    System.out.println("thread 1 again relax lock");
                    System.out.println();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    lock2.acquire();
                    System.out.println("thread 2 acquire lock");
                    lock2.acquire();
                    System.out.println("thread 2 again acquire lock");
                    Thread.sleep(5*1000);
                    lock2.release();
                    System.out.println("thread 2 relax lock");
                    lock2.release();
                    System.out.println("thread 2 again relax lock");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
    private static CuratorFramework getCuratorFramework() {
        ExponentialBackoffRetry policy = new ExponentialBackoffRetry(3000, 3);
        //通过工厂类的方式进行建立连接
        CuratorFramework client = CuratorFrameworkFactory.builder().connectString("hadoop102:2181,hadoop102:2181,hadoop104:2181")
                .connectionTimeoutMs(2000)
                .sessionTimeoutMs(2000)
                .retryPolicy(policy)//连接失败后  间隔多少秒下次间隔
                .build();
        client.start();
        System.out.println("zookeeper  success start  !!!!!");
        return client;
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/78040.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

区间预测 | MATLAB实现QRBiLSTM双向长短期记忆神经网络分位数回归时间序列区间预测

区间预测 | MATLAB实现QRBiLSTM双向长短期记忆神经网络分位数回归时间序列区间预测 目录 区间预测 | MATLAB实现QRBiLSTM双向长短期记忆神经网络分位数回归时间序列区间预测效果一览基本介绍模型描述程序设计参考资料 效果一览 基本介绍 区间预测 | MATLAB实现QRBiLSTM双向长短…

SpringBoot案例 调用第三方接口传输数据

一、前言 最近再写调用三方接口传输数据的项目&#xff0c;这篇博客记录项目完成的过程&#xff0c;方便后续再碰到类似的项目可以快速上手 项目结构&#xff1a; 二、编码 这里主要介绍HttpClient发送POST请求工具类和定时器的使用&#xff0c;mvc三层架构编码不做探究 pom.x…

dom靶场

靶场下载地址&#xff1a; https://www.vulnhub.com/entry/domdom-1,328/ 一、信息收集 获取主机ip nmap -sP 192.168.16.0/24netdiscover -r 192.168.16.0/24端口版本获取 nmap -sV -sC -A -p 1-65535 192.168.16.209开放端口只有80 目录扫描 这里扫描php后缀的文件 g…

设计模式之责任链模式【Java实现】

责任链&#xff08;Chain of Resposibility&#xff09; 模式 概念 责任链&#xff08;chain of Resposibility&#xff09; 模式&#xff1a;为了避免请求发送者与多个请求处理者耦合在一起&#xff0c;于是将所有请求的处理者 通过前一对象记住其下一个对象的引用而连成一条…

Oracle 使用 CONNECT_BY_ROOT 解锁层次结构洞察:在 SQL 中导航数据关系

CONNECT_BY_ROOT 是一个在 Oracle 数据库中使用的特殊函数&#xff0c;它通常用于在层次查询中获取根节点的值。在使用 CONNECT BY 子句进行层次查询时&#xff0c;通过 CONNECT_BY_ROOT 函数&#xff0c;你可以在每一行中获取根节点的值&#xff0c;而不仅仅是当前行的值。 假…

打印出二进制的奇数位和偶数位

void print(int a) {int i0;printf("奇数位&#xff1a;");for(i30;i>0;i-2){printf("%d ",(a>>i)&1);}printf("\n");printf("偶数位&#xff1a;");for(i31;i>1;i-2){printf("%d ",(a>>i)&1);} …

Kotlin runBlocking launch多个协程读写mutableListOf时序

Kotlin runBlocking launch多个协程读写mutableListOf时序 import kotlinx.coroutines.delay import kotlinx.coroutines.launch import kotlinx.coroutines.runBlockingfun main(args: Array<String>) {var lists mutableListOf<String>()runBlocking {launch {r…

【boost网络库从青铜到王者】第五篇:asio网络编程中的同步读写的客户端和服务器示例

文章目录 1、简介2、客户端设计3、服务器设计3.1、session函数3.2、StartListen函数3、总体设计 4、效果测试5、遇到的问题5.1、服务器遇到的问题5.1.1、不用显示调用bind绑定和listen监听函数5.1.2、出现 Error occured!Error code : 10009 .Message: 提供的文件句柄无效。 [s…

09- DMA(DirectMemoryAccess直接存储器访问)

DMA 09 、DMA(DirectMemoryAccess直接存储器访问)DMA配置流程 09 、DMA(DirectMemoryAccess直接存储器访问) DMA配置流程 dma.c文件 main.c文件 详见《stm32中文参考手册》表57。

基于php驾校驾驶理论考试模拟系统

驾校驾驶理论考试模拟系统&#xff0c;是基于php编程语言&#xff0c;mysql数据库进行开发&#xff0c;本系统分为用户和管理员两个角色&#xff0c;其中用户可以注册登陆系统&#xff0c;查看考试规则&#xff0c;进行驾照考试&#xff0c;查看考试得分&#xff0c;考试错题&a…

hdu8-Congruences(中国剩余定理)

Problem - 7363 (hdu.edu.cn) 参考&#xff1a;2023杭电暑假多校8 题解 3 5 7 10 | JorbanS_JorbanS的博客-CSDN博客 题解&#xff1a;&#xff08;中国剩余定理 增量法&#xff09; 注意验证和特判&#xff0c;此题中 pi 两两互质&#xff0c;可用CRT和增量法&#xff0c;当…

设计模式之门面模式(Facade)的C++实现

1、门面模式提出 在组件的开发过程中&#xff0c;某些接口之间的依赖是比较紧密的&#xff0c;如果某个接口发生变化&#xff0c;其他的接口也会跟着发生变化&#xff0c;这样的代码违背了代码的设计原则。门面设计模式是在外部客户程序和系统程序之间添加了一层中间接口&…

Android上架商城 隐私政策需要网页 没有怎么办

Android开发的项目上架商城的时候会需要你填写url&#xff0c;但其实并不需要真的去发布一个网站 使用腾讯文档新建文档 填写隐私政策 点击生成网页 再将网址填写即可 下面我找到的一个隐私政策文档供大家参考 将XXXX应用一键替换为自己的应用 将XXXXXX公司一键替换为公司 …

Docker容器与虚拟化技术:Docker镜像创建、Dockerfile实例

目录 一、理论 1.Docker镜像的创建方法 2.Docker镜像结构的分层 3.Dockerfile 案例 4.构建Systemctl镜像&#xff08;基于SSH镜像&#xff09; 5.构建Tomcat 镜像 6.构建Mysql镜像 二、实验 1.Docker镜像的创建 2. Dockerfile 案例 3.构建Systemctl镜像&#xff08;…

web后端解决跨域问题

目录 什么是跨域问题 为什么限制访问 解决 什么是跨域问题 域是指从一个域名的网页去请求另一个域名的资源。比如从www.baidu.com 页面去请求 www.google.com 的资源。但是一般情况下不能这么做&#xff0c;它是由浏览器的同源策略造成的&#xff0c;是浏览器对js施加的安全…

[oneAPI] 手写数字识别-卷积

[oneAPI] 手写数字识别 手写数字识别参数与包加载数据模型训练过程结果 oneAPI 比赛&#xff1a;https://marketing.csdn.net/p/f3e44fbfe46c465f4d9d6c23e38e0517 Intel DevCloud for oneAPI&#xff1a;https://devcloud.intel.com/oneapi/get_started/aiAnalyticsToolkitSam…

Vector

欢迎来到Cefler的博客&#x1f601; &#x1f54c;博客主页&#xff1a;那个传说中的man的主页 &#x1f3e0;个人专栏&#xff1a;题目解析 &#x1f30e;推荐文章&#xff1a;题目大解析2 目录 &#x1f449;&#x1f3fb;vector概念&#x1f449;&#x1f3fb;vector constr…

Python爬虫——scrapy_工作原理

引擎向spiders要url引擎把将要爬取的url给调度器调度器会将url生成的请求对象放入到指定的队列中从队列中出队一个请求引擎将请求交给下载器进行处理下载器发送请求获取互联网数据下载器将数据返回给引擎引擎将数据再次给到spidersspiders通过xpath解析该数据&#xff0c;得到数…

召集令:CloudQuery 社区有奖征文活动来啦!

CloudQuery 社区第一期征文活动来袭&#xff01;&#xff01;&#xff01;只要你对 CloudQuery 产品感兴趣&#xff0c;或者是希望了解 CQ &#xff0c;都可以来参加&#xff0c;在本期活动中&#xff0c;我们也为大家准备了多种主题供你选择&#xff0c;CQ 使用案例、版本对比…

字符设备驱动分布注册

驱动文件&#xff1a; 脑图&#xff1a; 现象&#xff1a;
最新文章