线程池MyThreadPoolExecutor
的执行流程和工作线程的执行逻辑如下:
线程池执行流程:
-
初始化:在创建
MyThreadPoolExecutor
实例时,会设置核心线程数、最大线程数、空闲存活时间以及工作队列。 -
提交任务:通过
execute(Runnable command)
方法提交一个任务到线程池。 -
任务判断:线程池会检查当前状态和任务的有效性。
-
核心线程判断:如果工作线程数量小于核心线程数,尝试创建一个新线程来执行任务。
-
工作队列判断:如果工作线程数量已达到核心线程数,尝试将任务放入工作队列。
-
最大线程判断:如果工作队列已满或无法添加任务,且工作线程数量小于最大线程数,创建一个新线程来执行任务。
-
拒绝策略:如果任务既不能放入工作队列,又不能创建新线程,则线程池会采取拒绝策略,通常会抛出一个异常。
工作线程执行逻辑:
-
工作线程创建:通过
addWorker(Runnable command, boolean core)
方法创建一个新的工作线程。 -
启动线程:新创建的工作线程启动,开始执行
Worker
类的run
方法。 -
获取任务:工作线程通过
runWorkers(Worker worker)
方法尝试获取任务。如果是核心线程,它会无限期地等待任务;如果是非核心线程,则会有一个超时时间。 -
执行任务:一旦获取任务,工作线程执行任务的
run
方法。 -
任务完成:任务执行完成后,工作线程记录已完成的任务数。
-
继续工作:工作线程尝试获取下一个任务,继续执行,直到没有任务可执行。
-
线程移除:当工作线程没有任务可执行时,它会尝试从工作线程集合中移除自己。
-
线程终止:如果线程池处于停止状态,工作线程将终止执行。
package com.threadpool;
import com.sun.media.sound.SF2InstrumentRegion;
import javafx.concurrent.Worker;
import java.util.HashSet;
import java.util.Scanner;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class MyThreadPoolExecutor {
private int corePoolCount;
private int maxPoolCount;
private int keepLiveTime;
BlockingDeque<Runnable> workQueue;
public MyThreadPoolExecutor(int corePoolCount, int maxPoolCount, int keepLiveTime, BlockingDeque<Runnable> workQueue) {
this.corePoolCount = corePoolCount;
this.maxPoolCount = maxPoolCount;
this.keepLiveTime = keepLiveTime;
this.workQueue = workQueue;
}
//当前线程的状态
private AtomicInteger status = new AtomicInteger();
private AtomicInteger workCount = new AtomicInteger();//工作线程的数量 都是线程安全的
private HashSet<Worker> workers = new HashSet<>();//工作线程
private final Lock lock = new ReentrantLock();//hashset不安全辅助锁
private final static Integer RUNNING = 0;
private final static Integer STOP = 1;
//当前完成的总任务数
private int completedTaskCount = 0;
public void execute(Runnable command) {
if (command == null) throw new NullPointerException();
if (status.get() == STOP) throw new RuntimeException();
if (status.get() == RUNNING) {
if (workCount.get() < corePoolCount) {
if (addWorker(command, true)) {//增加核心线程数
return;
}
}
if (workQueue.offer(command)) {//只是放到任务队列
return;
}
if (workCount.get() < maxPoolCount) {
if (addWorker(command, false)) {//创建新的线程
return;
}
}
throw new RuntimeException("拒绝策略");
}
}
private boolean addWorker(Runnable command, boolean core) {//创建新的线程加入工作者队列
if (status.get() == STOP) return false;
retry:
while (true) {
if (status.get() == STOP) return false;
while (true) {
if (workCount.get() >= (core ? corePoolCount : maxPoolCount)) {
return false;//可能其他线程也创建了
}
//创建工作线程
if (!casAddWorkerCount()) {//自增失败
continue retry;
}
break retry;
}
}
Worker worker = null;
try {
lock.lock();
worker = new Worker(command);
final Thread thread = worker.thread;
if (thread != null) {
if (thread.isAlive()) throw new IllegalArgumentException();
}
thread.start();
workers.add(worker);//向hashset中添加
} finally {
lock.unlock();
}
return true;
}
public void shutdown(){
lock.lock();
try {
setState(STOP);
interruptIdleWorkers();
}finally {
lock.unlock();
}
}
private void setState(Integer stop){
if (status.get()==stop) return;
while (true){
if (status.get() == stop){
break;
}
if (status.compareAndSet(status.get(),stop)){
break;
}
}
}
private void interruptIdleWorkers(){
lock.lock();
try {
for (Worker worker : workers) {
if (!worker.thread.isInterrupted()){
worker.thread.interrupt();
}
this.completedTaskCount += worker.completedTasks;
}
}finally {
lock.unlock();
}
}
private void runWorkers(Worker worker) {
if (worker == null) throw new NullPointerException();
try {
Runnable task = worker.firstTask;
Thread wt = worker.thread;
worker.firstTask = null;
while (task != null || (task = getTask()) != null) {//不断的取任务
if (wt.isInterrupted()) {
System.out.println("this thread is interrupted");
}
if (status.get() == STOP) {
System.out.println("this threadPool has already stopped");
}
task.run();//调用方法
task = null;
worker.completedTasks++;
}
}catch (Exception e){
e.printStackTrace();
} finally {
//没任务了
while (true) {
if (casDelWorkerCount()) {
completedTaskCount += worker.completedTasks;
break;
} else {
continue;
}
}
lock.lock();
try {
workQueue.remove(worker);
} finally {
lock.unlock();
}
}
}
private Runnable getTask() {
boolean timeout = false;
Runnable task = null;
try {
while (true) {
if (timeout){
return null;
}
if (status.get() == STOP) throw new NullPointerException();
//常驻工作队列
if (workCount.get() <= corePoolCount){
task = workQueue.take();//如果workQueue是空 那么会阻塞在这里 直到不为空 “保活”线程
}else{
task = workQueue.poll(keepLiveTime, TimeUnit.SECONDS);//如果在一定时间内拿不到直接返回null
}
if (task != null){
return task;
}
timeout = true;
}
}catch (InterruptedException exception){
exception.printStackTrace();
return null;
}
}
private boolean casAddWorkerCount() {
workCount.compareAndSet(workCount.get(), workCount.get() + 1);
return true;
}
private boolean casDelWorkerCount() {
workCount.compareAndSet(workCount.get(), workCount.get() - 1);
return true;
}
private final class Worker implements Runnable {//工作者线程
final Thread thread;
Runnable firstTask;
volatile int completedTasks;
private Worker(Runnable firstTask) {
this.firstTask = firstTask;
this.thread = new Thread(this);
this.completedTasks = 0;
}
@Override
public void run() {
runWorkers(this);//工作
}
}
}
关键逻辑点:
- 线程安全:使用
AtomicInteger
和ReentrantLock
来保证线程池中对工作线程数量和状态的修改是线程安全的。 - 任务队列:使用
BlockingDeque
作为任务队列,可以阻塞或超时地获取任务。 - 工作线程的管理:通过
HashSet
和Lock
来管理工作线程,保证在添加和移除工作线程时的线程安全。 - 状态管理:使用
AtomicInteger status
来跟踪线程池的状态,决定是否接受新任务。
注意:
- 代码中
casAddWorkerCount()
和casDelWorkerCount()
方法的实现似乎有误,因为它们没有使用compareAndSet()
方法,这可能导致工作线程数量的不一致。 shutdown()
方法中,应该在中断空闲线程后,也要尝试中断正在执行任务的工作线程,以确保线程池能够顺利关闭。- 代码中的
setState(Integer stop)
方法中,状态设置的逻辑可能存在竞态条件,因为它没有考虑状态在设置过程中可能被其他线程改变的情况。
在实际使用中,需要对这些潜在的问题进行修正和优化。