您可以在此处查看实现:
https://github.com/apache/spark/blob/ffa05c84fe75663fc33f3d954d1cb1e084ab3280/python/pyspark/rdd.py#L804
它与“普通” reduce
函数有何不同?depth = 2
是什么意思?
我不希望reducer函数在分区上线性传递,
但首先减少每个可用的对,然后像这样迭代直到我只有一对,并将其减少到1,如图所示:
treeReduce
可以实现吗?
最佳答案
标准reduce
正在使用该函数的包装版本,并将其用于 mapPartitions
。之后,将收集结果并在驱动程序上添加reduced locally。如果分区的数量很大和/或您使用的功能很昂贵,则这会在一台计算机上造成很大的负担。treeReduce
的第一阶段与上面的阶段几乎相同,但是在此之后,部分结果被并行合并,并且仅在驱动程序上执行了最终聚合。depth
是suggested depth of the tree,并且由于树中节点的深度定义为根与节点之间的边缘数,因此尽管在某些情况下它看起来像分布式聚合can be stopped early,但您应该为您提供或多或少的期望模式。
值得注意的是,您使用treeReduce
获得的不是二叉树。分区的数量在每个级别上进行调整,并且很可能一次合并两个以上的分区。
与标准的reduce相比,基于树的版本performs reduceByKey
with each iteration意味着大量的数据改组。如果分区的数量相对较小,则使用纯reduce
会便宜得多。如果您怀疑reduce
的最后阶段是瓶颈tree*
版本,则值得尝试。