在工作面试中有人问我一个并发问题,最终将其归结为以下要求。仅通过使用互斥,我就可以实现#2至#4,但不能实现#1。


使用以下方法设计任务队列:

public void registerCallback(Runnable task)

public void eventFired()


多个线程应该能够将任务(可能同时)放在队列中。
eventFired只能被调用一次。
如果先前已调用eventFired,则任何以后对这两个方法的调用都将引发异常。
如果在执行eventFired时调用了registerCallback,则将触发事件延迟到以后。
如果在执行registerCallback时调用eventFired,则引发异常。



ReentrantReadWriteLock似乎很有希望,因为registerCallback可以获取读锁,而eventFired可以写锁,但这不能解决先调用registerCallback然后再调用eventFired的竞争条件。

有任何想法吗?

最佳答案

ReentrantReadWriteLock似乎很有希望,因为registerCallback可以获取读锁,而eventFired可以写锁,但这不能解决先调用registerCallback然后再调用eventFired的竞争条件。


注册回调,换句话说,在保持读取锁定的同时修改数据结构绝不是一个好主意。您需要另一个线程安全的构造来存储回调,并使代码不必要地复杂化。

注册回调的操作(例如将引用存储到某个集合中或实例化任何类型的节点对象)的琐碎操作足以允许使用普通的互斥锁,因为该锁持有时间不长。

无论使用synchronizedLock还是支持原子更新的状态,在任何一种情况下,竞争条件都不存在,因为registerCallbackeventFired不能重叠。当正确使用时,所有这些方法都会使这些操作井然有序。因此,每个registerCallback都在第一个eventFired之前或之后。

实施起来很简单:

public class JobQueue {
    private List<Runnable> callbacks = new ArrayList<>();

    public synchronized void registerCallback(Runnable task) {
        if(callbacks == null) {
            throw new IllegalStateException("Event already fired");
        }
        callbacks.add(Objects.requireNonNull(task));
    }
    public void eventFired() {
        List<Runnable> list;
        synchronized(this) {
            list = callbacks;
            callbacks = null;
        }
        if(list == null) {
            throw new IllegalStateException("Can only fire once");
        }
        for(Runnable r: list) r.run();
    }
}


synchronized块中执行的代码很短,因此对于大多数实际用例而言,争用是无关紧要的。用Lock实现相同的操作很简单,但是没有任何好处。实际上,JVM特定的优化可以使基于synchronized的解决方案更加有效。

为了完整性,这里有一个基于原子更新的解决方案:

public class JobQueue {
    private static final Runnable DONE = () -> {};

    private final AtomicReference<Runnable> pending = new AtomicReference<>();

    public void registerCallback(Runnable task) {
        Objects.requireNonNull(task);
        for(;;) {
            Runnable previous = pending.get();
            if(previous == DONE) throw new IllegalStateException("Event already fired");
            if(pending.compareAndSet(previous, sequence(previous, task))) return;
        }
    }

    public void eventFired() {
        Runnable previous = pending.getAndSet(DONE);
        if(previous == DONE) throw new IllegalStateException("Can only fire once");
        if(previous != null) previous.run();
    }

    static Runnable sequence(Runnable a, Runnable b) {
        return a == null? b: () -> { a.run(); b.run(); };
    }
}


实际上,多个registerCallback和/或eventFired调用的执行可能会重叠,但是在那种情况下,只有一个可以成功执行关键原子更新。这使操作进入一个顺序,最多使一个eventFired调用成功,并对在此之前或之后进行的所有registerCallback调用进行分类。

关于java - 如何设计具有互斥但独立的并发方法的任务队列?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/56510636/

10-10 18:09