问题描述
我有ListenAbleFuture列表.我要等待这个列表 ListenableFuture<SendResult<Integer, String>>
最多15分钟(如果它们尚未完成).我该怎么做到.
I have the List of ListenAbleFuture.I want to wait of this List of ListenableFuture<SendResult<Integer, String>>
for atmost 15 minutes if they have not comepleted.How can i achieve it.
当前我正在执行此操作,但是我不希望每个ListenAbleFuture等待15分钟.
Currently i am doing this but this wait for 15 min for every ListenAbleFuture which is what i dont want.
for (ListenableFuture<SendResult<Integer, String>> m : myFutureList) {
m.get(15, TimeUnit.MINUTES) ;
}
ListenableFuture<SendResult<Integer, String>> is from import org.springframework.util.concurrent.ListenableFuture;
我已经经历过等待未来"列表,但是这种解决方案是为了完成未来
I have gone through Waiting on a list of Future but this solution is for completablefuture
推荐答案
创建CountDownLatch
,例如new CountDownLatch(50)
,在每个可监听的未来中添加一个监听器,并在每个监听器中减少锁存器的数量.您可以对所有期货使用相同的侦听器,而不必每次都创建一个新的侦听器.
Create a CountDownLatch
, e.g. new CountDownLatch(50)
, add a listener to each listenable future and count the latch down in each one. You can use the same listener for all the futures rather than creating a new one each time.
然后,在发送50条记录后,使用latch.await(10, TimeUnit.SECONDS)
.如果超时,则可以对期货进行迭代,以确定哪些未完成.
Then, after sending 50 records, use latch.await(10, TimeUnit.SECONDS)
. If it times out you can then iterate over your futures to figure out which one(s) are not complete.
编辑
@Component
class Sender {
private static final Logger LOG = LoggerFactory.getLogger(Sender.class);
public void sendThem(KafkaTemplate<String, String> template, List<String> toSend) throws InterruptedException {
List<ListenableFuture<SendResult<String, String>>> futures = new ArrayList<>();
CountDownLatch latch = new CountDownLatch(toSend.size());
ListenableFutureCallback<SendResult<String, String>> callback =
new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onSuccess(SendResult<String, String> result) {
LOG.info(result.getRecordMetadata().toString());
latch.countDown();
}
@Override
public void onFailure(Throwable ex) {
ProducerRecord<?, ?> producerRecord = ((KafkaProducerException) ex).getProducerRecord();
LOG.error("Failed; " + producerRecord, ex);
latch.countDown();
}
};
toSend.forEach(str -> {
ListenableFuture<SendResult<String, String>> future = template.send("so61490633", str);
future.addCallback(callback);
});
if (latch.await(10, TimeUnit.SECONDS)) {
LOG.info("All sent ok");
}
else {
for (int i = 0; i < toSend.size(); i++) {
if (!futures.get(i).isDone()) {
LOG.error("No send result for " + toSend.get(i));
}
}
}
}
}
这篇关于等待Kafka发送API返回的ListenAbleFuture列表的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!