OpenJDK8,本人看的是openJDK。以前就看过,只是经常忘记,所以记录下

           OpenJDK之CyclicBarrier-LMLPHP

                                                               图1

    CyclicBarrier是Doug Lea在JDK1.5中引入的,作用就不详细描述了,主要有如下俩个方法使用:

  1. await()方法,如果当前线程不是最后一个线程,则阻塞当前线程,如果是最后一个线程,则不会阻塞当前线程,而且还会唤醒其它已经阻塞了的线程
  2. reset()方法,将栈栏重置,为什么要重置,后面会讲到

1.await()方法

开始之前,先来看构造方法,如下List-1,parties表示线程个数,barrierAction在parties个线程执行完成时会执行

    List-1

public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}

public CyclicBarrier(int parties) {
    this(parties, null);
}

    好了,开始来看await()方法,await()方法有俩个方法重载,一个时无参数的,一个是后时间的。出于分析方便,来看无参的,

    List-2

public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}

private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
            TimeoutException {
    final ReentrantLock lock = this.lock;
    lock.lock();//1
    try {
        final Generation g = generation;

        if (g.broken)
            throw new BrokenBarrierException();

        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }

        int index = --count;//2
        if (index == 0) {  // tripped
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();//3
                ranAction = true;
                nextGeneration();//7
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }

        // loop until tripped, broken, interrupted, or timed out
        for (;;) {
            try {
                if (!timed)//4
                    trip.await();//5
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    // We're about to finish waiting even if we had not
                    // been interrupted, so this interrupt is deemed to
                    // "belong" to subsequent execution.
                    Thread.currentThread().interrupt();
                }
            }

            if (g.broken)
                throw new BrokenBarrierException();

            if (g != generation)
                return index;

            if (timed && nanos <= 0L) {
                breakBarrier();//6
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}

    如List-2所示

  1. 用ReentrantLock获取锁,确保只有一个时刻只有一个线程在执行锁区域
  2. 2处将计数器减去1,表示有一个线程执行完了
  3. 3处,如果步骤2后,计数器是0,表示最后一个线程执行完了,此时如果barrierAction不是null,则执行它,这也就是为什么在CyclicBarrier的构造方法中传入Runnable的action后,会在最后一个线程执行完成后,该action会被执行。
  4. 4处,说明当前线程执行完了,但是计算器还不是0,即还有线程还在执行,由于timed是false,所以到5处,将当前线程加入到CLH队列中睡眠
  5. 最后一个线程执行完了,是如何唤醒其它已经执行完,在睡眠的线程呢,7处调用nextGeneration方法,如下List-3,方法中调用trip.signalAll(),这个方法唤醒在CLH中睡眠的线程。

    List-3

private void nextGeneration() {
    // signal completion of last generation
    trip.signalAll();
    // set up next generation
    count = parties;
    generation = new Generation();
}

2.reset()方法

    await()被调用n次后,计数器已经变为0了,此时再次使用await()不行,需要使用reset()来重置计数器

    List-4

public void reset() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        breakBarrier();   // break the current generation
        nextGeneration(); // start a new generation
    } finally {
        lock.unlock();
    }
}

private void breakBarrier() {
    generation.broken = true;
    count = parties;
    trip.signalAll();
}

private void nextGeneration() {
    // signal completion of last generation
    trip.signalAll();
    // set up next generation
    count = parties;
    generation = new Generation();
}

    List-4中,resrt()将parties值赋值给count,即重置计数器,这样之后await()又可以使用生效了。gernation会重新创建对象,这个在await(...)方法中使用到。

    CyclicBarrier中使用到ReentrantLock的lock和Condition,这个在其它章节讲。

Reference

  1. http://openjdk.java.net/    
10-23 06:08