This article looks at how to create dynamic tasks from XCOM in Airflow. The discussion below should be a useful reference for anyone facing the same problem.

Problem Description

I'm attempting to generate a set of dynamic tasks from an XCOM variable. In the XCOM I'm storing a list, and I want to use each element of the list to dynamically create a downstream task.

My use case is that I have an upstream operator that checks an SFTP server for files and returns a list of file names matching specific criteria. I want to create dynamic downstream tasks for each of the file names returned.

I've simplified it to the code below, and while it works I feel like it's not an idiomatic Airflow solution. In my use case, I would write a Python function, called from a PythonOperator, that pulls the value from XCOM and returns it, instead of using the pusher function.

I understand that I could create a custom operator that combines both, but I don't think creating a throwaway operator is good practice, and I'm hoping there's another solution.

from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow import DAG
from datetime import datetime, timedelta

default_args = {
    "owner": "test",
    "depends_on_past": False,
    "start_date": datetime(2018, 10, 27),
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "email_on_success": False,
    "retries": 0,
    "provide_context": True
}

dag = DAG("test",  default_args=default_args, schedule_interval="@daily", catchup=False)


def pusher(**context):
    # The returned value is automatically pushed to XCOM.
    return ['a', 'b', 'c', 'd', 'e']

pusher_task = PythonOperator(
    task_id='pusher_task',
    dag=dag,
    python_callable=pusher
)

def bash_wrapper(task, **context):
    # Builds one BashOperator per list element.
    return BashOperator(
        task_id='dynamic' + task,
        dag=dag,
        bash_command='date'
    )

end = BashOperator(task_id='end', dag=dag, bash_command='echo task has ended')

# pusher() is called directly here, at parse time, rather than pulling
# the list from XCOM at runtime.
pusher_task >> [bash_wrapper(task) for task in pusher()] >> end
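
For reference, the pull-based variant described above (a Python function called from a PythonOperator that pulls the value from XCOM and returns it) might look like the following minimal sketch; the puller name is illustrative. Note that it only moves the XCOM read to runtime and does not by itself make the downstream task list dynamic:

def puller(**context):
    # Runs at execution time, unlike the direct pusher() call above.
    return context['ti'].xcom_pull(task_ids='pusher_task')

puller_task = PythonOperator(
    task_id='puller_task',
    dag=dag,
    python_callable=puller
)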


Recommended Answer

I wouldn't do what you're trying to achieve, mainly because:


  1. an XCOM value is state generated at runtime
  2. the DAG structure is determined at parse time

Even if you use something like the following to get access to XCOM values generated by some upstream task:

from datetime import datetime

from airflow.models import TaskInstance
from airflow.utils.db import provide_session

dag = DAG(...)

@provide_session
def get_files_list(session):
    execution_date = dag.previous_schedule(datetime.now())

    # Find the previous task instance ('upstream_task_id' names the task
    # that pushed the files list to XCOM):
    ti = session.query(TaskInstance).filter(
        TaskInstance.dag_id == dag.dag_id,
        TaskInstance.execution_date == execution_date,
        TaskInstance.task_id == upstream_task_id).first()
    if ti:
        files_list = ti.xcom_pull()
        if files_list:
            return files_list
    # Return a default state:
    return {...}


files_list = get_files_list()
# Generate tasks based on the upstream task's state:
task = PythonOperator(
    ...
    xcom_push=True,
    dag=dag)

But this would behave very strangely, because DAG parsing and task execution are not synchronised in the way you wish.

If the main reason you want to do this is parallelising file processing, I'd have some static number of processing tasks (determined by the required parallelism) that read the files list from the upstream task's XCOM value and operate on the relevant portion of that list, as in the sketch below.
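
A minimal sketch of that fixed-parallelism approach, assuming Airflow 1.x and the pusher_task from the question (NUM_WORKERS, process_slice, and the worker task ids are illustrative names, not part of the original answer):

NUM_WORKERS = 4

def process_slice(worker_index, **context):
    # Pull the full list at runtime; the set of tasks itself stays static.
    files = context['ti'].xcom_pull(task_ids='pusher_task') or []
    # Each worker takes every NUM_WORKERS-th file, partitioning the
    # list across the fixed pool of tasks.
    for file_name in files[worker_index::NUM_WORKERS]:
        print('processing', file_name)  # stand-in for the real work

workers = [
    PythonOperator(
        task_id='process_files_{}'.format(i),
        dag=dag,
        python_callable=process_slice,
        op_kwargs={'worker_index': i}
    )
    for i in range(NUM_WORKERS)
]

pusher_task >> workers >> end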

Another option is parallelising file processing using some framework for distributed computations, like Apache Spark.
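
As a rough illustration of that option, a hypothetical PySpark job could fan the per-file work out across a cluster (process_file is an assumed placeholder, not part of the answer):

from pyspark.sql import SparkSession

def process_file(file_name):
    # Stand-in for the real per-file processing.
    return (file_name, 'done')

spark = SparkSession.builder.appName('process_files').getOrCreate()
file_names = ['a', 'b', 'c', 'd', 'e']  # e.g. the list from the SFTP check
results = spark.sparkContext.parallelize(file_names).map(process_file).collect()
spark.stop()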

This concludes this article on Airflow: creating dynamic tasks from XCOM. We hope the recommended answer is helpful.
