我的Rx调用的总体工作流程应如下工作(无论当前的Rx代码如何):

  • Room Dao获取运动传感器读数的列表(目的是将它们上载到REST API)。我正在为此
  • 使用Single<List<Reading>>
  • 如果该readings列表为空,则执行jobFinished()回调,并在此
  • 之后不执行任何操作
  • 如果readings不为空,则将网络调用链接到此Single。网络通话返回Completable
  • Single从不逻辑上引发错误,因为它获取空或非空的readings列表
  • 当整个Rx调用链终止时,执行jobFinished()回调
  • 在整个Rx调用链成功之后,从reading
  • 中删除那些Dao s
  • 如果Single成功但Completable错误,请更新Dao的读数

  • 我当前的代码如下:
      Single.create<List<Reading>> {
            readings = readingDao.getNextUploadBatch()
    
            if (readings.isEmpty()) {
                jobFinished(job, false)
                return@create
            }
    
            it.onSuccess(readings)
        }
                .flatMapCompletable { api.uploadSensorReadings(it) }
                .doOnTerminate {
                    jobFinished(job, !readingDao.isEmpty())
                }
                .subscribeOn(rxSchedulers.network)
                .observeOn(rxSchedulers.database)
                .subscribe(
                        {
                            readingDao.delete(*readings.toTypedArray())
                        },
                        {
                            markCurrentReadingsAsNotUploading()
                        }
                )
    

    上面代码的逻辑问题是(尚未在运行时对其进行测试,但已编译):
  • 如果flatMapCompletable列表为空,我想从readings开始截断代码
  • 如果doOnTerminate为空,我不希望readings执行
  • 我不希望onComplete{}部分(第一个subscribe块)执行,除非readings非空,并且Completable也返回成功
  • 我不希望onError{}部分(第二个subscribe块)执行,除非readings不为空,并且Completable失败

  • 我不确定如何将我的工作流程实现为高效,整洁的Rx调用链。任何建议都将受到欢迎!

    最佳答案

    如果要根据值执行不同的操作,请考虑flatMap:

    Single.fromCallable(() -> readingDao.getNextUploadBatch())
    .subscribeOn(rxSchedulers.network)
    .flatMapCompletable(readings -> {
        if (readings.isEmpty()) {
            jobFinished(job, false);
            return Completable.complete();
        }
        return api.uploadSensorReadings(readings)
               .doFinally(() -> jobFinished(job, !readingDao.isEmpty()))
               .observeOn(rxSchedulers.database)
               .doOnComplete(() -> readingDao.delete(readings.toTypedArray()))
    })
    .subscribe(() -> /* ignored */, error -> markCurrentReadingsAsNotUploading());
    

    关于android - 有条件的单链和可完成链,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/49332431/

    10-10 19:47