我正在尝试计算pointwise mutual information(PMI)。

我在这里分别为p(x,y)和p(x)定义了两个RDD:

pii: RDD[((String, String), Double)]
 pi: RDD[(String, Double)]

我正在编写的用于从RDD的piipi计算PMI的任何代码都不是很漂亮。我的方法是首先对RDD的pii进行展平,并在对元组元素进行按摩的同时与pi进行两次连接。

val pmi = pii.map(x => (x._1._1, (x._1._2, x._1, x._2)))
             .join(pi).values
             .map(x => (x._1._1, (x._1._2, x._1._3, x._2)))
             .join(pi).values
             .map(x => (x._1._1, computePMI(x._1._2, x._1._3, x._2)))
// pmi: org.apache.spark.rdd.RDD[((String, String), Double)]
...
def computePMI(pab: Double, pa: Double, pb: Double) = {
  // handle boundary conditions, etc
  log(pab) - log(pa) - log(pb)
}

显然,这很糟糕。是否有更好的(惯用的)方法来做到这一点?
注意:我可以通过将日志问题存储在pipii中来优化日志,但是选择以这种方式编写以使问题更清楚。

最佳答案

使用broadcast将是一个解决方案。

val bcPi = pi.context.broadcast(pi.collectAsMap())
val pmi = pii.map {
  case ((x, y), pxy) =>
    (x, y) -> computePMI(pxy, bcPi.value.get(x).get, bcPi.value.get(y).get)
}

假设:pix中具有所有ypii

关于apache-spark - 在Spark中计算点向互信息,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/29620297/

10-11 07:11