Java向导!

我想在Java中尽可能有效地实现以下要求。每微秒都很重要。

tl;博士版本是我有一些需要在新数据上运行的计算。数据更改后,就需要运行计算。如果数据在计算完成之前发生更改,则需要取消计算并从最新数据开始计算。

详细地:

  • 我有N个异步更新的新数据源。称它们为DataSource类的实例(DataSource ds1 = new DataSource();DataSource ds2 = new DataSource();等)
  • 如果可用,则getNewData()的公共(public)方法DataSource返回新数据,否则,线程将阻塞直到有新数据。
  • GlobalState为任何给定时刻流的所有状态的快照。每当任何流更新时,GlobalState都会更改。换句话说,GlobalState始终具有有关所有流数据的最新信息。如果Java通过引用传递,则可以想象如下实例化GlobalState:GlobalState gs = new GlobalState(ds1.datum, ds2.datum, ...);
  • 一旦GlobalState更改(由于其中一个流更新),就开始了一项作业,这可能会花费一些时间。如果作业在GlobalState再次更改之前完成,那么很好,我们将保存结果,然后等待其更改,然后对新状态ad infinitum进行处理。如果在GlobalState再次更改之前还没有完成,则该作业将被取消,并为新状态启动一个新的作业。

  • 我最好的猜测:
    public class App {
        public static void main(String[] args) {
    
            DataSource ds1 = new DataSource(...);
            DataSource ds2 = new DataSource(...);
            GlobalState gs = new GlobalState(ds1, ds2);
    
            ds1.start(); // runs and updates its data asynchronously
            ds2.start(); // runs and updates its data asynchronously
    
            Worker worker = new Worker();
    
            while(true) {
                try{
                    GlobalDataState gds = gs.getState(); // this blocks if the state isn't different from when the method was last called.
                    Future result = worker.doWork(gds); // work happening in a different thread.
                    System.out.println("Result is: " + result.get()); // blocks until its result is available or cancelled.
                } catch (CancellationException ce) {
                    System.err.println("Workers too slow! Starting over on new data.");
                }
            }
        }
    }
    
    public class Worker {
    
        private Future pendingResult;
        private final ExecutorService exec;
    
        public Worker() {
            this.exec = Executors.newFixedThreadPool(2);
        }
    
        public Future doWork(GlobalDataState gds) { // GlobalDataState implements Callable
            // cancels jobs that hadn't finished yet.
            if (pendingResult != null ) {
                if (!pendingResult.isDone()) {
                    pendingResult.cancel(true);
                }
            }
            pendingResult = exec.submit(gds);
    
            return pendingResult;
        }
    
    }
    

    我遇到的主要问题是弄清楚如何以不需要我在循环中轮询新数据的方式来实现GlobalState。我在想这样做的方法是使用容量为1(SynchronousQueueArrayBlockingQueue(1),...?)的阻塞队列,但我只希望它阻塞take()而不是put()。如果在gs.getState()调用中阻止了主线程,则无法使该块成为程序的一部分,该程序正在向此单个元素队列添加新的GlobalDataState。另一方面,如果数据更新的速度快于工作人员处理数据的速度,则我不希望旧数据在此队列中等待。如果队列中有一个GlobalDataState对象,并且提供了另一个对象,则需要逐出该对象并添加新对象。这样,每当主线程确实调用gs.getState()时,它绝对是最新的信息。

    我还考虑过使用Phaser来管理对计算的批准,但是我的每一次尝试似乎都是困惑的。

    所以这是我最好的猜测。我将对与数据结构和/或设计模式最能实现应用程序目标的任何建议表示感谢。记住,每一微秒都是重要的。

    谢谢!

    最佳答案

    我只了解您的“tl; dr版本”,但这应该很容易:每当有新数据传入时,就会调用您的某个同步方法(这必须是可能的,否则您将无法更新状态)。在此方法中,您可以取消与上一个计算相对应的将来对象,然后提交新的计算,并用新对象替换先前的将来对象。就是这样,您不需要队列或类似队列的容器。

    09-05 22:15
    查看更多