本文介绍了如何为流数据创建 Flux/Publisher的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用轮询方法定期获取数据.新数据可能随时到达.我想向我的客户公开一个反应式接口.所以,我想创建一个发布者(Flux?),它会在新数据可用时发布新数据并通知订阅者.我怎么做?我看到的所有 Flux 示例都是针对数据已知/可用的情况.实际上,我想要类似基于队列的 Flux 之类的东西,并且我的轮询线程可以在找到新数据时继续填充队列.

I am using a polling method to fetch data periodically. New data may arrive at any time. I want to expose a reactive interface to my client. So, I want to create a publisher (Flux?) that would publish new data when it becomes available and notify the subscriber(s). How do I do that? All the examples of Flux that I see are for the cases where the data is already known/available. Effectively, I want something like a Flux based on a queue and my polling thread can keep filling the queue when it finds new data.

推荐答案

对于简单的事情,您可能需要使用 DirectProcessor.这不是最复杂的助焊剂汇,但它会让您有所了解.

For something simple, you might want to use a DirectProcessor. This isn't the most complex of flux sinks, but it'll get you a bit of the way there.

我写了一个简单的例子:

I wrote a quick example:

Flux<String> hot = DirectProcessor.create<String>()
hot.onNext("Hello")//not printed
hot.subscribe(it -> System.out.println(it))

hot.onNext("Goodbye")//printed
Thread.sleep(100)
hot.onNext("foo")//printed

DirectProcessor 实现了 Flux,所以你可以像 Flux 一样使用它.

DirectProcessor implements Flux, so you can use it like a Flux.

如您所见,在订阅热源之前添加的元素不会传递给订阅者.

As you can see, elements added before subscribing to the hotsource won't be passed down to the subscribe.

查看其他帖子,Flux#create 和 Flux#generate 可能是不错的起点.Flux.create 和 Flux.generate 之间的区别 <- 这将让您更加复杂并控制通量.

Looking at other posts, Flux#create and Flux#generate might be good places to start. Difference Between Flux.create and Flux.generate <- this will get you more complexity and control over the flux.

这篇关于如何为流数据创建 Flux/Publisher的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

10-12 17:41