OpenJDK8,本人看的是openJDK。以前就看过,只是经常忘记,所以记录下
图1
CountDownLatch是Doug Lea在JDK1.5中引入的,作用就不详细描述了,
- await()方法,如果还有线程在执行,那么当前线程阻塞,如果最后一个线程执行完成,则被唤醒
- countDown()方法,如果不是最后一个线程,则将状态值减去1,如果是最后一个,则将被阻塞的线程唤醒
List-1
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
CountDownLatch里面有个内部类Sync继承了AbstractQueuedSynchronizer——这个类在java.util.concurrent.locks包下。
CountDownLatch的构造方法调用Sync的构造方法,设置state的值,这个起到一个类似计数器的作用。
1.await()
List-2
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
对CountDownLatch调用await()时,调用的是Sync的acquireSharedInterruptibly()——这个方法是AbstractQueuedSynchronizer里面的,如下List-3,tryAcquireShared方法在类Sync中实现,如List-1中,当还有线程还未执行完,tryAcquireShared会返还-1,这样就调用doAcquireSharedInterruptibly()如List-4,这个方法是AbstractQueuedSynchronizer的,会将当前当前线程加入到CLH队列中,这样就实现了将当前线程阻塞,后面让最后一个来唤醒。
List-3
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
List-4
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
2.countDown()
List-5
public void countDown() {
sync.releaseShared(1);
}
如List-5,内部调用了Sync的releaseShared(1),即释放1,如下List-6,先用tryReleaseShared方法来尝试是否能减少一个状态值,如List-7,在for循环中将状态值减去1,之后用CAS,更新状态值,如果此时状态值位0则返回true,如果不是则返回false。
tryReleaseShared()如果返还true,则说明是最后一个线程了,此时用doReleaseShared()唤醒阻塞的线程,底层上使用LockSupport.unpark(Thread).
List-6
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
List-7
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
CyclicBarrier内部是使用ReentrantLock来实现,而CountDownLatch内部是使用AQS内部类来实现。