我正在尝试实现从BlockingQueue创建的反应堆Flux,但不确定哪个运算符最适合我的用例?
我正在创建一个流式REST端点,响应是Flux,它需要不断从BlockingQueue发出消息作为对GET REST调用的响应。
我已经尝试过论坛和文档,并且只能从可迭代的集合或反应性数据源中找到Flux,但是从任何BlockingQueue中都找不到示例。
最佳答案
您可以尝试Flux#generate和Queue#peek。请记住,如果队列为空,则peek
将返回null
,并且不能在onNext
中使用。
就像是:
Flux.generate(sink -> {
val element = queue.peek();
if (element == null) {
sink.complete();
} else {
sink.next(element);
}
});
还有一个Flux#repeatWhen运算符,以防您想在队列被认为为空后重新订阅该队列,例如与:
flux.repeatWhen(it -> it.delayElements(ofSeconds(1)))
关于java - 如何从阻塞队列创建 react 堆流量?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/55566891/