我有N个 worker 共享一个要计算的元素队列。在每次迭代中,每个工作人员都会从队列中删除一个元素,并且可以产生更多要计算的元素,这些元素将被放入同一队列中。基本上,每个生产者也是一个消费者。当队列上没有元素并且所有工作程序都已完成对当前元素的计算时,计算完成(因此无法再生成要计算的元素)。我想避免 dispatch 调度员/协调员,所以 worker 应该协调。允许 worker 找出暂停条件是否有效并因此代表其他人停止计算的最佳模式是什么?

例如,如果所有线程仅执行此循环,则在计算所有元素时,将导致所有线程被永久阻塞:

while (true) {
    element = queue.poll();
    newElements[] = compute(element);
    if (newElements.length > 0) {
        queue.addAll(newElements);
    }
}

最佳答案

保持 Activity 线程数。

public class ThreadCounter {
    public static final AtomicInteger threadCounter = new AtomicInteger(N);
    public static final AtomicInteger queueCounter = new AtomicInteger(0);
    public static final Object poisonPill = new Object();
    public static volatile boolean cancel = false; // or use a final AomticBoolean instead
}

您线程的轮询循环应如下所示(我假设您使用的是BlockingQueue)
while(!ThreadCounter.cancel) {
    int threadCount = ThreadCounter.threadCounter.decrementAndGet(); // decrement before blocking
    if(threadCount == 0 && ThreadCounter.queueCounter.get() == 0) {
        ThreadCounter.cancel = true;
        queue.offer(ThreadCounter.poisonPill);
    } else {
        Object obj = queue.take();
        ThreadCounter.threadCounter.incrementAndGet(); // increment when the thread is no longer blocking
        ThreadCounter.queueCounter.decrementAndGet();
        if(obj == ThreadCounter.poisonPill) {
            queue.offer(obj); // send the poison pill back through the queue so the other threads can read it
            continue;
        }
    }
}

如果一个线程将要阻止BlockingQueue,那么它将使计数器减1;否则,计数器将减1。如果所有线程都已经在队列上等待(意味着counter == 0),那么最后一个线程将cancel设置为true,然后通过队列发送毒药以唤醒其他线程;每个线程看到毒药,将其发送回队列以唤醒其余线程,然后在看到cancel设置为true时退出循环。

编辑:我通过添加queueCounter来维护队列中对象数量的计数,从而消除了数据竞争(显然,无论将对象添加到队列中的何处,您都需要添加queueCounter.incrementAndGet()调用) 。它的工作方式如下:如果是threadCount == 0,但是是queueCount != 0,则意味着线程刚刚从队列中删除了一个项目,但尚未调用threadCount.getAndIncrement,因此cancel变量是,而未设置为true。重要的是threadCount.getAndIncrement调用必须先于queueCount.getAndDecrement调用,否则您仍然会遇到数据争用。调用queueCount.getAndIncrement的顺序无关紧要,因为您不会将其与对threadCount.getAndDecrement的调用交错(后者将在循环结束时调用,前者将在循环开始时调用)。

请注意,您不能仅使用queueCount来确定何时结束进程,因为线程可能仍处于 Activity 状态,而尚未在队列中放置任何数据-换句话说,queueCount将为零,但将为非零。一旦线程完成其当前迭​​代。

您可以让取消线程通过队列发送(N-1)poisonPill,而不是通过队列重复发送poisonPills。如果您在不同的队列中使用此方法,请小心,因为某些队列(例如Amazon的Simple Queue Service)可能会以等效于take方法的方式返回多个项目,在这种情况下,您需要重复发送poisonPill以确保一切都关闭了。

此外,除了使用while(!cancel)循环外,您还可以使用while(true)循环并在循环检测到poisonPill时中断

关于具有停止条件的Java生产者-消费者,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/16592667/

10-12 06:10