我编写了一个程序,该程序通过将数组分成相等的块并使用冒泡排序将单个线程排序来对具有多个线程的数组进行排序。然后,我使用了合并算法来合并两个数组。

我想将此程序与使用Streams对数组进行排序的程序进行比较。我的问题是,如果我将数组传递到流中,我将如何拆分,排序和合并以并行执行排序,但是要通过使用并行流而不是创建自己的线程/可运行对象等来进行。

有任何想法吗?

最佳答案

我认为您的问题纯粹是教育性的和实验性的,没有任何实际应用,因为在Java中有许多更有效的方法来对元素进行排序。如果要在此处使用Stream API,则可以创建一个在气泡合并器中执行冒泡排序的拆分器和一个在合并器中执行合并排序的收集器。

这是分离器:

static class BubbleSpliterator<T> implements Spliterator<T> {
    private final Comparator<? super T> cmp;
    private final Spliterator<T> source;
    private T[] data;
    private int offset;

    public BubbleSpliterator(Spliterator<T> source, Comparator<? super T> cmp) {
        this.source = source;
        this.cmp = cmp;
    }

    @SuppressWarnings("unchecked")
    private void init() {
        if (data != null)
            return;
        Stream.Builder<T> buf = Stream.builder();
        source.forEachRemaining(buf);
        data = (T[]) buf.build().toArray();
        bubble(data, cmp);
    }

    private static <T> void bubble(T[] data, Comparator<? super T> cmp) {
        for (int i = 0; i < data.length - 1; i++)
            for (int j = i + 1; j < data.length; j++) {
                if (cmp.compare(data[i], data[j]) > 0) {
                    T tmp = data[i];
                    data[i] = data[j];
                    data[j] = tmp;
                }
            }
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        init();
        if (offset >= data.length)
            return false;
        action.accept(data[offset++]);
        return true;
    }

    @Override
    public void forEachRemaining(Consumer<? super T> action) {
        init();
        for (int i = offset; i < data.length; i++)
            action.accept(data[i]);
        offset = data.length;
    }

    @Override
    public Spliterator<T> trySplit() {
        if (data != null)
            return null;
        Spliterator<T> prefix = source.trySplit();
        return prefix == null ? null : new BubbleSpliterator<>(prefix, cmp);
    }

    @Override
    public long estimateSize() {
        if (data != null)
            return data.length - offset;
        return source.estimateSize();
    }

    @Override
    public int characteristics() {
        return source.characteristics();
    }

    public static <T> Stream<T> stream(Stream<T> source,
                                       Comparator<? super T> comparator) {
        Spliterator<T> spltr = source.spliterator();
        return StreamSupport.stream(new BubbleSpliterator<>(spltr, comparator),
               source.isParallel()).onClose(source::close);
    }
}

它使用源,委托拆分为源,但是当请求元素时,它将源元素转储到数组中并对其进行气泡排序。您可以这样检查:
int[] data = new Random(1).ints(100, 0, 1000).toArray();
Comparator<Integer> comparator = Comparator.naturalOrder();
List<Integer> list = BubbleSpliterator.stream(Arrays.stream(data).parallel().boxed(), comparator).collect(
    Collectors.toList());
System.out.println(list);

结果取决于您计算机上的硬件线程数量,可能看起来像这样:
[254, 313, 588, 847, 904, 985, 434, 473, 569, 606, 748, 978, 234, 262, 263, 317, 562, 592, 99, 189, 310,...]

在这里,您可以看到输出由几个排序的序列组成。此类序列的数量与Stream API创建的并行任务的数量相对应。

现在要通过合并排序合并排序的序列,您可以编写一个特殊的收集器,如下所示:
static <T> List<T> merge(List<T> l1, List<T> l2, Comparator<? super T> cmp) {
    List<T> result = new ArrayList<>(l1.size()+l2.size());
    int i=0, j=0;
    while(i < l1.size() && j < l2.size()) {
        if(cmp.compare(l1.get(i), l2.get(j)) <= 0) {
            result.add(l1.get(i++));
        } else {
            result.add(l2.get(j++));
        }
    }
    result.addAll(l1.subList(i, l1.size()));
    result.addAll(l2.subList(j, l2.size()));
    return result;
}

static <T> Collector<T, ?, List<T>> mergeSorting(Comparator<? super T> cmp) {
    return Collector.<T, List<T>> of(ArrayList::new, List::add,
                                     (l1, l2) -> merge(l1, l2, cmp));
}

在更多顺序中,它的工作方式类似于Collectors.toList(),但在并行处理中,假定两个输入列表均已排序,则执行合并排序。我的mergeSorting实现可能不是最优的,您可以编写更好的东西。

因此,要通过Stream API对所有内容进行排序,可以一起使用BubbleSpliteratormergeSorting收集器:
int[] data = new Random(1).ints(100, 0, 1000).toArray();
Comparator<Integer> comparator = Comparator.naturalOrder();
List<Integer> list = BubbleSpliterator.stream(Arrays.stream(data).parallel().boxed(), comparator).collect(
    mergeSorting(comparator));
System.out.println(list);

结果将被完全排序。

此实现多次执行不必要的输入数据复制,所以我猜想,自定义气泡+合并实现可能会在性能上胜过这一实现。

07-24 09:45
查看更多