问题描述
我正尝试使用SparkSQL在数据帧上执行广播哈希联接,如此处所述: https://docs.cloud.databricks.com/docs/latest/databricks_guide/06%20Spark%20SQL%20%26 %20DataFrames/05%20BroadcastHashJoin%20-%20scala.html
I'm trying to perform a broadcast hash join on dataframes using SparkSQL as documented here: https://docs.cloud.databricks.com/docs/latest/databricks_guide/06%20Spark%20SQL%20%26%20DataFrames/05%20BroadcastHashJoin%20-%20scala.html
在该示例中,(小)DataFrame
通过saveAsTable持久保存,然后通过spark SQL(即通过sqlContext.sql("..."))
In that example, the (small) DataFrame
is persisted via saveAsTable and then there's a join via spark SQL (i.e. via sqlContext.sql("..."))
我遇到的问题是,我需要使用sparkSQL API构造我的SQL(我留下了约50个带有ID列表的表,并且不想手工编写SQL).
The problem I have is that I need to use the sparkSQL API to construct my SQL (I am left joining ~50 tables with an ID list, and don't want to write the SQL by hand).
How do I tell spark to use the broadcast hash join via the API? The issue is that if I load the ID list (from the table persisted via `saveAsTable`) into a `DataFrame` to use in the join, it isn't clear to me if Spark can apply the broadcast hash join.
推荐答案
您可以将DataFrame
显式标记为足够小以进行广播使用broadcast
函数:
You can explicitly mark the DataFrame
as small enough for broadcastingusing broadcast
function:
Python :
from pyspark.sql.functions import broadcast
small_df = ...
large_df = ...
large_df.join(broadcast(small_df), ["foo"])
或广播提示(火花> = 2.2):
or broadcast hint (Spark >= 2.2):
large_df.join(small_df.hint("broadcast"), ["foo"])
斯卡拉:
import org.apache.spark.sql.functions.broadcast
val smallDF: DataFrame = ???
val largeDF: DataFrame = ???
largeDF.join(broadcast(smallDF), Seq("foo"))
或广播提示(火花> = 2.2):
or broadcast hint (Spark >= 2.2):
largeDF.join(smallDF.hint("broadcast"), Seq("foo"))
SQL
您可以使用提示(火花> = 2.2 ):
SELECT /*+ MAPJOIN(small) */ *
FROM large JOIN small
ON large.foo = small.foo
或
SELECT /*+ BROADCASTJOIN(small) */ *
FROM large JOIN small
ON large.foo = small.foo
或
SELECT /*+ BROADCAST(small) */ *
FROM large JOIN small
ON larger.foo = small.foo
R (SparkR):
使用hint
(火花> = 2.2):
With hint
(Spark >= 2.2):
join(large, hint(small, "broadcast"), large$foo == small$foo)
使用broadcast
(火花> = 2.3)
With broadcast
(Spark >= 2.3)
join(large, broadcast(small), large$foo == small$foo)
注意:
如果结构之一相对较小,则广播连接很有用.否则,它可能比全面洗牌要贵得多.
Broadcast join is useful if one of structures is relatively small. Otherwise it can be significantly more expensive than a full shuffle.
这篇关于Spark SQL广播哈希联接的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!