Problem description
I've got an ETL-like scenario in which I read data from multiple JDBC tables and files and perform some aggregations and a join between the sources.
In one step I must join two JDBC tables. I've tried to do something like this:
val df1 = spark.read.format("jdbc")
.option("url", Database.DB_URL)
.option("user", Database.DB_USER)
.option("password", Database.DB_PASSWORD)
.option("dbtable", tableName)
.option("driver", Database.DB_DRIVER)
.option("upperBound", data.upperBound)
.option("lowerBound", data.lowerBound)
.option("numPartitions", data.numPartitions)
.option("partitionColumn", data.partitionColumn)
.load();
val df2 = spark.read.format("jdbc")
.option("url", Database.DB_URL)
.option("user", Database.DB_USER)
.option("password", Database.DB_PASSWORD)
.option("dbtable", tableName)
.option("driver", Database.DB_DRIVER)
.option("upperBound", data2.upperBound)
.option("lowerBound", data2.lowerBound)
.option("numPartitions", data2.numPartitions)
.option("partitionColumn", data2.partitionColumn)
.load();
df1.join(df2, Seq("partition_key", "id")).show();
Note that partitionColumn in both cases is the same: "partition_key".
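As a quick sanity check (a sketch reusing the df1/df2 and data/data2 objects defined above), the number of JDBC input partitions on each side can be inspected directly:
// Each side should report the numPartitions value passed to its JDBC reader.
println(df1.rdd.getNumPartitions) // expected: data.numPartitions
println(df2.rdd.getNumPartitions) // expected: data2.numPartitions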
However, when I run such a query, I can see an unnecessary exchange (plan trimmed for readability):
df1.join(df2, Seq("partition_key", "id")).explain(extended = true);
Project [many many fields]
+- Project [partition_key#10090L, iv_id#10091L, last_update_timestamp#10114, ... more fields]
+- SortMergeJoin [partition_key#10090L, id#10091L], [partition_key#10172L, id#10179L], Inner
:- *Sort [partition_key#10090L ASC NULLS FIRST, iv_id#10091L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(partition_key#10090L, iv_id#10091L, 4)
: +- *Scan JDBCRelation((select mod(s.id, 23) as partition_key, s.* from tab2 s)) [numPartitions=23] [partition_key#10090L,id#10091L,last_update_timestamp#10114] PushedFilters: [*IsNotNull(PARTITION_KEY)], ReadSchema: struct<partition_key:bigint,id:bigint,last_update_timestamp:timestamp>
+- *Sort [partition_key#10172L ASC NULLS FIRST, id#10179L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(partition_key#10172L, iv_id#10179L, 4)
+- *Project [partition_key#10172L, id#10179L ... 75 more fields]
+- *Scan JDBCRelation((select mod(s.id, 23) as partition_key, s.* from tab1 s)) [numPartitions=23] [fields] PushedFilters: [*IsNotNull(ID), *IsNotNull(PARTITION_KEY)], ReadSchema: struct<partition_key:bigint,id:bigint...
If we have already partitioned the reads with numPartitions and the other options, and the partition count is the same, why is another Exchange needed? Can we somehow avoid this unnecessary shuffle? On the test data I can see that Spark sends more than 150M of data during this Exchange; the production Datasets are much bigger, so this can become a serious bottleneck.
Accepted answer
With the current implementation of the Data Source API there is no partitioning information passed upstream, so even if the data could be joined without a shuffle, Spark cannot use this information. Therefore your assumption that:
JdbcRelation uses RangePartitioning when reading

is just incorrect. Furthermore, it looks like Spark uses the same internal code to handle range-based JDBC partitions and predicate-based JDBC partitions. While the former could be translated to SortOrder, the latter might be incompatible with Spark SQL in general.
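To illustrate the distinction (a sketch with made-up bounds, since the question does not show them): range-based JDBC partitioning simply turns partitionColumn/lowerBound/upperBound/numPartitions into one WHERE clause per partition and runs that many independent queries, which is the same predicate-based reading exposed by DataFrameReader.jdbc(url, table, predicates, connectionProperties). Nothing in the result tells Spark that the rows are grouped by partition_key:
// Hypothetical bounds for illustration only: lowerBound = 0, upperBound = 23, numPartitions = 23.
// Each string below is roughly what one JDBC partition would read; Spark only sees N independent
// scans and records no Partitioner for the resulting RDD.
val illustrativePredicates = Seq(
  "partition_key < 1 OR partition_key IS NULL", // partition 0
  "partition_key >= 1 AND partition_key < 2",   // partition 1
  // ... partitions 2 to 21 ...
  "partition_key >= 22"                         // partition 22
)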
When in doubt, it is possible to retrieve the Partitioner information using QueryExecution and the internal RDD:
df.queryExecution.toRdd.partitioner
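For example (a sketch against the df1/df2 from the question), both sides of the join report no Partitioner, which is consistent with the planner having to insert the Exchange:
// Both JDBC-backed Datasets are expected to report None here, since the scan
// carries no partitioning information the optimizer could reuse.
df1.queryExecution.toRdd.partitioner // expected: None
df2.queryExecution.toRdd.partitioner // expected: None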
This might change in the future (SPIP: Data Source API V2, SPARK-15689 - Data source API v2, and Spark Data Frame. PreSorted partitions).