我试图找出所有ForkJoinPool线程何时完成其任务。
我写了这个测试应用程序(我使用System.out是因为它只是一个快速的测试应用程序,并且没有错误检查/处理):
public class TestForkJoinPoolEnd {
private static final Queue<String> queue = new LinkedList<>();
private static final int MAX_SIZE = 5000;
private static final int SPEED_UP = 100;
public static void main(String[] args) {
ForkJoinPool customThreadPool = new ForkJoinPool(12);
customThreadPool.submit(
() -> makeList()
.parallelStream()
.forEach(TestForkJoinPoolEnd::process));
enqueue("Theard pool started up");
int counter = MAX_SIZE + 1;
while (!customThreadPool.isTerminating()) {
String s = dequeue();
if (s != null) {
System.out.println(s);
counter--;
}
try {
TimeUnit.MILLISECONDS.sleep(1);
} catch (InterruptedException e) {
}
}
System.out.println("counter = " + counter);
System.out.println("isQuiescent = " + customThreadPool.isQuiescent() + " isTerminating " +
"= " + customThreadPool.isTerminating() + " isTerminated = "
+ customThreadPool.isTerminated() + " isShutdown =" + customThreadPool.isShutdown());
}
static List<String> makeList() {
return Stream.generate(() -> makeString())
.limit(MAX_SIZE)
.collect(Collectors.toList());
}
static String makeString() {
int leftLimit = 97; // letter 'a'
int rightLimit = 122; // letter 'z'
int targetStringLength = 10;
Random random = new Random();
StringBuilder buffer = new StringBuilder(targetStringLength);
for (int i = 0; i < targetStringLength; i++) {
int randomLimitedInt = leftLimit + (int)
(random.nextFloat() * (rightLimit - leftLimit + 1));
buffer.append((char) randomLimitedInt);
}
return buffer.toString();
}
static int toSeed(String s) {
int sum = 0;
for (int i = 0; i < s.length(); i++) {
sum += s.charAt(i);
}
return (sum / SPEED_UP);
}
static void process(String s) {
StringBuilder sb = new StringBuilder(s);
long start = System.currentTimeMillis();
try {
TimeUnit.MILLISECONDS.sleep(toSeed(s));
} catch (InterruptedException e) {
}
long end = System.currentTimeMillis();
sb.append(" slept for ")
.append((end - start))
.append(" milliseconds");
enqueue(sb.toString());
}
static void enqueue(String s) {
synchronized (queue) {
queue.offer(s);
}
}
static String dequeue() {
synchronized (queue) {
return queue.poll();
}
}
}
此代码被卡住,永远无法完成。如果将while循环的条件更改为
!customThreadPool.isQuiescent()
,它将以计数器和队列大小设置为1终止循环。我应该使用什么来确定线程何时完成?
最佳答案
ExecutorService
不会仅因为一项作业(及其子作业)完成而终止自身。线程池背后的整个想法是可重用的。
因此,它将仅在应用程序对其调用shutdown()
时终止。
您可以使用isQuiescent()
来查找是否没有待处理的作业,仅当所有提交的作业都属于您的特定任务时,该作业才有效。使用submit
返回的未来来检查实际作业的完成情况,这更加干净。
在这两种情况下,排队任务的完成状态都不会说明您正在轮询的队列。当您了解提交的结束时,您仍然必须检查队列中是否有待处理的元素。
此外,建议使用线程安全的BlockingQueue
实现,而不是用LinkedList
块修饰synchronized
。连同其他需要清除的内容,代码如下所示:
public class TestForkJoinPoolEnd {
private static final BlockingQueue<String> QUEUE = new LinkedBlockingQueue<>();
private static final int MAX_SIZE = 5000;
private static final int SPEED_UP = 100;
public static void main(String[] args) {
ForkJoinPool customThreadPool = new ForkJoinPool(12);
ForkJoinTask<?> future = customThreadPool.submit(
() -> makeList()
.parallelStream()
.forEach(TestForkJoinPoolEnd::process));
QUEUE.offer("Theard pool started up");
int counter = MAX_SIZE + 1;
while (!future.isDone()) try {
String s = QUEUE.poll(1, TimeUnit.MILLISECONDS);
if (s != null) {
System.out.println(s);
counter--;
}
} catch (InterruptedException e) {}
for(;;) {
String s = QUEUE.poll();
if (s == null) break;
System.out.println(s);
counter--;
}
System.out.println("counter = " + counter);
System.out.println("isQuiescent = " + customThreadPool.isQuiescent() + " isTerminating " +
"= " + customThreadPool.isTerminating() + " isTerminated = "
+ customThreadPool.isTerminated() + " isShutdown =" + customThreadPool.isShutdown());
customThreadPool.shutdown();
}
static List<String> makeList() {
return IntStream.range(0, MAX_SIZE)
.mapToObj(i -> makeString())
.collect(Collectors.toList());
}
static String makeString() {
int targetStringLength = 10;
Random random = new Random();
StringBuilder buffer = new StringBuilder(targetStringLength);
for (int i = 0; i < targetStringLength; i++) {
int randomLimitedInt = random.nextInt('z' - 'a' + 1) + 'a';
buffer.append((char) randomLimitedInt);
}
return buffer.toString();
}
static int toSeed(String s) {
return s.chars().sum() / SPEED_UP;
}
static void process(String s) {
long start = System.nanoTime();
try {
TimeUnit.MILLISECONDS.sleep(toSeed(s));
} catch (InterruptedException e) {
}
long end = System.nanoTime();
QUEUE.offer(s + " slept for " + (end - start)/1000000 + " milliseconds");
}
}
如果您在接收端的
sleep
呼叫应该模拟一些工作负载而不是等待新项目,则还可以使用int counter = MAX_SIZE + 1;
while (!future.isDone()) {
String s = QUEUE.poll();
if (s != null) {
System.out.println(s);
counter--;
}
try {
TimeUnit.MILLISECONDS.sleep(1);
} catch (InterruptedException e) {}
}
但逻辑没有改变。
future.isDone()
返回true
之后,我们必须重新检查队列中的未决元素。我们只保证不会有新物品到达,而不能保证队列已经为空。附带说明,
makeString()
方法可以进一步改进为static String makeString() {
int targetStringLength = 10;
ThreadLocalRandom random = ThreadLocalRandom.current();
StringBuilder buffer = new StringBuilder(targetStringLength);
for (int i = 0; i < targetStringLength; i++) {
int randomLimitedInt = random.nextInt('a', 'z' + 1);
buffer.append((char)randomLimitedInt);
}
return buffer.toString();
}
甚至
static String makeString() {
int targetStringLength = 10;
return ThreadLocalRandom.current()
.ints(targetStringLength, 'a', 'z'+1)
.collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append)
.toString();
}