我正在Spark中构建应用程序,并且想在类的方法中使用SparkContext和/或SQLContext,主要是从文件或SQL查询中提取/生成数据集。
例如,我想创建一个T2P对象,其中包含收集数据的方法(在这种情况下,需要访问SparkContext):
class T2P (mid: Int, sc: SparkContext, sqlContext: SQLContext) extends Serializable {
def getImps(): DataFrame = {
val imps = sc.textFile("file.txt").map(line => line.split("\t")).map(d => Data(d(0).toInt, d(1), d(2), d(3))).toDF()
return imps
}
def getX(): DataFrame = {
val x = sqlContext.sql("SELECT a,b,c FROM table")
return x
}
}
//creating the T2P object
class App {
val conf = new SparkConf().setAppName("T2P App").setMaster("local[2]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val t2p = new T2P(0, sc, sqlContext);
}
由于SparkContext不可序列化(在创建T2P对象时出现
task not serializable
错误),因此将SparkContext作为参数传递给T2P类无效。在类中使用SparkContext / SQLContext的最佳方法是什么?也许这是在Spark中设计数据提取类型过程的错误方法吗?更新
从这篇文章的评论中意识到,SparkContext不是问题,但是我在'map'函数中使用了using方法,导致Spark尝试序列化整个类。由于SparkContext无法序列化,这将导致错误。
def startMetricTo(userData: ((Int, String), List[(Int, String)]), startMetric: String) : T2PUser = {
//do something
}
def buildUserRollup() = {
this.userRollup = this.userSorted.map(line=>startMetricTo(line, this.startMetric))
}
这导致“任务不可序列化”异常。
最佳答案
我通过创建单独的MetricCalc
对象存储我的startMetricTo()方法来解决此问题(在注释程序和其他StackOverflow用户的帮助下)。然后,我更改了buildUserRollup()方法以使用此新的startMetricTo()。这允许整个MetricCalc
对象被序列化而不会出现问题。
//newly created object
object MetricCalc {
def startMetricTo(userData: ((Int, String), List[(Int, String)]), startMetric: String) : T2PUser = {
//do something
}
}
//using function in T2P
def buildUserRollup(startMetric: String) = {
this.userRollup = this.userSorted.map(line=>MetricCalc.startMetricTo(line, startMetric))
}
关于java - Spark-如何在类中使用SparkContext?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/31664819/