我有几个资源在我的应用程序,我需要加载和转储到我的数据库在第一次发射。我想做个类似的事情。
所以我创建了一个可观察的包装器来读取文件。

@Override
public Observable<List<T>> loadDataFromFile() {

    return Observable.create(new Observable.OnSubscribe<List<T>>() {
        @Override
        public void call(Subscriber<? super List<T>> subscriber) {
            LOG.info("Starting load from file for %s ON THREAD %d" + type, Thread.currentThread().getId());
            InputStream inputStream = null;
            try {
                Gson gson = JsonConverter.getExplicitGson();
                inputStream = resourceWrapper.openRawResource(resourceId);
                InputStreamReader inputStreamReader = new InputStreamReader(inputStream);
                List<T> tList = gson.fromJson(inputStreamReader, type);

                subscriber.onNext(tList);
                subscriber.onCompleted();
                LOG.info("Completed load from file for " + type);
            } catch (Exception e) {
                LOG.error("An error occurred loading the file");
                subscriber.onError(e);
            } finally {
                if (inputStream != null) {
                    try {
                        inputStream.close();
                    } catch (IOException e) {
                    }
                }
            }
        }
    });
}

但是它不是异步的,我看到有两种方法可以使它异步:
1)在Observable中执行异步生成新线程或使用基于回调的文件读取API。
2)使用调度程序在I/O线程上执行工作,
同样对于数据库,我必须创建自己的observate来包装数据库api,并且有一个带有回调的同步和异步版本。
那么,正确的方法是什么呢?
其次,我如何在一个链表中使用这些观察数据并行读取这些文件,然后将每个文件的内容存储在数据库中。当我的所有引用数据的整个过程完成时,我想接收一个未完成的事件。

最佳答案

RX的一个好处是你可以控制你的“工作”是在什么线程上完成的。你可以用

subscribeOn(Schedulers.io())

如果要并行加载资源,我建议使用merge(或mergedelayerror)运算符。
假设你有一个函数
Observable<List<T>> loadDataFromresource(int resID)

要加载一个资源,可以首先为每个资源创建一个可观察的列表
for (int i=0 ; i<10; i++) {
    obsList.add(loadDataFromresource(i+1).subscribeOn(Schedulers.io()));
}

将调度程序与每个可观察对象相关联。合并观测值使用
Observable<List<T>> mergedObs = Observable.merge(obsList);

然后,订阅得到的可观察结果应并行加载资源。如果你想将错误延迟到合并的可观测数据的末尾,那么使用
Observable<List<T>> mergedObs = Observable.mergeDelayError(obsList);

09-30 15:41
查看更多