是否有任何方法(或任何计划)可以将Spark分布式集合(RDDDataframeDataset)直接转换为Broadcast变量而无需collect?公用API似乎没有“开箱即用”的功能,但是可以在较低级别上做些什么吗?

我可以想象这些操作有2倍的加速潜力(或更多?)。为了详细解释我的意思,我们来看一个例子:

val myUberMap: Broadcast[Map[String, String]] =
  sc.broadcast(myStringPairRdd.collect().toMap)

someOtherRdd.map(someCodeUsingTheUberMap)

这导致所有数据都收集到驱动程序,然后广播数据。这意味着数据实际上是通过网络发送两次的。

很好的是这样的:
val myUberMap: Broadcast[Map[String, String]] =
  myStringPairRdd.toBroadcast((a: Array[(String, String)]) => a.toMap)

someOtherRdd.map(someCodeUsingTheUberMap)

在这里,Spark可以完全绕开收集数据,而只是在节点之间移动数据。

奖励

此外,对于combineByKey.toMap上的任何操作都很昂贵的情况,但可能可以并行完成,可以使用类Monoid的API(有点像Array[T])。例如。构造某些Trie结构可能会很昂贵,这种功能可能会导致算法设计的范围很大。该CPU事件也可以在IO也正在运行时运行-在当前广播机制处于阻塞状态时(即所有IO,然后是所有CPU,然后是所有IO)。

澄清

在这里加入不是(主要)用例,可以假设我很少使用广播的数据结构。例如,someOtherRdd中的键绝不会覆盖myUberMap中的键,但在遍历someOtherRdd并假设我多次使用myUberMap之前,我不知道需要哪些键。

我知道听起来似乎有些含糊,但是重点是针对更通用的机器学习算法设计。

最佳答案

尽管从理论上讲这是一个有趣的想法,但我将争辩说,尽管从理论上讲它是可能的,但在实际应用中却非常有限。显然,我不能代表PMC,所以我不能说是否有任何计划实现这种广播机制。

可能的实现:

由于Spark已经提供了torrent broadcasting机制,其行为描述如下:



应该有可能将相同的机制重用于直接的节点到节点广播。

值得注意的是,这种方法不能完全消除驾驶员的沟通。即使可以在本地创建块,您仍然需要一个真实的来源来宣传一组要获取的块。

有限的应用程序

广播变量的一个问题是价格昂贵。即使可以消除驱动程序瓶颈,仍然存在两个问题:

  • 在每个执行程序上存储反序列化对象所需的内存。
  • 将广播数据传输给每个执行者的成本。

  • 第一个问题应该相对明显。它不仅与直接内存使用有关,而且还与GC成本及其对整体延迟的影响有关。第二个是相当微妙的。我在对Why my BroadcastHashJoin is slower than ShuffledHashJoin in Spark的回答中部分地涵盖了这一点,但让我们进一步讨论一下。

    从网络流量的角度来看,广播整个数据集几乎等同于创建笛卡尔积。因此,如果数据集足够大而导致驱动程序成为瓶颈,那么它不太可能成为广播的良好候选者,并且在实践中可以优先选择像哈希联接这样的有针对性的方法。

    替代:

    与上面直接列举的广播和解决的问题相比,有一些方法可用于实现类似的结果,包括:
  • 通过分布式文件系统传递数据。
  • 使用与工作节点并置的复制数据库。
  • 关于scala - 如何在不收集的情况下将RDD,Dataframe或Dataset直接转换为Broadcast变量?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/38329738/

    10-12 18:41