我有下面的代码。我在python 2.7(cpython)中使用pyspark 1.2.1
for colname in shuffle_columns:
colrdd = hive_context.sql('select %s from %s' % (colname, temp_table))
# zip_with_random_index is expensive
colwidx = zip_with_random_index(colrdd).map(merge_index_on_row)
(hive_context.applySchema(colwidx, a_schema)
.registerTempTable(a_name))
关于此代码的问题是,它一次只能在一个列上运行。我的集群中有足够的节点,可以一次在许多列上进行操作。有没有办法做到这一点?如果我使用了线程之类的方法,该怎么办?我可以同时启动多个
registerTempTable
(以及相关的收集操作)吗? 最佳答案
不幸的是,下面的方法效果不佳。从某种意义上说,所有单个迭代都将执行。不幸的是,由于空指针异常,随后对hive_context
对象的调用失败。
使用concurrent.futures
是可能的:
from concurrent import futures
def make_col_temptable(colname):
colrdd = hive_context.sql('select %s from %s' % (colname, temp_table))
# zip_with_random_index is expensive
colwidx = zip_with_random_index(colrdd).map(merge_index_on_row)
(hive_context.applySchema(colwidx, a_schema)
.registerTempTable(a_name))
with futures.ThreadPoolExecutor(max_workers=20) as executor:
futures.wait([executor.submit(make_col_temptable, colname) for colname in shuffle_columns])
关于python - 改善Spark SQL中的并行性,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/32217242/