I fail to achieve good parallelization of stream processing when the source is a Reader. Running the code below on a quad-core CPU, I observe three cores being used at first, then a sudden drop to just two, then one core. Overall CPU utilization hovers around 50%.

Note the following characteristics of the example:

- there are just 6,000 lines;
- processing a line takes roughly 20 ms;
- the whole procedure takes about a minute.

This means that all the pressure is on the CPU and I/O is minimal. The example is a sitting duck for automatic parallelization.
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
... class imports elided ...
public class Main
{
    static final AtomicLong totalTime = new AtomicLong();

    public static void main(String[] args) throws IOException {
        final long start = System.nanoTime();
        final Path inputPath = createInput();
        System.out.println("Start processing");
        try (PrintWriter w = new PrintWriter(Files.newBufferedWriter(Paths.get("output.txt")))) {
            Files.lines(inputPath).parallel().map(Main::processLine)
                 .forEach(w::println);
        }
        final double cpuTime = totalTime.get(),
                     realTime = System.nanoTime() - start;
        final int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("          Cores: " + cores);
        System.out.format("       CPU time: %.2f s\n", cpuTime / SECONDS.toNanos(1));
        System.out.format("      Real time: %.2f s\n", realTime / SECONDS.toNanos(1));
        System.out.format("CPU utilization: %.2f%%", 100.0 * cpuTime / realTime / cores);
    }

    private static String processLine(String line) {
        final long localStart = System.nanoTime();
        double ret = 0;
        // CPU-bound work whose cost grows quadratically with line length
        for (int i = 0; i < line.length(); i++)
            for (int j = 0; j < line.length(); j++)
                ret += Math.pow(line.charAt(i), line.charAt(j) / 32.0);
        final long took = System.nanoTime() - localStart;
        totalTime.getAndAdd(took);
        return NANOSECONDS.toMillis(took) + " " + ret;
    }

    private static Path createInput() throws IOException {
        final Path inputPath = Paths.get("input.txt");
        try (PrintWriter w = new PrintWriter(Files.newBufferedWriter(inputPath))) {
            for (int i = 0; i < 6_000; i++) {
                final String text = String.valueOf(System.nanoTime());
                for (int j = 0; j < 25; j++) w.print(text);
                w.println();
            }
        }
        return inputPath;
    }
}
My typical output:
Cores: 4
CPU time: 110.23 s
Real time: 53.60 s
CPU utilization: 51.41%
For comparison, if I use a slightly modified variant that first collects the lines into a list and only then processes them:
Files.lines(inputPath).collect(toList()).parallelStream().map(Main::processLine)
     .forEach(w::println);
I get the following typical output:
Cores: 4
CPU time: 138.43 s
Real time: 35.00 s
CPU utilization: 98.87%
What could explain this effect, and how can I work around it to achieve full CPU utilization?
Note that I originally observed this on a reader over a servlet input stream, so it is not specific to FileReader.

Best answer
The answer is spelled out in the source code of Spliterators.IteratorSpliterator, the spliterator used by BufferedReader#lines():
@Override
public Spliterator<T> trySplit() {
    /*
     * Split into arrays of arithmetically increasing batch
     * sizes. This will only improve parallel performance if
     * per-element Consumer actions are more costly than
     * transferring them into an array. The use of an
     * arithmetic progression in split sizes provides overhead
     * vs parallelism bounds that do not particularly favor or
     * penalize cases of lightweight vs heavyweight element
     * operations, across combinations of #elements vs #cores,
     * whether or not either are known. We generate
     * O(sqrt(#elements)) splits, allowing O(sqrt(#cores))
     * potential speedup.
     */
    Iterator<? extends T> i;
    long s;
    if ((i = it) == null) {
        i = it = collection.iterator();
        s = est = (long) collection.size();
    }
    else
        s = est;
    if (s > 1 && i.hasNext()) {
        int n = batch + BATCH_UNIT;
        if (n > s)
            n = (int) s;
        if (n > MAX_BATCH)
            n = MAX_BATCH;
        Object[] a = new Object[n];
        int j = 0;
        do { a[j] = i.next(); } while (++j < n && i.hasNext());
        batch = j;
        if (est != Long.MAX_VALUE)
            est -= j;
        return new ArraySpliterator<>(a, 0, j, characteristics);
    }
    return null;
}
It is also important to note these constants:
static final int BATCH_UNIT = 1 << 10; // batch array size increment
static final int MAX_BATCH = 1 << 25; // max batch array size;
So in my example, where I use 6,000 elements, I get just three batches because the batch-size step is 1024. That exactly explains my observation: three cores used initially, dropping to two and then one as the smaller batches complete. In the meantime I tried a modified example with 60,000 elements, and then I got almost 100% CPU utilization.
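To make the arithmetic concrete, here is a small illustrative sketch of my own (not part of the answer) that replays IteratorSpliterator's batch growth and counts how many splits cover a given number of elements:

// Illustrative only: each split is BATCH_UNIT = 1024 elements larger than
// the previous one; count the batches needed to cover `elements` elements.
static int countBatches(long elements) {
    final int BATCH_UNIT = 1 << 10;
    int batch = 0, batches = 0;
    for (long remaining = elements; remaining > 0; batches++) {
        batch += BATCH_UNIT;                     // 1024, 2048, 3072, ...
        remaining -= Math.min(batch, remaining); // the last batch may be partial
    }
    return batches;
}
// countBatches(6_000)  returns 3  (batches of 1024, 2048, and 2928)
// countBatches(60_000) returns 11, giving the fork-join pool far more tasks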
To solve my problem, I have developed the code below. It allows me to turn any existing stream into one whose Spliterator#trySplit splits it into batches of a specified size. The simplest way to apply it to the use case from my question is this:

toFixedBatchStream(Files.newBufferedReader(inputPath).lines(), 20)
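For concreteness, here is a minimal sketch of mine (assuming toFixedBatchStream is statically imported from the class below) of the question's pipeline rewritten on top of the wrapper; note that toFixedBatchStream already returns a parallel stream, so no .parallel() call is needed:

try (PrintWriter w = new PrintWriter(Files.newBufferedWriter(Paths.get("output.txt")))) {
    // each trySplit now hands the fork-join pool a fixed 20-element chunk,
    // keeping all cores busy until the input is exhausted
    toFixedBatchStream(Files.newBufferedReader(inputPath).lines(), 20)
        .map(Main::processLine)
        .forEach(w::println);
}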
On a lower level, the class below is a spliterator wrapper that changes the wrapped spliterator's trySplit behavior while leaving all other aspects unchanged.

import static java.util.Spliterators.spliterator;
import static java.util.stream.StreamSupport.stream;
import java.util.Comparator;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.stream.Stream;
public class FixedBatchSpliteratorWrapper<T> implements Spliterator<T> {
    private final Spliterator<T> spliterator;
    private final int batchSize;
    private final int characteristics;
    private long est;

    public FixedBatchSpliteratorWrapper(Spliterator<T> toWrap, long est, int batchSize) {
        final int c = toWrap.characteristics();
        // a SIZED spliterator whose splits are arrays is SUBSIZED as well
        this.characteristics = (c & SIZED) != 0 ? c | SUBSIZED : c;
        this.spliterator = toWrap;
        this.est = est;
        this.batchSize = batchSize;
    }
    public FixedBatchSpliteratorWrapper(Spliterator<T> toWrap, int batchSize) {
        this(toWrap, toWrap.estimateSize(), batchSize);
    }

    public static <T> Stream<T> toFixedBatchStream(Stream<T> in, int batchSize) {
        return stream(new FixedBatchSpliteratorWrapper<>(in.spliterator(), batchSize), true);
    }

    @Override public Spliterator<T> trySplit() {
        final HoldingConsumer<T> holder = new HoldingConsumer<>();
        if (!spliterator.tryAdvance(holder)) return null;
        // carve off exactly batchSize elements (fewer only when the source runs out)
        final Object[] a = new Object[batchSize];
        int j = 0;
        do a[j] = holder.value; while (++j < batchSize && tryAdvance(holder));
        if (est != Long.MAX_VALUE) est -= j;
        return spliterator(a, 0, j, characteristics());
    }
    @Override public boolean tryAdvance(Consumer<? super T> action) {
        return spliterator.tryAdvance(action);
    }
    @Override public void forEachRemaining(Consumer<? super T> action) {
        spliterator.forEachRemaining(action);
    }
    @Override public Comparator<? super T> getComparator() {
        // delegate so a custom comparator (if any) is preserved; the wrapped
        // spliterator itself throws IllegalStateException when it is not SORTED
        return spliterator.getComparator();
    }
    @Override public long estimateSize() { return est; }
    @Override public int characteristics() { return characteristics; }

    static final class HoldingConsumer<T> implements Consumer<T> {
        Object value;
        @Override public void accept(T value) { this.value = value; }
    }
}
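As a quick sanity check (my own sketch, not part of the answer), one can split the wrapper once and confirm that the chunk it hands back holds exactly batchSize elements:

import java.util.Spliterator;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

public class FixedBatchDemo {
    public static void main(String[] args) {
        Spliterator<Integer> wrapped = new FixedBatchSpliteratorWrapper<>(
                IntStream.range(0, 100).boxed().spliterator(), 10);
        Spliterator<Integer> chunk = wrapped.trySplit(); // carves off one fixed batch
        AtomicInteger count = new AtomicInteger();
        chunk.forEachRemaining(x -> count.incrementAndGet());
        System.out.println(count.get()); // prints 10
    }
}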
Regarding "java - Reader#lines() parallelizes badly due to the nonconfigurable batch-size policy in its spliterator", see the similar question on Stack Overflow: https://stackoverflow.com/questions/22569040/