Problem description
I am using Spark 1.3.
# Read from text file, parse it and then do some basic filtering to get data1
data1.registerTempTable('data1')
# Read from text file, parse it and then do some basic filtering to get data2
data2.registerTempTable('data2')
# Perform join
data_joined = data1.join(data2, data1.id == data2.id)
My data is quite skewed: data2 (a few KB) << data1 (tens of GB), and the join performance is quite bad. I have been reading about broadcast joins, but I am not sure how to do the same thing using the Python API.
Recommended answer
Spark 1.3 doesn't support broadcast joins on DataFrames. In Spark >= 1.5.0 you can use the broadcast function to apply a broadcast join:
from pyspark.sql.functions import broadcast
data1.join(broadcast(data2), data1.id == data2.id)
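To confirm that the hint actually took effect, you can inspect the physical plan; a minimal sketch, assuming the data1 and data2 DataFrames from above:
# A broadcast join shows up in the plan as a broadcast hash join
# (the exact operator name varies across Spark versions)
data1.join(broadcast(data2), data1.id == data2.id).explain()
Spark can also broadcast a small table automatically when its estimated size falls below spark.sql.autoBroadcastJoinThreshold, but for DataFrames built from text files the size statistics are usually not available, so the explicit hint is the safer choice.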
For older versions the only option is to convert to an RDD and apply the same logic as in other languages. Roughly something like this:
from pyspark.sql import Row
from pyspark.sql.types import StructType
# Create a dictionary where keys are join keys
# and values are lists of rows
data2_bd = sc.broadcast(
    data2.map(lambda r: (r.id, r)).groupByKey().collectAsMap())
# Define a new row with fields from both DFs
output_row = Row(*(data1.columns + data2.columns))
# And an output schema
output_schema = StructType(data1.schema.fields + data2.schema.fields)
# Given row x, extract a list of corresponding rows from broadcast
# and output a list of merged rows
def gen_rows(x):
    return [output_row(*x + y) for y in data2_bd.value.get(x.id, [])]
# flatMap and create a new data frame
joined = data1.rdd.flatMap(lambda row: gen_rows(row)).toDF(output_schema)
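For a quick sanity check of the RDD-based approach, here is a hypothetical end-to-end run with two tiny DataFrames; the column names v1 and v2 and the sqlContext variable are made up for illustration:
# Hypothetical toy inputs to exercise the snippet above
data1 = sqlContext.createDataFrame([Row(id=1, v1='a'), Row(id=2, v1='b')])
data2 = sqlContext.createDataFrame([Row(id=1, v2='x')])
# ... rebuild data2_bd, output_row, output_schema and gen_rows as above ...
# Only id == 1 has a match in data2, so we expect a single merged row
joined = data1.rdd.flatMap(gen_rows).toDF(output_schema)
joined.show()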