本文介绍了用反应堆开火就忘的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我的 Spring Boot 应用程序中有一个类似下面的方法.

I have a method like below in my Spring boot app.

public Flux<Data> search(SearchRequest request) {
  Flux<Data> result = searchService.search(request);//this returns Flux<Data>
  Mono<List<Data>> listOfData = result.collectList();
//  doThisAsync() // here I want to pass this list and run some processing on it
// the processing should happen async and the search method should return immediately.
  return result;
}

//this method uses the complete List<Data> returned by above method
public void doThisAsync(List<Data> data) {
  //do some processing here
}

目前,我正在使用 @Async 注释服务类和 doThisAsync,但不知道如何传递 List>,因为我不想调用block.我只有Mono.

Currently, I'm using @Async annotated service class with doThisAsync, but don't know how to pass the List<Data>, because I don't want to call block.All I have is Mono<List<Data>>.

我的主要问题是如何单独处理这个 Mono,search 方法应该返回 Flux.

My main problem is how to process this Mono separately and the search method should return the Flux<Data>.

推荐答案

1, 如果你的 fire-and-forget 已经异步返回 Mono/Flux

1, If your fire-and-forget is already async returning Mono/Flux

public Flux<Data> search(SearchRequest request)
{
    return searchService.search(request)
                        .collectList()
                        .doOnNext(data -> doThisAsync(data).subscribe())  // add error logging here or inside doThisAsync
                        .flatMapMany(Flux::fromIterable);
}

public Mono<Void> doThisAsync(List<Data> data) {
    //do some async/non-blocking processing here like calling WebClient
}

2,如果您的即发即弃确实阻塞了 I/O

2, If your fire-and-forget does blocking I/O

public Flux<Data> search(SearchRequest request)
{
    return searchService.search(request)
                        .collectList()
                        .doOnNext(data -> Mono.fromRunnable(() -> doThisAsync(data))
                                              .subscribeOn(Schedulers.elastic())  // delegate to proper thread to not block main flow
                                              .subscribe())  // add error logging here or inside doThisAsync
                        .flatMapMany(Flux::fromIterable);
}

public void doThisAsync(List<Data> data) {
    //do some blocking I/O on calling thread
}

请注意,在上述两种情况下,您都失去了背压支持.如果 doAsyncThis 由于某种原因变慢,那么数据生产者不会关心并继续生产项目.这是火与雾机制的自然结果.

Note that in both of the above cases you lose backpressure support. If the doAsyncThis slows down for some reason, then the data producer won't care and keep producing items. This is a natural consequence of the fire-and-foget mechanism.

这篇关于用反应堆开火就忘的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-21 14:07