我正在编写以下代码片段,以从Firebase数据库中获取已保存食物的列表,然后使用该列表,我再次从Firebase数据库中获取单个食物的详细信息。

以下代码工作正常,除了我无法弄清楚如何让第二个flatMap知道第一个flatMap的发射已经完成(所有食物 list 都已处理)。所以我无法调用onCompleted()方法,因此无法检测到整个过程何时完成。

看看以下片段中的评论:

Observable.create<List<PersonalizedFood>> {

            FirebaseDTDatabase.getSavedDietFoodQuery(user.uid).addListenerForSingleValueEvent(object : ValueEventListener {
                override fun onCancelled(p0: DatabaseError?) {

                }

                override fun onDataChange(p0: DataSnapshot?) {
                    val list = ArrayList<PersonalizedFood>()
                    p0?.let {
                        for (dateObject in p0.children) {
                            for (foodItem in dateObject.children) {
                                val food = foodItem.getValue(FBPersonalizedFood::class.java) as FBPersonalizedFood
                                list.add(PersonalizedFood(food))
                            }
                        }
                    }
                    it.onNext(list)
                    it.onCompleted()
                }
            })
        }.subscribeOn(Schedulers.io()).flatMap {
            Observable.from(it) // returning a Observable that emits items of list ("it" is the list here)
        }.observeOn(Schedulers.io()).flatMap {
        // How does this flatMap know that emission of all item has been finished so that onCompleted() method could be called.
            personalizedFood ->

            Observable.create<Boolean>{
                FirebaseDTDatabase.getFoodListReference(personalizedFood.foodId).addListenerForSingleValueEvent(object :ValueEventListener{
                    override fun onCancelled(p0: DatabaseError?) {
                        it.onError(p0?.toException())
                    }

                    override fun onDataChange(p0: DataSnapshot?) {
                        if(p0 != null) {
                            val food = p0.getValue(FBFood::class.java)!!
                            val repo = LocalFoodRepository()
                            doAsync {
                                repo.insertFood(this@LoginActivity, Food(food.foodId, food.foodName, food.foodDesc))
                                repo.insertServingDetails(this@LoginActivity, food.servingList.map { it.component2() })
                                repo.saveFood(this@LoginActivity, personalizedFood)
                                it.onNext(true)
                            }

                        }else {
                            it.onNext(false)
                        }
                    }

                })
            }
        }.observeOn(Schedulers.io()).doOnCompleted{
            dismissProgressDialog()
            finish()
        }.doOnError{
            it.printStackTrace()
            dismissProgressDialog()
            finish()
        }.subscribe()

谢谢。

最佳答案

Observable发出的所有可观察对象都调用flatMap时,onCompleted()中的flatMap知道“何时完成所有项目”。您代码中的第二个onCompleted()永远不会调用onCompleted(),因为它所创建的任何可观察对象都不会调用onCompleted()

您应该在onDataChange()方法中调用flatMap。由于在onNext()中创建的每个可观察对象仅发出一项,因此可以在ojit_code方法之后直接调用它:

override fun onDataChange(p0: DataSnapshot?) {
    if(p0 != null) {
        val food = p0.getValue(FBFood::class.java)!!
        val repo = LocalFoodRepository()
        doAsync {
            repo.insertFood(this@LoginActivity, Food(food.foodId, food.foodName, food.foodDesc))
            repo.insertServingDetails(this@LoginActivity, food.servingList.map { it.component2() })
            repo.saveFood(this@LoginActivity, personalizedFood)
            it.onNext(true)
            it.onCompleted()
        }
    } else {
        it.onNext(false)
        it.onCompleted()
    }
}

08-06 10:43