我使用Observable.create()创建一个可观察对象以在调度程序上执行一些工作(例如Schedulers.io(),然后在AndroidSchedulers.mainThread()上返回结果。

val subscription = observable<T> {
        try {
            // perform action synchronously
            it.onNext(action.invoke(context, args))
            it.onCompleted()
        } catch (t: Exception) {
            it.onError(t)
        }
    }.subscribeOn(scheduler)
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(
                    {
                        // handle result here
                        result.set(it)
                    },
                    {
                        // handle error here
                        errorHandler.handleTaskError(model, this, it)
                    },
                    {
                        // notify completed
                        model.completeTask(this)
                    }
            )
action.invoke()内部的操作是同步的,并且可能是阻塞的IO操作。当用户决定取消时,我取消了可观察的订阅:subscription.unsubscribe()
但是,I/O操作不会被中断。是否有任何rx-java API可以中断操作?

最佳答案

当您调用yourSubscription.unsubscribe();时,Rx将调用您的退订代码。

此取消订阅的代码将是Subscription类,您可以在创建add时将其subscriber转换为Observable

Observable<Object> obs = Observable.create(subscriber -> {

      subscriber.add(new Subscription() {
            @Override
            public void unsubscribe() {
                 // perform unsubscription
            }

            @Override
            public boolean isUnsubscribed() {
                return false;
            }
        });
      subscriber.onNext(/**...*/);
      subscriber.onCompleted();
}

因此,如果有办法,可以在unsubscribe方法中中断您的工作。

请注意,当您取消订阅Observable或完成Observable时,将调用unsubscribe方法。 (它将自行退订)

编辑:在帐户中添加vladimir mironov评论

关于rx-java - RxJava : How to interrupt thread on unsubscribe?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/30191402/

10-09 09:26