OpenJDK8,本人看的是openJDK。以前就看过,只是经常忘记,所以记录下

         OpenJDK之CountDownLatch-LMLPHP

                                                             图1

CountDownLatch是Doug Lea在JDK1.5中引入的,作用就不详细描述了,

  1. await()方法,如果还有线程在执行,那么当前线程阻塞,如果最后一个线程执行完成,则被唤醒
  2. countDown()方法,如果不是最后一个线程,则将状态值减去1,如果是最后一个,则将被阻塞的线程唤醒

    List-1

private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;

    Sync(int count) {
        setState(count);
    }

    int getCount() {
        return getState();
    }

    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }

    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            int nextc = c-1;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
}

    CountDownLatch里面有个内部类Sync继承了AbstractQueuedSynchronizer——这个类在java.util.concurrent.locks包下。

    CountDownLatch的构造方法调用Sync的构造方法,设置state的值,这个起到一个类似计数器的作用。

1.await()

    List-2

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

    对CountDownLatch调用await()时,调用的是Sync的acquireSharedInterruptibly()——这个方法是AbstractQueuedSynchronizer里面的,如下List-3,tryAcquireShared方法在类Sync中实现,如List-1中,当还有线程还未执行完,tryAcquireShared会返还-1,这样就调用doAcquireSharedInterruptibly()如List-4,这个方法是AbstractQueuedSynchronizer的,会将当前当前线程加入到CLH队列中,这样就实现了将当前线程阻塞,后面让最后一个来唤醒。

    List-3

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

    List-4

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

2.countDown()

    List-5

public void countDown() {
    sync.releaseShared(1);
}

    如List-5,内部调用了Sync的releaseShared(1),即释放1,如下List-6,先用tryReleaseShared方法来尝试是否能减少一个状态值,如List-7,在for循环中将状态值减去1,之后用CAS,更新状态值,如果此时状态值位0则返回true,如果不是则返回false。

    tryReleaseShared()如果返还true,则说明是最后一个线程了,此时用doReleaseShared()唤醒阻塞的线程,底层上使用LockSupport.unpark(Thread).    

    List-6

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

    List-7

protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

    CyclicBarrier内部是使用ReentrantLock来实现,而CountDownLatch内部是使用AQS内部类来实现。

Reference

  1. http://openjdk.java.net/    
10-24 07:16