考虑以下功能:

public void execute4() {
        File filePath = new File(filePathData);
        File[] files = filePath.listFiles((File filePathData) -> filePathData.getName().endsWith("CDR"));
        List<CDR> cdrs = new ArrayList<CDR>();
        Arrays.asList(files).parallelStream().forEach(file -> readCDRP(cdrs, file));
        cdrs.sort(cdrsorter);
    }


它读取包含CDR的文件列表并执行readCDRP(),这是这样的:

private void readCDRP(List<CDR> cdrs, File file) {
    final CDR cdr = new CDR(file.getName());
    try (BufferedReader bfr = new BufferedReader(new FileReader(file))) {
        List<String> lines = bfr.lines().collect(Collectors.toList());
        lines.parallelStream().forEach(e -> {
            String[] data = e.split(",", -1);
            CDREntry entry = new CDREntry(file.getName());
            for (int i = 0; i < data.length; i++) {
                entry.setField(i, data[i]);
            }
            cdr.addEntry(entry);
        });

        if (cdr != null) {
            cdrs.add(cdr);
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
}


我观察到的是,偶尔(并非一直),我在行上的readCDRP函数上遇到ArrayIndexNotBound异常(这很尴尬,因为cdr的列表是ArrayList()):

cdr.addEntry(entry);


要么
在execute4()的最后一行应用排序。

我认为问题在于,execute4中的第一个parallelStream与readCDRP()中的第二个parallelStream执行不在内存中的单独空间中,并且似乎也错误地共享了数据。我无法确认使用“似乎”一词,这只是一个h子。

问题是:
从JDK8的角度来看,我的代码是否容易出错?
是否有使用相同流程的解决方法,例如使用CountDownLatch?
是ForkJoinPool的限制吗?

感谢您的回复。

编辑(1):
addEntry是类本身的一部分:

class CDR {
        public final String fileName;
        private final List<CDREntry> entries = new ArrayList<CDREntry>();

        public CDR(String fileName) {
            super();
            this.fileName = fileName;
        }

        public List<CDREntry> getEntries() {
            return entries;
        }

        public List<CDREntry> addEntry(CDREntry e) {
            entries.add(e);
            return entries;
        }

        public String getFileName() {
            return this.fileName;
        }
    }

最佳答案

当以函数式风格开始编程时,您应该首选可以通过构造(或可能使用生成器模式或某些工厂方法)完全创建的不可变对象。因此,您的CDREntry类可能如下所示:

class CDREntry {
    private final String[] fields;
    private final String name;

    public CDREntry(String name, String[] fields) {
        this.name = name;
        this.fields = fields;
    }
    // Add getters and whatever
}


您的CDR类可能如下所示:

class CDR {
    private final String fileName;
    private final List<CDREntry> entries;

    public CDR(String fileName, List<CDREntry> entries) {
        this.fileName = fileName;
        this.entries = entries;
    }

    public List<CDREntry> getEntries() {
        return entries;
    }

    public String getFileName() {
        return this.fileName;
    }
}


有了这样的课程,事情变得容易了。其余代码可以这样重写:

public void execute4() {
    File filePath = new File(filePathData);
    File[] files = filePath.listFiles((File data, String name) ->
             data.getName().endsWith("CDR")); // fixed this line: it had compilation error
    List<CDR> cdrs = Arrays.stream(files).parallel()
            .map(this::readCDRP).sorted(cdrsorter)
            .collect(Collectors.toList());
}

private CDR readCDRP(File file) {
    try (BufferedReader bfr = new BufferedReader(new FileReader(file))) {
        // I'm not sure that collecting lines into list
        // before main processing was actually necessary
        return bfr.lines().parallelStream()
                .map(e -> new CDREntry(file.getName(), e.split(",", -1)))
                .collect(Collectors.collectingAndThen(
                        Collectors.toList(), list -> new CDR(file.getName(), list)));
    } catch (IOException e) {
        throw new UncheckedIOException(e);
    }
}


通常请记住,forEach通常不是解决任务的最干净的方法。将流集成到旧代码中时可能会有所帮助,但通常应避免使用。

07-26 02:58