我在PySpark中有两个RDDs
:
RDD1:
[(u'2013-01-31 00:00:00', u'a', u'Pab', u'abc', u'd'),(u'2013-01-31 00:00:00', u'a', u'ab', u'abc', u'g'),.....]
RDD2:
[(u'41',u'42.0'),(u'24',u'98.0'),....]
两个
RDDs
具有相同的数字或行。现在我想做的是从RDD1(从unicode
转换为普通string
)的每一行中获取所有列,并从RDD2的每行第二列(从unicode string
转换为float
转换)中获取第二列,并以此形成新的RDD。因此,新的RDD将如下所示:RDD3:
[('2013-01-31 00:00:00', 'a', 'Pab', 'abc', 'd',42.0),('2013-01-31 00:00:00', 'a', 'ab', u'abc', 'g',98.0),.....]
一旦完成,然后我想通过第一列中的
aggregation
值在此新的RDD3
中执行每行最后一个值的date
(浮点值)。表示date
为2013-01-31 00:00:00
的所有行,应添加其最后一个数字值。如何在PySpark中做到这一点?
最佳答案
您需要zipWithIndex您的RDDs
,此方法将使用您的数据和另一个表示该条目索引的值创建一个元组,因此您可以通过RDDs
将这两个index
连接起来。
您的方法应类似于(我敢肯定有更有效的方法):
rdd1 = sc.parallelize([u"A", u"B", u"C", u"A", u"Z"])
rdd2 = sc.parallelize(xrange(5))
zdd1 = rdd1.zipWithIndex().map(lambda (v, k): (k, v))
zdd2 = rdd2.zipWithIndex().map(lambda (v, k): (k, v))
print zdd1.join(zdd2).collect()
输出将是:
[(0, (u'A', 0)), (4, (u'Z', 4)), (1, (u'B', 1)), (2, (u'C', 2)), (3, (u'A', 3))]
,之后只需要一个map
即可重组数据。例如。以下:combinedRDD = zdd1.join(zdd2).map(lambda (k, v): v)
print combinedRDD.collect()
# You can use the .zip method combinedRDD = rdd1.zip(rdd2)
输出将是:
[(u'A', 0), (u'Z', 4), (u'B', 1), (u'C', 2), (u'A', 3)]
关于数据类型转换,我之前遇到过这个问题,为了解决这个问题,我使用了this snippet。
import unicodedata
convert = lambda (v1, v2): (unicodedata.normalize('NFKD', v1)
.encode('ascii','ignore'), v2)
combinedRDD = combinedRDD.map(convert)
print combinedRDD.collect()
将输出:
[('A', 0), ('Z', 4), ('B', 1), ('C', 2), ('A', 3)]
关于python - 如何在单个RDD中添加2个RDD的列,然后基于PySpark中的日期数据进行行聚合,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/34144648/