我对scala/spark还不太熟悉,所以如果我的问题很简单,但我到处都找遍了,找不到答案,请原谅。
问题
我正试图提高一系列网络路由器观测(不同网络连接处可能的路由器类型的观测)的置信度。
我有一个类型NetblockObservation
将网络上看到的设备类型与相关的netblock和confidence结合起来。信心是指我们准确地识别出我们所看到的设备的信心。
case class NetblockObservation(
device_type: String
ip_start: Long,
ip_end: Long,
confidence: Double
)
如果置信度高于某个阈值
thresh
,则我希望该观测值位于返回的数据集中如果低于thresh
,则不应低于。此外,如果我有两个相同设备类型的观察结果,并且其中一个包含另一个,则集装箱的置信度应增加集装箱的置信度。
例子
假设我有3个Netblock观察
// 0.0.0.0/28
NetblockObservation(device_type: "x", ip_start: 0, ip_end: 15, confidence_score: .4)
// 0.0.0.0/29
NetblockObservation(device_type: "x", ip_start: 0, ip_end: 7, confidence_score: .4)
// 0.0.0.0/30
NetblockObservation(device_type: "x", ip_start: 0, ip_end: 3, confidence_score: .4)
在置信阈值为1的情况下,我希望单个输出
NetblockObservation(device_type: "x", ip_start: 0, ip_end: 4, confidence_score: 1.2)
说明:如果netblockobservation中包含相同的设备类型,我可以将它的置信度加在一起。
我被允许将
0.0.0.0/29
的置信度加上0.0.0.0/30
的置信度,因为它包含在其中。因为
0.0.0.0/30
不包含在0.0.0.0/29
中,所以我不允许将0.0.0.0/29
的置信分数添加到0.0.0.0/30
中。我的(可怜的)企图
失败原因:太慢/从未完成
我试图在学习scala/spark的同时实现它,所以我不确定是这个想法还是这个实现错了我认为它最终会成功,但一个小时后,它还没有在300000大小的数据集上完成(与生产规模相比很小),所以我放弃了它。
其思想是找到最大的netblock并将数据分为包含的netblock和不包含的netblock未包含的netblock递归地传递回同一个函数如果最大的netblock的置信度为1,则忽略整个包含的数据集,并将最大的添加到返回数据集如果confidence_score小于1,则将其confidence_score添加到包含的数据集中的所有内容中,然后递归地将该组传递回同一个函数。最后,你应该只剩下一个数据,它的置信度高于1该算法还存在不考虑设备类型的问题。
def handleDataset(largestInNetData: Option[NetblockObservation], netData: RDD[NetblockObservation]): RDD[NetblockObservation] = {
if (netData.isEmpty) spark.sparkContext.emptyRDD else largestInNetData match {
case Some(largest) =>
val grouped = netData.groupBy(item =>
if (item.ip_start >= largest.ip_start && item.ip_end <= largest.ip_end) largestInNetData
else None)
def lookup(k: Option[NetblockObservation]) = grouped.filter(_._1 == k).flatMap(_._2)
val nos = handleDataset(None, lookup(None))
// Threshold is assumed to be 1
val next = if (largest.confidence_score >= 1) spark.sparkContext.parallelize(Seq(largest)) else
handleDataset(None, lookup(largestInNetData)
.filter(x => x != largest)
.map(x => x.copy(confidence_score = x.confidence_score + largest.confidence_score)))
nos ++ next
case None =>
val largest = netData.reduce((a: NetblockObservation, b: NetblockObservation) => if ((a.ip_end - a.ip_start) > (b.ip_end - b.ip_start)) a else b)
handleDataset(Option(largest), netData)
}
}
最佳答案
这是一个相当复杂的代码位,因此下面是一个通用算法,我希望它能有所帮助:
暂时忘掉spark,编写一个scala函数,可能在NetblockObservation
的伴生对象中,它接受它们的集合并返回包含的集合的子集。你应该对这个函数进行单元测试,这也是纯Scala。
现在行动起来对你的groupBy
进行RDD[NetblockObservation]
操作,以device_type
为键,生成一个从String
到Iterable[NetblockObservation]
的映射。
筛选出地图中值为1且confidence
以下有thresh
的所有条目。
对于剩下的条目,将第一步中的函数应用到带有NetblockObservation
的mapValues
集合。
执行reduceByKey
或类似操作,只需将包含值的confidence_score
相加即可。
喝一杯清爽的饮料。
关于algorithm - 增强Apache Spark中重叠观察的置信度,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/43003433/