处理从Stream.generate
构造构建的助焊剂时遇到问题。
Java流正在从远程源中获取一些数据,因此我实现了一个自定义的供应商,该供应商嵌入了数据获取逻辑,然后使用它来填充Stream。
Stream.generate(new SearchSupplier(...))
我的想法是检测一个空列表并使用
takeWhile
->的Java9功能Stream.generate(new SearchSupplier(this, queryBody))
.takeWhile(either -> either.isRight() && either.get().nonEmpty())
(使用Vavr的Either构造)
然后,存储层通量将执行以下操作:
return Flux.fromStream (
this.searchStream(...) //this is where the stream gets generated
)
.map(Either::get)
.flatMap(Flux::fromIterable);
“服务”层由通量上的一些转换步骤组成,但是方法签名类似于
Flux<JsonObject> search(...)
。最后,控制器层具有GetMapping:
@GetMapping(produces = "application/stream+json")
public Flux search(...) {
return searchService.search(...) //this is the Flux<JsonObject> parth
.subscriberContext(...) //stuff I need available during processing
.doOnComplete(() -> log.debug("DONE"));
}
我的问题是助焊剂似乎永远不会终止。
例如,从Postman拨打电话只是在响应部分中拍摄了“正在加载...”部分。当我从IDE中终止该过程时,结果将刷新到邮递员,我看到了期望的结果。而且doOnComplete lambda永远不会被调用
我注意到的是,如果我改变助焊剂的来源:
Flux.fromArray(...) //harcoded array of lists of jsons
doOnComplete lambda被调用,并且http连接也关闭,结果显示在邮递员中。
知道可能是什么问题吗?
谢谢。
最佳答案
您可以使用如下代码直接创建Flux。请注意,我根据您的SearchSupplier的工作方式添加了一些假定的方法,您需要实现这些方法:
Flux<SearchResultType> flux = Flux.generate(
() -> new SearchSupplier(this, queryBody),
(supplier, sink) -> {
SearchResultType current = supplier.next();
if (isNotLast(current)) {
sink.next(current);
} else {
sink.complete();
}
return supplier;
},
supplier -> anyCleanupOperations(supplier)
);
关于java - 来自无限Java流的磁通端点,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/54239928/