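SparkSQL Join Selection Logic
Start with the comment on JoinSelection.
Paraphrased, it says:
For an equi-join, the join hints are checked first, in the following order:
- broadcast hint: pick broadcast hash join if the join type supports it. If both sides carry a broadcast hint, broadcast the smaller side (based on statistics).
- sort merge hint: pick sort merge join if the join keys are sortable.
- shuffle hash hint: pick shuffle hash join if the join type supports it.
- shuffle replicate NL hint: pick cartesian product join if the join type is inner-like (inner or cross).
If there is no hint, or the hint does not apply, the strategy is chosen in the following order:
- broadcast hash join: if one side is small enough to broadcast and the join type supports it. If both sides are small, the smaller one (based on statistics) is broadcast.
- shuffle hash join: if one side is small enough to build a local hash map and is much smaller than the other side, and spark.sql.join.preferSortMergeJoin is set to false.
- sort merge join: if the join keys are sortable.
- cartesian product: if the join type is inner-like (inner or cross).
- broadcast nested loop join: the fallback strategy, used when nothing else applies even though it may cause an OOM.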
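The no-hint order above can be sketched as a small decision function. This is a simplified model that follows the list literally, not Spark's actual code: the object, the case class, and all field names are hypothetical, and the conditions are reduced to booleans.

```scala
// Hypothetical, simplified model of the no-hint selection order for an equi-join.
// None of these names are part of Spark's API.
object JoinSelectionSketch {
  sealed trait Strategy
  case object BroadcastHashJoin extends Strategy
  case object ShuffleHashJoin extends Strategy
  case object SortMergeJoin extends Strategy
  case object CartesianProduct extends Strategy
  case object BroadcastNestedLoopJoin extends Strategy

  final case class JoinInfo(
      oneSideBroadcastable: Boolean, // a side is under the broadcast threshold and the join type allows it
      shuffleHashEligible: Boolean,  // a side can build a local hash map and is much smaller than the other
      preferSortMergeJoin: Boolean,  // value of spark.sql.join.preferSortMergeJoin
      keysOrderable: Boolean,        // the join keys are sortable
      innerLike: Boolean)            // the join type is inner or cross

  // The first condition that holds wins; the last branch is the fallback.
  def select(j: JoinInfo): Strategy =
    if (j.oneSideBroadcastable) BroadcastHashJoin
    else if (!j.preferSortMergeJoin && j.shuffleHashEligible) ShuffleHashJoin
    else if (j.keysOrderable) SortMergeJoin
    else if (j.innerLike) CartesianProduct
    else BroadcastNestedLoopJoin
}
```

With preferSortMergeJoin set to true, a join that would qualify for shuffle hash join falls through to sort merge join, which is why the list mentions that setting.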
Notes:
- hash join (broadcast hash join or shuffle hash join) only supports equi-joins and does not support full outer join.
- "Small enough to broadcast" means smaller than the spark.sql.autoBroadcastJoinThreshold threshold (10MB by default):
val AUTO_BROADCASTJOIN_THRESHOLD = buildConf("spark.sql.autoBroadcastJoinThreshold")
  .doc("Configures the maximum size in bytes for a table that will be broadcast to all worker " +
    "nodes when performing a join. By setting this value to -1 broadcasting can be disabled. " +
    "Note that currently statistics are only supported for Hive Metastore tables where the " +
    "command `ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan` has been " +
    "run, and file-based data source tables where the statistics are computed directly on " +
    "the files of data.")
  .version("1.1.0")
  .bytesConf(ByteUnit.BYTE)
  .createWithDefaultString("10MB")
- For shuffle hash join, one side must be much smaller than the other. "Much smaller" means at least 3 times smaller in bytes:
/**
 * Matches a plan whose output should be small enough to be used in broadcast join.
 */
private def canBroadcast(plan: LogicalPlan): Boolean = {
  plan.stats.sizeInBytes >= 0 && plan.stats.sizeInBytes <= conf.autoBroadcastJoinThreshold
}

/**
 * Matches a plan whose single partition should be small enough to build a hash table.
 *
 * Note: this assume that the number of partition is fixed, requires additional work if it's
 * dynamic.
 */
private def canBuildLocalHashMap(plan: LogicalPlan): Boolean = {
  plan.stats.sizeInBytes < conf.autoBroadcastJoinThreshold * conf.numShufflePartitions
}

/**
 * Returns whether plan a is much smaller (3X) than plan b.
 *
 * The cost to build hash map is higher than sorting, we should only build hash map on a table
 * that is much smaller than other one. Since we does not have the statistic for number of rows,
 * use the size of bytes here as estimation.
 */
private def muchSmaller(a: LogicalPlan, b: LogicalPlan): Boolean = {
  a.stats.sizeInBytes * 3 <= b.stats.sizeInBytes
}
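To make the three checks concrete, here is a standalone sketch that applies the same comparisons to plain byte counts instead of LogicalPlan statistics. The object name is hypothetical, and the values assume the defaults: a 10MB broadcast threshold and 200 shuffle partitions (the default of spark.sql.shuffle.partitions).

```scala
// Standalone re-statement of the three size checks on raw byte counts.
// SizeChecksSketch is a hypothetical name; the constants are assumed defaults.
object SizeChecksSketch {
  val MB: BigInt = BigInt(1024 * 1024)
  val autoBroadcastJoinThreshold: BigInt = MB * 10 // assumed default: 10MB
  val numShufflePartitions: BigInt = BigInt(200)   // assumed default: 200

  def canBroadcast(size: BigInt): Boolean =
    size >= BigInt(0) && size <= autoBroadcastJoinThreshold

  // With the defaults, the upper bound is 10MB * 200 = 2000MB.
  def canBuildLocalHashMap(size: BigInt): Boolean =
    size < autoBroadcastJoinThreshold * numShufflePartitions

  def muchSmaller(a: BigInt, b: BigInt): Boolean =
    a * 3 <= b
}
```

Under these assumptions, a 500MB side is too large to broadcast but small enough to build a local hash map. It counts as much smaller than a 2048MB side (1500MB <= 2048MB) but not than a 1024MB side.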
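Why does hash join only support equi-joins, and why not full outer join?
- This follows from how a hash map works. It is most efficient for random access by key, at O(1). A non-equi condition cannot be answered by a key lookup and would require scanning the whole map, which defeats the purpose, so for performance reasons the hash map is never traversed that way.
- Full outer join is not supported because the small table is the build side rather than the streamed side. Rows are emitted only while the streamed side probes the hash map, so the build side is entirely passive: it cannot decide on its own whether one of its rows should be output, which a full outer join requires for unmatched rows.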
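Both points can be seen in a minimal build-and-probe sketch over in-memory sequences. This is an illustration using plain Scala collections, not Spark's implementation; the object and method names are hypothetical.

```scala
// Minimal inner hash join over in-memory sequences (hypothetical names).
object HashJoinSketch {
  def innerHashJoin[K, A, B](stream: Seq[(K, A)], build: Seq[(K, B)]): Seq[(K, A, B)] = {
    // Build phase: index the (small) build side by join key.
    val table: Map[K, Seq[B]] =
      build.groupBy(_._1).map { case (k, rows) => k -> rows.map(_._2) }

    // Probe phase: each streamed row does one key lookup, which only works for equality.
    for {
      (k, a) <- stream
      b <- table.getOrElse(k, Seq.empty)
    } yield (k, a, b)
  }
}
```

For example, joining Seq(1 -> "a", 2 -> "b", 3 -> "c") against the build side Seq(2 -> "x", 3 -> "y", 3 -> "z", 4 -> "w") yields rows only for keys 2 and 3. The build row with key 4 is never touched during probing, so this loop alone could not emit it as an unmatched row.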
For reference, see the SparkSQL join flow:
Why is cartesian join restricted to inner-like joins?
def createCartesianProduct() = {
  if (joinType.isInstanceOf[InnerLike]) {
    Some(Seq(joins.CartesianProductExec(planLater(left), planLater(right), condition)))
  } else {
    None
  }
}
Looking at the JoinType classes, only two of them extend InnerLike: inner join and cross join.
object JoinType {
  def apply(typ: String): JoinType = typ.toLowerCase(Locale.ROOT).replace("_", "") match {
    case "inner" => Inner
    case "outer" | "full" | "fullouter" => FullOuter
    case "leftouter" | "left" => LeftOuter
    case "rightouter" | "right" => RightOuter
    case "leftsemi" | "semi" => LeftSemi
    case "leftanti" | "anti" => LeftAnti
    case "cross" => Cross
    case _ =>
      val supported = Seq(
        "inner",
        "outer", "full", "fullouter", "full_outer",
        "leftouter", "left", "left_outer",
        "rightouter", "right", "right_outer",
        "leftsemi", "left_semi", "semi",
        "leftanti", "left_anti", "anti",
        "cross")
      throw new IllegalArgumentException(s"Unsupported join type '$typ'. " +
        "Supported join types include: " + supported.mkString("'", "', '", "'") + ".")
  }
}

sealed abstract class JoinType {
  def sql: String
}

/**
 * The explicitCartesian flag indicates if the inner join was constructed with a CROSS join
 * indicating a cartesian product has been explicitly requested.
 */
sealed abstract class InnerLike extends JoinType {
  def explicitCartesian: Boolean
}

case object Inner extends InnerLike {
  override def explicitCartesian: Boolean = false
  override def sql: String = "INNER"
}

case object Cross extends InnerLike {
  override def explicitCartesian: Boolean = true
  override def sql: String = "CROSS"
}

case object LeftOuter extends JoinType {
  override def sql: String = "LEFT OUTER"
}

case object RightOuter extends JoinType {
  override def sql: String = "RIGHT OUTER"
}

case object FullOuter extends JoinType {
  override def sql: String = "FULL OUTER"
}

case object LeftSemi extends JoinType {
  override def sql: String = "LEFT SEMI"
}

case object LeftAnti extends JoinType {
  override def sql: String = "LEFT ANTI"
}
...
}
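Why can Broadcast Nested Loop Join cause an OOM?
Broadcast Nested Loop Join broadcasts one dataset and then runs a nested loop over it. It is computationally very inefficient and very memory-hungry, because one of the datasets is broadcast to every executor regardless of its size.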
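The nested-loop part can be sketched over in-memory sequences as follows. This is only an illustration with hypothetical names, not Spark's BroadcastNestedLoopJoinExec: the broadcast argument stands in for the dataset that every executor would have to hold in full.

```scala
// Minimal nested loop join with an arbitrary join condition (hypothetical names).
object NestedLoopJoinSketch {
  // Every streamed row is compared against every row of the fully materialized
  // broadcast side, so the work is |streamed| * |broadcast| comparisons.
  def nestedLoopJoin[A, B](streamed: Seq[A], broadcast: Seq[B])(cond: (A, B) => Boolean): Seq[(A, B)] =
    for {
      a <- streamed
      b <- broadcast
      if cond(a, b)
    } yield (a, b)
}
```

Because the condition is an arbitrary predicate, this form handles non-equi joins such as nestedLoopJoin(Seq(1, 5), Seq(2, 4, 6))(_ < _), at the price of holding the entire broadcast side in memory.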
Cross Join optimization case
select /*+ mapjoin(b)*/
a.*, sum(b.work_date) as '工作日'
from a
cross join
work_date_dim b
on b.begin_tm >= a.任务开始时间
and b.end_tm < a.任务结束时间
group by ...
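Without the mapjoin hint, the query was extremely slow: table a has fewer than 100,000 rows and table b only a few thousand, yet it still had not finished after 30 minutes.
With the mapjoin hint, it finished in under 1 minute. The join condition here is a range comparison rather than an equality, so hash join and sort merge join do not apply; the hint presumably makes Spark broadcast b and run a broadcast nested loop join instead of a cartesian product. Note, however, that table b must not be too large.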