我开始从Spring-boot学习Webflux。我了解到,对于RestController的终结点,您可以定义一个Flux请求主体,在这里我希望有一个真正的流量流,也就是说,整个请求的各个部分是一个接一个的,这些部分也可以一个接一个地处理。但是,在用客户机和服务器构建了一个小示例之后,我无法按预期工作。

因此,这是服务器的代码段:

@PostMapping("/digest")
    public Flux<String> digest(@RequestBody Flux<String> text) {
        continuousMD5.reset();
        return text.log("server.request.").map(piece -> continuousMD5.update(piece)).log("server.response.");
    }

注意:每段文本将被发送到ContinuousMD5对象,该对象将累加所有片段,并在每次累加之后计算并返回中间MD5哈希值。流将在MD5计算之前和之后记录。

这是客户端的代码段:
@PostConstruct
    private void init() {
        webClient = webClientBuilder.baseUrl(reactiveServerUrl).build();
    }

@PostMapping(value = "/send", consumes = MediaType.TEXT_PLAIN_VALUE)
    public Flux<String> send(@RequestBody Flux<String> text) {
        return webClient.post()
            .uri("/digest")
            .accept(MediaType.TEXT_PLAIN)
            .body(text.log("client.request."), String.class)
            .retrieve().bodyToFlux(String.class).log("client.response.");
    }

注意:客户端接受一些文本的流量流,并记录该流并将其发送到服务器(作为流量流)。

出乎意料的是,我通过以下命令行使其能够发送REST请求并让客户端接收通量流:

for i in $(seq 1 100); do echo "The message $i"; done | http POST :8080/send  Content-Type:text/plain

我可以在客户端的日志中看到:
2019-05-09 17:02:08.604  INFO 3462 --- [ctor-http-nio-2] client.response.Flux.MonoFlatMapMany.2   : onSubscribe(MonoFlatMapMany.FlatMapManyMain)
2019-05-09 17:02:08.606  INFO 3462 --- [ctor-http-nio-2] client.response.Flux.MonoFlatMapMany.2   : request(1)
2019-05-09 17:02:08.649  INFO 3462 --- [ctor-http-nio-6] client.request.Flux.SwitchIfEmpty.1      : onSubscribe(FluxSwitchIfEmpty.SwitchIfEmptySubscriber)
2019-05-09 17:02:08.650  INFO 3462 --- [ctor-http-nio-6] client.request.Flux.SwitchIfEmpty.1      : request(32)
2019-05-09 17:02:08.674  INFO 3462 --- [ctor-http-nio-2] client.request.Flux.SwitchIfEmpty.1      : onNext(The message 1)
2019-05-09 17:02:08.676  INFO 3462 --- [ctor-http-nio-2] client.request.Flux.SwitchIfEmpty.1      : request(1)
2019-05-09 17:02:08.676  INFO 3462 --- [ctor-http-nio-2] client.request.Flux.SwitchIfEmpty.1      : onNext(The message 2)
...
2019-05-09 17:02:08.710  INFO 3462 --- [ctor-http-nio-2] client.request.Flux.SwitchIfEmpty.1      : onNext(The message 100)
2019-05-09 17:02:08.710  INFO 3462 --- [ctor-http-nio-6] client.request.Flux.SwitchIfEmpty.1      : request(1)
2019-05-09 17:02:08.710  INFO 3462 --- [ctor-http-nio-6] client.request.Flux.SwitchIfEmpty.1      : request(1)
2019-05-09 17:02:08.710  INFO 3462 --- [ctor-http-nio-6] client.request.Flux.SwitchIfEmpty.1      : request(1)
2019-05-09 17:02:08.711  INFO 3462 --- [ctor-http-nio-2] client.request.Flux.SwitchIfEmpty.1      : onComplete()
2019-05-09 17:02:08.711  INFO 3462 --- [ctor-http-nio-6] client.request.Flux.SwitchIfEmpty.1      : request(1)
2019-05-09 17:02:08.711  INFO 3462 --- [ctor-http-nio-6] client.request.Flux.SwitchIfEmpty.1      : request(1)
2019-05-09 17:02:08.860  INFO 3462 --- [ctor-http-nio-6] client.response.Flux.MonoFlatMapMany.2   : onNext(CSubeSX3yIVP2CD6FRlojg==)
2019-05-09 17:02:08.862  INFO 3462 --- [ctor-http-nio-6] client.response.Flux.MonoFlatMapMany.2   : onComplete()
^C2019-05-09 17:02:47.393  INFO 3462 --- [ctor-http-nio-6] client.request.Flux.SwitchIfEmpty.1      : cancel()

每篇文章都被认为是助焊剂流的组成部分,因此需要分别提出。

但是在服务器日志中:
2019-05-09 17:02:08.811  INFO 3475 --- [ctor-http-nio-2] server.request.Flux.SwitchIfEmpty.1      : onSubscribe(FluxSwitchIfEmpty.SwitchIfEmptySubscriber)
2019-05-09 17:02:08.813  INFO 3475 --- [ctor-http-nio-2] server.response.Flux.Map.2               : onSubscribe(FluxMap.MapSubscriber)
2019-05-09 17:02:08.814  INFO 3475 --- [ctor-http-nio-2] server.response.Flux.Map.2               : request(1)
2019-05-09 17:02:08.814  INFO 3475 --- [ctor-http-nio-2] server.request.Flux.SwitchIfEmpty.1      : request(1)
2019-05-09 17:02:08.838  INFO 3475 --- [ctor-http-nio-2] server.request.Flux.SwitchIfEmpty.1      : onNext(The message 1The message 2The message 3The message 4The message 5The message 6The message 7The message 8The message 9The message 10The message 11The message 12The message 13The message 14The message 15The message 16The message 17The message 18The message 19The message 20The message 21The message 22The message 23The message 24The message 25The message 26The message 27The message 28The message 29The message 30The message 31The message 32The message 33The message 34The message 35The message 36The message 37The message 38The message 39The message 40The message 41The message 42The message 43The message 44The message 45The message 46The message 47The message 48The message 49The message 50The message 51The message 52The message 53The message 54The message 55The message 56The message 57The message 58The message 59The message 60The message 61The message 62The message 63The message 64The message 65The message 66The message 67The message 68The message 69The message 70The message 71The message 72The message 73The message 74The message 75The message 76The message 77The message 78The message 79The message 80The message 81The message 82The message 83The message 84The message 85The message 86The message 87The message 88The message 89The message 90The message 91The message 92The message 93The message 94The message 95The message 96The message 97The message 98The message 99The message 100)
2019-05-09 17:02:08.840  INFO 3475 --- [ctor-http-nio-2] server.response.Flux.Map.2               : onNext(CSubeSX3yIVP2CD6FRlojg==)
2019-05-09 17:02:08.852  INFO 3475 --- [ctor-http-nio-2] server.response.Flux.Map.2               : request(32)
2019-05-09 17:02:08.852  INFO 3475 --- [ctor-http-nio-2] server.request.Flux.SwitchIfEmpty.1      : request(32)
2019-05-09 17:02:08.852  INFO 3475 --- [ctor-http-nio-2] server.request.Flux.SwitchIfEmpty.1      : onComplete()
2019-05-09 17:02:08.852  INFO 3475 --- [ctor-http-nio-2] server.response.Flux.Map.2               : onComplete()
2019-05-09 17:02:47.394  INFO 3475 --- [ctor-http-nio-2] server.response.Flux.Map.2               : cancel()
2019-05-09 17:02:47.394  INFO 3475 --- [ctor-http-nio-2] server.request.Flux.SwitchIfEmpty.1      : cancel()

我看到所有文本立即到达服务器,因此被当作流量流中的一个大元素处理(也可以验证只计算了一个MD5哈希,而不是100个)。

我希望服务器还从客户端接收一些文本,作为通量流中的元素,否则对于服务器来说,它不是真正的反应性,而只是正常的阻塞请求。

谁能帮助我了解如何使用Webflux发出真正的助焊剂反应要求?谢谢!

更新

我使用类似的命令行对服务器发出REST请求,并且可以看到服务器以通量流的形式接收了文本(“消息x”)。因此,我认为服务器可以正常运行,现在的问题可能是客户端:如何使用WebClient发出真正的流量REST请求?

最佳答案

如果要实现流式传输效果,可以:

  • 使用支持stream-application/stream+json的其他内容类型。查看有关此内容的以下SO线程:
    Spring WebFlux Flux behavior with non streaming application/json
  • 将基础协议更改为最适合流模型的协议,例如WebSockets。 https://howtodoinjava.com/spring-webflux/reactive-websockets/
  • 关于java - 如何提出真实的助焊剂要求?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/56109240/

    10-10 17:33