本文介绍了什么可以用作 CassandraWriterBolt 的测试存根?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我从 Kafka 读取了一个 json,FieldExtractionBolt 读取该 json 将数据提取为元组值并将它们传递给 CassandraWriterBolt,后者又在 Cassandra 中写入一条记录,将所有这些元组值写入单独的列中.

I read a json from Kafka, FieldExtractionBolt reads that json extracts data into tuple values and passes them to CassandraWriterBolt, which in its turn writes a record in Cassandra writing all those tuple values into separate columns.

Kafka 上的 JSON 消息 -

JSON message on Kafka -

{"pair":"GBPJPY","bid":134.4563,"ask":134.4354}

FieldExtractionBolt -

FieldExtractionBolt -

String message = tuple.getStringByField("message");
Map values = new Gson().fromJson(message, Map.class);
basicOutputCollector.emit(new Values(values.get("pair"), values.get("bid"), values.get("ask")));

CassandraWriterBolt -

CassandraWriterBolt -

return (CassandraWriterBolt) new CassandraWriterBolt(async(simpleQuery("INSERT INTO currency(pair, ask, bid) VALUES (?, ?, ?);").with(fields("pair", "ask", "bid")))

我尝试根据此处给出的答案编写测试 - 如何通过编程方式插入消息来端到端测试Storm拓扑的功能

I tried writing a test based on the answer given here - How to E2E test functionality of Storm Topology by programmatically inserting messages

在我的项目中,我在 Spring 配置中定义了所有的 bolt、spout 和流.这使得编写/读取我的拓扑非常容易.我通过从 ApplicationContext 获取 bolt、spout 和流 bean 来构建拓扑.在我的 Spring 配置中,KafkaSpout 和 CassandraWriterBolt 是在prod"配置文件下定义的,因此它们只能在生产和测试"配置文件下使用,我为 KafkaSpout 和 CassandraWriterBolt 定义存根.对于 KafkaSpout,我使用了 FixedToupleSpout,对于 CassandraWriterBolt,我使用了 TestWordCounter.

In my project, I define all my bolts, spouts and streams in Spring config. This makes writing/reading my topology very easy. I build topology by getting bolt, spouts and stream beans from ApplicationContext. In my Spring config, KafkaSpout and CassandraWriterBolt are defined under 'prod' profile so that they only be used in prod and under 'test' profile I define stubs for KafkaSpout and CassandraWriterBolt. For KafkaSpout, I used FixedToupleSpout and for CassandraWriterBolt I used TestWordCounter.

这是我的测试

        @Test
        public void testTopology(){
        StormTopology topology = SpringBasedTopologyBuilder.getInstance().buildStormTopologyUsingApplicationContext(applicationContext);
        TestJob COMPLETE_TOPOLOGY_TESTJOB = (cluster) -> {
              MockedSources mocked = new MockedSources();
                    mocked.addMockData("kafkaSpout",
                new Values("{\"pair\":\"GBPJPY\",\"bid\":134.4563,\"ask\":134.4354}"),
        new Values("{\"pair\":\"GBPUSD\",\"bid\":1.4563,\"ask\":1.4354}"));

        Config topoConf = new Config();
        topoConf.setNumWorkers(2);

        CompleteTopologyParam ctp = new CompleteTopologyParam();
        ctp.setMockedSources(mocked);
        ctp.setStormConf(topoConf);
        Map<String, List<FixedTuple>> results = Testing.completeTopology(cluster, topology, ctp);
                    List<List<Object>> cassandraTuples = Testing.readTuples(results, "cassandraWriterBolt");
        List<List<Object>> expectedCassandraTuples = Arrays.asList(Arrays.asList("GBPJPY", 1), Arrays.asList("GBPUSD", 1),
                Arrays.asList("134.4563", 1), Arrays.asList("1.4563", 1), Arrays.asList("134.4354", 2));
        assertTrue(expectedCassandraTuples + " expected, but found " + cassandraTuples,
                Testing.multiseteq(expectedCassandraTuples, cassandraTuples));
    MkClusterParam param = new MkClusterParam();
    param.setSupervisors(4);

    Testing.withSimulatedTimeLocalCluster(param, COMPLETE_TOPOLOGY_TESTJOB);
}

@Configuration
@Import(MainApplication.class)
public static class TestConfig
{
    @Bean
    public IRichSpout kafkaSpout(){
        return new FixedTupleSpout(Arrays.asList(new FixedTuple(Arrays.asList("{\"pair\":\"GBPJPY\",\"bid\":134.4563,\"ask\":134.4354"))), new Fields(new String[]{"message"}));
    }

    @Bean
    public IBasicBolt cassandraWriterBolt(){
        return new TestWordCounter();
    }
}

我得到的结果不是我所期望的.我收到以下错误 -

Result I am getting is not what I am expecting. I am getting following error -

        java.lang.AssertionError: [[GBPJPY, 1], [GBPUSD, 1], [134.4563, 1], [1.4563, 1], [134.4354, 2]] expected, but found [[GBPJPY, 1], [GBPUSD, 1]]

看起来,TestWordCounter 只是将第一个值作为元组读取(仅货币对并跳过出价和询价).似乎 TestWordCounter 在这里不是一个正确的选择.什么是 CassandraWriterBolt 的正确存根,以便我可以断言它会收到 2 条记录,一个是 GBPJPY,另一个是 GBPUSD,还有他们的出价和要价?

Looks like, TestWordCounter is just reading first value as a tuple (currency pair only and skipping bid and ask). Seems TestWordCounter is not a right choice here. What would be correct stub for CassandraWriterBolt so that I can assert that it would receive 2 records one for GBPJPY and another for GBPUSD with their bid and ask price as well?

推荐答案

Testing.readTuples(results, "cassandraWriterBolt") 将返回cassandraWriterBolt"发出的元组.这就是你要测试的吗?我认为您试图断言cassandraWriterBolt"接收哪些元组,而不是它发出的内容.

Testing.readTuples(results, "cassandraWriterBolt") will return the tuples emitted by "cassandraWriterBolt". Is that what you're trying to test? I think you are trying to assert about which tuples "cassandraWriterBolt" receives, not what it emits.

您可以在这里做两件事.您可以使用 readTuples 从发射到 Cassandra bolt 的 bolts 中读取,而不是从 Cassandra bolt 中读取.如果您的拓扑结构简单(例如,写入 Cassandra bolt 的不同 bolt 不多),这是一个不错的解决方案.

You can do two things here. You can use readTuples to read from the bolts that are emitting to the Cassandra bolt, instead of reading from the Cassandra bolt. This is a decent solution if your topology is simple (e.g. not many different bolts writing to the Cassandra bolt).

更好的解决方案 (IMO) 是编写一个简单的 stub bolt 来替换 TestWordCounter.Bolt 唯一应该做的就是接收输入元组,确认它,然后在一个新元组中发出值.

A better solution (IMO) is to write a simple stub bolt to replace TestWordCounter. The only thing the bolt should do is receive the input tuple, ack it, and emit the values in a new tuple.

execute(Tuple input, BasicOutputCollector collector) {
  collector.emit(input.getValues());
}

然后您可以使用 readTuples 读取 Bolt 发出的元组,这些元组将与它接收到的值相同.

Then you can use readTuples to read the tuples that bolt emits, which will be the same values it receives.

这篇关于什么可以用作 CassandraWriterBolt 的测试存根?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

09-17 18:24