AQS初体验

AQS是AbstractQueuedSynchronizer的简称。AQS提供了一种实现阻塞锁和一系列依赖FIFO等待队列的同步器的框架。所谓框架,AQS使用了模板方法的设计模式,为我们屏蔽了诸如内部队列等一系列复杂的操作,让我们专注于对锁相关功能的实现。

获取锁

既然涉及到锁竞争的问题,必然需要一个标志位来表示锁的状态,AQS中提供了state这样一个成员变量,为了安全的操作state,我们需要使用原子操作。将state从0修改为1就代表这个线程已经持有了这把锁。
但竞争锁的线程绝对不会只是一个,其他未竞争到锁的线程该如何进行处理?
第一个答案可能是重试,重试虽好,但是可不能贪杯,如果竞争很严重,无数的线程在不断的重新尝试获取锁,我们的CPU早晚会吃不消。
第二个比较好的方式就是排队,持有锁的线程释放锁之后,通知下一个线程去获取锁,避免了不必要的CPU损失。但是值得注意的是,即使是从队列中被唤醒的线程去获取锁也依旧可能获取不到的,因为无时无刻都有新加入的线程来竞争锁。
AQS实际上就是使用了双端队列来解决了这个问题的。

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

tryAcquire()如果失败将执行acquireQueued()中的addWaiter()方法,即尝试加入等待队列。这个等待队列使用了双端队列进行实现,在AQS中定义了一个Node的数据结构,AQS中维护着head和tail两个成员变量。
在单线程中插入队列尾部很简单,只需要将原来的tail的next指向新插入的节点,并且将tail重新设置为新插入的节点。但是在多线程环境中,很有可能发生多个线程同时插入尾部的现象,而上述的插入过程不具有原子性,同时插入的过程必将出现多个操作顺序的混乱,最终导致等待队列的tail节点
AQS在插入tail节点时使用原子操作来保证了插入的可靠。

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

插入成功的直接返回了node,而没有插入成功的则执行了enq()函数,在enq()中使用了CAS进行插入。

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

经历这个CAS插入,最后全部的节点都将被插入到队列尾部。

现在,没有获取到锁的线程已经被放进队列了,但是放入队列也代表着我们可以忘了初心。我们的目标是获取锁,而不是进入队列。acquireQueued()就在尝试为我们获取锁。

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

简单说就是,检查自己是不是head节点的下一个节点,如果是的话,尝试获取获取锁;如果不是的话,将使用LockSupport的park方法阻塞当前线程,避免造成CPU的浪费。

释放锁

释放锁的过程可以分成两大部分:

  1. 恢复AQS的状态为无锁状态
  2. 唤醒等待队列中下一个等待的节点

在第一个过程中,没有排在队头的节点都已经被阻塞了,而唤醒的时机就是前一个节点已经释放锁,所以可以说这个等待队列,实际上是一个唤醒链。

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            // 使用unpark唤醒下一个线程
            unparkSuccessor(h);
        return true;
    }
    return false;
}

总结

AQS为我们提供了:

  • status 状态同步标示
  • Node双端队列 存储竞争锁线程
  • 基于Node双端队列的线程唤醒机制

我觉得AQS精华在于,将原来N个线程并发竞争锁降低为1+M(新加入)个。在我们自己实现类似的资源竞争算法中,也可以通过加入队列来降低竞争的并发度,降低CPU的负载压力。

07-25 19:10