公司的实时流管道使用SparkStreaming处理导入到Kinesis的数据。之前出现过一次Streaming任务终止,报警也没通知过来的情况。再加上目前公司大数据部门只有一个人,为了防止某天晚上被报警叫起来重启服务,决定实现Streaming任务的自动重启。
1. 整体思路
自动重启的思路很简单:定期检查yarn中运行的任务,是否有提交的生产环境中的Streaming任务。在Linux上的定期检查可以使用crontab完成。crontab的精度最低是1min检查一次,也就是说有可能服务会挂掉最长1min,对于我的使用场景勉强能够接受。
2. 编写脚本
使用Python编写重启脚本restart.py
,其中调用到另一个submit-job.py
是提交Streaming任务的命令,根据具体环境的不同会有不同的配置。在脚本restart.py
中,会检查YARN上当前运行(提交的)的任务,是否有指定环境的任务,如果没有就重启。
import os
import sys
import threading
import time
class SubmitJob(threading.Thread):
def __init__(self, env):
threading.Thread.__init__(self)
self.env = env
def run(self):
cmd = "python /home/hadoop/submit-job.py " + self.env
print(cmd)
os.system(cmd)
app_id_qa = os.popen("source yarn application --list | grep dime-streaming-qa | awk '{print $ 1}'").read().strip()
if len(app_id_qa) == 0:
print "streaming job is missing, prepare to restart it!"
count = 5
while len(app_id_qa) == 0 and count > 0:
job = SubmitJob("qa")
job.start()
time.sleep(30)
app_id_qa = os.popen("source yarn application --list | grep dime-streaming-qa | awk '{print $ 1}'").read().strip()
if (len(app_id_qa) != 0):
print("restart success! System exit")
sys.exit()
count = count - 1
else:
print "streaming job is still working"
import os
import sys
env = sys.argv[1]
print(env)
cmd = ""
if env == "qa":
cmd = "spark-submit --class com.strikingly.dime.spark.DimeJob --master yarn --deploy-mode cluster --name dime-streaming-qa --conf spark.driver.memory=1g --num-executors=2 --executor-cores=2 --executor-memory=2G /home/hadoop/dime-jobs.jar -d 's3://dime-checkpoint/dime-preproduction/' -t streaming -n test > /dev/null 2>&1"
else:
print("Not supported environment: %s, please pass in 'prod', 'preprod' or 'qa'!"%(env))
os.system(cmd)
3. 定时执行
使用crontab设定每分钟执行一次脚本,检查任务是否在运行。执行crontab -e
命令,输入内容*/1 * * * * python /home/hadoop/restart.py
。