我想在RxJava中实现相当简单的DAG。

我们有一个项目来源:

Observable<String> itemsObservable = Observable.fromIterable(items)

接下来,我想有一个可以订阅itemsObservable并允许多个订阅者订阅的处理器。

所以我创建了:
PublishProcessor<String> itemsProccessor = PublishProcessor.create();

不幸的是,这是不可能的:
itemsObservable.subscribe(itemsProccessor);

为什么?什么是实现这种DAG的合适API?

这是一个演示图:

java - RxJava 2:为什么PublishProcessor不能订阅Observable?-LMLPHP

这是我(失败的)尝试实现这种DAG的尝试:

List<String> items = Arrays.asList("a", "b", "c");
Flowable<String> source = Flowable.fromIterable(items);

PublishProcessor<String> processor = PublishProcessor.create();
processor.doOnNext(s -> s.toUpperCase());

processor.subscribe(System.out::println);
processor.subscribe(System.out::println);
source.subscribe(processor);

最佳答案

这是因为PublishProcessor实现SubscriberObservable的subscription方法接受Observer。您可以将itemsObservable转换为Flowable,它将完成工作。

    Observable<String> items = Observable.fromIterable(Arrays.asList("a","b"));
    PublishProcessor<String> processor = PublishProcessor.create();
    items.toFlowable(BackpressureStrategy.BUFFER)
            .subscribe(processor);

07-24 09:21