这是来自here的后续问题。我正在尝试基于此implementation实现k-means。效果很好,但是我想用groupByKey()替换reduceByKey(),但是我不确定如何(我现在不担心性能)。以下是相关的简化代码:

val data = sc.textFile("dense.txt").map(
        t => (t.split("#")(0), parseVector(t.split("#")(1)))).cache()

val read_mean_centroids = sc.textFile("centroids.txt").map(
        t => (t.split("#")(0), parseVector(t.split("#")(1))))
var centroids = read_mean_centroids.takeSample(false, K, 42).map(x => x._2)
do {
    var closest = read_mean_centroids.map(p => (closestPoint(p._2, centroids), p._2))
    var pointsGroup = closest.groupByKey() // <-- THE VICTIM :)
    var newCentroids = pointsGroup.mapValues(ps => average(ps.toSeq)).collectAsMap()
    ..


注意,println(newCentroids)将给出:


  映射(23->(-6.269305E-4,-0.0011746404,-4.08004E-5),8->(-5.108732E-4,7.336348E-4,-3.707591E-4),17->(-0.0016383086 ,-0.0016974678,1.45 ..


println(closest)


  在kmeans.scala:75处的MapPartitionsRDD [6]


相关问题:Using reduceByKey in Apache Spark (Scala)



一些documentation


  def reduceByKey(func:(V,V)⇒V):RDD [(K,V)]


使用关联的归约函数合并每个键的值。


  def reduceByKey(func:(V,V)⇒V,numPartitions:Int):RDD [(K,V)]


使用关联的归约函数合并每个键的值。


  def reduceByKey(partitioner:分区程序,func:(V,V)⇒V):RDD [(K,V)]


使用关联的归约函数合并每个键的值。


  def groupByKey():RDD [(K,Iterable [V])]


将RDD中每个键的值分组为单个序列。

最佳答案

您可以使用aggregateByKey()(比reduceByKey()更自然)来计算newCentroids



val newCentroids = closest.aggregateByKey((Vector.zeros(dim), 0L))(
  (agg, v) => (agg._1 += v, agg._2 + 1L),
  (agg1, agg2) => (agg1._1 += agg2._1, agg1._2 + agg2._2)
).mapValues(agg => agg._1/agg._2).collectAsMap


为此,您将需要计算数据的维数,即dim,但是您只需执行一次即可。您可能会使用类似val dim = data.first._2.length的名称。

08-28 05:11