我查看了Airflow subDAG部分,并试图在网上找到其他有用的信息,但是我没有找到任何详细说明如何使subDAG正常工作的信息。要运行subDAG的要求之一是应将其启用。如何启用/禁用子查询?
我写了一些示例代码,这些代码没有显示出 Airflow 中的任何错误,但是当我尝试运行它时,subDAG中的任何运算符都没有执行。
这是我的主要dag代码:
import os
from airflow import DAG
from airflow.operators import BashOperator
from datetime import datetime, timedelta
from airflow.operators.subdag_operator import SubDagOperator
from linecount_subdag import sub_dag
parent_dag_name = 'example_linecount_dag'
child_dag_name = 'example_linecount_subdag'
args = {
'owner': 'airflow',
'start_date': datetime(2016, 04, 20),
'retries': 0,
}
main_dag = DAG(
dag_id=parent_dag_name,
default_args=args,
schedule_interval=timedelta(minutes=5),
start_date=datetime(2016, 04, 20),
max_active_runs=1
)
subdag = SubDagOperator(
subdag=sub_dag(parent_dag_name, child_dag_name, args, main_dag.schedule_interval),
task_id=child_dag_name,
default_args=args,
dag=main_dag)
t = BashOperator(
task_id='start',
bash_command='echo "waiting for subdag..."',
default_args=args,
dag=main_dag)
t.set_downstream(subdag)
在此代码中,任务“启动”成功,但是subdag任务不执行任何操作,也没有失败也没有成功。
这是我的subDAG代码:
from airflow.models import DAG
from airflow.operators import BashOperator
# Dag is returned by a factory method
def sub_dag(parent_dag_name, child_dag_name, args, schedule_interval):
dag = DAG(
'%s.%s' % (parent_dag_name, child_dag_name),
default_args=args,
start_date=args['start_date'],
max_active_runs=1,
)
t1 = BashOperator(
task_id='count_lines',
bash_command='cat /root/airflow/airflow.cfg | wc -l',
default_args=args,
xcom_push=True,
dag=dag)
t2 = BashOperator(
task_id='retrieve_val',
bash_command='grep "airflow_home" /root/airflow/airflow.cfg',
default_args=args,
xcom_push=True,
dag=dag)
templated_command = """
{
echo "{{ ti.xcom_pull(task_ids='count_lines') }}"
echo "{{ ti.xcom_pull(task_ids='retrieve_val') }}"
}"""
t3 = BashOperator(
task_id='print_values',
bash_command=templated_command,
default_args=args,
dag=dag)
t3.set_upstream(t1)
t3.set_upstream(t2)
return dag
此代码中的3个运算符获取文件“airflow.cfg”的行数,在该文件中找到“airflow_home”的值,然后返回要打印的两个值。这段代码是独立运行的,所以我认为这不是问题。
为了使subDAG执行其运算符,我必须进行哪些更改?
最佳答案
我在本地使用了您的代码,效果很好。
我唯一更改的是将外部dag和sub dag都设置为schedule_interval = None并手动触发它们。
将开始日期设置为 datetime(2016、04、20),并将schedule_interval设置为 5分钟将使泛洪 Airflow 调度程序,其中包含许多回填请求。
您可能需要从使用LocalExecutor切换到CeleryExecutor。 LocalExecutor相当有限。
这是子数据的最后一步的输出:
[2017-03-08 15:35:18,994] {base_task_runner.py:95} INFO - Subtask: {
[2017-03-08 15:35:18,994] {base_task_runner.py:95} INFO - Subtask: echo "226"
[2017-03-08 15:35:18,994] {base_task_runner.py:95} INFO - Subtask: echo "airflow_home = /root/airflow/"
[2017-03-08 15:35:18,994] {base_task_runner.py:95} INFO - Subtask: }
关于python - subDAG在Airflow中如何工作?启用subDAG是什么意思?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/38310317/