我正在寻找一种测试 Kafka Streams 应用程序的方法.这样我就可以定义输入事件,并且测试套件会向我显示输出.

I'm searching a way to test a Kafka Streams application. So that I can define the input events and the test suite shows me the output.

如果没有真正的 Kafka 设置,这可能吗?

Is this possible without a real Kafka setup?


更新 Kafka 1.1.0(2018 年 3 月 23 日发布):

Update Kafka 1.1.0 (released 23-Mar-2018):

KIP-247 添加了官方测试工具.根据升级指南:

KIP-247 added official test utils. Per the Upgrade Guide:

有一个新的工件 kafka-streams-test-utils 提供了 TopologyTestDriverConsumerRecordFactoryOutputVerifier> 班级.您可以将新工件作为常规依赖项包含在单元测试中,并使用测试驱动程序来测试 Kafka Streams 应用程序的业务逻辑.更多详情请参见 KIP-247.




The test driver simulates the library runtime that continuously fetches records from input topics and processes them by traversing the topology. You can use the test driver to verify that your specified processor topology computes the correct result with the manually piped in data records. The test driver captures the results records and allows to query its embedded state stores:

    // Create your topology
    Topology topology = new Topology();
    Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

    // Run it on the test driver
    TopologyTestDriver testDriver = new TopologyTestDriver(topology, config);

    // Feed input data
    ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>("input-topic", new StringSerializer(), new IntegerSerializer());
    testDriver.pipe(factory.create("key", 42L));

    // Verify output
    ProducerRecord<String, Integer> outputRecord = testDriver.readOutput("output-topic", new StringDeserializer(), new LongDeserializer());


See the documentation for details.

ProcessorTopologyTestDriver 从 开始可用.它在 kafka-streams 测试工件中可用(在 Maven 中用 指定):

ProcessorTopologyTestDriver is available as of It is available in the kafka-streams test artifact (specified with <classifier>test</classifier> in Maven):


您还需要添加 kafka-clients 测试工件:

You will also need to add the kafka-clients test artifact:


然后就可以使用测试驱动了.根据 Javadoc,首先创建一个 ProcessorTopologyTestDriver:

Then you can use the test driver. Per the Javadoc, first create a ProcessorTopologyTestDriver:

    StringSerializer strSerializer = new StringSerializer();
    StringDeserializer strDeserializer = new StringDeserializer();
    Properties props = new Properties();
    props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
    props.setProperty(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimestampExtractor.class.getName());
    props.setProperty(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, strSerializer.getClass().getName());
    props.setProperty(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, strDeserializer.getClass().getName());
    props.setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, strSerializer.getClass().getName());
    props.setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, strDeserializer.getClass().getName());
    StreamsConfig config = new StreamsConfig(props);
    TopologyBuilder builder = ...
    ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(config, builder);


You can feed input into the topology as though you had actually written to one of the input topics:

    driver.process("input-topic", "key1", "value1", strSerializer, strSerializer);


    ProducerRecord<String, String> record1 = driver.readOutput("output-topic-1", strDeserializer, strDeserializer);
    ProducerRecord<String, String> record2 = driver.readOutput("output-topic-1", strDeserializer, strDeserializer);
    ProducerRecord<String, String> record3 = driver.readOutput("output-topic-2", strDeserializer, strDeserializer);


Then you can assert on these results.

