将BoltA和BoltB的输出发送到BoltC的最简单方法是什么。我必须使用Joins还是有更简单的解决方案? A和B具有相同的字段(ts,metric_name,metric_count)。
// KafkaSpout --> LogDecoder
builder.setBolt(LOGDECODER_BOLT_ID, logdecoderBolt, 10).shuffleGrouping(KAFKA_SPOUT_ID);
// LogDecoder --> CountBolt
builder.setBolt(COUNT_BOLT_ID, countBolt, 10).shuffleGrouping(LOGDECODER_BOLT_ID);
// LogDecoder --> HttpResCodeCountBolt
builder.setBolt(HTTP_RES_CODE_COUNT_BOLT_ID, http_res_code_count_bolt, 10).shuffleGrouping(LOGDECODER_BOLT_ID);
# And now I want to send CountBolt and HttpResCodeCountBolt output to Aggregator Bolt.
// CountBolt --> AggregatwBolt
builder.setBolt(AGGREGATE_BOLT_ID, aggregateBolt, 5).fieldsGrouping((COUNT_BOLT_ID), new Fields("ts"));
// HttpResCodeCountBolt --> AggregatwBolt
builder.setBolt(AGGREGATE_BOLT_ID, aggregateBolt, 5).fieldsGrouping((HTTP_RES_CODE_COUNT_BOLT_ID), new Fields("ts"));
这可能吗 ?
最佳答案
是的。只需在fieldsGrouping调用中添加一个流ID(以下为“stream1”和“stream2”):
BoltDeclarer bd = builder.setBolt(AGGREGATE_BOLT_ID, aggregateBolt, 5);
bd.fieldsGrouping((COUNT_BOLT_ID), "stream1", new Fields("ts"));
bd.fieldsGrouping((HTTP_RES_CODE_COUNT_BOLT_ID), "stream2", new Fields("ts"));
然后在BoltC的execute()方法中,可以测试以查看元组来自哪个流:
public void execute(Tuple tuple) {
if ("stream1".equals(tuple.getSourceStreamId())) {
// this came from stream1
} else if ("stream2".equals(tuple.getSourceStreamId())) {
// this came from stream2
}
由于您知道元组来自哪个流,因此在两个流上不必具有相同形状的元组。您只需根据流ID对元组进行编码。
您还可以检查元组来自哪个组件(在我键入此内容时,我认为这可能更适合您的情况)以及发出元组的组件实例(任务)。
关于apache-storm - 将两个 bolt 的输出发送到Storm中的单个 bolt ?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/23943560/