我目前正在使用rx-java 2,并且有一个用例,其中单个Camel Route订户需要使用多个Observable。
以此解决方案作为参考,我有一个部分可行的解决方案。 RxJava - Merged Observable that accepts more Observables at any time?

我打算使用将订阅到一个骆驼反应式流订阅者的PublishProcessor<T>,然后维护一个ConcurrentHashSet<Flowable<T>>,在其中可以动态添加新的Observable。
我目前无法使用PublishProcessor添加/管理Flowable<T>实例吗?
我真的是rx java的新手,所以对您的帮助不胜感激!这是我到目前为止所拥有的:

PublishProcessor<T> publishProcessor = PublishProcessor.create();
CamelReactiveStreamsService camelReactiveStreamsService =
CamelReactiveStreams.get(camelContext);
Subscriber<T> subscriber =
     camelReactiveStreamsService.streamSubscriber("t-class",T.class);
}
Set<Flowable<T>> flowableSet = Collections.newSetFromMap(new ConcurrentHashMap<Flowable<T>, Boolean>());

public void add(Flowable<T> flowableOrder){
    flowableSet.add(flowableOrder);
}

public void subscribe(){
    publishProcessor.flatMap(x -> flowableSet.forEach(// TODO)
    }) .subscribe(subscriber);
}

最佳答案

您可以有一个Processor并订阅多个可观察的流。您需要在添加和删除可观察对象时通过添加和删除订阅来管理订阅。

PublishProcessor<T> publishProcessor = PublishProcessor.create();

Map<Flowable<T>, Disposable> subscriptions = new ConcurrentHashMap<>();

void addObservable( Flowable<T> flowable ) {
  subscriptions.computeIfAbsent( flowable, fkey ->
    flowable.subscribe( publishProcessor ) );
}
void removeObservable( Flowable<T> flowable ) {
  Disposable d = subscriptions.remove( flowable );
  if ( d != null ) {
    d.dispose();
  }
}
void close() {
  for ( Disposable d: subscriptions.values() ) {
    d.dispose();
  }
}


将flowable用作地图的键,然后添加或删除订阅。

关于java - RxJava -2 Observables随时可以接受更多Observables吗?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/50139227/

10-09 05:33