我对scala/spark还不太熟悉,所以如果我的问题很简单,但我到处都找遍了,找不到答案,请原谅。
问题
我正试图提高一系列网络路由器观测(不同网络连接处可能的路由器类型的观测)的置信度。
我有一个类型NetblockObservation将网络上看到的设备类型与相关的netblock和confidence结合起来。信心是指我们准确地识别出我们所看到的设备的信心。

case class NetblockObservation(
                      device_type: String
                      ip_start: Long,
                      ip_end: Long,
                      confidence: Double
                    )

如果置信度高于某个阈值thresh,则我希望该观测值位于返回的数据集中如果低于thresh,则不应低于。
此外,如果我有两个相同设备类型的观察结果,并且其中一个包含另一个,则集装箱的置信度应增加集装箱的置信度。
例子
假设我有3个Netblock观察
// 0.0.0.0/28
NetblockObservation(device_type: "x", ip_start: 0, ip_end: 15, confidence_score: .4)
// 0.0.0.0/29
NetblockObservation(device_type: "x", ip_start: 0, ip_end: 7, confidence_score: .4)
// 0.0.0.0/30
NetblockObservation(device_type: "x", ip_start: 0, ip_end: 3, confidence_score: .4)

在置信阈值为1的情况下,我希望单个输出NetblockObservation(device_type: "x", ip_start: 0, ip_end: 4, confidence_score: 1.2)
说明:如果netblockobservation中包含相同的设备类型,我可以将它的置信度加在一起。
我被允许将0.0.0.0/29的置信度加上0.0.0.0/30的置信度,因为它包含在其中。
因为0.0.0.0/30不包含在0.0.0.0/29中,所以我不允许将0.0.0.0/29的置信分数添加到0.0.0.0/30中。
我的(可怜的)企图
失败原因:太慢/从未完成
我试图在学习scala/spark的同时实现它,所以我不确定是这个想法还是这个实现错了我认为它最终会成功,但一个小时后,它还没有在300000大小的数据集上完成(与生产规模相比很小),所以我放弃了它。
其思想是找到最大的netblock并将数据分为包含的netblock和不包含的netblock未包含的netblock递归地传递回同一个函数如果最大的netblock的置信度为1,则忽略整个包含的数据集,并将最大的添加到返回数据集如果confidence_score小于1,则将其confidence_score添加到包含的数据集中的所有内容中,然后递归地将该组传递回同一个函数。最后,你应该只剩下一个数据,它的置信度高于1该算法还存在不考虑设备类型的问题。
def handleDataset(largestInNetData: Option[NetblockObservation], netData: RDD[NetblockObservation]): RDD[NetblockObservation] = {
  if (netData.isEmpty) spark.sparkContext.emptyRDD else largestInNetData match {
    case Some(largest) =>
      val grouped = netData.groupBy(item =>
        if (item.ip_start >= largest.ip_start && item.ip_end <= largest.ip_end) largestInNetData
        else None)

      def lookup(k: Option[NetblockObservation]) = grouped.filter(_._1 == k).flatMap(_._2)

      val nos = handleDataset(None, lookup(None))
      // Threshold is assumed to be 1
      val next = if (largest.confidence_score >= 1) spark.sparkContext.parallelize(Seq(largest)) else
        handleDataset(None, lookup(largestInNetData)
          .filter(x => x != largest)
          .map(x => x.copy(confidence_score = x.confidence_score + largest.confidence_score)))
      nos ++ next
    case None =>
      val largest = netData.reduce((a: NetblockObservation, b: NetblockObservation) => if ((a.ip_end - a.ip_start) > (b.ip_end - b.ip_start)) a else b)
      handleDataset(Option(largest), netData)
  }
}

最佳答案

这是一个相当复杂的代码位,因此下面是一个通用算法,我希望它能有所帮助:
暂时忘掉spark,编写一个scala函数,可能在NetblockObservation的伴生对象中,它接受它们的集合并返回包含的集合的子集。你应该对这个函数进行单元测试,这也是纯Scala。
现在行动起来对你的groupBy进行RDD[NetblockObservation]操作,以device_type为键,生成一个从StringIterable[NetblockObservation]的映射。
筛选出地图中值为1且confidence以下有thresh的所有条目。
对于剩下的条目,将第一步中的函数应用到带有NetblockObservationmapValues集合。
执行reduceByKey或类似操作,只需将包含值的confidence_score相加即可。
喝一杯清爽的饮料。

关于algorithm - 增强Apache Spark中重叠观察的置信度,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/43003433/

10-11 07:51