When the stream source is a Reader, I cannot get good parallelization of stream processing. Running the code below on a quad-core CPU, I observe three cores being used at first, then a sudden drop to two, then to one. Overall CPU utilization hovers around 50%.

Note the following characteristics of the example:

  • only 6,000 lines;
  • each line takes roughly 20 ms to process;
  • the whole job takes about a minute;
  • virtually all the pressure is on the CPU, with minimal I/O. The example is a "sitting duck" for automatic parallelization.
    import static java.util.concurrent.TimeUnit.NANOSECONDS;
    import static java.util.concurrent.TimeUnit.SECONDS;
    
    ... class imports elided ...
    
    public class Main
    {
      static final AtomicLong totalTime = new AtomicLong();
    
      public static void main(String[] args) throws IOException {
        final long start = System.nanoTime();
        final Path inputPath = createInput();
        System.out.println("Start processing");
    
        try (PrintWriter w = new PrintWriter(Files.newBufferedWriter(Paths.get("output.txt")))) {
          Files.lines(inputPath).parallel().map(Main::processLine)
            .forEach(w::println);
        }
    
        final double cpuTime = totalTime.get(),
                     realTime = System.nanoTime()-start;
        final int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("          Cores: " + cores);
        System.out.format("       CPU time: %.2f s\n", cpuTime/SECONDS.toNanos(1));
        System.out.format("      Real time: %.2f s\n", realTime/SECONDS.toNanos(1));
        System.out.format("CPU utilization: %.2f%%", 100.0*cpuTime/realTime/cores);
      }
    
      private static String processLine(String line) {
        final long localStart = System.nanoTime();
        double ret = 0;
        for (int i = 0; i < line.length(); i++)
          for (int j = 0; j < line.length(); j++)
            ret += Math.pow(line.charAt(i), line.charAt(j)/32.0);
        final long took = System.nanoTime()-localStart;
        totalTime.getAndAdd(took);
        return NANOSECONDS.toMillis(took) + " " + ret;
      }
    
      private static Path createInput() throws IOException {
        final Path inputPath = Paths.get("input.txt");
        try (PrintWriter w = new PrintWriter(Files.newBufferedWriter(inputPath))) {
          for (int i = 0; i < 6_000; i++) {
            final String text = String.valueOf(System.nanoTime());
            for (int j = 0; j < 25; j++) w.print(text);
            w.println();
          }
        }
        return inputPath;
      }
    }
    

My typical output:
              Cores: 4
           CPU time: 110.23 s
          Real time: 53.60 s
    CPU utilization: 51.41%
    

For comparison, if I use a slightly modified variant that first collects the lines into a list and then processes them:
    Files.lines(inputPath).collect(toList()).parallelStream().map(Main::processLine)
      .forEach(w::println);
    

I get the following typical output:
              Cores: 4
           CPU time: 138.43 s
          Real time: 35.00 s
    CPU utilization: 98.87%
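The difference comes down to how the two sources split. As a minimal standalone sketch (nothing assumed beyond the JDK): an ArrayList's spliterator knows its exact size and halves itself on every trySplit, so fork/join can keep dividing the work evenly across worker threads.

```java
import java.util.List;
import java.util.Spliterator;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ListSplitDemo {
    public static void main(String[] args) {
        // 6,000 elements collected into a list, as in the modified variant.
        List<Integer> list =
            IntStream.range(0, 6_000).boxed().collect(Collectors.toList());

        // A SIZED list spliterator splits in half: the returned spliterator
        // covers the first half, the receiver keeps the second half.
        Spliterator<Integer> right = list.spliterator();
        Spliterator<Integer> left = right.trySplit();
        System.out.println(left.estimateSize() + " + " + right.estimateSize());
        // prints "3000 + 3000"
    }
}
```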
    

What can explain this effect, and how can I work around it to get full CPU utilization?

Note that I originally observed this on a reader over a servlet input stream, so it is not specific to FileReader.

Best answer

The answer is spelled out in the source code of Spliterators.IteratorSpliterator, which BufferedReader#lines() uses:

        @Override
        public Spliterator<T> trySplit() {
            /*
             * Split into arrays of arithmetically increasing batch
             * sizes.  This will only improve parallel performance if
             * per-element Consumer actions are more costly than
             * transferring them into an array.  The use of an
             * arithmetic progression in split sizes provides overhead
             * vs parallelism bounds that do not particularly favor or
             * penalize cases of lightweight vs heavyweight element
             * operations, across combinations of #elements vs #cores,
             * whether or not either are known.  We generate
             * O(sqrt(#elements)) splits, allowing O(sqrt(#cores))
             * potential speedup.
             */
            Iterator<? extends T> i;
            long s;
            if ((i = it) == null) {
                i = it = collection.iterator();
                s = est = (long) collection.size();
            }
            else
                s = est;
            if (s > 1 && i.hasNext()) {
                int n = batch + BATCH_UNIT;
                if (n > s)
                    n = (int) s;
                if (n > MAX_BATCH)
                    n = MAX_BATCH;
                Object[] a = new Object[n];
                int j = 0;
                do { a[j] = i.next(); } while (++j < n && i.hasNext());
                batch = j;
                if (est != Long.MAX_VALUE)
                    est -= j;
                return new ArraySpliterator<>(a, 0, j, characteristics);
            }
            return null;
        }
    

The constants are also worth noting:
    static final int BATCH_UNIT = 1 << 10;  // batch array size increment
    static final int MAX_BATCH = 1 << 25;  // max batch array size;
    

So in my example with 6,000 elements, I get only three batches because the batch size step is 1024. That exactly explains my observation: three cores are used at first, dropping to two and then to one as the smaller batches complete. In the meantime I tried a modified example with 60,000 elements, and it achieved almost 100% CPU utilization.
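The batch progression is easy to reproduce directly, without any file I/O. The sketch below (JDK only; a sized iterator spliterator is used for simplicity, where lines() actually uses an unknown-size one, but the split sizes come out the same) wraps an iterator with Spliterators.spliterator and calls trySplit by hand:

```java
import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.IntStream;

public class IteratorSplitDemo {
    public static void main(String[] args) {
        // 6,000 elements behind an iterator, like 6,000 lines behind a Reader.
        Iterator<Integer> it = IntStream.range(0, 6_000).iterator();
        Spliterator<Integer> root =
            Spliterators.spliterator(it, 6_000, Spliterator.ORDERED);

        // Each trySplit hands off a batch that grows by BATCH_UNIT (1024).
        Spliterator<Integer> batch;
        while ((batch = root.trySplit()) != null)
            System.out.println("batch of " + batch.estimateSize());
        // prints: batch of 1024, batch of 2048, batch of 2928
    }
}
```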

To fix my problem I developed the code below, which lets me turn any existing stream into one whose Spliterator#trySplit splits it into batches of a specified size. The simplest way to apply it to the use case from my question is this:
    toFixedBatchStream(Files.newBufferedReader(inputPath).lines(), 20)
    

At a lower level, the class below is a spliterator wrapper that changes the wrapped spliterator's trySplit behavior while leaving everything else unchanged.
    import static java.util.Spliterators.spliterator;
    import static java.util.stream.StreamSupport.stream;
    
    import java.util.Comparator;
    import java.util.Spliterator;
    import java.util.function.Consumer;
    import java.util.stream.Stream;
    
    public class FixedBatchSpliteratorWrapper<T> implements Spliterator<T> {
      private final Spliterator<T> spliterator;
      private final int batchSize;
      private final int characteristics;
      private long est;
    
      public FixedBatchSpliteratorWrapper(Spliterator<T> toWrap, long est, int batchSize) {
        final int c = toWrap.characteristics();
        this.characteristics = (c & SIZED) != 0 ? c | SUBSIZED : c;
        this.spliterator = toWrap;
        this.est = est;
        this.batchSize = batchSize;
      }
      public FixedBatchSpliteratorWrapper(Spliterator<T> toWrap, int batchSize) {
        this(toWrap, toWrap.estimateSize(), batchSize);
      }
    
      public static <T> Stream<T> toFixedBatchStream(Stream<T> in, int batchSize) {
        return stream(new FixedBatchSpliteratorWrapper<>(in.spliterator(), batchSize), true);
      }
    
      @Override public Spliterator<T> trySplit() {
        final HoldingConsumer<T> holder = new HoldingConsumer<>();
        if (!spliterator.tryAdvance(holder)) return null;
        final Object[] a = new Object[batchSize];
        int j = 0;
        do a[j] = holder.value; while (++j < batchSize && tryAdvance(holder));
        if (est != Long.MAX_VALUE) est -= j;
        return spliterator(a, 0, j, characteristics());
      }
      @Override public boolean tryAdvance(Consumer<? super T> action) {
        return spliterator.tryAdvance(action);
      }
      @Override public void forEachRemaining(Consumer<? super T> action) {
        spliterator.forEachRemaining(action);
      }
      @Override public Comparator<? super T> getComparator() {
        if (hasCharacteristics(SORTED)) return null;
        throw new IllegalStateException();
      }
      @Override public long estimateSize() { return est; }
      @Override public int characteristics() { return characteristics; }
    
      static final class HoldingConsumer<T> implements Consumer<T> {
        Object value;
        @Override public void accept(T value) { this.value = value; }
      }
    }
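The HoldingConsumer idiom at the bottom of the class deserves a note: Spliterator is push-based (tryAdvance hands each element to a Consumer), so the wrapper captures every pushed element in a one-slot holder in order to fill its batch array. A minimal standalone illustration of the same trick:

```java
import java.util.List;
import java.util.Spliterator;
import java.util.function.Consumer;

public class HoldingConsumerDemo {
    // One-slot consumer: remembers the last element tryAdvance pushed into it.
    static final class Holder<T> implements Consumer<T> {
        T value;
        @Override public void accept(T t) { value = t; }
    }

    public static void main(String[] args) {
        Spliterator<String> sp = List.of("a", "b", "c").spliterator();
        Holder<String> h = new Holder<>();
        // A pull-style loop over the push-style tryAdvance API.
        while (sp.tryAdvance(h))
            System.out.println(h.value);
        // prints a, b, c on separate lines
    }
}
```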
    

Related Stack Overflow question: "java - Reader#lines() parallelizes badly due to nonconfigurable batch size policy in its spliterator" — https://stackoverflow.com/questions/22569040/
