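I implemented a simple Storm topology with a single spout and a single bolt, running in local cluster mode.

For some reason, the spout's nextTuple() is called more than once.

Any idea why?

Code:

Spout: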

public class CommitFeedListener extends BaseRichSpout {
    private SpoutOutputCollector outputCollector;
    private List<String> commits;

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("commit"));
    }

    @Override
    public void open(Map configMap,
                     TopologyContext context,
                     SpoutOutputCollector outputCollector) {
        this.outputCollector = outputCollector;
    }

    // this method is invoked more than once
    @Override
    public void nextTuple() {
        outputCollector.emit(new Values("testValue"));
    }
}

Bolt:
public class EmailExtractor extends BaseBasicBolt {
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("email"));
    }
    @Override
    public void execute(Tuple tuple,
                        BasicOutputCollector outputCollector) {
        String commit = tuple.getStringByField("commit");
        System.out.println(commit);
    }
}

Run configuration:
public class LocalTopologyRunner {
    private static final int TEN_MINUTES = 600000;
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("commit-feed-listener", new CommitFeedListener());
        builder.setBolt("email-extractor", new EmailExtractor())
               .shuffleGrouping("commit-feed-listener");
        Config config = new Config();
        config.setDebug(true);
        StormTopology topology = builder.createTopology();
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("github-commit-count-topology",
                config,
                topology);
        Utils.sleep(TEN_MINUTES);
        // the name must match the one passed to submitTopology
        cluster.killTopology("github-commit-count-topology");
        cluster.shutdown();
    }
}

Thanks, everyone.
Ray.

Best answer

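nextTuple() is called in an infinite loop by design. This lets the spout dirty-check external resources (databases, streams, IO, etc.) for new data.

If there is nothing to do in nextTuple(), you should sleep for a short while to avoid spinning the CPU, using backtype.storm.utils.Utils: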

Utils.sleep(pollIntervalInMilliseconds);

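Storm is a real-time processing architecture, so this is indeed the correct behavior. Look at some sample spouts to see how to implement one that fits your needs.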
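
As a rough illustration of the polling pattern described above, here is a plain-Java sketch that does not use any Storm classes. The class PollingSpoutSketch, its pending queue, and its emitted list are hypothetical stand-ins for the spout, an external feed, and the output collector respectively, and the loop in main plays the role of Storm's repeated nextTuple() calls. It is a model under those assumptions, not how Storm itself is implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Plain-Java model of a spout that is polled repeatedly; no Storm classes are used. */
public class PollingSpoutSketch {
    // Hypothetical stand-in for an external source such as a commit feed.
    private final Queue<String> pending = new ConcurrentLinkedQueue<>();
    // Hypothetical stand-in for the output collector: records what was emitted.
    private final List<String> emitted = new ArrayList<>();
    private final long pollIntervalMillis;
    private int idleCalls = 0;

    public PollingSpoutSketch(long pollIntervalMillis) {
        this.pollIntervalMillis = pollIntervalMillis;
    }

    /** Simulates new data arriving from the external source. */
    public void offer(String commit) {
        pending.add(commit);
    }

    /** Called over and over by the driver loop, like nextTuple() in a spout. */
    public void nextTuple() {
        String commit = pending.poll();
        if (commit == null) {
            // Nothing to emit: back off briefly instead of busy-spinning.
            idleCalls++;
            try {
                Thread.sleep(pollIntervalMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
            }
            return;
        }
        emitted.add(commit);
    }

    public List<String> emitted() {
        return emitted;
    }

    public int idleCalls() {
        return idleCalls;
    }

    public static void main(String[] args) {
        PollingSpoutSketch spout = new PollingSpoutSketch(1);
        spout.offer("commit-1");
        spout.offer("commit-2");
        // The real framework loops indefinitely; five iterations are enough here.
        for (int i = 0; i < 5; i++) {
            spout.nextTuple();
        }
        System.out.println(spout.emitted() + " idle=" + spout.idleCalls());
    }
}
```

In a real spout the same idea would be to call Utils.sleep(...) and return early from nextTuple() whenever the source has nothing new, and to emit only when there is actual data.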
