Phaser是JDK7新添加的线程同步辅助类,作用同CyclicBarrier,CountDownLatch类似,但是使用起来更加灵活:
1. Parties是动态的。
2. Phaser支持树状结构,即Phaser可以有一个父Phaser。
Phaser的构造函数涉及到两个参数:父Phaser和初始的parties,因此提供了4个构造函数:
public Phaser();
public Phaser(int parties); public Phaser(Phaser parent);
public Phaser(Phaser parent, int parties);
因为Phaser的特色在在于动态的parties,因此首先来看动态更新parties是如何实现的。
Phaser提供了两个方法:register和bulkRegister,前者会添加一个需要同步的线程,后者会添加parties个需要同步的线程。
public int register() {
return doRegister(1);
} // 增加了参数的检查
public int bulkRegister(int parties) {
if (parties < 0)
throw new IllegalArgumentException();
if (parties == 0)
return getPhase();
return doRegister(parties);
}
两个方法都调用了doRegister方法,因此接下来就来看看doRegister方法。
在分析doRegister之前先来说说Phaser的成员变量:state,它存储了Phaser的状态信息:
private volatile long state;
1. state的最高位是一个标志位,1表示Phaser的线程同步已经结束,0表示线程同步正在进行
2. state的低32位中,低16位表示没有到达的线程数量,高16位表示Parties值
3. state的高32位除了最高位之外的其他31位表示的Phaser的phase,可以理解为第多少次同步(从0开始计算)。
介绍完了state,来看方法doRegister:
private int doRegister(int registrations) {
// 把registrations值同时加到parties值和还未达到的线程数量中去
long adj = ((long)registrations << PARTIES_SHIFT) | registrations;
final Phaser parent = this.parent;
int phase;
for (;;) {
long s = state;
int counts = (int)s;
int parties = counts >>> PARTIES_SHIFT;
int unarrived = counts & UNARRIVED_MASK;
// 超过了允许的最大parties
if (registrations > MAX_PARTIES - parties)
throw new IllegalStateException(badRegister(s));
// 最高位为1,表示Phaser的线程同步已经结束
else if ((phase = (int)(s >>> PHASE_SHIFT)) < 0)
break;
// Phaser中的parties不是0
else if (counts != EMPTY) {
// 如果当前Phaser没有父Phaser,或者如果有父Phaser,
// 刷新自己的state值,如果刷新后的state没有变化。
// 这里刷新子Phaser的原因在于,会出现父Phaser已经进入下一个phase
// 而子Phaser却没有及时进入下一个phase的延迟现象
if (parent == null || reconcileState() == s) {
// 如果所有线程都到达了,等待Phaser进入下一次同步开始
if (unarrived == 0)
root.internalAwaitAdvance(phase, null);
// 更新state成功,跳出循环完成注册
else if (UNSAFE.compareAndSwapLong(this, stateOffset,
s, s + adj))
break;
}
}
// 第一次注册,且不是子Phaser
else if (parent == null) {
// 更新当前Phaser的state值成功则完成注册
long next = ((long)phase << PHASE_SHIFT) | adj;
if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next))
break;
}
// 第一次注册到子Phaser
else {
// 锁定当前Phaser对象
synchronized (this) {
// 再次检查state值,确保没有被更新
if (state == s) {
// 注册到父Phaser中去
parent.doRegister(1);
do { // 获取当前phase值
phase = (int)(root.state >>> PHASE_SHIFT);
} while (!UNSAFE.compareAndSwapLong
(this, stateOffset, state,
((long)phase << PHASE_SHIFT) | adj));// 更新当前Phaser的state值
break;
}
}
}
}
return phase;
}
看完了注册,那么来看同步操作的arrive,这里也涉及到两个方法:arrive和arriveAndDeregister,前者会等待其他线程的到达,后者则会立刻返回:
public int arrive() {
return doArrive(false);
} public int arriveAndDeregister() {
return doArrive(true);
}
两个方法都调用了doArrive方法,区别在于参数一个是false,一个是true。那么来看doArrive:
private int doArrive(boolean deregister) {
// arrive需要把未到达的线程数减去1,
// deregister为true,需要把parties值也减去1
int adj = deregister ? ONE_ARRIVAL|ONE_PARTY : ONE_ARRIVAL;
final Phaser root = this.root;
for (;;) {
// 如果是有父Phaser,首先刷新自己的state
long s = (root == this) ? state : reconcileState();
int phase = (int)(s >>> PHASE_SHIFT);
int counts = (int)s;
int unarrived = (counts & UNARRIVED_MASK) - 1;
// 最高位为1,表示同步已经结束,返回phase值
if (phase < 0)
return phase;
// 如果parties为0或者在此次arrive之前所有线程到达
else if (counts == EMPTY || unarrived < 0) {
// 对于非子Phaser来说,上述情况的arrive肯定是非法的
// 对于子Phaser首先刷新一下状态再做检查
if (root == this || reconcileState() == s)
throw new IllegalStateException(badArrive(s));
}
// 正常情况下,首先更新state
else if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adj)) {
// 所有线程都已经到达
if (unarrived == 0) {
// 计算parties作为下一个phase的未到达的parties
long n = s & PARTIES_MASK;
int nextUnarrived = (int)n >>> PARTIES_SHIFT;
// 调用父Phaser的doArrive
if (root != this)
// 如果下一个phase的未到达的parties为0,则需要向
// 父Phaser取消注册
return parent.doArrive(nextUnarrived == 0);
// 正在进入下一个Phase,默认的实现是nextUnarrived为0
// 表示正在进入下一个Phase,因为下一个phase的parties
// 为0,需要等待parties不为0
if (onAdvance(phase, nextUnarrived))
// 正在等待下一个phase,设置状态为终止
n |= TERMINATION_BIT;
else if (nextUnarrived == 0)
// 下一个phase的parties为0,更新未到达的parties的值
n |= EMPTY;
else
// 更新下一个phase的未到达的parties的值
n |= nextUnarrived;
// phase值加1
n |= (long)((phase + 1) & MAX_PHASE) << PHASE_SHIFT; // 更新state值
UNSAFE.compareAndSwapLong(this, stateOffset, s, n); // 唤醒等待的线程
releaseWaiters(phase);
}
return phase;
}
}
}
关于arrive还有一个方法:arriveAndAwaitAdvance。这个方法会等到下一个phase开始再返回,相等于doArrive方法添加了awaitAdvance方法的功能。基本逻辑和上面说的doArrive方法类似:
public int arriveAndAwaitAdvance() {
final Phaser root = this.root;
for (;;) {
long s = (root == this) ? state : reconcileState();
int phase = (int)(s >>> PHASE_SHIFT);
int counts = (int)s;
int unarrived = (counts & UNARRIVED_MASK) - 1;
if (phase < 0)
return phase;
else if (counts == EMPTY || unarrived < 0) {
// 对于非子Phaser来说,因为可以等待下一个phase,
// 所以不是非法arrive
if (reconcileState() == s)
throw new IllegalStateException(badArrive(s));
}
else if (UNSAFE.compareAndSwapLong(this, stateOffset, s,
s -= ONE_ARRIVAL)) {
// 还有其他线程没有达到,就会等待直到下一个phase开始
if (unarrived != 0)
return root.internalAwaitAdvance(phase, null);
if (root != this)
return parent.arriveAndAwaitAdvance();
long n = s & PARTIES_MASK; // base of next state
int nextUnarrived = (int)n >>> PARTIES_SHIFT;
if (onAdvance(phase, nextUnarrived))
n |= TERMINATION_BIT;
else if (nextUnarrived == 0)
n |= EMPTY;
else
n |= nextUnarrived;
int nextPhase = (phase + 1) & MAX_PHASE;
n |= (long)nextPhase << PHASE_SHIFT;
if (!UNSAFE.compareAndSwapLong(this, stateOffset, s, n))
return (int)(state >>> PHASE_SHIFT);
releaseWaiters(phase);
return nextPhase;
}
}
}
这一部分主要讲了Phaser的动态更新parties以及线程的arrive,下一部分将会分析线程等待的实现。