我有一个任务要合并两个可观察对象-一个是从数据库存在的有限可观察对象(称为databaseStream),另一个是从队列(queueStream)存在的无限可观察对象。当客户端订阅时,我需要立即订阅queueStream,但是要缓冲项目,直到databaseStream将完全完成。发生这种情况时,我要发出queueStream的所有缓冲项目。之后,必须立即发出queueStream中的所有项目。使用RxJava 2有什么方便的方法吗?
最佳答案
找到灵感here
Flowable<Foo> queueStream = queueStream()
.subscribeOn(Schedulers.newThread());
Flowable<Foo> databaseStream = databaseStream()
.subscribeOn(Schedulers.newThread());
Flowable.concatEager(Arrays.asList(databaseStream, queueStream), 2 , 1000)
.distinct(identityFunction())
.blockingSubscribe(System.err::println);