我有两个LabeledPoints-Prediction1Prediction2。这两个LabeledPoints都有一个值作为第一个元素,并有一个预测作为第二个元素。我想检查first中的Prediction1元素是否等于first中的Prediction2元素。所以像这样:

for each value in Prediction1 and Prediction2:
     if Prediction1.tup[0] != Prediction2.tup[0]:
         print 'Value unequal'
         break


例:

假设以下是RDD LabeledPointsPrediction1

[(1,2),(3,4),(5,6)]


Prediction2

[(1,12),(3,13),(5,2)]


在以上示例中,LabeledPoint(1,3,5)的每个Prediction1的第一元素等于LabeledPoint(1,3,5)的每个Prediction2的第一元素。但是,即使其中之一不匹配,我也要退出流程并打印不匹配并结束。

如何在PySpark中做到这一点

最佳答案

假设两个RDD具有相同数量的分区和每个分区的元素,则可以简单地ziptake

prediction1 = sc.parallelize([(1, 2), (3, 4), (5, 6)])
prediction2 = sc.parallelize([(1, 12), (3, 13), (5, 2)])
prediction3 = sc.parallelize([(1, 0), (5, 0), (5, 0)])

def mismatch(rdd1, rdd2):
    def mismatch_(xy):
        (x1, _), (y1, _) = xy
        return x1 != y1
    return bool(rdd1.zip(rdd2).filter(mismatch_).take(1))

mismatch(prediction1, prediction2)
## False
mismatch(prediction1, prediction3)
## True


由于take是惰性的,因此应按您的预期或多或少地工作。见Lazy foreach on a Spark RDD

如果不满足初始条件,则可以通过组合zip,swap(zipWithIndex)和lambda kv: (kv[1], kv[0])手动join

关于python - 如何在PySpark中比较两个LabeledPoints?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/34936591/

10-10 11:49