This article introduces DataFrame join optimization with Broadcast Hash Join. It should be a useful reference for anyone hitting this problem.

Problem Description

I am trying to join two DataFrames efficiently, where one is large and the second is somewhat smaller.

Is there a way to avoid all this shuffling? I cannot set autoBroadcastJoinThreshold, because it only supports integers - and the table I am trying to broadcast is slightly larger than the number of bytes an integer can represent.

Is there a way to force the broadcast to ignore this variable?

Solution

Broadcast hash joins (similar to a map-side join or map-side combine in MapReduce):
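To make the mechanics concrete, here is a minimal pure-Python sketch of what a broadcast hash join does conceptually. This is an illustration with made-up data, not Spark's implementation: the small table becomes an in-memory hash map that every task holds a full copy of, and the large table is streamed through it, so the large side never needs to be shuffled.

```python
# Illustrative broadcast hash join: build a hash map from the small
# ("dimension") table, then stream the large ("fact") table through it.
# Function and field names here are invented for the example.

def broadcast_hash_join(large_rows, small_rows, key):
    # Build phase: hash the small table on the join key.
    # In Spark, this map is what gets broadcast to every executor.
    hash_table = {}
    for row in small_rows:
        hash_table.setdefault(row[key], []).append(row)

    # Probe phase: stream the large table; no shuffle of large_rows needed.
    joined = []
    for row in large_rows:
        for match in hash_table.get(row[key], []):
            joined.append({**row, **match})
    return joined

fact = [{"key": 1, "amount": 100}, {"key": 2, "amount": 50}, {"key": 3, "amount": 75}]
dim = [{"key": 1, "name": "a"}, {"key": 2, "name": "b"}]

result = broadcast_hash_join(fact, dim, "key")
# Behaves like an inner join: only keys present on both sides survive.
```

The build/probe split is the whole point: the expensive part (hashing) happens once over the small side, and the large side is read a single time with no repartitioning.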

In Spark SQL you can see which type of join is being performed by calling queryExecution.executedPlan. As with core Spark, if one of the tables is much smaller than the other, you may want a broadcast hash join. You can hint to Spark SQL that a given DataFrame should be broadcast for the join by calling the broadcast method on it before joining.

Example: largedataframe.join(broadcast(smalldataframe), "key")

In DWH terms, largedataframe would correspond to a fact table and smalldataframe to a dimension table.

As described in my favorite book (HPS); see below for a better understanding.

Note: the broadcast above comes from import org.apache.spark.sql.functions.broadcast, not from SparkContext.

Spark also automatically uses the spark.sql.autoBroadcastJoinThreshold setting to determine whether a table should be broadcast.
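The decision Spark makes here can be sketched as a simple size check. The following is a deliberate simplification of the planner's logic with invented names, not Spark's actual code: if the estimated size of one side is at or below the threshold, plan a broadcast hash join; otherwise fall back to a shuffle-based join. It also shows why setting the threshold to -1 disables automatic broadcasting.

```python
# Simplified sketch of the planner's size check; not Spark's actual code.

def choose_join_strategy(small_side_bytes, auto_broadcast_threshold):
    # A threshold of -1 disables automatic broadcast entirely,
    # since no size can satisfy the check.
    if 0 <= auto_broadcast_threshold and small_side_bytes <= auto_broadcast_threshold:
        return "BroadcastHashJoin"
    return "SortMergeJoin"  # shuffle-based fallback

# The default threshold is 10 MB (10485760 bytes).
print(choose_join_strategy(5 * 1024 * 1024, 10485760))   # small enough -> broadcast
print(choose_join_strategy(50 * 1024 * 1024, 10485760))  # too big -> shuffle join
print(choose_join_strategy(1024, -1))                    # auto-broadcast disabled
```

Note that an explicit broadcast() hint bypasses this automatic check, which is what the answer to the original question relies on.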

Tip: see the DataFrame.explain() method

def explain(): Unit
Prints the physical plan to the console for debugging purposes.

Is there a way to force the broadcast to ignore this variable?

sqlContext.sql("SET spark.sql.autoBroadcastJoinThreshold = -1")

NOTE:

Another similar out-of-the-box note w.r.t. Hive (not Spark): something similar can be achieved with the Hive hint MAPJOIN, as shown below:

Select /*+ MAPJOIN(b) */ a.key, a.value from a join b on a.key = b.key

hive> set hive.auto.convert.join=true;
hive> set hive.auto.convert.join.noconditionaltask.size=20971520;
hive> set hive.auto.convert.join.noconditionaltask=true;
hive> set hive.auto.convert.join.use.nonstaged=true;
hive> set hive.mapjoin.smalltable.filesize=30000000; // default 25 MB, raised here to 30 MB

Further reading: please refer to my article on BHJ, SHJ, SMJ.


This concludes this article on DataFrame join optimization - Broadcast Hash Join. I hope the answer above is helpful.
