在当前早期发行的名为High Performance Spark的教科书中,Spark的开发人员指出:



但是,教科书缺少使用mapPartitions或该方法的类似变体的良好示例。网上几乎没有很好的代码示例,其中大多数是Scala。例如,我们看到此Scala代码使用的是zerot323在How to add columns into org.apache.spark.sql.Row inside of mapPartitions上编写的mapPartitions

def transformRows(iter: Iterator[Row]): Iterator[Row] = iter.map(transformRow)
sqlContext.createDataFrame(df.rdd.mapPartitions(transformRows), newSchema).show

不幸的是,Java没有为迭代器提供像iter.map(...)一样好的功能。因此,这就引出了一个问题:如何将mapPartitions的迭代器到迭代器的转换有效地使用,而又不将RDD完全作为列表散播到磁盘上?
JavaRDD<OutObj> collection = prevCollection.mapPartitions((Iterator<InObj> iter) -> {
    ArrayList<OutObj> out = new ArrayList<>();
    while(iter.hasNext()) {
        InObj current = iter.next();
        out.add(someChange(current));
    }
    return out.iterator();
});

这似乎是在Java示例中使用mapPartitions的通用语法,但是我不认为这是最有效的,假设您有一个带有成千上万条记录的JavaRDD(甚至更多...因为Spark是大数据)。您最终将得到迭代器中所有对象的列表,只是将其转换为迭代器(这意味着某种map函数在此效率会更高)。

请注意:尽管使用mapPartitions的这8行代码可以用mapflatMap编写为1行,但我故意使用mapPartitions来利用它对每个分区而不是RDD中的每个元素进行操作的事实。

有什么想法吗?

最佳答案

防止强制整个分区“物化”的一种方法是将Iterator转换为Stream,然后使用Stream的功能API(例如map函数)。

How to convert an iterator to a stream?建议了一些将Iterator转换为Stream的好方法,因此采用此处建议的选项之一,我们最终可能会得出:

rdd.mapPartitions((Iterator<InObj> iter) -> {
    Iterable<InObj> iterable = () -> iter;
    return StreamSupport.stream(iterable.spliterator(), false)
            .map(s -> transformRow(s)) // or whatever transformation
            .iterator();
});

这应该是“从迭代器到迭代器”的转换,因为所有使用的中间API(IterableStream)都是惰性计算的。

编辑:我自己尚未对其进行测试,但是OP对此进行了评论,并引用“在列表上使用Stream不会提高效率”。我不知道为什么会这样,而且我不知道这总体上是否正确,但值得一提。

关于java - Apache Spark : Effectively using mapPartitions in Java,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/42539315/

10-09 06:07