我试图在Spark中跨JavaRDD映射一个函数,并且在NotSerializableError
调用中一直得到map
。
public class SparkPrunedSet extends AbstractSparkSet {
private final ColumnPruner pruner;
public SparkPrunedSet(@JsonProperty("parent") SparkSet parent, @JsonProperty("pruner") ColumnPruner pruner) {
super(parent);
this.pruner = pruner;
}
public JavaRDD<Record> getRdd(SparkContext context) {
JavaRDD<Record> rdd = getParent().getRdd(context);
Function<Record, Record> mappingFunction = makeRecordTransformer(pruner);
//The line below throws the error
JavaRDD<Record> mappedRdd = rdd.map(mappingFunction);
return mappedRdd;
}
private Function<Record, Record> makeRecordTransformer() {
return new Function<Record, Record>() {
private static final long serialVersionUID = 1L;
@Override
public Record call(Record record) throws Exception {
// Obviously i'd like to do something more useful in here, but this is enough
// to throw the error
return record;
}
};
}
}
当它运行时,我得到:
java.io.NotSerializableException:com.package.SparkPrunedSet
Record
是实现可序列化的接口,而MapRecord
是它的实现。存在与此类似的代码,并且可以在代码库中工作,除了它使用的是rdd.filter
。我已经阅读了大多数其他有关堆栈溢出的内容,但似乎都没有帮助。我认为这可能与序列化SparkPrunedSet
的麻烦有关(尽管我不明白为什么它甚至需要这样做),因此我将其上的所有字段都设置为transient
,但这并没有帮助要么。有人有什么想法吗? 最佳答案
实际上,您为转换创建的Function
是SparkPrunedSet
的(匿名)内部类。因此,该函数的每个实例都有对创建它的SparkPrunedSet
对象的隐式引用。
因此,对其进行序列化将需要对SparkPrunedSet
进行序列化。