I am looking for a way to split an RDD into two or more RDDs and keep each result as a separate RDD. For example:
rdd_test = sc.parallelize(range(50), 1)
My code:
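def split_population_into_parts(rdd_test):
    N = 2
    # Redistribute the data over N partitions; distinct() triggers another shuffle.
    repartionned_rdd = rdd_test.repartition(N).distinct()
    # glom() turns each partition into a single list element.
    rdds_for_testab_populations = repartionned_rdd.glom()
    return rdds_for_testab_populations

rdds_for_testab_populations = split_population_into_parts(rdd_test)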
This gives:
[[0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40, 42, 44, 46, 48],
 [1, 3, 5, 7, 9, 11, 13, 15, 17, 19, 21, 23, 25, 27, 29, 31, 33, 35, 37, 39, 41, 43, 45, 47, 49]]
Now I would like to turn each of these lists into a new RDD, for example RDD1 and RDD2. How can I do that?
Best answer
I found a solution:
def get_testab_populations_tables(rdds_for_testab_populations):
    # Each element of the glom()-ed RDD is the content of one partition, as a list.
    namespace = globals()
    for i, testab_table in enumerate(rdds_for_testab_populations.toLocalIterator()):
        # Bind a new RDD to a module-level name: tAB_0, tAB_1, ...
        namespace['tAB_%d' % i] = sc.parallelize(testab_table)
After calling the function, you can do the following:
print(tAB_0.collect())
print(tAB_1.collect())
etc.
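
The solution above pulls every partition back to the driver and writes the resulting RDDs into globals(). As an alternative sketch (not part of the answer above), each partition can be selected by its index so the data stays on the executors and the RDDs come back in an ordinary list. The names split_by_partition and keep_only are hypothetical; getNumPartitions and mapPartitionsWithIndex are standard PySpark RDD methods. The function should be given the repartitioned RDD before glom() is applied.

```python
def split_by_partition(rdd):
    """Return a list with one RDD per partition of `rdd`.

    Nothing is collected to the driver: every returned RDD is a lazy
    view of `rdd` that keeps the rows of exactly one partition.
    """

    def keep_only(target):
        # Build the selector in its own scope so that each closure
        # captures its own `target` index.
        def select(index, iterator):
            return iterator if index == target else iter([])
        return select

    return [
        rdd.mapPartitionsWithIndex(keep_only(i))
        for i in range(rdd.getNumPartitions())
    ]


# Usage, assuming an active SparkContext named `sc` as in the question:
#   parts = split_by_partition(rdd_test.repartition(2).distinct())
#   print(parts[0].collect())
#   print(parts[1].collect())
```

Each returned RDD still has as many partitions as the input, with all but one of them empty. Because the input RDD is evaluated once per returned RDD, it may be worth caching it first if it is expensive to compute.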