我想将对象从驱动程序节点传递到RDD所在的其他节点,以便RDD的每个分区都可以访问该对象,如以下代码片段所示。

object HelloSpark {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
                .setAppName("Testing HelloSpark")
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .set("spark.kryo.registrator", "xt.HelloKryoRegistrator")

        val sc = new SparkContext(conf)
        val rdd = sc.parallelize(1 to 20, 4)
        val bytes = new ImmutableBytesWritable(Bytes.toBytes("This is a test"))

        rdd.map(x => x.toString + "-" + Bytes.toString(bytes.get) + " !")
            .collect()
            .foreach(println)

        sc.stop
    }
}

// My registrator
class HelloKryoRegistrator extends KryoRegistrator {
    override def registerClasses(kryo: Kryo) = {
        kryo.register(classOf[ImmutableBytesWritable], new HelloSerializer())
    }
}

//My serializer
class HelloSerializer extends Serializer[ImmutableBytesWritable] {
    override def write(kryo: Kryo, output: Output, obj: ImmutableBytesWritable): Unit = {
        output.writeInt(obj.getLength)
        output.writeInt(obj.getOffset)
        output.writeBytes(obj.get(), obj.getOffset, obj.getLength)
    }

    override def read(kryo: Kryo, input: Input, t: Class[ImmutableBytesWritable]): ImmutableBytesWritable = {
        val length = input.readInt()
        val offset = input.readInt()
        val bytes  = new Array[Byte](length)
        input.read(bytes, offset, length)

        new ImmutableBytesWritable(bytes)
    }
}

在上面的代码段中,我尝试在Spark中对Kryo的ImmutableBytesWritable进行序列化,所以我做了以下工作:
  • 配置传递到Spark上下文的SparkConf实例,即,将“spark.serializer”设置为“org.apache.spark.serializer.KryoSerializer”,并将“spark.kryo.registrator”设置为“xt.HelloKryoRegistrator”;
  • 编写一个自定义的Kryo注册器类,在其中注册类ImmutableBytesWritable;
  • 为ImmutableBytes写一个序列化器

    但是,当我以yarn-client模式提交我的Spark应用程序时,引发了以下异常:



    看来ImmutableBytesWritable不能被Kryo序列化。那么让Spark使用Kryo序列化对象的正确方法是什么? Kryo可以序列化任何类型吗?

    最佳答案

    发生这种情况是因为您在封包中使用了ImmutableBytesWritable。 Spark尚不支持使用Kryo进行闭包序列化(仅支持RDD中的对象)。您可以借助此方法来解决您的问题:

    Spark - Task not serializable: How to work with complex map closures that call outside classes/objects?

    您只需要在通过闭包之前对对象进行序列化,然后再进行反序列化即可。即使您的类(class)不是可序列化的,这种方法也行得通,因为它在后台使用了Kryo。您所需要的只是一些 curry 。 ;)

    这是一个示例草图:

    def genMapper(kryoWrapper: KryoSerializationWrapper[(Foo => Bar)])
                   (foo: Foo) : Bar = {
        kryoWrapper.value.apply(foo)
    }
    val mapper = genMapper(KryoSerializationWrapper(new ImmutableBytesWritable(Bytes.toBytes("This is a test")))) _
    rdd.flatMap(mapper).collectAsMap()
    
    object ImmutableBytesWritable(bytes: Bytes) extends (Foo => Bar) {
        def apply(foo: Foo) : Bar = { //This is the real function }
    }
    

    关于serialization - 如何让Spark使用Kryo序列化对象?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/28554141/

  • 10-12 17:37