问题描述
所以我正在使用 Apache Airflow 创建一个数据流,以获取一些存储在 Pandas Dataframe 中的数据,然后将其存储到 MongoDB 中.所以我有两种 python 方法,一种用于获取数据并返回数据帧,另一种用于将其存储到相关数据库中.如何获取一项任务的输出并将其作为另一项任务的输入?这就是我目前所拥有的(总结和浓缩版)
So I'm creating a data flow with Apache Airflow for grabbing some data that's stored in a Pandas Dataframe and then storing it into MongoDB. So I have two python methods, one for fetching the data and returning the dataframe and the other for storing it into the relevant database. How do I take the output of one task and feed it as the input to another task? This is what I have so far (summarized and condensed version)
我研究了 xcom pull 和 push 的概念,这就是我在下面实现的,我还看到有一个用于气流的 MongoHook,但不太确定如何使用它.
I looked into the concept of xcom pull and push and that's what I implemented below , I also saw that there's a MongoHook for Airflow but wasn't quite sure on how to use it.
import pandas as pd
import pymongo
import airflow
from datetime import datetime, timedelta
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
def get_data(name, **context):
data = pd.read_csv('dataset.csv')
df = data.loc[data.name == name]
context['ti'].xcom_push(task_ids=['get-data'], value=data)
def push_to_db(df, dbname, collection):
client = pymongo.MongoClient(-insert creds here-)
db = client[dbname][collection]
data = df.to_dict(orient='records')
db.insert_many(data)
args = {
'owner': 'Airflow',
'start_date': airflow.utils.dates.days_ago(2),
}
dag = DAG(
dag_id='simple_xcom',
default_args=args,
start_date=datetime(2019, 09, 02),
schedule_interval="@daily",
retries=2
)
task1 = PythonOperator(task_id='get-data', params=['name': 'John'],
python_callable=get_data,
provide_context=True, dag=dag)
task2 = PythonOperator(task_id='load-db', params=['df': context['ti'].xcom_pull(task_ids=['get-data'], key='data'),
'dbname': 'person', 'table': 'salary'),
python_callable=push_to_db, provide_context=True, dag=dag)
task1 >> task2
每次我尝试运行它时,它都会显示上下文不存在.所以也许我在将一项任务的输出作为另一个任务的输入方面做错了什么?
Everytime I try to run it, it displays that context does not exist. So maybe I'm doing some wrong in terms of feeding the output of one task as the input to another?
推荐答案
查看示例 xcom DAG.
Have a look at the example xcom DAG.
https://github.com/apache/airflow/blob/master/airflow/example_dags/example_xcom.py
这篇关于使用一个 Python 任务的输出并用作 Airflow 上另一个 Python 任务的输入的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!