JavaSparkContext is not serializable

Question

I'm using Spark with Cassandra, and I have a JavaRDD<String> of clients. For each client, I want to select that client's interactions from Cassandra, like this:

JavaPairRDD<String, List<InteractionByMonthAndCustomer>> a = client.mapToPair(new PairFunction<String, String, List<InteractionByMonthAndCustomer>>() {
        @Override
        public Tuple2<String, List<InteractionByMonthAndCustomer>> call(String s) throws Exception {
            List<InteractionByMonthAndCustomer> b = javaFunctions(sc)
                    .cassandraTable(CASSANDRA_SCHEMA, "interaction_by_month_customer")
                    .where("ctid =?", s)
                    .map(new Function<CassandraRow, InteractionByMonthAndCustomer>() {
                        @Override
                        public InteractionByMonthAndCustomer call(CassandraRow cassandraRow) throws Exception {
                            return new InteractionByMonthAndCustomer(cassandraRow.getString("channel"),
                                    cassandraRow.getString("motif"),
                                    cassandraRow.getDate("start"),
                                    cassandraRow.getDate("end"),
                                    cassandraRow.getString("ctid"),
                                    cassandraRow.getString("month")
                            );
                        }
                    }).collect();
            return new Tuple2<String, List<InteractionByMonthAndCustomer>>(s, b);
        }
    });

For this I'm using a single JavaSparkContext sc, but I get this error:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
at org.apache.spark.rdd.RDD.map(RDD.scala:270)
at org.apache.spark.api.java.JavaRDDLike$class.mapToPair(JavaRDDLike.scala:99)
at org.apache.spark.api.java.JavaRDD.mapToPair(JavaRDD.scala:32)
at fr.aid.cim.spark.dao.GenrateCustumorJourney.AllCleintInteractions(GenrateCustumorJourney.java:91)
at fr.aid.cim.spark.dao.GenrateCustumorJourney.main(GenrateCustumorJourney.java:75)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.apache.spark.api.java.JavaSparkContext
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
... 14 more

I think the JavaSparkContext must be serializable, but how can I make it serializable?

Thanks.

Recommended answer

No, JavaSparkContext is not serializable and is not supposed to be. It can't be used in a function you send to remote workers. Here you're not explicitly referencing it, but a reference is being serialized anyway because your anonymous inner class function is not static and therefore holds a reference to the enclosing instance.

Try rewriting your code with this function as a static, stand-alone object.
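
To make the capture visible without a cluster, here is a minimal sketch that uses only plain JDK serialization, not Spark. Every name in it (FakeContext, Driver, Task, StaticTask, isSerializable) is hypothetical and exists only for illustration: FakeContext stands in for JavaSparkContext, and Task stands in for Spark's serializable function interfaces. It assumes, as the nested defaultWriteFields frames in the stack trace suggest, that the enclosing object is itself serializable and holds the context as a field.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ClosureCaptureDemo {

    /** Hypothetical stand-in for JavaSparkContext: deliberately NOT Serializable. */
    public static class FakeContext { }

    /** Hypothetical stand-in for a Spark function interface (those are Serializable too). */
    public interface Task extends Serializable {
        String call(String s);
    }

    /** Plays the role of the driver-side class that keeps the context in a field. */
    public static class Driver implements Serializable {
        private final FakeContext sc = new FakeContext();

        /** Anonymous inner class: reading sc means reading Driver.this.sc, so the Driver is captured. */
        public Task anonymousTask() {
            return new Task() {
                @Override
                public String call(String s) {
                    return s + (sc != null);
                }
            };
        }
    }

    /** Static nested class: no hidden reference to any enclosing instance. */
    public static class StaticTask implements Task {
        private final String suffix; // only plain serializable data is carried along

        public StaticTask(String suffix) {
            this.suffix = suffix;
        }

        @Override
        public String call(String s) {
            return s + suffix;
        }
    }

    /** Returns true if Java serialization can write the object, false on NotSerializableException. */
    public static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Task -> Driver -> FakeContext: serialization fails on the context field.
        System.out.println(isSerializable(new Driver().anonymousTask())); // false
        // The static task carries only its own String field.
        System.out.println(isSerializable(new StaticTask("!")));          // true
    }
}
```

The static version serializes because it has nothing to drag along. Note that this only removes the accidental capture: as the answer says, the context itself still can't be used inside a function that runs on remote workers, so anything the function needs has to be passed in as serializable data.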

