我想在RxJava中实现相当简单的DAG。
我们有一个项目来源:Observable<String> itemsObservable = Observable.fromIterable(items)
接下来,我想有一个可以订阅itemsObservable
并允许多个订阅者订阅的处理器。
所以我创建了:PublishProcessor<String> itemsProccessor = PublishProcessor.create();
不幸的是,这是不可能的:itemsObservable.subscribe(itemsProccessor);
为什么?什么是实现这种DAG的合适API?
这是一个演示图:
这是我(失败的)尝试实现这种DAG的尝试:
List<String> items = Arrays.asList("a", "b", "c");
Flowable<String> source = Flowable.fromIterable(items);
PublishProcessor<String> processor = PublishProcessor.create();
processor.doOnNext(s -> s.toUpperCase());
processor.subscribe(System.out::println);
processor.subscribe(System.out::println);
source.subscribe(processor);
最佳答案
这是因为PublishProcessor
实现Subscriber
而Observable
的subscription方法接受Observer
。您可以将itemsObservable
转换为Flowable
,它将完成工作。
Observable<String> items = Observable.fromIterable(Arrays.asList("a","b"));
PublishProcessor<String> processor = PublishProcessor.create();
items.toFlowable(BackpressureStrategy.BUFFER)
.subscribe(processor);