问题描述
自启动以来,我一直对如何处理InterruptedException以及如果花费太多时间如何正确取消http请求感到困惑.我有一个库,其中为我们的客户提供了两种方法,sync和async.他们可以调用他们认为适合自己目的的任何方法.
Since starting, I was always confuse of how to deal with InterruptedException and how to properly cancel the http request if they are taking too much time. I have a library in which I have provided two methods, sync and async for our customer. They can call whichever method they feel is right for their purpose.
- executeSync()-等待直到得到结果,然后返回结果.
- executeAsync()-立即返回一个Future,如果需要,可以在完成其他操作后对其进行处理.
- executeSync() - waits until I have a result, returns the result.
- executeAsync() - returns a Future immediately which can be processed after other things are done, if needed.
他们将传递DataKey
对象,该对象中包含用户ID和超时值.我们将根据用户ID确定要调用的计算机,然后使用该计算机创建URL,然后使用,然后根据是否成功将响应发送回给他们.
They will pass DataKey
object which has the user id and timeout value in it. We will figure out which machine to call basis on the user id and then create an URL with that machine and we will make http call to the URL using AsyncRestTemplate and then send the response back to them basis on whether it is successful or not.
我正在使用方法,该方法返回一个ListenableFuture
,我想进行异步非阻塞具有基于NIO的客户端连接的体系结构,以便请求使用非阻塞IO,因此这就是我使用AsyncRestTemplate
的原因.这种方法听起来对我的问题定义正确吗?该库将在非常重的负载下用于生产.
I am using exchange method of AsyncRestTemplate
which returns back a ListenableFuture
and I wanted to have async non blocking architecture with NIO based client connections so that request uses non blocking IO so that's why I went with AsyncRestTemplate
. Does this approach sounds right for my problem definition? This library will be used in production under very heavy load.
下面是我的界面:
public interface Client {
// for synchronous
public DataResponse executeSync(DataKey key);
// for asynchronous
public ListenableFuture<DataResponse> executeAsync(DataKey key);
}
下面是我对接口的实现:
And below is my implementation of the interface:
public class DataClient implements Client {
// using spring 4 AsyncRestTemplate
private final AsyncRestTemplate restTemplate = new AsyncRestTemplate();
// for synchronous
@Override
public DataResponse executeSync(DataKey keys) {
Future<DataResponse> responseFuture = executeAsync(keys);
DataResponse response = null;
try {
response = responseFuture.get(keys.getTimeout(), TimeUnit.MILLISECONDS);
} catch (InterruptedException ex) {
// do we need to catch InterruptedException here and interrupt the thread?
Thread.currentThread().interrupt();
// also do I need throw this RuntimeException at all?
throw new RuntimeException("Interrupted", ex);
} catch (TimeoutException ex) {
DataLogging.logEvents(ex, DataErrorEnum.CLIENT_TIMEOUT, keys);
response = new DataResponse(null, DataErrorEnum.CLIENT_TIMEOUT, DataStatusEnum.ERROR);
responseFuture.cancel(true); // terminating the tasks that got timed out so that they don't take up the resources?
} catch (Exception ex) {
DataLogging.logEvents(ex, DataErrorEnum.ERROR_CLIENT, keys);
response = new DataResponse(null, DataErrorEnum.ERROR_CLIENT, DataStatusEnum.ERROR);
}
return response;
}
// for asynchronous
@Override
public ListenableFuture<DataResponse> executeAsync(final DataKey keys) {
final SettableFuture<DataResponse> responseFuture = SettableFuture.create();
final org.springframework.util.concurrent.ListenableFuture orig =
restTemplate.exchange(createURL(keys), HttpMethod.GET, keys.getEntity(), String.class);
orig.addCallback(
new ListenableFutureCallback<ResponseEntity<String>>() {
@Override
public void onSuccess(ResponseEntity<String> result) {
responseFuture.set(new DataResponse(result.getBody(), DataErrorEnum.OK,
DataStatusEnum.SUCCESS));
}
@Override
public void onFailure(Throwable ex) {
DataLogging.logErrors(ex, DataErrorEnum.ERROR_SERVER, keys);
responseFuture.set(new DataResponse(null, DataErrorEnum.ERROR_SERVER,
DataStatusEnum.ERROR));
}
});
// propagate cancellation back to the original request
responseFuture.addListener(new Runnable() {
@Override public void run() {
if (responseFuture.isCancelled()) {
orig.cancel(false); // I am keeping this false for now
}
}
}, MoreExecutors.directExecutor());
return responseFuture;
}
}
客户将通过他们的代码这样呼叫-
And customer will be calling like this from their code -
// if they are calling executeSync() method
DataResponse response = DataClientFactory.getInstance().executeSync(dataKey);
// and if they want to call executeAsync() method
Future<DataResponse> response = DataClientFactory.getInstance().executeAsync(dataKey);
现在的问题是-
-
如果http请求花费的时间太长,我们可以中断
AsyncRestTemplate
呼叫吗?我实际上是在上述executeSync
方法中的上述代码中在future
上调用cancel
的,但是我不确定如何验证它以确保它正在执行应做的工作?我想将取消传播回原来的将来,以便可以取消相应的http请求(我可能想这样做以节省资源),所以这就是为什么我在executeAsync方法中添加了一个侦听器.我相信,我们不能中断RestTemplate
调用,但是不能确定AsyncRestTemplate
是否可以执行该操作.如果说我们可以中断AsyncRestTemplate
调用,那么我是否正在做一切正确的事情来中断http调用?还是有更好/更清洁的方法来做到这一点?还是我甚至不必担心在当前设计中使用AsyncRestTemplate
取消Http请求?
Can we interrupt
AsyncRestTemplate
call if http request is taking too long? I am actually callingcancel
on myfuture
in my above code inexecuteSync
method but I am not sure how do I verify it to make sure it is doing what it should? I want to propagate cancellation back to the original future, so that I can cancel the corresponding http request (which I probably want to do to save resources) so that's why I have added a listener in my executeAsync method. I believe, we cannot interruptRestTemplate
calls but not sure onAsyncRestTemplate
whether we can do that or not. If let's say we can interruptAsyncRestTemplate
calls, then am I doing everything right to interrupt the http calls? Or is there any better/cleaner way to do this? Or Do I even need to worry about cancelling the Http request withAsyncRestTemplate
with my current design?
// propagate cancellation back to the original request
responseFuture.addListener(new Runnable() {
@Override public void run() {
if (responseFuture.isCancelled()) {
orig.cancel(false); // I am keeping this false for now
}
}
}, MoreExecutors.directExecutor());
在当前设置下,我可以看到它有时会抛出CancellationException(并非每次都发生)-这是否意味着我的HTTP请求随后被取消了?
With the current setup, I can see it is throwing CancellationException some of the times (not everytime)- Does that mean my HTTP request was cancelled then?
任何说明/代码建议都会有很大帮助.
Any explanations/code suggestions will be of great help.
推荐答案
首先,为什么要使用SettableFuture?为什么不能只返回AsyncRestTemplate返回的ListenableFuture?
First of all, Why are you using SettableFuture? Why can't just return the ListenableFuture returned by AsyncRestTemplate?
1. Can we interrupt AsyncRestTemplate call if http request is taking too long?
当然可以!您只需要调用Future.cancel
方法.此方法将中断AsyncRestTemplate实际使用的内部RestTemplate的执行.
Of course you do! You only need to call Future.cancel
method. This method will interrupt the execution of the internal RestTemplate that AsyncRestTemplate is actually using.
2. Also am I doing the right thing in catch block of InterruptedException in executeSync method?
正如Phil和Danilo所说,您不需要在InterruptedException catch块内中断当前线程.只需执行必须取消请求的执行时要做的任何事情.
As Phil and Danilo have said, you don't need to interrupt the current thread within the InterruptedException catch block. Just do whatever you need to do when the execution of the request must be canceled.
实际上,我建议您创建一个处理此行为的方法,例如handleInterruption,并将此方法用于TimeoutException
和InterruptedException
.
In fact, I recommend you create a method that handles this behaviour, something like handleInterruption, and use this method for both TimeoutException
and InterruptedException
.
3. Is it true that by default AsyncRestTamplete uses blocking calls and request per thread?
是的. AsyncRestTamplete
的默认构造函数在内部使用SimpleClientHttpRequestFactory
和SimpleAsyncTaskExecutor
.
Yes. The default constructor of AsyncRestTamplete
is internally using SimpleClientHttpRequestFactory
and SimpleAsyncTaskExecutor
.
此TaskExecutor总是对每个任务发起威胁,并且从不重用线程,因此效率非常低:
This TaskExecutor always starts a threat for every task, and never reuse Threads, so it's very inefficient:
* TaskExecutor implementation that fires up a new Thread for each task,
* executing it asynchronously.
*
* Supports limiting concurrent threads through the "concurrencyLimit"
* bean property. By default, the number of concurrent threads is unlimited.
*
* NOTE: This implementation does not reuse threads! Consider a
* thread-pooling TaskExecutor implementation instead, in particular for
* executing a large number of short-lived tasks.
*
我建议您使用AsyncRestTemplate的另一种配置.
您应该使用AsyncRestTemplate的构造函数,该构造函数使用另一个TaskExecutor:
You should use the constructor of AsyncRestTemplate that uses another TaskExecutor:
public AsyncRestTemplate(AsyncListenableTaskExecutor taskExecutor)
例如:
AsyncRestTemplate template = new AsyncRestTemplate(new ConcurrentTaskExecutor(Executors.newCachedThreadPool()));
此ExecutorService(Executors.newCachedThreadPool())根据需要创建新线程,但是将在可用之前重用以前构造的线程.
This ExecutorService (Executors.newCachedThreadPool()) creates new threads as needed, but will reuse previously constructed threads when they are available.
更好的是,您可以使用另一个RequestFactory.例如,您可以使用HttpComponentsAsyncClientHttpRequestFactory
,内部使用 NIO ,只需调用AsyncRestTemplate的适当构造函数即可:
Or even better, you can use another RequestFactory. For instance, you can use HttpComponentsAsyncClientHttpRequestFactory
, that internally uses NIO, just calling the proper constructor of AsyncRestTemplate:
new AsyncRestTemplate(new HttpComponentsAsyncClientHttpRequestFactory())
别忘了AsyncRestTemplate的内部行为将取决于您如何创建对象.
Don't forget the internal behaviour of AsyncRestTemplate will depend on how you create the object.
这篇关于如果他们花费太多时间,如何取消AsyncRestTemplate HTTP请求?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!