基本原理

AQS是Java中锁的基础,主要由两个队列组成。一个队列是同步队列,另一个是条件队列

同步队列的原理

  • 同步队列的队列头部是head,队列尾部是tail节点,head节点是一个空节点,同步队列是一个双向链表,通过nextprev连接所有节点
  • 所有的线程在竞争锁的时候都会创建一个Node节点,线程与节点绑定在一起,(如果是同步锁和排他锁不同之处是通过nextWaiter来区分的)并且添加到同步队列的尾部
  • head的第一个节点获取锁,其余节点都需要等待被唤醒
  • 同步队列中的节点会存在取消和null的情况(如:线程超时中断、线程更新节点的中间态),被取消和null的节点不能被唤醒,将会被视为无效节点
  • 一个线程只能被有效的前驱节点(取消和null的节点除外)唤醒
  • 持有锁的线程只能是有一个,其他有效节点对应的线程都会被挂起

条件队列的原理

  • 一个同步队列可以对应多个条件队列
  • 条件队列是一个单向链表,通过nextWaiter来连接起来,条件队列的头节点是firstWaiter,尾节点是lastWaiter
  • 某个条件队列中满足条件的节点(被signalsignalAll方法唤醒的节点)才会被转移到同步队列
  • 条件队列中的被转移到同步队列的节点是从头节点开始,条件队列中被阻塞的线程会添加到队列的尾部

同步队列的实现

首先,了解以下同步队列中队列的节点Node的数据结构

static final class Node {
        /** 共享锁的标识 */
        static final Node SHARED = new Node();
        /** 排他锁的标识 */
        static final Node EXCLUSIVE = null;

        /** 线程取消 */
        static final int CANCELLED =  1;
        /** 持有锁的线程的后继线程被挂起 */
        static final int SIGNAL    = -1;
        /** 条件队列标识 */
        static final int CONDITION = -2;
        /**
         * 共享锁情况下,通知所有其他节点
         */
        static final int PROPAGATE = -3;

        /**
         * waitStatus的取值如下:
         *   SIGNAL(-1): 当前节点的后继节点应该被挂起
         *   CANCELLED(1): 当前节点被取消
         *   CONDITION(-2): 当前节点在条件队列
         *   PROPAGATE(-3): 释放共享锁时需要通知所有节点
         *   0: 初始值
         *
         */
        volatile int waitStatus;

        /**
         * 前驱节点
         */
        volatile Node prev;

        /**
         * 后继节点
         */
        volatile Node next;

        /**
         * 节点对应的线程
         */
        volatile Thread thread;

        /**
         * 在共享锁的情况下,该节点的值为SHARED
         * 在排他锁的情况下,该节点的值为EXCLUSIVE
         * 在条件队列的情况下,链接的是下一个等待条件的线程
         */
        Node nextWaiter;
}

其次,我们来看一下同步队列的链表结构
深入浅出AQS源码解析-LMLPHP

接着,我们根据同步队列的原理来分析以下acquirerelease需要做哪些事情:

实现acquire功能需要做的事情

  1. 创建一个Node节点node(该节点可能是排他锁,也可以能是共享锁)
  2. node添加到同步队列尾部,如果同步队列为空(初始情况下),需要先创建一个空的头节点,然后再添加到队列的尾部
  3. 如果node的前驱节点是head,说明node是第一个节点,能够获取锁,需要将head修改成node,释放前驱节点的资源
  4. 如果node的前驱节点不是head,说明获取锁失败,需要检测是否需要将node绑定的线程挂起,分以下几种情况:
    • 如果nodewaitStatus已经被设置为SIGNAL 表示需要被挂起
    • 如果nodewaitStatus设置为CANCEL表示该节点已经被取消,需要被去掉,并修改 nodeprev,直到链接上一个有效的节点为止
    • 否则将nodewaitStatus设置为SIGNAL,表示即将要被挂起
  5. 如果需要将node绑定的线程挂起,则让出CPU,直到当前驱节点来唤起node才会开始继续从步骤3开始执行

与acquire功能相关的代码

  • acquire方法:获取排他锁
public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
  1. tryAcquire(arg):对外提供的一个扩展方法,常用的锁都要实现这个方法,具体实现与锁相关

  2. addWaiter(Node.EXCLUSIVE): 创建一个排他锁节点,并将该节点添加到同步队列尾部,代码如下:

private Node addWaiter(Node mode) {
        // 创建一个node,EXCLUSIVE类型
        Node node = new Node(mode);

        for (;;) {
            // 获取尾节点
            Node oldTail = tail;
            if (oldTail != null) {
                // 设置即将成为尾节点的前驱
                node.setPrevRelaxed(oldTail);
                // CAS操作设置尾节点
                if (compareAndSetTail(oldTail, node)) {
                    // 将新尾节点的前驱节点与新的尾节点关联起来
                    oldTail.next = node;
                    // 返回添加的节点
                    // 这个节点现在不一定是尾节点,因为如果有多个线程调用这个方法时,
                    // 可能还有节点添加在这个节点后面
                    return node;
                }
            } else {
                // 如果队列为空,初始化头节点
                initializeSyncQueue();
            }
        }
    }
  1. acquireQueued同步队列中的节点获取排他锁
final boolean acquireQueued(final Node node, int arg) {
        try {
            // 线程是否中断
            boolean interrupted = false;
            for (;;) {
                // 获取前驱节点
                final Node p = node.predecessor();
                // 如果前驱节点是头节点,获取锁
                if (p == head && tryAcquire(arg)) {
                    // 修改头节点
                    setHead(node);
                    // 释放头节点的资源
                    p.next = null; // help GC
                    // 返回线程中断的状态
                    // 这也是该方法唯一的返回值
                    // 没有获取锁的线程会一直执行该方法直到获取锁以后再返回
                    return interrupted;
                }
                // 获取锁失败后是否需要将线程挂起
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt()) // 线程挂起并返回是否被中断
                    interrupted = true;
            }
        } catch (Throwable t) {
            // 取消该节点
            cancelAcquire(node);
            throw t;
        }
    }
  1. shouldParkAfterFailedAcquire:检测线程获取锁失败以后是否需要被挂起
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // 前驱节点的状态
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * 状态已经设置成SIGNAL,可以直接挂起该节点
             */
            return true;
        // 节点被取消
        if (ws > 0) {
            /*
             * 找到pred第一个有效的前驱节点
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            // pred可能是一个新的节点,需要将pred的next重写设置为node
            pred.next = node;
        } else {
            /*
             * CAS操作将pred节点的状态设置为SIGNAL
             */
            pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
        }
        // 只有当pred节点的waitStatus已经是SIGNAL状态时,才可以安全的挂起线程
        // 否则需要不能被挂起
        return false;
    }
  1. parkAndCheckInterrupt:将当前线程挂起,并检测当前线程是否中断
private final boolean parkAndCheckInterrupt() {
        // 线程挂起
        LockSupport.park(this);
        // 检测线程是否中断
        return Thread.interrupted();
    }
  1. cancelAcquire:取消节点
 private void cancelAcquire(Node node) {
        // 如果节点为空,什么都不做
        if (node == null)
            return;
        // 释放线程
        node.thread = null;

        // 从后往前过滤掉所有的被取消的节点
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;

        // 有效前驱节点的nex节点
        Node predNext = pred.next;

        // 将node设置为CANCELLED
        node.waitStatus = Node.CANCELLED;

        // 如果是尾节点,设置新的尾节点
        if (node == tail && compareAndSetTail(node, pred)) {
            // 将新的尾节点的后续设置为null
            pred.compareAndSetNext(predNext, null);
        } else {
            // If successor needs signal, try to set pred's next-link
            // so it will get one. Otherwise wake it up to propagate.
            int ws;
            // 如果前驱节点的线程不为null并且waitStatus为SIGNAL
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                // 将node设置成pred的后继节点
                if (next != null && next.waitStatus <= 0)
                    pred.compareAndSetNext(predNext, next);
            } else {
                // 唤起node节点的后继节点
                // 因为node节点已经释放锁了
                unparkSuccessor(node);
            }

            node.next = node; // help GC
        }
    }
  1. unparkSuccessor:唤醒后继节点
private void unparkSuccessor(Node node) {
        /*
         * 获取node节点的waitStatus
         */
        int ws = node.waitStatus;
       // 用CSA操作将waitStatus设置成初始状态
       // 不管设置是否成功,都无所谓,因为该节点即将被销毁
        if (ws < 0)
            node.compareAndSetWaitStatus(ws, 0);
        /*
         * 获取node的后继节点
         */
        Node s = node.next;
        // 如果后继节点为null或者被取消,
        // 通过从同步队列的尾节点开始一直往前找到一个有效的后继节点
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node p = tail; p != node && p != null; p = p.prev)
                if (p.waitStatus <= 0)
                    s = p;
        }
        // 如果后继节点不为空
        if (s != null)
            LockSupport.unpark(s.thread);// 唤醒后继节点的线程
    }

acquire方法类似的还有acquireInterruptiblytryAcquireNanosacquireSharedacquireSharedInterruptiblytryAcquireSharedNanos,我们都一一分析以下

  • acquireInterruptibly方法:获取可中断的排他锁
public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted()) // 如果线程中断,直接返回
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg); // 中断式的获取锁
    }
  1. doAcquireInterruptibly:可中断式的获取锁
private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
       // 创建一个排他节点加入同步队列
        final Node node = addWaiter(Node.EXCLUSIVE);
        try {
            for (;;) {
                // 获取前驱节点
                final Node p = node.predecessor();
                // 如果前驱节点是头节点,说明已经获取的锁
                if (p == head && tryAcquire(arg)) {
                    // 修改头节点
                    setHead(node);
                    p.next = null; // help GC
                    return;
                }
                // 如果没有获取锁,检测是否需要挂起
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException(); // 如果发现线程已经被中断,需要抛出异常
            }
        } catch (Throwable t) {
            // 发生异常取消节点
            cancelAcquire(node);
            throw t;
        }
    }
  • tryAcquireNanos方法:超时中断获取排他锁
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException(); // 线程中断直接返回
        return tryAcquire(arg) ||
            doAcquireNanos(arg, nanosTimeout); // 超时获取排他锁
    }
  1. doAcquireNanos:超时获取排他锁
private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        // 如果超时直接返回
        if (nanosTimeout <= 0L)
            return false;
        // 获取超时时长
        final long deadline = System.nanoTime() + nanosTimeout;
        // 添加一个排他节点到同步队列尾部
        final Node node = addWaiter(Node.EXCLUSIVE);
        try {
            for (;;) {
                 // 获取前驱节点
                final Node p = node.predecessor();
                // 已经获取锁
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    return true;
                }
                nanosTimeout = deadline - System.nanoTime();
                // 如果超时了就取消
                if (nanosTimeout <= 0L) {
                    cancelAcquire(node);
                    return false;
                }
                // 检测节点是否需要被挂起
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
                    // 如果需要挂起,且超时时长大于SPIN_FOR_TIMEOUT_THRESHOLD
                    // 线程挂起nanosTimeout时间
                    LockSupport.parkNanos(this, nanosTimeout);
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } catch (Throwable t) {
            // 发生异常取消节点
            cancelAcquire(node);
            throw t;
        }
    }
  • acquireShared方法:获取共享锁
public final void acquireShared(int arg) {
        // 对外提供的一个扩展方法,常用的锁都要实现这个方法,
        // 该方法的实现与锁的用途有关
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg); // 获取共享锁
    }
  1. doAcquireShared:获取共享锁
 private void doAcquireShared(int arg) {
        // 添加一个共享节点到同步队列尾部
        final Node node = addWaiter(Node.SHARED);
        try {
            boolean interrupted = false;
            for (;;) {
                // 获取前驱节点
                final Node p = node.predecessor();
                if (p == head) {
                    // 返回结果大于等于0表示获取共享锁
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        // 设置头节点并广播通知其他获取共享锁的节点
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        // 如果线程被中断,将该线程中断
                        // 共享锁会被多个线程获取,如果需要中断
                        // 所有获取共享锁的线程都要被中断
                        if (interrupted)
                            selfInterrupt();
                        return;
                    }
                }
                // 检测是否需要挂起
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt()) // 挂起并中断
                    interrupted = true;
            }
        } catch (Throwable t) {
            // 发生异常取消节点
            cancelAcquire(node);
            throw t;
        }
    }
  1. setHeadAndPropagate:设置头节点并广播其他节点来获取锁
 private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // 记录旧的头节点
        setHead(node);// 设置新的头节点
        /*
         * 如果头节点为null或者是不是取消状态,尝试唤醒后继节点
         */
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            // node节点的next是SHARED,即共享锁
            if (s == null || s.isShared())
                // 唤起获取共享锁的线程
                doReleaseShared();
        }
    }
  1. doReleaseShared:唤醒等待共享锁的节点
 private void doReleaseShared() {
        /*
         * 唤醒时是从头节点开始先唤醒第一个共享节点,
         * 第一个共享节点被唤醒后会在doAcquireShared方法里继续执行(之前就是在这个方法里被挂起的)
         * 第一个共享节点如果获取锁会调用setHeadAndPropagate方法修改头节点,然后再调用doReleaseShared方法
         * 唤醒第二个共享节点,以此类推,最后把所有的共享节点都唤醒
         */
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                // 获取头节点的状态
                int ws = h.waitStatus;
                // 如果头节点是SIGNAL,需要将状态设置为0,表示已经即将被唤醒
                if (ws == Node.SIGNAL) {
                    if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
                        continue;            // 如果失败了说明有其他线程在修改头节点,需要继续重试
                    unparkSuccessor(h); // 唤醒头节点的后继节点
                }
                else if (ws == 0 &&
                         !h.compareAndSetWaitStatus(0, Node.PROPAGATE))
                    continue;                // 将头节点状态从0设置成PROPAGATE,如果失败了继续,因为也有其他获取共享锁的线程在更改头节点
            }
            // 如果头节点未改变(因为没有后继节点需要等待共享锁),跳出循环
            if (h == head)
                break;
        }
    }
  1. selfInterrupt:中断当前线程
static void selfInterrupt() {
    Thread.currentThread().interrupt();
}
  • acquireSharedInterruptibly方法:可中断的获取共享锁
public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException(); // 如果线程被中断抛出异常
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg); // 可中断的方式获取共享锁
    }
  1. doAcquireSharedInterruptibly:可中断的方式后去共享锁
 private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        // 添加共享锁节点到同步队列尾部
        final Node node = addWaiter(Node.SHARED);
        try {
            for (;;) {
                // 获取前驱节点
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        // 获取共享锁以后修改头节点,通知其他等待共享锁的节点
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        return;
                    }
                }
                // 线程获取共享锁失败后需要挂起,并且发现线程被中断,所以抛出异常
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } catch (Throwable t) {
            // 发生异常取消节点
            cancelAcquire(node);
            throw t;
        }
    }
  • tryAcquireSharedNanos方法:超时中断获取共享锁
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted()) // 线程如果中断了,直接抛出异常
            throw new InterruptedException();
        return tryAcquireShared(arg) >= 0 ||
            doAcquireSharedNanos(arg, nanosTimeout); // 超时获取共享锁
    }
  1. doAcquireSharedNanos:超时的方式获取中断锁
private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        // 超时直接返回
        if (nanosTimeout <= 0L)
            return false;
        final long deadline = System.nanoTime() + nanosTimeout;
        // 添加共享节点到同步队列尾部
        final Node node = addWaiter(Node.SHARED);
        try {
            for (;;) {
                // 获取前驱节点
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        // 获取锁,修改头节点,通知所有其他等待共享锁的节点
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        return true;
                    }
                }
                nanosTimeout = deadline - System.nanoTime();
                if (nanosTimeout <= 0L) {
                    // 超时取消节点
                    cancelAcquire(node);
                    return false;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
                    // 如果需要挂起,且超时时长大于SPIN_FOR_TIMEOUT_THRESHOLD
                    // 线程挂起nanosTimeout时间
                    LockSupport.parkNanos(this, nanosTimeout);
                if (Thread.interrupted())
                    throw new InterruptedException(); // 中断了抛出异常
            }
        } catch (Throwable t) {
            // 发生异常取消节点
            cancelAcquire(node);
            throw t;
        }
    }

实现release功能需要做的事情

  1. 释放当前获取锁的线程持有的资源
  2. 唤醒有效的一个后继节点

与release功能相关的代码

  • release方法:释放排他锁
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            // 头节点不能是一个中间态
            if (h != null && h.waitStatus != 0)
                // 唤醒后继节点
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
  • release方法:释放共享锁
public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            // 释放共享锁,从头节点开始一个一个的释放
            // 如果存在多个共享节点在同步队列时,doReleaseShared方式其实是递归调用
            doReleaseShared();
            return true;
        }
        return false;
    }

至此,将所有获取锁和释放锁的方法相关的源码全部分析完

条件队列的实现

我们来看一下条件队列的链表结构
深入浅出AQS源码解析-LMLPHP

实现await功能需要做的事情

  1. 创建一个CONDITION类型的节点,将该节点添加到条件队列
  2. 释放已经获取的锁(因为只有当前线程先获取了锁才可能再调用Condition.await()方法)
  3. 如果无法获取锁,线程挂起

与await功能相关的代码

  • await方法:等待条件
public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException(); // 如果线程中断,直接抛出异常
            // 创建一个CONDITION类型的节点,将该节点添加到条件队列尾部
            Node node = addConditionWaiter();
            // 释放锁
            // 在调用await方法之前都会调用lock方法,这个时候已经获取锁了
            // 有时候锁还是可重入的,所以需要将所有的资源都释放掉
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            // 如果节点不再同步队列,全部都要挂起
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                // 如果在等待期间发生过中断(不管是调用signal之前还是之后),直接退出
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            // 让线程尝试去获取锁,如果无法获取锁就挂起
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            // 清除所有在条件队列中是取消状态的节点
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            // 发生中断,上报中断情况
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }
  1. addConditionWaiter:在条件队列中添加一个节点
 private Node addConditionWaiter() {
            Node t = lastWaiter;
            // 清除条件队列中无效的节点
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            // 创建一个节点
            Node node = new Node(Node.CONDITION);
            // 添加到条件队列尾部
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }
  1. unlinkCancelledWaiters:清除在条件队列中被取消的节点
private void unlinkCancelledWaiters() {
            Node t = firstWaiter;
            Node trail = null;
            // 遍历条件队列将所有不是CONDITION状态的节点全部清除掉
            // 这些节点都是取消状态的节点
            while (t != null) {
                Node next = t.nextWaiter;
                if (t.waitStatus != Node.CONDITION) {
                    t.nextWaiter = null;
                    if (trail == null)
                        firstWaiter = next;
                    else
                        trail.nextWaiter = next;
                    if (next == null)
                        lastWaiter = trail;
                }
                else
                    trail = t;
                t = next;
            }
        }
  1. fullyRelease:释放线程持有的所有锁资源
final int fullyRelease(Node node) {
        try {
            int savedState = getState();
            // 释放所有的资源
            // 如果是可重入锁,savedState就是重入的次数
            if (release(savedState))
                return savedState;
            throw new IllegalMonitorStateException();
        } catch (Throwable t) {
            // 发生异常就取消该节点
            node.waitStatus = Node.CANCELLED;
            throw t;
        }
    }
  1. isOnSyncQueue:判断节点是否在同步队列
final boolean isOnSyncQueue(Node node) {
        // waitStatus是CONDITION或者node没有前驱节点,说明node不在同步队列
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        if (node.next != null) // 有后继节点一定在同步队列
            return true;
        /*
         * 在同步队列中查找node,看是否在同步队列中
         */
        return findNodeFromTail(node);
    }
  1. findNodeFromTail:在同步队列中查找节点
private boolean findNodeFromTail(Node node) {
        // 从尾节点开始查找
        for (Node p = tail;;) {
            if (p == node) // 找到了
                return true;
            if (p == null) // 找到头了还没找到
                return false;
            p = p.prev;
        }
    }
  1. checkInterruptWhileWaiting:检测中断的情况
private int checkInterruptWhileWaiting(Node node) {
            // 没有发生中断返回0
            // 调用signal之前发生中断返回THROW_IE
            // 调用signal之后发生中断返回REINTERRUPT
            return Thread.interrupted() ?
                (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                0;
        }
  1. transferAfterCancelledWait:清除在条件队列中被取消的节点
// 只有线程处于中断状态,才会调用此方法
// 如果需要的话,将这个已经取消等待的节点转移到阻塞队列
// 返回 true,如果此线程在 signal 之前被取消,否则返回false
final boolean transferAfterCancelledWait(Node node) {

        // 用 CAS 将节点状态设置为 0
        // 如果这步 CAS 成功,说明是 signal 方法之前发生的中断,
       // 因为如果 signal 先发生的话,signal 中会将 waitStatus 设置为 0
        if (node.compareAndSetWaitStatus(Node.CONDITION, 0)) {
            enq(node); // 将节点放入阻塞队列
            return true;
        }
        // 到这里是因为 CAS 失败,肯定是因为 signal 方法已经将 waitStatus 设置为了 0
        // signal 方法会将节点转移到阻塞队列,但是可能还没完成,这边自旋等待其完成
        // 当然,这种事情还是比较少的吧:signal 调用之后,没完成转移之前,发生了中断
        while (!isOnSyncQueue(node))
            Thread.yield();
        return false;
    }
  1. enq:把节点添加到同步队列
private Node enq(Node node) {
        // 无限循环,将节点添加到同步队列尾部
        for (;;) {
            Node oldTail = tail;
            if (oldTail != null) {
                node.setPrevRelaxed(oldTail);
                if (compareAndSetTail(oldTail, node)) {
                    oldTail.next = node;
                    return oldTail;
                }
            } else {
                // 如果同步队列为空,初始化
                initializeSyncQueue();
            }
        }
    }
  1. reportInterruptAfterWait:中断处理
private void reportInterruptAfterWait(int interruptMode)
            throws InterruptedException {
            // 如果是THROW_IE状态,抛异常
            if (interruptMode == THROW_IE)
                throw new InterruptedException();
            else if (interruptMode == REINTERRUPT) // 再次中断,因为中断状态被使用过一次
                selfInterrupt();
        }

awaitNanosawaitUntilawait(long time, TimeUnit unit)这几个方法的整体逻辑是一样的,就不再分析了

实现signal功能需要做的事情

  1. 将条件队列中的节点加入同步队列
  2. 唤醒线程

与signal功能相关的代码

  • signal方法:唤醒等待条件的节点
 public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            // 获取条件队列中的第一个节点
            Node first = firstWaiter;
            if (first != null)
                // 唤醒等待条件的节点
                doSignal(first);
        }
  1. doSignal:唤醒等待条件的节点
private void doSignal(Node first) {
            do {
                // 去掉无效的节点
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&  // 将节点转移到同步队列
                     (first = firstWaiter) != null);
        }
  1. transferForSignal:将节点转移到同步队列
final boolean transferForSignal(Node node) {
        /*
         * 取消的节点不需要转移
         */
        if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
            return false;

        /*
         * 将节点加入同步队列尾部
         */
        Node p = enq(node);
        int ws = p.waitStatus;
        // ws > 0 说明 node 在阻塞队列中的前驱节点取消了等待锁,直接唤醒 node 对应的线程
        // 如果 ws <= 0, 那么 compareAndSetWaitStatus 将会被调用
        // 节点入队后,需要把前驱节点的状态设为SIGNAL
        if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
            // 如果前驱节点取消或者 CAS 失败,会进到这里唤醒线程
            LockSupport.unpark(node.thread);
        return true;
    }
  • signalAlll方法:唤醒所有等待条件的节点
public final void signalAll() {
            // 如果是当前线程
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                // 唤醒所有等待条件的节点
                doSignalAll(first);
        }
  1. doSignalAll:唤醒所有等待条件的节点
// 将所有的节点都转移到同步队列
private void doSignalAll(Node first) {
            lastWaiter = firstWaiter = null;
            do {
                Node next = first.nextWaiter;
                first.nextWaiter = null;
                transferForSignal(first);
                first = next;
            } while (first != null);
        }

现在将与AQS相关的核心代码都整理了一遍,里面如果有描述不清晰或者不准确的地方希望大家可以帮忙指出!

07-13 11:15