I am retrieving large chunks of data from a database and writing that data somewhere else. To avoid long processing times, I am trying to do the writing with a parallel stream. When I run it as a sequential stream, it works perfectly. When I switch it to parallel, the behavior is strange: it prints the same object multiple times (more than 10 times).

```java
@PostConstruct
public void retrieveAllTypeRecords() throws SQLException {
    logger.info("Retrieve batch of Type records.");
    try {
        Stream<TypeRecord> typeQueryAsStream = jdbcStream.getTypeQueryAsStream();
        typeQueryAsStream.forEach((type) -> {
            logger.info("Printing Type with field1: {} and field2: {}.", type.getField1(), type.getField2()); //the same object gets printed here multiple times
            //write this object somewhere else
        });
        logger.info("Completed full retrieval of Type data.");
    } catch (Exception e) {
        logger.error("error: " + e);
    }
}

public Stream<TypeRecord> getTypeQueryAsStream() throws SQLException {
    String sql = typeRepository.getQueryAllTypesRecords(); //retrieves SQL query in String format
    TypeMapper typeMapper = new TypeMapper();
    JdbcStream.StreamableQuery query = jdbcStream.streamableQuery(sql);
    Stream<TypeRecord> stream = query.stream()
            .map(row -> {
                return typeMapper.mapRow(row); //maps columns values to object values
            });
    return stream;
}

public class StreamableQuery implements Closeable {
    (...)
    public Stream<SqlRow> stream() throws SQLException {
        final SqlRowSet rowSet = new ResultSetWrappingSqlRowSet(preparedStatement.executeQuery());
        final SqlRow sqlRow = new SqlRowAdapter(rowSet);

        Supplier<Spliterator<SqlRow>> supplier = () -> Spliterators.spliteratorUnknownSize(new Iterator<SqlRow>() {
            @Override
            public boolean hasNext() {
                return !rowSet.isLast();
            }

            @Override
            public SqlRow next() {
                if (!rowSet.next()) {
                    throw new NoSuchElementException();
                }
                return sqlRow;
            }
        }, Spliterator.CONCURRENT);
        return StreamSupport.stream(supplier, Spliterator.CONCURRENT, true); //this boolean sets the stream as parallel
    }
}
```

I also tried `typeQueryAsStream.parallel().forEach((type) -> ...)`, but the result is the same.

Sample output:

```
[ForkJoinPool.commonPool-worker-1] INFO TypeService - Saving Type with field1: L6797 and field2: P1433.
[ForkJoinPool.commonPool-worker-1] INFO TypeService - Saving Type with field1: L6797 and field2: P1433.
[main] INFO TypeService - Saving Type with field1: L6797 and field2: P1433.
[ForkJoinPool.commonPool-worker-1] INFO TypeService - Saving Type with field1: L6797 and field2: P1433.
```

Best answer

Well, look at your code:

```java
final SqlRow sqlRow = new SqlRowAdapter(rowSet);

Supplier<Spliterator<SqlRow>> supplier = () -> Spliterators.spliteratorUnknownSize(new Iterator<SqlRow>() {
    …
    @Override
    public SqlRow next() {
        if (!rowSet.next()) {
            throw new NoSuchElementException();
        }
        return sqlRow;
    }
}, Spliterator.CONCURRENT);
```

You are returning the same object every time. You get the intended effect only because calling `rowSet.next()` implicitly modifies the state of that object. This obviously cannot work when multiple threads try to access that single object at the same time. Even buffering some items in order to hand them over to another thread causes trouble. For the same reason, this kind of interference breaks sequential streams too, as soon as stateful intermediate operations like `sorted` or `distinct` are involved.

Assuming that `typeMapper.mapRow(row)` produces an actual data item that does not interfere with other data items, you should integrate this step into the stream source to create a valid stream:

```java
public Stream<TypeRecord> stream(TypeMapper typeMapper) throws SQLException {
    SqlRowSet rowSet = new ResultSetWrappingSqlRowSet(preparedStatement.executeQuery());
    SqlRow sqlRow = new SqlRowAdapter(rowSet);

    Spliterator<TypeRecord> sp = new Spliterators.AbstractSpliterator<TypeRecord>(
            Long.MAX_VALUE, Spliterator.CONCURRENT | Spliterator.ORDERED) {
        @Override
        public boolean tryAdvance(Consumer<? super TypeRecord> action) {
            if (!rowSet.next()) return false;
            // map the current row to an independent TypeRecord before the row set moves on
            action.accept(typeMapper.mapRow(sqlRow));
            return true;
        }
    };
    return StreamSupport.stream(sp, true); //this boolean sets the stream as parallel
}
```

Note that for many use cases like this one, implementing a `Spliterator` is easier than implementing an `Iterator` (which has to be wrapped via `spliteratorUnknownSize` anyway). There is also no need to wrap this instance into a `Supplier`.

As a final note, the current implementation does not perform well for streams of unknown size, because it treats `Long.MAX_VALUE` as a very large number, ignoring the "unknown" semantics the specification assigns to it. Providing an estimated size is very beneficial for parallel performance, and the estimate does not need to be precise. In fact, with the current implementation, even a completely made-up number like 1000 may perform better than using `Long.MAX_VALUE` to signal an entirely unknown size.

Regarding "java-8 - Parallel stream duplicates", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/44161841/
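The answer's reasoning can be reproduced without a database. The following is a minimal sketch under stated assumptions: `Cursor` and `RowView` are hypothetical stand-ins for `SqlRowSet` and `SqlRowAdapter` (they are not Spring classes), and the size estimate of 1000 is arbitrary, in the spirit of the answer's last paragraph. The first part shows that merely buffering the shared row object (here, collecting it into a list) yields duplicates even in a sequential stream; the second part copies each value out inside `tryAdvance`, which gives a correct parallel stream. The `Spliterator` contract says a spliterator is only used by one thread at a time, so the cursor is never advanced concurrently.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class SharedRowDemo {

    /** Hypothetical stand-in for a SqlRowSet: one mutable cursor over fixed data. */
    static final class Cursor {
        private final String[] data;
        private int pos = -1;

        Cursor(String... data) { this.data = data; }

        boolean hasNextRow() { return pos + 1 < data.length; }
        boolean next() { return ++pos < data.length; }
        String current() { return data[pos]; }
    }

    /** Hypothetical stand-in for SqlRowAdapter: always reads the cursor's *current* row. */
    static final class RowView {
        private final Cursor cursor;
        RowView(Cursor cursor) { this.cursor = cursor; }
        String value() { return cursor.current(); }
    }

    /** The question's pattern: next() advances the cursor but returns the same RowView each time. */
    static Stream<RowView> sharedRowStream(Cursor cursor) {
        RowView view = new RowView(cursor);
        Iterator<RowView> it = new Iterator<RowView>() {
            @Override public boolean hasNext() { return cursor.hasNextRow(); }
            @Override public RowView next() {
                if (!cursor.next()) throw new NoSuchElementException();
                return view;
            }
        };
        return StreamSupport.stream(Spliterators.spliteratorUnknownSize(it, Spliterator.ORDERED), false);
    }

    /** The answer's fix: turn each row into an independent value inside tryAdvance. */
    static Stream<String> mappedRowStream(Cursor cursor, boolean parallel) {
        // 1000 is a made-up estimate; without the SIZED flag it is only a hint for splitting.
        Spliterator<String> sp = new Spliterators.AbstractSpliterator<String>(
                1000, Spliterator.ORDERED | Spliterator.NONNULL) {
            @Override
            public boolean tryAdvance(Consumer<? super String> action) {
                if (!cursor.next()) return false;
                action.accept(cursor.current()); // value is copied out before the cursor moves again
                return true;
            }
        };
        return StreamSupport.stream(sp, parallel);
    }

    public static void main(String[] args) {
        // Buffering the shared RowView exposes the bug even in a sequential stream.
        List<String> buffered = sharedRowStream(new Cursor("a", "b", "c"))
                .collect(Collectors.toList())
                .stream()
                .map(RowView::value)
                .collect(Collectors.toList());
        System.out.println(buffered); // [c, c, c]

        String[] data = new String[5000];
        for (int i = 0; i < data.length; i++) data[i] = "row" + i;
        List<String> mapped = mappedRowStream(new Cursor(data), true).collect(Collectors.toList());
        System.out.println(mapped.size() + " " + mapped.stream().distinct().count()); // 5000 5000
    }
}
```

Because the spliterator is `ORDERED`, collecting the parallel stream into a list also preserves the original row order. In the real code, the equivalent of `cursor.current()` is `typeMapper.mapRow(sqlRow)`, which is only safe if the mapper builds a fresh object from the current row's values.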
10-13 04:36