本文介绍了在Spark中计算点向互信息的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试计算逐点相互信息(PMI).

I'm trying to compute pointwise mutual information (PMI).

我在这里分别为p(x,y)和p(x)定义了两个RDD:

I have two RDDs as defined here for p(x, y) and p(x) respectively:

pii: RDD[((String, String), Double)]
 pi: RDD[(String, Double)]

我编写的用于从RDD piipi计算PMI的任何代码都不是很漂亮.我的方法是首先对RDD pii进行展平,并在按摩元组元素时与pi联接两次.

Any code I'm writing to compute PMI from the RDDs pii and pi is not pretty. My approach is first to flatten the RDD pii and join with pi twice while massaging the tuple elements.

val pmi = pii.map(x => (x._1._1, (x._1._2, x._1, x._2)))
             .join(pi).values
             .map(x => (x._1._1, (x._1._2, x._1._3, x._2)))
             .join(pi).values
             .map(x => (x._1._1, computePMI(x._1._2, x._1._3, x._2)))
// pmi: org.apache.spark.rdd.RDD[((String, String), Double)]
...
def computePMI(pab: Double, pa: Double, pb: Double) = {
  // handle boundary conditions, etc
  log(pab) - log(pa) - log(pb)
}

很显然,这很糟糕.是否有更好的(惯用的)方法来做到这一点?注意:我可以通过将日志问题存储在pipii中来优化日志,但是选择以这种方式编写以使问题更清楚.

Clearly, this sucks. Is there a better (idiomatic) way to do this?Note: I could optimize the logs by storing the log-probs in pi and pii but choosing to write this way to keep the question clear.

推荐答案

使用broadcast是一种解决方案.

val bcPi = pi.context.broadcast(pi.collectAsMap())
val pmi = pii.map {
  case ((x, y), pxy) =>
    (x, y) -> computePMI(pxy, bcPi.value.get(x).get, bcPi.value.get(y).get)
}

假定:pipii中具有所有xy.

Assume: pi has all x and y in pii.

这篇关于在Spark中计算点向互信息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

09-18 04:35