OpenJDK8,本人看的是openJDK。以前就看过,只是经常忘记,所以记录下
图1
CyclicBarrier是Doug Lea在JDK1.5中引入的,作用就不详细描述了,主要有如下俩个方法使用:
- await()方法,如果当前线程不是最后一个线程,则阻塞当前线程,如果是最后一个线程,则不会阻塞当前线程,而且还会唤醒其它已经阻塞了的线程
- reset()方法,将栈栏重置,为什么要重置,后面会讲到
1.await()方法
开始之前,先来看构造方法,如下List-1,parties表示线程个数,barrierAction在parties个线程执行完成时会执行
List-1
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
public CyclicBarrier(int parties) {
this(parties, null);
}
好了,开始来看await()方法,await()方法有俩个方法重载,一个时无参数的,一个是后时间的。出于分析方便,来看无参的,
List-2
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();//1
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
int index = --count;//2
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();//3
ranAction = true;
nextGeneration();//7
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)//4
trip.await();//5
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();//6
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
如List-2所示
- 用ReentrantLock获取锁,确保只有一个时刻只有一个线程在执行锁区域
- 2处将计数器减去1,表示有一个线程执行完了
- 3处,如果步骤2后,计数器是0,表示最后一个线程执行完了,此时如果barrierAction不是null,则执行它,这也就是为什么在CyclicBarrier的构造方法中传入Runnable的action后,会在最后一个线程执行完成后,该action会被执行。
- 4处,说明当前线程执行完了,但是计算器还不是0,即还有线程还在执行,由于timed是false,所以到5处,将当前线程加入到CLH队列中睡眠
- 最后一个线程执行完了,是如何唤醒其它已经执行完,在睡眠的线程呢,7处调用nextGeneration方法,如下List-3,方法中调用trip.signalAll(),这个方法唤醒在CLH中睡眠的线程。
List-3
private void nextGeneration() {
// signal completion of last generation
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation();
}
2.reset()方法
await()被调用n次后,计数器已经变为0了,此时再次使用await()不行,需要使用reset()来重置计数器
List-4
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
breakBarrier(); // break the current generation
nextGeneration(); // start a new generation
} finally {
lock.unlock();
}
}
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}
private void nextGeneration() {
// signal completion of last generation
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation();
}
List-4中,resrt()将parties值赋值给count,即重置计数器,这样之后await()又可以使用生效了。gernation会重新创建对象,这个在await(...)方法中使用到。
CyclicBarrier中使用到ReentrantLock的lock和Condition,这个在其它章节讲。