问题描述
我正在寻找一种测试 Kafka Streams 应用程序的方法.这样我就可以定义输入事件,并且测试套件会向我显示输出.
I'm searching a way to test a Kafka Streams application. So that I can define the input events and the test suite shows me the output.
如果没有真正的 Kafka 设置,这可能吗?
Is this possible without a real Kafka setup?
推荐答案
更新 Kafka 1.1.0(2018 年 3 月 23 日发布):
Update Kafka 1.1.0 (released 23-Mar-2018):
KIP-247 added official test utils. Per the Upgrade Guide:
有一个新的工件 kafka-streams-test-utils
提供了 TopologyTestDriver
、ConsumerRecordFactory
和 OutputVerifier
> 班级.您可以将新工件作为常规依赖项包含在单元测试中,并使用测试驱动程序来测试 Kafka Streams 应用程序的业务逻辑.更多详情请参见 KIP-247.
来自文档:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams-test-utils</artifactId>
<version>1.1.0</version>
<scope>test</scope>
</dependency>
测试驱动程序模拟库运行时不断从输入主题中获取记录并通过遍历拓扑来处理它们.您可以使用测试驱动程序来验证您指定的处理器拓扑是否通过手动管道输入数据记录计算出正确的结果.测试驱动程序捕获结果记录并允许查询其嵌入的状态存储:
The test driver simulates the library runtime that continuously fetches records from input topics and processes them by traversing the topology. You can use the test driver to verify that your specified processor topology computes the correct result with the manually piped in data records. The test driver captures the results records and allows to query its embedded state stores:
// Create your topology
Topology topology = new Topology();
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
// Run it on the test driver
TopologyTestDriver testDriver = new TopologyTestDriver(topology, config);
// Feed input data
ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>("input-topic", new StringSerializer(), new IntegerSerializer());
testDriver.pipe(factory.create("key", 42L));
// Verify output
ProducerRecord<String, Integer> outputRecord = testDriver.readOutput("output-topic", new StringDeserializer(), new LongDeserializer());
有关详细信息,请参阅文档.
See the documentation for details.
ProcessorTopologyTestDriver
从 0.11.0.0 开始可用.它在 kafka-streams
测试工件中可用(在 Maven 中用 指定):
ProcessorTopologyTestDriver
is available as of 0.11.0.0. It is available in the kafka-streams
test artifact (specified with <classifier>test</classifier>
in Maven):
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>0.11.0.0</version>
<classifier>test</classifier>
<scope>test</scope>
</dependency>
您还需要添加 kafka-clients
测试工件:
You will also need to add the kafka-clients
test artifact:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.0</version>
<classifier>test</classifier>
<scope>test</scope>
</dependency>
然后就可以使用测试驱动了.根据 Javadoc,首先创建一个 ProcessorTopologyTestDriver
:
Then you can use the test driver. Per the Javadoc, first create a ProcessorTopologyTestDriver
:
StringSerializer strSerializer = new StringSerializer();
StringDeserializer strDeserializer = new StringDeserializer();
Properties props = new Properties();
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
props.setProperty(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimestampExtractor.class.getName());
props.setProperty(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, strSerializer.getClass().getName());
props.setProperty(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, strDeserializer.getClass().getName());
props.setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, strSerializer.getClass().getName());
props.setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, strDeserializer.getClass().getName());
StreamsConfig config = new StreamsConfig(props);
TopologyBuilder builder = ...
ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(config, builder);
您可以将输入输入到拓扑中,就像您实际写入输入主题之一一样:
You can feed input into the topology as though you had actually written to one of the input topics:
driver.process("input-topic", "key1", "value1", strSerializer, strSerializer);
并阅读输出主题:
ProducerRecord<String, String> record1 = driver.readOutput("output-topic-1", strDeserializer, strDeserializer);
ProducerRecord<String, String> record2 = driver.readOutput("output-topic-1", strDeserializer, strDeserializer);
ProducerRecord<String, String> record3 = driver.readOutput("output-topic-2", strDeserializer, strDeserializer);
然后您可以对这些结果进行断言.
Then you can assert on these results.
这篇关于测试 Kafka Streams 拓扑的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!