Problem Description
I have loaded a CSV file, repartitioned it to 4 partitions, and then took a count of the DataFrame. When I looked at the DAG, I saw this action executed in 3 stages.
Why is this simple action executed in 3 stages? I suppose the 1st stage is to load the file and the 2nd is to find the count on each partition.
So what is happening in the 3rd stage?
Here is my code:
val sample = spark.read.format("csv").option("header", "true").option("inferSchema", "true").option("delimiter", ";").load("sample_data.csv")
sample.repartition(4).count()
Answer
The first stage = read the file. Because of the repartition (a wide transformation that requires shuffling), it can't be combined into a single stage with partial_count (2nd stage).
The second stage = local count (calculating the count per partition).
The third stage = result aggregation on the driver.
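As a quick sanity check of this breakdown, you can print the physical plan behind the count (a minimal Scala sketch, reusing the sample DataFrame from the question; groupBy().count() is used here instead of the count() action only so that explain() can be called on a DataFrame):

// Inspect the physical plan: expect a per-partition partial_count,
// an Exchange down to a single partition, and a final count.
sample.repartition(4).groupBy().count().explain()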
Spark generates a separate stage per action or wide transformation. For more details about narrow/wide transformations and why wide transformations require a separate stage, take a look at "Wide Versus Narrow Dependencies", High Performance Spark, Holden Karau, or this article.
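For intuition, here is a short illustrative Scala sketch (df stands for any DataFrame with an integer key column, e.g. the test dataset below) contrasting a narrow transformation with the wide repartition:

import org.apache.spark.sql.functions.col

// Narrow: each output partition depends on a single input partition,
// so Spark can keep this in the same stage as the file scan (no shuffle).
val narrow = df.filter(col("key") > 3)

// Wide: rows have to be redistributed across partitions, so Spark inserts
// an Exchange (shuffle) and everything after it goes into a new stage.
val wide = df.repartition(4)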
Let's test this assumption locally. First you need to create a dataset:
dataset/test-data.json
[
{ "key": 1, "value": "a" },
{ "key": 2, "value": "b" },
{ "key": 3, "value": "c" },
{ "key": 4, "value": "d" },
{ "key": 5, "value": "e" },
{ "key": 6, "value": "f" },
{ "key": 7, "value": "g" },
{ "key": 8, "value": "h" }
]
Then run the following code:
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Explicit schema, so Spark does not need an extra pass to infer it.
StructType schema = new StructType()
        .add("key", DataTypes.IntegerType)
        .add("value", DataTypes.StringType);

SparkSession session = SparkSession.builder()
        .appName("sandbox")
        .master("local[*]")
        .getOrCreate();

// Register the repartitioned data as a temp table and explain the count.
session
        .read()
        .schema(schema)
        .json("file:///C:/<you_path>/dataset")
        .repartition(4) // comment this out on the second run
        .registerTempTable("df");

session.sqlContext().sql("SELECT COUNT(*) FROM df").explain();
The output will be:
== Physical Plan ==
*(3) HashAggregate(keys=[], functions=[count(1)])
+- Exchange SinglePartition
+- *(2) HashAggregate(keys=[], functions=[partial_count(1)])
+- Exchange RoundRobinPartitioning(4)
+- *(1) FileScan json [] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/C:/Users/iaroslav/IdeaProjects/sparksandbox/src/main/resources/dataset], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<>
But if you comment out/remove the .repartition(4) line, note that the FileScan & partial_count are done within a single stage and the output will be as follows:
== Physical Plan ==
*(2) HashAggregate(keys=[], functions=[count(1)])
+- Exchange SinglePartition
+- *(1) HashAggregate(keys=[], functions=[partial_count(1)])
+- *(1) FileScan json [] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/C:/Users/iaroslav/IdeaProjects/sparksandbox/src/main/resources/dataset], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<>
P.S. Note that the extra stage might have a significant impact on performance, since it requires disk I/O (take a look here) and is a kind of synchronization barrier impacting parallelization, meaning in most cases Spark won't start stage 2 until stage 1 is completed. Still, if the repartition increases the level of parallelism, it is probably worth it.
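A rough way to see whether the extra stage is worth it is simply to time both variants (an illustrative spark-shell sketch using the sample DataFrame from the question; actual numbers depend on data size and cluster):

// One stage for scan + partial_count, plus the driver-side aggregation.
spark.time(sample.count())

// Extra shuffle stage (disk I/O, synchronization barrier), but the 4
// partitions may pay off for heavier transformations that follow.
spark.time(sample.repartition(4).count())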