问题描述
我正在尝试有效地连接两个 DataFrame,其中一个较大,第二个较小.
有没有办法避免所有这些洗牌?我无法设置 autoBroadCastJoinThreshold
,因为它只支持整数 - 我试图广播的表比整数字节数略大.
有没有办法强制广播忽略这个变量?
Broadcast Hash Joins(类似于 Mapreduce 中的 map side join 或 map-side combine):
在 SparkSQL 中,您可以通过调用 queryExecution.executedPlan
查看正在执行的连接类型.与核心 Spark 一样,如果其中一个表比另一个小得多,您可能需要广播散列连接.您可以通过在加入之前调用 DataFrame
上的方法 broadcast
来提示 Spark SQL 应该广播给定的 DF 以进行加入
示例:largedataframe.join(broadcast(smalldataframe), "key")
在 DWH 术语中,其中 largedataframe 可能类似于 fact
smalldataframe 可能类似于 dimension
正如我最喜欢的书 (HPS) 所描述的那样.请参阅下文以更好地理解..
注意:以上broadcast
来自import org.apache.spark.sql.functions.broadcast
而不是SparkContext
Spark 也会自动使用 spark.sql.conf.autoBroadcastJoinThreshold
来确定是否应该广播表.
提示:参见 DataFrame.explain() 方法
def解释():单位将物理计划打印到控制台以进行调试.
有没有办法强制广播忽略这个变量?
sqlContext.sql("SET spark.sql.autoBroadcastJoinThreshold = -1")
注意:
另一个类似的开箱即用笔记 w.r.t.蜂巢(不是火花):类似可以使用 hive 提示 MAPJOIN
来实现,如下所示...
Select/*+ MAPJOIN(b) */a.key, a.value from a join b on a.key = b.key蜂巢>设置 hive.auto.convert.join=true;蜂巢>设置 hive.auto.convert.join.noconditionaltask.size=20971520蜂巢>设置 hive.auto.convert.join.noconditionaltask=true;蜂巢>设置 hive.auto.convert.join.use.nonstaged=true;蜂巢>设置 hive.mapjoin.smalltable.filesize = 30000000;//默认 25 mb 使其变为 30 mb
进一步阅读:请参阅我的文章BHJ、SHJ、SMJ
I am trying to effectively join two DataFrames, one of which is large and the second is a bit smaller.
Is there a way to avoid all this shuffling? I cannot set autoBroadCastJoinThreshold
, because it supports only Integers - and the table I am trying to broadcast is slightly bigger than integer number of bytes.
Is there a way to force broadcast ignoring this variable?
Broadcast Hash Joins (similar to map side join or map-side combine in Mapreduce) :
In SparkSQL you can see the type of join being performed by calling queryExecution.executedPlan
. As with core Spark, if one of the tables is much smaller than the other you may want a broadcast hash join. You can hint to Spark SQL that a given DF should be broadcast for join by calling method broadcast
on the DataFrame
before joining it
Example:largedataframe.join(broadcast(smalldataframe), "key")
As described by my fav book (HPS) pls. see below to have better understanding..
Note : Above broadcast
is from import org.apache.spark.sql.functions.broadcast
not from SparkContext
Spark also, automatically uses the spark.sql.conf.autoBroadcastJoinThreshold
to determine if a table should be broadcast.
Tip : see DataFrame.explain() method
def
explain(): Unit
Prints the physical plan to the console for debugging purposes.
sqlContext.sql("SET spark.sql.autoBroadcastJoinThreshold = -1")
NOTE :
Select /*+ MAPJOIN(b) */ a.key, a.value from a join b on a.key = b.key
hive> set hive.auto.convert.join=true;
hive> set hive.auto.convert.join.noconditionaltask.size=20971520
hive> set hive.auto.convert.join.noconditionaltask=true;
hive> set hive.auto.convert.join.use.nonstaged=true;
hive> set hive.mapjoin.smalltable.filesize = 30000000; // default 25 mb made it as 30mb
Further Reading : Please refer my article on BHJ, SHJ, SMJ
这篇关于DataFrame join 优化 - Broadcast Hash Join的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!