本文介绍了如何将单声道流转换为通量流的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个尝试使用WebClient返回Mono的方法

    @GetMapping("getMatch")
    public Mono<Object> getMatch(@RequestParam Long matchId) {
        return WebClient.create(OpenDotaConstant.BASE_URL).get()
                .uri("/matches/{matchId}", matchId)
                .accept(MediaType.APPLICATION_JSON)
                .retrieve()
                .bodyToMono(Object.class);

    }

它可以返回我预期的结果。然后我尝试创建另一个方法来支持List as Params

    @GetMapping("getMatches")
    public Flux<Object> getMatches(@RequestParam String matchesId) {
        List<Long> matchesList = JSON.parseArray(matchesId, Long.class);

        return Flux.fromStream(matchesList.parallelStream().map(this::getMatch));
    }

但这次返回一个奇怪的结果。

[
    {
        "scanAvailable": true
    },
    {
        "scanAvailable": true
    }
]

我是反应式编程新手,如何正确组合Stream和Mono,然后转换为Flux?

推荐答案

可能,您需要的内容如下:

@GetMapping("getMatches")
public Flux<Object> getMatches(@RequestParam String matchesId) {
    List<Long> matchesList = JSON.parseArray(matchesId, Long.class);
    return Flux.fromStream(matchesList.stream())
               .flatMap(this::getMatch);
}

而不是:

@GetMapping("getMatches")
public Flux<Object> getMatches(@RequestParam String matchesId) {
    List<Long> matchesList = JSON.parseArray(matchesId, Long.class);
    return Flux.fromStream(matchesList.parallelStream().map(this::getMatch));
}

备注:

  • 基本上,您期望getMatches终结点返回Flux<Object>。然而,正如它所写的-它实际上返回Flux<Mono<Object>>,因此您会看到奇怪的输出。要获取Flux<Object>,我建议首先创建匹配ID的Flux<Long>,然后flatMap调用getMatch的结果(返回Mono<Object>),最后得到Flux<Object>

  • 此外,不需要使用parallelStream()。因为您已经在使用反应器,所以所有操作都将在反应器计划程序上并发执行。

这篇关于如何将单声道流转换为通量流的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

09-13 22:03