作为an answer的question about pausing a BlockingQueue,我想到了使用现有的阻塞结构blockingQueue2
并用两个不同的锁来保护状态的想法。
public class BlockingQueueWithPause<E> extends LinkedBlockingQueue<E> {
private static final long serialVersionUID = 184661285402L;
private Object lock1 = new Object();//used in pause() and in take()
private Object lock2 = new Object();//used in pause() and unpause()
//@GuardedBy("lock1")
private volatile boolean paused;
private LinkedBlockingQueue<Object> blockingQueue2 = new LinkedBlockingQueue<Object>();
public void pause() {
if (!paused) {
synchronized (lock1) {
synchronized (lock2) {
if (!paused) {
paused = true;
blockingQueue2.removeAll();//make sure it is empty, e.g after successive calls to pause() and unpause() without any consumers it will remain unempty
}
}
}
}
}
public void unpause() throws InterruptedException {
if (paused) {
synchronized (lock2) {
paused = false;
blockingQueue2.put(new Object());//will release waiting thread, if there is one
}
}
}
@Override
public E take() throws InterruptedException {
E result = super.take();
if (paused) {
synchronized (lock1) {//this guarantees that a single thread will be in the synchronized block, all other threads will be waiting
if (paused) {
blockingQueue2.take();
}
}
}
return result;
}
//TODO override similarly the poll() method.
}
我需要两个不同的锁,否则
unpause()
可以等待使用者线程已经保存在lock1
中的take()
。我的问题:
这会陷入僵局吗?
真的有用吗?
由于我自己不可读,您多久看到一次这样的代码?
我应该如何用
paused
注释@GuardedBy("lock1, locks2")
标志?PS:欢迎进行任何改进(除了我可以使用二进制信号量代替
blockingQueue2
之外)。 最佳答案
我会一一回答您的问题
这会陷入僵局吗?
不,您不会导致死锁。如果获得lock1
和lock2
的顺序不同,则可能导致死锁。由于同时持有两者时您以相同的顺序获得它们,所以应该没问题。
真的有用吗?
看来。所有的事情发生在订购之前似乎都可以满足。
由于我自己不可读,您多久看到一次这样的代码?
我以前从未见过这种实现。我同意这不是很优雅。
我将提出使用Phaser的替代解决方案。可以说这不再是一种优雅,而只是一种替代方法。我已经审查了一段时间,我认为这已经足够。当然,我也从未见过这种方法,但是想起来很有趣。
public static class BlockingQueueWithPause<E> extends LinkedBlockingQueue<E> {
private static final long serialVersionUID = 184661285402L;
private final Phaser phaser = new Phaser(1);
private volatile int phase = phaser.getPhase();
public BlockingQueueWithPause() {
// base case, all phase 0 await's will succeed through.
phaser.arrive();
}
public void pause() {
phase = phaser.getPhase();
}
public void unpause() throws InterruptedException {
phaser.arrive();
}
@Override
public E take() throws InterruptedException {
phaser.awaitAdvance(phase);
E result = super.take();
return result;
}
}
我想我应该解释这个解决方案。移相器就像CylicBarrier和CountDownLatch有孩子一样。它允许在不等待障碍物绊倒的情况下重新使用障碍物。
在基本情况下,共享的
phase
将为0。由于在构造函数中调用了arrive
,因此phaser
的内部阶段是1。因此,如果在没有调用take
的情况下调用pause
awaitAdvance
将被调用为0。由于内部相位为1,所以移相器快速路径输出并且是一个简单的易失性负载(已经发生了0相,因此我们不必再等待前进)。如果调用
pause
,则共享的phase
变量将更新为相位器的内部相位,该相位现在为1。因此,take
将对awaitTermination
设置为1,从而使其挂起。unpause
到达将导致所有awaitAdvance
线程释放,并将移相器的内部相位增加到2。同样,后续的take将快速退出,而没有相应的暂停。