Problem description

I am submitting a Spark job using the Apache Livy batches POST method.

This HTTP request is sent using Airflow. After submitting the job, I track its status using the batch id.

I want to show the driver (client) logs in the Airflow task logs, to avoid having to check multiple places (Airflow and the Apache Livy UI/Resource Manager).

Is this possible using the Apache Livy REST API?
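For reference, the submission flow described above maps onto Livy's POST /batches call, which returns the batch id that is then polled. A minimal standard-library sketch (it assumes a Livy server at http://localhost:8998; `submit_batch` and `batch_id_from` are illustrative names, and the jar path/class name are placeholders you would supply):

```python
import json
from urllib.request import Request, urlopen

LIVY_URL = "http://localhost:8998"  # assumption: local Livy server; adjust host/port

def submit_batch(file_path, class_name):
    """POST /batches with a minimal payload; returns the new batch's id."""
    body = json.dumps({"file": file_path, "className": class_name}).encode()
    req = Request(
        f"{LIVY_URL}/batches",
        data=body,
        headers={"Content-Type": "application/json"},
    )
    with urlopen(req) as resp:
        return batch_id_from(json.load(resp))

def batch_id_from(payload):
    # Livy answers with a batch description such as {"id": 3, "state": "starting", ...};
    # the id is what later status and log requests are keyed on.
    return payload["id"]
```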

Solution

Livy has endpoints to fetch logs: /sessions/{sessionId}/log and /batches/{batchId}/log.

Documentation: see the Apache Livy REST API reference.
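The batch log endpoint can also be queried outside Airflow with only the standard library. A minimal sketch (it assumes a Livy server at http://localhost:8998; `fetch_batch_log` and `extract_log_lines` are illustrative names):

```python
import json
from urllib.request import urlopen

LIVY_URL = "http://localhost:8998"  # assumption: local Livy server; adjust host/port

def fetch_batch_log(batch_id, offset=0, size=100):
    """GET /batches/{batchId}/log and return just the log lines."""
    url = f"{LIVY_URL}/batches/{batch_id}/log?from={offset}&size={size}"
    with urlopen(url) as resp:
        return extract_log_lines(json.load(resp))

def extract_log_lines(payload):
    # Livy responds with JSON along the lines of
    # {"id": 1, "from": 0, ..., "log": ["...", "..."]};
    # only the "log" array is needed to surface driver output.
    return payload.get("log", [])
```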

You can create Python functions like the ones shown below to fetch the logs:

import json

from airflow.hooks.http_hook import HttpHook

# These methods belong to a custom Airflow operator/sensor class;
# self.http is an HttpHook created in its __init__, e.g.:
#     self.http = HttpHook("GET", http_conn_id=http_conn_id)

def _http_rest_call(self, method, endpoint, data=None, headers=None, extra_options=None):
    if not extra_options:
        extra_options = {}

    # Override the hook's default HTTP method for this call
    self.http.method = method
    response = self.http.run(endpoint, json.dumps(data), headers, extra_options=extra_options)

    return response


def _get_batch_session_logs(self, batch_id):
    method = "GET"
    endpoint = "batches/" + str(batch_id) + "/log"
    response = self._http_rest_call(method=method, endpoint=endpoint)
    # return response.json()  # parse the JSON body if you only need the log payload
    return response
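To meet the original goal of showing driver output in the Airflow task log, the returned log lines can be written through the task logger so they appear alongside the operator's own output. A minimal sketch; `surface_livy_logs` is an illustrative helper and assumes the Livy JSON payload contains a "log" array:

```python
import logging

logger = logging.getLogger("airflow.task")  # Airflow's per-task logger name

def surface_livy_logs(payload):
    """Write each Livy log line into the task log; returns the line count."""
    lines = payload.get("log", [])
    for line in lines:
        logger.info(line)
    return len(lines)
```

Called from inside the operator after `_get_batch_session_logs`, this makes the driver's stdout/stderr visible in the Airflow UI without opening the Livy UI or the Resource Manager.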
