Problem Description
I'm trying to compute pointwise mutual information (PMI).
I have two RDDs as defined here for p(x, y) and p(x) respectively:
pii: RDD[((String, String), Double)]
pi: RDD[(String, Double)]
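For concreteness, a tiny hypothetical dataset matching these types might look like the following (sc is assumed to be an existing SparkContext; the words and probabilities are made up purely for illustration):

// Hypothetical sample data; the values do not come from a real corpus.
val pii = sc.parallelize(Seq((("a", "b"), 0.1), (("b", "a"), 0.2)))  // p(x, y)
val pi  = sc.parallelize(Seq(("a", 0.4), ("b", 0.6)))                // p(x)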
Any code I'm writing to compute PMI from the RDDs pii and pi is not pretty. My approach is first to flatten the RDD pii and join with pi twice while massaging the tuple elements.
// Key each pair by its first word, join to pick up p(x), re-key by the
// second word, join again for p(y), then compute PMI per pair.
val pmi = pii.map(x => (x._1._1, (x._1._2, x._1, x._2)))
  .join(pi).values
  .map(x => (x._1._1, (x._1._2, x._1._3, x._2)))
  .join(pi).values
  .map(x => (x._1._1, computePMI(x._1._2, x._1._3, x._2)))
// pmi: org.apache.spark.rdd.RDD[((String, String), Double)]
...
import scala.math.log

def computePMI(pab: Double, pa: Double, pb: Double) = {
  // handle boundary conditions, etc
  log(pab) - log(pa) - log(pb)
}
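The boundary conditions are left open above; one hedged sketch (the guard and the sentinel value are my assumption, not part of the original) catches zero probabilities before taking logs:

// Sketch: return negative infinity when any probability is zero, avoiding
// expressions like log(0) - log(0) that would evaluate to NaN. Other
// policies (skipping the pair, smoothing) are equally plausible.
def computePMISafe(pab: Double, pa: Double, pb: Double): Double =
  if (pab <= 0.0 || pa <= 0.0 || pb <= 0.0) Double.NegativeInfinity
  else math.log(pab) - math.log(pa) - math.log(pb)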
Clearly, this sucks. Is there a better (idiomatic) way to do this?
Note: I could optimize the logs by storing the log-probs in pi and pii, but I chose to write it this way to keep the question clear.
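That optimization would just take the logs up front; a minimal sketch (the names logPii and logPi are illustrative, not from the question):

// Hypothetical: pre-compute log-probabilities so PMI reduces to the plain
// subtraction log p(x, y) - log p(x) - log p(y).
val logPii = pii.mapValues(math.log)
val logPi  = pi.mapValues(math.log)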
Recommended Answer
Using broadcast would be a solution.
// Collect the (presumably small) unigram table p(x) on the driver and
// broadcast it, so each pair in pii can look up p(x) and p(y) locally
// instead of going through two shuffling joins.
val bcPi = pi.context.broadcast(pi.collectAsMap())
val pmi = pii.map {
  case ((x, y), pxy) =>
    (x, y) -> computePMI(pxy, bcPi.value(x), bcPi.value(y))
}
Assume: pi has all x and y in pii.
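If that assumption might not hold, one hedged variant (hypothetical, not from the original answer) skips pairs whose unigram probability is missing rather than failing on lookup:

// Sketch: bcPi.value.get returns an Option, so pairs with an absent
// x or y are silently dropped by flatMap instead of throwing.
val pmiSafe = pii.flatMap {
  case ((x, y), pxy) =>
    for {
      px <- bcPi.value.get(x)
      py <- bcPi.value.get(y)
    } yield (x, y) -> computePMI(pxy, px, py)
}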