我有一个StateFlow协程,在我的应用程序的各个部分之间共享。当我对下游收集器的cancel进行CoroutineScope编码时,JobCancellationException会传播到StateFlow,并停止为所有当前和将来的收集器发出值。StateFlow:

val songsRelay: Flow<List<Song>> by lazy {
    MutableStateFlow<List<Song>?>(null).apply {
        CoroutineScope(Dispatchers.IO)
            .launch { songDataDao.getAll().distinctUntilChanged().collect { value = it } }
    }.filterNotNull()
}
我的代码中典型的“演示者”实现了以下基类:
abstract class BasePresenter<T : Any> : BaseContract.Presenter<T> {

    var view: T? = null

    private val job by lazy {
        Job()
    }

    private val coroutineScope by lazy { CoroutineScope( job + Dispatchers.Main) }

    override fun bindView(view: T) {
        this.view = view
    }

    override fun unbindView() {
        job.cancel()
        view = null
    }

    fun launch(block: suspend CoroutineScope.() -> Unit): Job {
        return coroutineScope.launch(block = block)
    }
}
BasePresenter实现可能会调用launch{ songsRelay.collect {...} }当演示者不受约束时,为了防止泄漏,我取消了父作业。每当收集songsRelay StateFlow的演示者不受约束时,StateFlow本质上就以JobCancellationException终止,并且其他收集器/演示者都无法从中收集值。
我注意到我可以改为调用job.cancelChildren(),这似乎可以工作(StateFlow不能用JobCancellationException完成)。但是然后我想知道如果不能取消作业本身,则声明父job有什么意义。我可以完全删除job,然后调用coroutineScope.coroutineContext.cancelChildren()达到相同的效果。
如果我只调用job.cancelChildren(),就足够了吗?我感觉好像不调用coroutineScope.cancel()job.cancel(),可能无法正确或完全清除我已经开始的任务。
我也不明白为什么在调用JobCancellationExceptionjob.cancel()会沿层次结构向上传播。 job不是这里的“ parent ”吗?为什么取消它会影响我的StateFlow

最佳答案

更新:
您确定所有演示者实际上都取消了自己的songRelay吗?我进行了此测试,并打印了“歌曲中继完成”,因为onCompletion也捕获了下游异常。但是,演示者2发出的值2恰好,之后歌曲中继打印“完成”。如果我取消了Presenter 2,将再次显示Presenter 2的JobJobcellcellationException,即“歌曲中继已完成”。
我确实发现有趣的是,一个流实例将如何为每个订阅的收集器发出一次。我没有意识到关于流量的问题。

    val songsRelay: Flow<Int> by lazy {
        MutableStateFlow<Int?>(null).apply {
            CoroutineScope(Dispatchers.IO)
                    .launch {
                        flow {
                            emit(1)
                            delay(1000)
                            emit(2)
                            delay(1000)
                            emit(3)
                        }.onCompletion {
                            println("Dao completed")
                        }.collect { value = it }
                    }
        }.filterNotNull()
                .onCompletion { cause ->
                    println("Song relay completed: $cause")
                }
    }

    @Test
    fun test() = runBlocking {
        val job = Job()
        val presenterScope1 = CoroutineScope(job + Dispatchers.Unconfined)
        val presenterScope2 = CoroutineScope(Job() + Dispatchers.Unconfined)

        presenterScope1.launch {
            songsRelay.onCompletion { cause ->
                println("Presenter 1 Completed: $cause")
            }.collect {
                println("Presenter 1 emits: $it")
            }
        }

        presenterScope2.launch {
            songsRelay.collect {
                println("Presenter 2 emits: $it")
            }
        }

        presenterScope1.cancel()

        delay(2000)
        println("Done test")
    }
我认为您需要在BasePresenter中使用 SupervisorJob 而不是Job。通常,对于整个演示者来说,使用Job是一个错误,因为一个失败的协程将取消Presenter中的所有协程。通常不是您想要的。

关于android - 当取消CoroutineScope时,Coroutine StateFlow停止发射,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/62747825/

10-10 23:06