我的问题与this one here密切相关。
如此处所述,我希望主线程等待,直到工作队列为空并且所有任务都已完成。但是,我的情况是每个任务都可能递归地导致新任务被提交进行处理。这使得收集所有这些任务的 future 变得有点尴尬。

我们当前的解决方案使用忙等待循环来等待终止:

        do { //Wait until we are done the processing
      try {
        Thread.sleep(200);
      } catch (InterruptedException e) {
        throw new RuntimeException(e);
      }
    } while (!executor.getQueue().isEmpty()
             || numTasks.longValue() > executor.getCompletedTaskCount());

numTasks是随着创建每个新任务而增加的值。
这行得通,但由于繁忙的等待,我认为它不是很好。我想知道是否有一个好的方法可以使主线程同步等待,直到被显式唤醒。

最佳答案

非常感谢您的所有建议!

最后,我选择了一些我认为相当简单的东西。我发现CountDownLatch几乎是我所需要的。它一直阻塞直到计数器达到0。唯一的问题是它只能递减计数,不能递减计数,因此不能在我有任务可以提交新任务的动态设置中使用。因此,我实现了一个提供附加功能的新类CountLatch。 (请参阅下文)然后,按如下方式使用此类。

主线程调用latch.awaitZero(),直到锁存器到达0为止。

任何线程,在调用executor.execute(..)之前都会调用latch.increment()

在完成之前,任何任务都将调用latch.decrement()

当最后一个任务终止时,计数器将达到0,从而释放主线程。

欢迎进一步的建议和反馈!

public class CountLatch {

@SuppressWarnings("serial")
private static final class Sync extends AbstractQueuedSynchronizer {

    Sync(int count) {
        setState(count);
    }

    int getCount() {
        return getState();
    }

    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }

    protected int acquireNonBlocking(int acquires) {
        // increment count
        for (;;) {
            int c = getState();
            int nextc = c + 1;
            if (compareAndSetState(c, nextc))
                return 1;
        }
    }

    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            int nextc = c - 1;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
}

private final Sync sync;

public CountLatch(int count) {
    this.sync = new Sync(count);
}

public void awaitZero() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

public boolean awaitZero(long timeout, TimeUnit unit) throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

public void increment() {
    sync.acquireNonBlocking(1);
}

public void decrement() {
    sync.releaseShared(1);
}

public String toString() {
    return super.toString() + "[Count = " + sync.getCount() + "]";
}

}

请注意,如Sami Korhonen所建议的,可以将increment()/decrement()调用封装到自定义的Executor子类中,或者如impl所建议的那样,使用beforeExecuteafterExecute封装。看这里:
public class CountingThreadPoolExecutor extends ThreadPoolExecutor {

protected final CountLatch numRunningTasks = new CountLatch(0);

public CountingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
        BlockingQueue<Runnable> workQueue) {
    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}

@Override
public void execute(Runnable command) {
    numRunningTasks.increment();
    super.execute(command);
}

@Override
protected void afterExecute(Runnable r, Throwable t) {
    numRunningTasks.decrement();
    super.afterExecute(r, t);
}

/**
 * Awaits the completion of all spawned tasks.
 */
public void awaitCompletion() throws InterruptedException {
    numRunningTasks.awaitZero();
}

/**
 * Awaits the completion of all spawned tasks.
 */
public void awaitCompletion(long timeout, TimeUnit unit) throws InterruptedException {
    numRunningTasks.awaitZero(timeout, unit);
}

}

关于java - 执行器: How to synchronously wait until all tasks have finished if tasks are created recursively?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/14535770/

10-11 17:05