本文介绍了如何获得KafkaListener消费者的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我使用SpringBoot 2.0.4.RELEASE的Spring-Kafka。
并使用KafkaListener获取消息
现在我要重置我的组的偏移量
但我不知道如何获得群的消费者
@KafkaListener(id="test",topics={"test"},groupId="group",containerFactory="batchContainerFactory")
public String listenTopic33(List<ConsumerRecord<Integer, String>> record, Acknowledgment ack){
// do something
}
@Autowired
KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
public void test() {
MessageListenerContainer test3 = kafkaListenerEndpointRegistry.getListenerContainer("test3");
}
推荐答案
如果要在侦听器本身中查找使用者,只需向侦听器方法添加Consumer<?, ?> consumer
参数。
max.poll.records=1
以避免这种情况。您还可以向容器添加自定义RemainingRecordsErrorHandler
,在侦听器中引发异常,错误处理程序将获得剩余的记录,而不是侦听器。
另请参阅Seeking to a Specific Offset。
void registerSeekCallback(ConsumerSeekCallback callback);
void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);
void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);
void seek(String topic, int partition, long offset);
void seekToBeginning(String topic, int partition);
void seekToEnd(String topic, int partition);
这里有一个例子;我们跟踪每个主题/分区的回调...
@SpringBootApplication
public class So56584233Application {
public static void main(String[] args) {
SpringApplication.run(So56584233Application.class, args);
}
@Bean
public ApplicationRunner runner(Listener listener, KafkaTemplate<String, String> template) {
return args -> {
IntStream.range(0, 10).forEach(i -> template.send(new ProducerRecord<>("so56584233", i % 3, "foo", "bar")));
while (true) {
System.in.read();
listener.seekToStart();
}
};
}
@Bean
public NewTopic topic() {
return new NewTopic("so56584233", 3, (short) 1);
}
}
@Component
class Listener implements ConsumerSeekAware {
private static final Logger logger = LoggerFactory.getLogger(Listener.class);
private final Map<TopicPartition, ConsumerSeekCallback> callbacks = new ConcurrentHashMap<>();
private static final ThreadLocal<ConsumerSeekCallback> callbackForThread = new ThreadLocal<>();
@Override
public void registerSeekCallback(ConsumerSeekCallback callback) {
callbackForThread.set(callback);
}
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
assignments.keySet().forEach(tp -> this.callbacks.put(tp, callbackForThread.get()));
}
@Override
public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
}
@KafkaListener(id = "so56584233", topics = "so56584233", concurrency = "3")
public void listen(ConsumerRecord<String, String> in) {
logger.info(in.toString());
}
public void seekToStart() {
this.callbacks.forEach((tp, callback) -> callback.seekToBeginning(tp.topic(), tp.partition()));
}
}
这篇关于如何获得KafkaListener消费者的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!