我有两个RDD-RDD1和RDD2,其结构如下:

RDD1:

[(u'abc', 1.0), (u'cde', 1.0),....]


RDD2:

[3.0, 0.0,....]


现在,我要形成第三个RDD,该值来自上述两个RDD的每个索引的值。因此,以上输出应变为:

RDD3:

[(u'abc', 1.0,3.0), (u'cde', 1.0,0.0),....]


如您所见,RDD2中的值已添加到RDD1的元组中。我怎样才能做到这一点?我尝试做RDD3 = RDD1.map(lambda x:x).zip(RDD2),但它会生成此输出-[((u'abc', 1.0),3.0), ((u'cde', 1.0),0.0),....]这不是我想要的,因为您可以看到()将RDD1和RDD2的值分开。

注意:我的RDD1是使用-RDD1 = data.map(lambda x:(x[0])).zip(val)形成的

最佳答案

您可以在压缩后简单地重塑数据:

rdd1 = sc.parallelize([(u'abc', 1.0), (u'cde', 1.0)])
rdd2 = sc.parallelize([3.0, 0.0])

rdd1.zip(rdd2).map(lambda t: (t[0][0], t[0][1], t[1]))


在Python 2中,可以使用:

rdd1.zip(rdd2).map(lambda ((x1, x2), y): (x1, x2, y))


但Python 3不再支持它。

如果您要使用索引来提取更多值,可能会很乏味

lambda t: (t[0][0], t[0][1], t[0][2], ..., t[1]))


因此,您可以尝试执行以下操作:

lambda t: tuple(list(t[0]) + [t[1]])


或实施一个更复杂的解决方案,例如:Flatten (an irregular) list of lists

08-24 14:25