问题描述
我使用EmbededKafka实施了一系列集成测试,以测试我们的一个使用spring-kafka框架运行的Kafka流应用程序.
I implemented a bunch of integration tests using EmbededKafka to test one of our Kafka streams application running using spring-kafka framework.
流应用程序正在从Kafka主题中读取一条消息,将其存储到内部状态存储中,进行一些转换,然后将其发送到另一个微服务中并发送到所请求的主题中.当响应返回到响应的主题时,它会从状态存储中检索原始消息,并根据某些业务逻辑将其转发到我们的一个下游系统,每个下游系统都有自己的主题.
The stream application is reading a message from a Kafka topic, it stores it into an internal state store, does some transformation and sends it to another micro service into a requested topic. When the response comes back into the responded topic it retrieves the original message from the state store and depending on some business logic it forwards it to one of our downstream systems, each one having their own topic.
集成测试只是对业务条件进行各种排列.
The integration tests just exercise the various permutations of the business conditions.
最初,将测试分为多个类.运行构建时,一个类的测试与另一类中的测试发生冲突,但有一些冲突例外.我没有花太多时间在此上,只是将所有测试移到了同一类中.这解决了我从gradle build或intelij EDI通过的所有测试通过的问题.
Initially the tests were split across to classes. When running the build the tests from one class were clashing with the ones in the other class with some conflict exceptions. I did not spend too much time on this and just moved all tests inside the same class. This fixed my issue with all tests passing either from gradle build or from intelij EDI.
这是测试:
package au.nab.tlm.streams.integration;
import au.nab.tlm.streams.serde.EntitlementsCheckSerDes;
import au.nab.tlm.streams.test.support.MockEntitlementsCheckSerDes;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;
@SpringBootTest
@ContextConfiguration(classes = {MyTopologiesIntegrationTest.TestKafkaConfig.class})
@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_EACH_TEST_METHOD)
@EmbeddedKafka(
ports = 9092,
partitions = 1,
topics = {
"topic-1.v1",
"topic-2.v1",
"topic-3.v1",
"topic-4.v1",
"topic-5.v1",
"topic-6.v1",
},
brokerProperties = {"transaction.state.log.replication.factor=1", "transaction.state.log.min.isr=1", "log.dir=/tmp/embedded-kafka"}
)
public class MyTopologiesIntegrationTest {
@Autowired
EmbeddedKafkaBroker kafkaBroker;
@Autowired
EntitlementsCheckSerDes appSerDes;
@Test
public void test_1() {
}
@Test
public void test_2() {
}
@Test
public void test_3() {
}
@Test
public void test_4() {
}
@Test
public void test_5() {
}
@TestConfiguration
public static class TestKafkaConfig {
@Bean
EntitlementsCheckSerDes appSerDes() {
return new MockEntitlementsCheckSerDes();
}
}
}
我对结果感到满意,所以我推送了更改,只是发现构建在我们的CI服务器上失败了.再次运行它,这次再次失败,失败的原因与第一次不同.我找了一位同事看一看,他也遇到了与CI服务器类似的失败经历.我在我的机器上至少运行了20次构建,并且该构建始终通过.在我的同事上一个接一个地运行测试也总是通过.
Happy with the result I pushed my change just to notice the build was failing on our CI server. Running it once again it failed again this time with different failures than first time. I got a colleague to have a look and he was having the same failure experience similar with CI server. I ran the build at least twenty times on my machine and it was always passing. Running tests one by one on my colleague was always passing too.
我们得到的最常见的例外是主题xyz已经存在,但是偶尔也有其他例外情况表明群集无法被喜欢或相似.所有这些异常向我们表明,尽管使用了 DirtiesContext
批注,但在上一个测试中使用的嵌入式Kafka并未在下一个测试开始之前完全关闭.正在运行的第一个测试始终通过.
The most common exception we were getting was that the topic xyz already existed, but occasionally there was some other exceptions suggesting a cluster could not be fond or similar. All these exceptions were indicating to us the the embedded Kafka used in aa previous test was not completely shut down before the next test started, despite using the DirtiesContext
annotation.The very first test being run was always passing.
我们俩都花了整整一天的时间才拔掉头发,所以无法正常工作.我们尝试了谷歌带我们去的任何地方,无论走到哪里,都没有运气.最后,我们在测试类中只剩下了一个测试场景(交互次数最多的场景),并禁用了其余的场景.
We both spent a full day pulling off our hair, it was no way to get it working. We tried whatever and wherever google took us to, no luck at all. In the end we left the only one test scenario (the one with biggest number of interactions) in the test class and disabled the rest.
显然,这不是永久解决方案,我真的很想了解我们做错了什么以及如何解决.
Obviously this is not something to be accepted as a permanent solution and I would really want to understand what we done wrong and how could fix it.
在此先感谢您的投入.
推荐答案
请勿使用固定端口 ports = 9092,
-默认情况下,嵌入式kafka将在操作系统选择的随机端口上进行监听系统.
Do not use a fixed port ports = 9092,
- by default the embedded kafka will listen on a random port selected by the operating system.
您应该在测试用例中使用它.
You should use that for your test cases.
您可以通过调用 this.kafkaBroker.getBrokersAsString()
获得经纪人地址.
You can get the broker addresses by calling this.kafkaBroker.getBrokersAsString()
.
这篇关于嵌入式Kafka测试随机失败的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!