Overview

  • 这一部分我们主要讨论如果配置一个Spark application,如何tune and debug Spark workloads
  • 配置对Spark应用性能调优很重要。我们有必要理解一个Spark应用的性能。

Configuring Spark with SparkConf

  • 我们知道,在创建SparkContext的时候会需要SparkConf实例。一个例子:
  • val conf = new SparkConf()
    .setAppName("Test")
    .setMaster("local")
    val sc = new SparkContext(conf)
  • SparkConf类很简单,包含一些用户可覆盖的配置选项的键值对
  • 也可以通过spark-submit动态地设置配置。基于这种方法,你可以在程序中new一个“空”的SparkConf,直接传给SparkContext。这种方式下可以直接使用 --conf标记,该标记之后可以使用任何Spark配置值。例子:
  • bin/spark-submit \
    --class com.wtttt.spark.test \
    --master local \
    --name "test" \
    --conf spark.ui.port=36000 \
    test.jar
  • 这种spark-submit的方式除了可以使用--conf,还可以从文件加载。缺省情况下,spark-submit会在conf/spark-defaults.conf中读取whitespace-delimited 的键值对。你也可以使用

    --properties-file 来指定conf文件。例子:

    bin/spark-submit \
    --class com.wttttt.spark.test \
    --properties-file my-config.conf \
    test.jar
    ## Contents of my-config.conf ##
    
    spark.master    local[4]
    spark.app.name "test"
    spark.ui.port 36000
  • 如果多处同时设置的话,程序设置的优先级高于spark-submit指定的。
  • 完整的conf选项参考 spark configuration

Components of Execution: Jobs, Tasks, and Stages

  • 我们知道,Spark会把逻辑表示翻译成一系列物理执行计划,by merging multiple operations into tasks.
  • 看下面的例子:
    val input = sc.textFile("input.txt")
    
    val tokenized = input.
    map(line => line.split(" ")).
    filter(words => words.size > 0) val counts = tokenized.
    map(words => (words(0), 1)).
    reduceByKey{ (a, b) => a + b} // example of the source file "input.txt"
    ## input.txt ##
    INFO This is a message with content
    INFO This is some other content
    (empty line)
    INFO Here are more messages
    WARN This is a warning
    (empty line)
    ERROR Something bad happened
    WARN More details on the bad thing
    INFO back to normal messages
  • 当我们在shell输入上述语句之后,并不会执行任何actions,只会隐式地定义一个DAG(有向无环图)。我们可以用toDebugString来看看:
  • scala> counts.toDebugString
    res84: String =
    (2) ShuffledRDD[296] at reduceByKey at <console>:17
    +-(2) MappedRDD[295] at map at <console>:17
    | FilteredRDD[294] at filter at <console>:15
    | MappedRDD[293] at map at <console>:15
    | input.text MappedRDD[292] at textFile at <console>:13 | input.text HadoopRDD[291] at textFile at <console>:13
  • 我们执行一个action来触发计算: counts.collect()
  • 这时,Spark的调度器scheduler会创建一个物理执行计划来计算该action所需的RDDs(递归地向前找所有需要的RDDs)。
  • 更复杂的情况是,stages的物理集合不是与RDD graph 1:1对应的。这种情况发生在scheduler执行pipelining或者合并多个RDDs到一个stage的时候。pipelining发生在RDDs可以从parents本地计算的时候(不需要data movement)。
  • 对于上面的例子,计算counts的时候,即使counts有很多个parent RDDs,它也只存在two levels of indentation。所以它的物理执行只需要两个stages。该例中的pipelining就是因为有多个filter和map的序列。如下图:
  • &lt;Spark&gt;&lt;Tuning and Debugging&gt;-LMLPHP
  • 运行这个action,看spark UI: 一共一个job,两个stages。
    • Completed Jobs (1)
    • &lt;Spark&gt;&lt;Tuning and Debugging&gt;-LMLPHP
    • Completed Stages (2)

      &lt;Spark&gt;&lt;Tuning and Debugging&gt;-LMLPHP
  • 除了pipelining,Spark内部的scheduler也会在有RDD已经被persisted到内存或磁盘的时候缩短RDD graph的lineage。
  • 还有一种(可缩短lineage的)情况是,RDD已经通过之前的shuffle被materialized到磁盘。即使没有显式地调用persist()。这种情况是充分利用了shuffle的输出会写到磁盘的特点。
  • 做个实验:
    counts.cache()
    counts.collect()

    看下这个job的stages:

  &lt;Spark&gt;&lt;Tuning and Debugging&gt;-LMLPHP

可以看到只执行了一个stage,另一个被skip了。

  • 以上,Spark执行的时候包括这么几个阶段:
    1. User code定义一个RDD的DAG;
    2. Actions force DAG到执行计划的translation;这时Spark调度器提交一个job来计算所有所需的RDDs。该job会有一或多个stages,它们是由task组成的并行计算浪潮(parallel waves of computation composed of tasks)。由于pipelining,每个stage可以对应多个RDDs。
    3. Tasks被调度,在集群上执行;stages按顺序执行,individual tasks执行RDD的各个部分。

Finding Information

  • 具体的进度信息、性能度量可以在Spark web UI以及driver和executor的logfiles中找到。

Spark Web UI

  • 说明:YARN cluster模式下,application driver是运行在cluster内部的,因此你需要通过YARN resourceManager来访问UI。

Jobs: Progress and metrics of stages, tasks, and more

  • 一般首先是点进去一个比较慢的stage,其内部可能存在skew,因此你可以看下是否某些tasks运行时间过长。比如你可以看是否这些tasks read, write or compute much more than others?
  • 你还可以看每个task读、计算、写分别占用的时间。如果tasks花很少时间读写,那么可能user code本身是expensive的,作为solution之一你可以参考advanced programming中提到的"Working on a Per-Partition Basis"来减少比如创建对象的开销。 但是如果tasks花费很多时间来从外部系统读取数据,那么可能不存在更多的外部优化方式。

Storage: Information for RDDs that are persisted

  • storage页面包含了persisted RDDs的信息。
  • 通常,如果很多RDDs都会缓存的话,older ones可能会被移除。

Executors: A list of executors present in the application

  • 该页列出了应用中活跃的executors,以及每个executor在处理和存储中的一些度量。
  • 这一页的用处是你可以确认你的application拥有你所预期的资源。

Environment: Debugging Spark's configuration

  • This page enumerates the set of active properties in the environment of your Spark application.

Driver and Executor logs

  • YARN模式下,最简单的收集日志的方式是使用YARN日志收集工具

    • (running yarn logs -applicationId <app ID>)

05-11 16:23