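I am trying to run a very simple example that connects Spark to Cassandra and aggregates data. The implementation uses the spark-cassandra connector, Java, Spring and a lot of other things.
Here is my Spark config file, wired through Spring: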
@Configuration
@ComponentScan("test.spark.service")
@Import({CassandraConfig.class})
public class SparkConfig {
    @Autowired
    private String cassandraUrl;
    @Bean
    public SparkConf sparkConf() {
        SparkConf sparkConf = new SparkConf();
        // configure all the bells and whistles
        sparkConf
                .setMaster("spark://localhost:7077")
                .setAppName("DataAggregator")
                .set("spark.cassandra.connection.host", cassandraUrl);
        return sparkConf;
    }
    @Bean
    public JavaStreamingContext javaStreamingContext() {
        return new JavaStreamingContext(sparkConf(), new Duration(1000));
    }
}
Here is the service class that does not throw an exception:
@Service
public class SparkServiceImpl implements SparkService, Serializable {
    private static final Logger LOGGER = LoggerFactory.getLogger(SparkServiceImpl.class);
    @Autowired
    JavaStreamingContext javaStreamingContext;
    @Override
    public void process() {
        CassandraJavaRDD<CassandraRow> rdd = CassandraStreamingJavaUtil.javaFunctions(javaStreamingContext)
                .cassandraTable("keyspace", "table");
    }
}
This seems to work and returns a CassandraJavaRDD.
As soon as I change the implementation to use groupBy with a function, it fails with a serialization exception:
@Service
public class SparkServiceImpl implements SparkService, Serializable {
    private static final Logger LOGGER = LoggerFactory.getLogger(SparkServiceImpl.class);
    @Autowired
    JavaStreamingContext javaStreamingContext;
    @Override
    public void process() {
        CassandraJavaRDD<CassandraRow> rdd = CassandraStreamingJavaUtil.javaFunctions(javaStreamingContext)
                .cassandraTable("keyspace", "table");
        JavaPairRDD<Integer, Iterable<CassandraRow>> javaPairRDD = rdd.groupBy(new Function<CassandraRow, Integer>() {
            @Override
            public Integer call(CassandraRow row) throws Exception {
                return row.getInt("int_column");
            }
        });
    }
}
Here is the stack trace:
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2037)
at org.apache.spark.rdd.RDD$$anonfun$groupBy$3.apply(RDD.scala:694)
at org.apache.spark.rdd.RDD$$anonfun$groupBy$3.apply(RDD.scala:693)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
at org.apache.spark.rdd.RDD.groupBy(RDD.scala:693)
at org.apache.spark.rdd.RDD$$anonfun$groupBy$1.apply(RDD.scala:665)
at org.apache.spark.rdd.RDD$$anonfun$groupBy$1.apply(RDD.scala:665)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
at org.apache.spark.rdd.RDD.groupBy(RDD.scala:664)
at org.apache.spark.api.java.JavaRDDLike$class.groupBy(JavaRDDLike.scala:242)
at org.apache.spark.api.java.AbstractJavaRDDLike.groupBy(JavaRDDLike.scala:45)
at test.spark.service.SparkServiceImpl.process(SparkServiceServiceImpl.java:56)
at test.spark.service.SparkServiceTest.testProcess(SparkServiceTest.java:27)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:75)
at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:86)
at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:84)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:252)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:94)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61)
at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:191)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:117)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:42)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:262)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:84)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Caused by: java.io.NotSerializableException: org.apache.spark.streaming.api.java.JavaStreamingContext
Serialization stack:
- object not serializable (class: org.apache.spark.streaming.api.java.JavaStreamingContext, value: org.apache.spark.streaming.api.java.JavaStreamingContext@4538856f)
- field (class: test.spark.service.SparkServiceImpl, name: javaStreamingContext, type: class org.apache.spark.streaming.api.java.JavaStreamingContext)
- object (class test.spark.service.SparkServiceImpl, test.spark.service.SparkServiceImpl@7e34b127)
- field (class: test.spark.service.SparkServiceImpl$1, name: this$0, type: class test.spark.service.SparkServiceImpl)
- object (class test.spark.service.SparkServiceImpl$1, test.spark.service.SparkServiceImpl$1@536b71b4)
- field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function)
- object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
... 52 more
Apart from this exception, an exception is also thrown if my service is not Serializable.
Here is the service:
@Service
public class SparkServiceImpl implements SparkService {
    private static final Logger LOGGER = LoggerFactory.getLogger(SparkServiceImpl.class);
    @Autowired
    JavaStreamingContext javaStreamingContext;
    @Override
    public void process() {
        CassandraJavaRDD<CassandraRow> rdd = CassandraStreamingJavaUtil.javaFunctions(javaStreamingContext)
                .cassandraTable("keyspace", "table");
        JavaPairRDD<Integer, Iterable<CassandraRow>> javaPairRDD = rdd.groupBy(new Function<CassandraRow, Integer>() {
            @Override
            public Integer call(CassandraRow row) throws Exception {
                return row.getInt("int_column");
            }
        });
    }
}
Here is the exception:
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2037)
at org.apache.spark.rdd.RDD$$anonfun$groupBy$3.apply(RDD.scala:694)
at org.apache.spark.rdd.RDD$$anonfun$groupBy$3.apply(RDD.scala:693)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
at org.apache.spark.rdd.RDD.groupBy(RDD.scala:693)
at org.apache.spark.rdd.RDD$$anonfun$groupBy$1.apply(RDD.scala:665)
at org.apache.spark.rdd.RDD$$anonfun$groupBy$1.apply(RDD.scala:665)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
at org.apache.spark.rdd.RDD.groupBy(RDD.scala:664)
at org.apache.spark.api.java.JavaRDDLike$class.groupBy(JavaRDDLike.scala:242)
at org.apache.spark.api.java.AbstractJavaRDDLike.groupBy(JavaRDDLike.scala:45)
at test.spark.service.SparkServiceImpl.process(SparkServiceImpl.java:32)
at test.spark.service.SparkServiceTest.testProcess(SparkServiceTest.java:27)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:75)
at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:86)
at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:84)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:252)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:94)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61)
at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:191)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:117)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:42)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:262)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:84)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Caused by: java.io.NotSerializableException: test.spark.service.SparkServiceImpl
Serialization stack:
- object not serializable (class: test.spark.service.SparkServiceImpl, value: test.spark.service.SparkServiceImpl@47b269c4)
- field (class: test.spark.service.SparkServiceImpl$1, name: this$0, type: class test.spark.service.SparkServiceImpl)
- object (class test.spark.service.SparkServiceImpl$1, test.spark.service.SparkServiceImpl$1@23ad71bf)
- field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function)
- object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
... 52 more
Best answer
Quick fix:
Add the transient keyword to the autowired JavaStreamingContext field in SparkServiceImpl:
@Autowired
private transient JavaStreamingContext javaStreamingContext;
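Quick explanation of why:
This happens because JavaStreamingContext is created on the driver, where it is required as the main entry point for Spark Streaming functionality. In your SparkService implementation, SparkServiceImpl, you declare transformations on an RDD, and the driver creates the tasks for those transformations. The tasks are then serialized and sent to the workers, which is where they are actually executed. Because the anonymous Function passed to groupBy holds a reference to the enclosing SparkServiceImpl (the this$0 field in the serialization stack), the service and all of its non-transient fields are serialized along with the task.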
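The capture described above can be reproduced without Spark. The following is a minimal sketch in plain Java, assuming hypothetical stand-ins: KeyFunction plays the role of Spark's Function interface, DriverOnlyContext plays the role of JavaStreamingContext, and Service plays the role of SparkServiceImpl. None of these names come from the original post. It also includes a static nested class for comparison, which is not part of the answer above but shows that the failure comes from the hidden reference to the enclosing instance.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ClosureCaptureDemo {

    // Hypothetical stand-in for Spark's Function interface, which is also Serializable.
    interface KeyFunction extends Serializable {
        int call(String row);
    }

    // Hypothetical stand-in for a driver-only object such as JavaStreamingContext.
    static class DriverOnlyContext {
    }

    // Same shape as the service in the question: Serializable, with a non-serializable field.
    static class Service implements Serializable {
        DriverOnlyContext context = new DriverOnlyContext();

        KeyFunction anonymousFunction() {
            // An anonymous class created in an instance method keeps a hidden
            // reference (this$0) to the enclosing Service instance.
            return new KeyFunction() {
                @Override
                public int call(String row) {
                    return row.length();
                }
            };
        }
    }

    // A static nested class has no enclosing instance, so nothing extra is captured.
    static class StandaloneFunction implements KeyFunction {
        @Override
        public int call(String row) {
            return row.length();
        }
    }

    // Returns null on success, or the message of the NotSerializableException
    // (the name of the offending class) on failure.
    static String trySerialize(Object o) throws IOException {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return null;
        } catch (NotSerializableException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) throws IOException {
        // Fails: serializing the function drags in Service and its context field.
        System.out.println(trySerialize(new Service().anonymousFunction()));
        // Succeeds: nothing beyond the function itself is serialized.
        System.out.println(trySerialize(new StandaloneFunction()));
    }
}
```

The first call reports the stand-in context class as not serializable, which mirrors the serialization stack in the question: function, then this$0, then the service, then the context field.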
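The workers therefore do not need a SparkContext or a JavaStreamingContext, so, as you said, serializing JavaStreamingContext makes no sense. With the transient keyword you are simply saying that you do not want the javaStreamingContext field to be serialized, and that is fine for executing the Spark job.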
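To illustrate what transient does during Java serialization, here is a small sketch in plain Java. DriverOnlyContext and Service are hypothetical stand-ins for JavaStreamingContext and SparkServiceImpl, and the name field is added only to have an ordinary field to compare against. The round trip imitates an object being serialized on one side and deserialized on another.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientFieldDemo {

    // Hypothetical stand-in for a non-serializable, driver-only object.
    static class DriverOnlyContext {
    }

    // Hypothetical stand-in for the service, with the context field marked transient.
    static class Service implements Serializable {
        private static final long serialVersionUID = 1L;
        transient DriverOnlyContext context = new DriverOnlyContext();
        String name = "DataAggregator";
    }

    // Serializes the object to bytes and reads it back as a new instance.
    static Service roundTrip(Service original) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(original);
        }
        try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (Service) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Service copy = roundTrip(new Service());
        System.out.println(copy.name);            // prints "DataAggregator"
        System.out.println(copy.context == null); // prints "true"
    }
}
```

The transient field is skipped when writing and comes back as null in the deserialized copy. That is harmless only as long as code running on the workers never touches the field, which matches the point above that the workers do not need the context.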