我确实有这种行为

Subscriber OnComplete called twice

(根据http://reactivex.io/documentation/subject.html预期)

但是在我的情况下:事情是这样的:

我有一个AudioRecordingService,它显示一条通知,其中有一些选项供用户保存或删除进行中的录音,效果很好。但是我试图使用RxAndroid,通知的保存按钮将触发。

RxEventBus.getInstance().postEvent(new RxEvents(RxEventsEnum.AUDIO_STOP_AND_SAVE));


哪个触发

bindUntilActivitySpecificEvent(RxEventBus.getInstance().forEventType(RxEvents.class),ActivityEvent.DESTROY).subscribeOn(
        AndroidSchedulers.mainThread()).subscribe(new Action1<RxEvents>() {
      @Override public void call(RxEvents rxEvents) {
        onEvent(rxEvents);
      }
    });


并在基于rxEvents对象数据的onEvent(rxEvent)中适当保存和​​存储记录。第一次尝试,效果很好,但随后的时间

@Override public void call(RxEvents rxEvents) {
            onEvent(rxEvents);
          }


被多次调用,例如第二次发布事件,此回调被调用两次,第三次调用三次,依此类推(实际上是PublishSubject所做的)。我不希望这种行为,我希望Rx能够发布事件,并且仅接收发布的最新事件,而别无其他。

这是我的其他相关代码

protected final <T> Observable<T> bindUntilActivitySpecificEvent(Observable<T> observable,
      ActivityEvent event) {
    return observable.compose(RxLifecycle.<T, ActivityEvent>bindUntilEvent(lifecycle(), event))
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread());
  }


和我的磨坊RxEventBus类的运行:

public class RxEventBus {

  private static final RxEventBus INSTANCE = new RxEventBus();

  public static RxEventBus getInstance() {
    return INSTANCE;
  }

  private RxEventBus() {
  }

  private final Subject<Object, Object> mBus = new SerializedSubject<>(PublishSubject.create());


  public void postEvent(Object event) {
    mBus.onNext(event);
  }

  public <T> Observable<T> forEventType(Class<T> eventType) {
    return mBus.ofType(eventType).observeOn(AndroidSchedulers.mainThread());
  }
}


使用RxAndroid的最佳方法是什么?请注意,我只在寻找RxAndroid解决方案。

最佳答案

每当您在中触发事件时,您都在创建一个新的可观察对象

RxEventBus.getInstance().forEventType(RxEvents.class)


您需要缓存为每种事件类型创建的可观察对象。

09-27 12:42