有时,当我调试我的应用程序时,我在RxCachedThreadScheduler-1中遇到InterruptedException。这是跟踪:

Fatal Exception: java.lang.InterruptedException
       at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:1991)
       at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2025)
       at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1048)
       at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:776)
       at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1035)

我有一个自定义 View ,在其中我订阅了我的可观察对象,如下所示:

@Override
protected void onAttachedToWindow() {
    super.onAttachedToWindow();

    sub = FollowHandler.getInstance().getObservable()
            .filter(new Func1<FollowEvent, Boolean>() {
                @Override
                public Boolean call(FollowEvent followEvent) {
                    if(followEvent == null || followEvent.user == null
                            || user == null)
                        return false;

                    return followEvent.user.id == user.id;
                }
            })
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<FollowEvent>() {
                @Override
                public void onCompleted() {}

                @Override
                public void onError(Throwable e) {}

                @Override
                public void onNext(FollowEvent followEvent) {
                    reactToThisNiceEvent(followEvent);
                }
            });
}

@Override
protected void onDetachedFromWindow() {
    super.onDetachedFromWindow();

    if(sub != null)
        sub.unsubscribe();
}

这是可观察到的:

eventSubject.asObservable()
        .observeOn(Schedulers.io())
        .doOnNext(new Action1<FollowEvent>() {
            @Override
            public void call(FollowEvent followEvent) {
                if(followEvent != null)
                    doSomethingNice(followEvent);
            }
        })
        .share();

其中eventSubject是一个简单的PublishSubject。
我正在将RxAndroid 1.1.0与RxJava 1.1.0一起使用。

有人知道为什么会这样吗?

最佳答案

我不确定为什么会发生这种情况,但请尝试执行以下操作:

sub = FollowHandler.getInstance().getObservable()
            .filter(...)
            .subscribeOn(Schedulers.io())    //  <<<<<<<<<<
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(...);

另外,我认为您不需要share():
eventSubject.asObservable()
        .doOnNext(...)
        .subscribeOn(Schedulers.io())   // <<<<< subscribeOn instead of observeOn, but actually, you don't need it here...
        .share();     // <<<<< remove it

如上所述,我经常将Subject用作事件总线。而且我从来没有这样的问题。

P.S.
如果您将检查是否取消订阅,则使用onDetachedFromWindow()会更好。我知道在主线程中调用此方法是不可能的,并且无法同时访问此Subscription,但是我认为这是一种好方法:
if(sub != null && !sub.isUnsubscribed())
        sub.unsubscribe();

10-06 04:06