我想创建一个可观察到的对象,仅当新值与先前的不同时才从基础热可观察对象(在此之前以-1
开始)发出值。另外,我希望将最新价值立即发送给新订户。我想出了以下代码:
PublishSubject<Integer> hotObservable = PublishSubject.create();
Observable<Integer> observable = hotObservable
.startWith(-1)
.distinctUntilChanged()
.replay(1)
.autoConnect(0);
但是,这在用
-1
发射到新订户的第一个值(总是hotObservable
,无论observable
在订阅java.lang.IllegalStateException: more produced than requested
之前发射什么)之后失败。有趣的是,当我没有自动连接而是手动订阅时:
Observable<Integer> observable = hotObservable
.startWith(-1)
.distinctUntilChanged()
.replay(1)
.autoConnect();
observable.subscribe().unsubscribe();
以下订阅者正常工作,接收最后一个值,然后更新。
我无法获得
replay(1).autoConnect(0)
的工作,感觉好像很想念-为什么要订阅和取消订阅工作,而autoConnect(0)
不能?创建这种可观察的正确方法是什么?除非使用
autoConnect(); observable.subscribe().unsubscribe()
,否则此测试方法将失败:Observable<Integer> observable = hotObservable
.startWith(-1)
.distinctUntilChanged()
.replay(1)
.autoConnect(); // With (0) it fails
observable.subscribe().unsubscribe(); // Needed if we don't auto connnect
hotObservable.onNext(1);
hotObservable.onNext(2);
hotObservable.onNext(3); // I want this value to be received by new subscriber
TestSubscriber<Integer> subscriber = TestSubscriber.create();
observable.subscribe(subscriber);
subscriber.assertNoErrors();
subscriber.assertValues(3);
最佳答案
我在RxJava 1.1.3上的上面的代码没有收到More produced than requested
错误。
断言失败的原因是replay
在任何订阅者实际请求之前不会从上游请求任何内容。如果TestSubscriber
是第一个订阅的,它将触发startWith发出-1,然后切换到PublishSubject,它不保留任何值,因此您将不会收到任何其他信息。
我相信您正在寻找的是BehaviorSubject
,它保留了最后一个值,并从新订户的值开始:
BehaviorSubject<Integer> hotObservable = BehaviorSubject.create(-1);
Observable<Integer> observable = hotObservable.distinctUntilChanged();
hotObservable.onNext(1);
hotObservable.onNext(2);
hotObservable.onNext(3);
TestSubscriber<Integer> subscriber = TestSubscriber.create();
observable.subscribe(subscriber);
subscriber.assertNoErrors();
subscriber.assertValue(3);
关于java - 使用`replay`和`autoConnect`时出现“超出要求的产量”异常,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/36544843/