以下是当我尝试将作业分派给执行者时导致java.io.NotSerializableException的代码。
JavaRDD<Row> rddToWrite = dataToWrite.toJavaRDD();
JavaRDD<String> stringRdd = rddToWrite.map(new Function<Row, String>() {
/**
* Serial Version Id
*/
private static final long serialVersionUID = 6766320395808127072L;
@Override
public String call(Row row) throws Exception {
return row.mkString(dataFormat.getDelimiter());
}
});
但是,当我执行以下操作时,任务已成功序列化:
JavaRDD<Row> rddToWrite = dataToWrite.toJavaRDD();
List<String> dataList = rddToWrite.collect().stream().parallel()
.map(row -> row.mkString(dataFormat.getDelimiter()))
.collect(Collectors.<String>toList());
JavaSparkContext javaSparkContext = new JavaSparkContext(sessionContext.getSparkContext());
JavaRDD<String> stringRDD = javaSparkContext.parallelize(dataList);
谁能帮我指出我在这里做错了什么吗?
编辑:
dataFormat是类中包含此代码的函数编写的私有成员字段。它是DataFormat类的对象,该类定义了两个字段,即spark数据格式(例如“ com.databricks.spark.csv”)和分隔符(例如“ \ t”)。
最佳答案
由new Function ...
创建的匿名类需要对封闭实例的引用,而对函数进行序列化需要对封闭实例进行序列化,包括dataFormat
和所有其他字段。如果该类未标记为Serializable
或具有任何不可序列化的非transient
字段,则该类将无效。即使这样做,它在静音方面的表现也比必要的要差。
不幸的是,要完全解决此问题,您需要创建一个命名的静态内部类(或只是一个单独的类),它甚至不能是本地的(因为匿名和local classes in Java都不能是静态的):
static class MyFunction extends Function<Row, String> {
private String delimiter;
private static final long serialVersionUID = 6766320395808127072L;
MyFunction(String delimiter) {
this.delimiter = delimiter;
}
@Override
public String call(Row row) throws Exception {
return row.mkString(delimiter);
}
}
接着
JavaRDD<String> stringRdd = rddToWrite.map(new MyFunction(dataFormat.getDelimiter()));
关于java - 映射JavaRDD时获取java.io.NotSerializableException,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/40838697/