本文介绍了等待Kafka发送API返回的ListenAbleFuture列表的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有ListenAbleFuture列表.我要等待这个列表 ListenableFuture<SendResult<Integer, String>>最多15分钟(如果它们尚未完成).我该怎么做到.

I have the List of ListenAbleFuture.I want to wait of this List of ListenableFuture<SendResult<Integer, String>> for atmost 15 minutes if they have not comepleted.How can i achieve it.

当前我正在执行此操作,但是我不希望每个ListenAbleFuture等待15分钟.

Currently i am doing this but this wait for 15 min for every ListenAbleFuture which is what i dont want.

 for (ListenableFuture<SendResult<Integer, String>> m : myFutureList) {

                    m.get(15, TimeUnit.MINUTES) ;
    }

ListenableFuture<SendResult<Integer, String>> is from import org.springframework.util.concurrent.ListenableFuture;

我已经经历过等待未来"列表,但是这种解决方案是为了完成未来

I have gone through Waiting on a list of Future but this solution is for completablefuture

推荐答案

创建CountDownLatch,例如new CountDownLatch(50),在每个可监听的未来中添加一个监听器,并在每个监听器中减少锁存器的数量.您可以对所有期货使用相同的侦听器,而不必每次都创建一个新的侦听器.

Create a CountDownLatch, e.g. new CountDownLatch(50), add a listener to each listenable future and count the latch down in each one. You can use the same listener for all the futures rather than creating a new one each time.

然后,在发送50条记录后,使用latch.await(10, TimeUnit.SECONDS).如果超时,则可以对期货进行迭代,以确定哪些未完成.

Then, after sending 50 records, use latch.await(10, TimeUnit.SECONDS). If it times out you can then iterate over your futures to figure out which one(s) are not complete.

编辑

@Component
class Sender {

    private static final Logger LOG = LoggerFactory.getLogger(Sender.class);

    public void sendThem(KafkaTemplate<String, String> template, List<String> toSend) throws InterruptedException {
        List<ListenableFuture<SendResult<String, String>>> futures = new ArrayList<>();
        CountDownLatch latch = new CountDownLatch(toSend.size());
        ListenableFutureCallback<SendResult<String, String>> callback =
                new ListenableFutureCallback<SendResult<String, String>>() {

            @Override
            public void onSuccess(SendResult<String, String> result) {
                LOG.info(result.getRecordMetadata().toString());
                latch.countDown();
            }

            @Override
            public void onFailure(Throwable ex) {
                ProducerRecord<?, ?> producerRecord = ((KafkaProducerException) ex).getProducerRecord();
                LOG.error("Failed; " + producerRecord, ex);
                latch.countDown();
            }
        };
        toSend.forEach(str -> {
             ListenableFuture<SendResult<String, String>> future = template.send("so61490633", str);
             future.addCallback(callback);
        });
        if (latch.await(10, TimeUnit.SECONDS)) {
            LOG.info("All sent ok");
        }
        else {
            for (int i = 0; i < toSend.size(); i++) {
                if (!futures.get(i).isDone()) {
                    LOG.error("No send result for " + toSend.get(i));
                }
            }
        }
    }

}

这篇关于等待Kafka发送API返回的ListenAbleFuture列表的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

10-15 23:31