我正在用Java9反应流和RxJava2测试这些水域。我对这两种方法都没有真正的偏爱,但是我正在寻找有关可能的指导。
我正在创建预配置的订户数量,如下所示:
for(int i = 0; i<MAX_SUBSCRIBERS; i++) {
System.out.println("Creating subscriber: " + i);
publisher.subscribe(new MySubscriber<>(i + "-subscriber"));
}
我正在从目录中读取文件列表,以便同时上传到某些第三方系统。
Stream<Path> paths = Files.list(Paths.get("/my/dir/with/files"));
paths
.filter((Files::isRegularFile))
.forEach(pathName -> publisher.submit(pathName.toString()));
我收到以下输出:
0-subscriber: /my/dir/with/files/test0.txt received in onNext
0-subscriber: /my/dir/with/files/test1.txt received in onNext
1-subscriber: /my/dir/with/files/test0.txt received in onNext
1-subscriber: /my/dir/with/files/test1.txt received in onNext
理想情况下,我们应该看到以下行为。每个订户应在一个唯一的文件上执行工作。
0-subscriber: /my/dir/with/files/test0.txt received in onNext
1-subscriber: /my/dir/with/files/test1.txt received in onNext
这可能吗?任何提示都很棒!
最佳答案
Java 9 Flow API由4个接口和SubmissionPublisher
类组成,该类将每个提交的值分派给其所有Subscriber
。当前没有支持您的数据流的JDK工具。
相比之下,RxJava是一个丰富的流利的库,具有数百个运算符,您可以在其中执行并行处理而无需复制:
ParallelFlowable<Path> pf =
Flowable.<Path, Stream<Path>>using(
() -> Files.list(Paths.get("/my/dir/with/files")),
files -> Flowable.fromIterable((Iterable<Path>)() -> files.iterator()),
AutoCloseable::close
)
.parallel(2)
.runOn(Schedulers.computation())
.filter(Files::isRegularFile);
pf.subscribe(new Subscriber[] {
new MySubscriber<>("0-subscriber"),
new MySubscriber<>("1-subscriber"),
});