


I'm trying out the new coroutines Flow API. My goal is to build a simple repository that fetches data from a web API, saves it to the db, and also returns a Flow from the db.


I'm using Room, with Firebase as the web API. Everything seems pretty straightforward until I try to pass errors coming from the API to the UI.


Since I get a Flow from the database that contains only the data and no state, what is the correct approach to give it a state (like loading, content, error) by combining it with the web API result?



@Query("SELECT * FROM users")
fun getUsers(): Flow<List<UserPojo>>


val users: Flow<List<UserPojo>> = userDao.getUsers()


override fun downloadUsers(filters: UserListFilters, onResult: (result: FailableWrapper<MutableList<UserApiPojo>>) -> Unit) {
    val data = Gson().toJson(filters)

    functions.getHttpsCallable("users").call(data).addOnSuccessListener {
        try {
            val type = object : TypeToken<List<UserApiPojo>>() {}.type
            val users = Gson().fromJson<List<UserApiPojo>>(it.data.toString(), type)
            onResult.invoke(FailableWrapper(users.toMutableList(), null))
        } catch (e: java.lang.Exception) {
            onResult.invoke(FailableWrapper(null, "Error parsing data"))
        }
    }.addOnFailureListener {
        onResult(FailableWrapper(null, it.localizedMessage))
    }
}


I hope the question is clear enough. Thanks for the help.


Since the question wasn't clear, I'll try to clarify. My issue is that the default Flow emitted by Room carries only the data, so if I subscribe to it I only receive the data (e.g. in this case, only a list of users). What I need is some way to communicate the state of the app, such as loading or error. At the moment the only way I can think of is a "response" object that contains the state, but I can't find a way to implement it.


fun getUsers(): Flow<Lce<List<UserPojo>>> {
    return flowFromDatabase
}


But the obvious issue I'm running into is that the flow from the database is of type Flow<List<UserPojo>>. I don't know how to "enrich" it with the state without losing the subscription to the database and without running a new network call every time the db is updated (which is what would happen if I did it in a map transformation).




I believe this is more of an architecture question, but let me try to answer some of your questions first.


If there is an error with the Flow returned by Room, you can handle it via catch().
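As a minimal sketch of how catch() behaves, the snippet below uses a hypothetical failingUsers() flow as a stand-in for a DAO flow that throws after one emission (a real Room flow is not involved, and the String user type is only for illustration). The catch() operator intercepts the upstream exception and emits a fallback value instead of crashing the collector.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for a DAO flow that fails after one emission.
fun failingUsers(): Flow<List<String>> = flow {
    emit(listOf("alice"))
    throw IllegalStateException("database error")
}

// catch() sees exceptions thrown upstream and may emit a fallback value.
fun safeUsers(): Flow<List<String>> =
    failingUsers().catch { emit(emptyList()) }

fun main() = runBlocking {
    // Collects the normal emission followed by the fallback empty list.
    println(safeUsers().toList())
}
```

Note that catch() only covers operators placed before it in the chain, so it usually goes last.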


I agree with you that having a State object is a good approach. In my mind, it is the ViewModel's responsibility to present the State object to the View. This State object should have a way to expose errors.


I have found that it is easier to have the State object that the ViewModel controls be responsible for errors instead of an object that bubbles up from the Service layer.


Now with these questions out of the way, let me try to propose one particular "solution" to your issue.


As you mention, it is common practice to have a Repository that handles retrieving data from multiple data sources. In this case, the Repository would take the DAO and an object that represents getting data from the network, let's call it Api. I am assuming that you are using FirebaseFirestore, so the class and method signature would look something like this:

class Api(private val firestore: FirebaseFirestore) {

    // Signature only; the body is filled in further down.
    fun getUsers(): Flow<List<UserApiPojo>>
}



Now the question becomes how to turn a callback based API into a Flow. Luckily, we can use callbackFlow() for this. Then Api becomes:

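class Api(private val firestore: FirebaseFirestore) {

    // `functions` and `filters` come from the question's code and are assumed to be in scope.
    fun getUsers(): Flow<List<UserApiPojo>> = callbackFlow {
        val data = Gson().toJson(filters)

        functions.getHttpsCallable("users").call(data).addOnSuccessListener {
            try {
                val type = object : TypeToken<List<UserApiPojo>>() {}.type
                val users = Gson().fromJson<List<UserApiPojo>>(it.data.toString(), type)
                trySend(users) // emit the parsed list downstream
                close()        // one-shot call, so complete the flow
            } catch (e: java.lang.Exception) {
                cancel(CancellationException("API Error", e))
            }
        }.addOnFailureListener { e ->
            cancel(CancellationException("Failure", e))
        }

        // callbackFlow requires awaitClose; it suspends until the flow is closed or cancelled.
        awaitClose()
    }
}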

As you can see, callbackFlow allows us to cancel the flow when something goes wrong and have someone downstream handle the error.


Moving to the Repository we would now like to do something like:

val users: Flow<List<User>> = Flow.concat(userDao.getUsers().toUsers(), api.getUsers().toUsers()).first()


There are a few caveats here. It seems that first() and concat() are operators you will have to write yourself. I did not see a version of first() that returns a Flow; it is a terminal operator (Rx used to have a version of first() that returned an Observable, Dan Lew uses it in this post). Flow.concat() does not seem to exist either. The goal of users is to return a Flow that emits the first value emitted by any of the source Flows. Also, note that I am mapping DAO users and Api users to a common User object.
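One possible way to get "the first value from any source" out of operators that do exist in kotlinx.coroutines is to combine merge() with take(1). This is only a sketch under assumptions: firstOf(), dbUsers() and apiUsers() are hypothetical names, the String user type is a placeholder, and the delay merely simulates network latency.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: collect all sources concurrently and keep only the first value.
fun <T> firstOf(vararg sources: Flow<T>): Flow<T> = merge(*sources).take(1)

// Stand-ins for the DAO and API flows.
fun dbUsers(): Flow<List<String>> = flow { emit(listOf("cached user")) }

fun apiUsers(): Flow<List<String>> = flow {
    delay(200) // simulated network latency
    emit(listOf("fresh user"))
}

fun main() = runBlocking {
    // The database answers first here, so the API value is never delivered.
    println(firstOf(dbUsers(), apiUsers()).toList())
}
```

Because take(1) completes the flow after one value, later database updates are not observed. If ongoing updates matter, dropping take(1) and collecting the merged flow may be closer to what you want.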


We can now talk about the ViewModel. As I said before, the ViewModel should have something that holds State. This State should represent data, errors and loading states. One way that can be accomplished is with a data class.

data class State(val users: List<User>, val loading: Boolean, val serverError: Boolean)


Since we have access to the Repository the ViewModel can look like:

val state = repo.users.map { users -> State(users, false, false) }.catch { emit(State(emptyList(), false, true)) }
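The State class has a loading flag, but the expression above never sets it. One way to cover that, sketched here with a hypothetical toState() helper and a simplified User class, is to emit a loading State through onStart() before any data arrives.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Simplified stand-in for the common User object.
data class User(val name: String)

data class State(val users: List<User>, val loading: Boolean, val serverError: Boolean)

// Hypothetical helper: wraps a stream of users into loading / content / error states.
fun toState(users: Flow<List<User>>): Flow<State> =
    users
        .map { State(it, loading = false, serverError = false) }
        .onStart { emit(State(emptyList(), loading = true, serverError = false)) }
        .catch { emit(State(emptyList(), loading = false, serverError = true)) }

fun main() = runBlocking {
    // Successful source: a loading state followed by a content state.
    println(toState(flowOf(listOf(User("alice")))).toList())
    // Failing source: a loading state followed by an error state.
    println(toState(flow { throw IllegalStateException("network down") }).toList())
}
```

In a ViewModel this would be applied to repo.users, and the View would render whichever State arrives last.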


Please keep in mind that this is a rough explanation to point you in a direction, there are many ways to accomplish state management and this is by no means a complete implementation. It may not even make sense to turn the API call into a Flow, for example.


08-03 20:00