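When I try to extend the SubDagOperator provided in the Airflow API, the Airflow webserver GUI does not recognize the subclass as a SubDagOperator, so I cannot zoom into the subdag.

How can I extend SubDagOperator while keeping the ability to zoom into it as a subdag? Am I missing something?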

Best answer

See the example below for how to extend SubDagOperator. The key in your case is to override the task_type property:

from airflow import DAG
from airflow.operators.subdag_operator import SubDagOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.decorators import apply_defaults


class ExampleSubdagSubclassOperator(SubDagOperator):
  template_fields = ()
  template_ext = ()

  @apply_defaults
  def __init__(self, *args, **kwargs):

    dag = kwargs.get('dag')
    task_id = kwargs.get('task_id')

    subdag = DAG(
        '{}.{}'.format(dag.dag_id, task_id),
        schedule_interval=dag.schedule_interval,
        start_date=dag.start_date
    )

    # Replace the following 3 lines with code to automatically generate the desired tasks in the subdag
    t1 = DummyOperator(dag=subdag, task_id='t1')
    t2 = DummyOperator(dag=subdag, task_id='t2')
    t3 = DummyOperator(dag=subdag, task_id='t3')

    super(ExampleSubdagSubclassOperator, self).__init__(subdag=subdag, *args, **kwargs)

  # This property needs to be overridden so that the Airflow UI recognises
  # the task as a subdag and enables the "Zoom into Sub Dag" button
  @property
  def task_type(self):
    return 'SubDagOperator'
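
To see why the override matters, here is a minimal sketch that runs without Airflow. The classes below are simplified stand-ins, not the real Airflow classes, and `ui_shows_zoom_button` is a hypothetical helper. It assumes that the default `task_type` is the name of the concrete class and that the UI decides whether to show the zoom button by comparing that string with 'SubDagOperator'; the real webserver code may differ in detail.

```python
class BaseOperator:
    """Stand-in for Airflow's base operator (simplified, not the real class)."""

    @property
    def task_type(self):
        # Assumption: the default task type is the concrete class name.
        return self.__class__.__name__


class SubDagOperator(BaseOperator):
    """Stand-in for Airflow's SubDagOperator (simplified, not the real class)."""


class PlainSubclass(SubDagOperator):
    """Subclass that does not override task_type."""


class FixedSubclass(SubDagOperator):
    """Subclass that reports the parent's task type, as in the answer above."""

    @property
    def task_type(self):
        return 'SubDagOperator'


def ui_shows_zoom_button(task):
    # Hypothetical helper. Assumption: the UI compares the task_type string
    # rather than checking isinstance(task, SubDagOperator).
    return task.task_type == 'SubDagOperator'


if __name__ == '__main__':
    for task in (SubDagOperator(), PlainSubclass(), FixedSubclass()):
        print(type(task).__name__, task.task_type, ui_shows_zoom_button(task))
```

Under these assumptions, a plain subclass reports its own class name and fails the string comparison even though it is still an instance of SubDagOperator, which would match the behaviour described in the question. Overriding the property restores the match.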

Regarding "Airflow: how to extend SubDagOperator?", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/42256675/
