我正在尝试编写执行以下操作的拓扑:

  • 订阅Twitter提要(基于关键字)的喷口
  • 聚合 bolt ,用于聚合集合中的多个tweet(例如N),并将其发送给打印机 bolt
  • 一个简单的 bolt ,可将集合立即打印到控制台。

  • 实际上,我想对集合进行更多处理。

    我在本地进行了测试,看起来它正在运行。但是,我不确定是否在 bolt 上正确设置了分组,并且在将其部署在实际的 Storm 群集上时是否可以正常使用。如果有人可以帮助您查看此拓扑并提出任何错误,更改或改进,我将不胜感激。

    谢谢。

    这就是我的拓扑图。
    builder.setSpout("spout", new TwitterFilterSpout("pittsburgh"));
       builder.setBolt("sampleaggregate", new SampleAggregatorBolt())
                    .shuffleGrouping("spout");
       builder.setBolt("printaggreator",new PrinterBolt()).shuffleGrouping("sampleaggregate");
    

    聚集 bolt
    public class SampleAggregatorBolt implements IRichBolt {
    
        protected OutputCollector collector;
        protected Tuple currentTuple;
        protected Logger log;
        /**
         * Holds the messages in the bolt till you are ready to send them out
         */
        protected List<Status> statusCache;
    
        @Override
        public void prepare(Map stormConf, TopologyContext context,
                            OutputCollector collector) {
            this.collector = collector;
    
            log = Logger.getLogger(getClass().getName());
            statusCache = new ArrayList<Status>();
        }
    
        @Override
        public void execute(Tuple tuple) {
            currentTuple = tuple;
    
            Status currentStatus = null;
            try {
                currentStatus = (Status) tuple.getValue(0);
            } catch (ClassCastException e) {
            }
            if (currentStatus != null) {
    
                //add it to the status cache
                statusCache.add(currentStatus);
                collector.ack(tuple);
    
    
                //check the size of the status cache and pass it to the next stage if you have enough messages to emit
                if (statusCache.size() > 10) {
                    collector.emit(new Values(statusCache));
                }
    
            }
        }
    
        @Override
        public void cleanup() {
    
    
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("tweets"));
    
        }
    
        @Override
        public Map<String, Object> getComponentConfiguration() {
            return null;  //To change body of implemented methods use File | Settings | File Templates.
        }
    
    
        protected void setupNonSerializableAttributes() {
    
        }
    
    }
    

    打印机 bolt
    public class PrinterBolt extends BaseBasicBolt {
    
        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            System.out.println(tuple.size() + " "  + tuple);
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer ofd) {
        }
    
    }
    

    最佳答案

    从我可以看到它看起来不错。不过,细节决定成败。我不确定您的聚合器 bolt 是做什么的,但是如果它对传递给它的值做出任何假设,那么您应该考虑适当的字段分组。当您使用默认并行度提示1时,这可能不会有太大的不同,但是您应该决定使用多个聚合 bolt 实例进行扩展时,您可能会要求使用非随机分组进行隐式逻辑假设。

    09-10 06:49
    查看更多