我是scala的新手。
我正在尝试实现自定义kryo序列化。

在其中我有两个类和一个对象:

操作

package org.agg
object Operation {
  def main(args: Array[String]) {
    var SparkConf = new SparkConf()
      .setAppName("Operation")
      .set("spark.io.compression.codec", "org.apache.spark.io.SnappyCompressionCodec")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrationRequired", "true")
      .set("spark.kryo.registrator", "org.agg.KryoClass")

    var sc = new SparkContext(SparkConf)
    var sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
    println("**********Operation***********")
  }
}

KryoClass
package org.agg
class KryoClass extends KryoRegistrator {
  def registerClasses(kryo: Kryo) {
    println("**********KryoClass***********")
    kryo.register(classOf[org.agg.KryoSerializeCode])
  }
}

KryoSerializeCode
package org.agg
class KryoSerializeCode {
 println("**********KryoSerializeCode*************")
}

我正在考虑在操作类中如果我正在编写 set(“spark.kryo.registrator”,“org.agg.KryoClass”)
因此,这应该调用 KryoClass ,它将在日志文件中打印 println(“********** KryoClass ***********”)语句。

执行的命令如下:操作对象:



但是执行此操作后,仅在操作类中打印打印语句,而不是在 KryoClass KryoSerializeCode 类中打印。

有谁知道为什么它不调用 KryoClass KryoSerializeCode 类内的打印语句。

最佳答案

Spark有懒惰地做事的习惯-将不必要的操作延迟到实际需要它们之前。似乎Kryo序列化程序的创建属于该类别-由于您实际上对新创建的SparkContext并没有执行任何操作,因此Spark不会费心创建序列化程序。

如果您将任何Spark操作添加到Operation.main中,例如:

sc.parallelize(List(1,2,3)).count()

您会看到期望的打印输出。

注意:您可能必须注册更多的类,或禁用registrationRequired

09-15 17:53