I'm taking the Coursera course on Spark with Scala, and I'm trying to optimize this snippet:

val indexedMeansG = vectors.
       map(v => findClosest(v, means) -> v).
       groupByKey.mapValues(averageVectors)
Here vectors is an RDD[(Int, Int)]. To see the list of dependencies and the lineage of the RDD, I used:
println(s"""GroupBy:
             | Deps: ${indexedMeansG.dependencies.size}
             | Deps: ${indexedMeansG.dependencies}
             | Lineage: ${indexedMeansG.toDebugString}""".stripMargin)

Which prints:
/* GroupBy:
   * Deps: 1
   * Deps: List(org.apache.spark.OneToOneDependency@44d1924)
   * Lineage: (6) MapPartitionsRDD[18] at mapValues at StackOverflow.scala:207 []
   *  ShuffledRDD[17] at groupByKey at StackOverflow.scala:207 []
   * +-(6) MapPartitionsRDD[16] at map at StackOverflow.scala:206 []
   *  MapPartitionsRDD[13] at map at StackOverflow.scala:139 []
   *      CachedPartitions: 6; MemorySize: 84.0 MB; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
   *  MapPartitionsRDD[12] at values at StackOverflow.scala:116 []
   *  MapPartitionsRDD[11] at mapValues at StackOverflow.scala:115 []
   *  MapPartitionsRDD[10] at groupByKey at StackOverflow.scala:92 []
   *  MapPartitionsRDD[9] at join at StackOverflow.scala:91 []
   *  MapPartitionsRDD[8] at join at StackOverflow.scala:91 []
   *  CoGroupedRDD[7] at join at StackOverflow.scala:91 []
   *    +-(6) MapPartitionsRDD[4] at map at StackOverflow.scala:88 []
   *  |  MapPartitionsRDD[3] at filter at StackOverflow.scala:88 []
   *  |  MapPartitionsRDD[2] at map at StackOverflow.scala:69 []
   *  |  src/main/resources/stackoverflow/stackoverflow.csv MapPartitionsRDD[1] at textFile at StackOverflow.scala:23 []
   *  |  src/main/resources/stackoverflow/stackoverflow.csv HadoopRDD[0] at textFile at StackOverflow.scala:23 []
   *    +-(6) MapPartitionsRDD[6] at map at StackOverflow.scala:89 []
   *  MapPartitionsRDD[5] at filter at StackOverflow.scala:89 []
   *  MapPartitionsRDD[2] at map at StackOverflow.scala:69 []
   *  src/main/resources/stackoverflow/stackoverflow.csv MapPartitionsRDD[1] at textFile at StackOverflow.scala:23 []
   *  src/main/resources/stackoverflow/stackoverflow.csv HadoopRDD[0] at textFile at StackOverflow.scala:23 [] */

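From this List(org.apache.spark.OneToOneDependency@44d1924) I deduced that no shuffle is taking place. Am I right? However, ShuffledRDD[17] is printed just below it in the lineage, which means there is in fact a shuffle.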

I tried to replace that groupByKey call with a reduceByKey, like so:
val indexedMeansR = vectors.
      map(v => findClosest(v, means) -> v).
      reduceByKey((a, b) => (a._1 + b._1) / 2 -> (a._2 + b._2) / 2)

Its dependencies and lineage are:
/* ReduceBy:
   * Deps: 1
   * Deps: List(org.apache.spark.ShuffleDependency@4d5e813f)
   * Lineage: (6) ShuffledRDD[17] at reduceByKey at StackOverflow.scala:211 []
   * +-(6) MapPartitionsRDD[16] at map at StackOverflow.scala:210 []
   *  MapPartitionsRDD[13] at map at StackOverflow.scala:139 []
   *      CachedPartitions: 6; MemorySize: 84.0 MB; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
   *  MapPartitionsRDD[12] at values at StackOverflow.scala:116 []
   *  MapPartitionsRDD[11] at mapValues at StackOverflow.scala:115 []
   *  MapPartitionsRDD[10] at groupByKey at StackOverflow.scala:92 []
   *  MapPartitionsRDD[9] at join at StackOverflow.scala:91 []
   *  MapPartitionsRDD[8] at join at StackOverflow.scala:91 []
   *  CoGroupedRDD[7] at join at StackOverflow.scala:91 []
   *    +-(6) MapPartitionsRDD[4] at map at StackOverflow.scala:88 []
   *  |  MapPartitionsRDD[3] at filter at StackOverflow.scala:88 []
   *  |  MapPartitionsRDD[2] at map at StackOverflow.scala:69 []
   *  |  src/main/resources/stackoverflow/stackoverflow.csv MapPartitionsRDD[1] at textFile at StackOverflow.scala:23 []
   *  |  src/main/resources/stackoverflow/stackoverflow.csv HadoopRDD[0] at textFile at StackOverflow.scala:23 []
   *    +-(6) MapPartitionsRDD[6] at map at StackOverflow.scala:89 []
   *  MapPartitionsRDD[5] at filter at StackOverflow.scala:89 []
   *  MapPartitionsRDD[2] at map at StackOverflow.scala:69 []
   *  src/main/resources/stackoverflow/stackoverflow.csv MapPartitionsRDD[1] at textFile at StackOverflow.scala:23 []
   *  src/main/resources/stackoverflow/stackoverflow.csv HadoopRDD[0] at textFile at StackOverflow.scala:23 [] */

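This time the dependency is a ShuffleDependency, and I don't understand why.

Since the RDD is a pair RDD and its keys are Ints, which have an ordering, I also tried changing the partitioner to a RangePartitioner, but that did not improve things either.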

Best answer

A reduceByKey operation still involves a shuffle, because Spark still has to ensure that all items with the same key end up in the same partition.
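The two printouts in the question differ because dependencies lists only the direct parents of an RDD. In the groupByKey version the last RDD comes from mapValues, which depends one-to-one on the ShuffledRDD, so the shuffle sits one hop further up. The sketch below walks the whole dependency tree to check for a shuffle. The names DependencyWalk and hasShuffle and the sample data are hypothetical, and it assumes Spark is on the classpath and runs in local mode.

```scala
import org.apache.spark.{ShuffleDependency, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object DependencyWalk {
  // Hypothetical helper: true if any dependency in the lineage of `rdd`
  // is a ShuffleDependency, not only the direct parents.
  def hasShuffle(rdd: RDD[_]): Boolean =
    rdd.dependencies.exists {
      case _: ShuffleDependency[_, _, _] => true
      case dep                           => hasShuffle(dep.rdd)
    }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("dependency-walk")
    val sc = new SparkContext(conf)
    try {
      // Made-up sample data standing in for the keyed vectors.
      val pairs = sc.parallelize(Seq((1, 10), (1, 20), (2, 30)))

      val grouped = pairs.groupByKey().mapValues(_.sum) // last step is narrow
      val reduced = pairs.reduceByKey(_ + _)            // last step is the shuffle

      // Direct dependencies differ between the two versions ...
      println(grouped.dependencies.map(_.getClass.getSimpleName))
      println(reduced.dependencies.map(_.getClass.getSimpleName))

      // ... but both lineages contain a shuffle.
      println(hasShuffle(grouped))
      println(hasShuffle(reduced))
    } finally {
      sc.stop()
    }
  }
}
```

Under these assumptions, both hasShuffle calls should report a shuffle, even though only the reduceByKey version shows it among its direct dependencies.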

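However, that shuffle will be much smaller than the one caused by groupByKey. A reduceByKey performs the reduction within each partition before shuffling, which reduces the amount of data that has to be shuffled.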
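One caveat when relying on that per-partition reduction: the function passed to reduceByKey has to be associative and commutative. Averaging two values at a time, as in the question's reduceByKey attempt, is not associative, so the result generally differs from the mean of the whole group. A minimal sketch of one common workaround is to carry sums and a count through the shuffle and divide at the end. The names MeanByKey, toAcc, merge and mean and the sample data are hypothetical, and integer division is an assumption here, since the course's averageVectors is not shown.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object MeanByKey {
  // Running totals for one key:
  // (sum of first components, sum of second components, count).
  type Acc = (Long, Long, Long)

  // Lift a single vector into an accumulator with count 1.
  def toAcc(v: (Int, Int)): Acc = (v._1.toLong, v._2.toLong, 1L)

  // Associative and commutative, so it is safe to apply per partition
  // before the shuffle and again afterwards.
  def merge(a: Acc, b: Acc): Acc = (a._1 + b._1, a._2 + b._2, a._3 + b._3)

  // Assumption: integer division is acceptable for the averaged vector.
  def mean(acc: Acc): (Int, Int) =
    ((acc._1 / acc._3).toInt, (acc._2 / acc._3).toInt)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("mean-by-key")
    val sc = new SparkContext(conf)
    try {
      // Made-up stand-in for vectors.map(v => findClosest(v, means) -> v).
      val keyed = sc.parallelize(Seq(
        (0, (1, 10)), (0, (2, 20)), (0, (6, 60)), (1, (4, 40))
      ))

      val meansByKey = keyed
        .mapValues(toAcc)   // narrow: no shuffle
        .reduceByKey(merge) // one shuffle, after per-partition combining
        .mapValues(mean)    // narrow: no shuffle

      meansByKey.collect().foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

Only one accumulator per key and partition crosses the shuffle here, rather than every vector as with groupByKey.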

Regarding "scala - Avoiding a shuffle with reduceByKey in Spark", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/45524677/
