I am new to Apache Airflow. I want to call a REST endpoint using a DAG.
For example, the REST endpoint:

@PostMapping(path = "/api/employees", consumes = "application/json")

Now I want to call this REST endpoint using an Airflow DAG and schedule it. What I am doing is using SimpleHttpOperator to call the REST endpoint:
t1 = SimpleHttpOperator(
    task_id='post_op',
    endpoint='http://localhost:8084/api/employees',
    data=json.dumps({"department": "Digital", "id": 102, "name": "Rakesh", "salary": 80000}),
    headers={"Content-Type": "application/json"},
    dag=dag,
)

When I trigger the DAG, the task fails:
[2019-12-30 09:09:06,330] {{taskinstance.py:862}} INFO - Executing <Task(SimpleHttpOperator):
post_op> on 2019-12-30T08:57:00.674386+00:00
[2019-12-30 09:09:06,331] {{base_task_runner.py:133}} INFO - Running: ['airflow', 'run',
'example_http_operator', 'post_op', '2019-12-30T08:57:00.674386+00:00', '--job_id', '6', '--pool',
'default_pool', '--raw', '-sd', 'DAGS_FOLDER/ExampleHttpOperator.py', '--cfg_path',
'/tmp/tmpf9t6kzxb']
[2019-12-30 09:09:07,446] {{base_task_runner.py:115}} INFO - Job 6: Subtask post_op [2019-12-30
09:09:07,445] {{__init__.py:51}} INFO - Using executor SequentialExecutor
[2019-12-30 09:09:07,446] {{base_task_runner.py:115}} INFO - Job 6: Subtask post_op [2019-12-30
09:09:07,446] {{dagbag.py:92}} INFO - Filling up the DagBag from
/usr/local/airflow/dags/ExampleHttpOperator.py
[2019-12-30 09:09:07,473] {{base_task_runner.py:115}} INFO - Job 6: Subtask post_op [2019-12-30
09:09:07,472] {{cli.py:545}} INFO - Running <TaskInstance: example_http_operator.post_op 2019-12-
30T08:57:00.674386+00:00 [running]> on host 855dbc2ce3a3
[2019-12-30 09:09:07,480] {{http_operator.py:87}} INFO - Calling HTTP method
[2019-12-30 09:09:07,483] {{logging_mixin.py:112}} INFO - [2019-12-30 09:09:07,483]
{{base_hook.py:84}} INFO - Using connection to: id: http_default. Host: https://www.google.com/,
Port: None, Schema: None, Login: None, Password: None, extra: {}
[2019-12-30 09:09:07,484] {{logging_mixin.py:112}} INFO - [2019-12-30 09:09:07,484]
{{http_hook.py:131}} INFO - Sending 'POST' to url:
https://www.google.com/http://localhost:8084/api/employees
[2019-12-30 09:09:07,501] {{logging_mixin.py:112}} INFO - [2019-12-30 09:09:07,501]
{{http_hook.py:181}} WARNING - HTTPSConnectionPool(host='www.google.com', port=443): Max retries
exceeded with url: /http://localhost:8084/api/employees (Caused by SSLError(SSLError("bad handshake:
SysCallError(-1, 'Unexpected EOF')"))) Tenacity will retry to execute the operation
[2019-12-30 09:09:07,501] {{taskinstance.py:1058}} ERROR -
HTTPSConnectionPool(host='www.google.com', port=443): Max retries exceeded with url:
/http://localhost:8084/api/employees (Caused by SSLError(SSLError("bad handshake: SysCallError(-1,
'Unexpected EOF')")))
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/urllib3/contrib/pyopenssl.py", line 485, in wrap_socket
cnx.do_handshake()
File "/usr/local/lib/python3.7/site-packages/OpenSSL/SSL.py", line 1934, in do_handshake
self._raise_ssl_error(self._ssl, result)
File "/usr/local/lib/python3.7/site-packages/OpenSSL/SSL.py", line 1664, in _raise_ssl_error
raise SysCallError(-1, "Unexpected EOF")
OpenSSL.SSL.SysCallError: (-1, 'Unexpected EOF')

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 672, in urlopen
chunked=chunked,
File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 376, in _make_request
self._validate_conn(conn)
File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 994, in _validate_conn
conn.connect()
File "/usr/local/lib/python3.7/site-packages/urllib3/connection.py", line 394, in connect
ssl_context=context,
File "/usr/local/lib/python3.7/site-packages/urllib3/util/ssl_.py", line 370, in ssl_wrap_socket
return context.wrap_socket(sock, server_hostname=server_hostname)
File "/usr/local/lib/python3.7/site-packages/urllib3/contrib/pyopenssl.py", line 491, in wrap_socket
raise ssl.SSLError("bad handshake: %r" % e)
ssl.SSLError: ("bad handshake: SysCallError(-1, 'Unexpected EOF')",)

Airflow is running on Docker; the Docker image is puckel/docker-airflow.
Why is it calling the http_default connection with Host: https://www.google.com/?

Best Answer

You need to consider both the Operator you are using and the underlying Hook it uses to connect. The Hook fetches connection information from an Airflow Connection, which is just a container used to store credentials and other connection information. You can configure Connections in the Airflow UI (Airflow UI -> Admin -> Connections).
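Besides the UI, Airflow also reads Connections from environment variables named AIRFLOW_CONN_<CONN_ID>, whose value is a connection URI. This is handy in a Docker setup like puckel/docker-airflow, where the variable can be set in docker-compose instead of clicking through the UI per container. A minimal sketch (the conn_id "employees_api" is a hypothetical name, not from the question):

```python
import os

# Airflow resolves a Hook's conn_id "employees_api" from this variable
# (normally you would set it in docker-compose or the Dockerfile instead):
os.environ['AIRFLOW_CONN_EMPLOYEES_API'] = 'http://localhost:8084/'

print(os.environ['AIRFLOW_CONN_EMPLOYEES_API'])
```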

So in this case, you need to first configure your HTTP Connection.

From the http_hook documentation:

http_conn_id (str) – connection that has the base API url i.e https://www.google.com/

It happens that for the http_hook, you should configure the Connection by setting the host parameter equal to the base_url of your endpoint: http://localhost:8084/.

Since your Operator uses the default http_conn_id, the Hook will use the Airflow Connection called "http_default" in the Airflow UI.
If you don't want to change the default one, you can create another Connection using the Airflow UI and pass the new conn_id parameter to your Operator.

See the source code to get a better idea of how the Connection object is used by the http_hook.
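In essence, the hook simply concatenates the Connection's host with the endpoint string. The following simplified sketch (modeled on the URL-building logic in the Airflow 1.10.x http_hook source, which the logs suggest is the version in use) shows exactly why the broken https://www.google.com/http://localhost:8084/... URL appeared in the log:

```python
# Simplified sketch of how HttpHook.run() joins base_url and endpoint.
def build_url(base_url, endpoint):
    if base_url and not base_url.endswith('/') \
            and endpoint and not endpoint.startswith('/'):
        return base_url + '/' + endpoint
    return (base_url or '') + (endpoint or '')

# Default http_default connection + a full URL passed as "endpoint"
# reproduces the bogus URL from the task log:
print(build_url('https://www.google.com/', 'http://localhost:8084/api/employees'))
# https://www.google.com/http://localhost:8084/api/employees

# Correctly configured host + relative endpoint:
print(build_url('http://localhost:8084', 'api/employees'))
# http://localhost:8084/api/employees
```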

Finally, according to the http_operator documentation:
endpoint (str) – The relative part of the full url. (templated)

You should pass only the relative part of the URL to the Operator. The rest will be obtained from the underlying Hook's Connection.

In this case, the endpoint value for your Operator should be api/employees (not the full URL).
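Putting it together, a corrected version of the failing task might look like this (a sketch, assuming Airflow 1.10.x as in the logs; the conn_id "employees_api" is a hypothetical Connection whose Host is set to http://localhost:8084/):

```python
import json
from datetime import datetime

from airflow import DAG
from airflow.operators.http_operator import SimpleHttpOperator

dag = DAG('example_http_operator',
          start_date=datetime(2019, 12, 30),
          schedule_interval=None)

t1 = SimpleHttpOperator(
    task_id='post_op',
    http_conn_id='employees_api',  # Connection holds http://localhost:8084/
    endpoint='api/employees',      # only the relative part of the URL
    method='POST',
    data=json.dumps({"department": "Digital", "id": 102,
                     "name": "Rakesh", "salary": 80000}),
    headers={"Content-Type": "application/json"},
    dag=dag,
)
```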

Unfortunately, the Airflow project documentation is not very clear in this case. Please consider contributing an improvement; they are always welcome :)

Regarding rest - How to call a REST endpoint using an Airflow DAG, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/59574331/
