【Java 并发】AbstractQueuedSynchronizer

1 AQS 简介

在同步组件的实现中, AQS 是核心部分, 同步组件的实现者通过使用 AQS 提供的模板方法实现同步组件语义。
AQS 则实现了对同步状态的管理, 以及对阻塞线程进行排队, 等待通知等一些底层的实现处理。
AQS 的核心也包括了这些方面: 同步队列, 独占式锁的获取和释放, 共享锁的获取和释放以及可中断锁, 超时等待锁获取这些特性的实现,
而这些实际上则是 AQS 提供出来的模板方法, 归纳整理如下:

在 Java 并发编程领域中, AbstractQueuedSynchronizer (AQS) 是一项功能强大且设计精巧的工具。
它为开发人员提供了一种高效的同步机制, 用于安全地控制多线程环境下的资源访问和状态管理。

其本身的设计很简单, 内部维护 1 个 int 的状态和 1 个链表

  1. 一个线程过来获取锁 (本质就是通过 cas 修改 int 的状态), 获取锁成功 (int 状态修改成功), 线程继续执行
  2. 一个线程过来获取锁, 获取锁失败, 则将线程封装为链表的一个节点, 放入链表中, 然后挂起
  3. 获取锁的线程执行完逻辑, 释放锁, 就唤醒链表的头节点, 重新尝试获取锁, 获取成功, 从链表移除, 执行逻辑 (这个过程可能有从外部来的线程进行竞争)

上面是 AQS 非公平锁的大体过程, AQS 本身还提供了公平锁的实现, 为了实现这些锁的逻辑,
AQS 本身还需要支持 同步队列, 独占式锁的获取和释放, 共享锁的获取和释放以及可中断锁, 超时等待锁获取等功能
而这些功能本身复杂度高同时还是高频的逻辑, 所以 AQS 本身借助了模板方法的设计模式, 将常用的逻辑封装起来, 然后让子类去实现自己锁获取释放的逻辑。
大体的逻辑如下:

public abstract class AbstractQueuedSynchronizer {
    
    public void lock() {
        // 1. 尝试获取锁
        // 由子类决定当前线程是否获取锁成功
        if (tryAcquire()) {
            // 获取成功, 直接返回
            return;
        }
        
        // 2. 获取锁失败, 将线程封装为节点, 放入队列, 然后挂起
        // 这些逻辑由 AQS 内部进行实现
        addNodeToQueueAndPark();
    }
    
    // 由子类进行实现
    protected abstract boolean tryAcquire();
}

而这些实际上则是 AQS 提供出来的模板方法, 归纳整理如下:

独占式锁相关的方法

// 独占式获取同步状态, 如果获取失败则插入同步队列进行等待
void acquire(int arg);

// 与 acquire 方法相同, 但在同步队列中进行等待的时候可以检测中断
void acquireInterruptibly(int arg);

// 在 acquireInterruptibly 基础上增加了超时等待功能, 在超时时间内没有获得同步状态返回 false
boolean tryAcquireNanos(int arg, long nanosTimeout);

// 释放同步状态, 该方法会唤醒在同步队列中的下一个节点
boolean release(int arg);

共享式锁相关的方法

// 共享式获取同步状态, 与独占式的区别在于同一时刻有多个线程获取同步状态
void acquireShared(int arg);

// 在 acquireShared 方法基础上增加了能响应中断的功能
void acquireSharedInterruptibly(int arg);

// 在 acquireSharedInterruptibly 基础上增加了超时等待的功能
boolean tryAcquireSharedNanos(int arg, long nanosTimeout);

// 共享式释放同步状态
boolean releaseShared(int arg);

本身了解这些模板方法的逻辑, 就能够很好的理解 AQS 的设计思想, 以及后续的同步组件的实现。

2 AQS 同步队列

AQS 内部核心的 2 个变量, 1 个 int 的状态值, 1 个同步队列。
int 的状态值本身没有多大的问题, 但是链表本身有一点设计, 所以这里对 AQS 的链表做个简单的介绍, 便于后面 AQS 的理解。

在 AQS 有一个静态内部类 Node (只列举了部分重要的属性)

static final class Node {

    /******************** 属性  **************************/

    // 节点状态
    volatile int waitStatus;
    
    // 当前节点的前驱节点
    volatile Node prev;
    
    // 当前节点的后驱节点
    volatile Node next;
    
    // 加入同步队列的线程引用
    volatile Thread thread;
    
    // 等待队列中的下一个节点
    Node nextWaiter;

    /******************** 节点模式  **************************/

    // 标识节点为独占模式
    static final Node SHARED = new Node();

    // 标识节点为独占模式
    static final Node EXCLUSIVE = null;
    
    /******************** 节点状态  **************************/
    
    // 节点从同步队列中取消
    int CANCELLED = 1; 
    
    // 等待唤醒的状态
    int SIGNAL = -1;
    
    // 当前节点进入等待队列中
    int CONDITION = -2;
    
    // 在共享锁的释放中, 会从头节点向后逐个唤醒状态为 signal 的节点的线程, 直到遇到第一个状态为 0 的, 停下来, 会将其从 0 设置为 -3
    // 表示下一次共享式同步状态获取将会无条件传播下去
    int PROPAGATE = -3;
    
    // 初始状态
    int INITIAL = 0;
}

从上面的节点的属性可以知道每个节点有前驱节点 prev 和后驱节点 next, 所以可以知道同步队列的真实实现是一个双向链表。

另外 AQS 自身的属性中有两个重要的成员变量:

public abstract class AbstractQueuedSynchronizer {

    // 同步队列的头节点
    private transient volatile Node head;

    // 同步队列的尾节点
    private transient volatile Node tail;
}

结合 2 个属性, 可以得出 AQS 中维护的同步队列的结构如下:

Alt 'AQS 双向链表的结构'

同时, 我们也可以大概分析出节点加入同步队列的过程:

// 1. 将线程封装为节点
// 2. 将节点设置到双写链表的尾部
// 3. 修改 AQS 的 tail 指向新的节点

退出链表的逆推就行了, 这里就不再赘述了。

3 AQS 中的独占锁实现

3.1 独占锁的获取 - acquire 方法

public final void acquire(int arg) {

    // 调用需要子类实现的 tryAcquire() 方法, 尝试获取锁
        
    // 1. 获取锁成功了, 方法结束
    // 2. 获取锁失败, 将当前线程封装为 Node 节点, 放到等待队列中, 等待唤醒
    // 3. acquireQueued 方法返回 true 表示当前线程需要中断了, 设置线程的中断标识为 true
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        // 设置当前的线程的中断标识为 true 
        selfInterrupt();
}
3.1.1 acquire 中的入队操作 - addWaiter 方法
// 当前使用的为 OpenJdk 11 版本, 可能会有出入
// 入参的 mode 为 Node.EXCLUSIVE 或者 Node.SHARED, 表示当前节点的模式为独占模式或者共享模式
private Node addWaiter(Node mode){

    // 1 将当前线程封装成一个 Node 节点, 这个节点的下一个等待的节点的模式, 既 Node.EXCLUSIVE 或 Node.SHARED
    // 通过这个下一个节点的模式可以间接等待当前节点模式
    Node node = new Node(Thread.currentThread(), mode);

    // 死循环
    for (;;){
        
        // 取到当前链表的尾节点
        Node oldTail = tail;

        // 2 当前尾节点是否为 null
        if (oldTail != null){

            // 2.2 设置新的节点的前驱节点为当前链表的尾节点
            node.setPrevRelaxed(oldTail);

            // 通过 CAS 把当前节点设置为尾节点
            if (compareAndSetTail(oldTail, node)){
                // 旧的尾节点的下一个节点为当前的新节点
                oldTail.next = node;
                return node;
            }
        } else{
            // 2.1 当前同步队列尾节点为 null, 说明当前线程是第一个加入同步队列进行等待的线程, 初始化同步队列
            // 同步队列这时候不为空了, 又执行一次循环
            initializeSyncQueue();
        }
    }
}

private final void initializeSyncQueue() {
    Node h;
    // 创建出一个空的 Node 节点, 通过 CAS 操作尝试将其变为头节点, 再将尾节点的指针指向新创建的节点
    if (HEAD.compareAndSet(this, null, (h = new Node())))
        tail = h;
}

分析可以看上面的注释。
程序的逻辑主要分为两个部分:

  1. 当前同步队列的尾节点为 null, 调用方法 initializeSyncQueue(), 初始出一个头部没有任何信息的链表, 然后回来, 重写回到循环, 再次尝试把当前节点放到链表的尾部
  2. 当前队列的尾节点不为 null, 则采用尾插入 (compareAndSetTail() 方法) 的方式入队
3.1.2 acquire 中的在等待队列唤醒 - acquireQueued 方法

获取独占式锁失败的线程会包装成 Node, 然后插入等待同步队列。
在同步队列中的节点 (线程) 会做什么事情来保证自己能够有机会获得独占式锁了?
带着这样的问题我们就来看看 acquireQueued() 方法, 从方法名就可以很清楚, 这个方法的作用就是排队获取锁的过程, 源码如下:

final boolean acquireQueued(final Node node, int arg) {

    // 是否需要通知当前线程中断
    boolean interrupted = false;
    try {
        for (;;) {
        
            // 获取当前节点的前驱节点
            final Node p = node.predecessor();
            
            // 2 前驱节点是头节点并且成功获取同步状态, 即可以获得独占式锁
            // 在上面创建 addWaiter 方法可以知道, 同步队列为空, 会创建一个默认值的头节点 head, 再把新节点放到这个头节点前面
            // 如果一个节点的前驱节点为头节点, 就可以判断出这个节点为链表中真正数据的第一个节点
            if (p == head && tryAcquire(arg)) {
                // 当前节点设置为 头节点
                // 设置头节点 = node
                // 设置 node.thread = null
                // 设置 node.prev = null
                // 这时候头节点的状态为 signal (-1)
                setHead(node);
                p.next = null;
                return interrupted;
            }
            
            // 3 获取锁失败, 线程进入等待状态等待获取独占式锁
            // shouldParkAfterFailedAcquire 主要是判断当前的节点里面的线程是否可以挂起, 
            // 返回 true 的条件: node 的前驱节点的状态为 signal (等待唤醒的状态), 前驱在等待唤醒, 那么这个节点先挂起
            // parkAndCheckInterrupt 这时会挂起线程, 阻塞住, 直到被唤醒获取中断
            if (shouldParkAfterFailedAcquire(p, node))
                // | 或运算, 只要有一个真, 就是真
                // interrupted 默认为 false, parkAndCheckInterrupt() 返回了 true, 那么 interrupted 就会为 true
                interrupted |= parkAndCheckInterrupt();
        }
    } catch (Throwable t) {
        // 上面的逻辑出现了异常了, 正常的情况就是线程的中断标识为 true, 但是挂起了, 或者挂起中, 被中断了
        // 取消获取锁
        cancelAcquire(node);
        // 需要设置中断标识, 
        if (interrupted)
            selfInterrupt();
        throw t;
    }
}

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {

    int ws = pred.waitStatus;
    // 前驱节点的状态为 signal 
    // signal 表示等待唤醒的状态, 安全的, 当前线程可以挂起
    if (ws == Node.SIGNAL)
        return true;

    // > 0, 状态为取消状态
    if (ws > 0) {    

        // 从当前节点一直往前找到第一个状态不为 CANCELLED (1) 的节点,
        // 也就是找到链表中前面中最接近当前节点, 同时状态不为 CANCELLED (1), 将当前节点放到这个节点的后面, 中间的节点舍弃掉
        // 效果: 从当前节点到第一个不为 CANCELLED 状态的节点之间所有的 CANCELLED 状态的节点都被删除
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);

        pred.next = node;
    } else {
        // 将前驱节点设置为 SIGNAL 状态, 表示节点里面的线程等待唤醒
        pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
    }
    // 返回 false, 表示当前的线程还不能挂起, 再走一遍循环
    return false;
}

private final boolean parkAndCheckInterrupt() {
    // 使当前线程挂起, 直到被唤醒
    LockSupport.park(this);
    // 返回当前线程的中断标识
    return Thread.interrupted();
}

到这里就应该清楚了, acquireQueued() 在自旋过程中主要完成了两件事情:

1 如果当前节点的前驱节点是头节点, 并且再次尝试, 能够获取到同步状态的话 (即获取到锁), 直接返回, 让线程能哥继续执行, 否则进入下一步
2 获取锁失败的话, 会根据前驱节点的状态进行处理 (如下)

2.1 前驱节点的状态为 CANCELLED, 从当前节点一直往前找到第一个不是取消状态的节点, 将当前节点放到其后面, 重新执行 acquireQueued 方法的逻辑
2.2 前驱节点不是 SIGNAL 和 CANCELLED, 将前驱节点设置为 SIGNAL 状态, 重新执行 acquireQueued 方法的逻辑
2.3 前驱节点为 SIGNAL 状态, 把当前线程挂起来。等待被唤醒

到这里可以看出独占锁的特点

  1. 线程进来, 就直接尝试获取同步状态, 获取成功, 直接返回
  2. 获取失败, 就将线程封装为节点, 放入等待链表, 然后挂起
3.1.3 acquire 中等待队列唤醒异常 - cancelAcquire 方法

在上面的 acquireQueued 方法中, 线程的中断标识为 true, 尝试挂起会失败, 这时候会让这个线程取消获取锁的逻辑

private void cancelAcquire(Node node) {

    // 节点为 null, 直接结束
    if (node == null)
        return;

    // 设置节点的线程为 null 
    node.thread = null;

    Node pred = node.prev;
    // 从当前的节点往前找到第一个状态为取消状态 (1) 的节点, 也就是当前链表中最后一个状态为取消状态的节点
    while (pred.waitStatus > 0)
        // 设置当前节点的前缀节点为这个取消状态节点的前驱节点
        node.prev = pred = pred.prev;

    // 这里的 predNext 就是当前链表中最后一个状态为取消状态的节点, 为下面的 cas 使用
    Node predNext = pred.next;   
    // 当前节点的状态设置为取消状态(1)
    node.waitStatus = Node.CANCELLED; 

    // 当前节点就是为节点, 通过 cas 将当前链表的尾节点从当前节点设置为找到的节点
    if (node == tail && compareAndSetTail(node, pred)) {
        // 设置找到的节点的下一个节点从 predNext 设置为 null
        pred.compareAndSetNext(predNext, null);
    } else {
        int ws;

        // 找到的节点不是头节点, 同时节点的线程不为空
        // 加上 节点的状态为 signal 或者 不是取消状态下, 能设置为 signal 状态
        // 后面的判断最少为了确保找到的节点为 signal 状态
        if (pred != head && pred.thread != null && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL)))) {
            // 当前节点的下一个节点
            Node next = node.next;
            // 下一个节点不为空, 同时状态不是取消状态, 将找到的节点的下一个节点设置为当前节点的下一个节点
            if (next != null && next.waitStatus <= 0)
                pred.compareAndSetNext(predNext, next);

        } else {
            // 找到的节点为头节点
            // 找到的节点的线程为空
            // 找到的节点的状态为取消状态
            // 都会执行到这个方法, 唤醒这个节点后面的第一个状态小于等于 0 的线程
            unparkSuccessor(node);
        }

        // 协助 gc
        node.next = node; 
    }
}

private void unparkSuccessor(Node node) {

    int ws = node.waitStatus;
    // 当前的节点状态为不是初始状态或者取消状态, 设置为默认值 0, 初始状态
    if (ws < 0)
        node.compareAndSetWaitStatus(ws, 0);

    // 下一个节点
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {  
        s = null;
        // 从后往前找到, 找到第一个状态不为取消的节点和初始状态的节点
        for (Node p = tail; p != node && p != null; p = p.prev)
            if (p.waitStatus <= 0)
                s = p;
    }  

    // 找到了进行唤醒
    if (s != null)
        LockSupport.unpark(s.thread);    
}

取消获取锁的过程看起来很绕, 实际整理起来很简单

  1. 清除当前节点和它前面的到第一个非取消状态的节点之间所有取消状态的节点
  2. 如果找到的节点为头节点 (注意了头节点为没有任何信息的节点), 尝试从当前节点往后找到第一个不为取消状态的节点, 唤醒它

3.2 独占锁的释放 - release 方法

独占锁的释放就相对来说比较容易理解了, 废话不多说先来看下源码:

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        // 头节点存在, 同时状态不为 0 (初始状态)
        // 判断 != 0 的作用下面分析
        if (h != null && h.waitStatus != 0)
            // 唤醒头节点的下一个节点
            unparkSuccessor(h);
        return true;
    }
    return false;
}

首先获取头节点的后驱节点, 后驱节点存在并且状态不为取消状态, 唤醒这个线程。

如果不存在后驱节点或者后驱节点为取消状态, 会尝试从尾节点往前找到第一个状态不为取消状态和初始状态的节点, 同时这个节点不是当前的节点, 找到了会唤醒这个节点对应的线程。

  1. 假设现在有一个锁, 线程 A 通过 acquire 获取到了锁, 经过上面的上面的代码, 可以知道, 这时没有同步队列还没创建
  2. 线程 B 这时候通过 acquire 尝试获取锁失败了, 会创建出一个链表, 把自己封装为节点 B 放到链表的后面
  3. acquireQueued 方法中的死循环会一直判断到当前的节点的前驱节点为头节点, 会不断重试获取锁, 而不会挂起
  4. 这时候线程 A 要释放锁了, 不需要唤醒头节点的下一个节点, 在第三步中会自己唤醒
  5. 在线程 A 释放锁之前, 又要线程 C 尝试获取锁, 失败了, 拼接到节点 B 的后面, 节点 C, 这时候会被挂起
  6. 第三步中, 线程 B 获取锁成立, 会将 B 节点设置为头节点, 清空里面的前驱节点, 线程信息等, 保留下了状态 signal (-1)
  7. 后面线程 B 释放锁, 状态不为 0 了, 就能进入唤醒 C 的过程
  8. C 唤醒后, 重新执行 acquireQueued 的方法, 这是 C 的前置节点为原本的节点 B, 将自己的节点 C 设置为头节点, 这时候的链表只有一个原本节点 C 的节点了

所以最终的独占锁的处理如下:

  1. 线程获取锁失败, 线程被封装成 Node 进行入队操作, 核心方法在于 addWaiter(), 同时 addWaiter() 会在队列为 null 的时候进行初始化。同时通过不断的 CAS 操作将节点存到当前队列的尾部
  2. 线程获取锁是一个自旋的过程, 当且仅当当前节点的前驱节点是头节点并且成功获得同步状态时, 节点出队即该节点引用的线程获得锁, 否则, 当不满足条件时就会调用 LookSupport.park() 方法使得线程阻塞
  3. 释放锁的时候会唤醒后继节点

总体来说:
在获取同步状态时, AQS 维护一个同步队列, 获取同步状态失败的线程会加入到链表中进行挂起, 从链表移除 (或唤醒) 的条件是前驱节点是头节点并且成功获得了同步状态。在释放同步状态时, 同步器会调用 unparkSuccessor() 方法唤醒后驱节点

3.3 可中断式独占锁的获取 - acquireInterruptibly 方法

我们知道 lock 相较于 synchronized 有一些更方便的特性, 比如能响应中断以及超时等待等特性, 现在我们依旧采用通过学习源码的方式来看看能够响应中断是怎么实现的。
可响应中断式锁可调用方法 lock.lockInterruptibly()。

而该方法其底层会调用 AQS 的 acquireInterruptibly 方法, 源码为:

public final void acquireInterruptibly(int arg) throws InterruptedException {
    // 线程的中断标识为 true, 直接抛出异常
    if (Thread.interrupted())
        throw new InterruptedException();
    // 尝试获取锁失败   
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}

private void doAcquireInterruptibly(int arg) throws InterruptedException {

    // 将节点存入到 同步等待链表
    final Node node = addWaiter(Node.EXCLUSIVE);

    try {
        for (;;) {
            // 获取前驱节点
            final Node p = node.predecessor();

            if (p == head && tryAcquire(arg)) {
                // help GC
                setHead(node);
                p.next = null; 
                return;
            }

            // shouldParkAfterFailedAcquire 判断当前线程是否可以挂起
            // parkAndCheckInterrupt 挂起当前线程, 唤醒后, 判断线程的中断标识是否为 true, 这里为 true, 就会直接抛出异常, 结束死循环, 进入 catch 里面的逻辑
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                throw new InterruptedException();
        }

    } catch (Throwable t) {
        // 取消获取锁
        cancelAcquire(node);
        throw t;
    }

}

与 acquire 方法逻辑几乎一致, 唯一的区别是当 parkAndCheckInterrupt 返回 true, 即线程阻塞时该线程被中断, 代码抛出被中断异常。

3.4 带超时等待时间的独占锁的获取 - tryAcquireNanos 方法

通过调用 lock.tryLock(timeout,TimeUnit) 方式达到超时等待获取锁的效果, 该方法会在三种情况下才会返回:

  1. 在超时时间内, 当前线程成功获取了锁
  2. 当前线程在超时时间内被中断
  3. 超时时间结束, 仍未获得锁返回 false

该方法会调用 AQS 的方法 tryAcquireNanos(), 源码为


public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
    // 线程的中断标识为 true
    if (Thread.interrupted())
        throw new InterruptedException();

    // 先尝试获取锁, 获取锁成功, 直接返回
    // 获取锁失败, 调用实现超时等待的方法
    return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout);
}


private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {

    // 等待的时间小于 0, 直接返回
    if (nanosTimeout <= 0L)
        return false;

    // 得到最终结束等待的时间点    
    final long deadline = System.nanoTime() + nanosTimeout;   

    // 把当前节点加入到等待链表
    final Node node = addWaiter(Node.EXCLUSIVE);


    try {
        for (;;) {

            // 前驱节点为头结点, 同时获取锁成功, 将当前节点置为头结点
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null;
                return true;
            }

            // 1 计算超时时间
            nanosTimeout = deadline - System.nanoTime();

            // 2 判断是否到了结束的时间点
            if (nanosTimeout <= 0L) {
                // 将当前节点从队列里面删除
                cancelAcquire(node);
                return false;
            }
            
            // 3
            // 判断可以挂起线程, 同时设置的超时时间 > SPIN_FOR_TIMEOUT_THRESHOLD = 1000L, 即超时时间大于 1 秒
            // 带超时时间的挂起线程
            if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
                LockSupport.parkNanos(this, nanosTimeout);

            // 线程的中断标识为 true
            if (Thread.interrupted())
                throw new InterruptedException();    

        }
    } catch (Throwable t) {
        // 取消获取锁
        cancelAcquire(node);
        throw t;
    }

}

程序逻辑同独占锁可响应中断式获取基本一致, 唯一的不同在于获取锁失败后, 对超时时间的处理上。
先计算出按照现在时间和超时时间计算出理论上的截止时间 deadline, 然后 deadline - System.nanoTime() 计算出来就是一个负数, 自然而然会在第 2 步中的 if 判断之间返回 false。
如果还没有超时即第 2 步中的 if 判断为 true 时就会继续执行第 3 步。

4 AQS 中的共享锁实现

4.1 共享锁的获取 - acquireShared 方法

public final void acquireShared(int arg) {
    // 调用子类重写的获取共享锁方法
    // 返回了大于 0 的值, 表示获取锁
    // 共享锁的 tryAcquireShared 的返回值, 代表了锁当前有多少个持有者
    // 0 表示无锁状态, 返回 1 表示有 1 个持有者, 返回 2 表示锁已经有 2 个持有者
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

private void doAcquireShared(int arg) {

    // 把节点加入等待链表中
    final Node node = addWaiter(Node.SHARED);

    boolean interrupted = false;
    try {

        for (;;) {
            // 获取前驱节点
            final Node p = node.predecessor();
            // 前驱节点为头节点
            if (p == head) {
                // 获取锁
                int r = tryAcquireShared(arg);
                // 获取锁成功
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null;
                    return;
                }
            }
            // 判断是否可以挂起线程
            if (shouldParkAfterFailedAcquire(p, node))
                interrupted |= parkAndCheckInterrupt();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    } finally {
        if (interrupted)
            selfInterrupt();
    }
}

共享锁的获取逻辑和独占式锁的获取差不多, 这里的线程退出死循环的条件: 当前节点的前驱节点是头节点并且 tryAcquireShared(arg) 返回值大于等于 0 即能成功获得同步状态

和独占锁的获取不同的点在于

  1. 独占锁的获取成功, 只会把自己的节点移除
  2. 共享锁的获取成功, 则复杂了很多, 除了唤醒自己, 还需要把其他共享的节点也唤醒

4.1.1 acquireShard 中在等待代理中唤醒后的行为 - setHeadAndPropagate 方法

private void setHeadAndPropagate(Node node, int propagate) {

    Node h = head;
    // 将当前节点设置为头节点, 清空线程信息
    setHead(node);

    // 持有共享锁的线程数大于 0 
    // 头节点为 null
    // 头节点的状态为不是取消状态
    // 新的头节点为 null
    // 新的头节点的状态不是取消状态
    if (propagate > 0 || h == null || h.waitStatus < 0 ||  (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        // 下一个节点为 null 或者为共享节点
        if (s == null || s.isShared())
            // 尝试是否共享锁
            doReleaseShared();
    }
}

private void doReleaseShared() {
    // 从当前的头节点开始, 向后处理, 把后面所有的状态为 signal 的节点唤醒,
    // 直到遇到第一个节点状态不为 SIGNAL 的, 停止, 同时把这个节点的状态设置为 PROPAGATE
    for (;;) {

        // 获取头节点
        Node h = head;
        // 头节点不为 null 同时 头节点不等于尾节点
        if (h != null && h != tail) {
            // 获取头节点的状态
            int ws = h.waitStatus;
            // 头节点的状态等于 signal 
            if (ws == Node.SIGNAL) {
                // 通过 cas 将头节点从 signal 设置为 0
                if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
                    // 设置失败了, 重新开始循环
                    continue;  
                // 获取后驱节点    
                unparkSuccessor(h);
            // 状态为 0, 则通过 cas 将其从 0 设置为 -3, 设置失败了, 则继续回到头部,
            } else if (ws == 0 && !h.compareAndSetWaitStatus(0, Node.PROPAGATE))
                continue;
        }

        if (h == head)
            break;
    }
}

大体的逻辑如下:

  1. 把当前的节点设置为头节点
  2. 如果头节点的下一个节点为共享节点, 向后处理, 把后面所有的状态为 signal 的节点唤醒, 直到遇到第一个节点状态为 0 的, 停止, 同时把这个节点的状态设置为 PROPAGATE

4.2 共享锁的释放 - releaseShared 方法

public final boolean releaseShared(int arg) {

    // 尝试释放锁
    if (tryReleaseShared(arg)) {
        // 从当前的头节点开始, 向后处理, 把后面所有的状态为 signal 的节点唤醒, 直到遇到第一个节点状态为 0 的, 停止, 同时把这个节点的状态设置为 PROPAGATE
        doReleaseShared();
        return true;
    }
    return false;
}

4.3 共享锁的其他方法

  1. 可中断式的共享锁获取 acquireSharedInterruptibly
  2. 带超时等待时间的共享锁获取 tryAcquireSharedNanos

其实现和独占式锁可中断获取锁以及超时等待的实现几乎一致, 具体的就不再说了

5 参考

深入理解AbstractQueuedSynchronizer(AQS)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/459213.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

2024三掌柜赠书活动第十六期:AI时代Python金融大数据分析实战

目录 前言 AI时代Python金融大数据分析实战 关于《AI时代Python金融大数据分析实战》 编辑推荐 内容简介 作者简介 图书目录 书中前言/序言 《AI时代Python金融大数据分析实战》全书速览 结束语 前言 随着人工智能技术的发展和金融行业的不断进步&#xff0c;大数据分…

Linux下的多线程编程:原理、工具及应用(1)

&#x1f3ac;慕斯主页&#xff1a;修仙—别有洞天 ♈️今日夜电波&#xff1a;Flower of Life—陽花 0:34━━━━━━️&#x1f49f;──────── 4:46 &#x1f504; ◀️ ⏸ ▶️ ☰ …

记一次Spring事务失效的发现与解决过程

一、事情起因是这样的 首先&#xff0c;我们是使用Spring mybatis 进行开发。 某功能在测试环境看到报错日志&#xff0c; 但是数据库里面的数据发生了变化&#xff0c;没有回滚。 执行数据库update 操作的方法上明确有 Transactional(rollbackFor Exception.class)的注解。…

【数学建模】熵权法

之前我们学了层次分析法和topsis法&#xff0c;但是主观性十分强&#xff0c;有没有科学的方法得出权重呢&#xff1f;今天&#xff0c;我们来学习熵权法&#xff01; 基本概念&#xff1a; 熵权法&#xff0c;物理学名词&#xff0c;按照信息论基本原理的解释&#xff0c;信息…

Spring状态机简单实现

一、什么是状态机 状态机&#xff0c;又称有限状态自动机&#xff0c;是表示有限个状态以及在这些状态之间的转移和动作等行为的计算模型。状态机的概念其实可以应用的各种领域&#xff0c;包括电子工程、语言学、哲学、生物学、数学和逻辑学等&#xff0c;例如日常生活中的电…

什么是MVC三层结构

1.MVC&#xff08;三层结构&#xff09; MVC&#xff08;Model-View-Controller&#xff09;是一种常见的软件设计模式&#xff0c;用于将应用程序的逻辑和界面分离成三个不同的组件。每个组件负责特定的任务&#xff0c;从而提高代码的可维护性和可扩展性。 以前的模式。 遇到…

数据集下载

一、数据集下载——谷歌Open images 谷歌Open-image-v6是由谷歌出资标注的一个超大型数据集&#xff0c;数据大小达到600多G&#xff0c;类别达到600多种分类&#xff0c;对于普通研究者而言&#xff0c;根本没办法全部下载下来做测试&#xff0c;也没必要。只需要下载与自己任…

苹果Vision Pro即将在中日韩等九国开卖 | 百能云芯

苹果公司近期透露&#xff0c;首款混合实境&#xff08;MR&#xff09;头盔「Vision Pro」即将在今年晚些时候推向更多国家销售。虽然苹果尚未公布具体的销售细节&#xff0c;但根据最新的外媒报道&#xff0c;这款高科技产品可能即将在中国、日本、韩国等九个国家开卖&#xf…

三翼鸟门店转型升级:首批260家线下店入驻天猫喵店

作者 | 曾响铃 文 | 响铃说 “资深玩家教你如何做全屋智能家居”、“一条视频给你讲清楚智能家居的设计思路”……在各大网站上搜索“智能家居”&#xff0c;就会出现类似的标题。区别于传统家居博主&#xff0c;他们主要通过分享智能家居体验&#xff0c;讲解智能家居设计等…

Hadoop大数据应用:Linux 部署 HDFS 分布式集群

目录 一、实验 1.环境 2.Linux 部署 HDFS 分布式集群 3.Linux 使用 HDFS 文件系统 二、问题 1.ssh-copy-id 报错 2. 如何禁用ssh key 检测 3.HDFS有哪些配置文件 4.hadoop查看版本报错 5.启动集群报错 6.hadoop 的启动和停止命令 7.上传文件报错 8.HDFS 使用命令 一…

【JetsonNano】onnxruntime-gpu 环境编译和安装,支持 Python 和 C++ 开发

1. 设备 2. 环境 sudo apt-get install protobuf-compiler libprotoc-devexport PATH/usr/local/cuda/bin:${PATH} export CUDA_PATH/usr/local/cuda export cuDNN_PATH/usr/lib/aarch64-linux-gnu export CMAKE_ARGS"-DONNX_CUSTOM_PROTOC_EXECUTABLE/usr/bin/protoc&qu…

SAT和SMT介绍及求解器使用

一、SAT 1、介绍 &#xff08;1&#xff09;定义 SAT即命题逻辑公式的可满足性问题/布尔可满足性问题。即给定一个与或非和变量组成的命题公式&#xff0c;判断是否存在一些结果使得这个公式成立 它是第一个被确认为NP完全的问题。 输入&#xff1a;析取范式&#xff08;C…

新站上线了

新站上线了 由于本人自身的向往&#xff0c;以及粉丝朋友的广大呼吁。我终于抽出时间给我的新站上线了。感谢各位粉丝好友的关注。欢迎大家前来踩站~。 新站地址&#xff1a;https://jhj-coding.top/ 今后会同时维护CSDN与jhj-coding哦&#xff01;期待新站可以给大家带来更好…

穿越半个世纪,探索中国数据库的前世今生

引言 在数字化潮流席卷全球的今天&#xff0c;数据库作为 IT 技术领域的“活化石”&#xff0c;已成为数字经济时代不可或缺的基础设施。那么&#xff0c;中国的数据库技术发展经历了怎样的历程&#xff1f;我们是如何在信息技术的洪流中逐步建立起自己的数据管理帝国的呢&…

Vue3基础笔记(1)模版语法 属性绑定 渲染

Vue全称Vue.js是一种渐进式的JavaScript框架&#xff0c;采用自底向上增量开发的设计&#xff0c;核心库只关注视图层。性能丰富&#xff0c;完全有能力驱动采用单文件组件和Vue生态系统支持的库开发的复杂单页应用&#xff0c;适用于场景丰富的web前端框架。灵活性和可逐步集成…

Modbus -tcp协议使用第二版

1.1 协议描述 1.1.1 总体通信结构 MODBUS TCP/IP 的通信系统可以包括不同类型的设备&#xff1a; &#xff08;1&#xff09;连接至 TCP/IP 网络的 MODBUS TCP/IP 客户机和服务器设备&#xff1b; &#xff08;2&#xff09;互连设备&#xff0c;例如&#xff1a;在 TCP/IP…

【消息队列开发】 实现内存加载

文章目录 &#x1f343;前言&#x1f333;实现思路&#x1f6a9;读取消息长度&#x1f6a9;读取相应长度的消息&#x1f6a9;进行反序列化&#x1f6a9;判定是否有效&#x1f6a9;加入有效消息&#x1f6a9;收尾工作&#x1f6a9;代码实现 ⭕总结 &#x1f343;前言 本次开发目…

微信小程序基础面试题

1、简述微信小程序原理 小程序本质就是一个单页面应用&#xff0c;所有的页面渲染和事件处理&#xff0c;都在一个页面内进行&#xff0c;但又可以通过微信客户端调用原生的各种接口&#xff1b;它的架构&#xff0c;是数据驱动的架构模式&#xff0c;它的UI和数据是分离的&am…

【UE5】动画混合空间的基本用法

项目资源文末百度网盘自取 什么是动画混合空间 混合空间分为两种: 通过一个数值控制通过两个数值控制 下面通过演示让大家更直观地了解 在Character文件夹中单击右键,选择动画(Animation),选择旧有的混合空间1D 然后选择骨骼&#xff08;动画是基于骨骼显示的,所以需要选择…

杂七杂八111

MQ 用处 一、异步。可提高性能和吞吐量 二、解耦 三、削峰 四、可靠。常用消息队列可以保证消息不丢失、不重复消费、消息顺序、消息幂等 选型 一Kafak:吞吐量最大&#xff0c;性能最好&#xff0c;集群高可用。缺点&#xff1a;会丢数据&#xff0c;功能较单一。 二Ra…