Zookeeper-应用实战

Zookeeper Java客户端实战

ZooKeeper应用的开发主要通过Java客户端API去连接和操作ZooKeeper集群。

  • ZooKeeper官方的Java客户端API。

  • 第三方的Java客户端API,比如Curator。

ZooKeeper官方的客户端API提供了基本的操作:创建会话、创建节点、读取节点、更新数据、删除节点和检查节点是否存在等。

对于实际开发来说,ZooKeeper官方API有一些不足之处:

  • ZooKeeper的Watcher监测是一次性的,每次触发之后都需要重新进行注册。

  • 会话超时之后没有实现重连机制。

  • 异常处理烦琐,ZooKeeper提供了很多异常,对于开发人员来说可能根本不知道应该如何处理这些抛出的异常。

  • 仅提供了简单的byte[]数组类型的接口,没有提供Java POJO级别的序列化数据处理接口。

  • 创建节点时如果抛出异常,需要自行检查节点是否存在。

  • 无法实现级联删除。

总之,ZooKeeper官方API功能比较简单,在实际开发过程中比较笨重,一般不推荐使用。

Zookeeper 原生Java客户端使用

引入zookeeper client依赖

<!-- zookeeper client -->
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.8.0</version>
</dependency>

注意:保持与服务端版本一致,避免兼容性的问题

ZooKeeper常用构造器

ZooKeeper (connectString, sessionTimeout, watcher)
参数描述
connectString逗号分隔的列表,每个ZooKeeper节点是一个host.port对,host 是机器名或者IP地址,port是ZooKeeper节点对客户端提供服务的端口号。客户端会任意选取connectString 中的一个节点建立连接。
sessionTimeoutsession timeout时间。
watcher接收到来自ZooKeeper集群的事件。

使用 zookeeper 原生 API,连接zookeeper集群

public class ZkClientDemo
{
    private static final String CONNECT_STR = "你的公网IP:2181";
    
    private final static String CLUSTER_CONNECT_STR = "192.168.65.156:2181,192.168.65.190:2181,192.168.65.200:2181";
    
    public static void main(String[] args)
        throws Exception
    {
        
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        ZooKeeper zooKeeper = new ZooKeeper(CONNECT_STR, 4000, new Watcher()
        {
            @Override
            public void process(WatchedEvent event)
            {
                if (Event.KeeperState.SyncConnected == event.getState() && event.getType() == Event.EventType.None)
                {
                    // 如果收到了服务端的响应事件,连接成功
                    countDownLatch.countDown();
                    System.out.println("连接建立");
                }
            }
        });
        System.out.printf("连接中");
        countDownLatch.await();
        // CONNECTED
        System.out.println(zooKeeper.getState());
        
        // 创建持久节点
        zooKeeper.create("/user", "gao".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }
}

Zookeeper主要方法

方法功能
create(path, data, acl,createMode)创建一个给定路径的 znode,并在 znode 保存 data[]的 数据,createMode指定 znode 的类型。
delete(path, version)如果给定 path 上的 znode 的版本和给定的 version 匹配, 删除 znode
exists(path, watch)判断给定 path 上的 znode 是否存在,并在 znode 设置一个 watch
getData(path, watch)返回给定 path 上的 znode 数据,并在 znode 设置一个 watch。
setData(path, data, version)如果给定 path 上的 znode 的版本和给定的 version 匹配,设置 znode 数据。
getChildren(path, watch)返回给定 path 上的 znode 的子 znode 名字,并在 znode 设置一个 watch。
sync(path)把客户端 session 连接节点和 leader 节点进行同步。

方法特点:

  • 所有获取 znode 数据的 API 都可以设置一个 watch 用来监控 znode 的变化。 

  • 所有更新 znode 数据的 API 都有两个版本: 无条件更新版本和条件更新版本。如果 version 为 -1,更新为无条件更新。否则只有给定的 version 和 znode 当前的 version 一样,才会进行更新。

  • 所有的方法都有同步和异步两个版本。同步版本的方法发送请求给 ZooKeeper 并等待服务器的响应。异步版本把请求放入客户端的请求队列,然后马上返回。异步版本通过 callback 来接受来自服务端的响应。

同步创建节点:

public void createTest() throws KeeperException, InterruptedException 
{
    String path = zooKeeper.create(ZK_NODE, "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    log.info("created path: {}",path);
}

异步创建节点:

public void createAsycTest() throws InterruptedException 
{
     zooKeeper.create(ZK_NODE, "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
             CreateMode.PERSISTENT,
             (rc, path, ctx, name) -> log.info("rc  {},path {},ctx {},name {}",rc,path,ctx,name),"context");
    TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
}

修改数据:

public void setTest() throws KeeperException, InterruptedException 
{
    Stat stat = new Stat();
    byte[] data = zooKeeper.getData(ZK_NODE, false, stat);
    log.info("修改前: {}",new String(data));
    zooKeeper.setData(ZK_NODE, "changed!".getBytes(), stat.getVersion());
    byte[] dataAfter = zooKeeper.getData(ZK_NODE, false, stat);
    log.info("修改后: {}",new String(dataAfter));
}

Curator开源客户端使用

Curator是Netflix公司开源的一套ZooKeeper客户端框架,和ZkClient一样它解决了非常底层的细节开发工作,包括连接、重连、反复注册Watcher的问题以及NodeExistsException异常等。

Curator是Apache基金会的顶级项目之一,Curator具有更加完善的文档,另外还提供了一套易用性和可读性更强的Fluent风格的客户端API框架。

Curator还为ZooKeeper客户端框架提供了一些比较普遍的、开箱即用的、分布式开发用的解决方案,例如Recipe、共享锁服务、Master选举机制和分布式计算器等,帮助开发者避免了“重复造轮子”的无效开发工作。

在实际的开发场景中,使用Curator客户端就足以应付日常的ZooKeeper集群操作的需求。

官网:Apache Curator

引入依赖

  • curator-framework是对ZooKeeper的底层API的一些封装。

  • curator-client提供了一些客户端的操作,例如重试策略等。

  • curator-recipes封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等。

<!-- zookeeper client -->
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.8.0</version>
</dependency>

<!--curator-->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>5.1.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
    </exclusions>
</dependency>

创建一个客户端实例

使用curator-framework包操作ZooKeeper前,首先要创建一个客户端实例(CuratorFramework类型的对象)

  • 使用工厂类CuratorFrameworkFactory的静态newClient()方法

//重试策略 
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3)
//创建客户端实例
CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
//启动客户端
client.start();
  • 使用工厂类CuratorFrameworkFactory的静态builder构造者方法

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("192.168.128.129:2181")
                .sessionTimeoutMs(5000)  // 会话超时时间
                .connectionTimeoutMs(5000) // 连接超时时间
                .retryPolicy(retryPolicy)
                .namespace("base") // 包含隔离名称
                .build();
client.start();
  • connectionString:服务器地址列表,一个或多个。(多个地址列表用逗号分隔)

  • retryPolicy:重试策略,当客户端异常退出或者与服务端失去连接的时候,可以通过设置客户端重新连接 ZooKeeper 服务端。( Curator 内部,通过判断服务器返回的 keeperException 状态代码判断是否重试)

策略名称描述
ExponentialBackoffRetry重试一组次数,重试之间的睡眠时间增加
RetryNTimes重试最大次数
RetryOneTime只重试一次
RetryUntilElapsed在给定的时间结束之前重试
  • 超时时间:Curator 客户端创建过程中,有两个超时时间的设置。一个是 sessionTimeoutMs 会话超时时间,用来设置该条会话在 ZooKeeper 服务端的失效时间。另一个是 connectionTimeoutMs 客户端创建会话的超时时间,用来限制客户端发起一个会话连接到接收 ZooKeeper 服务端应答的时间。

创建节点

public void testCreate() throws Exception 
{
    String path = curatorFramework.create().forPath("/curator-node");
    curatorFramework.create().withMode(CreateMode.PERSISTENT).forPath("/curator-node","some-data".getBytes())
    log.info("curator create node :{}  successfully.",path);
}

一次性创建带层级结构的节点

public void testCreateWithParent() throws Exception 
{
    String pathWithParent="/node-parent/sub-node-1";
    String path = curatorFramework.create().creatingParentsIfNeeded().forPath(pathWithParent);
    log.info("curator create node :{}  successfully.",path);
}

获取数据

public void testGetData() throws Exception 
{
    byte[] bytes = curatorFramework.getData().forPath("/curator-node");
    log.info("get data from  node :{}  successfully.",new String(bytes));
}

更新节点

public void testSetData() throws Exception 
{
    curatorFramework.setData().forPath("/curator-node","changed!".getBytes());
    byte[] bytes = curatorFramework.setData().forPath("/curator-node");
    log.info("get data from  node /curator-node :{}  successfully.",new String(bytes));
}

删除节点

public void testDelete() throws Exception 
{
    String pathWithParent="/node-parent";
    curatorFramework.delete().guaranteed().deletingChildrenIfNeeded().forPath(pathWithParent);
}

guaranteed:保障删除成功,底层工作方式是:只要该客户端的会话有效,就会在后台持续发起删除请求,直到该数据节点在 ZooKeeper 服务端被删除。

deletingChildrenIfNeeded:指定了该函数后,系统在删除该数据节点的时候会以递归的方式直接删除其子节点,以及子节点的子节点。

异步接口

public interface BackgroundCallback
{
    /**
     * Called when the async background operation completes
     *
     * @param client the client
     * @param event operation result details
     * @throws Exception errors
     */
    public void processResult(CuratorFramework client, CuratorEvent event) throws Exception;
}

默认在 EventThread 中调用

public void test() throws Exception 
{
    //inBackground 异步处理默认在EventThread中执行
    curatorFramework.getData().inBackground((item1, item2) -> {
        log.info(" background: {}", item2);
    }).forPath(ZK_NODE);

    TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
}

指定线程池

public void test() throws Exception 
{
    ExecutorService executorService = Executors.newSingleThreadExecutor();
    
    curatorFramework.getData().inBackground((item1, item2) -> {
        log.info(" background: {}", item2);
    },executorService).forPath(ZK_NODE);

    TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
}

Curator 监听器

/**
 * Receives notifications about errors and background events
 */
public interface CuratorListener
{
    /**
     * Called when a background task has completed or a watch has triggered
     *
     * @param client client
     * @param event the event
     * @throws Exception any errors
     */
    public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception;
}

Curator 引入了 Cache 来实现对 Zookeeper 服务端事件监听,Cache 提供了反复注册的功能。Cache 分为两类注册类型:节点监听和子节点监听。

node cache:NodeCache 对某一个节点进行监听

@Slf4j
public class NodeCacheTest extends AbstractCuratorTest{

    public static final String NODE_CACHE="/node-cache";

    @Test
    public void testNodeCacheTest() throws Exception {

        createIfNeed(NODE_CACHE);
        NodeCache nodeCache = new NodeCache(curatorFramework, NODE_CACHE);
        nodeCache.getListenable().addListener(new NodeCacheListener() {
            @Override
            public void nodeChanged() throws Exception {
                log.info("{} path nodeChanged: ",NODE_CACHE);
                printNodeData();
            }
        });

        nodeCache.start();
    }


    public void printNodeData() throws Exception {
        byte[] bytes = curatorFramework.getData().forPath(NODE_CACHE);
        log.info("data: {}",new String(bytes));
    }
}

path cache: PathChildrenCache 会对子节点进行监听,但是不会对二级子节点进行监听,

@Slf4j
public class PathCacheTest extends AbstractCuratorTest{

    public static final String PATH="/path-cache";

    @Test
    public void testPathCache() throws Exception {

        createIfNeed(PATH);
        PathChildrenCache pathChildrenCache = new PathChildrenCache(curatorFramework, PATH, true);
        pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                log.info("event:  {}",event);
            }
        });

        // 如果设置为true则在首次启动时就会缓存节点内容到Cache中
        pathChildrenCache.start(true);
    }
}

tree cache:TreeCache 使用一个内部类TreeNode来维护这个一个树结构。并将这个树结构与ZK节点进行了映射。所以TreeCache 可以监听当前节点下所有节点的事件。

@Slf4j
public class TreeCacheTest extends AbstractCuratorTest{

    public static final String TREE_CACHE="/tree-path";

    @Test
    public void testTreeCache() throws Exception {
        createIfNeed(TREE_CACHE);
        TreeCache treeCache = new TreeCache(curatorFramework, TREE_CACHE);
        treeCache.getListenable().addListener(new TreeCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
                log.info(" tree cache: {}",event);
            }
        });
        treeCache.start();
    }
}

Zookeeper在分布式命名服务中的实战

ZooKeeper的命名服务主要是利用ZooKeeper节点的树形分层结构和子节点的顺序维护能力,来为分布式系统中的资源命名。

典型的分布式命名服务有:

  • 分布式API目录

  • 分布式节点命名

  • 分布式ID生成器

分布式API目录

为分布式系统中各种API接口服务的名称、链接地址,提供类似JNDI(Java命名和目录接口)中的文件系统的功能。借助于ZooKeeper的树形分层结构就能提供分布式的API调用功能。

著名的Dubbo分布式框架就是应用了ZooKeeper的分布式的JNDI功能。在Dubbo中,使用ZooKeeper维护的全局服务接口API的地址列表。大致的思路为:

  • 服务提供者(Service Provider)在启动的时候,向ZooKeeper上的指定节点/dubbo/${serviceName}/providers写入自己的API地址,这个操作就相当于服务的公开。

  • 服务消费者(Consumer)启动的时候,订阅节点/dubbo/{serviceName}/providers下的服务提供者的URL地址,获得所有服务提供者的API。

分布式节点命名

一个分布式节点通常有很多节点,并且节点数量不固定。业务膨胀和流量洪峰会有新的节点加入集群,服务故障和网络波动等原因有节点退出集群。

那么分布式中大量的节点要如何命名呢?

可用于生成集群节点的编号的方案:

1、使用数据库的自增ID特性,用数据表存储机器的MAC地址或者IP来维护。

2、使用ZooKeeper持久顺序节点的顺序特性来维护节点的NodeId编号。

  • 启动节点服务,连接ZooKeeper,检查命名服务根节点是否存在,如果不存在,就创建系统的根节点。

  • 在根节点下创建一个临时顺序ZNode节点,取回ZNode的编号把它作为分布式系统中节点的NODEID。

  • 如果临时节点太多,可以根据需要删除临时顺序ZNode节点。

分布式ID生成器

分布式ID生成器的使用场景:

  • 大量的数据记录,需要分布式ID。

  • 大量的系统消息,需要分布式ID。

  • 大量的请求日志,如restful的操作记录,需要唯一标识,以便进行后续的用户行为分析和调用链路分析。

  • 分布式节点的命名服务,往往也需要分布式ID。

分布式ID生成系统需要满足条件:

1、全局唯一:不能出现重复ID。

2、高可用:ID生成系统是基础系统,被许多关键系统调用,一旦宕机,就会造成严重影响。

分布式的ID生成器方案有:

1、Java的UUID。

2、分布式缓存Redis生成ID:利用Redis的原子操作INCR和INCRBY,生成全局唯一的ID。

3、Twitter的SnowFlake算法。

4、ZooKeeper生成ID:利用ZooKeeper的顺序节点,生成全局唯一的ID。

5、MongoDb的ObjectId:MongoDB是一个分布式的非结构化NoSQL数据库,每插入一条记录会自动生成全局唯一的一个“_id”字段值,它是一个12字节的字符串,可以作为分布式系统中全局唯一的ID。

基于Zookeeper实现分布式ID生成器

ZooKeeper具备自动编号能力的节点类型:

  • PERSISTENT_SEQUENTIAL 持久化顺序节点

  • EPHEMERAL_SEQUENTIAL 临时顺序节点

ZooKeeper的每一个节点都会为它的第一级子节点维护一份顺序编号,记录每个子节点创建的先后顺序,顺序编号分布式同步且全局唯一。

通过创建ZooKeeper的临时顺序节点的方法,生成全局唯一的ID

@Slf4j
public class IDMaker extends CuratorBaseOperations
{
    private String createSeqNode(String pathPefix) throws Exception
    {
        CuratorFramework curatorFramework = getCuratorFramework();
        // 创建一个临时顺序节点
        String destPath = curatorFramework.create()
            .creatingParentsIfNeeded()
            .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
            .forPath(pathPefix);
        return destPath;
    }
    
    public String makeId(String path) throws Exception
    {
        String str = createSeqNode(path);
        if (null != str)
        {
            // 获取末尾的序号
            int index = str.lastIndexOf(path);
            if (index >= 0)
            {
                index += path.length();
                return index <= str.length() ? str.substring(index) : "";
            }
        }
        return str;
    }

	// 测试
    public static void main(String[] args) throws InterruptedException
    {
        IDMaker idMaker = new IDMaker();
        String pathPrefix = "/idmarker/id-";
        for (int i = 0; i < 5; i++)
        {
            new Thread(() -> {
                for (int j = 0; j < 10; j++)
                {
                    String id = null;
                    try
                    {
                        id = idMaker.makeId(pathPrefix);
                        log.info("{}线程第{}个创建的id为{}", Thread.currentThread().getName(), j, id);
                    }
                    catch (Exception e)
                    {
                        e.printStackTrace();
                    }
                }
            }, "thread" + i).start();
        }
        Thread.sleep(Integer.MAX_VALUE);
    }
}

public class CuratorBaseOperations
{
    public CuratorFramework getCuratorFramework()
    {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        
        CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString("你的公网IP:2181")
            .sessionTimeoutMs(5000) // 会话超时时间
            .connectionTimeoutMs(5000) // 连接超时时间
            .retryPolicy(retryPolicy)
            .namespace("base") // 包含隔离名称
            .build();
        client.start();
        return client;
    }
}

基于Zookeeper实现SnowFlakeID算法

Twitter(推特)的SnowFlake算法是一种著名的分布式服务器用户ID生成算法。SnowFlake算法所生成的ID是一个64bit的长整型数字。这个64bit被划分成四个部分,其中后面三个部分分别表示时间戳、工作机器ID、序列号。

SnowFlakeID分为四个部分:

(1)第一位 占用1 bit,其值始终是0,没有实际作用。

(2)时间戳 占用41 bit,精确到毫秒,总共可以容纳约69年的时间。

(3)工作机器id占用10 bit,最多可以容纳1024个节点。

(4)序列号 占用12 bit。这个值在同一毫秒同一节点上从0开始不断累加,最多可以累加到4095。

在工作节点达到1024顶配的场景下,SnowFlake算法在同一毫秒最多可以生成的ID数量为:1024 * 4096 =4194304,在绝大多数并发场景下都是够用的。

SnowFlake算法的优点:

  • 生成ID时不依赖于数据库,完全在内存生成,高性能和高可用性。

  • 容量大,每秒可生成几百万个ID。

  • ID呈趋势递增,后续插入数据库的索引树时,性能较高。

SnowFlake算法的缺点:

  • 依赖于系统时钟的一致性,如果某台机器的系统时钟回拨了,有可能造成ID冲突,或者ID乱序。

  • 在启动之前,如果这台机器的系统时间回拨过,那么有可能出现ID重复的危险。

基于zookeeper实现雪花算法:

public class SnowflakeIdGenerator
{
    
    /**
     * 单例
     */
    public static SnowflakeIdGenerator INSTANCE = new SnowflakeIdGenerator();
    
    /**
     * 初始化单例
     *
     * @param workerId 节点Id,最大8091
     * @return the 单例
     */
    public synchronized void init(long workerId)
    {
        if (workerId > MAX_WORKER_ID)
        {
            // zk分配的workerId过大
            throw new IllegalArgumentException("woker Id wrong: " + workerId);
        }
        INSTANCE.workerId = workerId;
    }
    
    private SnowflakeIdGenerator()
    {
        
    }
    
    /**
     * 开始使用该算法的时间为: 2017-01-01 00:00:00
     */
    private static final long START_TIME = 1483200000000L;
    
    /**
     * worker id 的bit数,最多支持8192个节点
     */
    private static final int WORKER_ID_BITS = 13;
    
    /**
     * 序列号,支持单节点最高每毫秒的最大ID数1024
     */
    private static final int SEQUENCE_BITS = 10;
    
    /**
     * 最大的 worker id ,8091 -1 的补码(二进制全1)右移13位, 然后取反
     */
    private static final long MAX_WORKER_ID = ~(-1L << WORKER_ID_BITS);
    
    /**
     * 最大的序列号,1023 -1 的补码(二进制全1)右移10位, 然后取反
     */
    private static final long MAX_SEQUENCE = ~(-1L << SEQUENCE_BITS);
    
    /**
     * worker 节点编号的移位
     */
    private static final long WORKER_ID_SHIFT = SEQUENCE_BITS;
    
    /**
     * 时间戳的移位
     */
    private static final long TIMESTAMP_LEFT_SHIFT = WORKER_ID_BITS + SEQUENCE_BITS;
    
    /**
     * 该项目的worker 节点 id
     */
    private long workerId;
    
    /**
     * 上次生成ID的时间戳
     */
    private long lastTimestamp = -1L;
    
    /**
     * 当前毫秒生成的序列
     */
    private long sequence = 0L;
    
    /**
     * Next id long.
     *
     * @return the nextId
     */
    public Long nextId()
    {
        return generateId();
    }
    
    /**
     * 生成唯一id的具体实现
     */
    private synchronized long generateId()
    {
        long current = System.currentTimeMillis();
        
        if (current < lastTimestamp)
        {
            // 如果当前时间小于上一次ID生成的时间戳,说明系统时钟回退过,出现问题返回-1
            return -1;
        }
        
        if (current == lastTimestamp)
        {
            // 如果当前生成id的时间还是上次的时间,那么对sequence序列号进行+1
            sequence = (sequence + 1) & MAX_SEQUENCE;
            
            if (sequence == MAX_SEQUENCE)
            {
                // 当前毫秒生成的序列数已经大于最大值,那么阻塞到下一个毫秒再获取新的时间戳
                current = this.nextMs(lastTimestamp);
            }
        }
        else
        {
            // 当前的时间戳已经是下一个毫秒
            sequence = 0L;
        }
        
        // 更新上次生成id的时间戳
        lastTimestamp = current;
        
        // 进行移位操作生成int64的唯一ID
        
        // 时间戳右移动23位
        long time = (current - START_TIME) << TIMESTAMP_LEFT_SHIFT;
        
        // workerId 右移动10位
        long workerId = this.workerId << WORKER_ID_SHIFT;
        
        return time | workerId | sequence;
    }
    
    /**
     * 阻塞到下一个毫秒
     */
    private long nextMs(long timeStamp)
    {
        long current = System.currentTimeMillis();
        while (current <= timeStamp)
        {
            current = System.currentTimeMillis();
        }
        return current;
    }

    public static void main(String[] args)
    {
        SnowflakeIdGenerator instance = SnowflakeIdGenerator.INSTANCE;
        instance.init(1000);
        System.out.println(instance.generateId());
        System.out.println(instance.nextId());
    }
}

zookeeper实现分布式队列

常见的消息队列有:RabbitMQ,RocketMQ,Kafka等。Zookeeper作为一个分布式的小文件管理系统,同样能实现简单的队列功能。

Zookeeper不适合大数据量存储,官方并不推荐作为队列使用,但由于实现简单,集群搭建较为便利,因此在一些吞吐量不高的小型系统中还是比较好用的。

设计思路

1、创建队列根节点:在Zookeeper中创建一个持久节点,用作队列的根节点。所有队列元素的节点将放在这个根节点下。

2、实现入队操作:当需要将一个元素添加到队列时,可以在队列的根节点下创建一个临时有序节点。节点的数据可以包含队列元素的信息。

3、实现出队操作:当需要从队列中取出一个元素时,可以执行以下操作:

  • 获取根节点下的所有子节点。

  • 找到具有最小序号的子节点。

  • 获取该节点的数据。

  • 删除该节点。

  • 返回节点的数据。

/**
 * 入队
 * @param data
 * @throws Exception
 */
public void enqueue(String data) throws Exception {
    // 创建临时有序子节点
    zk.create(QUEUE_ROOT + "/queue-", data.getBytes(StandardCharsets.UTF_8),
            ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
}

/**
 * 出队
 * @return
 * @throws Exception
 */
public String dequeue() throws Exception {
    while (true) {
        List<String> children = zk.getChildren(QUEUE_ROOT, false);
        if (children.isEmpty()) {
            return null;
        }

        Collections.sort(children);

        for (String child : children) {
            String childPath = QUEUE_ROOT + "/" + child;
            try {
                byte[] data = zk.getData(childPath, false, null);
                zk.delete(childPath, -1);
                return new String(data, StandardCharsets.UTF_8);
            } catch (KeeperException.NoNodeException e) {
                // 节点已被其他消费者删除,尝试下一个节点
            }
        }
    }
}

使用Apache Curator实现分布式队列

Apache Curator是一个ZooKeeper客户端的封装库,提供了许多高级功能,其中就包括分布式队列。

public class CuratorDistributedQueueDemo {
    private static final String QUEUE_ROOT = "/curator_distributed_queue";

    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181",
                new ExponentialBackoffRetry(1000, 3));
        client.start();

        // 定义队列序列化和反序列化
        QueueSerializer<String> serializer = new QueueSerializer<String>() {
            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }
        };

        // 定义队列消费者
        QueueConsumer<String> consumer = new QueueConsumer<String>() {
            @Override
            public void consumeMessage(String message) throws Exception {
                System.out.println("消费消息: " + message);
            }

            @Override
            public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {

            }
        };

        // 创建分布式队列
        DistributedQueue<String> queue = QueueBuilder.builder(client, consumer, serializer, QUEUE_ROOT)
                .buildQueue();
        queue.start();

        // 生产消息
        for (int i = 0; i < 5; i++) {
            String message = "Task-" + i;
            System.out.println("生产消息: " + message);
            queue.put(message);
            Thread.sleep(1000);
        }

        Thread.sleep(10000);
        queue.close();
        client.close();
    }
}

注意事项

使用Curator的DistributedQueue时,默认情况下不使用锁。当调用QueueBuilder的lockPath()方法并指定一个锁节点路径时,才会启用锁。如果不指定锁节点路径,那么队列操作可能会受到并发问题的影响。

在创建分布式队列时,指定一个锁节点路径可以帮助确保队列操作的原子性和顺序性。分布式环境中,多个消费者可能同时尝试消费队列中的消息。如果不使用锁来同步这些操作,可能会导致消息被多次处理或者处理顺序出现混乱。如果应用场景允许消息被多次处理,或者处理顺序不是关键问题,可以不使用锁。这样可以提高队列操作的性能,因为不再需要等待获取锁。

// 创建分布式队列
QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, serializer, "/order");
//指定了一个锁节点路径/orderlock,用于实现分布式锁,以保证队列操作的原子性和顺序性。
queue = builder.lockPath("/orderlock").buildQueue();
//启动队列,这时队列开始监听ZooKeeper中/order节点下的消息。
queue.start();

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/265774.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Leetcode—415.字符串相加【简单】

2023每日刷题&#xff08;六十八&#xff09; Leetcode—415.字符串相加 实现代码 class Solution { public:string addStrings(string num1, string num2) {string ans;int len1 num1.size();int len2 num2.size();int i len1 - 1, j len2 - 1;int sum 0, c 0;while(i…

面试题 01.01. 判定字符是否唯一(优质解法)

链接&#xff1a;面试题 01.01. 判定字符是否唯一 代码&#xff1a; class Solution {public boolean isUnique(String astr) {//s[i]仅包含小写字母&#xff0c;数据范围小于 32 位&#xff0c;我们可以使用 int 变量的比特位来代替数组// 每个小写字符对应 bitMap 中的一个比…

DETR 【目标检测里程碑的任务】

paper with code - DETR 标题 End-to-End Object Detection with Transformers end-to-end 意味着去掉了NMS的操作&#xff08;生成很多的预测框&#xff0c;nms 去掉冗余的预测框&#xff09;。因为有了NMS &#xff0c;所以调参&#xff0c;训练都会多了一道工序&#xff0c…

小程序本地文件读、写、追加数据操作,以及修改文件内容

小程序系统文件管理器 FileSystemManager 要操作/读取本地文件,首先需要创建文件或文件夹,然后再对文件进行读写操作; 首先创建文件 FileSystemManager.writeFile 可直接创建文件并写入内容 定义文件路径,此路径在读写操作时保持一致 const path = `${wx.env.USER_DATA…

在Jetpack Compose中使用ExoPlayer实现直播流和音频均衡器

在Jetpack Compose中使用ExoPlayer实现直播流和音频均衡器 背景 ExoPlayer与Media3的能力结合&#xff0c;为Android应用程序播放多媒体内容提供了强大的解决方案。在本教程中&#xff0c;我们将介绍如何设置带有Media3的ExoPlayer来支持使用M3U8 URL进行直播流。此外&#x…

html网页编写语言

html是一门语言&#xff0c;所有的网页都是用html这门语言编写出来的。 HTML&#xff08;HyperText Markup Language&#xff09;&#xff1a;超文本标记语言。 超文本&#xff1a;超越了文本的限制&#xff0c;比普通文本更强大。除了文字信息&#xff0c;还可以定义图片&…

Netty-1-编写网络应用程序的基本步骤

编写网络应用程序的基本步骤如下: 完成代码编写。复查代码。“临门一脚"。上线及反馈。 完成代码编写 编写网络应用程序的第一步是完成代码编写。 选择传输协议 对于普通的应用程序而言&#xff0c;经过需求分析、定义业务数据结构和实现业务逻辑之后&#xff0c;我…

研究论文 20231123-Genome Biology:零样本学习预测细基因表达顺式调控模式

Li, Yongge, et al. "CREaTor: zero-shot cis-regulatory pattern modeling with attention mechanisms." Genome Biology 24.1 (2023): 266. 2023年11月23日见刊 微信分享&#xff1a;Genome Biology | CREaTor: 零样本学习预测细胞类型特异的基因表达顺式调控模式…

算法与数据结构--哈夫曼树与哈夫曼编码

演示视频&#xff1a; 【1】数据结构——五分钟搞定哈夫曼树&#xff0c;会求WPL值&#xff0c;不会你打我_哔哩哔哩_bilibili 【2】哈夫曼树和哈夫曼编码_哔哩哔哩_bilibili 【3】哈夫曼树的构造的做题三步骤_哔哩哔哩_bilibili 求哈夫曼编码的步骤&#xff1a; 1.根据字符及…

HTML标签(下)

一、表格标签 1.1表格的主要作用 主要用于显示、展示数据 1.2表格的基本语法 <td>单元格中的文字</td> 如果是表头单元格的话&#xff0c;eg:姓名&#xff0c;年龄<th> 姓名</th>&#xff08;th是table head&#xff09;; 作用&#xff1a;表头会…

用Python处理PDF:拆分与合并PDF文档

PDF文档在信息共享和数据保存方面被广泛使用&#xff0c;处理PDF文档也成为常见需求。其中&#xff0c;合并和拆分PDF文档能够帮助我们更有效地管理PDF文档&#xff0c;使文档内容分布更合理。通过合并&#xff0c;可以将相关文档整合成一个文件&#xff0c;以便更好地组织和提…

深入了解C编译管道

文章目录 引言1. 预处理阶段2. 编译阶段3. 汇编阶段4. 链接阶段5.流程图结论 引言 C编译管道是软件开发中至关重要的工具&#xff0c;它负责将C语言源代码转换为可执行的机器代码。理解C编译管道的工作原理有助于提高代码的可读性、可维护性&#xff0c;并有助于优化生成的可执…

20231222给NanoPC-T4(RK3399)开发板的适配原厂Android10的挖掘机方案并跑通AP6398SV

20231222给NanoPC-T4(RK3399)开发板的适配原厂Android10的挖掘机方案并跑通AP6398SV 1、简略步骤&#xff1a;rootrootrootroot-X99-Turbo:~/3TB/3399-android10$ cat Rockchip_Android10.0_SDK_Release.tar.gz0* > Rockchip_Android10.0_SDK_Release.tar.gz rootrootrootro…

SUSE Linux服务器使用zypper安装nginx

SUSE Linux 的云服务器用户&#xff0c;不能yum,安装软件&#xff0c;可通过 zypper 快速安装软件。 使用 root 账号登录 openSUSE 操作系统的云服务器。 执行 zypper service-list 或 zypper sl 命令 列出软件源 安装软件包 执行 zypper search 或 zypper se 命令&#…

扫码展示多视频怎么做?视频的活码制作技巧

现在扫码看视频的应用场景越来越多&#xff0c;用这种方式不仅能够简单有效的低成本完成视频传播&#xff0c;而且也符合用户的习惯。那么当需要将视频制作二维码来展示内容时&#xff0c;多个视频文件生成二维码的制作方法是怎么操作的呢&#xff1f;下面教大家使用视频二维码…

运用ETL快速拉取吉客云平台订单信息

吉客云介绍 吉客云是一家中国的云计算服务提供商。它提供了包括云服务器、云数据库、云存储、云网络等各种云计算产品和解决方案&#xff0c;帮助企业和个人搭建高效、可靠、安全的云计算环境。 吉客云特点和优势&#xff1a; 大规模分布式架构&#xff1a;吉客云基于自主研发…

安装nodejs,配置环境变量并将npm设置淘宝镜像源

安装nodejs并将npm设置淘宝镜像源 1. 下载nodejs 个人不喜欢安装包&#xff0c;所以是下载zip包的方式。这里我下载的node 14解压包版本 下载地址如下&#xff1a;https://nodejs.org/dist/v14.15.1/node-v14.15.1-win-x64.zip 想要其他版本的小伙伴去https://nodejs.org/di…

【【迭代16次的CORDIC算法-verilog实现】】

迭代16次的CORDIC算法-verilog实现 -32位迭代16次verilog代码实现 CORDIC.v module cordic32#(parameter DATA_WIDTH 8d32 , // we set data widthparameter PIPELINE 5d16 // Optimize waveform)(input …

opencv入门到精通——改变颜色空间

目录 目标 改变颜色空间 对象追踪 如何找到要追踪的HSV值&#xff1f; 目标 在本教程中&#xff0c;你将学习如何将图像从一个色彩空间转换到另一个&#xff0c;像BGR↔灰色&#xff0c;BGR↔HSV等 除此之外&#xff0c;我们还将创建一个应用程序&#xff0c;以提取视频中的…

Mapmost Alpha上新啦!新增移动端的丝滑且强大功能!

本文目录 一、Mapmost Alpha 介绍1.1 Maopmost 数字孪生平台1.2 Mapmost 产品能力1.3 Mapmost Alpha 产品优势 二、移动端功能介绍三、Mapmost Alpha 总结 一、Mapmost Alpha 介绍 Hello&#xff0c;各位铁铁&#xff0c;今天给大家推荐一款好用的三维城市场景创建工具。 这款…
最新文章