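I am trying to write a topology that does the following:
Actually, I want to do some more processing on the collection.
I tested it locally and it appears to work. However, I am not sure whether I have set up the groupings on the bolts correctly, or whether this will work when deployed on a real Storm cluster. I would appreciate it if someone could review this topology and point out any errors, changes, or improvements.
Thanks.
This is what my topology looks like.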
builder.setSpout("spout", new TwitterFilterSpout("pittsburgh"));
builder.setBolt("sampleaggregate", new SampleAggregatorBolt())
        .shuffleGrouping("spout");
builder.setBolt("printaggreator", new PrinterBolt())
        .shuffleGrouping("sampleaggregate");
Aggregator bolt
public class SampleAggregatorBolt implements IRichBolt {
    protected OutputCollector collector;
    protected Tuple currentTuple;
    protected Logger log;
    /**
     * Holds the messages in the bolt till you are ready to send them out
     */
    protected List<Status> statusCache;

    @Override
    public void prepare(Map stormConf, TopologyContext context,
                        OutputCollector collector) {
        this.collector = collector;
        log = Logger.getLogger(getClass().getName());
        statusCache = new ArrayList<Status>();
    }

    @Override
    public void execute(Tuple tuple) {
        currentTuple = tuple;
        Status currentStatus = null;
        try {
            currentStatus = (Status) tuple.getValue(0);
        } catch (ClassCastException e) {
        }
        if (currentStatus != null) {
            //add it to the status cache
            statusCache.add(currentStatus);
            collector.ack(tuple);
            //check the size of the status cache and pass it to the next stage if you have enough messages to emit
            if (statusCache.size() > 10) {
                collector.emit(new Values(statusCache));
            }
        }
    }

    @Override
    public void cleanup() {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("tweets"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null; //To change body of implemented methods use File | Settings | File Templates.
    }

    protected void setupNonSerializableAttributes() {
    }
}
Printer bolt
public class PrinterBolt extends BaseBasicBolt {
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        System.out.println(tuple.size() + " " + tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer ofd) {
    }
}
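Best answer
From what I can see, it looks fine. The devil is in the details, though. I am not sure what your aggregator bolt does, but if it makes any assumptions about the values passed to it, you should consider an appropriate fields grouping. With the default parallelism hint of 1 this probably won't make much difference, but once you decide to scale out with multiple aggregator bolt instances, the implicit assumptions in your logic may call for a non-shuffle grouping.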
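To make the grouping point concrete, here is a minimal sketch in plain Java (no Storm dependency) of why a fields grouping matters once there is more than one aggregator task. It assumes a hypothetical routing rule of "hash the key, take it modulo the task count"; Storm's real hashing differs in detail, but the property being illustrated is the same: tuples with equal grouping-field values always reach the same task, whereas a shuffle grouping spreads them arbitrarily. The class name `GroupingSketch`, its methods, and the `"user"` field mentioned in the comment are all made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// In a real topology the equivalent wiring would look roughly like this,
// assuming the spout declared a field named "user" (an assumption, the
// question does not show the spout's output fields):
//
//   builder.setBolt("sampleaggregate", new SampleAggregatorBolt(), 4)
//          .fieldsGrouping("spout", new Fields("user"));
class GroupingSketch {

    // Hypothetical stand-in for fields grouping: the same key always
    // yields the same task index in the range [0, numTasks).
    static int fieldsGroupingTask(String key, int numTasks) {
        return Math.floorMod(key.hashCode(), numTasks);
    }

    // Routes every key to a task and returns the keys each task received.
    static Map<Integer, List<String>> route(List<String> keys, int numTasks) {
        Map<Integer, List<String>> perTask = new HashMap<>();
        for (String key : keys) {
            int task = fieldsGroupingTask(key, numTasks);
            perTask.computeIfAbsent(task, t -> new ArrayList<>()).add(key);
        }
        return perTask;
    }

    public static void main(String[] args) {
        List<String> users = Arrays.asList("alice", "bob", "alice", "carol", "bob", "alice");
        Map<Integer, List<String>> perTask = route(users, 4);
        // Every occurrence of a given user shows up under exactly one task.
        perTask.forEach((task, keys) -> System.out.println("task " + task + " <- " + keys));
    }
}
```

If the aggregator only batches tweets regardless of content, shuffle grouping is fine at any parallelism. If it ever aggregates per user, per hashtag, or per any other key, each task's cache would only hold a random slice of that key's tweets under shuffle grouping, which is the case where a fields grouping on that key becomes necessary.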