前言
AbstractQueuedSynchronizer是抽象同步队列,其是实现同步机器的基础组件,并发包中的锁的底层就是使用AQS实现的。
AQS中 维护了一个volatile int state(代表共享资源)和一个FIFO线程等待队列(多线程争用资源被阻塞时会进入此队列)。
这里volatile能够保证多线程下的可见性,当state=1则代表当前对象锁已经被占有,其他线程来加锁时则会失败,加锁失败的线程会被放入一个FIFO的等待队列中,并且会被UNSAFE.park()操作挂起,等待其他获取锁的线程释放锁才能够被唤醒。
另外state的操作都是通过CAS来保证其并发修改的安全性。
一、AQS中的关键成员变量
- state
- 在AQS中,维护了一个单一变量state,对于不同的实现其有不同的意义:
- 在ReentrantLock中,state表示重入式锁的可重入次数
- 在ReentrantReadWriteLock中,state的高16位用于表示读锁的可获取次数,低16位用于表示写锁的可重入次数。
- exclusiveOwnerThread
- 继承自AbstractOwnableSynchronizer,用于指明当前独占线程。
- head、tail
- 维护了一个队列,分别指向首尾节点
- Node
- Node节点内部的SHARED用来标记该线程是在获取共享资源时被阻塞挂起放入AQS队列的,EXCLUSIVE用来标识该线程是获取独占资源时被阻塞挂起放入AQS队列的。
- 在Node节点内部有一个成员变量waitStatus记录当前线程等待状态,可以为:
- 1:CANCELLED(线程被取消了)
- -1:SIGNAL(线程需要唤醒)
- -2:CONDITION(线程在条件队列里等待)
- -3:PROPAGATE(释放资源时需要通知其他节点)
- ConditionObject
- ConditionObject和Node一样是AQS的内部类。它用来结合锁实现线程同步,其可以访问AQS的内部变量(state和AQS阻塞队列)。
- ConditionObject是条件变量,每个条件变量对应一个条件队列,我们可以看到ConditionObject中有两个指针,分别指向条件队列的队尾和队头。条件队列用来存放调用条件变量的await方法后被阻塞的线程。
二、线程中断相关的三个方法
三、Unsafe与LockSupport
Unsafe
- CAS的全称是Compare-And-Swap,它是一条CPU并发原语。
- 它的功能是判断内存某个位置是否是预期值,如果是则更改为新的值,这个过程是原子性的
- CAS并发原语在 java的体现就是sun.mic.Unsafe类个各个方法,调用Unsafe类的方法,JVM会帮助我们实现CAS汇编指令。这是一个完全依赖于硬件的功能,通过它实现原子性操作。由于CAS是一种系统原语,由若干指令组成,该原语执行必须连续的不许中断。
这里设置了静态代码块提前获取了state、head、tail、waitStatus、next四个参数在对象内存中的偏移量。
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;
static {
try {
stateOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
headOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
waitStatusOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("waitStatus"));
nextOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("next"));
} catch (Exception ex) { throw new Error(ex); }
}
/**
* CAS head field. Used only by enq.
*/
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
/**
* CAS tail field. Used only by enq.
*/
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
/**
* CAS waitStatus field of a node.
*/
private static final boolean compareAndSetWaitStatus(Node node,
int expect,
int update) {
return unsafe.compareAndSwapInt(node, waitStatusOffset,
expect, update);
}
/**
* CAS next field of a node.
*/
private static final boolean compareAndSetNext(Node node,
Node expect,
Node update) {
return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
}
LockSupport
LockSupport是用来创建锁和其他同步类的基本线程阻塞原语。简而言之,当调用LockSupport.park时,表示当前线程将会等待,直至获得许可,当调用LockSupport.unpark时,必须把等待获得许可的线程作为参数进行传递,好让此线程继续运行。
park
函数,阻塞线程,并且该线程在下列情况发生之前都会被阻塞: ① 调用unpark函数,释放该线程的许可。② 该线程被中断。③ 设置的时间到了。并且,当time为绝对时间时,isAbsolute为true,否则,isAbsolute为false。当time为0时,表示无限等待,直到unpark发生。unpark
函数,释放线程的许可,即激活调用park后阻塞的线程。这个函数不是安全的,调用这个函数时要确保线程依旧存活。
public class LockSupportDemo {
public static void main(String[] args) {
Thread A = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("线程A被LockSupport.park()阻塞");
LockSupport.park();
System.out.println("线程A被线程B LockSupport.unpark()唤醒");
}
},"A");
A.start();
Thread B = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("线程B唤醒线程A");
// 唤醒指定线程t,也就是A
LockSupport.unpark(A);
}
},"B")
B.start();
}
}
结果:
线程A被LockSupport.park()阻塞
线程B唤醒线程A
线程A被线程B LockSupport.unpark()唤醒
四、核心源码
以ReentrantLock为例进行讲解,AQS是典型的模板方法的实现,所以AQS对外暴露了多个个抽象方法(tryAcquire、tryRelease等等)需要子类进行实现。
ReentrantLock的lock方法实际上调用了sync的lock方法,而sync继承了AQS,同时针对公平策略和非公平策略有不同的实现。这里我们主要看针对非公平锁NonfairSync的实现。
lock()
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 第一次加锁
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 持有锁的线程重复加锁
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
// 获取锁失败,再次判断队列是否初始化
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
// 第一次执行,也就是head和tail两个指针都为null,会初始化两个Node
for (;;) {
Node t = tail;
// 初始化队列,设置一个空Node,并将head与tail两个指针同时指向该节点
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
// 队列已经初始化完成,则将该节点插入队列尾部
} else {
node.prev = t;
// 注意,此时t仍然指向,为尾节点的上一个节点
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 如果当前节点的前置节点是头节点,则意味着本次入队操作是第一次
final Node p = node.predecessor();
// 如果是第一次入队,则再次尝试获取state
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 非第一次入队/第一次入对的第二次循环
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
因为lock方法是不可中断的,所以从lock方法中进来构建起来的同步队列不会有CANCELLED状态。CONDITION用于条件队列当中。PROPAGETE是用于共享模式下的状态。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// Acquire失败以后是否需要挂起,true:需要-false:不需要
// 针对ReentrantLock,这里指挥判断SIGNAL
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
// ws > 0 = CANCELLED
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
整体流程:
unLock()
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
// 标识锁是否释放
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
这里线程二被唤醒以后将继续执行acquireQueued
方法,判断线程二的前置节点是否为head,如果是则继续使用tryAcquire()方法来尝试获取锁,其实就是使用CAS操作来修改state值,如果修改成功则代表获取锁成功。接着将线程二设置为head节点,然后空置之前的head节点数据,被空置的节点数据等着被垃圾回收。
在线程二释放锁以后,这个时候CLH队列中就只剩下线程三:
五、知识拓展
公平锁与非公平锁
非公平锁执行原理:
公平锁执行原理:
参考内容:
- https://www.bilibili.com/video/BV1vM411r7Bt
- https://juejin.cn/post/6844904146127044622