我正在Spark中构建应用程序,并且想在类的方法中使用SparkContext和/或SQLContext,主要是从文件或SQL查询中提取/生成数据集。

例如,我想创建一个T2P对象,其中包含收集数据的方法(在这种情况下,需要访问SparkContext):

class T2P (mid: Int, sc: SparkContext, sqlContext: SQLContext) extends Serializable {

  def getImps(): DataFrame = {
      val imps = sc.textFile("file.txt").map(line => line.split("\t")).map(d => Data(d(0).toInt, d(1), d(2), d(3))).toDF()
      return imps
   }

  def getX(): DataFrame = {
      val x = sqlContext.sql("SELECT a,b,c FROM table")
      return x
  }
}

//creating the T2P object
class App {
    val conf = new SparkConf().setAppName("T2P App").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val t2p = new T2P(0, sc, sqlContext);
}


由于SparkContext不可序列化(在创建T2P对象时出现task not serializable错误),因此将SparkContext作为参数传递给T2P类无效。在类中使用SparkContext / SQLContext的最佳方法是什么?也许这是在Spark中设计数据提取类型过程的错误方法吗?

更新
从这篇文章的评论中意识到,SparkContext不是问题,但是我在'map'函数中使用了using方法,导致Spark尝试序列化整个类。由于SparkContext无法序列化,这将导致错误。

def startMetricTo(userData: ((Int, String), List[(Int, String)]), startMetric: String) : T2PUser = {
  //do something
}

def buildUserRollup() = {
  this.userRollup = this.userSorted.map(line=>startMetricTo(line, this.startMetric))
}


这导致“任务不可序列化”异常。

最佳答案

我通过创建单独的MetricCalc对象存储我的startMetricTo()方法来解决此问题(在注释程序和其他StackOverflow用户的帮助下)。然后,我更改了buildUserRollup()方法以使用此新的startMetricTo()。这允许整个MetricCalc对象被序列化而不会出现问题。

//newly created object
object MetricCalc {
    def startMetricTo(userData: ((Int, String), List[(Int, String)]), startMetric: String) : T2PUser = {
    //do something
  }
}

//using function in T2P
def buildUserRollup(startMetric: String) = {
   this.userRollup = this.userSorted.map(line=>MetricCalc.startMetricTo(line, startMetric))
}

关于java - Spark-如何在类中使用SparkContext?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/31664819/

10-12 13:49