我使用Observable.create()
创建一个可观察对象以在调度程序上执行一些工作(例如Schedulers.io()
,然后在AndroidSchedulers.mainThread()
上返回结果。
val subscription = observable<T> {
try {
// perform action synchronously
it.onNext(action.invoke(context, args))
it.onCompleted()
} catch (t: Exception) {
it.onError(t)
}
}.subscribeOn(scheduler)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
{
// handle result here
result.set(it)
},
{
// handle error here
errorHandler.handleTaskError(model, this, it)
},
{
// notify completed
model.completeTask(this)
}
)
action.invoke()
内部的操作是同步的,并且可能是阻塞的IO操作。当用户决定取消时,我取消了可观察的订阅:subscription.unsubscribe()
但是,I/O操作不会被中断。是否有任何rx-java API可以中断操作?
最佳答案
当您调用yourSubscription.unsubscribe();
时,Rx将调用您的退订代码。
此取消订阅的代码将是Subscription
类,您可以在创建add
时将其subscriber
转换为Observable
。
Observable<Object> obs = Observable.create(subscriber -> {
subscriber.add(new Subscription() {
@Override
public void unsubscribe() {
// perform unsubscription
}
@Override
public boolean isUnsubscribed() {
return false;
}
});
subscriber.onNext(/**...*/);
subscriber.onCompleted();
}
因此,如果有办法,可以在
unsubscribe
方法中中断您的工作。请注意,当您取消订阅Observable或完成Observable时,将调用unsubscribe方法。 (它将自行退订)
编辑:在帐户中添加vladimir mironov评论
关于rx-java - RxJava : How to interrupt thread on unsubscribe?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/30191402/