Why is a Spark count action executed in three stages?

Question

I loaded a CSV file, repartitioned it into 4 partitions, and then took a count of the DataFrame. When I looked at the DAG, I saw that this action is executed in 3 stages.

Why is this simple action executed in 3 stages? I suppose the 1st stage loads the file and the 2nd finds the count on each partition.

So what is happening in the 3rd stage?

Here is my code:

val sample = spark.read.format("csv").option("header", "true").option("inferSchema", "true").option("delimiter", ";").load("sample_data.csv")

sample.repartition(4).count()

Recommended answer

  1. The first stage = reading the file. Because repartition is a wide transformation that requires a shuffle, the read cannot be merged into a single stage with partial_count (the 2nd stage).

  2. The second stage = local count (calculating the count per partition).

  3. The third stage = final aggregation. The partial counts are shuffled into a single partition (Exchange SinglePartition in the plan), summed there, and the result is returned to the driver.
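The three stages described above can be modelled in plain Java, without Spark. This is only a rough sketch of the data flow, not how Spark implements it: the class CountStagesSketch and its methods are hypothetical names chosen for illustration, and the round-robin split only loosely mimics what repartition(4) does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CountStagesSketch {

    // Boundary between stage 1 and stage 2: spread the rows that were read
    // over numPartitions buckets in round-robin order (a stand-in for the shuffle).
    static <T> List<List<T>> roundRobin(List<T> rows, int numPartitions) {
        List<List<T>> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new ArrayList<>());
        }
        for (int i = 0; i < rows.size(); i++) {
            partitions.get(i % numPartitions).add(rows.get(i));
        }
        return partitions;
    }

    // Stage 2: each partition produces its own partial count.
    static <T> List<Long> partialCounts(List<List<T>> partitions) {
        List<Long> counts = new ArrayList<>();
        for (List<T> partition : partitions) {
            counts.add((long) partition.size());
        }
        return counts;
    }

    // Stage 3: the partial counts are gathered in one place and summed.
    static long finalCount(List<Long> partials) {
        long total = 0;
        for (long c : partials) {
            total += c;
        }
        return total;
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h");
        List<Long> partials = partialCounts(roundRobin(rows, 4));
        System.out.println("partial counts: " + partials);
        System.out.println("total: " + finalCount(partials));
    }
}
```

With eight rows and four partitions this prints partial counts of [2, 2, 2, 2] and a total of 8. The point of the split is that the per-partition counts can run in parallel, while only the small list of partial results has to be moved for the final sum.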

Spark generates a separate stage for each action or wide transformation. For more details about narrow/wide transformations and why a wide transformation requires a separate stage, take a look at "Wide Versus Narrow Dependencies" in High Performance Spark by Holden Karau, or this article.

Let's test this assumption locally. First you need to create a dataset:

dataset/test-data.json

[
  { "key":  1, "value":  "a" },
  { "key":  2, "value":  "b" },
  { "key":  3, "value":  "c" },
  { "key":  4, "value":  "d" },
  { "key":  5, "value":  "e" },
  { "key":  6, "value":  "f" },
  { "key":  7, "value":  "g" },
  { "key":  8, "value":  "h" }
]

Then run the following code:

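    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructType;

    // Explicit schema, so Spark does not have to infer it from the data
    StructType schema = new StructType()
            .add("key", DataTypes.IntegerType)
            .add("value", DataTypes.StringType);

    // Local session that uses all available cores
    SparkSession session = SparkSession.builder()
            .appName("sandbox")
            .master("local[*]")
            .getOrCreate();

    session
            .read()
            .schema(schema)
            .json("file:///C:/<you_path>/dataset")
            .repartition(4) // comment out this line on the second run
            .createOrReplaceTempView("df"); // replaces the deprecated registerTempTable

    // Print the physical plan without executing the query
    session.sql("SELECT COUNT(*) FROM df").explain();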

The output will be:

== Physical Plan ==
*(3) HashAggregate(keys=[], functions=[count(1)])
+- Exchange SinglePartition
   +- *(2) HashAggregate(keys=[], functions=[partial_count(1)])
      +- Exchange RoundRobinPartitioning(4)
         +- *(1) FileScan json [] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/C:/Users/iaroslav/IdeaProjects/sparksandbox/src/main/resources/dataset], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<>

But if you comment out or remove the .repartition(4) line, note that FileScan and partial_count are done within a single stage, and the output will be as follows:

== Physical Plan ==
*(2) HashAggregate(keys=[], functions=[count(1)])
+- Exchange SinglePartition
   +- *(1) HashAggregate(keys=[], functions=[partial_count(1)])
      +- *(1) FileScan json [] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/C:/Users/iaroslav/IdeaProjects/sparksandbox/src/main/resources/dataset], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<>
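One way to read the two plans above is that every Exchange node marks a shuffle boundary, so a simple linear plan has one more stage than it has Exchange nodes. The sketch below applies that rule of thumb to the text printed by explain(). PlanStageCounter is a hypothetical helper written for this illustration, not part of Spark, the FileScan lines are abbreviated, and the heuristic is an assumption that only holds for simple plans like these (broadcast or reused exchanges, skipped stages and adaptive execution can change the real number).

```java
public class PlanStageCounter {

    // Count the plan lines whose operator is an Exchange (a shuffle boundary).
    static int countExchanges(String physicalPlan) {
        int exchanges = 0;
        for (String line : physicalPlan.split("\n")) {
            // Drop the tree-drawing prefix ("   +- ") and an optional codegen marker ("*(2) ")
            String node = line.replaceFirst("^[\\s+\\-:|]*(\\*\\(\\d+\\)\\s*)?", "");
            if (node.startsWith("Exchange")) {
                exchanges++;
            }
        }
        return exchanges;
    }

    // Rule of thumb for a linear plan: stages = shuffle boundaries + 1.
    static int estimateStages(String physicalPlan) {
        return countExchanges(physicalPlan) + 1;
    }

    public static void main(String[] args) {
        String withRepartition = String.join("\n",
                "== Physical Plan ==",
                "*(3) HashAggregate(keys=[], functions=[count(1)])",
                "+- Exchange SinglePartition",
                "   +- *(2) HashAggregate(keys=[], functions=[partial_count(1)])",
                "      +- Exchange RoundRobinPartitioning(4)",
                "         +- *(1) FileScan json []");
        String withoutRepartition = String.join("\n",
                "== Physical Plan ==",
                "*(2) HashAggregate(keys=[], functions=[count(1)])",
                "+- Exchange SinglePartition",
                "   +- *(1) HashAggregate(keys=[], functions=[partial_count(1)])",
                "      +- *(1) FileScan json []");
        System.out.println("with repartition: " + estimateStages(withRepartition));
        System.out.println("without repartition: " + estimateStages(withoutRepartition));
    }
}
```

For the two plans shown, this gives 3 stages with repartition and 2 without, which matches the stage counts discussed in the answer.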

P.S. Note that an extra stage might have a significant impact on performance, since it requires disk I/O (take a look here) and acts as a synchronization barrier that limits parallelism: in most cases Spark won't start stage 2 until stage 1 has completed. Still, if repartition increases the level of parallelism, it is probably worth it.
