问题描述
我试图在java中理解Phaser。我写卡在预先等待其他各方到达一个例子。
I am trying to understand Phaser in java. I wrote an example which is stuck at advance waiting for other parties to arrive.
据我明白,移相器被用作可重复使用的线程同步(不像CountdownLatch这是具有屏障作用的屏障(与用于共享状态的Cyclicbarrier不同,Phaser不必在屏障动作中共享状态)。如果我错了,请纠正我。
As far as I understand, phaser is used as a reusable thread synchronization (unlike CountdownLatch which is not reusable) barrier with a barrier action (unlike Cyclicbarrier which is used to share state, Phaser doesn't have to share state in barrier action). Correct me if I am wrong.
所以,在我的例子中,我试图在一定数量的聚会/线程到达后在每个线程中执行一些随机的加法和减法代码障碍。我做错了什么?
So, in my example, I am trying to execute some random addition and subtraction code in each thread after certain number of parties/threads reach the barrier. What am I doing wrong?
import static java.lang.String.*;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.stream.IntStream;
public class PhaserUsage implements Callable<String> {
private static final int THREAD_POOL_SIZE = 10;
private final Phaser phaser;
private PhaserUsage(Phaser phaser) {
this.phaser = phaser;
}
public static void main(String a[]) {
ExecutorService execService = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
CompletionService<String> completionService = new ExecutorCompletionService<>(execService);
Phaser phaser = new Phaser(1);
IntStream.range(0, THREAD_POOL_SIZE)
.forEach(nbr -> completionService.submit(new PhaserUsage(phaser)));
execService.shutdown();
try {
while (!execService.isTerminated()) {
String result = completionService.take().get();
System.out.println(format("Result is: %s", result));
}
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
}
}
@Override
public String call() {
String threadName = Thread.currentThread().getName();
System.out.println(format("Registering...%s",threadName));
phaser.register();
System.out.println(format("Arrive and await advance...%s",threadName));
phaser.arriveAndAwaitAdvance(); // await all creation
int a = 0, b = 1;
Random random = new Random();
for (int i = 0; i < random.nextInt(10000000); i++) {
a = a + b;
b = a - b;
}
System.out.println(format("De-registering...%s",threadName));
phaser.arriveAndDeregister();
return format("Thread %s results: a = %s, b = %s", threadName, a, b);
}
}
推荐答案
你使用值1初始化Phaser:
You initialize your Phaser with a value of 1:
Phaser phaser = new Phaser(1);
这意味着您的主线程是您正在等待的线程之一,但它从不调用到达()。
This means that your main thread is one of the threads you are waiting for, but it never calls arrive().
当您的线程数被修复时,您应该使用线程号初始化Phaser,并删除register()调用。
As the number of your threads is fixed, you should initialize the Phaser with the thread number, and remove the register() calls.
这篇关于通过示例了解java中的phaser的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!