我在PySpark中有两个RDDs:

RDD1:

[(u'2013-01-31 00:00:00', u'a', u'Pab', u'abc', u'd'),(u'2013-01-31 00:00:00', u'a', u'ab', u'abc', u'g'),.....]

RDD2:
[(u'41',u'42.0'),(u'24',u'98.0'),....]

两个RDDs具有相同的数字或行。现在我想做的是从RDD1(从unicode转换为普通string)的每一行中获取所有列,并从RDD2的每行第二列(从unicode string转换为float转换)中获取第二列,并以此形成新的RDD。因此,新的RDD将如下所示:

RDD3:
[('2013-01-31 00:00:00', 'a', 'Pab', 'abc', 'd',42.0),('2013-01-31 00:00:00', 'a', 'ab', u'abc', 'g',98.0),.....]

一旦完成,然后我想通过第一列中的aggregation值在此新的RDD3中执行每行最后一个值的date(浮点值)。表示date2013-01-31 00:00:00的所有行,应添加其最后一个数字值。

如何在PySpark中做到这一点?

最佳答案

您需要zipWithIndex您的RDDs,此方法将使用您的数据和另一个表示该条目索引的值创建一个元组,因此您可以通过RDDs将这两个index连接起来。

您的方法应类似于(我敢肯定有更有效的方法):

rdd1 = sc.parallelize([u"A", u"B", u"C", u"A", u"Z"])
rdd2 = sc.parallelize(xrange(5))

zdd1 = rdd1.zipWithIndex().map(lambda (v, k): (k, v))
zdd2 = rdd2.zipWithIndex().map(lambda (v, k): (k, v))

print zdd1.join(zdd2).collect()

输出将是:[(0, (u'A', 0)), (4, (u'Z', 4)), (1, (u'B', 1)), (2, (u'C', 2)), (3, (u'A', 3))],之后只需要一个map即可重组数据。例如。以下:
combinedRDD = zdd1.join(zdd2).map(lambda (k, v): v)
print combinedRDD.collect()

# You can use the .zip method combinedRDD = rdd1.zip(rdd2)

输出将是:[(u'A', 0), (u'Z', 4), (u'B', 1), (u'C', 2), (u'A', 3)]
关于数据类型转换,我之前遇到过这个问题,为了解决这个问题,我使用了this snippet
import unicodedata

convert = lambda (v1, v2): (unicodedata.normalize('NFKD', v1)
                                       .encode('ascii','ignore'), v2)

combinedRDD = combinedRDD.map(convert)
print combinedRDD.collect()

将输出:[('A', 0), ('Z', 4), ('B', 1), ('C', 2), ('A', 3)]

关于python - 如何在单个RDD中添加2个RDD的列,然后基于PySpark中的日期数据进行行聚合,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/34144648/

10-12 18:44