I am looking for a way to split an RDD into two or more RDDs and save the results as separate RDDs. For example:

rdd_test = sc.parallelize(range(50), 1)


My code:

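def split_population_into_parts(rdd_test):

    N = 2  # number of populations to split into
    # repartition(N) shuffles the data into N partitions; distinct() drops duplicates
    repartionned_rdd = rdd_test.repartition(N).distinct()
    # glom() turns each partition into a single list, giving an RDD of lists
    rdds_for_testab_populations = repartionned_rdd.glom()

    return rdds_for_testab_populations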

rdds_for_testab_populations = split_population_into_parts(rdd_test)


This gives:

[[0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40, 42, 44, 46, 48],
 [1, 3, 5, 7, 9, 11, 13, 15, 17, 19, 21, 23, 25, 27, 29, 31, 33, 35, 37, 39, 41, 43, 45, 47, 49]]
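The even/odd split is not arbitrary. As far as I understand PySpark's implementation, `distinct()` shuffles the data with a hash partitioner, and in CPython `hash(x) == x` for small non-negative ints, so each value lands in partition `x % 2`. The pure-Python sketch below models that bucketing. It is a simplified model, not Spark code: `hash_partition` is a hypothetical helper, and it assumes `spark.default.parallelism` is not set, so that `distinct()` keeps the 2 partitions produced by `repartition(2)`.

```python
def hash_partition(values, num_partitions):
    """Simplified model of the hash partitioning done by a shuffle such as distinct().

    Hypothetical helper for illustration only; not part of the PySpark API.
    """
    buckets = [[] for _ in range(num_partitions)]
    seen = set()
    for v in values:
        if v in seen:  # model distinct(): keep only the first occurrence
            continue
        seen.add(v)
        # hash() of a small non-negative int is the int itself,
        # so with 2 partitions this is effectively v % 2.
        buckets[hash(v) % num_partitions].append(v)
    return buckets


parts = hash_partition(range(50), 2)
print(parts[0][:5], parts[1][:5])  # [0, 2, 4, 6, 8] [1, 3, 5, 7, 9]
```

With a different key type (e.g. strings, whose hashes are randomized per process unless `PYTHONHASHSEED` is fixed), the grouping would no longer follow such a simple pattern.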

Now I would like to turn each of these lists into a new RDD, for example RDD1 and RDD2. How can I do this?

Best answer

I found a solution:

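def get_testab_populations_tables(rdds_for_testab_populations):
    # Each element of the glom()-ed RDD is one partition as a Python list;
    # toLocalIterator() brings the partitions to the driver one at a time.
    namespace = globals()
    for i, testab_table in enumerate(rdds_for_testab_populations.toLocalIterator()):
        # Create module-level variables tAB_0, tAB_1, ..., each holding a new RDD
        namespace['tAB_%d' % i] = sc.parallelize(testab_table)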


Then you can do the following:

print(tAB_0.collect())
print(tAB_1.collect())
# etc.
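Writing into `globals()` works, but it makes the new RDDs hard to find and easy to overwrite. A variant that returns a dict instead might look like the sketch below. `split_into_tables` is a hypothetical helper, not part of PySpark; the `parallelize` argument is meant to be `sc.parallelize` in a real session, and any callable that turns a list into a dataset can stand in for it (here `list`, so the sketch runs without Spark).

```python
def split_into_tables(partitions, parallelize, prefix="tAB_"):
    """Map each partition (a list, e.g. from glom()) to a new dataset.

    Hypothetical helper. In PySpark, pass sc.parallelize as `parallelize`
    and rdd.glom().toLocalIterator() as `partitions`.
    Returns a dict instead of creating global variables.
    """
    return {
        "%s%d" % (prefix, i): parallelize(part)
        for i, part in enumerate(partitions)
    }


# Without Spark, `list` stands in for sc.parallelize:
tables = split_into_tables([[0, 2, 4], [1, 3, 5]], parallelize=list)
print(tables["tAB_1"])  # [1, 3, 5]
```

In a Spark session this would be called as `tables = split_into_tables(rdds_for_testab_populations.toLocalIterator(), sc.parallelize)` followed by `tables["tAB_0"].collect()`. Like the original answer, it pulls every partition through the driver, so it only suits data that fits in driver memory.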

09-07 13:42