问题描述
我已经实现了Kafka用户,现在有了一个方案.
I have implemented the Kafka consumer, now I have a scenario.
- 从Kafka流2.2.5中读取数据.通过Srpingboot发行
- 装入数据库表1
- 将数据从表1复制到表2
- 清除表1
要执行上述操作,我需要使用石英作业通过计划作业(已编写)来暂停/恢复Kafka使用者,该作业将数据从表1复制到表2.但是在此活动中,我希望我的Kafka侦听器能够暂停,复制完成后,就应该恢复.
To do the above things, I need to pause/resume the Kafka consumer using a scheduling job(already written) using quartz, which copies data from table 1 to table 2. But during this activity, I want my Kafka listener to pause, and once the copy is done, it should resume.
我的实现:
@KafkaListener(topicPartitions =
{ @TopicPartition(topic = "data_pipe", partitions = { "0" })})
public void listen(ConsumerRecord<String, String> cr) throws Exception {
推荐答案
如果您使用"kafkaListener批注",则会自动创建KafkaListenerEndpointRegistry bean,因此,您可以像下面的代码一样使用它:
if you use 'kafkaListener annotation' auto created KafkaListenerEndpointRegistry bean, so, You can use it like this code:
@Component
public class KafkaManager {
private final KafkaListenerEndpointRegistry registry;
public KafkaManager(KafkaListenerEndpointRegistry registry) {
this.registry = registry;
}
public void pause() {
registry.getListenerContainers().forEach(MessageListenerContainer::pause);
}
public void resume() {
registry.getListenerContainers().forEach(MessageListenerContainer::resume);
}
}
document: https://docs.spring.io/spring-kafka/reference/html/#pause-resume
document : https://docs.spring.io/spring-kafka/reference/html/#pause-resume
这篇关于如何使用spring-kafka暂停和恢复@KafkaListener的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!