发布主题与Kotlin协程

发布主题与Kotlin协程

本文介绍了发布主题与Kotlin协程(流程)的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我使用了PublishSubject,并且正在向其发送消息,并且还在监听结果.它运行得很完美,但是现在我不确定如何对Kotlin的协程(流或通道)执行相同的操作.

I used a PublishSubject and I was sending messages to it and also I was listening for results. It worked flawlessly, but now I'm not sure how to do the same thing with Kotlin's coroutines (flows or channels).

private val subject = PublishProcessor.create<Boolean>>()

...

fun someMethod(b: Boolean) {
    subject.onNext(b)
}

fun observe() {
    subject.debounce(500, TimeUnit.MILLISECONDS)
           .subscribe { /* value received */ }
}

因为我需要使用去抖运算符,所以我真的想对流执行相同的操作,因此我创建了一个通道,然后尝试从该通道创建一个流并监听更改,但是没有得到任何结果. /p>

Since I need the debounce operator I really wanted to do the same thing with flows so I created a channel and then I tried to create a flow from that channel and listen to changes, but I'm not getting any results.

private val channel = Channel<Boolean>()

...

fun someMethod(b: Boolean) {
    channel.send(b)
}

fun observe() {
    flow {
         channel.consumeEach { value ->
            emit(value)
         }
    }.debounce(500, TimeUnit.MILLISECONDS)
    .onEach {
        // value received
    }
}

怎么了?

推荐答案

Flow 是冷的异步流,就像Obserable一样.

onEach方法仅仅是一种转换.因此,您应该将其替换为终端流量运算符collect.您也可以使用BroadcastChannel来获得更简洁的代码:

The onEach method is just a transformation. Therefore you should replace it with the terminal flow operator collect. Also you could use a BroadcastChannel to have cleaner code:

private val channel = BroadcastChannel<Boolean>(1)

suspend fun someMethod(b: Boolean) {
    channel.send(b)
}

suspend fun observe() {
  channel
    .asFlow()
    .debounce(500)
    .collect {
        // value received
    }
}

这篇关于发布主题与Kotlin协程(流程)的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

07-30 07:57