/What I am trying to do/
I want to apply some Spark UDF transformations to a number of HDFS buckets containing BZ2 files. I defined a MyMain Scala object that extends Serializable, since it invokes the UDF transformation on each HDFS bucket.
Before running the UDF transformations, however, I need to filter the HDFS buckets down to those that actually contain BZ2 files. This requires Hadoop FileSystem operations, which I kept inside MyMain.main so that these computations stay in driver memory and are not distributed to the worker nodes, because as far as I know FileSystem is not serializable.
Yet even after I moved the FileSystem logic into a separate serializable HadoopUtils class with a singleton companion object, and invoked all FileSystem operations from MyMain.main, I still get the
"Task not serializable" exception (below).
/Question/
What is the proper way to invoke non-serializable FileSystem operations from a serializable object such as MyMain? And why does class HadoopUtils extends Serializable
apparently still fail to serialize, despite being declared that way?
/My code/
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession

val prependtoList = (x1: String, x2: List[String]) => x2.map(x1 + _)

class HadoopUtils extends Serializable {

  def existsDir(fs: FileSystem, path: String): Boolean = {
    val p = new Path(path)
    fs.exists(p) && fs.getFileStatus(p).isDirectory
  }

  def ifBZFileExists(fs: FileSystem, bucketBZDir: String): Boolean = {
    val path = new Path(bucketBZDir)
    val fileStatus = fs.listStatus(path).filter(
      p => p.isFile && p.getPath.getName.endsWith(".bz2")
    )
    fileStatus.nonEmpty
  }

  def getBZ2Buckets(fs: FileSystem, lookupPath: String): List[String] = {
    // Filter the list of buckets down to those having at least one BZ2 file
    val range = (1 to 16).toList.map(x => x.toString)
    val buckets = prependtoList("Bucket", range)
    val allBuckets = prependtoList(lookupPath + "/", buckets)
    // From Bucket1 to Bucket16, keep only the buckets that actually exist,
    // e.g. Bucket5 may not exist
    val existingBuckets = allBuckets.filter(p => existsDir(fs, p))
    val BZ2BucketPaths = existingBuckets
      .filter(path => ifBZFileExists(fs, path))
      .map(path => path + "/*.bz2")
    BZ2BucketPaths
  }
}

object HadoopUtils {
  val getHadoopUtils = new HadoopUtils
}

object MyMain extends Serializable {
  val clusterNameNodeURL = "hdfs://mycluster.domain.com:8020"
  val basePath = "/path/to/buckets"

  def main(args: Array[String]): Unit = {
    // NOTE: spark and hadoopfs are defined in main so that they live in the Driver
    val spark = SparkSession
      .builder()
      .appName("My_App")
      .enableHiveSupport()
      .getOrCreate()
    val hadoopfs: FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)
    val BZ2Buckets =
      HadoopUtils.getHadoopUtils.getBZ2Buckets(hadoopfs, clusterNameNodeURL + basePath)
    BZ2Buckets.foreach(path => {
      // Doing Spark UDF transformations on each bucket, which needs to be serialized
    })
  }
}
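The body of the foreach is elided above. For orientation only, here is a purely hypothetical sketch of the kind of per-bucket UDF transformation it might perform; the column name line, the tokenize UDF, and the comma split are all invented, and the only thing the trace below actually confirms is that the action was a Dataset.show():

import org.apache.spark.sql.functions.{col, udf}

BZ2Buckets.foreach(path => {
  // Hypothetical body: read one bucket's BZ2 files and apply a Spark UDF
  val df = spark.read.textFile(path).toDF("line")
  // Hypothetical UDF; the closure created here is what Spark must serialize
  val tokenize = udf((line: String) => line.split(",").toSeq)
  // show() is the action that triggers closure serialization in the trace
  df.withColumn("tokens", tokenize(col("line"))).show()
})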
/Stack exception trace/
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:850)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:849)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:849)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:616)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:339)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3383)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2544)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2758)
at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
at org.apache.spark.sql.Dataset.show(Dataset.scala:747)
at org.apache.spark.sql.Dataset.show(Dataset.scala:724)
at MyMain$.main(<pastie>:197)
... 51 elided
Caused by: java.io.NotSerializableException: HadoopUtils$
Serialization stack:
- object not serializable (class: HadoopUtils$, value: HadoopUtils$@7f5bab61)
- field (class: $iw, name: HadoopUtils$module, type: class HadoopUtils$)
- object (class $iw, $iw@3f4a0d43)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@74d06d1e)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@f9764ea)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@6821099e)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@4f509444)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@11462802)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@11d2d501)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@284fd700)
- field (class: $line14.$read, name: $iw, type: class $iw)
- object (class $line14.$read, $line14.$read@46b4206a)
- field (class: $iw, name: $line14$read, type: class $line14.$read)
- object (class $iw, $iw@33486894)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@25980fc9)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@1fb0d28d)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@42ea11d5)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@42d28cc1)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@22131a73)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@631878e1)
- field (class: $line18.$read, name: $iw, type: class $iw)
- object (class $line18.$read, $line18.$read@561c52c0)
- field (class: $iw, name: $line18$read, type: class $line18.$read)
- object (class $iw, $iw@1d5b8be2)
- field (class: $iw, name: $outer, type: class $iw)
- object (class $iw, $iw@4de4c672)
- field (class: $anonfun$1, name: $outer, type: class $iw)
- object (class $anonfun$1, <function2>)
- element of array (index: 9)
- array (class [Ljava.lang.Object;, size 15)
- field (class: org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11, name: references$1, type: class [Ljava.lang.Object;)
- object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11, <function2>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
... 85 more
Best answer
It seems the Task not serializable problem is not about the HadoopUtils class itself. Because the driver obtains the instance of the HadoopUtils class through the singleton HadoopUtils object (HadoopUtils.getHadoopUtils), any closure that references that instance pulls the singleton in with it, so the HadoopUtils object must be serialized together with the MyMain object. Unlike the class, the companion object does not extend Serializable, which is exactly what the trace reports: java.io.NotSerializableException: HadoopUtils$.
A solution to this problem can be found here
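Since the linked solution is not reproduced here, below is a minimal sketch of two common ways out, assuming the goal is to keep all FileSystem work on the driver; everything beyond the names already defined in the question is an assumption, not the linked answer's exact code:

// Option 1 (assumption): make the singleton itself Serializable, so that a
// closure that ends up capturing HadoopUtils$ can still be serialized.
object HadoopUtils extends Serializable {
  val getHadoopUtils = new HadoopUtils
}

// Option 2 (assumption): avoid capturing the singleton at all. Compute the
// bucket paths eagerly on the driver, then make sure the Spark closures
// (UDFs, map functions) reference only serializable local values such as
// `path`, never `hadoopfs` or the HadoopUtils singleton.
val bz2Buckets: List[String] =
  HadoopUtils.getHadoopUtils.getBZ2Buckets(hadoopfs, clusterNameNodeURL + basePath)
bz2Buckets.foreach { path =>
  val df = spark.read.textFile(path)
  // ... UDF transformations that close over `path` only ...
}

Note also that the $iw frames in the trace show the code was pasted into spark-shell, where every top-level definition (the HadoopUtils module included) gets wrapped in nested $iw objects; an anonymous function defined later can then drag that whole wrapper chain into its closure. Compiling the code as a standalone application, or defining the lambdas as local vals inside main, avoids that capture.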