Question
I'm trying to compute pointwise mutual information (PMI).
I have two RDDs as defined here for p(x, y) and p(x) respectively:
pii: RDD[((String, String), Double)]
pi: RDD[(String, Double)]
Any code I'm writing to compute PMI from the RDDs pii and pi is not pretty. My approach is first to flatten the RDD pii and join with pi twice while massaging the tuple elements.
val pmi = pii.map(x => (x._1._1, (x._1._2, x._1, x._2)))  // key by x; keep (y, (x, y), p(x, y))
  .join(pi).values                                        // attach p(x)
  .map(x => (x._1._1, (x._1._2, x._1._3, x._2)))          // re-key by y; keep ((x, y), p(x, y), p(x))
  .join(pi).values                                        // attach p(y)
  .map(x => (x._1._1, computePMI(x._1._2, x._1._3, x._2)))
// pmi: org.apache.spark.rdd.RDD[((String, String), Double)]
...
import scala.math.log

def computePMI(pab: Double, pa: Double, pb: Double) = {
  // handle boundary conditions, etc.
  log(pab) - log(pa) - log(pb)
}
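The boundary handling is left open above. One minimal sketch, assuming we simply drop zero (or negative) probabilities rather than smooth them, is to return an Option so degenerate inputs never produce NaN or -Infinity:

```scala
import scala.math.log

// Hypothetical variant of computePMI: None signals an undefined PMI
// (some probability is zero), instead of NaN/-Infinity leaking downstream.
def computePMI(pab: Double, pa: Double, pb: Double): Option[Double] =
  if (pab > 0.0 && pa > 0.0 && pb > 0.0)
    Some(log(pab) - log(pa) - log(pb))
  else
    None

// For an independent pair p(x, y) = p(x) * p(y), PMI is 0 (up to rounding):
// computePMI(0.25, 0.5, 0.5) is Some(v) with v ≈ 0.0
```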
Clearly, this sucks. Is there a better (idiomatic) way to do this?

Note: I could optimize the logs by storing the log-probs in pi and pii, but chose to write it this way to keep the question clear.
Accepted answer
Using broadcast is one solution.
// collect the (small) marginal distribution as a Map and broadcast it
// to every executor, replacing the two shuffling joins with local lookups
val bcPi = pi.context.broadcast(pi.collectAsMap())
val pmi = pii.map {
  case ((x, y), pxy) =>
    (x, y) -> computePMI(pxy, bcPi.value.get(x).get, bcPi.value.get(y).get)
}
Assume: pi has all x and y in pii.
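If that assumption does not hold, the `.get(x).get` lookups will throw. A plain-Scala sketch of the same broadcast-lookup logic, using in-memory Maps in place of RDDs (the names and probabilities are made up for illustration), shows how Option lookups in a flatMap silently drop pairs whose marginals are missing:

```scala
import scala.math.log

// Toy distributions standing in for the broadcast pi map and the pii RDD.
val pi: Map[String, Double] = Map("a" -> 0.5, "b" -> 0.5)
val pii: Map[(String, String), Double] =
  Map(("a", "b") -> 0.25, ("a", "c") -> 0.1) // "c" has no marginal in pi

def computePMI(pab: Double, pa: Double, pb: Double): Double =
  log(pab) - log(pa) - log(pb)

// flatMap + for-comprehension over Options: pairs with a missing
// marginal are dropped instead of throwing.
val pmi: Map[(String, String), Double] = pii.flatMap {
  case ((x, y), pxy) =>
    for {
      px <- pi.get(x)
      py <- pi.get(y)
    } yield (x, y) -> computePMI(pxy, px, py)
}
// pmi contains only ("a", "b"); its PMI is ≈ 0.0 since p(a,b) = p(a) * p(b)
```

The same `flatMap { case ((x, y), pxy) => ... }` shape works unchanged on the pii RDD against the broadcast map.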