我想创建一个可观察到的对象,仅当新值与先前的不同时才从基础热可观察对象(在此之前以-1开始)发出值。另外,我希望将最新价值立即发送给新订户。我想出了以下代码:

PublishSubject<Integer> hotObservable = PublishSubject.create();

Observable<Integer> observable = hotObservable
        .startWith(-1)
        .distinctUntilChanged()
        .replay(1)
        .autoConnect(0);


但是,这在用-1发射到新订户的第一个值(总是hotObservable,无论observable在订阅java.lang.IllegalStateException: more produced than requested之前发射什么)之后失败。
有趣的是,当我没有自动连接而是手动订阅时:

Observable<Integer> observable = hotObservable
        .startWith(-1)
        .distinctUntilChanged()
        .replay(1)
        .autoConnect();

observable.subscribe().unsubscribe();


以下订阅者正常工作,接收最后一个值,然后更新。

我无法获得replay(1).autoConnect(0)的工作,感觉好像很想念-为什么要订阅和取消订阅工作,而autoConnect(0)不能?创建这种可观察的正确方法是什么?

除非使用autoConnect(); observable.subscribe().unsubscribe(),否则此测试方法将失败:

Observable<Integer> observable = hotObservable
        .startWith(-1)
        .distinctUntilChanged()
        .replay(1)
        .autoConnect(); // With (0) it fails

observable.subscribe().unsubscribe(); // Needed if we don't auto connnect

hotObservable.onNext(1);
hotObservable.onNext(2);
hotObservable.onNext(3); // I want this value to be received by new subscriber

TestSubscriber<Integer> subscriber = TestSubscriber.create();
observable.subscribe(subscriber);

subscriber.assertNoErrors();
subscriber.assertValues(3);

最佳答案

我在RxJava 1.1.3上的上面的代码没有收到More produced than requested错误。

断言失败的原因是replay在任何订阅者实际请求之前不会从上游请求任何内容。如果TestSubscriber是第一个订阅的,它将触发startWith发出-1,然后切换到PublishSubject,它不保留任何值,因此您将不会收到任何其他信息。

我相信您正在寻找的是BehaviorSubject,它保留了最后一个值,并从新订户的值开始:

BehaviorSubject<Integer> hotObservable = BehaviorSubject.create(-1);

Observable<Integer> observable = hotObservable.distinctUntilChanged();

hotObservable.onNext(1);
hotObservable.onNext(2);
hotObservable.onNext(3);

TestSubscriber<Integer> subscriber = TestSubscriber.create();
observable.subscribe(subscriber);

subscriber.assertNoErrors();
subscriber.assertValue(3);

关于java - 使用`replay`和`autoConnect`时出现“超出要求的产量”异常,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/36544843/

10-11 04:04