Question
In an ETL process I'm retrieving a lot of entities from a Spring Data Repository. I'm then using a parallel stream to map the entities to different ones. I can either use a consumer to store those new entities in another repository one by one, or collect them into a List and store them in a single bulk operation. The first is costly, while the latter might exceed the available memory.
Is there a good way to collect a certain amount of elements in the stream (like limit does), consume that chunk, and keep on going in parallel until all elements are processed?
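For concreteness, here is a minimal sketch of the two options described above, written in the same placeholder style as the snippets further down. The entity types, the transform method and targetRepository are hypothetical names; targetRepository is assumed to be a Spring Data CrudRepository, whose save and saveAll methods are standard.

List<SourceEntity> entities = ...;              // loaded from the source repository

// Option 1: store the mapped entities one by one – one repository call per element.
entities.parallelStream()
        .map(this::transform)
        .forEach(targetRepository::save);

// Option 2: collect everything, then one bulk save – the whole result list is held in memory.
List<TargetEntity> mapped = entities.parallelStream()
        .map(this::transform)
        .collect(Collectors.toList());
targetRepository.saveAll(mapped);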
Answer
My approach to bulk operations with chunking is to use a partitioning spliterator wrapper, and another wrapper which overrides the default splitting policy (arithmetic progression of batch sizes in increments of 1024) to simple fixed-batch splitting. Use it like this:
Stream<OriginalType> existingStream = ...;
Stream<List<OriginalType>> partitioned = partition(existingStream, 100, 1);
partitioned.forEach(chunk -> ... process the chunk ...);
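Applied to the scenario in the question, the partitioned stream can be processed in parallel and each chunk saved in one bulk call. A sketch under the same assumptions as above (hypothetical entity names; targetRepository is a Spring Data repository exposing saveAll):

Stream<TargetEntity> mapped = entities.stream().map(this::transform);
Stream<List<TargetEntity>> chunks = partition(mapped, 100, 1);
// Each worker thread pulls one chunk of up to 100 mapped entities at a time,
// so memory usage stays bounded by roughly one chunk per thread.
chunks.parallel().forEach(targetRepository::saveAll);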
Here is the complete code:
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators.AbstractSpliterator;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
// Wraps an existing Spliterator and dispenses its elements as fixed-size List partitions:
// each tryAdvance call fills one List of up to partitionSize elements.
public class PartitioningSpliterator<E> extends AbstractSpliterator<List<E>>
{
private final Spliterator<E> spliterator;
private final int partitionSize;
public PartitioningSpliterator(Spliterator<E> toWrap, int partitionSize) {
super(toWrap.estimateSize(), toWrap.characteristics() | Spliterator.NONNULL);
if (partitionSize <= 0) throw new IllegalArgumentException(
"Partition size must be positive, but was " + partitionSize);
this.spliterator = toWrap;
this.partitionSize = partitionSize;
}
public static <E> Stream<List<E>> partition(Stream<E> in, int size) {
return StreamSupport.stream(new PartitioningSpliterator<>(in.spliterator(), size), false);
}
public static <E> Stream<List<E>> partition(Stream<E> in, int size, int batchSize) {
return StreamSupport.stream(
new FixedBatchSpliterator<>(new PartitioningSpliterator<>(in.spliterator(), size), batchSize), false);
}
@Override public boolean tryAdvance(Consumer<? super List<E>> action) {
final ArrayList<E> partition = new ArrayList<>(partitionSize);
while (spliterator.tryAdvance(partition::add)
&& partition.size() < partitionSize);
if (partition.isEmpty()) return false;
action.accept(partition);
return true;
}
@Override public long estimateSize() {
final long est = spliterator.estimateSize();
return est == Long.MAX_VALUE? est
: est / partitionSize + (est % partitionSize > 0? 1 : 0);
}
}
// FixedBatchSpliteratorBase (a separate source file): its trySplit hands out fixed-size,
// array-backed batches instead of the default arithmetic-progression splitting.
import static java.util.Spliterators.spliterator;
import java.util.Comparator;
import java.util.Spliterator;
import java.util.function.Consumer;
public abstract class FixedBatchSpliteratorBase<T> implements Spliterator<T> {
private final int batchSize;
private final int characteristics;
private long est;
public FixedBatchSpliteratorBase(int characteristics, int batchSize, long est) {
characteristics |= ORDERED;
if ((characteristics & SIZED) != 0) characteristics |= SUBSIZED;
this.characteristics = characteristics;
this.batchSize = batchSize;
this.est = est;
}
public FixedBatchSpliteratorBase(int characteristics, int batchSize) {
this(characteristics, batchSize, Long.MAX_VALUE);
}
public FixedBatchSpliteratorBase(int characteristics) {
this(characteristics, 64, Long.MAX_VALUE);
}
@Override public Spliterator<T> trySplit() {
final HoldingConsumer<T> holder = new HoldingConsumer<>();
if (!tryAdvance(holder)) return null;
final Object[] a = new Object[batchSize];
int j = 0;
do a[j] = holder.value; while (++j < batchSize && tryAdvance(holder));
if (est != Long.MAX_VALUE) est -= j;
return spliterator(a, 0, j, characteristics());
}
@Override public Comparator<? super T> getComparator() {
if (hasCharacteristics(SORTED)) return null;
throw new IllegalStateException();
}
@Override public long estimateSize() { return est; }
@Override public int characteristics() { return characteristics; }
static final class HoldingConsumer<T> implements Consumer<T> {
Object value;
@Override public void accept(T value) { this.value = value; }
}
}
// FixedBatchSpliterator (a separate source file): wraps an existing spliterator, delegates
// traversal to it and inherits the fixed-batch trySplit; withBatchSize returns a parallel stream.
import static java.util.stream.StreamSupport.stream;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.stream.Stream;
public class FixedBatchSpliterator<T> extends FixedBatchSpliteratorBase<T> {
private final Spliterator<T> spliterator;
public FixedBatchSpliterator(Spliterator<T> toWrap, int batchSize, long est) {
super(toWrap.characteristics(), batchSize, est);
this.spliterator = toWrap;
}
public FixedBatchSpliterator(Spliterator<T> toWrap, int batchSize) {
this(toWrap, batchSize, toWrap.estimateSize());
}
public FixedBatchSpliterator(Spliterator<T> toWrap) {
this(toWrap, 64, toWrap.estimateSize());
}
public static <T> Stream<T> withBatchSize(Stream<T> in, int batchSize) {
return stream(new FixedBatchSpliterator<>(in.spliterator(), batchSize), true);
}
public static <T> FixedBatchSpliterator<T> batchedSpliterator(Spliterator<T> toWrap, int batchSize) {
return new FixedBatchSpliterator<>(toWrap, batchSize);
}
@Override public boolean tryAdvance(Consumer<? super T> action) {
return spliterator.tryAdvance(action);
}
@Override public void forEachRemaining(Consumer<? super T> action) {
spliterator.forEachRemaining(action);
}
}
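As a quick sanity check of the partitioning behaviour (not part of the original answer, just an illustrative demo), the following self-contained class prints the chunks produced for ten integers:

import java.util.List;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class PartitionDemo {
    public static void main(String[] args) {
        Stream<Integer> numbers = IntStream.rangeClosed(1, 10).boxed();
        // Ten elements in partitions of three: [1, 2, 3], [4, 5, 6], [7, 8, 9], [10]
        Stream<List<Integer>> chunks = PartitioningSpliterator.partition(numbers, 3);
        chunks.forEach(System.out::println);
        // For parallel chunk processing, use the three-argument overload and call parallel():
        // PartitioningSpliterator.partition(numbers, 3, 1).parallel().forEach(...);
    }
}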