问题描述
我正在使用轮询方法定期获取数据.新数据可能随时到达.我想向我的客户公开一个反应式接口.所以,我想创建一个发布者(Flux?),它会在新数据可用时发布新数据并通知订阅者.我怎么做?我看到的所有 Flux 示例都是针对数据已知/可用的情况.实际上,我想要类似基于队列的 Flux 之类的东西,并且我的轮询线程可以在找到新数据时继续填充队列.
I am using a polling method to fetch data periodically. New data may arrive at any time. I want to expose a reactive interface to my client. So, I want to create a publisher (Flux?) that would publish new data when it becomes available and notify the subscriber(s). How do I do that? All the examples of Flux that I see are for the cases where the data is already known/available. Effectively, I want something like a Flux based on a queue and my polling thread can keep filling the queue when it finds new data.
推荐答案
对于简单的事情,您可能需要使用 DirectProcessor
.这不是最复杂的助焊剂汇,但它会让您有所了解.
For something simple, you might want to use a DirectProcessor
. This isn't the most complex of flux sinks, but it'll get you a bit of the way there.
我写了一个简单的例子:
I wrote a quick example:
Flux<String> hot = DirectProcessor.create<String>()
hot.onNext("Hello")//not printed
hot.subscribe(it -> System.out.println(it))
hot.onNext("Goodbye")//printed
Thread.sleep(100)
hot.onNext("foo")//printed
DirectProcessor 实现了 Flux,所以你可以像 Flux 一样使用它.
DirectProcessor implements Flux, so you can use it like a Flux.
如您所见,在订阅热源之前添加的元素不会传递给订阅者.
As you can see, elements added before subscribing to the hotsource won't be passed down to the subscribe.
查看其他帖子,Flux#create 和 Flux#generate 可能是不错的起点.Flux.create 和 Flux.generate 之间的区别 <- 这将让您更加复杂并控制通量.
Looking at other posts, Flux#create and Flux#generate might be good places to start. Difference Between Flux.create and Flux.generate <- this will get you more complexity and control over the flux.
这篇关于如何为流数据创建 Flux/Publisher的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!