​在使用airflow的过程中需要大量的dag脚本进行性能测试,如果一个个去编写dag脚本未免太过麻烦,于是想到用python的jinja2模板引擎实现批量脚本生成。

先通过pip命令安装jinja2模块:

$ pip install jinja2

然后创建模板文件(模板可以是任何形式的文本格式,没有特定扩展名,甚至可以不要扩展名):

dag_template

from datetime import timedelta, datetime
import pytz
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.models import DAG default_args = {
'owner': 'cord',
# 'depends_on_past': False,
'depends_on_past': True,
# 'start_date': airflow.utils.dates.days_ago(2),
'wait_for_downstream': True,
'execution_timeout': timedelta(minutes=3),
'email': ['[email protected]'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
} tz = pytz.timezone('Asia/Shanghai')
dt = datetime(2018, 7, 19, 18, 20, tzinfo=tz)
utc_dt = dt.astimezone(pytz.utc).replace(tzinfo=None)
dag = DAG(
'{{ dag_name }}',
default_args=default_args,
description='my DAG',
schedule_interval='*/1 * * * *',
start_date=utc_dt
)
root = DummyOperator(task_id='root', dag=dag)
for i in range(50):
i = str(i)
task = BashOperator(
task_id='task'+i,
bash_command= 'echo `date`',
dag=dag)
task.set_downstream(root)

jinja2中有两种分隔符: {% ... %} {{ ... }} ,其中{% ... %}用于执行for循环或者赋值语句,{{ ... }}负责将表达式的值填充到模板中。这里使用{{ ... }}用于填充dag文件的dag_id 。

通过该模板即可批量生成dag脚本文件,生成代码如下:

Tool.py

import os
from jinja2 import Environment, FileSystemLoader #获取模板
env = Environment(loader = FileSystemLoader(searchpath=""))
template = env.get_template("dag_template") #删除已有的生成文件
for f in os.listdir("./output"):
path_file = os.path.join("./output", f)
if os.path.isfile(path_file):
os.remove(path_file) #生成新的文件
for i in range(1, 101):
output = template.render({'dag_name' : "benchmark%d" % i})
with open("./output/bm%d.py" % i, 'w') as out:
out.write(output)

通过执行Tool.py即可批量生成dag脚本文件了。

05-12 12:11