I get this error when I launch an application that computes the average value per key. I use combineByKey with Java 8 lambda expressions.
I read a file whose records have three fields (key, time, float value). Both my workers and my master run Java 8.

 16/05/06 15:48:23 INFO DAGScheduler: ShuffleMapStage 0 (mapToPair at ProcesarFichero.java:115) failed in 3.774 s
    16/05/06 15:48:23 INFO DAGScheduler: Job 0 failed: saveAsTextFile at ProcesarFichero.java:153, took 3.950483 s
    Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 4 times, most recent failure: Lost task 1.3 in stage 0.0 (TID 5, mcava-slave0): java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.fun$1 of type org.apache.spark.api.java.function.Function in instance of org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1
            at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
            at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
            at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2024)
            at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
            at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
            at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
            at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
            at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
            at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
            at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
            at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
            at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
            at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
            at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
            at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
            at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
            at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
            at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
            at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
            at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
            at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
            at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
            at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
            at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
            at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
            at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:64)
            at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
            at org.apache.spark.scheduler.Task.run(Task.scala:89)
            at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
            at java.lang.Thread.run(Thread.java:745)

    Driver stacktrace:
            at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
            at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
            at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
            at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
            at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
            at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
            at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
            at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
            at scala.Option.foreach(Option.scala:236)
            at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
            at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
            at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
            at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
            at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
            at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
            at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
            at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
            at org.apache.spark.SparkContext.runJob(SparkContext.scala:1922)
            at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1213)
            at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1156)
            at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1156)
            at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
            at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
            at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
            at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1156)
            at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply$mcV$sp(PairRDDFunctions.scala:1060)
            at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1026)
            at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1026)
            at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
            at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
            at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
            at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1026)
            at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply$mcV$sp(PairRDDFunctions.scala:952)
            at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:952)
            at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:952)
            at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
            at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
            at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
            at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:951)
            at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply$mcV$sp(RDD.scala:1443)
            at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1422)
            at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1422)
            at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
            at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
            at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
            at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1422)
            at org.apache.spark.api.java.JavaRDDLike$class.saveAsTextFile(JavaRDDLike.scala:507)
            at org.apache.spark.api.java.AbstractJavaRDDLike.saveAsTextFile(JavaRDDLike.scala:46)
            at com.baitic.mcava.spark.ProcesarFichero.main(ProcesarFichero.java:153)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:498)
            at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
            at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
            at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
            at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
            at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    Caused by: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.fun$1 of type org.apache.spark.api.java.function.Function in instance of org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1
            at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
            at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
            at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2024)
            at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
            at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
            at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
            at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
            at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
            at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
            at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
            at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
            at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
            at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
            at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
            at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
            at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
            at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
            at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
            at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
            at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
            at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
            at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
            at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
            at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
            at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
            at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:64)
            at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
            at org.apache.spark.scheduler.Task.run(Task.scala:89)
            at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
            at java.lang.Thread.run(Thread.java:745)


This is the code that throws the exception:

    // combineByKey builds an AvgCount (running sum, running count) per key,
    // so the average can later be computed with avg()
    AvgCount initial = new AvgCount(0.0F, 0);   // not used by combineByKey itself
    JavaPairRDD<String, AvgCount> avgCounts = pairs.combineByKey(
            (Float x) -> new AvgCount(x, 1),                                              // createCombiner
            (AvgCount a, Float x) -> new AvgCount(a.total_ + x, a.num_ + 1),              // mergeValue
            (AvgCount a, AvgCount b) -> new AvgCount(a.total_ + b.total_, a.num_ + b.num_)); // mergeCombiners
    avgCounts.saveAsTextFile("hdfs://mcava-master:54310/srv/hadoop/data/spark/xxmedidasSensorca");
    }

 public static class AvgCount implements Serializable {
        public AvgCount(Float total, int num) {
            total_ = total;
            num_ = num;
        }
        public Float total_;
        public int num_;
        public float avg() {
            return total_ / (float) num_;
        }
    }
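
For context, the pairs RDD used above is not shown in the post. A hypothetical sketch of how it might be built from the input file (the HDFS path, field separator, field positions and the JavaSparkContext variable sc are assumptions, not taken from the original code):

    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import scala.Tuple2;

    // Assumed record layout: key;time;floatValue
    JavaRDD<String> lines = sc.textFile("hdfs://mcava-master:54310/srv/hadoop/data/spark/medidas");
    JavaPairRDD<String, Float> pairs = lines.mapToPair(line -> {
        String[] fields = line.split(";");                          // [key, time, value]
        return new Tuple2<>(fields[0], Float.parseFloat(fields[2]));
    });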


I use conf.setJars() to distribute the fat jar with all dependencies.

Best Answer

I used the .setJars method on SparkConf and it works. Make sure the path to the jar file is correct. It took me a while to solve this because my jar path was wrong; in the end, while debugging and reading user.dir from the system properties, I was able to fix the path and the solution worked.
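
A minimal sketch of that configuration, assuming the fat jar sits in the driver's working directory (the jar file name, master URL and port are placeholders, not taken from the original post):

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;

    // Resolve the jar path from the driver's working directory (user.dir)
    String jarPath = System.getProperty("user.dir") + "/ProcesarFichero-with-dependencies.jar";

    SparkConf conf = new SparkConf()
            .setAppName("ProcesarFichero")
            .setMaster("spark://mcava-master:7077")
            // Ship the application jar to the executors so the Java 8 lambdas
            // can be deserialized there instead of failing with SerializedLambda errors
            .setJars(new String[] { jarPath });

    JavaSparkContext sc = new JavaSparkContext(conf);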
