提交应用

SPARK_HOME/bin目录中的spark-submit脚本用于启动集群上的应用程序。它可以通过一个统一的接口使用所有Spark支持的集群管理器。

绑定应用程序的依赖

如果你的代码依赖其他项目,你需要将其与你的应用程序一起打包,以便将代码分发到Spark集群。为此,创建一个包含你的代码及其依赖的assembly jar (or uber jar)。sbt和maven都有对应的组装插件。在创建assembly jar时,将Spark和Hadoop依赖设为provided级别即可,它们不需要捆绑,因为它们是由集群管理器在运行时提供的。

用spark-submit脚本启动应用

一旦你有一个组装jar,你可以调用bin/spark-submit脚本启动应用。该脚本负责使用Spark及其依赖来设置环境变量,并且可以支持不同集群管理器及部署模式:

./bin/spark-submit \
--class <main-class> \
--master <master-url> \
--deploy-mode <deploy-mode> \
--conf <key>=<value> \
... # other options
<application-jar> \
[application-arguments]

一些常用的选项如下:

--class         应用的main类(例如org.apache.spark.examples.SparkPi)

--master  集群的master URL, spark standalone对应的是spark://host:port, mesos对应的是mesos://host:port,yarn,or local.

--deploy-mode  本地启动驱动程序(client)还是在集群的一个工作节点上启动驱动程序(cluster),默认是client

--conf           键值对格式的属性设置。如果属性值有空格的话,则用双引号包裹key=value, 形如"key=value"

application-jar  包含应用代码和依赖的组装jar的路径。路径对整个集群来说必须是可访问的,例如,一个hdfs://或者file://

application-arguments  传递给main方法的参数

常见的部署策略是从与你的worker节点物理位置相同的网关机器提交你的应用程序。这时候,client模式是合适的。在client模式下,驱动程序直接在作为集群客户端的spark-submit进程中启动。应用程序的输入和输出连接到控制台。因此,这种模式特别适用于设计REPL的应用程序(比如Spark shell)。

REPL,Read-Eval-Print Loop的简称,“读取-求值-输出”循环,是一个简单的、交互式的编程环境。

或者,如果你的应用程序是从远离worker机器的机器提交的(例如,在你本地的笔记本上),则通常使用cluster模式来尽量减少驱动程序(drivers)和执行程序(executors)之间的网络延迟。

有几个选项是特定于集群管理器的。

例如,对于cluster部署模式的Spark standalone模式或者Mesos,你可以指定--supervise以确保在驱动程序因non-zero exit code失败时可以自动重启。

具体可以使用spark-submit --help来查看所有的选项。以下是常见选项的一些示例:

# Run application locally
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master local[*] \
/path/to/examples.jar \ # Run on a Spark standalone cluster in client deploy mode
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://207.184.161.138:7077 \
--executor-memory 20G \
--total-executor-cores \
/path/to/examples.jar \ # Run on a Spark standalone cluster in cluster deploy mode with supervise
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://207.184.161.138:7077 \
--deploy-mode cluster \
--supervise \
--executor-memory 20G \
--total-executor-cores \
/path/to/examples.jar \ # Run on a YARN cluster
export HADOOP_CONF_DIR=XXX
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \ # can be client for client mode
--executor-memory 20G \
--num-executors \
/path/to/examples.jar \ # Run a Python application on a Spark standalone cluster
./bin/spark-submit \
--master spark://207.184.161.138:7077 \
examples/src/main/python/pi.py \ # Run on a Mesos cluster in cluster deploy mode with supervise
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master mesos://207.184.161.138:7077 \
--deploy-mode cluster \
--supervise \
--executor-memory 20G \
--total-executor-cores \
http://path/to/examples.jar \

Master URLS

传递给Spark的master URL可采用以下格式之一:

local                                                                      以一个工作线程在本地运行Spark应用

local[K]                                                                  以K个工作线程在本地运行Spark应用

local[K, F]                                                              以K个工作线程在本地运行Spark应用,每个任务最多失败F次

local[*]                                                                   使用与本机逻辑内核一样多的工作线程在本地运行Spark应用

local[*, F]                                                               使用与本机逻辑内核一样多的工作线程在本地运行Spark应用,每个任务最多失败F次

spark://HOST:PORT                                              Spark standalone集群的master。端口默认是7077,可以在master节点的文件中配置。

spark://HOST1:PORT1,HOST2:PORT2               连接用ZooKeeper中间件搭建的高可用的Spark standalone集群的masters。Spark standalone cluster with standby masters with Zookeeper。主机名列表必须包含所有的master节点,端口默认都是7077

mesos://HOST:PORT                                            连接Mesos集群,端口默认是5050,可配置。如果使用ZooKeeper搭建Mesos集群的话,则用mesos://zk://

yarn                                                                       连接YARN集群,以client 模式或者以 cluster 模式,取决于--deploy-mode 的值。集群地址由HADOOP_CONF_DIR 或者 YARN_CONF_DIR 参数指定

从文件加载配置

spark-submit脚本可以从属性文件加载默认的配置并传给你的应用。默认情况下,它将从SPARK_HOME的conf目录中的spark-default.conf文件读取配置。

以这种方式加载默认Spark配置使得我们在使用spark-submit脚本时可以省略特定的标志。例如,如果在spark-default.conf文件中设置了spark.master属性,则spark-submit脚本可省略--master标志。一般来说,在SparkConf上显式设置的配置值的优先级最高,spark-submit选项值优先级次之,spark-default.conf文件中的配置值优先级最低。如果不确定配置项的值最终取的是哪里的值,可以在spark-submit 的时候添加--verbose选项。

高级依赖管理

略。

实际项目中,我们会把写到shell脚本中,这样直接运行脚本就可以启动spark任务了。

示例1:

#!/bin/sh
sparkTaskName='spark-realtime-contactList2Hive';
/home/koushengrui/app/spark-2.2.-bin-hadoop2./bin/spark-submit \
--class com.kou.SparkConsumerTest \
--name ${sparkTaskName} \
--master local[] \
--driver-memory 16G \
--executor-memory 16G \
--conf spark.kafka.metadata.broker.list=192.168.56.100:,192.168.56.101:,192.168.56.102: \
--conf spark.zookeeper.quorum=192.168.56.100:,192.168.56.101:,192.168.56.102: \
--conf spark.streaming.kafka.maxRatePerPartition= \
--conf spark.default.parallelism= \
--conf spark.task.maxFailures= \
--conf spark.network.timeout= \
--conf spark.dynamicAllocation.enaled=false \
--conf spark.shuffle.service.enabled=false \
--conf spark.eventLog.enabled=true \
--conf spark.eventLog.compress=false \
--conf spark.eventLog.dir=/data/spark/events \
--conf spark.executor.extraJavaOptions="-XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:+ParallelRefProcEnabled
-XX:+CMSClassUnloadingEnabled -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintHeapAtGC -XX:+HeapDumpOnOutOfMemoryError -verbose:gc" \
--verbose \
/home/koushengrui/app/spark-realtime-contactList2Hive.jar

本例的master是local[*],不用写deploy-mode。

05-27 08:05