在当前早期发行的名为High Performance Spark的教科书中,Spark的开发人员指出:
但是,教科书缺少使用mapPartitions
或该方法的类似变体的良好示例。网上几乎没有很好的代码示例,其中大多数是Scala。例如,我们看到此Scala代码使用的是zerot323在How to add columns into org.apache.spark.sql.Row inside of mapPartitions上编写的mapPartitions
。
def transformRows(iter: Iterator[Row]): Iterator[Row] = iter.map(transformRow)
sqlContext.createDataFrame(df.rdd.mapPartitions(transformRows), newSchema).show
不幸的是,Java没有为迭代器提供像
iter.map(...)
一样好的功能。因此,这就引出了一个问题:如何将mapPartitions
的迭代器到迭代器的转换有效地使用,而又不将RDD
完全作为列表散播到磁盘上?JavaRDD<OutObj> collection = prevCollection.mapPartitions((Iterator<InObj> iter) -> {
ArrayList<OutObj> out = new ArrayList<>();
while(iter.hasNext()) {
InObj current = iter.next();
out.add(someChange(current));
}
return out.iterator();
});
这似乎是在Java示例中使用
mapPartitions
的通用语法,但是我不认为这是最有效的,假设您有一个带有成千上万条记录的JavaRDD
(甚至更多...因为Spark是大数据)。您最终将得到迭代器中所有对象的列表,只是将其转换为迭代器(这意味着某种map函数在此效率会更高)。请注意:尽管使用
mapPartitions
的这8行代码可以用map
或flatMap
编写为1行,但我故意使用mapPartitions
来利用它对每个分区而不是RDD
中的每个元素进行操作的事实。有什么想法吗?
最佳答案
防止强制整个分区“物化”的一种方法是将Iterator
转换为Stream,然后使用Stream
的功能API(例如map
函数)。
How to convert an iterator to a stream?建议了一些将Iterator
转换为Stream
的好方法,因此采用此处建议的选项之一,我们最终可能会得出:
rdd.mapPartitions((Iterator<InObj> iter) -> {
Iterable<InObj> iterable = () -> iter;
return StreamSupport.stream(iterable.spliterator(), false)
.map(s -> transformRow(s)) // or whatever transformation
.iterator();
});
这应该是“从迭代器到迭代器”的转换,因为所有使用的中间API(
Iterable
,Stream
)都是惰性计算的。编辑:我自己尚未对其进行测试,但是OP对此进行了评论,并引用“在列表上使用Stream不会提高效率”。我不知道为什么会这样,而且我不知道这总体上是否正确,但值得一提。
关于java - Apache Spark : Effectively using mapPartitions in Java,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/42539315/