我正在使用Spark Scala计算数据帧行之间的余弦相似度。
数据框架构如下:
root
|-- itemId: string (nullable = true)
|-- features: vector (nullable = true)
下面的数据框示例
+-------+--------------------+
| itemId| features|
+-------+--------------------+
| ab |[4.7143,0.0,5.785...|
| cd |[5.5,0.0,6.4286,4...|
| ef |[4.7143,1.4286,6....|
........
+-------+--------------------+
计算余弦相似度的代码:
val irm = new IndexedRowMatrix(myDataframe.rdd.zipWithIndex().map {
case (row, index) => IndexedRow(row.getAs[Vector]("features"), index)
}).toCoordinateMatrix.transpose.toRowMatrix.columnSimilarities
在irm矩阵中,我具有(i,j,score),其中i,j是项目i的索引,而j是我的原始数据帧的索引。
我想通过将这个irm与初始数据帧结合在一起或者是否有更好的选择来获得(itemIdA,itemIdB,分数),其中itemIdA和itemIdB分别是索引i和j的ID。
最佳答案
在将数据帧转换为矩阵之前创建一个行索引,并在索引和ID之间创建映射。计算后,使用创建的Map
将列索引(以前是行索引,但已用transpose
更改)转换为id。
val rdd = myDataframe.as[(String, org.apache.spark.mllib.linalg.Vector)].rdd.zipWithIndex()
val indexMap = rdd.map{case ((id, vec), index) => (index, id)}.collectAsMap()
使用之前计算余弦相似度:
val irm = new IndexedRowMatrix(rdd.map{case ((id, vec), index) => IndexedRow(index, vec)})
.toCoordinateMatrix().transpose().toRowMatrix().columnSimilarities()
将列索引转换回id:
irm.entries.map(e => (indexMap(e.i), indexMap(e.j), e.value))
这应该给您您想要的东西。