我有以下代码结构

服务

public Flowable entryFlow()
{
    return Flowable.fromIterable(this::getEntries)
}

消费者
void start()
{
    disposable = service
        .entryFlow()
        .observeOn(Schedulers.computation())
        .subscribeOn(Schedulers.computation())
        .subscribe(
            entry -> ...,
            this::onError,
            this::subscriptionFinished);
}

void stop()
{
    disposable.dispose();
}

private void onError(Throwable e)
{
    subscriptionFinished();
}

private void subscriptionFinished()
{
    //
}

我需要一种方法来阻止 flowable 在调用 stop 方法时获取和发出数据。

通过执行以下操作,我注意到 doOnCancel lambda 并不总是被调用。
void start()
{
    disposable = service
        .entryFlow()
        .observeOn(Schedulers.computation())
        .subscribeOn(Schedulers.computation())
        .doOnCancel(this::snapshotFinished)
        .subscribe(
            entry -> ...,
            this::onError,
            this::subscriptionFinished);
}

void stop()
{
    disposable.dispose();
}

替代方案是
volatile stopped;

void start()
{
    disposable = service
        .entryFlow()
        .observeOn(Schedulers.computation())
        .subscribeOn(Schedulers.computation())
        .takeUntil(x -> stopped)
        .subscribe(
            entry -> ...,
            this::onError,
            this::subscriptionFinished);
}

void stop()
{
    stopped = true;
}

什么是 start 和 stop 的推荐实现,以便 flowable 停止发射和 onComplete 或类似的方法(doOnCancel 操作?)被调用?

稍后编辑 :

使我的用例更短

调用disposable.dispose就足以阻止flowable从iterable获取数据并发送到源吗?我只有 1 个订阅者,并且无论原因如何,都需要在 flowable 结束时调用 onComplete/onError/other-callback。

其他回调是指 doOnCancel/doFinally 等之一。

谢谢

最佳答案

我建议使用 dispose() 方法。然后只需添加 doOnDispose 来触发您的副作用代码。

关于rx-java - 如何正确停止 rxjava Flowable?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/48754154/

10-10 10:31