我对浓缩咖啡测试还不熟悉。在我现有的应用程序中,我们使用rxandroid来进行一些联网。我们使用RXBus与应用程序的某些部分进行通信,否则这些部分看起来“不可能”。
我们导入了实现IdlingResourceRxEspresso,因此可以使用rxandroid网络调用。
不幸的是,RxEspresso不允许rxbus工作,因为它是一个“热可观测”并且从不关闭。所以它抛出android.support.test.espresso.IdlingResourceTimeoutException: Wait for [RxIdlingResource] to become idle timed out
我做了一个小小的android应用程序来证明我的观点。
它有两个活动。第一个显示在RecyclerView中启动时通过网络调用检索的一些项。
当点击它时,它通过RXBus进行通信(我知道这有点过头了,但纯粹是为了证明这一点)。DetailActivity然后显示数据。
我们如何编辑RxEspresso以便它与我们的rxbus一起工作?
rxidlingresource也检查RxEspresso

/**
 * Provides the hooks for both RxJava and Espresso so that Espresso knows when to wait
 * until RxJava subscriptions have completed.
 */

public final class RxIdlingResource extends RxJavaObservableExecutionHook implements IdlingResource {
    public static final String TAG = "RxIdlingResource";

    static LogLevel LOG_LEVEL = NONE;

    private final AtomicInteger subscriptions = new AtomicInteger(0);

    private static RxIdlingResource INSTANCE;

    private ResourceCallback resourceCallback;

    private RxIdlingResource() {
        //private
    }

    public static RxIdlingResource get() {
        if (INSTANCE == null) {
            INSTANCE = new RxIdlingResource();
            Espresso.registerIdlingResources(INSTANCE);
        }
        return INSTANCE;
    }

    /* ======================== */
    /* IdlingResource Overrides */
    /* ======================== */

    @Override
    public String getName() {
        return TAG;
    }

    @Override
    public boolean isIdleNow() {
        int activeSubscriptionCount = subscriptions.get();
        boolean isIdle = activeSubscriptionCount == 0;

        if (LOG_LEVEL.atOrAbove(DEBUG)) {
            Log.d(TAG, "activeSubscriptionCount: " + activeSubscriptionCount);
            Log.d(TAG, "isIdleNow: " + isIdle);
        }

        return isIdle;
    }

    @Override
    public void registerIdleTransitionCallback(ResourceCallback resourceCallback) {
        if (LOG_LEVEL.atOrAbove(DEBUG)) {
            Log.d(TAG, "registerIdleTransitionCallback");
        }
        this.resourceCallback = resourceCallback;
    }

    /* ======================================= */
    /* RxJavaObservableExecutionHook Overrides */
    /* ======================================= */

    @Override
    public <T> Observable.OnSubscribe<T> onSubscribeStart(Observable<? extends T> observableInstance,
                                                          final Observable.OnSubscribe<T> onSubscribe) {
        int activeSubscriptionCount = subscriptions.incrementAndGet();
        if (LOG_LEVEL.atOrAbove(DEBUG)) {
            if (LOG_LEVEL.atOrAbove(VERBOSE)) {
                Log.d(TAG, onSubscribe + " - onSubscribeStart: " + activeSubscriptionCount, new Throwable());
            } else {
                Log.d(TAG, onSubscribe + " - onSubscribeStart: " + activeSubscriptionCount);
            }
        }

        onSubscribe.call(new Subscriber<T>() {
            @Override
            public void onCompleted() {
                onFinally(onSubscribe, "onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                onFinally(onSubscribe, "onError");
            }

            @Override
            public void onNext(T t) {
                //nothing
            }
        });

        return onSubscribe;
    }



    private <T> void onFinally(Observable.OnSubscribe<T> onSubscribe, final String finalizeCaller) {
        int activeSubscriptionCount = subscriptions.decrementAndGet();
        if (LOG_LEVEL.atOrAbove(DEBUG)) {
            Log.d(TAG, onSubscribe + " - " + finalizeCaller + ": " + activeSubscriptionCount);
        }
        if (activeSubscriptionCount == 0) {
            Log.d(TAG, "onTransitionToIdle");
            resourceCallback.onTransitionToIdle();
        }
    }
}

RXBUS
public class RxBus {

    //private final PublishSubject<Object> _bus = PublishSubject.create();

    // If multiple threads are going to emit events to this
    // then it must be made thread-safe like this instead
    private final Subject<Object, Object> _bus = new SerializedSubject<>(PublishSubject.create());

    public void send(Object o) {
        _bus.onNext(o);
    }

    public Observable<Object> toObserverable() {
        return _bus;
    }

    public boolean hasObservers() {
        return _bus.hasObservers();
    }
}

主要活动
public class MainActivity extends AppCompatActivity {

    @Bind(R.id.rv)
    RecyclerView RV;

    private List<NewsItem> newsItems;
    private RecyclerViewAdapter adapter;
    private Observable<List<NewsItem>> newsItemsObservable;
    private CompositeSubscription subscriptions = new CompositeSubscription();
    private RxBus rxBus;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        ButterKnife.bind(this);

        //Subscribe to RxBus
        rxBus = new RxBus();
        subscriptions.add(rxBus.toObserverable()
                .subscribe(new Action1<Object>() {
                    @Override
                    public void call(Object event) {
                        //2.
                        NewsItem myClickNewsItem = (NewsItem) event;
                        startActivity(new Intent(MainActivity.this, DetailActivity.class).putExtra("text", myClickNewsItem.getBodyText()));
                    }
                }));

        //Set the adapter
        adapter = new RecyclerViewAdapter(this);
        //Set onClickListener on the list
        ItemClickSupport.addTo(RV).setOnItemClickListener(new ItemClickSupport.OnItemClickListener() {
            @Override
            public void onItemClicked(RecyclerView recyclerView, int position, View v) {
                //Send the clicked item over the RxBus.
                //Receives it in 2.
                rxBus.send(newsItems.get(position));
            }
        });
        RV.setLayoutManager(new LinearLayoutManager(this));
        RestAdapter retrofit = new RestAdapter.Builder()
                .setEndpoint("http://URL.com/json")
                .build();
        ServiceAPI api = retrofit.create(ServiceAPI.class);
        newsItemsObservable = api.listNewsItems(); //onComplete goes to setNewsItems
    }

    @Override
    protected void onPostCreate(Bundle savedInstanceState) {
        super.onPostCreate(savedInstanceState);
        NewsItemObserver observer = new NewsItemObserver(this);
        newsItemsObservable.delaySubscription(1, TimeUnit.SECONDS).observeOn(AndroidSchedulers.mainThread()).subscribeOn(Schedulers.io()).subscribe(observer);
    }

    public void setNewsItems(List<NewsItem> newsItems) {
        this.newsItems = newsItems;
        adapter.setNewsItems(newsItems);
        RV.setAdapter(adapter);
    }

最佳答案

由于我们没有得到这个问题的更好的答案,我们假设通过rxbus发送的objects是即时的,不需要计入subscriptions.incrementAndGet();
我们只是在这条线之前把物体过滤掉。在我们的例子中,对象属于SerializedSubjectPublishSubject类。
这是我们改变的方法。

@Override
public <T> Observable.OnSubscribe<T> onSubscribeStart(Observable<? extends T> observableInstance, final Observable.OnSubscribe<T> onSubscribe) {

    int activeSubscriptionCount = 0;

    if (observableInstance instanceof SerializedSubject || observableInstance instanceof PublishSubject) {
        Log.d(TAG, "Observable we won't register: " + observableInstance.toString());
    } else {
        activeSubscriptionCount = subscriptions.incrementAndGet();
    }
    if (LOG_LEVEL.atOrAbove(DEBUG)) {
        if (LOG_LEVEL.atOrAbove(VERBOSE)) {
            Log.d(TAG, onSubscribe + " - onSubscribeStart: " + activeSubscriptionCount, new Throwable());
        } else {
            Log.d(TAG, onSubscribe + " - onSubscribeStart: " + activeSubscriptionCount);
        }
    }


    onSubscribe.call(new Subscriber<T>() {
        @Override
        public void onCompleted() {
            onFinally(onSubscribe, "onCompleted");
        }

        @Override
        public void onError(Throwable e) {
            onFinally(onSubscribe, "onError");
        }

        @Override
        public void onNext(T t) {
            Log.d(TAG, "onNext:: " + t.toString());
            //nothing
        }
    });

    return onSubscribe;
}

07-28 03:47