我有下面的代码。我在python 2.7(cpython)中使用pyspark 1.2.1

for colname in shuffle_columns:
    colrdd = hive_context.sql('select %s from %s' % (colname, temp_table))
    # zip_with_random_index is expensive
    colwidx = zip_with_random_index(colrdd).map(merge_index_on_row)
    (hive_context.applySchema(colwidx, a_schema)
        .registerTempTable(a_name))


关于此代码的问题是,它一次只能在一个列上运行。我的集群中有足够的节点,可以一次在许多列上进行操作。有没有办法做到这一点?如果我使用了线程之类的方法,该怎么办?我可以同时启动多个registerTempTable(以及相关的收集操作)吗?

最佳答案

不幸的是,下面的方法效果不佳。从某种意义上说,所有单个迭代都将执行。不幸的是,由于空指针异常,随后对hive_context对象的调用失败。



使用concurrent.futures是可能的:

from concurrent import futures

def make_col_temptable(colname):
    colrdd = hive_context.sql('select %s from %s' % (colname, temp_table))
    # zip_with_random_index is expensive
    colwidx = zip_with_random_index(colrdd).map(merge_index_on_row)
    (hive_context.applySchema(colwidx, a_schema)
        .registerTempTable(a_name))

with futures.ThreadPoolExecutor(max_workers=20) as executor:
    futures.wait([executor.submit(make_col_temptable, colname) for colname in shuffle_columns])

关于python - 改善Spark SQL中的并行性,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/32217242/

10-10 19:45