我正在对一个项目队列进行长时间运行的处理,而一个项目要么计划要处理,要么正在处理,但是我想禁止其他操作。我的代码基本上如下所示:

public class LongRunningProcess extends Thread {
    private final ConcurrentLinkedQueue<Item> pending = new ConcurrentLinkedQueue<>();
    private final Set<Item> active = Collections.newSetFromMap(new ConcurrentHashMap<Item, Boolean>());

    public LongRunningProcess() {
        // add items to pending; no more items will ever be added
    }

    @Override
    public void run() {
        while (! pending.isEmpty()) {
            // The peek/add/remove pattern here is important. The number
            // of items that are active or scheduled is always decreasing.
            // Because isScheduled checks pending before checking active,
            // this order of operations ensures that we never miss an item
            // as it is being switched from one collection to the other.
            Item nextItem = pending.peek();
            active.add(nextItem);    // <---Can any of these get reordered?
            pending.remove();        // <---+
            processItem(nextItem);   // <---+
            active.remove(nextItem); // <---+
        }
    }

    public boolean isScheduled(Item item) {
        return pending.contains(item) || active.contains(item);
    }
}


可以按照我期望的方式工作吗,还是可以对上面突出显示的代码块重新排序?您能指出我任何相关规格吗?

编辑:

@Banthar的有用评论将我引向了java.util.concurrent package documentation,它明确回答了我的问题:


  java.util.concurrent及其子包中所有类的方法将这些保证扩展到更高级别的同步。尤其是:
  
  
  在将对象放入任何并发集合之前,线程中的操作发生在访问另一个线程中的元素或从集合中删除该元素之后的操作。

最佳答案

可以按照我期望的方式工作吗,还是可以对上面突出显示的两个项目中的任何一个重新排序?您能指出我任何相关规格吗?


简短的答案是,因为两个集合都是并发类,所以active.add(...)不可能在pending.remove()之后发生。


pending.peek();pending.remove();访问volatile字段head

private transient volatile Node<E> head = new Node<E>(null);

active.add(nextItem);访问内部锁定volatile字段:

compareAndSetState(0, acquires)) {



因为您的两个集合都是并发类,所以它们都具有内部锁或volatile变量,因此方法调用具有读/写内存屏障,可确保“先发生”保证。这样可以确保由于Java Memory Model而无法对操作进行重新排序。

但是,这并不意味着您的逻辑是正确的,或者当您查看其他线程如何使用这两个集合时,就没有竞争条件。此外,这些调用不是原子的,因此您可以使用3个线程来执行:


t1-项nextItem =等待.peek();
t2-项目nextItem =待定.peek();
t1-active.add(nextItem);
t3-从活动状态中移除nextItem并对其进行处理
t2-active.add(nextItem);
t3-从活动状态移除nextItem并再次处理

08-18 00:01