Problem description
I am trying to understand how to submit a Spark job to Apache Livy.
I added the following dependencies to my pom.xml:
<dependency>
    <groupId>com.cloudera.livy</groupId>
    <artifactId>livy-api</artifactId>
    <version>0.3.0</version>
</dependency>
<dependency>
    <groupId>com.cloudera.livy</groupId>
    <artifactId>livy-scala-api_2.11</artifactId>
    <version>0.3.0</version>
</dependency>
Then I have the following code in Spark that I want to submit to Livy on request:
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

object Test {
  def main(args: Array[String]) {
    val spark = SparkSession.builder()
      .appName("Test")
      .master("local[*]")
      .getOrCreate()

    import spark.sqlContext.implicits._
    implicit val sparkContext = spark.sparkContext
    // ...
  }
}
I then want to use the following code to create a LivyClient instance and upload the application code to the Spark context:
import java.io.File
import java.net.URI

import com.cloudera.livy.LivyClientBuilder

val client = new LivyClientBuilder()
  .setURI(new URI(livyUrl))
  .build()

try {
  client.uploadJar(new File(testJarPath)).get()
  client.submit(new Test())
} finally {
  client.stop(true)
}
However, the problem is that the code of Test is not adapted to be used with Apache Livy.
How can I adjust the code of the Test object so that I am able to run client.submit(new Test())?
Answer
Your Test class needs to implement Livy's Job interface, and you need to implement its call method in your Test class; that is where you get access to the JobContext/SparkContext. You can then pass an instance of Test to the submit method.
You don't have to create the SparkSession yourself; Livy will create it on the cluster, and you can access that context in your call method.
You can find more detailed information on Livy's programmatic API here: https://livy.incubator.apache.org/docs/latest/programmatic-api.html
Here's a sample implementation of the Test class:
import com.cloudera.livy.{Job, JobContext}

class Test extends Job[Int] {
  override def call(jc: JobContext): Int = {
    val spark = jc.sparkSession()
    // Do anything with the SparkSession
    1 // Return value
  }
}
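Once Test implements Job[Int], the submission snippet from the question can use the JobHandle that submit returns to wait for the result. Here is a minimal sketch; the SubmitTest wrapper object and reading livyUrl/testJarPath from the command line are just assumptions for illustration:

import java.io.File
import java.net.URI

import com.cloudera.livy.{JobHandle, LivyClientBuilder}

object SubmitTest {
  def main(args: Array[String]): Unit = {
    // Assumption: livyUrl points at a running Livy server and testJarPath
    // is the jar that contains the Test class above.
    val livyUrl = args(0)
    val testJarPath = args(1)

    val client = new LivyClientBuilder()
      .setURI(new URI(livyUrl))
      .build()

    try {
      // Make the application jar available to the remote Spark context.
      client.uploadJar(new File(testJarPath)).get()

      // submit returns a JobHandle[Int] because Test extends Job[Int];
      // get() blocks until the job has finished on the cluster.
      val handle: JobHandle[Int] = client.submit(new Test())
      val result: Int = handle.get()
      println(s"Job returned: $result")
    } finally {
      client.stop(true)
    }
  }
}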