Problem Description
I'm trying to get data rendered correctly inside a SimpleHttpOperator in Airflow, using configuration that I send via dag_run:
result = SimpleHttpOperator(
    task_id="schema_detector",
    http_conn_id='schema_detector',
    endpoint='api/schema/infer',
    method='PUT',
    data=json.dumps({
        'url': '{{ dag_run.conf["url"] }}',
        'fileType': '{{ dag_run.conf["fileType"] }}',
    }),
    response_check=lambda response: response.ok,
    response_filter=lambda response: response.json())
The issue is that the rendered data looks like this:
{"url": "{{ dag_run.conf[\"url\"] }}", "fileType": "{{ dag_run.conf[\"fileType\"] }}"}
I'm not sure what I'm doing wrong here.
EDIT: The full code is below.
# Imports added for completeness (Airflow 1.10-style paths, consistent with
# the import style used in the answer below).
import json
import pprint

from airflow import DAG
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'airflow',
    'start_date': days_ago(0),
}

def print_result(**kwargs):
    ti = kwargs['ti']
    pulled_value_1 = ti.xcom_pull(task_ids='schema_detector')
    pprint.pprint(pulled_value_1)

with DAG(
    dag_id='airflow_http_operator',
    default_args=default_args,
    catchup=False,
    schedule_interval="@once",
    tags=['http']
) as dag:
    result = SimpleHttpOperator(
        task_id="schema_detector",
        http_conn_id='schema_detector',
        endpoint='api/schema/infer',
        method='PUT',
        headers={"Content-Type": "application/json"},
        data=json.dumps({
            'url': '{{ dag_run.conf["url"] }}',
            'fileType': '{{ dag_run.conf["fileType"] }}',
        }),
        response_check=lambda response: response.ok,
        response_filter=lambda response: response.json())

    pull = PythonOperator(
        task_id='print_result',
        python_callable=print_result,
    )
    result >> pull
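Side note: data is one of SimpleHttpOperator's templated fields, so the Jinja expressions above should normally be rendered at run time rather than at parse time. A quick way to verify which fields are templated in your environment, using the same 1.10-style import as below (the exact tuple may vary by Airflow version):

from airflow.operators.http_operator import SimpleHttpOperator

# Typically includes 'endpoint' and 'data' (and 'headers' in newer versions).
print(SimpleHttpOperator.template_fields)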
Recommended Answer
I struggled a lot with the same error, so I created my own operator (called ExtendedHttpOperator), which is a combination of PythonOperator and SimpleHttpOperator. This worked for me :)
This operator receives a function in which we can collect the data passed to the DAG run (using dag_run.conf) and parse it (if required) before sending it to the API.
import json

from plugins.operators.extended_http_operator import ExtendedHttpOperator

# Helper functions must be defined before they are referenced below.
def passing_data(**context):
    # op_kwargs entries are merged into the context by the operator,
    # so "api" is available here alongside the usual context keys.
    api = context["api"]
    dag_run_conf = context["dag_run"].conf
    return json.dumps(dag_run_conf[api])

def validate_response(res):
    return res.status_code == 200

testing_extend = ExtendedHttpOperator(
    task_id="process_user_ids",
    http_conn_id="user_api",
    endpoint="/kafka",
    headers={"Content-Type": "application/json"},
    data_fn=passing_data,
    op_kwargs={"api": "kafka"},
    method="POST",
    log_response=True,
    response_check=validate_response,
)
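For the snippet above to work, the run's conf needs a key matching op_kwargs["api"]; a hypothetical trigger conf could look like this:

# Hypothetical dag_run.conf; passing_data returns json.dumps(conf["kafka"]),
# which becomes the POST body.
conf = {"kafka": {"user_ids": [101, 102, 103]}}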
Here is how you can add the ExtendedHttpOperator to your Airflow project:
Put the extended_http_operator.py file inside the your_airflow_project/plugins/operators folder:
# extended_http_operator.py file
"""
Extend SimpleHttpOperator with a callable that formulates the request data.
The data function can access the context to retrieve data such as the task
instance. This allows us to write cleaner code rather than one long template
line to formulate the JSON data.
"""
from typing import Dict, Optional

from airflow.exceptions import AirflowException
from airflow.hooks.http_hook import HttpHook
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.utils.decorators import apply_defaults


class ExtendedHttpOperator(SimpleHttpOperator):
    @apply_defaults
    def __init__(
        self,
        data_fn,
        log_response: bool = False,
        op_kwargs: Optional[Dict] = None,
        *args,
        **kwargs
    ):
        super(ExtendedHttpOperator, self).__init__(*args, **kwargs)
        if not callable(data_fn):
            raise AirflowException("`data_fn` param must be callable")
        self.data_fn = data_fn
        self.context = None
        self.op_kwargs = op_kwargs or {}
        self.log_response = log_response

    def execute(self, context):
        # Make op_kwargs visible to data_fn alongside the normal context keys.
        context.update(self.op_kwargs)
        self.context = context
        http = HttpHook(self.method, http_conn_id=self.http_conn_id)
        # Build the request body at run time from the live context
        # (dag_run.conf is available here).
        data_result = self.execute_callable(context)
        self.log.info("Calling HTTP method")
        self.log.info("Post Data: {}".format(data_result))
        response = http.run(
            self.endpoint, data_result, self.headers, self.extra_options
        )
        if self.log_response:
            self.log.info(response.text)
        if self.response_check:
            if not self.response_check(response):
                raise AirflowException("Invalid parameters")

    def execute_callable(self, context):
        # Call the user-supplied data function with the augmented context.
        return self.data_fn(**context)
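One caveat (my note, not part of the original answer): execute() above never returns the response, so a downstream xcom_pull, like print_result in the question, would get None. A minimal sketch of a fix, replacing the end of execute() and mirroring SimpleHttpOperator's own behavior of returning the (optionally filtered) body:

        if self.response_check:
            if not self.response_check(response):
                raise AirflowException("Invalid parameters")
        # Added: return the response body so it is pushed to XCom;
        # response_filter is assumed to exist on your SimpleHttpOperator
        # version (the question's code uses it).
        if getattr(self, "response_filter", None):
            return self.response_filter(response)
        return response.text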
Don't forget to create empty __init__.py files inside the plugins and plugins/operators folders.
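The resulting layout looks like this:

your_airflow_project/
└── plugins/
    ├── __init__.py
    └── operators/
        ├── __init__.py
        └── extended_http_operator.py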