本文介绍了如何在具有Kafka集成的Spring Batch中调用StepExcecutionListener?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

下面是etl.xml中job的配置

Below is the config of job in etl.xml

< batch:job id ="procuerJob">

<batch:job id="procuerJob">

<batch:step id="Produce">

    <batch:partition partitioner="partitioner">

        <batch:handler grid-size="${ partitioner.limit}"></batch:handler>

        <batch:step>

            <batch:tasklet>

                <batch:chunk reader="Reader" writer="kafkaProducer"
                             commit-interval="20000">

                </batch:chunk>

                <batch:listeners>

                    <batch:listener ref="producingListener" />


                </batch:listeners>

            </batch:tasklet>

        </batch:step>

    </batch:partition>

</batch:step>

</batch:job>

</batch:job>

下面是用于向该主题发送消息的代码.

below is the code used to send messaged to the topic.

ListenableFuture< SendResult<字符串,消息>> listenableFuture = kafkaTemplate.send(message);

ListenableFuture<SendResult<String, message>> listenableFuture = kafkaTemplate.send(message);

listenableFuture.addCallback(new ListenableFutureCallback< SendResult< String,message>>(){

listenableFuture.addCallback(new ListenableFutureCallback<SendResult<String, message >>() {

@Override
public void onSuccess(SendResult<String, message > result) {
    log.info("marking as SUCCESS");
    manager.updateStatus("someTable", KafkaResponse.SUCCESS);
}

@Override
public void onFailure(Throwable ex) {
    log.info("marking as FAILURE");
    manager.updateKafkaStatus(someTable, KafkaResponse.FAILURE);
}

}

执行kafkaTemplate.send(message)后,将调用侦听器并完成作业.我看到了在作业完成后调用onSuccess(),onFailure().我如何更改job的配置,以便在收到来自kafka主题的确认后调用侦听器?

Once the kafkaTemplate.send(message)is executed , the listener is called and the job completes. I see theonSuccess(), onFailure() are called post the job is completed.How can I chnage the config of job so that listener is called after receiving the acknowledgement from kafka topic?

推荐答案

我没有尝试以下方法,但是这里是一个主意:

I did not try the following but here is the idea:

ListenableFuture<SendResult<String, Message>> future = kafkaTemplate.send(message);
try {
    SendResult<String, Message> sendResult = future.get();
    // inspect sendResult
    log.info("marking as SUCCESS");
    manager.updateStatus("someTable", KafkaResponse.SUCCESS);
} catch (Exception e) {
     log.info("marking as FAILURE");
     manager.updateKafkaStatus(someTable, KafkaResponse.FAILURE);
     // do something with e
}

这篇关于如何在具有Kafka集成的Spring Batch中调用StepExcecutionListener?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

09-06 02:56