我正在用Java9反应流和RxJava2测试这些水域。我对这两种方法都没有真正的偏爱,但是我正在寻找有关可能的指导。


我正在创建预配置的订户数量,如下所示:

for(int i = 0; i<MAX_SUBSCRIBERS; i++) {
     System.out.println("Creating subscriber: " + i);
     publisher.subscribe(new MySubscriber<>(i + "-subscriber"));
}

我正在从目录中读取文件列表,以便同时上传到某些第三方系统。

Stream<Path> paths = Files.list(Paths.get("/my/dir/with/files"));
paths
.filter((Files::isRegularFile))
.forEach(pathName -> publisher.submit(pathName.toString()));



我收到以下输出:

    0-subscriber: /my/dir/with/files/test0.txt received in onNext
    0-subscriber: /my/dir/with/files/test1.txt received in onNext
    1-subscriber: /my/dir/with/files/test0.txt received in onNext
    1-subscriber: /my/dir/with/files/test1.txt received in onNext


理想情况下,我们应该看到以下行为。每个订户应在一个唯一的文件上执行工作。

    0-subscriber: /my/dir/with/files/test0.txt received in onNext
    1-subscriber: /my/dir/with/files/test1.txt received in onNext


这可能吗?任何提示都很棒!

最佳答案

Java 9 Flow API由4个接口和SubmissionPublisher类组成,该类将每个提交的值分派给其所有Subscriber。当前没有支持您的数据流的JDK工具。

相比之下,RxJava是一个丰富的流利的库,具有数百个运算符,您可以在其中执行并行处理而无需复制:

    ParallelFlowable<Path> pf =
            Flowable.<Path, Stream<Path>>using(
                () -> Files.list(Paths.get("/my/dir/with/files")),
                files -> Flowable.fromIterable((Iterable<Path>)() -> files.iterator()),
                AutoCloseable::close
            )
            .parallel(2)
            .runOn(Schedulers.computation())
            .filter(Files::isRegularFile);

pf.subscribe(new Subscriber[] {
    new MySubscriber<>("0-subscriber"),
    new MySubscriber<>("1-subscriber"),
});

08-17 02:03