一、场景
- 制作一个web应用,在页面上配置一个json字符串,保存在数据库里面。在执行json的时候,动态在本地创建一个json文件后执行,并识别是否成功,将执行过程保存在数据库中。
- 写一个函数,传入json字符串
- 识别datax是否执行成功
- 获取执行过程的打印信息
- 提取执行成功的结果信息
二、代码实现
# -*- coding:utf-8 -*-
import re
import tempfile
import subprocess
def re_search(pattern, text, default=""):
search_obj = re.search(pattern, text)
return search_obj.group(1) if search_obj else default
def parse_datax_success_result(output):
# 提取执行成功的结果信息
return {
"start_time": re_search(r'启动时刻\s+:\s+(.*)', output),
"end_time": re_search(r'结束时刻\s+:\s+(.*)', output),
"total_time": re_search(r'总计耗时\s+:\s+(.*)', output),
"average_flow": re_search(r'平均流量\s+:\s+(.*)', output),
"write_speed": re_search(r'写入速度\s+:\s+(.*)', output),
"total_records": re_search(r'读出记录总数\s+:\s+(\d+)', output),
"total_failures": re_search(r'读写失败总数\s+:\s+(\d+)', output),
}
def datax_run(json_configuration, datax_path="/data/datax/bin/datax.py"):
with tempfile.NamedTemporaryFile() as json_configuration_file:
# 创建临时文件,程序运行完,文件会自动删除
json_configuration_file.write(json_configuration.encode("utf-8"))
# 将写入内存缓冲区中的文件,刷入到磁盘中
json_configuration_file.flush()
# 获取文件路径
json_configuration_file_path = json_configuration_file.name
# 定义要执行的 datax 命令
datax_command = "python %s %s" % (datax_path, json_configuration_file_path)
# 使用 subprocess 模块执行命令
process = subprocess.Popen(datax_command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# output:命令输出信息
# error:错误信息
output, error = process.communicate()
# 获取命令的返回码
return_code = process.returncode
# 如果返回码为0,那么是成功
is_success = return_code == 0
# 如果成功,那么解析datax的结果信息
datax_result_info = parse_datax_success_result(output) if is_success else {}
return is_success, datax_result_info, output, error, return_code
if __name__ == '__main__':
with open("/data/datax/job/oracle_to_mysql.json", "r") as f:
print(datax_run(f.read()))