我手头有一个问题,我想用某种方法解决,我很确定我不应该这样做,但是没有其他选择。我得到了一个字符串列表,应该将其分成给定大小的块。然后必须将结果传递给某种方法进行进一步处理。由于列表可能很大,因此处理应异步进行。
我的方法是创建一个接受字符串流并将其转换为Stream >的自定义收集器:
final Stream<List<Long>> chunks = list
.stream()
.parallel()
.collect(MyCollector.toChunks(CHUNK_SIZE))
.flatMap(p -> doStuff(p))
.collect(MyCollector.toChunks(CHUNK_SIZE))
.map(...)
...
收集器的代码:
public final class MyCollector<T, A extends List<List<T>>, R extends Stream<List<T>>> implements Collector<T, A, R> {
private final AtomicInteger index = new AtomicInteger(0);
private final AtomicInteger current = new AtomicInteger(-1);
private final int chunkSize;
private MyCollector(final int chunkSize){
this.chunkSize = chunkSize;
}
@Override
public Supplier<A> supplier() {
return () -> (A)new ArrayList<List<T>>();
}
@Override
public BiConsumer<A, T> accumulator() {
return (A candidate, T acc) -> {
if (index.getAndIncrement() % chunkSize == 0){
candidate.add(new ArrayList<>(chunkSize));
current.incrementAndGet();
}
candidate.get(current.get()).add(acc);
};
}
@Override
public BinaryOperator<A> combiner() {
return (a1, a2) -> {
a1.addAll(a2);
return a1;
};
}
@Override
public Function<A, R> finisher() {
return (a) -> (R)a.stream();
}
@Override
public Set<Characteristics> characteristics() {
return Collections.unmodifiableSet(EnumSet.of(Characteristics.CONCURRENT, Characteristics.UNORDERED));
}
public static <T> MyCollector<T, List<List<T>>, Stream<List<T>>> toChunks(final int chunkSize){
return new MyCollector<>(chunkSize);
}
}
在大多数情况下,这似乎可行,但有时会得到NPE。.我确定累加器中的线程不是线程安全的,因为在向主列表添加新列表时可能会有两个线程在干扰。我不介意一个块包含太多或太少的元素。
我已经尝试过此方法,而不是当前的供应商功能:
return () -> (A)new ArrayList<List<T>>(){{add(new ArrayList<T>());}};
确保始终存在一个列表。这根本不起作用,并导致列表为空。
问题:
问题:
编辑:
任何帮助将不胜感激。
最好,
d
最佳答案
我尚无法发表评论,但我想将以下链接发布到一个非常相似的问题(据我了解,虽然不是重复的):Java 8 Stream with batch processing
您可能也对GitHub上的以下问题感兴趣:https://github.com/jOOQ/jOOL/issues/296
现在,您对CONCURRENT
特性的使用是错误的-文档说了有关Collector.Characteristics.CONCURRENT
的以下内容:
指示此收集器是并发的,这意味着结果容器可以支持与来自多个线程的同一结果容器并发调用的累加器函数。
这意味着supplier
仅被调用一次,而combiner
实际上从未被调用(参见ReferencePipeline.collect()
方法的源代码)。这就是为什么有时需要NPE的原因。
因此,我建议您提供一个简化的版本:
public static <T> Collector<T, List<List<T>>, Stream<List<T>>> chunked(int chunkSize) {
return Collector.of(
ArrayList::new,
(outerList, item) -> {
if (outerList.isEmpty() || last(outerList).size() >= chunkSize) {
outerList.add(new ArrayList<>(chunkSize));
}
last(outerList).add(item);
},
(a, b) -> {
a.addAll(b);
return a;
},
List::stream,
Collector.Characteristics.UNORDERED
);
}
private static <T> T last(List<T> list) {
return list.get(list.size() - 1);
}
另外,您可以使用适当的同步编写一个真正的并发
Collector
,但是如果您不介意包含多个列表且大小小于chunkSize
(这是您使用非并发Collector
所获得的效果,如我建议的那样)以上),我不会打扰。