How to access the response from an Airflow SimpleHttpOperator GET request

Problem description

I'm learning Airflow and have a simple question. Below is my DAG, called dog_retriever:

import airflow
from airflow import DAG
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.operators.sensors import HttpSensor
from datetime import datetime, timedelta
import json



default_args = {
    'owner': 'Loftium',
    'depends_on_past': False,
    'start_date': datetime(2017, 10, 9),
    'email': '[email protected]',
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=3),
}

dag = DAG('dog_retriever',
    schedule_interval='@once',
    default_args=default_args)

t1 = SimpleHttpOperator(
    task_id='get_labrador',
    method='GET',
    http_conn_id='http_default',
    endpoint='api/breed/labrador/images',
    headers={"Content-Type": "application/json"},
    dag=dag)

t2 = SimpleHttpOperator(
    task_id='get_breeds',
    method='GET',
    http_conn_id='http_default',
    endpoint='api/breeds/list',
    headers={"Content-Type": "application/json"},
    dag=dag)

t2.set_upstream(t1)

To test out Airflow, I'm simply making two GET requests to endpoints in the very simple http://dog.ceo API. The goal is to learn how to work with data retrieved via Airflow.

The execution works: my code successfully calls the endpoints in tasks t1 and t2, and I can see them being logged in the Airflow UI, in the correct order based on the set_upstream rule I wrote.

What I cannot figure out is how to access the JSON response of these two tasks. It seems so simple, but I cannot work it out. In SimpleHttpOperator I see a parameter for response_check, but nothing to simply print, store, or view the JSON response.

Thanks.

Recommended answer

Because this is a SimpleHttpOperator, the operator can push the actual JSON response to XCom, and you can get it from there. Here is the line of code for that action: https://github.com/apache/incubator-airflow/blob/master/airflow/operators/http_operator.py#L87

What you need to do is set xcom_push=True, so your first task, t1, becomes:

t1 = SimpleHttpOperator(
    task_id='get_labrador',
    method='GET',
    http_conn_id='http_default',
    endpoint='api/breed/labrador/images',
    headers={"Content-Type": "application/json"},
    xcom_push=True,
    dag=dag)

You should then be able to find the JSON in XCom, stored as the task's return value. More detail on XCom can be found at: https://airflow.incubator.apache.org/concepts.html#xcoms
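To actually read that value from a later task, one option is a Python callable that pulls it back out of XCom. The following is only a minimal sketch under a few assumptions: the function name summarize_labrador_images, the task id summarize_labrador, and the FakeTaskInstance helper are hypothetical names made up for illustration, the wiring shown in the comments assumes the Airflow 1.x PythonOperator with provide_context=True, and the "message" key is an assumption about the shape of the API response.

```python
import json


def summarize_labrador_images(**context):
    """Pull the response text that get_labrador pushed to XCom and parse it."""
    # With xcom_push=True the operator stores the response body as a string;
    # xcom_pull with only task_ids reads that task's return value.
    raw = context["ti"].xcom_pull(task_ids="get_labrador")
    payload = json.loads(raw)
    # Assumption: the API wraps its results in a "message" list.
    images = payload.get("message", [])
    print("get_labrador returned %d image URLs" % len(images))
    return images


# Hypothetical wiring inside the DAG file (Airflow 1.x style, not executed here):
#   from airflow.operators.python_operator import PythonOperator
#   t3 = PythonOperator(
#       task_id="summarize_labrador",
#       python_callable=summarize_labrador_images,
#       provide_context=True,
#       dag=dag)
#   t3.set_upstream(t1)


class FakeTaskInstance:
    """Hypothetical stand-in for Airflow's TaskInstance, for local testing only."""

    def __init__(self, xcoms):
        self._xcoms = xcoms

    def xcom_pull(self, task_ids):
        return self._xcoms[task_ids]


if __name__ == "__main__":
    sample = '{"status": "success", "message": ["a.jpg", "b.jpg"]}'
    fake_ti = FakeTaskInstance({"get_labrador": sample})
    summarize_labrador_images(ti=fake_ti)
```

The fake task instance is there only so the callable can be tried without a running scheduler; inside a real DAG, Airflow supplies ti through the task context.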
