在尝试了解Phaser和CyclicBarrier之间的区别时,我遇到了一些链接
Difference between Phaser and CyclicBarrier和
https://www.infoq.com/news/2008/07/phasers/
我读到,Phaser与Fork/Join接口(interface)兼容,而CyclicBarrier不兼容,这是演示此代码的代码:
移相器
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
Phaser phaser = new Phaser(16){
@Override
protected boolean onAdvance(int phase, int registeredParties) {
return phase ==1 || super.onAdvance(phase, registeredParties);
}
};
System.out.println("Available Processors: "+Runtime.getRuntime().availableProcessors());
ExecutorService executorService = ForkJoinPool.commonPool(); // Runtime.getRuntime().availableProcessors() -1
for (int i = 0; i < 16; i++) {
final int count = 0;
executorService.submit(() -> {
while (!phaser.isTerminated()) {
try {
Thread.sleep(ThreadLocalRandom.current().nextInt(300, 2000));
System.out.println(Thread.currentThread().getName() + count + " ... ");
phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName() + count + " ... continues ... ");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
countDownLatch.countDown();
});
}
countDownLatch.await();
}
循环屏障
public static void main(String[] args) throws InterruptedException {
AtomicInteger phases = new AtomicInteger();
CountDownLatch countDownLatch = new CountDownLatch(1);
CyclicBarrier cyclicBarrier = new CyclicBarrier(16, () -> phases.incrementAndGet());
ExecutorService executorService = ForkJoinPool.commonPool();
for (int i = 0; i < 16; i++) {
executorService.submit(() -> {
while (phases.get() < 1) {
try {
Thread.sleep(ThreadLocalRandom.current().nextInt(300, 2000));
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
System.out.println(Thread.currentThread().getName() + " Ok, I am waiting ");
cyclicBarrier.await();
System.out.println(Thread.currentThread().getName() + " continued it's way ... ");
} catch (BrokenBarrierException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
countDownLatch.countDown();
}
});
}
countDownLatch.await();
}
解释:
这两个代码运行一个fork/join线程池,这意味着这些线程是守护程序线程,这就是为什么我使用CountDownLatch的原因。方法
commonPool()
将创建一个线程池,该线程池的线程等于Runtime.getRuntime().availableProcessors()
,我的线程数为12,因此它将创建12个线程。两个示例中的Phaser和CyclicBarrier都定义了16个参与方,即,他们需要16次调用循环障碍中的await()
和Phaser中的arriveAndAwaitAdvance()
才能继续。在使用相位器的示例中,当第12个线程被阻止时,fork/join将产生更多线程,它将创建更多线程,因此相位器最终将终止。但是,使用CyclicBarrier,当第12个线程到达
await()
时,程序将停止并且永远不会前进,它会挂起。显然,由于屏障需要进行16次调用,才能使线程前进,而创建的线程只能进行12次调用。线程池不会像移相器那样创建更多线程来推进CyclicBarrier。问题:
fork/join如何通过Phaser而不是CyclicBarrier设法创建更多线程?
为什么
arriveAndAwaitAdvance()
方法使线程池创建新线程,怎么做,但是await()
方法没有导致线程池创建更多线程? 最佳答案
Phaser之所以能够做到这一点,是因为它在阻塞线程时内部调用 ForkJoinPool.managedBlock(ManagedBlocker)
。
任何人都可以访问ForkJoinPool的此API,因此您可以轻松地增强CyclicBarrier
版本以使用它,并消除线程不足。例如,带有某种氛围:
ForkJoinPool.managedBlock(new ManagedBlocker() {
boolean isReleasable = false;
@Override
public boolean block() throws InterruptedException {
try {
cyclicBarrier.await();
} catch (BrokenBarrierException aE) {
throw new IllegalStateException(aE);
}
return isReleasable = true;
}
@Override
public boolean isReleasable() {
return isReleasable;
}
});
关于java - 在Fork/Join上下文中,Phaser和CyclicBarrier,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/57240612/