Problem description
I am submitting a Spark job using the Apache Livy batches POST method.
This HTTP request is sent using Airflow. After submitting the job, I am tracking its status using the batch id.
I want to show the driver (client) logs in the Airflow logs, to avoid having to check multiple places (Airflow and Apache Livy/Resource Manager).
Is this possible to do using the Apache Livy REST API?
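For context, a Livy batch submission of the kind described above looks roughly like the sketch below; the Livy URL, jar path, and class name are placeholders, not values from the question:

    import json
    import requests

    # Placeholder Livy endpoint and Spark application details
    LIVY_BATCHES_URL = "http://livy-host:8998/batches"
    payload = {
        "file": "local:///path/to/spark-job.jar",   # application to run (placeholder path)
        "className": "com.example.SparkJob",        # main class (placeholder)
    }

    # POST /batches submits the job; the response contains the batch id used for tracking
    resp = requests.post(LIVY_BATCHES_URL, data=json.dumps(payload),
                         headers={"Content-Type": "application/json"})
    batch_id = resp.json()["id"]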
Livy has endpoints to get logs: /sessions/{sessionId}/log and /batches/{batchId}/log.
Documentation:
- https://livy.incubator.apache.org/docs/latest/rest-api.html#get-sessionssessionidlog
- https://livy.incubator.apache.org/docs/latest/rest-api.html#get-batchesbatchidlog
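As a quick sanity check outside Airflow, the batch log endpoint can also be called with plain HTTP. A minimal sketch, assuming a Livy server at a placeholder host/port and using the documented from/size query parameters:

    import requests

    batch_id = 42  # placeholder batch id
    # GET /batches/{batchId}/log returns a JSON body whose "log" field is a list of log lines
    resp = requests.get(f"http://livy-host:8998/batches/{batch_id}/log",
                        params={"from": 0, "size": 100})
    for line in resp.json().get("log", []):
        print(line)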
You can create Python functions like the ones shown below to get the logs (written here as methods of a helper class, e.g. inside a custom Airflow operator):
    import json
    from airflow.hooks.http_hook import HttpHook  # newer Airflow: airflow.providers.http.hooks.http


    class LivyBatchLogFetcher:  # e.g. part of a custom Airflow operator
        def __init__(self, http_conn_id):
            # HTTP hook bound to the Livy connection configured in Airflow
            self.http = HttpHook("GET", http_conn_id=http_conn_id)

        def _http_rest_call(self, method, endpoint, data=None, headers=None, extra_options=None):
            if not extra_options:
                extra_options = {}
            # Override the hook's HTTP verb for this particular call
            self.http.method = method
            response = self.http.run(endpoint, json.dumps(data) if data else None,
                                     headers, extra_options=extra_options)
            return response

        def _get_batch_session_logs(self, batch_id):
            # GET /batches/{batchId}/log returns the batch's log lines
            method = "GET"
            endpoint = "batches/" + str(batch_id) + "/log"
            response = self._http_rest_call(method=method, endpoint=endpoint)
            # return response.json()
            return response