Zookeeper Java SDK 开发入门

文章目录

    • 一、概述
    • 二、导入依赖包
    • 三、与 Zookeeper 建立连接
    • 四、判断 ZooKeeper 节点是否存在
    • 四、创建 ZooKeeper 节点数据
    • 五、获取 ZooKeeper 节点数据
    • 六、修改 ZooKeeper 节点数据
    • 七、异步获取 ZooKeeper 节点数据
    • 八、完整示例

如果您还没有安装Zookeeper请看ZooKeeper 安装说明,Zookeeper 命令使用方法和数据说明。

一、概述

  • ZooKeeper是一个开源的、分布式的协调服务,它主要用于分布式系统中的数据管理和协调任务。它提供了一个具有高可用性的分布式环境,用于存储和管理小规模数据,例如配置信息、命名服务、分布式锁等。

  • 本文主要介绍如何使用 Java 与 ZooKeeper 建立连接,进行数据创建、修改、读取、删除等操作。

  • 源码地址:https://github.com/apache/zookeeper

    在这里插入图片描述

二、导入依赖包

  • 在 pom.xml 文件中导入 Zookeeper 包,注意一般这个包的版本要和您安装的 Zookeeper 服务端版本一致。

    <dependency>
       <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.8.2</version>
    </dependency>
    
    

三、与 Zookeeper 建立连接

  • 与ZooKeeper集群建立连接使用 ZooKeeper 类,传递三个参数,分别是
    • connectionString ,是ZooKeeper 集群地址(没连接池的概念,是Session的概念)
    • sessionTimeout , 是ZooKeeper Session 数据超时时间(也就是这个Session关闭后,与这个Session相关的数据在ZooKeeper中保存多信)。
    • watcher, ZooKeeper Session 级别监听器( Watcher),(Watch只发生在读方法上,如 get、exists等)
    private static ZooKeeper testCreateZookeeper() throws IOException, InterruptedException, KeeperException {
        final CountDownLatch countDownLatch = new CountDownLatch(1);

        // ZooKeeper 集群地址(没连接池的概念,是Session的概念)
        String connectionString = "192.168.8.51:2181,192.168.8.52:2181,192.168.8.53:2181";
        // ZooKeeper Session 数据超时时间(也就是这个Session关闭后,与这个Session相关的数据在ZooKeeper中保存多信)。
        Integer sessionTimeout = 3000;
        // ZooKeeper Session 级别 Watcher(Watch只发生在读方法上,如 get、exists)
        final ZooKeeper zooKeeper = new ZooKeeper(connectionString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                try {
                    Event.KeeperState state = watchedEvent.getState();
                    Event.EventType type = watchedEvent.getType();
                    String path = watchedEvent.getPath();

                    switch (state) {
                        case Unknown:
                            break;
                        case Disconnected:
                            break;
                        case NoSyncConnected:
                            break;
                        case SyncConnected:
                            countDownLatch.countDown();
                            break;
                        case AuthFailed:
                            break;
                        case ConnectedReadOnly:
                            break;
                        case SaslAuthenticated:
                            break;
                        case Expired:
                            break;
                        case Closed:
                            break;
                    }
                    switch (type) {
                        case None:
                            break;
                        case NodeCreated:
                            break;
                        case NodeDeleted:
                            break;
                        case NodeDataChanged:
                            break;
                        case NodeChildrenChanged:
                            break;
                        case DataWatchRemoved:
                            break;
                        case ChildWatchRemoved:
                            break;
                        case PersistentWatchRemoved:
                            break;
                    }

                    System.out.println("Session watch state=" + state);
                    System.out.println("Session watch type=" + type);
                    System.out.println("Session watch path=" + path);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });

        // 由于建立连接是异步的,这里先阻塞等待连接结果
        countDownLatch.await();
        ZooKeeper.States state = zooKeeper.getState();
        switch (state) {
            case CONNECTING:
                break;
            case ASSOCIATING:
                break;
            case CONNECTED:
                break;
            case CONNECTEDREADONLY:
                break;
            case CLOSED:
                break;
            case AUTH_FAILED:
                break;
            case NOT_CONNECTED:
                break;
        }
        System.out.println("ZooKeeper state=" + state);

        return zooKeeper;
    }

四、判断 ZooKeeper 节点是否存在

  • 创建节点数据使用 exists 方法,传递四个参数
    • path , 表示节点目录名称
    • watch, 表示监听器(只对该路径有效)
    • stat, 判断结果回调函数
    • context, 自定义上下文对象
    private static void testExists(ZooKeeper zooKeeper) throws IOException, InterruptedException, KeeperException {
        // 判断 ZooKeeper 节点是否存在
        Object context = new Object();
        zooKeeper.exists("/yiqifu", new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {

            }
        }, new AsyncCallback.StatCallback() {
            @Override
            public void processResult(int i, String s, Object o, Stat stat) {
                if(null != stat){
                    System.out.println("ZooKeeper /yiqifu 节点存在");
                }
                else {
                    System.out.println("ZooKeeper /yiqifu 节点不存在");
                }
            }
        }, context);
    }

四、创建 ZooKeeper 节点数据

  • 创建节点数据使用 create 方法,传递四个参数
    • path , 表示节点目录名称
    • data , 表示节点数据
   private static void testCreateNode(ZooKeeper zooKeeper) throws IOException, InterruptedException, KeeperException 	{

        // 在 ZooKeeper 中创建节点
        String nodeName = zooKeeper.create("/yiqifu", "test create data".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        System.out.println("ZooKeeper 创建节点成功:" + nodeName);
    }

五、获取 ZooKeeper 节点数据

  • 获取 ZooKeeper 节点数据使用 getData 方法,传递三个参数
    • path , 表示节点目录名称
    • watch, 表示路径级别的监听器,这个监听器只对该路径下的数据操作监听生效。
    private static void testGetdata(final ZooKeeper zooKeeper) throws IOException, InterruptedException, KeeperException {

        // 获取 ZooKeeper 节点数据,这里设置了Path级Watch
        final Stat stat = new Stat();
        byte[] nodeData = zooKeeper.getData("/yiqifu", new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                try {
                    Event.KeeperState state = watchedEvent.getState();
                    Event.EventType type = watchedEvent.getType();
                    String path = watchedEvent.getPath();

                    System.out.println("Path watch state=" + state);
                    System.out.println("Path watch type=" + type);
                    System.out.println("Path watch path=" + path);

                    //zooKeeper.getData("/yiqifu", true, stat); // 这里会使用Session级Watch
                    zooKeeper.getData("/yiqifu", this, stat);
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, stat);
        System.out.println("ZooKeeper 同步获取节点数据:" + new String(nodeData));
    }

六、修改 ZooKeeper 节点数据

  • 修改 ZooKeeper 节点数据使用 setData 方法,传递三个参数
    • path , 表示节点目录名称。
    • data, 表示新数据。
    • version, 表示数据版本。
    private static void testSetdata(ZooKeeper zooKeeper) throws IOException, InterruptedException, KeeperException {
        // 更新 ZooKeeper 节点数据(修改数据会触发Watch)
        zooKeeper.setData("/yiqifu", "test modify data 1 ".getBytes(), 0);
        zooKeeper.setData("/yiqifu", "test modify data 2 ".getBytes(), 1);
    }

七、异步获取 ZooKeeper 节点数据

  • 修改 ZooKeeper 节点数据使用 getData 方法,传递三个参数

    • path , 表示节点目录名称。

    • watch, 表示是否触发监听器。

    • dataCallback, 表示异步获取数据的回调函数。

 private static void testAsyncGetdata(ZooKeeper zooKeeper) throws IOException, InterruptedException, KeeperException {

        // 获取 ZooKeeper 节点数据(使用异步回调方式)
        Object context = new Object();
        zooKeeper.getData("/yiqifu", false, new AsyncCallback.DataCallback() {
            @Override
            public void processResult(int i, String s, Object o, byte[] bytes, Stat stat) {
                System.out.println("ZooKeeper 同步获取节点数据的目录:"+s);
                System.out.println("ZooKeeper 异步获取节点数据:"+new String(bytes));
            }
        }, context);
    }

八、完整示例

package top.yiqifu.study.p131;


import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

public class Test01_Zookeeper {

    public static void main(String[] args) {
        try {
            // 创建 ZooKeeper 对象
            ZooKeeper zooKeeper = testCreateZookeeper();
            // 在 ZooKeeper 创建数据节点
            testCreateNode(zooKeeper);
            // 在 ZooKeeper 中同步获取节点数据
            testGetdata(zooKeeper);
            // 在 ZooKeeper 中更新节点数据
            testSetdata(zooKeeper);
            // 在 ZooKeeper 异步获取节点数据
            testAsyncGetdata(zooKeeper);

            Thread.sleep(3000);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }


    private static ZooKeeper testCreateZookeeper() throws IOException, InterruptedException, KeeperException {
        final CountDownLatch countDownLatch = new CountDownLatch(1);

        // ZooKeeper 集群地址(没连接池的概念,是Session的概念)
        String connectionString = "192.168.8.51:2181,192.168.8.52:2181,192.168.8.53:2181";
        // ZooKeeper Session 数据超时时间(也就是这个Session关闭后,与这个Session相关的数据在ZooKeeper中保存多信)。
        Integer sessionTimeout = 3000;
        // ZooKeeper Session 级别 Watcher(Watch只发生在读方法上,如 get、exists)
        final ZooKeeper zooKeeper = new ZooKeeper(connectionString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                try {
                    Event.KeeperState state = watchedEvent.getState();
                    Event.EventType type = watchedEvent.getType();
                    String path = watchedEvent.getPath();

                    switch (state) {
                        case Unknown:
                            break;
                        case Disconnected:
                            break;
                        case NoSyncConnected:
                            break;
                        case SyncConnected:
                            countDownLatch.countDown();
                            break;
                        case AuthFailed:
                            break;
                        case ConnectedReadOnly:
                            break;
                        case SaslAuthenticated:
                            break;
                        case Expired:
                            break;
                        case Closed:
                            break;
                    }
                    switch (type) {
                        case None:
                            break;
                        case NodeCreated:
                            break;
                        case NodeDeleted:
                            break;
                        case NodeDataChanged:
                            break;
                        case NodeChildrenChanged:
                            break;
                        case DataWatchRemoved:
                            break;
                        case ChildWatchRemoved:
                            break;
                        case PersistentWatchRemoved:
                            break;
                    }

                    System.out.println("Session watch state=" + state);
                    System.out.println("Session watch type=" + type);
                    System.out.println("Session watch path=" + path);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });

        countDownLatch.await();
        ZooKeeper.States state = zooKeeper.getState();
        switch (state) {
            case CONNECTING:
                break;
            case ASSOCIATING:
                break;
            case CONNECTED:
                break;
            case CONNECTEDREADONLY:
                break;
            case CLOSED:
                break;
            case AUTH_FAILED:
                break;
            case NOT_CONNECTED:
                break;
        }
        System.out.println("ZooKeeper state=" + state);

        return zooKeeper;
    }

    private static void testCreateNode(ZooKeeper zooKeeper) throws IOException, InterruptedException, KeeperException {

        // 在 ZooKeeper 中创建节点
        String nodeName = zooKeeper.create("/yiqifu", "test create data".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        System.out.println("ZooKeeper 创建节点成功:" + nodeName);
    }

    private static void testGetdata(final ZooKeeper zooKeeper) throws IOException, InterruptedException, KeeperException {

        // 获取 ZooKeeper 节点数据,这里设置了Path级Watch
        final Stat stat = new Stat();
        byte[] nodeData = zooKeeper.getData("/yiqifu", new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                try {
                    Event.KeeperState state = watchedEvent.getState();
                    Event.EventType type = watchedEvent.getType();
                    String path = watchedEvent.getPath();

                    System.out.println("Path watch state=" + state);
                    System.out.println("Path watch type=" + type);
                    System.out.println("Path watch path=" + path);

                    //zooKeeper.getData("/yiqifu", true, stat); // 这里会使用Session级Watch
                    zooKeeper.getData("/yiqifu", this, stat);
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, stat);
        System.out.println("ZooKeeper 同步获取节点数据:" + new String(nodeData));
    }

    private static void testSetdata(ZooKeeper zooKeeper) throws IOException, InterruptedException, KeeperException {
        // 更新 ZooKeeper 节点数据(修改数据会触发Watch)
        zooKeeper.setData("/yiqifu", "test modify data 1 ".getBytes(), 0);
        zooKeeper.setData("/yiqifu", "test modify data 2 ".getBytes(), 1);
    }

    private static void testAsyncGetdata(ZooKeeper zooKeeper) throws IOException, InterruptedException, KeeperException {

        // 获取 ZooKeeper 节点数据(使用异步回调方式)
        Object context = new Object();
        zooKeeper.getData("/yiqifu", false, new AsyncCallback.DataCallback() {
            @Override
            public void processResult(int i, String s, Object o, byte[] bytes, Stat stat) {
                System.out.println("ZooKeeper 同步获取节点数据的目录:"+s);
                System.out.println("ZooKeeper 异步获取节点数据:"+new String(bytes));
            }
        }, context);
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/150355.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

问题 H: 棋盘游戏(二分图变式)

题意&#xff1a;要求找到 不放车就无法达到最大数的点 的个数 题解&#xff1a;1.以行列绘制二分图 2.先算出最大二分匹配数 3.依次遍历所有边 删除该边&#xff0c;并计算二分匹配最大值 &#xff08;若小于原最大值即为重要点&#xff09;&#xff0…

Vue3中使用provide和inject依赖注入完成父组件和孙子组件之间参数传递

Vue3中使用provide和inject依赖注入完成父组件和孙子组件之间参数传递 官网介绍 注意以下写法都是使用setup 代码结构 依赖注入-父组件 import { ref, provide } from "vue"const outDialogCardInfo ref() function updateOutDialogCardInfo(item) {console.log…

sCrypt Playground 发布

sCrypt Playground 发布了。 与桌面IDE 完全相同的功能&#xff0c;但是无需安装。体验地址: https://playground.scrypt.io。 请不要在 sCrypt Playground 上存储重要数据。我们会不定时清除用户保存在其上的数据。

韩国黄金代理商主动出击时机

受中东局势影响&#xff0c;十月底国际价格一度重新站上2000美元大关&#xff0c;韩国的黄格也随之出现上涨&#xff0c;当地投资者对黄金投资的热情再次升温。在韩国首尔市钟路附近的金店一条街&#xff0c;聚集了大大小小上百家金店&#xff0c;即使是在平日的中午&#xff0…

软件测试方案该怎么做?

做某项工作之前都需要有个计划或方案&#xff0c;软件测试亦如此。软件测试方案就是描述测试目的、范围、方法和软件测试的重点等文档。对于验证软件产品的可接受程度编写测试计划文档是一种有用的方式&#xff0c;可以使测试工作和整个开发工作融合起来&#xff0c;让资源和变…

OpenAI发布会震撼AI界,千字文全面解读

你的朋友圈是否被近日 OpenAI 的开发者大会刷屏了&#xff1f;这是预料之中的事。在近日&#xff0c;OpenAI 首度召开了旨在定义未来应用市场的开发者大会。 让我们迅速捕捉 OpenAI 最新的动态以及 ChatGPT 的更新亮点。 1、OpenAI 最新动态 今晨的盛会聚焦于以下要点&#xf…

docker安装SMQTT

docker安装SMQTT smqtt介绍 官方地址: https://www.smqtt.cc/ 官方文档地址: https://wiki.smqtt.cc/docs/smqtt/ 一款高性能&开源的MQTT服务器&#xff0c;支持单机、容器化、集群部署&#xff0c;支持多种协议&#xff0c;具备低延迟&#xff0c;高吞吐量&#xff0c;…

windows 电脑删除不了.TTF的文件

出现这个问题&#xff0c;首先检查&#xff0c;你的.ttf文件是不是在哪个软件中打开了。 如果是&#xff0c;先关掉&#xff0c;然后在删一遍试试。 如果这个还是不行试着打开控制面板>外观和个性化> 字体 > 字体设置>还原默认字体设置勾选&#xff0c;然后重启一下…

网络嵌入综述

图嵌入综述整理&#xff08;上&#xff09; 来源&#xff1a;图算法探索系列&#xff08;一&#xff09;&#xff1a;图嵌入模型的原理和应用篇【万字长文】 图9是DeepWalk模型在推荐场景下的应用。图9&#xff08;a&#xff09;显示的是不同用户在不同Session中的item点击序列…

使用MathType将文献中的数学公式进行转换

mathtype将文献中的数学公式进行转换 文章目录 mathtype将文献中的数学公式进行转换一、截图识别二、MathType下载与设置2.1、MathType介绍2.2、[下载位置](http://www.51xiazai.cn/soft/5975.htm)2.3、设置 三、使用MathType&#xff1a; 一、截图识别 这两个在线网站都可以将…

A股风格因子看板 (2023.11 第10期)

该因子看板跟踪A股风格因子&#xff0c;该因子主要解释沪深两市的市场收益、刻画市场风格趋势的系列风格因子&#xff0c;用以分析市场风格切换、组合风格暴 露等。 今日为该因子跟踪第10期&#xff0c;指数组合数据截止日2023-10-31&#xff0c;要点如下 近1年A股风格因子收益…

谷歌提出AGI的6大原则,和5大能力等级

随着ChatGPT等大模型的出现,AGI概念正在从哲学层面快速转向实际应用落地&#xff0c;并且ChatGPT已经展示出了初级AGI的功能&#xff08;如AutoGPT&#xff09;,有不少专家认为&#xff0c;AGI时代可能在10年内到来。 因此&#xff0c;需要一个明确的技术框架来讨论和衡量不同…

【带头学C++】----- 六、结构体 ---- 6.7 结构体的对齐规则

6.7 结构体的对齐规则 6.7.1 知识点引入 6.7.2 结构体自动对齐规则 1、确定分配单位(一行分配多少字节) 结构体中最大的基本类型长度决定 2、确定成员的偏移量 成员偏移量成员自身类型的整数倍 需要根据你所在平台的位数&#xff0c;32位和64为类型大小不一样。cpu一次读取…

行情分析——加密货币市场大盘走势(11.15)

大饼按照预期等待下跌即可&#xff0c;现在已经下跌到35500&#xff0c;昨日晚上跌破了35000&#xff0c;现在放心大胆空。笔者现在都是空单在手。 空单策略&#xff1a;入场36000附近 止盈34000-32000 止损39000 以太昨日策略进场&#xff0c;已经止盈了&#xff0c;最低跌到…

11-Vue基础之组件通信(二)

个人名片&#xff1a; &#x1f60a;作者简介&#xff1a;一名大二在校生 &#x1f921; 个人主页&#xff1a;坠入暮云间x &#x1f43c;座右铭&#xff1a;懒惰受到的惩罚不仅仅是自己的失败&#xff0c;还有别人的成功。 &#x1f385;**学习目标: 坚持每一次的学习打卡 文章…

git push 报错 The requested URL returned error: 500

今天gitpush时报错The requested URL returned error: 500 看报错应该是本地和gitlab服务器之间通信的问题&#xff0c;登录gitlab网站查看 登录时报错无法通过ldapadmin认证&#xff0c;ldap服务器连接失败。 首先&#xff0c;登录ldap服务器&#xff0c;查看是否是ldap服务…

浅聊汽车供应链数智化发展趋势

“2023中国汽车供应链大会暨第二届中国新能源智能网联汽车生态大会”在11月10日—12日&#xff0c;武汉经开区举办。围绕供应链安全与布局、新型汽车供应链打造、传统供应链升级、全球化发展等热点话题进行深入交流与探讨&#xff0c;寻找构建世界一流汽车供应链的对策、方法和…

macos死机后IDEA打不开,Cannot connect to already running IDE instance.

Cannot connect to already running IDE instance. Exception: Process 573 is still running 解决办法 进入&#xff1a;/Users/lzq/Library/Application Support/JetBrains 找到IDEA的目录删除隐藏文件夹 .lock rm -rf .lock

【Docker】五分钟完成Docker部署Java应用,你也可以的!!!

文章目录 前言一、部署步骤1.项目结构2.Dockerfile3.docker-compose.yml4.启动5.常用命令 总结 前言 本文基于Docker Compose部署Java应用&#xff0c;请确保你已经安装了Docker和Docker Compose。 十分钟就能上手docker&#xff1f;要不你也试试&#xff1f; 一、部署步骤 1…

陪诊小程序|陪诊系统打开陪护行业新世界

随着社会老龄化加剧&#xff0c;以及人们对于医疗服务质量的要求提高&#xff0c;陪诊服务逐渐成为了医疗体系中不可或缺的一部分。而陪诊小程序作为陪诊服务的线上平台&#xff0c;更是受到了广泛的关注。下面小编就给大家讲解下陪诊小程序的功能并阐述其系统优势。 陪诊小程序…
最新文章