This article walks through computing pointwise mutual information (PMI) in Spark; it may be a useful reference for anyone facing the same problem.

Problem description

I'm trying to compute pointwise mutual information (PMI).

I have two RDDs as defined here, for p(x, y) and p(x) respectively:

pii: RDD[((String, String), Double)]
 pi: RDD[(String, Double)]
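For concreteness, distributions of this shape could be derived from co-occurrence counts. Below is a plain-Scala sketch with made-up data (the names `counts`, `pxy`, `px` are illustrative, not from the question):

```scala
// Hypothetical co-occurrence counts; in Spark these would come from
// something like pairs.map(p => (p, 1L)).reduceByKey(_ + _)
val counts = Seq((("a", "b"), 2L), (("a", "c"), 1L), (("b", "c"), 1L))
val total  = counts.map(_._2).sum.toDouble

// p(x, y): joint probability of each pair -- the shape of pii
val pxy = counts.map { case (pair, n) => pair -> n / total }.toMap

// p(x): marginal over both slots of the pair -- the shape of pi
val px = counts
  .flatMap { case ((a, b), n) => Seq(a -> n, b -> n) }
  .groupBy(_._1)
  .map { case (w, ns) => w -> ns.map(_._2).sum / (2 * total) }
```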

Any code I'm writing to compute PMI from the RDDs pii and pi is not pretty. My approach is first to flatten the RDD pii and join with pi twice while massaging the tuple elements.

val pmi = pii.map(x => (x._1._1, (x._1._2, x._1, x._2)))      // key by the first word x
             .join(pi).values                                 // attach p(x)
             .map(x => (x._1._1, (x._1._2, x._1._3, x._2)))   // re-key by the second word y
             .join(pi).values                                 // attach p(y)
             .map(x => (x._1._1, computePMI(x._1._2, x._1._3, x._2)))
// pmi: org.apache.spark.rdd.RDD[((String, String), Double)]
...
import scala.math.log

def computePMI(pab: Double, pa: Double, pb: Double) = {
  // handle boundary conditions, etc
  log(pab) - log(pa) - log(pb)
}
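As a quick sanity check, the body above is just PMI(x, y) = log p(x, y) − log p(x) − log p(y). With made-up probabilities (values chosen only for illustration, self-contained outside Spark):

```scala
import scala.math.log

def computePMI(pab: Double, pa: Double, pb: Double): Double =
  log(pab) - log(pa) - log(pb)

// Under independence p(x, y) would be 0.2 * 0.25 = 0.05; observing
// p(x, y) = 0.1 means the pair co-occurs twice as often as chance:
// PMI = log(0.1 / 0.05) = log(2) ≈ 0.693
val pmi = computePMI(0.1, 0.2, 0.25)
```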

Clearly, this sucks. Is there a better (idiomatic) way to do this? Note: I could optimize away the logs by storing the log-probs in pi and pii, but I chose to write it this way to keep the question clear.

Recommended answer

Using broadcast would be a solution.

// Collect pi to the driver and ship it to every executor once,
// replacing the two shuffling joins with local map lookups
val bcPi = pi.context.broadcast(pi.collectAsMap())
val pmi = pii.map {
  case ((x, y), pxy) =>
    (x, y) -> computePMI(pxy, bcPi.value(x), bcPi.value(y))
}

Assumption: pi contains every x and y that appears in pii.
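If that assumption may not hold, the lookup into the broadcast map throws NoSuchElementException on an unseen word. One hedged variant (not in the original answer) keeps the Option and drops pairs with unknown words via flatMap; the same shape works on plain collections, which is what this sketch uses:

```scala
import scala.math.log

def computePMI(pab: Double, pa: Double, pb: Double): Double =
  log(pab) - log(pa) - log(pb)

// Toy stand-ins for pii and the broadcast marginals; "b" is deliberately
// missing from pi to show its pair being skipped rather than crashing
val pii = Seq((("a", "b"), 0.1), (("a", "c"), 0.05))
val pi  = Map("a" -> 0.2, "c" -> 0.5)

// In Spark this would be pii.flatMap { ... } with pi.get replaced
// by bcPi.value.get
val pmi = pii.flatMap { case ((x, y), pxy) =>
  for {
    px <- pi.get(x)
    py <- pi.get(y)
  } yield (x, y) -> computePMI(pxy, px, py)
}.toMap
```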

