我正在尝试调用Web服务来获取数据,并使用以下代码将其存储到数据库中。我创建了一个单独的类来执行以下操作。

现在,问题是当我成功获取数据并将其存储在数据库中时,我想通知我的活动。如果发生一些错误,那么我想在UI本身上显示出来。

我以某种方式能够编写代码以使用分页获取数据,但是不确定如何通知UI可以订阅的UI捕获与进度和错误相关的更新(如果有)。

public Flowable<Response> getFitnessData() {

        Request request = new Request();
        request.setAccess_token("d80fa6bd6f78cc704104d61146c599bc94b82ca225349ee68762fc6c70d2dcf0");

        Flowable<Response> fitnessFlowable = new WebRequest()
                                            .getRemoteClient()
                                            .create(FitnessApi.class)
                                            .getFitnessData("5b238abb4d3590001d9b94a8",request.toMap());


         fitnessFlowable.subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .takeUntil(response->response.getSummary().getNext()!=null)

                .subscribe(new Subscriber<Response>() {
                    @Override
                    public void onSubscribe(Subscription s) {

                        s.request(Long.MAX_VALUE);
                    }

                    @Override
                    public void onNext(Response response) {

                        Log.e(TAG, "onNext" );

                        if(response !=null){

                            if(response.getFitness()!=null && response.getFitness().size()!=0){

                                Realm realm = Realm.getDefaultInstance();
                                realm.executeTransactionAsync(new Realm.Transaction() {
                                    @Override
                                    public void execute(Realm realm) {

                                        realm.copyToRealmOrUpdate(response.getFitness());

                                    }
                                }, new Realm.Transaction.OnSuccess() {
                                    @Override
                                    public void onSuccess() {

                                        Log.i(TAG, "onSuccess , fitness data saved");

                                    }
                                }, new Realm.Transaction.OnError() {
                                    @Override
                                    public void onError(Throwable error) {
                                        Log.i(TAG, "onError , fitness data failed to save"+error.getMessage() );
                                    }
                                });
                            }else{

                                Log.i(TAG, "onError , no fitness data available" );


                            }

                        }else{
                            Log.i(TAG, "onError , response is null" );

                        }
                    }

                    @Override
                    public void onError(Throwable t) {


                        Log.e(TAG, "onError" +t.getMessage());
                    }

                    @Override
                    public void onComplete() {

                        Log.e(TAG, "onComplete");
                    }
                });;

            return null;

    }


更新

创建RxBus以在UI上传播事件和错误

public class RxBus {

    private static final RxBus INSTANCE = new RxBus();

    private RxBus(){}
    private PublishSubject<Object> bus = PublishSubject.create();

    public static RxBus getInstance() {
        return INSTANCE;
    }


    public void send(Object o) {
        bus.onNext(o);
    }

    public void error(Throwable e){bus.onError(e);}

    public Observable<Object> toObservable() {
        return bus;
    }
}


活动中

 FitnessRepo fitnessRepo = new FitnessRepo();
        fitnessRepo.getFitnessData();
        RxBus.getInstance().toObservable().subscribe(new Observer<Object>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Object o) {

                if(o instanceof RealmList ){

                    RealmList<Fitness> realmList = (RealmList<Fitness>) o;
                    Log.e(TAG,"Fitness data size "+realmList.size());

                }
            }

            @Override
            public void onError(Throwable e) {

                Log.e(TAG,e.getMessage()+ "");

                if (e instanceof HttpException) {
                    ResponseBody body = ((HttpException) e).response().errorBody();


                    try {

                        Response response=  LoganSquare.parse(body.byteStream(),Response.class);

                        if(response.getErrors() !=null)
                            if(response.getErrors().size() > 0)
                                Log.e(TAG, "Error "+response.getErrors().get(0).getErrors());
                    } catch (IOException t) {
                        t.printStackTrace();
                    }

                }
            }

            @Override
            public void onComplete() {

            }
        });


在网络服务呼叫中

public void getFitnessData() {


        Request request = new Request();
        request.setAccess_token("d80fa6bd6f78cc704104d61146c599bc94b82ca225349ee68762fc6c70d2dcf0");
        request.setEnd_date("2018-07-01T00:00:00");
        Flowable<Response> fitnessFlowable = new WebRequest()
                .getRemoteClient()
                .create(FitnessApi.class)
                .getFitnessData("5b238abb4d3590001d9b94a8",request.toMap());


         fitnessFlowable.subscribeOn(Schedulers.io())
                .takeUntil(response->response.getSummary().getNext()!=null)
                .doOnNext((response) -> {
                    if(response ==null || response.getFitness() == null || response.getFitness().isEmpty()) {


                        Log.e(TAG, " Error ");
                        return;
                    }

                    RxBus.getInstance().send(response.getFitness());

                    try(Realm r = Realm.getDefaultInstance()) {
                        r.executeTransaction((realm) -> {
                            realm.copyToRealmOrUpdate(response.getFitness());
                        });
                    }
                }).subscribe(item ->{


                 },
                 error ->{

                     RxBus.getInstance().error(error);


                 });
    }

最佳答案

我有个好消息要给你!您可以删除几乎所有这些代码,从而使其总体上更好!

public void fetchFitnessData() {

    Request request = new Request();
    request.setAccess_token("d80fa6bd6f78cc704104d61146c599bc94b82ca225349ee68762fc6c70d2dcf0");

    Flowable<Response> fitnessFlowable = new WebRequest()
                                        .getRemoteClient()
                                        .create(FitnessApi.class)
                                        .getFitnessData("5b238abb4d3590001d9b94a8",request.toMap());


     fitnessFlowable.subscribeOn(Schedulers.io())
            .takeUntil(response->response.getSummary().getNext()!=null)
            .doOnNext((response) -> {
                    if(response ==null || response.getFitness() == null || response.getFitness().isEmpty()) return;

                    try(Realm r = Realm.getDefaultInstance()) {
                        r.executeTransaction((realm) -> {
                            realm.insertOrUpdate(response.getFitness());
                        });
                    }
                }
            }).subscribe();
}


此方法现在在后台线程上并返回void,因此从该方法发出内容的方法将是使用PublishSubject(一个表示成功,一个表示失败)或EventBus

private PublishSubject<Object> fitnessResults;
public Observable<Object> observeFitnessResults() {
    return fitnessResults;
}

public static class Success {
    public Success(List<Fitness> data) {
        this.data = data;
    }

    public List<Fitness> data;
}

public static class Failure {
    public Failure(Exception exception) {
        this.exception = exception;
    }

    public Exception exception;
}

public void fetchFitnessData() {
    ...
        fitnessResults.onNext(new Success(data));
    } catch(Exception e) {
        fitnessResults.onNext(new Failure(e));


接着

errors = observeFitnessResults().ofType(Error.class);
success = observeFitnessResults().ofType(Success.class);

关于android - 从Web服务获取数据存储后,在RxJava中返回订户,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/51048199/

10-12 04:18
查看更多