我想在每次更改整数变量时发送更新
@PutMapping
public void update() {
integer = random.nextInt();
}
@GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Integer> eventFlux() {
Flux<Integer> eventFlux = Flux.fromStream(
Stream.generate(() -> integer)
);
return Flux.zip(eventFlux, onUpdate()).map(Tuple2::getT1);
}
private Flux<Long> onUpdate() {
return Flux.interval(Duration.ofSeconds(1));
}
最佳答案
您可以将FluxProcessor
用作:
Subscriber
;您可以用来添加数字: @PutMapping("/{number}")
public void update(@PathVariable Integer number) {
processor.onNext(number);
}
Publisher
(Flux
);哪些客户可以订阅以接收添加的号码: @GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Integer> eventFlux() {
return processor;
}
以下是一个完整的工作示例:
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.reactive.config.EnableWebFlux;
import org.springframework.web.reactive.config.WebFluxConfigurer;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxProcessor;
@SpringBootApplication
@EnableWebFlux
@RestController
public class EventStreamApp implements WebFluxConfigurer {
public static void main(String[] args) {
SpringApplication.run(EventStreamApp.class);
}
private FluxProcessor<Integer, Integer> processor = DirectProcessor.create();
@PutMapping("/{number}")
public void update(@PathVariable Integer number) {
processor.onNext(number);
}
@GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Integer> eventFlux() {
return processor;
}
}
Complete code on GitHub
希望这可以帮助。