在使用airflow的过程中需要大量的dag脚本进行性能测试,如果一个个去编写dag脚本未免太过麻烦,于是想到用python的jinja2模板引擎实现批量脚本生成。
先通过pip命令安装jinja2模块:
$ pip install jinja2
然后创建模板文件(模板可以是任何形式的文本格式,没有特定扩展名,甚至可以不要扩展名):
dag_template
from datetime import timedelta, datetime
import pytz
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.models import DAG
default_args = {
'owner': 'cord',
# 'depends_on_past': False,
'depends_on_past': True,
# 'start_date': airflow.utils.dates.days_ago(2),
'wait_for_downstream': True,
'execution_timeout': timedelta(minutes=3),
'email': ['[email protected]'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
tz = pytz.timezone('Asia/Shanghai')
dt = datetime(2018, 7, 19, 18, 20, tzinfo=tz)
utc_dt = dt.astimezone(pytz.utc).replace(tzinfo=None)
dag = DAG(
'{{ dag_name }}',
default_args=default_args,
description='my DAG',
schedule_interval='*/1 * * * *',
start_date=utc_dt
)
root = DummyOperator(task_id='root', dag=dag)
for i in range(50):
i = str(i)
task = BashOperator(
task_id='task'+i,
bash_command= 'echo `date`',
dag=dag)
task.set_downstream(root)
jinja2中有两种分隔符: {% ... %}
和{{ ... }}
,其中{% ... %}
用于执行for循环或者赋值语句,{{ ... }}
负责将表达式的值填充到模板中。这里使用{{ ... }}
用于填充dag文件的dag_id 。
通过该模板即可批量生成dag脚本文件,生成代码如下:
Tool.py
import os
from jinja2 import Environment, FileSystemLoader
#获取模板
env = Environment(loader = FileSystemLoader(searchpath=""))
template = env.get_template("dag_template")
#删除已有的生成文件
for f in os.listdir("./output"):
path_file = os.path.join("./output", f)
if os.path.isfile(path_file):
os.remove(path_file)
#生成新的文件
for i in range(1, 101):
output = template.render({'dag_name' : "benchmark%d" % i})
with open("./output/bm%d.py" % i, 'w') as out:
out.write(output)
通过执行Tool.py
即可批量生成dag脚本文件了。