我有一个GET方法的示例,该方法生成一个Randon INT并将其发送回客户端

@GetMapping
public Flux<String> search() {
    return Flux.create(fluxSink -> {

            Random r = new Random();
            int n;
            for (int i = 0; i < 10; i++) {
                n= r.nextInt(1000);
                System.out.println("Creating:"+n);
                fluxSink.next(String.valueOf(n));

                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            fluxSink.complete();


    });
}

卷曲:

curl -v -H“接受:文本/事件流” -X GET
'http://localhost:8080/stream

通过使用CURL命令的此代码,只有在fluxSink.complete发生时,我才能在客户端中看到数字。

现在,如果我将代码更改为:
@GetMapping
public Flux<String> search() {
    return Flux.create(fluxSink -> {
        Thread t = new Thread(() -> {
            Random r = new Random();
            int n;
            for (int i = 0; i < 10; i++) {
                n= r.nextInt(1000);
                System.out.println("Creating:"+n);
                fluxSink.next(String.valueOf(n));

                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            fluxSink.complete();
        });
        t.start();
    });
}

将进程包装到一个线程中,效果很好。当fluxSink.next发生时,我可以看到数据传输良好。

谁能解释这种影响?如何在不显式使用线程的情况下查看数据流?

谢谢!

最佳答案

Flux.create()与“通过FluxSink API以同步或异步方式发射多个元素”有关。在您的情况下,您正在同步发出所有元素,并且您没有给Reactor做任何工作的机会,因为在完成所有操作之前,您永远不会退出该方法。简而言之,您将阻止当前线程,这是Reactor禁止的。

使用线程给Reactor分配这些信号的机会。但是Flux.create()的精神(如其javadoc所示)是关于适应回调。

在这种情况下,Flux.generate可能更合适。尝试使用Thread模拟延迟不是一个好主意。您可以改为执行以下操作:

Flux<String> response = Flux.just(aListOfElements).delayElements(Duration.ofSeconds(2));

08-26 00:35