本文介绍了如何通过另一种方法向反应堆热通量动态添加元素?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个数据源服务,该服务将观察者作为参数.

I have a data source service, which takes an observer as a parameter.

void subscribe(Consumer onEventConsumer);

我想使用flux作为RSocket的响应流.我怎样才能做到这一点?我现在看到的应该是

I want to use flux as a response stream for RSocket.How can I do this?As I see it now, it should be something like

Flux<T> controllerMethod(RequestMessage mgs) {
   var flux = Flux.empty();
   dataSource.subscribe(event -> flux.push(event));
   return flux;
}

但是我非常怀疑这是一个合适的解决方案,而且我是反应式方法的新手,我不知道在这里应该使用哪种方法?

But I have big doubts that it's a proper solution, and I'm new in the reactive approach, I don't know what methods I should use here?

推荐答案

正如Simon所指出的,这就是您使用 Flux.create 的目的.

As Simon already pointed out, this is what you use Flux.create for.

在入门指南.="https://projectreactor.io/" rel ="nofollow noreferrer"> projectreactor.io .

Take a look at the Getting Started Guide on projectreactor.io.

在镜头中,您在 create 方法的lambda中注册了一个自定义侦听器:

In shot, you register a custom listener inside the lambda of the create method:

Flux<String> bridge = Flux.create(sink -> {
    myEventProcessor.register( 
      new MyEventListener<String>() { 

        public void onDataChunk(List<String> chunk) {
          for(String s : chunk) {
            sink.next(s); 
          }
        }

        public void processComplete() {
            sink.complete(); 
        }
    });
});

您要做的是将传入元素传递到 FluxSink ,然后将这些元素发布到Flux上.

What you want to do is to pass the incoming elements on to a FluxSink, which will then publish those elements on the Flux.

这篇关于如何通过另一种方法向反应堆热通量动态添加元素?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

09-24 16:00