是否有任何方法(或任何计划)可以将Spark分布式集合(RDD
,Dataframe
或Dataset
)直接转换为Broadcast
变量而无需collect
?公用API似乎没有“开箱即用”的功能,但是可以在较低级别上做些什么吗?
我可以想象这些操作有2倍的加速潜力(或更多?)。为了详细解释我的意思,我们来看一个例子:
val myUberMap: Broadcast[Map[String, String]] =
sc.broadcast(myStringPairRdd.collect().toMap)
someOtherRdd.map(someCodeUsingTheUberMap)
这导致所有数据都收集到驱动程序,然后广播数据。这意味着数据实际上是通过网络发送两次的。
很好的是这样的:
val myUberMap: Broadcast[Map[String, String]] =
myStringPairRdd.toBroadcast((a: Array[(String, String)]) => a.toMap)
someOtherRdd.map(someCodeUsingTheUberMap)
在这里,Spark可以完全绕开收集数据,而只是在节点之间移动数据。
奖励
此外,对于
combineByKey
或.toMap
上的任何操作都很昂贵的情况,但可能可以并行完成,可以使用类Monoid的API(有点像Array[T]
)。例如。构造某些Trie结构可能会很昂贵,这种功能可能会导致算法设计的范围很大。该CPU事件也可以在IO也正在运行时运行-在当前广播机制处于阻塞状态时(即所有IO,然后是所有CPU,然后是所有IO)。澄清
在这里加入不是(主要)用例,可以假设我很少使用广播的数据结构。例如,
someOtherRdd
中的键绝不会覆盖myUberMap
中的键,但在遍历someOtherRdd
并假设我多次使用myUberMap
之前,我不知道需要哪些键。我知道听起来似乎有些含糊,但是重点是针对更通用的机器学习算法设计。
最佳答案
尽管从理论上讲这是一个有趣的想法,但我将争辩说,尽管从理论上讲它是可能的,但在实际应用中却非常有限。显然,我不能代表PMC,所以我不能说是否有任何计划实现这种广播机制。
可能的实现:
由于Spark已经提供了torrent broadcasting机制,其行为描述如下:
应该有可能将相同的机制重用于直接的节点到节点广播。
值得注意的是,这种方法不能完全消除驾驶员的沟通。即使可以在本地创建块,您仍然需要一个真实的来源来宣传一组要获取的块。
有限的应用程序
广播变量的一个问题是价格昂贵。即使可以消除驱动程序瓶颈,仍然存在两个问题:
第一个问题应该相对明显。它不仅与直接内存使用有关,而且还与GC成本及其对整体延迟的影响有关。第二个是相当微妙的。我在对Why my BroadcastHashJoin is slower than ShuffledHashJoin in Spark的回答中部分地涵盖了这一点,但让我们进一步讨论一下。
从网络流量的角度来看,广播整个数据集几乎等同于创建笛卡尔积。因此,如果数据集足够大而导致驱动程序成为瓶颈,那么它不太可能成为广播的良好候选者,并且在实践中可以优先选择像哈希联接这样的有针对性的方法。
替代:
与上面直接列举的广播和解决的问题相比,有一些方法可用于实现类似的结果,包括:
关于scala - 如何在不收集的情况下将RDD,Dataframe或Dataset直接转换为Broadcast变量?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/38329738/