让ThreadPoolExecutor无所遁形:Java线程池运行原理详解

ThreadPoolExecutor的核心工作原理

当我们在Java中讨论并发和多线程时,ThreadPoolExecutor 是不可或缺的一个类。在 java.util.concurrent 包下,该类负责管理线程池内的线程,包括线程的创建、执行、管理以及线程池的监控等。理解 ThreadPoolExecutor 如何保证线程池正确运作是非常关键的。本章将带您深入源码,解析 ThreadPoolExecutor 的核心工作原理。

线程池状态和控制流程概述

file
首先,让我们来了解一下 ThreadPoolExecutor 是如何通过内部的状态控制来管理线程池的。ThreadPoolExecutor 类维护着一个 ctl 的原子整型变量,该变量高位存储线程池状态,低位存储线程数量。

ctl的位字段表示方法

在 ThreadPoolExecutor 源码中,您会看到如下定义:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// 状态在高位
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

此代码段定义了 ThreadPoolExecutor 状态。它用位操作确保状态存储在高位,而工作线程数量存储在低位。线程池的状态反映了线程池的生命周期,比如运行中、关闭中等。

状态转换及控制

线程池状态通过 ctl 变量控制,并提供了几个辅助函数来管理这些状态:

private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

这些函数帮助我们获取当前的状态以及当前正在运行的线程数量,还通过 compareAndSet 方法保证状态更新操作的原子性。
综上所述,ThreadPoolExecutor 类的 ctl 相关属性是线程池正确运行的关键部分,它不仅标示了线程池的状态,还协调着线程的工作流程。

ThreadPoolExecutor中的属性详解

在深度了解 ThreadPoolExecutor 之前,我们需要了解一些它的键属性。这些属性共同决定了线程池的行为和性能。

核心线程与最大线程数

corePoolSize 和 maximumPoolSize 这两个参数分别代表着线程池中核心线程数量和允许的最大线程数量。核心线程会一直存活,即使它们处于空闲状态,而非核心线程如果空闲时间超过了 keepAliveTime 就会被终止。

private volatile int corePoolSize;
private volatile int maximumPoolSize;

任务队列与SynchronousQueue特性

任务队列是存放提交未执行任务的缓冲队列。ThreadPoolExecutor 使用一个 BlockingQueue 来存放任务。

private final BlockingQueue<Runnable> workQueue;

其中 SynchronousQueue 是一种无存储空间的阻塞队列,每个 put 必须等待一个 take,反之亦然。

keepAliveTime与TimeUnit的作用

当线程数量超过核心线程数时,多余的空闲线程将在等待新任务的时候,最多等待 keepAliveTime 所指定的时间长度,超时后将会被终止。

private volatile long keepAliveTime;

这里的 TimeUnit 是一个枚举,用来指定 keepAliveTime 的时间单位。

private final TimeUnit unit;

ThreadFactory的自定义线程创建

用户可以通过实现 ThreadFactory 接口来自定义线程创建方式,比如设置线程名、分组、优先级等。

private volatile ThreadFactory threadFactory;

Handler的拒绝策略

当任务太多,无法及时处理时,我们必须提供一种策略来处理这些额外的任务。ThreadPoolExecutor 支持几种拒绝策略。

private volatile RejectedExecutionHandler handler;

ThreadPoolExecutor类中的重要操作方法解析

ThreadPoolExecutor 中定义了几种核心的方法,用于执行任务、管理线程池的生命周期等。在本章中,我们将通过源码来分析这些关键方法是如何实现的。

execute方法的任务提交流程

execute 方法是线程池的核心之一,用于提交任务供线程池执行。这个方法主要涉及到任务的接收和线程的调度处理。

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

在这段代码中,首先检查传入的任务对象是否为 null,然后根据当前线程数量和核心线程数比较,以便决定是否需要添加新的工作线程。如果当前运行状态是 RUNNING 并且任务可以成功添加到队列中,那么会再次检查线程池的状态并尝试添加一个非核心线程。如果这些步骤都失败了,那么将会调用拒绝策略来处理这个任务。

submit方法与FutureTask的协同

除了 execute 方法,ThreadPoolExecutor 同时提供了 submit 方法,这个方法允许提交带返回值的任务。其内部实际上是转化为 FutureTask 来处理。

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

submit 方法首先将 Callable 任务转化为 FutureTask,再调用 execute 方法进行执行。返回的 Future 对象可以用来获取异步执行结果。

shutdown和shutdownNow方法的区别

shutdown 方法用于平缓的关闭线程池,它会等待正在执行的任务执行完毕,但不接收新任务。

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(SHUTDOWN);
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
}

而 shutdownNow 方法则更加激进,它试图停止所有正在执行的任务,并返回等待执行的任务列表。

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(STOP);
        interruptWorkers();
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    return tasks;
}

ThreadPoolExecutor类中的重要内部类

ThreadPoolExecutor 的功能实现,除了依赖于它的核心属性和方法之外,还依赖于一些关键的内部类。这些内部类扮演着线程池运作中不可或缺的角色。

Worker类的作用和生命周期

ThreadPoolExecutor 使用 Worker 类来封装线程池中的每个工作线程。Worker 类是 ThreadPoolExecutor 的一个私有内部类,并且实现了 Runnable 接口。

private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
    final Thread thread;
    Runnable firstTask;

    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    public void run() {
        runWorker(this);
    }

    // 省略其他方法
}

这段代码展示了 Worker 类的简化结构。Worker 类首先使用给定的 ThreadFactory 来创建新的线程,并在创建时可以选择传入一个首个任务。这个类还利用了AQS来控制线程的状态。每个工作线程的生命周期开始于它对 firstTask 的执行,然后它会持续从任务队列获取并执行任务,直到线程池终止或线程无任务可执行而被终止。

内部阻塞队列的实现与特性

线程池中使用阻塞队列来暂存待执行的任务。这是一种线程安全的队列实现,能够在高并发场景下正确地管理任务。

private final BlockingQueue<Runnable> workQueue;

ThreadPoolExecutor 支持不同类型的阻塞队列,如 ArrayBlockingQueue、LinkedBlockingQueue 和 PriorityBlockingQueue。每种队列都有其特殊的使用场景和性能特点。

LinkedBlockingQueue与ArrayBlockingQueue

LinkedBlockingQueue 和 ArrayBlockingQueue 是两种常见的阻塞队列实现。LinkedBlockingQueue 基于链表结构,理论上具有更高的处理效率,而 ArrayBlockingQueue 基于数组实现,固定容量并且在预分配内存方面性能更优。

DelayedWorkQueue的定时任务处理

如果 ThreadPoolExecutor 配置为 ScheduledThreadPoolExecutor,则使用 DelayedWorkQueue 来处理定时任务。这个队列能够让线程延时或周期性地执行任务。

ThreadPoolExecutor的扩展与自定义

ThreadPoolExecutor 提供了几个可以覆写的钩子方法,使得我们可以根据业务需求来扩展和自定义线程池的行为。同时,我们可通过自定义拒绝策略来处理无法立即执行的任务。本章将介绍如何利用这些功能来增强 ThreadPoolExecutor 的功能。

beforeExecute、afterExecute和terminated方法的扩展

ThreadPoolExecutor 允许我们在任务执行前后以及线程池终止时执行一些自定义逻辑。这是通过覆写 beforeExecute、afterExecute 和 terminated 方法来实现的。

protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }
protected void terminated() { }

这些方法默认是空实现,我们可以根据需要来重写这些方法,比如记录日志、计算任务执行时间、收集线程池统计信息等。

自定义拒绝策略与RejectedExecutionHandler的应用

我们可以实现 RejectedExecutionHandler 接口来创建自己的拒绝策略。例如,我们可以写一个记录日志并且尝试重新提交任务的拒绝策略。

public class LogAndRetryRejectedExecutionHandler implements RejectedExecutionHandler {
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        if (!executor.isShutdown()) {
            System.out.println("Task rejected, retrying...");
            // Retry submission of the task
            executor.execute(r);
        } else {
            System.err.println("Task rejected and cannot retry, executor has shut down.");
        }
    }
}

将上面的自定义拒绝策略应用到 ThreadPoolExecutor 中:

ThreadPoolExecutor executor = new ThreadPoolExecutor(
    corePoolSize, maximumPoolSize, keepAliveTime, unit,
    workQueue, threadFactory,
    new LogAndRetryRejectedExecutionHandler()
);

实战案例:定制一个监控线程池运行状态的ThreadPoolExecutor

现在让我们来看一个实战案例,其中我们将创建一个监控线程池的 ThreadPoolExecutor 并实施自定义扩展。

public class MonitoringThreadPoolExecutor extends ThreadPoolExecutor {
    private final ConcurrentHashMap<String, Long> timing = new ConcurrentHashMap<>();

    // 构造方法和其他必要的方法

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        timing.put(String.valueOf(r.hashCode()), System.nanoTime());
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        Long startTime = timing.remove(String.valueOf(r.hashCode()));
        long taskDuration = System.nanoTime() - startTime;
        // Record or log the task duration
    }

    @Override
    protected void terminated() {
        super.terminated();
        // Do some final logging or resource release
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/577210.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

玩转手机在AidLux上安装宝塔面板

AidLux&#xff0c;手机不用刷机、不用root&#xff0c;直接在手机应用市场就能下载使用。 1.4G的应用包&#xff0c;看起来挺大的&#xff0c;那是因为内嵌了一套完整的AIoT应用开发和部署平台。 不仅Android手机可以玩&#xff0c;华为的Harmony系统也可以使用。 使用它最主…

websocket爬虫

人群看板需求分析 先找到策略中心具体的数据。对应数据库中的数据 看看接口是否需要被逆向 点开消费者细分&#xff0c;可以找到人群包&#xff08;人群名称&#xff09; 点击查看透视 label字段分类: 在这里插入图片描述 预测年龄&#xff1a;tagTitle 苹果id&#x…

【Unity基础】TextMeshPro组件学习过程记录

目录 1.TextMeshPro组件渲染创建文本RTL Editor字体Font Asset字体加粗&#xff0c;下划线等字体大小控制字体颜色控制字体渐变控制字符间隔、单词间隔、行间距、段落间距控制WrappingUV映射控制代码 2.TextMeshPro组件AssetFace InfoGeneration Setting 3.使用Dynamic SDF Sys…

从C语言到C++过渡篇(快速入门C++)

目录 引言 命名空间 C 的输入输出&#xff08;cout & cin&#xff09; 输出 cout 输入 cin 缺省参数 函数重载 知识要点讲解 函数重载底层 引用& 内联函数 auto & nullptr 结语 引言 很多同学从C语言到C的转变不知从何下手&#xff0c;今天这篇文章主…

【MRI重建】Cartesian采样中data consistency 常规数据一致性实现(pytorch)

关于 在MRI重建中,data consistency 可以帮助加快MRI图像重建和减少模型重建带来的重建误差。 工具 方法实现 x_rec: 重建图像, (batch_size,2,H,W) mask: 欠采样模版,(batch_size,2,H,W) k_un: 真实欠采样采集数据, (batch_size,2,H,W) torch.view_as_complex: 将实数数据…

【Linux】HTTP协议2

欢迎来到Cefler的博客&#x1f601; &#x1f54c;博客主页&#xff1a;折纸花满衣 &#x1f3e0;个人专栏&#xff1a;题目解析 &#x1f30e;推荐文章&#xff1a;承接上文【Linux】HTTP协议1 目录 &#x1f449;&#x1f3fb;HTTP方法&#x1f449;&#x1f3fb;HTTP状态码&…

Swift - 流程控制

文章目录 Swift - 流程控制if-else2. while3. for3.1 闭区间运算符3.2 半开区间运算符3.3 for - 区间运算符用在数组上3.3.1 单侧区间 3.4 区间类型3.5 带间隔的区间值 4. switch4.1 fallthrough4.2 switch注意点 5. 复合条件6. 区间匹配、元组匹配7. 值绑定8. where9. 标签语句…

webpack中mode、NODE_ENV、DefinePlugin、cross-env的使用

本文讲的全部知识点&#xff0c;都是和webpack相关的。如果你之前有疑问&#xff0c;那本文一定能帮你搞清楚。 问题来源一般是类似下面代码&#xff08;webpack.json中&#xff09;&#xff1a; "scripts": {"dev": "cross-env NODE_ENVdevelopmen…

BUUCTF_[BSidesCF 2020]Had a bad day

[BSidesCF 2020]Had a bad day 1.一看题目直接尝试文件包含 2.直接报错&#xff0c;确实是存在文件包含漏洞 http://307b4461-36d6-443f-879a-68803a57f721.node5.buuoj.cn:81/index.php?categoryphp://filter/convert.base64-encode/resourceindex strpos() 函数查找字符串…

StarRocks x Paimon 构建极速实时湖仓分析架构实践

Paimon 介绍 Apache Paimon 是新一代的湖格式&#xff0c;可以使用 Flink 和 Spark 构建实时 Lakehouse 架构&#xff0c;以进行流式处理和批处理操作。Paimon 创新性地使用 LSM&#xff08;日志结构合并树&#xff09;结构&#xff0c;将实时流式更新引入 Lakehouse 架构中。 …

Docker基本操作 容器相关命令

docker run:运行镜像; docker pause:暂停容器&#xff0c;会让该容器暂时挂起&#xff1b; docker unpauser:从暂停到运行; docker stop:停止容器&#xff0c;杀死进程; docker start:重新创建进程。 docker ps&#xff1a;查看所有运行的容器及其状态&#xff0c;默认只展…

WildCard开通GitHub Copilot

更多AI内容请关注我的专栏&#xff1a;《体验AI》 期待您的点赞&#x1f44d;收藏⭐评论✍ WildCard开通GitHub Copilot GitHub Copilot 简介主要功能工作原理 开通过程1、注册Github账号2、准备一张信用卡或虚拟卡3、进入github copilot页4、选择试用5、选择支付方式6、填写卡…

实现SpringMVC底层机制(一)

文章目录 1.环境配置1.创建maven项目2.创建文件目录3.导入jar包 2.开发核心控制器文件目录1.流程图2.编写核心控制器SunDispatcherServlet.java3.类路径下编写spring配置文件sunspringmvc.xml4.配置中央控制器web.xml5.配置tomcat&#xff0c;完成测试1.配置发布方式2.配置热加…

创建Spring Boot项目

选择Maven Archetype,之后再Archetype选择webapp 两个都打勾 这是当前的打勾 这个是以后都默认勾上 打开对应的路径&#xff0c;用vscode打开settings.xml 加入国内源 阿里云 若没有此文件可上网查找 若jar包出现问题&#xff0c;可在repostitory文件内全删除 之后在Maven刷…

巴特沃斯滤波原理及代码实现(matlab详细过程版)

目录 一、算法原理1、原理概述2、参考文献 二、代码实现三、结果展示 本文由CSDN点云侠原创&#xff0c;原文链接。如果你不是在点云侠的博客中看到该文章&#xff0c;那么此处便是不要脸的爬虫与GPT。 一、算法原理 1、原理概述 巴特沃斯滤波器&#xff08;Butterworth filt…

主成分分析(PCA)在 Java 中的简单应用

在数据科学的众多工具中&#xff0c;主成分分析&#xff08;PCA&#xff09;是一种非常重要的统计技术&#xff0c;用于数据降维和模式识别。它通过提取数据中的关键特征来简化数据结构&#xff0c;从而帮助我们更好地理解数据集的主要变化因素。本文将介绍如何在 Java 编程环境…

CARLA (I)--Ubuntu20.04 服务器安装 CARLA_0.9.13服务端和客户端详细步骤

目录 0. 说明0.1 应用场景&#xff1a;0.2 本文动机&#xff1a; 1. 准备工作2. 安装 CARLA 服务端软件【远程服务器】3. 安装 CARLA 客户端【远程服务器】3.1 .egg 文件安装&#xff1a;3.2 .whl 文件安装&#xff1a;3.3 从Pypi下载Python package 4. 运行服务端程序5. 运行客…

arcgis js 4.x加载SceneLayer并实现基于属性查询定位及高亮

一、代码 <!DOCTYPE html> <html> <head><meta charset"utf-8" /><meta name"viewport" content"widthdevice-width, initial-scale1,maximum-scale1,user-scalableno"><title></title><link rel…

北京车展创新纷呈,移远通信网联赋能

时隔四年&#xff0c;备受瞩目的2024&#xff08;第十八届&#xff09;北京国际汽车展览会于4月25日盛大开幕。在这场汽车行业盛会上&#xff0c;各大主流车企竞相炫技&#xff0c;众多全球首发车、概念车、新能源车在这里汇聚&#xff0c;深刻揭示了汽车产业的最新成果和发展潮…

神经网络的激活函数

目录 神经网络 激活函数 sigmoid 激活函数 tanh 激活函数 backward方法 relu 激活函数 softmax 激活函数 神经网络 人工神经网络&#xff08; Artificial Neural Network&#xff0c; 简写为ANN&#xff09;也简称为神经网络&#xff08;NN&#xff09;&#xff0c…