【一文详解】Java多线程和并发知识点详细总结【万字总结】

Java并发编程

并发编程的三个特性

原子性

一次操作或者多次操作,要么所有的操作全部都得到执行并且不会受到任何因素的干扰而中断,要么都不执行。

在 Java 中,可以借助synchronized、各种 Lock 以及各种原子类实现原子性。

synchronized 和各种 Lock 可以保证任一时刻只有一个线程访问该代码块,因此可以保障原子性。各种原子类是利用 CAS (compare and swap) 操作(可能也会用到 volatile或者final关键字)来保证原子操作。

可见性

当一个线程对共享变量进行了修改,那么另外的线程都是立即可以看到修改后的最新值。

在 Java 中,可以借助synchronizedvolatile 以及各种 Lock 实现可见性。

如果我们将变量声明为 volatile ,这就指示 JVM,这个变量是共享且不稳定的,每次使用它都到主存中进行读取。

有序性

由于指令重排序问题,代码的执行顺序未必就是编写代码时候的顺序。

我们上面讲重排序的时候也提到过:

指令重排序可以保证串行语义一致,但是没有义务保证多线程间的语义也一致 ,所以在多线程下,指令重排序可能会导致一些问题。

在 Java 中,volatile 关键字可以禁止指令进行重排序优化。

进程和线程

简介

进程是what?

进程就是应⽤程序在内存中分配的空间,也就是正在运⾏的程序。

各个进程之间互 不⼲扰,同时进程保存着程序每⼀个时刻运⾏的状态。

为什么需要进程?

⼈们对于计算机的性能要求越来越⾼,⽽批处理操作系统(把⼀系列需要操作的指令写下来,形成⼀个清单,⼀次性 交给计算机)的瓶颈在于内存中只存在⼀个程序,那么内存中能不能存在 多个程序呢?于是,提出了进程的概念,使⽤进程+CPU时间⽚轮转⽅式的操作系统,在宏观上看起来同⼀时间段执⾏多个 任务,换句话说,进程让操作系统的并发成为了可能。同时,进程并发从宏观上看有多个 任务在执⾏,但在事实上,对于单核CPU来说,任意具体时刻都只有⼀个任务在占 ⽤CPU资源。

为什么需要线程?

虽然进程的出现,使得操作系统的性能⼤⼤提升,但是随着时间的推移,⼈们并不 满⾜⼀个进程在⼀段时间只能做⼀件事情,如果⼀个进程有多个⼦任务时,只能逐 个得执⾏这些⼦任务,很影响效率。

比如: 当使⽤杀毒软件中的扫描病毒功能时,在扫 描病毒结束之前,⽆法使⽤杀毒软件中清理垃圾功能,这⽆法满⾜⼈ 们的要求。

那么能不能让这些进程的⼦任务同时执⾏呢?于是⼈们⼜提出了线程的概念,让⼀个线程 执⾏⼀个⼦任务,这样⼀个进程就包含了多个线程,每个线程负责⼀个单独的⼦任 务。

使⽤线程之后,事情就变得简单多了。

当⽤户使⽤扫描病毒功能时,就让扫 描病毒这个线程去执⾏。同时,如果⽤户⼜使⽤清理垃圾功能,那么可以先 暂停扫描病毒线程,先响应⽤户的清理垃圾的操作,让清理垃圾这个线程去 执⾏。响应完后再切换回来,接着执⾏扫描病毒线程。

进程和线程的提出极⼤的提⾼了操作提供的性能。

进程让操作系统的并发性 成为了可能,⽽线程让进程的内部并发成为了可能。

多进程的⽅式也可以实现并发,为什么我们要使⽤多线程?

  • 进程间的通信⽐较复杂,⽽线程间的通信⽐较简单,通常情况下,我们需要使 ⽤共享资源,这些资源在线程间的通信⽐较容易。
  • 进程是重量级的,⽽线程是轻量级的,故多线程⽅式的系统开销更⼩。

多线程?

CPU通过为每个线程分配CPU时间⽚来实现多线程机制。

CPU通过时间⽚分配算 法来循环执⾏任务,当前任务执⾏⼀个时间⽚后会切换到下⼀个任务。 但是,在切换前会保存上⼀个任务的状态,以便下次切换回这个任务时,可以再加 载这个任务的状态。所以任务从保存到再加载的过程就是⼀次上下⽂切换。

上下⽂切换通常是计算密集型的,意味着此操作会消耗⼤量的 CPU 时间,故线程 也不是越多越好。

如何减少系统中上下⽂切换次数,是提升多线程性能的⼀个重点 课题。

进程和线程的区别?

  • 进程是⼀个独⽴的运⾏环境,⽽线程是在进程中执⾏的⼀个任务。
  • 进程是操作系统进⾏资源分配的基本单位,⽽线程是操作系 统进⾏调度的基本单位,即CPU分配时间的单位 。
  • 进程单独占有⼀定的内存地址空间,所以进程间存在内存隔离,数据是分开 的,数据共享复杂但是同步简单,各个进程之间互不⼲扰;⽽线程共享所属进 程占有的内存地址空间和资源,数据共享简单,但是同步复杂。
  • 可靠性:进程单独占有⼀定的内存地址空间,⼀个进程出现问题不会影响其他进程,不 影响主程序的稳定性,可靠性⾼;⼀个线程崩溃可能影响整个程序的稳定性, 可靠性较低。
  • 进程单独占有⼀定的内存地址空间,进程的创建和销毁不仅需要保存寄存器和 栈信息,还需要资源的分配回收以及⻚调度,开销较⼤;线程只需要保存寄存 器和栈信息,开销较⼩。

线程的优先级

Java中线程优先级可以指定,范围是1~10。

但是并不是所有的操作系统都⽀持10 级优先级的划分(⽐如有些操作系统只⽀持3级划分:低,中,⾼),Java只是给 操作系统⼀个优先级的参考值,线程最终在操作系统的优先级是多少还是由操作系 统决定。

Java默认的线程优先级为5,线程的执⾏顺序由调度程序来决定,线程的优先级会 在线程被调⽤之前设定。

通常情况下,⾼优先级的线程将会⽐低优先级的线程有更⾼的⼏率得到执⾏,而在 优先级相同的情况下,按照“先到先得”的原则。

//使⽤⽅法 Thread 类的 setPriority() 实例⽅法来设定线程的优先级
public class Demo {
    public static void main(String[] args) {
        Thread a = new Thread();
        System.out.println("我是默认线程优先级:"+a.getPriority());
        Thread b = new Thread();
        b.setPriority(10);
        System.out.println("我是设置过的线程优先级:"+b.getPriority());
    }
}

//输出:
我是默认线程优先级:5
我是设置过的线程优先级:10

既然有1-10的级别来设定了线程的优先级,可以在业务实现的时候,采⽤这种⽅法来指定⼀些线程执⾏的先后顺序吗?

答: Java中的优先级来说不是特别的可靠,Java程序中对线程所设置的优先级只是给 操作系统⼀个建议,操作系统不⼀定会采纳。⽽真正的调⽤顺序,是由操作系统的 线程调度算法决定的。

当线程和线程组的优先级不⼀致的时候将会怎样呢?

答: 如果某个线程优先级⼤于线程所在线程组的最⼤优先级,那么该线程的优先 级将会失效,取⽽代之的是线程组的最⼤优先级。

线程组

Java中⽤ThreadGroup来表示线程组,我们可以使⽤线程组对线程进⾏批量控制。

ThreadGroup和Thread的关系就如同他们的字⾯意思⼀样简单粗暴,每个Thread必 然存在于⼀个ThreadGroup中,Thread不能独⽴于ThreadGroup存在。ThreadGroup管理着它下⾯的Thread。

执⾏main() ⽅法线程的名字是main,如果在new Thread时没有显式指定,那么默认将⽗线程 (当前执⾏new Thread的线程)线程组设置为⾃⼰的线程组。

如下:

public class Demo {
    public static void main(String[] args) {
        Thread testThread = new Thread(() -> {
            System.out.println("testThread当前线程组名字:" +
                               Thread.currentThread().getThreadGroup().getName());
            System.out.println("testThread线程名字:" +
                               Thread.currentThread().getName());
        });
        testThread.start();
        System.out.println("执⾏main⽅法线程名字:" + Thread.currentThread().getName());
    }
}

//输出结果
执⾏main⽅法线程名字:main
testThread当前线程组名字:main
testThread线程名字:Thread-0

总结来说,线程组是⼀个树状的结构,每个线程组下⾯可以有多个线程或者线程组。

线程组可以起到统⼀控制线程的优先级和检查线程的权限的作⽤。

线程组的常用方法:
  1. 获取当前线程组的名称:

    Thread.currentThread().getThreadGroup().getName();
    
  2. 线程组统一异常处理:

    package com.func.axc.threadgroup;
    
    public class ThreadGroupDemo {
        public static void main(String[] args) {
            //匿名内部类写法
            ThreadGroup threadGroup1 = new ThreadGroup("group1") {
                // 继承ThreadGroup并重新定义以下⽅法
                // 在线程成员抛出unchecked exception
                // 会执⾏此⽅法
                public void uncaughtException(Thread t, Throwable e) {
                    System.out.println(t.getName() + ": " + e.getMessage());
                }
            };
            // 创建线程: (匿名内部类写法)这个线程是threadGroup1的⼀员
            Thread thread1 = new Thread(threadGroup1, new Runnable() {
                public void run() {
                    // 抛出unchecked异常
                    throw new RuntimeException("测试异常");
                }
            });
            thread1.start();
        }
    }
    

默认线程和守护线程

每个Java程序都有⼀个默认的主线程,就是通过JVM启动的第⼀个线程main线程`。

有⼀种线程称为守护线程(Daemon),守护线程默认的优先级⽐较低

注意:

  • 不要忽略每一个Java程序都有一个默认的主线程,这是必备的!!
  • 如果某线程是守护线程,那如果所有的⾮守护线程结束,这个守护线程也会 ⾃动结束。
  • ⼀个线程默认是⾮守护线程,要设置成守护线程(Daemon)可以通过Thread类的setDaemon(boolean on) 来设置。

创建线程的方式

三种:

(1)继承Thread类,重写run方法【run方法无返回值】

(2)实现Runnable接口的run方法【run方法无返回值】

(3)实现Callable接口的call方法【call方法有返回值】

Thread类

Thread 类是 Java 多线程编程中的关键类之一,它用于创建和操作线程。

每个 Java 程序都至少有一个执行线程,即 main 线程,但是通过创建 Thread 对象,可以在程序中启动新线程,使多个任务并发执行。

Thread 类的一些重要方法和特性:

1. 构造函数:

  • Thread() 创建一个新的线程对象。

    这是最简单的构造函数,用于创建一个新的线程对象。通过这个构造函数创建的线程,如果没有显示设置 Runnable,其默认 run 方法是空的,不执行任何操作。如果需要在新线程中执行特定的任务,需要通过其他方式设置任务。

  • Thread(Runnable target) 创建一个带有指定 Runnable 目标的线程对象。

    这个构造函数接受一个 Runnable 接口的实现作为参数,用于创建一个带有指定 Runnable 目标的线程对象。

    通过这个构造函数创建的线程,其 run 方法将调用传入的 Runnable 对象的 run 方法。

    Runnable myRunnable = () -> {
        System.out.println("MyRunnable is running");
    };
    
    Thread thread = new Thread(myRunnable);
    
  • Thread(String name)Thread(Runnable target, String name) 创建一

    这两个构造函数用于创建一个新的线程对象,并指定线程的名称。

    指定名称的目的是为了更好地标识和调试线程。如果不指定名称,系统会自动生成一个默认的名称。

这些构造函数提供了不同的创建线程的方式,可以根据具体的需求选择适合的构造函数。通常来说,如果线程的任务比较简单,可以使用 Thread() 构造函数;如果需要执行特定的任务,可以使用带有 Runnable 的构造函数;如果需要更好地标识和调试线程,可以使用带有名称的构造函数。

2. 启动调度线程:

  • start() 启动调度线程,调用创建的新线程的 run 方法。

注意:

调用start方法和直接调用run方法的区别?

只有调用了start()方法,才会表现出多线程的特性【不同线程的run()方法里面的代码交替执行】。

如果只是调用run()方法,那么代码还是同步执行的,必须等待一个线程的run()方法里面的代码全部执行完毕之后,另外一个线程才可以执行其run()方法里面的代码。

调用start方法后线程就会马上执行?

虚拟机会先为我们创建⼀个线程,然 后等到这个线程第⼀次得到时间⽚时再调⽤run()⽅法。所以线程不一定是马上执行。

可以多次调用start方法?

不可多次调⽤start()⽅法。在第⼀次调⽤start()⽅法后,再次调⽤start() ⽅法会抛出异常。

3. 线程的生命周期:

  • sleep(long millis) Thread类的静态方法,使当前线程休眠指定的毫秒数。

    这⾥需要强调⼀下

    • sleep⽅法是不会释放当前的锁的,⽽wait(Object类的wait方法)⽅法会。这是最 常⻅的⼀个多线程⾯试题。
    • wait可以指定时间,也可以不指定;⽽sleep必须指定时间。
    • wait释放cpu资源,同时释放锁;sleep释放cpu资源,但是不释放锁,所以易死锁。
    • wait必须放在同步块或同步⽅法中,⽽sleep可以放在任意位置。
  • join()join(long millis)一个线程调用另一个线程的 join 方法时,它将等待另一个线程执行完成.

    join() 方法是 Thread 类提供的一个方法,用于让一个线程等待另一个线程执行完成,内部调用的是Object类的wait方法。

    当一个线程调用另一个线程的 join 方法时,它将会被阻塞,直到另一个线程执行完成才会继续执行。join() 方法的重载形式 join(long millis) 允许设置最大等待时间,如果另一个线程在指定的时间内没有执行完成,等待的线程将会被唤醒。

    join()⽅法是Thread类的⼀个实例⽅法

    join()的作⽤是让当前线程陷⼊“等待”状态,等 join的这个线程执⾏完成后,再继续执⾏当前线程。

    有时候,主线程创建并启动了⼦线程,如果⼦线程中需要进⾏⼤量的耗时运算,主 线程往往将早于⼦线程结束之前结束。 如果主线程想等待⼦线程执⾏完毕后,获得⼦线程中的处理完的某个数据,就要⽤ 到join⽅法了。

    下面是使用 join()join(long millis) 的示例:

    public class JoinExample {
    	//client
        public static void main(String[] args) {
            //创建线程,创建线程类示例对象:调用Thread类的构造函数,传入Runnable接口的实现
            Thread thread1 = new Thread(() -> {
                System.out.println("Thread 1 is running");
                try {
                    Thread.sleep(2000); // 模拟线程执行一段时间
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Thread 1 is finished");
            });
    		//创建线程,创建线程类示例对象:调用Thread类的构造函数,传入Runnable接口的实现
            Thread thread2 = new Thread(() -> {
                System.out.println("Thread 2 is running");
                try {
                    Thread.sleep(3000); // 模拟线程执行一段时间
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Thread 2 is finished");
            });
    		//启动调度线程
            thread1.start();
            thread2.start();
    		
            try {
                thread1.join(); // 由于当前执行的是主线程,主线程调用thread1的join方法,那么主线程等待 thread1 执行完成
                System.out.println("Thread 1 has completed");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            try {
                thread2.join(1500); // 主线程等待 thread2 最多 1500 毫秒(1.5秒)
                System.out.println("Thread 2 has completed or timed out");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            System.out.println("Main thread is finished");
        }
    }
    

    在这个例子中,main 方法启动了两个线程 thread1thread2,并分别调用了它们的 join 方法。主线程在调用 thread1.join() 后会等待 thread1 执行完成,然后继续执行。而在调用 thread2.join(1500) 后,主线程最多等待 1500 毫秒,如果 thread2 在这段时间内执行完成,主线程将继续执行,否则将不再等待。

  • yield():Thread类的静态方法.yield在英语⾥有放弃的意思,同样,这⾥的yield()指的是当前线程愿 意让出对当前处理器的占⽤。

    这⾥需要注意的是,就算当前线程调⽤了yield() ⽅法,cpu在调度的时候,也还有可能继续运⾏这个线程的;

    所以,yield 方法的调用并不保证线程切换,它只是一个提示给调度器的建议。

4.当前线程:

currentThread():Thread类的静态⽅法,返回对当前正在执⾏的线程对象的引⽤;

5. 线程状态:

  • getState() 获取新创建线程的状态。
  • isAlive() 判断新创建线程是否处于活动状态。

6. 线程优先级:

  • setPriority(int priority)getPriority() 设置和获取新创建线程的优先级。
创建线程类:继承Thread类,重写run方法
//继承Thread类重写run方法,来创建线程类和线程实例对象
public class Demo {
    public static class MyThread extends Thread {
        @Override
        //run方法无返回值
        public void run() {
            System.out.println("MyThread");
        }
    }
    
    //client
    public static void main(String[] args) {
        Thread myThread = new MyThread();//new
        myThread.start();//启动线程
    }
}

注意:

  • 要调⽤ start() ⽅法后,该线程才算启动(但不一定马上运行)!

  • 我们在程序⾥⾯调⽤了start()⽅法后,虚拟机会先为我们创建⼀个线程,然 后等到这个线程第⼀次得到时间⽚时再调⽤run()⽅法。 不可多次调⽤start()⽅法。在第⼀次调⽤start()⽅法后,再次调⽤start() ⽅法会抛出异常。

Runnable接口

what?

先看⼀下 Runnable 接⼝(JDK 1.8 +):

@FunctionalInterface //函数式接口(只有一个方法)
public interface Runnable {
 //run方法无返回值
 public abstract void run();
}
创建线程类:实现Runnable接口run方法

示例如下:

public class Demo {
    //线程类: 实现Runnable接口的run方法
    public static class MyThread implements Runnable {
        @Override
        public void run() {
            System.out.println("MyThread");
        }
    }
    public static void main(String[] args) {
        //实例化一个线程类,并启动线程调度
        new MyThread().start();
        
        
        
    }
}

同时,由于Runnable接口是一个函数式接口,所以创建线程时可以使⽤Java 8的函数式编 来简化代码,所以上面示例等价于如下:

public class Demo{
    public static void main(String[] args){
        // new Thread(Runnable target):实例化一个线程对象
        // Java 8 函数式编程,可以省略MyThread类
        new Thread(() -> {
            System.out.println("MyThread");
        }).start();
    }
}

继承Thread类和实现Runnable接口的选择

  • 如果使⽤线程时不需要使⽤Thread类的诸多⽅法,显然使⽤Runnable接⼝更 为轻量。
  • 实现接口的方式比继承类的方式更灵活,也能减少程序类之间的耦合度,面向接口编程也是设计模式6大原则的核心。
  • 同时,由于Runnable接口是一个函数式接口,所以创建线程时可以使⽤Java 8的函数式编程来简化代码

所以,我们通常优先使⽤“实现 Runnable 接⼝”这种⽅式来⾃定义线程类。

Callable接口

使⽤ Runnable 和 Thread 类是可以创建⼀个新的线程,但是它们有⼀个弊 端,就是 run ⽅法是没有返回值的。

⽽有时候我们希望开启⼀个线程去执⾏⼀个任 务,并且这个任务执⾏完成后有⼀个返回值。

JDK提供了 Callable 接⼝与 Future 接口为我们解决这个问题


先看一下Callable接口:

@FunctionalInterface //函数式接口
public interface Callable<V> {
	//call方法有返回值,且支持泛型
    V call() throws Exception;
}

Callable 与 Runnable 类似,同样是只有⼀个抽象⽅法的函数式接⼝。不同的 是, Callable 提供的⽅法是有返回值的,⽽且⽀持泛型。

一般如何使用Callable接口?

Callable⼀般是配合线程池⼯具ExecutorService 来使⽤的。我会在后续章节详细解释线程池的使⽤。

这⾥只介绍 ExecutorService 可以使⽤ submit ⽅法来让⼀个 Callable 接⼝执⾏。它会返回 ⼀个 Future(接口) ,后续的程序可以通过这个 Future 的 get ⽅法得到结果。如下示例:

// 线程类:实现Callable接口call方法
class Task implements Callable<Integer>{
    @Override
    public Integer call() throws Exception {
        // 模拟计算需要⼀秒
        Thread.sleep(1000);
        return 2;
    }
    public static void main(String args[]){
        // 实例化线程池对象
        ExecutorService executor = Executors.newCachedThreadPool();
        //实例化线程对象
        Task task = new Task();
        // ExecutorService 可以使⽤ submit ⽅法来让⼀个 实现Callable 接⼝的线程类执⾏
        //使用submit方法,有返回值, 它会返回 ⼀个 Future,后续的程序可以通过这个 Future 的 get ⽅法得到结果
        Future<Integer> result = executor.submit(task);
        //通过submit方法的返回结果的 get ⽅法得到结果
        // 注意调⽤get⽅法会阻塞当前线程,直到得到结果。
        // 所以实际编码中建议使⽤可以设置超时时间的重载get⽅法。
        System.out.println(result.get()); 
    }
}
Future接口

Future 是异步思想的典型运用,主要用在一些需要执行耗时任务的场景,避免程序一直原地等待耗时任务执行完成,执行效率太低。

具体来说是这样的:当执行某一耗时任务时,可以将这个耗时任务交给一个子线程去异步执行,同时我们可以干点其他事情,不用傻傻等待耗时任务执行完成。等事情干完后,我们再通过 Future 类获取到耗时任务的执行结果。这样一来,程序的执行效率就明显提高了。

简单理解就是:我有一个任务,提交给了 Future 来处理。任务执行期间我自己可以去做任何想做的事情。并且,在这期间我还可以取消任务以及获取任务的执行状态。一段时间之后,我就可以 Future 那里直接取出任务执行结果。

在 Java 中,Future 类只是一个泛型接口,位于 java.util.concurrent 包下,其中定义了 5 个方法,主要包括下面这 4 个功能:

  • 取消任务;
  • 判断任务是否被取消;
  • 判断任务是否已经执行完成;
  • 获取任务执行结果。

看一下Future接口:

public abstract interface Future<V> {
    //cancel方法是试图取消⼀个线程的执⾏。
    //注意是试图取消,并不⼀定能取消成功。因为任务可能已完成、已取消、或者⼀些其它因素不能取消,存在取消失败的可能
    //boolean 类型的返回值是“是否取消成功”的意思。参数 paramBoolean 表示是否采⽤中断的⽅式取消线程执⾏。
    public abstract boolean cancel(boolean paramBoolean);
    // 判断任务是否被取消
    public abstract boolean isCancelled();
    // 判断任务是否已经执行完成
    public abstract boolean isDone();
    // 获取任务执行结果
    public abstract V get() throws InterruptedException, ExecutionException;
    // 指定时间内没有返回计算结果就抛出 TimeOutException 异常
    public abstract V get(long paramLong, TimeUnit paramTimeUnit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

注意其中的cancel方法,其是试图取消⼀个线程的执⾏,注意是试图取消,并不⼀定能取消成功。

因为任务可能已完成、已取消、或者⼀些其它因素不能取消,存在取消失败的可能。

因为Future接口有cancel方法,所以为了让任务有能够取消的功能,就使⽤ Callable 接口配合Future接口来代替 Runnable 接口。

如果为了可取消性⽽使⽤ Future 但⼜不提供可⽤的结果,则可以声明 Future 形式类型、并返回 null 作为底层任务的结果。

Future对象可以表示异步计算的结果。通过 submit 方法提交任务后,会得到一个 Future 对象,可以用于获取任务的执行结果。

FutureTask类

FutureTask 类是 实现了 RunnableFuture 接⼝的,但由于RunnableFuture 接⼝同时继承了 Runnable 接⼝ 和 Future 接⼝

//如下:RunnableFuture 接⼝同时继承了 Runnable 接⼝ 和 Future 接⼝:
public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}

所以FutureTask类也相当于实现了Future接口!!是Future接口的实现类。

FutureTask 类有什么⽤?为什么要有⼀个 FutureTask 类?

前⾯说到 了 Future 只是⼀个接⼝,⽽它⾥⾯的 cancel ()get()isDone() 等⽅法要⾃⼰实现 起来都是⾮常复杂的。

所以JDK提供了⼀个 FutureTask 类来供我们使⽤,FutureTask表示一个异步运算的任务。

FutureTask里面可以传入一个Callable的具体实现类,可以对这个异步运算的任务的结果进行等待获取、判断是否已经完成、取消任务等操作。当然,由于FutureTask也是Runnable接口的实现类,所以FutureTask也可以放入线程池中。

具体示例如下:

class Task implements Callable<Integer>{
    @Override
    public Integer call() throws Exception{
        Thread.sleep(1000);
        return 2;
    }
    
    //client
    public static void main(String[] args){
        //线程池实例对象
        ExecutorService executor = Executors.newCachedThreadPool();
        //FutureTask类的实例对象,传入实现Callable接口的实现类的实例对象
        FutureTask<Integer> futureTask = new FutureTask<>(new Task());
        //使用线程池的submit方法,没有返回值,传入FutureTask类的实例对象
        executor.submit(futureTask);
        //通过FutureTask类的实例对象的 get ⽅法得到结果
        System.out.println(futureTask.get()); 
    }
}

注意:

在很多⾼并发的环境下,有可能Callable和FutureTask会创建多次。FutureTask能够在⾼并发环境下确保任务只执⾏⼀次。

FutureTask类可能返回的任务状态有哪些?

/**
 *
 * state可能的状态转变路径如下:
 * NEW -> COMPLETING -> NORMAL
 * NEW -> COMPLETING -> EXCEPTIONAL
 * NEW -> CANCELLED
 * NEW -> INTERRUPTING -> INTERRUPTED
 */
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;

state表示任务的运⾏状态,初始状态为NEW。运⾏状态只会在set、 setException、cancel⽅法中终⽌。COMPLETING、INTERRUPTING是任 务完成后的瞬时状态。

Runnable接口和Callable接口的区别

Runnable接口中的run()方法的返回值是void,它做的事情只是纯粹地去执行run()方法中的代码而已;

Callable接口中的call()方法是有返回值的,是一个泛型,和Future接口、FutureTask类配合可以用来获取任务执行的结果。

这其实是很有用的一个特性,因为多线程相比单线程更难、更复杂的一个重要原因就是因为多线程充满着未知性,我们能做的只是等待这条多线程的任务执行完毕而已。而Callable+Future/FutureTask却可以获取多线程运行的结果,可以在等待时间太长没获取到需要的数据的情况下取消该线程的任务,真的是非常有用。


Java线程的状态及状态转换

操作系统中的线程状态及转换

操作系统中,线程是被视为轻量级进程的,所以操作系统线程的状态其实和操作系统进程的状态是⼀致的

操作系统中进程/线程的状态及状态转换图:

在这里插入图片描述

其中最主要的其实就是三个状态:

  • 就绪状态(ready):线程正在等待使⽤CPU,经调度程序调⽤之后可进⼊ running状态。
  • 执⾏状态(running):线程正在使⽤CPU。
  • 等待状态(waiting): 线程经过等待事件的调⽤或者正在等待其他资源(如 I/O)。

Java中的6个线程状态

源码:

// Thread.State 源码
public enum State {
    NEW,  //新建状态
    RUNNABLE, //可运行状态
    BLOCKED, //阻塞状态
    WAITING, //等待状态
    TIMED_WAITING, //超时等待状态
    TERMINATED;  //终止状态
}
NEW

处于NEW 状态的线程此时尚未启动。

这⾥的尚未启动指的是还没调⽤Thread实例 的start()⽅法,所以创建的线程在未调用start()方法前是处于NEW状态,如下:

private void testStartNew(){
    Thread thread = new Thread(() -> {});
    System.out.println(thread.getState()); //输出NEW
}

注意:

  • 反复调用同一个线程的start()方法是会抛异常的,只能调用一次start()方法。
  • 假如⼀个线程执⾏完毕(此时处于TERMINATED状态),再次调⽤这个线程 的start()⽅法也是不可⾏的,会抛异常。
RUNNABLE

看看Thread源码⾥对RUNNABLE状态的 定义:

/**
 * Thread state for a runnable thread. A thread in the runnable
 * state is executing in the Java virtual machine but it may
 * be waiting for other resources from the operating system
 * such as processor.
 */

所以,Java线程的RUNNABLE状态其实是包括了传统操作系统线程的ready和 running两个状态的。

BLOCKED

阻塞状态。处于BLOCKED状态的线程正等待锁的释放以进⼊同步区。

举例:

假如今天你下班后准备去⻝堂吃饭。你来到⻝堂仅有的⼀个窗⼝,发现前⾯
已经有个⼈在窗⼝前了,此时你必须得等前⾯的⼈从窗⼝离开才⾏。
假设你是线程t2,你前⾯的那个⼈是线程t1。此时t1占有了锁(⻝堂唯⼀的
窗⼝),t2正在等待锁的释放,所以此时t2就处于BLOCKED状态。
WAITING

等待状态。

处于等待状态的线程变成RUNNABLE状态需要其他线程唤醒。

举例:

你等了好⼏分钟现在终于轮到你了,突然你们有⼀个“不懂事”的经理突然来
了。你看到他你就有⼀种不祥的预感,果然,他是来找你的。
他把你拉到⼀旁叫你待会⼉再吃饭,说他下午要去作报告,赶紧来找你了解
⼀下项⽬的情况。你⼼⾥虽然有⼀万个不愿意但是你还是从⻝堂窗⼝⾛开了。

此时,假设你还是线程t2,你的经理是线程t1。虽然你此时都占有锁(窗
⼝)了,“不速之客”来了你还是得释放掉锁。此时你t2的状态就是
WAITING。然后经理t1获得锁,进⼊RUNNABLE状态。
要是经理t1不主动唤醒你t2(notify、notifyAll..),可以说你t2只能⼀直等待了。
TIMED_WAITING

超时等待状态。线程等待⼀个具体的时间,时间到后会被⾃动唤醒。

举例:

到了第⼆天中午,⼜到了饭点,你还是到了窗⼝前。
突然间想起你的同事叫你等他⼀起,他说让你等他⼗分钟他改个bug。
好吧,你说那你就等等吧,你就离开了窗⼝。很快⼗分钟过去了,你⻅他还
没来,你想都等了这么久了还不来,那你还是先去吃饭好了。

这时你还是线程t1,你改bug的同事是线程t2。t2让t1等待了指定时间,t1先
主动释放了锁。此时t1等待期间就属于TIMED_WATING状态。
t1等待10分钟后,就⾃动唤醒,拥有了去争夺锁的资格。
TERMINATED

终⽌状态。此时线程已执⾏完毕。

Java中的线程状态转换

先给出一张Java中的线程状态转换的总结图,如下:

在这里插入图片描述

BLOCKED和RUNNABLE状态的转换

看⼀个例⼦:

@Test
public void blockedTest() {
    //创建线程a
    Thread a = new Thread(new Runnable() {
        @Override
        public void run() {
            testMethod();
        }
    }, "a");
    //创建线程b
    Thread b = new Thread(new Runnable() {
        @Override
        public void run() {
            testMethod();
        }
    }, "b");
    //开始调度线程a
    a.start();
    //开始调度线程b
    b.start();
    //主线程main:
    System.out.println(a.getName() + ":" + a.getState()); // 输出?
    System.out.println(b.getName() + ":" + b.getState()); // 输出?
}


// 同步⽅法争夺锁(synchronized)
private synchronized void testMethod() {
    try {
        Thread.sleep(2000L);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

初看之下,可能会觉得线程a会先调⽤同步⽅法,同步⽅法内⼜调⽤了 Thread.sleep()⽅法,必然会输出TIMED_WAITING,⽽线程b因为等待线程a释放 锁所以必然会输出BLOCKED。

但是其实不一定是,⼀是在测试⽅法blockedTest()内还有⼀个 main线程,⼆是启动线程后执⾏run⽅法还是需要消耗⼀定时间的。

上⾯代码中都应该输出RUNNABLE。【即main线程跑的最快】

测试⽅法的main线程只保证了a,b两个线程调⽤start()⽅法(转化为 RUNNABLE状态)还没等两个线程真正开始争夺锁,就已经打印此时两个 线程的状态(RUNNABLE)了。

要是想要打印出猜想的线程a是TIMED_WAITING,线程b是BLOCKED状态,该怎么处理呢?

其实就处理下测试⽅法⾥的main线程就可以了,让main线程“休息⼀会⼉”,调⽤ Thread.sleep⽅法就⾏。

这⾥需要注意的是main线程休息的时间,要保证在线程争夺锁的时间内,不要等到 前⼀个线程锁都释放了你再去争夺锁,此时就得不到想要的状态了。

所以可以把上⾯的测试⽅法blockedTest()改动⼀下,如下:

@Test
public void blockedTest() {
    //创建线程a
    Thread a = new Thread(new Runnable() {
        @Override
        public void run() {
            testMethod();
        }
    }, "a");
    //创建线程b
    Thread b = new Thread(new Runnable() {
        @Override
        public void run() {
            testMethod();
        }
    }, "b");
    //开始调度线程a
    a.start();
    //main线程休息一下
    Thread.sleep(1000L); 需要注意这⾥main线程休眠了1000毫秒,⽽testMethod()⾥休眠了2000毫秒
    //开始调度线程b
    b.start();
    //主线程main:
    System.out.println(a.getName() + ":" + a.getState()); // 输出?
    System.out.println(b.getName() + ":" + b.getState()); // 输出?
}


// 同步⽅法争夺锁(synchronized)
private synchronized void testMethod() {
    try {
        Thread.sleep(2000L);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

在这个例⼦中,由于main线程休眠,所以线程a的run()⽅法跟着执⾏,线程b再接 着执⾏。 在线程a执⾏run()调⽤testMethod()之后,线程a休眠了2000ms(注意这⾥是没有释放锁的),main线程休眠完毕,此时a还是持有锁,接着b线程执⾏的时候是争夺不到锁的,所以这 ⾥就会输出: 线程a是TIMED_WAITING,线程b是BLOCKED状态了!

WAITING状态与RUNNABLE状态的转换

一般调⽤如下2个⽅法会使线程从RUNNABLE状态进⼊WAITING等待状态:

  • Object.wait():使当前线程处于等待状态直到另⼀个线程唤醒它;

    1. 调⽤wait()⽅法前线程必须持有对象的锁。

    2. 线程调⽤wait()⽅法时,会释放当前的锁,直到有其他线程调⽤。

    3. notify() / notifyAll()⽅法可以唤醒等待锁的线程。

      需要注意的是:

      其他线程调⽤notify()⽅法只会唤醒单个等待锁的线程,如有多个线程都在等待这个锁的话不⼀定会唤醒到所有之前调⽤wait()⽅法的线程;

      而调⽤notifyAll()⽅法唤醒所有等待锁的线程之后,也不⼀定会⻢上把时 间⽚分给刚才放弃锁的那个线程,具体要看系统的调度。

  • Thread.join():等待线程执⾏完毕,底层调⽤的还是Object实例的wait⽅法;

    调⽤join()⽅法不会释放锁,调用方法的线程会⼀直等待另一个线程执⾏完毕(执行完毕后就转换为 TERMINATED状态)。

    举例,把上⾯的例⼦线程启动那⾥改变⼀下:

    @Test
    public void blockedTest() {
        //创建线程a
        Thread a = new Thread(new Runnable() {
            @Override
            public void run() {
                testMethod();
            }
        }, "a");
        //创建线程b
        Thread b = new Thread(new Runnable() {
            @Override
            public void run() {
                testMethod();
            }
        }, "b");
        //开始调度线程a
        a.start();
        //-------------
        //使用了如下方法后,主线程main将会等待线程a,直到线程a执行完毕
        a.join();
        //-------------
        //开始调度线程b
        b.start();
        //主线程main:
        System.out.println(a.getName() + ":" + a.getState()); // 输出TERMINATED
        System.out.println(b.getName() + ":" + b.getState()); // 输出
    }
    
    
    // 同步⽅法争夺锁(synchronized)
    private synchronized void testMethod() {
        try {
            Thread.sleep(2000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    

    要是主线程main没有调⽤线程a的join⽅法,main线程不管a线程是否执⾏完毕都会继续往下⾛。

    a线程启动之后⻢上调⽤了join⽅法,这⾥main线程就会等到a线程执⾏完毕,所以 这⾥a线程打印的状态固定是TERMIATED。

    ⾄于b线程的状态,有可能打印RUNNABLE(尚未进⼊同步⽅法),也有可能打印 TIMED_WAITING(进⼊了同步⽅法)。

TIMED_WAITING与RUNNABLE状态的转换

TIMED_WAITINGWAITING状态类似,只是TIMED_WAITING状态等待的时间是指定的,有限的。


一般调⽤如下三个⽅法会使线程从RUNNABLE状态进⼊TIMED_WAITING超时等待状态:

  • Thread.sleep(long millis):使当前线程睡眠指定时间

    需要注意这⾥的“睡眠”只是暂时使线程停⽌执⾏,并不会释放锁。时间到后,线程会重新进⼊RUNNABLE状态。

  • Object.wait(long timeout):线程休眠指定时间,当然等待期间可以通过 notify() / notifyAll()唤醒;

    wait(long)⽅法使线程进⼊TIMED_WAITING状态。

    这⾥的wait(long)⽅法与 ⽆参⽅法wait()相同的地⽅是,线程都会释放锁,但都可以通过其他线程调⽤notify()或notifyAll() ⽅法来唤醒。

    不同的地⽅是,有参⽅法wait(long)就算其他线程不来唤醒它,经过指定时间 long之后它会⾃动唤醒,拥有去争夺锁的资格。

  • Thread.join(long millis):等待当前线程最多执⾏millis毫秒,如果millis为0,则 会⼀直执⾏;

    底层调⽤的还是Object实例的wait⽅法.

    再来改⼀改刚才的示例:

    public void blockedTest() {
        ······
            a.start();
        a.join(1000L);
        b.start();
        System.out.println(a.getName() + ":" + a.getState()); // 输出 TIEMD_WAITING
        System.out.println(b.getName() + ":" + b.getState());
    }
    

    这⾥调⽤a.join(1000L),是指定了主线程main等待a线程执⾏的时间的长度,但由于主线程main等待a执⾏的时间是⼩于a线程sleep的时间,所以到了主线程继续往下走,走到输出语句时,a线程状态输出TIMED_WAITING。

    b线程状态仍然不固定(RUNNABLE或BLOCKED)。

线程中断机制

在某些情况下,我们在线程启动后发现并不需要它继续执⾏下去时,需要中断线程。

但⽬前在Java⾥还没有安全直接的⽅法来停⽌线程,但是Java提供了线程中断机制来处理需要中断线程的情况。

注意:

线程中断机制是⼀种协作机制。

通过中断操作并不能直接终⽌⼀ 个线程,⽽是通知需要被中断的线程⾃⾏处理。

Thread类⾥提供的关于线程中断的⼏个⽅法:

  • Thread.interrupt():中断线程。这⾥的中断线程并不会⽴即停⽌线程,⽽是设置线程的中断状态为true(默认是false);
  • Thread.interrupted():测试当前线程是否被中断。线程的中断状态受这个⽅法 的影响,意思是调⽤⼀次使线程中断状态设置为true,连续调⽤两次会使得这个线程的中断状态重新转为false;
  • Thread.isInterrupted():测试当前线程是否被中断。与上⾯⽅法不同的是调⽤ 这个⽅法并不会影响线程的中断状态。

在线程中断机制⾥,当其他线程通知需要被中断的线程后,其线程中断的状态被设置为true,但是具体被要求中断的线程要怎么处理,完全由被中断线程⾃⼰⽽定,可以在合适的时机处理中断请求,也可以完全不处理继续执⾏下 去。


Java线程间的通信

⼀般来讲,每个线程内部都有⾃⼰私有的线程上下⽂,所以各个线程之间互不⼲扰。

由于合理的使⽤Java多线程可以更好地利⽤服务器资源,所以当需要多个线程之间相互协作的时候,就需要掌握线程之间的通信⽅式。

锁与同步

什么是锁

在Java中,锁的概念都是基于对象的,所以我们⼜经常称它为对象锁。

**线程和锁的关系:**可以⽤婚姻关系来理解。

⼀个锁同⼀时间只能被⼀个线程持有。

也就是 说,⼀个锁如果和⼀个线程“结婚”(持有),那其他线程如果需要得到这个锁,就 得等这个线程和这个锁“离婚”(释放)。

什么是线程同步?

线程同步是线程之间按照⼀定的顺序执⾏。

为了达到线程同步,可以使⽤锁来实现它。

无锁代码示例:

public class NoneLock {
    static class ThreadA implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < 100; i++) {
                System.out.println("Thread A " + i);
            }
        }
    }
    
    static class ThreadB implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < 100; i++) {
                System.out.println("Thread B " + i);
            }
        }
    }
    public static void main(String[] args) {
        new Thread(new ThreadA()).start();
        new Thread(new ThreadB()).start();
    }
}

//执⾏这个程序,你会在控制台看到,线程A和线程B各⾃独⽴⼯作,输出⾃⼰的打印值。且每⼀次运⾏结果都会不⼀样。

但现在有⼀个需求,我想等A先执⾏完之后,再由B去执⾏,怎么办呢?最简单 的⽅式就是使⽤⼀个“对象锁”:

加锁代码示例:

public class ObjectLock {
    //这⾥声明了⼀个名字为 lock 的对象锁。
    private static Object lock = new Object();
    static class ThreadA implements Runnable {
        @Override
        public void run() {
            //在 ThreadA 和 ThreadB 内需要同步的代码块⾥,都是⽤ synchronized 关键字加上了同⼀个对象锁 lock 。
            synchronized (lock) {
                for (int i = 0; i < 100; i++) {
                    System.out.println("Thread A " + i);
                }
            }
        }
    }
    
    static class ThreadB implements Runnable {
        @Override
        public void run() {
            在 ThreadA 和 ThreadB 内需要同步的代码块⾥,都是⽤ synchronized 关键字加上了同⼀个对象锁 lock 。
            synchronized (lock) {
                for (int i = 0; i < 100; i++) {
                    System.out.println("Thread B " + i);
                }
            }
        }
    }
    public static void main(String[] args) throws InterruptedException {
        new Thread(new ThreadA()).start();
        Thread.sleep(10);
        new Thread(new ThreadB()).start();
    }
}

我们说到了,根据线程和锁的关系,同⼀时间只有⼀个线程持有⼀个锁,那么 线程B就会等线程A执⾏完成后释放 lock ,线程B才能获得锁 lock 。

这⾥在主线程⾥使⽤sleep⽅法睡眠了10毫秒,是为了防⽌线程B先得到锁。 因为如果同时start,线程A和线程B都是出于就绪状态,操作系统可能会先让 B运⾏。这样就会先输出B的内容,然后B执⾏完成之后⾃动释放锁,线程A 再执⾏。

等待/通知机制

上⾯⼀种基于“锁”的⽅式,线程需要不断地去尝试获得锁,如果失败了,再继续尝试。

这可能会耗费服务器资源,⽽等待/通知机制是另⼀种⽅式。

Java多线程的等待/通知机制是基于 Object 类wait() ⽅法notify() , notifyAll() ⽅法来实现的。

notify()⽅法会随机叫醒⼀个正在等待的线程,⽽notifyAll()则会叫醒所有正在等 待的线程。

前⾯讲到,⼀个锁同⼀时刻只能被⼀个线程持有。

⽽假如线程A现在持有了⼀ 个锁 lock 并开始执⾏,它可以使⽤ lock.wait() 让⾃⼰进⼊等待状态。这个时 候, lock 这个锁是被释放了的。 这时,线程B获得了 lock 这个锁并开始执⾏,它可以在某⼀时刻,使 ⽤ lock.notify()通知之前持有 lock 锁并进⼊等待状态的线程A,说“线程A你不 ⽤等了,可以往下执⾏了”。

要注意的是,这个时候线程B并没有释放锁 lock ,除⾮线程B这个时候使⽤lock.wait()释放锁,或者线程B执⾏结束⾃⾏释放锁,线程A才能得 到 lock 锁,才能正式执行。

代码示例:

//在这个Demo⾥,线程A和线程B⾸先打印出⾃⼰需要的东⻄,然后使⽤ notify() ⽅法叫醒另⼀个正在等待的线程,然后⾃⼰使⽤ wait() ⽅法陷⼊等待并释放 lock 锁。
public class WaitAndNotify {
    private static Object lock = new Object();
    static class ThreadA implements Runnable {
        @Override
        public void run() {
            synchronized (lock) {
                for (int i = 0; i < 5; i++) {
                    try {
                        System.out.println("ThreadA: " + i);
                        lock.notify();
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                lock.notify();// 确保线程B能够正常结束等待
            }
        }
    }
    
    static class ThreadB implements Runnable {
        @Override
        public void run() {
            synchronized (lock) {
                for (int i = 0; i < 5; i++) {
                    try {
                        System.out.println("ThreadB: " + i);
                        lock.notify();
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                
            }
        }
    }
    public static void main(String[] args) throws InterruptedException {
        new Thread(new ThreadA()).start();
        Thread.sleep(1000);//主线程休眠1秒,主线程就还不会走到线程B的启动,所以可以确保线程A优先执行
        new Thread(new ThreadB()).start();
    }
}

// 输出:
ThreadA: 0
ThreadB: 0
ThreadA: 1
ThreadB: 1
ThreadA: 2
ThreadB: 2
ThreadA: 3
ThreadB: 3
ThreadA: 4
ThreadB: 4

需要注意的是:

等待/通知机制使⽤的是使⽤同⼀个对象锁,如果你两个线程使⽤的是不同的对象锁,那它们之间是不能⽤等待/通知机制通信的。

信号量

JDK提供了⼀个类似于“信号量”功能的类 Semaphore 。但这里介绍的是⼀种基于 volatile 关键字的⾃⼰实现的信号量通信。

volitile关键字能够保证内存的可⻅性,如果⽤volitile关键字声明了⼀个变量,在⼀个线程⾥⾯改变了这个变量的值,那其它线程是⽴⻢可⻅更改后的值的。

⽐如现在有⼀个需求,我想让线程A输出0,然后线程B输出1,再然后线程A输出 2…以此类推。应该怎样实现呢?

public class Signal {
    private static volatile int signal = 0;
    static class ThreadA implements Runnable {
        @Override
        public void run() {
            while (signal < 5) {
                if (signal % 2 == 0) {
                    System.out.println("threadA: " + signal);
                    synchronized (this) {
                        signal++;
                    }
                }
            }
        }
    }
    static class ThreadB implements Runnable {
        @Override
        public void run() {
            while (signal < 5) {
                if (signal % 2 == 1) {
                    System.out.println("threadB: " + signal);
                    synchronized (this) {
                        signal = signal + 1;
                    }
                }
            }
        }
    }
    public static void main(String[] args) throws InterruptedException {
        new Thread(new ThreadA()).start();
        Thread.sleep(1000);
        new Thread(new ThreadB()).start();
    }
}
// 输出:
threadA: 0
threadB: 1
threadA: 2
threadB: 3
threadA: 4

可以看到,使⽤了⼀个 volatile 变量 signal 来实现了“信号量”的模型。

这⾥ 需要注意的是, volatile 变量需要进⾏原⼦操作。 signal++ 并不是⼀个原⼦操 作,所以需要使⽤ synchronized 给它“上锁”。

这种实现⽅式并不⼀定⾼效,本例只是演示信号量

信号量的应用场景:

假如在⼀个停⻋场中,⻋位是我们的公共资源,线程就如同⻋辆,⽽看⻔的管理员 就是起的“信号量”的作⽤。 因为在这种场景下,多个线程(超过2个)需要相互合作,我们⽤简单的“锁”和“等 待通知机制”就不那么⽅便了。这个时候就可以⽤到信号量。

JDK中提供的很多多线程通信⼯具类都是基于信号量模型的。

管道

管道是基于“管道流”的通信⽅式。JDK提供了 PipedWriterPipedReaderPipedOutputStreamPipedInputStream

其中,前⾯两个是基于字符的,后⾯两 个是基于字节流的。

示例代码(字符流)

public class Pipe {
    static class ReaderThread implements Runnable {
        private PipedReader reader;
        public ReaderThread(PipedReader reader) {
            this.reader = reader;
        }
        @Override
        public void run() {
            System.out.println("this is reader");
            int receive = 0;
            try {
                while ((receive = reader.read()) != -1) {
                    System.out.print((char)receive);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    static class WriterThread implements Runnable {
        private PipedWriter writer;
        public WriterThread(PipedWriter writer) {
            this.writer = writer;
        }
        @Override
        public void run() {
            System.out.println("this is writer");
            int receive = 0;
            try {
                writer.write("test");
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    writer.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    public static void main(String[] args) throws IOException, InterruptedExce
        PipedWriter writer = new PipedWriter();
    PipedReader reader = new PipedReader();
    writer.connect(reader); // 这⾥注意⼀定要连接,才能通信
    new Thread(new ReaderThread(reader)).start();
    Thread.sleep(1000);
    new Thread(new WriterThread(writer)).start();
}
}
// 输出:
this is reader
this is writer
test
    
//简单分析⼀下这个示例代码的执⾏流程:
1. 线程ReaderThread开始执⾏,
2. 线程ReaderThread使⽤管道reader.read()进⼊”阻塞“,
3. 线程WriterThread开始执⾏,
4. 线程WriterThread⽤writer.write("test")往管道写⼊字符串,
5. 线程WriterThread使⽤writer.close()结束管道写⼊,并执⾏完毕,
6. 线程ReaderThread接收到管道输出的字符串并打印,
7. 线程ReaderThread执⾏完毕。

管道通信的应用场景:

使⽤管道多半与I/O流相关。当⼀个线程需要先另⼀个线程发 送⼀个信息(⽐如字符串)或者⽂件等等时,就需要使⽤管道通信了。

其他知识点

ThreadLocal类

通常情况下,我们创建的变量是可以被任何一个线程访问并修改的。如果想实现每一个线程都有自己的专属本地变量该如何解决呢?

JDK 中自带的ThreadLocal类正是为了解决这样的问题。 ThreadLocal类主要解决的就是让每个线程绑定自己的值,可以将ThreadLocal类形象的比喻成存放数据的盒子,盒子中可以存储每个线程的私有数据。

ThreadLocal是⼀个本地线程副本变量⼯具类。

严格来说,ThreadLocal 类并不属于多线程间的通信,⽽是让每个线程有⾃⼰”独⽴“的变量,线程之间互不影响

它为每个线程都创建⼀个副本,每个线程可以访问⾃⼰内部的副本变量。

ThreadLocal类最常⽤的就是set⽅法和get⽅法

如果你创建了一个ThreadLocal变量,那么访问这个变量的每个线程都会有这个变量的本地副本,这也是ThreadLocal变量名的由来。他们可以使用 get()set() 方法来获取默认值或将其值更改为当前线程所存的副本的值,从而避免了线程安全问题。

示例代码:

public class ThreadLocalDemo {
    static class ThreadA implements Runnable {
        private ThreadLocal<String> threadLocal;
        public ThreadA(ThreadLocal<String> threadLocal) {
            this.threadLocal = threadLocal;
        }
        @Override
        public void run() {
            threadLocal.set("A");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("ThreadA输出:" + threadLocal.get());
        }
        static class ThreadB implements Runnable {
            private ThreadLocal<String> threadLocal;
            public ThreadB(ThreadLocal<String> threadLocal) {
                this.threadLocal = threadLocal;
            }
            @Override
            public void run() {
                threadLocal.set("B");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("ThreadB输出:" + threadLocal.get());
            }
        }
        public static void main(String[] args) {
            ThreadLocal<String> threadLocal = new ThreadLocal<>();
            new Thread(new ThreadA(threadLocal)).start();
            new Thread(new ThreadB(threadLocal)).start();
        }
    }
}
// 输出:
ThreadA输出:A
ThreadB输出:B
//可以看到,虽然两个线程使⽤的同⼀个ThreadLocal实例(通过构造⽅法传⼊), 但是它们各⾃可以存取⾃⼰当前线程的⼀个值。

可以看到,虽然两个线程使⽤的同⼀个ThreadLocal实例(通过构造⽅法传⼊), 但是它们各⾃可以存取⾃⼰当前线程的⼀个值。

ThreadLocal有什么作⽤呢?

如果只是单纯的想要线程隔离,在每个线程中声明 ⼀个私有变量就好了呀,为什么要使⽤ThreadLocal

**答:**在多线程的程序中,不同线程可能需要处理一些属于自己的数据。如果直接在每个线程中声明私有变量,这些变量对其他线程是不可见的,但是这样管理起来可能比较繁琐。这时,ThreadLocal 就发挥了作用。

  1. 线程隔离: ThreadLocal 允许每个线程都有自己独立的变量,不会被其他线程访问到。
  2. 便捷: 使用 ThreadLocal 更方便,不需要手动在每个线程中创建和管理变量。
  3. 简化代码: 避免了在方法之间传递参数的繁琐性,让代码更简洁。
  4. 性能提升: 由于每个线程都有自己的变量,避免了对共享数据进行加锁的开销,提高了性能。
  5. 线程安全的单例模式: ThreadLocal 可以用来实现线程安全的单例模式,确保每个线程都有唯一的实例。

总的来说,ThreadLocal 是为了在多线程环境下更方便地管理每个线程的私有数据,使得代码更易读、易维护,同时提高性能。

再举个简单的例子:

两个人去宝屋收集宝物,这两个共用一个袋子的话肯定会产生争执,但是给他们两个人每个人分配一个袋子的话就不会出现这样的问题。如果把这两个人比作线程的话,那么 ThreadLocal 就是用来避免这两个线程竞争的。

使用场景:

最常⻅的ThreadLocal使⽤场景为⽤来解决数据库连接、Session管理等。数据库连 接和Session管理涉及多个复杂对象的初始化和关闭。如果在每个线程中声明⼀些 私有变量来进⾏操作,那这个线程就变得不那么“轻量”了,需要频繁的创建和关闭 连接。

InheritableThreadLocal类

InheritableThreadLocal类与ThreadLocal类稍有不同,Inheritable是继承的意思。

它不仅仅是当前线程可以存取副本值,⽽且它的⼦线程也可以存取这个副本值。


JMM(Java Memory Model)

并发编程的两个关键问题

线程间如何通信?

即:线程之间以何种机制来交换信息

线程间如何同步?

即:线程以何种机制来控制不同线程间操作发⽣的相对顺序

有两种并发模型可以解决这两个问题:

  • 消息传递并发模型
  • 共享内存并发模型

这两种并发模型之间的区别:

在这里插入图片描述

在Java中,使⽤的是共享内存并发模型。

JMM(Java内存模型的抽象结构)

Java运行时数据区的内容区域划分

在这里插入图片描述

对于每⼀个线程来说,栈都是私有的,⽽堆是共有的。 所谓的内存可⻅性是针对的共享变量。

内存可⻅性,指的是线程之间的可⻅性,当⼀个线程修改了共享变量时,另⼀个线程可以看到和读取到这个修改后的共享变量的值。

也就是说在栈中的变量(局部变量、⽅法定义参数、异常处理器参数)不会在线程之间共享,也就不会有内存可⻅性的问题,也不受内存模型的影响。⽽在堆中的变量是共享的,也就是共享变量。

JMM抽象示意图

Java线程之间的通信由Java内存模型(简称JMM)控制!

JMM定义了线程和主内存之间的抽象关系

JMM的抽象示意图如下:

在这里插入图片描述

从图中可以看出:

  1. 所有的共享变量都存在主内存中。

  2. 每个线程都保存了⼀份该线程使⽤到的共享变量的副本。

  3. 如果线程A与线程B之间要通信的话,必须经历下⾯2个步骤:

    i. 线程A将本地内存A中更新过的共享变量刷新到主内存中去。

    ii. 线程B到主内存中去读取线程A之前已经更新过的共享变量。

所以,线程A⽆法直接访问线程B的⼯作内存,线程间通信必须经过主内存。

注意,根据JMM的规定,线程对共享变量的所有操作都必须在⾃⼰的本地内存中进⾏,不能直接从主内存中读取。

所以线程B并不是直接去主内存中读取共享变量的值,⽽是先在本地内存B中找到 这个共享变量,发现这个共享变量已经被更新了,然后本地内存B去主内存中读取 这个共享变量的新值,并拷⻉到本地内存B中,最后线程B再读取本地内存B中的新值。

那么怎么知道这个共享变量的被其他线程更新了呢?

这就是JMM的功劳了,也是 JMM存在的必要性之⼀。

JMM通过控制主内存与每个线程的本地内存之间的交互,来提供内存可⻅性保证。

我们知道堆是共享的,为什么在堆中还会有内存不可⻅问 题?

这是因为现代计算机为了⾼效,往往会在⾼速缓存区【cpu的cache】中缓存共享变量,因为cpu访问缓存区⽐访问内存要快得多。

线程之间的共享变量存在主内存中,但每个线程都有⼀个私有的本地内存,存储了该线程读、写共享变量的副本。

其中本地内存是Java内存模型的⼀个抽 概念,并不真实存在。它实际上涵盖了缓存、写缓冲区、寄存器等。


JMM与Java内存区域划分的区别和联系

区别:

两者是不同的概念层次。

JMM是抽象的,他是⽤来描述⼀组规则,通过这个规则来控制各个变量的访问⽅式,围绕原⼦性、有序性、可⻅性等展开的。

Java运⾏时内存的划分是具体的,是JVM运⾏Java程序时必要的内存划分。

联系:

都存在私有数据区域和共享数据区域。

⼀般来说,JMM中的主内存属于共享数据区域,他是包含了实际内存区域的堆和⽅法区;

同样,JMM中的本地内存属于私有数据区域,他是包含了实际内存区域的程序计数器、本地⽅法栈、虚拟机栈。


指令重排序与happens-before规则

指令重排序

什么是指令重排序?

计算机在执⾏程序时,为了提⾼性能,编译器处理器常常会对指令做重排。

  • 编译器的优化重排:

    编译器在不改变单线程程序语义的前提下,可以重新安排语句的执⾏顺序。

  • 处理器的优化重排:

    机器指令间如果不存在数据依赖性(即后⼀个执⾏的语句⽆需依赖前⾯执⾏的语句的结果),处理器可以改变语句对应的机器指令的执⾏顺序。

对编译器和处理器来说,只要不改变程序的执⾏结果(单线程程序和正确同步了的多线程程序),编译器和处理器怎么优化都⾏。

为什么指令重排序可以提高性能?

由于每⼀个指令都会包含多个步骤,每个步骤可能使⽤不同的硬件。因此, 指令流⽔线技术产⽣了,它的原理是指令1还没有执⾏完,就可以开始执⾏指令2,⽽不⽤等到指令1执⾏结束之后再执⾏指令2,这样就⼤⼤提⾼了效率;但是, 流⽔线技术最害怕中断,而指令重排就是减少指令流水线中断的⼀种技术。所以指令重排对于提⾼CPU处理性能⼗分必要。虽由此带来了乱序问题,但是是值得的。

注意:

指令重排可以保证串⾏语义⼀致,但是没有义务保证多线程间的语义也⼀致。所以在多线程下,指令重排序可能会导致⼀些问题。

happens-before规则

happens-before规则有什么用:

JMM提供了happens-before规则,程序员只要遵循happens-before规则,那他写的程序就能保证在JMM中具有强的内存可⻅性。

JMM使⽤happens-before的概念来定制两个操作之间的执⾏顺序,这两个操作可以 在⼀个线程以内,也可以是不同的线程之间。

因此,JMM可以通过happens-before 关系向程序员提供跨线程的内存可⻅性保证,即JVM就能保证指令在多线程之间的顺序性符合预期。

什么是happens-before规则:

happens-before关系的定义如下:

  1. 如果⼀个操作happens-before另⼀个操作,那么第⼀个操作的执⾏结果将对第⼆个操作可⻅,⽽且第⼀个操作的执⾏顺序排在第⼆个操作之前。

  2. 两个操作之间存在happens-before关系,并不意味着Java平台的具体实现必须要按照happens-before关系指定的顺序来执⾏。

    如果重排序之后的执⾏结果,与按happens-before关系执⾏的结果⼀致,那么JMM也允许这样的重排序。

总之,如果操作A happens-before操作B,那么操作A在内存上所做的操作对操作B都是可⻅的,不管它们在不在⼀个线程。

有哪些天然的happens-before关系?

  • 程序顺序规则:⼀个线程中的每⼀个操作,happens-before于该线程中的任意后续操作。

  • 监视器锁规则:对⼀个锁的解锁,happens-before对这个锁的加锁。

  • volatile变量规则:对⼀个volatile变量的写,happens-before于对这个 volatile变量的读。

    传递性:如果A happens-before B,且B happens-before C,那么A happens-before C。

  • start规则:如果线程A执⾏操作ThreadB.start()启动线程B,那么A线程的 ThreadB.start()操作happens-before线程B中的任意操作。

  • join规则:如果线程A执⾏操作ThreadB.join()并成功返回,那么线程B中的任意操作happens-before线程A从ThreadB.join()操作成功返回。


volatile关键字

volatile的作用

在Java中,volatile关键字有两种特殊的内存语义:

  1. 保证共享变量的内存可⻅性

  2. 禁⽌volatile变量与普通变量重排序(通过内存屏障来实现的, 且Java 5 开始才有这个“增强 的volatile内存语义”)

    volatile与普通变量的重排序规则:

    • 如果第⼀个操作是volatile变量的读,那⽆论第⼆个操作是什么,都不能重排序;
    • 如果第⼆个操作是volatile变量的写,那⽆论第⼀个操作是什么,都不能重排序;
    • 如果第⼀个操作是volatile变量的写,第⼆个操作是volatile变量的读,那不能重排序。

volatile的用途

从volatile的内存语义上来看,volatile可以保证内存可⻅性且禁⽌重排序。

从保证内存可⻅性这⼀点而言:

volatile有着与锁相同的内存语义,所以可以作为⼀个 “轻量级”的锁来使⽤。

但由于volatile仅仅保证对单个volatile变量的读/写具有原 性,⽽锁可以保证整个临界区代码的执⾏具有原⼦性。

所以在功能上,锁⽐ volatile更强⼤;在性能上,volatile更有优势。

从禁⽌指令重排序这⼀点而言:

volatile也是⾮常有⽤的。⽐如我们熟悉的单例模式,其中 有⼀种实现⽅式是“双重锁检查”,⽐如这样的代码:

public class Singleton {
    private static Singleton instance; // 不使⽤volatile关键字
    // 双重锁检验
    public static Singleton getInstance() {
        if (instance == null) { // 第7⾏
            synchronized (Singleton.class) {
                if (instance == null) {
                    instance = new Singleton(); // 第10⾏
                }
            }
        }
        return instance;
    }
}

如果这⾥的变量声明不使⽤volatile关键字,是可能会发⽣错误的。它可能会被重排序:

instance = new Singleton(); // 第10⾏

// 可以分解为以下三个步骤
1 memory=allocate();// 分配内存 相当于c的malloc
2 ctorInstanc(memory) //初始化对象
3 s=memory //设置s指向刚分配的地址

// 上述三个步骤可能会被重排序为 1-3-2,也就是:
1 memory=allocate();// 分配内存 相当于c的malloc
3 s=memory //设置s指向刚分配的地址
2 ctorInstanc(memory) //初始化对象

⽽⼀旦假设发⽣了这样的重排序,⽐如线程A在第10⾏执⾏了步骤1和步骤3,但是 步骤2还没有执⾏完。这个时候线程A执⾏到了第7⾏,它会判定instance不为空, 然后直接返回了⼀个未初始化完成的instance从而出错。


synchronized和锁

⾸先需要明确的⼀点是:Java多线程的原生的锁都是基于对象的,Java中的每⼀个对象都可以作为⼀个锁。

还有⼀点需要注意的是,我们常听到的类锁其实也是对象锁。 Java类只有⼀个Class对象(可以有多个实例对象,多个实例共享这个Class对象),⽽Class对象也是特殊的Java对象。所以我们常说的类锁,其实就是Class对象的锁。

synchronized关键字

我们通常使⽤ synchronized 关键字来给⼀段代码或⼀个⽅法上锁。

它通常有以下三种形式:

// 关键字在实例⽅法上,锁为当前类的实例对象
public synchronized void instanceLock() {
    // code
}

// 关键字在静态⽅法上,锁为当前类的Class对象
public static synchronized void classLock() {
    // code
}


// 关键字在代码块上,锁为括号⾥⾯的对象
public void blockLock() {
    Object o = new Object();
    synchronized (o) {
        // code
    }
}

临界区:

所谓“临界区”,指的是某⼀块代码区域,它同⼀时刻只能由⼀个线程执⾏。

如果 synchronized 关键字在⽅法 上,那临界区就是整个⽅法内部。

⽽如果是使⽤synchronized代码块,那临界区就 指的是代码块内部的区域。


所以下面代码的写法是等价的:

//等价写法:
// 关键字在实例⽅法上,锁为当前类的实例对象
public synchronized void instanceLock() {
    // code
}

//等价写法
// 关键字在代码块上,锁为括号⾥⾯的对象
public void blockLock() {
    synchronized (this) {
        // code
    }
}

同理,下⾯这两个⽅法也应该是等价的:

//等价写法:
// 关键字在实例⽅法上,锁为当前类的Class对象
public static synchronized void instanceLock() {
    // code
}

//等价写法
// 关键字在代码块上,锁为括号⾥⾯的对象
public void blockLock() {
    synchronized (this.getClass()) {
        // code
    }
}

总结

  • Java中的volatile关键字可以保证多线程操作共享变量的可⻅性以及禁⽌指令重排序

  • synchronized关键字不仅保证可⻅性,同时也保证了原⼦性(互斥性)。

  • 在更底层,JMM通过内存屏障来实现内存的可⻅性以及禁⽌重排序。
    同时为了程序员的⽅便理解,提出了happens-before规则。

几种锁状态

锁状态

Java 6 为了减少获得锁和释放锁带来的性能消耗,引⼊了“偏向锁”和“轻量级锁“。

在Java 6 以前,所有的锁都是”重量级“锁。

所以在Java 6 及其以后,⼀个锁对象其实有四种锁状态,它们级别由低到⾼依次是:

  1. ⽆锁状态
  2. 偏向锁状态
  3. 轻量级锁状态
  4. 重量级锁状态

⽆锁就是没有对资源进⾏锁定,任何线程都可以尝试去修改这个资源。

锁状态会随着资源的竞争情况逐渐升级,锁的升级很容易发⽣,但是锁降级发⽣的条件会⽐较苛刻。

Java对象头

Java原生的锁都是基于对象的。而每个Java对象都有对象头。

如果是⾮数组类型,则⽤2个字宽来存储对象头,如果是数组,则会⽤3个字宽来存储对象头。

在32位处理器中,⼀个字宽是32位;在64 位处理器中,⼀个字宽是64位。

对象头的内容如下表:

存储对象头的bit长度内容内容说明
32/64 bitMark Word存储对象的hashcode 或 锁信息
32/64 bitClass Metadata Address存储指向对象类型数据的指针
32/64 bitArray length数组的长度(如果是数组)

其中Mark Word的格式内容如下表:

锁状态29bit 或 61bit1bit(是否是偏向锁)?2bit(锁标志位)
无锁001
偏向锁线程ID101
轻量级锁指向栈中锁的记录空间的指针此时这一位不用于标识偏向锁00
重量级锁指向互斥量(重量级锁)的指针此时这一位不用于标识偏向锁10
GC标记此时这一位不用于标识偏向锁11
偏向锁

研究发现⼤多数情况下锁不仅不存在多线程竞争,⽽且 总是由同⼀线程多次获得,于是引⼊了偏向锁。

偏向锁会偏向于第⼀个访问锁的线程,如果在接下来的运⾏过程中,该锁没有被其他的线程竞争,则持有偏向锁的线程将永远不需要触发同步。也就是说,偏向锁在 资源⽆竞争情况下消除了同步语句,连CAS操作都不做了,提⾼了程序的运⾏性能。

其实就是对锁加个变量,

如果发现为true,代表资源⽆竞争,则⽆需再⾛各种加锁/解锁流程。

如果为false,代表存在其他线程竞争资源,那么就会⾛后⾯的各种加锁/解锁流程。

实现原理:

⼀个线程在第⼀次进⼊同步块时,会在锁的MarkWord⾥存储锁的偏向的线程ID。当下次该线程进⼊这个同步块时,会去检查锁的Mark Word⾥⾯是不是放 的⾃⼰的线程ID。

如果是,表明该线程已经获得了锁,以后该线程在进⼊和退出同步块时不需要花费 CAS操作来加锁和解锁 ;如果不是,就代表有另⼀个线程来竞争这个偏向锁。这 个时候会尝试使⽤CAS来替换Mark Word⾥⾯的线程ID为新线程的ID,这个时候要 分两种情况:

  • 成功,表示之前的线程不存在了, Mark Word⾥⾯的线程ID为新线程的ID,锁不会升级,仍然为偏向锁;

  • 失败,表示之前的线程仍然存在,那么暂停之前的线程,设置偏向锁标识为 0,并设置锁标志位为00,升级为轻量级锁,会按照轻量级锁的⽅式进⾏竞争锁。

    偏向锁升级成轻量级锁时,会暂停拥有偏向锁的线程,重置偏向锁标识。

撤销偏向锁:

偏向锁使⽤了⼀种等到竞争出现才释放锁的机制,所以当其他线程尝试竞争偏向锁 时, 持有偏向锁的线程才会释放锁。

轻量级锁

多个线程在不同时段获取同⼀把锁,即不存在锁竞争的情况,也就没有线程阻塞。

针对这种情况,JVM采⽤轻量级锁来避免线程的阻塞与唤醒。

轻量级锁的加锁:

JVM会为每个线程在当前线程的栈帧中创建⽤于存储锁记录的空间,我们称为 Displaced Mark Word。

如果⼀个线程获得锁的时候发现是轻量级锁,会把锁的 Mark Word复制到⾃⼰的Displaced Mark Word⾥⾯。

然后线程尝试⽤CAS将锁的Mark Word替换为指向锁记录的指针。如果成功,当前 线程获得锁,如果失败,表示Mark Word已经被替换成了其他线程的锁记录,说明 在与其它线程竞争锁,当前线程就尝试使⽤⾃旋来获取锁。

⾃旋:让等待锁的线程不要被阻塞,而是在不断尝试去获取锁,⼀般⽤忙循环来实现。

⾃旋是需要消耗CPU的,如果⼀直获取不到锁的话,那该线程就⼀直处在⾃旋状 态,⽩⽩浪费CPU资源。

解决这个问题最简单的办法就是指定⾃旋的次数,例如让 其循环10次,如果还没获取到锁就进⼊阻塞状态。

但是JDK采⽤了更聪明的⽅式——适应性⾃旋,简单来说就是线程如果⾃旋成功 了,则下次⾃旋的次数会更多,如果⾃旋失败了,则⾃旋的次数就会减少。

⾃旋也不是⼀直进⾏下去的,如果⾃旋到⼀定程度(和JVM、操作系统相关),依 然没有获取到锁,称为⾃旋失败,那么这个线程会阻塞。同时这个锁状态就会升级成重量级锁。

轻量级锁的释放:

在释放锁时,当前线程会使⽤CAS操作将Displaced Mark Word的内容复制回锁的 Mark Word⾥⾯。如果没有发⽣竞争,那么这个复制的操作会成功。如果有其他线 程因为⾃旋多次导致轻量级锁升级成重量级锁,那么CAS操作会失败,此时会释放锁并唤醒被阻塞的线程。

重量级锁

重量级锁依赖于操作系统的互斥量(mutex) 实现的,⽽操作系统中线程间状态的 转换需要相对⽐较⻓的时间,所以重量级锁效率很低,但被阻塞的线程不会消耗 CPU(不会自旋)。

synchronized就是一个重量级锁。

由于每⼀个对象都可以当做⼀个锁,当多个线程同时请求某个对象锁时,对象锁会设置⼏种状态⽤来区分请求的线程:

Contention List:所有请求锁的线程将被⾸先放置到该竞争队列
Entry List:Contention List中那些有资格成为候选⼈的线程被移到Entry List
Wait Set:那些调⽤wait⽅法被阻塞的线程被放置到Wait Set
OnDeck:任何时刻最多只能有⼀个线程正在竞争锁,该线程称为OnDeck
Owner:获得锁的线程称为Owner
!Owner:释放锁的线程

当⼀个线程尝试获得锁时,如果该锁已经被占⽤,则会将该线程封装成⼀ 个ObjectWaiter 对象插⼊到Contention List的队列的队⾸,然后调⽤ park 函数挂 起当前线程。

当线程释放锁时,会从Contention ListEntryList中挑选⼀个线程唤醒,被选中的 线程叫做 Heir presumptive 即假定继承⼈,假定继承⼈被唤醒后会尝试获得锁, 但 synchronized 是⾮公平的,所以假定继承⼈不⼀定能获得锁。这是因为对于重 量级锁,线程先⾃旋尝试获得锁,这样做的⽬的是为了减少执⾏操作系统同步操作 带来的开销。如果⾃旋不成功再进⼊等待队列。这对那些已经在等待队列中的线程 来说,稍微显得不公平,还有⼀个不公平的地⽅是⾃旋线程可能会抢占了Ready线 程的锁。

如果线程获得锁后调⽤ Object.wait ⽅法,则会将线程加⼊到WaitSet中,当 被 Object.notify 唤醒后,会将线程从WaitSet移动到Contention ListEntryList中 去。

需要注意的是,当调⽤⼀个锁对象的 wait 或 notify ⽅法时,如果当前锁的状态是偏向锁或轻量级锁则会先膨胀成重量级锁。

锁状态的升级流程

每⼀个线程在准备获取共享资源时:

第⼀步,检查锁的MarkWord⾥⾯是不是放的⾃⼰的ThreadId ,如果是,表示当前线程是处于 “偏向锁” 。

第⼆步,如果锁的MarkWord不是⾃⼰的ThreadId,锁状态升级成轻量级锁; 这时候,⽤CAS来执⾏切换,新的线程根据MarkWord⾥⾯现有的ThreadId,通知之前的线程暂停,同时之前的线程也会将锁的Markword的内容置为空。

第三步,两个线程都把锁对象的HashCode复制到⾃⼰新建的⽤于存储锁的记录的空间,接着开始通过CAS操作, 即把锁对象的MarKword的内容修改为⾃⼰新建的锁的记录空间的地址的⽅式,以此方式来竞争锁的MarkWord

第四步,第三步中成功执⾏CAS的线程获得资源,失败的线程则进⼊⾃旋 。

第五步,⾃旋的线程在⾃旋过程中,如果成功获得资源(即之前获的资源的线程执⾏完成并释放了共享资源),则锁状态依然处于轻量级锁的状态,如果⾃旋失败 。

第六步,锁状态则进⼊重量级锁的状态,这个时候,⾃旋的线程进⾏阻塞,等待之前的线程执⾏完成并唤醒⾃⼰。

各种锁状态优缺点对比

在这里插入图片描述

乐观锁和悲观锁

锁可以从不同的⻆度分类。其中,乐观锁和悲观锁是⼀种分类⽅式。

乐观锁与悲观锁的概念

悲观锁:

悲观锁就是我们常说的锁。

对于悲观锁来说,它总是认为每次访问共享资源时会发⽣冲突,所以必须对每次数据操作加上锁,以保证临界区的程序同⼀时间只能有⼀ 个线程在执⾏。

乐观锁:

乐观锁⼜称为“⽆锁”,顾名思义,它是乐观派。

乐观锁总是假设对共享资源的访问没有冲突,线程可以不停地执⾏,⽆需加锁也⽆需等待。

⽽⼀旦多个线程发⽣冲突,乐观锁通常是使⽤⼀种称为CAS的技术来保证线程执⾏的安全性。

由于⽆锁操作中没有锁的存在,因此不可能出现死锁的情况,也就是说乐观锁天⽣免疫死锁。

悲观锁多⽤于”写多读少“的环境,避免频繁失败和重试影响性能;

乐观锁多⽤于“读多写少“的环境,避免频繁加锁影响性能。

CAS的概念

CAS全称是:⽐较并交换(Compare And Swap)

CAS中,有这样三个值

  • V:要更新的变量(variable)
  • E:预期值(expected) [本质上指的是“旧值”]
  • N:新值(new)

⽐较并交换的过程如下

判断V是否等于E,如果等于,将V的值设置为N;

如果不等,说明已经有其它线程更新了V,则当前线程放弃更新,什么都不做。

具体示例如下:
-------------------------
1. 如果有⼀个多个线程共享的变量 i 原本等于5,我现在在线程A中,想把它设置为新的值6;
2. 我们使⽤CAS来做这个事情;
3. ⾸先我们⽤i去与5对⽐,发现它等于5,说明没有被其它线程改过,那我就把它设置为新的值6,此次CAS成功, i 的值被设置成了6;
4. 如果不等于5,说明 i 被其它线程改过了(⽐如现在 i 的值为2),那么我就什么也不做,此次CAS失败, i 的值仍然为2。

在这个例⼦中, i 就是V,5就是E,6就是N。

那有没有可能我在判断了 i 为5之后,正准备更新它的新值的时候,被其它线程更改了 i 的值呢?

答: 不会的。因为CAS是⼀种原⼦操作,它是⼀种系统原语,是⼀条CPU的原⼦指令, 从CPU层⾯保证它的原⼦性

注意:

当多个线程同时使⽤CAS操作⼀个变量时,只有⼀个会胜出,并成功更新,其余均会失败,但失败的线程并不会被挂起,仅是被告知失败,并且允许再次尝试,当然也允许失败的线程放弃操作。

JAVA实现CAS的原理-Unsafe类

前⾯提到,CAS是⼀种原⼦操作。那么Java是怎样来使⽤CAS的呢?

在 Java中,如果⼀个⽅法是native的,那Java就不负责具体实现它,⽽是交给底层的 JVM使⽤c或者c++去实现。

在Java中,有⼀个 Unsafe 类,它在 sun.misc 包中。它⾥⾯是⼀些 native ⽅法, 其中就有⼏个关于CAS的:

//他们都是 public native 的。
boolean compareAndSwapObject(Object o, long offset,Object expected, Object x);
boolean compareAndSwapInt(Object o, long offset,int expected,int x);
boolean compareAndSwapLong(Object o, long offset,long expected,long x);

Unsafe类中对CAS的实现是C++写的,它的具体实现和操作系统、CPU都有关系。

上⾯介绍了Unsafe类的⼏个⽀持CAS的⽅法。那Java具体是如何使⽤这⼏个⽅法 来实现原⼦操作的呢?

JDK提供了⼀些⽤于原⼦操作的类,在 java.util.concurrent.atomic 包下⾯,如下图所示:

在这里插入图片描述

从名字就可以看得出来这些类⼤概的⽤途:

  • 原⼦更新基本类型
  • 原⼦更新数组
  • 原⼦更新引⽤
  • 原⼦更新字段(属性)

CAS实现原子操作的三大问题
ABA问题

所谓ABA问题,就是⼀个值原来是A,变成了B,⼜变回了A。

这个时候使⽤CAS是 检查不出变化的,但实际上这个值却被更新了两次。

ABA问题的解决思路在变量前⾯追加上版本号或者时间戳

JDK 1.5开始, JDK的atomic包⾥提供了⼀个 AtomicStampedReference 类来解决ABA问题。

这个类的compareAndSet ⽅法的作⽤是⾸先检查当前引⽤是否等于预期引⽤,并且 检查当前标志是否等于预期标志,如果⼆者都相等,才使⽤CAS设置为新的值和标志。

public boolean compareAndSet(V expectedReference,
                             V newReference,
                             int expectedStamp,
                             int newStamp) {
    Pair<V> current = pair;
    return
        expectedReference == current.reference &&
        expectedStamp == current.stamp &&
        ((newReference == current.reference &&
          newStamp == current.stamp) ||
         casPair(current, Pair.of(newReference, newStamp)));
}
循环时间长开销大

CAS多与⾃旋结合。如果⾃旋CAS⻓时间不成功,会占⽤⼤量的CPU资源。

解决思路是让JVM⽀持处理器提供的pause指令

pause指令能让⾃旋失败时cpu睡眠⼀⼩段时间再继续⾃旋,从⽽使得读操作的频率低很多,为解决内存顺序冲突⽽导致的CPU流⽔线重排的代价也会⼩很多。

只能保证一个共享变量的原子操作

有两种解决⽅案:

  1. 使⽤JDK 1.5开始就提供的 AtomicReference 类保证对象之间的原⼦性,把多个变量放到⼀个对象⾥⾯进⾏CAS操作;

  2. 使⽤锁。锁内的临界区代码可以保证只有当前线程能操作。


锁接口和类

前⾯介绍了Java原⽣的锁——基于对象的锁,它⼀般是配合synchronized关键字来使⽤的。

实际上,Java在 java.util.concurrent.locks 包下,还为我们提供了 ⼏个关于锁的类和接⼝。它们有更强⼤的功能或更⾼的性能

synchronized的不足之处

  • 如果临界区代码只是只读操作,其实可以多线程⼀起执⾏,但使⽤synchronized之后的话,同⼀时间只能有⼀个线程执⾏
  • synchronized⽆法知道线程有没有成功获取到锁。
  • 使⽤synchronized,如果临界区因为IO或者sleep⽅法等原因阻塞了,⽽当前线程⼜没有释放锁,就会导致所有线程等待

可⻅,只是synchronized是远远不能满⾜多样化的业务对锁的要求的

锁的几种分类

可重入锁和非可重入锁

可重入锁和非可重入锁是两种锁的概念,它们主要涉及到在同一线程中多次获取锁的行为。

可重入锁(Reentrant Lock)
  1. 概念: 可重入锁允许同一线程持有锁的情况下多次获取该锁,而不会出现死锁

    换句话说,如果一个线程已经获得了某个锁,它可以再次获取该锁,而不会被自己所持有的锁阻塞。

  2. 实现: Java中的 ReentrantLock 就是可重入锁的一种实现。这种锁的设计允许同一线程在多个代码块中获取同一个锁,而不会发生死锁。

  3. 优势: 允许线程在持有锁的情况下多次获取锁,提高了灵活性。同时,可重入锁可以避免死锁的发生。

非可重入锁
  1. 概念: 非可重入锁是一种不允许同一线程多次获取同一个锁的锁。

    换句话说,如果一个线程已经获得了该锁,再次尝试获取会导致线程被阻塞,可能会出现死锁的情况。

  2. 实现: 非可重入锁的实现通常是通过简单的标记机制,记录锁的持有状态。当一个线程尝试获取已经持有的锁时,会被阻塞。

  3. 缺点: 非可重入锁的缺点在于,由于不允许同一线程多次获取锁,可能会降低程序的灵活性,并且容易导致死锁。


公平锁与非公平锁

公平锁和非公平锁主要涉及到线程在争夺锁时的获取顺序。

公平锁(Fair Lock)
  1. 概念: 公平锁是一种锁,它按照线程请求锁的顺序来分配锁。当多个线程同时竞争一个锁时,公平锁会按照线程的请求顺序,依次分配锁给这些线程,确保线程获得锁的顺序是按照它们发出请求的顺序来的。
  2. 实现: Java中的 ReentrantLock 提供了可选择的公平锁实现。通过在创建锁时传递 trueReentrantLock 的构造函数,可以创建一个公平锁。默认情况下,ReentrantLock 是非公平锁。
  3. 优势: 公平锁确保了线程按照请求锁的顺序获取锁,避免了线程饥饿现象(长时间获取不到锁),每个线程都有机会获得锁。
非公平锁(Non Fair Lock)
  1. 念: 非公平锁是一种锁,它允许线程按照一定策略插队,可能会导致后请求锁的线程在前请求线程先获得锁。非公平锁的优势在于减少线程切换的开销,提高程序的执行效率。
  2. 实现: Java中的 ReentrantLock 默认是非公平锁。通过在创建锁时传递 falseReentrantLock 的构造函数,也可以明确创建一个非公平锁。
  3. 优势: 非公平锁允许线程插队,可能会提高整体的吞吐量,特别是在高并发的情况下。

读写锁和排它锁

读写锁和排它锁是多线程环境下用于控制对共享资源访问的两种不同类型的锁。

读写锁(Read-Write Lock)
  1. 概念: 读写锁允许多个线程同时读取共享资源,但只允许一个线程写入共享资源。

    读多写少的场景中,读写锁能够提高并发性,允许多个线程同时读取数据,但在写入时需要独占锁。

  2. 实现: Java中的 ReentrantReadWriteLock 就是读写锁的一种实现。该锁分为读锁和写锁,多个线程可以同时持有读锁,但只有一个线程能够持有写锁。当有线程持有写锁时,其他线程无法获取读锁或写锁。

  3. 优势: 读写锁适用于读操作频繁、写操作相对较少的情况,可以提高并发性和吞吐量。

排它锁(Exclusive lock)
  1. 概念: 排它锁是一种独占锁,同一时刻只允许一个线程持有。当一个线程持有排它锁时,其他线程无法同时持有相同的锁,从而实现对共享资源的互斥访问。
  2. 实现: Java中的 ReentrantLock 就是排它锁的一种实现。通过调用 lock() 方法获取锁,只有一个线程能够成功获取锁,其他线程将被阻塞。
  3. 优势: 排它锁适用于对共享资源进行写操作,确保在写入时只有一个线程能够进行,避免并发写入导致的数据不一致。

可⻅,只是synchronized是远远不能满⾜多样化的业务对锁的要求的。所以下面介绍⼀下JDK中有关锁的⼀些接⼝和类.


JDK中有关锁的接口和类

JDK中关于并发的类⼤多都在 java.util.concurrent 简称juc) ⽽juc.locks包看名字就知道,是提供了⼀些并发锁的⼯具类的。

抽象类 AQS/AQLS/AOS

这三个抽象类有⼀定的关系.

⾸先看AQS(AbstractQueuedSynchronizer),之前有章节介绍这个类, 它是在JDK 1.5 发布的,提供了⼀个“队列同步器”的基本功能实现。

AQS⾥⾯的 “资源”是⽤⼀个 int 类型的数据来表示的,有时候我们的业务需求资源的数量超出 了 int 的范围,所以在JDK 1.6 中,多了AQLS(AbstractQueuedLongSynchronizer)。它的代码跟AQS⼏乎⼀样,只是把 资源的类型变成了 long 类型。

AQSAQLS都继承了⼀个类叫 AOS(AbstractOwnableSynchronizer)

这个类 也是在JDK 1.6 中出现的。这个类只有⼏⾏简单的代码。从源码类上的注释可以知 道,它是⽤于表示锁与持有者之间的关系(独占模式)。可以看⼀下它的主要⽅法:

// 独占模式,锁的持有者 
private transient Thread exclusiveOwnerThread; 
// 设置锁持有者 
protected final void setExclusiveOwnerThread(Thread t) { 
    exclusiveOwnerThread = t; 
} 
// 获取锁的持有线程 
protected final Thread getExclusiveOwnerThread() { 
    return exclusiveOwnerThread; 
}

接口 Lock/ReadWriteLock/Condition

juc.locks包下共有三个接⼝: Lock ReadWriteLock Condition

Lock ReadWriteLock从名字就可以看得出来,分别是读写锁的意思。

Lock

Lock接⼝⾥ ⾯有⼀些获取锁和释放锁的⽅法声明:

1.void lock():

该方法用于获取锁,如果锁不可用,则当前线程将一直等待直到获取锁。如果获取锁成功,将立即返回。

2.void lockInterruptibly() throws InterruptedException:
该方法与 lock() 类似,但是允许线程在等待锁的过程中被中断。
如果线程在等待锁的过程中被中断,将抛出 InterruptedException 异常。

3.boolean tryLock():
尝试获取锁,如果锁可用,则获取锁并立即返回 true。
如果锁不可用,则立即返回 false,不等待。

4.boolean tryLock(long time, TimeUnit unit) throws InterruptedException:
在指定的时间内尝试获取锁。
如果在指定时间内获取到锁,返回 true,否则返回 false。
如果线程在等待锁的过程中被中断,将抛出 InterruptedException 异常。

5.void unlock():
释放锁,允许其他线程获取该锁。
必须在持有锁的线程中调用,否则会抛出 IllegalMonitorStateException 异常。

6.Condition newCondition():
返回一个与该锁关联的条件对象(Condition)。
条件对象通常用于在锁上进行等待和通知。

Lock 接口提供了更灵活的锁操作,相比于传统的 synchronized 关键字,它提供了更多的控制和功能。

Lock接口的具体的实现类包括 ReentrantLockReentrantReadWriteLock.ReadLockReentrantReadWriteLock.WriteLock 等。

这些实现类也都提供了对应的锁操作方法。

ReadWriteLock

ReadWriteLock⾥⾯只有两个⽅法,分别 返回“读锁”和“写锁”:

public interface ReadWriteLock {
 Lock readLock();
 Lock writeLock();
}

Condition

Lock接⼝中有⼀个⽅法是可以获得⼀个 Condition:

Condition newCondition():
返回一个与该锁关联的条件对象(Condition)。
条件对象通常用于在锁上进行等待和通知。

之前提到了每个对象都可以⽤继承⾃ Object 的wait/notify⽅法来实现等待/通知机制。

⽽Condition接⼝也提供了类似Object监视器的⽅法,通过与Lock配合来实现等待/通知机制。

那为什么既然有Object的监视器⽅法了,还要⽤Condition呢?

Condition和Object的wait/notify基本相似。

其中,Condition的await⽅法对应的是 Object的wait⽅法,

⽽Condition的signal/signalAll⽅法则对应Object的 notify/notifyAll()。但Condition类似于Object的等待/通知机制的加强版

Condition的主要方法如下:

1.await():
使当前线程等待,并释放该条件的锁。
当其他线程调用相同条件对象的 signal() 或 signalAll() 方法时,当前线程将被唤醒,并在重新获得锁后继续执行。

2.awaitUninterruptibly():
与 await() 类似,但是不会响应线程的中断。即使当前线程被中断,它也会继续等待,直到被唤醒。

3.signal():
唤醒在此条件上等待的一个线程。
如果有多个线程在等待,只有其中一个会被唤醒

4.signalAll():
唤醒在此条件上等待的所有线程。
将所有等待的线程都唤醒,使它们有机会竞争锁。

5.awaitNanos(long nanosTimeout):
使当前线程等待,直到被唤醒、被中断或经过了指定的等待时间。
返回值表示剩余的等待时间(纳秒)。

6.awaitUntil(Date deadline):
使当前线程等待,直到被唤醒、被中断或到达指定的截止时间。
返回值为 true 表示在截止时间之前被唤醒,false 表示到达截止时间后返回。

7.await(long time, TimeUnit unit):
与 awaitNanos 类似,但是参数是时间单位而不是纳秒。

8.awaitUninterruptibly(long time, TimeUnit unit):
与 awaitUninterruptibly 类似,但是参数是时间单位而不是纳秒。

这些方法允许线程在等待某个条件成立时进入等待状态,并在其他线程满足条件时被唤醒。

通常,Condition 与 Lock 一起使用,以实现更复杂的线程同步和协调。

锁对象和条件对象的区别和联系:

  1. 锁对象(Lock Object):
    • 锁对象是一种用于多线程同步的机制,用于控制对共享资源的访问。在 Java 中,ReentrantLock 是一个实现锁的类。
    • 锁对象通过 lock() 方法来获取锁,线程在持有锁的情况下可以执行临界区的代码,通过 unlock() 方法释放锁。
  2. 条件对象(Condition Object):
    • 条件对象是锁对象的一部分,通过锁对象的 newCondition() 方法创建。条件对象允许线程在等待某个条件满足的情况下进入等待状态,并在条件满足时被通知继续执行。
    • 条件对象提供了 await() 方法用于等待,signal()signalAll() 方法用于通知等待线程。
  3. 区别和联系:
    • 锁对象是用于互斥访问共享资源的,而条件对象用于线程间的协调和通信。
    • 锁对象通过 lock()unlock() 控制对临界区的访问,而条件对象通过 await()signal()signalAll() 等方法进行线程的等待和通知。
    • 锁对象可以有多个关联的条件对象,每个条件对象管理一组等待线程。线程在某个条件上等待,直到其他线程通过相同条件的通知唤醒。
    • 通过条件对象,线程可以选择在某个条件上等待,避免忙等待,提高效率。

在实际使用中,锁对象和条件对象的结合通常用于实现复杂的线程同步和通信机制,确保线程之间能够安全地访问共享资源并进行有效的协调。


ReentrantLock

ReentrantLock是⼀个⾮抽象类,它是Lock接⼝的JDK默认实现,实现了锁的基本功能。

  • 从名字上看,可以看出它是⼀个”可重⼊“锁,

  • ReentrantLock可以⽀持”公 平锁“和”⾮公平锁“。

    ReentrantLock的构造⽅法⾥,可以传⼊⼀个 boolean 类型的参数,来指定它是否是⼀个公平锁,默认情况下是⾮公平的。这个参数⼀旦实例化后就不能修改,只能通过 isFair() ⽅法来查看。

  • ReentrantLock的锁是”独占“的,也就是 说,它的锁都是”排他锁“,不能共享。

ReentrantReadWriteLock

ReentrantReadWriteLock类也是⼀个⾮抽象类,它是ReadWriteLock接⼝的JDK默认实现

它与 ReentrantLock的功能类似,同样是可重⼊的,⽀持⾮公平锁和公平锁。不同的是,它还⽀持”读写锁“。

ReentrantReadWriteLock实现了读写锁,但它有⼀个⼩弊端,就是在“写”操作的时候,其它线程不能写也不能读。我们称这种现象为“写饥饿”.

StampedLock

StampedLock 类是在Java 8 才发布的,它没有实现Lock接⼝ReadWriteLock接⼝,但它其实是实现了“读写锁”的功能,并且性能⽐ReentrantReadWriteLock更⾼。

StampedLock还把读锁分 为了“乐观读锁”和“悲观读锁”两种。

前⾯提到了ReentrantReadWriteLock会发⽣“写饥饿”的现象,但StampedLock不 会。它是怎么做到的呢?

它的核⼼思想在于,在读的时候如果发⽣了写,应该通过 重试的⽅式来获取新的值,⽽不应该阻塞写操作。这种模式也就是典型的⽆锁编程 思想,和CAS⾃旋的思想⼀样。

这种操作⽅式决定了StampedLock在读线程⾮常多 ⽽写线程⾮常少的场景下⾮常适⽤,同时还避免了写饥饿情况的发⽣。

总的来说,StampedLock的性能是⾮常优异的,基本上可以取代 ReentrantReadWriteLock的作⽤。

AQS

AQS是什么?

AQS 的全称为 AbstractQueuedSynchronizer ,翻译过来的意思就是抽象队列同步器。

从字⾯意思上理解:

  • 抽象:抽象类,只实现⼀些主要逻辑,有些⽅法由⼦类实现;

  • 队列:使⽤先进先出(FIFO)队列存储数据;

  • 同步:实现了同步的功能。

这个类在 java.util.concurrent.locks 包下面。

AQS 就是一个抽象类,主要用来构建锁和同步器

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
}

AQS构建锁和同步器提供了一些通用功能的实现,因此,使用 AQS能简单且高效地构造出应用广泛的大量的同步器,比如我们提到的 ReentrantLockSemaphore,其他的诸如 ReentrantReadWriteLockSynchronousQueue,FutureTask等等皆是基于 AQS 的。

当然,我们⾃⼰也能利⽤AQS⾮常轻松容易地构造出符合我们⾃⼰需求的同步器, 只要子类实现它的⼏个 protected ⽅法就可以了。

资源共享模式

资源有两种共享模式,或者说两种同步⽅式:

  • 独占模式(Exclusive):资源是独占的,⼀次只能⼀个线程获取。如 ReentrantLock

  • 共享模式(Share):同时可以被多个线程获取,具体的资源个数可以通过参数指定。如Semaphore/CountDownLatch

⼀般情况下,⼦类只需要根据需求实现其中⼀种模式,当然也有同时实现两种模式 的同步类,如 ReadWriteLock


线程池

什么是线程池?

顾名思义,线程池就是管理一系列线程的资源池。当有任务要处理时,直接从线程池中获取线程来处理,处理完之后线程并不会立即被销毁,而是等待下一个任务。

为什么要用线程池?

池化技术已经屡见不鲜了,线程池、数据库连接池、Http 连接池等等都是对这个思想的应用。

池化技术的思想主要是为了减少每次获取资源的消耗,提高对资源的利用率。

使⽤线程池主要有以下三个原因:

  • 创建/销毁线程需要消耗系统资源,线程池可以复⽤已创建的线程。
  • 控制并发的数量。并发数量过多,可能会导致资源消耗过多,从⽽造成服务器 崩溃。(主要原因)
  • 可以对线程做统⼀管理。

线程池的原理

Java中的线程池顶层接⼝是 Executor 接⼝,

ThreadPoolExecutor 是这个接⼝的实现类

还有一个常用的接口是ExecutorService接口, 其继承自 Executor 接口,提供了更丰富的任务提交、执行和管理异步任务的执行,以及管理线程池的生命周期。

任务: 表示需要在线程池中执行的工作单元。

可以是实现了 Runnable 接口的任务,也可以是实现了 Callable 接口有返回值的任务。


先看看 ThreadPoolExecutor 类。

(推荐)通过ThreadPoolExecutor构造函数创建线程池

一共有4个构造方法:

// 五个参数的构造函数
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue)
    
// 六个参数的构造函数-1
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory)
// 六个参数的构造函数-2
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          RejectedExecutionHandler handler)
    
// 七个参数的构造函数
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

看看必须的5个参数是什么意思:

  • int corePoolSize:该线程池中核⼼线程数最⼤值

    核⼼线程:

    线程池中有两类线程,核⼼线程和⾮核⼼线程。

    核⼼线程默认情况下会⼀直存在于线程池中,即使这个核⼼线程什么都不⼲(铁饭碗),⽽⾮核⼼线程如果⻓时间的闲置,就会被销毁(临时⼯)。

  • int maximumPoolSize:该线程池中线程总数最⼤值

    该值等于核⼼线程数量 + ⾮核⼼线程数量

  • long keepAliveTime:⾮核⼼线程闲置超时时⻓

    ⾮核⼼线程如果处于闲置状态超过该值,就会被销毁。

    如果设置 allowCoreThreadTimeOut(true),则会也作⽤于核⼼线程。

  • TimeUnit unit:keepAliveTime的单位。

    TimeUnit是⼀个枚举类型 ,包括以下属性:

    NANOSECONDS : 1微毫秒 = 1微秒 / 1000 MICROSECONDS : 1微秒 = 1毫秒 / 1000

    MILLISECONDS : 1毫秒 = 1秒 /1000

    SECONDS : 秒

    MINUTES : 分

    HOURS : ⼩时

    DAYS : 天

  • BlockingQueue workQueue:阻塞队列,维护着等待执⾏的Runnable任务对象。当一个任务提交到线程池时,线程池会将任务放入阻塞队列。

    常用的阻塞队列:

    1.LinkedBlockingQueue:

    链式阻塞队列,底层数据结构是链表,默认⼤⼩是 Integer.MAX_VALUE , 也可以指定⼤⼩。

    2.ArrayBlockingQueue:

    数组阻塞队列,底层数据结构是数组,需要指定队列的⼤⼩。

    3. SynchronousQueue:

    同步队列,内部容量为0,每个put操作必须等待⼀个take操作,反之亦然。

    4. DelayQueue:

    延迟队列,该队列中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素 。

介绍完5个必须的参数之后,还有两个⾮必须的参数。

  • ThreadFactory threadFactory

    创建线程的⼯⼚ ,⽤于批量创建线程,统⼀在创建线程时设置⼀些参数,如是否是守护线程、线程的优先级等。如果不指定,会新建⼀个默认的线程⼯⼚。

    static class DefaultThreadFactory implements ThreadFactory {
        // 省略属性
        // 构造函数
        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
            Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                poolNumber.getAndIncrement() +
                "-thread-";
        }
        // 省略
    }
    
  • RejectedExecutionHandler handler

    1. ThreadPoolExecutor.AbortPolicy:默认拒绝处理策略,丢弃任务并抛出RejectedExecutionException异常
    2. ThreadPoolExecutor.DiscardPolicy:丢弃新来的任务,但是不抛出异常。
    3. ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列头部(最旧的) 的任务,然后重新尝试执⾏程序(如果再次失败,重复此过程)。
    4. ThreadPoolExecutor.CallerRunsPolicy:调⽤线程处理该任务。
线程池的状态

线程池本身有⼀个调度线程,这个线程就是⽤于管理整个线程池⾥的各种任务和事务,例如创建线程、销毁线程、任务队列管理、线程队列管理等等。

故线程池也有⾃⼰的状态。

ThreadPoolExecutor 类中定义了⼀个volatile int型变 量runState来表示线程池的状态 ,分别为RUNNINGSHUTDOWNSTOPTIDYINGTERMINATED

  • 线程池创建后处于RUNNING状态。

  • 调⽤shutdown()⽅法后处于SHUTDOWN状态,线程池不能接受新的任务,同时清除⼀些空闲worker,同时会等待阻塞队列的任务完成。

  • 调⽤shutdownNow()⽅法后处于STOP状态,线程池不能接受新的任务,中断 所有线程,阻塞队列中没有被执⾏的任务全部丢弃。此时,poolsize=0,阻塞队列的size也为0。

  • ThreadPoolExecutor中有⼀个控制状态的属性叫ctl,它是⼀个 AtomicInteger类型的变量。

    当所有的任务已终⽌,ctl记录的”任务数量”为0,线程池会变为TIDYING状态。 接着会执⾏terminated()函数

  • 线程池处在TIDYING状态时,执⾏完terminated()⽅法之后,就会由 TIDYING -> TERMINATED, 线程池被设置为TERMINATED状态。

注意:

在多线程的环境下,线程池的状态是时刻发⽣变化的。

线程池的任务处理流程

处理任务的核⼼⽅法是 execute().

线程池中的任务处理流程如下:

在这里插入图片描述

  1. 线程总数量 < corePoolSize,⽆论线程是否空闲,都会新建⼀个核⼼线程执⾏任务(让核⼼线程数量快速达到corePoolSize,在核⼼线程数量 < corePoolSize时)。注意,这⼀步需要获得线程池的全局锁。

  2. 线程总数量 >= corePoolSize时,新来的线程任务会进⼊任务队列中等待,然 后空闲的核⼼线程会依次去缓存队列中取任务来执⾏(体现了线程复⽤)。

  3. 当缓存队列满了,说明这个时候任务已经多到爆棚,需要⼀些“临时⼯”来执⾏ 这些任务了。于是会创建⾮核⼼线程去执⾏这个任务。注意,这⼀步需要获得线程池的全局锁

  4. 缓存队列满了, 且总线程数达到了maximumPoolSize,则会采取上⾯提到的 拒绝策略进⾏处理。


ThreadPoolExecutor如何做到线程复用?

我们知道,⼀个线程在创建的时候会指定⼀个线程任务,当执⾏完这个线程任务之后,线程⾃动销毁。但是线程池却可以复⽤线程,即⼀个线程执⾏完线程任务后不 销毁,继续执⾏另外的线程任务。

线程池如何做到线程复⽤呢?

原来,ThreadPoolExecutor在创建线程时,会将线程封装成⼯作线程worker,并放⼊⼯作线程组中,然后这个工作线程worker反复从阻塞队列中拿任务去执⾏。 当一个线程完成了当前任务后,它可以从阻塞队列中取出下一个任务继续执行,而不是销毁该线程。

这样,线程可以被复用,避免了频繁创建和销毁线程的开销。


使用工具类创建四种常见的线程池

Executors 工具类中提供了⼏个静态⽅法来创建线程池。

newCachedThreadPool

创建一个线程池,该线程池中的线程数量根据需要动态增加。

public static ExecutorService newCachedThreadPool() {
    //ThreadPoolExecutor构造函数(传递的参数意思参照上面)
    return new ThreadPoolExecutor(0,                                                     Integer.MAX_VALUE,
                                  60L,                                                   TimeUnit.SECONDS,
                                  new                                       SynchronousQueue<Runnable>());
}

newCachedThreadPool的运⾏流程如下:

  1. 提交任务进线程池。

  2. 因为corePoolSize为0的关系,不创建核⼼线程,线程池最⼤为 Integer.MAX_VALUE

  3. 尝试将任务添加到SynchronousQueue队列

  4. 如果SynchronousQueue⼊列成功,则等待当前运⾏的线程空闲后取出来执⾏。

    如果当前没有空闲线程,那么就创建⼀个⾮核⼼线程,然后从 SynchronousQueue拉取任务并执⾏。

    1. 如果SynchronousQueue已有任务在等待,那么任务⼊列操作将会阻塞。

当需要执⾏很多短时间的任务时,CacheThreadPool的线程复⽤率⽐较⾼, 会显著的提⾼性能。

⽽且线程60s后会回收,意味着即使没有任务进来, CacheThreadPool并不会占⽤很多资源。


newFixedThreadPool

创建一个固定数量的线程池,线程池线程数量固定为指定的数量。

public static ExecutorService newFixedThreadPool(int nThreads) {
    //ThreadPoolExecutor构造函数(传递的参数意思参照上面)
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

核⼼线程数量和总线程数量相等,都是传⼊的整型参数nThreads,所以只能创建核⼼线程,不能创建⾮核⼼线程。

因为LinkedBlockingQueue的默认⼤⼩是 Integer.MAX_VALUE,故如果核⼼线程空闲,则任务交给核⼼线程处理;如果核⼼线程不空闲,则任务⼊列等待,直到核⼼线程空闲。

newFixedThreadPool和newCachedThreadPool的区别?

  • 因为 newFixedThreadPoolcorePoolSize == maximumPoolSize ,所以newFixedThreadPool只会创建核 ⼼线程。

    newCachedThreadPool因为corePoolSize=0,所以只会创建⾮核⼼线程。

  • newFixedThreadPoolgetTask() ⽅法,如果阻塞队列⾥没有任务可取,线程会⼀直阻塞在 LinkedBlockingQueue.take() ,线程不会被回收。

    newCachedThreadPool当阻塞队列里没有任务可取时,则会在 60s后回收线程。

  • 由于newFixedThreadPool的线程不会被回收,⼀直卡在阻塞,所以在没有任务的情况下, FixedThreadPool占⽤资源更多。

  • 都⼏乎不会触发拒绝策略,但是原理不同。

    newFixedThreadPool是因为阻塞队列 可以很⼤(最⼤为Integer最⼤值),由上面的线程池的任务处理流程图可知,故其⼏乎不会触发拒绝策略;

    newCachedThreadPool是因为线程池很⼤(最⼤为Integer最⼤值),⼏乎不会导致线程数量⼤于最⼤线程数,由上面的线程池的任务处理流程图可知,故其⼏乎不会触发拒绝策略。

newScheduledThreadPool

创建⼀个线程池,⽀持定时及周期性任务执⾏。

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    //ScheduledThreadPoolExecutor 是ScheduledExecutorService接口的具体实现类,提供了定时执行任务的功能。
    return 
        new ScheduledThreadPoolExecutor(corePoolSize);
}

public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService {
    // 构造方法
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        //ThreadPoolExecutor的构造方法(传递的参数意思参照上面)
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue());
    }
    // 其他方法实现,包括定时执行任务的逻辑等
}
newSingleThreadExecutor

创建一个单线程的线程池,该线程池中只有一个线程在工作,其他任务都在阻塞队列中等待。

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        //ThreadPoolExecutor的构造方法(传递的参数意思参照上面)
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

可以看到有且仅有⼀个核⼼线程( corePoolSize == maximumPoolSize=1),使⽤了 LinkedBlockingQueue(容量很⼤),所以,不会创建⾮核⼼线程。

所有任务按照 先来先执⾏的顺序执⾏。

如果这个唯⼀的线程不空闲,那么新来的任务会存储在任 务队列⾥等待执⾏。


总结:

四种常⻅的线程池基本够我们使⽤了,但是《阿⾥把把开发⼿册》不建议我们直接 使⽤Executors类中的线程池,⽽是通过 ThreadPoolExecutor构造函数 的⽅式来创建线程池,这样的处理⽅式让人需要更加明确线程池的运⾏规则,规避资源耗尽的⻛险。


如何给线程池命名?

初始化线程池的时候需要显示命名(设置线程池名称前缀),有利于定位问题。

默认情况下创建的线程名字类似 pool-1-thread-n 这样的,没有业务含义,不利于我们定位问题。

给线程池里的线程命名通常有下面两种方式:

1、利用 guava 的 ThreadFactoryBuilder

//Guava 是 Google 提供的一个开源的 Java 标准库扩展库
ThreadFactory threadFactory = new ThreadFactoryBuilder()
                        .setNameFormat(threadNamePrefix + "-%d")
                        .setDaemon(true).build();
ExecutorService threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory)

2、自己实现 ThreadFactor接口

import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * 线程工厂,它设置线程名称,有利于我们定位问题。
 */
public final class NamingThreadFactory implements ThreadFactory {

    private final AtomicInteger threadNum = new AtomicInteger();
    private final ThreadFactory delegate;
    private final String name;

    /**
     * 创建一个带名字的线程池生产工厂
     */
    public NamingThreadFactory(ThreadFactory delegate, String name) {
        this.delegate = delegate;
        this.name = name; // TODO consider uniquifying this
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = delegate.newThread(r);
        t.setName(name + " [#" + threadNum.incrementAndGet() + "]");
        return t;
    }

}

如何设定线程池的大小?

很多人甚至可能都会觉得把线程池配置过大一点比较好!我觉得这明显是有问题的。

就拿我们生活中非常常见的一例子来说:并不是人多就能把事情做好,增加了沟通交流成本。你本来一件事情只需要 3 个人做,你硬是拉来了 6 个人,会提升做事效率嘛?并不会。 线程数量过多的影响也是和我们分配多少人做事情一样,对于多线程这个场景来说主要是增加了上下文切换成本。

我们可以肯定的一点是线程池大小设置过大或者过小都会有问题

  • 如果我们设置的线程池数量太小的话,如果同一时间有大量任务/请求需要处理,可能会导致大量的请求/任务在任务队列中排队等待执行,甚至会出现任务队列满了之后任务/请求无法处理的情况,或者大量任务堆积在任务队列导致 OOM。这样很明显是有问题的,CPU 根本没有得到充分利用。
  • 如果我们设置线程数量太大,大量线程可能会同时在争取 CPU 资源,这样会导致大量的上下文切换,从而增加线程的执行时间,影响了整体执行效率。

有一个简单并且适用面比较广的公式:

  • CPU 密集型任务(N+1): 这种任务消耗的主要是 CPU 资源,可以将线程数设置为 N(CPU 核心数)+1。比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。
  • I/O 密集型任务(2N): 这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理(没有过多的上下文切换),这时就可以将 CPU 交出给其它线程使用。因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N。

阻塞队列

阻塞队列的由来

假设⼀种场景,⽣产者⼀直⽣产资源,消费者⼀直消费资源,资源存储在⼀个缓冲池中,⽣产者将⽣产的资源存进缓冲池中,消费者从缓冲池中拿到资源进⾏消 费,这就是⽣产者-消费者模式。

该模式能够简化开发过程,⼀⽅⾯消除了⽣产者类与消费者类之间的代码依赖性, 另⼀⽅⾯将⽣产数据的过程与使⽤数据的过程解耦简化负载。

⾃⼰coding实现这个模式的时候,因为需要让多个线程操作共享变量(即资源),所以很容易引发线程安全问题,造成重复消费和死锁,尤其是⽣产者和消费 者存在多个的情况。

另外,当缓冲池空了,我们需要阻塞消费者,唤醒⽣产者;当 缓冲池满了,我们需要阻塞⽣产者,唤醒消费者,这些个等待-唤醒逻辑都需要⾃ ⼰实现。

这么容易出错的事情,JDK已经帮我们做啦,这就是阻塞队列(BlockingQueue), 你只管往⾥⾯存、取就⾏,⽽不⽤担⼼多线程环境下存、取共享变量的线程安全问题

BlockingQueueJava util.concurrent包下重要的数据结构,区别于普通的队列,BlockingQueue提供了线程安全的队列访问⽅式,并发包下很多⾼级同步类的实现都是基于BlockingQueue实现的.

BlockingQueue⼀般⽤于⽣产者-消费者模式⽣产者是往队列⾥添加元素的线程, 消费者是从队列⾥拿元素的线程。BlockingQueue就是存放元素的容器


阻塞队列的API方法

阻塞队列提供了四组不同的⽅法⽤于插⼊移除检查元素:

方法类别会抛出异常的方法返回特殊值的方法会一直阻塞的方法会超时退出的方法
插入元素add(e)offer(e)put(e)offer(e,time,timeunit)
移除元素remove()poll()take()poll(time,timeunit)
检查(返回)队列头部元素element()peek()
  • 抛出异常:如果试图的操作⽆法⽴即执⾏,则抛异常。

    例如当阻塞队列满时候,再往队列⾥插⼊元素,会抛出IllegalStateException(“Queue full”)异常。

    例如当队列为空时,从队列⾥获取元素时会抛出NoSuchElementException异常 。

  • 返回特殊值:如果试图的操作⽆法⽴即执⾏,返回⼀个特殊值,通常是true / false。

  • ⼀直阻塞:如果试图的操作⽆法⽴即执⾏,则⼀直阻塞或者响应中断。

  • 超时退出:如果试图的操作⽆法⽴即执⾏,该⽅法调⽤将会发⽣阻塞,直到能够执⾏,但等待时间不会超过给定值,并且会返回⼀个特定值以告知该操作是否成 功,通常是 true / false。

注意:

  • 不能往阻塞队列中插⼊null,会抛出空指针异常。
  • 可以访问阻塞队列中的任意元素,且调⽤remove(o)可以将队列之中的特定对象移除,但并不⾼效,尽量避免使⽤。

阻塞队列的实现类

ArrayBlockingQueue

由数组结构组成的有界阻塞队列

内部结构是数组,故具有数组的特性。

//可以初始化队列⼤⼩, 且⼀旦初始化不能改变。构造⽅法中的fair表示控制对象的内部锁是否采⽤公平锁,默认是⾮公平锁。
public ArrayBlockingQueue(int capacity, boolean fair){
 //....
}
LinkedBlockingQueue

由链表结构组成的有界阻塞队列

内部结构是链表,具有链表的特性。

默认的队列的⼤⼩是 Integer.MAX_VALUE ,当然也可以手动指定队列的⼤⼩。此队列按照先进先出的原则对元素进⾏排序。

注意:

使用默认队列大小的LinkedBlockingQueue,不会阻塞数据⽣产者(因为队列是⽆界的),⽽只会在没 有可消费的数据时,阻塞数据的消费者。

因此使⽤的时候要特别注意,⽣产者⽣产 数据的速度绝对不能快于消费者消费数据的速度,否则时间⼀⻓,会最终耗尽所有 的可⽤堆内存空间。

DelayQueue

Delay Queue(延迟队列) 是 Java 中的一种特殊无界阻塞队列,其特点是队列中的元素只有当其指定的延迟时间到了,才能够从队列中获取到。

插入到 DelayQueue 中的元素必须实现 Delayed 接口,该接口定义了一个 getDelay(TimeUnit unit) 方法,返回元素的剩余延迟时间;同时,该接口继承自 Comparable 接口,因此需要实现 compareTo 方法。compareTo方法用于比较元素的延迟时间,以确定元素的顺序。

示例如下:

import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class DelayedTask implements Delayed {
    private String taskName;
    private long startTime;

    public DelayedTask(String taskName, long delay) {
        this.taskName = taskName;
        this.startTime = System.currentTimeMillis() + delay;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        long diff = startTime - System.currentTimeMillis();
        return unit.convert(diff, TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        return Long.compare(this.startTime, ((DelayedTask) o).startTime);
    }

    public String getTaskName() {
        return taskName;
    }
}

public class DelayQueueExample {
    public static void main(String[] args) throws InterruptedException {
        DelayQueue<DelayedTask> delayQueue = new DelayQueue<>();

        // 添加延迟任务到队列
        delayQueue.put(new DelayedTask("Task 1", 2000));  // 任务1延迟2秒
        delayQueue.put(new DelayedTask("Task 2", 4000));  // 任务2延迟4秒
        delayQueue.put(new DelayedTask("Task 3", 6000));  // 任务3延迟6秒

        // 模拟消费者从队列中取出任务并执行
        while (!delayQueue.isEmpty()) {
            DelayedTask task = delayQueue.take();
            System.out.println("Executing task: " + task.getTaskName() + " at " + System.currentTimeMillis());
        }
    }
}

注意:

DelayQueue不会阻塞数据⽣产者(因为队列是⽆界的),⽽只会在没 有可消费的数据时,阻塞数据的消费者。

因此使⽤的时候要特别注意,⽣产者⽣产 数据的速度绝对不能快于消费者消费数据的速度,否则时间⼀⻓,会最终耗尽所有 的可⽤堆内存空间。

PriorityBlockingQueue

PriorityQueue (优先级阻塞队列)是 Java 中的一个基于优先级的无界阻塞队列

它按照元素的自然顺序或通过构造函数传入的 Comparator 对象来确定元素的优先级。

示例:

import java.util.PriorityQueue;

public class PriorityQueueExample {
    public static void main(String[] args) {
        // 创建一个优先级队列,默认按照元素的自然顺序进行排序
        PriorityQueue<Integer> priorityQueue = new PriorityQueue<>();

        // 添加元素到队列
        priorityQueue.add(5);
        priorityQueue.add(2);
        priorityQueue.add(8);

        // 输出队列中的元素(按照优先级顺序)
        System.out.println("PriorityQueue elements: " + priorityQueue);

        // 使用自定义 Comparator 创建优先级队列
        PriorityQueue<String> customPriorityQueue = new PriorityQueue<>((s1, s2) -> s2.length() - s1.length());
        customPriorityQueue.add("apple");
        customPriorityQueue.add("banana");
        customPriorityQueue.add("orange");

        // 输出自定义 Comparator 的优先级队列中的元素
        System.out.println("Custom PriorityQueue elements: " + customPriorityQueue);
    }
}

注意:

PriorityBlockingQueue不会阻塞数据⽣产者(因为队列是⽆界的),⽽只会在没 有可消费的数据时,阻塞数据的消费者。

因此使⽤的时候要特别注意,⽣产者⽣产 数据的速度绝对不能快于消费者消费数据的速度,否则时间⼀⻓,会最终耗尽所有 的可⽤堆内存空间。

SynchronousQueue

SynchronousQueue 是一种特殊的队列,它没有任何内部容量,甚至不存储任何元素。与其他队列不同,SynchronousQueue 没有队列的容量限制,也不保留元素。

每个 put 操作都必须等待对应的 take 操作,反之亦然。

在这里插入图片描述

如图所示,SynchronousQueue 最大的不同之处在于,它的容量为 0,所以没有一个地方来暂存元素,导致每次取数据都要先阻塞,直到有数据被放入;同理,每次放数据的时候也会阻塞,直到有消费者来取。

SynchronousQueue 的容量不是 1 而是 0,因为 SynchronousQueue 不需要去持有元素,它所做的就是直接传递(direct handoff)。由于每当需要传递的时候,SynchronousQueue 会把元素直接从生产者传给消费者,在此期间并不需要做存储,所以如果运用得当,它的效率是很高的。

这种同步性质使得 SynchronousQueue 在线程之间进行元素传递时非常有用。

换句话说SynchronousQueue 可以看作是一个零容量的阻塞队列,其中元素的存储和移除,是通过线程之间的相互配合完成的。

**具体来说:**当一个线程调用 put 方法时,它会将要添加的元素暂时保存在本地变量中,然后阻塞自己,等待另一个线程调用 take 方法取走该元素。当另一个线程调用 take 方法时,它也会阻塞自己,等待另一个线程调用 put 方法并将要传递的元素交给它。在这个过程中,元素的实际存储并不需要使用队列,因为队列并没有容量,而是仅用于实现线程之间的协作机制。


以下是一些方法的返回值,有助于理解 SynchronousQueue

  • iterator(): 永远返回空,因为 SynchronousQueue 内部不存储元素。
  • peek(): 永远返回 null,因为 SynchronousQueue 不保留任何元素。
  • put(): 往队列放进去一个元素后就一直等待,直到有其他线程进来把这个元素取走。即,put 操作会阻塞,直到有对应的 take 操作。
  • offer(): 往队列里放一个元素后立即返回,如果碰巧这个元素被另一个线程取走了,offer 方法返回 true,认为 offer 成功;否则返回 false。
  • take(): 取出并且移除队列里的元素,如果队列为空,它会一直等待直到有元素可取。
  • poll(): 取出并且移除队列里的元素,只有在另外一个线程正在往队列里提供数据(通过 offerput 操作)的时候,该方法才会成功取到元素。否则立即返回 null。
  • isEmpty(): 永远返回 true,因为 SynchronousQueue 不存储元素。
  • remove() & removeAll(): 永远返回 false,因为 SynchronousQueue 不存储元素,也不支持删除操作。

下面展示了如何在两个线程之间使用 SynchronousQueue 进行元素交换:

import java.util.concurrent.SynchronousQueue;
 
public class SynchronousQueueExample {
    public static void main(String[] args) {
        final SynchronousQueue<Integer> queue = new SynchronousQueue<Integer>();
 
        Thread producer = new Thread(new Runnable() {
            public void run() {
                try {
                    int value = 42;
                    System.out.println("Producer is putting: " + value);
                    queue.put(value);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
 
        Thread consumer = new Thread(new Runnable() {
            public void run() {
                try {
                    int value = queue.take();
                    System.out.println("Consumer is taking: " + value);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
 
        producer.start();
        consumer.start();
    }
}

//在上面程序中,我们创建了一个 SynchronousQueue 实例,然后创建了一个生产者线程和一个消费者线程。生产者线程调用 put 方法将值 42 添加到队列中,然后阻塞等待消费者线程取走这个元素。消费者线程调用 take 方法从队列中取出一个元素,并在控制台输出它。由于这个队列是同步的,因此生产者线程和消费者线程将会同步进行,直到元素被成功交换。

注意:

(1)对于 SynchronousQueue 来说,put 和 take 操作必须是交替执行的
如果一个线程调用了 put 方法并被阻塞,那么它只有等待另一个线程调用 take 方法才能继续执行。反之,如果一个线程调用了 take 方法并被阻塞,那么它只有等待另一个线程调用 put 方法才能继续执行。

这种交替执行的特性使得 SynchronousQueue 在某些情况下非常有用。

例如,当一个线程需要向另一个线程传递数据时,可以使用 SynchronousQueue 来保证线程之间的同步和可靠性。在这种情况下,一个线程负责生产数据并将其添加到队列中,而另一个线程负责消费数据并从队列中取出它。由于队列中始终只有一个元素,因此这种操作是非常高效和可靠的。

但要注意,如果一个线程调用了 put 方法但是没有其他线程调用 take 方法取走元素,那么这个线程就会一直被阻塞下去。同样地,如果一个线程调用了 take 方法但是没有其他线程调用 put 方法添加元素,那么这个线程也会一直被阻塞下去。

(2)SynchronousQueue 中的元素不可重复使用。
一旦一个线程调用 put 方法将元素添加到队列中,并被另一个线程调用 take 方法取走,该元素就会从队列中消失,不再可用于后续的操作了。这也是 SynchronousQueue 的一个特殊之处,使得它非常适用于需要高效、安全地将数据传递给另一个线程的场景。


因此,在使用 SynchronousQueue 时,需要特别注意线程的调度和同步问题,以免造成死锁或其他问题。


阻塞队列的原理

阻塞队列的原理,利⽤了Lock锁的多条件(Condition)阻塞控制。

我们以分析ArrayBlockingQueue JDK 1.8 的源码为例:

//数据元素数组
final Object[] items;
//下⼀个待取出元素索引
int takeIndex;
//下⼀个待添加元素索引
int putIndex;
//元素个数
int count;
//内部锁
final ReentrantLock lock;
//消费者监视器
private final Condition notEmpty;
//⽣产者监视器
private final Condition notFull; 
public ArrayBlockingQueue(int capacity, boolean fair) {
    //..省略其他代码
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull = lock.newCondition();
}

其中put操作的源码:

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    // 1.⾃旋拿锁
    lock.lockInterruptibly();
    try {
        // 2.判断队列是否满了
        while (count == items.length)
            // 2.1如果满了,阻塞该线程,并标记为notFull线程,
            // 等待notFull的唤醒,唤醒之后继续执⾏while循环。
            notFull.await();
        // 3.如果没有满,则进⼊队列
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    // 4 唤醒⼀个等待的线程
    notEmpty.signal();
}

总结put的流程:

  1. 所有执⾏put操作的线程竞争lock锁,拿到了lock锁的线程进⼊下⼀步,没有拿 到lock锁的线程⾃旋竞争锁。 \
  2. 判断阻塞队列是否满了,如果满了,则调⽤await⽅法阻塞这个线程,并标记 为notFull(⽣产者)线程,同时释放lock锁,等待被消费者线程唤醒。
  3. 如果没有满,则调⽤enqueue⽅法将元素put进阻塞队列。注意这⼀步的线程还 有⼀种情况是第⼆步中阻塞的线程被唤醒且⼜拿到了lock锁的线程。
  4. 唤醒⼀个标记为notEmpty(消费者)的线程。

take操作的源码:

    public E take() throws InterruptedException {
     final ReentrantLock lock = this.lock;
     lock.lockInterruptibly();
     try {
     while (count == 0)
     notEmpty.await();
     return dequeue();
     } finally {
     lock.unlock();
     }
    }
    private E dequeue() {
     // assert lock.getHoldCount() == 1;
     // assert items[takeIndex] != null;
     final Object[] items = this.items;
     @SuppressWarnings("unchecked")
     E x = (E) items[takeIndex];
     items[takeIndex] = null;
     if (++takeIndex == items.length)
     takeIndex = 0;
     count--;
     if (itrs != null)
     itrs.elementDequeued();
     notFull.signal();
     return x;
    }

take操作和put操作的流程是类似的,总结⼀下take操作的流程:

  1. 所有执⾏take操作的线程竞争lock锁,拿到了lock锁的线程进⼊下⼀步,没有 拿到lock锁的线程⾃旋竞争锁。
  2. 判断阻塞队列是否为空,如果是空,则调⽤await⽅法阻塞这个线程,并标记 为notEmpty(消费者)线程,同时释放lock锁,等待被⽣产者线程唤醒。
  3. 如果没有空,则调⽤dequeue⽅法。注意这⼀步的线程还有⼀种情况是第⼆步 中阻塞的线程被唤醒且⼜拿到了lock锁的线程。
  4. 唤醒⼀个标记为notFull(⽣产者)的线程。

注意:

  1. put和take操作都需要先获取锁,没有获取到锁的线程会被挡在第⼀道⼤⻔之外 ⾃旋拿锁,直到获取到锁。
  2. 就算拿到锁了之后,也不⼀定会顺利进⾏put/take操作,需要判断队列是否可 ⽤(是否满/空),如果不可⽤,则会被阻塞,并释放锁。
  3. 在第2点被阻塞的线程会被唤醒,但是在唤醒之后,依然需要拿到锁才能继续 往下执⾏,否则,⾃旋拿锁,拿到锁了再while判断队列是否可⽤。

阻塞队列使用场景

  • 生产者-消费者模式。

    参考学习链接:有道云笔记 (youdao.com)

  • 创建线程池的指定阻塞队列。

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }
    

    Java中的线程池就是使⽤阻塞队列实现的,在了解阻塞队列之后,⽆论是使⽤ Exectors类中已经提供的线程池,还是⾃⼰通过ThreadPoolExecutor实现线程池, 都会更加得⼼应⼿.


并发容器集合

并发容器是Java中用于在多线程环境下安全地操作数据集合的工具。这些容器提供了线程安全的访问和操作接口,以便多个线程可以同时访问集合而不会导致数据不一致或其他并发问题。

并发容器是Java 5 提供的在多线程编程下⽤于代替同步容器,针对不同的应⽤场景 进⾏设计,提⾼容器的并发访问性,同时定义了线程安全的复合操作。

整体架构图:

在这里插入图片描述

并发Map

ConcurrentMap接口

ConcurrentMap接⼝继承了Map接⼝,在Map接⼝的基础上⼜定义了四个⽅法:

public interface ConcurrentMap<K, V> extends Map<K, V> {
    //插⼊元素
    //与原有put⽅法不同的是,putIfAbsent⽅法中如果插⼊的key相同,则不替换原有的value值;
    V putIfAbsent(K key, V value);
    //移除元素
    //与原有remove⽅法不同的是,新remove⽅法中增加了对value的判断,如果要删除的key-value不能与Map中原有的key-value对应上,则不会删除该元素;
    boolean remove(Object key, Object value);
    //替换元素
    //增加了对value值的判断,如果key-oldValue能与Map中原有的key-value对应上,才进⾏替换操作;
    boolean replace(K key, V oldValue, V newValue);
    //替换元素
    //与上⾯的replace不同的是,此replace不会对Map中原有的key-value进⾏⽐较,如果key存在则直接替换;
    V replace(K key, V value);
}
ConcurrentHashMap类

ConcurrentHashMapHashMap⼀样也是基于散列表的map,但是它提供了⼀种与HashMap完全不同的加锁策略提供更⾼效的并发性和伸缩性。

ConcurrentHashMap提供了⼀种粒度更细的加锁机制来实现在多线程下更⾼的性能,这种机制叫分段锁(Lock Striping)。


可以这样理解分段锁,就是将数据分段,对每⼀段数据分配⼀把锁。当⼀个线程占 ⽤锁访问其中⼀个段数据的时候,其他段的数据也能被其他线程访问。

有些⽅法需要跨段,⽐如size()isEmpty()containsValue(),它们可能需要锁定整个表⽽⽽不仅仅是某个段,这需要按顺序锁定所有段,操作完毕后,⼜按顺序释 放所有段的锁。


ConcurrentHashMap是由Segment数组结构HashEntry数组结构组成。

Segment数组中每一个Segment 是⼀种可重⼊锁ReentrantLockHashEntry数组中每一个HashEntry则⽤于存储键值对数据。

Segment的结构和HashMap 类似,是⼀种数组和链表结构, ⼀个Segment⾥包含⼀个HashEntry数组,每个 HashEntry是⼀个链表结构(同HashMap⼀样,它也会在⻓度达到8的时候转化为红⿊树)的元素, 每个Segment守护着⼀个HashEntry,当对 HashEntry的数据进⾏修改时,必须⾸先获得它对应的Segment锁


总的来说:

ConcurrentHashMapHashMap 的线程安全版本,适用于并发环境。

它使用了分段锁的机制,将数据分割成多个段,每个段有自己的锁,不同的段可以并发访问,从而提高了并发性能。


ConcurrentNavigableMap接⼝

ConcurrentNavigableMap接⼝继承了NavigableMap接⼝,这个接⼝提供了针对给定搜索⽬标返回最接近匹配项的导航⽅法。

ConcurrentSkipListMap类

ConcurrentNavigableMap接⼝的主要实现类是ConcurrentSkipListMap类。

从名字上来看,它的底层使⽤的是跳表(SkipList)的数据结构。关于跳表的数据结构这 ⾥不做介绍,它是⼀种”空间换时间“的数据结构,可以使⽤CAS来保证并发安全性。

并发Queue

JDK并没有提供线程安全的List类,因为对List来说,很难去开发⼀个通⽤并且没有并发瓶颈的线程安全的List。

因为即使简单的读操作,拿contains() 这样⼀个操作来说,很难搜索的时候如何避免锁住整个list。


JDK提供了对双端队列和队列的线程安全的类: ConcurrentLinkedDequeConcurrentLinkedQueue

队列相对于List来说,有更多的限制。

这两个类是使⽤CAS来实现线程安全的。

ConcurrentLinkedQueue 是一个无锁的队列实现,适用于高并发的生产者-消费者场景。它采用基于链表的数据结构,支持高效的并发操作。

ConcurrentLinkedDeque 是 Java 中 BlockingDeque 接口的一个实现,它是一个非阻塞的、线程安全的双端队列(支持在队列的两端进行元素的插入和删除。可以在队头或队尾进行插入、删除、获取元素等操作)。ConcurrentLinkedDeque 的特点在于其非阻塞算法,不使用锁,而是通过 CAS(Compare-And-Swap)等原子操作来实现线程安全。

BlockingDeque 接口定义了双端队列,支持在两端插入和移除元素的阻塞操作。LinkedBlockingDeque 是其一个实现。

并发Set

JDK提供了ConcurrentSkipListSet,是线程安全的有序的集合。ConcurrentSkipListSet 是基于跳表(Skip List)的并发实现。它提供了有序的 key-value 映射和有序的集合,并在并发环境中保持线程安全。底层是使⽤ ConcurrentSkipListMap实现。

⾕歌的guava框架则实现了⼀个线程安全的ConcurrentHashSet

Set<String> s = Sets.newConcurrentHashSet();

CopyOnWrite容器

什么是CopyOnWrite?

CopyOnWrite是计算机设计领域中的⼀种优化策略,也是⼀种在并发场景下常⽤的 设计思想——写⼊时复制思想.

什么是写⼊时复制思想?

就是当有多个调⽤者同时去请求⼀个资源数据的时 候,有⼀个调⽤者出于某些原因需要对当前的数据源进⾏修改,这个时候系统将会 复制⼀个当前数据源的副本给调⽤者修改。

什么是CopyOnWrite容器?

CopyOnWrite容器即写时复制的容器,当我们往⼀个容器中添加元素的时候,不直接 往容器中添加,⽽是将当前容器进⾏copy,复制出来⼀个新的容器,然后向新容器 中添加我们需要的元素,最后将原容器的引⽤指向新容器。

这样做的好处在于,我们可以在并发的场景下对容器进⾏"读操作"⽽不需要"加 锁",从⽽达到读写分离的⽬的。

JDK 1.5 开始Java并发包⾥提供了两个使⽤ CopyOnWrite机制实现的并发容器 ,分别是CopyOnWriteArrayListCopyOnWriteArraySet.

注意:

CopyOnWrite容器有数据⼀致性的问题,它只能保证最终数据⼀致性。

CopyOnWrite机制的原理可知,写入数据是在新容器,读取数据是在老容器,所以如果我们希望写⼊的数据⻢上能准确地读取,请不要使⽤CopyOnWrite容器

CopyOnWriteArrayList

优点:

CopyOnWriteArrayListArrayList 的线程安全版本。它通过在修改集合时创建一个新的拷贝集合(写时复制),⽆需任何同步措施,从而保证读取数据时不受锁的影响,⼤⼤增强了读的性能。适用于读多写少的场景。

CopyOnWriteArrayList由于其"读写分离",遍历和修改操作分别作⽤在不同的List容 器,所以在使⽤迭代器遍历的时候,则不会抛出异常。

缺点:

第⼀个缺点是CopyOnWriteArrayList每次执⾏写操作都会将原容器进⾏拷⻉ 了⼀份,数据量⼤的时候,内存会存在较⼤的压⼒,可能会引起频繁Full GC。⽐如原先这些对象占⽤的内存⼤约200M左右, 那么再写⼊100M数据进去,内存就会多占⽤300M

第⼆个缺点是CopyOnWriteArrayList由于实现的原因,写和读分别作⽤在不同新⽼容器上,在写操作执⾏过程中,读不会阻塞,但读取到的却是⽼容器的数据。

注意:

  • CopyOnWriteArrayList读操作是没有加锁的,而写操作是加锁的。
CopyOnWriteArraySet

CopyOnWriteArraySetSet 接口的线程安全实现,它基于 CopyOnWriteArrayList 实现。

它的特点是读取时无锁,写入时通过拷贝数组的方式实现。

业务场景实现

来具体结合业务场景实现⼀个CopyOnWriteMap的并发容器并且来使 ⽤它。

import java.util.Collection;
import java.util.Map;
import java.util.Set;
public class CopyOnWriteMap<K, V> implements Map<K, V>, Cloneable {
    
    private volatile Map<K, V> internalMap;
    
    public CopyOnWriteMap() {
        internalMap = new HashMap<K, V>();
    }
    
    public V put(K key, V value) {
        synchronized (this) {
            //HashMap 的构造函数有一个可以接受 Map 接口类型参数的重载。这个构造函数用于创建一个新的 HashMap,并将指定 Map 中的所有映射复制到新的 HashMap 中。
            Map<K, V> newMap = new HashMap<K, V>(internalMap);
            V val = newMap.put(key, value);
            internalMap = newMap;
            return val;
        }
    }
    
    public V get(Object key) {
        return internalMap.get(key);
    }
    
    public void putAll(Map<? extends K, ? extends V> newData) {
        synchronized (this) {
            Map<K, V> newMap = new HashMap<K, V>(internalMap);
            newMap.putAll(newData);
            internalMap = newMap;
        }
    }
}

上⾯就是参考CopyOnWriteArrayList实现的CopyOnWriteMap,我们可以⽤这个容 器来做什么呢?

结合我们之前说的CopyOnWrite的复制思想,它最适⽤于“读多写 少”的并发场景。

场景:假如我们有⼀个搜索的⽹站需要屏蔽⼀些“关键字”,“⿊名单”每晚定时更 新,每当⽤户搜索的时候,“⿊名单”中的关键字不会出现在搜索结果当中,并且提示⽤户敏感字。

// ⿊名单服务
public class BlackListServiceImpl {
    // 减少扩容开销。根据实际需要,初始化CopyOnWriteMap的⼤⼩,避免写时CopyOnWriteMap扩
    private static CopyOnWriteMap<String, Boolean> blackListMap = 
        new CopyOnWriteMap<String, Boolean>(1000);
    
    
    public static boolean isBlackList(String id) {
        return blackListMap.get(id) == null ? false : true;
    }
    
    public static void addBlackList(String id) {
        blackListMap.put(id, Boolean.TRUE);
    }
    /**
 * 批量添加⿊名单
 * (使⽤批量添加。因为每次添加,容器每次都会进⾏复制,所以减少添加次数,可以减少容器的复制
 * 如使⽤上⾯代码⾥的addBlackList⽅法)
 * @param ids
 */
    public static void addBlackList(Map<String,Boolean> ids) {
        blackListMap.putAll(ids);
    }
}

并发工具类

JDK中提供了⼀些⼯具类以供开发者使⽤。

这样的话我们在遇到⼀些常⻅的应⽤场景时就可以使⽤这些⼯具类,⽽不⽤⾃⼰再重复造轮⼦了。

它们都在java.util.concurrent包[JUC]下。

作用
Semaphore限制线程的数量
Exchanger两个线程交换数据
CountDownLatch用于实现一种等待多个线程完成某个操作后再继续执行的场景。它通过一个计数器来实现,当计数器减到零时,等待的线程将被唤醒。【计数器不可重置】
CyclicBarrier作⽤跟CountDownLatch类似,但是可以重复使⽤;即当所有线程都到达屏障点后,计数器重置,可以再次使用。
Phaser增强版的CyclicBarrier

Semaphore

Semaphore是什么?

Semaphore翻译过来是信号的意思。

⽽这个“信号”是⼀个 int 类型的数据,也可以看成是⼀种“资源”。

Semaphore 有什么用?

synchronizedReentrantLock 都是一次只允许一个线程访问某个资源,而Semaphore(信号量)可以用来控制同时访问特定资源的线程数量。

所以:Semaphore 通常用于那些资源有明确访问数量限制的场景,比如限流。

注意:

当初始的资源个数为 1 的时候,Semaphore 退化为排他锁。

Semaphore 有两种模式:

  • 公平模式: 调用 acquire() 方法的顺序就是获取资源的顺序,遵循 FIFO;
  • 非公平模式: 抢占式的。

Semaphore 有两种构造方法:

//这两个构造方法,都必须提供许可的数量
//第一个构造方法
public Semaphore(int permits) {
  	sync = new NonfairSync(permits);
}

//第二个构造方法可以指定是公平模式还是非公平模式,默认非公平模式
public Semaphore(int permits, boolean fair) {
  	sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

Semaphore的方法:

acquire():
获取一个许可证,如果没有可用的许可证,线程将阻塞等待,直到有许可证可用。
    
acquire(int permits)
获取指定数量的许可证,如果没有足够的许可证,线程将阻塞等待,直到有足够的许可证可用。
    
tryAcquire();
尝试获取一个许可证,如果成功立即返回 true,否则返回 false
    
tryAcquire(int permits);
尝试获取指定数量的许可证,如果成功立即返回 true,否则返回 false
    
tryAcquire(long timeout, TimeUnit unit);
尝试获取一个许可证,在指定的时间内如果成功返回 true,否则返回 false。timeout 参数表示等待的最大时间,unit 参数表示时间单位
    
release();
释放一个许可证,使其可供其他线程使用
    
release(int permits);
释放指定数量的许可证
    
availablePermits();
返回当前可用的许可证数量
    
getQueueLength();
返回正在等待获取许可证的线程数
    
hasQueuedThreads();
返回是否有线程正在等待获取许可证
    
reducePermits(int reduction);
减少许可证的数量。可以用于降低阻塞线程数
    
drainPermits();
获取并返回所有当前可用的许可证,并将可用许可证数量设置为零

最主要的⽅法是acquire⽅法和release⽅法。

acquire()⽅法会申请⼀个permit,⽽ release⽅法会释放⼀个permit。当然,也可以申请多个acquire(int permits)或者释放多个release(int permits)。

如果permit减少到了0,再有其他线程来 acquire,那就要阻塞这个线程直到有其它线程release permit为⽌。


Semaphore应用示例:

Semaphore往往⽤于资源有限的场景中,去限制线程的数量。

举个例⼦,我想限制同时只能有3个线程在⼯作:

public class SemaphoreDemo{
    //静态内部类(自定义的线程类)
    static class MyThread implements Runnable{
        private int value;
        private Semaphore semaphore;
        
        //构造函数
        public MyThread(int value,Semaphore semaphore){
            this.value = value;
            this.semaphore = semaphore;
        }
        
        @Override
        public void run(){
            try{
                //获取一个permit
                semaphore.acquire();
                System.out.println(String.format("当前线程是%d,还剩%d个线程,还有%d个线程在等待",value,semaphore.availablePermits(),semaphore.getQueueLength()));
                // 睡眠随机时间,打乱释放顺序
                Random random =new Random();
                Thread.sleep(random.nextInt(1000));
                // 释放一个permit
                semaphore.release();
                System.out.println(String.format("线程%d释放了资源", value));
            }catch(Exception e){
                e.printStackTrace();
            }
        }
    }
    
    public static void main(String[] args){
        Semaphore semaphore = new Semaphore(3);
        for(int i=0;i < 10;i++){
            new Thread(new MyThread(i,semaphore)).start();
        }
    }
}

//输出结果
当前线程是1, 还剩2个资源,还有0个线程在等待
当前线程是0, 还剩1个资源,还有0个线程在等待
当前线程是6, 还剩0个资源,还有0个线程在等待
线程6释放了资源
当前线程是2, 还剩0个资源,还有6个线程在等待
线程2释放了资源
当前线程是4, 还剩0个资源,还有5个线程在等待
线程0释放了资源
当前线程是7, 还剩0个资源,还有4个线程在等待
线程1释放了资源
当前线程是8, 还剩0个资源,还有3个线程在等待
线程7释放了资源
当前线程是5, 还剩0个资源,还有2个线程在等待
线程4释放了资源
当前线程是3, 还剩0个资源,还有1个线程在等待
线程8释放了资源
当前线程是9, 还剩0个资源,还有0个线程在等待
线程9释放了资源
线程5释放了资源
线程3释放了资源

可以看到,在这次运⾏中,最开始是1, 0, 6这三个线程获得了资源,⽽其它线程进⼊了等待队列。

然后当某个线程释放资源后,就会有等待队列中的线程获得资源。


当然,Semaphore默认的acquire⽅法是会让线程进⼊等待队列,且会抛出中断异常。

但它还有⼀些⽅法可以忽略中断不进⼊阻塞队列

// 忽略中断
public void acquireUninterruptibly()
public void acquireUninterruptibly(int permits)
    
// 不进⼊等待队列,底层使⽤CAS
public boolean tryAcquire
public boolean tryAcquire(int permits)
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
 throws InterruptedException
public boolean tryAcquire(long timeout, TimeUnit unit)

Semaphore原理

Semaphore内部有⼀个继承了AQS的同步器Sync,重写了 tryAcquireShared ⽅ 法。在这个⽅法⾥,会去尝试获取资源。 如果获取失败(想要的资源数量⼩于⽬前已有的资源数量),就会返回⼀个负数 (代表尝试获取资源失败)。然后当前线程就会进⼊AQS的等待队列。


Exchanger

Exchanger类⽤于两个线程交换数据。它⽀持泛型,也就是说你可以在两个线程之 间传送任何数据。

案例:两个线程之间想要传递字符串

public class ExchangerDemo {
    public static void main(String[] args) throws InterruptedException {
        //Exchanger(两个线程之间传递字符串)
        Exchanger<String> exchanger = new Exchanger<>();
        //线程A
        new Thread(() -> {
            try {
                System.out.println("这是线程A,得到了另⼀个线程的数据:"
                                   + exchanger.exchange("这是来⾃线程A的数据"));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        
        System.out.println("这个时候线程A是阻塞的,在等待线程B的数据");
        Thread.sleep(1000);
        
        //线程B
        new Thread(() -> {
            try {
                System.out.println("这是线程B,得到了另⼀个线程的数据:"
                                   + exchanger.exchange("这是来⾃线程B的数据"));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }
}

//输出结果
这个时候线程A是阻塞的,在等待线程B的数据
这是线程B,得到了另⼀个线程的数据:这是来⾃线程A的数据
这是线程A,得到了另⼀个线程的数据:这是来⾃线程B的数据

可以看到,当⼀个线程调⽤exchange⽅法后,它是处于阻塞状态的,只有当另⼀ 个线程也调⽤了exchange⽅法,它才会继续向下执⾏。

Exchanger类还有⼀个有超时参数的⽅法,如果在指定时间内没有另⼀个线程调⽤ exchange,就会抛出⼀个超时异常。

public V exchange(V x, long timeout, TimeUnit unit)

那么问题来了,Exchanger只能是两个线程交换数据吗?那三个调⽤同⼀个实例的 exchange⽅法会发⽣什么呢?

答案是只有前两个线程会交换数据,第三个线程会 进⼊阻塞状态。

注意:

exchange是可以重复使⽤的。也就是说。两个线程可以使⽤ Exchanger在内存中不断地再交换数据。


CountDownLatch

解读⼀下CountDownLatch这个类名字的意义。

CountDown代表计数递减, Latch是“⻔闩”的意思,也有⼈把它称为“屏障”。

CountDownLatch这个类的作⽤ 也很贴合这个名字的意义,假设某个线程在执⾏任务之前,需要等待其它线程完成 ⼀些前置任务,必须等所有的前置任务都完成,才能开始执⾏本线程的任务。

CountDownLatch的方法如下:

// 构造⽅法:
public CountDownLatch(int count)
    
public void await() // 等待
public boolean await(long timeout, TimeUnit unit) // 超时等待
public void countDown() // count - 1
public long getCount() // 获取当前还有多少count

CountDownLatch案例示例:

我们知道,玩游戏的时候,在游戏真正开始之前,⼀般会等待⼀些前置任务完成, ⽐如“加载地图数据”,“加载⼈物模型”,“加载背景⾳乐”等等。只有当所有的东⻄都 加载完成后,玩家才能真正进⼊游戏。

下⾯就来模拟⼀下这个demo:

public class CountDownLatchDemo {
    // [静态内部类]定义前置任务线程[自定义线程类]
    static class PreTaskThread implements Runnable {
        private String task;
        private CountDownLatch countDownLatch;

        //constructor
        public PreTaskThread(String task, CountDownLatch countDownLatch) {
            this.task = task;
            this.countDownLatch = countDownLatch;
        }

        @Override
        public void run() {
            try {
                //stimulate random
                Random random = new Random();
                Thread.sleep(random.nextInt(1000));
                System.out.println(task + " - 任务完成");
                //计数器减1
                countDownLatch.countDown();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    //client
    public static void main(String[] args) {
        // 假设有三个模块需要加载
        CountDownLatch countDownLatch = new CountDownLatch(3);
        // 主任务
        new Thread(() -> {
            try {
                System.out.println("等待数据加载...");
                System.out.println(String.format("还有%d个前置任务", countDownLatch.getCount()));
                countDownLatch.await();
                System.out.println("数据加载完成,正式开始游戏!");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        // 前置任务(多线程,执行顺序随机)
        new Thread(new PreTaskThread("加载地图数据", countDownLatch)).start();
        new Thread(new PreTaskThread("加载⼈物模型", countDownLatch)).start();
        new Thread(new PreTaskThread("加载背景⾳乐", countDownLatch)).start();
    }
}

//输出结果:
等待数据加载...
还有3个前置任务
加载⼈物模型 - 任务完成
加载背景⾳乐 - 任务完成
加载地图数据 - 任务完成
数据加载完成,正式开始游戏!

CountDownLatch原理:

CountDownLatch类的原理挺简单的,内部同样是⼀个基于AQS的一个实现类 Sync,且实现起来还很简单,可能是JDK⾥⾯AQS的⼦类中最简单的实现了。

注意:

CountDownLatch类的构造器中的计数值(count)实际上就是闭锁需要等待的线程数量。
这个值只能被设置⼀次,⽽且CountDownLatch没有提供任何机制去重新设置这个计数值。


CyclicBarrier

CyclicBarrirer从名字上来理解是“循环的屏障”的意思。

前⾯提到了 CountDownLatch⼀旦计数值 count 被降为0后,就不能再重新设置了,它只能起 ⼀次“屏障”的作⽤。

CyclicBarrier拥有CountDownLatch的所有功能,还可以使⽤ reset() ⽅法重置屏障

CyclicBarrier示例案例:

同样⽤玩游戏的例⼦。如果玩⼀个游戏有多个“关卡”,而每个’"关卡"都会有初始化任务, 那使⽤ CountDownLatch显然不太合适,因为那需要为每个关卡都创建⼀个实例。

那我们可以 使⽤CyclicBarrier来实现每个关卡的数据加载等待等初始化功能,如下:

public class CyclicBarrierDemo {  
    //[静态内部类]自定义的线程类
    static class PreTaskThread implements Runnable {  
        
        private String task;  
        private CyclicBarrier cyclicBarrier;  
        
        public PreTaskThread(String task, CyclicBarrier cyclicBarrier) {  
            this.task = task;  
            this.cyclicBarrier = cyclicBarrier;  
        }  
        
        @Override  
        public void run() {  
            // 假设总共三个关卡(故循环条件为 i <= 3  )
            for (int i = 1; i <= 3; i++) { 
                try {  
                    Random random = new Random();  
                    Thread.sleep(random.nextInt(1000));  
                    System.out.println(String.format("关卡%d的任务%s完成", i, task)); // 添加task到格式化字符串中  					  // 当一个线程执行到 cyclicBarrier.await()时
                    //阻塞当前线程:它会阻塞当前线程,直到相应数量的其他线程都执行到 cyclicBarrier.await();
                    //同步点:这是所有线程必须到达的点。只有当所有线程都到达这个点时,它们才会被解锁继续执行
                    cyclicBarrier.await(); 
                } catch (InterruptedException | BrokenBarrierException e) {  
                    e.printStackTrace();  
                }
            }
            cyclicBarrier.reset(); // 重置屏障,等待下一次使用
        }  
    }  
    
    //client
    public static void main(String[] args) {  
        //计数器初始化为3
        //⼀旦调⽤await()⽅法的线程数量等于构造⽅法中传⼊的任务总量(这⾥是3),就代表达到屏障了。
        //`CyclicBarrier`允许我们在达到屏障的时候可以执⾏⼀个任务,可以在构造⽅法传⼊⼀个Runnable类型的对象[这里是lambda表达式的形式]。
        CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {  
            System.out.println("本关卡所有前置任务完成,开始游戏...");  
        });  
        //三个线程
        new Thread(new PreTaskThread("加载地图数据", cyclicBarrier)).start();  
        new Thread(new PreTaskThread("加载人物模型", cyclicBarrier)).start();  
        new Thread(new PreTaskThread("加载背景音乐", cyclicBarrier)).start();  
    }  
}

//可能的一个输出结果:
关卡1的任务加载地图数据完成
关卡1的任务加载背景⾳乐完成
关卡1的任务加载⼈物模型完成
本关卡所有前置任务完成,开始游戏...
关卡2的任务加载地图数据完成
关卡2的任务加载背景⾳乐完成
关卡2的任务加载⼈物模型完成
本关卡所有前置任务完成,开始游戏...
关卡3的任务加载⼈物模型完成
关卡3的任务加载地图数据完成
关卡3的任务加载背景⾳乐完成
本关卡所有前置任务完成,开始游戏...

注意这⾥跟CountDownLatch的代码有⼀些不同。

CyclicBarrier没有分 为 await() countDown() ,⽽是只有单独的⼀个 await() ⽅法。

⼀旦调⽤await()⽅法的线程数量等于构造⽅法中传⼊的任务总量(这⾥是3),就代表达到屏障了。

CyclicBarrier允许我们在达到屏障的时候可以执⾏⼀个任务,可以在构造⽅法传⼊⼀个Runnable类型的对象。

// 构造⽅法
public CyclicBarrier(int parties) {
 this(parties, null);
}

//构造方法,传入Runnable类型对象
public CyclicBarrier(int parties, Runnable barrierAction) {
 // 具体实现
}

上述案例就是在达到屏障时,输出 “本关卡所有前置任务完成,开始游戏…”。

CyclicBarrier原理:

CyclicBarrier虽说功能与CountDownLatch类似,但是实现原理却完全不同, CyclicBarrier内部使⽤的是Lock + Condition实现的等待/通知模式。


Phaser

前⾯我们介绍了CyclicBarrier,可以发现它在构造⽅法⾥传⼊“任务总 量” parties 之后,就不能修改这个值了,并且每次调⽤ await() ⽅法也只能消耗 ⼀个 parties 计数,但Phaser类却可以动态地调整任务总量!

Phaser 提供了更灵活和复杂的同步控制,相比于 CountDownLatchCyclicBarrier,它更加强大,支持分阶段的同步。

名词解释:

  • party:对应⼀个线程,线程数量可以通过register或者构造参数传⼊;
  • arrive:对应⼀个party的状态,初始时是unarrived,当调⽤arriveAndAwaitAdvance() 或者 arriveAndDeregister() 进⼊arrive状态,可以通过 getUnarrivedParties() 获取当前未到达的数量;
  • register:注册⼀个party,每⼀阶段必须所有注册的party都到达才能进⼊下⼀ 阶段;
  • deRegister: 减少一个party.
  • phase:阶段,当所有注册的party都arrive之后,将会调⽤Phaser 的 onAdvance() ⽅法来判断是否要进⼊下⼀阶段

特点:

  1. 分阶段同步:
    • Phaser 支持多阶段同步,每个阶段可以有不同数量的参与者。这使得Phaser类更适合一些需要分阶段完成任务的场景。
  2. 动态注册和注销参与者:
    • 可以动态地注册和注销参与者,即线程可以在任意时刻加入或离开同步组。
  3. 适用于可变参与者数量:
    • 不同于 CyclicBarrierPhaser 不要求在创建时就确定参与者的数量,而是可以在运行时动态地添加或减少参与者。
  4. 支持可终止的同步:
    • 可以通过 PhaserarriveAndDeregister() 方法终止某个参与者,从而实现动态地终止同步。

常用方法:

  1. arrive()arriveAndAwaitAdvance()
    • arrive() 通知 Phaser 已完成当前阶段,但不会阻塞。
    • arriveAndAwaitAdvance() 通知并等待其他参与者到达,进入下一个阶段。
  2. register()bulkRegister(int parties)
    • register() 注册一个新的参与者。
    • bulkRegister(int parties) 注册多个新的参与者。
  3. arriveAndDeregister()
    • 通知并注销当前线程,从而动态地终止某个参与者。
  4. getPhase()
    • 获取当前阶段数。
  5. awaitAdvance(int phase)
    • 等待指定阶段的到来。
  6. forceTermination()
    • 强制终止所有参与者。

代码示例:

public class PhaserExample {
    public static void main(String[] args) {
        Phaser phaser = new Phaser();

        // 注册三个参与者
        phaser.register();
        phaser.register();
        phaser.register();

        for (int i = 0; i < 3; i++) {
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + " arrives at Phase " + phaser.getPhase());
                phaser.arriveAndAwaitAdvance(); // 等待其他参与者

                System.out.println(Thread.currentThread().getName() + " continues to execute.");
                phaser.arriveAndDeregister(); // 注销参与者
            }).start();
        }
    }
}
//这个示例创建了一个 Phaser,并注册了三个参与者。每个线程到达某个阶段后,会等待其他参与者,然后继续执行。在最后,每个线程注销自己,动态地终止参与者。

Fork/Join框架

什么是Fork/Join?

fork在英⽂⾥有分叉的意思,join在英⽂⾥连接、结合的意思。

顾名思义,fork就 是要使⼀个⼤任务分解成若⼲个⼩任务,⽽join就是最后将各个⼩任务的结果结合 起来得到⼤任务的结果。

并行计算和并发计算?

  1. 并行计算(Parallel Computing):

    并行计算是指同时执行多个计算任务,以提高整体计算性能的过程。

    在并行计算中,多个处理单元(例如多核处理器、多机集群等)同时执行不同的任务,每个处理单元独立地执行其任务,最后将结果合并。并行计算通常用于解决大规模问题,显著提高计算效率。

  2. 并发运算(Concurrent Computing):

    并发运算是指多个计算任务在同一时间段内交替执行的过程。

    在并发运算中,多个任务通过时间片轮转等机制共享处理器资源,它们之间可能会交替执行,也可能同时执行。

    并发运算通常用于解决任务之间需要交互或资源共享的问题,提高系统的响应性和资源利用率。

区别:

  • 关注点不同:
    • 并行计算关注于同一时刻多个处理单元的执行,目的是提高整体计算性能。
    • 并发运算关注于多个任务在同一时间段内交替执行,目的是提高系统的响应性和资源利用率。
  • 任务执行方式不同:
    • 并行计算中的任务通常是独立的,各自执行不同的计算,最后将结果合并。
    • 并发运算中的任务可能需要共享资源或交互,它们在时间上交替执行,共享同一计算资源。
  • 应用场景不同:
    • 并行计算适用于大规模的计算问题,例如科学计算、图形处理等。
    • 并发运算适用于需要处理多个任务的系统,例如操作系统、网络服务等。

总体而言,并行计算和并发运算都是为了提高计算系统的效率和性能,但它们关注的问题、解决的场景以及任务执行的方式有所不同。在实际应用中,也有一些场景是并行计算和并发运算同时存在的。


工作窃取(Work-Stealing)算法

Fork/Join 框架采用了 Work-Stealing 算法,每个线程都有自己的任务队列,当一个线程完成自己的任务后,会从其他线程的队列中窃取任务执行,以实现负载均衡。

在这里插入图片描述

注意的是:

当⼀个线程窃取另⼀个线程的时候,为了减少两个任务线程之间的竞争,我们通常使⽤双端队列来存储任务。

所以可以做到如被窃取的任务线程都从双端队列的头部拿任务执⾏,⽽窃取其他任务的线程从双端队列的尾部执⾏任务,不会竞争。

另外,当⼀个线程在窃取任务时要是没有其他可⽤的任务了,这个线程会进⼊阻塞状态以等待再次“⼯作”。

什么是Fork/Join框架

Fork/Join 框架实现的是并行计算。

它采用了并行计算的思想,通过将大任务拆分成小任务,然后并行地执行这些小任务,最后将它们的结果合并。

在 Fork/Join 框架中,每个小任务都是独立执行的,它们之间通常不需要共享资源或进行交互。这符合并行计算的特点,即多个处理单元同时执行独立的计算任务,以提高整体计算性能。

具体而言,Fork/Join框架是⼀个实现了ExecutorService接⼝的多线程处理器,它专为那些可以通递归分解成更细⼩的任务⽽设计,最⼤化的利⽤多核处理器来提⾼应⽤程序的性能。 与其他ExecutorService相关的实现类相同的是,Fork/Join框架会将任务分配给线程池中的线程。⽽与之不同的是,Fork/Join框架在执⾏任务时使⽤了⼯作窃取算法

Fork/Join的运⾏流程⼤致如下所示:

图⾥的次级⼦任务可以⼀直分下去,⼀直分到⼦任务⾜够⼩为⽌。

体现分⽽治之(divide and conquer) 的算法思想。

在这里插入图片描述

Fork/Join框架的具体实现

其实Fork/Join框架简单来讲就是对任务的分割与⼦任务的合并,所以要实现这个框架,先得有任务。

在Fork/Join框架⾥提供了抽象类 ForkJoinTask 来实现任务。

ForkJoinTask任务抽象类

ForkJoinTask 是 Java 并发包(java.util.concurrent)中用于支持 Fork/Join 框架的抽象类。

它提供了一种表示可以分解为更小任务的任务,并且支持在 Fork/Join 框架中执行的基本功能。

主要方法:

  1. fork()

    • 用于异步执行当前任务的一个子任务。

      它将任务提交给ForkJoinPool进行执行,然后立即返回,从而允许当前线程继续执行其他任务。

      其实fork()只做了⼀件事,那就是把任务推⼊当前⼯作线程的⼯作队列⾥。

  2. join()

    • 用于等待fork() 提交的子任务的完成,并获取其计算结果。

      如果子任务尚未完成,join() 将阻塞当前线程等待子任务完成。

  3. invoke()

    • 用于执行当前任务。

      如果当前任务还没有被提交给 ForkJoinPoolinvoke() 将在当前线程中执行任务;

      如果任务已经被提交,它将等效于 fork().join()

forkJoinTask.fork().join() 模式:

通过 fork() 方法异步执行当前任务,并通过 join() 方法等待其完成,从而实现任务的并行执行。


ForkJoinTask的子类

ForkJoinTask 主要有两个子类:RecursiveTaskRecursiveAction

RecursiveAction可以看做是⽆返回值的 ForkJoinTask

RecursiveTask有返回值的ForkJoinTask

通常情况下,在创建任务的时候我们⼀般不直接继承ForkJoinTask,⽽是继承它的⼦类RecursiveAction和RecursiveTask

此外,两个⼦类都有执⾏主要计算的⽅法compute(),当然,RecursiveAction的 compute()返回voidRecursiveTask的compute()有具体的返回值

RecursiveTask

RecursiveTask<V>

  • 表示有返回结果的可分解任务。通过继承 RecursiveTask,可以实现具有返回值的任务。
class MyTask extends RecursiveTask<Integer> {
    // 实现 compute() 方法
}
RecursiveAction

表示无返回结果的可分解任务。通过继承 RecursiveAction,可以实现不需要返回值的任务。

class MyTask extends RecursiveAction {
    // 实现 compute() 方法
}

ForkJoinPool线程池
介绍

前面我们说fork()提交的任务会提交到ForkJoinPool,那么就需要了解一下这个ForkJoinPool.

ForkJoinPool 是 Java 并发包中用于支持 Fork/Join 框架的执行引擎,它提供了线程池的管理和任务调度功能。

ForkJoinPoolExecutorService接口 的一种实现,专门用于执行 ForkJoinTask类型的任务。

⼤致看下ForkJoinPool的源码:

@sun.misc.Contended
    public class ForkJoinPool extends AbstractExecutorService {
        // 任务队列
        volatile WorkQueue[] workQueues; 
        // 线程的运⾏状态
        volatile int runState; 
        // 创建ForkJoinWorkerThread的默认⼯⼚,可以通过构造函数重写
        public static final ForkJoinWorkerThreadFactory defaultForkJoinWorkerThread
            // 公⽤的线程池,其运⾏状态不受shutdown()和shutdownNow()的影响
            static final ForkJoinPool common;
        // 私有构造⽅法,没有任何安全检查和参数校验,由makeCommonPool直接调⽤
        // 其他构造⽅法都是源⾃于此⽅法
        // parallelism: 并⾏度,
        // 默认调⽤java.lang.Runtime.availableProcessors() ⽅法返回可⽤处理器的数量
        private ForkJoinPool(int parallelism,
                             ForkJoinWorkerThreadFactory factory, // ⼯作线程⼯⼚
                             UncaughtExceptionHandler handler, // 拒绝任务的handler
                             int mode, // 同步模式
                             String workerNamePrefix) { // 线程名prefix
            this.workerNamePrefix = workerNamePrefix;
            this.factory = factory;
            this.ueh = handler;
            this.config = (parallelism & SMASK) | mode;
            long np = (long)(-parallelism); // offset ctl counts
            this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK)
        }
    }

其中的WorkQueue[]数组就是双端队列, ForkJoinTask存放在这⾥

当⼯作线程在处理⾃⼰的⼯作队列时,会从队列尾取任务来执⾏(LIFO);

如果是 窃取其他队列的任务时,窃取的任务位于所属任务队列的队⾸(FIFO)。【关键取任务和窃取任务的方向不同即可


其中的runState表示的是ForkJoinPool的运⾏状态。SHUTDOWN状态⽤负数表示,其他⽤2的幂次表示。

注意:

ForkJoinPool传统线程池最显著的区别就是它维护了⼀个⼯作队列数组(volatile WorkQueue[] workQueues)

ForkJoinPool中的每个⼯作线程都维护着⼀个⼯作队列。


主要特点和方法

Work-Stealing 算法:

  • ForkJoinPool 使用 Work-Stealing 算法,每个线程都有自己的任务队列,当一个线程执行完自己的任务后,它会从其他线程的队列中窃取任务执行。这种机制有助于实现负载均衡,提高线程池的效率。
  1. fork()join() 方法:

    • ForkJoinPool 针对 ForkJoinTask 提供了 fork()join() 方法,用于异步执行任务和等待任务的完成。

      这两个方法是 Fork/Join 框架的核心。

  2. invoke(ForkJoinTask<T> task)

    • 提交一个 Fork/Join 任务进行执行,并等待任务执行完成。
    • 这是一个阻塞方法,直到任务执行完成才返回结果。
  3. execute(ForkJoinTask<?> task)

    • 异步执行一个 Fork/Join 任务,不等待任务完成。
  4. submit(ForkJoinTask<T> task) :

    • 用于提交任务,并返回一个 ForkJoinTaskFuture 对象,通过该对象可以获取任务的执行结果。
  5. shutdown()shutdownNow()

    • shutdown() 方法用于优雅地关闭线程池,已提交的任务将继续执行,但不再接受新的任务。
    • shutdownNow() 方法尝试立即关闭线程池,并停止所有正在执行的任务。
  6. getParallelism()

    • 获取线程池的并行度,即可并行执行的线程数。
  7. getStealCount()

    • 获取总共的工作窃取次数,用于监测工作窃取的效率。
示例代码:

⽤⼀个计算斐波那契数列第n项的例⼦来看⼀下Fork/Join的使⽤:

斐波那契数列数列是⼀个线性递推数列,从第三项开始,每⼀项的值都等于 前两项之和: 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89······

如果设f(n)为该数列的第n项(n∈N*),那么有:f(n) = f(n-1) + f(n-2)。

public class FibonacciTest {
    class Fibonacci extends RecursiveTask<Integer> {
        int n;
        public Fibonacci(int n) {
            this.n = n;
        }
        // 主要的实现逻辑都在compute()⾥
        @Override
        protected Integer compute() {
            // 这⾥先假设 n >= 0
            if (n <= 1) {
                return n;
            } else {
                // f(n-1)
                Fibonacci f1 = new Fibonacci(n - 1);
                f1.fork();
                // f(n-2)
                Fibonacci f2 = new Fibonacci(n - 2);
                f2.fork();
                // f(n) = f(n-1) + f(n-2)
                return f1.join() + f2.join();
            }
        }
    }
    @Test
    public void testFib() throws ExecutionException, InterruptedException {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        System.out.println("CPU核数:" + Runtime.getRuntime().availableProcessors());
        long start = System.currentTimeMillis();
        Fibonacci fibonacci = new Fibonacci(40);
        Future<Integer> future = forkJoinPool.submit(fibonacci);
        System.out.println(future.get());
        long end = System.currentTimeMillis();
        System.out.println(String.format("耗时:%d millis", end - start));
    }
}

注意:

  • 为什么普通的递归或循环效率执行更快呢?

因为Fork/Join是使⽤多个线程协作来 计算的,所以会有线程通信和线程切换的开销。

  • 面对不同的计算任务如何选择?

如果要计算的任务⽐较简单(⽐如案例中的斐波那契数列),那当然是直接使⽤单线程会更快⼀些。

但如果要计算的东⻄⽐较复杂,计算机⼜是多核的情况下, 就可以充分利⽤多核CPU来提⾼计算速度。


Java8 Stream流 并行计算原理

Java 8 Stream简介

从Java 8 开始,我们可以使⽤ Stream 接⼝以及lambda表达式进⾏“流式计算”。

它可以让我们对集合的操作更加简洁、更加可读、更加⾼效。

Stream接⼝有⾮常多⽤于集合计算的⽅法,⽐如判空操作empty、过滤操作filter、 求最max值、查找操作findFirst和findAny等等。

Stream单线程串⾏计算

Stream接⼝默认是使⽤串⾏的⽅式,也就是说在⼀个线程⾥执⾏。

下⾯举⼀个例 ⼦:

public class StreamDemo {
    public static void main(String[] args) {
        Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9)
            .reduce((a, b) -> {
                System.out.println(String.format("%s: %d + %d = %d",
                                                 Thread.currentThread().getName(), a, b, a + b));
                return a + b;
            })
            .ifPresent(System.out::println);
    }
}

//⾸先我们⽤整数1~9创建了⼀个 Stream 。这⾥的Stream.of(T... values)⽅法是Stream接⼝的⼀个静态⽅法,其底层调⽤的是Arrays.stream(T[] array)⽅法。然后我们使⽤了 reduce ⽅法来计算这个集合的累加和。 reduce ⽅法这⾥做的是:从前两个元素开始,进⾏某种操作(我这⾥进⾏的是加法操作)后,返回⼀个结果,然后再拿这个结果跟第三个元素执⾏同样的操作,以此类推,直到最后的⼀个元素。我们来打印⼀下当前这个reduce操作的线程以及它们被操作的元素和返回的结果以及最后所有reduce⽅法的结果,也就代表的是数字1到9的累加和。

//输出结果:
main: 1 + 2 = 3
main: 3 + 3 = 6
main: 6 + 4 = 10
main: 10 + 5 = 15
main: 15 + 6 = 21
main: 21 + 7 = 28
main: 28 + 8 = 36
main: 36 + 9 = 45
45

可以看到,默认情况下,它是在⼀个单线程运⾏的,也就是main线程。

然后每次 reduce操作都是串⾏起来的,⾸先计算前两个数字的和,然后再往后依次计算。

Stream多线程并⾏计算

思考上⾯⼀个例⼦,是不是⼀定要在单线程⾥进⾏串⾏地计算呢?

假如计算机是⼀个多核计算机,我们在理论上能否利⽤多核来进⾏并⾏计算,提⾼计算效 率呢?

当然可以,⽐如我们在计算前两个元素1 + 2 = 3的时候,其实我们也可以同时在另 ⼀个核计算 3 + 4 = 7。然后等它们都计算完成之后,再计算 3 + 7 = 10的操作。 是不是很熟悉这样的操作⼿法?没错,它就是ForkJoin框架的思想

下⾯⼩⼩地修 改⼀下上⾯的代码,增加⼀⾏代码,使Stream使⽤多线程来并⾏计算:

public class StreamParallelDemo {
    public static void main(String[] args) {
        Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9)
            //增加了下面一行代码
            .parallel()
            .reduce((a, b) -> {
                System.out.println(String.format("%s: %d + %d = %d",
                                                 Thread.currentThread().getName(), a, b, a + b));
                return a + b;
            })
            .ifPresent(System.out::println);
    }
}

//看看这个⽅法的输出:
ForkJoinPool.commonPool-worker-1: 3 + 4 = 7
ForkJoinPool.commonPool-worker-4: 8 + 9 = 17
ForkJoinPool.commonPool-worker-2: 5 + 6 = 11
ForkJoinPool.commonPool-worker-3: 1 + 2 = 3
ForkJoinPool.commonPool-worker-4: 7 + 17 = 24
ForkJoinPool.commonPool-worker-4: 11 + 24 = 35
ForkJoinPool.commonPool-worker-3: 3 + 7 = 10
ForkJoinPool.commonPool-worker-3: 10 + 35 = 45
45

可以很明显地看到,它使⽤的线程是 ForkJoinPool ⾥⾯的 commonPool ⾥⾯的 worker线程。并且它们是并⾏计算的,并不是串⾏计算的。但由于Fork/Join框架 的作⽤,它最终能很好的协调计算结果,使得计算结果完全正确。

⽤Fork/Join代码去实现这样⼀个功能,那⽆疑是⾮常复杂的。

Java8提供了并⾏式的流式计算,⼤⼤简化了我们的代码量,使得我们只需要写很少很简单 的代码就可以利⽤计算机底层的多核资源。

总结
  • Stream并⾏的底层原理是使⽤了Fork/Join框架,而Stream并行计算带来的将是性能的大幅提升。

  • 当电脑有8核,但并⾏计算耗时并不是单线程计算耗时除以8,因为线程的创建、销毁以及维护线程上下⽂的 切换等都有⼀定的开销。

    所以如果你的服务器并不是多核服务器,那也没必要⽤ Stream的并⾏计算。

    因为在单核的情况下,往往Stream的串⾏计算⽐并⾏计算更 快,因为它不需要线程切换的开销。


计划任务

简介

JDK 1.5 开始,JDK提供了 ScheduledThreadPoolExecutor 类⽤于计划任务(⼜称定时任务).

这个类有两个⽤途:

  • 在给定的延迟之后再运⾏任务
  • 周期性重复执⾏任务

在这之前,是使⽤ Timer 类来完成定时任务的,但是 Timer 有缺陷:

  • Timer是单线程模式,如果在执⾏任务期间某个TimerTask耗时较久,那么就会影响其它任务的调度;
  • Timer的任务调度是基于绝对时间的,对系统时间敏感;
  • Timer不会捕获执⾏TimerTask时所抛出的异常,由于Timer是单线程,⼀旦出现异常,则线程就终⽌,其他任务也得不到执⾏。

所以JDK 1.5之后,⼤家就摒弃 Timer类 ,使⽤ ScheduledThreadPoolExecutor类 吧!!


ScheduledThreadPoolExecutor类

简介(what)

ScheduledThreadPoolExecutor 类是 Java 中 java.util.concurrent 包提供的一个实现了定时调度的线程池。

它继承自 ThreadPoolExecutor 类,并实现了 ScheduledExecutorService 接口,允许开发者在固定的延迟或固定的频率下执行任务。

其中ThreadPoolExecutor类在线程池章节已经介绍过了,这里便不再介绍。

倒是可以看看了解一下ScheduledExecutorService 接口:

ScheduledExecutorService接口 继承了 ExecutorService接口 ,并增加若⼲定时相关的方法,如下:

public interface ScheduledExecutorService extends ExecutorService {

    // 延迟执行一个 Runnable 任务
    ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);

    // 延迟执行一个 Callable 任务,并返回一个 ScheduledFuture 用于获取结果
    <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);

    // 以固定的频率执行一个 Runnable 任务,包括初始延迟时间和执行间隔
    ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);

    // 以固定的延迟执行一个 Runnable 任务,包括初始延迟时间和执行间隔
    ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
}

主要特点:

ScheduledThreadPoolExecutor类的主要特点:

  1. 定时调度:
    • ScheduledThreadPoolExecutor 允许开发者执行定时任务,支持在固定的延迟或固定的频率下执行任务。
  2. 继承 ThreadPoolExecutor
    • ScheduledThreadPoolExecutor 继承自 ThreadPoolExecutor,因此具有线程池的所有特性,包括线程池的大小、任务队列、拒绝策略等。
  3. 灵活的调度方法:
    • 提供了多种灵活的调度方法,如 schedule()scheduleAtFixedRate()scheduleWithFixedDelay()

#### 如何使用(how)
  1. 创建 ScheduledThreadPoolExecutor 实例:

    ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(corePoolSize);
    

    其中,corePoolSize 是线程池的核心线程数,表示可以同时执行的线程数。

  2. schedule() 方法:

    • 用于延迟执行任务,只执行一次。
    ScheduledFuture<?> future = executor.schedule(Runnable task, long delay, TimeUnit TimeUnit.SECONDS);
    
  3. scheduleAtFixedRate() 方法:

    • 用于以固定的频率执行任务,如果任务的执行时间超过设定的频率,下一个任务将立即开始执行。
    ScheduledFuture<?> future = executor.scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit TimeUnit.SECONDS);
    
  4. scheduleWithFixedDelay() 方法:

    • 用于以固定的延迟执行任务,保证在上一个任务完成后延迟一段时间再执行下一个任务。
    ScheduledFuture<?> future = executor.scheduleWithFixedDelay(Runnable task, long initialDelay, long delay, TimeUnit TimeUnit.SECONDS);
    
  5. 取消任务:

    • 通过 cancel() 方法取消已经提交的任务。
    future.cancel(true);//true 是一个参数,表示是否中断正在执行任务的线程
    
  6. 关闭线程池:

    • 在不再需要线程池时,应该调用 shutdown() 方法来优雅地关闭线程池。
    executor.shutdown();
    
应用实例:

示例1:

在这个示例中,通过 ScheduledExecutorService 创建了一个 ScheduledThreadPoolExecutor 实例,然后使用不同的调度方法执行了三个不同的定时任务。

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class ScheduledThreadPoolExample {
    public static void main(String[] args) {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);

        // 延迟 2 秒后执行任务
        ScheduledFuture<?> future1 = executor.schedule(() -> {
            System.out.println("Task 1 executed");
        }, 2, TimeUnit.SECONDS);

        // 初始延迟 1 秒,然后每隔 3 秒执行一次任务
        ScheduledFuture<?> future2 = executor.scheduleAtFixedRate(() -> {
            System.out.println("Task 2 executed");
        }, 1, 3, TimeUnit.SECONDS);

        // 初始延迟 1 秒,然后在上一个任务完成后延迟 3 秒执行下一个任务
        ScheduledFuture<?> future3 = executor.scheduleWithFixedDelay(() -> {
            System.out.println("Task 3 executed");
        }, 1, 3, TimeUnit.SECONDS);

        // 在未来某个时间点取消任务
        executor.schedule(() -> {
            future1.cancel(true);
            future2.cancel(true);
            future3.cancel(true);
            executor.shutdown();
        }, 10, TimeUnit.SECONDS);
    }
}

示例2:

假设有⼀个需求,指定时间给⼤家发送消息。那么我们会将消息(包含其发送时间)存储在数据库中,然后想⽤⼀个定时任务,每隔1秒检查数据库在当前时间有没有需要发送的消息,那这个计划任务怎么写?下⾯是⼀个Demo:

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPool {
    private static final ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1, Executors.defaultThreadFactory());
    private static final SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    public static void main(String[] args) {
        // 新建一个固定延迟时间的计划任务
        executor.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                if (haveMsgAtCurrentTime()) {
                    System.out.println(df.format(new Date()));
                    System.out.println("大家注意了,我要发消息了");
                }
            }
        }, 1, 1, TimeUnit.SECONDS);
    }

    public static boolean haveMsgAtCurrentTime() {
        // 查询数据库,有没有当前时间需要发送的消息
        // 这里省略实现,直接返回true
        return true;
    }
}

//输出结果:
2023-01-23 16:16:48
⼤家注意了,我要发消息了
2023-01-23 16:16:49
⼤家注意了,我要发消息了
2023-01-23 16:16:50
⼤家注意了,我要发消息了
2023-01-23 16:16:51
⼤家注意了,我要发消息了
2023-01-23 16:16:52
⼤家注意了,我要发消息了
2023-01-23 16:16:53
⼤家注意了,我要发消息了
2023-01-23 16:16:54
⼤家注意了,我要发消息了
2023-01-23 16:16:55
⼤家注意了,我要发消息了

The End!!创作不易,欢迎点赞/评论!!欢迎关注我的GZ号!

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/337048.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

MySQL(五)——多表查询

上期文章 MySQL&#xff08;四&#xff09;——约束 文章目录 上期文章多表关系一对多&#xff08;多对一&#xff09;多对多多表外键关系可视化一对一 多表查询概述笛卡尔积多表查询分类连接查询 内连接隐式内连接显式内连接 外连接左外连接右外连接 自连接联合查询 union&am…

python-基础篇-变量

文章目录 变量的基本使用目标01. 变量定义1) 变量演练1 —— iPython2) 变量演练 2 —— PyCharm3) 变量演练 3 —— 超市买苹果思考题 02. 变量的类型2.1 变量类型的演练 —— 个人信息2.2 变量的类型2.3 不同类型变量之间的计算1) **数字型变量** 之间可以直接计算2) **字符串…

Python基础第四篇(Python函数)

文章目录 一、函数介绍二、函数的定义三、函数的参数与返回值四、函数说明文档五、函数的嵌套六、变量域七、函数案例1.源代码2.读出结果 在程序设计领域&#xff0c;函数成为一个不可或缺的角色&#xff0c;它们为我们提供了精练、高效和易于管理的编程方式。本篇博客将带您深…

CentOS 7安装Java并配置环境

一、安装Java环境 1、检查系统是否安装Java [rootlocalhost ~]# java -version 2、更新系统软件包 [rootlocalhost ~]# yum update #遇到[y/n],选择y并回车&#xff0c;耐心等待下载完毕&#xff0c;之后系统会自动检验更新的软件包遇到 /var/run/yum.pid 已被锁定 /var/…

【动态规划】【数学】【C++算法】805 数组的均值分割

作者推荐 【动态规划】【数学】【C算法】18赛车 本文涉及知识点 动态规划 数学 805 数组的均值分割 给定你一个整数数组 nums 我们要将 nums 数组中的每个元素移动到 A 数组 或者 B 数组中&#xff0c;使得 A 数组和 B 数组不为空&#xff0c;并且 average(A) average(B)…

nuclei安装;linux上 以及使用教程

kali安装go环境_go1.17 kali安装-CSDN博客Ubuntu完美解决Github网站打不开问题 - 一抹烟霞 - 博客园 (cnblogs.com) All releases - The Go Programming Language 然但是上面两个我似乎都没用到网上的教程 也不适用 一个网不好 一个apt没找到包 然后我先试试了版本 结果 我的…

BGP Origin 属性控制选路试验

一、拓朴图&#xff1a; 二、配置步骤&#xff1a; 1、配置 IP 2、配置 IGP&#xff0c;我们这里用了静态&#xff0c;互相宣告了对端接口和 Loopback 0 3、配置 BGP 4、在 R1 上通过 BGP 宣告 1.1.1.1&#xff0c;查看 R2 的路由&#xff0c;发现两条 1.1.1.1 的路由&#x…

Vue中的组件

在应用程序的开发中&#xff0c;组件是不可缺少的。在Vue的使用中&#xff0c;同样也会用到组件。   vue组件的一般知识点&#xff1a;   1、组件的名字唯一&#xff1b;   2、组件以Html形式书写&#xff1b;   3、组件可以复用&#xff1b;   4、组件可以嵌套&…

postgresql(Windows)初始化数据库教程

省流&#xff1a;本文章内容讲的是如何初始化postgresql数据库环境&#xff0c;前提是已经安装好postgresql数据库&#xff0c;安装步骤参考postgresql&#xff08;Windows&#xff09;安装教程 # 开始&#xff1a;安装postgresql-12.14-2-windows-x64.exe完成后进行初始化数据…

gin中间件篇

1. 全局中间件 所有请求都经过此中间件 package mainimport ("fmt""time""github.com/gin-gonic/gin" )// 定义中间 func MiddleWare() gin.HandlerFunc {return func(c *gin.Context) {t : time.Now()fmt.Println("中间件开始执行了&quo…

《Linux高性能服务器编程》笔记04

Linux高性能服务器编程 本文是读书笔记&#xff0c;如有侵权&#xff0c;请联系删除。 参考 Linux高性能服务器编程源码: https://github.com/raichen/LinuxServerCodes 豆瓣: Linux高性能服务器编程 文章目录 Linux高性能服务器编程第09章I/O复用9.1 select系统调用9.2 po…

JVM之java内存区域[1](程序计数器、栈)

文章目录 版权声明零 运行时数据区一 程序计数器1.1 加载阶段1.2 执行阶段1.3 多线程情况 二 栈2.1 java虚拟机栈2.2 java虚拟机栈帧的组成2.2.1 局部变量表2.2.2 操作数栈2.2.3 帧数据 2.3 栈内存溢出2.4 设置帧大小2.5 本地方法栈 版权声明 本博客的内容基于我个人学习黑马程…

如何快速打开github

作为一个资深码农&#xff0c;怎么能不熟悉全球最大的同性交友社区——github呢&#xff0c;但头疼的是github有时能打开&#xff0c;有时打不开&#xff0c;这是怎么回事&#xff1f; 其实问题出在github.com解析DNS上&#xff0c;并不是需要FQ。下面提供一个方法&#xff0c;…

C++:基于C的语法优化

C&#xff1a;基于C的语法优化 命名空间命名空间域域作用限定符展开命名空间域 输入输出缺省参数全缺省参数半缺省参数 函数重载参数类型不同参数个数不同参数类型的顺序不同 引用基本语法按引用传递返回引用引用与指针的区别 内联函数autoauto与指针和引用结合 范围for循环nul…

官方版2345加速浏览器(好用的浏览器分享)

官方版2345加速浏览器&#xff08;好用的浏览器分享&#xff09; 2345加速浏览器拥有智能拦截骚扰广告&#xff0c;识别欺诈网站&#xff0c;云收藏夹等功能&#xff0c;高速上网、不假死、不卡机&#xff0c;是一款强大的多功能网页浏览器。 使用2345加速浏览器,您可以轻松应对…

DHCP配置(路由器,交换机)

DHCP接口地址池配置 拓扑 PC配置DHCP点击应用。 路由器配置命令 <Huawei>sy Enter system view, return user view with CtrlZ. [Huawei]int g0/0/1[Huawei-GigabitEthernet0/0/1]ip address 10.1.1.1 24[Huawei-GigabitEthernet0/0/1]q[Huawei]dhcp enable Info: T…

【日常聊聊】边缘计算的挑战和机遇

&#x1f34e;个人博客&#xff1a;个人主页 &#x1f3c6;个人专栏&#xff1a; 日常聊聊 ⛳️ 功不唐捐&#xff0c;玉汝于成 目录 前言 正文 边缘计算的挑战和机遇 一&#xff1a;数据安全与隐私保护 二&#xff1a;网络稳定性与可靠性 三&#xff1a;实时性与性能优…

电压检测芯片适用于哪些应用领域?

原文链接&#xff1a; 电压检测芯片适用于哪些应用领域&#xff1f; - 知乎 (zhihu.com) 电压检测基本涉及到电子世界的方方面面。 我上一份工作是做无人机飞控研发&#xff0c;无人机在使用过程中是需要事件监测电压的&#xff0c;还需要针对电压对航行进行预估&#xff0c;…

推荐新版AI智能聊天系统网站源码ChatGPT NineAi

Nine AI.ChatGPT是基于ChatGPT开发的一个人工智能技术驱动的自然语言处理工具&#xff0c;它能够通过学习和理解人类的语言来进行对话&#xff0c;还能根据聊天的上下文进行互动&#xff0c;真正像人类一样来聊天交流&#xff0c;甚至能完成撰写邮件、视频脚本、文案、翻译、代…

SpringMVC(八)处理AJAX请求

一、处理AJAX之准备工作: 首先我们创建一个新的工程: 我们将pom.xml复制过来: <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-in…
最新文章