我手头有一个问题,我想用某种方法解决,我很确定我不应该这样做,但是没有其他选择。我得到了一个字符串列表,应该将其分成给定大小的块。然后必须将结果传递给某种方法进行进一步处理。由于列表可能很大,因此处理应异步进行。

我的方法是创建一个接受字符串流并将其转换为Stream >的自定义收集器:

final Stream<List<Long>> chunks = list
                        .stream()
                        .parallel()
                        .collect(MyCollector.toChunks(CHUNK_SIZE))
                        .flatMap(p -> doStuff(p))
                        .collect(MyCollector.toChunks(CHUNK_SIZE))
                        .map(...)
                        ...

收集器的代码:
public final class MyCollector<T, A extends List<List<T>>, R extends Stream<List<T>>> implements Collector<T, A, R> {
private final AtomicInteger index = new AtomicInteger(0);
private final AtomicInteger current = new AtomicInteger(-1);
private final int chunkSize;

private MyCollector(final int chunkSize){
    this.chunkSize = chunkSize;
}

@Override
public Supplier<A> supplier() {
    return () -> (A)new ArrayList<List<T>>();
}

@Override
public BiConsumer<A, T> accumulator() {
    return (A candidate, T acc) -> {
        if (index.getAndIncrement() % chunkSize == 0){
            candidate.add(new ArrayList<>(chunkSize));
            current.incrementAndGet();
        }
        candidate.get(current.get()).add(acc);
    };
}

@Override
public BinaryOperator<A> combiner() {
    return (a1, a2) -> {
        a1.addAll(a2);
        return a1;
    };
}
@Override
public Function<A, R> finisher() {
    return (a) -> (R)a.stream();
}

@Override
public Set<Characteristics> characteristics() {
    return Collections.unmodifiableSet(EnumSet.of(Characteristics.CONCURRENT, Characteristics.UNORDERED));
}

public static <T> MyCollector<T, List<List<T>>, Stream<List<T>>> toChunks(final int chunkSize){
    return new MyCollector<>(chunkSize);
}

}

在大多数情况下,这似乎可行,但有时会得到NPE。.我确定累加器中的线程不是线程安全的,因为在向主列表添加新列表时可能会有两个线程在干扰。我不介意一个块包含太多或太少的元素。

我已经尝试过此方法,而不是当前的供应商功能:
 return () -> (A)new ArrayList<List<T>>(){{add(new ArrayList<T>());}};

确保始终存在一个列表。这根本不起作用,并导致列表为空。

问题:
  • 我很确定自定义Spliterator将是一个很好的解决方案。但是,它不适用于同步方案。另外,我确定要调用Spliterator吗?
  • 我知道我根本不应该拥有状态,但是不确定如何更改它。

  • 问题:
  • 这种方法是完全错误的还是可以解决的?
  • 如果我使用分离器-我可以确定它是被调用还是由底层实现决定?
  • 我很确定供应商和修整机中的(A)和(R)强制转换不是必需的,但是IntelliJ抱怨。有什么我想念的吗?

  • 编辑:
  • 我已向客户端代码添加了更多内容,因为IntStream.range的建议在链接时不起作用。
  • 我意识到我可以按照注释中的建议进行不同的处理,但是这还涉及样式以及是否有可能。
  • 我具有CONCURRENT特性,因为我假设Stream API会退回到同步处理,否则。如前所述,该解决方案不是线程安全的。

  • 任何帮助将不胜感激。

    最好,
    d

    最佳答案

    我尚无法发表评论,但我想将以下链接发布到一个非常相似的问题(据我了解,虽然不是重复的):Java 8 Stream with batch processing

    您可能也对GitHub上的以下问题感兴趣:https://github.com/jOOQ/jOOL/issues/296

    现在,您对CONCURRENT特性的使用是错误的-文档说了有关Collector.Characteristics.CONCURRENT的以下内容:

    指示此收集器是并发的,这意味着结果容器可以支持与来自多个线程的同一结果容器并发调用的累加器函数。

    这意味着supplier仅被调用一次,而combiner实际上从未被调用(参见ReferencePipeline.collect()方法的源代码)。这就是为什么有时需要NPE的原因。

    因此,我建议您提供一个简化的版本:

    public static <T> Collector<T, List<List<T>>, Stream<List<T>>> chunked(int chunkSize) {
      return Collector.of(
              ArrayList::new,
              (outerList, item) -> {
                if (outerList.isEmpty() || last(outerList).size() >= chunkSize) {
                  outerList.add(new ArrayList<>(chunkSize));
                }
                last(outerList).add(item);
              },
              (a, b) -> {
                a.addAll(b);
                return a;
              },
              List::stream,
              Collector.Characteristics.UNORDERED
      );
    }
    
    private static <T> T last(List<T> list) {
      return list.get(list.size() - 1);
    }
    

    另外,您可以使用适当的同步编写一个真正的并发Collector,但是如果您不介意包含多个列表且大小小于chunkSize(这是您使用非并发Collector所获得的效果,如我建议的那样)以上),我不会打扰。

    10-07 19:25
    查看更多