本文介绍了Java Stream 并行化的可视化的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

限时删除!!

通常不太清楚并行流如何准确地将输入拆分为块以及块的连接顺序.有没有办法可视化任何流源的整个过程以更好地了解发生了什么?假设我创建了一个这样的流:

Often it's not very clear how exactly the parallel stream splits the input into chunks and in which order the chunks are joined. Is there any way to visualize the whole procedure for any stream source to better understand what's going on? Suppose I created a stream like this:

Stream<Integer> stream = IntStream.range(0, 100).boxed().parallel();

我想看一些树状结构:

             [0..99]
         _____/   \_____
        |               |
     [0..49]         [50..99]
    __/   \__        __/  \__
   |         |      |        |
[0..24]  [25..49] [50..74] [75..99]

这意味着整个输入范围 [0..99] 被拆分为 [0..49][50..99] 范围又进一步分裂.当然这样的图应该反映Stream API的实际工作,所以如果我对这样的流执行一些实际操作,应该以相同的方式进行拆分.

Which means that the whole input range [0..99] is split to [0..49] and [50..99] ranges which in turn split further. Of course such diagram should reflect the real work of Stream API, so if I perform some real operation with such stream the splitting should be performed in the same way.

推荐答案

我想补充 Tagir 的精彩回答端或什至在中间操作(当前流 API 实现施加了一些限制)监控拆分的解决方案:

I want to augment Tagir’s great answer with a solution to monitor the splitting at the source side or even at intermediate operations (with some restrictions imposed by the current stream API implementation):

public static <E> Stream<E> proxy(Stream<E> src) {
    Class<Stream<E>> sClass=(Class)Stream.class;
    Class<Spliterator<E>> spClass=(Class)Spliterator.class;
    return proxy(src, sClass, spClass, StreamSupport::stream);
}
public static IntStream proxy(IntStream src) {
    return proxy(src, IntStream.class, Spliterator.OfInt.class, StreamSupport::intStream);
}
public static LongStream proxy(LongStream src) {
    return proxy(src, LongStream.class, Spliterator.OfLong.class, StreamSupport::longStream);
}
public static DoubleStream proxy(DoubleStream src) {
    return proxy(src, DoubleStream.class, Spliterator.OfDouble.class, StreamSupport::doubleStream);
}
static final Object EMPTY=new StringBuilder("empty");
static <E,S extends BaseStream<E,S>, Sp extends Spliterator<E>> S proxy(
        S src, Class<S> sc, Class<Sp> spc, BiFunction<Sp,Boolean,S> f) {

    final class Node<T> implements InvocationHandler,Runnable,
        Consumer<Object>, IntConsumer, LongConsumer, DoubleConsumer {
        final Class<? extends Spliterator> type;
        Spliterator<T> src;
        Object first=EMPTY, last=EMPTY;
        Node<T> left, right;
        Object currConsumer;
        public Node(Spliterator<T> src, Class<? extends Spliterator> type) {
            this.src = src;
            this.type=type;
        }
        private void value(Object t) {
            if(first==EMPTY) first=t;
            last=t;
        }
        public void accept(Object t) {
            value(t); ((Consumer)currConsumer).accept(t);
        }
        public void accept(int t) {
            value(t); ((IntConsumer)currConsumer).accept(t);
        }
        public void accept(long t) {
            value(t); ((LongConsumer)currConsumer).accept(t);
        }
        public void accept(double t) {
            value(t); ((DoubleConsumer)currConsumer).accept(t);
        }
        public void run() {
            System.out.println();
            finish().forEach(System.out::println);
        }
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            Node<T> curr=this; while(curr.right!=null) curr=curr.right;
            if(method.getName().equals("tryAdvance")||method.getName().equals("forEachRemaining")) {
                curr.currConsumer=args[0];
                args[0]=curr;
            }
            if(method.getName().equals("trySplit")) {
                Spliterator s=curr.src.trySplit();
                if(s==null) return null;
                Node<T> pfx=new Node<>(s, type);
                pfx.left=curr.left; curr.left=pfx;
                curr.right=new Node<>(curr.src, type);
                src=null;
                return pfx.create();
            }
            return method.invoke(curr.src, args);
        }
        Object create() {
            return Proxy.newProxyInstance(null, new Class<?>[]{type}, this);
        }
        String pad(String s, int left, int len) {
            if (len == s.length())
                return s;
            char[] result = new char[len];
            Arrays.fill(result, ' ');
            s.getChars(0, s.length(), result, left);
            return new String(result);
        }
        public List<String> finish() {
            String cur = toString();
            if (left == null) {
                return Collections.singletonList(cur);
            }
            List<String> l = left.finish();
            List<String> r = right.finish();
            int len1 = l.get(0).length();
            int len2 = r.get(0).length();
            int totalLen = len1 + len2 + 1;
            int leftAdd = 0;
            if (cur.length() < totalLen) {
                cur = pad(cur, (totalLen - cur.length()) / 2, totalLen);
            } else {
                leftAdd = (cur.length() - totalLen) / 2;
                totalLen = cur.length();
            }
            List<String> result = new ArrayList<>();
            result.add(cur);

            char[] dashes = new char[totalLen];
            Arrays.fill(dashes, ' ');
            Arrays.fill(dashes, len1 / 2 + leftAdd + 1, len1 + len2 / 2 + 1
                    + leftAdd, '_');
            int mid = totalLen / 2;
            dashes[mid] = '/';
            dashes[mid + 1] = '\';
            result.add(new String(dashes));

            Arrays.fill(dashes, ' ');
            dashes[len1 / 2 + leftAdd] = '|';
            dashes[len1 + len2 / 2 + 1 + leftAdd] = '|';
            result.add(new String(dashes));

            int maxSize = Math.max(l.size(), r.size());
            for (int i = 0; i < maxSize; i++) {
                String lstr = l.size() > i ? l.get(i) : String.format("%"
                        + len1 + "s", "");
                String rstr = r.size() > i ? r.get(i) : String.format("%"
                        + len2 + "s", "");
                result.add(pad(lstr + " " + rstr, leftAdd, totalLen));
            }
            return result;
        }
        private Object first() {
            if(left==null) return first;
            Object o=left.first();
            if(o==EMPTY) o=right.first();
            return o;
        }
        private Object last() {
            if(right==null) return last;
            Object o=right.last();
            if(o==EMPTY) o=left.last();
            return o;
        }
        public String toString() {
            Object o=first(), p=last();
            return o==EMPTY? "(empty)": "["+o+(o!=p? ".."+p+']': "]");
        }
    }
    Node<E> n=new Node<>(src.spliterator(), spc);
    Sp sp=(Sp)Proxy.newProxyInstance(null, new Class<?>[]{n.type}, n);
    return f.apply(sp, true).onClose(n);
}

它允许使用代理包装拆分器,该代理将监视拆分操作和遇到的对象.chunk 处理的逻辑和 Tagir 类似,其实我复制了他的结果打印例程.

It allows to wrap a spliterator with a proxy which will monitor the split operations and the encountered objects. The logic of the chunk handling is similar to Tagir’s, in fact, I copied his result printing routine(s).

您可以传入流的源或已附加相同操作的流.(在后一种情况下,您应该尽早将 .parallel() 应用于流).正如 Tagir 解释的那样,在大多数情况下,拆分行为取决于源和配置的并行性,因此,在大多数情况下,监控中间状态可能会更改值,但不会更改已处理的块:

You may pass in the source of the stream or a stream with same operations already appended. (In the latter case, you should apply .parallel() as early as possible to the stream). As Tagir explained, in most cases, the split behavior depends on the source and the configured parallelism, thus, in most cases, monitoring intermediate states may change the values, but not the processed chunks:

try(IntStream is=proxy(IntStream.range(0, 100).parallel())) {
    is.filter(i -> i/20%2==0)
      .mapToObj(ix->"""+ix+'"')
      .forEach(s->{});
}

将打印

                                                                  [0..99]
                                   ___________________________________/\________________________________
                                  |                                                                     |
                              [0..49]                                                               [50..99]
                 _________________/\______________                                     _________________/\________________
                |                                 |                                   |                                   |
            [0..24]                           [25..49]                            [50..74]                            [75..99]
        ________/\_____                   ________/\_______                   ________/\_______                   ________/\_______
       |               |                 |                 |                 |                 |                 |                 |
   [0..11]         [12..24]          [25..36]          [37..49]          [50..61]          [62..74]          [75..86]          [87..99]
    ___/\_          ___/\___          ___/\___          ___/\___          ___/\___          ___/\___          ___/\___          ___/\___
   |      |        |        |        |        |        |        |        |        |        |        |        |        |        |        |
[0..5] [6..11] [12..17] [18..24] [25..30] [31..36] [37..42] [43..49] [50..55] [56..61] [62..67] [68..74] [75..80] [81..86] [87..92] [93..99]

try(Stream<String> s=proxy(IntStream.range(0, 100).parallel().filter(i -> i/20%2==0)
      .mapToObj(ix->"""+ix+'"'))) {
    s.forEach(str->{});
}

将打印

                                                                                   ["0".."99"]
                                              ___________________________________________/\___________________________________________
                                             |                                                                                        |
                                       ["0".."49"]                                                                              ["50".."99"]
                         ____________________/\______________________                                           ______________________/\___________________
                        |                                            |                                         |                                           |
                  ["0".."19"]                                  ["40".."49"]                              ["50".."59"]                                ["80".."99"]
            ____________/\_________                      ____________/\______                           _______/\___________                   ____________/\________
           |                       |                    |                    |                         |                    |                 |                      |
     ["0".."11"]             ["12".."19"]            (empty)           ["40".."49"]              ["50".."59"]            (empty)        ["80".."86"]           ["87".."99"]
      _____/\___              _____/\_____           ___/\__            _____/\_____              _____/\_____           ___/\__         _____/\__              _____/\_____
     |          |            |            |         |       |          |            |            |            |         |       |       |         |            |            |
["0".."5"] ["6".."11"] ["12".."17"] ["18".."19"] (empty) (empty) ["40".."42"] ["43".."49"] ["50".."55"] ["56".."59"] (empty) (empty) ["80"] ["81".."86"] ["87".."92"] ["93".."99"]

正如我们在这里看到的,我们正在监控 .filter(…).mapToObj(…) 的结果,但是块是由源明确确定的,可能会在下游产生空块,具体取决于过滤器的条件.

As we can see here, we are monitoring the result of .filter(…).mapToObj(…) but the chunks are clearly determined by the source, possibly producing empty chunks down-stream depending on the filter’s condition.

请注意,我们可以将源监控与 Tagir 的收集器监控结合起来:

Note that we can combine the source monitoring with Tagir’s collector monitoring:

try(IntStream s=proxy(IntStream.range(0, 100))) {
    s.parallel().filter(i -> i/20%2==0)
     .boxed().collect(parallelVisualize())
     .forEach(System.out::println);
}

这将打印(注意 collect 输出首先打印):

This will print (note that the collect output is printed first):

                                                              [0..99]
                                  ________________________________/\_______________________________
                                 |                                                                 |
                             [0..49]                                                           [50..99]
                 ________________/\______________                                   _______________/\_______________
                |                                |                                 |                                |
            [0..19]                          [40..49]                          [50..59]                         [80..99]
        ________/\_____                  ________/\______                   _______/\_______                ________/\_____
       |               |                |                |                 |                |              |               |
   [0..11]         [12..19]          (empty)         [40..49]          [50..59]          (empty)       [80..86]        [87..99]
    ___/\_          ___/\___         ___/\__          ___/\___          ___/\___         ___/\__        ___/\_          ___/\___
   |      |        |        |       |       |        |        |        |        |       |       |      |      |        |        |
[0..5] [6..11] [12..17] [18..19] (empty) (empty) [40..42] [43..49] [50..55] [56..59] (empty) (empty) [80] [81..86] [87..92] [93..99]

                                                                  [0..99]
                                   ___________________________________/\________________________________
                                  |                                                                     |
                              [0..49]                                                               [50..99]
                 _________________/\______________                                     _________________/\________________
                |                                 |                                   |                                   |
            [0..24]                           [25..49]                            [50..74]                            [75..99]
        ________/\_____                   ________/\_______                   ________/\_______                   ________/\_______
       |               |                 |                 |                 |                 |                 |                 |
   [0..11]         [12..24]          [25..36]          [37..49]          [50..61]          [62..74]          [75..86]          [87..99]
    ___/\_          ___/\___          ___/\___          ___/\___          ___/\___          ___/\___          ___/\___          ___/\___
   |      |        |        |        |        |        |        |        |        |        |        |        |        |        |        |
[0..5] [6..11] [12..17] [18..24] [25..30] [31..36] [37..42] [43..49] [50..55] [56..61] [62..67] [68..74] [75..80] [81..86] [87..92] [93..99]

我们可以清楚地看到处理的块是如何匹配的,但是过滤后,有些块的元素较少,有些则完全是空的.

We can clearly see how the chunks of the processing match, but after the filtering, some chunks have less elements, some of them are entirely empty.

这是演示的地方,两种监控方式可以产生显着差异:

This is the place to demonstrate, where the two ways of monitoring can make a significant difference:

try(DoubleStream is=proxy(DoubleStream.iterate(0, i->i+1)).parallel().limit(100)) {
    is.boxed()
      .collect(parallelVisualize())
      .forEach(System.out::println);
}
                                                                                                [0.0..99.0]
                                                   ___________________________________________________/\________________________________________________
                                                  |                                                                                                     |
                                            [0.0..49.0]                                                                                           [50.0..99.0]
                         _________________________/\______________________                                                     _________________________/\________________________
                        |                                                 |                                                   |                                                   |
                  [0.0..24.0]                                       [25.0..49.0]                                        [50.0..74.0]                                        [75.0..99.0]
            ____________/\_________                           ____________/\___________                           ____________/\___________                           ____________/\___________
           |                       |                         |                         |                         |                         |                         |                         |
     [0.0..11.0]             [12.0..24.0]              [25.0..36.0]              [37.0..49.0]              [50.0..61.0]              [62.0..74.0]              [75.0..86.0]              [87.0..99.0]
      _____/\___              _____/\_____              _____/\_____              _____/\_____              _____/\_____              _____/\_____              _____/\_____              _____/\_____
     |          |            |            |            |            |            |            |            |            |            |            |            |            |            |            |
[0.0..5.0] [6.0..11.0] [12.0..17.0] [18.0..24.0] [25.0..30.0] [31.0..36.0] [37.0..42.0] [43.0..49.0] [50.0..55.0] [56.0..61.0] [62.0..67.0] [68.0..74.0] [75.0..80.0] [81.0..86.0] [87.0..92.0] [93.0..99.0]

                             [0.0..10239.0]
       _____________________________/\_____
      |                                    |
[0.0..1023.0]                      [1024.0..10239.0]
                       ____________________/\_______
                      |                             |
              [1024.0..3071.0]             [3072.0..10239.0]
                                        ____________/\______
                                       |                    |
                               [3072.0..6143.0]     [6144.0..10239.0]
                                                         ___/\_______
                                                        |            |
                                                [6144.0..10239.0] (empty)

这证明了Tagir 已经解释过的内容,未知大小的流分割很差,甚至limit(...) 提供了一个很好的估计的可能性(实际上,infinite + limit 理论上是可以预测的),实现中没有利用它.

This demonstrates what Tagir already explained, streams with an unknown size split poorly, and even the fact the limit(…) provides the possibility for a good estimate (in fact, infinite + limit is theoretically predictable), the implementation does not take any advantage of it.

使用 1024 的批量大小将源拆分为块,每次拆分后增加 1024,创建超出 limit 施加的范围的块.我们还可以看到每次如何拆分前缀.

The source is split into chunks utilizing a batch size of 1024, increased by 1024 after each split, creating chunks way outside the range imposed by limit. We can also see how a prefix is split off each time.

但是当我们查看终端拆分输出时,我们可以看到中间这些多余的块已被删除,并且第一个块发生了另一次拆分.由于这个块是由在第一次拆分时由默认实现填充的中间数组作为后端的,我们在源代码中没有注意到它,但我们可以在终端操作中看到该数组已被拆分(不出所料)平衡良好.

But when we look at the terminal split output, we can see that in-between these excess chunks have been dropped and another splitting of the first chunk has happened. Since this chunk is backend by an intermediate array that has been filled by the default implementation on the first split, we don’t notice it at the source but we can see at the terminal action that this array has been split (unsurprisingly) well balanced.

所以我们需要两种监控方式才能在此处获得全貌……

So we need both ways of monitoring to get the full picture here…

这篇关于Java Stream 并行化的可视化的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

1403页,肝出来的..

09-06 11:49