【Java 并发】CyclicBarrier 介绍

1 简介

在多线程编程中, 协调和同步线程的执行是至关重要的。Java 提供了许多并发工具来帮助开发人员有效地管理多线程应用程序。
其中之一是 CyclicBarrier, 它是一个强大的同步辅助类, 可用于在多个线程之间创建同步点, 以便它们可以在同一时间点协调执行某个任务。

CyclicBarrier 和 CountDownLatch 一样具有等待计数的功能, 但是相比于 CountDownLatch 功能更加强大。

其本身具备了以下特点

  1. 同步多个线程: 允许多个线程在达到共同的同步点之前进行等待, 然后同时开始执行
  2. 可重用性: 一旦所有等待线程达到同步点, CyclicBarrier 会重置计数器, 可以被重复使用。这使得它适用于循环的多阶段任务
  3. 自定义回调动作: 可以在所有线程达到同步点后执行自定义的回调动作, 以便在同步完成后执行一些特定的逻辑
  4. 指定等待时间: 提供了一个带有超时参数的 await() 方法, 允许等待一段指定的时间后, 即使没有足够的线程达到同步点, 也会继续执行
  5. 可检测是否被破坏: 提供的 isBroken() 方法允许检测在等待过程中是否有线程被中断, 以及是否有异常发生
  6. 内存一致性效果: 在所有线程达到同步点时, 对于之前的所有写入操作, 对于当前线程都是可以见的

为了理解 CyclicBarrier, 这里举一个通俗的例子。
开运动会时, 会有跑步这一项运动, 我们来模拟下运动员入场时的情况。
假设有 6 条跑道, 在比赛开始时, 就需要 6 个运动员在比赛开始的时候都站在起点了, 裁判员吹哨后才能开始跑步。跑道起点就相当于 “barrier”, 是临界点,
而这 6 个运动员就类比成线程的话, 就是这 6 个线程都必须到达指定点了, 意味着凑齐了一波, 然后才能继续执行, 否则每个线程都得阻塞等待, 直至凑齐一
波即可。cyclic 是循环的意思, 也就是说 CyclicBarrier 当多个线程凑齐了一波之后, 仍然有效, 可以继续凑齐下一波。CyclicBarrier的执行。

示意图如下:
Alt 'CyclicBarrier 效果图'

当多个线程都达到了指定点后, 才能继续往下继续执行。这就有点像报数的感觉, 假设 6 个线程就相当于 6 个运动员, 到赛道起点时会报数进行统计, 如果刚好是 6 的话, 这一波就凑齐了, 才能往下执行。
CyclicBarrier 在使用一次后, 可以进行重置, 继续当做计数器使用, 这是与 CountDownLatch 的区别之一

2 CyclicBarrier 的方法

上面说到的 6 个运动员, 6 个线程, 指的就是计数器的初始值 6, 可以通过 CyclicBarrier 的构造方法传入的

public class CyclicBarrier {

    public CyclicBarrier(int parties) {
        this(parties, null);
    }

    // barrierAction 指定了回调函数, 在维护的个数达到了 0 了, 就会执行这个函数
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0)
            throw new IllegalArgumentException();

        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }
}

CyclicBarrier 的主要方法

  1. await() throws InterruptedException, BrokenBarrierException : 等到所有的线程都到达指定的临界点
  2. await(long timeout, TimeUnit unit) : 与上面的 await 方法功能一致, 只不过这里有了时间限制, 调用该方法的线程等到指定的 timeout 时间后, 不管 N 是否减至为 0, 都会继续往下执行
  3. getNumberWaiting() : 获取当前距离线程执行还差多少值
  4. reset(): CyclicBarrier 状态重置, 重新开始
  5. isBroken(): 判断当前的 CyclicBarrier 是否为一个打破状态, 可以理解为当前的栅栏被破坏了, 无法继续使用了, 线程这时在调用 await 方法会抛出异常

CyclicBarrier 进入打破状态的场景

  1. 线程被设置了中断标识为 true 的情况下, 调用了 await 方法
  2. 调用了 await 带超时时间的方法, 超时了也会进入 break 状态
  3. 线程调用 await, 这时需要等待的线程数为 0 了, 执行回调函数, 这个过程失败了, 也会进入 break 状态
  4. 调用 reset 方法, 也会先设置为 break 状态, 在重试设置为 false

下面用一个具体的例子来说明 CyclicBarrier 的具体用法:

public class CyclicBarrierDemo {
 
    //指定必须有 6 个运动员到达才行, 同时搭配一个线程数达到了, 就执行的回调的函数
    private static CyclicBarrier barrier = new CyclicBarrier(6, () -> {
        System.out.println("所有运动员入场, 裁判员一声令下!!!!!");
    });


    public static void main(String[] args) {

        System.out.println("运动员准备进场, 全场欢呼............");

        ExecutorService service = Executors.newFixedThreadPool(6);

        for (int i = 0; i < 6; i++) {
            service.execute(() -> {

                try {

                    System.out.println(Thread.currentThread().getName() + " 运动员, 进场");
                    // 线程挂起, 得到计数器达到了 0 
                    barrier.await();

                    // 线程又被唤醒, 继续执行
                    System.out.println(Thread.currentThread().getName() + "  运动员出发");

                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            });
        }
    }
}

输出结果:

运动员准备进场, 全场欢呼............
pool-1-thread-2 运动员, 进场
pool-1-thread-1 运动员, 进场
pool-1-thread-3 运动员, 进场
pool-1-thread-4 运动员, 进场
pool-1-thread-5 运动员, 进场
pool-1-thread-6 运动员, 进场
所有运动员入场, 裁判员一声令下!!!!!
pool-1-thread-6  运动员出发
pool-1-thread-1  运动员出发
pool-1-thread-5  运动员出发
pool-1-thread-4  运动员出发
pool-1-thread-3  运动员出发
pool-1-thread-2  运动员出发

3 CyclicBarrier 的源码实现

CyclicBarrier 内部是通过 ReentrantLock 和 Condition 实现的
而 ReentrantLock 和 Condition 是基于 AQS 实现的, 所以还有 AQS 的基础, 就能很容易理解。

3.1 CyclicBarrier 中的 Generation

CyclicBarrier 内部定义了一个很简单的内部类 Generation

private static class Generation {
    
    Generation() {} 
    
    // 默认值为 false, 表示当前的栅栏是否为破坏状态, 也就是当前的 CyclicBarrier 已经遭到了破坏, 不适用了
    boolean broken;
}

3.2 CyclicBarrier 的 await 方法

CyclicBarrier 的 await 方法可以让线程进行等待所有线程的到齐, 一起执行, 没到齐的情况进行挂起的操作。

public class CyclicBarrier {

    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            // 参数 1: 是否会超时
            // 参数 2: 0L 超时时间 0 纳秒, 即不超时
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe);
        }
    }

    private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {

        final ReentrantLock lock = this.lock;
        // 获取可重入锁, 加锁, 加锁失败会被阻塞
        lock.lock();

        try {
        
            final Generation g = generation;

            // 当前的栅栏状态已经为破坏状态, 直接抛出异常
            if (g.broken)
                throw new BrokenBarrierException();

            // 线程是中断状态
            if (Thread.interrupted()) {
                // 直接将栅栏设置为破坏状态, 通过 condition 将所有在等待的队列的线程移到同步队列, 等待获取锁唤醒
                breakBarrier();
                throw new InterruptedException();
            }    

            // 还需要等待的线程数 - 1
            int index = --count;

            // 还需要等待的线程数为 0 了
            if (index == 0) {
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    // 有设置了回调函数, 执行回调函数
                    if (command != null)
                        command.run();

                    ranAction = true;
                    nextGeneration();
                    return 0;

                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }

            for (;;) {
                try {

                    // 把当前的线程放到等待队列, 等待唤醒
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);    

                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        Thread.currentThread().interrupt();
                    }
                }   

                // 线程唤醒了, 继续执行其他的逻辑
                if (g.broken)
                    throw new BrokenBarrierException();     

                // 唤醒后, 保留的 Generation 引用和最新的 Generation 不一样了, 直接返回当前的 index, 既还需要等待多少个线程
                if (g != generation)
                    return index; 

                // 设置了超时, 同时超时时间小于等于 0 了
                if (timed && nanos <= 0L) {
                    // 直接将栅栏设置为爆破状态, 通过 condition 将所有在等待的队列的线程移到同步队列, 等待获取锁唤醒
                    breakBarrier();
                    throw new TimeoutException();
                }       
            }
        } finally {
            lock.unlock();
        }

    }

    private void breakBarrier() {
        // 设置 Generation 的 broken 为 true,
        generation.broken = true;
        count = parties;
        // 将在等待队列中的线程移到同步队列
        trip.signalAll();
    }

    private void nextGeneration() {
        // 将在等待队列中的线程移到同步队列
        trip.signalAll();
        // 更新新的计算值
        count = parties;
        // 重试设置一个新的  Generation, broken 默认为 false
        generation = new Generation();
    }

}

3.3 CyclicBarrier 的 isBroken 方法

public class CyclicBarrier {

    public boolean isBroken() {

        // 加锁
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 返回当前的 Generation 的 broken 状态
            return generation.broken;
        } finally {
            lock.unlock();
        }
    }
}

3.3 CyclicBarrier 的 reset 方法

public class CyclicBarrier {

    public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 设置为 break 状态, 更新新的等待线程个数
            breakBarrier();
            // 设置一个新的 Generation
            nextGeneration();
        } finally {
            lock.unlock();
        }
    }
}

4 CountDownLatch 与 CyclicBarrier 的比较

CountDownLatch 与 CyclicBarrier 都是用于控制并发的工具类, 都可以理解成维护的就是一个计数器, 但是这两者还是各有不同侧重点的:

  1. CountDownLatch 一般用于某个线程 A 等待若干个其他线程执行完任务之后, 它才执行; 而 CyclicBarrier 一般用于一组线程互相等待至某个状态,
    然后这一组线程再同时执行; CountDownLatch 强调一个线程等多个线程完成某件事情。CyclicBarrier 是多个线程互等, 等大家都完成, 再携手共进。
  2. 调用 CountDownLatch 的 countDown 方法后, 当前线程并不会阻塞, 会继续往下执行; 而调用 CyclicBarrier 的 await 方法, 会阻塞当前
    线程, 直到 CyclicBarrier 指定的线程全部都到达了指定点的时候, 才能继续往下执行
  3. CountDownLatch 方法比较少, 操作比较简单, 而 CyclicBarrier 提供的方法更多, 比如能够通过 getNumberWaiting(), isBroken() 这些
    方法获取当前多个线程的状态, 并且 CyclicBarrier 的构造方法可以传入 barrierAction, 指定当所有线程都到达时执行的业务功能
  4. CountDownLatch 是不能复用的, 而 CyclicLatch 是可以复用的, 可以通过 reset 方法进行重置

5 参考

大白话说java并发工具类-CountDownLatch, CyclicBarrier

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/262664.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Flink系列之:Checkpoints 与 Savepoints

Flink系列之&#xff1a;Checkpoints 与 Savepoints 一、概述二、功能和限制 一、概述 从概念上讲&#xff0c;Flink 的 savepoints 与 checkpoints 的不同之处类似于传统数据库系统中的备份与恢复日志之间的差异。 Checkpoints 的主要目的是为意外失败的作业提供恢复机制。 …

12、Qt:用QProcess类启动外部程序:简单使用

一、说明 简单使用&#xff1a;在一个函数中&#xff0c;使用QProcess类的临时对象调用可执行文件exe&#xff0c;只有这个exe执行完了&#xff0c;这个函数才往下执行&#xff0c;一次性打印出exe所有输出信息&#xff1b;复杂使用&#xff1a;创建QProcess类的全局对象&…

蛮力法之背包问题

问题: 有 n 个重量分别是 w1,w2....,wn 的物品&#xff08;物品编号为 1-n&#xff09;它们的价值分别为 v1,v2,...,vn 给定一个容量为 W 的背包。设计从这些物品中选取一部分放入该背包的方案。 每个物品要么选中要么不选中【其实每个物品只有 1 件】&#xff0c;要求选中…

CSS:盒子模型

CSS&#xff1a;盒子模型 盒子模型盒子模型的组成盒子内容边框 border内边距 padding盒子实际大小计算CSS3的盒子类型content-boxborder-box 外边距 margin外边距合并相邻块元素垂直外边距合并嵌套块元素垂直外边距塌陷 行内元素的内外边距 盒子相关属性圆角边框盒子阴影 盒子模…

python之导入.py文件

目录 1、文件结构 2、导入.py文件 2.1同一层内文件夹内的导入 2.2不同层内文件夹内的导入 1、文件结构 Paint_master是一个工程的根目录&#xff0c;忽略一些文件及文件夹后&#xff0c;其文件结构如下&#xff1a; src util ImageUtil.py view BaseAdjustDialog.py MainW…

字符串函数的模拟实现(部分字符串函数)

strlen函数模拟 size_t my_strlen(const char* arr) {int count 0;while(*arr){arr;count;}return count;} int main() { printf( " %zd", my_strlen("adsshadsa"));}//模拟实现strlen函数 strcpy函数模拟 char* my_strcpy(char* arr1, const char* ar…

Python算法例21 交错正负数

1. 问题描述 给出一个含有正整数和负整数的数组&#xff0c;将其重新排列成一个正负数交错的数组。 2. 问题示例 给出数组[-1&#xff0c;-2&#xff0c;-3&#xff0c;4&#xff0c;5&#xff0c;6]&#xff0c;重新排序之后&#xff0c;变成[-1&#xff0c;5&#xff0c;-…

Web前端-JavaScript(对象)

文章目录 1.对象1.1 概念1.2 创建对象三种方式**对象字面量创建对象**&#xff1a;new Object创建对象构造函数创建对象 1.3 遍历对象 2.作用域1.1 概述1.2 全局作用域1.3 局部作用域1.4 JS没有块级作用域1.5 变量的作用域1.6 作用域链1.7 预解析 1.对象 1.1 概念 什么是对象 …

Ubuntu 磁盘管理DF命令用法

Linux磁盘空间管理是系统运维中的核心环节&#xff0c;它直接影响到系统的稳定运行、数据的安全性和业务的连续性。 通过实施有效的磁盘空间管理策略&#xff0c;系统管理员可以确保系统的高效运作&#xff0c;满足不断变化的业务需求&#xff0c;并为用户提供可靠的服务。 因此…

【YOLOv8新玩法】姿态评估解锁找圆心位置

前言 Hello大家好&#xff0c;今天给大家分享一下如何基于深度学习模型训练实现圆检测与圆心位置预测&#xff0c;主要是通过对YOLOv8姿态评估模型在自定义的数据集上训练&#xff0c;生成一个自定义的圆检测与圆心定位预测模型 制作数据集 本人从网络上随便找到了个工业工件…

自动标注软件AnyLabeling安装

AnyLabeling自动标注软件介绍 该工具作为一个具有Segment Anything和YOLO模型的智能标签工具&#xff0c;可以快速、准确地对图像进行标注。 AnyLabeling LabelImg Labelme Improved UI Auto-labeling 在Python终端运行 pip install anylabeling启动AnyLabeling anylabe…

危险品内陆运输相关知识_箱讯科技

危险品拖车 危险品拖车运输是一项涉及到高度危险物质的专业工作&#xff0c;需要确保合法合规的运输&#xff0c;并提供必要的信息以保障公共安全。进行这类运输时&#xff0c;需要携带一系列文件和具备特定的资质。 什么样的车适合做危险品拖车? 1、车辆类型&#xff1a;通…

长三角安防行业盛会“2024杭州安博会”4月份在杭州博览中心召开

作为中国安防行业的盛会&#xff0c;2024杭州安博会将于4月份在杭州国际博览中心隆重召开。本届安博会将汇聚全球最先进的安防技术和产品&#xff0c;为来自世界各地的安防从业者、爱好者以及投资者提供一个交流、展示和合作的平台。 据了解&#xff0c;2024杭州安博会将会展示…

Windows11系统下如何通过.cab文件更新PL2303串口驱动?

Windows11系统下如何通过.cab文件更新PL2303串口驱动? 首先,在微软官方网站上下载所需版本的.cab文件,具体链接如下: https://www.catalog.update.microsoft.com/Search.aspx?q=Prolific%20USB-to-Serial%20Comm%20Port 如下图所示,进入该网站后,找到自己所需的驱动版…

IPD产品开发的三类变更的含义和在流程中的位置

在基于IPD的产品开发过程中&#xff0c;变更仍然不可避免&#xff0c;甚至变更比基于传统流程的产品开发更多&#xff0c;因为IPD的产品开发是盯着市场变化、快速响应市场需求的&#xff0c;而这个世界唯一的不变就是变。那么&#xff0c;我们如何对产品开发过程中的变更进行分…

智能网络与网络安全:全球发展与未来趋势

导言 随着数字化时代的到来&#xff0c;智能网络和网络安全成为科技领域的研究热点。本文将深入研究智能网络与网络安全的发展过程、遇到的问题、解决过程&#xff0c;以及未来的可用范围。同时&#xff0c;关注各国在这一领域的应用情况和未来研究的趋势&#xff0c;以便在全球…

期货交易策略模拟测试-基于CLBISO01策略-2023.12.22

采取与昨天同样的策略进行盘中模拟测试&#xff0c;今天行情还可以&#xff0c;挺“顺溜”。

Xcode15 iOS 17 Simulator 离线安装,模拟器安装

Xcode 15 安装包的大小相比之前更小&#xff0c;因为除了 macOS 的 Components&#xff0c;其他都需要动态下载安装&#xff0c;否则提示 iOS 17 Simulator Not Installed。 如果不安装对应的运行模拟库 无法真机和模拟器运行&#xff0c;更无法新建项目。但是由于模拟器安装包…

拖拽列表简单实现

样式部分&#xff1a; <style>.item {width: 500px;text-align: center;margin-bottom: 5px;height: 40px;line-height: 40px;border-radius: 5px;color: #fff;margin: 5 auto;background-color: red;}.item.moving {background: transparent;color: transparent;border…

JNI学习(二)

静态注册 接着上篇博客学习 JNI函数 JNIEXPORT void JNICALL Java_com_example_jnidemo_TextDemo_setText(JNIEnv *env, jobject this, jstring string){ __android_log_print(ANDROID_LOG_ERROR, "test", "invoke set from C\n");char* str (char*)(*e…
最新文章