Question
I am using Airflow on an EC2 instance with the LocalScheduler option. I've invoked airflow scheduler and airflow webserver, and everything seems to be running fine. That said, after supplying the cron string '*/10 * * * *' to schedule_interval for "do this every 10 minutes," the job continues to execute every 24 hours by default. Here's the header of the code:
from datetime import datetime
import os
import sys

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

import ds_dependencies

SCRIPT_PATH = os.getenv('PREPROC_PATH')

if SCRIPT_PATH:
    sys.path.insert(0, SCRIPT_PATH)
    import workers
else:
    print('Define PREPROC_PATH value in environmental variables')
    sys.exit(1)

default_args = {
    'start_date': datetime(2017, 9, 9, 10, 0, 0, 0),  #..EC2 time. Equal to 11pm hora México
    'max_active_runs': 1,
    'concurrency': 4,
    'schedule_interval': '*/10 * * * *'  #..every 10 minutes
}

DAG = DAG(
    dag_id='dash_update',
    default_args=default_args
)
...
Answer
default_args is only meant to fill params passed to operators within a DAG. max_active_runs, concurrency, and schedule_interval are all parameters for initializing your DAG, not operators. This is what you want:
DAG = DAG(
    dag_id='dash_update',
    start_date=datetime(2017, 9, 9, 10, 0, 0, 0),  #..EC2 time. Equal to 11pm hora México
    max_active_runs=1,
    concurrency=4,
    schedule_interval='*/10 * * * *',  #..every 10 minutes
    default_args=default_args,
)
I've mixed them up before as well, so for reference (note there are overlaps):
DAG parameters: e.g. dag_id, schedule_interval, start_date, end_date, max_active_runs, concurrency, default_args
Operator parameters: e.g. task_id, owner, retries, retry_delay, start_date, end_date, queue, pool
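As an aside on what '*/10 * * * *' actually matches (any minute divisible by 10), here is a minimal pure-Python sketch; next_runs is a hypothetical helper for illustration, not Airflow's scheduler logic:

```python
from datetime import datetime, timedelta


def next_runs(start, every_minutes, count):
    """Fire times after `start` for a '*/N * * * *' cron,
    i.e. wall-clock minutes divisible by N."""
    t = start.replace(second=0, microsecond=0)
    runs = []
    while len(runs) < count:
        t += timedelta(minutes=1)
        if t.minute % every_minutes == 0:
            runs.append(t)
    return runs


for r in next_runs(datetime(2017, 9, 9, 10, 0), 10, 3):
    print(r)  # 2017-09-09 10:10:00, then 10:20:00, then 10:30:00
```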