实验前后效果对比:
之前:执行13个节点,耗时16分钟
之后:同样13个节点,耗时3分钟
具体逻辑请参照代码及注释。
import java.util.concurrent.{ExecutorService, Executors, TimeUnit} import akka.actor.{ActorSystem, Props}
import com.alibaba.fastjson.JSONObject
import xxx.listener.AddJobToQueueActor
import xxx.listener.bean.{AppStatusMessage, SparkContextStatusMessage}
import xxx.listener.utils.JSONUtil
import xxx.listener.utils.JmsUtils._
import xxx.main.SparkJob
import xxx.main.utils.JsonUtils
import com.typesafe.config.ConfigFactory
import org.apache.commons.lang.StringUtils
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{Logging, SparkConf, SparkContext} import scala.collection.mutable.Queue /**
* Created by zpc on 2016/9/1.
* JobServer实现模板。
* 修改前:各个任务节点独立提交到spark平台,其中启动sparkContext和hiveContext会占用大量时间,大约40多秒。
* 修改后:将同一用户、占用资源相同的节点,利用JMS发送消息提交到同一个SparkContext上,默认设置为3个节点任务并行。
* 实现:1.提交到queue中的msg包含任务类型的子类及参数信息,接收到的任务不存在依赖关系,依赖的处理在消息发送端控制。
* 前置任务执行结束,再发送下一节点任务。
* 2.第一次提交时,任务的参数在args中获取。启动之后,启动jms的listener监听,通过actor将接收到的任务信息加入队列。
* 3.通过反射调用SparkJob的各个子类(真正执行节点逻辑的类),SparkContext的默认timeout时间为30mins。
* 4.节点执行结束,发送节点成功消息到web端,节点失败,发送错误日志及错误消息。
* 程序退出,通过shutdownhook,发送sc关闭消息到web端。
* 程序被关闭,如kill时,将等待队列及正在执行集合中的任务,发送失败消息到web端。
*
*
*/
object ExecuteJobServer extends Logging {

  // Long-running driver that executes many job nodes on one shared
  // SparkContext/HiveContext. The first batch of jobs arrives in args(0);
  // later jobs are appended to `jobWaitingQueue` by an AddJobToQueueActor.
  // Jobs run on a fixed pool of worker threads, and the main loop ends once
  // the server has been idle for `timeout_mins` minutes.

  /** Raw JSON job messages waiting to be dispatched to a worker thread. */
  // NOTE(review): this queue is handed to AddJobToQueueActor, which presumably
  // enqueues from an actor thread while main() dequeues; mutable.Queue is not
  // thread-safe. The type is kept because it is part of the actor's
  // constructor contract - confirm how the actor accesses it.
  val jobWaitingQueue = new Queue[String]

  /**
   * Jobs currently being executed. Every access in this object goes through
   * `jobRunningSet.synchronized`, because the main thread, the worker threads
   * and the shutdown hook all touch it.
   */
  val jobRunningSet = new scala.collection.mutable.HashSet[JSONObject]

  /** Idle time, in minutes, after which the server stops waiting for new jobs. */
  val timeout_mins = 30

  /** Time (epoch millis) of the last dispatch or job completion; written by worker threads. */
  @volatile var lastRunTime = System.currentTimeMillis()

  // Identity of this SparkContext as reported to the web side:
  // applicationId, user, expId and the resource key.
  var appId: String = ""
  var user: String = ""
  var expId: Long = 0
  var resource: String = ""

  /** JSON of the most recently dispatched job; read by the shutdown hook. */
  @volatile var jobJson: JSONObject = null

  /** Number of jobs that may run concurrently on the shared SparkContext. */
  private val WorkerThreads = 3

  /**
   * Entry point.
   *
   * @param args args(0) must be a JSON array of job messages; each message
   *             carries a "class" and a "params" object
   */
  def main(args: Array[String]): Unit = {
    // When the process dies, the hook reports waiting/running jobs as failed.
    Runtime.getRuntime().addShutdownHook(new HookMessage())
    // Fail fast, before paying for a SparkContext, when there is nothing to run.
    require(args.nonEmpty, "ExecuteJobServer expects the initial job list (JSON array) as args(0)")

    val threadPool: ExecutorService = Executors.newFixedThreadPool(WorkerThreads)
    val sc = initSparkContext()
    val hiveContext = new HiveContext(sc)

    enqueueInitialJobs(args(0), sc)

    // 1. Start the listener that feeds later job messages into the queue.
    // 2. Tell the web side that this SparkContext is up.
    val system = ActorSystem("mlp", ConfigFactory.load())
    val actor = system.actorOf(Props(new AddJobToQueueActor(appId, jobWaitingQueue)))
    createTopicListenerOfContextJobMsg("contextJobMsgListener", actor)
    informSparkContextStatus(true)

    // Dispatch until the queue is empty AND the server has been idle too long.
    while (jobWaitingQueue.nonEmpty || !checkTimeOut()) {
      while (jobWaitingQueue.nonEmpty) {
        lastRunTime = System.currentTimeMillis()
        val jobStr = jobWaitingQueue.dequeue()
        logInfo("***** ExecuteJobServer jobStr ***** jobStr: " + jobStr)
        val json = JSONUtil.toJSONString(jobStr)
        // Register the job before handing it to the pool so the worker's
        // removal can never run ahead of the insertion.
        jobRunningSet.synchronized { jobRunningSet.add(json) }
        threadPool.execute(new ThreadSparkJob(json, hiveContext, sc))
        jobJson = json
      }
      Thread.sleep(1000)
    }

    // From here on no further queued messages are dispatched.
    threadPool.shutdown()
    while (!threadPool.awaitTermination(2, TimeUnit.SECONDS)) {
      // keep blocking until every submitted job has finished
    }
    // Release the cluster resources instead of holding them until JVM exit.
    // NOTE(review): the ActorSystem is left running, as in the original;
    // confirm whether it should be shut down here as well.
    sc.stop()
  }

  /**
   * Parses the initial job list and enqueues every job. The first job seen
   * while `expId` is still 0 also initialises appId/user/expId/resource.
   */
  private def enqueueInitialJobs(jobsJson: String, sc: SparkContext): Unit = {
    val it = JsonUtils.parseArray(jobsJson).iterator
    while (it.hasNext) {
      val jobStr = it.next().toString
      if (expId == 0) {
        val param = JSONUtil.toJSONString(jobStr).getJSONObject("params")
        appId = sc.applicationId
        user = param.getString("user")
        expId = param.getLongValue("expId")
        resource = resourceKey(sc.getConf)
        logInfo("resource is : " + resource)
      }
      jobWaitingQueue.enqueue(jobStr)
    }
  }

  /**
   * Builds the resource key: driver memory + executor count + executor
   * memory, with the last character of each memory setting dropped.
   * A setting that is absent contributes an empty string.
   */
  private def resourceKey(conf: SparkConf): String = {
    // dropRight is safe on an empty value, unlike substring(0, length - 1).
    def withoutLastChar(key: String): String =
      conf.getOption(key).map(_.dropRight(1)).getOrElse("")

    val driverMemory = withoutLastChar("spark.driver.memory")
    val numExecutors = conf.getOption("spark.executor.instances").getOrElse("")
    val executorMemory = withoutLastChar("spark.executor.memory")
    driverMemory + numExecutors + executorMemory
  }

  /**
   * @return true when no job is running and more than `timeout_mins` minutes
   *         have passed since `lastRunTime`
   */
  def checkTimeOut(): Boolean = {
    // Compare in milliseconds: integer division by minutes would only fire
    // after a full extra minute had elapsed.
    val idleMillis = System.currentTimeMillis() - lastRunTime
    val nothingRunning = jobRunningSet.synchronized { jobRunningSet.isEmpty }
    nothingRunning && idleMillis > TimeUnit.MINUTES.toMillis(timeout_mins)
  }

  /** Removes a job from the running set and restarts the idle timer. Idempotent. */
  private def markJobFinished(json: JSONObject): Unit = {
    jobRunningSet.synchronized { jobRunningSet.remove(json) }
    lastRunTime = System.currentTimeMillis()
  }

  /**
   * Sends an AppStatusMessage built from the job's "params" (user, expId,
   * nodeId). Does nothing when `json` is null.
   *
   * @param status   status string placed in the message ("FAILURE" or "STOP")
   * @param logLabel name used in the log line
   */
  private def sendAppStatus(status: String, logLabel: String, errMsg: String, json: JSONObject): Unit = {
    if (json != null) {
      val params = json.getJSONObject("params")
      val jobUser = StringUtils.trimToEmpty(params.getString("user"))
      val jobExpId = params.getLongValue("expId")
      val nodeId = params.getLongValue("nodeId")
      val message = new AppStatusMessage(jobUser, jobExpId, nodeId, status, errMsg)
      logInfo("***** send " + logLabel + " message*****: expId: " + jobExpId + ", nodeId: " + nodeId)
      jobStatusTemplate.send(message)
    }
  }

  /**
   * Runs one job on the shared contexts. The job object is obtained by
   * calling the static `self` method of the class named in json("class").
   */
  class ThreadSparkJob(json: JSONObject, hiveContext: HiveContext, ctx: SparkContext) extends Runnable {

    override def run(): Unit = {
      try {
        val classStr = json.get("class").toString
        val argsStr = json.get("params").toString
        val obj: SparkJob = Class.forName(classStr).getMethod("self").invoke(null).asInstanceOf[SparkJob]
        obj.jobServer = true
        obj.failed = false
        obj.setContext(ctx)
        obj.setHiveContext(hiveContext)
        obj.main(Array(argsStr))
        logInfo("***** jobRunningSet remove job json***** json: " + json.toJSONString)
      } catch {
        case oom: OutOfMemoryError =>
          reportFailure(oom)
          stopContextAndExit(oom)
        case ex: Exception =>
          reportFailure(ex)
          if (ex.toString.contains("stopped SparkContext")) {
            stopContextAndExit(ex)
          }
        case other: Throwable =>
          // e.g. NoClassDefFoundError from the reflective load: report the
          // failure, then let the error propagate.
          reportFailure(other)
          throw other
      } finally {
        // Always clear the job, otherwise checkTimeOut() can never succeed.
        markJobFinished(json)
      }
    }

    /**
     * Reports the failure and clears the job right away, so that a following
     * System.exit does not make the shutdown hook report it a second time.
     */
    private def reportFailure(cause: Throwable): Unit = {
      informJobFailure(cause.toString, json)
      markJobFinished(json)
    }

    /** Stops the SparkContext and terminates the driver with exit code 1. */
    private def stopContextAndExit(cause: Throwable): Unit = {
      logInfo("***** SparkContext go to stop, reaseon: " + cause.getMessage)
      hiveContext.sparkContext.stop()
      System.exit(1)
    }

    /** Sends a "FAILURE" status message for the given job. */
    def informJobFailure(errMsg: String, json: JSONObject): Unit =
      sendAppStatus("FAILURE", "informJobFailure", errMsg, json)
  }

  /** Creates the SparkContext shared by every job of this server. */
  def initSparkContext(): SparkContext = {
    val conf = new SparkConf().setAppName("cbt-mlaas")
    new SparkContext(conf)
  }

  /**
   * Shutdown hook: reports the SparkContext as stopped, reports every waiting
   * and running job as failed and, if there was any, reports the stop using
   * the most recently dispatched job.
   */
  class HookMessage extends Thread {

    override def run(): Unit = {
      informSparkContextStatus(false)
      var shouldInformStop = false

      while (jobWaitingQueue.nonEmpty) {
        val json = JSONUtil.toJSONString(jobWaitingQueue.dequeue())
        informJobFailureInHook("SparkContext stopped, inform waiting job failed!", json)
        shouldInformStop = true
      }

      // Take a snapshot under the lock; sending messages happens outside it.
      val stillRunning = jobRunningSet.synchronized { jobRunningSet.toList }
      stillRunning.foreach { json =>
        informJobFailureInHook("SparkContext stopped, inform running job failed!", json)
        shouldInformStop = true
      }

      if (shouldInformStop) {
        informExpStop("SparkContext stopped, inform exp failed!", jobJson)
      }
    }

    /** Sends a "FAILURE" status message for the given job. */
    def informJobFailureInHook(errMsg: String, json: JSONObject): Unit =
      sendAppStatus("FAILURE", "informJobFailure", errMsg, json)

    /** Sends a "STOP" status message built from the given job. */
    def informExpStop(errMsg: String, json: JSONObject): Unit =
      sendAppStatus("STOP", "informExpStop", errMsg, json)
  }

  /**
   * Tells the web side whether this SparkContext is up.
   *
   * @param start true when the context has started, false when it is going away
   */
  def informSparkContextStatus(start: Boolean) = {
    val msg = new SparkContextStatusMessage(appId, start, user, expId, resource)
    logInfo("***** send sparkContext start message*****: appId: " + appId + ", start: " + start)
    sparkContextStatusTemplate.send(msg)
  }
}