This article looks at why a simple Spark count action is executed in three stages; the analysis below should be a useful reference for anyone hitting the same question.

Problem Description


I have loaded a CSV file, repartitioned it to 4 partitions, and then took a count of the DataFrame. When I looked at the DAG, I saw that this action is executed in 3 stages.

Why is this simple action executed in 3 stages? I suppose the 1st stage is to load the file and the 2nd is to find the count on each partition.

So what is happening in the 3rd stage?

Here is my code:

val sample = spark.read
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .option("delimiter", ";")
  .load("sample_data.csv")

sample.repartition(4).count()
Solution
  1. The first stage = read the file. Because of the repartition (a wide transformation that requires shuffling), it cannot be combined into a single stage with the partial_count (2nd stage).

  2. The second stage = local count (calculating the count per partition).

  3. The third stage = result aggregation on the driver.

Spark generates a separate stage per action or wide transformation. For more detail on narrow/wide transformations and why a wide transformation requires a separate stage, take a look at "Wide Versus Narrow Dependencies" in High Performance Spark by Holden Karau, or this article.
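To make the narrow/wide distinction concrete, here is a minimal Scala sketch (not from the original answer; the local master, app name and column expressions are illustrative assumptions): narrow transformations such as filter and withColumn stay in the same stage as the scan, while a wide transformation such as repartition or groupBy adds an Exchange (shuffle) and therefore a new stage.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("narrow-vs-wide").master("local[*]").getOrCreate()
import spark.implicits._

val df = spark.range(0, 100)               // Dataset[Long] with a single column "id"
val narrow = df
  .filter($"id" % 2 === 0)                 // narrow: no shuffle, runs in the same stage as the scan
  .withColumn("twice", $"id" * 2)          // still narrow
val wide = narrow.repartition(4)           // wide: Exchange RoundRobinPartitioning(4) => stage boundary

wide.groupBy($"twice").count().explain()   // each Exchange node in the printed plan marks a stage boundary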

Let's test this assumption locally. First, you need to create a dataset:

dataset/test-data.json

[
  { "key":  1, "value":  "a" },
  { "key":  2, "value":  "b" },
  { "key":  3, "value":  "c" },
  { "key":  4, "value":  "d" },
  { "key":  5, "value":  "e" },
  { "key":  6, "value":  "f" },
  { "key":  7, "value":  "g" },
  { "key":  8, "value":  "h" }
]

Then run the following code:

    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructType;

    StructType schema = new StructType()
            .add("key", DataTypes.IntegerType)
            .add("value", DataTypes.StringType);

    SparkSession session = SparkSession.builder()
            .appName("sandbox")
            .master("local[*]")
            .getOrCreate();

    session
            .read()
            .schema(schema)
            .json("file:///C:/<your_path>/dataset")
            .repartition(4) // comment this line out on the second run
            .registerTempTable("df"); // deprecated in Spark 2.x+; createOrReplaceTempView is the modern equivalent

    session.sqlContext().sql("SELECT COUNT(*) FROM df").explain();

The output will be:

== Physical Plan ==
*(3) HashAggregate(keys=[], functions=[count(1)])
+- Exchange SinglePartition
   +- *(2) HashAggregate(keys=[], functions=[partial_count(1)])
      +- Exchange RoundRobinPartitioning(4)
         +- *(1) FileScan json [] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/C:/Users/iaroslav/IdeaProjects/sparksandbox/src/main/resources/dataset], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<>

But if you comment out or remove the .repartition(4) call, note that the FileScan & partial_count are done within a single stage, and the output will be as follows:

== Physical Plan ==
*(2) HashAggregate(keys=[], functions=[count(1)])
+- Exchange SinglePartition
   +- *(1) HashAggregate(keys=[], functions=[partial_count(1)])
      +- *(1) FileScan json [] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/C:/Users/iaroslav/IdeaProjects/sparksandbox/src/main/resources/dataset], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<>

P.S. Note that the extra stage might have a significant impact on performance, since it requires disk I/O (take a look here) and acts as a kind of synchronization barrier that impacts parallelization: in most cases Spark won't start stage 2 until stage 1 is completed. Still, if repartition increases the level of parallelism, it is probably worth it.
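If you want to confirm the stage count programmatically instead of reading the Spark UI, a SparkListener can tally completed stages. Below is a hypothetical Scala sketch (the dataset path placeholder is the same as in the example above, and the Thread.sleep calls are a crude workaround because listener events are delivered asynchronously):

import java.util.concurrent.atomic.AtomicInteger

import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("stage-counter").master("local[*]").getOrCreate()

// Count every completed stage; reset the counter before each action we want to measure.
val completedStages = new AtomicInteger(0)
spark.sparkContext.addSparkListener(new SparkListener {
  override def onStageCompleted(event: SparkListenerStageCompleted): Unit =
    completedStages.incrementAndGet()
})

val df = spark.read.json("file:///C:/<your_path>/dataset")

completedStages.set(0)
df.count()                                  // expected: 2 stages (scan + partial_count, then final aggregation)
Thread.sleep(1000)                          // listener events arrive asynchronously
println(s"without repartition: ${completedStages.get()} stages")

completedStages.set(0)
df.repartition(4).count()                   // expected: 3 stages (scan, shuffled partial_count, final aggregation)
Thread.sleep(1000)
println(s"with repartition:    ${completedStages.get()} stages")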

That wraps up this article on why the Spark count action is executed in three stages. We hope the answer above is helpful, and thank you for your continued support!
