我有以下代码结构
服务
public Flowable entryFlow()
{
return Flowable.fromIterable(this::getEntries)
}
消费者
void start()
{
disposable = service
.entryFlow()
.observeOn(Schedulers.computation())
.subscribeOn(Schedulers.computation())
.subscribe(
entry -> ...,
this::onError,
this::subscriptionFinished);
}
void stop()
{
disposable.dispose();
}
private void onError(Throwable e)
{
subscriptionFinished();
}
private void subscriptionFinished()
{
//
}
我需要一种方法来阻止 flowable 在调用 stop 方法时获取和发出数据。
通过执行以下操作,我注意到 doOnCancel lambda 并不总是被调用。
void start()
{
disposable = service
.entryFlow()
.observeOn(Schedulers.computation())
.subscribeOn(Schedulers.computation())
.doOnCancel(this::snapshotFinished)
.subscribe(
entry -> ...,
this::onError,
this::subscriptionFinished);
}
void stop()
{
disposable.dispose();
}
替代方案是
volatile stopped;
void start()
{
disposable = service
.entryFlow()
.observeOn(Schedulers.computation())
.subscribeOn(Schedulers.computation())
.takeUntil(x -> stopped)
.subscribe(
entry -> ...,
this::onError,
this::subscriptionFinished);
}
void stop()
{
stopped = true;
}
什么是 start 和 stop 的推荐实现,以便 flowable 停止发射和 onComplete 或类似的方法(doOnCancel 操作?)被调用?
稍后编辑 :
使我的用例更短
调用disposable.dispose就足以阻止flowable从iterable获取数据并发送到源吗?我只有 1 个订阅者,并且无论原因如何,都需要在 flowable 结束时调用 onComplete/onError/other-callback。
其他回调是指 doOnCancel/doFinally 等之一。
谢谢
最佳答案
我建议使用 dispose()
方法。然后只需添加 doOnDispose
来触发您的副作用代码。
关于rx-java - 如何正确停止 rxjava Flowable?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/48754154/