以下是当我尝试将作业分派给执行者时导致java.io.NotSerializableException的代码。

    JavaRDD<Row> rddToWrite = dataToWrite.toJavaRDD();
    JavaRDD<String> stringRdd = rddToWrite.map(new Function<Row, String>() {

        /**
         * Serial Version Id
         */
        private static final long serialVersionUID = 6766320395808127072L;

        @Override
        public String call(Row row) throws Exception {
            return row.mkString(dataFormat.getDelimiter());
        }
    });


但是,当我执行以下操作时,任务已成功序列化:

JavaRDD<Row> rddToWrite = dataToWrite.toJavaRDD();
List<String> dataList = rddToWrite.collect().stream().parallel()
                           .map(row -> row.mkString(dataFormat.getDelimiter()))
                           .collect(Collectors.<String>toList());
JavaSparkContext javaSparkContext = new JavaSparkContext(sessionContext.getSparkContext());
JavaRDD<String> stringRDD = javaSparkContext.parallelize(dataList);


谁能帮我指出我在这里做错了什么吗?

编辑:
dataFormat是类中包含此代码的函数编写的私有成员字段。它是DataFormat类的对象,该类定义了两个字段,即spark数据格式(例如“ com.databricks.spark.csv”)和分隔符(例如“ \ t”)。

最佳答案

new Function ...创建的匿名类需要对封闭实例的引用,而对函数进行序列化需要对封闭实例进行序列化,包括dataFormat和所有其他字段。如果该类未标记为Serializable或具有任何不可序列化的非transient字段,则该类将无效。即使这样做,它在静音方面的表现也比必要的要差。

不幸的是,要完全解决此问题,您需要创建一个命名的静态内部类(或只是一个单独的类),它甚至不能是本地的(因为匿名和local classes in Java都不能是静态的):

static class MyFunction extends Function<Row, String> {
    private String delimiter;
    private static final long serialVersionUID = 6766320395808127072L;

    MyFunction(String delimiter) {
        this.delimiter = delimiter;
    }

    @Override
    public String call(Row row) throws Exception {
        return row.mkString(delimiter);
    }
}


接着

JavaRDD<String> stringRdd = rddToWrite.map(new MyFunction(dataFormat.getDelimiter()));

关于java - 映射JavaRDD时获取java.io.NotSerializableException,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/40838697/

10-10 19:44