自开始以来,我一直对如何处理InterruptedException以及如果花费太多时间如何正确取消http请求感到困惑。我有一个库,其中为我们的客户提供了两种方法,sync和async。他们可以调用他们认为适合其目的的任何方法。

  • executeSync()-等待直到得到结果,然后返回结果。
  • executeAsync()-立即返回一个Future,如果需要,可以在完成其他操作之后进行处理。

  • 他们将传递DataKey对象,该对象中包含用户ID和超时值。我们将根据用户ID确定要调用的计算机,然后使用该计算机创建URL,然后使用AsyncRestTemplate对URL进行http调用,然后根据响应是否成功将响应发送回给他们。

    我正在使用AsyncRestTemplateexchange方法,该方法返回一个ListenableFuture,并且我希望具有基于NIO的客户端连接的异步非阻塞体系结构,以便该请求使用非阻塞IO,所以这就是我选择AsyncRestTemplate的原因。这种方法听起来对我的问题定义正确吗?该库将在非常重的负载下用于生产。

    下面是我的界面:
    public interface Client {
        // for synchronous
        public DataResponse executeSync(DataKey key);
    
        // for asynchronous
        public ListenableFuture<DataResponse> executeAsync(DataKey key);
    }
    

    下面是我对接口(interface)的实现:
    public class DataClient implements Client {
    
        // using spring 4 AsyncRestTemplate
        private final AsyncRestTemplate restTemplate = new AsyncRestTemplate();
    
        // for synchronous
        @Override
        public DataResponse executeSync(DataKey keys) {
            Future<DataResponse> responseFuture = executeAsync(keys);
            DataResponse response = null;
    
            try {
                response = responseFuture.get(keys.getTimeout(), TimeUnit.MILLISECONDS);
            } catch (InterruptedException ex) {
                // do we need to catch InterruptedException here and interrupt the thread?
                Thread.currentThread().interrupt();
                // also do I need throw this RuntimeException at all?
                throw new RuntimeException("Interrupted", ex);
            } catch (TimeoutException ex) {
                DataLogging.logEvents(ex, DataErrorEnum.CLIENT_TIMEOUT, keys);
                response = new DataResponse(null, DataErrorEnum.CLIENT_TIMEOUT, DataStatusEnum.ERROR);
                responseFuture.cancel(true); // terminating the tasks that got timed out so that they don't take up the resources?
            } catch (Exception ex) {
                DataLogging.logEvents(ex, DataErrorEnum.ERROR_CLIENT, keys);
                response = new DataResponse(null, DataErrorEnum.ERROR_CLIENT, DataStatusEnum.ERROR);
            }
    
            return response;
        }
    
        // for asynchronous
        @Override
        public ListenableFuture<DataResponse> executeAsync(final DataKey keys) {
    
            final SettableFuture<DataResponse> responseFuture = SettableFuture.create();
            final org.springframework.util.concurrent.ListenableFuture orig =
                restTemplate.exchange(createURL(keys), HttpMethod.GET, keys.getEntity(), String.class);
    
            orig.addCallback(
                    new ListenableFutureCallback<ResponseEntity<String>>() {
                        @Override
                        public void onSuccess(ResponseEntity<String> result) {
                            responseFuture.set(new DataResponse(result.getBody(), DataErrorEnum.OK,
                                    DataStatusEnum.SUCCESS));
                        }
    
                        @Override
                        public void onFailure(Throwable ex) {
                            DataLogging.logErrors(ex, DataErrorEnum.ERROR_SERVER, keys);
                            responseFuture.set(new DataResponse(null, DataErrorEnum.ERROR_SERVER,
                                    DataStatusEnum.ERROR));
                        }
                    });
    
            // propagate cancellation back to the original request
            responseFuture.addListener(new Runnable() {
              @Override public void run() {
                 if (responseFuture.isCancelled()) {
                   orig.cancel(false); // I am keeping this false for now
                 }
              }
            }, MoreExecutors.directExecutor());
            return responseFuture;
        }
    }
    

    客户将通过他们的代码像这样致电-
    // if they are calling executeSync() method
    DataResponse response = DataClientFactory.getInstance().executeSync(dataKey);
    
    // and if they want to call executeAsync() method
    Future<DataResponse> response = DataClientFactory.getInstance().executeAsync(dataKey);
    

    现在的问题是-
  • 如果http请求花费的时间太长,我们可以中断AsyncRestTemplate调用吗?我实际上是在cancel方法中上述代码中的future上调用executeSync的,但是我不确定如何验证它以确保它在做什么?我想将取消传播回原来的将来,以便可以取消相应的http请求(我可能想这样做以节省资源),所以这就是为什么我在executeAsync方法中添加了一个侦听器的原因。我相信,我们无法中断RestTemplate调用,但是不确定AsyncRestTemplate是否可以执行此操作。如果说我们可以中断AsyncRestTemplate调用,那么我是否正在做一切正确的事情来中断http调用?还是有更好/更清洁的方法来做到这一点?还是我甚至不必担心在当前设计中使用AsyncRestTemplate取消Http请求?
        // propagate cancellation back to the original request
        responseFuture.addListener(new Runnable() {
          @Override public void run() {
             if (responseFuture.isCancelled()) {
               orig.cancel(false); // I am keeping this false for now
             }
          }
        }, MoreExecutors.directExecutor());
    

    使用当前设置,我可以看到它有时会抛出CancellationException(并非每次都如此)-这是否意味着我的HTTP请求被取消了?
  • 我也在InterruptedException方法中的executeSync捕获块中做正确的事情吗?如果没有,那么解决该问题的正确方法是什么。我是否需要处理InterruptedException
  • 是真的吗,默认情况下AsyncRestTamplete使用阻塞调用和每个线程的请求?如果是,那么在当前设置中是否有任何方法可以使基于NIO的客户端连接?

  • 任何解释/代码建议将有很大帮助。

    最佳答案

    首先,为什么要使用SettableFuture?为什么不能只返回AsyncRestTemplate返回的ListenableFuture?

    1. Can we interrupt AsyncRestTemplate call if http request is taking too long?
    

    当然可以!您只需要调用Future.cancel方法。此方法将中断AsyncRestTemplate实际使用的内部RestTemplate的执行。
    2. Also am I doing the right thing in catch block of InterruptedException in executeSync method?
    

    正如Phil和Danilo所说,您不需要在InterruptedException catch块内中断当前线程。当必须取消执行请求时,只需执行您需要做的任何事情即可。

    实际上,我建议您创建一个处理此行为的方法,例如handleInterruption,并将此方法用于TimeoutExceptionInterruptedException
    3. Is it true that by default AsyncRestTamplete uses blocking calls and request per thread?
    

    是。 AsyncRestTamplete的默认构造函数在内部使用SimpleClientHttpRequestFactorySimpleAsyncTaskExecutor

    这个TaskExecutor总是对每个任务发起威胁,并且从不重用Threads,因此效率非常低:
     * TaskExecutor implementation that fires up a new Thread for each task,
     * executing it asynchronously.
     *
     * Supports limiting concurrent threads through the "concurrencyLimit"
     * bean property. By default, the number of concurrent threads is unlimited.
     *
     * NOTE: This implementation does not reuse threads! Consider a
     * thread-pooling TaskExecutor implementation instead, in particular for
     * executing a large number of short-lived tasks.
     *
    

    ,我建议您使用AsyncRestTemplate的另一种配置。

    您应该使用AsyncRestTemplate的构造函数,该构造函数使用另一个TaskExecutor:
    public AsyncRestTemplate(AsyncListenableTaskExecutor taskExecutor)
    

    例如:
    AsyncRestTemplate template = new AsyncRestTemplate(new ConcurrentTaskExecutor(Executors.newCachedThreadPool()));
    

    此ExecutorService(Executors.newCachedThreadPool())根据需要创建新线程,但是在可用时会重用以前构造的线程。

    甚至更好的是,您可以使用另一个RequestFactory。例如,您可以使用HttpComponentsAsyncClientHttpRequestFactory,内部使用 NIO ,只需调用AsyncRestTemplate的适当构造函数即可:
    new AsyncRestTemplate(new HttpComponentsAsyncClientHttpRequestFactory())
    

    不要忘记AsyncRestTemplate的内部行为将取决于您如何创建对象。

    07-24 13:51