我的Rx调用的总体工作流程应如下工作(无论当前的Rx代码如何):
Room
Dao
获取运动传感器读数的列表(目的是将它们上载到REST API)。我正在为此Single<List<Reading>>
readings
列表为空,则执行jobFinished()
回调,并在此readings
不为空,则将网络调用链接到此Single
。网络通话返回Completable
Single
从不逻辑上引发错误,因为它获取空或非空的readings
列表jobFinished()
回调reading
Dao
sSingle
成功但Completable
错误,请更新Dao
的读数我当前的代码如下:
Single.create<List<Reading>> {
readings = readingDao.getNextUploadBatch()
if (readings.isEmpty()) {
jobFinished(job, false)
return@create
}
it.onSuccess(readings)
}
.flatMapCompletable { api.uploadSensorReadings(it) }
.doOnTerminate {
jobFinished(job, !readingDao.isEmpty())
}
.subscribeOn(rxSchedulers.network)
.observeOn(rxSchedulers.database)
.subscribe(
{
readingDao.delete(*readings.toTypedArray())
},
{
markCurrentReadingsAsNotUploading()
}
)
上面代码的逻辑问题是(尚未在运行时对其进行测试,但已编译):
flatMapCompletable
列表为空,我想从readings
开始截断代码doOnTerminate
为空,我不希望readings
执行onComplete
的{}
部分(第一个subscribe
块)执行,除非readings
非空,并且Completable
也返回成功onError
的{}
部分(第二个subscribe
块)执行,除非readings
不为空,并且Completable
失败我不确定如何将我的工作流程实现为高效,整洁的Rx调用链。任何建议都将受到欢迎!
最佳答案
如果要根据值执行不同的操作,请考虑flatMap
:
Single.fromCallable(() -> readingDao.getNextUploadBatch())
.subscribeOn(rxSchedulers.network)
.flatMapCompletable(readings -> {
if (readings.isEmpty()) {
jobFinished(job, false);
return Completable.complete();
}
return api.uploadSensorReadings(readings)
.doFinally(() -> jobFinished(job, !readingDao.isEmpty()))
.observeOn(rxSchedulers.database)
.doOnComplete(() -> readingDao.delete(readings.toTypedArray()))
})
.subscribe(() -> /* ignored */, error -> markCurrentReadingsAsNotUploading());
关于android - 有条件的单链和可完成链,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/49332431/