【Java线程】ThreadPoolExecutor和CountDownLatch: 多线程并发处理任务, 并获取线程处理结果

目录标题

  • 前言
  • 内容
    • 代码示例
      • 1.记录线程处理结果
      • 2.多线程处理
    • 关键点说明
    • 代码优化
  • 总结

前言

我们在日常开发中,大概率会遇到如下场景:主线程阻塞,并获取多个子线程执行任务的结果。而我们借助ThreadPoolExecutor和CountDownLatch可以高效率处理多个任务。

内容

代码示例

1.记录线程处理结果

@Data
public class CommunicationCheckResult {

    private boolean mysqlCommunicationCheck;

    // private boolean redisCommunicationCheck;

    private boolean rabbitmqCommunicationCheck;

}

2.多线程处理

/**
 * @author: coffee
 * @date: 2023/3/30 12:17 PM
 * @description: ...
 */
@Slf4j
@Component
public class CommunicationService {

    private static final String REDIS_COMMUNICATION_CHECK_LOCK_KEY = "communication.check.key";
    @Autowired
    private RedissonClient redissonClient;
    @Autowired
    private AmqpAdmin amqpAdmin;
    

    /**
     * 功能描述:相关服务通讯网络检查
     * @return result
     */
    public CommunicationCheckResult communicationCheck (CommunicationCheckRequest request) {
        String processId = UUID.randomUUID().toString();
        log.info("processId:[{}],开始进行网络通讯检查... request:[{}]", processId, JsonUtils.toJson(request));
        AssertUtil.isTrue(Objects.nonNull(request));
        AssertUtil.isTrue(StringUtils.hasText(request.getUserId()), "userId is empty.");

        /* 分布式锁控制 (根据业务场景考虑是否使用分布式锁) */
        RLock lock = redissonClient.getLock(REDIS_COMMUNICATION_CHECK_LOCK_KEY);
        if(!lock.tryLock()) {
            log.info("lock conflict");
            return null;
        }

        /* 流程处理 */
        CommunicationCheckResult result = new CommunicationCheckResult();
        try {
            handler(processId, request, result);
        } catch (Exception e) {
            log.error("processId:[{}], fail", processId, e);
        } finally {
            if (lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }

        log.info("processId:[{}],完成网络通讯检查... result:[{}]", processId, JsonUtils.toJson(result));
        return result;
    }

    private void handler(String processId, CommunicationCheckRequest request, CommunicationCheckResult result)
            throws InterruptedException {
        /* 自定义线程池 */
        ThreadPoolExecutor threadPoolExecutor =  new ThreadPoolExecutor(
                10,
                20,
                1L,
                TimeUnit.MINUTES,
                new ArrayBlockingQueue<>(1)
        );

        /*
           注意事项:如果serverCount=3,但是只对2个服务做了处理,会有问题。 因为countDownLatch会一直释放不了。
         */
        int serverCount = CommunicationCheckResult.class.getDeclaredFields().length;
        CountDownLatch countDownLatch = new CountDownLatch(serverCount);
        log.info("processId:[{}], 待检查服务数量:[{}]", processId, serverCount);

        threadPoolExecutor.submit(()->{
            try{
                /* 开始检查MySQL数据库 */
                checkMysql(result);
                log.info("processId:[{}], 1.MySQL通讯正常", processId);
            } catch (Exception e) {
                result.setMysqlCommunicationCheck(false);
                log.error("mysql 通讯异常", e);
            } finally {
                countDownLatch.countDown();
            }
        });

        threadPoolExecutor.submit(()->{
            try{
                /* 开始检查rabbitmq */
                checkRabbitmq(result);
                log.info("processId:[{}], 3.rabbitmq通讯正常", processId);
            } catch (Exception e) {
                result.setMysqlCommunicationCheck(false);
                log.error("rabbitmq 通讯异常", e);
            } finally {
                countDownLatch.countDown();
            }
        });

        log.info("processId:[{}], 【========================主线程阻塞========================】", processId);
        /* 阻塞主线程 */
        countDownLatch.await();
        log.info("processId:[{}], 【========================主线程结束========================】", processId);

    }
    
    private void checkRabbitmq(CommunicationCheckResult result) {
        String queueName = "test-queue";
        amqpAdmin.getQueueProperties(queueName);
        result.setRabbitmqCommunicationCheck(true);
    }
 }

关键点说明

1、ThreadPoolExecutor自定义线程池:定义线程池,每调用一次submit就是提交一次任务
2、CountDownLatch线程计数器:用于阻塞主线程,当所有子线程执行完毕,再回到主线程

  • 注意事项:如果定义10,那么countDownLatch.countDown()必须调用10次。只有在结束计数的时候,才会重新执行到主线程;

3、获取线程处理结果:借助Java的引用类型存储数据,可解决多线程数据存储的问题。

代码优化

1、如果感觉一个一个的submit提交任务太麻烦,可以考虑借助枚举+Switch的方式优化:

           // 定义服务枚举
            ServerEnum[] array = {REDIS,RABBIT_MQ};
            for (int i = 0 ; i < array.length; i++) {
                switch (array[i]) {
                    case REDIS:
                        checkRedis(threadPoolExecutor, countDownLatch);
                        break;
                    default:
                        throw new RuntimeException();
                }
            }
=======================
          void checkRedis(threadPoolExecutor, countDownLatch) {
                threadPoolExecutor.submit(()->{
                    try{
                        /* 开始检查redis */
                        checkRedis(result);
                        log.info("processId:[{}], 2.redis通讯正常", processId);
                    } catch (Exception e) {
                        result.setRedisCommunicationCheck(false);
                        log.error("redis 通讯异常", e);
                    } finally {
                        countDownLatch.countDown();
                    }
                });
            }
           

总结

关于线程实战的更多知识,转到:【Java线程】线程篇 - 从理论到具体代码案例最全线程知识点梳理(持续更新中…) – 四、线程实战篇

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/5785.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

操作系统笔记——进程管理

操作系统笔记——进程管理2. 进程管理2.1 进程与线程2.1.1 进程的引入前趋图程序的顺序执行程序的并发执行2.1.2 进程的定义及描述进程的定义进程的特征进程和程序的关系进程与作业的区别进程的组成2.1.3 进程的状态与转换进程的5种基本状态进程的状态的相互转换2.1.4 进程的控…

java常见锁策略分享(包括cas和synchronized的优化)

前言 锁策略学习思维导图: 1.常见锁策略 ① 乐观锁和悲观锁 ● 它们是根据锁冲突的预测,如果预测锁冲突比较小,那就是乐观锁,反之,就是悲观锁. ● 举个例子:高考前夕,我总觉得高考题会很难,然后拼命做各种科目的题,全副武装的去应对高考,而我妈则觉得高考只是人生的一个阶段而…

PCB模块化设计04——USB-Type-C PCB布局布线设计规范

目录PCB模块化设计04——USB-Type-C PCB布局布线设计规范USB Type-C功能介绍信号图示Type-C接口引脚定义USB 2.0差分对电源和接地引脚RX和TX引脚CC1和CC2针脚VCONN引脚SBU1和SBU2针脚USB供电PCB设计布线要求PCB模块化设计04——USB-Type-C PCB布局布线设计规范 USB Type-C US…

STC的官网,是我永远忘不掉的炼丹炉

搞电子的&#xff0c;应该都搞过8051搞8051的&#xff0c;那应该都搞过STC在国内&#xff0c;STC已经成为了8051的代名词http://www.stcmcudata.com/如果你刚开始搞嵌入式&#xff0c;应该学单片机&#xff0c;你学习单片机&#xff0c;就应该学习下8051&#xff0c;学习8051&a…

Python+Pygame实现简单的单词小游戏

语言是一种艺术&#xff0c;但是作为语言的基础——词汇&#xff0c;却不像艺术那样赏心悦目。不断的记忆与复习&#xff0c;让词汇成为很多孩子在学习英语时&#xff0c;最难完全攻克的关卡。本文就来用Python制作一个简单的英语单词游戏吧 前言 语言是一种艺术&#xff0c;但…

【ArcGIS Pro二次开发】(17):打开GDB、SHP、CAD等各种数据

一、打开GDB数据库 // 输入一个数据库路径string gdbPath "C:\Users\Administrator\Documents\ArcGIS\Projects\Test\Test.gdb";await QueuedTask.Run(() >{// 如果文件夹存在并且包含有效的地理数据库&#xff0c;则会打开地理数据库。using (Geodatabase geoda…

【单片机/普中A2】学习笔记1-配置环境与STC-ISP烧录

目录前言连接到开发板micro-usb 测试安装串口驱动烧写准备源码烧录前言 目前我们的开发需求很简单&#xff0c;仅需三个软件&#xff1a; keli5 编写代码proteus8 professional 描绘电路板STC-ISP 串口烧录 具体教程在 CSDN 等博客平台上已经有很多&#xff0c;这里就不再赘述…

(排序2)希尔排序

写希尔排序注意&#xff1a; 写新元素融入有序数组的过程(end&tmp)将这个过程给多次类比到需要排序的一串数据中 (for&while)排完一组不够&#xff0c;需要排gap组 (再来for)敲定gap下标关系&#xff1a; 希尔排序与直接插入排序的区别与联系 希尔排序的话也叫做缩小…

刷题笔记【3】| 快速刷完67道剑指offer(Java版)

本文已收录于专栏&#x1f33b;《刷题笔记》文章目录前言&#x1f3a8; 1、斐波那契数列题目描述思路一&#xff08;递归&#xff09;思路二&#xff08;循环&#xff09;&#x1f3a8; 2、跳台阶题目描述思路一&#xff08;递归&#xff09;思路二&#xff08;循环&#xff09…

03-03 周五 镜像安装sshd和jupyter以及修改密码

03-03 周五 镜像安装sshd和jupyter以及修改密码时间版本修改人描述2023年3月3日15:34:49V0.1宋全恒新建文档 简介 由于在镜像中需要进行jupyter和sshd的安装&#xff0c;并且需要进行密码的修改&#xff0c;因此在该文档中记录了这两个交互方式的工程设计。 在线加密 在线加密…

Pycharm创建自定义代码片段

简介 PyCharm允许您创建自定义代码片段&#xff0c;也称为代码模板&#xff0c;以提高您的开发效率 实现步骤 1.添加代码模板 打开PyCharm并导航到File->Settings&#xff0c;或者按快捷键ctrl alt s 打开设置 ​ 按照如下序号步骤进行点击&#xff0c;点击“”按钮以…

基于canvas画布的实用类Fabric.js的使用Part.3

目录一、基于canvas画布的实用类Fabric.js的使用Part.1Fabric.js简介 开始方法事件canvas常用属性对象属性图层层级操作复制和粘贴二、基于canvas画布的实用类Fabric.js的使用Part.2锁定拖拽和缩放画布分组动画图像滤镜渐变右键菜单删除三、基于canvas画布的实用类Fabric.js的使…

gcc在Linux下如何运行一个C/C++程序

安装gcc&#xff1a;sudo apt-get install gcc&#xff08;之后输入密码即可&#xff09; 绝对路径的方式进入usr目录&#xff1a; cd /home /home/&#xff1a;是普通用户的主目录&#xff0c;在创建用户时&#xff0c;每个用户要有一个默认登录和保存自己数据的位置&#x…

【数据结构刷题集】链表经典习题

&#x1f63d;PREFACE&#x1f381;欢迎各位→点赞&#x1f44d; 收藏⭐ 评论&#x1f4dd;&#x1f4e2;系列专栏&#xff1a;数据结构刷题集&#x1f50a;本专栏涉及到题目是数据结构专栏的补充与应用&#xff0c;只更新相关题目&#xff0c;旨在帮助提高代码熟练度&#x…

第14章_视图

第14章_视图 &#x1f3e0;个人主页&#xff1a;shark-Gao &#x1f9d1;个人简介&#xff1a;大家好&#xff0c;我是shark-Gao&#xff0c;一个想要与大家共同进步的男人&#x1f609;&#x1f609; &#x1f389;目前状况&#xff1a;23届毕业生&#xff0c;目前在某公司…

Python 自动化指南(繁琐工作自动化)第二版:六、字符串操作

原文&#xff1a;https://automatetheboringstuff.com/2e/chapter6/ 文本是程序将处理的最常见的数据形式之一。您已经知道如何用操作符将两个字符串值连接在一起&#xff0c;但是您可以做得更多。您可以从字符串值中提取部分字符串&#xff0c;添加或删除空格&#xff0c;将字…

【新2023Q2模拟题JAVA】华为OD机试 - 找数字 or 找等值元素

最近更新的博客 华为od 2023 | 什么是华为od,od 薪资待遇,od机试题清单华为OD机试真题大全,用 Python 解华为机试题 | 机试宝典【华为OD机试】全流程解析+经验分享,题型分享,防作弊指南华为od机试,独家整理 已参加机试人员的实战技巧本篇题解:找数字 or 找等值元素 题目 …

华为OD机试 用java实现 -【重组字符串】

最近更新的博客 华为od 2023 | 什么是华为od,od 薪资待遇,od机试题清单华为OD机试真题大全,用 Python 解华为机试题 | 机试宝典【华为OD机试】全流程解析+经验分享,题型分享,防作弊指南华为od机试,独家整理 已参加机试人员的实战技巧本篇题解:重组字符串 题目 给定一个非…

计算机网络 第一章 概述小结

计算机网络 第一章 概述 1.1 因特网概述 名词解释&#xff1a;因特网服务提供者ISP&#xff08;Internet Service Provider&#xff09; 1.2 三种交换方式 电路交换&#xff1a; 优点&#xff1a;通信时延小、有序传输、没有冲突、适用范围广、实时性强、控制简单&#x…

【美赛】2023年MCM问题Y:理解二手帆船价格(代码思路)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…