在带有Reactor的Spring Boot 2中,我试图合并两个Flux
热源。但是,merge
似乎只报告了Flux
中两个merge
参数中的第一个。如何获取merge
来识别第二个Flux
。
在下面的示例中,当System.err
是第一个参数时,甚至不会打印B-2
中的outgoing1a
。如果我先创建outgoing2
,则A-2
不会打印。
下面是完整的示例;
package com.example.demo;
import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
public class Weather {
String city;
Integer temperature;
public Weather(String city, Integer temperature) {
this.city = city;
this.temperature = temperature;
}
@Override
public String toString() {
return "Weather [city=" + city + ", temperature=" + temperature + "]";
}
public static void main(String[] args) {
BlockingQueue<Weather> queue = new LinkedBlockingQueue<>();
BlockingQueue<Weather> queue2 = new LinkedBlockingQueue<>();
// Assume Spring @Repository "A-1"
new Thread(() -> {
for (int d = 1; d < 1000; d += 1) {
for (String s: new String[] {"LDN", "NYC", "PAR", "ZUR"}) {
queue.add(new Weather(s, d));
try { Thread.sleep(250); } catch (InterruptedException e) {}
}
}
}).start();
// Assume Spring @Repository "B-1"
new Thread(() -> {
for (int d = 1; d < 1000; d += 1) {
for (String s: new String[] {"MOS", "TLV"}) {
queue2.add(new Weather(s, d));
try { Thread.sleep(1000); } catch (InterruptedException e) {}
}
}
}).start();
// Assume Spring @Service "A-2" = real-time LDN, NYC, PAR, ZUR
Flux<Weather> outgoing1 = Flux.<Weather>create(
sink -> {
for (int i = 0; i < 1000; i++) {
try {
sink.next(queue.take());
System.err.println("1 " + queue.size());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
sink.complete();
}
).publishOn(Schedulers.newSingle("outgoing-1"));
// Assume Spring @Service "B-2" = real-time MOS, TLV
Flux<Weather> outgoing2 = Flux.<Weather>create(
sink -> {
for (int i = 0; i < 1000; i++) {
try {
sink.next(queue2.take());
System.err.println("2 " + queue2.size());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
sink.complete();
}
).publishOn(Schedulers.newSingle("outgoing-2"));
// Assume Spring @Service "A-3" = 5 second summary of LDN, NYC, PAR, ZUR
Flux<Weather> outgoing1a = Flux.from(outgoing1)
.groupBy(c -> c.city)
.flatMap(g -> g
.sample(Duration.ofSeconds(5))
)
.log("C");
// Assume Spring @Service "C" - merges "A-3" and "B-2"
// only prints outgoing1a
Flux.merge(outgoing1a, outgoing2).subscribe(System.out::println);
// only prints outgoing2
//Flux.merge(outgoing2, outgoing1a).subscribe(System.out::println);
}
}
最佳答案
这里有一些事情在起作用。
请注意.merge
运算符的以下建议...
请注意,合并是为与异步源或有限源一起使用而设计的。当处理尚未在专用Scheduler上发布的无限源时,您必须在单独的Scheduler中隔离该源,因为合并可能会在订阅另一个源之前尝试耗尽它。
您的出站流量使用.publishOn
,但这仅影响在.publishOn
运算符之后链接的运算符。即,它不会影响.publishOn
之前的任何内容。具体来说,它不会影响在lambda中传递给Flux.create
的代码所执行的线程。如果在每个出站通量的.log()
之前添加.publishOn
,则可以看到此信息。
传递给Flux.create
的lambda调用一个阻止方法(queue.take
)。
由于您在subscribe(...)
线程中的合并Flux上调用了main
,因此传递给Flux.create
的lambda将在main
线程中执行并阻塞。
最简单的解决方法是使用.subscribeOn
而不是.publishOn
,以便在lambda中传递给Flux.create
的代码在不同的线程上运行(而不是main
)。这将防止main
线程被阻塞,并允许对两个出站流的合并输出进行交织。