本文介绍了具有Spring Boot的简单嵌入式Kafka测试示例的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

编辑FYI:使用gitHub示例

我正在互联网上搜索,却找不到嵌入式Kafka测试的可行且简单的示例.

我的设置是:

I was searching the internet and couldn't find a working and simple example of an embedded Kafka test.

My setup is:

  • 春季靴子
  • 多个 @ KafkaListener 在一个类别中具有不同的主题
  • 嵌入式Kafka进行测试,一切正常
  • 使用发送到主题的Kafkatemplate进行测试,但是 @KafkaListener 方法即使经过大量睡眠也无法接收任何内容
  • 未显示警告或错误,仅记录了来自Kafka的垃圾信息
  • Spring boot
  • Multiple @KafkaListener with different topics in one class
  • Embedded Kafka for test which is starting fine
  • Test with Kafkatemplate which is sending to topic but the@KafkaListener methods are not receiving anything even after a huge sleep time
  • No warnings or errors are shown, only info spam from Kafka in logs

请帮助我.大部分是配置过度或工程过度的示例.我相信这可以简单地完成.谢谢,伙计们!

Please help me. There are mostly over configured or overengineered examples. I am sure it can be done simple.Thanks, guys!

@Controller
public class KafkaController {

    private static final Logger LOG = getLogger(KafkaController.class);

    @KafkaListener(topics = "test.kafka.topic")
    public void receiveDunningHead(final String payload) {
        LOG.debug("Receiving event with payload [{}]", payload);
        //I will do database stuff here which i could check in db for testing
    }
}


私有静态字符串SENDER_TOPIC ="test.kafka.topic";


private static String SENDER_TOPIC = "test.kafka.topic";

@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, SENDER_TOPIC);

@Test
    public void testSend() throws InterruptedException, ExecutionException {

        Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);

        KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);
        producer.send(new ProducerRecord<>(SENDER_TOPIC, 0, 0, "message00")).get();
        producer.send(new ProducerRecord<>(SENDER_TOPIC, 0, 1, "message01")).get();
        producer.send(new ProducerRecord<>(SENDER_TOPIC, 1, 0, "message10")).get();
        Thread.sleep(10000);
    }

推荐答案

嵌入式Kafka测试可通过以下配置为我工作,

Embedded Kafka tests work for me with below configs,

关于测试类的注释

@EnableKafka
@SpringBootTest(classes = {KafkaController.class}) // Specify @KafkaListener class if its not the same class, or not loaded with test config
@EmbeddedKafka(
    partitions = 1,
    controlledShutdown = false,
    brokerProperties = {
        "listeners=PLAINTEXT://localhost:3333",
        "port=3333"
})
public class KafkaConsumerTest {
    @Autowired
    KafkaEmbedded kafkaEmbeded;

    @Autowired
    KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

在注释设置方法之前

@Before
public void setUp() throws Exception {
  for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {
    ContainerTestUtils.waitForAssignment(messageListenerContainer,
    kafkaEmbeded.getPartitionsPerTopic());
  }
}

注意:我没有使用 @ClassRule 来创建嵌入式Kafka,而是自动接线
@Autowired EmbeddedKafka

Note: I am not using @ClassRule for creating embedded Kafka rather auto-wiring
@Autowired embeddedKafka

@Test
public void testReceive() throws Exception {
     kafkaTemplate.send(topic, data);
}

希望这会有所帮助!

标记为 @TestConfiguration

@TestConfiguration
public class TestConfig {

@Bean
public ProducerFactory<String, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(kafkaEmbedded));
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
    KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());
    kafkaTemplate.setDefaultTopic(topic);
    return kafkaTemplate;
}

现在 @Test 方法将自动连接KafkaTemplate并用于发送消息

Now @Test method will autowire KafkaTemplate and use is to send message

kafkaTemplate.send(topic, data);

更新的答案代码块已在上方行

Updated answer code block with above line

这篇关于具有Spring Boot的简单嵌入式Kafka测试示例的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-07 04:06