作为an answerquestion about pausing a BlockingQueue,我想到了使用现有的阻塞结构blockingQueue2并用两个不同的锁来保护状态的想法。

public class BlockingQueueWithPause<E> extends LinkedBlockingQueue<E> {

    private static final long serialVersionUID = 184661285402L;

    private Object lock1 = new Object();//used in pause() and in take()
    private Object lock2 = new Object();//used in pause() and unpause()

    //@GuardedBy("lock1")
    private volatile boolean paused;

    private LinkedBlockingQueue<Object> blockingQueue2 = new LinkedBlockingQueue<Object>();

    public void pause() {
        if (!paused) {
            synchronized (lock1) {
            synchronized (lock2) {
                if (!paused) {
                    paused = true;
                    blockingQueue2.removeAll();//make sure it is empty, e.g after successive calls to pause() and unpause() without any consumers it will remain unempty
                }
            }
            }
        }
    }

    public void unpause() throws InterruptedException {
        if (paused) {
            synchronized (lock2) {
                paused = false;
                blockingQueue2.put(new Object());//will release waiting thread, if there is one
            }
        }
    }

    @Override
    public E take() throws InterruptedException {
        E result = super.take();

        if (paused) {
            synchronized (lock1) {//this guarantees that a single thread will be in the synchronized block, all other threads will be waiting
                if (paused) {
                    blockingQueue2.take();
                }
            }
        }

        return result;
    }

    //TODO override similarly the poll() method.
}


我需要两个不同的锁,否则unpause()可以等待使用者线程已经保存在lock1中的take()

我的问题:


这会陷入僵局吗?
真的有用吗?
由于我自己不可读,您多久看到一次这样的代码?
我应该如何用paused注释@GuardedBy("lock1, locks2")标志?


PS:欢迎进行任何改进(除了我可以使用二进制信号量代替blockingQueue2之外)。

最佳答案

我会一一回答您的问题


  这会陷入僵局吗?


不,您不会导致死锁。如果获得lock1lock2的顺序不同,则可能导致死锁。由于同时持有两者时您以相同的顺序获得它们,所以应该没问题。


  真的有用吗?


看来。所有的事情发生在订购之前似乎都可以满足。


  由于我自己不可读,您多久看到一次这样的代码?


我以前从未见过这种实现。我同意这不是很优雅。



我将提出使用Phaser的替代解决方案。可以说这不再是一种优雅,而只是一种替代方法。我已经审查了一段时间,我认为这已经足够。当然,我也从未见过这种方法,但是想起来很有趣。

public static class BlockingQueueWithPause<E> extends LinkedBlockingQueue<E> {

    private static final long serialVersionUID = 184661285402L;

    private final Phaser phaser = new Phaser(1);
    private volatile int phase = phaser.getPhase();

    public BlockingQueueWithPause() {
        // base case, all phase 0 await's will succeed through.
        phaser.arrive();
    }

    public void pause() {
        phase = phaser.getPhase();
    }

    public void unpause() throws InterruptedException {
        phaser.arrive();
    }

    @Override
    public E take() throws InterruptedException {
        phaser.awaitAdvance(phase);

        E result = super.take();

        return result;
    }
}


我想我应该解释这个解决方案。移相器就像CylicBarrierCountDownLatch有孩子一样。它允许在不等待障碍物绊倒的情况下重新使用障碍物。

在基本情况下,共享的phase将为0。由于在构造函数中调用了arrive,因此phaser的内部阶段是1。因此,如果在没有调用take的情况下调用pause awaitAdvance将被调用为0。由于内部相位为1,所以移相器快速路径输出并且是一个简单的易失性负载(已经发生了0相,因此我们不必再等待前进)。

如果调用pause,则共享的phase变量将更新为相位器的内部相位,该相位现在为1。因此,take将对awaitTermination设置为1,从而使其挂起。

unpause到达将导致所有awaitAdvance线程释放,并将移相器的内部相位增加到2。同样,后续的take将快速退出,而没有相应的暂停。

09-26 03:01