不了解如何在Kotlin中使通量订阅工作

不了解如何在Kotlin中使通量订阅工作

我是反应式编程的新手。我希望看到

test provider started
Beat 1000
Beat 2000

在日志中,但是只有test provider started,没有Beaton complete消息。好像我错过了什么
@Service
class ProviderService {

    @PostConstruct
    fun start(){
        val hb: Flux<HeartBeat> = Flux.interval(Duration.ofSeconds(1)).map { HeartBeat(it) }
        val provider = Provider("test", hb)
    }

}
////////////////////////

open class Provider(name: String, heartBests: Flux<HeartBeat>) {
    companion object {
        val log = LoggerFactory.getLogger(Provider::class.java)!!
    }

    init {
        log.info("$name provider started")
        heartBests.doOnComplete { log.info("on complete") }
        heartBests.doOnEach { onBeat(it.get().number) }
    }

    fun onBeat(n: Number){
        log.info("Beat $n")
    }
}

/////
class HeartBeat(val number: Number)

最佳答案

在您的代码中,从未调用'doOnComplete'中的lambda,因为您创建了无限流。方法“doOnEach”作为“映射”是中间操作(例如流中的map),它不会进行调用。
而且您还有另一个错误,反应式提示“流利模式”。

试试这个简单的例子:

import reactor.core.publisher.Flux
import java.time.Duration

fun main(args: Array<String>) {
    val flux = Flux.interval(Duration.ofSeconds(1)).map { HeartBeat(it) }

    println("start")

    flux.take(3)
            .doOnEach { println("on each $it") }
            .map { println("before map");HeartBeat(it.value * 2) }
            .doOnNext { println("on next $it") }
            .doOnComplete { println("on complete") }
            .subscribe { println("subscribe $it") }

    Thread.sleep(5000)
}

data class HeartBeat(val value: Long)

关于spring - 不了解如何在Kotlin中使通量订阅工作,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/47871325/

10-12 03:23