spark程序的提交是通过spark-submit脚本实现的,我们从它开始一步步解开spark提交集群的步骤。
spark-submit的主要命令行:exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
是执行spark-class脚本,并将spark.deploy.SparkSubmit类作为第一个参数。
1、 spark-class
最关键的就是下面这句了:
CMD=()
while IFS= read -d '' -r ARG; do
CMD+=("$ARG")
done < <("$RUNNER" -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@")
exec "${CMD[@]}"
首先循环读取ARG参数,加入到CMD中。然后执行了"$RUNNER" -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@
这个是真正执行的第一个spark的类。
该类在launcher模块下,简单的浏览下代码:
public static void main(String[] argsArray) throws Exception {
...
List<String> args = new ArrayList<String>(Arrays.asList(argsArray));
String className = args.remove(0);
...
//创建命令解析器
AbstractCommandBuilder builder;
if (className.equals("org.apache.spark.deploy.SparkSubmit")) {
try {
builder = new SparkSubmitCommandBuilder(args);
} catch (IllegalArgumentException e) {
...
}
} else {
builder = new SparkClassCommandBuilder(className, args);
}
List<String> cmd = builder.buildCommand(env);//解析器解析参数
...
//返回有效的参数
if (isWindows()) {
System.out.println(prepareWindowsCommand(cmd, env));
} else {
List<String> bashCmd = prepareBashCommand(cmd, env);
for (String c : bashCmd) {
System.out.print(c);
System.out.print('\0');
}
}
}
launcher.Main
返回的数据存储到CMD中。
然后执行命令:
exec "${CMD[@]}"
这里开始真正执行某个Spark的类。
2、 deploy.SparkSubmit类
private def submit(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
def doRunMain(): Unit = {
if (args.proxyUser != null) {
val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
UserGroupInformation.getCurrentUser())
try {
proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
override def run(): Unit = {
runMain(args, uninitLog)
}
})
} catch {
。。。
}
} else {
runMain(args, uninitLog)
}
}
doRunMain()
}
主要是通过runMain(args,unititLog)方法来提价spark jar包。
所以必须先看看runMain方法是干什么的:
private def runMain(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
val loader = getSubmitClassLoader(sparkConf)
for (jar <- childClasspath) {
addJarToClasspath(jar, loader)
}
var mainClass: Class[_] = null
try {
mainClass = Utils.classForName(childMainClass)
} catch {
...
}
val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
} else {
new JavaMainApplication(mainClass)
}
try {
app.start(childArgs.toArray, sparkConf)
} catch {
...
}
}
这就很清楚了,要做的事情有以下这些:获取类加载器,添加jar包依赖。创建SparkApplication类的可执行程序或者是JavaMainApplication,创建出来的类叫app。最后执行app.start方法。
SparkApplication是一个抽象类,我们就看看默认的JavaMainApplication就可以了,代码非常简单:
private[deploy] class JavaMainApplication(klass: Class[_]) extends SparkApplication {
override def start(args: Array[String], conf: SparkConf): Unit = {
val mainMethod = klass.getMethod("main", new Array[String](0).getClass)
if (!Modifier.isStatic(mainMethod.getModifiers)) {
throw new IllegalStateException("The main method in the given main class must be static")
}
val sysProps = conf.getAll.toMap
sysProps.foreach { case (k, v) =>
sys.props(k) = v
}
mainMethod.invoke(null, args)
}
}
就是一个kclass的封装器,用来执行入参的kclass的main方法。这里的kclass就是我们编写的spark程序了,里面总有个main方法的。
这个只是个大概,很多的奥秘还在launcher这个工程里。后续笔者试图详细解开launcher的奥秘。