考虑以下功能:
public void execute4() {
File filePath = new File(filePathData);
File[] files = filePath.listFiles((File filePathData) -> filePathData.getName().endsWith("CDR"));
List<CDR> cdrs = new ArrayList<CDR>();
Arrays.asList(files).parallelStream().forEach(file -> readCDRP(cdrs, file));
cdrs.sort(cdrsorter);
}
它读取包含CDR的文件列表并执行readCDRP(),这是这样的:
private void readCDRP(List<CDR> cdrs, File file) {
final CDR cdr = new CDR(file.getName());
try (BufferedReader bfr = new BufferedReader(new FileReader(file))) {
List<String> lines = bfr.lines().collect(Collectors.toList());
lines.parallelStream().forEach(e -> {
String[] data = e.split(",", -1);
CDREntry entry = new CDREntry(file.getName());
for (int i = 0; i < data.length; i++) {
entry.setField(i, data[i]);
}
cdr.addEntry(entry);
});
if (cdr != null) {
cdrs.add(cdr);
}
} catch (IOException e) {
e.printStackTrace();
}
}
我观察到的是,偶尔(并非一直),我在行上的readCDRP函数上遇到ArrayIndexNotBound异常(这很尴尬,因为cdr的列表是ArrayList()):
cdr.addEntry(entry);
要么
在execute4()的最后一行应用排序。
我认为问题在于,execute4中的第一个parallelStream与readCDRP()中的第二个parallelStream执行不在内存中的单独空间中,并且似乎也错误地共享了数据。我无法确认使用“似乎”一词,这只是一个h子。
问题是:
从JDK8的角度来看,我的代码是否容易出错?
是否有使用相同流程的解决方法,例如使用CountDownLatch?
是ForkJoinPool的限制吗?
感谢您的回复。
编辑(1):
addEntry是类本身的一部分:
class CDR {
public final String fileName;
private final List<CDREntry> entries = new ArrayList<CDREntry>();
public CDR(String fileName) {
super();
this.fileName = fileName;
}
public List<CDREntry> getEntries() {
return entries;
}
public List<CDREntry> addEntry(CDREntry e) {
entries.add(e);
return entries;
}
public String getFileName() {
return this.fileName;
}
}
最佳答案
当以函数式风格开始编程时,您应该首选可以通过构造(或可能使用生成器模式或某些工厂方法)完全创建的不可变对象。因此,您的CDREntry
类可能如下所示:
class CDREntry {
private final String[] fields;
private final String name;
public CDREntry(String name, String[] fields) {
this.name = name;
this.fields = fields;
}
// Add getters and whatever
}
您的
CDR
类可能如下所示:class CDR {
private final String fileName;
private final List<CDREntry> entries;
public CDR(String fileName, List<CDREntry> entries) {
this.fileName = fileName;
this.entries = entries;
}
public List<CDREntry> getEntries() {
return entries;
}
public String getFileName() {
return this.fileName;
}
}
有了这样的课程,事情变得容易了。其余代码可以这样重写:
public void execute4() {
File filePath = new File(filePathData);
File[] files = filePath.listFiles((File data, String name) ->
data.getName().endsWith("CDR")); // fixed this line: it had compilation error
List<CDR> cdrs = Arrays.stream(files).parallel()
.map(this::readCDRP).sorted(cdrsorter)
.collect(Collectors.toList());
}
private CDR readCDRP(File file) {
try (BufferedReader bfr = new BufferedReader(new FileReader(file))) {
// I'm not sure that collecting lines into list
// before main processing was actually necessary
return bfr.lines().parallelStream()
.map(e -> new CDREntry(file.getName(), e.split(",", -1)))
.collect(Collectors.collectingAndThen(
Collectors.toList(), list -> new CDR(file.getName(), list)));
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
通常请记住,
forEach
通常不是解决任务的最干净的方法。将流集成到旧代码中时可能会有所帮助,但通常应避免使用。