气流失败的松弛消息

气流失败的松弛消息

本文介绍了气流失败的松弛消息的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

如何配置Airflow,以便DAG中的任何故障都会(立即)导致消息松弛?



此刻,我通过创建slack_failed_task来对其进行管理:

  slack_failed_task = SlackAPIPostOperator(
task_id ='slack_failed',
channel =#datalabs,
trigger_rule ='one_failed',
token = ...,
text =':red_circle:DAG Failed',
icon_url =' http://airbnb.io/img/projects/airflow3.png',
dag = dag)

并将此任务(one_failed)设置为DAG中每个其他任务的上游:

  slack_failed_task<< download_task_a 
slack_failed_task<< download_task_b
slack_failed_task<< process_task_c
slack_failed_task<< process_task_d
slack_failed_task<< other_task_e

它可以工作,但是容易出错,因为忘记添加任务会跳过松弛的通知,似乎像很多工作一样。



也许有办法扩展DAG中的 email_on_failure 属性吗? / p>

Bonus ;-)包括一种将失败任务的名称传递给消息的方法。

解决方案

也许这个例子会有所帮助:

  def slack_failed_task(contextDictionary,** kwargs): 
failed_alert = SlackAPIPostOperator(
task_id ='slack_failed',
channel =#datalabs,
token = ...,
text =':red_circle :DAG失败',
owner ='_owner',)
返回failed_alert.execute


task_with_failed_slack_alerts = PythonOperator(
task_id ='task0',
python_callable =<要执行的文件> ;,
on_failure_callback = slack_failed_task,
p rovide_context = True,
dag = dag)


How can I configure Airflow so that any failure in the DAG will (immediately) result in a slack message?

At this moment I manage it by creating a slack_failed_task:

slack_failed_task =  SlackAPIPostOperator(
    task_id='slack_failed',
    channel="#datalabs",
    trigger_rule='one_failed',
    token="...",
    text = ':red_circle: DAG Failed',
    icon_url = 'http://airbnb.io/img/projects/airflow3.png',
    dag=dag)

And set this task (one_failed) upstream from each other task in the DAG:

slack_failed_task << download_task_a
slack_failed_task << download_task_b
slack_failed_task << process_task_c
slack_failed_task << process_task_d
slack_failed_task << other_task_e

It works, but it's error prone since forgetting to add the task will skip the slack notifications and seems like a lot of work.

Is there perhaps a way to expand on the email_on_failure property in the DAG?

Bonus ;-) for including a way to pass the name of the failed task to the message.

解决方案

Maybe this example will be helpful:

def slack_failed_task(contextDictionary, **kwargs):
       failed_alert = SlackAPIPostOperator(
         task_id='slack_failed',
         channel="#datalabs",
         token="...",
         text = ':red_circle: DAG Failed',
         owner = '_owner',)
         return failed_alert.execute


task_with_failed_slack_alerts = PythonOperator(
task_id='task0',
python_callable=<file to execute>,
on_failure_callback=slack_failed_task,
provide_context=True,
dag=dag)

这篇关于气流失败的松弛消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

09-01 20:21