首先看个Not in Subquery的SQL:
// test_partition1 和 test_partition2为Hive外部分区表 select * from test_partition1 t1 where t1.id not in (select id from test_partition2);
对应的完整的逻辑计划和物理计划为:
== Parsed Logical Plan == 'Project [*] +- 'Filter NOT 't1.id IN (list#3 []) : +- 'Project ['id] : +- 'UnresolvedRelation `test_partition2` +- 'SubqueryAlias `t1` +- 'UnresolvedRelation `test_partition1` == Analyzed Logical Plan == id: string, name: string, dt: string Project [id#4, name#5, dt#6] +- Filter NOT id#4 IN (list#3 []) : +- Project [id#7] : +- SubqueryAlias `default`.`test_partition2` : +- HiveTableRelation `default`.`test_partition2`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#7, name#8], [dt#9] +- SubqueryAlias `t1` +- SubqueryAlias `default`.`test_partition1` +- HiveTableRelation `default`.`test_partition1`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#4, name#5], [dt#6] == Optimized Logical Plan == Join LeftAnti, ((id#4 = id#7) || isnull((id#4 = id#7))) :- HiveTableRelation `default`.`test_partition1`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#4, name#5], [dt#6] +- Project [id#7] +- HiveTableRelation `default`.`test_partition2`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#7, name#8], [dt#9] == Physical Plan == BroadcastNestedLoopJoin BuildRight, LeftAnti, ((id#4 = id#7) || isnull((id#4 = id#7))) :- Scan hive default.test_partition1 [id#4, name#5, dt#6], HiveTableRelation `default`.`test_partition1`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#4, name#5], [dt#6] +- BroadcastExchange IdentityBroadcastMode +- Scan hive default.test_partition2 [id#7], HiveTableRelation `default`.`test_partition2`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#7, name#8], [dt#9]
通过上述逻辑计划和物理计划可以看出,Spark SQL在对not in subquery处理,从逻辑计划转换为物理计划时,会最终选择BroadcastNestedLoopJoin(对应到Spark源码中BroadcastNestedLoopJoinExec.scala)策略。
提起BroadcastNestedLoopJoin,不得不提Nested Loop Join,它在很多RDBMS中得到应用,比如mysql。它的工作方式是循环从一张表(outer table)中读取数据,然后访问另一张表(inner table,通常有索引),将outer表中的每一条数据与inner表中的数据进行join,类似一个嵌套的循环并且在循环的过程中进行数据的比对校验是否满足一定条件。
对于被连接的数据集较小的情况下,Nested Loop Join是个较好的选择。但是当数据集非常大时,从它的执行原理可知,效率会很低甚至可能影响整个服务的稳定性。
而Spark SQL中的BroadcastNestedLoopJoin就类似于Nested Loop Join,只不过加上了广播表(build table)而已。
BroadcastNestedLoopJoin是一个低效的物理执行计划,内部实现将子查询(select id from test_partition2)进行广播,然后test_partition1每一条记录通过loop遍历广播的数据去匹配是否满足一定条件。
private def leftExistenceJoin( // 广播的数据 relation: Broadcast[Array[InternalRow]], exists: Boolean): RDD[InternalRow] = { assert(buildSide == BuildRight) /* streamed对应物理计划中: Scan hive default.test_partition1 [id#4, name#5, dt#6], HiveTableRelation `default`.`test_partition1`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#4, name#5], [dt#6] */ streamed.execute().mapPartitionsInternal { streamedIter => val buildRows = relation.value val joinedRow = new JoinedRow // 条件是否定义。此处为Some(((id#4 = id#7) || isnull((id#4 = id#7)))) if (condition.isDefined) { streamedIter.filter(l => // exists主要是为了根据joinType来进一步条件判断数据的返回与否,此处joinType为LeftAnti buildRows.exists(r => boundCondition(joinedRow(l, r))) == exists ) // else } else if (buildRows.nonEmpty == exists) { streamedIter } else { Iterator.empty } } }
由于BroadcastNestedLoopJoin的低效率执行,可能导致长时间占用executor资源,影响集群性能。同时,因为子查询的结果集要进行广播,如果数据量特别大,对driver端也是一个严峻的考验,极有可能带来OOM的风险。因此,在实际生产中,要尽可能利用其他效率相对高的SQL来避免使用Not in Subquery。
虽然通过改写Not in Subquery的SQL,进行低效率的SQL到高效率的SQL过渡,能够避免上面所说的问题。但是这往往建立在我们发现任务执行慢甚至失败,然后排查任务中的SQL,发现"问题"SQL的前提下。那么如何在任务执行前,就"检查"出这样的SQL,从而进行提前预警呢?
这里笔者给出一个思路,就是解析Spark SQL计划,根据Spark SQL的join策略匹配条件等,来判断任务中是否使用了低效的Not in Subquery进行预警,然后通知业务方进行修改。同时,我们在实际完成数据的ETL处理等分析时,也要事前避免类似的低性能SQL。
关联文章:
Spark SQL如何选择join策略
关注微信公众号:大数据学习与分享,获取更对技术干货