问题描述
我使用Spring Boot 2.0.6开发了一个@KafkaListener
,它也带有ConsumerAwareRebalanceListener
界面标记.我实现了onPartitionsAssigned
方法,在该方法中,我将偏移的时间倒带了固定的时间(例如60秒).
I developed a @KafkaListener
that is also marked with the ConsumerAwareRebalanceListener
interface, using Spring Boot 2.0.6. I implemented the onPartitionsAssigned
method, in which I rewind the offset of a fixed amount of time, let's say 60 seconds.
到目前为止,一切都很好.
So far so good.
如何使用Spring Kafka给我的工具测试上述用例?我以为我需要启动一个Kafka代理(即EmbeddedKafka
),然后停止侦听器,然后再次重新启动它,以测试它是否再次读取了过去60秒内到达的消息.
How can I test the above use case using the tools that Spring Kafka gives me? I supposed I need to start a Kafka broker (i.e., an EmbeddedKafka
), then stopping the listener and then rebooting it again, to test that it read again the messages arrived in the last 60 seconds.
有人可以帮助我吗?我在Google上搜索了一下,但没有找到任何东西.非常感谢.
Can somebody help me? I googled a little, but I didn't find anything.Thanks a lot.
推荐答案
@KafkaListener
具有:
/**
* The unique identifier of the container managing for this endpoint.
* <p>If none is specified an auto-generated one is provided.
* @return the {@code id} for the container managing for this endpoint.
* @see org.springframework.kafka.config.KafkaListenerEndpointRegistry#getListenerContainer(String)
*/
String id() default "";
属性,因此您可以通过提到的KafkaListenerEndpointRegistry
访问其MessageListenerContainer
,您可以简单地@Autowired
进入基于Spring Testing Framework的测试类.然后,您可以在测试方法中真正地stop()
和start()
该MessageListenerContainer
.
attribute, so you can get an access to its MessageListenerContainer
via mentioned KafkaListenerEndpointRegistry
, which you can simply @Autowired
into the test class based on Spring Testing Framework. Then, you can really stop()
and start()
that MessageListenerContainer
in your test method.
还要注意@KafkaListener
也是如何具有autoStartup()
属性的.
Also pay attention how @KafkaListener
has an autoStartup()
attribute also.
这篇关于如何测试ConsumerAwareRebalanceListener?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!