我有一个GET方法的示例,该方法生成一个Randon INT并将其发送回客户端
@GetMapping
public Flux<String> search() {
return Flux.create(fluxSink -> {
Random r = new Random();
int n;
for (int i = 0; i < 10; i++) {
n= r.nextInt(1000);
System.out.println("Creating:"+n);
fluxSink.next(String.valueOf(n));
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
fluxSink.complete();
});
}
卷曲:
curl -v -H“接受:文本/事件流” -X GET
'http://localhost:8080/stream
通过使用CURL命令的此代码,只有在fluxSink.complete发生时,我才能在客户端中看到数字。
现在,如果我将代码更改为:
@GetMapping
public Flux<String> search() {
return Flux.create(fluxSink -> {
Thread t = new Thread(() -> {
Random r = new Random();
int n;
for (int i = 0; i < 10; i++) {
n= r.nextInt(1000);
System.out.println("Creating:"+n);
fluxSink.next(String.valueOf(n));
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
fluxSink.complete();
});
t.start();
});
}
将进程包装到一个线程中,效果很好。当fluxSink.next发生时,我可以看到数据传输良好。
谁能解释这种影响?如何在不显式使用线程的情况下查看数据流?
谢谢!
最佳答案
Flux.create()
与“通过FluxSink API以同步或异步方式发射多个元素”有关。在您的情况下,您正在同步发出所有元素,并且您没有给Reactor做任何工作的机会,因为在完成所有操作之前,您永远不会退出该方法。简而言之,您将阻止当前线程,这是Reactor禁止的。
使用线程给Reactor分配这些信号的机会。但是Flux.create()
的精神(如其javadoc所示)是关于适应回调。
在这种情况下,Flux.generate
可能更合适。尝试使用Thread
模拟延迟不是一个好主意。您可以改为执行以下操作:
Flux<String> response = Flux.just(aListOfElements).delayElements(Duration.ofSeconds(2));