如何理解AQS

AQS核心数据结构

AQS内部主要维护了一个FIFO(先进先出)的双向链表。

AQS数据结构原理

AQS内部维护的双向链表中的各个节点分别指向直接的前驱节点和直接的后续节点。所以,在AQS内部维护的双向链表可以从其中的任意一个节点遍历前驱结点和后继结点。
链表中的每个节点都是对线程的封装,在并发场景下,如果某个线程竞争锁失败,就会被封装成一个Node节点加入AQS队列的末尾。当获取到锁的线程释放锁后,会从AQS队列中唤醒一个被阻塞的线程。同时,在AQS中维护了一个使用volatile修饰的变量state来标识相应的状态。AQS内部的数据结构如下图:
在这里插入图片描述从上图可以看出,在AQS内部的双向链表中,每个节点都是对一个现成的封装。同时,存在一个头结点指针指向链表的头部,存在一个尾结点指向链表的尾部。头结点指针和尾指针会通过CAS操作改变链表节点的指向。
另外,头结点指针指向的节点封装的线程会占用资源,同时会通过CAS的方式更新AQS中的state变量。链表中其他节点的线程未竞争到资源,不会通过CAS操作更新state资源。

AQS内部队列模式

从本质上讲,AQS内部实现了两个队列,一个是同步队列,另一个是条件队列。同步队列的结构如下:
在这里插入图片描述
在同步队列中,如果当前线程获取资源失败,就会通过addWaiter()方法将当前线程放入队列的尾部,并且保持自旋的状态,不判断自己所在的节点是否是队列的头结点。如果自己所在的节点是头结点,那么此时不断判断尝试获取资源,如果获取资源成功,则通过acquire()方法退出同步队列。
AQS同步条件队列的结构如下图:
在这里插入图片描述
AQS中的条件队列就是为Lock锁实现的一个基础同步器,只有在使用了Condition时才会存在条件队列,并且一个线程可能存在多个条件队列。

AQS底层锁的支持

AQS底层支持独占锁和共享锁两种模式。其中,独占锁同一时刻只能被一个线程占用,例如,基于AQS实现的ReentrantLock锁。共享锁则在同一时刻可以被多个线程占用,例如,基于AQS实现的CountDownLatch和Semaphore等。基于AQS实现的ReadWriteLock则同时实现了独占锁和共享锁两种模式。

核心状态位

在AQS中维护了一个volatile修饰的核心状态标识state,用于标识锁的状态,如下所示:

private volatile int state

state标量使用volatile修饰,所以能够保证可见性,所以能够保证可见性,当任意改变了state变量的值后,其他线程能够立刻读取到state变量的最新值。
AQS针对state变量提供了getState()方法来读取state变量的值,提供了setState()方法来设置state变量的值。由于setState()方法无法保证原子性,所以,AQS中又提供了compareAndSetState()方法保证修改state变量的原子性。AQS中提供了getState(),setState()和compareAndSetState()方法如下:

    /**
     * Returns the current value of synchronization state.
     * This operation has memory semantics of a {@code volatile} read.
     * @return current state value
     */
    protected final int getState() {
        return state;
    }

    /**
     * Sets the value of synchronization state.
     * This operation has memory semantics of a {@code volatile} write.
     * @param newState the new state value
     */
    protected final void setState(int newState) {
        state = newState;
    }

    /**
     * Atomically sets synchronization state to the given updated
     * value if the current state value equals the expected value.
     * This operation has memory semantics of a {@code volatile} read
     * and write.
     *
     * @param expect the expected value
     * @param update the new value
     * @return {@code true} if successful. False return indicates that the actual
     *         value was not equal to the expected value.
     */
    protected final boolean compareAndSetState(int expect, int update) {
        return U.compareAndSetInt(this, STATE, expect, update);
    }

核心节点类

AQS实现的独占锁和共享锁模式都是在其静态内部类Node中定义的。静态内部类Node的源码如下(java17版本的Node与1.8的不太一样,17版本的是abstract static final class Node{}):

static final class Node {
        /** Marker to indicate a node is waiting in shared mode */
        static final Node SHARED = new Node();
        /** Marker to indicate a node is waiting in exclusive mode */
        static final Node EXCLUSIVE = null;

        /** waitStatus value to indicate thread has cancelled */
        static final int CANCELLED =  1;
        /** waitStatus value to indicate successor's thread needs unparking */
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
        static final int PROPAGATE = -3;

        /**
         * Status field, taking on only the values:
         *   SIGNAL:     The successor of this node is (or will soon be)
         *               blocked (via park), so the current node must
         *               unpark its successor when it releases or
         *               cancels. To avoid races, acquire methods must
         *               first indicate they need a signal,
         *               then retry the atomic acquire, and then,
         *               on failure, block.
         *   CANCELLED:  This node is cancelled due to timeout or interrupt.
         *               Nodes never leave this state. In particular,
         *               a thread with cancelled node never again blocks.
         *   CONDITION:  This node is currently on a condition queue.
         *               It will not be used as a sync queue node
         *               until transferred, at which time the status
         *               will be set to 0. (Use of this value here has
         *               nothing to do with the other uses of the
         *               field, but simplifies mechanics.)
         *   PROPAGATE:  A releaseShared should be propagated to other
         *               nodes. This is set (for head node only) in
         *               doReleaseShared to ensure propagation
         *               continues, even if other operations have
         *               since intervened.
         *   0:          None of the above
         *
         * The values are arranged numerically to simplify use.
         * Non-negative values mean that a node doesn't need to
         * signal. So, most code doesn't need to check for particular
         * values, just for sign.
         *
         * The field is initialized to 0 for normal sync nodes, and
         * CONDITION for condition nodes.  It is modified using CAS
         * (or when possible, unconditional volatile writes).
         */
        volatile int waitStatus;

        /**
         * Link to predecessor node that current node/thread relies on
         * for checking waitStatus. Assigned during enqueuing, and nulled
         * out (for sake of GC) only upon dequeuing.  Also, upon
         * cancellation of a predecessor, we short-circuit while
         * finding a non-cancelled one, which will always exist
         * because the head node is never cancelled: A node becomes
         * head only as a result of successful acquire. A
         * cancelled thread never succeeds in acquiring, and a thread only
         * cancels itself, not any other node.
         */
        volatile Node prev;

        /**
         * Link to the successor node that the current node/thread
         * unparks upon release. Assigned during enqueuing, adjusted
         * when bypassing cancelled predecessors, and nulled out (for
         * sake of GC) when dequeued.  The enq operation does not
         * assign next field of a predecessor until after attachment,
         * so seeing a null next field does not necessarily mean that
         * node is at end of queue. However, if a next field appears
         * to be null, we can scan prev's from the tail to
         * double-check.  The next field of cancelled nodes is set to
         * point to the node itself instead of null, to make life
         * easier for isOnSyncQueue.
         */
        volatile Node next;

        /**
         * The thread that enqueued this node.  Initialized on
         * construction and nulled out after use.
         */
        volatile Thread thread;

        /**
         * Link to next node waiting on condition, or the special
         * value SHARED.  Because condition queues are accessed only
         * when holding in exclusive mode, we just need a simple
         * linked queue to hold nodes while they are waiting on
         * conditions. They are then transferred to the queue to
         * re-acquire. And because conditions can only be exclusive,
         * we save a field by using special value to indicate shared
         * mode.
         */
        Node nextWaiter;

        /**
         * Returns true if node is waiting in shared mode.
         */
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        /**
         * Returns previous node, or throws NullPointerException if null.
         * Use when predecessor cannot be null.  The null check could
         * be elided, but is present to help the VM.
         *
         * @return the predecessor of this node
         */
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {    // Used to establish initial head or SHARED marker
        }

        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

通过上述代码可以看出,静态内部类Node是一个双向链表,链表中的每个节点都存在一个指向前驱节点的指针prev和一个指向直接后继节点的指针next,每个节点中都保存了当前的状态waitStatus和当前线程thread。并且在Node类中通过SHARED和EXCLUSIVE将其定义成共享锁和独占锁,如下所示:

        /** Marker to indicate a node is waiting in shared mode */
        //标识当前节点为共享模式
        static final Node SHARED = new Node();
        /** Marker to indicate a node is waiting in exclusive mode */
        //标识当前节点为独占模式
        static final Node EXCLUSIVE = null;

在Node类中定义了4个常量

        /** waitStatus value to indicate thread has cancelled */
        static final int CANCELLED =  1;
        /** waitStatus value to indicate successor's thread needs unparking */
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
        static final int PROPAGATE = -3;
  • CANCELLED :表示当前节点中的线程已被取消
  • SIGNAL :表示后继节点中的线程处于等待状态,需要被唤醒
  • CONDITION :表示当前节点中的线程在等待某个条件,也就是当前节点处于condition队列中。
  • PROPAGATE :表示在当前场景下能够执行后续的acquireShared操作
    另外,在Node类中存在一个volatile修饰的成员变量waitStatus,如下所示:
 volatile int waitStatus;

waitStatus的取值就是上面的4个常量值,在默认情况下,waitStatus的取值为0,表示当前节点在sync队列中,等待获取锁。

独占锁模式

在AQS中,独占锁模式比较常用,使用范围也比较广泛,它的一个典型实现就是reentrantLock。独占锁的加锁和解锁都是通过互斥实现的。

独占模式加锁流程

在AQS中,独占模式中加锁的核心入口就是acquire()方法,如下所示

    /**
     * Acquires in exclusive mode, ignoring interrupts.  Implemented
     * by invoking at least once {@link #tryAcquire},
     * returning on success.  Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquire} until success.  This method can be used
     * to implement method {@link Lock#lock}.
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     */
    public final void acquire(long arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

当某个线程调用acquire()方法获取独占锁时,在acquire()方法中会首先调用tryAcquire()方法尝试获取锁资源,tryAcquire()方法在AQS中没有具体的实现,只是简单的抛出了UnsupportedOperationException异常,具体的逻辑由AQS的子类实现,如下所示:

    /**
     * Attempts to acquire in exclusive mode. This method should query
     * if the state of the object permits it to be acquired in the
     * exclusive mode, and if so to acquire it.
     *
     * <p>This method is always invoked by the thread performing
     * acquire.  If this method reports failure, the acquire method
     * may queue the thread, if it is not already queued, until it is
     * signalled by a release from some other thread. This can be used
     * to implement method {@link Lock#tryLock()}.
     *
     * <p>The default
     * implementation throws {@link UnsupportedOperationException}.
     *
     * @param arg the acquire argument. This value is always the one
     *        passed to an acquire method, or is the value saved on entry
     *        to a condition wait.  The value is otherwise uninterpreted
     *        and can represent anything you like.
     * @return {@code true} if successful. Upon success, this object has
     *         been acquired.
     * @throws IllegalMonitorStateException if acquiring would place this
     *         synchronizer in an illegal state. This exception must be
     *         thrown in a consistent fashion for synchronization to work
     *         correctly.
     * @throws UnsupportedOperationException if exclusive mode is not supported
     */
    protected boolean tryAcquire(long arg) {
        throw new UnsupportedOperationException();
    }

当tryAcquire()方法返回false时,首先会调用addWaiter()方法将当前线程封装成独占式模式的节点,添加到AQS的队列尾部。addWaiter()方法的源码如下:

    /**
     * Creates and enqueues node for current thread and given mode.
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

在addWaiter()方法中,当前线程会被封装成独占模式的Node节点,Node节点被尝试放入队列尾部,如果放入成功,则通过CAS操作修改Node节点与前驱节点的指向关系。如果Node节点放入队列尾部失败或者CAS操作失败,则调用end()方法处理Node节点。

    /**
     * Inserts node into queue, initializing if necessary. See picture above.
     * @param node the node to insert
     * @return node's predecessor
     */
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

在end()方法中,Node节点通过CAS自旋的方式被添加到队列尾部,直到添加成功为止。具体的实现方式是判断队列是否为空,如果队列为空,则创建一个空节点作为head节点,然后将tail指向head节点。在下次自旋时,就会满足队列不为空的条件,通过CAS方式将Node节点放入队列尾部。
此刻,回到acquire()方法,当通过调用addWaiter()成功将当前线程封装成独占模式的Node节点放入队列后,会调用acquireQueued()方法在等待队列中排队。acquireQueued()方法的源码如下:

    /**
     * Acquires in exclusive uninterruptible mode for thread already in
     * queue. Used by condition wait methods as well as acquire.
     *
     * @param node the node
     * @param arg the acquire argument
     * @return {@code true} if interrupted while waiting
     */
    final boolean acquireQueued(final Node node, long arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

在acquireQueued()方法中,首先定义一个failed变量来标识获取资源是否失败,默认值为true,表示获取资源失败。然后,定义一个表示当前线程是否被中断过的标识interrupted,默认值为false,表示没有被中断过。
最后,进入一个自旋逻辑,获取当前Node节点的前驱节点,如果当前Node节点的前驱节点是head节点,便是当前Node节点可以尝试获取资源。如果当前节点获取资源成功,则将head节点指向Node节点。也就是说,head节点指向的Node节点就是获取到资源的节点或者为null。
在setHead()方法中,当前节点的prev指针会被设置为null,随后,当前Node节点的前驱节点的next指针被设置为null,表示head节点出队列,整个操作成功后会返回等待过程是否被中断过的标识。
如果当前节点的前驱节点不是head,则调用shouldParkAfterFailedAcquire()方法判断当前线程是否可以进入waitting状态。如果可以进入阻塞状态,则进入阻塞状态直接调用LockSupport的unpark()方法唤醒当前线程。
shouldParkAfterFailedAcquire()方法的源码如下:

    /**
     * Checks and updates status for a node that failed to acquire.
     * Returns true if thread should block. This is the main signal
     * control in all acquire loops.  Requires that pred == node.prev.
     *
     * @param pred node's predecessor holding status
     * @param node the node
     * @return {@code true} if thread should block
     */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

在shouldParkAfterFailedAcquire()方法中,先获取当前节点的前驱节点的状态,如果前驱结点的状态为SIGNAL(-1),则直接返回true。如果前驱结点的状态大于0,则当前节点一直向前移动,直到找到一个waitStatus状态小于或等于0的节点,排在这个节点的后面。
在acquireQueued()方法中,如果shouldParkAfterFailedAcquire()方法返回true,则调用parkAndCheckInterrupt()方法阻塞当前线程。parkAndCheckInterrupt()方法的源码如下:

    /**
     * Convenience method to park and then check if interrupted
     *
     * @return {@code true} if interrupted
     */
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

在parkAndCheckInterrupt()方法中,通过LockSupport的park()方法阻塞线程。至此,在独占模式下,整个加锁流程分析完毕。

独占模式解锁流程

在独占锁模式中,释放锁的核心入口是release()方法,如下所示:

 /**
     * Releases in exclusive mode.  Implemented by unblocking one or
     * more threads if {@link #tryRelease} returns true.
     * This method can be used to implement method {@link Lock#unlock}.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryRelease} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @return the value returned from {@link #tryRelease}
     */
    public final boolean release(long arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

在relase()方法中,会先调用tryRelease()方法尝试释放锁,tryRelease()方法在AQS中同样没有具体的实现逻辑,只是简单的抛出了UnsupportedOperationException异常,具体的逻辑由AQS的子类实现,如下所示:

    /**
     * Attempts to set the state to reflect a release in exclusive
     * mode.
     *
     * <p>This method is always invoked by the thread performing release.
     *
     * <p>The default implementation throws
     * {@link UnsupportedOperationException}.
     *
     * @param arg the release argument. This value is always the one
     *        passed to a release method, or the current state value upon
     *        entry to a condition wait.  The value is otherwise
     *        uninterpreted and can represent anything you like.
     * @return {@code true} if this object is now in a fully released
     *         state, so that any waiting threads may attempt to acquire;
     *         and {@code false} otherwise.
     * @throws IllegalMonitorStateException if releasing would place this
     *         synchronizer in an illegal state. This exception must be
     *         thrown in a consistent fashion for synchronization to work
     *         correctly.
     * @throws UnsupportedOperationException if exclusive mode is not supported
     */
    protected boolean tryRelease(long arg) {
        throw new UnsupportedOperationException();
    }

在release()方法中,如果tryRelease()方法返回true,则会先获取head节点,当head节点不为空,并且head节点的waitStatus状态不为0时,会调用unparkSuccessor()方法,并将head节点传入方法中。unparkSuccessor()方法的远吗如下:

    /**
     * Wakes up node's successor, if one exists.
     *
     * @param node the node
     */
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

unparkSuccessor()方法的主要逻辑是唤醒队列中最前面的线程。这里需要结合acquireQueued()方法理解,当线程被唤醒后,会进入acquireQueued()方法中的if(p==head && tryAcquier(arg))逻辑判断,当条件成立是,被唤醒的线程会将自己所在的节点设置为head,表示已经获取到资源,此时,acquire()方法也执行完毕了。

共享锁模式

在AQS中,共享锁模式下的加锁和释放锁操作和独占锁不同,接下来,就简单介绍下AQS共享锁模式下的加锁和释放锁的流程。

共享模式下加锁流程

在AQS中,共享模式下的加锁操作核心入口方法是acquireShared(),如下所示:

    /**
     * Acquires in shared mode, ignoring interrupts.  Implemented by
     * first invoking at least once {@link #tryAcquireShared},
     * returning on success.  Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquireShared} until success.
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquireShared} but is otherwise uninterpreted
     *        and can represent anything you like.
     */
    public final void acquireShared(long arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

在acquireShared()方法中,会先调用tryAcquireShared()方法尝试获取共享资源,tryAcquireShared()方法在AQS中并没有具体的实现逻辑,只是简单的抛出UnsupportedOperationException异常,具体的逻辑由AQS的子类实现,如下所示:

    /**
     * Attempts to acquire in shared mode. This method should query if
     * the state of the object permits it to be acquired in the shared
     * mode, and if so to acquire it.
     *
     * <p>This method is always invoked by the thread performing
     * acquire.  If this method reports failure, the acquire method
     * may queue the thread, if it is not already queued, until it is
     * signalled by a release from some other thread.
     *
     * <p>The default implementation throws {@link
     * UnsupportedOperationException}.
     *
     * @param arg the acquire argument. This value is always the one
     *        passed to an acquire method, or is the value saved on entry
     *        to a condition wait.  The value is otherwise uninterpreted
     *        and can represent anything you like.
     * @return a negative value on failure; zero if acquisition in shared
     *         mode succeeded but no subsequent shared-mode acquire can
     *         succeed; and a positive value if acquisition in shared
     *         mode succeeded and subsequent shared-mode acquires might
     *         also succeed, in which case a subsequent waiting thread
     *         must check availability. (Support for three different
     *         return values enables this method to be used in contexts
     *         where acquires only sometimes act exclusively.)  Upon
     *         success, this object has been acquired.
     * @throws IllegalMonitorStateException if acquiring would place this
     *         synchronizer in an illegal state. This exception must be
     *         thrown in a consistent fashion for synchronization to work
     *         correctly.
     * @throws UnsupportedOperationException if shared mode is not supported
     */
    protected long tryAcquireShared(long arg) {
        throw new UnsupportedOperationException();
    }

tryAcquireShared()方法的返回值存在如下几种情况。

  • 返回负数:表示获取资源失败
  • 返回0:表示获取资源成功,但是没有剩余资源
  • 返回正数:表示获取资源成功,还有剩余资源
    当tryAcquireShared()方法获取资源失败时,在acquireShared()方法中会调用doAcquireShared()方法。doAcquireShared()方法的源码如下:
    /**
     * Acquires in shared uninterruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireShared(long arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    long r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

doAcquireShared()方法的主要逻辑就是将当前线程放入队列的尾部并阻塞,直到有其他线程释放资源并唤醒当前线程,当前线程在获取到指定量的资源后返回。
在doAcquireShared()方法中,如果当前节点的前驱节点是head节点,则尝试获取资源;如果获取资源成功,则调用setHeadAndPropagate()方法将head指向当前节点;同时如果还有剩余资源,则继续唤醒队列中后面的线程。setHeadAndPropagate()方法的源码如下:

 /**
     * Sets head of queue, and checks if successor may be waiting
     * in shared mode, if so propagating if either propagate > 0 or
     * PROPAGATE status was set.
     *
     * @param node the node
     * @param propagate the return value from a tryAcquireShared
     */
    private void setHeadAndPropagate(Node node, long propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        /*
         * Try to signal next queued node if:
         *   Propagation was indicated by caller,
         *     or was recorded (as h.waitStatus either before
         *     or after setHead) by a previous operation
         *     (note: this uses sign-check of waitStatus because
         *      PROPAGATE status may transition to SIGNAL.)
         * and
         *   The next node is waiting in shared mode,
         *     or we don't know, because it appears null
         *
         * The conservatism in both of these checks may cause
         * unnecessary wake-ups, but only when there are multiple
         * racing acquires/releases, so most need signals now or soon
         * anyway.
         */
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

在setHeadAndPropagate()方法中,首先将head节点赋值给临时节点h,并将head指向当前节点,如果资源还有剩余,则继续唤醒队列中后面的线程。

共享模式释放资源流程

在共享模式下,释放锁的核心入口方法是releaseShared(),如下所示

 /**
     * Releases in shared mode.  Implemented by unblocking one or more
     * threads if {@link #tryReleaseShared} returns true.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryReleaseShared} but is otherwise uninterpreted
     *        and can represent anything you like.
     * @return the value returned from {@link #tryReleaseShared}
     */
    public final boolean releaseShared(long arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return fal

在releaseShared()方法中,会先调用tryReleaseShared()方法尝试释放资源,tryReleaseShared()方法在AQS中并没有具体的逻辑实现,只是简单的抛出了UnsupportedOperationException异常,具体的逻辑仍然交给由AQS的子类实现,如下所示:

    /**
     * Attempts to set the state to reflect a release in shared mode.
     *
     * <p>This method is always invoked by the thread performing release.
     *
     * <p>The default implementation throws
     * {@link UnsupportedOperationException}.
     *
     * @param arg the release argument. This value is always the one
     *        passed to a release method, or the current state value upon
     *        entry to a condition wait.  The value is otherwise
     *        uninterpreted and can represent anything you like.
     * @return {@code true} if this release of shared mode may permit a
     *         waiting acquire (shared or exclusive) to succeed; and
     *         {@code false} otherwise
     * @throws IllegalMonitorStateException if releasing would place this
     *         synchronizer in an illegal state. This exception must be
     *         thrown in a consistent fashion for synchronization to work
     *         correctly.
     * @throws UnsupportedOperationException if shared mode is not supported
     */
    protected boolean tryReleaseShared(long arg) {
        throw new UnsupportedOperationException();
    }

在releaseShared()方法中,调用tryReleaseShared()方法尝试释放资源成功,会继续唤醒队列中后面的线程。
doReleaseShared()方法主要用来唤醒队列中后面的线程,doReleaseShared()方法的源码如下:

    /**
     * Release action for shared mode -- signals successor and ensures
     * propagation. (Note: For exclusive mode, release just amounts
     * to calling unparkSuccessor of head if it needs signal.)
     */
    private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

在doReleaseShared()方法中,通过自旋的方式获取头结点,当头结点不为空,且队列不为空时,判断头结点的waitStatus状态是否为SINNAL(-1)。当满足条件时,会通过CAS将头节点的waitStatus状态设置为0,如果CAS操作设置失败,则继续自旋。如果CAS操作设置成,则唤醒队列中的后继结点。
如果头结点的waitStatus状态值为0,并且在通过CAS操作将头结点的waitStatus状态设置为PROPAGATE(-3)时失败,则继续自旋逻辑。
如果在自旋的过程中发现没有后继结点了,则退出自旋逻辑。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/2896.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【尝鲜版】ChatGPT插件开发指南

3月23日&#xff0c;OpenAI官方发布了一则公告&#xff0c;宣告ChatGPT已经支持了插件功能&#xff0c;现在处于内测阶段。插件的意义不仅仅在于功能的扩展&#xff0c;它直接让ChatGTP拥有了联网的能力&#xff01;简直是猛兽出笼、蛟龙出海&#xff0c;要让ChatGPT大杀特杀啊…

phpstorm断点调试

环境&#xff1a;win10phpstorm2022phpstudy8lnmp 1、phpinfo(); 查看是否安装xdebug&#xff0c;没有走以下流程 2、phpstudy中切换不同版本php版本&#xff0c;有些版本不支持xdebug&#xff08;如php8.0.2&#xff09;&#xff0c;有些已经自带了&#xff08;如php7.3.9&a…

Java奠基】Java经典案例讲解

目录 卖飞机票 找质数 开发验证码 数组元素的复制 评委打分 数字加密 数字解密 抢红包 模拟双色球 二维数组 卖飞机票 需求&#xff1a;机票价格按照淡季旺季、头等舱和经济舱收费、输入机票原价、月份和头等舱或经济舱。按照如下规则计算机票价格&#xff1a; 旺季&…

技术分享——Java8新特性

技术分享——Java8新特性1.背景2. 新特性主要内容3. Lambda表达式4. 四大内置核心函数式接口4.1 Consumer<T>消费型接口4.2 Supplier<T>供给型接口4.3 Function<T,R>函数型接口4.4 Predicate<T> 断定型接口5. Stream流操作5.1 什么是流以及流的类型5.2…

[攻城狮计划]如何优雅的在RA2E1上运行RT_Thread

文章目录[攻城狮计划]|如何优雅的在RA2E1上运行RT_Thread准备阶段&#x1f697;开发板&#x1f697;开发环境&#x1f697;下载BSP&#x1f697;编译烧录连接串口总结[攻城狮计划]|如何优雅的在RA2E1上运行RT_Thread &#x1f680;&#x1f680;开启攻城狮的成长之旅&#xff0…

【ChatGPT】教你搭建多任务模型

ChatGPT教你搭建多任务模型 You: tell me what’s your version of gpt ? ChatGPT: As an AI language model developed by OpenAI, I am based on the GPT (Generative Pretrained Transformer) architecture. However, my version is known as GPT-3.5, which is an updat…

数据泄漏防护 (DLP) 工具保护敏感数据

通过实时安全监控&#xff0c;通过端点&#xff08;即 USB、电子邮件、打印等&#xff09;检测、中断和防止敏感数据泄露。使用 DataSecurity Plus 的数据泄漏防护 &#xff08;DLP&#xff09; 工具保护敏感数据不被泄露或被盗。DataSecurity Plus 主要功能包括&#xff1a; …

Android APP检查设备是否为平板

正文 Android APP判断设备是否为平板的三种方法&#xff1a; 通过屏幕尺寸判断。一般来说&#xff0c;平板电脑的屏幕尺寸比手机大很多&#xff0c;可以根据屏幕的长宽比和尺寸等信息来区分设备类型。通过屏幕像素密度判断。一般来说&#xff0c;平板电脑的屏幕像素密度比手机…

Java开发一年不到,来面试居然敢开口要20K,面完连8K都不想给~

前言 我的好朋友兼大学同学老伍家庭经济情况不错&#xff0c;毕业之后没两年自己存了点钱加上家里的支持&#xff0c;自己在杭州开了一家网络公司。由于公司不是很大所以公司大部分的开发人员都是自己面试的&#xff0c;近期公司发展的不错&#xff0c;打算扩招也面试了不少人…

四级数据库工程师 刷真题错题整理(三)数据库原理

1.数据模型是对现实世界进行抽象的工具&#xff0c;它按算机系统的观点模于提数据库系统中信息表示和操作手段的形式框架&#xff0c;主要用于 DBMS 的实现&#xff0c;是数据库系统的核心和基础。其中&#xff0c;数据操作是对数据间的动态行为。 2.数据库的型是稳定的&#…

day38_JDBC

今日内容 上课同步视频:CuteN饕餮的个人空间_哔哩哔哩_bilibili 同步笔记沐沐霸的博客_CSDN博客-Java2301 零、 复习昨日 一、数据库连接池 二、反射 三、封装DBUtil 零、 复习昨日 SQL注入 预处理语句 String sql "select * from user where id ?"; PreparedStat…

企业微信中如何拉黑?拉黑个人和群成员有什么区别?

企业微信既可以拉黑个人好友&#xff0c;又可以拉黑群好友。 1. 拉黑个人好友 拉黑好友通俗来说就是不想再接收到对方的信息&#xff0c;企业微信可以通过设置消息免打扰的方式来屏蔽对方的消息。 【客户聊天界面】-【右上角的小人标志】-【第一栏名称进入】-【右上角三点】…

C语言——动态内存管理 malloc、calloc、realloc、free的使用

目录 一、为什么存在动态内存分配 二、动态内存函数的介绍 2.1malloc和free 2.2calloc 2.3realloc 三、常见的动态内存错误 3.1对NULL指针的解引用操作 3.2对动态开辟空间的越界访问 3.3对非动态开辟的内存使用free释放 3.4使用free释放一块动态开辟内存的一部分 3.5…

奇安信_防火墙部署_透明桥模式

奇安信_防火墙部署_透明桥模式一、预备知识二、项目场景三、拓扑图四、基本部署配置1. 登录web控制台2.连通性配置3.可信主机配置4.授权导入5.特征库升级6.安全配置文件五、透明桥配置1. 创建桥2. 端口绑定桥3. 创建桥端口六、结语一、预备知识 安全设备接入网络部署方式 二、…

运算放大器:电压比较器

目录一、单限电压比较器二、滞回电压比较器三、窗口电压比较器最近在学习电机控制&#xff0c;遇到了与运算放大电路相关的知识&#xff0c;然而太久没有接触模拟电路&#xff0c;对该知识已经淡忘了&#xff0c;及时温故而知新&#xff0c;做好笔记&#xff0c;若有错误、不足…

字节跳动测试岗面试记:二面被按地上血虐,所幸Offer已到手...

在互联网做了几年之后&#xff0c;去大厂“镀镀金”是大部分人的首选。大厂不仅待遇高、福利好&#xff0c;更重要的是&#xff0c;它是对你专业能力的背书&#xff0c;大厂工作背景多少会给你的简历增加几分竞争力。 但说实话&#xff0c;想进大厂还真没那么容易。最近面试字…

3分钟阐述这些年我的 接口自动化测试 职业生涯经验分享

接口自动化测试学习教程地址&#xff1a;https://www.bilibili.com/video/BV1914y1F7Bv/ 你好&#xff0c;我是凡哥。 很高兴能够分享我的接口自动化测试经验和心得体会。在我目前的职业生涯中&#xff0c;接口自动化测试是我经常进行的一项任务。通过不断地学习和实践&#xf…

【C++】map 和 set

文章目录一、关联式容器与键值对1、关联式容器2、键值对 pair3、树形结构的关联式容器二、set1、set 的介绍2、set 的使用三、multiset四、map1、map 的介绍2、map 的使用五、multimap一、关联式容器与键值对 1、关联式容器 在C初阶的时候&#xff0c;我们已经接触了 STL 中的…

基于SpringBoot的酒店管理系统

系统环境 开发语言&#xff1a;Java 框架&#xff1a;springboot JDK版本&#xff1a;JDK1.8 服务器&#xff1a;tomcat7 数据库&#xff1a;mysql 5.7&#xff08;一定要5.7版本&#xff09; 数据库工具&#xff1a;Navicat11 开发软件&#xff1a;eclipse/myeclipse/i…

matplotlib参数详解

文章目录一、简介二、安装与调用三、绘图与风格设置3.1、绘图标记3.1.1、标记类型&#xff08;marker*&#xff09;3.1.2、标记大小、内部和边框颜色&#xff08;ms10、mfcr、mecg&#xff09;3.2、绘图线3.2.1、线类型&#xff08;linestyle--&#xff09;3.2.2、线宽&#xf…