问题描述
我试图了解 Project reactor 为应用程序代码提供的数据可见性保证.例如我希望下面的代码会失败,但经过一百万次迭代后不会失败.我正在更改线程 A 上典型 POJO 的状态并从线程 B 读取它. Reactor 是否保证 POJO 更改在线程间可见?
I am trying to understand what guarantees with respect to data visibility Project reactor provides to application code. For e.g. I would expect the below code to fail but it does not after a million iterations. I am changing the state of a typical POJO on thread A and reading it back from thread B. Does Reactor guarantee POJO changes are visible across thread?
public class Main {
public static void main(String[] args) {
Integer result = Flux.range(1, 1_000_000)
.map(i -> {
Data data = new Data();
data.setValue(i);
data.setValueThreeTimes(i);
data.setValueObj(i + i);
return data;
})
.parallel(250)
.runOn(Schedulers.newParallel("par", 500))
.map(d -> {
d.setValueThreeTimes(d.getValueThreeTimes() + d.getValue());
return d;
})
.sequential()
.parallel(250)
.runOn(Schedulers.newParallel("par", 500))
.map(d -> {
d.setValueThreeTimes(d.getValueThreeTimes() + d.getValue());
return d;
})
// .sequential()
.map(d -> {
if (d.getValue() * 3 != d.getValueThreeTimes()) throw new RuntimeException("data corrupt error");
return d;
})
.reduce(() -> 0, (Integer sum, Data d) -> sum + d.getValueObj() + d.getValue())
.sequential()
.blockLast();
}
static class Data {
private int value;
private int valueThreeTimes;
private Integer valueObj;
public int getValueThreeTimes() {
return valueThreeTimes;
}
public void setValueThreeTimes(int valueThreeTimes) {
this.valueThreeTimes = valueThreeTimes;
}
public int getValue() {
return value;
}
@Override
public String toString() {
return "Data{" +
"value=" + value +
", valueObj=" + valueObj +
'}';
}
public void setValue(int value) {
this.value = value;
}
public Integer getValueObj() {
return valueObj;
}
public void setValueObj(Integer valueObj) {
this.valueObj = valueObj;
}
}
private static <T> T identityWithThreadLogging(T el, String operation) {
System.out.println(operation + " -- " + el + " -- " +
Thread.currentThread().getName());
return el;
}
}
推荐答案
Reactive Streams 规范为 Flux
或 Mono
(Publisher
code>), onNext
事件必须是连续的.
The Reactive Streams specification enforces that for a Flux
or Mono
(a Publisher
), onNext
events must be sequential.
parallel()
是一个 ParallelFlux
,它通过分而治之的方式放松了一点:你会得到多个轨道",每个轨道都单独遵守规范,但总体上没有(轨道之间的并行化).
parallel()
is a ParallelFlux
, which relaxes that a bit by dividing-and-conquering: you get multiple "rails" that each individually stick to the spec, but overall do not (parallelisation between rails).
反过来,sequential()
回到 Flux
世界并引入内存屏障以保证结果序列符合 RS 规范.
In turn, sequential()
goes back to the Flux
world and introduces memory barrier to guarantee the resulting sequence complies with the RS spec.
这篇关于Project Reactor 和 Java 内存模型的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!