问题描述
我使用了PublishSubject,并且正在向其发送消息,并且还在监听结果.它运行得很完美,但是现在我不确定如何对Kotlin的协程(流或通道)执行相同的操作.
I used a PublishSubject and I was sending messages to it and also I was listening for results. It worked flawlessly, but now I'm not sure how to do the same thing with Kotlin's coroutines (flows or channels).
private val subject = PublishProcessor.create<Boolean>>()
...
fun someMethod(b: Boolean) {
subject.onNext(b)
}
fun observe() {
subject.debounce(500, TimeUnit.MILLISECONDS)
.subscribe { /* value received */ }
}
因为我需要使用去抖运算符,所以我真的想对流执行相同的操作,因此我创建了一个通道,然后尝试从该通道创建一个流并监听更改,但是没有得到任何结果. /p>
Since I need the debounce operator I really wanted to do the same thing with flows so I created a channel and then I tried to create a flow from that channel and listen to changes, but I'm not getting any results.
private val channel = Channel<Boolean>()
...
fun someMethod(b: Boolean) {
channel.send(b)
}
fun observe() {
flow {
channel.consumeEach { value ->
emit(value)
}
}.debounce(500, TimeUnit.MILLISECONDS)
.onEach {
// value received
}
}
怎么了?
推荐答案
Flow
是冷的异步流,就像Obserable
一样.
onEach
方法仅仅是一种转换.因此,您应该将其替换为终端流量运算符collect
.您也可以使用BroadcastChannel
来获得更简洁的代码:
The onEach
method is just a transformation. Therefore you should replace it with the terminal flow operator collect
. Also you could use a BroadcastChannel
to have cleaner code:
private val channel = BroadcastChannel<Boolean>(1)
suspend fun someMethod(b: Boolean) {
channel.send(b)
}
suspend fun observe() {
channel
.asFlow()
.debounce(500)
.collect {
// value received
}
}
这篇关于发布主题与Kotlin协程(流程)的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!