ThreadPoolExecutor源码阅读流程图

1.创建线程池

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}

10–核心线程数
20–最大线程数
1 TimeUnit.MINUTES 非核心线程数存活时间 1分钟
new ArrayBlockingQueue(100) 阻塞队列类型 数组类型传入队列长度,需要无限长度可以使用链表类型LinkedBlockingDeque
线程工厂ThreadFactory和超出队列的策略ThreadPoolExecutor.AbortPolicy(抛出异常)暂时先按默认来。

ThreadPoolExecutor executor = new ThreadPoolExecutor
(10,20,1, TimeUnit.MINUTES,new ArrayBlockingQueue<Runnable>(100));

创建线程池的时候是不会启动线程的,需要在执行具体业务逻辑时候才会执行

2.ThreadPoolExecutor重要参数及方法介绍

//ctl Int原子操作类,32位,前三位代表线程池状态,后28位记录线程数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;

//线程池5种状态
//RUNNING状态【可提交新任务】和【可执行阻塞队列中的任务】
private static final int RUNNING    = -1 << COUNT_BITS;
//SHUTDOWN状态【不可提交新任务】提交新任务会抛出异常和【可执行阻塞队列中的任务】
private static final int SHUTDOWN   =  0 << COUNT_BITS;//执行shutDown()方法
//STOP状态【不可提交新任务】和【不可执行阻塞队列中的任务】
private static final int STOP       =  1 << COUNT_BITS;//执行shutDownNow()方法
//TIDYING状态 所有任务都终止了,线程池中也没有线程了,这样线程池的状态就会转为TIDYING,一旦达到此状态,就会调用线程池的terminated()
private static final int TIDYING    =  2 << COUNT_BITS;
//TERMINATED状态 terminated()执行完之后就会转变为TERMINATED
private static final int TERMINATED =  3 << COUNT_BITS;

//获取线程池状态    
private static int runStateOf(int c)     { return c & ~COUNT_MASK; }
//获取当前工作线程数
private static int workerCountOf(int c)  { return c & COUNT_MASK; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

2.1线程的五种状态

  • RUNNING状态【可提交新任务】和【可执行阻塞队列中的任务】 11100000 00000000 00000000 00000000
  • SHUTDOWN状态【不可提交新任务】提交新任务会抛出异常和【可执行阻塞队列中的任务】
    00000000 00000000 00000000 00000000
  • STOP状态【不可提交新任务】和【不可执行阻塞队列中的任务】 00100000 00000000 00000000 00000000
  • TIDYING状态 所有任务都终止了,线程池中也没有线程了,这样线程池的状态就会转为TIDYING,一旦达到此状态,就会调用线程池的terminated()
    00100000 00000000 00000000 00000000
  • TERMINATED状态 terminated()执行完之后就会转变为TERMINATED 01100000 00000000 00000000 00000000
private static boolean runStateLessThan(int c, int s) {
    return c < s;
}

private static boolean runStateAtLeast(int c, int s) {
    return c >= s;
}

private static boolean isRunning(int c) {
    return c < SHUTDOWN;
}

/**
 * Attempts to CAS-increment the workerCount field of ctl.
 * 通过CAS来对当前工作线程数增加
 */
private boolean compareAndIncrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect + 1);
}

/**
 * Attempts to CAS-decrement the workerCount field of ctl.
 * 通过CAS来对当前工作线程数减少
 */
private boolean compareAndDecrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect - 1);
}

任务执行流程图

在这里插入图片描述

3.提交任务execute

executor.execute(new Runnable() {
    @Override
    public void run() {
        //业务代码
    }
});
   public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        //获取ctl 初始值为ctlOf(RUNNING, 0) 运行状态,工作线程数0
        int c = ctl.get();
        //计算获取工作线程数<核心线程数
        if (workerCountOf(c) < corePoolSize) {
        	//当前command增加为核心工作线程,添加失败下面会进行入队操作
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //判断线程池状态(判断是因为防止别的线程把状态进行修改)
        //workQueue.offer(command) 加入队列
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            //再对线程池状态二次检查,如果不是running则移除队列
            if (! isRunning(recheck) && remove(command))
            	//拒绝策略,默认抛出异常
                reject(command);
            else if (workerCountOf(recheck) == 0)
            	//这里就是执行队列中的任务,下面addWorker里面有体现和讲解
                addWorker(null, false);
        }
        //线程池达到最大了的maxPool,添加失败执行拒绝策略
        else if (!addWorker(command, false))
            reject(command);
    }

3.1 submit

Future<?> submit = executor.submit(new Runnable() {
    @Override
    public void run() {
        //业务代码
    }
});

这个里面执行了execute,多了一个返回Future

    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

4.addWorker

这里不同版本jdk有差异

private boolean addWorker(Runnable firstTask, boolean core) {
    //类似于goto
    retry:
    for (int c = ctl.get();;) {
        // 线程池状态>=SHUTDOWN 并且 线程池状态>=STOP或者传入的任务!=null或者阻塞队列为空则返回
        if (runStateAtLeast(c, SHUTDOWN)
            && (runStateAtLeast(c, STOP)
                || firstTask != null
                || workQueue.isEmpty()))
            return false;

        for (;;) {
        	//
        	//判断工作的线程是否超过核心线程数或者最大线程数,addWork时候会传入core
            if (workerCountOf(c)
                >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                return false;
                //如果没有超过核心线程数或者最大线程数,这里通过cas对工作线程数量增加,多个竞争失败的话循环cas操作
            if (compareAndIncrementWorkerCount(c))
                break retry;//跳出外层循环
            c = ctl.get();  // Re-read ctl
            //如果线程池状态>=SHUTDOWN 跳到外层循环继续执行
            if (runStateAtLeast(c, SHUTDOWN))
                continue retry;
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
    	//新建任务Worker,会利用线程工厂去创建一个线程默认的是
    	/**
    	  *  Worker(Runnable firstTask) {
    	  *  //这个状态有0 -1 1 创建时候为-1,运行时候改为1,运行结束改为0
    	  *  // 正常应该是acquire时候+1  release时候-1 这里重写过方法
          *  setState(-1); 
          *  this.firstTask = firstTask;
          *  this.thread = getThreadFactory().newThread(this); }
    	**/
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
        	//这里的mainLock是对Workers进行操作的,防止出现并发问题
        	//用锁是因为private final HashSet<Worker> workers = new HashSet<>(); 这个不是线程安全的
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                int c = ctl.get();
				//线程池如果是RUNNING状态
				// 或者状态<STOP并且传入的任务为空 这个是从阻塞队列里面拿任务执行
                if (isRunning(c) ||
                    (runStateLessThan(c, STOP) && firstTask == null)) {
                    if (t.isAlive()) // 如果线程已经在运行,就抛出异常
                        throw new IllegalThreadStateException();
                    //添加任务到工作线程的容器里
                    workers.add(w);
                    int s = workers.size();
                    //largestPoolSize 这个是记录工作线程数,没看到具体作用,但既然有肯定是有用的
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            //这里才到线程运行
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
    	//这里类似于一个回滚操作,异常情况会对worker进行移除,修改ctl
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

5.Worker相关

Worker类

5.1 构造器

   Worker(Runnable firstTask) {
    //这个状态有0 -1 1 创建时候为-1,运行时候改为1,运行结束改为0
   // 正常应该是acquire时候+1  release时候-1 这里重写过方法
   setState(-1); 
   this.firstTask = firstTask;
   this.thread = getThreadFactory().newThread(this); 
  }

5.2 tryAcquire和tryRelease

重写过从+1,-1变成cas为1和设置为0,0代表执行完任务空闲,1代表在执行任务,里面有个

protected boolean tryAcquire(int unused) {
    if (compareAndSetState(0, 1)) {
        setExclusiveOwnerThread(Thread.currentThread());
        return true;
    }
    return false;
}
protected boolean tryRelease(int unused) {
    setExclusiveOwnerThread(null);
    setState(0);
    return true;
}

5.3 runWork

public void run() {runWorker(this);}
    
    final void runWorker(Worker w) {
    	//获取当前工作线程
        Thread wt = Thread.currentThread();
        //获取需要执行的任务
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); 
        boolean completedAbruptly = true;
        try {
        	//执行任务不为空 或者 队列中获取到了需要执行的任务
        	//如果没有获取到getTask是会阻塞的
            while (task != null || (task = getTask()) != null) {
                w.lock();
				//如果线程池状态>=STOP 并且当前线程没有被打断
				//线程池被打断并且线程池状态>=STOP 并且当前线程没有被打断
				//这里是对线程池状态作验证,如果状态发生了变更则要去尝试中断线程
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                //执行前切面 可以用来记录工作中线程和计算空闲线程,Tomcat线程池有这个行为
                    beforeExecute(wt, task);
                    try {
                        task.run();
                        //执行后或异常切面
                        afterExecute(task, null);
                    } catch (Throwable ex) {
                        afterExecute(task, ex);
                        throw ex;
                    }
                } finally {
                    task = null;
                    //执行任务数
                    w.completedTasks++;
                    w.unlock();
                }
            }
            //正常执行才会为false,表示正常退出
            completedAbruptly = false;
        } finally {
        	//执行失败completedAbruptly为true
            processWorkerExit(w, completedAbruptly);
        }
    }    

5.4 getTask()

private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
        for (;;) {
            int c = ctl.get();
            // 线程池状态不为RUNNING,队列为空就不需要处理任务了,直接返回空,上层runWorker也会正常退出循环
            if (runStateAtLeast(c, SHUTDOWN)
                && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
            //工作中的线程数量
            int wc = workerCountOf(c);
            // 核心线程是否超时回收标志,可以通过executor.allowCoreThreadTimeOut(true);设置
            //工作线程数量>核心线程数量
            //用来判断是否是无限阻塞
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            //大于最大线程数或者超时 并且 工作线程数量>1或者队列为空 则ctl减少
   		    // && (wc > 1 || workQueue.isEmpty()) 这个判断就是要留下至少一个线程去处理队列中的任务
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
            try {
            //超时阻塞和无限阻塞
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                //超时,循环时会去处理返回null
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

6.shutdown()

shutdown会把线程池状态修改为SHUTDOWN,提交新任务会抛出异常,但会继续执行队列中的任务。

  public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        //修改状态为SHOUTDOWN,并修改ctl
        advanceRunState(SHUTDOWN);
        //这里会中断工作中的线程
        interruptIdleWorkers();
        onShutdown(); // 空方法
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}
//中间还有个方法,传入的onlyOne为false
private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
    	//遍历Workers,遍历前加锁
        for (Worker w : workers) {
            Thread t = w.thread;
            //把没有被打断并且没有在工作中的线程打断
            //获取到锁说明线程是空闲的,没有获取到锁说明在执行任务
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

7.shutdownNow()

shutdownNow会把线程池状态修改为STOP,提交新任务会抛出异常,也不执行队列中的任务。但会返回队列中的任务。

List<Runnable> runnables = executor.shutdownNow();
public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
		//修改状态为STOP,并修改ctl
        advanceRunState(STOP);
        //中断线程
        interruptWorkers();
        //返回队列中的任务
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    //最后一个线程结束时候会把线程池状态改为TERMINATED
    tryTerminate();
    return tasks;
}
private void interruptWorkers() {
    //中断所有工作线程
    for (Worker w : workers)
        w.interruptIfStarted();
}
      void interruptIfStarted() {
        Thread t;
        //getState() >= 0 代表空闲线程和正常执行中的线程,不为空并且没有被打断的就执行打断
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/13936.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

巧用千寻位置GNSS软件| 线路施工放样应用技巧

线路施工放样主要是解决线路工程和水利工程施工中&#xff0c;线路及渠道中线和边坡施工放样编辑的专用程序。千寻位置GNSS软件中完成线路施工放样可按照下述步骤操作。 点击【测量】->【线路施工放样】&#xff0c;选择一条线路放样&#xff0c;如图 5.6-1所示。 图 5.6-1…

IT人员选择光缆的五大原因

基于铜和光纤的信号都会受到衰减&#xff0c;或者波形信号随着距离的推移而减弱。然而&#xff0c;光纤电缆可以在更长的距离上传输数据。事实上&#xff0c;差异是巨大的。 当构建需要长距离、高速和/或高带宽连接的网络时&#xff0c;毫无疑问&#xff1a;光纤电缆会赢得胜利…

在更高的起点创业 专访Aqara重庆服务商,探问「经营秘籍」

从小众产品到大众选择&#xff0c;智能家居在短短几年内迅速崛起&#xff0c;成为各大Shopping Mall的引流神器。而作为一种新消费&#xff0c;智能家居产品也为品牌和渠道在获客方面提出了新的考验。相比传统建材&#xff0c;智能家居如何快速引流&#xff0c;促进成交&#x…

【MySQL】如何使用MySQL锁(全局锁、表级锁、行级锁)?

文章目录 概述一、全局锁介绍语法特点 二、表级锁介绍表锁元数据锁意向锁 三、行级锁介绍行锁间隙锁&临键锁 概述 锁是计算机协调多个进程或线程并发访问某一资源的机制。在数据库中&#xff0c;除传统的计算资源&#xff08;CPU、RAM、I/O&#xff09;的争用以外&#xf…

怎么把avi文件转换成mp4视频格式,4个高能方法

怎么把avi文件转换成mp4视频格式&#xff1f; 当您下载到avi格式的视频文件时&#xff0c;您可能会选择将其转换为MP4格式的文件。 avi是一种由微软开发的多媒体容器格式&#xff0c;尽管现在已经被认为是老旧的技术&#xff0c;但由于其简单易懂的开发API和Windows的通用性&am…

状态机编程

//定义的枚举 typedef enum { KEY_UP 1, //按键按下 Edge_Lead2, //前沿抖动 KEY_DOWN 3, //按键松开 Edge_Back4, //后沿抖动 } KEY_Status; 主函数&#xff1a; #include "stm32f4xx.h" #include "led.h" #include "delay.h" #include "…

JavaScript—javaEE

文章目录 1.关于JavaScript2.引入的方式3.输入输出4.语法4.1变量4.2基本数据类型4.3运算符4.4数组4.5函数4.6对象 5.dom5.1获取元素5.2操作元素5.3表单控件5.4样式&#xff1a;style属性5.5模仿和服务端交互 6.ajax6.1概念6.2作用6.3Ajax代码6.4Ajax发get请求6.5Ajax发post请求…

学成在线笔记+踩坑(4)——【媒资管理模块】上传图片,Nacos+Gateway+MinIO

导航&#xff1a; 【黑马Java笔记踩坑汇总】JavaSEJavaWebSSMSpringBoot瑞吉外卖SpringCloud黑马旅游谷粒商城学成在线牛客面试题 目录 1. 媒资管理模块简介 1.1 模块介绍 1.2 业务流程 1.2.1 上传课程图片 1.2.2 上传视频 1.2.3 处理视频 1.2.4 审核媒资 1.2.5 绑定媒…

Redis原理

Redis原理 数据结构 动态字符串SDS Redis中key是字符串&#xff0c;value是字符串或字符串集合。不过redis没有直接使用C语言的字符串。因为C中字符串存在问题&#xff1a;①获取字符串长度需要运算②非二进制安全③不可修改。 //c语言&#xff0c;声明字符串&#xff1a; …

字节岗位薪酬体系曝光,看完感叹:不服真不行

曾经的互联网是PC的时代&#xff0c;随着智能手机的普及&#xff0c;移动互联网开始飞速崛起。而字节跳动抓住了这波机遇&#xff0c;2015年&#xff0c;字节跳动全面加码短视频&#xff0c;从那以后&#xff0c;抖音成为了字节跳动用户、收入和估值的最大增长引擎。 自从字节…

媒体宣传的优势与重要性

传媒如春雨&#xff0c;润物细无声&#xff0c;大家好&#xff0c;我是51媒体网胡老师。 媒体宣传日益成为企业和品牌宣传推广的重要手段&#xff0c;媒体的宣传报道更有权威性&#xff0c;能够帮助品牌进行背书&#xff0c;更有权威性&#xff0c;另外媒体的报道在搜索引擎中…

智能文案改写工具-智能改写工具免费

智能写作机器人 智能写作机器人&#xff0c;这是一种让人类写作变得更加简单的创新技术。它的出现&#xff0c;为内容生产领域带来了巨大的进步&#xff0c;不仅提高了人们的写作效率&#xff0c;还让优质的内容更容易被产生和共享。现在&#xff0c;让我们来了解一下智能写作…

Windows环境下NVM安装后Node/NPM命令无法使用

问题&#xff1a;Windows环境下安装nvm后&#xff0c;使用nvm安装node&#xff0c;无法使用node相关命令。 解决方案&#xff1a;注意安装的时候有两个路径&#xff0c;第一个是nvm所在的路径&#xff0c;第二个是nodejs所在的路径&#xff0c;大家需要在对应的目录下找到路径…

Python爬虫实战——获取电影影评

Python爬虫实战——获取电影影评 前言第三方库的安装示例代码效果演示结尾 前言 使用Python爬取指定电影的影评&#xff0c; 注意&#xff1a;本文仅用于学习交流&#xff0c;禁止用于盈利或侵权行为。 操作系统&#xff1a;windows10 家庭版 开发环境&#xff1a;Pycharm Co…

Linux嵌入式uboot使用tftp网络启动加载zImage、设备树

文章目录 一、前言二、Linux U-boot 相关命令&#xff08;1&#xff09;help 命令&#xff08;2&#xff09;printenv 命令&#xff08;3&#xff09;setenv 函数&#xff08;4&#xff09;saveenv 函数 三、tftp启动linux内核步骤&#xff08;1&#xff09;进入u-boot模式&…

vue:生成二维码 qrcode、vue-qr(二维码中间可带logo)

一、方法一 qrcode qrcode - npm 1.1、安装 yarn add qrcode 1.2、页面引入 import QRCode from qrcode; 1.3、方法里边使用 getQRCodeUrl(){ QRCode.toDataURL(hello world,{color: {dark:"#010599FF",light:"#FFBF60FF"}}).then((url) > {// 获…

基于Html+Css的图片展示25

准备项目 项目开发工具 Visual Studio Code 1.44.2 版本: 1.44.2 提交: ff915844119ce9485abfe8aa9076ec76b5300ddd 日期: 2020-04-16T16:36:23.138Z Electron: 7.1.11 Chrome: 78.0.3904.130 Node.js: 12.8.1 V8: 7.8.279.23-electron.0 OS: Windows_NT x64 10.0.19044 项目…

Linux+云服务器

目录 前言 一、Linux介绍 二、Linux 环境搭建 2.1 云服务器 2.2 XShell 终端 三、Linux 常用命令 3.1操作目录的命令 3.1.1 ls 【list的缩写】 双击某个目录 3.1.2 pwd 【print working directory的缩写】打印当前所处地址 3.1.3 cd 【change directory的缩写】切…

yolov5训练自己的目标检测模型

yolov5训练自己的目标检测模型 1.克隆项目并配置环境 1.1克隆项目 进入GitHub下载yolov5源码 点此进入 选择分支v5.0&#xff0c;并下载源码 anaconda激活相应环境 activate pytorch进入项目存放的地址 E: cd yolov5-master1.2 yolov5项目结构 ├── data&#xff1a;主…

Java版本工程管理系统软件源码 自主研发,工程行业适用

Java版工程项目管理系统 Spring CloudSpring BootMybatisVueElementUI前后端分离 功能清单如下&#xff1a; 首页 工作台&#xff1a;待办工作、消息通知、预警信息&#xff0c;点击可进入相应的列表 项目进度图表&#xff1a;选择&#xff08;总体或单个&#xff09;项目显示…