【业务功能篇135】多线程+countDownLatch执行大数据量定时任务

对于业务中存在一些功能需求,业务逻辑复杂且数据量大,过程处理也就比较繁琐,如果直接在单线程同步执行,效率就比较低了,所以我们需要利用多线程,开启多个线程去把任务分线程异步执行,这些效率就有显著提升

  • 多线程+ countDownLatch

CountDownLatch概念

CountDownLatch是一个同步工具类,用来协调多个线程之间的同步,或者说起到线程之间的通信(而不是用作互斥的作用)。

CountDownLatch能够使一个线程在等待另外一些线程完成各自工作之后,再继续执行。使用一个计数器进行实现。计数器初始值为线程的数量。当每一个线程完成自己任务后,计数器的值就会减一。当计数器的值为0时,表示所有的线程都已经完成一些任务,然后在CountDownLatch上等待的线程就可以恢复执行接下来的任务。

CountDownLatch的用法

CountDownLatch典型用法:1、某一线程在开始运行前等待n个线程执行完毕。将CountDownLatch的计数器初始化为new CountDownLatch(n),每当一个任务线程执行完毕,就将计数器减1 countdownLatch.countDown(),当计数器的值变为0时,在CountDownLatch上await()的线程就会被唤醒。一个典型应用场景就是启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行。

CountDownLatch典型用法:2、实现多个线程开始执行任务的最大并行性。注意是并行性,不是并发,强调的是多个线程在某一时刻同时开始执行。类似于赛跑,将多个线程放到起点,等待发令枪响,然后同时开跑。做法是初始化一个共享的CountDownLatch(1),将其计算器初始化为1,多个线程在开始执行任务前首先countdownlatch.await(),当主线程调用countDown()时,计数器变为0,多个线程同时被唤醒。

  • CountDownLatch 也就是倒计锁,常用来阻塞主线程、等待被子线程唤醒,或者在子线程中阻塞等待被主线程唤醒

常用方法:

1、countDown()

countDown() 方法是用来执行线程计数器-1的,也就是多线程运行完之后,都调用此方法将计数器变成0,最后调用await()方法唤醒阻塞的位置,继续执行

2、await()

await() 方法是用来阻塞线程的,等待倒计锁被减到0的时候,才会唤醒该方法继续执行。也可以设置等待超时时间,如 countDownLatch.await(30, TimeUnit.SECONDS),设置超时时间则需要判断等待结果,true等待未超时、false等待已超时

使用示例:

 //等待方式一、正常阻塞
        int threadCount = 3;
        CountDownLatch countDownLatch = new CountDownLatch(threadCount);
 
        for (int i = 0; i < threadCount; i++) {
            executorService.execute(() -> {
                System.out.println("子线程" + Thread.currentThread().getName() + "执行!");
                countDownLatch.countDown();
            });
        }
        try {
            //阻塞主线程,等待 countDownLatch 减到0,然后被唤醒
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("主线程执行完毕!");
 
 
 
 
        //等待方式二、设置限时阻塞
        int threadCount = 3;
        CountDownLatch countDownLatch = new CountDownLatch(threadCount);
 
        for (int i = 0; i < threadCount; i++) {
            executorService.execute(() -> {
                try {
                    TimeUnit.SECONDS.sleep(3);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                }
                System.out.println("子线程" + Thread.currentThread().getName() + "执行!");
                countDownLatch.countDown();
            });
        }
        try {
            //阻塞主线程,等待 countDownLatch 减到0,然后被唤醒
            boolean await = countDownLatch.await(2, TimeUnit.SECONDS);
            if (!await) {
                System.out.println("子线程执行需要3秒,阻塞只等待2秒,等待超时!");
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("主线程执行完毕!");

总结: 
1、countDown() 是用来执行倒计锁 -1
2、await() 用来阻塞线程,直至 countDown() 把倒计锁减到 0 才会被唤醒
3、await() 可以设置阻塞超时等待时间,需要判断等待结果 boolean 值。如果不设置超时时间则会一直等待,不需要判断返回值

 

 业务场景实例代码: 

1.定时任务类:

调用同步数据接口方法

需要配置分布式锁配置类等信息 具体可以参考 【业务功能篇17】Springboot +shedlock锁 实现定时任务-CSDN博客


package com.task;
import net.javacrumbs.shedlock.spring.annotation.SchedulerLock;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;


@Component
@EnableScheduling
@Slf4j
public class DataSchedule {

    @Resource
    DataService dataService;

    /**
     * syncData
     * 
     */
    @Scheduled(cron = "0 30 0 * * ?") // 每天凌晨0点30分执行一次
    @SchedulerLock(name = "sync_task", lockAtLeastFor = "30m", lockAtMostFor = "30m")
    public void syncData() {
        if (SpringBeanUtils.isTestProfile()) {
            return;
        }
        dataService.syncDailyData();
    }
}
 2.同步数据接口
public interface DataService {
    /**
     * syncDailyData
     * 
     */
    void syncDailyData();
}
3.同步数据接口实现
@Service
@Slf4j
public class DataServiceImpl implements DataService {
    private static final int CORE_POOL_SIZE = 20; // 核心线程数,不建议太大,否则可能OOM

    @Resource
    DataService dataService;

	//定义线程池
    ThreadPoolExecutor executor = new ThreadPoolExecutor(CORE_POOL_SIZE, 50, 60L, TimeUnit.SECONDS,
        new ArrayBlockingQueue<>(2048), new ThreadFactoryBuilder().setNameFormat("sync-thread-%d").build(),
        new ThreadPoolExecutor.CallerRunsPolicy());


    /**
     * 定时同步数据
     */
    @Override
    public void syncDailyData() {
		//业务数据逻辑 忽略
        List<List<ItemConfig>> item = getItems();
        int taskNum = item.size();
		//根据数据量 每个子list数据集 单独一个线程执行  对应的计数器CountDownLatch 创建同样个数 每次任务执行就-1 
        CountDownLatch countDownLatch = new CountDownLatch(taskNum);
        List<FutureTask<ResponseVo>> tasks = new ArrayList<>();
		//创建任务用FutureTask封装 每个任务都调用SyncCallable类来执行,这个类是实现了Callable 任务执行内容都重写在call方法了
		//将list数据集和计数器 都传入  便于在call方法执行具体数据调度逻辑所使用 同时也会在这个方法中执行 latch.countDown() 
        itemCodeLists.forEach(list -> tasks.add(new FutureTask<>(new SyncCallable(list, countDownLatch))));
		//具体执行任务 
        executeTasks(tasks, countDownLatch);
    }


    public void executeTasks(List<FutureTask<ResponseVo>> tasks, CountDownLatch countDownLatch) {
        for (FutureTask<ResponseVo> task : tasks) {
		//通过定义线程池 执行每个任务  就会去执行封装的线程类SyncCallable中重写的call方法 任务内容写在call中,并且最后会执行latch.countDown() 计数-1
            executor.execute(task);
        }
        try {
            int successTask = 0;
			//当前主线程 等待 前面执行多批任务的多个子线程 直到countDownLatch减为0 主线程再被唤醒 接着执行后续的业务数据更新逻辑
			//因为需要等待这些数据调度到数据库之后,后续接着才能处理这些数据 所以要await等待完成
			//如果超过1分钟 还没执行完成 await就返回flase 并且我们还会手动去把任务停止,业务上定义超过1分钟还没完成就可能出现异常
			//所以就直接手动cancel取消任务,避免阻塞,避免线程一直占用内存
            boolean await = countDownLatch.await(1, TimeUnit.MINUTES);
            if (!await) {
                log.warn("Execute tasks timed out!");
            }
            for (FutureTask<ResponseVo> task : tasks) {
                task.cancel(true);
            }
            for (FutureTask<ResponseVo> task : tasks) {
			//任务超时没完成 或者线程内部返回null  get就是null 需要跳过 否则就是一次成功任务+1
                if (task.get() == null) {
                    continue;
                }
                if (task.get().getCode() == RestClientUtil.OK) {
                    successTask++;
                } else {
                    log.error(task.get().getMessage());
                }
            }
            log.info("Sync data finish! success/total = [{}/{}]", successTask, tasks.size());
			//业务逻辑 比如前面是同步数据到数据库,等待全部都执行完入库之后,就可以接着做对应业务逻辑
			//比如说对数据做一些清洗分析整合形成的最终业务所需数据
            dataService.updateItem();  
        } catch (InterruptedException | ExecutionException e) {
            log.error(e.toString());
        }
    }
}
 4.任务执行线程类
  •     核心数据调度逻辑 写在 call方法中 ,注意最后每次执行完该子线程任务后 需要执行countDown   计数-1

package com.thread.callable;

@Slf4j
public class SyncCallable implements Callable<ResponseVo> {

	//获取对应的业务类Bean 业务逻辑进行相应调用  这里用注解方式注入bean也是一样的
    private final DataService dataService = SpringBeanUtils.getBean(DataService.class);

    //计数器
    private final CountDownLatch latch;

    private final List<ItemConfig> item;

    String startDate;

    String endDate;

    public SyncCallable(List<ItemConfig> item, CountDownLatch countDownLatch) {
        this.item = item;
        this.latch = countDownLatch;
        startDate = LocalDate.now().minusDays(1).format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
        endDate = LocalDate.now().minusDays(1).format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
    }


    private Param DataParam(List<ItemConfig> itemConfigs) {
        if (CollectionUtils.isEmpty(itemConfigs)) {
            return DataParam.builder().build();
        }
        return DataParam.builder()
                .startTime(startDate + " 00:00:00")
                .endTime(endDate + " 23:59:59")
                .build();
    }

    /**
     * call
     * 
     * @return ResponseVo
     */
    @Override
    public ResponseVo call() {
		//dataService.query 调用了需要同步的数据 可能是通过第三方接口去取的来的数据
		DataParam DataParam = DataParam(item);
        ResponseVo responseVo = dataService.query(DataParam);
        try {
			//响应结果 转换 对应的实体类型 然后数据需要插入到数据库中
			List<JSONObject> jsons = (List<JSONObject>) responseVo.getData();
            List<Data> list = jsons.stream().map(String::valueOf)
                    .map(e -> JSON.parseObject(String.valueOf(e), Data.class))
                    .collect(Collectors.toList());
            dataService.insert(list);
            list.clear();
        } catch (Exception e) {
            log.error(e.toString());
            responseVo = ResponseUtils.errorResponse(Thread.currentThread().getName() + ": Error occurred when writing data.");
        } finally {
			//核心:最后每次执行完该子线程任务后 需要执行countDown   计数-1
            latch.countDown();
        }
		//返回的业务数据集 给上游返回任务执行后的数据进行使用或者判断
        return responseVo;
    }
}

总结:

  • 构建线程池,通过线程池来分配多个线程,执行多批数据任务
  • 定义线程类SyncCallable 实现Callable  重写call方法,具体的数据同步逻辑就是写在call,定义计数器,每次执行完成 计数器-1 
  • List<FutureTask<ResponseVo>>tasks 封装SyncCallable 多批任务 
  • 主线程调用执行 :线程池执行每个任务 executor.execute(task) ,通过 countDownLatch.await(10, TimeUnit.MINUTES); 等待任务执行完成,数据入库后,主线程接着需要执行数据处理逻辑再继续执行

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/399653.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【java】小学生数学练习题目生成系统

本文章主要是CSDN-问答板块&#xff0c;有题主提出的问题&#xff0c;我这边将完整代码提供出来&#xff0c;仅供大家参考学习&#xff01; 一、效果截图 二、直接上代码 package com.example.dingtalk.question;import javax.script.ScriptEngine; import javax.script.Scrip…

点成分享|如何让地球更绿,它能给你答案

一、背景介绍 随着全球经济的飞速发展&#xff0c;环境问题也日益严重。现代社会面临着诸如全球变暖、气候异常、空气和水质污染等诸多环境问题。其中&#xff0c;温室气体的排放是导致全球变暖的主要原因之一。温室气体的排放量上升加剧气候异常&#xff0c;影响人类生存和自…

NFC三大工作模式及其在物联网应用实例

NFC支持三种通信模式&#xff1a;读写模式、点对点模式和卡模拟模式。在此三种模式下&#xff0c;都仅需简单点击便可启动传输。 在读写模式下&#xff0c;系统执行非接触式读写功能。该系统的NFC芯片与内置NFC的设备-诸如非接触式智能卡、NFC标签或具有NFC功能的智能手机&…

瑞盟MS5188N——16bit、8 通道、500kSPS、 SAR 型 ADC

产品简述 MS5188N 是 8 通道、 16bit 、电荷再分配逐次逼近型模数 转换器&#xff0c;采用单电源供电。 MS5188N 拥有多通道、低功耗数据采集系统所需的所有 组成部分&#xff0c;包括&#xff1a;无失码的真 16 位 SAR ADC &#xff1b;用于将输入配 置为单端输入…

unity学习(31)——跳转到角色选择界面(打勾?手滑挂错脚本)

There are 2 audio listeners in the scene. Please ensure there is always exactly one audio listener in the scene. 是因为后来创建了一个camera&#xff0c;因为camera中自带一个组件Audio Listener。所以有两个camera就有两个audio listener导致报错。 一个简单的解决…

WebRTC最新版报错解决:city.wav:missing and no known rule to make it (二十六)

简介: CSDN博客专家,专注Android/Linux系统,分享多mic语音方案、音视频、编解码等技术,与大家一起成长! 优质专栏:Audio工程师进阶系列【原创干货持续更新中……】🚀 优质专栏:多媒体系统工程师系列【原创干货持续更新中……】🚀 人生格言: 人生从来没有捷径,只…

读懂2024年数字孪生发展新趋势!十大权威白皮书放送!

2024年&#xff0c;数字孪生 该往哪些方向走&#xff1f; 新技术的不断涌现 又会带来怎样的行业变迁 …… 在开工之际&#xff0c;我们整理了 51WORLD主导、参编的 十大权威数字孪生白皮书、行业报告 以及产业优秀案例集 分享给想要提升自我的朋友们 读完这些 上面看似…

【数据结构】时间复杂度与空间复杂度

时间复杂度 算法的时间复杂度并不是指一个代码运行时间的快慢&#xff0c;因为在不同机器上运行的时间肯定不同&#xff0c;因此算法的时间复杂度指的是基本操作的执行次数&#xff0c;他是一个数学意义上的函数。这个函数并不是C语言中那种函数&#xff0c;而是一个数学函数&…

WebGL中开发科学数据可视化应用

WebGL在科学数据可视化领域有广泛的应用&#xff0c;可以用于呈现和解释复杂的科学数据。以下是在WebGL中开发科学数据可视化应用时的一些建议&#xff0c;希望对大家有所帮助。北京木奇移动技术有限公司&#xff0c;专业的软件外包开发公司&#xff0c;欢迎交流合作。 1.选择合…

网络原理 - HTTP/HTTPS(4)

HTTP响应详解 认识"状态码"(status code) 状态码表示访问一个页面的结果.(是访问成功,还是失败,还是其它的一些情况...).(响应结果如何) 学习状态码 -> 为了调试问题. 写服务器时,按照状态码的含义正确使用. 200 OK 这是最常见的状态码,表示访问成功. 抓包抓…

Android加载富文本

直接用webview加载&#xff1a; package com.example.testcsdnproject;import androidx.appcompat.app.AppCompatActivity;import android.annotation.SuppressLint; import android.graphics.Color; import android.os.Bundle; import android.util.Log; import android.webk…

docker (十一)-进阶篇-docker-compos最佳实践部署zabbix

一 部署docker环境 关闭防火墙、selinux、开启docker&#xff0c;并设置开机自启动 注意点&#xff1a;docker部署的时候&#xff0c;bip要指定&#xff0c;不然会导致虚拟机ip和容器ip冲突&#xff0c;ssh连不上虚拟机 部署请参考 docker &#xff08;二&#xff09;-yum…

数据库管理-第153期 Oracle Vector DB AI-05(20240221)

数据库管理153期 2024-02-21 数据库管理-第153期 Oracle Vector DB & AI-05&#xff08;20240221&#xff09;1 Oracle Vector的其他特性示例1&#xff1a;示例2 2 简单使用Oracle Vector环境创建包含Vector数据类型的表插入向量数据 总结 数据库管理-第153期 Oracle Vecto…

计算机服务器中了devos勒索病毒怎么办?Devos勒索病毒解密数据恢复

网络技术的不断发展与更新&#xff0c;为企业的生产运营提供了有利保障&#xff0c;企业的生产运营离不开数据支撑&#xff0c;通过企业数据可以综合调整发展运营方向&#xff0c;但网络是一把双刃剑&#xff0c;近期&#xff0c;云天数据恢复中心接到许多企业的求助&#xff0…

2.20 Qt day1

一. 思维导图 二. 消化常用类的使用&#xff0c;以及常用成员函数对应的功能 按钮类QPushButton&#xff1a; mywidget.h&#xff1a; #ifndef MYWIDGET_H #define MYWIDGET_H#include <QWidget> #include<QPushButton>//按钮类 #include<QIcon>class MyW…

ts快速入门

文章目录 一、运行环境1、线上Playground2、VSCode 编辑器3、Code Runner 插件4、ts-node 二、声明1、变量声明2、常量声明3、类型推断 三、常用数据类型1、number2、string3、boolean4、数组5、对象 四、函数1、函数声明语法2、参数详解&#xff08;1&#xff09;特殊语法&…

C++学习Day08之类模板碰到继承的问题以及解决

目录 一、程序及输出1.1 指定父类T数据类型1.2 子类T指定父类T数据类型 二、分析与总结 一、程序及输出 1.1 指定父类T数据类型 必须要指定出父类中的T数据类型&#xff0c;才能给子类分配内存 正确使用 &#xff1a; #include<iostream> using namespace std;templa…

webpack打包速度优化思维导图

webpack打包速度优化思维导图 前言附件 前言 去年的时候公司一个项目体积过大&#xff0c;我是m1芯片的macpro&#xff0c;光启动就要1分钟&#xff0c;配置差点都电脑&#xff0c;启动就要3分钟&#xff0c;自然打包速度也会慢很多&#xff0c;我们是gitlab设置成了自动打包的…

春招面试准备笔记——NMS(非极大值抑制)算法

NMS&#xff08;非极大值抑制&#xff09;算法非极大值抑制是用于减少物体检测算法中重叠边界框或区域的数量的技术。通过对每个类别的检测框按置信度排序&#xff0c;然后逐个遍历&#xff0c;保留置信度最高的框&#xff0c;并抑制与其重叠且置信度低的框&#xff0c;从而得到…

Apache Httpd 常见漏洞解析(全)

一、Apache HTTPD 换行解析漏洞 漏洞编号&#xff1a;CVE-2017-15715 Apache HTTPD是一款HTTP服务器&#xff0c;它可以通过mod_php来运行PHP网页。 其2.4.0~2.4.29版本中存在一个解析漏洞。 在解析PHP时&#xff0c;1.php\x0A将被按照PHP后缀进行解析&#xff0c;导致绕过…
最新文章