Question
In a cogroup transformation, e.g. RDD1.cogroup(RDD2, ...), I used to assume that Spark only shuffles/moves RDD2 and retains RDD1's partitioning and in-memory storage if:
- RDD1 has an explicit partitioner
- RDD1 is cached.
In my other projects most of the shuffling behaviour seems to be consistent with this assumption. So yesterday I wrote a short Scala program to prove it once and for all:
// sc is the SparkContext
import org.apache.spark.HashPartitioner

// rdd1: keyed by its own value, hash-partitioned into 4 partitions, cached.
val rdd1 = sc.parallelize(1 to 10, 4).map(v => v -> v)
  .partitionBy(new HashPartitioner(4))
rdd1.persist().count()

// rdd2: same values with reversed keys, no explicit partitioner.
val rdd2 = sc.parallelize(1 to 10, 4).map(v => (11 - v) -> v)

// Keep one value from each side of the cogroup.
val cogrouped = rdd1.cogroup(rdd2).map { v =>
  v._2._1.head -> v._2._2.head
}

// Zip the cogrouped partitions with rdd1's and rdd2's partitions so the
// contents can be compared position by position.
val zipped = cogrouped.zipPartitions(rdd1, rdd2) { (itr1, itr2, itr3) =>
  itr1.zipAll(itr2.map(_._2), 0 -> 0, 0).zipAll(itr3.map(_._2), (0 -> 0) -> 0, 0)
    .map { v =>
      (v._1._1._1, v._1._1._2, v._1._2, v._2)
    }
}

zipped.collect().foreach(println)
If rdd1 doesn't move, the first column of zipped should have the same values as the third column. So I ran the program, and oops:
(4,7,4,1)
(8,3,8,2)
(1,10,1,3)
(9,2,5,4)
(5,6,9,5)
(6,5,2,6)
(10,1,6,7)
(2,9,10,0)
(3,8,3,8)
(7,4,7,9)
(0,0,0,10)
The assumption is not true. Spark probably did some internal optimisation and decided that regenerating rdd1's partitions is much faster than keeping them in cache.
So the question is: if my programmatic requirement to not move RDD1 (and keep it cached) is driven by something other than speed (e.g. resource locality), or if in some cases Spark's internal optimisation is not preferable, is there a way to explicitly instruct the framework not to move an operand in all cogroup-like operations? This also includes join, outer join, and groupWith.
Thanks a lot for your help. So far I'm using a broadcast join as a not-so-scalable makeshift solution; it won't last long before crashing my cluster. I'm expecting a solution consistent with distributed computing principles.
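For concreteness, here is a minimal sketch of the kind of broadcast join I mean, assuming the smaller side fits in driver memory (the broadcastJoin helper and its signature are only illustrative, not code from my project):

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Illustrative helper: join a large keyed RDD against a small one
// without shuffling the large side.
def broadcastJoin(sc: SparkContext,
                  large: RDD[(Int, Int)],
                  small: RDD[(Int, Int)]): RDD[(Int, (Int, Int))] = {
  // Collect the small side on the driver and ship it to every executor once.
  val lookup = sc.broadcast(small.collectAsMap())
  // Join inside each partition of the large RDD; it is never shuffled and
  // its partitioner is preserved.
  large.mapPartitions(
    iter => iter.flatMap { case (k, v) => lookup.value.get(k).map(w => (k, (v, w))) },
    preservesPartitioning = true)
}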
Answer
This assumption is just incorrect. Creating a CoGroupedRDD is not only about the shuffle, but also about generating the internal structures required for matching corresponding records. Internally Spark will use its own ExternalAppendOnlyMap, which uses a custom open hash table implementation (AppendOnlyMap) that doesn't provide any ordering guarantees.
If you check the debug string:
zipped.toDebugString
(4) ZippedPartitionsRDD3[8] at zipPartitions at <console>:36 []
| MapPartitionsRDD[7] at map at <console>:31 []
| MapPartitionsRDD[6] at cogroup at <console>:31 []
| CoGroupedRDD[5] at cogroup at <console>:31 []
| ShuffledRDD[2] at partitionBy at <console>:27 []
| CachedPartitions: 4; MemorySize: 512.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
+-(4) MapPartitionsRDD[1] at map at <console>:26 []
| ParallelCollectionRDD[0] at parallelize at <console>:26 []
+-(4) MapPartitionsRDD[4] at map at <console>:29 []
| ParallelCollectionRDD[3] at parallelize at <console>:29 []
| ShuffledRDD[2] at partitionBy at <console>:27 []
| CachedPartitions: 4; MemorySize: 512.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
+-(4) MapPartitionsRDD[1]...
you'll see that Spark indeed uses CachedPartitions to compute the zipped RDD. If you also skip the map transformation, which removes the partitioner, you'll see that coGroup reuses the partitioner provided by rdd1:
rdd1.cogroup(rdd2).partitioner == rdd1.partitioner
Boolean = true
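As the debug string already shows, rdd1 was in fact consumed in place here. The explicit way to guarantee that in any cogroup-like operation (join, outer join, groupWith) is to give the other operand rdd1's partitioner up front, so only that operand is shuffled and rdd1 contributes through a narrow dependency. A sketch under that assumption (variable names are illustrative); note that, for the reason above, this still provides no ordering guarantee within a partition, so the zipPartitions comparison from the question would still not line up:

// Shuffle only rdd2 onto rdd1's partitioner before cogrouping
// (rdd2Copartitioned and cogroupedInPlace are illustrative names).
val rdd2Copartitioned = rdd2.partitionBy(rdd1.partitioner.get)
val cogroupedInPlace = rdd1.cogroup(rdd2Copartitioned)
// Both parents now share the partitioner, so rdd1's cached partitions are
// read through a narrow dependency instead of being shuffled again.
cogroupedInPlace.partitioner == rdd1.partitioner
// expected: Boolean = true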