公司的实时流管道使用SparkStreaming处理导入到Kinesis的数据。之前出现过一次Streaming任务终止,报警也没通知过来的情况。再加上目前公司大数据部门只有一个人,为了防止某天晚上被报警叫起来重启服务,决定实现Streaming任务的自动重启。

1. 整体思路

自动重启的思路很简单:定期检查yarn中运行的任务,是否有提交的生产环境中的Streaming任务。在Linux上的定期检查可以使用crontab完成。crontab的精度最低是1min检查一次,也就是说有可能服务会挂掉最长1min,对于我的使用场景勉强能够接受。

2. 编写脚本

使用Python编写重启脚本restart.py,其中调用到另一个submit-job.py是提交Streaming任务的命令,根据具体环境的不同会有不同的配置。在脚本restart.py中,会检查YARN上当前运行(提交的)的任务,是否有指定环境的任务,如果没有就重启。

import os
import sys
import threading
import time

class SubmitJob(threading.Thread):
    def __init__(self, env):
        threading.Thread.__init__(self)
        self.env = env

    def run(self):
        cmd = "python /home/hadoop/submit-job.py " + self.env
        print(cmd)
        os.system(cmd)

app_id_qa = os.popen("source yarn application --list | grep dime-streaming-qa | awk '{print $ 1}'").read().strip()

if len(app_id_qa) == 0:
    print "streaming job is missing, prepare to restart it!"
    count = 5
    while len(app_id_qa) == 0 and count > 0:
        job = SubmitJob("qa")
        job.start()
        time.sleep(30)
        app_id_qa = os.popen("source yarn application --list | grep dime-streaming-qa | awk '{print $ 1}'").read().strip()
        if (len(app_id_qa) != 0):
            print("restart success! System exit")
            sys.exit()
        count = count - 1
else:
    print "streaming job is still working"
import os
import sys

env = sys.argv[1]
print(env)
cmd = ""
if env == "qa":
    cmd = "spark-submit --class com.strikingly.dime.spark.DimeJob     --master yarn --deploy-mode cluster --name dime-streaming-qa     --conf spark.driver.memory=1g --num-executors=2     --executor-cores=2 --executor-memory=2G /home/hadoop/dime-jobs.jar     -d 's3://dime-checkpoint/dime-preproduction/'     -t streaming -n test  > /dev/null 2>&1"
else:
    print("Not supported environment: %s, please pass in 'prod', 'preprod' or 'qa'!"%(env))

os.system(cmd)

3. 定时执行

使用crontab设定每分钟执行一次脚本,检查任务是否在运行。执行crontab -e命令,输入内容*/1 * * * * python /home/hadoop/restart.py

04-09 16:53