为了避免计算所有可能的组合,我试图根据某个键对值进行分组,然后计算每个键的值的笛卡尔积,即:

Input [(k1, v1), (k1, v2), (k2, v3)]

所需的输出:[(v1, v1), (v1, v2), (v2, v2), (v2, v1), (v3, v3)]这是我尝试执行的代码:
val input = sc.textFile('data.csv')
val rdd = input.map(s=>s.split(","))
               .map(s => (s(1).toString,  s(2).toString))
val group_result:RDD[String, Iterable[String]] = rdd.groupByKey()
group_result.flatMap { t =>
{
  val stream1= t._2.toStream
  val stream2= t._2.toStream

  stream1.flatMap { src =>
    stream2.par.map { trg =>
            src + "," + trg
    }
  }
}
}

这对于很小的文件来说效果很好,但是当list(Iterable)的长度约为1000时,计算将完全冻结。

最佳答案

正如@ zero323所说,解决此问题的最佳方法是使用PairRDDFunctions的方法join,但是要实现这一点,您需要拥有一个PairedRDD,可以通过使用RDD的方法keyBy来获得。

您可以执行以下操作:

val rdd = sc.parallelize(Array(("k1", "v1"), ("k1", "v2"), ("k2", "v3"))).keyBy(_._1)

val result = rdd.join(rdd).map{
  case (key: String, (x: Tuple2[String, String], y: Tuple2[String, String])) => (x._2, y._2)
}

result.take(20)
// res9: Array[(String, String)] = Array((v1,v1), (v1,v2), (v2,v1), (v2,v2), (v3, v3))

Here我与代码共享笔记本。

关于scala - 使用Spark中的键优化笛卡尔积,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/38676654/

10-16 01:46