Problem Description

I am trying to efficiently join two DataFrames, one of which is large and the other somewhat smaller.

Is there a way to avoid all this shuffling? I cannot set autoBroadcastJoinThreshold, because it only supports integers - and the table I am trying to broadcast is slightly bigger than an integer number of bytes.

Is there a way to force the broadcast to ignore this variable?
Recommended Answer
Broadcast Hash Joins (similar to a map-side join or map-side combine in MapReduce):

In Spark SQL you can see the type of join being performed by calling queryExecution.executedPlan. As with core Spark, if one of the tables is much smaller than the other you may want a broadcast hash join. You can hint to Spark SQL that a given DataFrame should be broadcast for the join by calling the broadcast method on it before joining.

Example: largedataframe.join(broadcast(smalldataframe), "key")
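Conceptually, a broadcast hash join ships the small table to every executor, builds an in-memory hash table keyed on the join key, and streams the large side against it, so the large table never needs to be shuffled. A minimal, Spark-free sketch of the idea in plain Python (the function and variable names here are illustrative, not Spark APIs):

```python
def broadcast_hash_join(large_rows, small_rows, key):
    """Sketch of a broadcast hash join: hash the small side once,
    then stream the large side against it (no shuffle of the large table)."""
    # Build an in-memory hash table from the (broadcast) small side.
    lookup = {}
    for row in small_rows:
        lookup.setdefault(row[key], []).append(row)
    # Stream the large side and probe the hash table (inner join semantics).
    for row in large_rows:
        for match in lookup.get(row[key], []):
            yield {**row, **match}

large = [{"key": 1, "value": "a"}, {"key": 2, "value": "b"}, {"key": 3, "value": "c"}]
small = [{"key": 1, "name": "x"}, {"key": 3, "name": "y"}]
result = list(broadcast_hash_join(large, small, "key"))
# Rows with keys 1 and 3 are joined; key 2 has no match and is dropped.
```

The point of the pattern is that only the small side is materialized in memory on each worker, which is exactly why Spark caps its size with a threshold.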
As described by my favourite book (HPS) - please see below for a better understanding.
Note: the broadcast above comes from import org.apache.spark.sql.functions.broadcast, not from SparkContext.
Spark also automatically uses spark.sql.autoBroadcastJoinThreshold to determine whether a table should be broadcast.
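The automatic decision amounts to a simple size check against the threshold, where a negative value disables broadcasting altogether. A hedged sketch of that rule (illustrative only, not Spark's actual planner code):

```python
def should_auto_broadcast(table_size_bytes, threshold_bytes):
    """Illustrative check mirroring spark.sql.autoBroadcastJoinThreshold:
    a negative threshold (e.g. -1) disables automatic broadcasting."""
    if threshold_bytes < 0:
        return False
    return table_size_bytes <= threshold_bytes

DEFAULT_THRESHOLD = 10 * 1024 * 1024  # Spark's default is 10 MB (10485760 bytes)

should_auto_broadcast(5 * 1024 * 1024, DEFAULT_THRESHOLD)  # small table: broadcast
should_auto_broadcast(5 * 1024 * 1024, -1)                 # disabled: never broadcast
```

This is also why the question hits a wall: the threshold is an integer number of bytes, so a table larger than that limit is never auto-broadcast and must be hinted explicitly with broadcast().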
def explain(): Unit

Prints the physical plan to the console, for debugging purposes.
NOTE: Setting the threshold to -1 disables automatic broadcasting entirely:

sqlContext.sql("SET spark.sql.autoBroadcastJoinThreshold = -1")
The equivalent map-side join in Hive can be forced with a hint and tuned with these settings:

Select /*+ MAPJOIN(b) */ a.key, a.value from a join b on a.key = b.key

hive> set hive.auto.convert.join=true;
hive> set hive.auto.convert.join.noconditionaltask.size=20971520;
hive> set hive.auto.convert.join.noconditionaltask=true;
hive> set hive.auto.convert.join.use.nonstaged=true;
hive> set hive.mapjoin.smalltable.filesize=30000000; -- default is 25 MB, raised here to 30 MB
Further Reading: Please refer to my article on BHJ, SHJ, SMJ.