This article explains how to fix the error (org.apache.spark.SparkException) raised when using filter(), map(), ... in the Spark Java API. It should be a useful reference for anyone facing the same problem; follow along below.

Problem description

I'm new to Spark. When I use filter() in the Spark Java API I get the error below (if I collect() the whole table it works correctly and I can see all the data fetched from Cassandra). I checked that the master and worker versions are the same, and when the application starts I can see it in Spark's web UI, but:

[Stage 0:>                                                          (0 + 0) / 6]
[Stage 0:>                                                          (0 + 2) / 6]
[Stage 0:>                                                          (0 + 4) / 6]

Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.wicket.RequestListenerInterface.internalInvoke(RequestListenerInterface.java:258)
    ... 29 more

Caused by: java.lang.RuntimeException: Panel me.SparkTestPanel could not be constructed.
    at ...

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 4 times, most recent failure: Lost task 1.3 in stage 0.0 (TID 10, 21.1.0.41, executor 1): java.lang.ClassNotFoundException: me.SparkTestPanel$1
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1826)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2000)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:80)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1925)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1938)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1951)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1965)
    at org.apache.spark.rdd.RDD.count(RDD.scala:1158)
    at org.apache.spark.api.java.JavaRDDLike$class.count(JavaRDDLike.scala:455)
    at org.apache.spark.api.java.AbstractJavaRDDLike.count(JavaRDDLike.scala:45)
    at me.SparkTestPanel.<init>(SparkTestPanel.java:77)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    ... 39 more

Caused by: java.lang.ClassNotFoundException: me.SparkTestPanel$1
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1826)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2000)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:80)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more

My code is:

import com.datastax.spark.connector.japi.CassandraJavaUtil;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapColumnTo;
import com.datastax.spark.connector.japi.CassandraRow;
import com.datastax.spark.connector.japi.rdd.CassandraTableScanJavaRDD;

import java.util.List;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

import org.apache.wicket.markup.html.form.Form;
import org.apache.wicket.markup.html.panel.Panel; // required: SparkTestPanel extends Panel

/**
 *
 * @author mohamadreza
 */
public class SparkTestPanel extends Panel {

    private Form form;

    public SparkTestPanel(String id) {
        super(id);
        form = new Form("form");
        form.setOutputMarkupId(true);
        this.add(form);
        SparkConf conf = new SparkConf(true);
        conf.setAppName("Spark Test");
        conf.setMaster("spark://192.16.11.18:7049");
        conf.set("spark.closure.serializer","org.apache.spark.serializer.JavaSerializer");
        conf.set("spark.serializer","org.apache.spark.serializer.JavaSerializer");

        conf.set("spark.cassandra.connection.host", "192.16.11.18");
        conf.set("spark.cassandra.connection.port", "7005");
        conf.set("spark.cassandra.auth.username", "user");
        conf.set("spark.cassandra.auth.password", "password");
        JavaSparkContext sc = null;
        try {
            sc = new JavaSparkContext(conf);
            JavaRDD<CassandraRow> cache = javaFunctions(sc).cassandraTable("keyspace", "test").cache();
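            // NOTE: the anonymous Function below compiles to me.SparkTestPanel$1,
            // the exact class the executors fail to load in the stack trace above,
            // because the application jar is not on their classpath.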
            Long count = cache.filter(new Function<CassandraRow, Boolean>() {
                @Override
                public Boolean call(CassandraRow t1) throws Exception {
                    return t1.getString("value").contains("test");
                }
            }).count();
            String a = count.toString();
        } finally {
            if (sc != null) { // guard: the context may have failed to start
                sc.stop();
            }
        }
    }
}
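
As a side note, the anonymous inner Function class is what produces the me.SparkTestPanel$1 name in the ClassNotFoundException above. Below is a minimal sketch of the same predicate as a named top-level class (the class name ContainsTest is hypothetical). This alone does not fix the error, because the executors still need the jar containing the class, but it makes the executor-side dependency explicit:

import java.io.Serializable;

import com.datastax.spark.connector.japi.CassandraRow;
import org.apache.spark.api.java.function.Function;

// Hypothetical named replacement for the anonymous class me.SparkTestPanel$1.
// The jar containing this class must still reach the executors.
public class ContainsTest implements Function<CassandraRow, Boolean>, Serializable {

    private final String needle;

    public ContainsTest(String needle) {
        this.needle = needle;
    }

    @Override
    public Boolean call(CassandraRow row) throws Exception {
        // Same predicate as the original anonymous class.
        return row.getString("value").contains(needle);
    }
}

With this, the filter call becomes cache.filter(new ContainsTest("test")).count().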

Spark version 2.1.1, Scala version 2.11, Java 8, and my pom.xml:

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.1.1</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector_2.11</artifactId>
        <version>2.0.5</version>
    </dependency>

I use Docker for the Cassandra and Spark nodes (Cassandra version 3.0). Can anyone help me?

Recommended answer

Problem solved :)

To use the Java API of Apache Spark, you must copy your project's .jar (found in the target directory at the root of your project) into $SPARK_PATH/jars/ on every Spark node (master and workers). If your .jar is very large, you can split the UI code and the Spark code into separate projects, copy only the .jar of the Spark-code project, and use that Spark code from your UI project.
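
A complementary option is to let Spark ship the application jar to the executors itself via SparkConf.setJars(), instead of copying it into $SPARK_PATH/jars/ on every node. The sketch below assumes the application jar is built at the hypothetical path target/spark-job.jar:

import org.apache.spark.SparkConf;

public class JarShippingSketch {

    // Builds a SparkConf that ships the application jar with the job, so
    // executors can load classes such as me.SparkTestPanel$1 when they
    // deserialize tasks.
    public static SparkConf buildConf() {
        return new SparkConf(true)
                .setAppName("Spark Test")
                .setMaster("spark://192.16.11.18:7049")
                .setJars(new String[]{"target/spark-job.jar"}); // hypothetical path
    }
}

When launching through spark-submit, passing the application jar on the command line (or extra jars via --jars) achieves the same effect.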

This concludes the article on the error (org.apache.spark.SparkException) raised when using filter(), map(), ... in the Spark Java API. We hope the recommended answer helps, and thank you for your support!
