我想在每次更改整数变量时发送更新

@PutMapping
public void update() {
    integer = random.nextInt();
}

@GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Integer> eventFlux() {
    Flux<Integer> eventFlux = Flux.fromStream(
            Stream.generate(() -> integer)
    );
    return Flux.zip(eventFlux, onUpdate()).map(Tuple2::getT1);
}

private Flux<Long> onUpdate() {
    return Flux.interval(Duration.ofSeconds(1));
}

最佳答案

您可以将FluxProcessor用作:

  • 上游Subscriber;您可以用来添加数字:
  •     @PutMapping("/{number}")
        public void update(@PathVariable Integer number) {
            processor.onNext(number);
        }
    
  • 下游Publisher(Flux);哪些客户可以订阅以接收添加的号码:
  •     @GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
        public Flux<Integer> eventFlux() {
            return processor;
        }
    

    以下是一个完整的工作示例:
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.http.MediaType;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.PathVariable;
    import org.springframework.web.bind.annotation.PutMapping;
    import org.springframework.web.bind.annotation.RestController;
    import org.springframework.web.reactive.config.EnableWebFlux;
    import org.springframework.web.reactive.config.WebFluxConfigurer;
    
    import reactor.core.publisher.DirectProcessor;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.FluxProcessor;
    
    @SpringBootApplication
    @EnableWebFlux
    @RestController
    public class EventStreamApp implements WebFluxConfigurer {
    
        public static void main(String[] args) {
            SpringApplication.run(EventStreamApp.class);
        }
    
        private FluxProcessor<Integer, Integer> processor = DirectProcessor.create();
    
        @PutMapping("/{number}")
        public void update(@PathVariable Integer number) {
            processor.onNext(number);
        }
    
        @GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
        public Flux<Integer> eventFlux() {
            return processor;
        }
    }
    

    Complete code on GitHub

    希望这可以帮助。

    08-17 09:42