我收到这个随机异常,导致在进行翻新的网络通话时崩溃了我的应用程序,并且正在寻找解决方法的指导

Fatal Exception: java.io.InterruptedIOException: thread interrupted
       at okio.Timeout.throwIfReached(Timeout.java:145)
       at okio.Okio$1.write(Okio.java:76)
       at okio.AsyncTimeout$1.write(AsyncTimeout.java:180)
       at okio.RealBufferedSink.flush(RealBufferedSink.java:216)
       at okhttp3.internal.http2.Http2Writer.flush(Http2Writer.java:121)
       at okhttp3.internal.http2.Http2Connection.newStream(Http2Connection.java:239)
       at okhttp3.internal.http2.Http2Connection.newStream(Http2Connection.java:205)
       at okhttp3.internal.http2.Http2Codec.writeRequestHeaders(Http2Codec.java:111)
       at okhttp3.internal.http.CallServerInterceptor.intercept(CallServerInterceptor.java:50)
       at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
       at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
       at com.happycorp.happy.happyapp.util.Network$1.intercept(Network.java:80)
       at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
       at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
       at com.happycorp.android.commondata.net.RetrofitFactory$CustomHttpMetricsLogger.intercept(RetrofitFactory.java:139)
       at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
       at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:45)
       at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
       at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
       at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)
       at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
       at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
       at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93)
       at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
       at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:125)
       at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
       at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
       at com.happycorp.android.commondata.net.RetrofitFactory$1.intercept(RetrofitFactory.java:83)
       at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
       at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
       at okhttp3.logging.HttpLoggingInterceptor.intercept(HttpLoggingInterceptor.java:212)
       at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
       at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
       at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:200)
       at okhttp3.RealCall.execute(RealCall.java:77)
       at retrofit2.OkHttpCall.execute(OkHttpCall.java:180)
       at retrofit2.adapter.rxjava2.CallExecuteObservable.subscribeActual(CallExecuteObservable.java:41)
       at io.reactivex.Observable.subscribe(Observable.java:10179)
       at retrofit2.adapter.rxjava2.BodyObservable.subscribeActual(BodyObservable.java:34)
       at io.reactivex.Observable.subscribe(Observable.java:10179)
       at io.reactivex.internal.operators.observable.ObservableSingleSingle.subscribeActual(ObservableSingleSingle.java:35)
       at io.reactivex.Single.subscribe(Single.java:2558)
       at io.reactivex.internal.operators.single.SingleToFlowable.subscribeActual(SingleToFlowable.java:37)
       at io.reactivex.Flowable.subscribe(Flowable.java:12218)
       at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.onNext(FlowableFlatMap.java:156)
       at io.reactivex.internal.operators.flowable.FlowableFromIterable$IteratorSubscription.slowPath(FlowableFromIterable.java:238)
       at io.reactivex.internal.operators.flowable.FlowableFromIterable$BaseRangeSubscription.request(FlowableFromIterable.java:123)
       at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.onSubscribe(FlowableFlatMap.java:110)
       at io.reactivex.internal.operators.flowable.FlowableFromIterable.subscribe(FlowableFromIterable.java:68)
       at io.reactivex.internal.operators.flowable.FlowableFromIterable.subscribeActual(FlowableFromIterable.java:46)
       at io.reactivex.Flowable.subscribe(Flowable.java:12218)
       at io.reactivex.internal.operators.flowable.FlowableFlatMap.subscribeActual(FlowableFlatMap.java:52)
       at io.reactivex.Flowable.subscribe(Flowable.java:12218)
       at io.reactivex.internal.operators.flowable.FlowableSubscribeOn$SubscribeOnSubscriber.run(FlowableSubscribeOn.java:82)
       at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:59)
       at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:51)
       at java.util.concurrent.FutureTask.run(FutureTask.java:237)
       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:272)
       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133)
       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607)
       at java.lang.Thread.run(Thread.java:761)

服务接口(interface)如下所示:
@GET("categories")
Single<Category> getCategories();

@GET("categories/{categoryId}")
Single<Category> getCategory(@Path("categoryId") Integer id);

@GET("categories")
Single<Category> getCategoriesByRootCategory(@Query("rootCategoryId") Integer id);

@GET("products")
Single<ProductSearchResult> getProductsBySingleFilter(@Query(SINGLE_FILTER_BASE + "[field]")
        String searchCriteria,
        @Query(SINGLE_FILTER_BASE + "[conditionType]")
                String conditionType,
        @Query(SINGLE_FILTER_BASE + "[value]") String value);

以下是创建okhttp客户端和改造接口(interface)的代码:
public static OkHttpClient getOkHttpClient(@Nullable final File parentCacheDir,
        @NonNull final AuthProvider authProvider,
        @NonNull final ApiEnvironment apiEnvironment,
        @Nullable final Interceptor cacheInterceptor) {

    Cache cache = null;
    if (parentCacheDir != null) {
        File responseCacheDirectory = new File(parentCacheDir, RESPONSE_CACHE_DIRECTORY);
        cache = new Cache(responseCacheDirectory, CACHE_SIZE);
    }

    OkHttpClient.Builder builder = new OkHttpClient.Builder()
            .connectTimeout(NETWORK_REQUEST_TIMEOUT_SECONDS, TimeUnit.SECONDS)
            .readTimeout(NETWORK_REQUEST_TIMEOUT_SECONDS, TimeUnit.SECONDS)
            .writeTimeout(NETWORK_REQUEST_TIMEOUT_SECONDS, TimeUnit.SECONDS)
            .cache(cache);

    HttpLoggingInterceptor loggingInterceptor = new HttpLoggingInterceptor();
    loggingInterceptor.setLevel(HttpLoggingInterceptor.Level.HEADERS);

    builder.addInterceptor(loggingInterceptor);

    builder.addInterceptor(new Interceptor() {
        @Override
        public Response intercept(Chain chain) throws IOException {
            Request.Builder requestBuilder = chain.request().newBuilder();
            requestBuilder.header(KEY_CONTENT_TYPE, APPLICATION_JSON);
            requestBuilder = addAuthHeaders(requestBuilder, apiEnvironment);
            if (apiEnvironment.getDefaultRequestParams() != null) {
                requestBuilder = addDefaultParams(requestBuilder, apiEnvironment);
            }
            return chain.proceed(requestBuilder.build());
        }
    });

    builder.addNetworkInterceptor(new CustomHttpMetricsLogger());

    if (cacheInterceptor != null) {
        builder.addNetworkInterceptor(cacheInterceptor);
    }

    return builder.build();
}

public static Retrofit newInstance(@NonNull final AuthProvider authProvider,
        @NonNull final ApiEnvironment apiEnvironment,
        @Nullable final File parentCacheDir,
        @Nullable final Interceptor debugInterceptor) {
    return new Retrofit.Builder()
            .addConverterFactory(GsonConverterFactory.create(getGson()))
            .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
            .client(getOkHttpClient(parentCacheDir, authProvider, apiEnvironment, debugInterceptor))
            .baseUrl(apiEnvironment.getBaseUrl())
            .build();
}

这是生成异常的代码:
public static Flowable<ProductSearchResult> getAllItemsForCategories(final HebMagentoApi magentoService,
                                                                     List<Category> categories) {
    return Flowable.fromIterable(categories).flatMap(category ->
            magentoService.getProductsBySingleFilter(HebMagentoApi.MAGENTO_CATEGORY_ID,
                    HebMagentoApi.MAGENTO_FILTER_EQUAL,
                    String.valueOf(category.getId())).toFlowable());
}

我认为此问题与适配器代码中的这一行有关:

https://github.com/square/retrofit/blob/master/retrofit-adapters/rxjava2/src/main/java/retrofit2/adapter/rxjava2/BodyObservable.java#L59

最佳答案

问题在于我们如何创建RxJava2CallAdapterFactory。我们使用Dagger 2创建了okhttp客户端,据我所知,该图是在主线程上构建的,因此,如果我们使用改造手工创建flowable/observable,则默认情况下rxjava2calladapterfactory将使用主线程。

为了解决此问题,以便所有使用RxJava2进行Retrofit的调用都在后台线程上进行,我们通过以下方式创建调用适配器:

RxJavaCallAdapterFactory.createWithScheduler(Schedulers.io())

重要:要防止应用崩溃的另一件事是确保为每个RxJava运算符定义onError函数:flatMap, map, etc

10-06 01:44