这是我的示例数据:

| rdd1  |
| ....  |
| 10    |
| 200   |
| 350   |
| 400   |
| 1000  |
| 1500  |
| ..... |



| rdd2  |
| label | features                 |
| ....  | .......................  |
|   0   | 1 10 30 100  200 450 600 |
|   0   | 200 300 400              |
|   1   | 200 350 450              |
|   1   | 400 600 700              |
|  .... | ........................ |



我要计算以下内容:对于rdd1的每个元素,了解如何
对于每个标签值,它多次出现在rdd2的功能中。一世
需要一个这样的元组(#of次出现,标记为0,#次
出现带有标签1),因此在上面的示例中,出现10次1
标签0和0乘以标签1乘以10,它将是(1,0)。 200出现
标签0两次,标签1一次,因此对于(2,1)
200

另外,我也想找出对于rdd1的每个元素找出
多少次它没有出现在rdd2的功能中
标签值。我需要一个这样的元组(#of次不显示
标签0,#次不带有标签1)。所以在上面
例如,对于10次我应该回来,它不会一次出现
标签,并使用标签1(1,2)两次。


我打算按键使用聚合。

val initialCount : collection.mutable.ListBuffer[Int] = ListBuffer(0, 0)
val addToCounts = (s: collection.mutable.ListBuffer[Int], label:Int) => if (label == 1) s(0) += 1 else s(1) += 1
val sumPartitionCounts = (p1: collection.mutable.ListBuffer[Int], p2: collection.mutable.ListBuffer[Int]) => ListBuffer((p1(0) + p2(0)),(p1(1) + p2(1)))


但是,我读到不允许在另一个rdd的map函数中访问rdd。关于如何解决此问题的任何想法都会很棒。

最佳答案

广播变量-如果您的rdd2足够小,则将其广播到每个节点并在rdd1.map中用作查找或
加入-加入键值rdds


您将必须重组rdd2以获得所需的密钥,以进行广播var查找或加入。如果rdd2是RDD [label,Array(feature)],我将尝试获取RDD [feature,label],如下所示:

    val rdd2Mapped: RDD[String,String] = rdd2.flatMap(x => x._2.map(y => (y,x._1)))


然后使用aggregateByKey创建RDD [功能,地图[标签,频率]]

    val initialMap = scala.collection.mutable.Map.empty[String, Int]
    val addToMap = (x: scala.collection.mutable.Map[String, Int], y: String) => {
        if(x.contains(y))
            x += ((y, x.get(y).get+1))
        else
            x += ((y, 1))
        }
    val mergeMaps = (x: scala.collection.mutable.Map[String, Int], y: scala.collection.mutable.Map[String, Int]) => {
        x ++= y
    }
    val rdd2Aggregated: RDD[String, scala.collection.mutable.Map[String,Int] =
      rdd2Mapped.aggregateByKey(initialMap)(addToMap, mergeMaps)


现在,广播rdd2Aggregated或将rdd1与rdd2Aggregated结合使用,并使用Map [label-> frequency]获得所需的结果。

对于问题的第二部分,以几乎类似的方式转换rdd2,但每个标签仅采用不同的功能

    val rdd2Mapped: RDD[String,String] = rdd2.flatMap(x => x._2.distinct.map(y => (y,x._1)))


像第一部分一样获得RDD [feature,Map [label,frequency]]。这将为您提供功能在rdd2中出现的次数。现在,获得编号。 rdd2中每个标签的行数(rdd2中标签上的简单wordcount)。您可以像以前一样将rdd1与这个新的rdd2Aggregated结合在一起,并进一步将生成的rdd与wordcount查找图(如果足够小,则广播wordcount查找图)连接。现在,对于每个功能,您都可以获得标签和频率的映射。从查找映射的相应标签计数中减去每个标签的频率,以获得所需的答案。

如果给定要素的Map [label,frequency]中不存在标签,请将该频率视为0。请确保考虑这种边缘情况。

关于scala - 在 map 中访问另一个rdd,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/35469857/

10-11 15:27