我有一个一般的Streams API问题,想“有效地”解决。假设我有一个(可能非常大,可能是无限的)流。我想以某种方式对其进行预处理,例如,过滤掉某些项目并对其进行突变。让我们假设这种预处理是复杂的,时间和计算密集型的,因此我不想重复两次。
接下来,我要对项目序列执行两组不同的操作,并使用不同的流类型构造来处理每个独特序列的远端。对于无限流,这将是forEach,对于有限流,则可能是收集器或其他任何对象。
显然,我可以将中间结果收集到一个列表中,然后将两个单独的流从该列表中拖出,分别处理每个流。这将对有限的流有效,尽管a)看起来“丑陋”,b)对于非常大的流可能不切实际,而平坦对于无限的流则不起作用。
我想我可以将peek用作一种“tee”。然后,我可以对peek的下游结果执行一连串的处理,并以某种方式迫使peek的Consumer完成“其他”工作,但是现在第二条路径不再是一条流。
我发现我可以创建一个BlockingQueue,使用peek将内容推送到该队列中,然后从该队列中获取流。这似乎是一个不错的主意,并且实际上效果很好,尽管我无法理解流是如何关闭的(它实际上是关闭的,但是我看不到如何)。下面的示例代码说明了这一点:
List<Student> ls = Arrays.asList(
new Student("Fred", 2.3F)
// more students (and Student definition) elided ...
);
BlockingQueue<Student> pipe = new LinkedBlockingQueue<>();
ls.stream()
.peek(s -> {
try {
pipe.put(s);
} catch (InterruptedException ioe) {
ioe.printStackTrace();
}
})
.forEach(System.out::println);
new Thread(
new Runnable() {
public void run() {
Map<String, Double> map =
pipe.stream()
.collect(Collectors.groupingBy(s->s.getName(),
Collectors.averagingDouble(s->s.getGpa())));
map.forEach(
(k,v)->
System.out.println(
"Students called " + k
+ " average " + v));
}
}).start();
因此,第一个问题是:是否有“更好”的方法来做到这一点?
第二个问题,BlockingQueue上的流如何关闭?
干杯,
托比
最佳答案
有趣的问题。我将首先回答第二个问题,因为这是一个更简单的问题。
我认为您所说的“关闭”是指流中有一定数量的元素,然后结束,而不考虑将来可能添加到队列中的任何元素。原因是队列上的流仅代表创建流时队列的当前内容。它不代表任何将来的元素,即将来某些其他线程可能添加的元素。
如果您想要一个流来表示队列的当前和将来的内容,则可以使用other answer中描述的技术。基本上使用Stream.generate()
调用queue.take()
。不过,我认为这不是您想要执行的操作,因此在此不再赘述。
现在到您更大的问题。
您有一个对象源,您想对其进行一些处理,包括过滤。然后,您要获取结果并通过两个不同的下游处理步骤将其发送。本质上,您只有一个生产者和两个消费者。
您必须处理的基本问题之一是如何处理不同处理步骤以不同速率发生的情况。假设我们已经解决了如何从队列中获取流而不使流过早终止的问题。如果生产者生产元素的速度快于生产者处理此队列中元素的速度,则队列将累积元素直到填充所有可用内存。
您还必须决定如何以不同的速率处理不同的使用者处理元素。如果一个使用者的速度明显慢于另一个使用者,则可能需要缓冲任意数量的元素(这可能会填满内存),或者必须降低速度较快的使用者的速度,以匹配速度较慢的使用者的平均速率。
让我扔掉你如何进行的草图。不过,我不知道您的实际要求,所以我不知道这是否令人满意。需要注意的一件事是,在这种应用程序中使用并行流可能会出现问题,因为并行流不能很好地处理阻塞和负载平衡。
首先,我将从生产者的流处理元素开始,并将其累积为ArrayBlockingQueue
:
BlockingQueue<T> queue = new ArrayBlockingQueue<>(capacity);
producer.map(...)
.filter(...)
.forEach(queue::put);
(请注意,
put
会引发InterruptedException
,因此您不能仅将queue::put
放在此处。您必须在此处放置try-catch块,或者编写一个辅助方法。但是,如果捕获到InterruptedException
,该怎么办并不明显。)如果队列已满,这将阻塞管道。可以在自己的线程中顺序运行此线程,或者在专用线程池中并行运行此线程,以避免阻塞公共(public)池。
接下来,消费者:
while (true) {
// wait until the queue is full, or a timeout has expired,
// depending upon how frequently you want to continue
// processing elements emitted by the producer
List<T> list = new ArrayList<>();
queue.drainTo(list);
downstream1 = list.stream().filter(...).map(...).collect(...);
downstream2 = list.stream().filter(...).map(...).collect(...);
// deal with results downstream1 and downstream2
}
这里的关键是使用
drainTo
方法批量完成从生产者到消费者的切换,该方法将队列的元素添加到目标并自动清空队列。这样,消费者不必等待生产者完成其处理(如果它是无限的,则不会发生)。此外,使用者正在使用已知数量的数据,并且不会在处理过程中阻塞。因此,如果有帮助的话,每个使用者流都可以并行运行。在这里,我让消费者步调一致。如果希望使用者以不同的速度运行,则必须构造其他队列(或其他队列)以独立缓冲他们的工作负载。
如果消费者总体上比生产者慢,则队列最终将被填满并被阻塞,从而使生产者的速度减慢到消费者可以接受的速度。如果消费者平均比生产者快,那么也许您不必担心消费者的相对处理速度。您可以让它们循环并拾取生产者设法放入队列的所有内容,甚至阻止它们直到有可用的东西为止。
我应该说,我概述的是一种非常简单的多阶段流水线方法。如果您的应用程序对性能至关重要,那么您可能会发现自己需要做很多工作来调整内存消耗,负载平衡,增加吞吐量和减少延迟。还有其他框架可能更适合您的应用程序。例如,您可能会看看LMAX Disruptor,尽管我自己对此没有任何经验。
关于java-8 - 一股上游流向多股下游流供料,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/30017961/