我正在使用Spark Scala计算数据帧行之间的余弦相似度。

数据框架构如下:

root
    |-- itemId: string (nullable = true)
    |-- features: vector (nullable = true)


下面的数据框示例

    +-------+--------------------+
    | itemId|            features|
    +-------+--------------------+
    | ab    |[4.7143,0.0,5.785...|
    | cd    |[5.5,0.0,6.4286,4...|
    | ef    |[4.7143,1.4286,6....|
    ........
    +-------+--------------------+


计算余弦相似度的代码:

val irm = new IndexedRowMatrix(myDataframe.rdd.zipWithIndex().map {
      case (row, index) => IndexedRow(row.getAs[Vector]("features"), index)
}).toCoordinateMatrix.transpose.toRowMatrix.columnSimilarities


在irm矩阵中,我具有(i,j,score),其中i,j是项目i的索引,而j是我的原始数据帧的索引。
我想通过将这个irm与初始数据帧结合在一起或者是否有更好的选择来获得(itemIdA,itemIdB,分数),其中itemIdA和itemIdB分别是索引i和j的ID。

最佳答案

在将数据帧转换为矩阵之前创建一个行索引,并在索引和ID之间创建映射。计算后,使用创建的Map将列索引(以前是行索引,但已用transpose更改)转换为id。

val rdd = myDataframe.as[(String, org.apache.spark.mllib.linalg.Vector)].rdd.zipWithIndex()
val indexMap = rdd.map{case ((id, vec), index) => (index, id)}.collectAsMap()


使用之前计算余弦相似度:

val irm = new IndexedRowMatrix(rdd.map{case ((id, vec), index) => IndexedRow(index, vec)})
  .toCoordinateMatrix().transpose().toRowMatrix().columnSimilarities()


将列索引转换回id:

irm.entries.map(e => (indexMap(e.i), indexMap(e.j), e.value))


这应该给您您想要的东西。

10-06 14:59