我非常需要类似于CountDownLatch的同步器,但是倒数的起始编号是未知的。要添加上下文,如果我要通过一个缓冲的记录集(例如来自文本文件或查询)并为每个记录启动一个可运行记录,但是我不知道会有多少条记录...我需要一个同步器,在迭代完成且所有可运行项完成时发出信号。
这是我想出的同步器... BufferedLatch。在迭代循环中,为每个记录递增recordSetSize的方法被调用。在为每个记录启动的每个可运行语句结束时,已处理记录集大小将增加。当所有记录的迭代完成时(但可运行对象可能仍在队列中),将调用setDownloadComplete()方法,以使BufferedLatch知道recordSetSize现在已固定。 await()方法等待erationComplete变量为真(recordsetSize现在已固定),并且recordsetSize ==已处理RecordSetSize;
这是该同步器的最佳实现吗?同步是否会阻止更多的并发机会?尽管测试似乎可以正常进行,但是我有没有忽略的陷阱?
import java.util.concurrent.atomic.AtomicInteger;
public final class BufferedLatch {
/** A customized synchronizer built for concurrent iteration processes where the number of objects to be iterated is unknown
* and a runnable will be kicked off for each object, and the await() method will wait for all runnables to be complete
*/
private final AtomicInteger recordsetSize = new AtomicInteger(0);
private final AtomicInteger processedRecordsetSize = new AtomicInteger(0);
private volatile boolean iterationComplete = false;
public int incrementRecordsetSize() throws Exception {
if (iterationComplete) {
throw new Exception("Cannot increase recordsize after download is flagged complete!");
}
else {
return recordsetSize.incrementAndGet();
}
}
public void incrementProcessedRecordSize() {
synchronized(this) {
processedRecordsetSize.incrementAndGet();
if (iterationComplete) {
if (processedRecordsetSize.get() == recordsetSize.get()) {
this.notifyAll();
}
}
}
}
public void setDownloadComplete() {
synchronized(this) {
iterationComplete = true;
}
}
public void await() throws InterruptedException {
while (! (iterationComplete && (processedRecordsetSize.get() == recordsetSize.get()))) {
synchronized(this) {
while (! (iterationComplete && (processedRecordsetSize.get() == recordsetSize.get()))) {
this.wait();
}
}
}
}
}
更新-新代码
public final class BufferedLatch {
/** A customized synchronizer built for concurrent iteration processes where the number of objects to be iterated is unknown
* and a runnable will be kicked off for each object, and the await() method will wait for all runnables to be complete
*/
private int recordCount = 0;
private int processedRecordCount = 0;
private boolean iterationComplete = false;
public synchronized void incrementRecordCount() throws Exception {
if (iterationComplete) {
throw new Exception("Cannot increase recordCount after download is flagged complete!");
}
else {
recordCount++;
}
}
public synchronized void incrementProcessedRecordCount() {
processedRecordCount++;
if (iterationComplete && recordCount == processedRecordCount) {
this.notifyAll();
}
}
public synchronized void setIterationComplete() {
iterationComplete = true;
if (iterationComplete && recordCount == processedRecordCount) {
this.notifyAll();
}
}
public synchronized void await() throws InterruptedException {
while (! (iterationComplete && (recordCount == processedRecordCount))) {
this.wait();
}
}
}
最佳答案
可能不是。我认为从概念上讲,您正在此处,因为看起来您的应用程序需要的不仅是CountDownLatch
。但是,实现似乎有几个问题。
首先,我注意到混合使用原子/挥发物和普通对象监视器锁(synchronized
)看起来很奇怪。虽然可能有适当的用法来混合这些不同的结构,但在这种情况下,我相信混合会导致错误。
考虑首先检查incrementRecordsetSize()
的iterationComplete
,只有它为false时,它才递增recordsetSize
。 iterationComplete
变量是易失性的,因此其他线程的更新将可见。但是,此处未进行锁定的事实允许TOCTOU竞争条件(检查时间与使用时间)。规则似乎是,如果recordsetSize
为true,则不得递增iterationComplete
。假设线程T1出现并发现iterationComplete
为假,因此它决定递增recordsetSize
。在此之前,出现另一个线程T2并将iterationComplete
设置为true。这将允许T1进行不正确的增量。更糟糕的是,在此之前,假设另一个线程T3出现并称为incrementProcessedRecordSize()
。它将增加processedRecordsetSize
,然后找到iterationComplete
true。进一步可能会发现processedRecordsetSize
等于recordsetSize
,然后通知所有服务员,然后这些服务员就好像处理已完成一样继续进行。但是事实并非如此,因为T1随后继续增加recordsetSize
并可能继续进行处理。
这里的问题在于,该对象的状态由三个独立的状态部分(两个int计数器和一个布尔值)的融合组成,并且必须以原子方式读取和写入所有三个状态。如果逻辑的某些位试图利用单个的易失性或原子属性,则会引入竞争条件的可能性,例如我所描述的。
我建议将其重写为具有两个简单整数和一个布尔值(不是原子的,不是易失性的)的普通对象,并且仅锁定所有内容。当然,这应该弄清楚逻辑,并使事情更容易理解。
在incrementProcessedRecordSize
中,我注意到该条件实质上与await
方法中的条件重复。简化约定是让所有更新通知并仅由侍者评估条件。这可能会导致不必要的唤醒。如果这是一个问题,则可以考虑减少通知的数量,但是您需要考虑可维护性。如果您不小心,则等待/通知条件将散布在整个代码中,并且很难推理。另外,您可以将条件重构为方法,然后在进行等待和通知的不同位置调用它。
看来await()
做着复杂的形式的双重检查锁定。它没有在锁外部测试易失性布尔值,而是在锁外部和内部测试了几条单独的信息。这似乎容易受到TOCTOU问题的影响(如上所述),但是如果可以证明状态确实锁存,那就是安全的,也就是说,一旦状态变为true,就永远不会返回false。我必须凝视代码很长时间,才能确信自己是正确的。
另一方面,这能给您带来什么?似乎仅将锁取走就可以优化。如果在处理完成后会有成千上万的线程要来,那也许是值得的,但看起来却并非如此。我只是删除外部的while循环并检查synchronized
块中的变量。
最后,拥有一个代表计数器和布尔值的对象可能对您正在做的事情非常明智,但是您所说的(在问题和评论中)其他事情是某些线程正在产生工作负载(例如,阅读行)从文件中删除),其他线程将撤消该工作负载。这意味着存在一些其他数据结构,例如包含此工作负载的队列,并且您在这里遇到生产者-消费者问题。当然,其他结构必须是线程安全的,因为有多个线程在该结构上进行交互。但是,此结构中的计数器和布尔值需要与工作负载结构的更新同步进行更新,否则,在检查和更新这些单独的对象之间可能存在竞争条件。
在我看来,您可以用队列替换该对象中的计数器,并在所有内容周围放置简单的锁。生产者将追加到队列中直到完成,然后将iterationComplete
设置为true,以防止添加更多工作。使用者从队列中拉出,直到iterationComplete
为true并且队列为空为止,此时它们已完成。如果他们发现队列为空,但iterationComplete
为false,则他们在等待进一步工作时会阻塞。
我要坚持简单的锁定,并避免挥发物/原子,直到您掌握正确的基础知识为止。如果该代码中存在瓶颈,则在保留相同不变式的同时有选择地应用优化。