我正在尝试实现从BlockingQueue创建的反应堆Flux,但不确定哪个运算符最适合我的用例?

我正在创建一个流式REST端点,响应是Flux,它需要不断从BlockingQueue发出消息作为对GET REST调用的响应。

我已经尝试过论坛和文档,并且只能从可迭代的集合或反应性数据源中找到Flux,但是从任何BlockingQueue中都找不到示例。

最佳答案

您可以尝试Flux#generateQueue#peek。请记住,如果队列为空,则peek将返回null,并且不能在onNext中使用。

就像是:

Flux.generate(sink -> {
    val element = queue.peek();
    if (element == null) {
        sink.complete();
    } else {
        sink.next(element);
    }
});


还有一个Flux#repeatWhen运算符,以防您想在队列被认为为空后重新订阅该队列,例如与:

flux.repeatWhen(it -> it.delayElements(ofSeconds(1)))

关于java - 如何从阻塞队列创建 react 堆流量?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/55566891/

10-12 18:06