Spark application gets a "Task not serializable" error

Question

The following code fails with a "Task not serializable" error.

Error:


Exception in thread "main" org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
        at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:2101)
        at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370)
        at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:369)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
        at org.apache.spark.rdd.RDD.map(RDD.scala:369)
        at ConnTest$.main(main.scala:41)
        at ConnTest.main(main.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.lang.reflect.Method.invoke(Unknown Source)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:743)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: DoWork
Serialization stack:
        - object not serializable (class: DoWork, value: DoWork@655621fd)
        - field (class: ConnTest$$anonfun$2, name: doWork$1, type: class DoWork)
        - object (class ConnTest$$anonfun$2, )
        at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
        ... 20 more

Code:

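import java.time.LocalDate
import org.apache.spark.{SparkConf, SparkContext}

object ConnTest extends App {
  override def main(args: scala.Array[String]): Unit = {
    super.main(args)
    // The run(...) methods below take a LocalDate, so parse the argument (ISO format expected)
    val date = LocalDate.parse(args(0))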
    val conf = new SparkConf()
    val sc = new SparkContext(conf.setAppName("Test").setMaster("local[*]"))
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    val jdbcSqlConn = "jdbc:sqlserver://......;"

    val listJob = new ItemListJob(sqlContext, jdbcSqlConn)
    val list = listJob.run(date).select("id").rdd.map(r => r(0).asInstanceOf[Int]).collect() 
    // It returns about 3000 rows

    val doWork = new DoWork(sqlContext, jdbcSqlConn)
    val processed = sc.parallelize(list).map(d => {
      doWork.run(d, date)
    })
  }
}

class ItemListJob(sqlContext: org.apache.spark.sql.SQLContext, jdbcSqlConn: String) {
  def run(date: LocalDate) = {
    sqlContext.read.format("jdbc").options(Map(
      "driver" -> "com.microsoft.sqlserver.jdbc.SQLServerDriver",
      "url" -> jdbcSqlConn,
      "dbtable" -> s"dbo.GetList('$date')"
    )).load()
  }
}

class DoWork(sqlContext: org.apache.spark.sql.SQLContext, jdbcSqlConn: String) {
  def run(id: Int, date: LocalDate) = {
    // ...... read the data from database for id, and create a text file
    val data = sqlContext.read.format("jdbc").options(Map(
      "driver" -> "com.microsoft.sqlserver.jdbc.SQLServerDriver",
      "url" -> jdbcSqlConn,
      "dbtable" -> s"someFunction('$id', $date)"
    )).load()
    // .... create a text file with content of data
    (id, date) 
  }
}

Update:

I changed the .map() call to the following:

val processed = sc.parallelize(dealList).toDF.map(d => {
  doWork.run(d(0).asInstanceOf[Int], rc)
})

Now I get:


Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for java.time.LocalDate
- field (class: "java.time.LocalDate", name: "_2")
- root class: "scala.Tuple2"
        at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:602)
        at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:596)
        at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:587)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.immutable.List.foreach(List.scala:381)

Recommended answer

The problem is in the following closure:

val processed = sc.parallelize(list).map(d => {
  doWork.run(d, date)
})

The closure passed to map runs on the executors, so Spark has to serialize doWork and ship it to them, which means DoWork must be serializable. However, DoWork holds a sqlContext (and, through it, the SparkContext), so simply making DoWork implement Serializable will not work: those objects exist only on the driver and cannot be used on the executors.
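To see the capture problem in isolation, here is a minimal sketch in plain Scala with no Spark involved. It assumes Java serialization, which is what the stack trace shows Spark using for closures. Worker, ClosureDemo and isSerializable are hypothetical names made up for this illustration; Worker stands in for DoWork.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for DoWork: a plain class that is NOT Serializable.
class Worker(val url: String) {
  def run(id: Int): String = s"$url/$id"
}

object ClosureDemo {
  // Roughly what Spark's closure check does: try to Java-serialize the function object.
  def isSerializable(f: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(f); true }
    catch { case _: NotSerializableException => false }
    finally out.close()
  }

  def main(args: Array[String]): Unit = {
    val worker = new Worker("jdbc:example") // placeholder value, not a real URL
    val url = worker.url                    // copy out only the serializable String

    // Captures the whole Worker instance, like `d => doWork.run(d, date)` in the question.
    val capturesWorker: Int => String = id => worker.run(id)
    // Captures only a String, so nothing non-serializable travels with the closure.
    val capturesString: Int => String = id => s"$url/$id"

    println(s"captures Worker: serializable = ${isSerializable(capturesWorker)}")
    println(s"captures String: serializable = ${isSerializable(capturesString)}")
  }
}
```

The first check is expected to fail and the second to succeed. In other words, the error comes from what the closure captures, not from the map call itself.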

I guess you want to store data in a database inside DoWork. If so, you can convert the RDD to a DataFrame and save it through the jdbc method, for example:

sc.parallelize(list).toDF.write.jdbc(...)
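A slightly fuller sketch of that one-liner, assuming Spark 2.x with SparkSession and the SQL Server JDBC driver on the classpath. The table name dbo.ProcessedIds, the column name id and the sample ids are hypothetical; the connection string is left elided as in the question and has to be filled in with your own.

```scala
import java.util.Properties
import org.apache.spark.sql.{SaveMode, SparkSession}

object JdbcWriteSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("Test").master("local[*]").getOrCreate()
    import spark.implicits._ // brings toDF into scope for RDDs of simple types

    val jdbcSqlConn = "jdbc:sqlserver://......;" // elided in the question; supply your own
    val props = new Properties()
    props.setProperty("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")

    val list = Array(1, 2, 3) // placeholder for the ids collected in the question

    // The write is planned on the driver and executed by Spark on the executors,
    // so no user object holding a SQLContext has to be serialized.
    spark.sparkContext.parallelize(list)
      .toDF("id")
      .write
      .mode(SaveMode.Append)
      .jdbc(jdbcSqlConn, "dbo.ProcessedIds", props) // hypothetical target table

    spark.stop()
  }
}
```

This needs a reachable database to run, so treat it as an outline of the call shape rather than something to copy verbatim.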

I cannot give more specific suggestions because you have not provided the code in DoWork.
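On the second error in the update: Dataset.map needs an Encoder for the closure's result type, here a tuple of (Int, LocalDate), and as far as I know Spark 2.x has no built-in encoder for java.time.LocalDate (one was added in Spark 3.0). Below is a minimal sketch of one workaround, assuming it is acceptable to carry the date as java.sql.Date, which Spark 2.x can encode. DateConversionSketch and toSqlDate are hypothetical names, and the id and date are arbitrary illustration values.

```scala
import java.sql.Date
import java.time.LocalDate

object DateConversionSketch {
  // Convert at the boundary so the value returned from map is a type Spark 2.x can encode.
  def toSqlDate(d: LocalDate): Date = Date.valueOf(d)

  def main(args: Array[String]): Unit = {
    val date = LocalDate.parse("2017-09-26") // arbitrary example date
    // Shape of the value to return from the map closure instead of (Int, LocalDate).
    val result: (Int, Date) = (42, toSqlDate(date))
    println(result)
    // Converting back on the driver is lossless.
    println(result._2.toLocalDate == date)
  }
}
```

This only addresses the encoder error. The closure in the update still captures doWork, so the serialization problem described above remains.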


09-26 13:20