本文介绍了什么可以用作CassandraWriterBolt的测试存根?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我从Kafka读取了一个json,FieldExtractionBolt读取了json将数据提取到元组值中并将其传递给CassandraWriterBolt,后者又在Cassandra中写入了一条记录,将所有这些元组值写入单独的列中.

Kafka上的JSON消息-

{"pair":"GBPJPY","bid":134.4563,"ask":134.4354}

FieldExtractionBolt-

String message = tuple.getStringByField("message");
Map values = new Gson().fromJson(message, Map.class);
basicOutputCollector.emit(new Values(values.get("pair"), values.get("bid"), values.get("ask")));

CassandraWriterBolt-

return (CassandraWriterBolt) new CassandraWriterBolt(async(simpleQuery("INSERT INTO currency(pair, ask, bid) VALUES (?, ?, ?);").with(fields("pair", "ask", "bid")))

我尝试根据此处给出的答案编写测试-

在我的项目中,我在Spring配置中定义了所有螺栓,喷嘴和流.这使得写/读我的拓扑非常容易.我通过从ApplicationContext获取螺栓,喷嘴和流Bean来构建拓扑.在我的Spring配置中,KafkaSpout和CassandraWriterBolt在"prod"配置文件下定义,因此它们仅在prod和"test"配置文件下使用,我为KafkaSpout和CassandraWriterBolt定义了存根.对于KafkaSpout,我使用FixedToupleSpout,对于CassandraWriterBolt,我使用TestWordCounter.

这是我的考验

        @Test
        public void testTopology(){
        StormTopology topology = SpringBasedTopologyBuilder.getInstance().buildStormTopologyUsingApplicationContext(applicationContext);
        TestJob COMPLETE_TOPOLOGY_TESTJOB = (cluster) -> {
              MockedSources mocked = new MockedSources();
                    mocked.addMockData("kafkaSpout",
                new Values("{\"pair\":\"GBPJPY\",\"bid\":134.4563,\"ask\":134.4354}"),
        new Values("{\"pair\":\"GBPUSD\",\"bid\":1.4563,\"ask\":1.4354}"));

        Config topoConf = new Config();
        topoConf.setNumWorkers(2);

        CompleteTopologyParam ctp = new CompleteTopologyParam();
        ctp.setMockedSources(mocked);
        ctp.setStormConf(topoConf);
        Map<String, List<FixedTuple>> results = Testing.completeTopology(cluster, topology, ctp);
                    List<List<Object>> cassandraTuples = Testing.readTuples(results, "cassandraWriterBolt");
        List<List<Object>> expectedCassandraTuples = Arrays.asList(Arrays.asList("GBPJPY", 1), Arrays.asList("GBPUSD", 1),
                Arrays.asList("134.4563", 1), Arrays.asList("1.4563", 1), Arrays.asList("134.4354", 2));
        assertTrue(expectedCassandraTuples + " expected, but found " + cassandraTuples,
                Testing.multiseteq(expectedCassandraTuples, cassandraTuples));
    MkClusterParam param = new MkClusterParam();
    param.setSupervisors(4);

    Testing.withSimulatedTimeLocalCluster(param, COMPLETE_TOPOLOGY_TESTJOB);
}

@Configuration
@Import(MainApplication.class)
public static class TestConfig
{
    @Bean
    public IRichSpout kafkaSpout(){
        return new FixedTupleSpout(Arrays.asList(new FixedTuple(Arrays.asList("{\"pair\":\"GBPJPY\",\"bid\":134.4563,\"ask\":134.4354"))), new Fields(new String[]{"message"}));
    }

    @Bean
    public IBasicBolt cassandraWriterBolt(){
        return new TestWordCounter();
    }
}

我得到的结果不是我所期望的.我收到以下错误-

        java.lang.AssertionError: [[GBPJPY, 1], [GBPUSD, 1], [134.4563, 1], [1.4563, 1], [134.4354, 2]] expected, but found [[GBPJPY, 1], [GBPUSD, 1]]

看起来,TestWordCounter只是将第一个值读取为元组(仅货币对,并且跳过买入和卖出).似乎TestWordCounter在这里不是正确的选择.什么是CassandraWriterBolt的正确存根,这样我可以断言它会收到2条记录,其中GBPJPY记录,GBPUSD记录,同时还有买入价和卖价?

解决方案

Testing.readTuples(results, "cassandraWriterBolt")将返回"cassandraWriterBolt"发出的元组.这就是您要测试的吗?我认为您正在尝试断言"cassandraWriterBolt"将接收哪些元组,而不是其发出的内容.

您可以在这里做两件事.您可以使用readTuples来读取发射到Cassandra螺栓的螺栓,而不是读取Cassandra螺栓.如果您的拓扑结构简单(例如,没有太多不同的螺栓写入Cassandra螺栓),这是一个不错的解决方案.

更好的解决方案(IMO)是编写一个简单的短螺栓来替换TestWordCounter.螺栓唯一要做的就是接收输入的元组,确认它,并在新的元组中发出值.

execute(Tuple input, BasicOutputCollector collector) {
  collector.emit(input.getValues());
}

然后,您可以使用readTuples来读取bolt发出的元组,该元组将与接收到的值相同.

I read a json from Kafka, FieldExtractionBolt reads that json extracts data into tuple values and passes them to CassandraWriterBolt, which in its turn writes a record in Cassandra writing all those tuple values into separate columns.

JSON message on Kafka -

{"pair":"GBPJPY","bid":134.4563,"ask":134.4354}

FieldExtractionBolt -

String message = tuple.getStringByField("message");
Map values = new Gson().fromJson(message, Map.class);
basicOutputCollector.emit(new Values(values.get("pair"), values.get("bid"), values.get("ask")));

CassandraWriterBolt -

return (CassandraWriterBolt) new CassandraWriterBolt(async(simpleQuery("INSERT INTO currency(pair, ask, bid) VALUES (?, ?, ?);").with(fields("pair", "ask", "bid")))

I tried writing a test based on the answer given here - How to E2E test functionality of Storm Topology by programmatically inserting messages

In my project, I define all my bolts, spouts and streams in Spring config. This makes writing/reading my topology very easy. I build topology by getting bolt, spouts and stream beans from ApplicationContext. In my Spring config, KafkaSpout and CassandraWriterBolt are defined under 'prod' profile so that they only be used in prod and under 'test' profile I define stubs for KafkaSpout and CassandraWriterBolt. For KafkaSpout, I used FixedToupleSpout and for CassandraWriterBolt I used TestWordCounter.

This is my test

        @Test
        public void testTopology(){
        StormTopology topology = SpringBasedTopologyBuilder.getInstance().buildStormTopologyUsingApplicationContext(applicationContext);
        TestJob COMPLETE_TOPOLOGY_TESTJOB = (cluster) -> {
              MockedSources mocked = new MockedSources();
                    mocked.addMockData("kafkaSpout",
                new Values("{\"pair\":\"GBPJPY\",\"bid\":134.4563,\"ask\":134.4354}"),
        new Values("{\"pair\":\"GBPUSD\",\"bid\":1.4563,\"ask\":1.4354}"));

        Config topoConf = new Config();
        topoConf.setNumWorkers(2);

        CompleteTopologyParam ctp = new CompleteTopologyParam();
        ctp.setMockedSources(mocked);
        ctp.setStormConf(topoConf);
        Map<String, List<FixedTuple>> results = Testing.completeTopology(cluster, topology, ctp);
                    List<List<Object>> cassandraTuples = Testing.readTuples(results, "cassandraWriterBolt");
        List<List<Object>> expectedCassandraTuples = Arrays.asList(Arrays.asList("GBPJPY", 1), Arrays.asList("GBPUSD", 1),
                Arrays.asList("134.4563", 1), Arrays.asList("1.4563", 1), Arrays.asList("134.4354", 2));
        assertTrue(expectedCassandraTuples + " expected, but found " + cassandraTuples,
                Testing.multiseteq(expectedCassandraTuples, cassandraTuples));
    MkClusterParam param = new MkClusterParam();
    param.setSupervisors(4);

    Testing.withSimulatedTimeLocalCluster(param, COMPLETE_TOPOLOGY_TESTJOB);
}

@Configuration
@Import(MainApplication.class)
public static class TestConfig
{
    @Bean
    public IRichSpout kafkaSpout(){
        return new FixedTupleSpout(Arrays.asList(new FixedTuple(Arrays.asList("{\"pair\":\"GBPJPY\",\"bid\":134.4563,\"ask\":134.4354"))), new Fields(new String[]{"message"}));
    }

    @Bean
    public IBasicBolt cassandraWriterBolt(){
        return new TestWordCounter();
    }
}

Result I am getting is not what I am expecting. I am getting following error -

        java.lang.AssertionError: [[GBPJPY, 1], [GBPUSD, 1], [134.4563, 1], [1.4563, 1], [134.4354, 2]] expected, but found [[GBPJPY, 1], [GBPUSD, 1]]

Looks like, TestWordCounter is just reading first value as a tuple (currency pair only and skipping bid and ask). Seems TestWordCounter is not a right choice here. What would be correct stub for CassandraWriterBolt so that I can assert that it would receive 2 records one for GBPJPY and another for GBPUSD with their bid and ask price as well?

解决方案

Testing.readTuples(results, "cassandraWriterBolt") will return the tuples emitted by "cassandraWriterBolt". Is that what you're trying to test? I think you are trying to assert about which tuples "cassandraWriterBolt" receives, not what it emits.

You can do two things here. You can use readTuples to read from the bolts that are emitting to the Cassandra bolt, instead of reading from the Cassandra bolt. This is a decent solution if your topology is simple (e.g. not many different bolts writing to the Cassandra bolt).

A better solution (IMO) is to write a simple stub bolt to replace TestWordCounter. The only thing the bolt should do is receive the input tuple, ack it, and emit the values in a new tuple.

execute(Tuple input, BasicOutputCollector collector) {
  collector.emit(input.getValues());
}

Then you can use readTuples to read the tuples that bolt emits, which will be the same values it receives.

这篇关于什么可以用作CassandraWriterBolt的测试存根?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

09-17 18:24