AbstractQueuedSynchronized 以下简称AQS,是用来构建锁或者其他同步组件的基础框架。
在AQS中,为锁的获取和释放提供了一些模板方法,而实现锁的类(AQS的子类)需要实现这些模板方法中的同步方法。
这些方法包括:
·tryAcquire():尝试以独占模式获取锁
·tryRelease():尝试释放独占模式的锁
·tryAcquireShared():尝试以共享模式获取锁
·tryReleaseShared():尝试释放共享模式的锁
·isHeldExclusiverly():返回是否以独占模式获有锁
在分析AQS的原理之前,我们先看看LockSupport、Lock,Condition、AQS、ReentrantLok等等之间的关系和使用方式。
关系图:
使用方式有两种:一种是不带条件的普通的锁,另一种是带条件的锁。
不带条件:
/*
* ReentrantLock是Lock接口的实现类之一
* 实现的是一种可重入的锁
*/
Lock lock = new ReentrantLock();
lock.lock();
try {
//同步处理
}finally {
lock.unlock();
}
带条件的锁:
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();//创建和该锁关联的条件锁 public void conditionWait() throws InterruptedException{
lock.lock();
try {
condition.await();
}finally {
lock.unlock();
}
}
public void ConditionSignal() throws InterruptedException{
lock.lock();
try {
condition.signal();
}finally {
lock.unlock();
}
}
从使用方式可以看到,主要调用的方法就是:lock.lock()、lock.unlock()、lock.newCondition()、condition.await()、condition.signal()
下面分别来看看这几个方法:
lock.lock():由于Lock是一个接口,所以需要通过其子类来实例化,所以lock.lock()其实调用的是子类的lock()方法,在上面的例子中自然调用的就是ReentrantLock.lock()方法。
public void lock() {
sync.lock();
}
可以看到,ReentrantLock.lock()方法调用的是同步器内部类的lock()方法,而ReentrantLock的内部同步器类Sync又分为FairSync和NoFairSync。
源码如下(省略了一部分内容):
abstract static class Sync extends AbstractQueuedSynchronizer { abstract void lock();
}
static final class NonfairSync extends Sync { final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
} static final class FairSync extends Sync {
final void lock() {
acquire(1);
}
}
可以看到在非公平的获取锁的方法中,会首先尝试直接去获取锁,而不会通过同步队列去排队获取锁。否则的话,就通过AQS同步器的acquire(1)去获取锁。
※RetrantLock在初始化的时候可以通过参数指定是公平的还是不公平的锁。默认是非公平的锁。
lock.unlock():调用的是同步器中的release(1)方法,也就是AQS中的release(1)方法。
public void unlock() {
sync.release(1);
}
lock.newCondition():该方法调用的也是内部同步器类中的newCondition()方法。
public Condition newCondition() {
return sync.newCondition();
}
在ReentrantLock的内部同步器类中的newCondition()方法如下:
final ConditionObject newCondition() {
return new ConditionObject();
}
可以看到,该方法直接返回了一个AQS中的内部类ConditonObject对象。所以,在每次生成一个条件锁的时候,都会创建一个ConditionObject对象,
而每个ConditionObject对象都会在内部维护一个条件等待队列。
conditon.await():通过调用LockSupport中的park()方法,将当前挂起。
源码如下:
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
condition.signal():通过调用LockSupport的unpark()方法,来唤醒线程。
源码如下:
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
} private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false; /*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
了解完上面的概述之后,我们知道了,在不带条件的锁中,主要是通过调用AQS中的acquire(1)herelease(1)方法来获取锁和释放锁的。
下面就来具体看看AQS中的实现
在AQS中通过一个双向FIFO同步队列来维护获取对象锁的线程,当获取锁失败的时候,会用当前线程构建一个Node节点,加入到同步队列中去。
在AQS中维护了同步队列的head和tail节点,和同步状态。。
Node节点中维护了当前线程的status和前驱节点、后继节点、下一个等待节点(条件等待的时候用)。
waitStatus包含如下状态默认值为0:
CANCELLED = 1 : 表示当前线程在等待的时候已经超时了或者被取消了
SIGNAL = -1 :当前线程释放了同步状态或者被取消的时候会通知后继节点,使后继节点得以运行
CONDITINO = -2:节点在等待队列中,等待Condition,当其他线程在Condition上调用signal的时候,该线程会从等待队列转移到同步队列中去,加入到同步状态的获取。
PROPAGATE = -3:表示下一次共享式同步状态会无条件的传播下去
AQS中同步器的结构如下:
当有节点加入到同步队列的时候,只需要对tail节点重新指向就可以了
同步队列是一个FIFO的队列,在获取锁的时候总是首节点是获取同步状态的节点,首节点的线程在释放同步状态时,将会唤醒后继节点,而后继节点在获取同步状态成功时,会将自己设置为首节点。
下面就来具体看看在获取锁的时候lock.lock()调用的同步器中的acquire(1)方法的具体实现。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
以独占模式获取锁,并且不相应中断。在AQS中还有以下acquire方法:
acquireInterruptibly(int arg) :以独占模式获取锁,并且响应中断
acquireShared(int arg):以共享模式获取锁,并且不响应中断
acquireSharedInterruptibly(int arg):以共享模式获取锁,并且响应中断
该方法首先会调用tryAcquire(arg)来尝试获取锁。从源码可以看到,在AQS中该方法只是单纯的抛出一个UnsupportedOperationException异常,该方法需要实现AQS同步器的具体类中实现。
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
我们看看ReentrantLock中的具体实现。
非公平锁:
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) { //如果当前的同步状态为0,就尝试直接设置同步状态和设置独占的线程为自己,来强制获取锁
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) { //如果当前同步状态不为0,就判断是不是自己获取了锁,这里是实现可重入的
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
公平锁:
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() && //当前state为0 ,并且同步队列中没有前继节点,就尝试设置自己为获得锁的线程
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) { //实现可重入
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
可以看到,不论是在公平锁还是非公平锁的tryAcquire中,当获取到锁的时候返回的都是true,否则返回false。
所以,如果当前线程没有获取到锁的时候,则会继续执行后面的acquireQueued(addWaiter(Node.EXCLUSIVE),arg))
我们先看看addWaiter(Node.EXCLUSIVE)的实现。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 首先为当前线程以指定模式构建一个Node节点,然后尝试快速入队的方式加入到同步队列中去
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node); //否则的话就调用enq(node)方法来加入到同步队列中去
return node;
}
再看看enq(node)的源码实现
private Node enq(final Node node) {
for (;;) { //通过一个无条件的循环,知道将构建一个空的head,然后将当前节点加入到空的head的后面,构成一个同步队列
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
再看看acquireQueued(node,int)的源码实现
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) { //判断前一个节点是不是head节点,如果是的话,则会再次尝试去获取锁
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && //如果前一个节点不是head节点,则设置前一个非取消节点的状态是signal,以确保在前一个线程释放锁的时候能唤醒当前线程
parkAndCheckInterrupt()) //挂起当前线程,并且返回当前线程是否被中断过(会清空中断状态,只有在被唤醒的时候才能从park()方法返回)
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node); //如果被中断过,则把该节点从同步队列中删除
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return; node.thread = null; // Skip cancelled predecessors
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev; // predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
Node predNext = pred.next; // Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
node.waitStatus = Node.CANCELLED; // If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
} node.next = node; // help GC
}
}
acquire(1)的总结:
①首先会去调用子类实现的具体tryAcquire(1)方法来获取锁,根据子类不同,实现的也不同,例如ReentrantLock中的实现就分为公平锁和非公平锁。非公平锁就是不会去同步队列中排队,
而是直接去获取锁。如果获取失败的话,就会跟公平锁一样,进入FIFO的同步队列排队。
②加入同步队列的时候,首先会判断tail节点是否为空,如果不为空,则会尝试快速入队,如果为空的话,则会先创建一个空的head节点,然后在将当前线程的节点加入到同步队列中去
③加入到同步队列之后,会再次判断前一个节点是不是head节点,如果是的话,则会再次尝试去获取锁,如果获取失败的话,则会挂起当前线程。
④直到当前线程被唤醒的时候,会判断当前线程是否被中断过,如果被中断过,则会从同步队列中删除当前线程节点。并且中断当前线程。
下面在看看lock.unlock()方法调用的同步器中的sync.release(1)方法的具体实现
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
首先会调用tryRelease(arg)方法来释放锁。然后唤醒后面的线程。
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
tryRelease()同样也是子类需要实现的方法。ReentrantLock中的实现如下:
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0); /*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
release(1)的总结:
①首先会判断当前线程是不是独占的拥有锁。如果是的话,则会去释放锁。如果当前线程都已经退出了获取锁(可重入的原因),则会设置当前线程的state为0,独占线程为null
②在释放锁之后,会唤醒下一个等待中的线程。
AQS中条件队列的实现方式参考ConditonObject实现分析点此参考