I'm trying to implement RxJava + Realm + Retrofit + the Repository pattern.

Here's my local implementation:

```java
@Override
public Observable<Page> search(@NonNull final String query) {
    return Realm.getDefaultInstance().where(Page.class)
            .equalTo("query", query)
            .findAll()
            .asObservable()
            .cast(Page.class);
}
```

Here's my remote implementation:

```java
@Override
public Observable<Page> search(@NonNull String query) {
    return mWikiServices.search(query).map(new Func1<Result, Page>() {
        @Override
        public Page call(Result result) {
            final List<Page> pages = new ArrayList<>(result.getQuery().getPages().values());
            return pages.get(0);
        }
    });
}
```

Here's my repository implementation:

```java
final Observable<Page> localResult = mSearchLocalDataSource.search(query);
final Observable<Page> remoteResult = mSearchRemoteDataSource.search(query)
        .doOnNext(new Action1<Page>() {
            @Override
            public void call(Page page) {
                //mSearchLocalDataSource.save(query, page);
                //mResultCache.put(query, page);
            }
        });

return Observable.concat(localResult, remoteResult)
        .first()
        .doOnError(new Action1<Throwable>() {
            @Override
            public void call(Throwable throwable) {
                throwable.printStackTrace();
            }
        });
```

And finally, here's my subscription in the presenter:

```java
final Subscription subscription = mSearchRepository.search(this.mQuery)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<Page>() {
            @Override
            public void onCompleted() {
                // Completed
            }

            @Override
            public void onError(Throwable e) {
                mView.onDefaultMessage(e.getMessage());
            }

            @Override
            public void onNext(Page page) {
                mView.onDefaultMessage(page.getContent());
            }
        });

mCompositeSubscription.add(subscription);
```

When I run the code I get this exception: `Realm access from incorrect thread. Realm objects can only be accessed on the thread they were created.`

I tried the official solutions in the Realm GitHub repo, but none of them worked. I still get this exception.

I think I get this exception because I'm subscribing on an io thread, while the Realm instance gets created on the main thread.

Are there any suggested implementations?

Thanks.

Solution

After a lot of research I found the solution.

First, let's recall the problem: when I subscribe on a `Schedulers.io()` thread and try to get data from Realm or Retrofit, I get "Realm access from incorrect thread. Realm objects can only be accessed on the thread they were created".

The main problem in this case is that I create the Realm instance on the main thread but try to access it from a worker thread.

We could subscribe to Realm on the main thread, but that's not good practice.
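The thread-confinement rule behind this exception can be illustrated without Realm at all. The following is a minimal plain-Java sketch, not Realm's actual implementation: `ThreadConfined` and `ThreadConfinementDemo` are hypothetical names, and the class simply remembers the thread it was created on and rejects access from any other thread, which is the behavior the error message describes.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for a thread-confined object such as a Realm instance.
final class ThreadConfined {
    private final Thread owner = Thread.currentThread(); // thread that created the object
    private final String value;

    ThreadConfined(String value) {
        this.value = value;
    }

    String get() {
        // Reject any access that does not come from the creating thread.
        if (Thread.currentThread() != owner) {
            throw new IllegalStateException("Access from incorrect thread.");
        }
        return value;
    }
}

final class ThreadConfinementDemo {
    // Reads the object on a new worker thread; returns the failure it hit, or null on success.
    static Throwable readOnWorker(ThreadConfined confined) {
        AtomicReference<Throwable> failure = new AtomicReference<>();
        Thread worker = new Thread(() -> {
            try {
                confined.get();
            } catch (Throwable t) {
                failure.set(t);
            }
        });
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return failure.get();
    }

    public static void main(String[] args) {
        ThreadConfined confined = new ThreadConfined("page");
        System.out.println(confined.get());         // same thread: allowed
        System.out.println(readOnWorker(confined)); // other thread: rejected
    }
}
```

Creating and using the object on the same thread, whichever thread that is, avoids the failure.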
When I use `realm.where("query",query).findFirstAsync().asObservable();` as in the example in the GitHub repo, I get stuck at `Observable.concat(localResult, remoteResult).first();`.

So what's my solution?

In our repository implementation we still have two observables, one for remote and one for local, like below:

```java
final Observable<Page> localResult = mSearchLocalDataSource.search(query);
final Observable<Page> remoteResult = mSearchRemoteDataSource.search(query)
        .doOnNext(new Action1<Page>() {
            @Override
            public void call(Page page) {
                if (page != null) {
                    // Persist the remote result and keep it in the in-memory cache.
                    mSearchLocalDataSource.save(query, page);
                    mResultCache.put(query, page);
                }
            }
        });
```

Note that we save the data to Realm and cache it in memory when we get it from remote.

We still concat the two observables if we cannot get the data from the cache:

```java
return Observable.concat(localResult, remoteResult)
        .first()
        .map(new Func1<Page, Page>() {
            @Override
            public Page call(Page page) {
                if (page == null) {
                    throw new NoSuchElementException("No result found!");
                }
                return page;
            }
        });
```

With concat we try to get the data from Realm first, and if we can't, we try to get it from remote.

Here's the remote observable implementation:

```java
@Override
public Observable<Page> search(@NonNull String query) {
    return mWikiServices.search(query).flatMap(new Func1<Result, Observable<Page>>() {
        @Override
        public Observable<Page> call(Result result) {
            final ArrayList<Page> pages = new ArrayList<>(result.getQuery().getPages().values());
            Log.i("data from", "remote");
            return Observable.from(pages).first();
        }
    });
}
```

Here's the local source implementation:

```java
@Override
public Observable<Page> search(@NonNull final String query) {
    return Observable.create(new Observable.OnSubscribe<Page>() {
        @Override
        public void call(Subscriber<? super Page> subscriber) {
            // Open the Realm on the subscribing thread so it is created and used on the same thread.
            final Realm realm = Realm.getInstance(mRealmConfiguration);
            try {
                final Page page = realm.where(Page.class)
                        .equalTo("query", query)
                        .findFirst();
                if (page != null && page.isLoaded() && page.isValid()) {
                    Log.i("data from", "realm");
                    // Emit an unmanaged copy that is safe to pass to other threads.
                    subscriber.onNext(realm.copyFromRealm(page));
                }
                // No result: emit nothing and just complete, so concat moves on to remote.
                subscriber.onCompleted();
            } finally {
                realm.close();
            }
        }
    });
}
```

The point is that I create a new Observable and get the data from Realm inside it, so we create the Realm instance and use it on the same thread (the io thread). We create a copy of the object to get rid of the illegal state exception.

When we query Realm and the result is null, we emit nothing and simply complete, which behaves like an empty observable, so that we don't get stuck in the concat operation. If we get a page and it's valid and loaded, we send it to the subscriber and complete the operation.

Here's how we can save the data we get from remote to Realm:

```java
@Override
public void save(@NonNull String query, @NonNull Page page) {
    final Realm realm = Realm.getInstance(mRealmConfiguration);
    realm.beginTransaction();
    final Page p = realm.createObject(Page.class);
    p.setQuery(query);
    p.setId(page.getId());
    p.setTitle(page.getTitle());
    p.setContent(page.getContent());
    realm.copyToRealmOrUpdate(p);
    realm.commitTransaction();
    realm.close();
}
```

Here's the example source code: https://github.com/savepopulation/wikilight

Good luck.
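The local-then-remote fallback that `concat(...).first()` provides in the answer above can be pictured without RxJava. This is only a sketch in plain Java under stated assumptions: `FallbackDemo`, `firstAvailable`, and the two suppliers are hypothetical names, and `Optional.empty()` plays the role of a source that completes without emitting anything. It suggests why the local source has to complete when it finds nothing: only then is the next source consulted.

```java
import java.util.Arrays;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.function.Supplier;

final class FallbackDemo {
    // Queries each source lazily, in order, and returns the first value found.
    // Later sources are never invoked once an earlier one produces a value.
    static <T> T firstAvailable(List<Supplier<Optional<T>>> sources) {
        for (Supplier<Optional<T>> source : sources) {
            Optional<T> result = source.get();
            if (result.isPresent()) {
                return result.get();
            }
        }
        // Every source came back empty.
        throw new NoSuchElementException("No result found!");
    }

    public static void main(String[] args) {
        // Hypothetical sources: the local store has nothing, the remote one has a page.
        Supplier<Optional<String>> local = () -> Optional.empty();
        Supplier<Optional<String>> remote = () -> Optional.of("page from remote");
        System.out.println(firstAvailable(Arrays.asList(local, remote)));
    }
}
```

In the RxJava version the same ordering comes from `concat`, which subscribes to the remote observable only after the local one has completed.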