问题描述
如何配置Airflow,以便DAG中的任何故障都会(立即)导致消息松弛?
此刻,我通过创建slack_failed_task来对其进行管理:
slack_failed_task = SlackAPIPostOperator(
task_id ='slack_failed',
channel =#datalabs,
trigger_rule ='one_failed',
token = ...,
text =':red_circle:DAG Failed',
icon_url =' http://airbnb.io/img/projects/airflow3.png',
dag = dag)
并将此任务(one_failed)设置为DAG中每个其他任务的上游:
slack_failed_task<< download_task_a
slack_failed_task<< download_task_b
slack_failed_task<< process_task_c
slack_failed_task<< process_task_d
slack_failed_task<< other_task_e
它可以工作,但是容易出错,因为忘记添加任务会跳过松弛的通知,似乎像很多工作一样。
也许有办法扩展DAG中的 email_on_failure
属性吗? / p>
Bonus ;-)包括一种将失败任务的名称传递给消息的方法。
也许这个例子会有所帮助:
def slack_failed_task(contextDictionary,** kwargs):
failed_alert = SlackAPIPostOperator(
task_id ='slack_failed',
channel =#datalabs,
token = ...,
text =':red_circle :DAG失败',
owner ='_owner',)
返回failed_alert.execute
task_with_failed_slack_alerts = PythonOperator(
task_id ='task0',
python_callable =<要执行的文件> ;,
on_failure_callback = slack_failed_task,
p rovide_context = True,
dag = dag)
How can I configure Airflow so that any failure in the DAG will (immediately) result in a slack message?
At this moment I manage it by creating a slack_failed_task:
slack_failed_task = SlackAPIPostOperator(
task_id='slack_failed',
channel="#datalabs",
trigger_rule='one_failed',
token="...",
text = ':red_circle: DAG Failed',
icon_url = 'http://airbnb.io/img/projects/airflow3.png',
dag=dag)
And set this task (one_failed) upstream from each other task in the DAG:
slack_failed_task << download_task_a
slack_failed_task << download_task_b
slack_failed_task << process_task_c
slack_failed_task << process_task_d
slack_failed_task << other_task_e
It works, but it's error prone since forgetting to add the task will skip the slack notifications and seems like a lot of work.
Is there perhaps a way to expand on the email_on_failure
property in the DAG?
Bonus ;-) for including a way to pass the name of the failed task to the message.
Maybe this example will be helpful:
def slack_failed_task(contextDictionary, **kwargs):
failed_alert = SlackAPIPostOperator(
task_id='slack_failed',
channel="#datalabs",
token="...",
text = ':red_circle: DAG Failed',
owner = '_owner',)
return failed_alert.execute
task_with_failed_slack_alerts = PythonOperator(
task_id='task0',
python_callable=<file to execute>,
on_failure_callback=slack_failed_task,
provide_context=True,
dag=dag)
这篇关于气流失败的松弛消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!