我试图在Spark中跨JavaRDD映射一个函数,并且在NotSerializableError调用中一直得到map

public class SparkPrunedSet extends AbstractSparkSet {

    private final ColumnPruner pruner;

    public SparkPrunedSet(@JsonProperty("parent") SparkSet parent, @JsonProperty("pruner") ColumnPruner     pruner) {
        super(parent);
        this.pruner = pruner;
    }

    public JavaRDD<Record> getRdd(SparkContext context) {
        JavaRDD<Record> rdd = getParent().getRdd(context);
        Function<Record, Record> mappingFunction = makeRecordTransformer(pruner);

        //The line below throws the error
        JavaRDD<Record> mappedRdd = rdd.map(mappingFunction);
        return mappedRdd;
    }

    private Function<Record, Record> makeRecordTransformer() {
        return new Function<Record, Record>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Record call(Record record) throws Exception {
                // Obviously i'd like to do something more useful in here, but this is enough
                // to throw the error
                return record;
            }
        };
    }
}



当它运行时,我得到:
java.io.NotSerializableException:com.package.SparkPrunedSet

Record是实现可序列化的接口,而MapRecord是它的实现。存在与此类似的代码,并且可以在代码库中工作,除了它使用的是rdd.filter。我已经阅读了大多数其他有关堆栈溢出的内容,但似乎都没有帮助。我认为这可能与序列化SparkPrunedSet的麻烦有关(尽管我不明白为什么它甚至需要这样做),因此我将其上的所有字段都设置为transient,但这并没有帮助要么。有人有什么想法吗?

最佳答案

实际上,您为转换创建的FunctionSparkPrunedSet的(匿名)内部类。因此,该函数的每个实例都有对创建它的SparkPrunedSet对象的隐式引用。

因此,对其进行序列化将需要对SparkPrunedSet进行序列化。

10-08 01:37