Problem description
Let's assume for the following that only one Spark job is running at every point in time.
What I get so far
Here is my understanding of what happens in Spark:
- When a SparkContext is created, each worker node starts an executor. Executors are separate processes (JVMs) that connect back to the driver program. Each executor has the jar of the driver program. Quitting the driver shuts down the executors. Each executor can hold some partitions.
- When a job is executed, an execution plan is created according to the lineage graph.
- The execution job is split into stages, where each stage contains as many neighbouring (in the lineage graph) transformations and actions as possible, but no shuffles. Thus stages are separated by shuffles (see the sketch just below).
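As a concrete illustration of the last point, here is a minimal sketch (assuming an existing SparkContext sc; the input path is hypothetical): the two map calls are narrow transformations and stay in one stage, while reduceByKey introduces a shuffle and therefore starts a new stage.

val lines = sc.textFile("/some/hypothetical/input")   // hypothetical path, one partition per input split
val keyed = lines
  .map(line => line.toLowerCase)                      // narrow: stays in the same stage
  .map(line => (line, 1))                             // narrow: still the same stage
val counts = keyed.reduceByKey(_ + _)                 // shuffle: a new stage starts here
println(counts.toDebugString)                         // indentation marks the shuffle (stage) boundary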
I understand that
- a task is a command sent from the driver to an executor by serializing the Function object,
- the executor deserializes (with the help of the driver's jar) the command (task) and executes it on a partition,
but
Question(s)
How is a stage split into those tasks?
Specifically:
- Are the tasks determined by the transformations and actions, or can multiple transformations/actions be in one task?
- Are the tasks determined by the partitions (e.g. one task per stage per partition)?
- Are the tasks determined by the nodes (e.g. one task per stage per node)?
What I think (only partial answer, even if right)
In https://0x0fff.com/spark-architecture-shuffle, the shuffle is explained with the image
and I get the impression that the rule is:
each stage is split into #number-of-partitions tasks, regardless of the number of nodes.
For my first image I'd say that I'd have 3 map tasks and 3 reduce tasks.
For the image from 0x0fff, I'd say there are 8 map tasks and 3 reduce tasks (assuming that there are only three orange and three dark green files).
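If this rule holds, the task counts above can be read off the partition counts alone. A minimal sketch of what I mean (assuming an existing SparkContext sc; the value 3 is chosen to match the example above):

val data = sc.parallelize(1 to 100, 3)         // 3 partitions -> 3 tasks in the map stage
val mapped = data.map(x => (x % 10, x))        // narrow, still 3 partitions
val reduced = mapped.reduceByKey(_ + _, 3)     // shuffle into 3 partitions -> 3 tasks in the reduce stage
println(mapped.getNumPartitions)               // 3
println(reduced.getNumPartitions)              // 3
reduced.count()                                // triggers the job; the Spark UI should show 3 + 3 tasks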
Open questions in any case
Is that correct? But even if that is correct, my questions above are not all answered, because it is still open whether multiple operations (e.g. multiple maps) are within one task or are separated into one task per operation.
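One way to probe this (a sketch, assuming an existing SparkContext sc and a local-mode shell so the println output is visible on the console): if consecutive narrow operations are pipelined into a single task, both map closures should report the same stage id and task attempt id for a given element.

import org.apache.spark.TaskContext

val rdd = sc.parallelize(1 to 4, 2)
  .map { x => println(s"map1 x=$x stage=${TaskContext.get().stageId()} task=${TaskContext.get().taskAttemptId()}"); x + 1 }
  .map { x => println(s"map2 x=$x stage=${TaskContext.get().stageId()} task=${TaskContext.get().taskAttemptId()}"); x * 2 }
rdd.collect()   // both map steps print the same stage/task ids per element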
What others say
What is a task in Spark? How does the Spark worker execute the jar file? and How does the Apache Spark scheduler split files into tasks? are similar, but I did not feel that my question was answered clearly there.
Recommended answer
You have a pretty nice outline here. To answer your questions:
- A separate task does need to be launched for each partition of data for each stage. Consider that each partition will likely reside on distinct physical locations - e.g. blocks in HDFS or directories/volumes for a local file system (see the sketch below).
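For example (a sketch, assuming a splittable text input at the hypothetical path hdfs:///data/blah/input; the exact partition count depends on the block/split size):

val logs = sc.textFile("hdfs:///data/blah/input")          // roughly one partition per HDFS block
println(logs.getNumPartitions)                             // ~ number of input splits
val logs16 = sc.textFile("hdfs:///data/blah/input", 16)    // minPartitions hint: ask for at least 16 partitions
println(logs16.getNumPartitions)                           // typically >= 16, hence >= 16 tasks in that stage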
Note that the submission of Stages is driven by the DAG Scheduler. This means that stages that are not interdependent may be submitted to the cluster for execution in parallel: this maximizes the parallelization capability on the cluster. So if operations in our dataflow can happen simultaneously we will expect to see multiple stages launched.
We can see that in action in the following toy example in which we do the following types of operations:
- load two datasources
- perform some map operation on both of the data sources separately
- join them
- perform some map and filter operations on the result
- save the result
So then how many stages will we end up with?
- 1 stage each for loading the two datasources in parallel = 2 stages
- A third stage representing the join that is dependent on the other two stages
- Note: all of the follow-on operations working on the joined data may be performed in the same stage because they must happen sequentially. There is no benefit to launching additional stages because they cannot start work until the prior operation is completed.
Here is that toy program
// Load and parse the first datasource (one stage).
val sfi = sc.textFile("/data/blah/input").map{ x => val xi = x.toInt; (xi, xi * xi) }
// Build the second datasource (a second, independent stage).
val sp = sc.parallelize{ (0 until 1000).map{ x => (x, x * x + 1) } }
// Join the two datasources (a third stage, dependent on the two above).
val spj = sfi.join(sp)
// Follow-on narrow operations stay within the join stage.
val sm = spj.mapPartitions{ iter => iter.map{ case (k, (v1, v2)) => (k, v1 + v2) } }
val sf = sm.filter{ case (k, v) => v % 10 == 0 }
// The action that triggers the job.
sf.saveAsTextFile("/data/blah/out")
And here is the DAG of the result
Now: how many tasks? The number of tasks should be equal to
Sum over all stages of (#Partitions in the stage)
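In the toy program above, for example, each of the three stages contributes one task per partition. One way to observe the total empirically is a stage listener (a sketch, assuming an existing SparkContext sc; listener events are delivered asynchronously, so in a quick experiment the total may lag slightly behind the job):

import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}
import java.util.concurrent.atomic.AtomicInteger

// Sum the task counts of all completed stages; the result should match
// the sum over all stages of the number of partitions in that stage.
val totalTasks = new AtomicInteger(0)
sc.addSparkListener(new SparkListener {
  override def onStageCompleted(stage: SparkListenerStageCompleted): Unit = {
    totalTasks.addAndGet(stage.stageInfo.numTasks)
  }
})
// ... run the toy program above, then:
println(s"tasks observed so far: ${totalTasks.get()}")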