【智能排班系统】快速消费线程池

文章目录

  • 线程池介绍
    • 线程池核心参数
      • 核心线程数(Core Pool Size)
      • 最大线程数(Maximum Pool Size)
      • 队列(Queue)
      • 线程空闲超时时间(KeepAliveTime)
      • 拒绝策略(RejectedExecutionHandler)
    • 线程池执行流程
  • 快速消费线程池
    • 快速消费线程池组件
      • 相关依赖
      • 快速消费队列
      • 快速消费线程池
      • 获取配置文件的配置
      • 配置线程池Bean到容器中
  • 说明

线程池介绍

线程池作为多线程编程中的重要工具,旨在通过复用已创建的线程来减少线程创建与销毁的开销,提升系统资源利用率和并发性能。要有效地使用线程池,理解和配置其核心参数至关重要。

线程池核心参数

创建一个线程池的代码如下,可以看到构造方法需要传递几个参数,下文会详细展示每个参数的含义:

// 导包
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// 创建线程池
return new ThreadPoolExecutor(poolConfigProperties.getCoreSize(),
	poolConfigProperties.getMaxSize(),
	poolConfigProperties.getKeepAliveTime(),
	TimeUnit.SECONDS,
	//队列的最大容量
	new LinkedBlockingDeque<>(600),
	//使用默认的工程
	Executors.defaultThreadFactory(),
	//使用拒绝新来的拒绝策略
	new ThreadPoolExecutor.CallerRunsPolicy()
);

核心线程数(Core Pool Size)

核心线程数是指线程池在初始化时创建并保持活动状态的线程数量。即使这些线程当前没有任务执行,它们也不会被回收。核心线程数通常根据系统资源、预期并发负载和任务特性来设定。核心线程在池中长期存在,能够快速响应新提交的任务,减少任务提交后的等待时间。

最大线程数(Maximum Pool Size)

最大线程数限制了线程池能同时容纳的线程总数。当核心线程数无法满足当前任务需求时,线程池会创建额外的线程直至达到最大线程数。超过这个阈值后,线程池将采取拒绝策略处理新提交的任务。合理设置最大线程数,既能防止资源过度消耗导致系统过载,又能确保在高并发场景下有足够的线程处理任务。

队列(Queue)

线程池通常配合任务队列使用,用于暂存待处理的任务。当所有核心线程都处于忙碌状态且未达到最大线程数时,新提交的任务会被放入队列中等待。常见的队列类型包括无界队列(如 LinkedBlockingQueue)、有界队列(如 ArrayBlockingQueue)和优先级队列(如 PriorityBlockingQueue)。队列的选择和容量大小直接影响线程池的阻塞策略和任务调度效率。

线程空闲超时时间(KeepAliveTime)

当线程池中存在超出核心线程数的非核心线程,并且这些线程在一段时间内(即 KeepAliveTime)没有执行任何任务,则会自动终止。这个参数有助于释放闲置资源,避免资源浪费。对于长期存在大量任务的系统,可以适当增大或关闭这个超时时间。

拒绝策略(RejectedExecutionHandler)

当线程池和队列都无法接纳新任务时,需要采用拒绝策略来处理。常见的拒绝策略有:

  • AbortPolicy:默认策略,直接抛出 RejectedExecutionException。
  • CallerRunsPolicy:由提交任务的线程自行执行任务。
  • DiscardPolicy:默默地丢弃任务,不抛出异常也不执行。
  • DiscardOldestPolicy:丢弃队列中最旧的任务,尝试提交新任务。

在这里插入图片描述

线程池执行流程

  • 初始阶段线程池创建并启动核心线程数指定数量的线程。此时,如果有任务提交,直接由这些核心线程执行。

  • 核心线程饱和当所有核心线程都在执行任务且任务队列尚未满时,新提交的任务被放入队列等待

  • 队列满载:若任务提交速率持续高于线程处理速度,队列达到其容量上限。此时,线程池开始创建新的线程(不超过最大线程数),直接执行新提交的任务。

  • 达到最大线程数:若任务增长仍然无法遏制,线程池达到最大线程数。此时,新提交的任务将触发拒绝策略

  • 任务减少与线程收缩:当任务提交速率降低,线程池中的线程开始完成任务并变得空闲。对于非核心线程,若在 KeepAliveTime 时间内未获得新任务,将被终止。系统逐渐回归到更低的线程数,直至仅保留核心线程

在任务量增长的过程中,线程池通过动态调整线程数量和利用任务队列,既保证了系统的响应能力,又防止了资源过度消耗。

快速消费线程池

快速消费线程池通过对上述线程池进行改造,当核心线程饱和时,再提交的任务不是先加入到队列中,而是直接创建非核心线程来执行新提交任务。快速消费线程池可以加快任务的执行,减少任务的堆积。

快速消费线程池组件

在这里插入图片描述

相关依赖

<dependencies>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <scope>provided</scope>
    </dependency>
</dependencies>

快速消费队列

该类继承自LinkedBlockingQueue,并对其offer方法进行定制,以配合EagerThreadPoolExecutor实现更灵活的任务调度策略。主要目的是在满足特定条件时,促使线程池创建非核心线程以快速处理任务,而非直接将任务放入队列等待处理。

import lombok.Data;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

/**
 * 快速消费任务队列
 */
@Data
public class EagerTaskQueue<R extends Runnable> extends LinkedBlockingQueue<Runnable> {

    private EagerThreadPoolExecutor executor;

    /**
     * 构造函数,传入队列容量参数,用于初始化LinkedBlockingQueue。
     *
     * @param capacity 队列的最大容量
     */
    public EagerTaskQueue(int capacity) {
        super(capacity);
    }

    /**
     * 重写父类LinkedBlockingQueue的offer方法,实现自定义的任务入队逻辑
     * 当没有到达最大线程时,返回false,让其创建非核心线程
     *
     * @param runnable 待添加的任务对象
     * @return 如果任务成功加入队列或触发线程池创建非核心线程,则返回true;否则返回false
     */
    @Override
    public boolean offer(Runnable runnable) {
        // 获取当前线程池的线程数量
        int currentPoolThreadSize = executor.getPoolSize();

        // 检查是否有核心线程处于空闲状态(已提交任务数小于当前线程数)
        if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
            // 如果有核心线程正在空闲,将任务加入阻塞队列,由核心线程进行处理任务
            return super.offer(runnable);
        }

        // 检查当前线程池线程数量是否小于最大线程数
        if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
//            System.out.println("线程池线程数量小于最大线程数,返回 False,线程池会创建非核心线程");
            // 当前线程池线程数量小于最大线程数,返回false,触发线程池创建非核心线程处理任务
            return false;
        }
        // 如果当前线程池数量大于最大线程数,任务加入阻塞队列,等待线程池中的已有线程处理
        return super.offer(runnable);
    }

    /**
     *
     * @param runnable      待添加的任务对象
     * @param timeout       等待加入队列的超时时间
     * @param timeUnit      超时时间单位
     * @return              如果任务成功加入队列或触发线程池创建非核心线程,则返回true;否则返回false
     * @throws InterruptedException 如果在等待过程中线程被中断
     * @throws RejectedExecutionException 如果线程池已关闭
     */
    public boolean retryOffer(Runnable runnable, long timeout, TimeUnit timeUnit) throws InterruptedException {
        // 如果线程池已关闭,则抛出RejectedExecutionException异常。
        if (executor.isShutdown()) {
            throw new RejectedExecutionException("Executor is shutdown!");
        }
        return super.offer(runnable, timeout, timeUnit);
    }
}

快速消费线程池

该类继承自ThreadPoolExecutor,并对其进行定制,以实现更灵活的任务调度策略。主要特点包括:

  • 使用自定义的EagerTaskQueue作为工作队列,支持根据线程池状态动态调整任务入队逻辑。
  • 维护正在处理的任务数量计数器(submittedTaskCount),以便EagerTaskQueue判断是否有核心线程处于空闲状态。
  • 在execute方法中,处理任务提交失败的情况,尝试将任务重新投递到队列或使用拒绝策略。
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 快速消费线程池
 */
public class EagerThreadPoolExecutor extends ThreadPoolExecutor {

    /**
     * 使用AtomicInteger记录当前正在处理的任务数量,提供线程安全的计数操作。
     */
    private final AtomicInteger submittedTaskCount = new AtomicInteger(0);

    /**
     * 构造函数,接受线程池相关的配置参数,包括核心线程数、最大线程数、线程存活时间、时间单位、工作队列、线程工厂和拒绝策略。
     * 工作队列类型为自定义的EagerTaskQueue,用于实现特殊的任务入队逻辑。
     *
     * @param corePoolSize         核心线程数
     * @param maximumPoolSize      最大线程数
     * @param keepAliveTime        线程空闲后的存活时间
     * @param unit                 时间单位
     * @param workQueue            工作队列,类型为EagerTaskQueue
     * @param threadFactory        线程工厂,用于创建新线程
     * @param handler              拒绝策略,当线程池和队列无法接受新任务时的处理方式
     */
    public  EagerThreadPoolExecutor(int corePoolSize,
                                   int maximumPoolSize,
                                   long keepAliveTime,
                                   TimeUnit unit,
                                   EagerTaskQueue<Runnable> workQueue,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    /**
     * 创建一个EagerThreadPoolExecutor实例的便捷方法
     * 包括创建EagerTaskQueue并设置其与线程池的关联
     *
     * @param corePoolSize         核心线程数
     * @param maximumPoolSize      最大线程数
     * @param keepAliveTime        线程空闲后的存活时间
     * @param unit                 时间单位
     * @param queueCapacity        队列容量
     * @param threadFactory        线程工厂,用于创建新线程
     * @param handler              拒绝策略,当线程池和队列无法接受新任务时的处理方式
     * @return                     创建的EagerThreadPoolExecutor实例
     */
    public static EagerThreadPoolExecutor createEagerThreadPoolExecutor(int corePoolSize,
                                                                 int maximumPoolSize,
                                                                 long keepAliveTime,
                                                                 TimeUnit unit,
                                                                 int queueCapacity,
                                                                 ThreadFactory threadFactory,
                                                                 RejectedExecutionHandler handler) {
        EagerTaskQueue eagerTaskQueue = new EagerTaskQueue(queueCapacity);
        EagerThreadPoolExecutor eagerThreadPoolExecutor = new EagerThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, eagerTaskQueue, threadFactory, handler);
        eagerTaskQueue.setExecutor(eagerThreadPoolExecutor);
        return eagerThreadPoolExecutor;
    }

    /**
     * 获取当前正在处理的任务数量。
     *
     * @return 当前正在处理的任务数量
     */
    public int getSubmittedTaskCount() {
        return submittedTaskCount.get();
    }

    /**
     * 重写父类的afterExecute方法,当任务执行完成后,将正在执行的任务数量减一。
     * 这是ThreadPoolExecutor提供的钩子方法,用于在任务执行结束后进行清理或其他操作。
     *
     * @param r       执行完毕的任务
     * @param t       执行过程中抛出的异常(如果有的话)
     */
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        // 任务执行完成,将正在执行数量-1
        submittedTaskCount.decrementAndGet();
    }

    /**
     * 重写父类的execute方法,用于提交任务到线程池。
     * 在提交任务之前,先将正在执行的任务数量加一。若提交失败,根据具体情况尝试重新投递任务或使用拒绝策略。
     *
     * @param command 待提交的任务
     * @throws RejectedExecutionException 如果任务无法被接受,且无法重新投递到队列
     */
    @Override
    public void execute(Runnable command) {
//        System.out.println("使用快速消费线程池执行任务");

        // 将正在执行任务数量 + 1
        submittedTaskCount.incrementAndGet();
        try {
            super.execute(command);
        } catch (RejectedExecutionException ex) {
            // 任务被拒绝,间隔一定时间,将任务重新投递到队列
            EagerTaskQueue eagerTaskQueue = (EagerTaskQueue) super.getQueue();
            try {
                // 将任务重新投递到队列
                if (!eagerTaskQueue.retryOffer(command, 10, TimeUnit.MILLISECONDS)) {
                    // 队列已满,使用拒绝策略,并减少计数
                    submittedTaskCount.decrementAndGet();
                    throw new RejectedExecutionException("Queue capacity is full.", ex);
                }
            } catch (InterruptedException iex) {
                // 重试失败,将正在执行任务数量 - 1
                submittedTaskCount.decrementAndGet();
                throw new RejectedExecutionException(iex);
            }
        } catch (Exception ex) {
            // 执行失败,将正在执行任务数量 - 1
            submittedTaskCount.decrementAndGet();
            throw ex;
        }
    }
}

获取配置文件的配置

在这里插入图片描述

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

@ConfigurationProperties(prefix = "sss.thread")
@Component//将该配置放到容器中
@Data
public class ThreadPoolConfigProperties {

    private Integer coreSize;
    private Integer maxSize;
    private Integer keepAliveTime;

}

配置线程池Bean到容器中

import com.dam.eager.EagerThreadPoolExecutor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

@Configuration
public class MyThreadConfig {

    /**
     * @param poolConfigProperties 如果需要使用到ThreadPoolConfigProperties,一定要使用Component将其加入到容器中
     * @return
     */
    @Bean
    public ThreadPoolExecutor threadPoolExecutor(ThreadPoolConfigProperties poolConfigProperties) {
        // 普通线程池
//        return new ThreadPoolExecutor(poolConfigProperties.getCoreSize(),
//                poolConfigProperties.getMaxSize(),
//                poolConfigProperties.getKeepAliveTime(),
//                TimeUnit.SECONDS,
//                //队列的最大容量
//                new LinkedBlockingDeque<>(600),
//                //使用默认的工程
//                Executors.defaultThreadFactory(),
//                //使用拒绝新来的拒绝策略
//                new ThreadPoolExecutor.CallerRunsPolicy()
//        );

        // 快速消费线程池
        return EagerThreadPoolExecutor.createEagerThreadPoolExecutor(
                poolConfigProperties.getCoreSize(),
                poolConfigProperties.getMaxSize(),
                poolConfigProperties.getKeepAliveTime(),
                TimeUnit.SECONDS,
                // 队列的最大容量
                600,
                // 使用默认的工程
                Executors.defaultThreadFactory(),
                // 使用拒绝新来的拒绝策略
                new ThreadPoolExecutor.CallerRunsPolicy()
        );
    }
}

说明

快速线程池的实现参考马哥 12306 的代码,代码仓库为12306,该项目含金量较高,有兴趣的同学可以去学习一下。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/515902.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Python学习笔记-Flask接收post请求数据并存储数据库

1.引包 from flask import Flask, request, jsonify from flask_sqlalchemy import SQLAlchemy 2.配置连接,替换为自己的MySQL 数据库的实际用户名、密码和数据库名 app Flask(__name__) #创建应用实列 app.config[SQLALCHEMY_DATABASE_URI] mysqlpymysql://ro…

微软文本转语音和语音转文本功能更新,效果显著!

今天我要和大家分享一个新功能更新——微软的文本转语音和语音转文本功能。最近&#xff0c;微软对其AI语音识别和语音合成技术进行了重大升级&#xff0c;效果非常好&#xff0c;现在我将分别为大家介绍这两个功能。 先来听下这个效果吧 微软文本转语音和语音转文本功能更新 …

二分答案(砍树,借教室)

二分的两种情况附代码&#xff1a; 二分查找条件&#xff1a;单调&#xff0c;二段性 例题1&#xff1a;P1873 [COCI 2011/2012 #5] EKO / 砍树 - 洛谷 | 计算机科学教育新生态 (luogu.com.cn) 上代码&#xff1a; #include<bits/stdc.h> using namespace std; const …

【数据结构与算法】归并排序(详解:递归与非递归的归并排序 | 赠:冒泡排序和选择排序)

前言 本篇博客会对排序做一个收尾&#xff0c;将最经典的七大排序介绍完毕。 这次的重点正如标题&#xff0c;主要讲的是归并排序&#xff0c;还会带过相对简单很多的冒泡排序和选择排序。在最后还会给这七大排序做出一个时间复杂度和稳定性展示的总结收尾。同时&#xff0c;这…

钉钉事件订阅前缀树算法gin框架解析

当钉钉监测到发生一些事件&#xff0c;如下图 此处举例三个事件user_add_org、user_change_org、user_leave_org&#xff0c;传统的做法是&#xff0c;我们写三个if条件&#xff0c;类似下图 这样字符串匹配效率比较低&#xff0c;于是联想到gin框架中的路由匹配算法&#xff0…

非写代码无以致远

标题党一下&#xff0c;本篇文章主要汇总了一些代码题&#xff0c;让大家写一些代码练习一下吧&#xff01; 变种水仙花_牛客题霸_牛客网 (nowcoder.com) #include<stdio.h> int main() {for (int i 10000; i < 99999; i) {int sum 0;for (int j 10; j < 1000…

码农失业倒计时?全球首个大厂AI程序员来了

进入互联网时代&#xff0c;程序员作为高收入职业的代表&#xff0c;长久以来一直是众多求职者梦寐以求的工作方向。程序员们凭借其对计算机科学的深刻理解和技术创新能力&#xff0c;不仅推动了科技的进步&#xff0c;也为自己赢得了可观的经济回报。 然而&#xff0c;随着人…

AD20全流程的使用笔记

目录 首先一个完整的AD工程文件需要我们自己建立的文件有这些&#xff1a; 新建工程&#xff1a; 从现有的工程文件中将元件添加到原理图库&#xff1a; 元件的摆放&#xff1a; 器件的复制及对齐&#xff1a; 导线、Netlabe、端口的添加&#xff1a; Value值的校对&…

可视化大屏 - 项目1

文章目录 技术栈echarts 可视化需求分析代码实现 技术栈 flexible.js rem 实现不同终端下的响应式布局&#xff0c;根据不同屏幕宽度&#xff0c;自适配布局&#xff1b; html中引入index.js&#xff0c;可以改名为flexible.js&#xff1b;默认划分10份&#xff0c;可以自己修…

HarmonyOS 应用开发之TaskPool和Worker的对比 (TaskPool和Worker)

TaskPool&#xff08;任务池&#xff09;和Worker的作用是为应用程序提供一个多线程的运行环境&#xff0c;用于处理耗时的计算任务或其他密集型任务。可以有效地避免这些任务阻塞主线程&#xff0c;从而最大化系统的利用率&#xff0c;降低整体资源消耗&#xff0c;并提高系统…

日期专题:做题笔记 (时间显示/星期计算/星系炸弹/第几天/纪念日)

目录 时间显示 代码 星期计算 代码 星系炸弹 代码 第几天 纪念日 代码 时间显示 时间显示 这道题主要是单位换算。 ①单位换算 ②输出格式&#xff1a; a. 不足两位补前导零。利用printf输出 b. 注意 long long 输出格式应该是 %lld 长整型 代码 #include <…

Day66-企业级防火墙iptables精讲2

Day66-企业级防火墙iptables精讲2 1. iptables项目案例2&#xff1a;局域网共享上网&#xff1a;2. iptables项目案例3&#xff1a;外网IP的端口映射到内网IP的端口3. 老男孩教育iptables项目案例4&#xff1a;IP一对一映射&#xff08;DMZ&#xff09;4. 老男孩教育iptables项…

Java常用类和基础API

文章目录 1. 字符串相关类之不可变字符序列&#xff1a;String1.1 String的特性1.2 String的内存结构1.2.1 概述1.2.2 练习类型1&#xff1a;拼接1.2.3 练习类型2&#xff1a;new1.2.4 练习类型3&#xff1a;intern() 1.3 String的常用API-11.3.1 构造器1.3.2 字符串对象的比较…

【THM】Protocols and Servers(协议和服务器)-初级渗透测试

介绍 这个房间向用户介绍了一些常用的协议,例如: HTTP协议文件传输协议POP3邮件传输协议IMAP每个协议的每个任务都旨在帮助我们了解底层发生的情况,并且通常被优雅的GUI(图形用户界面)隐藏。我们将使用简单的 Telnet 客户端来使用上述协议进行“对话”,以充分了解GUI客户…

Unity开发一个FPS游戏之三

在前面的两篇博客中&#xff0c;我已实现了一个FPS游戏的大部分功能&#xff0c;包括了第一人称的主角运动控制&#xff0c;武器射击以及敌人的智能行为。这里我将继续完善这个游戏&#xff0c;包括以下几个方面&#xff1a; 增加一个真实的游戏场景&#xff0c;模拟一个废弃的…

5.2 通用代码,数组求和,拷贝数组,si配合di翻转数组

5.2 通用代码&#xff0c;数组求和&#xff0c;拷贝数组&#xff0c;si配合di翻转数组 1. 通用代码 通用代码类似于一个用汇编语言写程序的一个框架&#xff0c;也类似于c语言的头文件编写 assume cs:code,ds:data,ss:stack data segmentdata endsstack segmentstack endsco…

刘小光本就疑心赵本山与他媳妇李琳有染,赵本山为证实清白便想起蛋糕上的字,结果呢?

刘小光本就疑心赵本山与他媳妇李琳有染&#xff0c;赵本山为证实清白便想起蛋糕上的字&#xff0c;结果呢&#xff1f; ——小品《生日快乐》&#xff08;中5&#xff09;的台词 &#xff08;接上&#xff09; 赵本山&#xff1a;噢!对对!那谁&#xff0c;老四&#xff0c;是…

GEE错误——土地分类中出现Line 126:composite.select(...).classify is not a function

错误 Line 126:composite.select(...).classify is not a function 出现 "GEE在土地分类中出现的一个问题... is not a function" 的主要原因可能有几种 1. 变量或函数名拼写错误:检查代码中的该函数或变量名是否正确拼写,包括大小写。 2. 函数或变量未声明:检查…

Docker,anaconda环境的部署与迁移

功能上线将提上日程&#xff0c;但是如何将我windows环境下的程序放到linux服务器的测试环境跑通呢&#xff1f;这是我这整个清明假期将要解决的一件事&#xff0c;最蠢的办法就是看自己的环境下有哪些依赖&#xff0c;如何到服务器上一个一个下&#xff0c;但是首先这个方法很…

Brain.js 的力量:构建多样化的人工智能应用程序

机器学习&#xff08;ML&#xff09;是人工智能 (AI) 的一种形式&#xff0c;旨在构建可以从处理的数据中学习或使用数据更好地执行的系统。人工智能是模仿人类智能的系统或机器的总称。 机器学习&#xff08;ML&#xff09;与复杂的数学纠缠在一起&#xff0c;让大多数初学者…